1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use ave_actors::ActorPath;
6use ave_actors::{
7 Actor, ActorContext, ActorError, ChildAction, Handler, Message,
8 NotPersistentActor,
9};
10
11use ave_common::identity::{
12 CryptoError, DigestIdentifier, HashAlgorithm, PublicKey, Signature, Signed,
13 hash_borsh,
14};
15
16use request::ApprovalReq;
17use response::ApprovalRes;
18
19use tracing::{Span, debug, error, info_span, warn};
20
21use crate::approval::light::{ApprLight, ApprLightMessage};
22use crate::approval::persist::{ApprPersist, ApprPersistMessage};
23use crate::governance::model::Quorum;
24use crate::helpers::network::service::NetworkSender;
25use crate::model::common::emit_fail;
26
27use crate::model::event::ApprovalData;
28use crate::model::network::TimeOut;
29
30use crate::request::manager::{RequestManager, RequestManagerMessage};
31
32pub mod light;
33pub mod persist;
34pub mod request;
35pub mod response;
36pub mod types;
37
38#[derive(Clone, Debug)]
39pub struct Approval {
40 hash: HashAlgorithm,
41 network: Arc<NetworkSender>,
42 our_key: Arc<PublicKey>,
43 quorum: Quorum,
44 request_id: DigestIdentifier,
45 version: u64,
46 request: Signed<ApprovalReq>,
47 approvers: HashSet<PublicKey>,
48 approvers_timeout: Vec<TimeOut>,
49 approvers_agrees: Vec<Signature>,
50 approvers_disagrees: Vec<Signature>,
51 approval_req_hash: DigestIdentifier,
52 approvers_quantity: u32,
53}
54
55impl Approval {
56 pub fn new(
57 our_key: Arc<PublicKey>,
58 request: Signed<ApprovalReq>,
59 quorum: Quorum,
60 approvers: HashSet<PublicKey>,
61 hash: HashAlgorithm,
62 network: Arc<NetworkSender>,
63 ) -> Self {
64 Self {
65 hash,
66 network,
67 our_key,
68 quorum,
69 request,
70 approvers_quantity: approvers.len() as u32,
71 approvers,
72 request_id: DigestIdentifier::default(),
73 version: 0,
74 approvers_timeout: vec![],
75 approvers_agrees: vec![],
76 approvers_disagrees: vec![],
77 approval_req_hash: DigestIdentifier::default(),
78 }
79 }
80
81 async fn create_approvers(
82 &self,
83 ctx: &mut ActorContext<Self>,
84 signer: PublicKey,
85 ) -> Result<(), ActorError> {
86 let subject_id = self.request.content().subject_id.to_string();
87
88 if signer == *self.our_key {
89 let approver_path =
90 ActorPath::from(format!("/user/node/{}/approver", subject_id));
91 let approver_actor = ctx
92 .system()
93 .get_actor::<ApprPersist>(&approver_path)
94 .await?;
95 approver_actor
96 .tell(ApprPersistMessage::LocalApproval {
97 request_id: self.request_id.clone(),
98 version: self.version,
99 approval_req: self.request.clone(),
100 })
101 .await?
102 } else {
103 let child = ctx
105 .create_child(
106 &signer.to_string(),
107 ApprLight::new(
108 self.network.clone(),
109 signer.clone(),
110 self.request_id.clone(),
111 self.version,
112 ),
113 )
114 .await?;
115
116 child
117 .tell(ApprLightMessage::NetworkApproval {
118 approval_req: self.request.clone(),
119 })
120 .await?;
121 }
122
123 Ok(())
124 }
125 fn check_approval(&mut self, approver: PublicKey) -> bool {
126 self.approvers.remove(&approver)
127 }
128
129 async fn send_approval_to_req(
130 &self,
131 ctx: &ActorContext<Self>,
132 response: bool,
133 ) -> Result<(), ActorError> {
134 let req_actor = ctx.get_parent::<RequestManager>().await?;
135
136 req_actor
137 .tell(RequestManagerMessage::ApprovalRes {
138 request_id: self.request_id.clone(),
139 appro_res: ApprovalData {
140 approval_req_signature: self.request.signature().clone(),
141 approval_req_hash: self.approval_req_hash.clone(),
142 approvers_agrees_signatures: self.approvers_agrees.clone(),
143 approvers_disagrees_signatures: self
144 .approvers_disagrees
145 .clone(),
146 approvers_timeout: self.approvers_timeout.clone(),
147 approved: response,
148 },
149 })
150 .await
151 }
152
153 fn create_appro_req_hash(&self) -> Result<DigestIdentifier, CryptoError> {
154 hash_borsh(&*self.hash.hasher(), &self.request)
155 }
156}
157
158#[derive(Debug, Clone)]
159pub enum ApprovalMessage {
160 Create {
161 request_id: DigestIdentifier,
162 version: u64,
163 },
164 Response {
165 approval_res: ApprovalRes,
166 sender: PublicKey,
167 signature: Option<Signature>,
168 },
169}
170
171impl Message for ApprovalMessage {}
172
173#[async_trait]
174impl Actor for Approval {
175 type Event = ();
176 type Message = ApprovalMessage;
177 type Response = ();
178
179 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
180 parent_span.map_or_else(
181 || info_span!("Approval"),
182 |parent_span| info_span!(parent: parent_span, "Approval"),
183 )
184 }
185}
186
187#[async_trait]
188impl Handler<Self> for Approval {
189 async fn handle_message(
190 &mut self,
191 __sender: ActorPath,
192 msg: ApprovalMessage,
193 ctx: &mut ActorContext<Self>,
194 ) -> Result<(), ActorError> {
195 match msg {
196 ApprovalMessage::Create {
197 request_id,
198 version,
199 } => {
200 let approval_req_hash = match self.create_appro_req_hash() {
201 Ok(digest) => digest,
202 Err(e) => {
203 error!(
204 msg_type = "Create",
205 error = %e,
206 "Failed to create approval request hash"
207 );
208 return Err(emit_fail(
209 ctx,
210 ActorError::FunctionalCritical {
211 description: format!(
212 "Cannot create approval request hash: {}",
213 e
214 ),
215 },
216 )
217 .await);
218 }
219 };
220
221 self.approval_req_hash = approval_req_hash;
222 self.request_id = request_id.clone();
223 self.version = version;
224
225 for signer in self.approvers.clone() {
226 if let Err(e) =
227 self.create_approvers(ctx, signer.clone()).await
228 {
229 error!(
230 msg_type = "Create",
231 error = %e,
232 signer = %signer,
233 "Failed to create approver actor"
234 );
235 return Err(emit_fail(ctx, e).await);
236 }
237 }
238
239 debug!(
240 msg_type = "Create",
241 request_id = %request_id,
242 version = version,
243 approvers_count = self.approvers_quantity,
244 "Approval created and approvers initialized"
245 );
246 }
247 ApprovalMessage::Response {
248 approval_res,
249 sender,
250 signature,
251 } => {
252 if self.check_approval(sender.clone()) {
253 match approval_res.clone() {
254 ApprovalRes::Response {
255 approval_req_hash,
256 agrees,
257 ..
258 } => {
259 if approval_req_hash != self.approval_req_hash {
260 error!(
261 msg_type = "Response",
262 expected_hash = %self.approval_req_hash,
263 received_hash = %approval_req_hash,
264 "Invalid approval request hash"
265 );
266 return Err(ActorError::Functional {
267 description: "Approval Response, Invalid approval request hash".to_owned(),
268 });
269 }
270
271 let Some(signature) = signature else {
272 error!(
273 msg_type = "Response",
274 sender = %sender,
275 "Approval response without signature"
276 );
277 return Err(ActorError::Functional {
278 description: "Approval Response solver without signature".to_owned(),
279 });
280 };
281
282 if agrees {
283 self.approvers_agrees.push(signature);
284 } else {
285 self.approvers_disagrees.push(signature);
286 }
287 }
288 ApprovalRes::TimeOut(approval_time_out) => {
289 self.approvers_timeout.push(approval_time_out);
290 }
291 };
292
293 if self.quorum.check_quorum(
295 self.approvers_quantity,
296 self.approvers_agrees.len() as u32
297 + self.approvers_timeout.len() as u32,
298 ) {
299 if let Err(e) =
300 self.send_approval_to_req(ctx, true).await
301 {
302 error!(
303 msg_type = "Response",
304 error = %e,
305 "Failed to send approval response to request actor"
306 );
307 return Err(emit_fail(ctx, e).await);
308 };
309
310 debug!(
311 msg_type = "Response",
312 agrees = self.approvers_agrees.len(),
313 disagrees = self.approvers_disagrees.len(),
314 timeouts = self.approvers_timeout.len(),
315 "Quorum reached, approval accepted"
316 );
317 } else if self.approvers.is_empty()
318 && let Err(e) =
319 self.send_approval_to_req(ctx, false).await
320 {
321 error!(
322 msg_type = "Response",
323 error = %e,
324 "Failed to send approval response to request actor"
325 );
326 return Err(emit_fail(ctx, e).await);
327 } else if self.approvers.is_empty() {
328 debug!(
329 msg_type = "Response",
330 agrees = self.approvers_agrees.len(),
331 disagrees = self.approvers_disagrees.len(),
332 timeouts = self.approvers_timeout.len(),
333 "All approvers responded, approval rejected"
334 );
335 }
336 } else {
337 warn!(
338 msg_type = "Response",
339 sender = %sender,
340 "Response from unexpected sender"
341 );
342 }
343 }
344 }
345 Ok(())
346 }
347
348 async fn on_child_fault(
349 &mut self,
350 error: ActorError,
351 ctx: &mut ActorContext<Self>,
352 ) -> ChildAction {
353 error!(
354 request_id = %self.request_id,
355 version = self.version,
356 error = %error,
357 "Child fault in approval actor"
358 );
359 emit_fail(ctx, error).await;
360 ChildAction::Stop
361 }
362}
363
364impl NotPersistentActor for Approval {}