// peat_mesh/storage/traits.rs

//! Storage backend trait abstraction
//!
//! This module defines the core traits for Peat mesh's storage layer,
//! enabling runtime backend selection between Ditto, Automerge, RocksDB, etc.
//!
//! # Design Philosophy
//!
//! - **Backend agnostic**: Business logic doesn't depend on a specific storage implementation
//! - **Type-safe**: Rust's type system enforces correct usage
//! - **Flexible**: Easy to add new backends (Redb, LMDB, etc.)
//! - **Testable**: Mock implementations for testing without real storage
//!
//! # Example
//!
//! ```ignore
//! use peat_protocol::storage::{StorageBackend, create_storage_backend};
//!
//! // Create backend from configuration
//! let config = StorageConfig::from_env()?;
//! let storage = create_storage_backend(&config)?;
//!
//! // Get collection and perform operations
//! let cells = storage.collection("cells");
//! cells.upsert("cell-1", serialize(&cell_state)?)?;
//!
//! let cell_bytes = cells.get("cell-1")?.unwrap();
//! let cell = deserialize::<CellState>(&cell_bytes)?;
//! ```

use anyhow::Result;
use std::sync::Arc;

/// Boxed predicate used to filter documents by their serialized bytes.
///
/// Passed to `Collection::find()`: the closure receives one document's raw
/// bytes and returns `true` when that document should be part of the result.
/// The `Send` bound allows the boxed closure to be moved to another thread.
pub type DocumentPredicate = Box<dyn Fn(&[u8]) -> bool + Send>;

