use std::time::Duration;
/// Timeout settings, stored as one [`Duration`] per operation kind.
///
/// All fields are private: read them through the same-named accessor methods
/// and customise them via [`TimeoutConfig::builder`]. The `Default`
/// implementation sets every timeout to 5 seconds.
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
/// Default: 5 s. NOTE(review): presumably bounds a single join-peer attempt;
/// the code consuming this value is outside this file — confirm there.
join_peer_timeout: Duration,
/// Default: 5 s. NOTE(review): presumably bounds a broadcast; confirm at the call site.
broadcast_timeout: Duration,
/// Default: 5 s. NOTE(review): presumably bounds a neighbors-only broadcast;
/// confirm at the call site.
broadcast_neighbors_timeout: Duration,
}
impl TimeoutConfig {
    /// Starts a [`TimeoutConfigBuilder`] seeded with [`TimeoutConfig::default`].
    pub fn builder() -> TimeoutConfigBuilder {
        let timeouts = Self::default();
        TimeoutConfigBuilder { timeouts }
    }

    /// Returns the stored join-peer timeout.
    pub fn join_peer_timeout(&self) -> Duration {
        self.join_peer_timeout
    }

    /// Returns the stored broadcast timeout.
    pub fn broadcast_timeout(&self) -> Duration {
        self.broadcast_timeout
    }

    /// Returns the stored broadcast-neighbors timeout.
    pub fn broadcast_neighbors_timeout(&self) -> Duration {
        self.broadcast_neighbors_timeout
    }
}
impl Default for TimeoutConfig {
    /// Every timeout defaults to five seconds.
    fn default() -> Self {
        let five_seconds = Duration::from_secs(5);
        Self {
            join_peer_timeout: five_seconds,
            broadcast_timeout: five_seconds,
            broadcast_neighbors_timeout: five_seconds,
        }
    }
}
/// Consuming builder for [`TimeoutConfig`].
///
/// Derives `Clone` in addition to `Debug`, matching the other builders in this
/// file (for example `DhtConfigBuilder`), so a partially configured builder can
/// be duplicated and finished in different ways.
#[derive(Debug, Clone)]
pub struct TimeoutConfigBuilder {
    /// The configuration being assembled; `build` returns it as-is.
    timeouts: TimeoutConfig,
}
impl TimeoutConfigBuilder {
pub fn join_peer_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.join_peer_timeout = timeout;
self
}
pub fn broadcast_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.broadcast_timeout = timeout;
self
}
pub fn broadcast_neighbors_timeout(mut self, timeout: Duration) -> Self {
self.timeouts.broadcast_neighbors_timeout = timeout;
self
}
pub fn build(self) -> TimeoutConfig {
self.timeouts
}
}
/// DHT-related settings: retry policy plus put/get timeouts.
///
/// Fields are private; read them through the same-named accessors and
/// customise them via [`DhtConfig::builder`]. Defaults (from the `Default`
/// impl): 3 retries, 5 s base retry interval, 10 s max retry jitter, 10 s put
/// timeout, 10 s get timeout.
#[derive(Debug, Clone)]
pub struct DhtConfig {
/// Default: 3. The builder accepts any value, including 0.
retries: usize,
/// Default: 5 s. The builder never stores a zero duration here.
base_retry_interval: Duration,
/// Default: 10 s. NOTE(review): presumably an upper bound on random jitter
/// added to the retry interval — the consumer is outside this file.
max_retry_jitter: Duration,
/// Default: 10 s. The builder never stores a zero duration here.
put_timeout: Duration,
/// Default: 10 s. The builder never stores a zero duration here.
get_timeout: Duration,
}
/// Consuming builder for [`DhtConfig`], obtained from [`DhtConfig::builder`].
#[derive(Debug, Clone)]
pub struct DhtConfigBuilder {
/// The configuration being assembled; `build` returns it as-is.
config: DhtConfig,
}
impl DhtConfigBuilder {
pub fn retries(mut self, retries: usize) -> Self {
self.config.retries = retries;
self
}
pub fn base_retry_interval(mut self, interval: Duration) -> Self {
if interval > Duration::ZERO {
self.config.base_retry_interval = interval;
}
self
}
pub fn max_retry_jitter(mut self, jitter: Duration) -> Self {
self.config.max_retry_jitter = jitter;
self
}
pub fn put_timeout(mut self, timeout: Duration) -> Self {
if timeout > Duration::ZERO {
self.config.put_timeout = timeout;
}
self
}
pub fn get_timeout(mut self, timeout: Duration) -> Self {
if timeout > Duration::ZERO {
self.config.get_timeout = timeout;
}
self
}
pub fn build(self) -> DhtConfig {
self.config
}
}
impl DhtConfig {
    /// Starts a [`DhtConfigBuilder`] seeded with [`DhtConfig::default`].
    pub fn builder() -> DhtConfigBuilder {
        let config = Self::default();
        DhtConfigBuilder { config }
    }

