distributed_topic_tracker/
config.rs1use std::time::Duration;
2
3#[derive(Debug, Clone)]
5pub struct TimeoutConfig {
6 join_peer_timeout: Duration,
7 broadcast_timeout: Duration,
8 broadcast_neighbors_timeout: Duration,
9}
10
11impl TimeoutConfig {
12 pub fn builder() -> TimeoutConfigBuilder {
14 TimeoutConfigBuilder {
15 timeouts: TimeoutConfig::default(),
16 }
17 }
18
19 pub fn join_peer_timeout(&self) -> Duration {
23 self.join_peer_timeout
24 }
25
26 pub fn broadcast_timeout(&self) -> Duration {
30 self.broadcast_timeout
31 }
32
33 pub fn broadcast_neighbors_timeout(&self) -> Duration {
37 self.broadcast_neighbors_timeout
38 }
39}
40
41impl Default for TimeoutConfig {
42 fn default() -> Self {
43 Self {
44 join_peer_timeout: Duration::from_secs(5),
45 broadcast_timeout: Duration::from_secs(5),
46 broadcast_neighbors_timeout: Duration::from_secs(5),
47 }
48 }
49}
50
51#[derive(Debug)]
53pub struct TimeoutConfigBuilder {
54 timeouts: TimeoutConfig,
55}
56
57impl TimeoutConfigBuilder {
58 pub fn join_peer_timeout(mut self, timeout: Duration) -> Self {
62 self.timeouts.join_peer_timeout = timeout;
63 self
64 }
65
66 pub fn broadcast_timeout(mut self, timeout: Duration) -> Self {
70 self.timeouts.broadcast_timeout = timeout;
71 self
72 }
73
74 pub fn broadcast_neighbors_timeout(mut self, timeout: Duration) -> Self {
78 self.timeouts.broadcast_neighbors_timeout = timeout;
79 self
80 }
81
82 pub fn build(self) -> TimeoutConfig {
84 self.timeouts
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct DhtConfig {
91 retries: usize,
92 base_retry_interval: Duration,
93 max_retry_jitter: Duration,
94 put_timeout: Duration,
95 get_timeout: Duration,
96}
97
98#[derive(Debug, Clone)]
100pub struct DhtConfigBuilder {
101 config: DhtConfig,
102}
103
104impl DhtConfigBuilder {
105 pub fn retries(mut self, retries: usize) -> Self {
111 self.config.retries = retries;
112 self
113 }
114
115 pub fn base_retry_interval(mut self, interval: Duration) -> Self {
122 if interval > Duration::ZERO {
123 self.config.base_retry_interval = interval;
124 }
125 self
126 }
127
128 pub fn max_retry_jitter(mut self, jitter: Duration) -> Self {
132 self.config.max_retry_jitter = jitter;
133 self
134 }
135
136 pub fn put_timeout(mut self, timeout: Duration) -> Self {
143 if timeout > Duration::ZERO {
144 self.config.put_timeout = timeout;
145 }
146 self
147 }
148
149 pub fn get_timeout(mut self, timeout: Duration) -> Self {
156 if timeout > Duration::ZERO {
157 self.config.get_timeout = timeout;
158 }
159 self
160 }
161
162 pub fn build(self) -> DhtConfig {
164 self.config
165 }
166}
167
168impl DhtConfig {
169 pub fn builder() -> DhtConfigBuilder {
171 DhtConfigBuilder {
172 config: DhtConfig::default(),
173 }
174 }
175
176 pub fn retries(&self) -> usize {
182 self.retries
183 }
184
185 pub fn base_retry_interval(&self) -> Duration {
189 self.base_retry_interval
190 }
191
192 pub fn max_retry_jitter(&self) -> Duration {
196 self.max_retry_jitter
197 }
198
199 pub fn put_timeout(&self) -> Duration {
203 self.put_timeout
204 }
205
206 pub fn get_timeout(&self) -> Duration {
210 self.get_timeout
211 }
212}
213
214impl Default for DhtConfig {
215 fn default() -> Self {
216 Self {
217 retries: 3,
218 base_retry_interval: Duration::from_secs(5),
219 max_retry_jitter: Duration::from_secs(10),
220 put_timeout: Duration::from_secs(10),
221 get_timeout: Duration::from_secs(10),
222 }
223 }
224}
225
226#[derive(Debug, Clone)]
228pub enum BubbleMergeConfig {
229 Enabled(BubbleMergeConfigInner),
230 Disabled,
231}
232
233#[derive(Debug, Clone)]
234pub struct BubbleMergeConfigInner {
235 initial_interval: Duration,
236 base_interval: Duration,
237 max_jitter: Duration,
238 min_neighbors: usize,
239 fail_topic_creation_on_merge_startup_failure: bool,
240 max_join_peers: usize,
241}
242
243#[derive(Debug, Clone)]
244pub struct BubbleMergeConfigBuilder {
245 config: BubbleMergeConfigInner,
246}
247
248impl Default for BubbleMergeConfig {
249 fn default() -> Self {
250 Self::Enabled(BubbleMergeConfigInner::default())
251 }
252}
253
254impl Default for BubbleMergeConfigInner {
255 fn default() -> Self {
256 Self {
257 initial_interval: Duration::from_secs(30),
258 base_interval: Duration::from_secs(60),
259 max_jitter: Duration::from_secs(120),
260 min_neighbors: 4,
261 fail_topic_creation_on_merge_startup_failure: true,
262 max_join_peers: 2,
263 }
264 }
265}
266
267impl BubbleMergeConfig {
268 pub fn builder() -> BubbleMergeConfigBuilder {
270 BubbleMergeConfigBuilder {
271 config: BubbleMergeConfigInner::default(),
272 }
273 }
274}
275
276impl BubbleMergeConfigInner {
277 pub fn initial_interval(&self) -> Duration {
281 self.initial_interval
282 }
283
284 pub fn base_interval(&self) -> Duration {
290 self.base_interval
291 }
292
293 pub fn max_jitter(&self) -> Duration {
297 self.max_jitter
298 }
299
300 pub fn min_neighbors(&self) -> usize {
304 self.min_neighbors
305 }
306
307 pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
313 self.fail_topic_creation_on_merge_startup_failure
314 }
315
316 pub fn max_join_peers(&self) -> usize {
320 self.max_join_peers
321 }
322}
323
324impl BubbleMergeConfigBuilder {
325 pub fn initial_interval(mut self, interval: Duration) -> Self {
329 self.config.initial_interval = interval;
330 self
331 }
332
333 pub fn base_interval(mut self, interval: Duration) -> Self {
340 if interval > Duration::ZERO {
341 self.config.base_interval = interval;
342 }
343 self
344 }
345
346 pub fn max_jitter(mut self, jitter: Duration) -> Self {
350 self.config.max_jitter = jitter;
351 self
352 }
353
354 pub fn min_neighbors(mut self, min_neighbors: usize) -> Self {
358 self.config.min_neighbors = min_neighbors;
359 self
360 }
361
362 pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
368 self.config.fail_topic_creation_on_merge_startup_failure = fail;
369 self
370 }
371
372 pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
379 if max_join_peers > 0 {
380 self.config.max_join_peers = max_join_peers;
381 }
382 self
383 }
384
385 pub fn build(self) -> BubbleMergeConfig {
387 BubbleMergeConfig::Enabled(self.config)
388 }
389}
390
391#[derive(Debug, Clone)]
393pub enum MessageOverlapMergeConfig {
394 Enabled(MessageOverlapMergeConfigInner),
395 Disabled,
396}
397
398#[derive(Debug, Clone)]
399pub struct MessageOverlapMergeConfigInner {
400 initial_interval: Duration,
401 base_interval: Duration,
402 max_jitter: Duration,
403 fail_topic_creation_on_merge_startup_failure: bool,
404 max_join_peers: usize,
405}
406
407#[derive(Debug, Clone)]
408pub struct MessageOverlapMergeConfigBuilder {
409 config: MessageOverlapMergeConfigInner,
410}
411
412impl Default for MessageOverlapMergeConfigInner {
413 fn default() -> Self {
414 Self {
415 initial_interval: Duration::from_secs(30),
416 base_interval: Duration::from_secs(60),
417 max_jitter: Duration::from_secs(120),
418 fail_topic_creation_on_merge_startup_failure: true,
419 max_join_peers: 2,
420 }
421 }
422}
423
424impl Default for MessageOverlapMergeConfig {
425 fn default() -> Self {
426 Self::Enabled(MessageOverlapMergeConfigInner::default())
427 }
428}
429
430impl MessageOverlapMergeConfig {
431 pub fn builder() -> MessageOverlapMergeConfigBuilder {
433 MessageOverlapMergeConfigBuilder {
434 config: MessageOverlapMergeConfigInner::default(),
435 }
436 }
437}
438
439impl MessageOverlapMergeConfigInner {
440 pub fn initial_interval(&self) -> Duration {
444 self.initial_interval
445 }
446
447 pub fn base_interval(&self) -> Duration {
453 self.base_interval
454 }
455
456 pub fn max_jitter(&self) -> Duration {
460 self.max_jitter
461 }
462
463 pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
469 self.fail_topic_creation_on_merge_startup_failure
470 }
471
472 pub fn max_join_peers(&self) -> usize {
476 self.max_join_peers
477 }
478}
479
480impl MessageOverlapMergeConfigBuilder {
481 pub fn initial_interval(mut self, interval: Duration) -> Self {
485 self.config.initial_interval = interval;
486 self
487 }
488
489 pub fn base_interval(mut self, interval: Duration) -> Self {
496 if interval > Duration::ZERO {
497 self.config.base_interval = interval;
498 }
499 self
500 }
501
502 pub fn max_jitter(mut self, jitter: Duration) -> Self {
506 self.config.max_jitter = jitter;
507 self
508 }
509
510 pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
516 self.config.fail_topic_creation_on_merge_startup_failure = fail;
517 self
518 }
519
520 pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
527 if max_join_peers > 0 {
528 self.config.max_join_peers = max_join_peers;
529 }
530 self
531 }
532
533 pub fn build(self) -> MessageOverlapMergeConfig {
535 MessageOverlapMergeConfig::Enabled(self.config)
536 }
537}
538
539#[derive(Debug, Clone)]
541pub enum PublisherConfig {
542 Enabled(PublisherConfigInner),
543 Disabled,
544}
545
546#[derive(Debug, Clone)]
547pub struct PublisherConfigInner {
548 initial_delay: Duration,
549 base_interval: Duration,
550 max_jitter: Duration,
551 fail_topic_creation_on_publishing_startup_failure: bool,
552}
553
554#[derive(Debug, Clone)]
555pub struct PublisherConfigBuilder {
556 config: PublisherConfigInner,
557}
558
559impl Default for PublisherConfigInner {
560 fn default() -> Self {
561 Self {
562 initial_delay: Duration::from_secs(10),
563 base_interval: Duration::from_secs(10),
564 max_jitter: Duration::from_secs(50),
565 fail_topic_creation_on_publishing_startup_failure: true,
566 }
567 }
568}
569
570impl Default for PublisherConfig {
571 fn default() -> Self {
572 Self::Enabled(PublisherConfigInner::default())
573 }
574}
575
576impl PublisherConfig {
577 pub fn builder() -> PublisherConfigBuilder {
579 PublisherConfigBuilder {
580 config: PublisherConfigInner::default(),
581 }
582 }
583}
584
585impl PublisherConfigInner {
586 pub fn initial_delay(&self) -> Duration {
590 self.initial_delay
591 }
592
593 pub fn base_interval(&self) -> Duration {
599 self.base_interval
600 }
601
602 pub fn max_jitter(&self) -> Duration {
606 self.max_jitter
607 }
608
609 pub fn fail_topic_creation_on_publishing_startup_failure(&self) -> bool {
615 self.fail_topic_creation_on_publishing_startup_failure
616 }
617}
618
619impl PublisherConfigBuilder {
620 pub fn initial_delay(mut self, delay: Duration) -> Self {
624 self.config.initial_delay = delay;
625 self
626 }
627
628 pub fn base_interval(mut self, interval: Duration) -> Self {
635 if interval > Duration::ZERO {
636 self.config.base_interval = interval;
637 }
638 self
639 }
640
641 pub fn max_jitter(mut self, jitter: Duration) -> Self {
645 self.config.max_jitter = jitter;
646 self
647 }
648
649 pub fn fail_topic_creation_on_publishing_startup_failure(mut self, fail: bool) -> Self {
655 self.config
656 .fail_topic_creation_on_publishing_startup_failure = fail;
657 self
658 }
659
660 pub fn build(self) -> PublisherConfig {
662 PublisherConfig::Enabled(self.config)
663 }
664}
665
666#[derive(Debug, Clone, Default)]
670pub struct MergeConfig {
671 bubble_merge: BubbleMergeConfig,
672 message_overlap_merge: MessageOverlapMergeConfig,
673}
674
675#[derive(Debug, Clone)]
677pub struct MergeConfigBuilder {
678 config: MergeConfig,
679}
680
681impl MergeConfig {
682 pub fn builder() -> MergeConfigBuilder {
684 MergeConfigBuilder {
685 config: MergeConfig::default(),
686 }
687 }
688
689 pub fn bubble_merge(&self) -> &BubbleMergeConfig {
693 &self.bubble_merge
694 }
695
696 pub fn message_overlap_merge(&self) -> &MessageOverlapMergeConfig {
700 &self.message_overlap_merge
701 }
702}
703
704impl MergeConfigBuilder {
705 pub fn bubble_merge(mut self, bubble_merge: BubbleMergeConfig) -> Self {
709 self.config.bubble_merge = bubble_merge;
710 self
711 }
712
713 pub fn message_overlap_merge(
717 mut self,
718 message_overlap_merge: MessageOverlapMergeConfig,
719 ) -> Self {
720 self.config.message_overlap_merge = message_overlap_merge;
721 self
722 }
723
724 pub fn build(self) -> MergeConfig {
726 self.config
727 }
728}
729
730#[derive(Debug, Clone)]
732pub struct BootstrapConfig {
733 max_bootstrap_records: usize,
734 no_peers_retry_interval: Duration,
735 per_peer_join_settle_time: Duration,
736 join_confirmation_wait_time: Duration,
737 discovery_poll_interval: Duration,
738 publish_record_on_startup: bool,
739 check_older_records_first_on_startup: bool,
740}
741
742impl Default for BootstrapConfig {
743 fn default() -> Self {
744 Self {
745 max_bootstrap_records: 5,
746 no_peers_retry_interval: Duration::from_millis(1500),
747 per_peer_join_settle_time: Duration::from_millis(100),
748 join_confirmation_wait_time: Duration::from_millis(500),
749 discovery_poll_interval: Duration::from_millis(2000),
750 publish_record_on_startup: true,
751 check_older_records_first_on_startup: false,
752 }
753 }
754}
755
756#[derive(Debug)]
758pub struct BootstrapConfigBuilder {
759 config: BootstrapConfig,
760}
761
762impl BootstrapConfigBuilder {
763 pub fn max_bootstrap_records(mut self, max_records: usize) -> Self {
769 self.config.max_bootstrap_records = max_records;
770 self
771 }
772
773 pub fn no_peers_retry_interval(mut self, interval: Duration) -> Self {
777 self.config.no_peers_retry_interval = interval;
778 self
779 }
780
781 pub fn per_peer_join_settle_time(mut self, interval: Duration) -> Self {
785 self.config.per_peer_join_settle_time = interval;
786 self
787 }
788
789 pub fn join_confirmation_wait_time(mut self, interval: Duration) -> Self {
793 self.config.join_confirmation_wait_time = interval;
794 self
795 }
796
797 pub fn discovery_poll_interval(mut self, interval: Duration) -> Self {
801 self.config.discovery_poll_interval = interval;
802 self
803 }
804
805 pub fn publish_record_on_startup(mut self, publish: bool) -> Self {
809 self.config.publish_record_on_startup = publish;
810 self
811 }
812
813 pub fn check_older_records_first_on_startup(mut self, check: bool) -> Self {
823 self.config.check_older_records_first_on_startup = check;
824 self
825 }
826
827 pub fn build(self) -> BootstrapConfig {
829 self.config
830 }
831}
832
833impl BootstrapConfig {
834 pub fn builder() -> BootstrapConfigBuilder {
836 BootstrapConfigBuilder {
837 config: BootstrapConfig::default(),
838 }
839 }
840
841 pub fn max_bootstrap_records(&self) -> usize {
847 self.max_bootstrap_records
848 }
849
850 pub fn no_peers_retry_interval(&self) -> Duration {
854 self.no_peers_retry_interval
855 }
856
857 pub fn per_peer_join_settle_time(&self) -> Duration {
861 self.per_peer_join_settle_time
862 }
863
864 pub fn join_confirmation_wait_time(&self) -> Duration {
868 self.join_confirmation_wait_time
869 }
870
871 pub fn discovery_poll_interval(&self) -> Duration {
875 self.discovery_poll_interval
876 }
877
878 pub fn publish_record_on_startup(&self) -> bool {
882 self.publish_record_on_startup
883 }
884
885 pub fn check_older_records_first_on_startup(&self) -> bool {
895 self.check_older_records_first_on_startup
896 }
897}
898
899#[derive(Debug, Clone)]
901pub struct Config {
902 bootstrap_config: BootstrapConfig,
903 publisher_config: PublisherConfig,
904 dht_config: DhtConfig,
905
906 merge_config: MergeConfig,
907
908 max_join_peer_count: usize,
909 timeouts: TimeoutConfig,
910}
911
912impl Config {
913 pub fn builder() -> ConfigBuilder {
915 ConfigBuilder {
916 config: Config::default(),
917 }
918 }
919
920 pub fn publisher_config(&self) -> &PublisherConfig {
924 &self.publisher_config
925 }
926
927 pub fn dht_config(&self) -> &DhtConfig {
931 &self.dht_config
932 }
933
934 pub fn bootstrap_config(&self) -> &BootstrapConfig {
938 &self.bootstrap_config
939 }
940
941 pub fn max_join_peer_count(&self) -> usize {
947 self.max_join_peer_count
948 }
949
950 pub fn timeouts(&self) -> &TimeoutConfig {
954 &self.timeouts
955 }
956
957 pub fn merge_config(&self) -> &MergeConfig {
961 &self.merge_config
962 }
963}
964
965impl Default for Config {
966 fn default() -> Self {
967 Self {
968 merge_config: MergeConfig::default(),
969 bootstrap_config: BootstrapConfig::default(),
970 publisher_config: PublisherConfig::default(),
971 dht_config: DhtConfig::default(),
972 max_join_peer_count: 4,
973 timeouts: TimeoutConfig::default(),
974 }
975 }
976}
977
978#[derive(Debug)]
980pub struct ConfigBuilder {
981 config: Config,
982}
983
984impl ConfigBuilder {
985 pub fn merge_config(mut self, merge_config: MergeConfig) -> Self {
989 self.config.merge_config = merge_config;
990 self
991 }
992
993 pub fn publisher_config(mut self, publisher_config: PublisherConfig) -> Self {
997 self.config.publisher_config = publisher_config;
998 self
999 }
1000
1001 pub fn dht_config(mut self, dht_config: DhtConfig) -> Self {
1005 self.config.dht_config = dht_config;
1006 self
1007 }
1008
1009 pub fn bootstrap_config(mut self, bootstrap_config: BootstrapConfig) -> Self {
1013 self.config.bootstrap_config = bootstrap_config;
1014 self
1015 }
1016
1017 pub fn max_join_peer_count(mut self, max_peers: usize) -> Self {
1024 if max_peers > 0 {
1025 self.config.max_join_peer_count = max_peers;
1026 }
1027 self
1028 }
1029
1030 pub fn timeouts(mut self, timeouts: TimeoutConfig) -> Self {
1034 self.config.timeouts = timeouts;
1035 self
1036 }
1037
1038 pub fn build(self) -> Config {
1042 let mut config = self.config;
1043 if config.bootstrap_config.max_bootstrap_records == 0
1044 && matches!(config.publisher_config, PublisherConfig::Enabled(_))
1045 {
1046 tracing::warn!(
1048 "Publisher is enabled via PublisherConfig::Enabled(_) but BootstrapConfig.max_bootstrap_records is set to 0 (we effectively never publish). Overriding PublisherConfig to PublisherConfig::Disabled."
1049 );
1050 config.publisher_config = PublisherConfig::Disabled;
1051 }
1052
1053 config
1054 }
1055}