1use std::{
2 collections::{HashSet, VecDeque},
3 sync::Arc,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use ave_actors::{
9 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
10 NotPersistentActor, Response,
11};
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use network::ComunicateInfo;
14use rand::seq::IteratorRandom;
15use tracing::{Span, debug, info_span, warn};
16
17use crate::auth::{Auth, AuthMessage, AuthResponse};
18use crate::governance::witnesses_register::{
19 CurrentWitnessSubject, WitnessesRegister, WitnessesRegisterMessage,
20 WitnessesRegisterResponse,
21};
22use crate::governance::{
23 Governance, GovernanceMessage, GovernanceResponse, model::WitnessesData,
24};
25use crate::helpers::network::{
26 ActorMessage, NetworkMessage, service::NetworkSender,
27};
28use crate::metrics::try_core_metrics;
29use crate::model::common::node::get_subject_data;
30use crate::model::common::subject::acquire_subject;
31use crate::node::SubjectData;
32use crate::tracker::{Tracker, TrackerMessage, TrackerResponse};
33
/// Messages handled by the [`TrackerSync`] actor.
#[derive(Debug, Clone)]
pub enum TrackerSyncMessage {
    /// Periodic trigger that may start a new sync round (service mode only).
    Tick,
    /// Fired after `response_timeout`; aborts the fetch identified by
    /// `request_nonce` if that exact request is still pending.
    FetchTimeout { request_nonce: u64 },
    /// Fired after `update_timeout`; re-evaluates the update batch
    /// identified by `batch_nonce` (stale nonces are ignored).
    UpdateTimeout { batch_nonce: u64 },
    /// A page request received from a remote peer.
    NetworkRequest(TrackerSyncNetworkRequest),
    /// A page of witness subjects received from a remote peer.
    NetworkResponse(TrackerSyncNetworkResponse),
}

impl Message for TrackerSyncMessage {}
44
/// Responses produced by the [`TrackerSync`] actor.
#[derive(Debug, Clone)]
pub enum TrackerSyncResponse {
    /// All messages are fire-and-forget; no meaningful reply is produced.
    None,
}

impl Response for TrackerSyncResponse {}
51
/// State for an outstanding page fetch.
#[derive(Debug, Clone)]
struct FetchState {
    /// Peer the page was requested from.
    peer: PublicKey,
    /// Governance version the current round is pinned to.
    governance_version: u64,
    /// Nonce used to match the response / timeout to this request.
    request_nonce: u64,
}
58
/// State for the update phase: drains `pending_items` in batches while
/// remembering how to fetch the next page from the same peer.
#[derive(Debug, Clone)]
struct UpdateState {
    /// Peer the current round is synchronizing against.
    peer: PublicKey,
    /// Governance version the round started with; a mismatch restarts it.
    governance_version: u64,
    /// Subjects still waiting for an update request to be launched.
    pending_items: VecDeque<CurrentWitnessSubject>,
    /// Cursor for the next page, if the peer reported more results.
    next_cursor: Option<DigestIdentifier>,
    /// Updates currently in flight and being progress-checked.
    active_batch: Vec<ActiveUpdate>,
    /// Nonce identifying the batch the pending `UpdateTimeout` belongs to.
    batch_nonce: u64,
}
68
/// One in-flight tracker update being watched for progress.
#[derive(Debug, Clone)]
struct ActiveUpdate {
    /// The subject being updated, including its peer-reported target sn.
    item: CurrentWitnessSubject,
    /// Local sequence number observed at the previous progress check.
    last_seen_sn: Option<u64>,
    /// Consecutive checks without progress; the update is abandoned once
    /// this reaches `MAX_STALLED_UPDATE_CHECKS`.
    stalled_checks: u8,
}
75
/// Phase of the sync state machine.
#[derive(Debug, Clone, Default)]
enum SyncState {
    /// No round in progress; waiting for the next tick.
    #[default]
    Idle,
    /// A page request to a peer is outstanding.
    Fetching(FetchState),
    /// Updates for the current page are being launched and monitored.
    Updating(UpdateState),
}
83
/// Tunables for [`TrackerSync`].
#[derive(Debug, Clone)]
pub struct TrackerSyncConfig {
    /// When `true`, the actor runs periodic sync rounds on its own;
    /// when `false`, it only answers remote page requests.
    pub service: bool,
    /// Delay between the end of one sync round and the next tick.
    pub tick_interval: Duration,
    /// How long to wait for a page response before aborting the round.
    pub response_timeout: Duration,
    /// Maximum items requested per page (clamped to >= 1 in `new`).
    pub page_size: usize,
    /// Maximum concurrent tracker updates per batch (clamped to >= 1).
    pub update_batch_size: usize,
    /// Interval between progress checks on an in-flight update batch.
    pub update_timeout: Duration,
}
93
/// Page request received from a remote peer.
#[derive(Debug, Clone)]
pub struct TrackerSyncNetworkRequest {
    /// Peer-chosen nonce echoed back so the peer can match our response.
    pub request_nonce: u64,
    /// Governance version the peer is iterating at.
    pub governance_version: u64,
    /// Pagination cursor: return subjects after this id (`None` = page one).
    pub after_subject_id: Option<DigestIdentifier>,
    /// Maximum number of items to return in this page.
    pub limit: usize,
    /// Transport metadata (request id / version) echoed in the reply.
    pub info: ComunicateInfo,
    /// Public key of the requesting peer.
    pub sender: PublicKey,
    /// Actor path on the peer that should receive the response.
    pub receiver_actor: String,
}
104
/// One page of witness subjects received from a remote peer.
#[derive(Debug, Clone)]
pub struct TrackerSyncNetworkResponse {
    /// Peer that produced the page.
    pub peer: PublicKey,
    /// Nonce of the request this page answers.
    pub request_nonce: u64,
    /// Governance version the page was computed at.
    pub governance_version: u64,
    /// Witness subjects together with their target sequence numbers.
    pub items: Vec<CurrentWitnessSubject>,
    /// Cursor for the following page, present when more results exist.
    pub next_cursor: Option<DigestIdentifier>,
}
113
/// Actor that keeps local tracker subjects for one governance in step with
/// a randomly selected witness peer.
///
/// A sync round: fetch pages of the peer's current witness subjects, diff
/// each page against local state, request ledger updates for the subjects
/// that lag behind, and watch those updates until they complete or stall.
pub struct TrackerSync {
    /// Governance subject this instance synchronizes for.
    governance_id: DigestIdentifier,
    /// Our own public key; always excluded from peer selection.
    our_key: Arc<PublicKey>,
    /// Outbound network service handle.
    network: Arc<NetworkSender>,
    /// Whether this instance runs periodic rounds (see `TrackerSyncConfig`).
    service: bool,
    tick_interval: Duration,
    response_timeout: Duration,
    page_size: usize,
    update_batch_size: usize,
    update_timeout: Duration,
    /// Counter used to mint request/batch nonces (starts at 1).
    next_nonce: u64,
    /// Current phase of the sync state machine.
    state: SyncState,
}
127
128const MAX_STALLED_UPDATE_CHECKS: u8 = 3;
129
impl TrackerSync {
    /// Records a sync-round outcome metric, when metrics are enabled.
    fn observe_round(result: &'static str) {
        // Metrics are optional: skip silently when no registry is set up.
        let Some(metrics) = try_core_metrics() else {
            return;
        };
        metrics.observe_tracker_sync_round(result);
    }
136
137 fn observe_update(result: &'static str) {
138 if let Some(metrics) = try_core_metrics() {
139 metrics.observe_tracker_sync_update(result);
140 }
141 }
142
143 pub fn new(
144 governance_id: DigestIdentifier,
145 our_key: Arc<PublicKey>,
146 network: Arc<NetworkSender>,
147 config: TrackerSyncConfig,
148 ) -> Self {
149 Self {
150 governance_id,
151 our_key,
152 network,
153 service: config.service,
154 tick_interval: config.tick_interval,
155 response_timeout: config.response_timeout,
156 page_size: config.page_size.max(1),
157 update_batch_size: config.update_batch_size.max(1),
158 update_timeout: config.update_timeout,
159 next_nonce: 0,
160 state: SyncState::Idle,
161 }
162 }
163
164 const fn allocate_nonce(&mut self) -> u64 {
165 self.next_nonce += 1;
166 self.next_nonce
167 }
168
169 async fn schedule_tick(
170 &self,
171 ctx: &ActorContext<Self>,
172 ) -> Result<(), ActorError> {
173 if !self.service {
174 return Ok(());
175 }
176
177 let actor = ctx.reference().await?;
178 let delay = self.tick_interval;
179 tokio::spawn(async move {
180 tokio::time::sleep(delay).await;
181 let _ = actor.tell(TrackerSyncMessage::Tick).await;
182 });
183 Ok(())
184 }
185
186 async fn schedule_fetch_timeout(
187 &self,
188 ctx: &ActorContext<Self>,
189 request_nonce: u64,
190 ) -> Result<(), ActorError> {
191 let actor = ctx.reference().await?;
192 let delay = self.response_timeout;
193 tokio::spawn(async move {
194 tokio::time::sleep(delay).await;
195 let _ = actor
196 .tell(TrackerSyncMessage::FetchTimeout { request_nonce })
197 .await;
198 });
199 Ok(())
200 }
201
202 async fn schedule_update_timeout(
203 &self,
204 ctx: &ActorContext<Self>,
205 batch_nonce: u64,
206 ) -> Result<(), ActorError> {
207 let actor = ctx.reference().await?;
208 let delay = self.update_timeout;
209 tokio::spawn(async move {
210 tokio::time::sleep(delay).await;
211 let _ = actor
212 .tell(TrackerSyncMessage::UpdateTimeout { batch_nonce })
213 .await;
214 });
215 Ok(())
216 }
217
218 async fn get_governance_version(
219 &self,
220 ctx: &ActorContext<Self>,
221 ) -> Result<u64, ActorError> {
222 let path = ActorPath::from(format!(
223 "/user/node/subject_manager/{}",
224 self.governance_id
225 ));
226 let actor = ctx.system().get_actor::<Governance>(&path).await?;
227 let response = actor.ask(GovernanceMessage::GetVersion).await?;
228
229 match response {
230 GovernanceResponse::Version(version) => Ok(version),
231 _ => Err(ActorError::UnexpectedResponse {
232 path,
233 expected: "GovernanceResponse::Version".to_owned(),
234 }),
235 }
236 }
237
238 async fn get_governance_peers(
239 &self,
240 ctx: &ActorContext<Self>,
241 ) -> Result<HashSet<PublicKey>, ActorError> {
242 let path = ActorPath::from(format!(
243 "/user/node/subject_manager/{}",
244 self.governance_id
245 ));
246 let actor = ctx.system().get_actor::<Governance>(&path).await?;
247 let response = actor.ask(GovernanceMessage::GetGovernance).await?;
248
249 let GovernanceResponse::Governance(governance) = response else {
250 return Err(ActorError::UnexpectedResponse {
251 path,
252 expected: "GovernanceResponse::Governance".to_owned(),
253 });
254 };
255
256 governance.get_witnesses(WitnessesData::Gov).map_err(|e| {
257 ActorError::Functional {
258 description: e.to_string(),
259 }
260 })
261 }
262
263 async fn get_auth_peers(
264 &self,
265 ctx: &ActorContext<Self>,
266 ) -> Result<HashSet<PublicKey>, ActorError> {
267 let auth_path = ActorPath::from("/user/node/auth");
268 let auth = ctx.system().get_actor::<Auth>(&auth_path).await?;
269 match auth
270 .ask(AuthMessage::GetAuth {
271 subject_id: self.governance_id.clone(),
272 })
273 .await
274 {
275 Ok(AuthResponse::Witnesses(mut witnesses)) => {
276 witnesses.remove(&*self.our_key);
277 Ok(witnesses)
278 }
279 Ok(_) => Ok(HashSet::new()),
280 Err(ActorError::Functional { .. }) => Ok(HashSet::new()),
281 Err(error) => Err(error),
282 }
283 }
284
285 async fn select_peer(
286 &self,
287 ctx: &ActorContext<Self>,
288 ) -> Result<Option<PublicKey>, ActorError> {
289 let mut peers = self.get_governance_peers(ctx).await?;
290 peers.extend(self.get_auth_peers(ctx).await?);
291 peers.remove(&*self.our_key);
292
293 let mut rng = rand::rng();
294 Ok(peers.into_iter().choose(&mut rng))
295 }
296
    /// Sends a page request for the peer's current witness subjects and
    /// transitions into the fetching phase.
    ///
    /// Ordering matters: the `Fetching` state is only committed after the
    /// network send succeeds, and the timeout is scheduled last, so a
    /// timeout can never fire for a request that was never sent.
    async fn start_fetch(
        &mut self,
        ctx: &ActorContext<Self>,
        peer: PublicKey,
        governance_version: u64,
        after_subject_id: Option<DigestIdentifier>,
    ) -> Result<(), ActorError> {
        let request_nonce = self.allocate_nonce();

        self.network
            .send_command(network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: peer.clone(),
                        request_id: String::default(),
                        version: 0,
                        // The peer's tracker-sync actor for this governance.
                        receiver_actor: format!(
                            "/user/node/subject_manager/{}/tracker_sync",
                            self.governance_id
                        ),
                    },
                    message: ActorMessage::TrackerSyncReq {
                        subject_id: self.governance_id.clone(),
                        request_nonce,
                        governance_version,
                        after_subject_id,
                        limit: self.page_size,
                        // Where the peer should address its response.
                        receiver_actor: ctx.path().to_string(),
                    },
                },
            })
            .await?;

        self.state = SyncState::Fetching(FetchState {
            peer,
            governance_version,
            request_nonce,
        });
        self.schedule_fetch_timeout(ctx, request_nonce).await
    }
