Skip to main content

graphmind/persistence/
mod.rs

1//! # Persistence Layer
2//!
3//! ## ACID guarantees
4//!
5//! Database persistence is built around the ACID properties:
6//! - **Atomicity**: operations either fully complete or fully roll back — no partial writes
7//! - **Consistency**: the database always moves from one valid state to another
8//! - **Isolation**: concurrent transactions don't interfere with each other
9//! - **Durability**: once a write is committed, it survives crashes and power loss
10//!
11//! ## Write-Ahead Log (WAL)
12//!
13//! The WAL is the standard technique for durability. The idea: write the operation to a
14//! sequential log file BEFORE modifying the actual data. If the process crashes mid-write,
15//! the log can be replayed on recovery to reconstruct the correct state. This same pattern
16//! is used by PostgreSQL, SQLite, and RocksDB internally.
17//!
18//! ## RocksDB
19//!
20//! RocksDB is Facebook's embedded key-value store, evolved from Google's LevelDB. It uses
21//! an LSM-tree (Log-Structured Merge-tree) architecture optimized for write-heavy workloads:
22//! writes go to an in-memory buffer (memtable), which periodically flushes to sorted disk
23//! files (SSTables) that are compacted in the background. Column families provide logical
24//! separation within a single database instance — each is an independent LSM-tree.
25//!
26//! ## Multi-tenancy
27//!
28//! Multiple isolated "tenants" share one database process. Each tenant gets its own RocksDB
29//! column family (like a namespace), ensuring data isolation. Resource quotas (max nodes,
30//! max edges, storage limits) prevent any single tenant from monopolizing shared resources.
31//!
32//! ## PersistenceManager
33//!
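//! The `PersistenceManager` orchestrates WAL + RocksDB + TenantManager. Node and edge
//! creations/deletions are appended to the WAL first (for durability) and then applied to
//! RocksDB (for indexed storage); node property updates are currently only logged to the WAL.
//! `PersistenceManager::recover` rebuilds a tenant's nodes and edges from RocksDB alone — the WAL
//! keeps the logged operations so that entries written after the last checkpoint can be replayed,
//! but that replay is not performed by `recover` itself.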
38
39pub mod storage;
40pub mod tenant;
41pub mod wal;
42
43pub use storage::{PersistentStorage, StorageError, StorageResult};
44pub use tenant::{
45    AgentConfig, AutoEmbedConfig, LLMProvider, NLQConfig, ResourceQuotas, ResourceUsage, Tenant,
46    TenantError, TenantManager, TenantResult, ToolConfig,
47};
48pub use wal::{Wal, WalEntry, WalError, WalResult};
49
50use crate::graph::{Edge, GraphStore, Node, PropertyMap};
51use std::path::Path;
52use std::sync::Arc;
53// warn removed - was unused import causing compiler warning
54use tracing::info;
55
56/// Integrated persistence manager combining WAL, storage, and tenancy
57pub struct PersistenceManager {
58    /// Base path for all data
59    base_path: std::path::PathBuf,
60    /// RocksDB storage
61    storage: Arc<PersistentStorage>,
62    /// Write-Ahead Log
63    wal: Arc<std::sync::Mutex<Wal>>,
64    /// Tenant manager
65    tenants: Arc<TenantManager>,
66}
67
68impl PersistenceManager {
69    /// Create a new persistence manager
70    pub fn new(base_path: impl AsRef<Path>) -> Result<Self, PersistenceError> {
71        let base_path = base_path.as_ref().to_path_buf();
72
73        // Create subdirectories
74        let storage_path = base_path.join("data");
75        let wal_path = base_path.join("wal");
76        let vector_path = base_path.join("vectors");
77
78        std::fs::create_dir_all(&storage_path)?;
79        std::fs::create_dir_all(&wal_path)?;
80        std::fs::create_dir_all(&vector_path)?;
81
82        info!("Initializing persistence manager at: {:?}", base_path);
83
84        // Initialize storage
85        let storage = PersistentStorage::open(&storage_path)?;
86        info!("Storage initialized");
87
88        // Initialize WAL
89        let wal = Wal::new(&wal_path)?;
90        info!("WAL initialized");
91
92        // Initialize tenant manager
93        let tenants = TenantManager::new();
94        info!("Tenant manager initialized");
95
96        Ok(Self {
97            base_path,
98            storage: Arc::new(storage),
99            wal: Arc::new(std::sync::Mutex::new(wal)),
100            tenants: Arc::new(tenants),
101        })
102    }
103
104    /// Get tenant manager
105    pub fn tenants(&self) -> &TenantManager {
106        &self.tenants
107    }
108
109    /// Start the background indexer for a store
110    pub fn start_indexer(
111        &self,
112        store: &GraphStore,
113        receiver: tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
114    ) {
115        let vector_index = Arc::clone(&store.vector_index);
116        let property_index = Arc::clone(&store.property_index);
117        let tenant_manager = Arc::clone(&self.tenants);
118
119        tokio::spawn(async move {
120            GraphStore::start_background_indexer(
121                receiver,
122                vector_index,
123                property_index,
124                tenant_manager,
125            )
126            .await;
127        });
128    }
129
130    /// Persist a node creation
131    pub fn persist_create_node(&self, tenant: &str, node: &Node) -> Result<(), PersistenceError> {
132        // Check tenant quota
133        self.tenants.check_quota(tenant, "nodes")?;
134
135        // Serialize properties
136        let properties = bincode::serialize(&node.properties)?;
137
138        // Write to WAL
139        let entry = WalEntry::CreateNode {
140            tenant: tenant.to_string(),
141            node_id: node.id.as_u64(),
142            labels: node.labels.iter().map(|l| l.as_str().to_string()).collect(),
143            properties,
144        };
145        self.wal.lock().unwrap().append(entry)?;
146
147        // Write to storage
148        self.storage.put_node(tenant, node)?;
149
150        // Update usage
151        self.tenants.increment_usage(tenant, "nodes", 1)?;
152
153        Ok(())
154    }
155
156    /// Persist an edge creation
157    pub fn persist_create_edge(&self, tenant: &str, edge: &Edge) -> Result<(), PersistenceError> {
158        // Check tenant quota
159        self.tenants.check_quota(tenant, "edges")?;
160
161        // Serialize properties
162        let properties = bincode::serialize(&edge.properties)?;
163
164        // Write to WAL
165        let entry = WalEntry::CreateEdge {
166            tenant: tenant.to_string(),
167            edge_id: edge.id.as_u64(),
168            source: edge.source.as_u64(),
169            target: edge.target.as_u64(),
170            edge_type: edge.edge_type.as_str().to_string(),
171            properties,
172        };
173        self.wal.lock().unwrap().append(entry)?;
174
175        // Write to storage
176        self.storage.put_edge(tenant, edge)?;
177
178        // Update usage
179        self.tenants.increment_usage(tenant, "edges", 1)?;
180
181        Ok(())
182    }
183
184    /// Persist a node deletion
185    pub fn persist_delete_node(&self, tenant: &str, node_id: u64) -> Result<(), PersistenceError> {
186        // Write to WAL
187        let entry = WalEntry::DeleteNode {
188            tenant: tenant.to_string(),
189            node_id,
190        };
191        self.wal.lock().unwrap().append(entry)?;
192
193        // Write to storage
194        self.storage.delete_node(tenant, node_id)?;
195
196        // Update usage
197        self.tenants.decrement_usage(tenant, "nodes", 1)?;
198
199        Ok(())
200    }
201
202    /// Persist an edge deletion
203    pub fn persist_delete_edge(&self, tenant: &str, edge_id: u64) -> Result<(), PersistenceError> {
204        // Write to WAL
205        let entry = WalEntry::DeleteEdge {
206            tenant: tenant.to_string(),
207            edge_id,
208        };
209        self.wal.lock().unwrap().append(entry)?;
210
211        // Write to storage
212        self.storage.delete_edge(tenant, edge_id)?;
213
214        // Update usage
215        self.tenants.decrement_usage(tenant, "edges", 1)?;
216
217        Ok(())
218    }
219
220    /// Update node properties
221    pub fn persist_update_node_properties(
222        &self,
223        tenant: &str,
224        node_id: u64,
225        properties: &PropertyMap,
226    ) -> Result<(), PersistenceError> {
227        // Serialize properties
228        let properties_bytes = bincode::serialize(properties)?;
229
230        // Write to WAL
231        let entry = WalEntry::UpdateNodeProperties {
232            tenant: tenant.to_string(),
233            node_id,
234            properties: properties_bytes,
235        };
236        self.wal.lock().unwrap().append(entry)?;
237
238        // Note: Full node update would require getting the node first
239        // This is a simplified implementation
240
241        Ok(())
242    }
243
244    /// List all tenants that have persisted data in RocksDB
245    pub fn list_persisted_tenants(&self) -> Result<Vec<String>, PersistenceError> {
246        Ok(self.storage.list_persisted_tenants()?)
247    }
248
249    /// Recover from storage and WAL
250    pub fn recover(&self, tenant: &str) -> Result<(Vec<Node>, Vec<Edge>), PersistenceError> {
251        info!("Starting recovery for tenant: {}", tenant);
252
253        // Load nodes from storage
254        let nodes = self.storage.scan_nodes(tenant)?;
255        info!("Recovered {} nodes from storage", nodes.len());
256
257        // Load edges from storage
258        let edges = self.storage.scan_edges(tenant)?;
259        info!("Recovered {} edges from storage", edges.len());
260
261        // Update resource usage
262        self.tenants.increment_usage(tenant, "nodes", nodes.len())?;
263        self.tenants.increment_usage(tenant, "edges", edges.len())?;
264
265        Ok((nodes, edges))
266    }
267
268    /// Create a checkpoint
269    pub fn checkpoint(&self) -> Result<(), PersistenceError> {
270        info!("Creating checkpoint");
271
272        // Flush WAL
273        self.wal.lock().unwrap().flush()?;
274
275        // Flush storage
276        self.storage.flush()?;
277
278        // Create WAL checkpoint with the actual current sequence number
279        // Previously this was hardcoded to 0, which caused misleading output
280        // in the banking demo where WAL checkpoint always showed "sequence 0"
281        // even after writing thousands of entries
282        let sequence = self.wal.lock().unwrap().current_sequence();
283        self.wal.lock().unwrap().checkpoint(sequence)?;
284
285        info!("Checkpoint created successfully");
286
287        Ok(())
288    }
289
290    /// Flush all pending writes
291    pub fn flush(&self) -> Result<(), PersistenceError> {
292        self.wal.lock().unwrap().flush()?;
293        self.storage.flush()?;
294        Ok(())
295    }
296
297    /// Get storage reference
298    pub fn storage(&self) -> &PersistentStorage {
299        &self.storage
300    }
301
302    /// Save vector indices to disk
303    pub fn checkpoint_vectors(
304        &self,
305        vector_index: &crate::vector::VectorIndexManager,
306    ) -> Result<(), PersistenceError> {
307        let vector_path = self.base_path.join("vectors");
308        vector_index
309            .dump_all(&vector_path)
310            .map_err(|e| PersistenceError::Io(std::io::Error::other(e.to_string())))
311    }
312
313    /// Load vector indices from disk
314    pub fn recover_vectors(
315        &self,
316        vector_index: &crate::vector::VectorIndexManager,
317    ) -> Result<(), PersistenceError> {
318        let vector_path = self.base_path.join("vectors");
319        vector_index
320            .load_all(&vector_path)
321            .map_err(|e| PersistenceError::Io(std::io::Error::other(e.to_string())))
322    }
323}
324
325/// Persistence errors
326#[derive(Debug, thiserror::Error)]
327pub enum PersistenceError {
328    #[error("Storage error: {0}")]
329    Storage(#[from] StorageError),
330
331    #[error("WAL error: {0}")]
332    Wal(#[from] WalError),
333
334    #[error("Tenant error: {0}")]
335    Tenant(#[from] TenantError),
336
337    #[error("Serialization error: {0}")]
338    Serialization(#[from] bincode::Error),
339
340    #[error("I/O error: {0}")]
341    Io(#[from] std::io::Error),
342}
343
344pub type PersistenceResult<T> = Result<T, PersistenceError>;
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::{EdgeId, EdgeType, Label, NodeId, PropertyMap, PropertyValue};
    use tempfile::TempDir;

    #[test]
    fn test_persistence_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();
        assert!(manager.tenants().is_tenant_enabled("default"));
    }

    #[test]
    fn test_persist_node() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        let mut node = Node::new(NodeId::new(1), Label::new("Person"));
        node.set_property("name", "Alice");

        manager.persist_create_node("default", &node).unwrap();

        // Verify it was persisted
        let retrieved = manager.storage().get_node("default", 1).unwrap();
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_recovery() {
        let temp_dir = TempDir::new().unwrap();

        // Create and persist some data
        {
            let manager = PersistenceManager::new(temp_dir.path()).unwrap();

            for i in 1..=5 {
                let node = Node::new(NodeId::new(i), Label::new("Person"));
                manager.persist_create_node("default", &node).unwrap();
            }

            manager.flush().unwrap();
        }

        // Recover in a new manager instance
        {
            let manager = PersistenceManager::new(temp_dir.path()).unwrap();
            let (nodes, _edges) = manager.recover("default").unwrap();
            assert_eq!(nodes.len(), 5);
        }
    }

    #[test]
    fn test_checkpoint_then_recover() {
        let temp_dir = TempDir::new().unwrap();

        // Write data and checkpoint it.
        {
            let manager = PersistenceManager::new(temp_dir.path()).unwrap();

            for i in 1..=3 {
                let node = Node::new(NodeId::new(i), Label::new("Person"));
                manager.persist_create_node("default", &node).unwrap();
            }
            let edge = Edge::new(
                EdgeId::new(1),
                NodeId::new(1),
                NodeId::new(2),
                EdgeType::new("KNOWS"),
            );
            manager.persist_create_edge("default", &edge).unwrap();

            manager.checkpoint().unwrap();
        }

        // Everything written before the checkpoint must survive a reopen.
        {
            let manager = PersistenceManager::new(temp_dir.path()).unwrap();
            let (nodes, edges) = manager.recover("default").unwrap();
            assert_eq!(nodes.len(), 3);
            assert_eq!(edges.len(), 1);
        }
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_vector_index_persistence() {
        use crate::vector::{DistanceMetric, VectorIndexManager};

        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        // Create and populate a vector index
        let vim = VectorIndexManager::new();
        vim.create_index("Person", "embedding", 3, DistanceMetric::Cosine)
            .unwrap();
        vim.add_vector("Person", "embedding", NodeId::new(1), &vec![1.0, 0.0, 0.0])
            .unwrap();
        vim.add_vector("Person", "embedding", NodeId::new(2), &vec![0.0, 1.0, 0.0])
            .unwrap();
        vim.add_vector("Person", "embedding", NodeId::new(3), &vec![0.0, 0.0, 1.0])
            .unwrap();

        // Checkpoint vectors to disk
        manager.checkpoint_vectors(&vim).unwrap();

        // Load vectors into a fresh manager
        let vim2 = VectorIndexManager::new();
        manager.recover_vectors(&vim2).unwrap();

        // Verify search works after recovery
        let results = vim2
            .search("Person", "embedding", &[1.0, 0.1, 0.0], 2)
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, NodeId::new(1));
    }

    #[test]
    fn test_quota_enforcement() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        // Create tenant with small quota
        let mut quotas = ResourceQuotas::default();
        quotas.max_nodes = Some(3);
        manager
            .tenants()
            .create_tenant(
                "limited".to_string(),
                "Limited Tenant".to_string(),
                Some(quotas),
            )
            .unwrap();

        // Should succeed for first 3 nodes
        for i in 1..=3 {
            let node = Node::new(NodeId::new(i), Label::new("Test"));
            manager.persist_create_node("limited", &node).unwrap();
        }

        // 4th should fail
        let node = Node::new(NodeId::new(4), Label::new("Test"));
        let result = manager.persist_create_node("limited", &node);
        assert!(result.is_err());
    }

    // ========== Additional Persistence Tests ==========

    #[test]
    fn test_persist_create_edge() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        // Create two nodes first
        let n1 = Node::new(NodeId::new(1), Label::new("Person"));
        let n2 = Node::new(NodeId::new(2), Label::new("Person"));
        manager.persist_create_node("default", &n1).unwrap();
        manager.persist_create_node("default", &n2).unwrap();

        // Create edge
        let edge = Edge::new(
            EdgeId::new(1),
            NodeId::new(1),
            NodeId::new(2),
            EdgeType::new("KNOWS"),
        );
        manager.persist_create_edge("default", &edge).unwrap();

        // The edge must actually be in storage, not just accepted.
        let edges = manager.storage().scan_edges("default").unwrap();
        assert_eq!(edges.len(), 1);
    }

    #[test]
    fn test_persist_delete_node() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        let node = Node::new(NodeId::new(1), Label::new("Person"));
        manager.persist_create_node("default", &node).unwrap();

        manager.persist_delete_node("default", 1).unwrap();

        // The node must be gone from storage.
        let retrieved = manager.storage().get_node("default", 1).unwrap();
        assert!(retrieved.is_none());
    }

    #[test]
    fn test_persist_delete_edge() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        let n1 = Node::new(NodeId::new(1), Label::new("A"));
        let n2 = Node::new(NodeId::new(2), Label::new("B"));
        manager.persist_create_node("default", &n1).unwrap();
        manager.persist_create_node("default", &n2).unwrap();

        let edge = Edge::new(
            EdgeId::new(1),
            NodeId::new(1),
            NodeId::new(2),
            EdgeType::new("E"),
        );
        manager.persist_create_edge("default", &edge).unwrap();

        manager.persist_delete_edge("default", 1).unwrap();

        // The edge must be gone from storage.
        let edges = manager.storage().scan_edges("default").unwrap();
        assert!(edges.is_empty());
    }

    #[test]
    fn test_persist_update_node_properties() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        let node = Node::new(NodeId::new(1), Label::new("Person"));
        manager.persist_create_node("default", &node).unwrap();

        let mut props = PropertyMap::new();
        props.insert(
            "name".to_string(),
            PropertyValue::String("Alice".to_string()),
        );

        let result = manager.persist_update_node_properties("default", 1, &props);
        assert!(result.is_ok());
    }

    #[test]
    fn test_list_persisted_tenants() {
        let temp_dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(temp_dir.path()).unwrap();

        // Persist to default tenant
        let node = Node::new(NodeId::new(1), Label::new("Test"));
        manager.persist_create_node("default", &node).unwrap();

        let tenants = manager.list_persisted_tenants();
        assert!(tenants.is_ok());
    }
}