1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt;
3use std::ops::ControlFlow;
4use std::time;
5
6use crate::node::NodeId;
7
8use super::{PrivateNetwork, ReplicationFactor};
9
/// Tracks which nodes have been synchronized with, and which are still
/// pending, while working towards a [`Target`].
#[derive(Debug)]
pub struct Announcer {
    /// The node running the announcer; it is never counted as a sync peer.
    local_node: NodeId,
    /// The preferred seeds and replication factor that decide when the
    /// announcer is finished.
    target: Target,
    /// Nodes that are synced, together with how each one got there.
    synced: BTreeMap<NodeId, SyncStatus>,
    /// Nodes that are still expected to sync.
    to_sync: BTreeSet<NodeId>,
}
17
18impl Announcer {
19 pub fn new(mut config: AnnouncerConfig) -> Result<Self, AnnouncerError> {
36 config.preferred_seeds.remove(&config.local_node);
38 config.synced.remove(&config.local_node);
39 config.unsynced.remove(&config.local_node);
40
41 let unsynced_preferred = config
44 .preferred_seeds
45 .difference(&config.synced)
46 .copied()
47 .collect::<BTreeSet<_>>();
48 config.unsynced.extend(unsynced_preferred);
49
50 let to_sync = config
53 .unsynced
54 .difference(&config.synced)
55 .copied()
56 .collect::<BTreeSet<_>>();
57
58 if config.synced.is_empty() && to_sync.is_empty() {
59 return Err(AnnouncerError::NoSeeds);
60 }
61
62 if to_sync.is_empty() {
63 let preferred = config.synced.intersection(&config.preferred_seeds).count();
64 return Err(AlreadySynced {
65 preferred,
66 synced: config.synced.len(),
67 }
68 .into());
69 }
70
71 let replicas = config.replicas.min(to_sync.len());
72 let announcer = Self {
73 local_node: config.local_node,
74 target: Target::new(config.preferred_seeds, replicas)
75 .map_err(AnnouncerError::Target)?,
76 synced: config
77 .synced
78 .into_iter()
79 .map(|nid| (nid, SyncStatus::AlreadySynced))
80 .collect(),
81 to_sync,
82 };
83 match announcer.is_target_reached() {
84 None => Ok(announcer),
85 Some(outcome) => match outcome {
86 SuccessfulOutcome::MinReplicationFactor { preferred, synced } => {
87 Err(AlreadySynced { preferred, synced }.into())
88 }
89 SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
90 Err(AlreadySynced { preferred, synced }.into())
91 }
92 SuccessfulOutcome::PreferredNodes {
93 preferred,
94 total_nodes_synced,
95 } => Err(AlreadySynced {
96 preferred,
97 synced: total_nodes_synced,
98 }
99 .into()),
100 },
101 }
102 }
103
104 pub fn synced_with(
113 &mut self,
114 node: NodeId,
115 duration: time::Duration,
116 ) -> ControlFlow<Success, Progress> {
117 if node == self.local_node {
118 return ControlFlow::Continue(self.progress());
119 }
120 self.to_sync.remove(&node);
121 self.synced.insert(node, SyncStatus::Synced { duration });
122 self.finished()
123 }
124
125 pub fn timed_out(self) -> AnnouncerResult {
131 match self.is_target_reached() {
132 None => TimedOut {
133 synced: self.synced,
134 timed_out: self.to_sync,
135 }
136 .into(),
137 Some(outcome) => Success {
138 outcome,
139 synced: self.synced,
140 }
141 .into(),
142 }
143 }
144
145 pub fn can_continue(self) -> ControlFlow<NoNodes, Self> {
155 if self.to_sync.is_empty() {
156 ControlFlow::Break(NoNodes {
157 synced: self.synced,
158 })
159 } else {
160 ControlFlow::Continue(self)
161 }
162 }
163
164 pub fn to_sync(&self) -> BTreeSet<NodeId> {
166 self.to_sync
167 .iter()
168 .filter(|node| *node != &self.local_node)
169 .copied()
170 .collect()
171 }
172
173 pub fn target(&self) -> &Target {
175 &self.target
176 }
177
178 pub fn progress(&self) -> Progress {
180 let SuccessCounts { preferred, synced } = self.success_counts();
181 let unsynced = self.to_sync.len();
182 Progress {
183 preferred,
184 synced,
185 unsynced,
186 }
187 }
188
189 fn finished(&self) -> ControlFlow<Success, Progress> {
190 let progress = self.progress();
191 self.is_target_reached()
192 .map_or(ControlFlow::Continue(progress), |outcome| {
193 ControlFlow::Break(Success {
194 outcome,
195 synced: self.synced.clone(),
196 })
197 })
198 }
199
200 fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
201 debug_assert!(self.target.has_preferred_seeds() || self.target.has_replication_factor());
204
205 let SuccessCounts { preferred, synced } = self.success_counts();
206 if self.target.has_preferred_seeds() && preferred >= self.target.preferred_seeds.len() {
207 Some(SuccessfulOutcome::PreferredNodes {
208 preferred: self.target.preferred_seeds.len(),
209 total_nodes_synced: synced,
210 })
211 } else {
212 if !self.target.has_replication_factor() {
214 return None;
215 }
216 let replicas = self.target.replicas();
217 let min = replicas.lower_bound();
218 match replicas.upper_bound() {
219 None => (synced >= min)
220 .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
221 Some(max) => (synced >= max)
222 .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
223 }
224 }
225 }
226
227 fn success_counts(&self) -> SuccessCounts {
228 self.synced
229 .keys()
230 .fold(SuccessCounts::default(), |counts, nid| {
231 if self.target.preferred_seeds.contains(nid) {
232 counts.preferred().synced()
233 } else {
234 counts.synced()
235 }
236 })
237 }
238}
239
/// Running tally of synced nodes; both counters start at zero.
#[derive(Default)]
struct SuccessCounts {
    /// Number of synced nodes that are preferred seeds.
    preferred: usize,
    /// Number of synced nodes in total.
    synced: usize,
}

