Skip to main content

mdcs_sdk/
sync.rs

1//! Synchronization primitives for the SDK.
2
3use crate::error::SdkError;
4use crate::network::{Message, NetworkTransport, PeerId};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8
/// Tunable settings that control synchronization behavior.
///
/// All durations are expressed in milliseconds. [`SyncConfig::default`]
/// supplies a ready-to-use set of values.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Interval between sync requests, in milliseconds.
    pub sync_interval_ms: u64,
    /// Interval between presence updates, in milliseconds.
    pub presence_interval_ms: u64,
    /// How long a sync request may take before it times out, in milliseconds.
    pub sync_timeout_ms: u64,
    /// Upper bound on the batch size used for delta updates.
    pub max_batch_size: usize,
    /// Whether automatic background sync is enabled.
    pub auto_sync: bool,
}

impl Default for SyncConfig {
    /// Defaults: sync every 1000 ms, presence every 500 ms, a 5000 ms request
    /// timeout, batches of at most 100, and automatic sync switched on.
    fn default() -> Self {
        let sync_interval_ms = 1_000;
        let presence_interval_ms = 500;
        let sync_timeout_ms = 5_000;
        let max_batch_size = 100;
        let auto_sync = true;

        SyncConfig {
            sync_interval_ms,
            presence_interval_ms,
            sync_timeout_ms,
            max_batch_size,
            auto_sync,
        }
    }
}
35
36/// Builder for sync configuration.
37pub struct SyncConfigBuilder {
38    config: SyncConfig,
39}
40
41impl SyncConfigBuilder {
42    /// Create a builder initialized with [`SyncConfig::default`].
43    pub fn new() -> Self {
44        Self {
45            config: SyncConfig::default(),
46        }
47    }
48
49    /// Set how often periodic sync should run (milliseconds).
50    pub fn sync_interval(mut self, ms: u64) -> Self {
51        self.config.sync_interval_ms = ms;
52        self
53    }
54
55    /// Set how often presence updates should be emitted (milliseconds).
56    pub fn presence_interval(mut self, ms: u64) -> Self {
57        self.config.presence_interval_ms = ms;
58        self
59    }
60
61    /// Set the sync request timeout (milliseconds).
62    pub fn sync_timeout(mut self, ms: u64) -> Self {
63        self.config.sync_timeout_ms = ms;
64        self
65    }
66
67    /// Set the maximum number of deltas to send in one batch.
68    pub fn max_batch_size(mut self, size: usize) -> Self {
69        self.config.max_batch_size = size;
70        self
71    }
72
73    /// Enable or disable automatic background synchronization.
74    pub fn auto_sync(mut self, enabled: bool) -> Self {
75        self.config.auto_sync = enabled;
76        self
77    }
78
79    /// Build and return the final [`SyncConfig`].
80    pub fn build(self) -> SyncConfig {
81        self.config
82    }
83}
84
85impl Default for SyncConfigBuilder {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91/// Events emitted by the sync manager.
92#[derive(Clone, Debug)]
93pub enum SyncEvent {
94    /// Sync started with a peer.
95    SyncStarted(PeerId),
96    /// Sync completed with a peer.
97    SyncCompleted(PeerId),
98    /// Received update from peer.
99    ReceivedUpdate {
100        peer_id: PeerId,
101        document_id: String,
102    },
103    /// Sent update to peer.
104    SentUpdate {
105        peer_id: PeerId,
106        document_id: String,
107    },
108    /// Sync error occurred.
109    SyncError { peer_id: PeerId, error: String },
110}
111
/// Synchronization bookkeeping kept for a single peer.
///
/// The default value has no recorded document versions and no sync time.
#[derive(Debug, Clone, Default)]
pub struct PeerSyncState {
    /// Last known version of each document, keyed by document identifier.
    pub document_versions: HashMap<String, u64>,
    /// Time of the last sync, or `None` when none has been recorded.
    pub last_sync: Option<Instant>,
}
120
121/// Manages synchronization between peers.
122pub struct SyncManager<T: NetworkTransport> {
123    transport: Arc<T>,
124    config: SyncConfig,
125    peer_states: HashMap<PeerId, PeerSyncState>,
126}
127
128impl<T: NetworkTransport> SyncManager<T> {
129    /// Create a new sync manager for a transport and configuration.
130    pub fn new(transport: Arc<T>, config: SyncConfig) -> Self {
131        Self {
132            transport,
133            config,
134            peer_states: HashMap::new(),
135        }
136    }
137
138    /// Return the active synchronization configuration.
139    pub fn config(&self) -> &SyncConfig {
140        &self.config
141    }
142
143    /// Broadcast a document update to all connected peers.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`SdkError::SyncError`] if the transport broadcast fails.
148    pub async fn broadcast_update(
149        &mut self,
150        document_id: &str,
151        delta: Vec<u8>,
152        version: u64,
153    ) -> Result<(), SdkError> {
154        let message = Message::Update {
155            document_id: document_id.to_string(),
156            delta,
157            version,
158        };
159
160        self.transport
161            .broadcast(message)
162            .await
163            .map_err(|e| SdkError::SyncError(e.to_string()))
164    }
165
166    /// Send a sync request to a specific peer.
167    ///
168    /// # Errors
169    ///
170    /// Returns [`SdkError::SyncError`] if sending the request fails.
171    pub async fn request_sync(
172        &mut self,
173        peer_id: &PeerId,
174        document_id: &str,
175        version: u64,
176    ) -> Result<(), SdkError> {
177        let message = Message::SyncRequest {
178            document_id: document_id.to_string(),
179            version,
180        };
181
182        self.transport
183            .send(peer_id, message)
184            .await
185            .map_err(|e| SdkError::SyncError(e.to_string()))
186    }
187
188    /// Update local sync metadata for a peer/document pair.
189    pub fn update_peer_state(&mut self, peer_id: &PeerId, document_id: &str, version: u64) {
190        let state = self.peer_states.entry(peer_id.clone()).or_default();
191        state
192            .document_versions
193            .insert(document_id.to_string(), version);
194        state.last_sync = Some(Instant::now());
195    }
196
197    /// Return sync metadata for a peer if present.
198    pub fn get_peer_state(&self, peer_id: &PeerId) -> Option<&PeerSyncState> {
199        self.peer_states.get(peer_id)
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::MemoryTransport;

    /// Every builder setter must show up in the built configuration.
    #[test]
    fn test_sync_config_builder() {
        let builder = SyncConfigBuilder::new()
            .sync_interval(500)
            .presence_interval(250)
            .sync_timeout(3000)
            .max_batch_size(50)
            .auto_sync(false);
        let config = builder.build();

        assert_eq!(config.sync_interval_ms, 500);
        assert_eq!(config.presence_interval_ms, 250);
        assert_eq!(config.sync_timeout_ms, 3000);
        assert_eq!(config.max_batch_size, 50);
        assert!(!config.auto_sync);
    }

    /// A manager created with the default config reports auto-sync enabled.
    #[tokio::test]
    async fn test_sync_manager_creation() {
        let local_peer = PeerId::new("peer-1");
        let transport = Arc::new(MemoryTransport::new(local_peer));
        let manager = SyncManager::new(transport, SyncConfig::default());

        assert!(manager.config().auto_sync);
    }
}