Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
49/// Your handle to a Grafeo database.
50///
51/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
52/// quick experiments, or [`open()`](Self::open) for persistent storage.
53/// Then grab a [`session()`](Self::session) to start querying.
54///
55/// # Examples
56///
57/// ```
58/// use grafeo_engine::GrafeoDB;
59///
60/// // Quick in-memory database
61/// let db = GrafeoDB::new_in_memory();
62///
63/// // Add some data
64/// db.create_node(&["Person"]);
65///
66/// // Query it
67/// let session = db.session();
68/// let result = session.execute("MATCH (p:Person) RETURN p")?;
69/// # Ok::<(), grafeo_common::utils::error::Error>(())
70/// ```
71pub struct GrafeoDB {
72    /// Database configuration.
73    pub(super) config: Config,
74    /// The underlying graph store.
75    pub(super) store: Arc<LpgStore>,
76    /// Schema and metadata catalog shared across sessions.
77    pub(super) catalog: Arc<Catalog>,
78    /// RDF triple store (if RDF feature is enabled).
79    #[cfg(feature = "rdf")]
80    pub(super) rdf_store: Arc<RdfStore>,
81    /// Transaction manager.
82    pub(super) transaction_manager: Arc<TransactionManager>,
83    /// Unified buffer manager.
84    pub(super) buffer_manager: Arc<BufferManager>,
85    /// Write-ahead log manager (if durability is enabled).
86    #[cfg(feature = "wal")]
87    pub(super) wal: Option<Arc<LpgWal>>,
88    /// Shared WAL graph context tracker. Tracks which named graph was last
89    /// written to the WAL, so concurrent sessions can emit `SwitchGraph`
90    /// records only when the context actually changes.
91    #[cfg(feature = "wal")]
92    pub(super) wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
93    /// Query cache for parsed and optimized plans.
94    pub(super) query_cache: Arc<QueryCache>,
95    /// Shared commit counter for auto-GC across sessions.
96    pub(super) commit_counter: Arc<AtomicUsize>,
97    /// Whether the database is open.
98    pub(super) is_open: RwLock<bool>,
99    /// Change data capture log for tracking mutations.
100    #[cfg(feature = "cdc")]
101    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
102    /// Registered embedding models for text-to-vector conversion.
103    #[cfg(feature = "embed")]
104    pub(super) embedding_models:
105        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
106    /// External graph store (when using with_store()).
107    /// When set, sessions route queries through this store instead of the built-in LpgStore.
108    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
109    /// Persistent graph context for one-shot `execute()` calls.
110    /// When set, each call to `session()` pre-configures the session to this graph.
111    /// Updated after every one-shot `execute()` to reflect `USE GRAPH` / `SESSION RESET`.
112    current_graph: RwLock<Option<String>>,
113}
114
115impl GrafeoDB {
116    /// Creates an in-memory database, fast to create, gone when dropped.
117    ///
118    /// Use this for tests, experiments, or when you don't need persistence.
119    /// For data that survives restarts, use [`open()`](Self::open) instead.
120    ///
121    /// # Panics
122    ///
123    /// Panics if the internal arena allocator cannot be initialized (out of memory).
124    /// Use [`with_config()`](Self::with_config) for a fallible alternative.
125    ///
126    /// # Examples
127    ///
128    /// ```
129    /// use grafeo_engine::GrafeoDB;
130    ///
131    /// let db = GrafeoDB::new_in_memory();
132    /// let session = db.session();
133    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
134    /// # Ok::<(), grafeo_common::utils::error::Error>(())
135    /// ```
136    #[must_use]
137    pub fn new_in_memory() -> Self {
138        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
139    }
140
141    /// Opens a database at the given path, creating it if it doesn't exist.
142    ///
143    /// If you've used this path before, Grafeo recovers your data from the
144    /// write-ahead log automatically. First open on a new path creates an
145    /// empty database.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the path isn't writable or recovery fails.
150    ///
151    /// # Examples
152    ///
153    /// ```no_run
154    /// use grafeo_engine::GrafeoDB;
155    ///
156    /// let db = GrafeoDB::open("./my_social_network")?;
157    /// # Ok::<(), grafeo_common::utils::error::Error>(())
158    /// ```
159    #[cfg(feature = "wal")]
160    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
161        Self::with_config(Config::persistent(path.as_ref()))
162    }
163
164    /// Creates a database with custom configuration.
165    ///
166    /// Use this when you need fine-grained control over memory limits,
167    /// thread counts, or persistence settings. For most cases,
168    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
169    /// are simpler.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the database can't be created or recovery fails.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use grafeo_engine::{GrafeoDB, Config};
179    ///
180    /// // In-memory with a 512MB limit
181    /// let config = Config::in_memory()
182    ///     .with_memory_limit(512 * 1024 * 1024);
183    ///
184    /// let db = GrafeoDB::with_config(config)?;
185    /// # Ok::<(), grafeo_common::utils::error::Error>(())
186    /// ```
187    pub fn with_config(config: Config) -> Result<Self> {
188        // Validate configuration before proceeding
189        config
190            .validate()
191            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
192
193        let store = Arc::new(LpgStore::new()?);
194        #[cfg(feature = "rdf")]
195        let rdf_store = Arc::new(RdfStore::new());
196        let transaction_manager = Arc::new(TransactionManager::new());
197
198        // Create buffer manager with configured limits
199        let buffer_config = BufferManagerConfig {
200            budget: config.memory_limit.unwrap_or_else(|| {
201                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
202            }),
203            spill_path: config
204                .spill_path
205                .clone()
206                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
207            ..BufferManagerConfig::default()
208        };
209        let buffer_manager = BufferManager::new(buffer_config);
210
211        // Create catalog early so WAL replay can restore schema definitions
212        let catalog = Arc::new(Catalog::new());
213
214        // Initialize WAL if persistence is enabled
215        #[cfg(feature = "wal")]
216        let wal = if config.wal_enabled {
217            if let Some(ref db_path) = config.path {
218                // Create database directory if it doesn't exist
219                std::fs::create_dir_all(db_path)?;
220
221                let wal_path = db_path.join("wal");
222
223                // Check if WAL exists and recover if needed
224                if wal_path.exists() {
225                    let recovery = WalRecovery::new(&wal_path);
226                    let records = recovery.recover()?;
227                    Self::apply_wal_records(
228                        &store,
229                        &catalog,
230                        #[cfg(feature = "rdf")]
231                        &rdf_store,
232                        &records,
233                    )?;
234                }
235
236                // Open/create WAL manager with configured durability
237                let wal_durability = match config.wal_durability {
238                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
239                    crate::config::DurabilityMode::Batch {
240                        max_delay_ms,
241                        max_records,
242                    } => WalDurabilityMode::Batch {
243                        max_delay_ms,
244                        max_records,
245                    },
246                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
247                        WalDurabilityMode::Adaptive { target_interval_ms }
248                    }
249                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
250                };
251                let wal_config = WalConfig {
252                    durability: wal_durability,
253                    ..WalConfig::default()
254                };
255                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
256                Some(Arc::new(wal_manager))
257            } else {
258                None
259            }
260        } else {
261            None
262        };
263
264        // Create query cache with default capacity (1000 queries)
265        let query_cache = Arc::new(QueryCache::default());
266
267        Ok(Self {
268            config,
269            store,
270            catalog,
271            #[cfg(feature = "rdf")]
272            rdf_store,
273            transaction_manager,
274            buffer_manager,
275            #[cfg(feature = "wal")]
276            wal,
277            #[cfg(feature = "wal")]
278            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
279            query_cache,
280            commit_counter: Arc::new(AtomicUsize::new(0)),
281            is_open: RwLock::new(true),
282            #[cfg(feature = "cdc")]
283            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
284            #[cfg(feature = "embed")]
285            embedding_models: RwLock::new(hashbrown::HashMap::new()),
286            external_store: None,
287            current_graph: RwLock::new(None),
288        })
289    }
290
291    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
292    ///
293    /// The external store handles all data persistence. WAL, CDC, and index
294    /// management are the responsibility of the store implementation.
295    ///
296    /// Query execution (all 6 languages, optimizer, planner) works through the
297    /// provided store. Admin operations (schema introspection, persistence,
298    /// vector/text indexes) are not available on external stores.
299    ///
300    /// # Examples
301    ///
302    /// ```no_run
303    /// use std::sync::Arc;
304    /// use grafeo_engine::{GrafeoDB, Config};
305    /// use grafeo_core::graph::GraphStoreMut;
306    ///
307    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
308    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
309    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
310    ///     Ok(())
311    /// }
312    /// ```
313    ///
314    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
315    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
316        config
317            .validate()
318            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
319
320        let dummy_store = Arc::new(LpgStore::new()?);
321        let transaction_manager = Arc::new(TransactionManager::new());
322
323        let buffer_config = BufferManagerConfig {
324            budget: config.memory_limit.unwrap_or_else(|| {
325                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
326            }),
327            spill_path: None,
328            ..BufferManagerConfig::default()
329        };
330        let buffer_manager = BufferManager::new(buffer_config);
331
332        let query_cache = Arc::new(QueryCache::default());
333
334        Ok(Self {
335            config,
336            store: dummy_store,
337            catalog: Arc::new(Catalog::new()),
338            #[cfg(feature = "rdf")]
339            rdf_store: Arc::new(RdfStore::new()),
340            transaction_manager,
341            buffer_manager,
342            #[cfg(feature = "wal")]
343            wal: None,
344            #[cfg(feature = "wal")]
345            wal_graph_context: Arc::new(parking_lot::Mutex::new(None)),
346            query_cache,
347            commit_counter: Arc::new(AtomicUsize::new(0)),
348            is_open: RwLock::new(true),
349            #[cfg(feature = "cdc")]
350            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
351            #[cfg(feature = "embed")]
352            embedding_models: RwLock::new(hashbrown::HashMap::new()),
353            external_store: Some(store),
354            current_graph: RwLock::new(None),
355        })
356    }
357
358    /// Applies WAL records to restore the database state.
359    ///
360    /// Data mutation records are routed through a graph cursor that tracks
361    /// `SwitchGraph` context markers, replaying mutations into the correct
362    /// named graph (or the default graph when cursor is `None`).
363    #[cfg(feature = "wal")]
364    fn apply_wal_records(
365        store: &Arc<LpgStore>,
366        catalog: &Catalog,
367        #[cfg(feature = "rdf")] rdf_store: &Arc<RdfStore>,
368        records: &[WalRecord],
369    ) -> Result<()> {
370        use crate::catalog::{
371            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
372        };
373        use grafeo_common::utils::error::Error;
374
375        // Graph cursor: tracks which named graph receives data mutations.
376        // `None` means the default graph.
377        let mut current_graph: Option<String> = None;
378        let mut target_store: Arc<LpgStore> = Arc::clone(store);
379
380        for record in records {
381            match record {
382                // --- Named graph lifecycle ---
383                WalRecord::CreateNamedGraph { name } => {
384                    let _ = store.create_graph(name);
385                }
386                WalRecord::DropNamedGraph { name } => {
387                    store.drop_graph(name);
388                    // Reset cursor if the dropped graph was active
389                    if current_graph.as_deref() == Some(name.as_str()) {
390                        current_graph = None;
391                        target_store = Arc::clone(store);
392                    }
393                }
394                WalRecord::SwitchGraph { name } => {
395                    current_graph.clone_from(name);
396                    target_store = match &current_graph {
397                        None => Arc::clone(store),
398                        Some(graph_name) => store
399                            .graph_or_create(graph_name)
400                            .map_err(|e| Error::Internal(e.to_string()))?,
401                    };
402                }
403
404                // --- Data mutations: routed through target_store ---
405                WalRecord::CreateNode { id, labels } => {
406                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
407                    target_store.create_node_with_id(*id, &label_refs)?;
408                }
409                WalRecord::DeleteNode { id } => {
410                    target_store.delete_node(*id);
411                }
412                WalRecord::CreateEdge {
413                    id,
414                    src,
415                    dst,
416                    edge_type,
417                } => {
418                    target_store.create_edge_with_id(*id, *src, *dst, edge_type)?;
419                }
420                WalRecord::DeleteEdge { id } => {
421                    target_store.delete_edge(*id);
422                }
423                WalRecord::SetNodeProperty { id, key, value } => {
424                    target_store.set_node_property(*id, key, value.clone());
425                }
426                WalRecord::SetEdgeProperty { id, key, value } => {
427                    target_store.set_edge_property(*id, key, value.clone());
428                }
429                WalRecord::AddNodeLabel { id, label } => {
430                    target_store.add_label(*id, label);
431                }
432                WalRecord::RemoveNodeLabel { id, label } => {
433                    target_store.remove_label(*id, label);
434                }
435                WalRecord::RemoveNodeProperty { id, key } => {
436                    target_store.remove_node_property(*id, key);
437                }
438                WalRecord::RemoveEdgeProperty { id, key } => {
439                    target_store.remove_edge_property(*id, key);
440                }
441
442                // --- Schema DDL replay (always on root catalog) ---
443                WalRecord::CreateNodeType {
444                    name,
445                    properties,
446                    constraints,
447                } => {
448                    let def = NodeTypeDefinition {
449                        name: name.clone(),
450                        properties: properties
451                            .iter()
452                            .map(|(n, t, nullable)| TypedProperty {
453                                name: n.clone(),
454                                data_type: PropertyDataType::from_type_name(t),
455                                nullable: *nullable,
456                                default_value: None,
457                            })
458                            .collect(),
459                        constraints: constraints
460                            .iter()
461                            .map(|(kind, props)| match kind.as_str() {
462                                "unique" => TypeConstraint::Unique(props.clone()),
463                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
464                                "not_null" if !props.is_empty() => {
465                                    TypeConstraint::NotNull(props[0].clone())
466                                }
467                                _ => TypeConstraint::Unique(props.clone()),
468                            })
469                            .collect(),
470                        parent_types: Vec::new(),
471                    };
472                    let _ = catalog.register_node_type(def);
473                }
474                WalRecord::DropNodeType { name } => {
475                    let _ = catalog.drop_node_type(name);
476                }
477                WalRecord::CreateEdgeType {
478                    name,
479                    properties,
480                    constraints,
481                } => {
482                    let def = EdgeTypeDefinition {
483                        name: name.clone(),
484                        properties: properties
485                            .iter()
486                            .map(|(n, t, nullable)| TypedProperty {
487                                name: n.clone(),
488                                data_type: PropertyDataType::from_type_name(t),
489                                nullable: *nullable,
490                                default_value: None,
491                            })
492                            .collect(),
493                        constraints: constraints
494                            .iter()
495                            .map(|(kind, props)| match kind.as_str() {
496                                "unique" => TypeConstraint::Unique(props.clone()),
497                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
498                                "not_null" if !props.is_empty() => {
499                                    TypeConstraint::NotNull(props[0].clone())
500                                }
501                                _ => TypeConstraint::Unique(props.clone()),
502                            })
503                            .collect(),
504                        source_node_types: Vec::new(),
505                        target_node_types: Vec::new(),
506                    };
507                    let _ = catalog.register_edge_type_def(def);
508                }
509                WalRecord::DropEdgeType { name } => {
510                    let _ = catalog.drop_edge_type_def(name);
511                }
512                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
513                    // Index recreation is handled by the store on startup
514                    // (indexes are rebuilt from data, not WAL)
515                }
516                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
517                    // Constraint definitions are part of type definitions
518                    // and replayed via CreateNodeType/CreateEdgeType
519                }
520                WalRecord::CreateGraphType {
521                    name,
522                    node_types,
523                    edge_types,
524                    open,
525                } => {
526                    use crate::catalog::GraphTypeDefinition;
527                    let def = GraphTypeDefinition {
528                        name: name.clone(),
529                        allowed_node_types: node_types.clone(),
530                        allowed_edge_types: edge_types.clone(),
531                        open: *open,
532                    };
533                    let _ = catalog.register_graph_type(def);
534                }
535                WalRecord::DropGraphType { name } => {
536                    let _ = catalog.drop_graph_type(name);
537                }
538                WalRecord::CreateSchema { name } => {
539                    let _ = catalog.register_schema_namespace(name.clone());
540                }
541                WalRecord::DropSchema { name } => {
542                    let _ = catalog.drop_schema_namespace(name);
543                }
544
545                WalRecord::AlterNodeType { name, alterations } => {
546                    for (action, prop_name, type_name, nullable) in alterations {
547                        match action.as_str() {
548                            "add" => {
549                                let prop = TypedProperty {
550                                    name: prop_name.clone(),
551                                    data_type: PropertyDataType::from_type_name(type_name),
552                                    nullable: *nullable,
553                                    default_value: None,
554                                };
555                                let _ = catalog.alter_node_type_add_property(name, prop);
556                            }
557                            "drop" => {
558                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
559                            }
560                            _ => {}
561                        }
562                    }
563                }
564                WalRecord::AlterEdgeType { name, alterations } => {
565                    for (action, prop_name, type_name, nullable) in alterations {
566                        match action.as_str() {
567                            "add" => {
568                                let prop = TypedProperty {
569                                    name: prop_name.clone(),
570                                    data_type: PropertyDataType::from_type_name(type_name),
571                                    nullable: *nullable,
572                                    default_value: None,
573                                };
574                                let _ = catalog.alter_edge_type_add_property(name, prop);
575                            }
576                            "drop" => {
577                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
578                            }
579                            _ => {}
580                        }
581                    }
582                }
583                WalRecord::AlterGraphType { name, alterations } => {
584                    for (action, type_name) in alterations {
585                        match action.as_str() {
586                            "add_node" => {
587                                let _ =
588                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
589                            }
590                            "drop_node" => {
591                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
592                            }
593                            "add_edge" => {
594                                let _ =
595                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
596                            }
597                            "drop_edge" => {
598                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
599                            }
600                            _ => {}
601                        }
602                    }
603                }
604
605                WalRecord::CreateProcedure {
606                    name,
607                    params,
608                    returns,
609                    body,
610                } => {
611                    use crate::catalog::ProcedureDefinition;
612                    let def = ProcedureDefinition {
613                        name: name.clone(),
614                        params: params.clone(),
615                        returns: returns.clone(),
616                        body: body.clone(),
617                    };
618                    let _ = catalog.register_procedure(def);
619                }
620                WalRecord::DropProcedure { name } => {
621                    let _ = catalog.drop_procedure(name);
622                }
623
624                // --- RDF triple replay ---
625                #[cfg(feature = "rdf")]
626                WalRecord::InsertRdfTriple {
627                    subject,
628                    predicate,
629                    object,
630                    graph,
631                } => {
632                    use grafeo_core::graph::rdf::Term;
633                    if let (Some(s), Some(p), Some(o)) = (
634                        Term::from_ntriples(subject),
635                        Term::from_ntriples(predicate),
636                        Term::from_ntriples(object),
637                    ) {
638                        let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
639                        let target = match graph {
640                            Some(name) => rdf_store.graph_or_create(name),
641                            None => Arc::clone(rdf_store),
642                        };
643                        target.insert(triple);
644                    }
645                }
646                #[cfg(feature = "rdf")]
647                WalRecord::DeleteRdfTriple {
648                    subject,
649                    predicate,
650                    object,
651                    graph,
652                } => {
653                    use grafeo_core::graph::rdf::Term;
654                    if let (Some(s), Some(p), Some(o)) = (
655                        Term::from_ntriples(subject),
656                        Term::from_ntriples(predicate),
657                        Term::from_ntriples(object),
658                    ) {
659                        let triple = grafeo_core::graph::rdf::Triple::new(s, p, o);
660                        let target = match graph {
661                            Some(name) => rdf_store.graph_or_create(name),
662                            None => Arc::clone(rdf_store),
663                        };
664                        target.remove(&triple);
665                    }
666                }
667                #[cfg(feature = "rdf")]
668                WalRecord::ClearRdfGraph { graph } => {
669                    rdf_store.clear_graph(graph.as_deref());
670                }
671                #[cfg(feature = "rdf")]
672                WalRecord::CreateRdfGraph { name } => {
673                    let _ = rdf_store.create_graph(name);
674                }
675                #[cfg(feature = "rdf")]
676                WalRecord::DropRdfGraph { name } => match name {
677                    None => rdf_store.clear(),
678                    Some(graph_name) => {
679                        rdf_store.drop_graph(graph_name);
680                    }
681                },
682                // Skip RDF records when rdf feature is disabled
683                #[cfg(not(feature = "rdf"))]
684                WalRecord::InsertRdfTriple { .. }
685                | WalRecord::DeleteRdfTriple { .. }
686                | WalRecord::ClearRdfGraph { .. }
687                | WalRecord::CreateRdfGraph { .. }
688                | WalRecord::DropRdfGraph { .. } => {}
689
690                WalRecord::TransactionCommit { .. }
691                | WalRecord::TransactionAbort { .. }
692                | WalRecord::Checkpoint { .. } => {
693                    // Transaction control records don't need replay action
694                    // (recovery already filtered to only committed transactions)
695                }
696            }
697        }
698        Ok(())
699    }
700
701    // =========================================================================
702    // Session & Configuration
703    // =========================================================================
704
705    /// Opens a new session for running queries.
706    ///
707    /// Sessions are cheap to create: spin up as many as you need. Each
708    /// gets its own transaction context, so concurrent sessions won't
709    /// block each other on reads.
710    ///
711    /// # Panics
712    ///
713    /// Panics if the database was configured with an external graph store and
714    /// the internal arena allocator cannot be initialized (out of memory).
715    ///
716    /// # Examples
717    ///
718    /// ```
719    /// use grafeo_engine::GrafeoDB;
720    ///
721    /// let db = GrafeoDB::new_in_memory();
722    /// let session = db.session();
723    ///
724    /// // Run queries through the session
725    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
726    /// # Ok::<(), grafeo_common::utils::error::Error>(())
727    /// ```
728    #[must_use]
729    pub fn session(&self) -> Session {
730        let session_cfg = || crate::session::SessionConfig {
731            transaction_manager: Arc::clone(&self.transaction_manager),
732            query_cache: Arc::clone(&self.query_cache),
733            catalog: Arc::clone(&self.catalog),
734            adaptive_config: self.config.adaptive.clone(),
735            factorized_execution: self.config.factorized_execution,
736            graph_model: self.config.graph_model,
737            query_timeout: self.config.query_timeout,
738            commit_counter: Arc::clone(&self.commit_counter),
739            gc_interval: self.config.gc_interval,
740        };
741
742        if let Some(ref ext_store) = self.external_store {
743            return Session::with_external_store(Arc::clone(ext_store), session_cfg())
744                .expect("arena allocation for external store session");
745        }
746
747        #[cfg(feature = "rdf")]
748        let mut session = Session::with_rdf_store_and_adaptive(
749            Arc::clone(&self.store),
750            Arc::clone(&self.rdf_store),
751            session_cfg(),
752        );
753        #[cfg(not(feature = "rdf"))]
754        let mut session = Session::with_adaptive(Arc::clone(&self.store), session_cfg());
755
756        #[cfg(feature = "wal")]
757        if let Some(ref wal) = self.wal {
758            session.set_wal(Arc::clone(wal), Arc::clone(&self.wal_graph_context));
759        }
760
761        #[cfg(feature = "cdc")]
762        session.set_cdc_log(Arc::clone(&self.cdc_log));
763
764        // Propagate persistent graph context to the new session
765        if let Some(ref graph) = *self.current_graph.read() {
766            session.use_graph(graph);
767        }
768
769        // Suppress unused_mut when cdc/wal are disabled
770        let _ = &mut session;
771
772        session
773    }
774
775    /// Returns the current graph name, if any.
776    ///
777    /// This is the persistent graph context used by one-shot `execute()` calls.
778    /// It is updated whenever `execute()` encounters `USE GRAPH`, `SESSION SET GRAPH`,
779    /// or `SESSION RESET`.
780    #[must_use]
781    pub fn current_graph(&self) -> Option<String> {
782        self.current_graph.read().clone()
783    }
784
785    /// Sets the current graph context for subsequent one-shot `execute()` calls.
786    ///
787    /// This is equivalent to running `USE GRAPH <name>` but without creating a session.
788    /// Pass `None` to reset to the default graph.
789    pub fn set_current_graph(&self, name: Option<&str>) {
790        *self.current_graph.write() = name.map(ToString::to_string);
791    }
792
    /// Returns the adaptive execution configuration.
    ///
    /// This borrows the `adaptive` section of the database [`Config`]. The same
    /// value is cloned into the configuration of every session created by
    /// [`session()`](Self::session).
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
798
    /// Returns a reference to the database configuration.
    ///
    /// The reference is read-only; this accessor offers no way to change
    /// settings on a live database.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
