Skip to main content

graphos_engine/
database.rs

1//! GraphosDB main database struct.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use parking_lot::RwLock;
7
8use graphos_adapters::storage::wal::{WalConfig, WalManager, WalRecord, WalRecovery};
9use graphos_common::memory::buffer::{BufferManager, BufferManagerConfig};
10use graphos_common::utils::error::Result;
11use graphos_core::graph::lpg::LpgStore;
12#[cfg(feature = "rdf")]
13use graphos_core::graph::rdf::RdfStore;
14
15use crate::config::Config;
16use crate::session::Session;
17use crate::transaction::TransactionManager;
18
/// The main Graphos database.
///
/// Bundles the graph store, the transaction manager, the buffer manager and,
/// when durability is enabled, the write-ahead log behind a single handle.
pub struct GraphosDB {
    /// Database configuration this instance was created with.
    config: Config,
    /// The underlying labeled property graph (LPG) store.
    store: Arc<LpgStore>,
    /// RDF triple store (only compiled in when the `rdf` feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager, shared with every session created from this database.
    tx_manager: Arc<TransactionManager>,
    /// Unified buffer manager.
    buffer_manager: Arc<BufferManager>,
    /// Write-ahead log manager; `None` when WAL is disabled or no path is configured.
    wal: Option<Arc<WalManager>>,
    /// Whether the database is open; set to `false` by a successful `close`.
    is_open: RwLock<bool>,
}
37
38impl GraphosDB {
39    /// Creates a new in-memory database.
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// use graphos_engine::GraphosDB;
45    ///
46    /// let db = GraphosDB::new_in_memory();
47    /// let session = db.session();
48    /// ```
49    #[must_use]
50    pub fn new_in_memory() -> Self {
51        Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
52    }
53
54    /// Opens or creates a database at the given path.
55    ///
56    /// If the database exists, it will be recovered from the WAL.
57    /// If the database does not exist, a new one will be created.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the database cannot be opened or created.
62    ///
63    /// # Examples
64    ///
65    /// ```no_run
66    /// use graphos_engine::GraphosDB;
67    ///
68    /// let db = GraphosDB::open("./my_database").expect("Failed to open database");
69    /// ```
70    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
71        Self::with_config(Config::persistent(path.as_ref()))
72    }
73
74    /// Creates a database with the given configuration.
75    ///
76    /// If WAL is enabled and a database exists at the configured path,
77    /// the database will be recovered from the WAL.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if the database cannot be created or recovery fails.
82    ///
83    /// # Examples
84    ///
85    /// ```
86    /// use graphos_engine::{GraphosDB, Config};
87    ///
88    /// let config = Config::in_memory()
89    ///     .with_memory_limit(512 * 1024 * 1024); // 512MB
90    ///
91    /// let db = GraphosDB::with_config(config).unwrap();
92    /// ```
93    pub fn with_config(config: Config) -> Result<Self> {
94        let store = Arc::new(LpgStore::new());
95        #[cfg(feature = "rdf")]
96        let rdf_store = Arc::new(RdfStore::new());
97        let tx_manager = Arc::new(TransactionManager::new());
98
99        // Create buffer manager with configured limits
100        let buffer_config = BufferManagerConfig {
101            budget: config.memory_limit.unwrap_or_else(|| {
102                (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
103            }),
104            spill_path: config
105                .spill_path
106                .clone()
107                .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
108            ..BufferManagerConfig::default()
109        };
110        let buffer_manager = BufferManager::new(buffer_config);
111
112        // Initialize WAL if persistence is enabled
113        let wal = if config.wal_enabled {
114            if let Some(ref db_path) = config.path {
115                // Create database directory if it doesn't exist
116                std::fs::create_dir_all(db_path)?;
117
118                let wal_path = db_path.join("wal");
119
120                // Check if WAL exists and recover if needed
121                if wal_path.exists() {
122                    let recovery = WalRecovery::new(&wal_path);
123                    let records = recovery.recover()?;
124                    Self::apply_wal_records(&store, &records)?;
125                }
126
127                // Open/create WAL manager
128                let wal_config = WalConfig::default();
129                let wal_manager = WalManager::with_config(&wal_path, wal_config)?;
130                Some(Arc::new(wal_manager))
131            } else {
132                None
133            }
134        } else {
135            None
136        };
137
138        Ok(Self {
139            config,
140            store,
141            #[cfg(feature = "rdf")]
142            rdf_store,
143            tx_manager,
144            buffer_manager,
145            wal,
146            is_open: RwLock::new(true),
147        })
148    }
149
150    /// Applies WAL records to restore the database state.
151    fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
152        for record in records {
153            match record {
154                WalRecord::CreateNode { id, labels } => {
155                    let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
156                    store.create_node_with_id(*id, &label_refs);
157                }
158                WalRecord::DeleteNode { id } => {
159                    store.delete_node(*id);
160                }
161                WalRecord::CreateEdge {
162                    id,
163                    src,
164                    dst,
165                    edge_type,
166                } => {
167                    store.create_edge_with_id(*id, *src, *dst, edge_type);
168                }
169                WalRecord::DeleteEdge { id } => {
170                    store.delete_edge(*id);
171                }
172                WalRecord::SetNodeProperty { id, key, value } => {
173                    store.set_node_property(*id, key, value.clone());
174                }
175                WalRecord::SetEdgeProperty { id, key, value } => {
176                    store.set_edge_property(*id, key, value.clone());
177                }
178                WalRecord::TxCommit { .. }
179                | WalRecord::TxAbort { .. }
180                | WalRecord::Checkpoint { .. } => {
181                    // Transaction control records don't need replay action
182                    // (recovery already filtered to only committed transactions)
183                }
184            }
185        }
186        Ok(())
187    }
188
189    /// Creates a new session for interacting with the database.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use graphos_engine::GraphosDB;
195    ///
196    /// let db = GraphosDB::new_in_memory();
197    /// let session = db.session();
198    /// // Use session for queries and transactions
199    /// ```
200    #[must_use]
201    pub fn session(&self) -> Session {
202        Session::new(Arc::clone(&self.store), Arc::clone(&self.tx_manager))
203    }
204
205    /// Executes a query and returns the result.
206    ///
207    /// This is a convenience method that creates a session, executes the query,
208    /// and returns the result.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the query fails.
213    pub fn execute(&self, query: &str) -> Result<QueryResult> {
214        let session = self.session();
215        session.execute(query)
216    }
217
218    /// Executes a query with parameters and returns the result.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if the query fails.
223    pub fn execute_with_params(
224        &self,
225        query: &str,
226        params: std::collections::HashMap<String, graphos_common::types::Value>,
227    ) -> Result<QueryResult> {
228        let session = self.session();
229        session.execute_with_params(query, params)
230    }
231
232    /// Executes a Gremlin query and returns the result.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the query fails.
237    #[cfg(feature = "gremlin")]
238    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
239        let session = self.session();
240        session.execute_gremlin(query)
241    }
242
243    /// Executes a Gremlin query with parameters and returns the result.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the query fails.
248    #[cfg(feature = "gremlin")]
249    pub fn execute_gremlin_with_params(
250        &self,
251        query: &str,
252        params: std::collections::HashMap<String, graphos_common::types::Value>,
253    ) -> Result<QueryResult> {
254        let session = self.session();
255        session.execute_gremlin_with_params(query, params)
256    }
257
258    /// Executes a GraphQL query and returns the result.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the query fails.
263    #[cfg(feature = "graphql")]
264    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
265        let session = self.session();
266        session.execute_graphql(query)
267    }
268
269    /// Executes a GraphQL query with parameters and returns the result.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the query fails.
274    #[cfg(feature = "graphql")]
275    pub fn execute_graphql_with_params(
276        &self,
277        query: &str,
278        params: std::collections::HashMap<String, graphos_common::types::Value>,
279    ) -> Result<QueryResult> {
280        let session = self.session();
281        session.execute_graphql_with_params(query, params)
282    }
283
284    /// Executes a SPARQL query and returns the result.
285    ///
286    /// SPARQL queries operate on the RDF triple store.
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the query fails.
291    ///
292    /// # Examples
293    ///
294    /// ```ignore
295    /// use graphos_engine::GraphosDB;
296    ///
297    /// let db = GraphosDB::new_in_memory();
298    /// let result = db.execute_sparql("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")?;
299    /// ```
300    #[cfg(all(feature = "sparql", feature = "rdf"))]
301    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
302        use crate::query::{
303            Executor, optimizer::Optimizer, planner_rdf::RdfPlanner, sparql_translator,
304        };
305
306        // Parse and translate the SPARQL query to a logical plan
307        let logical_plan = sparql_translator::translate(query)?;
308
309        // Optimize the plan
310        let optimizer = Optimizer::new();
311        let optimized_plan = optimizer.optimize(logical_plan)?;
312
313        // Convert to physical plan using RDF planner
314        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store));
315        let mut physical_plan = planner.plan(&optimized_plan)?;
316
317        // Execute the plan
318        let executor = Executor::with_columns(physical_plan.columns.clone());
319        executor.execute(physical_plan.operator.as_mut())
320    }
321
    /// Returns the RDF store.
    ///
    /// This provides direct access to the RDF store for triple operations.
    /// Only available with the `rdf` feature. The returned reference points at
    /// the shared `Arc`, so callers can clone it to keep their own handle.
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> &Arc<RdfStore> {
        &self.rdf_store
    }
