Skip to main content

ave_core/update/
updater.rs

1use std::{sync::Arc, time::Duration};
2
3use ave_actors::{
4    Actor, ActorContext, ActorError, ActorPath, ChildAction,
5    FixedIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
6    RetryMessage, Strategy,
7};
8
9use async_trait::async_trait;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use network::ComunicateInfo;
12use tracing::{Span, debug, error, info_span, warn};
13
14use crate::{
15    ActorMessage,
16    helpers::network::{NetworkMessage, service::NetworkSender},
17    model::{common::emit_fail, network::RetryNetwork},
18};
19
20use super::{Update, UpdateMessage};
21
22#[derive(Clone, Debug)]
23pub struct Updater {
24    network: Arc<NetworkSender>,
25    node_key: PublicKey,
26}
27
28impl Updater {
29    pub const fn new(node_key: PublicKey, network: Arc<NetworkSender>) -> Self {
30        Self { node_key, network }
31    }
32}
33
34#[derive(Debug, Clone)]
35pub enum UpdaterMessage {
36    EndRetry,
37    NetworkLastSn {
38        subject_id: DigestIdentifier,
39        node_key: PublicKey,
40    },
41    NetworkResponse {
42        sn: u64,
43        sender: PublicKey,
44    },
45}
46
47impl Message for UpdaterMessage {}
48
49impl NotPersistentActor for Updater {}
50
51#[async_trait]
52impl Actor for Updater {
53    type Event = ();
54    type Message = UpdaterMessage;
55    type Response = ();
56
57    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
58        parent_span.map_or_else(
59            || info_span!("Updater", id),
60            |parent_span| info_span!(parent: parent_span, "Updater", id),
61        )
62    }
63}
64
65#[async_trait]
66impl Handler<Self> for Updater {
67    async fn handle_message(
68        &mut self,
69        _sender: ActorPath,
70        msg: UpdaterMessage,
71        ctx: &mut ActorContext<Self>,
72    ) -> Result<(), ActorError> {
73        match msg {
74            UpdaterMessage::EndRetry => {
75                debug!(
76                    node_key = %self.node_key,
77                    "Retry exhausted, notifying parent and stopping"
78                );
79
80                match ctx.get_parent::<Update>().await {
81                    Ok(update_actor) => {
82                        if let Err(e) = update_actor
83                            .tell(UpdateMessage::Response {
84                                sender: self.node_key.clone(),
85                                sn: 0,
86                            })
87                            .await
88                        {
89                            error!(
90                                error = %e,
91                                "Failed to send timeout response to update actor"
92                            );
93                            emit_fail(ctx, e).await;
94                        } else {
95                            debug!(
96                                node = %self.node_key,
97                                "Timeout response sent to update actor"
98                            );
99                        }
100                    }
101                    Err(e) => {
102                        error!(
103                            path = %ctx.path().parent(),
104                            "Update actor not found"
105                        );
106                        emit_fail(ctx, e).await;
107                    }
108                };
109
110                ctx.stop(None).await;
111            }
112            UpdaterMessage::NetworkLastSn {
113                subject_id,
114                node_key,
115            } => {
116                let message = NetworkMessage {
117                    info: ComunicateInfo {
118                        request_id: String::default(),
119                        version: 0,
120                        receiver: node_key.clone(),
121                        receiver_actor: format!(
122                            "/user/node/distributor_{}",
123                            subject_id
124                        ),
125                    },
126                    message: ActorMessage::DistributionGetLastSn {
127                        subject_id: subject_id.clone(),
128                        receiver_actor: ctx.path().to_string(),
129                    },
130                };
131
132                let target = RetryNetwork::new(self.network.clone());
133
134                let strategy = Strategy::FixedInterval(
135                    FixedIntervalStrategy::new(1, Duration::from_secs(10)),
136                );
137
138                let retry_actor = RetryActor::new_with_parent_message::<Self>(
139                    target,
140                    message,
141                    strategy,
142                    UpdaterMessage::EndRetry,
143                );
144
145                let retry = match ctx
146                    .create_child::<RetryActor<RetryNetwork>, _>(
147                        "retry",
148                        retry_actor,
149                    )
150                    .await
151                {
152                    Ok(retry) => retry,
153                    Err(e) => {
154                        error!(
155                            msg_type = "NetworkLastSn",
156                            error = %e,
157                            "Failed to create retry actor"
158                        );
159                        return Err(emit_fail(ctx, e).await);
160                    }
161                };
162
163                if let Err(e) = retry.tell(RetryMessage::Retry).await {
164                    error!(
165                        msg_type = "NetworkLastSn",
166                        error = %e,
167                        "Failed to send retry message to retry actor"
168                    );
169                    return Err(emit_fail(ctx, e).await);
170                } else {
171                    debug!(
172                        msg_type = "NetworkLastSn",
173                        subject_id = %subject_id,
174                        node_key = %node_key,
175                        "Last SN request sent to network with retry"
176                    );
177                };
178            }
179            UpdaterMessage::NetworkResponse { sn, sender } => {
180                if sender == self.node_key {
181                    match ctx.get_parent::<Update>().await {
182                        Ok(update_actor) => {
183                            if let Err(e) = update_actor
184                                .tell(UpdateMessage::Response {
185                                    sender: self.node_key.clone(),
186                                    sn,
187                                })
188                                .await
189                            {
190                                error!(
191                                    msg_type = "NetworkResponse",
192                                    error = %e,
193                                    "Failed to send response to update actor"
194                                );
195                                return Err(emit_fail(ctx, e).await);
196                            }
197                        }
198                        Err(e) => {
199                            error!(
200                                msg_type = "NetworkResponse",
201                                path = %ctx.path().parent(),
202                                "Update actor not found"
203                            );
204                            return Err(emit_fail(ctx, e).await);
205                        }
206                    };
207
208                    'retry: {
209                        let Ok(retry) = ctx
210                            .get_child::<RetryActor<RetryNetwork>>("retry")
211                            .await
212                        else {
213                            // Aquí me da igual, porque al parar este actor para el hijo
214                            break 'retry;
215                        };
216
217                        if let Err(e) = retry.tell(RetryMessage::End).await {
218                            warn!(
219                                msg_type = "NetworkResponse",
220                                error = %e,
221                                "Failed to end retry actor"
222                            );
223                            // Aquí me da igual, porque al parar este actor para el hijo
224                            break 'retry;
225                        };
226                    }
227
228                    debug!(
229                        msg_type = "NetworkResponse",
230                        sn = sn,
231                        sender = %sender,
232                        "Network response processed successfully"
233                    );
234
235                    ctx.stop(None).await;
236                }
237            }
238        };
239
240        Ok(())
241    }
242
243    async fn on_child_fault(
244        &mut self,
245        error: ActorError,
246        ctx: &mut ActorContext<Self>,
247    ) -> ChildAction {
248        error!(
249            node = %self.node_key,
250            error = %error,
251            "Child fault in updater actor"
252        );
253        emit_fail(ctx, error).await;
254        ChildAction::Stop
255    }
256}