// grafeo_engine/database/mod.rs

//! The main database struct and operations.
//!
//! Start here with [`GrafeoDB`]: it's your handle to everything.
//!
//! Operations are split across focused submodules:
//! - `query` - Query execution (execute, execute_cypher, etc.)
//! - `crud` - Node/edge CRUD operations
//! - `index` - Property, vector, and text index management
//! - `search` - Vector, text, and hybrid search
//! - `embed` - Embedding model management (requires the `embed` feature)
//! - `persistence` - Save, load, snapshots, iteration
//! - `admin` - Stats, introspection, diagnostics, CDC

14mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod import;
25mod index;
26mod persistence;
27mod query;
28#[cfg(feature = "rdf")]
29mod rdf_ops;
30mod search;
31#[cfg(feature = "wal")]
32pub(crate) mod wal_store;
33
34use grafeo_common::grafeo_error;
35#[cfg(feature = "wal")]
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::AtomicUsize;
39
40use parking_lot::RwLock;
41
42#[cfg(feature = "grafeo-file")]
43use grafeo_adapters::storage::file::GrafeoFileManager;
44#[cfg(feature = "wal")]
45use grafeo_adapters::storage::wal::{
46    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
47};
48use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
49use grafeo_common::utils::error::Result;
50use grafeo_core::graph::lpg::LpgStore;
51#[cfg(feature = "rdf")]
52use grafeo_core::graph::rdf::RdfStore;
53use grafeo_core::graph::{GraphStore, GraphStoreMut};
54
55use crate::catalog::Catalog;
56use crate::config::Config;
57use crate::query::cache::QueryCache;
58use crate::session::Session;
59use crate::transaction::TransactionManager;
60
61/// Your handle to a Grafeo database.
62///
63/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
64/// quick experiments, or [`open()`](Self::open) for persistent storage.
65/// Then grab a [`session()`](Self::session) to start querying.
66///
67/// # Examples
68///
69/// ```
70/// use grafeo_engine::GrafeoDB;
71///
72/// // Quick in-memory database
73/// let db = GrafeoDB::new_in_memory();
74///
75/// // Add some data
76/// db.create_node(&["Person"]);
77///
78/// // Query it
79/// let session = db.session();
80/// let result = session.execute("MATCH (p:Person) RETURN p")?;
81/// # Ok::<(), grafeo_common::utils::error::Error>(())
82/// ```
83pub struct GrafeoDB {
84    /// Database configuration.
85    pub(super) config: Config,
86    /// The underlying graph store (None when using an external store).
87    pub(super) store: Option<Arc<LpgStore>>,
88    /// Schema and metadata catalog shared across sessions.
89    pub(super) catalog: Arc<Catalog>,
90    /// RDF triple store (if RDF feature is enabled).
91    #[cfg(feature = "rdf")]
92    pub(super) rdf_store: Arc<RdfStore>,
93    /// Transaction manager.
94    pub(super) transaction_manager: Arc<TransactionManager>,
95    /// Unified buffer manager.
96    pub(super) buffer_manager: Arc<BufferManager>,
97    /// Write-ahead log manager (if durability is enabled).
98    #[cfg(feature = "wal")]
99    pub(super) wal: Option<Arc<LpgWal>>,
100    /// Shared WAL graph context tracker. Tracks which named graph was last
101    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
102    /// records only when the context actually changes.
103    #[cfg(feature = "wal")]
104    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
105    /// Query cache for parsed and optimized plans.
106    pub(super) query_cache: Arc<QueryCache>,
107    /// Shared commit counter for auto-GC across sessions.
108    pub(super) commit_counter: Arc<AtomicUsize>,
109    /// Whether the database is open.
110    pub(super) is_open: RwLock<bool>,
111    /// Change data capture log for tracking mutations.
112    #[cfg(feature = "cdc")]
113    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
114    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
115    #[cfg(feature = "cdc")]
116    cdc_enabled: std::sync::atomic::AtomicBool,
117    /// Registered embedding models for text-to-vector conversion.
118    #[cfg(feature = "embed")]
119    pub(super) embedding_models:
120        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
121    /// Single-file database manager (when using `.grafeo` format).
122    #[cfg(feature = "grafeo-file")]
123    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
124    /// External read-only graph store (when using with_store() or with_read_store()).
125    /// When set, sessions route queries through this store instead of the built-in LpgStore.
126    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
127    /// External writable graph store (when using with_store()).
128    /// None for read-only databases created via with_read_store().
129    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
130    /// Metrics registry shared across all sessions.
131    #[cfg(feature = "metrics")]
132    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
133    /// Persistent graph context for one-shot `execute()` calls.
134    /// When set, each call to `session()` pre-configures the session to this graph.
135    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
136    current_graph: RwLock<Option<String>>,
137    /// Persistent schema context for one-shot `execute()` calls.
138    /// When set, each call to `session()` pre-configures the session to this schema.
139    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
140    current_schema: RwLock<Option<String>>,
141    /// Whether this database is open in read-only mode.
142    /// When true, sessions automatically enforce read-only transactions.
143    read_only: bool,
144}
145
146impl GrafeoDB {
147    /// Returns a reference to the built-in LPG store.
148    ///
149    /// # Panics
150    ///
151    /// Panics if the database was created with [`with_store()`](Self::with_store) or
152    /// [`with_read_store()`](Self::with_read_store), which use an external store
153    /// instead of the built-in LPG store.
154    fn lpg_store(&self) -> &Arc<LpgStore> {
155        self.store.as_ref().expect(
156            "no built-in LpgStore: this GrafeoDB was created with an external store \
157             (with_store / with_read_store). Use session() or graph_store() instead.",
158        )
159    }
160
161    /// Returns whether CDC is active (runtime check).
162    #[cfg(feature = "cdc")]
163    #[inline]
164    pub(super) fn cdc_active(&self) -> bool {
165        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
166    }
167
168    /// Creates an in-memory database, fast to create, gone when dropped.
169    ///
170    /// Use this for tests, experiments, or when you don't need persistence.
171    /// For data that survives restarts, use [`open()`](Self::open) instead.
172    ///
173    /// # Panics
174    ///
175    /// Panics if the internal arena allocator cannot be initialized (out of memory).
176    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
177    ///
178    /// # Examples
179    ///
180    /// ```
181    /// use grafeo_engine::GrafeoDB;
182    ///
183    /// let db = GrafeoDB::new_in_memory();
184    /// let session = db.session();
185    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
186    /// # Ok::<(), grafeo_common::utils::error::Error>(())
187    /// ```
188    #[must_use]
189    pub fn new_in_memory() -> Self {
190        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
191    }
192
193    /// Opens a database at the given path, creating it if it doesn't exist.
194    ///
195    /// If you've used this path before, Grafeo recovers your data from the
196    /// write-ahead log automatically. First open on a new path creates an
197    /// empty database.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the path isn't writable or recovery fails.
202    ///
203    /// # Examples
204    ///
205    /// ```no_run
206    /// use grafeo_engine::GrafeoDB;
207    ///
208    /// let db = GrafeoDB::open("./my_social_network")?;
209    /// # Ok::<(), grafeo_common::utils::error::Error>(())
210    /// ```
211    #[cfg(feature = "wal")]
212    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
213        Self::with_config(Config::persistent(path.as_ref()))
214    }
215
216    /// Opens an existing database in read-only mode.
217    ///
218    /// Uses a shared file lock, so multiple processes can read the same
219    /// `.grafeo` file concurrently. The database loads the last checkpoint
220    /// snapshot but does **not** replay the WAL or allow mutations.
221    ///
222    /// Currently only supports the single-file (`.grafeo`) format.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if the file doesn't exist or can't be read.
227    ///
228    /// # Examples
229    ///
230    /// ```no_run
231    /// use grafeo_engine::GrafeoDB;
232    ///
233    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
234    /// let session = db.session();
235    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
236    /// // Mutations will return an error:
237    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
238    /// # Ok::<(), grafeo_common::utils::error::Error>(())
239    /// ```
240    #[cfg(feature = "grafeo-file")]
241    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
242        Self::with_config(Config::read_only(path.as_ref()))
243    }
244
245    /// Creates a database with custom configuration.
246    ///
247    /// Use this when you need fine-grained control over memory limits,
248    /// thread counts, or persistence settings. For most cases,
249    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
250    /// are simpler.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the database can't be created or recovery fails.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use grafeo_engine::{GrafeoDB, Config};
260    ///
261    /// // In-memory with a 512MB limit
262    /// let config = Config::in_memory()
263    ///     .with_memory_limit(512 * 1024 * 1024);
264    ///
265    /// let db = GrafeoDB::with_config(config)?;
266    /// # Ok::<(), grafeo_common::utils::error::Error>(())
267    /// ```
268    pub fn with_config(config: Config) -> Result<Self> {
269        // Validate configuration before proceeding
270        config
271            .validate()
272            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
273
274        let store = Arc::new(LpgStore::new()?);
275        #[cfg(feature = "rdf")]
276        let rdf_store = Arc::new(RdfStore::new());
277        let transaction_manager = Arc::new(TransactionManager::new());
278
279        // Create buffer manager with configured limits
280        let buffer_config = BufferManagerConfig {
281            budget: config.memory_limit.unwrap_or_else(|| {
282                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
283            }),
284            spill_path: config
285                .spill_path
286                .clone()
287                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
288            ..BufferManagerConfig::default()
289        };
290        let buffer_manager = BufferManager::new(buffer_config);
291
292        // Create catalog early so WAL replay can restore schema definitions
293        let catalog = Arc::new(Catalog::new());
294
295        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
296
297        // --- Single-file format (.grafeo) ---
298        #[cfg(feature = "grafeo-file")]
299        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
300            // Read-only mode: open with shared lock, load snapshot, skip WAL
301            if let Some(ref db_path) = config.path {
302                if db_path.exists() && db_path.is_file() {
303                    let fm = GrafeoFileManager::open_read_only(db_path)?;
304                    let snapshot_data = fm.read_snapshot()?;
305                    if !snapshot_data.is_empty() {
306                        Self::apply_snapshot_data(
307                            &store,
308                            &catalog,
309                            #[cfg(feature = "rdf")]
310                            &rdf_store,
311                            &snapshot_data,
312                        )?;
313                    }
314                    Some(Arc::new(fm))
315                } else {
316                    return Err(grafeo_common::utils::error::Error::Internal(format!(
317                        "read-only open requires an existing .grafeo file: {}",
318                        db_path.display()
319                    )));
320                }
321            } else {
322                return Err(grafeo_common::utils::error::Error::Internal(
323                    "read-only mode requires a database path".to_string(),
324                ));
325            }
326        } else if let Some(ref db_path) = config.path {
327            // Initialize the file manager whenever single-file format is selected,
328            // regardless of whether WAL is enabled. Without this, a database opened
329            // with wal_enabled:false + StorageFormat::SingleFile would produce no
330            // output at all (the file manager was previously gated behind wal_enabled).
331            if Self::should_use_single_file(db_path, config.storage_format) {
332                let fm = if db_path.exists() && db_path.is_file() {
333                    GrafeoFileManager::open(db_path)?
334                } else if !db_path.exists() {
335                    GrafeoFileManager::create(db_path)?
336                } else {
337                    // Path exists but is not a file (directory, etc.)
338                    return Err(grafeo_common::utils::error::Error::Internal(format!(
339                        "path exists but is not a file: {}",
340                        db_path.display()
341                    )));
342                };
343
344                // Load snapshot data from the file
345                let snapshot_data = fm.read_snapshot()?;
346                if !snapshot_data.is_empty() {
347                    Self::apply_snapshot_data(
348                        &store,
349                        &catalog,
350                        #[cfg(feature = "rdf")]
351                        &rdf_store,
352                        &snapshot_data,
353                    )?;
354                }
355
356                // Recover sidecar WAL if WAL is enabled and a sidecar exists
357                #[cfg(feature = "wal")]
358                if config.wal_enabled && fm.has_sidecar_wal() {
359                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
360                    let records = recovery.recover()?;
361                    Self::apply_wal_records(
362                        &store,
363                        &catalog,
364                        #[cfg(feature = "rdf")]
365                        &rdf_store,
366                        &records,
367                    )?;
368                }
369
370                Some(Arc::new(fm))
371            } else {
372                None
373            }
374        } else {
375            None
376        };
377
378        // Determine whether to use the WAL directory path (legacy) or sidecar
379        // Read-only mode skips WAL entirely (no recovery, no creation).
380        #[cfg(feature = "wal")]
381        let wal = if is_read_only {
382            None
383        } else if config.wal_enabled {
384            if let Some(ref db_path) = config.path {
385                // When using single-file format, the WAL is a sidecar directory
386                #[cfg(feature = "grafeo-file")]
387                let wal_path = if let Some(ref fm) = file_manager {
388                    let p = fm.sidecar_wal_path();
389                    std::fs::create_dir_all(&p)?;
390                    p
391                } else {
392                    // Legacy: WAL inside the database directory
393                    std::fs::create_dir_all(db_path)?;
394                    db_path.join("wal")
395                };
396
397                #[cfg(not(feature = "grafeo-file"))]
398                let wal_path = {
399                    std::fs::create_dir_all(db_path)?;
400                    db_path.join("wal")
401                };
402
403                // For legacy WAL directory format, check if WAL exists and recover
404                #[cfg(feature = "grafeo-file")]
405                let is_single_file = file_manager.is_some();
406                #[cfg(not(feature = "grafeo-file"))]
407                let is_single_file = false;
408
409                if !is_single_file && wal_path.exists() {
410                    let recovery = WalRecovery::new(&wal_path);
411                    let records = recovery.recover()?;
412                    Self::apply_wal_records(
413                        &store,
414                        &catalog,
415                        #[cfg(feature = "rdf")]
416                        &rdf_store,
417                        &records,
418                    )?;
419                }
420
421                // Open/create WAL manager with configured durability
422                let wal_durability = match config.wal_durability {
423                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
424                    crate::config::DurabilityMode::Batch {
425                        max_delay_ms,
426                        max_records,
427                    } => WalDurabilityMode::Batch {
428                        max_delay_ms,
429                        max_records,
430                    },
431                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
432                        WalDurabilityMode::Adaptive { target_interval_ms }
433                    }
434                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
435                };
436                let wal_config = WalConfig {
437                    durability: wal_durability,
438                    ..WalConfig::default()
439                };
440                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
441                Some(Arc::new(wal_manager))
442            } else {
443                None
444            }
445        } else {
446            None
447        };
448
449        // Create query cache with default capacity (1000 queries)
450        let query_cache = Arc::new(QueryCache::default());
451
452        // After all snapshot/WAL recovery, sync TransactionManager epoch
453        // with the store so queries use the correct viewing epoch.
454        #[cfg(feature = "temporal")]
455        transaction_manager.sync_epoch(store.current_epoch());
456
457        #[cfg(feature = "cdc")]
458        let cdc_enabled_val = config.cdc_enabled;
459
460        Ok(Self {
461            config,
462            store: Some(store),
463            catalog,
464            #[cfg(feature = "rdf")]
465            rdf_store,
466            transaction_manager,
467            buffer_manager,
468            #[cfg(feature = "wal")]
469            wal,
470            #[cfg(feature = "wal")]
471            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
472            query_cache,
473            commit_counter: Arc::new(AtomicUsize::new(0)),
474            is_open: RwLock::new(true),
475            #[cfg(feature = "cdc")]
476            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
477            #[cfg(feature = "cdc")]
478            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
479            #[cfg(feature = "embed")]
480            embedding_models: RwLock::new(hashbrown::HashMap::new()),
481            #[cfg(feature = "grafeo-file")]
482            file_manager,
483            external_read_store: None,
484            external_write_store: None,
485            #[cfg(feature = "metrics")]
486            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
487            current_graph: RwLock::new(None),
488            current_schema: RwLock::new(None),
489            read_only: is_read_only,
490        })
491    }
492
493    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
494    ///
495    /// The external store handles all data persistence. WAL, CDC, and index
496    /// management are the responsibility of the store implementation.
497    ///
498    /// Query execution (all 6 languages, optimizer, planner) works through the
499    /// provided store. Admin operations (schema introspection, persistence,
500    /// vector/text indexes) are not available on external stores.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// use std::sync::Arc;
506    /// use grafeo_engine::{GrafeoDB, Config};
507    /// use grafeo_core::graph::GraphStoreMut;
508    ///
509    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
510    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
511    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
512    ///     Ok(())
513    /// }
514    /// ```
515    ///
516    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
517    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
518        config
519            .validate()
520            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
521
522        let transaction_manager = Arc::new(TransactionManager::new());
523
524        let buffer_config = BufferManagerConfig {
525            budget: config.memory_limit.unwrap_or_else(|| {
526                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
527            }),
528            spill_path: None,
529            ..BufferManagerConfig::default()
530        };
531        let buffer_manager = BufferManager::new(buffer_config);
532
533        let query_cache = Arc::new(QueryCache::default());
534
535        #[cfg(feature = "cdc")]
536        let cdc_enabled_val = config.cdc_enabled;
537
538        Ok(Self {
539            config,
540            store: None,
541            catalog: Arc::new(Catalog::new()),
542            #[cfg(feature = "rdf")]
543            rdf_store: Arc::new(RdfStore::new()),
544            transaction_manager,
545            buffer_manager,
546            #[cfg(feature = "wal")]
547            wal: None,
548            #[cfg(feature = "wal")]
549            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
550            query_cache,
551            commit_counter: Arc::new(AtomicUsize::new(0)),
552            is_open: RwLock::new(true),
553            #[cfg(feature = "cdc")]
554            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
555            #[cfg(feature = "cdc")]
556            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
557            #[cfg(feature = "embed")]
558            embedding_models: RwLock::new(hashbrown::HashMap::new()),
559            #[cfg(feature = "grafeo-file")]
560            file_manager: None,
561            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
562            external_write_store: Some(store),
563            #[cfg(feature = "metrics")]
564            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
565            current_graph: RwLock::new(None),
566            current_schema: RwLock::new(None),
567            read_only: false,
568        })
569    }
570
571    /// Creates a database backed by a read-only [`GraphStore`].
572    ///
573    /// The database is set to read-only mode. Write queries (CREATE, SET,
574    /// DELETE) will return `TransactionError::ReadOnly`.
575    ///
576    /// # Examples
577    ///
578    /// ```no_run
579    /// use std::sync::Arc;
580    /// use grafeo_engine::{GrafeoDB, Config};
581    /// use grafeo_core::graph::GraphStore;
582    ///
583    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
584    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
585    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
586    ///     Ok(())
587    /// }
588    /// ```
589    ///
590    /// [`GraphStore`]: grafeo_core::graph::GraphStore
591    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
592        config
593            .validate()
594            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
595
596        let transaction_manager = Arc::new(TransactionManager::new());
597
598        let buffer_config = BufferManagerConfig {
599            budget: config.memory_limit.unwrap_or_else(|| {
600                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
601            }),
602            spill_path: None,
603            ..BufferManagerConfig::default()
604        };
605        let buffer_manager = BufferManager::new(buffer_config);
606
607        let query_cache = Arc::new(QueryCache::default());
608
609        #[cfg(feature = "cdc")]
610        let cdc_enabled_val = config.cdc_enabled;
611
612        Ok(Self {
613            config,
614            store: None,
615            catalog: Arc::new(Catalog::new()),
616            #[cfg(feature = "rdf")]
617            rdf_store: Arc::new(RdfStore::new()),
618            transaction_manager,
619            buffer_manager,
620            #[cfg(feature = "wal")]
621            wal: None,
622            #[cfg(feature = "wal")]
623            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
624            query_cache,
625            commit_counter: Arc::new(AtomicUsize::new(0)),
626            is_open: RwLock::new(true),
627            #[cfg(feature = "cdc")]
628            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
629            #[cfg(feature = "cdc")]
630            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
631            #[cfg(feature = "embed")]
632            embedding_models: RwLock::new(hashbrown::HashMap::new()),
633            #[cfg(feature = "grafeo-file")]
634            file_manager: None,
635            external_read_store: Some(store),
636            external_write_store: None,
637            #[cfg(feature = "metrics")]
638            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
639            current_graph: RwLock::new(None),
640            current_schema: RwLock::new(None),
641            read_only: true,
642        })
643    }
644
645    /// Converts the database to a read-only [`CompactStore`] for faster queries.
646    ///
647    /// Takes a snapshot of all nodes and edges from the current store, builds
648    /// a columnar `CompactStore` with CSR adjacency, and switches the database
649    /// to read-only mode. The original store is dropped to free memory.
650    ///
651    /// After calling this, all write queries will fail with
652    /// `TransactionError::ReadOnly`. Read queries (across all supported
653    /// languages) continue to work and benefit from ~60x memory reduction
654    /// and 100x+ traversal speedup.
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the conversion fails (e.g. more than 32,767
659    /// distinct labels or edge types).
660    ///
661    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
662    #[cfg(feature = "compact-store")]
663    pub fn compact(&mut self) -> Result<()> {
664        use grafeo_core::graph::compact::from_graph_store;
665
666        let current_store = self.graph_store();
667        let compact = from_graph_store(current_store.as_ref())
668            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
669
670        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
671        self.external_write_store = None;
672        self.store = None;
673        self.read_only = true;
674        self.query_cache = Arc::new(QueryCache::default());
675
676        Ok(())
677    }
678
679    /// Applies WAL records to restore the database state.
680    ///
681    /// Data mutation records are routed through a graph cursor that tracks
682    /// `SwitchGraph` context markers, replaying mutations into the correct
683    /// named graph (or the default graph when cursor is `None`).
684    #[cfg(feature = "wal")]
685    fn apply_wal_records(
686        store: &Arc<LpgStore>,
687        catalog: &Catalog,
688        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
689        records: &[WalRecord],
690    ) -> Result<()> {
691        use crate::catalog::{
692            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
693        };
694        use grafeo_common::utils::error::Error;
695
696        // Graph cursor: tracks which named graph receives data mutations.
697        // `None` means the default graph.
698        let mut current_graph: Option<String> = None;
699        let mut target_store: Arc<LpgStore> = Arc::clone(store);
700
701        for record in records {
702            match record {
703                // --- Named graph lifecycle ---
704                WalRecord::CreateNamedGraph { name } => {
705                    let _ = store.create_graph(name);
706                }
707                WalRecord::DropNamedGraph { name } => {
708                    store.drop_graph(name);
709                    // Reset cursor if the dropped graph was active
710                    if current_graph.as_deref() == Some(name.as_str()) {
711                        current_graph = None;
712                        target_store = Arc::clone(store);
713                    }
714                }
715                WalRecord::SwitchGraph { name } => {
716                    current_graph.clone_from(name);
717                    target_store = match &current_graph {
718                        None => Arc::clone(store),
719                        Some(graph_name) => store
720                            .graph_or_create(graph_name)
721                            .map_err(|e| Error::Internal(e.to_string()))?,
722                    };
723                }
724
725                // --- Data mutations: routed through target_store ---
726                WalRecord::CreateNode { id, labels } => {
727                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
728                    target_store.create_node_with_id(*id, &label_refs)?;
729                }
730                WalRecord::DeleteNode { id } => {
731                    target_store.delete_node(*id);
732                }
733                WalRecord::CreateEdge {
734                    id,
735                    src,
736                    dst,
737                    edge_type,
738                } => {
739                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
740                }
741                WalRecord::DeleteEdge { id } => {
742                    target_store.delete_edge(*id);
743                }
744                WalRecord::SetNodeProperty { id, key, value } => {
745                    target_store.set_node_property(*id, key, value.clone());
746                }
747                WalRecord::SetEdgeProperty { id, key, value } => {
748                    target_store.set_edge_property(*id, key, value.clone());
749                }
750                WalRecord::AddNodeLabel { id, label } => {
751                    target_store.add_label(*id, label);
752                }
753                WalRecord::RemoveNodeLabel { id, label } => {
754                    target_store.remove_label(*id, label);
755                }
756                WalRecord::RemoveNodeProperty { id, key } => {
757                    target_store.remove_node_property(*id, key);
758                }
759                WalRecord::RemoveEdgeProperty { id, key } => {
760                    target_store.remove_edge_property(*id, key);
761                }
762
763                // --- Schema DDL replay (always on root catalog) ---
764                WalRecord::CreateNodeType {
765                    name,
766                    properties,
767                    constraints,
768                } => {
769                    let def = NodeTypeDefinition {
770                        name: name.clone(),
771                        properties: properties
772                            .iter()
773                            .map(|(n, t, nullable)| TypedProperty {
774                                name: n.clone(),
775                                data_type: PropertyDataType::from_type_name(t),
776                                nullable: *nullable,
777                                default_value: None,
778                            })
779                            .collect(),
780                        constraints: constraints
781                            .iter()
782                            .map(|(kind, props)| match kind.as_str() {
783                                "unique" => TypeConstraint::Unique(props.clone()),
784                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
785                                "not_null" if !props.is_empty() => {
786                                    TypeConstraint::NotNull(props[0].clone())
787                                }
788                                _ => TypeConstraint::Unique(props.clone()),
789                            })
790                            .collect(),
791                        parent_types: Vec::new(),
792                    };
793                    let _ = catalog.register_node_type(def);
794                }
795                WalRecord::DropNodeType { name } => {
796                    let _ = catalog.drop_node_type(name);
797                }
798                WalRecord::CreateEdgeType {
799                    name,
800                    properties,
801                    constraints,
802                } => {
803                    let def = EdgeTypeDefinition {
804                        name: name.clone(),
805                        properties: properties
806                            .iter()
807                            .map(|(n, t, nullable)| TypedProperty {
808                                name: n.clone(),
809                                data_type: PropertyDataType::from_type_name(t),
810                                nullable: *nullable,
811                                default_value: None,
812                            })
813                            .collect(),
814                        constraints: constraints
815                            .iter()
816                            .map(|(kind, props)| match kind.as_str() {
817                                "unique" => TypeConstraint::Unique(props.clone()),
818                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
819                                "not_null" if !props.is_empty() => {
820                                    TypeConstraint::NotNull(props[0].clone())
821                                }
822                                _ => TypeConstraint::Unique(props.clone()),
823                            })
824                            .collect(),
825                        source_node_types: Vec::new(),
826                        target_node_types: Vec::new(),
827                    };
828                    let _ = catalog.register_edge_type_def(def);
829                }
830                WalRecord::DropEdgeType { name } => {
831                    let _ = catalog.drop_edge_type_def(name);
832                }
833                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
834                    // Index recreation is handled by the store on startup
835                    // (indexes are rebuilt from data, not WAL)
836                }
837                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
838                    // Constraint definitions are part of type definitions
839                    // and replayed via CreateNodeType/CreateEdgeType
840                }
841                WalRecord::CreateGraphType {
842                    name,
843                    node_types,
844                    edge_types,
845                    open,
846                } => {
847                    use crate::catalog::GraphTypeDefinition;
848                    let def = GraphTypeDefinition {
849                        name: name.clone(),
850                        allowed_node_types: node_types.clone(),
851                        allowed_edge_types: edge_types.clone(),
852                        open: *open,
853                    };
854                    let _ = catalog.register_graph_type(def);
855                }
856                WalRecord::DropGraphType { name } => {
857                    let _ = catalog.drop_graph_type(name);
858                }
859                WalRecord::CreateSchema { name } => {
860                    let _ = catalog.register_schema_namespace(name.clone());
861                }
862                WalRecord::DropSchema { name } => {
863                    let _ = catalog.drop_schema_namespace(name);
864                }
865
866                WalRecord::AlterNodeType { name, alterations } => {
867                    for (action, prop_name, type_name, nullable) in alterations {
868                        match action.as_str() {
869                            "add" => {
870                                let prop = TypedProperty {
871                                    name: prop_name.clone(),
872                                    data_type: PropertyDataType::from_type_name(type_name),
873                                    nullable: *nullable,
874                                    default_value: None,
875                                };
876                                let _ = catalog.alter_node_type_add_property(name, prop);
877                            }
878                            "drop" => {
879                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
880                            }
881                            _ => {}
882                        }
883                    }
884                }
885                WalRecord::AlterEdgeType { name, alterations } => {
886                    for (action, prop_name, type_name, nullable) in alterations {
887                        match action.as_str() {
888                            "add" => {
889                                let prop = TypedProperty {
890                                    name: prop_name.clone(),
891                                    data_type: PropertyDataType::from_type_name(type_name),
892                                    nullable: *nullable,
893                                    default_value: None,
894                                };
895                                let _ = catalog.alter_edge_type_add_property(name, prop);
896                            }
897                            "drop" => {
898                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
899                            }
900                            _ => {}
901                        }
902                    }
903                }
904                WalRecord::AlterGraphType { name, alterations } => {
905                    for (action, type_name) in alterations {
906                        match action.as_str() {
907                            "add_node" => {
908                                let _ =
909                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
910                            }
911                            "drop_node" => {
912                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
913                            }
914                            "add_edge" => {
915                                let _ =
916                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
917                            }
918                            "drop_edge" => {
919                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
920                            }
921                            _ => {}
922                        }
923                    }
924                }
925
926                WalRecord::CreateProcedure {
927                    name,
928                    params,
929                    returns,
930                    body,
931                } => {
932                    use crate::catalog::ProcedureDefinition;
933                    let def = ProcedureDefinition {
934                        name: name.clone(),
935                        params: params.clone(),
936                        returns: returns.clone(),
937                        body: body.clone(),
938                    };
939                    let _ = catalog.register_procedure(def);
940                }
941                WalRecord::DropProcedure { name } => {
942                    let _ = catalog.drop_procedure(name);
943                }
944
945                // --- RDF triple replay ---
946                #[cfg(feature = "rdf")]
947                WalRecord::InsertRdfTriple { .. }
948                | WalRecord::DeleteRdfTriple { .. }
949                | WalRecord::ClearRdfGraph { .. }
950                | WalRecord::CreateRdfGraph { .. }
951                | WalRecord::DropRdfGraph { .. } => {
952                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
953                }
954                #[cfg(not(feature = "rdf"))]
955                WalRecord::InsertRdfTriple { .. }
956                | WalRecord::DeleteRdfTriple { .. }
957                | WalRecord::ClearRdfGraph { .. }
958                | WalRecord::CreateRdfGraph { .. }
959                | WalRecord::DropRdfGraph { .. } => {}
960
961                WalRecord::TransactionCommit { .. } => {
962                    // In temporal mode, advance the store epoch on each committed
963                    // transaction so that subsequent property/label operations
964                    // are recorded at the correct epoch in their VersionLogs.
965                    #[cfg(feature = "temporal")]
966                    {
967                        target_store.new_epoch();
968                    }
969                }
970                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
971                    // Transaction control records don't need replay action
972                    // (recovery already filtered to only committed transactions)
973                }
974            }
975        }
976        Ok(())
977    }
978
979    // =========================================================================
980    // Single-file format helpers
981    // =========================================================================
982
983    /// Returns `true` if the given path should use single-file format.
984    #[cfg(feature = "grafeo-file")]
985    fn should_use_single_file(
986        path: &std::path::Path,
987        configured: crate::config::StorageFormat,
988    ) -> bool {
989        use crate::config::StorageFormat;
990        match configured {
991            StorageFormat::SingleFile => true,
992            StorageFormat::WalDirectory => false,
993            StorageFormat::Auto => {
994                // Existing file: check magic bytes
995                if path.is_file() {
996                    if let Ok(mut f) = std::fs::File::open(path) {
997                        use std::io::Read;
998                        let mut magic = [0u8; 4];
999                        if f.read_exact(&mut magic).is_ok()
1000                            && magic == grafeo_adapters::storage::file::MAGIC
1001                        {
1002                            return true;
1003                        }
1004                    }
1005                    return false;
1006                }
1007                // Existing directory: legacy format
1008                if path.is_dir() {
1009                    return false;
1010                }
1011                // New path: check extension
1012                path.extension().is_some_and(|ext| ext == "grafeo")
1013            }
1014        }
1015    }
1016
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// Thin forwarder to `persistence::load_snapshot_into_store`. When the `rdf`
    /// feature is enabled, the RDF store is passed through as well, so the same
    /// snapshot bytes can restore it.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the snapshot loader returns.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            // The RDF store argument only exists when the `rdf` feature is compiled in.
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
1033
1034    // =========================================================================
1035    // Session & Configuration
1036    // =========================================================================
1037
1038    /// Opens a new session for running queries.
1039    ///
1040    /// Sessions are cheap to create: spin up as many as you need. Each
1041    /// gets its own transaction context, so concurrent sessions won't
1042    /// block each other on reads.
1043    ///
1044    /// # Panics
1045    ///
1046    /// Panics if the database was configured with an external graph store and
1047    /// the internal arena allocator cannot be initialized (out of memory).
1048    ///
1049    /// # Examples
1050    ///
1051    /// ```
1052    /// use grafeo_engine::GrafeoDB;
1053    ///
1054    /// let db = GrafeoDB::new_in_memory();
1055    /// let session = db.session();
1056    ///
1057    /// // Run queries through the session
1058    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1059    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1060    /// ```
1061    #[must_use]
1062    pub fn session(&self) -> Session {
1063        self.create_session_inner(None)
1064    }
1065
1066    /// Creates a session with an explicit CDC override.
1067    ///
1068    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1069    /// regardless of the database default. When `false`, mutations are not
1070    /// tracked regardless of the database default.
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```
1075    /// use grafeo_engine::GrafeoDB;
1076    ///
1077    /// let db = GrafeoDB::new_in_memory();
1078    ///
1079    /// // Opt in to CDC for just this session
1080    /// let tracked = db.session_with_cdc(true);
1081    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1082    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1083    /// ```
1084    #[cfg(feature = "cdc")]
1085    #[must_use]
1086    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1087        self.create_session_inner(Some(cdc_enabled))
1088    }
1089
1090    /// Creates a read-only session regardless of the database's access mode.
1091    ///
1092    /// Mutations executed through this session will fail with
1093    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1094    /// the database itself must remain writable (for applying CDC changes)
1095    /// but client-facing queries must be read-only.
1096    #[must_use]
1097    pub fn session_read_only(&self) -> Session {
1098        self.create_session_inner_opts(None, true)
1099    }
1100
1101    /// Shared session creation logic.
1102    ///
1103    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1104    /// `Some`. `None` falls back to the database default.
1105    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1106    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1107        self.create_session_inner_opts(cdc_override, false)
1108    }
1109
1110    /// Shared session creation with all overrides.
1111    #[allow(unused_variables)]
1112    fn create_session_inner_opts(
1113        &self,
1114        cdc_override: Option<bool>,
1115        force_read_only: bool,
1116    ) -> Session {
1117        let session_cfg = || crate::session::SessionConfig {
1118            transaction_manager: Arc::clone(&self.transaction_manager),
1119            query_cache: Arc::clone(&self.query_cache),
1120            catalog: Arc::clone(&self.catalog),
1121            adaptive_config: self.config.adaptive.clone(),
1122            factorized_execution: self.config.factorized_execution,
1123            graph_model: self.config.graph_model,
1124            query_timeout: self.config.query_timeout,
1125            commit_counter: Arc::clone(&self.commit_counter),
1126            gc_interval: self.config.gc_interval,
1127            read_only: self.read_only || force_read_only,
1128        };
1129
1130        if let Some(ref ext_read) = self.external_read_store {
1131            return Session::with_external_store(
1132                Arc::clone(ext_read),
1133                self.external_write_store.as_ref().map(Arc::clone),
1134                session_cfg(),
1135            )
1136            .expect("arena allocation for external store session");
1137        }
1138
1139        #[cfg(feature = "rdf")]
1140        let mut session = Session::with_rdf_store_and_adaptive(
1141            Arc::clone(self.lpg_store()),
1142            Arc::clone(&self.rdf_store),
1143            session_cfg(),
1144        );
1145        #[cfg(not(feature = "rdf"))]
1146        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1147
1148        #[cfg(feature = "wal")]
1149        if let Some(ref wal) = self.wal {
1150            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1151        }
1152
1153        #[cfg(feature = "cdc")]
1154        {
1155            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1156            if should_enable {
1157                session.set_cdc_log(Arc::clone(&self.cdc_log));
1158            }
1159        }
1160
1161        #[cfg(feature = "metrics")]
1162        {
1163            if let Some(ref m) = self.metrics {
1164                session.set_metrics(Arc::clone(m));
1165                m.session_created
1166                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1167                m.session_active
1168                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1169            }
1170        }
1171
1172        // Propagate persistent graph context to the new session
1173        if let Some(ref graph) = *self.current_graph.read() {
1174            session.use_graph(graph);
1175        }
1176
1177        // Propagate persistent schema context to the new session
1178        if let Some(ref schema) = *self.current_schema.read() {
1179            session.set_schema(schema);
1180        }
1181
1182        // Suppress unused_mut when cdc/wal are disabled
1183        let _ = &mut session;
1184
1185        session
1186    }
1187
1188    /// Returns the current graph name, if any.
1189    ///
1190    /// This is the persistent graph context used by one-shot `execute()` calls.
1191    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1192    /// or `SESSION RESET`.
1193    #[must_use]
1194    pub fn current_graph(&self) -> Option<String> {
1195        self.current_graph.read().clone()
1196    }
1197
1198    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1199    ///
1200    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1201    /// Pass `None` to reset to the default graph.
1202    pub fn set_current_graph(&self, name: Option<&str>) {
1203        *self.current_graph.write() = name.map(ToString::to_string);
1204    }
1205
1206    /// Returns the current schema name, if any.
1207    ///
1208    /// This is the persistent schema context used by one-shot `execute()` calls.
1209    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1210    #[must_use]
1211    pub fn current_schema(&self) -> Option<String> {
1212        self.current_schema.read().clone()
1213    }
1214
1215    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1216    ///
1217    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1218    /// a session. Pass `None` to clear the schema context.
1219    pub fn set_current_schema(&self, name: Option<&str>) {
1220        *self.current_schema.write() = name.map(ToString::to_string);
1221    }
1222
1223    /// Returns the adaptive execution configuration.
1224    #[must_use]
1225    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1226        &self.config.adaptive
1227    }
1228
1229    /// Returns `true` if this database was opened in read-only mode.
1230    #[must_use]
1231    pub fn is_read_only(&self) -> bool {
1232        self.read_only
1233    }
1234
1235    /// Returns the configuration.
1236    #[must_use]
1237    pub fn config(&self) -> &Config {
1238        &self.config
1239    }
1240
1241    /// Returns the graph data model of this database.
1242    #[must_use]
1243    pub fn graph_model(&self) -> crate::config::GraphModel {
1244        self.config.graph_model
1245    }
1246
1247    /// Returns the configured memory limit in bytes, if any.
1248    #[must_use]
1249    pub fn memory_limit(&self) -> Option<usize> {
1250        self.config.memory_limit
1251    }
1252
1253    /// Returns a point-in-time snapshot of all metrics.
1254    ///
1255    /// If the `metrics` feature is disabled or the registry is not
1256    /// initialized, returns a default (all-zero) snapshot.
1257    #[cfg(feature = "metrics")]
1258    #[must_use]
1259    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1260        let mut snapshot = self
1261            .metrics
1262            .as_ref()
1263            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1264
1265        // Augment with cache stats from the query cache (not tracked in the registry)
1266        let cache_stats = self.query_cache.stats();
1267        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1268        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1269        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1270        snapshot.cache_invalidations = cache_stats.invalidations;
1271
1272        snapshot
1273    }
1274
1275    /// Returns all metrics in Prometheus text exposition format.
1276    ///
1277    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1278    #[cfg(feature = "metrics")]
1279    #[must_use]
1280    pub fn metrics_prometheus(&self) -> String {
1281        self.metrics
1282            .as_ref()
1283            .map_or_else(String::new, |m| m.to_prometheus())
1284    }
1285
1286    /// Resets all metrics counters and histograms to zero.
1287    #[cfg(feature = "metrics")]
1288    pub fn reset_metrics(&self) {
1289        if let Some(ref m) = self.metrics {
1290            m.reset();
1291        }
1292        self.query_cache.reset_stats();
1293    }
1294
1295    /// Returns the underlying (default) store.
1296    ///
1297    /// This provides direct access to the LPG store for algorithm implementations
1298    /// and admin operations (index management, schema introspection, MVCC internals).
1299    ///
1300    /// For code that only needs read/write graph operations, prefer
1301    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1302    #[must_use]
1303    pub fn store(&self) -> &Arc<LpgStore> {
1304        self.lpg_store()
1305    }
1306
1307    /// Returns the LPG store for the currently active graph.
1308    ///
1309    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1310    /// the default store. Otherwise looks up the named graph in the root store.
1311    /// Falls back to the default store if the named graph does not exist.
1312    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1313    fn active_store(&self) -> Arc<LpgStore> {
1314        let store = self.lpg_store();
1315        let graph_name = self.current_graph.read().clone();
1316        match graph_name {
1317            None => Arc::clone(store),
1318            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1319            Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1320        }
1321    }
1322
1323    // === Named Graph Management ===
1324
1325    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1326    ///
1327    /// # Errors
1328    ///
1329    /// Returns an error if arena allocation fails.
1330    pub fn create_graph(&self, name: &str) -> Result<bool> {
1331        Ok(self.lpg_store().create_graph(name)?)
1332    }
1333
1334    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1335    pub fn drop_graph(&self, name: &str) -> bool {
1336        self.lpg_store().drop_graph(name)
1337    }
1338
1339    /// Returns all named graph names.
1340    #[must_use]
1341    pub fn list_graphs(&self) -> Vec<String> {
1342        self.lpg_store().graph_names()
1343    }
1344
1345    /// Returns the graph store as a trait object.
1346    ///
1347    /// Returns a read-only trait object for the active graph store.
1348    ///
1349    /// This provides the [`GraphStore`] interface for code that only needs
1350    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1351    ///
1352    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1353    #[must_use]
1354    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1355        if let Some(ref ext_read) = self.external_read_store {
1356            Arc::clone(ext_read)
1357        } else {
1358            Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1359        }
1360    }
1361
1362    /// Returns the writable graph store, if available.
1363    ///
1364    /// Returns `None` for read-only databases created via
1365    /// [`with_read_store()`](Self::with_read_store).
1366    #[must_use]
1367    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1368        if self.external_read_store.is_some() {
1369            self.external_write_store.as_ref().map(Arc::clone)
1370        } else {
1371            Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1372        }
1373    }
1374
1375    /// Garbage collects old MVCC versions that are no longer visible.
1376    ///
1377    /// Determines the minimum epoch required by active transactions and prunes
1378    /// version chains older than that threshold. Also cleans up completed
1379    /// transaction metadata in the transaction manager.
1380    pub fn gc(&self) {
1381        let min_epoch = self.transaction_manager.min_active_epoch();
1382        self.lpg_store().gc_versions(min_epoch);
1383        self.transaction_manager.gc();
1384    }
1385
1386    /// Returns the buffer manager for memory-aware operations.
1387    #[must_use]
1388    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1389        &self.buffer_manager
1390    }
1391
1392    /// Returns the query cache.
1393    #[must_use]
1394    pub fn query_cache(&self) -> &Arc<QueryCache> {
1395        &self.query_cache
1396    }
1397
1398    /// Clears all cached query plans.
1399    ///
1400    /// This is called automatically after DDL operations, but can also be
1401    /// invoked manually after external schema changes (e.g., WAL replay,
1402    /// import) or when you want to force re-optimization of all queries.
1403    pub fn clear_plan_cache(&self) {
1404        self.query_cache.clear();
1405    }
1406
1407    // =========================================================================
1408    // Lifecycle
1409    // =========================================================================
1410
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Behavior by mode:
    /// - Already closed: returns `Ok(())` immediately (repeat calls are no-ops).
    /// - Read-only: closes the file manager, if any. No checkpoint or WAL commit
    ///   is written.
    /// - Single-file (`.grafeo`): syncs the WAL, writes a snapshot checkpoint,
    ///   closes the active WAL log, removes the sidecar WAL, then closes the file.
    /// - WAL directory: appends a `TransactionCommit` record and syncs the WAL.
    ///
    /// The database is marked closed only after every step succeeds. If a step
    /// fails, it stays open and a later call starts over.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
    /// It also errors if logging the commit record, writing the checkpoint,
    /// removing the sidecar WAL, or closing the file manager fails.
    pub fn close(&self) -> Result<()> {
        // The write guard is held for the whole call, so concurrent close() calls are serialized.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to .grafeo file, then clean up sidecar WAL.
        // We must do this BEFORE the WAL close path because checkpoint_to_file
        // removes the sidecar WAL directory.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            // Crash-injection point for tests: snapshot written, sidecar WAL still present.
            {
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record
            // NOTE(review): the fallback `begin()` starts a transaction only to label this
            // record; it is not committed or aborted in the transaction manager here.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1492
1493    /// Returns the typed WAL if available.
1494    #[cfg(feature = "wal")]
1495    #[must_use]
1496    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1497        self.wal.as_ref()
1498    }
1499
1500    /// Logs a WAL record if WAL is enabled.
1501    #[cfg(feature = "wal")]
1502    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1503        if let Some(ref wal) = self.wal {
1504            wal.log(record)?;
1505        }
1506        Ok(())
1507    }
1508
1509    /// Writes the current database snapshot to the `.grafeo` file.
1510    ///
1511    /// Does NOT remove the sidecar WAL: callers that want to clean up
1512    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1513    /// separately after this returns.
1514    #[cfg(feature = "grafeo-file")]
1515    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1516        use grafeo_core::testing::crash::maybe_crash;
1517
1518        maybe_crash("checkpoint_to_file:before_export");
1519        let snapshot_data = self.export_snapshot()?;
1520        maybe_crash("checkpoint_to_file:after_export");
1521
1522        let epoch = self.lpg_store().current_epoch();
1523        let transaction_id = self
1524            .transaction_manager
1525            .last_assigned_transaction_id()
1526            .map_or(0, |t| t.0);
1527        let node_count = self.lpg_store().node_count() as u64;
1528        let edge_count = self.lpg_store().edge_count() as u64;
1529
1530        fm.write_snapshot(
1531            &snapshot_data,
1532            epoch.0,
1533            transaction_id,
1534            node_count,
1535            edge_count,
1536        )?;
1537
1538        maybe_crash("checkpoint_to_file:after_write_snapshot");
1539        Ok(())
1540    }
1541
1542    /// Returns the file manager if using single-file format.
1543    #[cfg(feature = "grafeo-file")]
1544    #[must_use]
1545    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1546        self.file_manager.as_ref()
1547    }
1548}
1549
1550impl Drop for GrafeoDB {
1551    fn drop(&mut self) {
1552        if let Err(e) = self.close() {
1553            grafeo_error!("Error closing database: {}", e);
1554        }
1555    }
1556}
1557
1558impl crate::admin::AdminService for GrafeoDB {
1559    fn info(&self) -> crate::admin::DatabaseInfo {
1560        self.info()
1561    }
1562
1563    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1564        self.detailed_stats()
1565    }
1566
1567    fn schema(&self) -> crate::admin::SchemaInfo {
1568        self.schema()
1569    }
1570
1571    fn validate(&self) -> crate::admin::ValidationResult {
1572        self.validate()
1573    }
1574
1575    fn wal_status(&self) -> crate::admin::WalStatus {
1576        self.wal_status()
1577    }
1578
1579    fn wal_checkpoint(&self) -> Result<()> {
1580        self.wal_checkpoint()
1581    }
1582}
1583
1584// =========================================================================
1585// Query Result Types
1586// =========================================================================
1587
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
/// The [`Display`](std::fmt::Display) implementation renders the result as a
/// text table.
///
/// All fields are public. Rows are expected to hold one value per entry in
/// `columns`, but this type does not enforce that.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types, one per entry in `columns` - useful for distinguishing
    /// NodeId/EdgeId from plain integers.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows; each row is a list of values in column order.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1630