impl SuccessCounts {
    /// Returns the tally with one more synced node.
    fn synced(mut self) -> Self {
        self.synced += 1;
        self
    }

    /// Returns the tally with one more preferred seed.
    ///
    /// Only the preferred counter changes; the total is bumped separately
    /// through [`SuccessCounts::synced`].
    fn preferred(mut self) -> Self {
        self.preferred += 1;
        self
    }
}
261
262#[derive(Clone, Debug)]
264pub struct AnnouncerConfig {
265 local_node: NodeId,
266 replicas: ReplicationFactor,
267 preferred_seeds: BTreeSet<NodeId>,
268 synced: BTreeSet<NodeId>,
269 unsynced: BTreeSet<NodeId>,
270}
271
272impl AnnouncerConfig {
273 pub fn private(local: NodeId, replicas: ReplicationFactor, network: PrivateNetwork) -> Self {
283 AnnouncerConfig {
284 local_node: local,
285 replicas,
286 preferred_seeds: network.allowed.clone(),
287 synced: BTreeSet::new(),
290 unsynced: network.allowed,
291 }
292 }
293
294 pub fn public(
308 local: NodeId,
309 replicas: ReplicationFactor,
310 preferred_seeds: BTreeSet<NodeId>,
311 synced: BTreeSet<NodeId>,
312 unsynced: BTreeSet<NodeId>,
313 ) -> Self {
314 Self {
315 local_node: local,
316 replicas,
317 preferred_seeds,
318 synced,
319 unsynced,
320 }
321 }
322}
323
324#[derive(Debug)]
326pub enum AnnouncerResult {
327 Success(Success),
329 TimedOut(TimedOut),
334 NoNodes(NoNodes),
338}
339
340impl AnnouncerResult {
341 pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
343 match self {
344 AnnouncerResult::Success(Success { synced, .. }) => synced,
345 AnnouncerResult::TimedOut(TimedOut { synced, .. }) => synced,
346 AnnouncerResult::NoNodes(NoNodes { synced }) => synced,
347 }
348 }
349
350 pub fn is_synced(&self, node: &NodeId) -> bool {
352 let synced = self.synced();
353 synced.contains_key(node)
354 }
355}
356
357impl From<Success> for AnnouncerResult {
358 fn from(s: Success) -> Self {
359 Self::Success(s)
360 }
361}
362
363impl From<TimedOut> for AnnouncerResult {
364 fn from(to: TimedOut) -> Self {
365 Self::TimedOut(to)
366 }
367}
368
369impl From<NoNodes> for AnnouncerResult {
370 fn from(no: NoNodes) -> Self {
371 Self::NoNodes(no)
372 }
373}
374
375#[derive(Debug)]
376pub struct NoNodes {
377 synced: BTreeMap<NodeId, SyncStatus>,
378}
379
380impl NoNodes {
381 pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
383 &self.synced
384 }
385}
386
387#[derive(Debug)]
388pub struct TimedOut {
389 synced: BTreeMap<NodeId, SyncStatus>,
390 timed_out: BTreeSet<NodeId>,
391}
392
393impl TimedOut {
394 pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
396 &self.synced
397 }
398
399 pub fn timed_out(&self) -> &BTreeSet<NodeId> {
401 &self.timed_out
402 }
403}
404
405#[derive(Debug)]
406pub struct Success {
407 outcome: SuccessfulOutcome,
408 synced: BTreeMap<NodeId, SyncStatus>,
409}
410
411impl Success {
412 pub fn outcome(&self) -> SuccessfulOutcome {
414 self.outcome
415 }
416
417 pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
419 &self.synced
420 }
421}
422
/// Reasons why [`Announcer::new`] can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum AnnouncerError {
    /// Nothing is left to sync with, or the target is already satisfied by
    /// the nodes that are synced.
    AlreadySynced(AlreadySynced),
    /// There are neither synced nodes nor nodes to sync with.
    NoSeeds,
    /// The [`Target`] could not be constructed.
    Target(TargetError),
}
434
435impl fmt::Display for AnnouncerError {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 match self {
438 AnnouncerError::AlreadySynced(AlreadySynced { preferred, synced }) => write!(
439 f,
440 "already synchronized with {synced} nodes ({preferred} preferred nodes)"
441 ),
442 AnnouncerError::NoSeeds => {
443 f.write_str("no more nodes are available for synchronizing with")
444 }
445 AnnouncerError::Target(target_error) => target_error.fmt(f),
446 }
447 }
448}
449
// Marker impl: relies on the trait's default methods, so no `source` is
// reported even for the `Target` variant.
impl std::error::Error for AnnouncerError {}
451
452impl From<AlreadySynced> for AnnouncerError {
453 fn from(value: AlreadySynced) -> Self {
454 Self::AlreadySynced(value)
455 }
456}
457
/// Counts reported when an [`Announcer`] is not needed because the nodes are
/// already synced.
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadySynced {
    /// Number of preferred seeds that are synced.
    preferred: usize,
    /// Number of synced nodes in total.
    synced: usize,
}