38/// Main storage backend trait
39///
40/// Implementations provide access to collections and manage the underlying storage.
41/// All implementations must be thread-safe (Send + Sync).
42///
43/// # Implementations
44///
45/// - **DittoBackend**: Wraps existing Ditto SDK (proprietary, production-ready)
46/// - **AutomergeInMemoryBackend**: In-memory Automerge (POC, testing)
47/// - **RocksDbBackend**: RocksDB persistence (production target)
48///
49/// # Thread Safety
50///
51/// All methods are safe to call from multiple threads. Implementations should use
52/// appropriate synchronization (Arc, RwLock, etc.) as needed.
53pub trait StorageBackend: Send + Sync {
54    /// Get or create a collection by name
55    ///
56    /// Collections are logical groupings of documents (e.g., "cells", "nodes").
57    /// Multiple calls with the same name return references to the same collection.
58    ///
59    /// # Arguments
60    ///
61    /// * `name` - Collection name (e.g., "cells", "nodes", "capabilities")
62    ///
63    /// # Returns
64    ///
65    /// A thread-safe collection handle
66    ///
67    /// # Example
68    ///
69    /// ```ignore
70    /// let cells = storage.collection("cells");
71    /// let nodes = storage.collection("nodes");
72    /// ```
73    fn collection(&self, name: &str) -> Arc<dyn Collection>;
74
75    /// List all collection names
76    ///
77    /// Returns names of all collections that have been created or contain documents.
78    ///
79    /// # Returns
80    ///
81    /// Vector of collection names (may be empty)
82    fn list_collections(&self) -> Vec<String>;
83
84    /// Flush any pending writes to disk
85    ///
86    /// For in-memory backends, this is a no-op. For persistent backends (RocksDB),
87    /// this ensures all writes are durable.
88    ///
89    /// # Returns
90    ///
91    /// Ok(()) on success, Err if flush fails
92    fn flush(&self) -> Result<()>;
93
94    /// Close the storage backend cleanly
95    ///
96    /// Implementations should flush pending writes and release resources.
97    /// After calling close(), the backend should not be used.
98    ///
99    /// # Returns
100    ///
101    /// Ok(()) on success, Err if close fails
102    fn close(self) -> Result<()>;
103}
104
105/// Collection trait for storing and querying documents
106///
107/// A collection is a logical grouping of documents (key-value pairs).
108/// Documents are stored as raw bytes (typically serialized protobuf).
109///
110/// # Document Storage Format
111///
112/// - **Key**: String document ID (e.g., "cell-1", "node-abc123")
113/// - **Value**: Raw bytes (serialized protobuf message)
114///
115/// # Thread Safety
116///
117/// All operations are thread-safe. Multiple threads can read/write concurrently.
118///
119/// # Example
120///
121/// ```ignore
122/// let cells = storage.collection("cells");
123///
124/// // Create and store a cell
125/// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
126/// let bytes = cell.encode_to_vec();
127/// cells.upsert("cell-1", bytes)?;
128///
129/// // Retrieve the cell
130/// if let Some(stored) = cells.get("cell-1")? {
131///     let cell = CellState::decode(&stored[..])?;
132///     println!("Retrieved cell: {}", cell.id);
133/// }
134///
135/// // Query all cells
136/// for (id, bytes) in cells.scan()? {
137///     let cell = CellState::decode(&bytes[..])?;
138///     println!("Cell {}: {:?}", id, cell);
139/// }
140/// ```
141pub trait Collection: Send + Sync {
142    /// Insert or update a document
143    ///
144    /// If a document with the given ID exists, it is replaced. Otherwise, a new
145    /// document is created.
146    ///
147    /// # Arguments
148    ///
149    /// * `doc_id` - Unique document identifier
150    /// * `data` - Serialized document bytes (typically protobuf)
151    ///
152    /// # Returns
153    ///
154    /// Ok(()) on success, Err if upsert fails
155    ///
156    /// # Example
157    ///
158    /// ```ignore
159    /// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
160    /// cells.upsert("cell-1", cell.encode_to_vec())?;
161    /// ```
162    fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()>;
163
164    /// Get a document by ID
165    ///
166    /// # Arguments
167    ///
168    /// * `doc_id` - Document identifier to retrieve
169    ///
170    /// # Returns
171    ///
172    /// - `Ok(Some(bytes))` if document exists
173    /// - `Ok(None)` if document not found
174    /// - `Err` if query fails
175    ///
176    /// # Example
177    ///
178    /// ```ignore
179    /// match cells.get("cell-1")? {
180    ///     Some(bytes) => {
181    ///         let cell = CellState::decode(&bytes[..])?;
182    ///         println!("Found cell: {}", cell.id);
183    ///     }
184    ///     None => println!("Cell not found"),
185    /// }
186    /// ```
187    fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>>;
188
189    /// Delete a document by ID
190    ///
191    /// If the document doesn't exist, this is a no-op (not an error).
192    ///
193    /// # Arguments
194    ///
195    /// * `doc_id` - Document identifier to delete
196    ///
197    /// # Returns
198    ///
199    /// Ok(()) on success (whether or not document existed), Err if delete fails
200    ///
201    /// # Example
202    ///
203    /// ```ignore
204    /// cells.delete("cell-1")?;
205    /// ```
206    fn delete(&self, doc_id: &str) -> Result<()>;
207
208    /// Scan all documents in the collection
209    ///
210    /// Returns all documents as (id, bytes) tuples. Order is implementation-defined.
211    ///
212    /// # Performance
213    ///
214    /// This loads all documents into memory. For large collections, consider
215    /// streaming or pagination (future enhancement).
216    ///
217    /// # Returns
218    ///
219    /// Vector of (document_id, document_bytes) tuples
220    ///
221    /// # Example
222    ///
223    /// ```ignore
224    /// for (id, bytes) in cells.scan()? {
225    ///     let cell = CellState::decode(&bytes[..])?;
226    ///     println!("Cell {}: {:?}", id, cell);
227    /// }
228    /// ```
229    fn scan(&self) -> Result<Vec<(String, Vec<u8>)>>;
230
231    /// Find documents matching a predicate
232    ///
233    /// Filters documents by applying a predicate function to their serialized bytes.
234    /// Less efficient than indexed queries, but flexible.
235    ///
236    /// # Arguments
237    ///
238    /// * `predicate` - Function that returns true for documents to include
239    ///
240    /// # Returns
241    ///
242    /// Vector of matching (document_id, document_bytes) tuples
243    ///
244    /// # Example
245    ///
246    /// ```ignore
247    /// // Find all cells with status "active"
248    /// let active_cells = cells.find(Box::new(|bytes| {
249    ///     if let Ok(cell) = CellState::decode(bytes) {
250    ///         cell.status == CellStatus::Active as i32
251    ///     } else {
252    ///         false
253    ///     }
254    /// }))?;
255    /// ```
256    fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>>;
257
258    /// Query documents by geohash prefix (proximity queries)
259    ///
260    /// Geohash is a hierarchical spatial index. Documents with the same prefix
261    /// are geographically close. Longer prefixes = smaller areas.
262    ///
263    /// # Arguments
264    ///
265    /// * `prefix` - Geohash prefix (e.g., "9q8y" for San Francisco area)
266    ///
267    /// # Returns
268    ///
269    /// Vector of matching (document_id, document_bytes) tuples
270    ///
271    /// # Example
272    ///
273    /// ```ignore
274    /// // Find all nodes within ~5km of a location
275    /// let nearby_nodes = nodes.query_geohash_prefix("9q8yy")?;
276    /// ```
277    ///
278    /// # Implementation Notes
279    ///
280    /// - For Ditto: Uses geohash index (efficient)
281    /// - For RocksDB: Uses prefix scan (requires geohash in key)
282    /// - For in-memory: Scans all documents (inefficient for large datasets)
283    fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
284
285    /// Count documents in the collection
286    ///
287    /// # Returns
288    ///
289    /// Number of documents in collection
290    ///
291    /// # Example
292    ///
293    /// ```ignore
294    /// let cell_count = cells.count()?;
295    /// println!("Total cells: {}", cell_count);
296    /// ```
297    fn count(&self) -> Result<usize>;
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

    /// Document map shared by the in-memory collection and its lock guards.
    type Docs = HashMap<String, Vec<u8>>;

    // Compile-time checks: each function only builds if the trait can be
    // used as a trait object, so there is nothing to assert at runtime.
    #[test]
    fn test_storage_backend_is_object_safe() {
        fn _assert_object_safe(_: &dyn StorageBackend) {}
    }

    #[test]
    fn test_collection_is_object_safe() {
        fn _assert_object_safe(_: &dyn Collection) {}
    }

    // --- In-memory Collection for testing ---

    /// Minimal `Collection` that keeps documents in a `HashMap` behind an `RwLock`.
    struct InMemCollection {
        data: RwLock<Docs>,
    }

    impl InMemCollection {
        /// Creates an empty collection.
        fn new() -> Self {
            Self {
                data: RwLock::new(HashMap::new()),
            }
        }

        /// Read access to the documents.
        ///
        /// A poisoned lock is recovered rather than propagated as a panic, so
        /// every read path treats poisoning the same way.
        fn docs(&self) -> RwLockReadGuard<'_, Docs> {
            self.data.read().unwrap_or_else(|e| e.into_inner())
        }

        /// Write access to the documents, with the same poison recovery as `docs`.
        fn docs_mut(&self) -> RwLockWriteGuard<'_, Docs> {
            self.data.write().unwrap_or_else(|e| e.into_inner())
        }

        /// Clones out every `(id, bytes)` pair for which `keep` returns true.
        fn collect_where(&self, keep: impl Fn(&str, &[u8]) -> bool) -> Vec<(String, Vec<u8>)> {
            self.docs()
                .iter()
                .filter(|(id, bytes)| keep(id, bytes))
                .map(|(id, bytes)| (id.clone(), bytes.clone()))
                .collect()
        }
    }

    impl Collection for InMemCollection {
        fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
            self.docs_mut().insert(doc_id.to_string(), data);
            Ok(())
        }

        fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
            Ok(self.docs().get(doc_id).cloned())
        }

        fn delete(&self, doc_id: &str) -> Result<()> {
            self.docs_mut().remove(doc_id);
            Ok(())
        }

        fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|_, _| true))
        }

        fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|_, bytes| predicate(bytes)))
        }

        // This test double treats the document ID itself as the geohash key.
        fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|id, _| id.starts_with(prefix)))
        }

        fn count(&self) -> Result<usize> {
            Ok(self.docs().len())
        }
    }

    #[test]
    fn test_collection_upsert_and_get() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1, 2, 3]).unwrap();

        let result = col.get("doc-1").unwrap();
        assert_eq!(result, Some(vec![1, 2, 3]));
    }

    #[test]
    fn test_collection_get_missing() {
        let col = InMemCollection::new();
        assert_eq!(col.get("nonexistent").unwrap(), None);
    }

    #[test]
    fn test_collection_upsert_overwrite() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.upsert("doc-1", vec![2]).unwrap();

        assert_eq!(col.get("doc-1").unwrap(), Some(vec![2]));
        assert_eq!(col.count().unwrap(), 1);
    }

    #[test]
    fn test_collection_delete() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.delete("doc-1").unwrap();

        assert_eq!(col.get("doc-1").unwrap(), None);
        assert_eq!(col.count().unwrap(), 0);
    }

    #[test]
    fn test_collection_delete_nonexistent_noop() {
        let col = InMemCollection::new();
        col.delete("nonexistent").unwrap(); // should not error
        assert_eq!(col.count().unwrap(), 0);
    }

    #[test]
    fn test_collection_scan() {
        let col = InMemCollection::new();
        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();

        // HashMap iteration order is unspecified, so sort before comparing.
        let mut results = col.scan().unwrap();
        results.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(
            results,
            vec![("a".to_string(), vec![1]), ("b".to_string(), vec![2])]
        );
    }

    #[test]
    fn test_collection_find() {
        let col = InMemCollection::new();
        col.upsert("big", vec![100, 200]).unwrap();
        col.upsert("small", vec![1]).unwrap();

        let results = col.find(Box::new(|bytes| bytes.len() > 1)).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "big");
    }

    #[test]
    fn test_collection_query_geohash_prefix() {
        let col = InMemCollection::new();
        col.upsert("9q8yy_a", vec![1]).unwrap();
        col.upsert("9q8yy_b", vec![2]).unwrap();
        col.upsert("u4pru_c", vec![3]).unwrap();

        let mut ids: Vec<String> = col
            .query_geohash_prefix("9q8yy")
            .unwrap()
            .into_iter()
            .map(|(id, _)| id)
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["9q8yy_a", "9q8yy_b"]);
    }

    #[test]
    fn test_collection_count() {
        let col = InMemCollection::new();
        assert_eq!(col.count().unwrap(), 0);

        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();
        assert_eq!(col.count().unwrap(), 2);
    }

    // Regression test: the scanning reads used to `unwrap()` the lock result
    // and panicked on a poisoned lock, while the other methods recovered.
    #[test]
    fn test_collection_reads_survive_poisoned_lock() {
        let col = Arc::new(InMemCollection::new());
        col.upsert("9q8yy_a", vec![1, 2]).unwrap();

        // Poison the lock by panicking while a write guard is held.
        let poisoner = Arc::clone(&col);
        let outcome = std::thread::spawn(move || {
            let _guard = poisoner.data.write().unwrap();
            panic!("poisoning the lock on purpose");
        })
        .join();
        assert!(outcome.is_err());
        assert!(col.data.is_poisoned());

        let expected = vec![("9q8yy_a".to_string(), vec![1u8, 2])];
        assert_eq!(col.get("9q8yy_a").unwrap(), Some(vec![1, 2]));
        assert_eq!(col.count().unwrap(), 1);
        assert_eq!(col.scan().unwrap(), expected);
        assert_eq!(col.find(Box::new(|bytes| bytes.len() == 2)).unwrap(), expected);
        assert_eq!(col.query_geohash_prefix("9q8").unwrap(), expected);
    }

    // --- In-memory StorageBackend for testing ---

    /// Minimal `StorageBackend` that creates `InMemCollection`s on demand.
    struct InMemBackend {
        collections: RwLock<HashMap<String, Arc<InMemCollection>>>,
    }

    impl InMemBackend {
        /// Creates a backend with no collections.
        fn new() -> Self {
            Self {
                collections: RwLock::new(HashMap::new()),
            }
        }
    }

    impl StorageBackend for InMemBackend {
        fn collection(&self, name: &str) -> Arc<dyn Collection> {
            let mut registry = self.collections.write().unwrap_or_else(|e| e.into_inner());
            // Reuse the existing collection for this name, or register a new one.
            let handle: Arc<InMemCollection> = registry
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(InMemCollection::new()))
                .clone();
            handle
        }

        fn list_collections(&self) -> Vec<String> {
            let registry = self.collections.read().unwrap_or_else(|e| e.into_inner());
            registry.keys().cloned().collect()
        }

        fn flush(&self) -> Result<()> {
            // Nothing is buffered in memory, so there is nothing to flush.
            Ok(())
        }

        fn close(self) -> Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_backend_collection_create_and_reuse() {
        let backend = InMemBackend::new();
        let col1 = backend.collection("cells");
        col1.upsert("c1", vec![1]).unwrap();

        // A second lookup by the same name must see the first handle's write.
        let col2 = backend.collection("cells");
        assert_eq!(col2.get("c1").unwrap(), Some(vec![1]));
    }

    #[test]
    fn test_backend_list_collections() {
        let backend = InMemBackend::new();
        let _ = backend.collection("cells");
        let _ = backend.collection("nodes");

        let mut names = backend.list_collections();
        names.sort();
        assert_eq!(names, vec!["cells", "nodes"]);
    }

    #[test]
    fn test_backend_flush() {
        let backend = InMemBackend::new();
        assert!(backend.flush().is_ok());
    }

    #[test]
    fn test_backend_close() {
        let backend = InMemBackend::new();
        assert!(backend.close().is_ok());
    }
}