804
    /// Returns the graph data model of this database.
    ///
    /// The value is read from the database configuration and returned by copy.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
810
    /// Returns the configured memory limit in bytes, if any.
    ///
    /// `None` means the configuration does not set a limit.
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
816
    /// Returns the underlying (default) store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations
    /// and admin operations (index management, schema introspection, MVCC internals).
    ///
    /// For code that only needs read/write graph operations, prefer
    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
    ///
    /// Note that this always returns the built-in [`LpgStore`], even when the
    /// database was configured with an external store; only
    /// [`graph_store()`](Self::graph_store) hands out the external one.
    /// The returned reference points at the shared `Arc`; clone it to obtain
    /// an owned handle.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
828
829    /// Returns the LPG store for the currently active graph.
830    ///
831    /// If [`current_graph`](Self::current_graph) is `None` or `"default"`, returns
832    /// the default store. Otherwise looks up the named graph in the root store.
833    /// Falls back to the default store if the named graph does not exist.
834    #[allow(dead_code)] // Reserved for future graph-aware CRUD methods
835    fn active_store(&self) -> Arc<LpgStore> {
836        let graph_name = self.current_graph.read().clone();
837        match graph_name {
838            None => Arc::clone(&self.store),
839            Some(ref name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
840            Some(ref name) => self
841                .store
842                .graph(name)
843                .unwrap_or_else(|| Arc::clone(&self.store)),
844        }
845    }
846
847    // === Named Graph Management ===
848
849    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if arena allocation fails.
854    pub fn create_graph(&self, name: &str) -> Result<bool> {
855        Ok(self.store.create_graph(name)?)
856    }
857
    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
    ///
    /// The call is forwarded to the root store. This method does not itself
    /// update the persistent graph context, so a
    /// [`current_graph`](Self::current_graph) that names the dropped graph is
    /// left unchanged.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.store.drop_graph(name)
    }
