Skip to main content

grafeo_engine/
database.rs

1//! The main database struct and operations.
2//!
3//! Start here with [`GrafeoDB`] - it's your handle to everything.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use grafeo_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
11use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::lpg::LpgStore;
14#[cfg(feature = "rdf")]
15use grafeo_core::graph::rdf::RdfStore;
16
17use crate::config::Config;
18use crate::session::Session;
19use crate::transaction::TransactionManager;
20
/// Your handle to a Grafeo database.
///
/// Start here. Create one with [`new_in_memory()`](Self::new_in_memory) for
/// quick experiments, or [`open()`](Self::open) for persistent storage.
/// Then grab a [`session()`](Self::session) to start querying.
///
/// The handle owns the graph store, the transaction manager, the buffer
/// manager and (optionally) the write-ahead log. Sessions created from it
/// receive `Arc` clones of the store and the transaction manager.
///
/// # Examples
///
/// ```
/// use grafeo_engine::GrafeoDB;
///
/// // Quick in-memory database
/// let db = GrafeoDB::new_in_memory();
///
/// // Add some data
/// db.create_node(&["Person"]);
///
/// // Query it
/// let session = db.session();
/// let result = session.execute("MATCH (p:Person) RETURN p")?;
/// # Ok::<(), grafeo_common::utils::error::Error>(())
/// ```
pub struct GrafeoDB {
    /// Database configuration supplied at construction time.
    config: Config,
    /// The underlying labeled-property-graph store, shared with every session.
    store: Arc<LpgStore>,
    /// RDF triple store (only present when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager; `None` unless the WAL is enabled in the
    /// configuration and a database path is set.
    wal: Option<Arc<WalManager>>,
    /// Whether the database is open: `true` after construction, set to
    /// `false` by a successful [`close()`](Self::close).
    is_open: RwLock<bool>,
}
60
61impl GrafeoDB {
62    /// Creates an in-memory database - fast to create, gone when dropped.
63    ///
64    /// Use this for tests, experiments, or when you don't need persistence.
65    /// For data that survives restarts, use [`open()`](Self::open) instead.
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use grafeo_engine::GrafeoDB;
71    ///
72    /// let db = GrafeoDB::new_in_memory();
73    /// let session = db.session();
74    /// session.execute("INSERT (:Person {name: 'Alice'})")?;
75    /// # Ok::<(), grafeo_common::utils::error::Error>(())
76    /// ```
77    #[must_use]
78    pub fn new_in_memory() -> Self {
79        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
80    }
81
82    /// Opens a database at the given path, creating it if it doesn't exist.
83    ///
84    /// If you've used this path before, Grafeo recovers your data from the
85    /// write-ahead log automatically. First open on a new path creates an
86    /// empty database.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the path isn't writable or recovery fails.
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// use grafeo_engine::GrafeoDB;
96    ///
97    /// let db = GrafeoDB::open("./my_social_network")?;
98    /// # Ok::<(), grafeo_common::utils::error::Error>(())
99    /// ```
100    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
101        Self::with_config(Config::persistent(path.as_ref()))
102    }
103
104    /// Creates a database with custom configuration.
105    ///
106    /// Use this when you need fine-grained control over memory limits,
107    /// thread counts, or persistence settings. For most cases,
108    /// [`new_in_memory()`](Self::new_in_memory) or [`open()`](Self::open)
109    /// are simpler.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the database can't be created or recovery fails.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use grafeo_engine::{GrafeoDB, Config};
119    ///
120    /// // In-memory with a 512MB limit
121    /// let config = Config::in_memory()
122    ///     .with_memory_limit(512 * 1024 * 1024);
123    ///
124    /// let db = GrafeoDB::with_config(config)?;
125    /// # Ok::<(), grafeo_common::utils::error::Error>(())
126    /// ```
127    pub fn with_config(config: Config) -> Result<Self> {
128        let store = Arc::new(LpgStore::new());
129        #[cfg(feature = "rdf")]
130        let rdf_store = Arc::new(RdfStore::new());
131        let tx_manager = Arc::new(TransactionManager::new());
132
133        // Create buffer manager with configured limits
134        let buffer_config = BufferManagerConfig {
135            budget: config.memory_limit.unwrap_or_else(|| {
136                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
137            }),
138            spill_path: config
139                .spill_path
140                .clone()
141                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
142            ..BufferManagerConfig::default()
143        };
144        let buffer_manager = BufferManager::new(buffer_config);
145
146        // Initialize WAL if persistence is enabled
147        let wal = if config.wal_enabled {
148            if let Some(ref db_path) = config.path {
149                // Create database directory if it doesn't exist
150                std::fs::create_dir_all(db_path)?;
151
152                let wal_path = db_path.join("wal");
153
154                // Check if WAL exists and recover if needed
155                if wal_path.exists() {
156                    let recovery = WalRecovery::new(&wal_path);
157                    let records = recovery.recover()?;
158                    Self::apply_wal_records(&store, &records)?;
159                }
160
161                // Open/create WAL manager
162                let wal_config = WalConfig::default();
163                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
164                Some(Arc::new(wal_manager))
165            } else {
166                None
167            }
168        } else {
169            None
170        };
171
172        Ok(Self {
173            config,
174            store,
175            #[cfg(feature = "rdf")]
176            rdf_store,
177            tx_manager,
178            buffer_manager,
179            wal,
180            is_open: RwLock::new(true),
181        })
182    }
183
184    /// Applies WAL records to restore the database state.
185    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
186        for record in records {
187            match record {
188                WalRecord::CreateNode { id, labels } => {
189                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
190                    store.create_node_with_id(*id, &label_refs);
191                }
192                WalRecord::DeleteNode { id } => {
193                    store.delete_node(*id);
194                }
195                WalRecord::CreateEdge {
196                    id,
197                    src,
198                    dst,
199                    edge_type,
200                } => {
201                    store.create_edge_with_id(*id, *src, *dst, edge_type);
202                }
203                WalRecord::DeleteEdge { id } => {
204                    store.delete_edge(*id);
205                }
206                WalRecord::SetNodeProperty { id, key, value } => {
207                    store.set_node_property(*id, key, value.clone());
208                }
209                WalRecord::SetEdgeProperty { id, key, value } => {
210                    store.set_edge_property(*id, key, value.clone());
211                }
212                WalRecord::TxCommit { .. }
213                | WalRecord::TxAbort { .. }
214                | WalRecord::Checkpoint { .. } => {
215                    // Transaction control records don't need replay action
216                    // (recovery already filtered to only committed transactions)
217                }
218            }
219        }
220        Ok(())
221    }
222
223    /// Opens a new session for running queries.
224    ///
225    /// Sessions are cheap to create - spin up as many as you need. Each
226    /// gets its own transaction context, so concurrent sessions won't
227    /// block each other on reads.
228    ///
229    /// # Examples
230    ///
231    /// ```
232    /// use grafeo_engine::GrafeoDB;
233    ///
234    /// let db = GrafeoDB::new_in_memory();
235    /// let session = db.session();
236    ///
237    /// // Run queries through the session
238    /// let result = session.execute("MATCH (n) RETURN count(n)")?;
239    /// # Ok::<(), grafeo_common::utils::error::Error>(())
240    /// ```
241    #[must_use]
242    pub fn session(&self) -> Session {
243        #[cfg(feature = "rdf")]
244        {
245            Session::with_rdf_store_and_adaptive(
246                Arc::clone(&self.store),
247                Arc::clone(&self.rdf_store),
248                Arc::clone(&self.tx_manager),
249                self.config.adaptive.clone(),
250            )
251        }
252        #[cfg(not(feature = "rdf"))]
253        {
254            Session::with_adaptive(
255                Arc::clone(&self.store),
256                Arc::clone(&self.tx_manager),
257                self.config.adaptive.clone(),
258            )
259        }
260    }
261
262    /// Returns the adaptive execution configuration.
263    #[must_use]
264    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
265        &self.config.adaptive
266    }
267
268    /// Runs a query directly on the database.
269    ///
270    /// A convenience method that creates a temporary session behind the
271    /// scenes. If you're running multiple queries, grab a
272    /// [`session()`](Self::session) instead to avoid the overhead.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if parsing or execution fails.
277    pub fn execute(&self, query: &str) -> Result<QueryResult> {
278        let session = self.session();
279        session.execute(query)
280    }
281
282    /// Executes a query with parameters and returns the result.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the query fails.
287    pub fn execute_with_params(
288        &self,
289        query: &str,
290        params: std::collections::HashMap<String, grafeo_common::types::Value>,
291    ) -> Result<QueryResult> {
292        let session = self.session();
293        session.execute_with_params(query, params)
294    }
295
296    /// Executes a Cypher query and returns the result.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the query fails.
301    #[cfg(feature = "cypher")]
302    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
303        let session = self.session();
304        session.execute_cypher(query)
305    }
306
307    /// Executes a Cypher query with parameters and returns the result.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the query fails.
312    #[cfg(feature = "cypher")]
313    pub fn execute_cypher_with_params(
314        &self,
315        query: &str,
316        params: std::collections::HashMap<String, grafeo_common::types::Value>,
317    ) -> Result<QueryResult> {
318        use crate::query::processor::{QueryLanguage, QueryProcessor};
319
320        // Create processor
321        let processor = QueryProcessor::for_lpg(Arc::clone(&self.store));
322        processor.process(query, QueryLanguage::Cypher, Some(&params))
323    }
324
325    /// Executes a Gremlin query and returns the result.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if the query fails.
330    #[cfg(feature = "gremlin")]
331    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
332        let session = self.session();
333        session.execute_gremlin(query)
334    }
335
336    /// Executes a Gremlin query with parameters and returns the result.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the query fails.
341    #[cfg(feature = "gremlin")]
342    pub fn execute_gremlin_with_params(
343        &self,
344        query: &str,
345        params: std::collections::HashMap<String, grafeo_common::types::Value>,
346    ) -> Result<QueryResult> {
347        let session = self.session();
348        session.execute_gremlin_with_params(query, params)
349    }
350
351    /// Executes a GraphQL query and returns the result.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the query fails.
356    #[cfg(feature = "graphql")]
357    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
358        let session = self.session();
359        session.execute_graphql(query)
360    }
361
362    /// Executes a GraphQL query with parameters and returns the result.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the query fails.
367    #[cfg(feature = "graphql")]
368    pub fn execute_graphql_with_params(
369        &self,
370        query: &str,
371        params: std::collections::HashMap<String, grafeo_common::types::Value>,
372    ) -> Result<QueryResult> {
373        let session = self.session();
374        session.execute_graphql_with_params(query, params)
375    }
376
377    /// Executes a SPARQL query and returns the result.
378    ///
379    /// SPARQL queries operate on the RDF triple store.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if the query fails.
384    ///
385    /// # Examples
386    ///
387    /// ```ignore
388    /// use grafeo_engine::GrafeoDB;
389    ///
390    /// let db = GrafeoDB::new_in_memory();
391    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
392    /// ```
393    #[cfg(all(feature = "sparql", feature = "rdf"))]
394    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
395        use crate::query::{
396            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
397        };
398
399        // Parse and translate the SPARQL query to a logical plan
400        let logical_plan = sparql_translator::translate(query)?;
401
402        // Optimize the plan
403        let optimizer = Optimizer::new();
404        let optimized_plan = optimizer.optimize(logical_plan)?;
405
406        // Convert to physical plan using RDF planner
407        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
408        let mut physical_plan = planner.plan(&optimized_plan)?;
409
410        // Execute the plan
411        let executor = Executor::with_columns(physical_plan.columns.clone());
412        executor.execute(physical_plan.operator.as_mut())
413    }
414
415    /// Returns the RDF store.
416    ///
417    /// This provides direct access to the RDF store for triple operations.
418    #[cfg(feature = "rdf")]
419    #[must_use]
420    pub fn rdf_store(&self) -> &Arc<RdfStore> {
421        &self.rdf_store
422    }
423
424    /// Executes a query and returns a single scalar value.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the query fails or doesn't return exactly one row.
429    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
430        let result = self.execute(query)?;
431        result.scalar()
432    }
433
434    /// Returns the configuration.
435    #[must_use]
436    pub fn config(&self) -> &Config {
437        &self.config
438    }
439
440    /// Returns the underlying store.
441    ///
442    /// This provides direct access to the LPG store for algorithm implementations.
443    #[must_use]
444    pub fn store(&self) -> &Arc<LpgStore> {
445        &self.store
446    }
447
448    /// Returns the buffer manager for memory-aware operations.
449    #[must_use]
450    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
451        &self.buffer_manager
452    }
453
454    /// Closes the database, flushing all pending writes.
455    ///
456    /// For persistent databases, this ensures everything is safely on disk.
457    /// Called automatically when the database is dropped, but you can call
458    /// it explicitly if you need to guarantee durability at a specific point.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the WAL can't be flushed (check disk space/permissions).
463    pub fn close(&self) -> Result<()> {
464        let mut is_open = self.is_open.write();
465        if !*is_open {
466            return Ok(());
467        }
468
469        // Commit and checkpoint WAL
470        if let Some(ref wal) = self.wal {
471            let epoch = self.store.current_epoch();
472
473            // Use the last assigned transaction ID, or create a checkpoint-only tx
474            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
475                // No transactions have been started; begin one for checkpoint
476                self.tx_manager.begin()
477            });
478
479            // Log a TxCommit to mark all pending records as committed
480            wal.log(&WalRecord::TxCommit {
481                tx_id: checkpoint_tx,
482            })?;
483
484            // Then checkpoint
485            wal.checkpoint(checkpoint_tx, epoch)?;
486            wal.sync()?;
487        }
488
489        *is_open = false;
490        Ok(())
491    }
492
493    /// Returns the WAL manager if available.
494    #[must_use]
495    pub fn wal(&self) -> Option<&Arc<WalManager>> {
496        self.wal.as_ref()
497    }
498
499    /// Logs a WAL record if WAL is enabled.
500    fn log_wal(&self, record: &WalRecord) -> Result<()> {
501        if let Some(ref wal) = self.wal {
502            wal.log(record)?;
503        }
504        Ok(())
505    }
506
507    /// Returns the number of nodes in the database.
508    #[must_use]
509    pub fn node_count(&self) -> usize {
510        self.store.node_count()
511    }
512
513    /// Returns the number of edges in the database.
514    #[must_use]
515    pub fn edge_count(&self) -> usize {
516        self.store.edge_count()
517    }
518
519    /// Returns the number of distinct labels in the database.
520    #[must_use]
521    pub fn label_count(&self) -> usize {
522        self.store.label_count()
523    }
524
525    /// Returns the number of distinct property keys in the database.
526    #[must_use]
527    pub fn property_key_count(&self) -> usize {
528        self.store.property_key_count()
529    }
530
531    /// Returns the number of distinct edge types in the database.
532    #[must_use]
533    pub fn edge_type_count(&self) -> usize {
534        self.store.edge_type_count()
535    }
536
537    // === Node Operations ===
538
539    /// Creates a node with the given labels and returns its ID.
540    ///
541    /// Labels categorize nodes - think of them like tags. A node can have
542    /// multiple labels (e.g., `["Person", "Employee"]`).
543    ///
544    /// # Examples
545    ///
546    /// ```
547    /// use grafeo_engine::GrafeoDB;
548    ///
549    /// let db = GrafeoDB::new_in_memory();
550    /// let alice = db.create_node(&["Person"]);
551    /// let company = db.create_node(&["Company", "Startup"]);
552    /// ```
553    pub fn create_node(&self, labels: &[&str]) -> grafeo_common::types::NodeId {
554        let id = self.store.create_node(labels);
555
556        // Log to WAL if enabled
557        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
558            id,
559            labels: labels.iter().map(|s| s.to_string()).collect(),
560        }) {
561            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
562        }
563
564        id
565    }
566
567    /// Creates a new node with labels and properties.
568    ///
569    /// If WAL is enabled, the operation is logged for durability.
570    pub fn create_node_with_props(
571        &self,
572        labels: &[&str],
573        properties: impl IntoIterator<
574            Item = (
575                impl Into<grafeo_common::types::PropertyKey>,
576                impl Into<grafeo_common::types::Value>,
577            ),
578        >,
579    ) -> grafeo_common::types::NodeId {
580        // Collect properties first so we can log them to WAL
581        let props: Vec<(
582            grafeo_common::types::PropertyKey,
583            grafeo_common::types::Value,
584        )> = properties
585            .into_iter()
586            .map(|(k, v)| (k.into(), v.into()))
587            .collect();
588
589        let id = self
590            .store
591            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
592
593        // Log node creation to WAL
594        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
595            id,
596            labels: labels.iter().map(|s| s.to_string()).collect(),
597        }) {
598            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
599        }
600
601        // Log each property to WAL for full durability
602        for (key, value) in props {
603            if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
604                id,
605                key: key.to_string(),
606                value,
607            }) {
608                tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
609            }
610        }
611
612        id
613    }
614
615    /// Gets a node by ID.
616    #[must_use]
617    pub fn get_node(
618        &self,
619        id: grafeo_common::types::NodeId,
620    ) -> Option<grafeo_core::graph::lpg::Node> {
621        self.store.get_node(id)
622    }
623
624    /// Deletes a node and all its edges.
625    ///
626    /// If WAL is enabled, the operation is logged for durability.
627    pub fn delete_node(&self, id: grafeo_common::types::NodeId) -> bool {
628        let result = self.store.delete_node(id);
629
630        if result {
631            if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
632                tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
633            }
634        }
635
636        result
637    }
638
639    /// Sets a property on a node.
640    ///
641    /// If WAL is enabled, the operation is logged for durability.
642    pub fn set_node_property(
643        &self,
644        id: grafeo_common::types::NodeId,
645        key: &str,
646        value: grafeo_common::types::Value,
647    ) {
648        // Log to WAL first
649        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
650            id,
651            key: key.to_string(),
652            value: value.clone(),
653        }) {
654            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
655        }
656
657        self.store.set_node_property(id, key, value);
658    }
659
660    // === Edge Operations ===
661
662    /// Creates an edge (relationship) between two nodes.
663    ///
664    /// Edges connect nodes and have a type that describes the relationship.
665    /// They're directed - the order of `src` and `dst` matters.
666    ///
667    /// # Examples
668    ///
669    /// ```
670    /// use grafeo_engine::GrafeoDB;
671    ///
672    /// let db = GrafeoDB::new_in_memory();
673    /// let alice = db.create_node(&["Person"]);
674    /// let bob = db.create_node(&["Person"]);
675    ///
676    /// // Alice knows Bob (directed: Alice -> Bob)
677    /// let edge = db.create_edge(alice, bob, "KNOWS");
678    /// ```
679    pub fn create_edge(
680        &self,
681        src: grafeo_common::types::NodeId,
682        dst: grafeo_common::types::NodeId,
683        edge_type: &str,
684    ) -> grafeo_common::types::EdgeId {
685        let id = self.store.create_edge(src, dst, edge_type);
686
687        // Log to WAL if enabled
688        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
689            id,
690            src,
691            dst,
692            edge_type: edge_type.to_string(),
693        }) {
694            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
695        }
696
697        id
698    }
699
700    /// Creates a new edge with properties.
701    ///
702    /// If WAL is enabled, the operation is logged for durability.
703    pub fn create_edge_with_props(
704        &self,
705        src: grafeo_common::types::NodeId,
706        dst: grafeo_common::types::NodeId,
707        edge_type: &str,
708        properties: impl IntoIterator<
709            Item = (
710                impl Into<grafeo_common::types::PropertyKey>,
711                impl Into<grafeo_common::types::Value>,
712            ),
713        >,
714    ) -> grafeo_common::types::EdgeId {
715        // Collect properties first so we can log them to WAL
716        let props: Vec<(
717            grafeo_common::types::PropertyKey,
718            grafeo_common::types::Value,
719        )> = properties
720            .into_iter()
721            .map(|(k, v)| (k.into(), v.into()))
722            .collect();
723
724        let id = self.store.create_edge_with_props(
725            src,
726            dst,
727            edge_type,
728            props.iter().map(|(k, v)| (k.clone(), v.clone())),
729        );
730
731        // Log edge creation to WAL
732        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
733            id,
734            src,
735            dst,
736            edge_type: edge_type.to_string(),
737        }) {
738            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
739        }
740
741        // Log each property to WAL for full durability
742        for (key, value) in props {
743            if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
744                id,
745                key: key.to_string(),
746                value,
747            }) {
748                tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
749            }
750        }
751
752        id
753    }
754
755    /// Gets an edge by ID.
756    #[must_use]
757    pub fn get_edge(
758        &self,
759        id: grafeo_common::types::EdgeId,
760    ) -> Option<grafeo_core::graph::lpg::Edge> {
761        self.store.get_edge(id)
762    }
763
764    /// Deletes an edge.
765    ///
766    /// If WAL is enabled, the operation is logged for durability.
767    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
768        let result = self.store.delete_edge(id);
769
770        if result {
771            if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
772                tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
773            }
774        }
775
776        result
777    }
778
779    /// Sets a property on an edge.
780    ///
781    /// If WAL is enabled, the operation is logged for durability.
782    pub fn set_edge_property(
783        &self,
784        id: grafeo_common::types::EdgeId,
785        key: &str,
786        value: grafeo_common::types::Value,
787    ) {
788        // Log to WAL first
789        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
790            id,
791            key: key.to_string(),
792            value: value.clone(),
793        }) {
794            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
795        }
796        self.store.set_edge_property(id, key, value);
797    }
798
799    /// Removes a property from a node.
800    ///
801    /// Returns true if the property existed and was removed, false otherwise.
802    pub fn remove_node_property(&self, id: grafeo_common::types::NodeId, key: &str) -> bool {
803        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
804        self.store.remove_node_property(id, key).is_some()
805    }
806
807    /// Removes a property from an edge.
808    ///
809    /// Returns true if the property existed and was removed, false otherwise.
810    pub fn remove_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str) -> bool {
811        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
812        self.store.remove_edge_property(id, key).is_some()
813    }
814
815    // =========================================================================
816    // ADMIN API: Introspection
817    // =========================================================================
818
819    /// Returns true if this database is backed by a file (persistent).
820    ///
821    /// In-memory databases return false.
822    #[must_use]
823    pub fn is_persistent(&self) -> bool {
824        self.config.path.is_some()
825    }
826
827    /// Returns the database file path, if persistent.
828    ///
829    /// In-memory databases return None.
830    #[must_use]
831    pub fn path(&self) -> Option<&Path> {
832        self.config.path.as_deref()
833    }
834
835    /// Returns high-level database information.
836    ///
837    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
838    #[must_use]
839    pub fn info(&self) -> crate::admin::DatabaseInfo {
840        crate::admin::DatabaseInfo {
841            mode: crate::admin::DatabaseMode::Lpg,
842            node_count: self.store.node_count(),
843            edge_count: self.store.edge_count(),
844            is_persistent: self.is_persistent(),
845            path: self.config.path.clone(),
846            wal_enabled: self.config.wal_enabled,
847            version: env!("CARGO_PKG_VERSION").to_string(),
848        }
849    }
850
851    /// Returns detailed database statistics.
852    ///
853    /// Includes counts, memory usage, and index information.
854    #[must_use]
855    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
856        let disk_bytes = self.config.path.as_ref().and_then(|p| {
857            if p.exists() {
858                Self::calculate_disk_usage(p).ok()
859            } else {
860                None
861            }
862        });
863
864        crate::admin::DatabaseStats {
865            node_count: self.store.node_count(),
866            edge_count: self.store.edge_count(),
867            label_count: self.store.label_count(),
868            edge_type_count: self.store.edge_type_count(),
869            property_key_count: self.store.property_key_count(),
870            index_count: 0, // TODO: implement index tracking
871            memory_bytes: self.buffer_manager.allocated(),
872            disk_bytes,
873        }
874    }
875
876    /// Calculates total disk usage for the database directory.
877    fn calculate_disk_usage(path: &Path) -> Result<usize> {
878        let mut total = 0usize;
879        if path.is_dir() {
880            for entry in std::fs::read_dir(path)? {
881                let entry = entry?;
882                let metadata = entry.metadata()?;
883                if metadata.is_file() {
884                    total += metadata.len() as usize;
885                } else if metadata.is_dir() {
886                    total += Self::calculate_disk_usage(&entry.path())?;
887                }
888            }
889        }
890        Ok(total)
891    }
892
893    /// Returns schema information (labels, edge types, property keys).
894    ///
895    /// For LPG mode, returns label and edge type information.
896    /// For RDF mode, returns predicate and named graph information.
897    #[must_use]
898    pub fn schema(&self) -> crate::admin::SchemaInfo {
899        let labels = self
900            .store
901            .all_labels()
902            .into_iter()
903            .map(|name| crate::admin::LabelInfo {
904                name: name.clone(),
905                count: self.store.nodes_with_label(&name).count(),
906            })
907            .collect();
908
909        let edge_types = self
910            .store
911            .all_edge_types()
912            .into_iter()
913            .map(|name| crate::admin::EdgeTypeInfo {
914                name: name.clone(),
915                count: self.store.edges_with_type(&name).count(),
916            })
917            .collect();
918
919        let property_keys = self.store.all_property_keys();
920
921        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
922            labels,
923            edge_types,
924            property_keys,
925        })
926    }
927
928    /// Returns RDF schema information.
929    ///
930    /// Only available when the RDF feature is enabled.
931    #[cfg(feature = "rdf")]
932    #[must_use]
933    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
934        let stats = self.rdf_store.stats();
935
936        let predicates = self
937            .rdf_store
938            .predicates()
939            .into_iter()
940            .map(|predicate| {
941                let count = self.rdf_store.triples_with_predicate(&predicate).len();
942                crate::admin::PredicateInfo {
943                    iri: predicate.to_string(),
944                    count,
945                }
946            })
947            .collect();
948
949        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
950            predicates,
951            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
952            subject_count: stats.subject_count,
953            object_count: stats.object_count,
954        })
955    }
956
957    /// Validates database integrity.
958    ///
959    /// Checks for:
960    /// - Dangling edge references (edges pointing to non-existent nodes)
961    /// - Internal index consistency
962    ///
963    /// Returns a list of errors and warnings. Empty errors = valid.
964    #[must_use]
965    pub fn validate(&self) -> crate::admin::ValidationResult {
966        let mut result = crate::admin::ValidationResult::default();
967
968        // Check for dangling edge references
969        for edge in self.store.all_edges() {
970            if self.store.get_node(edge.src).is_none() {
971                result.errors.push(crate::admin::ValidationError {
972                    code: "DANGLING_SRC".to_string(),
973                    message: format!(
974                        "Edge {} references non-existent source node {}",
975                        edge.id.0, edge.src.0
976                    ),
977                    context: Some(format!("edge:{}", edge.id.0)),
978                });
979            }
980            if self.store.get_node(edge.dst).is_none() {
981                result.errors.push(crate::admin::ValidationError {
982                    code: "DANGLING_DST".to_string(),
983                    message: format!(
984                        "Edge {} references non-existent destination node {}",
985                        edge.id.0, edge.dst.0
986                    ),
987                    context: Some(format!("edge:{}", edge.id.0)),
988                });
989            }
990        }
991
992        // Add warnings for potential issues
993        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
994            result.warnings.push(crate::admin::ValidationWarning {
995                code: "NO_EDGES".to_string(),
996                message: "Database has nodes but no edges".to_string(),
997                context: None,
998            });
999        }
1000
1001        result
1002    }
1003
1004    /// Returns WAL (Write-Ahead Log) status.
1005    ///
1006    /// Returns None if WAL is not enabled.
1007    #[must_use]
1008    pub fn wal_status(&self) -> crate::admin::WalStatus {
1009        if let Some(ref wal) = self.wal {
1010            crate::admin::WalStatus {
1011                enabled: true,
1012                path: self.config.path.as_ref().map(|p| p.join("wal")),
1013                size_bytes: wal.size_bytes(),
1014                record_count: wal.record_count() as usize,
1015                last_checkpoint: wal.last_checkpoint_timestamp(),
1016                current_epoch: self.store.current_epoch().as_u64(),
1017            }
1018        } else {
1019            crate::admin::WalStatus {
1020                enabled: false,
1021                path: None,
1022                size_bytes: 0,
1023                record_count: 0,
1024                last_checkpoint: None,
1025                current_epoch: self.store.current_epoch().as_u64(),
1026            }
1027        }
1028    }
1029
1030    /// Forces a WAL checkpoint.
1031    ///
1032    /// Flushes all pending WAL records to the main storage.
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the checkpoint fails.
1037    pub fn wal_checkpoint(&self) -> Result<()> {
1038        if let Some(ref wal) = self.wal {
1039            let epoch = self.store.current_epoch();
1040            let tx_id = self
1041                .tx_manager
1042                .last_assigned_tx_id()
1043                .unwrap_or_else(|| self.tx_manager.begin());
1044            wal.checkpoint(tx_id, epoch)?;
1045            wal.sync()?;
1046        }
1047        Ok(())
1048    }
1049
1050    // =========================================================================
1051    // ADMIN API: Persistence Control
1052    // =========================================================================
1053
1054    /// Saves the database to a file path.
1055    ///
1056    /// - If in-memory: creates a new persistent database at path
1057    /// - If file-backed: creates a copy at the new path
1058    ///
1059    /// The original database remains unchanged.
1060    ///
1061    /// # Errors
1062    ///
1063    /// Returns an error if the save operation fails.
1064    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
1065        let path = path.as_ref();
1066
1067        // Create target database with WAL enabled
1068        let target_config = Config::persistent(path);
1069        let target = Self::with_config(target_config)?;
1070
1071        // Copy all nodes using WAL-enabled methods
1072        for node in self.store.all_nodes() {
1073            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1074            target.store.create_node_with_id(node.id, &label_refs);
1075
1076            // Log to WAL
1077            target.log_wal(&WalRecord::CreateNode {
1078                id: node.id,
1079                labels: node.labels.iter().map(|s| s.to_string()).collect(),
1080            })?;
1081
1082            // Copy properties
1083            for (key, value) in node.properties {
1084                target
1085                    .store
1086                    .set_node_property(node.id, key.as_str(), value.clone());
1087                target.log_wal(&WalRecord::SetNodeProperty {
1088                    id: node.id,
1089                    key: key.to_string(),
1090                    value,
1091                })?;
1092            }
1093        }
1094
1095        // Copy all edges using WAL-enabled methods
1096        for edge in self.store.all_edges() {
1097            target
1098                .store
1099                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1100
1101            // Log to WAL
1102            target.log_wal(&WalRecord::CreateEdge {
1103                id: edge.id,
1104                src: edge.src,
1105                dst: edge.dst,
1106                edge_type: edge.edge_type.to_string(),
1107            })?;
1108
1109            // Copy properties
1110            for (key, value) in edge.properties {
1111                target
1112                    .store
1113                    .set_edge_property(edge.id, key.as_str(), value.clone());
1114                target.log_wal(&WalRecord::SetEdgeProperty {
1115                    id: edge.id,
1116                    key: key.to_string(),
1117                    value,
1118                })?;
1119            }
1120        }
1121
1122        // Checkpoint and close the target database
1123        target.close()?;
1124
1125        Ok(())
1126    }
1127
1128    /// Creates an in-memory copy of this database.
1129    ///
1130    /// Returns a new database that is completely independent.
1131    /// Useful for:
1132    /// - Testing modifications without affecting the original
1133    /// - Faster operations when persistence isn't needed
1134    ///
1135    /// # Errors
1136    ///
1137    /// Returns an error if the copy operation fails.
1138    pub fn to_memory(&self) -> Result<Self> {
1139        let config = Config::in_memory();
1140        let target = Self::with_config(config)?;
1141
1142        // Copy all nodes
1143        for node in self.store.all_nodes() {
1144            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
1145            target.store.create_node_with_id(node.id, &label_refs);
1146
1147            // Copy properties
1148            for (key, value) in node.properties {
1149                target.store.set_node_property(node.id, key.as_str(), value);
1150            }
1151        }
1152
1153        // Copy all edges
1154        for edge in self.store.all_edges() {
1155            target
1156                .store
1157                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
1158
1159            // Copy properties
1160            for (key, value) in edge.properties {
1161                target.store.set_edge_property(edge.id, key.as_str(), value);
1162            }
1163        }
1164
1165        Ok(target)
1166    }
1167
1168    /// Opens a database file and loads it entirely into memory.
1169    ///
1170    /// The returned database has no connection to the original file.
1171    /// Changes will NOT be written back to the file.
1172    ///
1173    /// # Errors
1174    ///
1175    /// Returns an error if the file can't be opened or loaded.
1176    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
1177        // Open the source database (triggers WAL recovery)
1178        let source = Self::open(path)?;
1179
1180        // Create in-memory copy
1181        let target = source.to_memory()?;
1182
1183        // Close the source (releases file handles)
1184        source.close()?;
1185
1186        Ok(target)
1187    }
1188
1189    // =========================================================================
1190    // ADMIN API: Iteration
1191    // =========================================================================
1192
1193    /// Returns an iterator over all nodes in the database.
1194    ///
1195    /// Useful for dump/export operations.
1196    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1197        self.store.all_nodes()
1198    }
1199
1200    /// Returns an iterator over all edges in the database.
1201    ///
1202    /// Useful for dump/export operations.
1203    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1204        self.store.all_edges()
1205    }
1206}
1207
1208impl Drop for GrafeoDB {
1209    fn drop(&mut self) {
1210        if let Err(e) = self.close() {
1211            tracing::error!("Error closing database: {}", e);
1212        }
1213    }
1214}
1215
1216/// The result of running a query.
1217///
1218/// Contains rows and columns, like a table. Use [`iter()`](Self::iter) to
1219/// loop through rows, or [`scalar()`](Self::scalar) if you expect a single value.
1220///
1221/// # Examples
1222///
1223/// ```
1224/// use grafeo_engine::GrafeoDB;
1225///
1226/// let db = GrafeoDB::new_in_memory();
1227/// db.create_node(&["Person"]);
1228///
1229/// let result = db.execute("MATCH (p:Person) RETURN count(p) AS total")?;
1230///
1231/// // Check what we got
1232/// println!("Columns: {:?}", result.columns);
1233/// println!("Rows: {}", result.row_count());
1234///
1235/// // Iterate through results
1236/// for row in result.iter() {
1237///     println!("{:?}", row);
1238/// }
1239/// # Ok::<(), grafeo_common::utils::error::Error>(())
1240/// ```
1241#[derive(Debug)]
1242pub struct QueryResult {
1243    /// Column names from the RETURN clause.
1244    pub columns: Vec<String>,
1245    /// Column types - useful for distinguishing NodeId/EdgeId from plain integers.
1246    pub column_types: Vec<grafeo_common::types::LogicalType>,
1247    /// The actual result rows.
1248    pub rows: Vec<Vec<grafeo_common::types::Value>>,
1249}
1250
1251impl QueryResult {
1252    /// Creates a new empty query result.
1253    #[must_use]
1254    pub fn new(columns: Vec<String>) -> Self {
1255        let len = columns.len();
1256        Self {
1257            columns,
1258            column_types: vec![grafeo_common::types::LogicalType::Any; len],
1259            rows: Vec::new(),
1260        }
1261    }
1262
1263    /// Creates a new empty query result with column types.
1264    #[must_use]
1265    pub fn with_types(
1266        columns: Vec<String>,
1267        column_types: Vec<grafeo_common::types::LogicalType>,
1268    ) -> Self {
1269        Self {
1270            columns,
1271            column_types,
1272            rows: Vec::new(),
1273        }
1274    }
1275
1276    /// Returns the number of rows.
1277    #[must_use]
1278    pub fn row_count(&self) -> usize {
1279        self.rows.len()
1280    }
1281
1282    /// Returns the number of columns.
1283    #[must_use]
1284    pub fn column_count(&self) -> usize {
1285        self.columns.len()
1286    }
1287
1288    /// Returns true if the result is empty.
1289    #[must_use]
1290    pub fn is_empty(&self) -> bool {
1291        self.rows.is_empty()
1292    }
1293
1294    /// Extracts a single value from the result.
1295    ///
1296    /// Use this when your query returns exactly one row with one column,
1297    /// like `RETURN count(n)` or `RETURN sum(p.amount)`.
1298    ///
1299    /// # Errors
1300    ///
1301    /// Returns an error if the result has multiple rows or columns.
1302    pub fn scalar<T: FromValue>(&self) -> Result<T> {
1303        if self.rows.len() != 1 || self.columns.len() != 1 {
1304            return Err(grafeo_common::utils::error::Error::InvalidValue(
1305                "Expected single value".to_string(),
1306            ));
1307        }
1308        T::from_value(&self.rows[0][0])
1309    }
1310
1311    /// Returns an iterator over the rows.
1312    pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
1313        self.rows.iter()
1314    }
1315}
1316
1317/// Converts a [`Value`](grafeo_common::types::Value) to a concrete Rust type.
1318///
1319/// Implemented for common types like `i64`, `f64`, `String`, and `bool`.
1320/// Used by [`QueryResult::scalar()`] to extract typed values.
1321pub trait FromValue: Sized {
1322    /// Attempts the conversion, returning an error on type mismatch.
1323    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
1324}
1325
1326impl FromValue for i64 {
1327    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1328        value
1329            .as_int64()
1330            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1331                expected: "INT64".to_string(),
1332                found: value.type_name().to_string(),
1333            })
1334    }
1335}
1336
1337impl FromValue for f64 {
1338    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1339        value
1340            .as_float64()
1341            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1342                expected: "FLOAT64".to_string(),
1343                found: value.type_name().to_string(),
1344            })
1345    }
1346}
1347
1348impl FromValue for String {
1349    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1350        value.as_str().map(String::from).ok_or_else(|| {
1351            grafeo_common::utils::error::Error::TypeMismatch {
1352                expected: "STRING".to_string(),
1353                found: value.type_name().to_string(),
1354            }
1355        })
1356    }
1357}
1358
1359impl FromValue for bool {
1360    fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
1361        value
1362            .as_bool()
1363            .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
1364                expected: "BOOL".to_string(),
1365                found: value.type_name().to_string(),
1366            })
1367    }
1368}
1369
#[cfg(test)]
mod tests {
    use super::*;

    /// A brand-new in-memory database holds no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let empty_db = GrafeoDB::new_in_memory();

        assert_eq!(empty_db.node_count(), 0);
        assert_eq!(empty_db.edge_count(), 0);
    }

    /// Settings passed through `Config` are visible on the opened database.
    #[test]
    fn test_database_config() {
        let db = GrafeoDB::with_config(Config::in_memory().with_threads(4).with_query_logging())
            .unwrap();

        let applied = db.config();
        assert_eq!(applied.threads, 4);
        assert!(applied.query_logging);
    }

    /// Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    /// Data written to a persistent database is still there after reopening it.
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::{NodeId, Value};
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("test_db");

        // First session: write two people and one relationship.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Close explicitly so the WAL is flushed before reopening.
            db.close().unwrap();
        }

        // Second session: everything must have been recovered.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            let first = db.get_node(NodeId::new(0));
            assert!(first.is_some());

            let second = db.get_node(NodeId::new(1));
            assert!(second.is_some());
        }
    }

    /// Mutations on a persistent database leave records in the WAL.
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let db_path = scratch.path().join("wal_test_db");
        let db = GrafeoDB::open(&db_path).unwrap();

        // Produce a couple of WAL-worthy operations.
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // Only checked when a WAL is attached.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }
}