Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::{
12    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
13};
14use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
15use grafeo_common::types::{EdgeId, NodeId, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::graph::lpg::LpgStore;
18#[cfg(feature = "rdf")]
19use grafeo_core::graph::rdf::RdfStore;
20
21use crate::config::Config;
22use crate::query::cache::QueryCache;
23use crate::session::Session;
24use crate::transaction::TransactionManager;
25
26/// Your handle to a Grafeo database.
27///
28/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
29/// quick experiments, or [`open()`](Self::open) for persistent storage.
30/// Then grab a [`session()`](Self::session) to start querying.
31///
32/// # Examples
33///
34/// ```
35/// use grafeo_engine::GrafeoDB;
36///
37/// // Quick in-memory database
38/// let db = GrafeoDB::new_in_memory();
39///
40/// // Add some data
41/// db.create_node(&["Person"]);
42///
43/// // Query it
44/// let session = db.session();
45/// let result = session.execute("MATCH (p:Person) RETURN p")?;
46/// # Ok::<(), grafeo_common::utils::error::Error>(())
47/// ```
48pub struct GrafeoDB {
49    /// Database configuration.
50    config: Config,
51    /// The underlying graph store.
52    store: Arc<LpgStore>,
53    /// RDF triple store (if RDF feature is enabled).
54    #[cfg(feature = "rdf")]
55    rdf_store: Arc<RdfStore>,
56    /// Transaction manager.
57    tx_manager: Arc<TransactionManager>,
58    /// Unified buffer manager.
59    buffer_manager: Arc<BufferManager>,
60    /// Write-ahead log manager (if durability is enabled).
61    #[cfg(feature = "wal")]
62    wal: Option<Arc<WalManager>>,
63    /// Query cache for parsed and optimized plans.
64    query_cache: Arc<QueryCache>,
65    /// Whether the database is open.
66    is_open: RwLock<bool>,
67}
68
69impl GrafeoDB {
70    /// Creates an in-memory database - fast to create, gone when dropped.
71    ///
72    /// Use this for tests, experiments, or when you don't need persistence.
73    /// For data that survives restarts, use [`open()`](Self::open) instead.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use grafeo_engine::GrafeoDB;
79    ///
80    /// let db = GrafeoDB::new_in_memory();
81    /// let session = db.session();
82    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
83    /// # Ok::<(), grafeo_common::utils::error::Error>(())
84    /// ```
85    #[must_use]
86    pub fn new_in_memory() -> Self {
87        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
88    }
89
/// Opens the database stored at `path`, creating it when it is missing.
///
/// A path that has been used before is recovered automatically from its
/// write-ahead log. A path that is new yields an empty database.
///
/// # Errors
///
/// Returns an error if the path isn't writable or recovery fails.
///
/// # Examples
///
/// ```no_run
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::open("./my_social_network")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[cfg(feature = "wal")]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let persistent = Config::persistent(path.as_ref());
    Self::with_config(persistent)
}
112
113    /// Creates a database with custom configuration.
114    ///
115    /// Use this when you need fine-grained control over memory limits,
116    /// thread counts, or persistence settings. For most cases,
117    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
118    /// are simpler.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the database can't be created or recovery fails.
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use grafeo_engine::{GrafeoDB, Config};
128    ///
129    /// // In-memory with a 512MB limit
130    /// let config = Config::in_memory()
131    ///     .with_memory_limit(512 * 1024 * 1024);
132    ///
133    /// let db = GrafeoDB::with_config(config)?;
134    /// # Ok::<(), grafeo_common::utils::error::Error>(())
135    /// ```
136    pub fn with_config(config: Config) -> Result<Self> {
137        // Validate configuration before proceeding
138        config
139            .validate()
140            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
141
142        let store = Arc::new(LpgStore::new());
143        #[cfg(feature = "rdf")]
144        let rdf_store = Arc::new(RdfStore::new());
145        let tx_manager = Arc::new(TransactionManager::new());
146
147        // Create buffer manager with configured limits
148        let buffer_config = BufferManagerConfig {
149            budget: config.memory_limit.unwrap_or_else(|| {
150                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
151            }),
152            spill_path: config
153                .spill_path
154                .clone()
155                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
156            ..BufferManagerConfig::default()
157        };
158        let buffer_manager = BufferManager::new(buffer_config);
159
160        // Initialize WAL if persistence is enabled
161        #[cfg(feature = "wal")]
162        let wal = if config.wal_enabled {
163            if let Some(ref db_path) = config.path {
164                // Create database directory if it doesn't exist
165                std::fs::create_dir_all(db_path)?;
166
167                let wal_path = db_path.join("wal");
168
169                // Check if WAL exists and recover if needed
170                if wal_path.exists() {
171                    let recovery = WalRecovery::new(&wal_path);
172                    let records = recovery.recover()?;
173                    Self::apply_wal_records(&store, &records)?;
174                }
175
176                // Open/create WAL manager with configured durability
177                let wal_durability = match config.wal_durability {
178                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
179                    crate::config::DurabilityMode::Batch {
180                        max_delay_ms,
181                        max_records,
182                    } => WalDurabilityMode::Batch {
183                        max_delay_ms,
184                        max_records,
185                    },
186                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
187                        WalDurabilityMode::Adaptive { target_interval_ms }
188                    }
189                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
190                };
191                let wal_config = WalConfig {
192                    durability: wal_durability,
193                    ..WalConfig::default()
194                };
195                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
196                Some(Arc::new(wal_manager))
197            } else {
198                None
199            }
200        } else {
201            None
202        };
203
204        // Create query cache with default capacity (1000 queries)
205        let query_cache = Arc::new(QueryCache::default());
206
207        Ok(Self {
208            config,
209            store,
210            #[cfg(feature = "rdf")]
211            rdf_store,
212            tx_manager,
213            buffer_manager,
214            #[cfg(feature = "wal")]
215            wal,
216            query_cache,
217            is_open: RwLock::new(true),
218        })
219    }
220
/// Replays recovered WAL records, in order, against `store`.
///
/// Return values of the individual store operations are ignored, and the
/// function itself always returns `Ok(())`.
#[cfg(feature = "wal")]
fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
    for entry in records {
        match entry {
            // --- node records ---
            WalRecord::CreateNode { id, labels } => {
                let names: Vec<&str> = labels.iter().map(String::as_str).collect();
                store.create_node_with_id(*id, &names);
            }
            WalRecord::DeleteNode { id } => {
                store.delete_node(*id);
            }
            WalRecord::SetNodeProperty { id, key, value } => {
                store.set_node_property(*id, key, value.clone());
            }
            WalRecord::AddNodeLabel { id, label } => {
                store.add_label(*id, label);
            }
            WalRecord::RemoveNodeLabel { id, label } => {
                store.remove_label(*id, label);
            }
            // --- edge records ---
            WalRecord::CreateEdge {
                id,
                src,
                dst,
                edge_type,
            } => {
                store.create_edge_with_id(*id, *src, *dst, edge_type);
            }
            WalRecord::DeleteEdge { id } => {
                store.delete_edge(*id);
            }
            WalRecord::SetEdgeProperty { id, key, value } => {
                store.set_edge_property(*id, key, value.clone());
            }
            // --- transaction control ---
            // Nothing to replay here: recovery already filtered the log down
            // to committed transactions.
            WalRecord::TxCommit { .. }
            | WalRecord::TxAbort { .. }
            | WalRecord::Checkpoint { .. } => {}
        }
    }
    Ok(())
}
266
267    /// Opens a new session for running queries.
268    ///
269    /// Sessions are cheap to create - spin up as many as you need. Each
270    /// gets its own transaction context, so concurrent sessions won't
271    /// block each other on reads.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use grafeo_engine::GrafeoDB;
277    ///
278    /// let db = GrafeoDB::new_in_memory();
279    /// let session = db.session();
280    ///
281    /// // Run queries through the session
282    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
283    /// # Ok::<(), grafeo_common::utils::error::Error>(())
284    /// ```
285    #[must_use]
286    pub fn session(&self) -> Session {
287        #[cfg(feature = "rdf")]
288        {
289            Session::with_rdf_store_and_adaptive(
290                Arc::clone(&self.store),
291                Arc::clone(&self.rdf_store),
292                Arc::clone(&self.tx_manager),
293                Arc::clone(&self.query_cache),
294                self.config.adaptive.clone(),
295                self.config.factorized_execution,
296                self.config.graph_model,
297            )
298        }
299        #[cfg(not(feature = "rdf"))]
300        {
301            Session::with_adaptive(
302                Arc::clone(&self.store),
303                Arc::clone(&self.tx_manager),
304                Arc::clone(&self.query_cache),
305                self.config.adaptive.clone(),
306                self.config.factorized_execution,
307                self.config.graph_model,
308            )
309        }
310    }
311
312    /// Returns the adaptive execution configuration.
313    #[must_use]
314    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
315        &self.config.adaptive
316    }
317
318    /// Runs a query directly on the database.
319    ///
320    /// A convenience method that creates a temporary session behind the
321    /// scenes. If you're running multiple queries, grab a
322    /// [`session()`](Self::session) instead to avoid the overhead.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if parsing or execution fails.
327    pub fn execute(&self, query: &str) -> Result<QueryResult> {
328        let session = self.session();
329        session.execute(query)
330    }
331
332    /// Executes a query with parameters and returns the result.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the query fails.
337    pub fn execute_with_params(
338        &self,
339        query: &str,
340        params: std::collections::HashMap<String, grafeo_common::types::Value>,
341    ) -> Result<QueryResult> {
342        let session = self.session();
343        session.execute_with_params(query, params)
344    }
345
/// Runs a Cypher query on a temporary session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "cypher")]
pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_cypher(query)
}
356
/// Runs a parameterized Cypher query.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "cypher")]
pub fn execute_cypher_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    use crate::query::processor::QueryLanguage;
    use crate::query::processor::QueryProcessor;

    // NOTE(review): this builds a processor directly over the LPG store
    // rather than going through `session()` - confirm that is intended.
    let lpg_processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
    lpg_processor.process(query, QueryLanguage::Cypher, Some(&params))
}
374
/// Runs a Gremlin query on a temporary session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_gremlin(query)
}
385
/// Runs a parameterized Gremlin query on a temporary session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "gremlin")]
pub fn execute_gremlin_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_gremlin_with_params(query, params)
}
400
/// Runs a GraphQL query on a temporary session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "graphql")]
pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
    self.session().execute_graphql(query)
}
411
/// Runs a parameterized GraphQL query on a temporary session.
///
/// # Errors
///
/// Returns an error if the query fails.
#[cfg(feature = "graphql")]
pub fn execute_graphql_with_params(
    &self,
    query: &str,
    params: std::collections::HashMap<String, grafeo_common::types::Value>,
) -> Result<QueryResult> {
    self.session().execute_graphql_with_params(query, params)
}
426
/// Runs a SPARQL query against the RDF triple store.
///
/// The query is translated to a logical plan, optimized, planned for the
/// RDF store and then executed.
///
/// # Errors
///
/// Returns an error if the query fails.
///
/// # Examples
///
/// ```ignore
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
/// ```
#[cfg(all(feature = "sparql", feature = "rdf"))]
pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
    use crate::query::Executor;
    use crate::query::optimizer::Optimizer;
    use crate::query::planner_rdf::RdfPlanner;
    use crate::query::sparql_translator;

    // 1. SPARQL text -> logical plan.
    let logical = sparql_translator::translate(query)?;

    // 2. Logical plan -> optimized logical plan.
    let plan_optimizer = Optimizer::from_store(&self.store);
    let optimized = plan_optimizer.optimize(logical)?;

    // 3. Optimized plan -> physical plan over the RDF store.
    let rdf_planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
    let mut physical = rdf_planner.plan(&optimized)?;

    // 4. Run the physical operator tree.
    let columns = physical.columns.clone();
    Executor::with_columns(columns).execute(physical.operator.as_mut())
}
464
/// Borrows the shared handle to the RDF store.
///
/// Use it for direct triple operations on the store.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_store(&self) -> &Arc<RdfStore> {
    let Self { rdf_store, .. } = self;
    rdf_store
}
473
474    /// Executes a query and returns a single scalar value.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if the query fails or doesn't return exactly one row.
479    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
480        let result = self.execute(query)?;
481        result.scalar()
482    }
483
484    /// Returns the configuration.
485    #[must_use]
486    pub fn config(&self) -> &Config {
487        &self.config
488    }
489
490    /// Returns the graph data model of this database.
491    #[must_use]
492    pub fn graph_model(&self) -> crate::config::GraphModel {
493        self.config.graph_model
494    }
495
496    /// Returns the configured memory limit in bytes, if any.
497    #[must_use]
498    pub fn memory_limit(&self) -> Option<usize> {
499        self.config.memory_limit
500    }
501
502    /// Returns the underlying store.
503    ///
504    /// This provides direct access to the LPG store for algorithm implementations.
505    #[must_use]
506    pub fn store(&self) -> &Arc<LpgStore> {
507        &self.store
508    }
509
510    /// Returns the buffer manager for memory-aware operations.
511    #[must_use]
512    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
513        &self.buffer_manager
514    }
515
516    /// Closes the database, flushing all pending writes.
517    ///
518    /// For persistent databases, this ensures everything is safely on disk.
519    /// Called automatically when the database is dropped, but you can call
520    /// it explicitly if you need to guarantee durability at a specific point.
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
525    pub fn close(&self) -> Result<()> {
526        let mut is_open = self.is_open.write();
527        if !*is_open {
528            return Ok(());
529        }
530
531        // Commit and checkpoint WAL
532        #[cfg(feature = "wal")]
533        if let Some(ref wal) = self.wal {
534            let epoch = self.store.current_epoch();
535
536            // Use the last assigned transaction ID, or create a checkpoint-only tx
537            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
538                // No transactions have been started; begin one for checkpoint
539                self.tx_manager.begin()
540            });
541
542            // Log a TxCommit to mark all pending records as committed
543            wal.log(&WalRecord::TxCommit {
544                tx_id: checkpoint_tx,
545            })?;
546
547            // Then checkpoint
548            wal.checkpoint(checkpoint_tx, epoch)?;
549            wal.sync()?;
550        }
551
552        *is_open = false;
553        Ok(())
554    }
555
/// Borrows the WAL manager, or `None` when this database has no WAL.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<WalManager>> {
    let Self { wal, .. } = self;
    wal.as_ref()
}
562
/// Appends `record` to the WAL; does nothing when no WAL is present.
#[cfg(feature = "wal")]
fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
571
572    /// Returns the number of nodes in the database.
573    #[must_use]
574    pub fn node_count(&self) -> usize {
575        self.store.node_count()
576    }
577
578    /// Returns the number of edges in the database.
579    #[must_use]
580    pub fn edge_count(&self) -> usize {
581        self.store.edge_count()
582    }
583
584    /// Returns the number of distinct labels in the database.
585    #[must_use]
586    pub fn label_count(&self) -> usize {
587        self.store.label_count()
588    }
589
590    /// Returns the number of distinct property keys in the database.
591    #[must_use]
592    pub fn property_key_count(&self) -> usize {
593        self.store.property_key_count()
594    }
595
596    /// Returns the number of distinct edge types in the database.
597    #[must_use]
598    pub fn edge_type_count(&self) -> usize {
599        self.store.edge_type_count()
600    }
601
602    // === Node Operations ===
603
604    /// Creates a node with the given labels and returns its ID.
605    ///
606    /// Labels categorize nodes - think of them like tags. A node can have
607    /// multiple labels (e.g., `["Person", "Employee"]`).
608    ///
609    /// # Examples
610    ///
611    /// ```
612    /// use grafeo_engine::GrafeoDB;
613    ///
614    /// let db = GrafeoDB::new_in_memory();
615    /// let alice = db.create_node(&["Person"]);
616    /// let company = db.create_node(&["Company", "Startup"]);
617    /// ```
618    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
619        let id = self.store.create_node(labels);
620
621        // Log to WAL if enabled
622        #[cfg(feature = "wal")]
623        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
624            id,
625            labels: labels.iter().map(|s| (*s).to_string()).collect(),
626        }) {
627            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
628        }
629
630        id
631    }
632
633    /// Creates a new node with labels and properties.
634    ///
635    /// If WAL is enabled, the operation is logged for durability.
636    pub fn create_node_with_props(
637        &self,
638        labels: &[&str],
639        properties: impl IntoIterator<
640            Item = (
641                impl Into<grafeo_common::types::PropertyKey>,
642                impl Into<grafeo_common::types::Value>,
643            ),
644        >,
645    ) -> grafeo_common::types::NodeId {
646        // Collect properties first so we can log them to WAL
647        let props: Vec<(
648            grafeo_common::types::PropertyKey,
649            grafeo_common::types::Value,
650        )> = properties
651            .into_iter()
652            .map(|(k, v)| (k.into(), v.into()))
653            .collect();
654
655        let id = self
656            .store
657            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
658
659        // Log node creation to WAL
660        #[cfg(feature = "wal")]
661        {
662            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
663                id,
664                labels: labels.iter().map(|s| (*s).to_string()).collect(),
665            }) {
666                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
667            }
668
669            // Log each property to WAL for full durability
670            for (key, value) in props {
671                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
672                    id,
673                    key: key.to_string(),
674                    value,
675                }) {
676                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
677                }
678            }
679        }
680
681        id
682    }
683
684    /// Gets a node by ID.
685    #[must_use]
686    pub fn get_node(
687        &self,
688        id: grafeo_common::types::NodeId,
689    ) -> Option<grafeo_core::graph::lpg::Node> {
690        self.store.get_node(id)
691    }
692
693    /// Deletes a node and all its edges.
694    ///
695    /// If WAL is enabled, the operation is logged for durability.
696    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
697        let result = self.store.delete_node(id);
698
699        #[cfg(feature = "wal")]
700        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
701            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
702        }
703
704        result
705    }
706
707    /// Sets a property on a node.
708    ///
709    /// If WAL is enabled, the operation is logged for durability.
710    pub fn set_node_property(
711        &self,
712        id: grafeo_common::types::NodeId,
713        key: &str,
714        value: grafeo_common::types::Value,
715    ) {
716        // Log to WAL first
717        #[cfg(feature = "wal")]
718        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
719            id,
720            key: key.to_string(),
721            value: value.clone(),
722        }) {
723            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
724        }
725
726        self.store.set_node_property(id, key, value);
727    }
728
729    /// Adds a label to an existing node.
730    ///
731    /// Returns `true` if the label was added, `false` if the node doesn't exist
732    /// or already has the label.
733    ///
734    /// # Examples
735    ///
736    /// ```
737    /// use grafeo_engine::GrafeoDB;
738    ///
739    /// let db = GrafeoDB::new_in_memory();
740    /// let alice = db.create_node(&["Person"]);
741    ///
742    /// // Promote Alice to Employee
743    /// let added = db.add_node_label(alice, "Employee");
744    /// assert!(added);
745    /// ```
746    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
747        let result = self.store.add_label(id, label);
748
749        #[cfg(feature = "wal")]
750        if result {
751            // Log to WAL if enabled
752            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
753                id,
754                label: label.to_string(),
755            }) {
756                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
757            }
758        }
759
760        result
761    }
762
763    /// Removes a label from a node.
764    ///
765    /// Returns `true` if the label was removed, `false` if the node doesn't exist
766    /// or doesn't have the label.
767    ///
768    /// # Examples
769    ///
770    /// ```
771    /// use grafeo_engine::GrafeoDB;
772    ///
773    /// let db = GrafeoDB::new_in_memory();
774    /// let alice = db.create_node(&["Person", "Employee"]);
775    ///
776    /// // Remove Employee status
777    /// let removed = db.remove_node_label(alice, "Employee");
778    /// assert!(removed);
779    /// ```
780    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
781        let result = self.store.remove_label(id, label);
782
783        #[cfg(feature = "wal")]
784        if result {
785            // Log to WAL if enabled
786            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
787                id,
788                label: label.to_string(),
789            }) {
790                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
791            }
792        }
793
794        result
795    }
796
797    /// Gets all labels for a node.
798    ///
799    /// Returns `None` if the node doesn't exist.
800    ///
801    /// # Examples
802    ///
803    /// ```
804    /// use grafeo_engine::GrafeoDB;
805    ///
806    /// let db = GrafeoDB::new_in_memory();
807    /// let alice = db.create_node(&["Person", "Employee"]);
808    ///
809    /// let labels = db.get_node_labels(alice).unwrap();
810    /// assert!(labels.contains(&"Person".to_string()));
811    /// assert!(labels.contains(&"Employee".to_string()));
812    /// ```
813    #[must_use]
814    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
815        self.store
816            .get_node(id)
817            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
818    }
819
820    // === Edge Operations ===
821
822    /// Creates an edge (relationship) between two nodes.
823    ///
824    /// Edges connect nodes and have a type that describes the relationship.
825    /// They're directed - the order of `src` and `dst` matters.
826    ///
827    /// # Examples
828    ///
829    /// ```
830    /// use grafeo_engine::GrafeoDB;
831    ///
832    /// let db = GrafeoDB::new_in_memory();
833    /// let alice = db.create_node(&["Person"]);
834    /// let bob = db.create_node(&["Person"]);
835    ///
836    /// // Alice knows Bob (directed: Alice -> Bob)
837    /// let edge = db.create_edge(alice, bob, "KNOWS");
838    /// ```
839    pub fn create_edge(
840        &self,
841        src: grafeo_common::types::NodeId,
842        dst: grafeo_common::types::NodeId,
843        edge_type: &str,
844    ) -> grafeo_common::types::EdgeId {
845        let id = self.store.create_edge(src, dst, edge_type);
846
847        // Log to WAL if enabled
848        #[cfg(feature = "wal")]
849        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
850            id,
851            src,
852            dst,
853            edge_type: edge_type.to_string(),
854        }) {
855            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
856        }
857
858        id
859    }
860
861    /// Creates a new edge with properties.
862    ///
863    /// If WAL is enabled, the operation is logged for durability.
864    pub fn create_edge_with_props(
865        &self,
866        src: grafeo_common::types::NodeId,
867        dst: grafeo_common::types::NodeId,
868        edge_type: &str,
869        properties: impl IntoIterator<
870            Item = (
871                impl Into<grafeo_common::types::PropertyKey>,
872                impl Into<grafeo_common::types::Value>,
873            ),
874        >,
875    ) -> grafeo_common::types::EdgeId {
876        // Collect properties first so we can log them to WAL
877        let props: Vec<(
878            grafeo_common::types::PropertyKey,
879            grafeo_common::types::Value,
880        )> = properties
881            .into_iter()
882            .map(|(k, v)| (k.into(), v.into()))
883            .collect();
884
885        let id = self.store.create_edge_with_props(
886            src,
887            dst,
888            edge_type,
889            props.iter().map(|(k, v)| (k.clone(), v.clone())),
890        );
891
892        // Log edge creation to WAL
893        #[cfg(feature = "wal")]
894        {
895            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
896                id,
897                src,
898                dst,
899                edge_type: edge_type.to_string(),
900            }) {
901                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
902            }
903
904            // Log each property to WAL for full durability
905            for (key, value) in props {
906                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
907                    id,
908                    key: key.to_string(),
909                    value,
910                }) {
911                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
912                }
913            }
914        }
915
916        id
917    }
918
919    /// Gets an edge by ID.
920    #[must_use]
921    pub fn get_edge(
922        &self,
923        id: grafeo_common::types::EdgeId,
924    ) -> Option<grafeo_core::graph::lpg::Edge> {
925        self.store.get_edge(id)
926    }
927
928    /// Deletes an edge.
929    ///
930    /// If WAL is enabled, the operation is logged for durability.
931    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
932        let result = self.store.delete_edge(id);
933
934        #[cfg(feature = "wal")]
935        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
936            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
937        }
938
939        result
940    }
941
942    /// Sets a property on an edge.
943    ///
944    /// If WAL is enabled, the operation is logged for durability.
945    pub fn set_edge_property(
946        &self,
947        id: grafeo_common::types::EdgeId,
948        key: &str,
949        value: grafeo_common::types::Value,
950    ) {
951        // Log to WAL first
952        #[cfg(feature = "wal")]
953        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
954            id,
955            key: key.to_string(),
956            value: value.clone(),
957        }) {
958            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
959        }
960        self.store.set_edge_property(id, key, value);
961    }
962
963    /// Removes a property from a node.
964    ///
965    /// Returns true if the property existed and was removed, false otherwise.
966    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
967        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
968        self.store.remove_node_property(id, key).is_some()
969    }
970
971    /// Removes a property from an edge.
972    ///
973    /// Returns true if the property existed and was removed, false otherwise.
974    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
975        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
976        self.store.remove_edge_property(id, key).is_some()
977    }
978
979    // =========================================================================
980    // PROPERTY INDEX API
981    // =========================================================================
982
983    /// Creates an index on a node property for O(1) lookups by value.
984    ///
985    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
986    /// O(1) instead of O(n) for this property. The index is automatically
987    /// maintained when properties are set or removed.
988    ///
989    /// # Example
990    ///
991    /// ```ignore
992    /// // Create an index on the 'email' property
993    /// db.create_property_index("email");
994    ///
995    /// // Now lookups by email are O(1)
996    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
997    /// ```
998    pub fn create_property_index(&self, property: &str) {
999        self.store.create_property_index(property);
1000    }
1001
1002    /// Creates a vector similarity index on a node property.
1003    ///
1004    /// This enables efficient approximate nearest-neighbor search on vector
1005    /// properties. Currently validates the index parameters and scans existing
1006    /// nodes to verify the property contains vectors of the expected dimensions.
1007    ///
1008    /// # Arguments
1009    ///
1010    /// * `label` - Node label to index (e.g., `"Doc"`)
1011    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
1012    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
1013    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
1014    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
1015    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the metric is invalid, no vectors are found, or
1020    /// dimensions don't match.
1021    pub fn create_vector_index(
1022        &self,
1023        label: &str,
1024        property: &str,
1025        dimensions: Option<usize>,
1026        metric: Option<&str>,
1027        m: Option<usize>,
1028        ef_construction: Option<usize>,
1029    ) -> Result<()> {
1030        use grafeo_common::types::{PropertyKey, Value};
1031        use grafeo_core::index::vector::DistanceMetric;
1032
1033        let metric = match metric {
1034            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1035                grafeo_common::utils::error::Error::Internal(format!(
1036                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1037                    m
1038                ))
1039            })?,
1040            None => DistanceMetric::Cosine,
1041        };
1042
1043        // Scan nodes to validate vectors exist and check dimensions
1044        let prop_key = PropertyKey::new(property);
1045        let mut found_dims: Option<usize> = dimensions;
1046        let mut vector_count = 0usize;
1047
1048        #[cfg(feature = "vector-index")]
1049        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1050
1051        for node in self.store.nodes_with_label(label) {
1052            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1053                if let Some(expected) = found_dims {
1054                    if v.len() != expected {
1055                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1056                            "Vector dimension mismatch: expected {}, found {} on node {}",
1057                            expected,
1058                            v.len(),
1059                            node.id.0
1060                        )));
1061                    }
1062                } else {
1063                    found_dims = Some(v.len());
1064                }
1065                vector_count += 1;
1066                #[cfg(feature = "vector-index")]
1067                vectors.push((node.id, v.to_vec()));
1068            }
1069        }
1070
1071        if vector_count == 0 {
1072            return Err(grafeo_common::utils::error::Error::Internal(format!(
1073                "No vector properties found on :{label}({property})"
1074            )));
1075        }
1076
1077        let dims = found_dims.unwrap_or(0);
1078
1079        // Build and populate the HNSW index
1080        #[cfg(feature = "vector-index")]
1081        {
1082            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1083
1084            let mut config = HnswConfig::new(dims, metric);
1085            if let Some(m_val) = m {
1086                config = config.with_m(m_val);
1087            }
1088            if let Some(ef_c) = ef_construction {
1089                config = config.with_ef_construction(ef_c);
1090            }
1091
1092            let index = HnswIndex::with_capacity(config, vectors.len());
1093            for (node_id, vec) in &vectors {
1094                index.insert(*node_id, vec);
1095            }
1096
1097            self.store
1098                .add_vector_index(label, property, Arc::new(index));
1099        }
1100
1101        // Suppress unused variable warnings when vector-index is off
1102        let _ = (m, ef_construction);
1103
1104        tracing::info!(
1105            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1106            metric_name = metric.name()
1107        );
1108
1109        Ok(())
1110    }
1111
1112    /// Searches for the k nearest neighbors of a query vector.
1113    ///
1114    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1115    ///
1116    /// # Arguments
1117    ///
1118    /// * `label` - Node label that was indexed
1119    /// * `property` - Property that was indexed
1120    /// * `query` - Query vector (slice of floats)
1121    /// * `k` - Number of nearest neighbors to return
1122    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1123    ///
1124    /// # Returns
1125    ///
1126    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1127    #[cfg(feature = "vector-index")]
1128    pub fn vector_search(
1129        &self,
1130        label: &str,
1131        property: &str,
1132        query: &[f32],
1133        k: usize,
1134        ef: Option<usize>,
1135    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1136        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1137            grafeo_common::utils::error::Error::Internal(format!(
1138                "No vector index found for :{label}({property}). Call create_vector_index() first."
1139            ))
1140        })?;
1141
1142        let results = match ef {
1143            Some(ef_val) => index.search_with_ef(query, k, ef_val),
1144            None => index.search(query, k),
1145        };
1146
1147        Ok(results)
1148    }
1149
1150    /// Creates multiple nodes in bulk, each with a single vector property.
1151    ///
1152    /// Much faster than individual `create_node_with_props` calls because it
1153    /// acquires internal locks once and loops in Rust rather than crossing
1154    /// the FFI boundary per vector.
1155    ///
1156    /// # Arguments
1157    ///
1158    /// * `label` - Label applied to all created nodes
1159    /// * `property` - Property name for the vector data
1160    /// * `vectors` - Vector data for each node
1161    ///
1162    /// # Returns
1163    ///
1164    /// Vector of created `NodeId`s in the same order as the input vectors.
1165    pub fn batch_create_nodes(
1166        &self,
1167        label: &str,
1168        property: &str,
1169        vectors: Vec<Vec<f32>>,
1170    ) -> Vec<grafeo_common::types::NodeId> {
1171        use grafeo_common::types::{PropertyKey, Value};
1172
1173        let prop_key = PropertyKey::new(property);
1174        let labels: &[&str] = &[label];
1175
1176        vectors
1177            .into_iter()
1178            .map(|vec| {
1179                let value = Value::Vector(vec.into());
1180                let id = self.store.create_node_with_props(
1181                    labels,
1182                    std::iter::once((prop_key.clone(), value.clone())),
1183                );
1184
1185                // Log to WAL
1186                #[cfg(feature = "wal")]
1187                {
1188                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1189                        id,
1190                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1191                    }) {
1192                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1193                    }
1194                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1195                        id,
1196                        key: property.to_string(),
1197                        value,
1198                    }) {
1199                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1200                    }
1201                }
1202
1203                id
1204            })
1205            .collect()
1206    }
1207
1208    /// Searches for nearest neighbors for multiple query vectors in parallel.
1209    ///
1210    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1211    ///
1212    /// # Arguments
1213    ///
1214    /// * `label` - Node label that was indexed
1215    /// * `property` - Property that was indexed
1216    /// * `queries` - Batch of query vectors
1217    /// * `k` - Number of nearest neighbors per query
1218    /// * `ef` - Search beam width (uses index default if `None`)
1219    #[cfg(feature = "vector-index")]
1220    pub fn batch_vector_search(
1221        &self,
1222        label: &str,
1223        property: &str,
1224        queries: &[Vec<f32>],
1225        k: usize,
1226        ef: Option<usize>,
1227    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1228        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1229            grafeo_common::utils::error::Error::Internal(format!(
1230                "No vector index found for :{label}({property}). Call create_vector_index() first."
1231            ))
1232        })?;
1233
1234        let results = match ef {
1235            Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1236            None => index.batch_search(queries, k),
1237        };
1238
1239        Ok(results)
1240    }
1241
1242    /// Drops an index on a node property.
1243    ///
1244    /// Returns `true` if the index existed and was removed.
1245    pub fn drop_property_index(&self, property: &str) -> bool {
1246        self.store.drop_property_index(property)
1247    }
1248
1249    /// Returns `true` if the property has an index.
1250    #[must_use]
1251    pub fn has_property_index(&self, property: &str) -> bool {
1252        self.store.has_property_index(property)
1253    }
1254
1255    /// Finds all nodes that have a specific property value.
1256    ///
1257    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1258    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1259    ///
1260    /// # Example
1261    ///
1262    /// ```ignore
1263    /// // Create index for fast lookups (optional but recommended)
1264    /// db.create_property_index("city");
1265    ///
1266    /// // Find all nodes where city = "NYC"
1267    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
1268    /// ```
1269    #[must_use]
1270    pub fn find_nodes_by_property(
1271        &self,
1272        property: &str,
1273        value: &grafeo_common::types::Value,
1274    ) -> Vec<grafeo_common::types::NodeId> {
1275        self.store.find_nodes_by_property(property, value)
1276    }
1277
1278    // =========================================================================
1279    // ADMIN API: Introspection
1280    // =========================================================================
1281
1282    /// Returns true if this database is backed by a file (persistent).
1283    ///
1284    /// In-memory databases return false.
1285    #[must_use]
1286    pub fn is_persistent(&self) -> bool {
1287        self.config.path.is_some()
1288    }
1289
1290    /// Returns the database file path, if persistent.
1291    ///
1292    /// In-memory databases return None.
1293    #[must_use]
1294    pub fn path(&self) -> Option<&Path> {
1295        self.config.path.as_deref()
1296    }
1297
1298    /// Returns high-level database information.
1299    ///
1300    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1301    #[must_use]
1302    pub fn info(&self) -> crate::admin::DatabaseInfo {
1303        crate::admin::DatabaseInfo {
1304            mode: crate::admin::DatabaseMode::Lpg,
1305            node_count: self.store.node_count(),
1306            edge_count: self.store.edge_count(),
1307            is_persistent: self.is_persistent(),
1308            path: self.config.path.clone(),
1309            wal_enabled: self.config.wal_enabled,
1310            version: env!("CARGO_PKG_VERSION").to_string(),
1311        }
1312    }
1313
1314    /// Returns detailed database statistics.
1315    ///
1316    /// Includes counts, memory usage, and index information.
1317    #[must_use]
1318    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1319        #[cfg(feature = "wal")]
1320        let disk_bytes = self.config.path.as_ref().and_then(|p| {
1321            if p.exists() {
1322                Self::calculate_disk_usage(p).ok()
1323            } else {
1324                None
1325            }
1326        });
1327        #[cfg(not(feature = "wal"))]
1328        let disk_bytes: Option<usize> = None;
1329
1330        crate::admin::DatabaseStats {
1331            node_count: self.store.node_count(),
1332            edge_count: self.store.edge_count(),
1333            label_count: self.store.label_count(),
1334            edge_type_count: self.store.edge_type_count(),
1335            property_key_count: self.store.property_key_count(),
1336            index_count: 0, // TODO: implement index tracking
1337            memory_bytes: self.buffer_manager.allocated(),
1338            disk_bytes,
1339        }
1340    }
1341
1342    /// Calculates total disk usage for the database directory.
1343    #[cfg(feature = "wal")]
1344    fn calculate_disk_usage(path: &Path) -> Result<usize> {
1345        let mut total = 0usize;
1346        if path.is_dir() {
1347            for entry in std::fs::read_dir(path)? {
1348                let entry = entry?;
1349                let metadata = entry.metadata()?;
1350                if metadata.is_file() {
1351                    total += metadata.len() as usize;
1352                } else if metadata.is_dir() {
1353                    total += Self::calculate_disk_usage(&entry.path())?;
1354                }
1355            }
1356        }
1357        Ok(total)
1358    }
1359
1360    /// Returns schema information (labels, edge types, property keys).
1361    ///
1362    /// For LPG mode, returns label and edge type information.
1363    /// For RDF mode, returns predicate and named graph information.
1364    #[must_use]
1365    pub fn schema(&self) -> crate::admin::SchemaInfo {
1366        let labels = self
1367            .store
1368            .all_labels()
1369            .into_iter()
1370            .map(|name| crate::admin::LabelInfo {
1371                name: name.clone(),
1372                count: self.store.nodes_with_label(&name).count(),
1373            })
1374            .collect();
1375
1376        let edge_types = self
1377            .store
1378            .all_edge_types()
1379            .into_iter()
1380            .map(|name| crate::admin::EdgeTypeInfo {
1381                name: name.clone(),
1382                count: self.store.edges_with_type(&name).count(),
1383            })
1384            .collect();
1385
1386        let property_keys = self.store.all_property_keys();
1387
1388        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1389            labels,
1390            edge_types,
1391            property_keys,
1392        })
1393    }
1394
1395    /// Returns RDF schema information.
1396    ///
1397    /// Only available when the RDF feature is enabled.
1398    #[cfg(feature = "rdf")]
1399    #[must_use]
1400    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1401        let stats = self.rdf_store.stats();
1402
1403        let predicates = self
1404            .rdf_store
1405            .predicates()
1406            .into_iter()
1407            .map(|predicate| {
1408                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1409                crate::admin::PredicateInfo {
1410                    iri: predicate.to_string(),
1411                    count,
1412                }
1413            })
1414            .collect();
1415
1416        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1417            predicates,
1418            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1419            subject_count: stats.subject_count,
1420            object_count: stats.object_count,
1421        })
1422    }
1423
1424    /// Validates database integrity.
1425    ///
1426    /// Checks for:
1427    /// - Dangling edge references (edges pointing to non-existent nodes)
1428    /// - Internal index consistency
1429    ///
1430    /// Returns a list of errors and warnings. Empty errors = valid.
1431    #[must_use]
1432    pub fn validate(&self) -> crate::admin::ValidationResult {
1433        let mut result = crate::admin::ValidationResult::default();
1434
1435        // Check for dangling edge references
1436        for edge in self.store.all_edges() {
1437            if self.store.get_node(edge.src).is_none() {
1438                result.errors.push(crate::admin::ValidationError {
1439                    code: "DANGLING_SRC".to_string(),
1440                    message: format!(
1441                        "Edge {} references non-existent source node {}",
1442                        edge.id.0, edge.src.0
1443                    ),
1444                    context: Some(format!("edge:{}", edge.id.0)),
1445                });
1446            }
1447            if self.store.get_node(edge.dst).is_none() {
1448                result.errors.push(crate::admin::ValidationError {
1449                    code: "DANGLING_DST".to_string(),
1450                    message: format!(
1451                        "Edge {} references non-existent destination node {}",
1452                        edge.id.0, edge.dst.0
1453                    ),
1454                    context: Some(format!("edge:{}", edge.id.0)),
1455                });
1456            }
1457        }
1458
1459        // Add warnings for potential issues
1460        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1461            result.warnings.push(crate::admin::ValidationWarning {
1462                code: "NO_EDGES".to_string(),
1463                message: "Database has nodes but no edges".to_string(),
1464                context: None,
1465            });
1466        }
1467
1468        result
1469    }
1470
1471    /// Returns WAL (Write-Ahead Log) status.
1472    ///
1473    /// Returns None if WAL is not enabled.
1474    #[must_use]
1475    pub fn wal_status(&self) -> crate::admin::WalStatus {
1476        #[cfg(feature = "wal")]
1477        if let Some(ref wal) = self.wal {
1478            return crate::admin::WalStatus {
1479                enabled: true,
1480                path: self.config.path.as_ref().map(|p| p.join("wal")),
1481                size_bytes: wal.size_bytes(),
1482                record_count: wal.record_count() as usize,
1483                last_checkpoint: wal.last_checkpoint_timestamp(),
1484                current_epoch: self.store.current_epoch().as_u64(),
1485            };
1486        }
1487
1488        crate::admin::WalStatus {
1489            enabled: false,
1490            path: None,
1491            size_bytes: 0,
1492            record_count: 0,
1493            last_checkpoint: None,
1494            current_epoch: self.store.current_epoch().as_u64(),
1495        }
1496    }
1497
1498    /// Forces a WAL checkpoint.
1499    ///
1500    /// Flushes all pending WAL records to the main storage.
1501    ///
1502    /// # Errors
1503    ///
1504    /// Returns an error if the checkpoint fails.
1505    pub fn wal_checkpoint(&self) -> Result<()> {
1506        #[cfg(feature = "wal")]
1507        if let Some(ref wal) = self.wal {
1508            let epoch = self.store.current_epoch();
1509            let tx_id = self
1510                .tx_manager
1511                .last_assigned_tx_id()
1512                .unwrap_or_else(|| self.tx_manager.begin());
1513            wal.checkpoint(tx_id, epoch)?;
1514            wal.sync()?;
1515        }
1516        Ok(())
1517    }
1518
1519    // =========================================================================
1520    // ADMIN API: Persistence Control
1521    // =========================================================================
1522
1523    /// Saves the database to a file path.
1524    ///
1525    /// - If in-memory: creates a new persistent database at path
1526    /// - If file-backed: creates a copy at the new path
1527    ///
1528    /// The original database remains unchanged.
1529    ///
1530    /// # Errors
1531    ///
1532    /// Returns an error if the save operation fails.
1533    ///
1534    /// Requires the `wal` feature for persistence support.
1535    #[cfg(feature = "wal")]
1536    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1537        let path = path.as_ref();
1538
1539        // Create target database with WAL enabled
1540        let target_config = Config::persistent(path);
1541        let target = Self::with_config(target_config)?;
1542
1543        // Copy all nodes using WAL-enabled methods
1544        for node in self.store.all_nodes() {
1545            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1546            target.store.create_node_with_id(node.id, &label_refs);
1547
1548            // Log to WAL
1549            target.log_wal(&WalRecord::CreateNode {
1550                id: node.id,
1551                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1552            })?;
1553
1554            // Copy properties
1555            for (key, value) in node.properties {
1556                target
1557                    .store
1558                    .set_node_property(node.id, key.as_str(), value.clone());
1559                target.log_wal(&WalRecord::SetNodeProperty {
1560                    id: node.id,
1561                    key: key.to_string(),
1562                    value,
1563                })?;
1564            }
1565        }
1566
1567        // Copy all edges using WAL-enabled methods
1568        for edge in self.store.all_edges() {
1569            target
1570                .store
1571                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1572
1573            // Log to WAL
1574            target.log_wal(&WalRecord::CreateEdge {
1575                id: edge.id,
1576                src: edge.src,
1577                dst: edge.dst,
1578                edge_type: edge.edge_type.to_string(),
1579            })?;
1580
1581            // Copy properties
1582            for (key, value) in edge.properties {
1583                target
1584                    .store
1585                    .set_edge_property(edge.id, key.as_str(), value.clone());
1586                target.log_wal(&WalRecord::SetEdgeProperty {
1587                    id: edge.id,
1588                    key: key.to_string(),
1589                    value,
1590                })?;
1591            }
1592        }
1593
1594        // Checkpoint and close the target database
1595        target.close()?;
1596
1597        Ok(())
1598    }
1599
1600    /// Creates an in-memory copy of this database.
1601    ///
1602    /// Returns a new database that is completely independent.
1603    /// Useful for:
1604    /// - Testing modifications without affecting the original
1605    /// - Faster operations when persistence isn't needed
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns an error if the copy operation fails.
1610    pub fn to_memory(&self) -> Result<Self> {
1611        let config = Config::in_memory();
1612        let target = Self::with_config(config)?;
1613
1614        // Copy all nodes
1615        for node in self.store.all_nodes() {
1616            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1617            target.store.create_node_with_id(node.id, &label_refs);
1618
1619            // Copy properties
1620            for (key, value) in node.properties {
1621                target.store.set_node_property(node.id, key.as_str(), value);
1622            }
1623        }
1624
1625        // Copy all edges
1626        for edge in self.store.all_edges() {
1627            target
1628                .store
1629                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1630
1631            // Copy properties
1632            for (key, value) in edge.properties {
1633                target.store.set_edge_property(edge.id, key.as_str(), value);
1634            }
1635        }
1636
1637        Ok(target)
1638    }
1639
1640    /// Opens a database file and loads it entirely into memory.
1641    ///
1642    /// The returned database has no connection to the original file.
1643    /// Changes will NOT be written back to the file.
1644    ///
1645    /// # Errors
1646    ///
1647    /// Returns an error if the file can't be opened or loaded.
1648    #[cfg(feature = "wal")]
1649    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1650        // Open the source database (triggers WAL recovery)
1651        let source = Self::open(path)?;
1652
1653        // Create in-memory copy
1654        let target = source.to_memory()?;
1655
1656        // Close the source (releases file handles)
1657        source.close()?;
1658
1659        Ok(target)
1660    }
1661
1662    // =========================================================================
1663    // ADMIN API: Snapshot Export/Import
1664    // =========================================================================
1665
1666    /// Exports the entire database to a binary snapshot.
1667    ///
1668    /// The returned bytes can be stored (e.g. in IndexedDB) and later
1669    /// restored with [`import_snapshot()`](Self::import_snapshot).
1670    ///
1671    /// # Errors
1672    ///
1673    /// Returns an error if serialization fails.
1674    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
1675        let nodes: Vec<SnapshotNode> = self
1676            .store
1677            .all_nodes()
1678            .map(|n| SnapshotNode {
1679                id: n.id,
1680                labels: n.labels.iter().map(|l| l.to_string()).collect(),
1681                properties: n
1682                    .properties
1683                    .into_iter()
1684                    .map(|(k, v)| (k.to_string(), v))
1685                    .collect(),
1686            })
1687            .collect();
1688
1689        let edges: Vec<SnapshotEdge> = self
1690            .store
1691            .all_edges()
1692            .map(|e| SnapshotEdge {
1693                id: e.id,
1694                src: e.src,
1695                dst: e.dst,
1696                edge_type: e.edge_type.to_string(),
1697                properties: e
1698                    .properties
1699                    .into_iter()
1700                    .map(|(k, v)| (k.to_string(), v))
1701                    .collect(),
1702            })
1703            .collect();
1704
1705        let snapshot = Snapshot {
1706            version: 1,
1707            nodes,
1708            edges,
1709        };
1710
1711        let config = bincode::config::standard();
1712        bincode::serde::encode_to_vec(&snapshot, config)
1713            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
1714    }
1715
1716    /// Creates a new in-memory database from a binary snapshot.
1717    ///
1718    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
1719    ///
1720    /// # Errors
1721    ///
1722    /// Returns an error if the snapshot is invalid or deserialization fails.
1723    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
1724        let config = bincode::config::standard();
1725        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1726            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
1727
1728        if snapshot.version != 1 {
1729            return Err(Error::Internal(format!(
1730                "unsupported snapshot version: {}",
1731                snapshot.version
1732            )));
1733        }
1734
1735        let db = Self::new_in_memory();
1736
1737        for node in snapshot.nodes {
1738            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
1739            db.store.create_node_with_id(node.id, &label_refs);
1740            for (key, value) in node.properties {
1741                db.store.set_node_property(node.id, &key, value);
1742            }
1743        }
1744
1745        for edge in snapshot.edges {
1746            db.store
1747                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1748            for (key, value) in edge.properties {
1749                db.store.set_edge_property(edge.id, &key, value);
1750            }
1751        }
1752
1753        Ok(db)
1754    }
1755
1756    // =========================================================================
1757    // ADMIN API: Iteration
1758    // =========================================================================
1759
    /// Returns an iterator over all nodes in the database.
    ///
    /// Delegates to the underlying store's `all_nodes()`. The iterator yields
    /// owned [`Node`](grafeo_core::graph::lpg::Node) values and its lifetime is
    /// tied to the borrow of `self`.
    ///
    /// Useful for dump/export operations.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
