// kotoba_graphdb/lib.rs

//! `kotoba-graphdb`
//!
//! RocksDB-based graph database for KotobaDB.
//! Provides efficient storage and querying of graph data structures.

use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use bincode;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use rocksdb::{ColumnFamilyDescriptor, IteratorMode, Options, WriteBatch, DB};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{error, info, instrument, warn};
use uuid::Uuid;

17/// Main GraphDB instance
18pub struct GraphDB {
19    /// RocksDB instance
20    db: Arc<DB>,
21    /// Node cache
22    node_cache: Arc<DashMap<String, Node>>,
23    /// Edge cache
24    edge_cache: Arc<DashMap<String, Edge>>,
25    /// Schema information
26    schema: Arc<RwLock<Schema>>,
27}
28
29/// Node (Vertex) in the graph
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct Node {
32    /// Unique node ID
33    pub id: String,
34    /// Node labels
35    pub labels: Vec<String>,
36    /// Node properties
37    pub properties: BTreeMap<String, PropertyValue>,
38    /// Creation timestamp
39    pub created_at: DateTime<Utc>,
40    /// Last update timestamp
41    pub updated_at: DateTime<Utc>,
42}
43
44/// Edge (Relationship) in the graph
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct Edge {
47    /// Unique edge ID
48    pub id: String,
49    /// Source node ID
50    pub from_node: String,
51    /// Target node ID
52    pub to_node: String,
53    /// Edge label
54    pub label: String,
55    /// Edge properties
56    pub properties: BTreeMap<String, PropertyValue>,
57    /// Creation timestamp
58    pub created_at: DateTime<Utc>,
59    /// Last update timestamp
60    pub updated_at: DateTime<Utc>,
61}
62
63/// Property value types
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
65#[serde(untagged)]
66pub enum PropertyValue {
67    /// String value
68    String(String),
69    /// Integer value
70    Integer(i64),
71    /// Float value
72    Float(f64),
73    /// Boolean value
74    Boolean(bool),
75    /// Date/time value
76    Date(DateTime<Utc>),
77    /// List of values
78    List(Vec<PropertyValue>),
79    /// Map of values
80    Map(BTreeMap<String, PropertyValue>),
81}
82
83/// Graph query
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct GraphQuery {
86    /// Node patterns to match
87    pub node_patterns: Vec<NodePattern>,
88    /// Edge patterns to match
89    pub edge_patterns: Vec<EdgePattern>,
90    /// WHERE conditions
91    pub conditions: Vec<QueryCondition>,
92    /// RETURN specifications
93    pub returns: Vec<ReturnSpec>,
94    /// LIMIT clause
95    pub limit: Option<usize>,
96    /// SKIP clause
97    pub skip: Option<usize>,
98}
99
100/// Node pattern for matching
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct NodePattern {
103    /// Variable name for the node
104    pub variable: Option<String>,
105    /// Node labels to match
106    pub labels: Vec<String>,
107    /// Property conditions
108    pub properties: BTreeMap<String, PropertyCondition>,
109}
110
111/// Edge pattern for matching
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct EdgePattern {
114    /// Variable name for the edge
115    pub variable: Option<String>,
116    /// Edge label to match
117    pub label: Option<String>,
118    /// Property conditions
119    pub properties: BTreeMap<String, PropertyCondition>,
120    /// Source node variable
121    pub from_variable: Option<String>,
122    /// Target node variable
123    pub to_variable: Option<String>,
124}
125
126/// Property condition
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct PropertyCondition {
129    /// Comparison operator
130    pub operator: ComparisonOperator,
131    /// Value to compare against
132    pub value: PropertyValue,
133}
134
135/// Comparison operators
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub enum ComparisonOperator {
138    Equal,
139    NotEqual,
140    GreaterThan,
141    LessThan,
142    GreaterEqual,
143    LessEqual,
144    Contains,
145    StartsWith,
146    EndsWith,
147    In,
148}
149
150/// Query condition
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub enum QueryCondition {
153    /// Property comparison
154    Property {
155        variable: String,
156        property: String,
157        condition: PropertyCondition,
158    },
159    /// Logical AND
160    And(Box<QueryCondition>, Box<QueryCondition>),
161    /// Logical OR
162    Or(Box<QueryCondition>, Box<QueryCondition>),
163    /// Logical NOT
164    Not(Box<QueryCondition>),
165}
166
167/// Return specification
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub enum ReturnSpec {
170    /// Return node
171    Node(String),
172    /// Return edge
173    Edge(String),
174    /// Return property
175    Property { variable: String, property: String },
176    /// Return count
177    Count,
178}
179
180/// Query result
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct QueryResult {
183    /// Column names
184    pub columns: Vec<String>,
185    /// Result rows
186    pub rows: Vec<ResultRow>,
187    /// Query statistics
188    pub statistics: QueryStatistics,
189}
190
191/// Result row
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct ResultRow {
194    /// Row data
195    pub data: BTreeMap<String, PropertyValue>,
196}
197
198/// Query statistics
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct QueryStatistics {
201    /// Execution time in milliseconds
202    pub execution_time_ms: u64,
203    /// Number of nodes scanned
204    pub nodes_scanned: u64,
205    /// Number of edges scanned
206    pub edges_scanned: u64,
207    /// Number of results returned
208    pub results_returned: u64,
209}
210
211/// Graph transaction
212pub struct GraphTransaction<'a> {
213    /// GraphDB reference
214    db: &'a GraphDB,
215    /// Write batch for atomic operations
216    batch: WriteBatch,
217    /// Modified nodes
218    modified_nodes: HashSet<String>,
219    /// Modified edges
220    modified_edges: HashSet<String>,
221}
222
223/// Database schema information
224#[derive(Debug, Clone, Serialize, Deserialize)]
225struct Schema {
226    /// Node labels and their properties
227    node_labels: HashMap<String, LabelSchema>,
228    /// Edge labels and their properties
229    edge_labels: HashMap<String, LabelSchema>,
230    /// Property indexes
231    indexes: HashMap<String, IndexSchema>,
232}
233
234/// Label schema
235#[derive(Debug, Clone, Serialize, Deserialize)]
236struct LabelSchema {
237    /// Label name
238    name: String,
239    /// Property definitions
240    properties: HashMap<String, PropertySchema>,
241}
242
243/// Property schema
244#[derive(Debug, Clone, Serialize, Deserialize)]
245struct PropertySchema {
246    /// Property name
247    name: String,
248    /// Property type
249    data_type: PropertyType,
250}
251
252/// Property types
253#[derive(Debug, Clone, Serialize, Deserialize)]
254enum PropertyType {
255    String,
256    Integer,
257    Float,
258    Boolean,
259    Date,
260    List,
261    Map,
262}
263
264/// Index schema
265#[derive(Debug, Clone, Serialize, Deserialize)]
266struct IndexSchema {
267    /// Index name
268    name: String,
269    /// Indexed properties
270    properties: Vec<String>,
271    /// Index type
272    index_type: IndexType,
273}
274
275/// Index types
276#[derive(Debug, Clone, Serialize, Deserialize)]
277enum IndexType {
278    /// Single property index
279    Single,
280    /// Composite property index
281    Composite,
282    /// Full-text search index
283    FullText,
284}
285
286impl GraphDB {
287    /// Create a new GraphDB instance
288    pub async fn new(path: &str) -> Result<Self, GraphError> {
289        info!("Initializing RocksDB GraphDB at: {}", path);
290
291        // Configure RocksDB options
292        let mut db_opts = Options::default();
293        db_opts.create_if_missing(true);
294        db_opts.create_missing_column_families(true);
295        db_opts.set_max_background_jobs(4);
296        db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
297        db_opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
298        db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
299
300        // Define column families
301        let cf_names = vec![
302            "default",
303            "nodes",
304            "edges",
305            "indexes",
306            "schema",
307            "metadata",
308        ];
309
310        let cf_descriptors: Vec<ColumnFamilyDescriptor> = cf_names
311            .iter()
312            .map(|name| {
313                let mut cf_opts = Options::default();
314                cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
315                ColumnFamilyDescriptor::new(*name, cf_opts)
316            })
317            .collect();
318
319        // Open database
320        let db = DB::open_cf_descriptors(&db_opts, path, cf_descriptors)
321            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
322
323        let graphdb = Self {
324            db: Arc::new(db),
325            node_cache: Arc::new(DashMap::new()),
326            edge_cache: Arc::new(DashMap::new()),
327            schema: Arc::new(RwLock::new(Schema::default())),
328        };
329
330        // Load schema
331        graphdb.load_schema().await?;
332
333        info!("RocksDB GraphDB initialized successfully");
334        Ok(graphdb)
335    }
336
337    /// Create a new node
338    #[instrument(skip(self))]
339    pub async fn create_node(
340        &self,
341        id: Option<String>,
342        labels: Vec<String>,
343        properties: BTreeMap<String, PropertyValue>,
344    ) -> Result<String, GraphError> {
345        let node_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
346        let now = Utc::now();
347
348        let node = Node {
349            id: node_id.clone(),
350            labels: labels.clone(),
351            properties: properties.clone(),
352            created_at: now,
353            updated_at: now,
354        };
355
356        // Serialize node
357        let node_data = bincode::serialize(&node)
358            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
359
360        // Store in nodes column family
361        let cf_nodes = self.db.cf_handle("nodes")
362            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
363
364        let node_key = format!("node:{}", node_id);
365        self.db.put_cf(&cf_nodes, &node_key, node_data)
366            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
367
368        // Update schema
369        self.update_schema_for_node(&node).await?;
370
371        // Create indexes
372        self.create_indexes_for_node(&node).await?;
373
374        // Cache the node
375        self.node_cache.insert(node_id.clone(), node);
376
377        info!("Created node: {}", node_id);
378        Ok(node_id)
379    }
380
381    /// Get a node by ID
382    #[instrument(skip(self))]
383    pub async fn get_node(&self, node_id: &str) -> Result<Option<Node>, GraphError> {
384        // Check cache first
385        if let Some(node) = self.node_cache.get(node_id) {
386            return Ok(Some(node.clone()));
387        }
388
389        // Load from database
390        let cf_nodes = self.db.cf_handle("nodes")
391            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
392
393        let node_key = format!("node:{}", node_id);
394
395        if let Some(node_data) = self.db.get_cf(&cf_nodes, &node_key)
396            .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
397
398            let node: Node = bincode::deserialize(&node_data)
399                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
400
401            // Cache the node
402            self.node_cache.insert(node_id.to_string(), node.clone());
403
404            Ok(Some(node))
405        } else {
406            Ok(None)
407        }
408    }
409
410    /// Update a node
411    #[instrument(skip(self))]
412    pub async fn update_node(
413        &self,
414        node_id: &str,
415        properties: BTreeMap<String, PropertyValue>,
416    ) -> Result<(), GraphError> {
417        let mut node = self.get_node(node_id).await?
418            .ok_or_else(|| GraphError::NodeNotFound(node_id.to_string()))?;
419
420        node.properties.extend(properties);
421        node.updated_at = Utc::now();
422
423        // Serialize updated node
424        let node_data = bincode::serialize(&node)
425            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
426
427        // Store updated node
428        let cf_nodes = self.db.cf_handle("nodes")
429            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
430
431        let node_key = format!("node:{}", node_id);
432        self.db.put_cf(&cf_nodes, &node_key, node_data)
433            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
434
435        // Update indexes
436        self.update_indexes_for_node(&node).await?;
437
438        // Update cache
439        self.node_cache.insert(node_id.to_string(), node);
440
441        info!("Updated node: {}", node_id);
442        Ok(())
443    }
444
445    /// Delete a node
446    #[instrument(skip(self))]
447    pub async fn delete_node(&self, node_id: &str) -> Result<(), GraphError> {
448        // Delete from database
449        let cf_nodes = self.db.cf_handle("nodes")
450            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
451
452        let node_key = format!("node:{}", node_id);
453        self.db.delete_cf(&cf_nodes, &node_key)
454            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
455
456        // Delete related edges
457        self.delete_edges_for_node(node_id).await?;
458
459        // Delete from indexes
460        self.delete_indexes_for_node(node_id).await?;
461
462        // Remove from cache
463        self.node_cache.remove(node_id);
464
465        info!("Deleted node: {}", node_id);
466        Ok(())
467    }
468
469    /// Create an edge
470    #[instrument(skip(self))]
471    pub async fn create_edge(
472        &self,
473        id: Option<String>,
474        from_node: &str,
475        to_node: &str,
476        label: String,
477        properties: BTreeMap<String, PropertyValue>,
478    ) -> Result<String, GraphError> {
479        let edge_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
480        let now = Utc::now();
481
482        let edge = Edge {
483            id: edge_id.clone(),
484            from_node: from_node.to_string(),
485            to_node: to_node.to_string(),
486            label: label.clone(),
487            properties: properties.clone(),
488            created_at: now,
489            updated_at: now,
490        };
491
492        // Serialize edge
493        let edge_data = bincode::serialize(&edge)
494            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
495
496        // Store in edges column family
497        let cf_edges = self.db.cf_handle("edges")
498            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
499
500        let edge_key = format!("edge:{}", edge_id);
501        self.db.put_cf(&cf_edges, &edge_key, edge_data)
502            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
503
504        // Store reverse mappings for efficient traversal
505        self.store_edge_mappings(&edge).await?;
506
507        // Update schema
508        self.update_schema_for_edge(&edge).await?;
509
510        // Create indexes
511        self.create_indexes_for_edge(&edge).await?;
512
513        // Cache the edge
514        self.edge_cache.insert(edge_id.clone(), edge);
515
516        info!("Created edge: {} ({} -> {} : {})", edge_id, from_node, to_node, label);
517        Ok(edge_id)
518    }
519
520    /// Get an edge by ID
521    #[instrument(skip(self))]
522    pub async fn get_edge(&self, edge_id: &str) -> Result<Option<Edge>, GraphError> {
523        // Check cache first
524        if let Some(edge) = self.edge_cache.get(edge_id) {
525            return Ok(Some(edge.clone()));
526        }
527
528        // Load from database
529        let cf_edges = self.db.cf_handle("edges")
530            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
531
532        let edge_key = format!("edge:{}", edge_id);
533
534        if let Some(edge_data) = self.db.get_cf(&cf_edges, &edge_key)
535            .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
536
537            let edge: Edge = bincode::deserialize(&edge_data)
538                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
539
540            // Cache the edge
541            self.edge_cache.insert(edge_id.to_string(), edge.clone());
542
543            Ok(Some(edge))
544        } else {
545            Ok(None)
546        }
547    }
548
549    /// Get edges from a node
550    #[instrument(skip(self))]
551    pub async fn get_edges_from_node(&self, node_id: &str, label: Option<&str>) -> Result<Vec<Edge>, GraphError> {
552        let cf_edges = self.db.cf_handle("edges")
553            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
554
555        let prefix = format!("outgoing:{}:", node_id);
556        let mut edges = Vec::new();
557
558        let iter = self.db.iterator_cf(&cf_edges, IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward));
559        for item in iter {
560            let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
561            if key.starts_with(prefix.as_bytes()) {
562                let edge_id = String::from_utf8(value.to_vec())
563                    .map_err(|_| GraphError::InvalidData("Invalid edge ID".to_string()))?;
564
565                if let Some(edge) = self.get_edge(&edge_id).await? {
566                    if label.is_none() || edge.label == label.unwrap() {
567                        edges.push(edge);
568                    }
569                }
570            }
571        }
572
573        Ok(edges)
574    }
575
576    /// Get edges to a node
577    #[instrument(skip(self))]
578    pub async fn get_edges_to_node(&self, node_id: &str, label: Option<&str>) -> Result<Vec<Edge>, GraphError> {
579        let cf_edges = self.db.cf_handle("edges")
580            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
581
582        let prefix = format!("incoming:{}:", node_id);
583        let mut edges = Vec::new();
584
585        let iter = self.db.iterator_cf(&cf_edges, IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward));
586        for item in iter {
587            let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
588            if key.starts_with(prefix.as_bytes()) {
589                let edge_id = String::from_utf8(value.to_vec())
590                    .map_err(|_| GraphError::InvalidData("Invalid edge ID".to_string()))?;
591
592                if let Some(edge) = self.get_edge(&edge_id).await? {
593                    if label.is_none() || edge.label == label.unwrap() {
594                        edges.push(edge);
595                    }
596                }
597            }
598        }
599
600        Ok(edges)
601    }
602
603    /// Execute a graph query
604    #[instrument(skip(self, query))]
605    pub async fn execute_query(&self, query: GraphQuery) -> Result<QueryResult, GraphError> {
606        let start_time = std::time::Instant::now();
607
608        // This is a simplified implementation
609        // In a full implementation, this would parse the query and execute it efficiently
610        let mut results = Vec::new();
611        let mut nodes_scanned = 0u64;
612        let mut edges_scanned = 0u64;
613
614        // For now, return all nodes that match the first node pattern
615        if let Some(node_pattern) = query.node_patterns.first() {
616            let nodes = self.query_nodes_by_pattern(node_pattern).await?;
617            nodes_scanned = nodes.len() as u64;
618
619            for node in nodes {
620                let mut row_data = BTreeMap::new();
621                if let Some(var) = &node_pattern.variable {
622                    row_data.insert(var.clone(), PropertyValue::String(node.id.clone()));
623                }
624                results.push(ResultRow { data: row_data });
625            }
626        }
627
628        let execution_time = start_time.elapsed().as_millis() as u64;
629
630        let statistics = QueryStatistics {
631            execution_time_ms: execution_time,
632            nodes_scanned,
633            edges_scanned,
634            results_returned: results.len() as u64,
635        };
636
637        Ok(QueryResult {
638            columns: query.returns.iter().map(|r| format!("{:?}", r)).collect(),
639            rows: results,
640            statistics,
641        })
642    }
643
644    /// Start a transaction
645    pub async fn begin_transaction(&self) -> GraphTransaction {
646        GraphTransaction {
647            db: self,
648            batch: WriteBatch::default(),
649            modified_nodes: HashSet::new(),
650            modified_edges: HashSet::new(),
651        }
652    }
653
654    /// Scan all nodes
655    #[instrument(skip(self))]
656    pub async fn scan_nodes(&self) -> Result<Vec<Node>, GraphError> {
657        let cf_nodes = self.db.cf_handle("nodes")
658            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
659
660        let mut nodes = Vec::new();
661        let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
662
663        for item in iter {
664            let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
665            let node_id = String::from_utf8(key.to_vec())
666                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
667            let node: Node = bincode::deserialize(&value)
668                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
669            nodes.push(node);
670        }
671
672        Ok(nodes)
673    }
674
675    /// Scan all edges
676    #[instrument(skip(self))]
677    pub async fn scan_edges(&self) -> Result<Vec<Edge>, GraphError> {
678        let cf_edges = self.db.cf_handle("edges")
679            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
680
681        let mut edges = Vec::new();
682        let iter = self.db.iterator_cf(&cf_edges, IteratorMode::Start);
683
684        for item in iter {
685            let (key, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
686            let edge_id = String::from_utf8(key.to_vec())
687                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
688            let edge: Edge = bincode::deserialize(&value)
689                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
690            edges.push(edge);
691        }
692
693        Ok(edges)
694    }
695
696    /// Get database statistics
697    #[instrument(skip(self))]
698    pub async fn get_statistics(&self) -> Result<GraphStatistics, GraphError> {
699        let cf_nodes = self.db.cf_handle("nodes")
700            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
701        let cf_edges = self.db.cf_handle("edges")
702            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
703
704        // Count nodes
705        let mut node_count = 0u64;
706        let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
707        for _ in iter {
708            node_count += 1;
709        }
710
711        // Count edges
712        let mut edge_count = 0u64;
713        let iter = self.db.iterator_cf(&cf_edges, IteratorMode::Start);
714        for _ in iter {
715            edge_count += 1;
716        }
717
718        Ok(GraphStatistics {
719            node_count,
720            edge_count,
721            cache_size: self.node_cache.len() + self.edge_cache.len(),
722        })
723    }
724
725    // Internal helper methods
726    async fn load_schema(&self) -> Result<(), GraphError> {
727        let cf_schema = self.db.cf_handle("schema")
728            .ok_or_else(|| GraphError::ColumnFamilyNotFound("schema".to_string()))?;
729
730        if let Some(schema_data) = self.db.get_cf(&cf_schema, "schema")
731            .map_err(|e| GraphError::RocksDBError(e.to_string()))? {
732
733            let schema: Schema = bincode::deserialize(&schema_data)
734                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
735
736            *self.schema.write().await = schema;
737        }
738
739        Ok(())
740    }
741
742    async fn update_schema_for_node(&self, node: &Node) -> Result<(), GraphError> {
743        let mut schema = self.schema.write().await;
744
745        for label in &node.labels {
746            let label_schema = schema.node_labels.entry(label.clone()).or_insert_with(|| LabelSchema {
747                name: label.clone(),
748                properties: HashMap::new(),
749            });
750
751            for (prop_name, prop_value) in &node.properties {
752                let prop_type = self.infer_property_type(prop_value);
753                label_schema.properties.insert(prop_name.clone(), PropertySchema {
754                    name: prop_name.clone(),
755                    data_type: prop_type,
756                });
757            }
758        }
759
760        // Persist schema
761        self.persist_schema(&schema).await?;
762        Ok(())
763    }
764
765    async fn update_schema_for_edge(&self, edge: &Edge) -> Result<(), GraphError> {
766        let mut schema = self.schema.write().await;
767
768        let label_schema = schema.edge_labels.entry(edge.label.clone()).or_insert_with(|| LabelSchema {
769            name: edge.label.clone(),
770            properties: HashMap::new(),
771        });
772
773        for (prop_name, prop_value) in &edge.properties {
774            let prop_type = self.infer_property_type(prop_value);
775            label_schema.properties.insert(prop_name.clone(), PropertySchema {
776                name: prop_name.clone(),
777                data_type: prop_type,
778            });
779        }
780
781        // Persist schema
782        self.persist_schema(&schema).await?;
783        Ok(())
784    }
785
786    fn infer_property_type(&self, value: &PropertyValue) -> PropertyType {
787        match value {
788            PropertyValue::String(_) => PropertyType::String,
789            PropertyValue::Integer(_) => PropertyType::Integer,
790            PropertyValue::Float(_) => PropertyType::Float,
791            PropertyValue::Boolean(_) => PropertyType::Boolean,
792            PropertyValue::Date(_) => PropertyType::Date,
793            PropertyValue::List(_) => PropertyType::List,
794            PropertyValue::Map(_) => PropertyType::Map,
795        }
796    }
797
798    async fn persist_schema(&self, schema: &Schema) -> Result<(), GraphError> {
799        let cf_schema = self.db.cf_handle("schema")
800            .ok_or_else(|| GraphError::ColumnFamilyNotFound("schema".to_string()))?;
801
802        let schema_data = bincode::serialize(schema)
803            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
804
805        self.db.put_cf(&cf_schema, "schema", schema_data)
806            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
807
808        Ok(())
809    }
810
811    async fn store_edge_mappings(&self, edge: &Edge) -> Result<(), GraphError> {
812        let cf_edges = self.db.cf_handle("edges")
813            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
814
815        // Store outgoing mapping
816        let outgoing_key = format!("outgoing:{}:{}", edge.from_node, edge.label);
817        self.db.put_cf(&cf_edges, &outgoing_key, edge.id.as_bytes())
818            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
819
820        // Store incoming mapping
821        let incoming_key = format!("incoming:{}:{}", edge.to_node, edge.label);
822        self.db.put_cf(&cf_edges, &incoming_key, edge.id.as_bytes())
823            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
824
825        Ok(())
826    }
827
828    async fn create_indexes_for_node(&self, node: &Node) -> Result<(), GraphError> {
829        // Create property indexes
830        for (prop_name, prop_value) in &node.properties {
831            let index_key = format!("prop:node:{}:{}:{}", prop_name, self.property_value_to_string(prop_value), node.id);
832            self.store_index(&index_key, &node.id).await?;
833        }
834
835        // Create label indexes
836        for label in &node.labels {
837            let index_key = format!("label:node:{}:{}", label, node.id);
838            self.store_index(&index_key, &node.id).await?;
839        }
840
841        Ok(())
842    }
843
844    async fn create_indexes_for_edge(&self, edge: &Edge) -> Result<(), GraphError> {
845        // Create property indexes
846        for (prop_name, prop_value) in &edge.properties {
847            let index_key = format!("prop:edge:{}:{}:{}", prop_name, self.property_value_to_string(prop_value), edge.id);
848            self.store_index(&index_key, &edge.id).await?;
849        }
850
851        // Create label index
852        let index_key = format!("label:edge:{}:{}", edge.label, edge.id);
853        self.store_index(&index_key, &edge.id).await?;
854
855        Ok(())
856    }
857
858    async fn store_index(&self, index_key: &str, entity_id: &str) -> Result<(), GraphError> {
859        let cf_indexes = self.db.cf_handle("indexes")
860            .ok_or_else(|| GraphError::ColumnFamilyNotFound("indexes".to_string()))?;
861
862        self.db.put_cf(&cf_indexes, index_key, entity_id)
863            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
864
865        Ok(())
866    }
867
868    fn property_value_to_string(&self, value: &PropertyValue) -> String {
869        match value {
870            PropertyValue::String(s) => s.clone(),
871            PropertyValue::Integer(i) => i.to_string(),
872            PropertyValue::Float(f) => f.to_string(),
873            PropertyValue::Boolean(b) => b.to_string(),
874            PropertyValue::Date(dt) => dt.to_rfc3339(),
875            PropertyValue::List(_) => "[LIST]".to_string(),
876            PropertyValue::Map(_) => "[MAP]".to_string(),
877        }
878    }
879
880    async fn query_nodes_by_pattern(&self, pattern: &NodePattern) -> Result<Vec<Node>, GraphError> {
881        // This is a simplified implementation
882        // In a real implementation, this would use indexes efficiently
883        let cf_nodes = self.db.cf_handle("nodes")
884            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
885
886        let mut nodes = Vec::new();
887        let iter = self.db.iterator_cf(&cf_nodes, IteratorMode::Start);
888
889        for item in iter {
890            let (_, value) = item.map_err(|e| GraphError::RocksDBError(e.to_string()))?;
891            let node: Node = bincode::deserialize(&value)
892                .map_err(|e| GraphError::SerializationError(e.to_string()))?;
893
894            // Check if node matches pattern
895            if self.node_matches_pattern(&node, pattern) {
896                nodes.push(node);
897            }
898        }
899
900        Ok(nodes)
901    }
902
903    fn node_matches_pattern(&self, node: &Node, pattern: &NodePattern) -> bool {
904        // Check labels
905        if !pattern.labels.is_empty() {
906            let node_labels: HashSet<String> = node.labels.iter().cloned().collect();
907            let pattern_labels: HashSet<String> = pattern.labels.iter().cloned().collect();
908            if !pattern_labels.is_subset(&node_labels) {
909                return false;
910            }
911        }
912
913        // Check properties
914        for (prop_name, condition) in &pattern.properties {
915            if let Some(node_value) = node.properties.get(prop_name) {
916                if !self.property_matches_condition(node_value, condition) {
917                    return false;
918                }
919            } else {
920                return false;
921            }
922        }
923
924        true
925    }
926
927    fn property_matches_condition(&self, value: &PropertyValue, condition: &PropertyCondition) -> bool {
928        match condition.operator {
929            ComparisonOperator::Equal => value == &condition.value,
930            ComparisonOperator::NotEqual => value != &condition.value,
931            // Other operators would be implemented here
932            _ => false,
933        }
934    }
935
936    async fn delete_edges_for_node(&self, node_id: &str) -> Result<(), GraphError> {
937        // Delete outgoing edges
938        let outgoing_edges = self.get_edges_from_node(node_id, None).await?;
939        for edge in outgoing_edges {
940            self.delete_edge(&edge.id).await?;
941        }
942
943        // Delete incoming edges
944        let incoming_edges = self.get_edges_to_node(node_id, None).await?;
945        for edge in incoming_edges {
946            self.delete_edge(&edge.id).await?;
947        }
948
949        Ok(())
950    }
951
952    async fn delete_edge(&self, edge_id: &str) -> Result<(), GraphError> {
953        let cf_edges = self.db.cf_handle("edges")
954            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
955
956        let edge_key = format!("edge:{}", edge_id);
957        self.db.delete_cf(&cf_edges, &edge_key)
958            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
959
960        self.edge_cache.remove(edge_id);
961        Ok(())
962    }
963
964    async fn update_indexes_for_node(&self, node: &Node) -> Result<(), GraphError> {
965        // Delete old indexes and create new ones
966        self.delete_indexes_for_node(&node.id).await?;
967        self.create_indexes_for_node(node).await?;
968        Ok(())
969    }
970
971    async fn delete_indexes_for_node(&self, node_id: &str) -> Result<(), GraphError> {
972        // This is a simplified implementation
973        // In a real implementation, you'd track and delete specific indexes
974        Ok(())
975    }
976}
977
978/// Graph transaction implementation
979impl<'a> GraphTransaction<'a> {
980    /// Create a node in the transaction
981    pub async fn create_node(
982        &mut self,
983        id: Option<String>,
984        labels: Vec<String>,
985        properties: BTreeMap<String, PropertyValue>,
986    ) -> Result<String, GraphError> {
987        let node_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
988        let now = Utc::now();
989
990        let node = Node {
991            id: node_id.clone(),
992            labels,
993            properties,
994            created_at: now,
995            updated_at: now,
996        };
997
998        let node_data = bincode::serialize(&node)
999            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
1000
1001        let cf_nodes = self.db.db.cf_handle("nodes")
1002            .ok_or_else(|| GraphError::ColumnFamilyNotFound("nodes".to_string()))?;
1003
1004        let node_key = format!("node:{}", node_id);
1005        self.batch.put_cf(&cf_nodes, &node_key, node_data);
1006
1007        self.modified_nodes.insert(node_id.clone());
1008
1009        Ok(node_id)
1010    }
1011
1012    /// Create an edge in the transaction
1013    pub async fn create_edge(
1014        &mut self,
1015        id: Option<String>,
1016        from_node: &str,
1017        to_node: &str,
1018        label: String,
1019        properties: BTreeMap<String, PropertyValue>,
1020    ) -> Result<String, GraphError> {
1021        let edge_id = id.unwrap_or_else(|| Uuid::new_v4().to_string());
1022        let now = Utc::now();
1023
1024        let edge = Edge {
1025            id: edge_id.clone(),
1026            from_node: from_node.to_string(),
1027            to_node: to_node.to_string(),
1028            label: label.clone(),
1029            properties,
1030            created_at: now,
1031            updated_at: now,
1032        };
1033
1034        let edge_data = bincode::serialize(&edge)
1035            .map_err(|e| GraphError::SerializationError(e.to_string()))?;
1036
1037        let cf_edges = self.db.db.cf_handle("edges")
1038            .ok_or_else(|| GraphError::ColumnFamilyNotFound("edges".to_string()))?;
1039
1040        let edge_key = format!("edge:{}", edge_id);
1041        self.batch.put_cf(&cf_edges, &edge_key, edge_data);
1042
1043        // Store reverse mappings for efficient traversal
1044        let outgoing_key = format!("outgoing:{}:{}", from_node, label);
1045        let incoming_key = format!("incoming:{}:{}", to_node, label);
1046        self.batch.put_cf(&cf_edges, &outgoing_key, edge_id.as_bytes());
1047        self.batch.put_cf(&cf_edges, &incoming_key, edge_id.as_bytes());
1048
1049        self.modified_edges.insert(edge_id.clone());
1050
1051        Ok(edge_id)
1052    }
1053
1054    /// Commit the transaction
1055    pub async fn commit(self) -> Result<(), GraphError> {
1056        self.db.db.write(self.batch)
1057            .map_err(|e| GraphError::RocksDBError(e.to_string()))?;
1058
1059        // Update caches
1060        for node_id in self.modified_nodes {
1061            if let Some(node) = self.db.get_node(&node_id).await? {
1062                self.db.node_cache.insert(node_id, node);
1063            }
1064        }
1065
1066        for edge_id in self.modified_edges {
1067            if let Some(edge) = self.db.get_edge(&edge_id).await? {
1068                self.db.edge_cache.insert(edge_id, edge);
1069            }
1070        }
1071
1072        Ok(())
1073    }
1074
1075    /// Rollback the transaction
1076    pub fn rollback(self) -> Result<(), GraphError> {
1077        // Transaction is automatically rolled back when dropped
1078        Ok(())
1079    }
1080}
1081
1082/// Graph statistics
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084pub struct GraphStatistics {
1085    pub node_count: u64,
1086    pub edge_count: u64,
1087    pub cache_size: usize,
1088}
1089
/// Graph error types
///
/// Most variants carry a human-readable message (or identifier) as a `String`,
/// rather than the underlying library error.
#[derive(thiserror::Error, Debug)]
pub enum GraphError {
    /// A RocksDB operation failed; holds the stringified RocksDB error.
    #[error("RocksDB error: {0}")]
    RocksDBError(String),