    /// Returns the configured retry count.
    pub fn retries(&self) -> usize {
        self.retries
    }

    /// Returns the configured base retry interval.
    pub fn base_retry_interval(&self) -> Duration {
        self.base_retry_interval
    }

    /// Returns the configured maximum retry jitter.
    pub fn max_retry_jitter(&self) -> Duration {
        self.max_retry_jitter
    }

    /// Returns the configured put timeout.
    pub fn put_timeout(&self) -> Duration {
        self.put_timeout
    }

    /// Returns the configured get timeout.
    pub fn get_timeout(&self) -> Duration {
        self.get_timeout
    }
}
impl Default for DhtConfig {
    /// Defaults: 3 retries, 5 s base retry interval, and 10 s each for the
    /// maximum retry jitter, the put timeout and the get timeout.
    fn default() -> Self {
        let five_seconds = Duration::from_secs(5);
        let ten_seconds = Duration::from_secs(10);
        Self {
            retries: 3,
            base_retry_interval: five_seconds,
            max_retry_jitter: ten_seconds,
            put_timeout: ten_seconds,
            get_timeout: ten_seconds,
        }
    }
}
/// Switch for the bubble-merge feature.
///
/// `Default` yields `Enabled` with [`BubbleMergeConfigInner::default`];
/// [`BubbleMergeConfig::builder`] always produces the `Enabled` variant.
#[derive(Debug, Clone)]
pub enum BubbleMergeConfig {
/// Feature on, with the given settings.
Enabled(BubbleMergeConfigInner),
/// Feature off; carries no settings.
Disabled,
}
/// Settings carried by [`BubbleMergeConfig::Enabled`].
///
/// Fields are private; read them through the same-named accessors.
#[derive(Debug, Clone)]
pub struct BubbleMergeConfigInner {
/// Default: 30 s. NOTE(review): presumably the delay before the first merge
/// run — the consumer is outside this file.
initial_interval: Duration,
/// Default: 60 s. The builder never stores a zero duration here.
base_interval: Duration,
/// Default: 120 s. NOTE(review): presumably an upper bound on random jitter
/// added to `base_interval` — confirm with the consumer.
max_jitter: Duration,
/// Default: 4.
min_neighbors: usize,
/// Default: `true`.
fail_topic_creation_on_merge_startup_failure: bool,
/// Default: 2. The builder never stores 0 here.
max_join_peers: usize,
}
/// Consuming builder for the `Enabled` variant of [`BubbleMergeConfig`],
/// obtained from [`BubbleMergeConfig::builder`].
#[derive(Debug, Clone)]
pub struct BubbleMergeConfigBuilder {
/// The settings being assembled; `build` wraps them in `BubbleMergeConfig::Enabled`.
config: BubbleMergeConfigInner,
}
impl Default for BubbleMergeConfig {
fn default() -> Self {
Self::Enabled(BubbleMergeConfigInner::default())
}
}
impl Default for BubbleMergeConfigInner {
    /// Defaults: 30 s initial interval, 60 s base interval, 120 s max jitter,
    /// 4 minimum neighbors, fail-on-startup-failure enabled, 2 max join peers.
    fn default() -> Self {
        let initial_interval = Duration::from_secs(30);
        let base_interval = Duration::from_secs(60);
        let max_jitter = Duration::from_secs(120);
        Self {
            initial_interval,
            base_interval,
            max_jitter,
            min_neighbors: 4,
            fail_topic_creation_on_merge_startup_failure: true,
            max_join_peers: 2,
        }
    }
}
impl BubbleMergeConfig {
    /// Starts a [`BubbleMergeConfigBuilder`] seeded with
    /// [`BubbleMergeConfigInner::default`].
    pub fn builder() -> BubbleMergeConfigBuilder {
        let config = BubbleMergeConfigInner::default();
        BubbleMergeConfigBuilder { config }
    }
}
impl BubbleMergeConfigInner {
    /// Returns the configured initial interval.
    pub fn initial_interval(&self) -> Duration {
        self.initial_interval
    }

