Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19mod crud;
20#[cfg(feature = "embed")]
21mod embed;
22mod index;
23mod persistence;
24mod query;
25#[cfg(feature = "rdf")]
26mod rdf_ops;
27mod search;
28#[cfg(feature = "wal")]
29pub(crate) mod wal_store;
30
31use grafeo_common::grafeo_error;
32#[cfg(feature = "wal")]
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::atomic::AtomicUsize;
36
37use parking_lot::RwLock;
38
39#[cfg(feature = "grafeo-file")]
40use grafeo_adapters::storage::file::GrafeoFileManager;
41#[cfg(feature = "wal")]
42use grafeo_adapters::storage::wal::{
43    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
44};
45use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
46use grafeo_common::utils::error::Result;
47use grafeo_core::graph::GraphStoreMut;
48use grafeo_core::graph::lpg::LpgStore;
49#[cfg(feature = "rdf")]
50use grafeo_core::graph::rdf::RdfStore;
51
52use crate::catalog::Catalog;
53use crate::config::Config;
54use crate::query::cache::QueryCache;
55use crate::session::Session;
56use crate::transaction::TransactionManager;
57
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// Most fields are `Arc`-wrapped handles; the field docs below describe how
/// the constructors in this module populate them.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration, stored exactly as passed to the constructor
    /// (after it passed `Config::validate()`).
    pub(super) config: Config,
    /// The underlying built-in graph store.
    ///
    /// When the database is created via `with_store()`, this is a freshly
    /// created, empty placeholder and `external_store` holds the real store.
    pub(super) store: Arc<LpgStore>,
    /// Schema and metadata catalog shared across sessions.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Unified buffer manager. Its budget comes from `Config::memory_limit`,
    /// or 75% of detected system memory when no limit is configured.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager (if durability is enabled).
    ///
    /// `None` when the database is read-only, has no path, has the WAL
    /// disabled in its configuration, or wraps an external store.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared WAL graph context tracker. Tracks which named graph was last
    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
    /// records only when the context actually changes.
    /// Starts out as `None` in both constructors.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache for parsed and optimized plans.
    /// Constructed with `QueryCache::default()`.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions. Starts at zero.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open. Both constructors initialize this to `true`.
    pub(super) is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion,
    /// keyed by a `String`. Empty on construction.
    // NOTE(review): the key is presumably the model name — confirm in the `embed` submodule.
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file database manager (when using `.grafeo` format).
    ///
    /// `None` for in-memory databases, for the legacy directory layout, and
    /// for databases created via `with_store()`.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// External graph store (when using with_store()).
    /// When set, sessions route queries through this store instead of the built-in LpgStore.
    /// `with_config()` always leaves this as `None`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry shared across all sessions.
    /// Both constructors populate it with a fresh registry.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Persistent graph context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this graph.
    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
    /// Initially `None`.
    current_graph: RwLock<Option<String>>,
    /// Persistent schema context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this schema.
    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
    /// Initially `None`.
    current_schema: RwLock<Option<String>>,
    /// Whether this database is open in read-only mode.
    /// When true, sessions automatically enforce read-only transactions.
    ///
    /// `with_config()` derives this from `Config::access_mode`;
    /// `with_store()` always sets it to `false`.
    read_only: bool,
}
136
137impl GrafeoDB {
138    /// Creates an in-memory database, fast to create, gone when dropped.
139    ///
140    /// Use this for tests, experiments, or when you don't need persistence.
141    /// For data that survives restarts, use [`open()`](Self::open) instead.
142    ///
143    /// # Panics
144    ///
145    /// Panics if the internal arena allocator cannot be initialized (out of memory).
146    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// use grafeo_engine::GrafeoDB;
152    ///
153    /// let db = GrafeoDB::new_in_memory();
154    /// let session = db.session();
155    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
156    /// # Ok::<(), grafeo_common::utils::error::Error>(())
157    /// ```
158    #[must_use]
159    pub fn new_in_memory() -> Self {
160        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
161    }
162
163    /// Opens a database at the given path, creating it if it doesn't exist.
164    ///
165    /// If you've used this path before, Grafeo recovers your data from the
166    /// write-ahead log automatically. First open on a new path creates an
167    /// empty database.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the path isn't writable or recovery fails.
172    ///
173    /// # Examples
174    ///
175    /// ```no_run
176    /// use grafeo_engine::GrafeoDB;
177    ///
178    /// let db = GrafeoDB::open("./my_social_network")?;
179    /// # Ok::<(), grafeo_common::utils::error::Error>(())
180    /// ```
181    #[cfg(feature = "wal")]
182    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
183        Self::with_config(Config::persistent(path.as_ref()))
184    }
185
186    /// Opens an existing database in read-only mode.
187    ///
188    /// Uses a shared file lock, so multiple processes can read the same
189    /// `.grafeo` file concurrently. The database loads the last checkpoint
190    /// snapshot but does **not** replay the WAL or allow mutations.
191    ///
192    /// Currently only supports the single-file (`.grafeo`) format.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the file doesn't exist or can't be read.
197    ///
198    /// # Examples
199    ///
200    /// ```no_run
201    /// use grafeo_engine::GrafeoDB;
202    ///
203    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
204    /// let session = db.session();
205    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
206    /// // Mutations will return an error:
207    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
208    /// # Ok::<(), grafeo_common::utils::error::Error>(())
209    /// ```
210    #[cfg(feature = "grafeo-file")]
211    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
212        Self::with_config(Config::read_only(path.as_ref()))
213    }
214
215    /// Creates a database with custom configuration.
216    ///
217    /// Use this when you need fine-grained control over memory limits,
218    /// thread counts, or persistence settings. For most cases,
219    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
220    /// are simpler.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the database can't be created or recovery fails.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use grafeo_engine::{GrafeoDB, Config};
230    ///
231    /// // In-memory with a 512MB limit
232    /// let config = Config::in_memory()
233    ///     .with_memory_limit(512 * 1024 * 1024);
234    ///
235    /// let db = GrafeoDB::with_config(config)?;
236    /// # Ok::<(), grafeo_common::utils::error::Error>(())
237    /// ```
238    pub fn with_config(config: Config) -> Result<Self> {
239        // Validate configuration before proceeding
240        config
241            .validate()
242            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
243
244        let store = Arc::new(LpgStore::new()?);
245        #[cfg(feature = "rdf")]
246        let rdf_store = Arc::new(RdfStore::new());
247        let transaction_manager = Arc::new(TransactionManager::new());
248
249        // Create buffer manager with configured limits
250        let buffer_config = BufferManagerConfig {
251            budget: config.memory_limit.unwrap_or_else(|| {
252                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
253            }),
254            spill_path: config
255                .spill_path
256                .clone()
257                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
258            ..BufferManagerConfig::default()
259        };
260        let buffer_manager = BufferManager::new(buffer_config);
261
262        // Create catalog early so WAL replay can restore schema definitions
263        let catalog = Arc::new(Catalog::new());
264
265        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
266
267        // --- Single-file format (.grafeo) ---
268        #[cfg(feature = "grafeo-file")]
269        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
270            // Read-only mode: open with shared lock, load snapshot, skip WAL
271            if let Some(ref db_path) = config.path {
272                if db_path.exists() && db_path.is_file() {
273                    let fm = GrafeoFileManager::open_read_only(db_path)?;
274                    let snapshot_data = fm.read_snapshot()?;
275                    if !snapshot_data.is_empty() {
276                        Self::apply_snapshot_data(
277                            &store,
278                            &catalog,
279                            #[cfg(feature = "rdf")]
280                            &rdf_store,
281                            &snapshot_data,
282                        )?;
283                    }
284                    Some(Arc::new(fm))
285                } else {
286                    return Err(grafeo_common::utils::error::Error::Internal(format!(
287                        "read-only open requires an existing .grafeo file: {}",
288                        db_path.display()
289                    )));
290                }
291            } else {
292                return Err(grafeo_common::utils::error::Error::Internal(
293                    "read-only mode requires a database path".to_string(),
294                ));
295            }
296        } else if let Some(ref db_path) = config.path {
297            // Initialize the file manager whenever single-file format is selected,
298            // regardless of whether WAL is enabled. Without this, a database opened
299            // with wal_enabled:false + StorageFormat::SingleFile would produce no
300            // output at all (the file manager was previously gated behind wal_enabled).
301            if Self::should_use_single_file(db_path, config.storage_format) {
302                let fm = if db_path.exists() && db_path.is_file() {
303                    GrafeoFileManager::open(db_path)?
304                } else if !db_path.exists() {
305                    GrafeoFileManager::create(db_path)?
306                } else {
307                    // Path exists but is not a file (directory, etc.)
308                    return Err(grafeo_common::utils::error::Error::Internal(format!(
309                        "path exists but is not a file: {}",
310                        db_path.display()
311                    )));
312                };
313
314                // Load snapshot data from the file
315                let snapshot_data = fm.read_snapshot()?;
316                if !snapshot_data.is_empty() {
317                    Self::apply_snapshot_data(
318                        &store,
319                        &catalog,
320                        #[cfg(feature = "rdf")]
321                        &rdf_store,
322                        &snapshot_data,
323                    )?;
324                }
325
326                // Recover sidecar WAL if WAL is enabled and a sidecar exists
327                #[cfg(feature = "wal")]
328                if config.wal_enabled && fm.has_sidecar_wal() {
329                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
330                    let records = recovery.recover()?;
331                    Self::apply_wal_records(
332                        &store,
333                        &catalog,
334                        #[cfg(feature = "rdf")]
335                        &rdf_store,
336                        &records,
337                    )?;
338                }
339
340                Some(Arc::new(fm))
341            } else {
342                None
343            }
344        } else {
345            None
346        };
347
348        // Determine whether to use the WAL directory path (legacy) or sidecar
349        // Read-only mode skips WAL entirely (no recovery, no creation).
350        #[cfg(feature = "wal")]
351        let wal = if is_read_only {
352            None
353        } else if config.wal_enabled {
354            if let Some(ref db_path) = config.path {
355                // When using single-file format, the WAL is a sidecar directory
356                #[cfg(feature = "grafeo-file")]
357                let wal_path = if let Some(ref fm) = file_manager {
358                    let p = fm.sidecar_wal_path();
359                    std::fs::create_dir_all(&p)?;
360                    p
361                } else {
362                    // Legacy: WAL inside the database directory
363                    std::fs::create_dir_all(db_path)?;
364                    db_path.join("wal")
365                };
366
367                #[cfg(not(feature = "grafeo-file"))]
368                let wal_path = {
369                    std::fs::create_dir_all(db_path)?;
370                    db_path.join("wal")
371                };
372
373                // For legacy WAL directory format, check if WAL exists and recover
374                #[cfg(feature = "grafeo-file")]
375                let is_single_file = file_manager.is_some();
376                #[cfg(not(feature = "grafeo-file"))]
377                let is_single_file = false;
378
379                if !is_single_file && wal_path.exists() {
380                    let recovery = WalRecovery::new(&wal_path);
381                    let records = recovery.recover()?;
382                    Self::apply_wal_records(
383                        &store,
384                        &catalog,
385                        #[cfg(feature = "rdf")]
386                        &rdf_store,
387                        &records,
388                    )?;
389                }
390
391                // Open/create WAL manager with configured durability
392                let wal_durability = match config.wal_durability {
393                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
394                    crate::config::DurabilityMode::Batch {
395                        max_delay_ms,
396                        max_records,
397                    } => WalDurabilityMode::Batch {
398                        max_delay_ms,
399                        max_records,
400                    },
401                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
402                        WalDurabilityMode::Adaptive { target_interval_ms }
403                    }
404                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
405                };
406                let wal_config = WalConfig {
407                    durability: wal_durability,
408                    ..WalConfig::default()
409                };
410                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
411                Some(Arc::new(wal_manager))
412            } else {
413                None
414            }
415        } else {
416            None
417        };
418
419        // Create query cache with default capacity (1000 queries)
420        let query_cache = Arc::new(QueryCache::default());
421
422        // After all snapshot/WAL recovery, sync TransactionManager epoch
423        // with the store so queries use the correct viewing epoch.
424        #[cfg(feature = "temporal")]
425        transaction_manager.sync_epoch(store.current_epoch());
426
427        Ok(Self {
428            config,
429            store,
430            catalog,
431            #[cfg(feature = "rdf")]
432            rdf_store,
433            transaction_manager,
434            buffer_manager,
435            #[cfg(feature = "wal")]
436            wal,
437            #[cfg(feature = "wal")]
438            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
439            query_cache,
440            commit_counter: Arc::new(AtomicUsize::new(0)),
441            is_open: RwLock::new(true),
442            #[cfg(feature = "cdc")]
443            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
444            #[cfg(feature = "embed")]
445            embedding_models: RwLock::new(hashbrown::HashMap::new()),
446            #[cfg(feature = "grafeo-file")]
447            file_manager,
448            external_store: None,
449            #[cfg(feature = "metrics")]
450            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
451            current_graph: RwLock::new(None),
452            current_schema: RwLock::new(None),
453            read_only: is_read_only,
454        })
455    }
456
457    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
458    ///
459    /// The external store handles all data persistence. WAL, CDC, and index
460    /// management are the responsibility of the store implementation.
461    ///
462    /// Query execution (all 6 languages, optimizer, planner) works through the
463    /// provided store. Admin operations (schema introspection, persistence,
464    /// vector/text indexes) are not available on external stores.
465    ///
466    /// # Examples
467    ///
468    /// ```no_run
469    /// use std::sync::Arc;
470    /// use grafeo_engine::{GrafeoDB, Config};
471    /// use grafeo_core::graph::GraphStoreMut;
472    ///
473    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
474    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
475    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
476    ///     Ok(())
477    /// }
478    /// ```
479    ///
480    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
481    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
482        config
483            .validate()
484            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
485
486        let dummy_store = Arc::new(LpgStore::new()?);
487        let transaction_manager = Arc::new(TransactionManager::new());
488
489        let buffer_config = BufferManagerConfig {
490            budget: config.memory_limit.unwrap_or_else(|| {
491                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
492            }),
493            spill_path: None,
494            ..BufferManagerConfig::default()
495        };
496        let buffer_manager = BufferManager::new(buffer_config);
497
498        let query_cache = Arc::new(QueryCache::default());
499
500        Ok(Self {
501            config,
502            store: dummy_store,
503            catalog: Arc::new(Catalog::new()),
504            #[cfg(feature = "rdf")]
505            rdf_store: Arc::new(RdfStore::new()),
506            transaction_manager,
507            buffer_manager,
508            #[cfg(feature = "wal")]
509            wal: None,
510            #[cfg(feature = "wal")]
511            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
512            query_cache,
513            commit_counter: Arc::new(AtomicUsize::new(0)),
514            is_open: RwLock::new(true),
515            #[cfg(feature = "cdc")]
516            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
517            #[cfg(feature = "embed")]
518            embedding_models: RwLock::new(hashbrown::HashMap::new()),
519            #[cfg(feature = "grafeo-file")]
520            file_manager: None,
521            external_store: Some(store),
522            #[cfg(feature = "metrics")]
523            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
524            current_graph: RwLock::new(None),
525            current_schema: RwLock::new(None),
526            read_only: false,
527        })
528    }
529
530    /// Applies WAL records to restore the database state.
531    ///
532    /// Data mutation records are routed through a graph cursor that tracks
533    /// `SwitchGraph` context markers, replaying mutations into the correct
534    /// named graph (or the default graph when cursor is `None`).
535    #[cfg(feature = "wal")]
536    fn apply_wal_records(
537        store: &Arc<LpgStore>,
538        catalog: &Catalog,
539        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
540        records: &[WalRecord],
541    ) -> Result<()> {
542        use crate::catalog::{
543            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
544        };
545        use grafeo_common::utils::error::Error;
546
547        // Graph cursor: tracks which named graph receives data mutations.
548        // `None` means the default graph.
549        let mut current_graph: Option<String> = None;
550        let mut target_store: Arc<LpgStore> = Arc::clone(store);
551
552        for record in records {
553            match record {
554                // --- Named graph lifecycle ---
555                WalRecord::CreateNamedGraph { name } => {
556                    let _ = store.create_graph(name);
557                }
558                WalRecord::DropNamedGraph { name } => {
559                    store.drop_graph(name);
560                    // Reset cursor if the dropped graph was active
561                    if current_graph.as_deref() == Some(name.as_str()) {
562                        current_graph = None;
563                        target_store = Arc::clone(store);
564                    }
565                }
566                WalRecord::SwitchGraph { name } => {
567                    current_graph.clone_from(name);
568                    target_store = match &current_graph {
569                        None => Arc::clone(store),
570                        Some(graph_name) => store
571                            .graph_or_create(graph_name)
572                            .map_err(|e| Error::Internal(e.to_string()))?,
573                    };
574                }
575
576                // --- Data mutations: routed through target_store ---
577                WalRecord::CreateNode { id, labels } => {
578                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
579                    target_store.create_node_with_id(*id, &label_refs)?;
580                }
581                WalRecord::DeleteNode { id } => {
582                    target_store.delete_node(*id);
583                }
584                WalRecord::CreateEdge {
585                    id,
586                    src,
587                    dst,
588                    edge_type,
589                } => {
590                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
591                }
592                WalRecord::DeleteEdge { id } => {
593                    target_store.delete_edge(*id);
594                }
595                WalRecord::SetNodeProperty { id, key, value } => {
596                    target_store.set_node_property(*id, key, value.clone());
597                }
598                WalRecord::SetEdgeProperty { id, key, value } => {
599                    target_store.set_edge_property(*id, key, value.clone());
600                }
601                WalRecord::AddNodeLabel { id, label } => {
602                    target_store.add_label(*id, label);
603                }
604                WalRecord::RemoveNodeLabel { id, label } => {
605                    target_store.remove_label(*id, label);
606                }
607                WalRecord::RemoveNodeProperty { id, key } => {
608                    target_store.remove_node_property(*id, key);
609                }
610                WalRecord::RemoveEdgeProperty { id, key } => {
611                    target_store.remove_edge_property(*id, key);
612                }
613
614                // --- Schema DDL replay (always on root catalog) ---
615                WalRecord::CreateNodeType {
616                    name,
617                    properties,
618                    constraints,
619                } => {
620                    let def = NodeTypeDefinition {
621                        name: name.clone(),
622                        properties: properties
623                            .iter()
624                            .map(|(n, t, nullable)| TypedProperty {
625                                name: n.clone(),
626                                data_type: PropertyDataType::from_type_name(t),
627                                nullable: *nullable,
628                                default_value: None,
629                            })
630                            .collect(),
631                        constraints: constraints
632                            .iter()
633                            .map(|(kind, props)| match kind.as_str() {
634                                "unique" => TypeConstraint::Unique(props.clone()),
635                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
636                                "not_null" if !props.is_empty() => {
637                                    TypeConstraint::NotNull(props[0].clone())
638                                }
639                                _ => TypeConstraint::Unique(props.clone()),
640                            })
641                            .collect(),
642                        parent_types: Vec::new(),
643                    };
644                    let _ = catalog.register_node_type(def);
645                }
646                WalRecord::DropNodeType { name } => {
647                    let _ = catalog.drop_node_type(name);
648                }
649                WalRecord::CreateEdgeType {
650                    name,
651                    properties,
652                    constraints,
653                } => {
654                    let def = EdgeTypeDefinition {
655                        name: name.clone(),
656                        properties: properties
657                            .iter()
658                            .map(|(n, t, nullable)| TypedProperty {
659                                name: n.clone(),
660                                data_type: PropertyDataType::from_type_name(t),
661                                nullable: *nullable,
662                                default_value: None,
663                            })
664                            .collect(),
665                        constraints: constraints
666                            .iter()
667                            .map(|(kind, props)| match kind.as_str() {
668                                "unique" => TypeConstraint::Unique(props.clone()),
669                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
670                                "not_null" if !props.is_empty() => {
671                                    TypeConstraint::NotNull(props[0].clone())
672                                }
673                                _ => TypeConstraint::Unique(props.clone()),
674                            })
675                            .collect(),
676                        source_node_types: Vec::new(),
677                        target_node_types: Vec::new(),
678                    };
679                    let _ = catalog.register_edge_type_def(def);
680                }
681                WalRecord::DropEdgeType { name } => {
682                    let _ = catalog.drop_edge_type_def(name);
683                }
684                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
685                    // Index recreation is handled by the store on startup
686                    // (indexes are rebuilt from data, not WAL)
687                }
688                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
689                    // Constraint definitions are part of type definitions
690                    // and replayed via CreateNodeType/CreateEdgeType
691                }
692                WalRecord::CreateGraphType {
693                    name,
694                    node_types,
695                    edge_types,
696                    open,
697                } => {
698                    use crate::catalog::GraphTypeDefinition;
699                    let def = GraphTypeDefinition {
700                        name: name.clone(),
701                        allowed_node_types: node_types.clone(),
702                        allowed_edge_types: edge_types.clone(),
703                        open: *open,
704                    };
705                    let _ = catalog.register_graph_type(def);
706                }
707                WalRecord::DropGraphType { name } => {
708                    let _ = catalog.drop_graph_type(name);
709                }
710                WalRecord::CreateSchema { name } => {
711                    let _ = catalog.register_schema_namespace(name.clone());
712                }
713                WalRecord::DropSchema { name } => {
714                    let _ = catalog.drop_schema_namespace(name);
715                }
716
717                WalRecord::AlterNodeType { name, alterations } => {
718                    for (action, prop_name, type_name, nullable) in alterations {
719                        match action.as_str() {
720                            "add" => {
721                                let prop = TypedProperty {
722                                    name: prop_name.clone(),
723                                    data_type: PropertyDataType::from_type_name(type_name),
724                                    nullable: *nullable,
725                                    default_value: None,
726                                };
727                                let _ = catalog.alter_node_type_add_property(name, prop);
728                            }
729                            "drop" => {
730                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
731                            }
732                            _ => {}
733                        }
734                    }
735                }
736                WalRecord::AlterEdgeType { name, alterations } => {
737                    for (action, prop_name, type_name, nullable) in alterations {
738                        match action.as_str() {
739                            "add" => {
740                                let prop = TypedProperty {
741                                    name: prop_name.clone(),
742                                    data_type: PropertyDataType::from_type_name(type_name),
743                                    nullable: *nullable,
744                                    default_value: None,
745                                };
746                                let _ = catalog.alter_edge_type_add_property(name, prop);
747                            }
748                            "drop" => {
749                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
750                            }
751                            _ => {}
752                        }
753                    }
754                }
755                WalRecord::AlterGraphType { name, alterations } => {
756                    for (action, type_name) in alterations {
757                        match action.as_str() {
758                            "add_node" => {
759                                let _ =
760                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
761                            }
762                            "drop_node" => {
763                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
764                            }
765                            "add_edge" => {
766                                let _ =
767                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
768                            }
769                            "drop_edge" => {
770                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
771                            }
772                            _ => {}
773                        }
774                    }
775                }
776
777                WalRecord::CreateProcedure {
778                    name,
779                    params,
780                    returns,
781                    body,
782                } => {
783                    use crate::catalog::ProcedureDefinition;
784                    let def = ProcedureDefinition {
785                        name: name.clone(),
786                        params: params.clone(),
787                        returns: returns.clone(),
788                        body: body.clone(),
789                    };
790                    let _ = catalog.register_procedure(def);
791                }
792                WalRecord::DropProcedure { name } => {
793                    let _ = catalog.drop_procedure(name);
794                }
795
796                // --- RDF triple replay ---
797                #[cfg(feature = "rdf")]
798                WalRecord::InsertRdfTriple { .. }
799                | WalRecord::DeleteRdfTriple { .. }
800                | WalRecord::ClearRdfGraph { .. }
801                | WalRecord::CreateRdfGraph { .. }
802                | WalRecord::DropRdfGraph { .. } => {
803                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
804                }
805                #[cfg(not(feature = "rdf"))]
806                WalRecord::InsertRdfTriple { .. }
807                | WalRecord::DeleteRdfTriple { .. }
808                | WalRecord::ClearRdfGraph { .. }
809                | WalRecord::CreateRdfGraph { .. }
810                | WalRecord::DropRdfGraph { .. } => {}
811
812                WalRecord::TransactionCommit { .. } => {
813                    // In temporal mode, advance the store epoch on each committed
814                    // transaction so that subsequent property/label operations
815                    // are recorded at the correct epoch in their VersionLogs.
816                    #[cfg(feature = "temporal")]
817                    {
818                        target_store.new_epoch();
819                    }
820                }
821                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
822                    // Transaction control records don't need replay action
823                    // (recovery already filtered to only committed transactions)
824                }
825            }
826        }
827        Ok(())
828    }
829
830    // =========================================================================
831    // Single-file format helpers
832    // =========================================================================
833
834    /// Returns `true` if the given path should use single-file format.
835    #[cfg(feature = "grafeo-file")]
836    fn should_use_single_file(
837        path: &std::path::Path,
838        configured: crate::config::StorageFormat,
839    ) -> bool {
840        use crate::config::StorageFormat;
841        match configured {
842            StorageFormat::SingleFile => true,
843            StorageFormat::WalDirectory => false,
844            StorageFormat::Auto => {
845                // Existing file: check magic bytes
846                if path.is_file() {
847                    if let Ok(mut f) = std::fs::File::open(path) {
848                        use std::io::Read;
849                        let mut magic = [0u8; 4];
850                        if f.read_exact(&mut magic).is_ok()
851                            && magic == grafeo_adapters::storage::file::MAGIC
852                        {
853                            return true;
854                        }
855                    }
856                    return false;
857                }
858                // Existing directory: legacy format
859                if path.is_dir() {
860                    return false;
861                }
862                // New path: check extension
863                path.extension().is_some_and(|ext| ext == "grafeo")
864            }
865        }
866    }
867
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// This is a thin wrapper: every argument is forwarded unchanged to
    /// `persistence::load_snapshot_into_store` and its result is returned
    /// as-is. The `rdf_store` parameter only exists when the `rdf` feature
    /// is enabled.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `load_snapshot_into_store`.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
