Skip to main content

grafeo_engine/database/
mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`] - it's your handle to everything.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (execute, execute_cypher, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27#[cfg(feature = "wal")]
28use std::path::Path;
29use std::sync::Arc;
30use std::sync::atomic::AtomicUsize;
31
32use parking_lot::RwLock;
33
34#[cfg(feature = "grafeo-file")]
35use grafeo_adapters::storage::file::GrafeoFileManager;
36#[cfg(feature = "wal")]
37use grafeo_adapters::storage::wal::{
38    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
39};
40use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
41use grafeo_common::utils::error::Result;
42use grafeo_core::graph::GraphStoreMut;
43use grafeo_core::graph::lpg::LpgStore;
44#[cfg(feature = "rdf")]
45use grafeo_core::graph::rdf::RdfStore;
46
47use crate::catalog::Catalog;
48use crate::config::Config;
49use crate::query::cache::QueryCache;
50use crate::session::Session;
51use crate::transaction::TransactionManager;
52
53/// Your handle to a Grafeo database.
54///
55/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
56/// quick experiments, or [`open()`](Self::open) for persistent storage.
57/// Then grab a [`session()`](Self::session) to start querying.
58///
59/// # Examples
60///
61/// ```
62/// use grafeo_engine::GrafeoDB;
63///
64/// // Quick in-memory database
65/// let db = GrafeoDB::new_in_memory();
66///
67/// // Add some data
68/// db.create_node(&["Person"]);
69///
70/// // Query it
71/// let session = db.session();
72/// let result = session.execute("MATCH (p:Person) RETURN p")?;
73/// # Ok::<(), grafeo_common::utils::error::Error>(())
74/// ```
75pub struct GrafeoDB {
76    /// Database configuration.
77    pub(super) config: Config,
78    /// The underlying graph store.
79    pub(super) store: Arc<LpgStore>,
80    /// Schema and metadata catalog shared across sessions.
81    pub(super) catalog: Arc<Catalog>,
82    /// RDF triple store (if RDF feature is enabled).
83    #[cfg(feature = "rdf")]
84    pub(super) rdf_store: Arc<RdfStore>,
85    /// Transaction manager.
86    pub(super) transaction_manager: Arc<TransactionManager>,
87    /// Unified buffer manager.
88    pub(super) buffer_manager: Arc<BufferManager>,
89    /// Write-ahead log manager (if durability is enabled).
90    #[cfg(feature = "wal")]
91    pub(super) wal: Option<Arc<LpgWal>>,
92    /// Shared WAL graph context tracker. Tracks which named graph was last
93    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
94    /// records only when the context actually changes.
95    #[cfg(feature = "wal")]
96    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
97    /// Query cache for parsed and optimized plans.
98    pub(super) query_cache: Arc<QueryCache>,
99    /// Shared commit counter for auto-GC across sessions.
100    pub(super) commit_counter: Arc<AtomicUsize>,
101    /// Whether the database is open.
102    pub(super) is_open: RwLock<bool>,
103    /// Change data capture log for tracking mutations.
104    #[cfg(feature = "cdc")]
105    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
106    /// Registered embedding models for text-to-vector conversion.
107    #[cfg(feature = "embed")]
108    pub(super) embedding_models:
109        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
110    /// Single-file database manager (when using `.grafeo` format).
111    #[cfg(feature = "grafeo-file")]
112    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
113    /// External graph store (when using with_store()).
114    /// When set, sessions route queries through this store instead of the built-in LpgStore.
115    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
116    /// Metrics registry shared across all sessions.
117    #[cfg(feature = "metrics")]
118    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
119    /// Persistent graph context for one-shot `execute()` calls.
120    /// When set, each call to `session()` pre-configures the session to this graph.
121    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
122    current_graph: RwLock<Option<String>>,
123    /// Whether this database is open in read-only mode.
124    /// When true, sessions automatically enforce read-only transactions.
125    read_only: bool,
126}
127
128impl GrafeoDB {
129    /// Creates an in-memory database, fast to create, gone when dropped.
130    ///
131    /// Use this for tests, experiments, or when you don't need persistence.
132    /// For data that survives restarts, use [`open()`](Self::open) instead.
133    ///
134    /// # Panics
135    ///
136    /// Panics if the internal arena allocator cannot be initialized (out of memory).
137    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
138    ///
139    /// # Examples
140    ///
141    /// ```
142    /// use grafeo_engine::GrafeoDB;
143    ///
144    /// let db = GrafeoDB::new_in_memory();
145    /// let session = db.session();
146    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
147    /// # Ok::<(), grafeo_common::utils::error::Error>(())
148    /// ```
149    #[must_use]
150    pub fn new_in_memory() -> Self {
151        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
152    }
153
154    /// Opens a database at the given path, creating it if it doesn't exist.
155    ///
156    /// If you've used this path before, Grafeo recovers your data from the
157    /// write-ahead log automatically. First open on a new path creates an
158    /// empty database.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the path isn't writable or recovery fails.
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// use grafeo_engine::GrafeoDB;
168    ///
169    /// let db = GrafeoDB::open("./my_social_network")?;
170    /// # Ok::<(), grafeo_common::utils::error::Error>(())
171    /// ```
172    #[cfg(feature = "wal")]
173    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
174        Self::with_config(Config::persistent(path.as_ref()))
175    }
176
177    /// Opens an existing database in read-only mode.
178    ///
179    /// Uses a shared file lock, so multiple processes can read the same
180    /// `.grafeo` file concurrently. The database loads the last checkpoint
181    /// snapshot but does **not** replay the WAL or allow mutations.
182    ///
183    /// Currently only supports the single-file (`.grafeo`) format.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the file doesn't exist or can't be read.
188    ///
189    /// # Examples
190    ///
191    /// ```no_run
192    /// use grafeo_engine::GrafeoDB;
193    ///
194    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
195    /// let session = db.session();
196    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
197    /// // Mutations will return an error:
198    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
199    /// # Ok::<(), grafeo_common::utils::error::Error>(())
200    /// ```
201    #[cfg(feature = "grafeo-file")]
202    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
203        Self::with_config(Config::read_only(path.as_ref()))
204    }
205
206    /// Creates a database with custom configuration.
207    ///
208    /// Use this when you need fine-grained control over memory limits,
209    /// thread counts, or persistence settings. For most cases,
210    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
211    /// are simpler.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the database can't be created or recovery fails.
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// use grafeo_engine::{GrafeoDB, Config};
221    ///
222    /// // In-memory with a 512MB limit
223    /// let config = Config::in_memory()
224    ///     .with_memory_limit(512 * 1024 * 1024);
225    ///
226    /// let db = GrafeoDB::with_config(config)?;
227    /// # Ok::<(), grafeo_common::utils::error::Error>(())
228    /// ```
229    pub fn with_config(config: Config) -> Result<Self> {
230        // Validate configuration before proceeding
231        config
232            .validate()
233            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
234
235        let store = Arc::new(LpgStore::new()?);
236        #[cfg(feature = "rdf")]
237        let rdf_store = Arc::new(RdfStore::new());
238        let transaction_manager = Arc::new(TransactionManager::new());
239
240        // Create buffer manager with configured limits
241        let buffer_config = BufferManagerConfig {
242            budget: config.memory_limit.unwrap_or_else(|| {
243                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
244            }),
245            spill_path: config
246                .spill_path
247                .clone()
248                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
249            ..BufferManagerConfig::default()
250        };
251        let buffer_manager = BufferManager::new(buffer_config);
252
253        // Create catalog early so WAL replay can restore schema definitions
254        let catalog = Arc::new(Catalog::new());
255
256        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
257
258        // --- Single-file format (.grafeo) ---
259        #[cfg(feature = "grafeo-file")]
260        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
261            // Read-only mode: open with shared lock, load snapshot, skip WAL
262            if let Some(ref db_path) = config.path {
263                if db_path.exists() && db_path.is_file() {
264                    let fm = GrafeoFileManager::open_read_only(db_path)?;
265                    let snapshot_data = fm.read_snapshot()?;
266                    if !snapshot_data.is_empty() {
267                        Self::apply_snapshot_data(
268                            &store,
269                            &catalog,
270                            #[cfg(feature = "rdf")]
271                            &rdf_store,
272                            &snapshot_data,
273                        )?;
274                    }
275                    Some(Arc::new(fm))
276                } else {
277                    return Err(grafeo_common::utils::error::Error::Internal(format!(
278                        "read-only open requires an existing .grafeo file: {}",
279                        db_path.display()
280                    )));
281                }
282            } else {
283                return Err(grafeo_common::utils::error::Error::Internal(
284                    "read-only mode requires a database path".to_string(),
285                ));
286            }
287        } else if config.wal_enabled {
288            if let Some(ref db_path) = config.path {
289                if Self::should_use_single_file(db_path, config.storage_format) {
290                    let fm = if db_path.exists() && db_path.is_file() {
291                        GrafeoFileManager::open(db_path)?
292                    } else if !db_path.exists() {
293                        GrafeoFileManager::create(db_path)?
294                    } else {
295                        // Path exists but is not a file (directory, etc.)
296                        return Err(grafeo_common::utils::error::Error::Internal(format!(
297                            "path exists but is not a file: {}",
298                            db_path.display()
299                        )));
300                    };
301
302                    // Load snapshot data from the file
303                    let snapshot_data = fm.read_snapshot()?;
304                    if !snapshot_data.is_empty() {
305                        Self::apply_snapshot_data(
306                            &store,
307                            &catalog,
308                            #[cfg(feature = "rdf")]
309                            &rdf_store,
310                            &snapshot_data,
311                        )?;
312                    }
313
314                    // Recover sidecar WAL if present
315                    if fm.has_sidecar_wal() {
316                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
317                        let records = recovery.recover()?;
318                        Self::apply_wal_records(
319                            &store,
320                            &catalog,
321                            #[cfg(feature = "rdf")]
322                            &rdf_store,
323                            &records,
324                        )?;
325                    }
326
327                    Some(Arc::new(fm))
328                } else {
329                    None
330                }
331            } else {
332                None
333            }
334        } else {
335            None
336        };
337
338        // Determine whether to use the WAL directory path (legacy) or sidecar
339        // Read-only mode skips WAL entirely (no recovery, no creation).
340        #[cfg(feature = "wal")]
341        let wal = if is_read_only {
342            None
343        } else if config.wal_enabled {
344            if let Some(ref db_path) = config.path {
345                // When using single-file format, the WAL is a sidecar directory
346                #[cfg(feature = "grafeo-file")]
347                let wal_path = if let Some(ref fm) = file_manager {
348                    let p = fm.sidecar_wal_path();
349                    std::fs::create_dir_all(&p)?;
350                    p
351                } else {
352                    // Legacy: WAL inside the database directory
353                    std::fs::create_dir_all(db_path)?;
354                    db_path.join("wal")
355                };
356
357                #[cfg(not(feature = "grafeo-file"))]
358                let wal_path = {
359                    std::fs::create_dir_all(db_path)?;
360                    db_path.join("wal")
361                };
362
363                // For legacy WAL directory format, check if WAL exists and recover
364                #[cfg(feature = "grafeo-file")]
365                let is_single_file = file_manager.is_some();
366                #[cfg(not(feature = "grafeo-file"))]
367                let is_single_file = false;
368
369                if !is_single_file && wal_path.exists() {
370                    let recovery = WalRecovery::new(&wal_path);
371                    let records = recovery.recover()?;
372                    Self::apply_wal_records(
373                        &store,
374                        &catalog,
375                        #[cfg(feature = "rdf")]
376                        &rdf_store,
377                        &records,
378                    )?;
379                }
380
381                // Open/create WAL manager with configured durability
382                let wal_durability = match config.wal_durability {
383                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
384                    crate::config::DurabilityMode::Batch {
385                        max_delay_ms,
386                        max_records,
387                    } => WalDurabilityMode::Batch {
388                        max_delay_ms,
389                        max_records,
390                    },
391                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
392                        WalDurabilityMode::Adaptive { target_interval_ms }
393                    }
394                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
395                };
396                let wal_config = WalConfig {
397                    durability: wal_durability,
398                    ..WalConfig::default()
399                };
400                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
401                Some(Arc::new(wal_manager))
402            } else {
403                None
404            }
405        } else {
406            None
407        };
408
409        // Create query cache with default capacity (1000 queries)
410        let query_cache = Arc::new(QueryCache::default());
411
412        // After all snapshot/WAL recovery, sync TransactionManager epoch
413        // with the store so queries use the correct viewing epoch.
414        #[cfg(feature = "temporal")]
415        transaction_manager.sync_epoch(store.current_epoch());
416
417        Ok(Self {
418            config,
419            store,
420            catalog,
421            #[cfg(feature = "rdf")]
422            rdf_store,
423            transaction_manager,
424            buffer_manager,
425            #[cfg(feature = "wal")]
426            wal,
427            #[cfg(feature = "wal")]
428            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
429            query_cache,
430            commit_counter: Arc::new(AtomicUsize::new(0)),
431            is_open: RwLock::new(true),
432            #[cfg(feature = "cdc")]
433            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
434            #[cfg(feature = "embed")]
435            embedding_models: RwLock::new(hashbrown::HashMap::new()),
436            #[cfg(feature = "grafeo-file")]
437            file_manager,
438            external_store: None,
439            #[cfg(feature = "metrics")]
440            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
441            current_graph: RwLock::new(None),
442            read_only: is_read_only,
443        })
444    }
445
446    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
447    ///
448    /// The external store handles all data persistence. WAL, CDC, and index
449    /// management are the responsibility of the store implementation.
450    ///
451    /// Query execution (all 6 languages, optimizer, planner) works through the
452    /// provided store. Admin operations (schema introspection, persistence,
453    /// vector/text indexes) are not available on external stores.
454    ///
455    /// # Examples
456    ///
457    /// ```no_run
458    /// use std::sync::Arc;
459    /// use grafeo_engine::{GrafeoDB, Config};
460    /// use grafeo_core::graph::GraphStoreMut;
461    ///
462    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
463    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
464    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
465    ///     Ok(())
466    /// }
467    /// ```
468    ///
469    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
470    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
471        config
472            .validate()
473            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
474
475        let dummy_store = Arc::new(LpgStore::new()?);
476        let transaction_manager = Arc::new(TransactionManager::new());
477
478        let buffer_config = BufferManagerConfig {
479            budget: config.memory_limit.unwrap_or_else(|| {
480                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
481            }),
482            spill_path: None,
483            ..BufferManagerConfig::default()
484        };
485        let buffer_manager = BufferManager::new(buffer_config);
486
487        let query_cache = Arc::new(QueryCache::default());
488
489        Ok(Self {
490            config,
491            store: dummy_store,
492            catalog: Arc::new(Catalog::new()),
493            #[cfg(feature = "rdf")]
494            rdf_store: Arc::new(RdfStore::new()),
495            transaction_manager,
496            buffer_manager,
497            #[cfg(feature = "wal")]
498            wal: None,
499            #[cfg(feature = "wal")]
500            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
501            query_cache,
502            commit_counter: Arc::new(AtomicUsize::new(0)),
503            is_open: RwLock::new(true),
504            #[cfg(feature = "cdc")]
505            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
506            #[cfg(feature = "embed")]
507            embedding_models: RwLock::new(hashbrown::HashMap::new()),
508            #[cfg(feature = "grafeo-file")]
509            file_manager: None,
510            external_store: Some(store),
511            #[cfg(feature = "metrics")]
512            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
513            current_graph: RwLock::new(None),
514            read_only: false,
515        })
516    }
517
518    /// Applies WAL records to restore the database state.
519    ///
520    /// Data mutation records are routed through a graph cursor that tracks
521    /// `SwitchGraph` context markers, replaying mutations into the correct
522    /// named graph (or the default graph when cursor is `None`).
523    #[cfg(feature = "wal")]
524    fn apply_wal_records(
525        store: &Arc<LpgStore>,
526        catalog: &Catalog,
527        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
528        records: &[WalRecord],
529    ) -> Result<()> {
530        use crate::catalog::{
531            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
532        };
533        use grafeo_common::utils::error::Error;
534
535        // Graph cursor: tracks which named graph receives data mutations.
536        // `None` means the default graph.
537        let mut current_graph: Option<String> = None;
538        let mut target_store: Arc<LpgStore> = Arc::clone(store);
539
540        for record in records {
541            match record {
542                // --- Named graph lifecycle ---
543                WalRecord::CreateNamedGraph { name } => {
544                    let _ = store.create_graph(name);
545                }
546                WalRecord::DropNamedGraph { name } => {
547                    store.drop_graph(name);
548                    // Reset cursor if the dropped graph was active
549                    if current_graph.as_deref() == Some(name.as_str()) {
550                        current_graph = None;
551                        target_store = Arc::clone(store);
552                    }
553                }
554                WalRecord::SwitchGraph { name } => {
555                    current_graph.clone_from(name);
556                    target_store = match &current_graph {
557                        None => Arc::clone(store),
558                        Some(graph_name) => store
559                            .graph_or_create(graph_name)
560                            .map_err(|e| Error::Internal(e.to_string()))?,
561                    };
562                }
563
564                // --- Data mutations: routed through target_store ---
565                WalRecord::CreateNode { id, labels } => {
566                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
567                    target_store.create_node_with_id(*id, &label_refs)?;
568                }
569                WalRecord::DeleteNode { id } => {
570                    target_store.delete_node(*id);
571                }
572                WalRecord::CreateEdge {
573                    id,
574                    src,
575                    dst,
576                    edge_type,
577                } => {
578                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
579                }
580                WalRecord::DeleteEdge { id } => {
581                    target_store.delete_edge(*id);
582                }
583                WalRecord::SetNodeProperty { id, key, value } => {
584                    target_store.set_node_property(*id, key, value.clone());
585                }
586                WalRecord::SetEdgeProperty { id, key, value } => {
587                    target_store.set_edge_property(*id, key, value.clone());
588                }
589                WalRecord::AddNodeLabel { id, label } => {
590                    target_store.add_label(*id, label);
591                }
592                WalRecord::RemoveNodeLabel { id, label } => {
593                    target_store.remove_label(*id, label);
594                }
595                WalRecord::RemoveNodeProperty { id, key } => {
596                    target_store.remove_node_property(*id, key);
597                }
598                WalRecord::RemoveEdgeProperty { id, key } => {
599                    target_store.remove_edge_property(*id, key);
600                }
601
602                // --- Schema DDL replay (always on root catalog) ---
603                WalRecord::CreateNodeType {
604                    name,
605                    properties,
606                    constraints,
607                } => {
608                    let def = NodeTypeDefinition {
609                        name: name.clone(),
610                        properties: properties
611                            .iter()
612                            .map(|(n, t, nullable)| TypedProperty {
613                                name: n.clone(),
614                                data_type: PropertyDataType::from_type_name(t),
615                                nullable: *nullable,
616                                default_value: None,
617                            })
618                            .collect(),
619                        constraints: constraints
620                            .iter()
621                            .map(|(kind, props)| match kind.as_str() {
622                                "unique" => TypeConstraint::Unique(props.clone()),
623                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
624                                "not_null" if !props.is_empty() => {
625                                    TypeConstraint::NotNull(props[0].clone())
626                                }
627                                _ => TypeConstraint::Unique(props.clone()),
628                            })
629                            .collect(),
630                        parent_types: Vec::new(),
631                    };
632                    let _ = catalog.register_node_type(def);
633                }
634                WalRecord::DropNodeType { name } => {
635                    let _ = catalog.drop_node_type(name);
636                }
637                WalRecord::CreateEdgeType {
638                    name,
639                    properties,
640                    constraints,
641                } => {
642                    let def = EdgeTypeDefinition {
643                        name: name.clone(),
644                        properties: properties
645                            .iter()
646                            .map(|(n, t, nullable)| TypedProperty {
647                                name: n.clone(),
648                                data_type: PropertyDataType::from_type_name(t),
649                                nullable: *nullable,
650                                default_value: None,
651                            })
652                            .collect(),
653                        constraints: constraints
654                            .iter()
655                            .map(|(kind, props)| match kind.as_str() {
656                                "unique" => TypeConstraint::Unique(props.clone()),
657                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
658                                "not_null" if !props.is_empty() => {
659                                    TypeConstraint::NotNull(props[0].clone())
660                                }
661                                _ => TypeConstraint::Unique(props.clone()),
662                            })
663                            .collect(),
664                        source_node_types: Vec::new(),
665                        target_node_types: Vec::new(),
666                    };
667                    let _ = catalog.register_edge_type_def(def);
668                }
669                WalRecord::DropEdgeType { name } => {
670                    let _ = catalog.drop_edge_type_def(name);
671                }
672                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
673                    // Index recreation is handled by the store on startup
674                    // (indexes are rebuilt from data, not WAL)
675                }
676                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
677                    // Constraint definitions are part of type definitions
678                    // and replayed via CreateNodeType/CreateEdgeType
679                }
680                WalRecord::CreateGraphType {
681                    name,
682                    node_types,
683                    edge_types,
684                    open,
685                } => {
686                    use crate::catalog::GraphTypeDefinition;
687                    let def = GraphTypeDefinition {
688                        name: name.clone(),
689                        allowed_node_types: node_types.clone(),
690                        allowed_edge_types: edge_types.clone(),
691                        open: *open,
692                    };
693                    let _ = catalog.register_graph_type(def);
694                }
695                WalRecord::DropGraphType { name } => {
696                    let _ = catalog.drop_graph_type(name);
697                }
698                WalRecord::CreateSchema { name } => {
699                    let _ = catalog.register_schema_namespace(name.clone());
700                }
701                WalRecord::DropSchema { name } => {
702                    let _ = catalog.drop_schema_namespace(name);
703                }
704
705                WalRecord::AlterNodeType { name, alterations } => {
706                    for (action, prop_name, type_name, nullable) in alterations {
707                        match action.as_str() {
708                            "add" => {
709                                let prop = TypedProperty {
710                                    name: prop_name.clone(),
711                                    data_type: PropertyDataType::from_type_name(type_name),
712                                    nullable: *nullable,
713                                    default_value: None,
714                                };
715                                let _ = catalog.alter_node_type_add_property(name, prop);
716                            }
717                            "drop" => {
718                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
719                            }
720                            _ => {}
721                        }
722                    }
723                }
724                WalRecord::AlterEdgeType { name, alterations } => {
725                    for (action, prop_name, type_name, nullable) in alterations {
726                        match action.as_str() {
727                            "add" => {
728                                let prop = TypedProperty {
729                                    name: prop_name.clone(),
730                                    data_type: PropertyDataType::from_type_name(type_name),
731                                    nullable: *nullable,
732                                    default_value: None,
733                                };
734                                let _ = catalog.alter_edge_type_add_property(name, prop);
735                            }
736                            "drop" => {
737                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
738                            }
739                            _ => {}
740                        }
741                    }
742                }
743                WalRecord::AlterGraphType { name, alterations } => {
744                    for (action, type_name) in alterations {
745                        match action.as_str() {
746                            "add_node" => {
747                                let _ =
748                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
749                            }
750                            "drop_node" => {
751                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
752                            }
753                            "add_edge" => {
754                                let _ =
755                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
756                            }
757                            "drop_edge" => {
758                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
759                            }
760                            _ => {}
761                        }
762                    }
763                }
764
765                WalRecord::CreateProcedure {
766                    name,
767                    params,
768                    returns,
769                    body,
770                } => {
771                    use crate::catalog::ProcedureDefinition;
772                    let def = ProcedureDefinition {
773                        name: name.clone(),
774                        params: params.clone(),
775                        returns: returns.clone(),
776                        body: body.clone(),
777                    };
778                    let _ = catalog.register_procedure(def);
779                }
780                WalRecord::DropProcedure { name } => {
781                    let _ = catalog.drop_procedure(name);
782                }
783
784                // --- RDF triple replay ---
785                #[cfg(feature = "rdf")]
786                WalRecord::InsertRdfTriple { .. }
787                | WalRecord::DeleteRdfTriple { .. }
788                | WalRecord::ClearRdfGraph { .. }
789                | WalRecord::CreateRdfGraph { .. }
790                | WalRecord::DropRdfGraph { .. } => {
791                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
792                }
793                #[cfg(not(feature = "rdf"))]
794                WalRecord::InsertRdfTriple { .. }
795                | WalRecord::DeleteRdfTriple { .. }
796                | WalRecord::ClearRdfGraph { .. }
797                | WalRecord::CreateRdfGraph { .. }
798                | WalRecord::DropRdfGraph { .. } => {}
799
800                WalRecord::TransactionCommit { .. } => {
801                    // In temporal mode, advance the store epoch on each committed
802                    // transaction so that subsequent property/label operations
803                    // are recorded at the correct epoch in their VersionLogs.
804                    #[cfg(feature = "temporal")]
805                    {
806                        target_store.new_epoch();
807                    }
808                }
809                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
810                    // Transaction control records don't need replay action
811                    // (recovery already filtered to only committed transactions)
812                }
813            }
814        }
815        Ok(())
816    }
817
818    // =========================================================================
819    // Single-file format helpers
820    // =========================================================================
821
822    /// Returns `true` if the given path should use single-file format.
823    #[cfg(feature = "grafeo-file")]
824    fn should_use_single_file(
825        path: &std::path::Path,
826        configured: crate::config::StorageFormat,
827    ) -> bool {
828        use crate::config::StorageFormat;
829        match configured {
830            StorageFormat::SingleFile => true,
831            StorageFormat::WalDirectory => false,
832            StorageFormat::Auto => {
833                // Existing file: check magic bytes
834                if path.is_file() {
835                    if let Ok(mut f) = std::fs::File::open(path) {
836                        use std::io::Read;
837                        let mut magic = [0u8; 4];
838                        if f.read_exact(&mut magic).is_ok()
839                            && magic == grafeo_adapters::storage::file::MAGIC
840                        {
841                            return true;
842                        }
843                    }
844                    return false;
845                }
846                // Existing directory: legacy format
847                if path.is_dir() {
848                    return false;
849                }
850                // New path: check extension
851                path.extension().is_some_and(|ext| ext == "grafeo")
852            }
853        }
854    }
855
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// This is a thin wrapper: every argument is forwarded unchanged to
    /// `persistence::load_snapshot_into_store`, and its result is returned as-is.
    ///
    /// The `rdf_store` parameter only exists when the `rdf` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns whatever error `persistence::load_snapshot_into_store` reports.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
