Skip to main content

peat_mesh/storage/
traits.rs

1//! Storage backend trait abstraction
2//!
3//! This module defines the core traits for Peat mesh's storage layer,
4//! enabling runtime backend selection (Automerge is the default production backend).
5//!
6//! # Design Philosophy
7//!
8//! - **Backend agnostic**: Business logic doesn't depend on specific storage implementation
9//! - **Type-safe**: Rust's type system enforces correct usage
10//! - **Flexible**: Easy to add new backends (Redb, LMDB, etc.)
11//! - **Testable**: Mock implementations for testing without real storage
12//!
13//! # Example
14//!
15//! ```ignore
16//! use peat_protocol::storage::{StorageBackend, create_storage_backend};
17//!
18//! // Create backend from configuration
19//! let config = StorageConfig::from_env()?;
20//! let storage = create_storage_backend(&config)?;
21//!
22//! // Get collection and perform operations
23//! let cells = storage.collection("cells");
24//! cells.upsert("cell-1", serialize(&cell_state)?)?;
25//!
26//! let cell_bytes = cells.get("cell-1")?.unwrap();
27//! let cell = deserialize::<CellState>(&cell_bytes)?;
28//! ```
29
30use anyhow::Result;
31use std::sync::Arc;
32
/// Boxed filter over a document's serialized bytes.
///
/// Handed to [`Collection::find`]: the closure is called with the raw bytes of a
/// document and returns `true` when that document should appear in the result.
/// The closure must be `Send`, so the boxed predicate may be moved to another thread.
pub type DocumentPredicate = Box<dyn for<'a> Fn(&'a [u8]) -> bool + Send>;
37
38/// Main storage backend trait
39///
40/// Implementations provide access to collections and manage the underlying storage.
41/// All implementations must be thread-safe (Send + Sync).
42///
43/// # Implementations
44///
45/// - **AutomergeStore**: Automerge CRDT with redb persistence (production)
46/// - **InMemoryBackend**: In-memory store for testing
47///
48/// # Thread Safety
49///
50/// All methods are safe to call from multiple threads. Implementations should use
51/// appropriate synchronization (Arc, RwLock, etc.) as needed.
52pub trait StorageBackend: Send + Sync {
53    /// Get or create a collection by name
54    ///
55    /// Collections are logical groupings of documents (e.g., "cells", "nodes").
56    /// Multiple calls with the same name return references to the same collection.
57    ///
58    /// # Arguments
59    ///
60    /// * `name` - Collection name (e.g., "cells", "nodes", "capabilities")
61    ///
62    /// # Returns
63    ///
64    /// A thread-safe collection handle
65    ///
66    /// # Example
67    ///
68    /// ```ignore
69    /// let cells = storage.collection("cells");
70    /// let nodes = storage.collection("nodes");
71    /// ```
72    fn collection(&self, name: &str) -> Arc<dyn Collection>;
73
74    /// List all collection names
75    ///
76    /// Returns names of all collections that have been created or contain documents.
77    ///
78    /// # Returns
79    ///
80    /// Vector of collection names (may be empty)
81    fn list_collections(&self) -> Vec<String>;
82
83    /// Flush any pending writes to disk
84    ///
85    /// For in-memory backends, this is a no-op. For persistent backends (RocksDB),
86    /// this ensures all writes are durable.
87    ///
88    /// # Returns
89    ///
90    /// Ok(()) on success, Err if flush fails
91    fn flush(&self) -> Result<()>;
92
93    /// Close the storage backend cleanly
94    ///
95    /// Implementations should flush pending writes and release resources.
96    /// After calling close(), the backend should not be used.
97    ///
98    /// # Returns
99    ///
100    /// Ok(()) on success, Err if close fails
101    fn close(self) -> Result<()>;
102}
103
/// Collection trait for storing and querying documents
///
/// A collection is a logical grouping of documents (key-value pairs).
/// Documents are stored as raw bytes (typically serialized protobuf). The trait's
/// signatures treat them as opaque `Vec<u8>`; encoding and decoding are the caller's job.
///
/// # Document Storage Format
///
/// - **Key**: String document ID (e.g., "cell-1", "node-abc123")
/// - **Value**: Raw bytes (serialized protobuf message)
///
/// # Thread Safety
///
/// The trait requires `Send + Sync` and every method takes `&self`. A single handle
/// (such as the `Arc<dyn Collection>` returned by `StorageBackend::collection`) can
/// therefore be shared and used from multiple threads concurrently. Implementations
/// are responsible for their own internal synchronization.
///
/// # Errors
///
/// Every method returns `anyhow::Result`. An `Err` means the underlying storage
/// operation failed. A missing document is *not* an error (see `get` and `delete`).
///
/// # Example
///
/// ```ignore
/// let cells = storage.collection("cells");
///
/// // Create and store a cell
/// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
/// let bytes = cell.encode_to_vec();
/// cells.upsert("cell-1", bytes)?;
///
/// // Retrieve the cell
/// if let Some(stored) = cells.get("cell-1")? {
///     let cell = CellState::decode(&stored[..])?;
///     println!("Retrieved cell: {}", cell.id);
/// }
///
/// // Query all cells
/// for (id, bytes) in cells.scan()? {
///     let cell = CellState::decode(&bytes[..])?;
///     println!("Cell {}: {:?}", id, cell);
/// }
/// ```
pub trait Collection: Send + Sync {
    /// Insert or update a document
    ///
    /// If a document with the given ID exists, its bytes are replaced entirely.
    /// Otherwise, a new document is created.
    ///
    /// # Arguments
    ///
    /// * `doc_id` - Unique document identifier
    /// * `data` - Serialized document bytes (typically protobuf); ownership is
    ///   transferred to the collection
    ///
    /// # Returns
    ///
    /// Ok(()) on success, Err if upsert fails
    ///
    /// # Example
    ///
    /// ```ignore
    /// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
    /// cells.upsert("cell-1", cell.encode_to_vec())?;
    /// ```
    fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()>;