337
338 async fn get_local_tracker_sn(
339 &self,
340 ctx: &mut ActorContext<Self>,
341 subject_id: &DigestIdentifier,
342 ) -> Result<Option<u64>, ActorError> {
343 let Some(data) = get_subject_data(ctx, subject_id).await? else {
344 return Ok(None);
345 };
346
347 if !matches!(data, SubjectData::Tracker { .. }) {
348 return Ok(None);
349 }
350
351 let requester = format!("tracker_sync_local:{}", subject_id);
352 let lease =
353 acquire_subject(ctx, subject_id, requester, None, true).await?;
354 let path = ActorPath::from(format!(
355 "/user/node/subject_manager/{}",
356 subject_id
357 ));
358 let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
359 let response = tracker.ask(TrackerMessage::GetMetadata).await;
360 lease.finish(ctx).await?;
361 let response = response?;
362
363 match response {
364 TrackerResponse::Metadata(metadata) => Ok(Some(metadata.sn)),
365 _ => Err(ActorError::UnexpectedResponse {
366 path,
367 expected: "TrackerResponse::Metadata".to_owned(),
368 }),
369 }
370 }
371
    /// Asks `peer`'s distributor actor for ledger updates of `subject_id`.
    ///
    /// Fire-and-forget: the updated entries arrive through the regular
    /// distribution flow, and progress is observed indirectly via
    /// `get_local_tracker_sn`.
    ///
    /// NOTE(review): `actual_sn` appears to report our current local
    /// sequence number so the peer sends only what is missing — confirm
    /// against the distributor protocol.
    async fn request_tracker_update(
        &self,
        peer: &PublicKey,
        subject_id: &DigestIdentifier,
        actual_sn: Option<u64>,
    ) -> Result<(), ActorError> {
        self.network
            .send_command(network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: peer.clone(),
                        request_id: String::default(),
                        version: 0,
                        // The peer-side distributor actor for this subject.
                        receiver_actor: format!(
                            "/user/node/distributor_{}",
                            subject_id
                        ),
                    },
                    message: ActorMessage::DistributionLedgerReq {
                        actual_sn,
                        subject_id: subject_id.clone(),
                    },
                },
            })
            .await
    }
