radicle/node/sync/
announce.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt;
3use std::ops::ControlFlow;
4use std::time;
5
6use crate::node::NodeId;
7
8use super::{PrivateNetwork, ReplicationFactor};
9
/// State machine that tracks which nodes have been synchronized with while
/// announcing, and whether the [`Target`] has been reached.
#[derive(Debug)]
pub struct Announcer {
    /// The local node; it is never tracked as synchronized or to-be-synchronized.
    local_node: NodeId,
    /// The target that decides when the announcement counts as successful.
    target: Target,
    /// Nodes that are synchronized, and how each of them became synchronized.
    synced: BTreeMap<NodeId, SyncStatus>,
    /// Nodes that have not been synchronized with yet.
    to_sync: BTreeSet<NodeId>,
}
17
18impl Announcer {
19    /// Construct a new [`Announcer`] from the [`AnnouncerConfig`].
20    ///
21    /// This will ensure that the local [`NodeId`], provided in the
22    /// [`AnnouncerConfig`], will be removed from all sets.
23    ///
24    /// # Errors
25    ///
26    /// Returns the following errors:
27    ///
28    ///   - [`AnnouncerError::NoSeeds`]: both sets of already synchronized and
29    ///     un-synchronized nodes were empty
30    ///     of nodes were empty
31    ///   - [`AnnouncerError::AlreadySynced`]: no more nodes are available for
32    ///     synchronizing with
33    ///   - [`AnnouncerError::Target`]: the target has no preferred seeds and no
34    ///     replicas
35    pub fn new(mut config: AnnouncerConfig) -> Result<Self, AnnouncerError> {
36        // N.b. ensure that local node is in none of the sets
37        config.preferred_seeds.remove(&config.local_node);
38        config.synced.remove(&config.local_node);
39        config.unsynced.remove(&config.local_node);
40
41        // N.b extend the unsynced set with any preferred seeds that are not yet
42        // synced
43        let unsynced_preferred = config
44            .preferred_seeds
45            .difference(&config.synced)
46            .copied()
47            .collect::<BTreeSet<_>>();
48        config.unsynced.extend(unsynced_preferred);
49
50        // Ensure that the unsynced set does not contain any of the synced set –
51        // we trust that the synced nodes are already synced with
52        let to_sync = config
53            .unsynced
54            .difference(&config.synced)
55            .copied()
56            .collect::<BTreeSet<_>>();
57
58        if config.synced.is_empty() && to_sync.is_empty() {
59            return Err(AnnouncerError::NoSeeds);
60        }
61
62        if to_sync.is_empty() {
63            let preferred = config.synced.intersection(&config.preferred_seeds).count();
64            return Err(AlreadySynced {
65                preferred,
66                synced: config.synced.len(),
67            }
68            .into());
69        }
70
71        let replicas = config.replicas.min(to_sync.len());
72        let announcer = Self {
73            local_node: config.local_node,
74            target: Target::new(config.preferred_seeds, replicas)
75                .map_err(AnnouncerError::Target)?,
76            synced: config
77                .synced
78                .into_iter()
79                .map(|nid| (nid, SyncStatus::AlreadySynced))
80                .collect(),
81            to_sync,
82        };
83        match announcer.is_target_reached() {
84            None => Ok(announcer),
85            Some(outcome) => match outcome {
86                SuccessfulOutcome::MinReplicationFactor { preferred, synced } => {
87                    Err(AlreadySynced { preferred, synced }.into())
88                }
89                SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
90                    Err(AlreadySynced { preferred, synced }.into())
91                }
92                SuccessfulOutcome::PreferredNodes {
93                    preferred,
94                    total_nodes_synced,
95                } => Err(AlreadySynced {
96                    preferred,
97                    synced: total_nodes_synced,
98                }
99                .into()),
100            },
101        }
102    }
103
104    /// Mark the `node` as synchronized, with the given `duration` it took to
105    /// synchronize with.
106    ///
107    /// If the target for the [`Announcer`] has been reached, then a [`Success`] is
108    /// returned via [`ControlFlow::Break`]. Otherwise, [`Progress`] is returned
109    /// via [`ControlFlow::Continue`].
110    ///
111    /// The caller decides whether they wish to continue the announcement process.
112    pub fn synced_with(
113        &mut self,
114        node: NodeId,
115        duration: time::Duration,
116    ) -> ControlFlow<Success, Progress> {
117        if node == self.local_node {
118            return ControlFlow::Continue(self.progress());
119        }
120        self.to_sync.remove(&node);
121        self.synced.insert(node, SyncStatus::Synced { duration });
122        self.finished()
123    }
124
125    /// Complete the [`Announcer`] process returning a [`AnnouncerResult`].
126    ///
127    /// If the target for the [`Announcer`] has been reached, then the result
128    /// will be [`AnnouncerResult::Success`], otherwise, it will be
129    /// [`AnnouncerResult::TimedOut`].
130    pub fn timed_out(self) -> AnnouncerResult {
131        match self.is_target_reached() {
132            None => TimedOut {
133                synced: self.synced,
134                timed_out: self.to_sync,
135            }
136            .into(),
137            Some(outcome) => Success {
138                outcome,
139                synced: self.synced,
140            }
141            .into(),
142        }
143    }
144
145    /// Check if the [`Announcer`] can continue synchronizing with more nodes.
146    /// If there are no more nodes, then [`NoNodes`] is returned in the
147    /// [`ControlFlow::Break`], otherwise the [`Announcer`] is returned as-is in
148    /// the [`ControlFlow::Continue`].
149    // TODO(finto): I'm not sure this is needed with the change to the target
150    // logic. Since we can reach the replication factor OR the preferred seeds,
151    // AND the replication factor is always capped to the maximum number of
152    // seeds to sync with, I don't think we can ever reach a case where
153    // `can_continue` hits the `Break`.
154    pub fn can_continue(self) -> ControlFlow<NoNodes, Self> {
155        if self.to_sync.is_empty() {
156            ControlFlow::Break(NoNodes {
157                synced: self.synced,
158            })
159        } else {
160            ControlFlow::Continue(self)
161        }
162    }
163
164    /// Get all the nodes to be synchronized with.
165    pub fn to_sync(&self) -> BTreeSet<NodeId> {
166        self.to_sync
167            .iter()
168            .filter(|node| *node != &self.local_node)
169            .copied()
170            .collect()
171    }
172
    /// Get the [`Target`] of the [`Announcer`], i.e. the preferred seeds and
    /// replication factor it is trying to reach.
    pub fn target(&self) -> &Target {
        &self.target
    }