    /// Returns the configured base interval.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Returns the configured maximum jitter.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Returns the configured minimum neighbor count.
    pub fn min_neighbors(&self) -> usize {
        self.min_neighbors
    }

    /// Returns whether topic creation should fail when merge startup fails.
    pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_merge_startup_failure
    }

    /// Returns the configured maximum number of join peers.
    pub fn max_join_peers(&self) -> usize {
        self.max_join_peers
    }
}
impl BubbleMergeConfigBuilder {
pub fn initial_interval(mut self, interval: Duration) -> Self {
self.config.initial_interval = interval;
self
}
pub fn base_interval(mut self, interval: Duration) -> Self {
if interval > Duration::ZERO {
self.config.base_interval = interval;
}
self
}
pub fn max_jitter(mut self, jitter: Duration) -> Self {
self.config.max_jitter = jitter;
self
}
pub fn min_neighbors(mut self, min_neighbors: usize) -> Self {
self.config.min_neighbors = min_neighbors;
self
}
pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
self.config.fail_topic_creation_on_merge_startup_failure = fail;
self
}
pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
if max_join_peers > 0 {
self.config.max_join_peers = max_join_peers;
}
self
}
pub fn build(self) -> BubbleMergeConfig {
BubbleMergeConfig::Enabled(self.config)
}
}
/// Switch for the message-overlap-merge feature.
///
/// `Default` yields `Enabled` with [`MessageOverlapMergeConfigInner::default`];
/// [`MessageOverlapMergeConfig::builder`] always produces the `Enabled` variant.
#[derive(Debug, Clone)]
pub enum MessageOverlapMergeConfig {
/// Feature on, with the given settings.
Enabled(MessageOverlapMergeConfigInner),
/// Feature off; carries no settings.
Disabled,
}
/// Settings carried by [`MessageOverlapMergeConfig::Enabled`].
///
/// Fields are private; read them through the same-named accessors.
#[derive(Debug, Clone)]
pub struct MessageOverlapMergeConfigInner {
/// Default: 30 s. NOTE(review): presumably the delay before the first merge
/// run — the consumer is outside this file.
initial_interval: Duration,
/// Default: 60 s. The builder never stores a zero duration here.
base_interval: Duration,
/// Default: 120 s. NOTE(review): presumably an upper bound on random jitter
/// added to `base_interval` — confirm with the consumer.
max_jitter: Duration,
/// Default: `true`.
fail_topic_creation_on_merge_startup_failure: bool,
/// Default: 2. The builder never stores 0 here.
max_join_peers: usize,
}
/// Consuming builder for the `Enabled` variant of
/// [`MessageOverlapMergeConfig`], obtained from
/// [`MessageOverlapMergeConfig::builder`].
#[derive(Debug, Clone)]
pub struct MessageOverlapMergeConfigBuilder {
/// The settings being assembled; `build` wraps them in
/// `MessageOverlapMergeConfig::Enabled`.
config: MessageOverlapMergeConfigInner,
}
impl Default for MessageOverlapMergeConfigInner {
    /// Defaults: 30 s initial interval, 60 s base interval, 120 s max jitter,
    /// fail-on-startup-failure enabled, 2 max join peers.
    fn default() -> Self {
        let initial_interval = Duration::from_secs(30);
        let base_interval = Duration::from_secs(60);
        let max_jitter = Duration::from_secs(120);
        Self {
            initial_interval,
            base_interval,
            max_jitter,
            fail_topic_creation_on_merge_startup_failure: true,
            max_join_peers: 2,
        }
    }
}
impl Default for MessageOverlapMergeConfig {
fn default() -> Self {
Self::Enabled(MessageOverlapMergeConfigInner::default())
}
}
impl MessageOverlapMergeConfig {
    /// Starts a [`MessageOverlapMergeConfigBuilder`] seeded with
    /// [`MessageOverlapMergeConfigInner::default`].
    pub fn builder() -> MessageOverlapMergeConfigBuilder {
        let config = MessageOverlapMergeConfigInner::default();
        MessageOverlapMergeConfigBuilder { config }
    }
}
impl MessageOverlapMergeConfigInner {
    /// Returns the configured initial interval.
    pub fn initial_interval(&self) -> Duration {
        self.initial_interval
    }