872
873    // =========================================================================
874    // Session & Configuration
875    // =========================================================================
876
877    /// Opens a new session for running queries.
878    ///
879    /// Sessions are cheap to create: spin up as many as you need. Each
880    /// gets its own transaction context, so concurrent sessions won't
881    /// block each other on reads.
882    ///
883    /// # Panics
884    ///
885    /// Panics if the database was configured with an external graph store and
886    /// the internal arena allocator cannot be initialized (out of memory).
887    ///
888    /// # Examples
889    ///
890    /// ```
891    /// use grafeo_engine::GrafeoDB;
892    ///
893    /// let db = GrafeoDB::new_in_memory();
894    /// let session = db.session();
895    ///
896    /// // Run queries through the session
897    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
898    /// # Ok::<(), grafeo_common::utils::error::Error>(())
899    /// ```
900    #[must_use]
901    pub fn session(&self) -> Session {
902        let session_cfg = || crate::session::SessionConfig {
903            transaction_manager: Arc::clone(&self.transaction_manager),
904            query_cache: Arc::clone(&self.query_cache),
905            catalog: Arc::clone(&self.catalog),
906            adaptive_config: self.config.adaptive.clone(),
907            factorized_execution: self.config.factorized_execution,
908            graph_model: self.config.graph_model,
909            query_timeout: self.config.query_timeout,
910            commit_counter: Arc::clone(&self.commit_counter),
911            gc_interval: self.config.gc_interval,
912            read_only: self.read_only,
913        };
914
915        if let Some(ref ext_store) = self.external_store {
916            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
917                .expect("arena allocation for external store session");
918        }
919
920        #[cfg(feature = "rdf")]
921        let mut session = Session::with_rdf_store_and_adaptive(
922            Arc::clone(&self.store),
923            Arc::clone(&self.rdf_store),
924            session_cfg(),
925        );
926        #[cfg(not(feature = "rdf"))]
927        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
928
929        #[cfg(feature = "wal")]
930        if let Some(ref wal) = self.wal {
931            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
932        }
933
934        #[cfg(feature = "cdc")]
935        session.set_cdc_log(Arc::clone(&self.cdc_log));
936
937        #[cfg(feature = "metrics")]
938        {
939            if let Some(ref m) = self.metrics {
940                session.set_metrics(Arc::clone(m));
941                m.session_created
942                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
943                m.session_active
944                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
945            }
946        }
947
948        // Propagate persistent graph context to the new session
949        if let Some(ref graph) = *self.current_graph.read() {
950            session.use_graph(graph);
951        }
952
953        // Suppress unused_mut when cdc/wal are disabled
954        let _ = &mut session;
955
956        session
957    }
958
959    /// Returns the current graph name, if any.
960    ///
961    /// This is the persistent graph context used by one-shot `execute()` calls.
962    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
963    /// or `SESSION RESET`.
964    #[must_use]
965    pub fn current_graph(&self) -> Option<String> {
966        self.current_graph.read().clone()
967    }
968
969    /// Sets the current graph context for subsequent one-shot `execute()` calls.
970    ///
971    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
972    /// Pass `None` to reset to the default graph.
973    pub fn set_current_graph(&self, name: Option<&str>) {
974        *self.current_graph.write() = name.map(ToString::to_string);
975    }
976
    /// Returns the adaptive execution configuration.
    ///
    /// This borrows the `adaptive` section of the database configuration, the
    /// same value that [`session()`](Self::session) clones into each new
    /// session's configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
