peat_mesh/storage/traits.rs
1//! Storage backend trait abstraction
2//!
3//! This module defines the core traits for Peat mesh's storage layer,
4//! enabling runtime backend selection between Ditto, Automerge, RocksDB, etc.
5//!
6//! # Design Philosophy
7//!
8//! - **Backend agnostic**: Business logic doesn't depend on specific storage implementation
9//! - **Type-safe**: Rust's type system enforces correct usage
10//! - **Flexible**: Easy to add new backends (Redb, LMDB, etc.)
11//! - **Testable**: Mock implementations for testing without real storage
12//!
13//! # Example
14//!
15//! ```ignore
16//! use peat_protocol::storage::{StorageBackend, create_storage_backend};
17//!
18//! // Create backend from configuration
19//! let config = StorageConfig::from_env()?;
20//! let storage = create_storage_backend(&config)?;
21//!
22//! // Get collection and perform operations
23//! let cells = storage.collection("cells");
24//! cells.upsert("cell-1", serialize(&cell_state)?)?;
25//!
26//! let cell_bytes = cells.get("cell-1")?.unwrap();
27//! let cell = deserialize::<CellState>(&cell_bytes)?;
28//! ```
29
30use anyhow::Result;
31use std::sync::Arc;
32
/// Boxed filter closure over a document's serialized bytes.
///
/// `Collection::find()` accepts one of these; the closure returns `true` for
/// every document that should be part of the result. The `Send` bound lets the
/// boxed closure be moved to another thread.
pub type DocumentPredicate = Box<dyn Fn(&[u8]) -> bool + Send>;
37
38/// Main storage backend trait
39///
40/// Implementations provide access to collections and manage the underlying storage.
41/// All implementations must be thread-safe (Send + Sync).
42///
43/// # Implementations
44///
45/// - **DittoBackend**: Wraps existing Ditto SDK (proprietary, production-ready)
46/// - **AutomergeInMemoryBackend**: In-memory Automerge (POC, testing)
47/// - **RocksDbBackend**: RocksDB persistence (production target)
48///
49/// # Thread Safety
50///
51/// All methods are safe to call from multiple threads. Implementations should use
52/// appropriate synchronization (Arc, RwLock, etc.) as needed.
53pub trait StorageBackend: Send + Sync {
54 /// Get or create a collection by name
55 ///
56 /// Collections are logical groupings of documents (e.g., "cells", "nodes").
57 /// Multiple calls with the same name return references to the same collection.
58 ///
59 /// # Arguments
60 ///
61 /// * `name` - Collection name (e.g., "cells", "nodes", "capabilities")
62 ///
63 /// # Returns
64 ///
65 /// A thread-safe collection handle
66 ///
67 /// # Example
68 ///
69 /// ```ignore
70 /// let cells = storage.collection("cells");
71 /// let nodes = storage.collection("nodes");
72 /// ```
73 fn collection(&self, name: &str) -> Arc<dyn Collection>;
74
75 /// List all collection names
76 ///
77 /// Returns names of all collections that have been created or contain documents.
78 ///
79 /// # Returns
80 ///
81 /// Vector of collection names (may be empty)
82 fn list_collections(&self) -> Vec<String>;
83
84 /// Flush any pending writes to disk
85 ///
86 /// For in-memory backends, this is a no-op. For persistent backends (RocksDB),
87 /// this ensures all writes are durable.
88 ///
89 /// # Returns
90 ///
91 /// Ok(()) on success, Err if flush fails
92 fn flush(&self) -> Result<()>;
93
94 /// Close the storage backend cleanly
95 ///
96 /// Implementations should flush pending writes and release resources.
97 /// After calling close(), the backend should not be used.
98 ///
99 /// # Returns
100 ///
101 /// Ok(()) on success, Err if close fails
102 fn close(self) -> Result<()>;
103}
104
105/// Collection trait for storing and querying documents
106///
107/// A collection is a logical grouping of documents (key-value pairs).
108/// Documents are stored as raw bytes (typically serialized protobuf).
109///
110/// # Document Storage Format
111///
112/// - **Key**: String document ID (e.g., "cell-1", "node-abc123")
113/// - **Value**: Raw bytes (serialized protobuf message)
114///
115/// # Thread Safety
116///
117/// All operations are thread-safe. Multiple threads can read/write concurrently.
118///
119/// # Example
120///
121/// ```ignore
122/// let cells = storage.collection("cells");
123///
124/// // Create and store a cell
125/// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
126/// let bytes = cell.encode_to_vec();
127/// cells.upsert("cell-1", bytes)?;
128///
129/// // Retrieve the cell
130/// if let Some(stored) = cells.get("cell-1")? {
131/// let cell = CellState::decode(&stored[..])?;
132/// println!("Retrieved cell: {}", cell.id);
133/// }
134///
135/// // Query all cells
136/// for (id, bytes) in cells.scan()? {
137/// let cell = CellState::decode(&bytes[..])?;
138/// println!("Cell {}: {:?}", id, cell);
139/// }
140/// ```
141pub trait Collection: Send + Sync {
142 /// Insert or update a document
143 ///
144 /// If a document with the given ID exists, it is replaced. Otherwise, a new
145 /// document is created.
146 ///
147 /// # Arguments
148 ///
149 /// * `doc_id` - Unique document identifier
150 /// * `data` - Serialized document bytes (typically protobuf)
151 ///
152 /// # Returns
153 ///
154 /// Ok(()) on success, Err if upsert fails
155 ///
156 /// # Example
157 ///
158 /// ```ignore
159 /// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
160 /// cells.upsert("cell-1", cell.encode_to_vec())?;
161 /// ```
162 fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()>;
163
164 /// Get a document by ID
165 ///
166 /// # Arguments
167 ///
168 /// * `doc_id` - Document identifier to retrieve
169 ///
170 /// # Returns
171 ///
172 /// - `Ok(Some(bytes))` if document exists
173 /// - `Ok(None)` if document not found
174 /// - `Err` if query fails
175 ///
176 /// # Example
177 ///
178 /// ```ignore
179 /// match cells.get("cell-1")? {
180 /// Some(bytes) => {
181 /// let cell = CellState::decode(&bytes[..])?;
182 /// println!("Found cell: {}", cell.id);
183 /// }
184 /// None => println!("Cell not found"),
185 /// }
186 /// ```
187 fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>>;
188
189 /// Delete a document by ID
190 ///
191 /// If the document doesn't exist, this is a no-op (not an error).
192 ///
193 /// # Arguments
194 ///
195 /// * `doc_id` - Document identifier to delete
196 ///
197 /// # Returns
198 ///
199 /// Ok(()) on success (whether or not document existed), Err if delete fails
200 ///
201 /// # Example
202 ///
203 /// ```ignore
204 /// cells.delete("cell-1")?;
205 /// ```
206 fn delete(&self, doc_id: &str) -> Result<()>;
207
208 /// Scan all documents in the collection
209 ///
210 /// Returns all documents as (id, bytes) tuples. Order is implementation-defined.
211 ///
212 /// # Performance
213 ///
214 /// This loads all documents into memory. For large collections, consider
215 /// streaming or pagination (future enhancement).
216 ///
217 /// # Returns
218 ///
219 /// Vector of (document_id, document_bytes) tuples
220 ///
221 /// # Example
222 ///
223 /// ```ignore
224 /// for (id, bytes) in cells.scan()? {
225 /// let cell = CellState::decode(&bytes[..])?;
226 /// println!("Cell {}: {:?}", id, cell);
227 /// }
228 /// ```
229 fn scan(&self) -> Result<Vec<(String, Vec<u8>)>>;
230
231 /// Find documents matching a predicate
232 ///
233 /// Filters documents by applying a predicate function to their serialized bytes.
234 /// Less efficient than indexed queries, but flexible.
235 ///
236 /// # Arguments
237 ///
238 /// * `predicate` - Function that returns true for documents to include
239 ///
240 /// # Returns
241 ///
242 /// Vector of matching (document_id, document_bytes) tuples
243 ///
244 /// # Example
245 ///
246 /// ```ignore
247 /// // Find all cells with status "active"
248 /// let active_cells = cells.find(Box::new(|bytes| {
249 /// if let Ok(cell) = CellState::decode(bytes) {
250 /// cell.status == CellStatus::Active as i32
251 /// } else {
252 /// false
253 /// }
254 /// }))?;
255 /// ```
256 fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>>;
257
258 /// Query documents by geohash prefix (proximity queries)
259 ///
260 /// Geohash is a hierarchical spatial index. Documents with the same prefix
261 /// are geographically close. Longer prefixes = smaller areas.
262 ///
263 /// # Arguments
264 ///
265 /// * `prefix` - Geohash prefix (e.g., "9q8y" for San Francisco area)
266 ///
267 /// # Returns
268 ///
269 /// Vector of matching (document_id, document_bytes) tuples
270 ///
271 /// # Example
272 ///
273 /// ```ignore
274 /// // Find all nodes within ~5km of a location
275 /// let nearby_nodes = nodes.query_geohash_prefix("9q8yy")?;
276 /// ```
277 ///
278 /// # Implementation Notes
279 ///
280 /// - For Ditto: Uses geohash index (efficient)
281 /// - For RocksDB: Uses prefix scan (requires geohash in key)
282 /// - For in-memory: Scans all documents (inefficient for large datasets)
283 fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
284
285 /// Count documents in the collection
286 ///
287 /// # Returns
288 ///
289 /// Number of documents in collection
290 ///
291 /// # Example
292 ///
293 /// ```ignore
294 /// let cell_count = cells.count()?;
295 /// println!("Total cells: {}", cell_count);
296 /// ```
297 fn count(&self) -> Result<usize>;
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303 use std::collections::HashMap;
304 use std::sync::RwLock;
305
306 // Test that traits are object-safe (can be used as trait objects)
307 #[test]
308 fn test_storage_backend_is_object_safe() {
309 fn _assert_object_safe(_: &dyn StorageBackend) {}
310 }
311
312 #[test]
313 fn test_collection_is_object_safe() {
314 fn _assert_object_safe(_: &dyn Collection) {}
315 }
316
317 // --- In-memory Collection for testing ---
318
319 struct InMemCollection {
320 data: RwLock<HashMap<String, Vec<u8>>>,
321 }
322
323 impl InMemCollection {
324 fn new() -> Self {
325 Self {
326 data: RwLock::new(HashMap::new()),
327 }
328 }
329 }
330
331 impl Collection for InMemCollection {
332 fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
333 self.data
334 .write()
335 .unwrap_or_else(|e| e.into_inner())
336 .insert(doc_id.to_string(), data);
337 Ok(())
338 }
339
340 fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
341 Ok(self
342 .data
343 .read()
344 .unwrap_or_else(|e| e.into_inner())
345 .get(doc_id)
346 .cloned())
347 }
348
349 fn delete(&self, doc_id: &str) -> Result<()> {
350 self.data
351 .write()
352 .unwrap_or_else(|e| e.into_inner())
353 .remove(doc_id);
354 Ok(())
355 }
356
357 fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
358 Ok(self
359 .data
360 .read()
361 .unwrap()
362 .iter()
363 .map(|(k, v)| (k.clone(), v.clone()))
364 .collect())
365 }
366
367 fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
368 Ok(self
369 .data
370 .read()
371 .unwrap()
372 .iter()
373 .filter(|(_, v)| predicate(v))
374 .map(|(k, v)| (k.clone(), v.clone()))
375 .collect())
376 }
377
378 fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
379 Ok(self
380 .data
381 .read()
382 .unwrap()
383 .iter()
384 .filter(|(k, _)| k.starts_with(prefix))
385 .map(|(k, v)| (k.clone(), v.clone()))
386 .collect())
387 }
388
389 fn count(&self) -> Result<usize> {
390 Ok(self.data.read().unwrap_or_else(|e| e.into_inner()).len())
391 }
392 }
393
394 #[test]
395 fn test_collection_upsert_and_get() {
396 let col = InMemCollection::new();
397 col.upsert("doc-1", vec![1, 2, 3]).unwrap();
398
399 let result = col.get("doc-1").unwrap();
400 assert_eq!(result, Some(vec![1, 2, 3]));
401 }
402
403 #[test]
404 fn test_collection_get_missing() {
405 let col = InMemCollection::new();
406 assert_eq!(col.get("nonexistent").unwrap(), None);
407 }
408
409 #[test]
410 fn test_collection_upsert_overwrite() {
411 let col = InMemCollection::new();
412 col.upsert("doc-1", vec![1]).unwrap();
413 col.upsert("doc-1", vec![2]).unwrap();
414
415 assert_eq!(col.get("doc-1").unwrap(), Some(vec![2]));
416 assert_eq!(col.count().unwrap(), 1);
417 }
418
419 #[test]
420 fn test_collection_delete() {
421 let col = InMemCollection::new();
422 col.upsert("doc-1", vec![1]).unwrap();
423 col.delete("doc-1").unwrap();
424
425 assert_eq!(col.get("doc-1").unwrap(), None);
426 assert_eq!(col.count().unwrap(), 0);
427 }
428
429 #[test]
430 fn test_collection_delete_nonexistent_noop() {
431 let col = InMemCollection::new();
432 col.delete("nonexistent").unwrap(); // should not error
433 }
434
435 #[test]
436 fn test_collection_scan() {
437 let col = InMemCollection::new();
438 col.upsert("a", vec![1]).unwrap();
439 col.upsert("b", vec![2]).unwrap();
440
441 let mut results = col.scan().unwrap();
442 results.sort_by(|a, b| a.0.cmp(&b.0));
443 assert_eq!(results.len(), 2);
444 assert_eq!(results[0], ("a".to_string(), vec![1]));
445 assert_eq!(results[1], ("b".to_string(), vec![2]));
446 }
447
448 #[test]
449 fn test_collection_find() {
450 let col = InMemCollection::new();
451 col.upsert("big", vec![100, 200]).unwrap();
452 col.upsert("small", vec![1]).unwrap();
453
454 let results = col.find(Box::new(|bytes| bytes.len() > 1)).unwrap();
455 assert_eq!(results.len(), 1);
456 assert_eq!(results[0].0, "big");
457 }
458
459 #[test]
460 fn test_collection_query_geohash_prefix() {
461 let col = InMemCollection::new();
462 col.upsert("9q8yy_a", vec![1]).unwrap();
463 col.upsert("9q8yy_b", vec![2]).unwrap();
464 col.upsert("u4pru_c", vec![3]).unwrap();
465
466 let results = col.query_geohash_prefix("9q8yy").unwrap();
467 assert_eq!(results.len(), 2);
468 }
469
470 #[test]
471 fn test_collection_count() {
472 let col = InMemCollection::new();
473 assert_eq!(col.count().unwrap(), 0);
474
475 col.upsert("a", vec![1]).unwrap();
476 col.upsert("b", vec![2]).unwrap();
477 assert_eq!(col.count().unwrap(), 2);
478 }
479
480 // --- In-memory StorageBackend for testing ---
481
482 struct InMemBackend {
483 collections: RwLock<HashMap<String, Arc<InMemCollection>>>,
484 }
485
486 impl InMemBackend {
487 fn new() -> Self {
488 Self {
489 collections: RwLock::new(HashMap::new()),
490 }
491 }
492 }
493
494 impl StorageBackend for InMemBackend {
495 fn collection(&self, name: &str) -> Arc<dyn Collection> {
496 let mut cols = self.collections.write().unwrap_or_else(|e| e.into_inner());
497 cols.entry(name.to_string())
498 .or_insert_with(|| Arc::new(InMemCollection::new()))
499 .clone()
500 }
501
502 fn list_collections(&self) -> Vec<String> {
503 self.collections
504 .read()
505 .unwrap_or_else(|e| e.into_inner())
506 .keys()
507 .cloned()
508 .collect()
509 }
510
511 fn flush(&self) -> Result<()> {
512 Ok(())
513 }
514
515 fn close(self) -> Result<()> {
516 Ok(())
517 }
518 }
519
520 #[test]
521 fn test_backend_collection_create_and_reuse() {
522 let backend = InMemBackend::new();
523 let col1 = backend.collection("cells");
524 col1.upsert("c1", vec![1]).unwrap();
525
526 let col2 = backend.collection("cells");
527 assert_eq!(col2.get("c1").unwrap(), Some(vec![1]));
528 }
529
530 #[test]
531 fn test_backend_list_collections() {
532 let backend = InMemBackend::new();
533 let _ = backend.collection("cells");
534 let _ = backend.collection("nodes");
535
536 let mut names = backend.list_collections();
537 names.sort();
538 assert_eq!(names, vec!["cells", "nodes"]);
539 }
540
541 #[test]
542 fn test_backend_flush() {
543 let backend = InMemBackend::new();
544 assert!(backend.flush().is_ok());
545 }
546
547 #[test]
548 fn test_backend_close() {
549 let backend = InMemBackend::new();
550 assert!(backend.close().is_ok());
551 }
552}