    /// Returns the configured base interval.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Returns the configured maximum jitter.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Returns whether topic creation should fail when merge startup fails.
    pub fn fail_topic_creation_on_merge_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_merge_startup_failure
    }

    /// Returns the configured maximum number of join peers.
    pub fn max_join_peers(&self) -> usize {
        self.max_join_peers
    }
}
impl MessageOverlapMergeConfigBuilder {
pub fn initial_interval(mut self, interval: Duration) -> Self {
self.config.initial_interval = interval;
self
}
pub fn base_interval(mut self, interval: Duration) -> Self {
if interval > Duration::ZERO {
self.config.base_interval = interval;
}
self
}
pub fn max_jitter(mut self, jitter: Duration) -> Self {
self.config.max_jitter = jitter;
self
}
pub fn fail_topic_creation_on_merge_startup_failure(mut self, fail: bool) -> Self {
self.config.fail_topic_creation_on_merge_startup_failure = fail;
self
}
pub fn max_join_peers(mut self, max_join_peers: usize) -> Self {
if max_join_peers > 0 {
self.config.max_join_peers = max_join_peers;
}
self
}
pub fn build(self) -> MessageOverlapMergeConfig {
MessageOverlapMergeConfig::Enabled(self.config)
}
}
/// Switch for the publisher.
///
/// `Default` yields `Enabled` with [`PublisherConfigInner::default`];
/// [`PublisherConfig::builder`] always produces the `Enabled` variant.
/// `ConfigBuilder::build` replaces an `Enabled` value with `Disabled` when the
/// bootstrap config's `max_bootstrap_records` is 0.
#[derive(Debug, Clone)]
pub enum PublisherConfig {
/// Publisher on, with the given settings.
Enabled(PublisherConfigInner),
/// Publisher off; carries no settings.
Disabled,
}
/// Settings carried by [`PublisherConfig::Enabled`].
///
/// Fields are private; read them through the same-named accessors.
#[derive(Debug, Clone)]
pub struct PublisherConfigInner {
/// Default: 10 s. NOTE(review): presumably the delay before the first
/// publish — the consumer is outside this file.
initial_delay: Duration,
/// Default: 10 s. The builder never stores a zero duration here.
base_interval: Duration,
/// Default: 50 s. NOTE(review): presumably an upper bound on random jitter
/// added to `base_interval` — confirm with the consumer.
max_jitter: Duration,
/// Default: `true`.
fail_topic_creation_on_publishing_startup_failure: bool,
}
/// Consuming builder for the `Enabled` variant of [`PublisherConfig`],
/// obtained from [`PublisherConfig::builder`].
#[derive(Debug, Clone)]
pub struct PublisherConfigBuilder {
/// The settings being assembled; `build` wraps them in `PublisherConfig::Enabled`.
config: PublisherConfigInner,
}
impl Default for PublisherConfigInner {
    /// Defaults: 10 s initial delay, 10 s base interval, 50 s max jitter,
    /// fail-on-startup-failure enabled.
    fn default() -> Self {
        let ten_seconds = Duration::from_secs(10);
        Self {
            initial_delay: ten_seconds,
            base_interval: ten_seconds,
            max_jitter: Duration::from_secs(50),
            fail_topic_creation_on_publishing_startup_failure: true,
        }
    }
}
impl Default for PublisherConfig {
fn default() -> Self {
Self::Enabled(PublisherConfigInner::default())
}
}
impl PublisherConfig {
    /// Starts a [`PublisherConfigBuilder`] seeded with
    /// [`PublisherConfigInner::default`].
    pub fn builder() -> PublisherConfigBuilder {
        let config = PublisherConfigInner::default();
        PublisherConfigBuilder { config }
    }
}
impl PublisherConfigInner {
    /// Returns the configured initial delay.
    pub fn initial_delay(&self) -> Duration {
        self.initial_delay
    }

