leptos_sync_core/sync/
mod.rs

1//! Synchronization engine implementation
2
3pub mod engine;
4pub mod conflict;
5pub mod realtime;
6
7use crate::{
8    crdt::{Mergeable, ReplicaId},
9    storage::{LocalStorage, StorageError},
10    transport::{SyncTransport, TransportError},
11};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use thiserror::Error;
15
16pub use engine::{SyncEngine, SyncEngineError, SyncState, PeerInfo, PeerSyncStatus, DefaultConflictResolver};
17
18#[derive(Error, Debug)]
19pub enum SyncError {
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    #[error("Transport error: {0}")]
23    Transport(#[from] TransportError),
24    #[error("Serialization error: {0}")]
25    Serialization(#[from] serde_json::Error),
26    #[error("CRDT error: {0}")]
27    CrdtError(#[from] std::io::Error),
28    #[error("Sync operation failed: {0}")]
29    SyncFailed(String),
30    #[error("Encryption error: {0}")]
31    EncryptionError(String),
32    #[error("Authentication error: {0}")]
33    AuthenticationError(String),
34    #[error("GDPR error: {0}")]
35    GDPRError(String),
36}
37
38/// Legacy synchronization message types (for backward compatibility)
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum SyncMessage<T> {
41    /// Sync request with data
42    Sync { key: String, data: T },
43    /// Acknowledgment of sync
44    Ack { key: String },
45    /// Peer presence announcement
46    Presence { replica_id: ReplicaId },
47}
48
49/// Legacy synchronization manager (for backward compatibility)
50pub struct SyncManager<S, T> 
51where 
52    S: LocalStorage,
53    T: SyncTransport,
54{
55    replica_id: ReplicaId,
56    state: SyncState,
57    peers: HashMap<ReplicaId, PeerInfo>,
58    storage: S,
59    transport: T,
60}
61
62impl<S, T> SyncManager<S, T>
63where
64    S: LocalStorage,
65    T: SyncTransport,
66{
67    pub fn new(storage: S, transport: T) -> Self {
68        Self {
69            replica_id: ReplicaId::default(),
70            state: SyncState::NotSynced,
71            peers: HashMap::new(),
72            storage,
73            transport,
74        }
75    }
76
77    pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
78        Self {
79            replica_id,
80            state: SyncState::NotSynced,
81            peers: HashMap::new(),
82            storage,
83            transport,
84        }
85    }
86
87    pub fn state(&self) -> &SyncState {
88        &self.state
89    }
90
91    pub fn replica_id(&self) -> ReplicaId {
92        self.replica_id
93    }
94
95    pub fn is_online(&self) -> bool {
96        self.transport.is_connected()
97    }
98
99    pub fn peer_count(&self) -> usize {
100        self.peers.len()
101    }
102
103    /// Sync a CRDT value with peers
104    pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
105    where
106        V: Mergeable + Serialize + Send + Sync + Clone,
107        V::Error: Into<SyncError>,
108    {
109        // Store locally first
110        self.storage.set(key, value).await
111            .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
112
113        // Announce to peers if connected
114        if self.transport.is_connected() {
115            let message = SyncMessage::Sync {
116                key: key.to_string(),
117                data: value.clone(),
118            };
119            let serialized = serde_json::to_vec(&message)?;
120            self.transport.send(&serialized).await
121                .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
122        }
123
124        Ok(())
125    }
126
127    /// Process incoming sync messages
128    pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
129    where
130        V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
131        V::Error: Into<SyncError>,
132    {
133        let mut updates = Vec::new();
134
135        // Check for incoming messages
136        let messages = self.transport.receive().await
137            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
138        
139        for message_bytes in messages {
140            match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
141                Ok(SyncMessage::Sync { key, data }) => {
142                    // Try to merge with existing data
143                    match self.storage.get::<V>(&key).await
144                        .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))? {
145                        Some(mut existing) => {
146                            existing.merge(&data).map_err(Into::into)?;
147                            self.storage.set(&key, &existing).await
148                                .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
149                            updates.push((key, existing));
150                        }
151                        None => {
152                            // No existing data, store as-is
153                            self.storage.set(&key, &data).await
154                                .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
155                            updates.push((key, data));
156                        }
157                    }
158                }
159                Ok(SyncMessage::Ack { key: _ }) => {
160                    // Handle acknowledgment
161                    tracing::debug!("Received sync acknowledgment");
162                }
163                Ok(SyncMessage::Presence { replica_id }) => {
164                    // Update peer info
165                    let peer_info = PeerInfo {
166                        replica_id,
167                        last_seen: chrono::Utc::now(),
168                        is_online: true,
169                        last_sync: None,
170                        sync_status: PeerSyncStatus::Never,
171                    };
172                    self.peers.insert(replica_id, peer_info);
173                }
174                Err(e) => {
175                    tracing::warn!("Failed to deserialize sync message: {}", e);
176                }
177            }
178        }
179
180        Ok(updates)
181    }
182
183    /// Announce presence to peers
184    pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
185        if !self.transport.is_connected() {
186            return Ok(());
187        }
188
189        let message = SyncMessage::<()>::Presence {
190            replica_id: self.replica_id,
191        };
192        let serialized = serde_json::to_vec(&message)?;
193        self.transport.send(&serialized).await
194            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
195
196        Ok(())
197    }
198
199    /// Get all peers
200    pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
201        self.peers.iter()
202    }
203}