Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15#[cfg(feature = "async-storage")]
16mod async_ops;
17#[cfg(feature = "async-storage")]
18pub(crate) mod async_wal_store;
19#[cfg(feature = "cdc")]
20pub(crate) mod cdc_store;
21mod crud;
22#[cfg(feature = "embed")]
23mod embed;
24mod import;
25mod index;
26mod persistence;
27mod query;
28#[cfg(feature = "rdf")]
29mod rdf_ops;
30mod search;
31#[cfg(feature = "wal")]
32pub(crate) mod wal_store;
33
34use grafeo_common::grafeo_error;
35#[cfg(feature = "wal")]
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::AtomicUsize;
39
40use parking_lot::RwLock;
41
42#[cfg(feature = "grafeo-file")]
43use grafeo_adapters::storage::file::GrafeoFileManager;
44#[cfg(feature = "wal")]
45use grafeo_adapters::storage::wal::{
46    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
47};
48use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
49use grafeo_common::utils::error::Result;
50use grafeo_core::graph::lpg::LpgStore;
51#[cfg(feature = "rdf")]
52use grafeo_core::graph::rdf::RdfStore;
53use grafeo_core::graph::{GraphStore, GraphStoreMut};
54
55use crate::catalog::Catalog;
56use crate::config::Config;
57use crate::query::cache::QueryCache;
58use crate::session::Session;
59use crate::transaction::TransactionManager;
60
61/// Your handle to a Grafeo database.
62///
63/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
64/// quick experiments, or [`open()`](Self::open) for persistent storage.
65/// Then grab a [`session()`](Self::session) to start querying.
66///
67/// # Examples
68///
69/// ```
70/// use grafeo_engine::GrafeoDB;
71///
72/// // Quick in-memory database
73/// let db = GrafeoDB::new_in_memory();
74///
75/// // Add some data
76/// db.create_node(&["Person"]);
77///
78/// // Query it
79/// let session = db.session();
80/// let result = session.execute("MATCH (p:Person) RETURN p")?;
81/// # Ok::<(), grafeo_common::utils::error::Error>(())
82/// ```
83pub struct GrafeoDB {
84    /// Database configuration.
85    pub(super) config: Config,
86    /// The underlying graph store (None when using an external store).
87    pub(super) store: Option<Arc<LpgStore>>,
88    /// Schema and metadata catalog shared across sessions.
89    pub(super) catalog: Arc<Catalog>,
90    /// RDF triple store (if RDF feature is enabled).
91    #[cfg(feature = "rdf")]
92    pub(super) rdf_store: Arc<RdfStore>,
93    /// Transaction manager.
94    pub(super) transaction_manager: Arc<TransactionManager>,
95    /// Unified buffer manager.
96    pub(super) buffer_manager: Arc<BufferManager>,
97    /// Write-ahead log manager (if durability is enabled).
98    #[cfg(feature = "wal")]
99    pub(super) wal: Option<Arc<LpgWal>>,
100    /// Shared WAL graph context tracker. Tracks which named graph was last
101    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
102    /// records only when the context actually changes.
103    #[cfg(feature = "wal")]
104    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
105    /// Query cache for parsed and optimized plans.
106    pub(super) query_cache: Arc<QueryCache>,
107    /// Shared commit counter for auto-GC across sessions.
108    pub(super) commit_counter: Arc<AtomicUsize>,
109    /// Whether the database is open.
110    pub(super) is_open: RwLock<bool>,
111    /// Change data capture log for tracking mutations.
112    #[cfg(feature = "cdc")]
113    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
114    /// Whether CDC is active for new sessions and direct CRUD (runtime-mutable).
115    #[cfg(feature = "cdc")]
116    cdc_enabled: std::sync::atomic::AtomicBool,
117    /// Registered embedding models for text-to-vector conversion.
118    #[cfg(feature = "embed")]
119    pub(super) embedding_models:
120        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
121    /// Single-file database manager (when using `.grafeo` format).
122    #[cfg(feature = "grafeo-file")]
123    pub(super) file_manager: Option<Arc<GrafeoFileManager>>,
124    /// External read-only graph store (when using with_store() or with_read_store()).
125    /// When set, sessions route queries through this store instead of the built-in LpgStore.
126    pub(super) external_read_store: Option<Arc<dyn GraphStore>>,
127    /// External writable graph store (when using with_store()).
128    /// None for read-only databases created via with_read_store().
129    pub(super) external_write_store: Option<Arc<dyn GraphStoreMut>>,
130    /// Metrics registry shared across all sessions.
131    #[cfg(feature = "metrics")]
132    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
133    /// Persistent graph context for one-shot `execute()` calls.
134    /// When set, each call to `session()` pre-configures the session to this graph.
135    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
136    current_graph: RwLock<Option<String>>,
137    /// Persistent schema context for one-shot `execute()` calls.
138    /// When set, each call to `session()` pre-configures the session to this schema.
139    /// Updated after every one-shot `execute()` to reflect `SESSION SET SCHEMA` / `SESSION RESET`.
140    current_schema: RwLock<Option<String>>,
141    /// Whether this database is open in read-only mode.
142    /// When true, sessions automatically enforce read-only transactions.
143    read_only: bool,
144}
145
146impl GrafeoDB {
147    /// Returns a reference to the built-in LPG store.
148    ///
149    /// # Panics
150    ///
151    /// Panics if the database was created with [`with_store()`](Self::with_store) or
152    /// [`with_read_store()`](Self::with_read_store), which use an external store
153    /// instead of the built-in LPG store.
154    fn lpg_store(&self) -> &Arc<LpgStore> {
155        self.store.as_ref().expect(
156            "no built-in LpgStore: this GrafeoDB was created with an external store \
157             (with_store / with_read_store). Use session() or graph_store() instead.",
158        )
159    }
160
161    /// Returns whether CDC is active (runtime check).
162    #[cfg(feature = "cdc")]
163    #[inline]
164    pub(super) fn cdc_active(&self) -> bool {
165        self.cdc_enabled.load(std::sync::atomic::Ordering::Relaxed)
166    }
167
168    /// Creates an in-memory database, fast to create, gone when dropped.
169    ///
170    /// Use this for tests, experiments, or when you don't need persistence.
171    /// For data that survives restarts, use [`open()`](Self::open) instead.
172    ///
173    /// # Panics
174    ///
175    /// Panics if the internal arena allocator cannot be initialized (out of memory).
176    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
177    ///
178    /// # Examples
179    ///
180    /// ```
181    /// use grafeo_engine::GrafeoDB;
182    ///
183    /// let db = GrafeoDB::new_in_memory();
184    /// let session = db.session();
185    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
186    /// # Ok::<(), grafeo_common::utils::error::Error>(())
187    /// ```
188    #[must_use]
189    pub fn new_in_memory() -> Self {
190        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
191    }
192
193    /// Opens a database at the given path, creating it if it doesn't exist.
194    ///
195    /// If you've used this path before, Grafeo recovers your data from the
196    /// write-ahead log automatically. First open on a new path creates an
197    /// empty database.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the path isn't writable or recovery fails.
202    ///
203    /// # Examples
204    ///
205    /// ```no_run
206    /// use grafeo_engine::GrafeoDB;
207    ///
208    /// let db = GrafeoDB::open("./my_social_network")?;
209    /// # Ok::<(), grafeo_common::utils::error::Error>(())
210    /// ```
211    #[cfg(feature = "wal")]
212    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
213        Self::with_config(Config::persistent(path.as_ref()))
214    }
215
216    /// Opens an existing database in read-only mode.
217    ///
218    /// Uses a shared file lock, so multiple processes can read the same
219    /// `.grafeo` file concurrently. The database loads the last checkpoint
220    /// snapshot but does **not** replay the WAL or allow mutations.
221    ///
222    /// Currently only supports the single-file (`.grafeo`) format.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if the file doesn't exist or can't be read.
227    ///
228    /// # Examples
229    ///
230    /// ```no_run
231    /// use grafeo_engine::GrafeoDB;
232    ///
233    /// let db = GrafeoDB::open_read_only("./my_graph.grafeo")?;
234    /// let session = db.session();
235    /// let result = session.execute("MATCH (n) RETURN n LIMIT 10")?;
236    /// // Mutations will return an error:
237    /// // session.execute("INSERT (:Person)") => Err(ReadOnly)
238    /// # Ok::<(), grafeo_common::utils::error::Error>(())
239    /// ```
240    #[cfg(feature = "grafeo-file")]
241    pub fn open_read_only(path: impl AsRef<std::path::Path>) -> Result<Self> {
242        Self::with_config(Config::read_only(path.as_ref()))
243    }
244
245    /// Creates a database with custom configuration.
246    ///
247    /// Use this when you need fine-grained control over memory limits,
248    /// thread counts, or persistence settings. For most cases,
249    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
250    /// are simpler.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the database can't be created or recovery fails.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use grafeo_engine::{GrafeoDB, Config};
260    ///
261    /// // In-memory with a 512MB limit
262    /// let config = Config::in_memory()
263    ///     .with_memory_limit(512 * 1024 * 1024);
264    ///
265    /// let db = GrafeoDB::with_config(config)?;
266    /// # Ok::<(), grafeo_common::utils::error::Error>(())
267    /// ```
268    pub fn with_config(config: Config) -> Result<Self> {
269        // Validate configuration before proceeding
270        config
271            .validate()
272            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
273
274        let store = Arc::new(LpgStore::new()?);
275        #[cfg(feature = "rdf")]
276        let rdf_store = Arc::new(RdfStore::new());
277        let transaction_manager = Arc::new(TransactionManager::new());
278
279        // Create buffer manager with configured limits
280        let buffer_config = BufferManagerConfig {
281            budget: config.memory_limit.unwrap_or_else(|| {
282                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
283            }),
284            spill_path: config
285                .spill_path
286                .clone()
287                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
288            ..BufferManagerConfig::default()
289        };
290        let buffer_manager = BufferManager::new(buffer_config);
291
292        // Create catalog early so WAL replay can restore schema definitions
293        let catalog = Arc::new(Catalog::new());
294
295        let is_read_only = config.access_mode == crate::config::AccessMode::ReadOnly;
296
297        // --- Single-file format (.grafeo) ---
298        #[cfg(feature = "grafeo-file")]
299        let file_manager: Option<Arc<GrafeoFileManager>> = if is_read_only {
300            // Read-only mode: open with shared lock, load snapshot, skip WAL
301            if let Some(ref db_path) = config.path {
302                if db_path.exists() && db_path.is_file() {
303                    let fm = GrafeoFileManager::open_read_only(db_path)?;
304                    let snapshot_data = fm.read_snapshot()?;
305                    if !snapshot_data.is_empty() {
306                        Self::apply_snapshot_data(
307                            &store,
308                            &catalog,
309                            #[cfg(feature = "rdf")]
310                            &rdf_store,
311                            &snapshot_data,
312                        )?;
313                    }
314                    Some(Arc::new(fm))
315                } else {
316                    return Err(grafeo_common::utils::error::Error::Internal(format!(
317                        "read-only open requires an existing .grafeo file: {}",
318                        db_path.display()
319                    )));
320                }
321            } else {
322                return Err(grafeo_common::utils::error::Error::Internal(
323                    "read-only mode requires a database path".to_string(),
324                ));
325            }
326        } else if let Some(ref db_path) = config.path {
327            // Initialize the file manager whenever single-file format is selected,
328            // regardless of whether WAL is enabled. Without this, a database opened
329            // with wal_enabled:false + StorageFormat::SingleFile would produce no
330            // output at all (the file manager was previously gated behind wal_enabled).
331            if Self::should_use_single_file(db_path, config.storage_format) {
332                let fm = if db_path.exists() && db_path.is_file() {
333                    GrafeoFileManager::open(db_path)?
334                } else if !db_path.exists() {
335                    GrafeoFileManager::create(db_path)?
336                } else {
337                    // Path exists but is not a file (directory, etc.)
338                    return Err(grafeo_common::utils::error::Error::Internal(format!(
339                        "path exists but is not a file: {}",
340                        db_path.display()
341                    )));
342                };
343
344                // Load snapshot data from the file
345                let snapshot_data = fm.read_snapshot()?;
346                if !snapshot_data.is_empty() {
347                    Self::apply_snapshot_data(
348                        &store,
349                        &catalog,
350                        #[cfg(feature = "rdf")]
351                        &rdf_store,
352                        &snapshot_data,
353                    )?;
354                }
355
356                // Recover sidecar WAL if WAL is enabled and a sidecar exists
357                #[cfg(feature = "wal")]
358                if config.wal_enabled && fm.has_sidecar_wal() {
359                    let recovery = WalRecovery::new(fm.sidecar_wal_path());
360                    let records = recovery.recover()?;
361                    Self::apply_wal_records(
362                        &store,
363                        &catalog,
364                        #[cfg(feature = "rdf")]
365                        &rdf_store,
366                        &records,
367                    )?;
368                }
369
370                Some(Arc::new(fm))
371            } else {
372                None
373            }
374        } else {
375            None
376        };
377
378        // Determine whether to use the WAL directory path (legacy) or sidecar
379        // Read-only mode skips WAL entirely (no recovery, no creation).
380        #[cfg(feature = "wal")]
381        let wal = if is_read_only {
382            None
383        } else if config.wal_enabled {
384            if let Some(ref db_path) = config.path {
385                // When using single-file format, the WAL is a sidecar directory
386                #[cfg(feature = "grafeo-file")]
387                let wal_path = if let Some(ref fm) = file_manager {
388                    let p = fm.sidecar_wal_path();
389                    std::fs::create_dir_all(&p)?;
390                    p
391                } else {
392                    // Legacy: WAL inside the database directory
393                    std::fs::create_dir_all(db_path)?;
394                    db_path.join("wal")
395                };
396
397                #[cfg(not(feature = "grafeo-file"))]
398                let wal_path = {
399                    std::fs::create_dir_all(db_path)?;
400                    db_path.join("wal")
401                };
402
403                // For legacy WAL directory format, check if WAL exists and recover
404                #[cfg(feature = "grafeo-file")]
405                let is_single_file = file_manager.is_some();
406                #[cfg(not(feature = "grafeo-file"))]
407                let is_single_file = false;
408
409                if !is_single_file && wal_path.exists() {
410                    let recovery = WalRecovery::new(&wal_path);
411                    let records = recovery.recover()?;
412                    Self::apply_wal_records(
413                        &store,
414                        &catalog,
415                        #[cfg(feature = "rdf")]
416                        &rdf_store,
417                        &records,
418                    )?;
419                }
420
421                // Open/create WAL manager with configured durability
422                let wal_durability = match config.wal_durability {
423                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
424                    crate::config::DurabilityMode::Batch {
425                        max_delay_ms,
426                        max_records,
427                    } => WalDurabilityMode::Batch {
428                        max_delay_ms,
429                        max_records,
430                    },
431                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
432                        WalDurabilityMode::Adaptive { target_interval_ms }
433                    }
434                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
435                };
436                let wal_config = WalConfig {
437                    durability: wal_durability,
438                    ..WalConfig::default()
439                };
440                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
441                Some(Arc::new(wal_manager))
442            } else {
443                None
444            }
445        } else {
446            None
447        };
448
449        // Create query cache with default capacity (1000 queries)
450        let query_cache = Arc::new(QueryCache::default());
451
452        // After all snapshot/WAL recovery, sync TransactionManager epoch
453        // with the store so queries use the correct viewing epoch.
454        #[cfg(feature = "temporal")]
455        transaction_manager.sync_epoch(store.current_epoch());
456
457        #[cfg(feature = "cdc")]
458        let cdc_enabled_val = config.cdc_enabled;
459
460        Ok(Self {
461            config,
462            store: Some(store),
463            catalog,
464            #[cfg(feature = "rdf")]
465            rdf_store,
466            transaction_manager,
467            buffer_manager,
468            #[cfg(feature = "wal")]
469            wal,
470            #[cfg(feature = "wal")]
471            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
472            query_cache,
473            commit_counter: Arc::new(AtomicUsize::new(0)),
474            is_open: RwLock::new(true),
475            #[cfg(feature = "cdc")]
476            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
477            #[cfg(feature = "cdc")]
478            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
479            #[cfg(feature = "embed")]
480            embedding_models: RwLock::new(hashbrown::HashMap::new()),
481            #[cfg(feature = "grafeo-file")]
482            file_manager,
483            external_read_store: None,
484            external_write_store: None,
485            #[cfg(feature = "metrics")]
486            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
487            current_graph: RwLock::new(None),
488            current_schema: RwLock::new(None),
489            read_only: is_read_only,
490        })
491    }
492
493    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
494    ///
495    /// The external store handles all data persistence. WAL, CDC, and index
496    /// management are the responsibility of the store implementation.
497    ///
498    /// Query execution (all 6 languages, optimizer, planner) works through the
499    /// provided store. Admin operations (schema introspection, persistence,
500    /// vector/text indexes) are not available on external stores.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// use std::sync::Arc;
506    /// use grafeo_engine::{GrafeoDB, Config};
507    /// use grafeo_core::graph::GraphStoreMut;
508    ///
509    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
510    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
511    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
512    ///     Ok(())
513    /// }
514    /// ```
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if config validation fails.
519    ///
520    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
521    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
522        config
523            .validate()
524            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
525
526        let transaction_manager = Arc::new(TransactionManager::new());
527
528        let buffer_config = BufferManagerConfig {
529            budget: config.memory_limit.unwrap_or_else(|| {
530                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
531            }),
532            spill_path: None,
533            ..BufferManagerConfig::default()
534        };
535        let buffer_manager = BufferManager::new(buffer_config);
536
537        let query_cache = Arc::new(QueryCache::default());
538
539        #[cfg(feature = "cdc")]
540        let cdc_enabled_val = config.cdc_enabled;
541
542        Ok(Self {
543            config,
544            store: None,
545            catalog: Arc::new(Catalog::new()),
546            #[cfg(feature = "rdf")]
547            rdf_store: Arc::new(RdfStore::new()),
548            transaction_manager,
549            buffer_manager,
550            #[cfg(feature = "wal")]
551            wal: None,
552            #[cfg(feature = "wal")]
553            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
554            query_cache,
555            commit_counter: Arc::new(AtomicUsize::new(0)),
556            is_open: RwLock::new(true),
557            #[cfg(feature = "cdc")]
558            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
559            #[cfg(feature = "cdc")]
560            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
561            #[cfg(feature = "embed")]
562            embedding_models: RwLock::new(hashbrown::HashMap::new()),
563            #[cfg(feature = "grafeo-file")]
564            file_manager: None,
565            external_read_store: Some(Arc::clone(&store) as Arc<dyn GraphStore>),
566            external_write_store: Some(store),
567            #[cfg(feature = "metrics")]
568            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
569            current_graph: RwLock::new(None),
570            current_schema: RwLock::new(None),
571            read_only: false,
572        })
573    }
574
575    /// Creates a database backed by a read-only [`GraphStore`].
576    ///
577    /// The database is set to read-only mode. Write queries (CREATE, SET,
578    /// DELETE) will return `TransactionError::ReadOnly`.
579    ///
580    /// # Examples
581    ///
582    /// ```no_run
583    /// use std::sync::Arc;
584    /// use grafeo_engine::{GrafeoDB, Config};
585    /// use grafeo_core::graph::GraphStore;
586    ///
587    /// fn example(store: Arc<dyn GraphStore>) -> grafeo_common::utils::error::Result<()> {
588    ///     let db = GrafeoDB::with_read_store(store, Config::in_memory())?;
589    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
590    ///     Ok(())
591    /// }
592    /// ```
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if config validation fails.
597    ///
598    /// [`GraphStore`]: grafeo_core::graph::GraphStore
599    pub fn with_read_store(store: Arc<dyn GraphStore>, config: Config) -> Result<Self> {
600        config
601            .validate()
602            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
603
604        let transaction_manager = Arc::new(TransactionManager::new());
605
606        let buffer_config = BufferManagerConfig {
607            budget: config.memory_limit.unwrap_or_else(|| {
608                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
609            }),
610            spill_path: None,
611            ..BufferManagerConfig::default()
612        };
613        let buffer_manager = BufferManager::new(buffer_config);
614
615        let query_cache = Arc::new(QueryCache::default());
616
617        #[cfg(feature = "cdc")]
618        let cdc_enabled_val = config.cdc_enabled;
619
620        Ok(Self {
621            config,
622            store: None,
623            catalog: Arc::new(Catalog::new()),
624            #[cfg(feature = "rdf")]
625            rdf_store: Arc::new(RdfStore::new()),
626            transaction_manager,
627            buffer_manager,
628            #[cfg(feature = "wal")]
629            wal: None,
630            #[cfg(feature = "wal")]
631            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
632            query_cache,
633            commit_counter: Arc::new(AtomicUsize::new(0)),
634            is_open: RwLock::new(true),
635            #[cfg(feature = "cdc")]
636            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
637            #[cfg(feature = "cdc")]
638            cdc_enabled: std::sync::atomic::AtomicBool::new(cdc_enabled_val),
639            #[cfg(feature = "embed")]
640            embedding_models: RwLock::new(hashbrown::HashMap::new()),
641            #[cfg(feature = "grafeo-file")]
642            file_manager: None,
643            external_read_store: Some(store),
644            external_write_store: None,
645            #[cfg(feature = "metrics")]
646            metrics: Some(Arc::new(crate::metrics::MetricsRegistry::new())),
647            current_graph: RwLock::new(None),
648            current_schema: RwLock::new(None),
649            read_only: true,
650        })
651    }
652
653    /// Converts the database to a read-only [`CompactStore`] for faster queries.
654    ///
655    /// Takes a snapshot of all nodes and edges from the current store, builds
656    /// a columnar `CompactStore` with CSR adjacency, and switches the database
657    /// to read-only mode. The original store is dropped to free memory.
658    ///
659    /// After calling this, all write queries will fail with
660    /// `TransactionError::ReadOnly`. Read queries (across all supported
661    /// languages) continue to work and benefit from ~60x memory reduction
662    /// and 100x+ traversal speedup.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the conversion fails (e.g. more than 32,767
667    /// distinct labels or edge types).
668    ///
669    /// [`CompactStore`]: grafeo_core::graph::compact::CompactStore
670    #[cfg(feature = "compact-store")]
671    pub fn compact(&mut self) -> Result<()> {
672        use grafeo_core::graph::compact::from_graph_store;
673
674        let current_store = self.graph_store();
675        let compact = from_graph_store(current_store.as_ref())
676            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
677
678        self.external_read_store = Some(Arc::new(compact) as Arc<dyn GraphStore>);
679        self.external_write_store = None;
680        self.store = None;
681        self.read_only = true;
682        self.query_cache = Arc::new(QueryCache::default());
683
684        Ok(())
685    }
686
687    /// Applies WAL records to restore the database state.
688    ///
689    /// Data mutation records are routed through a graph cursor that tracks
690    /// `SwitchGraph` context markers, replaying mutations into the correct
691    /// named graph (or the default graph when cursor is `None`).
692    #[cfg(feature = "wal")]
693    fn apply_wal_records(
694        store: &Arc<LpgStore>,
695        catalog: &Catalog,
696        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
697        records: &[WalRecord],
698    ) -> Result<()> {
699        use crate::catalog::{
700            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
701        };
702        use grafeo_common::utils::error::Error;
703
704        // Graph cursor: tracks which named graph receives data mutations.
705        // `None` means the default graph.
706        let mut current_graph: Option<String> = None;
707        let mut target_store: Arc<LpgStore> = Arc::clone(store);
708
709        for record in records {
710            match record {
711                // --- Named graph lifecycle ---
712                WalRecord::CreateNamedGraph { name } => {
713                    let _ = store.create_graph(name);
714                }
715                WalRecord::DropNamedGraph { name } => {
716                    store.drop_graph(name);
717                    // Reset cursor if the dropped graph was active
718                    if current_graph.as_deref() == Some(name.as_str()) {
719                        current_graph = None;
720                        target_store = Arc::clone(store);
721                    }
722                }
723                WalRecord::SwitchGraph { name } => {
724                    current_graph.clone_from(name);
725                    target_store = match &current_graph {
726                        None => Arc::clone(store),
727                        Some(graph_name) => store
728                            .graph_or_create(graph_name)
729                            .map_err(|e| Error::Internal(e.to_string()))?,
730                    };
731                }
732
733                // --- Data mutations: routed through target_store ---
734                WalRecord::CreateNode { id, labels } => {
735                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
736                    target_store.create_node_with_id(*id, &label_refs)?;
737                }
738                WalRecord::DeleteNode { id } => {
739                    target_store.delete_node(*id);
740                }
741                WalRecord::CreateEdge {
742                    id,
743                    src,
744                    dst,
745                    edge_type,
746                } => {
747                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
748                }
749                WalRecord::DeleteEdge { id } => {
750                    target_store.delete_edge(*id);
751                }
752                WalRecord::SetNodeProperty { id, key, value } => {
753                    target_store.set_node_property(*id, key, value.clone());
754                }
755                WalRecord::SetEdgeProperty { id, key, value } => {
756                    target_store.set_edge_property(*id, key, value.clone());
757                }
758                WalRecord::AddNodeLabel { id, label } => {
759                    target_store.add_label(*id, label);
760                }
761                WalRecord::RemoveNodeLabel { id, label } => {
762                    target_store.remove_label(*id, label);
763                }
764                WalRecord::RemoveNodeProperty { id, key } => {
765                    target_store.remove_node_property(*id, key);
766                }
767                WalRecord::RemoveEdgeProperty { id, key } => {
768                    target_store.remove_edge_property(*id, key);
769                }
770
771                // --- Schema DDL replay (always on root catalog) ---
772                WalRecord::CreateNodeType {
773                    name,
774                    properties,
775                    constraints,
776                } => {
777                    let def = NodeTypeDefinition {
778                        name: name.clone(),
779                        properties: properties
780                            .iter()
781                            .map(|(n, t, nullable)| TypedProperty {
782                                name: n.clone(),
783                                data_type: PropertyDataType::from_type_name(t),
784                                nullable: *nullable,
785                                default_value: None,
786                            })
787                            .collect(),
788                        constraints: constraints
789                            .iter()
790                            .map(|(kind, props)| match kind.as_str() {
791                                "unique" => TypeConstraint::Unique(props.clone()),
792                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
793                                "not_null" if !props.is_empty() => {
794                                    TypeConstraint::NotNull(props[0].clone())
795                                }
796                                _ => TypeConstraint::Unique(props.clone()),
797                            })
798                            .collect(),
799                        parent_types: Vec::new(),
800                    };
801                    let _ = catalog.register_node_type(def);
802                }
803                WalRecord::DropNodeType { name } => {
804                    let _ = catalog.drop_node_type(name);
805                }
806                WalRecord::CreateEdgeType {
807                    name,
808                    properties,
809                    constraints,
810                } => {
811                    let def = EdgeTypeDefinition {
812                        name: name.clone(),
813                        properties: properties
814                            .iter()
815                            .map(|(n, t, nullable)| TypedProperty {
816                                name: n.clone(),
817                                data_type: PropertyDataType::from_type_name(t),
818                                nullable: *nullable,
819                                default_value: None,
820                            })
821                            .collect(),
822                        constraints: constraints
823                            .iter()
824                            .map(|(kind, props)| match kind.as_str() {
825                                "unique" => TypeConstraint::Unique(props.clone()),
826                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
827                                "not_null" if !props.is_empty() => {
828                                    TypeConstraint::NotNull(props[0].clone())
829                                }
830                                _ => TypeConstraint::Unique(props.clone()),
831                            })
832                            .collect(),
833                        source_node_types: Vec::new(),
834                        target_node_types: Vec::new(),
835                    };
836                    let _ = catalog.register_edge_type_def(def);
837                }
838                WalRecord::DropEdgeType { name } => {
839                    let _ = catalog.drop_edge_type_def(name);
840                }
841                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
842                    // Index recreation is handled by the store on startup
843                    // (indexes are rebuilt from data, not WAL)
844                }
845                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
846                    // Constraint definitions are part of type definitions
847                    // and replayed via CreateNodeType/CreateEdgeType
848                }
849                WalRecord::CreateGraphType {
850                    name,
851                    node_types,
852                    edge_types,
853                    open,
854                } => {
855                    use crate::catalog::GraphTypeDefinition;
856                    let def = GraphTypeDefinition {
857                        name: name.clone(),
858                        allowed_node_types: node_types.clone(),
859                        allowed_edge_types: edge_types.clone(),
860                        open: *open,
861                    };
862                    let _ = catalog.register_graph_type(def);
863                }
864                WalRecord::DropGraphType { name } => {
865                    let _ = catalog.drop_graph_type(name);
866                }
867                WalRecord::CreateSchema { name } => {
868                    let _ = catalog.register_schema_namespace(name.clone());
869                }
870                WalRecord::DropSchema { name } => {
871                    let _ = catalog.drop_schema_namespace(name);
872                }
873
874                WalRecord::AlterNodeType { name, alterations } => {
875                    for (action, prop_name, type_name, nullable) in alterations {
876                        match action.as_str() {
877                            "add" => {
878                                let prop = TypedProperty {
879                                    name: prop_name.clone(),
880                                    data_type: PropertyDataType::from_type_name(type_name),
881                                    nullable: *nullable,
882                                    default_value: None,
883                                };
884                                let _ = catalog.alter_node_type_add_property(name, prop);
885                            }
886                            "drop" => {
887                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
888                            }
889                            _ => {}
890                        }
891                    }
892                }
893                WalRecord::AlterEdgeType { name, alterations } => {
894                    for (action, prop_name, type_name, nullable) in alterations {
895                        match action.as_str() {
896                            "add" => {
897                                let prop = TypedProperty {
898                                    name: prop_name.clone(),
899                                    data_type: PropertyDataType::from_type_name(type_name),
900                                    nullable: *nullable,
901                                    default_value: None,
902                                };
903                                let _ = catalog.alter_edge_type_add_property(name, prop);
904                            }
905                            "drop" => {
906                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
907                            }
908                            _ => {}
909                        }
910                    }
911                }
912                WalRecord::AlterGraphType { name, alterations } => {
913                    for (action, type_name) in alterations {
914                        match action.as_str() {
915                            "add_node" => {
916                                let _ =
917                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
918                            }
919                            "drop_node" => {
920                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
921                            }
922                            "add_edge" => {
923                                let _ =
924                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
925                            }
926                            "drop_edge" => {
927                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
928                            }
929                            _ => {}
930                        }
931                    }
932                }
933
934                WalRecord::CreateProcedure {
935                    name,
936                    params,
937                    returns,
938                    body,
939                } => {
940                    use crate::catalog::ProcedureDefinition;
941                    let def = ProcedureDefinition {
942                        name: name.clone(),
943                        params: params.clone(),
944                        returns: returns.clone(),
945                        body: body.clone(),
946                    };
947                    let _ = catalog.register_procedure(def);
948                }
949                WalRecord::DropProcedure { name } => {
950                    let _ = catalog.drop_procedure(name);
951                }
952
953                // --- RDF triple replay ---
954                #[cfg(feature = "rdf")]
955                WalRecord::InsertRdfTriple { .. }
956                | WalRecord::DeleteRdfTriple { .. }
957                | WalRecord::ClearRdfGraph { .. }
958                | WalRecord::CreateRdfGraph { .. }
959                | WalRecord::DropRdfGraph { .. } => {
960                    rdf_ops::replay_rdf_wal_record(rdf_store, record);
961                }
962                #[cfg(not(feature = "rdf"))]
963                WalRecord::InsertRdfTriple { .. }
964                | WalRecord::DeleteRdfTriple { .. }
965                | WalRecord::ClearRdfGraph { .. }
966                | WalRecord::CreateRdfGraph { .. }
967                | WalRecord::DropRdfGraph { .. } => {}
968
969                WalRecord::TransactionCommit { .. } => {
970                    // In temporal mode, advance the store epoch on each committed
971                    // transaction so that subsequent property/label operations
972                    // are recorded at the correct epoch in their VersionLogs.
973                    #[cfg(feature = "temporal")]
974                    {
975                        target_store.new_epoch();
976                    }
977                }
978                WalRecord::TransactionAbort { .. } | WalRecord::Checkpoint { .. } => {
979                    // Transaction control records don't need replay action
980                    // (recovery already filtered to only committed transactions)
981                }
982            }
983        }
984        Ok(())
985    }
986
987    // =========================================================================
988    // Single-file format helpers
989    // =========================================================================
990
991    /// Returns `true` if the given path should use single-file format.
992    #[cfg(feature = "grafeo-file")]
993    fn should_use_single_file(
994        path: &std::path::Path,
995        configured: crate::config::StorageFormat,
996    ) -> bool {
997        use crate::config::StorageFormat;
998        match configured {
999            StorageFormat::SingleFile => true,
1000            StorageFormat::WalDirectory => false,
1001            StorageFormat::Auto => {
1002                // Existing file: check magic bytes
1003                if path.is_file() {
1004                    if let Ok(mut f) = std::fs::File::open(path) {
1005                        use std::io::Read;
1006                        let mut magic = [0u8; 4];
1007                        if f.read_exact(&mut magic).is_ok()
1008                            && magic == grafeo_adapters::storage::file::MAGIC
1009                        {
1010                            return true;
1011                        }
1012                    }
1013                    return false;
1014                }
1015                // Existing directory: legacy format
1016                if path.is_dir() {
1017                    return false;
1018                }
1019                // New path: check extension
1020                path.extension().is_some_and(|ext| ext == "grafeo")
1021            }
1022        }
1023    }
1024
    /// Applies snapshot data (from a `.grafeo` file) to restore the store and catalog.
    ///
    /// Thin wrapper that forwards to `persistence::load_snapshot_into_store`.
    /// The `rdf_store` argument exists (and is forwarded) only when the `rdf`
    /// feature is enabled, so the cfg attributes here must mirror the callee's.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `persistence::load_snapshot_into_store`.
    #[cfg(feature = "grafeo-file")]
    fn apply_snapshot_data(
        store: &Arc<LpgStore>,
        catalog: &Arc<crate::catalog::Catalog>,
        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
        data: &[u8],
    ) -> Result<()> {
        persistence::load_snapshot_into_store(
            store,
            catalog,
            #[cfg(feature = "rdf")]
            rdf_store,
            data,
        )
    }
