Skip to main content

distributed_topic_tracker/
config.rs

1use std::time::Duration;
2
/// Timeout settings for gossip operations.
///
/// Every timeout defaults to 5 seconds. Use `TimeoutConfig::builder()` to
/// override individual values.
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
    /// How long to wait when joining a peer.
    join_peer_timeout: Duration,
    /// How long to wait when broadcasting messages.
    broadcast_timeout: Duration,
    /// How long to wait when broadcasting to neighbors.
    broadcast_neighbors_timeout: Duration,
}

impl TimeoutConfig {
    /// Create a new `TimeoutConfigBuilder` with default values.
    pub fn builder() -> TimeoutConfigBuilder {
        TimeoutConfigBuilder {
            timeouts: Self::default(),
        }
    }

    /// How long to wait when joining a peer.
    ///
    /// Default: 5s.
    pub fn join_peer_timeout(&self) -> Duration {
        self.join_peer_timeout
    }

    /// How long to wait when broadcasting messages.
    ///
    /// Default: 5s.
    pub fn broadcast_timeout(&self) -> Duration {
        self.broadcast_timeout
    }

    /// How long to wait when broadcasting to neighbors.
    ///
    /// Default: 5s.
    pub fn broadcast_neighbors_timeout(&self) -> Duration {
        self.broadcast_neighbors_timeout
    }
}

impl Default for TimeoutConfig {
    /// All three timeouts default to 5 seconds.
    fn default() -> Self {
        Self {
            join_peer_timeout: Duration::from_secs(5),
            broadcast_timeout: Duration::from_secs(5),
            broadcast_neighbors_timeout: Duration::from_secs(5),
        }
    }
}

/// Builder for `TimeoutConfig`.
///
/// Starts from `TimeoutConfig::default()`. Each setter consumes the builder and
/// returns it with one value replaced; values are stored as given, without
/// validation. The builder is `Clone`, so a partially configured builder can
/// be reused as a template for several configurations.
#[derive(Debug, Clone)]
pub struct TimeoutConfigBuilder {
    /// The configuration being assembled.
    timeouts: TimeoutConfig,
}

impl TimeoutConfigBuilder {
    /// How long to wait when joining a peer.
    ///
    /// Default: 5s.
    pub fn join_peer_timeout(mut self, timeout: Duration) -> Self {
        self.timeouts.join_peer_timeout = timeout;
        self
    }

    /// How long to wait when broadcasting messages.
    ///
    /// Default: 5s.
    pub fn broadcast_timeout(mut self, timeout: Duration) -> Self {
        self.timeouts.broadcast_timeout = timeout;
        self
    }

    /// How long to wait when broadcasting to neighbors.
    ///
    /// Default: 5s.
    pub fn broadcast_neighbors_timeout(mut self, timeout: Duration) -> Self {
        self.timeouts.broadcast_neighbors_timeout = timeout;
        self
    }

    /// Build the `TimeoutConfig`.
    pub fn build(self) -> TimeoutConfig {
        self.timeouts
    }
}
87
/// Retry and timeout settings for DHT operations.
#[derive(Debug, Clone)]
pub struct DhtConfig {
    retries: usize,
    base_retry_interval: Duration,
    max_retry_jitter: Duration,
    put_timeout: Duration,
    get_timeout: Duration,
}

impl Default for DhtConfig {
    /// 3 retries, 5s base retry interval, 10s for jitter cap and both timeouts.
    fn default() -> Self {
        let ten_seconds = Duration::from_secs(10);
        Self {
            retries: 3,
            base_retry_interval: Duration::from_secs(5),
            max_retry_jitter: ten_seconds,
            put_timeout: ten_seconds,
            get_timeout: ten_seconds,
        }
    }
}

impl DhtConfig {
    /// Start a `DhtConfigBuilder` pre-filled with the default values.
    pub fn builder() -> DhtConfigBuilder {
        DhtConfigBuilder {
            config: Self::default(),
        }
    }

    /// How many times an operation is retried after its first attempt,
    /// i.e. total attempts = 1 + retries.
    ///
    /// Default: 3.
    pub fn retries(&self) -> usize {
        self.retries
    }