impl AlreadySynced {
    /// Returns the number of preferred seeds that are synced.
    pub fn preferred(&self) -> usize {
        let Self { preferred, .. } = self;
        *preferred
    }

    /// Returns the number of synced nodes in total.
    pub fn synced(&self) -> usize {
        let Self { synced, .. } = self;
        *synced
    }
}
479
/// How a node came to be counted as synced.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncStatus {
    /// The node was supplied as synced when the [`Announcer`] was built.
    AlreadySynced,
    /// The node was reported through [`Announcer::synced_with`], along with
    /// the duration passed to that call.
    Synced { duration: time::Duration },
}
490
/// Snapshot of how far an [`Announcer`] has come.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Progress {
    /// Number of preferred seeds that are synced.
    preferred: usize,
    /// Number of synced nodes in total.
    synced: usize,
    /// Number of nodes still waiting to sync.
    unsynced: usize,
}

impl Progress {
    /// Returns the number of preferred seeds that are synced.
    pub fn preferred(&self) -> usize {
        let Self { preferred, .. } = self;
        *preferred
    }

    /// Returns the number of synced nodes in total.
    pub fn synced(&self) -> usize {
        let Self { synced, .. } = self;
        *synced
    }

    /// Returns the number of nodes still waiting to sync.
    pub fn unsynced(&self) -> usize {
        let Self { unsynced, .. } = self;
        *unsynced
    }
}
515
/// Returned by [`Target::new`] when neither a non-zero replication lower
/// bound nor any preferred seed is supplied.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[non_exhaustive]
#[error("a minimum number of replicas or set of preferred seeds must be provided")]
pub struct TargetError;
520
521#[derive(Clone, Debug, PartialEq, Eq)]
523pub struct Target {
524 preferred_seeds: BTreeSet<NodeId>,
525 replicas: ReplicationFactor,
526}
527
528impl Target {
529 pub fn new(
530 preferred_seeds: BTreeSet<NodeId>,
531 replicas: ReplicationFactor,
532 ) -> Result<Self, TargetError> {
533 if replicas.lower_bound() == 0 && preferred_seeds.is_empty() {
534 Err(TargetError)
535 } else {
536 Ok(Self {
537 preferred_seeds,
538 replicas,
539 })
540 }
541 }
542
543 pub fn preferred_seeds(&self) -> &BTreeSet<NodeId> {
545 &self.preferred_seeds
546 }
547
548 pub fn replicas(&self) -> &ReplicationFactor {
550 &self.replicas
551 }
552
553 pub fn has_preferred_seeds(&self) -> bool {
555 !self.preferred_seeds.is_empty()
556 }
557
558 pub fn has_replication_factor(&self) -> bool {
560 self.replicas.lower_bound() != 0
561 }
562}
563
/// Which part of the [`Target`] was satisfied, with the counts at that time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SuccessfulOutcome {
    /// The replication factor has no upper bound and the number of synced
    /// nodes reached its lower bound.
    MinReplicationFactor {
        /// Number of preferred seeds that were synced.
        preferred: usize,
        /// Number of nodes that were synced in total.
        synced: usize,
    },
    /// The replication factor has an upper bound and the number of synced
    /// nodes reached it.
    MaxReplicationFactor {
        /// Number of preferred seeds that were synced.
        preferred: usize,
        /// Number of nodes that were synced in total.
        synced: usize,
    },
    /// All preferred seeds were synced.
    PreferredNodes {
        /// Number of preferred seeds in the target.
        preferred: usize,
        /// Number of nodes that were synced in total.
        total_nodes_synced: usize,
    },
}
579
580#[allow(clippy::unwrap_used)]
581#[cfg(test)]
582mod test {
583 use crate::{assert_matches, test::arbitrary};
584
585 use super::*;
586
    /// Syncing only preferred seeds must end with the `PreferredNodes`
    /// outcome, with the preferred and total counts moving in lockstep.
    #[test]
    fn all_synced_nodes_are_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(5..=5);

        // Three of the seeds are preferred, and they are the only unsynced
        // nodes handed to the announcer.
        let preferred_seeds = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
        let unsynced = preferred_seeds.clone();
        let config = AnnouncerConfig::public(
            local,
            // Larger than the number of nodes that can be synced with.
            ReplicationFactor::must_reach(5),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced,
        );

        let mut announcer = Announcer::new(config).unwrap();

        let mut synced_count = 0;
        // Holds the `Success` once the announcer breaks.
        let mut result = None;
        for &node in &preferred_seeds {
            let duration = time::Duration::from_secs(1);
            synced_count += 1;

            match announcer.synced_with(node, duration) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(
                        progress.preferred(),
                        synced_count,
                        "Preferred count should increment for each preferred seed"
                    );
                    assert_eq!(
                        progress.synced(),
                        synced_count,
                        "Total synced should equal preferred since all are preferred"
                    );
                }
                ControlFlow::Break(success) => {
                    result = Some(success);
                    break;
                }
            }
        }
        // `unwrap` fails the test if the announcer never reported success.
        assert_eq!(
            result.unwrap().outcome(),
            SuccessfulOutcome::PreferredNodes {
                preferred: preferred_seeds.len(),
                total_nodes_synced: preferred_seeds.len()
            },
            "Should succeed with PreferredNodes outcome"
        );
    }
