leptos_sync_core/sync/
mod.rs

1//! Synchronization engine implementation
2
3pub mod conflict;
4pub mod end_to_end;
5pub mod engine;
6pub mod realtime;
7
8use crate::{
9    crdt::{Mergeable, ReplicaId},
10    storage::{LocalStorage, StorageError},
11    transport::{SyncTransport, TransportError},
12};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use thiserror::Error;
16
17pub use end_to_end::{
18    CollectionMetadata, EndToEndSyncError, EndToEndSyncManager, SyncMessage as EndToEndSyncMessage,
19};
20pub use engine::{
21    DefaultConflictResolver, PeerInfo, PeerSyncStatus, SyncEngine, SyncEngineError, SyncState,
22};
23
24#[derive(Error, Debug)]
25pub enum SyncError {
26    #[error("Storage error: {0}")]
27    Storage(#[from] StorageError),
28    #[error("Transport error: {0}")]
29    Transport(#[from] TransportError),
30    #[error("Serialization error: {0}")]
31    Serialization(#[from] serde_json::Error),
32    #[error("CRDT error: {0}")]
33    CrdtError(#[from] std::io::Error),
34    #[error("Sync operation failed: {0}")]
35    SyncFailed(String),
36    #[error("Encryption error: {0}")]
37    EncryptionError(String),
38    #[error("Authentication error: {0}")]
39    AuthenticationError(String),
40    #[error("GDPR error: {0}")]
41    GDPRError(String),
42}
43
44/// Legacy synchronization message types (for backward compatibility)
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub enum SyncMessage<T> {
47    /// Sync request with data
48    Sync { key: String, data: T },
49    /// Acknowledgment of sync
50    Ack { key: String },
51    /// Peer presence announcement
52    Presence { replica_id: ReplicaId },
53}
54
55/// Legacy synchronization manager (for backward compatibility)
56pub struct SyncManager<S, T>
57where
58    S: LocalStorage,
59    T: SyncTransport,
60{
61    replica_id: ReplicaId,
62    state: SyncState,
63    peers: HashMap<ReplicaId, PeerInfo>,
64    storage: S,
65    transport: T,
66}
67
68impl<S, T> SyncManager<S, T>
69where
70    S: LocalStorage,
71    T: SyncTransport,
72{
73    pub fn new(storage: S, transport: T) -> Self {
74        Self {
75            replica_id: ReplicaId::default(),
76            state: SyncState::NotSynced,
77            peers: HashMap::new(),
78            storage,
79            transport,
80        }
81    }
82
83    pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
84        Self {
85            replica_id,
86            state: SyncState::NotSynced,
87            peers: HashMap::new(),
88            storage,
89            transport,
90        }
91    }
92
93    pub fn state(&self) -> &SyncState {
94        &self.state
95    }
96
97    pub fn replica_id(&self) -> ReplicaId {
98        self.replica_id
99    }
100
101    pub fn is_online(&self) -> bool {
102        self.transport.is_connected()
103    }
104
105    pub fn peer_count(&self) -> usize {
106        self.peers.len()
107    }
108
109    /// Sync a CRDT value with peers
110    pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
111    where
112        V: Mergeable + Serialize + Send + Sync + Clone,
113        V::Error: Into<SyncError>,
114    {
115        // Store locally first
116        self.storage
117            .set(key, value)
118            .await
119            .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
120
121        // Announce to peers if connected
122        if self.transport.is_connected() {
123            let message = SyncMessage::Sync {
124                key: key.to_string(),
125                data: value.clone(),
126            };
127            let serialized = serde_json::to_vec(&message)?;
128            self.transport
129                .send(&serialized)
130                .await
131                .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
132        }
133
134        Ok(())
135    }
136
137    /// Process incoming sync messages
138    pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
139    where
140        V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
141        V::Error: Into<SyncError>,
142    {
143        let mut updates = Vec::new();
144
145        // Check for incoming messages
146        let messages = self
147            .transport
148            .receive()
149            .await
150            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
151
152        for message_bytes in messages {
153            match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
154                Ok(SyncMessage::Sync { key, data }) => {
155                    // Try to merge with existing data
156                    match self
157                        .storage
158                        .get::<V>(&key)
159                        .await
160                        .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?
161                    {
162                        Some(mut existing) => {
163                            existing.merge(&data).map_err(Into::into)?;
164                            self.storage.set(&key, &existing).await.map_err(|e| {
165                                SyncError::SyncFailed(format!("Storage error: {}", e))
166                            })?;
167                            updates.push((key, existing));
168                        }
169                        None => {
170                            // No existing data, store as-is
171                            self.storage.set(&key, &data).await.map_err(|e| {
172                                SyncError::SyncFailed(format!("Storage error: {}", e))
173                            })?;
174                            updates.push((key, data));
175                        }
176                    }
177                }
178                Ok(SyncMessage::Ack { key: _ }) => {
179                    // Handle acknowledgment
180                    tracing::debug!("Received sync acknowledgment");
181                }
182                Ok(SyncMessage::Presence { replica_id }) => {
183                    // Update peer info
184                    let peer_info = PeerInfo {
185                        replica_id: replica_id.clone(),
186                        last_seen: chrono::Utc::now(),
187                        is_online: true,
188                        last_sync: None,
189                        sync_status: PeerSyncStatus::Never,
190                        // Additional fields for compatibility
191                        id: replica_id,
192                        status: PeerSyncStatus::Never,
193                        version: 1,
194                    };
195                    self.peers.insert(replica_id, peer_info);
196                }
197                Err(e) => {
198                    tracing::warn!("Failed to deserialize sync message: {}", e);
199                }
200            }
201        }
202
203        Ok(updates)
204    }
205
206    /// Announce presence to peers
207    pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
208        if !self.transport.is_connected() {
209            return Ok(());
210        }
211
212        let message = SyncMessage::<()>::Presence {
213            replica_id: self.replica_id,
214        };
215        let serialized = serde_json::to_vec(&message)?;
216        self.transport
217            .send(&serialized)
218            .await
219            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
220
221        Ok(())
222    }
223
224    /// Get all peers
225    pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
226        self.peers.iter()
227    }
228}