982
    /// Returns `true` if this database was opened in read-only mode.
    ///
    /// The same flag is copied into every session configuration by
    /// [`session()`](Self::session), and [`close()`](Self::close) skips
    /// checkpointing when it is set.
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
988
    /// Returns the configuration.
    ///
    /// The returned reference borrows the [`Config`] held by this handle; no
    /// copy is made.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
994
    /// Returns the graph data model of this database.
    ///
    /// The value is read from the `graph_model` field of the configuration
    /// and returned by value.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1000
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// This is the `memory_limit` field of the configuration; `None` means
    /// no limit was configured.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1006
1007    /// Returns a point-in-time snapshot of all metrics.
1008    ///
1009    /// If the `metrics` feature is disabled or the registry is not
1010    /// initialized, returns a default (all-zero) snapshot.
1011    #[cfg(feature = "metrics")]
1012    #[must_use]
1013    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1014        let mut snapshot = self
1015            .metrics
1016            .as_ref()
1017            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1018
1019        // Augment with cache stats from the query cache (not tracked in the registry)
1020        let cache_stats = self.query_cache.stats();
1021        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1022        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1023        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1024        snapshot.cache_invalidations = cache_stats.invalidations;
1025
1026        snapshot
1027    }
1028
1029    /// Returns all metrics in Prometheus text exposition format.
1030    ///
1031    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1032    #[cfg(feature = "metrics")]
1033    #[must_use]
1034    pub fn metrics_prometheus(&self) -> String {
1035        self.metrics
1036            .as_ref()
1037            .map_or_else(String::new, |m| m.to_prometheus())
1038    }
1039
1040    /// Resets all metrics counters and histograms to zero.
1041    #[cfg(feature = "metrics")]
1042    pub fn reset_metrics(&self) {
1043        if let Some(ref m) = self.metrics {
1044            m.reset();
1045        }
1046        self.query_cache.reset_stats();
1047    }
1048
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// Note that this always returns the built-in store, even when an external
    /// store is configured; only [`graph_store()`](Self::graph_store) hands
    /// out the external one.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
