Skip to main content

ave_core/governance/
tracker_sync.rs

1use std::{
2    collections::{HashSet, VecDeque},
3    sync::Arc,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use ave_actors::{
9    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
10    NotPersistentActor, Response,
11};
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use ave_network::ComunicateInfo;
14use rand::seq::IteratorRandom;
15use tracing::{Span, debug, info_span, warn};
16
17use crate::auth::{Auth, AuthMessage, AuthResponse};
18use crate::governance::witnesses_register::{
19    CurrentWitnessSubject, WitnessesRegister, WitnessesRegisterMessage,
20    WitnessesRegisterResponse,
21};
22use crate::governance::{
23    Governance, GovernanceMessage, GovernanceResponse, model::WitnessesData,
24};
25use crate::helpers::network::{
26    ActorMessage, NetworkMessage, service::NetworkSender,
27};
28use crate::metrics::try_core_metrics;
29use crate::model::common::node::get_subject_data;
30use crate::model::common::subject::acquire_subject;
31use crate::node::SubjectData;
32use crate::tracker::{Tracker, TrackerMessage, TrackerResponse};
33
34#[derive(Debug, Clone)]
35pub enum TrackerSyncMessage {
36    Tick,
37    FetchTimeout { request_nonce: u64 },
38    UpdateTimeout { batch_nonce: u64 },
39    NetworkRequest(TrackerSyncNetworkRequest),
40    NetworkResponse(TrackerSyncNetworkResponse),
41}
42
43impl Message for TrackerSyncMessage {}
44
45#[derive(Debug, Clone)]
46pub enum TrackerSyncResponse {
47    None,
48}
49
50impl Response for TrackerSyncResponse {}
51
52#[derive(Debug, Clone)]
53struct FetchState {
54    peer: PublicKey,
55    governance_version: u64,
56    request_nonce: u64,
57}
58
59#[derive(Debug, Clone)]
60struct UpdateState {
61    peer: PublicKey,
62    governance_version: u64,
63    pending_items: VecDeque<CurrentWitnessSubject>,
64    next_cursor: Option<DigestIdentifier>,
65    active_batch: Vec<ActiveUpdate>,
66    batch_nonce: u64,
67}
68
69#[derive(Debug, Clone)]
70struct ActiveUpdate {
71    item: CurrentWitnessSubject,
72    last_seen_sn: Option<u64>,
73    stalled_checks: u8,
74}
75
76#[derive(Debug, Clone, Default)]
77enum SyncState {
78    #[default]
79    Idle,
80    Fetching(FetchState),
81    Updating(UpdateState),
82}
83
/// Tunables handed to [`TrackerSync::new`].
#[derive(Debug, Clone)]
pub struct TrackerSyncConfig {
    /// Enables the periodic tick; when `false` no tick is ever scheduled and
    /// received ticks are ignored.
    pub service: bool,
    /// Delay between the end of a cycle and the next tick.
    pub tick_interval: Duration,
    /// How long to wait for a page response before giving up on the cycle.
    pub response_timeout: Duration,
    /// Item limit sent with every page request (raised to at least 1).
    pub page_size: usize,
    /// Maximum number of updates launched per batch (raised to at least 1).
    pub update_batch_size: usize,
    /// Delay between two progress checks of an update batch.
    pub update_timeout: Duration,
}
93
94#[derive(Debug, Clone)]
95pub struct TrackerSyncNetworkRequest {
96    pub request_nonce: u64,
97    pub governance_version: u64,
98    pub after_subject_id: Option<DigestIdentifier>,
99    pub limit: usize,
100    pub info: ComunicateInfo,
101    pub sender: PublicKey,
102    pub receiver_actor: String,
103}
104
105#[derive(Debug, Clone)]
106pub struct TrackerSyncNetworkResponse {
107    pub peer: PublicKey,
108    pub request_nonce: u64,
109    pub governance_version: u64,
110    pub items: Vec<CurrentWitnessSubject>,
111    pub next_cursor: Option<DigestIdentifier>,
112}
113
114pub struct TrackerSync {
115    governance_id: DigestIdentifier,
116    our_key: Arc<PublicKey>,
117    network: Arc<NetworkSender>,
118    service: bool,
119    tick_interval: Duration,
120    response_timeout: Duration,
121    page_size: usize,
122    update_batch_size: usize,
123    update_timeout: Duration,
124    next_nonce: u64,
125    state: SyncState,
126}
127
/// Number of consecutive progress checks without a change in the local
/// sequence number after which an active update is dropped as stalled.
const MAX_STALLED_UPDATE_CHECKS: u8 = 3;
129
130impl TrackerSync {
131    fn observe_round(result: &'static str) {
132        if let Some(metrics) = try_core_metrics() {
133            metrics.observe_tracker_sync_round(result);
134        }
135    }
136
137    fn observe_update(result: &'static str) {
138        if let Some(metrics) = try_core_metrics() {
139            metrics.observe_tracker_sync_update(result);
140        }
141    }
142
143    pub fn new(
144        governance_id: DigestIdentifier,
145        our_key: Arc<PublicKey>,
146        network: Arc<NetworkSender>,
147        config: TrackerSyncConfig,
148    ) -> Self {
149        Self {
150            governance_id,
151            our_key,
152            network,
153            service: config.service,
154            tick_interval: config.tick_interval,
155            response_timeout: config.response_timeout,
156            page_size: config.page_size.max(1),
157            update_batch_size: config.update_batch_size.max(1),
158            update_timeout: config.update_timeout,
159            next_nonce: 0,
160            state: SyncState::Idle,
161        }
162    }
163
164    const fn allocate_nonce(&mut self) -> u64 {
165        self.next_nonce += 1;
166        self.next_nonce
167    }
168
169    async fn schedule_tick(
170        &self,
171        ctx: &ActorContext<Self>,
172    ) -> Result<(), ActorError> {
173        if !self.service {
174            return Ok(());
175        }
176
177        let actor = ctx.reference().await?;
178        let delay = self.tick_interval;
179        tokio::spawn(async move {
180            tokio::time::sleep(delay).await;
181            let _ = actor.tell(TrackerSyncMessage::Tick).await;
182        });
183        Ok(())
184    }
185
186    async fn schedule_fetch_timeout(
187        &self,
188        ctx: &ActorContext<Self>,
189        request_nonce: u64,
190    ) -> Result<(), ActorError> {
191        let actor = ctx.reference().await?;
192        let delay = self.response_timeout;
193        tokio::spawn(async move {
194            tokio::time::sleep(delay).await;
195            let _ = actor
196                .tell(TrackerSyncMessage::FetchTimeout { request_nonce })
197                .await;
198        });
199        Ok(())
200    }
201
202    async fn schedule_update_timeout(
203        &self,
204        ctx: &ActorContext<Self>,
205        batch_nonce: u64,
206    ) -> Result<(), ActorError> {
207        let actor = ctx.reference().await?;
208        let delay = self.update_timeout;
209        tokio::spawn(async move {
210            tokio::time::sleep(delay).await;
211            let _ = actor
212                .tell(TrackerSyncMessage::UpdateTimeout { batch_nonce })
213                .await;
214        });
215        Ok(())
216    }
217
218    async fn get_governance_version(
219        &self,
220        ctx: &ActorContext<Self>,
221    ) -> Result<u64, ActorError> {
222        let path = ActorPath::from(format!(
223            "/user/node/subject_manager/{}",
224            self.governance_id
225        ));
226        let actor = ctx.system().get_actor::<Governance>(&path).await?;
227        let response = actor.ask(GovernanceMessage::GetVersion).await?;
228
229        match response {
230            GovernanceResponse::Version(version) => Ok(version),
231            _ => Err(ActorError::UnexpectedResponse {
232                path,
233                expected: "GovernanceResponse::Version".to_owned(),
234            }),
235        }
236    }
237
238    async fn get_governance_peers(
239        &self,
240        ctx: &ActorContext<Self>,
241    ) -> Result<HashSet<PublicKey>, ActorError> {
242        let path = ActorPath::from(format!(
243            "/user/node/subject_manager/{}",
244            self.governance_id
245        ));
246        let actor = ctx.system().get_actor::<Governance>(&path).await?;
247        let response = actor.ask(GovernanceMessage::GetGovernance).await?;
248
249        let GovernanceResponse::Governance(governance) = response else {
250            return Err(ActorError::UnexpectedResponse {
251                path,
252                expected: "GovernanceResponse::Governance".to_owned(),
253            });
254        };
255
256        governance.get_witnesses(WitnessesData::Gov).map_err(|e| {
257            ActorError::Functional {
258                description: e.to_string(),
259            }
260        })
261    }
262
263    async fn get_auth_peers(
264        &self,
265        ctx: &ActorContext<Self>,
266    ) -> Result<HashSet<PublicKey>, ActorError> {
267        let auth_path = ActorPath::from("/user/node/auth");
268        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
269        match auth
270            .ask(AuthMessage::GetAuth {
271                subject_id: self.governance_id.clone(),
272            })
273            .await
274        {
275            Ok(AuthResponse::Witnesses(mut witnesses)) => {
276                witnesses.remove(&*self.our_key);
277                Ok(witnesses)
278            }
279            Ok(_) => Ok(HashSet::new()),
280            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
281            Err(error) => Err(error),
282        }
283    }
284
285    async fn select_peer(
286        &self,
287        ctx: &ActorContext<Self>,
288    ) -> Result<Option<PublicKey>, ActorError> {
289        let mut peers = self.get_governance_peers(ctx).await?;
290        peers.extend(self.get_auth_peers(ctx).await?);
291        peers.remove(&*self.our_key);
292
293        let mut rng = rand::rng();
294        Ok(peers.into_iter().choose(&mut rng))
295    }
296
297    async fn start_fetch(
298        &mut self,
299        ctx: &ActorContext<Self>,
300        peer: PublicKey,
301        governance_version: u64,
302        after_subject_id: Option<DigestIdentifier>,
303    ) -> Result<(), ActorError> {
304        let request_nonce = self.allocate_nonce();
305
306        self.network
307            .send_command(ave_network::CommandHelper::SendMessage {
308                message: NetworkMessage {
309                    info: ComunicateInfo {
310                        receiver: peer.clone(),
311                        request_id: String::default(),
312                        version: 0,
313                        receiver_actor: format!(
314                            "/user/node/subject_manager/{}/tracker_sync",
315                            self.governance_id
316                        ),
317                    },
318                    message: ActorMessage::TrackerSyncReq {
319                        subject_id: self.governance_id.clone(),
320                        request_nonce,
321                        governance_version,
322                        after_subject_id,
323                        limit: self.page_size,
324                        receiver_actor: ctx.path().to_string(),
325                    },
326                },
327            })
328            .await?;
329
330        self.state = SyncState::Fetching(FetchState {
331            peer,
332            governance_version,
333            request_nonce,
334        });
335        self.schedule_fetch_timeout(ctx, request_nonce).await
336    }
337
338    async fn get_local_tracker_sn(
339        &self,
340        ctx: &mut ActorContext<Self>,
341        subject_id: &DigestIdentifier,
342    ) -> Result<Option<u64>, ActorError> {
343        let Some(data) = get_subject_data(ctx, subject_id).await? else {
344            return Ok(None);
345        };
346
347        if !matches!(data, SubjectData::Tracker { .. }) {
348            return Ok(None);
349        }
350
351        let requester = format!("tracker_sync_local:{}", subject_id);
352        let lease =
353            acquire_subject(ctx, subject_id, requester, None, true).await?;
354        let path = ActorPath::from(format!(
355            "/user/node/subject_manager/{}",
356            subject_id
357        ));
358        let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
359        let response = tracker.ask(TrackerMessage::GetMetadata).await;
360        lease.finish(ctx).await?;
361        let response = response?;
362
363        match response {
364            TrackerResponse::Metadata(metadata) => Ok(Some(metadata.sn)),
365            _ => Err(ActorError::UnexpectedResponse {
366                path,
367                expected: "TrackerResponse::Metadata".to_owned(),
368            }),
369        }
370    }
371
372    async fn request_tracker_update(
373        &self,
374        peer: &PublicKey,
375        subject_id: &DigestIdentifier,
376        actual_sn: Option<u64>,
377    ) -> Result<(), ActorError> {
378        self.network
379            .send_command(ave_network::CommandHelper::SendMessage {
380                message: NetworkMessage {
381                    info: ComunicateInfo {
382                        receiver: peer.clone(),
383                        request_id: String::default(),
384                        version: 0,
385                        receiver_actor: format!(
386                            "/user/node/distributor_{}",
387                            subject_id
388                        ),
389                    },
390                    message: ActorMessage::DistributionLedgerReq {
391                        actual_sn,
392                        target_sn: None,
393                        subject_id: subject_id.clone(),
394                    },
395                },
396            })
397            .await
398    }
399
400    async fn build_pending_updates(
401        &self,
402        ctx: &mut ActorContext<Self>,
403        items: Vec<CurrentWitnessSubject>,
404    ) -> Result<VecDeque<CurrentWitnessSubject>, ActorError> {
405        let mut pending_items = VecDeque::new();
406
407        for item in items {
408            let local_sn =
409                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
410            if local_sn.is_none_or(|local_sn| local_sn < item.target_sn) {
411                pending_items.push_back(item);
412            }
413        }
414
415        Ok(pending_items)
416    }
417
418    async fn start_update_phase(
419        &mut self,
420        ctx: &mut ActorContext<Self>,
421        peer: PublicKey,
422        governance_version: u64,
423        pending_items: VecDeque<CurrentWitnessSubject>,
424        next_cursor: Option<DigestIdentifier>,
425    ) -> Result<(), ActorError> {
426        self.state = SyncState::Updating(UpdateState {
427            peer,
428            governance_version,
429            pending_items,
430            next_cursor,
431            active_batch: Vec::new(),
432            batch_nonce: 0,
433        });
434
435        self.advance_update_phase(ctx).await
436    }
437
438    async fn advance_update_phase(
439        &mut self,
440        ctx: &mut ActorContext<Self>,
441    ) -> Result<(), ActorError> {
442        let current_local_version = self.get_governance_version(ctx).await?;
443        let SyncState::Updating(mut state) = std::mem::take(&mut self.state)
444        else {
445            return Ok(());
446        };
447
448        if current_local_version != state.governance_version {
449            return self
450                .start_fetch(ctx, state.peer, current_local_version, None)
451                .await;
452        }
453
454        if !state.active_batch.is_empty() {
455            let mut still_running =
456                Vec::with_capacity(state.active_batch.len());
457            for mut active_update in state.active_batch {
458                let current_sn = self
459                    .get_local_tracker_sn(ctx, &active_update.item.subject_id)
460                    .await?;
461
462                if current_sn.is_some_and(|current_sn| {
463                    current_sn >= active_update.item.target_sn
464                }) {
465                    Self::observe_update("completed");
466                    continue;
467                }
468
469                if current_sn != active_update.last_seen_sn {
470                    active_update.last_seen_sn = current_sn;
471                    active_update.stalled_checks = 0;
472                    still_running.push(active_update);
473                    continue;
474                }
475
476                active_update.stalled_checks += 1;
477                if active_update.stalled_checks < MAX_STALLED_UPDATE_CHECKS {
478                    still_running.push(active_update);
479                } else {
480                    Self::observe_update("stalled");
481                    warn!(
482                        governance_id = %self.governance_id,
483                        subject_id = %active_update.item.subject_id,
484                        target_sn = active_update.item.target_sn,
485                        current_sn = ?current_sn,
486                        "Tracker sync update stalled, skipping subject"
487                    );
488                }
489            }
490
491            state.active_batch = still_running;
492            if !state.active_batch.is_empty() {
493                let batch_nonce = self.allocate_nonce();
494                state.batch_nonce = batch_nonce;
495                self.state = SyncState::Updating(state);
496                return self.schedule_update_timeout(ctx, batch_nonce).await;
497            }
498        }
499
500        if state.pending_items.is_empty() {
501            if let Some(after_subject_id) = state.next_cursor {
502                return self
503                    .start_fetch(
504                        ctx,
505                        state.peer,
506                        state.governance_version,
507                        Some(after_subject_id),
508                    )
509                    .await;
510            }
511
512            Self::observe_round("completed");
513            return self.finish_cycle(ctx).await;
514        }
515
516        let batch_nonce = self.allocate_nonce();
517        let peer = state.peer.clone();
518        let mut active_batch = Vec::with_capacity(self.update_batch_size);
519        for _ in 0..self.update_batch_size {
520            let Some(item) = state.pending_items.pop_front() else {
521                break;
522            };
523            let last_seen_sn =
524                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
525            self.request_tracker_update(&peer, &item.subject_id, last_seen_sn)
526                .await?;
527            Self::observe_update("launched");
528            active_batch.push(ActiveUpdate {
529                item,
530                last_seen_sn,
531                stalled_checks: 0,
532            });
533        }
534
535        state.active_batch = active_batch;
536        state.batch_nonce = batch_nonce;
537        self.state = SyncState::Updating(state);
538
539        self.schedule_update_timeout(ctx, batch_nonce).await
540    }
541
542    async fn handle_tick(
543        &mut self,
544        ctx: &ActorContext<Self>,
545    ) -> Result<(), ActorError> {
546        if !self.service || !matches!(self.state, SyncState::Idle) {
547            return Ok(());
548        }
549
550        let Some(peer) = self.select_peer(ctx).await? else {
551            Self::observe_round("no_peer");
552            self.schedule_tick(ctx).await?;
553            return Ok(());
554        };
555
556        let governance_version = self.get_governance_version(ctx).await?;
557        Self::observe_round("started");
558        self.start_fetch(ctx, peer, governance_version, None).await
559    }
560
561    async fn handle_network_request(
562        &self,
563        ctx: &ActorContext<Self>,
564        request: TrackerSyncNetworkRequest,
565    ) -> Result<(), ActorError> {
566        let path = ActorPath::from(format!(
567            "/user/node/subject_manager/{}/witnesses_register",
568            self.governance_id
569        ));
570        let actor = ctx.system().get_actor::<WitnessesRegister>(&path).await?;
571        let response = actor
572            .ask(WitnessesRegisterMessage::ListCurrentWitnessSubjects {
573                node: request.sender.clone(),
574                governance_version: request.governance_version,
575                after_subject_id: request.after_subject_id,
576                limit: request.limit,
577            })
578            .await?;
579
580        let WitnessesRegisterResponse::CurrentWitnessSubjects {
581            governance_version,
582            items,
583            next_cursor,
584        } = response
585        else {
586            return Err(ActorError::UnexpectedResponse {
587                path,
588                expected: "WitnessesRegisterResponse::CurrentWitnessSubjects"
589                    .to_owned(),
590            });
591        };
592
593        self.network
594            .send_command(ave_network::CommandHelper::SendMessage {
595                message: NetworkMessage {
596                    info: ComunicateInfo {
597                        receiver: request.sender,
598                        request_id: request.info.request_id,
599                        version: request.info.version,
600                        receiver_actor: request.receiver_actor,
601                    },
602                    message: ActorMessage::TrackerSyncRes {
603                        request_nonce: request.request_nonce,
604                        governance_version,
605                        items,
606                        next_cursor,
607                    },
608                },
609            })
610            .await
611    }
612
613    async fn finish_cycle(
614        &mut self,
615        ctx: &ActorContext<Self>,
616    ) -> Result<(), ActorError> {
617        self.state = SyncState::Idle;
618        self.schedule_tick(ctx).await
619    }
620}
621
622#[async_trait]
623impl Actor for TrackerSync {
624    type Event = ();
625    type Message = TrackerSyncMessage;
626    type Response = TrackerSyncResponse;
627
628    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
629        parent_span.map_or_else(
630            || info_span!("TrackerSync"),
631            |parent| info_span!(parent: parent, "TrackerSync"),
632        )
633    }
634
635    async fn pre_start(
636        &mut self,
637        ctx: &mut ActorContext<Self>,
638    ) -> Result<(), ActorError> {
639        self.schedule_tick(ctx).await
640    }
641}
642
643impl NotPersistentActor for TrackerSync {}
644
645#[async_trait]
646impl Handler<Self> for TrackerSync {
647    async fn handle_message(
648        &mut self,
649        _sender: ActorPath,
650        msg: TrackerSyncMessage,
651        ctx: &mut ActorContext<Self>,
652    ) -> Result<TrackerSyncResponse, ActorError> {
653        match msg {
654            TrackerSyncMessage::Tick => {
655                if let Err(error) = self.handle_tick(ctx).await {
656                    Self::observe_round("error");
657                    warn!(
658                        governance_id = %self.governance_id,
659                        error = %error,
660                        "Tracker sync tick failed"
661                    );
662                    self.finish_cycle(ctx).await?;
663                }
664            }
665            TrackerSyncMessage::FetchTimeout { request_nonce } => {
666                let timed_out = matches!(
667                    &self.state,
668                    SyncState::Fetching(state)
669                        if state.request_nonce == request_nonce
670                );
671
672                if timed_out {
673                    Self::observe_round("timeout");
674                    debug!(
675                        governance_id = %self.governance_id,
676                        request_nonce = request_nonce,
677                        "Tracker sync fetch timed out"
678                    );
679                    self.finish_cycle(ctx).await?;
680                }
681            }
682            TrackerSyncMessage::UpdateTimeout { batch_nonce } => {
683                let timed_out = matches!(
684                    &self.state,
685                    SyncState::Updating(state)
686                        if state.batch_nonce == batch_nonce
687                );
688
689                if timed_out {
690                    self.advance_update_phase(ctx).await?;
691                }
692            }
693            TrackerSyncMessage::NetworkRequest(request) => {
694                self.handle_network_request(ctx, request).await?;
695            }
696            TrackerSyncMessage::NetworkResponse(
697                TrackerSyncNetworkResponse {
698                    peer,
699                    request_nonce,
700                    governance_version,
701                    items,
702                    next_cursor,
703                },
704            ) => {
705                let (active_peer, active_governance_version) = match &self.state
706                {
707                    SyncState::Fetching(state)
708                        if state.peer == peer
709                            && state.request_nonce == request_nonce =>
710                    {
711                        (state.peer.clone(), state.governance_version)
712                    }
713                    _ => return Ok(TrackerSyncResponse::None),
714                };
715
716                debug!(
717                    governance_id = %self.governance_id,
718                    peer = %peer,
719                    request_nonce = request_nonce,
720                    governance_version = governance_version,
721                    item_count = items.len(),
722                    has_next = next_cursor.is_some(),
723                    "Received tracker sync page"
724                );
725
726                let local_governance_version =
727                    self.get_governance_version(ctx).await?;
728                let effective_governance_version =
729                    local_governance_version.max(governance_version);
730
731                if effective_governance_version != active_governance_version {
732                    Self::observe_round("gov_changed");
733                    self.start_fetch(
734                        ctx,
735                        active_peer,
736                        effective_governance_version,
737                        None,
738                    )
739                    .await?;
740                    return Ok(TrackerSyncResponse::None);
741                }
742
743                let pending_items =
744                    self.build_pending_updates(ctx, items).await?;
745                if pending_items.is_empty() {
746                    if let Some(after_subject_id) = next_cursor {
747                        self.start_fetch(
748                            ctx,
749                            active_peer,
750                            governance_version,
751                            Some(after_subject_id),
752                        )
753                        .await?;
754                    } else {
755                        Self::observe_round("completed");
756                        self.finish_cycle(ctx).await?;
757                    }
758                    return Ok(TrackerSyncResponse::None);
759                }
760
761                self.start_update_phase(
762                    ctx,
763                    active_peer,
764                    governance_version,
765                    pending_items,
766                    next_cursor,
767                )
768                .await?;
769            }
770        }
771
772        Ok(TrackerSyncResponse::None)
773    }
774}