    /// Returns the configured base interval.
    pub fn base_interval(&self) -> Duration {
        self.base_interval
    }

    /// Returns the configured maximum jitter.
    pub fn max_jitter(&self) -> Duration {
        self.max_jitter
    }

    /// Returns whether topic creation should fail when publishing startup
    /// fails.
    pub fn fail_topic_creation_on_publishing_startup_failure(&self) -> bool {
        self.fail_topic_creation_on_publishing_startup_failure
    }
}
impl PublisherConfigBuilder {
pub fn initial_delay(mut self, delay: Duration) -> Self {
self.config.initial_delay = delay;
self
}
pub fn base_interval(mut self, interval: Duration) -> Self {
if interval > Duration::ZERO {
self.config.base_interval = interval;
}
self
}
pub fn max_jitter(mut self, jitter: Duration) -> Self {
self.config.max_jitter = jitter;
self
}
pub fn fail_topic_creation_on_publishing_startup_failure(mut self, fail: bool) -> Self {
self.config
.fail_topic_creation_on_publishing_startup_failure = fail;
self
}
pub fn build(self) -> PublisherConfig {
PublisherConfig::Enabled(self.config)
}
}
/// Groups the two merge-feature configurations.
///
/// The derived `Default` uses the `Default` of each field, which for both
/// enums is the `Enabled` variant with default settings.
#[derive(Debug, Clone, Default)]
pub struct MergeConfig {
/// Bubble-merge switch and settings.
bubble_merge: BubbleMergeConfig,
/// Message-overlap-merge switch and settings.
message_overlap_merge: MessageOverlapMergeConfig,
}
/// Consuming builder for [`MergeConfig`], obtained from [`MergeConfig::builder`].
#[derive(Debug, Clone)]
pub struct MergeConfigBuilder {
/// The configuration being assembled; `build` returns it as-is.
config: MergeConfig,
}
impl MergeConfig {
    /// Starts a [`MergeConfigBuilder`] seeded with [`MergeConfig::default`].
    pub fn builder() -> MergeConfigBuilder {
        let config = Self::default();
        MergeConfigBuilder { config }
    }

    /// Borrows the bubble-merge configuration.
    pub fn bubble_merge(&self) -> &BubbleMergeConfig {
        let Self { bubble_merge, .. } = self;
        bubble_merge
    }

