Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
27/// Your handle to a Grafeo database.
28///
29/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
30/// quick experiments, or [`open()`](Self::open) for persistent storage.
31/// Then grab a [`session()`](Self::session) to start querying.
32///
33/// # Examples
34///
35/// ```
36/// use grafeo_engine::GrafeoDB;
37///
38/// // Quick in-memory database
39/// let db = GrafeoDB::new_in_memory();
40///
41/// // Add some data
42/// db.create_node(&["Person"]);
43///
44/// // Query it
45/// let session = db.session();
46/// let result = session.execute("MATCH (p:Person) RETURN p")?;
47/// # Ok::<(), grafeo_common::utils::error::Error>(())
48/// ```
49pub struct GrafeoDB {
50    /// Database configuration.
51    config: Config,
52    /// The underlying graph store.
53    store: Arc<LpgStore>,
54    /// RDF triple store (if RDF feature is enabled).
55    #[cfg(feature = "rdf")]
56    rdf_store: Arc<RdfStore>,
57    /// Transaction manager.
58    tx_manager: Arc<TransactionManager>,
59    /// Unified buffer manager.
60    buffer_manager: Arc<BufferManager>,
61    /// Write-ahead log manager (if durability is enabled).
62    #[cfg(feature = "wal")]
63    wal: Option<Arc<WalManager>>,
64    /// Query cache for parsed and optimized plans.
65    query_cache: Arc<QueryCache>,
66    /// Shared commit counter for auto-GC across sessions.
67    commit_counter: Arc<AtomicUsize>,
68    /// Whether the database is open.
69    is_open: RwLock<bool>,
70}
71
72impl GrafeoDB {
73    /// Creates an in-memory database - fast to create, gone when dropped.
74    ///
75    /// Use this for tests, experiments, or when you don't need persistence.
76    /// For data that survives restarts, use [`open()`](Self::open) instead.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use grafeo_engine::GrafeoDB;
82    ///
83    /// let db = GrafeoDB::new_in_memory();
84    /// let session = db.session();
85    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
86    /// # Ok::<(), grafeo_common::utils::error::Error>(())
87    /// ```
88    #[must_use]
89    pub fn new_in_memory() -> Self {
90        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
91    }
92
93    /// Opens a database at the given path, creating it if it doesn't exist.
94    ///
95    /// If you've used this path before, Grafeo recovers your data from the
96    /// write-ahead log automatically. First open on a new path creates an
97    /// empty database.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the path isn't writable or recovery fails.
102    ///
103    /// # Examples
104    ///
105    /// ```no_run
106    /// use grafeo_engine::GrafeoDB;
107    ///
108    /// let db = GrafeoDB::open("./my_social_network")?;
109    /// # Ok::<(), grafeo_common::utils::error::Error>(())
110    /// ```
111    #[cfg(feature = "wal")]
112    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
113        Self::with_config(Config::persistent(path.as_ref()))
114    }
115
116    /// Creates a database with custom configuration.
117    ///
118    /// Use this when you need fine-grained control over memory limits,
119    /// thread counts, or persistence settings. For most cases,
120    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
121    /// are simpler.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the database can't be created or recovery fails.
126    ///
127    /// # Examples
128    ///
129    /// ```
130    /// use grafeo_engine::{GrafeoDB, Config};
131    ///
132    /// // In-memory with a 512MB limit
133    /// let config = Config::in_memory()
134    ///     .with_memory_limit(512 * 1024 * 1024);
135    ///
136    /// let db = GrafeoDB::with_config(config)?;
137    /// # Ok::<(), grafeo_common::utils::error::Error>(())
138    /// ```
139    pub fn with_config(config: Config) -> Result<Self> {
140        // Validate configuration before proceeding
141        config
142            .validate()
143            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
144
145        let store = Arc::new(LpgStore::new());
146        #[cfg(feature = "rdf")]
147        let rdf_store = Arc::new(RdfStore::new());
148        let tx_manager = Arc::new(TransactionManager::new());
149
150        // Create buffer manager with configured limits
151        let buffer_config = BufferManagerConfig {
152            budget: config.memory_limit.unwrap_or_else(|| {
153                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
154            }),
155            spill_path: config
156                .spill_path
157                .clone()
158                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
159            ..BufferManagerConfig::default()
160        };
161        let buffer_manager = BufferManager::new(buffer_config);
162
163        // Initialize WAL if persistence is enabled
164        #[cfg(feature = "wal")]
165        let wal = if config.wal_enabled {
166            if let Some(ref db_path) = config.path {
167                // Create database directory if it doesn't exist
168                std::fs::create_dir_all(db_path)?;
169
170                let wal_path = db_path.join("wal");
171
172                // Check if WAL exists and recover if needed
173                if wal_path.exists() {
174                    let recovery = WalRecovery::new(&wal_path);
175                    let records = recovery.recover()?;
176                    Self::apply_wal_records(&store, &records)?;
177                }
178
179                // Open/create WAL manager with configured durability
180                let wal_durability = match config.wal_durability {
181                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
182                    crate::config::DurabilityMode::Batch {
183                        max_delay_ms,
184                        max_records,
185                    } => WalDurabilityMode::Batch {
186                        max_delay_ms,
187                        max_records,
188                    },
189                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
190                        WalDurabilityMode::Adaptive { target_interval_ms }
191                    }
192                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
193                };
194                let wal_config = WalConfig {
195                    durability: wal_durability,
196                    ..WalConfig::default()
197                };
198                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
199                Some(Arc::new(wal_manager))
200            } else {
201                None
202            }
203        } else {
204            None
205        };
206
207        // Create query cache with default capacity (1000 queries)
208        let query_cache = Arc::new(QueryCache::default());
209
210        Ok(Self {
211            config,
212            store,
213            #[cfg(feature = "rdf")]
214            rdf_store,
215            tx_manager,
216            buffer_manager,
217            #[cfg(feature = "wal")]
218            wal,
219            query_cache,
220            commit_counter: Arc::new(AtomicUsize::new(0)),
221            is_open: RwLock::new(true),
222        })
223    }
224
225    /// Applies WAL records to restore the database state.
226    #[cfg(feature = "wal")]
227    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
228        for record in records {
229            match record {
230                WalRecord::CreateNode { id, labels } => {
231                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
232                    store.create_node_with_id(*id, &label_refs);
233                }
234                WalRecord::DeleteNode { id } => {
235                    store.delete_node(*id);
236                }
237                WalRecord::CreateEdge {
238                    id,
239                    src,
240                    dst,
241                    edge_type,
242                } => {
243                    store.create_edge_with_id(*id, *src, *dst, edge_type);
244                }
245                WalRecord::DeleteEdge { id } => {
246                    store.delete_edge(*id);
247                }
248                WalRecord::SetNodeProperty { id, key, value } => {
249                    store.set_node_property(*id, key, value.clone());
250                }
251                WalRecord::SetEdgeProperty { id, key, value } => {
252                    store.set_edge_property(*id, key, value.clone());
253                }
254                WalRecord::AddNodeLabel { id, label } => {
255                    store.add_label(*id, label);
256                }
257                WalRecord::RemoveNodeLabel { id, label } => {
258                    store.remove_label(*id, label);
259                }
260                WalRecord::TxCommit { .. }
261                | WalRecord::TxAbort { .. }
262                | WalRecord::Checkpoint { .. } => {
263                    // Transaction control records don't need replay action
264                    // (recovery already filtered to only committed transactions)
265                }
266            }
267        }
268        Ok(())
269    }
270
271    /// Opens a new session for running queries.
272    ///
273    /// Sessions are cheap to create - spin up as many as you need. Each
274    /// gets its own transaction context, so concurrent sessions won't
275    /// block each other on reads.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use grafeo_engine::GrafeoDB;
281    ///
282    /// let db = GrafeoDB::new_in_memory();
283    /// let session = db.session();
284    ///
285    /// // Run queries through the session
286    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
287    /// # Ok::<(), grafeo_common::utils::error::Error>(())
288    /// ```
289    #[must_use]
290    pub fn session(&self) -> Session {
291        #[cfg(feature = "rdf")]
292        {
293            Session::with_rdf_store_and_adaptive(
294                Arc::clone(&self.store),
295                Arc::clone(&self.rdf_store),
296                Arc::clone(&self.tx_manager),
297                Arc::clone(&self.query_cache),
298                self.config.adaptive.clone(),
299                self.config.factorized_execution,
300                self.config.graph_model,
301                self.config.query_timeout,
302                Arc::clone(&self.commit_counter),
303                self.config.gc_interval,
304            )
305        }
306        #[cfg(not(feature = "rdf"))]
307        {
308            Session::with_adaptive(
309                Arc::clone(&self.store),
310                Arc::clone(&self.tx_manager),
311                Arc::clone(&self.query_cache),
312                self.config.adaptive.clone(),
313                self.config.factorized_execution,
314                self.config.graph_model,
315                self.config.query_timeout,
316                Arc::clone(&self.commit_counter),
317                self.config.gc_interval,
318            )
319        }
320    }
321
322    /// Returns the adaptive execution configuration.
323    #[must_use]
324    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
325        &self.config.adaptive
326    }
327
328    /// Runs a query directly on the database.
329    ///
330    /// A convenience method that creates a temporary session behind the
331    /// scenes. If you're running multiple queries, grab a
332    /// [`session()`](Self::session) instead to avoid the overhead.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if parsing or execution fails.
337    pub fn execute(&self, query: &str) -> Result<QueryResult> {
338        let session = self.session();
339        session.execute(query)
340    }
341
342    /// Executes a query with parameters and returns the result.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query fails.
347    pub fn execute_with_params(
348        &self,
349        query: &str,
350        params: std::collections::HashMap<String, grafeo_common::types::Value>,
351    ) -> Result<QueryResult> {
352        let session = self.session();
353        session.execute_with_params(query, params)
354    }
355
356    /// Executes a Cypher query and returns the result.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails.
361    #[cfg(feature = "cypher")]
362    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
363        let session = self.session();
364        session.execute_cypher(query)
365    }
366
367    /// Executes a Cypher query with parameters and returns the result.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the query fails.
372    #[cfg(feature = "cypher")]
373    pub fn execute_cypher_with_params(
374        &self,
375        query: &str,
376        params: std::collections::HashMap<String, grafeo_common::types::Value>,
377    ) -> Result<QueryResult> {
378        use crate::query::processor::{QueryLanguage, QueryProcessor};
379
380        // Create processor
381        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
382        processor.process(query, QueryLanguage::Cypher, Some(&params))
383    }
384
385    /// Executes a Gremlin query and returns the result.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    #[cfg(feature = "gremlin")]
391    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
392        let session = self.session();
393        session.execute_gremlin(query)
394    }
395
396    /// Executes a Gremlin query with parameters and returns the result.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails.
401    #[cfg(feature = "gremlin")]
402    pub fn execute_gremlin_with_params(
403        &self,
404        query: &str,
405        params: std::collections::HashMap<String, grafeo_common::types::Value>,
406    ) -> Result<QueryResult> {
407        let session = self.session();
408        session.execute_gremlin_with_params(query, params)
409    }
410
411    /// Executes a GraphQL query and returns the result.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the query fails.
416    #[cfg(feature = "graphql")]
417    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
418        let session = self.session();
419        session.execute_graphql(query)
420    }
421
422    /// Executes a GraphQL query with parameters and returns the result.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the query fails.
427    #[cfg(feature = "graphql")]
428    pub fn execute_graphql_with_params(
429        &self,
430        query: &str,
431        params: std::collections::HashMap<String, grafeo_common::types::Value>,
432    ) -> Result<QueryResult> {
433        let session = self.session();
434        session.execute_graphql_with_params(query, params)
435    }
436
437    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE) and returns the result.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if the query fails.
442    #[cfg(feature = "sql-pgq")]
443    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
444        let session = self.session();
445        session.execute_sql(query)
446    }
447
448    /// Executes a SQL/PGQ query with parameters and returns the result.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if the query fails.
453    #[cfg(feature = "sql-pgq")]
454    pub fn execute_sql_with_params(
455        &self,
456        query: &str,
457        params: std::collections::HashMap<String, grafeo_common::types::Value>,
458    ) -> Result<QueryResult> {
459        use crate::query::processor::{QueryLanguage, QueryProcessor};
460
461        // Create processor
462        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
463        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
464    }
465
466    /// Executes a SPARQL query and returns the result.
467    ///
468    /// SPARQL queries operate on the RDF triple store.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if the query fails.
473    ///
474    /// # Examples
475    ///
476    /// ```ignore
477    /// use grafeo_engine::GrafeoDB;
478    ///
479    /// let db = GrafeoDB::new_in_memory();
480    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
481    /// ```
482    #[cfg(all(feature = "sparql", feature = "rdf"))]
483    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
484        use crate::query::{
485            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
486        };
487
488        // Parse and translate the SPARQL query to a logical plan
489        let logical_plan = sparql_translator::translate(query)?;
490
491        // Optimize the plan
492        let optimizer = Optimizer::from_store(&self.store);
493        let optimized_plan = optimizer.optimize(logical_plan)?;
494
495        // Convert to physical plan using RDF planner
496        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
497        let mut physical_plan = planner.plan(&optimized_plan)?;
498
499        // Execute the plan
500        let executor = Executor::with_columns(physical_plan.columns.clone());
501        executor.execute(physical_plan.operator.as_mut())
502    }
503
504    /// Returns the RDF store.
505    ///
506    /// This provides direct access to the RDF store for triple operations.
507    #[cfg(feature = "rdf")]
508    #[must_use]
509    pub fn rdf_store(&self) -> &Arc<RdfStore> {
510        &self.rdf_store
511    }
512
513    /// Executes a query and returns a single scalar value.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the query fails or doesn't return exactly one row.
518    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
519        let result = self.execute(query)?;
520        result.scalar()
521    }
522
523    /// Returns the configuration.
524    #[must_use]
525    pub fn config(&self) -> &Config {
526        &self.config
527    }
528
529    /// Returns the graph data model of this database.
530    #[must_use]
531    pub fn graph_model(&self) -> crate::config::GraphModel {
532        self.config.graph_model
533    }
534
535    /// Returns the configured memory limit in bytes, if any.
536    #[must_use]
537    pub fn memory_limit(&self) -> Option<usize> {
538        self.config.memory_limit
539    }
540
541    /// Returns the underlying store.
542    ///
543    /// This provides direct access to the LPG store for algorithm implementations.
544    #[must_use]
545    pub fn store(&self) -> &Arc<LpgStore> {
546        &self.store
547    }
548
549    /// Garbage collects old MVCC versions that are no longer visible.
550    ///
551    /// Determines the minimum epoch required by active transactions and prunes
552    /// version chains older than that threshold. Also cleans up completed
553    /// transaction metadata in the transaction manager.
554    pub fn gc(&self) {
555        let min_epoch = self.tx_manager.min_active_epoch();
556        self.store.gc_versions(min_epoch);
557        self.tx_manager.gc();
558    }
559
560    /// Returns the buffer manager for memory-aware operations.
561    #[must_use]
562    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
563        &self.buffer_manager
564    }
565
566    /// Closes the database, flushing all pending writes.
567    ///
568    /// For persistent databases, this ensures everything is safely on disk.
569    /// Called automatically when the database is dropped, but you can call
570    /// it explicitly if you need to guarantee durability at a specific point.
571    ///
572    /// # Errors
573    ///
574    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
575    pub fn close(&self) -> Result<()> {
576        let mut is_open = self.is_open.write();
577        if !*is_open {
578            return Ok(());
579        }
580
581        // Commit and checkpoint WAL
582        #[cfg(feature = "wal")]
583        if let Some(ref wal) = self.wal {
584            let epoch = self.store.current_epoch();
585
586            // Use the last assigned transaction ID, or create a checkpoint-only tx
587            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
588                // No transactions have been started; begin one for checkpoint
589                self.tx_manager.begin()
590            });
591
592            // Log a TxCommit to mark all pending records as committed
593            wal.log(&WalRecord::TxCommit {
594                tx_id: checkpoint_tx,
595            })?;
596
597            // Then checkpoint
598            wal.checkpoint(checkpoint_tx, epoch)?;
599            wal.sync()?;
600        }
601
602        *is_open = false;
603        Ok(())
604    }
605
606    /// Returns the WAL manager if available.
607    #[cfg(feature = "wal")]
608    #[must_use]
609    pub fn wal(&self) -> Option<&Arc<WalManager>> {
610        self.wal.as_ref()
611    }
612
613    /// Logs a WAL record if WAL is enabled.
614    #[cfg(feature = "wal")]
615    fn log_wal(&self, record: &WalRecord) -> Result<()> {
616        if let Some(ref wal) = self.wal {
617            wal.log(record)?;
618        }
619        Ok(())
620    }
621
622    /// Returns the number of nodes in the database.
623    #[must_use]
624    pub fn node_count(&self) -> usize {
625        self.store.node_count()
626    }
627
628    /// Returns the number of edges in the database.
629    #[must_use]
630    pub fn edge_count(&self) -> usize {
631        self.store.edge_count()
632    }
633
634    /// Returns the number of distinct labels in the database.
635    #[must_use]
636    pub fn label_count(&self) -> usize {
637        self.store.label_count()
638    }
639
640    /// Returns the number of distinct property keys in the database.
641    #[must_use]
642    pub fn property_key_count(&self) -> usize {
643        self.store.property_key_count()
644    }
645
646    /// Returns the number of distinct edge types in the database.
647    #[must_use]
648    pub fn edge_type_count(&self) -> usize {
649        self.store.edge_type_count()
650    }
651
652    // === Node Operations ===
653
654    /// Creates a node with the given labels and returns its ID.
655    ///
656    /// Labels categorize nodes - think of them like tags. A node can have
657    /// multiple labels (e.g., `["Person", "Employee"]`).
658    ///
659    /// # Examples
660    ///
661    /// ```
662    /// use grafeo_engine::GrafeoDB;
663    ///
664    /// let db = GrafeoDB::new_in_memory();
665    /// let alice = db.create_node(&["Person"]);
666    /// let company = db.create_node(&["Company", "Startup"]);
667    /// ```
668    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
669        let id = self.store.create_node(labels);
670
671        // Log to WAL if enabled
672        #[cfg(feature = "wal")]
673        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
674            id,
675            labels: labels.iter().map(|s| (*s).to_string()).collect(),
676        }) {
677            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
678        }
679
680        id
681    }
682
683    /// Creates a new node with labels and properties.
684    ///
685    /// If WAL is enabled, the operation is logged for durability.
686    pub fn create_node_with_props(
687        &self,
688        labels: &[&str],
689        properties: impl IntoIterator<
690            Item = (
691                impl Into<grafeo_common::types::PropertyKey>,
692                impl Into<grafeo_common::types::Value>,
693            ),
694        >,
695    ) -> grafeo_common::types::NodeId {
696        // Collect properties first so we can log them to WAL
697        let props: Vec<(
698            grafeo_common::types::PropertyKey,
699            grafeo_common::types::Value,
700        )> = properties
701            .into_iter()
702            .map(|(k, v)| (k.into(), v.into()))
703            .collect();
704
705        let id = self
706            .store
707            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
708
709        // Log node creation to WAL
710        #[cfg(feature = "wal")]
711        {
712            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
713                id,
714                labels: labels.iter().map(|s| (*s).to_string()).collect(),
715            }) {
716                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
717            }
718
719            // Log each property to WAL for full durability
720            for (key, value) in props {
721                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
722                    id,
723                    key: key.to_string(),
724                    value,
725                }) {
726                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
727                }
728            }
729        }
730
731        id
732    }
733
734    /// Gets a node by ID.
735    #[must_use]
736    pub fn get_node(
737        &self,
738        id: grafeo_common::types::NodeId,
739    ) -> Option<grafeo_core::graph::lpg::Node> {
740        self.store.get_node(id)
741    }
742
743    /// Deletes a node and all its edges.
744    ///
745    /// If WAL is enabled, the operation is logged for durability.
746    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
747        // Collect matching vector indexes BEFORE deletion removes labels
748        #[cfg(feature = "vector-index")]
749        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
750            .store
751            .get_node(id)
752            .map(|node| {
753                let mut indexes = Vec::new();
754                for label in &node.labels {
755                    let prefix = format!("{}:", label.as_str());
756                    for (key, index) in self.store.vector_index_entries() {
757                        if key.starts_with(&prefix) {
758                            indexes.push(index);
759                        }
760                    }
761                }
762                indexes
763            })
764            .unwrap_or_default();
765
766        let result = self.store.delete_node(id);
767
768        // Remove from vector indexes after successful deletion
769        #[cfg(feature = "vector-index")]
770        if result {
771            for index in indexes_to_clean {
772                index.remove(id);
773            }
774        }
775
776        #[cfg(feature = "wal")]
777        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
778            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
779        }
780
781        result
782    }
783
784    /// Sets a property on a node.
785    ///
786    /// If WAL is enabled, the operation is logged for durability.
787    pub fn set_node_property(
788        &self,
789        id: grafeo_common::types::NodeId,
790        key: &str,
791        value: grafeo_common::types::Value,
792    ) {
793        // Extract vector data before the value is moved into the store
794        #[cfg(feature = "vector-index")]
795        let vector_data = match &value {
796            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
797            _ => None,
798        };
799
800        // Log to WAL first
801        #[cfg(feature = "wal")]
802        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
803            id,
804            key: key.to_string(),
805            value: value.clone(),
806        }) {
807            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
808        }
809
810        self.store.set_node_property(id, key, value);
811
812        // Auto-insert into matching vector indexes
813        #[cfg(feature = "vector-index")]
814        if let Some(vec) = vector_data
815            && let Some(node) = self.store.get_node(id)
816        {
817            for label in &node.labels {
818                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
819                    let accessor =
820                        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
821                    index.insert(id, &vec, &accessor);
822                }
823            }
824        }
825    }
826
827    /// Adds a label to an existing node.
828    ///
829    /// Returns `true` if the label was added, `false` if the node doesn't exist
830    /// or already has the label.
831    ///
832    /// # Examples
833    ///
834    /// ```
835    /// use grafeo_engine::GrafeoDB;
836    ///
837    /// let db = GrafeoDB::new_in_memory();
838    /// let alice = db.create_node(&["Person"]);
839    ///
840    /// // Promote Alice to Employee
841    /// let added = db.add_node_label(alice, "Employee");
842    /// assert!(added);
843    /// ```
844    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
845        let result = self.store.add_label(id, label);
846
847        #[cfg(feature = "wal")]
848        if result {
849            // Log to WAL if enabled
850            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
851                id,
852                label: label.to_string(),
853            }) {
854                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
855            }
856        }
857
858        // Auto-insert into vector indexes for the newly-added label
859        #[cfg(feature = "vector-index")]
860        if result {
861            let prefix = format!("{label}:");
862            for (key, index) in self.store.vector_index_entries() {
863                if let Some(property) = key.strip_prefix(&prefix)
864                    && let Some(node) = self.store.get_node(id)
865                {
866                    let prop_key = grafeo_common::types::PropertyKey::new(property);
867                    if let Some(grafeo_common::types::Value::Vector(v)) =
868                        node.properties.get(&prop_key)
869                    {
870                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
871                            &self.store,
872                            property,
873                        );
874                        index.insert(id, v, &accessor);
875                    }
876                }
877            }
878        }
879
880        result
881    }
882
883    /// Removes a label from a node.
884    ///
885    /// Returns `true` if the label was removed, `false` if the node doesn't exist
886    /// or doesn't have the label.
887    ///
888    /// # Examples
889    ///
890    /// ```
891    /// use grafeo_engine::GrafeoDB;
892    ///
893    /// let db = GrafeoDB::new_in_memory();
894    /// let alice = db.create_node(&["Person", "Employee"]);
895    ///
896    /// // Remove Employee status
897    /// let removed = db.remove_node_label(alice, "Employee");
898    /// assert!(removed);
899    /// ```
900    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
901        let result = self.store.remove_label(id, label);
902
903        #[cfg(feature = "wal")]
904        if result {
905            // Log to WAL if enabled
906            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
907                id,
908                label: label.to_string(),
909            }) {
910                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
911            }
912        }
913
914        result
915    }
916
917    /// Gets all labels for a node.
918    ///
919    /// Returns `None` if the node doesn't exist.
920    ///
921    /// # Examples
922    ///
923    /// ```
924    /// use grafeo_engine::GrafeoDB;
925    ///
926    /// let db = GrafeoDB::new_in_memory();
927    /// let alice = db.create_node(&["Person", "Employee"]);
928    ///
929    /// let labels = db.get_node_labels(alice).unwrap();
930    /// assert!(labels.contains(&"Person".to_string()));
931    /// assert!(labels.contains(&"Employee".to_string()));
932    /// ```
933    #[must_use]
934    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
935        self.store
936            .get_node(id)
937            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
938    }
939
940    // === Edge Operations ===
941
942    /// Creates an edge (relationship) between two nodes.
943    ///
944    /// Edges connect nodes and have a type that describes the relationship.
945    /// They're directed - the order of `src` and `dst` matters.
946    ///
947    /// # Examples
948    ///
949    /// ```
950    /// use grafeo_engine::GrafeoDB;
951    ///
952    /// let db = GrafeoDB::new_in_memory();
953    /// let alice = db.create_node(&["Person"]);
954    /// let bob = db.create_node(&["Person"]);
955    ///
956    /// // Alice knows Bob (directed: Alice -> Bob)
957    /// let edge = db.create_edge(alice, bob, "KNOWS");
958    /// ```
959    pub fn create_edge(
960        &self,
961        src: grafeo_common::types::NodeId,
962        dst: grafeo_common::types::NodeId,
963        edge_type: &str,
964    ) -> grafeo_common::types::EdgeId {
965        let id = self.store.create_edge(src, dst, edge_type);
966
967        // Log to WAL if enabled
968        #[cfg(feature = "wal")]
969        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
970            id,
971            src,
972            dst,
973            edge_type: edge_type.to_string(),
974        }) {
975            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
976        }
977
978        id
979    }
980
981    /// Creates a new edge with properties.
982    ///
983    /// If WAL is enabled, the operation is logged for durability.
984    pub fn create_edge_with_props(
985        &self,
986        src: grafeo_common::types::NodeId,
987        dst: grafeo_common::types::NodeId,
988        edge_type: &str,
989        properties: impl IntoIterator<
990            Item = (
991                impl Into<grafeo_common::types::PropertyKey>,
992                impl Into<grafeo_common::types::Value>,
993            ),
994        >,
995    ) -> grafeo_common::types::EdgeId {
996        // Collect properties first so we can log them to WAL
997        let props: Vec<(
998            grafeo_common::types::PropertyKey,
999            grafeo_common::types::Value,
1000        )> = properties
1001            .into_iter()
1002            .map(|(k, v)| (k.into(), v.into()))
1003            .collect();
1004
1005        let id = self.store.create_edge_with_props(
1006            src,
1007            dst,
1008            edge_type,
1009            props.iter().map(|(k, v)| (k.clone(), v.clone())),
1010        );
1011
1012        // Log edge creation to WAL
1013        #[cfg(feature = "wal")]
1014        {
1015            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1016                id,
1017                src,
1018                dst,
1019                edge_type: edge_type.to_string(),
1020            }) {
1021                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1022            }
1023
1024            // Log each property to WAL for full durability
1025            for (key, value) in props {
1026                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1027                    id,
1028                    key: key.to_string(),
1029                    value,
1030                }) {
1031                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1032                }
1033            }
1034        }
1035
1036        id
1037    }
1038
1039    /// Gets an edge by ID.
1040    #[must_use]
1041    pub fn get_edge(
1042        &self,
1043        id: grafeo_common::types::EdgeId,
1044    ) -> Option<grafeo_core::graph::lpg::Edge> {
1045        self.store.get_edge(id)
1046    }
1047
1048    /// Deletes an edge.
1049    ///
1050    /// If WAL is enabled, the operation is logged for durability.
1051    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1052        let result = self.store.delete_edge(id);
1053
1054        #[cfg(feature = "wal")]
1055        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1056            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1057        }
1058
1059        result
1060    }
1061
1062    /// Sets a property on an edge.
1063    ///
1064    /// If WAL is enabled, the operation is logged for durability.
1065    pub fn set_edge_property(
1066        &self,
1067        id: grafeo_common::types::EdgeId,
1068        key: &str,
1069        value: grafeo_common::types::Value,
1070    ) {
1071        // Log to WAL first
1072        #[cfg(feature = "wal")]
1073        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1074            id,
1075            key: key.to_string(),
1076            value: value.clone(),
1077        }) {
1078            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1079        }
1080        self.store.set_edge_property(id, key, value);
1081    }
1082
1083    /// Removes a property from a node.
1084    ///
1085    /// Returns true if the property existed and was removed, false otherwise.
1086    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1087        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1088        self.store.remove_node_property(id, key).is_some()
1089    }
1090
1091    /// Removes a property from an edge.
1092    ///
1093    /// Returns true if the property existed and was removed, false otherwise.
1094    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1095        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1096        self.store.remove_edge_property(id, key).is_some()
1097    }
1098
1099    // =========================================================================
1100    // PROPERTY INDEX API
1101    // =========================================================================
1102
1103    /// Creates an index on a node property for O(1) lookups by value.
1104    ///
1105    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1106    /// O(1) instead of O(n) for this property. The index is automatically
1107    /// maintained when properties are set or removed.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```ignore
1112    /// // Create an index on the 'email' property
1113    /// db.create_property_index("email");
1114    ///
1115    /// // Now lookups by email are O(1)
1116    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
1117    /// ```
1118    pub fn create_property_index(&self, property: &str) {
1119        self.store.create_property_index(property);
1120    }
1121
1122    /// Creates a vector similarity index on a node property.
1123    ///
1124    /// This enables efficient approximate nearest-neighbor search on vector
1125    /// properties. Currently validates the index parameters and scans existing
1126    /// nodes to verify the property contains vectors of the expected dimensions.
1127    ///
1128    /// # Arguments
1129    ///
1130    /// * `label` - Node label to index (e.g., `"Doc"`)
1131    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
1132    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
1133    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
1134    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
1135    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if the metric is invalid, no vectors are found, or
1140    /// dimensions don't match.
1141    pub fn create_vector_index(
1142        &self,
1143        label: &str,
1144        property: &str,
1145        dimensions: Option<usize>,
1146        metric: Option<&str>,
1147        m: Option<usize>,
1148        ef_construction: Option<usize>,
1149    ) -> Result<()> {
1150        use grafeo_common::types::{PropertyKey, Value};
1151        use grafeo_core::index::vector::DistanceMetric;
1152
1153        let metric = match metric {
1154            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1155                grafeo_common::utils::error::Error::Internal(format!(
1156                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1157                    m
1158                ))
1159            })?,
1160            None => DistanceMetric::Cosine,
1161        };
1162
1163        // Scan nodes to validate vectors exist and check dimensions
1164        let prop_key = PropertyKey::new(property);
1165        let mut found_dims: Option<usize> = dimensions;
1166        let mut vector_count = 0usize;
1167
1168        #[cfg(feature = "vector-index")]
1169        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1170
1171        for node in self.store.nodes_with_label(label) {
1172            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1173                if let Some(expected) = found_dims {
1174                    if v.len() != expected {
1175                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1176                            "Vector dimension mismatch: expected {}, found {} on node {}",
1177                            expected,
1178                            v.len(),
1179                            node.id.0
1180                        )));
1181                    }
1182                } else {
1183                    found_dims = Some(v.len());
1184                }
1185                vector_count += 1;
1186                #[cfg(feature = "vector-index")]
1187                vectors.push((node.id, v.to_vec()));
1188            }
1189        }
1190
1191        if vector_count == 0 {
1192            return Err(grafeo_common::utils::error::Error::Internal(format!(
1193                "No vector properties found on :{label}({property})"
1194            )));
1195        }
1196
1197        let dims = found_dims.unwrap_or(0);
1198
1199        // Build and populate the HNSW index
1200        #[cfg(feature = "vector-index")]
1201        {
1202            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1203
1204            let mut config = HnswConfig::new(dims, metric);
1205            if let Some(m_val) = m {
1206                config = config.with_m(m_val);
1207            }
1208            if let Some(ef_c) = ef_construction {
1209                config = config.with_ef_construction(ef_c);
1210            }
1211
1212            let index = HnswIndex::with_capacity(config, vectors.len());
1213            let accessor =
1214                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1215            for (node_id, vec) in &vectors {
1216                index.insert(*node_id, vec, &accessor);
1217            }
1218
1219            self.store
1220                .add_vector_index(label, property, Arc::new(index));
1221        }
1222
1223        // Suppress unused variable warnings when vector-index is off
1224        let _ = (m, ef_construction);
1225
1226        tracing::info!(
1227            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1228            metric_name = metric.name()
1229        );
1230
1231        Ok(())
1232    }
1233
1234    /// Drops a vector index for the given label and property.
1235    ///
1236    /// Returns `true` if the index existed and was removed, `false` if no
1237    /// index was found.
1238    ///
1239    /// After dropping, [`vector_search`](Self::vector_search) for this
1240    /// label+property pair will return an error.
1241    #[cfg(feature = "vector-index")]
1242    pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1243        let removed = self.store.remove_vector_index(label, property);
1244        if removed {
1245            tracing::info!("Vector index dropped: :{label}({property})");
1246        }
1247        removed
1248    }
1249
1250    /// Drops and recreates a vector index, rescanning all matching nodes.
1251    ///
1252    /// This is useful after bulk inserts or when the index may be out of sync.
1253    /// The previous index configuration (dimensions, metric, M, ef\_construction)
1254    /// is preserved.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns an error if no vector index exists for this label+property pair,
1259    /// or if the rebuild fails (e.g., no matching vectors found).
1260    #[cfg(feature = "vector-index")]
1261    pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1262        let config = self
1263            .store
1264            .get_vector_index(label, property)
1265            .map(|idx| idx.config().clone())
1266            .ok_or_else(|| {
1267                grafeo_common::utils::error::Error::Internal(format!(
1268                    "No vector index found for :{label}({property}). Cannot rebuild."
1269                ))
1270            })?;
1271
1272        self.store.remove_vector_index(label, property);
1273
1274        self.create_vector_index(
1275            label,
1276            property,
1277            Some(config.dimensions),
1278            Some(config.metric.name()),
1279            Some(config.m),
1280            Some(config.ef_construction),
1281        )
1282    }
1283
1284    /// Computes a node allowlist from property equality filters.
1285    ///
1286    /// Intersects `nodes_by_label(label)` with `find_nodes_by_property(key, value)`
1287    /// for each filter entry. Returns `None` if filters is `None` or empty (meaning
1288    /// no filtering), or `Some(set)` with the intersection (possibly empty).
1289    #[cfg(feature = "vector-index")]
1290    fn compute_filter_allowlist(
1291        &self,
1292        label: &str,
1293        filters: Option<&std::collections::HashMap<String, Value>>,
1294    ) -> Option<std::collections::HashSet<NodeId>> {
1295        let filters = filters.filter(|f| !f.is_empty())?;
1296
1297        // Start with all nodes for this label
1298        let label_nodes: std::collections::HashSet<NodeId> =
1299            self.store.nodes_by_label(label).into_iter().collect();
1300
1301        let mut allowlist = label_nodes;
1302
1303        for (key, value) in filters {
1304            let matching: std::collections::HashSet<NodeId> = self
1305                .store
1306                .find_nodes_by_property(key, value)
1307                .into_iter()
1308                .collect();
1309            allowlist = allowlist.intersection(&matching).copied().collect();
1310
1311            // Short-circuit: empty intersection means no results possible
1312            if allowlist.is_empty() {
1313                return Some(allowlist);
1314            }
1315        }
1316
1317        Some(allowlist)
1318    }
1319
1320    /// Searches for the k nearest neighbors of a query vector.
1321    ///
1322    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1323    ///
1324    /// # Arguments
1325    ///
1326    /// * `label` - Node label that was indexed
1327    /// * `property` - Property that was indexed
1328    /// * `query` - Query vector (slice of floats)
1329    /// * `k` - Number of nearest neighbors to return
1330    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1331    /// * `filters` - Optional property equality filters. Only nodes matching all
1332    ///   `(key, value)` pairs will appear in results.
1333    ///
1334    /// # Returns
1335    ///
1336    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1337    #[cfg(feature = "vector-index")]
1338    pub fn vector_search(
1339        &self,
1340        label: &str,
1341        property: &str,
1342        query: &[f32],
1343        k: usize,
1344        ef: Option<usize>,
1345        filters: Option<&std::collections::HashMap<String, Value>>,
1346    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1347        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1348            grafeo_common::utils::error::Error::Internal(format!(
1349                "No vector index found for :{label}({property}). Call create_vector_index() first."
1350            ))
1351        })?;
1352
1353        let accessor =
1354            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1355
1356        let results = match self.compute_filter_allowlist(label, filters) {
1357            Some(allowlist) => match ef {
1358                Some(ef_val) => {
1359                    index.search_with_ef_and_filter(query, k, ef_val, &allowlist, &accessor)
1360                }
1361                None => index.search_with_filter(query, k, &allowlist, &accessor),
1362            },
1363            None => match ef {
1364                Some(ef_val) => index.search_with_ef(query, k, ef_val, &accessor),
1365                None => index.search(query, k, &accessor),
1366            },
1367        };
1368
1369        Ok(results)
1370    }
1371
1372    /// Creates multiple nodes in bulk, each with a single vector property.
1373    ///
1374    /// Much faster than individual `create_node_with_props` calls because it
1375    /// acquires internal locks once and loops in Rust rather than crossing
1376    /// the FFI boundary per vector.
1377    ///
1378    /// # Arguments
1379    ///
1380    /// * `label` - Label applied to all created nodes
1381    /// * `property` - Property name for the vector data
1382    /// * `vectors` - Vector data for each node
1383    ///
1384    /// # Returns
1385    ///
1386    /// Vector of created `NodeId`s in the same order as the input vectors.
1387    pub fn batch_create_nodes(
1388        &self,
1389        label: &str,
1390        property: &str,
1391        vectors: Vec<Vec<f32>>,
1392    ) -> Vec<grafeo_common::types::NodeId> {
1393        use grafeo_common::types::{PropertyKey, Value};
1394
1395        let prop_key = PropertyKey::new(property);
1396        let labels: &[&str] = &[label];
1397
1398        let ids: Vec<grafeo_common::types::NodeId> = vectors
1399            .into_iter()
1400            .map(|vec| {
1401                let value = Value::Vector(vec.into());
1402                let id = self.store.create_node_with_props(
1403                    labels,
1404                    std::iter::once((prop_key.clone(), value.clone())),
1405                );
1406
1407                // Log to WAL
1408                #[cfg(feature = "wal")]
1409                {
1410                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1411                        id,
1412                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1413                    }) {
1414                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1415                    }
1416                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1417                        id,
1418                        key: property.to_string(),
1419                        value,
1420                    }) {
1421                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1422                    }
1423                }
1424
1425                id
1426            })
1427            .collect();
1428
1429        // Auto-insert into matching vector index if one exists
1430        #[cfg(feature = "vector-index")]
1431        if let Some(index) = self.store.get_vector_index(label, property) {
1432            let accessor =
1433                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1434            for &id in &ids {
1435                if let Some(node) = self.store.get_node(id) {
1436                    let pk = grafeo_common::types::PropertyKey::new(property);
1437                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1438                        index.insert(id, v, &accessor);
1439                    }
1440                }
1441            }
1442        }
1443
1444        ids
1445    }
1446
1447    /// Searches for nearest neighbors for multiple query vectors in parallel.
1448    ///
1449    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1450    ///
1451    /// # Arguments
1452    ///
1453    /// * `label` - Node label that was indexed
1454    /// * `property` - Property that was indexed
1455    /// * `queries` - Batch of query vectors
1456    /// * `k` - Number of nearest neighbors per query
1457    /// * `ef` - Search beam width (uses index default if `None`)
1458    /// * `filters` - Optional property equality filters
1459    #[cfg(feature = "vector-index")]
1460    pub fn batch_vector_search(
1461        &self,
1462        label: &str,
1463        property: &str,
1464        queries: &[Vec<f32>],
1465        k: usize,
1466        ef: Option<usize>,
1467        filters: Option<&std::collections::HashMap<String, Value>>,
1468    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1469        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1470            grafeo_common::utils::error::Error::Internal(format!(
1471                "No vector index found for :{label}({property}). Call create_vector_index() first."
1472            ))
1473        })?;
1474
1475        let accessor =
1476            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1477
1478        let results = match self.compute_filter_allowlist(label, filters) {
1479            Some(allowlist) => match ef {
1480                Some(ef_val) => {
1481                    index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1482                }
1483                None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1484            },
1485            None => match ef {
1486                Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1487                None => index.batch_search(queries, k, &accessor),
1488            },
1489        };
1490
1491        Ok(results)
1492    }
1493
1494    /// Searches for diverse nearest neighbors using Maximal Marginal Relevance (MMR).
1495    ///
1496    /// MMR balances relevance (similarity to query) with diversity (dissimilarity
1497    /// among selected results). This is the algorithm used by LangChain's
1498    /// `mmr_traversal_search()` for RAG applications.
1499    ///
1500    /// # Arguments
1501    ///
1502    /// * `label` - Node label that was indexed
1503    /// * `property` - Property that was indexed
1504    /// * `query` - Query vector
1505    /// * `k` - Number of diverse results to return
1506    /// * `fetch_k` - Number of initial candidates from HNSW (default: `4 * k`)
1507    /// * `lambda` - Relevance vs. diversity in \[0, 1\] (default: 0.5).
1508    ///   1.0 = pure relevance, 0.0 = pure diversity.
1509    /// * `ef` - HNSW search beam width (uses index default if `None`)
1510    /// * `filters` - Optional property equality filters
1511    ///
1512    /// # Returns
1513    ///
1514    /// `(NodeId, distance)` pairs in MMR selection order. The f32 is the original
1515    /// distance from the query, matching [`vector_search`](Self::vector_search).
1516    #[cfg(feature = "vector-index")]
1517    #[allow(clippy::too_many_arguments)]
1518    pub fn mmr_search(
1519        &self,
1520        label: &str,
1521        property: &str,
1522        query: &[f32],
1523        k: usize,
1524        fetch_k: Option<usize>,
1525        lambda: Option<f32>,
1526        ef: Option<usize>,
1527        filters: Option<&std::collections::HashMap<String, Value>>,
1528    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1529        use grafeo_core::index::vector::mmr_select;
1530
1531        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1532            grafeo_common::utils::error::Error::Internal(format!(
1533                "No vector index found for :{label}({property}). Call create_vector_index() first."
1534            ))
1535        })?;
1536
1537        let accessor =
1538            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1539
1540        let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1541        let lambda = lambda.unwrap_or(0.5);
1542
1543        // Step 1: Fetch candidates from HNSW (with optional filter)
1544        let initial_results = match self.compute_filter_allowlist(label, filters) {
1545            Some(allowlist) => match ef {
1546                Some(ef_val) => {
1547                    index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1548                }
1549                None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1550            },
1551            None => match ef {
1552                Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1553                None => index.search(query, fetch_k, &accessor),
1554            },
1555        };
1556
1557        if initial_results.is_empty() {
1558            return Ok(Vec::new());
1559        }
1560
1561        // Step 2: Retrieve stored vectors for MMR pairwise comparison
1562        use grafeo_core::index::vector::VectorAccessor;
1563        let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1564            initial_results
1565                .into_iter()
1566                .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1567                .collect();
1568
1569        // Step 3: Build slice-based candidates for mmr_select
1570        let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1571            .iter()
1572            .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1573            .collect();
1574
1575        // Step 4: Run MMR selection
1576        let metric = index.config().metric;
1577        Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1578    }
1579
1580    /// Drops an index on a node property.
1581    ///
1582    /// Returns `true` if the index existed and was removed.
1583    pub fn drop_property_index(&self, property: &str) -> bool {
1584        self.store.drop_property_index(property)
1585    }
1586
1587    /// Returns `true` if the property has an index.
1588    #[must_use]
1589    pub fn has_property_index(&self, property: &str) -> bool {
1590        self.store.has_property_index(property)
1591    }
1592
1593    /// Finds all nodes that have a specific property value.
1594    ///
1595    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1596    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1597    ///
1598    /// # Example
1599    ///
1600    /// ```ignore
1601    /// // Create index for fast lookups (optional but recommended)
1602    /// db.create_property_index("city");
1603    ///
1604    /// // Find all nodes where city = "NYC"
1605    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
1606    /// ```
1607    #[must_use]
1608    pub fn find_nodes_by_property(
1609        &self,
1610        property: &str,
1611        value: &grafeo_common::types::Value,
1612    ) -> Vec<grafeo_common::types::NodeId> {
1613        self.store.find_nodes_by_property(property, value)
1614    }
1615
1616    // =========================================================================
1617    // ADMIN API: Introspection
1618    // =========================================================================
1619
1620    /// Returns true if this database is backed by a file (persistent).
1621    ///
1622    /// In-memory databases return false.
1623    #[must_use]
1624    pub fn is_persistent(&self) -> bool {
1625        self.config.path.is_some()
1626    }
1627
1628    /// Returns the database file path, if persistent.
1629    ///
1630    /// In-memory databases return None.
1631    #[must_use]
1632    pub fn path(&self) -> Option<&Path> {
1633        self.config.path.as_deref()
1634    }
1635
1636    /// Returns high-level database information.
1637    ///
1638    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1639    #[must_use]
1640    pub fn info(&self) -> crate::admin::DatabaseInfo {
1641        crate::admin::DatabaseInfo {
1642            mode: crate::admin::DatabaseMode::Lpg,
1643            node_count: self.store.node_count(),
1644            edge_count: self.store.edge_count(),
1645            is_persistent: self.is_persistent(),
1646            path: self.config.path.clone(),
1647            wal_enabled: self.config.wal_enabled,
1648            version: env!("CARGO_PKG_VERSION").to_string(),
1649        }
1650    }
1651
1652    /// Returns detailed database statistics.
1653    ///
1654    /// Includes counts, memory usage, and index information.
1655    #[must_use]
1656    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1657        #[cfg(feature = "wal")]
1658        let disk_bytes = self.config.path.as_ref().and_then(|p| {
1659            if p.exists() {
1660                Self::calculate_disk_usage(p).ok()
1661            } else {
1662                None
1663            }
1664        });
1665        #[cfg(not(feature = "wal"))]
1666        let disk_bytes: Option<usize> = None;
1667
1668        crate::admin::DatabaseStats {
1669            node_count: self.store.node_count(),
1670            edge_count: self.store.edge_count(),
1671            label_count: self.store.label_count(),
1672            edge_type_count: self.store.edge_type_count(),
1673            property_key_count: self.store.property_key_count(),
1674            index_count: 0, // TODO: implement index tracking
1675            memory_bytes: self.buffer_manager.allocated(),
1676            disk_bytes,
1677        }
1678    }
1679
/// Calculates total disk usage for the database directory.
///
/// Recursively sums the sizes of all regular files below `path`. A `path`
/// that is not a directory contributes `0`. Entries that are neither a
/// regular file nor a directory are ignored.
///
/// File sizes are converted and accumulated with saturating arithmetic, so
/// on targets where `usize` is narrower than `u64` the result is clamped to
/// `usize::MAX` instead of being silently truncated or overflowing.
///
/// # Errors
///
/// Returns an error if a directory cannot be read or an entry's metadata
/// cannot be queried.
#[cfg(feature = "wal")]
fn calculate_disk_usage(path: &Path) -> Result<usize> {
    if !path.is_dir() {
        return Ok(0);
    }

    let mut total = 0usize;
    for entry in std::fs::read_dir(path)? {
        let entry = entry?;
        let metadata = entry.metadata()?;

        let entry_bytes = if metadata.is_file() {
            // `len()` is a u64; clamp rather than truncate on 32-bit targets.
            usize::try_from(metadata.len()).unwrap_or(usize::MAX)
        } else if metadata.is_dir() {
            Self::calculate_disk_usage(&entry.path())?
        } else {
            0
        };

        total = total.saturating_add(entry_bytes);
    }
    Ok(total)
}
1697
1698    /// Returns schema information (labels, edge types, property keys).
1699    ///
1700    /// For LPG mode, returns label and edge type information.
1701    /// For RDF mode, returns predicate and named graph information.
1702    #[must_use]
1703    pub fn schema(&self) -> crate::admin::SchemaInfo {
1704        let labels = self
1705            .store
1706            .all_labels()
1707            .into_iter()
1708            .map(|name| crate::admin::LabelInfo {
1709                name: name.clone(),
1710                count: self.store.nodes_with_label(&name).count(),
1711            })
1712            .collect();
1713
1714        let edge_types = self
1715            .store
1716            .all_edge_types()
1717            .into_iter()
1718            .map(|name| crate::admin::EdgeTypeInfo {
1719                name: name.clone(),
1720                count: self.store.edges_with_type(&name).count(),
1721            })
1722            .collect();
1723
1724        let property_keys = self.store.all_property_keys();
1725
1726        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1727            labels,
1728            edge_types,
1729            property_keys,
1730        })
1731    }
1732
/// Describes the RDF side of the database.
///
/// Lists every predicate with the number of triples that use it, plus the
/// subject and object counts taken from the RDF store's statistics. The
/// named-graph list is always empty.
///
/// Only compiled when the `rdf` feature is enabled.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
    use crate::admin::{PredicateInfo, RdfSchemaInfo, SchemaInfo};

    let rdf = &self.rdf_store;
    let stats = rdf.stats();

    let predicates = rdf
        .predicates()
        .into_iter()
        .map(|predicate| PredicateInfo {
            count: rdf.triples_with_predicate(&predicate).len(),
            iri: predicate.to_string(),
        })
        .collect();

    SchemaInfo::Rdf(RdfSchemaInfo {
        predicates,
        named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
        subject_count: stats.subject_count,
        object_count: stats.object_count,
    })
}
1761
1762    /// Validates database integrity.
1763    ///
1764    /// Checks for:
1765    /// - Dangling edge references (edges pointing to non-existent nodes)
1766    /// - Internal index consistency
1767    ///
1768    /// Returns a list of errors and warnings. Empty errors = valid.
1769    #[must_use]
1770    pub fn validate(&self) -> crate::admin::ValidationResult {
1771        let mut result = crate::admin::ValidationResult::default();
1772
1773        // Check for dangling edge references
1774        for edge in self.store.all_edges() {
1775            if self.store.get_node(edge.src).is_none() {
1776                result.errors.push(crate::admin::ValidationError {
1777                    code: "DANGLING_SRC".to_string(),
1778                    message: format!(
1779                        "Edge {} references non-existent source node {}",
1780                        edge.id.0, edge.src.0
1781                    ),
1782                    context: Some(format!("edge:{}", edge.id.0)),
1783                });
1784            }
1785            if self.store.get_node(edge.dst).is_none() {
1786                result.errors.push(crate::admin::ValidationError {
1787                    code: "DANGLING_DST".to_string(),
1788                    message: format!(
1789                        "Edge {} references non-existent destination node {}",
1790                        edge.id.0, edge.dst.0
1791                    ),
1792                    context: Some(format!("edge:{}", edge.id.0)),
1793                });
1794            }
1795        }
1796
1797        // Add warnings for potential issues
1798        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1799            result.warnings.push(crate::admin::ValidationWarning {
1800                code: "NO_EDGES".to_string(),
1801                message: "Database has nodes but no edges".to_string(),
1802                context: None,
1803            });
1804        }
1805
1806        result
1807    }
1808
1809    /// Returns WAL (Write-Ahead Log) status.
1810    ///
1811    /// Returns None if WAL is not enabled.
1812    #[must_use]
1813    pub fn wal_status(&self) -> crate::admin::WalStatus {
1814        #[cfg(feature = "wal")]
1815        if let Some(ref wal) = self.wal {
1816            return crate::admin::WalStatus {
1817                enabled: true,
1818                path: self.config.path.as_ref().map(|p| p.join("wal")),
1819                size_bytes: wal.size_bytes(),
1820                record_count: wal.record_count() as usize,
1821                last_checkpoint: wal.last_checkpoint_timestamp(),
1822                current_epoch: self.store.current_epoch().as_u64(),
1823            };
1824        }
1825
1826        crate::admin::WalStatus {
1827            enabled: false,
1828            path: None,
1829            size_bytes: 0,
1830            record_count: 0,
1831            last_checkpoint: None,
1832            current_epoch: self.store.current_epoch().as_u64(),
1833        }
1834    }
1835
1836    /// Forces a WAL checkpoint.
1837    ///
1838    /// Flushes all pending WAL records to the main storage.
1839    ///
1840    /// # Errors
1841    ///
1842    /// Returns an error if the checkpoint fails.
1843    pub fn wal_checkpoint(&self) -> Result<()> {
1844        #[cfg(feature = "wal")]
1845        if let Some(ref wal) = self.wal {
1846            let epoch = self.store.current_epoch();
1847            let tx_id = self
1848                .tx_manager
1849                .last_assigned_tx_id()
1850                .unwrap_or_else(|| self.tx_manager.begin());
1851            wal.checkpoint(tx_id, epoch)?;
1852            wal.sync()?;
1853        }
1854        Ok(())
1855    }
1856
1857    // =========================================================================
1858    // ADMIN API: Persistence Control
1859    // =========================================================================
1860
/// Writes a copy of this database to `path`.
///
/// A persistent database is opened at `path`, and every node and edge of
/// this database (ids, labels/edge type, and properties) is replayed into
/// it. Each replayed mutation is applied to the target store and logged to
/// the target's WAL. The target is closed before returning.
///
/// This database is only read from; it is not modified.
///
/// # Errors
///
/// Returns an error if the target cannot be opened, a WAL record cannot be
/// written, or closing the target fails.
///
/// Requires the `wal` feature for persistence support.
#[cfg(feature = "wal")]
pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
    let destination = Self::with_config(Config::persistent(path.as_ref()))?;

    // Nodes first, so that edges copied later have both endpoints present.
    for node in self.store.all_nodes() {
        let node_id = node.id;

        let label_refs: Vec<&str> = node.labels.iter().map(|label| &**label).collect();
        destination.store.create_node_with_id(node_id, &label_refs);
        destination.log_wal(&WalRecord::CreateNode {
            id: node_id,
            labels: node.labels.iter().map(|label| label.to_string()).collect(),
        })?;

        for (key, value) in node.properties {
            destination
                .store
                .set_node_property(node_id, key.as_str(), value.clone());
            destination.log_wal(&WalRecord::SetNodeProperty {
                id: node_id,
                key: key.to_string(),
                value,
            })?;
        }
    }

    for edge in self.store.all_edges() {
        let edge_id = edge.id;

        destination
            .store
            .create_edge_with_id(edge_id, edge.src, edge.dst, &edge.edge_type);
        destination.log_wal(&WalRecord::CreateEdge {
            id: edge_id,
            src: edge.src,
            dst: edge.dst,
            edge_type: edge.edge_type.to_string(),
        })?;

        for (key, value) in edge.properties {
            destination
                .store
                .set_edge_property(edge_id, key.as_str(), value.clone());
            destination.log_wal(&WalRecord::SetEdgeProperty {
                id: edge_id,
                key: key.to_string(),
                value,
            })?;
        }
    }

    // Closing the target finishes the copy.
    destination.close()
}
1937
1938    /// Creates an in-memory copy of this database.
1939    ///
1940    /// Returns a new database that is completely independent.
1941    /// Useful for:
1942    /// - Testing modifications without affecting the original
1943    /// - Faster operations when persistence isn't needed
1944    ///
1945    /// # Errors
1946    ///
1947    /// Returns an error if the copy operation fails.
1948    pub fn to_memory(&self) -> Result<Self> {
1949        let config = Config::in_memory();
1950        let target = Self::with_config(config)?;
1951
1952        // Copy all nodes
1953        for node in self.store.all_nodes() {
1954            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1955            target.store.create_node_with_id(node.id, &label_refs);
1956
1957            // Copy properties
1958            for (key, value) in node.properties {
1959                target.store.set_node_property(node.id, key.as_str(), value);
1960            }
1961        }
1962
1963        // Copy all edges
1964        for edge in self.store.all_edges() {
1965            target
1966                .store
1967                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1968
1969            // Copy properties
1970            for (key, value) in edge.properties {
1971                target.store.set_edge_property(edge.id, key.as_str(), value);
1972            }
1973        }
1974
1975        Ok(target)
1976    }
1977
/// Loads the database stored at `path` into memory.
///
/// The on-disk database is opened, copied with
/// [`to_memory()`](Self::to_memory), and closed again. The returned
/// database is the in-memory copy, so changes made to it are not written
/// back to `path`.
///
/// # Errors
///
/// Returns an error if the database can't be opened, copied, or closed.
#[cfg(feature = "wal")]
pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
    let on_disk = Self::open(path)?;
    let in_memory = on_disk.to_memory()?;

    // Close the source explicitly so a failure surfaces to the caller.
    on_disk.close().map(|()| in_memory)
}
1999
2000    // =========================================================================
2001    // ADMIN API: Snapshot Export/Import
2002    // =========================================================================
2003
2004    /// Exports the entire database to a binary snapshot.
2005    ///
2006    /// The returned bytes can be stored (e.g. in IndexedDB) and later
2007    /// restored with [`import_snapshot()`](Self::import_snapshot).
2008    ///
2009    /// # Errors
2010    ///
2011    /// Returns an error if serialization fails.
2012    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2013        let nodes: Vec<SnapshotNode> = self
2014            .store
2015            .all_nodes()
2016            .map(|n| SnapshotNode {
2017                id: n.id,
2018                labels: n.labels.iter().map(|l| l.to_string()).collect(),
2019                properties: n
2020                    .properties
2021                    .into_iter()
2022                    .map(|(k, v)| (k.to_string(), v))
2023                    .collect(),
2024            })
2025            .collect();
2026
2027        let edges: Vec<SnapshotEdge> = self
2028            .store
2029            .all_edges()
2030            .map(|e| SnapshotEdge {
2031                id: e.id,
2032                src: e.src,
2033                dst: e.dst,
2034                edge_type: e.edge_type.to_string(),
2035                properties: e
2036                    .properties
2037                    .into_iter()
2038                    .map(|(k, v)| (k.to_string(), v))
2039                    .collect(),
2040            })
2041            .collect();
2042
2043        let snapshot = Snapshot {
2044            version: 1,
2045            nodes,
2046            edges,
2047        };
2048
2049        let config = bincode::config::standard();
2050        bincode::serde::encode_to_vec(&snapshot, config)
2051            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2052    }
2053
2054    /// Creates a new in-memory database from a binary snapshot.
2055    ///
2056    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
2057    ///
2058    /// # Errors
2059    ///
2060    /// Returns an error if the snapshot is invalid or deserialization fails.
2061    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2062        let config = bincode::config::standard();
2063        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2064            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2065
2066        if snapshot.version != 1 {
2067            return Err(Error::Internal(format!(
2068                "unsupported snapshot version: {}",
2069                snapshot.version
2070            )));
2071        }
2072
2073        let db = Self::new_in_memory();
2074
2075        for node in snapshot.nodes {
2076            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2077            db.store.create_node_with_id(node.id, &label_refs);
2078            for (key, value) in node.properties {
2079                db.store.set_node_property(node.id, &key, value);
2080            }
2081        }
2082
2083        for edge in snapshot.edges {
2084            db.store
2085                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2086            for (key, value) in edge.properties {
2087                db.store.set_edge_property(edge.id, &key, value);
2088            }
2089        }
2090
2091        Ok(db)
2092    }
2093
2094    // =========================================================================
2095    // ADMIN API: Iteration
2096    // =========================================================================
2097
2098    /// Returns an iterator over all nodes in the database.
2099    ///
2100    /// Useful for dump/export operations.
2101    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2102        self.store.all_nodes()
2103    }
2104
2105    /// Returns an iterator over all edges in the database.
2106    ///
2107    /// Useful for dump/export operations.
2108    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2109        self.store.all_edges()
2110    }
2111}
2112
/// Binary snapshot format for database export/import.
///
/// This is the top-level value that `export_snapshot` encodes and
/// `import_snapshot` decodes.
// NOTE(review): the binary encoding presumably depends on field order and
// types; confirm against the bincode configuration before changing either.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version. Export writes `1`; import rejects any other value.
    version: u8,
    /// All nodes of the exported database.
    nodes: Vec<SnapshotNode>,
    /// All edges of the exported database.
    edges: Vec<SnapshotEdge>,
}
2120
/// A single node as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Id of the node; import recreates the node under this same id.
    id: NodeId,
    /// Labels of the node, as owned strings.
    labels: Vec<String>,
    /// Property key/value pairs of the node.
    properties: Vec<(String, Value)>,
}
2127
/// A single edge as stored inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Id of the edge; import recreates the edge under this same id.
    id: EdgeId,
    /// Id of the source node.
    src: NodeId,
    /// Id of the destination node.
    dst: NodeId,
    /// Edge type name, as an owned string.
    edge_type: String,
    /// Property key/value pairs of the edge.
    properties: Vec<(String, Value)>,
}
2136
2137impl Drop for GrafeoDB {
2138    fn drop(&mut self) {
2139        if let Err(e) = self.close() {
2140            tracing::error!("Error closing database: {}", e);
2141        }
2142    }
2143}
2144
2145impl crate::admin::AdminService for GrafeoDB {
2146    fn info(&self) -> crate::admin::DatabaseInfo {
2147        self.info()
2148    }
2149
2150    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2151        self.detailed_stats()
2152    }
2153
2154    fn schema(&self) -> crate::admin::SchemaInfo {
2155        self.schema()
2156    }
2157
2158    fn validate(&self) -> crate::admin::ValidationResult {
2159        self.validate()
2160    }
2161
2162    fn wal_status(&self) -> crate::admin::WalStatus {
2163        self.wal_status()
2164    }
2165
2166    fn wal_checkpoint(&self) -> Result<()> {
2167        self.wal_checkpoint()
2168    }
2169}
2170
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be assembled or inspected
/// directly without going through the accessor methods.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`QueryResult::new`] fills this with one `Any` entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    // NOTE(review): each row presumably holds one value per column; nothing
    // in this type enforces that - confirm against the code that fills it.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
}
2209
2210impl QueryResult {
2211    /// Creates a new empty query result.
2212    #[must_use]
2213    pub fn new(columns: Vec<String>) -> Self {
2214        let len = columns.len();
2215        Self {
2216            columns,
2217            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2218            rows: Vec::new(),
2219            execution_time_ms: None,
2220            rows_scanned: None,
2221        }
2222    }
2223
2224    /// Creates a new empty query result with column types.
2225    #[must_use]
2226    pub fn with_types(
2227        columns: Vec<String>,
2228        column_types: Vec<grafeo_common::types::LogicalType>,
2229    ) -> Self {
2230        Self {
2231            columns,
2232            column_types,
2233            rows: Vec::new(),
2234            execution_time_ms: None,
2235            rows_scanned: None,
2236        }
2237    }
2238
2239    /// Sets the execution metrics on this result.
2240    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2241        self.execution_time_ms = Some(execution_time_ms);
2242        self.rows_scanned = Some(rows_scanned);
2243        self
2244    }
2245
2246    /// Returns the execution time in milliseconds, if available.
2247    #[must_use]
2248    pub fn execution_time_ms(&self) -> Option<f64> {
2249        self.execution_time_ms
2250    }
2251
2252    /// Returns the number of rows scanned, if available.
2253    #[must_use]
2254    pub fn rows_scanned(&self) -> Option<u64> {
2255        self.rows_scanned
2256    }
2257
2258    /// Returns the number of rows.
2259    #[must_use]
2260    pub fn row_count(&self) -> usize {
2261        self.rows.len()
2262    }
2263
2264    /// Returns the number of columns.
2265    #[must_use]
2266    pub fn column_count(&self) -> usize {
2267        self.columns.len()
2268    }
2269
2270    /// Returns true if the result is empty.
2271    #[must_use]
2272    pub fn is_empty(&self) -> bool {
2273        self.rows.is_empty()
2274    }
2275
2276    /// Extracts a single value from the result.
2277    ///
2278    /// Use this when your query returns exactly one row with one column,
2279    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2280    ///
2281    /// # Errors
2282    ///
2283    /// Returns an error if the result has multiple rows or columns.
2284    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2285        if self.rows.len() != 1 || self.columns.len() != 1 {
2286            return Err(grafeo_common::utils::error::Error::InvalidValue(
2287                "Expected single value".to_string(),
2288            ));
2289        }
2290        T::from_value(&self.rows[0][0])
2291    }
2292
2293    /// Returns an iterator over the rows.
2294    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2295        self.rows.iter()
2296    }
2297}
2298
/// Converts a [`Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value and produces an owned `Self`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns an error when `value` does not hold the requested type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2307
2308impl FromValue for i64 {
2309    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2310        value
2311            .as_int64()
2312            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2313                expected: "INT64".to_string(),
2314                found: value.type_name().to_string(),
2315            })
2316    }
2317}
2318
2319impl FromValue for f64 {
2320    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2321        value
2322            .as_float64()
2323            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2324                expected: "FLOAT64".to_string(),
2325                found: value.type_name().to_string(),
2326            })
2327    }
2328}
2329
2330impl FromValue for String {
2331    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2332        value.as_str().map(String::from).ok_or_else(|| {
2333            grafeo_common::utils::error::Error::TypeMismatch {
2334                expected: "STRING".to_string(),
2335                found: value.type_name().to_string(),
2336            }
2337        })
2338    }
2339}
2340
2341impl FromValue for bool {
2342    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2343        value
2344            .as_bool()
2345            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2346                expected: "BOOL".to_string(),
2347                found: value.type_name().to_string(),
2348            })
2349    }
2350}
2351
// Unit tests for `GrafeoDB`: construction, configuration, WAL-backed
// persistence across open/close cycles, and query result metrics.
// Tests that touch the disk are gated on the `wal` feature and work inside
// a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    // Options set on the `Config` builder are visible through `db.config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    // Smoke test: creating a session must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    // Data written before `close()` must be visible after reopening the
    // same path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            // The test expects the two nodes created above to have ids 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    // Mutations on a persistent database should produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records
        // NOTE(review): if `db.wal()` returns `None` the assertion is skipped
        // and the test passes without checking anything.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify properties were recovered correctly
            // Only the labels are asserted here; the "name" properties set
            // above are not read back.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete middle node and its edge
            // Only `e1` is deleted explicitly; `_e2` (b -> c) is left to
            // whatever `delete_node` does with it.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Without the `gql` feature only the setup above runs and nothing is
        // asserted.
        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        // Without the `gql` feature only the setup above runs and nothing is
        // asserted.
        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }
}