862
    /// Returns the names of all named graphs.
    ///
    /// The list is obtained from the root store and returned as owned strings.
    #[must_use]
    pub fn list_graphs(&self) -> Vec<String> {
        self.store.graph_names()
    }
868
869    /// Returns the graph store as a trait object.
870    ///
871    /// This provides the [`GraphStoreMut`] interface for code that should work
872    /// with any storage backend. Use this when you only need graph read/write
873    /// operations and don't need admin methods like index management.
874    ///
875    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
876    #[must_use]
877    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
878        if let Some(ref ext_store) = self.external_store {
879            Arc::clone(ext_store)
880        } else {
881            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
882        }
883    }
884
885    /// Garbage collects old MVCC versions that are no longer visible.
886    ///
887    /// Determines the minimum epoch required by active transactions and prunes
888    /// version chains older than that threshold. Also cleans up completed
889    /// transaction metadata in the transaction manager.
890    pub fn gc(&self) {
891        let min_epoch = self.transaction_manager.min_active_epoch();
892        self.store.gc_versions(min_epoch);
893        self.transaction_manager.gc();
894    }
895
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The returned reference points at the shared `Arc`; clone it to obtain
    /// an owned handle.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
901
    /// Returns the query cache.
    ///
    /// This is the same cache instance that is shared with every session
    /// created by [`session()`](Self::session).
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
907
    /// Clears all cached query plans.
    ///
    /// This is called automatically after DDL operations, but can also be
    /// invoked manually after external schema changes (e.g., WAL replay,
    /// import) or when you want to force re-optimization of all queries.
    ///
    /// The cache is shared with all sessions of this database, so clearing it
    /// here affects them as well.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }
916
917    // =========================================================================
918    // Lifecycle
919    // =========================================================================
920
921    /// Closes the database, flushing all pending writes.
922    ///
923    /// For persistent databases, this ensures everything is safely on disk.
924    /// Called automatically when the database is dropped, but you can call
925    /// it explicitly if you need to guarantee durability at a specific point.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
930    pub fn close(&self) -> Result<()> {
931        let mut is_open = self.is_open.write();
932        if !*is_open {
933            return Ok(());
934        }
935
936        // Commit and checkpoint WAL
937        #[cfg(feature = "wal")]
938        if let Some(ref wal) = self.wal {
939            let epoch = self.store.current_epoch();
940
941            // Use the last assigned transaction ID, or create a checkpoint-only tx
942            let checkpoint_tx = self
943                .transaction_manager
944                .last_assigned_transaction_id()
945                .unwrap_or_else(|| {
946                    // No transactions have been started; begin one for checkpoint
947                    self.transaction_manager.begin()
948                });
949
950            // Log a TransactionCommit to mark all pending records as committed
951            wal.log(&WalRecord::TransactionCommit {
952                transaction_id: checkpoint_tx,
953            })?;
954
955            // Then checkpoint
956            wal.checkpoint(checkpoint_tx, epoch)?;
957            wal.sync()?;
958        }
959
960        *is_open = false;
961        Ok(())
962    }
963
    /// Returns the typed WAL if available.
    ///
    /// `None` means this database instance has no WAL attached. Only compiled
    /// when the `wal` feature is enabled.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
