1pub mod conflict;
4pub mod end_to_end;
5pub mod engine;
6pub mod realtime;
7
8use crate::{
9 crdt::{Mergeable, ReplicaId},
10 storage::{LocalStorage, StorageError},
11 transport::{SyncTransport, TransportError},
12};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use thiserror::Error;
16
17pub use end_to_end::{
18 CollectionMetadata, EndToEndSyncError, EndToEndSyncManager, SyncMessage as EndToEndSyncMessage,
19};
20pub use engine::{
21 DefaultConflictResolver, PeerInfo, PeerSyncStatus, SyncEngine, SyncEngineError, SyncState,
22};
23
24#[derive(Error, Debug)]
25pub enum SyncError {
26 #[error("Storage error: {0}")]
27 Storage(#[from] StorageError),
28 #[error("Transport error: {0}")]
29 Transport(#[from] TransportError),
30 #[error("Serialization error: {0}")]
31 Serialization(#[from] serde_json::Error),
32 #[error("CRDT error: {0}")]
33 CrdtError(#[from] std::io::Error),
34 #[error("Sync operation failed: {0}")]
35 SyncFailed(String),
36 #[error("Encryption error: {0}")]
37 EncryptionError(String),
38 #[error("Authentication error: {0}")]
39 AuthenticationError(String),
40 #[error("GDPR error: {0}")]
41 GDPRError(String),
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub enum SyncMessage<T> {
47 Sync { key: String, data: T },
49 Ack { key: String },
51 Presence { replica_id: ReplicaId },
53}
54
55pub struct SyncManager<S, T>
57where
58 S: LocalStorage,
59 T: SyncTransport,
60{
61 replica_id: ReplicaId,
62 state: SyncState,
63 peers: HashMap<ReplicaId, PeerInfo>,
64 storage: S,
65 transport: T,
66}
67
68impl<S, T> SyncManager<S, T>
69where
70 S: LocalStorage,
71 T: SyncTransport,
72{
73 pub fn new(storage: S, transport: T) -> Self {
74 Self {
75 replica_id: ReplicaId::default(),
76 state: SyncState::NotSynced,
77 peers: HashMap::new(),
78 storage,
79 transport,
80 }
81 }
82
83 pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
84 Self {
85 replica_id,
86 state: SyncState::NotSynced,
87 peers: HashMap::new(),
88 storage,
89 transport,
90 }
91 }
92
93 pub fn state(&self) -> &SyncState {
94 &self.state
95 }
96
97 pub fn replica_id(&self) -> ReplicaId {
98 self.replica_id
99 }
100
101 pub fn is_online(&self) -> bool {
102 self.transport.is_connected()
103 }
104
105 pub fn peer_count(&self) -> usize {
106 self.peers.len()
107 }
108
109 pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
111 where
112 V: Mergeable + Serialize + Send + Sync + Clone,
113 V::Error: Into<SyncError>,
114 {
115 self.storage
117 .set(key, value)
118 .await
119 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
120
121 if self.transport.is_connected() {
123 let message = SyncMessage::Sync {
124 key: key.to_string(),
125 data: value.clone(),
126 };
127 let serialized = serde_json::to_vec(&message)?;
128 self.transport
129 .send(&serialized)
130 .await
131 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
132 }
133
134 Ok(())
135 }
136
137 pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
139 where
140 V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
141 V::Error: Into<SyncError>,
142 {
143 let mut updates = Vec::new();
144
145 let messages = self
147 .transport
148 .receive()
149 .await
150 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
151
152 for message_bytes in messages {
153 match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
154 Ok(SyncMessage::Sync { key, data }) => {
155 match self
157 .storage
158 .get::<V>(&key)
159 .await
160 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?
161 {
162 Some(mut existing) => {
163 existing.merge(&data).map_err(Into::into)?;
164 self.storage.set(&key, &existing).await.map_err(|e| {
165 SyncError::SyncFailed(format!("Storage error: {}", e))
166 })?;
167 updates.push((key, existing));
168 }
169 None => {
170 self.storage.set(&key, &data).await.map_err(|e| {
172 SyncError::SyncFailed(format!("Storage error: {}", e))
173 })?;
174 updates.push((key, data));
175 }
176 }
177 }
178 Ok(SyncMessage::Ack { key: _ }) => {
179 tracing::debug!("Received sync acknowledgment");
181 }
182 Ok(SyncMessage::Presence { replica_id }) => {
183 let peer_info = PeerInfo {
185 replica_id: replica_id.clone(),
186 last_seen: chrono::Utc::now(),
187 is_online: true,
188 last_sync: None,
189 sync_status: PeerSyncStatus::Never,
190 id: replica_id,
192 status: PeerSyncStatus::Never,
193 version: 1,
194 };
195 self.peers.insert(replica_id, peer_info);
196 }
197 Err(e) => {
198 tracing::warn!("Failed to deserialize sync message: {}", e);
199 }
200 }
201 }
202
203 Ok(updates)
204 }
205
206 pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
208 if !self.transport.is_connected() {
209 return Ok(());
210 }
211
212 let message = SyncMessage::<()>::Presence {
213 replica_id: self.replica_id,
214 };
215 let serialized = serde_json::to_vec(&message)?;
216 self.transport
217 .send(&serialized)
218 .await
219 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
220
221 Ok(())
222 }
223
224 pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
226 self.peers.iter()
227 }
228}