1041
1042    // =========================================================================
1043    // Session & Configuration
1044    // =========================================================================
1045
1046    /// Opens a new session for running queries.
1047    ///
1048    /// Sessions are cheap to create: spin up as many as you need. Each
1049    /// gets its own transaction context, so concurrent sessions won't
1050    /// block each other on reads.
1051    ///
1052    /// # Panics
1053    ///
1054    /// Panics if the database was configured with an external graph store and
1055    /// the internal arena allocator cannot be initialized (out of memory).
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```
1060    /// use grafeo_engine::GrafeoDB;
1061    ///
1062    /// let db = GrafeoDB::new_in_memory();
1063    /// let session = db.session();
1064    ///
1065    /// // Run queries through the session
1066    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
1067    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1068    /// ```
1069    #[must_use]
1070    pub fn session(&self) -> Session {
1071        self.create_session_inner(None)
1072    }
1073
1074    /// Creates a session with an explicit CDC override.
1075    ///
1076    /// When `cdc_enabled` is `true`, mutations in this session are tracked
1077    /// regardless of the database default. When `false`, mutations are not
1078    /// tracked regardless of the database default.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```
1083    /// use grafeo_engine::GrafeoDB;
1084    ///
1085    /// let db = GrafeoDB::new_in_memory();
1086    ///
1087    /// // Opt in to CDC for just this session
1088    /// let tracked = db.session_with_cdc(true);
1089    /// tracked.execute("INSERT (:Person {name: 'Alix'})")?;
1090    /// # Ok::<(), grafeo_common::utils::error::Error>(())
1091    /// ```
1092    #[cfg(feature = "cdc")]
1093    #[must_use]
1094    pub fn session_with_cdc(&self, cdc_enabled: bool) -> Session {
1095        self.create_session_inner(Some(cdc_enabled))
1096    }
1097
1098    /// Creates a read-only session regardless of the database's access mode.
1099    ///
1100    /// Mutations executed through this session will fail with
1101    /// `TransactionError::ReadOnly`. Useful for replication replicas where
1102    /// the database itself must remain writable (for applying CDC changes)
1103    /// but client-facing queries must be read-only.
1104    #[must_use]
1105    pub fn session_read_only(&self) -> Session {
1106        self.create_session_inner_opts(None, true)
1107    }
1108
1109    /// Shared session creation logic.
1110    ///
1111    /// `cdc_override` overrides the database-wide `cdc_enabled` default when
1112    /// `Some`. `None` falls back to the database default.
1113    #[allow(unused_variables)] // cdc_override unused when cdc feature is off
1114    fn create_session_inner(&self, cdc_override: Option<bool>) -> Session {
1115        self.create_session_inner_opts(cdc_override, false)
1116    }
1117
1118    /// Shared session creation with all overrides.
1119    #[allow(unused_variables)]
1120    fn create_session_inner_opts(
1121        &self,
1122        cdc_override: Option<bool>,
1123        force_read_only: bool,
1124    ) -> Session {
1125        let session_cfg = || crate::session::SessionConfig {
1126            transaction_manager: Arc::clone(&self.transaction_manager),
1127            query_cache: Arc::clone(&self.query_cache),
1128            catalog: Arc::clone(&self.catalog),
1129            adaptive_config: self.config.adaptive.clone(),
1130            factorized_execution: self.config.factorized_execution,
1131            graph_model: self.config.graph_model,
1132            query_timeout: self.config.query_timeout,
1133            commit_counter: Arc::clone(&self.commit_counter),
1134            gc_interval: self.config.gc_interval,
1135            read_only: self.read_only || force_read_only,
1136        };
1137
1138        if let Some(ref ext_read) = self.external_read_store {
1139            return Session::with_external_store(
1140                Arc::clone(ext_read),
1141                self.external_write_store.as_ref().map(Arc::clone),
1142                session_cfg(),
1143            )
1144            .expect("arena allocation for external store session");
1145        }
1146
1147        #[cfg(feature = "rdf")]
1148        let mut session = Session::with_rdf_store_and_adaptive(
1149            Arc::clone(self.lpg_store()),
1150            Arc::clone(&self.rdf_store),
1151            session_cfg(),
1152        );
1153        #[cfg(not(feature = "rdf"))]
1154        let mut session = Session::with_adaptive(Arc::clone(self.lpg_store()), session_cfg());
1155
1156        #[cfg(feature = "wal")]
1157        if let Some(ref wal) = self.wal {
1158            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
1159        }
1160
1161        #[cfg(feature = "cdc")]
1162        {
1163            let should_enable = cdc_override.unwrap_or_else(|| self.cdc_active());
1164            if should_enable {
1165                session.set_cdc_log(Arc::clone(&self.cdc_log));
1166            }
1167        }
1168
1169        #[cfg(feature = "metrics")]
1170        {
1171            if let Some(ref m) = self.metrics {
1172                session.set_metrics(Arc::clone(m));
1173                m.session_created
1174                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1175                m.session_active
1176                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1177            }
1178        }
1179
1180        // Propagate persistent graph context to the new session
1181        if let Some(ref graph) = *self.current_graph.read() {
1182            session.use_graph(graph);
1183        }
1184
1185        // Propagate persistent schema context to the new session
1186        if let Some(ref schema) = *self.current_schema.read() {
1187            session.set_schema(schema);
1188        }
1189
1190        // Suppress unused_mut when cdc/wal are disabled
1191        let _ = &mut session;
1192
1193        session
1194    }
1195
1196    /// Returns the current graph name, if any.
1197    ///
1198    /// This is the persistent graph context used by one-shot `execute()` calls.
1199    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
1200    /// or `SESSION RESET`.
1201    #[must_use]
1202    pub fn current_graph(&self) -> Option<String> {
1203        self.current_graph.read().clone()
1204    }
1205
1206    /// Sets the current graph context for subsequent one-shot `execute()` calls.
1207    ///
1208    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
1209    /// Pass `None` to reset to the default graph.
1210    pub fn set_current_graph(&self, name: Option<&str>) {
1211        *self.current_graph.write() = name.map(ToString::to_string);
1212    }
1213
1214    /// Returns the current schema name, if any.
1215    ///
1216    /// This is the persistent schema context used by one-shot `execute()` calls.
1217    /// It is updated whenever `execute()` encounters `SESSION SET SCHEMA` or `SESSION RESET`.
1218    #[must_use]
1219    pub fn current_schema(&self) -> Option<String> {
1220        self.current_schema.read().clone()
1221    }
1222
1223    /// Sets the current schema context for subsequent one-shot `execute()` calls.
1224    ///
1225    /// This is equivalent to running `SESSION SET SCHEMA <name>` but without creating
1226    /// a session. Pass `None` to clear the schema context.
1227    pub fn set_current_schema(&self, name: Option<&str>) {
1228        *self.current_schema.write() = name.map(ToString::to_string);
1229    }
1230
1231    /// Returns the adaptive execution configuration.
1232    #[must_use]
1233    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
1234        &self.config.adaptive
1235    }
1236
1237    /// Returns `true` if this database was opened in read-only mode.
1238    #[must_use]
1239    pub fn is_read_only(&self) -> bool {
1240        self.read_only
1241    }
1242
1243    /// Returns the configuration.
1244    #[must_use]
1245    pub fn config(&self) -> &Config {
1246        &self.config
1247    }
1248
1249    /// Returns the graph data model of this database.
1250    #[must_use]
1251    pub fn graph_model(&self) -> crate::config::GraphModel {
1252        self.config.graph_model
1253    }
1254
1255    /// Returns the configured memory limit in bytes, if any.
1256    #[must_use]
1257    pub fn memory_limit(&self) -> Option<usize> {
1258        self.config.memory_limit
1259    }
1260
1261    /// Returns a point-in-time snapshot of all metrics.
1262    ///
1263    /// If the `metrics` feature is disabled or the registry is not
1264    /// initialized, returns a default (all-zero) snapshot.
1265    #[cfg(feature = "metrics")]
1266    #[must_use]
1267    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
1268        let mut snapshot = self
1269            .metrics
1270            .as_ref()
1271            .map_or_else(crate::metrics::MetricsSnapshot::default, |m| m.snapshot());
1272
1273        // Augment with cache stats from the query cache (not tracked in the registry)
1274        let cache_stats = self.query_cache.stats();
1275        snapshot.cache_hits = cache_stats.parsed_hits + cache_stats.optimized_hits;
1276        snapshot.cache_misses = cache_stats.parsed_misses + cache_stats.optimized_misses;
1277        snapshot.cache_size = cache_stats.parsed_size + cache_stats.optimized_size;
1278        snapshot.cache_invalidations = cache_stats.invalidations;
1279
1280        snapshot
1281    }
1282
1283    /// Returns all metrics in Prometheus text exposition format.
1284    ///
1285    /// The output is ready to serve from an HTTP `/metrics` endpoint.
1286    #[cfg(feature = "metrics")]
1287    #[must_use]
1288    pub fn metrics_prometheus(&self) -> String {
1289        self.metrics
1290            .as_ref()
1291            .map_or_else(String::new, |m| m.to_prometheus())
1292    }
1293
1294    /// Resets all metrics counters and histograms to zero.
1295    #[cfg(feature = "metrics")]
1296    pub fn reset_metrics(&self) {
1297        if let Some(ref m) = self.metrics {
1298            m.reset();
1299        }
1300        self.query_cache.reset_stats();
1301    }
1302
1303    /// Returns the underlying (default) store.
1304    ///
1305    /// This provides direct access to the LPG store for algorithm implementations
1306    /// and admin operations (index management, schema introspection, MVCC internals).
1307    ///
1308    /// For code that only needs read/write graph operations, prefer
1309    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
1310    #[must_use]
1311    pub fn store(&self) -> &Arc<LpgStore> {
1312        self.lpg_store()
1313    }
1314
1315    /// Returns the LPG store for the currently active graph.
1316    ///
1317    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
1318    /// the default store. Otherwise looks up the named graph in the root store.
1319    /// Falls back to the default store if the named graph does not exist.
1320    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
1321    fn active_store(&self) -> Arc<LpgStore> {
1322        let store = self.lpg_store();
1323        let graph_name = self.current_graph.read().clone();
1324        match graph_name {
1325            None => Arc::clone(store),
1326            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(store),
1327            Some(ref name) => store.graph(name).unwrap_or_else(|| Arc::clone(store)),
1328        }
1329    }
1330
1331    // === Named Graph Management ===
1332
1333    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns an error if arena allocation fails.
1338    pub fn create_graph(&self, name: &str) -> Result<bool> {
1339        Ok(self.lpg_store().create_graph(name)?)
1340    }
1341
1342    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
1343    pub fn drop_graph(&self, name: &str) -> bool {
1344        self.lpg_store().drop_graph(name)
1345    }
1346
1347    /// Returns all named graph names.
1348    #[must_use]
1349    pub fn list_graphs(&self) -> Vec<String> {
1350        self.lpg_store().graph_names()
1351    }
1352
1353    /// Returns the graph store as a trait object.
1354    ///
1355    /// Returns a read-only trait object for the active graph store.
1356    ///
1357    /// This provides the [`GraphStore`] interface for code that only needs
1358    /// read operations. For write access, use [`graph_store_mut()`](Self::graph_store_mut).
1359    ///
1360    /// [`GraphStore`]: grafeo_core::graph::GraphStore
1361    #[must_use]
1362    pub fn graph_store(&self) -> Arc<dyn GraphStore> {
1363        if let Some(ref ext_read) = self.external_read_store {
1364            Arc::clone(ext_read)
1365        } else {
1366            Arc::clone(self.lpg_store()) as Arc<dyn GraphStore>
1367        }
1368    }
1369
1370    /// Returns the writable graph store, if available.
1371    ///
1372    /// Returns `None` for read-only databases created via
1373    /// [`with_read_store()`](Self::with_read_store).
1374    #[must_use]
1375    pub fn graph_store_mut(&self) -> Option<Arc<dyn GraphStoreMut>> {
1376        if self.external_read_store.is_some() {
1377            self.external_write_store.as_ref().map(Arc::clone)
1378        } else {
1379            Some(Arc::clone(self.lpg_store()) as Arc<dyn GraphStoreMut>)
1380        }
1381    }
1382
1383    /// Garbage collects old MVCC versions that are no longer visible.
1384    ///
1385    /// Determines the minimum epoch required by active transactions and prunes
1386    /// version chains older than that threshold. Also cleans up completed
1387    /// transaction metadata in the transaction manager.
1388    pub fn gc(&self) {
1389        let min_epoch = self.transaction_manager.min_active_epoch();
1390        self.lpg_store().gc_versions(min_epoch);
1391        self.transaction_manager.gc();
1392    }
1393
1394    /// Returns the buffer manager for memory-aware operations.
1395    #[must_use]
1396    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
1397        &self.buffer_manager
1398    }
1399
1400    /// Returns the query cache.
1401    #[must_use]
1402    pub fn query_cache(&self) -> &Arc<QueryCache> {
1403        &self.query_cache
1404    }
1405
1406    /// Clears all cached query plans.
1407    ///
1408    /// This is called automatically after DDL operations, but can also be
1409    /// invoked manually after external schema changes (e.g., WAL replay,
1410    /// import) or when you want to force re-optimization of all queries.
1411    pub fn clear_plan_cache(&self) {
1412        self.query_cache.clear();
1413    }
1414
1415    // =========================================================================
1416    // Lifecycle
1417    // =========================================================================
1418
    /// Closes the database, flushing all pending writes.
    ///
    /// For persistent databases, this ensures everything is safely on disk.
    /// Called automatically when the database is dropped, but you can call
    /// it explicitly if you need to guarantee durability at a specific point.
    ///
    /// Closing an already-closed database is a no-op that returns `Ok(())`.
    /// The open flag is cleared only after every step succeeds, so if an error
    /// is returned the database is still considered open.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL can't be written or synced (check disk
    /// space/permissions), or, in single-file mode, if writing the snapshot,
    /// removing the sidecar WAL, or closing the file manager fails.
    pub fn close(&self) -> Result<()> {
        // The write guard is held until return, so concurrent `close` calls run one at a time.
        let mut is_open = self.is_open.write();
        if !*is_open {
            return Ok(());
        }

        // Read-only databases: just release the shared lock, no checkpointing
        if self.read_only {
            #[cfg(feature = "grafeo-file")]
            if let Some(ref fm) = self.file_manager {
                fm.close()?;
            }
            *is_open = false;
            return Ok(());
        }

        // For single-file format: checkpoint to the .grafeo file, then remove the
        // sidecar WAL. This runs BEFORE the directory-format WAL path below, and
        // `is_single_file` makes that path skip single-file databases so no
        // trailing commit record is appended after the sidecar WAL is removed.
        #[cfg(feature = "grafeo-file")]
        let is_single_file = self.file_manager.is_some();
        #[cfg(not(feature = "grafeo-file"))]
        let is_single_file = false;

        #[cfg(feature = "grafeo-file")]
        if let Some(ref fm) = self.file_manager {
            // Flush WAL first so all records are on disk before we snapshot
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.sync()?;
            }
            // Writes the snapshot only; the sidecar WAL is removed further down.
            self.checkpoint_to_file(fm)?;

            // Release WAL file handles before removing sidecar directory.
            // On Windows, open handles prevent directory deletion.
            #[cfg(feature = "wal")]
            if let Some(ref wal) = self.wal {
                wal.close_active_log();
            }

            {
                // Crash-injection point; presumably a no-op outside crash-recovery tests.
                use grafeo_core::testing::crash::maybe_crash;
                maybe_crash("close:before_remove_sidecar_wal");
            }
            fm.remove_sidecar_wal()?;
            fm.close()?;
        }

        // Commit and sync WAL (legacy directory format only).
        // We intentionally do NOT call wal.checkpoint() here. Directory format
        // has no snapshot: the WAL files are the sole source of truth. Writing
        // checkpoint.meta would cause recovery to skip older WAL files, losing
        // all data that predates the current log sequence.
        #[cfg(feature = "wal")]
        if !is_single_file && let Some(ref wal) = self.wal {
            // Use the last assigned transaction ID, or create one for the commit record.
            // NOTE(review): a transaction started via `begin()` here is never
            // committed/aborted in the transaction manager; confirm that is harmless
            // at shutdown.
            let commit_tx = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());

            // Log a TransactionCommit to mark all pending records as committed
            wal.log(&WalRecord::TransactionCommit {
                transaction_id: commit_tx,
            })?;

            wal.sync()?;
        }

        *is_open = false;
        Ok(())
    }