884
885    // =========================================================================
886    // Session & Configuration
887    // =========================================================================
888
    /// Opens a new session for running queries.
    ///
    /// Sessions are cheap to create: spin up as many as you need. Each
    /// gets its own transaction context, so concurrent sessions won't
    /// block each other on reads.
    ///
    /// The session shares the database's transaction manager, query cache,
    /// catalog and commit counter (cloned `Arc`s) and copies the relevant
    /// configuration values. For the built-in store it is additionally wired
    /// to the WAL, CDC log and metrics registry when those features are
    /// enabled, and it inherits the database's current graph and schema.
    ///
    /// # Panics
    ///
    /// Panics if the database was configured with an external graph store and
    /// the internal arena allocator cannot be initialized (out of memory).
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let session = db.session();
    ///
    /// // Run queries through the session
    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    #[must_use]
    pub fn session(&self) -> Session {
        // A closure rather than a value: exactly one of the constructor
        // branches below runs, and it builds its own config on demand.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only,
        };

        // NOTE(review): this early return bypasses the WAL, CDC, metrics and
        // graph/schema context wiring further down -- confirm that is the
        // intended behavior for external stores.
        if let Some(ref ext_store) = self.external_store {
            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
                .expect("arena allocation for external store session");
        }

        // Built-in store: the constructor depends on whether RDF is compiled in.
        #[cfg(feature = "rdf")]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(&self.store),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(not(feature = "rdf"))]
        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());

        // Attach the WAL (and its graph context) only when one is configured.
        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        #[cfg(feature = "cdc")]
        session.set_cdc_log(Arc::clone(&self.cdc_log));

        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                // Count the session both as created and as currently active.
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate persistent graph context to the new session
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate persistent schema context to the new session
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // Suppress unused_mut when cdc/wal are disabled
        let _ = &mut session;

        session
    }
