Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::{
12    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
13};
14use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
15use grafeo_common::types::{EdgeId, NodeId, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::graph::lpg::LpgStore;
18#[cfg(feature = "rdf")]
19use grafeo_core::graph::rdf::RdfStore;
20
21use crate::config::Config;
22use crate::query::cache::QueryCache;
23use crate::session::Session;
24use crate::transaction::TransactionManager;
25
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// The struct owns the shared components (graph store, transaction manager,
/// query cache) and hands `Arc` clones of them to every session it creates.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, validated once in `with_config`.
    config: Config,
    /// The underlying labeled-property-graph store, shared with sessions.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with sessions.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager, sized from the configured memory limit.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager; `None` when the WAL is disabled or the
    /// configuration has no database path.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache for parsed and optimized plans, shared with sessions.
    query_cache: Arc<QueryCache>,
    /// Whether the database is open; flipped to `false` by `close()`.
    is_open: RwLock<bool>,
}
68
69impl GrafeoDB {
70    /// Creates an in-memory database - fast to create, gone when dropped.
71    ///
72    /// Use this for tests, experiments, or when you don't need persistence.
73    /// For data that survives restarts, use [`open()`](Self::open) instead.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use grafeo_engine::GrafeoDB;
79    ///
80    /// let db = GrafeoDB::new_in_memory();
81    /// let session = db.session();
82    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
83    /// # Ok::<(), grafeo_common::utils::error::Error>(())
84    /// ```
85    #[must_use]
86    pub fn new_in_memory() -> Self {
87        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
88    }
89
90    /// Opens a database at the given path, creating it if it doesn't exist.
91    ///
92    /// If you've used this path before, Grafeo recovers your data from the
93    /// write-ahead log automatically. First open on a new path creates an
94    /// empty database.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the path isn't writable or recovery fails.
99    ///
100    /// # Examples
101    ///
102    /// ```no_run
103    /// use grafeo_engine::GrafeoDB;
104    ///
105    /// let db = GrafeoDB::open("./my_social_network")?;
106    /// # Ok::<(), grafeo_common::utils::error::Error>(())
107    /// ```
108    #[cfg(feature = "wal")]
109    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
110        Self::with_config(Config::persistent(path.as_ref()))
111    }
112
113    /// Creates a database with custom configuration.
114    ///
115    /// Use this when you need fine-grained control over memory limits,
116    /// thread counts, or persistence settings. For most cases,
117    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
118    /// are simpler.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the database can't be created or recovery fails.
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use grafeo_engine::{GrafeoDB, Config};
128    ///
129    /// // In-memory with a 512MB limit
130    /// let config = Config::in_memory()
131    ///     .with_memory_limit(512 * 1024 * 1024);
132    ///
133    /// let db = GrafeoDB::with_config(config)?;
134    /// # Ok::<(), grafeo_common::utils::error::Error>(())
135    /// ```
136    pub fn with_config(config: Config) -> Result<Self> {
137        // Validate configuration before proceeding
138        config
139            .validate()
140            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
141
142        let store = Arc::new(LpgStore::new());
143        #[cfg(feature = "rdf")]
144        let rdf_store = Arc::new(RdfStore::new());
145        let tx_manager = Arc::new(TransactionManager::new());
146
147        // Create buffer manager with configured limits
148        let buffer_config = BufferManagerConfig {
149            budget: config.memory_limit.unwrap_or_else(|| {
150                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
151            }),
152            spill_path: config
153                .spill_path
154                .clone()
155                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
156            ..BufferManagerConfig::default()
157        };
158        let buffer_manager = BufferManager::new(buffer_config);
159
160        // Initialize WAL if persistence is enabled
161        #[cfg(feature = "wal")]
162        let wal = if config.wal_enabled {
163            if let Some(ref db_path) = config.path {
164                // Create database directory if it doesn't exist
165                std::fs::create_dir_all(db_path)?;
166
167                let wal_path = db_path.join("wal");
168
169                // Check if WAL exists and recover if needed
170                if wal_path.exists() {
171                    let recovery = WalRecovery::new(&wal_path);
172                    let records = recovery.recover()?;
173                    Self::apply_wal_records(&store, &records)?;
174                }
175
176                // Open/create WAL manager with configured durability
177                let wal_durability = match config.wal_durability {
178                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
179                    crate::config::DurabilityMode::Batch {
180                        max_delay_ms,
181                        max_records,
182                    } => WalDurabilityMode::Batch {
183                        max_delay_ms,
184                        max_records,
185                    },
186                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
187                        WalDurabilityMode::Adaptive { target_interval_ms }
188                    }
189                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
190                };
191                let wal_config = WalConfig {
192                    durability: wal_durability,
193                    ..WalConfig::default()
194                };
195                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
196                Some(Arc::new(wal_manager))
197            } else {
198                None
199            }
200        } else {
201            None
202        };
203
204        // Create query cache with default capacity (1000 queries)
205        let query_cache = Arc::new(QueryCache::default());
206
207        Ok(Self {
208            config,
209            store,
210            #[cfg(feature = "rdf")]
211            rdf_store,
212            tx_manager,
213            buffer_manager,
214            #[cfg(feature = "wal")]
215            wal,
216            query_cache,
217            is_open: RwLock::new(true),
218        })
219    }
220
221    /// Applies WAL records to restore the database state.
222    #[cfg(feature = "wal")]
223    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
224        for record in records {
225            match record {
226                WalRecord::CreateNode { id, labels } => {
227                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
228                    store.create_node_with_id(*id, &label_refs);
229                }
230                WalRecord::DeleteNode { id } => {
231                    store.delete_node(*id);
232                }
233                WalRecord::CreateEdge {
234                    id,
235                    src,
236                    dst,
237                    edge_type,
238                } => {
239                    store.create_edge_with_id(*id, *src, *dst, edge_type);
240                }
241                WalRecord::DeleteEdge { id } => {
242                    store.delete_edge(*id);
243                }
244                WalRecord::SetNodeProperty { id, key, value } => {
245                    store.set_node_property(*id, key, value.clone());
246                }
247                WalRecord::SetEdgeProperty { id, key, value } => {
248                    store.set_edge_property(*id, key, value.clone());
249                }
250                WalRecord::AddNodeLabel { id, label } => {
251                    store.add_label(*id, label);
252                }
253                WalRecord::RemoveNodeLabel { id, label } => {
254                    store.remove_label(*id, label);
255                }
256                WalRecord::TxCommit { .. }
257                | WalRecord::TxAbort { .. }
258                | WalRecord::Checkpoint { .. } => {
259                    // Transaction control records don't need replay action
260                    // (recovery already filtered to only committed transactions)
261                }
262            }
263        }
264        Ok(())
265    }
266
267    /// Opens a new session for running queries.
268    ///
269    /// Sessions are cheap to create - spin up as many as you need. Each
270    /// gets its own transaction context, so concurrent sessions won't
271    /// block each other on reads.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use grafeo_engine::GrafeoDB;
277    ///
278    /// let db = GrafeoDB::new_in_memory();
279    /// let session = db.session();
280    ///
281    /// // Run queries through the session
282    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
283    /// # Ok::<(), grafeo_common::utils::error::Error>(())
284    /// ```
285    #[must_use]
286    pub fn session(&self) -> Session {
287        #[cfg(feature = "rdf")]
288        {
289            Session::with_rdf_store_and_adaptive(
290                Arc::clone(&self.store),
291                Arc::clone(&self.rdf_store),
292                Arc::clone(&self.tx_manager),
293                Arc::clone(&self.query_cache),
294                self.config.adaptive.clone(),
295                self.config.factorized_execution,
296                self.config.graph_model,
297            )
298        }
299        #[cfg(not(feature = "rdf"))]
300        {
301            Session::with_adaptive(
302                Arc::clone(&self.store),
303                Arc::clone(&self.tx_manager),
304                Arc::clone(&self.query_cache),
305                self.config.adaptive.clone(),
306                self.config.factorized_execution,
307                self.config.graph_model,
308            )
309        }
310    }
311
312    /// Returns the adaptive execution configuration.
313    #[must_use]
314    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
315        &self.config.adaptive
316    }
317
318    /// Runs a query directly on the database.
319    ///
320    /// A convenience method that creates a temporary session behind the
321    /// scenes. If you're running multiple queries, grab a
322    /// [`session()`](Self::session) instead to avoid the overhead.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if parsing or execution fails.
327    pub fn execute(&self, query: &str) -> Result<QueryResult> {
328        let session = self.session();
329        session.execute(query)
330    }
331
332    /// Executes a query with parameters and returns the result.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the query fails.
337    pub fn execute_with_params(
338        &self,
339        query: &str,
340        params: std::collections::HashMap<String, grafeo_common::types::Value>,
341    ) -> Result<QueryResult> {
342        let session = self.session();
343        session.execute_with_params(query, params)
344    }
345
346    /// Executes a Cypher query and returns the result.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if the query fails.
351    #[cfg(feature = "cypher")]
352    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
353        let session = self.session();
354        session.execute_cypher(query)
355    }
356
357    /// Executes a Cypher query with parameters and returns the result.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the query fails.
362    #[cfg(feature = "cypher")]
363    pub fn execute_cypher_with_params(
364        &self,
365        query: &str,
366        params: std::collections::HashMap<String, grafeo_common::types::Value>,
367    ) -> Result<QueryResult> {
368        use crate::query::processor::{QueryLanguage, QueryProcessor};
369
370        // Create processor
371        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
372        processor.process(query, QueryLanguage::Cypher, Some(&params))
373    }
374
375    /// Executes a Gremlin query and returns the result.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the query fails.
380    #[cfg(feature = "gremlin")]
381    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
382        let session = self.session();
383        session.execute_gremlin(query)
384    }
385
386    /// Executes a Gremlin query with parameters and returns the result.
387    ///
388    /// # Errors
389    ///
390    /// Returns an error if the query fails.
391    #[cfg(feature = "gremlin")]
392    pub fn execute_gremlin_with_params(
393        &self,
394        query: &str,
395        params: std::collections::HashMap<String, grafeo_common::types::Value>,
396    ) -> Result<QueryResult> {
397        let session = self.session();
398        session.execute_gremlin_with_params(query, params)
399    }
400
401    /// Executes a GraphQL query and returns the result.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the query fails.
406    #[cfg(feature = "graphql")]
407    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
408        let session = self.session();
409        session.execute_graphql(query)
410    }
411
412    /// Executes a GraphQL query with parameters and returns the result.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if the query fails.
417    #[cfg(feature = "graphql")]
418    pub fn execute_graphql_with_params(
419        &self,
420        query: &str,
421        params: std::collections::HashMap<String, grafeo_common::types::Value>,
422    ) -> Result<QueryResult> {
423        let session = self.session();
424        session.execute_graphql_with_params(query, params)
425    }
426
427    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE) and returns the result.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the query fails.
432    #[cfg(feature = "sql-pgq")]
433    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
434        let session = self.session();
435        session.execute_sql(query)
436    }
437
438    /// Executes a SQL/PGQ query with parameters and returns the result.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the query fails.
443    #[cfg(feature = "sql-pgq")]
444    pub fn execute_sql_with_params(
445        &self,
446        query: &str,
447        params: std::collections::HashMap<String, grafeo_common::types::Value>,
448    ) -> Result<QueryResult> {
449        use crate::query::processor::{QueryLanguage, QueryProcessor};
450
451        // Create processor
452        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
453        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
454    }
455
456    /// Executes a SPARQL query and returns the result.
457    ///
458    /// SPARQL queries operate on the RDF triple store.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the query fails.
463    ///
464    /// # Examples
465    ///
466    /// ```ignore
467    /// use grafeo_engine::GrafeoDB;
468    ///
469    /// let db = GrafeoDB::new_in_memory();
470    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
471    /// ```
472    #[cfg(all(feature = "sparql", feature = "rdf"))]
473    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
474        use crate::query::{
475            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
476        };
477
478        // Parse and translate the SPARQL query to a logical plan
479        let logical_plan = sparql_translator::translate(query)?;
480
481        // Optimize the plan
482        let optimizer = Optimizer::from_store(&self.store);
483        let optimized_plan = optimizer.optimize(logical_plan)?;
484
485        // Convert to physical plan using RDF planner
486        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
487        let mut physical_plan = planner.plan(&optimized_plan)?;
488
489        // Execute the plan
490        let executor = Executor::with_columns(physical_plan.columns.clone());
491        executor.execute(physical_plan.operator.as_mut())
492    }
493
494    /// Returns the RDF store.
495    ///
496    /// This provides direct access to the RDF store for triple operations.
497    #[cfg(feature = "rdf")]
498    #[must_use]
499    pub fn rdf_store(&self) -> &Arc<RdfStore> {
500        &self.rdf_store
501    }
502
503    /// Executes a query and returns a single scalar value.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if the query fails or doesn't return exactly one row.
508    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
509        let result = self.execute(query)?;
510        result.scalar()
511    }
512
513    /// Returns the configuration.
514    #[must_use]
515    pub fn config(&self) -> &Config {
516        &self.config
517    }
518
519    /// Returns the graph data model of this database.
520    #[must_use]
521    pub fn graph_model(&self) -> crate::config::GraphModel {
522        self.config.graph_model
523    }
524
525    /// Returns the configured memory limit in bytes, if any.
526    #[must_use]
527    pub fn memory_limit(&self) -> Option<usize> {
528        self.config.memory_limit
529    }
530
531    /// Returns the underlying store.
532    ///
533    /// This provides direct access to the LPG store for algorithm implementations.
534    #[must_use]
535    pub fn store(&self) -> &Arc<LpgStore> {
536        &self.store
537    }
538
539    /// Returns the buffer manager for memory-aware operations.
540    #[must_use]
541    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
542        &self.buffer_manager
543    }
544
545    /// Closes the database, flushing all pending writes.
546    ///
547    /// For persistent databases, this ensures everything is safely on disk.
548    /// Called automatically when the database is dropped, but you can call
549    /// it explicitly if you need to guarantee durability at a specific point.
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
554    pub fn close(&self) -> Result<()> {
555        let mut is_open = self.is_open.write();
556        if !*is_open {
557            return Ok(());
558        }
559
560        // Commit and checkpoint WAL
561        #[cfg(feature = "wal")]
562        if let Some(ref wal) = self.wal {
563            let epoch = self.store.current_epoch();
564
565            // Use the last assigned transaction ID, or create a checkpoint-only tx
566            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
567                // No transactions have been started; begin one for checkpoint
568                self.tx_manager.begin()
569            });
570
571            // Log a TxCommit to mark all pending records as committed
572            wal.log(&WalRecord::TxCommit {
573                tx_id: checkpoint_tx,
574            })?;
575
576            // Then checkpoint
577            wal.checkpoint(checkpoint_tx, epoch)?;
578            wal.sync()?;
579        }
580
581        *is_open = false;
582        Ok(())
583    }
584
585    /// Returns the WAL manager if available.
586    #[cfg(feature = "wal")]
587    #[must_use]
588    pub fn wal(&self) -> Option<&Arc<WalManager>> {
589        self.wal.as_ref()
590    }
591
592    /// Logs a WAL record if WAL is enabled.
593    #[cfg(feature = "wal")]
594    fn log_wal(&self, record: &WalRecord) -> Result<()> {
595        if let Some(ref wal) = self.wal {
596            wal.log(record)?;
597        }
598        Ok(())
599    }
600
601    /// Returns the number of nodes in the database.
602    #[must_use]
603    pub fn node_count(&self) -> usize {
604        self.store.node_count()
605    }
606
607    /// Returns the number of edges in the database.
608    #[must_use]
609    pub fn edge_count(&self) -> usize {
610        self.store.edge_count()
611    }
612
613    /// Returns the number of distinct labels in the database.
614    #[must_use]
615    pub fn label_count(&self) -> usize {
616        self.store.label_count()
617    }
618
619    /// Returns the number of distinct property keys in the database.
620    #[must_use]
621    pub fn property_key_count(&self) -> usize {
622        self.store.property_key_count()
623    }
624
625    /// Returns the number of distinct edge types in the database.
626    #[must_use]
627    pub fn edge_type_count(&self) -> usize {
628        self.store.edge_type_count()
629    }
630
631    // === Node Operations ===
632
633    /// Creates a node with the given labels and returns its ID.
634    ///
635    /// Labels categorize nodes - think of them like tags. A node can have
636    /// multiple labels (e.g., `["Person", "Employee"]`).
637    ///
638    /// # Examples
639    ///
640    /// ```
641    /// use grafeo_engine::GrafeoDB;
642    ///
643    /// let db = GrafeoDB::new_in_memory();
644    /// let alice = db.create_node(&["Person"]);
645    /// let company = db.create_node(&["Company", "Startup"]);
646    /// ```
647    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
648        let id = self.store.create_node(labels);
649
650        // Log to WAL if enabled
651        #[cfg(feature = "wal")]
652        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
653            id,
654            labels: labels.iter().map(|s| (*s).to_string()).collect(),
655        }) {
656            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
657        }
658
659        id
660    }
661
662    /// Creates a new node with labels and properties.
663    ///
664    /// If WAL is enabled, the operation is logged for durability.
665    pub fn create_node_with_props(
666        &self,
667        labels: &[&str],
668        properties: impl IntoIterator<
669            Item = (
670                impl Into<grafeo_common::types::PropertyKey>,
671                impl Into<grafeo_common::types::Value>,
672            ),
673        >,
674    ) -> grafeo_common::types::NodeId {
675        // Collect properties first so we can log them to WAL
676        let props: Vec<(
677            grafeo_common::types::PropertyKey,
678            grafeo_common::types::Value,
679        )> = properties
680            .into_iter()
681            .map(|(k, v)| (k.into(), v.into()))
682            .collect();
683
684        let id = self
685            .store
686            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
687
688        // Log node creation to WAL
689        #[cfg(feature = "wal")]
690        {
691            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
692                id,
693                labels: labels.iter().map(|s| (*s).to_string()).collect(),
694            }) {
695                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
696            }
697
698            // Log each property to WAL for full durability
699            for (key, value) in props {
700                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
701                    id,
702                    key: key.to_string(),
703                    value,
704                }) {
705                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
706                }
707            }
708        }
709
710        id
711    }
712
713    /// Gets a node by ID.
714    #[must_use]
715    pub fn get_node(
716        &self,
717        id: grafeo_common::types::NodeId,
718    ) -> Option<grafeo_core::graph::lpg::Node> {
719        self.store.get_node(id)
720    }
721
722    /// Deletes a node and all its edges.
723    ///
724    /// If WAL is enabled, the operation is logged for durability.
725    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
726        // Collect matching vector indexes BEFORE deletion removes labels
727        #[cfg(feature = "vector-index")]
728        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
729            .store
730            .get_node(id)
731            .map(|node| {
732                let mut indexes = Vec::new();
733                for label in &node.labels {
734                    let prefix = format!("{}:", label.as_str());
735                    for (key, index) in self.store.vector_index_entries() {
736                        if key.starts_with(&prefix) {
737                            indexes.push(index);
738                        }
739                    }
740                }
741                indexes
742            })
743            .unwrap_or_default();
744
745        let result = self.store.delete_node(id);
746
747        // Remove from vector indexes after successful deletion
748        #[cfg(feature = "vector-index")]
749        if result {
750            for index in indexes_to_clean {
751                index.remove(id);
752            }
753        }
754
755        #[cfg(feature = "wal")]
756        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
757            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
758        }
759
760        result
761    }
762
763    /// Sets a property on a node.
764    ///
765    /// If WAL is enabled, the operation is logged for durability.
766    pub fn set_node_property(
767        &self,
768        id: grafeo_common::types::NodeId,
769        key: &str,
770        value: grafeo_common::types::Value,
771    ) {
772        // Extract vector data before the value is moved into the store
773        #[cfg(feature = "vector-index")]
774        let vector_data = match &value {
775            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
776            _ => None,
777        };
778
779        // Log to WAL first
780        #[cfg(feature = "wal")]
781        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
782            id,
783            key: key.to_string(),
784            value: value.clone(),
785        }) {
786            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
787        }
788
789        self.store.set_node_property(id, key, value);
790
791        // Auto-insert into matching vector indexes
792        #[cfg(feature = "vector-index")]
793        if let Some(vec) = vector_data
794            && let Some(node) = self.store.get_node(id)
795        {
796            for label in &node.labels {
797                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
798                    index.insert(id, &vec);
799                }
800            }
801        }
802    }
803
804    /// Adds a label to an existing node.
805    ///
806    /// Returns `true` if the label was added, `false` if the node doesn't exist
807    /// or already has the label.
808    ///
809    /// # Examples
810    ///
811    /// ```
812    /// use grafeo_engine::GrafeoDB;
813    ///
814    /// let db = GrafeoDB::new_in_memory();
815    /// let alice = db.create_node(&["Person"]);
816    ///
817    /// // Promote Alice to Employee
818    /// let added = db.add_node_label(alice, "Employee");
819    /// assert!(added);
820    /// ```
821    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
822        let result = self.store.add_label(id, label);
823
824        #[cfg(feature = "wal")]
825        if result {
826            // Log to WAL if enabled
827            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
828                id,
829                label: label.to_string(),
830            }) {
831                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
832            }
833        }
834
835        // Auto-insert into vector indexes for the newly-added label
836        #[cfg(feature = "vector-index")]
837        if result {
838            let prefix = format!("{label}:");
839            for (key, index) in self.store.vector_index_entries() {
840                if let Some(property) = key.strip_prefix(&prefix)
841                    && let Some(node) = self.store.get_node(id)
842                {
843                    let prop_key = grafeo_common::types::PropertyKey::new(property);
844                    if let Some(grafeo_common::types::Value::Vector(v)) =
845                        node.properties.get(&prop_key)
846                    {
847                        index.insert(id, v);
848                    }
849                }
850            }
851        }
852
853        result
854    }
855
856    /// Removes a label from a node.
857    ///
858    /// Returns `true` if the label was removed, `false` if the node doesn't exist
859    /// or doesn't have the label.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// use grafeo_engine::GrafeoDB;
865    ///
866    /// let db = GrafeoDB::new_in_memory();
867    /// let alice = db.create_node(&["Person", "Employee"]);
868    ///
869    /// // Remove Employee status
870    /// let removed = db.remove_node_label(alice, "Employee");
871    /// assert!(removed);
872    /// ```
873    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
874        let result = self.store.remove_label(id, label);
875
876        #[cfg(feature = "wal")]
877        if result {
878            // Log to WAL if enabled
879            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
880                id,
881                label: label.to_string(),
882            }) {
883                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
884            }
885        }
886
887        result
888    }
889
890    /// Gets all labels for a node.
891    ///
892    /// Returns `None` if the node doesn't exist.
893    ///
894    /// # Examples
895    ///
896    /// ```
897    /// use grafeo_engine::GrafeoDB;
898    ///
899    /// let db = GrafeoDB::new_in_memory();
900    /// let alice = db.create_node(&["Person", "Employee"]);
901    ///
902    /// let labels = db.get_node_labels(alice).unwrap();
903    /// assert!(labels.contains(&"Person".to_string()));
904    /// assert!(labels.contains(&"Employee".to_string()));
905    /// ```
906    #[must_use]
907    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
908        self.store
909            .get_node(id)
910            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
911    }
912
913    // === Edge Operations ===
914
915    /// Creates an edge (relationship) between two nodes.
916    ///
917    /// Edges connect nodes and have a type that describes the relationship.
918    /// They're directed - the order of `src` and `dst` matters.
919    ///
920    /// # Examples
921    ///
922    /// ```
923    /// use grafeo_engine::GrafeoDB;
924    ///
925    /// let db = GrafeoDB::new_in_memory();
926    /// let alice = db.create_node(&["Person"]);
927    /// let bob = db.create_node(&["Person"]);
928    ///
929    /// // Alice knows Bob (directed: Alice -> Bob)
930    /// let edge = db.create_edge(alice, bob, "KNOWS");
931    /// ```
932    pub fn create_edge(
933        &self,
934        src: grafeo_common::types::NodeId,
935        dst: grafeo_common::types::NodeId,
936        edge_type: &str,
937    ) -> grafeo_common::types::EdgeId {
938        let id = self.store.create_edge(src, dst, edge_type);
939
940        // Log to WAL if enabled
941        #[cfg(feature = "wal")]
942        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
943            id,
944            src,
945            dst,
946            edge_type: edge_type.to_string(),
947        }) {
948            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
949        }
950
951        id
952    }
953
954    /// Creates a new edge with properties.
955    ///
956    /// If WAL is enabled, the operation is logged for durability.
957    pub fn create_edge_with_props(
958        &self,
959        src: grafeo_common::types::NodeId,
960        dst: grafeo_common::types::NodeId,
961        edge_type: &str,
962        properties: impl IntoIterator<
963            Item = (
964                impl Into<grafeo_common::types::PropertyKey>,
965                impl Into<grafeo_common::types::Value>,
966            ),
967        >,
968    ) -> grafeo_common::types::EdgeId {
969        // Collect properties first so we can log them to WAL
970        let props: Vec<(
971            grafeo_common::types::PropertyKey,
972            grafeo_common::types::Value,
973        )> = properties
974            .into_iter()
975            .map(|(k, v)| (k.into(), v.into()))
976            .collect();
977
978        let id = self.store.create_edge_with_props(
979            src,
980            dst,
981            edge_type,
982            props.iter().map(|(k, v)| (k.clone(), v.clone())),
983        );
984
985        // Log edge creation to WAL
986        #[cfg(feature = "wal")]
987        {
988            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
989                id,
990                src,
991                dst,
992                edge_type: edge_type.to_string(),
993            }) {
994                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
995            }
996
997            // Log each property to WAL for full durability
998            for (key, value) in props {
999                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1000                    id,
1001                    key: key.to_string(),
1002                    value,
1003                }) {
1004                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1005                }
1006            }
1007        }
1008
1009        id
1010    }
1011
1012    /// Gets an edge by ID.
1013    #[must_use]
1014    pub fn get_edge(
1015        &self,
1016        id: grafeo_common::types::EdgeId,
1017    ) -> Option<grafeo_core::graph::lpg::Edge> {
1018        self.store.get_edge(id)
1019    }
1020
1021    /// Deletes an edge.
1022    ///
1023    /// If WAL is enabled, the operation is logged for durability.
1024    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1025        let result = self.store.delete_edge(id);
1026
1027        #[cfg(feature = "wal")]
1028        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1029            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1030        }
1031
1032        result
1033    }
1034
1035    /// Sets a property on an edge.
1036    ///
1037    /// If WAL is enabled, the operation is logged for durability.
1038    pub fn set_edge_property(
1039        &self,
1040        id: grafeo_common::types::EdgeId,
1041        key: &str,
1042        value: grafeo_common::types::Value,
1043    ) {
1044        // Log to WAL first
1045        #[cfg(feature = "wal")]
1046        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1047            id,
1048            key: key.to_string(),
1049            value: value.clone(),
1050        }) {
1051            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1052        }
1053        self.store.set_edge_property(id, key, value);
1054    }
1055
1056    /// Removes a property from a node.
1057    ///
1058    /// Returns true if the property existed and was removed, false otherwise.
1059    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1060        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1061        self.store.remove_node_property(id, key).is_some()
1062    }
1063
1064    /// Removes a property from an edge.
1065    ///
1066    /// Returns true if the property existed and was removed, false otherwise.
1067    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1068        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1069        self.store.remove_edge_property(id, key).is_some()
1070    }
1071
1072    // =========================================================================
1073    // PROPERTY INDEX API
1074    // =========================================================================
1075
1076    /// Creates an index on a node property for O(1) lookups by value.
1077    ///
1078    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1079    /// O(1) instead of O(n) for this property. The index is automatically
1080    /// maintained when properties are set or removed.
1081    ///
1082    /// # Example
1083    ///
1084    /// ```ignore
1085    /// // Create an index on the 'email' property
1086    /// db.create_property_index("email");
1087    ///
1088    /// // Now lookups by email are O(1)
1089    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
1090    /// ```
1091    pub fn create_property_index(&self, property: &str) {
1092        self.store.create_property_index(property);
1093    }
1094
1095    /// Creates a vector similarity index on a node property.
1096    ///
1097    /// This enables efficient approximate nearest-neighbor search on vector
1098    /// properties. Currently validates the index parameters and scans existing
1099    /// nodes to verify the property contains vectors of the expected dimensions.
1100    ///
1101    /// # Arguments
1102    ///
1103    /// * `label` - Node label to index (e.g., `"Doc"`)
1104    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
1105    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
1106    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
1107    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
1108    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if the metric is invalid, no vectors are found, or
1113    /// dimensions don't match.
1114    pub fn create_vector_index(
1115        &self,
1116        label: &str,
1117        property: &str,
1118        dimensions: Option<usize>,
1119        metric: Option<&str>,
1120        m: Option<usize>,
1121        ef_construction: Option<usize>,
1122    ) -> Result<()> {
1123        use grafeo_common::types::{PropertyKey, Value};
1124        use grafeo_core::index::vector::DistanceMetric;
1125
1126        let metric = match metric {
1127            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1128                grafeo_common::utils::error::Error::Internal(format!(
1129                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1130                    m
1131                ))
1132            })?,
1133            None => DistanceMetric::Cosine,
1134        };
1135
1136        // Scan nodes to validate vectors exist and check dimensions
1137        let prop_key = PropertyKey::new(property);
1138        let mut found_dims: Option<usize> = dimensions;
1139        let mut vector_count = 0usize;
1140
1141        #[cfg(feature = "vector-index")]
1142        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1143
1144        for node in self.store.nodes_with_label(label) {
1145            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1146                if let Some(expected) = found_dims {
1147                    if v.len() != expected {
1148                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1149                            "Vector dimension mismatch: expected {}, found {} on node {}",
1150                            expected,
1151                            v.len(),
1152                            node.id.0
1153                        )));
1154                    }
1155                } else {
1156                    found_dims = Some(v.len());
1157                }
1158                vector_count += 1;
1159                #[cfg(feature = "vector-index")]
1160                vectors.push((node.id, v.to_vec()));
1161            }
1162        }
1163
1164        if vector_count == 0 {
1165            return Err(grafeo_common::utils::error::Error::Internal(format!(
1166                "No vector properties found on :{label}({property})"
1167            )));
1168        }
1169
1170        let dims = found_dims.unwrap_or(0);
1171
1172        // Build and populate the HNSW index
1173        #[cfg(feature = "vector-index")]
1174        {
1175            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1176
1177            let mut config = HnswConfig::new(dims, metric);
1178            if let Some(m_val) = m {
1179                config = config.with_m(m_val);
1180            }
1181            if let Some(ef_c) = ef_construction {
1182                config = config.with_ef_construction(ef_c);
1183            }
1184
1185            let index = HnswIndex::with_capacity(config, vectors.len());
1186            for (node_id, vec) in &vectors {
1187                index.insert(*node_id, vec);
1188            }
1189
1190            self.store
1191                .add_vector_index(label, property, Arc::new(index));
1192        }
1193
1194        // Suppress unused variable warnings when vector-index is off
1195        let _ = (m, ef_construction);
1196
1197        tracing::info!(
1198            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1199            metric_name = metric.name()
1200        );
1201
1202        Ok(())
1203    }
1204
1205    /// Drops a vector index for the given label and property.
1206    ///
1207    /// Returns `true` if the index existed and was removed, `false` if no
1208    /// index was found.
1209    ///
1210    /// After dropping, [`vector_search`](Self::vector_search) for this
1211    /// label+property pair will return an error.
1212    #[cfg(feature = "vector-index")]
1213    pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1214        let removed = self.store.remove_vector_index(label, property);
1215        if removed {
1216            tracing::info!("Vector index dropped: :{label}({property})");
1217        }
1218        removed
1219    }
1220
1221    /// Drops and recreates a vector index, rescanning all matching nodes.
1222    ///
1223    /// This is useful after bulk inserts or when the index may be out of sync.
1224    /// The previous index configuration (dimensions, metric, M, ef\_construction)
1225    /// is preserved.
1226    ///
1227    /// # Errors
1228    ///
1229    /// Returns an error if no vector index exists for this label+property pair,
1230    /// or if the rebuild fails (e.g., no matching vectors found).
1231    #[cfg(feature = "vector-index")]
1232    pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1233        let config = self
1234            .store
1235            .get_vector_index(label, property)
1236            .map(|idx| idx.config().clone())
1237            .ok_or_else(|| {
1238                grafeo_common::utils::error::Error::Internal(format!(
1239                    "No vector index found for :{label}({property}). Cannot rebuild."
1240                ))
1241            })?;
1242
1243        self.store.remove_vector_index(label, property);
1244
1245        self.create_vector_index(
1246            label,
1247            property,
1248            Some(config.dimensions),
1249            Some(config.metric.name()),
1250            Some(config.m),
1251            Some(config.ef_construction),
1252        )
1253    }
1254
1255    /// Computes a node allowlist from property equality filters.
1256    ///
1257    /// Intersects `nodes_by_label(label)` with `find_nodes_by_property(key, value)`
1258    /// for each filter entry. Returns `None` if filters is `None` or empty (meaning
1259    /// no filtering), or `Some(set)` with the intersection (possibly empty).
1260    #[cfg(feature = "vector-index")]
1261    fn compute_filter_allowlist(
1262        &self,
1263        label: &str,
1264        filters: Option<&std::collections::HashMap<String, Value>>,
1265    ) -> Option<std::collections::HashSet<NodeId>> {
1266        let filters = filters.filter(|f| !f.is_empty())?;
1267
1268        // Start with all nodes for this label
1269        let label_nodes: std::collections::HashSet<NodeId> =
1270            self.store.nodes_by_label(label).into_iter().collect();
1271
1272        let mut allowlist = label_nodes;
1273
1274        for (key, value) in filters {
1275            let matching: std::collections::HashSet<NodeId> = self
1276                .store
1277                .find_nodes_by_property(key, value)
1278                .into_iter()
1279                .collect();
1280            allowlist = allowlist.intersection(&matching).copied().collect();
1281
1282            // Short-circuit: empty intersection means no results possible
1283            if allowlist.is_empty() {
1284                return Some(allowlist);
1285            }
1286        }
1287
1288        Some(allowlist)
1289    }
1290
1291    /// Searches for the k nearest neighbors of a query vector.
1292    ///
1293    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1294    ///
1295    /// # Arguments
1296    ///
1297    /// * `label` - Node label that was indexed
1298    /// * `property` - Property that was indexed
1299    /// * `query` - Query vector (slice of floats)
1300    /// * `k` - Number of nearest neighbors to return
1301    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1302    /// * `filters` - Optional property equality filters. Only nodes matching all
1303    ///   `(key, value)` pairs will appear in results.
1304    ///
1305    /// # Returns
1306    ///
1307    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1308    #[cfg(feature = "vector-index")]
1309    pub fn vector_search(
1310        &self,
1311        label: &str,
1312        property: &str,
1313        query: &[f32],
1314        k: usize,
1315        ef: Option<usize>,
1316        filters: Option<&std::collections::HashMap<String, Value>>,
1317    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1318        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1319            grafeo_common::utils::error::Error::Internal(format!(
1320                "No vector index found for :{label}({property}). Call create_vector_index() first."
1321            ))
1322        })?;
1323
1324        let results = match self.compute_filter_allowlist(label, filters) {
1325            Some(allowlist) => match ef {
1326                Some(ef_val) => index.search_with_ef_and_filter(query, k, ef_val, &allowlist),
1327                None => index.search_with_filter(query, k, &allowlist),
1328            },
1329            None => match ef {
1330                Some(ef_val) => index.search_with_ef(query, k, ef_val),
1331                None => index.search(query, k),
1332            },
1333        };
1334
1335        Ok(results)
1336    }
1337
1338    /// Creates multiple nodes in bulk, each with a single vector property.
1339    ///
1340    /// Much faster than individual `create_node_with_props` calls because it
1341    /// acquires internal locks once and loops in Rust rather than crossing
1342    /// the FFI boundary per vector.
1343    ///
1344    /// # Arguments
1345    ///
1346    /// * `label` - Label applied to all created nodes
1347    /// * `property` - Property name for the vector data
1348    /// * `vectors` - Vector data for each node
1349    ///
1350    /// # Returns
1351    ///
1352    /// Vector of created `NodeId`s in the same order as the input vectors.
1353    pub fn batch_create_nodes(
1354        &self,
1355        label: &str,
1356        property: &str,
1357        vectors: Vec<Vec<f32>>,
1358    ) -> Vec<grafeo_common::types::NodeId> {
1359        use grafeo_common::types::{PropertyKey, Value};
1360
1361        let prop_key = PropertyKey::new(property);
1362        let labels: &[&str] = &[label];
1363
1364        let ids: Vec<grafeo_common::types::NodeId> = vectors
1365            .into_iter()
1366            .map(|vec| {
1367                let value = Value::Vector(vec.into());
1368                let id = self.store.create_node_with_props(
1369                    labels,
1370                    std::iter::once((prop_key.clone(), value.clone())),
1371                );
1372
1373                // Log to WAL
1374                #[cfg(feature = "wal")]
1375                {
1376                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1377                        id,
1378                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1379                    }) {
1380                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1381                    }
1382                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1383                        id,
1384                        key: property.to_string(),
1385                        value,
1386                    }) {
1387                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1388                    }
1389                }
1390
1391                id
1392            })
1393            .collect();
1394
1395        // Auto-insert into matching vector index if one exists
1396        #[cfg(feature = "vector-index")]
1397        if let Some(index) = self.store.get_vector_index(label, property) {
1398            for &id in &ids {
1399                if let Some(node) = self.store.get_node(id) {
1400                    let pk = grafeo_common::types::PropertyKey::new(property);
1401                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1402                        index.insert(id, v);
1403                    }
1404                }
1405            }
1406        }
1407
1408        ids
1409    }
1410
1411    /// Searches for nearest neighbors for multiple query vectors in parallel.
1412    ///
1413    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1414    ///
1415    /// # Arguments
1416    ///
1417    /// * `label` - Node label that was indexed
1418    /// * `property` - Property that was indexed
1419    /// * `queries` - Batch of query vectors
1420    /// * `k` - Number of nearest neighbors per query
1421    /// * `ef` - Search beam width (uses index default if `None`)
1422    /// * `filters` - Optional property equality filters
1423    #[cfg(feature = "vector-index")]
1424    pub fn batch_vector_search(
1425        &self,
1426        label: &str,
1427        property: &str,
1428        queries: &[Vec<f32>],
1429        k: usize,
1430        ef: Option<usize>,
1431        filters: Option<&std::collections::HashMap<String, Value>>,
1432    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1433        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1434            grafeo_common::utils::error::Error::Internal(format!(
1435                "No vector index found for :{label}({property}). Call create_vector_index() first."
1436            ))
1437        })?;
1438
1439        let results = match self.compute_filter_allowlist(label, filters) {
1440            Some(allowlist) => match ef {
1441                Some(ef_val) => {
1442                    index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist)
1443                }
1444                None => index.batch_search_with_filter(queries, k, &allowlist),
1445            },
1446            None => match ef {
1447                Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1448                None => index.batch_search(queries, k),
1449            },
1450        };
1451
1452        Ok(results)
1453    }
1454
1455    /// Searches for diverse nearest neighbors using Maximal Marginal Relevance (MMR).
1456    ///
1457    /// MMR balances relevance (similarity to query) with diversity (dissimilarity
1458    /// among selected results). This is the algorithm used by LangChain's
1459    /// `mmr_traversal_search()` for RAG applications.
1460    ///
1461    /// # Arguments
1462    ///
1463    /// * `label` - Node label that was indexed
1464    /// * `property` - Property that was indexed
1465    /// * `query` - Query vector
1466    /// * `k` - Number of diverse results to return
1467    /// * `fetch_k` - Number of initial candidates from HNSW (default: `4 * k`)
1468    /// * `lambda` - Relevance vs. diversity in \[0, 1\] (default: 0.5).
1469    ///   1.0 = pure relevance, 0.0 = pure diversity.
1470    /// * `ef` - HNSW search beam width (uses index default if `None`)
1471    /// * `filters` - Optional property equality filters
1472    ///
1473    /// # Returns
1474    ///
1475    /// `(NodeId, distance)` pairs in MMR selection order. The f32 is the original
1476    /// distance from the query, matching [`vector_search`](Self::vector_search).
1477    #[cfg(feature = "vector-index")]
1478    #[allow(clippy::too_many_arguments)]
1479    pub fn mmr_search(
1480        &self,
1481        label: &str,
1482        property: &str,
1483        query: &[f32],
1484        k: usize,
1485        fetch_k: Option<usize>,
1486        lambda: Option<f32>,
1487        ef: Option<usize>,
1488        filters: Option<&std::collections::HashMap<String, Value>>,
1489    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1490        use grafeo_core::index::vector::mmr_select;
1491
1492        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1493            grafeo_common::utils::error::Error::Internal(format!(
1494                "No vector index found for :{label}({property}). Call create_vector_index() first."
1495            ))
1496        })?;
1497
1498        let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1499        let lambda = lambda.unwrap_or(0.5);
1500
1501        // Step 1: Fetch candidates from HNSW (with optional filter)
1502        let initial_results = match self.compute_filter_allowlist(label, filters) {
1503            Some(allowlist) => match ef {
1504                Some(ef_val) => index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist),
1505                None => index.search_with_filter(query, fetch_k, &allowlist),
1506            },
1507            None => match ef {
1508                Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val),
1509                None => index.search(query, fetch_k),
1510            },
1511        };
1512
1513        if initial_results.is_empty() {
1514            return Ok(Vec::new());
1515        }
1516
1517        // Step 2: Retrieve stored vectors for MMR pairwise comparison
1518        let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1519            initial_results
1520                .into_iter()
1521                .filter_map(|(id, dist)| index.get(id).map(|vec| (id, dist, vec)))
1522                .collect();
1523
1524        // Step 3: Build slice-based candidates for mmr_select
1525        let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1526            .iter()
1527            .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1528            .collect();
1529
1530        // Step 4: Run MMR selection
1531        let metric = index.config().metric;
1532        Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1533    }
1534
1535    /// Drops an index on a node property.
1536    ///
1537    /// Returns `true` if the index existed and was removed.
1538    pub fn drop_property_index(&self, property: &str) -> bool {
1539        self.store.drop_property_index(property)
1540    }
1541
1542    /// Returns `true` if the property has an index.
1543    #[must_use]
1544    pub fn has_property_index(&self, property: &str) -> bool {
1545        self.store.has_property_index(property)
1546    }
1547
1548    /// Finds all nodes that have a specific property value.
1549    ///
1550    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1551    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1552    ///
1553    /// # Example
1554    ///
1555    /// ```ignore
1556    /// // Create index for fast lookups (optional but recommended)
1557    /// db.create_property_index("city");
1558    ///
1559    /// // Find all nodes where city = "NYC"
1560    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
1561    /// ```
1562    #[must_use]
1563    pub fn find_nodes_by_property(
1564        &self,
1565        property: &str,
1566        value: &grafeo_common::types::Value,
1567    ) -> Vec<grafeo_common::types::NodeId> {
1568        self.store.find_nodes_by_property(property, value)
1569    }
1570
1571    // =========================================================================
1572    // ADMIN API: Introspection
1573    // =========================================================================
1574
1575    /// Returns true if this database is backed by a file (persistent).
1576    ///
1577    /// In-memory databases return false.
1578    #[must_use]
1579    pub fn is_persistent(&self) -> bool {
1580        self.config.path.is_some()
1581    }
1582
1583    /// Returns the database file path, if persistent.
1584    ///
1585    /// In-memory databases return None.
1586    #[must_use]
1587    pub fn path(&self) -> Option<&Path> {
1588        self.config.path.as_deref()
1589    }
1590
1591    /// Returns high-level database information.
1592    ///
1593    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1594    #[must_use]
1595    pub fn info(&self) -> crate::admin::DatabaseInfo {
1596        crate::admin::DatabaseInfo {
1597            mode: crate::admin::DatabaseMode::Lpg,
1598            node_count: self.store.node_count(),
1599            edge_count: self.store.edge_count(),
1600            is_persistent: self.is_persistent(),
1601            path: self.config.path.clone(),
1602            wal_enabled: self.config.wal_enabled,
1603            version: env!("CARGO_PKG_VERSION").to_string(),
1604        }
1605    }
1606
1607    /// Returns detailed database statistics.
1608    ///
1609    /// Includes counts, memory usage, and index information.
1610    #[must_use]
1611    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1612        #[cfg(feature = "wal")]
1613        let disk_bytes = self.config.path.as_ref().and_then(|p| {
1614            if p.exists() {
1615                Self::calculate_disk_usage(p).ok()
1616            } else {
1617                None
1618            }
1619        });
1620        #[cfg(not(feature = "wal"))]
1621        let disk_bytes: Option<usize> = None;
1622
1623        crate::admin::DatabaseStats {
1624            node_count: self.store.node_count(),
1625            edge_count: self.store.edge_count(),
1626            label_count: self.store.label_count(),
1627            edge_type_count: self.store.edge_type_count(),
1628            property_key_count: self.store.property_key_count(),
1629            index_count: 0, // TODO: implement index tracking
1630            memory_bytes: self.buffer_manager.allocated(),
1631            disk_bytes,
1632        }
1633    }
1634
1635    /// Calculates total disk usage for the database directory.
1636    #[cfg(feature = "wal")]
1637    fn calculate_disk_usage(path: &Path) -> Result<usize> {
1638        let mut total = 0usize;
1639        if path.is_dir() {
1640            for entry in std::fs::read_dir(path)? {
1641                let entry = entry?;
1642                let metadata = entry.metadata()?;
1643                if metadata.is_file() {
1644                    total += metadata.len() as usize;
1645                } else if metadata.is_dir() {
1646                    total += Self::calculate_disk_usage(&entry.path())?;
1647                }
1648            }
1649        }
1650        Ok(total)
1651    }
1652
1653    /// Returns schema information (labels, edge types, property keys).
1654    ///
1655    /// For LPG mode, returns label and edge type information.
1656    /// For RDF mode, returns predicate and named graph information.
1657    #[must_use]
1658    pub fn schema(&self) -> crate::admin::SchemaInfo {
1659        let labels = self
1660            .store
1661            .all_labels()
1662            .into_iter()
1663            .map(|name| crate::admin::LabelInfo {
1664                name: name.clone(),
1665                count: self.store.nodes_with_label(&name).count(),
1666            })
1667            .collect();
1668
1669        let edge_types = self
1670            .store
1671            .all_edge_types()
1672            .into_iter()
1673            .map(|name| crate::admin::EdgeTypeInfo {
1674                name: name.clone(),
1675                count: self.store.edges_with_type(&name).count(),
1676            })
1677            .collect();
1678
1679        let property_keys = self.store.all_property_keys();
1680
1681        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1682            labels,
1683            edge_types,
1684            property_keys,
1685        })
1686    }
1687
1688    /// Returns RDF schema information.
1689    ///
1690    /// Only available when the RDF feature is enabled.
1691    #[cfg(feature = "rdf")]
1692    #[must_use]
1693    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1694        let stats = self.rdf_store.stats();
1695
1696        let predicates = self
1697            .rdf_store
1698            .predicates()
1699            .into_iter()
1700            .map(|predicate| {
1701                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1702                crate::admin::PredicateInfo {
1703                    iri: predicate.to_string(),
1704                    count,
1705                }
1706            })
1707            .collect();
1708
1709        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1710            predicates,
1711            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1712            subject_count: stats.subject_count,
1713            object_count: stats.object_count,
1714        })
1715    }
1716
1717    /// Validates database integrity.
1718    ///
1719    /// Checks for:
1720    /// - Dangling edge references (edges pointing to non-existent nodes)
1721    /// - Internal index consistency
1722    ///
1723    /// Returns a list of errors and warnings. Empty errors = valid.
1724    #[must_use]
1725    pub fn validate(&self) -> crate::admin::ValidationResult {
1726        let mut result = crate::admin::ValidationResult::default();
1727
1728        // Check for dangling edge references
1729        for edge in self.store.all_edges() {
1730            if self.store.get_node(edge.src).is_none() {
1731                result.errors.push(crate::admin::ValidationError {
1732                    code: "DANGLING_SRC".to_string(),
1733                    message: format!(
1734                        "Edge {} references non-existent source node {}",
1735                        edge.id.0, edge.src.0
1736                    ),
1737                    context: Some(format!("edge:{}", edge.id.0)),
1738                });
1739            }
1740            if self.store.get_node(edge.dst).is_none() {
1741                result.errors.push(crate::admin::ValidationError {
1742                    code: "DANGLING_DST".to_string(),
1743                    message: format!(
1744                        "Edge {} references non-existent destination node {}",
1745                        edge.id.0, edge.dst.0
1746                    ),
1747                    context: Some(format!("edge:{}", edge.id.0)),
1748                });
1749            }
1750        }
1751
1752        // Add warnings for potential issues
1753        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1754            result.warnings.push(crate::admin::ValidationWarning {
1755                code: "NO_EDGES".to_string(),
1756                message: "Database has nodes but no edges".to_string(),
1757                context: None,
1758            });
1759        }
1760
1761        result
1762    }
1763
1764    /// Returns WAL (Write-Ahead Log) status.
1765    ///
1766    /// Returns None if WAL is not enabled.
1767    #[must_use]
1768    pub fn wal_status(&self) -> crate::admin::WalStatus {
1769        #[cfg(feature = "wal")]
1770        if let Some(ref wal) = self.wal {
1771            return crate::admin::WalStatus {
1772                enabled: true,
1773                path: self.config.path.as_ref().map(|p| p.join("wal")),
1774                size_bytes: wal.size_bytes(),
1775                record_count: wal.record_count() as usize,
1776                last_checkpoint: wal.last_checkpoint_timestamp(),
1777                current_epoch: self.store.current_epoch().as_u64(),
1778            };
1779        }
1780
1781        crate::admin::WalStatus {
1782            enabled: false,
1783            path: None,
1784            size_bytes: 0,
1785            record_count: 0,
1786            last_checkpoint: None,
1787            current_epoch: self.store.current_epoch().as_u64(),
1788        }
1789    }
1790
1791    /// Forces a WAL checkpoint.
1792    ///
1793    /// Flushes all pending WAL records to the main storage.
1794    ///
1795    /// # Errors
1796    ///
1797    /// Returns an error if the checkpoint fails.
1798    pub fn wal_checkpoint(&self) -> Result<()> {
1799        #[cfg(feature = "wal")]
1800        if let Some(ref wal) = self.wal {
1801            let epoch = self.store.current_epoch();
1802            let tx_id = self
1803                .tx_manager
1804                .last_assigned_tx_id()
1805                .unwrap_or_else(|| self.tx_manager.begin());
1806            wal.checkpoint(tx_id, epoch)?;
1807            wal.sync()?;
1808        }
1809        Ok(())
1810    }
1811
1812    // =========================================================================
1813    // ADMIN API: Persistence Control
1814    // =========================================================================
1815
1816    /// Saves the database to a file path.
1817    ///
1818    /// - If in-memory: creates a new persistent database at path
1819    /// - If file-backed: creates a copy at the new path
1820    ///
1821    /// The original database remains unchanged.
1822    ///
1823    /// # Errors
1824    ///
1825    /// Returns an error if the save operation fails.
1826    ///
1827    /// Requires the `wal` feature for persistence support.
1828    #[cfg(feature = "wal")]
1829    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1830        let path = path.as_ref();
1831
1832        // Create target database with WAL enabled
1833        let target_config = Config::persistent(path);
1834        let target = Self::with_config(target_config)?;
1835
1836        // Copy all nodes using WAL-enabled methods
1837        for node in self.store.all_nodes() {
1838            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1839            target.store.create_node_with_id(node.id, &label_refs);
1840
1841            // Log to WAL
1842            target.log_wal(&WalRecord::CreateNode {
1843                id: node.id,
1844                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1845            })?;
1846
1847            // Copy properties
1848            for (key, value) in node.properties {
1849                target
1850                    .store
1851                    .set_node_property(node.id, key.as_str(), value.clone());
1852                target.log_wal(&WalRecord::SetNodeProperty {
1853                    id: node.id,
1854                    key: key.to_string(),
1855                    value,
1856                })?;
1857            }
1858        }
1859
1860        // Copy all edges using WAL-enabled methods
1861        for edge in self.store.all_edges() {
1862            target
1863                .store
1864                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1865
1866            // Log to WAL
1867            target.log_wal(&WalRecord::CreateEdge {
1868                id: edge.id,
1869                src: edge.src,
1870                dst: edge.dst,
1871                edge_type: edge.edge_type.to_string(),
1872            })?;
1873
1874            // Copy properties
1875            for (key, value) in edge.properties {
1876                target
1877                    .store
1878                    .set_edge_property(edge.id, key.as_str(), value.clone());
1879                target.log_wal(&WalRecord::SetEdgeProperty {
1880                    id: edge.id,
1881                    key: key.to_string(),
1882                    value,
1883                })?;
1884            }
1885        }
1886
1887        // Checkpoint and close the target database
1888        target.close()?;
1889
1890        Ok(())
1891    }
1892
1893    /// Creates an in-memory copy of this database.
1894    ///
1895    /// Returns a new database that is completely independent.
1896    /// Useful for:
1897    /// - Testing modifications without affecting the original
1898    /// - Faster operations when persistence isn't needed
1899    ///
1900    /// # Errors
1901    ///
1902    /// Returns an error if the copy operation fails.
1903    pub fn to_memory(&self) -> Result<Self> {
1904        let config = Config::in_memory();
1905        let target = Self::with_config(config)?;
1906
1907        // Copy all nodes
1908        for node in self.store.all_nodes() {
1909            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1910            target.store.create_node_with_id(node.id, &label_refs);
1911
1912            // Copy properties
1913            for (key, value) in node.properties {
1914                target.store.set_node_property(node.id, key.as_str(), value);
1915            }
1916        }
1917
1918        // Copy all edges
1919        for edge in self.store.all_edges() {
1920            target
1921                .store
1922                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1923
1924            // Copy properties
1925            for (key, value) in edge.properties {
1926                target.store.set_edge_property(edge.id, key.as_str(), value);
1927            }
1928        }
1929
1930        Ok(target)
1931    }
1932
1933    /// Opens a database file and loads it entirely into memory.
1934    ///
1935    /// The returned database has no connection to the original file.
1936    /// Changes will NOT be written back to the file.
1937    ///
1938    /// # Errors
1939    ///
1940    /// Returns an error if the file can't be opened or loaded.
1941    #[cfg(feature = "wal")]
1942    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1943        // Open the source database (triggers WAL recovery)
1944        let source = Self::open(path)?;
1945
1946        // Create in-memory copy
1947        let target = source.to_memory()?;
1948
1949        // Close the source (releases file handles)
1950        source.close()?;
1951
1952        Ok(target)
1953    }
1954
1955    // =========================================================================
1956    // ADMIN API: Snapshot Export/Import
1957    // =========================================================================
1958
1959    /// Exports the entire database to a binary snapshot.
1960    ///
1961    /// The returned bytes can be stored (e.g. in IndexedDB) and later
1962    /// restored with [`import_snapshot()`](Self::import_snapshot).
1963    ///
1964    /// # Errors
1965    ///
1966    /// Returns an error if serialization fails.
1967    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
1968        let nodes: Vec<SnapshotNode> = self
1969            .store
1970            .all_nodes()
1971            .map(|n| SnapshotNode {
1972                id: n.id,
1973                labels: n.labels.iter().map(|l| l.to_string()).collect(),
1974                properties: n
1975                    .properties
1976                    .into_iter()
1977                    .map(|(k, v)| (k.to_string(), v))
1978                    .collect(),
1979            })
1980            .collect();
1981
1982        let edges: Vec<SnapshotEdge> = self
1983            .store
1984            .all_edges()
1985            .map(|e| SnapshotEdge {
1986                id: e.id,
1987                src: e.src,
1988                dst: e.dst,
1989                edge_type: e.edge_type.to_string(),
1990                properties: e
1991                    .properties
1992                    .into_iter()
1993                    .map(|(k, v)| (k.to_string(), v))
1994                    .collect(),
1995            })
1996            .collect();
1997
1998        let snapshot = Snapshot {
1999            version: 1,
2000            nodes,
2001            edges,
2002        };
2003
2004        let config = bincode::config::standard();
2005        bincode::serde::encode_to_vec(&snapshot, config)
2006            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2007    }
2008
2009    /// Creates a new in-memory database from a binary snapshot.
2010    ///
2011    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
2012    ///
2013    /// # Errors
2014    ///
2015    /// Returns an error if the snapshot is invalid or deserialization fails.
2016    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2017        let config = bincode::config::standard();
2018        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2019            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2020
2021        if snapshot.version != 1 {
2022            return Err(Error::Internal(format!(
2023                "unsupported snapshot version: {}",
2024                snapshot.version
2025            )));
2026        }
2027
2028        let db = Self::new_in_memory();
2029
2030        for node in snapshot.nodes {
2031            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2032            db.store.create_node_with_id(node.id, &label_refs);
2033            for (key, value) in node.properties {
2034                db.store.set_node_property(node.id, &key, value);
2035            }
2036        }
2037
2038        for edge in snapshot.edges {
2039            db.store
2040                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2041            for (key, value) in edge.properties {
2042                db.store.set_edge_property(edge.id, &key, value);
2043            }
2044        }
2045
2046        Ok(db)
2047    }
2048
2049    // =========================================================================
2050    // ADMIN API: Iteration
2051    // =========================================================================
2052
    /// Returns an iterator over all nodes in the database.
    ///
    /// Delegates directly to the underlying store's `all_nodes()`. The
    /// iterator yields owned [`Node`](grafeo_core::graph::lpg::Node) values
    /// and borrows the database for as long as it is alive.
    ///
    /// Useful for dump/export operations.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