    /// Base delay between two retries.
    ///
    /// Default: 5s.
    pub fn base_retry_interval(&self) -> Duration {
        self.base_retry_interval
    }

    /// Upper bound of the random jitter added to the retry interval.
    ///
    /// Default: 10s.
    pub fn max_retry_jitter(&self) -> Duration {
        self.max_retry_jitter
    }

    /// Timeout applied to DHT put operations.
    ///
    /// Default: 10s.
    pub fn put_timeout(&self) -> Duration {
        self.put_timeout
    }

    /// Timeout applied to DHT get operations.
    ///
    /// Default: 10s.
    pub fn get_timeout(&self) -> Duration {
        self.get_timeout
    }
}

/// Builder for `DhtConfig`.
///
/// Setters for `base_retry_interval`, `put_timeout` and `get_timeout` ignore
/// `Duration::ZERO` and leave the previously stored value in place.
#[derive(Debug, Clone)]
pub struct DhtConfigBuilder {
    config: DhtConfig,
}

impl DhtConfigBuilder {
    /// How many times an operation is retried after its first attempt,
    /// i.e. total attempts = 1 + retries.
    ///
    /// Default: 3.
    pub fn retries(self, retries: usize) -> Self {
        Self {
            config: DhtConfig {
                retries,
                ..self.config
            },
        }
    }

    /// Base delay between two retries.
    ///
    /// A zero `interval` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 5s.
    pub fn base_retry_interval(mut self, interval: Duration) -> Self {
        if interval.is_zero() {
            return self;
        }
        self.config.base_retry_interval = interval;
        self
    }

    /// Upper bound of the random jitter added to the retry interval.
    ///
    /// Default: 10s.
    pub fn max_retry_jitter(self, jitter: Duration) -> Self {
        Self {
            config: DhtConfig {
                max_retry_jitter: jitter,
                ..self.config
            },
        }
    }

    /// Timeout applied to DHT put operations.
    ///
    /// A zero `timeout` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 10s.
    pub fn put_timeout(mut self, timeout: Duration) -> Self {
        if timeout.is_zero() {
            return self;
        }
        self.config.put_timeout = timeout;
        self
    }

    /// Timeout applied to DHT get operations.
    ///
    /// A zero `timeout` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 10s.
    pub fn get_timeout(mut self, timeout: Duration) -> Self {
        if timeout.is_zero() {
            return self;
        }
        self.config.get_timeout = timeout;
        self
    }

    /// Finish building and return the `DhtConfig`.
    pub fn build(self) -> DhtConfig {
        self.config
    }
}
225
/// Bubble merge strategy config for detecting and healing split-brain scenarios by joining
/// small clusters with peers advertised in DHT that are not our neighbors.
#[derive(Debug, Clone)]
pub enum BubbleMergeConfig {
    /// The strategy is active and uses the contained settings.
    Enabled(BubbleMergeConfigInner),
    /// The strategy is turned off.
    Disabled,
}

/// Settings carried by `BubbleMergeConfig::Enabled`.
#[derive(Debug, Clone)]
pub struct BubbleMergeConfigInner {
    initial_interval: Duration,
    base_interval: Duration,
    max_jitter: Duration,
    min_neighbors: usize,
    fail_topic_creation_on_merge_startup_failure: bool,
    max_join_peers: usize,
}

/// Builder producing a `BubbleMergeConfig::Enabled` value.
#[derive(Debug, Clone)]
pub struct BubbleMergeConfigBuilder {
    config: BubbleMergeConfigInner,
}

impl Default for BubbleMergeConfigInner {
    fn default() -> Self {
        Self {
            initial_interval: Duration::from_secs(30),
            base_interval: Duration::from_secs(60),
            max_jitter: Duration::from_secs(120),
            min_neighbors: 4,
            fail_topic_creation_on_merge_startup_failure: true,
            max_join_peers: 2,
        }
    }
}

impl Default for BubbleMergeConfig {
    /// Enabled, with the default inner settings.
    fn default() -> Self {
        BubbleMergeConfig::Enabled(Default::default())
    }
}