642
    /// Construction must fail with `AlreadySynced` when every preferred seed
    /// is already synced, even though other nodes remain unsynced.
    #[test]
    fn preferred_seeds_already_synced() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(6..=6);

        // The two preferred seeds are exactly the already-synced nodes; the
        // remaining seeds are unsynced and not preferred.
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let already_synced = preferred_seeds.clone();
        let regular_unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(4),
            preferred_seeds.clone(),
            already_synced.clone(),
            regular_unsynced.clone(),
        );

        // Both counts equal the number of preferred (and synced) nodes.
        assert_eq!(
            Announcer::new(config).err(),
            Some(AnnouncerError::AlreadySynced(AlreadySynced {
                preferred: 2,
                synced: 2
            }))
        );
    }
668
    /// With `must_reach(3)`, syncing one preferred seed and then regular
    /// nodes must end with `MinReplicationFactor` at three synced nodes.
    #[test]
    fn announcer_reached_min_replication_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(3),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // Mirror of the synced map the announcer is expected to report.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Sync a single preferred seed first.
        for node in preferred_seeds.iter().take(1) {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        // Then sync regular nodes until the announcer reports success.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MinReplicationFactor {
                preferred: 1,
                synced: 3,
            }
        )
    }
729
    /// With `range(3, 6)`, syncing only regular nodes must end with
    /// `MaxReplicationFactor` at six synced nodes and no preferred seeds.
    #[test]
    fn announcer_reached_max_replication_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::range(3, 6),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // Mirror of the synced map the announcer is expected to report.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Only regular nodes are synced; the preferred seeds are never
        // reported.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
                preferred: 0,
                synced: 6,
            }
        )
    }
776
    /// Keeps reporting syncs after the announcer first breaks: the last
    /// `Success` captured must be `MaxReplicationFactor` with one preferred
    /// seed and seven nodes synced.
    #[test]
    fn announcer_preferred_seeds_or_replica_factor() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::range(3, 6),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // Mirror of the synced map the announcer is expected to report.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // First sync regular nodes until the announcer breaks.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        // Then report preferred seeds as well; a further `Break` overwrites
        // the previously captured `Success`.
        for node in preferred_seeds.iter() {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
                preferred: 1,
                synced: 7,
            }
        )
    }
840
    /// Syncing both preferred seeds must end with `PreferredNodes`, even
    /// though the requested replication factor is far from reached.
    #[test]
    fn announcer_reached_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            // More replicas than there are seeds in this test.
            ReplicationFactor::must_reach(11),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();

        // Mirror of the synced map the announcer is expected to report.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Only the preferred seeds are synced.
        for node in preferred_seeds.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::PreferredNodes {
                preferred: 2,
                total_nodes_synced: 2,
            }
        )
    }