975
976    /// Returns the current graph name, if any.
977    ///
978    /// This is the persistent graph context used by one-shot `execute()` calls.
979    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
980    /// or `SESSION RESET`.
981    #[must_use]
982    pub fn current_graph(&self) -> Option<String> {
983        self.current_graph.read().clone()
984    }
985
986    /// Sets the current graph context for subsequent one-shot `execute()` calls.
987    ///
988    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
989    /// Pass `None` to reset to the default graph.
990    pub fn set_current_graph(&self, name: Option<&str>) {
991        *self.current_graph.write() = name.map(ToString::to_string);
992    }
993
994    /// Returns the current schema name, if any.
995    ///
996    /// This is the persistent schema context used by one-shot `execute()` calls.
997    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
998    #[must_use]
999    pub fn current_schema(&self) -> Option<String> {
1000        self.current_schema.read().clone()
1001    }
1002
1003    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1004    ///
1005    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1006    /// a session. Pass `None` to clear the schema context.
1007    pub fn set_current_schema(&self, name: Option<&str>) {
1008        *self.current_schema.write() = name.map(ToString::to_string);
1009    }
1010
    /// Returns the adaptive execution configuration.
    ///
    /// Borrows the `adaptive` section of this database's [`Config`]; nothing
    /// is copied.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