impl BubbleMergeConfig {
    /// Start a `BubbleMergeConfigBuilder` pre-filled with the default values.
    pub fn builder() -> BubbleMergeConfigBuilder {
        let config = BubbleMergeConfigInner::default();
        BubbleMergeConfigBuilder { config }
    }
}

impl BubbleMergeConfigInner {
    /// Delay before the first bubble merge attempt.
    ///
    /// Default: 30s.
    pub fn initial_interval(&self) -> Duration {
        self.initial_interval
    }

    /// Base interval between bubble merge attempts.
    ///
    /// `base_interval` > Duration::ZERO
    ///
    /// Default: 60s.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Upper bound of the random jitter added to the bubble merge interval.
    ///
    /// Default: 120s.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Minimum number of neighbors required to attempt a bubble merge.
    ///
    /// Default: 4.
    pub fn min_neighbors(&self) -> usize {
        self.min_neighbors
    }

    /// Whether topic creation fails (returns `Err`) when the bubble merge startup
    /// check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_merge_startup_failure
    }

    /// Maximum number of peers joined during one bubble merge attempt.
    ///
    /// Default: 2.
    pub fn max_join_peers(&self) -> usize {
        self.max_join_peers
    }
}

impl BubbleMergeConfigBuilder {
    /// Delay before the first bubble merge attempt.
    ///
    /// Default: 30s.
    pub fn initial_interval(self, interval: Duration) -> Self {
        Self {
            config: BubbleMergeConfigInner {
                initial_interval: interval,
                ..self.config
            },
        }
    }

    /// Base interval between bubble merge attempts.
    ///
    /// A zero `interval` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 60s.
    pub fn base_interval(mut self, interval: Duration) -> Self {
        if interval.is_zero() {
            return self;
        }
        self.config.base_interval = interval;
        self
    }

    /// Upper bound of the random jitter added to the bubble merge interval.
    ///
    /// Default: 120s.
    pub fn max_jitter(self, jitter: Duration) -> Self {
        Self {
            config: BubbleMergeConfigInner {
                max_jitter: jitter,
                ..self.config
            },
        }
    }

    /// Minimum number of neighbors required to attempt a bubble merge.
    ///
    /// Default: 4.
    pub fn min_neighbors(self, min_neighbors: usize) -> Self {
        Self {
            config: BubbleMergeConfigInner {
                min_neighbors,
                ..self.config
            },
        }
    }

    /// Whether topic creation fails (returns `Err`) when the bubble merge startup
    /// check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_merge_startup_failure(self, fail: bool) -> Self {
        Self {
            config: BubbleMergeConfigInner {
                fail_topic_creation_on_merge_startup_failure: fail,
                ..self.config
            },
        }
    }

    /// Maximum number of peers joined during one bubble merge attempt.
    ///
    /// A zero `max_join_peers` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 2.
    pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
        if max_join_peers == 0 {
            return self;
        }
        self.config.max_join_peers = max_join_peers;
        self
    }

    /// Finish building; the result is always `BubbleMergeConfig::Enabled`.
    pub fn build(self) -> BubbleMergeConfig {
        let Self { config } = self;
        BubbleMergeConfig::Enabled(config)
    }
}
390
/// Message overlap merge strategy config for detecting and healing split-brain scenarios by
/// checking for overlapping message hashes with other cluster peers.
#[derive(Debug, Clone)]
pub enum MessageOverlapMergeConfig {
    /// The strategy is active and uses the contained settings.
    Enabled(MessageOverlapMergeConfigInner),
    /// The strategy is turned off.
    Disabled,
}

/// Settings carried by `MessageOverlapMergeConfig::Enabled`.
#[derive(Debug, Clone)]
pub struct MessageOverlapMergeConfigInner {
    initial_interval: Duration,
    base_interval: Duration,
    max_jitter: Duration,
    fail_topic_creation_on_merge_startup_failure: bool,
    max_join_peers: usize,
}

/// Builder producing a `MessageOverlapMergeConfig::Enabled` value.
#[derive(Debug, Clone)]
pub struct MessageOverlapMergeConfigBuilder {
    config: MessageOverlapMergeConfigInner,
}

