kotoba_db/
lib.rs

1use kotoba_db_engine_memory::MemoryStorageEngine;
2#[cfg(feature = "lsm")]
3use kotoba_db_engine_lsm::LSMStorageEngine;
4use kotoba_db_core::engine::StorageEngine;
5use kotoba_db_core::types::{Block, Cid, NodeBlock, EdgeBlock, Value};
6use std::collections::{BTreeMap, HashMap, HashSet};
7use std::path::Path;
8use std::sync::Arc;
9use anyhow::Result;
10use tokio::sync::RwLock;
11
12/// The main database handle for KotobaDB.
13/// This provides the user-facing API for database operations.
14pub struct DB {
15    engine: Box<dyn StorageEngine>,
16    /// Active transactions
17    transactions: Arc<RwLock<HashMap<u64, Transaction>>>,
18    /// Transaction ID counter
19    next_txn_id: Arc<RwLock<u64>>,
20}
21
22/// A database transaction that supports ACID operations
23pub struct Transaction {
24    id: u64,
25    operations: Vec<Operation>,
26    state: TransactionState,
27    created_at: std::time::Instant,
28}
29
/// Lifecycle state of a [`Transaction`].
///
/// A transaction starts out `Active`. Adding operations, committing and rolling
/// back are only accepted while it is in that state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TransactionState {
    /// Accepting operations; may still be committed or rolled back.
    Active,
    /// Commit finished executing every queued operation.
    Committed,
    /// Queued operations were discarded without being executed.
    RolledBack,
    /// The transaction failed and can no longer be used.
    Failed,
}
38
39/// Database operation within a transaction
40#[derive(Debug, Clone)]
41pub enum Operation {
42    CreateNode { properties: BTreeMap<String, Value>, cid: Option<Cid> },
43    CreateEdge { label: String, from_cid: Cid, to_cid: Cid, properties: BTreeMap<String, Value>, cid: Option<Cid> },
44    UpdateNode { cid: Cid, properties: BTreeMap<String, Value> },
45    UpdateEdge { cid: Cid, properties: BTreeMap<String, Value> },
46    DeleteNode { cid: Cid },
47    DeleteEdge { cid: Cid },
48}
49
50impl DB {
51    /// Opens a new database instance using an in-memory storage engine.
52    /// This is useful for testing, prototyping, or temporary data.
53    pub fn open_memory() -> Result<Self> {
54        Ok(Self {
55            engine: Box::new(MemoryStorageEngine::new()),
56            transactions: Arc::new(RwLock::new(HashMap::new())),
57            next_txn_id: Arc::new(RwLock::new(1)),
58        })
59    }
60
61    /// Opens a new database instance using an LSM-Tree based storage engine.
62    /// This provides durable, high-performance persistent storage.
63    ///
64    /// # Arguments
65    /// * `path` - Directory path where database files will be stored
66    #[cfg(feature = "lsm")]
67    pub async fn open_lsm<P: AsRef<Path>>(path: P) -> Result<Self> {
68        let engine = LSMStorageEngine::new(path).await?;
69        Ok(Self {
70            engine: Box::new(engine),
71            transactions: Arc::new(RwLock::new(HashMap::new())),
72            next_txn_id: Arc::new(RwLock::new(1)),
73        })
74    }
75
76    /// Begins a new transaction
77    pub async fn begin_transaction(&self) -> Result<u64> {
78        let mut next_id = self.next_txn_id.write().await;
79        let txn_id = *next_id;
80        *next_id += 1;
81
82        let transaction = Transaction {
83            id: txn_id,
84            operations: Vec::new(),
85            state: TransactionState::Active,
86            created_at: std::time::Instant::now(),
87        };
88
89        let mut transactions = self.transactions.write().await;
90        transactions.insert(txn_id, transaction);
91
92        Ok(txn_id)
93    }
94
95    /// Commits a transaction
96    pub async fn commit_transaction(&mut self, txn_id: u64) -> Result<()> {
97        // Get the transaction operations first
98        let operations = {
99            let transactions = self.transactions.read().await;
100            if let Some(txn) = transactions.get(&txn_id) {
101                if txn.state != TransactionState::Active {
102                    return Err(anyhow::anyhow!("Transaction is not active"));
103                }
104                txn.operations.clone()
105            } else {
106                return Err(anyhow::anyhow!("Transaction not found"));
107            }
108        };
109
110        // Execute all operations (without holding the transactions lock)
111        for op in operations {
112            match op {
113                Operation::CreateNode { properties, .. } => {
114                    self.create_node(properties).await?;
115                }
116                Operation::CreateEdge { label, from_cid, to_cid, properties, .. } => {
117                    self.create_edge(label, from_cid, to_cid, properties).await?;
118                }
119                Operation::UpdateNode { cid, properties } => {
120                    // For now, we recreate the node with updated properties
121                    // TODO: Implement proper update semantics
122                    let existing_node = self.get_node(&cid).await?
123                        .ok_or_else(|| anyhow::anyhow!("Node not found"))?;
124                    let mut new_properties = existing_node.properties.clone();
125                    new_properties.extend(properties);
126                    self.create_node(new_properties).await?;
127                }
128                Operation::UpdateEdge { cid, properties } => {
129                    // For now, we recreate the edge with updated properties
130                    // TODO: Implement proper update semantics
131                    let existing_edge = self.get_edge(&cid).await?
132                        .ok_or_else(|| anyhow::anyhow!("Edge not found"))?;
133                    let mut new_properties = existing_edge.properties.clone();
134                    new_properties.extend(properties);
135                    self.create_edge(existing_edge.label.clone(),
136                                   existing_edge.from,
137                                   existing_edge.to,
138                                   new_properties).await?;
139                }
140                Operation::DeleteNode { cid } => {
141                    // Mark as deleted by creating a tombstone
142                    // TODO: Implement proper deletion
143                    let _ = cid; // For now, just ignore
144                }
145                Operation::DeleteEdge { cid } => {
146                    // Mark as deleted by creating a tombstone
147                    // TODO: Implement proper deletion
148                    let _ = cid; // For now, just ignore
149                }
150            }
151        }
152
153        // Mark transaction as committed
154        let mut transactions = self.transactions.write().await;
155        if let Some(txn) = transactions.get_mut(&txn_id) {
156            txn.state = TransactionState::Committed;
157        }
158
159        Ok(())
160    }
161
162    /// Rolls back a transaction
163    pub async fn rollback_transaction(&mut self, txn_id: u64) -> Result<()> {
164        let mut transactions = self.transactions.write().await;
165        if let Some(txn) = transactions.get_mut(&txn_id) {
166            if txn.state != TransactionState::Active {
167                return Err(anyhow::anyhow!("Transaction is not active"));
168            }
169
170            txn.operations.clear();
171            txn.state = TransactionState::RolledBack;
172            Ok(())
173        } else {
174            Err(anyhow::anyhow!("Transaction not found"))
175        }
176    }
177
178    /// Adds an operation to a transaction
179    pub async fn add_operation(&self, txn_id: u64, operation: Operation) -> Result<()> {
180        let mut transactions = self.transactions.write().await;
181        if let Some(txn) = transactions.get_mut(&txn_id) {
182            if txn.state != TransactionState::Active {
183                return Err(anyhow::anyhow!("Transaction is not active"));
184            }
185            txn.operations.push(operation);
186            Ok(())
187        } else {
188            Err(anyhow::anyhow!("Transaction not found"))
189        }
190    }
191
192    /// Creates a new node in the database.
193    ///
194    /// # Arguments
195    /// * `properties` - A map of property names to values for this node
196    ///
197    /// # Returns
198    /// The CID of the created node block
199    pub async fn create_node(&mut self, properties: BTreeMap<String, Value>) -> Result<Cid> {
200        let node_block = NodeBlock {
201            properties,
202            edges: Vec::new(), // Start with no edges
203        };
204        let block = Block::Node(node_block);
205        self.engine.put_block(&block).await
206    }
207
208    /// Creates a new edge in the database.
209    ///
210    /// # Arguments
211    /// * `label` - The label/type of the edge (e.g., "FRIENDS_WITH", "WORKS_AT")
212    /// * `from_cid` - CID of the source node
213    /// * `to_cid` - CID of the target node
214    /// * `properties` - A map of property names to values for this edge
215    ///
216    /// # Returns
217    /// The CID of the created edge block
218    pub async fn create_edge(
219        &mut self,
220        label: String,
221        from_cid: Cid,
222        to_cid: Cid,
223        properties: BTreeMap<String, Value>,
224    ) -> Result<Cid> {
225        let edge_block = EdgeBlock {
226            label,
227            from: from_cid,
228            to: to_cid,
229            properties,
230        };
231        let block = Block::Edge(edge_block);
232        self.engine.put_block(&block).await
233    }
234
235    /// Retrieves a block by its CID.
236    pub async fn get_block(&self, cid: &Cid) -> Result<Option<Block>> {
237        self.engine.get_block(cid).await
238    }
239
240    /// Retrieves a node by its CID.
241    pub async fn get_node(&self, cid: &Cid) -> Result<Option<NodeBlock>> {
242        match self.get_block(cid).await? {
243            Some(Block::Node(node)) => Ok(Some(node)),
244            Some(Block::Edge(_)) => Ok(None),
245            None => Ok(None),
246        }
247    }
248
249    /// Retrieves an edge by its CID.
250    pub async fn get_edge(&self, cid: &Cid) -> Result<Option<EdgeBlock>> {
251        match self.get_block(cid).await? {
252            Some(Block::Edge(edge)) => Ok(Some(edge)),
253            Some(Block::Node(_)) => Ok(None),
254            None => Ok(None),
255        }
256    }
257
258    /// Finds nodes that match the given property filters
259    pub async fn find_nodes(&self, filters: &[(String, Value)]) -> Result<Vec<(Cid, NodeBlock)>> {
260        let mut results = Vec::new();
261
262        // For now, we scan through all nodes (TODO: implement proper indexing)
263        // In a real implementation, this would use indexes for efficient filtering
264
265        // Get all node CIDs by scanning (this is inefficient but works for basic functionality)
266        let all_keys = self.engine.scan(b"").await?; // Get all keys
267
268        for (key_bytes, _) in all_keys {
269            if let Ok(cid) = <[u8; 32]>::try_from(&key_bytes[..]) {
270                if let Some(Block::Node(node)) = self.get_block(&cid).await? {
271                    // Check if node matches all filters
272                    let mut matches = true;
273                    for (prop_name, expected_value) in filters {
274                        if let Some(actual_value) = node.properties.get(prop_name) {
275                            if actual_value != expected_value {
276                                matches = false;
277                                break;
278                            }
279                        } else {
280                            matches = false;
281                            break;
282                        }
283                    }
284                    if matches {
285                        results.push((cid, node));
286                    }
287                }
288            }
289        }
290
291        Ok(results)
292    }
293
294    /// Finds edges that match the given criteria
295    pub async fn find_edges(&self,
296                           label_filter: Option<&str>,
297                           from_filter: Option<Cid>,
298                           to_filter: Option<Cid>,
299                           property_filters: &[(String, Value)]) -> Result<Vec<(Cid, EdgeBlock)>> {
300        let mut results = Vec::new();
301
302        // Get all keys and filter edges
303        let all_keys = self.engine.scan(b"").await?;
304
305        for (key_bytes, _) in all_keys {
306            if let Ok(cid) = <[u8; 32]>::try_from(&key_bytes[..]) {
307                if let Some(Block::Edge(edge)) = self.get_block(&cid).await? {
308                    // Check filters
309                    let mut matches = true;
310
311                    // Label filter
312                    if let Some(expected_label) = label_filter {
313                        if edge.label != expected_label {
314                            matches = false;
315                        }
316                    }
317
318                    // From node filter
319                    if let Some(expected_from) = from_filter {
320                        if edge.from != expected_from {
321                            matches = false;
322                        }
323                    }
324
325                    // To node filter
326                    if let Some(expected_to) = to_filter {
327                        if edge.to != expected_to {
328                            matches = false;
329                        }
330                    }
331
332                    // Property filters
333                    for (prop_name, expected_value) in property_filters {
334                        if let Some(actual_value) = edge.properties.get(prop_name) {
335                            if actual_value != expected_value {
336                                matches = false;
337                                break;
338                            }
339                        } else {
340                            matches = false;
341                            break;
342                        }
343                    }
344
345                    if matches {
346                        results.push((cid, edge));
347                    }
348                }
349            }
350        }
351
352        Ok(results)
353    }
354
355    /// Performs a basic graph traversal from a starting node
356    pub async fn traverse(&self,
357                         start_cid: Cid,
358                         direction: TraversalDirection,
359                         max_depth: usize,
360                         edge_labels: Option<&[String]>) -> Result<HashMap<Cid, Vec<Cid>>> {
361        let mut visited = HashSet::new();
362        let mut result = HashMap::new();
363        let mut queue = Vec::new();
364
365        queue.push((start_cid, 0)); // (node_cid, depth)
366        visited.insert(start_cid);
367
368        while let Some((current_cid, depth)) = queue.pop() {
369            if depth >= max_depth {
370                continue;
371            }
372
373            // Find edges from/to this node
374            let edges = match direction {
375                TraversalDirection::Outgoing => {
376                    self.find_edges(None, Some(current_cid), None, &[]).await?
377                }
378                TraversalDirection::Incoming => {
379                    self.find_edges(None, None, Some(current_cid), &[]).await?
380                }
381                TraversalDirection::Both => {
382                    let mut all_edges = self.find_edges(None, Some(current_cid), None, &[]).await?;
383                    all_edges.extend(self.find_edges(None, None, Some(current_cid), &[]).await?);
384                    all_edges
385                }
386            };
387
388            let mut neighbors = Vec::new();
389
390            for (_, edge) in edges {
391                // Filter by edge labels if specified
392                if let Some(labels) = edge_labels {
393                    if !labels.contains(&edge.label) {
394                        continue;
395                    }
396                }
397
398                let neighbor_cid = match direction {
399                    TraversalDirection::Outgoing => edge.to,
400                    TraversalDirection::Incoming => edge.from,
401                    TraversalDirection::Both => {
402                        if edge.from == current_cid {
403                            edge.to
404                        } else {
405                            edge.from
406                        }
407                    }
408                };
409
410                if visited.insert(neighbor_cid) {
411                    neighbors.push(neighbor_cid);
412                    queue.push((neighbor_cid, depth + 1));
413                } else {
414                    neighbors.push(neighbor_cid);
415                }
416            }
417
418            if !neighbors.is_empty() {
419                result.insert(current_cid, neighbors);
420            }
421        }
422
423        Ok(result)
424    }
425}
426
/// Direction in which `DB::traverse` follows edges.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TraversalDirection {
    /// Follow edges from their `from` node to their `to` node.
    Outgoing,
    /// Follow edges backwards, from their `to` node to their `from` node.
    Incoming,
    /// Follow edges in either direction.
    Both,
}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty scratch directory for an LSM test.
    ///
    /// The current process id is part of the name so concurrent test runs do
    /// not share a directory. Leftovers from an earlier aborted run are removed
    /// first.
    #[cfg(feature = "lsm")]
    fn fresh_temp_dir(name: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{}_{}", name, std::process::id()));
        // Ignoring the error is fine: the directory usually does not exist yet.
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[tokio::test]
    async fn test_basic_operations() {
        let mut db = DB::open_memory().unwrap();

        // Create a node
        let mut properties = BTreeMap::new();
        properties.insert("name".to_string(), Value::String("Alice".to_string()));
        properties.insert("age".to_string(), Value::Int(30));

        let node_cid = db.create_node(properties).await.unwrap();

        // Retrieve the node
        let node = db.get_node(&node_cid).await.unwrap().unwrap();
        assert_eq!(node.properties["name"], Value::String("Alice".to_string()));
        assert_eq!(node.properties["age"], Value::Int(30));
        assert!(node.edges.is_empty());

        // Create an edge
        let mut edge_props = BTreeMap::new();
        edge_props.insert("since".to_string(), Value::Int(2020));

        let edge_cid = db
            .create_edge(
                "FRIENDS_WITH".to_string(),
                node_cid,
                node_cid, // self-loop for simplicity
                edge_props,
            )
            .await
            .unwrap();

        // Retrieve the edge
        let edge = db.get_edge(&edge_cid).await.unwrap().unwrap();
        assert_eq!(edge.label, "FRIENDS_WITH");
        assert_eq!(edge.from, node_cid);
        assert_eq!(edge.to, node_cid);
        assert_eq!(edge.properties["since"], Value::Int(2020));
    }

    #[cfg(feature = "lsm")]
    #[tokio::test]
    async fn test_lsm_engine_creation() {
        let temp_dir = fresh_temp_dir("test_kotoba_db");

        // Test that LSM engine can be created
        let db = DB::open_lsm(&temp_dir).await;
        assert!(db.is_ok(), "LSM engine should be created successfully");

        // Drop the handle before deleting the directory it was opened on.
        drop(db);
        std::fs::remove_dir_all(&temp_dir).unwrap();
    }

    #[cfg(feature = "lsm")]
    #[tokio::test]
    async fn test_lsm_compaction() {
        let temp_dir = fresh_temp_dir("test_kotoba_db_compaction");

        // Create LSM engine with low compaction threshold for testing
        let compaction_config = kotoba_db_engine_lsm::CompactionConfig {
            max_sstables: 3, // Trigger compaction after 3 SSTables
            min_compaction_files: 2,
        };

        let mut db = {
            let engine = LSMStorageEngine::with_config(&temp_dir, compaction_config)
                .await
                .unwrap();
            DB {
                engine: Box::new(engine),
                transactions: Arc::new(RwLock::new(HashMap::new())),
                next_txn_id: Arc::new(RwLock::new(1)),
            }
        };

        // Every node written below, with the properties it was written with,
        // so we can check that all of them are still readable at the end.
        let mut written = Vec::new();

        // Insert enough data to trigger multiple flushes and compaction
        for i in 0..50 {
            let key = format!("key_{:03}", i);
            let value = format!("value_{}", i);

            let mut properties = BTreeMap::new();
            properties.insert("key".to_string(), Value::String(key.clone()));
            properties.insert("value".to_string(), Value::String(value));

            let cid = db.create_node(properties.clone()).await.unwrap();
            written.push((cid, properties));

            // Every tenth iteration, also write a second node sharing the same
            // `key` property, giving the engine extra blocks to flush and compact.
            if i % 10 == 0 {
                let mut update_props = BTreeMap::new();
                update_props.insert("key".to_string(), Value::String(key));
                update_props.insert("updated".to_string(), Value::String(format!("updated_{}", i)));
                let cid = db.create_node(update_props.clone()).await.unwrap();
                written.push((cid, update_props));
            }
        }

        // The database must still accept writes and serve reads afterwards.
        let mut properties = BTreeMap::new();
        properties.insert("test_key".to_string(), Value::String("test_value".to_string()));

        let test_cid = db.create_node(properties).await.unwrap();

        let node = db.get_node(&test_cid).await.unwrap().unwrap();
        assert_eq!(node.properties["test_key"], Value::String("test_value".to_string()));

        // Every node written before (or while) compaction ran must be intact.
        for (cid, properties) in &written {
            let node = db
                .get_node(cid)
                .await
                .unwrap()
                .expect("node written before compaction should still be readable");
            assert_eq!(&node.properties, properties);
        }

        // Drop the handle before deleting the directory it was opened on.
        drop(db);
        std::fs::remove_dir_all(&temp_dir).unwrap();
    }

    #[tokio::test]
    async fn test_transaction_operations() {
        let mut db = DB::open_memory().unwrap();

        // Begin transaction
        let txn_id = db.begin_transaction().await.unwrap();

        // Add operations to transaction
        let mut node_props = BTreeMap::new();
        node_props.insert("name".to_string(), Value::String("Alice".to_string()));
        node_props.insert("age".to_string(), Value::Int(30));

        db.add_operation(
            txn_id,
            Operation::CreateNode {
                properties: node_props,
                cid: None,
            },
        )
        .await
        .unwrap();

        // Commit transaction
        db.commit_transaction(txn_id).await.unwrap();

        // Verify the node was created
        let nodes = db
            .find_nodes(&[("name".to_string(), Value::String("Alice".to_string()))])
            .await
            .unwrap();
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].1.properties["name"], Value::String("Alice".to_string()));
    }

    #[tokio::test]
    async fn test_rollback_discards_operations() {
        let mut db = DB::open_memory().unwrap();

        let txn_id = db.begin_transaction().await.unwrap();

        let mut node_props = BTreeMap::new();
        node_props.insert("name".to_string(), Value::String("Carol".to_string()));
        db.add_operation(
            txn_id,
            Operation::CreateNode {
                properties: node_props,
                cid: None,
            },
        )
        .await
        .unwrap();

        db.rollback_transaction(txn_id).await.unwrap();

        // Nothing queued before the rollback reached storage...
        let nodes = db
            .find_nodes(&[("name".to_string(), Value::String("Carol".to_string()))])
            .await
            .unwrap();
        assert!(nodes.is_empty());

        // ...and the transaction can no longer be used.
        assert!(db.commit_transaction(txn_id).await.is_err());
        assert!(db.rollback_transaction(txn_id).await.is_err());
        assert!(db
            .add_operation(txn_id, Operation::DeleteNode { cid: [0u8; 32] })
            .await
            .is_err());

        // Unknown transaction IDs are rejected as well.
        assert!(db.commit_transaction(txn_id + 1000).await.is_err());
    }

    #[tokio::test]
    async fn test_query_operations() {
        let mut db = DB::open_memory().unwrap();

        // Create some test data
        let mut alice_props = BTreeMap::new();
        alice_props.insert("name".to_string(), Value::String("Alice".to_string()));
        alice_props.insert("age".to_string(), Value::Int(30));
        alice_props.insert("city".to_string(), Value::String("Tokyo".to_string()));

        let mut bob_props = BTreeMap::new();
        bob_props.insert("name".to_string(), Value::String("Bob".to_string()));
        bob_props.insert("age".to_string(), Value::Int(25));
        bob_props.insert("city".to_string(), Value::String("Tokyo".to_string()));

        let alice_cid = db.create_node(alice_props).await.unwrap();
        let bob_cid = db.create_node(bob_props).await.unwrap();

        // Create an edge
        let mut friendship_props = BTreeMap::new();
        friendship_props.insert("since".to_string(), Value::Int(2020));
        db.create_edge("FRIENDS".to_string(), alice_cid, bob_cid, friendship_props)
            .await
            .unwrap();

        // Test node filtering
        let tokyo_nodes = db
            .find_nodes(&[("city".to_string(), Value::String("Tokyo".to_string()))])
            .await
            .unwrap();
        assert_eq!(tokyo_nodes.len(), 2);

        let alice_nodes = db
            .find_nodes(&[
                ("name".to_string(), Value::String("Alice".to_string())),
                ("age".to_string(), Value::Int(30)),
            ])
            .await
            .unwrap();
        assert_eq!(alice_nodes.len(), 1);

        // Test edge filtering
        let friendships = db.find_edges(Some("FRIENDS"), None, None, &[]).await.unwrap();
        assert_eq!(friendships.len(), 1);

        // Test traversal
        let traversal_result = db
            .traverse(alice_cid, TraversalDirection::Outgoing, 2, None)
            .await
            .unwrap();
        assert!(traversal_result.contains_key(&alice_cid));
        assert_eq!(traversal_result[&alice_cid].len(), 1);
        assert_eq!(traversal_result[&alice_cid][0], bob_cid);
    }
}