1060
1061    /// Returns the LPG store for the currently active graph.
1062    ///
1063    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1064    /// the default store. Otherwise looks up the named graph in the root store.
1065    /// Falls back to the default store if the named graph does not exist.
1066    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1067    fn active_store(&self) -> Arc<LpgStore> {
1068        let graph_name = self.current_graph.read().clone();
1069        match graph_name {
1070            None => Arc::clone(&self.store),
1071            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1072            Some(ref name) => self
1073                .store
1074                .graph(name)
1075                .unwrap_or_else(|| Arc::clone(&self.store)),
1076        }
1077    }
1078
1079    // === Named Graph Management ===
1080
1081    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1082    ///
1083    /// # Errors
1084    ///
1085    /// Returns an error if arena allocation fails.
1086    pub fn create_graph(&self, name: &str) -> Result<bool> {
1087        Ok(self.store.create_graph(name)?)
1088    }
1089
    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
    ///
    /// The call is forwarded to the built-in store.
    ///
    /// NOTE(review): this method does not itself reset the current-graph
    /// selection; confirm whether callers must do so after dropping the
    /// active graph.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1094
    /// Returns all named graph names.
    ///
    /// The list is whatever the built-in store's `graph_names()` reports.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1100
1101    /// Returns the graph store as a trait object.
1102    ///
1103    /// This provides the [`GraphStoreMut`] interface for code that should work
1104    /// with any storage backend. Use this when you only need graph read/write
1105    /// operations and don't need admin methods like index management.
1106    ///
1107    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1108    #[must_use]
1109    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1110        if let Some(ref ext_store) = self.external_store {
1111            Arc::clone(ext_store)
1112        } else {
1113            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1114        }
1115    }
1116
1117    /// Garbage collects old MVCC versions that are no longer visible.
1118    ///
1119    /// Determines the minimum epoch required by active transactions and prunes
1120    /// version chains older than that threshold. Also cleans up completed
1121    /// transaction metadata in the transaction manager.
1122    pub fn gc(&self) {
1123        let min_epoch = self.transaction_manager.min_active_epoch();
1124        self.store.gc_versions(min_epoch);
1125        self.transaction_manager.gc();
1126    }
1127
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The `Arc` is borrowed; clone it if shared ownership is needed.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1133
    /// Returns the query cache.
    ///
    /// This is the same cache instance that [`session()`](Self::session)
    /// shares with every session it creates.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1139
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Implemented as a single `clear()` call on the shared query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1148
1149    // =========================================================================
1150    // Lifecycle
1151    // =========================================================================
1152
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database is a no-op that returns
    /// `Ok(())`. The `is_open` write lock is held for the whole call.
    ///
    /// Depending on the enabled features and how the database was opened, one
    /// of three paths runs:
    ///
    /// - **Read-only:** only the file manager (if any) is closed.
    /// - **Single-file:** the WAL is synced, a snapshot is written via
    ///   `checkpoint_to_file`, the sidecar WAL is removed and the file
    ///   manager is closed.
    /// - **Legacy WAL directory:** a `TransactionCommit` record is logged,
    ///   followed by a WAL checkpoint and sync.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions),
    /// or if writing the snapshot, removing the sidecar WAL, or closing the
    /// file manager fails. On error the database stays marked as open, so a
    /// later `close()` (including the one issued on drop) runs the shutdown
    /// path again.
    pub fn close(&self) -> Result<()> {
        // Taking the write lock up front serializes concurrent close() calls.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and checkpoint WAL (legacy directory format only)
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Epoch recorded in the checkpoint below.
            let epoch = self.store.current_epoch();

            // Use the last assigned transaction ID, or create a checkpoint-only tx
            let checkpoint_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| {
                    // No transactions have been started; begin one for checkpoint
                    self.transaction_manager.begin()
                });

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: checkpoint_tx,
            })?;

            // Then checkpoint
            wal.checkpoint(checkpoint_tx, epoch)?;
            wal.sync()?;
        }

        // Only reached when every step above succeeded.
        *is_open = false;
        Ok(())
    }
