1use std::sync::{Arc, RwLock};
4
5use std::collections::HashMap;
6
7use std::time::Instant;
8
9use kyu_binder::{
10 BindContext, Binder, BoundMatchClause, BoundNodePattern, BoundPatternElement, BoundQuery,
11 BoundReadingClause, BoundStatement, BoundUpdatingClause,
12};
13use kyu_catalog::{Catalog, NodeTableEntry, Property, RelTableEntry};
14use kyu_common::id::TableId;
15use kyu_common::{KyuError, KyuResult};
16use kyu_delta::{DeltaBatch, DeltaStats, GraphDelta};
17use kyu_executor::{ExecutionContext, QueryResult, Storage, execute};
18use kyu_expression::{FunctionRegistry, evaluate, evaluate_constant};
19use kyu_planner::{build_query_plan, optimize, resolve_properties};
20use kyu_transaction::{Checkpointer, TransactionManager, TransactionType, Wal};
21use kyu_types::{LogicalType, TypedValue};
22use smol_str::SmolStr;
23
24use crate::storage::NodeGroupStorage;
25
/// A single client connection to the database.
///
/// Holds shared handles to every engine subsystem; all fields are `Arc`s over
/// process-wide state, so many connections can coexist cheaply.
pub struct Connection {
    // Table-schema catalog (copy-on-write: begin_write/commit_write elsewhere in this file).
    catalog: Arc<Catalog>,
    // Row storage behind a process-wide RwLock: readers scan, writers mutate.
    storage: Arc<RwLock<NodeGroupStorage>>,
    // Begins, commits, and rolls back transactions.
    txn_mgr: Arc<TransactionManager>,
    // Write-ahead log handed to the transaction manager at commit time.
    wal: Arc<Wal>,
    // Runs explicit (`CHECKPOINT`) and best-effort post-write checkpoints.
    checkpointer: Arc<Checkpointer>,
    // Loaded extensions reachable via `CALL <ext>.<proc>(...)`.
    extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
}
40
41impl Connection {
42 pub(crate) fn new(
43 catalog: Arc<Catalog>,
44 storage: Arc<RwLock<NodeGroupStorage>>,
45 txn_mgr: Arc<TransactionManager>,
46 wal: Arc<Wal>,
47 checkpointer: Arc<Checkpointer>,
48 extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
49 ) -> Self {
50 Self {
51 catalog,
52 storage,
53 txn_mgr,
54 wal,
55 checkpointer,
56 extensions,
57 }
58 }
59
60 pub fn query(&self, cypher: &str) -> KyuResult<QueryResult> {
66 self.query_internal(cypher, BindContext::empty())
67 }
68
69 pub fn query_with_params(
86 &self,
87 cypher: &str,
88 params: HashMap<String, TypedValue>,
89 ) -> KyuResult<QueryResult> {
90 let ctx = BindContext {
91 params: params
92 .into_iter()
93 .map(|(k, v)| (SmolStr::new(k), v))
94 .collect(),
95 env: HashMap::new(),
96 };
97 self.query_internal(cypher, ctx)
98 }
99
100 pub fn execute(
123 &self,
124 cypher: &str,
125 params: HashMap<String, TypedValue>,
126 env: HashMap<String, TypedValue>,
127 ) -> KyuResult<QueryResult> {
128 let ctx = BindContext {
129 params: params
130 .into_iter()
131 .map(|(k, v)| (SmolStr::new(k), v))
132 .collect(),
133 env: env.into_iter().map(|(k, v)| (SmolStr::new(k), v)).collect(),
134 };
135 self.query_internal(cypher, ctx)
136 }
137
    /// Parse, bind, and execute one Cypher statement inside a transaction.
    ///
    /// Two inputs are intercepted before parsing: a literal `CHECKPOINT`
    /// statement and `CALL <ext>.<proc>(...)` extension invocations. Anything
    /// else is parsed, bound against a catalog snapshot, executed, then
    /// committed on success or rolled back on error.
    fn query_internal(&self, cypher: &str, ctx: BindContext) -> KyuResult<QueryResult> {
        // Explicit checkpoint bypasses the whole parse/bind/execute pipeline.
        if cypher.trim().eq_ignore_ascii_case("CHECKPOINT")
            || cypher.trim().eq_ignore_ascii_case("CHECKPOINT;")
        {
            self.checkpointer
                .checkpoint()
                .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))?;
            return Ok(QueryResult::new(vec![], vec![]));
        }

        // Extension CALLs are dispatched outside the Cypher grammar.
        if let Some(result) = self.try_call_extension(cypher)? {
            return Ok(result);
        }

        let parse_result = kyu_parser::parse(cypher);
        let stmt = parse_result
            .ast
            .ok_or_else(|| KyuError::Parser(format!("{:?}", parse_result.errors)))?;

        // Bind against a point-in-time catalog snapshot.
        let catalog_snapshot = self.catalog.read();
        let mut binder =
            Binder::new(catalog_snapshot, FunctionRegistry::with_builtins()).with_context(ctx);
        let bound = binder.bind(&stmt)?;

        // Classify the statement to choose the transaction type: DDL and
        // bulk-load statements are writes; a query is a write only when it
        // contains mutating clauses.
        let is_ddl = matches!(
            &bound,
            BoundStatement::CreateNodeTable(_)
                | BoundStatement::CreateRelTable(_)
                | BoundStatement::Drop(_)
        );
        let is_write = match &bound {
            BoundStatement::Query(q) => self.is_standalone_dml(q) || self.has_match_mutations(q),
            BoundStatement::CopyFrom(_) | BoundStatement::LoadFrom(_) => true,
            _ => is_ddl,
        };

        let txn_type = if is_write {
            TransactionType::Write
        } else {
            TransactionType::ReadOnly
        };
        let mut txn = self
            .txn_mgr
            .begin(txn_type)
            .map_err(|e| KyuError::Transaction(e.to_string()))?;

        // NOTE(review): execute_bound does not receive `txn`, so the
        // transaction appears to bracket the work for WAL/commit bookkeeping
        // only — confirm against the TransactionManager's contract.
        let result = self.execute_bound(bound);

        match &result {
            Ok(_) => {
                if is_ddl {
                    // Log the post-DDL catalog so recovery can replay schema
                    // changes from the WAL.
                    let snapshot = self.catalog.read().serialize_json();
                    txn.log_catalog_snapshot(snapshot.into_bytes());
                }
                self.txn_mgr
                    .commit(&mut txn, &self.wal, |_, _| {})
                    .map_err(|e| KyuError::Transaction(e.to_string()))?;
                if is_write {
                    // Best-effort: a failed checkpoint does not fail the query.
                    let _ = self.checkpointer.try_checkpoint();
                }
            }
            Err(_) => {
                // Rollback errors are deliberately swallowed; the original
                // execution error is what the caller sees.
                let _ = self.txn_mgr.rollback(&mut txn, |_| {});
            }
        }

        result
    }
