Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// Several fields only exist when the matching Cargo feature is enabled
/// (`rdf`, `wal`, `cdc`, `embed`, `grafeo-file`, `metrics`), so every
/// constructor has to initialize the same feature-gated set of fields.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, stored as passed to the constructor.
    pub(super) config: Config,
    /// The underlying (built-in) graph store.
    ///
    /// `with_store()` still creates one, but only as a placeholder next to
    /// the external store.
    pub(super) store: Arc<LpgStore>,
    /// Schema and metadata catalog shared across sessions.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Unified buffer manager.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager (if durability is enabled).
    ///
    /// `None` when the WAL is disabled in the config, when no database path
    /// is configured, or when the database wraps an external store.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared WAL graph context tracker. Tracks which named graph was last
    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
    /// records only when the context actually changes.
    ///
    /// Constructors initialize it to `None`.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache for parsed and optimized plans.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions. Starts at zero.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open. Constructors set this to `true`.
    pub(super) is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion, keyed by
    /// a `String` (presumably the model name; verify against the `embed` module).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file database manager (when using `.grafeo` format).
    ///
    /// `None` for in-memory databases, legacy WAL-directory databases, and
    /// databases created through `with_store()`.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// External graph store (when using with_store()).
    /// When set, sessions route queries through this store instead of the built-in LpgStore.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry shared across all sessions.
    ///
    /// The constructors in this file always populate it with `Some`.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Persistent graph context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this graph.
    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
    ///
    /// `None` (the initial value) means no graph has been selected.
    current_graph: RwLock<Option<String>>,
}
124
125impl GrafeoDB {
126    /// Creates an in-memory database, fast to create, gone when dropped.
127    ///
128    /// Use this for tests, experiments, or when you don't need persistence.
129    /// For data that survives restarts, use [`open()`](Self::open) instead.
130    ///
131    /// # Panics
132    ///
133    /// Panics if the internal arena allocator cannot be initialized (out of memory).
134    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use grafeo_engine::GrafeoDB;
140    ///
141    /// let db = GrafeoDB::new_in_memory();
142    /// let session = db.session();
143    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
144    /// # Ok::<(), grafeo_common::utils::error::Error>(())
145    /// ```
146    #[must_use]
147    pub fn new_in_memory() -> Self {
148        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
149    }
150
151    /// Opens a database at the given path, creating it if it doesn't exist.
152    ///
153    /// If you've used this path before, Grafeo recovers your data from the
154    /// write-ahead log automatically. First open on a new path creates an
155    /// empty database.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the path isn't writable or recovery fails.
160    ///
161    /// # Examples
162    ///
163    /// ```no_run
164    /// use grafeo_engine::GrafeoDB;
165    ///
166    /// let db = GrafeoDB::open("./my_social_network")?;
167    /// # Ok::<(), grafeo_common::utils::error::Error>(())
168    /// ```
169    #[cfg(feature = "wal")]
170    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
171        Self::with_config(Config::persistent(path.as_ref()))
172    }
173
    /// Creates a database with custom configuration.
    ///
    /// Use this when you need fine-grained control over memory limits,
    /// thread counts, or persistence settings. For most cases,
    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
    /// are simpler.
    ///
    /// Construction happens in a fixed order, and recovery relies on it:
    ///
    /// 1. `config` is validated.
    /// 2. Empty stores, the transaction manager, the buffer manager, and the
    ///    catalog are created.
    /// 3. With the `grafeo-file` feature, if the WAL is enabled, a path is
    ///    configured, and the single-file format applies: the file is opened
    ///    or created, its snapshot is applied, and any sidecar WAL is replayed
    ///    on top of it.
    /// 4. With the `wal` feature, if the WAL is enabled and a path is
    ///    configured: a legacy WAL directory (non-single-file only) is
    ///    replayed, then the WAL is opened for new writes.
    ///
    /// # Errors
    ///
    /// Returns an error if the database can't be created or recovery fails.
    /// This includes an invalid configuration, a single-file path that exists
    /// but is not a regular file, and I/O failures while creating the
    /// database or WAL directories.
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::{GrafeoDB, Config};
    ///
    /// // In-memory with a 512MB limit
    /// let config = Config::in_memory()
    ///     .with_memory_limit(512 * 1024 * 1024);
    ///
    /// let db = GrafeoDB::with_config(config)?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    pub fn with_config(config: Config) -> Result<Self> {
        // Validate configuration before proceeding.
        // Validation failures are reported as `Error::Internal` carrying the message text.
        config
            .validate()
            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;

        let store = Arc::new(LpgStore::new()?);
        #[cfg(feature = "rdf")]
        let rdf_store = Arc::new(RdfStore::new());
        let transaction_manager = Arc::new(TransactionManager::new());

        // Create buffer manager with configured limits.
        // Without an explicit limit the budget is 75% of the detected system memory.
        // The spill path falls back to `<config.path>/spill` when not set explicitly.
        // NOTE(review): in single-file mode `config.path` is the database file
        // itself, so this fallback resolves to a path beneath a file. Confirm
        // the buffer manager tolerates that.
        let buffer_config = BufferManagerConfig {
            budget: config.memory_limit.unwrap_or_else(|| {
                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
            }),
            spill_path: config
                .spill_path
                .clone()
                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
            ..BufferManagerConfig::default()
        };
        let buffer_manager = BufferManager::new(buffer_config);

        // Create catalog early so WAL replay can restore schema definitions
        let catalog = Arc::new(Catalog::new());

        // --- Single-file format (.grafeo) ---
        // Yields `Some(manager)` only when the WAL is enabled, a path is set,
        // and `should_use_single_file` selects the single-file format.
        #[cfg(feature = "grafeo-file")]
        let file_manager: Option<Arc<GrafeoFileManager>> = if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                if Self::should_use_single_file(db_path, config.storage_format) {
                    // Open an existing file, create a missing one, reject anything else.
                    let fm = if db_path.exists() && db_path.is_file() {
                        GrafeoFileManager::open(db_path)?
                    } else if !db_path.exists() {
                        GrafeoFileManager::create(db_path)?
                    } else {
                        // Path exists but is not a file (directory, etc.)
                        return Err(grafeo_common::utils::error::Error::Internal(format!(
                            "path exists but is not a file: {}",
                            db_path.display()
                        )));
                    };

                    // Load snapshot data from the file.
                    // An empty snapshot is skipped.
                    let snapshot_data = fm.read_snapshot()?;
                    if !snapshot_data.is_empty() {
                        Self::apply_snapshot_data(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &snapshot_data,
                        )?;
                    }

                    // Recover sidecar WAL if present.
                    // Replayed after the snapshot so the records land on top of it.
                    if fm.has_sidecar_wal() {
                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
                        let records = recovery.recover()?;
                        Self::apply_wal_records(
                            &store,
                            &catalog,
                            #[cfg(feature = "rdf")]
                            &rdf_store,
                            &records,
                        )?;
                    }

                    Some(Arc::new(fm))
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        // Determine whether to use the WAL directory path (legacy) or sidecar
        #[cfg(feature = "wal")]
        let wal = if config.wal_enabled {
            if let Some(ref db_path) = config.path {
                // When using single-file format, the WAL is a sidecar directory
                #[cfg(feature = "grafeo-file")]
                let wal_path = if let Some(ref fm) = file_manager {
                    let p = fm.sidecar_wal_path();
                    std::fs::create_dir_all(&p)?;
                    p
                } else {
                    // Legacy: WAL inside the database directory
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // Without single-file support the legacy layout is the only option.
                #[cfg(not(feature = "grafeo-file"))]
                let wal_path = {
                    std::fs::create_dir_all(db_path)?;
                    db_path.join("wal")
                };

                // For legacy WAL directory format, check if WAL exists and recover
                #[cfg(feature = "grafeo-file")]
                let is_single_file = file_manager.is_some();
                #[cfg(not(feature = "grafeo-file"))]
                let is_single_file = false;

                // The sidecar WAL was already replayed in the single-file
                // branch above, so only the legacy layout is replayed here.
                if !is_single_file && wal_path.exists() {
                    let recovery = WalRecovery::new(&wal_path);
                    let records = recovery.recover()?;
                    Self::apply_wal_records(
                        &store,
                        &catalog,
                        #[cfg(feature = "rdf")]
                        &rdf_store,
                        &records,
                    )?;
                }

                // Open/create WAL manager with configured durability.
                // Each engine-level durability mode maps to the WAL mode of the same name.
                let wal_durability = match config.wal_durability {
                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
                    crate::config::DurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    } => WalDurabilityMode::Batch {
                        max_delay_ms,
                        max_records,
                    },
                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
                        WalDurabilityMode::Adaptive { target_interval_ms }
                    }
                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
                };
                let wal_config = WalConfig {
                    durability: wal_durability,
                    ..WalConfig::default()
                };
                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
                Some(Arc::new(wal_manager))
            } else {
                // WAL enabled but no path configured: nothing to log to.
                None
            }
        } else {
            None
        };

        // Create query cache with default capacity (1000 queries)
        let query_cache = Arc::new(QueryCache::default());

        Ok(Self {
            config,
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            transaction_manager,
            buffer_manager,
            #[cfg(feature = "wal")]
            wal,
            #[cfg(feature = "wal")]
            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
            query_cache,
            commit_counter: Arc::new(AtomicUsize::new(0)),
            is_open: RwLock::new(true),
            #[cfg(feature = "cdc")]
            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
            #[cfg(feature = "embed")]
            embedding_models: RwLock::new(hashbrown::HashMap::new()),
            #[cfg(feature = "grafeo-file")]
            file_manager,
            // Only `with_store()` sets an external store.
            external_store: None,
            #[cfg(feature = "metrics")]
            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
            current_graph: RwLock::new(None),
        })
    }