177
178    /// Get the [`Progress`] of the [`Announcer`].
179    pub fn progress(&self) -> Progress {
180        let SuccessCounts { preferred, synced } = self.success_counts();
181        let unsynced = self.to_sync.len();
182        Progress {
183            preferred,
184            synced,
185            unsynced,
186        }
187    }
188
189    fn finished(&self) -> ControlFlow<Success, Progress> {
190        let progress = self.progress();
191        self.is_target_reached()
192            .map_or(ControlFlow::Continue(progress), |outcome| {
193                ControlFlow::Break(Success {
194                    outcome,
195                    synced: self.synced.clone(),
196                })
197            })
198    }
199
200    fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
201        // It should not be possible to construct a target that has no preferred
202        // seeds and set the target to 0
203        debug_assert!(self.target.has_preferred_seeds() || self.target.has_replication_factor());
204
205        let SuccessCounts { preferred, synced } = self.success_counts();
206        if self.target.has_preferred_seeds() && preferred >= self.target.preferred_seeds.len() {
207            Some(SuccessfulOutcome::PreferredNodes {
208                preferred: self.target.preferred_seeds.len(),
209                total_nodes_synced: synced,
210            })
211        } else {
212            // The only target to hit is preferred seeds
213            if !self.target.has_replication_factor() {
214                return None;
215            }
216            let replicas = self.target.replicas();
217            let min = replicas.lower_bound();
218            match replicas.upper_bound() {
219                None => (synced >= min)
220                    .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
221                Some(max) => (synced >= max)
222                    .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
223            }
224        }
225    }
226
227    fn success_counts(&self) -> SuccessCounts {
228        self.synced
229            .keys()
230            .fold(SuccessCounts::default(), |counts, nid| {
231                if self.target.preferred_seeds.contains(nid) {
232                    counts.preferred().synced()
233                } else {
234                    counts.synced()
235                }
236            })
237    }
238}
239
/// Tally of synchronized nodes, both counters starting at zero.
#[derive(Default)]
struct SuccessCounts {
    /// Number of synchronized nodes that are preferred seeds.
    preferred: usize,
    /// Number of synchronized nodes in total.
    synced: usize,
}

impl SuccessCounts {
    /// Return the tally with the total count incremented by one.
    fn synced(mut self) -> Self {
        self.synced += 1;
        self
    }

    /// Return the tally with the preferred count incremented by one.
    fn preferred(mut self) -> Self {
        self.preferred += 1;
        self
    }
}
261
/// Configuration of the [`Announcer`].
///
/// Constructed through [`AnnouncerConfig::private`] or
/// [`AnnouncerConfig::public`] and consumed by [`Announcer::new`].
#[derive(Clone, Debug)]
pub struct AnnouncerConfig {
    /// The local node, which is excluded from the announcement process.
    local_node: NodeId,
    /// The target number of seeds to reach before stopping.
    replicas: ReplicationFactor,
    /// The seeds the announcer should attempt to synchronize with.
    preferred_seeds: BTreeSet<NodeId>,
    /// Nodes that are already synchronized with.
    synced: BTreeSet<NodeId>,
    /// Nodes that are not synchronized with yet.
    unsynced: BTreeSet<NodeId>,
}
271
272impl AnnouncerConfig {
273    /// Setup a private network `AnnouncerConfig`, populating the
274    /// [`AnnouncerConfig`]'s preferred seeds with the allowed set from the
275    /// [`PrivateNetwork`].
276    ///
277    /// `replicas` is the target number of seeds the [`Announcer`] should reach
278    /// before stopping.
279    ///
280    /// `local` is the [`NodeId`] of the local node, to ensure it is
281    /// excluded from the [`Announcer`] process.
282    pub fn private(local: NodeId, replicas: ReplicationFactor, network: PrivateNetwork) -> Self {
283        AnnouncerConfig {
284            local_node: local,
285            replicas,
286            preferred_seeds: network.allowed.clone(),
287            // TODO(finto): we should check if the seeds are synced with instead
288            // of assuming they haven't been yet.
289            synced: BTreeSet::new(),
290            unsynced: network.allowed,
291        }
292    }
293
294    /// Setup a public `AnnouncerConfig`.
295    ///
296    /// `preferred_seeds` is the target set of preferred seeds that [`Announcer`] should
297    /// attempt to synchronize with.
298    ///
299    /// `synced` and `unsynced` are the set of nodes that are currently
300    /// synchronized and un-synchronized with, respectively.
301    ///
302    /// `replicas` is the target number of seeds the [`Announcer`] should reach
303    /// before stopping.
304    ///
305    /// `local` is the [`NodeId`] of the local node, to ensure it is
306    /// excluded from the [`Announcer`] process.
307    pub fn public(
308        local: NodeId,
309        replicas: ReplicationFactor,
310        preferred_seeds: BTreeSet<NodeId>,
311        synced: BTreeSet<NodeId>,
312        unsynced: BTreeSet<NodeId>,
313    ) -> Self {
314        Self {
315            local_node: local,
316            replicas,
317            preferred_seeds,
318            synced,
319            unsynced,
320        }
321    }
322}
323
/// Result of running an [`Announcer`] process.
///
/// Every variant carries the nodes that were synchronized; see
/// [`AnnouncerResult::synced`].
#[derive(Debug)]
pub enum AnnouncerResult {
    /// The target of the [`Announcer`] was successfully met.
    Success(Success),
    /// The [`Announcer`] process was timed out before the target was met, and
    /// all un-synchronized nodes are marked as timed out.
    ///
    /// Note that some nodes still may have synchronized.
    TimedOut(TimedOut),
    /// The [`Announcer`] ran out of nodes to synchronize with.
    ///
    /// Note that some nodes still may have synchronized.
    NoNodes(NoNodes),
}
339
340impl AnnouncerResult {
341    /// Get the synchronized nodes, regardless of the result.
342    pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
343        match self {
344            AnnouncerResult::Success(Success { synced, .. }) => synced,
345            AnnouncerResult::TimedOut(TimedOut { synced, .. }) => synced,
346            AnnouncerResult::NoNodes(NoNodes { synced }) => synced,
347        }
348    }
349
350    /// Check if a given node is synchronized with.
351    pub fn is_synced(&self, node: &NodeId) -> bool {
352        let synced = self.synced();
353        synced.contains_key(node)
354    }
355}
356
357impl From<Success> for AnnouncerResult {
358    fn from(s: Success) -> Self {
359        Self::Success(s)
360    }
361}
362
363impl From<TimedOut> for AnnouncerResult {
364    fn from(to: TimedOut) -> Self {
365        Self::TimedOut(to)
366    }
367}
368
369impl From<NoNodes> for AnnouncerResult {
370    fn from(no: NoNodes) -> Self {
371        Self::NoNodes(no)
372    }
373}
374
/// The [`Announcer`] had no nodes left to synchronize with.
///
/// Produced by [`Announcer::can_continue`].
#[derive(Debug)]
pub struct NoNodes {
    /// The nodes that were synchronized by the time the announcer ran out of
    /// nodes.
    synced: BTreeMap<NodeId, SyncStatus>,
}