216
217 fn execute_bound(&self, bound: BoundStatement) -> KyuResult<QueryResult> {
219 match bound {
220 BoundStatement::Query(query) => {
221 if self.is_standalone_dml(&query) {
222 return self.exec_dml(&query);
223 }
224 if self.has_match_mutations(&query) {
225 return self.exec_match_dml(&query);
226 }
227 let catalog_snapshot = self.catalog.read();
228 let plan = build_query_plan(&query, &catalog_snapshot)?;
229 let plan = optimize(plan, &catalog_snapshot);
230 let storage_guard = self.storage.read().unwrap();
231 let ctx = ExecutionContext::new(catalog_snapshot, &*storage_guard);
232 execute(&plan, &query.output_schema, &ctx)
233 }
234 BoundStatement::CreateNodeTable(create) => self.exec_create_node_table(&create),
235 BoundStatement::CreateRelTable(create) => self.exec_create_rel_table(&create),
236 BoundStatement::Drop(drop) => self.exec_drop(&drop),
237 BoundStatement::CopyFrom(copy) => self.exec_copy_from(©),
238 BoundStatement::LoadFrom(load) => self.exec_load_from(&load),
239 _ => Err(KyuError::NotImplemented(
240 "statement type not yet supported".into(),
241 )),
242 }
243 }
244
245 fn is_standalone_dml(&self, query: &BoundQuery) -> bool {
249 query
250 .parts
251 .iter()
252 .all(|part| part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
253 }
254
    /// Execute a standalone DML query (CREATE without any MATCH clause),
    /// optionally evaluating a trailing RETURN projection over the created rows.
    fn exec_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
        let catalog_snapshot = self.catalog.read();

        for part in &query.parts {
            // (variable index, table, inserted row) for each created node,
            // retained so a RETURN clause can reference the new values.
            let mut created_nodes: Vec<(Option<u32>, TableId, Vec<TypedValue>)> = Vec::new();

            for clause in &part.updating_clauses {
                match clause {
                    BoundUpdatingClause::Create(patterns) => {
                        for pattern in patterns {
                            for element in &pattern.elements {
                                match element {
                                    BoundPatternElement::Node(node) => {
                                        let values =
                                            self.exec_create_node(node, &catalog_snapshot)?;
                                        created_nodes.push((
                                            node.variable_index,
                                            node.table_id,
                                            values,
                                        ));
                                    }
                                    BoundPatternElement::Relationship(_rel) => {
                                        // Relationships need bound endpoints;
                                        // handled only on the MATCH+CREATE path.
                                        return Err(KyuError::NotImplemented(
                                            "CREATE relationship not yet supported".into(),
                                        ));
                                    }
                                }
                            }
                        }
                    }
                    BoundUpdatingClause::Set(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone SET without MATCH".into(),
                        ));
                    }
                    BoundUpdatingClause::Delete(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone DELETE without MATCH".into(),
                        ));
                    }
                }
            }

            // RETURN handling: lay created rows side by side and map each
            // (variable, property) pair onto a flat column offset.
            if let Some(ref proj) = part.projection {
                let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
                let mut combined_values: Vec<TypedValue> = Vec::new();
                let mut offset = 0u32;

                for (var_idx, table_id, values) in &created_nodes {
                    if let Some(entry) = catalog_snapshot.find_by_id(*table_id) {
                        if let Some(vi) = var_idx {
                            for (i, prop) in entry.properties().iter().enumerate() {
                                prop_map.insert((*vi, prop.name.clone()), offset + i as u32);
                            }
                        }
                        offset += entry.properties().len() as u32;
                    }
                    // NOTE(review): the row is appended even when the catalog
                    // lookup above fails, while `offset` only advances on
                    // success — later offsets would then be misaligned.
                    // Harmless today because the node was just created via this
                    // same snapshot, but worth tightening.
                    combined_values.extend(values.iter().cloned());
                }

                let col_names: Vec<SmolStr> =
                    proj.items.iter().map(|item| item.alias.clone()).collect();
                let col_types: Vec<LogicalType> = proj
                    .items
                    .iter()
                    .map(|item| item.expression.result_type().clone())
                    .collect();

                // Evaluate each projection item against the combined row.
                let mut row: Vec<TypedValue> = Vec::with_capacity(proj.items.len());
                for item in &proj.items {
                    let resolved = resolve_properties(&item.expression, &prop_map);
                    let value = evaluate(&resolved, combined_values.as_slice())?;
                    row.push(value);
                }

                let mut result = QueryResult::new(col_names, col_types);
                result.push_row(row);
                // A projection ends the statement: only the first part with a
                // RETURN produces output.
                return Ok(result);
            }
        }

        Ok(QueryResult::new(vec![], vec![]))
    }
341
342 fn exec_create_node(
345 &self,
346 node: &BoundNodePattern,
347 catalog: &kyu_catalog::CatalogContent,
348 ) -> KyuResult<Vec<TypedValue>> {
349 let entry = catalog
350 .find_by_id(node.table_id)
351 .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", node.table_id)))?;
352 let properties = entry.properties();
353
354 let mut values = Vec::with_capacity(properties.len());
355 for prop in properties {
356 let value = if let Some((_pid, expr)) =
357 node.properties.iter().find(|(pid, _)| *pid == prop.id)
358 {
359 evaluate_constant(expr)?
360 } else {
361 TypedValue::Null
362 };
363 values.push(value);
364 }
365
366 self.storage
367 .write()
368 .unwrap()
369 .insert_row(node.table_id, &values)?;
370
371 Ok(values)
372 }
373
374 fn has_match_mutations(&self, query: &BoundQuery) -> bool {
376 query
377 .parts
378 .iter()
379 .any(|part| !part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
380 }
381
    /// Execute `MATCH ... SET/DELETE/CREATE` for each query part.
    ///
    /// Bindings are built as a cartesian product over the MATCH clauses
    /// (nested-loop join filtered by each clause's WHERE), mutations are
    /// staged per binding, and all staged changes are applied at the end
    /// under a single storage write lock.
    fn exec_match_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
        let catalog_snapshot = self.catalog.read();

        for part in &query.parts {
            let match_clauses: Vec<&BoundMatchClause> = part
                .reading_clauses
                .iter()
                .filter_map(|c| match c {
                    BoundReadingClause::Match(m) => Some(m),
                    _ => None,
                })
                .collect();

            if match_clauses.is_empty() {
                return Err(KyuError::NotImplemented(
                    "MATCH...mutation requires at least one MATCH clause".into(),
                ));
            }

            // Each binding maps variable index -> the matched row's values.
            // Seed with one empty binding so the first clause starts the
            // cartesian product.
            let mut bindings: Vec<HashMap<u32, Vec<TypedValue>>> = vec![HashMap::new()];
            // (variable index, property name) -> flat column offset used when
            // resolving WHERE and SET expressions.
            let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
            let mut global_offset = 0u32;

            for mc in &match_clauses {
                // Only the first node pattern of each MATCH contributes.
                let (table_id, var_idx) = self.extract_match_node(mc)?;
                let entry = catalog_snapshot
                    .find_by_id(table_id)
                    .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", table_id)))?;
                let properties = entry.properties();

                if let Some(vi) = var_idx {
                    for (i, p) in properties.iter().enumerate() {
                        prop_map.insert((vi, p.name.clone()), global_offset + i as u32);
                    }
                }

                // Resolve property references in this clause's WHERE against
                // the offsets accumulated so far.
                let resolved_where = mc
                    .where_clause
                    .as_ref()
                    .map(|w| resolve_properties(w, &prop_map));

                let rows = self.storage.read().unwrap().scan_rows(table_id)?;

                // Nested-loop join: extend every existing binding with every
                // row of this clause's table, keeping WHERE-passing combos.
                let mut new_bindings = Vec::new();
                for existing in &bindings {
                    for (_row_idx, row_values) in &rows {
                        // Flatten the binding (sorted by variable index) plus
                        // the candidate row into one evaluation row.
                        let mut combined = Vec::new();
                        let mut entries: Vec<(u32, &Vec<TypedValue>)> =
                            existing.iter().map(|(&k, v)| (k, v)).collect();
                        entries.sort_by_key(|(k, _)| *k);
                        for (_, vals) in &entries {
                            combined.extend(vals.iter().cloned());
                        }
                        combined.extend(row_values.iter().cloned());

                        // Anything other than a strict Bool(true) rejects.
                        if let Some(ref pred) = resolved_where {
                            let result = evaluate(pred, combined.as_slice())?;
                            if result != TypedValue::Bool(true) {
                                continue;
                            }
                        }

                        let mut binding = existing.clone();
                        if let Some(vi) = var_idx {
                            binding.insert(vi, row_values.clone());
                        }
                        new_bindings.push(binding);
                    }
                }
                bindings = new_bindings;
                global_offset += properties.len() as u32;
            }

            // Stage mutations first; apply after all bindings are processed
            // so in-progress scans are never invalidated.
            let mut set_mutations: Vec<(TableId, u64, usize, TypedValue)> = Vec::new();
            let mut delete_rows: Vec<(TableId, u64)> = Vec::new();
            let mut rel_inserts: Vec<(TableId, Vec<TypedValue>)> = Vec::new();

            for binding in &bindings {
                // Re-flatten this binding for SET expression evaluation.
                let mut combined = Vec::new();
                let mut entries: Vec<(u32, &Vec<TypedValue>)> =
                    binding.iter().map(|(&k, v)| (k, v)).collect();
                entries.sort_by_key(|(k, _)| *k);
                for (_, vals) in &entries {
                    combined.extend(vals.iter().cloned());
                }

                for clause in &part.updating_clauses {
                    match clause {
                        BoundUpdatingClause::Set(items) => {
                            // SET targets the first MATCH clause's table only.
                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
                            let entry = catalog_snapshot.find_by_id(table_id).unwrap();
                            let properties = entry.properties();
                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
                            let first_var =
                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
                                    if let BoundPatternElement::Node(n) = e {
                                        n.variable_index
                                    } else {
                                        None
                                    }
                                });
                            if let Some(vi) = first_var
                                && let Some(var_vals) = binding.get(&vi)
                            {
                                // NOTE(review): the target row is re-identified
                                // by full-row value equality, so duplicate rows
                                // collapse onto the first physical match.
                                for (row_idx, row_values) in &rows {
                                    if row_values == var_vals {
                                        for item in items {
                                            let resolved_value =
                                                resolve_properties(&item.value, &prop_map);
                                            let new_value =
                                                evaluate(&resolved_value, combined.as_slice())?;
                                            let col_idx = properties
                                                .iter()
                                                .position(|p| p.id == item.property_id)
                                                .ok_or_else(|| {
                                                    KyuError::Storage(format!(
                                                        "property {:?} not found",
                                                        item.property_id
                                                    ))
                                                })?;
                                            set_mutations
                                                .push((table_id, *row_idx, col_idx, new_value));
                                        }
                                        break;
                                    }
                                }
                            }
                        }
                        BoundUpdatingClause::Delete(_) => {
                            // DELETE likewise targets the first MATCH clause's
                            // table and re-identifies rows by value equality.
                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
                            let first_var =
                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
                                    if let BoundPatternElement::Node(n) = e {
                                        n.variable_index
                                    } else {
                                        None
                                    }
                                });
                            if let Some(vi) = first_var
                                && let Some(var_vals) = binding.get(&vi)
                            {
                                for (row_idx, row_values) in &rows {
                                    if row_values == var_vals {
                                        delete_rows.push((table_id, *row_idx));
                                        break;
                                    }
                                }
                            }
                        }
                        BoundUpdatingClause::Create(patterns) => {
                            for pattern in patterns {
                                self.exec_create_in_binding(
                                    pattern,
                                    binding,
                                    &catalog_snapshot,
                                    &mut rel_inserts,
                                )?;
                            }
                        }
                    }
                }
            }

            // Apply all staged mutations under one write lock.
            let mut storage = self.storage.write().unwrap();
            for (table_id, row_idx, col_idx, value) in &set_mutations {
                storage.update_cell(*table_id, *row_idx, *col_idx, value)?;
            }
            for (table_id, row_idx) in &delete_rows {
                storage.delete_row(*table_id, *row_idx)?;
            }
            for (table_id, values) in &rel_inserts {
                storage.insert_row(*table_id, values)?;
            }
        }

        Ok(QueryResult::new(vec![], vec![]))
    }