398
399 async fn build_pending_updates(
400 &self,
401 ctx: &mut ActorContext<Self>,
402 items: Vec<CurrentWitnessSubject>,
403 ) -> Result<VecDeque<CurrentWitnessSubject>, ActorError> {
404 let mut pending_items = VecDeque::new();
405
406 for item in items {
407 let local_sn =
408 self.get_local_tracker_sn(ctx, &item.subject_id).await?;
409 if local_sn.is_none_or(|local_sn| local_sn < item.target_sn) {
410 pending_items.push_back(item);
411 }
412 }
413
414 Ok(pending_items)
415 }
416
417 async fn start_update_phase(
418 &mut self,
419 ctx: &mut ActorContext<Self>,
420 peer: PublicKey,
421 governance_version: u64,
422 pending_items: VecDeque<CurrentWitnessSubject>,
423 next_cursor: Option<DigestIdentifier>,
424 ) -> Result<(), ActorError> {
425 self.state = SyncState::Updating(UpdateState {
426 peer,
427 governance_version,
428 pending_items,
429 next_cursor,
430 active_batch: Vec::new(),
431 batch_nonce: 0,
432 });
433
434 self.advance_update_phase(ctx).await
435 }
436
    /// Drives the update phase one step (called on entry to the phase and
    /// on every matching `UpdateTimeout`).
    ///
    /// In order:
    /// 1. If the local governance version changed since the round started,
    ///    restart the round with a fresh fetch from the first page.
    /// 2. Re-check every in-flight update: completed ones are dropped,
    ///    progress resets the stall counter, and updates without progress
    ///    for `MAX_STALLED_UPDATE_CHECKS` consecutive checks are abandoned.
    ///    If any remain in flight, just re-arm the check timer.
    /// 3. With nothing pending: fetch the next page if a cursor remains,
    ///    otherwise the round is complete.
    /// 4. Otherwise launch up to `update_batch_size` new update requests
    ///    and schedule their first progress check.
    async fn advance_update_phase(
        &mut self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError> {
        let current_local_version = self.get_governance_version(ctx).await?;
        // Take the state out (leaving Idle); every path below either puts
        // it back or deliberately moves to another phase. A non-updating
        // state means this was a stale timeout: nothing to do.
        let SyncState::Updating(mut state) = std::mem::take(&mut self.state)
        else {
            return Ok(());
        };

        if current_local_version != state.governance_version {
            // Governance moved underneath us: restart from the first page.
            return self
                .start_fetch(ctx, state.peer, current_local_version, None)
                .await;
        }

        if !state.active_batch.is_empty() {
            let mut still_running =
                Vec::with_capacity(state.active_batch.len());
            for mut active_update in state.active_batch {
                let current_sn = self
                    .get_local_tracker_sn(ctx, &active_update.item.subject_id)
                    .await?;

                // Target reached (or passed): this update is done.
                if current_sn.is_some_and(|current_sn| {
                    current_sn >= active_update.item.target_sn
                }) {
                    Self::observe_update("completed");
                    continue;
                }

                // Progress was made since the last check: reset the counter.
                if current_sn != active_update.last_seen_sn {
                    active_update.last_seen_sn = current_sn;
                    active_update.stalled_checks = 0;
                    still_running.push(active_update);
                    continue;
                }

                // No progress: count the stall, abandon after the cap.
                active_update.stalled_checks += 1;
                if active_update.stalled_checks < MAX_STALLED_UPDATE_CHECKS {
                    still_running.push(active_update);
                } else {
                    Self::observe_update("stalled");
                    warn!(
                        governance_id = %self.governance_id,
                        subject_id = %active_update.item.subject_id,
                        target_sn = active_update.item.target_sn,
                        current_sn = ?current_sn,
                        "Tracker sync update stalled, skipping subject"
                    );
                }
            }

            state.active_batch = still_running;
            if !state.active_batch.is_empty() {
                // Still waiting on some updates: re-arm the timer under a
                // fresh nonce so earlier timeouts become stale.
                let batch_nonce = self.allocate_nonce();
                state.batch_nonce = batch_nonce;
                self.state = SyncState::Updating(state);
                return self.schedule_update_timeout(ctx, batch_nonce).await;
            }
        }

        if state.pending_items.is_empty() {
            if let Some(after_subject_id) = state.next_cursor {
                // Page fully processed; fetch the next one.
                return self
                    .start_fetch(
                        ctx,
                        state.peer,
                        state.governance_version,
                        Some(after_subject_id),
                    )
                    .await;
            }

            // No more pages and nothing in flight: round complete.
            Self::observe_round("completed");
            return self.finish_cycle(ctx).await;
        }

        // Launch the next batch of update requests.
        let batch_nonce = self.allocate_nonce();
        let peer = state.peer.clone();
        let mut active_batch = Vec::with_capacity(self.update_batch_size);
        for _ in 0..self.update_batch_size {
            let Some(item) = state.pending_items.pop_front() else {
                break;
            };
            let last_seen_sn =
                self.get_local_tracker_sn(ctx, &item.subject_id).await?;
            self.request_tracker_update(&peer, &item.subject_id, last_seen_sn)
                .await?;
            Self::observe_update("launched");
            active_batch.push(ActiveUpdate {
                item,
                last_seen_sn,
                stalled_checks: 0,
            });
        }

        state.active_batch = active_batch;
        state.batch_nonce = batch_nonce;
        self.state = SyncState::Updating(state);

        self.schedule_update_timeout(ctx, batch_nonce).await
    }
540
541 async fn handle_tick(
542 &mut self,
543 ctx: &ActorContext<Self>,
544 ) -> Result<(), ActorError> {
545 if !self.service || !matches!(self.state, SyncState::Idle) {
546 return Ok(());
547 }
548
549 let Some(peer) = self.select_peer(ctx).await? else {
550 Self::observe_round("no_peer");
551 self.schedule_tick(ctx).await?;
552 return Ok(());
553 };
554
555 let governance_version = self.get_governance_version(ctx).await?;
556 Self::observe_round("started");
557 self.start_fetch(ctx, peer, governance_version, None).await
558 }
559
    /// Serves a sync page request from a remote peer: asks the local
    /// witnesses register for the requested slice and sends it back.
    async fn handle_network_request(
        &self,
        ctx: &ActorContext<Self>,
        request: TrackerSyncNetworkRequest,
    ) -> Result<(), ActorError> {
        let path = ActorPath::from(format!(
            "/user/node/subject_manager/{}/witnesses_register",
            self.governance_id
        ));
        let actor = ctx.system().get_actor::<WitnessesRegister>(&path).await?;
        let response = actor
            .ask(WitnessesRegisterMessage::ListCurrentWitnessSubjects {
                node: request.sender.clone(),
                governance_version: request.governance_version,
                after_subject_id: request.after_subject_id,
                limit: request.limit,
            })
            .await?;

        let WitnessesRegisterResponse::CurrentWitnessSubjects {
            governance_version,
            items,
            next_cursor,
        } = response
        else {
            return Err(ActorError::UnexpectedResponse {
                path,
                expected: "WitnessesRegisterResponse::CurrentWitnessSubjects"
                    .to_owned(),
            });
        };

        // Echo the requester's transport metadata and nonce so the peer
        // can correlate this page with its outstanding request.
        self.network
            .send_command(network::CommandHelper::SendMessage {
                message: NetworkMessage {
                    info: ComunicateInfo {
                        receiver: request.sender,
                        request_id: request.info.request_id,
                        version: request.info.version,
                        receiver_actor: request.receiver_actor,
                    },
                    message: ActorMessage::TrackerSyncRes {
                        request_nonce: request.request_nonce,
                        governance_version,
                        items,
                        next_cursor,
                    },
                },
            })
            .await
    }
611
612 async fn finish_cycle(
613 &mut self,
614 ctx: &ActorContext<Self>,
615 ) -> Result<(), ActorError> {
616 self.state = SyncState::Idle;
617 self.schedule_tick(ctx).await
618 }
619}
620
621#[async_trait]
622impl Actor for TrackerSync {
623 type Event = ();
624 type Message = TrackerSyncMessage;
625 type Response = TrackerSyncResponse;
626
627 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
628 parent_span.map_or_else(
629 || info_span!("TrackerSync"),
630 |parent| info_span!(parent: parent, "TrackerSync"),
631 )
632 }
633
634 async fn pre_start(
635 &mut self,
636 ctx: &mut ActorContext<Self>,
637 ) -> Result<(), ActorError> {
638 self.schedule_tick(ctx).await
639 }
640}
641
642impl NotPersistentActor for TrackerSync {}
643
#[async_trait]
impl Handler<Self> for TrackerSync {
    /// Central dispatch for all [`TrackerSyncMessage`]s; always replies
    /// with [`TrackerSyncResponse::None`].
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: TrackerSyncMessage,
        ctx: &mut ActorContext<Self>,
    ) -> Result<TrackerSyncResponse, ActorError> {
        match msg {
            TrackerSyncMessage::Tick => {
                // A failing tick aborts the round but keeps the actor
                // alive: log, reset to idle and re-arm the next tick.
                if let Err(error) = self.handle_tick(ctx).await {
                    Self::observe_round("error");
                    warn!(
                        governance_id = %self.governance_id,
                        error = %error,
                        "Tracker sync tick failed"
                    );
                    self.finish_cycle(ctx).await?;
                }
            }
            TrackerSyncMessage::FetchTimeout { request_nonce } => {
                // Only act when we still await the exact request this
                // timeout was armed for; anything else is a stale timer.
                let timed_out = matches!(
                    &self.state,
                    SyncState::Fetching(state)
                        if state.request_nonce == request_nonce
                );

                if timed_out {
                    Self::observe_round("timeout");
                    warn!(
                        governance_id = %self.governance_id,
                        request_nonce = request_nonce,
                        "Tracker sync fetch timed out"
                    );
                    self.finish_cycle(ctx).await?;
                }
            }
            TrackerSyncMessage::UpdateTimeout { batch_nonce } => {
                // Stale nonces (from an already-replaced batch) are ignored.
                let timed_out = matches!(
                    &self.state,
                    SyncState::Updating(state)
                        if state.batch_nonce == batch_nonce
                );

                if timed_out {
                    self.advance_update_phase(ctx).await?;
                }
            }
            TrackerSyncMessage::NetworkRequest(request) => {
                // Serve a page of our witness subjects to a remote peer.
                self.handle_network_request(ctx, request).await?;
            }
            TrackerSyncMessage::NetworkResponse(
                TrackerSyncNetworkResponse {
                    peer,
                    request_nonce,
                    governance_version,
                    items,
                    next_cursor,
                },
            ) => {
                // Accept the page only if it matches the fetch currently
                // in flight (same peer and nonce); otherwise drop it.
                let (active_peer, active_governance_version) = match &self.state
                {
                    SyncState::Fetching(state)
                        if state.peer == peer
                            && state.request_nonce == request_nonce =>
                    {
                        (state.peer.clone(), state.governance_version)
                    }
                    _ => return Ok(TrackerSyncResponse::None),
                };

                debug!(
                    governance_id = %self.governance_id,
                    peer = %peer,
                    request_nonce = request_nonce,
                    governance_version = governance_version,
                    item_count = items.len(),
                    has_next = next_cursor.is_some(),
                    "Received tracker sync page"
                );

                // Guard against governance changes on either side: if the
                // newer of (local, peer-reported) versions differs from the
                // version this round was pinned to, restart from page one.
                let local_governance_version =
                    self.get_governance_version(ctx).await?;
                let effective_governance_version =
                    local_governance_version.max(governance_version);

                if effective_governance_version != active_governance_version {
                    Self::observe_round("gov_changed");
                    self.start_fetch(
                        ctx,
                        active_peer,
                        effective_governance_version,
                        None,
                    )
                    .await?;
                    return Ok(TrackerSyncResponse::None);
                }

                // Everything on this page is already up to date: page
                // onwards if a cursor remains, otherwise finish the round.
                let pending_items =
                    self.build_pending_updates(ctx, items).await?;
                if pending_items.is_empty() {
                    if let Some(after_subject_id) = next_cursor {
                        self.start_fetch(
                            ctx,
                            active_peer,
                            governance_version,
                            Some(after_subject_id),
                        )
                        .await?;
                    } else {
                        Self::observe_round("completed");
                        self.finish_cycle(ctx).await?;
                    }
                    return Ok(TrackerSyncResponse::None);
                }

                self.start_update_phase(
                    ctx,
                    active_peer,
                    governance_version,
                    pending_items,
                    next_cursor,
                )
                .await?;
            }
        }

        Ok(TrackerSyncResponse::None)
    }
}