375
376    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
377    ///
378    /// The external store handles all data persistence. WAL, CDC, and index
379    /// management are the responsibility of the store implementation.
380    ///
381    /// Query execution (all 6 languages, optimizer, planner) works through the
382    /// provided store. Admin operations (schema introspection, persistence,
383    /// vector/text indexes) are not available on external stores.
384    ///
385    /// # Examples
386    ///
387    /// ```no_run
388    /// use std::sync::Arc;
389    /// use grafeo_engine::{GrafeoDB, Config};
390    /// use grafeo_core::graph::GraphStoreMut;
391    ///
392    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
393    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
394    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
395    ///     Ok(())
396    /// }
397    /// ```
398    ///
399    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
400    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
401        config
402            .validate()
403            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
404
405        let dummy_store = Arc::new(LpgStore::new()?);
406        let transaction_manager = Arc::new(TransactionManager::new());
407
408        let buffer_config = BufferManagerConfig {
409            budget: config.memory_limit.unwrap_or_else(|| {
410                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
411            }),
412            spill_path: None,
413            ..BufferManagerConfig::default()
414        };
415        let buffer_manager = BufferManager::new(buffer_config);
416
417        let query_cache = Arc::new(QueryCache::default());
418
419        Ok(Self {
420            config,
421            store: dummy_store,
422            catalog: Arc::new(Catalog::new()),
423            #[cfg(feature = "rdf")]
424            rdf_store: Arc::new(RdfStore::new()),
425            transaction_manager,
426            buffer_manager,
427            #[cfg(feature = "wal")]
428            wal: None,
429            #[cfg(feature = "wal")]
430            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
431            query_cache,
432            commit_counter: Arc::new(AtomicUsize::new(0)),
433            is_open: RwLock::new(true),
434            #[cfg(feature = "cdc")]
435            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
436            #[cfg(feature = "embed")]
437            embedding_models: RwLock::new(hashbrown::HashMap::new()),
438            #[cfg(feature = "grafeo-file")]
439            file_manager: None,
440            external_store: Some(store),
441            #[cfg(feature = "metrics")]
442            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
443            current_graph: RwLock::new(None),
444        })
445    }
446
    /// Applies WAL records to restore the database state.
    ///
    /// Data mutation records are routed through a graph cursor that tracks
    /// `SwitchGraph` context markers, replaying mutations into the correct
    /// named graph (or the default graph when cursor is `None`).
    ///
    /// Records are applied strictly in slice order. Schema (DDL) records are
    /// applied to `catalog` regardless of the cursor, and RDF records are
    /// forwarded to `rdf_ops::replay_rdf_wal_record` when the `rdf` feature
    /// is enabled (otherwise they are skipped).
    ///
    /// # Errors
    ///
    /// Returns an error if a `SwitchGraph` target cannot be resolved or
    /// created, or if `create_node_with_id` / `create_edge_with_id` fails.
    /// The results of all other store and catalog calls are discarded, so
    /// they never abort the replay.
    #[cfg(feature = "wal")]
    fn apply_wal_records(
        store: &Arc<LpgStore>,
        catalog: &Catalog,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        records: &[WalRecord],
    ) -> Result<()> {
        use crate::catalog::{
            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
        };
        use grafeo_common::utils::error::Error;

        // Graph cursor: tracks which named graph receives data mutations.
        // `None` means the default graph.
        let mut current_graph: Option<String> = None;
        let mut target_store: Arc<LpgStore> = Arc::clone(store);

        for record in records {
            match record {
                // --- Named graph lifecycle ---
                // These always go through the root `store`, not the cursor.
                WalRecord::CreateNamedGraph { name } => {
                    // Result discarded: a failure here does not stop the replay.
                    let _ = store.create_graph(name);
                }
                WalRecord::DropNamedGraph { name } => {
                    store.drop_graph(name);
                    // Reset cursor if the dropped graph was active
                    if current_graph.as_deref() == Some(name.as_str()) {
                        current_graph = None;
                        target_store = Arc::clone(store);
                    }
                }
                WalRecord::SwitchGraph { name } => {
                    current_graph.clone_from(name);
                    // `None` selects the default graph; a named graph is
                    // looked up via `graph_or_create`.
                    target_store = match &current_graph {
                        None => Arc::clone(store),
                        Some(graph_name) => store
                            .graph_or_create(graph_name)
                            .map_err(|e| Error::Internal(e.to_string()))?,
                    };
                }

                // --- Data mutations: routed through target_store ---
                WalRecord::CreateNode { id, labels } => {
                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
                    target_store.create_node_with_id(*id, &label_refs)?;
                }
                WalRecord::DeleteNode { id } => {
                    target_store.delete_node(*id);
                }
                WalRecord::CreateEdge {
                    id,
                    src,
                    dst,
                    edge_type,
                } => {
                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
                }
                WalRecord::DeleteEdge { id } => {
                    target_store.delete_edge(*id);
                }
                WalRecord::SetNodeProperty { id, key, value } => {
                    target_store.set_node_property(*id, key, value.clone());
                }
                WalRecord::SetEdgeProperty { id, key, value } => {
                    target_store.set_edge_property(*id, key, value.clone());
                }
                WalRecord::AddNodeLabel { id, label } => {
                    target_store.add_label(*id, label);
                }
                WalRecord::RemoveNodeLabel { id, label } => {
                    target_store.remove_label(*id, label);
                }
                WalRecord::RemoveNodeProperty { id, key } => {
                    target_store.remove_node_property(*id, key);
                }
                WalRecord::RemoveEdgeProperty { id, key } => {
                    target_store.remove_edge_property(*id, key);
                }

                // --- Schema DDL replay (always on root catalog) ---
                WalRecord::CreateNodeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Rebuild the definition from the logged (name, type name,
                    // nullable) triples. Default values and parent types are
                    // not part of the record, so they come back empty.
                    let def = NodeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        // Constraint kinds are logged as strings. Anything
                        // unrecognized, including `not_null` with no
                        // properties, falls back to `Unique`.
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        parent_types: Vec::new(),
                    };
                    let _ = catalog.register_node_type(def);
                }
                WalRecord::DropNodeType { name } => {
                    let _ = catalog.drop_node_type(name);
                }
                WalRecord::CreateEdgeType {
                    name,
                    properties,
                    constraints,
                } => {
                    // Same reconstruction as for node types. Source and target
                    // node type lists are not part of the record and come
                    // back empty.
                    let def = EdgeTypeDefinition {
                        name: name.clone(),
                        properties: properties
                            .iter()
                            .map(|(n, t, nullable)| TypedProperty {
                                name: n.clone(),
                                data_type: PropertyDataType::from_type_name(t),
                                nullable: *nullable,
                                default_value: None,
                            })
                            .collect(),
                        constraints: constraints
                            .iter()
                            .map(|(kind, props)| match kind.as_str() {
                                "unique" => TypeConstraint::Unique(props.clone()),
                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
                                "not_null" if !props.is_empty() => {
                                    TypeConstraint::NotNull(props[0].clone())
                                }
                                _ => TypeConstraint::Unique(props.clone()),
                            })
                            .collect(),
                        source_node_types: Vec::new(),
                        target_node_types: Vec::new(),
                    };
                    let _ = catalog.register_edge_type_def(def);
                }
                WalRecord::DropEdgeType { name } => {
                    let _ = catalog.drop_edge_type_def(name);
                }
                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
                    // Index recreation is handled by the store on startup
                    // (indexes are rebuilt from data, not WAL)
                }
                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
                    // Constraint definitions are part of type definitions
                    // and replayed via CreateNodeType/CreateEdgeType
                }
                WalRecord::CreateGraphType {
                    name,
                    node_types,
                    edge_types,
                    open,
                } => {
                    use crate::catalog::GraphTypeDefinition;
                    let def = GraphTypeDefinition {
                        name: name.clone(),
                        allowed_node_types: node_types.clone(),
                        allowed_edge_types: edge_types.clone(),
                        open: *open,
                    };
                    let _ = catalog.register_graph_type(def);
                }
                WalRecord::DropGraphType { name } => {
                    let _ = catalog.drop_graph_type(name);
                }
                WalRecord::CreateSchema { name } => {
                    let _ = catalog.register_schema_namespace(name.clone());
                }
                WalRecord::DropSchema { name } => {
                    let _ = catalog.drop_schema_namespace(name);
                }

                // --- Schema alterations ---
                // Each alteration carries a string action. Unknown actions are ignored.
                WalRecord::AlterNodeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_node_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterEdgeType { name, alterations } => {
                    for (action, prop_name, type_name, nullable) in alterations {
                        match action.as_str() {
                            "add" => {
                                let prop = TypedProperty {
                                    name: prop_name.clone(),
                                    data_type: PropertyDataType::from_type_name(type_name),
                                    nullable: *nullable,
                                    default_value: None,
                                };
                                let _ = catalog.alter_edge_type_add_property(name, prop);
                            }
                            "drop" => {
                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
                            }
                            _ => {}
                        }
                    }
                }
                WalRecord::AlterGraphType { name, alterations } => {
                    for (action, type_name) in alterations {
                        match action.as_str() {
                            "add_node" => {
                                let _ =
                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
                            }
                            "drop_node" => {
                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
                            }
                            "add_edge" => {
                                let _ =
                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
                            }
                            "drop_edge" => {
                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
                            }
                            _ => {}
                        }
                    }
                }

                // --- Stored procedures ---
                WalRecord::CreateProcedure {
                    name,
                    params,
                    returns,
                    body,
                } => {
                    use crate::catalog::ProcedureDefinition;
                    let def = ProcedureDefinition {
                        name: name.clone(),
                        params: params.clone(),
                        returns: returns.clone(),
                        body: body.clone(),
                    };
                    let _ = catalog.register_procedure(def);
                }
                WalRecord::DropProcedure { name } => {
                    let _ = catalog.drop_procedure(name);
                }

                // --- RDF triple replay ---
                #[cfg(feature = "rdf")]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {
                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
                }
                // Without the `rdf` feature there is no RDF store, so these
                // records are skipped; the arm keeps the match exhaustive.
                #[cfg(not(feature = "rdf"))]
                WalRecord::InsertRdfTriple { .. }
                | WalRecord::DeleteRdfTriple { .. }
                | WalRecord::ClearRdfGraph { .. }
                | WalRecord::CreateRdfGraph { .. }
                | WalRecord::DropRdfGraph { .. } => {}

                WalRecord::TransactionCommit { .. }
                | WalRecord::TransactionAbort { .. }
                | WalRecord::Checkpoint { .. } => {
                    // Transaction control records don't need replay action
                    // (recovery already filtered to only committed transactions)
                }
            }
        }
        Ok(())
    }
