1use std::sync::{Arc, RwLock};
4
5use std::collections::HashMap;
6
7use std::time::Instant;
8
9use kyu_binder::{
10 BindContext, Binder, BoundMatchClause, BoundNodePattern, BoundPatternElement, BoundQuery,
11 BoundReadingClause, BoundStatement, BoundUpdatingClause,
12};
13use kyu_catalog::{Catalog, NodeTableEntry, Property, RelTableEntry};
14use kyu_common::id::TableId;
15use kyu_common::{KyuError, KyuResult};
16use kyu_delta::{DeltaBatch, DeltaStats, GraphDelta};
17use kyu_executor::{ExecutionContext, QueryResult, Storage, execute};
18use kyu_expression::{FunctionRegistry, evaluate, evaluate_constant};
19use kyu_planner::{build_query_plan, optimize, resolve_properties};
20use kyu_transaction::{Checkpointer, TransactionManager, TransactionType, Wal};
21use kyu_types::{LogicalType, TypedValue};
22use smol_str::SmolStr;
23
24use crate::storage::NodeGroupStorage;
25
26pub struct Connection {
33 catalog: Arc<Catalog>,
34 storage: Arc<RwLock<NodeGroupStorage>>,
35 txn_mgr: Arc<TransactionManager>,
36 wal: Arc<Wal>,
37 checkpointer: Arc<Checkpointer>,
38 extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
39}
40
41impl Connection {
42 pub(crate) fn new(
43 catalog: Arc<Catalog>,
44 storage: Arc<RwLock<NodeGroupStorage>>,
45 txn_mgr: Arc<TransactionManager>,
46 wal: Arc<Wal>,
47 checkpointer: Arc<Checkpointer>,
48 extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
49 ) -> Self {
50 Self {
51 catalog,
52 storage,
53 txn_mgr,
54 wal,
55 checkpointer,
56 extensions,
57 }
58 }
59
60 pub fn query(&self, cypher: &str) -> KyuResult<QueryResult> {
66 self.query_internal(cypher, BindContext::empty())
67 }
68
69 pub fn query_with_params(
86 &self,
87 cypher: &str,
88 params: HashMap<String, TypedValue>,
89 ) -> KyuResult<QueryResult> {
90 let ctx = BindContext {
91 params: params
92 .into_iter()
93 .map(|(k, v)| (SmolStr::new(k), v))
94 .collect(),
95 env: HashMap::new(),
96 };
97 self.query_internal(cypher, ctx)
98 }
99
100 pub fn execute(
123 &self,
124 cypher: &str,
125 params: HashMap<String, TypedValue>,
126 env: HashMap<String, TypedValue>,
127 ) -> KyuResult<QueryResult> {
128 let ctx = BindContext {
129 params: params
130 .into_iter()
131 .map(|(k, v)| (SmolStr::new(k), v))
132 .collect(),
133 env: env.into_iter().map(|(k, v)| (SmolStr::new(k), v)).collect(),
134 };
135 self.query_internal(cypher, ctx)
136 }
137
138 fn query_internal(&self, cypher: &str, ctx: BindContext) -> KyuResult<QueryResult> {
139 if cypher.trim().eq_ignore_ascii_case("CHECKPOINT")
141 || cypher.trim().eq_ignore_ascii_case("CHECKPOINT;")
142 {
143 self.checkpointer
144 .checkpoint()
145 .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))?;
146 return Ok(QueryResult::new(vec![], vec![]));
147 }
148
149 if let Some(result) = self.try_call_extension(cypher)? {
151 return Ok(result);
152 }
153
154 let parse_result = kyu_parser::parse(cypher);
156 let stmt = parse_result
157 .ast
158 .ok_or_else(|| KyuError::Parser(format!("{:?}", parse_result.errors)))?;
159
160 let catalog_snapshot = self.catalog.read();
162 let mut binder =
163 Binder::new(catalog_snapshot, FunctionRegistry::with_builtins()).with_context(ctx);
164 let bound = binder.bind(&stmt)?;
165
166 let is_ddl = matches!(
168 &bound,
169 BoundStatement::CreateNodeTable(_)
170 | BoundStatement::CreateRelTable(_)
171 | BoundStatement::Drop(_)
172 );
173 let is_write = match &bound {
174 BoundStatement::Query(q) => self.is_standalone_dml(q) || self.has_match_mutations(q),
175 BoundStatement::CopyFrom(_) => true,
176 _ => is_ddl,
177 };
178
179 let txn_type = if is_write {
181 TransactionType::Write
182 } else {
183 TransactionType::ReadOnly
184 };
185 let mut txn = self
186 .txn_mgr
187 .begin(txn_type)
188 .map_err(|e| KyuError::Transaction(e.to_string()))?;
189
190 let result = self.execute_bound(bound);
192
193 match &result {
195 Ok(_) => {
196 if is_ddl {
198 let snapshot = self.catalog.read().serialize_json();
199 txn.log_catalog_snapshot(snapshot.into_bytes());
200 }
201 self.txn_mgr
202 .commit(&mut txn, &self.wal, |_, _| {})
203 .map_err(|e| KyuError::Transaction(e.to_string()))?;
204 if is_write {
206 let _ = self.checkpointer.try_checkpoint();
207 }
208 }
209 Err(_) => {
210 let _ = self.txn_mgr.rollback(&mut txn, |_| {});
211 }
212 }
213
214 result
215 }
216
217 fn execute_bound(&self, bound: BoundStatement) -> KyuResult<QueryResult> {
219 match bound {
220 BoundStatement::Query(query) => {
221 if self.is_standalone_dml(&query) {
222 return self.exec_dml(&query);
223 }
224 if self.has_match_mutations(&query) {
225 return self.exec_match_dml(&query);
226 }
227 let catalog_snapshot = self.catalog.read();
228 let plan = build_query_plan(&query, &catalog_snapshot)?;
229 let plan = optimize(plan, &catalog_snapshot);
230 let storage_guard = self.storage.read().unwrap();
231 let ctx = ExecutionContext::new(catalog_snapshot, &*storage_guard);
232 execute(&plan, &query.output_schema, &ctx)
233 }
234 BoundStatement::CreateNodeTable(create) => self.exec_create_node_table(&create),
235 BoundStatement::CreateRelTable(create) => self.exec_create_rel_table(&create),
236 BoundStatement::Drop(drop) => self.exec_drop(&drop),
237 BoundStatement::CopyFrom(copy) => self.exec_copy_from(©),
238 _ => Err(KyuError::NotImplemented(
239 "statement type not yet supported".into(),
240 )),
241 }
242 }
243
244 fn is_standalone_dml(&self, query: &BoundQuery) -> bool {
248 query
249 .parts
250 .iter()
251 .all(|part| part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
252 }
253
254 fn exec_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
256 let catalog_snapshot = self.catalog.read();
257
258 for part in &query.parts {
259 let mut created_nodes: Vec<(Option<u32>, TableId, Vec<TypedValue>)> = Vec::new();
261
262 for clause in &part.updating_clauses {
263 match clause {
264 BoundUpdatingClause::Create(patterns) => {
265 for pattern in patterns {
266 for element in &pattern.elements {
267 match element {
268 BoundPatternElement::Node(node) => {
269 let values =
270 self.exec_create_node(node, &catalog_snapshot)?;
271 created_nodes.push((
272 node.variable_index,
273 node.table_id,
274 values,
275 ));
276 }
277 BoundPatternElement::Relationship(_rel) => {
278 return Err(KyuError::NotImplemented(
279 "CREATE relationship not yet supported".into(),
280 ));
281 }
282 }
283 }
284 }
285 }
286 BoundUpdatingClause::Set(_) => {
287 return Err(KyuError::NotImplemented(
288 "standalone SET without MATCH".into(),
289 ));
290 }
291 BoundUpdatingClause::Delete(_) => {
292 return Err(KyuError::NotImplemented(
293 "standalone DELETE without MATCH".into(),
294 ));
295 }
296 }
297 }
298
299 if let Some(ref proj) = part.projection {
301 let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
302 let mut combined_values: Vec<TypedValue> = Vec::new();
303 let mut offset = 0u32;
304
305 for (var_idx, table_id, values) in &created_nodes {
306 if let Some(entry) = catalog_snapshot.find_by_id(*table_id) {
307 if let Some(vi) = var_idx {
308 for (i, prop) in entry.properties().iter().enumerate() {
309 prop_map.insert((*vi, prop.name.clone()), offset + i as u32);
310 }
311 }
312 offset += entry.properties().len() as u32;
313 }
314 combined_values.extend(values.iter().cloned());
315 }
316
317 let col_names: Vec<SmolStr> =
318 proj.items.iter().map(|item| item.alias.clone()).collect();
319 let col_types: Vec<LogicalType> = proj
320 .items
321 .iter()
322 .map(|item| item.expression.result_type().clone())
323 .collect();
324
325 let mut row: Vec<TypedValue> = Vec::with_capacity(proj.items.len());
326 for item in &proj.items {
327 let resolved = resolve_properties(&item.expression, &prop_map);
328 let value = evaluate(&resolved, combined_values.as_slice())?;
329 row.push(value);
330 }
331
332 let mut result = QueryResult::new(col_names, col_types);
333 result.push_row(row);
334 return Ok(result);
335 }
336 }
337
338 Ok(QueryResult::new(vec![], vec![]))
339 }
340
341 fn exec_create_node(
344 &self,
345 node: &BoundNodePattern,
346 catalog: &kyu_catalog::CatalogContent,
347 ) -> KyuResult<Vec<TypedValue>> {
348 let entry = catalog
349 .find_by_id(node.table_id)
350 .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", node.table_id)))?;
351 let properties = entry.properties();
352
353 let mut values = Vec::with_capacity(properties.len());
354 for prop in properties {
355 let value = if let Some((_pid, expr)) =
356 node.properties.iter().find(|(pid, _)| *pid == prop.id)
357 {
358 evaluate_constant(expr)?
359 } else {
360 TypedValue::Null
361 };
362 values.push(value);
363 }
364
365 self.storage
366 .write()
367 .unwrap()
368 .insert_row(node.table_id, &values)?;
369
370 Ok(values)
371 }
372
373 fn has_match_mutations(&self, query: &BoundQuery) -> bool {
375 query
376 .parts
377 .iter()
378 .any(|part| !part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
379 }
380
381 fn exec_match_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
386 let catalog_snapshot = self.catalog.read();
387
388 for part in &query.parts {
389 let match_clauses: Vec<&BoundMatchClause> = part
391 .reading_clauses
392 .iter()
393 .filter_map(|c| match c {
394 BoundReadingClause::Match(m) => Some(m),
395 _ => None,
396 })
397 .collect();
398
399 if match_clauses.is_empty() {
400 return Err(KyuError::NotImplemented(
401 "MATCH...mutation requires at least one MATCH clause".into(),
402 ));
403 }
404
405 let mut bindings: Vec<HashMap<u32, Vec<TypedValue>>> = vec![HashMap::new()];
409 let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
410 let mut global_offset = 0u32;
411
412 for mc in &match_clauses {
413 let (table_id, var_idx) = self.extract_match_node(mc)?;
414 let entry = catalog_snapshot
415 .find_by_id(table_id)
416 .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", table_id)))?;
417 let properties = entry.properties();
418
419 if let Some(vi) = var_idx {
421 for (i, p) in properties.iter().enumerate() {
422 prop_map.insert((vi, p.name.clone()), global_offset + i as u32);
423 }
424 }
425
426 let resolved_where = mc
428 .where_clause
429 .as_ref()
430 .map(|w| resolve_properties(w, &prop_map));
431
432 let rows = self.storage.read().unwrap().scan_rows(table_id)?;
434
435 let mut new_bindings = Vec::new();
437 for existing in &bindings {
438 for (_row_idx, row_values) in &rows {
439 let mut combined = Vec::new();
441 let mut entries: Vec<(u32, &Vec<TypedValue>)> =
443 existing.iter().map(|(&k, v)| (k, v)).collect();
444 entries.sort_by_key(|(k, _)| *k);
445 for (_, vals) in &entries {
446 combined.extend(vals.iter().cloned());
447 }
448 combined.extend(row_values.iter().cloned());
449
450 if let Some(ref pred) = resolved_where {
452 let result = evaluate(pred, combined.as_slice())?;
453 if result != TypedValue::Bool(true) {
454 continue;
455 }
456 }
457
458 let mut binding = existing.clone();
459 if let Some(vi) = var_idx {
460 binding.insert(vi, row_values.clone());
461 }
462 new_bindings.push(binding);
463 }
464 }
465 bindings = new_bindings;
466 global_offset += properties.len() as u32;
467 }
468
469 let mut set_mutations: Vec<(TableId, u64, usize, TypedValue)> = Vec::new();
471 let mut delete_rows: Vec<(TableId, u64)> = Vec::new();
472 let mut rel_inserts: Vec<(TableId, Vec<TypedValue>)> = Vec::new();
473
474 for binding in &bindings {
475 let mut combined = Vec::new();
477 let mut entries: Vec<(u32, &Vec<TypedValue>)> =
478 binding.iter().map(|(&k, v)| (k, v)).collect();
479 entries.sort_by_key(|(k, _)| *k);
480 for (_, vals) in &entries {
481 combined.extend(vals.iter().cloned());
482 }
483
484 for clause in &part.updating_clauses {
485 match clause {
486 BoundUpdatingClause::Set(items) => {
487 let (table_id, _) = self.extract_match_node(match_clauses[0])?;
489 let entry = catalog_snapshot.find_by_id(table_id).unwrap();
490 let properties = entry.properties();
491 let rows = self.storage.read().unwrap().scan_rows(table_id)?;
492 let first_var =
494 match_clauses[0].patterns[0].elements.iter().find_map(|e| {
495 if let BoundPatternElement::Node(n) = e {
496 n.variable_index
497 } else {
498 None
499 }
500 });
501 if let Some(vi) = first_var
502 && let Some(var_vals) = binding.get(&vi)
503 {
504 for (row_idx, row_values) in &rows {
505 if row_values == var_vals {
506 for item in items {
507 let resolved_value =
508 resolve_properties(&item.value, &prop_map);
509 let new_value =
510 evaluate(&resolved_value, combined.as_slice())?;
511 let col_idx = properties
512 .iter()
513 .position(|p| p.id == item.property_id)
514 .ok_or_else(|| {
515 KyuError::Storage(format!(
516 "property {:?} not found",
517 item.property_id
518 ))
519 })?;
520 set_mutations
521 .push((table_id, *row_idx, col_idx, new_value));
522 }
523 break;
524 }
525 }
526 }
527 }
528 BoundUpdatingClause::Delete(_) => {
529 let (table_id, _) = self.extract_match_node(match_clauses[0])?;
530 let rows = self.storage.read().unwrap().scan_rows(table_id)?;
531 let first_var =
532 match_clauses[0].patterns[0].elements.iter().find_map(|e| {
533 if let BoundPatternElement::Node(n) = e {
534 n.variable_index
535 } else {
536 None
537 }
538 });
539 if let Some(vi) = first_var
540 && let Some(var_vals) = binding.get(&vi)
541 {
542 for (row_idx, row_values) in &rows {
543 if row_values == var_vals {
544 delete_rows.push((table_id, *row_idx));
545 break;
546 }
547 }
548 }
549 }
550 BoundUpdatingClause::Create(patterns) => {
551 for pattern in patterns {
552 self.exec_create_in_binding(
553 pattern,
554 binding,
555 &catalog_snapshot,
556 &mut rel_inserts,
557 )?;
558 }
559 }
560 }
561 }
562 }
563
564 let mut storage = self.storage.write().unwrap();
566 for (table_id, row_idx, col_idx, value) in &set_mutations {
567 storage.update_cell(*table_id, *row_idx, *col_idx, value)?;
568 }
569 for (table_id, row_idx) in &delete_rows {
570 storage.delete_row(*table_id, *row_idx)?;
571 }
572 for (table_id, values) in &rel_inserts {
573 storage.insert_row(*table_id, values)?;
574 }
575 }
576
577 Ok(QueryResult::new(vec![], vec![]))
578 }
579
580 fn exec_create_in_binding(
583 &self,
584 pattern: &kyu_binder::BoundPattern,
585 binding: &HashMap<u32, Vec<TypedValue>>,
586 catalog: &kyu_catalog::CatalogContent,
587 rel_inserts: &mut Vec<(TableId, Vec<TypedValue>)>,
588 ) -> KyuResult<()> {
589 let elements = &pattern.elements;
590 let mut i = 0;
591 while i < elements.len() {
592 match &elements[i] {
593 BoundPatternElement::Node(_node) => {
594 i += 1;
597 }
598 BoundPatternElement::Relationship(rel) => {
599 let src_var = if i > 0 {
601 if let BoundPatternElement::Node(n) = &elements[i - 1] {
602 n.variable_index
603 } else {
604 None
605 }
606 } else {
607 None
608 };
609 let dst_var = if i + 1 < elements.len() {
610 if let BoundPatternElement::Node(n) = &elements[i + 1] {
611 n.variable_index
612 } else {
613 None
614 }
615 } else {
616 None
617 };
618
619 let src_vi = src_var.ok_or_else(|| {
620 KyuError::Runtime("CREATE rel: cannot resolve source node".into())
621 })?;
622 let dst_vi = dst_var.ok_or_else(|| {
623 KyuError::Runtime("CREATE rel: cannot resolve destination node".into())
624 })?;
625
626 let src_vals = binding.get(&src_vi).ok_or_else(|| {
627 KyuError::Runtime(format!(
628 "CREATE rel: source var {src_vi} not in bindings"
629 ))
630 })?;
631 let dst_vals = binding.get(&dst_vi).ok_or_else(|| {
632 KyuError::Runtime(format!("CREATE rel: dest var {dst_vi} not in bindings"))
633 })?;
634
635 let rel_entry = catalog
637 .find_by_id(rel.table_id)
638 .and_then(|e| e.as_rel_table())
639 .ok_or_else(|| {
640 KyuError::Catalog(format!("rel table {:?} not found", rel.table_id))
641 })?;
642
643 let src_node_entry = catalog
644 .find_by_id(rel_entry.from_table_id)
645 .and_then(|e| e.as_node_table())
646 .ok_or_else(|| KyuError::Catalog("source node table not found".into()))?;
647 let dst_node_entry = catalog
648 .find_by_id(rel_entry.to_table_id)
649 .and_then(|e| e.as_node_table())
650 .ok_or_else(|| KyuError::Catalog("dest node table not found".into()))?;
651
652 let src_pk = src_vals[src_node_entry.primary_key_idx].clone();
654 let dst_pk = dst_vals[dst_node_entry.primary_key_idx].clone();
655
656 let mut values = vec![src_pk, dst_pk];
658 let rel_properties = rel_entry.properties.as_slice();
659 for prop in rel_properties {
660 let value = if let Some((_pid, expr)) =
661 rel.properties.iter().find(|(pid, _)| *pid == prop.id)
662 {
663 evaluate_constant(expr)?
664 } else {
665 TypedValue::Null
666 };
667 values.push(value);
668 }
669
670 rel_inserts.push((rel.table_id, values));
671 i += 1;
672 }
673 }
674 }
675 Ok(())
676 }
677
678 fn extract_match_node(
680 &self,
681 match_clause: &BoundMatchClause,
682 ) -> KyuResult<(TableId, Option<u32>)> {
683 for pattern in &match_clause.patterns {
684 for element in &pattern.elements {
685 if let BoundPatternElement::Node(node) = element {
686 return Ok((node.table_id, node.variable_index));
687 }
688 }
689 }
690 Err(KyuError::NotImplemented(
691 "MATCH clause must contain at least one node pattern".into(),
692 ))
693 }
694
695 pub fn apply_delta(&self, batch: DeltaBatch) -> KyuResult<DeltaStats> {
704 let start = Instant::now();
705 let mut stats = DeltaStats {
706 total_deltas: batch.len() as u64,
707 ..DeltaStats::default()
708 };
709
710 let mut txn = self
712 .txn_mgr
713 .begin(TransactionType::Write)
714 .map_err(|e| KyuError::Transaction(e.to_string()))?;
715
716 let catalog = self.catalog.read();
717 let mut storage = self.storage.write().unwrap();
718
719 for delta in batch.iter() {
720 match delta {
721 GraphDelta::UpsertNode {
722 key,
723 labels: _,
724 props,
725 } => {
726 let entry = catalog.find_by_name(key.label.as_str()).ok_or_else(|| {
727 KyuError::Catalog(format!("node table '{}' not found", key.label))
728 })?;
729 let node_entry = entry.as_node_table().ok_or_else(|| {
730 KyuError::Catalog(format!("'{}' is not a node table", key.label))
731 })?;
732 let table_id = node_entry.table_id;
733 let pk_col_idx = node_entry.primary_key_idx;
734 let pk_type = &node_entry.properties[pk_col_idx].data_type;
735 let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;
736
737 let existing = find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?;
738
739 if let Some(row_idx) = existing {
740 for (prop_name, value) in props {
742 if let Some(col_idx) =
743 find_property_index(node_entry, prop_name.as_str())
744 {
745 storage.update_cell(table_id, row_idx, col_idx, value)?;
746 }
747 }
748 stats.nodes_updated += 1;
749 } else {
750 let values = build_node_row(node_entry, &pk_value, props);
752 storage.insert_row(table_id, &values)?;
753 stats.nodes_created += 1;
754 }
755 }
756
757 GraphDelta::UpsertEdge {
758 src,
759 rel_type,
760 dst,
761 props,
762 } => {
763 let entry = catalog.find_by_name(rel_type.as_str()).ok_or_else(|| {
764 KyuError::Catalog(format!("rel table '{}' not found", rel_type))
765 })?;
766 let rel_entry = entry.as_rel_table().ok_or_else(|| {
767 KyuError::Catalog(format!("'{}' is not a rel table", rel_type))
768 })?;
769 let rel_table_id = rel_entry.table_id;
770
771 let src_node = catalog
773 .find_by_name(src.label.as_str())
774 .and_then(|e| e.as_node_table())
775 .ok_or_else(|| {
776 KyuError::Catalog(format!("node table '{}' not found", src.label))
777 })?;
778 let dst_node = catalog
779 .find_by_name(dst.label.as_str())
780 .and_then(|e| e.as_node_table())
781 .ok_or_else(|| {
782 KyuError::Catalog(format!("node table '{}' not found", dst.label))
783 })?;
784
785 let src_pk_type = &src_node.properties[src_node.primary_key_idx].data_type;
786 let dst_pk_type = &dst_node.properties[dst_node.primary_key_idx].data_type;
787 let src_pk = parse_primary_key(src.primary_key.as_str(), src_pk_type)?;
788 let dst_pk = parse_primary_key(dst.primary_key.as_str(), dst_pk_type)?;
789
790 let existing = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?;
792
793 if let Some(row_idx) = existing {
794 for (prop_name, value) in props {
796 if let Some(prop_idx) =
797 find_rel_property_index(rel_entry, prop_name.as_str())
798 {
799 let col_idx = prop_idx + 2; storage.update_cell(rel_table_id, row_idx, col_idx, value)?;
801 }
802 }
803 stats.edges_updated += 1;
804 } else {
805 let values = build_edge_row(rel_entry, &src_pk, &dst_pk, props);
807 storage.insert_row(rel_table_id, &values)?;
808 stats.edges_created += 1;
809 }
810 }
811
812 GraphDelta::DeleteNode { key } => {
813 let entry = catalog
814 .find_by_name(key.label.as_str())
815 .and_then(|e| e.as_node_table())
816 .ok_or_else(|| {
817 KyuError::Catalog(format!("node table '{}' not found", key.label))
818 })?;
819 let table_id = entry.table_id;
820 let pk_col_idx = entry.primary_key_idx;
821 let pk_type = &entry.properties[pk_col_idx].data_type;
822 let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;
823
824 if let Some(row_idx) =
825 find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?
826 {
827 storage.delete_row(table_id, row_idx)?;
828 stats.nodes_deleted += 1;
829 }
830 }
831
832 GraphDelta::DeleteEdge { src, rel_type, dst } => {
833 let rel_entry = catalog
834 .find_by_name(rel_type.as_str())
835 .and_then(|e| e.as_rel_table())
836 .ok_or_else(|| {
837 KyuError::Catalog(format!("rel table '{}' not found", rel_type))
838 })?;
839 let rel_table_id = rel_entry.table_id;
840
841 let src_node = catalog
842 .find_by_name(src.label.as_str())
843 .and_then(|e| e.as_node_table())
844 .ok_or_else(|| {
845 KyuError::Catalog(format!("node table '{}' not found", src.label))
846 })?;
847 let dst_node = catalog
848 .find_by_name(dst.label.as_str())
849 .and_then(|e| e.as_node_table())
850 .ok_or_else(|| {
851 KyuError::Catalog(format!("node table '{}' not found", dst.label))
852 })?;
853
854 let src_pk = parse_primary_key(
855 src.primary_key.as_str(),
856 &src_node.properties[src_node.primary_key_idx].data_type,
857 )?;
858 let dst_pk = parse_primary_key(
859 dst.primary_key.as_str(),
860 &dst_node.properties[dst_node.primary_key_idx].data_type,
861 )?;
862
863 if let Some(row_idx) = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?
864 {
865 storage.delete_row(rel_table_id, row_idx)?;
866 stats.edges_deleted += 1;
867 }
868 }
869 }
870 }
871
872 drop(storage);
873 drop(catalog);
874
875 self.txn_mgr
877 .commit(&mut txn, &self.wal, |_, _| {})
878 .map_err(|e| KyuError::Transaction(e.to_string()))?;
879 let _ = self.checkpointer.try_checkpoint();
880
881 stats.elapsed_micros = start.elapsed().as_micros() as u64;
882 Ok(stats)
883 }
884
885 fn try_call_extension(&self, cypher: &str) -> KyuResult<Option<QueryResult>> {
890 let trimmed = cypher.trim();
891 if !trimmed.to_uppercase().starts_with("CALL ") {
892 return Ok(None);
893 }
894
895 let rest = trimmed[5..].trim();
897 let dot_pos = rest.find('.').ok_or_else(|| {
898 KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
899 })?;
900 let ext_name = &rest[..dot_pos];
901 let after_dot = &rest[dot_pos + 1..];
902
903 let paren_pos = after_dot.find('(').ok_or_else(|| {
904 KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
905 })?;
906 let proc_name = &after_dot[..paren_pos];
907 let args_str = after_dot[paren_pos + 1..].trim_end_matches([')', ';']);
908
909 let args: Vec<String> = if args_str.trim().is_empty() {
910 Vec::new()
911 } else {
912 args_str
913 .split(',')
914 .map(|s| s.trim().trim_matches('\'').to_string())
915 .collect()
916 };
917
918 let ext = self
920 .extensions
921 .iter()
922 .find(|e| e.name() == ext_name)
923 .ok_or_else(|| KyuError::Binder(format!("unknown extension '{ext_name}'")))?;
924
925 let adjacency = if ext.needs_graph() {
927 self.build_graph_adjacency()
928 } else {
929 std::collections::HashMap::new()
930 };
931
932 let rows = ext
934 .execute(proc_name, &args, &adjacency)
935 .map_err(|e| KyuError::Runtime(format!("extension error: {e}")))?;
936
937 let proc_sig = ext
939 .procedures()
940 .into_iter()
941 .find(|p| p.name == proc_name)
942 .ok_or_else(|| {
943 KyuError::Binder(format!(
944 "unknown procedure '{proc_name}' in extension '{ext_name}'"
945 ))
946 })?;
947
948 let col_names: Vec<SmolStr> = proc_sig
949 .columns
950 .iter()
951 .map(|c| SmolStr::new(&c.name))
952 .collect();
953 let col_types: Vec<LogicalType> = proc_sig
954 .columns
955 .iter()
956 .map(|c| c.data_type.clone())
957 .collect();
958
959 let mut result = QueryResult::new(col_names, col_types);
960 for proc_row in rows {
961 result.push_row(proc_row);
962 }
963
964 Ok(Some(result))
965 }
966
967 fn build_graph_adjacency(&self) -> std::collections::HashMap<i64, Vec<(i64, f64)>> {
972 use kyu_executor::value_vector::ValueVector;
973
974 let mut adjacency: std::collections::HashMap<i64, Vec<(i64, f64)>> =
975 std::collections::HashMap::new();
976 let catalog = self.catalog.read();
977 let storage = self.storage.read().unwrap();
978
979 for rel in catalog.rel_tables() {
980 let table_id = rel.table_id;
981 for chunk in storage.scan_table(table_id) {
982 let n = chunk.num_rows();
983 if n == 0 {
984 continue;
985 }
986
987 let src_col = chunk.column(0);
988 let dst_col = chunk.column(1);
989
990 if chunk.selection().is_identity()
992 && let (ValueVector::Flat(src_flat), ValueVector::Flat(dst_flat)) =
993 (src_col, dst_col)
994 {
995 let src_slice = src_flat.data_as_i64_slice();
996 let dst_slice = dst_flat.data_as_i64_slice();
997 let src_nm = src_flat.null_mask();
998 let dst_nm = dst_flat.null_mask();
999 for i in 0..n {
1000 if !src_nm.is_null(i as u64) && !dst_nm.is_null(i as u64) {
1001 adjacency
1002 .entry(src_slice[i])
1003 .or_default()
1004 .push((dst_slice[i], 1.0));
1005 }
1006 }
1007 continue;
1008 }
1009
1010 for row_idx in 0..n {
1012 let src = chunk.get_value(row_idx, 0);
1013 let dst = chunk.get_value(row_idx, 1);
1014 if let (TypedValue::Int64(s), TypedValue::Int64(d)) = (src, dst) {
1015 adjacency.entry(s).or_default().push((d, 1.0));
1016 }
1017 }
1018 }
1019 }
1020
1021 adjacency
1022 }
1023
1024 fn exec_create_node_table(
1027 &self,
1028 create: &kyu_binder::BoundCreateNodeTable,
1029 ) -> KyuResult<QueryResult> {
1030 let mut catalog = self.catalog.begin_write();
1031
1032 let table_id = catalog.alloc_table_id();
1033 let properties: Vec<Property> = create
1034 .columns
1035 .iter()
1036 .map(|col| {
1037 let prop_id = catalog.alloc_property_id();
1038 Property::new(
1039 prop_id,
1040 col.name.clone(),
1041 col.data_type.clone(),
1042 col.property_id.0 as usize == create.primary_key_idx,
1043 )
1044 })
1045 .collect();
1046
1047 let schema: Vec<LogicalType> = create.columns.iter().map(|c| c.data_type.clone()).collect();
1048
1049 catalog.add_node_table(NodeTableEntry {
1050 table_id,
1051 name: create.name.clone(),
1052 properties,
1053 primary_key_idx: create.primary_key_idx,
1054 num_rows: 0,
1055 comment: None,
1056 })?;
1057
1058 self.catalog.commit_write(catalog);
1059
1060 self.storage.write().unwrap().create_table(table_id, schema);
1062
1063 Ok(QueryResult::new(vec![], vec![]))
1064 }
1065
1066 fn exec_create_rel_table(
1067 &self,
1068 create: &kyu_binder::BoundCreateRelTable,
1069 ) -> KyuResult<QueryResult> {
1070 let mut catalog = self.catalog.begin_write();
1071
1072 let table_id = catalog.alloc_table_id();
1073 let properties: Vec<Property> = create
1074 .columns
1075 .iter()
1076 .map(|col| {
1077 let prop_id = catalog.alloc_property_id();
1078 Property::new(prop_id, col.name.clone(), col.data_type.clone(), false)
1079 })
1080 .collect();
1081
1082 let from_key_type = catalog
1084 .find_by_id(create.from_table_id)
1085 .and_then(|e| e.as_node_table())
1086 .map(|n| n.primary_key_property().data_type.clone())
1087 .unwrap_or(LogicalType::Int64);
1088 let to_key_type = catalog
1089 .find_by_id(create.to_table_id)
1090 .and_then(|e| e.as_node_table())
1091 .map(|n| n.primary_key_property().data_type.clone())
1092 .unwrap_or(LogicalType::Int64);
1093 let mut schema = vec![from_key_type, to_key_type];
1094 schema.extend(create.columns.iter().map(|c| c.data_type.clone()));
1095
1096 catalog.add_rel_table(RelTableEntry {
1097 table_id,
1098 name: create.name.clone(),
1099 from_table_id: create.from_table_id,
1100 to_table_id: create.to_table_id,
1101 properties,
1102 num_rows: 0,
1103 comment: None,
1104 })?;
1105
1106 self.catalog.commit_write(catalog);
1107
1108 self.storage.write().unwrap().create_table(table_id, schema);
1109
1110 Ok(QueryResult::new(vec![], vec![]))
1111 }
1112
1113 fn exec_drop(&self, drop: &kyu_binder::BoundDrop) -> KyuResult<QueryResult> {
1114 let mut catalog = self.catalog.begin_write();
1115 catalog
1116 .remove_by_id(drop.table_id)
1117 .ok_or_else(|| KyuError::Catalog(format!("table '{}' not found", drop.name)))?;
1118 self.catalog.commit_write(catalog);
1119
1120 self.storage.write().unwrap().drop_table(drop.table_id);
1121
1122 Ok(QueryResult::new(vec![], vec![]))
1123 }
1124
1125 fn exec_copy_from(&self, copy: &kyu_binder::BoundCopyFrom) -> KyuResult<QueryResult> {
1128 let path_val = evaluate_constant(©.source)?;
1130 let path = match &path_val {
1131 TypedValue::String(s) => s.as_str().to_string(),
1132 _ => {
1133 return Err(KyuError::Copy(
1134 "COPY FROM source must be a string path".into(),
1135 ));
1136 }
1137 };
1138
1139 let catalog_snapshot = self.catalog.read();
1141 let entry = catalog_snapshot
1142 .find_by_id(copy.table_id)
1143 .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", copy.table_id)))?;
1144 let properties = entry.properties();
1145 let schema: Vec<LogicalType> = properties.iter().map(|p| p.data_type.clone()).collect();
1146 drop(catalog_snapshot);
1147
1148 let reader = kyu_copy::open_reader(&path, &schema)?;
1150
1151 let mut storage = self.storage.write().unwrap();
1152 for row_result in reader {
1153 let values = row_result?;
1154 storage.insert_row(copy.table_id, &values)?;
1155 }
1156
1157 Ok(QueryResult::new(vec![], vec![]))
1158 }
1159}
1160
1161fn parse_primary_key(value: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
1165 match ty {
1166 LogicalType::Int8 => value
1167 .parse::<i8>()
1168 .map(TypedValue::Int8)
1169 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT8: {e}"))),
1170 LogicalType::Int16 => value
1171 .parse::<i16>()
1172 .map(TypedValue::Int16)
1173 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT16: {e}"))),
1174 LogicalType::Int32 => value
1175 .parse::<i32>()
1176 .map(TypedValue::Int32)
1177 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT32: {e}"))),
1178 LogicalType::Int64 | LogicalType::Serial => value
1179 .parse::<i64>()
1180 .map(TypedValue::Int64)
1181 .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT64: {e}"))),
1182 LogicalType::String => Ok(TypedValue::String(SmolStr::new(value))),
1183 _ => Err(KyuError::Delta(format!(
1184 "unsupported primary key type '{}' for delta upsert",
1185 ty.type_name()
1186 ))),
1187 }
1188}
1189
1190fn find_row_by_pk(
1192 storage: &crate::storage::NodeGroupStorage,
1193 table_id: TableId,
1194 pk_col_idx: usize,
1195 pk_value: &TypedValue,
1196) -> KyuResult<Option<u64>> {
1197 let rows = storage.scan_rows(table_id)?;
1198 for (row_idx, row_values) in &rows {
1199 if row_values.get(pk_col_idx) == Some(pk_value) {
1200 return Ok(Some(*row_idx));
1201 }
1202 }
1203 Ok(None)
1204}
1205
1206fn find_edge_row(
1209 storage: &crate::storage::NodeGroupStorage,
1210 rel_table_id: TableId,
1211 src_pk: &TypedValue,
1212 dst_pk: &TypedValue,
1213) -> KyuResult<Option<u64>> {
1214 let rows = storage.scan_rows(rel_table_id)?;
1215 for (row_idx, row_values) in &rows {
1216 if row_values.first() == Some(src_pk) && row_values.get(1) == Some(dst_pk) {
1217 return Ok(Some(*row_idx));
1218 }
1219 }
1220 Ok(None)
1221}
1222
1223fn find_property_index(entry: &NodeTableEntry, name: &str) -> Option<usize> {
1225 let lower = name.to_lowercase();
1226 entry
1227 .properties
1228 .iter()
1229 .position(|p| p.name.to_lowercase() == lower)
1230}
1231
1232fn find_rel_property_index(entry: &RelTableEntry, name: &str) -> Option<usize> {
1234 let lower = name.to_lowercase();
1235 entry
1236 .properties
1237 .iter()
1238 .position(|p| p.name.to_lowercase() == lower)
1239}
1240
1241fn build_node_row(
1243 entry: &NodeTableEntry,
1244 pk_value: &TypedValue,
1245 props: &hashbrown::HashMap<SmolStr, TypedValue>,
1246) -> Vec<TypedValue> {
1247 entry
1248 .properties
1249 .iter()
1250 .enumerate()
1251 .map(|(i, prop)| {
1252 if i == entry.primary_key_idx {
1253 pk_value.clone()
1254 } else if let Some(val) = props.get(&prop.name) {
1255 val.clone()
1256 } else {
1257 TypedValue::Null
1258 }
1259 })
1260 .collect()
1261}
1262
1263fn build_edge_row(
1265 entry: &RelTableEntry,
1266 src_pk: &TypedValue,
1267 dst_pk: &TypedValue,
1268 props: &hashbrown::HashMap<SmolStr, TypedValue>,
1269) -> Vec<TypedValue> {
1270 let mut row = vec![src_pk.clone(), dst_pk.clone()];
1271 for prop in &entry.properties {
1272 if let Some(val) = props.get(&prop.name) {
1273 row.push(val.clone());
1274 } else {
1275 row.push(TypedValue::Null);
1276 }
1277 }
1278 row
1279}
1280
// End-to-end tests that drive the engine through `Database` and its
// connections: DDL, Cypher reads and writes, direct storage access,
// COPY FROM, extension procedures, on-disk persistence, query parameters /
// env bindings, and delta batches.
#[cfg(test)]
mod tests {
    use crate::database::Database;
    use kyu_types::TypedValue;
    use smol_str::SmolStr;