330
331    /// Executes a query and returns a single scalar value.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the query fails or doesn't return exactly one row.
336    pub fn query_scalar<T: FromValue>(&self, query: &str) -> Result<T> {
337        let result = self.execute(query)?;
338        result.scalar()
339    }
340
    /// Returns the configuration this database was created with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
346
    /// Returns the underlying store.
    ///
    /// This provides direct access to the LPG store for algorithm implementations.
    /// Mutations made directly on the store do not go through this type's
    /// WAL-logging wrappers (e.g. [`GraphosDB::create_node`]).
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
354
    /// Returns the buffer manager for memory-aware operations.
    ///
    /// The manager is the one configured in [`GraphosDB::with_config`] from the
    /// database's memory limit and spill path.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
360
361    /// Closes the database.
362    ///
363    /// This will:
364    /// - Commit any pending WAL records
365    /// - Create a checkpoint
366    /// - Sync the WAL to disk
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the WAL cannot be flushed.
371    pub fn close(&self) -> Result<()> {
372        let mut is_open = self.is_open.write();
373        if !*is_open {
374            return Ok(());
375        }
376
377        // Commit and checkpoint WAL
378        if let Some(ref wal) = self.wal {
379            let epoch = self.store.current_epoch();
380
381            // Use the last assigned transaction ID, or create a checkpoint-only tx
382            let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
383                // No transactions have been started; begin one for checkpoint
384                self.tx_manager.begin()
385            });
386
387            // Log a TxCommit to mark all pending records as committed
388            wal.log(&WalRecord::TxCommit {
389                tx_id: checkpoint_tx,
390            })?;
391
392            // Then checkpoint
393            wal.checkpoint(checkpoint_tx, epoch)?;
394            wal.sync()?;
395        }
396
397        *is_open = false;
398        Ok(())
399    }
400
    /// Returns the WAL manager if available.
    ///
    /// `None` means no WAL was opened for this database (WAL disabled or no
    /// database path configured).
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<WalManager>> {
        self.wal.as_ref()
    }
