Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21#[cfg(feature = "rdf")]
22mod rdf_ops;
23mod search;
24#[cfg(feature = "wal")]
25pub(crate) mod wal_store;
26
27use grafeo_common::grafeo_error;
28#[cfg(feature = "wal")]
29use std::path::Path;
30use std::sync::Arc;
31use std::sync::atomic::AtomicUsize;
32
33use parking_lot::RwLock;
34
35#[cfg(feature = "grafeo-file")]
36use grafeo_adapters::storage::file::GrafeoFileManager;
37#[cfg(feature = "wal")]
38use grafeo_adapters::storage::wal::{
39    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
40};
41use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
42use grafeo_common::utils::error::Result;
43use grafeo_core::graph::GraphStoreMut;
44use grafeo_core::graph::lpg::LpgStore;
45#[cfg(feature = "rdf")]
46use grafeo_core::graph::rdf::RdfStore;
47
48use crate::catalog::Catalog;
49use crate::config::Config;
50use crate::query::cache::QueryCache;
51use crate::session::Session;
52use crate::transaction::TransactionManager;
53
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// For explicit control over memory limits and persistence, build the
/// database from a [`Config`] with [`with_config()`](Self::with_config); to
/// run queries against your own storage backend, use
/// [`with_store()`](Self::with_store).
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Configuration this database was constructed with.
    pub(super) config: Config,
    /// The built-in LPG graph store. It is always allocated; a database built
    /// with `with_store()` holds an empty placeholder here and routes queries
    /// through `external_store` instead.
    pub(super) store: Arc<LpgStore>,
    /// Schema and metadata catalog shared across sessions. `with_config`
    /// restores it from the snapshot / WAL during startup.
    pub(super) catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager. With the `temporal` feature, `with_config` syncs
    /// its epoch with `store` after recovery.
    pub(super) transaction_manager: Arc<TransactionManager>,
    /// Unified buffer manager. Its budget is `Config::memory_limit`, or 75% of
    /// detected system memory when no limit is configured.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager. `None` when the WAL is disabled, the database
    /// is read-only, no path is configured, or an external store is in use.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Shared WAL graph context tracker. Tracks which named graph was last
    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
    /// records only when the context actually changes.
    #[cfg(feature = "wal")]
    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
    /// Query cache for parsed and optimized plans.
    pub(super) query_cache: Arc<QueryCache>,
    /// Shared commit counter for auto-GC across sessions.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// Whether the database is open. Set to `true` at construction.
    pub(super) is_open: RwLock<bool>,
    /// Change data capture log for tracking mutations.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models for text-to-vector conversion, keyed by a
    /// `String` (presumably the model name — confirm in the `embed` module).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Single-file database manager (when using `.grafeo` format). `None` for
    /// in-memory, directory-format, and external-store databases.
    #[cfg(feature = "grafeo-file")]
    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
    /// External graph store (when using with_store()).
    /// When set, sessions route queries through this store instead of the built-in LpgStore.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
    /// Metrics registry shared across all sessions.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Persistent graph context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this graph.
    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
    current_graph: RwLock<Option<String>>,
    /// Persistent schema context for one-shot `execute()` calls.
    /// When set, each call to `session()` pre-configures the session to this schema.
    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
    current_schema: RwLock<Option<String>>,
    /// Whether this database is open in read-only mode (`with_config` derives
    /// it from `Config::access_mode`).
    /// When true, sessions automatically enforce read-only transactions.
    read_only: bool,
}
132
133impl GrafeoDB {
134    /// Creates an in-memory database, fast to create, gone when dropped.
135    ///
136    /// Use this for tests, experiments, or when you don't need persistence.
137    /// For data that survives restarts, use [`open()`](Self::open) instead.
138    ///
139    /// # Panics
140    ///
141    /// Panics if the internal arena allocator cannot be initialized (out of memory).
142    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use grafeo_engine::GrafeoDB;
148    ///
149    /// let db = GrafeoDB::new_in_memory();
150    /// let session = db.session();
151    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
152    /// # Ok::<(), grafeo_common::utils::error::Error>(())
153    /// ```
154    #[must_use]
155    pub fn new_in_memory() -> Self {
156        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
157    }
158
159    /// Opens a database at the given path, creating it if it doesn't exist.
160    ///
161    /// If you've used this path before, Grafeo recovers your data from the
162    /// write-ahead log automatically. First open on a new path creates an
163    /// empty database.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the path isn't writable or recovery fails.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// use grafeo_engine::GrafeoDB;
173    ///
174    /// let db = GrafeoDB::open("./my_social_network")?;
175    /// # Ok::<(), grafeo_common::utils::error::Error>(())
176    /// ```
177    #[cfg(feature = "wal")]
178    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
179        Self::with_config(Config::persistent(path.as_ref()))
180    }
181
182    /// Opens an existing database in read-only mode.
183    ///
184    /// Uses a shared file lock, so multiple processes can read the same
185    /// `.grafeo` file concurrently. The database loads the last checkpoint
186    /// snapshot but does **not** replay the WAL or allow mutations.
187    ///
188    /// Currently only supports the single-file (`.grafeo`) format.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the file doesn't exist or can't be read.
193    ///
194    /// # Examples
195    ///
196    /// ```no_run
197    /// use grafeo_engine::GrafeoDB;
198    ///
199    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
200    /// let session = db.session();
201    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
202    /// // Mutations will return an error:
203    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
204    /// # Ok::<(), grafeo_common::utils::error::Error>(())
205    /// ```
206    #[cfg(feature = "grafeo-file")]
207    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
208        Self::with_config(Config::read_only(path.as_ref()))
209    }
210
211    /// Creates a database with custom configuration.
212    ///
213    /// Use this when you need fine-grained control over memory limits,
214    /// thread counts, or persistence settings. For most cases,
215    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
216    /// are simpler.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the database can't be created or recovery fails.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use grafeo_engine::{GrafeoDB, Config};
226    ///
227    /// // In-memory with a 512MB limit
228    /// let config = Config::in_memory()
229    ///     .with_memory_limit(512 * 1024 * 1024);
230    ///
231    /// let db = GrafeoDB::with_config(config)?;
232    /// # Ok::<(), grafeo_common::utils::error::Error>(())
233    /// ```
234    pub fn with_config(config: Config) -> Result<Self> {
235        // Validate configuration before proceeding
236        config
237            .validate()
238            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
239
240        let store = Arc::new(LpgStore::new()?);
241        #[cfg(feature = "rdf")]
242        let rdf_store = Arc::new(RdfStore::new());
243        let transaction_manager = Arc::new(TransactionManager::new());
244
245        // Create buffer manager with configured limits
246        let buffer_config = BufferManagerConfig {
247            budget: config.memory_limit.unwrap_or_else(|| {
248                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
249            }),
250            spill_path: config
251                .spill_path
252                .clone()
253                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
254            ..BufferManagerConfig::default()
255        };
256        let buffer_manager = BufferManager::new(buffer_config);
257
258        // Create catalog early so WAL replay can restore schema definitions
259        let catalog = Arc::new(Catalog::new());
260
261        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
262
263        // --- Single-file format (.grafeo) ---
264        #[cfg(feature = "grafeo-file")]
265        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
266            // Read-only mode: open with shared lock, load snapshot, skip WAL
267            if let Some(ref db_path) = config.path {
268                if db_path.exists() && db_path.is_file() {
269                    let fm = GrafeoFileManager::open_read_only(db_path)?;
270                    let snapshot_data = fm.read_snapshot()?;
271                    if !snapshot_data.is_empty() {
272                        Self::apply_snapshot_data(
273                            &store,
274                            &catalog,
275                            #[cfg(feature = "rdf")]
276                            &rdf_store,
277                            &snapshot_data,
278                        )?;
279                    }
280                    Some(Arc::new(fm))
281                } else {
282                    return Err(grafeo_common::utils::error::Error::Internal(format!(
283                        "read-only open requires an existing .grafeo file: {}",
284                        db_path.display()
285                    )));
286                }
287            } else {
288                return Err(grafeo_common::utils::error::Error::Internal(
289                    "read-only mode requires a database path".to_string(),
290                ));
291            }
292        } else if let Some(ref db_path) = config.path {
293            // Initialize the file manager whenever single-file format is selected,
294            // regardless of whether WAL is enabled. Without this, a database opened
295            // with wal_enabled:false + StorageFormat::SingleFile would produce no
296            // output at all (the file manager was previously gated behind wal_enabled).
297            if Self::should_use_single_file(db_path, config.storage_format) {
298                let fm = if db_path.exists() && db_path.is_file() {
299                    GrafeoFileManager::open(db_path)?
300                } else if !db_path.exists() {
301                    GrafeoFileManager::create(db_path)?
302                } else {
303                    // Path exists but is not a file (directory, etc.)
304                    return Err(grafeo_common::utils::error::Error::Internal(format!(
305                        "path exists but is not a file: {}",
306                        db_path.display()
307                    )));
308                };
309
310                // Load snapshot data from the file
311                let snapshot_data = fm.read_snapshot()?;
312                if !snapshot_data.is_empty() {
313                    Self::apply_snapshot_data(
314                        &store,
315                        &catalog,
316                        #[cfg(feature = "rdf")]
317                        &rdf_store,
318                        &snapshot_data,
319                    )?;
320                }
321
322                // Recover sidecar WAL if WAL is enabled and a sidecar exists
323                #[cfg(feature = "wal")]
324                if config.wal_enabled && fm.has_sidecar_wal() {
325                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
326                    let records = recovery.recover()?;
327                    Self::apply_wal_records(
328                        &store,
329                        &catalog,
330                        #[cfg(feature = "rdf")]
331                        &rdf_store,
332                        &records,
333                    )?;
334                }
335
336                Some(Arc::new(fm))
337            } else {
338                None
339            }
340        } else {
341            None
342        };
343
344        // Determine whether to use the WAL directory path (legacy) or sidecar
345        // Read-only mode skips WAL entirely (no recovery, no creation).
346        #[cfg(feature = "wal")]
347        let wal = if is_read_only {
348            None
349        } else if config.wal_enabled {
350            if let Some(ref db_path) = config.path {
351                // When using single-file format, the WAL is a sidecar directory
352                #[cfg(feature = "grafeo-file")]
353                let wal_path = if let Some(ref fm) = file_manager {
354                    let p = fm.sidecar_wal_path();
355                    std::fs::create_dir_all(&p)?;
356                    p
357                } else {
358                    // Legacy: WAL inside the database directory
359                    std::fs::create_dir_all(db_path)?;
360                    db_path.join("wal")
361                };
362
363                #[cfg(not(feature = "grafeo-file"))]
364                let wal_path = {
365                    std::fs::create_dir_all(db_path)?;
366                    db_path.join("wal")
367                };
368
369                // For legacy WAL directory format, check if WAL exists and recover
370                #[cfg(feature = "grafeo-file")]
371                let is_single_file = file_manager.is_some();
372                #[cfg(not(feature = "grafeo-file"))]
373                let is_single_file = false;
374
375                if !is_single_file && wal_path.exists() {
376                    let recovery = WalRecovery::new(&wal_path);
377                    let records = recovery.recover()?;
378                    Self::apply_wal_records(
379                        &store,
380                        &catalog,
381                        #[cfg(feature = "rdf")]
382                        &rdf_store,
383                        &records,
384                    )?;
385                }
386
387                // Open/create WAL manager with configured durability
388                let wal_durability = match config.wal_durability {
389                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
390                    crate::config::DurabilityMode::Batch {
391                        max_delay_ms,
392                        max_records,
393                    } => WalDurabilityMode::Batch {
394                        max_delay_ms,
395                        max_records,
396                    },
397                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
398                        WalDurabilityMode::Adaptive { target_interval_ms }
399                    }
400                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
401                };
402                let wal_config = WalConfig {
403                    durability: wal_durability,
404                    ..WalConfig::default()
405                };
406                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
407                Some(Arc::new(wal_manager))
408            } else {
409                None
410            }
411        } else {
412            None
413        };
414
415        // Create query cache with default capacity (1000 queries)
416        let query_cache = Arc::new(QueryCache::default());
417
418        // After all snapshot/WAL recovery, sync TransactionManager epoch
419        // with the store so queries use the correct viewing epoch.
420        #[cfg(feature = "temporal")]
421        transaction_manager.sync_epoch(store.current_epoch());
422
423        Ok(Self {
424            config,
425            store,
426            catalog,
427            #[cfg(feature = "rdf")]
428            rdf_store,
429            transaction_manager,
430            buffer_manager,
431            #[cfg(feature = "wal")]
432            wal,
433            #[cfg(feature = "wal")]
434            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
435            query_cache,
436            commit_counter: Arc::new(AtomicUsize::new(0)),
437            is_open: RwLock::new(true),
438            #[cfg(feature = "cdc")]
439            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
440            #[cfg(feature = "embed")]
441            embedding_models: RwLock::new(hashbrown::HashMap::new()),
442            #[cfg(feature = "grafeo-file")]
443            file_manager,
444            external_store: None,
445            #[cfg(feature = "metrics")]
446            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
447            current_graph: RwLock::new(None),
448            current_schema: RwLock::new(None),
449            read_only: is_read_only,
450        })
451    }
452
453    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
454    ///
455    /// The external store handles all data persistence. WAL, CDC, and index
456    /// management are the responsibility of the store implementation.
457    ///
458    /// Query execution (all 6 languages, optimizer, planner) works through the
459    /// provided store. Admin operations (schema introspection, persistence,
460    /// vector/text indexes) are not available on external stores.
461    ///
462    /// # Examples
463    ///
464    /// ```no_run
465    /// use std::sync::Arc;
466    /// use grafeo_engine::{GrafeoDB, Config};
467    /// use grafeo_core::graph::GraphStoreMut;
468    ///
469    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
470    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
471    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
472    ///     Ok(())
473    /// }
474    /// ```
475    ///
476    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
477    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
478        config
479            .validate()
480            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
481
482        let dummy_store = Arc::new(LpgStore::new()?);
483        let transaction_manager = Arc::new(TransactionManager::new());
484
485        let buffer_config = BufferManagerConfig {
486            budget: config.memory_limit.unwrap_or_else(|| {
487                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
488            }),
489            spill_path: None,
490            ..BufferManagerConfig::default()
491        };
492        let buffer_manager = BufferManager::new(buffer_config);
493
494        let query_cache = Arc::new(QueryCache::default());
495
496        Ok(Self {
497            config,
498            store: dummy_store,
499            catalog: Arc::new(Catalog::new()),
500            #[cfg(feature = "rdf")]
501            rdf_store: Arc::new(RdfStore::new()),
502            transaction_manager,
503            buffer_manager,
504            #[cfg(feature = "wal")]
505            wal: None,
506            #[cfg(feature = "wal")]
507            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
508            query_cache,
509            commit_counter: Arc::new(AtomicUsize::new(0)),
510            is_open: RwLock::new(true),
511            #[cfg(feature = "cdc")]
512            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
513            #[cfg(feature = "embed")]
514            embedding_models: RwLock::new(hashbrown::HashMap::new()),
515            #[cfg(feature = "grafeo-file")]
516            file_manager: None,
517            external_store: Some(store),
518            #[cfg(feature = "metrics")]
519            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
520            current_graph: RwLock::new(None),
521            current_schema: RwLock::new(None),
522            read_only: false,
523        })
524    }
525
526    /// Applies WAL records to restore the database state.
527    ///
528    /// Data mutation records are routed through a graph cursor that tracks
529    /// `SwitchGraph` context markers, replaying mutations into the correct
530    /// named graph (or the default graph when cursor is `None`).
531    #[cfg(feature = "wal")]
532    fn apply_wal_records(
533        store: &Arc<LpgStore>,
534        catalog: &Catalog,
535        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
536        records: &[WalRecord],
537    ) -> Result<()> {
538        use crate::catalog::{
539            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
540        };
541        use grafeo_common::utils::error::Error;
542
543        // Graph cursor: tracks which named graph receives data mutations.
544        // `None` means the default graph.
545        let mut current_graph: Option<String> = None;
546        let mut target_store: Arc<LpgStore> = Arc::clone(store);
547
548        for record in records {
549            match record {
550                // --- Named graph lifecycle ---
551                WalRecord::CreateNamedGraph { name } => {
552                    let _ = store.create_graph(name);
553                }
554                WalRecord::DropNamedGraph { name } => {
555                    store.drop_graph(name);
556                    // Reset cursor if the dropped graph was active
557                    if current_graph.as_deref() == Some(name.as_str()) {
558                        current_graph = None;
559                        target_store = Arc::clone(store);
560                    }
561                }
562                WalRecord::SwitchGraph { name } => {
563                    current_graph.clone_from(name);
564                    target_store = match &current_graph {
565                        None => Arc::clone(store),
566                        Some(graph_name) => store
567                            .graph_or_create(graph_name)
568                            .map_err(|e| Error::Internal(e.to_string()))?,
569                    };
570                }
571
572                // --- Data mutations: routed through target_store ---
573                WalRecord::CreateNode { id, labels } => {
574                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
575                    target_store.create_node_with_id(*id, &label_refs)?;
576                }
577                WalRecord::DeleteNode { id } => {
578                    target_store.delete_node(*id);
579                }
580                WalRecord::CreateEdge {
581                    id,
582                    src,
583                    dst,
584                    edge_type,
585                } => {
586                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
587                }
588                WalRecord::DeleteEdge { id } => {
589                    target_store.delete_edge(*id);
590                }
591                WalRecord::SetNodeProperty { id, key, value } => {
592                    target_store.set_node_property(*id, key, value.clone());
593                }
594                WalRecord::SetEdgeProperty { id, key, value } => {
595                    target_store.set_edge_property(*id, key, value.clone());
596                }
597                WalRecord::AddNodeLabel { id, label } => {
598                    target_store.add_label(*id, label);
599                }
600                WalRecord::RemoveNodeLabel { id, label } => {
601                    target_store.remove_label(*id, label);
602                }
603                WalRecord::RemoveNodeProperty { id, key } => {
604                    target_store.remove_node_property(*id, key);
605                }
606                WalRecord::RemoveEdgeProperty { id, key } => {
607                    target_store.remove_edge_property(*id, key);
608                }
609
610                // --- Schema DDL replay (always on root catalog) ---
611                WalRecord::CreateNodeType {
612                    name,
613                    properties,
614                    constraints,
615                } => {
616                    let def = NodeTypeDefinition {
617                        name: name.clone(),
618                        properties: properties
619                            .iter()
620                            .map(|(n, t, nullable)| TypedProperty {
621                                name: n.clone(),
622                                data_type: PropertyDataType::from_type_name(t),
623                                nullable: *nullable,
624                                default_value: None,
625                            })
626                            .collect(),
627                        constraints: constraints
628                            .iter()
629                            .map(|(kind, props)| match kind.as_str() {
630                                "unique" => TypeConstraint::Unique(props.clone()),
631                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
632                                "not_null" if !props.is_empty() => {
633                                    TypeConstraint::NotNull(props[0].clone())
634                                }
635                                _ => TypeConstraint::Unique(props.clone()),
636                            })
637                            .collect(),
638                        parent_types: Vec::new(),
639                    };
640                    let _ = catalog.register_node_type(def);
641                }
642                WalRecord::DropNodeType { name } => {
643                    let _ = catalog.drop_node_type(name);
644                }
645                WalRecord::CreateEdgeType {
646                    name,
647                    properties,
648                    constraints,
649                } => {
650                    let def = EdgeTypeDefinition {
651                        name: name.clone(),
652                        properties: properties
653                            .iter()
654                            .map(|(n, t, nullable)| TypedProperty {
655                                name: n.clone(),
656                                data_type: PropertyDataType::from_type_name(t),
657                                nullable: *nullable,
658                                default_value: None,
659                            })
660                            .collect(),
661                        constraints: constraints
662                            .iter()
663                            .map(|(kind, props)| match kind.as_str() {
664                                "unique" => TypeConstraint::Unique(props.clone()),
665                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
666                                "not_null" if !props.is_empty() => {
667                                    TypeConstraint::NotNull(props[0].clone())
668                                }
669                                _ => TypeConstraint::Unique(props.clone()),
670                            })
671                            .collect(),
672                        source_node_types: Vec::new(),
673                        target_node_types: Vec::new(),
674                    };
675                    let _ = catalog.register_edge_type_def(def);
676                }
677                WalRecord::DropEdgeType { name } => {
678                    let _ = catalog.drop_edge_type_def(name);
679                }
680                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
681                    // Index recreation is handled by the store on startup
682                    // (indexes are rebuilt from data, not WAL)
683                }
684                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
685                    // Constraint definitions are part of type definitions
686                    // and replayed via CreateNodeType/CreateEdgeType
687                }
688                WalRecord::CreateGraphType {
689                    name,
690                    node_types,
691                    edge_types,
692                    open,
693                } => {
694                    use crate::catalog::GraphTypeDefinition;
695                    let def = GraphTypeDefinition {
696                        name: name.clone(),
697                        allowed_node_types: node_types.clone(),
698                        allowed_edge_types: edge_types.clone(),
699                        open: *open,
700                    };
701                    let _ = catalog.register_graph_type(def);
702                }
703                WalRecord::DropGraphType { name } => {
704                    let _ = catalog.drop_graph_type(name);
705                }
706                WalRecord::CreateSchema { name } => {
707                    let _ = catalog.register_schema_namespace(name.clone());
708                }
709                WalRecord::DropSchema { name } => {
710                    let _ = catalog.drop_schema_namespace(name);
711                }
712
713                WalRecord::AlterNodeType { name, alterations } => {
714                    for (action, prop_name, type_name, nullable) in alterations {
715                        match action.as_str() {
716                            "add" => {
717                                let prop = TypedProperty {
718                                    name: prop_name.clone(),
719                                    data_type: PropertyDataType::from_type_name(type_name),
720                                    nullable: *nullable,
721                                    default_value: None,
722                                };
723                                let _ = catalog.alter_node_type_add_property(name, prop);
724                            }
725                            "drop" => {
726                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
727                            }
728                            _ => {}
729                        }
730                    }
731                }
732                WalRecord::AlterEdgeType { name, alterations } => {
733                    for (action, prop_name, type_name, nullable) in alterations {
734                        match action.as_str() {
735                            "add" => {
736                                let prop = TypedProperty {
737                                    name: prop_name.clone(),
738                                    data_type: PropertyDataType::from_type_name(type_name),
739                                    nullable: *nullable,
740                                    default_value: None,
741                                };
742                                let _ = catalog.alter_edge_type_add_property(name, prop);
743                            }
744                            "drop" => {
745                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
746                            }
747                            _ => {}
748                        }
749                    }
750                }
751                WalRecord::AlterGraphType { name, alterations } => {
752                    for (action, type_name) in alterations {
753                        match action.as_str() {
754                            "add_node" => {
755                                let _ =
756                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
757                            }
758                            "drop_node" => {
759                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
760                            }
761                            "add_edge" => {
762                                let _ =
763                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
764                            }
765                            "drop_edge" => {
766                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
767                            }
768                            _ => {}
769                        }
770                    }
771                }
772
773                WalRecord::CreateProcedure {
774                    name,
775                    params,
776                    returns,
777                    body,
778                } => {
779                    use crate::catalog::ProcedureDefinition;
780                    let def = ProcedureDefinition {
781                        name: name.clone(),
782                        params: params.clone(),
783                        returns: returns.clone(),
784                        body: body.clone(),
785                    };
786                    let _ = catalog.register_procedure(def);
787                }
788                WalRecord::DropProcedure { name } => {
789                    let _ = catalog.drop_procedure(name);
790                }
791
792                // --- RDF triple replay ---
793                #[cfg(feature = "rdf")]
794                WalRecord::InsertRdfTriple { .. }
795                | WalRecord::DeleteRdfTriple { .. }
796                | WalRecord::ClearRdfGraph { .. }
797                | WalRecord::CreateRdfGraph { .. }
798                | WalRecord::DropRdfGraph { .. } => {
799                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
800                }
801                #[cfg(not(feature = "rdf"))]
802                WalRecord::InsertRdfTriple { .. }
803                | WalRecord::DeleteRdfTriple { .. }
804                | WalRecord::ClearRdfGraph { .. }
805                | WalRecord::CreateRdfGraph { .. }
806                | WalRecord::DropRdfGraph { .. } => {}
807
808                WalRecord::TransactionCommit { .. } => {
809                    // In temporal mode, advance the store epoch on each committed
810                    // transaction so that subsequent property/label operations
811                    // are recorded at the correct epoch in their VersionLogs.
812                    #[cfg(feature = "temporal")]
813                    {
814                        target_store.new_epoch();
815                    }
816                }
817                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
818                    // Transaction control records don't need replay action
819                    // (recovery already filtered to only committed transactions)
820                }
821            }
822        }
823        Ok(())
824    }
825
826    // =========================================================================
827    // Single-file format helpers
828    // =========================================================================
829
830    /// Returns `true` if the given path should use single-file format.
831    #[cfg(feature = "grafeo-file")]
832    fn should_use_single_file(
833        path: &std::path::Path,
834        configured: crate::config::StorageFormat,
835    ) -> bool {
836        use crate::config::StorageFormat;
837        match configured {
838            StorageFormat::SingleFile => true,
839            StorageFormat::WalDirectory => false,
840            StorageFormat::Auto => {
841                // Existing file: check magic bytes
842                if path.is_file() {
843                    if let Ok(mut f) = std::fs::File::open(path) {
844                        use std::io::Read;
845                        let mut magic = [0u8; 4];
846                        if f.read_exact(&mut magic).is_ok()
847                            && magic == grafeo_adapters::storage::file::MAGIC
848                        {
849                            return true;
850                        }
851                    }
852                    return false;
853                }
854                // Existing directory: legacy format
855                if path.is_dir() {
856                    return false;
857                }
858                // New path: check extension
859                path.extension().is_some_and(|ext| ext == "grafeo")
860            }
861        }
862    }
863
864    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
865    #[cfg(feature = "grafeo-file")]
866    fn apply_snapshot_data(
867        store: &Arc<LpgStore>,
868        catalog: &Arc<crate::catalog::Catalog>,
869        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
870        data: &[u8],
871    ) -> Result<()> {
872        persistence::load_snapshot_into_store(
873            store,
874            catalog,
875            #[cfg(feature = "rdf")]
876            rdf_store,
877            data,
878        )
879    }
880
881    // =========================================================================
882    // Session & Configuration
883    // =========================================================================
884
    /// Opens a new session for running queries.
    ///
    /// Sessions are cheap to create: spin up as many as you need. Each
    /// gets its own transaction context, so concurrent sessions won't
    /// block each other on reads.
    ///
    /// Every session shares this database's transaction manager, query cache,
    /// catalog and commit counter. It also inherits the adaptive/factorized
    /// execution settings, graph model, query timeout, GC interval and
    /// read-only flag from the database configuration. For sessions on the
    /// built-in store, the WAL, CDC log and metrics registry are attached when
    /// those features are enabled and configured. The database's current graph
    /// and schema (see [`current_graph`](Self::current_graph) and
    /// [`current_schema`](Self::current_schema)) are applied as well.
    ///
    /// If the database was configured with an external graph store, the
    /// session is built on that store and returned immediately. WAL, CDC,
    /// metrics and graph/schema context are not attached in that case.
    ///
    /// # Panics
    ///
    /// Panics if the database was configured with an external graph store and
    /// the internal arena allocator cannot be initialized (out of memory).
    ///
    /// # Examples
    ///
    /// ```
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let session = db.session();
    ///
    /// // Run queries through the session
    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
    /// # Ok::<(), grafeo_common::utils::error::Error>(())
    /// ```
    #[must_use]
    pub fn session(&self) -> Session {
        // Builds the shared per-session configuration; invoked exactly once,
        // on whichever construction path below is taken.
        let session_cfg = || crate::session::SessionConfig {
            transaction_manager: Arc::clone(&self.transaction_manager),
            query_cache: Arc::clone(&self.query_cache),
            catalog: Arc::clone(&self.catalog),
            adaptive_config: self.config.adaptive.clone(),
            factorized_execution: self.config.factorized_execution,
            graph_model: self.config.graph_model,
            query_timeout: self.config.query_timeout,
            commit_counter: Arc::clone(&self.commit_counter),
            gc_interval: self.config.gc_interval,
            read_only: self.read_only,
        };

        // External-store sessions return early: none of the WAL/CDC/metrics or
        // graph/schema wiring below applies to them.
        // NOTE(review): this also means `current_graph`/`current_schema` are not
        // propagated for external stores — confirm that is intended.
        if let Some(ref ext_store) = self.external_store {
            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
                .expect("arena allocation for external store session");
        }

        // Built-in store: with RDF support the session also gets the RDF store.
        #[cfg(feature = "rdf")]
        let mut session = Session::with_rdf_store_and_adaptive(
            Arc::clone(&self.store),
            Arc::clone(&self.rdf_store),
            session_cfg(),
        );
        #[cfg(not(feature = "rdf"))]
        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());

        // Attach the WAL (and its graph context) only when one is configured.
        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
        }

        #[cfg(feature = "cdc")]
        session.set_cdc_log(Arc::clone(&self.cdc_log));

        // Count the session as both created and active. The matching decrement
        // of `session_active` presumably happens when the session ends — verify
        // in `Session`.
        #[cfg(feature = "metrics")]
        {
            if let Some(ref m) = self.metrics {
                session.set_metrics(Arc::clone(m));
                m.session_created
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                m.session_active
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Propagate persistent graph context to the new session
        if let Some(ref graph) = *self.current_graph.read() {
            session.use_graph(graph);
        }

        // Propagate persistent schema context to the new session
        if let Some(ref schema) = *self.current_schema.read() {
            session.set_schema(schema);
        }

        // Suppress unused_mut when cdc/wal are disabled
        let _ = &mut session;

        session
    }
971
972    /// Returns the current graph name, if any.
973    ///
974    /// This is the persistent graph context used by one-shot `execute()` calls.
975    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
976    /// or `SESSION RESET`.
977    #[must_use]
978    pub fn current_graph(&self) -> Option<String> {
979        self.current_graph.read().clone()
980    }
981
982    /// Sets the current graph context for subsequent one-shot `execute()` calls.
983    ///
984    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
985    /// Pass `None` to reset to the default graph.
986    pub fn set_current_graph(&self, name: Option<&str>) {
987        *self.current_graph.write() = name.map(ToString::to_string);
988    }
989
990    /// Returns the current schema name, if any.
991    ///
992    /// This is the persistent schema context used by one-shot `execute()` calls.
993    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
994    #[must_use]
995    pub fn current_schema(&self) -> Option<String> {
996        self.current_schema.read().clone()
997    }
998
999    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1000    ///
1001    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1002    /// a session. Pass `None` to clear the schema context.
1003    pub fn set_current_schema(&self, name: Option<&str>) {
1004        *self.current_schema.write() = name.map(ToString::to_string);
1005    }
1006
1007    /// Returns the adaptive execution configuration.
1008    #[must_use]
1009    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1010        &self.config.adaptive
1011    }
1012
1013    /// Returns `true` if this database was opened in read-only mode.
1014    #[must_use]
1015    pub fn is_read_only(&self) -> bool {
1016        self.read_only
1017    }
1018
1019    /// Returns the configuration.
1020    #[must_use]
1021    pub fn config(&self) -> &Config {
1022        &self.config
1023    }
1024
1025    /// Returns the graph data model of this database.
1026    #[must_use]
1027    pub fn graph_model(&self) -> crate::config::GraphModel {
1028        self.config.graph_model
1029    }
1030
1031    /// Returns the configured memory limit in bytes, if any.
1032    #[must_use]
1033    pub fn memory_limit(&self) -> Option<usize> {
1034        self.config.memory_limit
1035    }
1036
1037    /// Returns a point-in-time snapshot of all metrics.
1038    ///
1039    /// If the `metrics` feature is disabled or the registry is not
1040    /// initialized, returns a default (all-zero) snapshot.
1041    #[cfg(feature = "metrics")]
1042    #[must_use]
1043    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1044        let mut snapshot = self
1045            .metrics
1046            .as_ref()
1047            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1048
1049        // Augment with cache stats from the query cache (not tracked in the registry)
1050        let cache_stats = self.query_cache.stats();
1051        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1052        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1053        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1054        snapshot.cache_invalidations = cache_stats.invalidations;
1055
1056        snapshot
1057    }
1058
1059    /// Returns all metrics in Prometheus text exposition format.
1060    ///
1061    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1062    #[cfg(feature = "metrics")]
1063    #[must_use]
1064    pub fn metrics_prometheus(&self) -> String {
1065        self.metrics
1066            .as_ref()
1067            .map_or_else(String::new, |m| m.to_prometheus())
1068    }
1069
1070    /// Resets all metrics counters and histograms to zero.
1071    #[cfg(feature = "metrics")]
1072    pub fn reset_metrics(&self) {
1073        if let Some(ref m) = self.metrics {
1074            m.reset();
1075        }
1076        self.query_cache.reset_stats();
1077    }
1078
1079    /// Returns the underlying (default) store.
1080    ///
1081    /// This provides direct access to the LPG store for algorithm implementations
1082    /// and admin operations (index management, schema introspection, MVCC internals).
1083    ///
1084    /// For code that only needs read/write graph operations, prefer
1085    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1086    #[must_use]
1087    pub fn store(&self) -> &Arc<LpgStore> {
1088        &self.store
1089    }
1090
1091    /// Returns the LPG store for the currently active graph.
1092    ///
1093    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1094    /// the default store. Otherwise looks up the named graph in the root store.
1095    /// Falls back to the default store if the named graph does not exist.
1096    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1097    fn active_store(&self) -> Arc<LpgStore> {
1098        let graph_name = self.current_graph.read().clone();
1099        match graph_name {
1100            None => Arc::clone(&self.store),
1101            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
1102            Some(ref name) => self
1103                .store
1104                .graph(name)
1105                .unwrap_or_else(|| Arc::clone(&self.store)),
1106        }
1107    }
1108
1109    // === Named Graph Management ===
1110
1111    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1112    ///
1113    /// # Errors
1114    ///
1115    /// Returns an error if arena allocation fails.
1116    pub fn create_graph(&self, name: &str) -> Result<bool> {
1117        Ok(self.store.create_graph(name)?)
1118    }
1119
1120    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1121    pub fn drop_graph(&self, name: &str) -> bool {
1122        self.store.drop_graph(name)
1123    }
1124
1125    /// Returns all named graph names.
1126    #[must_use]
1127    pub fn list_graphs(&self) -> Vec<String> {
1128        self.store.graph_names()
1129    }
1130
1131    /// Returns the graph store as a trait object.
1132    ///
1133    /// This provides the [`GraphStoreMut`] interface for code that should work
1134    /// with any storage backend. Use this when you only need graph read/write
1135    /// operations and don't need admin methods like index management.
1136    ///
1137    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
1138    #[must_use]
1139    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
1140        if let Some(ref ext_store) = self.external_store {
1141            Arc::clone(ext_store)
1142        } else {
1143            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
1144        }
1145    }
1146
1147    /// Garbage collects old MVCC versions that are no longer visible.
1148    ///
1149    /// Determines the minimum epoch required by active transactions and prunes
1150    /// version chains older than that threshold. Also cleans up completed
1151    /// transaction metadata in the transaction manager.
1152    pub fn gc(&self) {
1153        let min_epoch = self.transaction_manager.min_active_epoch();
1154        self.store.gc_versions(min_epoch);
1155        self.transaction_manager.gc();
1156    }
1157
1158    /// Returns the buffer manager for memory-aware operations.
1159    #[must_use]
1160    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1161        &self.buffer_manager
1162    }
1163
1164    /// Returns the query cache.
1165    #[must_use]
1166    pub fn query_cache(&self) -> &Arc<QueryCache> {
1167        &self.query_cache
1168    }
1169
1170    /// Clears all cached query plans.
1171    ///
1172    /// This is called automatically after DDL operations, but can also be
1173    /// invoked manually after external schema changes (e.g., WAL replay,
1174    /// import) or when you want to force re-optimization of all queries.
1175    pub fn clear_plan_cache(&self) {
1176        self.query_cache.clear();
1177    }
1178
1179    // =========================================================================
1180    // Lifecycle
1181    // =========================================================================
1182
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Calling `close` on an already-closed database is a no-op returning
    /// `Ok(())`. The `is_open` write lock is held for the whole call, so
    /// concurrent `close` calls are serialized.
    ///
    /// What happens depends on how the database was opened:
    /// - **Read-only**: only the single-file manager (if any) is closed; no
    ///   checkpoint or WAL write happens.
    /// - **Single-file** (a file manager is present):
    ///   1. the WAL is synced;
    ///   2. a snapshot is checkpointed into the `.grafeo` file;
    ///   3. the active WAL log is closed;
    ///   4. the sidecar WAL is removed;
    ///   5. the file manager is closed.
    /// - **WAL directory**: a `TransactionCommit` record is appended and the WAL
    ///   is synced. No WAL checkpoint is written; the inline comment explains why.
    ///
    /// # Errors
    ///
    /// Returns an error if any of these fails (check disk space/permissions):
    /// - syncing or writing the WAL;
    /// - checkpointing into the `.grafeo` file;
    /// - removing the sidecar WAL;
    /// - closing the file manager.
    ///
    /// On error the database stays marked as open, so a later call (for example
    /// from `Drop`) will run the close sequence again.
    pub fn close(&self) -> Result<()> {
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // Single-file format: checkpoint into the .grafeo file, then remove the
        // sidecar WAL. `is_single_file` is computed up front so that the
        // directory-format WAL commit further down is skipped in this mode. The
        // sidecar WAL directory is deleted by `fm.remove_sidecar_wal()` below;
        // `checkpoint_to_file` itself does not remove it.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Test crash-injection hook: simulates dying after the snapshot is
            // written but before the sidecar WAL is removed.
            {
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record.
            // NOTE(review): a transaction begun here is only used to label the WAL
            // commit record; it is not committed/aborted through the transaction
            // manager — confirm that is intended.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1264
1265    /// Returns the typed WAL if available.
1266    #[cfg(feature = "wal")]
1267    #[must_use]
1268    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1269        self.wal.as_ref()
1270    }
1271
1272    /// Logs a WAL record if WAL is enabled.
1273    #[cfg(feature = "wal")]
1274    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1275        if let Some(ref wal) = self.wal {
1276            wal.log(record)?;
1277        }
1278        Ok(())
1279    }
1280
    /// Writes the current database snapshot to the `.grafeo` file.
    ///
    /// Exports the snapshot bytes, then hands them to `fm.write_snapshot`
    /// together with this metadata:
    /// - the default store's current epoch;
    /// - the last assigned transaction ID (0 if none has been assigned);
    /// - the default store's node and edge counts.
    ///
    /// Does NOT remove the sidecar WAL: callers that want to clean up
    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
    /// separately after this returns.
    ///
    /// # Errors
    ///
    /// Returns an error if exporting the snapshot or writing it to the file fails.
    #[cfg(feature = "grafeo-file")]
    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
        // Test crash-injection hooks around each step of the checkpoint.
        use grafeo_core::testing::crash::maybe_crash;

        maybe_crash("checkpoint_to_file:before_export");
        let snapshot_data = self.export_snapshot()?;
        maybe_crash("checkpoint_to_file:after_export");

        // NOTE(review): this metadata is read after the export, so concurrent
        // writes in between could make it disagree with `snapshot_data` —
        // confirm callers quiesce writes before checkpointing.
        let epoch = self.store.current_epoch();
        let transaction_id = self
            .transaction_manager
            .last_assigned_transaction_id()
            .map_or(0, |t| t.0);
        let node_count = self.store.node_count() as u64;
        let edge_count = self.store.edge_count() as u64;

        fm.write_snapshot(
            &snapshot_data,
            epoch.0,
            transaction_id,
            node_count,
            edge_count,
        )?;

        maybe_crash("checkpoint_to_file:after_write_snapshot");
        Ok(())
    }
