1use crate::{
4 crdt::{Mergeable, ReplicaId},
5 storage::{LocalStorage, Storage, StorageError},
6 sync::{SyncEngine, SyncState},
7 transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use std::marker::PhantomData;
12use tokio::sync::RwLock;
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum CollectionError {
17 #[error("Storage error: {0}")]
18 Storage(#[from] StorageError),
19 #[error("Transport error: {0}")]
20 Transport(#[from] TransportError),
21 #[error("Sync error: {0}")]
22 Sync(#[from] crate::sync::SyncEngineError),
23 #[error("Serialization error: {0}")]
24 Serialization(#[from] serde_json::Error),
25 #[error("Item not found: {0}")]
26 NotFound(String),
27 #[error("Invalid operation: {0}")]
28 InvalidOperation(String),
29}
30
31pub struct LocalFirstCollection<T, Tr>
33where
34 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
35 Tr: SyncTransport + Clone + 'static,
36{
37 storage: Storage,
38 sync_engine: Arc<RwLock<SyncEngine<Tr>>>,
39 auto_sync: bool,
40 _phantom: PhantomData<T>,
41}
42
43pub struct CollectionBuilder<Tr>
45where
46 Tr: SyncTransport + Clone + 'static,
47{
48 storage: Storage,
49 transport: Tr,
50 auto_sync: bool,
51 replica_id: Option<ReplicaId>,
52}
53
54impl<Tr> CollectionBuilder<Tr>
55where
56 Tr: SyncTransport + Clone + 'static,
57{
58 pub fn new(storage: Storage, transport: Tr) -> Self {
59 Self {
60 storage,
61 transport,
62 auto_sync: false,
63 replica_id: None,
64 }
65 }
66
67 pub fn with_auto_sync(mut self, enabled: bool) -> Self {
68 self.auto_sync = enabled;
69 self
70 }
71
72 pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
73 self.replica_id = Some(replica_id);
74 self
75 }
76
77 pub fn build<T>(self) -> LocalFirstCollection<T, Tr>
78 where
79 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
80 {
81 let sync_engine = if let Some(replica_id) = self.replica_id {
82 SyncEngine::with_replica_id(self.storage.clone(), self.transport.clone(), replica_id)
83 } else {
84 SyncEngine::new(self.storage.clone(), self.transport.clone())
85 };
86
87 LocalFirstCollection::<T, Tr> {
88 storage: self.storage,
89 sync_engine: Arc::new(RwLock::new(sync_engine)),
90 auto_sync: self.auto_sync,
91 _phantom: PhantomData,
92 }
93 }
94}
95
96impl<T, Tr> LocalFirstCollection<T, Tr>
97where
98 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
99 Tr: SyncTransport + Clone + 'static,
100{
101 pub fn new(storage: Storage, transport: Tr) -> Self {
103 let sync_engine = SyncEngine::new(storage.clone(), transport);
104
105 Self {
106 storage,
107 sync_engine: Arc::new(RwLock::new(sync_engine)),
108 auto_sync: false,
109 _phantom: PhantomData,
110 }
111 }
112
113 pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
115 let sync_engine = SyncEngine::with_replica_id(storage.clone(), transport, replica_id);
116
117 Self {
118 storage,
119 sync_engine: Arc::new(RwLock::new(sync_engine)),
120 auto_sync: false,
121 _phantom: PhantomData,
122 }
123 }
124
125 pub fn replica_id(&self) -> ReplicaId {
127 ReplicaId::default()
129 }
130
131 pub async fn insert(&self, key: &str, value: &T) -> Result<(), CollectionError> {
133 self.storage.set(key, value).await?;
135
136 if self.auto_sync {
138 let mut engine = self.sync_engine.write().await;
139 engine.sync(key, value).await?;
140 }
141
142 Ok(())
143 }
144
145 pub async fn get(&self, key: &str) -> Result<Option<T>, CollectionError> {
147 self.storage.get(key).await.map_err(Into::into)
148 }
149
150 pub async fn remove(&self, key: &str) -> Result<(), CollectionError> {
152 self.storage.remove(key).await.map_err(Into::into)
153 }
154
155 pub async fn keys(&self) -> Result<Vec<String>, CollectionError> {
157 self.storage.keys().await.map_err(Into::into)
158 }
159
160 pub async fn values(&self) -> Result<Vec<T>, CollectionError> {
162 let keys = self.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
163 let mut values = Vec::new();
164
165 for key in keys {
166 if let Some(value) = self.get(&key).await? {
167 values.push(value);
168 }
169 }
170
171 Ok(values)
172 }
173
174 pub async fn contains_key(&self, key: &str) -> Result<bool, CollectionError> {
176 self.storage.contains_key(key).await.map_err(Into::into)
177 }
178
179 pub async fn len(&self) -> Result<usize, CollectionError> {
181 self.storage.len().await.map_err(Into::into)
182 }
183
184 pub async fn is_empty(&self) -> Result<bool, CollectionError> {
186 self.storage.is_empty().await.map_err(Into::into)
187 }
188
189 pub async fn start_sync(&self) -> Result<(), CollectionError> {
191 let mut engine = self.sync_engine.write().await;
192 engine.start_sync().await.map_err(Into::into)
193 }
194
195 pub async fn stop_sync(&self) -> Result<(), CollectionError> {
197 let mut engine = self.sync_engine.write().await;
198 engine.stop_sync().await.map_err(Into::into)
199 }
200
201 pub async fn sync_state(&self) -> Result<SyncState, CollectionError> {
203 let engine = self.sync_engine.read().await;
204 Ok(engine.state().await)
205 }
206
207 pub async fn is_online(&self) -> Result<bool, CollectionError> {
209 let engine = self.sync_engine.read().await;
210 Ok(engine.is_online().await)
211 }
212
213 pub async fn peer_count(&self) -> Result<usize, CollectionError> {
215 let engine = self.sync_engine.read().await;
216 Ok(engine.peer_count().await)
217 }
218
219 pub fn set_auto_sync(&mut self, enabled: bool) {
221 self.auto_sync = enabled;
222 }
223
224 pub async fn force_sync(&self) -> Result<(), CollectionError> {
226 let mut engine = self.sync_engine.write().await;
227
228 engine.process_messages().await.map_err(|e| CollectionError::Sync(e))?;
230
231 Ok(())
232 }
233
234 pub async fn peers(&self) -> Result<impl Iterator<Item = (ReplicaId, crate::sync::PeerInfo)>, CollectionError> {
236 let engine = self.sync_engine.read().await;
237 Ok(engine.peers().await)
238 }
239
240 pub async fn sync_info(&self) -> Result<SyncInfo, CollectionError> {
242 let engine = self.sync_engine.read().await;
243
244 Ok(SyncInfo {
245 sync_state: engine.state().await,
246 peer_count: engine.peer_count().await,
247 is_online: engine.is_online().await,
248 })
249 }
250}
251
252#[derive(Debug, Clone)]
254pub struct SyncInfo {
255 pub sync_state: SyncState,
256 pub peer_count: usize,
257 pub is_online: bool,
258}
259
260pub struct CollectionIterator<T, Tr>
262where
263 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
264 Tr: SyncTransport + Clone + 'static,
265{
266 collection: Arc<LocalFirstCollection<T, Tr>>,
267 keys: Vec<String>,
268 current_index: usize,
269 _phantom: PhantomData<T>,
270}
271
272impl<T, Tr> CollectionIterator<T, Tr>
273where
274 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
275 Tr: SyncTransport + Clone + 'static,
276{
277 pub fn new(collection: Arc<LocalFirstCollection<T, Tr>>) -> Self {
278 Self {
279 collection,
280 keys: Vec::new(),
281 current_index: 0,
282 _phantom: PhantomData,
283 }
284 }
285
286 pub async fn load_keys(&mut self) -> Result<(), CollectionError> {
287 self.keys = self.collection.storage.keys().await.map_err(|e| CollectionError::Storage(e))?;
288 Ok(())
289 }
290}
291
292impl<T, Tr> Iterator for CollectionIterator<T, Tr>
293where
294 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + Mergeable + Default,
295 Tr: SyncTransport + Clone + 'static,
296{
297 type Item = (String, T);
298
299 fn next(&mut self) -> Option<Self::Item> {
300 if self.current_index >= self.keys.len() {
301 return None;
302 }
303
304 let key = self.keys[self.current_index].clone();
305 self.current_index += 1;
306
307 Some((key, T::default())) }
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{LwwRegister, ReplicaId};
    use crate::storage::Storage;
    use crate::transport::InMemoryTransport;

    /// Inserting, reading back and removing a single key round-trips through
    /// in-memory storage.
    #[tokio::test]
    async fn test_collection_basic_operations() {
        let collection = LocalFirstCollection::<LwwRegister<String>, _>::new(
            Storage::memory(),
            InMemoryTransport::new(),
        );
        let register = LwwRegister::new("value1".to_string(), ReplicaId::default());

        assert!(collection.insert("key1", &register).await.is_ok());
        let fetched = collection.get("key1").await.unwrap();
        assert_eq!(fetched, Some(register));

        assert!(collection.remove("key1").await.is_ok());
        let after_removal = collection.get("key1").await.unwrap();
        assert_eq!(after_removal, None);
    }

    /// The builder carries the auto-sync flag through to the built collection.
    #[tokio::test]
    async fn test_collection_builder() {
        let builder = CollectionBuilder::new(Storage::memory(), InMemoryTransport::new());
        let collection = builder
            .with_auto_sync(true)
            .build::<LwwRegister<String>>();

        assert!(collection.auto_sync);
    }
}