// Source: grafeo_engine/database/mod.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4//!
5//! Operations are split across focused submodules:
6//! - `query` - Query execution (execute, execute_cypher, etc.)
7//! - `crud` - Node/edge CRUD operations
8//! - `index` - Property, vector, and text index management
9//! - `search` - Vector, text, and hybrid search
10//! - `embed` - Embedding model management
11//! - `persistence` - Save, load, snapshots, iteration
12//! - `admin` - Stats, introspection, diagnostics, CDC
13
14mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22#[cfg(feature = "wal")]
23pub(crate) mod wal_store;
24
25#[cfg(feature = "wal")]
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::AtomicUsize;
29
30use parking_lot::RwLock;
31
32#[cfg(feature = "wal")]
33use grafeo_adapters::storage::wal::{
34    DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
35};
36use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
37use grafeo_common::utils::error::Result;
38use grafeo_core::graph::GraphStoreMut;
39use grafeo_core::graph::lpg::LpgStore;
40#[cfg(feature = "rdf")]
41use grafeo_core::graph::rdf::RdfStore;
42
43use crate::catalog::Catalog;
44use crate::config::Config;
45use crate::query::cache::QueryCache;
46use crate::session::Session;
47use crate::transaction::TransactionManager;
48
49/// Your handle to a Grafeo database.
50///
51/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
52/// quick experiments, or [`open()`](Self::open) for persistent storage.
53/// Then grab a [`session()`](Self::session) to start querying.
54///
55/// # Examples
56///
57/// ```
58/// use grafeo_engine::GrafeoDB;
59///
60/// // Quick in-memory database
61/// let db = GrafeoDB::new_in_memory();
62///
63/// // Add some data
64/// db.create_node(&["Person"]);
65///
66/// // Query it
67/// let session = db.session();
68/// let result = session.execute("MATCH (p:Person) RETURN p")?;
69/// # Ok::<(), grafeo_common::utils::error::Error>(())
70/// ```
71pub struct GrafeoDB {
72    /// Database configuration.
73    pub(super) config: Config,
74    /// The underlying graph store.
75    pub(super) store: Arc<LpgStore>,
76    /// Schema and metadata catalog shared across sessions.
77    pub(super) catalog: Arc<Catalog>,
78    /// RDF triple store (if RDF feature is enabled).
79    #[cfg(feature = "rdf")]
80    pub(super) rdf_store: Arc<RdfStore>,
81    /// Transaction manager.
82    pub(super) tx_manager: Arc<TransactionManager>,
83    /// Unified buffer manager.
84    pub(super) buffer_manager: Arc<BufferManager>,
85    /// Write-ahead log manager (if durability is enabled).
86    #[cfg(feature = "wal")]
87    pub(super) wal: Option<Arc<LpgWal>>,
88    /// Query cache for parsed and optimized plans.
89    pub(super) query_cache: Arc<QueryCache>,
90    /// Shared commit counter for auto-GC across sessions.
91    pub(super) commit_counter: Arc<AtomicUsize>,
92    /// Whether the database is open.
93    pub(super) is_open: RwLock<bool>,
94    /// Change data capture log for tracking mutations.
95    #[cfg(feature = "cdc")]
96    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
97    /// Registered embedding models for text-to-vector conversion.
98    #[cfg(feature = "embed")]
99    pub(super) embedding_models:
100        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
101    /// External graph store (when using with_store()).
102    /// When set, sessions route queries through this store instead of the built-in LpgStore.
103    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
104}
105
106impl GrafeoDB {
107    /// Creates an in-memory database - fast to create, gone when dropped.
108    ///
109    /// Use this for tests, experiments, or when you don't need persistence.
110    /// For data that survives restarts, use [`open()`](Self::open) instead.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use grafeo_engine::GrafeoDB;
116    ///
117    /// let db = GrafeoDB::new_in_memory();
118    /// let session = db.session();
119    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
120    /// # Ok::<(), grafeo_common::utils::error::Error>(())
121    /// ```
122    #[must_use]
123    pub fn new_in_memory() -> Self {
124        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
125    }
126
127    /// Opens a database at the given path, creating it if it doesn't exist.
128    ///
129    /// If you've used this path before, Grafeo recovers your data from the
130    /// write-ahead log automatically. First open on a new path creates an
131    /// empty database.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the path isn't writable or recovery fails.
136    ///
137    /// # Examples
138    ///
139    /// ```no_run
140    /// use grafeo_engine::GrafeoDB;
141    ///
142    /// let db = GrafeoDB::open("./my_social_network")?;
143    /// # Ok::<(), grafeo_common::utils::error::Error>(())
144    /// ```
145    #[cfg(feature = "wal")]
146    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
147        Self::with_config(Config::persistent(path.as_ref()))
148    }
149
150    /// Creates a database with custom configuration.
151    ///
152    /// Use this when you need fine-grained control over memory limits,
153    /// thread counts, or persistence settings. For most cases,
154    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
155    /// are simpler.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the database can't be created or recovery fails.
160    ///
161    /// # Examples
162    ///
163    /// ```
164    /// use grafeo_engine::{GrafeoDB, Config};
165    ///
166    /// // In-memory with a 512MB limit
167    /// let config = Config::in_memory()
168    ///     .with_memory_limit(512 * 1024 * 1024);
169    ///
170    /// let db = GrafeoDB::with_config(config)?;
171    /// # Ok::<(), grafeo_common::utils::error::Error>(())
172    /// ```
173    pub fn with_config(config: Config) -> Result<Self> {
174        // Validate configuration before proceeding
175        config
176            .validate()
177            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
178
179        let store = Arc::new(LpgStore::new()?);
180        #[cfg(feature = "rdf")]
181        let rdf_store = Arc::new(RdfStore::new());
182        let tx_manager = Arc::new(TransactionManager::new());
183
184        // Create buffer manager with configured limits
185        let buffer_config = BufferManagerConfig {
186            budget: config.memory_limit.unwrap_or_else(|| {
187                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
188            }),
189            spill_path: config
190                .spill_path
191                .clone()
192                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
193            ..BufferManagerConfig::default()
194        };
195        let buffer_manager = BufferManager::new(buffer_config);
196
197        // Create catalog early so WAL replay can restore schema definitions
198        let catalog = Arc::new(Catalog::new());
199
200        // Initialize WAL if persistence is enabled
201        #[cfg(feature = "wal")]
202        let wal = if config.wal_enabled {
203            if let Some(ref db_path) = config.path {
204                // Create database directory if it doesn't exist
205                std::fs::create_dir_all(db_path)?;
206
207                let wal_path = db_path.join("wal");
208
209                // Check if WAL exists and recover if needed
210                if wal_path.exists() {
211                    let recovery = WalRecovery::new(&wal_path);
212                    let records = recovery.recover()?;
213                    Self::apply_wal_records(&store, &catalog, &records)?;
214                }
215
216                // Open/create WAL manager with configured durability
217                let wal_durability = match config.wal_durability {
218                    crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
219                    crate::config::DurabilityMode::Batch {
220                        max_delay_ms,
221                        max_records,
222                    } => WalDurabilityMode::Batch {
223                        max_delay_ms,
224                        max_records,
225                    },
226                    crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
227                        WalDurabilityMode::Adaptive { target_interval_ms }
228                    }
229                    crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
230                };
231                let wal_config = WalConfig {
232                    durability: wal_durability,
233                    ..WalConfig::default()
234                };
235                let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
236                Some(Arc::new(wal_manager))
237            } else {
238                None
239            }
240        } else {
241            None
242        };
243
244        // Create query cache with default capacity (1000 queries)
245        let query_cache = Arc::new(QueryCache::default());
246
247        Ok(Self {
248            config,
249            store,
250            catalog,
251            #[cfg(feature = "rdf")]
252            rdf_store,
253            tx_manager,
254            buffer_manager,
255            #[cfg(feature = "wal")]
256            wal,
257            query_cache,
258            commit_counter: Arc::new(AtomicUsize::new(0)),
259            is_open: RwLock::new(true),
260            #[cfg(feature = "cdc")]
261            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
262            #[cfg(feature = "embed")]
263            embedding_models: RwLock::new(hashbrown::HashMap::new()),
264            external_store: None,
265        })
266    }
267
268    /// Creates a database backed by a custom [`GraphStoreMut`] implementation.
269    ///
270    /// The external store handles all data persistence. WAL, CDC, and index
271    /// management are the responsibility of the store implementation.
272    ///
273    /// Query execution (all 6 languages, optimizer, planner) works through the
274    /// provided store. Admin operations (schema introspection, persistence,
275    /// vector/text indexes) are not available on external stores.
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// use std::sync::Arc;
281    /// use grafeo_engine::{GrafeoDB, Config};
282    /// use grafeo_core::graph::GraphStoreMut;
283    ///
284    /// fn example(store: Arc<dyn GraphStoreMut>) -> grafeo_common::utils::error::Result<()> {
285    ///     let db = GrafeoDB::with_store(store, Config::in_memory())?;
286    ///     let result = db.execute("MATCH (n) RETURN count(n)")?;
287    ///     Ok(())
288    /// }
289    /// ```
290    ///
291    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
292    pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
293        config
294            .validate()
295            .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
296
297        let dummy_store = Arc::new(LpgStore::new()?);
298        let tx_manager = Arc::new(TransactionManager::new());
299
300        let buffer_config = BufferManagerConfig {
301            budget: config.memory_limit.unwrap_or_else(|| {
302                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
303            }),
304            spill_path: None,
305            ..BufferManagerConfig::default()
306        };
307        let buffer_manager = BufferManager::new(buffer_config);
308
309        let query_cache = Arc::new(QueryCache::default());
310
311        Ok(Self {
312            config,
313            store: dummy_store,
314            catalog: Arc::new(Catalog::new()),
315            #[cfg(feature = "rdf")]
316            rdf_store: Arc::new(RdfStore::new()),
317            tx_manager,
318            buffer_manager,
319            #[cfg(feature = "wal")]
320            wal: None,
321            query_cache,
322            commit_counter: Arc::new(AtomicUsize::new(0)),
323            is_open: RwLock::new(true),
324            #[cfg(feature = "cdc")]
325            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
326            #[cfg(feature = "embed")]
327            embedding_models: RwLock::new(hashbrown::HashMap::new()),
328            external_store: Some(store),
329        })
330    }
331
332    /// Applies WAL records to restore the database state.
333    #[cfg(feature = "wal")]
334    fn apply_wal_records(store: &LpgStore, catalog: &Catalog, records: &[WalRecord]) -> Result<()> {
335        use crate::catalog::{
336            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypeConstraint, TypedProperty,
337        };
338
339        for record in records {
340            match record {
341                WalRecord::CreateNode { id, labels } => {
342                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
343                    store.create_node_with_id(*id, &label_refs);
344                }
345                WalRecord::DeleteNode { id } => {
346                    store.delete_node(*id);
347                }
348                WalRecord::CreateEdge {
349                    id,
350                    src,
351                    dst,
352                    edge_type,
353                } => {
354                    store.create_edge_with_id(*id, *src, *dst, edge_type);
355                }
356                WalRecord::DeleteEdge { id } => {
357                    store.delete_edge(*id);
358                }
359                WalRecord::SetNodeProperty { id, key, value } => {
360                    store.set_node_property(*id, key, value.clone());
361                }
362                WalRecord::SetEdgeProperty { id, key, value } => {
363                    store.set_edge_property(*id, key, value.clone());
364                }
365                WalRecord::AddNodeLabel { id, label } => {
366                    store.add_label(*id, label);
367                }
368                WalRecord::RemoveNodeLabel { id, label } => {
369                    store.remove_label(*id, label);
370                }
371                WalRecord::RemoveNodeProperty { id, key } => {
372                    store.remove_node_property(*id, key);
373                }
374                WalRecord::RemoveEdgeProperty { id, key } => {
375                    store.remove_edge_property(*id, key);
376                }
377
378                // Schema DDL replay
379                WalRecord::CreateNodeType {
380                    name,
381                    properties,
382                    constraints,
383                } => {
384                    let def = NodeTypeDefinition {
385                        name: name.clone(),
386                        properties: properties
387                            .iter()
388                            .map(|(n, t, nullable)| TypedProperty {
389                                name: n.clone(),
390                                data_type: PropertyDataType::from_type_name(t),
391                                nullable: *nullable,
392                                default_value: None,
393                            })
394                            .collect(),
395                        constraints: constraints
396                            .iter()
397                            .map(|(kind, props)| match kind.as_str() {
398                                "unique" => TypeConstraint::Unique(props.clone()),
399                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
400                                "not_null" if !props.is_empty() => {
401                                    TypeConstraint::NotNull(props[0].clone())
402                                }
403                                _ => TypeConstraint::Unique(props.clone()),
404                            })
405                            .collect(),
406                    };
407                    let _ = catalog.register_node_type(def);
408                }
409                WalRecord::DropNodeType { name } => {
410                    let _ = catalog.drop_node_type(name);
411                }
412                WalRecord::CreateEdgeType {
413                    name,
414                    properties,
415                    constraints,
416                } => {
417                    let def = EdgeTypeDefinition {
418                        name: name.clone(),
419                        properties: properties
420                            .iter()
421                            .map(|(n, t, nullable)| TypedProperty {
422                                name: n.clone(),
423                                data_type: PropertyDataType::from_type_name(t),
424                                nullable: *nullable,
425                                default_value: None,
426                            })
427                            .collect(),
428                        constraints: constraints
429                            .iter()
430                            .map(|(kind, props)| match kind.as_str() {
431                                "unique" => TypeConstraint::Unique(props.clone()),
432                                "primary_key" => TypeConstraint::PrimaryKey(props.clone()),
433                                "not_null" if !props.is_empty() => {
434                                    TypeConstraint::NotNull(props[0].clone())
435                                }
436                                _ => TypeConstraint::Unique(props.clone()),
437                            })
438                            .collect(),
439                    };
440                    let _ = catalog.register_edge_type_def(def);
441                }
442                WalRecord::DropEdgeType { name } => {
443                    let _ = catalog.drop_edge_type_def(name);
444                }
445                WalRecord::CreateIndex { .. } | WalRecord::DropIndex { .. } => {
446                    // Index recreation is handled by the store on startup
447                    // (indexes are rebuilt from data, not WAL)
448                }
449                WalRecord::CreateConstraint { .. } | WalRecord::DropConstraint { .. } => {
450                    // Constraint definitions are part of type definitions
451                    // and replayed via CreateNodeType/CreateEdgeType
452                }
453                WalRecord::CreateGraphType {
454                    name,
455                    node_types,
456                    edge_types,
457                    open,
458                } => {
459                    use crate::catalog::GraphTypeDefinition;
460                    let def = GraphTypeDefinition {
461                        name: name.clone(),
462                        allowed_node_types: node_types.clone(),
463                        allowed_edge_types: edge_types.clone(),
464                        open: *open,
465                    };
466                    let _ = catalog.register_graph_type(def);
467                }
468                WalRecord::DropGraphType { name } => {
469                    let _ = catalog.drop_graph_type(name);
470                }
471                WalRecord::CreateSchema { name } => {
472                    let _ = catalog.register_schema_namespace(name.clone());
473                }
474                WalRecord::DropSchema { name } => {
475                    let _ = catalog.drop_schema_namespace(name);
476                }
477
478                WalRecord::AlterNodeType { name, alterations } => {
479                    for (action, prop_name, type_name, nullable) in alterations {
480                        match action.as_str() {
481                            "add" => {
482                                let prop = TypedProperty {
483                                    name: prop_name.clone(),
484                                    data_type: PropertyDataType::from_type_name(type_name),
485                                    nullable: *nullable,
486                                    default_value: None,
487                                };
488                                let _ = catalog.alter_node_type_add_property(name, prop);
489                            }
490                            "drop" => {
491                                let _ = catalog.alter_node_type_drop_property(name, prop_name);
492                            }
493                            _ => {}
494                        }
495                    }
496                }
497                WalRecord::AlterEdgeType { name, alterations } => {
498                    for (action, prop_name, type_name, nullable) in alterations {
499                        match action.as_str() {
500                            "add" => {
501                                let prop = TypedProperty {
502                                    name: prop_name.clone(),
503                                    data_type: PropertyDataType::from_type_name(type_name),
504                                    nullable: *nullable,
505                                    default_value: None,
506                                };
507                                let _ = catalog.alter_edge_type_add_property(name, prop);
508                            }
509                            "drop" => {
510                                let _ = catalog.alter_edge_type_drop_property(name, prop_name);
511                            }
512                            _ => {}
513                        }
514                    }
515                }
516                WalRecord::AlterGraphType { name, alterations } => {
517                    for (action, type_name) in alterations {
518                        match action.as_str() {
519                            "add_node" => {
520                                let _ =
521                                    catalog.alter_graph_type_add_node_type(name, type_name.clone());
522                            }
523                            "drop_node" => {
524                                let _ = catalog.alter_graph_type_drop_node_type(name, type_name);
525                            }
526                            "add_edge" => {
527                                let _ =
528                                    catalog.alter_graph_type_add_edge_type(name, type_name.clone());
529                            }
530                            "drop_edge" => {
531                                let _ = catalog.alter_graph_type_drop_edge_type(name, type_name);
532                            }
533                            _ => {}
534                        }
535                    }
536                }
537
538                WalRecord::CreateProcedure {
539                    name,
540                    params,
541                    returns,
542                    body,
543                } => {
544                    use crate::catalog::ProcedureDefinition;
545                    let def = ProcedureDefinition {
546                        name: name.clone(),
547                        params: params.clone(),
548                        returns: returns.clone(),
549                        body: body.clone(),
550                    };
551                    let _ = catalog.register_procedure(def);
552                }
553                WalRecord::DropProcedure { name } => {
554                    let _ = catalog.drop_procedure(name);
555                }
556
557                WalRecord::TxCommit { .. }
558                | WalRecord::TxAbort { .. }
559                | WalRecord::Checkpoint { .. } => {
560                    // Transaction control records don't need replay action
561                    // (recovery already filtered to only committed transactions)
562                }
563            }
564        }
565        Ok(())
566    }
567
568    // =========================================================================
569    // Session & Configuration
570    // =========================================================================
571
572    /// Opens a new session for running queries.
573    ///
574    /// Sessions are cheap to create - spin up as many as you need. Each
575    /// gets its own transaction context, so concurrent sessions won't
576    /// block each other on reads.
577    ///
578    /// # Examples
579    ///
580    /// ```
581    /// use grafeo_engine::GrafeoDB;
582    ///
583    /// let db = GrafeoDB::new_in_memory();
584    /// let session = db.session();
585    ///
586    /// // Run queries through the session
587    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
588    /// # Ok::<(), grafeo_common::utils::error::Error>(())
589    /// ```
590    #[must_use]
591    pub fn session(&self) -> Session {
592        if let Some(ref ext_store) = self.external_store {
593            return Session::with_external_store(
594                Arc::clone(ext_store),
595                Arc::clone(&self.tx_manager),
596                Arc::clone(&self.query_cache),
597                Arc::clone(&self.catalog),
598                self.config.adaptive.clone(),
599                self.config.factorized_execution,
600                self.config.graph_model,
601                self.config.query_timeout,
602                Arc::clone(&self.commit_counter),
603                self.config.gc_interval,
604            );
605        }
606
607        #[cfg(feature = "rdf")]
608        let mut session = Session::with_rdf_store_and_adaptive(
609            Arc::clone(&self.store),
610            Arc::clone(&self.rdf_store),
611            Arc::clone(&self.tx_manager),
612            Arc::clone(&self.query_cache),
613            Arc::clone(&self.catalog),
614            self.config.adaptive.clone(),
615            self.config.factorized_execution,
616            self.config.graph_model,
617            self.config.query_timeout,
618            Arc::clone(&self.commit_counter),
619            self.config.gc_interval,
620        );
621        #[cfg(not(feature = "rdf"))]
622        let mut session = Session::with_adaptive(
623            Arc::clone(&self.store),
624            Arc::clone(&self.tx_manager),
625            Arc::clone(&self.query_cache),
626            Arc::clone(&self.catalog),
627            self.config.adaptive.clone(),
628            self.config.factorized_execution,
629            self.config.graph_model,
630            self.config.query_timeout,
631            Arc::clone(&self.commit_counter),
632            self.config.gc_interval,
633        );
634
635        #[cfg(feature = "wal")]
636        if let Some(ref wal) = self.wal {
637            session.set_wal(Arc::clone(wal));
638        }
639
640        #[cfg(feature = "cdc")]
641        session.set_cdc_log(Arc::clone(&self.cdc_log));
642
643        // Suppress unused_mut when cdc/wal are disabled
644        let _ = &mut session;
645
646        session
647    }
648
649    /// Returns the adaptive execution configuration.
650    #[must_use]
651    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
652        &self.config.adaptive
653    }
654
655    /// Returns the configuration.
656    #[must_use]
657    pub fn config(&self) -> &Config {
658        &self.config
659    }
660
661    /// Returns the graph data model of this database.
662    #[must_use]
663    pub fn graph_model(&self) -> crate::config::GraphModel {
664        self.config.graph_model
665    }
666
667    /// Returns the configured memory limit in bytes, if any.
668    #[must_use]
669    pub fn memory_limit(&self) -> Option<usize> {
670        self.config.memory_limit
671    }
672
673    /// Returns the underlying store.
674    ///
675    /// This provides direct access to the LPG store for algorithm implementations
676    /// and admin operations (index management, schema introspection, MVCC internals).
677    ///
678    /// For code that only needs read/write graph operations, prefer
679    /// [`graph_store()`](Self::graph_store) which returns the trait interface.
680    #[must_use]
681    pub fn store(&self) -> &Arc<LpgStore> {
682        &self.store
683    }
684
685    // === Named Graph Management ===
686
687    /// Creates a named graph. Returns `true` if created, `false` if it already exists.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if arena allocation fails.
692    pub fn create_graph(&self, name: &str) -> Result<bool> {
693        Ok(self.store.create_graph(name)?)
694    }
695
696    /// Drops a named graph. Returns `true` if dropped, `false` if it did not exist.
697    pub fn drop_graph(&self, name: &str) -> bool {
698        self.store.drop_graph(name)
699    }
700
701    /// Returns all named graph names.
702    #[must_use]
703    pub fn list_graphs(&self) -> Vec<String> {
704        self.store.graph_names()
705    }
706
707    /// Returns the graph store as a trait object.
708    ///
709    /// This provides the [`GraphStoreMut`] interface for code that should work
710    /// with any storage backend. Use this when you only need graph read/write
711    /// operations and don't need admin methods like index management.
712    ///
713    /// [`GraphStoreMut`]: grafeo_core::graph::GraphStoreMut
714    #[must_use]
715    pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
716        if let Some(ref ext_store) = self.external_store {
717            Arc::clone(ext_store)
718        } else {
719            Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
720        }
721    }
722
723    /// Garbage collects old MVCC versions that are no longer visible.
724    ///
725    /// Determines the minimum epoch required by active transactions and prunes
726    /// version chains older than that threshold. Also cleans up completed
727    /// transaction metadata in the transaction manager.
728    pub fn gc(&self) {
729        let min_epoch = self.tx_manager.min_active_epoch();
730        self.store.gc_versions(min_epoch);
731        self.tx_manager.gc();
732    }
733
734    /// Returns the buffer manager for memory-aware operations.
735    #[must_use]
736    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
737        &self.buffer_manager
738    }
739
740    /// Returns the query cache.
741    #[must_use]
742    pub fn query_cache(&self) -> &Arc<QueryCache> {
743        &self.query_cache
744    }
745
746    // =========================================================================
747    // Lifecycle
748    // =========================================================================
749
750    /// Closes the database, flushing all pending writes.
751    ///
752    /// For persistent databases, this ensures everything is safely on disk.
753    /// Called automatically when the database is dropped, but you can call
754    /// it explicitly if you need to guarantee durability at a specific point.
755    ///
756    /// # Errors
757    ///
758    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
759    pub fn close(&self) -> Result<()> {
760        let mut is_open = self.is_open.write();
761        if !*is_open {
762            return Ok(());
763        }
764
765        // Commit and checkpoint WAL
766        #[cfg(feature = "wal")]
767        if let Some(ref wal) = self.wal {
768            let epoch = self.store.current_epoch();
769
770            // Use the last assigned transaction ID, or create a checkpoint-only tx
771            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
772                // No transactions have been started; begin one for checkpoint
773                self.tx_manager.begin()
774            });
775
776            // Log a TxCommit to mark all pending records as committed
777            wal.log(&WalRecord::TxCommit {
778                tx_id: checkpoint_tx,
779            })?;
780
781            // Then checkpoint
782            wal.checkpoint(checkpoint_tx, epoch)?;
783            wal.sync()?;
784        }
785
786        *is_open = false;
787        Ok(())
788    }
789
790    /// Returns the typed WAL if available.
791    #[cfg(feature = "wal")]
792    #[must_use]
793    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
794        self.wal.as_ref()
795    }
796
797    /// Logs a WAL record if WAL is enabled.
798    #[cfg(feature = "wal")]
799    pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
800        if let Some(ref wal) = self.wal {
801            wal.log(record)?;
802        }
803        Ok(())
804    }
805}
806
807impl Drop for GrafeoDB {
808    fn drop(&mut self) {
809        if let Err(e) = self.close() {
810            tracing::error!("Error closing database: {}", e);
811        }
812    }
813}
814
815impl crate::admin::AdminService for GrafeoDB {
816    fn info(&self) -> crate::admin::DatabaseInfo {
817        self.info()
818    }
819
820    fn detailed_stats(&self) -> crate::admin::DatabaseStats {
821        self.detailed_stats()
822    }
823
824    fn schema(&self) -> crate::admin::SchemaInfo {
825        self.schema()
826    }
827
828    fn validate(&self) -> crate::admin::ValidationResult {
829        self.validate()
830    }
831
832    fn wal_status(&self) -> crate::admin::WalStatus {
833        self.wal_status()
834    }
835
836    fn wal_checkpoint(&self) -> Result<()> {
837        self.wal_checkpoint()
838    }
839}
840
841// =========================================================================
842// Query Result Types
843// =========================================================================
844
845/// The result of running a query.
846///
847/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
848/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
849///
850/// # Examples
851///
852/// ```
853/// use grafeo_engine::GrafeoDB;
854///
855/// let db = GrafeoDB::new_in_memory();
856/// db.create_node(&["Person"]);
857///
858/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
859///
860/// // Check what we got
861/// println!("Columns: {:?}", result.columns);
862/// println!("Rows: {}", result.row_count());
863///
864/// // Iterate through results
865/// for row in result.iter() {
866///     println!("{:?}", row);
867/// }
868/// # Ok::<(), grafeo_common::utils::error::Error>(())
869/// ```
870#[derive(Debug)]
871pub struct QueryResult {
872    /// Column names from the RETURN clause.
873    pub columns: Vec<String>,
874    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
875    pub column_types: Vec<grafeo_common::types::LogicalType>,
876    /// The actual result rows.
877    pub rows: Vec<Vec<grafeo_common::types::Value>>,
878    /// Query execution time in milliseconds (if timing was enabled).
879    pub execution_time_ms: Option<f64>,
880    /// Number of rows scanned during query execution (estimate).
881    pub rows_scanned: Option<u64>,
882    /// Status message for DDL and session commands (e.g., "Created node type 'Person'").
883    pub status_message: Option<String>,
884}
885
886impl QueryResult {
887    /// Creates a fully empty query result (no columns, no rows).
888    #[must_use]
889    pub fn empty() -> Self {
890        Self {
891            columns: Vec::new(),
892            column_types: Vec::new(),
893            rows: Vec::new(),
894            execution_time_ms: None,
895            rows_scanned: None,
896            status_message: None,
897        }
898    }
899
900    /// Creates a query result with only a status message (for DDL commands).
901    #[must_use]
902    pub fn status(msg: impl Into<String>) -> Self {
903        Self {
904            columns: Vec::new(),
905            column_types: Vec::new(),
906            rows: Vec::new(),
907            execution_time_ms: None,
908            rows_scanned: None,
909            status_message: Some(msg.into()),
910        }
911    }
912
913    /// Creates a new empty query result.
914    #[must_use]
915    pub fn new(columns: Vec<String>) -> Self {
916        let len = columns.len();
917        Self {
918            columns,
919            column_types: vec![grafeo_common::types::LogicalType::Any; len],
920            rows: Vec::new(),
921            execution_time_ms: None,
922            rows_scanned: None,
923            status_message: None,
924        }
925    }
926
927    /// Creates a new empty query result with column types.
928    #[must_use]
929    pub fn with_types(
930        columns: Vec<String>,
931        column_types: Vec<grafeo_common::types::LogicalType>,
932    ) -> Self {
933        Self {
934            columns,
935            column_types,
936            rows: Vec::new(),
937            execution_time_ms: None,
938            rows_scanned: None,
939            status_message: None,
940        }
941    }
942
943    /// Sets the execution metrics on this result.
944    pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
945        self.execution_time_ms = Some(execution_time_ms);
946        self.rows_scanned = Some(rows_scanned);
947        self
948    }
949
950    /// Returns the execution time in milliseconds, if available.
951    #[must_use]
952    pub fn execution_time_ms(&self) -> Option<f64> {
953        self.execution_time_ms
954    }
955
956    /// Returns the number of rows scanned, if available.
957    #[must_use]
958    pub fn rows_scanned(&self) -> Option<u64> {
959        self.rows_scanned
960    }
961
962    /// Returns the number of rows.
963    #[must_use]
964    pub fn row_count(&self) -> usize {
965        self.rows.len()
966    }
967
968    /// Returns the number of columns.
969    #[must_use]
970    pub fn column_count(&self) -> usize {
971        self.columns.len()
972    }
973
974    /// Returns true if the result is empty.
975    #[must_use]
976    pub fn is_empty(&self) -> bool {
977        self.rows.is_empty()
978    }
979
980    /// Extracts a single value from the result.
981    ///
982    /// Use this when your query returns exactly one row with one column,
983    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
984    ///
985    /// # Errors
986    ///
987    /// Returns an error if the result has multiple rows or columns.
988    pub fn scalar<T: FromValue>(&self) -> Result<T> {
989        if self.rows.len() != 1 || self.columns.len() != 1 {
990            return Err(grafeo_common::utils::error::Error::InvalidValue(
991                "Expected single value".to_string(),
992            ));
993        }
994        T::from_value(&self.rows[0][0])
995    }
996
997    /// Returns an iterator over the rows.
998    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
999        self.rows.iter()
1000    }
1001}
1002
1003/// Converts a [`grafeo_common::types::Value`] to a concrete Rust type.
1004///
1005/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1006/// Used by [`QueryResult::scalar()`] to extract typed values.
1007pub trait FromValue: Sized {
1008    /// Attempts the conversion, returning an error on type mismatch.
1009    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1010}
1011
1012impl FromValue for i64 {
1013    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1014        value
1015            .as_int64()
1016            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1017                expected: "INT64".to_string(),
1018                found: value.type_name().to_string(),
1019            })
1020    }
1021}
1022
1023impl FromValue for f64 {
1024    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1025        value
1026            .as_float64()
1027            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1028                expected: "FLOAT64".to_string(),
1029                found: value.type_name().to_string(),
1030            })
1031    }
1032}
1033
1034impl FromValue for String {
1035    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1036        value.as_str().map(String::from).ok_or_else(|| {
1037            grafeo_common::utils::error::Error::TypeMismatch {
1038                expected: "STRING".to_string(),
1039                found: value.type_name().to_string(),
1040            }
1041        })
1042    }
1043}
1044
1045impl FromValue for bool {
1046    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1047        value
1048            .as_bool()
1049            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1050                expected: "BOOL".to_string(),
1051                found: value.type_name().to_string(),
1052            })
1053    }
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059
1060    #[test]
1061    fn test_create_in_memory_database() {
1062        let db = GrafeoDB::new_in_memory();
1063        assert_eq!(db.node_count(), 0);
1064        assert_eq!(db.edge_count(), 0);
1065    }
1066
1067    #[test]
1068    fn test_database_config() {
1069        let config = Config::in_memory().with_threads(4).with_query_logging();
1070
1071        let db = GrafeoDB::with_config(config).unwrap();
1072        assert_eq!(db.config().threads, 4);
1073        assert!(db.config().query_logging);
1074    }
1075
1076    #[test]
1077    fn test_database_session() {
1078        let db = GrafeoDB::new_in_memory();
1079        let _session = db.session();
1080        // Session should be created successfully
1081    }
1082
1083    #[cfg(feature = "wal")]
1084    #[test]
1085    fn test_persistent_database_recovery() {
1086        use grafeo_common::types::Value;
1087        use tempfile::tempdir;
1088
1089        let dir = tempdir().unwrap();
1090        let db_path = dir.path().join("test_db");
1091
1092        // Create database and add some data
1093        {
1094            let db = GrafeoDB::open(&db_path).unwrap();
1095
1096            let alix = db.create_node(&["Person"]);
1097            db.set_node_property(alix, "name", Value::from("Alix"));
1098
1099            let gus = db.create_node(&["Person"]);
1100            db.set_node_property(gus, "name", Value::from("Gus"));
1101
1102            let _edge = db.create_edge(alix, gus, "KNOWS");
1103
1104            // Explicitly close to flush WAL
1105            db.close().unwrap();
1106        }
1107
1108        // Reopen and verify data was recovered
1109        {
1110            let db = GrafeoDB::open(&db_path).unwrap();
1111
1112            assert_eq!(db.node_count(), 2);
1113            assert_eq!(db.edge_count(), 1);
1114
1115            // Verify nodes exist
1116            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
1117            assert!(node0.is_some());
1118
1119            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
1120            assert!(node1.is_some());
1121        }
1122    }
1123
1124    #[cfg(feature = "wal")]
1125    #[test]
1126    fn test_wal_logging() {
1127        use tempfile::tempdir;
1128
1129        let dir = tempdir().unwrap();
1130        let db_path = dir.path().join("wal_test_db");
1131
1132        let db = GrafeoDB::open(&db_path).unwrap();
1133
1134        // Create some data
1135        let node = db.create_node(&["Test"]);
1136        db.delete_node(node);
1137
1138        // WAL should have records
1139        if let Some(wal) = db.wal() {
1140            assert!(wal.record_count() > 0);
1141        }
1142
1143        db.close().unwrap();
1144    }
1145
1146    #[cfg(feature = "wal")]
1147    #[test]
1148    fn test_wal_recovery_multiple_sessions() {
1149        // Tests that WAL recovery works correctly across multiple open/close cycles
1150        use grafeo_common::types::Value;
1151        use tempfile::tempdir;
1152
1153        let dir = tempdir().unwrap();
1154        let db_path = dir.path().join("multi_session_db");
1155
1156        // Session 1: Create initial data
1157        {
1158            let db = GrafeoDB::open(&db_path).unwrap();
1159            let alix = db.create_node(&["Person"]);
1160            db.set_node_property(alix, "name", Value::from("Alix"));
1161            db.close().unwrap();
1162        }
1163
1164        // Session 2: Add more data
1165        {
1166            let db = GrafeoDB::open(&db_path).unwrap();
1167            assert_eq!(db.node_count(), 1); // Previous data recovered
1168            let gus = db.create_node(&["Person"]);
1169            db.set_node_property(gus, "name", Value::from("Gus"));
1170            db.close().unwrap();
1171        }
1172
1173        // Session 3: Verify all data
1174        {
1175            let db = GrafeoDB::open(&db_path).unwrap();
1176            assert_eq!(db.node_count(), 2);
1177
1178            // Verify properties were recovered correctly
1179            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
1180            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));
1181
1182            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
1183            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
1184        }
1185    }
1186
1187    #[cfg(feature = "wal")]
1188    #[test]
1189    fn test_database_consistency_after_mutations() {
1190        // Tests that database remains consistent after a series of create/delete operations
1191        use grafeo_common::types::Value;
1192        use tempfile::tempdir;
1193
1194        let dir = tempdir().unwrap();
1195        let db_path = dir.path().join("consistency_db");
1196
1197        {
1198            let db = GrafeoDB::open(&db_path).unwrap();
1199
1200            // Create nodes
1201            let a = db.create_node(&["Node"]);
1202            let b = db.create_node(&["Node"]);
1203            let c = db.create_node(&["Node"]);
1204
1205            // Create edges
1206            let e1 = db.create_edge(a, b, "LINKS");
1207            let _e2 = db.create_edge(b, c, "LINKS");
1208
1209            // Delete middle node and its edge
1210            db.delete_edge(e1);
1211            db.delete_node(b);
1212
1213            // Set properties on remaining nodes
1214            db.set_node_property(a, "value", Value::Int64(1));
1215            db.set_node_property(c, "value", Value::Int64(3));
1216
1217            db.close().unwrap();
1218        }
1219
1220        // Reopen and verify consistency
1221        {
1222            let db = GrafeoDB::open(&db_path).unwrap();
1223
1224            // Should have 2 nodes (a and c), b was deleted
1225            // Note: node_count includes deleted nodes in some implementations
1226            // What matters is that the non-deleted nodes are accessible
1227            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
1228            assert!(node_a.is_some());
1229
1230            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
1231            assert!(node_c.is_some());
1232
1233            // Middle node should be deleted
1234            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
1235            assert!(node_b.is_none());
1236        }
1237    }
1238
1239    #[cfg(feature = "wal")]
1240    #[test]
1241    fn test_close_is_idempotent() {
1242        // Calling close() multiple times should not cause errors
1243        use tempfile::tempdir;
1244
1245        let dir = tempdir().unwrap();
1246        let db_path = dir.path().join("close_test_db");
1247
1248        let db = GrafeoDB::open(&db_path).unwrap();
1249        db.create_node(&["Test"]);
1250
1251        // First close should succeed
1252        assert!(db.close().is_ok());
1253
1254        // Second close should also succeed (idempotent)
1255        assert!(db.close().is_ok());
1256    }
1257
1258    #[test]
1259    fn test_query_result_has_metrics() {
1260        // Verifies that query results include execution metrics
1261        let db = GrafeoDB::new_in_memory();
1262        db.create_node(&["Person"]);
1263        db.create_node(&["Person"]);
1264
1265        #[cfg(feature = "gql")]
1266        {
1267            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();
1268
1269            // Metrics should be populated
1270            assert!(result.execution_time_ms.is_some());
1271            assert!(result.rows_scanned.is_some());
1272            assert!(result.execution_time_ms.unwrap() >= 0.0);
1273            assert_eq!(result.rows_scanned.unwrap(), 2);
1274        }
1275    }
1276
1277    #[test]
1278    fn test_empty_query_result_metrics() {
1279        // Verifies metrics are correct for queries returning no results
1280        let db = GrafeoDB::new_in_memory();
1281        db.create_node(&["Person"]);
1282
1283        #[cfg(feature = "gql")]
1284        {
1285            // Query that matches nothing
1286            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();
1287
1288            assert!(result.execution_time_ms.is_some());
1289            assert!(result.rows_scanned.is_some());
1290            assert_eq!(result.rows_scanned.unwrap(), 0);
1291        }
1292    }
1293
1294    #[cfg(feature = "cdc")]
1295    mod cdc_integration {
1296        use super::*;
1297
1298        #[test]
1299        fn test_node_lifecycle_history() {
1300            let db = GrafeoDB::new_in_memory();
1301
1302            // Create
1303            let id = db.create_node(&["Person"]);
1304            // Update
1305            db.set_node_property(id, "name", "Alix".into());
1306            db.set_node_property(id, "name", "Gus".into());
1307            // Delete
1308            db.delete_node(id);
1309
1310            let history = db.history(id).unwrap();
1311            assert_eq!(history.len(), 4); // create + 2 updates + delete
1312            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1313            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1314            assert!(history[1].before.is_none()); // first set_node_property has no prior value
1315            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
1316            assert!(history[2].before.is_some()); // second update has prior "Alix"
1317            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
1318        }
1319
1320        #[test]
1321        fn test_edge_lifecycle_history() {
1322            let db = GrafeoDB::new_in_memory();
1323
1324            let alix = db.create_node(&["Person"]);
1325            let gus = db.create_node(&["Person"]);
1326            let edge = db.create_edge(alix, gus, "KNOWS");
1327            db.set_edge_property(edge, "since", 2024i64.into());
1328            db.delete_edge(edge);
1329
1330            let history = db.history(edge).unwrap();
1331            assert_eq!(history.len(), 3); // create + update + delete
1332            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1333            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
1334            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
1335        }
1336
1337        #[test]
1338        fn test_create_node_with_props_cdc() {
1339            let db = GrafeoDB::new_in_memory();
1340
1341            let id = db.create_node_with_props(
1342                &["Person"],
1343                vec![
1344                    ("name", grafeo_common::types::Value::from("Alix")),
1345                    ("age", grafeo_common::types::Value::from(30i64)),
1346                ],
1347            );
1348
1349            let history = db.history(id).unwrap();
1350            assert_eq!(history.len(), 1);
1351            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
1352            // Props should be captured
1353            let after = history[0].after.as_ref().unwrap();
1354            assert_eq!(after.len(), 2);
1355        }
1356
1357        #[test]
1358        fn test_changes_between() {
1359            let db = GrafeoDB::new_in_memory();
1360
1361            let id1 = db.create_node(&["A"]);
1362            let _id2 = db.create_node(&["B"]);
1363            db.set_node_property(id1, "x", 1i64.into());
1364
1365            // All events should be at the same epoch (in-memory, epoch doesn't advance without tx)
1366            let changes = db
1367                .changes_between(
1368                    grafeo_common::types::EpochId(0),
1369                    grafeo_common::types::EpochId(u64::MAX),
1370                )
1371                .unwrap();
1372            assert_eq!(changes.len(), 3); // 2 creates + 1 update
1373        }
1374    }
1375
1376    #[test]
1377    fn test_with_store_basic() {
1378        use grafeo_core::graph::lpg::LpgStore;
1379
1380        let store = Arc::new(LpgStore::new().unwrap());
1381        let n1 = store.create_node(&["Person"]);
1382        store.set_node_property(n1, "name", "Alix".into());
1383
1384        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1385        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1386
1387        let result = db.execute("MATCH (n:Person) RETURN n.name").unwrap();
1388        assert_eq!(result.rows.len(), 1);
1389    }
1390
1391    #[test]
1392    fn test_with_store_session() {
1393        use grafeo_core::graph::lpg::LpgStore;
1394
1395        let store = Arc::new(LpgStore::new().unwrap());
1396        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1397        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1398
1399        let session = db.session();
1400        let result = session.execute("MATCH (n) RETURN count(n)").unwrap();
1401        assert_eq!(result.rows.len(), 1);
1402    }
1403
1404    #[test]
1405    fn test_with_store_mutations() {
1406        use grafeo_core::graph::lpg::LpgStore;
1407
1408        let store = Arc::new(LpgStore::new().unwrap());
1409        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
1410        let db = GrafeoDB::with_store(graph_store, Config::in_memory()).unwrap();
1411
1412        let session = db.session();
1413        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1414
1415        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1416        assert_eq!(result.rows.len(), 1);
1417
1418        // Data should also be visible via the original store
1419        assert_eq!(store.node_count(), 1);
1420    }
1421}