1016
    /// Returns `true` if this database was opened in read-only mode.
    ///
    /// Reports the stored `read_only` flag, the same value that is handed to
    /// every session created by [`session()`](Self::session).
    #[must_use]
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
1022
    /// Returns the configuration.
    ///
    /// Borrows the [`Config`] held by this database handle; nothing is copied.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
1028
    /// Returns the graph data model of this database.
    ///
    /// The value is read from the `graph_model` field of the configuration
    /// and returned by value.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
1034
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// The value is read from the `memory_limit` field of the configuration;
    /// `None` means no limit was configured.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
1040
1041    /// Returns a point-in-time snapshot of all metrics.
1042    ///
1043    /// If the `metrics` feature is disabled or the registry is not
1044    /// initialized, returns a default (all-zero) snapshot.
1045    #[cfg(feature = "metrics")]
1046    #[must_use]
1047    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1048        let mut snapshot = self
1049            .metrics
1050            .as_ref()
1051            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1052
1053        // Augment with cache stats from the query cache (not tracked in the registry)
1054        let cache_stats = self.query_cache.stats();
1055        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1056        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1057        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1058        snapshot.cache_invalidations = cache_stats.invalidations;
1059
1060        snapshot
1061    }
1062
1063    /// Returns all metrics in Prometheus text exposition format.
1064    ///
1065    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1066    #[cfg(feature = "metrics")]
1067    #[must_use]
1068    pub fn metrics_prometheus(&self) -> String {
1069        self.metrics
1070            .as_ref()
1071            .map_or_else(String::new, |m| m.to_prometheus())
1072    }
1073
1074    /// Resets all metrics counters and histograms to zero.
1075    #[cfg(feature = "metrics")]
1076    pub fn reset_metrics(&self) {
1077        if let Some(ref m) = self.metrics {
1078            m.reset();
1079        }
1080        self.query_cache.reset_stats();
1081    }
1082
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// Unlike [`graph_store()`](Self::graph_store), this always returns the
    /// built-in store, even when an external store is configured.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
