Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10#[cfg(feature = "wal")]
11use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
12use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
13use grafeo_common::utils::error::Result;
14use grafeo_core::graph::lpg::LpgStore;
15#[cfg(feature = "rdf")]
16use grafeo_core::graph::rdf::RdfStore;
17
18use crate::config::Config;
19use crate::query::cache::QueryCache;
20use crate::session::Session;
21use crate::transaction::TransactionManager;
22
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// All shared components are held behind [`Arc`] so that sessions created
/// from this handle can hold their own references to them.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration this instance was created with.
    config: Config,
    /// The underlying LPG graph store; cloned (by `Arc`) into every session.
    store: Arc<LpgStore>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager; cloned (by `Arc`) into every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager, built from the configured memory limit and spill path.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager (if durability is enabled).
    ///
    /// `None` when the configuration disables the WAL or has no database path.
    #[cfg(feature = "wal")]
    wal: Option<Arc<WalManager>>,
    /// Query cache for parsed and optimized plans.
    query_cache: Arc<QueryCache>,
    /// Whether the database is open.
    ///
    /// Starts as `true` and is set to `false` by [`close()`](Self::close),
    /// which makes repeated `close()` calls a no-op.
    is_open: RwLock<bool>,
}
65
66impl GrafeoDB {
67    /// Creates an in-memory database - fast to create, gone when dropped.
68    ///
69    /// Use this for tests, experiments, or when you don't need persistence.
70    /// For data that survives restarts, use [`open()`](Self::open) instead.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use grafeo_engine::GrafeoDB;
76    ///
77    /// let db = GrafeoDB::new_in_memory();
78    /// let session = db.session();
79    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
80    /// # Ok::<(), grafeo_common::utils::error::Error>(())
81    /// ```
82    #[must_use]
83    pub fn new_in_memory() -> Self {
84        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
85    }
86
87    /// Opens a database at the given path, creating it if it doesn't exist.
88    ///
89    /// If you've used this path before, Grafeo recovers your data from the
90    /// write-ahead log automatically. First open on a new path creates an
91    /// empty database.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the path isn't writable or recovery fails.
96    ///
97    /// # Examples
98    ///
99    /// ```no_run
100    /// use grafeo_engine::GrafeoDB;
101    ///
102    /// let db = GrafeoDB::open("./my_social_network")?;
103    /// # Ok::<(), grafeo_common::utils::error::Error>(())
104    /// ```
105    #[cfg(feature = "wal")]
106    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
107        Self::with_config(Config::persistent(path.as_ref()))
108    }
109
110    /// Creates a database with custom configuration.
111    ///
112    /// Use this when you need fine-grained control over memory limits,
113    /// thread counts, or persistence settings. For most cases,
114    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
115    /// are simpler.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the database can't be created or recovery fails.
120    ///
121    /// # Examples
122    ///
123    /// ```
124    /// use grafeo_engine::{GrafeoDB, Config};
125    ///
126    /// // In-memory with a 512MB limit
127    /// let config = Config::in_memory()
128    ///     .with_memory_limit(512 * 1024 * 1024);
129    ///
130    /// let db = GrafeoDB::with_config(config)?;
131    /// # Ok::<(), grafeo_common::utils::error::Error>(())
132    /// ```
133    pub fn with_config(config: Config) -> Result<Self> {
134        let store = Arc::new(LpgStore::new());
135        #[cfg(feature = "rdf")]
136        let rdf_store = Arc::new(RdfStore::new());
137        let tx_manager = Arc::new(TransactionManager::new());
138
139        // Create buffer manager with configured limits
140        let buffer_config = BufferManagerConfig {
141            budget: config.memory_limit.unwrap_or_else(|| {
142                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
143            }),
144            spill_path: config
145                .spill_path
146                .clone()
147                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
148            ..BufferManagerConfig::default()
149        };
150        let buffer_manager = BufferManager::new(buffer_config);
151
152        // Initialize WAL if persistence is enabled
153        #[cfg(feature = "wal")]
154        let wal = if config.wal_enabled {
155            if let Some(ref db_path) = config.path {
156                // Create database directory if it doesn't exist
157                std::fs::create_dir_all(db_path)?;
158
159                let wal_path = db_path.join("wal");
160
161                // Check if WAL exists and recover if needed
162                if wal_path.exists() {
163                    let recovery = WalRecovery::new(&wal_path);
164                    let records = recovery.recover()?;
165                    Self::apply_wal_records(&store, &records)?;
166                }
167
168                // Open/create WAL manager
169                let wal_config = WalConfig::default();
170                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
171                Some(Arc::new(wal_manager))
172            } else {
173                None
174            }
175        } else {
176            None
177        };
178
179        // Create query cache with default capacity (1000 queries)
180        let query_cache = Arc::new(QueryCache::default());
181
182        Ok(Self {
183            config,
184            store,
185            #[cfg(feature = "rdf")]
186            rdf_store,
187            tx_manager,
188            buffer_manager,
189            #[cfg(feature = "wal")]
190            wal,
191            query_cache,
192            is_open: RwLock::new(true),
193        })
194    }
195
196    /// Applies WAL records to restore the database state.
197    #[cfg(feature = "wal")]
198    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
199        for record in records {
200            match record {
201                WalRecord::CreateNode { id, labels } => {
202                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
203                    store.create_node_with_id(*id, &label_refs);
204                }
205                WalRecord::DeleteNode { id } => {
206                    store.delete_node(*id);
207                }
208                WalRecord::CreateEdge {
209                    id,
210                    src,
211                    dst,
212                    edge_type,
213                } => {
214                    store.create_edge_with_id(*id, *src, *dst, edge_type);
215                }
216                WalRecord::DeleteEdge { id } => {
217                    store.delete_edge(*id);
218                }
219                WalRecord::SetNodeProperty { id, key, value } => {
220                    store.set_node_property(*id, key, value.clone());
221                }
222                WalRecord::SetEdgeProperty { id, key, value } => {
223                    store.set_edge_property(*id, key, value.clone());
224                }
225                WalRecord::AddNodeLabel { id, label } => {
226                    store.add_label(*id, label);
227                }
228                WalRecord::RemoveNodeLabel { id, label } => {
229                    store.remove_label(*id, label);
230                }
231                WalRecord::TxCommit { .. }
232                | WalRecord::TxAbort { .. }
233                | WalRecord::Checkpoint { .. } => {
234                    // Transaction control records don't need replay action
235                    // (recovery already filtered to only committed transactions)
236                }
237            }
238        }
239        Ok(())
240    }
241
242    /// Opens a new session for running queries.
243    ///
244    /// Sessions are cheap to create - spin up as many as you need. Each
245    /// gets its own transaction context, so concurrent sessions won't
246    /// block each other on reads.
247    ///
248    /// # Examples
249    ///
250    /// ```
251    /// use grafeo_engine::GrafeoDB;
252    ///
253    /// let db = GrafeoDB::new_in_memory();
254    /// let session = db.session();
255    ///
256    /// // Run queries through the session
257    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
258    /// # Ok::<(), grafeo_common::utils::error::Error>(())
259    /// ```
260    #[must_use]
261    pub fn session(&self) -> Session {
262        #[cfg(feature = "rdf")]
263        {
264            Session::with_rdf_store_and_adaptive(
265                Arc::clone(&self.store),
266                Arc::clone(&self.rdf_store),
267                Arc::clone(&self.tx_manager),
268                Arc::clone(&self.query_cache),
269                self.config.adaptive.clone(),
270                self.config.factorized_execution,
271            )
272        }
273        #[cfg(not(feature = "rdf"))]
274        {
275            Session::with_adaptive(
276                Arc::clone(&self.store),
277                Arc::clone(&self.tx_manager),
278                Arc::clone(&self.query_cache),
279                self.config.adaptive.clone(),
280                self.config.factorized_execution,
281            )
282        }
283    }
284
    /// Returns the adaptive execution configuration.
    ///
    /// This borrows the `adaptive` field of the [`Config`] held by the
    /// database; [`session()`](Self::session) clones the same settings into
    /// every session it creates.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