580
    /// Handle a CREATE pattern inside a MATCH binding, staging relationship
    /// inserts into `rel_inserts`.
    ///
    /// Nodes in the pattern are treated as already existing (they came from
    /// the MATCH); only relationships between two adjacent, bound node
    /// patterns are created. Staged row layout: `[src_pk, dst_pk, props...]`.
    fn exec_create_in_binding(
        &self,
        pattern: &kyu_binder::BoundPattern,
        binding: &HashMap<u32, Vec<TypedValue>>,
        catalog: &kyu_catalog::CatalogContent,
        rel_inserts: &mut Vec<(TableId, Vec<TypedValue>)>,
    ) -> KyuResult<()> {
        let elements = &pattern.elements;
        let mut i = 0;
        while i < elements.len() {
            match &elements[i] {
                BoundPatternElement::Node(_node) => {
                    // Nodes are not re-created here; they serve only as
                    // relationship endpoints.
                    i += 1;
                }
                BoundPatternElement::Relationship(rel) => {
                    // Endpoints are the node patterns positioned immediately
                    // before and after this relationship element.
                    let src_var = if i > 0 {
                        if let BoundPatternElement::Node(n) = &elements[i - 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };
                    let dst_var = if i + 1 < elements.len() {
                        if let BoundPatternElement::Node(n) = &elements[i + 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };

                    let src_vi = src_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve source node".into())
                    })?;
                    let dst_vi = dst_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve destination node".into())
                    })?;

                    let src_vals = binding.get(&src_vi).ok_or_else(|| {
                        KyuError::Runtime(format!(
                            "CREATE rel: source var {src_vi} not in bindings"
                        ))
                    })?;
                    let dst_vals = binding.get(&dst_vi).ok_or_else(|| {
                        KyuError::Runtime(format!("CREATE rel: dest var {dst_vi} not in bindings"))
                    })?;

                    let rel_entry = catalog
                        .find_by_id(rel.table_id)
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table {:?} not found", rel.table_id))
                        })?;

                    // The endpoint primary keys come from the rel table's
                    // declared FROM/TO node tables.
                    let src_node_entry = catalog
                        .find_by_id(rel_entry.from_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("source node table not found".into()))?;
                    let dst_node_entry = catalog
                        .find_by_id(rel_entry.to_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("dest node table not found".into()))?;

                    let src_pk = src_vals[src_node_entry.primary_key_idx].clone();
                    let dst_pk = dst_vals[dst_node_entry.primary_key_idx].clone();

                    // Build the row: endpoint keys first, then declared
                    // properties with NULL for anything unspecified.
                    let mut values = vec![src_pk, dst_pk];
                    let rel_properties = rel_entry.properties.as_slice();
                    for prop in rel_properties {
                        let value = if let Some((_pid, expr)) =
                            rel.properties.iter().find(|(pid, _)| *pid == prop.id)
                        {
                            evaluate_constant(expr)?
                        } else {
                            TypedValue::Null
                        };
                        values.push(value);
                    }

                    rel_inserts.push((rel.table_id, values));
                    i += 1;
                }
            }
        }
        Ok(())
    }
