peat_mesh/storage/traits.rs
1//! Storage backend trait abstraction
2//!
3//! This module defines the core traits for Peat mesh's storage layer,
4//! enabling runtime backend selection (Automerge is the default production backend).
5//!
6//! # Design Philosophy
7//!
8//! - **Backend agnostic**: Business logic doesn't depend on specific storage implementation
9//! - **Type-safe**: Rust's type system enforces correct usage
10//! - **Flexible**: Easy to add new backends (Redb, LMDB, etc.)
11//! - **Testable**: Mock implementations for testing without real storage
12//!
13//! # Example
14//!
15//! ```ignore
16//! use peat_protocol::storage::{StorageBackend, create_storage_backend};
17//!
18//! // Create backend from configuration
19//! let config = StorageConfig::from_env()?;
20//! let storage = create_storage_backend(&config)?;
21//!
22//! // Get collection and perform operations
23//! let cells = storage.collection("cells");
24//! cells.upsert("cell-1", serialize(&cell_state)?)?;
25//!
26//! let cell_bytes = cells.get("cell-1")?.unwrap();
27//! let cell = deserialize::<CellState>(&cell_bytes)?;
28//! ```
29
30use anyhow::Result;
31use std::sync::Arc;
32
/// Boxed filter applied to a document's raw serialized bytes.
///
/// Passed by value to `Collection::find()`; the predicate returns `true` for
/// every document that should be included in the result. The closure must be
/// `Send`, so it may be moved to whichever thread performs the scan.
pub type DocumentPredicate = Box<dyn Fn(&[u8]) -> bool + Send>;
37
38/// Main storage backend trait
39///
40/// Implementations provide access to collections and manage the underlying storage.
41/// All implementations must be thread-safe (Send + Sync).
42///
43/// # Implementations
44///
45/// - **AutomergeStore**: Automerge CRDT with redb persistence (production)
46/// - **InMemoryBackend**: In-memory store for testing
47///
48/// # Thread Safety
49///
50/// All methods are safe to call from multiple threads. Implementations should use
51/// appropriate synchronization (Arc, RwLock, etc.) as needed.
52pub trait StorageBackend: Send + Sync {
53 /// Get or create a collection by name
54 ///
55 /// Collections are logical groupings of documents (e.g., "cells", "nodes").
56 /// Multiple calls with the same name return references to the same collection.
57 ///
58 /// # Arguments
59 ///
60 /// * `name` - Collection name (e.g., "cells", "nodes", "capabilities")
61 ///
62 /// # Returns
63 ///
64 /// A thread-safe collection handle
65 ///
66 /// # Example
67 ///
68 /// ```ignore
69 /// let cells = storage.collection("cells");
70 /// let nodes = storage.collection("nodes");
71 /// ```
72 fn collection(&self, name: &str) -> Arc<dyn Collection>;
73
74 /// List all collection names
75 ///
76 /// Returns names of all collections that have been created or contain documents.
77 ///
78 /// # Returns
79 ///
80 /// Vector of collection names (may be empty)
81 fn list_collections(&self) -> Vec<String>;
82
83 /// Flush any pending writes to disk
84 ///
85 /// For in-memory backends, this is a no-op. For persistent backends (RocksDB),
86 /// this ensures all writes are durable.
87 ///
88 /// # Returns
89 ///
90 /// Ok(()) on success, Err if flush fails
91 fn flush(&self) -> Result<()>;
92
93 /// Close the storage backend cleanly
94 ///
95 /// Implementations should flush pending writes and release resources.
96 /// After calling close(), the backend should not be used.
97 ///
98 /// # Returns
99 ///
100 /// Ok(()) on success, Err if close fails
101 fn close(self) -> Result<()>;
102}
103
104/// Collection trait for storing and querying documents
105///
106/// A collection is a logical grouping of documents (key-value pairs).
107/// Documents are stored as raw bytes (typically serialized protobuf).
108///
109/// # Document Storage Format
110///
111/// - **Key**: String document ID (e.g., "cell-1", "node-abc123")
112/// - **Value**: Raw bytes (serialized protobuf message)
113///
114/// # Thread Safety
115///
116/// All operations are thread-safe. Multiple threads can read/write concurrently.
117///
118/// # Example
119///
120/// ```ignore
121/// let cells = storage.collection("cells");
122///
123/// // Create and store a cell
124/// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
125/// let bytes = cell.encode_to_vec();
126/// cells.upsert("cell-1", bytes)?;
127///
128/// // Retrieve the cell
129/// if let Some(stored) = cells.get("cell-1")? {
130/// let cell = CellState::decode(&stored[..])?;
131/// println!("Retrieved cell: {}", cell.id);
132/// }
133///
134/// // Query all cells
135/// for (id, bytes) in cells.scan()? {
136/// let cell = CellState::decode(&bytes[..])?;
137/// println!("Cell {}: {:?}", id, cell);
138/// }
139/// ```
140pub trait Collection: Send + Sync {
141 /// Insert or update a document
142 ///
143 /// If a document with the given ID exists, it is replaced. Otherwise, a new
144 /// document is created.
145 ///
146 /// # Arguments
147 ///
148 /// * `doc_id` - Unique document identifier
149 /// * `data` - Serialized document bytes (typically protobuf)
150 ///
151 /// # Returns
152 ///
153 /// Ok(()) on success, Err if upsert fails
154 ///
155 /// # Example
156 ///
157 /// ```ignore
158 /// let cell = CellState { id: "cell-1".to_string(), ..Default::default() };
159 /// cells.upsert("cell-1", cell.encode_to_vec())?;
160 /// ```
161 fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()>;
162
163 /// Get a document by ID
164 ///
165 /// # Arguments
166 ///
167 /// * `doc_id` - Document identifier to retrieve
168 ///
169 /// # Returns
170 ///
171 /// - `Ok(Some(bytes))` if document exists
172 /// - `Ok(None)` if document not found
173 /// - `Err` if query fails
174 ///
175 /// # Example
176 ///
177 /// ```ignore
178 /// match cells.get("cell-1")? {
179 /// Some(bytes) => {
180 /// let cell = CellState::decode(&bytes[..])?;
181 /// println!("Found cell: {}", cell.id);
182 /// }
183 /// None => println!("Cell not found"),
184 /// }
185 /// ```
186 fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>>;
187
188 /// Delete a document by ID
189 ///
190 /// If the document doesn't exist, this is a no-op (not an error).
191 ///
192 /// # Arguments
193 ///
194 /// * `doc_id` - Document identifier to delete
195 ///
196 /// # Returns
197 ///
198 /// Ok(()) on success (whether or not document existed), Err if delete fails
199 ///
200 /// # Example
201 ///
202 /// ```ignore
203 /// cells.delete("cell-1")?;
204 /// ```
205 fn delete(&self, doc_id: &str) -> Result<()>;
206
207 /// Scan all documents in the collection
208 ///
209 /// Returns all documents as (id, bytes) tuples. Order is implementation-defined.
210 ///
211 /// # Performance
212 ///
213 /// This loads all documents into memory. For large collections, consider
214 /// streaming or pagination (future enhancement).
215 ///
216 /// # Returns
217 ///
218 /// Vector of (document_id, document_bytes) tuples
219 ///
220 /// # Example
221 ///
222 /// ```ignore
223 /// for (id, bytes) in cells.scan()? {
224 /// let cell = CellState::decode(&bytes[..])?;
225 /// println!("Cell {}: {:?}", id, cell);
226 /// }
227 /// ```
228 fn scan(&self) -> Result<Vec<(String, Vec<u8>)>>;
229
230 /// Find documents matching a predicate
231 ///
232 /// Filters documents by applying a predicate function to their serialized bytes.
233 /// Less efficient than indexed queries, but flexible.
234 ///
235 /// # Arguments
236 ///
237 /// * `predicate` - Function that returns true for documents to include
238 ///
239 /// # Returns
240 ///
241 /// Vector of matching (document_id, document_bytes) tuples
242 ///
243 /// # Example
244 ///
245 /// ```ignore
246 /// // Find all cells with status "active"
247 /// let active_cells = cells.find(Box::new(|bytes| {
248 /// if let Ok(cell) = CellState::decode(bytes) {
249 /// cell.status == CellStatus::Active as i32
250 /// } else {
251 /// false
252 /// }
253 /// }))?;
254 /// ```
255 fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>>;
256
257 /// Query documents by geohash prefix (proximity queries)
258 ///
259 /// Geohash is a hierarchical spatial index. Documents with the same prefix
260 /// are geographically close. Longer prefixes = smaller areas.
261 ///
262 /// # Arguments
263 ///
264 /// * `prefix` - Geohash prefix (e.g., "9q8y" for San Francisco area)
265 ///
266 /// # Returns
267 ///
268 /// Vector of matching (document_id, document_bytes) tuples
269 ///
270 /// # Example
271 ///
272 /// ```ignore
273 /// // Find all nodes within ~5km of a location
274 /// let nearby_nodes = nodes.query_geohash_prefix("9q8yy")?;
275 /// ```
276 ///
277 /// # Implementation Notes
278 ///
279 /// - For Automerge/redb: Uses prefix scan (requires geohash in key)
280 /// - For in-memory: Scans all documents (inefficient for large datasets)
281 fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
282
283 /// Count documents in the collection
284 ///
285 /// # Returns
286 ///
287 /// Number of documents in collection
288 ///
289 /// # Example
290 ///
291 /// ```ignore
292 /// let cell_count = cells.count()?;
293 /// println!("Total cells: {}", cell_count);
294 /// ```
295 fn count(&self) -> Result<usize>;
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

    // Object-safety checks: these only need to compile. If either trait stops
    // being usable as `dyn Trait`, the build fails.
    #[test]
    fn test_storage_backend_is_object_safe() {
        fn _assert_object_safe(_: &dyn StorageBackend) {}
    }

    #[test]
    fn test_collection_is_object_safe() {
        fn _assert_object_safe(_: &dyn Collection) {}
    }

    // --- In-memory Collection for testing ---

    /// Minimal `HashMap`-backed `Collection` used to exercise the trait contract.
    struct InMemCollection {
        data: RwLock<HashMap<String, Vec<u8>>>,
    }

    impl InMemCollection {
        fn new() -> Self {
            Self {
                data: RwLock::new(HashMap::new()),
            }
        }

        /// Acquire a read guard, recovering the map if a previous holder panicked.
        ///
        /// Every mutation below is a single `insert`/`remove`, so a poisoned map
        /// is still consistent. All accessors go through this helper so they
        /// handle poisoning the same way.
        fn read_map(&self) -> RwLockReadGuard<'_, HashMap<String, Vec<u8>>> {
            self.data.read().unwrap_or_else(|e| e.into_inner())
        }

        /// Acquire a write guard, recovering from poisoning like `read_map`.
        fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<String, Vec<u8>>> {
            self.data.write().unwrap_or_else(|e| e.into_inner())
        }

        /// Clone every `(id, bytes)` entry for which `keep(id, bytes)` is true.
        fn collect_matching(&self, keep: impl Fn(&str, &[u8]) -> bool) -> Vec<(String, Vec<u8>)> {
            self.read_map()
                .iter()
                .filter(|(k, v)| keep(k.as_str(), v.as_slice()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        }
    }

    impl Collection for InMemCollection {
        fn upsert(&self, doc_id: &str, data: Vec<u8>) -> Result<()> {
            self.write_map().insert(doc_id.to_string(), data);
            Ok(())
        }

        fn get(&self, doc_id: &str) -> Result<Option<Vec<u8>>> {
            Ok(self.read_map().get(doc_id).cloned())
        }

        fn delete(&self, doc_id: &str) -> Result<()> {
            self.write_map().remove(doc_id);
            Ok(())
        }

        fn scan(&self) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_matching(|_, _| true))
        }

        fn find(&self, predicate: DocumentPredicate) -> Result<Vec<(String, Vec<u8>)>> {
            Ok(self.collect_matching(|_, bytes| predicate(bytes)))
        }

        fn query_geohash_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
            // Test double: the geohash is assumed to be the leading part of the key.
            Ok(self.collect_matching(|id, _| id.starts_with(prefix)))
        }

        fn count(&self) -> Result<usize> {
            Ok(self.read_map().len())
        }
    }

    #[test]
    fn test_collection_upsert_and_get() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1, 2, 3]).unwrap();

        let result = col.get("doc-1").unwrap();
        assert_eq!(result, Some(vec![1, 2, 3]));
    }

    #[test]
    fn test_collection_get_missing() {
        let col = InMemCollection::new();
        assert_eq!(col.get("nonexistent").unwrap(), None);
    }

    #[test]
    fn test_collection_upsert_overwrite() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.upsert("doc-1", vec![2]).unwrap();

        assert_eq!(col.get("doc-1").unwrap(), Some(vec![2]));
        assert_eq!(col.count().unwrap(), 1);
    }

    #[test]
    fn test_collection_delete() {
        let col = InMemCollection::new();
        col.upsert("doc-1", vec![1]).unwrap();
        col.delete("doc-1").unwrap();

        assert_eq!(col.get("doc-1").unwrap(), None);
        assert_eq!(col.count().unwrap(), 0);
    }

    #[test]
    fn test_collection_delete_nonexistent_noop() {
        let col = InMemCollection::new();
        col.delete("nonexistent").unwrap(); // should not error
    }

    #[test]
    fn test_collection_scan() {
        let col = InMemCollection::new();
        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();

        let mut results = col.scan().unwrap();
        results.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], ("a".to_string(), vec![1]));
        assert_eq!(results[1], ("b".to_string(), vec![2]));
    }

    #[test]
    fn test_collection_find() {
        let col = InMemCollection::new();
        col.upsert("big", vec![100, 200]).unwrap();
        col.upsert("small", vec![1]).unwrap();

        let results = col.find(Box::new(|bytes| bytes.len() > 1)).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "big");
    }

    #[test]
    fn test_collection_query_geohash_prefix() {
        let col = InMemCollection::new();
        col.upsert("9q8yy_a", vec![1]).unwrap();
        col.upsert("9q8yy_b", vec![2]).unwrap();
        col.upsert("u4pru_c", vec![3]).unwrap();

        // Check *which* documents matched, not just how many.
        let mut ids: Vec<String> = col
            .query_geohash_prefix("9q8yy")
            .unwrap()
            .into_iter()
            .map(|(id, _)| id)
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["9q8yy_a", "9q8yy_b"]);
    }

    #[test]
    fn test_collection_count() {
        let col = InMemCollection::new();
        assert_eq!(col.count().unwrap(), 0);

        col.upsert("a", vec![1]).unwrap();
        col.upsert("b", vec![2]).unwrap();
        assert_eq!(col.count().unwrap(), 2);
    }

    #[test]
    fn test_collection_recovers_from_poisoned_lock() {
        let col = Arc::new(InMemCollection::new());
        col.upsert("a", vec![1]).unwrap();

        // Poison the lock by panicking on another thread while holding the write guard.
        let poisoner = Arc::clone(&col);
        let joined = std::thread::spawn(move || {
            let _guard = poisoner.data.write().unwrap();
            panic!("intentional panic to poison the lock");
        })
        .join();
        assert!(joined.is_err());
        assert!(col.data.is_poisoned());

        // Every accessor must keep working on the recovered data.
        assert_eq!(col.get("a").unwrap(), Some(vec![1]));
        assert_eq!(col.scan().unwrap(), vec![("a".to_string(), vec![1])]);
        assert_eq!(col.find(Box::new(|bytes| !bytes.is_empty())).unwrap().len(), 1);
        assert_eq!(col.query_geohash_prefix("a").unwrap().len(), 1);
        assert_eq!(col.count().unwrap(), 1);
    }

    // --- In-memory StorageBackend for testing ---

    struct InMemBackend {
        collections: RwLock<HashMap<String, Arc<InMemCollection>>>,
    }

    impl InMemBackend {
        fn new() -> Self {
            Self {
                collections: RwLock::new(HashMap::new()),
            }
        }
    }

    impl StorageBackend for InMemBackend {
        fn collection(&self, name: &str) -> Arc<dyn Collection> {
            let mut cols = self.collections.write().unwrap_or_else(|e| e.into_inner());
            cols.entry(name.to_string())
                .or_insert_with(|| Arc::new(InMemCollection::new()))
                .clone()
        }

        fn list_collections(&self) -> Vec<String> {
            self.collections
                .read()
                .unwrap_or_else(|e| e.into_inner())
                .keys()
                .cloned()
                .collect()
        }

        fn flush(&self) -> Result<()> {
            Ok(())
        }

        fn close(self) -> Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_backend_collection_create_and_reuse() {
        let backend = InMemBackend::new();
        let col1 = backend.collection("cells");
        col1.upsert("c1", vec![1]).unwrap();

        let col2 = backend.collection("cells");
        assert_eq!(col2.get("c1").unwrap(), Some(vec![1]));
    }

    #[test]
    fn test_backend_list_collections() {
        let backend = InMemBackend::new();
        let _ = backend.collection("cells");
        let _ = backend.collection("nodes");

        let mut names = backend.list_collections();
        names.sort();
        assert_eq!(names, vec!["cells", "nodes"]);
    }

    #[test]
    fn test_backend_flush() {
        let backend = InMemBackend::new();
        assert!(backend.flush().is_ok());
    }

    #[test]
    fn test_backend_close() {
        let backend = InMemBackend::new();
        assert!(backend.close().is_ok());
    }
}