739
740    // =========================================================================
741    // Single-file format helpers
742    // =========================================================================
743
744    /// Returns `true` if the given path should use single-file format.
745    #[cfg(feature = "grafeo-file")]
746    fn should_use_single_file(
747        path: &std::path::Path,
748        configured: crate::config::StorageFormat,
749    ) -> bool {
750        use crate::config::StorageFormat;
751        match configured {
752            StorageFormat::SingleFile => true,
753            StorageFormat::WalDirectory => false,
754            StorageFormat::Auto => {
755                // Existing file: check magic bytes
756                if path.is_file() {
757                    if let Ok(mut f) = std::fs::File::open(path) {
758                        use std::io::Read;
759                        let mut magic = [0u8; 4];
760                        if f.read_exact(&mut magic).is_ok()
761                            && magic == grafeo_adapters::storage::file::MAGIC
762                        {
763                            return true;
764                        }
765                    }
766                    return false;
767                }
768                // Existing directory: legacy format
769                if path.is_dir() {
770                    return false;
771                }
772                // New path: check extension
773                path.extension().is_some_and(|ext| ext == "grafeo")
774            }
775        }
776    }
777
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// This is a thin wrapper: every argument is forwarded unchanged to
    /// `persistence::load_snapshot_into_store`, and its result is returned as-is.
    /// The `rdf_store` parameter only exists when the `rdf` feature is enabled.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `persistence::load_snapshot_into_store` returns.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