678
679 fn extract_match_node(
681 &self,
682 match_clause: &BoundMatchClause,
683 ) -> KyuResult<(TableId, Option<u32>)> {
684 for pattern in &match_clause.patterns {
685 for element in &pattern.elements {
686 if let BoundPatternElement::Node(node) = element {
687 return Ok((node.table_id, node.variable_index));
688 }
689 }
690 }
691 Err(KyuError::NotImplemented(
692 "MATCH clause must contain at least one node pattern".into(),
693 ))
694 }
695
    /// Apply a batch of external graph deltas (node/edge upserts and deletes)
    /// inside a single write transaction, returning per-kind counters and
    /// elapsed time.
    ///
    /// Rows are located by primary key (nodes) or by the src/dst key pair
    /// (edges); existing rows are updated in place, missing ones inserted,
    /// and deletes of missing rows are silent no-ops.
    ///
    /// NOTE(review): any `?` failure inside the loop returns early while the
    /// write transaction is still open — it is neither committed nor rolled
    /// back. Confirm the TransactionManager tolerates abandoned transactions,
    /// or add an explicit rollback path mirroring `query_internal`.
    pub fn apply_delta(&self, batch: DeltaBatch) -> KyuResult<DeltaStats> {
        let start = Instant::now();
        let mut stats = DeltaStats {
            total_deltas: batch.len() as u64,
            ..DeltaStats::default()
        };

        let mut txn = self
            .txn_mgr
            .begin(TransactionType::Write)
            .map_err(|e| KyuError::Transaction(e.to_string()))?;

        // Hold the catalog snapshot and the storage write lock for the whole
        // batch so the deltas apply atomically relative to queries.
        let catalog = self.catalog.read();
        let mut storage = self.storage.write().unwrap();

        for delta in batch.iter() {
            match delta {
                GraphDelta::UpsertNode {
                    key,
                    labels: _,
                    props,
                } => {
                    let entry = catalog.find_by_name(key.label.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("node table '{}' not found", key.label))
                    })?;
                    let node_entry = entry.as_node_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a node table", key.label))
                    })?;
                    let table_id = node_entry.table_id;
                    let pk_col_idx = node_entry.primary_key_idx;
                    let pk_type = &node_entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    let existing = find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?;

                    if let Some(row_idx) = existing {
                        // Update path: only properties present in the delta
                        // change; unknown property names are silently skipped.
                        for (prop_name, value) in props {
                            if let Some(col_idx) =
                                find_property_index(node_entry, prop_name.as_str())
                            {
                                storage.update_cell(table_id, row_idx, col_idx, value)?;
                            }
                        }
                        stats.nodes_updated += 1;
                    } else {
                        // Insert path: build a full row, NULL for absent props.
                        let values = build_node_row(node_entry, &pk_value, props);
                        storage.insert_row(table_id, &values)?;
                        stats.nodes_created += 1;
                    }
                }

                GraphDelta::UpsertEdge {
                    src,
                    rel_type,
                    dst,
                    props,
                } => {
                    let entry = catalog.find_by_name(rel_type.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                    })?;
                    let rel_entry = entry.as_rel_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a rel table", rel_type))
                    })?;
                    let rel_table_id = rel_entry.table_id;

                    // Endpoint keys are parsed using each endpoint table's
                    // primary-key type.
                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk_type = &src_node.properties[src_node.primary_key_idx].data_type;
                    let dst_pk_type = &dst_node.properties[dst_node.primary_key_idx].data_type;
                    let src_pk = parse_primary_key(src.primary_key.as_str(), src_pk_type)?;
                    let dst_pk = parse_primary_key(dst.primary_key.as_str(), dst_pk_type)?;

                    let existing = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?;

                    if let Some(row_idx) = existing {
                        for (prop_name, value) in props {
                            if let Some(prop_idx) =
                                find_rel_property_index(rel_entry, prop_name.as_str())
                            {
                                // +2 skips the leading src/dst key columns of
                                // the rel table's physical row layout.
                                let col_idx = prop_idx + 2;
                                storage.update_cell(rel_table_id, row_idx, col_idx, value)?;
                            }
                        }
                        stats.edges_updated += 1;
                    } else {
                        let values = build_edge_row(rel_entry, &src_pk, &dst_pk, props);
                        storage.insert_row(rel_table_id, &values)?;
                        stats.edges_created += 1;
                    }
                }

                GraphDelta::DeleteNode { key } => {
                    let entry = catalog
                        .find_by_name(key.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", key.label))
                        })?;
                    let table_id = entry.table_id;
                    let pk_col_idx = entry.primary_key_idx;
                    let pk_type = &entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    // Deleting a missing node is a no-op, not an error.
                    if let Some(row_idx) =
                        find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?
                    {
                        storage.delete_row(table_id, row_idx)?;
                        stats.nodes_deleted += 1;
                    }
                }

                GraphDelta::DeleteEdge { src, rel_type, dst } => {
                    let rel_entry = catalog
                        .find_by_name(rel_type.as_str())
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                        })?;
                    let rel_table_id = rel_entry.table_id;

                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk = parse_primary_key(
                        src.primary_key.as_str(),
                        &src_node.properties[src_node.primary_key_idx].data_type,
                    )?;
                    let dst_pk = parse_primary_key(
                        dst.primary_key.as_str(),
                        &dst_node.properties[dst_node.primary_key_idx].data_type,
                    )?;

                    // Deleting a missing edge is likewise a silent no-op.
                    if let Some(row_idx) = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?
                    {
                        storage.delete_row(rel_table_id, row_idx)?;
                        stats.edges_deleted += 1;
                    }
                }
            }
        }

        // Release locks before committing so the commit path can take
        // whatever it needs without deadlocking against this thread.
        drop(storage);
        drop(catalog);

        self.txn_mgr
            .commit(&mut txn, &self.wal, |_, _| {})
            .map_err(|e| KyuError::Transaction(e.to_string()))?;
        let _ = self.checkpointer.try_checkpoint();

        stats.elapsed_micros = start.elapsed().as_micros() as u64;
        Ok(stats)
    }
885
886 fn try_call_extension(&self, cypher: &str) -> KyuResult<Option<QueryResult>> {
891 let trimmed = cypher.trim();
892 if !trimmed.to_uppercase().starts_with("CALL ") {
893 return Ok(None);
894 }
895
896 let rest = trimmed[5..].trim();
898 let dot_pos = rest.find('.').ok_or_else(|| {
899 KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
900 })?;
901 let ext_name = &rest[..dot_pos];
902 let after_dot = &rest[dot_pos + 1..];
903
904 let paren_pos = after_dot.find('(').ok_or_else(|| {
905 KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
906 })?;
907 let proc_name = &after_dot[..paren_pos];
908 let args_str = after_dot[paren_pos + 1..].trim_end_matches([')', ';']);
909
910 let args: Vec<String> = if args_str.trim().is_empty() {
911 Vec::new()
912 } else {
913 args_str
914 .split(',')
915 .map(|s| s.trim().trim_matches('\'').to_string())
916 .collect()
917 };
918
919 let ext = self
921 .extensions
922 .iter()
923 .find(|e| e.name() == ext_name)
924 .ok_or_else(|| KyuError::Binder(format!("unknown extension '{ext_name}'")))?;
925
926 let adjacency = if ext.needs_graph() {
928 self.build_graph_adjacency()
929 } else {
930 std::collections::HashMap::new()
931 };
932
933 let rows = ext
935 .execute(proc_name, &args, &adjacency)
936 .map_err(|e| KyuError::Runtime(format!("extension error: {e}")))?;
937
938 let proc_sig = ext
940 .procedures()
941 .into_iter()
942 .find(|p| p.name == proc_name)
943 .ok_or_else(|| {
944 KyuError::Binder(format!(
945 "unknown procedure '{proc_name}' in extension '{ext_name}'"
946 ))
947 })?;
948
949 let col_names: Vec<SmolStr> = proc_sig
950 .columns
951 .iter()
952 .map(|c| SmolStr::new(&c.name))
953 .collect();
954 let col_types: Vec<LogicalType> = proc_sig
955 .columns
956 .iter()
957 .map(|c| c.data_type.clone())
958 .collect();
959
960 let mut result = QueryResult::new(col_names, col_types);
961 for proc_row in rows {
962 result.push_row(proc_row);
963 }
964
965 Ok(Some(result))
966 }
967
    /// Build an in-memory adjacency map (src id -> [(dst id, weight)]) over
    /// every relationship table, for extensions that need graph topology.
    ///
    /// Edge weights are fixed at 1.0. A columnar fast path is taken when a
    /// chunk has an identity selection and flat src/dst vectors; otherwise it
    /// falls back to per-row value extraction, skipping any endpoint pair
    /// that is not `(Int64, Int64)`.
    fn build_graph_adjacency(&self) -> std::collections::HashMap<i64, Vec<(i64, f64)>> {
        use kyu_executor::value_vector::ValueVector;

        let mut adjacency: std::collections::HashMap<i64, Vec<(i64, f64)>> =
            std::collections::HashMap::new();
        let catalog = self.catalog.read();
        let storage = self.storage.read().unwrap();

        for rel in catalog.rel_tables() {
            let table_id = rel.table_id;
            for chunk in storage.scan_table(table_id) {
                let n = chunk.num_rows();
                if n == 0 {
                    continue;
                }

                // Rel rows keep the src key in column 0, dst key in column 1.
                let src_col = chunk.column(0);
                let dst_col = chunk.column(1);

                // Fast path: read raw i64 slices with null-mask checks and no
                // per-value boxing.
                if chunk.selection().is_identity()
                    && let (ValueVector::Flat(src_flat), ValueVector::Flat(dst_flat)) =
                        (src_col, dst_col)
                {
                    let src_slice = src_flat.data_as_i64_slice();
                    let dst_slice = dst_flat.data_as_i64_slice();
                    let src_nm = src_flat.null_mask();
                    let dst_nm = dst_flat.null_mask();
                    for i in 0..n {
                        if !src_nm.is_null(i as u64) && !dst_nm.is_null(i as u64) {
                            adjacency
                                .entry(src_slice[i])
                                .or_default()
                                .push((dst_slice[i], 1.0));
                        }
                    }
                    continue;
                }

                // Slow path: materialize each endpoint as a TypedValue.
                for row_idx in 0..n {
                    let src = chunk.get_value(row_idx, 0);
                    let dst = chunk.get_value(row_idx, 1);
                    if let (TypedValue::Int64(s), TypedValue::Int64(d)) = (src, dst) {
                        adjacency.entry(s).or_default().push((d, 1.0));
                    }
                }
            }
        }

        adjacency
    }
