Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::AtomicUsize;
8
9use parking_lot::RwLock;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::{
13    DurabilityMode as WalDurabilityMode, WalConfig, WalManager, WalRecord, WalRecovery,
14};
15use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
16use grafeo_common::types::{EdgeId, NodeId, Value};
17use grafeo_common::utils::error::{Error, Result};
18use grafeo_core::graph::lpg::LpgStore;
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::config::Config;
23use crate::query::cache::QueryCache;
24use crate::session::Session;
25use crate::transaction::TransactionManager;
26
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, validated in [`with_config()`](Self::with_config)
    /// before anything else is constructed.
    config: Config,
    /// The underlying labeled-property-graph store, shared with every session.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager, built from the configured memory limit and spill path.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager. `None` when the WAL is disabled in the
    /// configuration or the configuration has no database path.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache for parsed and optimized plans, shared with every session.
    query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions; starts at zero.
    commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open: `true` after construction, set to
    /// `false` by a successful [`close()`](Self::close).
    is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations (`cdc` feature only).
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion, keyed by
    /// name (`embed` feature only). Starts out empty.
    #[cfg(feature = "embed")]
    embedding_models: RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
}
77
78impl GrafeoDB {
79    /// Creates an in-memory database - fast to create, gone when dropped.
80    ///
81    /// Use this for tests, experiments, or when you don't need persistence.
82    /// For data that survives restarts, use [`open()`](Self::open) instead.
83    ///
84    /// # Examples
85    ///
86    /// ```
87    /// use grafeo_engine::GrafeoDB;
88    ///
89    /// let db = GrafeoDB::new_in_memory();
90    /// let session = db.session();
91    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
92    /// # Ok::<(), grafeo_common::utils::error::Error>(())
93    /// ```
94    #[must_use]
95    pub fn new_in_memory() -> Self {
96        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
97    }
98
99    /// Opens a database at the given path, creating it if it doesn't exist.
100    ///
101    /// If you've used this path before, Grafeo recovers your data from the
102    /// write-ahead log automatically. First open on a new path creates an
103    /// empty database.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the path isn't writable or recovery fails.
108    ///
109    /// # Examples
110    ///
111    /// ```no_run
112    /// use grafeo_engine::GrafeoDB;
113    ///
114    /// let db = GrafeoDB::open("./my_social_network")?;
115    /// # Ok::<(), grafeo_common::utils::error::Error>(())
116    /// ```
117    #[cfg(feature = "wal")]
118    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
119        Self::with_config(Config::persistent(path.as_ref()))
120    }
121
122    /// Creates a database with custom configuration.
123    ///
124    /// Use this when you need fine-grained control over memory limits,
125    /// thread counts, or persistence settings. For most cases,
126    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
127    /// are simpler.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the database can't be created or recovery fails.
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// use grafeo_engine::{GrafeoDB, Config};
137    ///
138    /// // In-memory with a 512MB limit
139    /// let config = Config::in_memory()
140    ///     .with_memory_limit(512 * 1024 * 1024);
141    ///
142    /// let db = GrafeoDB::with_config(config)?;
143    /// # Ok::<(), grafeo_common::utils::error::Error>(())
144    /// ```
145    pub fn with_config(config: Config) -> Result<Self> {
146        // Validate configuration before proceeding
147        config
148            .validate()
149            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
150
151        let store = Arc::new(LpgStore::new());
152        #[cfg(feature = "rdf")]
153        let rdf_store = Arc::new(RdfStore::new());
154        let tx_manager = Arc::new(TransactionManager::new());
155
156        // Create buffer manager with configured limits
157        let buffer_config = BufferManagerConfig {
158            budget: config.memory_limit.unwrap_or_else(|| {
159                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
160            }),
161            spill_path: config
162                .spill_path
163                .clone()
164                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
165            ..BufferManagerConfig::default()
166        };
167        let buffer_manager = BufferManager::new(buffer_config);
168
169        // Initialize WAL if persistence is enabled
170        #[cfg(feature = "wal")]
171        let wal = if config.wal_enabled {
172            if let Some(ref db_path) = config.path {
173                // Create database directory if it doesn't exist
174                std::fs::create_dir_all(db_path)?;
175
176                let wal_path = db_path.join("wal");
177
178                // Check if WAL exists and recover if needed
179                if wal_path.exists() {
180                    let recovery = WalRecovery::new(&wal_path);
181                    let records = recovery.recover()?;
182                    Self::apply_wal_records(&store, &records)?;
183                }
184
185                // Open/create WAL manager with configured durability
186                let wal_durability = match config.wal_durability {
187                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
188                    crate::config::DurabilityMode::Batch {
189                        max_delay_ms,
190                        max_records,
191                    } => WalDurabilityMode::Batch {
192                        max_delay_ms,
193                        max_records,
194                    },
195                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
196                        WalDurabilityMode::Adaptive { target_interval_ms }
197                    }
198                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
199                };
200                let wal_config = WalConfig {
201                    durability: wal_durability,
202                    ..WalConfig::default()
203                };
204                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
205                Some(Arc::new(wal_manager))
206            } else {
207                None
208            }
209        } else {
210            None
211        };
212
213        // Create query cache with default capacity (1000 queries)
214        let query_cache = Arc::new(QueryCache::default());
215
216        Ok(Self {
217            config,
218            store,
219            #[cfg(feature = "rdf")]
220            rdf_store,
221            tx_manager,
222            buffer_manager,
223            #[cfg(feature = "wal")]
224            wal,
225            query_cache,
226            commit_counter: Arc::new(AtomicUsize::new(0)),
227            is_open: RwLock::new(true),
228            #[cfg(feature = "cdc")]
229            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
230            #[cfg(feature = "embed")]
231            embedding_models: RwLock::new(hashbrown::HashMap::new()),
232        })
233    }
234
235    /// Applies WAL records to restore the database state.
236    #[cfg(feature = "wal")]
237    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
238        for record in records {
239            match record {
240                WalRecord::CreateNode { id, labels } => {
241                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
242                    store.create_node_with_id(*id, &label_refs);
243                }
244                WalRecord::DeleteNode { id } => {
245                    store.delete_node(*id);
246                }
247                WalRecord::CreateEdge {
248                    id,
249                    src,
250                    dst,
251                    edge_type,
252                } => {
253                    store.create_edge_with_id(*id, *src, *dst, edge_type);
254                }
255                WalRecord::DeleteEdge { id } => {
256                    store.delete_edge(*id);
257                }
258                WalRecord::SetNodeProperty { id, key, value } => {
259                    store.set_node_property(*id, key, value.clone());
260                }
261                WalRecord::SetEdgeProperty { id, key, value } => {
262                    store.set_edge_property(*id, key, value.clone());
263                }
264                WalRecord::AddNodeLabel { id, label } => {
265                    store.add_label(*id, label);
266                }
267                WalRecord::RemoveNodeLabel { id, label } => {
268                    store.remove_label(*id, label);
269                }
270                WalRecord::TxCommit { .. }
271                | WalRecord::TxAbort { .. }
272                | WalRecord::Checkpoint { .. } => {
273                    // Transaction control records don't need replay action
274                    // (recovery already filtered to only committed transactions)
275                }
276            }
277        }
278        Ok(())
279    }
280
281    /// Opens a new session for running queries.
282    ///
283    /// Sessions are cheap to create - spin up as many as you need. Each
284    /// gets its own transaction context, so concurrent sessions won't
285    /// block each other on reads.
286    ///
287    /// # Examples
288    ///
289    /// ```
290    /// use grafeo_engine::GrafeoDB;
291    ///
292    /// let db = GrafeoDB::new_in_memory();
293    /// let session = db.session();
294    ///
295    /// // Run queries through the session
296    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
297    /// # Ok::<(), grafeo_common::utils::error::Error>(())
298    /// ```
299    #[must_use]
300    pub fn session(&self) -> Session {
301        #[cfg(feature = "rdf")]
302        let mut session = Session::with_rdf_store_and_adaptive(
303            Arc::clone(&self.store),
304            Arc::clone(&self.rdf_store),
305            Arc::clone(&self.tx_manager),
306            Arc::clone(&self.query_cache),
307            self.config.adaptive.clone(),
308            self.config.factorized_execution,
309            self.config.graph_model,
310            self.config.query_timeout,
311            Arc::clone(&self.commit_counter),
312            self.config.gc_interval,
313        );
314        #[cfg(not(feature = "rdf"))]
315        let mut session = Session::with_adaptive(
316            Arc::clone(&self.store),
317            Arc::clone(&self.tx_manager),
318            Arc::clone(&self.query_cache),
319            self.config.adaptive.clone(),
320            self.config.factorized_execution,
321            self.config.graph_model,
322            self.config.query_timeout,
323            Arc::clone(&self.commit_counter),
324            self.config.gc_interval,
325        );
326
327        #[cfg(feature = "cdc")]
328        session.set_cdc_log(Arc::clone(&self.cdc_log));
329
330        // Suppress unused_mut when cdc is disabled
331        let _ = &mut session;
332
333        session
334    }
335
336    /// Returns the adaptive execution configuration.
337    #[must_use]
338    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
339        &self.config.adaptive
340    }
341
342    /// Runs a query directly on the database.
343    ///
344    /// A convenience method that creates a temporary session behind the
345    /// scenes. If you're running multiple queries, grab a
346    /// [`session()`](Self::session) instead to avoid the overhead.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if parsing or execution fails.
351    pub fn execute(&self, query: &str) -> Result<QueryResult> {
352        let session = self.session();
353        session.execute(query)
354    }
355
356    /// Executes a query with parameters and returns the result.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the query fails.
361    pub fn execute_with_params(
362        &self,
363        query: &str,
364        params: std::collections::HashMap<String, grafeo_common::types::Value>,
365    ) -> Result<QueryResult> {
366        let session = self.session();
367        session.execute_with_params(query, params)
368    }
369
370    /// Executes a Cypher query and returns the result.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the query fails.
375    #[cfg(feature = "cypher")]
376    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
377        let session = self.session();
378        session.execute_cypher(query)
379    }
380
381    /// Executes a Cypher query with parameters and returns the result.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the query fails.
386    #[cfg(feature = "cypher")]
387    pub fn execute_cypher_with_params(
388        &self,
389        query: &str,
390        params: std::collections::HashMap<String, grafeo_common::types::Value>,
391    ) -> Result<QueryResult> {
392        use crate::query::processor::{QueryLanguage, QueryProcessor};
393
394        // Create processor
395        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
396        processor.process(query, QueryLanguage::Cypher, Some(&params))
397    }
398
399    /// Executes a Gremlin query and returns the result.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the query fails.
404    #[cfg(feature = "gremlin")]
405    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
406        let session = self.session();
407        session.execute_gremlin(query)
408    }
409
410    /// Executes a Gremlin query with parameters and returns the result.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the query fails.
415    #[cfg(feature = "gremlin")]
416    pub fn execute_gremlin_with_params(
417        &self,
418        query: &str,
419        params: std::collections::HashMap<String, grafeo_common::types::Value>,
420    ) -> Result<QueryResult> {
421        let session = self.session();
422        session.execute_gremlin_with_params(query, params)
423    }
424
425    /// Executes a GraphQL query and returns the result.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if the query fails.
430    #[cfg(feature = "graphql")]
431    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
432        let session = self.session();
433        session.execute_graphql(query)
434    }
435
436    /// Executes a GraphQL query with parameters and returns the result.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the query fails.
441    #[cfg(feature = "graphql")]
442    pub fn execute_graphql_with_params(
443        &self,
444        query: &str,
445        params: std::collections::HashMap<String, grafeo_common::types::Value>,
446    ) -> Result<QueryResult> {
447        let session = self.session();
448        session.execute_graphql_with_params(query, params)
449    }
450
451    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE) and returns the result.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the query fails.
456    #[cfg(feature = "sql-pgq")]
457    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
458        let session = self.session();
459        session.execute_sql(query)
460    }
461
462    /// Executes a SQL/PGQ query with parameters and returns the result.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the query fails.
467    #[cfg(feature = "sql-pgq")]
468    pub fn execute_sql_with_params(
469        &self,
470        query: &str,
471        params: std::collections::HashMap<String, grafeo_common::types::Value>,
472    ) -> Result<QueryResult> {
473        use crate::query::processor::{QueryLanguage, QueryProcessor};
474
475        // Create processor
476        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
477        processor.process(query, QueryLanguage::SqlPgq, Some(&params))
478    }
479
480    /// Executes a SPARQL query and returns the result.
481    ///
482    /// SPARQL queries operate on the RDF triple store.
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if the query fails.
487    ///
488    /// # Examples
489    ///
490    /// ```no_run
491    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
492    /// use grafeo_engine::GrafeoDB;
493    ///
494    /// let db = GrafeoDB::new_in_memory();
495    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
496    /// # Ok(())
497    /// # }
498    /// ```
499    #[cfg(all(feature = "sparql", feature = "rdf"))]
500    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
501        use crate::query::{
502            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
503        };
504
505        // Parse and translate the SPARQL query to a logical plan
506        let logical_plan = sparql_translator::translate(query)?;
507
508        // Optimize the plan
509        let optimizer = Optimizer::from_store(&self.store);
510        let optimized_plan = optimizer.optimize(logical_plan)?;
511
512        // Convert to physical plan using RDF planner
513        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
514        let mut physical_plan = planner.plan(&optimized_plan)?;
515
516        // Execute the plan
517        let executor = Executor::with_columns(physical_plan.columns.clone());
518        executor.execute(physical_plan.operator.as_mut())
519    }
520
521    /// Returns the RDF store.
522    ///
523    /// This provides direct access to the RDF store for triple operations.
524    #[cfg(feature = "rdf")]
525    #[must_use]
526    pub fn rdf_store(&self) -> &Arc<RdfStore> {
527        &self.rdf_store
528    }
529
530    /// Executes a query and returns a single scalar value.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if the query fails or doesn't return exactly one row.
535    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
536        let result = self.execute(query)?;
537        result.scalar()
538    }
539
540    /// Returns the configuration.
541    #[must_use]
542    pub fn config(&self) -> &Config {
543        &self.config
544    }
545
546    /// Returns the graph data model of this database.
547    #[must_use]
548    pub fn graph_model(&self) -> crate::config::GraphModel {
549        self.config.graph_model
550    }
551
552    /// Returns the configured memory limit in bytes, if any.
553    #[must_use]
554    pub fn memory_limit(&self) -> Option<usize> {
555        self.config.memory_limit
556    }
557
558    /// Returns the underlying store.
559    ///
560    /// This provides direct access to the LPG store for algorithm implementations.
561    #[must_use]
562    pub fn store(&self) -> &Arc<LpgStore> {
563        &self.store
564    }
565
566    /// Garbage collects old MVCC versions that are no longer visible.
567    ///
568    /// Determines the minimum epoch required by active transactions and prunes
569    /// version chains older than that threshold. Also cleans up completed
570    /// transaction metadata in the transaction manager.
571    pub fn gc(&self) {
572        let min_epoch = self.tx_manager.min_active_epoch();
573        self.store.gc_versions(min_epoch);
574        self.tx_manager.gc();
575    }
576
577    /// Returns the buffer manager for memory-aware operations.
578    #[must_use]
579    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
580        &self.buffer_manager
581    }
582
583    /// Closes the database, flushing all pending writes.
584    ///
585    /// For persistent databases, this ensures everything is safely on disk.
586    /// Called automatically when the database is dropped, but you can call
587    /// it explicitly if you need to guarantee durability at a specific point.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
592    pub fn close(&self) -> Result<()> {
593        let mut is_open = self.is_open.write();
594        if !*is_open {
595            return Ok(());
596        }
597
598        // Commit and checkpoint WAL
599        #[cfg(feature = "wal")]
600        if let Some(ref wal) = self.wal {
601            let epoch = self.store.current_epoch();
602
603            // Use the last assigned transaction ID, or create a checkpoint-only tx
604            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
605                // No transactions have been started; begin one for checkpoint
606                self.tx_manager.begin()
607            });
608
609            // Log a TxCommit to mark all pending records as committed
610            wal.log(&WalRecord::TxCommit {
611                tx_id: checkpoint_tx,
612            })?;
613
614            // Then checkpoint
615            wal.checkpoint(checkpoint_tx, epoch)?;
616            wal.sync()?;
617        }
618
619        *is_open = false;
620        Ok(())
621    }
622
623    /// Returns the WAL manager if available.
624    #[cfg(feature = "wal")]
625    #[must_use]
626    pub fn wal(&self) -> Option<&Arc<WalManager>> {
627        self.wal.as_ref()
628    }
629
630    /// Logs a WAL record if WAL is enabled.
631    #[cfg(feature = "wal")]
632    fn log_wal(&self, record: &WalRecord) -> Result<()> {
633        if let Some(ref wal) = self.wal {
634            wal.log(record)?;
635        }
636        Ok(())
637    }
638
639    /// Returns the number of nodes in the database.
640    #[must_use]
641    pub fn node_count(&self) -> usize {
642        self.store.node_count()
643    }
644
645    /// Returns the number of edges in the database.
646    #[must_use]
647    pub fn edge_count(&self) -> usize {
648        self.store.edge_count()
649    }
650
651    /// Returns the number of distinct labels in the database.
652    #[must_use]
653    pub fn label_count(&self) -> usize {
654        self.store.label_count()
655    }
656
657    /// Returns the number of distinct property keys in the database.
658    #[must_use]
659    pub fn property_key_count(&self) -> usize {
660        self.store.property_key_count()
661    }
662
663    /// Returns the number of distinct edge types in the database.
664    #[must_use]
665    pub fn edge_type_count(&self) -> usize {
666        self.store.edge_type_count()
667    }
668
669    // === Node Operations ===
670
671    /// Creates a node with the given labels and returns its ID.
672    ///
673    /// Labels categorize nodes - think of them like tags. A node can have
674    /// multiple labels (e.g., `["Person", "Employee"]`).
675    ///
676    /// # Examples
677    ///
678    /// ```
679    /// use grafeo_engine::GrafeoDB;
680    ///
681    /// let db = GrafeoDB::new_in_memory();
682    /// let alice = db.create_node(&["Person"]);
683    /// let company = db.create_node(&["Company", "Startup"]);
684    /// ```
685    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
686        let id = self.store.create_node(labels);
687
688        // Log to WAL if enabled
689        #[cfg(feature = "wal")]
690        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
691            id,
692            labels: labels.iter().map(|s| (*s).to_string()).collect(),
693        }) {
694            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
695        }
696
697        #[cfg(feature = "cdc")]
698        self.cdc_log
699            .record_create_node(id, self.store.current_epoch(), None);
700
701        id
702    }
703
704    /// Creates a new node with labels and properties.
705    ///
706    /// If WAL is enabled, the operation is logged for durability.
707    pub fn create_node_with_props(
708        &self,
709        labels: &[&str],
710        properties: impl IntoIterator<
711            Item = (
712                impl Into<grafeo_common::types::PropertyKey>,
713                impl Into<grafeo_common::types::Value>,
714            ),
715        >,
716    ) -> grafeo_common::types::NodeId {
717        // Collect properties first so we can log them to WAL
718        let props: Vec<(
719            grafeo_common::types::PropertyKey,
720            grafeo_common::types::Value,
721        )> = properties
722            .into_iter()
723            .map(|(k, v)| (k.into(), v.into()))
724            .collect();
725
726        let id = self
727            .store
728            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
729
730        // Build CDC snapshot before WAL consumes props
731        #[cfg(feature = "cdc")]
732        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
733            .iter()
734            .map(|(k, v)| (k.to_string(), v.clone()))
735            .collect();
736
737        // Log node creation to WAL
738        #[cfg(feature = "wal")]
739        {
740            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
741                id,
742                labels: labels.iter().map(|s| (*s).to_string()).collect(),
743            }) {
744                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
745            }
746
747            // Log each property to WAL for full durability
748            for (key, value) in props {
749                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
750                    id,
751                    key: key.to_string(),
752                    value,
753                }) {
754                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
755                }
756            }
757        }
758
759        #[cfg(feature = "cdc")]
760        self.cdc_log.record_create_node(
761            id,
762            self.store.current_epoch(),
763            if cdc_props.is_empty() {
764                None
765            } else {
766                Some(cdc_props)
767            },
768        );
769
770        // Auto-insert into matching text indexes for the new node
771        #[cfg(feature = "text-index")]
772        if let Some(node) = self.store.get_node(id) {
773            for label in &node.labels {
774                for (prop_key, prop_val) in &node.properties {
775                    if let grafeo_common::types::Value::String(text) = prop_val
776                        && let Some(index) =
777                            self.store.get_text_index(label.as_str(), prop_key.as_ref())
778                    {
779                        index.write().insert(id, text);
780                    }
781                }
782            }
783        }
784
785        id
786    }
787
788    /// Gets a node by ID.
789    #[must_use]
790    pub fn get_node(
791        &self,
792        id: grafeo_common::types::NodeId,
793    ) -> Option<grafeo_core::graph::lpg::Node> {
794        self.store.get_node(id)
795    }
796
797    /// Deletes a node and all its edges.
798    ///
799    /// If WAL is enabled, the operation is logged for durability.
800    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
801        // Capture properties for CDC before deletion
802        #[cfg(feature = "cdc")]
803        let cdc_props = self.store.get_node(id).map(|node| {
804            node.properties
805                .iter()
806                .map(|(k, v)| (k.to_string(), v.clone()))
807                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
808        });
809
810        // Collect matching vector indexes BEFORE deletion removes labels
811        #[cfg(feature = "vector-index")]
812        let indexes_to_clean: Vec<std::sync::Arc<grafeo_core::index::vector::HnswIndex>> = self
813            .store
814            .get_node(id)
815            .map(|node| {
816                let mut indexes = Vec::new();
817                for label in &node.labels {
818                    let prefix = format!("{}:", label.as_str());
819                    for (key, index) in self.store.vector_index_entries() {
820                        if key.starts_with(&prefix) {
821                            indexes.push(index);
822                        }
823                    }
824                }
825                indexes
826            })
827            .unwrap_or_default();
828
829        // Collect matching text indexes BEFORE deletion removes labels
830        #[cfg(feature = "text-index")]
831        let text_indexes_to_clean: Vec<
832            std::sync::Arc<parking_lot::RwLock<grafeo_core::index::text::InvertedIndex>>,
833        > = self
834            .store
835            .get_node(id)
836            .map(|node| {
837                let mut indexes = Vec::new();
838                for label in &node.labels {
839                    let prefix = format!("{}:", label.as_str());
840                    for (key, index) in self.store.text_index_entries() {
841                        if key.starts_with(&prefix) {
842                            indexes.push(index);
843                        }
844                    }
845                }
846                indexes
847            })
848            .unwrap_or_default();
849
850        let result = self.store.delete_node(id);
851
852        // Remove from vector indexes after successful deletion
853        #[cfg(feature = "vector-index")]
854        if result {
855            for index in indexes_to_clean {
856                index.remove(id);
857            }
858        }
859
860        // Remove from text indexes after successful deletion
861        #[cfg(feature = "text-index")]
862        if result {
863            for index in text_indexes_to_clean {
864                index.write().remove(id);
865            }
866        }
867
868        #[cfg(feature = "wal")]
869        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
870            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
871        }
872
873        #[cfg(feature = "cdc")]
874        if result {
875            self.cdc_log.record_delete(
876                crate::cdc::EntityId::Node(id),
877                self.store.current_epoch(),
878                cdc_props,
879            );
880        }
881
882        result
883    }
884
885    /// Sets a property on a node.
886    ///
887    /// If WAL is enabled, the operation is logged for durability.
888    pub fn set_node_property(
889        &self,
890        id: grafeo_common::types::NodeId,
891        key: &str,
892        value: grafeo_common::types::Value,
893    ) {
894        // Extract vector data before the value is moved into the store
895        #[cfg(feature = "vector-index")]
896        let vector_data = match &value {
897            grafeo_common::types::Value::Vector(v) => Some(v.clone()),
898            _ => None,
899        };
900
901        // Log to WAL first
902        #[cfg(feature = "wal")]
903        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
904            id,
905            key: key.to_string(),
906            value: value.clone(),
907        }) {
908            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
909        }
910
911        // Capture old value for CDC before the store write
912        #[cfg(feature = "cdc")]
913        let cdc_old_value = self
914            .store
915            .get_node_property(id, &grafeo_common::types::PropertyKey::new(key));
916        #[cfg(feature = "cdc")]
917        let cdc_new_value = value.clone();
918
919        self.store.set_node_property(id, key, value);
920
921        #[cfg(feature = "cdc")]
922        self.cdc_log.record_update(
923            crate::cdc::EntityId::Node(id),
924            self.store.current_epoch(),
925            key,
926            cdc_old_value,
927            cdc_new_value,
928        );
929
930        // Auto-insert into matching vector indexes
931        #[cfg(feature = "vector-index")]
932        if let Some(vec) = vector_data
933            && let Some(node) = self.store.get_node(id)
934        {
935            for label in &node.labels {
936                if let Some(index) = self.store.get_vector_index(label.as_str(), key) {
937                    let accessor =
938                        grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, key);
939                    index.insert(id, &vec, &accessor);
940                }
941            }
942        }
943
944        // Auto-update matching text indexes
945        #[cfg(feature = "text-index")]
946        if let Some(node) = self.store.get_node(id) {
947            let text_val = node
948                .properties
949                .get(&grafeo_common::types::PropertyKey::new(key))
950                .and_then(|v| match v {
951                    grafeo_common::types::Value::String(s) => Some(s.to_string()),
952                    _ => None,
953                });
954            for label in &node.labels {
955                if let Some(index) = self.store.get_text_index(label.as_str(), key) {
956                    let mut idx = index.write();
957                    if let Some(ref text) = text_val {
958                        idx.insert(id, text);
959                    } else {
960                        idx.remove(id);
961                    }
962                }
963            }
964        }
965    }
966
967    /// Adds a label to an existing node.
968    ///
969    /// Returns `true` if the label was added, `false` if the node doesn't exist
970    /// or already has the label.
971    ///
972    /// # Examples
973    ///
974    /// ```
975    /// use grafeo_engine::GrafeoDB;
976    ///
977    /// let db = GrafeoDB::new_in_memory();
978    /// let alice = db.create_node(&["Person"]);
979    ///
980    /// // Promote Alice to Employee
981    /// let added = db.add_node_label(alice, "Employee");
982    /// assert!(added);
983    /// ```
984    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
985        let result = self.store.add_label(id, label);
986
987        #[cfg(feature = "wal")]
988        if result {
989            // Log to WAL if enabled
990            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
991                id,
992                label: label.to_string(),
993            }) {
994                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
995            }
996        }
997
998        // Auto-insert into vector indexes for the newly-added label
999        #[cfg(feature = "vector-index")]
1000        if result {
1001            let prefix = format!("{label}:");
1002            for (key, index) in self.store.vector_index_entries() {
1003                if let Some(property) = key.strip_prefix(&prefix)
1004                    && let Some(node) = self.store.get_node(id)
1005                {
1006                    let prop_key = grafeo_common::types::PropertyKey::new(property);
1007                    if let Some(grafeo_common::types::Value::Vector(v)) =
1008                        node.properties.get(&prop_key)
1009                    {
1010                        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1011                            &self.store,
1012                            property,
1013                        );
1014                        index.insert(id, v, &accessor);
1015                    }
1016                }
1017            }
1018        }
1019
1020        // Auto-insert into text indexes for the newly-added label
1021        #[cfg(feature = "text-index")]
1022        if result && let Some(node) = self.store.get_node(id) {
1023            for (prop_key, prop_val) in &node.properties {
1024                if let grafeo_common::types::Value::String(text) = prop_val
1025                    && let Some(index) = self.store.get_text_index(label, prop_key.as_ref())
1026                {
1027                    index.write().insert(id, text);
1028                }
1029            }
1030        }
1031
1032        result
1033    }
1034
1035    /// Removes a label from a node.
1036    ///
1037    /// Returns `true` if the label was removed, `false` if the node doesn't exist
1038    /// or doesn't have the label.
1039    ///
1040    /// # Examples
1041    ///
1042    /// ```
1043    /// use grafeo_engine::GrafeoDB;
1044    ///
1045    /// let db = GrafeoDB::new_in_memory();
1046    /// let alice = db.create_node(&["Person", "Employee"]);
1047    ///
1048    /// // Remove Employee status
1049    /// let removed = db.remove_node_label(alice, "Employee");
1050    /// assert!(removed);
1051    /// ```
1052    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
1053        let result = self.store.remove_label(id, label);
1054
1055        #[cfg(feature = "wal")]
1056        if result {
1057            // Log to WAL if enabled
1058            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
1059                id,
1060                label: label.to_string(),
1061            }) {
1062                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
1063            }
1064        }
1065
1066        result
1067    }
1068
1069    /// Gets all labels for a node.
1070    ///
1071    /// Returns `None` if the node doesn't exist.
1072    ///
1073    /// # Examples
1074    ///
1075    /// ```
1076    /// use grafeo_engine::GrafeoDB;
1077    ///
1078    /// let db = GrafeoDB::new_in_memory();
1079    /// let alice = db.create_node(&["Person", "Employee"]);
1080    ///
1081    /// let labels = db.get_node_labels(alice).unwrap();
1082    /// assert!(labels.contains(&"Person".to_string()));
1083    /// assert!(labels.contains(&"Employee".to_string()));
1084    /// ```
1085    #[must_use]
1086    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
1087        self.store
1088            .get_node(id)
1089            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
1090    }
1091
1092    // === Edge Operations ===
1093
1094    /// Creates an edge (relationship) between two nodes.
1095    ///
1096    /// Edges connect nodes and have a type that describes the relationship.
1097    /// They're directed - the order of `src` and `dst` matters.
1098    ///
1099    /// # Examples
1100    ///
1101    /// ```
1102    /// use grafeo_engine::GrafeoDB;
1103    ///
1104    /// let db = GrafeoDB::new_in_memory();
1105    /// let alice = db.create_node(&["Person"]);
1106    /// let bob = db.create_node(&["Person"]);
1107    ///
1108    /// // Alice knows Bob (directed: Alice -> Bob)
1109    /// let edge = db.create_edge(alice, bob, "KNOWS");
1110    /// ```
1111    pub fn create_edge(
1112        &self,
1113        src: grafeo_common::types::NodeId,
1114        dst: grafeo_common::types::NodeId,
1115        edge_type: &str,
1116    ) -> grafeo_common::types::EdgeId {
1117        let id = self.store.create_edge(src, dst, edge_type);
1118
1119        // Log to WAL if enabled
1120        #[cfg(feature = "wal")]
1121        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1122            id,
1123            src,
1124            dst,
1125            edge_type: edge_type.to_string(),
1126        }) {
1127            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1128        }
1129
1130        #[cfg(feature = "cdc")]
1131        self.cdc_log
1132            .record_create_edge(id, self.store.current_epoch(), None);
1133
1134        id
1135    }
1136
1137    /// Creates a new edge with properties.
1138    ///
1139    /// If WAL is enabled, the operation is logged for durability.
1140    pub fn create_edge_with_props(
1141        &self,
1142        src: grafeo_common::types::NodeId,
1143        dst: grafeo_common::types::NodeId,
1144        edge_type: &str,
1145        properties: impl IntoIterator<
1146            Item = (
1147                impl Into<grafeo_common::types::PropertyKey>,
1148                impl Into<grafeo_common::types::Value>,
1149            ),
1150        >,
1151    ) -> grafeo_common::types::EdgeId {
1152        // Collect properties first so we can log them to WAL
1153        let props: Vec<(
1154            grafeo_common::types::PropertyKey,
1155            grafeo_common::types::Value,
1156        )> = properties
1157            .into_iter()
1158            .map(|(k, v)| (k.into(), v.into()))
1159            .collect();
1160
1161        let id = self.store.create_edge_with_props(
1162            src,
1163            dst,
1164            edge_type,
1165            props.iter().map(|(k, v)| (k.clone(), v.clone())),
1166        );
1167
1168        // Build CDC snapshot before WAL consumes props
1169        #[cfg(feature = "cdc")]
1170        let cdc_props: std::collections::HashMap<String, grafeo_common::types::Value> = props
1171            .iter()
1172            .map(|(k, v)| (k.to_string(), v.clone()))
1173            .collect();
1174
1175        // Log edge creation to WAL
1176        #[cfg(feature = "wal")]
1177        {
1178            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
1179                id,
1180                src,
1181                dst,
1182                edge_type: edge_type.to_string(),
1183            }) {
1184                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
1185            }
1186
1187            // Log each property to WAL for full durability
1188            for (key, value) in props {
1189                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1190                    id,
1191                    key: key.to_string(),
1192                    value,
1193                }) {
1194                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1195                }
1196            }
1197        }
1198
1199        #[cfg(feature = "cdc")]
1200        self.cdc_log.record_create_edge(
1201            id,
1202            self.store.current_epoch(),
1203            if cdc_props.is_empty() {
1204                None
1205            } else {
1206                Some(cdc_props)
1207            },
1208        );
1209
1210        id
1211    }
1212
1213    /// Gets an edge by ID.
1214    #[must_use]
1215    pub fn get_edge(
1216        &self,
1217        id: grafeo_common::types::EdgeId,
1218    ) -> Option<grafeo_core::graph::lpg::Edge> {
1219        self.store.get_edge(id)
1220    }
1221
1222    /// Deletes an edge.
1223    ///
1224    /// If WAL is enabled, the operation is logged for durability.
1225    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
1226        // Capture properties for CDC before deletion
1227        #[cfg(feature = "cdc")]
1228        let cdc_props = self.store.get_edge(id).map(|edge| {
1229            edge.properties
1230                .iter()
1231                .map(|(k, v)| (k.to_string(), v.clone()))
1232                .collect::<std::collections::HashMap<String, grafeo_common::types::Value>>()
1233        });
1234
1235        let result = self.store.delete_edge(id);
1236
1237        #[cfg(feature = "wal")]
1238        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
1239            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
1240        }
1241
1242        #[cfg(feature = "cdc")]
1243        if result {
1244            self.cdc_log.record_delete(
1245                crate::cdc::EntityId::Edge(id),
1246                self.store.current_epoch(),
1247                cdc_props,
1248            );
1249        }
1250
1251        result
1252    }
1253
1254    /// Sets a property on an edge.
1255    ///
1256    /// If WAL is enabled, the operation is logged for durability.
1257    pub fn set_edge_property(
1258        &self,
1259        id: grafeo_common::types::EdgeId,
1260        key: &str,
1261        value: grafeo_common::types::Value,
1262    ) {
1263        // Log to WAL first
1264        #[cfg(feature = "wal")]
1265        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
1266            id,
1267            key: key.to_string(),
1268            value: value.clone(),
1269        }) {
1270            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
1271        }
1272
1273        // Capture old value for CDC before the store write
1274        #[cfg(feature = "cdc")]
1275        let cdc_old_value = self
1276            .store
1277            .get_edge_property(id, &grafeo_common::types::PropertyKey::new(key));
1278        #[cfg(feature = "cdc")]
1279        let cdc_new_value = value.clone();
1280
1281        self.store.set_edge_property(id, key, value);
1282
1283        #[cfg(feature = "cdc")]
1284        self.cdc_log.record_update(
1285            crate::cdc::EntityId::Edge(id),
1286            self.store.current_epoch(),
1287            key,
1288            cdc_old_value,
1289            cdc_new_value,
1290        );
1291    }
1292
1293    /// Removes a property from a node.
1294    ///
1295    /// Returns true if the property existed and was removed, false otherwise.
1296    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
1297        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1298        self.store.remove_node_property(id, key).is_some()
1299    }
1300
1301    /// Removes a property from an edge.
1302    ///
1303    /// Returns true if the property existed and was removed, false otherwise.
1304    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
1305        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
1306        self.store.remove_edge_property(id, key).is_some()
1307    }
1308
1309    // =========================================================================
1310    // PROPERTY INDEX API
1311    // =========================================================================
1312
1313    /// Creates an index on a node property for O(1) lookups by value.
1314    ///
1315    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1316    /// O(1) instead of O(n) for this property. The index is automatically
1317    /// maintained when properties are set or removed.
1318    ///
1319    /// # Example
1320    ///
1321    /// ```no_run
1322    /// # use grafeo_engine::GrafeoDB;
1323    /// # use grafeo_common::types::Value;
1324    /// # let db = GrafeoDB::new_in_memory();
1325    /// // Create an index on the 'email' property
1326    /// db.create_property_index("email");
1327    ///
1328    /// // Now lookups by email are O(1)
1329    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
1330    /// ```
1331    pub fn create_property_index(&self, property: &str) {
1332        self.store.create_property_index(property);
1333    }
1334
1335    /// Creates a vector similarity index on a node property.
1336    ///
1337    /// This enables efficient approximate nearest-neighbor search on vector
1338    /// properties. Currently validates the index parameters and scans existing
1339    /// nodes to verify the property contains vectors of the expected dimensions.
1340    ///
1341    /// # Arguments
1342    ///
1343    /// * `label` - Node label to index (e.g., `"Doc"`)
1344    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
1345    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
1346    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
1347    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
1348    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
1349    ///
1350    /// # Errors
1351    ///
1352    /// Returns an error if the metric is invalid, no vectors are found, or
1353    /// dimensions don't match.
1354    pub fn create_vector_index(
1355        &self,
1356        label: &str,
1357        property: &str,
1358        dimensions: Option<usize>,
1359        metric: Option<&str>,
1360        m: Option<usize>,
1361        ef_construction: Option<usize>,
1362    ) -> Result<()> {
1363        use grafeo_common::types::{PropertyKey, Value};
1364        use grafeo_core::index::vector::DistanceMetric;
1365
1366        let metric = match metric {
1367            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1368                grafeo_common::utils::error::Error::Internal(format!(
1369                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
1370                    m
1371                ))
1372            })?,
1373            None => DistanceMetric::Cosine,
1374        };
1375
1376        // Scan nodes to validate vectors exist and check dimensions
1377        let prop_key = PropertyKey::new(property);
1378        let mut found_dims: Option<usize> = dimensions;
1379        let mut vector_count = 0usize;
1380
1381        #[cfg(feature = "vector-index")]
1382        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1383
1384        for node in self.store.nodes_with_label(label) {
1385            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1386                if let Some(expected) = found_dims {
1387                    if v.len() != expected {
1388                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1389                            "Vector dimension mismatch: expected {}, found {} on node {}",
1390                            expected,
1391                            v.len(),
1392                            node.id.0
1393                        )));
1394                    }
1395                } else {
1396                    found_dims = Some(v.len());
1397                }
1398                vector_count += 1;
1399                #[cfg(feature = "vector-index")]
1400                vectors.push((node.id, v.to_vec()));
1401            }
1402        }
1403
1404        let Some(dims) = found_dims else {
1405            // No vectors found yet — caller must have supplied explicit dimensions
1406            // so we can create an empty index that auto-populates via set_node_property.
1407            return if let Some(d) = dimensions {
1408                #[cfg(feature = "vector-index")]
1409                {
1410                    use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1411
1412                    let mut config = HnswConfig::new(d, metric);
1413                    if let Some(m_val) = m {
1414                        config = config.with_m(m_val);
1415                    }
1416                    if let Some(ef_c) = ef_construction {
1417                        config = config.with_ef_construction(ef_c);
1418                    }
1419
1420                    let index = HnswIndex::new(config);
1421                    self.store
1422                        .add_vector_index(label, property, Arc::new(index));
1423                }
1424
1425                let _ = (m, ef_construction);
1426                tracing::info!(
1427                    "Empty vector index created: :{label}({property}) - 0 vectors, {d} dimensions, metric={metric_name}",
1428                    metric_name = metric.name()
1429                );
1430                Ok(())
1431            } else {
1432                Err(grafeo_common::utils::error::Error::Internal(format!(
1433                    "No vector properties found on :{label}({property}) and no dimensions specified"
1434                )))
1435            };
1436        };
1437
1438        // Build and populate the HNSW index
1439        #[cfg(feature = "vector-index")]
1440        {
1441            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1442
1443            let mut config = HnswConfig::new(dims, metric);
1444            if let Some(m_val) = m {
1445                config = config.with_m(m_val);
1446            }
1447            if let Some(ef_c) = ef_construction {
1448                config = config.with_ef_construction(ef_c);
1449            }
1450
1451            let index = HnswIndex::with_capacity(config, vectors.len());
1452            let accessor =
1453                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1454            for (node_id, vec) in &vectors {
1455                index.insert(*node_id, vec, &accessor);
1456            }
1457
1458            self.store
1459                .add_vector_index(label, property, Arc::new(index));
1460        }
1461
1462        // Suppress unused variable warnings when vector-index is off
1463        let _ = (m, ef_construction);
1464
1465        tracing::info!(
1466            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1467            metric_name = metric.name()
1468        );
1469
1470        Ok(())
1471    }
1472
1473    /// Drops a vector index for the given label and property.
1474    ///
1475    /// Returns `true` if the index existed and was removed, `false` if no
1476    /// index was found.
1477    ///
1478    /// After dropping, [`vector_search`](Self::vector_search) for this
1479    /// label+property pair will return an error.
1480    #[cfg(feature = "vector-index")]
1481    pub fn drop_vector_index(&self, label: &str, property: &str) -> bool {
1482        let removed = self.store.remove_vector_index(label, property);
1483        if removed {
1484            tracing::info!("Vector index dropped: :{label}({property})");
1485        }
1486        removed
1487    }
1488
1489    /// Drops and recreates a vector index, rescanning all matching nodes.
1490    ///
1491    /// This is useful after bulk inserts or when the index may be out of sync.
1492    /// The previous index configuration (dimensions, metric, M, ef\_construction)
1493    /// is preserved.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if no vector index exists for this label+property pair,
1498    /// or if the rebuild fails (e.g., no matching vectors found).
1499    #[cfg(feature = "vector-index")]
1500    pub fn rebuild_vector_index(&self, label: &str, property: &str) -> Result<()> {
1501        let config = self
1502            .store
1503            .get_vector_index(label, property)
1504            .map(|idx| idx.config().clone())
1505            .ok_or_else(|| {
1506                grafeo_common::utils::error::Error::Internal(format!(
1507                    "No vector index found for :{label}({property}). Cannot rebuild."
1508                ))
1509            })?;
1510
1511        self.store.remove_vector_index(label, property);
1512
1513        self.create_vector_index(
1514            label,
1515            property,
1516            Some(config.dimensions),
1517            Some(config.metric.name()),
1518            Some(config.m),
1519            Some(config.ef_construction),
1520        )
1521    }
1522
1523    /// Computes a node allowlist from property filters.
1524    ///
1525    /// Supports equality filters (scalar values) and operator filters (Map values
1526    /// with `$`-prefixed keys like `$gt`, `$lt`, `$in`, `$contains`).
1527    ///
1528    /// Returns `None` if filters is `None` or empty (meaning no filtering),
1529    /// or `Some(set)` with the intersection (possibly empty).
1530    #[cfg(feature = "vector-index")]
1531    fn compute_filter_allowlist(
1532        &self,
1533        label: &str,
1534        filters: Option<&std::collections::HashMap<String, Value>>,
1535    ) -> Option<std::collections::HashSet<NodeId>> {
1536        let filters = filters.filter(|f| !f.is_empty())?;
1537
1538        // Start with all nodes for this label
1539        let label_nodes: std::collections::HashSet<NodeId> =
1540            self.store.nodes_by_label(label).into_iter().collect();
1541
1542        let mut allowlist = label_nodes;
1543
1544        for (key, filter_value) in filters {
1545            // Check if this is an operator filter (Map with $-prefixed keys)
1546            let is_operator_filter = matches!(filter_value, Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')));
1547
1548            let matching: std::collections::HashSet<NodeId> = if is_operator_filter {
1549                // Operator filter: must scan nodes and check each
1550                self.store
1551                    .find_nodes_matching_filter(key, filter_value)
1552                    .into_iter()
1553                    .collect()
1554            } else {
1555                // Equality filter: use indexed lookup when available
1556                self.store
1557                    .find_nodes_by_property(key, filter_value)
1558                    .into_iter()
1559                    .collect()
1560            };
1561            allowlist = allowlist.intersection(&matching).copied().collect();
1562
1563            // Short-circuit: empty intersection means no results possible
1564            if allowlist.is_empty() {
1565                return Some(allowlist);
1566            }
1567        }
1568
1569        Some(allowlist)
1570    }
1571
1572    /// Searches for the k nearest neighbors of a query vector.
1573    ///
1574    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1575    ///
1576    /// # Arguments
1577    ///
1578    /// * `label` - Node label that was indexed
1579    /// * `property` - Property that was indexed
1580    /// * `query` - Query vector (slice of floats)
1581    /// * `k` - Number of nearest neighbors to return
1582    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1583    /// * `filters` - Optional property equality filters. Only nodes matching all
1584    ///   `(key, value)` pairs will appear in results.
1585    ///
1586    /// # Returns
1587    ///
1588    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1589    #[cfg(feature = "vector-index")]
1590    pub fn vector_search(
1591        &self,
1592        label: &str,
1593        property: &str,
1594        query: &[f32],
1595        k: usize,
1596        ef: Option<usize>,
1597        filters: Option<&std::collections::HashMap<String, Value>>,
1598    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1599        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1600            grafeo_common::utils::error::Error::Internal(format!(
1601                "No vector index found for :{label}({property}). Call create_vector_index() first."
1602            ))
1603        })?;
1604
1605        let accessor =
1606            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1607
1608        let results = match self.compute_filter_allowlist(label, filters) {
1609            Some(allowlist) => match ef {
1610                Some(ef_val) => {
1611                    index.search_with_ef_and_filter(query, k, ef_val, &allowlist, &accessor)
1612                }
1613                None => index.search_with_filter(query, k, &allowlist, &accessor),
1614            },
1615            None => match ef {
1616                Some(ef_val) => index.search_with_ef(query, k, ef_val, &accessor),
1617                None => index.search(query, k, &accessor),
1618            },
1619        };
1620
1621        Ok(results)
1622    }
1623
1624    /// Creates multiple nodes in bulk, each with a single vector property.
1625    ///
1626    /// Much faster than individual `create_node_with_props` calls because it
1627    /// acquires internal locks once and loops in Rust rather than crossing
1628    /// the FFI boundary per vector.
1629    ///
1630    /// # Arguments
1631    ///
1632    /// * `label` - Label applied to all created nodes
1633    /// * `property` - Property name for the vector data
1634    /// * `vectors` - Vector data for each node
1635    ///
1636    /// # Returns
1637    ///
1638    /// Vector of created `NodeId`s in the same order as the input vectors.
1639    pub fn batch_create_nodes(
1640        &self,
1641        label: &str,
1642        property: &str,
1643        vectors: Vec<Vec<f32>>,
1644    ) -> Vec<grafeo_common::types::NodeId> {
1645        use grafeo_common::types::{PropertyKey, Value};
1646
1647        let prop_key = PropertyKey::new(property);
1648        let labels: &[&str] = &[label];
1649
1650        let ids: Vec<grafeo_common::types::NodeId> = vectors
1651            .into_iter()
1652            .map(|vec| {
1653                let value = Value::Vector(vec.into());
1654                let id = self.store.create_node_with_props(
1655                    labels,
1656                    std::iter::once((prop_key.clone(), value.clone())),
1657                );
1658
1659                // Log to WAL
1660                #[cfg(feature = "wal")]
1661                {
1662                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1663                        id,
1664                        labels: labels.iter().map(|s| (*s).to_string()).collect(),
1665                    }) {
1666                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1667                    }
1668                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1669                        id,
1670                        key: property.to_string(),
1671                        value,
1672                    }) {
1673                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1674                    }
1675                }
1676
1677                id
1678            })
1679            .collect();
1680
1681        // Auto-insert into matching vector index if one exists
1682        #[cfg(feature = "vector-index")]
1683        if let Some(index) = self.store.get_vector_index(label, property) {
1684            let accessor =
1685                grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1686            for &id in &ids {
1687                if let Some(node) = self.store.get_node(id) {
1688                    let pk = grafeo_common::types::PropertyKey::new(property);
1689                    if let Some(grafeo_common::types::Value::Vector(v)) = node.properties.get(&pk) {
1690                        index.insert(id, v, &accessor);
1691                    }
1692                }
1693            }
1694        }
1695
1696        ids
1697    }
1698
1699    /// Searches for nearest neighbors for multiple query vectors in parallel.
1700    ///
1701    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1702    ///
1703    /// # Arguments
1704    ///
1705    /// * `label` - Node label that was indexed
1706    /// * `property` - Property that was indexed
1707    /// * `queries` - Batch of query vectors
1708    /// * `k` - Number of nearest neighbors per query
1709    /// * `ef` - Search beam width (uses index default if `None`)
1710    /// * `filters` - Optional property equality filters
1711    #[cfg(feature = "vector-index")]
1712    pub fn batch_vector_search(
1713        &self,
1714        label: &str,
1715        property: &str,
1716        queries: &[Vec<f32>],
1717        k: usize,
1718        ef: Option<usize>,
1719        filters: Option<&std::collections::HashMap<String, Value>>,
1720    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1721        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1722            grafeo_common::utils::error::Error::Internal(format!(
1723                "No vector index found for :{label}({property}). Call create_vector_index() first."
1724            ))
1725        })?;
1726
1727        let accessor =
1728            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1729
1730        let results = match self.compute_filter_allowlist(label, filters) {
1731            Some(allowlist) => match ef {
1732                Some(ef_val) => {
1733                    index.batch_search_with_ef_and_filter(queries, k, ef_val, &allowlist, &accessor)
1734                }
1735                None => index.batch_search_with_filter(queries, k, &allowlist, &accessor),
1736            },
1737            None => match ef {
1738                Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val, &accessor),
1739                None => index.batch_search(queries, k, &accessor),
1740            },
1741        };
1742
1743        Ok(results)
1744    }
1745
1746    /// Searches for diverse nearest neighbors using Maximal Marginal Relevance (MMR).
1747    ///
1748    /// MMR balances relevance (similarity to query) with diversity (dissimilarity
1749    /// among selected results). This is the algorithm used by LangChain's
1750    /// `mmr_traversal_search()` for RAG applications.
1751    ///
1752    /// # Arguments
1753    ///
1754    /// * `label` - Node label that was indexed
1755    /// * `property` - Property that was indexed
1756    /// * `query` - Query vector
1757    /// * `k` - Number of diverse results to return
1758    /// * `fetch_k` - Number of initial candidates from HNSW (default: `4 * k`)
1759    /// * `lambda` - Relevance vs. diversity in \[0, 1\] (default: 0.5).
1760    ///   1.0 = pure relevance, 0.0 = pure diversity.
1761    /// * `ef` - HNSW search beam width (uses index default if `None`)
1762    /// * `filters` - Optional property equality filters
1763    ///
1764    /// # Returns
1765    ///
1766    /// `(NodeId, distance)` pairs in MMR selection order. The f32 is the original
1767    /// distance from the query, matching [`vector_search`](Self::vector_search).
1768    #[cfg(feature = "vector-index")]
1769    #[allow(clippy::too_many_arguments)]
1770    pub fn mmr_search(
1771        &self,
1772        label: &str,
1773        property: &str,
1774        query: &[f32],
1775        k: usize,
1776        fetch_k: Option<usize>,
1777        lambda: Option<f32>,
1778        ef: Option<usize>,
1779        filters: Option<&std::collections::HashMap<String, Value>>,
1780    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1781        use grafeo_core::index::vector::mmr_select;
1782
1783        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1784            grafeo_common::utils::error::Error::Internal(format!(
1785                "No vector index found for :{label}({property}). Call create_vector_index() first."
1786            ))
1787        })?;
1788
1789        let accessor =
1790            grafeo_core::index::vector::PropertyVectorAccessor::new(&self.store, property);
1791
1792        let fetch_k = fetch_k.unwrap_or(k.saturating_mul(4).max(k));
1793        let lambda = lambda.unwrap_or(0.5);
1794
1795        // Step 1: Fetch candidates from HNSW (with optional filter)
1796        let initial_results = match self.compute_filter_allowlist(label, filters) {
1797            Some(allowlist) => match ef {
1798                Some(ef_val) => {
1799                    index.search_with_ef_and_filter(query, fetch_k, ef_val, &allowlist, &accessor)
1800                }
1801                None => index.search_with_filter(query, fetch_k, &allowlist, &accessor),
1802            },
1803            None => match ef {
1804                Some(ef_val) => index.search_with_ef(query, fetch_k, ef_val, &accessor),
1805                None => index.search(query, fetch_k, &accessor),
1806            },
1807        };
1808
1809        if initial_results.is_empty() {
1810            return Ok(Vec::new());
1811        }
1812
1813        // Step 2: Retrieve stored vectors for MMR pairwise comparison
1814        use grafeo_core::index::vector::VectorAccessor;
1815        let candidates: Vec<(grafeo_common::types::NodeId, f32, std::sync::Arc<[f32]>)> =
1816            initial_results
1817                .into_iter()
1818                .filter_map(|(id, dist)| accessor.get_vector(id).map(|vec| (id, dist, vec)))
1819                .collect();
1820
1821        // Step 3: Build slice-based candidates for mmr_select
1822        let candidate_refs: Vec<(grafeo_common::types::NodeId, f32, &[f32])> = candidates
1823            .iter()
1824            .map(|(id, dist, vec)| (*id, *dist, vec.as_ref()))
1825            .collect();
1826
1827        // Step 4: Run MMR selection
1828        let metric = index.config().metric;
1829        Ok(mmr_select(query, &candidate_refs, k, lambda, metric))
1830    }
1831
1832    // ── Text Search ──────────────────────────────────────────────────────
1833
1834    /// Creates a BM25 text index on a node property for full-text search.
1835    ///
1836    /// Indexes all existing nodes with the given label and property.
1837    /// New mutations to indexed properties are **not** automatically tracked;
1838    /// call [`rebuild_text_index`](Self::rebuild_text_index) after bulk updates.
1839    ///
1840    /// # Errors
1841    ///
1842    /// Returns an error if the label has no nodes or the property contains no text values.
1843    #[cfg(feature = "text-index")]
1844    pub fn create_text_index(&self, label: &str, property: &str) -> Result<()> {
1845        use grafeo_common::types::PropertyKey;
1846        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1847
1848        let mut index = InvertedIndex::new(BM25Config::default());
1849        let prop_key = PropertyKey::new(property);
1850
1851        // Index all existing nodes with this label + property
1852        let nodes = self.store.nodes_by_label(label);
1853        for node_id in nodes {
1854            if let Some(Value::String(text)) = self.store.get_node_property(node_id, &prop_key) {
1855                index.insert(node_id, text.as_str());
1856            }
1857        }
1858
1859        self.store
1860            .add_text_index(label, property, Arc::new(RwLock::new(index)));
1861        Ok(())
1862    }
1863
1864    /// Drops a text index on a label+property pair.
1865    ///
1866    /// Returns `true` if the index existed and was removed.
1867    #[cfg(feature = "text-index")]
1868    pub fn drop_text_index(&self, label: &str, property: &str) -> bool {
1869        self.store.remove_text_index(label, property)
1870    }
1871
1872    /// Rebuilds a text index by re-scanning all matching nodes.
1873    ///
1874    /// Use after bulk property updates to keep the index current.
1875    ///
1876    /// # Errors
1877    ///
1878    /// Returns an error if no text index exists for this label+property.
1879    #[cfg(feature = "text-index")]
1880    pub fn rebuild_text_index(&self, label: &str, property: &str) -> Result<()> {
1881        self.store.remove_text_index(label, property);
1882        self.create_text_index(label, property)
1883    }
1884
1885    /// Searches a text index using BM25 scoring.
1886    ///
1887    /// Returns up to `k` results as `(NodeId, score)` pairs sorted by
1888    /// descending relevance score.
1889    ///
1890    /// # Errors
1891    ///
1892    /// Returns an error if no text index exists for this label+property.
1893    #[cfg(feature = "text-index")]
1894    pub fn text_search(
1895        &self,
1896        label: &str,
1897        property: &str,
1898        query: &str,
1899        k: usize,
1900    ) -> Result<Vec<(NodeId, f64)>> {
1901        let index = self.store.get_text_index(label, property).ok_or_else(|| {
1902            Error::Internal(format!(
1903                "No text index found for :{label}({property}). Call create_text_index() first."
1904            ))
1905        })?;
1906
1907        Ok(index.read().search(query, k))
1908    }
1909
1910    /// Performs hybrid search combining text (BM25) and vector similarity.
1911    ///
1912    /// Runs both text search and vector search, then fuses results using
1913    /// the specified method (default: Reciprocal Rank Fusion).
1914    ///
1915    /// # Arguments
1916    ///
1917    /// * `label` - Node label to search within
1918    /// * `text_property` - Property indexed for text search
1919    /// * `vector_property` - Property indexed for vector search
1920    /// * `query_text` - Text query for BM25 search
1921    /// * `query_vector` - Vector query for similarity search (optional)
1922    /// * `k` - Number of results to return
1923    /// * `fusion` - Score fusion method (default: RRF with k=60)
1924    ///
1925    /// # Errors
1926    ///
1927    /// Returns an error if the required indexes don't exist.
1928    #[cfg(feature = "hybrid-search")]
1929    #[allow(clippy::too_many_arguments)]
1930    pub fn hybrid_search(
1931        &self,
1932        label: &str,
1933        text_property: &str,
1934        vector_property: &str,
1935        query_text: &str,
1936        query_vector: Option<&[f32]>,
1937        k: usize,
1938        fusion: Option<grafeo_core::index::text::FusionMethod>,
1939    ) -> Result<Vec<(NodeId, f64)>> {
1940        use grafeo_core::index::text::fuse_results;
1941
1942        let fusion_method = fusion.unwrap_or_default();
1943        let mut sources: Vec<Vec<(NodeId, f64)>> = Vec::new();
1944
1945        // Text search
1946        if let Some(text_index) = self.store.get_text_index(label, text_property) {
1947            let text_results = text_index.read().search(query_text, k * 2);
1948            if !text_results.is_empty() {
1949                sources.push(text_results);
1950            }
1951        }
1952
1953        // Vector search (if query vector provided)
1954        if let Some(query_vec) = query_vector
1955            && let Some(vector_index) = self.store.get_vector_index(label, vector_property)
1956        {
1957            let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(
1958                &self.store,
1959                vector_property,
1960            );
1961            let vector_results = vector_index.search(query_vec, k * 2, &accessor);
1962            if !vector_results.is_empty() {
1963                sources.push(
1964                    vector_results
1965                        .into_iter()
1966                        .map(|(id, dist)| (id, f64::from(dist)))
1967                        .collect(),
1968                );
1969            }
1970        }
1971
1972        if sources.is_empty() {
1973            return Ok(Vec::new());
1974        }
1975
1976        Ok(fuse_results(&sources, &fusion_method, k))
1977    }
1978
1979    // ── Embedding ────────────────────────────────────────────────────────
1980
1981    /// Registers an embedding model for text-to-vector conversion.
1982    ///
1983    /// Once registered, you can use [`embed_text()`](Self::embed_text) and
1984    /// [`vector_search_text()`](Self::vector_search_text) with the model name.
1985    #[cfg(feature = "embed")]
1986    pub fn register_embedding_model(
1987        &self,
1988        name: &str,
1989        model: Arc<dyn crate::embedding::EmbeddingModel>,
1990    ) {
1991        self.embedding_models
1992            .write()
1993            .insert(name.to_string(), model);
1994    }
1995
1996    /// Generates embeddings for a batch of texts using a registered model.
1997    ///
1998    /// # Errors
1999    ///
2000    /// Returns an error if the model is not registered or embedding fails.
2001    #[cfg(feature = "embed")]
2002    pub fn embed_text(&self, model_name: &str, texts: &[&str]) -> Result<Vec<Vec<f32>>> {
2003        let models = self.embedding_models.read();
2004        let model = models.get(model_name).ok_or_else(|| {
2005            grafeo_common::utils::error::Error::Internal(format!(
2006                "Embedding model '{}' not registered",
2007                model_name
2008            ))
2009        })?;
2010        model.embed(texts)
2011    }
2012
2013    /// Searches a vector index using a text query, generating the embedding on-the-fly.
2014    ///
2015    /// This combines [`embed_text()`](Self::embed_text) with
2016    /// [`vector_search()`](Self::vector_search) in a single call.
2017    ///
2018    /// # Errors
2019    ///
2020    /// Returns an error if the model is not registered, embedding fails,
2021    /// or the vector index doesn't exist.
2022    #[cfg(all(feature = "embed", feature = "vector-index"))]
2023    pub fn vector_search_text(
2024        &self,
2025        label: &str,
2026        property: &str,
2027        model_name: &str,
2028        query_text: &str,
2029        k: usize,
2030        ef: Option<usize>,
2031    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
2032        let vectors = self.embed_text(model_name, &[query_text])?;
2033        let query_vec = vectors.into_iter().next().ok_or_else(|| {
2034            grafeo_common::utils::error::Error::Internal(
2035                "Embedding model returned no vectors".to_string(),
2036            )
2037        })?;
2038        self.vector_search(label, property, &query_vec, k, ef, None)
2039    }
2040
2041    // ── Change Data Capture ─────────────────────────────────────────────
2042
2043    /// Returns the full change history for an entity (node or edge).
2044    ///
2045    /// Events are ordered chronologically by epoch.
2046    ///
2047    /// # Errors
2048    ///
2049    /// Returns an error if the CDC feature is not enabled.
2050    #[cfg(feature = "cdc")]
2051    pub fn history(
2052        &self,
2053        entity_id: impl Into<crate::cdc::EntityId>,
2054    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2055        Ok(self.cdc_log.history(entity_id.into()))
2056    }
2057
2058    /// Returns change events for an entity since the given epoch.
2059    #[cfg(feature = "cdc")]
2060    pub fn history_since(
2061        &self,
2062        entity_id: impl Into<crate::cdc::EntityId>,
2063        since_epoch: grafeo_common::types::EpochId,
2064    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2065        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
2066    }
2067
2068    /// Returns all change events across all entities in an epoch range.
2069    #[cfg(feature = "cdc")]
2070    pub fn changes_between(
2071        &self,
2072        start_epoch: grafeo_common::types::EpochId,
2073        end_epoch: grafeo_common::types::EpochId,
2074    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
2075        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
2076    }
2077
2078    // ── Property Indexes ────────────────────────────────────────────────
2079
2080    /// Drops an index on a node property.
2081    ///
2082    /// Returns `true` if the index existed and was removed.
2083    pub fn drop_property_index(&self, property: &str) -> bool {
2084        self.store.drop_property_index(property)
2085    }
2086
2087    /// Returns `true` if the property has an index.
2088    #[must_use]
2089    pub fn has_property_index(&self, property: &str) -> bool {
2090        self.store.has_property_index(property)
2091    }
2092
2093    /// Finds all nodes that have a specific property value.
2094    ///
2095    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
2096    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
2097    ///
2098    /// # Example
2099    ///
2100    /// ```no_run
2101    /// # use grafeo_engine::GrafeoDB;
2102    /// # use grafeo_common::types::Value;
2103    /// # let db = GrafeoDB::new_in_memory();
2104    /// // Create index for fast lookups (optional but recommended)
2105    /// db.create_property_index("city");
2106    ///
2107    /// // Find all nodes where city = "NYC"
2108    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
2109    /// ```
2110    #[must_use]
2111    pub fn find_nodes_by_property(
2112        &self,
2113        property: &str,
2114        value: &grafeo_common::types::Value,
2115    ) -> Vec<grafeo_common::types::NodeId> {
2116        self.store.find_nodes_by_property(property, value)
2117    }
2118
2119    // =========================================================================
2120    // ADMIN API: Introspection
2121    // =========================================================================
2122
2123    /// Returns true if this database is backed by a file (persistent).
2124    ///
2125    /// In-memory databases return false.
2126    #[must_use]
2127    pub fn is_persistent(&self) -> bool {
2128        self.config.path.is_some()
2129    }
2130
2131    /// Returns the database file path, if persistent.
2132    ///
2133    /// In-memory databases return None.
2134    #[must_use]
2135    pub fn path(&self) -> Option<&Path> {
2136        self.config.path.as_deref()
2137    }
2138
2139    /// Returns high-level database information.
2140    ///
2141    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
2142    #[must_use]
2143    pub fn info(&self) -> crate::admin::DatabaseInfo {
2144        crate::admin::DatabaseInfo {
2145            mode: crate::admin::DatabaseMode::Lpg,
2146            node_count: self.store.node_count(),
2147            edge_count: self.store.edge_count(),
2148            is_persistent: self.is_persistent(),
2149            path: self.config.path.clone(),
2150            wal_enabled: self.config.wal_enabled,
2151            version: env!("CARGO_PKG_VERSION").to_string(),
2152        }
2153    }
2154
2155    /// Returns detailed database statistics.
2156    ///
2157    /// Includes counts, memory usage, and index information.
2158    #[must_use]
2159    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2160        #[cfg(feature = "wal")]
2161        let disk_bytes = self.config.path.as_ref().and_then(|p| {
2162            if p.exists() {
2163                Self::calculate_disk_usage(p).ok()
2164            } else {
2165                None
2166            }
2167        });
2168        #[cfg(not(feature = "wal"))]
2169        let disk_bytes: Option<usize> = None;
2170
2171        crate::admin::DatabaseStats {
2172            node_count: self.store.node_count(),
2173            edge_count: self.store.edge_count(),
2174            label_count: self.store.label_count(),
2175            edge_type_count: self.store.edge_type_count(),
2176            property_key_count: self.store.property_key_count(),
2177            index_count: 0, // TODO: implement index tracking
2178            memory_bytes: self.buffer_manager.allocated(),
2179            disk_bytes,
2180        }
2181    }
2182
2183    /// Calculates total disk usage for the database directory.
2184    #[cfg(feature = "wal")]
2185    fn calculate_disk_usage(path: &Path) -> Result<usize> {
2186        let mut total = 0usize;
2187        if path.is_dir() {
2188            for entry in std::fs::read_dir(path)? {
2189                let entry = entry?;
2190                let metadata = entry.metadata()?;
2191                if metadata.is_file() {
2192                    total += metadata.len() as usize;
2193                } else if metadata.is_dir() {
2194                    total += Self::calculate_disk_usage(&entry.path())?;
2195                }
2196            }
2197        }
2198        Ok(total)
2199    }
2200
2201    /// Returns schema information (labels, edge types, property keys).
2202    ///
2203    /// For LPG mode, returns label and edge type information.
2204    /// For RDF mode, returns predicate and named graph information.
2205    #[must_use]
2206    pub fn schema(&self) -> crate::admin::SchemaInfo {
2207        let labels = self
2208            .store
2209            .all_labels()
2210            .into_iter()
2211            .map(|name| crate::admin::LabelInfo {
2212                name: name.clone(),
2213                count: self.store.nodes_with_label(&name).count(),
2214            })
2215            .collect();
2216
2217        let edge_types = self
2218            .store
2219            .all_edge_types()
2220            .into_iter()
2221            .map(|name| crate::admin::EdgeTypeInfo {
2222                name: name.clone(),
2223                count: self.store.edges_with_type(&name).count(),
2224            })
2225            .collect();
2226
2227        let property_keys = self.store.all_property_keys();
2228
2229        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
2230            labels,
2231            edge_types,
2232            property_keys,
2233        })
2234    }
2235
2236    /// Returns RDF schema information.
2237    ///
2238    /// Only available when the RDF feature is enabled.
2239    #[cfg(feature = "rdf")]
2240    #[must_use]
2241    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
2242        let stats = self.rdf_store.stats();
2243
2244        let predicates = self
2245            .rdf_store
2246            .predicates()
2247            .into_iter()
2248            .map(|predicate| {
2249                let count = self.rdf_store.triples_with_predicate(&predicate).len();
2250                crate::admin::PredicateInfo {
2251                    iri: predicate.to_string(),
2252                    count,
2253                }
2254            })
2255            .collect();
2256
2257        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
2258            predicates,
2259            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
2260            subject_count: stats.subject_count,
2261            object_count: stats.object_count,
2262        })
2263    }
2264
2265    /// Validates database integrity.
2266    ///
2267    /// Checks for:
2268    /// - Dangling edge references (edges pointing to non-existent nodes)
2269    /// - Internal index consistency
2270    ///
2271    /// Returns a list of errors and warnings. Empty errors = valid.
2272    #[must_use]
2273    pub fn validate(&self) -> crate::admin::ValidationResult {
2274        let mut result = crate::admin::ValidationResult::default();
2275
2276        // Check for dangling edge references
2277        for edge in self.store.all_edges() {
2278            if self.store.get_node(edge.src).is_none() {
2279                result.errors.push(crate::admin::ValidationError {
2280                    code: "DANGLING_SRC".to_string(),
2281                    message: format!(
2282                        "Edge {} references non-existent source node {}",
2283                        edge.id.0, edge.src.0
2284                    ),
2285                    context: Some(format!("edge:{}", edge.id.0)),
2286                });
2287            }
2288            if self.store.get_node(edge.dst).is_none() {
2289                result.errors.push(crate::admin::ValidationError {
2290                    code: "DANGLING_DST".to_string(),
2291                    message: format!(
2292                        "Edge {} references non-existent destination node {}",
2293                        edge.id.0, edge.dst.0
2294                    ),
2295                    context: Some(format!("edge:{}", edge.id.0)),
2296                });
2297            }
2298        }
2299
2300        // Add warnings for potential issues
2301        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
2302            result.warnings.push(crate::admin::ValidationWarning {
2303                code: "NO_EDGES".to_string(),
2304                message: "Database has nodes but no edges".to_string(),
2305                context: None,
2306            });
2307        }
2308
2309        result
2310    }
2311
2312    /// Returns WAL (Write-Ahead Log) status.
2313    ///
2314    /// Returns None if WAL is not enabled.
2315    #[must_use]
2316    pub fn wal_status(&self) -> crate::admin::WalStatus {
2317        #[cfg(feature = "wal")]
2318        if let Some(ref wal) = self.wal {
2319            return crate::admin::WalStatus {
2320                enabled: true,
2321                path: self.config.path.as_ref().map(|p| p.join("wal")),
2322                size_bytes: wal.size_bytes(),
2323                record_count: wal.record_count() as usize,
2324                last_checkpoint: wal.last_checkpoint_timestamp(),
2325                current_epoch: self.store.current_epoch().as_u64(),
2326            };
2327        }
2328
2329        crate::admin::WalStatus {
2330            enabled: false,
2331            path: None,
2332            size_bytes: 0,
2333            record_count: 0,
2334            last_checkpoint: None,
2335            current_epoch: self.store.current_epoch().as_u64(),
2336        }
2337    }
2338
2339    /// Forces a WAL checkpoint.
2340    ///
2341    /// Flushes all pending WAL records to the main storage.
2342    ///
2343    /// # Errors
2344    ///
2345    /// Returns an error if the checkpoint fails.
2346    pub fn wal_checkpoint(&self) -> Result<()> {
2347        #[cfg(feature = "wal")]
2348        if let Some(ref wal) = self.wal {
2349            let epoch = self.store.current_epoch();
2350            let tx_id = self
2351                .tx_manager
2352                .last_assigned_tx_id()
2353                .unwrap_or_else(|| self.tx_manager.begin());
2354            wal.checkpoint(tx_id, epoch)?;
2355            wal.sync()?;
2356        }
2357        Ok(())
2358    }
2359
2360    // =========================================================================
2361    // ADMIN API: Persistence Control
2362    // =========================================================================
2363
2364    /// Saves the database to a file path.
2365    ///
2366    /// - If in-memory: creates a new persistent database at path
2367    /// - If file-backed: creates a copy at the new path
2368    ///
2369    /// The original database remains unchanged.
2370    ///
2371    /// # Errors
2372    ///
2373    /// Returns an error if the save operation fails.
2374    ///
2375    /// Requires the `wal` feature for persistence support.
2376    #[cfg(feature = "wal")]
2377    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
2378        let path = path.as_ref();
2379
2380        // Create target database with WAL enabled
2381        let target_config = Config::persistent(path);
2382        let target = Self::with_config(target_config)?;
2383
2384        // Copy all nodes using WAL-enabled methods
2385        for node in self.store.all_nodes() {
2386            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2387            target.store.create_node_with_id(node.id, &label_refs);
2388
2389            // Log to WAL
2390            target.log_wal(&WalRecord::CreateNode {
2391                id: node.id,
2392                labels: node.labels.iter().map(|s| s.to_string()).collect(),
2393            })?;
2394
2395            // Copy properties
2396            for (key, value) in node.properties {
2397                target
2398                    .store
2399                    .set_node_property(node.id, key.as_str(), value.clone());
2400                target.log_wal(&WalRecord::SetNodeProperty {
2401                    id: node.id,
2402                    key: key.to_string(),
2403                    value,
2404                })?;
2405            }
2406        }
2407
2408        // Copy all edges using WAL-enabled methods
2409        for edge in self.store.all_edges() {
2410            target
2411                .store
2412                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2413
2414            // Log to WAL
2415            target.log_wal(&WalRecord::CreateEdge {
2416                id: edge.id,
2417                src: edge.src,
2418                dst: edge.dst,
2419                edge_type: edge.edge_type.to_string(),
2420            })?;
2421
2422            // Copy properties
2423            for (key, value) in edge.properties {
2424                target
2425                    .store
2426                    .set_edge_property(edge.id, key.as_str(), value.clone());
2427                target.log_wal(&WalRecord::SetEdgeProperty {
2428                    id: edge.id,
2429                    key: key.to_string(),
2430                    value,
2431                })?;
2432            }
2433        }
2434
2435        // Checkpoint and close the target database
2436        target.close()?;
2437
2438        Ok(())
2439    }
2440
2441    /// Creates an in-memory copy of this database.
2442    ///
2443    /// Returns a new database that is completely independent.
2444    /// Useful for:
2445    /// - Testing modifications without affecting the original
2446    /// - Faster operations when persistence isn't needed
2447    ///
2448    /// # Errors
2449    ///
2450    /// Returns an error if the copy operation fails.
2451    pub fn to_memory(&self) -> Result<Self> {
2452        let config = Config::in_memory();
2453        let target = Self::with_config(config)?;
2454
2455        // Copy all nodes
2456        for node in self.store.all_nodes() {
2457            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
2458            target.store.create_node_with_id(node.id, &label_refs);
2459
2460            // Copy properties
2461            for (key, value) in node.properties {
2462                target.store.set_node_property(node.id, key.as_str(), value);
2463            }
2464        }
2465
2466        // Copy all edges
2467        for edge in self.store.all_edges() {
2468            target
2469                .store
2470                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2471
2472            // Copy properties
2473            for (key, value) in edge.properties {
2474                target.store.set_edge_property(edge.id, key.as_str(), value);
2475            }
2476        }
2477
2478        Ok(target)
2479    }
2480
2481    /// Opens a database file and loads it entirely into memory.
2482    ///
2483    /// The returned database has no connection to the original file.
2484    /// Changes will NOT be written back to the file.
2485    ///
2486    /// # Errors
2487    ///
2488    /// Returns an error if the file can't be opened or loaded.
2489    #[cfg(feature = "wal")]
2490    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
2491        // Open the source database (triggers WAL recovery)
2492        let source = Self::open(path)?;
2493
2494        // Create in-memory copy
2495        let target = source.to_memory()?;
2496
2497        // Close the source (releases file handles)
2498        source.close()?;
2499
2500        Ok(target)
2501    }
2502
2503    // =========================================================================
2504    // ADMIN API: Snapshot Export/Import
2505    // =========================================================================
2506
2507    /// Exports the entire database to a binary snapshot.
2508    ///
2509    /// The returned bytes can be stored (e.g. in IndexedDB) and later
2510    /// restored with [`import_snapshot()`](Self::import_snapshot).
2511    ///
2512    /// # Errors
2513    ///
2514    /// Returns an error if serialization fails.
2515    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
2516        let nodes: Vec<SnapshotNode> = self
2517            .store
2518            .all_nodes()
2519            .map(|n| SnapshotNode {
2520                id: n.id,
2521                labels: n.labels.iter().map(|l| l.to_string()).collect(),
2522                properties: n
2523                    .properties
2524                    .into_iter()
2525                    .map(|(k, v)| (k.to_string(), v))
2526                    .collect(),
2527            })
2528            .collect();
2529
2530        let edges: Vec<SnapshotEdge> = self
2531            .store
2532            .all_edges()
2533            .map(|e| SnapshotEdge {
2534                id: e.id,
2535                src: e.src,
2536                dst: e.dst,
2537                edge_type: e.edge_type.to_string(),
2538                properties: e
2539                    .properties
2540                    .into_iter()
2541                    .map(|(k, v)| (k.to_string(), v))
2542                    .collect(),
2543            })
2544            .collect();
2545
2546        let snapshot = Snapshot {
2547            version: 1,
2548            nodes,
2549            edges,
2550        };
2551
2552        let config = bincode::config::standard();
2553        bincode::serde::encode_to_vec(&snapshot, config)
2554            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
2555    }
2556
2557    /// Creates a new in-memory database from a binary snapshot.
2558    ///
2559    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
2560    ///
2561    /// # Errors
2562    ///
2563    /// Returns an error if the snapshot is invalid or deserialization fails.
2564    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
2565        let config = bincode::config::standard();
2566        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
2567            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
2568
2569        if snapshot.version != 1 {
2570            return Err(Error::Internal(format!(
2571                "unsupported snapshot version: {}",
2572                snapshot.version
2573            )));
2574        }
2575
2576        let db = Self::new_in_memory();
2577
2578        for node in snapshot.nodes {
2579            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
2580            db.store.create_node_with_id(node.id, &label_refs);
2581            for (key, value) in node.properties {
2582                db.store.set_node_property(node.id, &key, value);
2583            }
2584        }
2585
2586        for edge in snapshot.edges {
2587            db.store
2588                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
2589            for (key, value) in edge.properties {
2590                db.store.set_edge_property(edge.id, &key, value);
2591            }
2592        }
2593
2594        Ok(db)
2595    }
2596
2597    // =========================================================================
2598    // ADMIN API: Iteration
2599    // =========================================================================
2600
2601    /// Returns an iterator over all nodes in the database.
2602    ///
2603    /// Useful for dump/export operations.
2604    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
2605        self.store.all_nodes()
2606    }
2607
2608    /// Returns an iterator over all edges in the database.
2609    ///
2610    /// Useful for dump/export operations.
2611    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
2612        self.store.all_edges()
2613    }
2614}
2615
/// Binary snapshot format for database export/import.
///
/// This is the top-level value that `export_snapshot()` encodes with bincode
/// and `import_snapshot()` decodes again.
///
/// NOTE(review): bincode's encoding is presumably positional, so reordering,
/// adding or removing fields would likely make existing snapshots unreadable.
/// Confirm before changing the layout.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version. `export_snapshot()` writes `1`; `import_snapshot()`
    /// rejects any other value.
    version: u8,
    /// Every node of the exported graph.
    nodes: Vec<SnapshotNode>,
    /// Every edge of the exported graph.
    edges: Vec<SnapshotEdge>,
}
2623
/// Serializable form of a single node inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// ID of the node; the node is recreated under this same ID on import.
    id: NodeId,
    /// Label names, stored as owned strings.
    labels: Vec<String>,
    /// Property key/value pairs, with keys stored as owned strings.
    properties: Vec<(String, Value)>,
}
2630
/// Serializable form of a single edge inside a [`Snapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// ID of the edge; the edge is recreated under this same ID on import.
    id: EdgeId,
    /// ID of the source node.
    src: NodeId,
    /// ID of the destination node.
    dst: NodeId,
    /// Edge type name, stored as an owned string.
    edge_type: String,
    /// Property key/value pairs, with keys stored as owned strings.
    properties: Vec<(String, Value)>,
}
2639
2640impl Drop for GrafeoDB {
2641    fn drop(&mut self) {
2642        if let Err(e) = self.close() {
2643            tracing::error!("Error closing database: {}", e);
2644        }
2645    }
2646}
2647
2648impl crate::admin::AdminService for GrafeoDB {
2649    fn info(&self) -> crate::admin::DatabaseInfo {
2650        self.info()
2651    }
2652
2653    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
2654        self.detailed_stats()
2655    }
2656
2657    fn schema(&self) -> crate::admin::SchemaInfo {
2658        self.schema()
2659    }
2660
2661    fn validate(&self) -> crate::admin::ValidationResult {
2662        self.validate()
2663    }
2664
2665    fn wal_status(&self) -> crate::admin::WalStatus {
2666        self.wal_status()
2667    }
2668
2669    fn wal_checkpoint(&self) -> Result<()> {
2670        self.wal_checkpoint()
2671    }
2672}
2673
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be inspected or assembled
/// directly; nothing in this type enforces that each row has one value per
/// column.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`new()`](Self::new) fills this with one `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
}
2712
2713impl QueryResult {
2714    /// Creates a new empty query result.
2715    #[must_use]
2716    pub fn new(columns: Vec<String>) -> Self {
2717        let len = columns.len();
2718        Self {
2719            columns,
2720            column_types: vec![grafeo_common::types::LogicalType::Any; len],
2721            rows: Vec::new(),
2722            execution_time_ms: None,
2723            rows_scanned: None,
2724        }
2725    }
2726
2727    /// Creates a new empty query result with column types.
2728    #[must_use]
2729    pub fn with_types(
2730        columns: Vec<String>,
2731        column_types: Vec<grafeo_common::types::LogicalType>,
2732    ) -> Self {
2733        Self {
2734            columns,
2735            column_types,
2736            rows: Vec::new(),
2737            execution_time_ms: None,
2738            rows_scanned: None,
2739        }
2740    }
2741
2742    /// Sets the execution metrics on this result.
2743    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
2744        self.execution_time_ms = Some(execution_time_ms);
2745        self.rows_scanned = Some(rows_scanned);
2746        self
2747    }
2748
2749    /// Returns the execution time in milliseconds, if available.
2750    #[must_use]
2751    pub fn execution_time_ms(&self) -> Option<f64> {
2752        self.execution_time_ms
2753    }
2754
2755    /// Returns the number of rows scanned, if available.
2756    #[must_use]
2757    pub fn rows_scanned(&self) -> Option<u64> {
2758        self.rows_scanned
2759    }
2760
2761    /// Returns the number of rows.
2762    #[must_use]
2763    pub fn row_count(&self) -> usize {
2764        self.rows.len()
2765    }
2766
2767    /// Returns the number of columns.
2768    #[must_use]
2769    pub fn column_count(&self) -> usize {
2770        self.columns.len()
2771    }
2772
2773    /// Returns true if the result is empty.
2774    #[must_use]
2775    pub fn is_empty(&self) -> bool {
2776        self.rows.is_empty()
2777    }
2778
2779    /// Extracts a single value from the result.
2780    ///
2781    /// Use this when your query returns exactly one row with one column,
2782    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
2783    ///
2784    /// # Errors
2785    ///
2786    /// Returns an error if the result has multiple rows or columns.
2787    pub fn scalar<T: FromValue>(&self) -> Result<T> {
2788        if self.rows.len() != 1 || self.columns.len() != 1 {
2789            return Err(grafeo_common::utils::error::Error::InvalidValue(
2790                "Expected single value".to_string(),
2791            ));
2792        }
2793        T::from_value(&self.rows[0][0])
2794    }
2795
2796    /// Returns an iterator over the rows.
2797    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
2798        self.rows.iter()
2799    }
2800}
2801
/// Converts a [`Value`] to a concrete Rust type.
///
/// Implemented in this file for `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed; implementations return an owned `Self`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
2810
2811impl FromValue for i64 {
2812    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2813        value
2814            .as_int64()
2815            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2816                expected: "INT64".to_string(),
2817                found: value.type_name().to_string(),
2818            })
2819    }
2820}
2821
2822impl FromValue for f64 {
2823    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2824        value
2825            .as_float64()
2826            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2827                expected: "FLOAT64".to_string(),
2828                found: value.type_name().to_string(),
2829            })
2830    }
2831}
2832
2833impl FromValue for String {
2834    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2835        value.as_str().map(String::from).ok_or_else(|| {
2836            grafeo_common::utils::error::Error::TypeMismatch {
2837                expected: "STRING".to_string(),
2838                found: value.type_name().to_string(),
2839            }
2840        })
2841    }
2842}
2843
2844impl FromValue for bool {
2845    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
2846        value
2847            .as_bool()
2848            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
2849                expected: "BOOL".to_string(),
2850                found: value.type_name().to_string(),
2851            })
2852    }
2853}
2854
2855#[cfg(test)]
2856mod tests {
2857    use super::*;
2858
2859    #[test]
2860    fn test_create_in_memory_database() {
2861        let db = GrafeoDB::new_in_memory();
2862        assert_eq!(db.node_count(), 0);
2863        assert_eq!(db.edge_count(), 0);
2864    }
2865
2866    #[test]
2867    fn test_database_config() {
2868        let config = Config::in_memory().with_threads(4).with_query_logging();
2869
2870        let db = GrafeoDB::with_config(config).unwrap();
2871        assert_eq!(db.config().threads, 4);
2872        assert!(db.config().query_logging);
2873    }
2874
2875    #[test]
2876    fn test_database_session() {
2877        let db = GrafeoDB::new_in_memory();
2878        let _session = db.session();
2879        // Session should be created successfully
2880    }
2881
2882    #[cfg(feature = "wal")]
2883    #[test]
2884    fn test_persistent_database_recovery() {
2885        use grafeo_common::types::Value;
2886        use tempfile::tempdir;
2887
2888        let dir = tempdir().unwrap();
2889        let db_path = dir.path().join("test_db");
2890
2891        // Create database and add some data
2892        {
2893            let db = GrafeoDB::open(&db_path).unwrap();
2894
2895            let alice = db.create_node(&["Person"]);
2896            db.set_node_property(alice, "name", Value::from("Alice"));
2897
2898            let bob = db.create_node(&["Person"]);
2899            db.set_node_property(bob, "name", Value::from("Bob"));
2900
2901            let _edge = db.create_edge(alice, bob, "KNOWS");
2902
2903            // Explicitly close to flush WAL
2904            db.close().unwrap();
2905        }
2906
2907        // Reopen and verify data was recovered
2908        {
2909            let db = GrafeoDB::open(&db_path).unwrap();
2910
2911            assert_eq!(db.node_count(), 2);
2912            assert_eq!(db.edge_count(), 1);
2913
2914            // Verify nodes exist
2915            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
2916            assert!(node0.is_some());
2917
2918            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
2919            assert!(node1.is_some());
2920        }
2921    }
2922
2923    #[cfg(feature = "wal")]
2924    #[test]
2925    fn test_wal_logging() {
2926        use tempfile::tempdir;
2927
2928        let dir = tempdir().unwrap();
2929        let db_path = dir.path().join("wal_test_db");
2930
2931        let db = GrafeoDB::open(&db_path).unwrap();
2932
2933        // Create some data
2934        let node = db.create_node(&["Test"]);
2935        db.delete_node(node);
2936
2937        // WAL should have records
2938        if let Some(wal) = db.wal() {
2939            assert!(wal.record_count() > 0);
2940        }
2941
2942        db.close().unwrap();
2943    }
2944
2945    #[cfg(feature = "wal")]
2946    #[test]
2947    fn test_wal_recovery_multiple_sessions() {
2948        // Tests that WAL recovery works correctly across multiple open/close cycles
2949        use grafeo_common::types::Value;
2950        use tempfile::tempdir;
2951
2952        let dir = tempdir().unwrap();
2953        let db_path = dir.path().join("multi_session_db");
2954
2955        // Session 1: Create initial data
2956        {
2957            let db = GrafeoDB::open(&db_path).unwrap();
2958            let alice = db.create_node(&["Person"]);
2959            db.set_node_property(alice, "name", Value::from("Alice"));
2960            db.close().unwrap();
2961        }
2962
2963        // Session 2: Add more data
2964        {
2965            let db = GrafeoDB::open(&db_path).unwrap();
2966            assert_eq!(db.node_count(), 1); // Previous data recovered
2967            let bob = db.create_node(&["Person"]);
2968            db.set_node_property(bob, "name", Value::from("Bob"));
2969            db.close().unwrap();
2970        }
2971
2972        // Session 3: Verify all data
2973        {
2974            let db = GrafeoDB::open(&db_path).unwrap();
2975            assert_eq!(db.node_count(), 2);
2976
2977            // Verify properties were recovered correctly
2978            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
2979            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
2980
2981            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
2982            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
2983        }
2984    }
2985
2986    #[cfg(feature = "wal")]
2987    #[test]
2988    fn test_database_consistency_after_mutations() {
2989        // Tests that database remains consistent after a series of create/delete operations
2990        use grafeo_common::types::Value;
2991        use tempfile::tempdir;
2992
2993        let dir = tempdir().unwrap();
2994        let db_path = dir.path().join("consistency_db");
2995
2996        {
2997            let db = GrafeoDB::open(&db_path).unwrap();
2998
2999            // Create nodes
3000            let a = db.create_node(&["Node"]);
3001            let b = db.create_node(&["Node"]);
3002            let c = db.create_node(&["Node"]);
3003
3004            // Create edges
3005            let e1 = db.create_edge(a, b, "LINKS");
3006            let _e2 = db.create_edge(b, c, "LINKS");
3007
3008            // Delete middle node and its edge
3009            db.delete_edge(e1);
3010            db.delete_node(b);
3011
3012            // Set properties on remaining nodes
3013            db.set_node_property(a, "value", Value::Int64(1));
3014            db.set_node_property(c, "value", Value::Int64(3));
3015
3016            db.close().unwrap();
3017        }
3018
3019        // Reopen and verify consistency
3020        {
3021            let db = GrafeoDB::open(&db_path).unwrap();
3022
3023            // Should have 2 nodes (a and c), b was deleted
3024            // Note: node_count includes deleted nodes in some implementations
3025            // What matters is that the non-deleted nodes are accessible
3026            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
3027            assert!(node_a.is_some());
3028
3029            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
3030            assert!(node_c.is_some());
3031
3032            // Middle node should be deleted
3033            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
3034            assert!(node_b.is_none());
3035        }
3036    }
3037
3038    #[cfg(feature = "wal")]
3039    #[test]
3040    fn test_close_is_idempotent() {
3041        // Calling close() multiple times should not cause errors
3042        use tempfile::tempdir;
3043
3044        let dir = tempdir().unwrap();
3045        let db_path = dir.path().join("close_test_db");
3046
3047        let db = GrafeoDB::open(&db_path).unwrap();
3048        db.create_node(&["Test"]);
3049
3050        // First close should succeed
3051        assert!(db.close().is_ok());
3052
3053        // Second close should also succeed (idempotent)
3054        assert!(db.close().is_ok());
3055    }
3056
3057    #[test]
3058    fn test_query_result_has_metrics() {
3059        // Verifies that query results include execution metrics
3060        let db = GrafeoDB::new_in_memory();
3061        db.create_node(&["Person"]);
3062        db.create_node(&["Person"]);
3063
3064        #[cfg(feature = "gql")]
3065        {
3066            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
3067
3068            // Metrics should be populated
3069            assert!(result.execution_time_ms.is_some());
3070            assert!(result.rows_scanned.is_some());
3071            assert!(result.execution_time_ms.unwrap() >= 0.0);
3072            assert_eq!(result.rows_scanned.unwrap(), 2);
3073        }
3074    }
3075
3076    #[test]
3077    fn test_empty_query_result_metrics() {
3078        // Verifies metrics are correct for queries returning no results
3079        let db = GrafeoDB::new_in_memory();
3080        db.create_node(&["Person"]);
3081
3082        #[cfg(feature = "gql")]
3083        {
3084            // Query that matches nothing
3085            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
3086
3087            assert!(result.execution_time_ms.is_some());
3088            assert!(result.rows_scanned.is_some());
3089            assert_eq!(result.rows_scanned.unwrap(), 0);
3090        }
3091    }
3092
3093    #[cfg(feature = "cdc")]
3094    mod cdc_integration {
3095        use super::*;
3096
3097        #[test]
3098        fn test_node_lifecycle_history() {
3099            let db = GrafeoDB::new_in_memory();
3100
3101            // Create
3102            let id = db.create_node(&["Person"]);
3103            // Update
3104            db.set_node_property(id, "name", "Alice".into());
3105            db.set_node_property(id, "name", "Bob".into());
3106            // Delete
3107            db.delete_node(id);
3108
3109            let history = db.history(id).unwrap();
3110            assert_eq!(history.len(), 4); // create + 2 updates + delete
3111            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3112            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3113            assert!(history[1].before.is_none()); // first set_node_property has no prior value
3114            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
3115            assert!(history[2].before.is_some()); // second update has prior "Alice"
3116            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
3117        }
3118
3119        #[test]
3120        fn test_edge_lifecycle_history() {
3121            let db = GrafeoDB::new_in_memory();
3122
3123            let alice = db.create_node(&["Person"]);
3124            let bob = db.create_node(&["Person"]);
3125            let edge = db.create_edge(alice, bob, "KNOWS");
3126            db.set_edge_property(edge, "since", 2024i64.into());
3127            db.delete_edge(edge);
3128
3129            let history = db.history(edge).unwrap();
3130            assert_eq!(history.len(), 3); // create + update + delete
3131            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3132            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
3133            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
3134        }
3135
3136        #[test]
3137        fn test_create_node_with_props_cdc() {
3138            let db = GrafeoDB::new_in_memory();
3139
3140            let id = db.create_node_with_props(
3141                &["Person"],
3142                vec![
3143                    ("name", grafeo_common::types::Value::from("Alice")),
3144                    ("age", grafeo_common::types::Value::from(30i64)),
3145                ],
3146            );
3147
3148            let history = db.history(id).unwrap();
3149            assert_eq!(history.len(), 1);
3150            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
3151            // Props should be captured
3152            let after = history[0].after.as_ref().unwrap();
3153            assert_eq!(after.len(), 2);
3154        }
3155
3156        #[test]
3157        fn test_changes_between() {
3158            let db = GrafeoDB::new_in_memory();
3159
3160            let id1 = db.create_node(&["A"]);
3161            let _id2 = db.create_node(&["B"]);
3162            db.set_node_property(id1, "x", 1i64.into());
3163
3164            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
3165            let changes = db
3166                .changes_between(
3167                    grafeo_common::types::EpochId(0),
3168                    grafeo_common::types::EpochId(u64::MAX),
3169                )
3170                .unwrap();
3171            assert_eq!(changes.len(), 3); // 2 creates + 1 update
3172        }
3173    }
3174}