1mod admin;
15mod crud;
16#[cfg(feature = "embed")]
17mod embed;
18mod index;
19mod persistence;
20mod query;
21mod search;
22
23use std::path::Path;
24use std::sync::Arc;
25use std::sync::atomic::AtomicUsize;
26
27use parking_lot::RwLock;
28
29#[cfg(feature = "wal")]
30use grafeo_adapters::storage::wal::{
31 DurabilityMode as WalDurabilityMode, LpgWal, WalConfig, WalRecord, WalRecovery,
32};
33use grafeo_common::memory::buffer::{BufferManager, BufferManagerConfig};
34use grafeo_common::utils::error::Result;
35use grafeo_core::graph::GraphStoreMut;
36use grafeo_core::graph::lpg::LpgStore;
37#[cfg(feature = "rdf")]
38use grafeo_core::graph::rdf::RdfStore;
39
40use crate::config::Config;
41use crate::query::cache::QueryCache;
42use crate::session::Session;
43use crate::transaction::TransactionManager;
44
/// Top-level database handle.
///
/// Bundles the graph store(s) with the services built around them:
/// transaction management, buffer management, the query cache and, depending
/// on enabled features, the write-ahead log, CDC log and embedding models.
pub struct GrafeoDB {
    /// Configuration the database was constructed with.
    pub(super) config: Config,
    /// Built-in LPG store. When the database is created through `with_store`
    /// this is a fresh, empty placeholder and `external_store` is used instead.
    pub(super) store: Arc<LpgStore>,
    /// RDF store; only compiled in with the `rdf` feature.
    #[cfg(feature = "rdf")]
    pub(super) rdf_store: Arc<RdfStore>,
    /// Transaction manager shared with every session created by `session`.
    pub(super) tx_manager: Arc<TransactionManager>,
    /// Buffer manager built from the configured memory limit and spill path.
    pub(super) buffer_manager: Arc<BufferManager>,
    /// Write-ahead log (`wal` feature). `None` when WAL is disabled, when no
    /// database path is configured, or when an external store is used.
    #[cfg(feature = "wal")]
    pub(super) wal: Option<Arc<LpgWal>>,
    /// Query cache shared with every session.
    pub(super) query_cache: Arc<QueryCache>,
    /// Counter handed to every session together with `config.gc_interval`.
    /// NOTE(review): presumably counts commits to trigger periodic GC — confirm
    /// against `Session`.
    pub(super) commit_counter: Arc<AtomicUsize>,
    /// `true` from construction until `close` has completed successfully.
    pub(super) is_open: RwLock<bool>,
    /// Change-data-capture log (`cdc` feature); attached to sessions that run
    /// against the built-in store.
    #[cfg(feature = "cdc")]
    pub(super) cdc_log: Arc<crate::cdc::CdcLog>,
    /// Registered embedding models (`embed` feature), keyed by a `String`
    /// (presumably the model name — confirm against the `embed` module).
    #[cfg(feature = "embed")]
    pub(super) embedding_models:
        RwLock<hashbrown::HashMap<String, Arc<dyn crate::embedding::EmbeddingModel>>>,
    /// Externally supplied graph store. When `Some`, `session` and
    /// `graph_store` use it in place of `store`.
    pub(super) external_store: Option<Arc<dyn GraphStoreMut>>,
}
99
100impl GrafeoDB {
101 #[must_use]
117 pub fn new_in_memory() -> Self {
118 Self::with_config(Config::in_memory()).expect("In-memory database creation should not fail")
119 }
120
121 #[cfg(feature = "wal")]
140 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
141 Self::with_config(Config::persistent(path.as_ref()))
142 }
143
144 pub fn with_config(config: Config) -> Result<Self> {
168 config
170 .validate()
171 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
172
173 let store = Arc::new(LpgStore::new());
174 #[cfg(feature = "rdf")]
175 let rdf_store = Arc::new(RdfStore::new());
176 let tx_manager = Arc::new(TransactionManager::new());
177
178 let buffer_config = BufferManagerConfig {
180 budget: config.memory_limit.unwrap_or_else(|| {
181 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
182 }),
183 spill_path: config
184 .spill_path
185 .clone()
186 .or_else(|| config.path.as_ref().map(|p| p.join("spill"))),
187 ..BufferManagerConfig::default()
188 };
189 let buffer_manager = BufferManager::new(buffer_config);
190
191 #[cfg(feature = "wal")]
193 let wal = if config.wal_enabled {
194 if let Some(ref db_path) = config.path {
195 std::fs::create_dir_all(db_path)?;
197
198 let wal_path = db_path.join("wal");
199
200 if wal_path.exists() {
202 let recovery = WalRecovery::new(&wal_path);
203 let records = recovery.recover()?;
204 Self::apply_wal_records(&store, &records)?;
205 }
206
207 let wal_durability = match config.wal_durability {
209 crate::config::DurabilityMode::Sync => WalDurabilityMode::Sync,
210 crate::config::DurabilityMode::Batch {
211 max_delay_ms,
212 max_records,
213 } => WalDurabilityMode::Batch {
214 max_delay_ms,
215 max_records,
216 },
217 crate::config::DurabilityMode::Adaptive { target_interval_ms } => {
218 WalDurabilityMode::Adaptive { target_interval_ms }
219 }
220 crate::config::DurabilityMode::NoSync => WalDurabilityMode::NoSync,
221 };
222 let wal_config = WalConfig {
223 durability: wal_durability,
224 ..WalConfig::default()
225 };
226 let wal_manager = LpgWal::with_config(&wal_path, wal_config)?;
227 Some(Arc::new(wal_manager))
228 } else {
229 None
230 }
231 } else {
232 None
233 };
234
235 let query_cache = Arc::new(QueryCache::default());
237
238 Ok(Self {
239 config,
240 store,
241 #[cfg(feature = "rdf")]
242 rdf_store,
243 tx_manager,
244 buffer_manager,
245 #[cfg(feature = "wal")]
246 wal,
247 query_cache,
248 commit_counter: Arc::new(AtomicUsize::new(0)),
249 is_open: RwLock::new(true),
250 #[cfg(feature = "cdc")]
251 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
252 #[cfg(feature = "embed")]
253 embedding_models: RwLock::new(hashbrown::HashMap::new()),
254 external_store: None,
255 })
256 }
257
258 pub fn with_store(store: Arc<dyn GraphStoreMut>, config: Config) -> Result<Self> {
283 config
284 .validate()
285 .map_err(|e| grafeo_common::utils::error::Error::Internal(e.to_string()))?;
286
287 let dummy_store = Arc::new(LpgStore::new());
288 let tx_manager = Arc::new(TransactionManager::new());
289
290 let buffer_config = BufferManagerConfig {
291 budget: config.memory_limit.unwrap_or_else(|| {
292 (BufferManagerConfig::detect_system_memory() as f64 * 0.75) as usize
293 }),
294 spill_path: None,
295 ..BufferManagerConfig::default()
296 };
297 let buffer_manager = BufferManager::new(buffer_config);
298
299 let query_cache = Arc::new(QueryCache::default());
300
301 Ok(Self {
302 config,
303 store: dummy_store,
304 #[cfg(feature = "rdf")]
305 rdf_store: Arc::new(RdfStore::new()),
306 tx_manager,
307 buffer_manager,
308 #[cfg(feature = "wal")]
309 wal: None,
310 query_cache,
311 commit_counter: Arc::new(AtomicUsize::new(0)),
312 is_open: RwLock::new(true),
313 #[cfg(feature = "cdc")]
314 cdc_log: Arc::new(crate::cdc::CdcLog::new()),
315 #[cfg(feature = "embed")]
316 embedding_models: RwLock::new(hashbrown::HashMap::new()),
317 external_store: Some(store),
318 })
319 }
320
321 #[cfg(feature = "wal")]
323 fn apply_wal_records(store: &LpgStore, records: &[WalRecord]) -> Result<()> {
324 for record in records {
325 match record {
326 WalRecord::CreateNode { id, labels } => {
327 let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
328 store.create_node_with_id(*id, &label_refs);
329 }
330 WalRecord::DeleteNode { id } => {
331 store.delete_node(*id);
332 }
333 WalRecord::CreateEdge {
334 id,
335 src,
336 dst,
337 edge_type,
338 } => {
339 store.create_edge_with_id(*id, *src, *dst, edge_type);
340 }
341 WalRecord::DeleteEdge { id } => {
342 store.delete_edge(*id);
343 }
344 WalRecord::SetNodeProperty { id, key, value } => {
345 store.set_node_property(*id, key, value.clone());
346 }
347 WalRecord::SetEdgeProperty { id, key, value } => {
348 store.set_edge_property(*id, key, value.clone());
349 }
350 WalRecord::AddNodeLabel { id, label } => {
351 store.add_label(*id, label);
352 }
353 WalRecord::RemoveNodeLabel { id, label } => {
354 store.remove_label(*id, label);
355 }
356 WalRecord::TxCommit { .. }
357 | WalRecord::TxAbort { .. }
358 | WalRecord::Checkpoint { .. } => {
359 }
362 }
363 }
364 Ok(())
365 }
366
367 #[must_use]
390 pub fn session(&self) -> Session {
391 if let Some(ref ext_store) = self.external_store {
392 return Session::with_external_store(
393 Arc::clone(ext_store),
394 Arc::clone(&self.tx_manager),
395 Arc::clone(&self.query_cache),
396 self.config.adaptive.clone(),
397 self.config.factorized_execution,
398 self.config.graph_model,
399 self.config.query_timeout,
400 Arc::clone(&self.commit_counter),
401 self.config.gc_interval,
402 );
403 }
404
405 #[cfg(feature = "rdf")]
406 let mut session = Session::with_rdf_store_and_adaptive(
407 Arc::clone(&self.store),
408 Arc::clone(&self.rdf_store),
409 Arc::clone(&self.tx_manager),
410 Arc::clone(&self.query_cache),
411 self.config.adaptive.clone(),
412 self.config.factorized_execution,
413 self.config.graph_model,
414 self.config.query_timeout,
415 Arc::clone(&self.commit_counter),
416 self.config.gc_interval,
417 );
418 #[cfg(not(feature = "rdf"))]
419 let mut session = Session::with_adaptive(
420 Arc::clone(&self.store),
421 Arc::clone(&self.tx_manager),
422 Arc::clone(&self.query_cache),
423 self.config.adaptive.clone(),
424 self.config.factorized_execution,
425 self.config.graph_model,
426 self.config.query_timeout,
427 Arc::clone(&self.commit_counter),
428 self.config.gc_interval,
429 );
430
431 #[cfg(feature = "cdc")]
432 session.set_cdc_log(Arc::clone(&self.cdc_log));
433
434 let _ = &mut session;
436
437 session
438 }
439
    /// Returns the adaptive section of the database configuration.
    #[must_use]
    pub fn adaptive_config(&self) -> &crate::config::AdaptiveConfig {
        &self.config.adaptive
    }