impl Default for MessageOverlapMergeConfig {
    /// Enabled, with the default inner settings.
    fn default() -> Self {
        MessageOverlapMergeConfig::Enabled(Default::default())
    }
}

impl Default for MessageOverlapMergeConfigInner {
    fn default() -> Self {
        Self {
            initial_interval: Duration::from_secs(30),
            base_interval: Duration::from_secs(60),
            max_jitter: Duration::from_secs(120),
            fail_topic_creation_on_merge_startup_failure: true,
            max_join_peers: 2,
        }
    }
}

impl MessageOverlapMergeConfig {
    /// Start a `MessageOverlapMergeConfigBuilder` pre-filled with the default values.
    pub fn builder() -> MessageOverlapMergeConfigBuilder {
        let config = MessageOverlapMergeConfigInner::default();
        MessageOverlapMergeConfigBuilder { config }
    }
}

impl MessageOverlapMergeConfigInner {
    /// Delay before the first message overlap merge attempt.
    ///
    /// Default: 30s.
    pub fn initial_interval(&self) -> Duration {
        self.initial_interval
    }

    /// Base interval between message overlap merge attempts.
    ///
    /// `base_interval` > Duration::ZERO
    ///
    /// Default: 60s.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Upper bound of the random jitter added to the message overlap merge interval.
    ///
    /// Default: 120s.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Whether topic creation fails (returns `Err`) when the message overlap merge
    /// startup check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_merge_startup_failure
    }

    /// Maximum number of peers joined during one message overlap merge attempt.
    ///
    /// Default: 2.
    pub fn max_join_peers(&self) -> usize {
        self.max_join_peers
    }
}

impl MessageOverlapMergeConfigBuilder {
    /// Delay before the first message overlap merge attempt.
    ///
    /// Default: 30s.
    pub fn initial_interval(self, interval: Duration) -> Self {
        Self {
            config: MessageOverlapMergeConfigInner {
                initial_interval: interval,
                ..self.config
            },
        }
    }

    /// Base interval between message overlap merge attempts.
    ///
    /// A zero `interval` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 60s.
    pub fn base_interval(mut self, interval: Duration) -> Self {
        if interval.is_zero() {
            return self;
        }
        self.config.base_interval = interval;
        self
    }

    /// Upper bound of the random jitter added to the message overlap merge interval.
    ///
    /// Default: 120s. Minimum is 0s.
    pub fn max_jitter(self, jitter: Duration) -> Self {
        Self {
            config: MessageOverlapMergeConfigInner {
                max_jitter: jitter,
                ..self.config
            },
        }
    }

    /// Whether topic creation fails (returns `Err`) when the message overlap merge
    /// startup check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_merge_startup_failure(self, fail: bool) -> Self {
        Self {
            config: MessageOverlapMergeConfigInner {
                fail_topic_creation_on_merge_startup_failure: fail,
                ..self.config
            },
        }
    }

    /// Maximum number of peers joined during one message overlap merge attempt.
    ///
    /// A zero `max_join_peers` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 2.
    pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
        if max_join_peers == 0 {
            return self;
        }
        self.config.max_join_peers = max_join_peers;
        self
    }

    /// Finish building; the result is always `MessageOverlapMergeConfig::Enabled`.
    pub fn build(self) -> MessageOverlapMergeConfig {
        let Self { config } = self;
        MessageOverlapMergeConfig::Enabled(config)
    }
}
538
/// Publisher strategy config for publishing bootstrap records to DHT for peer discovery.
#[derive(Debug, Clone)]
pub enum PublisherConfig {
    /// The publisher is active and uses the contained settings.
    Enabled(PublisherConfigInner),
    /// The publisher is turned off.
    Disabled,
}

/// Settings carried by `PublisherConfig::Enabled`.
#[derive(Debug, Clone)]
pub struct PublisherConfigInner {
    initial_delay: Duration,
    base_interval: Duration,
    max_jitter: Duration,
    fail_topic_creation_on_publishing_startup_failure: bool,
}

/// Builder producing a `PublisherConfig::Enabled` value.
#[derive(Debug, Clone)]
pub struct PublisherConfigBuilder {
    config: PublisherConfigInner,
}

