1pub mod storage;
40pub mod tenant;
41pub mod wal;
42
43pub use storage::{PersistentStorage, StorageError, StorageResult};
44pub use tenant::{
45 AgentConfig, AutoEmbedConfig, LLMProvider, NLQConfig, ResourceQuotas, ResourceUsage, Tenant,
46 TenantError, TenantManager, TenantResult, ToolConfig,
47};
48pub use wal::{Wal, WalEntry, WalError, WalResult};
49
50use crate::graph::{Edge, GraphStore, Node, PropertyMap};
51use std::path::Path;
52use std::sync::Arc;
53use tracing::info;
55
/// Coordinates the persistence pieces of this module: on-disk storage, the
/// write-ahead log (WAL), and tenant bookkeeping, all rooted at one base
/// directory.
pub struct PersistenceManager {
    /// Root directory; `new` creates the `data`, `wal`, and `vectors`
    /// subdirectories underneath it.
    base_path: std::path::PathBuf,
    /// Persistent storage, opened on the `data` subdirectory.
    storage: Arc<PersistentStorage>,
    /// Write-ahead log, created on the `wal` subdirectory. Guarded by a mutex
    /// that is locked for each individual WAL operation.
    wal: Arc<std::sync::Mutex<Wal>>,
    /// Tenant manager consulted for quota checks and usage counters.
    tenants: Arc<TenantManager>,
}
67
68impl PersistenceManager {
69 pub fn new(base_path: impl AsRef<Path>) -> Result<Self, PersistenceError> {
71 let base_path = base_path.as_ref().to_path_buf();
72
73 let storage_path = base_path.join("data");
75 let wal_path = base_path.join("wal");
76 let vector_path = base_path.join("vectors");
77
78 std::fs::create_dir_all(&storage_path)?;
79 std::fs::create_dir_all(&wal_path)?;
80 std::fs::create_dir_all(&vector_path)?;
81
82 info!("Initializing persistence manager at: {:?}", base_path);
83
84 let storage = PersistentStorage::open(&storage_path)?;
86 info!("Storage initialized");
87
88 let wal = Wal::new(&wal_path)?;
90 info!("WAL initialized");
91
92 let tenants = TenantManager::new();
94 info!("Tenant manager initialized");
95
96 Ok(Self {
97 base_path,
98 storage: Arc::new(storage),
99 wal: Arc::new(std::sync::Mutex::new(wal)),
100 tenants: Arc::new(tenants),
101 })
102 }
103
104 pub fn tenants(&self) -> &TenantManager {
106 &self.tenants
107 }
108
109 pub fn start_indexer(
111 &self,
112 store: &GraphStore,
113 receiver: tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
114 ) {
115 let vector_index = Arc::clone(&store.vector_index);
116 let property_index = Arc::clone(&store.property_index);
117 let tenant_manager = Arc::clone(&self.tenants);
118
119 tokio::spawn(async move {
120 GraphStore::start_background_indexer(
121 receiver,
122 vector_index,
123 property_index,
124 tenant_manager,
125 )
126 .await;
127 });
128 }
129
130 pub fn persist_create_node(&self, tenant: &str, node: &Node) -> Result<(), PersistenceError> {
132 self.tenants.check_quota(tenant, "nodes")?;
134
135 let properties = bincode::serialize(&node.properties)?;
137
138 let entry = WalEntry::CreateNode {
140 tenant: tenant.to_string(),
141 node_id: node.id.as_u64(),
142 labels: node.labels.iter().map(|l| l.as_str().to_string()).collect(),
143 properties,
144 };
145 self.wal.lock().unwrap().append(entry)?;
146
147 self.storage.put_node(tenant, node)?;
149
150 self.tenants.increment_usage(tenant, "nodes", 1)?;
152
153 Ok(())
154 }
155
156 pub fn persist_create_edge(&self, tenant: &str, edge: &Edge) -> Result<(), PersistenceError> {
158 self.tenants.check_quota(tenant, "edges")?;
160
161 let properties = bincode::serialize(&edge.properties)?;
163
164 let entry = WalEntry::CreateEdge {
166 tenant: tenant.to_string(),
167 edge_id: edge.id.as_u64(),
168 source: edge.source.as_u64(),
169 target: edge.target.as_u64(),
170 edge_type: edge.edge_type.as_str().to_string(),
171 properties,
172 };
173 self.wal.lock().unwrap().append(entry)?;
174
175 self.storage.put_edge(tenant, edge)?;
177
178 self.tenants.increment_usage(tenant, "edges", 1)?;
180
181 Ok(())
182 }
183
184 pub fn persist_delete_node(&self, tenant: &str, node_id: u64) -> Result<(), PersistenceError> {
186 let entry = WalEntry::DeleteNode {
188 tenant: tenant.to_string(),
189 node_id,
190 };
191 self.wal.lock().unwrap().append(entry)?;
192
193 self.storage.delete_node(tenant, node_id)?;
195
196 self.tenants.decrement_usage(tenant, "nodes", 1)?;
198
199 Ok(())
200 }
201
202 pub fn persist_delete_edge(&self, tenant: &str, edge_id: u64) -> Result<(), PersistenceError> {
204 let entry = WalEntry::DeleteEdge {
206 tenant: tenant.to_string(),
207 edge_id,
208 };
209 self.wal.lock().unwrap().append(entry)?;
210
211 self.storage.delete_edge(tenant, edge_id)?;
213
214 self.tenants.decrement_usage(tenant, "edges", 1)?;
216
217 Ok(())
218 }
219
220 pub fn persist_update_node_properties(
222 &self,
223 tenant: &str,
224 node_id: u64,
225 properties: &PropertyMap,
226 ) -> Result<(), PersistenceError> {
227 let properties_bytes = bincode::serialize(properties)?;
229
230 let entry = WalEntry::UpdateNodeProperties {
232 tenant: tenant.to_string(),
233 node_id,
234 properties: properties_bytes,
235 };
236 self.wal.lock().unwrap().append(entry)?;
237
238 Ok(())
242 }
243
244 pub fn list_persisted_tenants(&self) -> Result<Vec<String>, PersistenceError> {
246 Ok(self.storage.list_persisted_tenants()?)
247 }
248
249 pub fn recover(&self, tenant: &str) -> Result<(Vec<Node>, Vec<Edge>), PersistenceError> {
251 info!("Starting recovery for tenant: {}", tenant);
252
253 let nodes = self.storage.scan_nodes(tenant)?;
255 info!("Recovered {} nodes from storage", nodes.len());
256
257 let edges = self.storage.scan_edges(tenant)?;
259 info!("Recovered {} edges from storage", edges.len());
260
261 self.tenants.increment_usage(tenant, "nodes", nodes.len())?;
263 self.tenants.increment_usage(tenant, "edges", edges.len())?;
264
265 Ok((nodes, edges))
266 }
267
268 pub fn checkpoint(&self) -> Result<(), PersistenceError> {
270 info!("Creating checkpoint");
271
272 self.wal.lock().unwrap().flush()?;
274
275 self.storage.flush()?;
277
278 let sequence = self.wal.lock().unwrap().current_sequence();
283 self.wal.lock().unwrap().checkpoint(sequence)?;
284
285 info!("Checkpoint created successfully");
286
287 Ok(())
288 }
289
290 pub fn flush(&self) -> Result<(), PersistenceError> {
292 self.wal.lock().unwrap().flush()?;
293 self.storage.flush()?;
294 Ok(())
295 }
296
297 pub fn storage(&self) -> &PersistentStorage {
299 &self.storage
300 }
301
302 pub fn checkpoint_vectors(
304 &self,
305 vector_index: &crate::vector::VectorIndexManager,
306 ) -> Result<(), PersistenceError> {
307 let vector_path = self.base_path.join("vectors");
308 vector_index
309 .dump_all(&vector_path)
310 .map_err(|e| PersistenceError::Io(std::io::Error::other(e.to_string())))
311 }
312
313 pub fn recover_vectors(
315 &self,
316 vector_index: &crate::vector::VectorIndexManager,
317 ) -> Result<(), PersistenceError> {
318 let vector_path = self.base_path.join("vectors");
319 vector_index
320 .load_all(&vector_path)
321 .map_err(|e| PersistenceError::Io(std::io::Error::other(e.to_string())))
322 }
323}
324
/// Errors returned by [`PersistenceManager`] operations.
///
/// Every variant wraps an underlying error and is marked `#[from]`, so the
/// wrapped error types convert into this enum through `?`.
#[derive(Debug, thiserror::Error)]
pub enum PersistenceError {
    /// Failure reported by the storage layer.
    #[error("Storage error: {0}")]
    Storage(#[from] StorageError),

    /// Failure reported by the write-ahead log.
    #[error("WAL error: {0}")]
    Wal(#[from] WalError),

    /// Failure reported by the tenant manager (quota checks, usage updates).
    #[error("Tenant error: {0}")]
    Tenant(#[from] TenantError),

    /// bincode failed to serialize a value.
    #[error("Serialization error: {0}")]
    Serialization(#[from] bincode::Error),

    /// I/O failure. Vector index dump/load failures are also mapped to this
    /// variant, carrying only the original error's string form.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}
343
/// Convenience alias for results whose error type is [`PersistenceError`].
pub type PersistenceResult<T> = Result<T, PersistenceError>;
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::{EdgeId, EdgeType, Label, NodeId, PropertyMap, PropertyValue};
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned as well so the directory stays alive for as
    /// long as the caller keeps the binding.
    fn fresh_manager() -> (TempDir, PersistenceManager) {
        let dir = TempDir::new().unwrap();
        let manager = PersistenceManager::new(dir.path()).unwrap();
        (dir, manager)
    }

    /// A newly created manager has the `default` tenant enabled.
    #[test]
    fn test_persistence_manager_creation() {
        let (_dir, manager) = fresh_manager();
        assert!(manager.tenants().is_tenant_enabled("default"));
    }

    /// A persisted node can be read back from storage.
    #[test]
    fn test_persist_node() {
        let (_dir, manager) = fresh_manager();

        let mut alice = Node::new(NodeId::new(1), Label::new("Person"));
        alice.set_property("name", "Alice");
        manager.persist_create_node("default", &alice).unwrap();

        let stored = manager.storage().get_node("default", 1).unwrap();
        assert!(stored.is_some());
    }

    /// Nodes written by one manager are recovered by a second manager opened
    /// on the same directory.
    #[test]
    fn test_recovery() {
        let dir = TempDir::new().unwrap();

        // First session: write five nodes and flush.
        {
            let writer = PersistenceManager::new(dir.path()).unwrap();
            for id in 1..=5 {
                let person = Node::new(NodeId::new(id), Label::new("Person"));
                writer.persist_create_node("default", &person).unwrap();
            }
            writer.flush().unwrap();
        }

        // Second session: reopen and recover.
        {
            let reader = PersistenceManager::new(dir.path()).unwrap();
            let (nodes, _edges) = reader.recover("default").unwrap();
            assert_eq!(nodes.len(), 5);
        }
    }

    /// Vectors dumped by `checkpoint_vectors` are searchable after
    /// `recover_vectors` loads them into a new index manager.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_vector_index_persistence() {
        use crate::graph::NodeId;
        use crate::vector::{DistanceMetric, VectorIndexManager};

        let (_dir, manager) = fresh_manager();

        let source_index = VectorIndexManager::new();
        source_index
            .create_index("Person", "embedding", 3, DistanceMetric::Cosine)
            .unwrap();
        source_index
            .add_vector("Person", "embedding", NodeId::new(1), &vec![1.0, 0.0, 0.0])
            .unwrap();
        source_index
            .add_vector("Person", "embedding", NodeId::new(2), &vec![0.0, 1.0, 0.0])
            .unwrap();
        source_index
            .add_vector("Person", "embedding", NodeId::new(3), &vec![0.0, 0.0, 1.0])
            .unwrap();

        manager.checkpoint_vectors(&source_index).unwrap();

        let restored_index = VectorIndexManager::new();
        manager.recover_vectors(&restored_index).unwrap();

        let hits = restored_index
            .search("Person", "embedding", &[1.0, 0.1, 0.0], 2)
            .unwrap();
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].0, NodeId::new(1));
    }

    /// Creating a node beyond a tenant's `max_nodes` quota fails.
    #[test]
    fn test_quota_enforcement() {
        let (_dir, manager) = fresh_manager();

        let mut limits = ResourceQuotas::default();
        limits.max_nodes = Some(3);
        manager
            .tenants()
            .create_tenant(
                "limited".to_string(),
                "Limited Tenant".to_string(),
                Some(limits),
            )
            .unwrap();

        // Fill the quota exactly.
        for id in 1..=3 {
            let allowed = Node::new(NodeId::new(id), Label::new("Test"));
            manager.persist_create_node("limited", &allowed).unwrap();
        }

        // The fourth node must be rejected.
        let overflow = Node::new(NodeId::new(4), Label::new("Test"));
        assert!(manager.persist_create_node("limited", &overflow).is_err());
    }

    /// An edge between two persisted nodes can be persisted.
    #[test]
    fn test_persist_create_edge() {
        let (_dir, manager) = fresh_manager();

        let first = Node::new(NodeId::new(1), Label::new("Person"));
        let second = Node::new(NodeId::new(2), Label::new("Person"));
        manager.persist_create_node("default", &first).unwrap();
        manager.persist_create_node("default", &second).unwrap();

        let knows = Edge::new(
            EdgeId::new(1),
            NodeId::new(1),
            NodeId::new(2),
            EdgeType::new("KNOWS"),
        );
        assert!(manager.persist_create_edge("default", &knows).is_ok());
    }

    /// Deleting a previously persisted node succeeds.
    #[test]
    fn test_persist_delete_node() {
        let (_dir, manager) = fresh_manager();

        let person = Node::new(NodeId::new(1), Label::new("Person"));
        manager.persist_create_node("default", &person).unwrap();

        assert!(manager.persist_delete_node("default", 1).is_ok());
    }

    /// Deleting a previously persisted edge succeeds.
    #[test]
    fn test_persist_delete_edge() {
        let (_dir, manager) = fresh_manager();

        let first = Node::new(NodeId::new(1), Label::new("A"));
        let second = Node::new(NodeId::new(2), Label::new("B"));
        manager.persist_create_node("default", &first).unwrap();
        manager.persist_create_node("default", &second).unwrap();

        let link = Edge::new(
            EdgeId::new(1),
            NodeId::new(1),
            NodeId::new(2),
            EdgeType::new("E"),
        );
        manager.persist_create_edge("default", &link).unwrap();

        assert!(manager.persist_delete_edge("default", 1).is_ok());
    }

    /// Recording a property update for a persisted node succeeds.
    #[test]
    fn test_persist_update_node_properties() {
        let (_dir, manager) = fresh_manager();

        let person = Node::new(NodeId::new(1), Label::new("Person"));
        manager.persist_create_node("default", &person).unwrap();

        let mut updates = PropertyMap::new();
        updates.insert(
            "name".to_string(),
            PropertyValue::String("Alice".to_string()),
        );

        assert!(manager
            .persist_update_node_properties("default", 1, &updates)
            .is_ok());
    }

    /// Listing persisted tenants succeeds after a node has been written.
    #[test]
    fn test_list_persisted_tenants() {
        let (_dir, manager) = fresh_manager();

        let sample = Node::new(NodeId::new(1), Label::new("Test"));
        manager.persist_create_node("default", &sample).unwrap();

        assert!(manager.list_persisted_tenants().is_ok());
    }
}