445
    /// Returns the configuration this database was constructed with.
    #[must_use]
    pub fn config(&self) -> &Config {
        &self.config
    }
451
    /// Returns the graph model stored in the configuration.
    #[must_use]
    pub fn graph_model(&self) -> crate::config::GraphModel {
        self.config.graph_model
    }
457
    /// Returns the configured memory limit, or `None` if none was set.
    ///
    /// This is the raw configuration value, not the budget the buffer manager
    /// actually uses (which falls back to a share of system memory).
    #[must_use]
    pub fn memory_limit(&self) -> Option<usize> {
        self.config.memory_limit
    }
463
    /// Returns the built-in LPG store.
    ///
    /// For a database created with `with_store` this is the empty placeholder
    /// store, not the external one; use `graph_store` to get the active store.
    #[must_use]
    pub fn store(&self) -> &Arc<LpgStore> {
        &self.store
    }
475
476 #[must_use]
484 pub fn graph_store(&self) -> Arc<dyn GraphStoreMut> {
485 if let Some(ref ext_store) = self.external_store {
486 Arc::clone(ext_store)
487 } else {
488 Arc::clone(&self.store) as Arc<dyn GraphStoreMut>
489 }
490 }
491
492 pub fn gc(&self) {
498 let min_epoch = self.tx_manager.min_active_epoch();
499 self.store.gc_versions(min_epoch);
500 self.tx_manager.gc();
501 }
502
    /// Returns the buffer manager owned by this database.
    #[must_use]
    pub fn buffer_manager(&self) -> &Arc<BufferManager> {
        &self.buffer_manager
    }