794
795    // =========================================================================
796    // Session & Configuration
797    // =========================================================================
798
799    /// Opens a new session for running queries.
800    ///
801    /// Sessions are cheap to create: spin up as many as you need. Each
802    /// gets its own transaction context, so concurrent sessions won't
803    /// block each other on reads.
804    ///
805    /// # Panics
806    ///
807    /// Panics if the database was configured with an external graph store and
808    /// the internal arena allocator cannot be initialized (out of memory).
809    ///
810    /// # Examples
811    ///
812    /// ```
813    /// use grafeo_engine::GrafeoDB;
814    ///
815    /// let db = GrafeoDB::new_in_memory();
816    /// let session = db.session();
817    ///
818    /// // Run queries through the session
819    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
820    /// # Ok::<(), grafeo_common::utils::error::Error>(())
821    /// ```
822    #[must_use]
823    pub fn session(&self) -> Session {
824        let session_cfg = || crate::session::SessionConfig {
825            transaction_manager: Arc::clone(&self.transaction_manager),
826            query_cache: Arc::clone(&self.query_cache),
827            catalog: Arc::clone(&self.catalog),
828            adaptive_config: self.config.adaptive.clone(),
829            factorized_execution: self.config.factorized_execution,
830            graph_model: self.config.graph_model,
831            query_timeout: self.config.query_timeout,
832            commit_counter: Arc::clone(&self.commit_counter),
833            gc_interval: self.config.gc_interval,
834        };
835
836        if let Some(ref ext_store) = self.external_store {
837            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
838                .expect("arena allocation for external store session");
839        }
840
841        #[cfg(feature = "rdf")]
842        let mut session = Session::with_rdf_store_and_adaptive(
843            Arc::clone(&self.store),
844            Arc::clone(&self.rdf_store),
845            session_cfg(),
846        );
847        #[cfg(not(feature = "rdf"))]
848        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
849
850        #[cfg(feature = "wal")]
851        if let Some(ref wal) = self.wal {
852            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
853        }
854
855        #[cfg(feature = "cdc")]
856        session.set_cdc_log(Arc::clone(&self.cdc_log));
857
858        #[cfg(feature = "metrics")]
859        {
860            if let Some(ref m) = self.metrics {
861                session.set_metrics(Arc::clone(m));
862                m.session_created
863                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
864                m.session_active
865                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
866            }
867        }
868
869        // Propagate persistent graph context to the new session
870        if let Some(ref graph) = *self.current_graph.read() {
871            session.use_graph(graph);
872        }
873
874        // Suppress unused_mut when cdc/wal are disabled
875        let _ = &mut session;
876
877        session
878    }
879
    /// Returns the current graph name, if any.
    ///
    /// This is the persistent graph context used by one-shot `execute()` calls.
    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
    /// or `SESSION RESET`.
    ///
    /// The value is cloned out from behind the lock, so the returned `String`
    /// is an independent copy of the name at the time of the call.
    #[must_use]
    pub fn current_graph(&self) -> Option<String> {
        self.current_graph.read().clone()
    }
889
890    /// Sets the current graph context for subsequent one-shot `execute()` calls.
891    ///
892    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
893    /// Pass `None` to reset to the default graph.
894    pub fn set_current_graph(&self, name: Option<&str>) {
895        *self.current_graph.write() = name.map(ToString::to_string);
896    }
897
    /// Returns the adaptive execution configuration.
    ///
    /// This borrows the `adaptive` section of the database [`Config`]; it is
    /// the same value that is cloned into each new session's configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
903
    /// Returns a reference to the [`Config`] held by this database.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
909
    /// Returns the graph data model of this database.
    ///
    /// The value is read from the `graph_model` field of the database [`Config`].
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
915
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// The value is read from the `memory_limit` field of the database
    /// [`Config`]; `None` means no limit was configured.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
