p2panda_net/sync/config.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
// SPDX-License-Identifier: MIT OR Apache-2.0
use std::sync::Arc;
use tokio::time::Duration;
use p2panda_sync::{SyncProtocol, TopicQuery};
const MAX_CONCURRENT_SYNC_SESSIONS: usize = 128;
const MAX_RETRY_ATTEMPTS: u8 = 5;
const RESYNC_INTERVAL: Duration = Duration::from_secs(60);
const RESYNC_POLL_INTERVAL: Duration = Duration::from_secs(1);
const SYNC_QUEUE_SEND_TIMEOUT: Duration = Duration::from_millis(100);
/// Configuration parameters for resync behaviour.
#[derive(Clone, Debug)]
pub struct ResyncConfiguration {
/// Minimum interval between resync attempts for a single peer-topic combination.
///
/// Default: 60 seconds.
pub(crate) interval: Duration,
/// Minimum interval between each poll of the resync queue.
///
/// Default: 1 second.
pub(crate) poll_interval: Duration,
}
impl ResyncConfiguration {
/// Return a default instance of `ResyncConfiguration`.
pub fn new() -> Self {
Default::default()
}
/// Define the minimum number of seconds between resync attempts for a single peer-topic
/// combination.
pub fn interval(mut self, seconds: u64) -> Self {
self.interval = Duration::from_secs(seconds);
self
}
/// Define the minimum number of seconds between poll of the resync queue.
pub fn poll_interval(mut self, seconds: u64) -> Self {
self.poll_interval = Duration::from_secs(seconds);
self
}
}
impl Default for ResyncConfiguration {
fn default() -> Self {
ResyncConfiguration {
interval: RESYNC_INTERVAL,
poll_interval: RESYNC_POLL_INTERVAL,
}
}
}
/// Configuration parameters for data synchronisation between peers.
#[derive(Clone, Debug)]
pub struct SyncConfiguration<T> {
protocol: Arc<dyn for<'a> SyncProtocol<'a, T> + 'static>,
/// Resync configuration (`None` represents no resync).
pub(crate) resync: Option<ResyncConfiguration>,
/// Maximum number of concurrent sync sessions.
///
/// Default: 128.
pub(crate) max_concurrent_sync_sessions: usize,
/// Maximum number of attempts at successfully completing a sync session with a specific peer.
///
/// Default: 5.
pub(crate) max_retry_attempts: u8,
/// Maximum time to wait for sync attempt queue to have an open slot before failing.
///
/// Default: 100 milliseconds.
pub(crate) sync_queue_send_timeout: Duration,
}
impl<T> SyncConfiguration<T>
where
T: TopicQuery,
{
/// Return a default instance of `SyncConfiguration`.
pub fn new(protocol: impl for<'a> SyncProtocol<'a, T> + 'static) -> Self {
Self {
protocol: Arc::new(protocol),
max_concurrent_sync_sessions: MAX_CONCURRENT_SYNC_SESSIONS,
max_retry_attempts: MAX_RETRY_ATTEMPTS,
resync: None,
sync_queue_send_timeout: SYNC_QUEUE_SEND_TIMEOUT,
}
}
/// Define the maximum number of concurrent sync sessions.
pub fn max_concurrent_sync_sessions(mut self, sessions: usize) -> Self {
self.max_concurrent_sync_sessions = sessions;
self
}
/// Define the maximum number of attempts at successfully completing a sync session with a
/// specific peer.
pub fn max_retry_attempts(mut self, attempts: u8) -> Self {
self.max_retry_attempts = attempts;
self
}
/// Return the sync protocol from the given configuration.
pub fn protocol(&self) -> Arc<dyn for<'a> SyncProtocol<'a, T>> {
self.protocol.clone()
}
/// Provide the resync configuration for the sync scheduler.
pub fn resync(mut self, config: ResyncConfiguration) -> Self {
self.resync = Some(config);
self
}
/// Is resync enabled?
pub fn is_resync(&mut self) -> bool {
self.resync.is_some()
}
/// Define the maximum number of seconds to wait for sync attempt queue to have an open slot
/// before failing.
pub fn sync_queue_send_timeout(mut self, seconds: u64) -> Self {
self.sync_queue_send_timeout = Duration::from_secs(seconds);
self
}
}