1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use ave_actors::{
7 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
8 NotPersistentActor,
9};
10
11use async_trait::async_trait;
12use ave_common::identity::{DigestIdentifier, PublicKey};
13use ave_network::ComunicateInfo;
14use serde::{Deserialize, Serialize};
15use tracing::{Span, debug, error, info_span, warn};
16use updater::{Updater, UpdaterMessage};
17
18use crate::{
19 NetworkMessage,
20 governance::witnesses_register::{
21 TrackerDeliveryMode, TrackerDeliveryRange,
22 },
23 helpers::network::{ActorMessage, service::NetworkSender},
24 model::common::{emit_fail, subject::get_local_subject_sn},
25 request::manager::{RequestManager, RequestManagerMessage},
26};
27
28pub mod updater;
29
30#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
31pub enum UpdateSubjectKind {
32 Governance,
33 Tracker,
34}
35
36#[derive(Debug, Serialize, Deserialize, Clone)]
37pub struct UpdateWitnessOffer {
38 pub kind: UpdateSubjectKind,
39 pub sn: u64,
40 pub clear_sn: Option<u64>,
41 pub ranges: Vec<TrackerDeliveryRange>,
42}
43
/// Outcome of starting an update round.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStartMode {
    /// The single-witness shortcut was taken: no updater children were
    /// created and the ledger was requested directly.
    Direct,
    /// Updater children were created and at least one witness is pending.
    Sweep,
    /// No witness could be asked in this round.
    Empty,
}
50
51#[derive(Clone, Debug, Serialize, Deserialize)]
52pub enum UpdateType {
53 Auth,
54 Request {
55 subject_id: DigestIdentifier,
56 id: DigestIdentifier,
57 },
58}
59
60pub struct UpdateNew {
61 pub subject_id: DigestIdentifier,
62 pub witnesses: HashSet<PublicKey>,
63 pub update_type: UpdateType,
64 pub network: Arc<NetworkSender>,
65 pub our_sn: Option<u64>,
66 pub subject_kind_hint: Option<UpdateSubjectKind>,
67 pub round_retry_interval_secs: u64,
68 pub max_round_retries: usize,
69 pub witness_retry_count: usize,
70 pub witness_retry_interval_secs: u64,
71}
72
73#[derive(Clone, Debug)]
74pub struct Update {
75 subject_id: DigestIdentifier,
76 witnesses: HashSet<PublicKey>,
77 all_witnesses: HashSet<PublicKey>,
78 offers: HashMap<PublicKey, UpdateWitnessOffer>,
79 our_sn: Option<u64>,
80 update_type: UpdateType,
81 network: Arc<NetworkSender>,
82 retry_round: u64,
83 retry_token: u64,
84 retry_attempt: usize,
85 subject_kind_hint: Option<UpdateSubjectKind>,
86 round_retry_interval_secs: u64,
87 max_round_retries: usize,
88 witness_retry_count: usize,
89 witness_retry_interval_secs: u64,
90}
91
92impl Update {
93 pub fn new(data: UpdateNew) -> Self {
94 Self {
95 network: data.network,
96 subject_id: data.subject_id,
97 witnesses: data.witnesses.clone(),
98 all_witnesses: data.witnesses,
99 update_type: data.update_type,
100 our_sn: data.our_sn,
101 offers: HashMap::new(),
102 retry_round: 0,
103 retry_token: 0,
104 retry_attempt: 0,
105 subject_kind_hint: data.subject_kind_hint,
106 round_retry_interval_secs: data.round_retry_interval_secs,
107 max_round_retries: data.max_round_retries,
108 witness_retry_count: data.witness_retry_count,
109 witness_retry_interval_secs: data.witness_retry_interval_secs,
110 }
111 }
112
113 const fn should_retry_auth_rounds(&self) -> bool {
114 matches!(self.update_type, UpdateType::Auth)
115 && !matches!(
116 self.subject_kind_hint,
117 Some(UpdateSubjectKind::Governance)
118 )
119 }
120
121 fn has_progress(&self, sn: u64) -> bool {
122 self.our_sn.is_none_or(|our_sn| sn > our_sn)
123 }
124
125 fn next_needed_sn(&self) -> u64 {
126 self.our_sn.map_or(0, |sn| sn.saturating_add(1))
127 }
128
129 fn next_tracker_range<'a>(
130 &self,
131 ranges: &'a [TrackerDeliveryRange],
132 ) -> Option<&'a TrackerDeliveryRange> {
133 let next_sn = self.next_needed_sn();
134 ranges
135 .iter()
136 .find(|range| range.from_sn <= next_sn && next_sn <= range.to_sn)
137 }
138
139 const fn tracker_range_rank(mode: &TrackerDeliveryMode) -> u8 {
140 match mode {
141 TrackerDeliveryMode::Clear => 1,
142 TrackerDeliveryMode::Opaque => 0,
143 }
144 }
145
146 fn insert_offer(
147 &mut self,
148 sender: PublicKey,
149 offer: Option<UpdateWitnessOffer>,
150 ) {
151 if let Some(offer) = offer
152 && self.has_progress(offer.sn)
153 {
154 self.offers.insert(sender, offer);
155 }
156 }
157
158 fn select_tracker_offer(
159 &self,
160 ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
161 self.offers
162 .iter()
163 .filter(|(_, offer)| offer.kind == UpdateSubjectKind::Tracker)
164 .filter_map(|(sender, offer)| {
165 if !self.has_progress(offer.sn) {
166 return None;
167 }
168
169 let range = self.next_tracker_range(&offer.ranges)?;
170 let target_sn = range.to_sn.min(offer.sn);
171 Some((
172 sender.clone(),
173 offer.clone(),
174 target_sn,
175 (
176 Self::tracker_range_rank(&range.mode),
177 target_sn,
178 offer.sn,
179 ),
180 ))
181 })
182 .max_by_key(|(.., rank)| *rank)
183 .map(|(sender, offer, target_sn, _)| (sender, offer, target_sn))
184 }
185
186 fn select_governance_offer(
187 &self,
188 ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
189 self.offers
190 .iter()
191 .filter(|(_, offer)| offer.kind == UpdateSubjectKind::Governance)
192 .filter(|(_, offer)| self.has_progress(offer.sn))
193 .max_by_key(|(_, offer)| offer.sn)
194 .map(|(sender, offer)| (sender.clone(), offer.clone(), offer.sn))
195 }
196
197 fn select_next_request(
198 &self,
199 ) -> Option<(PublicKey, UpdateWitnessOffer, u64)> {
200 self.select_tracker_offer()
201 .or_else(|| self.select_governance_offer())
202 }
203
204 fn check_witness(&mut self, witness: PublicKey) -> bool {
205 self.witnesses.remove(&witness)
206 }
207
208 fn reset_round(&mut self) {
209 self.witnesses = self.all_witnesses.clone();
210 self.offers.clear();
211 }
212
213 async fn stop_active_updaters(
214 &self,
215 ctx: &ActorContext<Self>,
216 ) -> Result<(), ActorError> {
217 let children = ctx.system().children(ctx.path()).await;
218 for child_path in children {
219 let Ok(child) =
220 ctx.system().get_actor::<Updater>(&child_path).await
221 else {
222 continue;
223 };
224
225 if let Err(e) = child.ask_stop().await {
226 warn!(
227 subject_id = %self.subject_id,
228 child = %child_path,
229 error = %e,
230 "Failed to stop stale updater child before starting next round"
231 );
232 }
233 }
234
235 Ok(())
236 }
237
238 async fn request_distribution(
239 &self,
240 witness: PublicKey,
241 target_sn: Option<u64>,
242 ) -> Result<(), ActorError> {
243 let info = ComunicateInfo {
244 receiver: witness,
245 request_id: String::default(),
246 version: 0,
247 receiver_actor: format!(
248 "/user/node/distributor_{}",
249 self.subject_id
250 ),
251 };
252
253 self.network
254 .send_command(ave_network::CommandHelper::SendMessage {
255 message: NetworkMessage {
256 info,
257 message: ActorMessage::DistributionLedgerReq {
258 actual_sn: self.our_sn,
259 target_sn,
260 subject_id: self.subject_id.clone(),
261 },
262 },
263 })
264 .await
265 }
266
267 async fn create_updates(
268 &mut self,
269 ctx: &mut ActorContext<Self>,
270 ) -> Result<UpdateStartMode, ActorError> {
271 if self.all_witnesses.len() == 1 && self.our_sn.is_none() {
272 let Some(witness) = self.all_witnesses.iter().next() else {
273 return Ok(UpdateStartMode::Direct);
274 };
275
276 self.request_distribution(witness.clone(), None).await?;
277 return Ok(UpdateStartMode::Direct);
278 }
279
280 self.stop_active_updaters(ctx).await?;
281
282 for witness in self.witnesses.clone() {
283 let updater = Updater::new(
284 witness.clone(),
285 self.retry_round,
286 self.network.clone(),
287 self.witness_retry_count,
288 self.witness_retry_interval_secs,
289 );
290 let child_name = format!("{}_{}", witness, self.retry_round);
291 let child = ctx.create_child(&child_name, updater).await?;
292 let message = UpdaterMessage::NetworkLastSn {
293 subject_id: self.subject_id.clone(),
294 actual_sn: self.our_sn,
295 };
296
297 if let Err(e) = child.tell(message).await {
298 warn!(
299 subject_id = %self.subject_id,
300 witness = %witness,
301 error = %e,
302 "Updater child rejected round start message, skipping this witness in current round"
303 );
304 self.witnesses.remove(&witness);
305 continue;
306 }
307 }
308 if self.witnesses.is_empty() {
309 Ok(UpdateStartMode::Empty)
310 } else {
311 Ok(UpdateStartMode::Sweep)
312 }
313 }
314
315 async fn schedule_retry(
316 &mut self,
317 ctx: &ActorContext<Self>,
318 expected_target_sn: u64,
319 attempt: usize,
320 ) -> Result<(), ActorError> {
321 let actor = ctx.reference().await?;
322 let round = self.retry_round;
323 let token = self.retry_token.saturating_add(1);
324 self.retry_token = token;
325 let retry_interval_secs = self.round_retry_interval_secs.max(1);
326 tokio::spawn(async move {
327 tokio::time::sleep(std::time::Duration::from_secs(
328 retry_interval_secs,
329 ))
330 .await;
331 let _ = actor
332 .tell(UpdateMessage::RetryRound {
333 expected_target_sn,
334 round,
335 attempt,
336 token,
337 })
338 .await;
339 });
340
341 Ok(())
342 }
343}
344
345#[derive(Debug, Clone)]
346pub enum UpdateMessage {
347 Run,
348 Continue,
349 RetryRound {
350 expected_target_sn: u64,
351 round: u64,
352 attempt: usize,
353 token: u64,
354 },
355 Response {
356 sender: PublicKey,
357 offer: Option<UpdateWitnessOffer>,
358 round: u64,
359 },
360}
361
362impl Message for UpdateMessage {}
363
364#[async_trait]
365impl Actor for Update {
366 type Event = ();
367 type Message = UpdateMessage;
368 type Response = ();
369
370 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
371 parent_span.map_or_else(
372 || info_span!("Update", id),
373 |parent_span| info_span!(parent: parent_span, "Update", id),
374 )
375 }
376}
377
378impl NotPersistentActor for Update {}
379
380#[async_trait]
381impl Handler<Self> for Update {
382 async fn handle_message(
383 &mut self,
384 _sender: ActorPath,
385 msg: UpdateMessage,
386 ctx: &mut ActorContext<Self>,
387 ) -> Result<(), ActorError> {
388 match msg {
389 UpdateMessage::Run => {
390 self.retry_attempt = 0;
391 self.reset_round();
392 let start_mode = match self.create_updates(ctx).await {
393 Ok(start_mode) => start_mode,
394 Err(e) => {
395 error!(
396 msg_type = "Run",
397 error = %e,
398 "Failed to create updates"
399 );
400 return Err(emit_fail(ctx, e).await);
401 }
402 };
403
404 match start_mode {
405 UpdateStartMode::Direct | UpdateStartMode::Empty => {
406 ctx.stop(None).await;
407 return Ok(());
408 }
409 UpdateStartMode::Sweep => {}
410 }
411
412 debug!(
413 msg_type = "Run",
414 witnesses_count = self.witnesses.len(),
415 "Updates created successfully"
416 );
417 }
418 UpdateMessage::Continue => {
419 let current_sn =
420 get_local_subject_sn(ctx, &self.subject_id).await?;
421 self.our_sn = current_sn;
422 self.retry_round = self.retry_round.saturating_add(1);
423 self.retry_attempt = 0;
424 self.reset_round();
425
426 let start_mode = match self.create_updates(ctx).await {
427 Ok(start_mode) => start_mode,
428 Err(e) => {
429 error!(
430 msg_type = "Continue",
431 subject_id = %self.subject_id,
432 current_sn = ?current_sn,
433 error = %e,
434 "Failed to continue update round"
435 );
436 return Err(emit_fail(ctx, e).await);
437 }
438 };
439
440 match start_mode {
441 UpdateStartMode::Direct | UpdateStartMode::Empty => {
442 ctx.stop(None).await;
443 return Ok(());
444 }
445 UpdateStartMode::Sweep => {}
446 }
447 }
448 UpdateMessage::RetryRound {
449 expected_target_sn,
450 round,
451 attempt,
452 token,
453 } => {
454 if round != self.retry_round || token != self.retry_token {
455 return Ok(());
456 }
457
458 let current_sn =
459 get_local_subject_sn(ctx, &self.subject_id).await?;
460 if current_sn.is_some_and(|sn| sn >= expected_target_sn) {
461 debug!(
462 msg_type = "RetryRound",
463 subject_id = %self.subject_id,
464 current_sn = ?current_sn,
465 expected_target_sn = expected_target_sn,
466 "Update target already reached before retry round restart"
467 );
468 ctx.stop(None).await;
469 return Ok(());
470 }
471
472 if attempt >= self.max_round_retries {
473 warn!(
474 msg_type = "RetryRound",
475 subject_id = %self.subject_id,
476 current_sn = ?current_sn,
477 expected_target_sn = expected_target_sn,
478 attempt = attempt,
479 "Update retry round exhausted before reaching target"
480 );
481 ctx.stop(None).await;
482 return Ok(());
483 }
484
485 self.our_sn = current_sn;
486 self.retry_round = self.retry_round.saturating_add(1);
487 self.retry_attempt = attempt.saturating_add(1);
488 self.reset_round();
489
490 let start_mode = match self.create_updates(ctx).await {
491 Ok(start_mode) => start_mode,
492 Err(e) => {
493 error!(
494 msg_type = "RetryRound",
495 subject_id = %self.subject_id,
496 current_sn = ?current_sn,
497 expected_target_sn = expected_target_sn,
498 error = %e,
499 "Failed to restart update round"
500 );
501 return Err(emit_fail(ctx, e).await);
502 }
503 };
504
505 match start_mode {
506 UpdateStartMode::Direct | UpdateStartMode::Empty => {
507 ctx.stop(None).await;
508 return Ok(());
509 }
510 UpdateStartMode::Sweep => {}
511 }
512 }
513 UpdateMessage::Response {
514 sender,
515 offer,
516 round,
517 } => {
518 if round != self.retry_round {
519 return Ok(());
520 }
521
522 if self.check_witness(sender.clone()) {
523 self.insert_offer(sender, offer);
524
525 if self.witnesses.is_empty() {
526 let selected_request = self.select_next_request();
527 let mut keep_running = false;
528
529 if let Some((better_node, offer, target_sn)) =
530 selected_request.clone()
531 {
532 let expected_target_sn = self
533 .offers
534 .values()
535 .filter(|offer| {
536 offer.kind == UpdateSubjectKind::Tracker
537 })
538 .map(|offer| offer.sn)
539 .max()
540 .unwrap_or(target_sn);
541
542 if let Err(e) = self
543 .request_distribution(
544 better_node.clone(),
545 Some(target_sn),
546 )
547 .await
548 {
549 error!(
550 msg_type = "Response",
551 error = %e,
552 node = %better_node,
553 "Failed to send request to network"
554 );
555 return Err(emit_fail(ctx, e).await);
556 } else {
557 debug!(
558 msg_type = "Response",
559 node = %better_node,
560 subject_id = %self.subject_id,
561 offer_kind = ?offer.kind,
562 offer_sn = offer.sn,
563 offer_clear_sn = ?offer.clear_sn,
564 target_sn = target_sn,
565 "Request sent to better node"
566 );
567 }
568
569 if matches!(offer.kind, UpdateSubjectKind::Tracker)
570 && self.should_retry_auth_rounds()
571 {
572 keep_running = true;
573 if let Err(e) = self
574 .schedule_retry(
575 ctx,
576 expected_target_sn,
577 self.retry_attempt,
578 )
579 .await
580 {
581 error!(
582 msg_type = "Response",
583 subject_id = %self.subject_id,
584 expected_target_sn = expected_target_sn,
585 error = %e,
586 "Failed to schedule update retry"
587 );
588 return Err(emit_fail(ctx, e).await);
589 }
590 }
591 }
592
593 if let UpdateType::Request { id, subject_id } =
594 &self.update_type
595 {
596 let request_path = ActorPath::from(format!(
597 "/user/request/{}",
598 subject_id
599 ));
600 match ctx
601 .system()
602 .get_actor::<RequestManager>(&request_path)
603 .await
604 {
605 Ok(request_actor) => {
606 let request = if self.offers.is_empty() {
607 RequestManagerMessage::FinishReboot {
608 request_id: id.clone(),
609 }
610 } else {
611 RequestManagerMessage::RebootWait {
612 request_id: id.clone(),
613 governance_id: self
614 .subject_id
615 .clone(),
616 }
617 };
618
619 if let Err(e) =
620 request_actor.tell(request).await
621 {
622 error!(
623 msg_type = "Response",
624 error = %e,
625 subject_id = %self.subject_id,
626 "Failed to send response to request actor"
627 );
628 return Err(emit_fail(ctx, e).await);
629 }
630 }
631 Err(e) => {
632 error!(
633 msg_type = "Response",
634 error = %e,
635 path = %request_path,
636 subject_id = %self.subject_id,
637 "Request actor not found"
638 );
639 return Err(emit_fail(ctx, e).await);
640 }
641 };
642 };
643
644 debug!(
645 msg_type = "Response",
646 subject_id = %self.subject_id,
647 has_better = selected_request.is_some(),
648 "All witnesses responded, update complete"
649 );
650
651 if !self.should_retry_auth_rounds() || !keep_running {
652 ctx.stop(None).await;
653 }
654 }
655 } else {
656 warn!(
657 msg_type = "Response",
658 subject_id = %self.subject_id,
659 sender = %sender,
660 has_offer = self.offers.contains_key(&sender),
661 "Ignoring response from unexpected or already-processed witness"
662 );
663 }
664 }
665 };
666
667 Ok(())
668 }
669
670 async fn on_child_fault(
671 &mut self,
672 error: ActorError,
673 ctx: &mut ActorContext<Self>,
674 ) -> ChildAction {
675 error!(
676 subject_id = %self.subject_id,
677 update_type = ?self.update_type,
678 error = %error,
679 "Child fault in update actor"
680 );
681 emit_fail(ctx, error).await;
682 ChildAction::Stop
683 }
684}