1024
1025 fn exec_create_node_table(
1028 &self,
1029 create: &kyu_binder::BoundCreateNodeTable,
1030 ) -> KyuResult<QueryResult> {
1031 let mut catalog = self.catalog.begin_write();
1032
1033 let table_id = catalog.alloc_table_id();
1034 let properties: Vec<Property> = create
1035 .columns
1036 .iter()
1037 .map(|col| {
1038 let prop_id = catalog.alloc_property_id();
1039 Property::new(
1040 prop_id,
1041 col.name.clone(),
1042 col.data_type.clone(),
1043 col.property_id.0 as usize == create.primary_key_idx,
1044 )
1045 })
1046 .collect();
1047
1048 let schema: Vec<LogicalType> = create.columns.iter().map(|c| c.data_type.clone()).collect();
1049
1050 catalog.add_node_table(NodeTableEntry {
1051 table_id,
1052 name: create.name.clone(),
1053 properties,
1054 primary_key_idx: create.primary_key_idx,
1055 num_rows: 0,
1056 comment: None,
1057 })?;
1058
1059 self.catalog.commit_write(catalog);
1060
1061 self.storage.write().unwrap().create_table(table_id, schema);
1063
1064 Ok(QueryResult::new(vec![], vec![]))
1065 }
1066
1067 fn exec_create_rel_table(
1068 &self,
1069 create: &kyu_binder::BoundCreateRelTable,
1070 ) -> KyuResult<QueryResult> {
1071 let mut catalog = self.catalog.begin_write();
1072
1073 let table_id = catalog.alloc_table_id();
1074 let properties: Vec<Property> = create
1075 .columns
1076 .iter()
1077 .map(|col| {
1078 let prop_id = catalog.alloc_property_id();
1079 Property::new(prop_id, col.name.clone(), col.data_type.clone(), false)
1080 })
1081 .collect();
1082
1083 let from_key_type = catalog
1085 .find_by_id(create.from_table_id)
1086 .and_then(|e| e.as_node_table())
1087 .map(|n| n.primary_key_property().data_type.clone())
1088 .unwrap_or(LogicalType::Int64);
1089 let to_key_type = catalog
1090 .find_by_id(create.to_table_id)
1091 .and_then(|e| e.as_node_table())
1092 .map(|n| n.primary_key_property().data_type.clone())
1093 .unwrap_or(LogicalType::Int64);
1094 let mut schema = vec![from_key_type, to_key_type];
1095 schema.extend(create.columns.iter().map(|c| c.data_type.clone()));
1096
1097 catalog.add_rel_table(RelTableEntry {
1098 table_id,
1099 name: create.name.clone(),
1100 from_table_id: create.from_table_id,
1101 to_table_id: create.to_table_id,
1102 properties,
1103 num_rows: 0,
1104 comment: None,
1105 })?;
1106
1107 self.catalog.commit_write(catalog);
1108
1109 self.storage.write().unwrap().create_table(table_id, schema);
1110
1111 Ok(QueryResult::new(vec![], vec![]))
1112 }
1113
1114 fn exec_drop(&self, drop: &kyu_binder::BoundDrop) -> KyuResult<QueryResult> {
1115 let mut catalog = self.catalog.begin_write();
1116 catalog
1117 .remove_by_id(drop.table_id)
1118 .ok_or_else(|| KyuError::Catalog(format!("table '{}' not found", drop.name)))?;
1119 self.catalog.commit_write(catalog);
1120
1121 self.storage.write().unwrap().drop_table(drop.table_id);
1122
1123 Ok(QueryResult::new(vec![], vec![]))
1124 }
1125
1126 fn exec_copy_from(&self, copy: &kyu_binder::BoundCopyFrom) -> KyuResult<QueryResult> {
1129 let path_val = evaluate_constant(©.source)?;
1131 let path = match &path_val {
1132 TypedValue::String(s) => s.as_str().to_string(),
1133 _ => {
1134 return Err(KyuError::Copy(
1135 "COPY FROM source must be a string path".into(),
1136 ));
1137 }
1138 };
1139
1140 let catalog_snapshot = self.catalog.read();
1142 let entry = catalog_snapshot
1143 .find_by_id(copy.table_id)
1144 .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", copy.table_id)))?;
1145 let properties = entry.properties();
1146 let schema: Vec<LogicalType> = properties.iter().map(|p| p.data_type.clone()).collect();
1147 drop(catalog_snapshot);
1148
1149 let reader = kyu_copy::open_reader(&path, &schema)?;
1151
1152 let mut storage = self.storage.write().unwrap();
1153 for row_result in reader {
1154 let values = row_result?;
1155 storage.insert_row(copy.table_id, &values)?;
1156 }
1157
1158 Ok(QueryResult::new(vec![], vec![]))
1159 }
1160
    /// Apply `LOAD FROM '<path>'` for RDF input: parse triples, infer a
    /// node/rel table schema, create the tables, and bulk-insert the data.
    ///
    /// Every node table gets a leading STRING `uri` primary-key column;
    /// inferred rel tables carry no extra properties (two STRING key columns).
    fn exec_load_from(&self, load: &kyu_binder::BoundLoadFrom) -> KyuResult<QueryResult> {
        let path_val = evaluate_constant(&load.source)?;
        let path = match &path_val {
            TypedValue::String(s) => s.as_str().to_string(),
            _ => {
                return Err(KyuError::Copy(
                    "LOAD FROM source must be a string path".into(),
                ));
            }
        };

        let triples = ext_rdf::parse_triples(&path)?;
        let schema = ext_rdf::infer_schema(&triples)?;

        // Inferred table name -> allocated table id, used to wire rel tables.
        let mut node_table_ids: HashMap<String, TableId> = HashMap::new();

        for node_table in &schema.node_tables {
            // One short catalog write per table.
            let mut catalog = self.catalog.begin_write();

            let table_id = catalog.alloc_table_id();

            // Column 0 is always the `uri` primary key.
            let mut storage_schema = vec![LogicalType::String];
            let uri_prop_id = catalog.alloc_property_id();
            let mut properties = vec![Property::new(
                uri_prop_id,
                SmolStr::new("uri"),
                LogicalType::String,
                true,
            )];

            for (prop_name, logical_type) in &node_table.properties {
                let prop_id = catalog.alloc_property_id();
                properties.push(Property::new(
                    prop_id,
                    SmolStr::new(prop_name),
                    logical_type.clone(),
                    false,
                ));
                storage_schema.push(logical_type.clone());
            }

            catalog.add_node_table(NodeTableEntry {
                table_id,
                name: SmolStr::new(&node_table.name),
                properties,
                primary_key_idx: 0,
                num_rows: 0,
                comment: None,
            })?;

            self.catalog.commit_write(catalog);
            self.storage
                .write()
                .unwrap()
                .create_table(table_id, storage_schema);

            node_table_ids.insert(node_table.name.clone(), table_id);
        }

        // Insert node rows only after every node table exists.
        {
            let mut storage = self.storage.write().unwrap();
            for node_table in &schema.node_tables {
                let table_id = node_table_ids[&node_table.name];
                for (uri, prop_values) in &node_table.rows {
                    let mut row = vec![TypedValue::String(SmolStr::new(uri))];
                    row.extend_from_slice(prop_values);
                    storage.insert_row(table_id, &row)?;
                }
            }
        }

        for rel_table in &schema.rel_tables {
            // Skip rel tables whose endpoints were not materialized above.
            let from_id = match node_table_ids.get(&rel_table.from_table) {
                Some(id) => *id,
                None => continue,
            };
            let to_id = match node_table_ids.get(&rel_table.to_table) {
                Some(id) => *id,
                None => continue,
            };

            let mut catalog = self.catalog.begin_write();
            let table_id = catalog.alloc_table_id();

            catalog.add_rel_table(RelTableEntry {
                table_id,
                name: SmolStr::new(&rel_table.name),
                from_table_id: from_id,
                to_table_id: to_id,
                properties: vec![],
                num_rows: 0,
                comment: None,
            })?;

            self.catalog.commit_write(catalog);

            // Two STRING columns: src uri, dst uri. This write guard is a
            // temporary dropped at the end of the statement, so re-locking
            // just below cannot deadlock.
            self.storage
                .write()
                .unwrap()
                .create_table(table_id, vec![LogicalType::String, LogicalType::String]);

            let mut storage = self.storage.write().unwrap();
            for (src_uri, dst_uri) in &rel_table.edges {
                let row = vec![
                    TypedValue::String(SmolStr::new(src_uri)),
                    TypedValue::String(SmolStr::new(dst_uri)),
                ];
                storage.insert_row(table_id, &row)?;
            }
        }

        Ok(QueryResult::new(vec![], vec![]))
    }
