1use kotoba_db_engine_memory::MemoryStorageEngine;
2#[cfg(feature = "lsm")]
3use kotoba_db_engine_lsm::LSMStorageEngine;
4use kotoba_db_core::engine::StorageEngine;
5use kotoba_db_core::types::{Block, Cid, NodeBlock, EdgeBlock, Value};
6use std::collections::{BTreeMap, HashMap, HashSet};
7use std::path::Path;
8use std::sync::Arc;
9use anyhow::Result;
10use tokio::sync::RwLock;
11
12pub struct DB {
15 engine: Box<dyn StorageEngine>,
16 transactions: Arc<RwLock<HashMap<u64, Transaction>>>,
18 next_txn_id: Arc<RwLock<u64>>,
20}
21
22pub struct Transaction {
24 id: u64,
25 operations: Vec<Operation>,
26 state: TransactionState,
27 created_at: std::time::Instant,
28}
29
/// Lifecycle state of a [`Transaction`].
///
/// The enum carries no data, so it is `Copy` and supports full equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// The transaction accepts new operations and may be committed or rolled back.
    Active,
    /// The transaction's operations have been applied.
    Committed,
    /// The transaction was abandoned and its queued operations discarded.
    RolledBack,
    /// The transaction ended in an error state.
    Failed,
}
38
39#[derive(Debug, Clone)]
41pub enum Operation {
42 CreateNode { properties: BTreeMap<String, Value>, cid: Option<Cid> },
43 CreateEdge { label: String, from_cid: Cid, to_cid: Cid, properties: BTreeMap<String, Value>, cid: Option<Cid> },
44 UpdateNode { cid: Cid, properties: BTreeMap<String, Value> },
45 UpdateEdge { cid: Cid, properties: BTreeMap<String, Value> },
46 DeleteNode { cid: Cid },
47 DeleteEdge { cid: Cid },
48}
49
50impl DB {
51 pub fn open_memory() -> Result<Self> {
54 Ok(Self {
55 engine: Box::new(MemoryStorageEngine::new()),
56 transactions: Arc::new(RwLock::new(HashMap::new())),
57 next_txn_id: Arc::new(RwLock::new(1)),
58 })
59 }
60
61 #[cfg(feature = "lsm")]
67 pub async fn open_lsm<P: AsRef<Path>>(path: P) -> Result<Self> {
68 let engine = LSMStorageEngine::new(path).await?;
69 Ok(Self {
70 engine: Box::new(engine),
71 transactions: Arc::new(RwLock::new(HashMap::new())),
72 next_txn_id: Arc::new(RwLock::new(1)),
73 })
74 }
75
76 pub async fn begin_transaction(&self) -> Result<u64> {
78 let mut next_id = self.next_txn_id.write().await;
79 let txn_id = *next_id;
80 *next_id += 1;
81
82 let transaction = Transaction {
83 id: txn_id,
84 operations: Vec::new(),
85 state: TransactionState::Active,
86 created_at: std::time::Instant::now(),
87 };
88
89 let mut transactions = self.transactions.write().await;
90 transactions.insert(txn_id, transaction);
91
92 Ok(txn_id)
93 }
94
95 pub async fn commit_transaction(&mut self, txn_id: u64) -> Result<()> {
97 let operations = {
99 let transactions = self.transactions.read().await;
100 if let Some(txn) = transactions.get(&txn_id) {
101 if txn.state != TransactionState::Active {
102 return Err(anyhow::anyhow!("Transaction is not active"));
103 }
104 txn.operations.clone()
105 } else {
106 return Err(anyhow::anyhow!("Transaction not found"));
107 }
108 };
109
110 for op in operations {
112 match op {
113 Operation::CreateNode { properties, .. } => {
114 self.create_node(properties).await?;
115 }
116 Operation::CreateEdge { label, from_cid, to_cid, properties, .. } => {
117 self.create_edge(label, from_cid, to_cid, properties).await?;
118 }
119 Operation::UpdateNode { cid, properties } => {
120 let existing_node = self.get_node(&cid).await?
123 .ok_or_else(|| anyhow::anyhow!("Node not found"))?;
124 let mut new_properties = existing_node.properties.clone();
125 new_properties.extend(properties);
126 self.create_node(new_properties).await?;
127 }
128 Operation::UpdateEdge { cid, properties } => {
129 let existing_edge = self.get_edge(&cid).await?
132 .ok_or_else(|| anyhow::anyhow!("Edge not found"))?;
133 let mut new_properties = existing_edge.properties.clone();
134 new_properties.extend(properties);
135 self.create_edge(existing_edge.label.clone(),
136 existing_edge.from,
137 existing_edge.to,
138 new_properties).await?;
139 }
140 Operation::DeleteNode { cid } => {
141 let _ = cid; }
145 Operation::DeleteEdge { cid } => {
146 let _ = cid; }
150 }
151 }
152
153 let mut transactions = self.transactions.write().await;
155 if let Some(txn) = transactions.get_mut(&txn_id) {
156 txn.state = TransactionState::Committed;
157 }
158
159 Ok(())
160 }
161
162 pub async fn rollback_transaction(&mut self, txn_id: u64) -> Result<()> {
164 let mut transactions = self.transactions.write().await;
165 if let Some(txn) = transactions.get_mut(&txn_id) {
166 if txn.state != TransactionState::Active {
167 return Err(anyhow::anyhow!("Transaction is not active"));
168 }
169
170 txn.operations.clear();
171 txn.state = TransactionState::RolledBack;
172 Ok(())
173 } else {
174 Err(anyhow::anyhow!("Transaction not found"))
175 }
176 }
177
178 pub async fn add_operation(&self, txn_id: u64, operation: Operation) -> Result<()> {
180 let mut transactions = self.transactions.write().await;
181 if let Some(txn) = transactions.get_mut(&txn_id) {
182 if txn.state != TransactionState::Active {
183 return Err(anyhow::anyhow!("Transaction is not active"));
184 }
185 txn.operations.push(operation);
186 Ok(())
187 } else {
188 Err(anyhow::anyhow!("Transaction not found"))
189 }
190 }
191
192 pub async fn create_node(&mut self, properties: BTreeMap<String, Value>) -> Result<Cid> {
200 let node_block = NodeBlock {
201 properties,
202 edges: Vec::new(), };
204 let block = Block::Node(node_block);
205 self.engine.put_block(&block).await
206 }
207
208 pub async fn create_edge(
219 &mut self,
220 label: String,
221 from_cid: Cid,
222 to_cid: Cid,
223 properties: BTreeMap<String, Value>,
224 ) -> Result<Cid> {
225 let edge_block = EdgeBlock {
226 label,
227 from: from_cid,
228 to: to_cid,
229 properties,
230 };
231 let block = Block::Edge(edge_block);
232 self.engine.put_block(&block).await
233 }
234
235 pub async fn get_block(&self, cid: &Cid) -> Result<Option<Block>> {
237 self.engine.get_block(cid).await
238 }
239
240 pub async fn get_node(&self, cid: &Cid) -> Result<Option<NodeBlock>> {
242 match self.get_block(cid).await? {
243 Some(Block::Node(node)) => Ok(Some(node)),
244 Some(Block::Edge(_)) => Ok(None),
245 None => Ok(None),
246 }
247 }
248
249 pub async fn get_edge(&self, cid: &Cid) -> Result<Option<EdgeBlock>> {
251 match self.get_block(cid).await? {
252 Some(Block::Edge(edge)) => Ok(Some(edge)),
253 Some(Block::Node(_)) => Ok(None),
254 None => Ok(None),
255 }
256 }
257
258 pub async fn find_nodes(&self, filters: &[(String, Value)]) -> Result<Vec<(Cid, NodeBlock)>> {
260 let mut results = Vec::new();
261
262 let all_keys = self.engine.scan(b"").await?; for (key_bytes, _) in all_keys {
269 if let Ok(cid) = <[u8; 32]>::try_from(&key_bytes[..]) {
270 if let Some(Block::Node(node)) = self.get_block(&cid).await? {
271 let mut matches = true;
273 for (prop_name, expected_value) in filters {
274 if let Some(actual_value) = node.properties.get(prop_name) {
275 if actual_value != expected_value {
276 matches = false;
277 break;
278 }
279 } else {
280 matches = false;
281 break;
282 }
283 }
284 if matches {
285 results.push((cid, node));
286 }
287 }
288 }
289 }
290
291 Ok(results)
292 }
293
294 pub async fn find_edges(&self,
296 label_filter: Option<&str>,
297 from_filter: Option<Cid>,
298 to_filter: Option<Cid>,
299 property_filters: &[(String, Value)]) -> Result<Vec<(Cid, EdgeBlock)>> {
300 let mut results = Vec::new();
301
302 let all_keys = self.engine.scan(b"").await?;
304
305 for (key_bytes, _) in all_keys {
306 if let Ok(cid) = <[u8; 32]>::try_from(&key_bytes[..]) {
307 if let Some(Block::Edge(edge)) = self.get_block(&cid).await? {
308 let mut matches = true;
310
311 if let Some(expected_label) = label_filter {
313 if edge.label != expected_label {
314 matches = false;
315 }
316 }
317
318 if let Some(expected_from) = from_filter {
320 if edge.from != expected_from {
321 matches = false;
322 }
323 }
324
325 if let Some(expected_to) = to_filter {
327 if edge.to != expected_to {
328 matches = false;
329 }
330 }
331
332 for (prop_name, expected_value) in property_filters {
334 if let Some(actual_value) = edge.properties.get(prop_name) {
335 if actual_value != expected_value {
336 matches = false;
337 break;
338 }
339 } else {
340 matches = false;
341 break;
342 }
343 }
344
345 if matches {
346 results.push((cid, edge));
347 }
348 }
349 }
350 }
351
352 Ok(results)
353 }
354
355 pub async fn traverse(&self,
357 start_cid: Cid,
358 direction: TraversalDirection,
359 max_depth: usize,
360 edge_labels: Option<&[String]>) -> Result<HashMap<Cid, Vec<Cid>>> {
361 let mut visited = HashSet::new();
362 let mut result = HashMap::new();
363 let mut queue = Vec::new();
364
365 queue.push((start_cid, 0)); visited.insert(start_cid);
367
368 while let Some((current_cid, depth)) = queue.pop() {
369 if depth >= max_depth {
370 continue;
371 }
372
373 let edges = match direction {
375 TraversalDirection::Outgoing => {
376 self.find_edges(None, Some(current_cid), None, &[]).await?
377 }
378 TraversalDirection::Incoming => {
379 self.find_edges(None, None, Some(current_cid), &[]).await?
380 }
381 TraversalDirection::Both => {
382 let mut all_edges = self.find_edges(None, Some(current_cid), None, &[]).await?;
383 all_edges.extend(self.find_edges(None, None, Some(current_cid), &[]).await?);
384 all_edges
385 }
386 };
387
388 let mut neighbors = Vec::new();
389
390 for (_, edge) in edges {
391 if let Some(labels) = edge_labels {
393 if !labels.contains(&edge.label) {
394 continue;
395 }
396 }
397
398 let neighbor_cid = match direction {
399 TraversalDirection::Outgoing => edge.to,
400 TraversalDirection::Incoming => edge.from,
401 TraversalDirection::Both => {
402 if edge.from == current_cid {
403 edge.to
404 } else {
405 edge.from
406 }
407 }
408 };
409
410 if visited.insert(neighbor_cid) {
411 neighbors.push(neighbor_cid);
412 queue.push((neighbor_cid, depth + 1));
413 } else {
414 neighbors.push(neighbor_cid);
415 }
416 }
417
418 if !neighbors.is_empty() {
419 result.insert(current_cid, neighbors);
420 }
421 }
422
423 Ok(result)
424 }
425}
426
/// Which edges of a node are followed during a graph traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalDirection {
    /// Follow edges that start at the current node.
    Outgoing,
    /// Follow edges that end at the current node.
    Incoming,
    /// Follow edges in either direction.
    Both,
}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty scratch directory for an LSM-backed test.
    ///
    /// The process id is part of the name so concurrently running test
    /// processes do not share a directory, and any leftover from an earlier
    /// run is removed first.
    #[cfg(feature = "lsm")]
    fn fresh_temp_dir(name: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{}_{}", name, std::process::id()));
        // Ignore the result: the directory normally does not exist yet.
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// A node and an edge written to the in-memory engine can be read back unchanged.
    #[tokio::test]
    async fn test_basic_operations() {
        let mut db = DB::open_memory().unwrap();

        let mut properties = BTreeMap::new();
        properties.insert("name".to_string(), Value::String("Alice".to_string()));
        properties.insert("age".to_string(), Value::Int(30));

        let node_cid = db.create_node(properties).await.unwrap();

        let node = db.get_node(&node_cid).await.unwrap().unwrap();
        assert_eq!(node.properties["name"], Value::String("Alice".to_string()));
        assert_eq!(node.properties["age"], Value::Int(30));
        assert!(node.edges.is_empty());

        let mut edge_props = BTreeMap::new();
        edge_props.insert("since".to_string(), Value::Int(2020));

        // A self-loop is enough to exercise edge storage.
        let edge_cid = db
            .create_edge("FRIENDS_WITH".to_string(), node_cid, node_cid, edge_props)
            .await
            .unwrap();

        let edge = db.get_edge(&edge_cid).await.unwrap().unwrap();
        assert_eq!(edge.label, "FRIENDS_WITH");
        assert_eq!(edge.from, node_cid);
        assert_eq!(edge.to, node_cid);
        assert_eq!(edge.properties["since"], Value::Int(2020));
    }

    /// Opening an LSM-backed database on an empty directory succeeds.
    #[cfg(feature = "lsm")]
    #[tokio::test]
    async fn test_lsm_engine_creation() {
        let temp_dir = fresh_temp_dir("test_kotoba_db");

        let db = DB::open_lsm(&temp_dir).await;
        assert!(db.is_ok(), "LSM engine should be created successfully");

        // Release the database before deleting the files underneath it.
        drop(db);
        std::fs::remove_dir_all(&temp_dir).unwrap();
    }

    /// Writes enough nodes through an LSM engine with small compaction limits
    /// and checks that a node written afterwards is still readable.
    #[cfg(feature = "lsm")]
    #[tokio::test]
    async fn test_lsm_compaction() {
        let temp_dir = fresh_temp_dir("test_kotoba_db_compaction");

        let compaction_config = kotoba_db_engine_lsm::CompactionConfig {
            max_sstables: 3,
            min_compaction_files: 2,
        };

        let mut db = {
            use kotoba_db_engine_lsm::LSMStorageEngine;
            let engine = LSMStorageEngine::with_config(&temp_dir, compaction_config)
                .await
                .unwrap();
            DB {
                engine: Box::new(engine),
                transactions: Arc::new(RwLock::new(HashMap::new())),
                next_txn_id: Arc::new(RwLock::new(1)),
            }
        };

        for i in 0..50 {
            let key = format!("key_{:03}", i);
            let value = format!("value_{}", i);

            let mut properties = BTreeMap::new();
            properties.insert("key".to_string(), Value::String(key.clone()));
            properties.insert("value".to_string(), Value::String(value));

            db.create_node(properties).await.unwrap();

            // Every tenth iteration writes a second node for the same key.
            if i % 10 == 0 {
                let mut update_props = BTreeMap::new();
                update_props.insert("key".to_string(), Value::String(key));
                update_props.insert(
                    "updated".to_string(),
                    Value::String(format!("updated_{}", i)),
                );
                db.create_node(update_props).await.unwrap();
            }
        }

        let mut properties = BTreeMap::new();
        properties.insert("test_key".to_string(), Value::String("test_value".to_string()));

        let test_cid = db.create_node(properties).await.unwrap();

        let node = db.get_node(&test_cid).await.unwrap().unwrap();
        assert_eq!(node.properties["test_key"], Value::String("test_value".to_string()));

        // Release the database before deleting the files underneath it.
        drop(db);
        std::fs::remove_dir_all(&temp_dir).unwrap();
    }

    /// A node queued in a transaction becomes visible once the transaction is committed.
    #[tokio::test]
    async fn test_transaction_operations() {
        let mut db = DB::open_memory().unwrap();

        let txn_id = db.begin_transaction().await.unwrap();

        let mut node_props = BTreeMap::new();
        node_props.insert("name".to_string(), Value::String("Alice".to_string()));
        node_props.insert("age".to_string(), Value::Int(30));

        db.add_operation(
            txn_id,
            Operation::CreateNode {
                properties: node_props,
                cid: None,
            },
        )
        .await
        .unwrap();

        db.commit_transaction(txn_id).await.unwrap();

        let nodes = db
            .find_nodes(&[("name".to_string(), Value::String("Alice".to_string()))])
            .await
            .unwrap();
        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].1.properties["name"], Value::String("Alice".to_string()));
    }

    /// Property filters, edge filters and traversal over a two-node graph.
    #[tokio::test]
    async fn test_query_operations() {
        let mut db = DB::open_memory().unwrap();

        let mut alice_props = BTreeMap::new();
        alice_props.insert("name".to_string(), Value::String("Alice".to_string()));
        alice_props.insert("age".to_string(), Value::Int(30));
        alice_props.insert("city".to_string(), Value::String("Tokyo".to_string()));

        let mut bob_props = BTreeMap::new();
        bob_props.insert("name".to_string(), Value::String("Bob".to_string()));
        bob_props.insert("age".to_string(), Value::Int(25));
        bob_props.insert("city".to_string(), Value::String("Tokyo".to_string()));

        let alice_cid = db.create_node(alice_props).await.unwrap();
        let bob_cid = db.create_node(bob_props).await.unwrap();

        let mut friendship_props = BTreeMap::new();
        friendship_props.insert("since".to_string(), Value::Int(2020));
        db.create_edge("FRIENDS".to_string(), alice_cid, bob_cid, friendship_props)
            .await
            .unwrap();

        // Single-property filter matches both nodes.
        let tokyo_nodes = db
            .find_nodes(&[("city".to_string(), Value::String("Tokyo".to_string()))])
            .await
            .unwrap();
        assert_eq!(tokyo_nodes.len(), 2);

        // All filters must hold at once.
        let alice_nodes = db
            .find_nodes(&[
                ("name".to_string(), Value::String("Alice".to_string())),
                ("age".to_string(), Value::Int(30)),
            ])
            .await
            .unwrap();
        assert_eq!(alice_nodes.len(), 1);

        let friendships = db.find_edges(Some("FRIENDS"), None, None, &[]).await.unwrap();
        assert_eq!(friendships.len(), 1);

        let traversal_result = db
            .traverse(alice_cid, TraversalDirection::Outgoing, 2, None)
            .await
            .unwrap();
        assert!(traversal_result.contains_key(&alice_cid));
        assert_eq!(traversal_result[&alice_cid].len(), 1);
        assert_eq!(traversal_result[&alice_cid][0], bob_cid);
    }
}