Skip to main content

grafeo_engine/database/
mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`] - it's your handle to everything.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (execute, execute_cypher, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27use grafeo_common::grafeo_error;
28#[cfg(feature = "wal")]
29use std::path::Path;
30use std::sync::Arc;
31use std::sync::atomic::AtomicUsize;
32
33use parking_lot::RwLock;
34
35#[cfg(feature = "grafeo-file")]
36use grafeo_adapters::storage::file::GrafeoFileManager;
37#[cfg(feature = "wal")]
38use grafeo_adapters::storage::wal::{
39    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
40};
41use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
42use grafeo_common::utils::error::Result;
43use grafeo_core::graph::GraphStoreMut;
44use grafeo_core::graph::lpg::LpgStore;
45#[cfg(feature = "rdf")]
46use grafeo_core::graph::rdf::RdfStore;
47
48use crate::catalog::Catalog;
49use crate::config::Config;
50use crate::query::cache::QueryCache;
51use crate::session::Session;
52use crate::transaction::TransactionManager;
53
54/// Your handle to a Grafeo database.
55///
56/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
57/// quick experiments, or [`open()`](Self::open) for persistent storage.
58/// Then grab a [`session()`](Self::session) to start querying.
59///
60/// # Examples
61///
62/// ```
63/// use grafeo_engine::GrafeoDB;
64///
65/// // Quick in-memory database
66/// let db = GrafeoDB::new_in_memory();
67///
68/// // Add some data
69/// db.create_node(&["Person"]);
70///
71/// // Query it
72/// let session = db.session();
73/// let result = session.execute("MATCH (p:Person) RETURN p")?;
74/// # Ok::<(), grafeo_common::utils::error::Error>(())
75/// ```
76pub struct GrafeoDB {
77    /// Database configuration.
78    pub(super) config: Config,
79    /// The underlying graph store.
80    pub(super) store: Arc<LpgStore>,
81    /// Schema and metadata catalog shared across sessions.
82    pub(super) catalog: Arc<Catalog>,
83    /// RDF triple store (if RDF feature is enabled).
84    #[cfg(feature = "rdf")]
85    pub(super) rdf_store: Arc<RdfStore>,
86    /// Transaction manager.
87    pub(super) transaction_manager: Arc<TransactionManager>,
88    /// Unified buffer manager.
89    pub(super) buffer_manager: Arc<BufferManager>,
90    /// Write-ahead log manager (if durability is enabled).
91    #[cfg(feature = "wal")]
92    pub(super) wal: Option<Arc<LpgWal>>,
93    /// Shared WAL graph context tracker. Tracks which named graph was last
94    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
95    /// records only when the context actually changes.
96    #[cfg(feature = "wal")]
97    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
98    /// Query cache for parsed and optimized plans.
99    pub(super) query_cache: Arc<QueryCache>,
100    /// Shared commit counter for auto-GC across sessions.
101    pub(super) commit_counter: Arc<AtomicUsize>,
102    /// Whether the database is open.
103    pub(super) is_open: RwLock<bool>,
104    /// Change data capture log for tracking mutations.
105    #[cfg(feature = "cdc")]
106    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
107    /// Registered embedding models for text-to-vector conversion.
108    #[cfg(feature = "embed")]
109    pub(super) embedding_models:
110        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
111    /// Single-file database manager (when using `.grafeo` format).
112    #[cfg(feature = "grafeo-file")]
113    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
114    /// External graph store (when using with_store()).
115    /// When set, sessions route queries through this store instead of the built-in LpgStore.
116    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
117    /// Metrics registry shared across all sessions.
118    #[cfg(feature = "metrics")]
119    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
120    /// Persistent graph context for one-shot `execute()` calls.
121    /// When set, each call to `session()` pre-configures the session to this graph.
122    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
123    current_graph: RwLock<Option<String>>,
124    /// Persistent schema context for one-shot `execute()` calls.
125    /// When set, each call to `session()` pre-configures the session to this schema.
126    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
127    current_schema: RwLock<Option<String>>,
128    /// Whether this database is open in read-only mode.
129    /// When true, sessions automatically enforce read-only transactions.
130    read_only: bool,
131}
132
133impl GrafeoDB {
134    /// Creates an in-memory database, fast to create, gone when dropped.
135    ///
136    /// Use this for tests, experiments, or when you don't need persistence.
137    /// For data that survives restarts, use [`open()`](Self::open) instead.
138    ///
139    /// # Panics
140    ///
141    /// Panics if the internal arena allocator cannot be initialized (out of memory).
142    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use grafeo_engine::GrafeoDB;
148    ///
149    /// let db = GrafeoDB::new_in_memory();
150    /// let session = db.session();
151    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
152    /// # Ok::<(), grafeo_common::utils::error::Error>(())
153    /// ```
154    #[must_use]
155    pub fn new_in_memory() -> Self {
156        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
157    }
158
159    /// Opens a database at the given path, creating it if it doesn't exist.
160    ///
161    /// If you've used this path before, Grafeo recovers your data from the
162    /// write-ahead log automatically. First open on a new path creates an
163    /// empty database.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the path isn't writable or recovery fails.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use grafeo_engine::GrafeoDB;
173    ///
174    /// let db = GrafeoDB::open("./my_social_network")?;
175    /// # Ok::<(), grafeo_common::utils::error::Error>(())
176    /// ```
177    #[cfg(feature = "wal")]
178    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
179        Self::with_config(Config::persistent(path.as_ref()))
180    }
181
182    /// Opens an existing database in read-only mode.
183    ///
184    /// Uses a shared file lock, so multiple processes can read the same
185    /// `.grafeo` file concurrently. The database loads the last checkpoint
186    /// snapshot but does **not** replay the WAL or allow mutations.
187    ///
188    /// Currently only supports the single-file (`.grafeo`) format.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the file doesn't exist or can't be read.
193    ///
194    /// # Examples
195    ///
196    /// ```no_run
197    /// use grafeo_engine::GrafeoDB;
198    ///
199    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
200    /// let session = db.session();
201    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
202    /// // Mutations will return an error:
203    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
204    /// # Ok::<(), grafeo_common::utils::error::Error>(())
205    /// ```
206    #[cfg(feature = "grafeo-file")]
207    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
208        Self::with_config(Config::read_only(path.as_ref()))
209    }
210
211    /// Creates a database with custom configuration.
212    ///
213    /// Use this when you need fine-grained control over memory limits,
214    /// thread counts, or persistence settings. For most cases,
215    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
216    /// are simpler.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the database can't be created or recovery fails.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use grafeo_engine::{GrafeoDB, Config};
226    ///
227    /// // In-memory with a 512MB limit
228    /// let config = Config::in_memory()
229    ///     .with_memory_limit(512 * 1024 * 1024);
230    ///
231    /// let db = GrafeoDB::with_config(config)?;
232    /// # Ok::<(), grafeo_common::utils::error::Error>(())
233    /// ```
234    pub fn with_config(config: Config) -> Result<Self> {
235        // Validate configuration before proceeding
236        config
237            .validate()
238            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
239
240        let store = Arc::new(LpgStore::new()?);
241        #[cfg(feature = "rdf")]
242        let rdf_store = Arc::new(RdfStore::new());
243        let transaction_manager = Arc::new(TransactionManager::new());
244
245        // Create buffer manager with configured limits
246        let buffer_config = BufferManagerConfig {
247            budget: config.memory_limit.unwrap_or_else(|| {
248                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
249            }),
250            spill_path: config
251                .spill_path
252                .clone()
253                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
254            ..BufferManagerConfig::default()
255        };
256        let buffer_manager = BufferManager::new(buffer_config);
257
258        // Create catalog early so WAL replay can restore schema definitions
259        let catalog = Arc::new(Catalog::new());
260
261        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
262
263        // --- Single-file format (.grafeo) ---
264        #[cfg(feature = "grafeo-file")]
265        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
266            // Read-only mode: open with shared lock, load snapshot, skip WAL
267            if let Some(ref db_path) = config.path {
268                if db_path.exists() && db_path.is_file() {
269                    let fm = GrafeoFileManager::open_read_only(db_path)?;
270                    let snapshot_data = fm.read_snapshot()?;
271                    if !snapshot_data.is_empty() {
272                        Self::apply_snapshot_data(
273                            &store,
274                            &catalog,
275                            #[cfg(feature = "rdf")]
276                            &rdf_store,
277                            &snapshot_data,
278                        )?;
279                    }
280                    Some(Arc::new(fm))
281                } else {
282                    return Err(grafeo_common::utils::error::Error::Internal(format!(
283                        "read-only open requires an existing .grafeo file: {}",
284                        db_path.display()
285                    )));
286                }
287            } else {
288                return Err(grafeo_common::utils::error::Error::Internal(
289                    "read-only mode requires a database path".to_string(),
290                ));
291            }
292        } else if config.wal_enabled {
293            if let Some(ref db_path) = config.path {
294                if Self::should_use_single_file(db_path, config.storage_format) {
295                    let fm = if db_path.exists() && db_path.is_file() {
296                        GrafeoFileManager::open(db_path)?
297                    } else if !db_path.exists() {
298                        GrafeoFileManager::create(db_path)?
299                    } else {
300                        // Path exists but is not a file (directory, etc.)
301                        return Err(grafeo_common::utils::error::Error::Internal(format!(
302                            "path exists but is not a file: {}",
303                            db_path.display()
304                        )));
305                    };
306
307                    // Load snapshot data from the file
308                    let snapshot_data = fm.read_snapshot()?;
309                    if !snapshot_data.is_empty() {
310                        Self::apply_snapshot_data(
311                            &store,
312                            &catalog,
313                            #[cfg(feature = "rdf")]
314                            &rdf_store,
315                            &snapshot_data,
316                        )?;
317                    }
318
319                    // Recover sidecar WAL if present
320                    if fm.has_sidecar_wal() {
321                        let recovery = WalRecovery::new(fm.sidecar_wal_path());
322                        let records = recovery.recover()?;
323                        Self::apply_wal_records(
324                            &store,
325                            &catalog,
326                            #[cfg(feature = "rdf")]
327                            &rdf_store,
328                            &records,
329                        )?;
330                    }
331
332                    Some(Arc::new(fm))
333                } else {
334                    None
335                }
336            } else {
337                None
338            }
339        } else {
340            None
341        };
342
343        // Determine whether to use the WAL directory path (legacy) or sidecar
344        // Read-only mode skips WAL entirely (no recovery, no creation).
345        #[cfg(feature = "wal")]
346        let wal = if is_read_only {
347            None
348        } else if config.wal_enabled {
349            if let Some(ref db_path) = config.path {
350                // When using single-file format, the WAL is a sidecar directory
351                #[cfg(feature = "grafeo-file")]
352                let wal_path = if let Some(ref fm) = file_manager {
353                    let p = fm.sidecar_wal_path();
354                    std::fs::create_dir_all(&p)?;
355                    p
356                } else {
357                    // Legacy: WAL inside the database directory
358                    std::fs::create_dir_all(db_path)?;
359                    db_path.join("wal")
360                };
361
362                #[cfg(not(feature = "grafeo-file"))]
363                let wal_path = {
364                    std::fs::create_dir_all(db_path)?;
365                    db_path.join("wal")
366                };
367
368                // For legacy WAL directory format, check if WAL exists and recover
369                #[cfg(feature = "grafeo-file")]
370                let is_single_file = file_manager.is_some();
371                #[cfg(not(feature = "grafeo-file"))]
372                let is_single_file = false;
373
374                if !is_single_file && wal_path.exists() {
375                    let recovery = WalRecovery::new(&wal_path);
376                    let records = recovery.recover()?;
377                    Self::apply_wal_records(
378                        &store,
379                        &catalog,
380                        #[cfg(feature = "rdf")]
381                        &rdf_store,
382                        &records,
383                    )?;
384                }
385
386                // Open/create WAL manager with configured durability
387                let wal_durability = match config.wal_durability {
388                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
389                    crate::config::DurabilityMode::Batch {
390                        max_delay_ms,
391                        max_records,
392                    } => WalDurabilityMode::Batch {
393                        max_delay_ms,
394                        max_records,
395                    },
396                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
397                        WalDurabilityMode::Adaptive { target_interval_ms }
398                    }
399                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
400                };
401                let wal_config = WalConfig {
402                    durability: wal_durability,
403                    ..WalConfig::default()
404                };
405                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
406                Some(Arc::new(wal_manager))
407            } else {
408                None
409            }
410        } else {
411            None
412        };
413
414        // Create query cache with default capacity (1000 queries)
415        let query_cache = Arc::new(QueryCache::default());
416
417        // After all snapshot/WAL recovery, sync TransactionManager epoch
418        // with the store so queries use the correct viewing epoch.
419        #[cfg(feature = "temporal")]
420        transaction_manager.sync_epoch(store.current_epoch());
421
422        Ok(Self {
423            config,
424            store,
425            catalog,
426            #[cfg(feature = "rdf")]
427            rdf_store,
428            transaction_manager,
429            buffer_manager,
430            #[cfg(feature = "wal")]
431            wal,
432            #[cfg(feature = "wal")]
433            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
434            query_cache,
435            commit_counter: Arc::new(AtomicUsize::new(0)),
436            is_open: RwLock::new(true),
437            #[cfg(feature = "cdc")]
438            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
439            #[cfg(feature = "embed")]
440            embedding_models: RwLock::new(hashbrown::HashMap::new()),
441            #[cfg(feature = "grafeo-file")]
442            file_manager,
443            external_store: None,
444            #[cfg(feature = "metrics")]
445            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
446            current_graph: RwLock::new(None),
447            current_schema: RwLock::new(None),
448            read_only: is_read_only,
449        })
450    }
451
452    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
453    ///
454    /// The external store handles all data persistence. WAL, CDC, and index
455    /// management are the responsibility of the store implementation.
456    ///
457    /// Query execution (all 6 languages, optimizer, planner) works through the
458    /// provided store. Admin operations (schema introspection, persistence,
459    /// vector/text indexes) are not available on external stores.
460    ///
461    /// # Examples
462    ///
463    /// ```no_run
464    /// use std::sync::Arc;
465    /// use grafeo_engine::{GrafeoDB, Config};
466    /// use grafeo_core::graph::GraphStoreMut;
467    ///
468    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
469    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
470    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
471    ///     Ok(())
472    /// }
473    /// ```
474    ///
475    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
476    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
477        config
478            .validate()
479            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
480
481        let dummy_store = Arc::new(LpgStore::new()?);
482        let transaction_manager = Arc::new(TransactionManager::new());
483
484        let buffer_config = BufferManagerConfig {
485            budget: config.memory_limit.unwrap_or_else(|| {
486                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
487            }),
488            spill_path: None,
489            ..BufferManagerConfig::default()
490        };
491        let buffer_manager = BufferManager::new(buffer_config);
492
493        let query_cache = Arc::new(QueryCache::default());
494
495        Ok(Self {
496            config,
497            store: dummy_store,
498            catalog: Arc::new(Catalog::new()),
499            #[cfg(feature = "rdf")]
500            rdf_store: Arc::new(RdfStore::new()),
501            transaction_manager,
502            buffer_manager,
503            #[cfg(feature = "wal")]
504            wal: None,
505            #[cfg(feature = "wal")]
506            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
507            query_cache,
508            commit_counter: Arc::new(AtomicUsize::new(0)),
509            is_open: RwLock::new(true),
510            #[cfg(feature = "cdc")]
511            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
512            #[cfg(feature = "embed")]
513            embedding_models: RwLock::new(hashbrown::HashMap::new()),
514            #[cfg(feature = "grafeo-file")]
515            file_manager: None,
516            external_store: Some(store),
517            #[cfg(feature = "metrics")]
518            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
519            current_graph: RwLock::new(None),
520            current_schema: RwLock::new(None),
521            read_only: false,
522        })
523    }
524
525    /// Applies WAL records to restore the database state.
526    ///
527    /// Data mutation records are routed through a graph cursor that tracks
528    /// `SwitchGraph` context markers, replaying mutations into the correct
529    /// named graph (or the default graph when cursor is `None`).
530    #[cfg(feature = "wal")]
531    fn apply_wal_records(
532        store: &Arc<LpgStore>,
533        catalog: &Catalog,
534        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
535        records: &[WalRecord],
536    ) -> Result<()> {
537        use crate::catalog::{
538            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
539        };
540        use grafeo_common::utils::error::Error;
541
542        // Graph cursor: tracks which named graph receives data mutations.
543        // `None` means the default graph.
544        let mut current_graph: Option<String> = None;
545        let mut target_store: Arc<LpgStore> = Arc::clone(store);
546
547        for record in records {
548            match record {
549                // --- Named graph lifecycle ---
550                WalRecord::CreateNamedGraph { name } => {
551                    let _ = store.create_graph(name);
552                }
553                WalRecord::DropNamedGraph { name } => {
554                    store.drop_graph(name);
555                    // Reset cursor if the dropped graph was active
556                    if current_graph.as_deref() == Some(name.as_str()) {
557                        current_graph = None;
558                        target_store = Arc::clone(store);
559                    }
560                }
561                WalRecord::SwitchGraph { name } => {
562                    current_graph.clone_from(name);
563                    target_store = match &current_graph {
564                        None => Arc::clone(store),
565                        Some(graph_name) => store
566                            .graph_or_create(graph_name)
567                            .map_err(|e| Error::Internal(e.to_string()))?,
568                    };
569                }
570
571                // --- Data mutations: routed through target_store ---
572                WalRecord::CreateNode { id, labels } => {
573                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
574                    target_store.create_node_with_id(*id, &label_refs)?;
575                }
576                WalRecord::DeleteNode { id } => {
577                    target_store.delete_node(*id);
578                }
579                WalRecord::CreateEdge {
580                    id,
581                    src,
582                    dst,
583                    edge_type,
584                } => {
585                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
586                }
587                WalRecord::DeleteEdge { id } => {
588                    target_store.delete_edge(*id);
589                }
590                WalRecord::SetNodeProperty { id, key, value } => {
591                    target_store.set_node_property(*id, key, value.clone());
592                }
593                WalRecord::SetEdgeProperty { id, key, value } => {
594                    target_store.set_edge_property(*id, key, value.clone());
595                }
596                WalRecord::AddNodeLabel { id, label } => {
597                    target_store.add_label(*id, label);
598                }
599                WalRecord::RemoveNodeLabel { id, label } => {
600                    target_store.remove_label(*id, label);
601                }
602                WalRecord::RemoveNodeProperty { id, key } => {
603                    target_store.remove_node_property(*id, key);
604                }
605                WalRecord::RemoveEdgeProperty { id, key } => {
606                    target_store.remove_edge_property(*id, key);
607                }
608
609                // --- Schema DDL replay (always on root catalog) ---
610                WalRecord::CreateNodeType {
611                    name,
612                    properties,
613                    constraints,
614                } => {
615                    let def = NodeTypeDefinition {
616                        name: name.clone(),
617                        properties: properties
618                            .iter()
619                            .map(|(n, t, nullable)| TypedProperty {
620                                name: n.clone(),
621                                data_type: PropertyDataType::from_type_name(t),
622                                nullable: *nullable,
623                                default_value: None,
624                            })
625                            .collect(),
626                        constraints: constraints
627                            .iter()
628                            .map(|(kind, props)| match kind.as_str() {
629                                "unique" => TypeConstraint::Unique(props.clone()),
630                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
631                                "not_null" if !props.is_empty() => {
632                                    TypeConstraint::NotNull(props[0].clone())
633                                }
634                                _ => TypeConstraint::Unique(props.clone()),
635                            })
636                            .collect(),
637                        parent_types: Vec::new(),
638                    };
639                    let _ = catalog.register_node_type(def);
640                }
641                WalRecord::DropNodeType { name } => {
642                    let _ = catalog.drop_node_type(name);
643                }
644                WalRecord::CreateEdgeType {
645                    name,
646                    properties,
647                    constraints,
648                } => {
649                    let def = EdgeTypeDefinition {
650                        name: name.clone(),
651                        properties: properties
652                            .iter()
653                            .map(|(n, t, nullable)| TypedProperty {
654                                name: n.clone(),
655                                data_type: PropertyDataType::from_type_name(t),
656                                nullable: *nullable,
657                                default_value: None,
658                            })
659                            .collect(),
660                        constraints: constraints
661                            .iter()
662                            .map(|(kind, props)| match kind.as_str() {
663                                "unique" => TypeConstraint::Unique(props.clone()),
664                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
665                                "not_null" if !props.is_empty() => {
666                                    TypeConstraint::NotNull(props[0].clone())
667                                }
668                                _ => TypeConstraint::Unique(props.clone()),
669                            })
670                            .collect(),
671                        source_node_types: Vec::new(),
672                        target_node_types: Vec::new(),
673                    };
674                    let _ = catalog.register_edge_type_def(def);
675                }
676                WalRecord::DropEdgeType { name } => {
677                    let _ = catalog.drop_edge_type_def(name);
678                }
679                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
680                    // Index recreation is handled by the store on startup
681                    // (indexes are rebuilt from data, not WAL)
682                }
683                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
684                    // Constraint definitions are part of type definitions
685                    // and replayed via CreateNodeType/CreateEdgeType
686                }
687                WalRecord::CreateGraphType {
688                    name,
689                    node_types,
690                    edge_types,
691                    open,
692                } => {
693                    use crate::catalog::GraphTypeDefinition;
694                    let def = GraphTypeDefinition {
695                        name: name.clone(),
696                        allowed_node_types: node_types.clone(),
697                        allowed_edge_types: edge_types.clone(),
698                        open: *open,
699                    };
700                    let _ = catalog.register_graph_type(def);
701                }
702                WalRecord::DropGraphType { name } => {
703                    let _ = catalog.drop_graph_type(name);
704                }
705                WalRecord::CreateSchema { name } => {
706                    let _ = catalog.register_schema_namespace(name.clone());
707                }
708                WalRecord::DropSchema { name } => {
709                    let _ = catalog.drop_schema_namespace(name);
710                }
711
712                WalRecord::AlterNodeType { name, alterations } => {
713                    for (action, prop_name, type_name, nullable) in alterations {
714                        match action.as_str() {
715                            "add" => {
716                                let prop = TypedProperty {
717                                    name: prop_name.clone(),
718                                    data_type: PropertyDataType::from_type_name(type_name),
719                                    nullable: *nullable,
720                                    default_value: None,
721                                };
722                                let _ = catalog.alter_node_type_add_property(name, prop);
723                            }
724                            "drop" => {
725                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
726                            }
727                            _ => {}
728                        }
729                    }
730                }
731                WalRecord::AlterEdgeType { name, alterations } => {
732                    for (action, prop_name, type_name, nullable) in alterations {
733                        match action.as_str() {
734                            "add" => {
735                                let prop = TypedProperty {
736                                    name: prop_name.clone(),
737                                    data_type: PropertyDataType::from_type_name(type_name),
738                                    nullable: *nullable,
739                                    default_value: None,
740                                };
741                                let _ = catalog.alter_edge_type_add_property(name, prop);
742                            }
743                            "drop" => {
744                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
745                            }
746                            _ => {}
747                        }
748                    }
749                }
750                WalRecord::AlterGraphType { name, alterations } => {
751                    for (action, type_name) in alterations {
752                        match action.as_str() {
753                            "add_node" => {
754                                let _ =
755                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
756                            }
757                            "drop_node" => {
758                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
759                            }
760                            "add_edge" => {
761                                let _ =
762                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
763                            }
764                            "drop_edge" => {
765                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
766                            }
767                            _ => {}
768                        }
769                    }
770                }
771
772                WalRecord::CreateProcedure {
773                    name,
774                    params,
775                    returns,
776                    body,
777                } => {
778                    use crate::catalog::ProcedureDefinition;
779                    let def = ProcedureDefinition {
780                        name: name.clone(),
781                        params: params.clone(),
782                        returns: returns.clone(),
783                        body: body.clone(),
784                    };
785                    let _ = catalog.register_procedure(def);
786                }
787                WalRecord::DropProcedure { name } => {
788                    let _ = catalog.drop_procedure(name);
789                }
790
791                // --- RDF triple replay ---
792                #[cfg(feature = "rdf")]
793                WalRecord::InsertRdfTriple { .. }
794                | WalRecord::DeleteRdfTriple { .. }
795                | WalRecord::ClearRdfGraph { .. }
796                | WalRecord::CreateRdfGraph { .. }
797                | WalRecord::DropRdfGraph { .. } => {
798                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
799                }
800                #[cfg(not(feature = "rdf"))]
801                WalRecord::InsertRdfTriple { .. }
802                | WalRecord::DeleteRdfTriple { .. }
803                | WalRecord::ClearRdfGraph { .. }
804                | WalRecord::CreateRdfGraph { .. }
805                | WalRecord::DropRdfGraph { .. } => {}
806
807                WalRecord::TransactionCommit { .. } => {
808                    // In temporal mode, advance the store epoch on each committed
809                    // transaction so that subsequent property/label operations
810                    // are recorded at the correct epoch in their VersionLogs.
811                    #[cfg(feature = "temporal")]
812                    {
813                        target_store.new_epoch();
814                    }
815                }
816                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
817                    // Transaction control records don't need replay action
818                    // (recovery already filtered to only committed transactions)
819                }
820            }
821        }
822        Ok(())
823    }
824
825    // =========================================================================
826    // Single-file format helpers
827    // =========================================================================
828
/// Decides whether `path` should be handled as a single-file database.
///
/// An explicit `configured` format always wins. With `StorageFormat::Auto`
/// the decision is made by probing the path:
/// - an existing regular file is single-file only if its first four bytes
///   can be read and equal the `.grafeo` `MAGIC` constant,
/// - an existing directory is treated as the legacy WAL-directory layout,
/// - any other path (typically one that does not exist yet) is single-file
///   when its extension is `grafeo`.
#[cfg(feature = "grafeo-file")]
fn should_use_single_file(
    path: &std::path::Path,
    configured: crate::config::StorageFormat,
) -> bool {
    use crate::config::StorageFormat;

    match configured {
        StorageFormat::SingleFile => true,
        StorageFormat::WalDirectory => false,
        // Existing file: sniff the header rather than trusting the name.
        StorageFormat::Auto if path.is_file() => {
            use std::io::Read;

            let mut header = [0u8; 4];
            let header_read = std::fs::File::open(path)
                .and_then(|mut file| file.read_exact(&mut header))
                .is_ok();
            header_read && header == grafeo_adapters::storage::file::MAGIC
        }
        // Existing directory: legacy format.
        StorageFormat::Auto if path.is_dir() => false,
        // New path: decide by extension.
        StorageFormat::Auto => path.extension().is_some_and(|ext| ext == "grafeo"),
    }
}
862
/// Restores the store and catalog from serialized snapshot bytes.
///
/// `data` is the snapshot payload (as read from a `.grafeo` file). The work
/// is delegated to `persistence::load_snapshot_into_store`, and its result
/// is returned unchanged.
#[cfg(feature = "grafeo-file")]
fn apply_snapshot_data(
    store: &Arc<LpgStore>,
    catalog: &Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
    data: &[u8],
) -> Result<()> {
    // The loader takes the RDF store only when the `rdf` feature is compiled in.
    #[cfg(feature = "rdf")]
    let outcome = persistence::load_snapshot_into_store(store, catalog, rdf_store, data);
    #[cfg(not(feature = "rdf"))]
    let outcome = persistence::load_snapshot_into_store(store, catalog, data);

    outcome
}
879
880    // =========================================================================
881    // Session & Configuration
882    // =========================================================================
883
884    /// Opens a new session for running queries.
885    ///
886    /// Sessions are cheap to create: spin up as many as you need. Each
887    /// gets its own transaction context, so concurrent sessions won't
888    /// block each other on reads.
889    ///
890    /// # Panics
891    ///
892    /// Panics if the database was configured with an external graph store and
893    /// the internal arena allocator cannot be initialized (out of memory).
894    ///
895    /// # Examples
896    ///
897    /// ```
898    /// use grafeo_engine::GrafeoDB;
899    ///
900    /// let db = GrafeoDB::new_in_memory();
901    /// let session = db.session();
902    ///
903    /// // Run queries through the session
904    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
905    /// # Ok::<(), grafeo_common::utils::error::Error>(())
906    /// ```
907    #[must_use]
908    pub fn session(&self) -> Session {
909        let session_cfg = || crate::session::SessionConfig {
910            transaction_manager: Arc::clone(&self.transaction_manager),
911            query_cache: Arc::clone(&self.query_cache),
912            catalog: Arc::clone(&self.catalog),
913            adaptive_config: self.config.adaptive.clone(),
914            factorized_execution: self.config.factorized_execution,
915            graph_model: self.config.graph_model,
916            query_timeout: self.config.query_timeout,
917            commit_counter: Arc::clone(&self.commit_counter),
918            gc_interval: self.config.gc_interval,
919            read_only: self.read_only,
920        };
921
922        if let Some(ref ext_store) = self.external_store {
923            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
924                .expect("arena allocation for external store session");
925        }
926
927        #[cfg(feature = "rdf")]
928        let mut session = Session::with_rdf_store_and_adaptive(
929            Arc::clone(&self.store),
930            Arc::clone(&self.rdf_store),
931            session_cfg(),
932        );
933        #[cfg(not(feature = "rdf"))]
934        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
935
936        #[cfg(feature = "wal")]
937        if let Some(ref wal) = self.wal {
938            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
939        }
940
941        #[cfg(feature = "cdc")]
942        session.set_cdc_log(Arc::clone(&self.cdc_log));
943
944        #[cfg(feature = "metrics")]
945        {
946            if let Some(ref m) = self.metrics {
947                session.set_metrics(Arc::clone(m));
948                m.session_created
949                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
950                m.session_active
951                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
952            }
953        }
954
955        // Propagate persistent graph context to the new session
956        if let Some(ref graph) = *self.current_graph.read() {
957            session.use_graph(graph);
958        }
959
960        // Propagate persistent schema context to the new session
961        if let Some(ref schema) = *self.current_schema.read() {
962            session.set_schema(schema);
963        }
964
965        // Suppress unused_mut when cdc/wal are disabled
966        let _ = &mut session;
967
968        session
969    }
970
971    /// Returns the current graph name, if any.
972    ///
973    /// This is the persistent graph context used by one-shot `execute()` calls.
974    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
975    /// or `SESSION RESET`.
976    #[must_use]
977    pub fn current_graph(&self) -> Option<String> {
978        self.current_graph.read().clone()
979    }
980
981    /// Sets the current graph context for subsequent one-shot `execute()` calls.
982    ///
983    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
984    /// Pass `None` to reset to the default graph.
985    pub fn set_current_graph(&self, name: Option<&str>) {
986        *self.current_graph.write() = name.map(ToString::to_string);
987    }
988
989    /// Returns the current schema name, if any.
990    ///
991    /// This is the persistent schema context used by one-shot `execute()` calls.
992    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
993    #[must_use]
994    pub fn current_schema(&self) -> Option<String> {
995        self.current_schema.read().clone()
996    }
997
998    /// Sets the current schema context for subsequent one-shot `execute()` calls.
999    ///
1000    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1001    /// a session. Pass `None` to clear the schema context.
1002    pub fn set_current_schema(&self, name: Option<&str>) {
1003        *self.current_schema.write() = name.map(ToString::to_string);
1004    }
1005
1006    /// Returns the adaptive execution configuration.
1007    #[must_use]
1008    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1009        &self.config.adaptive
1010    }
1011
1012    /// Returns `true` if this database was opened in read-only mode.
1013    #[must_use]
1014    pub fn is_read_only(&self) -> bool {
1015        self.read_only
1016    }
1017
1018    /// Returns the configuration.
1019    #[must_use]
1020    pub fn config(&self) -> &Config {
1021        &self.config
1022    }
1023
1024    /// Returns the graph data model of this database.
1025    #[must_use]
1026    pub fn graph_model(&self) -> crate::config::GraphModel {
1027        self.config.graph_model
1028    }
1029
1030    /// Returns the configured memory limit in bytes, if any.
1031    #[must_use]
1032    pub fn memory_limit(&self) -> Option<usize> {
1033        self.config.memory_limit
1034    }
1035
/// Returns a point-in-time snapshot of all metrics.
///
/// The snapshot comes from the metrics registry; if no registry is
/// initialized, a default snapshot is used instead. In both cases the
/// cache fields are filled in from the query cache's own statistics.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
    let mut report = match self.metrics.as_ref() {
        Some(registry) => registry.snapshot(),
        None => crate::metrics::MetricsSnapshot::default(),
    };

    // Cache statistics live in the query cache, not in the registry.
    let cache = self.query_cache.stats();
    report.cache_hits = cache.parsed_hits + cache.optimized_hits;
    report.cache_misses = cache.parsed_misses + cache.optimized_misses;
    report.cache_size = cache.parsed_size + cache.optimized_size;
    report.cache_invalidations = cache.invalidations;

    report
}
1057
/// Renders all metrics in Prometheus text exposition format.
///
/// The output is ready to serve from an HTTP `/metrics` endpoint. Returns an
/// empty string when no metrics registry is initialized.
#[cfg(feature = "metrics")]
#[must_use]
pub fn metrics_prometheus(&self) -> String {
    match &self.metrics {
        Some(registry) => registry.to_prometheus(),
        None => String::new(),
    }
}
1068
/// Resets the metrics registry (if one is initialized) and the query cache statistics.
#[cfg(feature = "metrics")]
pub fn reset_metrics(&self) {
    if let Some(registry) = self.metrics.as_ref() {
        registry.reset();
    }
    // Cache statistics are tracked by the cache itself, so reset them separately.
    self.query_cache.reset_stats();
}
1077
1078    /// Returns the underlying (default) store.
1079    ///
1080    /// This provides direct access to the LPG store for algorithm implementations
1081    /// and admin operations (index management, schema introspection, MVCC internals).
1082    ///
1083    /// For code that only needs read/write graph operations, prefer
1084    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1085    #[must_use]
1086    pub fn store(&self) -> &Arc<LpgStore> {
1087        &self.store
1088    }
1089
1090    /// Returns the LPG store for the currently active graph.
1091    ///
1092    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1093    /// the default store. Otherwise looks up the named graph in the root store.
1094    /// Falls back to the default store if the named graph does not exist.
1095    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1096    fn active_store(&self) -> Arc<LpgStore> {
1097        let graph_name = self.current_graph.read().clone();
1098        match graph_name {
1099            None => Arc::clone(&self.store),
1100            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1101            Some(ref name) => self
1102                .store
1103                .graph(name)
1104                .unwrap_or_else(|| Arc::clone(&self.store)),
1105        }
1106    }
1107
1108    // === Named Graph Management ===
1109
1110    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if arena allocation fails.
1115    pub fn create_graph(&self, name: &str) -> Result<bool> {
1116        Ok(self.store.create_graph(name)?)
1117    }
1118
1119    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1120    pub fn drop_graph(&self, name: &str) -> bool {
1121        self.store.drop_graph(name)
1122    }
1123
1124    /// Returns all named graph names.
1125    #[must_use]
1126    pub fn list_graphs(&self) -> Vec<String> {
1127        self.store.graph_names()
1128    }
1129
1130    /// Returns the graph store as a trait object.
1131    ///
1132    /// This provides the [`GraphStoreMut`] interface for code that should work
1133    /// with any storage backend. Use this when you only need graph read/write
1134    /// operations and don't need admin methods like index management.
1135    ///
1136    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1137    #[must_use]
1138    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1139        if let Some(ref ext_store) = self.external_store {
1140            Arc::clone(ext_store)
1141        } else {
1142            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1143        }
1144    }
1145
1146    /// Garbage collects old MVCC versions that are no longer visible.
1147    ///
1148    /// Determines the minimum epoch required by active transactions and prunes
1149    /// version chains older than that threshold. Also cleans up completed
1150    /// transaction metadata in the transaction manager.
1151    pub fn gc(&self) {
1152        let min_epoch = self.transaction_manager.min_active_epoch();
1153        self.store.gc_versions(min_epoch);
1154        self.transaction_manager.gc();
1155    }
1156
1157    /// Returns the buffer manager for memory-aware operations.
1158    #[must_use]
1159    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1160        &self.buffer_manager
1161    }
1162
1163    /// Returns the query cache.
1164    #[must_use]
1165    pub fn query_cache(&self) -> &Arc<QueryCache> {
1166        &self.query_cache
1167    }
1168
1169    /// Clears all cached query plans.
1170    ///
1171    /// This is called automatically after DDL operations, but can also be
1172    /// invoked manually after external schema changes (e.g., WAL replay,
1173    /// import) or when you want to force re-optimization of all queries.
1174    pub fn clear_plan_cache(&self) {
1175        self.query_cache.clear();
1176    }
1177
1178    // =========================================================================
1179    // Lifecycle
1180    // =========================================================================
1181
1182    /// Closes the database, flushing all pending writes.
1183    ///
1184    /// For persistent databases, this ensures everything is safely on disk.
1185    /// Called automatically when the database is dropped, but you can call
1186    /// it explicitly if you need to guarantee durability at a specific point.
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
1191    pub fn close(&self) -> Result<()> {
1192        let mut is_open = self.is_open.write();
1193        if !*is_open {
1194            return Ok(());
1195        }
1196
1197        // Read-only databases: just release the shared lock, no checkpointing
1198        if self.read_only {
1199            #[cfg(feature = "grafeo-file")]
1200            if let Some(ref fm) = self.file_manager {
1201                fm.close()?;
1202            }
1203            *is_open = false;
1204            return Ok(());
1205        }
1206
1207        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
1208        // We must do this BEFORE the WAL close path because checkpoint_to_file
1209        // removes the sidecar WAL directory.
1210        #[cfg(feature = "grafeo-file")]
1211        let is_single_file = self.file_manager.is_some();
1212        #[cfg(not(feature = "grafeo-file"))]
1213        let is_single_file = false;
1214
1215        #[cfg(feature = "grafeo-file")]
1216        if let Some(ref fm) = self.file_manager {
1217            // Flush WAL first so all records are on disk before we snapshot
1218            #[cfg(feature = "wal")]
1219            if let Some(ref wal) = self.wal {
1220                wal.sync()?;
1221            }
1222            self.checkpoint_to_file(fm)?;
1223
1224            // Release WAL file handles before removing sidecar directory.
1225            // On Windows, open handles prevent directory deletion.
1226            #[cfg(feature = "wal")]
1227            if let Some(ref wal) = self.wal {
1228                wal.close_active_log();
1229            }
1230
1231            fm.remove_sidecar_wal()?;
1232            fm.close()?;
1233        }
1234
1235        // Commit and sync WAL (legacy directory format only).
1236        // We intentionally do NOT call wal.checkpoint() here. Directory format
1237        // has no snapshot: the WAL files are the sole source of truth. Writing
1238        // checkpoint.meta would cause recovery to skip older WAL files, losing
1239        // all data that predates the current log sequence.
1240        #[cfg(feature = "wal")]
1241        if !is_single_file && let Some(ref wal) = self.wal {
1242            // Use the last assigned transaction ID, or create one for the commit record
1243            let commit_tx = self
1244                .transaction_manager
1245                .last_assigned_transaction_id()
1246                .unwrap_or_else(|| self.transaction_manager.begin());
1247
1248            // Log a TransactionCommit to mark all pending records as committed
1249            wal.log(&WalRecord::TransactionCommit {
1250                transaction_id: commit_tx,
1251            })?;
1252
1253            wal.sync()?;
1254        }
1255
1256        *is_open = false;
1257        Ok(())
1258    }
1259
/// Borrows the typed WAL, or returns `None` when the database has no WAL.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    let Self { wal, .. } = self;
    wal.as_ref()
}
1266
/// Appends `record` to the WAL; does nothing when the database has no WAL.
///
/// # Errors
///
/// Propagates any error returned by the WAL while logging the record.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    let Some(wal) = self.wal.as_ref() else {
        return Ok(());
    };
    wal.log(record)?;
    Ok(())
}
1275
/// Exports a snapshot of the database and writes it to the `.grafeo` file.
///
/// Alongside the snapshot bytes, the file manager receives the store's
/// current epoch, the last assigned transaction ID (`0` if none has been
/// assigned), and the default store's node and edge counts.
///
/// Does NOT remove the sidecar WAL: callers that want to clean up
/// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
/// separately after this returns.
///
/// # Errors
///
/// Returns an error if exporting the snapshot or writing it to the file fails.
#[cfg(feature = "grafeo-file")]
fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
    use grafeo_core::testing::crash::maybe_crash;

    maybe_crash("checkpoint_to_file:before_export");
    let payload = self.export_snapshot()?;
    maybe_crash("checkpoint_to_file:after_export");

    // Metadata stored next to the snapshot, sampled after the export
    let epoch = self.store.current_epoch().0;
    let last_transaction = match self.transaction_manager.last_assigned_transaction_id() {
        Some(id) => id.0,
        None => 0,
    };
    let nodes = self.store.node_count() as u64;
    let edges = self.store.edge_count() as u64;

    fm.write_snapshot(&payload, epoch, last_transaction, nodes, edges)?;

    maybe_crash("checkpoint_to_file:after_write_snapshot");
    Ok(())
}
1308
/// Borrows the file manager, which is present only for single-file databases.
#[cfg(feature = "grafeo-file")]
#[must_use]
pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
    let Self { file_manager, .. } = self;
    file_manager.as_ref()
}
1315}
1316
1317impl Drop for GrafeoDB {
1318    fn drop(&mut self) {
1319        if let Err(e) = self.close() {
1320            grafeo_error!("Error closing database: {}", e);
1321        }
1322    }
1323}
1324
1325impl crate::admin::AdminService for GrafeoDB {
1326    fn info(&self) -> crate::admin::DatabaseInfo {
1327        self.info()
1328    }
1329
1330    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1331        self.detailed_stats()
1332    }
1333
1334    fn schema(&self) -> crate::admin::SchemaInfo {
1335        self.schema()
1336    }
1337
1338    fn validate(&self) -> crate::admin::ValidationResult {
1339        self.validate()
1340    }
1341
1342    fn wal_status(&self) -> crate::admin::WalStatus {
1343        self.wal_status()
1344    }
1345
1346    fn wal_checkpoint(&self) -> Result<()> {
1347        self.wal_checkpoint()
1348    }
1349}
1350
1351// =========================================================================
1352// Query Result Types
1353// =========================================================================
1354
1355/// The result of running a query.
1356///
1357/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1358/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1359///
1360/// # Examples
1361///
1362/// ```
1363/// use grafeo_engine::GrafeoDB;
1364///
1365/// let db = GrafeoDB::new_in_memory();
1366/// db.create_node(&["Person"]);
1367///
1368/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1369///
1370/// // Check what we got
1371/// println!("Columns: {:?}", result.columns);
1372/// println!("Rows: {}", result.row_count());
1373///
1374/// // Iterate through results
1375/// for row in result.iter() {
1376///     println!("{:?}", row);
1377/// }
1378/// # Ok::<(), grafeo_common::utils::error::Error>(())
1379/// ```
1380#[derive(Debug)]
1381pub struct QueryResult {
1382    /// Column names from the RETURN clause.
1383    pub columns: Vec<String>,
1384    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1385    pub column_types: Vec<grafeo_common::types::LogicalType>,
1386    /// The actual result rows.
1387    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1388    /// Query execution time in milliseconds (if timing was enabled).
1389    pub execution_time_ms: Option<f64>,
1390    /// Number of rows scanned during query execution (estimate).
1391    pub rows_scanned: Option<u64>,
1392    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
1393    pub status_message: Option<String>,
1394    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
1395    pub gql_status: grafeo_common::utils::GqlStatus,
1396}
1397
1398impl QueryResult {
1399    /// Creates a fully empty query result (no columns, no rows).
1400    #[must_use]
1401    pub fn empty() -> Self {
1402        Self {
1403            columns: Vec::new(),
1404            column_types: Vec::new(),
1405            rows: Vec::new(),
1406            execution_time_ms: None,
1407            rows_scanned: None,
1408            status_message: None,
1409            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1410        }
1411    }
1412
1413    /// Creates a query result with only a status message (for DDL commands).
1414    #[must_use]
1415    pub fn status(msg: impl Into<String>) -> Self {
1416        Self {
1417            columns: Vec::new(),
1418            column_types: Vec::new(),
1419            rows: Vec::new(),
1420            execution_time_ms: None,
1421            rows_scanned: None,
1422            status_message: Some(msg.into()),
1423            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1424        }
1425    }
1426
1427    /// Creates a new empty query result.
1428    #[must_use]
1429    pub fn new(columns: Vec<String>) -> Self {
1430        let len = columns.len();
1431        Self {
1432            columns,
1433            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1434            rows: Vec::new(),
1435            execution_time_ms: None,
1436            rows_scanned: None,
1437            status_message: None,
1438            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1439        }
1440    }
1441
1442    /// Creates a new empty query result with column types.
1443    #[must_use]
1444    pub fn with_types(
1445        columns: Vec<String>,
1446        column_types: Vec<grafeo_common::types::LogicalType>,
1447    ) -> Self {
1448        Self {
1449            columns,
1450            column_types,
1451            rows: Vec::new(),
1452            execution_time_ms: None,
1453            rows_scanned: None,
1454            status_message: None,
1455            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1456        }
1457    }
1458
1459    /// Sets the execution metrics on this result.
1460    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1461        self.execution_time_ms = Some(execution_time_ms);
1462        self.rows_scanned = Some(rows_scanned);
1463        self
1464    }
1465
1466    /// Returns the execution time in milliseconds, if available.
1467    #[must_use]
1468    pub fn execution_time_ms(&self) -> Option<f64> {
1469        self.execution_time_ms
1470    }
1471
1472    /// Returns the number of rows scanned, if available.
1473    #[must_use]
1474    pub fn rows_scanned(&self) -> Option<u64> {
1475        self.rows_scanned
1476    }
1477
1478    /// Returns the number of rows.
1479    #[must_use]
1480    pub fn row_count(&self) -> usize {
1481        self.rows.len()
1482    }
1483
1484    /// Returns the number of columns.
1485    #[must_use]
1486    pub fn column_count(&self) -> usize {
1487        self.columns.len()
1488    }
1489
1490    /// Returns true if the result is empty.
1491    #[must_use]
1492    pub fn is_empty(&self) -> bool {
1493        self.rows.is_empty()
1494    }
1495
1496    /// Extracts a single value from the result.
1497    ///
1498    /// Use this when your query returns exactly one row with one column,
1499    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1500    ///
1501    /// # Errors
1502    ///
1503    /// Returns an error if the result has multiple rows or columns.
1504    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1505        if self.rows.len() != 1 || self.columns.len() != 1 {
1506            return Err(grafeo_common::utils::error::Error::InvalidValue(
1507                "Expected single value".to_string(),
1508            ));
1509        }
1510        T::from_value(&self.rows[0][0])
1511    }
1512
1513    /// Returns an iterator over the rows.
1514    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1515        self.rows.iter()
1516    }
1517}
1518
1519impl std::fmt::Display for QueryResult {
1520    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1521        let table = grafeo_common::fmt::format_result_table(
1522            &self.columns,
1523            &self.rows,
1524            self.execution_time_ms,
1525            self.status_message.as_deref(),
1526        );
1527        f.write_str(&table)
1528    }
1529}
1530
1531/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1532///
1533/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1534/// Used by [`QueryResult::scalar()`] to extract typed values.
1535pub trait FromValue: Sized {
1536    /// Attempts the conversion, returning an error on type mismatch.
1537    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1538}
1539
1540impl FromValue for i64 {
1541    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1542        value
1543            .as_int64()
1544            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1545                expected: "INT64".to_string(),
1546                found: value.type_name().to_string(),
1547            })
1548    }
1549}
1550
1551impl FromValue for f64 {
1552    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1553        value
1554            .as_float64()
1555            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1556                expected: "FLOAT64".to_string(),
1557                found: value.type_name().to_string(),
1558            })
1559    }
1560}
1561
1562impl FromValue for String {
1563    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1564        value.as_str().map(String::from).ok_or_else(|| {
1565            grafeo_common::utils::error::Error::TypeMismatch {
1566                expected: "STRING".to_string(),
1567                found: value.type_name().to_string(),
1568            }
1569        })
1570    }
1571}
1572
1573impl FromValue for bool {
1574    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1575        value
1576            .as_bool()
1577            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1578                expected: "BOOL".to_string(),
1579                found: value.type_name().to_string(),
1580            })
1581    }
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586    use super::*;
1587
1588    #[test]
1589    fn test_create_in_memory_database() {
1590        let db = GrafeoDB::new_in_memory();
1591        assert_eq!(db.node_count(), 0);
1592        assert_eq!(db.edge_count(), 0);
1593    }
1594
1595    #[test]
1596    fn test_database_config() {
1597        let config = Config::in_memory().with_threads(4).with_query_logging();
1598
1599        let db = GrafeoDB::with_config(config).unwrap();
1600        assert_eq!(db.config().threads, 4);
1601        assert!(db.config().query_logging);
1602    }
1603
1604    #[test]
1605    fn test_database_session() {
1606        let db = GrafeoDB::new_in_memory();
1607        let _session = db.session();
1608        // Session should be created successfully
1609    }
1610
1611    #[cfg(feature = "wal")]
1612    #[test]
1613    fn test_persistent_database_recovery() {
1614        use grafeo_common::types::Value;
1615        use tempfile::tempdir;
1616
1617        let dir = tempdir().unwrap();
1618        let db_path = dir.path().join("test_db");
1619
1620        // Create database and add some data
1621        {
1622            let db = GrafeoDB::open(&db_path).unwrap();
1623
1624            let alix = db.create_node(&["Person"]);
1625            db.set_node_property(alix, "name", Value::from("Alix"));
1626
1627            let gus = db.create_node(&["Person"]);
1628            db.set_node_property(gus, "name", Value::from("Gus"));
1629
1630            let _edge = db.create_edge(alix, gus, "KNOWS");
1631
1632            // Explicitly close to flush WAL
1633            db.close().unwrap();
1634        }
1635
1636        // Reopen and verify data was recovered
1637        {
1638            let db = GrafeoDB::open(&db_path).unwrap();
1639
1640            assert_eq!(db.node_count(), 2);
1641            assert_eq!(db.edge_count(), 1);
1642
1643            // Verify nodes exist
1644            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1645            assert!(node0.is_some());
1646
1647            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1648            assert!(node1.is_some());
1649        }
1650    }
1651
1652    #[cfg(feature = "wal")]
1653    #[test]
1654    fn test_wal_logging() {
1655        use tempfile::tempdir;
1656
1657        let dir = tempdir().unwrap();
1658        let db_path = dir.path().join("wal_test_db");
1659
1660        let db = GrafeoDB::open(&db_path).unwrap();
1661
1662        // Create some data
1663        let node = db.create_node(&["Test"]);
1664        db.delete_node(node);
1665
1666        // WAL should have records
1667        if let Some(wal) = db.wal() {
1668            assert!(wal.record_count() > 0);
1669        }
1670
1671        db.close().unwrap();
1672    }
1673
1674    #[cfg(feature = "wal")]
1675    #[test]
1676    fn test_wal_recovery_multiple_sessions() {
1677        // Tests that WAL recovery works correctly across multiple open/close cycles
1678        use grafeo_common::types::Value;
1679        use tempfile::tempdir;
1680
1681        let dir = tempdir().unwrap();
1682        let db_path = dir.path().join("multi_session_db");
1683
1684        // Session 1: Create initial data
1685        {
1686            let db = GrafeoDB::open(&db_path).unwrap();
1687            let alix = db.create_node(&["Person"]);
1688            db.set_node_property(alix, "name", Value::from("Alix"));
1689            db.close().unwrap();
1690        }
1691
1692        // Session 2: Add more data
1693        {
1694            let db = GrafeoDB::open(&db_path).unwrap();
1695            assert_eq!(db.node_count(), 1); // Previous data recovered
1696            let gus = db.create_node(&["Person"]);
1697            db.set_node_property(gus, "name", Value::from("Gus"));
1698            db.close().unwrap();
1699        }
1700
1701        // Session 3: Verify all data
1702        {
1703            let db = GrafeoDB::open(&db_path).unwrap();
1704            assert_eq!(db.node_count(), 2);
1705
1706            // Verify properties were recovered correctly
1707            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1708            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1709
1710            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1711            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1712        }
1713    }
1714
1715    #[cfg(feature = "wal")]
1716    #[test]
1717    fn test_database_consistency_after_mutations() {
1718        // Tests that database remains consistent after a series of create/delete operations
1719        use grafeo_common::types::Value;
1720        use tempfile::tempdir;
1721
1722        let dir = tempdir().unwrap();
1723        let db_path = dir.path().join("consistency_db");
1724
1725        {
1726            let db = GrafeoDB::open(&db_path).unwrap();
1727
1728            // Create nodes
1729            let a = db.create_node(&["Node"]);
1730            let b = db.create_node(&["Node"]);
1731            let c = db.create_node(&["Node"]);
1732
1733            // Create edges
1734            let e1 = db.create_edge(a, b, "LINKS");
1735            let _e2 = db.create_edge(b, c, "LINKS");
1736
1737            // Delete middle node and its edge
1738            db.delete_edge(e1);
1739            db.delete_node(b);
1740
1741            // Set properties on remaining nodes
1742            db.set_node_property(a, "value", Value::Int64(1));
1743            db.set_node_property(c, "value", Value::Int64(3));
1744
1745            db.close().unwrap();
1746        }
1747
1748        // Reopen and verify consistency
1749        {
1750            let db = GrafeoDB::open(&db_path).unwrap();
1751
1752            // Should have 2 nodes (a and c), b was deleted
1753            // Note: node_count includes deleted nodes in some implementations
1754            // What matters is that the non-deleted nodes are accessible
1755            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1756            assert!(node_a.is_some());
1757
1758            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1759            assert!(node_c.is_some());
1760
1761            // Middle node should be deleted
1762            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1763            assert!(node_b.is_none());
1764        }
1765    }
1766
1767    #[cfg(feature = "wal")]
1768    #[test]
1769    fn test_close_is_idempotent() {
1770        // Calling close() multiple times should not cause errors
1771        use tempfile::tempdir;
1772
1773        let dir = tempdir().unwrap();
1774        let db_path = dir.path().join("close_test_db");
1775
1776        let db = GrafeoDB::open(&db_path).unwrap();
1777        db.create_node(&["Test"]);
1778
1779        // First close should succeed
1780        assert!(db.close().is_ok());
1781
1782        // Second close should also succeed (idempotent)
1783        assert!(db.close().is_ok());
1784    }
1785
1786    #[test]
1787    fn test_with_store_external_backend() {
1788        use grafeo_core::graph::lpg::LpgStore;
1789
1790        let external = Arc::new(LpgStore::new().unwrap());
1791
1792        // Seed data on the external store directly
1793        let n1 = external.create_node(&["Person"]);
1794        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));
1795
1796        let db = GrafeoDB::with_store(
1797            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
1798            Config::in_memory(),
1799        )
1800        .unwrap();
1801
1802        let session = db.session();
1803
1804        // Session should see data from the external store via execute
1805        #[cfg(feature = "gql")]
1806        {
1807            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
1808            assert_eq!(result.rows.len(), 1);
1809        }
1810    }
1811
1812    #[test]
1813    fn test_with_config_custom_memory_limit() {
1814        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB
1815
1816        let db = GrafeoDB::with_config(config).unwrap();
1817        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
1818        assert_eq!(db.node_count(), 0);
1819    }
1820
1821    #[cfg(feature = "metrics")]
1822    #[test]
1823    fn test_database_metrics_registry() {
1824        let db = GrafeoDB::new_in_memory();
1825
1826        // Perform some operations
1827        db.create_node(&["Person"]);
1828        db.create_node(&["Person"]);
1829
1830        // Check that metrics snapshot returns data
1831        let snap = db.metrics();
1832        // Session created counter should reflect at least 0 (metrics is initialized)
1833        assert_eq!(snap.query_count, 0); // No queries executed yet
1834    }
1835
1836    #[test]
1837    fn test_query_result_has_metrics() {
1838        // Verifies that query results include execution metrics
1839        let db = GrafeoDB::new_in_memory();
1840        db.create_node(&["Person"]);
1841        db.create_node(&["Person"]);
1842
1843        #[cfg(feature = "gql")]
1844        {
1845            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1846
1847            // Metrics should be populated
1848            assert!(result.execution_time_ms.is_some());
1849            assert!(result.rows_scanned.is_some());
1850            assert!(result.execution_time_ms.unwrap() >= 0.0);
1851            assert_eq!(result.rows_scanned.unwrap(), 2);
1852        }
1853    }
1854
1855    #[test]
1856    fn test_empty_query_result_metrics() {
1857        // Verifies metrics are correct for queries returning no results
1858        let db = GrafeoDB::new_in_memory();
1859        db.create_node(&["Person"]);
1860
1861        #[cfg(feature = "gql")]
1862        {
1863            // Query that matches nothing
1864            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1865
1866            assert!(result.execution_time_ms.is_some());
1867            assert!(result.rows_scanned.is_some());
1868            assert_eq!(result.rows_scanned.unwrap(), 0);
1869        }
1870    }
1871
1872    #[cfg(feature = "cdc")]
1873    mod cdc_integration {
1874        use super::*;
1875
1876        #[test]
1877        fn test_node_lifecycle_history() {
1878            let db = GrafeoDB::new_in_memory();
1879
1880            // Create
1881            let id = db.create_node(&["Person"]);
1882            // Update
1883            db.set_node_property(id, "name", "Alix".into());
1884            db.set_node_property(id, "name", "Gus".into());
1885            // Delete
1886            db.delete_node(id);
1887
1888            let history = db.history(id).unwrap();
1889            assert_eq!(history.len(), 4); // create + 2 updates + delete
1890            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1891            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1892            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1893            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1894            assert!(history[2].before.is_some()); // second update has prior "Alix"
1895            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1896        }
1897
1898        #[test]
1899        fn test_edge_lifecycle_history() {
1900            let db = GrafeoDB::new_in_memory();
1901
1902            let alix = db.create_node(&["Person"]);
1903            let gus = db.create_node(&["Person"]);
1904            let edge = db.create_edge(alix, gus, "KNOWS");
1905            db.set_edge_property(edge, "since", 2024i64.into());
1906            db.delete_edge(edge);
1907
1908            let history = db.history(edge).unwrap();
1909            assert_eq!(history.len(), 3); // create + update + delete
1910            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1911            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1912            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1913        }
1914
1915        #[test]
1916        fn test_create_node_with_props_cdc() {
1917            let db = GrafeoDB::new_in_memory();
1918
1919            let id = db.create_node_with_props(
1920                &["Person"],
1921                vec![
1922                    ("name", grafeo_common::types::Value::from("Alix")),
1923                    ("age", grafeo_common::types::Value::from(30i64)),
1924                ],
1925            );
1926
1927            let history = db.history(id).unwrap();
1928            assert_eq!(history.len(), 1);
1929            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1930            // Props should be captured
1931            let after = history[0].after.as_ref().unwrap();
1932            assert_eq!(after.len(), 2);
1933        }
1934
1935        #[test]
1936        fn test_changes_between() {
1937            let db = GrafeoDB::new_in_memory();
1938
1939            let id1 = db.create_node(&["A"]);
1940            let _id2 = db.create_node(&["B"]);
1941            db.set_node_property(id1, "x", 1i64.into());
1942
1943            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1944            let changes = db
1945                .changes_between(
1946                    grafeo_common::types::EpochId(0),
1947                    grafeo_common::types::EpochId(u64::MAX),
1948                )
1949                .unwrap();
1950            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1951        }
1952    }
1953
1954    #[test]
1955    fn test_with_store_basic() {
1956        use grafeo_core::graph::lpg::LpgStore;
1957
1958        let store = Arc::new(LpgStore::new().unwrap());
1959        let n1 = store.create_node(&["Person"]);
1960        store.set_node_property(n1, "name", "Alix".into());
1961
1962        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1963        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1964
1965        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1966        assert_eq!(result.rows.len(), 1);
1967    }
1968
1969    #[test]
1970    fn test_with_store_session() {
1971        use grafeo_core::graph::lpg::LpgStore;
1972
1973        let store = Arc::new(LpgStore::new().unwrap());
1974        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1975        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1976
1977        let session = db.session();
1978        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1979        assert_eq!(result.rows.len(), 1);
1980    }
1981
1982    #[test]
1983    fn test_with_store_mutations() {
1984        use grafeo_core::graph::lpg::LpgStore;
1985
1986        let store = Arc::new(LpgStore::new().unwrap());
1987        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1988        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1989
1990        let mut session = db.session();
1991
1992        // Use an explicit transaction so INSERT and MATCH share the same
1993        // transaction context. With PENDING epochs, uncommitted versions are
1994        // only visible to the owning transaction.
1995        session.begin_transaction().unwrap();
1996        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1997
1998        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1999        assert_eq!(result.rows.len(), 1);
2000
2001        session.commit().unwrap();
2002    }
2003}