Skip to main content

ave_core/approval/
mod.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use ave_actors::ActorPath;
6use ave_actors::{
7    Actor, ActorContext, ActorError, ChildAction, Handler, Message,
8    NotPersistentActor,
9};
10
11use ave_common::identity::{
12    CryptoError, DigestIdentifier, HashAlgorithm, PublicKey, Signature, Signed,
13    hash_borsh,
14};
15
16use request::ApprovalReq;
17use response::ApprovalRes;
18
19use tracing::{Span, debug, error, info_span, warn};
20
21use crate::approval::light::{ApprLight, ApprLightMessage};
22use crate::approval::persist::{ApprPersist, ApprPersistMessage};
23use crate::governance::model::Quorum;
24use crate::helpers::network::service::NetworkSender;
25use crate::metrics::try_core_metrics;
26use crate::model::common::{abort_req, emit_fail};
27
28use crate::model::event::ApprovalData;
29use crate::model::network::TimeOut;
30
31use crate::request::manager::{RequestManager, RequestManagerMessage};
32
33pub mod light;
34pub mod persist;
35pub mod request;
36pub mod response;
37pub mod types;
38
39#[derive(Clone, Debug)]
40pub struct Approval {
41    hash: HashAlgorithm,
42    network: Arc<NetworkSender>,
43    our_key: Arc<PublicKey>,
44    quorum: Quorum,
45    request_id: DigestIdentifier,
46    version: u64,
47    request: Signed<ApprovalReq>,
48    approvers: HashSet<PublicKey>,
49    approvers_timeout: Vec<TimeOut>,
50    approvers_agrees: Vec<Signature>,
51    approvers_disagrees: Vec<Signature>,
52    approval_req_hash: DigestIdentifier,
53    approvers_quantity: u32,
54}
55
56impl Approval {
57    fn observe_event(result: &'static str) {
58        if let Some(metrics) = try_core_metrics() {
59            metrics.observe_protocol_event("approval", result);
60        }
61    }
62
63    pub fn new(
64        our_key: Arc<PublicKey>,
65        request: Signed<ApprovalReq>,
66        quorum: Quorum,
67        approvers: HashSet<PublicKey>,
68        hash: HashAlgorithm,
69        network: Arc<NetworkSender>,
70    ) -> Self {
71        Self {
72            hash,
73            network,
74            our_key,
75            quorum,
76            request,
77            approvers_quantity: approvers.len() as u32,
78            approvers,
79            request_id: DigestIdentifier::default(),
80            version: 0,
81            approvers_timeout: vec![],
82            approvers_agrees: vec![],
83            approvers_disagrees: vec![],
84            approval_req_hash: DigestIdentifier::default(),
85        }
86    }
87
88    async fn create_approvers(
89        &self,
90        ctx: &mut ActorContext<Self>,
91        signer: PublicKey,
92    ) -> Result<(), ActorError> {
93        let subject_id = self.request.content().subject_id.to_string();
94
95        if signer == *self.our_key {
96            let approver_path = ActorPath::from(format!(
97                "/user/node/subject_manager/{}/approver",
98                subject_id
99            ));
100            let approver_actor = ctx
101                .system()
102                .get_actor::<ApprPersist>(&approver_path)
103                .await?;
104            approver_actor
105                .tell(ApprPersistMessage::LocalApproval {
106                    request_id: self.request_id.clone(),
107                    version: self.version,
108                    approval_req: self.request.clone(),
109                })
110                .await?
111        } else {
112            // Create Approvers child
113            let child = ctx
114                .create_child(
115                    &signer.to_string(),
116                    ApprLight::new(
117                        self.network.clone(),
118                        signer.clone(),
119                        self.request_id.clone(),
120                        self.version,
121                    ),
122                )
123                .await?;
124
125            child
126                .tell(ApprLightMessage::NetworkApproval {
127                    approval_req: self.request.clone(),
128                })
129                .await?;
130        }
131
132        Ok(())
133    }
134    fn check_approval(&mut self, approver: PublicKey) -> bool {
135        self.approvers.remove(&approver)
136    }
137
138    async fn send_approval_to_req(
139        &self,
140        ctx: &ActorContext<Self>,
141        response: bool,
142    ) -> Result<(), ActorError> {
143        let req_actor = ctx.get_parent::<RequestManager>().await?;
144
145        req_actor
146            .tell(RequestManagerMessage::ApprovalRes {
147                request_id: self.request_id.clone(),
148                appro_res: ApprovalData {
149                    approval_req_signature: self.request.signature().clone(),
150                    approval_req_hash: self.approval_req_hash.clone(),
151                    approvers_agrees_signatures: self.approvers_agrees.clone(),
152                    approvers_disagrees_signatures: self
153                        .approvers_disagrees
154                        .clone(),
155                    approvers_timeout: self.approvers_timeout.clone(),
156                    approved: response,
157                },
158            })
159            .await
160    }
161
162    fn create_appro_req_hash(&self) -> Result<DigestIdentifier, CryptoError> {
163        hash_borsh(&*self.hash.hasher(), self.request.content())
164    }
165}
166
167#[derive(Debug, Clone)]
168pub enum ApprovalMessage {
169    Create {
170        request_id: DigestIdentifier,
171        version: u64,
172    },
173    Response {
174        approval_res: ApprovalRes,
175        sender: PublicKey,
176        signature: Option<Signature>,
177    },
178}
179
180impl Message for ApprovalMessage {}
181
182#[async_trait]
183impl Actor for Approval {
184    type Event = ();
185    type Message = ApprovalMessage;
186    type Response = ();
187
188    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
189        parent_span.map_or_else(
190            || info_span!("Approval"),
191            |parent_span| info_span!(parent: parent_span, "Approval"),
192        )
193    }
194}
195
196#[async_trait]
197impl Handler<Self> for Approval {
198    async fn handle_message(
199        &mut self,
200        __sender: ActorPath,
201        msg: ApprovalMessage,
202        ctx: &mut ActorContext<Self>,
203    ) -> Result<(), ActorError> {
204        match msg {
205            ApprovalMessage::Create {
206                request_id,
207                version,
208            } => {
209                let approval_req_hash = match self.create_appro_req_hash() {
210                    Ok(digest) => digest,
211                    Err(e) => {
212                        error!(
213                            msg_type = "Create",
214                            error = %e,
215                            "Failed to create approval request hash"
216                        );
217                        return Err(emit_fail(
218                            ctx,
219                            ActorError::FunctionalCritical {
220                                description: format!(
221                                    "Cannot create approval request hash: {}",
222                                    e
223                                ),
224                            },
225                        )
226                        .await);
227                    }
228                };
229
230                self.approval_req_hash = approval_req_hash;
231                self.request_id = request_id.clone();
232                self.version = version;
233
234                for signer in self.approvers.clone() {
235                    if let Err(e) =
236                        self.create_approvers(ctx, signer.clone()).await
237                    {
238                        error!(
239                            msg_type = "Create",
240                            error = %e,
241                            signer = %signer,
242                            "Failed to create approver actor"
243                        );
244                        return Err(emit_fail(ctx, e).await);
245                    }
246                }
247
248                debug!(
249                    msg_type = "Create",
250                    request_id = %request_id,
251                    version = version,
252                    approvers_count = self.approvers_quantity,
253                    "Approval created and approvers initialized"
254                );
255            }
256            ApprovalMessage::Response {
257                approval_res,
258                sender,
259                signature,
260            } => {
261                if self.check_approval(sender.clone()) {
262                    match approval_res.clone() {
263                        ApprovalRes::Response {
264                            approval_req_hash,
265                            agrees,
266                            ..
267                        } => {
268                            if approval_req_hash != self.approval_req_hash {
269                                error!(
270                                    msg_type = "Response",
271                                    expected_hash = %self.approval_req_hash,
272                                    received_hash = %approval_req_hash,
273                                    "Invalid approval request hash"
274                                );
275                                return Err(ActorError::Functional {
276                                    description: "Approval Response, Invalid approval request hash".to_owned(),
277                                });
278                            }
279
280                            let Some(signature) = signature else {
281                                error!(
282                                    msg_type = "Response",
283                                    sender = %sender,
284                                    "Approval response without signature"
285                                );
286                                return Err(ActorError::Functional {
287                                    description: "Approval Response solver without signature".to_owned(),
288                                });
289                            };
290
291                            if agrees {
292                                self.approvers_agrees.push(signature);
293                            } else {
294                                self.approvers_disagrees.push(signature);
295                            }
296                        }
297                        ApprovalRes::Abort(error) => {
298                            Self::observe_event("abort");
299                            if let Err(e) = abort_req(
300                                ctx,
301                                self.request_id.clone(),
302                                sender.clone(),
303                                error.clone(),
304                                self.request.content().sn,
305                            )
306                            .await
307                            {
308                                error!(
309                                    msg_type = "Response",
310                                    request_id = %self.request_id,
311                                    sender = %sender,
312                                    abort_reason = %error,
313                                    error = %e,
314                                    "Failed to abort request"
315                                );
316                                return Err(emit_fail(ctx, e).await);
317                            }
318
319                            debug!(
320                                msg_type = "Response",
321                                request_id = %self.request_id,
322                                sender = %sender,
323                                abort_reason = %error,
324                                "Approval aborted"
325                            );
326
327                            return Ok(());
328                        }
329                        ApprovalRes::TimeOut(approval_time_out) => {
330                            Self::observe_event("timeout");
331                            self.approvers_timeout.push(approval_time_out);
332                        }
333                    };
334
335                    // si hemos llegado al quorum y hay suficientes aprobaciones aprobamos...
336                    if self.quorum.check_quorum(
337                        self.approvers_quantity,
338                        self.approvers_agrees.len() as u32
339                            + self.approvers_timeout.len() as u32,
340                    ) {
341                        Self::observe_event("approved");
342                        if let Err(e) =
343                            self.send_approval_to_req(ctx, true).await
344                        {
345                            error!(
346                                msg_type = "Response",
347                                error = %e,
348                                "Failed to send approval response to request actor"
349                            );
350                            return Err(emit_fail(ctx, e).await);
351                        };
352
353                        debug!(
354                            msg_type = "Response",
355                            agrees = self.approvers_agrees.len(),
356                            disagrees = self.approvers_disagrees.len(),
357                            timeouts = self.approvers_timeout.len(),
358                            "Quorum reached, approval accepted"
359                        );
360                    } else if self.approvers.is_empty()
361                        && let Err(e) =
362                            self.send_approval_to_req(ctx, false).await
363                    {
364                        error!(
365                            msg_type = "Response",
366                            error = %e,
367                            "Failed to send approval response to request actor"
368                        );
369                        return Err(emit_fail(ctx, e).await);
370                    } else if self.approvers.is_empty() {
371                        Self::observe_event("rejected");
372                        debug!(
373                            msg_type = "Response",
374                            agrees = self.approvers_agrees.len(),
375                            disagrees = self.approvers_disagrees.len(),
376                            timeouts = self.approvers_timeout.len(),
377                            "All approvers responded, approval rejected"
378                        );
379                    }
380                } else {
381                    warn!(
382                        msg_type = "Response",
383                        sender = %sender,
384                        "Response from unexpected sender"
385                    );
386                }
387            }
388        }
389        Ok(())
390    }
391
392    async fn on_child_fault(
393        &mut self,
394        error: ActorError,
395        ctx: &mut ActorContext<Self>,
396    ) -> ChildAction {
397        Self::observe_event("error");
398        error!(
399            request_id = %self.request_id,
400            version = self.version,
401            error = %error,
402            "Child fault in approval actor"
403        );
404        emit_fail(ctx, error).await;
405        ChildAction::Stop
406    }
407}
408
// Marker implementation with no overridden items: `Approval` opts into
// `NotPersistentActor`. NOTE(review): presumably this tells the actor system
// that no state is persisted for this actor — confirm against `ave_actors`.
impl NotPersistentActor for Approval {}