1use crate::{
4 crdt::{Mergeable, ReplicaId},
5 storage::{LocalStorage, Storage, StorageError},
6 sync::{SyncEngine, SyncState},
7 transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use std::marker::PhantomData;
12use tokio::sync::RwLock;
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum CollectionError {
17 #[error("Storage error: {0}")]
18 Storage(#[from] StorageError),
19 #[error("Transport error: {0}")]
20 Transport(#[from] TransportError),
21 #[error("Sync error: {0}")]
22 Sync(#[from] crate::sync::SyncEngineError),
23 #[error("Serialization error: {0}")]
24 Serialization(#[from] serde_json::Error),
25 #[error("Item not found: {0}")]
26 NotFound(String),
27 #[error("Invalid operation: {0}")]
28 InvalidOperation(String),
29}
30
31pub struct LocalFirstCollection<T, Tr>
33where
34 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
35 Tr: SyncTransport + Clone + 'static,
36{
37 storage: Storage,
38 sync_engine: Arc<RwLock<SyncEngine<Tr>>>,
39 auto_sync: bool,
40 _phantom: PhantomData<T>,
41}
42
43pub struct CollectionBuilder<Tr>
45where
46 Tr: SyncTransport + Clone + 'static,
47{
48 storage: Storage,
49 transport: Tr,
50 auto_sync: bool,
51 replica_id: Option<ReplicaId>,
52}
53
54impl<Tr> CollectionBuilder<Tr>
55where
56 Tr: SyncTransport + Clone + 'static,
57{
58 pub fn new(storage: Storage, transport: Tr) -> Self {
59 Self {
60 storage,
61 transport,
62 auto_sync: false,
63 replica_id: None,
64 }
65 }
66
67 pub fn with_auto_sync(mut self, enabled: bool) -> Self {
68 self.auto_sync = enabled;
69 self
70 }
71
72 pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
73 self.replica_id = Some(replica_id);
74 self
75 }
76
77 pub fn build<T>(self) -> LocalFirstCollection<T, Tr>
78 where
79 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
80 {
81 let sync_engine = if let Some(replica_id) = self.replica_id {
82 SyncEngine::with_replica_id(self.storage.clone(), self.transport.clone(), replica_id)
83 } else {
84 SyncEngine::new(self.storage.clone(), self.transport.clone())
85 };
86
87 LocalFirstCollection::<T, Tr> {
88 storage: self.storage,
89 sync_engine: Arc::new(RwLock::new(sync_engine)),
90 auto_sync: self.auto_sync,
91 _phantom: PhantomData,
92 }
93 }
94}
95
96impl<T, Tr> LocalFirstCollection<T, Tr>
97where
98 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
99 Tr: SyncTransport + Clone + 'static,
100{
101 pub fn new(storage: Storage, transport: Tr) -> Self {
103 let sync_engine = SyncEngine::new(storage.clone(), transport);
104
105 Self {
106 storage,
107 sync_engine: Arc::new(RwLock::new(sync_engine)),
108 auto_sync: false,
109 _phantom: PhantomData,
110 }
111 }
112
113 pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
115 let sync_engine = SyncEngine::with_replica_id(storage.clone(), transport, replica_id);
116
117 Self {
118 storage,
119 sync_engine: Arc::new(RwLock::new(sync_engine)),
120 auto_sync: false,
121 _phantom: PhantomData,
122 }
123 }
124
125 pub fn replica_id(&self) -> ReplicaId {
127 ReplicaId::default()
129 }
130
131 pub async fn insert(&self, key: &str, value: &T) -> Result<(), CollectionError> {
133 self.storage.set(key, value).await?;
135
136 if self.auto_sync {
138 let mut engine = self.sync_engine.write().await;
139 engine.sync(key, value).await?;
140 }
141
142 Ok(())
143 }
144
145 pub async fn get(&self, key: &str) -> Result<Option<T>, CollectionError> {
147 self.storage.get(key).await.map_err(Into::into)
148 }
149
150 pub async fn remove(&self, key: &str) -> Result<(), CollectionError> {
152 self.storage.remove(key).await.map_err(Into::into)
153 }
154
155 pub async fn keys(&self) -> Result<Vec<String>, CollectionError> {
157 self.storage.keys().await.map_err(Into::into)
158 }
159
160 pub async fn values(&self) -> Result<Vec<T>, CollectionError> {
162 let keys = self.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
163 let mut values = Vec::new();
164
165 for key in keys {
166 if let Some(value) = self.get(&key).await? {
167 values.push(value);
168 }
169 }
170
171 Ok(values)
172 }
173
174 pub async fn contains_key(&self, key: &str) -> Result<bool, CollectionError> {
176 self.storage.contains_key(key).await.map_err(Into::into)
177 }
178
179 pub async fn len(&self) -> Result<usize, CollectionError> {
181 self.storage.len().await.map_err(Into::into)
182 }
183
184 pub async fn is_empty(&self) -> Result<bool, CollectionError> {
186 self.storage.is_empty().await.map_err(Into::into)
187 }
188
189 pub async fn start_sync(&self) -> Result<(), CollectionError> {
191 let mut engine = self.sync_engine.write().await;
192 engine.start_sync().await.map_err(Into::into)
193 }
194
195 pub async fn stop_sync(&self) -> Result<(), CollectionError> {
197 let mut engine = self.sync_engine.write().await;
198 engine.stop_sync().await.map_err(Into::into)
199 }
200
201 pub async fn sync_state(&self) -> Result<SyncState, CollectionError> {
203 let engine = self.sync_engine.read().await;
204 Ok(engine.state().await)
205 }
206
207 pub async fn is_online(&self) -> Result<bool, CollectionError> {
209 let engine = self.sync_engine.read().await;
210 Ok(engine.is_online().await)
211 }
212
213 pub async fn peer_count(&self) -> Result<usize, CollectionError> {
215 let engine = self.sync_engine.read().await;
216 Ok(engine.peer_count().await)
217 }
218
219 pub fn set_auto_sync(&mut self, enabled: bool) {
221 self.auto_sync = enabled;
222 }
223
224 pub async fn force_sync(&self) -> Result<(), CollectionError> {
226 let mut engine = self.sync_engine.write().await;
227
228 engine.process_messages().await.map_err(|e| CollectionError::Sync(e))?;
230
231 Ok(())
232 }
233
234 pub async fn insert_batch(&self, items: impl IntoIterator<Item = (String, T)>) -> Result<(), CollectionError> {
236 let items: Vec<_> = items.into_iter().collect();
237
238 for (key, value) in &items {
240 self.storage.set(key, value).await?;
241 }
242
243 if self.auto_sync {
245 let mut engine = self.sync_engine.write().await;
246 for (key, value) in items {
247 engine.sync(&key, &value).await?;
248 }
249 }
250
251 Ok(())
252 }
253
254 pub async fn update_batch(&self, updates: impl IntoIterator<Item = (String, T)>) -> Result<(), CollectionError> {
256 let updates: Vec<_> = updates.into_iter().collect();
257
258 for (key, _) in &updates {
260 if !self.storage.contains_key(key).await? {
261 return Err(CollectionError::NotFound(key.clone()));
262 }
263 }
264
265 for (key, value) in &updates {
267 self.storage.set(key, value).await?;
268 }
269
270 if self.auto_sync {
272 let mut engine = self.sync_engine.write().await;
273 for (key, value) in updates {
274 engine.sync(&key, &value).await?;
275 }
276 }
277
278 Ok(())
279 }
280
281 pub async fn remove_batch(&self, keys: impl IntoIterator<Item = String>) -> Result<(), CollectionError> {
283 let keys: Vec<_> = keys.into_iter().collect();
284
285 for key in &keys {
287 self.storage.remove(key).await?;
288 }
289
290 if self.auto_sync {
292 let mut engine = self.sync_engine.write().await;
293 for key in keys {
294 engine.sync(&key, &T::default()).await?;
297 }
298 }
299
300 Ok(())
301 }
302
303 pub async fn get_batch(&self, keys: impl IntoIterator<Item = String>) -> Result<Vec<(String, Option<T>)>, CollectionError> {
305 let keys: Vec<_> = keys.into_iter().collect();
306 let mut results = Vec::new();
307
308 for key in keys {
309 let value = self.storage.get(&key).await?;
310 results.push((key, value));
311 }
312
313 Ok(results)
314 }
315
316 pub async fn contains_keys(&self, keys: impl IntoIterator<Item = String>) -> Result<Vec<(String, bool)>, CollectionError> {
318 let keys: Vec<_> = keys.into_iter().collect();
319 let mut results = Vec::new();
320
321 for key in keys {
322 let exists = self.storage.contains_key(&key).await?;
323 results.push((key, exists));
324 }
325
326 Ok(results)
327 }
328
329 pub async fn peers(&self) -> Result<impl Iterator<Item = (ReplicaId, crate::sync::PeerInfo)>, CollectionError> {
331 let engine = self.sync_engine.read().await;
332 Ok(engine.peers().await)
333 }
334
335 pub async fn sync_info(&self) -> Result<SyncInfo, CollectionError> {
337 let engine = self.sync_engine.read().await;
338
339 Ok(SyncInfo {
340 sync_state: engine.state().await,
341 peer_count: engine.peer_count().await,
342 is_online: engine.is_online().await,
343 })
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct SyncInfo {
350 pub sync_state: SyncState,
351 pub peer_count: usize,
352 pub is_online: bool,
353}
354
355pub struct CollectionIterator<T, Tr>
357where
358 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
359 Tr: SyncTransport + Clone + 'static,
360{
361 collection: Arc<LocalFirstCollection<T, Tr>>,
362 keys: Vec<String>,
363 current_index: usize,
364 _phantom: PhantomData<T>,
365}
366
367impl<T, Tr> CollectionIterator<T, Tr>
368where
369 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
370 Tr: SyncTransport + Clone + 'static,
371{
372 pub fn new(collection: Arc<LocalFirstCollection<T, Tr>>) -> Self {
373 Self {
374 collection,
375 keys: Vec::new(),
376 current_index: 0,
377 _phantom: PhantomData,
378 }
379 }
380
381 pub async fn load_keys(&mut self) -> Result<(), CollectionError> {
382 self.keys = self.collection.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
383 Ok(())
384 }
385}
386
387impl<T, Tr> Iterator for CollectionIterator<T, Tr>
388where
389 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
390 Tr: SyncTransport + Clone + 'static,
391{
392 type Item = (String, T);
393
394 fn next(&mut self) -> Option<Self::Item> {
395 if self.current_index >= self.keys.len() {
396 return None;
397 }
398
399 let key = self.keys[self.current_index].clone();
400 self.current_index += 1;
401
402 Some((key, T::default())) }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{LwwRegister, ReplicaId};
    use crate::storage::Storage;
    use crate::transport::InMemoryTransport;

    /// Insert, read back and remove a single entry.
    #[tokio::test]
    async fn test_collection_basic_operations() {
        let storage = Storage::memory();
        let transport = InMemoryTransport::new();
        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);

        let value1 = LwwRegister::new("value1".to_string(), ReplicaId::default());
        assert!(collection.insert("key1", &value1).await.is_ok());
        let value = collection.get("key1").await.unwrap();
        assert_eq!(value, Some(value1));

        assert!(collection.remove("key1").await.is_ok());
        let value = collection.get("key1").await.unwrap();
        assert_eq!(value, None);
    }

    /// The builder carries the auto-sync flag through to the collection.
    #[tokio::test]
    async fn test_collection_builder() {
        let storage = Storage::memory();
        let transport = InMemoryTransport::new();

        let collection = CollectionBuilder::new(storage, transport)
            .with_auto_sync(true)
            .build::<LwwRegister<String>>();

        assert!(collection.auto_sync);
    }

    /// Exercises the batch insert, lookup, update and remove operations.
    #[tokio::test]
    async fn test_collection_batch_operations() {
        let storage = Storage::memory();
        let transport = InMemoryTransport::new();
        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);

        let items = vec![
            ("key1".to_string(), LwwRegister::new("value1".to_string(), ReplicaId::default())),
            ("key2".to_string(), LwwRegister::new("value2".to_string(), ReplicaId::default())),
            ("key3".to_string(), LwwRegister::new("value3".to_string(), ReplicaId::default())),
        ];

        assert!(collection.insert_batch(items.clone()).await.is_ok());

        let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
        let results = collection.get_batch(keys).await.unwrap();
        assert_eq!(results.len(), 3);
        // `get_batch` returns one entry per requested key, so the length alone
        // proves nothing: check that each inserted value actually comes back.
        let expected: Vec<_> = items
            .iter()
            .cloned()
            .map(|(key, value)| (key, Some(value)))
            .collect();
        assert_eq!(results, expected);

        let exists_results = collection
            .contains_keys(vec!["key1".to_string(), "key2".to_string(), "key4".to_string()])
            .await
            .unwrap();
        assert_eq!(
            exists_results,
            vec![
                ("key1".to_string(), true),
                ("key2".to_string(), true),
                ("key4".to_string(), false),
            ]
        );

        let updates = vec![
            ("key1".to_string(), LwwRegister::new("updated1".to_string(), ReplicaId::default())),
            ("key2".to_string(), LwwRegister::new("updated2".to_string(), ReplicaId::default())),
        ];
        assert!(collection.update_batch(updates).await.is_ok());

        let keys_to_remove = vec!["key1".to_string(), "key2".to_string()];
        assert!(collection.remove_batch(keys_to_remove).await.is_ok());

        let remaining = collection.get_batch(vec!["key3".to_string()]).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].0, "key3");
        assert!(remaining[0].1.is_some());

        // The removed keys must no longer resolve to a value.
        let removed = collection
            .get_batch(vec!["key1".to_string(), "key2".to_string()])
            .await
            .unwrap();
        assert_eq!(
            removed,
            vec![("key1".to_string(), None), ("key2".to_string(), None)]
        );
    }

    /// Inserts 1000 items in one batch and 1000 one at a time, printing both
    /// durations and checking the final entry count.
    #[tokio::test]
    async fn test_collection_batch_performance() {
        let storage = Storage::memory();
        let transport = InMemoryTransport::new();
        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(storage, transport);

        let items: Vec<_> = (0..1000)
            .map(|i| {
                (
                    format!("key{}", i),
                    LwwRegister::new(format!("value{}", i), ReplicaId::default()),
                )
            })
            .collect();

        let start = std::time::Instant::now();
        assert!(collection.insert_batch(items).await.is_ok());
        let batch_duration = start.elapsed();

        let individual_items: Vec<_> = (1000..2000)
            .map(|i| {
                (
                    format!("key{}", i),
                    LwwRegister::new(format!("value{}", i), ReplicaId::default()),
                )
            })
            .collect();

        let start = std::time::Instant::now();
        for (key, value) in &individual_items {
            assert!(collection.insert(key, value).await.is_ok());
        }
        let individual_duration = start.elapsed();

        println!("Batch insert (1000 items): {:?}", batch_duration);
        println!("Individual insert (1000 items): {:?}", individual_duration);

        let total_count = collection.len().await.unwrap();
        assert_eq!(total_count, 2000);
    }
}