impl NoNodes {
    /// Get the set of synchronized nodes, along with their [`SyncStatus`].
    pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
        &self.synced
    }
}
386
/// The [`Announcer`] was timed out before reaching its target.
///
/// Produced by [`Announcer::timed_out`].
#[derive(Debug)]
pub struct TimedOut {
    /// The nodes that were synchronized before the timeout.
    synced: BTreeMap<NodeId, SyncStatus>,
    /// The nodes that were still to be synchronized with at the timeout.
    timed_out: BTreeSet<NodeId>,
}

impl TimedOut {
    /// Get the set of synchronized nodes, along with their [`SyncStatus`].
    pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
        &self.synced
    }

    /// Get the set of timed out nodes, i.e. the nodes that had not been
    /// synchronized with when the process timed out.
    pub fn timed_out(&self) -> &BTreeSet<NodeId> {
        &self.timed_out
    }
}
404
/// The [`Announcer`] reached its target.
#[derive(Debug)]
pub struct Success {
    /// Which part of the target was reached.
    outcome: SuccessfulOutcome,
    /// The nodes that were synchronized when the target was reached.
    synced: BTreeMap<NodeId, SyncStatus>,
}

impl Success {
    /// Get the [`SuccessfulOutcome`] of the success.
    ///
    /// The outcome is returned by value, since it is `Copy`.
    pub fn outcome(&self) -> SuccessfulOutcome {
        self.outcome
    }

    /// Get the set of synchronized nodes, along with their [`SyncStatus`].
    pub fn synced(&self) -> &BTreeMap<NodeId, SyncStatus> {
        &self.synced
    }
}
422
/// Error in constructing the [`Announcer`].
#[derive(Debug, PartialEq, Eq)]
pub enum AnnouncerError {
    /// There is nothing left to announce to: either no more nodes are
    /// available for synchronizing with, or the nodes that are already
    /// synchronized satisfy the target.
    AlreadySynced(AlreadySynced),
    /// Both the set of already synchronized nodes and the set of
    /// un-synchronized nodes were empty.
    NoSeeds,
    /// The target could not be constructed.
    Target(TargetError),
}
434
435impl fmt::Display for AnnouncerError {
436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437        match self {
438            AnnouncerError::AlreadySynced(AlreadySynced { preferred, synced }) => write!(
439                f,
440                "already synchronized with {synced} nodes ({preferred} preferred nodes)"
441            ),
442            AnnouncerError::NoSeeds => {
443                f.write_str("no more nodes are available for synchronizing with")
444            }
445            AnnouncerError::Target(target_error) => target_error.fmt(f),
446        }
447    }
448}
449
// Marker implementation: all provided methods of `std::error::Error` keep
// their defaults, so no underlying `source` is reported.
impl std::error::Error for AnnouncerError {}
451
452impl From<AlreadySynced> for AnnouncerError {
453    fn from(value: AlreadySynced) -> Self {
454        Self::AlreadySynced(value)
455    }
456}
457
/// Counts of the nodes that were already synchronized when constructing an
/// [`Announcer`] was refused.
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadySynced {
    /// The number of preferred nodes that are synchronized.
    preferred: usize,
    /// Total number of nodes that are synchronized.
    ///
    /// Note that this includes [`AlreadySynced::preferred`].
    synced: usize,
}

impl AlreadySynced {
    /// Get the number of preferred nodes that are already synchronized.
    pub fn preferred(&self) -> usize {
        let Self { preferred, .. } = self;
        *preferred
    }

    /// Get the total number of nodes that are already synchronized,
    /// preferred nodes included.
    pub fn synced(&self) -> usize {
        let Self { synced, .. } = self;
        *synced
    }
}
479
/// The status of the synchronized node.
///
/// Distinguishes nodes that were synchronized before the [`Announcer`] started
/// from nodes that were synchronized while it was running.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncStatus {
    /// The node was already synchronized before starting the [`Announcer`]
    /// process.
    AlreadySynced,
    /// The node was synchronized as part of the [`Announcer`] process, marking
    /// the amount of time that passed to synchronize with the node.
    Synced {
        /// The time it took to synchronize with the node, as reported to
        /// [`Announcer::synced_with`].
        duration: time::Duration,
    },
}
490
/// Snapshot of how far along the [`Announcer`] process is.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Progress {
    /// Synchronized seeds that are preferred seeds.
    preferred: usize,
    /// Synchronized seeds in total.
    synced: usize,
    /// Seeds that are not synchronized yet.
    unsynced: usize,
}

impl Progress {
    /// The number of preferred seeds that are synchronized.
    pub fn preferred(&self) -> usize {
        let Self { preferred, .. } = self;
        *preferred
    }

    /// The number of seeds that are synchronized.
    pub fn synced(&self) -> usize {
        let Self { synced, .. } = self;
        *synced
    }