290
291    /// Runs a query directly on the database.
292    ///
293    /// A convenience method that creates a temporary session behind the
294    /// scenes. If you're running multiple queries, grab a
295    /// [`session()`](Self::session) instead to avoid the overhead.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if parsing or execution fails.
300    pub fn execute(&self, query: &str) -> Result<QueryResult> {
301        let session = self.session();
302        session.execute(query)
303    }
304
305    /// Executes a query with parameters and returns the result.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the query fails.
310    pub fn execute_with_params(
311        &self,
312        query: &str,
313        params: std::collections::HashMap<String, grafeo_common::types::Value>,
314    ) -> Result<QueryResult> {
315        let session = self.session();
316        session.execute_with_params(query, params)
317    }
318
319    /// Executes a Cypher query and returns the result.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the query fails.
324    #[cfg(feature = "cypher")]
325    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
326        let session = self.session();
327        session.execute_cypher(query)
328    }
329
    /// Executes a Cypher query with parameters and returns the result.
    ///
    /// Only available with the `cypher` feature.
    ///
    /// Unlike the other `execute_*` helpers on this type, this method does
    /// not open a session: it builds a `QueryProcessor` directly on top of
    /// the LPG store and processes the query with it.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    #[cfg(feature = "cypher")]
    pub fn execute_cypher_with_params(
        &self,
        query: &str,
        params: std::collections::HashMap<String, grafeo_common::types::Value>,
    ) -> Result<QueryResult> {
        use crate::query::processor::{QueryLanguage, QueryProcessor};

        // NOTE(review): only the store is handed to the processor here; the
        // transaction manager, query cache and adaptive settings that
        // `session()` passes along are not involved. Confirm this is intended.
        // Create processor
        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
        processor.process(query, QueryLanguage::Cypher, Some(&params))
    }
347
348    /// Executes a Gremlin query and returns the result.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the query fails.
353    #[cfg(feature = "gremlin")]
354    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
355        let session = self.session();
356        session.execute_gremlin(query)
357    }
358
359    /// Executes a Gremlin query with parameters and returns the result.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the query fails.
364    #[cfg(feature = "gremlin")]
365    pub fn execute_gremlin_with_params(
366        &self,
367        query: &str,
368        params: std::collections::HashMap<String, grafeo_common::types::Value>,
369    ) -> Result<QueryResult> {
370        let session = self.session();
371        session.execute_gremlin_with_params(query, params)
372    }
373
374    /// Executes a GraphQL query and returns the result.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the query fails.
379    #[cfg(feature = "graphql")]
380    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
381        let session = self.session();
382        session.execute_graphql(query)
383    }
384
385    /// Executes a GraphQL query with parameters and returns the result.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    #[cfg(feature = "graphql")]
391    pub fn execute_graphql_with_params(
392        &self,
393        query: &str,
394        params: std::collections::HashMap<String, grafeo_common::types::Value>,
395    ) -> Result<QueryResult> {
396        let session = self.session();
397        session.execute_graphql_with_params(query, params)
398    }
399
400    /// Executes a SPARQL query and returns the result.
401    ///
402    /// SPARQL queries operate on the RDF triple store.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the query fails.
407    ///
408    /// # Examples
409    ///
410    /// ```ignore
411    /// use grafeo_engine::GrafeoDB;
412    ///
413    /// let db = GrafeoDB::new_in_memory();
414    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
415    /// ```
416    #[cfg(all(feature = "sparql", feature = "rdf"))]
417    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
418        use crate::query::{
419            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
420        };
421
422        // Parse and translate the SPARQL query to a logical plan
423        let logical_plan = sparql_translator::translate(query)?;
424
425        // Optimize the plan
426        let optimizer = Optimizer::from_store(&self.store);
427        let optimized_plan = optimizer.optimize(logical_plan)?;
428
429        // Convert to physical plan using RDF planner
430        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
431        let mut physical_plan = planner.plan(&optimized_plan)?;
432
433        // Execute the plan
434        let executor = Executor::with_columns(physical_plan.columns.clone());
435        executor.execute(physical_plan.operator.as_mut())
436    }
437
    /// Returns the RDF store.
    ///
    /// This provides direct access to the RDF store for triple operations.
    /// The returned reference points at the same `Arc` that sessions and
    /// SPARQL execution clone from. Only available with the `rdf` feature.
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> &Arc<RdfStore> {
        &self.rdf_store
    }
446
447    /// Executes a query and returns a single scalar value.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the query fails or doesn't return exactly one row.
452    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
453        let result = self.execute(query)?;
454        result.scalar()
455    }
456
    /// Returns the configuration.
    ///
    /// This is the [`Config`] value that was passed to
    /// [`with_config()`](Self::with_config) when the database was created.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
462
    /// Returns the underlying store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations.
    ///
    /// Note that calls made directly on the store do not pass through the
    /// WAL logging performed by the mutation methods on this type.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
470
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The manager is created in [`with_config()`](Self::with_config) from
    /// the configured memory limit and spill path.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