1313
1314    /// Returns the file manager if using single-file format.
1315    #[cfg(feature = "grafeo-file")]
1316    #[must_use]
1317    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1318        self.file_manager.as_ref()
1319    }
1320}
1321
1322impl Drop for GrafeoDB {
1323    fn drop(&mut self) {
1324        if let Err(e) = self.close() {
1325            grafeo_error!("Error closing database: {}", e);
1326        }
1327    }
1328}
1329
1330impl crate::admin::AdminService for GrafeoDB {
1331    fn info(&self) -> crate::admin::DatabaseInfo {
1332        self.info()
1333    }
1334
1335    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1336        self.detailed_stats()
1337    }
1338
1339    fn schema(&self) -> crate::admin::SchemaInfo {
1340        self.schema()
1341    }
1342
1343    fn validate(&self) -> crate::admin::ValidationResult {
1344        self.validate()
1345    }
1346
1347    fn wal_status(&self) -> crate::admin::WalStatus {
1348        self.wal_status()
1349    }
1350
1351    fn wal_checkpoint(&self) -> Result<()> {
1352        self.wal_checkpoint()
1353    }
1354}
1355
1356// =========================================================================
1357// Query Result Types
1358// =========================================================================
1359
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// DDL and session commands usually produce no rows and report what they did
/// through [`status_message`](Self::status_message) instead.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Normally one entry per name in `columns`, in the same order.
    /// `QueryResult::new` fills every entry with `LogicalType::Any`.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Each row is expected to hold one value per column.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1402