    /// The number of seeds that are un-synchronized.
    pub fn unsynced(&self) -> usize {
        let Self { unsynced, .. } = self;
        *unsynced
    }
}
515
/// Error returned by [`Target::new`] when the replication factor has a lower
/// bound of `0` and no preferred seeds were given, i.e. there is nothing to
/// reach.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[non_exhaustive]
#[error("a minimum number of replicas or set of preferred seeds must be provided")]
pub struct TargetError;
520
521/// The target for the [`Announcer`] to reach.
522#[derive(Clone, Debug, PartialEq, Eq)]
523pub struct Target {
524    preferred_seeds: BTreeSet<NodeId>,
525    replicas: ReplicationFactor,
526}
527
528impl Target {
529    pub fn new(
530        preferred_seeds: BTreeSet<NodeId>,
531        replicas: ReplicationFactor,
532    ) -> Result<Self, TargetError> {
533        if replicas.lower_bound() == 0 && preferred_seeds.is_empty() {
534            Err(TargetError)
535        } else {
536            Ok(Self {
537                preferred_seeds,
538                replicas,
539            })
540        }
541    }
542
543    /// Get the set of preferred seeds that are trying to be synchronized with.
544    pub fn preferred_seeds(&self) -> &BTreeSet<NodeId> {
545        &self.preferred_seeds
546    }
547
548    /// Get the number of replicas that is trying to be reached.
549    pub fn replicas(&self) -> &ReplicationFactor {
550        &self.replicas
551    }
552
553    /// Check if the target has preferred seeds
554    pub fn has_preferred_seeds(&self) -> bool {
555        !self.preferred_seeds.is_empty()
556    }
557
558    /// Check that lower bound of the replication is greater than `0`
559    pub fn has_replication_factor(&self) -> bool {
560        self.replicas.lower_bound() != 0
561    }
562}
563
/// The way in which the [`Target`] of an [`Announcer`] was reached.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SuccessfulOutcome {
    /// The replication factor has no upper bound, and the number of
    /// synchronized nodes reached its lower bound.
    MinReplicationFactor {
        /// Number of synchronized nodes that are preferred seeds.
        preferred: usize,
        /// Total number of synchronized nodes.
        synced: usize,
    },
    /// The replication factor has an upper bound, and the number of
    /// synchronized nodes reached it.
    MaxReplicationFactor {
        /// Number of synchronized nodes that are preferred seeds.
        preferred: usize,
        /// Total number of synchronized nodes.
        synced: usize,
    },
    /// All preferred seeds of the target are synchronized.
    PreferredNodes {
        /// Number of preferred seeds in the target.
        preferred: usize,
        /// Total number of synchronized nodes, preferred seeds included.
        total_nodes_synced: usize,
    },
}
579
580#[allow(clippy::unwrap_used)]
581#[cfg(test)]
582mod test {
583    use crate::{assert_matches, test::arbitrary};
584
585    use super::*;
586
    /// When the only nodes to sync with are the preferred seeds, syncing with
    /// all of them must end with the `PreferredNodes` outcome, and both
    /// progress counters must advance in lock-step on the way there.
    #[test]
    fn all_synced_nodes_are_preferred_seeds() {
        // NOTE(review): presumably `arbitrary::set` with `5..=5` yields exactly
        // 5 distinct node ids — confirm against the test helpers.
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(5..=5);

        // All preferred seeds, no regular seeds in unsynced
        let preferred_seeds = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
        let unsynced = preferred_seeds.clone(); // Only preferred seeds to sync with

        let config = AnnouncerConfig::public(
            local,
            // Requested factor is higher than the 3 nodes available, so
            // `Announcer::new` caps it.
            ReplicationFactor::must_reach(5),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced,
        );

        let mut announcer = Announcer::new(config).unwrap();

        // Sync with all preferred seeds
        let mut synced_count = 0;
        let mut result = None;
        for &node in &preferred_seeds {
            let duration = time::Duration::from_secs(1);
            synced_count += 1;

            match announcer.synced_with(node, duration) {
                // Target not reached yet: both counters grow together.
                ControlFlow::Continue(progress) => {
                    assert_eq!(
                        progress.preferred(),
                        synced_count,
                        "Preferred count should increment for each preferred seed"
                    );
                    assert_eq!(
                        progress.synced(),
                        synced_count,
                        "Total synced should equal preferred since all are preferred"
                    );
                }
                // Target reached: remember the success and stop driving.
                ControlFlow::Break(success) => {
                    result = Some(success);
                    break;
                }
            }
        }
        // The preferred seeds are checked before the replication factor, so
        // the outcome must be `PreferredNodes`.
        assert_eq!(
            result.unwrap().outcome(),
            SuccessfulOutcome::PreferredNodes {
                preferred: preferred_seeds.len(),
                total_nodes_synced: preferred_seeds.len()
            },
            "Should succeed with PreferredNodes outcome"
        );
    }
642
    /// If every preferred seed is already synchronized, constructing the
    /// announcer must fail with `AlreadySynced`, even though regular nodes are
    /// still available to sync with.
    #[test]
    fn preferred_seeds_already_synced() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(6..=6);

        // First 2 seeds are preferred and already synced; the remaining seeds
        // are regular nodes that are not synced yet.
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let already_synced = preferred_seeds.clone(); // Preferred seeds already synced
        let regular_unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();

        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(4),
            preferred_seeds.clone(),
            already_synced.clone(),
            regular_unsynced.clone(),
        );

        // The preferred-seeds part of the target is satisfied from the start,
        // so the error reports the 2 preferred nodes out of 2 synced nodes.
        assert_eq!(
            Announcer::new(config).err(),
            Some(AnnouncerError::AlreadySynced(AlreadySynced {
                preferred: 2,
                synced: 2
            }))
        );
    }
668
    /// With a replication factor that has no upper bound (`must_reach(3)`), the
    /// announcer must stop with `MinReplicationFactor` as soon as 3 nodes are
    /// synchronized, without needing all preferred seeds.
    #[test]
    fn announcer_reached_min_replication_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        // Preferred seeds are the first 2 seeds; the regular unsynced nodes
        // skip the first 3, so the two sets are disjoint.
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::must_reach(3),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // `synced_result` mirrors what the announcer is expected to record.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Sync with only one of the two preferred seeds, so that the
        // preferred-seeds part of the target is not reached.
        for node in preferred_seeds.iter().take(1) {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        // Keep syncing regular nodes until the announcer reports success.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        // Success is reported at 3 synced nodes: 1 preferred + 2 regular.
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MinReplicationFactor {
                preferred: 1,
                synced: 3,
            }
        )
    }
729
    /// With a ranged replication factor (`range(3, 6)`), the announcer must
    /// keep going past the lower bound and stop with `MaxReplicationFactor`
    /// once the upper bound of 6 synchronized nodes is reached.
    #[test]
    fn announcer_reached_max_replication_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        // Preferred seeds are the first 2 seeds; the regular unsynced nodes
        // skip the first 3, so the two sets are disjoint.
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::range(3, 6),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // `synced_result` mirrors what the announcer is expected to record.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Don't sync with preferred so that we don't hit that target.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        // Success is reported at the upper bound, with no preferred seeds
        // among the synchronized nodes.
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
                preferred: 0,
                synced: 6,
            }
        )
    }
