1pub mod engine;
4pub mod conflict;
5pub mod realtime;
6
7use crate::{
8 crdt::{Mergeable, ReplicaId},
9 storage::{LocalStorage, StorageError},
10 transport::{SyncTransport, TransportError},
11};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use thiserror::Error;
15
16pub use engine::{SyncEngine, SyncEngineError, SyncState, PeerInfo, PeerSyncStatus, DefaultConflictResolver};
17
18#[derive(Error, Debug)]
19pub enum SyncError {
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22 #[error("Transport error: {0}")]
23 Transport(#[from] TransportError),
24 #[error("Serialization error: {0}")]
25 Serialization(#[from] serde_json::Error),
26 #[error("CRDT error: {0}")]
27 CrdtError(#[from] std::io::Error),
28 #[error("Sync operation failed: {0}")]
29 SyncFailed(String),
30 #[error("Encryption error: {0}")]
31 EncryptionError(String),
32 #[error("Authentication error: {0}")]
33 AuthenticationError(String),
34 #[error("GDPR error: {0}")]
35 GDPRError(String),
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum SyncMessage<T> {
41 Sync { key: String, data: T },
43 Ack { key: String },
45 Presence { replica_id: ReplicaId },
47}
48
49pub struct SyncManager<S, T>
51where
52 S: LocalStorage,
53 T: SyncTransport,
54{
55 replica_id: ReplicaId,
56 state: SyncState,
57 peers: HashMap<ReplicaId, PeerInfo>,
58 storage: S,
59 transport: T,
60}
61
62impl<S, T> SyncManager<S, T>
63where
64 S: LocalStorage,
65 T: SyncTransport,
66{
67 pub fn new(storage: S, transport: T) -> Self {
68 Self {
69 replica_id: ReplicaId::default(),
70 state: SyncState::NotSynced,
71 peers: HashMap::new(),
72 storage,
73 transport,
74 }
75 }
76
77 pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
78 Self {
79 replica_id,
80 state: SyncState::NotSynced,
81 peers: HashMap::new(),
82 storage,
83 transport,
84 }
85 }
86
87 pub fn state(&self) -> &SyncState {
88 &self.state
89 }
90
91 pub fn replica_id(&self) -> ReplicaId {
92 self.replica_id
93 }
94
95 pub fn is_online(&self) -> bool {
96 self.transport.is_connected()
97 }
98
99 pub fn peer_count(&self) -> usize {
100 self.peers.len()
101 }
102
103 pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
105 where
106 V: Mergeable + Serialize + Send + Sync + Clone,
107 V::Error: Into<SyncError>,
108 {
109 self.storage.set(key, value).await
111 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
112
113 if self.transport.is_connected() {
115 let message = SyncMessage::Sync {
116 key: key.to_string(),
117 data: value.clone(),
118 };
119 let serialized = serde_json::to_vec(&message)?;
120 self.transport.send(&serialized).await
121 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
122 }
123
124 Ok(())
125 }
126
127 pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
129 where
130 V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
131 V::Error: Into<SyncError>,
132 {
133 let mut updates = Vec::new();
134
135 let messages = self.transport.receive().await
137 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
138
139 for message_bytes in messages {
140 match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
141 Ok(SyncMessage::Sync { key, data }) => {
142 match self.storage.get::<V>(&key).await
144 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))? {
145 Some(mut existing) => {
146 existing.merge(&data).map_err(Into::into)?;
147 self.storage.set(&key, &existing).await
148 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
149 updates.push((key, existing));
150 }
151 None => {
152 self.storage.set(&key, &data).await
154 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
155 updates.push((key, data));
156 }
157 }
158 }
159 Ok(SyncMessage::Ack { key: _ }) => {
160 tracing::debug!("Received sync acknowledgment");
162 }
163 Ok(SyncMessage::Presence { replica_id }) => {
164 let peer_info = PeerInfo {
166 replica_id,
167 last_seen: chrono::Utc::now(),
168 is_online: true,
169 last_sync: None,
170 sync_status: PeerSyncStatus::Never,
171 };
172 self.peers.insert(replica_id, peer_info);
173 }
174 Err(e) => {
175 tracing::warn!("Failed to deserialize sync message: {}", e);
176 }
177 }
178 }
179
180 Ok(updates)
181 }
182
183 pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
185 if !self.transport.is_connected() {
186 return Ok(());
187 }
188
189 let message = SyncMessage::<()>::Presence {
190 replica_id: self.replica_id,
191 };
192 let serialized = serde_json::to_vec(&message)?;
193 self.transport.send(&serialized).await
194 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
195
196 Ok(())
197 }
198
199 pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
201 self.peers.iter()
202 }
203}