impl Default for PublisherConfig {
    /// Enabled, with the default inner settings.
    fn default() -> Self {
        PublisherConfig::Enabled(Default::default())
    }
}

impl Default for PublisherConfigInner {
    fn default() -> Self {
        let ten_seconds = Duration::from_secs(10);
        Self {
            initial_delay: ten_seconds,
            base_interval: ten_seconds,
            max_jitter: Duration::from_secs(50),
            fail_topic_creation_on_publishing_startup_failure: true,
        }
    }
}

impl PublisherConfig {
    /// Start a `PublisherConfigBuilder` pre-filled with the default values.
    pub fn builder() -> PublisherConfigBuilder {
        let config = PublisherConfigInner::default();
        PublisherConfigBuilder { config }
    }
}

impl PublisherConfigInner {
    /// Delay before the publisher starts.
    ///
    /// Default: 10s.
    pub fn initial_delay(&self) -> Duration {
        self.initial_delay
    }

    /// Base interval between publisher attempts.
    ///
    /// `base_interval` > Duration::ZERO
    ///
    /// Default: 10s.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Upper bound of the random jitter added to the publisher interval.
    ///
    /// Default: 50s.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Whether topic creation fails (returns `Err`) when the publisher startup
    /// check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_publishing_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_publishing_startup_failure
    }
}

impl PublisherConfigBuilder {
    /// Delay before the publisher starts.
    ///
    /// Default: 10s.
    pub fn initial_delay(self, delay: Duration) -> Self {
        Self {
            config: PublisherConfigInner {
                initial_delay: delay,
                ..self.config
            },
        }
    }

    /// Base interval between publisher attempts.
    ///
    /// A zero `interval` is ignored: the value already held by the builder
    /// (the default, or whatever an earlier non-zero call stored) is kept.
    ///
    /// Default: 10s.
    pub fn base_interval(mut self, interval: Duration) -> Self {
        if interval.is_zero() {
            return self;
        }
        self.config.base_interval = interval;
        self
    }

    /// Upper bound of the random jitter added to the publisher interval.
    ///
    /// Default: 50s.
    pub fn max_jitter(self, jitter: Duration) -> Self {
        Self {
            config: PublisherConfigInner {
                max_jitter: jitter,
                ..self.config
            },
        }
    }

    /// Whether topic creation fails (returns `Err`) when the publisher startup
    /// check fails, as opposed to only logging and running the topic without it.
    ///
    /// Default: true.
    pub fn fail_topic_creation_on_publishing_startup_failure(self, fail: bool) -> Self {
        Self {
            config: PublisherConfigInner {
                fail_topic_creation_on_publishing_startup_failure: fail,
                ..self.config
            },
        }
    }