1282}
1283
1284fn parse_primary_key(value: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
1288 match ty {
1289 LogicalType::Int8 => value
1290 .parse::<i8>()
1291 .map(TypedValue::Int8)
1292 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT8: {e}"))),
1293 LogicalType::Int16 => value
1294 .parse::<i16>()
1295 .map(TypedValue::Int16)
1296 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT16: {e}"))),
1297 LogicalType::Int32 => value
1298 .parse::<i32>()
1299 .map(TypedValue::Int32)
1300 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT32: {e}"))),
1301 LogicalType::Int64 | LogicalType::Serial => value
1302 .parse::<i64>()
1303 .map(TypedValue::Int64)
1304 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT64: {e}"))),
1305 LogicalType::String => Ok(TypedValue::String(SmolStr::new(value))),
1306 _ => Err(KyuError::Delta(format!(
1307 "unsupported primary key type '{}' for delta upsert",
1308 ty.type_name()
1309 ))),
1310 }
1311}
1312
1313fn find_row_by_pk(
1315 storage: &crate::storage::NodeGroupStorage,
1316 table_id: TableId,
1317 pk_col_idx: usize,
1318 pk_value: &TypedValue,
1319) -> KyuResult<Option<u64>> {
1320 let rows = storage.scan_rows(table_id)?;
1321 for (row_idx, row_values) in &rows {
1322 if row_values.get(pk_col_idx) == Some(pk_value) {
1323 return Ok(Some(*row_idx));
1324 }
1325 }
1326 Ok(None)
1327}
1328
1329fn find_edge_row(
1332 storage: &crate::storage::NodeGroupStorage,
1333 rel_table_id: TableId,
1334 src_pk: &TypedValue,
1335 dst_pk: &TypedValue,
1336) -> KyuResult<Option<u64>> {
1337 let rows = storage.scan_rows(rel_table_id)?;
1338 for (row_idx, row_values) in &rows {
1339 if row_values.first() == Some(src_pk) && row_values.get(1) == Some(dst_pk) {
1340 return Ok(Some(*row_idx));
1341 }
1342 }
1343 Ok(None)
1344}
1345
1346fn find_property_index(entry: &NodeTableEntry, name: &str) -> Option<usize> {
1348 let lower = name.to_lowercase();
1349 entry
1350 .properties
1351 .iter()
1352 .position(|p| p.name.to_lowercase() == lower)
1353}
1354
1355fn find_rel_property_index(entry: &RelTableEntry, name: &str) -> Option<usize> {
1357 let lower = name.to_lowercase();
1358 entry
1359 .properties
1360 .iter()
1361 .position(|p| p.name.to_lowercase() == lower)
1362}
1363
1364fn build_node_row(
1366 entry: &NodeTableEntry,
1367 pk_value: &TypedValue,
1368 props: &hashbrown::HashMap<SmolStr, TypedValue>,
1369) -> Vec<TypedValue> {
1370 entry
1371 .properties
1372 .iter()
1373 .enumerate()
1374 .map(|(i, prop)| {
1375 if i == entry.primary_key_idx {
1376 pk_value.clone()
1377 } else if let Some(val) = props.get(&prop.name) {
1378 val.clone()
1379 } else {
1380 TypedValue::Null
1381 }
1382 })
1383 .collect()
1384}
1385
1386fn build_edge_row(
1388 entry: &RelTableEntry,
1389 src_pk: &TypedValue,
1390 dst_pk: &TypedValue,
1391 props: &hashbrown::HashMap<SmolStr, TypedValue>,
1392) -> Vec<TypedValue> {
1393 let mut row = vec![src_pk.clone(), dst_pk.clone()];
1394 for prop in &entry.properties {
1395 if let Some(val) = props.get(&prop.name) {
1396 row.push(val.clone());
1397 } else {
1398 row.push(TypedValue::Null);
1399 }
1400 }
1401 row
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406 use crate::database::Database;
1407 use kyu_types::TypedValue;
1408 use smol_str::SmolStr;
1409
    // Smoke test: a fresh in-memory database starts with an empty catalog.
    #[test]
    fn create_database_and_connect() {
        let db = Database::in_memory();
        let _conn = db.connect();
        assert_eq!(db.catalog().num_tables(), 0);
    }

    // RETURN of a literal produces a single row carrying the aliased value.
    #[test]
    fn return_literal() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN 1 AS x").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0), vec![TypedValue::Int64(1)]);
    }

    // Constant arithmetic is evaluated inside a RETURN projection.
    #[test]
    fn return_arithmetic() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN 2 + 3 AS sum").unwrap();
        assert_eq!(result.row(0), vec![TypedValue::Int64(5)]);
    }

    // CREATE NODE TABLE registers both the catalog entry and backing storage.
    #[test]
    fn create_node_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        assert_eq!(db.catalog().num_tables(), 1);
        let snapshot = db.catalog().read();
        let entry = snapshot.find_by_name("Person").unwrap();
        assert!(entry.is_node_table());
        assert_eq!(entry.properties().len(), 2);

        assert!(db.storage().read().unwrap().has_table(entry.table_id()));
    }

    // MATCH over a freshly created (empty) table yields zero rows.
    #[test]
    fn create_and_query_empty_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // CREATE REL TABLE registers a rel-table catalog entry.
    #[test]
    fn create_rel_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
            .unwrap();

        assert_eq!(db.catalog().num_tables(), 2);
        let snapshot = db.catalog().read();
        let entry = snapshot.find_by_name("KNOWS").unwrap();
        assert!(entry.is_rel_table());
    }

    // DROP TABLE removes the catalog entry.
    #[test]
    fn drop_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        assert_eq!(db.catalog().num_tables(), 1);

        conn.query("DROP TABLE Person").unwrap();
        assert_eq!(db.catalog().num_tables(), 0);
    }

    // Creating a table whose name already exists is rejected.
    #[test]
    fn create_duplicate_error() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        let result = conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    // Syntactically invalid Cypher surfaces as an Err, not a panic.
    #[test]
    fn parse_error_propagated() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("THIS IS NOT VALID CYPHER !!!");
        assert!(result.is_err());
    }
1504
    // DDL issued through one connection is immediately visible to another
    // connection of the same database.
    #[test]
    fn multiple_connections_share_state() {
        let db = Database::in_memory();
        let conn1 = db.connect();
        let conn2 = db.connect();

        conn1
            .query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();

        assert_eq!(db.catalog().num_tables(), 1);
        let result = conn2.query("MATCH (p:Person) RETURN p.id").unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // Cypher CREATE inserts a row that a subsequent MATCH can read back.
    #[test]
    fn create_node_via_cypher() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // A single CREATE clause may contain several node patterns.
    #[test]
    fn create_multiple_nodes() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        conn.query("CREATE (a:Person {id: 1, name: 'Alice'}), (b:Person {id: 2, name: 'Bob'})")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    // Properties omitted from a CREATE pattern come back as Null.
    #[test]
    fn create_node_partial_properties() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        conn.query("CREATE (n:Person {id: 1})").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(0)[1], TypedValue::Null);
    }

    // CREATE ... RETURN projects the freshly created node's properties.
    #[test]
    fn create_and_return() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        let result = conn
            .query("CREATE (n:Person {id: 1, name: 'Alice'}) RETURN n.name, n.id")
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
        assert_eq!(result.row(0)[1], TypedValue::Int64(1));
    }

    // MATCH ... SET overwrites a property on the matched row.
    #[test]
    fn match_set_property() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 25})")
            .unwrap();

        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' SET p.age = 31")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
    }
1597
1598 #[test]
1599 fn match_set_with_where() {
1600 let db = Database::in_memory();
1601 let conn = db.connect();
1602 conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1603 .unwrap();
1604 conn.query("CREATE (a:Person {id: 1, name: 'Alice', age: 25})")
1605 .unwrap();
1606 conn.query("CREATE (b:Person {id: 2, name: 'Bob', age: 30})")
1607 .unwrap();
1608
1609 conn.query("MATCH (p:Person) WHERE p.id = 1 SET p.age = 26")
1611 .unwrap();
1612
1613 let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
1614 assert_eq!(result.num_rows(), 2);
1615 let alice_row = result
1617 .iter_rows()
1618 .find(|r| r[0] == TypedValue::String(SmolStr::new("Alice")))
1619 .unwrap();
1620 let bob_row = result
1621 .iter_rows()
1622 .find(|r| r[0] == TypedValue::String(SmolStr::new("Bob")))
1623 .unwrap();
1624 assert_eq!(alice_row[1], TypedValue::Int64(26)); assert_eq!(bob_row[1], TypedValue::Int64(30)); }
1627
    // SET without a WHERE clause touches every row in the table.
    #[test]
    fn match_set_all_rows() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, active INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1, active: 0})").unwrap();
        conn.query("CREATE (b:Person {id: 2, active: 0})").unwrap();

        conn.query("MATCH (p:Person) SET p.active = 1").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.active").unwrap();
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(1)[0], TypedValue::Int64(1));
    }

    // DELETE with a WHERE clause removes only the matching row.
    #[test]
    fn match_delete() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1, name: 'Alice'})")
            .unwrap();
        conn.query("CREATE (b:Person {id: 2, name: 'Bob'})")
            .unwrap();

        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' DELETE p")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
    }

    // Unfiltered DELETE empties the table.
    #[test]
    fn match_delete_all() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1})").unwrap();
        conn.query("CREATE (b:Person {id: 2})").unwrap();

        conn.query("MATCH (p:Person) DELETE p").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id").unwrap();
        assert_eq!(result.num_rows(), 0);
    }