1631impl QueryResult {
1632    /// Creates a fully empty query result (no columns, no rows).
1633    #[must_use]
1634    pub fn empty() -> Self {
1635        Self {
1636            columns: Vec::new(),
1637            column_types: Vec::new(),
1638            rows: Vec::new(),
1639            execution_time_ms: None,
1640            rows_scanned: None,
1641            status_message: None,
1642            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1643        }
1644    }
1645
1646    /// Creates a query result with only a status message (for DDL commands).
1647    #[must_use]
1648    pub fn status(msg: impl Into<String>) -> Self {
1649        Self {
1650            columns: Vec::new(),
1651            column_types: Vec::new(),
1652            rows: Vec::new(),
1653            execution_time_ms: None,
1654            rows_scanned: None,
1655            status_message: Some(msg.into()),
1656            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1657        }
1658    }
1659
1660    /// Creates a new empty query result.
1661    #[must_use]
1662    pub fn new(columns: Vec<String>) -> Self {
1663        let len = columns.len();
1664        Self {
1665            columns,
1666            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1667            rows: Vec::new(),
1668            execution_time_ms: None,
1669            rows_scanned: None,
1670            status_message: None,
1671            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1672        }
1673    }
1674
1675    /// Creates a new empty query result with column types.
1676    #[must_use]
1677    pub fn with_types(
1678        columns: Vec<String>,
1679        column_types: Vec<grafeo_common::types::LogicalType>,
1680    ) -> Self {
1681        Self {
1682            columns,
1683            column_types,
1684            rows: Vec::new(),
1685            execution_time_ms: None,
1686            rows_scanned: None,
1687            status_message: None,
1688            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1689        }
1690    }
1691
1692    /// Sets the execution metrics on this result.
1693    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1694        self.execution_time_ms = Some(execution_time_ms);
1695        self.rows_scanned = Some(rows_scanned);
1696        self
1697    }
1698
1699    /// Returns the execution time in milliseconds, if available.
1700    #[must_use]
1701    pub fn execution_time_ms(&self) -> Option<f64> {
1702        self.execution_time_ms
1703    }
1704
1705    /// Returns the number of rows scanned, if available.
1706    #[must_use]
1707    pub fn rows_scanned(&self) -> Option<u64> {
1708        self.rows_scanned
1709    }
1710
1711    /// Returns the number of rows.
1712    #[must_use]
1713    pub fn row_count(&self) -> usize {
1714        self.rows.len()
1715    }
1716
1717    /// Returns the number of columns.
1718    #[must_use]
1719    pub fn column_count(&self) -> usize {
1720        self.columns.len()
1721    }
1722
1723    /// Returns true if the result is empty.
1724    #[must_use]
1725    pub fn is_empty(&self) -> bool {
1726        self.rows.is_empty()
1727    }
1728
1729    /// Extracts a single value from the result.
1730    ///
1731    /// Use this when your query returns exactly one row with one column,
1732    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1733    ///
1734    /// # Errors
1735    ///
1736    /// Returns an error if the result has multiple rows or columns.
1737    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1738        if self.rows.len() != 1 || self.columns.len() != 1 {
1739            return Err(grafeo_common::utils::error::Error::InvalidValue(
1740                "Expected single value".to_string(),
1741            ));
1742        }
1743        T::from_value(&self.rows[0][0])
1744    }
1745
1746    /// Returns an iterator over the rows.
1747    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1748        self.rows.iter()
1749    }
1750}
1751
1752impl std::fmt::Display for QueryResult {
1753    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1754        let table = grafeo_common::fmt::format_result_table(
1755            &self.columns,
1756            &self.rows,
1757            self.execution_time_ms,
1758            self.status_message.as_deref(),
1759        );
1760        f.write_str(&table)
1761    }
1762}
1763
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// Each provided implementation delegates to the matching `Value` accessor
/// (`as_int64`, `as_float64`, `as_str`, `as_bool`). When the accessor returns
/// `None`, the implementation returns `Error::TypeMismatch`.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// # Errors
    ///
    /// Returns an error when `value` cannot be represented as `Self`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1772