776
    /// Once the upper bound of the replication factor is reached, driving the
    /// announcer further with a preferred seed still records that seed, but
    /// the announcer immediately reports `MaxReplicationFactor` again, since
    /// not all preferred seeds are synchronized.
    #[test]
    fn announcer_preferred_seeds_or_replica_factor() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        // Preferred seeds are the first 2 seeds, the regular unsynced nodes
        // are all the others.
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            ReplicationFactor::range(3, 6),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // `synced_result` mirrors what the announcer is expected to record.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // Reaches max replication factor, and stops.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }
        // If we try to continue to drive it forward, we get the extra sync of
        // the preferred seed, but it stops immediately.
        for node in preferred_seeds.iter() {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                // Overwrites the earlier success with the latest one.
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        // 6 regular nodes plus 1 preferred seed are recorded as synced.
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
                preferred: 1,
                synced: 7,
            }
        )
    }
840
    /// Syncing with all preferred seeds is enough to succeed with
    /// `PreferredNodes`, even when the replication factor is far from being
    /// reached.
    #[test]
    fn announcer_reached_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        // Preferred seeds are the first 2 seeds, the regular unsynced nodes
        // are all the others.
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            // More replicas than there are nodes available.
            ReplicationFactor::must_reach(11),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();

        // `synced_result` mirrors what the announcer is expected to record.
        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

        // The preferred seeds then sync, allowing us to reach that part of the
        // target
        for node in preferred_seeds.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    success = Some(stop);
                    break;
                }
            }
        }

        // Only the 2 preferred seeds were synced with.
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::PreferredNodes {
                preferred: 2,
                total_nodes_synced: 2,
            }
        )
    }
887
    /// If the announcer is timed out before reaching any part of its target,
    /// the result must be `TimedOut`, listing the nodes that did sync and the
    /// nodes that were still outstanding.
    #[test]
    fn announcer_timed_out() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        // Preferred seeds are the first 2 seeds, the regular unsynced nodes
        // are all the others.
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
        let config = AnnouncerConfig::public(
            local,
            // More replicas than there are nodes available.
            ReplicationFactor::must_reach(11),
            preferred_seeds.clone(),
            BTreeSet::new(),
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
        // Unsynced preferred seeds are added to the nodes to sync with.
        let to_sync = announcer.to_sync();
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        // `synced_result` mirrors what the announcer is expected to record.
        let mut synced_result = BTreeMap::new();
        let mut announcer_result = None;
        let mut successes = 0;

        // Simulate not being able to reach all nodes
        for node in to_sync.iter() {
            assert_ne!(*node, local);
            // After 6 successful syncs, give up and time the announcer out.
            // This consumes the announcer, hence the `break`.
            if successes > 5 {
                announcer_result = Some(announcer.timed_out());
                break;
            }
            // Simulate not being able to reach the preferred seeds
            if preferred_seeds.contains(node) {
                continue;
            }
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
            match announcer.synced_with(*node, t) {
                ControlFlow::Continue(progress) => {
                    assert_eq!(progress.synced(), successes)
                }
                ControlFlow::Break(stop) => {
                    announcer_result = Some(stop.into());
                    break;
                }
            }
        }

        match announcer_result {
            Some(AnnouncerResult::TimedOut(timeout)) => {
                // Everything that synced is reported as synced ...
                assert_eq!(timeout.synced, synced_result);
                // ... and everything else that was scheduled is reported as
                // timed out.
                assert_eq!(
                    timeout.timed_out,
                    to_sync
                        .difference(&synced_result.keys().copied().collect())
                        .copied()
                        .collect()
                );
            }
            unexpected => panic!("Expected AnnouncerResult::TimedOut, found: {unexpected:#?}"),
        }
    }