    /// Finish building; the result is always `PublisherConfig::Enabled`.
    pub fn build(self) -> PublisherConfig {
        let Self { config } = self;
        PublisherConfig::Enabled(config)
    }
}
665
666/// Merge strategies run periodically in the background and attempt to merge split clusters by joining peers in DHT records
667/// and message hashes for bubble detection and merging, and by joining peers in DHT records with overlapping message hashes
668/// for message overlap detection and merging.
669#[derive(Debug, Clone, Default)]
670pub struct MergeConfig {
671    bubble_merge: BubbleMergeConfig,
672    message_overlap_merge: MessageOverlapMergeConfig,
673}
674
675/// Builder for `MergeConfig`.
676#[derive(Debug, Clone)]
677pub struct MergeConfigBuilder {
678    config: MergeConfig,
679}
680
681impl MergeConfig {
682    /// Create a new `MergeConfigBuilder` with default values.
683    pub fn builder() -> MergeConfigBuilder {
684        MergeConfigBuilder {
685            config: MergeConfig::default(),
686        }
687    }
688
689    /// Bubble merge strategy config.
690    ///
691    /// Default: BubbleMergeConfig::default()
692    pub fn bubble_merge(&self) -> &BubbleMergeConfig {
693        &self.bubble_merge
694    }
695
696    /// Message overlap merge strategy config.
697    ///
698    /// Default: MessageOverlapMergeConfig::default()
699    pub fn message_overlap_merge(&self) -> &MessageOverlapMergeConfig {
700        &self.message_overlap_merge
701    }
702}
703
704impl MergeConfigBuilder {
705    /// Bubble merge strategy config.
706    ///
707    /// Default: BubbleMergeConfig::default()
708    pub fn bubble_merge(mut self, bubble_merge: BubbleMergeConfig) -> Self {
709        self.config.bubble_merge = bubble_merge;
710        self
711    }
712
713    /// Message overlap merge strategy config.
714    ///
715    /// Default: MessageOverlapMergeConfig::default()
716    pub fn message_overlap_merge(
717        mut self,
718        message_overlap_merge: MessageOverlapMergeConfig,
719    ) -> Self {
720        self.config.message_overlap_merge = message_overlap_merge;
721        self
722    }
723
724    /// Build the `MergeConfig`.
725    pub fn build(self) -> MergeConfig {
726        self.config
727    }
728}
729
/// Bootstrap process settings for peer discovery.
///
/// Use `BootstrapConfig::builder()` to override individual values; see the
/// accessors for the meaning and default of each setting.
#[derive(Debug, Clone)]
pub struct BootstrapConfig {
    /// Max bootstrap records per topic per minute slot.
    max_bootstrap_records: usize,
    /// How long to wait when no peers are found before retrying.
    no_peers_retry_interval: Duration,
    /// How long to wait after joining a peer before attempting to join another.
    per_peer_join_settle_time: Duration,
    /// How long to wait after joining a peer before checking if joined successfully.
    join_confirmation_wait_time: Duration,
    /// How long to wait between DHT discovery attempts.
    discovery_poll_interval: Duration,
    /// Whether to publish a bootstrap record unconditionally on startup.
    publish_record_on_startup: bool,
    /// Whether older minute slots are checked first on startup.
    check_older_records_first_on_startup: bool,
}

impl Default for BootstrapConfig {
    fn default() -> Self {
        Self {
            max_bootstrap_records: 5,
            no_peers_retry_interval: Duration::from_millis(1500),
            per_peer_join_settle_time: Duration::from_millis(100),
            join_confirmation_wait_time: Duration::from_millis(500),
            discovery_poll_interval: Duration::from_millis(2000),
            publish_record_on_startup: true,
            check_older_records_first_on_startup: false,
        }
    }
}

/// Builder for `BootstrapConfig`.
///
/// Starts from `BootstrapConfig::default()`. Each setter consumes the builder
/// and returns it with one value replaced; values are stored as given, without
/// validation. The builder is `Clone`, so a partially configured builder can
/// be reused as a template for several configurations.
#[derive(Debug, Clone)]
pub struct BootstrapConfigBuilder {
    /// The configuration being assembled.
    config: BootstrapConfig,
}

impl BootstrapConfigBuilder {
    /// Max bootstrap records per topic per minute slot.
    ///
    /// If zero, we don't publish (PublisherConfig will be set to Disabled).
    ///
    /// Default: 5.
    pub fn max_bootstrap_records(mut self, max_records: usize) -> Self {
        self.config.max_bootstrap_records = max_records;
        self
    }

    /// How long to wait when no peers are found before retrying.
    ///
    /// Default: 1500ms.
    pub fn no_peers_retry_interval(mut self, interval: Duration) -> Self {
        self.config.no_peers_retry_interval = interval;
        self
    }

    /// How long to wait after joining a peer before attempting to join another.
    ///
    /// Default: 100ms.
    pub fn per_peer_join_settle_time(mut self, interval: Duration) -> Self {
        self.config.per_peer_join_settle_time = interval;
        self
    }

    /// How long to wait after joining a peer before checking if joined successfully.
    ///
    /// Default: 500ms.
    pub fn join_confirmation_wait_time(mut self, interval: Duration) -> Self {
        self.config.join_confirmation_wait_time = interval;
        self
    }

    /// How long to wait between DHT discovery attempts.
    ///
    /// Default: 2000ms.
    pub fn discovery_poll_interval(mut self, interval: Duration) -> Self {
        self.config.discovery_poll_interval = interval;
        self
    }