476
477    /// Closes the database, flushing all pending writes.
478    ///
479    /// For persistent databases, this ensures everything is safely on disk.
480    /// Called automatically when the database is dropped, but you can call
481    /// it explicitly if you need to guarantee durability at a specific point.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
486    pub fn close(&self) -> Result<()> {
487        let mut is_open = self.is_open.write();
488        if !*is_open {
489            return Ok(());
490        }
491
492        // Commit and checkpoint WAL
493        #[cfg(feature = "wal")]
494        if let Some(ref wal) = self.wal {
495            let epoch = self.store.current_epoch();
496
497            // Use the last assigned transaction ID, or create a checkpoint-only tx
498            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
499                // No transactions have been started; begin one for checkpoint
500                self.tx_manager.begin()
501            });
502
503            // Log a TxCommit to mark all pending records as committed
504            wal.log(&WalRecord::TxCommit {
505                tx_id: checkpoint_tx,
506            })?;
507
508            // Then checkpoint
509            wal.checkpoint(checkpoint_tx, epoch)?;
510            wal.sync()?;
511        }
512
513        *is_open = false;
514        Ok(())
515    }
516
    /// Returns the WAL manager if available.
    ///
    /// Yields `None` when the database was created without a WAL, i.e. the
    /// configuration disabled it or carried no database path. Only available
    /// with the `wal` feature.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<WalManager>> {
        self.wal.as_ref()
    }
523
524    /// Logs a WAL record if WAL is enabled.
525    #[cfg(feature = "wal")]
526    fn log_wal(&self, record: &WalRecord) -> Result<()> {
527        if let Some(ref wal) = self.wal {
528            wal.log(record)?;
529        }
530        Ok(())
531    }
532
    /// Returns the number of nodes in the database.
    ///
    /// The value is read from the underlying LPG store.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.store.node_count()
    }
538
    /// Returns the number of edges in the database.
    ///
    /// The value is read from the underlying LPG store.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.store.edge_count()
    }
544
    /// Returns the number of distinct labels in the database.
    ///
    /// The value is read from the underlying LPG store.
    #[must_use]
    pub fn label_count(&self) -> usize {
        self.store.label_count()
    }
550
    /// Returns the number of distinct property keys in the database.
    ///
    /// The value is read from the underlying LPG store.
    #[must_use]
    pub fn property_key_count(&self) -> usize {
        self.store.property_key_count()
    }
556
    /// Returns the number of distinct edge types in the database.
    ///
    /// The value is read from the underlying LPG store.
    #[must_use]
    pub fn edge_type_count(&self) -> usize {
        self.store.edge_type_count()
    }
562
563    // === Node Operations ===
564
565    /// Creates a node with the given labels and returns its ID.
566    ///
567    /// Labels categorize nodes - think of them like tags. A node can have
568    /// multiple labels (e.g., `["Person", "Employee"]`).
569    ///
570    /// # Examples
571    ///
572    /// ```
573    /// use grafeo_engine::GrafeoDB;
574    ///
575    /// let db = GrafeoDB::new_in_memory();
576    /// let alice = db.create_node(&["Person"]);
577    /// let company = db.create_node(&["Company", "Startup"]);
578    /// ```
579    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
580        let id = self.store.create_node(labels);
581
582        // Log to WAL if enabled
583        #[cfg(feature = "wal")]
584        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
585            id,
586            labels: labels.iter().map(|s| s.to_string()).collect(),
587        }) {
588            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
589        }
590
591        id
592    }
593
594    /// Creates a new node with labels and properties.
595    ///
596    /// If WAL is enabled, the operation is logged for durability.
597    pub fn create_node_with_props(
598        &self,
599        labels: &[&str],
600        properties: impl IntoIterator<
601            Item = (
602                impl Into<grafeo_common::types::PropertyKey>,
603                impl Into<grafeo_common::types::Value>,
604            ),
605        >,
606    ) -> grafeo_common::types::NodeId {
607        // Collect properties first so we can log them to WAL
608        let props: Vec<(
609            grafeo_common::types::PropertyKey,
610            grafeo_common::types::Value,
611        )> = properties
612            .into_iter()
613            .map(|(k, v)| (k.into(), v.into()))
614            .collect();
615
616        let id = self
617            .store
618            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
619
620        // Log node creation to WAL
621        #[cfg(feature = "wal")]
622        {
623            if let Err(e) = self.log_wal(&WalRecord::CreateNode {
624                id,
625                labels: labels.iter().map(|s| s.to_string()).collect(),
626            }) {
627                tracing::warn!("Failed to log CreateNode to WAL: {}", e);
628            }
629
630            // Log each property to WAL for full durability
631            for (key, value) in props {
632                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
633                    id,
634                    key: key.to_string(),
635                    value,
636                }) {
637                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
638                }
639            }
640        }
641
642        id
643    }
644
    /// Gets a node by ID.
    ///
    /// Looks the node up in the underlying LPG store and returns whatever
    /// the store yields; `None` presumably means no node has this ID.
    #[must_use]
    pub fn get_node(
        &self,
        id: grafeo_common::types::NodeId,
    ) -> Option<grafeo_core::graph::lpg::Node> {
        self.store.get_node(id)
    }