887
    /// Timing out before the target is reached must yield `TimedOut`, with
    /// the synced nodes and the still-pending nodes reported separately.
    #[test]
    fn announcer_timed_out() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(11),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // Mirror of the synced map the announcer is expected to report.
        let mut synced_result = BTreeMap::new();
        let mut announcer_result = None;
        let mut successes = 0;

        for node in to_sync.iter() {
            assert_ne!(*node, local);
            // Simulate the timeout once six nodes have synced; `timed_out`
            // consumes the announcer, so the loop must stop here.
            if successes > 5 {
                announcer_result = Some(announcer.timed_out());
                break;
            }
            // Preferred seeds are never synced, so `PreferredNodes` cannot
            // be reached.
            if preferred_seeds.contains(node) {
                continue;
            }
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    announcer_result = Some(stop.into());
                    break;
                }
            }
        }

        match announcer_result {
            Some(AnnouncerResult::TimedOut(timeout)) => {
                assert_eq!(timeout.synced, synced_result);
                // Everything that was to be synced but is not in the synced
                // map must be reported as timed out.
                assert_eq!(
                    timeout.timed_out,
                    to_sync
                        .difference(&synced_result.keys().copied().collect())
                        .copied()
                        .collect()
                );
            }
            unexpected => panic!("Expected AnnouncerResult::TimedOut, found: {unexpected:#?}"),
        }
    }
948
    /// The target's lower bound must be lowered to the number of nodes that
    /// can actually be synced with.
    #[test]
    fn announcer_adapts_target_to_reach() {
        let local = arbitrary::gen::<NodeId>(0);
        let unsynced = arbitrary::set::<NodeId>(3..=3)
            .into_iter()
            .collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            // Asks for more replicas than there are unsynced nodes.
            ReplicationFactor::must_reach(5),
            BTreeSet::new(),
            BTreeSet::new(),
            unsynced.clone(),
        );

        let announcer = Announcer::new(config).unwrap();
        assert_eq!(announcer.target().replicas().lower_bound(), 3);
    }
968
    /// A zero replication factor is acceptable when preferred seeds are
    /// given; syncing all of them must end with `PreferredNodes`.
    #[test]
    fn announcer_with_replication_factor_zero_and_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(5..=5);

        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            // No replication factor: only the preferred seeds can succeed.
            ReplicationFactor::must_reach(0),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced,
        );

        let mut announcer = Announcer::new(config).unwrap();

        for &node in &preferred_seeds {
            let duration = time::Duration::from_secs(1);
            match announcer.synced_with(node, duration) {
                // Not all preferred seeds are synced yet; keep going.
                ControlFlow::Continue(_) => {}
                ControlFlow::Break(success) => {
                    assert_eq!(
                        success.outcome(),
                        SuccessfulOutcome::PreferredNodes {
                            preferred: preferred_seeds.len(),
                            total_nodes_synced: preferred_seeds.len()
                        },
                        "Should succeed with preferred seeds even with zero replication factor"
                    );
                    return;
                }
            }
        }

        // Only reached if the announcer never reported success.
        panic!("Should have succeeded with preferred seeds");
    }
1010
    /// A node that was never part of the configuration still counts towards
    /// the target when it is reported as synced.
    #[test]
    fn announcer_synced_with_unknown_node() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(5..=5);

        let unsynced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
        // Generated separately and not passed to the configuration.
        let unknown_node = arbitrary::gen::<NodeId>(100);
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(1),
            BTreeSet::new(),
            BTreeSet::new(),
            unsynced.clone(),
        );

        let mut announcer = Announcer::new(config).unwrap();

        let duration = time::Duration::from_secs(1);
        let mut target_reached = false;
        match announcer.synced_with(unknown_node, duration) {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(success) => {
                target_reached = true;
                assert_eq!(
                    success.outcome(),
                    SuccessfulOutcome::MinReplicationFactor {
                        preferred: 0,
                        synced: 1
                    },
                    "Should be able to reach target with unknown node"
                );
            }
        }

        assert!(target_reached);
        // Inspects the private map directly to confirm the insertion.
        assert!(
            announcer.synced.contains_key(&unknown_node),
            "Unknown node should be added to synced map"
        );
    }
1054
    /// Reporting the same node twice must not change the counts, but must
    /// replace the recorded duration.
    #[test]
    fn synced_with_same_node_multiple_times() {
        let local = arbitrary::gen::<NodeId>(0);
        let unsynced = arbitrary::set::<NodeId>(3..=3)
            .into_iter()
            .collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(2),
            BTreeSet::new(),
            BTreeSet::new(),
            unsynced.clone(),
        );

        let mut announcer = Announcer::new(config).unwrap();
        let target_node = *unsynced.iter().next().unwrap();

        // First report: the node moves from pending to synced.
        let duration1 = time::Duration::from_secs(1);
        match announcer.synced_with(target_node, duration1) {
            ControlFlow::Continue(progress) => {
                assert_eq!(progress.synced(), 1, "First sync should count");
                assert_eq!(
                    progress.unsynced(),
                    unsynced.len() - 1,
                    "Should decrease unsynced"
                );
            }
            ControlFlow::Break(_) => panic!("Should not reach target yet"),
        }

        // Second report for the same node, with a different duration.
        let duration2 = time::Duration::from_secs(5);
        let progress_before_duplicate = announcer.progress();
        match announcer.synced_with(target_node, duration2) {
            ControlFlow::Continue(progress) => {
                assert_eq!(
                    progress.synced(),
                    progress_before_duplicate.synced(),
                    "Duplicate sync should not change synced count"
                );
                assert_eq!(
                    progress.unsynced(),
                    progress_before_duplicate.unsynced(),
                    "Duplicate sync should not change unsynced count"
                );
            }
            ControlFlow::Break(_) => panic!("Should not reach target with duplicate sync"),
        }

        // The map entry is overwritten by the later report.
        assert_eq!(
            announcer.synced[&target_node],
            SyncStatus::Synced {
                duration: duration2
            },
            "Duplicate sync should update the duration"
        );

        assert!(
            !announcer.to_sync.contains(&target_node),
            "Node should not be in to_sync after first sync"
        );
    }
