1use crate::{
4 crdt::{Mergeable, ReplicaId},
5 storage::Storage,
6 transport::{SyncTransport, TransportError},
7};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use thiserror::Error;
13
14#[derive(Error, Debug)]
15pub enum SyncEngineError {
16 #[error("Storage error: {0}")]
17 Storage(#[from] crate::storage::StorageError),
18 #[error("Transport error: {0}")]
19 Transport(#[from] TransportError),
20 #[error("Serialization error: {0}")]
21 Serialization(#[from] serde_json::Error),
22 #[error("CRDT error: {0}")]
23 CrdtError(#[from] Box<dyn std::error::Error + Send + Sync>),
24 #[error("Sync operation failed: {0}")]
25 SyncFailed(String),
26 #[error("Conflict resolution failed: {0}")]
27 ConflictResolution(String),
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub enum SyncState {
33 NotSynced,
35 Syncing,
37 Synced,
39 Failed(String),
41 ResolvingConflicts,
43 Offline,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub enum SyncMessage<T> {
50 Sync { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
52 Ack { key: String, replica_id: ReplicaId },
54 Presence { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
56 Conflict { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
58 Heartbeat { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
60}
61
62pub struct SyncEngine<Tr>
64where
65 Tr: SyncTransport + Clone,
66{
67 replica_id: ReplicaId,
68 state: Arc<RwLock<SyncState>>,
69 peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
70 storage: Storage,
71 transport: Tr,
72 sync_queue: Arc<RwLock<Vec<SyncMessage<Vec<u8>>>>>,
73 conflict_resolver: Arc<RwLock<Option<DefaultConflictResolver>>>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct PeerInfo {
79 pub replica_id: ReplicaId,
80 pub last_seen: chrono::DateTime<chrono::Utc>,
81 pub is_online: bool,
82 pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
83 pub sync_status: PeerSyncStatus,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum PeerSyncStatus {
89 Never,
91 Success { timestamp: chrono::DateTime<chrono::Utc> },
93 Failed { timestamp: chrono::DateTime<chrono::Utc>, error: String },
95 Syncing { started: chrono::DateTime<chrono::Utc> },
97}
98
99pub struct DefaultConflictResolver;
101
102impl DefaultConflictResolver {
103 pub fn resolve<T: Mergeable>(&self, local: &T, remote: &T) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
105 let mut result = local.clone();
107 result.merge(remote).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
108 Ok(result)
109 }
110}
111
112impl<Tr> SyncEngine<Tr>
113where
114 Tr: SyncTransport + Clone + 'static,
115{
116 pub fn new(storage: Storage, transport: Tr) -> Self {
117 Self {
118 replica_id: ReplicaId::default(),
119 state: Arc::new(RwLock::new(SyncState::NotSynced)),
120 peers: Arc::new(RwLock::new(HashMap::new())),
121 storage,
122 transport,
123 sync_queue: Arc::new(RwLock::new(Vec::new())),
124 conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
125 }
126 }
127
128 pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
129 Self {
130 replica_id,
131 state: Arc::new(RwLock::new(SyncState::NotSynced)),
132 peers: Arc::new(RwLock::new(HashMap::new())),
133 storage,
134 transport,
135 sync_queue: Arc::new(RwLock::new(Vec::new())),
136 conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
137 }
138 }
139
140 pub async fn state(&self) -> SyncState {
141 self.state.read().await.clone()
142 }
143
144 pub fn replica_id(&self) -> ReplicaId {
145 self.replica_id
146 }
147
148 pub async fn is_online(&self) -> bool {
149 self.transport.is_connected()
150 }
151
152 pub async fn peer_count(&self) -> usize {
153 self.peers.read().await.len()
154 }
155
156 pub async fn start_sync(&mut self) -> Result<(), SyncEngineError> {
158 let mut state = self.state.write().await;
159 *state = SyncState::Syncing;
160
161 if !self.transport.is_connected() {
163 tracing::info!("Transport not connected, attempting to connect...");
165 }
166
167 self.announce_presence().await?;
169
170 self.start_background_sync().await;
172
173 Ok(())
174 }
175
176 pub async fn stop_sync(&mut self) -> Result<(), SyncEngineError> {
178 let mut state = self.state.write().await;
179 *state = SyncState::NotSynced;
180
181 if self.transport.is_connected() {
183 tracing::info!("Stopping sync, disconnecting from transport...");
184 }
185
186 Ok(())
187 }
188
189 pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncEngineError>
191 where
192 V: Mergeable + Serialize + Send + Sync + Clone,
193 {
194 let data = serde_json::to_vec(value)?;
196
197 let message = SyncMessage::Sync {
199 key: key.to_string(),
200 data,
201 replica_id: self.replica_id,
202 timestamp: chrono::Utc::now(),
203 };
204
205 {
207 let mut queue = self.sync_queue.write().await;
208 queue.push(SyncMessage::Sync {
209 key: key.to_string(),
210 data: serde_json::to_vec(value)?,
211 replica_id: self.replica_id,
212 timestamp: chrono::Utc::now(),
213 });
214 }
215
216 let message_bytes = serde_json::to_vec(&message)?;
218 self.transport.send(&message_bytes).await
219 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
220
221 Ok(())
222 }
223
224 pub async fn process_messages(&mut self) -> Result<(), SyncEngineError> {
226 let messages = self.transport.receive().await
228 .map_err(|e| SyncEngineError::Transport(TransportError::ReceiveFailed(e.to_string())))?;
229
230 for message_bytes in messages {
231 let message: SyncMessage<Vec<u8>> = serde_json::from_slice(&message_bytes)?;
232
233 match message {
234 SyncMessage::Sync { key, data, replica_id, timestamp } => {
235 self.handle_sync_message(key, data, replica_id, timestamp).await?;
237 }
238 SyncMessage::Ack { key, replica_id } => {
239 self.handle_ack_message(key, replica_id).await?;
241 }
242 SyncMessage::Presence { replica_id, timestamp } => {
243 self.handle_presence_message(replica_id, timestamp).await?;
245 }
246 SyncMessage::Conflict { key, data, replica_id, timestamp } => {
247 self.handle_conflict_message(key, data, replica_id, timestamp).await?;
249 }
250 SyncMessage::Heartbeat { replica_id, timestamp } => {
251 self.handle_heartbeat_message(replica_id, timestamp).await?;
253 }
254 }
255 }
256
257 Ok(())
258 }
259
260 async fn announce_presence(&self) -> Result<(), SyncEngineError> {
262 let message: SyncMessage<()> = SyncMessage::Presence {
263 replica_id: self.replica_id,
264 timestamp: chrono::Utc::now(),
265 };
266
267 let message_bytes = serde_json::to_vec(&message)?;
268 self.transport.send(&message_bytes).await
269 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
270
271 Ok(())
272 }
273
274 async fn send_heartbeat(&self) -> Result<(), SyncEngineError> {
276 let message: SyncMessage<()> = SyncMessage::Heartbeat {
277 replica_id: self.replica_id,
278 timestamp: chrono::Utc::now(),
279 };
280
281 let message_bytes = serde_json::to_vec(&message)?;
282 self.transport.send(&message_bytes).await
283 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
284
285 Ok(())
286 }
287
288 async fn start_background_sync(&self) {
290 let transport = self.transport.clone();
291 let replica_id = self.replica_id;
292
293 tokio::spawn(async move {
294 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
295
296 loop {
297 interval.tick().await;
298
299 let message: SyncMessage<()> = SyncMessage::Heartbeat {
301 replica_id,
302 timestamp: chrono::Utc::now(),
303 };
304
305 if let Ok(message_bytes) = serde_json::to_vec(&message) {
306 let _ = transport.send(&message_bytes).await;
307 }
308 }
309 });
310 }
311
312 async fn handle_sync_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
314 tracing::debug!("Received sync message for key {} from replica {}", key, replica_id);
315
316 let ack: SyncMessage<()> = SyncMessage::Ack {
318 key,
319 replica_id,
320 };
321
322 let ack_bytes = serde_json::to_vec(&ack)?;
323 self.transport.send(&ack_bytes).await
324 .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
325
326 Ok(())
327 }
328
329 async fn handle_ack_message(&mut self, _key: String, _replica_id: ReplicaId) -> Result<(), SyncEngineError> {
331 tracing::debug!("Received ack message");
333 Ok(())
334 }
335
336 async fn handle_presence_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
338 let mut peers = self.peers.write().await;
339
340 let peer_info = PeerInfo {
341 replica_id,
342 last_seen: timestamp,
343 is_online: true,
344 last_sync: None,
345 sync_status: PeerSyncStatus::Never,
346 };
347
348 peers.insert(replica_id, peer_info);
349
350 tracing::debug!("Updated peer info for replica {}", replica_id);
351 Ok(())
352 }
353
354 async fn handle_conflict_message(&mut self, _key: String, _data: Vec<u8>, _replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
356 tracing::debug!("Received conflict message");
358 Ok(())
359 }
360
361 async fn handle_heartbeat_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
363 let mut peers = self.peers.write().await;
364
365 if let Some(peer_info) = peers.get_mut(&replica_id) {
366 peer_info.last_seen = timestamp;
367 tracing::debug!("Updated heartbeat for replica {}", replica_id);
368 }
369
370 Ok(())
371 }
372
373 pub async fn peers(&self) -> impl Iterator<Item = (ReplicaId, PeerInfo)> {
375 let peers = self.peers.read().await;
376 peers.clone().into_iter()
377 }
378
379 fn has_conflict<V: Mergeable>(&self, _local: &V, _remote: &V) -> bool {
381 false
383 }
384}