    /// Encoding or decoding a stored value with bincode failed; holds the message.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// A required column family handle was not found; holds the family's name.
    #[error("Column family not found: {0}")]
    ColumnFamilyNotFound(String),

    /// A referenced node does not exist (presumably holds the node ID — confirm at the raise sites).
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// A referenced edge does not exist (presumably holds the edge ID — confirm at the raise sites).
    #[error("Edge not found: {0}")]
    EdgeNotFound(String),

    /// Stored or supplied data was malformed; holds a description.
    #[error("Invalid data: {0}")]
    InvalidData(String),

    /// A query could not be executed; holds a description.
    #[error("Query error: {0}")]
    QueryError(String),

    /// A transaction-level failure; holds a description.
    #[error("Transaction error: {0}")]
    TransactionError(String),
}
1117
1118impl Default for Schema {
1119    fn default() -> Self {
1120        Self {
1121            node_labels: HashMap::new(),
1122            edge_labels: HashMap::new(),
1123            indexes: HashMap::new(),
1124        }
1125    }
1126}
1127
// Integration-style tests: each test opens a fresh GraphDB in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_graphdb_creation() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await;
        assert!(graphdb.is_ok(), "GraphDB should be created successfully");
    }

    #[tokio::test]
    async fn test_node_operations() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        // Create node
        let mut properties = BTreeMap::new();
        properties.insert("name".to_string(), PropertyValue::String("Alice".to_string()));
        properties.insert("age".to_string(), PropertyValue::Integer(30));

        let node_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            properties,
        ).await.unwrap();

        // Get node
        let node = graphdb.get_node(&node_id).await.unwrap().unwrap();
        assert_eq!(node.labels, vec!["Person"]);
        assert_eq!(node.properties["name"], PropertyValue::String("Alice".to_string()));

        // Update node
        let mut update_props = BTreeMap::new();
        update_props.insert("city".to_string(), PropertyValue::String("Tokyo".to_string()));
        graphdb.update_node(&node_id, update_props).await.unwrap();

        // Verify update
        let updated_node = graphdb.get_node(&node_id).await.unwrap().unwrap();
        assert_eq!(updated_node.properties["city"], PropertyValue::String("Tokyo".to_string()));
    }

    #[tokio::test]
    async fn test_edge_operations() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        // Create nodes
        let node1_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        let node2_id = graphdb.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        // Create edge
        let mut edge_props = BTreeMap::new();
        edge_props.insert("since".to_string(), PropertyValue::Integer(2020));

        let edge_id = graphdb.create_edge(
            None,
            &node1_id,
            &node2_id,
            "KNOWS".to_string(),
            edge_props,
        ).await.unwrap();

        // Get edge
        let edge = graphdb.get_edge(&edge_id).await.unwrap().unwrap();
        assert_eq!(edge.from_node, node1_id);
        assert_eq!(edge.to_node, node2_id);
        assert_eq!(edge.label, "KNOWS");

        // Get edges from node
        let outgoing = graphdb.get_edges_from_node(&node1_id, Some("KNOWS")).await.unwrap();
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].id, edge_id);
    }

    #[tokio::test]
    async fn test_transaction() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        // Start transaction
        let mut tx = graphdb.begin_transaction().await;

        // Create nodes in transaction
        let node1_id = tx.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        let node2_id = tx.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        // Commit transaction
        tx.commit().await.unwrap();

        // Verify nodes exist
        assert!(graphdb.get_node(&node1_id).await.unwrap().is_some());
        assert!(graphdb.get_node(&node2_id).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn test_transaction_rollback() {
        let temp_dir = tempdir().unwrap();
        let graphdb = GraphDB::new(temp_dir.path().to_str().unwrap()).await.unwrap();

        let mut tx = graphdb.begin_transaction().await;
        let node_id = tx.create_node(
            None,
            vec!["Person".to_string()],
            BTreeMap::new(),
        ).await.unwrap();

        // Rolling back drops the staged write batch, so nothing reaches RocksDB.
        tx.rollback().unwrap();

        assert!(
            !matches!(graphdb.get_node(&node_id).await, Ok(Some(_))),
            "a rolled-back node must not be visible"
        );
    }
}