1094
1095    /// Returns the LPG store for the currently active graph.
1096    ///
1097    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1098    /// the default store. Otherwise looks up the named graph in the root store.
1099    /// Falls back to the default store if the named graph does not exist.
1100    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1101    fn active_store(&self) -> Arc<LpgStore> {
1102        let graph_name = self.current_graph.read().clone();
1103        match graph_name {
1104            None => Arc::clone(&self.store),
1105            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1106            Some(ref name) => self
1107                .store
1108                .graph(name)
1109                .unwrap_or_else(|| Arc::clone(&self.store)),
1110        }
1111    }
1112
1113    // === Named Graph Management ===
1114
1115    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if arena allocation fails.
1120    pub fn create_graph(&self, name: &str) -> Result<bool> {
1121        Ok(self.store.create_graph(name)?)
1122    }
1123
    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
    ///
    /// Delegates directly to the default store's `drop_graph`.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
1128
    /// Returns all named graph names.
    ///
    /// The list comes straight from the default store's `graph_names()`.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
1134
1135    /// Returns the graph store as a trait object.
1136    ///
1137    /// This provides the [`GraphStoreMut`] interface for code that should work
1138    /// with any storage backend. Use this when you only need graph read/write
1139    /// operations and don't need admin methods like index management.
1140    ///
1141    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1142    #[must_use]
1143    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1144        if let Some(ref ext_store) = self.external_store {
1145            Arc::clone(ext_store)
1146        } else {
1147            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1148        }
1149    }
1150
1151    /// Garbage collects old MVCC versions that are no longer visible.
1152    ///
1153    /// Determines the minimum epoch required by active transactions and prunes
1154    /// version chains older than that threshold. Also cleans up completed
1155    /// transaction metadata in the transaction manager.
1156    pub fn gc(&self) {
1157        let min_epoch = self.transaction_manager.min_active_epoch();
1158        self.store.gc_versions(min_epoch);
1159        self.transaction_manager.gc();
1160    }
1161
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// Borrows the shared handle; clone the `Arc` to keep it beyond the
    /// lifetime of this borrow.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