406
407    /// Logs a WAL record if WAL is enabled.
408    fn log_wal(&self, record: &WalRecord) -> Result<()> {
409        if let Some(ref wal) = self.wal {
410            wal.log(record)?;
411        }
412        Ok(())
413    }
414
    /// Returns the number of nodes in the database, as reported by the LPG store.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.store.node_count()
    }
420
    /// Returns the number of edges in the database, as reported by the LPG store.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.store.edge_count()
    }
426
    /// Returns the number of distinct labels in the database, as reported by the LPG store.
    #[must_use]
    pub fn label_count(&self) -> usize {
        self.store.label_count()
    }
432
    /// Returns the number of distinct property keys in the database, as
    /// reported by the LPG store.
    #[must_use]
    pub fn property_key_count(&self) -> usize {
        self.store.property_key_count()
    }
438
    /// Returns the number of distinct edge types in the database, as reported
    /// by the LPG store.
    #[must_use]
    pub fn edge_type_count(&self) -> usize {
        self.store.edge_type_count()
    }
444
445    // === Node Operations ===
446
447    /// Creates a new node with the given labels.
448    ///
449    /// If WAL is enabled, the operation is logged for durability.
450    pub fn create_node(&self, labels: &[&str]) -> graphos_common::types::NodeId {
451        let id = self.store.create_node(labels);
452
453        // Log to WAL if enabled
454        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
455            id,
456            labels: labels.iter().map(|s| s.to_string()).collect(),
457        }) {
458            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
459        }
460
461        id
462    }
463
464    /// Creates a new node with labels and properties.
465    ///
466    /// If WAL is enabled, the operation is logged for durability.
467    pub fn create_node_with_props(
468        &self,
469        labels: &[&str],
470        properties: impl IntoIterator<
471            Item = (
472                impl Into<graphos_common::types::PropertyKey>,
473                impl Into<graphos_common::types::Value>,
474            ),
475        >,
476    ) -> graphos_common::types::NodeId {
477        // Collect properties first so we can log them to WAL
478        let props: Vec<(
479            graphos_common::types::PropertyKey,
480            graphos_common::types::Value,
481        )> = properties
482            .into_iter()
483            .map(|(k, v)| (k.into(), v.into()))
484            .collect();
485
486        let id = self
487            .store
488            .create_node_with_props(labels, props.iter().map(|(k, v)| (k.clone(), v.clone())));
489
490        // Log node creation to WAL
491        if let Err(e) = self.log_wal(&WalRecord::CreateNode {
492            id,
493            labels: labels.iter().map(|s| s.to_string()).collect(),
494        }) {
495            tracing::warn!("Failed to log CreateNode to WAL: {}", e);
496        }
497
498        // Log each property to WAL for full durability
499        for (key, value) in props {
500            if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
501                id,
502                key: key.to_string(),
503                value,
504            }) {
505                tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
506            }
507        }
508
509        id
510    }
511
    /// Gets a node by ID.
    ///
    /// Returns whatever the LPG store's `get_node` yields for `id`; `None`
    /// presumably means no such node exists (confirm against `LpgStore`).
    #[must_use]
    pub fn get_node(
        &self,
        id: graphos_common::types::NodeId,
    ) -> Option<graphos_core::graph::lpg::Node> {
        self.store.get_node(id)
    }
