Skip to main content

ave_core/approval/
mod.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use ave_actors::ActorPath;
6use ave_actors::{
7    Actor, ActorContext, ActorError, ChildAction, Handler, Message,
8    NotPersistentActor,
9};
10
11use ave_common::identity::{
12    CryptoError, DigestIdentifier, HashAlgorithm, PublicKey, Signature, Signed,
13    hash_borsh,
14};
15
16use request::ApprovalReq;
17use response::ApprovalRes;
18
19use tracing::{Span, debug, error, info_span, warn};
20
21use crate::approval::light::{ApprLight, ApprLightMessage};
22use crate::approval::persist::{ApprPersist, ApprPersistMessage};
23use crate::governance::model::Quorum;
24use crate::helpers::network::service::NetworkSender;
25use crate::model::common::emit_fail;
26
27use crate::model::event::ApprovalData;
28use crate::model::network::TimeOut;
29
30use crate::request::manager::{RequestManager, RequestManagerMessage};
31
32pub mod light;
33pub mod persist;
34pub mod request;
35pub mod response;
36pub mod types;
37
38#[derive(Clone, Debug)]
39pub struct Approval {
40    hash: HashAlgorithm,
41    network: Arc<NetworkSender>,
42    our_key: Arc<PublicKey>,
43    quorum: Quorum,
44    request_id: DigestIdentifier,
45    version: u64,
46    request: Signed<ApprovalReq>,
47    approvers: HashSet<PublicKey>,
48    approvers_timeout: Vec<TimeOut>,
49    approvers_agrees: Vec<Signature>,
50    approvers_disagrees: Vec<Signature>,
51    approval_req_hash: DigestIdentifier,
52    approvers_quantity: u32,
53}
54
55impl Approval {
56    pub fn new(
57        our_key: Arc<PublicKey>,
58        request: Signed<ApprovalReq>,
59        quorum: Quorum,
60        approvers: HashSet<PublicKey>,
61        hash: HashAlgorithm,
62        network: Arc<NetworkSender>,
63    ) -> Self {
64        Self {
65            hash,
66            network,
67            our_key,
68            quorum,
69            request,
70            approvers_quantity: approvers.len() as u32,
71            approvers,
72            request_id: DigestIdentifier::default(),
73            version: 0,
74            approvers_timeout: vec![],
75            approvers_agrees: vec![],
76            approvers_disagrees: vec![],
77            approval_req_hash: DigestIdentifier::default(),
78        }
79    }
80
81    async fn create_approvers(
82        &self,
83        ctx: &mut ActorContext<Self>,
84        signer: PublicKey,
85    ) -> Result<(), ActorError> {
86        let subject_id = self.request.content().subject_id.to_string();
87
88        if signer == *self.our_key {
89            let approver_path =
90                ActorPath::from(format!("/user/node/{}/approver", subject_id));
91            let approver_actor = ctx
92                .system()
93                .get_actor::<ApprPersist>(&approver_path)
94                .await?;
95            approver_actor
96                .tell(ApprPersistMessage::LocalApproval {
97                    request_id: self.request_id.clone(),
98                    version: self.version,
99                    approval_req: self.request.clone(),
100                })
101                .await?
102        } else {
103            // Create Approvers child
104            let child = ctx
105                .create_child(
106                    &signer.to_string(),
107                    ApprLight::new(
108                        self.network.clone(),
109                        signer.clone(),
110                        self.request_id.clone(),
111                        self.version,
112                    ),
113                )
114                .await?;
115
116            child
117                .tell(ApprLightMessage::NetworkApproval {
118                    approval_req: self.request.clone(),
119                })
120                .await?;
121        }
122
123        Ok(())
124    }
125    fn check_approval(&mut self, approver: PublicKey) -> bool {
126        self.approvers.remove(&approver)
127    }
128
129    async fn send_approval_to_req(
130        &self,
131        ctx: &ActorContext<Self>,
132        response: bool,
133    ) -> Result<(), ActorError> {
134        let req_actor = ctx.get_parent::<RequestManager>().await?;
135
136        req_actor
137            .tell(RequestManagerMessage::ApprovalRes {
138                request_id: self.request_id.clone(),
139                appro_res: ApprovalData {
140                    approval_req_signature: self.request.signature().clone(),
141                    approval_req_hash: self.approval_req_hash.clone(),
142                    approvers_agrees_signatures: self.approvers_agrees.clone(),
143                    approvers_disagrees_signatures: self
144                        .approvers_disagrees
145                        .clone(),
146                    approvers_timeout: self.approvers_timeout.clone(),
147                    approved: response,
148                },
149            })
150            .await
151    }
152
153    fn create_appro_req_hash(&self) -> Result<DigestIdentifier, CryptoError> {
154        hash_borsh(&*self.hash.hasher(), &self.request)
155    }
156}
157
158#[derive(Debug, Clone)]
159pub enum ApprovalMessage {
160    Create {
161        request_id: DigestIdentifier,
162        version: u64,
163    },
164    Response {
165        approval_res: ApprovalRes,
166        sender: PublicKey,
167        signature: Option<Signature>,
168    },
169}
170
171impl Message for ApprovalMessage {}
172
173#[async_trait]
174impl Actor for Approval {
175    type Event = ();
176    type Message = ApprovalMessage;
177    type Response = ();
178
179    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
180        parent_span.map_or_else(
181            || info_span!("Approval"),
182            |parent_span| info_span!(parent: parent_span, "Approval"),
183        )
184    }
185}
186
187#[async_trait]
188impl Handler<Self> for Approval {
189    async fn handle_message(
190        &mut self,
191        __sender: ActorPath,
192        msg: ApprovalMessage,
193        ctx: &mut ActorContext<Self>,
194    ) -> Result<(), ActorError> {
195        match msg {
196            ApprovalMessage::Create {
197                request_id,
198                version,
199            } => {
200                let approval_req_hash = match self.create_appro_req_hash() {
201                    Ok(digest) => digest,
202                    Err(e) => {
203                        error!(
204                            msg_type = "Create",
205                            error = %e,
206                            "Failed to create approval request hash"
207                        );
208                        return Err(emit_fail(
209                            ctx,
210                            ActorError::FunctionalCritical {
211                                description: format!(
212                                    "Cannot create approval request hash: {}",
213                                    e
214                                ),
215                            },
216                        )
217                        .await);
218                    }
219                };
220
221                self.approval_req_hash = approval_req_hash;
222                self.request_id = request_id.clone();
223                self.version = version;
224
225                for signer in self.approvers.clone() {
226                    if let Err(e) =
227                        self.create_approvers(ctx, signer.clone()).await
228                    {
229                        error!(
230                            msg_type = "Create",
231                            error = %e,
232                            signer = %signer,
233                            "Failed to create approver actor"
234                        );
235                        return Err(emit_fail(ctx, e).await);
236                    }
237                }
238
239                debug!(
240                    msg_type = "Create",
241                    request_id = %request_id,
242                    version = version,
243                    approvers_count = self.approvers_quantity,
244                    "Approval created and approvers initialized"
245                );
246            }
247            ApprovalMessage::Response {
248                approval_res,
249                sender,
250                signature,
251            } => {
252                if self.check_approval(sender.clone()) {
253                    match approval_res.clone() {
254                        ApprovalRes::Response {
255                            approval_req_hash,
256                            agrees,
257                            ..
258                        } => {
259                            if approval_req_hash != self.approval_req_hash {
260                                error!(
261                                    msg_type = "Response",
262                                    expected_hash = %self.approval_req_hash,
263                                    received_hash = %approval_req_hash,
264                                    "Invalid approval request hash"
265                                );
266                                return Err(ActorError::Functional {
267                                    description: "Approval Response, Invalid approval request hash".to_owned(),
268                                });
269                            }
270
271                            let Some(signature) = signature else {
272                                error!(
273                                    msg_type = "Response",
274                                    sender = %sender,
275                                    "Approval response without signature"
276                                );
277                                return Err(ActorError::Functional {
278                                    description: "Approval Response solver without signature".to_owned(),
279                                });
280                            };
281
282                            if agrees {
283                                self.approvers_agrees.push(signature);
284                            } else {
285                                self.approvers_disagrees.push(signature);
286                            }
287                        }
288                        ApprovalRes::TimeOut(approval_time_out) => {
289                            self.approvers_timeout.push(approval_time_out);
290                        }
291                    };
292
293                    // si hemos llegado al quorum y hay suficientes aprobaciones aprobamos...
294                    if self.quorum.check_quorum(
295                        self.approvers_quantity,
296                        self.approvers_agrees.len() as u32
297                            + self.approvers_timeout.len() as u32,
298                    ) {
299                        if let Err(e) =
300                            self.send_approval_to_req(ctx, true).await
301                        {
302                            error!(
303                                msg_type = "Response",
304                                error = %e,
305                                "Failed to send approval response to request actor"
306                            );
307                            return Err(emit_fail(ctx, e).await);
308                        };
309
310                        debug!(
311                            msg_type = "Response",
312                            agrees = self.approvers_agrees.len(),
313                            disagrees = self.approvers_disagrees.len(),
314                            timeouts = self.approvers_timeout.len(),
315                            "Quorum reached, approval accepted"
316                        );
317                    } else if self.approvers.is_empty()
318                        && let Err(e) =
319                            self.send_approval_to_req(ctx, false).await
320                    {
321                        error!(
322                            msg_type = "Response",
323                            error = %e,
324                            "Failed to send approval response to request actor"
325                        );
326                        return Err(emit_fail(ctx, e).await);
327                    } else if self.approvers.is_empty() {
328                        debug!(
329                            msg_type = "Response",
330                            agrees = self.approvers_agrees.len(),
331                            disagrees = self.approvers_disagrees.len(),
332                            timeouts = self.approvers_timeout.len(),
333                            "All approvers responded, approval rejected"
334                        );
335                    }
336                } else {
337                    warn!(
338                        msg_type = "Response",
339                        sender = %sender,
340                        "Response from unexpected sender"
341                    );
342                }
343            }
344        }
345        Ok(())
346    }
347
348    async fn on_child_fault(
349        &mut self,
350        error: ActorError,
351        ctx: &mut ActorContext<Self>,
352    ) -> ChildAction {
353        error!(
354            request_id = %self.request_id,
355            version = self.version,
356            error = %error,
357            "Child fault in approval actor"
358        );
359        emit_fail(ctx, error).await;
360        ChildAction::Stop
361    }
362}
363
364impl NotPersistentActor for Approval {}