1500
1501    /// Returns the typed WAL if available.
1502    #[cfg(feature = "wal")]
1503    #[must_use]
1504    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
1505        self.wal.as_ref()
1506    }
1507
1508    /// Logs a WAL record if WAL is enabled.
1509    #[cfg(feature = "wal")]
1510    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
1511        if let Some(ref wal) = self.wal {
1512            wal.log(record)?;
1513        }
1514        Ok(())
1515    }
1516
1517    /// Writes the current database snapshot to the `.grafeo` file.
1518    ///
1519    /// Does NOT remove the sidecar WAL: callers that want to clean up
1520    /// the sidecar (e.g. `close()`) should call `fm.remove_sidecar_wal()`
1521    /// separately after this returns.
1522    #[cfg(feature = "grafeo-file")]
1523    fn checkpoint_to_file(&self, fm: &GrafeoFileManager) -> Result<()> {
1524        use grafeo_core::testing::crash::maybe_crash;
1525
1526        maybe_crash("checkpoint_to_file:before_export");
1527        let snapshot_data = self.export_snapshot()?;
1528        maybe_crash("checkpoint_to_file:after_export");
1529
1530        let epoch = self.lpg_store().current_epoch();
1531        let transaction_id = self
1532            .transaction_manager
1533            .last_assigned_transaction_id()
1534            .map_or(0, |t| t.0);
1535        let node_count = self.lpg_store().node_count() as u64;
1536        let edge_count = self.lpg_store().edge_count() as u64;
1537
1538        fm.write_snapshot(
1539            &snapshot_data,
1540            epoch.0,
1541            transaction_id,
1542            node_count,
1543            edge_count,
1544        )?;
1545
1546        maybe_crash("checkpoint_to_file:after_write_snapshot");
1547        Ok(())
1548    }
1549
1550    /// Returns the file manager if using single-file format.
1551    #[cfg(feature = "grafeo-file")]
1552    #[must_use]
1553    pub fn file_manager(&self) -> Option<&Arc<GrafeoFileManager>> {
1554        self.file_manager.as_ref()
1555    }
1556}
1557
1558impl Drop for GrafeoDB {
1559    fn drop(&mut self) {
1560        if let Err(e) = self.close() {
1561            grafeo_error!("Error closing database: {}", e);
1562        }
1563    }
1564}
1565
1566impl crate::admin::AdminService for GrafeoDB {
1567    fn info(&self) -> crate::admin::DatabaseInfo {
1568        self.info()
1569    }
1570
1571    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
1572        self.detailed_stats()
1573    }
1574
1575    fn schema(&self) -> crate::admin::SchemaInfo {
1576        self.schema()
1577    }
1578
1579    fn validate(&self) -> crate::admin::ValidationResult {
1580        self.validate()
1581    }
1582
1583    fn wal_status(&self) -> crate::admin::WalStatus {
1584        self.wal_status()
1585    }
1586
1587    fn wal_checkpoint(&self) -> Result<()> {
1588        self.wal_checkpoint()
1589    }
1590}
1591
1592// =========================================================================
1593// Query Result Types
1594// =========================================================================
1595
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public. The constructors ([`new()`](Self::new),
/// [`with_types()`](Self::with_types), [`empty()`](Self::empty),
/// [`status()`](Self::status)) start with:
/// - no rows,
/// - no execution metrics,
/// - a successful GQLSTATUS.
///
/// [`with_metrics()`](Self::with_metrics) fills in the metrics afterwards.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// Expected to line up one-to-one with `columns`. [`QueryResult::new`]
    /// fills every entry with `LogicalType::Any`. The type does not enforce
    /// the length match.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    ///
    /// Presumably each row holds one value per entry in `columns`. The type
    /// does not enforce this.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    ///
    /// Set by [`QueryResult::with_metrics`].
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    ///
    /// Set by [`QueryResult::with_metrics`].
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    ///
    /// The `Display` impl passes it to the table formatter.
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1638
1639impl QueryResult {
1640    /// Creates a fully empty query result (no columns, no rows).
1641    #[must_use]
1642    pub fn empty() -> Self {
1643        Self {
1644            columns: Vec::new(),
1645            column_types: Vec::new(),
1646            rows: Vec::new(),
1647            execution_time_ms: None,
1648            rows_scanned: None,
1649            status_message: None,
1650            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1651        }
1652    }
1653
1654    /// Creates a query result with only a status message (for DDL commands).
1655    #[must_use]
1656    pub fn status(msg: impl Into<String>) -> Self {
1657        Self {
1658            columns: Vec::new(),
1659            column_types: Vec::new(),
1660            rows: Vec::new(),
1661            execution_time_ms: None,
1662            rows_scanned: None,
1663            status_message: Some(msg.into()),
1664            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1665        }
1666    }
1667
1668    /// Creates a new empty query result.
1669    #[must_use]
1670    pub fn new(columns: Vec<String>) -> Self {
1671        let len = columns.len();
1672        Self {
1673            columns,
1674            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1675            rows: Vec::new(),
1676            execution_time_ms: None,
1677            rows_scanned: None,
1678            status_message: None,
1679            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1680        }
1681    }
1682
1683    /// Creates a new empty query result with column types.
1684    #[must_use]
1685    pub fn with_types(
1686        columns: Vec<String>,
1687        column_types: Vec<grafeo_common::types::LogicalType>,
1688    ) -> Self {
1689        Self {
1690            columns,
1691            column_types,
1692            rows: Vec::new(),
1693            execution_time_ms: None,
1694            rows_scanned: None,
1695            status_message: None,
1696            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1697        }
1698    }
1699
1700    /// Sets the execution metrics on this result.
1701    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1702        self.execution_time_ms = Some(execution_time_ms);
1703        self.rows_scanned = Some(rows_scanned);
1704        self
1705    }
1706
1707    /// Returns the execution time in milliseconds, if available.
1708    #[must_use]
1709    pub fn execution_time_ms(&self) -> Option<f64> {
1710        self.execution_time_ms
1711    }
1712
1713    /// Returns the number of rows scanned, if available.
1714    #[must_use]
1715    pub fn rows_scanned(&self) -> Option<u64> {
1716        self.rows_scanned
1717    }
1718
1719    /// Returns the number of rows.
1720    #[must_use]
1721    pub fn row_count(&self) -> usize {
1722        self.rows.len()
1723    }
1724
1725    /// Returns the number of columns.
1726    #[must_use]
1727    pub fn column_count(&self) -> usize {
1728        self.columns.len()
1729    }
1730
1731    /// Returns true if the result is empty.
1732    #[must_use]
1733    pub fn is_empty(&self) -> bool {
1734        self.rows.is_empty()
1735    }
1736
1737    /// Extracts a single value from the result.
1738    ///
1739    /// Use this when your query returns exactly one row with one column,
1740    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1741    ///
1742    /// # Errors
1743    ///
1744    /// Returns an error if the result has multiple rows or columns.
1745    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1746        if self.rows.len() != 1 || self.columns.len() != 1 {
1747            return Err(grafeo_common::utils::error::Error::InvalidValue(
1748                "Expected single value".to_string(),
1749            ));
1750        }
1751        T::from_value(&self.rows[0][0])
1752    }
1753
1754    /// Returns an iterator over the rows.
1755    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1756        self.rows.iter()
1757    }
1758}
1759
1760impl std::fmt::Display for QueryResult {
1761    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762        let table = grafeo_common::fmt::format_result_table(
1763            &self.columns,
1764            &self.rows,
1765            self.execution_time_ms,
1766            self.status_message.as_deref(),
1767        );
1768        f.write_str(&table)
1769    }
1770}
1771
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The trait is public and unsealed, so other types can implement it and
/// then be used with [`QueryResult::scalar()`].
// `Sized` is required because `from_value` returns `Self` by value.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// `value` is only borrowed. Implementations that need owned data (the
    /// `String` impl, for example) copy it out.
    ///
    /// # Errors
    ///
    /// Returns `Error::TypeMismatch` if the value is not the expected type.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1784