520
521    /// Deletes a node and all its edges.
522    ///
523    /// If WAL is enabled, the operation is logged for durability.
524    pub fn delete_node(&self, id: graphos_common::types::NodeId) -> bool {
525        let result = self.store.delete_node(id);
526
527        if result {
528            if let Err(e) = self.log_wal(&WalRecord::DeleteNode { id }) {
529                tracing::warn!("Failed to log DeleteNode to WAL: {}", e);
530            }
531        }
532
533        result
534    }
535
536    /// Sets a property on a node.
537    ///
538    /// If WAL is enabled, the operation is logged for durability.
539    pub fn set_node_property(
540        &self,
541        id: graphos_common::types::NodeId,
542        key: &str,
543        value: graphos_common::types::Value,
544    ) {
545        // Log to WAL first
546        if let Err(e) = self.log_wal(&WalRecord::SetNodeProperty {
547            id,
548            key: key.to_string(),
549            value: value.clone(),
550        }) {
551            tracing::warn!("Failed to log SetNodeProperty to WAL: {}", e);
552        }
553
554        self.store.set_node_property(id, key, value);
555    }
556
557    // === Edge Operations ===
558
559    /// Creates a new edge between two nodes.
560    ///
561    /// If WAL is enabled, the operation is logged for durability.
562    pub fn create_edge(
563        &self,
564        src: graphos_common::types::NodeId,
565        dst: graphos_common::types::NodeId,
566        edge_type: &str,
567    ) -> graphos_common::types::EdgeId {
568        let id = self.store.create_edge(src, dst, edge_type);
569
570        // Log to WAL if enabled
571        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
572            id,
573            src,
574            dst,
575            edge_type: edge_type.to_string(),
576        }) {
577            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
578        }
579
580        id
581    }
582
583    /// Creates a new edge with properties.
584    ///
585    /// If WAL is enabled, the operation is logged for durability.
586    pub fn create_edge_with_props(
587        &self,
588        src: graphos_common::types::NodeId,
589        dst: graphos_common::types::NodeId,
590        edge_type: &str,
591        properties: impl IntoIterator<
592            Item = (
593                impl Into<graphos_common::types::PropertyKey>,
594                impl Into<graphos_common::types::Value>,
595            ),
596        >,
597    ) -> graphos_common::types::EdgeId {
598        // Collect properties first so we can log them to WAL
599        let props: Vec<(
600            graphos_common::types::PropertyKey,
601            graphos_common::types::Value,
602        )> = properties
603            .into_iter()
604            .map(|(k, v)| (k.into(), v.into()))
605            .collect();
606
607        let id = self.store.create_edge_with_props(
608            src,
609            dst,
610            edge_type,
611            props.iter().map(|(k, v)| (k.clone(), v.clone())),
612        );
613
614        // Log edge creation to WAL
615        if let Err(e) = self.log_wal(&WalRecord::CreateEdge {
616            id,
617            src,
618            dst,
619            edge_type: edge_type.to_string(),
620        }) {
621            tracing::warn!("Failed to log CreateEdge to WAL: {}", e);
622        }
623
624        // Log each property to WAL for full durability
625        for (key, value) in props {
626            if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
627                id,
628                key: key.to_string(),
629                value,
630            }) {
631                tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
632            }
633        }
634
635        id
636    }
637
    /// Gets an edge by ID.
    ///
    /// Returns whatever the LPG store's `get_edge` yields for `id`; `None`
    /// presumably means no such edge exists (confirm against `LpgStore`).
    #[must_use]
    pub fn get_edge(
        &self,
        id: graphos_common::types::EdgeId,
    ) -> Option<graphos_core::graph::lpg::Edge> {
        self.store.get_edge(id)
    }