921
922    /// Returns a point-in-time snapshot of all metrics.
923    ///
924    /// If the `metrics` feature is disabled or the registry is not
925    /// initialized, returns a default (all-zero) snapshot.
926    #[cfg(feature = "metrics")]
927    #[must_use]
928    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
929        let mut snapshot = self
930            .metrics
931            .as_ref()
932            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
933
934        // Augment with cache stats from the query cache (not tracked in the registry)
935        let cache_stats = self.query_cache.stats();
936        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
937        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
938        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
939        snapshot.cache_invalidations = cache_stats.invalidations;
940
941        snapshot
942    }
943
944    /// Returns all metrics in Prometheus text exposition format.
945    ///
946    /// The output is ready to serve from an HTTP `/metrics` endpoint.
947    #[cfg(feature = "metrics")]
948    #[must_use]
949    pub fn metrics_prometheus(&self) -> String {
950        self.metrics
951            .as_ref()
952            .map_or_else(String::new, |m| m.to_prometheus())
953    }
954
955    /// Resets all metrics counters and histograms to zero.
956    #[cfg(feature = "metrics")]
957    pub fn reset_metrics(&self) {
958        if let Some(ref m) = self.metrics {
959            m.reset();
960        }
961        self.query_cache.reset_stats();
962    }
963
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// Note that this always returns the internal LPG store, even when an
    /// external store is configured; only `graph_store()` consults the
    /// external store.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
975
976    /// Returns the LPG store for the currently active graph.
977    ///
978    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
979    /// the default store. Otherwise looks up the named graph in the root store.
980    /// Falls back to the default store if the named graph does not exist.
981    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
982    fn active_store(&self) -> Arc<LpgStore> {
983        let graph_name = self.current_graph.read().clone();
984        match graph_name {
985            None => Arc::clone(&self.store),
986            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
987            Some(ref name) => self
988                .store
989                .graph(name)
990                .unwrap_or_else(|| Arc::clone(&self.store)),
991        }
992    }
993
994    // === Named Graph Management ===
995
996    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
997    ///
998    /// # Errors
999    ///
1000    /// Returns an error if arena allocation fails.
1001    pub fn create_graph(&self, name: &str) -> Result<bool> {
1002        Ok(self.store.create_graph(name)?)
1003    }
1004
    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
    ///
    /// Delegates directly to the default store's `drop_graph`.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1009
    /// Returns all named graph names.
    ///
    /// The list is whatever the default store's `graph_names()` reports.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1015
1016    /// Returns the graph store as a trait object.
1017    ///
1018    /// This provides the [`GraphStoreMut`] interface for code that should work
1019    /// with any storage backend. Use this when you only need graph read/write
1020    /// operations and don't need admin methods like index management.
1021    ///
1022    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1023    #[must_use]
1024    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1025        if let Some(ref ext_store) = self.external_store {
1026            Arc::clone(ext_store)
1027        } else {
1028            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1029        }
1030    }
1031
1032    /// Garbage collects old MVCC versions that are no longer visible.
1033    ///
1034    /// Determines the minimum epoch required by active transactions and prunes
1035    /// version chains older than that threshold. Also cleans up completed
1036    /// transaction metadata in the transaction manager.
1037    pub fn gc(&self) {
1038        let min_epoch = self.transaction_manager.min_active_epoch();
1039        self.store.gc_versions(min_epoch);
1040        self.transaction_manager.gc();
1041    }
1042
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The returned reference points at the database's own `Arc`; clone it
    /// (`Arc::clone`) if shared ownership is needed.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1048
    /// Returns the query cache.
    ///
    /// This is the same cache handle that is shared with every session
    /// created through [`session()`](Self::session).
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1054
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Implemented as a single call to the query cache's `clear()`.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1063
1064    // =========================================================================
1065    // Lifecycle
1066    // =========================================================================
1067
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database is a no-op that returns
    /// `Ok(())`. The database is only marked closed after every step has
    /// succeeded, so a failed `close()` can be retried.
    ///
    /// Two mutually exclusive shutdown paths exist:
    /// - **Single-file format** (a file manager is present): sync the WAL,
    ///   write a snapshot to the `.grafeo` file, release the WAL's active log,
    ///   remove the sidecar WAL, and close the file manager.
    /// - **Legacy directory format** (no file manager, WAL present): log a
    ///   `TransactionCommit`, checkpoint the WAL, and sync it.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions),
    /// or if writing the snapshot, removing the sidecar WAL, closing the file
    /// manager, or logging/checkpointing the WAL fails.
    pub fn close(&self) -> Result<()> {
        // The write guard is held until the function returns.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the legacy WAL path below because this branch
        // removes the sidecar WAL directory (via `fm.remove_sidecar_wal()`).
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and checkpoint WAL (legacy directory format only)
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            let epoch = self.store.current_epoch();

            // Use the last assigned transaction ID, or create a checkpoint-only tx
            let checkpoint_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| {
                    // No transactions have been started; begin one for checkpoint
                    self.transaction_manager.begin()
                });

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: checkpoint_tx,
            })?;

            // Then checkpoint
            wal.checkpoint(checkpoint_tx, epoch)?;
            wal.sync()?;
        }

        // Only reached when every step above succeeded.
        *is_open = false;
        Ok(())
    }
1138
    /// Returns the typed WAL if available.
    ///
    /// `None` means this database instance has no WAL attached.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1145
1146    /// Logs a WAL record if WAL is enabled.
1147    #[cfg(feature = "wal")]
1148    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1149        if let Some(ref wal) = self.wal {
1150            wal.log(record)?;
1151        }
1152        Ok(())
1153    }
1154
1155    /// Writes the current database snapshot to the `.grafeo` file.
1156    ///
1157    /// Does NOT remove the sidecar WAL: callers that want to clean up
1158    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1159    /// separately after this returns.
1160    #[cfg(feature = "grafeo-file")]
1161    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1162        use grafeo_core::testing::crash::maybe_crash;
1163
1164        maybe_crash("checkpoint_to_file:before_export");
1165        let snapshot_data = self.export_snapshot()?;
1166        maybe_crash("checkpoint_to_file:after_export");
1167
1168        let epoch = self.store.current_epoch();
1169        let transaction_id = self
1170            .transaction_manager
1171            .last_assigned_transaction_id()
1172            .map_or(0, |t| t.0);
1173        let node_count = self.store.node_count() as u64;
1174        let edge_count = self.store.edge_count() as u64;
1175
1176        fm.write_snapshot(
1177            &snapshot_data,
1178            epoch.0,
1179            transaction_id,
1180            node_count,
1181            edge_count,
1182        )?;
1183
1184        maybe_crash("checkpoint_to_file:after_write_snapshot");
1185        Ok(())
1186    }
1187
    /// Returns the file manager if using single-file format.
    ///
    /// `close()` uses the presence of a file manager to select the
    /// single-file shutdown path.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1194}