1766
    /// Returns an iterator over all edges in the database.
    ///
    /// Delegates to the underlying store's `all_edges()`. The iterator yields
    /// owned [`Edge`](grafeo_core::graph::lpg::Edge) values and its lifetime is
    /// tied to the borrow of `self`.
    ///
    /// Useful for dump/export operations.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
1773}
1774
/// Binary snapshot format for database export/import.
///
/// This is the top-level value that `export_snapshot()` encodes with bincode
/// (through serde) and that `import_snapshot()` decodes again.
// NOTE(review): bincode is not self-describing and encodes struct fields
// positionally, so adding, removing, or reordering fields here presumably
// changes the byte format - bump `version` if that happens.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version. `export_snapshot()` writes `1`, and `import_snapshot()`
    /// rejects any other value.
    version: u8,
    /// All nodes captured in the snapshot.
    nodes: Vec<SnapshotNode>,
    /// All edges captured in the snapshot.
    edges: Vec<SnapshotEdge>,
}
1782
/// A single node as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Id the node is re-created under on import.
    id: NodeId,
    /// Label names, stored as owned strings.
    labels: Vec<String>,
    /// Property key/value pairs; on import each pair is applied to the node
    /// one at a time, in this order.
    properties: Vec<(String, Value)>,
}
1789
/// A single edge as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Id the edge is re-created under on import.
    id: EdgeId,
    /// Id of the source node.
    src: NodeId,
    /// Id of the destination node.
    dst: NodeId,
    /// Edge type name, stored as an owned string.
    edge_type: String,
    /// Property key/value pairs; on import each pair is applied to the edge
    /// one at a time, in this order.
    properties: Vec<(String, Value)>,
}
1798
1799impl Drop for GrafeoDB {
1800    fn drop(&mut self) {
1801        if let Err(e) = self.close() {
1802            tracing::error!("Error closing database: {}", e);
1803        }
1804    }
1805}
1806
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be inspected or assembled
/// directly without going through the accessor methods.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`new()`](Self::new) fills this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    // NOTE(review): each row presumably holds one value per entry in
    // `columns`, but nothing in this type enforces that - confirm with the
    // code that fills `rows`.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    ///
    /// `None` until set, e.g. via [`with_metrics()`](Self::with_metrics).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    ///
    /// `None` until set, e.g. via [`with_metrics()`](Self::with_metrics).
    pub rows_scanned: Option<u64>,
}
1845
1846impl QueryResult {
1847    /// Creates a new empty query result.
1848    #[must_use]
1849    pub fn new(columns: Vec<String>) -> Self {
1850        let len = columns.len();
1851        Self {
1852            columns,
1853            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1854            rows: Vec::new(),
1855            execution_time_ms: None,
1856            rows_scanned: None,
1857        }
1858    }
1859
1860    /// Creates a new empty query result with column types.
1861    #[must_use]
1862    pub fn with_types(
1863        columns: Vec<String>,
1864        column_types: Vec<grafeo_common::types::LogicalType>,
1865    ) -> Self {
1866        Self {
1867            columns,
1868            column_types,
1869            rows: Vec::new(),
1870            execution_time_ms: None,
1871            rows_scanned: None,
1872        }
1873    }
1874
1875    /// Sets the execution metrics on this result.
1876    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1877        self.execution_time_ms = Some(execution_time_ms);
1878        self.rows_scanned = Some(rows_scanned);
1879        self
1880    }
1881
1882    /// Returns the execution time in milliseconds, if available.
1883    #[must_use]
1884    pub fn execution_time_ms(&self) -> Option<f64> {
1885        self.execution_time_ms
1886    }
1887
1888    /// Returns the number of rows scanned, if available.
1889    #[must_use]
1890    pub fn rows_scanned(&self) -> Option<u64> {
1891        self.rows_scanned
1892    }
1893
1894    /// Returns the number of rows.
1895    #[must_use]
1896    pub fn row_count(&self) -> usize {
1897        self.rows.len()
1898    }
1899
1900    /// Returns the number of columns.
1901    #[must_use]
1902    pub fn column_count(&self) -> usize {
1903        self.columns.len()
1904    }
1905
1906    /// Returns true if the result is empty.
1907    #[must_use]
1908    pub fn is_empty(&self) -> bool {
1909        self.rows.is_empty()
1910    }
1911
1912    /// Extracts a single value from the result.
1913    ///
1914    /// Use this when your query returns exactly one row with one column,
1915    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1916    ///
1917    /// # Errors
1918    ///
1919    /// Returns an error if the result has multiple rows or columns.
1920    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1921        if self.rows.len() != 1 || self.columns.len() != 1 {
1922            return Err(grafeo_common::utils::error::Error::InvalidValue(
1923                "Expected single value".to_string(),
1924            ));
1925        }
1926        T::from_value(&self.rows[0][0])
1927    }
1928
1929    /// Returns an iterator over the rows.
1930    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1931        self.rows.iter()
1932    }
1933}
1934
/// Converts a [`Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value and returns a new `Self`; the
/// implementations in this file report a `TypeMismatch` error when the value
/// does not hold the requested type.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns an error if `value` cannot be represented as `Self`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1943
1944impl FromValue for i64 {
1945    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1946        value
1947            .as_int64()
1948            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1949                expected: "INT64".to_string(),
1950                found: value.type_name().to_string(),
1951            })
1952    }
1953}
1954
1955impl FromValue for f64 {
1956    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1957        value
1958            .as_float64()
1959            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1960                expected: "FLOAT64".to_string(),
1961                found: value.type_name().to_string(),
1962            })
1963    }
1964}
1965
1966impl FromValue for String {
1967    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1968        value.as_str().map(String::from).ok_or_else(|| {
1969            grafeo_common::utils::error::Error::TypeMismatch {
1970                expected: "STRING".to_string(),
1971                found: value.type_name().to_string(),
1972            }
1973        })
1974    }
1975}
1976
1977impl FromValue for bool {
1978    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1979        value
1980            .as_bool()
1981            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1982                expected: "BOOL".to_string(),
1983                found: value.type_name().to_string(),
1984            })
1985    }
1986}
1987
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Settings passed through `with_config` are visible via `config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database does not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    /// Nodes and edges written before `close()` are present after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data. The inner scope drops the handle
        // before the database is reopened below.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records. Require the WAL to be present: skipping the
        // assertion when it is missing would let this test pass without
        // checking anything.
        let wal = db
            .wal()
            .expect("WAL should be enabled for a persistent database");
        assert!(wal.record_count() > 0);

        db.close().unwrap();
    }

    /// Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify both nodes were recovered with their labels (property
            // values are not checked here)
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// Deletions survive a close/reopen cycle alongside the remaining nodes.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a->b edge, then the middle node itself. The b->c edge
            // is not deleted explicitly.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    /// `close()` can be called repeatedly without error.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    /// Query results carry execution metrics (only checked with the `gql` feature).
    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    /// Metrics are still populated when a query matches nothing (only checked
    /// with the `gql` feature).
    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }
}