1403impl QueryResult {
1404    /// Creates a fully empty query result (no columns, no rows).
1405    #[must_use]
1406    pub fn empty() -> Self {
1407        Self {
1408            columns: Vec::new(),
1409            column_types: Vec::new(),
1410            rows: Vec::new(),
1411            execution_time_ms: None,
1412            rows_scanned: None,
1413            status_message: None,
1414            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1415        }
1416    }
1417
1418    /// Creates a query result with only a status message (for DDL commands).
1419    #[must_use]
1420    pub fn status(msg: impl Into<String>) -> Self {
1421        Self {
1422            columns: Vec::new(),
1423            column_types: Vec::new(),
1424            rows: Vec::new(),
1425            execution_time_ms: None,
1426            rows_scanned: None,
1427            status_message: Some(msg.into()),
1428            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1429        }
1430    }
1431
1432    /// Creates a new empty query result.
1433    #[must_use]
1434    pub fn new(columns: Vec<String>) -> Self {
1435        let len = columns.len();
1436        Self {
1437            columns,
1438            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1439            rows: Vec::new(),
1440            execution_time_ms: None,
1441            rows_scanned: None,
1442            status_message: None,
1443            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1444        }
1445    }
1446
1447    /// Creates a new empty query result with column types.
1448    #[must_use]
1449    pub fn with_types(
1450        columns: Vec<String>,
1451        column_types: Vec<grafeo_common::types::LogicalType>,
1452    ) -> Self {
1453        Self {
1454            columns,
1455            column_types,
1456            rows: Vec::new(),
1457            execution_time_ms: None,
1458            rows_scanned: None,
1459            status_message: None,
1460            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1461        }
1462    }
1463
1464    /// Sets the execution metrics on this result.
1465    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1466        self.execution_time_ms = Some(execution_time_ms);
1467        self.rows_scanned = Some(rows_scanned);
1468        self
1469    }
1470
1471    /// Returns the execution time in milliseconds, if available.
1472    #[must_use]
1473    pub fn execution_time_ms(&self) -> Option<f64> {
1474        self.execution_time_ms
1475    }
1476
1477    /// Returns the number of rows scanned, if available.
1478    #[must_use]
1479    pub fn rows_scanned(&self) -> Option<u64> {
1480        self.rows_scanned
1481    }
1482
1483    /// Returns the number of rows.
1484    #[must_use]
1485    pub fn row_count(&self) -> usize {
1486        self.rows.len()
1487    }
1488
1489    /// Returns the number of columns.
1490    #[must_use]
1491    pub fn column_count(&self) -> usize {
1492        self.columns.len()
1493    }
1494
1495    /// Returns true if the result is empty.
1496    #[must_use]
1497    pub fn is_empty(&self) -> bool {
1498        self.rows.is_empty()
1499    }
1500
1501    /// Extracts a single value from the result.
1502    ///
1503    /// Use this when your query returns exactly one row with one column,
1504    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1505    ///
1506    /// # Errors
1507    ///
1508    /// Returns an error if the result has multiple rows or columns.
1509    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1510        if self.rows.len() != 1 || self.columns.len() != 1 {
1511            return Err(grafeo_common::utils::error::Error::InvalidValue(
1512                "Expected single value".to_string(),
1513            ));
1514        }
1515        T::from_value(&self.rows[0][0])
1516    }
1517
1518    /// Returns an iterator over the rows.
1519    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1520        self.rows.iter()
1521    }
1522}
1523
1524impl std::fmt::Display for QueryResult {
1525    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526        let table = grafeo_common::fmt::format_result_table(
1527            &self.columns,
1528            &self.rows,
1529            self.execution_time_ms,
1530            self.status_message.as_deref(),
1531        );
1532        f.write_str(&table)
1533    }
1534}
1535
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values. Implement it for
/// your own types to make them usable with `scalar()`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The built-in implementations report a mismatch as `Error::TypeMismatch`.
    /// The error names the expected type and the value's actual type name.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1544
