Skip to main content

ave_core/approval/
light.rs

1use std::{collections::VecDeque, sync::Arc, time::Duration};
2
3use crate::{
4    ActorMessage, NetworkMessage,
5    approval::types::VotationType,
6    helpers::network::service::NetworkSender,
7    model::{
8        common::emit_fail,
9        network::{RetryNetwork, TimeOut},
10    },
11};
12use async_trait::async_trait;
13use ave_actors::{
14    Actor, ActorContext, ActorError, ActorPath, ChildAction,
15    CustomIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
16    RetryMessage, Strategy,
17};
18use ave_common::identity::{
19    DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp,
20};
21use network::ComunicateInfo;
22use tracing::{Span, debug, error, info_span, warn};
23
24use super::{
25    Approval, ApprovalMessage, request::ApprovalReq, response::ApprovalRes,
26};
27
28#[derive(Clone, Debug)]
29pub struct ApprLight {
30    network: Arc<NetworkSender>,
31    node_key: PublicKey,
32    request_id: DigestIdentifier,
33    version: u64,
34}
35
36impl ApprLight {
37    pub const fn new(
38        network: Arc<NetworkSender>,
39        node_key: PublicKey,
40        request_id: DigestIdentifier,
41        version: u64,
42    ) -> Self {
43        Self {
44            network,
45            node_key,
46            request_id,
47            version,
48        }
49    }
50}
51
52pub struct InitApprLight {
53    pub request_id: DigestIdentifier,
54    pub version: u64,
55    pub our_key: Arc<PublicKey>,
56    pub node_key: PublicKey,
57    pub subject_id: String,
58    pub pass_votation: VotationType,
59    pub hash: HashAlgorithm,
60    pub network: Arc<NetworkSender>,
61}
62
63#[derive(Debug, Clone)]
64pub enum ApprLightMessage {
65    EndRetry,
66    // Lanza los retries y envía la petición a la network(exponencial)
67    NetworkApproval {
68        approval_req: Signed<ApprovalReq>,
69    },
70    // Finaliza los retries y recibe la respuesta de la network
71    NetworkResponse {
72        approval_res: Signed<ApprovalRes>,
73        request_id: String,
74        version: u64,
75        sender: PublicKey,
76    },
77}
78
79impl Message for ApprLightMessage {}
80
81#[async_trait]
82impl Actor for ApprLight {
83    type Event = ();
84    type Message = ApprLightMessage;
85    type Response = ();
86
87    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
88        parent_span.map_or_else(
89            || info_span!("ApprLight", id),
90            |parent_span| info_span!(parent: parent_span, "ApprLight", id),
91        )
92    }
93}
94
95#[async_trait]
96impl Handler<Self> for ApprLight {
97    async fn handle_message(
98        &mut self,
99        _sender: ActorPath,
100        msg: ApprLightMessage,
101        ctx: &mut ActorContext<Self>,
102    ) -> Result<(), ActorError> {
103        match msg {
104            ApprLightMessage::EndRetry => {
105                debug!(
106                    node_key = %self.node_key,
107                    "Retry exhausted, notifying parent and stopping"
108                );
109
110                match ctx.get_parent::<Approval>().await {
111                    Ok(approval_actor) => {
112                        if let Err(e) = approval_actor
113                            .tell(ApprovalMessage::Response {
114                                approval_res: ApprovalRes::TimeOut(TimeOut {
115                                    re_trys: 3,
116                                    timestamp: TimeStamp::now(),
117                                    who: self.node_key.clone(),
118                                }),
119                                sender: self.node_key.clone(),
120                                signature: None,
121                            })
122                            .await
123                        {
124                            error!(
125                                error = %e,
126                                "Failed to send timeout response to approval actor"
127                            );
128                            emit_fail(ctx, e).await;
129                        }
130
131                        debug!(
132                            request_id = %self.request_id,
133                            version = self.version,
134                            node_key = %self.node_key,
135                            "Timeout response sent to approval actor"
136                        );
137                    }
138                    Err(e) => {
139                        error!(
140                            error = %e,
141                            path = %ctx.path().parent(),
142                            "Approval actor not found"
143                        );
144                        emit_fail(ctx, e).await;
145                    }
146                }
147
148                ctx.stop(None).await;
149            }
150            ApprLightMessage::NetworkApproval { approval_req } => {
151                // Solo admitimos eventos FACT
152                let subject_id = approval_req.content().subject_id.clone();
153
154                let receiver_actor =
155                    format!("/user/node/{}/approver", subject_id);
156
157                let message = NetworkMessage {
158                    info: ComunicateInfo {
159                        request_id: self.request_id.to_string(),
160                        version: self.version,
161                        receiver: self.node_key.clone(),
162                        receiver_actor,
163                    },
164                    message: ActorMessage::ApprovalReq { req: approval_req },
165                };
166
167                let target = RetryNetwork::new(self.network.clone());
168
169                let strategy = Strategy::CustomIntervalStrategy(
170                    CustomIntervalStrategy::new(VecDeque::from([
171                        Duration::from_hours(4),
172                        Duration::from_hours(8),
173                        Duration::from_hours(16),
174                    ])),
175                );
176
177                let retry_actor = RetryActor::new_with_parent_message::<Self>(
178                    target,
179                    message,
180                    strategy,
181                    ApprLightMessage::EndRetry,
182                );
183
184                let retry = match ctx
185                    .create_child::<RetryActor<RetryNetwork>, _>(
186                        "retry",
187                        retry_actor,
188                    )
189                    .await
190                {
191                    Ok(retry) => retry,
192                    Err(e) => {
193                        error!(
194                            msg_type = "NetworkApproval",
195                            error = %e,
196                            "Failed to create retry actor"
197                        );
198                        return Err(emit_fail(ctx, e).await);
199                    }
200                };
201
202                if let Err(e) = retry.tell(RetryMessage::Retry).await {
203                    error!(
204                        msg_type = "NetworkApproval",
205                        error = %e,
206                        "Failed to send retry message"
207                    );
208                    return Err(emit_fail(ctx, e).await);
209                };
210
211                debug!(
212                    msg_type = "NetworkApproval",
213                    request_id = %self.request_id,
214                    version = self.version,
215                    "Retry actor created and started"
216                );
217            }
218            // Finaliza los retries
219            ApprLightMessage::NetworkResponse {
220                approval_res,
221                request_id,
222                version,
223                sender,
224            } => {
225                if request_id == self.request_id.to_string()
226                    && version == self.version
227                {
228                    if self.node_key != sender
229                        || sender != approval_res.signature().signer
230                    {
231                        error!(
232                            msg_type = "NetworkResponse",
233                            expected_node = %self.node_key,
234                            received_sender = %sender,
235                            "Unexpected approval sender"
236                        );
237                        return Ok(());
238                    }
239
240                    if let Err(e) = approval_res.verify() {
241                        error!(
242                            msg_type = "NetworkResponse",
243                            error = %e,
244                            "Invalid approval signature"
245                        );
246                        return Ok(());
247                    }
248
249                    match ctx.get_parent::<Approval>().await {
250                        Ok(approval_actor) => {
251                            if let Err(e) = approval_actor
252                                .tell(ApprovalMessage::Response {
253                                    approval_res: approval_res
254                                        .content()
255                                        .clone(),
256                                    sender: self.node_key.clone(),
257                                    signature: Some(
258                                        approval_res.signature().clone(),
259                                    ),
260                                })
261                                .await
262                            {
263                                error!(
264                                    msg_type = "NetworkResponse",
265                                    error = %e,
266                                    "Failed to send response to approval actor"
267                                );
268                                return Err(emit_fail(ctx, e).await);
269                            }
270                        }
271                        Err(e) => {
272                            error!(
273                                msg_type = "NetworkResponse",
274                                path = %ctx.path().parent(),
275                                "Approval actor not found"
276                            );
277                            return Err(emit_fail(ctx, e).await);
278                        }
279                    };
280
281                    'retry: {
282                        let Ok(retry) = ctx
283                            .get_child::<RetryActor<RetryNetwork>>("retry")
284                            .await
285                        else {
286                            break 'retry;
287                        };
288
289                        if let Err(e) = retry.tell(RetryMessage::End).await {
290                            error!(
291                                msg_type = "NetworkResponse",
292                                error = %e,
293                                "Failed to send end message to retry actor"
294                            );
295                            break 'retry;
296                        };
297                    }
298
299                    debug!(
300                        msg_type = "NetworkResponse",
301                        request_id = %request_id,
302                        version = version,
303                        "Approval response processed successfully"
304                    );
305
306                    ctx.stop(None).await;
307                } else {
308                    warn!(
309                        msg_type = "NetworkResponse",
310                        expected_request_id = %self.request_id,
311                        received_request_id = %request_id,
312                        expected_version = self.version,
313                        received_version = version,
314                        "Mismatched request id or version"
315                    );
316                }
317            }
318        }
319        Ok(())
320    }
321
322    async fn on_child_fault(
323        &mut self,
324        error: ActorError,
325        ctx: &mut ActorContext<Self>,
326    ) -> ChildAction {
327        error!(
328            request_id = %self.request_id,
329            version = self.version,
330            node_key = %self.node_key,
331            error = %error,
332            "Child fault in approval light actor"
333        );
334        emit_fail(ctx, error).await;
335        ChildAction::Stop
336    }
337}
338
339impl NotPersistentActor for ApprLight {}