leptos_sync_core/sync/
mod.rs

1//! Synchronization engine implementation
2
3pub mod engine;
4pub mod conflict;
5pub mod realtime;
6
7use crate::{
8    crdt::{Mergeable, ReplicaId},
9    storage::{LocalStorage, StorageError},
10    transport::{SyncTransport, TransportError},
11};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use thiserror::Error;
15
16pub use engine::{SyncEngine, SyncEngineError, SyncState, PeerInfo, PeerSyncStatus, DefaultConflictResolver};
17
18#[derive(Error, Debug)]
19pub enum SyncError {
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    #[error("Transport error: {0}")]
23    Transport(#[from] TransportError),
24    #[error("Serialization error: {0}")]
25    Serialization(#[from] serde_json::Error),
26    #[error("CRDT error: {0}")]
27    CrdtError(#[from] std::io::Error),
28    #[error("Sync operation failed: {0}")]
29    SyncFailed(String),
30}
31
32/// Legacy synchronization message types (for backward compatibility)
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub enum SyncMessage<T> {
35    /// Sync request with data
36    Sync { key: String, data: T },
37    /// Acknowledgment of sync
38    Ack { key: String },
39    /// Peer presence announcement
40    Presence { replica_id: ReplicaId },
41}
42
43/// Legacy synchronization manager (for backward compatibility)
44pub struct SyncManager<S, T> 
45where 
46    S: LocalStorage,
47    T: SyncTransport,
48{
49    replica_id: ReplicaId,
50    state: SyncState,
51    peers: HashMap<ReplicaId, PeerInfo>,
52    storage: S,
53    transport: T,
54}
55
56impl<S, T> SyncManager<S, T>
57where
58    S: LocalStorage,
59    T: SyncTransport,
60{
61    pub fn new(storage: S, transport: T) -> Self {
62        Self {
63            replica_id: ReplicaId::default(),
64            state: SyncState::NotSynced,
65            peers: HashMap::new(),
66            storage,
67            transport,
68        }
69    }
70
71    pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
72        Self {
73            replica_id,
74            state: SyncState::NotSynced,
75            peers: HashMap::new(),
76            storage,
77            transport,
78        }
79    }
80
81    pub fn state(&self) -> &SyncState {
82        &self.state
83    }
84
85    pub fn replica_id(&self) -> ReplicaId {
86        self.replica_id
87    }
88
89    pub fn is_online(&self) -> bool {
90        self.transport.is_connected()
91    }
92
93    pub fn peer_count(&self) -> usize {
94        self.peers.len()
95    }
96
97    /// Sync a CRDT value with peers
98    pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
99    where
100        V: Mergeable + Serialize + Send + Sync + Clone,
101        V::Error: Into<SyncError>,
102    {
103        // Store locally first
104        self.storage.set(key, value).await
105            .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
106
107        // Announce to peers if connected
108        if self.transport.is_connected() {
109            let message = SyncMessage::Sync {
110                key: key.to_string(),
111                data: value.clone(),
112            };
113            let serialized = serde_json::to_vec(&message)?;
114            self.transport.send(&serialized).await
115                .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
116        }
117
118        Ok(())
119    }
120
121    /// Process incoming sync messages
122    pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
123    where
124        V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
125        V::Error: Into<SyncError>,
126    {
127        let mut updates = Vec::new();
128
129        // Check for incoming messages
130        let messages = self.transport.receive().await
131            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
132        
133        for message_bytes in messages {
134            match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
135                Ok(SyncMessage::Sync { key, data }) => {
136                    // Try to merge with existing data
137                    match self.storage.get::<V>(&key).await
138                        .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))? {
139                        Some(mut existing) => {
140                            existing.merge(&data).map_err(Into::into)?;
141                            self.storage.set(&key, &existing).await
142                                .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
143                            updates.push((key, existing));
144                        }
145                        None => {
146                            // No existing data, store as-is
147                            self.storage.set(&key, &data).await
148                                .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
149                            updates.push((key, data));
150                        }
151                    }
152                }
153                Ok(SyncMessage::Ack { key: _ }) => {
154                    // Handle acknowledgment
155                    tracing::debug!("Received sync acknowledgment");
156                }
157                Ok(SyncMessage::Presence { replica_id }) => {
158                    // Update peer info
159                    let peer_info = PeerInfo {
160                        replica_id,
161                        last_seen: chrono::Utc::now(),
162                        is_online: true,
163                        last_sync: None,
164                        sync_status: PeerSyncStatus::Never,
165                    };
166                    self.peers.insert(replica_id, peer_info);
167                }
168                Err(e) => {
169                    tracing::warn!("Failed to deserialize sync message: {}", e);
170                }
171            }
172        }
173
174        Ok(updates)
175    }
176
177    /// Announce presence to peers
178    pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
179        if !self.transport.is_connected() {
180            return Ok(());
181        }
182
183        let message = SyncMessage::<()>::Presence {
184            replica_id: self.replica_id,
185        };
186        let serialized = serde_json::to_vec(&message)?;
187        self.transport.send(&serialized).await
188            .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
189
190        Ok(())
191    }
192
193    /// Get all peers
194    pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
195        self.peers.iter()
196    }
197}