1use std::collections::{HashMap, HashSet, BTreeMap};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use rocksdb::{DB, ColumnFamilyDescriptor, Options, WriteBatch, IteratorMode};
10use serde::{Deserialize, Serialize};
11use tracing::{info, warn, error, instrument};
12use dashmap::DashMap;
13use bincode;
14use uuid::Uuid;
15use chrono::{DateTime, Utc};
16
/// Graph store backed by a RocksDB database, with in-memory caches for
/// nodes and edges and a lock-protected schema.
pub struct GraphDB {
    /// Shared handle to the underlying RocksDB instance.
    db: Arc<DB>,
    /// Cache of nodes, keyed by node id.
    node_cache: Arc<DashMap<String, Node>>,
    /// Cache of edges, keyed by edge id.
    edge_cache: Arc<DashMap<String, Edge>>,
    /// Label/property schema, guarded by an async read/write lock.
    schema: Arc<RwLock<Schema>>,
}
28
/// A graph vertex as it is stored and cached.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Identifier of the node; caller-supplied or a generated UUID v4 string.
    pub id: String,
    /// Labels attached to the node.
    pub labels: Vec<String>,
    /// Property name to value, kept in sorted key order.
    pub properties: BTreeMap<String, PropertyValue>,
    /// Timestamp taken when the node was created.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the most recent write to the node.
    pub updated_at: DateTime<Utc>,
}
43
/// A directed, labelled connection between two nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Identifier of the edge; caller-supplied or a generated UUID v4 string.
    pub id: String,
    /// Id of the node the edge starts at.
    pub from_node: String,
    /// Id of the node the edge points to.
    pub to_node: String,
    /// Single label of the edge.
    pub label: String,
    /// Property name to value, kept in sorted key order.
    pub properties: BTreeMap<String, PropertyValue>,
    /// Timestamp taken when the edge was created.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the most recent write to the edge.
    pub updated_at: DateTime<Utc>,
}
62
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
65#[serde(untagged)]
66pub enum PropertyValue {
67 String(String),
69 Integer(i64),
71 Float(f64),
73 Boolean(bool),
75 Date(DateTime<Utc>),
77 List(Vec<PropertyValue>),
79 Map(BTreeMap<String, PropertyValue>),
81}
82
/// Declarative description of a graph query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphQuery {
    /// Node patterns to match.
    pub node_patterns: Vec<NodePattern>,
    /// Edge patterns to match.
    pub edge_patterns: Vec<EdgePattern>,
    /// Additional conditions over bound variables.
    pub conditions: Vec<QueryCondition>,
    /// What the query should return.
    pub returns: Vec<ReturnSpec>,
    /// Optional cap on the number of rows.
    pub limit: Option<usize>,
    /// Optional number of leading rows to skip.
    pub skip: Option<usize>,
}
99
/// Pattern that selects nodes by labels and property conditions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePattern {
    /// Optional variable name the matched node is bound to.
    pub variable: Option<String>,
    /// Labels a node must carry; an empty list places no label constraint.
    pub labels: Vec<String>,
    /// Conditions keyed by property name; a node lacking the property does not match.
    pub properties: BTreeMap<String, PropertyCondition>,
}
110
/// Pattern that selects edges by label, properties and endpoint variables.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgePattern {
    /// Optional variable name the matched edge is bound to.
    pub variable: Option<String>,
    /// Optional edge label to match.
    pub label: Option<String>,
    /// Conditions keyed by property name.
    pub properties: BTreeMap<String, PropertyCondition>,
    /// Optional variable naming the source node pattern.
    pub from_variable: Option<String>,
    /// Optional variable naming the target node pattern.
    pub to_variable: Option<String>,
}
125
/// A comparison of a stored property value against a reference value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropertyCondition {
    /// How the stored value is compared with `value`.
    pub operator: ComparisonOperator,
    /// Reference value on the right-hand side of the comparison.
    pub value: PropertyValue,
}
134
/// Comparison operators usable in a [`PropertyCondition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ComparisonOperator {
    Equal,
    NotEqual,
    GreaterThan,
    LessThan,
    GreaterEqual,
    LessEqual,
    Contains,
    StartsWith,
    EndsWith,
    In,
}
149
/// Boolean expression tree over property conditions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryCondition {
    /// Leaf: a condition on one property of one bound variable.
    Property {
        variable: String,
        property: String,
        condition: PropertyCondition,
    },
    /// Combination of two sub-conditions with logical AND.
    And(Box<QueryCondition>, Box<QueryCondition>),
    /// Combination of two sub-conditions with logical OR.
    Or(Box<QueryCondition>, Box<QueryCondition>),
    /// Negation of a sub-condition.
    Not(Box<QueryCondition>),
}
166
/// One item of a query's return clause.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReturnSpec {
    /// Return the node bound to the named variable.
    Node(String),
    /// Return the edge bound to the named variable.
    Edge(String),
    /// Return a single property of the entity bound to `variable`.
    Property { variable: String, property: String },
    /// Return a count.
    Count,
}
179
/// Outcome of executing a [`GraphQuery`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Column names of the result.
    pub columns: Vec<String>,
    /// Result rows.
    pub rows: Vec<ResultRow>,
    /// Counters and timing collected while executing the query.
    pub statistics: QueryStatistics,
}
190
/// A single row of a query result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultRow {
    /// Values of the row, keyed by name and kept in sorted key order.
    pub data: BTreeMap<String, PropertyValue>,
}
197
/// Execution metrics attached to a [`QueryResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStatistics {
    /// Elapsed execution time in milliseconds.
    pub execution_time_ms: u64,
    /// Node counter reported by the query executor.
    pub nodes_scanned: u64,
    /// Edge counter reported by the query executor.
    pub edges_scanned: u64,
    /// Number of rows in the result.
    pub results_returned: u64,
}
210
/// A set of pending writes against a [`GraphDB`], collected in a RocksDB
/// `WriteBatch` and applied together on commit.
pub struct GraphTransaction<'a> {
    /// Database the batch will be written to.
    db: &'a GraphDB,
    /// Writes queued so far.
    batch: WriteBatch,
    /// Ids of nodes written in this transaction.
    modified_nodes: HashSet<String>,
    /// Ids of edges written in this transaction.
    modified_edges: HashSet<String>,
}
222
/// Schema information recorded for the graph and persisted alongside the data.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Schema {
    /// Per-label schema for nodes, keyed by label.
    node_labels: HashMap<String, LabelSchema>,
    /// Per-label schema for edges, keyed by label.
    edge_labels: HashMap<String, LabelSchema>,
    /// Declared indexes, keyed by name.
    indexes: HashMap<String, IndexSchema>,
}
233
/// Properties observed for one node or edge label.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LabelSchema {
    /// The label this entry describes.
    name: String,
    /// Property schemas keyed by property name.
    properties: HashMap<String, PropertySchema>,
}
242
/// Name and value kind of a single property.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PropertySchema {
    /// Property name.
    name: String,
    /// Kind of value last recorded for the property.
    data_type: PropertyType,
}
251
/// Kind of a property value; one variant per [`PropertyValue`] variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PropertyType {
    String,
    Integer,
    Float,
    Boolean,
    Date,
    List,
    Map,
}
263
/// Description of a declared index.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct IndexSchema {
    /// Index name.
    name: String,
    /// Properties covered by the index.
    properties: Vec<String>,
    /// Kind of index.
    index_type: IndexType,
}
274
/// Kinds of index an [`IndexSchema`] can describe.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum IndexType {
    Single,
    Composite,
    FullText,
}
285
286impl GraphDB {
287 pub async fn new(path: &str) -> Result<Self, GraphError> {
289 info!("Initializing RocksDB GraphDB at: {}", path);
290
291 let mut db_opts = Options::default();
293 db_opts.create_if_missing(true);
294 db_opts.create_missing_column_families(true);
295 db_opts.set_max_background_jobs(4);
296 db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
297 db_opts.set_target_file_size_base(64 * 1024 * 1024); db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
299
300 let cf_names = vec![
302 "default",
303 "nodes",
304 "edges",
305 "indexes",
306 "schema",
307 "metadata",
308 ];
309
310 let cf_descriptors: Vec<ColumnFamilyDescriptor> = cf_names
311 .iter()
312 .map(|name| {
313 let mut cf_opts = Options::default();
314 cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
315 ColumnFamilyDescriptor::new(*name, cf_opts)
316 })
317 .collect();
318
319 let db = DB::open_cf_descriptors(&db_opts, path, cf_descriptors)
321 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
322
323 let graphdb = Self {
324 db: Arc::new(db),
325 node_cache: Arc::new(DashMap::new()),
326 edge_cache: Arc::new(DashMap::new()),
327 schema: Arc::new(RwLock::new(Schema::default())),
328 };
329
330 graphdb.load_schema().await?;
332
333 info!("RocksDB GraphDB initialized successfully");
334 Ok(graphdb)
335 }
336
337 #[instrument(skip(self))]
339 pub async fn create_node(
340 &self,
341 id: Option<String>,
342 labels: Vec<String>,
343 properties: BTreeMap<String, PropertyValue>,
344 ) -> Result<String, GraphError> {
345 let node_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
346 let now = Utc::now();
347
348 let node = Node {
349 id: node_id.clone(),
350 labels: labels.clone(),
351 properties: properties.clone(),
352 created_at: now,
353 updated_at: now,
354 };
355
356 let node_data = bincode::serialize(&node)
358 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
359
360 let cf_nodes = self.db.cf_handle("nodes")
362 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
363
364 let node_key = format!("node:{}", node_id);
365 self.db.put_cf(&cf_nodes, &node_key, node_data)
366 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
367
368 self.update_schema_for_node(&node).await?;
370
371 self.create_indexes_for_node(&node).await?;
373
374 self.node_cache.insert(node_id.clone(), node);
376
377 info!("Created node: {}", node_id);
378 Ok(node_id)
379 }
380
381 #[instrument(skip(self))]
383 pub async fn get_node(&self, node_id: &str) -> Result<Option<Node>, GraphError> {
384 if let Some(node) = self.node_cache.get(node_id) {
386 return Ok(Some(node.clone()));
387 }
388
389 let cf_nodes = self.db.cf_handle("nodes")
391 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
392
393 let node_key = format!("node:{}", node_id);
394
395 if let Some(node_data) = self.db.get_cf(&cf_nodes, &node_key)
396 .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
397
398 let node: Node = bincode::deserialize(&node_data)
399 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
400
401 self.node_cache.insert(node_id.to_string(), node.clone());
403
404 Ok(Some(node))
405 } else {
406 Ok(None)
407 }
408 }
409
410 #[instrument(skip(self))]
412 pub async fn update_node(
413 &self,
414 node_id: &str,
415 properties: BTreeMap<String, PropertyValue>,
416 ) -> Result<(), GraphError> {
417 let mut node = self.get_node(node_id).await?
418 .ok_or_else(|| GraphError::NodeNotFound(node_id.to_string()))?;
419
420 node.properties.extend(properties);
421 node.updated_at = Utc::now();
422
423 let node_data = bincode::serialize(&node)
425 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
426
427 let cf_nodes = self.db.cf_handle("nodes")
429 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
430
431 let node_key = format!("node:{}", node_id);
432 self.db.put_cf(&cf_nodes, &node_key, node_data)
433 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
434
435 self.update_indexes_for_node(&node).await?;
437
438 self.node_cache.insert(node_id.to_string(), node);
440
441 info!("Updated node: {}", node_id);
442 Ok(())
443 }
444
445 #[instrument(skip(self))]
447 pub async fn delete_node(&self, node_id: &str) -> Result<(), GraphError> {
448 let cf_nodes = self.db.cf_handle("nodes")
450 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
451
452 let node_key = format!("node:{}", node_id);
453 self.db.delete_cf(&cf_nodes, &node_key)
454 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
455
456 self.delete_edges_for_node(node_id).await?;
458
459 self.delete_indexes_for_node(node_id).await?;
461
462 self.node_cache.remove(node_id);
464
465 info!("Deleted node: {}", node_id);
466 Ok(())
467 }
468
469 #[instrument(skip(self))]
471 pub async fn create_edge(
472 &self,
473 id: Option<String>,
474 from_node: &str,
475 to_node: &str,
476 label: String,
477 properties: BTreeMap<String, PropertyValue>,
478 ) -> Result<String, GraphError> {
479 let edge_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
480 let now = Utc::now();
481
482 let edge = Edge {
483 id: edge_id.clone(),
484 from_node: from_node.to_string(),
485 to_node: to_node.to_string(),
486 label: label.clone(),
487 properties: properties.clone(),
488 created_at: now,
489 updated_at: now,
490 };
491
492 let edge_data = bincode::serialize(&edge)
494 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
495
496 let cf_edges = self.db.cf_handle("edges")
498 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
499
500 let edge_key = format!("edge:{}", edge_id);
501 self.db.put_cf(&cf_edges, &edge_key, edge_data)
502 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
503
504 self.store_edge_mappings(&edge).await?;
506
507 self.update_schema_for_edge(&edge).await?;
509
510 self.create_indexes_for_edge(&edge).await?;
512
513 self.edge_cache.insert(edge_id.clone(), edge);
515
516 info!("Created edge: {} ({} -> {} : {})", edge_id, from_node, to_node, label);
517 Ok(edge_id)
518 }
519
520 #[instrument(skip(self))]
522 pub async fn get_edge(&self, edge_id: &str) -> Result<Option<Edge>, GraphError> {
523 if let Some(edge) = self.edge_cache.get(edge_id) {
525 return Ok(Some(edge.clone()));
526 }
527
528 let cf_edges = self.db.cf_handle("edges")
530 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
531
532 let edge_key = format!("edge:{}", edge_id);
533
534 if let Some(edge_data) = self.db.get_cf(&cf_edges, &edge_key)
535 .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
536
537 let edge: Edge = bincode::deserialize(&edge_data)
538 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
539
540 self.edge_cache.insert(edge_id.to_string(), edge.clone());
542
543 Ok(Some(edge))
544 } else {
545 Ok(None)
546 }
547 }
548
549 #[instrument(skip(self))]
551 pub async fn get_edges_from_node(&self, node_id: &str, label: Option<&str>) -> Result<Vec<Edge>, GraphError> {
552 let cf_edges = self.db.cf_handle("edges")
553 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
554
555 let prefix = format!("outgoing:{}:", node_id);
556 let mut edges = Vec::new();
557
558 let iter = self.db.iterator_cf(&cf_edges, IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward));
559 for item in iter {
560 let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
561 if key.starts_with(prefix.as_bytes()) {
562 let edge_id = String::from_utf8(value.to_vec())
563 .map_err(|_| GraphError::InvalidData("Invalid edge ID".to_string()))?;
564
565 if let Some(edge) = self.get_edge(&edge_id).await? {
566 if label.is_none() || edge.label == label.unwrap() {
567 edges.push(edge);
568 }
569 }
570 }
571 }
572
573 Ok(edges)
574 }
575
576 #[instrument(skip(self))]
578 pub async fn get_edges_to_node(&self, node_id: &str, label: Option<&str>) -> Result<Vec<Edge>, GraphError> {
579 let cf_edges = self.db.cf_handle("edges")
580 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
581
582 let prefix = format!("incoming:{}:", node_id);
583 let mut edges = Vec::new();
584
585 let iter = self.db.iterator_cf(&cf_edges, IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward));
586 for item in iter {
587 let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
588 if key.starts_with(prefix.as_bytes()) {
589 let edge_id = String::from_utf8(value.to_vec())
590 .map_err(|_| GraphError::InvalidData("Invalid edge ID".to_string()))?;
591
592 if let Some(edge) = self.get_edge(&edge_id).await? {
593 if label.is_none() || edge.label == label.unwrap() {
594 edges.push(edge);
595 }
596 }
597 }
598 }
599
600 Ok(edges)
601 }
602
603 #[instrument(skip(self, query))]
605 pub async fn execute_query(&self, query: GraphQuery) -> Result<QueryResult, GraphError> {
606 let start_time = std::time::Instant::now();
607
608 let mut results = Vec::new();
611 let mut nodes_scanned = 0u64;
612 let mut edges_scanned = 0u64;
613
614 if let Some(node_pattern) = query.node_patterns.first() {
616 let nodes = self.query_nodes_by_pattern(node_pattern).await?;
617 nodes_scanned = nodes.len() as u64;
618
619 for node in nodes {
620 let mut row_data = BTreeMap::new();
621 if let Some(var) = &node_pattern.variable {
622 row_data.insert(var.clone(), PropertyValue::String(node.id.clone()));
623 }
624 results.push(ResultRow { data: row_data });
625 }
626 }
627
628 let execution_time = start_time.elapsed().as_millis() as u64;
629
630 let statistics = QueryStatistics {
631 execution_time_ms: execution_time,
632 nodes_scanned,
633 edges_scanned,
634 results_returned: results.len() as u64,
635 };
636
637 Ok(QueryResult {
638 columns: query.returns.iter().map(|r| format!("{:?}", r)).collect(),
639 rows: results,
640 statistics,
641 })
642 }
643
644 pub async fn begin_transaction(&self) -> GraphTransaction {
646 GraphTransaction {
647 db: self,
648 batch: WriteBatch::default(),
649 modified_nodes: HashSet::new(),
650 modified_edges: HashSet::new(),
651 }
652 }
653
654 #[instrument(skip(self))]
656 pub async fn scan_nodes(&self) -> Result<Vec<Node>, GraphError> {
657 let cf_nodes = self.db.cf_handle("nodes")
658 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
659
660 let mut nodes = Vec::new();
661 let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
662
663 for item in iter {
664 let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
665 let node_id = String::from_utf8(key.to_vec())
666 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
667 let node: Node = bincode::deserialize(&value)
668 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
669 nodes.push(node);
670 }
671
672 Ok(nodes)
673 }
674
675 #[instrument(skip(self))]
677 pub async fn scan_edges(&self) -> Result<Vec<Edge>, GraphError> {
678 let cf_edges = self.db.cf_handle("edges")
679 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
680
681 let mut edges = Vec::new();
682 let iter = self.db.iterator_cf(&cf_edges, IteratorMode::Start);
683
684 for item in iter {
685 let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
686 let edge_id = String::from_utf8(key.to_vec())
687 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
688 let edge: Edge = bincode::deserialize(&value)
689 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
690 edges.push(edge);
691 }
692
693 Ok(edges)
694 }
695
696 #[instrument(skip(self))]
698 pub async fn get_statistics(&self) -> Result<GraphStatistics, GraphError> {
699 let cf_nodes = self.db.cf_handle("nodes")
700 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
701 let cf_edges = self.db.cf_handle("edges")
702 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
703
704 let mut node_count = 0u64;
706 let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
707 for _ in iter {
708 node_count += 1;
709 }
710
711 let mut edge_count = 0u64;
713 let iter = self.db.iterator_cf(&cf_edges, IteratorMode::Start);
714 for _ in iter {
715 edge_count += 1;
716 }
717
718 Ok(GraphStatistics {
719 node_count,
720 edge_count,
721 cache_size: self.node_cache.len() + self.edge_cache.len(),
722 })
723 }
724
725 async fn load_schema(&self) -> Result<(), GraphError> {
727 let cf_schema = self.db.cf_handle("schema")
728 .ok_or_else(|| GraphError::ColumnFamilyNotFound("schema".to_string()))?;
729
730 if let Some(schema_data) = self.db.get_cf(&cf_schema, "schema")
731 .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
732
733 let schema: Schema = bincode::deserialize(&schema_data)
734 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
735
736 *self.schema.write().await = schema;
737 }
738
739 Ok(())
740 }
741
742 async fn update_schema_for_node(&self, node: &Node) -> Result<(), GraphError> {
743 let mut schema = self.schema.write().await;
744
745 for label in &node.labels {
746 let label_schema = schema.node_labels.entry(label.clone()).or_insert_with(|| LabelSchema {
747 name: label.clone(),
748 properties: HashMap::new(),
749 });
750
751 for (prop_name, prop_value) in &node.properties {
752 let prop_type = self.infer_property_type(prop_value);
753 label_schema.properties.insert(prop_name.clone(), PropertySchema {
754 name: prop_name.clone(),
755 data_type: prop_type,
756 });
757 }
758 }
759
760 self.persist_schema(&schema).await?;
762 Ok(())
763 }
764
765 async fn update_schema_for_edge(&self, edge: &Edge) -> Result<(), GraphError> {
766 let mut schema = self.schema.write().await;
767
768 let label_schema = schema.edge_labels.entry(edge.label.clone()).or_insert_with(|| LabelSchema {
769 name: edge.label.clone(),
770 properties: HashMap::new(),
771 });
772
773 for (prop_name, prop_value) in &edge.properties {
774 let prop_type = self.infer_property_type(prop_value);
775 label_schema.properties.insert(prop_name.clone(), PropertySchema {
776 name: prop_name.clone(),
777 data_type: prop_type,
778 });
779 }
780
781 self.persist_schema(&schema).await?;
783 Ok(())
784 }
785
786 fn infer_property_type(&self, value: &PropertyValue) -> PropertyType {
787 match value {
788 PropertyValue::String(_) => PropertyType::String,
789 PropertyValue::Integer(_) => PropertyType::Integer,
790 PropertyValue::Float(_) => PropertyType::Float,
791 PropertyValue::Boolean(_) => PropertyType::Boolean,
792 PropertyValue::Date(_) => PropertyType::Date,
793 PropertyValue::List(_) => PropertyType::List,
794 PropertyValue::Map(_) => PropertyType::Map,
795 }
796 }
797
798 async fn persist_schema(&self, schema: &Schema) -> Result<(), GraphError> {
799 let cf_schema = self.db.cf_handle("schema")
800 .ok_or_else(|| GraphError::ColumnFamilyNotFound("schema".to_string()))?;
801
802 let schema_data = bincode::serialize(schema)
803 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
804
805 self.db.put_cf(&cf_schema, "schema", schema_data)
806 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
807
808 Ok(())
809 }
810
811 async fn store_edge_mappings(&self, edge: &Edge) -> Result<(), GraphError> {
812 let cf_edges = self.db.cf_handle("edges")
813 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
814
815 let outgoing_key = format!("outgoing:{}:{}", edge.from_node, edge.label);
817 self.db.put_cf(&cf_edges, &outgoing_key, edge.id.as_bytes())
818 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
819
820 let incoming_key = format!("incoming:{}:{}", edge.to_node, edge.label);
822 self.db.put_cf(&cf_edges, &incoming_key, edge.id.as_bytes())
823 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
824
825 Ok(())
826 }
827
828 async fn create_indexes_for_node(&self, node: &Node) -> Result<(), GraphError> {
829 for (prop_name, prop_value) in &node.properties {
831 let index_key = format!("prop:node:{}:{}:{}", prop_name, self.property_value_to_string(prop_value), node.id);
832 self.store_index(&index_key, &node.id).await?;
833 }
834
835 for label in &node.labels {
837 let index_key = format!("label:node:{}:{}", label, node.id);
838 self.store_index(&index_key, &node.id).await?;
839 }
840
841 Ok(())
842 }
843
844 async fn create_indexes_for_edge(&self, edge: &Edge) -> Result<(), GraphError> {
845 for (prop_name, prop_value) in &edge.properties {
847 let index_key = format!("prop:edge:{}:{}:{}", prop_name, self.property_value_to_string(prop_value), edge.id);
848 self.store_index(&index_key, &edge.id).await?;
849 }
850
851 let index_key = format!("label:edge:{}:{}", edge.label, edge.id);
853 self.store_index(&index_key, &edge.id).await?;
854
855 Ok(())
856 }
857
858 async fn store_index(&self, index_key: &str, entity_id: &str) -> Result<(), GraphError> {
859 let cf_indexes = self.db.cf_handle("indexes")
860 .ok_or_else(|| GraphError::ColumnFamilyNotFound("indexes".to_string()))?;
861
862 self.db.put_cf(&cf_indexes, index_key, entity_id)
863 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
864
865 Ok(())
866 }
867
868 fn property_value_to_string(&self, value: &PropertyValue) -> String {
869 match value {
870 PropertyValue::String(s) => s.clone(),
871 PropertyValue::Integer(i) => i.to_string(),
872 PropertyValue::Float(f) => f.to_string(),
873 PropertyValue::Boolean(b) => b.to_string(),
874 PropertyValue::Date(dt) => dt.to_rfc3339(),
875 PropertyValue::List(_) => "[LIST]".to_string(),
876 PropertyValue::Map(_) => "[MAP]".to_string(),
877 }
878 }
879
880 async fn query_nodes_by_pattern(&self, pattern: &NodePattern) -> Result<Vec<Node>, GraphError> {
881 let cf_nodes = self.db.cf_handle("nodes")
884 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
885
886 let mut nodes = Vec::new();
887 let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
888
889 for item in iter {
890 let (_, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
891 let node: Node = bincode::deserialize(&value)
892 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
893
894 if self.node_matches_pattern(&node, pattern) {
896 nodes.push(node);
897 }
898 }
899
900 Ok(nodes)
901 }
902
903 fn node_matches_pattern(&self, node: &Node, pattern: &NodePattern) -> bool {
904 if !pattern.labels.is_empty() {
906 let node_labels: HashSet<String> = node.labels.iter().cloned().collect();
907 let pattern_labels: HashSet<String> = pattern.labels.iter().cloned().collect();
908 if !pattern_labels.is_subset(&node_labels) {
909 return false;
910 }
911 }
912
913 for (prop_name, condition) in &pattern.properties {
915 if let Some(node_value) = node.properties.get(prop_name) {
916 if !self.property_matches_condition(node_value, condition) {
917 return false;
918 }
919 } else {
920 return false;
921 }
922 }
923
924 true
925 }
926
927 fn property_matches_condition(&self, value: &PropertyValue, condition: &PropertyCondition) -> bool {
928 match condition.operator {
929 ComparisonOperator::Equal => value == &condition.value,
930 ComparisonOperator::NotEqual => value != &condition.value,
931 _ => false,
933 }
934 }
935
936 async fn delete_edges_for_node(&self, node_id: &str) -> Result<(), GraphError> {
937 let outgoing_edges = self.get_edges_from_node(node_id, None).await?;
939 for edge in outgoing_edges {
940 self.delete_edge(&edge.id).await?;
941 }
942
943 let incoming_edges = self.get_edges_to_node(node_id, None).await?;
945 for edge in incoming_edges {
946 self.delete_edge(&edge.id).await?;
947 }
948
949 Ok(())
950 }
951
952 async fn delete_edge(&self, edge_id: &str) -> Result<(), GraphError> {
953 let cf_edges = self.db.cf_handle("edges")
954 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
955
956 let edge_key = format!("edge:{}", edge_id);
957 self.db.delete_cf(&cf_edges, &edge_key)
958 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
959
960 self.edge_cache.remove(edge_id);
961 Ok(())
962 }
963
964 async fn update_indexes_for_node(&self, node: &Node) -> Result<(), GraphError> {
965 self.delete_indexes_for_node(&node.id).await?;
967 self.create_indexes_for_node(node).await?;
968 Ok(())
969 }
970
971 async fn delete_indexes_for_node(&self, node_id: &str) -> Result<(), GraphError> {
972 Ok(())
975 }
976}
977
978impl<'a> GraphTransaction<'a> {
980 pub async fn create_node(
982 &mut self,
983 id: Option<String>,
984 labels: Vec<String>,
985 properties: BTreeMap<String, PropertyValue>,
986 ) -> Result<String, GraphError> {
987 let node_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
988 let now = Utc::now();
989
990 let node = Node {
991 id: node_id.clone(),
992 labels,
993 properties,
994 created_at: now,
995 updated_at: now,
996 };
997
998 let node_data = bincode::serialize(&node)
999 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
1000
1001 let cf_nodes = self.db.db.cf_handle("nodes")
1002 .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
1003
1004 let node_key = format!("node:{}", node_id);
1005 self.batch.put_cf(&cf_nodes, &node_key, node_data);
1006
1007 self.modified_nodes.insert(node_id.clone());
1008
1009 Ok(node_id)
1010 }
1011
1012 pub async fn create_edge(
1014 &mut self,
1015 id: Option<String>,
1016 from_node: &str,
1017 to_node: &str,
1018 label: String,
1019 properties: BTreeMap<String, PropertyValue>,
1020 ) -> Result<String, GraphError> {
1021 let edge_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
1022 let now = Utc::now();
1023
1024 let edge = Edge {
1025 id: edge_id.clone(),
1026 from_node: from_node.to_string(),
1027 to_node: to_node.to_string(),
1028 label: label.clone(),
1029 properties,
1030 created_at: now,
1031 updated_at: now,
1032 };
1033
1034 let edge_data = bincode::serialize(&edge)
1035 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
1036
1037 let cf_edges = self.db.db.cf_handle("edges")
1038 .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
1039
1040 let edge_key = format!("edge:{}", edge_id);
1041 self.batch.put_cf(&cf_edges, &edge_key, edge_data);
1042
1043 let outgoing_key = format!("outgoing:{}:{}", from_node, label);
1045 let incoming_key = format!("incoming:{}:{}", to_node, label);
1046 self.batch.put_cf(&cf_edges, &outgoing_key, edge_id.as_bytes());
1047 self.batch.put_cf(&cf_edges, &incoming_key, edge_id.as_bytes());
1048
1049 self.modified_edges.insert(edge_id.clone());
1050
1051 Ok(edge_id)
1052 }
1053
1054 pub async fn commit(self) -> Result<(), GraphError> {
1056 self.db.db.write(self.batch)
1057 .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
1058
1059 for node_id in self.modified_nodes {
1061 if let Some(node) = self.db.get_node(&node_id).await? {
1062 self.db.node_cache.insert(node_id, node);
1063 }
1064 }
1065
1066 for edge_id in self.modified_edges {
1067 if let Some(edge) = self.db.get_edge(&edge_id).await? {
1068 self.db.edge_cache.insert(edge_id, edge);
1069 }
1070 }
1071
1072 Ok(())
1073 }
1074
1075 pub fn rollback(self) -> Result<(), GraphError> {
1077 Ok(())
1079 }
1080}
1081
/// Size figures reported by `GraphDB::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStatistics {
    /// Number of node records counted in storage.
    pub node_count: u64,
    /// Number of edge records counted in storage.
    pub edge_count: u64,
    /// Combined number of entries in the node and edge caches.
    pub cache_size: usize,
}
1089
/// Errors returned by the graph store. Each variant carries a human-readable
/// message or the identifier the error refers to.
#[derive(thiserror::Error, Debug)]
pub enum GraphError {
    /// A RocksDB operation failed; holds the stringified RocksDB error.
    #[error("RocksDB error: {0}")]
    RocksDBError(String),