1122
    /// Calling `timed_out` after the target was reached must still report
    /// `Success`.
    #[test]
    fn timed_out_after_reaching_success() {
        let local = arbitrary::gen::<NodeId>(0);
        let unsynced = arbitrary::set::<NodeId>(3..=3)
            .into_iter()
            .collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(2),
            BTreeSet::new(),
            BTreeSet::new(),
            unsynced.clone(),
        );

        let mut announcer = Announcer::new(config).unwrap();

        let mut synced_nodes = BTreeMap::new();
        // Sync nodes until the announcer reports that the target is reached.
        for node in unsynced {
            let duration = time::Duration::from_secs(1);
            synced_nodes.insert(node, SyncStatus::Synced { duration });

            match announcer.synced_with(node, duration) {
                ControlFlow::Continue(_) => continue,
                // The `Success` is discarded on purpose; `timed_out` below
                // must report it again.
                ControlFlow::Break(_) => break,
            }
        }

        match announcer.timed_out() {
            AnnouncerResult::Success(success) => {
                assert_eq!(
                    success.outcome(),
                    SuccessfulOutcome::MinReplicationFactor {
                        preferred: 0,
                        synced: 2
                    },
                    "Should return success outcome even when called via timed_out"
                );
            }
            other => panic!("Expected Success via timed_out, got: {other:?}"),
        }
    }
1168
    /// With only preferred seeds supplied, they must become both the nodes
    /// to sync with and the target's preferred seeds.
    #[test]
    fn construct_only_preferred_seeds_provided() {
        let local = arbitrary::gen::<NodeId>(0);
        let preferred_seeds = arbitrary::set::<NodeId>(2..=2)
            .into_iter()
            .collect::<BTreeSet<_>>();

        // Both the synced and the unsynced sets are left empty.
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(1),
            preferred_seeds.clone(),
            BTreeSet::new(),
            BTreeSet::new(),
        );

        let announcer = Announcer::new(config).unwrap();

        assert_eq!(announcer.to_sync, preferred_seeds);
        assert_eq!(announcer.target().preferred_seeds(), &preferred_seeds);
        assert!(announcer.synced.is_empty());
    }
1193
    /// A node listed as both synced and unsynced must be treated as synced
    /// and must not be scheduled again.
    #[test]
    fn construct_node_appears_in_multiple_input_sets() {
        let local = arbitrary::gen::<NodeId>(0);
        let alice = arbitrary::gen::<NodeId>(1);
        let bob = arbitrary::gen::<NodeId>(2);
        let eve = arbitrary::gen::<NodeId>(3);

        // `alice` appears in both input sets.
        let synced = [alice].iter().copied().collect::<BTreeSet<_>>();
        let unsynced = [alice, bob, eve].iter().copied().collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(2),
            BTreeSet::new(),
            synced,
            unsynced,
        );

        let announcer = Announcer::new(config).unwrap();

        assert!(
            announcer.synced.contains_key(&alice),
            "alice should be synced"
        );
        assert!(
            !announcer.to_sync.contains(&alice),
            "alice should not appear in to_sync"
        );
        assert!(
            announcer.to_sync.contains(&bob) && announcer.to_sync.contains(&eve),
            "Other node should be in to_sync"
        );
    }
1230
    /// Exercises the error paths of `Announcer::new`: `NoSeeds`,
    /// `AlreadySynced` and `Target`.
    #[test]
    fn cannot_construct_announcer() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let synced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
        // The preferred seeds are a subset of the synced nodes.
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let replicas = ReplicationFactor::default();
        // No nodes at all: nothing synced and nothing to sync with.
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::default(),
            BTreeSet::new(),
            BTreeSet::new(),
            BTreeSet::new(),
        );
        assert!(matches!(
            Announcer::new(config),
            Err(AnnouncerError::NoSeeds)
        ));

        // Synced nodes exist but nothing is left to sync with.
        let config = AnnouncerConfig::public(
            local,
            replicas,
            preferred_seeds.clone(),
            synced.clone(),
            BTreeSet::new(),
        );
        assert!(matches!(
            Announcer::new(config),
            Err(AnnouncerError::AlreadySynced { .. })
        ));

        // Zero replication factor and no preferred seeds: no valid target.
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(0),
            BTreeSet::new(),
            synced.clone(),
            unsynced.clone(),
        );
        assert!(matches!(
            Announcer::new(config),
            Err(AnnouncerError::Target(_))
        ));

        // Nodes remain to sync with, but the target is already met by the
        // synced nodes.
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::MustReach(2),
            preferred_seeds.clone(),
            synced.clone(),
            unsynced.clone(),
        );
        assert!(matches!(
            Announcer::new(config),
            Err(AnnouncerError::AlreadySynced { .. })
        ));
        // Same setup, with a ranged replication factor.
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::range(2, 3),
            preferred_seeds,
            synced,
            unsynced,
        );
        assert!(matches!(
            Announcer::new(config),
            Err(AnnouncerError::AlreadySynced { .. })
        ));
    }
