Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::query::cache::QueryCache;
19use crate::session::Session;
20use crate::transaction::TransactionManager;
21
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// Every [`Session`] handed out by [`session()`](Self::session) shares this
/// database's store, transaction manager, and query cache through cloned
/// `Arc` handles.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration as passed to [`with_config()`](Self::with_config)
    /// (memory limit, storage/spill paths, WAL toggle, execution settings).
    config: Config,
    /// The underlying labeled-property-graph store.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager; its budget and spill path are derived from
    /// `config` at construction time.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager. `Some` only when WAL is enabled in the config
    /// AND the config has an on-disk path.
    wal: Option<Arc<WalManager>>,
    /// Cache of parsed and optimized query plans, shared with every session.
    query_cache: Arc<QueryCache>,
    /// Whether the database is open. Cleared by [`close()`](Self::close);
    /// once cleared, further `close()` calls return `Ok(())` immediately.
    is_open: RwLock<bool>,
}
63
64impl GrafeoDB {
65    /// Creates an in-memory database - fast to create, gone when dropped.
66    ///
67    /// Use this for tests, experiments, or when you don't need persistence.
68    /// For data that survives restarts, use [`open()`](Self::open) instead.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use grafeo_engine::GrafeoDB;
74    ///
75    /// let db = GrafeoDB::new_in_memory();
76    /// let session = db.session();
77    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
78    /// # Ok::<(), grafeo_common::utils::error::Error>(())
79    /// ```
80    #[must_use]
81    pub fn new_in_memory() -> Self {
82        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
83    }
84
85    /// Opens a database at the given path, creating it if it doesn't exist.
86    ///
87    /// If you've used this path before, Grafeo recovers your data from the
88    /// write-ahead log automatically. First open on a new path creates an
89    /// empty database.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the path isn't writable or recovery fails.
94    ///
95    /// # Examples
96    ///
97    /// ```no_run
98    /// use grafeo_engine::GrafeoDB;
99    ///
100    /// let db = GrafeoDB::open("./my_social_network")?;
101    /// # Ok::<(), grafeo_common::utils::error::Error>(())
102    /// ```
103    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104        Self::with_config(Config::persistent(path.as_ref()))
105    }
106
107    /// Creates a database with custom configuration.
108    ///
109    /// Use this when you need fine-grained control over memory limits,
110    /// thread counts, or persistence settings. For most cases,
111    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
112    /// are simpler.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the database can't be created or recovery fails.
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// use grafeo_engine::{GrafeoDB, Config};
122    ///
123    /// // In-memory with a 512MB limit
124    /// let config = Config::in_memory()
125    ///     .with_memory_limit(512 * 1024 * 1024);
126    ///
127    /// let db = GrafeoDB::with_config(config)?;
128    /// # Ok::<(), grafeo_common::utils::error::Error>(())
129    /// ```
130    pub fn with_config(config: Config) -> Result<Self> {
131        let store = Arc::new(LpgStore::new());
132        #[cfg(feature = "rdf")]
133        let rdf_store = Arc::new(RdfStore::new());
134        let tx_manager = Arc::new(TransactionManager::new());
135
136        // Create buffer manager with configured limits
137        let buffer_config = BufferManagerConfig {
138            budget: config.memory_limit.unwrap_or_else(|| {
139                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
140            }),
141            spill_path: config
142                .spill_path
143                .clone()
144                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
145            ..BufferManagerConfig::default()
146        };
147        let buffer_manager = BufferManager::new(buffer_config);
148
149        // Initialize WAL if persistence is enabled
150        let wal = if config.wal_enabled {
151            if let Some(ref db_path) = config.path {
152                // Create database directory if it doesn't exist
153                std::fs::create_dir_all(db_path)?;
154
155                let wal_path = db_path.join("wal");
156
157                // Check if WAL exists and recover if needed
158                if wal_path.exists() {
159                    let recovery = WalRecovery::new(&wal_path);
160                    let records = recovery.recover()?;
161                    Self::apply_wal_records(&store, &records)?;
162                }
163
164                // Open/create WAL manager
165                let wal_config = WalConfig::default();
166                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
167                Some(Arc::new(wal_manager))
168            } else {
169                None
170            }
171        } else {
172            None
173        };
174
175        // Create query cache with default capacity (1000 queries)
176        let query_cache = Arc::new(QueryCache::default());
177
178        Ok(Self {
179            config,
180            store,
181            #[cfg(feature = "rdf")]
182            rdf_store,
183            tx_manager,
184            buffer_manager,
185            wal,
186            query_cache,
187            is_open: RwLock::new(true),
188        })
189    }
190
191    /// Applies WAL records to restore the database state.
192    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
193        for record in records {
194            match record {
195                WalRecord::CreateNode { id, labels } => {
196                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
197                    store.create_node_with_id(*id, &label_refs);
198                }
199                WalRecord::DeleteNode { id } => {
200                    store.delete_node(*id);
201                }
202                WalRecord::CreateEdge {
203                    id,
204                    src,
205                    dst,
206                    edge_type,
207                } => {
208                    store.create_edge_with_id(*id, *src, *dst, edge_type);
209                }
210                WalRecord::DeleteEdge { id } => {
211                    store.delete_edge(*id);
212                }
213                WalRecord::SetNodeProperty { id, key, value } => {
214                    store.set_node_property(*id, key, value.clone());
215                }
216                WalRecord::SetEdgeProperty { id, key, value } => {
217                    store.set_edge_property(*id, key, value.clone());
218                }
219                WalRecord::AddNodeLabel { id, label } => {
220                    store.add_label(*id, label);
221                }
222                WalRecord::RemoveNodeLabel { id, label } => {
223                    store.remove_label(*id, label);
224                }
225                WalRecord::TxCommit { .. }
226                | WalRecord::TxAbort { .. }
227                | WalRecord::Checkpoint { .. } => {
228                    // Transaction control records don't need replay action
229                    // (recovery already filtered to only committed transactions)
230                }
231            }
232        }
233        Ok(())
234    }
235
236    /// Opens a new session for running queries.
237    ///
238    /// Sessions are cheap to create - spin up as many as you need. Each
239    /// gets its own transaction context, so concurrent sessions won't
240    /// block each other on reads.
241    ///
242    /// # Examples
243    ///
244    /// ```
245    /// use grafeo_engine::GrafeoDB;
246    ///
247    /// let db = GrafeoDB::new_in_memory();
248    /// let session = db.session();
249    ///
250    /// // Run queries through the session
251    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
252    /// # Ok::<(), grafeo_common::utils::error::Error>(())
253    /// ```
254    #[must_use]
255    pub fn session(&self) -> Session {
256        #[cfg(feature = "rdf")]
257        {
258            Session::with_rdf_store_and_adaptive(
259                Arc::clone(&self.store),
260                Arc::clone(&self.rdf_store),
261                Arc::clone(&self.tx_manager),
262                Arc::clone(&self.query_cache),
263                self.config.adaptive.clone(),
264                self.config.factorized_execution,
265            )
266        }
267        #[cfg(not(feature = "rdf"))]
268        {
269            Session::with_adaptive(
270                Arc::clone(&self.store),
271                Arc::clone(&self.tx_manager),
272                Arc::clone(&self.query_cache),
273                self.config.adaptive.clone(),
274                self.config.factorized_execution,
275            )
276        }
277    }
278
279    /// Returns the adaptive execution configuration.
280    #[must_use]
281    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
282        &self.config.adaptive
283    }
284
285    /// Runs a query directly on the database.
286    ///
287    /// A convenience method that creates a temporary session behind the
288    /// scenes. If you're running multiple queries, grab a
289    /// [`session()`](Self::session) instead to avoid the overhead.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if parsing or execution fails.
294    pub fn execute(&self, query: &str) -> Result<QueryResult> {
295        let session = self.session();
296        session.execute(query)
297    }
298
299    /// Executes a query with parameters and returns the result.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the query fails.
304    pub fn execute_with_params(
305        &self,
306        query: &str,
307        params: std::collections::HashMap<String, grafeo_common::types::Value>,
308    ) -> Result<QueryResult> {
309        let session = self.session();
310        session.execute_with_params(query, params)
311    }
312
313    /// Executes a Cypher query and returns the result.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if the query fails.
318    #[cfg(feature = "cypher")]
319    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
320        let session = self.session();
321        session.execute_cypher(query)
322    }
323
324    /// Executes a Cypher query with parameters and returns the result.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the query fails.
329    #[cfg(feature = "cypher")]
330    pub fn execute_cypher_with_params(
331        &self,
332        query: &str,
333        params: std::collections::HashMap<String, grafeo_common::types::Value>,
334    ) -> Result<QueryResult> {
335        use crate::query::processor::{QueryLanguage, QueryProcessor};
336
337        // Create processor
338        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
339        processor.process(query, QueryLanguage::Cypher, Some(&params))
340    }
341
342    /// Executes a Gremlin query and returns the result.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query fails.
347    #[cfg(feature = "gremlin")]
348    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
349        let session = self.session();
350        session.execute_gremlin(query)
351    }
352
353    /// Executes a Gremlin query with parameters and returns the result.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the query fails.
358    #[cfg(feature = "gremlin")]
359    pub fn execute_gremlin_with_params(
360        &self,
361        query: &str,
362        params: std::collections::HashMap<String, grafeo_common::types::Value>,
363    ) -> Result<QueryResult> {
364        let session = self.session();
365        session.execute_gremlin_with_params(query, params)
366    }
367
368    /// Executes a GraphQL query and returns the result.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the query fails.
373    #[cfg(feature = "graphql")]
374    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
375        let session = self.session();
376        session.execute_graphql(query)
377    }
378
379    /// Executes a GraphQL query with parameters and returns the result.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the query fails.
384    #[cfg(feature = "graphql")]
385    pub fn execute_graphql_with_params(
386        &self,
387        query: &str,
388        params: std::collections::HashMap<String, grafeo_common::types::Value>,
389    ) -> Result<QueryResult> {
390        let session = self.session();
391        session.execute_graphql_with_params(query, params)
392    }
393
394    /// Executes a SPARQL query and returns the result.
395    ///
396    /// SPARQL queries operate on the RDF triple store.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the query fails.
401    ///
402    /// # Examples
403    ///
404    /// ```ignore
405    /// use grafeo_engine::GrafeoDB;
406    ///
407    /// let db = GrafeoDB::new_in_memory();
408    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
409    /// ```
410    #[cfg(all(feature = "sparql", feature = "rdf"))]
411    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
412        use crate::query::{
413            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
414        };
415
416        // Parse and translate the SPARQL query to a logical plan
417        let logical_plan = sparql_translator::translate(query)?;
418
419        // Optimize the plan
420        let optimizer = Optimizer::from_store(&self.store);
421        let optimized_plan = optimizer.optimize(logical_plan)?;
422
423        // Convert to physical plan using RDF planner
424        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
425        let mut physical_plan = planner.plan(&optimized_plan)?;
426
427        // Execute the plan
428        let executor = Executor::with_columns(physical_plan.columns.clone());
429        executor.execute(physical_plan.operator.as_mut())
430    }
431
432    /// Returns the RDF store.
433    ///
434    /// This provides direct access to the RDF store for triple operations.
435    #[cfg(feature = "rdf")]
436    #[must_use]
437    pub fn rdf_store(&self) -> &Arc<RdfStore> {
438        &self.rdf_store
439    }
440
441    /// Executes a query and returns a single scalar value.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the query fails or doesn't return exactly one row.
446    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
447        let result = self.execute(query)?;
448        result.scalar()
449    }
450
451    /// Returns the configuration.
452    #[must_use]
453    pub fn config(&self) -> &Config {
454        &self.config
455    }
456
457    /// Returns the underlying store.
458    ///
459    /// This provides direct access to the LPG store for algorithm implementations.
460    #[must_use]
461    pub fn store(&self) -> &Arc<LpgStore> {
462        &self.store
463    }
464
465    /// Returns the buffer manager for memory-aware operations.
466    #[must_use]
467    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
468        &self.buffer_manager
469    }
470
471    /// Closes the database, flushing all pending writes.
472    ///
473    /// For persistent databases, this ensures everything is safely on disk.
474    /// Called automatically when the database is dropped, but you can call
475    /// it explicitly if you need to guarantee durability at a specific point.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
480    pub fn close(&self) -> Result<()> {
481        let mut is_open = self.is_open.write();
482        if !*is_open {
483            return Ok(());
484        }
485
486        // Commit and checkpoint WAL
487        if let Some(ref wal) = self.wal {
488            let epoch = self.store.current_epoch();
489
490            // Use the last assigned transaction ID, or create a checkpoint-only tx
491            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
492                // No transactions have been started; begin one for checkpoint
493                self.tx_manager.begin()
494            });
495
496            // Log a TxCommit to mark all pending records as committed
497            wal.log(&WalRecord::TxCommit {
498                tx_id: checkpoint_tx,
499            })?;
500
501            // Then checkpoint
502            wal.checkpoint(checkpoint_tx, epoch)?;
503            wal.sync()?;
504        }
505
506        *is_open = false;
507        Ok(())
508    }
509
510    /// Returns the WAL manager if available.
511    #[must_use]
512    pub fn wal(&self) -> Option<&Arc<WalManager>> {
513        self.wal.as_ref()
514    }
515
516    /// Logs a WAL record if WAL is enabled.
517    fn log_wal(&self, record: &WalRecord) -> Result<()> {
518        if let Some(ref wal) = self.wal {
519            wal.log(record)?;
520        }
521        Ok(())
522    }
523
524    /// Returns the number of nodes in the database.
525    #[must_use]
526    pub fn node_count(&self) -> usize {
527        self.store.node_count()
528    }
529
530    /// Returns the number of edges in the database.
531    #[must_use]
532    pub fn edge_count(&self) -> usize {
533        self.store.edge_count()
534    }
535
536    /// Returns the number of distinct labels in the database.
537    #[must_use]
538    pub fn label_count(&self) -> usize {
539        self.store.label_count()
540    }
541
542    /// Returns the number of distinct property keys in the database.
543    #[must_use]
544    pub fn property_key_count(&self) -> usize {
545        self.store.property_key_count()
546    }
547
548    /// Returns the number of distinct edge types in the database.
549    #[must_use]
550    pub fn edge_type_count(&self) -> usize {
551        self.store.edge_type_count()
552    }
553
554    // === Node Operations ===
555
556    /// Creates a node with the given labels and returns its ID.
557    ///
558    /// Labels categorize nodes - think of them like tags. A node can have
559    /// multiple labels (e.g., `["Person", "Employee"]`).
560    ///
561    /// # Examples
562    ///
563    /// ```
564    /// use grafeo_engine::GrafeoDB;
565    ///
566    /// let db = GrafeoDB::new_in_memory();
567    /// let alice = db.create_node(&["Person"]);
568    /// let company = db.create_node(&["Company", "Startup"]);
569    /// ```
570    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
571        let id = self.store.create_node(labels);
572
573        // Log to WAL if enabled
574        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
575            id,
576            labels: labels.iter().map(|s| s.to_string()).collect(),
577        }) {
578            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
579        }
580
581        id
582    }
583
584    /// Creates a new node with labels and properties.
585    ///
586    /// If WAL is enabled, the operation is logged for durability.
587    pub fn create_node_with_props(
588        &self,
589        labels: &[&str],
590        properties: impl IntoIterator<
591            Item = (
592                impl Into<grafeo_common::types::PropertyKey>,
593                impl Into<grafeo_common::types::Value>,
594            ),
595        >,
596    ) -> grafeo_common::types::NodeId {
597        // Collect properties first so we can log them to WAL
598        let props: Vec<(
599            grafeo_common::types::PropertyKey,
600            grafeo_common::types::Value,
601        )> = properties
602            .into_iter()
603            .map(|(k, v)| (k.into(), v.into()))
604            .collect();
605
606        let id = self
607            .store
608            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
609
610        // Log node creation to WAL
611        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
612            id,
613            labels: labels.iter().map(|s| s.to_string()).collect(),
614        }) {
615            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
616        }
617
618        // Log each property to WAL for full durability
619        for (key, value) in props {
620            if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
621                id,
622                key: key.to_string(),
623                value,
624            }) {
625                tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
626            }
627        }
628
629        id
630    }
631
632    /// Gets a node by ID.
633    #[must_use]
634    pub fn get_node(
635        &self,
636        id: grafeo_common::types::NodeId,
637    ) -> Option<grafeo_core::graph::lpg::Node> {
638        self.store.get_node(id)
639    }
640
641    /// Deletes a node and all its edges.
642    ///
643    /// If WAL is enabled, the operation is logged for durability.
644    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
645        let result = self.store.delete_node(id);
646
647        if result && let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
648            tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
649        }
650
651        result
652    }
653
654    /// Sets a property on a node.
655    ///
656    /// If WAL is enabled, the operation is logged for durability.
657    pub fn set_node_property(
658        &self,
659        id: grafeo_common::types::NodeId,
660        key: &str,
661        value: grafeo_common::types::Value,
662    ) {
663        // Log to WAL first
664        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
665            id,
666            key: key.to_string(),
667            value: value.clone(),
668        }) {
669            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
670        }
671
672        self.store.set_node_property(id, key, value);
673    }
674
675    /// Adds a label to an existing node.
676    ///
677    /// Returns `true` if the label was added, `false` if the node doesn't exist
678    /// or already has the label.
679    ///
680    /// # Examples
681    ///
682    /// ```
683    /// use grafeo_engine::GrafeoDB;
684    ///
685    /// let db = GrafeoDB::new_in_memory();
686    /// let alice = db.create_node(&["Person"]);
687    ///
688    /// // Promote Alice to Employee
689    /// let added = db.add_node_label(alice, "Employee");
690    /// assert!(added);
691    /// ```
692    pub fn add_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
693        let result = self.store.add_label(id, label);
694
695        if result {
696            // Log to WAL if enabled
697            if let Err(e) = self.log_wal(&WalRecord::AddNodeLabel {
698                id,
699                label: label.to_string(),
700            }) {
701                tracing::warn!("Failed to log AddNodeLabel to WAL: {}", e);
702            }
703        }
704
705        result
706    }
707
708    /// Removes a label from a node.
709    ///
710    /// Returns `true` if the label was removed, `false` if the node doesn't exist
711    /// or doesn't have the label.
712    ///
713    /// # Examples
714    ///
715    /// ```
716    /// use grafeo_engine::GrafeoDB;
717    ///
718    /// let db = GrafeoDB::new_in_memory();
719    /// let alice = db.create_node(&["Person", "Employee"]);
720    ///
721    /// // Remove Employee status
722    /// let removed = db.remove_node_label(alice, "Employee");
723    /// assert!(removed);
724    /// ```
725    pub fn remove_node_label(&self, id: grafeo_common::types::NodeId, label: &str) -> bool {
726        let result = self.store.remove_label(id, label);
727
728        if result {
729            // Log to WAL if enabled
730            if let Err(e) = self.log_wal(&WalRecord::RemoveNodeLabel {
731                id,
732                label: label.to_string(),
733            }) {
734                tracing::warn!("Failed to log RemoveNodeLabel to WAL: {}", e);
735            }
736        }
737
738        result
739    }
740
741    /// Gets all labels for a node.
742    ///
743    /// Returns `None` if the node doesn't exist.
744    ///
745    /// # Examples
746    ///
747    /// ```
748    /// use grafeo_engine::GrafeoDB;
749    ///
750    /// let db = GrafeoDB::new_in_memory();
751    /// let alice = db.create_node(&["Person", "Employee"]);
752    ///
753    /// let labels = db.get_node_labels(alice).unwrap();
754    /// assert!(labels.contains(&"Person".to_string()));
755    /// assert!(labels.contains(&"Employee".to_string()));
756    /// ```
757    #[must_use]
758    pub fn get_node_labels(&self, id: grafeo_common::types::NodeId) -> Option<Vec<String>> {
759        self.store
760            .get_node(id)
761            .map(|node| node.labels.iter().map(|s| s.to_string()).collect())
762    }
763
764    // === Edge Operations ===
765
766    /// Creates an edge (relationship) between two nodes.
767    ///
768    /// Edges connect nodes and have a type that describes the relationship.
769    /// They're directed - the order of `src` and `dst` matters.
770    ///
771    /// # Examples
772    ///
773    /// ```
774    /// use grafeo_engine::GrafeoDB;
775    ///
776    /// let db = GrafeoDB::new_in_memory();
777    /// let alice = db.create_node(&["Person"]);
778    /// let bob = db.create_node(&["Person"]);
779    ///
780    /// // Alice knows Bob (directed: Alice -> Bob)
781    /// let edge = db.create_edge(alice, bob, "KNOWS");
782    /// ```
783    pub fn create_edge(
784        &self,
785        src: grafeo_common::types::NodeId,
786        dst: grafeo_common::types::NodeId,
787        edge_type: &str,
788    ) -> grafeo_common::types::EdgeId {
789        let id = self.store.create_edge(src, dst, edge_type);
790
791        // Log to WAL if enabled
792        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
793            id,
794            src,
795            dst,
796            edge_type: edge_type.to_string(),
797        }) {
798            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
799        }
800
801        id
802    }
803
804    /// Creates a new edge with properties.
805    ///
806    /// If WAL is enabled, the operation is logged for durability.
807    pub fn create_edge_with_props(
808        &self,
809        src: grafeo_common::types::NodeId,
810        dst: grafeo_common::types::NodeId,
811        edge_type: &str,
812        properties: impl IntoIterator<
813            Item = (
814                impl Into<grafeo_common::types::PropertyKey>,
815                impl Into<grafeo_common::types::Value>,
816            ),
817        >,
818    ) -> grafeo_common::types::EdgeId {
819        // Collect properties first so we can log them to WAL
820        let props: Vec<(
821            grafeo_common::types::PropertyKey,
822            grafeo_common::types::Value,
823        )> = properties
824            .into_iter()
825            .map(|(k, v)| (k.into(), v.into()))
826            .collect();
827
828        let id = self.store.create_edge_with_props(
829            src,
830            dst,
831            edge_type,
832            props.iter().map(|(k, v)| (k.clone(), v.clone())),
833        );
834
835        // Log edge creation to WAL
836        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
837            id,
838            src,
839            dst,
840            edge_type: edge_type.to_string(),
841        }) {
842            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
843        }
844
845        // Log each property to WAL for full durability
846        for (key, value) in props {
847            if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
848                id,
849                key: key.to_string(),
850                value,
851            }) {
852                tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
853            }
854        }
855
856        id
857    }
858
859    /// Gets an edge by ID.
860    #[must_use]
861    pub fn get_edge(
862        &self,
863        id: grafeo_common::types::EdgeId,
864    ) -> Option<grafeo_core::graph::lpg::Edge> {
865        self.store.get_edge(id)
866    }
867
868    /// Deletes an edge.
869    ///
870    /// If WAL is enabled, the operation is logged for durability.
871    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
872        let result = self.store.delete_edge(id);
873
874        if result && let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
875            tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
876        }
877
878        result
879    }
880
881    /// Sets a property on an edge.
882    ///
883    /// If WAL is enabled, the operation is logged for durability.
884    pub fn set_edge_property(
885        &self,
886        id: grafeo_common::types::EdgeId,
887        key: &str,
888        value: grafeo_common::types::Value,
889    ) {
890        // Log to WAL first
891        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
892            id,
893            key: key.to_string(),
894            value: value.clone(),
895        }) {
896            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
897        }
898        self.store.set_edge_property(id, key, value);
899    }
900
901    /// Removes a property from a node.
902    ///
903    /// Returns true if the property existed and was removed, false otherwise.
904    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
905        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
906        self.store.remove_node_property(id, key).is_some()
907    }
908
909    /// Removes a property from an edge.
910    ///
911    /// Returns true if the property existed and was removed, false otherwise.
912    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
913        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
914        self.store.remove_edge_property(id, key).is_some()
915    }
916
917    // =========================================================================
918    // PROPERTY INDEX API
919    // =========================================================================
920
921    /// Creates an index on a node property for O(1) lookups by value.
922    ///
923    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
924    /// O(1) instead of O(n) for this property. The index is automatically
925    /// maintained when properties are set or removed.
926    ///
927    /// # Example
928    ///
929    /// ```ignore
930    /// // Create an index on the 'email' property
931    /// db.create_property_index("email");
932    ///
933    /// // Now lookups by email are O(1)
934    /// let nodes = db.find_nodes_by_property("email", &Value::from("alice@example.com"));
935    /// ```
936    pub fn create_property_index(&self, property: &str) {
937        self.store.create_property_index(property);
938    }
939
940    /// Creates a vector similarity index on a node property.
941    ///
942    /// This enables efficient approximate nearest-neighbor search on vector
943    /// properties. Currently validates the index parameters and scans existing
944    /// nodes to verify the property contains vectors of the expected dimensions.
945    ///
946    /// # Arguments
947    ///
948    /// * `label` - Node label to index (e.g., `"Doc"`)
949    /// * `property` - Property containing vector embeddings (e.g., `"embedding"`)
950    /// * `dimensions` - Expected vector dimensions (inferred from data if `None`)
951    /// * `metric` - Distance metric: `"cosine"` (default), `"euclidean"`, `"dot_product"`, `"manhattan"`
952    /// * `m` - HNSW links per node (default: 16). Higher = better recall, more memory.
953    /// * `ef_construction` - Construction beam width (default: 128). Higher = better index quality, slower build.
954    ///
955    /// # Errors
956    ///
957    /// Returns an error if the metric is invalid, no vectors are found, or
958    /// dimensions don't match.
959    pub fn create_vector_index(
960        &self,
961        label: &str,
962        property: &str,
963        dimensions: Option<usize>,
964        metric: Option<&str>,
965        m: Option<usize>,
966        ef_construction: Option<usize>,
967    ) -> Result<()> {
968        use grafeo_common::types::{PropertyKey, Value};
969        use grafeo_core::index::vector::DistanceMetric;
970
971        let metric = match metric {
972            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
973                grafeo_common::utils::error::Error::Internal(format!(
974                    "Unknown distance metric '{}'. Use: cosine, euclidean, dot_product, manhattan",
975                    m
976                ))
977            })?,
978            None => DistanceMetric::Cosine,
979        };
980
981        // Scan nodes to validate vectors exist and check dimensions
982        let prop_key = PropertyKey::new(property);
983        let mut found_dims: Option<usize> = dimensions;
984        let mut vector_count = 0usize;
985
986        #[cfg(feature = "vector-index")]
987        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
988
989        for node in self.store.nodes_with_label(label) {
990            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
991                if let Some(expected) = found_dims {
992                    if v.len() != expected {
993                        return Err(grafeo_common::utils::error::Error::Internal(format!(
994                            "Vector dimension mismatch: expected {}, found {} on node {}",
995                            expected,
996                            v.len(),
997                            node.id.0
998                        )));
999                    }
1000                } else {
1001                    found_dims = Some(v.len());
1002                }
1003                vector_count += 1;
1004                #[cfg(feature = "vector-index")]
1005                vectors.push((node.id, v.to_vec()));
1006            }
1007        }
1008
1009        if vector_count == 0 {
1010            return Err(grafeo_common::utils::error::Error::Internal(format!(
1011                "No vector properties found on :{label}({property})"
1012            )));
1013        }
1014
1015        let dims = found_dims.unwrap_or(0);
1016
1017        // Build and populate the HNSW index
1018        #[cfg(feature = "vector-index")]
1019        {
1020            use grafeo_core::index::vector::{HnswConfig, HnswIndex};
1021
1022            let mut config = HnswConfig::new(dims, metric);
1023            if let Some(m_val) = m {
1024                config = config.with_m(m_val);
1025            }
1026            if let Some(ef_c) = ef_construction {
1027                config = config.with_ef_construction(ef_c);
1028            }
1029
1030            let index = HnswIndex::with_capacity(config, vectors.len());
1031            for (node_id, vec) in &vectors {
1032                index.insert(*node_id, vec);
1033            }
1034
1035            self.store
1036                .add_vector_index(label, property, Arc::new(index));
1037        }
1038
1039        // Suppress unused variable warnings when vector-index is off
1040        let _ = (m, ef_construction);
1041
1042        tracing::info!(
1043            "Vector index created: :{label}({property}) - {vector_count} vectors, {dims} dimensions, metric={metric_name}",
1044            metric_name = metric.name()
1045        );
1046
1047        Ok(())
1048    }
1049
1050    /// Searches for the k nearest neighbors of a query vector.
1051    ///
1052    /// Uses the HNSW index created by [`create_vector_index`](Self::create_vector_index).
1053    ///
1054    /// # Arguments
1055    ///
1056    /// * `label` - Node label that was indexed
1057    /// * `property` - Property that was indexed
1058    /// * `query` - Query vector (slice of floats)
1059    /// * `k` - Number of nearest neighbors to return
1060    /// * `ef` - Search beam width (higher = better recall, slower). Uses index default if `None`.
1061    ///
1062    /// # Returns
1063    ///
1064    /// Vector of `(NodeId, distance)` pairs sorted by distance ascending.
1065    #[cfg(feature = "vector-index")]
1066    pub fn vector_search(
1067        &self,
1068        label: &str,
1069        property: &str,
1070        query: &[f32],
1071        k: usize,
1072        ef: Option<usize>,
1073    ) -> Result<Vec<(grafeo_common::types::NodeId, f32)>> {
1074        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1075            grafeo_common::utils::error::Error::Internal(format!(
1076                "No vector index found for :{label}({property}). Call create_vector_index() first."
1077            ))
1078        })?;
1079
1080        let results = match ef {
1081            Some(ef_val) => index.search_with_ef(query, k, ef_val),
1082            None => index.search(query, k),
1083        };
1084
1085        Ok(results)
1086    }
1087
1088    /// Creates multiple nodes in bulk, each with a single vector property.
1089    ///
1090    /// Much faster than individual `create_node_with_props` calls because it
1091    /// acquires internal locks once and loops in Rust rather than crossing
1092    /// the FFI boundary per vector.
1093    ///
1094    /// # Arguments
1095    ///
1096    /// * `label` - Label applied to all created nodes
1097    /// * `property` - Property name for the vector data
1098    /// * `vectors` - Vector data for each node
1099    ///
1100    /// # Returns
1101    ///
1102    /// Vector of created `NodeId`s in the same order as the input vectors.
1103    pub fn batch_create_nodes(
1104        &self,
1105        label: &str,
1106        property: &str,
1107        vectors: Vec<Vec<f32>>,
1108    ) -> Vec<grafeo_common::types::NodeId> {
1109        use grafeo_common::types::{PropertyKey, Value};
1110
1111        let prop_key = PropertyKey::new(property);
1112        let labels: &[&str] = &[label];
1113
1114        vectors
1115            .into_iter()
1116            .map(|vec| {
1117                let value = Value::Vector(vec.into());
1118                let id = self.store.create_node_with_props(
1119                    labels,
1120                    std::iter::once((prop_key.clone(), value.clone())),
1121                );
1122
1123                // Log to WAL
1124                if let Err(e) = self.log_wal(&WalRecord::CreateNode {
1125                    id,
1126                    labels: labels.iter().map(|s| s.to_string()).collect(),
1127                }) {
1128                    tracing::warn!("Failed to log CreateNode to WAL: {}", e);
1129                }
1130                if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
1131                    id,
1132                    key: property.to_string(),
1133                    value,
1134                }) {
1135                    tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
1136                }
1137
1138                id
1139            })
1140            .collect()
1141    }
1142
1143    /// Searches for nearest neighbors for multiple query vectors in parallel.
1144    ///
1145    /// Uses rayon parallel iteration under the hood for multi-core throughput.
1146    ///
1147    /// # Arguments
1148    ///
1149    /// * `label` - Node label that was indexed
1150    /// * `property` - Property that was indexed
1151    /// * `queries` - Batch of query vectors
1152    /// * `k` - Number of nearest neighbors per query
1153    /// * `ef` - Search beam width (uses index default if `None`)
1154    #[cfg(feature = "vector-index")]
1155    pub fn batch_vector_search(
1156        &self,
1157        label: &str,
1158        property: &str,
1159        queries: &[Vec<f32>],
1160        k: usize,
1161        ef: Option<usize>,
1162    ) -> Result<Vec<Vec<(grafeo_common::types::NodeId, f32)>>> {
1163        let index = self.store.get_vector_index(label, property).ok_or_else(|| {
1164            grafeo_common::utils::error::Error::Internal(format!(
1165                "No vector index found for :{label}({property}). Call create_vector_index() first."
1166            ))
1167        })?;
1168
1169        let results = match ef {
1170            Some(ef_val) => index.batch_search_with_ef(queries, k, ef_val),
1171            None => index.batch_search(queries, k),
1172        };
1173
1174        Ok(results)
1175    }
1176
1177    /// Drops an index on a node property.
1178    ///
1179    /// Returns `true` if the index existed and was removed.
1180    pub fn drop_property_index(&self, property: &str) -> bool {
1181        self.store.drop_property_index(property)
1182    }
1183
1184    /// Returns `true` if the property has an index.
1185    #[must_use]
1186    pub fn has_property_index(&self, property: &str) -> bool {
1187        self.store.has_property_index(property)
1188    }
1189
1190    /// Finds all nodes that have a specific property value.
1191    ///
1192    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1193    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1194    ///
1195    /// # Example
1196    ///
1197    /// ```ignore
1198    /// // Create index for fast lookups (optional but recommended)
1199    /// db.create_property_index("city");
1200    ///
1201    /// // Find all nodes where city = "NYC"
1202    /// let nyc_nodes = db.find_nodes_by_property("city", &Value::from("NYC"));
1203    /// ```
1204    #[must_use]
1205    pub fn find_nodes_by_property(
1206        &self,
1207        property: &str,
1208        value: &grafeo_common::types::Value,
1209    ) -> Vec<grafeo_common::types::NodeId> {
1210        self.store.find_nodes_by_property(property, value)
1211    }
1212
1213    // =========================================================================
1214    // ADMIN API: Introspection
1215    // =========================================================================
1216
1217    /// Returns true if this database is backed by a file (persistent).
1218    ///
1219    /// In-memory databases return false.
1220    #[must_use]
1221    pub fn is_persistent(&self) -> bool {
1222        self.config.path.is_some()
1223    }
1224
1225    /// Returns the database file path, if persistent.
1226    ///
1227    /// In-memory databases return None.
1228    #[must_use]
1229    pub fn path(&self) -> Option<&Path> {
1230        self.config.path.as_deref()
1231    }
1232
1233    /// Returns high-level database information.
1234    ///
1235    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
1236    #[must_use]
1237    pub fn info(&self) -> crate::admin::DatabaseInfo {
1238        crate::admin::DatabaseInfo {
1239            mode: crate::admin::DatabaseMode::Lpg,
1240            node_count: self.store.node_count(),
1241            edge_count: self.store.edge_count(),
1242            is_persistent: self.is_persistent(),
1243            path: self.config.path.clone(),
1244            wal_enabled: self.config.wal_enabled,
1245            version: env!("CARGO_PKG_VERSION").to_string(),
1246        }
1247    }
1248
1249    /// Returns detailed database statistics.
1250    ///
1251    /// Includes counts, memory usage, and index information.
1252    #[must_use]
1253    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1254        let disk_bytes = self.config.path.as_ref().and_then(|p| {
1255            if p.exists() {
1256                Self::calculate_disk_usage(p).ok()
1257            } else {
1258                None
1259            }
1260        });
1261
1262        crate::admin::DatabaseStats {
1263            node_count: self.store.node_count(),
1264            edge_count: self.store.edge_count(),
1265            label_count: self.store.label_count(),
1266            edge_type_count: self.store.edge_type_count(),
1267            property_key_count: self.store.property_key_count(),
1268            index_count: 0, // TODO: implement index tracking
1269            memory_bytes: self.buffer_manager.allocated(),
1270            disk_bytes,
1271        }
1272    }
1273
1274    /// Calculates total disk usage for the database directory.
1275    fn calculate_disk_usage(path: &Path) -> Result<usize> {
1276        let mut total = 0usize;
1277        if path.is_dir() {
1278            for entry in std::fs::read_dir(path)? {
1279                let entry = entry?;
1280                let metadata = entry.metadata()?;
1281                if metadata.is_file() {
1282                    total += metadata.len() as usize;
1283                } else if metadata.is_dir() {
1284                    total += Self::calculate_disk_usage(&entry.path())?;
1285                }
1286            }
1287        }
1288        Ok(total)
1289    }
1290
1291    /// Returns schema information (labels, edge types, property keys).
1292    ///
1293    /// For LPG mode, returns label and edge type information.
1294    /// For RDF mode, returns predicate and named graph information.
1295    #[must_use]
1296    pub fn schema(&self) -> crate::admin::SchemaInfo {
1297        let labels = self
1298            .store
1299            .all_labels()
1300            .into_iter()
1301            .map(|name| crate::admin::LabelInfo {
1302                name: name.clone(),
1303                count: self.store.nodes_with_label(&name).count(),
1304            })
1305            .collect();
1306
1307        let edge_types = self
1308            .store
1309            .all_edge_types()
1310            .into_iter()
1311            .map(|name| crate::admin::EdgeTypeInfo {
1312                name: name.clone(),
1313                count: self.store.edges_with_type(&name).count(),
1314            })
1315            .collect();
1316
1317        let property_keys = self.store.all_property_keys();
1318
1319        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
1320            labels,
1321            edge_types,
1322            property_keys,
1323        })
1324    }
1325
1326    /// Returns RDF schema information.
1327    ///
1328    /// Only available when the RDF feature is enabled.
1329    #[cfg(feature = "rdf")]
1330    #[must_use]
1331    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
1332        let stats = self.rdf_store.stats();
1333
1334        let predicates = self
1335            .rdf_store
1336            .predicates()
1337            .into_iter()
1338            .map(|predicate| {
1339                let count = self.rdf_store.triples_with_predicate(&predicate).len();
1340                crate::admin::PredicateInfo {
1341                    iri: predicate.to_string(),
1342                    count,
1343                }
1344            })
1345            .collect();
1346
1347        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
1348            predicates,
1349            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
1350            subject_count: stats.subject_count,
1351            object_count: stats.object_count,
1352        })
1353    }
1354
1355    /// Validates database integrity.
1356    ///
1357    /// Checks for:
1358    /// - Dangling edge references (edges pointing to non-existent nodes)
1359    /// - Internal index consistency
1360    ///
1361    /// Returns a list of errors and warnings. Empty errors = valid.
1362    #[must_use]
1363    pub fn validate(&self) -> crate::admin::ValidationResult {
1364        let mut result = crate::admin::ValidationResult::default();
1365
1366        // Check for dangling edge references
1367        for edge in self.store.all_edges() {
1368            if self.store.get_node(edge.src).is_none() {
1369                result.errors.push(crate::admin::ValidationError {
1370                    code: "DANGLING_SRC".to_string(),
1371                    message: format!(
1372                        "Edge {} references non-existent source node {}",
1373                        edge.id.0, edge.src.0
1374                    ),
1375                    context: Some(format!("edge:{}", edge.id.0)),
1376                });
1377            }
1378            if self.store.get_node(edge.dst).is_none() {
1379                result.errors.push(crate::admin::ValidationError {
1380                    code: "DANGLING_DST".to_string(),
1381                    message: format!(
1382                        "Edge {} references non-existent destination node {}",
1383                        edge.id.0, edge.dst.0
1384                    ),
1385                    context: Some(format!("edge:{}", edge.id.0)),
1386                });
1387            }
1388        }
1389
1390        // Add warnings for potential issues
1391        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
1392            result.warnings.push(crate::admin::ValidationWarning {
1393                code: "NO_EDGES".to_string(),
1394                message: "Database has nodes but no edges".to_string(),
1395                context: None,
1396            });
1397        }
1398
1399        result
1400    }
1401
1402    /// Returns WAL (Write-Ahead Log) status.
1403    ///
1404    /// Returns None if WAL is not enabled.
1405    #[must_use]
1406    pub fn wal_status(&self) -> crate::admin::WalStatus {
1407        if let Some(ref wal) = self.wal {
1408            crate::admin::WalStatus {
1409                enabled: true,
1410                path: self.config.path.as_ref().map(|p| p.join("wal")),
1411                size_bytes: wal.size_bytes(),
1412                record_count: wal.record_count() as usize,
1413                last_checkpoint: wal.last_checkpoint_timestamp(),
1414                current_epoch: self.store.current_epoch().as_u64(),
1415            }
1416        } else {
1417            crate::admin::WalStatus {
1418                enabled: false,
1419                path: None,
1420                size_bytes: 0,
1421                record_count: 0,
1422                last_checkpoint: None,
1423                current_epoch: self.store.current_epoch().as_u64(),
1424            }
1425        }
1426    }
1427
1428    /// Forces a WAL checkpoint.
1429    ///
1430    /// Flushes all pending WAL records to the main storage.
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if the checkpoint fails.
1435    pub fn wal_checkpoint(&self) -> Result<()> {
1436        if let Some(ref wal) = self.wal {
1437            let epoch = self.store.current_epoch();
1438            let tx_id = self
1439                .tx_manager
1440                .last_assigned_tx_id()
1441                .unwrap_or_else(|| self.tx_manager.begin());
1442            wal.checkpoint(tx_id, epoch)?;
1443            wal.sync()?;
1444        }
1445        Ok(())
1446    }
1447
1448    // =========================================================================
1449    // ADMIN API: Persistence Control
1450    // =========================================================================
1451
1452    /// Saves the database to a file path.
1453    ///
1454    /// - If in-memory: creates a new persistent database at path
1455    /// - If file-backed: creates a copy at the new path
1456    ///
1457    /// The original database remains unchanged.
1458    ///
1459    /// # Errors
1460    ///
1461    /// Returns an error if the save operation fails.
1462    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1463        let path = path.as_ref();
1464
1465        // Create target database with WAL enabled
1466        let target_config = Config::persistent(path);
1467        let target = Self::with_config(target_config)?;
1468
1469        // Copy all nodes using WAL-enabled methods
1470        for node in self.store.all_nodes() {
1471            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1472            target.store.create_node_with_id(node.id, &label_refs);
1473
1474            // Log to WAL
1475            target.log_wal(&WalRecord::CreateNode {
1476                id: node.id,
1477                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1478            })?;
1479
1480            // Copy properties
1481            for (key, value) in node.properties {
1482                target
1483                    .store
1484                    .set_node_property(node.id, key.as_str(), value.clone());
1485                target.log_wal(&WalRecord::SetNodeProperty {
1486                    id: node.id,
1487                    key: key.to_string(),
1488                    value,
1489                })?;
1490            }
1491        }
1492
1493        // Copy all edges using WAL-enabled methods
1494        for edge in self.store.all_edges() {
1495            target
1496                .store
1497                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1498
1499            // Log to WAL
1500            target.log_wal(&WalRecord::CreateEdge {
1501                id: edge.id,
1502                src: edge.src,
1503                dst: edge.dst,
1504                edge_type: edge.edge_type.to_string(),
1505            })?;
1506
1507            // Copy properties
1508            for (key, value) in edge.properties {
1509                target
1510                    .store
1511                    .set_edge_property(edge.id, key.as_str(), value.clone());
1512                target.log_wal(&WalRecord::SetEdgeProperty {
1513                    id: edge.id,
1514                    key: key.to_string(),
1515                    value,
1516                })?;
1517            }
1518        }
1519
1520        // Checkpoint and close the target database
1521        target.close()?;
1522
1523        Ok(())
1524    }
1525
1526    /// Creates an in-memory copy of this database.
1527    ///
1528    /// Returns a new database that is completely independent.
1529    /// Useful for:
1530    /// - Testing modifications without affecting the original
1531    /// - Faster operations when persistence isn't needed
1532    ///
1533    /// # Errors
1534    ///
1535    /// Returns an error if the copy operation fails.
1536    pub fn to_memory(&self) -> Result<Self> {
1537        let config = Config::in_memory();
1538        let target = Self::with_config(config)?;
1539
1540        // Copy all nodes
1541        for node in self.store.all_nodes() {
1542            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1543            target.store.create_node_with_id(node.id, &label_refs);
1544
1545            // Copy properties
1546            for (key, value) in node.properties {
1547                target.store.set_node_property(node.id, key.as_str(), value);
1548            }
1549        }
1550
1551        // Copy all edges
1552        for edge in self.store.all_edges() {
1553            target
1554                .store
1555                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1556
1557            // Copy properties
1558            for (key, value) in edge.properties {
1559                target.store.set_edge_property(edge.id, key.as_str(), value);
1560            }
1561        }
1562
1563        Ok(target)
1564    }
1565
1566    /// Opens a database file and loads it entirely into memory.
1567    ///
1568    /// The returned database has no connection to the original file.
1569    /// Changes will NOT be written back to the file.
1570    ///
1571    /// # Errors
1572    ///
1573    /// Returns an error if the file can't be opened or loaded.
1574    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1575        // Open the source database (triggers WAL recovery)
1576        let source = Self::open(path)?;
1577
1578        // Create in-memory copy
1579        let target = source.to_memory()?;
1580
1581        // Close the source (releases file handles)
1582        source.close()?;
1583
1584        Ok(target)
1585    }
1586
1587    // =========================================================================
1588    // ADMIN API: Iteration
1589    // =========================================================================
1590
1591    /// Returns an iterator over all nodes in the database.
1592    ///
1593    /// Useful for dump/export operations.
1594    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1595        self.store.all_nodes()
1596    }
1597
1598    /// Returns an iterator over all edges in the database.
1599    ///
1600    /// Useful for dump/export operations.
1601    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1602        self.store.all_edges()
1603    }
1604}
1605
1606impl Drop for GrafeoDB {
1607    fn drop(&mut self) {
1608        if let Err(e) = self.close() {
1609            tracing::error!("Error closing database: {}", e);
1610        }
1611    }
1612}
1613
/// The result of running a query.
///
/// A table of named columns and rows. Loop over the rows with
/// [`iter()`](Self::iter), or use [`scalar()`](Self::scalar) when the query
/// produces exactly one value (one row, one column).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names, in output order (taken from the query's RETURN clause).
    pub columns: Vec<String>,
    /// Logical type of each column, parallel to `columns`.
    ///
    /// Useful for telling `NodeId`/`EdgeId` values apart from plain integers.
    /// Results built with [`QueryResult::new`] report `LogicalType::Any` for
    /// every column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The result rows. Each row is expected to hold one value per column, in
    /// column order; this type does not enforce that.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds, if it was recorded
    /// (see [`QueryResult::with_metrics`]).
    pub execution_time_ms: Option<f64>,
    /// Estimated number of rows scanned during execution, if it was recorded.
    pub rows_scanned: Option<u64>,
}
1652
1653impl QueryResult {
1654    /// Creates a new empty query result.
1655    #[must_use]
1656    pub fn new(columns: Vec<String>) -> Self {
1657        let len = columns.len();
1658        Self {
1659            columns,
1660            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1661            rows: Vec::new(),
1662            execution_time_ms: None,
1663            rows_scanned: None,
1664        }
1665    }
1666
1667    /// Creates a new empty query result with column types.
1668    #[must_use]
1669    pub fn with_types(
1670        columns: Vec<String>,
1671        column_types: Vec<grafeo_common::types::LogicalType>,
1672    ) -> Self {
1673        Self {
1674            columns,
1675            column_types,
1676            rows: Vec::new(),
1677            execution_time_ms: None,
1678            rows_scanned: None,
1679        }
1680    }
1681
1682    /// Sets the execution metrics on this result.
1683    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1684        self.execution_time_ms = Some(execution_time_ms);
1685        self.rows_scanned = Some(rows_scanned);
1686        self
1687    }
1688
1689    /// Returns the execution time in milliseconds, if available.
1690    #[must_use]
1691    pub fn execution_time_ms(&self) -> Option<f64> {
1692        self.execution_time_ms
1693    }
1694
1695    /// Returns the number of rows scanned, if available.
1696    #[must_use]
1697    pub fn rows_scanned(&self) -> Option<u64> {
1698        self.rows_scanned
1699    }
1700
1701    /// Returns the number of rows.
1702    #[must_use]
1703    pub fn row_count(&self) -> usize {
1704        self.rows.len()
1705    }
1706
1707    /// Returns the number of columns.
1708    #[must_use]
1709    pub fn column_count(&self) -> usize {
1710        self.columns.len()
1711    }
1712
1713    /// Returns true if the result is empty.
1714    #[must_use]
1715    pub fn is_empty(&self) -> bool {
1716        self.rows.is_empty()
1717    }
1718
1719    /// Extracts a single value from the result.
1720    ///
1721    /// Use this when your query returns exactly one row with one column,
1722    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1723    ///
1724    /// # Errors
1725    ///
1726    /// Returns an error if the result has multiple rows or columns.
1727    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1728        if self.rows.len() != 1 || self.columns.len() != 1 {
1729            return Err(grafeo_common::utils::error::Error::InvalidValue(
1730                "Expected single value".to_string(),
1731            ));
1732        }
1733        T::from_value(&self.rows[0][0])
1734    }
1735
1736    /// Returns an iterator over the rows.
1737    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1738        self.rows.iter()
1739    }
1740}
1741
/// Converts a [`Value`](grafeo_common::types::Value) into a concrete Rust type.
///
/// This module implements it for `i64`, `f64`, `String`, and `bool`. Each of
/// those returns a `TypeMismatch` error naming the expected and actual types
/// when the value holds something else. [`QueryResult::scalar()`] uses this
/// trait to hand back typed values.
// The `Sized` bound lets `Self` be used as the `Ok` type of `Result<Self>`.
pub trait FromValue: Sized {
    /// Converts `value`, returning an error if it does not hold a compatible type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1750
1751impl FromValue for i64 {
1752    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1753        value
1754            .as_int64()
1755            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1756                expected: "INT64".to_string(),
1757                found: value.type_name().to_string(),
1758            })
1759    }
1760}
1761
1762impl FromValue for f64 {
1763    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1764        value
1765            .as_float64()
1766            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1767                expected: "FLOAT64".to_string(),
1768                found: value.type_name().to_string(),
1769            })
1770    }
1771}
1772
1773impl FromValue for String {
1774    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1775        value.as_str().map(String::from).ok_or_else(|| {
1776            grafeo_common::utils::error::Error::TypeMismatch {
1777                expected: "STRING".to_string(),
1778                found: value.type_name().to_string(),
1779            }
1780        })
1781    }
1782}
1783
1784impl FromValue for bool {
1785    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1786        value
1787            .as_bool()
1788            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1789                expected: "BOOL".to_string(),
1790                found: value.type_name().to_string(),
1791            })
1792    }
1793}
1794
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create a node and then delete it, so there are mutations to log
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // A database opened from a path relies on its WAL for recovery (the
        // persistence tests in this module depend on that). Fail loudly if the
        // WAL is missing instead of silently skipping the assertion.
        let wal = db
            .wal()
            .expect("database opened from a path should have a WAL");
        assert!(wal.record_count() > 0);

        db.close().unwrap();
    }

    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify labels were recovered correctly (property values are not
            // checked here)
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a->b edge, then the middle node b. The b->c edge (e2)
            // is not deleted explicitly.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // node_count() is deliberately not asserted here: whether it counts
            // deleted nodes is not pinned down by this test. What matters is
            // that a and c are accessible and b is gone.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }
}