grafeo_engine/database.rs
1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::query::cache::QueryCache;
19use crate::session::Session;
20use crate::transaction::TransactionManager;
21
22/// Your handle to a Grafeo database.
23///
24/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
25/// quick experiments, or [`open()`](Self::open) for persistent storage.
26/// Then grab a [`session()`](Self::session) to start querying.
27///
28/// # Examples
29///
30/// ```
31/// use grafeo_engine::GrafeoDB;
32///
33/// // Quick in-memory database
34/// let db = GrafeoDB::new_in_memory();
35///
36/// // Add some data
37/// db.create_node(&["Person"]);
38///
39/// // Query it
40/// let session = db.session();
41/// let result = session.execute("MATCH (p:Person) RETURN p")?;
42/// # Ok::<(), grafeo_common::utils::error::Error>(())
43/// ```
44pub struct GrafeoDB {
45 /// Database configuration.
46 config: Config,
47 /// The underlying graph store.
48 store: Arc<LpgStore>,
49 /// RDF triple store (if RDF feature is enabled).
50 #[cfg(feature = "rdf")]
51 rdf_store: Arc<RdfStore>,
52 /// Transaction manager.
53 tx_manager: Arc<TransactionManager>,
54 /// Unified buffer manager.
55 buffer_manager: Arc<BufferManager>,
56 /// Write-ahead log manager (if durability is enabled).
57 wal: Option<Arc<WalManager>>,
58 /// Query cache for parsed and optimized plans.
59 query_cache: Arc<QueryCache>,
60 /// Whether the database is open.
61 is_open: RwLock<bool>,
62}
63
64impl GrafeoDB {
65 /// Creates an in-memory database - fast to create, gone when dropped.
66 ///
67 /// Use this for tests, experiments, or when you don't need persistence.
68 /// For data that survives restarts, use [`open()`](Self::open) instead.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// use grafeo_engine::GrafeoDB;
74 ///
75 /// let db = GrafeoDB::new_in_memory();
76 /// let session = db.session();
77 /// session.execute("INSERT (:Person {name: 'Alice'})")?;
78 /// # Ok::<(), grafeo_common::utils::error::Error>(())
79 /// ```
80 #[must_use]
81 pub fn new_in_memory() -> Self {
82 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
83 }
84
85 /// Opens a database at the given path, creating it if it doesn't exist.
86 ///
87 /// If you've used this path before, Grafeo recovers your data from the
88 /// write-ahead log automatically. First open on a new path creates an
89 /// empty database.
90 ///
91 /// # Errors
92 ///
93 /// Returns an error if the path isn't writable or recovery fails.
94 ///
95 /// # Examples
96 ///
97 /// ```no_run
98 /// use grafeo_engine::GrafeoDB;
99 ///
100 /// let db = GrafeoDB::open("./my_social_network")?;
101 /// # Ok::<(), grafeo_common::utils::error::Error>(())
102 /// ```
103 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104 Self::with_config(Config::persistent(path.as_ref()))
105 }
106
107 /// Creates a database with custom configuration.
108 ///
109 /// Use this when you need fine-grained control over memory limits,
110 /// thread counts, or persistence settings. For most cases,
111 /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
112 /// are simpler.
113 ///
114 /// # Errors
115 ///
116 /// Returns an error if the database can't be created or recovery fails.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use grafeo_engine::{GrafeoDB, Config};
122 ///
123 /// // In-memory with a 512MB limit
124 /// let config = Config::in_memory()
125 /// .with_memory_limit(512 * 1024 * 1024);
126 ///
127 /// let db = GrafeoDB::with_config(config)?;
128 /// # Ok::<(), grafeo_common::utils::error::Error>(())
129 /// ```
130 pub fn with_config(config: Config) -> Result<Self> {
131 let store = Arc::new(LpgStore::new());
132 #[cfg(feature = "rdf")]
133 let rdf_store = Arc::new(RdfStore::new());
134 let tx_manager = Arc::new(TransactionManager::new());
135
136 // Create buffer manager with configured limits
137 let buffer_config = BufferManagerConfig {
138 budget: config.memory_limit.unwrap_or_else(|| {
139 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
140 }),
141 spill_path: config
142 .spill_path
143 .clone()
144 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
145 ..BufferManagerConfig::default()
146 };
147 let buffer_manager = BufferManager::new(buffer_config);
148
149 // Initialize WAL if persistence is enabled
150 let wal = if config.wal_enabled {
151 if let Some(ref db_path) = config.path {
152 // Create database directory if it doesn't exist
153 std::fs::create_dir_all(db_path)?;
154
155 let wal_path = db_path.join("wal");
156
157 // Check if WAL exists and recover if needed
158 if wal_path.exists() {
159 let recovery = WalRecovery::new(&wal_path);
160 let records = recovery.recover()?;
161 Self::apply_wal_records(&store, &records)?;
162 }
163
164 // Open/create WAL manager
165 let wal_config = WalConfig::default();
166 let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
167 Some(Arc::new(wal_manager))
168 } else {
169 None
170 }
171 } else {
172 None
173 };
174
175 // Create query cache with default capacity (1000 queries)
176 let query_cache = Arc::new(QueryCache::default());
177
178 Ok(Self {
179 config,
180 store,
181 #[cfg(feature = "rdf")]
182 rdf_store,
183 tx_manager,
184 buffer_manager,
185 wal,
186 query_cache,
187 is_open: RwLock::new(true),
188 })
189 }
190
191 /// Applies WAL records to restore the database state.
192 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
193 for record in records {
194 match record {
195 WalRecord::CreateNode { id, labels } => {
196 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
197 store.create_node_with_id(*id, &label_refs);
198 }
199 WalRecord::DeleteNode { id } => {
200 store.delete_node(*id);
201 }
202 WalRecord::CreateEdge {
203 id,
204 src,
205 dst,
206 edge_type,
207 } => {
208 store.create_edge_with_id(*id, *src, *dst, edge_type);
209 }
210 WalRecord::DeleteEdge { id } => {
211 store.delete_edge(*id);
212 }
213 WalRecord::SetNodeProperty { id, key, value } => {
214 store.set_node_property(*id, key, value.clone());
215 }
216 WalRecord::SetEdgeProperty { id, key, value } => {
217 store.set_edge_property(*id, key, value.clone());
218 }
219 WalRecord::AddNodeLabel { id, label } => {
220 store.add_label(*id, label);
221 }
222 WalRecord::RemoveNodeLabel { id, label } => {
223 store.remove_label(*id, label);
224 }
225 WalRecord::TxCommit { .. }
226 | WalRecord::TxAbort { .. }
227 | WalRecord::Checkpoint { .. } => {
228 // Transaction control records don't need replay action
229 // (recovery already filtered to only committed transactions)
230 }
231 }
232 }
233 Ok(())
234 }
235
236 /// Opens a new session for running queries.
237 ///
238 /// Sessions are cheap to create - spin up as many as you need. Each
239 /// gets its own transaction context, so concurrent sessions won't
240 /// block each other on reads.
241 ///
242 /// # Examples
243 ///
244 /// ```
245 /// use grafeo_engine::GrafeoDB;
246 ///
247 /// let db = GrafeoDB::new_in_memory();
248 /// let session = db.session();
249 ///
250 /// // Run queries through the session
251 /// let result = session.execute("MATCH (n) RETURN count(n)")?;
252 /// # Ok::<(), grafeo_common::utils::error::Error>(())
253 /// ```
254 #[must_use]
255 pub fn session(&self) -> Session {
256 #[cfg(feature = "rdf")]
257 {
258 Session::with_rdf_store_and_adaptive(
259 Arc::clone(&self.store),
260 Arc::clone(&self.rdf_store),
261 Arc::clone(&self.tx_manager),
262 Arc::clone(&self.query_cache),
263 self.config.adaptive.clone(),
264 self.config.factorized_execution,
265 )
266 }
267 #[cfg(not(feature = "rdf"))]
268 {
269 Session::with_adaptive(
270 Arc::clone(&self.store),
271 Arc::clone(&self.tx_manager),
272 Arc::clone(&self.query_cache),
273 self.config.adaptive.clone(),
274 self.config.factorized_execution,
275 )
276 }
277 }
278
279 /// Returns the adaptive execution configuration.
280 #[must_use]
281 pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
282 &self.config.adaptive
283 }
284
285 /// Runs a query directly on the database.
286 ///
287 /// A convenience method that creates a temporary session behind the
288 /// scenes. If you're running multiple queries, grab a
289 /// [`session()`](Self::session) instead to avoid the overhead.
290 ///
291 /// # Errors
292 ///
293 /// Returns an error if parsing or execution fails.
294 pub fn execute(&self, query: &str) -> Result<QueryResult> {
295 let session = self.session();
296 session.execute(query)
297 }
298
299 /// Executes a query with parameters and returns the result.
300 ///
301 /// # Errors
302 ///
303 /// Returns an error if the query fails.
304 pub fn execute_with_params(
305 &self,
306 query: &str,
307 params: std::collections::HashMap<String, grafeo_common::types::Value>,
308 ) -> Result<QueryResult> {
309 let session = self.session();
310 session.execute_with_params(query, params)
311 }
312
/// Executes a Cypher query in a one-off session.
///
/// Only available with the `cypher` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_cypher` reports.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_cypher(query)
}
323
/// Executes a parameterised Cypher query and returns the result.
///
/// Only available with the `cypher` feature. The query is handed to a
/// [`QueryProcessor`](crate::query::processor::QueryProcessor) built
/// directly on the LPG store, with `params` passed by reference.
///
/// NOTE(review): unlike the sibling `*_with_params` methods, this one does
/// not go through [`session()`](Self::session); confirm that bypassing the
/// session is intended.
///
/// # Errors
///
/// Returns whatever error the query processor reports.
#[cfg(feature = "cypher")]
pub fn execute_cypher_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::{QueryLanguage, QueryProcessor};

    // Build a processor over the shared LPG store.
    let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
    // Fixed: the argument had been mangled into `¶ms` (a mis-decoded
    // `&params`), which is not valid Rust.
    processor.process(query, QueryLanguage::Cypher, Some(&params))
}
341
/// Executes a Gremlin query in a one-off session.
///
/// Only available with the `gremlin` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_gremlin` reports.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_gremlin(query)
}
352
/// Executes a parameterised Gremlin query in a one-off session.
///
/// Only available with the `gremlin` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_gremlin_with_params` reports.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_gremlin_with_params(query, params)
}
367
/// Executes a GraphQL query in a one-off session.
///
/// Only available with the `graphql` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_graphql` reports.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_graphql(query)
}
378
/// Executes a parameterised GraphQL query in a one-off session.
///
/// Only available with the `graphql` feature.
///
/// # Errors
///
/// Returns whatever error the session's `execute_graphql_with_params` reports.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_graphql_with_params(query, params)
}
393
/// Executes a SPARQL query against the RDF triple store.
///
/// Requires both the `sparql` and `rdf` features. The query passes through
/// four stages: translation to a logical plan, optimization, physical
/// planning against the RDF store, and execution.
///
/// # Errors
///
/// Returns an error if any of those stages fails.
///
/// # Examples
///
/// ```ignore
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
/// ```
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::{
        Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
    };

    // 1. SPARQL text -> logical plan
    let logical = sparql_translator::translate(query)?;

    // 2. logical plan -> optimized logical plan
    let optimizer = Optimizer::new();
    let optimized = optimizer.optimize(logical)?;

    // 3. optimized plan -> physical plan bound to the RDF store
    let rdf_planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut physical = rdf_planner.plan(&optimized)?;

    // 4. run the root operator, labelling output with the plan's columns
    let columns = physical.columns.clone();
    let executor = Executor::with_columns(columns);
    executor.execute(physical.operator.as_mut())
}
431
/// Borrows the shared RDF store handle.
///
/// Only available with the `rdf` feature; gives callers direct access to
/// the triple store.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let handle = &self.rdf_store;
    handle
}
440
441 /// Executes a query and returns a single scalar value.
442 ///
443 /// # Errors
444 ///
445 /// Returns an error if the query fails or doesn't return exactly one row.
446 pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
447 let result = self.execute(query)?;
448 result.scalar()
449 }
450
451 /// Returns the configuration.
452 #[must_use]
453 pub fn config(&self) -> &Config {
454 &self.config
455 }
456
457 /// Returns the underlying store.
458 ///
459 /// This provides direct access to the LPG store for algorithm implementations.
460 #[must_use]
461 pub fn store(&self) -> &Arc<LpgStore> {
462 &self.store
463 }
464
465 /// Returns the buffer manager for memory-aware operations.
466 #[must_use]
467 pub fn buffer_manager(&self) -> &Arc<BufferManager> {
468 &self.buffer_manager
469 }
470
471 /// Closes the database, flushing all pending writes.
472 ///
473 /// For persistent databases, this ensures everything is safely on disk.
474 /// Called automatically when the database is dropped, but you can call
475 /// it explicitly if you need to guarantee durability at a specific point.
476 ///
477 /// # Errors
478 ///
479 /// Returns an error if the WAL can't be flushed (check disk space/permissions).
480 pub fn close(&self) -> Result<()> {
481 let mut is_open = self.is_open.write();
482 if !*is_open {
483 return Ok(());
484 }
485
486 // Commit and checkpoint WAL
487 if let Some(ref wal) = self.wal {
488 let epoch = self.store.current_epoch();
489
490 // Use the last assigned transaction ID, or create a checkpoint-only tx
491 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
492 // No transactions have been started; begin one for checkpoint
493 self.tx_manager.begin()
494 });
495
496 // Log a TxCommit to mark all pending records as committed
497 wal.log(&WalRecord::TxCommit {
498 tx_id: checkpoint_tx,
499 })?;
500
501 // Then checkpoint
502 wal.checkpoint(checkpoint_tx, epoch)?;
503 wal.sync()?;
504 }
505
506 *is_open = false;
507 Ok(())
508 }
509
510 /// Returns the WAL manager if available.
511 #[must_use]
512 pub fn wal(&self) -> Option<&Arc<WalManager>> {
513 self.wal.as_ref()
514 }
515
516 /// Logs a WAL record if WAL is enabled.
517 fn log_wal(&self, record: &WalRecord) -> Result<()> {
518 if let Some(ref wal) = self.wal {
519 wal.log(record)?;
520 }
521 Ok(())
522 }
523
524 /// Returns the number of nodes in the database.
525 #[must_use]
526 pub fn node_count(&self) -> usize {
527 self.store.node_count()
528 }
529
530 /// Returns the number of edges in the database.
531 #[must_use]
532 pub fn edge_count(&self) -> usize {
533 self.store.edge_count()
534 }
535
536 /// Returns the number of distinct labels in the database.
537 #[must_use]
538 pub fn label_count(&self) -> usize {
539 self.store.label_count()
540 }
541
542 /// Returns the number of distinct property keys in the database.
543 #[must_use]
544 pub fn property_key_count(&self) -> usize {
545 self.store.property_key_count()
546 }
547
548 /// Returns the number of distinct edge types in the database.
549 #[must_use]
550 pub fn edge_type_count(&self) -> usize {
551 self.store.edge_type_count()
552 }
553
554 // === Node Operations ===
555
556 /// Creates a node with the given labels and returns its ID.
557 ///
558 /// Labels categorize nodes - think of them like tags. A node can have
559 /// multiple labels (e.g., `["Person", "Employee"]`).
560 ///
561 /// # Examples
562 ///
563 /// ```
564 /// use grafeo_engine::GrafeoDB;
565 ///
566 /// let db = GrafeoDB::new_in_memory();
567 /// let alice = db.create_node(&["Person"]);
568 /// let company = db.create_node(&["Company", "Startup"]);
569 /// ```
570 pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
571 let id = self.store.create_node(labels);
572
573 // Log to WAL if enabled
574 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
575 id,
576 labels: labels.iter().map(|s| s.to_string()).collect(),
577 }) {
578 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
579 }
580
581 id
582 }
583
584 /// Creates a new node with labels and properties.
585 ///
586 /// If WAL is enabled, the operation is logged for durability.
587 pub fn create_node_with_props(
588 &self,
589 labels: &[&str],
590 properties: impl IntoIterator<
591 Item = (
592 impl Into<grafeo_common::types::PropertyKey>,
593 impl Into<grafeo_common::types::Value>,
594 ),
595 >,
596 ) -> grafeo_common::types::NodeId {
597 // Collect properties first so we can log them to WAL
598 let props: Vec<(
599 grafeo_common::types::PropertyKey,
600 grafeo_common::types::Value,
601 )> = properties
602 .into_iter()
603 .map(|(k, v)| (k.into(), v.into()))
604 .collect();
605
606 let id = self
607 .store
608 .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
609
610 // Log node creation to WAL
611 if let Err(e) = self.log_wal(&WalRecord::CreateNode {
612 id,
613 labels: labels.iter().map(|s| s.to_string()).collect(),
614 }) {
615 tracing::warn!("Failed to log CreateNode to WAL: {}", e);
616 }
617
618 // Log each property to WAL for full durability
619 for (key, value) in props {
620 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
621 id,
622 key: key.to_string(),
623 value,
624 }) {
625 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
626 }
627 }
628
629 id
630 }
631
632 /// Gets a node by ID.
633 #[must_use]
634 pub fn get_node(
635 &self,
636 id: grafeo_common::types::NodeId,
637 ) -> Option<grafeo_core::graph::lpg::Node> {
638 self.store.get_node(id)
639 }
640
641 /// Deletes a node and all its edges.
642 ///
643 /// If WAL is enabled, the operation is logged for durability.
644 pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
645 let result = self.store.delete_node(id);
646
647 if result {
648 if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
649 tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
650 }
651 }
652
653 result
654 }
655
656 /// Sets a property on a node.
657 ///
658 /// If WAL is enabled, the operation is logged for durability.
659 pub fn set_node_property(
660 &self,
661 id: grafeo_common::types::NodeId,
662 key: &str,
663 value: grafeo_common::types::Value,
664 ) {
665 // Log to WAL first
666 if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
667 id,
668 key: key.to_string(),
669 value: value.clone(),
670 }) {
671 tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
672 }
673
674 self.store.set_node_property(id, key, value);
675 }
676
677 /// Adds a label to an existing node.
678 ///
679 /// Returns `true` if the label was added, `false` if the node doesn't exist
680 /// or already has the label.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// use grafeo_engine::GrafeoDB;
686 ///
687 /// let db = GrafeoDB::new_in_memory();
688 /// let alice = db.create_node(&["Person"]);
689 ///
690 /// // Promote Alice to Employee
691 /// let added = db.add_node_label(alice, "Employee");
692 /// assert!(added);
693 /// ```
694 pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
695 let result = self.store.add_label(id, label);
696
697 if result {
698 // Log to WAL if enabled
699 if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
700 id,
701 label: label.to_string(),
702 }) {
703 tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
704 }
705 }
706
707 result
708 }
709
710 /// Removes a label from a node.
711 ///
712 /// Returns `true` if the label was removed, `false` if the node doesn't exist
713 /// or doesn't have the label.
714 ///
715 /// # Examples
716 ///
717 /// ```
718 /// use grafeo_engine::GrafeoDB;
719 ///
720 /// let db = GrafeoDB::new_in_memory();
721 /// let alice = db.create_node(&["Person", "Employee"]);
722 ///
723 /// // Remove Employee status
724 /// let removed = db.remove_node_label(alice, "Employee");
725 /// assert!(removed);
726 /// ```
727 pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
728 let result = self.store.remove_label(id, label);
729
730 if result {
731 // Log to WAL if enabled
732 if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
733 id,
734 label: label.to_string(),
735 }) {
736 tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
737 }
738 }
739
740 result
741 }
742
743 /// Gets all labels for a node.
744 ///
745 /// Returns `None` if the node doesn't exist.
746 ///
747 /// # Examples
748 ///
749 /// ```
750 /// use grafeo_engine::GrafeoDB;
751 ///
752 /// let db = GrafeoDB::new_in_memory();
753 /// let alice = db.create_node(&["Person", "Employee"]);
754 ///
755 /// let labels = db.get_node_labels(alice).unwrap();
756 /// assert!(labels.contains(&"Person".to_string()));
757 /// assert!(labels.contains(&"Employee".to_string()));
758 /// ```
759 #[must_use]
760 pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
761 self.store
762 .get_node(id)
763 .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
764 }
765
766 // === Edge Operations ===
767
768 /// Creates an edge (relationship) between two nodes.
769 ///
770 /// Edges connect nodes and have a type that describes the relationship.
771 /// They're directed - the order of `src` and `dst` matters.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use grafeo_engine::GrafeoDB;
777 ///
778 /// let db = GrafeoDB::new_in_memory();
779 /// let alice = db.create_node(&["Person"]);
780 /// let bob = db.create_node(&["Person"]);
781 ///
782 /// // Alice knows Bob (directed: Alice -> Bob)
783 /// let edge = db.create_edge(alice, bob, "KNOWS");
784 /// ```
785 pub fn create_edge(
786 &self,
787 src: grafeo_common::types::NodeId,
788 dst: grafeo_common::types::NodeId,
789 edge_type: &str,
790 ) -> grafeo_common::types::EdgeId {
791 let id = self.store.create_edge(src, dst, edge_type);
792
793 // Log to WAL if enabled
794 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
795 id,
796 src,
797 dst,
798 edge_type: edge_type.to_string(),
799 }) {
800 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
801 }
802
803 id
804 }
805
806 /// Creates a new edge with properties.
807 ///
808 /// If WAL is enabled, the operation is logged for durability.
809 pub fn create_edge_with_props(
810 &self,
811 src: grafeo_common::types::NodeId,
812 dst: grafeo_common::types::NodeId,
813 edge_type: &str,
814 properties: impl IntoIterator<
815 Item = (
816 impl Into<grafeo_common::types::PropertyKey>,
817 impl Into<grafeo_common::types::Value>,
818 ),
819 >,
820 ) -> grafeo_common::types::EdgeId {
821 // Collect properties first so we can log them to WAL
822 let props: Vec<(
823 grafeo_common::types::PropertyKey,
824 grafeo_common::types::Value,
825 )> = properties
826 .into_iter()
827 .map(|(k, v)| (k.into(), v.into()))
828 .collect();
829
830 let id = self.store.create_edge_with_props(
831 src,
832 dst,
833 edge_type,
834 props.iter().map(|(k, v)| (k.clone(), v.clone())),
835 );
836
837 // Log edge creation to WAL
838 if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
839 id,
840 src,
841 dst,
842 edge_type: edge_type.to_string(),
843 }) {
844 tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
845 }
846
847 // Log each property to WAL for full durability
848 for (key, value) in props {
849 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
850 id,
851 key: key.to_string(),
852 value,
853 }) {
854 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
855 }
856 }
857
858 id
859 }
860
861 /// Gets an edge by ID.
862 #[must_use]
863 pub fn get_edge(
864 &self,
865 id: grafeo_common::types::EdgeId,
866 ) -> Option<grafeo_core::graph::lpg::Edge> {
867 self.store.get_edge(id)
868 }
869
870 /// Deletes an edge.
871 ///
872 /// If WAL is enabled, the operation is logged for durability.
873 pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
874 let result = self.store.delete_edge(id);
875
876 if result {
877 if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
878 tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
879 }
880 }
881
882 result
883 }
884
885 /// Sets a property on an edge.
886 ///
887 /// If WAL is enabled, the operation is logged for durability.
888 pub fn set_edge_property(
889 &self,
890 id: grafeo_common::types::EdgeId,
891 key: &str,
892 value: grafeo_common::types::Value,
893 ) {
894 // Log to WAL first
895 if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
896 id,
897 key: key.to_string(),
898 value: value.clone(),
899 }) {
900 tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
901 }
902 self.store.set_edge_property(id, key, value);
903 }
904
905 /// Removes a property from a node.
906 ///
907 /// Returns true if the property existed and was removed, false otherwise.
908 pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
909 // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
910 self.store.remove_node_property(id, key).is_some()
911 }
912
913 /// Removes a property from an edge.
914 ///
915 /// Returns true if the property existed and was removed, false otherwise.
916 pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
917 // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
918 self.store.remove_edge_property(id, key).is_some()
919 }
920
921 // =========================================================================
922 // PROPERTY INDEX API
923 // =========================================================================
924
925 /// Creates an index on a node property for O(1) lookups by value.
926 ///
927 /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
928 /// O(1) instead of O(n) for this property. The index is automatically
929 /// maintained when properties are set or removed.
930 ///
931 /// # Example
932 ///
933 /// ```ignore
934 /// // Create an index on the 'email' property
935 /// db.create_property_index("email");
936 ///
937 /// // Now lookups by email are O(1)
938 /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
939 /// ```
940 pub fn create_property_index(&self, property: &str) {
941 self.store.create_property_index(property);
942 }
943
944 /// Drops an index on a node property.
945 ///
946 /// Returns `true` if the index existed and was removed.
947 pub fn drop_property_index(&self, property: &str) -> bool {
948 self.store.drop_property_index(property)
949 }
950
951 /// Returns `true` if the property has an index.
952 #[must_use]
953 pub fn has_property_index(&self, property: &str) -> bool {
954 self.store.has_property_index(property)
955 }
956
957 /// Finds all nodes that have a specific property value.
958 ///
959 /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
960 /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
961 ///
962 /// # Example
963 ///
964 /// ```ignore
965 /// // Create index for fast lookups (optional but recommended)
966 /// db.create_property_index("city");
967 ///
968 /// // Find all nodes where city = "NYC"
969 /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
970 /// ```
971 #[must_use]
972 pub fn find_nodes_by_property(
973 &self,
974 property: &str,
975 value: &grafeo_common::types::Value,
976 ) -> Vec<grafeo_common::types::NodeId> {
977 self.store.find_nodes_by_property(property, value)
978 }
979
980 // =========================================================================
981 // ADMIN API: Introspection
982 // =========================================================================
983
984 /// Returns true if this database is backed by a file (persistent).
985 ///
986 /// In-memory databases return false.
987 #[must_use]
988 pub fn is_persistent(&self) -> bool {
989 self.config.path.is_some()
990 }
991
992 /// Returns the database file path, if persistent.
993 ///
994 /// In-memory databases return None.
995 #[must_use]
996 pub fn path(&self) -> Option<&Path> {
997 self.config.path.as_deref()
998 }
999
1000 /// Returns high-level database information.
1001 ///
1002 /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1003 #[must_use]
1004 pub fn info(&self) -> crate::admin::DatabaseInfo {
1005 crate::admin::DatabaseInfo {
1006 mode: crate::admin::DatabaseMode::Lpg,
1007 node_count: self.store.node_count(),
1008 edge_count: self.store.edge_count(),
1009 is_persistent: self.is_persistent(),
1010 path: self.config.path.clone(),
1011 wal_enabled: self.config.wal_enabled,
1012 version: env!("CARGO_PKG_VERSION").to_string(),
1013 }
1014 }
1015
1016 /// Returns detailed database statistics.
1017 ///
1018 /// Includes counts, memory usage, and index information.
1019 #[must_use]
1020 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1021 let disk_bytes = self.config.path.as_ref().and_then(|p| {
1022 if p.exists() {
1023 Self::calculate_disk_usage(p).ok()
1024 } else {
1025 None
1026 }
1027 });
1028
1029 crate::admin::DatabaseStats {
1030 node_count: self.store.node_count(),
1031 edge_count: self.store.edge_count(),
1032 label_count: self.store.label_count(),
1033 edge_type_count: self.store.edge_type_count(),
1034 property_key_count: self.store.property_key_count(),
1035 index_count: 0, // TODO: implement index tracking
1036 memory_bytes: self.buffer_manager.allocated(),
1037 disk_bytes,
1038 }
1039 }
1040
/// Sums the sizes of all regular files under `path`, descending into
/// subdirectories.
///
/// Returns `Ok(0)` when `path` is not a directory (including when it does
/// not exist). Entries that are neither regular files nor directories are
/// skipped.
///
/// The walk uses an explicit stack, so deep directory trees cannot exhaust
/// the call stack. File sizes are clamped to `usize::MAX` and the running
/// total saturates, so the result never wraps on targets where `usize` is
/// narrower than `u64`.
///
/// # Errors
///
/// Returns an error if a directory cannot be read or an entry's metadata
/// cannot be fetched.
fn calculate_disk_usage(path: &Path) -> Result<usize> {
    let mut total = 0usize;
    let mut pending = vec![path.to_path_buf()];

    while let Some(dir) = pending.pop() {
        // Mirrors the per-call guard of the recursive version: anything
        // that is not (or is no longer) a directory contributes nothing.
        if !dir.is_dir() {
            continue;
        }

        for entry in std::fs::read_dir(&dir)? {
            let entry = entry?;
            let metadata = entry.metadata()?;
            if metadata.is_file() {
                // Clamp before casting so a >4 GiB file cannot be
                // truncated on 32-bit targets.
                let len = metadata.len().min(usize::MAX as u64) as usize;
                total = total.saturating_add(len);
            } else if metadata.is_dir() {
                pending.push(entry.path());
            }
        }
    }

    Ok(total)
}
1057
1058 /// Returns schema information (labels, edge types, property keys).
1059 ///
1060 /// For LPG mode, returns label and edge type information.
1061 /// For RDF mode, returns predicate and named graph information.
1062 #[must_use]
1063 pub fn schema(&self) -> crate::admin::SchemaInfo {
1064 let labels = self
1065 .store
1066 .all_labels()
1067 .into_iter()
1068 .map(|name| crate::admin::LabelInfo {
1069 name: name.clone(),
1070 count: self.store.nodes_with_label(&name).count(),
1071 })
1072 .collect();
1073
1074 let edge_types = self
1075 .store
1076 .all_edge_types()
1077 .into_iter()
1078 .map(|name| crate::admin::EdgeTypeInfo {
1079 name: name.clone(),
1080 count: self.store.edges_with_type(&name).count(),
1081 })
1082 .collect();
1083
1084 let property_keys = self.store.all_property_keys();
1085
1086 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1087 labels,
1088 edge_types,
1089 property_keys,
1090 })
1091 }
1092
/// Describes the contents of the RDF triple store.
///
/// Reports every predicate together with the number of triples using it,
/// plus the subject and object counts taken from the store's statistics.
/// The `named_graphs` list is always empty for now.
///
/// Only compiled when the `rdf` feature is enabled.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
    let rdf = &self.rdf_store;
    let stats = rdf.stats();

    let predicates = rdf
        .predicates()
        .into_iter()
        .map(|predicate| crate::admin::PredicateInfo {
            iri: predicate.to_string(),
            count: rdf.triples_with_predicate(&predicate).len(),
        })
        .collect();

    crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
        predicates,
        // Named graphs not yet implemented in RdfStore
        named_graphs: Vec::new(),
        subject_count: stats.subject_count,
        object_count: stats.object_count,
    })
}
1121
1122 /// Validates database integrity.
1123 ///
1124 /// Checks for:
1125 /// - Dangling edge references (edges pointing to non-existent nodes)
1126 /// - Internal index consistency
1127 ///
1128 /// Returns a list of errors and warnings. Empty errors = valid.
1129 #[must_use]
1130 pub fn validate(&self) -> crate::admin::ValidationResult {
1131 let mut result = crate::admin::ValidationResult::default();
1132
1133 // Check for dangling edge references
1134 for edge in self.store.all_edges() {
1135 if self.store.get_node(edge.src).is_none() {
1136 result.errors.push(crate::admin::ValidationError {
1137 code: "DANGLING_SRC".to_string(),
1138 message: format!(
1139 "Edge {} references non-existent source node {}",
1140 edge.id.0, edge.src.0
1141 ),
1142 context: Some(format!("edge:{}", edge.id.0)),
1143 });
1144 }
1145 if self.store.get_node(edge.dst).is_none() {
1146 result.errors.push(crate::admin::ValidationError {
1147 code: "DANGLING_DST".to_string(),
1148 message: format!(
1149 "Edge {} references non-existent destination node {}",
1150 edge.id.0, edge.dst.0
1151 ),
1152 context: Some(format!("edge:{}", edge.id.0)),
1153 });
1154 }
1155 }
1156
1157 // Add warnings for potential issues
1158 if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1159 result.warnings.push(crate::admin::ValidationWarning {
1160 code: "NO_EDGES".to_string(),
1161 message: "Database has nodes but no edges".to_string(),
1162 context: None,
1163 });
1164 }
1165
1166 result
1167 }
1168
1169 /// Returns WAL (Write-Ahead Log) status.
1170 ///
1171 /// Returns None if WAL is not enabled.
1172 #[must_use]
1173 pub fn wal_status(&self) -> crate::admin::WalStatus {
1174 if let Some(ref wal) = self.wal {
1175 crate::admin::WalStatus {
1176 enabled: true,
1177 path: self.config.path.as_ref().map(|p| p.join("wal")),
1178 size_bytes: wal.size_bytes(),
1179 record_count: wal.record_count() as usize,
1180 last_checkpoint: wal.last_checkpoint_timestamp(),
1181 current_epoch: self.store.current_epoch().as_u64(),
1182 }
1183 } else {
1184 crate::admin::WalStatus {
1185 enabled: false,
1186 path: None,
1187 size_bytes: 0,
1188 record_count: 0,
1189 last_checkpoint: None,
1190 current_epoch: self.store.current_epoch().as_u64(),
1191 }
1192 }
1193 }
1194
1195 /// Forces a WAL checkpoint.
1196 ///
1197 /// Flushes all pending WAL records to the main storage.
1198 ///
1199 /// # Errors
1200 ///
1201 /// Returns an error if the checkpoint fails.
1202 pub fn wal_checkpoint(&self) -> Result<()> {
1203 if let Some(ref wal) = self.wal {
1204 let epoch = self.store.current_epoch();
1205 let tx_id = self
1206 .tx_manager
1207 .last_assigned_tx_id()
1208 .unwrap_or_else(|| self.tx_manager.begin());
1209 wal.checkpoint(tx_id, epoch)?;
1210 wal.sync()?;
1211 }
1212 Ok(())
1213 }
1214
1215 // =========================================================================
1216 // ADMIN API: Persistence Control
1217 // =========================================================================
1218
1219 /// Saves the database to a file path.
1220 ///
1221 /// - If in-memory: creates a new persistent database at path
1222 /// - If file-backed: creates a copy at the new path
1223 ///
1224 /// The original database remains unchanged.
1225 ///
1226 /// # Errors
1227 ///
1228 /// Returns an error if the save operation fails.
1229 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1230 let path = path.as_ref();
1231
1232 // Create target database with WAL enabled
1233 let target_config = Config::persistent(path);
1234 let target = Self::with_config(target_config)?;
1235
1236 // Copy all nodes using WAL-enabled methods
1237 for node in self.store.all_nodes() {
1238 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1239 target.store.create_node_with_id(node.id, &label_refs);
1240
1241 // Log to WAL
1242 target.log_wal(&WalRecord::CreateNode {
1243 id: node.id,
1244 labels: node.labels.iter().map(|s| s.to_string()).collect(),
1245 })?;
1246
1247 // Copy properties
1248 for (key, value) in node.properties {
1249 target
1250 .store
1251 .set_node_property(node.id, key.as_str(), value.clone());
1252 target.log_wal(&WalRecord::SetNodeProperty {
1253 id: node.id,
1254 key: key.to_string(),
1255 value,
1256 })?;
1257 }
1258 }
1259
1260 // Copy all edges using WAL-enabled methods
1261 for edge in self.store.all_edges() {
1262 target
1263 .store
1264 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1265
1266 // Log to WAL
1267 target.log_wal(&WalRecord::CreateEdge {
1268 id: edge.id,
1269 src: edge.src,
1270 dst: edge.dst,
1271 edge_type: edge.edge_type.to_string(),
1272 })?;
1273
1274 // Copy properties
1275 for (key, value) in edge.properties {
1276 target
1277 .store
1278 .set_edge_property(edge.id, key.as_str(), value.clone());
1279 target.log_wal(&WalRecord::SetEdgeProperty {
1280 id: edge.id,
1281 key: key.to_string(),
1282 value,
1283 })?;
1284 }
1285 }
1286
1287 // Checkpoint and close the target database
1288 target.close()?;
1289
1290 Ok(())
1291 }
1292
1293 /// Creates an in-memory copy of this database.
1294 ///
1295 /// Returns a new database that is completely independent.
1296 /// Useful for:
1297 /// - Testing modifications without affecting the original
1298 /// - Faster operations when persistence isn't needed
1299 ///
1300 /// # Errors
1301 ///
1302 /// Returns an error if the copy operation fails.
1303 pub fn to_memory(&self) -> Result<Self> {
1304 let config = Config::in_memory();
1305 let target = Self::with_config(config)?;
1306
1307 // Copy all nodes
1308 for node in self.store.all_nodes() {
1309 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1310 target.store.create_node_with_id(node.id, &label_refs);
1311
1312 // Copy properties
1313 for (key, value) in node.properties {
1314 target.store.set_node_property(node.id, key.as_str(), value);
1315 }
1316 }
1317
1318 // Copy all edges
1319 for edge in self.store.all_edges() {
1320 target
1321 .store
1322 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1323
1324 // Copy properties
1325 for (key, value) in edge.properties {
1326 target.store.set_edge_property(edge.id, key.as_str(), value);
1327 }
1328 }
1329
1330 Ok(target)
1331 }
1332
1333 /// Opens a database file and loads it entirely into memory.
1334 ///
1335 /// The returned database has no connection to the original file.
1336 /// Changes will NOT be written back to the file.
1337 ///
1338 /// # Errors
1339 ///
1340 /// Returns an error if the file can't be opened or loaded.
1341 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1342 // Open the source database (triggers WAL recovery)
1343 let source = Self::open(path)?;
1344
1345 // Create in-memory copy
1346 let target = source.to_memory()?;
1347
1348 // Close the source (releases file handles)
1349 source.close()?;
1350
1351 Ok(target)
1352 }
1353
1354 // =========================================================================
1355 // ADMIN API: Iteration
1356 // =========================================================================
1357
1358 /// Returns an iterator over all nodes in the database.
1359 ///
1360 /// Useful for dump/export operations.
1361 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1362 self.store.all_nodes()
1363 }
1364
1365 /// Returns an iterator over all edges in the database.
1366 ///
1367 /// Useful for dump/export operations.
1368 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1369 self.store.all_edges()
1370 }
1371}
1372
1373impl Drop for GrafeoDB {
1374 fn drop(&mut self) {
1375 if let Err(e) = self.close() {
1376 tracing::error!("Error closing database: {}", e);
1377 }
1378 }
1379}
1380
1381/// The result of running a query.
1382///
1383/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1384/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1385///
1386/// # Examples
1387///
1388/// ```
1389/// use grafeo_engine::GrafeoDB;
1390///
1391/// let db = GrafeoDB::new_in_memory();
1392/// db.create_node(&["Person"]);
1393///
1394/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1395///
1396/// // Check what we got
1397/// println!("Columns: {:?}", result.columns);
1398/// println!("Rows: {}", result.row_count());
1399///
1400/// // Iterate through results
1401/// for row in result.iter() {
1402/// println!("{:?}", row);
1403/// }
1404/// # Ok::<(), grafeo_common::utils::error::Error>(())
1405/// ```
1406#[derive(Debug)]
1407pub struct QueryResult {
1408 /// Column names from the RETURN clause.
1409 pub columns: Vec<String>,
1410 /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1411 pub column_types: Vec<grafeo_common::types::LogicalType>,
1412 /// The actual result rows.
1413 pub rows: Vec<Vec<grafeo_common::types::Value>>,
1414}
1415
1416impl QueryResult {
1417 /// Creates a new empty query result.
1418 #[must_use]
1419 pub fn new(columns: Vec<String>) -> Self {
1420 let len = columns.len();
1421 Self {
1422 columns,
1423 column_types: vec![grafeo_common::types::LogicalType::Any; len],
1424 rows: Vec::new(),
1425 }
1426 }
1427
1428 /// Creates a new empty query result with column types.
1429 #[must_use]
1430 pub fn with_types(
1431 columns: Vec<String>,
1432 column_types: Vec<grafeo_common::types::LogicalType>,
1433 ) -> Self {
1434 Self {
1435 columns,
1436 column_types,
1437 rows: Vec::new(),
1438 }
1439 }
1440
1441 /// Returns the number of rows.
1442 #[must_use]
1443 pub fn row_count(&self) -> usize {
1444 self.rows.len()
1445 }
1446
1447 /// Returns the number of columns.
1448 #[must_use]
1449 pub fn column_count(&self) -> usize {
1450 self.columns.len()
1451 }
1452
1453 /// Returns true if the result is empty.
1454 #[must_use]
1455 pub fn is_empty(&self) -> bool {
1456 self.rows.is_empty()
1457 }
1458
1459 /// Extracts a single value from the result.
1460 ///
1461 /// Use this when your query returns exactly one row with one column,
1462 /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1463 ///
1464 /// # Errors
1465 ///
1466 /// Returns an error if the result has multiple rows or columns.
1467 pub fn scalar<T: FromValue>(&self) -> Result<T> {
1468 if self.rows.len() != 1 || self.columns.len() != 1 {
1469 return Err(grafeo_common::utils::error::Error::InvalidValue(
1470 "Expected single value".to_string(),
1471 ));
1472 }
1473 T::from_value(&self.rows[0][0])
1474 }
1475
1476 /// Returns an iterator over the rows.
1477 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1478 self.rows.iter()
1479 }
1480}
1481
1482/// Converts a [`Value`](grafeo_common::types::Value) to a concrete Rust type.
1483///
1484/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1485/// Used by [`QueryResult::scalar()`] to extract typed values.
1486pub trait FromValue: Sized {
1487 /// Attempts the conversion, returning an error on type mismatch.
1488 fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1489}
1490
1491impl FromValue for i64 {
1492 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1493 value
1494 .as_int64()
1495 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1496 expected: "INT64".to_string(),
1497 found: value.type_name().to_string(),
1498 })
1499 }
1500}
1501
1502impl FromValue for f64 {
1503 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1504 value
1505 .as_float64()
1506 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1507 expected: "FLOAT64".to_string(),
1508 found: value.type_name().to_string(),
1509 })
1510 }
1511}
1512
1513impl FromValue for String {
1514 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1515 value.as_str().map(String::from).ok_or_else(|| {
1516 grafeo_common::utils::error::Error::TypeMismatch {
1517 expected: "STRING".to_string(),
1518 found: value.type_name().to_string(),
1519 }
1520 })
1521 }
1522}
1523
1524impl FromValue for bool {
1525 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1526 value
1527 .as_bool()
1528 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1529 expected: "BOOL".to_string(),
1530 found: value.type_name().to_string(),
1531 })
1532 }
1533}
1534
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Settings passed through `with_config` are visible via `config()`.
    #[test]
    fn test_database_config() {
        let db = GrafeoDB::with_config(Config::in_memory().with_threads(4).with_query_logging())
            .unwrap();

        let applied = db.config();
        assert_eq!(applied.threads, 4);
        assert!(applied.query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Data written to a persistent database is present after reopening it.
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First session: two people and one relationship between them.
        let writer = GrafeoDB::open(&db_path).unwrap();

        let alice = writer.create_node(&["Person"]);
        writer.set_node_property(alice, "name", Value::from("Alice"));

        let bob = writer.create_node(&["Person"]);
        writer.set_node_property(bob, "name", Value::from("Bob"));

        let _edge = writer.create_edge(alice, bob, "KNOWS");

        // Explicitly close to flush WAL, then release the handle before reopening.
        writer.close().unwrap();
        drop(writer);

        // Second session: everything written above must be back.
        let reader = GrafeoDB::open(&db_path).unwrap();

        assert_eq!(reader.node_count(), 2);
        assert_eq!(reader.edge_count(), 1);

        for raw_id in 0..2 {
            assert!(reader.get_node(NodeId::new(raw_id)).is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // One create plus one delete should leave records behind.
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // NOTE(review): the assertion is skipped when `wal()` returns `None`.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }
}
1622}