646
647    /// Deletes an edge.
648    ///
649    /// If WAL is enabled, the operation is logged for durability.
650    pub fn delete_edge(&self, id: graphos_common::types::EdgeId) -> bool {
651        let result = self.store.delete_edge(id);
652
653        if result {
654            if let Err(e) = self.log_wal(&WalRecord::DeleteEdge { id }) {
655                tracing::warn!("Failed to log DeleteEdge to WAL: {}", e);
656            }
657        }
658
659        result
660    }
661
662    /// Sets a property on an edge.
663    ///
664    /// If WAL is enabled, the operation is logged for durability.
665    pub fn set_edge_property(
666        &self,
667        id: graphos_common::types::EdgeId,
668        key: &str,
669        value: graphos_common::types::Value,
670    ) {
671        // Log to WAL first
672        if let Err(e) = self.log_wal(&WalRecord::SetEdgeProperty {
673            id,
674            key: key.to_string(),
675            value: value.clone(),
676        }) {
677            tracing::warn!("Failed to log SetEdgeProperty to WAL: {}", e);
678        }
679        self.store.set_edge_property(id, key, value);
680    }
681
    /// Removes a property from a node.
    ///
    /// Returns true if the property existed and was removed, false otherwise.
    ///
    /// Unlike the other mutating methods, this one writes nothing to the WAL,
    /// so the removal only takes effect in memory and is not replayed when the
    /// database is recovered.
    pub fn remove_node_property(&self, id: graphos_common::types::NodeId, key: &str) -> bool {
        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
        self.store.remove_node_property(id, key).is_some()
    }
689
    /// Removes a property from an edge.
    ///
    /// Returns true if the property existed and was removed, false otherwise.
    ///
    /// Unlike the other mutating methods, this one writes nothing to the WAL,
    /// so the removal only takes effect in memory and is not replayed when the
    /// database is recovered.
    pub fn remove_edge_property(&self, id: graphos_common::types::EdgeId, key: &str) -> bool {
        // Note: RemoveProperty WAL records not yet implemented, but operation works in memory
        self.store.remove_edge_property(id, key).is_some()
    }