    // ---- Basics ----

    // A fresh in-memory database accepts a connection and has an empty catalog.
    #[test]
    fn create_database_and_connect() {
        let db = Database::in_memory();
        let _conn = db.connect();
        assert_eq!(db.catalog().num_tables(), 0);
    }

    // A bare RETURN of a literal yields exactly one row holding that literal.
    #[test]
    fn return_literal() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN 1 AS x").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0), vec![TypedValue::Int64(1)]);
    }

    // Constant arithmetic in RETURN is evaluated to its result.
    #[test]
    fn return_arithmetic() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN 2 + 3 AS sum").unwrap();
        assert_eq!(result.row(0), vec![TypedValue::Int64(5)]);
    }

    // ---- DDL ----

    // CREATE NODE TABLE registers the table in the catalog and in storage.
    #[test]
    fn create_node_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        // Catalog side: one node table with the two declared properties.
        assert_eq!(db.catalog().num_tables(), 1);
        let snapshot = db.catalog().read();
        let entry = snapshot.find_by_name("Person").unwrap();
        assert!(entry.is_node_table());
        assert_eq!(entry.properties().len(), 2);

        // Storage side: a table exists under the same table id.
        assert!(db.storage().read().unwrap().has_table(entry.table_id()));
    }

    // Matching against a table that has no rows succeeds and returns no rows.
    #[test]
    fn create_and_query_empty_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // CREATE REL TABLE adds a second catalog entry that reports itself as a
    // relationship table.
    #[test]
    fn create_rel_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
            .unwrap();

        assert_eq!(db.catalog().num_tables(), 2);
        let snapshot = db.catalog().read();
        let entry = snapshot.find_by_name("KNOWS").unwrap();
        assert!(entry.is_rel_table());
    }

    // DROP TABLE removes the table from the catalog again.
    #[test]
    fn drop_table() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        assert_eq!(db.catalog().num_tables(), 1);

        conn.query("DROP TABLE Person").unwrap();
        assert_eq!(db.catalog().num_tables(), 0);
    }

    // Creating a table whose name already exists is reported as an error.
    #[test]
    fn create_duplicate_error() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        let result = conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))");
        assert!(result.is_err());
    }

    // Text that is not valid Cypher surfaces as an `Err`, not a panic.
    #[test]
    fn parse_error_propagated() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("THIS IS NOT VALID CYPHER !!!");
        assert!(result.is_err());
    }

    // Two connections from the same `Database` observe the same catalog.
    #[test]
    fn multiple_connections_share_state() {
        let db = Database::in_memory();
        let conn1 = db.connect();
        let conn2 = db.connect();

        conn1
            .query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();

        assert_eq!(db.catalog().num_tables(), 1);
        // The table created through conn1 can be queried through conn2.
        let result = conn2.query("MATCH (p:Person) RETURN p.id").unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // ---- Cypher writes: CREATE / SET / DELETE ----

    // A node created through Cypher is visible to a subsequent MATCH.
    #[test]
    fn create_node_via_cypher() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // A single CREATE with two comma-separated patterns inserts two nodes.
    #[test]
    fn create_multiple_nodes() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        conn.query("CREATE (a:Person {id: 1, name: 'Alice'}), (b:Person {id: 2, name: 'Bob'})")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    // Properties omitted from CREATE read back as NULL.
    #[test]
    fn create_node_partial_properties() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        // Only `id` is supplied; `name` is left out.
        conn.query("CREATE (n:Person {id: 1})").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(0)[1], TypedValue::Null);
    }

    // CREATE ... RETURN hands back the properties of the node just created,
    // in the order the RETURN clause lists them.
    #[test]
    fn create_and_return() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        let result = conn
            .query("CREATE (n:Person {id: 1, name: 'Alice'}) RETURN n.name, n.id")
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
        assert_eq!(result.row(0)[1], TypedValue::Int64(1));
    }

    // MATCH ... SET overwrites a property on the matched node.
    #[test]
    fn match_set_property() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 25})")
            .unwrap();

        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' SET p.age = 31")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
    }

    // SET combined with WHERE only touches the rows the predicate selects.
    #[test]
    fn match_set_with_where() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1, name: 'Alice', age: 25})")
            .unwrap();
        conn.query("CREATE (b:Person {id: 2, name: 'Bob', age: 30})")
            .unwrap();

        // Update Alice (id = 1) only.
        conn.query("MATCH (p:Person) WHERE p.id = 1 SET p.age = 26")
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
        assert_eq!(result.num_rows(), 2);
        // Look rows up by name so the check does not depend on result order.
        let alice_row = result
            .iter_rows()
            .find(|r| r[0] == TypedValue::String(SmolStr::new("Alice")))
            .unwrap();
        let bob_row = result
            .iter_rows()
            .find(|r| r[0] == TypedValue::String(SmolStr::new("Bob")))
            .unwrap();
        // Alice was updated.
        assert_eq!(alice_row[1], TypedValue::Int64(26));
        // Bob keeps his original age.
        assert_eq!(bob_row[1], TypedValue::Int64(30));
    }

    // SET without a WHERE clause updates every matched row.
    #[test]
    fn match_set_all_rows() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, active INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1, active: 0})").unwrap();
        conn.query("CREATE (b:Person {id: 2, active: 0})").unwrap();

        // No predicate: both rows are expected to change.
        conn.query("MATCH (p:Person) SET p.active = 1").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.active").unwrap();
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(1)[0], TypedValue::Int64(1));
    }

    // MATCH ... WHERE ... DELETE removes only the selected node.
    #[test]
    fn match_delete() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1, name: 'Alice'})")
            .unwrap();
        conn.query("CREATE (b:Person {id: 2, name: 'Bob'})")
            .unwrap();

        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' DELETE p")
            .unwrap();

        // Only Bob remains.
        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
    }

    // DELETE without a WHERE clause empties the table.
    #[test]
    fn match_delete_all() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (a:Person {id: 1})").unwrap();
        conn.query("CREATE (b:Person {id: 2})").unwrap();

        conn.query("MATCH (p:Person) DELETE p").unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id").unwrap();
        assert_eq!(result.num_rows(), 0);
    }

    // ---- Direct storage access ----

    // A row written straight into storage is returned by a Cypher MATCH.
    #[test]
    fn storage_roundtrip_insert_scan() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        // Resolve the table id, then release the catalog snapshot.
        let snapshot = db.catalog().read();
        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
        drop(snapshot);

        // Insert through the storage write lock, bypassing the query layer.
        // The guard is a temporary and is released at the end of the statement.
        db.storage()
            .write()
            .unwrap()
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(1),
                    TypedValue::String(SmolStr::new("Alice")),
                ],
            )
            .unwrap();

        // Read the row back through Cypher.
        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Several rows written under one storage write guard are all visible to a
    // later query.
    #[test]
    fn storage_roundtrip_multiple_rows() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();

        let snapshot = db.catalog().read();
        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
        drop(snapshot);

        let mut storage = db.storage().write().unwrap();
        storage
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(1),
                    TypedValue::String(SmolStr::new("Alice")),
                    TypedValue::Int64(25),
                ],
            )
            .unwrap();
        storage
            .insert_row(
                table_id,
                &[
                    TypedValue::Int64(2),
                    TypedValue::String(SmolStr::new("Bob")),
                    TypedValue::Int64(30),
                ],
            )
            .unwrap();
        // Release the write guard before querying through the connection.
        drop(storage);

        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    // ---- COPY FROM ----

    // COPY FROM loads the data rows of a CSV file into a node table.
    #[test]
    fn copy_from_csv() {
        use std::io::Write;

        // Write a header line plus three data rows to a temp file; the inner
        // scope closes the file handle before the database reads the file.
        let dir = std::env::temp_dir().join("kyu_test_csv");
        let _ = std::fs::create_dir_all(&dir);
        let csv_path = dir.join("persons.csv");
        {
            let mut f = std::fs::File::create(&csv_path).unwrap();
            writeln!(f, "id,name").unwrap();
            writeln!(f, "1,Alice").unwrap();
            writeln!(f, "2,Bob").unwrap();
            writeln!(f, "3,Charlie").unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query(&format!("COPY Person FROM '{}'", csv_path.display()))
            .unwrap();

        // Three rows are expected, i.e. the header line is not loaded as data.
        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 3);

        // Best-effort cleanup; a failure to remove the file is ignored.
        let _ = std::fs::remove_file(&csv_path);
    }

    // COPY FROM converts CSV text into the column types declared on the table
    // (STRING, DOUBLE and BOOL are checked here).
    #[test]
    fn copy_from_csv_multiple_types() {
        use std::io::Write;

        let dir = std::env::temp_dir().join("kyu_test_csv");
        let _ = std::fs::create_dir_all(&dir);
        let csv_path = dir.join("typed.csv");
        {
            let mut f = std::fs::File::create(&csv_path).unwrap();
            writeln!(f, "id,name,score,active").unwrap();
            writeln!(f, "1,Alice,95.5,true").unwrap();
            writeln!(f, "2,Bob,87.3,false").unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query(
            "CREATE NODE TABLE Student (id INT64, name STRING, score DOUBLE, active BOOL, PRIMARY KEY (id))",
        )
        .unwrap();
        conn.query(&format!("COPY Student FROM '{}'", csv_path.display()))
            .unwrap();

        let result = conn
            .query("MATCH (s:Student) RETURN s.name, s.score, s.active")
            .unwrap();
        assert_eq!(result.num_rows(), 2);
        // The first result row is expected to be the first CSV data row.
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
        assert_eq!(result.row(0)[1], TypedValue::Double(95.5));
        assert_eq!(result.row(0)[2], TypedValue::Bool(true));

        let _ = std::fs::remove_file(&csv_path);
    }

    // COPY FROM also accepts a Parquet file as its source.
    #[test]
    fn copy_from_parquet() {
        use arrow::array::{Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use parquet::arrow::ArrowWriter;
        use std::sync::Arc;

        // Build a three-row (id, name) Parquet file with the Arrow writer.
        let dir = std::env::temp_dir().join("kyu_test_parquet_copy");
        let _ = std::fs::create_dir_all(&dir);
        let parquet_path = dir.join("persons.parquet");
        {
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
            ]));
            let ids = Int64Array::from(vec![1, 2, 3]);
            let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
            let batch =
                RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids), Arc::new(names)])
                    .unwrap();
            let file = std::fs::File::create(&parquet_path).unwrap();
            let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
            writer.write(&batch).unwrap();
            // Finish the writer before the file is read back.
            writer.close().unwrap();
        }

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query(&format!("COPY Person FROM '{}'", parquet_path.display()))
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
        assert_eq!(result.row(0)[1], TypedValue::String(SmolStr::new("Alice")));

        // Best-effort cleanup of the whole temp directory.
        let _ = std::fs::remove_dir_all(&dir);
    }

    // ---- Extension procedures (CALL) ----

    // `algo.pageRank` from the registered algo extension yields one
    // two-column row per node.
    #[test]
    fn call_extension_pagerank() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        // Three nodes.
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 3})").unwrap();

        // Edges are written straight into storage as [src, dst] rows, forming
        // the cycle 1 -> 2 -> 3 -> 1.
        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(3), TypedValue::Int64(1)])
                .unwrap();
        }

        // NOTE(review): the arguments are presumably damping factor, maximum
        // iterations and tolerance — confirm against the extension.
        let result = conn
            .query("CALL algo.pageRank(0.85, 20, 0.000001)")
            .unwrap();
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.column_names.len(), 2);
        for row in result.iter_rows() {
            // The rank is only checked when column 1 is a Double; any other
            // value type passes through this loop unchecked.
            if let TypedValue::Double(rank) = &row[1] {
                assert!(*rank > 0.0);
            }
        }
    }

    // `algo.wcc` yields one row per node for a graph made of two separate
    // edges (1 -> 2 and 10 -> 11).
    #[test]
    fn call_extension_wcc() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 10})").unwrap();
        conn.query("CREATE (n:Person {id: 11})").unwrap();

        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(
                    rel_table_id,
                    &[TypedValue::Int64(10), TypedValue::Int64(11)],
                )
                .unwrap();
        }

        // Only the row count is asserted, not the component assignment.
        let result = conn.query("CALL algo.wcc()").unwrap();
        assert_eq!(result.num_rows(), 4);
    }

    // `algo.betweenness` yields one row per node for the path 1 -> 2 -> 3.
    #[test]
    fn call_extension_betweenness() {
        let mut db = Database::in_memory();
        db.register_extension(Box::new(ext_algo::AlgoExtension));
        let conn = db.connect();

        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();
        conn.query("CREATE (n:Person {id: 3})").unwrap();

        let snapshot = db.catalog().read();
        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
        drop(snapshot);
        {
            let mut storage = db.storage().write().unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
                .unwrap();
            storage
                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
                .unwrap();
        }

        let result = conn.query("CALL algo.betweenness()").unwrap();
        assert_eq!(result.num_rows(), 3);
    }

    // Calling a procedure when no extension has been registered is an error.
    #[test]
    fn call_unknown_extension() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("CALL nonexistent.proc()");
        assert!(result.is_err());
    }

    // ---- Persistence ----

    // Schema and rows written in one session are visible after the database
    // directory is opened again.
    #[test]
    fn persistence_survives_restart() {
        // Start from a clean directory in case an earlier run left one behind.
        let dir = std::env::temp_dir().join("kyu_test_persist_e2e");
        let _ = std::fs::remove_dir_all(&dir);

        {
            // First session: create the schema and insert two rows. `db` and
            // `conn` go out of scope at the closing brace.
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();
            conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
                .unwrap();
            conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
                .unwrap();
            conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
                .unwrap();
        }

        {
            // Second session: reopen the same directory.
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();

            // The schema is back.
            assert_eq!(db.catalog().num_tables(), 1);
            let snapshot = db.catalog().read();
            assert!(snapshot.find_by_name("Person").is_some());
            drop(snapshot);

            // The data is back.
            let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
            assert_eq!(result.num_rows(), 2);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    // Tables created in one session are present in the catalog after reopening
    // (presumably restored from the WAL, per the test name — the recovery
    // mechanism itself is not visible from this test).
    #[test]
    fn persistence_ddl_recovery_via_wal() {
        let dir = std::env::temp_dir().join("kyu_test_persist_ddl");
        let _ = std::fs::remove_dir_all(&dir);

        {
            // First session: DDL only, no data.
            let db = Database::open(&dir).unwrap();
            let conn = db.connect();
            conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
                .unwrap();
            conn.query("CREATE NODE TABLE Organization (id INT64, name STRING, PRIMARY KEY (id))")
                .unwrap();
        }

        {
            // Second session: both tables are expected in the catalog.
            let db = Database::open(&dir).unwrap();
            assert_eq!(db.catalog().num_tables(), 2);
            let snapshot = db.catalog().read();
            assert!(snapshot.find_by_name("Person").is_some());
            assert!(snapshot.find_by_name("Organization").is_some());
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    // Opening, closing and reopening a database that never had any tables
    // succeeds and still reports an empty catalog.
    #[test]
    fn persistence_empty_database() {
        let dir = std::env::temp_dir().join("kyu_test_persist_empty_db");
        let _ = std::fs::remove_dir_all(&dir);

        {
            // First open: create the database and let it go out of scope.
            let _db = Database::open(&dir).unwrap();
        }

        {
            // Second open: nothing was created, so the catalog is empty.
            let db = Database::open(&dir).unwrap();
            assert_eq!(db.catalog().num_tables(), 0);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    // ---- Query parameters and env bindings ----

    // A `$x` placeholder is replaced by the value bound under "x".
    #[test]
    fn return_param() {
        let db = Database::in_memory();
        let conn = db.connect();
        let mut params = std::collections::HashMap::new();
        params.insert("x".to_string(), TypedValue::Int64(42));
        let result = conn.query_with_params("RETURN $x AS val", params).unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0), vec![TypedValue::Int64(42)]);
    }

    // Parameters can be used inside a WHERE predicate.
    #[test]
    fn parameterized_where() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 30})")
            .unwrap();
        conn.query("CREATE (n:Person {id: 2, name: 'Bob', age: 20})")
            .unwrap();

        // With min_age = 25 only Alice (30) passes the filter.
        let mut params = std::collections::HashMap::new();
        params.insert("min_age".to_string(), TypedValue::Int64(25));
        let result = conn
            .query_with_params(
                "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
                params,
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Parameters can supply the property values of a CREATE pattern.
    #[test]
    fn parameterized_create() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();

        let mut params = std::collections::HashMap::new();
        params.insert("id".to_string(), TypedValue::Int64(1));
        params.insert(
            "name".to_string(),
            TypedValue::String(SmolStr::new("Alice")),
        );
        conn.query_with_params("CREATE (n:Person {id: $id, name: $name})", params)
            .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
    }

    // Parameters can supply the right-hand side of a SET assignment.
    #[test]
    fn parameterized_set() {
        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, age INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, age: 25})").unwrap();

        let mut params = std::collections::HashMap::new();
        params.insert("new_age".to_string(), TypedValue::Int64(31));
        conn.query_with_params(
            "MATCH (p:Person) WHERE p.id = 1 SET p.age = $new_age",
            params,
        )
        .unwrap();

        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
    }

    // Referencing a parameter that was never bound fails with an error whose
    // message contains "unresolved parameter".
    #[test]
    fn unresolved_param_error() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn.query("RETURN $missing AS val");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("unresolved parameter")
        );
    }

    // `env('NAME')` returns the value bound under NAME in the env map that is
    // passed to `execute`.
    #[test]
    fn env_resolved() {
        let db = Database::in_memory();
        let conn = db.connect();
        let mut env = std::collections::HashMap::new();
        env.insert(
            "GREETING".to_string(),
            TypedValue::String(SmolStr::new("hello")),
        );
        // No query parameters; only the env map is populated.
        let result = conn
            .execute(
                "RETURN env('GREETING') AS val",
                std::collections::HashMap::new(),
                env,
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("hello")));
    }

    // `env('NAME')` for a name absent from the env map evaluates to NULL
    // rather than failing the query.
    #[test]
    fn env_missing_returns_null() {
        let db = Database::in_memory();
        let conn = db.connect();
        let result = conn
            .execute(
                "RETURN env('MISSING') AS val",
                std::collections::HashMap::new(),
                std::collections::HashMap::new(),
            )
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Null);
    }

    // ---- Delta batches (apply_delta) ----

    // Upserting nodes whose keys do not exist yet creates them and counts
    // them as created.
    #[test]
    fn delta_upsert_new_nodes() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
            .unwrap();

        // Two upserts with distinct keys ("main" and "helper").
        let batch = DeltaBatchBuilder::new("file:main.rs", 1)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(42))],
            )
            .upsert_node(
                "Function",
                "helper",
                vec![],
                [("lines", TypedValue::Int64(10))],
            )
            .build();

        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.nodes_created, 2);
        assert_eq!(stats.nodes_updated, 0);

        let result = conn
            .query("MATCH (f:Function) RETURN f.name, f.lines")
            .unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    // Upserting a key that already exists updates the existing row instead of
    // adding a second one.
    #[test]
    fn delta_upsert_existing_node_merges() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
            .unwrap();

        // First batch: creates "main" with lines = 42.
        let batch1 = DeltaBatchBuilder::new("file:main.rs", 1)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(42))],
            )
            .build();
        conn.apply_delta(batch1).unwrap();

        // Second batch: same key, lines = 50; the builder's numeric argument
        // is bumped from 1 to 2.
        let batch2 = DeltaBatchBuilder::new("file:main.rs", 2)
            .upsert_node(
                "Function",
                "main",
                vec![],
                [("lines", TypedValue::Int64(50))],
            )
            .build();
        let stats = conn.apply_delta(batch2).unwrap();
        assert_eq!(stats.nodes_created, 0);
        assert_eq!(stats.nodes_updated, 1);

        // Still a single row, now carrying the new value.
        let result = conn
            .query("MATCH (f:Function) WHERE f.name = 'main' RETURN f.lines")
            .unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::Int64(50));
    }

    // A delete delta removes the node whose primary key matches; the key is
    // supplied as the string "1" for an INT64 key column.
    #[test]
    fn delta_delete_node() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
            .unwrap();
        conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
            .unwrap();

        let batch = DeltaBatchBuilder::new("cleanup", 1)
            .delete_node("Person", "1")
            .build();
        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.nodes_deleted, 1);

        // Only Bob (id = 2) is left.
        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
        assert_eq!(result.num_rows(), 1);
        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
    }

    // Walks one edge through its lifecycle via deltas — create, update,
    // delete — checking the stored row after each step.
    #[test]
    fn delta_upsert_and_delete_edges() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
            .unwrap();
        conn.query("CREATE (n:Person {id: 1})").unwrap();
        conn.query("CREATE (n:Person {id: 2})").unwrap();

        // Step 1: upsert a new edge 1 -[KNOWS]-> 2 with since = 2024.
        let batch = DeltaBatchBuilder::new("social", 1)
            .upsert_edge(
                "Person",
                "1",
                "KNOWS",
                "Person",
                "2",
                [("since", TypedValue::Int64(2024))],
            )
            .build();
        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.edges_created, 1);

        // Inspect the stored edge row directly. Its layout is
        // [src key, dst key, properties...], as produced by `build_edge_row`.
        let storage = db.storage().read().unwrap();
        let catalog = db.catalog().read();
        let rel_table_id = catalog.find_by_name("KNOWS").unwrap().table_id();
        let rows = storage.scan_rows(rel_table_id).unwrap();
        assert_eq!(rows.len(), 1);
        // Source key.
        assert_eq!(rows[0].1[0], TypedValue::Int64(1));
        // Destination key.
        assert_eq!(rows[0].1[1], TypedValue::Int64(2));
        // `since` property.
        assert_eq!(rows[0].1[2], TypedValue::Int64(2024));
        // Release both guards before applying the next batch (presumably
        // apply_delta needs write access to storage — confirm).
        drop(storage);
        drop(catalog);

        // Step 2: upsert the same edge again with since = 2025; this counts
        // as an update.
        let batch2 = DeltaBatchBuilder::new("social", 2)
            .upsert_edge(
                "Person",
                "1",
                "KNOWS",
                "Person",
                "2",
                [("since", TypedValue::Int64(2025))],
            )
            .build();
        let stats2 = conn.apply_delta(batch2).unwrap();
        assert_eq!(stats2.edges_updated, 1);

        let storage = db.storage().read().unwrap();
        let rows = storage.scan_rows(rel_table_id).unwrap();
        assert_eq!(rows[0].1[2], TypedValue::Int64(2025));
        drop(storage);

        // Step 3: delete the edge.
        let batch3 = DeltaBatchBuilder::new("social", 3)
            .delete_edge("Person", "1", "KNOWS", "Person", "2")
            .build();
        let stats3 = conn.apply_delta(batch3).unwrap();
        assert_eq!(stats3.edges_deleted, 1);

        let storage = db.storage().read().unwrap();
        let rows = storage.scan_rows(rel_table_id).unwrap();
        assert_eq!(rows.len(), 0);
    }

    // Applying the identical batch twice leaves a single row: the first
    // application creates the node, the replay is counted as an update.
    #[test]
    fn delta_idempotent_replay() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE File (path STRING, hash STRING, PRIMARY KEY (path))")
            .unwrap();

        let batch = DeltaBatchBuilder::new("watcher", 100)
            .upsert_node(
                "File",
                "src/main.rs",
                vec![],
                [("hash", TypedValue::String(SmolStr::new("abc123")))],
            )
            .build();

        // First application (on a clone, so the batch can be replayed).
        let stats1 = conn.apply_delta(batch.clone()).unwrap();
        assert_eq!(stats1.nodes_created, 1);

        // Replay of the very same batch.
        let stats2 = conn.apply_delta(batch).unwrap();
        assert_eq!(stats2.nodes_created, 0);
        assert_eq!(stats2.nodes_updated, 1);

        // No duplicate row was produced.
        let result = conn.query("MATCH (f:File) RETURN f.path").unwrap();
        assert_eq!(result.num_rows(), 1);
    }

    // The stats returned by `apply_delta` count each kind of change and the
    // total number of deltas in the batch.
    #[test]
    fn delta_stats_correct() {
        use kyu_delta::DeltaBatchBuilder;

        let db = Database::in_memory();
        let conn = db.connect();
        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
            .unwrap();
        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
            .unwrap();

        // Two node upserts plus one edge upsert with no properties (the
        // explicit element type lets the empty Vec type-check).
        let batch = DeltaBatchBuilder::new("test", 1)
            .upsert_node(
                "Person",
                "1",
                vec![],
                [("name", TypedValue::String(SmolStr::new("Alice")))],
            )
            .upsert_node(
                "Person",
                "2",
                vec![],
                [("name", TypedValue::String(SmolStr::new("Bob")))],
            )
            .upsert_edge(
                "Person",
                "1",
                "KNOWS",
                "Person",
                "2",
                Vec::<(&str, TypedValue)>::new(),
            )
            .build();

        let stats = conn.apply_delta(batch).unwrap();
        assert_eq!(stats.nodes_created, 2);
        assert_eq!(stats.edges_created, 1);
        assert_eq!(stats.total_deltas, 3);
        // NOTE(review): assumes applying the batch takes at least one
        // microsecond as measured by the engine.
        assert!(stats.elapsed_micros > 0);
    }
}