1233
    /// Returns the typed WAL if available.
    ///
    /// `None` means this handle has no WAL attached. Only compiled when the
    /// `wal` feature is enabled.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1240
1241    /// Logs a WAL record if WAL is enabled.
1242    #[cfg(feature = "wal")]
1243    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1244        if let Some(ref wal) = self.wal {
1245            wal.log(record)?;
1246        }
1247        Ok(())
1248    }
1249
    /// Writes the current database snapshot to the `.grafeo` file.
    ///
    /// Does NOT remove the sidecar WAL: callers that want to clean up
    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
    /// separately after this returns.
    ///
    /// The snapshot bytes come from `export_snapshot()`. They are handed to
    /// `fm.write_snapshot` together with the store's current epoch, the last
    /// assigned transaction ID (`0` if none has been assigned), and the
    /// built-in store's node and edge counts.
    ///
    /// # Errors
    ///
    /// Returns an error if exporting the snapshot or writing it through the
    /// file manager fails.
    #[cfg(feature = "grafeo-file")]
    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
        // NOTE(review): `maybe_crash` lives under `grafeo_core::testing::crash`
        // and is presumably a fault-injection hook for crash-recovery tests;
        // confirm before moving or removing any of the call sites below.
        use grafeo_core::testing::crash::maybe_crash;

        maybe_crash("checkpoint_to_file:before_export");
        let snapshot_data = self.export_snapshot()?;
        maybe_crash("checkpoint_to_file:after_export");

        // Header metadata is read after the export has completed.
        let epoch = self.store.current_epoch();
        let transaction_id = self
            .transaction_manager
            .last_assigned_transaction_id()
            .map_or(0, |t| t.0);
        let node_count = self.store.node_count() as u64;
        let edge_count = self.store.edge_count() as u64;

        fm.write_snapshot(
            &snapshot_data,
            epoch.0,
            transaction_id,
            node_count,
            edge_count,
        )?;

        maybe_crash("checkpoint_to_file:after_write_snapshot");
        Ok(())
    }