653
654    /// Deletes a node and all its edges.
655    ///
656    /// If WAL is enabled, the operation is logged for durability.
657    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
658        let result = self.store.delete_node(id);
659
660        #[cfg(feature = "wal")]
661        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
662            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
663        }
664
665        result
666    }
667
668    /// Sets a property on a node.
669    ///
670    /// If WAL is enabled, the operation is logged for durability.
671    pub fn set_node_property(
672        &self,
673        id: grafeo_common::types::NodeId,
674        key: &str,
675        value: grafeo_common::types::Value,
676    ) {
677        // Log to WAL first
678        #[cfg(feature = "wal")]
679        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
680            id,
681            key: key.to_string(),
682            value: value.clone(),
683        }) {
684            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
685        }
686
687        self.store.set_node_property(id, key, value);
688    }
689
690    /// Adds a label to an existing node.
691    ///
692    /// Returns `true` if the label was added, `false` if the node doesn't exist
693    /// or already has the label.
694    ///
695    /// # Examples
696    ///
697    /// ```
698    /// use grafeo_engine::GrafeoDB;
699    ///
700    /// let db = GrafeoDB::new_in_memory();
701    /// let alice = db.create_node(&["Person"]);
702    ///
703    /// // Promote Alice to Employee
704    /// let added = db.add_node_label(alice, "Employee");
705    /// assert!(added);
706    /// ```
707    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
708        let result = self.store.add_label(id, label);
709
710        #[cfg(feature = "wal")]
711        if result {
712            // Log to WAL if enabled
713            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
714                id,
715                label: label.to_string(),
716            }) {
717                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
718            }
719        }
720
721        result
722    }
723
724    /// Removes a label from a node.
725    ///
726    /// Returns `true` if the label was removed, `false` if the node doesn't exist
727    /// or doesn't have the label.
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// use grafeo_engine::GrafeoDB;
733    ///
734    /// let db = GrafeoDB::new_in_memory();
735    /// let alice = db.create_node(&["Person", "Employee"]);
736    ///
737    /// // Remove Employee status
738    /// let removed = db.remove_node_label(alice, "Employee");
739    /// assert!(removed);
740    /// ```
741    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
742        let result = self.store.remove_label(id, label);
743
744        #[cfg(feature = "wal")]
745        if result {
746            // Log to WAL if enabled
747            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
748                id,
749                label: label.to_string(),
750            }) {
751                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
752            }
753        }
754
755        result
756    }
757
758    /// Gets all labels for a node.
759    ///
760    /// Returns `None` if the node doesn't exist.
761    ///
762    /// # Examples
763    ///
764    /// ```
765    /// use grafeo_engine::GrafeoDB;
766    ///
767    /// let db = GrafeoDB::new_in_memory();
768    /// let alice = db.create_node(&["Person", "Employee"]);
769    ///
770    /// let labels = db.get_node_labels(alice).unwrap();
771    /// assert!(labels.contains(&"Person".to_string()));
772    /// assert!(labels.contains(&"Employee".to_string()));
773    /// ```
774    #[must_use]
775    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
776        self.store
777            .get_node(id)
778            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
779    }
780
781    // === Edge Operations ===
782
783    /// Creates an edge (relationship) between two nodes.
784    ///
785    /// Edges connect nodes and have a type that describes the relationship.
786    /// They're directed - the order of `src` and `dst` matters.
787    ///
788    /// # Examples
789    ///
790    /// ```
791    /// use grafeo_engine::GrafeoDB;
792    ///
793    /// let db = GrafeoDB::new_in_memory();
794    /// let alice = db.create_node(&["Person"]);
795    /// let bob = db.create_node(&["Person"]);
796    ///
797    /// // Alice knows Bob (directed: Alice -> Bob)
798    /// let edge = db.create_edge(alice, bob, "KNOWS");
799    /// ```
800    pub fn create_edge(
801        &self,
802        src: grafeo_common::types::NodeId,
803        dst: grafeo_common::types::NodeId,
804        edge_type: &str,
805    ) -> grafeo_common::types::EdgeId {
806        let id = self.store.create_edge(src, dst, edge_type);
807
808        // Log to WAL if enabled
809        #[cfg(feature = "wal")]
810        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
811            id,
812            src,
813            dst,
814            edge_type: edge_type.to_string(),
815        }) {
816            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
817        }
818
819        id
820    }
821
822    /// Creates a new edge with properties.
823    ///
824    /// If WAL is enabled, the operation is logged for durability.
825    pub fn create_edge_with_props(
826        &self,
827        src: grafeo_common::types::NodeId,
828        dst: grafeo_common::types::NodeId,
829        edge_type: &str,
830        properties: impl IntoIterator<
831            Item = (
832                impl Into<grafeo_common::types::PropertyKey>,
833                impl Into<grafeo_common::types::Value>,
834            ),
835        >,
836    ) -> grafeo_common::types::EdgeId {
837        // Collect properties first so we can log them to WAL
838        let props: Vec<(
839            grafeo_common::types::PropertyKey,
840            grafeo_common::types::Value,
841        )> = properties
842            .into_iter()
843            .map(|(k, v)| (k.into(), v.into()))
844            .collect();
845
846        let id = self.store.create_edge_with_props(
847            src,
848            dst,
849            edge_type,
850            props.iter().map(|(k, v)| (k.clone(), v.clone())),
851        );
852
853        // Log edge creation to WAL
854        #[cfg(feature = "wal")]
855        {
856            if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
857                id,
858                src,
859                dst,
860                edge_type: edge_type.to_string(),
861            }) {
862                tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
863            }
864
865            // Log each property to WAL for full durability
866            for (key, value) in props {
867                if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
868                    id,
869                    key: key.to_string(),
870                    value,
871                }) {
872                    tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
873                }
874            }
875        }
876
877        id
878    }
879
    /// Gets an edge by ID.
    ///
    /// Looks the edge up in the underlying LPG store and returns whatever
    /// the store yields; `None` presumably means no edge has this ID.
    #[must_use]
    pub fn get_edge(
        &self,
        id: grafeo_common::types::EdgeId,
    ) -> Option<grafeo_core::graph::lpg::Edge> {
        self.store.get_edge(id)
    }
888
889    /// Deletes an edge.
890    ///
891    /// If WAL is enabled, the operation is logged for durability.
892    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
893        let result = self.store.delete_edge(id);
894
895        #[cfg(feature = "wal")]
896        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
897            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
898        }
899
900        result
901    }
902
903    /// Sets a property on an edge.
904    ///
905    /// If WAL is enabled, the operation is logged for durability.
906    pub fn set_edge_property(
907        &self,
908        id: grafeo_common::types::EdgeId,
909        key: &str,
910        value: grafeo_common::types::Value,
911    ) {
912        // Log to WAL first
913        #[cfg(feature = "wal")]
914        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
915            id,
916            key: key.to_string(),
917            value: value.clone(),
918        }) {
919            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
920        }
921        self.store.set_edge_property(id, key, value);
922    }
923
    /// Removes a property from a node.
    ///
    /// Returns true if the property existed and was removed, false otherwise.
    ///
    /// Unlike the other node mutations on this type, this method writes
    /// nothing to the WAL, so the removal is presumably not replayed after a
    /// restart of a persistent database - verify before relying on it.
    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
        // The store hands back the removed value, if any; only its presence matters here.
        self.store.remove_node_property(id, key).is_some()
    }