948
949    #[test]
950    fn announcer_adapts_target_to_reach() {
951        let local = arbitrary::gen::<NodeId>(0);
952        // Only 3 nodes available
953        let unsynced = arbitrary::set::<NodeId>(3..=3)
954            .into_iter()
955            .collect::<BTreeSet<_>>();
956
957        let config = AnnouncerConfig::public(
958            local,
959            ReplicationFactor::must_reach(5), // Want 5 but only have 3
960            BTreeSet::new(),
961            BTreeSet::new(),
962            unsynced.clone(),
963        );
964
965        let announcer = Announcer::new(config).unwrap();
966        assert_eq!(announcer.target().replicas().lower_bound(), 3);
967    }
968
969    #[test]
970    fn announcer_with_replication_factor_zero_and_preferred_seeds() {
971        let local = arbitrary::gen::<NodeId>(0);
972        let seeds = arbitrary::set::<NodeId>(5..=5);
973
974        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
975        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
976
977        // Zero replication factor but with preferred seeds should work
978        let config = AnnouncerConfig::public(
979            local,
980            // Zero replication factor
981            ReplicationFactor::must_reach(0),
982            preferred_seeds.clone(),
983            BTreeSet::new(),
984            unsynced,
985        );
986
987        let mut announcer = Announcer::new(config).unwrap();
988
989        // Should succeed immediately when we sync with all preferred seeds
990        for &node in &preferred_seeds {
991            let duration = time::Duration::from_secs(1);
992            match announcer.synced_with(node, duration) {
993                ControlFlow::Continue(_) => {} // Continue until all preferred are synced
994                ControlFlow::Break(success) => {
995                    assert_eq!(
996                        success.outcome(),
997                        SuccessfulOutcome::PreferredNodes {
998                            preferred: preferred_seeds.len(),
999                            total_nodes_synced: preferred_seeds.len()
1000                        },
1001                        "Should succeed with preferred seeds even with zero replication factor"
1002                    );
1003                    return;
1004                }
1005            }
1006        }
1007
1008        panic!("Should have succeeded with preferred seeds");
1009    }
1010
1011    #[test]
1012    fn announcer_synced_with_unknown_node() {
1013        let local = arbitrary::gen::<NodeId>(0);
1014        let seeds = arbitrary::set::<NodeId>(5..=5);
1015
1016        let unsynced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
1017        let unknown_node = arbitrary::gen::<NodeId>(100); // Node not in any set
1018
1019        let config = AnnouncerConfig::public(
1020            local,
1021            ReplicationFactor::must_reach(1),
1022            BTreeSet::new(),
1023            BTreeSet::new(),
1024            unsynced.clone(),
1025        );
1026
1027        let mut announcer = Announcer::new(config).unwrap();
1028
1029        // Try to sync with an unknown node
1030        let duration = time::Duration::from_secs(1);
1031        let mut target_reached = false;
1032        match announcer.synced_with(unknown_node, duration) {
1033            ControlFlow::Continue(_) => {}
1034            ControlFlow::Break(success) => {
1035                target_reached = true;
1036                assert_eq!(
1037                    success.outcome(),
1038                    SuccessfulOutcome::MinReplicationFactor {
1039                        preferred: 0,
1040                        synced: 1
1041                    },
1042                    "Should be able to reach target with unknown node"
1043                );
1044            }
1045        }
1046
1047        assert!(target_reached);
1048        // Verify the unknown node is now in the synced map
1049        assert!(
1050            announcer.synced.contains_key(&unknown_node),
1051            "Unknown node should be added to synced map"
1052        );
1053    }
1054
1055    #[test]
1056    fn synced_with_same_node_multiple_times() {
1057        let local = arbitrary::gen::<NodeId>(0);
1058        let unsynced = arbitrary::set::<NodeId>(3..=3)
1059            .into_iter()
1060            .collect::<BTreeSet<_>>();
1061
1062        let config = AnnouncerConfig::public(
1063            local,
1064            ReplicationFactor::must_reach(2),
1065            BTreeSet::new(),
1066            BTreeSet::new(),
1067            unsynced.clone(),
1068        );
1069
1070        let mut announcer = Announcer::new(config).unwrap();
1071        let target_node = *unsynced.iter().next().unwrap();
1072
1073        // First sync with the node
1074        let duration1 = time::Duration::from_secs(1);
1075        match announcer.synced_with(target_node, duration1) {
1076            ControlFlow::Continue(progress) => {
1077                assert_eq!(progress.synced(), 1, "First sync should count");
1078                assert_eq!(
1079                    progress.unsynced(),
1080                    unsynced.len() - 1,
1081                    "Should decrease unsynced"
1082                );
1083            }
1084            ControlFlow::Break(_) => panic!("Should not reach target yet"),
1085        }
1086
1087        // Sync with the SAME node again with different duration
1088        let duration2 = time::Duration::from_secs(5);
1089        let progress_before_duplicate = announcer.progress();
1090        match announcer.synced_with(target_node, duration2) {
1091            ControlFlow::Continue(progress) => {
1092                // Progress should be UNCHANGED since we already synced with this node
1093                assert_eq!(
1094                    progress.synced(),
1095                    progress_before_duplicate.synced(),
1096                    "Duplicate sync should not change synced count"
1097                );
1098                assert_eq!(
1099                    progress.unsynced(),
1100                    progress_before_duplicate.unsynced(),
1101                    "Duplicate sync should not change unsynced count"
1102                );
1103            }
1104            ControlFlow::Break(_) => panic!("Should not reach target with duplicate sync"),
1105        }
1106
1107        // Check that the duration was updated to the latest one
1108        assert_eq!(
1109            announcer.synced[&target_node],
1110            SyncStatus::Synced {
1111                duration: duration2
1112            },
1113            "Duplicate sync should update the duration"
1114        );
1115
1116        // Verify the node is no longer in to_sync (should have been removed on first sync)
1117        assert!(
1118            !announcer.to_sync.contains(&target_node),
1119            "Node should not be in to_sync after first sync"
1120        );
1121    }
1122
1123    #[test]
1124    fn timed_out_after_reaching_success() {
1125        let local = arbitrary::gen::<NodeId>(0);
1126        let unsynced = arbitrary::set::<NodeId>(3..=3)
1127            .into_iter()
1128            .collect::<BTreeSet<_>>();
1129
1130        let config = AnnouncerConfig::public(
1131            local,
1132            ReplicationFactor::must_reach(2),
1133            BTreeSet::new(),
1134            BTreeSet::new(),
1135            unsynced.clone(),
1136        );
1137
1138        let mut announcer = Announcer::new(config).unwrap();
1139
1140        // Sync with enough nodes to reach the target
1141        let mut synced_nodes = BTreeMap::new();
1142        for node in unsynced {
1143            let duration = time::Duration::from_secs(1);
1144            synced_nodes.insert(node, SyncStatus::Synced { duration });
1145
1146            match announcer.synced_with(node, duration) {
1147                ControlFlow::Continue(_) => continue,
1148                ControlFlow::Break(_) => break, // Reached target
1149            }
1150        }
1151
1152        // Now call timed_out even though we reached success
1153        match announcer.timed_out() {
1154            AnnouncerResult::Success(success) => {
1155                // Should return Success since target was reached
1156                assert_eq!(
1157                    success.outcome(),
1158                    SuccessfulOutcome::MinReplicationFactor {
1159                        preferred: 0,
1160                        synced: 2
1161                    },
1162                    "Should return success outcome even when called via timed_out"
1163                );
1164            }
1165            other => panic!("Expected Success via timed_out, got: {other:?}"),
1166        }
1167    }
1168
1169    #[test]
1170    fn construct_only_preferred_seeds_provided() {
1171        // Test: preferred_seeds non-empty, synced and unsynced empty
1172        // Expected: preferred seeds should be moved to to_sync, constructor succeeds
1173        let local = arbitrary::gen::<NodeId>(0);
1174        let preferred_seeds = arbitrary::set::<NodeId>(2..=2)
1175            .into_iter()
1176            .collect::<BTreeSet<_>>();
1177
1178        let config = AnnouncerConfig::public(
1179            local,
1180            ReplicationFactor::must_reach(1),
1181            preferred_seeds.clone(),
1182            BTreeSet::new(),
1183            BTreeSet::new(),
1184        );
1185
1186        let announcer = Announcer::new(config).unwrap();
1187
1188        // Constructor should move unsynced preferred seeds to to_sync
1189        assert_eq!(announcer.to_sync, preferred_seeds);
1190        assert_eq!(announcer.target().preferred_seeds(), &preferred_seeds);
1191        assert!(announcer.synced.is_empty());
1192    }
1193
1194    #[test]
1195    fn construct_node_appears_in_multiple_input_sets() {
1196        let local = arbitrary::gen::<NodeId>(0);
1197        let alice = arbitrary::gen::<NodeId>(1);
1198        let bob = arbitrary::gen::<NodeId>(2);
1199        let eve = arbitrary::gen::<NodeId>(3);
1200
1201        // alice will appear in synced and unsynced
1202        let synced = [alice].iter().copied().collect::<BTreeSet<_>>();
1203        let unsynced = [alice, bob, eve].iter().copied().collect::<BTreeSet<_>>();
1204
1205        let config = AnnouncerConfig::public(
1206            local,
1207            ReplicationFactor::must_reach(2),
1208            BTreeSet::new(),
1209            synced,
1210            unsynced,
1211        );
1212
1213        let announcer = Announcer::new(config).unwrap();
1214
1215        // synced takes precedence over to_sync when constructing
1216        assert!(
1217            announcer.synced.contains_key(&alice),
1218            "alice should be synced"
1219        );
1220        assert!(
1221            !announcer.to_sync.contains(&alice),
1222            "alice should not appear in to_sync"
1223        );
1224        // bob and eve should appear in to_sync
1225        assert!(
1226            announcer.to_sync.contains(&bob) && announcer.to_sync.contains(&eve),
1227            "Other node should be in to_sync"
1228        );
1229    }
1230
1231    #[test]
1232    fn cannot_construct_announcer() {
1233        let local = arbitrary::gen::<NodeId>(0);
1234        let seeds = arbitrary::set::<NodeId>(10..=10);
1235        let synced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
1236        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
1237        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1238        let replicas = ReplicationFactor::default();
1239        let config = AnnouncerConfig::public(
1240            local,
1241            ReplicationFactor::default(),
1242            BTreeSet::new(),
1243            BTreeSet::new(),
1244            BTreeSet::new(),
1245        );
1246        assert!(matches!(
1247            Announcer::new(config),
1248            Err(AnnouncerError::NoSeeds)
1249        ));
1250
1251        // No nodes to sync
1252        let config = AnnouncerConfig::public(
1253            local,
1254            replicas,
1255            preferred_seeds.clone(),
1256            synced.clone(),
1257            BTreeSet::new(),
1258        );
1259        assert!(matches!(
1260            Announcer::new(config),
1261            Err(AnnouncerError::AlreadySynced { .. })
1262        ));
1263
1264        let config = AnnouncerConfig::public(
1265            local,
1266            ReplicationFactor::must_reach(0),
1267            BTreeSet::new(),
1268            synced.clone(),
1269            unsynced.clone(),
1270        );
1271        assert!(matches!(
1272            Announcer::new(config),
1273            Err(AnnouncerError::Target(_))
1274        ));
1275
1276        let config = AnnouncerConfig::public(
1277            local,
1278            ReplicationFactor::MustReach(2),
1279            preferred_seeds.clone(),
1280            synced.clone(),
1281            unsynced.clone(),
1282        );
1283        // Min replication factor
1284        assert!(matches!(
1285            Announcer::new(config),
1286            Err(AnnouncerError::AlreadySynced { .. })
1287        ));
1288        let config = AnnouncerConfig::public(
1289            local,
1290            ReplicationFactor::range(2, 3),
1291            preferred_seeds,
1292            synced,
1293            unsynced,
1294        );
1295        // Max replication factor
1296        assert!(matches!(
1297            Announcer::new(config),
1298            Err(AnnouncerError::AlreadySynced { .. })
1299        ));
1300    }
1301
1302    #[test]
1303    fn invariant_progress_should_match_state() {
1304        let local = arbitrary::gen::<NodeId>(0);
1305        let seeds = arbitrary::set::<NodeId>(6..=6);
1306
1307        // Set up: 2 already synced, 4 unsynced initially
1308        let already_synced = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1309        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
1310
1311        let config = AnnouncerConfig::public(
1312            local,
1313            ReplicationFactor::must_reach(4), // Need 4 total
1314            BTreeSet::new(),                  // No preferred seeds
1315            already_synced.clone(),
1316            unsynced.clone(),
1317        );
1318
1319        let mut announcer = Announcer::new(config).unwrap();
1320
1321        // No progress made, so values should be the same
1322        assert_eq!(
1323            announcer.progress().unsynced(),
1324            announcer.to_sync().len(),
1325            "Expected unsynced progress to be the same as the number of nodes to sync"
1326        );
1327
1328        // Expected: progress.synced() should be the number of already synced nodes
1329        assert_eq!(
1330            announcer.progress().synced(),
1331            already_synced.len(),
1332            "Initial synced count should equal already synced nodes"
1333        );
1334
1335        // Now sync with one node and check progress again
1336        let first_unsynced = *unsynced.iter().next().unwrap();
1337        let duration = time::Duration::from_secs(1);
1338
1339        match announcer.synced_with(first_unsynced, duration) {
1340            ControlFlow::Continue(progress) => {
1341                assert_eq!(
1342                    progress.synced(),
1343                    already_synced.len() + 1,
1344                    "Synced count should increase by 1"
1345                );
1346
1347                assert_eq!(
1348                    progress.unsynced(),
1349                    announcer.to_sync().len(),
1350                    "Unsynced count should equal remaining to_sync length"
1351                );
1352
1353                assert_eq!(
1354                    progress.unsynced(),
1355                    unsynced.len() - 1,
1356                    "Unsynced should be original unsynced count minus nodes we've synced"
1357                );
1358            }
1359            ControlFlow::Break(outcome) => {
1360                panic!("Should not have reached target yet: {outcome:?}")
1361            }
1362        }
1363
1364        // Invariant:
1365        // synced nodes + unsynced nodes = progress.synced() + progress.unsynced()
1366        let final_progress = announcer.progress();
1367        let expected_total = already_synced.len() + unsynced.len();
1368        let actual_total = final_progress.synced() + final_progress.unsynced();
1369
1370        assert_eq!(
1371            actual_total, expected_total,
1372            "synced + unsynced should equal the total nodes we started with"
1373        );
1374    }
1375
1376    #[test]
1377    fn local_node_in_preferred_seeds() {
1378        let local = arbitrary::gen::<NodeId>(0);
1379        let other_seeds = arbitrary::set::<NodeId>(5..=5);
1380
1381        // Include local node in preferred seeds
1382        let mut preferred_seeds = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1383        preferred_seeds.insert(local);
1384
1385        let unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
1386
1387        let config = AnnouncerConfig::public(
1388            local,
1389            ReplicationFactor::must_reach(3),
1390            preferred_seeds.clone(),
1391            BTreeSet::new(),
1392            unsynced.clone(),
1393        );
1394
1395        let announcer = Announcer::new(config).unwrap();
1396
1397        // Verify local node was removed from target's preferred seeds
1398        assert!(
1399            !announcer.target().preferred_seeds().contains(&local),
1400            "Local node should be removed from preferred seeds in target"
1401        );
1402
1403        // Verify local node is not in to_sync
1404        assert!(
1405            !announcer.to_sync().contains(&local),
1406            "Local node should not be in to_sync set"
1407        );
1408
1409        // The preferred seeds in the target should be one less than what we passed in
1410        assert_eq!(
1411            announcer.target().preferred_seeds().len(),
1412            preferred_seeds.len() - 1,
1413            "Target should have local node removed from preferred seeds"
1414        );
1415    }
1416
1417    #[test]
1418    fn local_node_in_synced_set() {
1419        let local = arbitrary::gen::<NodeId>(0);
1420        let other_seeds = arbitrary::set::<NodeId>(5..=5);
1421
1422        // Include local node in synced set
1423        let mut synced = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1424        synced.insert(local);
1425
1426        let unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
1427
1428        let config = AnnouncerConfig::public(
1429            local,
1430            ReplicationFactor::must_reach(4),
1431            BTreeSet::new(),
1432            synced.clone(),
1433            unsynced.clone(),
1434        );
1435
1436        let announcer = Announcer::new(config).unwrap();
1437
1438        // Verify local node is not counted in synced nodes
1439        assert!(
1440            !announcer.synced.contains_key(&local),
1441            "Local node should not be in internal synced map"
1442        );
1443
1444        // Progress should reflect only the non-local synced nodes
1445        assert_eq!(
1446            announcer.progress().synced(),
1447            synced.len() - 1,
1448            "Progress should not count local node as synced"
1449        );
1450    }
1451
1452    #[test]
1453    fn local_node_in_unsynced_set() {
1454        let local = arbitrary::gen::<NodeId>(0);
1455        let other_seeds = arbitrary::set::<NodeId>(5..=5);
1456
1457        let synced = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1458
1459        // Include local node in unsynced set
1460        let mut unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
1461        unsynced.insert(local);
1462
1463        let config = AnnouncerConfig::public(
1464            local,
1465            ReplicationFactor::must_reach(4),
1466            BTreeSet::new(),
1467            synced.clone(),
1468            unsynced.clone(),
1469        );
1470
1471        let announcer = Announcer::new(config).unwrap();
1472
1473        // Verify local node is not in to_sync
1474        assert!(
1475            !announcer.to_sync().contains(&local),
1476            "Local node should not be in to_sync set"
1477        );
1478
1479        // The internal to_sync should not contain local node
1480        assert!(
1481            !announcer.to_sync.contains(&local),
1482            "Internal to_sync should not contain local node"
1483        );
1484
1485        // Progress unsynced count should not include local node
1486        assert_eq!(
1487            announcer.progress().unsynced(),
1488            unsynced.len() - 1,
1489            "Progress unsynced should not count local node"
1490        );
1491    }
1492
1493    #[test]
1494    fn local_node_in_multiple_sets() {
1495        let local = arbitrary::gen::<NodeId>(0);
1496        let other_seeds = arbitrary::set::<NodeId>(5..=5);
1497
1498        // Include local node in ALL sets
1499        let mut preferred_seeds = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
1500        preferred_seeds.insert(local);
1501
1502        let mut synced = other_seeds
1503            .iter()
1504            .skip(2)
1505            .take(1)
1506            .copied()
1507            .collect::<BTreeSet<_>>();
1508        synced.insert(local);
1509
1510        let mut unsynced = other_seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
1511        unsynced.insert(local);
1512
1513        let config = AnnouncerConfig::public(
1514            local,
1515            ReplicationFactor::must_reach(3),
1516            preferred_seeds.clone(),
1517            synced.clone(),
1518            unsynced.clone(),
1519        );
1520
1521        let announcer = Announcer::new(config).unwrap();
1522
1523        // Verify local node is completely absent from all internal structures
1524        assert!(
1525            !announcer.target().preferred_seeds().contains(&local),
1526            "Local node should be removed from preferred seeds"
1527        );
1528        assert!(
1529            !announcer.synced.contains_key(&local),
1530            "Local node should not be in synced map"
1531        );
1532        assert!(
1533            !announcer.to_sync().contains(&local),
1534            "Local node should not be in to_sync"
1535        );
1536        assert!(
1537            !announcer.to_sync.contains(&local),
1538            "Local node should not be in internal to_sync"
1539        );
1540
1541        // Verify counts are correct (excluding local node from all)
1542        assert_eq!(
1543            announcer.target().preferred_seeds().len(),
1544            preferred_seeds.len() - 1
1545        );
1546        assert_eq!(announcer.progress().synced(), synced.len() - 1);
1547        // The unsynced nodes includes the preferred seeds, since they are not
1548        // in the synced set, and `- 1` from each for the local node
1549        assert_eq!(
1550            announcer.progress().unsynced(),
1551            (unsynced.len() - 1) + (preferred_seeds.len() - 1)
1552        );
1553    }
1554
1555    #[test]
1556    fn synced_with_local_node_is_ignored() {
1557        let local = arbitrary::gen::<NodeId>(0);
1558        let unsynced = arbitrary::set::<NodeId>(3..=3).into_iter().collect();
1559
1560        let config = AnnouncerConfig::public(
1561            local,
1562            ReplicationFactor::must_reach(2),
1563            BTreeSet::new(),
1564            BTreeSet::new(),
1565            unsynced,
1566        );
1567
1568        let mut announcer = Announcer::new(config).unwrap();
1569        let initial_progress = announcer.progress();
1570
1571        // Try to sync with the local node - this should be ignored
1572        let duration = time::Duration::from_secs(1);
1573        match announcer.synced_with(local, duration) {
1574            ControlFlow::Continue(progress) => {
1575                // Progress should be unchanged
1576                assert_eq!(
1577                    progress.synced(),
1578                    initial_progress.synced(),
1579                    "Syncing with local node should not change synced count"
1580                );
1581                assert_eq!(
1582                    progress.unsynced(),
1583                    initial_progress.unsynced(),
1584                    "Syncing with local node should not change unsynced count"
1585                );
1586            }
1587            ControlFlow::Break(_) => panic!("Should not reach target by syncing with local node"),
1588        }
1589
1590        // Verify local node is still not in synced map
1591        assert!(
1592            !announcer.synced.contains_key(&local),
1593            "Local node should not be added to synced map"
1594        );
1595    }
1596
1597    #[test]
1598    fn local_node_only_in_all_sets_results_in_no_seeds_error() {
1599        let local = arbitrary::gen::<NodeId>(0);
1600
1601        // Create sets that contain ONLY the local node
1602        let preferred_seeds = [local].iter().copied().collect::<BTreeSet<_>>();
1603        let synced = [local].iter().copied().collect::<BTreeSet<_>>();
1604        let unsynced = [local].iter().copied().collect::<BTreeSet<_>>();
1605
1606        let config = AnnouncerConfig::public(
1607            local,
1608            ReplicationFactor::must_reach(1),
1609            preferred_seeds,
1610            synced,
1611            unsynced,
1612        );
1613
1614        // After removing local node from all sets, we should get NoSeeds error
1615        assert_matches!(Announcer::new(config), Err(AnnouncerError::NoSeeds));
1616    }
1617}