1167
    /// Returns the query cache.
    ///
    /// This is the same cache instance that is shared with every session
    /// created by [`session()`](Self::session).
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
1173
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// Implemented as a single `clear()` call on the shared query cache.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
1182
1183    // =========================================================================
1184    // Lifecycle
1185    // =========================================================================
1186
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close()` on an already-closed database is a no-op that
    /// returns `Ok(())`. The `is_open` write lock is held for the whole
    /// call. If any step fails, the error is returned before the open flag
    /// is cleared, so a later call (including the one made from `Drop`) runs
    /// the shutdown sequence again.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
    /// With the single-file format, errors from writing the snapshot,
    /// removing the sidecar WAL or closing the file manager are propagated
    /// as well.
    pub fn close(&self) -> Result<()> {
        // Hold the write lock for the whole shutdown sequence.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            {
                // Crash-injection hook from `grafeo_core::testing`, placed
                // between the snapshot write and the sidecar removal.
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        // Every step succeeded: mark the handle closed so repeat calls are no-ops.
        *is_open = false;
        Ok(())
    }
1268
    /// Returns the typed WAL if available.
    ///
    /// `None` means this database handle has no WAL attached.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
1275
1276    /// Logs a WAL record if WAL is enabled.
1277    #[cfg(feature = "wal")]
1278    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1279        if let Some(ref wal) = self.wal {
1280            wal.log(record)?;
1281        }
1282        Ok(())
1283    }
1284
1285    /// Writes the current database snapshot to the `.grafeo` file.
1286    ///
1287    /// Does NOT remove the sidecar WAL: callers that want to clean up
1288    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1289    /// separately after this returns.
1290    #[cfg(feature = "grafeo-file")]
1291    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1292        use grafeo_core::testing::crash::maybe_crash;
1293
1294        maybe_crash("checkpoint_to_file:before_export");
1295        let snapshot_data = self.export_snapshot()?;
1296        maybe_crash("checkpoint_to_file:after_export");
1297
1298        let epoch = self.store.current_epoch();
1299        let transaction_id = self
1300            .transaction_manager
1301            .last_assigned_transaction_id()
1302            .map_or(0, |t| t.0);
1303        let node_count = self.store.node_count() as u64;
1304        let edge_count = self.store.edge_count() as u64;
1305
1306        fm.write_snapshot(
1307            &snapshot_data,
1308            epoch.0,
1309            transaction_id,
1310            node_count,
1311            edge_count,
1312        )?;
1313
1314        maybe_crash("checkpoint_to_file:after_write_snapshot");
1315        Ok(())
1316    }
1317
    /// Returns the file manager if using single-file format.
    ///
    /// `None` means no file manager is attached; `close()` treats that as
    /// "not single-file" and skips the snapshot step.
    #[cfg(feature = "grafeo-file")]
    #[must_use]
    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
        self.file_manager.as_ref()
    }
1324}
1325
1326impl Drop for GrafeoDB {
1327    fn drop(&mut self) {
1328        if let Err(e) = self.close() {
1329            grafeo_error!("Error closing database: {}", e);
1330        }
1331    }
1332}
1333
1334impl crate::admin::AdminService for GrafeoDB {
1335    fn info(&self) -> crate::admin::DatabaseInfo {
1336        self.info()
1337    }
1338
1339    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1340        self.detailed_stats()
1341    }
1342
1343    fn schema(&self) -> crate::admin::SchemaInfo {
1344        self.schema()
1345    }
1346
1347    fn validate(&self) -> crate::admin::ValidationResult {
1348        self.validate()
1349    }
1350
1351    fn wal_status(&self) -> crate::admin::WalStatus {
1352        self.wal_status()
1353    }
1354
1355    fn wal_checkpoint(&self) -> Result<()> {
1356        self.wal_checkpoint()
1357    }
1358}
1359
1360// =========================================================================
1361// Query Result Types
1362// =========================================================================
1363
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so the struct does not enforce that `columns`,
/// `column_types` and the rows in `rows` have matching lengths.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`QueryResult::new`] fills this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1406
1407impl QueryResult {
1408    /// Creates a fully empty query result (no columns, no rows).
1409    #[must_use]
1410    pub fn empty() -> Self {
1411        Self {
1412            columns: Vec::new(),
1413            column_types: Vec::new(),
1414            rows: Vec::new(),
1415            execution_time_ms: None,
1416            rows_scanned: None,
1417            status_message: None,
1418            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1419        }
1420    }
1421
1422    /// Creates a query result with only a status message (for DDL commands).
1423    #[must_use]
1424    pub fn status(msg: impl Into<String>) -> Self {
1425        Self {
1426            columns: Vec::new(),
1427            column_types: Vec::new(),
1428            rows: Vec::new(),
1429            execution_time_ms: None,
1430            rows_scanned: None,
1431            status_message: Some(msg.into()),
1432            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1433        }
1434    }
1435
1436    /// Creates a new empty query result.
1437    #[must_use]
1438    pub fn new(columns: Vec<String>) -> Self {
1439        let len = columns.len();
1440        Self {
1441            columns,
1442            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1443            rows: Vec::new(),
1444            execution_time_ms: None,
1445            rows_scanned: None,
1446            status_message: None,
1447            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1448        }
1449    }
1450
1451    /// Creates a new empty query result with column types.
1452    #[must_use]
1453    pub fn with_types(
1454        columns: Vec<String>,
1455        column_types: Vec<grafeo_common::types::LogicalType>,
1456    ) -> Self {
1457        Self {
1458            columns,
1459            column_types,
1460            rows: Vec::new(),
1461            execution_time_ms: None,
1462            rows_scanned: None,
1463            status_message: None,
1464            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1465        }
1466    }
1467
1468    /// Sets the execution metrics on this result.
1469    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1470        self.execution_time_ms = Some(execution_time_ms);
1471        self.rows_scanned = Some(rows_scanned);
1472        self
1473    }
1474
1475    /// Returns the execution time in milliseconds, if available.
1476    #[must_use]
1477    pub fn execution_time_ms(&self) -> Option<f64> {
1478        self.execution_time_ms
1479    }
1480
1481    /// Returns the number of rows scanned, if available.
1482    #[must_use]
1483    pub fn rows_scanned(&self) -> Option<u64> {
1484        self.rows_scanned
1485    }
1486
1487    /// Returns the number of rows.
1488    #[must_use]
1489    pub fn row_count(&self) -> usize {
1490        self.rows.len()
1491    }
1492
1493    /// Returns the number of columns.
1494    #[must_use]
1495    pub fn column_count(&self) -> usize {
1496        self.columns.len()
1497    }
1498
1499    /// Returns true if the result is empty.
1500    #[must_use]
1501    pub fn is_empty(&self) -> bool {
1502        self.rows.is_empty()
1503    }
1504
1505    /// Extracts a single value from the result.
1506    ///
1507    /// Use this when your query returns exactly one row with one column,
1508    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1509    ///
1510    /// # Errors
1511    ///
1512    /// Returns an error if the result has multiple rows or columns.
1513    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1514        if self.rows.len() != 1 || self.columns.len() != 1 {
1515            return Err(grafeo_common::utils::error::Error::InvalidValue(
1516                "Expected single value".to_string(),
1517            ));
1518        }
1519        T::from_value(&self.rows[0][0])
1520    }
1521
1522    /// Returns an iterator over the rows.
1523    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1524        self.rows.iter()
1525    }
1526}
1527
1528impl std::fmt::Display for QueryResult {
1529    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1530        let table = grafeo_common::fmt::format_result_table(
1531            &self.columns,
1532            &self.rows,
1533            self.execution_time_ms,
1534            self.status_message.as_deref(),
1535        );
1536        f.write_str(&table)
1537    }
1538}
1539
1540/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1541///
1542/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1543/// Used by [`QueryResult::scalar()`] to extract typed values.
1544pub trait FromValue: Sized {
1545    /// Attempts the conversion, returning an error on type mismatch.
1546    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1547}
1548
1549impl FromValue for i64 {
1550    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1551        value
1552            .as_int64()
1553            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1554                expected: "INT64".to_string(),
1555                found: value.type_name().to_string(),
1556            })
1557    }
1558}
1559
1560impl FromValue for f64 {
1561    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1562        value
1563            .as_float64()
1564            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1565                expected: "FLOAT64".to_string(),
1566                found: value.type_name().to_string(),
1567            })
1568    }
1569}
1570
1571impl FromValue for String {
1572    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1573        value.as_str().map(String::from).ok_or_else(|| {
1574            grafeo_common::utils::error::Error::TypeMismatch {
1575                expected: "STRING".to_string(),
1576                found: value.type_name().to_string(),
1577            }
1578        })
1579    }
1580}
1581
1582impl FromValue for bool {
1583    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1584        value
1585            .as_bool()
1586            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1587                expected: "BOOL".to_string(),
1588                found: value.type_name().to_string(),
1589            })
1590    }
1591}
1592
1593#[cfg(test)]
1594mod tests {
1595    use super::*;
1596
1597    #[test]
1598    fn test_create_in_memory_database() {
1599        let db = GrafeoDB::new_in_memory();
1600        assert_eq!(db.node_count(), 0);
1601        assert_eq!(db.edge_count(), 0);
1602    }
1603
1604    #[test]
1605    fn test_database_config() {
1606        let config = Config::in_memory().with_threads(4).with_query_logging();
1607
1608        let db = GrafeoDB::with_config(config).unwrap();
1609        assert_eq!(db.config().threads, 4);
1610        assert!(db.config().query_logging);
1611    }
1612
1613    #[test]
1614    fn test_database_session() {
1615        let db = GrafeoDB::new_in_memory();
1616        let _session = db.session();
1617        // Session should be created successfully
1618    }
1619
1620    #[cfg(feature = "wal")]
1621    #[test]
1622    fn test_persistent_database_recovery() {
1623        use grafeo_common::types::Value;
1624        use tempfile::tempdir;
1625
1626        let dir = tempdir().unwrap();
1627        let db_path = dir.path().join("test_db");
1628
1629        // Create database and add some data
1630        {
1631            let db = GrafeoDB::open(&db_path).unwrap();
1632
1633            let alix = db.create_node(&["Person"]);
1634            db.set_node_property(alix, "name", Value::from("Alix"));
1635
1636            let gus = db.create_node(&["Person"]);
1637            db.set_node_property(gus, "name", Value::from("Gus"));
1638
1639            let _edge = db.create_edge(alix, gus, "KNOWS");
1640
1641            // Explicitly close to flush WAL
1642            db.close().unwrap();
1643        }
1644
1645        // Reopen and verify data was recovered
1646        {
1647            let db = GrafeoDB::open(&db_path).unwrap();
1648
1649            assert_eq!(db.node_count(), 2);
1650            assert_eq!(db.edge_count(), 1);
1651
1652            // Verify nodes exist
1653            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1654            assert!(node0.is_some());
1655
1656            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1657            assert!(node1.is_some());
1658        }
1659    }
1660
1661    #[cfg(feature = "wal")]
1662    #[test]
1663    fn test_wal_logging() {
1664        use tempfile::tempdir;
1665
1666        let dir = tempdir().unwrap();
1667        let db_path = dir.path().join("wal_test_db");
1668
1669        let db = GrafeoDB::open(&db_path).unwrap();
1670
1671        // Create some data
1672        let node = db.create_node(&["Test"]);
1673        db.delete_node(node);
1674
1675        // WAL should have records
1676        if let Some(wal) = db.wal() {
1677            assert!(wal.record_count() > 0);
1678        }
1679
1680        db.close().unwrap();
1681    }
1682
1683    #[cfg(feature = "wal")]
1684    #[test]
1685    fn test_wal_recovery_multiple_sessions() {
1686        // Tests that WAL recovery works correctly across multiple open/close cycles
1687        use grafeo_common::types::Value;
1688        use tempfile::tempdir;
1689
1690        let dir = tempdir().unwrap();
1691        let db_path = dir.path().join("multi_session_db");
1692
1693        // Session 1: Create initial data
1694        {
1695            let db = GrafeoDB::open(&db_path).unwrap();
1696            let alix = db.create_node(&["Person"]);
1697            db.set_node_property(alix, "name", Value::from("Alix"));
1698            db.close().unwrap();
1699        }
1700
1701        // Session 2: Add more data
1702        {
1703            let db = GrafeoDB::open(&db_path).unwrap();
1704            assert_eq!(db.node_count(), 1); // Previous data recovered
1705            let gus = db.create_node(&["Person"]);
1706            db.set_node_property(gus, "name", Value::from("Gus"));
1707            db.close().unwrap();
1708        }
1709
1710        // Session 3: Verify all data
1711        {
1712            let db = GrafeoDB::open(&db_path).unwrap();
1713            assert_eq!(db.node_count(), 2);
1714
1715            // Verify properties were recovered correctly
1716            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1717            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1718
1719            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1720            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1721        }
1722    }
1723
1724    #[cfg(feature = "wal")]
1725    #[test]
1726    fn test_database_consistency_after_mutations() {
1727        // Tests that database remains consistent after a series of create/delete operations
1728        use grafeo_common::types::Value;
1729        use tempfile::tempdir;
1730
1731        let dir = tempdir().unwrap();
1732        let db_path = dir.path().join("consistency_db");
1733
1734        {
1735            let db = GrafeoDB::open(&db_path).unwrap();
1736
1737            // Create nodes
1738            let a = db.create_node(&["Node"]);
1739            let b = db.create_node(&["Node"]);
1740            let c = db.create_node(&["Node"]);
1741
1742            // Create edges
1743            let e1 = db.create_edge(a, b, "LINKS");
1744            let _e2 = db.create_edge(b, c, "LINKS");
1745
1746            // Delete middle node and its edge
1747            db.delete_edge(e1);
1748            db.delete_node(b);
1749
1750            // Set properties on remaining nodes
1751            db.set_node_property(a, "value", Value::Int64(1));
1752            db.set_node_property(c, "value", Value::Int64(3));
1753
1754            db.close().unwrap();
1755        }
1756
1757        // Reopen and verify consistency
1758        {
1759            let db = GrafeoDB::open(&db_path).unwrap();
1760
1761            // Should have 2 nodes (a and c), b was deleted
1762            // Note: node_count includes deleted nodes in some implementations
1763            // What matters is that the non-deleted nodes are accessible
1764            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1765            assert!(node_a.is_some());
1766
1767            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1768            assert!(node_c.is_some());
1769
1770            // Middle node should be deleted
1771            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1772            assert!(node_b.is_none());
1773        }
1774    }
1775
1776    #[cfg(feature = "wal")]
1777    #[test]
1778    fn test_close_is_idempotent() {
1779        // Calling close() multiple times should not cause errors
1780        use tempfile::tempdir;
1781
1782        let dir = tempdir().unwrap();
1783        let db_path = dir.path().join("close_test_db");
1784
1785        let db = GrafeoDB::open(&db_path).unwrap();
1786        db.create_node(&["Test"]);
1787
1788        // First close should succeed
1789        assert!(db.close().is_ok());
1790
1791        // Second close should also succeed (idempotent)
1792        assert!(db.close().is_ok());
1793    }
1794
1795    #[test]
1796    fn test_with_store_external_backend() {
1797        use grafeo_core::graph::lpg::LpgStore;
1798
1799        let external = Arc::new(LpgStore::new().unwrap());
1800
1801        // Seed data on the external store directly
1802        let n1 = external.create_node(&["Person"]);
1803        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1804
1805        let db = GrafeoDB::with_store(
1806            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1807            Config::in_memory(),
1808        )
1809        .unwrap();
1810
1811        let session = db.session();
1812
1813        // Session should see data from the external store via execute
1814        #[cfg(feature = "gql")]
1815        {
1816            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1817            assert_eq!(result.rows.len(), 1);
1818        }
1819    }
1820
1821    #[test]
1822    fn test_with_config_custom_memory_limit() {
1823        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1824
1825        let db = GrafeoDB::with_config(config).unwrap();
1826        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1827        assert_eq!(db.node_count(), 0);
1828    }
1829
1830    #[cfg(feature = "metrics")]
1831    #[test]
1832    fn test_database_metrics_registry() {
1833        let db = GrafeoDB::new_in_memory();
1834
1835        // Perform some operations
1836        db.create_node(&["Person"]);
1837        db.create_node(&["Person"]);
1838
1839        // Check that metrics snapshot returns data
1840        let snap = db.metrics();
1841        // Session created counter should reflect at least 0 (metrics is initialized)
1842        assert_eq!(snap.query_count, 0); // No queries executed yet
1843    }
1844
1845    #[test]
1846    fn test_query_result_has_metrics() {
1847        // Verifies that query results include execution metrics
1848        let db = GrafeoDB::new_in_memory();
1849        db.create_node(&["Person"]);
1850        db.create_node(&["Person"]);
1851
1852        #[cfg(feature = "gql")]
1853        {
1854            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1855
1856            // Metrics should be populated
1857            assert!(result.execution_time_ms.is_some());
1858            assert!(result.rows_scanned.is_some());
1859            assert!(result.execution_time_ms.unwrap() >= 0.0);
1860            assert_eq!(result.rows_scanned.unwrap(), 2);
1861        }
1862    }
1863
1864    #[test]
1865    fn test_empty_query_result_metrics() {
1866        // Verifies metrics are correct for queries returning no results
1867        let db = GrafeoDB::new_in_memory();
1868        db.create_node(&["Person"]);
1869
1870        #[cfg(feature = "gql")]
1871        {
1872            // Query that matches nothing
1873            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1874
1875            assert!(result.execution_time_ms.is_some());
1876            assert!(result.rows_scanned.is_some());
1877            assert_eq!(result.rows_scanned.unwrap(), 0);
1878        }
1879    }
1880
1881    #[cfg(feature = "cdc")]
1882    mod cdc_integration {
1883        use super::*;
1884
1885        #[test]
1886        fn test_node_lifecycle_history() {
1887            let db = GrafeoDB::new_in_memory();
1888
1889            // Create
1890            let id = db.create_node(&["Person"]);
1891            // Update
1892            db.set_node_property(id, "name", "Alix".into());
1893            db.set_node_property(id, "name", "Gus".into());
1894            // Delete
1895            db.delete_node(id);
1896
1897            let history = db.history(id).unwrap();
1898            assert_eq!(history.len(), 4); // create + 2 updates + delete
1899            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1900            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1901            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1902            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1903            assert!(history[2].before.is_some()); // second update has prior "Alix"
1904            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1905        }
1906
1907        #[test]
1908        fn test_edge_lifecycle_history() {
1909            let db = GrafeoDB::new_in_memory();
1910
1911            let alix = db.create_node(&["Person"]);
1912            let gus = db.create_node(&["Person"]);
1913            let edge = db.create_edge(alix, gus, "KNOWS");
1914            db.set_edge_property(edge, "since", 2024i64.into());
1915            db.delete_edge(edge);
1916
1917            let history = db.history(edge).unwrap();
1918            assert_eq!(history.len(), 3); // create + update + delete
1919            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1920            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1921            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1922        }
1923
1924        #[test]
1925        fn test_create_node_with_props_cdc() {
1926            let db = GrafeoDB::new_in_memory();
1927
1928            let id = db.create_node_with_props(
1929                &["Person"],
1930                vec![
1931                    ("name", grafeo_common::types::Value::from("Alix")),
1932                    ("age", grafeo_common::types::Value::from(30i64)),
1933                ],
1934            );
1935
1936            let history = db.history(id).unwrap();
1937            assert_eq!(history.len(), 1);
1938            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1939            // Props should be captured
1940            let after = history[0].after.as_ref().unwrap();
1941            assert_eq!(after.len(), 2);
1942        }
1943
1944        #[test]
1945        fn test_changes_between() {
1946            let db = GrafeoDB::new_in_memory();
1947
1948            let id1 = db.create_node(&["A"]);
1949            let _id2 = db.create_node(&["B"]);
1950            db.set_node_property(id1, "x", 1i64.into());
1951
1952            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1953            let changes = db
1954                .changes_between(
1955                    grafeo_common::types::EpochId(0),
1956                    grafeo_common::types::EpochId(u64::MAX),
1957                )
1958                .unwrap();
1959            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1960        }
1961    }
1962
1963    #[test]
1964    fn test_with_store_basic() {
1965        use grafeo_core::graph::lpg::LpgStore;
1966
1967        let store = Arc::new(LpgStore::new().unwrap());
1968        let n1 = store.create_node(&["Person"]);
1969        store.set_node_property(n1, "name", "Alix".into());
1970
1971        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1972        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1973
1974        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1975        assert_eq!(result.rows.len(), 1);
1976    }
1977
1978    #[test]
1979    fn test_with_store_session() {
1980        use grafeo_core::graph::lpg::LpgStore;
1981
1982        let store = Arc::new(LpgStore::new().unwrap());
1983        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1984        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1985
1986        let session = db.session();
1987        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1988        assert_eq!(result.rows.len(), 1);
1989    }
1990
1991    #[test]
1992    fn test_with_store_mutations() {
1993        use grafeo_core::graph::lpg::LpgStore;
1994
1995        let store = Arc::new(LpgStore::new().unwrap());
1996        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1997        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1998
1999        let mut session = db.session();
2000
2001        // Use an explicit transaction so INSERT and MATCH share the same
2002        // transaction context. With PENDING epochs, uncommitted versions are
2003        // only visible to the owning transaction.
2004        session.begin_transaction().unwrap();
2005        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
2006
2007        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
2008        assert_eq!(result.rows.len(), 1);
2009
2010        session.commit().unwrap();
2011    }
2012}