1282
    /// Returns the file manager if using single-file format.
    ///
    /// `close()` uses the presence of a file manager to decide between the
    /// single-file and legacy WAL shutdown paths. Only compiled when the
    /// `grafeo-file` feature is enabled.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1289}
1290
1291impl Drop for GrafeoDB {
1292    fn drop(&mut self) {
1293        if let Err(e) = self.close() {
1294            tracing::error!("Error closing database: {}", e);
1295        }
1296    }
1297}
1298
1299impl crate::admin::AdminService for GrafeoDB {
1300    fn info(&self) -> crate::admin::DatabaseInfo {
1301        self.info()
1302    }
1303
1304    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1305        self.detailed_stats()
1306    }
1307
1308    fn schema(&self) -> crate::admin::SchemaInfo {
1309        self.schema()
1310    }
1311
1312    fn validate(&self) -> crate::admin::ValidationResult {
1313        self.validate()
1314    }
1315
1316    fn wal_status(&self) -> crate::admin::WalStatus {
1317        self.wal_status()
1318    }
1319
1320    fn wal_checkpoint(&self) -> Result<()> {
1321        self.wal_checkpoint()
1322    }
1323}
1324
1325// =========================================================================
1326// Query Result Types
1327// =========================================================================
1328
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so a result can also be assembled or inspected
/// directly without going through the constructors.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// `QueryResult::new` fills this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    // NOTE(review): each row presumably holds one value per entry in
    // `columns`; nothing in this type enforces that.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// Every constructor on this type sets it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1371