2059
    /// Returns an iterator over all edges in the database.
    ///
    /// Delegates directly to the underlying store's `all_edges()`. The
    /// iterator yields owned [`Edge`](grafeo_core::graph::lpg::Edge) values
    /// and borrows the database for as long as it is alive.
    ///
    /// Useful for dump/export operations.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
2066}
2067
/// Binary snapshot format for database export/import.
///
/// Written by `GrafeoDB::export_snapshot` and read back by
/// `GrafeoDB::import_snapshot`, both via bincode's serde support.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version; export writes `1` and import rejects any other value.
    version: u8,
    /// All nodes of the graph.
    nodes: Vec<SnapshotNode>,
    /// All edges of the graph.
    edges: Vec<SnapshotEdge>,
}
2075
/// A single node as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// ID of the node; import re-creates the node under this same ID.
    id: NodeId,
    /// Label names attached to the node.
    labels: Vec<String>,
    /// Property key/value pairs of the node.
    properties: Vec<(String, Value)>,
}
2082
/// A single edge as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// ID of the edge; import re-creates the edge under this same ID.
    id: EdgeId,
    /// ID of the source node.
    src: NodeId,
    /// ID of the destination node.
    dst: NodeId,
    /// Name of the edge type.
    edge_type: String,
    /// Property key/value pairs of the edge.
    properties: Vec<(String, Value)>,
}
2091
2092impl Drop for GrafeoDB {
2093    fn drop(&mut self) {
2094        if let Err(e) = self.close() {
2095            tracing::error!("Error closing database: {}", e);
2096        }
2097    }
2098}
2099
2100impl crate::admin::AdminService for GrafeoDB {
2101    fn info(&self) -> crate::admin::DatabaseInfo {
2102        self.info()
2103    }
2104
2105    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2106        self.detailed_stats()
2107    }
2108
2109    fn schema(&self) -> crate::admin::SchemaInfo {
2110        self.schema()
2111    }
2112
2113    fn validate(&self) -> crate::admin::ValidationResult {
2114        self.validate()
2115    }
2116
2117    fn wal_status(&self) -> crate::admin::WalStatus {
2118        self.wal_status()
2119    }
2120
2121    fn wal_checkpoint(&self) -> Result<()> {
2122        self.wal_checkpoint()
2123    }
2124}
2125
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be inspected (or built up)
/// directly without going through the accessor methods.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`QueryResult::new`] fills this with one `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
}
2164
2165impl QueryResult {
2166    /// Creates a new empty query result.
2167    #[must_use]
2168    pub fn new(columns: Vec<String>) -> Self {
2169        let len = columns.len();
2170        Self {
2171            columns,
2172            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2173            rows: Vec::new(),
2174            execution_time_ms: None,
2175            rows_scanned: None,
2176        }
2177    }
2178
2179    /// Creates a new empty query result with column types.
2180    #[must_use]
2181    pub fn with_types(
2182        columns: Vec<String>,
2183        column_types: Vec<grafeo_common::types::LogicalType>,
2184    ) -> Self {
2185        Self {
2186            columns,
2187            column_types,
2188            rows: Vec::new(),
2189            execution_time_ms: None,
2190            rows_scanned: None,
2191        }
2192    }
2193
2194    /// Sets the execution metrics on this result.
2195    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2196        self.execution_time_ms = Some(execution_time_ms);
2197        self.rows_scanned = Some(rows_scanned);
2198        self
2199    }
2200
2201    /// Returns the execution time in milliseconds, if available.
2202    #[must_use]
2203    pub fn execution_time_ms(&self) -> Option<f64> {
2204        self.execution_time_ms
2205    }
2206
2207    /// Returns the number of rows scanned, if available.
2208    #[must_use]
2209    pub fn rows_scanned(&self) -> Option<u64> {
2210        self.rows_scanned
2211    }
2212
2213    /// Returns the number of rows.
2214    #[must_use]
2215    pub fn row_count(&self) -> usize {
2216        self.rows.len()
2217    }
2218
2219    /// Returns the number of columns.
2220    #[must_use]
2221    pub fn column_count(&self) -> usize {
2222        self.columns.len()
2223    }
2224
2225    /// Returns true if the result is empty.
2226    #[must_use]
2227    pub fn is_empty(&self) -> bool {
2228        self.rows.is_empty()
2229    }
2230
2231    /// Extracts a single value from the result.
2232    ///
2233    /// Use this when your query returns exactly one row with one column,
2234    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2235    ///
2236    /// # Errors
2237    ///
2238    /// Returns an error if the result has multiple rows or columns.
2239    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2240        if self.rows.len() != 1 || self.columns.len() != 1 {
2241            return Err(grafeo_common::utils::error::Error::InvalidValue(
2242                "Expected single value".to_string(),
2243            ));
2244        }
2245        T::from_value(&self.rows[0][0])
2246    }
2247
2248    /// Returns an iterator over the rows.
2249    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2250        self.rows.iter()
2251    }
2252}
2253
/// Converts a [`Value`] to a concrete Rust type.
///
/// Implemented in this file for `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// The implementations in this file return a `TypeMismatch` error naming
    /// the expected type and the type name of the value actually found.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2262