1545impl FromValue for i64 {
1546    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1547        value
1548            .as_int64()
1549            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1550                expected: "INT64".to_string(),
1551                found: value.type_name().to_string(),
1552            })
1553    }
1554}
1555
1556impl FromValue for f64 {
1557    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1558        value
1559            .as_float64()
1560            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1561                expected: "FLOAT64".to_string(),
1562                found: value.type_name().to_string(),
1563            })
1564    }
1565}
1566
1567impl FromValue for String {
1568    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1569        value.as_str().map(String::from).ok_or_else(|| {
1570            grafeo_common::utils::error::Error::TypeMismatch {
1571                expected: "STRING".to_string(),
1572                found: value.type_name().to_string(),
1573            }
1574        })
1575    }
1576}
1577
1578impl FromValue for bool {
1579    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1580        value
1581            .as_bool()
1582            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1583                expected: "BOOL".to_string(),
1584                found: value.type_name().to_string(),
1585            })
1586    }
1587}
1588
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records. NOTE(review): this check is skipped silently
        // when `db.wal()` is `None`; confirm whether a persistent open always
        // has a WAL, and if so assert `is_some()` instead.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify labels were recovered correctly
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete middle node and its edge
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // `node_count` is deliberately not asserted here: whether it counts
            // deleted nodes is not what this test pins down. What matters is that
            // the surviving nodes (a, c) are accessible and the deleted one (b) is not.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_with_store_external_backend() {
        use grafeo_core::graph::lpg::LpgStore;

        let external = Arc::new(LpgStore::new().unwrap());

        // Seed data on the external store directly
        let n1 = external.create_node(&["Person"]);
        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));

        let db = GrafeoDB::with_store(
            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
            Config::in_memory(),
        )
        .unwrap();

        let session = db.session();

        // Session should see data from the external store via execute
        #[cfg(feature = "gql")]
        {
            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
            assert_eq!(result.rows.len(), 1);
        }
    }

    #[test]
    fn test_with_config_custom_memory_limit() {
        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
        assert_eq!(db.node_count(), 0);
    }

    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        // Perform some operations
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Direct CRUD calls are not queries, so the query counter must stay at zero
        let snap = db.metrics();
        assert_eq!(snap.query_count, 0); // No queries executed yet
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            // Also rejects NaN and negative timings
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }
    }

    // The `test_with_store_*` tests below run GQL text, so they are gated on the
    // `gql` feature like the other query-executing tests in this module.
    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Use an explicit transaction so INSERT and MATCH share the same
        // transaction context. With PENDING epochs, uncommitted versions are
        // only visible to the owning transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}