1679
    // A row inserted directly via the storage layer is visible to MATCH.
    #[test]
    fn storage_roundtrip_insert_scan() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        let snapshot = db.catalog().read();
        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
        drop(snapshot);

        db.storage()
            .write()
            .unwrap()
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(1),
                    TypedValue::String(SmolStr::new("Alice")),
                ],
            )
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Multiple direct storage inserts all come back through a MATCH scan.
    #[test]
    fn storage_roundtrip_multiple_rows() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();

        let snapshot = db.catalog().read();
        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
        drop(snapshot);

        // Hold the write lock across both inserts, then release it so the
        // query below can take the read lock.
        let mut storage = db.storage().write().unwrap();
        storage
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(1),
                    TypedValue::String(SmolStr::new("Alice")),
                    TypedValue::Int64(25),
                ],
            )
            .unwrap();
        storage
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(2),
                    TypedValue::String(SmolStr::new("Bob")),
                    TypedValue::Int64(30),
                ],
            )
            .unwrap();
        drop(storage);

        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
        assert_eq!(result.num_rows(), 2);
    }
1748
    // COPY ... FROM ingests a CSV file with a header row.
    #[test]
    fn copy_from_csv() {
        use std::io::Write;

        let dir = std::env::temp_dir().join("kyu_test_csv");
        let _ = std::fs::create_dir_all(&dir);
        let csv_path = dir.join("persons.csv");
        {
            let mut f = std::fs::File::create(&csv_path).unwrap();
            writeln!(f, "id,name").unwrap();
            writeln!(f, "1,Alice").unwrap();
            writeln!(f, "2,Bob").unwrap();
            writeln!(f, "3,Charlie").unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query(&format!("COPY Person FROM '{}'", csv_path.display()))
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 3);

        let _ = std::fs::remove_file(&csv_path);
    }

    // CSV COPY coerces columns to the declared INT64/STRING/DOUBLE/BOOL types.
    #[test]
    fn copy_from_csv_multiple_types() {
        use std::io::Write;

        let dir = std::env::temp_dir().join("kyu_test_csv");
        let _ = std::fs::create_dir_all(&dir);
        let csv_path = dir.join("typed.csv");
        {
            let mut f = std::fs::File::create(&csv_path).unwrap();
            writeln!(f, "id,name,score,active").unwrap();
            writeln!(f, "1,Alice,95.5,true").unwrap();
            writeln!(f, "2,Bob,87.3,false").unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query(
            "CREATE NODE TABLE Student (id INT64, name STRING, score DOUBLE, active BOOL, PRIMARY KEY (id))",
        )
        .unwrap();
        conn.query(&format!("COPY Student FROM '{}'", csv_path.display()))
            .unwrap();

        let result = conn
            .query("MATCH (s:Student) RETURN s.name, s.score, s.active")
            .unwrap();
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
        assert_eq!(result.row(0)[1], TypedValue::Double(95.5));
        assert_eq!(result.row(0)[2], TypedValue::Bool(true));

        let _ = std::fs::remove_file(&csv_path);
    }

    // COPY ... FROM also accepts Parquet files (written here via Arrow).
    #[test]
    fn copy_from_parquet() {
        use arrow::array::{Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use parquet::arrow::ArrowWriter;
        use std::sync::Arc;

        let dir = std::env::temp_dir().join("kyu_test_parquet_copy");
        let _ = std::fs::create_dir_all(&dir);
        let parquet_path = dir.join("persons.parquet");
        {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
            ]));
            let ids = Int64Array::from(vec![1, 2, 3]);
            let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
            let batch =
                RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids), Arc::new(names)])
                    .unwrap();
            let file = std::fs::File::create(&parquet_path).unwrap();
            let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query(&format!("COPY Person FROM '{}'", parquet_path.display()))
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(0)[1], TypedValue::String(SmolStr::new("Alice")));

        let _ = std::fs::remove_dir_all(&dir);
    }
1853
    // CALL dispatches to a registered extension: pageRank over a 3-cycle.
    #[test]
    fn call_extension_pagerank() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 3})").unwrap();

        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            // Wire up the 1 -> 2 -> 3 -> 1 cycle directly through storage.
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(3), TypedValue::Int64(1)])
                .unwrap();
        }

        let result = conn
            .query("CALL algo.pageRank(0.85, 20, 0.000001)")
            .unwrap();
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.column_names.len(), 2);
        for row in result.iter_rows() {
            if let TypedValue::Double(rank) = &row[1] {
                assert!(*rank > 0.0);
            }
        }
    }

    // Weakly-connected components: two disjoint pairs yield rows for all four nodes.
    #[test]
    fn call_extension_wcc() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 10})").unwrap();
        conn.query("CREATE (n:Person {id: 11})").unwrap();

        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(
                    rel_table_id,
                    &[TypedValue::Int64(10), TypedValue::Int64(11)],
                )
                .unwrap();
        }

        let result = conn.query("CALL algo.wcc()").unwrap();
        assert_eq!(result.num_rows(), 4);
    }

    // Betweenness centrality over a 3-node path returns one row per node.
    #[test]
    fn call_extension_betweenness() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 3})").unwrap();

        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
                .unwrap();
        }

        let result = conn.query("CALL algo.betweenness()").unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    // CALL on an unregistered procedure is an error, not a panic.
    #[test]
    fn call_unknown_extension() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("CALL nonexistent.proc()");
        assert!(result.is_err());
    }
1972
    // Catalog and data written in one Database lifetime are recovered when
    // the same directory is reopened.
    #[test]
    fn persistence_survives_restart() {
        let dir = std::env::temp_dir().join("kyu_test_persist_e2e");
        let _ = std::fs::remove_dir_all(&dir);

        {
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();
            conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
                .unwrap();
            conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
                .unwrap();
            conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
                .unwrap();
        }

        {
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();

            assert_eq!(db.catalog().num_tables(), 1);
            let snapshot = db.catalog().read();
            assert!(snapshot.find_by_name("Person").is_some());
            drop(snapshot);

            let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
            assert_eq!(result.num_rows(), 2);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    // DDL-only changes are replayed from the WAL on reopen.
    #[test]
    fn persistence_ddl_recovery_via_wal() {
        let dir = std::env::temp_dir().join("kyu_test_persist_ddl");
        let _ = std::fs::remove_dir_all(&dir);

        {
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();
            conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
                .unwrap();
            conn.query("CREATE NODE TABLE Organization (id INT64, name STRING, PRIMARY KEY (id))")
                .unwrap();
        }

        {
            let db = Database::open(&dir).unwrap();
            assert_eq!(db.catalog().num_tables(), 2);
            let snapshot = db.catalog().read();
            assert!(snapshot.find_by_name("Person").is_some());
            assert!(snapshot.find_by_name("Organization").is_some());
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    // Opening and closing an untouched database leaves it valid and empty.
    #[test]
    fn persistence_empty_database() {
        let dir = std::env::temp_dir().join("kyu_test_persist_empty_db");
        let _ = std::fs::remove_dir_all(&dir);

        {
            let _db = Database::open(&dir).unwrap();
        }

        {
            let db = Database::open(&dir).unwrap();
            assert_eq!(db.catalog().num_tables(), 0);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }
2055
    // A $param in RETURN is substituted from the supplied parameter map.
    #[test]
    fn return_param() {
        let db = Database::in_memory();
        let conn = db.connect();
        let mut params = std::collections::HashMap::new();
        params.insert("x".to_string(), TypedValue::Int64(42));
        let result = conn.query_with_params("RETURN $x AS val", params).unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0), vec![TypedValue::Int64(42)]);
    }

    // Parameters participate in WHERE predicates.
    #[test]
    fn parameterized_where() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 30})")
            .unwrap();
        conn.query("CREATE (n:Person {id: 2, name: 'Bob', age: 20})")
            .unwrap();

        let mut params = std::collections::HashMap::new();
        params.insert("min_age".to_string(), TypedValue::Int64(25));
        let result = conn
            .query_with_params(
                "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
                params,
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Parameters may supply the property values in a CREATE pattern.
    #[test]
    fn parameterized_create() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        let mut params = std::collections::HashMap::new();
        params.insert("id".to_string(), TypedValue::Int64(1));
        params.insert(
            "name".to_string(),
            TypedValue::String(SmolStr::new("Alice")),
        );
        conn.query_with_params("CREATE (n:Person {id: $id, name: $name})", params)
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Parameters may supply the right-hand side of a SET assignment.
    #[test]
    fn parameterized_set() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, age: 25})").unwrap();

        let mut params = std::collections::HashMap::new();
        params.insert("new_age".to_string(), TypedValue::Int64(31));
        conn.query_with_params(
            "MATCH (p:Person) WHERE p.id = 1 SET p.age = $new_age",
            params,
        )
        .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
    }

    // Referencing a parameter that was never supplied is a bind error.
    #[test]
    fn unresolved_param_error() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN $missing AS val");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("unresolved parameter")
        );
    }

    // env('KEY') resolves against the env map passed to execute().
    #[test]
    fn env_resolved() {
        let db = Database::in_memory();
        let conn = db.connect();
        let mut env = std::collections::HashMap::new();
        env.insert(
            "GREETING".to_string(),
            TypedValue::String(SmolStr::new("hello")),
        );
        let result = conn
            .execute(
                "RETURN env('GREETING') AS val",
                std::collections::HashMap::new(),
                env,
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("hello")));
    }

    // env() on an absent key yields Null rather than an error.
    #[test]
    fn env_missing_returns_null() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn
            .execute(
                "RETURN env('MISSING') AS val",
                std::collections::HashMap::new(),
                std::collections::HashMap::new(),
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Null);
    }