1773impl FromValue for i64 {
1774    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1775        value
1776            .as_int64()
1777            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1778                expected: "INT64".to_string(),
1779                found: value.type_name().to_string(),
1780            })
1781    }
1782}
1783
1784impl FromValue for f64 {
1785    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1786        value
1787            .as_float64()
1788            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1789                expected: "FLOAT64".to_string(),
1790                found: value.type_name().to_string(),
1791            })
1792    }
1793}
1794
1795impl FromValue for String {
1796    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1797        value.as_str().map(String::from).ok_or_else(|| {
1798            grafeo_common::utils::error::Error::TypeMismatch {
1799                expected: "STRING".to_string(),
1800                found: value.type_name().to_string(),
1801            }
1802        })
1803    }
1804}
1805
1806impl FromValue for bool {
1807    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1808        value
1809            .as_bool()
1810            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1811                expected: "BOOL".to_string(),
1812                found: value.type_name().to_string(),
1813            })
1814    }
1815}
1816
#[cfg(test)]
mod tests {
    // Unit tests for `GrafeoDB`: construction and config, WAL-backed
    // persistence and recovery (behind the `wal` feature), `with_store`
    // external backends, query-result metrics, and CDC history (behind the
    // `cdc` feature).
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist (IDs follow creation order: alix = 0, gus = 1)
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records. The check is skipped silently if the
        // database was opened without a WAL.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify labels were recovered correctly for both nodes
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a->b edge explicitly, then the middle node b. The b->c
            // edge (_e2) is not deleted explicitly, and how delete_node treats it
            // is not asserted here.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_with_store_external_backend() {
        use grafeo_core::graph::lpg::LpgStore;