    /// Get a document by ID
    ///
    /// Returns an owned copy of the stored bytes.
    ///
    /// # Arguments
    ///
    /// * `doc_id` - Document identifier to retrieve
    ///
    /// # Returns
    ///
    /// - `Ok(Some(bytes))` if document exists
    /// - `Ok(None)` if document not found
    /// - `Err` if query fails
    ///
    /// # Example
    ///
    /// ```ignore
    /// match cells.get("cell-1")? {
    ///     Some(bytes) => {
    ///         let cell = CellState::decode(&bytes[..])?;
    ///         println!("Found cell: {}", cell.id);
    ///     }
    ///     None => println!("Cell not found"),
    /// }
    /// ```
    fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>>;

    /// Delete a document by ID
    ///
    /// If the document doesn't exist, this is a no-op (not an error).
    ///
    /// # Arguments
    ///
    /// * `doc_id` - Document identifier to delete
    ///
    /// # Returns
    ///
    /// Ok(()) on success (whether or not document existed), Err if delete fails
    ///
    /// # Example
    ///
    /// ```ignore
    /// cells.delete("cell-1")?;
    /// ```
    fn delete(&self, doc_id: &str) -> Result<()>;

    /// Scan all documents in the collection
    ///
    /// Returns all documents as (id, bytes) tuples. Order is implementation-defined,
    /// so callers that need a stable order must sort the result themselves.
    ///
    /// # Performance
    ///
    /// This loads all documents into memory. For large collections, consider
    /// streaming or pagination (future enhancement).
    ///
    /// # Returns
    ///
    /// Vector of (document_id, document_bytes) tuples
    ///
    /// # Example
    ///
    /// ```ignore
    /// for (id, bytes) in cells.scan()? {
    ///     let cell = CellState::decode(&bytes[..])?;
    ///     println!("Cell {}: {:?}", id, cell);
    /// }
    /// ```
    fn scan(&self) -> Result<Vec<(String, Vec<u8>)>>;

    /// Find documents matching a predicate
    ///
    /// Calls `predicate` with each document's serialized bytes and returns the
    /// documents for which it returns `true`. The document ID is not passed to the
    /// predicate. Because the predicate is an arbitrary closure, it cannot be served
    /// by an index, which makes this less efficient than indexed queries but flexible.
    ///
    /// # Arguments
    ///
    /// * `predicate` - Function that returns true for documents to include
    ///
    /// # Returns
    ///
    /// Vector of matching (document_id, document_bytes) tuples
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Find all cells with status "active"
    /// let active_cells = cells.find(Box::new(|bytes| {
    ///     if let Ok(cell) = CellState::decode(bytes) {
    ///         cell.status == CellStatus::Active as i32
    ///     } else {
    ///         false
    ///     }
    /// }))?;
    /// ```
    fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>>;

