Skip to main content

ave_core/update/
updater.rs

1use std::{sync::Arc, time::Duration};
2
3use ave_actors::{
4    Actor, ActorContext, ActorError, ActorPath, ChildAction,
5    FixedIntervalStrategy, Handler, Message, NotPersistentActor, RetryActor,
6    RetryMessage, Strategy,
7};
8
9use async_trait::async_trait;
10use ave_common::identity::{DigestIdentifier, PublicKey};
11use ave_network::ComunicateInfo;
12use tracing::{Span, debug, error, info_span, warn};
13
14use crate::{
15    ActorMessage,
16    helpers::network::{NetworkMessage, service::NetworkSender},
17    model::{common::emit_fail, network::RetryNetwork},
18    update::UpdateWitnessOffer,
19};
20
21use super::{Update, UpdateMessage};
22
23#[derive(Clone, Debug)]
24pub struct Updater {
25    network: Arc<NetworkSender>,
26    node_key: PublicKey,
27    round: u64,
28    witness_retry_count: usize,
29    witness_retry_interval_secs: u64,
30}
31
32impl Updater {
33    pub const fn new(
34        node_key: PublicKey,
35        round: u64,
36        network: Arc<NetworkSender>,
37        witness_retry_count: usize,
38        witness_retry_interval_secs: u64,
39    ) -> Self {
40        Self {
41            node_key,
42            network,
43            round,
44            witness_retry_count,
45            witness_retry_interval_secs,
46        }
47    }
48}
49
50#[derive(Debug, Clone)]
51pub enum UpdaterMessage {
52    EndRetry,
53    NetworkLastSn {
54        subject_id: DigestIdentifier,
55        actual_sn: Option<u64>,
56    },
57    NetworkNoOffer {
58        sender: PublicKey,
59    },
60    NetworkResponse {
61        offer: UpdateWitnessOffer,
62        sender: PublicKey,
63    },
64}
65
66impl Message for UpdaterMessage {}
67
68impl NotPersistentActor for Updater {}
69
70#[async_trait]
71impl Actor for Updater {
72    type Event = ();
73    type Message = UpdaterMessage;
74    type Response = ();
75
76    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
77        parent_span.map_or_else(
78            || info_span!("Updater", id),
79            |parent_span| info_span!(parent: parent_span, "Updater", id),
80        )
81    }
82}
83
84#[async_trait]
85impl Handler<Self> for Updater {
86    async fn handle_message(
87        &mut self,
88        _sender: ActorPath,
89        msg: UpdaterMessage,
90        ctx: &mut ActorContext<Self>,
91    ) -> Result<(), ActorError> {
92        match msg {
93            UpdaterMessage::EndRetry => {
94                warn!(
95                    node_key = %self.node_key,
96                    "Retry exhausted, notifying parent and stopping"
97                );
98
99                match ctx.get_parent::<Update>().await {
100                    Ok(update_actor) => {
101                        if let Err(e) = update_actor
102                            .tell(UpdateMessage::Response {
103                                sender: self.node_key.clone(),
104                                offer: None,
105                                round: self.round,
106                            })
107                            .await
108                        {
109                            error!(
110                                error = %e,
111                                "Failed to send timeout response to update actor"
112                            );
113                            emit_fail(ctx, e).await;
114                        } else {
115                            debug!(
116                                node = %self.node_key,
117                                "Timeout response sent to update actor"
118                            );
119                        }
120                    }
121                    Err(e) => {
122                        error!(
123                            error = %e,
124                            path = %ctx.path().parent(),
125                            "Update actor not found"
126                        );
127                        emit_fail(ctx, e).await;
128                    }
129                };
130
131                ctx.stop(None).await;
132            }
133            UpdaterMessage::NetworkLastSn {
134                subject_id,
135                actual_sn,
136            } => {
137                let message = NetworkMessage {
138                    info: ComunicateInfo {
139                        request_id: String::default(),
140                        version: 0,
141                        receiver: self.node_key.clone(),
142                        receiver_actor: format!(
143                            "/user/node/distributor_{}",
144                            subject_id
145                        ),
146                    },
147                    message: ActorMessage::DistributionGetLastSn {
148                        subject_id: subject_id.clone(),
149                        actual_sn,
150                        receiver_actor: ctx.path().to_string(),
151                    },
152                };
153
154                let target = RetryNetwork::new(self.network.clone());
155
156                let strategy =
157                    Strategy::FixedInterval(FixedIntervalStrategy::new(
158                        self.witness_retry_count.max(1),
159                        Duration::from_secs(
160                            self.witness_retry_interval_secs.max(1),
161                        ),
162                    ));
163
164                let retry_actor = RetryActor::new_with_parent_message::<Self>(
165                    target,
166                    message,
167                    strategy,
168                    UpdaterMessage::EndRetry,
169                );
170
171                let retry = match ctx
172                    .create_child::<RetryActor<RetryNetwork>, _>(
173                        "retry",
174                        retry_actor,
175                    )
176                    .await
177                {
178                    Ok(retry) => retry,
179                    Err(e) => {
180                        error!(
181                            msg_type = "NetworkLastSn",
182                            error = %e,
183                            "Failed to create retry actor"
184                        );
185                        return Err(emit_fail(ctx, e).await);
186                    }
187                };
188
189                if let Err(e) = retry.tell(RetryMessage::Retry).await {
190                    error!(
191                        msg_type = "NetworkLastSn",
192                        error = %e,
193                        "Failed to send retry message to retry actor"
194                    );
195                    return Err(emit_fail(ctx, e).await);
196                } else {
197                    debug!(
198                        msg_type = "NetworkLastSn",
199                        subject_id = %subject_id,
200                        node_key = %self.node_key,
201                        "Last SN request sent to network with retry"
202                    );
203                };
204            }
205            UpdaterMessage::NetworkResponse { offer, sender } => {
206                if sender != self.node_key {
207                    warn!(
208                        msg_type = "NetworkResponse",
209                        expected_node = %self.node_key,
210                        sender = %sender,
211                        "Ignoring update response from unexpected sender"
212                    );
213                    return Ok(());
214                }
215
216                match ctx.get_parent::<Update>().await {
217                    Ok(update_actor) => {
218                        if let Err(e) = update_actor
219                            .tell(UpdateMessage::Response {
220                                sender: self.node_key.clone(),
221                                offer: Some(offer.clone()),
222                                round: self.round,
223                            })
224                            .await
225                        {
226                            error!(
227                                msg_type = "NetworkResponse",
228                                error = %e,
229                                "Failed to send response to update actor"
230                            );
231                            return Err(emit_fail(ctx, e).await);
232                        }
233                    }
234                    Err(e) => {
235                        error!(
236                            msg_type = "NetworkResponse",
237                            error = %e,
238                            path = %ctx.path().parent(),
239                            "Update actor not found"
240                        );
241                        return Err(emit_fail(ctx, e).await);
242                    }
243                };
244
245                'retry: {
246                    let Ok(retry) = ctx
247                        .get_child::<RetryActor<RetryNetwork>>("retry")
248                        .await
249                    else {
250                        debug!(
251                            msg_type = "NetworkResponse",
252                            sender = %sender,
253                            "Retry actor not found while closing updater"
254                        );
255                        // Aquí me da igual, porque al parar este actor para el hijo
256                        break 'retry;
257                    };
258
259                    if let Err(e) = retry.tell(RetryMessage::End).await {
260                        warn!(
261                            msg_type = "NetworkResponse",
262                            error = %e,
263                            "Failed to end retry actor"
264                        );
265                        // Aquí me da igual, porque al parar este actor para el hijo
266                        break 'retry;
267                    };
268                }
269
270                debug!(
271                    msg_type = "NetworkResponse",
272                    sn = offer.sn,
273                    sender = %sender,
274                    "Network response processed successfully"
275                );
276
277                ctx.stop(None).await;
278            }
279            UpdaterMessage::NetworkNoOffer { sender } => {
280                if sender != self.node_key {
281                    warn!(
282                        msg_type = "NetworkNoOffer",
283                        expected_node = %self.node_key,
284                        sender = %sender,
285                        "Ignoring empty update response from unexpected sender"
286                    );
287                    return Ok(());
288                }
289
290                match ctx.get_parent::<Update>().await {
291                    Ok(update_actor) => {
292                        if let Err(e) = update_actor
293                            .tell(UpdateMessage::Response {
294                                sender: self.node_key.clone(),
295                                offer: None,
296                                round: self.round,
297                            })
298                            .await
299                        {
300                            error!(
301                                msg_type = "NetworkNoOffer",
302                                error = %e,
303                                "Failed to send empty response to update actor"
304                            );
305                            return Err(emit_fail(ctx, e).await);
306                        }
307                    }
308                    Err(e) => {
309                        error!(
310                            msg_type = "NetworkNoOffer",
311                            error = %e,
312                            path = %ctx.path().parent(),
313                            "Update actor not found"
314                        );
315                        return Err(emit_fail(ctx, e).await);
316                    }
317                };
318
319                'retry: {
320                    let Ok(retry) = ctx
321                        .get_child::<RetryActor<RetryNetwork>>("retry")
322                        .await
323                    else {
324                        debug!(
325                            msg_type = "NetworkNoOffer",
326                            sender = %sender,
327                            "Retry actor not found while closing updater"
328                        );
329                        break 'retry;
330                    };
331
332                    if let Err(e) = retry.tell(RetryMessage::End).await {
333                        warn!(
334                            msg_type = "NetworkNoOffer",
335                            error = %e,
336                            "Failed to end retry actor"
337                        );
338                        break 'retry;
339                    };
340                }
341
342                debug!(
343                    msg_type = "NetworkNoOffer",
344                    sender = %sender,
345                    "Empty network response processed successfully"
346                );
347
348                ctx.stop(None).await;
349            }
350        };
351
352        Ok(())
353    }
354
355    async fn on_child_fault(
356        &mut self,
357        error: ActorError,
358        ctx: &mut ActorContext<Self>,
359    ) -> ChildAction {
360        error!(
361            node = %self.node_key,
362            error = %error,
363            "Child fault in updater actor"
364        );
365        emit_fail(ctx, error).await;
366        ChildAction::Stop
367    }
368}