Skip to main content

ave_core/update/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use ave_actors::{
7    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
8    NotPersistentActor,
9};
10
11use async_trait::async_trait;
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use ave_network::ComunicateInfo;
14use serde::{Deserialize, Serialize};
15use tracing::{Span, debug, error, info_span, warn};
16use updater::{Updater, UpdaterMessage};
17
18use crate::{
19    NetworkMessage,
20    governance::witnesses_register::{
21        TrackerDeliveryMode, TrackerDeliveryRange,
22    },
23    helpers::network::{ActorMessage, service::NetworkSender},
24    model::common::{emit_fail, subject::get_local_subject_sn},
25    request::manager::{RequestManager, RequestManagerMessage},
26};
27
28pub mod updater;
29
30#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
31pub enum UpdateSubjectKind {
32    Governance,
33    Tracker,
34}
35
36#[derive(Debug, Serialize, Deserialize, Clone)]
37pub struct UpdateWitnessOffer {
38    pub kind: UpdateSubjectKind,
39    pub sn: u64,
40    pub clear_sn: Option<u64>,
41    pub ranges: Vec<TrackerDeliveryRange>,
42}
43
/// Outcome of starting an update round.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStartMode {
    /// A distribution request was sent straight to the only witness; no
    /// updater children were created.
    Direct,
    /// Updater children were created and at least one witness is pending.
    Sweep,
    /// No witness remained pending after trying to start the round.
    Empty,
}
50
51#[derive(Clone, Debug, Serialize, Deserialize)]
52pub enum UpdateType {
53    Auth,
54    Request {
55        subject_id: DigestIdentifier,
56        id: DigestIdentifier,
57    },
58}
59
60pub struct UpdateNew {
61    pub subject_id: DigestIdentifier,
62    pub witnesses: HashSet<PublicKey>,
63    pub update_type: UpdateType,
64    pub network: Arc<NetworkSender>,
65    pub our_sn: Option<u64>,
66    pub subject_kind_hint: Option<UpdateSubjectKind>,
67    pub round_retry_interval_secs: u64,
68    pub max_round_retries: usize,
69    pub witness_retry_count: usize,
70    pub witness_retry_interval_secs: u64,
71}
72
73#[derive(Clone, Debug)]
74pub struct Update {
75    subject_id: DigestIdentifier,
76    witnesses: HashSet<PublicKey>,
77    all_witnesses: HashSet<PublicKey>,
78    offers: HashMap<PublicKey, UpdateWitnessOffer>,
79    our_sn: Option<u64>,
80    update_type: UpdateType,
81    network: Arc<NetworkSender>,
82    retry_round: u64,
83    retry_token: u64,
84    retry_attempt: usize,
85    subject_kind_hint: Option<UpdateSubjectKind>,
86    round_retry_interval_secs: u64,
87    max_round_retries: usize,
88    witness_retry_count: usize,
89    witness_retry_interval_secs: u64,
90}
91
92impl Update {
93    pub fn new(data: UpdateNew) -> Self {
94        Self {
95            network: data.network,
96            subject_id: data.subject_id,
97            witnesses: data.witnesses.clone(),
98            all_witnesses: data.witnesses,
99            update_type: data.update_type,
100            our_sn: data.our_sn,
101            offers: HashMap::new(),
102            retry_round: 0,
103            retry_token: 0,
104            retry_attempt: 0,
105            subject_kind_hint: data.subject_kind_hint,
106            round_retry_interval_secs: data.round_retry_interval_secs,
107            max_round_retries: data.max_round_retries,
108            witness_retry_count: data.witness_retry_count,
109            witness_retry_interval_secs: data.witness_retry_interval_secs,
110        }
111    }
112
113    const fn should_retry_auth_rounds(&self) -> bool {
114        matches!(self.update_type, UpdateType::Auth)
115            && !matches!(
116                self.subject_kind_hint,
117                Some(UpdateSubjectKind::Governance)
118            )
119    }
120
121    fn has_progress(&self, sn: u64) -> bool {
122        self.our_sn.is_none_or(|our_sn| sn > our_sn)
123    }
124
125    fn next_needed_sn(&self) -> u64 {
126        self.our_sn.map_or(0, |sn| sn.saturating_add(1))
127    }
128
129    fn next_tracker_range<'a>(
130        &self,
131        ranges: &'a [TrackerDeliveryRange],
132    ) -> Option<&'a TrackerDeliveryRange> {
133        let next_sn = self.next_needed_sn();
134        ranges
135            .iter()
136            .find(|range| range.from_sn <= next_sn && next_sn <= range.to_sn)
137    }
138
139    const fn tracker_range_rank(mode: &TrackerDeliveryMode) -> u8 {
140        match mode {
141            TrackerDeliveryMode::Clear => 1,
142            TrackerDeliveryMode::Opaque => 0,
143        }
144    }
145
146    fn insert_offer(
147        &mut self,
148        sender: PublicKey,
149        offer: Option<UpdateWitnessOffer>,
150    ) {
151        if let Some(offer) = offer
152            && self.has_progress(offer.sn)
153        {
154            self.offers.insert(sender, offer);
155        }
156    }
157
158    fn select_tracker_offer(
159        &self,
160    ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
161        self.offers
162            .iter()
163            .filter(|(_, offer)| offer.kind == UpdateSubjectKind::Tracker)
164            .filter_map(|(sender, offer)| {
165                if !self.has_progress(offer.sn) {
166                    return None;
167                }
168
169                let range = self.next_tracker_range(&offer.ranges)?;
170                let target_sn = range.to_sn.min(offer.sn);
171                Some((
172                    sender.clone(),
173                    offer.clone(),
174                    target_sn,
175                    (
176                        Self::tracker_range_rank(&range.mode),
177                        target_sn,
178                        offer.sn,
179                    ),
180                ))
181            })
182            .max_by_key(|(.., rank)| *rank)
183            .map(|(sender, offer, target_sn, _)| (sender, offer, target_sn))
184    }
185
186    fn select_governance_offer(
187        &self,
188    ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
189        self.offers
190            .iter()
191            .filter(|(_, offer)| offer.kind == UpdateSubjectKind::Governance)
192            .filter(|(_, offer)| self.has_progress(offer.sn))
193            .max_by_key(|(_, offer)| offer.sn)
194            .map(|(sender, offer)| (sender.clone(), offer.clone(), offer.sn))
195    }
196
197    fn select_next_request(
198        &self,
199    ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
200        self.select_tracker_offer()
201            .or_else(|| self.select_governance_offer())
202    }
203
204    fn check_witness(&mut self, witness: PublicKey) -> bool {
205        self.witnesses.remove(&witness)
206    }
207
208    fn reset_round(&mut self) {
209        self.witnesses = self.all_witnesses.clone();
210        self.offers.clear();
211    }
212
213    async fn stop_active_updaters(
214        &self,
215        ctx: &ActorContext<Self>,
216    ) -> Result<(), ActorError> {
217        let children = ctx.system().children(ctx.path()).await;
218        for child_path in children {
219            let Ok(child) =
220                ctx.system().get_actor::<Updater>(&child_path).await
221            else {
222                continue;
223            };
224
225            if let Err(e) = child.ask_stop().await {
226                warn!(
227                    subject_id = %self.subject_id,
228                    child = %child_path,
229                    error = %e,
230                    "Failed to stop stale updater child before starting next round"
231                );
232            }
233        }
234
235        Ok(())
236    }
237
238    async fn request_distribution(
239        &self,
240        witness: PublicKey,
241        target_sn: Option<u64>,
242    ) -> Result<(), ActorError> {
243        let info = ComunicateInfo {
244            receiver: witness,
245            request_id: String::default(),
246            version: 0,
247            receiver_actor: format!(
248                "/user/node/distributor_{}",
249                self.subject_id
250            ),
251        };
252
253        self.network
254            .send_command(ave_network::CommandHelper::SendMessage {
255                message: NetworkMessage {
256                    info,
257                    message: ActorMessage::DistributionLedgerReq {
258                        actual_sn: self.our_sn,
259                        target_sn,
260                        subject_id: self.subject_id.clone(),
261                    },
262                },
263            })
264            .await
265    }
266
267    async fn create_updates(
268        &mut self,
269        ctx: &mut ActorContext<Self>,
270    ) -> Result<UpdateStartMode, ActorError> {
271        if self.all_witnesses.len() == 1 && self.our_sn.is_none() {
272            let Some(witness) = self.all_witnesses.iter().next() else {
273                return Ok(UpdateStartMode::Direct);
274            };
275
276            self.request_distribution(witness.clone(), None).await?;
277            return Ok(UpdateStartMode::Direct);
278        }
279
280        self.stop_active_updaters(ctx).await?;
281
282        for witness in self.witnesses.clone() {
283            let updater = Updater::new(
284                witness.clone(),
285                self.retry_round,
286                self.network.clone(),
287                self.witness_retry_count,
288                self.witness_retry_interval_secs,
289            );
290            let child_name = format!("{}_{}", witness, self.retry_round);
291            let child = ctx.create_child(&child_name, updater).await?;
292            let message = UpdaterMessage::NetworkLastSn {
293                subject_id: self.subject_id.clone(),
294                actual_sn: self.our_sn,
295            };
296
297            if let Err(e) = child.tell(message).await {
298                warn!(
299                    subject_id = %self.subject_id,
300                    witness = %witness,
301                    error = %e,
302                    "Updater child rejected round start message, skipping this witness in current round"
303                );
304                self.witnesses.remove(&witness);
305                continue;
306            }
307        }
308        if self.witnesses.is_empty() {
309            Ok(UpdateStartMode::Empty)
310        } else {
311            Ok(UpdateStartMode::Sweep)
312        }
313    }
314
315    async fn schedule_retry(
316        &mut self,
317        ctx: &ActorContext<Self>,
318        expected_target_sn: u64,
319        attempt: usize,
320    ) -> Result<(), ActorError> {
321        let actor = ctx.reference().await?;
322        let round = self.retry_round;
323        let token = self.retry_token.saturating_add(1);
324        self.retry_token = token;
325        let retry_interval_secs = self.round_retry_interval_secs.max(1);
326        tokio::spawn(async move {
327            tokio::time::sleep(std::time::Duration::from_secs(
328                retry_interval_secs,
329            ))
330            .await;
331            let _ = actor
332                .tell(UpdateMessage::RetryRound {
333                    expected_target_sn,
334                    round,
335                    attempt,
336                    token,
337                })
338                .await;
339        });
340
341        Ok(())
342    }
343}
344
345#[derive(Debug, Clone)]
346pub enum UpdateMessage {
347    Run,
348    Continue,
349    RetryRound {
350        expected_target_sn: u64,
351        round: u64,
352        attempt: usize,
353        token: u64,
354    },
355    Response {
356        sender: PublicKey,
357        offer: Option<UpdateWitnessOffer>,
358        round: u64,
359    },
360}
361
362impl Message for UpdateMessage {}
363
364#[async_trait]
365impl Actor for Update {
366    type Event = ();
367    type Message = UpdateMessage;
368    type Response = ();
369
370    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
371        parent_span.map_or_else(
372            || info_span!("Update", id),
373            |parent_span| info_span!(parent: parent_span, "Update", id),
374        )
375    }
376}
377
378impl NotPersistentActor for Update {}
379
380#[async_trait]
381impl Handler<Self> for Update {
382    async fn handle_message(
383        &mut self,
384        _sender: ActorPath,
385        msg: UpdateMessage,
386        ctx: &mut ActorContext<Self>,
387    ) -> Result<(), ActorError> {
388        match msg {
389            UpdateMessage::Run => {
390                self.retry_attempt = 0;
391                self.reset_round();
392                let start_mode = match self.create_updates(ctx).await {
393                    Ok(start_mode) => start_mode,
394                    Err(e) => {
395                        error!(
396                            msg_type = "Run",
397                            error = %e,
398                            "Failed to create updates"
399                        );
400                        return Err(emit_fail(ctx, e).await);
401                    }
402                };
403
404                match start_mode {
405                    UpdateStartMode::Direct | UpdateStartMode::Empty => {
406                        ctx.stop(None).await;
407                        return Ok(());
408                    }
409                    UpdateStartMode::Sweep => {}
410                }
411
412                debug!(
413                    msg_type = "Run",
414                    witnesses_count = self.witnesses.len(),
415                    "Updates created successfully"
416                );
417            }
418            UpdateMessage::Continue => {
419                let current_sn =
420                    get_local_subject_sn(ctx, &self.subject_id).await?;
421                self.our_sn = current_sn;
422                self.retry_round = self.retry_round.saturating_add(1);
423                self.retry_attempt = 0;
424                self.reset_round();
425
426                let start_mode = match self.create_updates(ctx).await {
427                    Ok(start_mode) => start_mode,
428                    Err(e) => {
429                        error!(
430                            msg_type = "Continue",
431                            subject_id = %self.subject_id,
432                            current_sn = ?current_sn,
433                            error = %e,
434                            "Failed to continue update round"
435                        );
436                        return Err(emit_fail(ctx, e).await);
437                    }
438                };
439
440                match start_mode {
441                    UpdateStartMode::Direct | UpdateStartMode::Empty => {
442                        ctx.stop(None).await;
443                        return Ok(());
444                    }
445                    UpdateStartMode::Sweep => {}
446                }
447            }
448            UpdateMessage::RetryRound {
449                expected_target_sn,
450                round,
451                attempt,
452                token,
453            } => {
454                if round != self.retry_round || token != self.retry_token {
455                    return Ok(());
456                }
457
458                let current_sn =
459                    get_local_subject_sn(ctx, &self.subject_id).await?;
460                if current_sn.is_some_and(|sn| sn >= expected_target_sn) {
461                    debug!(
462                        msg_type = "RetryRound",
463                        subject_id = %self.subject_id,
464                        current_sn = ?current_sn,
465                        expected_target_sn = expected_target_sn,
466                        "Update target already reached before retry round restart"
467                    );
468                    ctx.stop(None).await;
469                    return Ok(());
470                }
471
472                if attempt >= self.max_round_retries {
473                    warn!(
474                        msg_type = "RetryRound",
475                        subject_id = %self.subject_id,
476                        current_sn = ?current_sn,
477                        expected_target_sn = expected_target_sn,
478                        attempt = attempt,
479                        "Update retry round exhausted before reaching target"
480                    );
481                    ctx.stop(None).await;
482                    return Ok(());
483                }
484
485                self.our_sn = current_sn;
486                self.retry_round = self.retry_round.saturating_add(1);
487                self.retry_attempt = attempt.saturating_add(1);
488                self.reset_round();
489
490                let start_mode = match self.create_updates(ctx).await {
491                    Ok(start_mode) => start_mode,
492                    Err(e) => {
493                        error!(
494                            msg_type = "RetryRound",
495                            subject_id = %self.subject_id,
496                            current_sn = ?current_sn,
497                            expected_target_sn = expected_target_sn,
498                            error = %e,
499                            "Failed to restart update round"
500                        );
501                        return Err(emit_fail(ctx, e).await);
502                    }
503                };
504
505                match start_mode {
506                    UpdateStartMode::Direct | UpdateStartMode::Empty => {
507                        ctx.stop(None).await;
508                        return Ok(());
509                    }
510                    UpdateStartMode::Sweep => {}
511                }
512            }
513            UpdateMessage::Response {
514                sender,
515                offer,
516                round,
517            } => {
518                if round != self.retry_round {
519                    return Ok(());
520                }
521
522                if self.check_witness(sender.clone()) {
523                    self.insert_offer(sender, offer);
524
525                    if self.witnesses.is_empty() {
526                        let selected_request = self.select_next_request();
527                        let mut keep_running = false;
528
529                        if let Some((better_node, offer, target_sn)) =
530                            selected_request.clone()
531                        {
532                            let expected_target_sn = self
533                                .offers
534                                .values()
535                                .filter(|offer| {
536                                    offer.kind == UpdateSubjectKind::Tracker
537                                })
538                                .map(|offer| offer.sn)
539                                .max()
540                                .unwrap_or(target_sn);
541
542                            if let Err(e) = self
543                                .request_distribution(
544                                    better_node.clone(),
545                                    Some(target_sn),
546                                )
547                                .await
548                            {
549                                error!(
550                                    msg_type = "Response",
551                                    error = %e,
552                                    node = %better_node,
553                                    "Failed to send request to network"
554                                );
555                                return Err(emit_fail(ctx, e).await);
556                            } else {
557                                debug!(
558                                    msg_type = "Response",
559                                    node = %better_node,
560                                    subject_id = %self.subject_id,
561                                    offer_kind = ?offer.kind,
562                                    offer_sn = offer.sn,
563                                    offer_clear_sn = ?offer.clear_sn,
564                                    target_sn = target_sn,
565                                    "Request sent to better node"
566                                );
567                            }
568
569                            if matches!(offer.kind, UpdateSubjectKind::Tracker)
570                                && self.should_retry_auth_rounds()
571                            {
572                                keep_running = true;
573                                if let Err(e) = self
574                                    .schedule_retry(
575                                        ctx,
576                                        expected_target_sn,
577                                        self.retry_attempt,
578                                    )
579                                    .await
580                                {
581                                    error!(
582                                        msg_type = "Response",
583                                        subject_id = %self.subject_id,
584                                        expected_target_sn = expected_target_sn,
585                                        error = %e,
586                                        "Failed to schedule update retry"
587                                    );
588                                    return Err(emit_fail(ctx, e).await);
589                                }
590                            }
591                        }
592
593                        if let UpdateType::Request { id, subject_id } =
594                            &self.update_type
595                        {
596                            let request_path = ActorPath::from(format!(
597                                "/user/request/{}",
598                                subject_id
599                            ));
600                            match ctx
601                                .system()
602                                .get_actor::<RequestManager>(&request_path)
603                                .await
604                            {
605                                Ok(request_actor) => {
606                                    let request = if self.offers.is_empty() {
607                                        RequestManagerMessage::FinishReboot {
608                                            request_id: id.clone(),
609                                        }
610                                    } else {
611                                        RequestManagerMessage::RebootWait {
612                                            request_id: id.clone(),
613                                            governance_id: self
614                                                .subject_id
615                                                .clone(),
616                                        }
617                                    };
618
619                                    if let Err(e) =
620                                        request_actor.tell(request).await
621                                    {
622                                        error!(
623                                            msg_type = "Response",
624                                            error = %e,
625                                            subject_id = %self.subject_id,
626                                            "Failed to send response to request actor"
627                                        );
628                                        return Err(emit_fail(ctx, e).await);
629                                    }
630                                }
631                                Err(e) => {
632                                    error!(
633                                        msg_type = "Response",
634                                        error = %e,
635                                        path = %request_path,
636                                        subject_id = %self.subject_id,
637                                        "Request actor not found"
638                                    );
639                                    return Err(emit_fail(ctx, e).await);
640                                }
641                            };
642                        };
643
644                        debug!(
645                            msg_type = "Response",
646                            subject_id = %self.subject_id,
647                            has_better = selected_request.is_some(),
648                            "All witnesses responded, update complete"
649                        );
650
651                        if !self.should_retry_auth_rounds() || !keep_running {
652                            ctx.stop(None).await;
653                        }
654                    }
655                } else {
656                    warn!(
657                        msg_type = "Response",
658                        subject_id = %self.subject_id,
659                        sender = %sender,
660                        has_offer = self.offers.contains_key(&sender),
661                        "Ignoring response from unexpected or already-processed witness"
662                    );
663                }
664            }
665        };
666
667        Ok(())
668    }
669
670    async fn on_child_fault(
671        &mut self,
672        error: ActorError,
673        ctx: &mut ActorContext<Self>,
674    ) -> ChildAction {
675        error!(
676            subject_id = %self.subject_id,
677            update_type = ?self.update_type,
678            error = %error,
679            "Child fault in update actor"
680        );
681        emit_fail(ctx, error).await;
682        ChildAction::Stop
683    }
684}