970
971    /// Logs a WAL record if WAL is enabled.
972    #[cfg(feature = "wal")]
973    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
974        if let Some(ref wal) = self.wal {
975            wal.log(record)?;
976        }
977        Ok(())
978    }
979}
980
981impl Drop for GrafeoDB {
982    fn drop(&mut self) {
983        if let Err(e) = self.close() {
984            tracing::error!("Error closing database: {}", e);
985        }
986    }
987}
988
989impl crate::admin::AdminService for GrafeoDB {
990    fn info(&self) -> crate::admin::DatabaseInfo {
991        self.info()
992    }
993
994    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
995        self.detailed_stats()
996    }
997
998    fn schema(&self) -> crate::admin::SchemaInfo {
999        self.schema()
1000    }
1001
1002    fn validate(&self) -> crate::admin::ValidationResult {
1003        self.validate()
1004    }
1005
1006    fn wal_status(&self) -> crate::admin::WalStatus {
1007        self.wal_status()
1008    }
1009
1010    fn wal_checkpoint(&self) -> Result<()> {
1011        self.wal_checkpoint()
1012    }
1013}
1014
1015// =========================================================================
1016// Query Result Types
1017// =========================================================================
1018
/// The result of running a query.
///
/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
///
/// All fields are public, so callers may read them directly or build and
/// modify results by hand; nothing enforces that each row has one value per
/// column or that `column_types` lines up with `columns`.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// let db = GrafeoDB::new_in_memory();
/// db.create_node(&["Person"]);
///
/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
///
/// // Check what we got
/// println!("Columns: {:?}", result.columns);
/// println!("Rows: {}", result.row_count());
///
/// // Iterate through results
/// for row in result.iter() {
///     println!("{:?}", row);
/// }
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
#[derive(Debug)]
pub struct QueryResult {
    /// Column names from the RETURN clause.
    pub columns: Vec<String>,
    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
    ///
    /// [`QueryResult::new`] fills this with `LogicalType::Any`, one entry per column.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// The actual result rows.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Query execution time in milliseconds (if timing was enabled).
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned during query execution (estimate).
    pub rows_scanned: Option<u64>,
    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
    pub status_message: Option<String>,
    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
    ///
    /// The constructors on this type initialize it to `GqlStatus::SUCCESS`.
    pub gql_status: grafeo_common::utils::GqlStatus,
}
1061
1062impl QueryResult {
1063    /// Creates a fully empty query result (no columns, no rows).
1064    #[must_use]
1065    pub fn empty() -> Self {
1066        Self {
1067            columns: Vec::new(),
1068            column_types: Vec::new(),
1069            rows: Vec::new(),
1070            execution_time_ms: None,
1071            rows_scanned: None,
1072            status_message: None,
1073            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1074        }
1075    }
1076
1077    /// Creates a query result with only a status message (for DDL commands).
1078    #[must_use]
1079    pub fn status(msg: impl Into<String>) -> Self {
1080        Self {
1081            columns: Vec::new(),
1082            column_types: Vec::new(),
1083            rows: Vec::new(),
1084            execution_time_ms: None,
1085            rows_scanned: None,
1086            status_message: Some(msg.into()),
1087            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1088        }
1089    }
1090
1091    /// Creates a new empty query result.
1092    #[must_use]
1093    pub fn new(columns: Vec<String>) -> Self {
1094        let len = columns.len();
1095        Self {
1096            columns,
1097            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1098            rows: Vec::new(),
1099            execution_time_ms: None,
1100            rows_scanned: None,
1101            status_message: None,
1102            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1103        }
1104    }
1105
1106    /// Creates a new empty query result with column types.
1107    #[must_use]
1108    pub fn with_types(
1109        columns: Vec<String>,
1110        column_types: Vec<grafeo_common::types::LogicalType>,
1111    ) -> Self {
1112        Self {
1113            columns,
1114            column_types,
1115            rows: Vec::new(),
1116            execution_time_ms: None,
1117            rows_scanned: None,
1118            status_message: None,
1119            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
1120        }
1121    }
1122
1123    /// Sets the execution metrics on this result.
1124    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
1125        self.execution_time_ms = Some(execution_time_ms);
1126        self.rows_scanned = Some(rows_scanned);
1127        self
1128    }
1129
1130    /// Returns the execution time in milliseconds, if available.
1131    #[must_use]
1132    pub fn execution_time_ms(&self) -> Option<f64> {
1133        self.execution_time_ms
1134    }
1135
1136    /// Returns the number of rows scanned, if available.
1137    #[must_use]
1138    pub fn rows_scanned(&self) -> Option<u64> {
1139        self.rows_scanned
1140    }
1141
1142    /// Returns the number of rows.
1143    #[must_use]
1144    pub fn row_count(&self) -> usize {
1145        self.rows.len()
1146    }
1147
1148    /// Returns the number of columns.
1149    #[must_use]
1150    pub fn column_count(&self) -> usize {
1151        self.columns.len()
1152    }
1153
1154    /// Returns true if the result is empty.
1155    #[must_use]
1156    pub fn is_empty(&self) -> bool {
1157        self.rows.is_empty()
1158    }
1159
1160    /// Extracts a single value from the result.
1161    ///
1162    /// Use this when your query returns exactly one row with one column,
1163    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1164    ///
1165    /// # Errors
1166    ///
1167    /// Returns an error if the result has multiple rows or columns.
1168    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1169        if self.rows.len() != 1 || self.columns.len() != 1 {
1170            return Err(grafeo_common::utils::error::Error::InvalidValue(
1171                "Expected single value".to_string(),
1172            ));
1173        }
1174        T::from_value(&self.rows[0][0])
1175    }
1176
1177    /// Returns an iterator over the rows.
1178    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1179        self.rows.iter()
1180    }
1181}
1182
/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
///
/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
/// Used by [`QueryResult::scalar()`] to extract typed values.
///
/// The implementations in this module report a failed conversion as a
/// `TypeMismatch` error carrying the expected type name and the type name
/// of the value that was found.
pub trait FromValue: Sized {
    /// Attempts the conversion, returning an error on type mismatch.
    ///
    /// The value is only borrowed; implementations return an owned `Self`.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
1191
1192impl FromValue for i64 {
1193    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1194        value
1195            .as_int64()
1196            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1197                expected: "INT64".to_string(),
1198                found: value.type_name().to_string(),
1199            })
1200    }
1201}
1202
1203impl FromValue for f64 {
1204    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1205        value
1206            .as_float64()
1207            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1208                expected: "FLOAT64".to_string(),
1209                found: value.type_name().to_string(),
1210            })
1211    }
1212}
1213
1214impl FromValue for String {
1215    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1216        value.as_str().map(String::from).ok_or_else(|| {
1217            grafeo_common::utils::error::Error::TypeMismatch {
1218                expected: "STRING".to_string(),
1219                found: value.type_name().to_string(),
1220            }
1221        })
1222    }
1223}
1224
1225impl FromValue for bool {
1226    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1227        value
1228            .as_bool()
1229            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1230                expected: "BOOL".to_string(),
1231                found: value.type_name().to_string(),
1232            })
1233    }
1234}
1235
#[cfg(test)]
mod tests {
    //! Unit tests for [`GrafeoDB`]: construction, sessions, WAL-backed
    //! persistence (feature `wal`), query metrics, CDC history (feature
    //! `cdc`), and databases built on an externally supplied store.

    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records. Require the WAL to be present: with an
        // `if let` the assertion would be skipped silently if the database
        // had no WAL, and the test would pass without checking anything.
        let wal = db
            .wal()
            .expect("database opened from a path should have a WAL");
        assert!(wal.record_count() > 0);

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify the recovered nodes still carry their label
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete the a -> b edge, then the middle node
            // (the b -> c edge is not deleted explicitly)
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        //! Change-data-capture tests: every mutation made through the
        //! database handle should show up in the entity's history.

        use super::*;

        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }
    }

    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        // Populate the store before handing it to the database, so the query
        // below can only succeed if the database reads from that same store.
        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let mut session = db.session();

        // Use an explicit transaction so INSERT and MATCH share the same
        // transaction context. With PENDING epochs, uncommitted versions are
        // only visible to the owning transaction.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        session.commit().unwrap();
    }
}