931
932    /// Removes a property from an edge.
933    ///
934    /// Returns true if the property existed and was removed, false otherwise.
935    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
936        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
937        self.store.remove_edge_property(id, key).is_some()
938    }
939
940    // =========================================================================
941    // PROPERTY INDEX API
942    // =========================================================================
943
944    /// Creates an index on a node property for O(1) lookups by value.
945    ///
946    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
947    /// O(1) instead of O(n) for this property. The index is automatically
948    /// maintained when properties are set or removed.
949    ///
950    /// # Example
951    ///
952    /// ```ignore
953    /// // Create an index on the 'email' property
954    /// db.create_property_index("email");
955    ///
956    /// // Now lookups by email are O(1)
957    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
958    /// ```
959    pub fn create_property_index(&self, property: &str) {
960        self.store.create_property_index(property);
961    }
962
963    /// Creates a vector similarity index on a node property.
964    ///
965    /// This enables efficient approximate nearest-neighbor search on vector
966    /// properties. Currently validates the index parameters and scans existing
967    /// nodes to verify the property contains vectors of the expected dimensions.
968    ///
969    /// # Arguments
970    ///
971    /// * `label` - Node label to index (e.g., `"Doc"`)
972    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
973    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
974    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
975    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
976    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
977    ///
978    /// # Errors
979    ///
980    /// Returns an error if the metric is invalid, no vectors are found, or
981    /// dimensions don't match.
982    pub fn create_vector_index(
983        &self,
984        label: &str,
985        property: &str,
986        dimensions: Option<usize>,
987        metric: Option<&str>,
988        m: Option<usize>,
989        ef_construction: Option<usize>,
990    ) -> Result<()> {
991        use grafeo_common::types::{PropertyKey, Value};
992        use grafeo_core::index::vector::DistanceMetric;
993
994        let metric = match metric {
995            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
996                grafeo_common::utils::error::Error::Internal(format!(
997                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
998                    m
999                ))
1000            })?,
1001            None => DistanceMetric::Cosine,
1002        };
1003
1004        // Scan nodes to validate vectors exist and check dimensions
1005        let prop_key = PropertyKey::new(property);
1006        let mut found_dims: Option<usize> = dimensions;
1007        let mut vector_count = 0usize;
1008
1009        #[cfg(feature = "vector-index")]
1010        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1011
1012        for node in self.store.nodes_with_label(label) {
1013            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1014                if let Some(expected) = found_dims {
1015                    if v.len() != expected {
1016                        return Err(grafeo_common::utils::error::Error::Internal(format!(
1017                            "Vector dimension mismatch: expected {}, found {} on node {}",
1018                            expected,
1019                            v.len(),
1020                            node.id.0
1021                        )));
1022                    }
1023                } else {
1024                    found_dims = Some(v.len());
1025                }
1026                vector_count += 1;
1027                #[cfg(feature = "vector-index")]
1028                vectors.push((node.id, v.to_vec()));
1029            }
1030        }
1031
1032        if vector_count == 0 {
1033            return Err(grafeo_common::utils::error::Error::Internal(format!(
1034                "No vector properties found on :{label}({property})"
1035            )));
1036        }
1037
1038        let dims = found_dims.unwrap_or(0);
1039
1040        // Build and populate the HNSW index
1041        #[cfg(feature = "vector-index")]
1042        {
1043            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1044
1045            let mut config = HnswConfig::new(dims, metric);
1046            if let Some(m_val) = m {
1047                config = config.with_m(m_val);
1048            }
1049            if let Some(ef_c) = ef_construction {
1050                config = config.with_ef_construction(ef_c);
1051            }
1052
1053            let index = HnswIndex::with_capacity(config, vectors.len());
1054            for (node_id, vec) in &vectors {
1055                index.insert(*node_id, vec);
1056            }
1057
1058            self.store
1059                .add_vector_index(label, property, Arc::new(index));
1060        }
1061
1062        // Suppress unused variable warnings when vector-index is off
1063        let _ = (m, ef_construction);
1064
1065        tracing::info!(
1066            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1067            metric_name = metric.name()
1068        );
1069
1070        Ok(())
1071    }
1072
1073    /// Searches for the k nearest neighbors of a query vector.
1074    ///
1075    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1076    ///
1077    /// # Arguments
1078    ///
1079    /// * `label` - Node label that was indexed
1080    /// * `property` - Property that was indexed
1081    /// * `query` - Query vector (slice of floats)
1082    /// * `k` - Number of nearest neighbors to return
1083    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1084    ///
1085    /// # Returns
1086    ///
1087    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1088    #[cfg(feature = "vector-index")]
1089    pub fn vector_search(
1090        &self,
1091        label: &str,
1092        property: &str,
1093        query: &[f32],
1094        k: usize,
1095        ef: Option<usize>,
1096    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1097        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1098            grafeo_common::utils::error::Error::Internal(format!(
1099                "No vector index found for :{label}({property}). Call create_vector_index() first."
1100            ))
1101        })?;
1102
1103        let results = match ef {
1104            Some(ef_val) => index.search_with_ef(query, k, ef_val),
1105            None => index.search(query, k),
1106        };
1107
1108        Ok(results)
1109    }
1110
1111    /// Creates multiple nodes in bulk, each with a single vector property.
1112    ///
1113    /// Much faster than individual `create_node_with_props` calls because it
1114    /// acquires internal locks once and loops in Rust rather than crossing
1115    /// the FFI boundary per vector.
1116    ///
1117    /// # Arguments
1118    ///
1119    /// * `label` - Label applied to all created nodes
1120    /// * `property` - Property name for the vector data
1121    /// * `vectors` - Vector data for each node
1122    ///
1123    /// # Returns
1124    ///
1125    /// Vector of created `NodeId`s in the same order as the input vectors.
1126    pub fn batch_create_nodes(
1127        &self,
1128        label: &str,
1129        property: &str,
1130        vectors: Vec<Vec<f32>>,
1131    ) -> Vec<grafeo_common::types::NodeId> {
1132        use grafeo_common::types::{PropertyKey, Value};
1133
1134        let prop_key = PropertyKey::new(property);
1135        let labels: &[&str] = &[label];
1136
1137        vectors
1138            .into_iter()
1139            .map(|vec| {
1140                let value = Value::Vector(vec.into());
1141                let id = self.store.create_node_with_props(
1142                    labels,
1143                    std::iter::once((prop_key.clone(), value.clone())),
1144                );
1145
1146                // Log to WAL
1147                #[cfg(feature = "wal")]
1148                {
1149                    if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1150                        id,
1151                        labels: labels.iter().map(|s| s.to_string()).collect(),
1152                    }) {
1153                        tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1154                    }
1155                    if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1156                        id,
1157                        key: property.to_string(),
1158                        value,
1159                    }) {
1160                        tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1161                    }
1162                }
1163
1164                id
1165            })
1166            .collect()
1167    }
1168
1169    /// Searches for nearest neighbors for multiple query vectors in parallel.
1170    ///
1171    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1172    ///
1173    /// # Arguments
1174    ///
1175    /// * `label` - Node label that was indexed
1176    /// * `property` - Property that was indexed
1177    /// * `queries` - Batch of query vectors
1178    /// * `k` - Number of nearest neighbors per query
1179    /// * `ef` - Search beam width (uses index default if `None`)
1180    #[cfg(feature = "vector-index")]
1181    pub fn batch_vector_search(
1182        &self,
1183        label: &str,
1184        property: &str,
1185        queries: &[Vec<f32>],
1186        k: usize,
1187        ef: Option<usize>,
1188    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1189        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1190            grafeo_common::utils::error::Error::Internal(format!(
1191                "No vector index found for :{label}({property}). Call create_vector_index() first."
1192            ))
1193        })?;
1194
1195        let results = match ef {
1196            Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1197            None => index.batch_search(queries, k),
1198        };
1199
1200        Ok(results)
1201    }
1202
1203    /// Drops an index on a node property.
1204    ///
1205    /// Returns `true` if the index existed and was removed.
1206    pub fn drop_property_index(&self, property: &str) -> bool {
1207        self.store.drop_property_index(property)
1208    }
1209
1210    /// Returns `true` if the property has an index.
1211    #[must_use]
1212    pub fn has_property_index(&self, property: &str) -> bool {
1213        self.store.has_property_index(property)
1214    }
1215
1216    /// Finds all nodes that have a specific property value.
1217    ///
1218    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1219    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1220    ///
1221    /// # Example
1222    ///
1223    /// ```ignore
1224    /// // Create index for fast lookups (optional but recommended)
1225    /// db.create_property_index("city");
1226    ///
1227    /// // Find all nodes where city = "NYC"
1228    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
1229    /// ```
1230    #[must_use]
1231    pub fn find_nodes_by_property(
1232        &self,
1233        property: &str,
1234        value: &grafeo_common::types::Value,
1235    ) -> Vec<grafeo_common::types::NodeId> {
1236        self.store.find_nodes_by_property(property, value)
1237    }
1238
1239    // =========================================================================
1240    // ADMIN API: Introspection
1241    // =========================================================================
1242
1243    /// Returns true if this database is backed by a file (persistent).
1244    ///
1245    /// In-memory databases return false.
1246    #[must_use]
1247    pub fn is_persistent(&self) -> bool {
1248        self.config.path.is_some()
1249    }
1250
1251    /// Returns the database file path, if persistent.
1252    ///
1253    /// In-memory databases return None.
1254    #[must_use]
1255    pub fn path(&self) -> Option<&Path> {
1256        self.config.path.as_deref()
1257    }
1258
1259    /// Returns high-level database information.
1260    ///
1261    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1262    #[must_use]
1263    pub fn info(&self) -> crate::admin::DatabaseInfo {
1264        crate::admin::DatabaseInfo {
1265            mode: crate::admin::DatabaseMode::Lpg,
1266            node_count: self.store.node_count(),
1267            edge_count: self.store.edge_count(),
1268            is_persistent: self.is_persistent(),
1269            path: self.config.path.clone(),
1270            wal_enabled: self.config.wal_enabled,
1271            version: env!("CARGO_PKG_VERSION").to_string(),
1272        }
1273    }
1274
1275    /// Returns detailed database statistics.
1276    ///
1277    /// Includes counts, memory usage, and index information.
1278    #[must_use]
1279    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1280        #[cfg(feature = "wal")]
1281        let disk_bytes = self.config.path.as_ref().and_then(|p| {
1282            if p.exists() {
1283                Self::calculate_disk_usage(p).ok()
1284            } else {
1285                None
1286            }
1287        });
1288        #[cfg(not(feature = "wal"))]
1289        let disk_bytes: Option<usize> = None;
1290
1291        crate::admin::DatabaseStats {
1292            node_count: self.store.node_count(),
1293            edge_count: self.store.edge_count(),
1294            label_count: self.store.label_count(),
1295            edge_type_count: self.store.edge_type_count(),
1296            property_key_count: self.store.property_key_count(),
1297            index_count: 0, // TODO: implement index tracking
1298            memory_bytes: self.buffer_manager.allocated(),
1299            disk_bytes,
1300        }
1301    }
1302
1303    /// Calculates total disk usage for the database directory.
1304    #[cfg(feature = "wal")]
1305    fn calculate_disk_usage(path: &Path) -> Result<usize> {
1306        let mut total = 0usize;
1307        if path.is_dir() {
1308            for entry in std::fs::read_dir(path)? {
1309                let entry = entry?;
1310                let metadata = entry.metadata()?;
1311                if metadata.is_file() {
1312                    total += metadata.len() as usize;
1313                } else if metadata.is_dir() {
1314                    total += Self::calculate_disk_usage(&entry.path())?;
1315                }
1316            }
1317        }
1318        Ok(total)
1319    }
1320
1321    /// Returns schema information (labels, edge types, property keys).
1322    ///
1323    /// For LPG mode, returns label and edge type information.
1324    /// For RDF mode, returns predicate and named graph information.
1325    #[must_use]
1326    pub fn schema(&self) -> crate::admin::SchemaInfo {
1327        let labels = self
1328            .store
1329            .all_labels()
1330            .into_iter()
1331            .map(|name| crate::admin::LabelInfo {
1332                name: name.clone(),
1333                count: self.store.nodes_with_label(&name).count(),
1334            })
1335            .collect();
1336
1337        let edge_types = self
1338            .store
1339            .all_edge_types()
1340            .into_iter()
1341            .map(|name| crate::admin::EdgeTypeInfo {
1342                name: name.clone(),
1343                count: self.store.edges_with_type(&name).count(),
1344            })
1345            .collect();
1346
1347        let property_keys = self.store.all_property_keys();
1348
1349        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1350            labels,
1351            edge_types,
1352            property_keys,
1353        })
1354    }
1355
1356    /// Returns RDF schema information.
1357    ///
1358    /// Only available when the RDF feature is enabled.
1359    #[cfg(feature = "rdf")]
1360    #[must_use]
1361    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1362        let stats = self.rdf_store.stats();
1363
1364        let predicates = self
1365            .rdf_store
1366            .predicates()
1367            .into_iter()
1368            .map(|predicate| {
1369                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1370                crate::admin::PredicateInfo {
1371                    iri: predicate.to_string(),
1372                    count,
1373                }
1374            })
1375            .collect();
1376
1377        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1378            predicates,
1379            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1380            subject_count: stats.subject_count,
1381            object_count: stats.object_count,
1382        })
1383    }
1384
1385    /// Validates database integrity.
1386    ///
1387    /// Checks for:
1388    /// - Dangling edge references (edges pointing to non-existent nodes)
1389    /// - Internal index consistency
1390    ///
1391    /// Returns a list of errors and warnings. Empty errors = valid.
1392    #[must_use]
1393    pub fn validate(&self) -> crate::admin::ValidationResult {
1394        let mut result = crate::admin::ValidationResult::default();
1395
1396        // Check for dangling edge references
1397        for edge in self.store.all_edges() {
1398            if self.store.get_node(edge.src).is_none() {
1399                result.errors.push(crate::admin::ValidationError {
1400                    code: "DANGLING_SRC".to_string(),
1401                    message: format!(
1402                        "Edge {} references non-existent source node {}",
1403                        edge.id.0, edge.src.0
1404                    ),
1405                    context: Some(format!("edge:{}", edge.id.0)),
1406                });
1407            }
1408            if self.store.get_node(edge.dst).is_none() {
1409                result.errors.push(crate::admin::ValidationError {
1410                    code: "DANGLING_DST".to_string(),
1411                    message: format!(
1412                        "Edge {} references non-existent destination node {}",
1413                        edge.id.0, edge.dst.0
1414                    ),
1415                    context: Some(format!("edge:{}", edge.id.0)),
1416                });
1417            }
1418        }
1419
1420        // Add warnings for potential issues
1421        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1422            result.warnings.push(crate::admin::ValidationWarning {
1423                code: "NO_EDGES".to_string(),
1424                message: "Database has nodes but no edges".to_string(),
1425                context: None,
1426            });
1427        }
1428
1429        result
1430    }
1431
1432    /// Returns WAL (Write-Ahead Log) status.
1433    ///
1434    /// Returns None if WAL is not enabled.
1435    #[must_use]
1436    pub fn wal_status(&self) -> crate::admin::WalStatus {
1437        #[cfg(feature = "wal")]
1438        if let Some(ref wal) = self.wal {
1439            return crate::admin::WalStatus {
1440                enabled: true,
1441                path: self.config.path.as_ref().map(|p| p.join("wal")),
1442                size_bytes: wal.size_bytes(),
1443                record_count: wal.record_count() as usize,
1444                last_checkpoint: wal.last_checkpoint_timestamp(),
1445                current_epoch: self.store.current_epoch().as_u64(),
1446            };
1447        }
1448
1449        crate::admin::WalStatus {
1450            enabled: false,
1451            path: None,
1452            size_bytes: 0,
1453            record_count: 0,
1454            last_checkpoint: None,
1455            current_epoch: self.store.current_epoch().as_u64(),
1456        }
1457    }
1458
1459    /// Forces a WAL checkpoint.
1460    ///
1461    /// Flushes all pending WAL records to the main storage.
1462    ///
1463    /// # Errors
1464    ///
1465    /// Returns an error if the checkpoint fails.
1466    pub fn wal_checkpoint(&self) -> Result<()> {
1467        #[cfg(feature = "wal")]
1468        if let Some(ref wal) = self.wal {
1469            let epoch = self.store.current_epoch();
1470            let tx_id = self
1471                .tx_manager
1472                .last_assigned_tx_id()
1473                .unwrap_or_else(|| self.tx_manager.begin());
1474            wal.checkpoint(tx_id, epoch)?;
1475            wal.sync()?;
1476        }
1477        Ok(())
1478    }
1479
1480    // =========================================================================
1481    // ADMIN API: Persistence Control
1482    // =========================================================================
1483
1484    /// Saves the database to a file path.
1485    ///
1486    /// - If in-memory: creates a new persistent database at path
1487    /// - If file-backed: creates a copy at the new path
1488    ///
1489    /// The original database remains unchanged.
1490    ///
1491    /// # Errors
1492    ///
1493    /// Returns an error if the save operation fails.
1494    ///
1495    /// Requires the `wal` feature for persistence support.
1496    #[cfg(feature = "wal")]
1497    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1498        let path = path.as_ref();
1499
1500        // Create target database with WAL enabled
1501        let target_config = Config::persistent(path);
1502        let target = Self::with_config(target_config)?;
1503
1504        // Copy all nodes using WAL-enabled methods
1505        for node in self.store.all_nodes() {
1506            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1507            target.store.create_node_with_id(node.id, &label_refs);
1508
1509            // Log to WAL
1510            target.log_wal(&WalRecord::CreateNode {
1511                id: node.id,
1512                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1513            })?;
1514
1515            // Copy properties
1516            for (key, value) in node.properties {
1517                target
1518                    .store
1519                    .set_node_property(node.id, key.as_str(), value.clone());
1520                target.log_wal(&WalRecord::SetNodeProperty {
1521                    id: node.id,
1522                    key: key.to_string(),
1523                    value,
1524                })?;
1525            }
1526        }
1527
1528        // Copy all edges using WAL-enabled methods
1529        for edge in self.store.all_edges() {
1530            target
1531                .store
1532                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1533
1534            // Log to WAL
1535            target.log_wal(&WalRecord::CreateEdge {
1536                id: edge.id,
1537                src: edge.src,
1538                dst: edge.dst,
1539                edge_type: edge.edge_type.to_string(),
1540            })?;
1541
1542            // Copy properties
1543            for (key, value) in edge.properties {
1544                target
1545                    .store
1546                    .set_edge_property(edge.id, key.as_str(), value.clone());
1547                target.log_wal(&WalRecord::SetEdgeProperty {
1548                    id: edge.id,
1549                    key: key.to_string(),
1550                    value,
1551                })?;
1552            }
1553        }
1554
1555        // Checkpoint and close the target database
1556        target.close()?;
1557
1558        Ok(())
1559    }
1560
1561    /// Creates an in-memory copy of this database.
1562    ///
1563    /// Returns a new database that is completely independent.
1564    /// Useful for:
1565    /// - Testing modifications without affecting the original
1566    /// - Faster operations when persistence isn't needed
1567    ///
1568    /// # Errors
1569    ///
1570    /// Returns an error if the copy operation fails.
1571    pub fn to_memory(&self) -> Result<Self> {
1572        let config = Config::in_memory();
1573        let target = Self::with_config(config)?;
1574
1575        // Copy all nodes
1576        for node in self.store.all_nodes() {
1577            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1578            target.store.create_node_with_id(node.id, &label_refs);
1579
1580            // Copy properties
1581            for (key, value) in node.properties {
1582                target.store.set_node_property(node.id, key.as_str(), value);
1583            }
1584        }
1585
1586        // Copy all edges
1587        for edge in self.store.all_edges() {
1588            target
1589                .store
1590                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1591
1592            // Copy properties
1593            for (key, value) in edge.properties {
1594                target.store.set_edge_property(edge.id, key.as_str(), value);
1595            }
1596        }
1597
1598        Ok(target)
1599    }
1600
1601    /// Opens a database file and loads it entirely into memory.
1602    ///
1603    /// The returned database has no connection to the original file.
1604    /// Changes will NOT be written back to the file.
1605    ///
1606    /// # Errors
1607    ///
1608    /// Returns an error if the file can't be opened or loaded.
1609    #[cfg(feature = "wal")]
1610    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1611        // Open the source database (triggers WAL recovery)
1612        let source = Self::open(path)?;
1613
1614        // Create in-memory copy
1615        let target = source.to_memory()?;
1616
1617        // Close the source (releases file handles)
1618        source.close()?;
1619
1620        Ok(target)
1621    }
1622
1623    // =========================================================================
1624    // ADMIN API: Iteration
1625    // =========================================================================
1626
1627    /// Returns an iterator over all nodes in the database.
1628    ///
1629    /// Useful for dump/export operations.
1630    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1631        self.store.all_nodes()
1632    }
1633
1634    /// Returns an iterator over all edges in the database.
1635    ///
1636    /// Useful for dump/export operations.
1637    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1638        self.store.all_edges()
1639    }
1640}
1641
1642impl Drop for GrafeoDB {
1643    fn drop(&mut self) {
1644        if let Err(e) = self.close() {
1645            tracing::error!("Error closing database: {}", e);
1646        }
1647    }
1648}
1649
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be assembled or inspected
/// field by field.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`QueryResult::new`] fills this with one `Any` entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Presumably each row holds one value per column; this is not enforced
    /// by the type.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
}
1688
1689impl QueryResult {
1690    /// Creates a new empty query result.
1691    #[must_use]
1692    pub fn new(columns: Vec<String>) -> Self {
1693        let len = columns.len();
1694        Self {
1695            columns,
1696            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1697            rows: Vec::new(),
1698            execution_time_ms: None,
1699            rows_scanned: None,
1700        }
1701    }
1702
1703    /// Creates a new empty query result with column types.
1704    #[must_use]
1705    pub fn with_types(
1706        columns: Vec<String>,
1707        column_types: Vec<grafeo_common::types::LogicalType>,
1708    ) -> Self {
1709        Self {
1710            columns,
1711            column_types,
1712            rows: Vec::new(),
1713            execution_time_ms: None,
1714            rows_scanned: None,
1715        }
1716    }
1717
1718    /// Sets the execution metrics on this result.
1719    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1720        self.execution_time_ms = Some(execution_time_ms);
1721        self.rows_scanned = Some(rows_scanned);
1722        self
1723    }
1724
1725    /// Returns the execution time in milliseconds, if available.
1726    #[must_use]
1727    pub fn execution_time_ms(&self) -> Option<f64> {
1728        self.execution_time_ms
1729    }
1730
1731    /// Returns the number of rows scanned, if available.
1732    #[must_use]
1733    pub fn rows_scanned(&self) -> Option<u64> {
1734        self.rows_scanned
1735    }
1736
1737    /// Returns the number of rows.
1738    #[must_use]
1739    pub fn row_count(&self) -> usize {
1740        self.rows.len()
1741    }
1742
1743    /// Returns the number of columns.
1744    #[must_use]
1745    pub fn column_count(&self) -> usize {
1746        self.columns.len()
1747    }
1748
1749    /// Returns true if the result is empty.
1750    #[must_use]
1751    pub fn is_empty(&self) -> bool {
1752        self.rows.is_empty()
1753    }
1754
1755    /// Extracts a single value from the result.
1756    ///
1757    /// Use this when your query returns exactly one row with one column,
1758    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1759    ///
1760    /// # Errors
1761    ///
1762    /// Returns an error if the result has multiple rows or columns.
1763    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1764        if self.rows.len() != 1 || self.columns.len() != 1 {
1765            return Err(grafeo_common::utils::error::Error::InvalidValue(
1766                "Expected single value".to_string(),
1767            ));
1768        }
1769        T::from_value(&self.rows[0][0])
1770    }
1771
1772    /// Returns an iterator over the rows.
1773    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1774        self.rows.iter()
1775    }
1776}
1777
1778/// Converts a [`Value`](grafeo_common::types::Value) to a concrete Rust type.
1779///
1780/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1781/// Used by [`QueryResult::scalar()`] to extract typed values.
1782pub trait FromValue: Sized {
1783    /// Attempts the conversion, returning an error on type mismatch.
1784    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1785}
1786
1787impl FromValue for i64 {
1788    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1789        value
1790            .as_int64()
1791            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1792                expected: "INT64".to_string(),
1793                found: value.type_name().to_string(),
1794            })
1795    }
1796}
1797
1798impl FromValue for f64 {
1799    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1800        value
1801            .as_float64()
1802            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1803                expected: "FLOAT64".to_string(),
1804                found: value.type_name().to_string(),
1805            })
1806    }
1807}
1808
1809impl FromValue for String {
1810    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1811        value.as_str().map(String::from).ok_or_else(|| {
1812            grafeo_common::utils::error::Error::TypeMismatch {
1813                expected: "STRING".to_string(),
1814                found: value.type_name().to_string(),
1815            }
1816        })
1817    }
1818}
1819
1820impl FromValue for bool {
1821    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1822        value
1823            .as_bool()
1824            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1825                expected: "BOOL".to_string(),
1826                found: value.type_name().to_string(),
1827            })
1828    }
1829}
1830
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts with no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// Options set on the `Config` builder are visible through `db.config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    /// Nodes and edges written before `close()` are visible after reopening.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data, keeping the IDs that were
        // actually assigned so the lookup below does not depend on the
        // ID allocation scheme.
        let (alice, bob) = {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();

            (alice, bob)
        };

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify both nodes exist under their original IDs
            assert!(db.get_node(alice).is_some());
            assert!(db.get_node(bob).is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records (only checked when a WAL is attached)
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    /// WAL recovery works correctly across multiple open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        let alice = {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
            alice
        };

        // Session 2: Add more data
        let bob = {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
            bob
        };

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify labels were recovered correctly for both nodes
            let recovered_alice = db.get_node(alice).unwrap();
            assert!(recovered_alice.labels.iter().any(|l| l.as_str() == "Person"));

            let recovered_bob = db.get_node(bob).unwrap();
            assert!(recovered_bob.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    /// The database stays consistent after a series of create/delete
    /// operations followed by a reopen.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        let (a, b, c) = {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete middle node and its edge
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();

            (a, b, c)
        };

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            assert!(db.get_node(a).is_some());
            assert!(db.get_node(c).is_some());

            // Middle node should be deleted
            assert!(db.get_node(b).is_none());
        }
    }

    /// Calling `close()` multiple times should not cause errors.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    /// Query results include execution metrics.
    ///
    /// Gated on the `gql` feature at the function level so the test is not
    /// reported as passing when nothing could be executed.
    #[cfg(feature = "gql")]
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

        // Metrics should be populated
        assert!(result.execution_time_ms.is_some());
        assert!(result.rows_scanned.is_some());
        assert!(result.execution_time_ms.unwrap() >= 0.0);
        assert_eq!(result.rows_scanned.unwrap(), 2);
    }

    /// Metrics are populated for queries that return no rows.
    ///
    /// Gated on the `gql` feature at the function level so the test is not
    /// reported as passing when nothing could be executed.
    #[cfg(feature = "gql")]
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        // Query that matches nothing
        let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

        assert!(result.execution_time_ms.is_some());
        assert!(result.rows_scanned.is_some());
        assert_eq!(result.rows_scanned.unwrap(), 0);
    }
}