1use std::{
2 collections::{HashSet, VecDeque},
3 sync::Arc,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use ave_actors::{
9 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
10 NotPersistentActor, Response,
11};
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use ave_network::ComunicateInfo;
14use rand::seq::IteratorRandom;
15use tracing::{Span, debug, info_span, warn};
16
17use crate::auth::{Auth, AuthMessage, AuthResponse};
18use crate::governance::witnesses_register::{
19 CurrentWitnessSubject, WitnessesRegister, WitnessesRegisterMessage,
20 WitnessesRegisterResponse,
21};
22use crate::governance::{
23 Governance, GovernanceMessage, GovernanceResponse, model::WitnessesData,
24};
25use crate::helpers::network::{
26 ActorMessage, NetworkMessage, service::NetworkSender,
27};
28use crate::metrics::try_core_metrics;
29use crate::model::common::node::get_subject_data;
30use crate::model::common::subject::acquire_subject;
31use crate::node::SubjectData;
32use crate::tracker::{Tracker, TrackerMessage, TrackerResponse};
33
/// Messages handled by the [`TrackerSync`] actor.
#[derive(Debug, Clone)]
pub enum TrackerSyncMessage {
    /// Periodic trigger that starts a new sync round when the actor is idle.
    Tick,
    /// Fired after `response_timeout`; aborts the round if the fetch tagged
    /// with `request_nonce` is still outstanding.
    FetchTimeout { request_nonce: u64 },
    /// Fired after `update_timeout`; re-evaluates the update batch tagged
    /// with `batch_nonce`.
    UpdateTimeout { batch_nonce: u64 },
    /// A remote peer asks this node for a page of its current witness
    /// subjects.
    NetworkRequest(TrackerSyncNetworkRequest),
    /// A peer's reply to one of our page requests.
    NetworkResponse(TrackerSyncNetworkResponse),
}

impl Message for TrackerSyncMessage {}
44
/// Responses emitted by the [`TrackerSync`] actor. All messages are
/// fire-and-forget, so the only variant carries no data.
#[derive(Debug, Clone)]
pub enum TrackerSyncResponse {
    /// No meaningful payload; acknowledges the message was processed.
    None,
}

impl Response for TrackerSyncResponse {}
51
/// In-flight page request: we asked `peer` for a page of witness subjects
/// and are waiting for the matching response or timeout.
#[derive(Debug, Clone)]
struct FetchState {
    // Peer the page was requested from.
    peer: PublicKey,
    // Local governance version this round was started with.
    governance_version: u64,
    // Nonce that ties the response/timeout back to this request.
    request_nonce: u64,
}
58
/// State of the update phase: subjects from the fetched page that are behind
/// their target sequence number and are being brought up to date in batches.
#[derive(Debug, Clone)]
struct UpdateState {
    // Peer that supplied the page (and serves the ledger updates).
    peer: PublicKey,
    // Governance version the page was fetched against.
    governance_version: u64,
    // Subjects still waiting to be launched in a future batch.
    pending_items: VecDeque<CurrentWitnessSubject>,
    // Cursor for the next page, if the peer reported more items.
    next_cursor: Option<DigestIdentifier>,
    // Updates currently in flight, re-checked on each update timeout.
    active_batch: Vec<ActiveUpdate>,
    // Nonce that ties the update timeout back to this batch.
    batch_nonce: u64,
}
68
/// One in-flight subject update, with progress tracking used to detect
/// stalls.
#[derive(Debug, Clone)]
struct ActiveUpdate {
    // Subject being updated and the sequence number it should reach.
    item: CurrentWitnessSubject,
    // Local sequence number observed at the previous check (None if the
    // subject did not exist locally yet).
    last_seen_sn: Option<u64>,
    // Consecutive checks without progress; compared against
    // MAX_STALLED_UPDATE_CHECKS.
    stalled_checks: u8,
}
75
/// Sync state machine: Idle -> Fetching (page request in flight) ->
/// Updating (subjects being brought up to date) -> back to Idle or to
/// Fetching for the next page.
#[derive(Debug, Clone, Default)]
enum SyncState {
    #[default]
    Idle,
    Fetching(FetchState),
    Updating(UpdateState),
}
83
/// Tuning knobs for a [`TrackerSync`] actor.
#[derive(Debug, Clone)]
pub struct TrackerSyncConfig {
    // When false, the periodic tick is never scheduled and the actor only
    // serves incoming network requests.
    pub service: bool,
    // Delay between the end of one sync round and the start of the next.
    pub tick_interval: Duration,
    // How long to wait for a page response before abandoning the round.
    pub response_timeout: Duration,
    // Maximum number of witness subjects requested per page (min 1).
    pub page_size: usize,
    // Maximum number of subject updates launched concurrently (min 1).
    pub update_batch_size: usize,
    // Interval between progress checks on an active update batch.
    pub update_timeout: Duration,
}
93
/// Incoming request from a peer for a page of this node's current witness
/// subjects.
#[derive(Debug, Clone)]
pub struct TrackerSyncNetworkRequest {
    // Echoed back in the response so the requester can match it.
    pub request_nonce: u64,
    // Governance version the requester is synchronizing against.
    pub governance_version: u64,
    // Pagination cursor: return subjects after this id (None = first page).
    pub after_subject_id: Option<DigestIdentifier>,
    // Maximum number of items to return.
    pub limit: usize,
    // Network routing info used to address the reply.
    pub info: ComunicateInfo,
    // Public key of the requesting node.
    pub sender: PublicKey,
    // Actor path on the requester that should receive the response.
    pub receiver_actor: String,
}
104
/// A peer's reply to a [`TrackerSyncNetworkRequest`]: one page of witness
/// subjects plus an optional cursor for the next page.
#[derive(Debug, Clone)]
pub struct TrackerSyncNetworkResponse {
    // Peer that produced the page.
    pub peer: PublicKey,
    // Nonce of the request this response answers.
    pub request_nonce: u64,
    // Governance version the peer served the page at.
    pub governance_version: u64,
    // The page of witness subjects.
    pub items: Vec<CurrentWitnessSubject>,
    // Cursor for the next page, or None if this was the last page.
    pub next_cursor: Option<DigestIdentifier>,
}
113
/// Actor that keeps locally tracked subjects of a governance in sync with a
/// randomly chosen witness peer: it pages through the peer's current witness
/// subjects, requests ledger updates for any that are behind, and monitors
/// the updates for progress.
pub struct TrackerSync {
    // Governance this sync actor belongs to.
    governance_id: DigestIdentifier,
    // This node's public key, excluded from peer selection.
    our_key: Arc<PublicKey>,
    // Handle used to send network commands.
    network: Arc<NetworkSender>,
    // See TrackerSyncConfig for the meaning of the following fields.
    service: bool,
    tick_interval: Duration,
    response_timeout: Duration,
    page_size: usize,
    update_batch_size: usize,
    update_timeout: Duration,
    // Monotonic counter used to tag fetches and update batches so stale
    // timeouts/responses can be ignored.
    next_nonce: u64,
    // Current position in the sync state machine.
    state: SyncState,
}
127
/// Number of consecutive no-progress checks before an in-flight subject
/// update is considered stalled and skipped for the rest of the round.
const MAX_STALLED_UPDATE_CHECKS: u8 = 3;
129
impl TrackerSync {
    /// Records the outcome of a whole sync round in the core metrics, if
    /// metrics have been initialized.
    fn observe_round(result: &'static str) {
        if let Some(metrics) = try_core_metrics() {
            metrics.observe_tracker_sync_round(result);
        }
    }

    /// Records the outcome of a single subject update in the core metrics,
    /// if metrics have been initialized.
    fn observe_update(result: &'static str) {
        if let Some(metrics) = try_core_metrics() {
            metrics.observe_tracker_sync_update(result);
        }
    }

    /// Builds a new `TrackerSync` for `governance_id`.
    ///
    /// `page_size` and `update_batch_size` are clamped to at least 1 so
    /// paging and batching always make forward progress.
    pub fn new(
        governance_id: DigestIdentifier,
        our_key: Arc<PublicKey>,
        network: Arc<NetworkSender>,
        config: TrackerSyncConfig,
    ) -> Self {
        Self {
            governance_id,
            our_key,
            network,
            service: config.service,
            tick_interval: config.tick_interval,
            response_timeout: config.response_timeout,
            // Clamp to 1: a zero page/batch size would never make progress.
            page_size: config.page_size.max(1),
            update_batch_size: config.update_batch_size.max(1),
            update_timeout: config.update_timeout,
            next_nonce: 0,
            state: SyncState::Idle,
        }
    }

    /// Returns the next nonce (monotonically increasing, starting at 1).
    const fn allocate_nonce(&mut self) -> u64 {
        self.next_nonce += 1;
        self.next_nonce
    }

    /// Schedules the next `Tick` after `tick_interval`. No-op when the
    /// actor is not running as a service.
    async fn schedule_tick(
        &self,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorError> {
        if !self.service {
            return Ok(());
        }

        let actor = ctx.reference().await?;
        let delay = self.tick_interval;
        // Detached task: if the actor is gone by the time the sleep ends,
        // the tell result is deliberately ignored.
        tokio::spawn(async move {
            tokio::time::sleep(delay).await;
            let _ = actor.tell(TrackerSyncMessage::Tick).await;
        });
        Ok(())
    }

    /// Schedules a `FetchTimeout` carrying `request_nonce` after
    /// `response_timeout`. The handler ignores it if the nonce no longer
    /// matches the current fetch.
    async fn schedule_fetch_timeout(
        &self,
        ctx: &ActorContext<Self>,
        request_nonce: u64,
    ) -> Result<(), ActorError> {
        let actor = ctx.reference().await?;
        let delay = self.response_timeout;
        tokio::spawn(async move {
            tokio::time::sleep(delay).await;
            let _ = actor
                .tell(TrackerSyncMessage::FetchTimeout { request_nonce })
                .await;
        });
        Ok(())
    }

    /// Schedules an `UpdateTimeout` carrying `batch_nonce` after
    /// `update_timeout`. The handler ignores it if the nonce no longer
    /// matches the current batch.
    async fn schedule_update_timeout(
        &self,
        ctx: &ActorContext<Self>,
        batch_nonce: u64,
    ) -> Result<(), ActorError> {
        let actor = ctx.reference().await?;
        let delay = self.update_timeout;
        tokio::spawn(async move {
            tokio::time::sleep(delay).await;
            let _ = actor
                .tell(TrackerSyncMessage::UpdateTimeout { batch_nonce })
                .await;
        });
        Ok(())
    }

    /// Asks the governance actor for the current local governance version.
    async fn get_governance_version(
        &self,
        ctx: &ActorContext<Self>,
    ) -> Result<u64, ActorError> {
        let path = ActorPath::from(format!(
            "/user/node/subject_manager/{}",
            self.governance_id
        ));
        let actor = ctx.system().get_actor::<Governance>(&path).await?;
        let response = actor.ask(GovernanceMessage::GetVersion).await?;

        match response {
            GovernanceResponse::Version(version) => Ok(version),
            _ => Err(ActorError::UnexpectedResponse {
                path,
                expected: "GovernanceResponse::Version".to_owned(),
            }),
        }
    }

    /// Returns the witnesses declared in the governance itself.
    async fn get_governance_peers(
        &self,
        ctx: &ActorContext<Self>,
    ) -> Result<HashSet<PublicKey>, ActorError> {
        let path = ActorPath::from(format!(
            "/user/node/subject_manager/{}",
            self.governance_id
        ));
        let actor = ctx.system().get_actor::<Governance>(&path).await?;
        let response = actor.ask(GovernanceMessage::GetGovernance).await?;

        let GovernanceResponse::Governance(governance) = response else {
            return Err(ActorError::UnexpectedResponse {
                path,
                expected: "GovernanceResponse::Governance".to_owned(),
            });
        };

        governance.get_witnesses(WitnessesData::Gov).map_err(|e| {
            ActorError::Functional {
                description: e.to_string(),
            }
        })
    }

    /// Returns witnesses registered via the auth actor for this governance,
    /// excluding this node's own key. Functional errors (e.g. no auth entry
    /// for the subject — TODO confirm) degrade to an empty set rather than
    /// failing the round.
    async fn get_auth_peers(
        &self,
        ctx: &ActorContext<Self>,
    ) -> Result<HashSet<PublicKey>, ActorError> {
        let auth_path = ActorPath::from("/user/node/auth");
        let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
        match auth
            .ask(AuthMessage::GetAuth {
                subject_id: self.governance_id.clone(),
            })
            .await
        {
            Ok(AuthResponse::Witnesses(mut witnesses)) => {
                witnesses.remove(&*self.our_key);
                Ok(witnesses)
            }
            Ok(_) => Ok(HashSet::new()),
            Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
            Err(error) => Err(error),
        }
    }

    /// Picks one peer uniformly at random from the union of governance and
    /// auth witnesses, excluding this node. Returns `None` if no peer is
    /// available.
    async fn select_peer(
        &self,
        ctx: &ActorContext<Self>,
    ) -> Result<Option<PublicKey>, ActorError> {
        let mut peers = self.get_governance_peers(ctx).await?;
        peers.extend(self.get_auth_peers(ctx).await?);
        peers.remove(&*self.our_key);

        let mut rng = rand::rng();
        Ok(peers.into_iter().choose(&mut rng))
    }

    /// Sends a page request to `peer`, transitions to `Fetching`, and arms
    /// the fetch timeout. `after_subject_id` is `None` for the first page.
    async fn start_fetch(
        &mut self,
        ctx: &ActorContext<Self>,
        peer: PublicKey,
        governance_version: u64,
        after_subject_id: Option<DigestIdentifier>,
    ) -> Result<(), ActorError> {
        let request_nonce = self.allocate_nonce();

        self.network
            .send_command(ave_network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: peer.clone(),
                        request_id: String::default(),
                        version: 0,
                        receiver_actor: format!(
                            "/user/node/subject_manager/{}/tracker_sync",
                            self.governance_id
                        ),
                    },
                    message: ActorMessage::TrackerSyncReq {
                        subject_id: self.governance_id.clone(),
                        request_nonce,
                        governance_version,
                        after_subject_id,
                        limit: self.page_size,
                        receiver_actor: ctx.path().to_string(),
                    },
                },
            })
            .await?;

        // State is only updated after the send succeeds, so a send failure
        // leaves the previous state intact.
        self.state = SyncState::Fetching(FetchState {
            peer,
            governance_version,
            request_nonce,
        });
        self.schedule_fetch_timeout(ctx, request_nonce).await
    }

    /// Returns the local sequence number of `subject_id` if it exists
    /// locally as a tracker subject, `None` otherwise.
    async fn get_local_tracker_sn(
        &self,
        ctx: &mut ActorContext<Self>,
        subject_id: &DigestIdentifier,
    ) -> Result<Option<u64>, ActorError> {
        let Some(data) = get_subject_data(ctx, subject_id).await? else {
            return Ok(None);
        };

        // Only tracker subjects are relevant here; anything else reports
        // "not present".
        if !matches!(data, SubjectData::Tracker { .. }) {
            return Ok(None);
        }

        let requester = format!("tracker_sync_local:{}", subject_id);
        let lease =
            acquire_subject(ctx, subject_id, requester, None, true).await?;
        // NOTE(review): if get_actor below fails, the `?` returns before
        // lease.finish is called — confirm the lease is reclaimed elsewhere
        // (e.g. by timeout) or this may leak a subject lease.
        let path = ActorPath::from(format!(
            "/user/node/subject_manager/{}",
            subject_id
        ));
        let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
        // Deliberately not `?` yet: the lease must be finished even when
        // the ask fails.
        let response = tracker.ask(TrackerMessage::GetMetadata).await;
        lease.finish(ctx).await?;
        let response = response?;

        match response {
            TrackerResponse::Metadata(metadata) => Ok(Some(metadata.sn)),
            _ => Err(ActorError::UnexpectedResponse {
                path,
                expected: "TrackerResponse::Metadata".to_owned(),
            }),
        }
    }

    /// Asks `peer`'s distributor for ledger events of `subject_id` starting
    /// after `actual_sn` (None = from the beginning).
    async fn request_tracker_update(
        &self,
        peer: &PublicKey,
        subject_id: &DigestIdentifier,
        actual_sn: Option<u64>,
    ) -> Result<(), ActorError> {
        self.network
            .send_command(ave_network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: peer.clone(),
                        request_id: String::default(),
                        version: 0,
                        receiver_actor: format!(
                            "/user/node/distributor_{}",
                            subject_id
                        ),
                    },
                    message: ActorMessage::DistributionLedgerReq {
                        actual_sn,
                        target_sn: None,
                        subject_id: subject_id.clone(),
                    },
                },
            })
            .await
    }

    /// Filters the fetched page down to subjects whose local sequence
    /// number is missing or behind the peer-reported target.
    async fn build_pending_updates(
        &self,
        ctx: &mut ActorContext<Self>,
        items: Vec<CurrentWitnessSubject>,
    ) -> Result<VecDeque<CurrentWitnessSubject>, ActorError> {
        let mut pending_items = VecDeque::new();

        for item in items {
            let local_sn =
                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
            if local_sn.is_none_or(|local_sn| local_sn < item.target_sn) {
                pending_items.push_back(item);
            }
        }

        Ok(pending_items)
    }

    /// Transitions to `Updating` with the given pending work and immediately
    /// launches the first batch.
    async fn start_update_phase(
        &mut self,
        ctx: &mut ActorContext<Self>,
        peer: PublicKey,
        governance_version: u64,
        pending_items: VecDeque<CurrentWitnessSubject>,
        next_cursor: Option<DigestIdentifier>,
    ) -> Result<(), ActorError> {
        self.state = SyncState::Updating(UpdateState {
            peer,
            governance_version,
            pending_items,
            next_cursor,
            active_batch: Vec::new(),
            batch_nonce: 0,
        });

        self.advance_update_phase(ctx).await
    }

    /// One step of the update phase:
    /// 1. restart the round if the local governance version changed;
    /// 2. re-check the active batch — drop completed/stalled updates and
    ///    keep waiting if any are still running;
    /// 3. otherwise launch the next batch, fetch the next page, or finish
    ///    the round.
    async fn advance_update_phase(
        &mut self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError> {
        let current_local_version = self.get_governance_version(ctx).await?;
        // NOTE(review): mem::take leaves state as Idle; if this were ever
        // called while Fetching, that fetch state would be silently dropped.
        // Callers currently guard with matches!(Updating) first — confirm
        // that invariant holds for any new call sites.
        let SyncState::Updating(mut state) = std::mem::take(&mut self.state)
        else {
            return Ok(());
        };

        if current_local_version != state.governance_version {
            // Governance moved under us: restart paging from scratch at the
            // new version, reusing the same peer.
            return self
                .start_fetch(ctx, state.peer, current_local_version, None)
                .await;
        }

        if !state.active_batch.is_empty() {
            let mut still_running =
                Vec::with_capacity(state.active_batch.len());
            for mut active_update in state.active_batch {
                let current_sn = self
                    .get_local_tracker_sn(ctx, &active_update.item.subject_id)
                    .await?;

                // Reached (or passed) the target: update done.
                if current_sn.is_some_and(|current_sn| {
                    current_sn >= active_update.item.target_sn
                }) {
                    Self::observe_update("completed");
                    continue;
                }

                // Progress since last check: reset the stall counter.
                if current_sn != active_update.last_seen_sn {
                    active_update.last_seen_sn = current_sn;
                    active_update.stalled_checks = 0;
                    still_running.push(active_update);
                    continue;
                }

                // No progress: count the stall and eventually give up on
                // this subject for the rest of the round.
                active_update.stalled_checks += 1;
                if active_update.stalled_checks < MAX_STALLED_UPDATE_CHECKS {
                    still_running.push(active_update);
                } else {
                    Self::observe_update("stalled");
                    warn!(
                        governance_id = %self.governance_id,
                        subject_id = %active_update.item.subject_id,
                        target_sn = active_update.item.target_sn,
                        current_sn = ?current_sn,
                        "Tracker sync update stalled, skipping subject"
                    );
                }
            }

            state.active_batch = still_running;
            if !state.active_batch.is_empty() {
                // Batch still in flight: re-arm the timeout with a fresh
                // nonce so older timeouts are ignored.
                let batch_nonce = self.allocate_nonce();
                state.batch_nonce = batch_nonce;
                self.state = SyncState::Updating(state);
                return self.schedule_update_timeout(ctx, batch_nonce).await;
            }
        }

        if state.pending_items.is_empty() {
            if let Some(after_subject_id) = state.next_cursor {
                // This page is fully processed; fetch the next one.
                return self
                    .start_fetch(
                        ctx,
                        state.peer,
                        state.governance_version,
                        Some(after_subject_id),
                    )
                    .await;
            }

            Self::observe_round("completed");
            return self.finish_cycle(ctx).await;
        }

        // Launch the next batch of up to update_batch_size updates.
        let batch_nonce = self.allocate_nonce();
        let peer = state.peer.clone();
        let mut active_batch = Vec::with_capacity(self.update_batch_size);
        for _ in 0..self.update_batch_size {
            let Some(item) = state.pending_items.pop_front() else {
                break;
            };
            let last_seen_sn =
                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
            self.request_tracker_update(&peer, &item.subject_id, last_seen_sn)
                .await?;
            Self::observe_update("launched");
            active_batch.push(ActiveUpdate {
                item,
                last_seen_sn,
                stalled_checks: 0,
            });
        }

        state.active_batch = active_batch;
        state.batch_nonce = batch_nonce;
        self.state = SyncState::Updating(state);

        self.schedule_update_timeout(ctx, batch_nonce).await
    }

    /// Starts a new sync round on a tick: picks a random peer and fetches
    /// the first page. Ignored when not a service or when a round is
    /// already in progress.
    async fn handle_tick(
        &mut self,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorError> {
        if !self.service || !matches!(self.state, SyncState::Idle) {
            return Ok(());
        }

        let Some(peer) = self.select_peer(ctx).await? else {
            // Nobody to sync with right now; try again next tick.
            Self::observe_round("no_peer");
            self.schedule_tick(ctx).await?;
            return Ok(());
        };

        let governance_version = self.get_governance_version(ctx).await?;
        Self::observe_round("started");
        self.start_fetch(ctx, peer, governance_version, None).await
    }

    /// Serves a page request from a peer: queries the local witnesses
    /// register for the requested page and sends it back to the requester's
    /// actor.
    async fn handle_network_request(
        &self,
        ctx: &ActorContext<Self>,
        request: TrackerSyncNetworkRequest,
    ) -> Result<(), ActorError> {
        let path = ActorPath::from(format!(
            "/user/node/subject_manager/{}/witnesses_register",
            self.governance_id
        ));
        let actor = ctx.system().get_actor::<WitnessesRegister>(&path).await?;
        let response = actor
            .ask(WitnessesRegisterMessage::ListCurrentWitnessSubjects {
                node: request.sender.clone(),
                governance_version: request.governance_version,
                after_subject_id: request.after_subject_id,
                limit: request.limit,
            })
            .await?;

        let WitnessesRegisterResponse::CurrentWitnessSubjects {
            governance_version,
            items,
            next_cursor,
        } = response
        else {
            return Err(ActorError::UnexpectedResponse {
                path,
                expected: "WitnessesRegisterResponse::CurrentWitnessSubjects"
                    .to_owned(),
            });
        };

        self.network
            .send_command(ave_network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: request.sender,
                        request_id: request.info.request_id,
                        version: request.info.version,
                        receiver_actor: request.receiver_actor,
                    },
                    message: ActorMessage::TrackerSyncRes {
                        request_nonce: request.request_nonce,
                        governance_version,
                        items,
                        next_cursor,
                    },
                },
            })
            .await
    }

    /// Ends the current round: returns to `Idle` and schedules the next
    /// tick (if running as a service).
    async fn finish_cycle(
        &mut self,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorError> {
        self.state = SyncState::Idle;
        self.schedule_tick(ctx).await
    }
}
621
622#[async_trait]
623impl Actor for TrackerSync {
624 type Event = ();
625 type Message = TrackerSyncMessage;
626 type Response = TrackerSyncResponse;
627
628 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
629 parent_span.map_or_else(
630 || info_span!("TrackerSync"),
631 |parent| info_span!(parent: parent, "TrackerSync"),
632 )
633 }
634
635 async fn pre_start(
636 &mut self,
637 ctx: &mut ActorContext<Self>,
638 ) -> Result<(), ActorError> {
639 self.schedule_tick(ctx).await
640 }
641}
642
// All sync progress lives in memory; nothing is persisted across restarts.
impl NotPersistentActor for TrackerSync {}
644
#[async_trait]
impl Handler<Self> for TrackerSync {
    /// Dispatches incoming messages to the sync state machine. Stale
    /// timeouts and responses (nonce/peer mismatch) are silently ignored.
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: TrackerSyncMessage,
        ctx: &mut ActorContext<Self>,
    ) -> Result<TrackerSyncResponse, ActorError> {
        match msg {
            TrackerSyncMessage::Tick => {
                // A failed tick ends the round cleanly so the next tick can
                // start fresh instead of leaving the actor stuck mid-state.
                if let Err(error) = self.handle_tick(ctx).await {
                    Self::observe_round("error");
                    warn!(
                        governance_id = %self.governance_id,
                        error = %error,
                        "Tracker sync tick failed"
                    );
                    self.finish_cycle(ctx).await?;
                }
            }
            TrackerSyncMessage::FetchTimeout { request_nonce } => {
                // Only meaningful if we are still waiting on exactly this
                // fetch; otherwise the timeout is stale.
                let timed_out = matches!(
                    &self.state,
                    SyncState::Fetching(state)
                        if state.request_nonce == request_nonce
                );

                if timed_out {
                    Self::observe_round("timeout");
                    debug!(
                        governance_id = %self.governance_id,
                        request_nonce = request_nonce,
                        "Tracker sync fetch timed out"
                    );
                    self.finish_cycle(ctx).await?;
                }
            }
            TrackerSyncMessage::UpdateTimeout { batch_nonce } => {
                // Only re-check the batch this timeout was armed for.
                let timed_out = matches!(
                    &self.state,
                    SyncState::Updating(state)
                        if state.batch_nonce == batch_nonce
                );

                if timed_out {
                    self.advance_update_phase(ctx).await?;
                }
            }
            TrackerSyncMessage::NetworkRequest(request) => {
                self.handle_network_request(ctx, request).await?;
            }
            TrackerSyncMessage::NetworkResponse(
                TrackerSyncNetworkResponse {
                    peer,
                    request_nonce,
                    governance_version,
                    items,
                    next_cursor,
                },
            ) => {
                // Accept the response only if it matches the in-flight
                // fetch (same peer and nonce); anything else is stale.
                let (active_peer, active_governance_version) = match &self.state
                {
                    SyncState::Fetching(state)
                        if state.peer == peer
                            && state.request_nonce == request_nonce =>
                    {
                        (state.peer.clone(), state.governance_version)
                    }
                    _ => return Ok(TrackerSyncResponse::None),
                };

                debug!(
                    governance_id = %self.governance_id,
                    peer = %peer,
                    request_nonce = request_nonce,
                    governance_version = governance_version,
                    item_count = items.len(),
                    has_next = next_cursor.is_some(),
                    "Received tracker sync page"
                );

                let local_governance_version =
                    self.get_governance_version(ctx).await?;
                let effective_governance_version =
                    local_governance_version.max(governance_version);

                // Either side moved to a newer governance version: restart
                // paging from scratch at that version.
                if effective_governance_version != active_governance_version {
                    Self::observe_round("gov_changed");
                    self.start_fetch(
                        ctx,
                        active_peer,
                        effective_governance_version,
                        None,
                    )
                    .await?;
                    return Ok(TrackerSyncResponse::None);
                }

                let pending_items =
                    self.build_pending_updates(ctx, items).await?;
                if pending_items.is_empty() {
                    if let Some(after_subject_id) = next_cursor {
                        // NOTE(review): this passes the peer-reported
                        // governance_version rather than
                        // active_governance_version; they can differ if the
                        // peer answered with an older version — confirm the
                        // peer always echoes the requested version.
                        self.start_fetch(
                            ctx,
                            active_peer,
                            governance_version,
                            Some(after_subject_id),
                        )
                        .await?;
                    } else {
                        Self::observe_round("completed");
                        self.finish_cycle(ctx).await?;
                    }
                    return Ok(TrackerSyncResponse::None);
                }

                self.start_update_phase(
                    ctx,
                    active_peer,
                    governance_version,
                    pending_items,
                    next_cursor,
                )
                .await?;
            }
        }

        Ok(TrackerSyncResponse::None)
    }
}