2263impl FromValue for i64 {
2264    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2265        value
2266            .as_int64()
2267            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2268                expected: "INT64".to_string(),
2269                found: value.type_name().to_string(),
2270            })
2271    }
2272}
2273
2274impl FromValue for f64 {
2275    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2276        value
2277            .as_float64()
2278            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2279                expected: "FLOAT64".to_string(),
2280                found: value.type_name().to_string(),
2281            })
2282    }
2283}
2284
2285impl FromValue for String {
2286    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2287        value.as_str().map(String::from).ok_or_else(|| {
2288            grafeo_common::utils::error::Error::TypeMismatch {
2289                expected: "STRING".to_string(),
2290                found: value.type_name().to_string(),
2291            }
2292        })
2293    }
2294}
2295
2296impl FromValue for bool {
2297    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2298        value
2299            .as_bool()
2300            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2301                expected: "BOOL".to_string(),
2302                found: value.type_name().to_string(),
2303            })
2304    }
2305}
2306
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let fresh = GrafeoDB::new_in_memory();
        assert_eq!(fresh.node_count(), 0);
        assert_eq!(fresh.edge_count(), 0);
    }

    /// Settings passed through `with_config` are visible via `config()`.
    #[test]
    fn test_database_config() {
        let requested = Config::in_memory().with_threads(4).with_query_logging();
        let db = GrafeoDB::with_config(requested).unwrap();

        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Data written before `close()` is available again after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let location = tmp.path().join("test_db");

        // First lifetime: two people connected by one edge.
        {
            let db = GrafeoDB::open(&location).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _knows = db.create_edge(alice, bob, "KNOWS");

            // Close explicitly so the WAL is flushed.
            db.close().unwrap();
        }

        // Second lifetime: everything must have been recovered.
        {
            let reopened = GrafeoDB::open(&location).unwrap();

            assert_eq!(reopened.node_count(), 2);
            assert_eq!(reopened.edge_count(), 1);

            // Both nodes are reachable under their original IDs.
            assert!(reopened.get_node(NodeId::new(0)).is_some());
            assert!(reopened.get_node(NodeId::new(1)).is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let location = tmp.path().join("wal_test_db");

        let db = GrafeoDB::open(&location).unwrap();

        // One create plus one delete.
        let scratch = db.create_node(&["Test"]);
        db.delete_node(scratch);

        // If a WAL is attached it must have recorded something.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    /// WAL recovery keeps working across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let location = tmp.path().join("multi_session_db");

        // Cycle 1: initial data.
        {
            let db = GrafeoDB::open(&location).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Cycle 2: earlier data is back, then add more.
        {
            let db = GrafeoDB::open(&location).unwrap();
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Cycle 3: both nodes exist and still carry their label.
        {
            let db = GrafeoDB::open(&location).unwrap();
            assert_eq!(db.node_count(), 2);

            let first = db.get_node(NodeId::new(0)).unwrap();
            assert!(first.labels.iter().any(|l| l.as_str() == "Person"));

            let second = db.get_node(NodeId::new(1)).unwrap();
            assert!(second.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// A mix of creates and deletes is replayed consistently on reopen.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let location = tmp.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&location).unwrap();

            // Three nodes in a chain: first -> middle -> last.
            let first = db.create_node(&["Node"]);
            let middle = db.create_node(&["Node"]);
            let last = db.create_node(&["Node"]);

            let first_link = db.create_edge(first, middle, "LINKS");
            let _second_link = db.create_edge(middle, last, "LINKS");

            // Remove the first link and the middle node.
            db.delete_edge(first_link);
            db.delete_node(middle);

            // Touch the two surviving nodes.
            db.set_node_property(first, "value", Value::Int64(1));
            db.set_node_property(last, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&location).unwrap();

            // node_count may include deleted nodes in some implementations,
            // so only the accessibility of individual nodes is checked.
            assert!(db.get_node(NodeId::new(0)).is_some());
            assert!(db.get_node(NodeId::new(2)).is_some());

            // The middle node stays deleted.
            assert!(db.get_node(NodeId::new(1)).is_none());
        }
    }

    /// `close()` may be called repeatedly without error.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let location = tmp.path().join("close_test_db");

        let db = GrafeoDB::open(&location).unwrap();
        db.create_node(&["Test"]);

        let first_close = db.close();
        assert!(first_close.is_ok());

        // A second close on the same handle must succeed as well.
        let second_close = db.close();
        assert!(second_close.is_ok());
    }

    /// Query results carry execution metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let outcome = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Both metrics are populated with sensible values.
            assert!(outcome.execution_time_ms.is_some());
            assert!(outcome.rows_scanned.is_some());
            assert!(outcome.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(outcome.rows_scanned.unwrap(), 2);
        }
    }

    /// Metrics are still reported when a query matches nothing.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // No node carries this label.
            let outcome = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(outcome.execution_time_ms.is_some());
            assert!(outcome.rows_scanned.is_some());
            assert_eq!(outcome.rows_scanned.unwrap(), 0);
        }
    }
}