    /// Borrows the message-overlap-merge configuration.
    pub fn message_overlap_merge(&self) -> &MessageOverlapMergeConfig {
        let Self {
            message_overlap_merge,
            ..
        } = self;
        message_overlap_merge
    }
}
impl MergeConfigBuilder {
pub fn bubble_merge(mut self, bubble_merge: BubbleMergeConfig) -> Self {
self.config.bubble_merge = bubble_merge;
self
}
pub fn message_overlap_merge(
mut self,
message_overlap_merge: MessageOverlapMergeConfig,
) -> Self {
self.config.message_overlap_merge = message_overlap_merge;
self
}
pub fn build(self) -> MergeConfig {
self.config
}
}
/// Bootstrap settings.
///
/// Fields are private; read them through the same-named accessors and
/// customise them via [`BootstrapConfig::builder`]. The builder stores every
/// value as given (no validation).
#[derive(Debug, Clone)]
pub struct BootstrapConfig {
/// Default: 5. When this is 0, `ConfigBuilder::build` forces the publisher
/// config to `PublisherConfig::Disabled`.
max_bootstrap_records: usize,
/// Default: 1500 ms.
no_peers_retry_interval: Duration,
/// Default: 100 ms.
per_peer_join_settle_time: Duration,
/// Default: 500 ms.
join_confirmation_wait_time: Duration,
/// Default: 2000 ms.
discovery_poll_interval: Duration,
/// Default: `true`.
publish_record_on_startup: bool,
/// Default: `false`.
check_older_records_first_on_startup: bool,
}
impl Default for BootstrapConfig {
    /// Defaults: 5 bootstrap records, 1500 ms no-peers retry interval, 100 ms
    /// per-peer join settle time, 500 ms join confirmation wait, 2000 ms
    /// discovery poll interval, publish on startup, newer records first.
    fn default() -> Self {
        let no_peers_retry_interval = Duration::from_millis(1_500);
        let per_peer_join_settle_time = Duration::from_millis(100);
        let join_confirmation_wait_time = Duration::from_millis(500);
        let discovery_poll_interval = Duration::from_millis(2_000);
        Self {
            max_bootstrap_records: 5,
            no_peers_retry_interval,
            per_peer_join_settle_time,
            join_confirmation_wait_time,
            discovery_poll_interval,
            publish_record_on_startup: true,
            check_older_records_first_on_startup: false,
        }
    }
}
/// Consuming builder for [`BootstrapConfig`], obtained from
/// [`BootstrapConfig::builder`].
///
/// Derives `Clone` in addition to `Debug`, matching the other builders in this
/// file (for example `DhtConfigBuilder`), so a partially configured builder can
/// be duplicated and finished in different ways.
#[derive(Debug, Clone)]
pub struct BootstrapConfigBuilder {
    /// The configuration being assembled; `build` returns it as-is.
    config: BootstrapConfig,
}
impl BootstrapConfigBuilder {
pub fn max_bootstrap_records(mut self, max_records: usize) -> Self {
self.config.max_bootstrap_records = max_records;
self
}
pub fn no_peers_retry_interval(mut self, interval: Duration) -> Self {
self.config.no_peers_retry_interval = interval;
self
}
pub fn per_peer_join_settle_time(mut self, interval: Duration) -> Self {
self.config.per_peer_join_settle_time = interval;
self
}
pub fn join_confirmation_wait_time(mut self, interval: Duration) -> Self {
self.config.join_confirmation_wait_time = interval;
self
}
pub fn discovery_poll_interval(mut self, interval: Duration) -> Self {
self.config.discovery_poll_interval = interval;
self
}
pub fn publish_record_on_startup(mut self, publish: bool) -> Self {
self.config.publish_record_on_startup = publish;
self
}
pub fn check_older_records_first_on_startup(mut self, check: bool) -> Self {
self.config.check_older_records_first_on_startup = check;
self
}
pub fn build(self) -> BootstrapConfig {
self.config
}
}
impl BootstrapConfig {
    /// Starts a [`BootstrapConfigBuilder`] seeded with
    /// [`BootstrapConfig::default`].
    pub fn builder() -> BootstrapConfigBuilder {
        let config = Self::default();
        BootstrapConfigBuilder { config }
    }

    /// Returns the configured maximum number of bootstrap records.
    pub fn max_bootstrap_records(&self) -> usize {
        self.max_bootstrap_records
    }

    /// Returns the configured no-peers retry interval.
    pub fn no_peers_retry_interval(&self) -> Duration {
        self.no_peers_retry_interval
    }

    /// Returns the configured per-peer join settle time.
    pub fn per_peer_join_settle_time(&self) -> Duration {
        self.per_peer_join_settle_time
    }

    /// Returns the configured join confirmation wait time.
    pub fn join_confirmation_wait_time(&self) -> Duration {
        self.join_confirmation_wait_time
    }

    /// Returns the configured discovery poll interval.
    pub fn discovery_poll_interval(&self) -> Duration {
        self.discovery_poll_interval
    }

    /// Returns the publish-record-on-startup flag.
    pub fn publish_record_on_startup(&self) -> bool {
        self.publish_record_on_startup
    }