508
    /// Returns the query cache shared by all sessions of this database.
    #[must_use]
    pub fn query_cache(&self) -> &Arc<QueryCache> {
        &self.query_cache
    }
514
515 pub fn close(&self) -> Result<()> {
529 let mut is_open = self.is_open.write();
530 if !*is_open {
531 return Ok(());
532 }
533
534 #[cfg(feature = "wal")]
536 if let Some(ref wal) = self.wal {
537 let epoch = self.store.current_epoch();
538
539 let checkpoint_tx = self.tx_manager.last_assigned_tx_id().unwrap_or_else(|| {
541 self.tx_manager.begin()
543 });
544
545 wal.log(&WalRecord::TxCommit {
547 tx_id: checkpoint_tx,
548 })?;
549
550 wal.checkpoint(checkpoint_tx, epoch)?;
552 wal.sync()?;
553 }
554
555 *is_open = false;
556 Ok(())
557 }
558
    /// Returns the write-ahead log, or `None` if this database has none.
    ///
    /// Only available with the `wal` feature.
    #[cfg(feature = "wal")]
    #[must_use]
    pub fn wal(&self) -> Option<&Arc<LpgWal>> {
        self.wal.as_ref()
    }
565
566 #[cfg(feature = "wal")]
568 pub(super) fn log_wal(&self, record: &WalRecord) -> Result<()> {
569 if let Some(ref wal) = self.wal {
570 wal.log(record)?;
571 }
572 Ok(())
573 }
574}
575
576impl Drop for GrafeoDB {
577 fn drop(&mut self) {
578 if let Err(e) = self.close() {
579 tracing::error!("Error closing database: {}", e);
580 }
581 }
582}
583
584impl crate::admin::AdminService for GrafeoDB {
585 fn info(&self) -> crate::admin::DatabaseInfo {
586 self.info()
587 }
588
589 fn detailed_stats(&self) -> crate::admin::DatabaseStats {
590 self.detailed_stats()
591 }
592
593 fn schema(&self) -> crate::admin::SchemaInfo {
594 self.schema()
595 }
596
597 fn validate(&self) -> crate::admin::ValidationResult {
598 self.validate()
599 }
600
601 fn wal_status(&self) -> crate::admin::WalStatus {
602 self.wal_status()
603 }
604
605 fn wal_checkpoint(&self) -> Result<()> {
606 self.wal_checkpoint()
607 }
608}
609
/// Tabular result of a query: column metadata, rows and optional metrics.
#[derive(Debug)]
pub struct QueryResult {
    /// One entry per result column (presumably the column name — confirm
    /// against the code that builds results).
    pub columns: Vec<String>,
    /// Logical type of each column. `QueryResult::new` fills this with
    /// `LogicalType::Any`, one per entry in `columns`.
    pub column_types: Vec<grafeo_common::types::LogicalType>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<grafeo_common::types::Value>>,
    /// Execution time in milliseconds; `None` until set via `with_metrics`.
    pub execution_time_ms: Option<f64>,
    /// Number of rows scanned; `None` until set via `with_metrics`.
    pub rows_scanned: Option<u64>,
}
652
653impl QueryResult {
654 #[must_use]
656 pub fn new(columns: Vec<String>) -> Self {
657 let len = columns.len();
658 Self {
659 columns,
660 column_types: vec![grafeo_common::types::LogicalType::Any; len],
661 rows: Vec::new(),
662 execution_time_ms: None,
663 rows_scanned: None,
664 }
665 }
666
667 #[must_use]
669 pub fn with_types(
670 columns: Vec<String>,
671 column_types: Vec<grafeo_common::types::LogicalType>,
672 ) -> Self {
673 Self {
674 columns,
675 column_types,
676 rows: Vec::new(),
677 execution_time_ms: None,
678 rows_scanned: None,
679 }
680 }
681
682 pub fn with_metrics(mut self, execution_time_ms: f64, rows_scanned: u64) -> Self {
684 self.execution_time_ms = Some(execution_time_ms);
685 self.rows_scanned = Some(rows_scanned);
686 self
687 }
688
689 #[must_use]
691 pub fn execution_time_ms(&self) -> Option<f64> {
692 self.execution_time_ms
693 }
694
695 #[must_use]
697 pub fn rows_scanned(&self) -> Option<u64> {
698 self.rows_scanned
699 }
700
701 #[must_use]
703 pub fn row_count(&self) -> usize {
704 self.rows.len()
705 }
706
707 #[must_use]
709 pub fn column_count(&self) -> usize {
710 self.columns.len()
711 }
712
713 #[must_use]
715 pub fn is_empty(&self) -> bool {
716 self.rows.is_empty()
717 }
718
719 pub fn scalar<T: FromValue>(&self) -> Result<T> {
728 if self.rows.len() != 1 || self.columns.len() != 1 {
729 return Err(grafeo_common::utils::error::Error::InvalidValue(
730 "Expected single value".to_string(),
731 ));
732 }
733 T::from_value(&self.rows[0][0])
734 }
735
736 pub fn iter(&self) -> impl Iterator<Item = &Vec<grafeo_common::types::Value>> {
738 self.rows.iter()
739 }
740}
741
/// Conversion from a borrowed database `Value` into a concrete Rust type.
///
/// Used by `QueryResult::scalar` to produce a typed result.
pub trait FromValue: Sized {
    /// Converts `value` into `Self`, or returns an error when the conversion
    /// is not possible.
    fn from_value(value: &grafeo_common::types::Value) -> Result<Self>;
}
750
751impl FromValue for i64 {
752 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
753 value
754 .as_int64()
755 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
756 expected: "INT64".to_string(),
757 found: value.type_name().to_string(),
758 })
759 }
760}
761
762impl FromValue for f64 {
763 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
764 value
765 .as_float64()
766 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
767 expected: "FLOAT64".to_string(),
768 found: value.type_name().to_string(),
769 })
770 }
771}
772
773impl FromValue for String {
774 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
775 value.as_str().map(String::from).ok_or_else(|| {
776 grafeo_common::utils::error::Error::TypeMismatch {
777 expected: "STRING".to_string(),
778 found: value.type_name().to_string(),
779 }
780 })
781 }
782}
783
784impl FromValue for bool {
785 fn from_value(value: &grafeo_common::types::Value) -> Result<Self> {
786 value
787 .as_bool()
788 .ok_or_else(|| grafeo_common::utils::error::Error::TypeMismatch {
789 expected: "BOOL".to_string(),
790 found: value.type_name().to_string(),
791 })
792 }
793}
794
// Unit tests for database construction, WAL persistence/recovery, query
// metrics and (with the `cdc` feature) change history.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh in-memory database contains no nodes and no edges.
    #[test]
    fn test_create_in_memory_database() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    // Settings applied to the `Config` builder are visible through `config()`.
    #[test]
    fn test_database_config() {
        let config = Config::in_memory().with_threads(4).with_query_logging();

        let db = GrafeoDB::with_config(config).unwrap();
        assert_eq!(db.config().threads, 4);
        assert!(db.config().query_logging);
    }

    // Creating a session on an in-memory database must not panic.
    #[test]
    fn test_database_session() {
        let db = GrafeoDB::new_in_memory();
        let _session = db.session();
    }

    // Data written before `close` is present again after reopening the path.
    #[cfg(feature = "wal")]
    #[test]
    fn test_persistent_database_recovery() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_db");

        // First lifetime: write two nodes and one edge, then close.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));

            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));

            let _edge = db.create_edge(alice, bob, "KNOWS");

            db.close().unwrap();
        }

        // Second lifetime: reopen and check what was recovered.
        {
            let db = GrafeoDB::open(&db_path).unwrap();

            assert_eq!(db.node_count(), 2);
            assert_eq!(db.edge_count(), 1);

            // The test expects the two nodes to be found under ids 0 and 1.
            let node0 = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node0.is_some());

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node1.is_some());
        }
    }

    // Mutations on a persistent database produce WAL records.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_logging() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();

        let node = db.create_node(&["Test"]);
        db.delete_node(node);

        // The assertion is skipped if the database has no WAL.
        if let Some(wal) = db.wal() {
            assert!(wal.record_count() > 0);
        }

        db.close().unwrap();
    }

    // Data accumulates correctly across several open/close cycles.
    #[cfg(feature = "wal")]
    #[test]
    fn test_wal_recovery_multiple_sessions() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("multi_session_db");

        // Cycle 1: create the first node.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            let alice = db.create_node(&["Person"]);
            db.set_node_property(alice, "name", Value::from("Alice"));
            db.close().unwrap();
        }

        // Cycle 2: the first node was recovered; add a second one.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 1);
            let bob = db.create_node(&["Person"]);
            db.set_node_property(bob, "name", Value::from("Bob"));
            db.close().unwrap();
        }

        // Cycle 3: both nodes are present and carry their label.
        {
            let db = GrafeoDB::open(&db_path).unwrap();
            assert_eq!(db.node_count(), 2);

            let node0 = db.get_node(grafeo_common::types::NodeId::new(0)).unwrap();
            assert!(node0.labels.iter().any(|l| l.as_str() == "Person"));

            let node1 = db.get_node(grafeo_common::types::NodeId::new(1)).unwrap();
            assert!(node1.labels.iter().any(|l| l.as_str() == "Person"));
        }
    }

    // Creates, deletes and property updates are all reflected after recovery.
    #[cfg(feature = "wal")]
    #[test]
    fn test_database_consistency_after_mutations() {
        use grafeo_common::types::Value;
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("consistency_db");

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // Three nodes; the test later looks them up as ids 0, 1 and 2.
            let a = db.create_node(&["Node"]);
            let b = db.create_node(&["Node"]);
            let c = db.create_node(&["Node"]);

            let e1 = db.create_edge(a, b, "LINKS");
            let _e2 = db.create_edge(b, c, "LINKS");

            // Remove one edge and the middle node.
            db.delete_edge(e1);
            db.delete_node(b);

            // Update the two surviving nodes.
            db.set_node_property(a, "value", Value::Int64(1));
            db.set_node_property(c, "value", Value::Int64(3));

            db.close().unwrap();
        }

        {
            let db = GrafeoDB::open(&db_path).unwrap();

            // `a` and `c` survive recovery.
            let node_a = db.get_node(grafeo_common::types::NodeId::new(0));
            assert!(node_a.is_some());

            let node_c = db.get_node(grafeo_common::types::NodeId::new(2));
            assert!(node_c.is_some());

            // `b` was deleted and must stay deleted.
            let node_b = db.get_node(grafeo_common::types::NodeId::new(1));
            assert!(node_b.is_none());
        }
    }

    // Calling `close` twice succeeds both times.
    #[cfg(feature = "wal")]
    #[test]
    fn test_close_is_idempotent() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("close_test_db");

        let db = GrafeoDB::open(&db_path).unwrap();
        db.create_node(&["Test"]);

        // First close does the real work.
        assert!(db.close().is_ok());

        // Second close is a no-op.
        assert!(db.close().is_ok());
    }

    // With the `gql` feature, query results carry execution metrics.
    #[test]
    fn test_query_result_has_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:Person) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert!(result.execution_time_ms.unwrap() >= 0.0);
            // Two `Person` nodes were created above.
            assert_eq!(result.rows_scanned.unwrap(), 2);
        }
    }

    // With the `gql` feature, a query matching nothing still reports metrics.
    #[test]
    fn test_empty_query_result_metrics() {
        let db = GrafeoDB::new_in_memory();
        db.create_node(&["Person"]);

        #[cfg(feature = "gql")]
        {
            let result = db.execute("MATCH (n:NonExistent) RETURN n").unwrap();

            assert!(result.execution_time_ms.is_some());
            assert!(result.rows_scanned.is_some());
            assert_eq!(result.rows_scanned.unwrap(), 0);
        }
    }

    // Change-data-capture tests; compiled only with the `cdc` feature.
    #[cfg(feature = "cdc")]
    mod cdc_integration {
        use super::*;

        // A node's history records its creation, each update and its deletion.
        #[test]
        fn test_node_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node(&["Person"]);
            db.set_node_property(id, "name", "Alice".into());
            db.set_node_property(id, "name", "Bob".into());
            db.delete_node(id);

            let history = db.history(id).unwrap();
            // create + two updates + delete
            assert_eq!(history.len(), 4);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            // The first set of "name" has no previous value.
            assert!(history[1].before.is_none());
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Update);
            // The second set of "name" overwrites the first.
            assert!(history[2].before.is_some());
            assert_eq!(history[3].kind, crate::cdc::ChangeKind::Delete);
        }

        // An edge's history records its creation, update and deletion.
        #[test]
        fn test_edge_lifecycle_history() {
            let db = GrafeoDB::new_in_memory();

            let alice = db.create_node(&["Person"]);
            let bob = db.create_node(&["Person"]);
            let edge = db.create_edge(alice, bob, "KNOWS");
            db.set_edge_property(edge, "since", 2024i64.into());
            db.delete_edge(edge);

            let history = db.history(edge).unwrap();
            // create + one update + delete
            assert_eq!(history.len(), 3);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            assert_eq!(history[1].kind, crate::cdc::ChangeKind::Update);
            assert_eq!(history[2].kind, crate::cdc::ChangeKind::Delete);
        }

        // Creating a node with properties yields one `Create` event that
        // carries both properties.
        #[test]
        fn test_create_node_with_props_cdc() {
            let db = GrafeoDB::new_in_memory();

            let id = db.create_node_with_props(
                &["Person"],
                vec![
                    ("name", grafeo_common::types::Value::from("Alice")),
                    ("age", grafeo_common::types::Value::from(30i64)),
                ],
            );

            let history = db.history(id).unwrap();
            assert_eq!(history.len(), 1);
            assert_eq!(history[0].kind, crate::cdc::ChangeKind::Create);
            let after = history[0].after.as_ref().unwrap();
            // Both properties passed at creation.
            assert_eq!(after.len(), 2);
        }

        // Querying the full epoch range returns every recorded change.
        #[test]
        fn test_changes_between() {
            let db = GrafeoDB::new_in_memory();

            let id1 = db.create_node(&["A"]);
            let _id2 = db.create_node(&["B"]);
            db.set_node_property(id1, "x", 1i64.into());

            let changes = db
                .changes_between(
                    grafeo_common::types::EpochId(0),
                    grafeo_common::types::EpochId(u64::MAX),
                )
                .unwrap();
            // two node creations + one property update
            assert_eq!(changes.len(), 3);
        }
    }
}