Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
53/// Your handle to a Grafeo database.
54///
55/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
56/// quick experiments, or [`open()`](Self::open) for persistent storage.
57/// Then grab a [`session()`](Self::session) to start querying.
58///
59/// # Examples
60///
61/// ```
62/// use grafeo_engine::GrafeoDB;
63///
64/// // Quick in-memory database
65/// let db = GrafeoDB::new_in_memory();
66///
67/// // Add some data
68/// db.create_node(&["Person"]);
69///
70/// // Query it
71/// let session = db.session();
72/// let result = session.execute("MATCH (p:Person) RETURN p")?;
73/// # Ok::<(), grafeo_common::utils::error::Error>(())
74/// ```
75pub struct GrafeoDB {
76    /// Database configuration.
77    pub(super) config: Config,
78    /// The underlying graph store.
79    pub(super) store: Arc<LpgStore>,
80    /// Schema and metadata catalog shared across sessions.
81    pub(super) catalog: Arc<Catalog>,
82    /// RDF triple store (if RDF feature is enabled).
83    #[cfg(feature = "rdf")]
84    pub(super) rdf_store: Arc<RdfStore>,
85    /// Transaction manager.
86    pub(super) transaction_manager: Arc<TransactionManager>,
87    /// Unified buffer manager.
88    pub(super) buffer_manager: Arc<BufferManager>,
89    /// Write-ahead log manager (if durability is enabled).
90    #[cfg(feature = "wal")]
91    pub(super) wal: Option<Arc<LpgWal>>,
92    /// Shared WAL graph context tracker. Tracks which named graph was last
93    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
94    /// records only when the context actually changes.
95    #[cfg(feature = "wal")]
96    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
97    /// Query cache for parsed and optimized plans.
98    pub(super) query_cache: Arc<QueryCache>,
99    /// Shared commit counter for auto-GC across sessions.
100    pub(super) commit_counter: Arc<AtomicUsize>,
101    /// Whether the database is open.
102    pub(super) is_open: RwLock<bool>,
103    /// Change data capture log for tracking mutations.
104    #[cfg(feature = "cdc")]
105    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
106    /// Registered embedding models for text-to-vector conversion.
107    #[cfg(feature = "embed")]
108    pub(super) embedding_models:
109        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
110    /// Single-file database manager (when using `.grafeo` format).
111    #[cfg(feature = "grafeo-file")]
112    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
113    /// External graph store (when using with_store()).
114    /// When set, sessions route queries through this store instead of the built-in LpgStore.
115    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
116    /// Metrics registry shared across all sessions.
117    #[cfg(feature = "metrics")]
118    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
119    /// Persistent graph context for one-shot `execute()` calls.
120    /// When set, each call to `session()` pre-configures the session to this graph.
121    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
122    current_graph: RwLock<Option<String>>,
123}
124
125impl GrafeoDB {
126    /// Creates an in-memory database, fast to create, gone when dropped.
127    ///
128    /// Use this for tests, experiments, or when you don't need persistence.
129    /// For data that survives restarts, use [`open()`](Self::open) instead.
130    ///
131    /// # Panics
132    ///
133    /// Panics if the internal arena allocator cannot be initialized (out of memory).
134    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use grafeo_engine::GrafeoDB;
140    ///
141    /// let db = GrafeoDB::new_in_memory();
142    /// let session = db.session();
143    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
144    /// # Ok::<(), grafeo_common::utils::error::Error>(())
145    /// ```
146    #[must_use]
147    pub fn new_in_memory() -> Self {
148        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
149    }
150
151    /// Opens a database at the given path, creating it if it doesn't exist.
152    ///
153    /// If you've used this path before, Grafeo recovers your data from the
154    /// write-ahead log automatically. First open on a new path creates an
155    /// empty database.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the path isn't writable or recovery fails.
160    ///
161    /// # Examples
162    ///
163    /// ```no_run
164    /// use grafeo_engine::GrafeoDB;
165    ///
166    /// let db = GrafeoDB::open("./my_social_network")?;
167    /// # Ok::<(), grafeo_common::utils::error::Error>(())
168    /// ```
169    #[cfg(feature = "wal")]
170    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
171        Self::with_config(Config::persistent(path.as_ref()))
172    }
173
174    /// Creates a database with custom configuration.
175    ///
176    /// Use this when you need fine-grained control over memory limits,
177    /// thread counts, or persistence settings. For most cases,
178    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
179    /// are simpler.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the database can't be created or recovery fails.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use grafeo_engine::{GrafeoDB, Config};
189    ///
190    /// // In-memory with a 512MB limit
191    /// let config = Config::in_memory()
192    ///     .with_memory_limit(512 * 1024 * 1024);
193    ///
194    /// let db = GrafeoDB::with_config(config)?;
195    /// # Ok::<(), grafeo_common::utils::error::Error>(())
196    /// ```
197    pub fn with_config(config: Config) -> Result<Self> {
198        // Validate configuration before proceeding
199        config
200            .validate()
201            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
202
203        let store = Arc::new(LpgStore::new()?);
204        #[cfg(feature = "rdf")]
205        let rdf_store = Arc::new(RdfStore::new());
206        let transaction_manager = Arc::new(TransactionManager::new());
207
208        // Create buffer manager with configured limits
209        let buffer_config = BufferManagerConfig {
210            budget: config.memory_limit.unwrap_or_else(|| {
211                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
212            }),
213            spill_path: config
214                .spill_path
215                .clone()
216                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
217            ..BufferManagerConfig::default()
218        };
219        let buffer_manager = BufferManager::new(buffer_config);
220
221        // Create catalog early so WAL replay can restore schema definitions
222        let catalog = Arc::new(Catalog::new());
223
224        // --- Single-file format (.grafeo) ---
225        #[cfg(feature = "grafeo-file")]
226        let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
227            if let Some(ref db_path) = config.path {
228                if Self::should_use_single_file(db_path, config.storage_format) {
229                    let fm = if db_path.exists() && db_path.is_file() {
230                        GrafeoFileManager::open(db_path)?
231                    } else if !db_path.exists() {
232                        GrafeoFileManager::create(db_path)?
233                    } else {
234                        // Path exists but is not a file (directory, etc.)
235                        return Err(grafeo_common::utils::error::Error::Internal(format!(
236                            "path exists but is not a file: {}",
237                            db_path.display()
238                        )));
239                    };
240
241                    // Load snapshot data from the file
242                    let snapshot_data = fm.read_snapshot()?;
243                    if !snapshot_data.is_empty() {
244                        Self::apply_snapshot_data(
245                            &store,
246                            &catalog,
247                            #[cfg(feature = "rdf")]
248                            &rdf_store,
249                            &snapshot_data,
250                        )?;
251                    }
252
253                    // Recover sidecar WAL if present
254                    if fm.has_sidecar_wal() {
255                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
256                        let records = recovery.recover()?;
257                        Self::apply_wal_records(
258                            &store,
259                            &catalog,
260                            #[cfg(feature = "rdf")]
261                            &rdf_store,
262                            &records,
263                        )?;
264                    }
265
266                    Some(Arc::new(fm))
267                } else {
268                    None
269                }
270            } else {
271                None
272            }
273        } else {
274            None
275        };
276
277        // Determine whether to use the WAL directory path (legacy) or sidecar
278        #[cfg(feature = "wal")]
279        let wal = if config.wal_enabled {
280            if let Some(ref db_path) = config.path {
281                // When using single-file format, the WAL is a sidecar directory
282                #[cfg(feature = "grafeo-file")]
283                let wal_path = if let Some(ref fm) = file_manager {
284                    let p = fm.sidecar_wal_path();
285                    std::fs::create_dir_all(&p)?;
286                    p
287                } else {
288                    // Legacy: WAL inside the database directory
289                    std::fs::create_dir_all(db_path)?;
290                    db_path.join("wal")
291                };
292
293                #[cfg(not(feature = "grafeo-file"))]
294                let wal_path = {
295                    std::fs::create_dir_all(db_path)?;
296                    db_path.join("wal")
297                };
298
299                // For legacy WAL directory format, check if WAL exists and recover
300                #[cfg(feature = "grafeo-file")]
301                let is_single_file = file_manager.is_some();
302                #[cfg(not(feature = "grafeo-file"))]
303                let is_single_file = false;
304
305                if !is_single_file && wal_path.exists() {
306                    let recovery = WalRecovery::new(&wal_path);
307                    let records = recovery.recover()?;
308                    Self::apply_wal_records(
309                        &store,
310                        &catalog,
311                        #[cfg(feature = "rdf")]
312                        &rdf_store,
313                        &records,
314                    )?;
315                }
316
317                // Open/create WAL manager with configured durability
318                let wal_durability = match config.wal_durability {
319                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
320                    crate::config::DurabilityMode::Batch {
321                        max_delay_ms,
322                        max_records,
323                    } => WalDurabilityMode::Batch {
324                        max_delay_ms,
325                        max_records,
326                    },
327                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
328                        WalDurabilityMode::Adaptive { target_interval_ms }
329                    }
330                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
331                };
332                let wal_config = WalConfig {
333                    durability: wal_durability,
334                    ..WalConfig::default()
335                };
336                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
337                Some(Arc::new(wal_manager))
338            } else {
339                None
340            }
341        } else {
342            None
343        };
344
345        // Create query cache with default capacity (1000 queries)
346        let query_cache = Arc::new(QueryCache::default());
347
348        Ok(Self {
349            config,
350            store,
351            catalog,
352            #[cfg(feature = "rdf")]
353            rdf_store,
354            transaction_manager,
355            buffer_manager,
356            #[cfg(feature = "wal")]
357            wal,
358            #[cfg(feature = "wal")]
359            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
360            query_cache,
361            commit_counter: Arc::new(AtomicUsize::new(0)),
362            is_open: RwLock::new(true),
363            #[cfg(feature = "cdc")]
364            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
365            #[cfg(feature = "embed")]
366            embedding_models: RwLock::new(hashbrown::HashMap::new()),
367            #[cfg(feature = "grafeo-file")]
368            file_manager,
369            external_store: None,
370            #[cfg(feature = "metrics")]
371            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
372            current_graph: RwLock::new(None),
373        })
374    }
375
376    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
377    ///
378    /// The external store handles all data persistence. WAL, CDC, and index
379    /// management are the responsibility of the store implementation.
380    ///
381    /// Query execution (all 6 languages, optimizer, planner) works through the
382    /// provided store. Admin operations (schema introspection, persistence,
383    /// vector/text indexes) are not available on external stores.
384    ///
385    /// # Examples
386    ///
387    /// ```no_run
388    /// use std::sync::Arc;
389    /// use grafeo_engine::{GrafeoDB, Config};
390    /// use grafeo_core::graph::GraphStoreMut;
391    ///
392    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
393    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
394    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
395    ///     Ok(())
396    /// }
397    /// ```
398    ///
399    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
400    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
401        config
402            .validate()
403            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
404
405        let dummy_store = Arc::new(LpgStore::new()?);
406        let transaction_manager = Arc::new(TransactionManager::new());
407
408        let buffer_config = BufferManagerConfig {
409            budget: config.memory_limit.unwrap_or_else(|| {
410                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
411            }),
412            spill_path: None,
413            ..BufferManagerConfig::default()
414        };
415        let buffer_manager = BufferManager::new(buffer_config);
416
417        let query_cache = Arc::new(QueryCache::default());
418
419        Ok(Self {
420            config,
421            store: dummy_store,
422            catalog: Arc::new(Catalog::new()),
423            #[cfg(feature = "rdf")]
424            rdf_store: Arc::new(RdfStore::new()),
425            transaction_manager,
426            buffer_manager,
427            #[cfg(feature = "wal")]
428            wal: None,
429            #[cfg(feature = "wal")]
430            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
431            query_cache,
432            commit_counter: Arc::new(AtomicUsize::new(0)),
433            is_open: RwLock::new(true),
434            #[cfg(feature = "cdc")]
435            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
436            #[cfg(feature = "embed")]
437            embedding_models: RwLock::new(hashbrown::HashMap::new()),
438            #[cfg(feature = "grafeo-file")]
439            file_manager: None,
440            external_store: Some(store),
441            #[cfg(feature = "metrics")]
442            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
443            current_graph: RwLock::new(None),
444        })
445    }
446
447    /// Applies WAL records to restore the database state.
448    ///
449    /// Data mutation records are routed through a graph cursor that tracks
450    /// `SwitchGraph` context markers, replaying mutations into the correct
451    /// named graph (or the default graph when cursor is `None`).
452    #[cfg(feature = "wal")]
453    fn apply_wal_records(
454        store: &Arc<LpgStore>,
455        catalog: &Catalog,
456        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
457        records: &[WalRecord],
458    ) -> Result<()> {
459        use crate::catalog::{
460            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
461        };
462        use grafeo_common::utils::error::Error;
463
464        // Graph cursor: tracks which named graph receives data mutations.
465        // `None` means the default graph.
466        let mut current_graph: Option<String> = None;
467        let mut target_store: Arc<LpgStore> = Arc::clone(store);
468
469        for record in records {
470            match record {
471                // --- Named graph lifecycle ---
472                WalRecord::CreateNamedGraph { name } => {
473                    let _ = store.create_graph(name);
474                }
475                WalRecord::DropNamedGraph { name } => {
476                    store.drop_graph(name);
477                    // Reset cursor if the dropped graph was active
478                    if current_graph.as_deref() == Some(name.as_str()) {
479                        current_graph = None;
480                        target_store = Arc::clone(store);
481                    }
482                }
483                WalRecord::SwitchGraph { name } => {
484                    current_graph.clone_from(name);
485                    target_store = match &current_graph {
486                        None => Arc::clone(store),
487                        Some(graph_name) => store
488                            .graph_or_create(graph_name)
489                            .map_err(|e| Error::Internal(e.to_string()))?,
490                    };
491                }
492
493                // --- Data mutations: routed through target_store ---
494                WalRecord::CreateNode { id, labels } => {
495                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
496                    target_store.create_node_with_id(*id, &label_refs)?;
497                }
498                WalRecord::DeleteNode { id } => {
499                    target_store.delete_node(*id);
500                }
501                WalRecord::CreateEdge {
502                    id,
503                    src,
504                    dst,
505                    edge_type,
506                } => {
507                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
508                }
509                WalRecord::DeleteEdge { id } => {
510                    target_store.delete_edge(*id);
511                }
512                WalRecord::SetNodeProperty { id, key, value } => {
513                    target_store.set_node_property(*id, key, value.clone());
514                }
515                WalRecord::SetEdgeProperty { id, key, value } => {
516                    target_store.set_edge_property(*id, key, value.clone());
517                }
518                WalRecord::AddNodeLabel { id, label } => {
519                    target_store.add_label(*id, label);
520                }
521                WalRecord::RemoveNodeLabel { id, label } => {
522                    target_store.remove_label(*id, label);
523                }
524                WalRecord::RemoveNodeProperty { id, key } => {
525                    target_store.remove_node_property(*id, key);
526                }
527                WalRecord::RemoveEdgeProperty { id, key } => {
528                    target_store.remove_edge_property(*id, key);
529                }
530
531                // --- Schema DDL replay (always on root catalog) ---
532                WalRecord::CreateNodeType {
533                    name,
534                    properties,
535                    constraints,
536                } => {
537                    let def = NodeTypeDefinition {
538                        name: name.clone(),
539                        properties: properties
540                            .iter()
541                            .map(|(n, t, nullable)| TypedProperty {
542                                name: n.clone(),
543                                data_type: PropertyDataType::from_type_name(t),
544                                nullable: *nullable,
545                                default_value: None,
546                            })
547                            .collect(),
548                        constraints: constraints
549                            .iter()
550                            .map(|(kind, props)| match kind.as_str() {
551                                "unique" => TypeConstraint::Unique(props.clone()),
552                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
553                                "not_null" if !props.is_empty() => {
554                                    TypeConstraint::NotNull(props[0].clone())
555                                }
556                                _ => TypeConstraint::Unique(props.clone()),
557                            })
558                            .collect(),
559                        parent_types: Vec::new(),
560                    };
561                    let _ = catalog.register_node_type(def);
562                }
563                WalRecord::DropNodeType { name } => {
564                    let _ = catalog.drop_node_type(name);
565                }
566                WalRecord::CreateEdgeType {
567                    name,
568                    properties,
569                    constraints,
570                } => {
571                    let def = EdgeTypeDefinition {
572                        name: name.clone(),
573                        properties: properties
574                            .iter()
575                            .map(|(n, t, nullable)| TypedProperty {
576                                name: n.clone(),
577                                data_type: PropertyDataType::from_type_name(t),
578                                nullable: *nullable,
579                                default_value: None,
580                            })
581                            .collect(),
582                        constraints: constraints
583                            .iter()
584                            .map(|(kind, props)| match kind.as_str() {
585                                "unique" => TypeConstraint::Unique(props.clone()),
586                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
587                                "not_null" if !props.is_empty() => {
588                                    TypeConstraint::NotNull(props[0].clone())
589                                }
590                                _ => TypeConstraint::Unique(props.clone()),
591                            })
592                            .collect(),
593                        source_node_types: Vec::new(),
594                        target_node_types: Vec::new(),
595                    };
596                    let _ = catalog.register_edge_type_def(def);
597                }
598                WalRecord::DropEdgeType { name } => {
599                    let _ = catalog.drop_edge_type_def(name);
600                }
601                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
602                    // Index recreation is handled by the store on startup
603                    // (indexes are rebuilt from data, not WAL)
604                }
605                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
606                    // Constraint definitions are part of type definitions
607                    // and replayed via CreateNodeType/CreateEdgeType
608                }
609                WalRecord::CreateGraphType {
610                    name,
611                    node_types,
612                    edge_types,
613                    open,
614                } => {
615                    use crate::catalog::GraphTypeDefinition;
616                    let def = GraphTypeDefinition {
617                        name: name.clone(),
618                        allowed_node_types: node_types.clone(),
619                        allowed_edge_types: edge_types.clone(),
620                        open: *open,
621                    };
622                    let _ = catalog.register_graph_type(def);
623                }
624                WalRecord::DropGraphType { name } => {
625                    let _ = catalog.drop_graph_type(name);
626                }
627                WalRecord::CreateSchema { name } => {
628                    let _ = catalog.register_schema_namespace(name.clone());
629                }
630                WalRecord::DropSchema { name } => {
631                    let _ = catalog.drop_schema_namespace(name);
632                }
633
634                WalRecord::AlterNodeType { name, alterations } => {
635                    for (action, prop_name, type_name, nullable) in alterations {
636                        match action.as_str() {
637                            "add" => {
638                                let prop = TypedProperty {
639                                    name: prop_name.clone(),
640                                    data_type: PropertyDataType::from_type_name(type_name),
641                                    nullable: *nullable,
642                                    default_value: None,
643                                };
644                                let _ = catalog.alter_node_type_add_property(name, prop);
645                            }
646                            "drop" => {
647                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
648                            }
649                            _ => {}
650                        }
651                    }
652                }
653                WalRecord::AlterEdgeType { name, alterations } => {
654                    for (action, prop_name, type_name, nullable) in alterations {
655                        match action.as_str() {
656                            "add" => {
657                                let prop = TypedProperty {
658                                    name: prop_name.clone(),
659                                    data_type: PropertyDataType::from_type_name(type_name),
660                                    nullable: *nullable,
661                                    default_value: None,
662                                };
663                                let _ = catalog.alter_edge_type_add_property(name, prop);
664                            }
665                            "drop" => {
666                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
667                            }
668                            _ => {}
669                        }
670                    }
671                }
672                WalRecord::AlterGraphType { name, alterations } => {
673                    for (action, type_name) in alterations {
674                        match action.as_str() {
675                            "add_node" => {
676                                let _ =
677                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
678                            }
679                            "drop_node" => {
680                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
681                            }
682                            "add_edge" => {
683                                let _ =
684                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
685                            }
686                            "drop_edge" => {
687                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
688                            }
689                            _ => {}
690                        }
691                    }
692                }
693
694                WalRecord::CreateProcedure {
695                    name,
696                    params,
697                    returns,
698                    body,
699                } => {
700                    use crate::catalog::ProcedureDefinition;
701                    let def = ProcedureDefinition {
702                        name: name.clone(),
703                        params: params.clone(),
704                        returns: returns.clone(),
705                        body: body.clone(),
706                    };
707                    let _ = catalog.register_procedure(def);
708                }
709                WalRecord::DropProcedure { name } => {
710                    let _ = catalog.drop_procedure(name);
711                }
712
713                // --- RDF triple replay ---
714                #[cfg(feature = "rdf")]
715                WalRecord::InsertRdfTriple { .. }
716                | WalRecord::DeleteRdfTriple { .. }
717                | WalRecord::ClearRdfGraph { .. }
718                | WalRecord::CreateRdfGraph { .. }
719                | WalRecord::DropRdfGraph { .. } => {
720                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
721                }
722                #[cfg(not(feature = "rdf"))]
723                WalRecord::InsertRdfTriple { .. }
724                | WalRecord::DeleteRdfTriple { .. }
725                | WalRecord::ClearRdfGraph { .. }
726                | WalRecord::CreateRdfGraph { .. }
727                | WalRecord::DropRdfGraph { .. } => {}
728
729                WalRecord::TransactionCommit { .. }
730                | WalRecord::TransactionAbort { .. }
731                | WalRecord::Checkpoint { .. } => {
732                    // Transaction control records don't need replay action
733                    // (recovery already filtered to only committed transactions)
734                }
735            }
736        }
737        Ok(())
738    }
739
740    // =========================================================================
741    // Single-file format helpers
742    // =========================================================================
743
744    /// Returns `true` if the given path should use single-file format.
745    #[cfg(feature = "grafeo-file")]
746    fn should_use_single_file(
747        path: &std::path::Path,
748        configured: crate::config::StorageFormat,
749    ) -> bool {
750        use crate::config::StorageFormat;
751        match configured {
752            StorageFormat::SingleFile => true,
753            StorageFormat::WalDirectory => false,
754            StorageFormat::Auto => {
755                // Existing file: check magic bytes
756                if path.is_file() {
757                    if let Ok(mut f) = std::fs::File::open(path) {
758                        use std::io::Read;
759                        let mut magic = [0u8; 4];
760                        if f.read_exact(&mut magic).is_ok()
761                            && magic == grafeo_adapters::storage::file::MAGIC
762                        {
763                            return true;
764                        }
765                    }
766                    return false;
767                }
768                // Existing directory: legacy format
769                if path.is_dir() {
770                    return false;
771                }
772                // New path: check extension
773                path.extension().is_some_and(|ext| ext == "grafeo")
774            }
775        }
776    }
777
778    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
779    #[cfg(feature = "grafeo-file")]
780    fn apply_snapshot_data(
781        store: &Arc<LpgStore>,
782        catalog: &Arc<crate::catalog::Catalog>,
783        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
784        data: &[u8],
785    ) -> Result<()> {
786        persistence::load_snapshot_into_store(
787            store,
788            catalog,
789            #[cfg(feature = "rdf")]
790            rdf_store,
791            data,
792        )
793    }
794
795    // =========================================================================
796    // Session & Configuration
797    // =========================================================================
798
799    /// Opens a new session for running queries.
800    ///
801    /// Sessions are cheap to create: spin up as many as you need. Each
802    /// gets its own transaction context, so concurrent sessions won't
803    /// block each other on reads.
804    ///
805    /// # Panics
806    ///
807    /// Panics if the database was configured with an external graph store and
808    /// the internal arena allocator cannot be initialized (out of memory).
809    ///
810    /// # Examples
811    ///
812    /// ```
813    /// use grafeo_engine::GrafeoDB;
814    ///
815    /// let db = GrafeoDB::new_in_memory();
816    /// let session = db.session();
817    ///
818    /// // Run queries through the session
819    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
820    /// # Ok::<(), grafeo_common::utils::error::Error>(())
821    /// ```
822    #[must_use]
823    pub fn session(&self) -> Session {
824        let session_cfg = || crate::session::SessionConfig {
825            transaction_manager: Arc::clone(&self.transaction_manager),
826            query_cache: Arc::clone(&self.query_cache),
827            catalog: Arc::clone(&self.catalog),
828            adaptive_config: self.config.adaptive.clone(),
829            factorized_execution: self.config.factorized_execution,
830            graph_model: self.config.graph_model,
831            query_timeout: self.config.query_timeout,
832            commit_counter: Arc::clone(&self.commit_counter),
833            gc_interval: self.config.gc_interval,
834        };
835
836        if let Some(ref ext_store) = self.external_store {
837            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
838                .expect("arena allocation for external store session");
839        }
840
841        #[cfg(feature = "rdf")]
842        let mut session = Session::with_rdf_store_and_adaptive(
843            Arc::clone(&self.store),
844            Arc::clone(&self.rdf_store),
845            session_cfg(),
846        );
847        #[cfg(not(feature = "rdf"))]
848        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
849
850        #[cfg(feature = "wal")]
851        if let Some(ref wal) = self.wal {
852            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
853        }
854
855        #[cfg(feature = "cdc")]
856        session.set_cdc_log(Arc::clone(&self.cdc_log));
857
858        #[cfg(feature = "metrics")]
859        {
860            if let Some(ref m) = self.metrics {
861                session.set_metrics(Arc::clone(m));
862                m.session_created
863                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
864                m.session_active
865                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
866            }
867        }
868
869        // Propagate persistent graph context to the new session
870        if let Some(ref graph) = *self.current_graph.read() {
871            session.use_graph(graph);
872        }
873
874        // Suppress unused_mut when cdc/wal are disabled
875        let _ = &mut session;
876
877        session
878    }
879
880    /// Returns the current graph name, if any.
881    ///
882    /// This is the persistent graph context used by one-shot `execute()` calls.
883    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
884    /// or `SESSION RESET`.
885    #[must_use]
886    pub fn current_graph(&self) -> Option<String> {
887        self.current_graph.read().clone()
888    }
889
890    /// Sets the current graph context for subsequent one-shot `execute()` calls.
891    ///
892    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
893    /// Pass `None` to reset to the default graph.
894    pub fn set_current_graph(&self, name: Option<&str>) {
895        *self.current_graph.write() = name.map(ToString::to_string);
896    }
897
898    /// Returns the adaptive execution configuration.
899    #[must_use]
900    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
901        &self.config.adaptive
902    }
903
904    /// Returns the configuration.
905    #[must_use]
906    pub fn config(&self) -> &Config {
907        &self.config
908    }
909
910    /// Returns the graph data model of this database.
911    #[must_use]
912    pub fn graph_model(&self) -> crate::config::GraphModel {
913        self.config.graph_model
914    }
915
916    /// Returns the configured memory limit in bytes, if any.
917    #[must_use]
918    pub fn memory_limit(&self) -> Option<usize> {
919        self.config.memory_limit
920    }
921
922    /// Returns a point-in-time snapshot of all metrics.
923    ///
924    /// If the `metrics` feature is disabled or the registry is not
925    /// initialized, returns a default (all-zero) snapshot.
926    #[cfg(feature = "metrics")]
927    #[must_use]
928    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
929        let mut snapshot = self
930            .metrics
931            .as_ref()
932            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
933
934        // Augment with cache stats from the query cache (not tracked in the registry)
935        let cache_stats = self.query_cache.stats();
936        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
937        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
938        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
939        snapshot.cache_invalidations = cache_stats.invalidations;
940
941        snapshot
942    }
943
944    /// Resets all metrics counters and histograms to zero.
945    #[cfg(feature = "metrics")]
946    pub fn reset_metrics(&self) {
947        if let Some(ref m) = self.metrics {
948            m.reset();
949        }
950        self.query_cache.reset_stats();
951    }
952
953    /// Returns the underlying (default) store.
954    ///
955    /// This provides direct access to the LPG store for algorithm implementations
956    /// and admin operations (index management, schema introspection, MVCC internals).
957    ///
958    /// For code that only needs read/write graph operations, prefer
959    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
960    #[must_use]
961    pub fn store(&self) -> &Arc<LpgStore> {
962        &self.store
963    }
964
965    /// Returns the LPG store for the currently active graph.
966    ///
967    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
968    /// the default store. Otherwise looks up the named graph in the root store.
969    /// Falls back to the default store if the named graph does not exist.
970    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
971    fn active_store(&self) -> Arc<LpgStore> {
972        let graph_name = self.current_graph.read().clone();
973        match graph_name {
974            None => Arc::clone(&self.store),
975            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
976            Some(ref name) => self
977                .store
978                .graph(name)
979                .unwrap_or_else(|| Arc::clone(&self.store)),
980        }
981    }
982
983    // === Named Graph Management ===
984
985    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
986    ///
987    /// # Errors
988    ///
989    /// Returns an error if arena allocation fails.
990    pub fn create_graph(&self, name: &str) -> Result<bool> {
991        Ok(self.store.create_graph(name)?)
992    }
993
994    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
995    pub fn drop_graph(&self, name: &str) -> bool {
996        self.store.drop_graph(name)
997    }
998
999    /// Returns all named graph names.
1000    #[must_use]
1001    pub fn list_graphs(&self) -> Vec<String> {
1002        self.store.graph_names()
1003    }
1004
1005    /// Returns the graph store as a trait object.
1006    ///
1007    /// This provides the [`GraphStoreMut`] interface for code that should work
1008    /// with any storage backend. Use this when you only need graph read/write
1009    /// operations and don't need admin methods like index management.
1010    ///
1011    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1012    #[must_use]
1013    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1014        if let Some(ref ext_store) = self.external_store {
1015            Arc::clone(ext_store)
1016        } else {
1017            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1018        }
1019    }
1020
1021    /// Garbage collects old MVCC versions that are no longer visible.
1022    ///
1023    /// Determines the minimum epoch required by active transactions and prunes
1024    /// version chains older than that threshold. Also cleans up completed
1025    /// transaction metadata in the transaction manager.
1026    pub fn gc(&self) {
1027        let min_epoch = self.transaction_manager.min_active_epoch();
1028        self.store.gc_versions(min_epoch);
1029        self.transaction_manager.gc();
1030    }
1031
1032    /// Returns the buffer manager for memory-aware operations.
1033    #[must_use]
1034    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1035        &self.buffer_manager
1036    }
1037
1038    /// Returns the query cache.
1039    #[must_use]
1040    pub fn query_cache(&self) -> &Arc<QueryCache> {
1041        &self.query_cache
1042    }
1043
1044    /// Clears all cached query plans.
1045    ///
1046    /// This is called automatically after DDL operations, but can also be
1047    /// invoked manually after external schema changes (e.g., WAL replay,
1048    /// import) or when you want to force re-optimization of all queries.
1049    pub fn clear_plan_cache(&self) {
1050        self.query_cache.clear();
1051    }
1052
1053    // =========================================================================
1054    // Lifecycle
1055    // =========================================================================
1056
1057    /// Closes the database, flushing all pending writes.
1058    ///
1059    /// For persistent databases, this ensures everything is safely on disk.
1060    /// Called automatically when the database is dropped, but you can call
1061    /// it explicitly if you need to guarantee durability at a specific point.
1062    ///
1063    /// # Errors
1064    ///
1065    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1066    pub fn close(&self) -> Result<()> {
1067        let mut is_open = self.is_open.write();
1068        if !*is_open {
1069            return Ok(());
1070        }
1071
1072        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1073        // We must do this BEFORE the WAL close path because checkpoint_to_file
1074        // removes the sidecar WAL directory.
1075        #[cfg(feature = "grafeo-file")]
1076        let is_single_file = self.file_manager.is_some();
1077        #[cfg(not(feature = "grafeo-file"))]
1078        let is_single_file = false;
1079
1080        #[cfg(feature = "grafeo-file")]
1081        if let Some(ref fm) = self.file_manager {
1082            // Flush WAL first so all records are on disk before we snapshot
1083            #[cfg(feature = "wal")]
1084            if let Some(ref wal) = self.wal {
1085                wal.sync()?;
1086            }
1087            self.checkpoint_to_file(fm)?;
1088
1089            // Release WAL file handles before removing sidecar directory.
1090            // On Windows, open handles prevent directory deletion.
1091            #[cfg(feature = "wal")]
1092            if let Some(ref wal) = self.wal {
1093                wal.close_active_log();
1094            }
1095
1096            fm.remove_sidecar_wal()?;
1097            fm.close()?;
1098        }
1099
1100        // Commit and checkpoint WAL (legacy directory format only)
1101        #[cfg(feature = "wal")]
1102        if !is_single_file && let Some(ref wal) = self.wal {
1103            let epoch = self.store.current_epoch();
1104
1105            // Use the last assigned transaction ID, or create a checkpoint-only tx
1106            let checkpoint_tx = self
1107                .transaction_manager
1108                .last_assigned_transaction_id()
1109                .unwrap_or_else(|| {
1110                    // No transactions have been started; begin one for checkpoint
1111                    self.transaction_manager.begin()
1112                });
1113
1114            // Log a TransactionCommit to mark all pending records as committed
1115            wal.log(&WalRecord::TransactionCommit {
1116                transaction_id: checkpoint_tx,
1117            })?;
1118
1119            // Then checkpoint
1120            wal.checkpoint(checkpoint_tx, epoch)?;
1121            wal.sync()?;
1122        }
1123
1124        *is_open = false;
1125        Ok(())
1126    }
1127
1128    /// Returns the typed WAL if available.
1129    #[cfg(feature = "wal")]
1130    #[must_use]
1131    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1132        self.wal.as_ref()
1133    }
1134
1135    /// Logs a WAL record if WAL is enabled.
1136    #[cfg(feature = "wal")]
1137    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1138        if let Some(ref wal) = self.wal {
1139            wal.log(record)?;
1140        }
1141        Ok(())
1142    }
1143
1144    /// Writes the current database snapshot to the `.grafeo` file.
1145    ///
1146    /// Does NOT remove the sidecar WAL: callers that want to clean up
1147    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1148    /// separately after this returns.
1149    #[cfg(feature = "grafeo-file")]
1150    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1151        use grafeo_core::testing::crash::maybe_crash;
1152
1153        maybe_crash("checkpoint_to_file:before_export");
1154        let snapshot_data = self.export_snapshot()?;
1155        maybe_crash("checkpoint_to_file:after_export");
1156
1157        let epoch = self.store.current_epoch();
1158        let transaction_id = self
1159            .transaction_manager
1160            .last_assigned_transaction_id()
1161            .map_or(0, |t| t.0);
1162        let node_count = self.store.node_count() as u64;
1163        let edge_count = self.store.edge_count() as u64;
1164
1165        fm.write_snapshot(
1166            &snapshot_data,
1167            epoch.0,
1168            transaction_id,
1169            node_count,
1170            edge_count,
1171        )?;
1172
1173        maybe_crash("checkpoint_to_file:after_write_snapshot");
1174        Ok(())
1175    }
1176
1177    /// Returns the file manager if using single-file format.
1178    #[cfg(feature = "grafeo-file")]
1179    #[must_use]
1180    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1181        self.file_manager.as_ref()
1182    }
1183}
1184
1185impl Drop for GrafeoDB {
1186    fn drop(&mut self) {
1187        if let Err(e) = self.close() {
1188            tracing::error!("Error closing database: {}", e);
1189        }
1190    }
1191}
1192
1193impl crate::admin::AdminService for GrafeoDB {
1194    fn info(&self) -> crate::admin::DatabaseInfo {
1195        self.info()
1196    }
1197
1198    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1199        self.detailed_stats()
1200    }
1201
1202    fn schema(&self) -> crate::admin::SchemaInfo {
1203        self.schema()
1204    }
1205
1206    fn validate(&self) -> crate::admin::ValidationResult {
1207        self.validate()
1208    }
1209
1210    fn wal_status(&self) -> crate::admin::WalStatus {
1211        self.wal_status()
1212    }
1213
1214    fn wal_checkpoint(&self) -> Result<()> {
1215        self.wal_checkpoint()
1216    }
1217}
1218
1219// =========================================================================
1220// Query Result Types
1221// =========================================================================
1222
1223/// The result of running a query.
1224///
1225/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1226/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1227///
1228/// # Examples
1229///
1230/// ```
1231/// use grafeo_engine::GrafeoDB;
1232///
1233/// let db = GrafeoDB::new_in_memory();
1234/// db.create_node(&["Person"]);
1235///
1236/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1237///
1238/// // Check what we got
1239/// println!("Columns: {:?}", result.columns);
1240/// println!("Rows: {}", result.row_count());
1241///
1242/// // Iterate through results
1243/// for row in result.iter() {
1244///     println!("{:?}", row);
1245/// }
1246/// # Ok::<(), grafeo_common::utils::error::Error>(())
1247/// ```
1248#[derive(Debug)]
1249pub struct QueryResult {
1250    /// Column names from the RETURN clause.
1251    pub columns: Vec<String>,
1252    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1253    pub column_types: Vec<grafeo_common::types::LogicalType>,
1254    /// The actual result rows.
1255    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1256    /// Query execution time in milliseconds (if timing was enabled).
1257    pub execution_time_ms: Option<f64>,
1258    /// Number of rows scanned during query execution (estimate).
1259    pub rows_scanned: Option<u64>,
1260    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
1261    pub status_message: Option<String>,
1262    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
1263    pub gql_status: grafeo_common::utils::GqlStatus,
1264}
1265
1266impl QueryResult {
1267    /// Creates a fully empty query result (no columns, no rows).
1268    #[must_use]
1269    pub fn empty() -> Self {
1270        Self {
1271            columns: Vec::new(),
1272            column_types: Vec::new(),
1273            rows: Vec::new(),
1274            execution_time_ms: None,
1275            rows_scanned: None,
1276            status_message: None,
1277            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1278        }
1279    }
1280
1281    /// Creates a query result with only a status message (for DDL commands).
1282    #[must_use]
1283    pub fn status(msg: impl Into<String>) -> Self {
1284        Self {
1285            columns: Vec::new(),
1286            column_types: Vec::new(),
1287            rows: Vec::new(),
1288            execution_time_ms: None,
1289            rows_scanned: None,
1290            status_message: Some(msg.into()),
1291            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1292        }
1293    }
1294
1295    /// Creates a new empty query result.
1296    #[must_use]
1297    pub fn new(columns: Vec<String>) -> Self {
1298        let len = columns.len();
1299        Self {
1300            columns,
1301            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1302            rows: Vec::new(),
1303            execution_time_ms: None,
1304            rows_scanned: None,
1305            status_message: None,
1306            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1307        }
1308    }
1309
1310    /// Creates a new empty query result with column types.
1311    #[must_use]
1312    pub fn with_types(
1313        columns: Vec<String>,
1314        column_types: Vec<grafeo_common::types::LogicalType>,
1315    ) -> Self {
1316        Self {
1317            columns,
1318            column_types,
1319            rows: Vec::new(),
1320            execution_time_ms: None,
1321            rows_scanned: None,
1322            status_message: None,
1323            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1324        }
1325    }
1326
1327    /// Sets the execution metrics on this result.
1328    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1329        self.execution_time_ms = Some(execution_time_ms);
1330        self.rows_scanned = Some(rows_scanned);
1331        self
1332    }
1333
1334    /// Returns the execution time in milliseconds, if available.
1335    #[must_use]
1336    pub fn execution_time_ms(&self) -> Option<f64> {
1337        self.execution_time_ms
1338    }
1339
1340    /// Returns the number of rows scanned, if available.
1341    #[must_use]
1342    pub fn rows_scanned(&self) -> Option<u64> {
1343        self.rows_scanned
1344    }
1345
1346    /// Returns the number of rows.
1347    #[must_use]
1348    pub fn row_count(&self) -> usize {
1349        self.rows.len()
1350    }
1351
1352    /// Returns the number of columns.
1353    #[must_use]
1354    pub fn column_count(&self) -> usize {
1355        self.columns.len()
1356    }
1357
1358    /// Returns true if the result is empty.
1359    #[must_use]
1360    pub fn is_empty(&self) -> bool {
1361        self.rows.is_empty()
1362    }
1363
1364    /// Extracts a single value from the result.
1365    ///
1366    /// Use this when your query returns exactly one row with one column,
1367    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1368    ///
1369    /// # Errors
1370    ///
1371    /// Returns an error if the result has multiple rows or columns.
1372    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1373        if self.rows.len() != 1 || self.columns.len() != 1 {
1374            return Err(grafeo_common::utils::error::Error::InvalidValue(
1375                "Expected single value".to_string(),
1376            ));
1377        }
1378        T::from_value(&self.rows[0][0])
1379    }
1380
1381    /// Returns an iterator over the rows.
1382    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1383        self.rows.iter()
1384    }
1385}
1386
1387impl std::fmt::Display for QueryResult {
1388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1389        let table = grafeo_common::fmt::format_result_table(
1390            &self.columns,
1391            &self.rows,
1392            self.execution_time_ms,
1393            self.status_message.as_deref(),
1394        );
1395        f.write_str(&table)
1396    }
1397}
1398
1399/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1400///
1401/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1402/// Used by [`QueryResult::scalar()`] to extract typed values.
1403pub trait FromValue: Sized {
1404    /// Attempts the conversion, returning an error on type mismatch.
1405    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1406}
1407
1408impl FromValue for i64 {
1409    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1410        value
1411            .as_int64()
1412            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1413                expected: "INT64".to_string(),
1414                found: value.type_name().to_string(),
1415            })
1416    }
1417}
1418
1419impl FromValue for f64 {
1420    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1421        value
1422            .as_float64()
1423            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1424                expected: "FLOAT64".to_string(),
1425                found: value.type_name().to_string(),
1426            })
1427    }
1428}
1429
1430impl FromValue for String {
1431    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1432        value.as_str().map(String::from).ok_or_else(|| {
1433            grafeo_common::utils::error::Error::TypeMismatch {
1434                expected: "STRING".to_string(),
1435                found: value.type_name().to_string(),
1436            }
1437        })
1438    }
1439}
1440
1441impl FromValue for bool {
1442    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1443        value
1444            .as_bool()
1445            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1446                expected: "BOOL".to_string(),
1447                found: value.type_name().to_string(),
1448            })
1449    }
1450}
1451
1452#[cfg(test)]
1453mod tests {
1454    use super::*;
1455
1456    #[test]
1457    fn test_create_in_memory_database() {
1458        let db = GrafeoDB::new_in_memory();
1459        assert_eq!(db.node_count(), 0);
1460        assert_eq!(db.edge_count(), 0);
1461    }
1462
1463    #[test]
1464    fn test_database_config() {
1465        let config = Config::in_memory().with_threads(4).with_query_logging();
1466
1467        let db = GrafeoDB::with_config(config).unwrap();
1468        assert_eq!(db.config().threads, 4);
1469        assert!(db.config().query_logging);
1470    }
1471
1472    #[test]
1473    fn test_database_session() {
1474        let db = GrafeoDB::new_in_memory();
1475        let _session = db.session();
1476        // Session should be created successfully
1477    }
1478
1479    #[cfg(feature = "wal")]
1480    #[test]
1481    fn test_persistent_database_recovery() {
1482        use grafeo_common::types::Value;
1483        use tempfile::tempdir;
1484
1485        let dir = tempdir().unwrap();
1486        let db_path = dir.path().join("test_db");
1487
1488        // Create database and add some data
1489        {
1490            let db = GrafeoDB::open(&db_path).unwrap();
1491
1492            let alix = db.create_node(&["Person"]);
1493            db.set_node_property(alix, "name", Value::from("Alix"));
1494
1495            let gus = db.create_node(&["Person"]);
1496            db.set_node_property(gus, "name", Value::from("Gus"));
1497
1498            let _edge = db.create_edge(alix, gus, "KNOWS");
1499
1500            // Explicitly close to flush WAL
1501            db.close().unwrap();
1502        }
1503
1504        // Reopen and verify data was recovered
1505        {
1506            let db = GrafeoDB::open(&db_path).unwrap();
1507
1508            assert_eq!(db.node_count(), 2);
1509            assert_eq!(db.edge_count(), 1);
1510
1511            // Verify nodes exist
1512            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1513            assert!(node0.is_some());
1514
1515            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1516            assert!(node1.is_some());
1517        }
1518    }
1519
1520    #[cfg(feature = "wal")]
1521    #[test]
1522    fn test_wal_logging() {
1523        use tempfile::tempdir;
1524
1525        let dir = tempdir().unwrap();
1526        let db_path = dir.path().join("wal_test_db");
1527
1528        let db = GrafeoDB::open(&db_path).unwrap();
1529
1530        // Create some data
1531        let node = db.create_node(&["Test"]);
1532        db.delete_node(node);
1533
1534        // WAL should have records
1535        if let Some(wal) = db.wal() {
1536            assert!(wal.record_count() > 0);
1537        }
1538
1539        db.close().unwrap();
1540    }
1541
1542    #[cfg(feature = "wal")]
1543    #[test]
1544    fn test_wal_recovery_multiple_sessions() {
1545        // Tests that WAL recovery works correctly across multiple open/close cycles
1546        use grafeo_common::types::Value;
1547        use tempfile::tempdir;
1548
1549        let dir = tempdir().unwrap();
1550        let db_path = dir.path().join("multi_session_db");
1551
1552        // Session 1: Create initial data
1553        {
1554            let db = GrafeoDB::open(&db_path).unwrap();
1555            let alix = db.create_node(&["Person"]);
1556            db.set_node_property(alix, "name", Value::from("Alix"));
1557            db.close().unwrap();
1558        }
1559
1560        // Session 2: Add more data
1561        {
1562            let db = GrafeoDB::open(&db_path).unwrap();
1563            assert_eq!(db.node_count(), 1); // Previous data recovered
1564            let gus = db.create_node(&["Person"]);
1565            db.set_node_property(gus, "name", Value::from("Gus"));
1566            db.close().unwrap();
1567        }
1568
1569        // Session 3: Verify all data
1570        {
1571            let db = GrafeoDB::open(&db_path).unwrap();
1572            assert_eq!(db.node_count(), 2);
1573
1574            // Verify properties were recovered correctly
1575            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1576            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1577
1578            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1579            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1580        }
1581    }
1582
1583    #[cfg(feature = "wal")]
1584    #[test]
1585    fn test_database_consistency_after_mutations() {
1586        // Tests that database remains consistent after a series of create/delete operations
1587        use grafeo_common::types::Value;
1588        use tempfile::tempdir;
1589
1590        let dir = tempdir().unwrap();
1591        let db_path = dir.path().join("consistency_db");
1592
1593        {
1594            let db = GrafeoDB::open(&db_path).unwrap();
1595
1596            // Create nodes
1597            let a = db.create_node(&["Node"]);
1598            let b = db.create_node(&["Node"]);
1599            let c = db.create_node(&["Node"]);
1600
1601            // Create edges
1602            let e1 = db.create_edge(a, b, "LINKS");
1603            let _e2 = db.create_edge(b, c, "LINKS");
1604
1605            // Delete middle node and its edge
1606            db.delete_edge(e1);
1607            db.delete_node(b);
1608
1609            // Set properties on remaining nodes
1610            db.set_node_property(a, "value", Value::Int64(1));
1611            db.set_node_property(c, "value", Value::Int64(3));
1612
1613            db.close().unwrap();
1614        }
1615
1616        // Reopen and verify consistency
1617        {
1618            let db = GrafeoDB::open(&db_path).unwrap();
1619
1620            // Should have 2 nodes (a and c), b was deleted
1621            // Note: node_count includes deleted nodes in some implementations
1622            // What matters is that the non-deleted nodes are accessible
1623            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1624            assert!(node_a.is_some());
1625
1626            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1627            assert!(node_c.is_some());
1628
1629            // Middle node should be deleted
1630            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1631            assert!(node_b.is_none());
1632        }
1633    }
1634
1635    #[cfg(feature = "wal")]
1636    #[test]
1637    fn test_close_is_idempotent() {
1638        // Calling close() multiple times should not cause errors
1639        use tempfile::tempdir;
1640
1641        let dir = tempdir().unwrap();
1642        let db_path = dir.path().join("close_test_db");
1643
1644        let db = GrafeoDB::open(&db_path).unwrap();
1645        db.create_node(&["Test"]);
1646
1647        // First close should succeed
1648        assert!(db.close().is_ok());
1649
1650        // Second close should also succeed (idempotent)
1651        assert!(db.close().is_ok());
1652    }
1653
1654    #[test]
1655    fn test_with_store_external_backend() {
1656        use grafeo_core::graph::lpg::LpgStore;
1657
1658        let external = Arc::new(LpgStore::new().unwrap());
1659
1660        // Seed data on the external store directly
1661        let n1 = external.create_node(&["Person"]);
1662        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1663
1664        let db = GrafeoDB::with_store(
1665            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1666            Config::in_memory(),
1667        )
1668        .unwrap();
1669
1670        let session = db.session();
1671
1672        // Session should see data from the external store via execute
1673        #[cfg(feature = "gql")]
1674        {
1675            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1676            assert_eq!(result.rows.len(), 1);
1677        }
1678    }
1679
1680    #[test]
1681    fn test_with_config_custom_memory_limit() {
1682        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1683
1684        let db = GrafeoDB::with_config(config).unwrap();
1685        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1686        assert_eq!(db.node_count(), 0);
1687    }
1688
1689    #[cfg(feature = "metrics")]
1690    #[test]
1691    fn test_database_metrics_registry() {
1692        let db = GrafeoDB::new_in_memory();
1693
1694        // Perform some operations
1695        db.create_node(&["Person"]);
1696        db.create_node(&["Person"]);
1697
1698        // Check that metrics snapshot returns data
1699        let snap = db.metrics();
1700        // Session created counter should reflect at least 0 (metrics is initialized)
1701        assert_eq!(snap.query_count, 0); // No queries executed yet
1702    }
1703
1704    #[test]
1705    fn test_query_result_has_metrics() {
1706        // Verifies that query results include execution metrics
1707        let db = GrafeoDB::new_in_memory();
1708        db.create_node(&["Person"]);
1709        db.create_node(&["Person"]);
1710
1711        #[cfg(feature = "gql")]
1712        {
1713            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1714
1715            // Metrics should be populated
1716            assert!(result.execution_time_ms.is_some());
1717            assert!(result.rows_scanned.is_some());
1718            assert!(result.execution_time_ms.unwrap() >= 0.0);
1719            assert_eq!(result.rows_scanned.unwrap(), 2);
1720        }
1721    }
1722
1723    #[test]
1724    fn test_empty_query_result_metrics() {
1725        // Verifies metrics are correct for queries returning no results
1726        let db = GrafeoDB::new_in_memory();
1727        db.create_node(&["Person"]);
1728
1729        #[cfg(feature = "gql")]
1730        {
1731            // Query that matches nothing
1732            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1733
1734            assert!(result.execution_time_ms.is_some());
1735            assert!(result.rows_scanned.is_some());
1736            assert_eq!(result.rows_scanned.unwrap(), 0);
1737        }
1738    }
1739
1740    #[cfg(feature = "cdc")]
1741    mod cdc_integration {
1742        use super::*;
1743
1744        #[test]
1745        fn test_node_lifecycle_history() {
1746            let db = GrafeoDB::new_in_memory();
1747
1748            // Create
1749            let id = db.create_node(&["Person"]);
1750            // Update
1751            db.set_node_property(id, "name", "Alix".into());
1752            db.set_node_property(id, "name", "Gus".into());
1753            // Delete
1754            db.delete_node(id);
1755
1756            let history = db.history(id).unwrap();
1757            assert_eq!(history.len(), 4); // create + 2 updates + delete
1758            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1759            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1760            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1761            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1762            assert!(history[2].before.is_some()); // second update has prior "Alix"
1763            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1764        }
1765
1766        #[test]
1767        fn test_edge_lifecycle_history() {
1768            let db = GrafeoDB::new_in_memory();
1769
1770            let alix = db.create_node(&["Person"]);
1771            let gus = db.create_node(&["Person"]);
1772            let edge = db.create_edge(alix, gus, "KNOWS");
1773            db.set_edge_property(edge, "since", 2024i64.into());
1774            db.delete_edge(edge);
1775
1776            let history = db.history(edge).unwrap();
1777            assert_eq!(history.len(), 3); // create + update + delete
1778            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1779            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1780            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1781        }
1782
1783        #[test]
1784        fn test_create_node_with_props_cdc() {
1785            let db = GrafeoDB::new_in_memory();
1786
1787            let id = db.create_node_with_props(
1788                &["Person"],
1789                vec![
1790                    ("name", grafeo_common::types::Value::from("Alix")),
1791                    ("age", grafeo_common::types::Value::from(30i64)),
1792                ],
1793            );
1794
1795            let history = db.history(id).unwrap();
1796            assert_eq!(history.len(), 1);
1797            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1798            // Props should be captured
1799            let after = history[0].after.as_ref().unwrap();
1800            assert_eq!(after.len(), 2);
1801        }
1802
1803        #[test]
1804        fn test_changes_between() {
1805            let db = GrafeoDB::new_in_memory();
1806
1807            let id1 = db.create_node(&["A"]);
1808            let _id2 = db.create_node(&["B"]);
1809            db.set_node_property(id1, "x", 1i64.into());
1810
1811            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1812            let changes = db
1813                .changes_between(
1814                    grafeo_common::types::EpochId(0),
1815                    grafeo_common::types::EpochId(u64::MAX),
1816                )
1817                .unwrap();
1818            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1819        }
1820    }
1821
1822    #[test]
1823    fn test_with_store_basic() {
1824        use grafeo_core::graph::lpg::LpgStore;
1825
1826        let store = Arc::new(LpgStore::new().unwrap());
1827        let n1 = store.create_node(&["Person"]);
1828        store.set_node_property(n1, "name", "Alix".into());
1829
1830        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1831        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1832
1833        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1834        assert_eq!(result.rows.len(), 1);
1835    }
1836
1837    #[test]
1838    fn test_with_store_session() {
1839        use grafeo_core::graph::lpg::LpgStore;
1840
1841        let store = Arc::new(LpgStore::new().unwrap());
1842        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1843        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1844
1845        let session = db.session();
1846        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1847        assert_eq!(result.rows.len(), 1);
1848    }
1849
1850    #[test]
1851    fn test_with_store_mutations() {
1852        use grafeo_core::graph::lpg::LpgStore;
1853
1854        let store = Arc::new(LpgStore::new().unwrap());
1855        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1856        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1857
1858        let mut session = db.session();
1859
1860        // Use an explicit transaction so INSERT and MATCH share the same
1861        // transaction context. With PENDING epochs, uncommitted versions are
1862        // only visible to the owning transaction.
1863        session.begin_transaction().unwrap();
1864        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1865
1866        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1867        assert_eq!(result.rows.len(), 1);
1868
1869        session.commit().unwrap();
1870    }
1871}