    /// Whether to publish a bootstrap record unconditionally on startup before dht get.
    ///
    /// Default: true.
    pub fn publish_record_on_startup(mut self, publish: bool) -> Self {
        self.config.publish_record_on_startup = publish;
        self
    }

    /// Whether to check `unix_minute` and `unix_minute-1` or `unix_minute-1` and `unix_minute-2` on startup.
    ///
    /// If this is enabled, we first fetch `unix_minute-1` and `unix_minute-2`.
    ///
    /// If joining longer running, existing topics is priority, set to true.
    /// If minimizing bootstrap time for cluster cold starts (2+ nodes starting roughly
    /// at the same time into a topic without peers), set to false.
    ///
    /// Default: false.
    pub fn check_older_records_first_on_startup(mut self, check: bool) -> Self {
        self.config.check_older_records_first_on_startup = check;
        self
    }

    /// Build the `BootstrapConfig`.
    pub fn build(self) -> BootstrapConfig {
        self.config
    }
}

impl BootstrapConfig {
    /// Create a new `BootstrapConfigBuilder` with default values.
    pub fn builder() -> BootstrapConfigBuilder {
        BootstrapConfigBuilder {
            config: Self::default(),
        }
    }

    /// Max bootstrap records per topic per minute slot.
    ///
    /// If zero, we don't publish (PublisherConfig will be set to Disabled).
    ///
    /// Default: 5.
    pub fn max_bootstrap_records(&self) -> usize {
        self.max_bootstrap_records
    }

    /// How long to wait when no peers are found before retrying.
    ///
    /// Default: 1500ms.
    pub fn no_peers_retry_interval(&self) -> Duration {
        self.no_peers_retry_interval
    }

    /// How long to wait after joining a peer before attempting to join another.
    ///
    /// Default: 100ms.
    pub fn per_peer_join_settle_time(&self) -> Duration {
        self.per_peer_join_settle_time
    }

    /// How long to wait after joining a peer before checking if joined successfully.
    ///
    /// Default: 500ms.
    pub fn join_confirmation_wait_time(&self) -> Duration {
        self.join_confirmation_wait_time
    }

    /// How long to wait between DHT discovery attempts.
    ///
    /// Default: 2000ms.
    pub fn discovery_poll_interval(&self) -> Duration {
        self.discovery_poll_interval
    }

    /// Whether to publish a bootstrap record unconditionally on startup before dht get.
    ///
    /// Default: true.
    pub fn publish_record_on_startup(&self) -> bool {
        self.publish_record_on_startup
    }