1301
    /// The counts in `Progress` must agree with the announcer's state, both
    /// right after construction and after a node has synced.
    #[test]
    fn invariant_progress_should_match_state() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(6..=6);

        // Two nodes start out synced; the rest are unsynced.
        let already_synced = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(4),
            BTreeSet::new(),
            already_synced.clone(),
            unsynced.clone(),
        );

        let mut announcer = Announcer::new(config).unwrap();

        // Initial state.
        assert_eq!(
            announcer.progress().unsynced(),
            announcer.to_sync().len(),
            "Expected unsynced progress to be the same as the number of nodes to sync"
        );

        assert_eq!(
            announcer.progress().synced(),
            already_synced.len(),
            "Initial synced count should equal already synced nodes"
        );

        // Sync one node and check that the counts shift by exactly one.
        let first_unsynced = *unsynced.iter().next().unwrap();
        let duration = time::Duration::from_secs(1);

        match announcer.synced_with(first_unsynced, duration) {
            ControlFlow::Continue(progress) => {
                assert_eq!(
                    progress.synced(),
                    already_synced.len() + 1,
                    "Synced count should increase by 1"
                );

                assert_eq!(
                    progress.unsynced(),
                    announcer.to_sync().len(),
                    "Unsynced count should equal remaining to_sync length"
                );

                assert_eq!(
                    progress.unsynced(),
                    unsynced.len() - 1,
                    "Unsynced should be original unsynced count minus nodes we've synced"
                );
            }
            ControlFlow::Break(outcome) => {
                panic!("Should not have reached target yet: {outcome:?}")
            }
        }

        // The total number of tracked nodes must be conserved.
        let final_progress = announcer.progress();
        let expected_total = already_synced.len() + unsynced.len();
        let actual_total = final_progress.synced() + final_progress.unsynced();

        assert_eq!(
            actual_total, expected_total,
            "synced + unsynced should equal the total nodes we started with"
        );
    }
