Skip to main content

pulsedb/sync/
config.rs

1//! Sync configuration types.
2//!
3//! [`SyncConfig`] controls the behavior of the sync protocol including
4//! direction, conflict resolution, batch sizes, and retry policies.
5
6use serde::{Deserialize, Serialize};
7
8use crate::error::ValidationError;
9use crate::types::CollectiveId;
10
11// ============================================================================
12// SyncDirection
13// ============================================================================
14
15/// Direction of sync data flow.
16#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
17pub enum SyncDirection {
18    /// Only push local changes to the remote peer.
19    PushOnly,
20    /// Only pull remote changes to the local instance.
21    PullOnly,
22    /// Both push and pull (full bidirectional sync).
23    #[default]
24    Bidirectional,
25}
26
27// ============================================================================
28// ConflictResolution
29// ============================================================================
30
31/// Strategy for resolving conflicts when the same entity is modified
32/// on both peers.
33#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
34pub enum ConflictResolution {
35    /// Remote (server) changes always win on conflict.
36    #[default]
37    ServerWins,
38    /// The change with the latest timestamp wins.
39    LastWriteWins,
40}
41
42// ============================================================================
43// RetryConfig
44// ============================================================================
45
46/// Configuration for retry behavior on transient sync failures.
47///
48/// Uses exponential backoff with a configurable multiplier and cap.
49#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct RetryConfig {
51    /// Maximum number of consecutive retries before giving up.
52    pub max_retries: u32,
53
54    /// Initial backoff duration in milliseconds.
55    pub initial_backoff_ms: u64,
56
57    /// Maximum backoff duration in milliseconds (cap).
58    pub max_backoff_ms: u64,
59
60    /// Multiplier applied to backoff after each retry.
61    pub backoff_multiplier: f64,
62}
63
64impl Default for RetryConfig {
65    fn default() -> Self {
66        Self {
67            max_retries: 5,
68            initial_backoff_ms: 500,
69            max_backoff_ms: 30_000,
70            backoff_multiplier: 2.0,
71        }
72    }
73}
74
75// ============================================================================
76// SyncConfig
77// ============================================================================
78
79/// Configuration for the sync protocol.
80///
81/// Controls direction, conflict resolution, batch sizes, polling intervals,
82/// and which collectives to sync.
83///
84/// # Example
85/// ```
86/// use pulsedb::sync::config::{SyncConfig, SyncDirection};
87///
88/// let config = SyncConfig {
89///     direction: SyncDirection::PushOnly,
90///     batch_size: 200,
91///     ..Default::default()
92/// };
93/// assert!(config.validate().is_ok());
94/// ```
95#[derive(Clone, Debug, Serialize, Deserialize)]
96pub struct SyncConfig {
97    /// Direction of sync data flow.
98    pub direction: SyncDirection,
99
100    /// Strategy for resolving conflicts.
101    pub conflict_resolution: ConflictResolution,
102
103    /// Maximum number of changes per sync batch.
104    ///
105    /// Larger batches reduce round trips but increase memory usage.
106    /// Default: 500
107    pub batch_size: usize,
108
109    /// Interval between push cycles in milliseconds.
110    ///
111    /// Default: 1000 (1 second)
112    pub push_interval_ms: u64,
113
114    /// Interval between pull cycles in milliseconds.
115    ///
116    /// Default: 1000 (1 second)
117    pub pull_interval_ms: u64,
118
119    /// Retry configuration for transient failures.
120    pub retry: RetryConfig,
121
122    /// Optional filter: only sync these collectives.
123    ///
124    /// `None` means sync all collectives.
125    pub collectives: Option<Vec<CollectiveId>>,
126
127    /// Whether to sync experience relations.
128    ///
129    /// Default: true
130    pub sync_relations: bool,
131
132    /// Whether to sync derived insights.
133    ///
134    /// Default: true
135    pub sync_insights: bool,
136}
137
138impl Default for SyncConfig {
139    fn default() -> Self {
140        Self {
141            direction: SyncDirection::default(),
142            conflict_resolution: ConflictResolution::default(),
143            batch_size: 500,
144            push_interval_ms: 1000,
145            pull_interval_ms: 1000,
146            retry: RetryConfig::default(),
147            collectives: None,
148            sync_relations: true,
149            sync_insights: true,
150        }
151    }
152}
153
154impl SyncConfig {
155    /// Validates the sync configuration.
156    ///
157    /// # Errors
158    /// Returns `ValidationError` if:
159    /// - `batch_size` is 0
160    /// - `push_interval_ms` is 0
161    /// - `pull_interval_ms` is 0
162    pub fn validate(&self) -> Result<(), ValidationError> {
163        if self.batch_size == 0 {
164            return Err(ValidationError::invalid_field(
165                "batch_size",
166                "must be greater than 0",
167            ));
168        }
169        if self.push_interval_ms == 0 {
170            return Err(ValidationError::invalid_field(
171                "push_interval_ms",
172                "must be greater than 0",
173            ));
174        }
175        if self.pull_interval_ms == 0 {
176            return Err(ValidationError::invalid_field(
177                "pull_interval_ms",
178                "must be greater than 0",
179            ));
180        }
181        Ok(())
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// `SyncConfig::default()` yields the documented default for each field.
    #[test]
    fn test_sync_config_defaults() {
        let config = SyncConfig::default();
        assert_eq!(config.direction, SyncDirection::Bidirectional);
        assert_eq!(config.conflict_resolution, ConflictResolution::ServerWins);
        assert_eq!(config.batch_size, 500);
        assert_eq!(config.push_interval_ms, 1000);
        assert_eq!(config.pull_interval_ms, 1000);
        assert!(config.collectives.is_none());
        assert!(config.sync_relations);
        assert!(config.sync_insights);
    }

    /// The default configuration passes validation.
    #[test]
    fn test_sync_config_validate_success() {
        let config = SyncConfig::default();
        assert!(config.validate().is_ok());
    }

    /// A zero `batch_size` is rejected and the error names that field.
    #[test]
    fn test_sync_config_validate_zero_batch_size() {
        let config = SyncConfig {
            batch_size: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            matches!(err, ValidationError::InvalidField { field, .. } if field == "batch_size")
        );
    }

    /// A zero `push_interval_ms` is rejected and the error names that field.
    #[test]
    fn test_sync_config_validate_zero_push_interval() {
        let config = SyncConfig {
            push_interval_ms: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            matches!(err, ValidationError::InvalidField { field, .. } if field == "push_interval_ms")
        );
    }

    /// A zero `pull_interval_ms` is rejected and the error names that field.
    #[test]
    fn test_sync_config_validate_zero_pull_interval() {
        let config = SyncConfig {
            pull_interval_ms: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            matches!(err, ValidationError::InvalidField { field, .. } if field == "pull_interval_ms")
        );
    }

    /// A config with non-default values survives a bincode round trip with
    /// every field intact, not just the ones that were overridden.
    #[test]
    fn test_sync_config_bincode_roundtrip() {
        let config = SyncConfig {
            direction: SyncDirection::PushOnly,
            batch_size: 100,
            collectives: Some(vec![CollectiveId::new()]),
            ..Default::default()
        };
        let bytes = bincode::serialize(&config).unwrap();
        let restored: SyncConfig = bincode::deserialize(&bytes).unwrap();

        // Top-level scalar and enum fields.
        assert_eq!(config.direction, restored.direction);
        assert_eq!(config.conflict_resolution, restored.conflict_resolution);
        assert_eq!(config.batch_size, restored.batch_size);
        assert_eq!(config.push_interval_ms, restored.push_interval_ms);
        assert_eq!(config.pull_interval_ms, restored.pull_interval_ms);
        assert_eq!(config.sync_relations, restored.sync_relations);
        assert_eq!(config.sync_insights, restored.sync_insights);

        // Retry policy, field by field; the float is compared bit-for-bit
        // because serialization should reproduce it exactly.
        assert_eq!(config.retry.max_retries, restored.retry.max_retries);
        assert_eq!(
            config.retry.initial_backoff_ms,
            restored.retry.initial_backoff_ms
        );
        assert_eq!(config.retry.max_backoff_ms, restored.retry.max_backoff_ms);
        assert_eq!(
            config.retry.backoff_multiplier.to_bits(),
            restored.retry.backoff_multiplier.to_bits()
        );

        // The collective filter must still be present with the same number of
        // entries. Only the length is compared so the test does not assume
        // `CollectiveId` implements `PartialEq`/`Debug`.
        assert_eq!(
            config.collectives.as_ref().map(Vec::len),
            restored.collectives.as_ref().map(Vec::len)
        );
    }

    /// `RetryConfig::default()` yields the documented default for each field.
    #[test]
    fn test_retry_config_defaults() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.initial_backoff_ms, 500);
        assert_eq!(config.max_backoff_ms, 30_000);
        assert!((config.backoff_multiplier - 2.0).abs() < f64::EPSILON);
    }
}