leptos_sync_core/
collection.rs

1//! Local-first collection with synchronization capabilities
2
3use crate::{
4    crdt::{Mergeable, ReplicaId},
5    storage::{LocalStorage, Storage, StorageError},
6    sync::{SyncEngine, SyncState},
7    transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use std::marker::PhantomData;
12use tokio::sync::RwLock;
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum CollectionError {
17    #[error("Storage error: {0}")]
18    Storage(#[from] StorageError),
19    #[error("Transport error: {0}")]
20    Transport(#[from] TransportError),
21    #[error("Sync error: {0}")]
22    Sync(#[from] crate::sync::SyncEngineError),
23    #[error("Serialization error: {0}")]
24    Serialization(#[from] serde_json::Error),
25    #[error("Item not found: {0}")]
26    NotFound(String),
27    #[error("Invalid operation: {0}")]
28    InvalidOperation(String),
29}
30
31/// Local-first collection that can synchronize with remote peers
32pub struct LocalFirstCollection<T, Tr>
33where
34    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
35    Tr: SyncTransport + Clone + 'static,
36{
37    storage: Storage,
38    sync_engine: Arc<RwLock<SyncEngine<Tr>>>,
39    auto_sync: bool,
40    _phantom: PhantomData<T>,
41}
42
43/// Builder for LocalFirstCollection
44pub struct CollectionBuilder<Tr>
45where
46    Tr: SyncTransport + Clone + 'static,
47{
48    storage: Storage,
49    transport: Tr,
50    auto_sync: bool,
51    replica_id: Option<ReplicaId>,
52}
53
54impl<Tr> CollectionBuilder<Tr>
55where
56    Tr: SyncTransport + Clone + 'static,
57{
58    pub fn new(storage: Storage, transport: Tr) -> Self {
59        Self {
60            storage,
61            transport,
62            auto_sync: false,
63            replica_id: None,
64        }
65    }
66
67    pub fn with_auto_sync(mut self, enabled: bool) -> Self {
68        self.auto_sync = enabled;
69        self
70    }
71
72    pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
73        self.replica_id = Some(replica_id);
74        self
75    }
76
77    pub fn build<T>(self) -> LocalFirstCollection<T, Tr>
78    where
79        T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
80    {
81        let sync_engine = if let Some(replica_id) = self.replica_id {
82            SyncEngine::with_replica_id(self.storage.clone(), self.transport.clone(), replica_id)
83        } else {
84            SyncEngine::new(self.storage.clone(), self.transport.clone())
85        };
86
87        LocalFirstCollection::<T, Tr> {
88            storage: self.storage,
89            sync_engine: Arc::new(RwLock::new(sync_engine)),
90            auto_sync: self.auto_sync,
91            _phantom: PhantomData,
92        }
93    }
94}
95
96impl<T, Tr> LocalFirstCollection<T, Tr>
97where
98    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
99    Tr: SyncTransport + Clone + 'static,
100{
101    /// Create a new collection
102    pub fn new(storage: Storage, transport: Tr) -> Self {
103        let sync_engine = SyncEngine::new(storage.clone(), transport);
104        
105        Self {
106            storage,
107            sync_engine: Arc::new(RwLock::new(sync_engine)),
108            auto_sync: false,
109            _phantom: PhantomData,
110        }
111    }
112
113    /// Create a collection with a specific replica ID
114    pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
115        let sync_engine = SyncEngine::with_replica_id(storage.clone(), transport, replica_id);
116        
117        Self {
118            storage,
119            sync_engine: Arc::new(RwLock::new(sync_engine)),
120            auto_sync: false,
121            _phantom: PhantomData,
122        }
123    }
124
125    /// Get the replica ID for this collection
126    pub fn replica_id(&self) -> ReplicaId {
127        // This is a simplified version - in a real implementation you'd get it from the sync engine
128        ReplicaId::default()
129    }
130
131    /// Insert or update an item
132    pub async fn insert(&self, key: &str, value: &T) -> Result<(), CollectionError> {
133        // Store locally first
134        self.storage.set(key, value).await?;
135
136        // Sync if auto-sync is enabled
137        if self.auto_sync {
138            let mut engine = self.sync_engine.write().await;
139            engine.sync(key, value).await?;
140        }
141
142        Ok(())
143    }
144
145    /// Get an item by key
146    pub async fn get(&self, key: &str) -> Result<Option<T>, CollectionError> {
147        self.storage.get(key).await.map_err(Into::into)
148    }
149
150    /// Remove an item
151    pub async fn remove(&self, key: &str) -> Result<(), CollectionError> {
152        self.storage.remove(key).await.map_err(Into::into)
153    }
154
155    /// Get all keys
156    pub async fn keys(&self) -> Result<Vec<String>, CollectionError> {
157        self.storage.keys().await.map_err(Into::into)
158    }
159
160    /// Get all values
161    pub async fn values(&self) -> Result<Vec<T>, CollectionError> {
162        let keys = self.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
163        let mut values = Vec::new();
164        
165        for key in keys {
166            if let Some(value) = self.get(&key).await? {
167                values.push(value);
168            }
169        }
170        
171        Ok(values)
172    }
173
174    /// Check if a key exists
175    pub async fn contains_key(&self, key: &str) -> Result<bool, CollectionError> {
176        self.storage.contains_key(key).await.map_err(Into::into)
177    }
178
179    /// Get the number of items
180    pub async fn len(&self) -> Result<usize, CollectionError> {
181        self.storage.len().await.map_err(Into::into)
182    }
183
184    /// Check if the collection is empty
185    pub async fn is_empty(&self) -> Result<bool, CollectionError> {
186        self.storage.is_empty().await.map_err(Into::into)
187    }
188
189    /// Start synchronization
190    pub async fn start_sync(&self) -> Result<(), CollectionError> {
191        let mut engine = self.sync_engine.write().await;
192        engine.start_sync().await.map_err(Into::into)
193    }
194
195    /// Stop synchronization
196    pub async fn stop_sync(&self) -> Result<(), CollectionError> {
197        let mut engine = self.sync_engine.write().await;
198        engine.stop_sync().await.map_err(Into::into)
199    }
200
201    /// Get synchronization state
202    pub async fn sync_state(&self) -> Result<SyncState, CollectionError> {
203        let engine = self.sync_engine.read().await;
204        Ok(engine.state().await)
205    }
206
207    /// Check if online
208    pub async fn is_online(&self) -> Result<bool, CollectionError> {
209        let engine = self.sync_engine.read().await;
210        Ok(engine.is_online().await)
211    }
212
213    /// Get peer count
214    pub async fn peer_count(&self) -> Result<usize, CollectionError> {
215        let engine = self.sync_engine.read().await;
216        Ok(engine.peer_count().await)
217    }
218
219    /// Set auto-sync mode
220    pub fn set_auto_sync(&mut self, enabled: bool) {
221        self.auto_sync = enabled;
222    }
223
224    /// Force synchronization
225    pub async fn force_sync(&self) -> Result<(), CollectionError> {
226        let mut engine = self.sync_engine.write().await;
227        
228        // Process any pending messages
229        engine.process_messages().await.map_err(|e| CollectionError::Sync(e))?;
230        
231        Ok(())
232    }
233
234    /// Insert or update multiple items in a batch
235    pub async fn insert_batch(&self, items: impl IntoIterator<Item = (String, T)>) -> Result<(), CollectionError> {
236        let items: Vec<_> = items.into_iter().collect();
237        
238        // Store locally first in batch
239        for (key, value) in &items {
240            self.storage.set(key, value).await?;
241        }
242
243        // Sync if auto-sync is enabled
244        if self.auto_sync {
245            let mut engine = self.sync_engine.write().await;
246            for (key, value) in items {
247                engine.sync(&key, &value).await?;
248            }
249        }
250
251        Ok(())
252    }
253
254    /// Update multiple items in a batch
255    pub async fn update_batch(&self, updates: impl IntoIterator<Item = (String, T)>) -> Result<(), CollectionError> {
256        let updates: Vec<_> = updates.into_iter().collect();
257        
258        // Verify all keys exist before updating
259        for (key, _) in &updates {
260            if !self.storage.contains_key(key).await? {
261                return Err(CollectionError::NotFound(key.clone()));
262            }
263        }
264        
265        // Update locally in batch
266        for (key, value) in &updates {
267            self.storage.set(key, value).await?;
268        }
269
270        // Sync if auto-sync is enabled
271        if self.auto_sync {
272            let mut engine = self.sync_engine.write().await;
273            for (key, value) in updates {
274                engine.sync(&key, &value).await?;
275            }
276        }
277
278        Ok(())
279    }
280
281    /// Remove multiple items in a batch
282    pub async fn remove_batch(&self, keys: impl IntoIterator<Item = String>) -> Result<(), CollectionError> {
283        let keys: Vec<_> = keys.into_iter().collect();
284        
285        // Remove locally in batch
286        for key in &keys {
287            self.storage.remove(key).await?;
288        }
289
290        // Sync if auto-sync is enabled
291        if self.auto_sync {
292            let mut engine = self.sync_engine.write().await;
293            for key in keys {
294                // Note: In a real implementation, you'd want to handle deletion sync
295                // This is a simplified version
296                engine.sync(&key, &T::default()).await?;
297            }
298        }
299
300        Ok(())
301    }
302
303    /// Get multiple items by keys
304    pub async fn get_batch(&self, keys: impl IntoIterator<Item = String>) -> Result<Vec<(String, Option<T>)>, CollectionError> {
305        let keys: Vec<_> = keys.into_iter().collect();
306        let mut results = Vec::new();
307        
308        for key in keys {
309            let value = self.storage.get(&key).await?;
310            results.push((key, value));
311        }
312        
313        Ok(results)
314    }
315
316    /// Check if multiple keys exist
317    pub async fn contains_keys(&self, keys: impl IntoIterator<Item = String>) -> Result<Vec<(String, bool)>, CollectionError> {
318        let keys: Vec<_> = keys.into_iter().collect();
319        let mut results = Vec::new();
320        
321        for key in keys {
322            let exists = self.storage.contains_key(&key).await?;
323            results.push((key, exists));
324        }
325        
326        Ok(results)
327    }
328
329    /// Get all peers
330    pub async fn peers(&self) -> Result<impl Iterator<Item = (ReplicaId, crate::sync::PeerInfo)>, CollectionError> {
331        let engine = self.sync_engine.read().await;
332        Ok(engine.peers().await)
333    }
334
335    /// Get sync information
336    pub async fn sync_info(&self) -> Result<SyncInfo, CollectionError> {
337        let engine = self.sync_engine.read().await;
338        
339        Ok(SyncInfo {
340            sync_state: engine.state().await,
341            peer_count: engine.peer_count().await,
342            is_online: engine.is_online().await,
343        })
344    }
345}
346
347/// Synchronization information
348#[derive(Debug, Clone)]
349pub struct SyncInfo {
350    pub sync_state: SyncState,
351    pub peer_count: usize,
352    pub is_online: bool,
353}
354
355/// Iterator over collection items
356pub struct CollectionIterator<T, Tr>
357where
358    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
359    Tr: SyncTransport + Clone + 'static,
360{
361    collection: Arc<LocalFirstCollection<T, Tr>>,
362    keys: Vec<String>,
363    current_index: usize,
364    _phantom: PhantomData<T>,
365}
366
367impl<T, Tr> CollectionIterator<T, Tr>
368where
369    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
370    Tr: SyncTransport + Clone + 'static,
371{
372    pub fn new(collection: Arc<LocalFirstCollection<T, Tr>>) -> Self {
373        Self {
374            collection,
375            keys: Vec::new(),
376            current_index: 0,
377            _phantom: PhantomData,
378        }
379    }
380
381    pub async fn load_keys(&mut self) -> Result<(), CollectionError> {
382        self.keys = self.collection.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
383        Ok(())
384    }
385}
386
387impl<T, Tr> Iterator for CollectionIterator<T, Tr>
388where
389    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
390    Tr: SyncTransport + Clone + 'static,
391{
392    type Item = (String, T);
393
394    fn next(&mut self) -> Option<Self::Item> {
395        if self.current_index >= self.keys.len() {
396            return None;
397        }
398
399        let key = self.keys[self.current_index].clone();
400        self.current_index += 1;
401
402        // Note: This is a simplified implementation
403        // In a real implementation, you'd want to handle the async nature properly
404        Some((key, T::default())) // Placeholder
405    }
406}
407
408#[cfg(test)]
409mod tests {
410    use super::*;
411    use crate::storage::Storage;
412    use crate::transport::InMemoryTransport;
413    use crate::crdt::{LwwRegister, ReplicaId};
414
415    #[tokio::test]
416    async fn test_collection_basic_operations() {
417        let storage = Storage::memory();
418        let transport = InMemoryTransport::new();
419        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);
420
421        // Test insert and get
422        let value1 = LwwRegister::new("value1".to_string(), ReplicaId::default());
423        assert!(collection.insert("key1", &value1).await.is_ok());
424        let value = collection.get("key1").await.unwrap();
425        assert_eq!(value, Some(value1));
426
427        // Test remove
428        assert!(collection.remove("key1").await.is_ok());
429        let value = collection.get("key1").await.unwrap();
430        assert_eq!(value, None);
431    }
432
433    #[tokio::test]
434    async fn test_collection_builder() {
435        let storage = Storage::memory();
436        let transport = InMemoryTransport::new();
437        
438        let collection = CollectionBuilder::new(storage, transport)
439            .with_auto_sync(true)
440            .build::<LwwRegister<String>>();
441
442        assert!(collection.auto_sync);
443    }
444
445    #[tokio::test]
446    async fn test_collection_batch_operations() {
447        let storage = Storage::memory();
448        let transport = InMemoryTransport::new();
449        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);
450
451        // Test batch insert
452        let items = vec![
453            ("key1".to_string(), LwwRegister::new("value1".to_string(), ReplicaId::default())),
454            ("key2".to_string(), LwwRegister::new("value2".to_string(), ReplicaId::default())),
455            ("key3".to_string(), LwwRegister::new("value3".to_string(), ReplicaId::default())),
456        ];
457        
458        assert!(collection.insert_batch(items.clone()).await.is_ok());
459        
460        // Test batch get
461        let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
462        let results = collection.get_batch(keys).await.unwrap();
463        assert_eq!(results.len(), 3);
464        
465        // Test batch contains
466        let exists_results = collection.contains_keys(vec!["key1".to_string(), "key2".to_string(), "key4".to_string()]).await.unwrap();
467        assert_eq!(exists_results, vec![
468            ("key1".to_string(), true),
469            ("key2".to_string(), true),
470            ("key4".to_string(), false),
471        ]);
472        
473        // Test batch update
474        let updates = vec![
475            ("key1".to_string(), LwwRegister::new("updated1".to_string(), ReplicaId::default())),
476            ("key2".to_string(), LwwRegister::new("updated2".to_string(), ReplicaId::default())),
477        ];
478        assert!(collection.update_batch(updates).await.is_ok());
479        
480        // Test batch remove
481        let keys_to_remove = vec!["key1".to_string(), "key2".to_string()];
482        assert!(collection.remove_batch(keys_to_remove).await.is_ok());
483        
484        // Verify removal
485        let remaining = collection.get_batch(vec!["key3".to_string()]).await.unwrap();
486        assert_eq!(remaining.len(), 1);
487        assert_eq!(remaining[0].0, "key3");
488    }
489
490    #[tokio::test]
491    async fn test_collection_batch_performance() {
492        let storage = Storage::memory();
493        let transport = InMemoryTransport::new();
494        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);
495
496        // Create 1000 items for performance testing
497        let items: Vec<_> = (0..1000)
498            .map(|i| (
499                format!("key{}", i),
500                LwwRegister::new(format!("value{}", i), ReplicaId::default())
501            ))
502            .collect();
503
504        // Measure batch insert performance
505        let start = std::time::Instant::now();
506        assert!(collection.insert_batch(items).await.is_ok());
507        let batch_duration = start.elapsed();
508
509        // Measure individual insert performance for comparison
510        let individual_items: Vec<_> = (1000..2000)
511            .map(|i| (
512                format!("key{}", i),
513                LwwRegister::new(format!("value{}", i), ReplicaId::default())
514            ))
515            .collect();
516
517        let start = std::time::Instant::now();
518        for (key, value) in &individual_items {
519            assert!(collection.insert(key, value).await.is_ok());
520        }
521        let individual_duration = start.elapsed();
522
523        // Batch operations should be significantly faster
524        println!("Batch insert (1000 items): {:?}", batch_duration);
525        println!("Individual insert (1000 items): {:?}", individual_duration);
526        
527        // Verify all items were inserted
528        let total_count = collection.len().await.unwrap();
529        assert_eq!(total_count, 2000);
530    }
531}