    /// Returns the check-older-records-first-on-startup flag.
    pub fn check_older_records_first_on_startup(&self) -> bool {
        self.check_older_records_first_on_startup
    }
}
/// Top-level configuration aggregating all sub-configurations.
///
/// Fields are private; read them through the same-named accessors and
/// customise them via [`Config::builder`]. `Default` uses the `Default` of
/// every sub-configuration and a `max_join_peer_count` of 4.
#[derive(Debug, Clone)]
pub struct Config {
/// Bootstrap settings.
bootstrap_config: BootstrapConfig,
/// Publisher switch and settings. `ConfigBuilder::build` may override this
/// to `Disabled` (when `max_bootstrap_records` is 0).
publisher_config: PublisherConfig,
/// DHT retry/timeout settings.
dht_config: DhtConfig,
/// Merge-feature settings.
merge_config: MergeConfig,
/// Default: 4. The builder never stores 0 here.
max_join_peer_count: usize,
/// Per-operation timeouts.
timeouts: TimeoutConfig,
}
impl Config {
    /// Starts a [`ConfigBuilder`] seeded with [`Config::default`].
    pub fn builder() -> ConfigBuilder {
        let config = Self::default();
        ConfigBuilder { config }
    }

    /// Borrows the publisher configuration.
    pub fn publisher_config(&self) -> &PublisherConfig {
        let Self {
            publisher_config, ..
        } = self;
        publisher_config
    }

    /// Borrows the DHT configuration.
    pub fn dht_config(&self) -> &DhtConfig {
        let Self { dht_config, .. } = self;
        dht_config
    }

    /// Borrows the bootstrap configuration.
    pub fn bootstrap_config(&self) -> &BootstrapConfig {
        let Self {
            bootstrap_config, ..
        } = self;
        bootstrap_config
    }

    /// Returns the configured maximum join-peer count.
    pub fn max_join_peer_count(&self) -> usize {
        self.max_join_peer_count
    }

    /// Borrows the timeout configuration.
    pub fn timeouts(&self) -> &TimeoutConfig {
        let Self { timeouts, .. } = self;
        timeouts
    }

    /// Borrows the merge configuration.
    pub fn merge_config(&self) -> &MergeConfig {
        let Self { merge_config, .. } = self;
        merge_config
    }
}
impl Default for Config {
    /// Uses the `Default` of every sub-configuration and a maximum join-peer
    /// count of 4.
    fn default() -> Self {
        let bootstrap_config = BootstrapConfig::default();
        let publisher_config = PublisherConfig::default();
        let dht_config = DhtConfig::default();
        let merge_config = MergeConfig::default();
        let timeouts = TimeoutConfig::default();
        Self {
            bootstrap_config,
            publisher_config,
            dht_config,
            merge_config,
            max_join_peer_count: 4,
            timeouts,
        }
    }
}
/// Consuming builder for [`Config`], obtained from [`Config::builder`].
///
/// Derives `Clone` in addition to `Debug`, matching the other builders in this
/// file (for example `DhtConfigBuilder`), so a partially configured builder can
/// be duplicated and finished in different ways.
#[derive(Debug, Clone)]
pub struct ConfigBuilder {
    /// The configuration being assembled; `build` may still adjust the
    /// publisher setting before returning it.
    config: Config,
}
impl ConfigBuilder {
pub fn merge_config(mut self, merge_config: MergeConfig) -> Self {
self.config.merge_config = merge_config;
self
}
pub fn publisher_config(mut self, publisher_config: PublisherConfig) -> Self {
self.config.publisher_config = publisher_config;
self
}
pub fn dht_config(mut self, dht_config: DhtConfig) -> Self {
self.config.dht_config = dht_config;
self
}
pub fn bootstrap_config(mut self, bootstrap_config: BootstrapConfig) -> Self {
self.config.bootstrap_config = bootstrap_config;
self
}
pub fn max_join_peer_count(mut self, max_peers: usize) -> Self {
if max_peers > 0 {
self.config.max_join_peer_count = max_peers;
}
self
}
pub fn timeouts(mut self, timeouts: TimeoutConfig) -> Self {
self.config.timeouts = timeouts;
self
}
pub fn build(self) -> Config {
let mut config = self.config;
if config.bootstrap_config.max_bootstrap_records == 0
&& matches!(config.publisher_config, PublisherConfig::Enabled(_))
{
tracing::warn!(
"Publisher is enabled via PublisherConfig::Enabled(_) but BootstrapConfig.max_bootstrap_records is set to 0 (we effectively never publish). Overriding PublisherConfig to PublisherConfig::Disabled."
);
config.publisher_config = PublisherConfig::Disabled;
}
config
}
}