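//! `ave_core/governance/tracker_sync.rs`
//!
//! Actor that pages through a peer's list of current witness subjects and
//! requests ledger updates for the trackers that are behind locally.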

1use std::{
2    collections::{HashSet, VecDeque},
3    sync::Arc,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use ave_actors::{
9    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
10    NotPersistentActor, Response,
11};
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use network::ComunicateInfo;
14use rand::seq::IteratorRandom;
15use tracing::{Span, debug, info_span, warn};
16
17use crate::auth::{Auth, AuthMessage, AuthResponse};
18use crate::governance::witnesses_register::{
19    CurrentWitnessSubject, WitnessesRegister, WitnessesRegisterMessage,
20    WitnessesRegisterResponse,
21};
22use crate::governance::{
23    Governance, GovernanceMessage, GovernanceResponse, model::WitnessesData,
24};
25use crate::helpers::network::{
26    ActorMessage, NetworkMessage, service::NetworkSender,
27};
28use crate::metrics::try_core_metrics;
29use crate::model::common::node::get_subject_data;
30use crate::model::common::subject::acquire_subject;
31use crate::node::SubjectData;
32use crate::tracker::{Tracker, TrackerMessage, TrackerResponse};
33
34#[derive(Debug, Clone)]
35pub enum TrackerSyncMessage {
36    Tick,
37    FetchTimeout { request_nonce: u64 },
38    UpdateTimeout { batch_nonce: u64 },
39    NetworkRequest(TrackerSyncNetworkRequest),
40    NetworkResponse(TrackerSyncNetworkResponse),
41}
42
43impl Message for TrackerSyncMessage {}
44
45#[derive(Debug, Clone)]
46pub enum TrackerSyncResponse {
47    None,
48}
49
50impl Response for TrackerSyncResponse {}
51
52#[derive(Debug, Clone)]
53struct FetchState {
54    peer: PublicKey,
55    governance_version: u64,
56    request_nonce: u64,
57}
58
59#[derive(Debug, Clone)]
60struct UpdateState {
61    peer: PublicKey,
62    governance_version: u64,
63    pending_items: VecDeque<CurrentWitnessSubject>,
64    next_cursor: Option<DigestIdentifier>,
65    active_batch: Vec<ActiveUpdate>,
66    batch_nonce: u64,
67}
68
69#[derive(Debug, Clone)]
70struct ActiveUpdate {
71    item: CurrentWitnessSubject,
72    last_seen_sn: Option<u64>,
73    stalled_checks: u8,
74}
75
76#[derive(Debug, Clone, Default)]
77enum SyncState {
78    #[default]
79    Idle,
80    Fetching(FetchState),
81    Updating(UpdateState),
82}
83
/// Tunables consumed by [`TrackerSync::new`].
#[derive(Clone, Debug)]
pub struct TrackerSyncConfig {
    /// When `false`, ticks are neither scheduled nor acted on.
    pub service: bool,
    /// Delay between the end of one round and the next `Tick`.
    pub tick_interval: Duration,
    /// Delay after which an unanswered page request is abandoned.
    pub response_timeout: Duration,
    /// `limit` sent with each page request (raised to 1 if given as 0).
    pub page_size: usize,
    /// Maximum number of updates launched per batch (raised to 1 if 0).
    pub update_batch_size: usize,
    /// Delay between progress checks of a running update batch.
    pub update_timeout: Duration,
}
93
94#[derive(Debug, Clone)]
95pub struct TrackerSyncNetworkRequest {
96    pub request_nonce: u64,
97    pub governance_version: u64,
98    pub after_subject_id: Option<DigestIdentifier>,
99    pub limit: usize,
100    pub info: ComunicateInfo,
101    pub sender: PublicKey,
102    pub receiver_actor: String,
103}
104
105#[derive(Debug, Clone)]
106pub struct TrackerSyncNetworkResponse {
107    pub peer: PublicKey,
108    pub request_nonce: u64,
109    pub governance_version: u64,
110    pub items: Vec<CurrentWitnessSubject>,
111    pub next_cursor: Option<DigestIdentifier>,
112}
113
114pub struct TrackerSync {
115    governance_id: DigestIdentifier,
116    our_key: Arc<PublicKey>,
117    network: Arc<NetworkSender>,
118    service: bool,
119    tick_interval: Duration,
120    response_timeout: Duration,
121    page_size: usize,
122    update_batch_size: usize,
123    update_timeout: Duration,
124    next_nonce: u64,
125    state: SyncState,
126}
127
/// Number of consecutive progress checks without a change in the local
/// sequence number after which an update is reported as stalled and dropped.
const MAX_STALLED_UPDATE_CHECKS: u8 = 3;
129
130impl TrackerSync {
131    fn observe_round(result: &'static str) {
132        if let Some(metrics) = try_core_metrics() {
133            metrics.observe_tracker_sync_round(result);
134        }
135    }
136
137    fn observe_update(result: &'static str) {
138        if let Some(metrics) = try_core_metrics() {
139            metrics.observe_tracker_sync_update(result);
140        }
141    }
142
143    pub fn new(
144        governance_id: DigestIdentifier,
145        our_key: Arc<PublicKey>,
146        network: Arc<NetworkSender>,
147        config: TrackerSyncConfig,
148    ) -> Self {
149        Self {
150            governance_id,
151            our_key,
152            network,
153            service: config.service,
154            tick_interval: config.tick_interval,
155            response_timeout: config.response_timeout,
156            page_size: config.page_size.max(1),
157            update_batch_size: config.update_batch_size.max(1),
158            update_timeout: config.update_timeout,
159            next_nonce: 0,
160            state: SyncState::Idle,
161        }
162    }
163
164    const fn allocate_nonce(&mut self) -> u64 {
165        self.next_nonce += 1;
166        self.next_nonce
167    }
168
169    async fn schedule_tick(
170        &self,
171        ctx: &ActorContext<Self>,
172    ) -> Result<(), ActorError> {
173        if !self.service {
174            return Ok(());
175        }
176
177        let actor = ctx.reference().await?;
178        let delay = self.tick_interval;
179        tokio::spawn(async move {
180            tokio::time::sleep(delay).await;
181            let _ = actor.tell(TrackerSyncMessage::Tick).await;
182        });
183        Ok(())
184    }
185
186    async fn schedule_fetch_timeout(
187        &self,
188        ctx: &ActorContext<Self>,
189        request_nonce: u64,
190    ) -> Result<(), ActorError> {
191        let actor = ctx.reference().await?;
192        let delay = self.response_timeout;
193        tokio::spawn(async move {
194            tokio::time::sleep(delay).await;
195            let _ = actor
196                .tell(TrackerSyncMessage::FetchTimeout { request_nonce })
197                .await;
198        });
199        Ok(())
200    }
201
202    async fn schedule_update_timeout(
203        &self,
204        ctx: &ActorContext<Self>,
205        batch_nonce: u64,
206    ) -> Result<(), ActorError> {
207        let actor = ctx.reference().await?;
208        let delay = self.update_timeout;
209        tokio::spawn(async move {
210            tokio::time::sleep(delay).await;
211            let _ = actor
212                .tell(TrackerSyncMessage::UpdateTimeout { batch_nonce })
213                .await;
214        });
215        Ok(())
216    }
217
218    async fn get_governance_version(
219        &self,
220        ctx: &ActorContext<Self>,
221    ) -> Result<u64, ActorError> {
222        let path = ActorPath::from(format!(
223            "/user/node/subject_manager/{}",
224            self.governance_id
225        ));
226        let actor = ctx.system().get_actor::<Governance>(&path).await?;
227        let response = actor.ask(GovernanceMessage::GetVersion).await?;
228
229        match response {
230            GovernanceResponse::Version(version) => Ok(version),
231            _ => Err(ActorError::UnexpectedResponse {
232                path,
233                expected: "GovernanceResponse::Version".to_owned(),
234            }),
235        }
236    }
237
238    async fn get_governance_peers(
239        &self,
240        ctx: &ActorContext<Self>,
241    ) -> Result<HashSet<PublicKey>, ActorError> {
242        let path = ActorPath::from(format!(
243            "/user/node/subject_manager/{}",
244            self.governance_id
245        ));
246        let actor = ctx.system().get_actor::<Governance>(&path).await?;
247        let response = actor.ask(GovernanceMessage::GetGovernance).await?;
248
249        let GovernanceResponse::Governance(governance) = response else {
250            return Err(ActorError::UnexpectedResponse {
251                path,
252                expected: "GovernanceResponse::Governance".to_owned(),
253            });
254        };
255
256        governance.get_witnesses(WitnessesData::Gov).map_err(|e| {
257            ActorError::Functional {
258                description: e.to_string(),
259            }
260        })
261    }
262
263    async fn get_auth_peers(
264        &self,
265        ctx: &ActorContext<Self>,
266    ) -> Result<HashSet<PublicKey>, ActorError> {
267        let auth_path = ActorPath::from("/user/node/auth");
268        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
269        match auth
270            .ask(AuthMessage::GetAuth {
271                subject_id: self.governance_id.clone(),
272            })
273            .await
274        {
275            Ok(AuthResponse::Witnesses(mut witnesses)) => {
276                witnesses.remove(&*self.our_key);
277                Ok(witnesses)
278            }
279            Ok(_) => Ok(HashSet::new()),
280            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
281            Err(error) => Err(error),
282        }
283    }
284
285    async fn select_peer(
286        &self,
287        ctx: &ActorContext<Self>,
288    ) -> Result<Option<PublicKey>, ActorError> {
289        let mut peers = self.get_governance_peers(ctx).await?;
290        peers.extend(self.get_auth_peers(ctx).await?);
291        peers.remove(&*self.our_key);
292
293        let mut rng = rand::rng();
294        Ok(peers.into_iter().choose(&mut rng))
295    }
296
297    async fn start_fetch(
298        &mut self,
299        ctx: &ActorContext<Self>,
300        peer: PublicKey,
301        governance_version: u64,
302        after_subject_id: Option<DigestIdentifier>,
303    ) -> Result<(), ActorError> {
304        let request_nonce = self.allocate_nonce();
305
306        self.network
307            .send_command(network::CommandHelper::SendMessage {
308                message: NetworkMessage {
309                    info: ComunicateInfo {
310                        receiver: peer.clone(),
311                        request_id: String::default(),
312                        version: 0,
313                        receiver_actor: format!(
314                            "/user/node/subject_manager/{}/tracker_sync",
315                            self.governance_id
316                        ),
317                    },
318                    message: ActorMessage::TrackerSyncReq {
319                        subject_id: self.governance_id.clone(),
320                        request_nonce,
321                        governance_version,
322                        after_subject_id,
323                        limit: self.page_size,
324                        receiver_actor: ctx.path().to_string(),
325                    },
326                },
327            })
328            .await?;
329
330        self.state = SyncState::Fetching(FetchState {
331            peer,
332            governance_version,
333            request_nonce,
334        });
335        self.schedule_fetch_timeout(ctx, request_nonce).await
336    }
337
338    async fn get_local_tracker_sn(
339        &self,
340        ctx: &mut ActorContext<Self>,
341        subject_id: &DigestIdentifier,
342    ) -> Result<Option<u64>, ActorError> {
343        let Some(data) = get_subject_data(ctx, subject_id).await? else {
344            return Ok(None);
345        };
346
347        if !matches!(data, SubjectData::Tracker { .. }) {
348            return Ok(None);
349        }
350
351        let requester = format!("tracker_sync_local:{}", subject_id);
352        let lease =
353            acquire_subject(ctx, subject_id, requester, None, true).await?;
354        let path = ActorPath::from(format!(
355            "/user/node/subject_manager/{}",
356            subject_id
357        ));
358        let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
359        let response = tracker.ask(TrackerMessage::GetMetadata).await;
360        lease.finish(ctx).await?;
361        let response = response?;
362
363        match response {
364            TrackerResponse::Metadata(metadata) => Ok(Some(metadata.sn)),
365            _ => Err(ActorError::UnexpectedResponse {
366                path,
367                expected: "TrackerResponse::Metadata".to_owned(),
368            }),
369        }
370    }
371
372    async fn request_tracker_update(
373        &self,
374        peer: &PublicKey,
375        subject_id: &DigestIdentifier,
376        actual_sn: Option<u64>,
377    ) -> Result<(), ActorError> {
378        self.network
379            .send_command(network::CommandHelper::SendMessage {
380                message: NetworkMessage {
381                    info: ComunicateInfo {
382                        receiver: peer.clone(),
383                        request_id: String::default(),
384                        version: 0,
385                        receiver_actor: format!(
386                            "/user/node/distributor_{}",
387                            subject_id
388                        ),
389                    },
390                    message: ActorMessage::DistributionLedgerReq {
391                        actual_sn,
392                        subject_id: subject_id.clone(),
393                    },
394                },
395            })
396            .await
397    }
398
399    async fn build_pending_updates(
400        &self,
401        ctx: &mut ActorContext<Self>,
402        items: Vec<CurrentWitnessSubject>,
403    ) -> Result<VecDeque<CurrentWitnessSubject>, ActorError> {
404        let mut pending_items = VecDeque::new();
405
406        for item in items {
407            let local_sn =
408                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
409            if local_sn.is_none_or(|local_sn| local_sn < item.target_sn) {
410                pending_items.push_back(item);
411            }
412        }
413
414        Ok(pending_items)
415    }
416
417    async fn start_update_phase(
418        &mut self,
419        ctx: &mut ActorContext<Self>,
420        peer: PublicKey,
421        governance_version: u64,
422        pending_items: VecDeque<CurrentWitnessSubject>,
423        next_cursor: Option<DigestIdentifier>,
424    ) -> Result<(), ActorError> {
425        self.state = SyncState::Updating(UpdateState {
426            peer,
427            governance_version,
428            pending_items,
429            next_cursor,
430            active_batch: Vec::new(),
431            batch_nonce: 0,
432        });
433
434        self.advance_update_phase(ctx).await
435    }
436
437    async fn advance_update_phase(
438        &mut self,
439        ctx: &mut ActorContext<Self>,
440    ) -> Result<(), ActorError> {
441        let current_local_version = self.get_governance_version(ctx).await?;
442        let SyncState::Updating(mut state) = std::mem::take(&mut self.state)
443        else {
444            return Ok(());
445        };
446
447        if current_local_version != state.governance_version {
448            return self
449                .start_fetch(ctx, state.peer, current_local_version, None)
450                .await;
451        }
452
453        if !state.active_batch.is_empty() {
454            let mut still_running =
455                Vec::with_capacity(state.active_batch.len());
456            for mut active_update in state.active_batch {
457                let current_sn = self
458                    .get_local_tracker_sn(ctx, &active_update.item.subject_id)
459                    .await?;
460
461                if current_sn.is_some_and(|current_sn| {
462                    current_sn >= active_update.item.target_sn
463                }) {
464                    Self::observe_update("completed");
465                    continue;
466                }
467
468                if current_sn != active_update.last_seen_sn {
469                    active_update.last_seen_sn = current_sn;
470                    active_update.stalled_checks = 0;
471                    still_running.push(active_update);
472                    continue;
473                }
474
475                active_update.stalled_checks += 1;
476                if active_update.stalled_checks < MAX_STALLED_UPDATE_CHECKS {
477                    still_running.push(active_update);
478                } else {
479                    Self::observe_update("stalled");
480                    warn!(
481                        governance_id = %self.governance_id,
482                        subject_id = %active_update.item.subject_id,
483                        target_sn = active_update.item.target_sn,
484                        current_sn = ?current_sn,
485                        "Tracker sync update stalled, skipping subject"
486                    );
487                }
488            }
489
490            state.active_batch = still_running;
491            if !state.active_batch.is_empty() {
492                let batch_nonce = self.allocate_nonce();
493                state.batch_nonce = batch_nonce;
494                self.state = SyncState::Updating(state);
495                return self.schedule_update_timeout(ctx, batch_nonce).await;
496            }
497        }
498
499        if state.pending_items.is_empty() {
500            if let Some(after_subject_id) = state.next_cursor {
501                return self
502                    .start_fetch(
503                        ctx,
504                        state.peer,
505                        state.governance_version,
506                        Some(after_subject_id),
507                    )
508                    .await;
509            }
510
511            Self::observe_round("completed");
512            return self.finish_cycle(ctx).await;
513        }
514
515        let batch_nonce = self.allocate_nonce();
516        let peer = state.peer.clone();
517        let mut active_batch = Vec::with_capacity(self.update_batch_size);
518        for _ in 0..self.update_batch_size {
519            let Some(item) = state.pending_items.pop_front() else {
520                break;
521            };
522            let last_seen_sn =
523                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
524            self.request_tracker_update(&peer, &item.subject_id, last_seen_sn)
525                .await?;
526            Self::observe_update("launched");
527            active_batch.push(ActiveUpdate {
528                item,
529                last_seen_sn,
530                stalled_checks: 0,
531            });
532        }
533
534        state.active_batch = active_batch;
535        state.batch_nonce = batch_nonce;
536        self.state = SyncState::Updating(state);
537
538        self.schedule_update_timeout(ctx, batch_nonce).await
539    }
540
541    async fn handle_tick(
542        &mut self,
543        ctx: &ActorContext<Self>,
544    ) -> Result<(), ActorError> {
545        if !self.service || !matches!(self.state, SyncState::Idle) {
546            return Ok(());
547        }
548
549        let Some(peer) = self.select_peer(ctx).await? else {
550            Self::observe_round("no_peer");
551            self.schedule_tick(ctx).await?;
552            return Ok(());
553        };
554
555        let governance_version = self.get_governance_version(ctx).await?;
556        Self::observe_round("started");
557        self.start_fetch(ctx, peer, governance_version, None).await
558    }
559
560    async fn handle_network_request(
561        &self,
562        ctx: &ActorContext<Self>,
563        request: TrackerSyncNetworkRequest,
564    ) -> Result<(), ActorError> {
565        let path = ActorPath::from(format!(
566            "/user/node/subject_manager/{}/witnesses_register",
567            self.governance_id
568        ));
569        let actor = ctx.system().get_actor::<WitnessesRegister>(&path).await?;
570        let response = actor
571            .ask(WitnessesRegisterMessage::ListCurrentWitnessSubjects {
572                node: request.sender.clone(),
573                governance_version: request.governance_version,
574                after_subject_id: request.after_subject_id,
575                limit: request.limit,
576            })
577            .await?;
578
579        let WitnessesRegisterResponse::CurrentWitnessSubjects {
580            governance_version,
581            items,
582            next_cursor,
583        } = response
584        else {
585            return Err(ActorError::UnexpectedResponse {
586                path,
587                expected: "WitnessesRegisterResponse::CurrentWitnessSubjects"
588                    .to_owned(),
589            });
590        };
591
592        self.network
593            .send_command(network::CommandHelper::SendMessage {
594                message: NetworkMessage {
595                    info: ComunicateInfo {
596                        receiver: request.sender,
597                        request_id: request.info.request_id,
598                        version: request.info.version,
599                        receiver_actor: request.receiver_actor,
600                    },
601                    message: ActorMessage::TrackerSyncRes {
602                        request_nonce: request.request_nonce,
603                        governance_version,
604                        items,
605                        next_cursor,
606                    },
607                },
608            })
609            .await
610    }
611
612    async fn finish_cycle(
613        &mut self,
614        ctx: &ActorContext<Self>,
615    ) -> Result<(), ActorError> {
616        self.state = SyncState::Idle;
617        self.schedule_tick(ctx).await
618    }
619}
620
621#[async_trait]
622impl Actor for TrackerSync {
623    type Event = ();
624    type Message = TrackerSyncMessage;
625    type Response = TrackerSyncResponse;
626
627    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
628        parent_span.map_or_else(
629            || info_span!("TrackerSync"),
630            |parent| info_span!(parent: parent, "TrackerSync"),
631        )
632    }
633
634    async fn pre_start(
635        &mut self,
636        ctx: &mut ActorContext<Self>,
637    ) -> Result<(), ActorError> {
638        self.schedule_tick(ctx).await
639    }
640}
641
642impl NotPersistentActor for TrackerSync {}
643
644#[async_trait]
645impl Handler<Self> for TrackerSync {
646    async fn handle_message(
647        &mut self,
648        _sender: ActorPath,
649        msg: TrackerSyncMessage,
650        ctx: &mut ActorContext<Self>,
651    ) -> Result<TrackerSyncResponse, ActorError> {
652        match msg {
653            TrackerSyncMessage::Tick => {
654                if let Err(error) = self.handle_tick(ctx).await {
655                    Self::observe_round("error");
656                    warn!(
657                        governance_id = %self.governance_id,
658                        error = %error,
659                        "Tracker sync tick failed"
660                    );
661                    self.finish_cycle(ctx).await?;
662                }
663            }
664            TrackerSyncMessage::FetchTimeout { request_nonce } => {
665                let timed_out = matches!(
666                    &self.state,
667                    SyncState::Fetching(state)
668                        if state.request_nonce == request_nonce
669                );
670
671                if timed_out {
672                    Self::observe_round("timeout");
673                    warn!(
674                        governance_id = %self.governance_id,
675                        request_nonce = request_nonce,
676                        "Tracker sync fetch timed out"
677                    );
678                    self.finish_cycle(ctx).await?;
679                }
680            }
681            TrackerSyncMessage::UpdateTimeout { batch_nonce } => {
682                let timed_out = matches!(
683                    &self.state,
684                    SyncState::Updating(state)
685                        if state.batch_nonce == batch_nonce
686                );
687
688                if timed_out {
689                    self.advance_update_phase(ctx).await?;
690                }
691            }
692            TrackerSyncMessage::NetworkRequest(request) => {
693                self.handle_network_request(ctx, request).await?;
694            }
695            TrackerSyncMessage::NetworkResponse(
696                TrackerSyncNetworkResponse {
697                    peer,
698                    request_nonce,
699                    governance_version,
700                    items,
701                    next_cursor,
702                },
703            ) => {
704                let (active_peer, active_governance_version) = match &self.state
705                {
706                    SyncState::Fetching(state)
707                        if state.peer == peer
708                            && state.request_nonce == request_nonce =>
709                    {
710                        (state.peer.clone(), state.governance_version)
711                    }
712                    _ => return Ok(TrackerSyncResponse::None),
713                };
714
715                debug!(
716                    governance_id = %self.governance_id,
717                    peer = %peer,
718                    request_nonce = request_nonce,
719                    governance_version = governance_version,
720                    item_count = items.len(),
721                    has_next = next_cursor.is_some(),
722                    "Received tracker sync page"
723                );
724
725                let local_governance_version =
726                    self.get_governance_version(ctx).await?;
727                let effective_governance_version =
728                    local_governance_version.max(governance_version);
729
730                if effective_governance_version != active_governance_version {
731                    Self::observe_round("gov_changed");
732                    self.start_fetch(
733                        ctx,
734                        active_peer,
735                        effective_governance_version,
736                        None,
737                    )
738                    .await?;
739                    return Ok(TrackerSyncResponse::None);
740                }
741
742                let pending_items =
743                    self.build_pending_updates(ctx, items).await?;
744                if pending_items.is_empty() {
745                    if let Some(after_subject_id) = next_cursor {
746                        self.start_fetch(
747                            ctx,
748                            active_peer,
749                            governance_version,
750                            Some(after_subject_id),
751                        )
752                        .await?;
753                    } else {
754                        Self::observe_round("completed");
755                        self.finish_cycle(ctx).await?;
756                    }
757                    return Ok(TrackerSyncResponse::None);
758                }
759
760                self.start_update_phase(
761                    ctx,
762                    active_peer,
763                    governance_version,
764                    pending_items,
765                    next_cursor,
766                )
767                .await?;
768            }
769        }
770
771        Ok(TrackerSyncResponse::None)
772    }
773}