1372impl QueryResult {
1373    /// Creates a fully empty query result (no columns, no rows).
1374    #[must_use]
1375    pub fn empty() -> Self {
1376        Self {
1377            columns: Vec::new(),
1378            column_types: Vec::new(),
1379            rows: Vec::new(),
1380            execution_time_ms: None,
1381            rows_scanned: None,
1382            status_message: None,
1383            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1384        }
1385    }
1386
1387    /// Creates a query result with only a status message (for DDL commands).
1388    #[must_use]
1389    pub fn status(msg: impl Into<String>) -> Self {
1390        Self {
1391            columns: Vec::new(),
1392            column_types: Vec::new(),
1393            rows: Vec::new(),
1394            execution_time_ms: None,
1395            rows_scanned: None,
1396            status_message: Some(msg.into()),
1397            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1398        }
1399    }
1400
1401    /// Creates a new empty query result.
1402    #[must_use]
1403    pub fn new(columns: Vec<String>) -> Self {
1404        let len = columns.len();
1405        Self {
1406            columns,
1407            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1408            rows: Vec::new(),
1409            execution_time_ms: None,
1410            rows_scanned: None,
1411            status_message: None,
1412            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1413        }
1414    }
1415
1416    /// Creates a new empty query result with column types.
1417    #[must_use]
1418    pub fn with_types(
1419        columns: Vec<String>,
1420        column_types: Vec<grafeo_common::types::LogicalType>,
1421    ) -> Self {
1422        Self {
1423            columns,
1424            column_types,
1425            rows: Vec::new(),
1426            execution_time_ms: None,
1427            rows_scanned: None,
1428            status_message: None,
1429            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1430        }
1431    }
1432
1433    /// Sets the execution metrics on this result.
1434    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1435        self.execution_time_ms = Some(execution_time_ms);
1436        self.rows_scanned = Some(rows_scanned);
1437        self
1438    }
1439
1440    /// Returns the execution time in milliseconds, if available.
1441    #[must_use]
1442    pub fn execution_time_ms(&self) -> Option<f64> {
1443        self.execution_time_ms
1444    }
1445
1446    /// Returns the number of rows scanned, if available.
1447    #[must_use]
1448    pub fn rows_scanned(&self) -> Option<u64> {
1449        self.rows_scanned
1450    }
1451
1452    /// Returns the number of rows.
1453    #[must_use]
1454    pub fn row_count(&self) -> usize {
1455        self.rows.len()
1456    }
1457
1458    /// Returns the number of columns.
1459    #[must_use]
1460    pub fn column_count(&self) -> usize {
1461        self.columns.len()
1462    }
1463
1464    /// Returns true if the result is empty.
1465    #[must_use]
1466    pub fn is_empty(&self) -> bool {
1467        self.rows.is_empty()
1468    }
1469
1470    /// Extracts a single value from the result.
1471    ///
1472    /// Use this when your query returns exactly one row with one column,
1473    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1474    ///
1475    /// # Errors
1476    ///
1477    /// Returns an error if the result has multiple rows or columns.
1478    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1479        if self.rows.len() != 1 || self.columns.len() != 1 {
1480            return Err(grafeo_common::utils::error::Error::InvalidValue(
1481                "Expected single value".to_string(),
1482            ));
1483        }
1484        T::from_value(&self.rows[0][0])
1485    }
1486
1487    /// Returns an iterator over the rows.
1488    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1489        self.rows.iter()
1490    }
1491}
1492
1493impl std::fmt::Display for QueryResult {
1494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1495        let table = grafeo_common::fmt::format_result_table(
1496            &self.columns,
1497            &self.rows,
1498            self.execution_time_ms,
1499            self.status_message.as_deref(),
1500        );
1501        f.write_str(&table)
1502    }
1503}
1504
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The implementations in this file report a mismatch as
/// `Error::TypeMismatch`, carrying the expected type name and the value's
/// `type_name()`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1513
1514impl FromValue for i64 {
1515    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1516        value
1517            .as_int64()
1518            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1519                expected: "INT64".to_string(),
1520                found: value.type_name().to_string(),
1521            })
1522    }
1523}
1524
1525impl FromValue for f64 {
1526    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1527        value
1528            .as_float64()
1529            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1530                expected: "FLOAT64".to_string(),
1531                found: value.type_name().to_string(),
1532            })
1533    }
1534}
1535
1536impl FromValue for String {
1537    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1538        value.as_str().map(String::from).ok_or_else(|| {
1539            grafeo_common::utils::error::Error::TypeMismatch {
1540                expected: "STRING".to_string(),
1541                found: value.type_name().to_string(),
1542            }
1543        })
1544    }
1545}
1546
1547impl FromValue for bool {
1548    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1549        value
1550            .as_bool()
1551            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1552                expected: "BOOL".to_string(),
1553                found: value.type_name().to_string(),
1554            })
1555    }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560    use super::*;
1561
1562    #[test]
1563    fn test_create_in_memory_database() {
1564        let db = GrafeoDB::new_in_memory();
1565        assert_eq!(db.node_count(), 0);
1566        assert_eq!(db.edge_count(), 0);
1567    }
1568
1569    #[test]
1570    fn test_database_config() {
1571        let config = Config::in_memory().with_threads(4).with_query_logging();
1572
1573        let db = GrafeoDB::with_config(config).unwrap();
1574        assert_eq!(db.config().threads, 4);
1575        assert!(db.config().query_logging);
1576    }
1577
1578    #[test]
1579    fn test_database_session() {
1580        let db = GrafeoDB::new_in_memory();
1581        let _session = db.session();
1582        // Session should be created successfully
1583    }
1584
1585    #[cfg(feature = "wal")]
1586    #[test]
1587    fn test_persistent_database_recovery() {
1588        use grafeo_common::types::Value;
1589        use tempfile::tempdir;
1590
1591        let dir = tempdir().unwrap();
1592        let db_path = dir.path().join("test_db");
1593
1594        // Create database and add some data
1595        {
1596            let db = GrafeoDB::open(&db_path).unwrap();
1597
1598            let alix = db.create_node(&["Person"]);
1599            db.set_node_property(alix, "name", Value::from("Alix"));
1600
1601            let gus = db.create_node(&["Person"]);
1602            db.set_node_property(gus, "name", Value::from("Gus"));
1603
1604            let _edge = db.create_edge(alix, gus, "KNOWS");
1605
1606            // Explicitly close to flush WAL
1607            db.close().unwrap();
1608        }
1609
1610        // Reopen and verify data was recovered
1611        {
1612            let db = GrafeoDB::open(&db_path).unwrap();
1613
1614            assert_eq!(db.node_count(), 2);
1615            assert_eq!(db.edge_count(), 1);
1616
1617            // Verify nodes exist
1618            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1619            assert!(node0.is_some());
1620
1621            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1622            assert!(node1.is_some());
1623        }
1624    }
1625
1626    #[cfg(feature = "wal")]
1627    #[test]
1628    fn test_wal_logging() {
1629        use tempfile::tempdir;
1630
1631        let dir = tempdir().unwrap();
1632        let db_path = dir.path().join("wal_test_db");
1633
1634        let db = GrafeoDB::open(&db_path).unwrap();
1635
1636        // Create some data
1637        let node = db.create_node(&["Test"]);
1638        db.delete_node(node);
1639
1640        // WAL should have records
1641        if let Some(wal) = db.wal() {
1642            assert!(wal.record_count() > 0);
1643        }
1644
1645        db.close().unwrap();
1646    }
1647
1648    #[cfg(feature = "wal")]
1649    #[test]
1650    fn test_wal_recovery_multiple_sessions() {
1651        // Tests that WAL recovery works correctly across multiple open/close cycles
1652        use grafeo_common::types::Value;
1653        use tempfile::tempdir;
1654
1655        let dir = tempdir().unwrap();
1656        let db_path = dir.path().join("multi_session_db");
1657
1658        // Session 1: Create initial data
1659        {
1660            let db = GrafeoDB::open(&db_path).unwrap();
1661            let alix = db.create_node(&["Person"]);
1662            db.set_node_property(alix, "name", Value::from("Alix"));
1663            db.close().unwrap();
1664        }
1665
1666        // Session 2: Add more data
1667        {
1668            let db = GrafeoDB::open(&db_path).unwrap();
1669            assert_eq!(db.node_count(), 1); // Previous data recovered
1670            let gus = db.create_node(&["Person"]);
1671            db.set_node_property(gus, "name", Value::from("Gus"));
1672            db.close().unwrap();
1673        }
1674
1675        // Session 3: Verify all data
1676        {
1677            let db = GrafeoDB::open(&db_path).unwrap();
1678            assert_eq!(db.node_count(), 2);
1679
1680            // Verify properties were recovered correctly
1681            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1682            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1683
1684            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1685            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1686        }
1687    }
1688
1689    #[cfg(feature = "wal")]
1690    #[test]
1691    fn test_database_consistency_after_mutations() {
1692        // Tests that database remains consistent after a series of create/delete operations
1693        use grafeo_common::types::Value;
1694        use tempfile::tempdir;
1695
1696        let dir = tempdir().unwrap();
1697        let db_path = dir.path().join("consistency_db");
1698
1699        {
1700            let db = GrafeoDB::open(&db_path).unwrap();
1701
1702            // Create nodes
1703            let a = db.create_node(&["Node"]);
1704            let b = db.create_node(&["Node"]);
1705            let c = db.create_node(&["Node"]);
1706
1707            // Create edges
1708            let e1 = db.create_edge(a, b, "LINKS");
1709            let _e2 = db.create_edge(b, c, "LINKS");
1710
1711            // Delete middle node and its edge
1712            db.delete_edge(e1);
1713            db.delete_node(b);
1714
1715            // Set properties on remaining nodes
1716            db.set_node_property(a, "value", Value::Int64(1));
1717            db.set_node_property(c, "value", Value::Int64(3));
1718
1719            db.close().unwrap();
1720        }
1721
1722        // Reopen and verify consistency
1723        {
1724            let db = GrafeoDB::open(&db_path).unwrap();
1725
1726            // Should have 2 nodes (a and c), b was deleted
1727            // Note: node_count includes deleted nodes in some implementations
1728            // What matters is that the non-deleted nodes are accessible
1729            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1730            assert!(node_a.is_some());
1731
1732            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1733            assert!(node_c.is_some());
1734
1735            // Middle node should be deleted
1736            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1737            assert!(node_b.is_none());
1738        }
1739    }
1740
1741    #[cfg(feature = "wal")]
1742    #[test]
1743    fn test_close_is_idempotent() {
1744        // Calling close() multiple times should not cause errors
1745        use tempfile::tempdir;
1746
1747        let dir = tempdir().unwrap();
1748        let db_path = dir.path().join("close_test_db");
1749
1750        let db = GrafeoDB::open(&db_path).unwrap();
1751        db.create_node(&["Test"]);
1752
1753        // First close should succeed
1754        assert!(db.close().is_ok());
1755
1756        // Second close should also succeed (idempotent)
1757        assert!(db.close().is_ok());
1758    }
1759
1760    #[test]
1761    fn test_with_store_external_backend() {
1762        use grafeo_core::graph::lpg::LpgStore;
1763
1764        let external = Arc::new(LpgStore::new().unwrap());
1765
1766        // Seed data on the external store directly
1767        let n1 = external.create_node(&["Person"]);
1768        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1769
1770        let db = GrafeoDB::with_store(
1771            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1772            Config::in_memory(),
1773        )
1774        .unwrap();
1775
1776        let session = db.session();
1777
1778        // Session should see data from the external store via execute
1779        #[cfg(feature = "gql")]
1780        {
1781            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1782            assert_eq!(result.rows.len(), 1);
1783        }
1784    }
1785
1786    #[test]
1787    fn test_with_config_custom_memory_limit() {
1788        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1789
1790        let db = GrafeoDB::with_config(config).unwrap();
1791        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1792        assert_eq!(db.node_count(), 0);
1793    }
1794
1795    #[cfg(feature = "metrics")]
1796    #[test]
1797    fn test_database_metrics_registry() {
1798        let db = GrafeoDB::new_in_memory();
1799
1800        // Perform some operations
1801        db.create_node(&["Person"]);
1802        db.create_node(&["Person"]);
1803
1804        // Check that metrics snapshot returns data
1805        let snap = db.metrics();
1806        // Session created counter should reflect at least 0 (metrics is initialized)
1807        assert_eq!(snap.query_count, 0); // No queries executed yet
1808    }
1809
1810    #[test]
1811    fn test_query_result_has_metrics() {
1812        // Verifies that query results include execution metrics
1813        let db = GrafeoDB::new_in_memory();
1814        db.create_node(&["Person"]);
1815        db.create_node(&["Person"]);
1816
1817        #[cfg(feature = "gql")]
1818        {
1819            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1820
1821            // Metrics should be populated
1822            assert!(result.execution_time_ms.is_some());
1823            assert!(result.rows_scanned.is_some());
1824            assert!(result.execution_time_ms.unwrap() >= 0.0);
1825            assert_eq!(result.rows_scanned.unwrap(), 2);
1826        }
1827    }
1828
1829    #[test]
1830    fn test_empty_query_result_metrics() {
1831        // Verifies metrics are correct for queries returning no results
1832        let db = GrafeoDB::new_in_memory();
1833        db.create_node(&["Person"]);
1834
1835        #[cfg(feature = "gql")]
1836        {
1837            // Query that matches nothing
1838            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1839
1840            assert!(result.execution_time_ms.is_some());
1841            assert!(result.rows_scanned.is_some());
1842            assert_eq!(result.rows_scanned.unwrap(), 0);
1843        }
1844    }
1845
1846    #[cfg(feature = "cdc")]
1847    mod cdc_integration {
1848        use super::*;
1849
1850        #[test]
1851        fn test_node_lifecycle_history() {
1852            let db = GrafeoDB::new_in_memory();
1853
1854            // Create
1855            let id = db.create_node(&["Person"]);
1856            // Update
1857            db.set_node_property(id, "name", "Alix".into());
1858            db.set_node_property(id, "name", "Gus".into());
1859            // Delete
1860            db.delete_node(id);
1861
1862            let history = db.history(id).unwrap();
1863            assert_eq!(history.len(), 4); // create + 2 updates + delete
1864            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1865            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1866            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1867            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1868            assert!(history[2].before.is_some()); // second update has prior "Alix"
1869            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1870        }
1871
1872        #[test]
1873        fn test_edge_lifecycle_history() {
1874            let db = GrafeoDB::new_in_memory();
1875
1876            let alix = db.create_node(&["Person"]);
1877            let gus = db.create_node(&["Person"]);
1878            let edge = db.create_edge(alix, gus, "KNOWS");
1879            db.set_edge_property(edge, "since", 2024i64.into());
1880            db.delete_edge(edge);
1881
1882            let history = db.history(edge).unwrap();
1883            assert_eq!(history.len(), 3); // create + update + delete
1884            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1885            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1886            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1887        }
1888
1889        #[test]
1890        fn test_create_node_with_props_cdc() {
1891            let db = GrafeoDB::new_in_memory();
1892
1893            let id = db.create_node_with_props(
1894                &["Person"],
1895                vec![
1896                    ("name", grafeo_common::types::Value::from("Alix")),
1897                    ("age", grafeo_common::types::Value::from(30i64)),
1898                ],
1899            );
1900
1901            let history = db.history(id).unwrap();
1902            assert_eq!(history.len(), 1);
1903            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1904            // Props should be captured
1905            let after = history[0].after.as_ref().unwrap();
1906            assert_eq!(after.len(), 2);
1907        }
1908
1909        #[test]
1910        fn test_changes_between() {
1911            let db = GrafeoDB::new_in_memory();
1912
1913            let id1 = db.create_node(&["A"]);
1914            let _id2 = db.create_node(&["B"]);
1915            db.set_node_property(id1, "x", 1i64.into());
1916
1917            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1918            let changes = db
1919                .changes_between(
1920                    grafeo_common::types::EpochId(0),
1921                    grafeo_common::types::EpochId(u64::MAX),
1922                )
1923                .unwrap();
1924            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1925        }
1926    }
1927
1928    #[test]
1929    fn test_with_store_basic() {
1930        use grafeo_core::graph::lpg::LpgStore;
1931
1932        let store = Arc::new(LpgStore::new().unwrap());
1933        let n1 = store.create_node(&["Person"]);
1934        store.set_node_property(n1, "name", "Alix".into());
1935
1936        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1937        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1938
1939        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1940        assert_eq!(result.rows.len(), 1);
1941    }
1942
1943    #[test]
1944    fn test_with_store_session() {
1945        use grafeo_core::graph::lpg::LpgStore;
1946
1947        let store = Arc::new(LpgStore::new().unwrap());
1948        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1949        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1950
1951        let session = db.session();
1952        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1953        assert_eq!(result.rows.len(), 1);
1954    }
1955
1956    #[test]
1957    fn test_with_store_mutations() {
1958        use grafeo_core::graph::lpg::LpgStore;
1959
1960        let store = Arc::new(LpgStore::new().unwrap());
1961        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1962        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1963
1964        let mut session = db.session();
1965
1966        // Use an explicit transaction so INSERT and MATCH share the same
1967        // transaction context. With PENDING epochs, uncommitted versions are
1968        // only visible to the owning transaction.
1969        session.begin_transaction().unwrap();
1970        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1971
1972        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1973        assert_eq!(result.rows.len(), 1);
1974
1975        session.commit().unwrap();
1976    }
1977}