leptos_sync_core/
collection.rs

1//! Local-first collection with synchronization capabilities
2
3use crate::{
4    crdt::{Mergeable, ReplicaId},
5    storage::{LocalStorage, Storage, StorageError},
6    sync::{SyncEngine, SyncState},
7    transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use std::marker::PhantomData;
12use tokio::sync::RwLock;
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum CollectionError {
17    #[error("Storage error: {0}")]
18    Storage(#[from] StorageError),
19    #[error("Transport error: {0}")]
20    Transport(#[from] TransportError),
21    #[error("Sync error: {0}")]
22    Sync(#[from] crate::sync::SyncEngineError),
23    #[error("Serialization error: {0}")]
24    Serialization(#[from] serde_json::Error),
25    #[error("Item not found: {0}")]
26    NotFound(String),
27    #[error("Invalid operation: {0}")]
28    InvalidOperation(String),
29}
30
31/// Local-first collection that can synchronize with remote peers
32pub struct LocalFirstCollection<T, Tr>
33where
34    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
35    Tr: SyncTransport + Clone + 'static,
36{
37    storage: Storage,
38    sync_engine: Arc<RwLock<SyncEngine<Tr>>>,
39    auto_sync: bool,
40    _phantom: PhantomData<T>,
41}
42
43/// Builder for LocalFirstCollection
44pub struct CollectionBuilder<Tr>
45where
46    Tr: SyncTransport + Clone + 'static,
47{
48    storage: Storage,
49    transport: Tr,
50    auto_sync: bool,
51    replica_id: Option<ReplicaId>,
52}
53
54impl<Tr> CollectionBuilder<Tr>
55where
56    Tr: SyncTransport + Clone + 'static,
57{
58    pub fn new(storage: Storage, transport: Tr) -> Self {
59        Self {
60            storage,
61            transport,
62            auto_sync: false,
63            replica_id: None,
64        }
65    }
66
67    pub fn with_auto_sync(mut self, enabled: bool) -> Self {
68        self.auto_sync = enabled;
69        self
70    }
71
72    pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
73        self.replica_id = Some(replica_id);
74        self
75    }
76
77    pub fn build<T>(self) -> LocalFirstCollection<T, Tr>
78    where
79        T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
80    {
81        let sync_engine = if let Some(replica_id) = self.replica_id {
82            SyncEngine::with_replica_id(self.storage.clone(), self.transport.clone(), replica_id)
83        } else {
84            SyncEngine::new(self.storage.clone(), self.transport.clone())
85        };
86
87        LocalFirstCollection::<T, Tr> {
88            storage: self.storage,
89            sync_engine: Arc::new(RwLock::new(sync_engine)),
90            auto_sync: self.auto_sync,
91            _phantom: PhantomData,
92        }
93    }
94}
95
96impl<T, Tr> LocalFirstCollection<T, Tr>
97where
98    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
99    Tr: SyncTransport + Clone + 'static,
100{
101    /// Create a new collection
102    pub fn new(storage: Storage, transport: Tr) -> Self {
103        let sync_engine = SyncEngine::new(storage.clone(), transport);
104        
105        Self {
106            storage,
107            sync_engine: Arc::new(RwLock::new(sync_engine)),
108            auto_sync: false,
109            _phantom: PhantomData,
110        }
111    }
112
113    /// Create a collection with a specific replica ID
114    pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
115        let sync_engine = SyncEngine::with_replica_id(storage.clone(), transport, replica_id);
116        
117        Self {
118            storage,
119            sync_engine: Arc::new(RwLock::new(sync_engine)),
120            auto_sync: false,
121            _phantom: PhantomData,
122        }
123    }
124
125    /// Get the replica ID for this collection
126    pub fn replica_id(&self) -> ReplicaId {
127        // This is a simplified version - in a real implementation you'd get it from the sync engine
128        ReplicaId::default()
129    }
130
131    /// Insert or update an item
132    pub async fn insert(&self, key: &str, value: &T) -> Result<(), CollectionError> {
133        // Store locally first
134        self.storage.set(key, value).await?;
135
136        // Sync if auto-sync is enabled
137        if self.auto_sync {
138            let mut engine = self.sync_engine.write().await;
139            engine.sync(key, value).await?;
140        }
141
142        Ok(())
143    }
144
145    /// Get an item by key
146    pub async fn get(&self, key: &str) -> Result<Option<T>, CollectionError> {
147        self.storage.get(key).await.map_err(Into::into)
148    }
149
150    /// Remove an item
151    pub async fn remove(&self, key: &str) -> Result<(), CollectionError> {
152        self.storage.remove(key).await.map_err(Into::into)
153    }
154
155    /// Get all keys
156    pub async fn keys(&self) -> Result<Vec<String>, CollectionError> {
157        self.storage.keys().await.map_err(Into::into)
158    }
159
160    /// Get all values
161    pub async fn values(&self) -> Result<Vec<T>, CollectionError> {
162        let keys = self.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
163        let mut values = Vec::new();
164        
165        for key in keys {
166            if let Some(value) = self.get(&key).await? {
167                values.push(value);
168            }
169        }
170        
171        Ok(values)
172    }
173
174    /// Check if a key exists
175    pub async fn contains_key(&self, key: &str) -> Result<bool, CollectionError> {
176        self.storage.contains_key(key).await.map_err(Into::into)
177    }
178
179    /// Get the number of items
180    pub async fn len(&self) -> Result<usize, CollectionError> {
181        self.storage.len().await.map_err(Into::into)
182    }
183
184    /// Check if the collection is empty
185    pub async fn is_empty(&self) -> Result<bool, CollectionError> {
186        self.storage.is_empty().await.map_err(Into::into)
187    }
188
189    /// Start synchronization
190    pub async fn start_sync(&self) -> Result<(), CollectionError> {
191        let mut engine = self.sync_engine.write().await;
192        engine.start_sync().await.map_err(Into::into)
193    }
194
195    /// Stop synchronization
196    pub async fn stop_sync(&self) -> Result<(), CollectionError> {
197        let mut engine = self.sync_engine.write().await;
198        engine.stop_sync().await.map_err(Into::into)
199    }
200
201    /// Get synchronization state
202    pub async fn sync_state(&self) -> Result<SyncState, CollectionError> {
203        let engine = self.sync_engine.read().await;
204        Ok(engine.state().await)
205    }
206
207    /// Check if online
208    pub async fn is_online(&self) -> Result<bool, CollectionError> {
209        let engine = self.sync_engine.read().await;
210        Ok(engine.is_online().await)
211    }
212
213    /// Get peer count
214    pub async fn peer_count(&self) -> Result<usize, CollectionError> {
215        let engine = self.sync_engine.read().await;
216        Ok(engine.peer_count().await)
217    }
218
219    /// Set auto-sync mode
220    pub fn set_auto_sync(&mut self, enabled: bool) {
221        self.auto_sync = enabled;
222    }
223
224    /// Force synchronization
225    pub async fn force_sync(&self) -> Result<(), CollectionError> {
226        let mut engine = self.sync_engine.write().await;
227        
228        // Process any pending messages
229        engine.process_messages().await.map_err(|e| CollectionError::Sync(e))?;
230        
231        Ok(())
232    }
233
234    /// Get all peers
235    pub async fn peers(&self) -> Result<impl Iterator<Item = (ReplicaId, crate::sync::PeerInfo)>, CollectionError> {
236        let engine = self.sync_engine.read().await;
237        Ok(engine.peers().await)
238    }
239
240    /// Get sync information
241    pub async fn sync_info(&self) -> Result<SyncInfo, CollectionError> {
242        let engine = self.sync_engine.read().await;
243        
244        Ok(SyncInfo {
245            sync_state: engine.state().await,
246            peer_count: engine.peer_count().await,
247            is_online: engine.is_online().await,
248        })
249    }
250}
251
252/// Synchronization information
253#[derive(Debug, Clone)]
254pub struct SyncInfo {
255    pub sync_state: SyncState,
256    pub peer_count: usize,
257    pub is_online: bool,
258}
259
260/// Iterator over collection items
261pub struct CollectionIterator<T, Tr>
262where
263    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
264    Tr: SyncTransport + Clone + 'static,
265{
266    collection: Arc<LocalFirstCollection<T, Tr>>,
267    keys: Vec<String>,
268    current_index: usize,
269    _phantom: PhantomData<T>,
270}
271
272impl<T, Tr> CollectionIterator<T, Tr>
273where
274    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
275    Tr: SyncTransport + Clone + 'static,
276{
277    pub fn new(collection: Arc<LocalFirstCollection<T, Tr>>) -> Self {
278        Self {
279            collection,
280            keys: Vec::new(),
281            current_index: 0,
282            _phantom: PhantomData,
283        }
284    }
285
286    pub async fn load_keys(&mut self) -> Result<(), CollectionError> {
287        self.keys = self.collection.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
288        Ok(())
289    }
290}
291
292impl<T, Tr> Iterator for CollectionIterator<T, Tr>
293where
294    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
295    Tr: SyncTransport + Clone + 'static,
296{
297    type Item = (String, T);
298
299    fn next(&mut self) -> Option<Self::Item> {
300        if self.current_index >= self.keys.len() {
301            return None;
302        }
303
304        let key = self.keys[self.current_index].clone();
305        self.current_index += 1;
306
307        // Note: This is a simplified implementation
308        // In a real implementation, you'd want to handle the async nature properly
309        Some((key, T::default())) // Placeholder
310    }
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Storage;
    use crate::transport::InMemoryTransport;
    use crate::crdt::{LwwRegister, ReplicaId};

    /// Insert, read back and remove a single item through the collection API.
    #[tokio::test]
    async fn test_collection_basic_operations() {
        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(
            Storage::memory(),
            InMemoryTransport::new(),
        );

        // A stored value must come back unchanged.
        let register = LwwRegister::new("value1".to_string(), ReplicaId::default());
        assert!(collection.insert("key1", &register).await.is_ok());
        let fetched = collection.get("key1").await.unwrap();
        assert_eq!(fetched, Some(register));

        // After removal the key must no longer resolve.
        assert!(collection.remove("key1").await.is_ok());
        let after_removal = collection.get("key1").await.unwrap();
        assert_eq!(after_removal, None);
    }

    /// The builder must carry the auto-sync flag into the built collection.
    #[tokio::test]
    async fn test_collection_builder() {
        let builder = CollectionBuilder::new(Storage::memory(), InMemoryTransport::new());

        let collection = builder
            .with_auto_sync(true)
            .build::<LwwRegister<String>>();

        assert!(collection.auto_sync);
    }
}