Skip to main content

mdcs_sdk/
sync.rs

1//! Synchronization primitives for the SDK.
2
3use crate::error::SdkError;
4use crate::network::{Message, NetworkTransport, PeerId};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8
9/// Configuration for sync behavior.
10#[derive(Clone, Debug)]
11pub struct SyncConfig {
12    /// How often to send sync requests (in milliseconds).
13    pub sync_interval_ms: u64,
14    /// How often to send presence updates (in milliseconds).
15    pub presence_interval_ms: u64,
16    /// Timeout for sync requests (in milliseconds).
17    pub sync_timeout_ms: u64,
18    /// Maximum batch size for delta updates.
19    pub max_batch_size: usize,
20    /// Enable automatic background sync.
21    pub auto_sync: bool,
22}
23
24impl Default for SyncConfig {
25    fn default() -> Self {
26        Self {
27            sync_interval_ms: 1000,
28            presence_interval_ms: 500,
29            sync_timeout_ms: 5000,
30            max_batch_size: 100,
31            auto_sync: true,
32        }
33    }
34}
35
36/// Builder for sync configuration.
37pub struct SyncConfigBuilder {
38    config: SyncConfig,
39}
40
41impl SyncConfigBuilder {
42    pub fn new() -> Self {
43        Self {
44            config: SyncConfig::default(),
45        }
46    }
47
48    pub fn sync_interval(mut self, ms: u64) -> Self {
49        self.config.sync_interval_ms = ms;
50        self
51    }
52
53    pub fn presence_interval(mut self, ms: u64) -> Self {
54        self.config.presence_interval_ms = ms;
55        self
56    }
57
58    pub fn sync_timeout(mut self, ms: u64) -> Self {
59        self.config.sync_timeout_ms = ms;
60        self
61    }
62
63    pub fn max_batch_size(mut self, size: usize) -> Self {
64        self.config.max_batch_size = size;
65        self
66    }
67
68    pub fn auto_sync(mut self, enabled: bool) -> Self {
69        self.config.auto_sync = enabled;
70        self
71    }
72
73    pub fn build(self) -> SyncConfig {
74        self.config
75    }
76}
77
78impl Default for SyncConfigBuilder {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84/// Events emitted by the sync manager.
85#[derive(Clone, Debug)]
86pub enum SyncEvent {
87    /// Sync started with a peer.
88    SyncStarted(PeerId),
89    /// Sync completed with a peer.
90    SyncCompleted(PeerId),
91    /// Received update from peer.
92    ReceivedUpdate {
93        peer_id: PeerId,
94        document_id: String,
95    },
96    /// Sent update to peer.
97    SentUpdate {
98        peer_id: PeerId,
99        document_id: String,
100    },
101    /// Sync error occurred.
102    SyncError { peer_id: PeerId, error: String },
103}
104
105/// Sync state for a peer.
106#[derive(Clone, Debug, Default)]
107pub struct PeerSyncState {
108    /// Last known version for each document.
109    pub document_versions: HashMap<String, u64>,
110    /// Last sync time.
111    pub last_sync: Option<Instant>,
112}
113
114/// Manages synchronization between peers.
115pub struct SyncManager<T: NetworkTransport> {
116    transport: Arc<T>,
117    config: SyncConfig,
118    peer_states: HashMap<PeerId, PeerSyncState>,
119}
120
121impl<T: NetworkTransport> SyncManager<T> {
122    /// Create a new sync manager.
123    pub fn new(transport: Arc<T>, config: SyncConfig) -> Self {
124        Self {
125            transport,
126            config,
127            peer_states: HashMap::new(),
128        }
129    }
130
131    /// Get the sync configuration.
132    pub fn config(&self) -> &SyncConfig {
133        &self.config
134    }
135
136    /// Broadcast a document update to all connected peers.
137    pub async fn broadcast_update(
138        &mut self,
139        document_id: &str,
140        delta: Vec<u8>,
141        version: u64,
142    ) -> Result<(), SdkError> {
143        let message = Message::Update {
144            document_id: document_id.to_string(),
145            delta,
146            version,
147        };
148
149        self.transport
150            .broadcast(message)
151            .await
152            .map_err(|e| SdkError::SyncError(e.to_string()))
153    }
154
155    /// Send a sync request to a specific peer.
156    pub async fn request_sync(
157        &mut self,
158        peer_id: &PeerId,
159        document_id: &str,
160        version: u64,
161    ) -> Result<(), SdkError> {
162        let message = Message::SyncRequest {
163            document_id: document_id.to_string(),
164            version,
165        };
166
167        self.transport
168            .send(peer_id, message)
169            .await
170            .map_err(|e| SdkError::SyncError(e.to_string()))
171    }
172
173    /// Update sync state for a peer.
174    pub fn update_peer_state(&mut self, peer_id: &PeerId, document_id: &str, version: u64) {
175        let state = self.peer_states.entry(peer_id.clone()).or_default();
176        state
177            .document_versions
178            .insert(document_id.to_string(), version);
179        state.last_sync = Some(Instant::now());
180    }
181
182    /// Get sync state for a peer.
183    pub fn get_peer_state(&self, peer_id: &PeerId) -> Option<&PeerSyncState> {
184        self.peer_states.get(peer_id)
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191    use crate::network::MemoryTransport;
192
193    #[test]
194    fn test_sync_config_builder() {
195        let config = SyncConfigBuilder::new()
196            .sync_interval(500)
197            .presence_interval(250)
198            .sync_timeout(3000)
199            .max_batch_size(50)
200            .auto_sync(false)
201            .build();
202
203        assert_eq!(config.sync_interval_ms, 500);
204        assert_eq!(config.presence_interval_ms, 250);
205        assert_eq!(config.sync_timeout_ms, 3000);
206        assert_eq!(config.max_batch_size, 50);
207        assert!(!config.auto_sync);
208    }
209
210    #[tokio::test]
211    async fn test_sync_manager_creation() {
212        let transport = Arc::new(MemoryTransport::new(PeerId::new("peer-1")));
213        let config = SyncConfig::default();
214        let manager = SyncManager::new(transport, config);
215
216        assert!(manager.config().auto_sync);
217    }
218}