distributed_topic_tracker/
config.rs1use std::time::Duration;
2
/// Timeout settings, grouped into a single value.
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`TimeoutConfigBuilder`].
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
    /// Value reported by [`TimeoutConfig::join_peer_timeout`].
    join_peer_timeout: Duration,
    /// Value reported by [`TimeoutConfig::broadcast_timeout`].
    broadcast_timeout: Duration,
    /// Value reported by [`TimeoutConfig::broadcast_neighbors_timeout`].
    broadcast_neighbors_timeout: Duration,
}
10
11impl TimeoutConfig {
12 pub fn builder() -> TimeoutConfigBuilder {
14 TimeoutConfigBuilder {
15 timeouts: TimeoutConfig::default(),
16 }
17 }
18
19 pub fn join_peer_timeout(&self) -> Duration {
23 self.join_peer_timeout
24 }
25
26 pub fn broadcast_timeout(&self) -> Duration {
30 self.broadcast_timeout
31 }
32
33 pub fn broadcast_neighbors_timeout(&self) -> Duration {
37 self.broadcast_neighbors_timeout
38 }
39}
40
41impl Default for TimeoutConfig {
42 fn default() -> Self {
43 Self {
44 join_peer_timeout: Duration::from_secs(5),
45 broadcast_timeout: Duration::from_secs(5),
46 broadcast_neighbors_timeout: Duration::from_secs(5),
47 }
48 }
49}
50
51#[derive(Debug)]
53pub struct TimeoutConfigBuilder {
54 timeouts: TimeoutConfig,
55}
56
57impl TimeoutConfigBuilder {
58 pub fn join_peer_timeout(mut self, timeout: Duration) -> Self {
62 self.timeouts.join_peer_timeout = timeout;
63 self
64 }
65
66 pub fn broadcast_timeout(mut self, timeout: Duration) -> Self {
70 self.timeouts.broadcast_timeout = timeout;
71 self
72 }
73
74 pub fn broadcast_neighbors_timeout(mut self, timeout: Duration) -> Self {
78 self.timeouts.broadcast_neighbors_timeout = timeout;
79 self
80 }
81
82 pub fn build(self) -> TimeoutConfig {
84 self.timeouts
85 }
86}
87
/// DHT-related settings: a retry count, retry pacing and put/get timeouts.
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`DhtConfigBuilder`].
#[derive(Debug, Clone)]
pub struct DhtConfig {
    /// Value reported by [`DhtConfig::retries`].
    retries: usize,
    /// Value reported by [`DhtConfig::base_retry_interval`].
    base_retry_interval: Duration,
    /// Value reported by [`DhtConfig::max_retry_jitter`].
    max_retry_jitter: Duration,
    /// Value reported by [`DhtConfig::put_timeout`].
    put_timeout: Duration,
    /// Value reported by [`DhtConfig::get_timeout`].
    get_timeout: Duration,
}
97
98#[derive(Debug, Clone)]
100pub struct DhtConfigBuilder {
101 config: DhtConfig,
102}
103
104impl DhtConfigBuilder {
105 pub fn retries(mut self, retries: usize) -> Self {
111 self.config.retries = retries;
112 self
113 }
114
115 pub fn base_retry_interval(mut self, interval: Duration) -> Self {
122 if interval > Duration::ZERO {
123 self.config.base_retry_interval = interval;
124 }
125 self
126 }
127
128 pub fn max_retry_jitter(mut self, jitter: Duration) -> Self {
132 self.config.max_retry_jitter = jitter;
133 self
134 }
135
136 pub fn put_timeout(mut self, timeout: Duration) -> Self {
143 if timeout > Duration::ZERO {
144 self.config.put_timeout = timeout;
145 }
146 self
147 }
148
149 pub fn get_timeout(mut self, timeout: Duration) -> Self {
156 if timeout > Duration::ZERO {
157 self.config.get_timeout = timeout;
158 }
159 self
160 }
161
162 pub fn build(self) -> DhtConfig {
164 self.config
165 }
166}
167
168impl DhtConfig {
169 pub fn builder() -> DhtConfigBuilder {
171 DhtConfigBuilder {
172 config: DhtConfig::default(),
173 }
174 }
175
176 pub fn retries(&self) -> usize {
182 self.retries
183 }
184
185 pub fn base_retry_interval(&self) -> Duration {
189 self.base_retry_interval
190 }
191
192 pub fn max_retry_jitter(&self) -> Duration {
196 self.max_retry_jitter
197 }
198
199 pub fn put_timeout(&self) -> Duration {
203 self.put_timeout
204 }
205
206 pub fn get_timeout(&self) -> Duration {
210 self.get_timeout
211 }
212}
213
214impl Default for DhtConfig {
215 fn default() -> Self {
216 Self {
217 retries: 3,
218 base_retry_interval: Duration::from_secs(5),
219 max_retry_jitter: Duration::from_secs(10),
220 put_timeout: Duration::from_secs(10),
221 get_timeout: Duration::from_secs(10),
222 }
223 }
224}
225
226#[derive(Debug, Clone)]
228pub enum BubbleMergeConfig {
229 Enabled(BubbleMergeConfigInner),
230 Disabled,
231}
232
/// Parameters carried by [`BubbleMergeConfig::Enabled`].
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`BubbleMergeConfigBuilder`].
#[derive(Debug, Clone)]
pub struct BubbleMergeConfigInner {
    /// Value reported by [`BubbleMergeConfigInner::base_interval`].
    base_interval: Duration,
    /// Value reported by [`BubbleMergeConfigInner::max_jitter`].
    max_jitter: Duration,
    /// Value reported by [`BubbleMergeConfigInner::min_neighbors`].
    min_neighbors: usize,
    /// Value reported by
    /// [`BubbleMergeConfigInner::fail_topic_creation_on_merge_startup_failure`].
    fail_topic_creation_on_merge_startup_failure: bool,
}
240
241#[derive(Debug, Clone)]
242pub struct BubbleMergeConfigBuilder {
243 config: BubbleMergeConfigInner,
244}
245
246impl Default for BubbleMergeConfig {
247 fn default() -> Self {
248 Self::Enabled(BubbleMergeConfigInner::default())
249 }
250}
251
252impl Default for BubbleMergeConfigInner {
253 fn default() -> Self {
254 Self {
255 base_interval: Duration::from_secs(60),
256 max_jitter: Duration::from_secs(120),
257 min_neighbors: 4,
258 fail_topic_creation_on_merge_startup_failure: true,
259 }
260 }
261}
262
263impl BubbleMergeConfig {
264 pub fn builder() -> BubbleMergeConfigBuilder {
266 BubbleMergeConfigBuilder {
267 config: BubbleMergeConfigInner::default(),
268 }
269 }
270}
271
272impl BubbleMergeConfigInner {
273 pub fn base_interval(&self) -> Duration {
279 self.base_interval
280 }
281
282 pub fn max_jitter(&self) -> Duration {
286 self.max_jitter
287 }
288
289 pub fn min_neighbors(&self) -> usize {
293 self.min_neighbors
294 }
295
296 pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
302 self.fail_topic_creation_on_merge_startup_failure
303 }
304}
305
306impl BubbleMergeConfigBuilder {
307 pub fn base_interval(mut self, interval: Duration) -> Self {
314 if interval > Duration::ZERO {
315 self.config.base_interval = interval;
316 }
317 self
318 }
319
320 pub fn max_jitter(mut self, jitter: Duration) -> Self {
324 self.config.max_jitter = jitter;
325 self
326 }
327
328 pub fn min_neighbors(mut self, min_neighbors: usize) -> Self {
332 self.config.min_neighbors = min_neighbors;
333 self
334 }
335
336 pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
342 self.config.fail_topic_creation_on_merge_startup_failure = fail;
343 self
344 }
345
346 pub fn build(self) -> BubbleMergeConfig {
348 BubbleMergeConfig::Enabled(self.config)
349 }
350}
351
352#[derive(Debug, Clone)]
354pub enum MessageOverlapMergeConfig {
355 Enabled(MessageOverlapMergeConfigInner),
356 Disabled,
357}
358
/// Parameters carried by [`MessageOverlapMergeConfig::Enabled`].
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`MessageOverlapMergeConfigBuilder`].
#[derive(Debug, Clone)]
pub struct MessageOverlapMergeConfigInner {
    /// Value reported by [`MessageOverlapMergeConfigInner::base_interval`].
    base_interval: Duration,
    /// Value reported by [`MessageOverlapMergeConfigInner::max_jitter`].
    max_jitter: Duration,
    /// Value reported by
    /// [`MessageOverlapMergeConfigInner::fail_topic_creation_on_merge_startup_failure`].
    fail_topic_creation_on_merge_startup_failure: bool,
}
365
366#[derive(Debug, Clone)]
367pub struct MessageOverlapMergeConfigBuilder {
368 config: MessageOverlapMergeConfigInner,
369}
370
371impl Default for MessageOverlapMergeConfigInner {
372 fn default() -> Self {
373 Self {
374 base_interval: Duration::from_secs(60),
375 max_jitter: Duration::from_secs(120),
376 fail_topic_creation_on_merge_startup_failure: true,
377 }
378 }
379}
380
381impl Default for MessageOverlapMergeConfig {
382 fn default() -> Self {
383 Self::Enabled(MessageOverlapMergeConfigInner::default())
384 }
385}
386
387impl MessageOverlapMergeConfig {
388 pub fn builder() -> MessageOverlapMergeConfigBuilder {
390 MessageOverlapMergeConfigBuilder {
391 config: MessageOverlapMergeConfigInner::default(),
392 }
393 }
394}
395
396impl MessageOverlapMergeConfigInner {
397 pub fn base_interval(&self) -> Duration {
403 self.base_interval
404 }
405
406 pub fn max_jitter(&self) -> Duration {
410 self.max_jitter
411 }
412
413 pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
419 self.fail_topic_creation_on_merge_startup_failure
420 }
421}
422
423impl MessageOverlapMergeConfigBuilder {
424 pub fn base_interval(mut self, interval: Duration) -> Self {
431 if interval > Duration::ZERO {
432 self.config.base_interval = interval;
433 }
434 self
435 }
436
437 pub fn max_jitter(mut self, jitter: Duration) -> Self {
441 self.config.max_jitter = jitter;
442 self
443 }
444
445 pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
451 self.config.fail_topic_creation_on_merge_startup_failure = fail;
452 self
453 }
454
455 pub fn build(self) -> MessageOverlapMergeConfig {
457 MessageOverlapMergeConfig::Enabled(self.config)
458 }
459}
460
461#[derive(Debug, Clone)]
463pub enum PublisherConfig {
464 Enabled(PublisherConfigInner),
465 Disabled,
466}
467
/// Parameters carried by [`PublisherConfig::Enabled`].
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`PublisherConfigBuilder`].
#[derive(Debug, Clone)]
pub struct PublisherConfigInner {
    /// Value reported by [`PublisherConfigInner::initial_delay`].
    initial_delay: Duration,
    /// Value reported by [`PublisherConfigInner::base_interval`].
    base_interval: Duration,
    /// Value reported by [`PublisherConfigInner::max_jitter`].
    max_jitter: Duration,
    /// Value reported by
    /// [`PublisherConfigInner::fail_topic_creation_on_publishing_startup_failure`].
    fail_topic_creation_on_publishing_startup_failure: bool,
}
475
476#[derive(Debug, Clone)]
477pub struct PublisherConfigBuilder {
478 config: PublisherConfigInner,
479}
480
481impl Default for PublisherConfigInner {
482 fn default() -> Self {
483 Self {
484 initial_delay: Duration::from_secs(10),
485 base_interval: Duration::from_secs(10),
486 max_jitter: Duration::from_secs(50),
487 fail_topic_creation_on_publishing_startup_failure: true,
488 }
489 }
490}
491
492impl Default for PublisherConfig {
493 fn default() -> Self {
494 Self::Enabled(PublisherConfigInner::default())
495 }
496}
497
498impl PublisherConfig {
499 pub fn builder() -> PublisherConfigBuilder {
501 PublisherConfigBuilder {
502 config: PublisherConfigInner::default(),
503 }
504 }
505}
506
507impl PublisherConfigInner {
508 pub fn initial_delay(&self) -> Duration {
512 self.initial_delay
513 }
514
515 pub fn base_interval(&self) -> Duration {
521 self.base_interval
522 }
523
524 pub fn max_jitter(&self) -> Duration {
528 self.max_jitter
529 }
530
531 pub fn fail_topic_creation_on_publishing_startup_failure(&self) -> bool {
537 self.fail_topic_creation_on_publishing_startup_failure
538 }
539}
540
541impl PublisherConfigBuilder {
542 pub fn initial_delay(mut self, delay: Duration) -> Self {
546 self.config.initial_delay = delay;
547 self
548 }
549
550 pub fn base_interval(mut self, interval: Duration) -> Self {
557 if interval > Duration::ZERO {
558 self.config.base_interval = interval;
559 }
560 self
561 }
562
563 pub fn max_jitter(mut self, jitter: Duration) -> Self {
567 self.config.max_jitter = jitter;
568 self
569 }
570
571 pub fn fail_topic_creation_on_publishing_startup_failure(mut self, fail: bool) -> Self {
577 self.config
578 .fail_topic_creation_on_publishing_startup_failure = fail;
579 self
580 }
581
582 pub fn build(self) -> PublisherConfig {
584 PublisherConfig::Enabled(self.config)
585 }
586}
587
588#[derive(Debug, Clone, Default)]
592pub struct MergeConfig {
593 bubble_merge: BubbleMergeConfig,
594 message_overlap_merge: MessageOverlapMergeConfig,
595}
596
597#[derive(Debug, Clone)]
599pub struct MergeConfigBuilder {
600 config: MergeConfig,
601}
602
603impl MergeConfig {
604 pub fn builder() -> MergeConfigBuilder {
606 MergeConfigBuilder {
607 config: MergeConfig::default(),
608 }
609 }
610
611 pub fn bubble_merge(&self) -> &BubbleMergeConfig {
615 &self.bubble_merge
616 }
617
618 pub fn message_overlap_merge(&self) -> &MessageOverlapMergeConfig {
622 &self.message_overlap_merge
623 }
624}
625
626impl MergeConfigBuilder {
627 pub fn bubble_merge(mut self, bubble_merge: BubbleMergeConfig) -> Self {
631 self.config.bubble_merge = bubble_merge;
632 self
633 }
634
635 pub fn message_overlap_merge(
639 mut self,
640 message_overlap_merge: MessageOverlapMergeConfig,
641 ) -> Self {
642 self.config.message_overlap_merge = message_overlap_merge;
643 self
644 }
645
646 pub fn build(self) -> MergeConfig {
648 self.config
649 }
650}
651
/// Bootstrap-related settings.
///
/// The fields are private: read them through the getters of the same name and
/// set them through [`BootstrapConfigBuilder`].
#[derive(Debug, Clone)]
pub struct BootstrapConfig {
    /// Value reported by [`BootstrapConfig::max_bootstrap_records`].
    ///
    /// `ConfigBuilder::build` switches the publisher to `Disabled` when this
    /// is 0 while the publisher is enabled.
    max_bootstrap_records: usize,
    /// Value reported by [`BootstrapConfig::no_peers_retry_interval`].
    no_peers_retry_interval: Duration,
    /// Value reported by [`BootstrapConfig::per_peer_join_settle_time`].
    per_peer_join_settle_time: Duration,
    /// Value reported by [`BootstrapConfig::join_confirmation_wait_time`].
    join_confirmation_wait_time: Duration,
    /// Value reported by [`BootstrapConfig::discovery_poll_interval`].
    discovery_poll_interval: Duration,
    /// Value reported by [`BootstrapConfig::publish_record_on_startup`].
    publish_record_on_startup: bool,
    /// Value reported by
    /// [`BootstrapConfig::check_older_records_first_on_startup`].
    check_older_records_first_on_startup: bool,
}
663
664impl Default for BootstrapConfig {
665 fn default() -> Self {
666 Self {
667 max_bootstrap_records: 5,
668 no_peers_retry_interval: Duration::from_millis(1500),
669 per_peer_join_settle_time: Duration::from_millis(100),
670 join_confirmation_wait_time: Duration::from_millis(500),
671 discovery_poll_interval: Duration::from_millis(2000),
672 publish_record_on_startup: true,
673 check_older_records_first_on_startup: false,
674 }
675 }
676}
677
678#[derive(Debug)]
680pub struct BootstrapConfigBuilder {
681 config: BootstrapConfig,
682}
683
684impl BootstrapConfigBuilder {
685 pub fn max_bootstrap_records(mut self, max_records: usize) -> Self {
691 self.config.max_bootstrap_records = max_records;
692 self
693 }
694
695 pub fn no_peers_retry_interval(mut self, interval: Duration) -> Self {
699 self.config.no_peers_retry_interval = interval;
700 self
701 }
702
703 pub fn per_peer_join_settle_time(mut self, interval: Duration) -> Self {
707 self.config.per_peer_join_settle_time = interval;
708 self
709 }
710
711 pub fn join_confirmation_wait_time(mut self, interval: Duration) -> Self {
715 self.config.join_confirmation_wait_time = interval;
716 self
717 }
718
719 pub fn discovery_poll_interval(mut self, interval: Duration) -> Self {
723 self.config.discovery_poll_interval = interval;
724 self
725 }
726
727 pub fn publish_record_on_startup(mut self, publish: bool) -> Self {
731 self.config.publish_record_on_startup = publish;
732 self
733 }
734
735 pub fn check_older_records_first_on_startup(mut self, check: bool) -> Self {
745 self.config.check_older_records_first_on_startup = check;
746 self
747 }
748
749 pub fn build(self) -> BootstrapConfig {
751 self.config
752 }
753}
754
755impl BootstrapConfig {
756 pub fn builder() -> BootstrapConfigBuilder {
758 BootstrapConfigBuilder {
759 config: BootstrapConfig::default(),
760 }
761 }
762
763 pub fn max_bootstrap_records(&self) -> usize {
769 self.max_bootstrap_records
770 }
771
772 pub fn no_peers_retry_interval(&self) -> Duration {
776 self.no_peers_retry_interval
777 }
778
779 pub fn per_peer_join_settle_time(&self) -> Duration {
783 self.per_peer_join_settle_time
784 }
785
786 pub fn join_confirmation_wait_time(&self) -> Duration {
790 self.join_confirmation_wait_time
791 }
792
793 pub fn discovery_poll_interval(&self) -> Duration {
797 self.discovery_poll_interval
798 }
799
800 pub fn publish_record_on_startup(&self) -> bool {
804 self.publish_record_on_startup
805 }
806
807 pub fn check_older_records_first_on_startup(&self) -> bool {
817 self.check_older_records_first_on_startup
818 }
819}
820
821#[derive(Debug, Clone)]
823pub struct Config {
824 bootstrap_config: BootstrapConfig,
825 publisher_config: PublisherConfig,
826 dht_config: DhtConfig,
827
828 merge_config: MergeConfig,
829
830 max_join_peer_count: usize,
831 timeouts: TimeoutConfig,
832}
833
834impl Config {
835 pub fn builder() -> ConfigBuilder {
837 ConfigBuilder {
838 config: Config::default(),
839 }
840 }
841
842 pub fn publisher_config(&self) -> &PublisherConfig {
846 &self.publisher_config
847 }
848
849 pub fn dht_config(&self) -> &DhtConfig {
853 &self.dht_config
854 }
855
856 pub fn bootstrap_config(&self) -> &BootstrapConfig {
860 &self.bootstrap_config
861 }
862
863 pub fn max_join_peer_count(&self) -> usize {
869 self.max_join_peer_count
870 }
871
872 pub fn timeouts(&self) -> &TimeoutConfig {
876 &self.timeouts
877 }
878
879 pub fn merge_config(&self) -> &MergeConfig {
883 &self.merge_config
884 }
885}
886
887impl Default for Config {
888 fn default() -> Self {
889 Self {
890 merge_config: MergeConfig::default(),
891 bootstrap_config: BootstrapConfig::default(),
892 publisher_config: PublisherConfig::default(),
893 dht_config: DhtConfig::default(),
894 max_join_peer_count: 4,
895 timeouts: TimeoutConfig::default(),
896 }
897 }
898}
899
900#[derive(Debug)]
902pub struct ConfigBuilder {
903 config: Config,
904}
905
906impl ConfigBuilder {
907 pub fn merge_config(mut self, merge_config: MergeConfig) -> Self {
911 self.config.merge_config = merge_config;
912 self
913 }
914
915 pub fn publisher_config(mut self, publisher_config: PublisherConfig) -> Self {
919 self.config.publisher_config = publisher_config;
920 self
921 }
922
923 pub fn dht_config(mut self, dht_config: DhtConfig) -> Self {
927 self.config.dht_config = dht_config;
928 self
929 }
930
931 pub fn bootstrap_config(mut self, bootstrap_config: BootstrapConfig) -> Self {
935 self.config.bootstrap_config = bootstrap_config;
936 self
937 }
938
939 pub fn max_join_peer_count(mut self, max_peers: usize) -> Self {
946 if max_peers > 0 {
947 self.config.max_join_peer_count = max_peers;
948 }
949 self
950 }
951
952 pub fn timeouts(mut self, timeouts: TimeoutConfig) -> Self {
956 self.config.timeouts = timeouts;
957 self
958 }
959
960 pub fn build(self) -> Config {
964 let mut config = self.config;
965 if config.bootstrap_config.max_bootstrap_records == 0
966 && matches!(config.publisher_config, PublisherConfig::Enabled(_))
967 {
968 tracing::warn!(
970 "Publisher is enabled via PublisherConfig::Enabled(_) but BootstrapConfig.max_bootstrap_records is set to 0 (we effectively never publish). Overriding PublisherConfig to PublisherConfig::Disabled."
971 );
972 config.publisher_config = PublisherConfig::Disabled;
973 }
974
975 config
976 }
977}