1use crate::{
4 crdt::{Mergeable, ReplicaId},
5 storage::Storage,
6 transport::{SyncTransport, TransportError},
7};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use thiserror::Error;
13
14#[derive(Error, Debug)]
15pub enum SyncEngineError {
16 #[error("Storage error: {0}")]
17 Storage(#[from] crate::storage::StorageError),
18 #[error("Transport error: {0}")]
19 Transport(#[from] TransportError),
20 #[error("Serialization error: {0}")]
21 Serialization(#[from] serde_json::Error),
22 #[error("CRDT error: {0}")]
23 CrdtError(#[from] Box<dyn std::error::Error + Send + Sync>),
24 #[error("Sync operation failed: {0}")]
25 SyncFailed(String),
26 #[error("Conflict resolution failed: {0}")]
27 ConflictResolution(String),
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub enum SyncState {
33 NotSynced,
35 Syncing,
37 Synced,
39 Failed(String),
41 ResolvingConflicts,
43 Offline,
45 Connected,
47 Disconnected,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub enum SyncMessage<T> {
54 Sync { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
56 Ack { key: String, replica_id: ReplicaId },
58 Presence { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
60 Conflict { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
62 Heartbeat { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
64}
65
66pub struct SyncEngine<Tr>
68where
69 Tr: SyncTransport + Clone,
70{
71 replica_id: ReplicaId,
72 state: Arc<RwLock<SyncState>>,
73 peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
74 storage: Storage,
75 transport: Tr,
76 sync_queue: Arc<RwLock<Vec<SyncMessage<Vec<u8>>>>>,
77 conflict_resolver: Arc<RwLock<Option<DefaultConflictResolver>>>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct PeerInfo {
83 pub replica_id: ReplicaId,
84 pub last_seen: chrono::DateTime<chrono::Utc>,
85 pub is_online: bool,
86 pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
87 pub sync_status: PeerSyncStatus,
88 pub id: ReplicaId,
90 pub status: PeerSyncStatus,
91 pub version: u32,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96pub enum PeerSyncStatus {
97 Never,
99 Success { timestamp: chrono::DateTime<chrono::Utc> },
101 Failed { timestamp: chrono::DateTime<chrono::Utc>, error: String },
103 Syncing { started: chrono::DateTime<chrono::Utc> },
105 Connected,
107 Disconnected,
109}
110
111pub struct DefaultConflictResolver;
113
114impl DefaultConflictResolver {
115 pub fn resolve<T: Mergeable>(&self, local: &T, remote: &T) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
117 let mut result = local.clone();
119 result.merge(remote).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
120 Ok(result)
121 }
122}
123
124impl<Tr> SyncEngine<Tr>
125where
126 Tr: SyncTransport + Clone + 'static,
127{
128 pub fn new(storage: Storage, transport: Tr) -> Self {
129 Self {
130 replica_id: ReplicaId::default(),
131 state: Arc::new(RwLock::new(SyncState::NotSynced)),
132 peers: Arc::new(RwLock::new(HashMap::new())),
133 storage,
134 transport,
135 sync_queue: Arc::new(RwLock::new(Vec::new())),
136 conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
137 }
138 }
139
140 pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
141 Self {
142 replica_id,
143 state: Arc::new(RwLock::new(SyncState::NotSynced)),
144 peers: Arc::new(RwLock::new(HashMap::new())),
145 storage,
146 transport,
147 sync_queue: Arc::new(RwLock::new(Vec::new())),
148 conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
149 }
150 }
151
152 pub async fn state(&self) -> SyncState {
153 self.state.read().await.clone()
154 }
155
156 pub fn replica_id(&self) -> ReplicaId {
157 self.replica_id
158 }
159
160 pub async fn is_online(&self) -> bool {
161 self.transport.is_connected()
162 }
163
164 pub async fn peer_count(&self) -> usize {
165 self.peers.read().await.len()
166 }
167
168 pub async fn start_sync(&mut self) -> Result<(), SyncEngineError> {
170 let mut state = self.state.write().await;
171 *state = SyncState::Syncing;
172
173 if !self.transport.is_connected() {
175 tracing::info!("Transport not connected, attempting to connect...");
177 }
178
179 self.announce_presence().await?;
181
182 self.start_background_sync().await;
184
185 Ok(())
186 }
187
188 pub async fn stop_sync(&mut self) -> Result<(), SyncEngineError> {
190 let mut state = self.state.write().await;
191 *state = SyncState::NotSynced;
192
193 if self.transport.is_connected() {
195 tracing::info!("Stopping sync, disconnecting from transport...");
196 }
197
198 Ok(())
199 }
200
201 pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncEngineError>
203 where
204 V: Mergeable + Serialize + Send + Sync + Clone,
205 {
206 let data = serde_json::to_vec(value)?;
208
209 let message = SyncMessage::Sync {
211 key: key.to_string(),
212 data,
213 replica_id: self.replica_id,
214 timestamp: chrono::Utc::now(),
215 };
216
217 {
219 let mut queue = self.sync_queue.write().await;
220 queue.push(SyncMessage::Sync {
221 key: key.to_string(),
222 data: serde_json::to_vec(value)?,
223 replica_id: self.replica_id,
224 timestamp: chrono::Utc::now(),
225 });
226 }
227
228 let message_bytes = serde_json::to_vec(&message)?;
230 self.transport.send(&message_bytes).await
231 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
232
233 Ok(())
234 }
235
236 pub async fn process_messages(&mut self) -> Result<(), SyncEngineError> {
238 let messages = self.transport.receive().await
240 .map_err(|e| SyncEngineError::Transport(TransportError::ReceiveFailed(e.to_string())))?;
241
242 for message_bytes in messages {
243 let message: SyncMessage<Vec<u8>> = serde_json::from_slice(&message_bytes)?;
244
245 match message {
246 SyncMessage::Sync { key, data, replica_id, timestamp } => {
247 self.handle_sync_message(key, data, replica_id, timestamp).await?;
249 }
250 SyncMessage::Ack { key, replica_id } => {
251 self.handle_ack_message(key, replica_id).await?;
253 }
254 SyncMessage::Presence { replica_id, timestamp } => {
255 self.handle_presence_message(replica_id, timestamp).await?;
257 }
258 SyncMessage::Conflict { key, data, replica_id, timestamp } => {
259 self.handle_conflict_message(key, data, replica_id, timestamp).await?;
261 }
262 SyncMessage::Heartbeat { replica_id, timestamp } => {
263 self.handle_heartbeat_message(replica_id, timestamp).await?;
265 }
266 }
267 }
268
269 Ok(())
270 }
271
272 async fn announce_presence(&self) -> Result<(), SyncEngineError> {
274 let message: SyncMessage<()> = SyncMessage::Presence {
275 replica_id: self.replica_id,
276 timestamp: chrono::Utc::now(),
277 };
278
279 let message_bytes = serde_json::to_vec(&message)?;
280 self.transport.send(&message_bytes).await
281 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
282
283 Ok(())
284 }
285
286 async fn send_heartbeat(&self) -> Result<(), SyncEngineError> {
288 let message: SyncMessage<()> = SyncMessage::Heartbeat {
289 replica_id: self.replica_id,
290 timestamp: chrono::Utc::now(),
291 };
292
293 let message_bytes = serde_json::to_vec(&message)?;
294 self.transport.send(&message_bytes).await
295 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
296
297 Ok(())
298 }
299
300 async fn start_background_sync(&self) {
302 let transport = self.transport.clone();
303 let replica_id = self.replica_id;
304
305 tokio::spawn(async move {
306 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
307
308 loop {
309 interval.tick().await;
310
311 let message: SyncMessage<()> = SyncMessage::Heartbeat {
313 replica_id,
314 timestamp: chrono::Utc::now(),
315 };
316
317 if let Ok(message_bytes) = serde_json::to_vec(&message) {
318 let _ = transport.send(&message_bytes).await;
319 }
320 }
321 });
322 }
323
324 async fn handle_sync_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
326 tracing::debug!("Received sync message for key {} from replica {}", key, replica_id);
327
328 let ack: SyncMessage<()> = SyncMessage::Ack {
330 key,
331 replica_id,
332 };
333
334 let ack_bytes = serde_json::to_vec(&ack)?;
335 self.transport.send(&ack_bytes).await
336 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
337
338 Ok(())
339 }
340
341 async fn handle_ack_message(&mut self, _key: String, _replica_id: ReplicaId) -> Result<(), SyncEngineError> {
343 tracing::debug!("Received ack message");
345 Ok(())
346 }
347
348 async fn handle_presence_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
350 let mut peers = self.peers.write().await;
351
352 let peer_info = PeerInfo {
353 replica_id: replica_id.clone(),
354 last_seen: timestamp,
355 is_online: true,
356 last_sync: None,
357 sync_status: PeerSyncStatus::Never,
358 id: replica_id,
360 status: PeerSyncStatus::Never,
361 version: 1,
362 };
363
364 peers.insert(replica_id, peer_info);
365
366 tracing::debug!("Updated peer info for replica {}", replica_id);
367 Ok(())
368 }
369
370 async fn handle_conflict_message(&mut self, _key: String, _data: Vec<u8>, _replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
372 tracing::debug!("Received conflict message");
374 Ok(())
375 }
376
377 async fn handle_heartbeat_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
379 let mut peers = self.peers.write().await;
380
381 if let Some(peer_info) = peers.get_mut(&replica_id) {
382 peer_info.last_seen = timestamp;
383 tracing::debug!("Updated heartbeat for replica {}", replica_id);
384 }
385
386 Ok(())
387 }
388
389 pub async fn peers(&self) -> impl Iterator<Item = (ReplicaId, PeerInfo)> + 'static {
391 let peers = self.peers.read().await;
392 peers.clone().into_iter()
393 }
394
395 fn has_conflict<V: Mergeable>(&self, _local: &V, _remote: &V) -> bool {
397 false
399 }
400}