Skip to main content

grafeo_engine/database/
mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
49/// Your handle to a Grafeo database.
50///
51/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
52/// quick experiments, or [`open()`](Self::open) for persistent storage.
53/// Then grab a [`session()`](Self::session) to start querying.
54///
55/// # Examples
56///
57/// ```
58/// use grafeo_engine::GrafeoDB;
59///
60/// // Quick in-memory database
61/// let db = GrafeoDB::new_in_memory();
62///
63/// // Add some data
64/// db.create_node(&["Person"]);
65///
66/// // Query it
67/// let session = db.session();
68/// let result = session.execute("MATCH (p:Person) RETURN p")?;
69/// # Ok::<(), grafeo_common::utils::error::Error>(())
70/// ```
71pub struct GrafeoDB {
72    /// Database configuration.
73    pub(super) config: Config,
74    /// The underlying graph store.
75    pub(super) store: Arc<LpgStore>,
76    /// Schema and metadata catalog shared across sessions.
77    pub(super) catalog: Arc<Catalog>,
78    /// RDF triple store (if RDF feature is enabled).
79    #[cfg(feature = "rdf")]
80    pub(super) rdf_store: Arc<RdfStore>,
81    /// Transaction manager.
82    pub(super) transaction_manager: Arc<TransactionManager>,
83    /// Unified buffer manager.
84    pub(super) buffer_manager: Arc<BufferManager>,
85    /// Write-ahead log manager (if durability is enabled).
86    #[cfg(feature = "wal")]
87    pub(super) wal: Option<Arc<LpgWal>>,
88    /// Query cache for parsed and optimized plans.
89    pub(super) query_cache: Arc<QueryCache>,
90    /// Shared commit counter for auto-GC across sessions.
91    pub(super) commit_counter: Arc<AtomicUsize>,
92    /// Whether the database is open.
93    pub(super) is_open: RwLock<bool>,
94    /// Change data capture log for tracking mutations.
95    #[cfg(feature = "cdc")]
96    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
97    /// Registered embedding models for text-to-vector conversion.
98    #[cfg(feature = "embed")]
99    pub(super) embedding_models:
100        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
101    /// External graph store (when using with_store()).
102    /// When set, sessions route queries through this store instead of the built-in LpgStore.
103    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
104}
105
106impl GrafeoDB {
107    /// Creates an in-memory database - fast to create, gone when dropped.
108    ///
109    /// Use this for tests, experiments, or when you don't need persistence.
110    /// For data that survives restarts, use [`open()`](Self::open) instead.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use grafeo_engine::GrafeoDB;
116    ///
117    /// let db = GrafeoDB::new_in_memory();
118    /// let session = db.session();
119    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
120    /// # Ok::<(), grafeo_common::utils::error::Error>(())
121    /// ```
122    #[must_use]
123    pub fn new_in_memory() -> Self {
124        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
125    }
126
127    /// Opens a database at the given path, creating it if it doesn't exist.
128    ///
129    /// If you've used this path before, Grafeo recovers your data from the
130    /// write-ahead log automatically. First open on a new path creates an
131    /// empty database.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the path isn't writable or recovery fails.
136    ///
137    /// # Examples
138    ///
139    /// ```no_run
140    /// use grafeo_engine::GrafeoDB;
141    ///
142    /// let db = GrafeoDB::open("./my_social_network")?;
143    /// # Ok::<(), grafeo_common::utils::error::Error>(())
144    /// ```
145    #[cfg(feature = "wal")]
146    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
147        Self::with_config(Config::persistent(path.as_ref()))
148    }
149
150    /// Creates a database with custom configuration.
151    ///
152    /// Use this when you need fine-grained control over memory limits,
153    /// thread counts, or persistence settings. For most cases,
154    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
155    /// are simpler.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the database can't be created or recovery fails.
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// use grafeo_engine::{GrafeoDB, Config};
165    ///
166    /// // In-memory with a 512MB limit
167    /// let config = Config::in_memory()
168    ///     .with_memory_limit(512 * 1024 * 1024);
169    ///
170    /// let db = GrafeoDB::with_config(config)?;
171    /// # Ok::<(), grafeo_common::utils::error::Error>(())
172    /// ```
173    pub fn with_config(config: Config) -> Result<Self> {
174        // Validate configuration before proceeding
175        config
176            .validate()
177            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
178
179        let store = Arc::new(LpgStore::new()?);
180        #[cfg(feature = "rdf")]
181        let rdf_store = Arc::new(RdfStore::new());
182        let transaction_manager = Arc::new(TransactionManager::new());
183
184        // Create buffer manager with configured limits
185        let buffer_config = BufferManagerConfig {
186            budget: config.memory_limit.unwrap_or_else(|| {
187                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
188            }),
189            spill_path: config
190                .spill_path
191                .clone()
192                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
193            ..BufferManagerConfig::default()
194        };
195        let buffer_manager = BufferManager::new(buffer_config);
196
197        // Create catalog early so WAL replay can restore schema definitions
198        let catalog = Arc::new(Catalog::new());
199
200        // Initialize WAL if persistence is enabled
201        #[cfg(feature = "wal")]
202        let wal = if config.wal_enabled {
203            if let Some(ref db_path) = config.path {
204                // Create database directory if it doesn't exist
205                std::fs::create_dir_all(db_path)?;
206
207                let wal_path = db_path.join("wal");
208
209                // Check if WAL exists and recover if needed
210                if wal_path.exists() {
211                    let recovery = WalRecovery::new(&wal_path);
212                    let records = recovery.recover()?;
213                    Self::apply_wal_records(&store, &catalog, &records)?;
214                }
215
216                // Open/create WAL manager with configured durability
217                let wal_durability = match config.wal_durability {
218                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
219                    crate::config::DurabilityMode::Batch {
220                        max_delay_ms,
221                        max_records,
222                    } => WalDurabilityMode::Batch {
223                        max_delay_ms,
224                        max_records,
225                    },
226                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
227                        WalDurabilityMode::Adaptive { target_interval_ms }
228                    }
229                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
230                };
231                let wal_config = WalConfig {
232                    durability: wal_durability,
233                    ..WalConfig::default()
234                };
235                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
236                Some(Arc::new(wal_manager))
237            } else {
238                None
239            }
240        } else {
241            None
242        };
243
244        // Create query cache with default capacity (1000 queries)
245        let query_cache = Arc::new(QueryCache::default());
246
247        Ok(Self {
248            config,
249            store,
250            catalog,
251            #[cfg(feature = "rdf")]
252            rdf_store,
253            transaction_manager,
254            buffer_manager,
255            #[cfg(feature = "wal")]
256            wal,
257            query_cache,
258            commit_counter: Arc::new(AtomicUsize::new(0)),
259            is_open: RwLock::new(true),
260            #[cfg(feature = "cdc")]
261            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262            #[cfg(feature = "embed")]
263            embedding_models: RwLock::new(hashbrown::HashMap::new()),
264            external_store: None,
265        })
266    }
267
268    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
269    ///
270    /// The external store handles all data persistence. WAL, CDC, and index
271    /// management are the responsibility of the store implementation.
272    ///
273    /// Query execution (all 6 languages, optimizer, planner) works through the
274    /// provided store. Admin operations (schema introspection, persistence,
275    /// vector/text indexes) are not available on external stores.
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// use std::sync::Arc;
281    /// use grafeo_engine::{GrafeoDB, Config};
282    /// use grafeo_core::graph::GraphStoreMut;
283    ///
284    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
285    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
286    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
287    ///     Ok(())
288    /// }
289    /// ```
290    ///
291    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
292    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
293        config
294            .validate()
295            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
296
297        let dummy_store = Arc::new(LpgStore::new()?);
298        let transaction_manager = Arc::new(TransactionManager::new());
299
300        let buffer_config = BufferManagerConfig {
301            budget: config.memory_limit.unwrap_or_else(|| {
302                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
303            }),
304            spill_path: None,
305            ..BufferManagerConfig::default()
306        };
307        let buffer_manager = BufferManager::new(buffer_config);
308
309        let query_cache = Arc::new(QueryCache::default());
310
311        Ok(Self {
312            config,
313            store: dummy_store,
314            catalog: Arc::new(Catalog::new()),
315            #[cfg(feature = "rdf")]
316            rdf_store: Arc::new(RdfStore::new()),
317            transaction_manager,
318            buffer_manager,
319            #[cfg(feature = "wal")]
320            wal: None,
321            query_cache,
322            commit_counter: Arc::new(AtomicUsize::new(0)),
323            is_open: RwLock::new(true),
324            #[cfg(feature = "cdc")]
325            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
326            #[cfg(feature = "embed")]
327            embedding_models: RwLock::new(hashbrown::HashMap::new()),
328            external_store: Some(store),
329        })
330    }
331
332    /// Applies WAL records to restore the database state.
333    #[cfg(feature = "wal")]
334    fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
335        use crate::catalog::{
336            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
337        };
338
339        for record in records {
340            match record {
341                WalRecord::CreateNode { id, labels } => {
342                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
343                    store.create_node_with_id(*id, &label_refs);
344                }
345                WalRecord::DeleteNode { id } => {
346                    store.delete_node(*id);
347                }
348                WalRecord::CreateEdge {
349                    id,
350                    src,
351                    dst,
352                    edge_type,
353                } => {
354                    store.create_edge_with_id(*id, *src, *dst, edge_type);
355                }
356                WalRecord::DeleteEdge { id } => {
357                    store.delete_edge(*id);
358                }
359                WalRecord::SetNodeProperty { id, key, value } => {
360                    store.set_node_property(*id, key, value.clone());
361                }
362                WalRecord::SetEdgeProperty { id, key, value } => {
363                    store.set_edge_property(*id, key, value.clone());
364                }
365                WalRecord::AddNodeLabel { id, label } => {
366                    store.add_label(*id, label);
367                }
368                WalRecord::RemoveNodeLabel { id, label } => {
369                    store.remove_label(*id, label);
370                }
371                WalRecord::RemoveNodeProperty { id, key } => {
372                    store.remove_node_property(*id, key);
373                }
374                WalRecord::RemoveEdgeProperty { id, key } => {
375                    store.remove_edge_property(*id, key);
376                }
377
378                // Schema DDL replay
379                WalRecord::CreateNodeType {
380                    name,
381                    properties,
382                    constraints,
383                } => {
384                    let def = NodeTypeDefinition {
385                        name: name.clone(),
386                        properties: properties
387                            .iter()
388                            .map(|(n, t, nullable)| TypedProperty {
389                                name: n.clone(),
390                                data_type: PropertyDataType::from_type_name(t),
391                                nullable: *nullable,
392                                default_value: None,
393                            })
394                            .collect(),
395                        constraints: constraints
396                            .iter()
397                            .map(|(kind, props)| match kind.as_str() {
398                                "unique" => TypeConstraint::Unique(props.clone()),
399                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
400                                "not_null" if !props.is_empty() => {
401                                    TypeConstraint::NotNull(props[0].clone())
402                                }
403                                _ => TypeConstraint::Unique(props.clone()),
404                            })
405                            .collect(),
406                    };
407                    let _ = catalog.register_node_type(def);
408                }
409                WalRecord::DropNodeType { name } => {
410                    let _ = catalog.drop_node_type(name);
411                }
412                WalRecord::CreateEdgeType {
413                    name,
414                    properties,
415                    constraints,
416                } => {
417                    let def = EdgeTypeDefinition {
418                        name: name.clone(),
419                        properties: properties
420                            .iter()
421                            .map(|(n, t, nullable)| TypedProperty {
422                                name: n.clone(),
423                                data_type: PropertyDataType::from_type_name(t),
424                                nullable: *nullable,
425                                default_value: None,
426                            })
427                            .collect(),
428                        constraints: constraints
429                            .iter()
430                            .map(|(kind, props)| match kind.as_str() {
431                                "unique" => TypeConstraint::Unique(props.clone()),
432                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
433                                "not_null" if !props.is_empty() => {
434                                    TypeConstraint::NotNull(props[0].clone())
435                                }
436                                _ => TypeConstraint::Unique(props.clone()),
437                            })
438                            .collect(),
439                    };
440                    let _ = catalog.register_edge_type_def(def);
441                }
442                WalRecord::DropEdgeType { name } => {
443                    let _ = catalog.drop_edge_type_def(name);
444                }
445                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
446                    // Index recreation is handled by the store on startup
447                    // (indexes are rebuilt from data, not WAL)
448                }
449                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
450                    // Constraint definitions are part of type definitions
451                    // and replayed via CreateNodeType/CreateEdgeType
452                }
453                WalRecord::CreateGraphType {
454                    name,
455                    node_types,
456                    edge_types,
457                    open,
458                } => {
459                    use crate::catalog::GraphTypeDefinition;
460                    let def = GraphTypeDefinition {
461                        name: name.clone(),
462                        allowed_node_types: node_types.clone(),
463                        allowed_edge_types: edge_types.clone(),
464                        open: *open,
465                    };
466                    let _ = catalog.register_graph_type(def);
467                }
468                WalRecord::DropGraphType { name } => {
469                    let _ = catalog.drop_graph_type(name);
470                }
471                WalRecord::CreateSchema { name } => {
472                    let _ = catalog.register_schema_namespace(name.clone());
473                }
474                WalRecord::DropSchema { name } => {
475                    let _ = catalog.drop_schema_namespace(name);
476                }
477
478                WalRecord::AlterNodeType { name, alterations } => {
479                    for (action, prop_name, type_name, nullable) in alterations {
480                        match action.as_str() {
481                            "add" => {
482                                let prop = TypedProperty {
483                                    name: prop_name.clone(),
484                                    data_type: PropertyDataType::from_type_name(type_name),
485                                    nullable: *nullable,
486                                    default_value: None,
487                                };
488                                let _ = catalog.alter_node_type_add_property(name, prop);
489                            }
490                            "drop" => {
491                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
492                            }
493                            _ => {}
494                        }
495                    }
496                }
497                WalRecord::AlterEdgeType { name, alterations } => {
498                    for (action, prop_name, type_name, nullable) in alterations {
499                        match action.as_str() {
500                            "add" => {
501                                let prop = TypedProperty {
502                                    name: prop_name.clone(),
503                                    data_type: PropertyDataType::from_type_name(type_name),
504                                    nullable: *nullable,
505                                    default_value: None,
506                                };
507                                let _ = catalog.alter_edge_type_add_property(name, prop);
508                            }
509                            "drop" => {
510                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
511                            }
512                            _ => {}
513                        }
514                    }
515                }
516                WalRecord::AlterGraphType { name, alterations } => {
517                    for (action, type_name) in alterations {
518                        match action.as_str() {
519                            "add_node" => {
520                                let _ =
521                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
522                            }
523                            "drop_node" => {
524                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
525                            }
526                            "add_edge" => {
527                                let _ =
528                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
529                            }
530                            "drop_edge" => {
531                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
532                            }
533                            _ => {}
534                        }
535                    }
536                }
537
538                WalRecord::CreateProcedure {
539                    name,
540                    params,
541                    returns,
542                    body,
543                } => {
544                    use crate::catalog::ProcedureDefinition;
545                    let def = ProcedureDefinition {
546                        name: name.clone(),
547                        params: params.clone(),
548                        returns: returns.clone(),
549                        body: body.clone(),
550                    };
551                    let _ = catalog.register_procedure(def);
552                }
553                WalRecord::DropProcedure { name } => {
554                    let _ = catalog.drop_procedure(name);
555                }
556
557                WalRecord::TransactionCommit { .. }
558                | WalRecord::TransactionAbort { .. }
559                | WalRecord::Checkpoint { .. } => {
560                    // Transaction control records don't need replay action
561                    // (recovery already filtered to only committed transactions)
562                }
563            }
564        }
565        Ok(())
566    }
567
568    // =========================================================================
569    // Session & Configuration
570    // =========================================================================
571
572    /// Opens a new session for running queries.
573    ///
574    /// Sessions are cheap to create - spin up as many as you need. Each
575    /// gets its own transaction context, so concurrent sessions won't
576    /// block each other on reads.
577    ///
578    /// # Examples
579    ///
580    /// ```
581    /// use grafeo_engine::GrafeoDB;
582    ///
583    /// let db = GrafeoDB::new_in_memory();
584    /// let session = db.session();
585    ///
586    /// // Run queries through the session
587    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
588    /// # Ok::<(), grafeo_common::utils::error::Error>(())
589    /// ```
590    #[must_use]
591    pub fn session(&self) -> Session {
592        if let Some(ref ext_store) = self.external_store {
593            return Session::with_external_store(
594                Arc::clone(ext_store),
595                Arc::clone(&self.transaction_manager),
596                Arc::clone(&self.query_cache),
597                Arc::clone(&self.catalog),
598                self.config.adaptive.clone(),
599                self.config.factorized_execution,
600                self.config.graph_model,
601                self.config.query_timeout,
602                Arc::clone(&self.commit_counter),
603                self.config.gc_interval,
604            );
605        }
606
607        #[cfg(feature = "rdf")]
608        let mut session = Session::with_rdf_store_and_adaptive(
609            Arc::clone(&self.store),
610            Arc::clone(&self.rdf_store),
611            Arc::clone(&self.transaction_manager),
612            Arc::clone(&self.query_cache),
613            Arc::clone(&self.catalog),
614            self.config.adaptive.clone(),
615            self.config.factorized_execution,
616            self.config.graph_model,
617            self.config.query_timeout,
618            Arc::clone(&self.commit_counter),
619            self.config.gc_interval,
620        );
621        #[cfg(not(feature = "rdf"))]
622        let mut session = Session::with_adaptive(
623            Arc::clone(&self.store),
624            Arc::clone(&self.transaction_manager),
625            Arc::clone(&self.query_cache),
626            Arc::clone(&self.catalog),
627            self.config.adaptive.clone(),
628            self.config.factorized_execution,
629            self.config.graph_model,
630            self.config.query_timeout,
631            Arc::clone(&self.commit_counter),
632            self.config.gc_interval,
633        );
634
635        #[cfg(feature = "wal")]
636        if let Some(ref wal) = self.wal {
637            session.set_wal(Arc::clone(wal));
638        }
639
640        #[cfg(feature = "cdc")]
641        session.set_cdc_log(Arc::clone(&self.cdc_log));
642
643        // Suppress unused_mut when cdc/wal are disabled
644        let _ = &mut session;
645
646        session
647    }
648
649    /// Returns the adaptive execution configuration.
650    #[must_use]
651    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
652        &self.config.adaptive
653    }
654
655    /// Returns the configuration.
656    #[must_use]
657    pub fn config(&self) -> &Config {
658        &self.config
659    }
660
661    /// Returns the graph data model of this database.
662    #[must_use]
663    pub fn graph_model(&self) -> crate::config::GraphModel {
664        self.config.graph_model
665    }
666
667    /// Returns the configured memory limit in bytes, if any.
668    #[must_use]
669    pub fn memory_limit(&self) -> Option<usize> {
670        self.config.memory_limit
671    }
672
673    /// Returns the underlying store.
674    ///
675    /// This provides direct access to the LPG store for algorithm implementations
676    /// and admin operations (index management, schema introspection, MVCC internals).
677    ///
678    /// For code that only needs read/write graph operations, prefer
679    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
680    #[must_use]
681    pub fn store(&self) -> &Arc<LpgStore> {
682        &self.store
683    }
684
685    // === Named Graph Management ===
686
687    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if arena allocation fails.
692    pub fn create_graph(&self, name: &str) -> Result<bool> {
693        Ok(self.store.create_graph(name)?)
694    }
695
696    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
697    pub fn drop_graph(&self, name: &str) -> bool {
698        self.store.drop_graph(name)
699    }
700
701    /// Returns all named graph names.
702    #[must_use]
703    pub fn list_graphs(&self) -> Vec<String> {
704        self.store.graph_names()
705    }
706
707    /// Returns the graph store as a trait object.
708    ///
709    /// This provides the [`GraphStoreMut`] interface for code that should work
710    /// with any storage backend. Use this when you only need graph read/write
711    /// operations and don't need admin methods like index management.
712    ///
713    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
714    #[must_use]
715    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
716        if let Some(ref ext_store) = self.external_store {
717            Arc::clone(ext_store)
718        } else {
719            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
720        }
721    }
722
723    /// Garbage collects old MVCC versions that are no longer visible.
724    ///
725    /// Determines the minimum epoch required by active transactions and prunes
726    /// version chains older than that threshold. Also cleans up completed
727    /// transaction metadata in the transaction manager.
728    pub fn gc(&self) {
729        let min_epoch = self.transaction_manager.min_active_epoch();
730        self.store.gc_versions(min_epoch);
731        self.transaction_manager.gc();
732    }
733
734    /// Returns the buffer manager for memory-aware operations.
735    #[must_use]
736    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
737        &self.buffer_manager
738    }
739
740    /// Returns the query cache.
741    #[must_use]
742    pub fn query_cache(&self) -> &Arc<QueryCache> {
743        &self.query_cache
744    }
745
746    /// Clears all cached query plans.
747    ///
748    /// This is called automatically after DDL operations, but can also be
749    /// invoked manually after external schema changes (e.g., WAL replay,
750    /// import) or when you want to force re-optimization of all queries.
751    pub fn clear_plan_cache(&self) {
752        self.query_cache.clear();
753    }
754
755    // =========================================================================
756    // Lifecycle
757    // =========================================================================
758
759    /// Closes the database, flushing all pending writes.
760    ///
761    /// For persistent databases, this ensures everything is safely on disk.
762    /// Called automatically when the database is dropped, but you can call
763    /// it explicitly if you need to guarantee durability at a specific point.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
768    pub fn close(&self) -> Result<()> {
769        let mut is_open = self.is_open.write();
770        if !*is_open {
771            return Ok(());
772        }
773
774        // Commit and checkpoint WAL
775        #[cfg(feature = "wal")]
776        if let Some(ref wal) = self.wal {
777            let epoch = self.store.current_epoch();
778
779            // Use the last assigned transaction ID, or create a checkpoint-only tx
780            let checkpoint_tx = self
781                .transaction_manager
782                .last_assigned_transaction_id()
783                .unwrap_or_else(|| {
784                    // No transactions have been started; begin one for checkpoint
785                    self.transaction_manager.begin()
786                });
787
788            // Log a TransactionCommit to mark all pending records as committed
789            wal.log(&WalRecord::TransactionCommit {
790                transaction_id: checkpoint_tx,
791            })?;
792
793            // Then checkpoint
794            wal.checkpoint(checkpoint_tx, epoch)?;
795            wal.sync()?;
796        }
797
798        *is_open = false;
799        Ok(())
800    }
801
/// Borrows the typed WAL handle, or `None` when this database has no WAL.
#[cfg(feature = "wal")]
#[must_use]
pub fn wal(&self) -> Option<&Arc<LpgWal>> {
    Option::as_ref(&self.wal)
}
808
/// Appends `record` to the WAL when one is configured; otherwise does nothing.
///
/// # Errors
///
/// Propagates any error reported by the WAL while logging the record.
#[cfg(feature = "wal")]
pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
    match &self.wal {
        Some(wal) => {
            wal.log(record)?;
            Ok(())
        }
        // No WAL configured: nothing to log.
        None => Ok(()),
    }
}
817}
818
819impl Drop for GrafeoDB {
820    fn drop(&mut self) {
821        if let Err(e) = self.close() {
822            tracing::error!("Error closing database: {}", e);
823        }
824    }
825}
826
827impl crate::admin::AdminService for GrafeoDB {
828    fn info(&self) -> crate::admin::DatabaseInfo {
829        self.info()
830    }
831
832    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
833        self.detailed_stats()
834    }
835
836    fn schema(&self) -> crate::admin::SchemaInfo {
837        self.schema()
838    }
839
840    fn validate(&self) -> crate::admin::ValidationResult {
841        self.validate()
842    }
843
844    fn wal_status(&self) -> crate::admin::WalStatus {
845        self.wal_status()
846    }
847
848    fn wal_checkpoint(&self) -> Result<()> {
849        self.wal_checkpoint()
850    }
851}
852
853// =========================================================================
854// Query Result Types
855// =========================================================================
856
857/// The result of running a query.
858///
859/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
860/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
861///
862/// # Examples
863///
864/// ```
865/// use grafeo_engine::GrafeoDB;
866///
867/// let db = GrafeoDB::new_in_memory();
868/// db.create_node(&["Person"]);
869///
870/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
871///
872/// // Check what we got
873/// println!("Columns: {:?}", result.columns);
874/// println!("Rows: {}", result.row_count());
875///
876/// // Iterate through results
877/// for row in result.iter() {
878///     println!("{:?}", row);
879/// }
880/// # Ok::<(), grafeo_common::utils::error::Error>(())
881/// ```
882#[derive(Debug)]
883pub struct QueryResult {
884    /// Column names from the RETURN clause.
885    pub columns: Vec<String>,
886    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
887    pub column_types: Vec<grafeo_common::types::LogicalType>,
888    /// The actual result rows.
889    pub rows: Vec<Vec<grafeo_common::types::Value>>,
890    /// Query execution time in milliseconds (if timing was enabled).
891    pub execution_time_ms: Option<f64>,
892    /// Number of rows scanned during query execution (estimate).
893    pub rows_scanned: Option<u64>,
894    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
895    pub status_message: Option<String>,
896    /// GQLSTATUS code per ISO/IEC 39075:2024, sec 23.
897    pub gql_status: grafeo_common::utils::GqlStatus,
898}
899
900impl QueryResult {
901    /// Creates a fully empty query result (no columns, no rows).
902    #[must_use]
903    pub fn empty() -> Self {
904        Self {
905            columns: Vec::new(),
906            column_types: Vec::new(),
907            rows: Vec::new(),
908            execution_time_ms: None,
909            rows_scanned: None,
910            status_message: None,
911            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
912        }
913    }
914
915    /// Creates a query result with only a status message (for DDL commands).
916    #[must_use]
917    pub fn status(msg: impl Into<String>) -> Self {
918        Self {
919            columns: Vec::new(),
920            column_types: Vec::new(),
921            rows: Vec::new(),
922            execution_time_ms: None,
923            rows_scanned: None,
924            status_message: Some(msg.into()),
925            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
926        }
927    }
928
929    /// Creates a new empty query result.
930    #[must_use]
931    pub fn new(columns: Vec<String>) -> Self {
932        let len = columns.len();
933        Self {
934            columns,
935            column_types: vec![grafeo_common::types::LogicalType::Any; len],
936            rows: Vec::new(),
937            execution_time_ms: None,
938            rows_scanned: None,
939            status_message: None,
940            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
941        }
942    }
943
944    /// Creates a new empty query result with column types.
945    #[must_use]
946    pub fn with_types(
947        columns: Vec<String>,
948        column_types: Vec<grafeo_common::types::LogicalType>,
949    ) -> Self {
950        Self {
951            columns,
952            column_types,
953            rows: Vec::new(),
954            execution_time_ms: None,
955            rows_scanned: None,
956            status_message: None,
957            gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
958        }
959    }
960
961    /// Sets the execution metrics on this result.
962    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
963        self.execution_time_ms = Some(execution_time_ms);
964        self.rows_scanned = Some(rows_scanned);
965        self
966    }
967
968    /// Returns the execution time in milliseconds, if available.
969    #[must_use]
970    pub fn execution_time_ms(&self) -> Option<f64> {
971        self.execution_time_ms
972    }
973
974    /// Returns the number of rows scanned, if available.
975    #[must_use]
976    pub fn rows_scanned(&self) -> Option<u64> {
977        self.rows_scanned
978    }
979
980    /// Returns the number of rows.
981    #[must_use]
982    pub fn row_count(&self) -> usize {
983        self.rows.len()
984    }
985
986    /// Returns the number of columns.
987    #[must_use]
988    pub fn column_count(&self) -> usize {
989        self.columns.len()
990    }
991
992    /// Returns true if the result is empty.
993    #[must_use]
994    pub fn is_empty(&self) -> bool {
995        self.rows.is_empty()
996    }
997
998    /// Extracts a single value from the result.
999    ///
1000    /// Use this when your query returns exactly one row with one column,
1001    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the result has multiple rows or columns.
1006    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1007        if self.rows.len() != 1 || self.columns.len() != 1 {
1008            return Err(grafeo_common::utils::error::Error::InvalidValue(
1009                "Expected single value".to_string(),
1010            ));
1011        }
1012        T::from_value(&self.rows[0][0])
1013    }
1014
1015    /// Returns an iterator over the rows.
1016    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1017        self.rows.iter()
1018    }
1019}
1020
1021/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1022///
1023/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1024/// Used by [`QueryResult::scalar()`] to extract typed values.
1025pub trait FromValue: Sized {
1026    /// Attempts the conversion, returning an error on type mismatch.
1027    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1028}
1029
1030impl FromValue for i64 {
1031    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1032        value
1033            .as_int64()
1034            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1035                expected: "INT64".to_string(),
1036                found: value.type_name().to_string(),
1037            })
1038    }
1039}
1040
1041impl FromValue for f64 {
1042    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1043        value
1044            .as_float64()
1045            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1046                expected: "FLOAT64".to_string(),
1047                found: value.type_name().to_string(),
1048            })
1049    }
1050}
1051
1052impl FromValue for String {
1053    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1054        value.as_str().map(String::from).ok_or_else(|| {
1055            grafeo_common::utils::error::Error::TypeMismatch {
1056                expected: "STRING".to_string(),
1057                found: value.type_name().to_string(),
1058            }
1059        })
1060    }
1061}
1062
1063impl FromValue for bool {
1064    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1065        value
1066            .as_bool()
1067            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1068                expected: "BOOL".to_string(),
1069                found: value.type_name().to_string(),
1070            })
1071    }
1072}
1073
// Unit tests for the database handle: construction, WAL persistence and
// recovery, query metrics, CDC history, and externally supplied stores.
//
// Tests that need an optional feature are gated on it: `wal` for on-disk
// databases, `gql` for tests that run query text, `cdc` for change history.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));

            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));

            let _edge = db.create_edge(alix, gus, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // WAL should have records
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        // Tests that WAL recovery works correctly across multiple open/close cycles
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Session 1: Create initial data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alix = db.create_node(&["Person"]);
            db.set_node_property(alix, "name", Value::from("Alix"));
            db.close().unwrap();
        }

        // Session 2: Add more data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1); // Previous data recovered
            let gus = db.create_node(&["Person"]);
            db.set_node_property(gus, "name", Value::from("Gus"));
            db.close().unwrap();
        }

        // Session 3: Verify all data
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            // Verify properties were recovered correctly
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        // Tests that database remains consistent after a series of create/delete operations
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Create nodes
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            // Create edges
            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Delete middle node and its edge
            db.delete_edge(e1);
            db.delete_node(b);

            // Set properties on remaining nodes
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        // Reopen and verify consistency
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Should have 2 nodes (a and c), b was deleted
            // Note: node_count includes deleted nodes in some implementations
            // What matters is that the non-deleted nodes are accessible
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // Middle node should be deleted
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        // Calling close() multiple times should not cause errors
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close should succeed
        assert!(db.close().is_ok());

        // Second close should also succeed (idempotent)
        assert!(db.close().is_ok());
    }

    #[test]
    fn test_query_result_has_metrics() {
        // Verifies that query results include execution metrics
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            // Metrics should be populated
            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    #[test]
    fn test_empty_query_result_metrics() {
        // Verifies metrics are correct for queries returning no results
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            // Query that matches nothing
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            // Create
            let id = db.create_node(&["Person"]);
            // Update
            db.set_node_property(id, "name", "Alix".into());
            db.set_node_property(id, "name", "Gus".into());
            // Delete
            db.delete_node(id);

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 4); // create + 2 updates + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert!(history[1].before.is_none()); // first set_node_property has no prior value
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            assert!(history[2].before.is_some()); // second update has prior "Alix"
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alix = db.create_node(&["Person"]);
            let gus = db.create_node(&["Person"]);
            let edge = db.create_edge(alix, gus, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            assert_eq!(history.len(), 3); // create + update + delete
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alix")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            // Props should be captured
            let after = history[0].after.as_ref().unwrap();
            assert_eq!(after.len(), 2);
        }

        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            assert_eq!(changes.len(), 3); // 2 creates + 1 update
        }
    }

    // The `with_store` tests below run GQL text through `execute`, so they are
    // gated on the `gql` feature like the query calls in the metrics tests above.

    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_basic() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let n1 = store.create_node(&["Person"]);
        store.set_node_property(n1, "name", "Alix".into());

        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_session() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_with_store_mutations() {
        use grafeo_core::graph::lpg::LpgStore;

        let store = Arc::new(LpgStore::new().unwrap());
        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();

        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);

        // Data should also be visible via the original store
        assert_eq!(store.node_count(), 1);
    }
}