1375
/// Checks that a local node listed among the preferred seeds is dropped by
/// `Announcer::new` and never shows up in the target or the `to_sync` set.
#[test]
fn local_node_in_preferred_seeds() {
    let local = arbitrary::gen::<NodeId>(0);
    let other_seeds = arbitrary::set::<NodeId>(5..=5);

    // Two foreign seeds plus the local node form the preferred set.
    let preferred_seeds: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .take(2)
        .chain(std::iter::once(local))
        .collect();
    let unsynced: BTreeSet<_> = other_seeds.iter().copied().skip(2).collect();

    // Everything except the local node should survive construction.
    let expected_preferred = preferred_seeds.len() - 1;

    let announcer = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(3),
        preferred_seeds,
        BTreeSet::new(),
        unsynced,
    ))
    .unwrap();

    let target_preferred = announcer.target().preferred_seeds();
    assert!(
        !target_preferred.contains(&local),
        "Local node should be removed from preferred seeds in target"
    );
    assert!(
        !announcer.to_sync().contains(&local),
        "Local node should not be in to_sync set"
    );
    assert_eq!(
        target_preferred.len(),
        expected_preferred,
        "Target should have local node removed from preferred seeds"
    );
}
1416
/// Checks that a local node passed in the synced set is neither stored in the
/// announcer's internal `synced` map nor counted by `progress()`.
#[test]
fn local_node_in_synced_set() {
    let local = arbitrary::gen::<NodeId>(0);
    let other_seeds = arbitrary::set::<NodeId>(5..=5);

    // Two foreign seeds plus the local node are reported as synced.
    let synced: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .take(2)
        .chain(std::iter::once(local))
        .collect();
    let unsynced: BTreeSet<_> = other_seeds.iter().copied().skip(2).collect();

    // The local node must not contribute to the synced count.
    let expected_synced = synced.len() - 1;

    let announcer = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(4),
        BTreeSet::new(),
        synced,
        unsynced,
    ))
    .unwrap();

    assert!(
        !announcer.synced.contains_key(&local),
        "Local node should not be in internal synced map"
    );
    assert_eq!(
        announcer.progress().synced(),
        expected_synced,
        "Progress should not count local node as synced"
    );
}
1451
/// Checks that a local node passed in the unsynced set is excluded from the
/// announcer's `to_sync` set and from the unsynced count in `progress()`.
#[test]
fn local_node_in_unsynced_set() {
    let local = arbitrary::gen::<NodeId>(0);
    let other_seeds = arbitrary::set::<NodeId>(5..=5);

    let synced: BTreeSet<_> = other_seeds.iter().copied().take(2).collect();
    // The remaining foreign seeds plus the local node are reported as unsynced.
    let unsynced: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .skip(2)
        .chain(std::iter::once(local))
        .collect();

    // The local node must not contribute to the unsynced count.
    let expected_unsynced = unsynced.len() - 1;

    let announcer = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(4),
        BTreeSet::new(),
        synced,
        unsynced,
    ))
    .unwrap();

    // Check through the accessor as well as the field itself.
    assert!(
        !announcer.to_sync().contains(&local),
        "Local node should not be in to_sync set"
    );
    assert!(
        !announcer.to_sync.contains(&local),
        "Internal to_sync should not contain local node"
    );
    assert_eq!(
        announcer.progress().unsynced(),
        expected_unsynced,
        "Progress unsynced should not count local node"
    );
}
1492
/// Checks that a local node present in the preferred, synced and unsynced
/// sets at once is removed from all of them by `Announcer::new`.
#[test]
fn local_node_in_multiple_sets() {
    let local = arbitrary::gen::<NodeId>(0);
    let other_seeds = arbitrary::set::<NodeId>(5..=5);

    // Split the five foreign seeds 2 / 1 / 2 across the three sets and add
    // the local node to every one of them.
    let preferred_seeds: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .take(2)
        .chain(std::iter::once(local))
        .collect();
    let synced: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .skip(2)
        .take(1)
        .chain(std::iter::once(local))
        .collect();
    let unsynced: BTreeSet<_> = other_seeds
        .iter()
        .copied()
        .skip(3)
        .chain(std::iter::once(local))
        .collect();

    // Expected sizes once the local node has been discounted. The preferred
    // seeds that are not synced yet are added to the nodes left to sync,
    // which is why they are part of the unsynced total.
    let expected_preferred = preferred_seeds.len() - 1;
    let expected_synced = synced.len() - 1;
    let expected_unsynced = (unsynced.len() - 1) + (preferred_seeds.len() - 1);

    let announcer = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(3),
        preferred_seeds,
        synced,
        unsynced,
    ))
    .unwrap();

    assert!(
        !announcer.target().preferred_seeds().contains(&local),
        "Local node should be removed from preferred seeds"
    );
    assert!(
        !announcer.synced.contains_key(&local),
        "Local node should not be in synced map"
    );
    assert!(
        !announcer.to_sync().contains(&local),
        "Local node should not be in to_sync"
    );
    assert!(
        !announcer.to_sync.contains(&local),
        "Local node should not be in internal to_sync"
    );

    assert_eq!(
        announcer.target().preferred_seeds().len(),
        expected_preferred
    );
    let progress = announcer.progress();
    assert_eq!(progress.synced(), expected_synced);
    assert_eq!(progress.unsynced(), expected_unsynced);
}
1554
/// Checks that reporting a sync with the local node itself leaves the
/// progress counts unchanged and does not record the local node as synced.
#[test]
fn synced_with_local_node_is_ignored() {
    let local = arbitrary::gen::<NodeId>(0);
    let unsynced = arbitrary::set::<NodeId>(3..=3).into_iter().collect();

    let mut announcer = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(2),
        BTreeSet::new(),
        BTreeSet::new(),
        unsynced,
    ))
    .unwrap();
    let before = announcer.progress();

    // Announce a sync "with ourselves"; this must not terminate the announcer.
    let after = match announcer.synced_with(local, time::Duration::from_secs(1)) {
        ControlFlow::Continue(progress) => progress,
        ControlFlow::Break(_) => panic!("Should not reach target by syncing with local node"),
    };

    assert_eq!(
        after.synced(),
        before.synced(),
        "Syncing with local node should not change synced count"
    );
    assert_eq!(
        after.unsynced(),
        before.unsynced(),
        "Syncing with local node should not change unsynced count"
    );
    assert!(
        !announcer.synced.contains_key(&local),
        "Local node should not be added to synced map"
    );
}
1596
/// Checks that construction fails with `AnnouncerError::NoSeeds` when the
/// local node is the only member of every input set.
#[test]
fn local_node_only_in_all_sets_results_in_no_seeds_error() {
    let local = arbitrary::gen::<NodeId>(0);

    // A single set containing just the local node, reused for the preferred,
    // synced and unsynced arguments.
    let only_local: BTreeSet<_> = std::iter::once(local).collect();

    let result = Announcer::new(AnnouncerConfig::public(
        local,
        ReplicationFactor::must_reach(1),
        only_local.clone(),
        only_local.clone(),
        only_local,
    ));

    assert_matches!(result, Err(AnnouncerError::NoSeeds));
}
1617}