1195
1196impl Drop for GrafeoDB {
1197    fn drop(&mut self) {
1198        if let Err(e) = self.close() {
1199            tracing::error!("Error closing database: {}", e);
1200        }
1201    }
1202}
1203
/// Exposes the database's admin operations through the `AdminService` trait.
///
/// Every method forwards to a method of the same name on `self`.
// NOTE(review): these calls presumably resolve to inherent `GrafeoDB` methods
// defined outside this chunk (inherent methods take precedence over trait
// methods); if one were missing, the call would recurse into this impl.
impl crate::admin::AdminService for GrafeoDB {
    /// Forwards to `GrafeoDB::info`.
    fn info(&self) -> crate::admin::DatabaseInfo {
        self.info()
    }

    /// Forwards to `GrafeoDB::detailed_stats`.
    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
        self.detailed_stats()
    }

    /// Forwards to `GrafeoDB::schema`.
    fn schema(&self) -> crate::admin::SchemaInfo {
        self.schema()
    }

    /// Forwards to `GrafeoDB::validate`.
    fn validate(&self) -> crate::admin::ValidationResult {
        self.validate()
    }

    /// Forwards to `GrafeoDB::wal_status`.
    fn wal_status(&self) -> crate::admin::WalStatus {
        self.wal_status()
    }

    /// Forwards to `GrafeoDB::wal_checkpoint`.
    fn wal_checkpoint(&self) -> Result<()> {
        self.wal_checkpoint()
    }
}
1229
1230// =========================================================================
1231// Query Result Types
1232// =========================================================================
1233
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so `columns`, `column_types` and `rows` are not
/// forced to agree in length by the type itself.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    /// `QueryResult::new` fills this with one `LogicalType::Any` per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    /// Every constructor on this type initializes it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1276
1277impl QueryResult {
1278    /// Creates a fully empty query result (no columns, no rows).
1279    #[must_use]
1280    pub fn empty() -> Self {
1281        Self {
1282            columns: Vec::new(),
1283            column_types: Vec::new(),
1284            rows: Vec::new(),
1285            execution_time_ms: None,
1286            rows_scanned: None,
1287            status_message: None,
1288            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1289        }
1290    }
1291
1292    /// Creates a query result with only a status message (for DDL commands).
1293    #[must_use]
1294    pub fn status(msg: impl Into<String>) -> Self {
1295        Self {
1296            columns: Vec::new(),
1297            column_types: Vec::new(),
1298            rows: Vec::new(),
1299            execution_time_ms: None,
1300            rows_scanned: None,
1301            status_message: Some(msg.into()),
1302            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1303        }
1304    }
1305
1306    /// Creates a new empty query result.
1307    #[must_use]
1308    pub fn new(columns: Vec<String>) -> Self {
1309        let len = columns.len();
1310        Self {
1311            columns,
1312            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1313            rows: Vec::new(),
1314            execution_time_ms: None,
1315            rows_scanned: None,
1316            status_message: None,
1317            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1318        }
1319    }
1320
1321    /// Creates a new empty query result with column types.
1322    #[must_use]
1323    pub fn with_types(
1324        columns: Vec<String>,
1325        column_types: Vec<grafeo_common::types::LogicalType>,
1326    ) -> Self {
1327        Self {
1328            columns,
1329            column_types,
1330            rows: Vec::new(),
1331            execution_time_ms: None,
1332            rows_scanned: None,
1333            status_message: None,
1334            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1335        }
1336    }
1337
1338    /// Sets the execution metrics on this result.
1339    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1340        self.execution_time_ms = Some(execution_time_ms);
1341        self.rows_scanned = Some(rows_scanned);
1342        self
1343    }
1344
1345    /// Returns the execution time in milliseconds, if available.
1346    #[must_use]
1347    pub fn execution_time_ms(&self) -> Option<f64> {
1348        self.execution_time_ms
1349    }
1350
1351    /// Returns the number of rows scanned, if available.
1352    #[must_use]
1353    pub fn rows_scanned(&self) -> Option<u64> {
1354        self.rows_scanned
1355    }
1356
1357    /// Returns the number of rows.
1358    #[must_use]
1359    pub fn row_count(&self) -> usize {
1360        self.rows.len()
1361    }
1362
1363    /// Returns the number of columns.
1364    #[must_use]
1365    pub fn column_count(&self) -> usize {
1366        self.columns.len()
1367    }
1368
1369    /// Returns true if the result is empty.
1370    #[must_use]
1371    pub fn is_empty(&self) -> bool {
1372        self.rows.is_empty()
1373    }
1374
1375    /// Extracts a single value from the result.
1376    ///
1377    /// Use this when your query returns exactly one row with one column,
1378    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1379    ///
1380    /// # Errors
1381    ///
1382    /// Returns an error if the result has multiple rows or columns.
1383    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1384        if self.rows.len() != 1 || self.columns.len() != 1 {
1385            return Err(grafeo_common::utils::error::Error::InvalidValue(
1386                "Expected single value".to_string(),
1387            ));
1388        }
1389        T::from_value(&self.rows[0][0])
1390    }
1391
1392    /// Returns an iterator over the rows.
1393    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1394        self.rows.iter()
1395    }
1396}
1397
1398impl std::fmt::Display for QueryResult {
1399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1400        let table = grafeo_common::fmt::format_result_table(
1401            &self.columns,
1402            &self.rows,
1403            self.execution_time_ms,
1404            self.status_message.as_deref(),
1405        );
1406        f.write_str(&table)
1407    }
1408}
1409
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The conversion borrows the value and produces an owned `Self`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `Error::TypeMismatch`, naming
    /// the expected type and the value's actual type name.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1418