697}
698
699impl Drop for GraphosDB {
700    fn drop(&mut self) {
701        if let Err(e) = self.close() {
702            tracing::error!("Error closing database: {}", e);
703        }
704    }
705}
706
707/// Result of a query execution.
708#[derive(Debug)]
709pub struct QueryResult {
710    /// Column names.
711    pub columns: Vec<String>,
712    /// Column types (used for distinguishing Node/Edge IDs from regular integers).
713    pub column_types: Vec<graphos_common::types::LogicalType>,
714    /// Result rows.
715    pub rows: Vec<Vec<graphos_common::types::Value>>,
716}
717
718impl QueryResult {
719    /// Creates a new empty query result.
720    #[must_use]
721    pub fn new(columns: Vec<String>) -> Self {
722        let len = columns.len();
723        Self {
724            columns,
725            column_types: vec![graphos_common::types::LogicalType::Any; len],
726            rows: Vec::new(),
727        }
728    }
729
730    /// Creates a new empty query result with column types.
731    #[must_use]
732    pub fn with_types(
733        columns: Vec<String>,
734        column_types: Vec<graphos_common::types::LogicalType>,
735    ) -> Self {
736        Self {
737            columns,
738            column_types,
739            rows: Vec::new(),
740        }
741    }
742
743    /// Returns the number of rows.
744    #[must_use]
745    pub fn row_count(&self) -> usize {
746        self.rows.len()
747    }
748
749    /// Returns the number of columns.
750    #[must_use]
751    pub fn column_count(&self) -> usize {
752        self.columns.len()
753    }
754
755    /// Returns true if the result is empty.
756    #[must_use]
757    pub fn is_empty(&self) -> bool {
758        self.rows.is_empty()
759    }
760
761    /// Gets a single scalar value from the result.
762    ///
763    /// # Errors
764    ///
765    /// Returns an error if the result doesn't have exactly one row and one column.
766    pub fn scalar<T: FromValue>(&self) -> Result<T> {
767        if self.rows.len() != 1 || self.columns.len() != 1 {
768            return Err(graphos_common::utils::error::Error::InvalidValue(
769                "Expected single value".to_string(),
770            ));
771        }
772        T::from_value(&self.rows[0][0])
773    }
774
775    /// Returns an iterator over the rows.
776    pub fn iter(&self) -> impl Iterator<Item = &Vec<graphos_common::types::Value>> {
777        self.rows.iter()
778    }
779}
780
/// Trait for converting from Value.
///
/// Implemented in this file for `i64`, `f64`, `String` and `bool`; it is the
/// bound used by `QueryResult::scalar` and `GraphosDB::query_scalar`.
pub trait FromValue: Sized {
    /// Converts from a Value.
    ///
    /// # Errors
    ///
    /// Returns an error if the conversion fails.
    fn from_value(value: &graphos_common::types::Value) -> Result<Self>;
}
790
791impl FromValue for i64 {
792    fn from_value(value: &graphos_common::types::Value) -> Result<Self> {
793        value
794            .as_int64()
795            .ok_or_else(|| graphos_common::utils::error::Error::TypeMismatch {
796                expected: "INT64".to_string(),
797                found: value.type_name().to_string(),
798            })
799    }
800}
801
802impl FromValue for f64 {
803    fn from_value(value: &graphos_common::types::Value) -> Result<Self> {
804        value
805            .as_float64()
806            .ok_or_else(|| graphos_common::utils::error::Error::TypeMismatch {
807                expected: "FLOAT64".to_string(),
808                found: value.type_name().to_string(),
809            })
810    }
811}
812
813impl FromValue for String {
814    fn from_value(value: &graphos_common::types::Value) -> Result<Self> {
815        value.as_str().map(String::from).ok_or_else(|| {
816            graphos_common::utils::error::Error::TypeMismatch {
817                expected: "STRING".to_string(),
818                found: value.type_name().to_string(),
819            }
820        })
821    }
822}
823
824impl FromValue for bool {
825    fn from_value(value: &graphos_common::types::Value) -> Result<Self> {
826        value
827            .as_bool()
828            .ok_or_else(|| graphos_common::utils::error::Error::TypeMismatch {
829                expected: "BOOL".to_string(),
830                found: value.type_name().to_string(),
831            })
832    }
833}
834
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh in-memory database starts out empty.
    #[test]
    fn test_create_in_memory_database() {
        let db = GraphosDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    /// The configuration passed to `with_config` is the one `config()` returns.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GraphosDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    /// Creating a session must not panic.
    #[test]
    fn test_database_session() {
        let db = GraphosDB::new_in_memory();
        let _session = db.session();
        // Session should be created successfully
    }

    /// Data written to a persistent database is restored from the WAL on reopen.
    #[test]
    fn test_persistent_database_recovery() {
        use graphos_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // Create database and add some data
        {
            let db = GraphosDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            // Explicitly close to flush WAL
            db.close().unwrap();
        }

        // Reopen and verify data was recovered
        {
            let db = GraphosDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // Verify nodes exist
            let node0 = db.get_node(graphos_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(graphos_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    /// Mutations on a persistent database produce WAL records.
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GraphosDB::open(&db_path).unwrap();

        // Create some data
        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // A database opened from a path must have a WAL; require it so the
        // record-count check cannot be skipped silently.
        let wal = db
            .wal()
            .expect("persistent database should have a WAL manager");
        assert!(wal.record_count() > 0);

        db.close().unwrap();
    }
}