    /// Query documents by geohash prefix (proximity queries)
    ///
    /// Geohash is a hierarchical spatial index. Documents with the same prefix
    /// are geographically close. Longer prefixes = smaller areas.
    ///
    /// A prefix selects a rectangular geohash cell, not a radius around a point.
    /// Documents just across a cell boundary are not returned. Which part of a
    /// document the prefix is matched against is up to the implementation (see
    /// Implementation Notes below).
    ///
    /// # Arguments
    ///
    /// * `prefix` - Geohash prefix (e.g., "9q8y" for San Francisco area)
    ///
    /// # Returns
    ///
    /// Vector of matching (document_id, document_bytes) tuples
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Find all nodes in the same 5-character geohash cell (roughly 4.9 km x 4.9 km)
    /// let nearby_nodes = nodes.query_geohash_prefix("9q8yy")?;
    /// ```
    ///
    /// # Implementation Notes
    ///
    /// - For Automerge/redb: Uses prefix scan (requires geohash in key)
    /// - For in-memory: Scans all documents (inefficient for large datasets)
    fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;

    /// Count documents in the collection
    ///
    /// # Returns
    ///
    /// Number of documents in collection
    ///
    /// # Example
    ///
    /// ```ignore
    /// let cell_count = cells.count()?;
    /// println!("Total cells: {}", cell_count);
    /// ```
    fn count(&self) -> Result<usize>;
}
297
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

    // Compile-time checks that both traits are object-safe (usable as trait objects).
    #[test]
    fn test_storage_backend_is_object_safe() {
        fn _assert_object_safe(_: &dyn StorageBackend) {}
    }

    #[test]
    fn test_collection_is_object_safe() {
        fn _assert_object_safe(_: &dyn Collection) {}
    }

    // --- In-memory Collection for testing ---

    /// Minimal `Collection` test double: a `HashMap` behind an `RwLock`.
    struct InMemCollection {
        data: RwLock<HashMap<String, Vec<u8>>>,
    }

    impl InMemCollection {
        fn new() -> Self {
            Self {
                data: RwLock::new(HashMap::new()),
            }
        }

        /// Acquire the read lock, recovering the map if a previous holder panicked.
        ///
        /// All reads go through this helper so that lock poisoning is handled the
        /// same way by every method (previously `scan`/`find`/`query_geohash_prefix`
        /// panicked on a poisoned lock while the other methods recovered).
        fn read_map(&self) -> RwLockReadGuard<'_, HashMap<String, Vec<u8>>> {
            self.data.read().unwrap_or_else(|e| e.into_inner())
        }

        /// Acquire the write lock, recovering the map if a previous holder panicked.
        fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<String, Vec<u8>>> {
            self.data.write().unwrap_or_else(|e| e.into_inner())
        }

        /// Clone out every `(id, bytes)` entry for which `keep` returns `true`.
        fn collect_where(&self, keep: impl Fn(&str, &[u8]) -> bool) -> Vec<(String, Vec<u8>)> {
            self.read_map()
                .iter()
                .filter(|(k, v)| keep(k.as_str(), v.as_slice()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        }
    }

    impl Collection for InMemCollection {
        fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
            self.write_map().insert(doc_id.to_string(), data);
            Ok(())
        }

        fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
            Ok(self.read_map().get(doc_id).cloned())
        }

        fn delete(&self, doc_id: &str) -> Result<()> {
            self.write_map().remove(doc_id);
            Ok(())
        }

        fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|_, _| true))
        }

        fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|_, bytes| predicate(bytes)))
        }

        // This test double matches the geohash prefix against the document ID.
        fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_where(|id, _| id.starts_with(prefix)))
        }

        fn count(&self) -> Result<usize> {
            Ok(self.read_map().len())
        }
    }

    #[test]
    fn test_collection_upsert_and_get() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1, 2, 3]).unwrap();

        let result = col.get("doc-1").unwrap();
        assert_eq!(result, Some(vec![1, 2, 3]));
    }

    #[test]
    fn test_collection_get_missing() {
        let col = InMemCollection::new();
        assert_eq!(col.get("nonexistent").unwrap(), None);
    }

    #[test]
    fn test_collection_upsert_overwrite() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.upsert("doc-1", vec![2]).unwrap();

        assert_eq!(col.get("doc-1").unwrap(), Some(vec![2]));
        assert_eq!(col.count().unwrap(), 1);
    }

    #[test]
    fn test_collection_delete() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.delete("doc-1").unwrap();

        assert_eq!(col.get("doc-1").unwrap(), None);
        assert_eq!(col.count().unwrap(), 0);
    }

    #[test]
    fn test_collection_delete_nonexistent_noop() {
        let col = InMemCollection::new();
        col.delete("nonexistent").unwrap(); // should not error
    }

    #[test]
    fn test_collection_scan() {
        let col = InMemCollection::new();
        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();

        let mut results = col.scan().unwrap();
        results.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], ("a".to_string(), vec![1]));
        assert_eq!(results[1], ("b".to_string(), vec![2]));
    }

    #[test]
    fn test_collection_find() {
        let col = InMemCollection::new();
        col.upsert("big", vec![100, 200]).unwrap();
        col.upsert("small", vec![1]).unwrap();

        let results = col.find(Box::new(|bytes| bytes.len() > 1)).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "big");
    }

    #[test]
    fn test_collection_query_geohash_prefix() {
        let col = InMemCollection::new();
        col.upsert("9q8yy_a", vec![1]).unwrap();
        col.upsert("9q8yy_b", vec![2]).unwrap();
        col.upsert("u4pru_c", vec![3]).unwrap();

        // Check *which* documents matched, not just how many.
        let mut ids: Vec<String> = col
            .query_geohash_prefix("9q8yy")
            .unwrap()
            .into_iter()
            .map(|(id, _)| id)
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["9q8yy_a", "9q8yy_b"]);
    }

    #[test]
    fn test_collection_count() {
        let col = InMemCollection::new();
        assert_eq!(col.count().unwrap(), 0);

        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();
        assert_eq!(col.count().unwrap(), 2);
    }

    #[test]
    fn test_collection_recovers_from_poisoned_lock() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();

        // Poison the lock by panicking while the write guard is held.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = col.data.write().unwrap();
            panic!("poison the lock");
        }));
        assert!(outcome.is_err());
        assert!(col.data.is_poisoned());

        // Every operation keeps working on the recovered data.
        assert_eq!(col.scan().unwrap(), vec![("doc-1".to_string(), vec![1])]);
        assert_eq!(col.find(Box::new(|_: &[u8]| true)).unwrap().len(), 1);
        assert_eq!(col.query_geohash_prefix("doc").unwrap().len(), 1);
        assert_eq!(col.get("doc-1").unwrap(), Some(vec![1]));
        assert_eq!(col.count().unwrap(), 1);
    }

    // --- In-memory StorageBackend for testing ---

    /// Minimal `StorageBackend` test double holding `InMemCollection`s by name.
    struct InMemBackend {
        collections: RwLock<HashMap<String, Arc<InMemCollection>>>,
    }

    impl InMemBackend {
        fn new() -> Self {
            Self {
                collections: RwLock::new(HashMap::new()),
            }
        }
    }

    impl StorageBackend for InMemBackend {
        fn collection(&self, name: &str) -> Arc<dyn Collection> {
            let mut cols = self.collections.write().unwrap_or_else(|e| e.into_inner());
            // Reuse the existing collection so repeated calls share the same data.
            cols.entry(name.to_string())
                .or_insert_with(|| Arc::new(InMemCollection::new()))
                .clone()
        }

        fn list_collections(&self) -> Vec<String> {
            self.collections
                .read()
                .unwrap_or_else(|e| e.into_inner())
                .keys()
                .cloned()
                .collect()
        }

        fn flush(&self) -> Result<()> {
            Ok(())
        }

        fn close(self) -> Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_backend_collection_create_and_reuse() {
        let backend = InMemBackend::new();
        let col1 = backend.collection("cells");
        col1.upsert("c1", vec![1]).unwrap();

        let col2 = backend.collection("cells");
        assert_eq!(col2.get("c1").unwrap(), Some(vec![1]));
    }

    #[test]
    fn test_backend_list_collections() {
        let backend = InMemBackend::new();
        let _ = backend.collection("cells");
        let _ = backend.collection("nodes");

        let mut names = backend.list_collections();
        names.sort();
        assert_eq!(names, vec!["cells", "nodes"]);
    }

    #[test]
    fn test_backend_flush() {
        let backend = InMemBackend::new();
        assert!(backend.flush().is_ok());
    }

    #[test]
    fn test_backend_close() {
        let backend = InMemBackend::new();
        assert!(backend.close().is_ok());
    }
}
546    fn test_backend_close() {
547        let backend = InMemBackend::new();
548        assert!(backend.close().is_ok());
549    }
550}