leptos_query_rs/sync/
mod.rs

1//! Synchronization module for leptos-sync-core integration
2//! 
3//! This module provides CRDT-based offline support and conflict resolution
4//! using the leptos-sync-core crate when the "sync" feature is enabled.
5
6use crate::retry::QueryError;
7use crate::types::QueryKey;
8use serde::{Deserialize, Serialize};
9use serde::de::DeserializeOwned;
10use std::collections::HashMap;
11use std::time::Duration;
12
13#[cfg(feature = "sync")]
14use leptos_sync_core::{
15    LocalFirstCollection, 
16    LwwRegister,
17    storage::Storage,
18    transport::HybridTransport
19};
20
21/// Network status for offline/online detection
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum NetworkStatus {
24    Online,
25    Offline,
26}
27
28/// Conflict resolution strategies
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub enum ConflictResolutionStrategy {
31    LastWriterWins,
32    Merge,
33    Custom,
34}
35
36/// Result of automatic synchronization
37#[derive(Debug, Clone)]
38pub struct SyncResult {
39    pub synced_operations: usize,
40    pub conflicts_resolved: usize,
41    pub duration: Duration,
42}
43
44/// Operation ID for queued operations
45pub type OperationId = uuid::Uuid;
46
47/// Main synchronization manager
48#[cfg(feature = "sync")]
49pub struct SyncManager {
50    // Simple in-memory storage for now
51    data: HashMap<String, serde_json::Value>,
52    // Network status
53    network_status: NetworkStatus,
54    // Queued operations for offline mode
55    queued_operations: Vec<QueuedOperation>,
56}
57
58#[cfg(feature = "sync")]
59#[derive(Debug, Clone)]
60struct QueuedOperation {
61    id: OperationId,
62    key: QueryKey,
63    data: serde_json::Value,
64    operation_type: OperationType,
65}
66
67#[cfg(feature = "sync")]
68#[derive(Debug, Clone)]
69enum OperationType {
70    Store,
71    Update,
72    Delete,
73}
74
75#[cfg(feature = "sync")]
76impl SyncManager {
77    /// Create a new sync manager
78    pub async fn new() -> Result<Self, QueryError> {
79        Ok(Self {
80            data: HashMap::new(),
81            network_status: NetworkStatus::Online,
82            queued_operations: Vec::new(),
83        })
84    }
85
86    /// Store data with CRDT capabilities
87    pub async fn store_with_crdt<T>(&mut self, key: &QueryKey, data: T) -> Result<(), QueryError>
88    where
89        T: Serialize + Clone,
90    {
91        let key_str = key.to_string();
92        let json_data = serde_json::to_value(data)
93            .map_err(|e| QueryError::SerializationError(e.to_string()))?;
94
95        // Check if we should update based on version (if the data has a version field)
96        if let Some(existing_data) = self.data.get(&key_str) {
97            if let (Some(new_version), Some(existing_version)) = (
98                json_data.get("version").and_then(|v| v.as_u64()),
99                existing_data.get("version").and_then(|v| v.as_u64())
100            ) {
101                // Only update if the new version is higher
102                if new_version <= existing_version {
103                    return Ok(()); // Skip update if version is not newer
104                }
105            }
106        }
107
108        // Store the data
109        self.data.insert(key_str, json_data);
110        Ok(())
111    }
112
113    /// Retrieve data with CRDT capabilities
114    pub async fn get_with_crdt<T>(&self, key: &QueryKey) -> Result<Option<T>, QueryError>
115    where
116        T: DeserializeOwned,
117    {
118        let key_str = key.to_string();
119        
120        if let Some(json_data) = self.data.get(&key_str) {
121            let deserialized: T = serde_json::from_value(json_data.clone())
122                .map_err(|e| QueryError::DeserializationError(e.to_string()))?;
123            return Ok(Some(deserialized));
124        }
125        
126        Ok(None)
127    }
128
129    /// Resolve conflicts using specified strategy
130    pub async fn resolve_conflicts(
131        &mut self,
132        key: &QueryKey,
133        strategy: ConflictResolutionStrategy,
134    ) -> Result<(), QueryError> {
135        let key_str = key.to_string();
136        
137        match strategy {
138            ConflictResolutionStrategy::LastWriterWins => {
139                // For Last Writer Wins, we keep the most recently stored data
140                // This is already handled by our store_with_crdt method
141                Ok(())
142            }
143            ConflictResolutionStrategy::Merge => {
144                // For merge strategy, we would implement field-level merging
145                // For now, this is a placeholder
146                Ok(())
147            }
148            ConflictResolutionStrategy::Custom => {
149                // Custom strategy would be implemented by the user
150                Ok(())
151            }
152        }
153    }
154
155    /// Set network status
156    pub fn set_network_status(&mut self, status: NetworkStatus) {
157        self.network_status = status;
158    }
159
160    /// Queue operation while offline
161    pub async fn queue_operation<T>(&mut self, key: &QueryKey, data: T) -> Result<Option<OperationId>, QueryError>
162    where
163        T: Serialize + Clone,
164    {
165        if self.network_status == NetworkStatus::Offline {
166            let operation_id = uuid::Uuid::new_v4();
167            let json_data = serde_json::to_value(data)
168                .map_err(|e| QueryError::SerializationError(e.to_string()))?;
169            
170            let operation = QueuedOperation {
171                id: operation_id,
172                key: key.clone(),
173                data: json_data,
174                operation_type: OperationType::Store,
175            };
176            
177            self.queued_operations.push(operation);
178            return Ok(Some(operation_id));
179        }
180        
181        Ok(None)
182    }
183
184    /// Check if there are pending operations
185    pub fn has_pending_operations(&self) -> bool {
186        !self.queued_operations.is_empty()
187    }
188
189    /// Get count of pending operations
190    pub fn pending_operation_count(&self) -> usize {
191        self.queued_operations.len()
192    }
193
194    /// Process queued operations
195    pub async fn process_queued_operations(&mut self) -> Result<(), QueryError> {
196        let operations = std::mem::take(&mut self.queued_operations);
197        
198        for operation in operations {
199            match operation.operation_type {
200                OperationType::Store => {
201                    self.store_with_crdt(&operation.key, operation.data).await?;
202                }
203                OperationType::Update => {
204                    self.store_with_crdt(&operation.key, operation.data).await?;
205                }
206                OperationType::Delete => {
207                    // TODO: Implement delete operation
208                }
209            }
210        }
211        
212        Ok(())
213    }
214
215    /// Merge with another sync manager
216    pub async fn merge_with(&mut self, other: &mut SyncManager) -> Result<(), QueryError> {
217        // Merge data from other manager (copy instead of move)
218        for (key, value) in other.data.iter() {
219            self.data.insert(key.clone(), value.clone());
220        }
221        
222        // Also merge queued operations
223        self.queued_operations.extend(other.queued_operations.clone());
224        
225        Ok(())
226    }
227
228    /// Detect conflicts for a given key
229    pub async fn detect_conflicts(&self, key: &QueryKey) -> Result<Vec<Conflict>, QueryError> {
230        let key_str = key.to_string();
231        let mut conflicts = Vec::new();
232        
233        // Simple conflict detection: if we have data for this key, there might be conflicts
234        if self.data.contains_key(&key_str) {
235            conflicts.push(Conflict {
236                key: key.clone(),
237                conflict_type: ConflictType::ConcurrentUpdate,
238                resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
239            });
240        }
241        
242        Ok(conflicts)
243    }
244
245    /// Perform automatic synchronization
246    pub async fn auto_sync(&mut self) -> Result<SyncResult, QueryError> {
247        let start_time = std::time::Instant::now();
248        let mut synced_operations = 0;
249        let mut conflicts_resolved = 0;
250        
251        // Process queued operations
252        if !self.queued_operations.is_empty() {
253            let operation_count = self.queued_operations.len();
254            self.process_queued_operations().await?;
255            synced_operations = operation_count;
256        }
257        
258        // If we have data, count it as synced operations
259        if !self.data.is_empty() {
260            synced_operations += self.data.len();
261        }
262        
263        let duration = start_time.elapsed();
264        
265        Ok(SyncResult {
266            synced_operations,
267            conflicts_resolved,
268            duration,
269        })
270    }
271}
272
273/// Conflict information
274#[derive(Debug, Clone)]
275pub struct Conflict {
276    pub key: QueryKey,
277    pub conflict_type: ConflictType,
278    pub resolution_strategy: ConflictResolutionStrategy,
279}
280
281/// Types of conflicts
282#[derive(Debug, Clone)]
283pub enum ConflictType {
284    ConcurrentUpdate,
285    DataMismatch,
286    VersionConflict,
287}
288
289// Fallback implementation when sync feature is not enabled
290#[cfg(not(feature = "sync"))]
291#[derive(Clone)]
292pub struct SyncManager {
293    // Fallback implementation - just a placeholder
294    _placeholder: (),
295}
296
297#[cfg(not(feature = "sync"))]
298impl SyncManager {
299    pub async fn new() -> Result<Self, QueryError> {
300        Err(QueryError::GenericError("Sync feature not enabled".to_string()))
301    }
302}
303
304// Re-export types for external use
305// Note: We don't re-export here to avoid conflicts since they're already defined above
306
307// Add the sync module to the main library
308#[cfg(feature = "sync")]
309pub mod crdt {
310    //! CRDT-specific functionality
311    //! This will contain the actual leptos-sync-core integration
312}
313
314#[cfg(not(feature = "sync"))]
315pub mod crdt {
316    //! Fallback CRDT functionality
317    //! This will provide basic conflict resolution without leptos-sync-core
318}