    /// Encoding or decoding a stored record failed.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// The named column family is not present in the open database.
    #[error("Column family not found: {0}")]
    ColumnFamilyNotFound(String),

    /// No node exists with the given id.
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// No edge exists with the given id.
    #[error("Edge not found: {0}")]
    EdgeNotFound(String),

    /// Stored data was malformed (for example a non-UTF-8 edge id).
    #[error("Invalid data: {0}")]
    InvalidData(String),

    /// A query could not be executed.
    #[error("Query error: {0}")]
    QueryError(String),

    /// A transaction could not be completed.
    #[error("Transaction error: {0}")]
    TransactionError(String),
}
1117
1118impl Default for Schema {
1119 fn default() -> Self {
1120 Self {
1121 node_labels: HashMap::new(),
1122 edge_labels: HashMap::new(),
1123 indexes: HashMap::new(),
1124 }
1125 }
1126}
1127
// Integration-style tests; each one opens a fresh database in a temporary
// directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opening a database in an empty directory succeeds.
    #[tokio::test]
    async fn test_graphdb_creation() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await;
        assert!(graphdb.is_ok(), "GraphDB should be created successfully");
    }

    /// A node can be created, read back, and extended with a new property.
    #[tokio::test]
    async fn test_node_operations() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        // Create a node with two properties.
        let mut properties = BTreeMap::new();
        properties.insert("name".to_string(), PropertyValue::String("Alice".to_string()));
        properties.insert("age".to_string(), PropertyValue::Integer(30));

        let node_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            properties,
        ).await.unwrap();

        // Read it back. `create_node` caches the node, so this lookup does not
        // exercise deserialization from storage.
        let node = graphdb.get_node(&node_id).await.unwrap().unwrap();
        assert_eq!(node.labels, vec!["Person"]);
        assert_eq!(node.properties["name"], PropertyValue::String("Alice".to_string()));

        // Add a property through update_node.
        let mut update_props = BTreeMap::new();
        update_props.insert("city".to_string(), PropertyValue::String("Tokyo".to_string()));
        graphdb.update_node(&node_id, update_props).await.unwrap();

        // The new property is visible on the next read.
        let updated_node = graphdb.get_node(&node_id).await.unwrap().unwrap();
        assert_eq!(updated_node.properties["city"], PropertyValue::String("Tokyo".to_string()));
    }

    /// An edge can be created between two nodes, read back, and found through
    /// the outgoing-edge lookup of its source node.
    #[tokio::test]
    async fn test_edge_operations() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        // Two endpoint nodes without properties.
        let node1_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        let node2_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        // Connect them with a KNOWS edge carrying one property.
        let mut edge_props = BTreeMap::new();
        edge_props.insert("since".to_string(), PropertyValue::Integer(2020));

        let edge_id = graphdb.create_edge(
            None,
            &node1_id,
            &node2_id,
            "KNOWS".to_string(),
            edge_props,
        ).await.unwrap();

        // Direct lookup by id.
        let edge = graphdb.get_edge(&edge_id).await.unwrap().unwrap();
        assert_eq!(edge.from_node, node1_id);
        assert_eq!(edge.to_node, node2_id);
        assert_eq!(edge.label, "KNOWS");

        // Lookup through the source node, filtered by label.
        let outgoing = graphdb.get_edges_from_node(&node1_id, Some("KNOWS")).await.unwrap();
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].id, edge_id);
    }

    /// Nodes created inside a transaction are readable after commit.
    #[tokio::test]
    async fn test_transaction() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        let mut tx = graphdb.begin_transaction().await;

        // Queue two nodes in the same transaction.
        let node1_id = tx.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        let node2_id = tx.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        // Apply the batch.
        tx.commit().await.unwrap();

        // Both nodes exist afterwards.
        assert!(graphdb.get_node(&node1_id).await.unwrap().is_some());
        assert!(graphdb.get_node(&node2_id).await.unwrap().is_some());
    }
}