2181
    // Applying a delta batch with unseen primary keys creates new nodes.
    #[test]
    fn delta_upsert_new_nodes() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
            .unwrap();

        let batch = DeltaBatchBuilder::new("file:main.rs", 1)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(42))],
            )
            .upsert_node(
                "Function",
                "helper",
                vec![],
                [("lines", TypedValue::Int64(10))],
            )
            .build();

        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.nodes_created, 2);
        assert_eq!(stats.nodes_updated, 0);

        let result = conn
            .query("MATCH (f:Function) RETURN f.name, f.lines")
            .unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    // Re-upserting an existing primary key updates in place instead of
    // inserting a duplicate.
    #[test]
    fn delta_upsert_existing_node_merges() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
            .unwrap();

        let batch1 = DeltaBatchBuilder::new("file:main.rs", 1)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(42))],
            )
            .build();
        conn.apply_delta(batch1).unwrap();

        let batch2 = DeltaBatchBuilder::new("file:main.rs", 2)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(50))],
            )
            .build();
        let stats = conn.apply_delta(batch2).unwrap();
        assert_eq!(stats.nodes_created, 0);
        assert_eq!(stats.nodes_updated, 1);

        let result = conn
            .query("MATCH (f:Function) WHERE f.name = 'main' RETURN f.lines")
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(50));
    }

    // A delete entry in a delta batch removes the node addressed by its PK.
    #[test]
    fn delta_delete_node() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
            .unwrap();
        conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
            .unwrap();

        let batch = DeltaBatchBuilder::new("cleanup", 1)
            .delete_node("Person", "1")
            .build();
        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.nodes_deleted, 1);

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
    }
2281
2282 #[test]
2283 fn delta_upsert_and_delete_edges() {
2284 use kyu_delta::DeltaBatchBuilder;
2285
2286 let db = Database::in_memory();
2287 let conn = db.connect();
2288 conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
2289 .unwrap();
2290 conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
2291 .unwrap();
2292 conn.query("CREATE (n:Person {id: 1})").unwrap();
2293 conn.query("CREATE (n:Person {id: 2})").unwrap();
2294
2295 let batch = DeltaBatchBuilder::new("social", 1)
2297 .upsert_edge(
2298 "Person",
2299 "1",
2300 "KNOWS",
2301 "Person",
2302 "2",
2303 [("since", TypedValue::Int64(2024))],
2304 )
2305 .build();
2306 let stats = conn.apply_delta(batch).unwrap();
2307 assert_eq!(stats.edges_created, 1);
2308
2309 let storage = db.storage().read().unwrap();
2311 let catalog = db.catalog().read();
2312 let rel_table_id = catalog.find_by_name("KNOWS").unwrap().table_id();
2313 let rows = storage.scan_rows(rel_table_id).unwrap();
2314 assert_eq!(rows.len(), 1);
2315 assert_eq!(rows[0].1[0], TypedValue::Int64(1)); assert_eq!(rows[0].1[1], TypedValue::Int64(2)); assert_eq!(rows[0].1[2], TypedValue::Int64(2024)); drop(storage);
2319 drop(catalog);
2320
2321 let batch2 = DeltaBatchBuilder::new("social", 2)
2323 .upsert_edge(
2324 "Person",
2325 "1",
2326 "KNOWS",
2327 "Person",
2328 "2",
2329 [("since", TypedValue::Int64(2025))],
2330 )
2331 .build();
2332 let stats2 = conn.apply_delta(batch2).unwrap();
2333 assert_eq!(stats2.edges_updated, 1);
2334
2335 let storage = db.storage().read().unwrap();
2336 let rows = storage.scan_rows(rel_table_id).unwrap();
2337 assert_eq!(rows[0].1[2], TypedValue::Int64(2025));
2338 drop(storage);
2339
2340 let batch3 = DeltaBatchBuilder::new("social", 3)
2342 .delete_edge("Person", "1", "KNOWS", "Person", "2")
2343 .build();
2344 let stats3 = conn.apply_delta(batch3).unwrap();
2345 assert_eq!(stats3.edges_deleted, 1);
2346
2347 let storage = db.storage().read().unwrap();
2348 let rows = storage.scan_rows(rel_table_id).unwrap();
2349 assert_eq!(rows.len(), 0);
2350 }
2351
2352 #[test]
2353 fn delta_idempotent_replay() {
2354 use kyu_delta::DeltaBatchBuilder;
2355
2356 let db = Database::in_memory();
2357 let conn = db.connect();
2358 conn.query("CREATE NODE TABLE File (path STRING, hash STRING, PRIMARY KEY (path))")
2359 .unwrap();
2360
2361 let batch = DeltaBatchBuilder::new("watcher", 100)
2362 .upsert_node(
2363 "File",
2364 "src/main.rs",
2365 vec![],
2366 [("hash", TypedValue::String(SmolStr::new("abc123")))],
2367 )
2368 .build();
2369
2370 let stats1 = conn.apply_delta(batch.clone()).unwrap();
2372 assert_eq!(stats1.nodes_created, 1);
2373
2374 let stats2 = conn.apply_delta(batch).unwrap();
2376 assert_eq!(stats2.nodes_created, 0);
2377 assert_eq!(stats2.nodes_updated, 1);
2378
2379 let result = conn.query("MATCH (f:File) RETURN f.path").unwrap();
2381 assert_eq!(result.num_rows(), 1);
2382 }
2383
2384 #[test]
2385 fn delta_stats_correct() {
2386 use kyu_delta::DeltaBatchBuilder;
2387
2388 let db = Database::in_memory();
2389 let conn = db.connect();
2390 conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2391 .unwrap();
2392 conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
2393 .unwrap();
2394
2395 let batch = DeltaBatchBuilder::new("test", 1)
2396 .upsert_node(
2397 "Person",
2398 "1",
2399 vec![],
2400 [("name", TypedValue::String(SmolStr::new("Alice")))],
2401 )
2402 .upsert_node(
2403 "Person",
2404 "2",
2405 vec![],
2406 [("name", TypedValue::String(SmolStr::new("Bob")))],
2407 )
2408 .upsert_edge(
2409 "Person",
2410 "1",
2411 "KNOWS",
2412 "Person",
2413 "2",
2414 Vec::<(&str, TypedValue)>::new(),
2415 )
2416 .build();
2417
2418 let stats = conn.apply_delta(batch).unwrap();
2419 assert_eq!(stats.nodes_created, 2);
2420 assert_eq!(stats.edges_created, 1);
2421 assert_eq!(stats.total_deltas, 3);
2422 assert!(stats.elapsed_micros > 0);
2423 }
2424}