1419impl FromValue for i64 {
1420    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1421        value
1422            .as_int64()
1423            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1424                expected: "INT64".to_string(),
1425                found: value.type_name().to_string(),
1426            })
1427    }
1428}
1429
1430impl FromValue for f64 {
1431    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1432        value
1433            .as_float64()
1434            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1435                expected: "FLOAT64".to_string(),
1436                found: value.type_name().to_string(),
1437            })
1438    }
1439}
1440
1441impl FromValue for String {
1442    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1443        value.as_str().map(String::from).ok_or_else(|| {
1444            grafeo_common::utils::error::Error::TypeMismatch {
1445                expected: "STRING".to_string(),
1446                found: value.type_name().to_string(),
1447            }
1448        })
1449    }
1450}
1451
1452impl FromValue for bool {
1453    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1454        value
1455            .as_bool()
1456            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1457                expected: "BOOL".to_string(),
1458                found: value.type_name().to_string(),
1459            })
1460    }
1461}
1462
1463#[cfg(test)]
1464mod tests {
1465    use super::*;
1466
1467    #[test]
1468    fn test_create_in_memory_database() {
1469        let db = GrafeoDB::new_in_memory();
1470        assert_eq!(db.node_count(), 0);
1471        assert_eq!(db.edge_count(), 0);
1472    }
1473
1474    #[test]
1475    fn test_database_config() {
1476        let config = Config::in_memory().with_threads(4).with_query_logging();
1477
1478        let db = GrafeoDB::with_config(config).unwrap();
1479        assert_eq!(db.config().threads, 4);
1480        assert!(db.config().query_logging);
1481    }
1482
1483    #[test]
1484    fn test_database_session() {
1485        let db = GrafeoDB::new_in_memory();
1486        let _session = db.session();
1487        // Session should be created successfully
1488    }
1489
1490    #[cfg(feature = "wal")]
1491    #[test]
1492    fn test_persistent_database_recovery() {
1493        use grafeo_common::types::Value;
1494        use tempfile::tempdir;
1495
1496        let dir = tempdir().unwrap();
1497        let db_path = dir.path().join("test_db");
1498
1499        // Create database and add some data
1500        {
1501            let db = GrafeoDB::open(&db_path).unwrap();
1502
1503            let alix = db.create_node(&["Person"]);
1504            db.set_node_property(alix, "name", Value::from("Alix"));
1505
1506            let gus = db.create_node(&["Person"]);
1507            db.set_node_property(gus, "name", Value::from("Gus"));
1508
1509            let _edge = db.create_edge(alix, gus, "KNOWS");
1510
1511            // Explicitly close to flush WAL
1512            db.close().unwrap();
1513        }
1514
1515        // Reopen and verify data was recovered
1516        {
1517            let db = GrafeoDB::open(&db_path).unwrap();
1518
1519            assert_eq!(db.node_count(), 2);
1520            assert_eq!(db.edge_count(), 1);
1521
1522            // Verify nodes exist
1523            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1524            assert!(node0.is_some());
1525
1526            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1527            assert!(node1.is_some());
1528        }
1529    }
1530
1531    #[cfg(feature = "wal")]
1532    #[test]
1533    fn test_wal_logging() {
1534        use tempfile::tempdir;
1535
1536        let dir = tempdir().unwrap();
1537        let db_path = dir.path().join("wal_test_db");
1538
1539        let db = GrafeoDB::open(&db_path).unwrap();
1540
1541        // Create some data
1542        let node = db.create_node(&["Test"]);
1543        db.delete_node(node);
1544
1545        // WAL should have records
1546        if let Some(wal) = db.wal() {
1547            assert!(wal.record_count() > 0);
1548        }
1549
1550        db.close().unwrap();
1551    }
1552
1553    #[cfg(feature = "wal")]
1554    #[test]
1555    fn test_wal_recovery_multiple_sessions() {
1556        // Tests that WAL recovery works correctly across multiple open/close cycles
1557        use grafeo_common::types::Value;
1558        use tempfile::tempdir;
1559
1560        let dir = tempdir().unwrap();
1561        let db_path = dir.path().join("multi_session_db");
1562
1563        // Session 1: Create initial data
1564        {
1565            let db = GrafeoDB::open(&db_path).unwrap();
1566            let alix = db.create_node(&["Person"]);
1567            db.set_node_property(alix, "name", Value::from("Alix"));
1568            db.close().unwrap();
1569        }
1570
1571        // Session 2: Add more data
1572        {
1573            let db = GrafeoDB::open(&db_path).unwrap();
1574            assert_eq!(db.node_count(), 1); // Previous data recovered
1575            let gus = db.create_node(&["Person"]);
1576            db.set_node_property(gus, "name", Value::from("Gus"));
1577            db.close().unwrap();
1578        }
1579
1580        // Session 3: Verify all data
1581        {
1582            let db = GrafeoDB::open(&db_path).unwrap();
1583            assert_eq!(db.node_count(), 2);
1584
1585            // Verify properties were recovered correctly
1586            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1587            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1588
1589            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1590            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1591        }
1592    }
1593
1594    #[cfg(feature = "wal")]
1595    #[test]
1596    fn test_database_consistency_after_mutations() {
1597        // Tests that database remains consistent after a series of create/delete operations
1598        use grafeo_common::types::Value;
1599        use tempfile::tempdir;
1600
1601        let dir = tempdir().unwrap();
1602        let db_path = dir.path().join("consistency_db");
1603
1604        {
1605            let db = GrafeoDB::open(&db_path).unwrap();
1606
1607            // Create nodes
1608            let a = db.create_node(&["Node"]);
1609            let b = db.create_node(&["Node"]);
1610            let c = db.create_node(&["Node"]);
1611
1612            // Create edges
1613            let e1 = db.create_edge(a, b, "LINKS");
1614            let _e2 = db.create_edge(b, c, "LINKS");
1615
1616            // Delete middle node and its edge
1617            db.delete_edge(e1);
1618            db.delete_node(b);
1619
1620            // Set properties on remaining nodes
1621            db.set_node_property(a, "value", Value::Int64(1));
1622            db.set_node_property(c, "value", Value::Int64(3));
1623
1624            db.close().unwrap();
1625        }
1626
1627        // Reopen and verify consistency
1628        {
1629            let db = GrafeoDB::open(&db_path).unwrap();
1630
1631            // Should have 2 nodes (a and c), b was deleted
1632            // Note: node_count includes deleted nodes in some implementations
1633            // What matters is that the non-deleted nodes are accessible
1634            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1635            assert!(node_a.is_some());
1636
1637            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1638            assert!(node_c.is_some());
1639
1640            // Middle node should be deleted
1641            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1642            assert!(node_b.is_none());
1643        }
1644    }
1645
1646    #[cfg(feature = "wal")]
1647    #[test]
1648    fn test_close_is_idempotent() {
1649        // Calling close() multiple times should not cause errors
1650        use tempfile::tempdir;
1651
1652        let dir = tempdir().unwrap();
1653        let db_path = dir.path().join("close_test_db");
1654
1655        let db = GrafeoDB::open(&db_path).unwrap();
1656        db.create_node(&["Test"]);
1657
1658        // First close should succeed
1659        assert!(db.close().is_ok());
1660
1661        // Second close should also succeed (idempotent)
1662        assert!(db.close().is_ok());
1663    }
1664
1665    #[test]
1666    fn test_with_store_external_backend() {
1667        use grafeo_core::graph::lpg::LpgStore;
1668
1669        let external = Arc::new(LpgStore::new().unwrap());
1670
1671        // Seed data on the external store directly
1672        let n1 = external.create_node(&["Person"]);
1673        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1674
1675        let db = GrafeoDB::with_store(
1676            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1677            Config::in_memory(),
1678        )
1679        .unwrap();
1680
1681        let session = db.session();
1682
1683        // Session should see data from the external store via execute
1684        #[cfg(feature = "gql")]
1685        {
1686            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1687            assert_eq!(result.rows.len(), 1);
1688        }
1689    }
1690
1691    #[test]
1692    fn test_with_config_custom_memory_limit() {
1693        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1694
1695        let db = GrafeoDB::with_config(config).unwrap();
1696        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1697        assert_eq!(db.node_count(), 0);
1698    }
1699
1700    #[cfg(feature = "metrics")]
1701    #[test]
1702    fn test_database_metrics_registry() {
1703        let db = GrafeoDB::new_in_memory();
1704
1705        // Perform some operations
1706        db.create_node(&["Person"]);
1707        db.create_node(&["Person"]);
1708
1709        // Check that metrics snapshot returns data
1710        let snap = db.metrics();
1711        // Session created counter should reflect at least 0 (metrics is initialized)
1712        assert_eq!(snap.query_count, 0); // No queries executed yet
1713    }
1714
1715    #[test]
1716    fn test_query_result_has_metrics() {
1717        // Verifies that query results include execution metrics
1718        let db = GrafeoDB::new_in_memory();
1719        db.create_node(&["Person"]);
1720        db.create_node(&["Person"]);
1721
1722        #[cfg(feature = "gql")]
1723        {
1724            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1725
1726            // Metrics should be populated
1727            assert!(result.execution_time_ms.is_some());
1728            assert!(result.rows_scanned.is_some());
1729            assert!(result.execution_time_ms.unwrap() >= 0.0);
1730            assert_eq!(result.rows_scanned.unwrap(), 2);
1731        }
1732    }
1733
1734    #[test]
1735    fn test_empty_query_result_metrics() {
1736        // Verifies metrics are correct for queries returning no results
1737        let db = GrafeoDB::new_in_memory();
1738        db.create_node(&["Person"]);
1739
1740        #[cfg(feature = "gql")]
1741        {
1742            // Query that matches nothing
1743            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1744
1745            assert!(result.execution_time_ms.is_some());
1746            assert!(result.rows_scanned.is_some());
1747            assert_eq!(result.rows_scanned.unwrap(), 0);
1748        }
1749    }
1750
1751    #[cfg(feature = "cdc")]
1752    mod cdc_integration {
1753        use super::*;
1754
1755        #[test]
1756        fn test_node_lifecycle_history() {
1757            let db = GrafeoDB::new_in_memory();
1758
1759            // Create
1760            let id = db.create_node(&["Person"]);
1761            // Update
1762            db.set_node_property(id, "name", "Alix".into());
1763            db.set_node_property(id, "name", "Gus".into());
1764            // Delete
1765            db.delete_node(id);
1766
1767            let history = db.history(id).unwrap();
1768            assert_eq!(history.len(), 4); // create + 2 updates + delete
1769            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1770            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1771            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1772            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1773            assert!(history[2].before.is_some()); // second update has prior "Alix"
1774            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1775        }
1776
1777        #[test]
1778        fn test_edge_lifecycle_history() {
1779            let db = GrafeoDB::new_in_memory();
1780
1781            let alix = db.create_node(&["Person"]);
1782            let gus = db.create_node(&["Person"]);
1783            let edge = db.create_edge(alix, gus, "KNOWS");
1784            db.set_edge_property(edge, "since", 2024i64.into());
1785            db.delete_edge(edge);
1786
1787            let history = db.history(edge).unwrap();
1788            assert_eq!(history.len(), 3); // create + update + delete
1789            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1790            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1791            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1792        }
1793
1794        #[test]
1795        fn test_create_node_with_props_cdc() {
1796            let db = GrafeoDB::new_in_memory();
1797
1798            let id = db.create_node_with_props(
1799                &["Person"],
1800                vec![
1801                    ("name", grafeo_common::types::Value::from("Alix")),
1802                    ("age", grafeo_common::types::Value::from(30i64)),
1803                ],
1804            );
1805
1806            let history = db.history(id).unwrap();
1807            assert_eq!(history.len(), 1);
1808            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1809            // Props should be captured
1810            let after = history[0].after.as_ref().unwrap();
1811            assert_eq!(after.len(), 2);
1812        }
1813
1814        #[test]
1815        fn test_changes_between() {
1816            let db = GrafeoDB::new_in_memory();
1817
1818            let id1 = db.create_node(&["A"]);
1819            let _id2 = db.create_node(&["B"]);
1820            db.set_node_property(id1, "x", 1i64.into());
1821
1822            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1823            let changes = db
1824                .changes_between(
1825                    grafeo_common::types::EpochId(0),
1826                    grafeo_common::types::EpochId(u64::MAX),
1827                )
1828                .unwrap();
1829            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1830        }
1831    }
1832
1833    #[test]
1834    fn test_with_store_basic() {
1835        use grafeo_core::graph::lpg::LpgStore;
1836
1837        let store = Arc::new(LpgStore::new().unwrap());
1838        let n1 = store.create_node(&["Person"]);
1839        store.set_node_property(n1, "name", "Alix".into());
1840
1841        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1842        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1843
1844        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1845        assert_eq!(result.rows.len(), 1);
1846    }
1847
1848    #[test]
1849    fn test_with_store_session() {
1850        use grafeo_core::graph::lpg::LpgStore;
1851
1852        let store = Arc::new(LpgStore::new().unwrap());
1853        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1854        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1855
1856        let session = db.session();
1857        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1858        assert_eq!(result.rows.len(), 1);
1859    }
1860
1861    #[test]
1862    fn test_with_store_mutations() {
1863        use grafeo_core::graph::lpg::LpgStore;
1864
1865        let store = Arc::new(LpgStore::new().unwrap());
1866        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1867        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1868
1869        let mut session = db.session();
1870
1871        // Use an explicit transaction so INSERT and MATCH share the same
1872        // transaction context. With PENDING epochs, uncommitted versions are
1873        // only visible to the owning transaction.
1874        session.begin_transaction().unwrap();
1875        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1876
1877        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1878        assert_eq!(result.rows.len(), 1);
1879
1880        session.commit().unwrap();
1881    }
1882}