p2panda_net/sync/
config.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::sync::Arc;

use tokio::time::Duration;

use p2panda_sync::{SyncProtocol, TopicQuery};

const MAX_CONCURRENT_SYNC_SESSIONS: usize = 128;
const MAX_RETRY_ATTEMPTS: u8 = 5;
const RESYNC_INTERVAL: Duration = Duration::from_secs(60);
const RESYNC_POLL_INTERVAL: Duration = Duration::from_secs(1);
const SYNC_QUEUE_SEND_TIMEOUT: Duration = Duration::from_millis(100);

/// Configuration parameters for resync behaviour.
#[derive(Clone, Debug)]
pub struct ResyncConfiguration {
    /// Minimum interval between resync attempts for a single peer-topic combination.
    ///
    /// Default: 60 seconds.
    pub(crate) interval: Duration,

    /// Minimum interval between each poll of the resync queue.
    ///
    /// Default: 1 second.
    pub(crate) poll_interval: Duration,
}

impl ResyncConfiguration {
    /// Return a default instance of `ResyncConfiguration`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Define the minimum number of seconds between resync attempts for a single peer-topic
    /// combination.
    pub fn interval(mut self, seconds: u64) -> Self {
        self.interval = Duration::from_secs(seconds);
        self
    }

    /// Define the minimum number of seconds between poll of the resync queue.
    pub fn poll_interval(mut self, seconds: u64) -> Self {
        self.poll_interval = Duration::from_secs(seconds);
        self
    }
}

impl Default for ResyncConfiguration {
    fn default() -> Self {
        ResyncConfiguration {
            interval: RESYNC_INTERVAL,
            poll_interval: RESYNC_POLL_INTERVAL,
        }
    }
}

/// Configuration parameters for data synchronisation between peers.
#[derive(Clone, Debug)]
pub struct SyncConfiguration<T> {
    protocol: Arc<dyn for<'a> SyncProtocol<'a, T> + 'static>,

    /// Resync configuration (`None` represents no resync).
    pub(crate) resync: Option<ResyncConfiguration>,

    /// Maximum number of concurrent sync sessions.
    ///
    /// Default: 128.
    pub(crate) max_concurrent_sync_sessions: usize,

    /// Maximum number of attempts at successfully completing a sync session with a specific peer.
    ///
    /// Default: 5.
    pub(crate) max_retry_attempts: u8,

    /// Maximum time to wait for sync attempt queue to have an open slot before failing.
    ///
    /// Default: 100 milliseconds.
    pub(crate) sync_queue_send_timeout: Duration,
}

impl<T> SyncConfiguration<T>
where
    T: TopicQuery,
{
    /// Return a default instance of `SyncConfiguration`.
    pub fn new(protocol: impl for<'a> SyncProtocol<'a, T> + 'static) -> Self {
        Self {
            protocol: Arc::new(protocol),
            max_concurrent_sync_sessions: MAX_CONCURRENT_SYNC_SESSIONS,
            max_retry_attempts: MAX_RETRY_ATTEMPTS,
            resync: None,
            sync_queue_send_timeout: SYNC_QUEUE_SEND_TIMEOUT,
        }
    }

    /// Define the maximum number of concurrent sync sessions.
    pub fn max_concurrent_sync_sessions(mut self, sessions: usize) -> Self {
        self.max_concurrent_sync_sessions = sessions;
        self
    }

    /// Define the maximum number of attempts at successfully completing a sync session with a
    /// specific peer.
    pub fn max_retry_attempts(mut self, attempts: u8) -> Self {
        self.max_retry_attempts = attempts;
        self
    }

    /// Return the sync protocol from the given configuration.
    pub fn protocol(&self) -> Arc<dyn for<'a> SyncProtocol<'a, T>> {
        self.protocol.clone()
    }

    /// Provide the resync configuration for the sync scheduler.
    pub fn resync(mut self, config: ResyncConfiguration) -> Self {
        self.resync = Some(config);
        self
    }

    /// Is resync enabled?
    pub fn is_resync(&mut self) -> bool {
        self.resync.is_some()
    }

    /// Define the maximum number of seconds to wait for sync attempt queue to have an open slot
    /// before failing.
    pub fn sync_queue_send_timeout(mut self, seconds: u64) -> Self {
        self.sync_queue_send_timeout = Duration::from_secs(seconds);
        self
    }
}