1use crate::retry::QueryError;
7use crate::types::QueryKey;
8use serde::{Deserialize, Serialize};
9use serde::de::DeserializeOwned;
10use std::collections::HashMap;
11use std::time::Duration;
12
13#[cfg(feature = "sync")]
14use leptos_sync_core::{
15 LocalFirstCollection,
16 LwwRegister,
17 storage::Storage,
18 transport::HybridTransport
19};
20
/// Connectivity state tracked by the sync layer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NetworkStatus {
    /// The network is considered reachable.
    Online,
    /// The network is considered unreachable.
    Offline,
}
27
/// Names the policy a caller requests when conflicting values must be reconciled.
///
/// The variants carry no data; they only select a policy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictResolutionStrategy {
    /// Last-writer-wins policy.
    LastWriterWins,
    /// Merge policy.
    Merge,
    /// Custom policy.
    Custom,
}
35
/// Summary of one synchronization pass.
#[derive(Clone, Debug)]
pub struct SyncResult {
    /// Number of operations counted as synced during the pass.
    pub synced_operations: usize,
    /// Number of conflicts counted as resolved during the pass.
    pub conflicts_resolved: usize,
    /// Wall-clock time the pass took.
    pub duration: Duration,
}
43
44pub type OperationId = uuid::Uuid;
46
/// In-memory sync manager compiled when the `sync` feature is enabled.
///
/// Derives `Debug` and `Clone` so it offers at least the same trait surface as
/// the placeholder `SyncManager` used when the feature is disabled (which is
/// `Clone`); code that clones a manager therefore builds in both configurations.
#[cfg(feature = "sync")]
#[derive(Debug, Clone)]
pub struct SyncManager {
    /// Stored values as JSON, keyed by the string form of their query key.
    data: HashMap<String, serde_json::Value>,
    /// Last network status recorded for this manager.
    network_status: NetworkStatus,
    /// Operations buffered while the manager is offline, in arrival order.
    queued_operations: Vec<QueuedOperation>,
}
57
/// A single buffered operation waiting to be applied to the store.
#[cfg(feature = "sync")]
#[derive(Clone, Debug)]
struct QueuedOperation {
    /// Unique identifier handed back to the caller that queued the operation.
    id: OperationId,
    /// Query key the operation targets.
    key: QueryKey,
    /// JSON payload captured when the operation was queued.
    data: serde_json::Value,
    /// What should happen with `data` once the operation is processed.
    operation_type: OperationType,
}
66
/// Kind of change a queued operation represents.
#[cfg(feature = "sync")]
#[derive(Clone, Debug)]
enum OperationType {
    /// Store a value.
    Store,
    /// Update a value.
    Update,
    /// Delete a value.
    Delete,
}
74
75#[cfg(feature = "sync")]
76impl SyncManager {
77 pub async fn new() -> Result<Self, QueryError> {
79 Ok(Self {
80 data: HashMap::new(),
81 network_status: NetworkStatus::Online,
82 queued_operations: Vec::new(),
83 })
84 }
85
86 pub async fn store_with_crdt<T>(&mut self, key: &QueryKey, data: T) -> Result<(), QueryError>
88 where
89 T: Serialize + Clone,
90 {
91 let key_str = key.to_string();
92 let json_data = serde_json::to_value(data)
93 .map_err(|e| QueryError::SerializationError(e.to_string()))?;
94
95 if let Some(existing_data) = self.data.get(&key_str) {
97 if let (Some(new_version), Some(existing_version)) = (
98 json_data.get("version").and_then(|v| v.as_u64()),
99 existing_data.get("version").and_then(|v| v.as_u64())
100 ) {
101 if new_version <= existing_version {
103 return Ok(()); }
105 }
106 }
107
108 self.data.insert(key_str, json_data);
110 Ok(())
111 }
112
113 pub async fn get_with_crdt<T>(&self, key: &QueryKey) -> Result<Option<T>, QueryError>
115 where
116 T: DeserializeOwned,
117 {
118 let key_str = key.to_string();
119
120 if let Some(json_data) = self.data.get(&key_str) {
121 let deserialized: T = serde_json::from_value(json_data.clone())
122 .map_err(|e| QueryError::DeserializationError(e.to_string()))?;
123 return Ok(Some(deserialized));
124 }
125
126 Ok(None)
127 }
128
129 pub async fn resolve_conflicts(
131 &mut self,
132 key: &QueryKey,
133 strategy: ConflictResolutionStrategy,
134 ) -> Result<(), QueryError> {
135 let key_str = key.to_string();
136
137 match strategy {
138 ConflictResolutionStrategy::LastWriterWins => {
139 Ok(())
142 }
143 ConflictResolutionStrategy::Merge => {
144 Ok(())
147 }
148 ConflictResolutionStrategy::Custom => {
149 Ok(())
151 }
152 }
153 }
154
155 pub fn set_network_status(&mut self, status: NetworkStatus) {
157 self.network_status = status;
158 }
159
160 pub async fn queue_operation<T>(&mut self, key: &QueryKey, data: T) -> Result<Option<OperationId>, QueryError>
162 where
163 T: Serialize + Clone,
164 {
165 if self.network_status == NetworkStatus::Offline {
166 let operation_id = uuid::Uuid::new_v4();
167 let json_data = serde_json::to_value(data)
168 .map_err(|e| QueryError::SerializationError(e.to_string()))?;
169
170 let operation = QueuedOperation {
171 id: operation_id,
172 key: key.clone(),
173 data: json_data,
174 operation_type: OperationType::Store,
175 };
176
177 self.queued_operations.push(operation);
178 return Ok(Some(operation_id));
179 }
180
181 Ok(None)
182 }
183
184 pub fn has_pending_operations(&self) -> bool {
186 !self.queued_operations.is_empty()
187 }
188
189 pub fn pending_operation_count(&self) -> usize {
191 self.queued_operations.len()
192 }
193
194 pub async fn process_queued_operations(&mut self) -> Result<(), QueryError> {
196 let operations = std::mem::take(&mut self.queued_operations);
197
198 for operation in operations {
199 match operation.operation_type {
200 OperationType::Store => {
201 self.store_with_crdt(&operation.key, operation.data).await?;
202 }
203 OperationType::Update => {
204 self.store_with_crdt(&operation.key, operation.data).await?;
205 }
206 OperationType::Delete => {
207 }
209 }
210 }
211
212 Ok(())
213 }
214
215 pub async fn merge_with(&mut self, other: &mut SyncManager) -> Result<(), QueryError> {
217 for (key, value) in other.data.iter() {
219 self.data.insert(key.clone(), value.clone());
220 }
221
222 self.queued_operations.extend(other.queued_operations.clone());
224
225 Ok(())
226 }
227
228 pub async fn detect_conflicts(&self, key: &QueryKey) -> Result<Vec<Conflict>, QueryError> {
230 let key_str = key.to_string();
231 let mut conflicts = Vec::new();
232
233 if self.data.contains_key(&key_str) {
235 conflicts.push(Conflict {
236 key: key.clone(),
237 conflict_type: ConflictType::ConcurrentUpdate,
238 resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
239 });
240 }
241
242 Ok(conflicts)
243 }
244
245 pub async fn auto_sync(&mut self) -> Result<SyncResult, QueryError> {
247 let start_time = std::time::Instant::now();
248 let mut synced_operations = 0;
249 let mut conflicts_resolved = 0;
250
251 if !self.queued_operations.is_empty() {
253 let operation_count = self.queued_operations.len();
254 self.process_queued_operations().await?;
255 synced_operations = operation_count;
256 }
257
258 if !self.data.is_empty() {
260 synced_operations += self.data.len();
261 }
262
263 let duration = start_time.elapsed();
264
265 Ok(SyncResult {
266 synced_operations,
267 conflicts_resolved,
268 duration,
269 })
270 }
271}
272
273#[derive(Debug, Clone)]
275pub struct Conflict {
276 pub key: QueryKey,
277 pub conflict_type: ConflictType,
278 pub resolution_strategy: ConflictResolutionStrategy,
279}
280
/// Category of a reported conflict.
///
/// Derives `Copy`, `PartialEq` and `Eq` like the other field-less enums in this
/// module, so callers can compare and pass conflict kinds by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// Concurrent-update conflict.
    ConcurrentUpdate,
    /// Data-mismatch conflict.
    DataMismatch,
    /// Version conflict.
    VersionConflict,
}
288
/// Placeholder `SyncManager` compiled when the `sync` feature is disabled.
///
/// Derives `Debug` in addition to `Clone`, matching the other public types in
/// this module, so it can appear in debug output and in `#[derive(Debug)]`
/// containers.
#[cfg(not(feature = "sync"))]
#[derive(Debug, Clone)]
pub struct SyncManager {
    /// Carries no data; being private, it prevents construction by struct
    /// literal outside this module.
    _placeholder: (),
}
296
297#[cfg(not(feature = "sync"))]
298impl SyncManager {
299 pub async fn new() -> Result<Self, QueryError> {
300 Err(QueryError::GenericError("Sync feature not enabled".to_string()))
301 }
302}
303
/// CRDT namespace for builds with the `sync` feature enabled.
///
/// Currently declares no items.
#[cfg(feature = "sync")]
pub mod crdt {}
313
/// CRDT namespace for builds without the `sync` feature.
///
/// Currently declares no items.
#[cfg(not(feature = "sync"))]
pub mod crdt {}