1785impl FromValue for i64 {
1786    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1787        value
1788            .as_int64()
1789            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1790                expected: "INT64".to_string(),
1791                found: value.type_name().to_string(),
1792            })
1793    }
1794}
1795
1796impl FromValue for f64 {
1797    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1798        value
1799            .as_float64()
1800            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1801                expected: "FLOAT64".to_string(),
1802                found: value.type_name().to_string(),
1803            })
1804    }
1805}
1806
1807impl FromValue for String {
1808    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1809        value.as_str().map(String::from).ok_or_else(|| {
1810            grafeo_common::utils::error::Error::TypeMismatch {
1811                expected: "STRING".to_string(),
1812                found: value.type_name().to_string(),
1813            }
1814        })
1815    }
1816}
1817
1818impl FromValue for bool {
1819    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1820        value
1821            .as_bool()
1822            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1823                expected: "BOOL".to_string(),
1824                found: value.type_name().to_string(),
1825            })
1826    }
1827}
1828
// Unit tests for `GrafeoDB` construction, WAL persistence/recovery, external
// store backends, query-result metrics, and (behind the `cdc` feature) change
// data capture.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records
        // NOTE(review): if `db.wal()` returns None, this check is skipped
        // silently and the test still passes.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify the labels were recovered correctly
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a->b edge, then the middle node b. The b->c edge
            // (`_e2`) is not deleted explicitly before b is removed.
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // a and c should survive and b should be gone. The checks use
            // get_node by ID rather than node_count, so the test does not
            // depend on how node_count treats deleted nodes.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors.
        // `Drop` also calls close() when `db` goes out of scope, so this
        // covers a third call as well.
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_with_store_external_backend() {
        use grafeo_core::graph::lpg::LpgStore;

        let external = Arc::new(LpgStore::new().unwrap());

        // Seed data on the external store directly
        let n1 = external.create_node(&["Person"]);
        external.set_node_property(n1, "name", grafeo_common::types::Value::from("Alix"));

        let db = GrafeoDB::with_store(
            Arc::clone(&external) as Arc<dyn GraphStoreMut>,
            Config::in_memory(),
        )
        .unwrap();

        // NOTE(review): `session` is only used inside the `gql` block below,
        // so builds without that feature presumably emit an unused-variable
        // warning.
        let session = db.session();

        // Session should see data from the external store via execute
        #[cfg(feature = "gql")]
        {
            let result = session.execute("MATCH (p:Person) RETURN p.name").unwrap();
            assert_eq!(result.rows.len(), 1);
        }
    }

    #[test]
    fn test_with_config_custom_memory_limit() {
        let config = Config::in_memory().with_memory_limit(64 * 1024 * 1024); // 64 MB

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().memory_limit, Some(64 * 1024 * 1024));
        assert_eq!(db.node_count(), 0);
    }

    #[cfg(feature = "metrics")]
    #[test]
    fn test_database_metrics_registry() {
        let db = GrafeoDB::new_in_memory();

        // Perform some operations
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        // Check that metrics snapshot returns data
        let snap = db.metrics();
        // Only direct CRUD calls were made, so the query counter should still be zero
        assert_eq!(snap.query_count, 0); // No queries executed yet
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        /// Helper: creates an in-memory database with CDC enabled.
        fn cdc_db() -> GrafeoDB {
            GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
        }

        #[test]
        fn test_node_lifecycle_history() {
            let db = cdc_db();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = cdc_db();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = cdc_db();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = cdc_db();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }

        #[test]
        fn test_cdc_disabled_by_default() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alix".into());

            let history = db.history(id).unwrap();
            assert!(history.is_empty(), "CDC off by default: no events recorded");
        }

        #[test]
        fn test_session_with_cdc_override_on() {
            // Database default is off, but session opts in
            let db = GrafeoDB::new_in_memory();
            let session = db.session_with_cdc(true);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            // The CDC log should have events from the opted-in session
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                !changes.is_empty(),
                "session_with_cdc(true) should record events"
            );
        }

        #[test]
        fn test_session_with_cdc_override_off() {
            // Database default is on, but session opts out
            let db = cdc_db();
            let session = db.session_with_cdc(false);
            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert!(
                changes.is_empty(),
                "session_with_cdc(false) should not record events"
            );
        }

        #[test]
        fn test_set_cdc_enabled_runtime() {
            let db = GrafeoDB::new_in_memory();
            assert!(!db.is_cdc_enabled());

            // Enable at runtime
            db.set_cdc_enabled(true);
            assert!(db.is_cdc_enabled());

            let id = db.create_node(&["Person"]);
            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1, "CDC enabled at runtime records events");

            // Disable again
            db.set_cdc_enabled(false);
            let id2 = db.create_node(&["Person"]);
            let history2 = db.history(id2).unwrap();
            assert!(
                history2.is_empty(),
                "CDC disabled at runtime stops recording"
            );
        }
    }

    // NOTE(review): unlike test_query_result_has_metrics, the three
    // `with_store` tests below run queries without a `gql` feature gate.
    // Confirm that `execute` accepts these queries in every feature
    // combination that is tested.
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Use an explicit transaction so INSERT and MATCH share the same
        // transaction context. With PENDING epochs, uncommitted versions are
        // only visible to the owning transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}