    /// Whether to check `unix_minute` and `unix_minute-1` or `unix_minute-1` and `unix_minute-2` on startup.
    ///
    /// If this is enabled, we first fetch `unix_minute-1` and `unix_minute-2`.
    ///
    /// If joining longer running, existing topics is priority, set to true.
    /// If minimizing bootstrap time for cluster cold starts (2+ nodes starting roughly
    /// at the same time into a topic without peers), set to false.
    ///
    /// Default: false.
    pub fn check_older_records_first_on_startup(&self) -> bool {
        self.check_older_records_first_on_startup
    }
}
898
899/// Top-level configuration combining all settings.
900#[derive(Debug, Clone)]
901pub struct Config {
902    bootstrap_config: BootstrapConfig,
903    publisher_config: PublisherConfig,
904    dht_config: DhtConfig,
905
906    merge_config: MergeConfig,
907
908    max_join_peer_count: usize,
909    timeouts: TimeoutConfig,
910}
911
912impl Config {
913    /// Create a new `ConfigBuilder` with default values.
914    pub fn builder() -> ConfigBuilder {
915        ConfigBuilder {
916            config: Config::default(),
917        }
918    }
919
920    /// Publisher strategy config.
921    ///
922    /// Default: PublisherConfig::default().
923    pub fn publisher_config(&self) -> &PublisherConfig {
924        &self.publisher_config
925    }
926
927    /// DHT operation settings.
928    ///
929    /// Default: DhtConfig::default().
930    pub fn dht_config(&self) -> &DhtConfig {
931        &self.dht_config
932    }
933
934    /// Bootstrap strategy settings.
935    ///
936    /// Default: BootstrapConfig::default().
937    pub fn bootstrap_config(&self) -> &BootstrapConfig {
938        &self.bootstrap_config
939    }
940
941    /// Max peers to join simultaneously.
942    ///
943    /// Minimum is 1.
944    ///
945    /// Default: 4.
946    pub fn max_join_peer_count(&self) -> usize {
947        self.max_join_peer_count
948    }
949
950    /// Timeout settings.
951    ///
952    /// Default: TimeoutConfig::default().
953    pub fn timeouts(&self) -> &TimeoutConfig {
954        &self.timeouts
955    }
956
957    /// Merge strategy settings.
958    ///
959    /// Default: bubble and overlap merges enabled.
960    pub fn merge_config(&self) -> &MergeConfig {
961        &self.merge_config
962    }
963}
964
965impl Default for Config {
966    fn default() -> Self {
967        Self {
968            merge_config: MergeConfig::default(),
969            bootstrap_config: BootstrapConfig::default(),
970            publisher_config: PublisherConfig::default(),
971            dht_config: DhtConfig::default(),
972            max_join_peer_count: 4,
973            timeouts: TimeoutConfig::default(),
974        }
975    }
976}
977
978/// Builder for `Config`.
979#[derive(Debug)]
980pub struct ConfigBuilder {
981    config: Config,
982}
983
984impl ConfigBuilder {
985    /// Merge strategy settings.
986    ///
987    /// Default: MergeConfig::default().
988    pub fn merge_config(mut self, merge_config: MergeConfig) -> Self {
989        self.config.merge_config = merge_config;
990        self
991    }
992
993    /// Publisher strategy config.
994    ///
995    /// Default: PublisherConfig::default().
996    pub fn publisher_config(mut self, publisher_config: PublisherConfig) -> Self {
997        self.config.publisher_config = publisher_config;
998        self
999    }
1000
1001    /// DHT operation settings.
1002    ///
1003    /// Default: DhtConfig::default().
1004    pub fn dht_config(mut self, dht_config: DhtConfig) -> Self {
1005        self.config.dht_config = dht_config;
1006        self
1007    }
1008
1009    /// Bootstrap strategy settings.
1010    ///
1011    /// Default: BootstrapConfig::default().
1012    pub fn bootstrap_config(mut self, bootstrap_config: BootstrapConfig) -> Self {
1013        self.config.bootstrap_config = bootstrap_config;
1014        self
1015    }
1016
1017    /// Max peers to join simultaneously. No-op if `max_join_peer_count` is zero.
1018    ///
1019    /// If `max_join_peer_count` is called only once with zero, default value prevails.
1020    /// If `max_join_peer_count` is first called with a non-zero value, and then again with zero, the first set value is kept.
1021    ///
1022    /// Default: 4.
1023    pub fn max_join_peer_count(mut self, max_peers: usize) -> Self {
1024        if max_peers > 0 {
1025            self.config.max_join_peer_count = max_peers;
1026        }
1027        self
1028    }
1029
1030    /// Timeout settings.
1031    ///
1032    /// Default: TimeoutConfig::default().
1033    pub fn timeouts(mut self, timeouts: TimeoutConfig) -> Self {
1034        self.config.timeouts = timeouts;
1035        self
1036    }
1037
1038    /// Build the `Config`.
1039    ///
1040    /// If `max_bootstrap_records` is zero, `PublisherConfig` is set to `Disabled`.
1041    pub fn build(self) -> Config {
1042        let mut config = self.config;
1043        if config.bootstrap_config.max_bootstrap_records == 0
1044            && matches!(config.publisher_config, PublisherConfig::Enabled(_))
1045        {
1046            // if max_bootstrap_records is zero, we don't publish, so disable publisher to avoid confusion
1047            tracing::warn!(
1048                "Publisher is enabled via PublisherConfig::Enabled(_) but BootstrapConfig.max_bootstrap_records is set to 0 (we effectively never publish). Overriding PublisherConfig to PublisherConfig::Disabled."
1049            );
1050            config.publisher_config = PublisherConfig::Disabled;
1051        }
1052
1053        config
1054    }
1055}