leptos_sync_core/sync/
mod.rs1pub mod engine;
4pub mod conflict;
5pub mod realtime;
6
7use crate::{
8 crdt::{Mergeable, ReplicaId},
9 storage::{LocalStorage, StorageError},
10 transport::{SyncTransport, TransportError},
11};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use thiserror::Error;
15
16pub use engine::{SyncEngine, SyncEngineError, SyncState, PeerInfo, PeerSyncStatus, DefaultConflictResolver};
17
18#[derive(Error, Debug)]
19pub enum SyncError {
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22 #[error("Transport error: {0}")]
23 Transport(#[from] TransportError),
24 #[error("Serialization error: {0}")]
25 Serialization(#[from] serde_json::Error),
26 #[error("CRDT error: {0}")]
27 CrdtError(#[from] std::io::Error),
28 #[error("Sync operation failed: {0}")]
29 SyncFailed(String),
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub enum SyncMessage<T> {
35 Sync { key: String, data: T },
37 Ack { key: String },
39 Presence { replica_id: ReplicaId },
41}
42
43pub struct SyncManager<S, T>
45where
46 S: LocalStorage,
47 T: SyncTransport,
48{
49 replica_id: ReplicaId,
50 state: SyncState,
51 peers: HashMap<ReplicaId, PeerInfo>,
52 storage: S,
53 transport: T,
54}
55
56impl<S, T> SyncManager<S, T>
57where
58 S: LocalStorage,
59 T: SyncTransport,
60{
61 pub fn new(storage: S, transport: T) -> Self {
62 Self {
63 replica_id: ReplicaId::default(),
64 state: SyncState::NotSynced,
65 peers: HashMap::new(),
66 storage,
67 transport,
68 }
69 }
70
71 pub fn with_replica_id(storage: S, transport: T, replica_id: ReplicaId) -> Self {
72 Self {
73 replica_id,
74 state: SyncState::NotSynced,
75 peers: HashMap::new(),
76 storage,
77 transport,
78 }
79 }
80
81 pub fn state(&self) -> &SyncState {
82 &self.state
83 }
84
85 pub fn replica_id(&self) -> ReplicaId {
86 self.replica_id
87 }
88
89 pub fn is_online(&self) -> bool {
90 self.transport.is_connected()
91 }
92
93 pub fn peer_count(&self) -> usize {
94 self.peers.len()
95 }
96
97 pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncError>
99 where
100 V: Mergeable + Serialize + Send + Sync + Clone,
101 V::Error: Into<SyncError>,
102 {
103 self.storage.set(key, value).await
105 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
106
107 if self.transport.is_connected() {
109 let message = SyncMessage::Sync {
110 key: key.to_string(),
111 data: value.clone(),
112 };
113 let serialized = serde_json::to_vec(&message)?;
114 self.transport.send(&serialized).await
115 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
116 }
117
118 Ok(())
119 }
120
121 pub async fn process_messages<V>(&mut self) -> Result<Vec<(String, V)>, SyncError>
123 where
124 V: Mergeable + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
125 V::Error: Into<SyncError>,
126 {
127 let mut updates = Vec::new();
128
129 let messages = self.transport.receive().await
131 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
132
133 for message_bytes in messages {
134 match serde_json::from_slice::<SyncMessage<V>>(&message_bytes) {
135 Ok(SyncMessage::Sync { key, data }) => {
136 match self.storage.get::<V>(&key).await
138 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))? {
139 Some(mut existing) => {
140 existing.merge(&data).map_err(Into::into)?;
141 self.storage.set(&key, &existing).await
142 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
143 updates.push((key, existing));
144 }
145 None => {
146 self.storage.set(&key, &data).await
148 .map_err(|e| SyncError::SyncFailed(format!("Storage error: {}", e)))?;
149 updates.push((key, data));
150 }
151 }
152 }
153 Ok(SyncMessage::Ack { key: _ }) => {
154 tracing::debug!("Received sync acknowledgment");
156 }
157 Ok(SyncMessage::Presence { replica_id }) => {
158 let peer_info = PeerInfo {
160 replica_id,
161 last_seen: chrono::Utc::now(),
162 is_online: true,
163 last_sync: None,
164 sync_status: PeerSyncStatus::Never,
165 };
166 self.peers.insert(replica_id, peer_info);
167 }
168 Err(e) => {
169 tracing::warn!("Failed to deserialize sync message: {}", e);
170 }
171 }
172 }
173
174 Ok(updates)
175 }
176
177 pub async fn announce_presence(&mut self) -> Result<(), SyncError> {
179 if !self.transport.is_connected() {
180 return Ok(());
181 }
182
183 let message = SyncMessage::<()>::Presence {
184 replica_id: self.replica_id,
185 };
186 let serialized = serde_json::to_vec(&message)?;
187 self.transport.send(&serialized).await
188 .map_err(|e| SyncError::SyncFailed(format!("Transport error: {}", e)))?;
189
190 Ok(())
191 }
192
193 pub fn peers(&self) -> impl Iterator<Item = (&ReplicaId, &PeerInfo)> {
195 self.peers.iter()
196 }
197}