        let external = Arc::new(LpgStore::new().unwrap());

        // Seed data on the external store directly
        let n1 = external.create_node(&["Person"]);
        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));

        let db = GrafeoDB::with_store(
            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
            Config::in_memory(),
        )
        .unwrap();

        // Only used below when the `gql` feature is enabled.
        let session = db.session();

        // Session should see data from the external store via execute
        #[cfg(feature = "gql")]
        {
            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
            assert_eq!(result.rows.len(), 1);
        }
    }

    #[test]
    fn test_with_config_custom_memory_limit() {
        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
        assert_eq!(db.node_count(), 0);
    }

    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        // Perform some operations
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Check that metrics snapshot returns data
        let snap = db.metrics();
        // Direct CRUD calls like create_node are not query executions, so the
        // query counter is still zero.
        assert_eq!(snap.query_count, 0); // No queries executed yet
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Helper: creates an in-memory database with CDC enabled.
        fn cdc_db() -> GrafeoDB {
            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
        }

        #[test]
        fn test_node_lifecycle_history() {
            let db = cdc_db();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = cdc_db();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = cdc_db();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured in the single Create event
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = cdc_db();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }

        #[test]
        fn test_cdc_disabled_by_default() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());

            let history = db.history(id).unwrap();
            assert!(history.is_empty(), "CDC off by default: no events recorded");
        }

        #[test]
        fn test_session_with_cdc_override_on() {
            // Database default is off, but session opts in
            // NOTE(review): this runs a GQL INSERT without a `gql` feature gate -
            // confirm that is intended for every feature combination.
            let db = GrafeoDB::new_in_memory();
            let session = db.session_with_cdc(true);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            // The CDC log should have events from the opted-in session
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                !changes.is_empty(),
                "session_with_cdc(true) should record events"
            );
        }

        #[test]
        fn test_session_with_cdc_override_off() {
            // Database default is on, but session opts out
            let db = cdc_db();
            let session = db.session_with_cdc(false);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                changes.is_empty(),
                "session_with_cdc(false) should not record events"
            );
        }

        #[test]
        fn test_set_cdc_enabled_runtime() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            // Enable at runtime
            db.set_cdc_enabled(true);
            assert!(db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

            // Disable again
            db.set_cdc_enabled(false);
            let id2 = db.create_node(&["Person"]);
            let history2 = db.history(id2).unwrap();
            assert!(
                history2.is_empty(),
                "CDC disabled at runtime stops recording"
            );
        }
    }

    // NOTE(review): the `with_store` tests below call `execute` with GQL
    // syntax but, unlike the metrics tests above, are not gated on the `gql`
    // feature - confirm they are meant to run in every feature combination.
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Use an explicit transaction so INSERT and MATCH share the same
        // transaction context. With PENDING epochs, uncommitted versions are
        // only visible to the owning transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}