Skip to main content

kyu_api/
connection.rs

1//! Connection — executes Cypher queries and DDL against a Database.
2
3use std::sync::{Arc, RwLock};
4
5use std::collections::HashMap;
6
7use std::time::Instant;
8
9use kyu_binder::{
10    BindContext, Binder, BoundMatchClause, BoundNodePattern, BoundPatternElement, BoundQuery,
11    BoundReadingClause, BoundStatement, BoundUpdatingClause,
12};
13use kyu_catalog::{Catalog, NodeTableEntry, Property, RelTableEntry};
14use kyu_common::id::TableId;
15use kyu_common::{KyuError, KyuResult};
16use kyu_delta::{DeltaBatch, DeltaStats, GraphDelta};
17use kyu_executor::{ExecutionContext, QueryResult, Storage, execute};
18use kyu_expression::{FunctionRegistry, evaluate, evaluate_constant};
19use kyu_planner::{build_query_plan, optimize, resolve_properties};
20use kyu_transaction::{Checkpointer, TransactionManager, TransactionType, Wal};
21use kyu_types::{LogicalType, TypedValue};
22use smol_str::SmolStr;
23
24use crate::storage::NodeGroupStorage;
25
/// A connection to a KyuGraph database.
///
/// Connections share catalog and storage via `Arc`. Each query gets a
/// consistent catalog snapshot for binding and planning; DDL mutates
/// both catalog and storage atomically. Every query is wrapped in a
/// transaction for crash safety and isolation.
pub struct Connection {
    // Shared catalog of table schemas; read-snapshotted once per query.
    catalog: Arc<Catalog>,
    // Row storage shared across connections, guarded by an RwLock:
    // scans take the read lock, mutations take the write lock.
    storage: Arc<RwLock<NodeGroupStorage>>,
    // Begins, commits, and rolls back the per-statement transactions.
    txn_mgr: Arc<TransactionManager>,
    // Write-ahead log handed to the transaction manager on commit.
    wal: Arc<Wal>,
    // Drives explicit CHECKPOINT commands and post-commit auto-checkpoints.
    checkpointer: Arc<Checkpointer>,
    // Loaded extensions; queries may be routed to them (see
    // `try_call_extension` usage in `query_internal`).
    extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
}
40
41impl Connection {
42    pub(crate) fn new(
43        catalog: Arc<Catalog>,
44        storage: Arc<RwLock<NodeGroupStorage>>,
45        txn_mgr: Arc<TransactionManager>,
46        wal: Arc<Wal>,
47        checkpointer: Arc<Checkpointer>,
48        extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
49    ) -> Self {
50        Self {
51            catalog,
52            storage,
53            txn_mgr,
54            wal,
55            checkpointer,
56            extensions,
57        }
58    }
59
60    /// Execute a Cypher statement, returning a QueryResult.
61    ///
62    /// Each statement is wrapped in a transaction: read-only queries get a
63    /// `ReadOnly` transaction, write queries get a `Write` transaction.
64    /// The transaction is committed on success and rolled back on error.
65    pub fn query(&self, cypher: &str) -> KyuResult<QueryResult> {
66        self.query_internal(cypher, BindContext::empty())
67    }
68
69    /// Execute a Cypher statement with parameter bindings for `$param` placeholders.
70    ///
71    /// Parameters are resolved to literal values at bind time, before planning
72    /// and execution. This is the preferred way to pass dynamic values safely.
73    ///
74    /// ```ignore
75    /// use std::collections::HashMap;
76    /// use kyu_types::TypedValue;
77    ///
78    /// let mut params = HashMap::new();
79    /// params.insert("min_age".to_string(), TypedValue::Int64(25));
80    /// let result = conn.query_with_params(
81    ///     "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
82    ///     params,
83    /// ).unwrap();
84    /// ```
85    pub fn query_with_params(
86        &self,
87        cypher: &str,
88        params: HashMap<String, TypedValue>,
89    ) -> KyuResult<QueryResult> {
90        let ctx = BindContext {
91            params: params
92                .into_iter()
93                .map(|(k, v)| (SmolStr::new(k), v))
94                .collect(),
95            env: HashMap::new(),
96        };
97        self.query_internal(cypher, ctx)
98    }
99
100    /// Full VM-style execution with both `$param` bindings and `env()` values.
101    ///
102    /// The Cypher evaluator is treated like a virtual machine: it accepts a
103    /// query string plus two context maps that are resolved before planning:
104    ///
105    /// - `params`: `$param` placeholders → `TypedValue`
106    /// - `env`: `env('KEY')` lookups → `TypedValue`
107    ///
108    /// ```ignore
109    /// use std::collections::HashMap;
110    /// use kyu_types::TypedValue;
111    ///
112    /// let mut params = HashMap::new();
113    /// params.insert("name".to_string(), TypedValue::String("Alice".into()));
114    /// let mut env = HashMap::new();
115    /// env.insert("PREFIX".to_string(), TypedValue::String("graph_".into()));
116    /// let result = conn.execute(
117    ///     "MATCH (p:Person) WHERE p.name = $name RETURN p.name",
118    ///     params,
119    ///     env,
120    /// ).unwrap();
121    /// ```
122    pub fn execute(
123        &self,
124        cypher: &str,
125        params: HashMap<String, TypedValue>,
126        env: HashMap<String, TypedValue>,
127    ) -> KyuResult<QueryResult> {
128        let ctx = BindContext {
129            params: params
130                .into_iter()
131                .map(|(k, v)| (SmolStr::new(k), v))
132                .collect(),
133            env: env.into_iter().map(|(k, v)| (SmolStr::new(k), v)).collect(),
134        };
135        self.query_internal(cypher, ctx)
136    }
137
138    fn query_internal(&self, cypher: &str, ctx: BindContext) -> KyuResult<QueryResult> {
139        // Fast path: CHECKPOINT command.
140        if cypher.trim().eq_ignore_ascii_case("CHECKPOINT")
141            || cypher.trim().eq_ignore_ascii_case("CHECKPOINT;")
142        {
143            self.checkpointer
144                .checkpoint()
145                .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))?;
146            return Ok(QueryResult::new(vec![], vec![]));
147        }
148
149        // Fast path: CALL ext.proc(...) routing to extensions.
150        if let Some(result) = self.try_call_extension(cypher)? {
151            return Ok(result);
152        }
153
154        // 1. Parse
155        let parse_result = kyu_parser::parse(cypher);
156        let stmt = parse_result
157            .ast
158            .ok_or_else(|| KyuError::Parser(format!("{:?}", parse_result.errors)))?;
159
160        // 2. Bind (against a catalog snapshot)
161        let catalog_snapshot = self.catalog.read();
162        let mut binder =
163            Binder::new(catalog_snapshot, FunctionRegistry::with_builtins()).with_context(ctx);
164        let bound = binder.bind(&stmt)?;
165
166        // 3. Determine if this is a write operation and/or DDL.
167        let is_ddl = matches!(
168            &bound,
169            BoundStatement::CreateNodeTable(_)
170                | BoundStatement::CreateRelTable(_)
171                | BoundStatement::Drop(_)
172        );
173        let is_write = match &bound {
174            BoundStatement::Query(q) => self.is_standalone_dml(q) || self.has_match_mutations(q),
175            BoundStatement::CopyFrom(_) | BoundStatement::LoadFrom(_) => true,
176            _ => is_ddl,
177        };
178
179        // 4. Begin transaction.
180        let txn_type = if is_write {
181            TransactionType::Write
182        } else {
183            TransactionType::ReadOnly
184        };
185        let mut txn = self
186            .txn_mgr
187            .begin(txn_type)
188            .map_err(|e| KyuError::Transaction(e.to_string()))?;
189
190        // 5. Execute within the transaction.
191        let result = self.execute_bound(bound);
192
193        // 6. Commit on success, rollback on error.
194        match &result {
195            Ok(_) => {
196                // For DDL, snapshot the catalog to WAL for crash recovery.
197                if is_ddl {
198                    let snapshot = self.catalog.read().serialize_json();
199                    txn.log_catalog_snapshot(snapshot.into_bytes());
200                }
201                self.txn_mgr
202                    .commit(&mut txn, &self.wal, |_, _| {})
203                    .map_err(|e| KyuError::Transaction(e.to_string()))?;
204                // Auto-checkpoint after write commits if WAL exceeds threshold.
205                if is_write {
206                    let _ = self.checkpointer.try_checkpoint();
207                }
208            }
209            Err(_) => {
210                let _ = self.txn_mgr.rollback(&mut txn, |_| {});
211            }
212        }
213
214        result
215    }
216
217    /// Route a bound statement to the appropriate executor.
218    fn execute_bound(&self, bound: BoundStatement) -> KyuResult<QueryResult> {
219        match bound {
220            BoundStatement::Query(query) => {
221                if self.is_standalone_dml(&query) {
222                    return self.exec_dml(&query);
223                }
224                if self.has_match_mutations(&query) {
225                    return self.exec_match_dml(&query);
226                }
227                let catalog_snapshot = self.catalog.read();
228                let plan = build_query_plan(&query, &catalog_snapshot)?;
229                let plan = optimize(plan, &catalog_snapshot);
230                let storage_guard = self.storage.read().unwrap();
231                let ctx = ExecutionContext::new(catalog_snapshot, &*storage_guard);
232                execute(&plan, &query.output_schema, &ctx)
233            }
234            BoundStatement::CreateNodeTable(create) => self.exec_create_node_table(&create),
235            BoundStatement::CreateRelTable(create) => self.exec_create_rel_table(&create),
236            BoundStatement::Drop(drop) => self.exec_drop(&drop),
237            BoundStatement::CopyFrom(copy) => self.exec_copy_from(&copy),
238            BoundStatement::LoadFrom(load) => self.exec_load_from(&load),
239            _ => Err(KyuError::NotImplemented(
240                "statement type not yet supported".into(),
241            )),
242        }
243    }
244
245    // ---- DML execution ----
246
247    /// Returns true if the query has no reading clauses (standalone CREATE, CREATE...RETURN).
248    fn is_standalone_dml(&self, query: &BoundQuery) -> bool {
249        query
250            .parts
251            .iter()
252            .all(|part| part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
253    }
254
    /// Execute a standalone DML statement (CREATE nodes/rels, with optional RETURN).
    ///
    /// Only node CREATE is implemented here: relationship CREATE and
    /// standalone SET/DELETE (without MATCH) return `NotImplemented`. When a
    /// part carries a RETURN projection, a single-row result is built from
    /// the values of the nodes created in that part and returned immediately,
    /// ending execution at that part.
    fn exec_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
        let catalog_snapshot = self.catalog.read();

        for part in &query.parts {
            // Track created nodes for potential RETURN projection:
            // (variable index, table id, values in catalog property order).
            let mut created_nodes: Vec<(Option<u32>, TableId, Vec<TypedValue>)> = Vec::new();

            for clause in &part.updating_clauses {
                match clause {
                    BoundUpdatingClause::Create(patterns) => {
                        for pattern in patterns {
                            for element in &pattern.elements {
                                match element {
                                    BoundPatternElement::Node(node) => {
                                        let values =
                                            self.exec_create_node(node, &catalog_snapshot)?;
                                        created_nodes.push((
                                            node.variable_index,
                                            node.table_id,
                                            values,
                                        ));
                                    }
                                    BoundPatternElement::Relationship(_rel) => {
                                        return Err(KyuError::NotImplemented(
                                            "CREATE relationship not yet supported".into(),
                                        ));
                                    }
                                }
                            }
                        }
                    }
                    BoundUpdatingClause::Set(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone SET without MATCH".into(),
                        ));
                    }
                    BoundUpdatingClause::Delete(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone DELETE without MATCH".into(),
                        ));
                    }
                }
            }

            // Handle RETURN projection if present: lay the created nodes'
            // values out as one flat tuple and map (variable, property name)
            // → tuple offset so expressions can be resolved against it.
            if let Some(ref proj) = part.projection {
                let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
                let mut combined_values: Vec<TypedValue> = Vec::new();
                let mut offset = 0u32;

                for (var_idx, table_id, values) in &created_nodes {
                    // NOTE(review): `offset` only advances when the catalog
                    // lookup succeeds, but `combined_values` grows regardless;
                    // a missing entry would desynchronize later offsets.
                    // Verify `find_by_id` cannot fail for a just-created node.
                    if let Some(entry) = catalog_snapshot.find_by_id(*table_id) {
                        if let Some(vi) = var_idx {
                            for (i, prop) in entry.properties().iter().enumerate() {
                                prop_map.insert((*vi, prop.name.clone()), offset + i as u32);
                            }
                        }
                        offset += entry.properties().len() as u32;
                    }
                    combined_values.extend(values.iter().cloned());
                }

                // Output schema comes straight from the projection items.
                let col_names: Vec<SmolStr> =
                    proj.items.iter().map(|item| item.alias.clone()).collect();
                let col_types: Vec<LogicalType> = proj
                    .items
                    .iter()
                    .map(|item| item.expression.result_type().clone())
                    .collect();

                // Evaluate each projection item against the combined tuple.
                let mut row: Vec<TypedValue> = Vec::with_capacity(proj.items.len());
                for item in &proj.items {
                    let resolved = resolve_properties(&item.expression, &prop_map);
                    let value = evaluate(&resolved, combined_values.as_slice())?;
                    row.push(value);
                }

                let mut result = QueryResult::new(col_names, col_types);
                result.push_row(row);
                return Ok(result);
            }
        }

        // No RETURN clause in any part: empty result set.
        Ok(QueryResult::new(vec![], vec![]))
    }
341
342    /// Insert a single node row into storage based on a CREATE pattern.
343    /// Returns the created values in catalog property order.
344    fn exec_create_node(
345        &self,
346        node: &BoundNodePattern,
347        catalog: &kyu_catalog::CatalogContent,
348    ) -> KyuResult<Vec<TypedValue>> {
349        let entry = catalog
350            .find_by_id(node.table_id)
351            .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", node.table_id)))?;
352        let properties = entry.properties();
353
354        let mut values = Vec::with_capacity(properties.len());
355        for prop in properties {
356            let value = if let Some((_pid, expr)) =
357                node.properties.iter().find(|(pid, _)| *pid == prop.id)
358            {
359                evaluate_constant(expr)?
360            } else {
361                TypedValue::Null
362            };
363            values.push(value);
364        }
365
366        self.storage
367            .write()
368            .unwrap()
369            .insert_row(node.table_id, &values)?;
370
371        Ok(values)
372    }
373
374    /// Returns true if the query has both MATCH (reading) and SET/DELETE (updating) clauses.
375    fn has_match_mutations(&self, query: &BoundQuery) -> bool {
376        query
377            .parts
378            .iter()
379            .any(|part| !part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
380    }
381
    /// Execute MATCH...SET/DELETE/CREATE: scan, filter, then mutate.
    ///
    /// Supports multiple chained MATCH clauses (e.g. MATCH (a:X) WHERE ...
    /// MATCH (b:Y) WHERE ... CREATE (a)-[:REL]->(b)).
    ///
    /// Execution is two-phase per query part: phase 1 evaluates the MATCH
    /// clauses as a nested-loop cross product with WHERE filtering and
    /// collects all mutations while holding only storage read locks; phase 2
    /// takes the write lock once and applies SETs, DELETEs, and relationship
    /// inserts.
    ///
    /// NOTE(review): each MATCH clause is reduced to its *first* node pattern
    /// (see `extract_match_node`), and SET/DELETE always target the first
    /// MATCH clause's variable. Matched rows are re-identified in storage by
    /// full-row value equality, which is ambiguous when duplicate rows exist
    /// — confirm this is acceptable until row ids are threaded through.
    fn exec_match_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
        let catalog_snapshot = self.catalog.read();

        for part in &query.parts {
            // Collect all MATCH clauses in this part.
            let match_clauses: Vec<&BoundMatchClause> = part
                .reading_clauses
                .iter()
                .filter_map(|c| match c {
                    BoundReadingClause::Match(m) => Some(m),
                    _ => None,
                })
                .collect();

            if match_clauses.is_empty() {
                return Err(KyuError::NotImplemented(
                    "MATCH...mutation requires at least one MATCH clause".into(),
                ));
            }

            // Build combined result: for each MATCH clause, scan + filter
            // and store matched rows indexed by variable_index.
            // Each "binding" maps variable_index → row values. Seeded with a
            // single empty binding so the first clause's rows populate it.
            let mut bindings: Vec<HashMap<u32, Vec<TypedValue>>> = vec![HashMap::new()];
            let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
            let mut global_offset = 0u32;

            for mc in &match_clauses {
                let (table_id, var_idx) = self.extract_match_node(mc)?;
                let entry = catalog_snapshot
                    .find_by_id(table_id)
                    .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", table_id)))?;
                let properties = entry.properties();

                // Add this node's properties to the combined prop_map.
                if let Some(vi) = var_idx {
                    for (i, p) in properties.iter().enumerate() {
                        prop_map.insert((vi, p.name.clone()), global_offset + i as u32);
                    }
                }

                // Resolve WHERE predicate against current prop_map.
                let resolved_where = mc
                    .where_clause
                    .as_ref()
                    .map(|w| resolve_properties(w, &prop_map));

                // Scan this table (read lock released at end of statement).
                let rows = self.storage.read().unwrap().scan_rows(table_id)?;

                // Cross-product with existing bindings, filtering by WHERE.
                let mut new_bindings = Vec::new();
                for existing in &bindings {
                    for (_row_idx, row_values) in &rows {
                        // Build combined tuple for WHERE evaluation.
                        // Gather values from previously matched variables in
                        // ascending variable_index order, matching the offset
                        // layout recorded in prop_map.
                        let mut combined = Vec::new();
                        let mut entries: Vec<(u32, &Vec<TypedValue>)> =
                            existing.iter().map(|(&k, v)| (k, v)).collect();
                        entries.sort_by_key(|(k, _)| *k);
                        for (_, vals) in &entries {
                            combined.extend(vals.iter().cloned());
                        }
                        combined.extend(row_values.iter().cloned());

                        // Evaluate WHERE against combined tuple; any result
                        // other than TRUE (false or NULL) filters the row out.
                        if let Some(ref pred) = resolved_where {
                            let result = evaluate(pred, combined.as_slice())?;
                            if result != TypedValue::Bool(true) {
                                continue;
                            }
                        }

                        let mut binding = existing.clone();
                        if let Some(vi) = var_idx {
                            binding.insert(vi, row_values.clone());
                        }
                        new_bindings.push(binding);
                    }
                }
                bindings = new_bindings;
                global_offset += properties.len() as u32;
            }

            // Phase 1 (continued): walk updating clauses for each binding and
            // record the mutations to apply later under the write lock.
            let mut set_mutations: Vec<(TableId, u64, usize, TypedValue)> = Vec::new();
            let mut delete_rows: Vec<(TableId, u64)> = Vec::new();
            let mut rel_inserts: Vec<(TableId, Vec<TypedValue>)> = Vec::new();

            for binding in &bindings {
                // Build combined flat tuple for expression evaluation,
                // ordered by ascending variable_index as above.
                let mut combined = Vec::new();
                let mut entries: Vec<(u32, &Vec<TypedValue>)> =
                    binding.iter().map(|(&k, v)| (k, v)).collect();
                entries.sort_by_key(|(k, _)| *k);
                for (_, vals) in &entries {
                    combined.extend(vals.iter().cloned());
                }

                for clause in &part.updating_clauses {
                    match clause {
                        BoundUpdatingClause::Set(items) => {
                            // For SET, we need the first MATCH clause's table.
                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
                            let entry = catalog_snapshot.find_by_id(table_id).unwrap();
                            let properties = entry.properties();
                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
                            // Find the matching row by PK comparison.
                            let first_var =
                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
                                    if let BoundPatternElement::Node(n) = e {
                                        n.variable_index
                                    } else {
                                        None
                                    }
                                });
                            if let Some(vi) = first_var
                                && let Some(var_vals) = binding.get(&vi)
                            {
                                // Re-locate the bound row by full-value
                                // equality, then stage one cell update per
                                // SET item (first matching row only).
                                for (row_idx, row_values) in &rows {
                                    if row_values == var_vals {
                                        for item in items {
                                            // Evaluate the RHS against the
                                            // full binding tuple.
                                            let resolved_value =
                                                resolve_properties(&item.value, &prop_map);
                                            let new_value =
                                                evaluate(&resolved_value, combined.as_slice())?;
                                            let col_idx = properties
                                                .iter()
                                                .position(|p| p.id == item.property_id)
                                                .ok_or_else(|| {
                                                    KyuError::Storage(format!(
                                                        "property {:?} not found",
                                                        item.property_id
                                                    ))
                                                })?;
                                            set_mutations
                                                .push((table_id, *row_idx, col_idx, new_value));
                                        }
                                        break;
                                    }
                                }
                            }
                        }
                        BoundUpdatingClause::Delete(_) => {
                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
                            let first_var =
                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
                                    if let BoundPatternElement::Node(n) = e {
                                        n.variable_index
                                    } else {
                                        None
                                    }
                                });
                            if let Some(vi) = first_var
                                && let Some(var_vals) = binding.get(&vi)
                            {
                                // Stage the first row whose values match the
                                // bound variable for deletion.
                                for (row_idx, row_values) in &rows {
                                    if row_values == var_vals {
                                        delete_rows.push((table_id, *row_idx));
                                        break;
                                    }
                                }
                            }
                        }
                        BoundUpdatingClause::Create(patterns) => {
                            for pattern in patterns {
                                self.exec_create_in_binding(
                                    pattern,
                                    binding,
                                    &catalog_snapshot,
                                    &mut rel_inserts,
                                )?;
                            }
                        }
                    }
                }
            }

            // Phase 2: Write — apply mutations under a single write lock.
            let mut storage = self.storage.write().unwrap();
            for (table_id, row_idx, col_idx, value) in &set_mutations {
                storage.update_cell(*table_id, *row_idx, *col_idx, value)?;
            }
            for (table_id, row_idx) in &delete_rows {
                storage.delete_row(*table_id, *row_idx)?;
            }
            for (table_id, values) in &rel_inserts {
                storage.insert_row(*table_id, values)?;
            }
        }

        Ok(QueryResult::new(vec![], vec![]))
    }
580
    /// Execute CREATE patterns within a binding (matched variable → row values).
    /// Handles relationship creation: extracts src/dst PK from bound variables.
    ///
    /// Walks the pattern elements left to right. Node elements are treated as
    /// references to already-matched variables (nothing is created for them);
    /// relationship elements become pending inserts appended to `rel_inserts`
    /// so the caller can apply them under the storage write lock.
    fn exec_create_in_binding(
        &self,
        pattern: &kyu_binder::BoundPattern,
        binding: &HashMap<u32, Vec<TypedValue>>,
        catalog: &kyu_catalog::CatalogContent,
        rel_inserts: &mut Vec<(TableId, Vec<TypedValue>)>,
    ) -> KyuResult<()> {
        let elements = &pattern.elements;
        let mut i = 0;
        while i < elements.len() {
            match &elements[i] {
                BoundPatternElement::Node(_node) => {
                    // Node in CREATE within MATCH context is a reference to an
                    // already-matched variable — no new node creation needed.
                    i += 1;
                }
                BoundPatternElement::Relationship(rel) => {
                    // Expect: Node(src), Rel, Node(dst) — the elements
                    // adjacent to the relationship must be node patterns
                    // carrying variable indices.
                    let src_var = if i > 0 {
                        if let BoundPatternElement::Node(n) = &elements[i - 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };
                    let dst_var = if i + 1 < elements.len() {
                        if let BoundPatternElement::Node(n) = &elements[i + 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };

                    let src_vi = src_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve source node".into())
                    })?;
                    let dst_vi = dst_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve destination node".into())
                    })?;

                    // Look up the matched row values for both endpoints.
                    let src_vals = binding.get(&src_vi).ok_or_else(|| {
                        KyuError::Runtime(format!(
                            "CREATE rel: source var {src_vi} not in bindings"
                        ))
                    })?;
                    let dst_vals = binding.get(&dst_vi).ok_or_else(|| {
                        KyuError::Runtime(format!("CREATE rel: dest var {dst_vi} not in bindings"))
                    })?;

                    // Look up the rel table entry to find src/dst node tables.
                    let rel_entry = catalog
                        .find_by_id(rel.table_id)
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table {:?} not found", rel.table_id))
                        })?;

                    let src_node_entry = catalog
                        .find_by_id(rel_entry.from_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("source node table not found".into()))?;
                    let dst_node_entry = catalog
                        .find_by_id(rel_entry.to_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("dest node table not found".into()))?;

                    // Extract primary key values from matched rows.
                    let src_pk = src_vals[src_node_entry.primary_key_idx].clone();
                    let dst_pk = dst_vals[dst_node_entry.primary_key_idx].clone();

                    // Build rel row: [src_pk, dst_pk, ...user_properties].
                    // Properties not set in the pattern default to NULL.
                    let mut values = vec![src_pk, dst_pk];
                    let rel_properties = rel_entry.properties.as_slice();
                    for prop in rel_properties {
                        let value = if let Some((_pid, expr)) =
                            rel.properties.iter().find(|(pid, _)| *pid == prop.id)
                        {
                            evaluate_constant(expr)?
                        } else {
                            TypedValue::Null
                        };
                        values.push(value);
                    }

                    // Stage the insert; the caller applies it in phase 2.
                    rel_inserts.push((rel.table_id, values));
                    i += 1;
                }
            }
        }
        Ok(())
    }
678
679    /// Extract the single node table_id and variable_index from a MATCH clause.
680    fn extract_match_node(
681        &self,
682        match_clause: &BoundMatchClause,
683    ) -> KyuResult<(TableId, Option<u32>)> {
684        for pattern in &match_clause.patterns {
685            for element in &pattern.elements {
686                if let BoundPatternElement::Node(node) = element {
687                    return Ok((node.table_id, node.variable_index));
688                }
689            }
690        }
691        Err(KyuError::NotImplemented(
692            "MATCH clause must contain at least one node pattern".into(),
693        ))
694    }
695
696    // ---- Delta Fast Path ----
697
    /// Apply a batch of conflict-free, idempotent upserts that bypass OCC.
    ///
    /// All deltas in the batch commit atomically via a single WAL append.
    /// Semantics are "last write wins" — safe only when upsert semantics
    /// are acceptable (ingestion pipelines, agentic code graphs, document
    /// processing).
    ///
    /// # Errors
    ///
    /// Fails if a referenced node/rel table is missing from the catalog, if a
    /// primary key string cannot be parsed as the key column's type, or if the
    /// transaction/storage layer reports an error. Deleting a row that does
    /// not exist is not an error; it simply isn't counted in the returned
    /// `DeltaStats`.
    pub fn apply_delta(&self, batch: DeltaBatch) -> KyuResult<DeltaStats> {
        let start = Instant::now();
        let mut stats = DeltaStats {
            total_deltas: batch.len() as u64,
            ..DeltaStats::default()
        };

        // Begin a write transaction for WAL serialization.
        let mut txn = self
            .txn_mgr
            .begin(TransactionType::Write)
            .map_err(|e| KyuError::Transaction(e.to_string()))?;

        // Hold the catalog read lock and the storage write lock for the whole
        // batch so every delta sees and mutates a consistent state.
        let catalog = self.catalog.read();
        let mut storage = self.storage.write().unwrap();

        for delta in batch.iter() {
            match delta {
                GraphDelta::UpsertNode {
                    key,
                    labels: _, // labels travel with the delta but are unused here
                    props,
                } => {
                    // Resolve the node table named by the key's label.
                    let entry = catalog.find_by_name(key.label.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("node table '{}' not found", key.label))
                    })?;
                    let node_entry = entry.as_node_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a node table", key.label))
                    })?;
                    let table_id = node_entry.table_id;
                    let pk_col_idx = node_entry.primary_key_idx;
                    let pk_type = &node_entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    // Linear PK probe decides between UPDATE and INSERT.
                    let existing = find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?;

                    if let Some(row_idx) = existing {
                        // UPDATE: merge properties (unmentioned props unchanged).
                        // Property names not present in the schema are skipped.
                        for (prop_name, value) in props {
                            if let Some(col_idx) =
                                find_property_index(node_entry, prop_name.as_str())
                            {
                                storage.update_cell(table_id, row_idx, col_idx, value)?;
                            }
                        }
                        stats.nodes_updated += 1;
                    } else {
                        // INSERT: build full row with PK + props, nulls for absent columns.
                        let values = build_node_row(node_entry, &pk_value, props);
                        storage.insert_row(table_id, &values)?;
                        stats.nodes_created += 1;
                    }
                }

                GraphDelta::UpsertEdge {
                    src,
                    rel_type,
                    dst,
                    props,
                } => {
                    // Resolve the rel table named by the edge's type.
                    let entry = catalog.find_by_name(rel_type.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                    })?;
                    let rel_entry = entry.as_rel_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a rel table", rel_type))
                    })?;
                    let rel_table_id = rel_entry.table_id;

                    // Resolve src/dst primary key types from their node tables.
                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk_type = &src_node.properties[src_node.primary_key_idx].data_type;
                    let dst_pk_type = &dst_node.properties[dst_node.primary_key_idx].data_type;
                    let src_pk = parse_primary_key(src.primary_key.as_str(), src_pk_type)?;
                    let dst_pk = parse_primary_key(dst.primary_key.as_str(), dst_pk_type)?;

                    // Rel table storage schema: [src_key, dst_key, ...user_props]
                    let existing = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?;

                    if let Some(row_idx) = existing {
                        // UPDATE: merge edge properties (offset by 2 for src/dst key cols).
                        for (prop_name, value) in props {
                            if let Some(prop_idx) =
                                find_rel_property_index(rel_entry, prop_name.as_str())
                            {
                                let col_idx = prop_idx + 2; // skip src_key, dst_key columns
                                storage.update_cell(rel_table_id, row_idx, col_idx, value)?;
                            }
                        }
                        stats.edges_updated += 1;
                    } else {
                        // INSERT: [src_pk, dst_pk, ...props]
                        let values = build_edge_row(rel_entry, &src_pk, &dst_pk, props);
                        storage.insert_row(rel_table_id, &values)?;
                        stats.edges_created += 1;
                    }
                }

                GraphDelta::DeleteNode { key } => {
                    let entry = catalog
                        .find_by_name(key.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", key.label))
                        })?;
                    let table_id = entry.table_id;
                    let pk_col_idx = entry.primary_key_idx;
                    let pk_type = &entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    // Deleting a missing row is a no-op (idempotent deletes).
                    if let Some(row_idx) =
                        find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?
                    {
                        storage.delete_row(table_id, row_idx)?;
                        stats.nodes_deleted += 1;
                    }
                }

                GraphDelta::DeleteEdge { src, rel_type, dst } => {
                    let rel_entry = catalog
                        .find_by_name(rel_type.as_str())
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                        })?;
                    let rel_table_id = rel_entry.table_id;

                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk = parse_primary_key(
                        src.primary_key.as_str(),
                        &src_node.properties[src_node.primary_key_idx].data_type,
                    )?;
                    let dst_pk = parse_primary_key(
                        dst.primary_key.as_str(),
                        &dst_node.properties[dst_node.primary_key_idx].data_type,
                    )?;

                    // Deleting a missing edge is a no-op (idempotent deletes).
                    if let Some(row_idx) = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?
                    {
                        storage.delete_row(rel_table_id, row_idx)?;
                        stats.edges_deleted += 1;
                    }
                }
            }
        }

        // Release both locks before the WAL commit.
        drop(storage);
        drop(catalog);

        // Commit WAL record.
        // NOTE(review): storage was mutated in place above; if this commit
        // fails, the in-memory changes are not rolled back — confirm this is
        // intended for the delta fast path.
        self.txn_mgr
            .commit(&mut txn, &self.wal, |_, _| {})
            .map_err(|e| KyuError::Transaction(e.to_string()))?;
        // Best-effort checkpoint: errors are deliberately ignored.
        let _ = self.checkpointer.try_checkpoint();

        stats.elapsed_micros = start.elapsed().as_micros() as u64;
        Ok(stats)
    }
885
886    // ---- Extension CALL routing ----
887
888    /// Try to parse and route a `CALL ext.proc(args...)` statement to a registered extension.
889    /// Returns `None` if the statement is not a CALL.
890    fn try_call_extension(&self, cypher: &str) -> KyuResult<Option<QueryResult>> {
891        let trimmed = cypher.trim();
892        if !trimmed.to_uppercase().starts_with("CALL ") {
893            return Ok(None);
894        }
895
896        // Parse: CALL <ext>.<proc>(<arg1>, <arg2>, ...)
897        let rest = trimmed[5..].trim();
898        let dot_pos = rest.find('.').ok_or_else(|| {
899            KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
900        })?;
901        let ext_name = &rest[..dot_pos];
902        let after_dot = &rest[dot_pos + 1..];
903
904        let paren_pos = after_dot.find('(').ok_or_else(|| {
905            KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
906        })?;
907        let proc_name = &after_dot[..paren_pos];
908        let args_str = after_dot[paren_pos + 1..].trim_end_matches([')', ';']);
909
910        let args: Vec<String> = if args_str.trim().is_empty() {
911            Vec::new()
912        } else {
913            args_str
914                .split(',')
915                .map(|s| s.trim().trim_matches('\'').to_string())
916                .collect()
917        };
918
919        // Find matching extension.
920        let ext = self
921            .extensions
922            .iter()
923            .find(|e| e.name() == ext_name)
924            .ok_or_else(|| KyuError::Binder(format!("unknown extension '{ext_name}'")))?;
925
926        // Build adjacency only if the extension needs it (e.g., graph algorithms).
927        let adjacency = if ext.needs_graph() {
928            self.build_graph_adjacency()
929        } else {
930            std::collections::HashMap::new()
931        };
932
933        // Execute.
934        let rows = ext
935            .execute(proc_name, &args, &adjacency)
936            .map_err(|e| KyuError::Runtime(format!("extension error: {e}")))?;
937
938        // Get procedure signature to determine column names and types.
939        let proc_sig = ext
940            .procedures()
941            .into_iter()
942            .find(|p| p.name == proc_name)
943            .ok_or_else(|| {
944                KyuError::Binder(format!(
945                    "unknown procedure '{proc_name}' in extension '{ext_name}'"
946                ))
947            })?;
948
949        let col_names: Vec<SmolStr> = proc_sig
950            .columns
951            .iter()
952            .map(|c| SmolStr::new(&c.name))
953            .collect();
954        let col_types: Vec<LogicalType> = proc_sig
955            .columns
956            .iter()
957            .map(|c| c.data_type.clone())
958            .collect();
959
960        let mut result = QueryResult::new(col_names, col_types);
961        for proc_row in rows {
962            result.push_row(proc_row);
963        }
964
965        Ok(Some(result))
966    }
967
968    /// Build a complete graph adjacency map from all relationship tables.
969    ///
970    /// Uses typed slice accessors on FlatVector columns for direct i64 buffer
971    /// access, avoiding per-element get_value() dispatch and TypedValue construction.
972    fn build_graph_adjacency(&self) -> std::collections::HashMap<i64, Vec<(i64, f64)>> {
973        use kyu_executor::value_vector::ValueVector;
974
975        let mut adjacency: std::collections::HashMap<i64, Vec<(i64, f64)>> =
976            std::collections::HashMap::new();
977        let catalog = self.catalog.read();
978        let storage = self.storage.read().unwrap();
979
980        for rel in catalog.rel_tables() {
981            let table_id = rel.table_id;
982            for chunk in storage.scan_table(table_id) {
983                let n = chunk.num_rows();
984                if n == 0 {
985                    continue;
986                }
987
988                let src_col = chunk.column(0);
989                let dst_col = chunk.column(1);
990
991                // Fast path: both columns are FlatVector Int64 with identity selection.
992                if chunk.selection().is_identity()
993                    && let (ValueVector::Flat(src_flat), ValueVector::Flat(dst_flat)) =
994                        (src_col, dst_col)
995                {
996                    let src_slice = src_flat.data_as_i64_slice();
997                    let dst_slice = dst_flat.data_as_i64_slice();
998                    let src_nm = src_flat.null_mask();
999                    let dst_nm = dst_flat.null_mask();
1000                    for i in 0..n {
1001                        if !src_nm.is_null(i as u64) && !dst_nm.is_null(i as u64) {
1002                            adjacency
1003                                .entry(src_slice[i])
1004                                .or_default()
1005                                .push((dst_slice[i], 1.0));
1006                        }
1007                    }
1008                    continue;
1009                }
1010
1011                // Fallback: per-element extraction.
1012                for row_idx in 0..n {
1013                    let src = chunk.get_value(row_idx, 0);
1014                    let dst = chunk.get_value(row_idx, 1);
1015                    if let (TypedValue::Int64(s), TypedValue::Int64(d)) = (src, dst) {
1016                        adjacency.entry(s).or_default().push((d, 1.0));
1017                    }
1018                }
1019            }
1020        }
1021
1022        adjacency
1023    }
1024
1025    // ---- DDL execution ----
1026
1027    fn exec_create_node_table(
1028        &self,
1029        create: &kyu_binder::BoundCreateNodeTable,
1030    ) -> KyuResult<QueryResult> {
1031        let mut catalog = self.catalog.begin_write();
1032
1033        let table_id = catalog.alloc_table_id();
1034        let properties: Vec<Property> = create
1035            .columns
1036            .iter()
1037            .map(|col| {
1038                let prop_id = catalog.alloc_property_id();
1039                Property::new(
1040                    prop_id,
1041                    col.name.clone(),
1042                    col.data_type.clone(),
1043                    col.property_id.0 as usize == create.primary_key_idx,
1044                )
1045            })
1046            .collect();
1047
1048        let schema: Vec<LogicalType> = create.columns.iter().map(|c| c.data_type.clone()).collect();
1049
1050        catalog.add_node_table(NodeTableEntry {
1051            table_id,
1052            name: create.name.clone(),
1053            properties,
1054            primary_key_idx: create.primary_key_idx,
1055            num_rows: 0,
1056            comment: None,
1057        })?;
1058
1059        self.catalog.commit_write(catalog);
1060
1061        // Create corresponding storage table.
1062        self.storage.write().unwrap().create_table(table_id, schema);
1063
1064        Ok(QueryResult::new(vec![], vec![]))
1065    }
1066
1067    fn exec_create_rel_table(
1068        &self,
1069        create: &kyu_binder::BoundCreateRelTable,
1070    ) -> KyuResult<QueryResult> {
1071        let mut catalog = self.catalog.begin_write();
1072
1073        let table_id = catalog.alloc_table_id();
1074        let properties: Vec<Property> = create
1075            .columns
1076            .iter()
1077            .map(|col| {
1078                let prop_id = catalog.alloc_property_id();
1079                Property::new(prop_id, col.name.clone(), col.data_type.clone(), false)
1080            })
1081            .collect();
1082
1083        // Storage schema: src_key_type, dst_key_type, then user properties.
1084        let from_key_type = catalog
1085            .find_by_id(create.from_table_id)
1086            .and_then(|e| e.as_node_table())
1087            .map(|n| n.primary_key_property().data_type.clone())
1088            .unwrap_or(LogicalType::Int64);
1089        let to_key_type = catalog
1090            .find_by_id(create.to_table_id)
1091            .and_then(|e| e.as_node_table())
1092            .map(|n| n.primary_key_property().data_type.clone())
1093            .unwrap_or(LogicalType::Int64);
1094        let mut schema = vec![from_key_type, to_key_type];
1095        schema.extend(create.columns.iter().map(|c| c.data_type.clone()));
1096
1097        catalog.add_rel_table(RelTableEntry {
1098            table_id,
1099            name: create.name.clone(),
1100            from_table_id: create.from_table_id,
1101            to_table_id: create.to_table_id,
1102            properties,
1103            num_rows: 0,
1104            comment: None,
1105        })?;
1106
1107        self.catalog.commit_write(catalog);
1108
1109        self.storage.write().unwrap().create_table(table_id, schema);
1110
1111        Ok(QueryResult::new(vec![], vec![]))
1112    }
1113
1114    fn exec_drop(&self, drop: &kyu_binder::BoundDrop) -> KyuResult<QueryResult> {
1115        let mut catalog = self.catalog.begin_write();
1116        catalog
1117            .remove_by_id(drop.table_id)
1118            .ok_or_else(|| KyuError::Catalog(format!("table '{}' not found", drop.name)))?;
1119        self.catalog.commit_write(catalog);
1120
1121        self.storage.write().unwrap().drop_table(drop.table_id);
1122
1123        Ok(QueryResult::new(vec![], vec![]))
1124    }
1125
1126    // ---- COPY FROM ----
1127
1128    fn exec_copy_from(&self, copy: &kyu_binder::BoundCopyFrom) -> KyuResult<QueryResult> {
1129        // Evaluate source expression to get file path.
1130        let path_val = evaluate_constant(&copy.source)?;
1131        let path = match &path_val {
1132            TypedValue::String(s) => s.as_str().to_string(),
1133            _ => {
1134                return Err(KyuError::Copy(
1135                    "COPY FROM source must be a string path".into(),
1136                ));
1137            }
1138        };
1139
1140        // Get the table schema from catalog.
1141        let catalog_snapshot = self.catalog.read();
1142        let entry = catalog_snapshot
1143            .find_by_id(copy.table_id)
1144            .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", copy.table_id)))?;
1145        let properties = entry.properties();
1146        let schema: Vec<LogicalType> = properties.iter().map(|p| p.data_type.clone()).collect();
1147        drop(catalog_snapshot);
1148
1149        // Open reader (auto-detects format by extension: .csv, .parquet, .arrow, .ipc).
1150        let reader = kyu_copy::open_reader(&path, &schema)?;
1151
1152        let mut storage = self.storage.write().unwrap();
1153        for row_result in reader {
1154            let values = row_result?;
1155            storage.insert_row(copy.table_id, &values)?;
1156        }
1157
1158        Ok(QueryResult::new(vec![], vec![]))
1159    }
1160
1161    // ---- LOAD FROM (RDF import) ----
1162
1163    fn exec_load_from(&self, load: &kyu_binder::BoundLoadFrom) -> KyuResult<QueryResult> {
1164        let path_val = evaluate_constant(&load.source)?;
1165        let path = match &path_val {
1166            TypedValue::String(s) => s.as_str().to_string(),
1167            _ => {
1168                return Err(KyuError::Copy(
1169                    "LOAD FROM source must be a string path".into(),
1170                ));
1171            }
1172        };
1173
1174        let triples = ext_rdf::parse_triples(&path)?;
1175        let schema = ext_rdf::infer_schema(&triples)?;
1176
1177        // Track (table_name → TableId) for rel-table foreign references.
1178        let mut node_table_ids: HashMap<String, TableId> = HashMap::new();
1179
1180        // Create node tables.
1181        for node_table in &schema.node_tables {
1182            let mut catalog = self.catalog.begin_write();
1183
1184            let table_id = catalog.alloc_table_id();
1185
1186            // uri column (primary key) + user properties.
1187            let mut storage_schema = vec![LogicalType::String];
1188            let uri_prop_id = catalog.alloc_property_id();
1189            let mut properties = vec![Property::new(
1190                uri_prop_id,
1191                SmolStr::new("uri"),
1192                LogicalType::String,
1193                true,
1194            )];
1195
1196            for (prop_name, logical_type) in &node_table.properties {
1197                let prop_id = catalog.alloc_property_id();
1198                properties.push(Property::new(
1199                    prop_id,
1200                    SmolStr::new(prop_name),
1201                    logical_type.clone(),
1202                    false,
1203                ));
1204                storage_schema.push(logical_type.clone());
1205            }
1206
1207            catalog.add_node_table(NodeTableEntry {
1208                table_id,
1209                name: SmolStr::new(&node_table.name),
1210                properties,
1211                primary_key_idx: 0,
1212                num_rows: 0,
1213                comment: None,
1214            })?;
1215
1216            self.catalog.commit_write(catalog);
1217            self.storage
1218                .write()
1219                .unwrap()
1220                .create_table(table_id, storage_schema);
1221
1222            node_table_ids.insert(node_table.name.clone(), table_id);
1223        }
1224
1225        // Insert node rows.
1226        {
1227            let mut storage = self.storage.write().unwrap();
1228            for node_table in &schema.node_tables {
1229                let table_id = node_table_ids[&node_table.name];
1230                for (uri, prop_values) in &node_table.rows {
1231                    let mut row = vec![TypedValue::String(SmolStr::new(uri))];
1232                    row.extend_from_slice(prop_values);
1233                    storage.insert_row(table_id, &row)?;
1234                }
1235            }
1236        }
1237
1238        // Create rel tables and insert edges.
1239        for rel_table in &schema.rel_tables {
1240            let from_id = match node_table_ids.get(&rel_table.from_table) {
1241                Some(id) => *id,
1242                None => continue, // skip if source table wasn't created
1243            };
1244            let to_id = match node_table_ids.get(&rel_table.to_table) {
1245                Some(id) => *id,
1246                None => continue,
1247            };
1248
1249            let mut catalog = self.catalog.begin_write();
1250            let table_id = catalog.alloc_table_id();
1251
1252            catalog.add_rel_table(RelTableEntry {
1253                table_id,
1254                name: SmolStr::new(&rel_table.name),
1255                from_table_id: from_id,
1256                to_table_id: to_id,
1257                properties: vec![],
1258                num_rows: 0,
1259                comment: None,
1260            })?;
1261
1262            self.catalog.commit_write(catalog);
1263
1264            // Storage schema: [src_uri:String, dst_uri:String].
1265            self.storage
1266                .write()
1267                .unwrap()
1268                .create_table(table_id, vec![LogicalType::String, LogicalType::String]);
1269
1270            let mut storage = self.storage.write().unwrap();
1271            for (src_uri, dst_uri) in &rel_table.edges {
1272                let row = vec![
1273                    TypedValue::String(SmolStr::new(src_uri)),
1274                    TypedValue::String(SmolStr::new(dst_uri)),
1275                ];
1276                storage.insert_row(table_id, &row)?;
1277            }
1278        }
1279
1280        Ok(QueryResult::new(vec![], vec![]))
1281    }
1282}
1283
1284// ---- Delta helpers (module-level, used by Connection::apply_delta) ----
1285
1286/// Parse a string primary key into the correct TypedValue for the given column type.
1287fn parse_primary_key(value: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
1288    match ty {
1289        LogicalType::Int8 => value
1290            .parse::<i8>()
1291            .map(TypedValue::Int8)
1292            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT8: {e}"))),
1293        LogicalType::Int16 => value
1294            .parse::<i16>()
1295            .map(TypedValue::Int16)
1296            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT16: {e}"))),
1297        LogicalType::Int32 => value
1298            .parse::<i32>()
1299            .map(TypedValue::Int32)
1300            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT32: {e}"))),
1301        LogicalType::Int64 | LogicalType::Serial => value
1302            .parse::<i64>()
1303            .map(TypedValue::Int64)
1304            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT64: {e}"))),
1305        LogicalType::String => Ok(TypedValue::String(SmolStr::new(value))),
1306        _ => Err(KyuError::Delta(format!(
1307            "unsupported primary key type '{}' for delta upsert",
1308            ty.type_name()
1309        ))),
1310    }
1311}
1312
1313/// Find the global row index of the first live row matching a primary key value.
1314fn find_row_by_pk(
1315    storage: &crate::storage::NodeGroupStorage,
1316    table_id: TableId,
1317    pk_col_idx: usize,
1318    pk_value: &TypedValue,
1319) -> KyuResult<Option<u64>> {
1320    let rows = storage.scan_rows(table_id)?;
1321    for (row_idx, row_values) in &rows {
1322        if row_values.get(pk_col_idx) == Some(pk_value) {
1323            return Ok(Some(*row_idx));
1324        }
1325    }
1326    Ok(None)
1327}
1328
1329/// Find the global row index of an edge row matching src and dst primary keys.
1330/// Rel table storage schema: [src_key, dst_key, ...user_props].
1331fn find_edge_row(
1332    storage: &crate::storage::NodeGroupStorage,
1333    rel_table_id: TableId,
1334    src_pk: &TypedValue,
1335    dst_pk: &TypedValue,
1336) -> KyuResult<Option<u64>> {
1337    let rows = storage.scan_rows(rel_table_id)?;
1338    for (row_idx, row_values) in &rows {
1339        if row_values.first() == Some(src_pk) && row_values.get(1) == Some(dst_pk) {
1340            return Ok(Some(*row_idx));
1341        }
1342    }
1343    Ok(None)
1344}
1345
1346/// Find a property's column index in a node table entry by name.
1347fn find_property_index(entry: &NodeTableEntry, name: &str) -> Option<usize> {
1348    let lower = name.to_lowercase();
1349    entry
1350        .properties
1351        .iter()
1352        .position(|p| p.name.to_lowercase() == lower)
1353}
1354
1355/// Find a property's index in a rel table entry by name (0-based within user properties).
1356fn find_rel_property_index(entry: &RelTableEntry, name: &str) -> Option<usize> {
1357    let lower = name.to_lowercase();
1358    entry
1359        .properties
1360        .iter()
1361        .position(|p| p.name.to_lowercase() == lower)
1362}
1363
1364/// Build a full node row from delta properties. Columns not in `props` default to Null.
1365fn build_node_row(
1366    entry: &NodeTableEntry,
1367    pk_value: &TypedValue,
1368    props: &hashbrown::HashMap<SmolStr, TypedValue>,
1369) -> Vec<TypedValue> {
1370    entry
1371        .properties
1372        .iter()
1373        .enumerate()
1374        .map(|(i, prop)| {
1375            if i == entry.primary_key_idx {
1376                pk_value.clone()
1377            } else if let Some(val) = props.get(&prop.name) {
1378                val.clone()
1379            } else {
1380                TypedValue::Null
1381            }
1382        })
1383        .collect()
1384}
1385
1386/// Build a full edge row: [src_pk, dst_pk, ...user_props].
1387fn build_edge_row(
1388    entry: &RelTableEntry,
1389    src_pk: &TypedValue,
1390    dst_pk: &TypedValue,
1391    props: &hashbrown::HashMap<SmolStr, TypedValue>,
1392) -> Vec<TypedValue> {
1393    let mut row = vec![src_pk.clone(), dst_pk.clone()];
1394    for prop in &entry.properties {
1395        if let Some(val) = props.get(&prop.name) {
1396            row.push(val.clone());
1397        } else {
1398            row.push(TypedValue::Null);
1399        }
1400    }
1401    row
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406    use crate::database::Database;
1407    use kyu_types::TypedValue;
1408    use smol_str::SmolStr;
1409
1410    #[test]
1411    fn create_database_and_connect() {
1412        let db = Database::in_memory();
1413        let _conn = db.connect();
1414        assert_eq!(db.catalog().num_tables(), 0);
1415    }
1416
1417    #[test]
1418    fn return_literal() {
1419        let db = Database::in_memory();
1420        let conn = db.connect();
1421        let result = conn.query("RETURN 1 AS x").unwrap();
1422        assert_eq!(result.num_rows(), 1);
1423        assert_eq!(result.row(0), vec![TypedValue::Int64(1)]);
1424    }
1425
1426    #[test]
1427    fn return_arithmetic() {
1428        let db = Database::in_memory();
1429        let conn = db.connect();
1430        let result = conn.query("RETURN 2 + 3 AS sum").unwrap();
1431        assert_eq!(result.row(0), vec![TypedValue::Int64(5)]);
1432    }
1433
1434    #[test]
1435    fn create_node_table() {
1436        let db = Database::in_memory();
1437        let conn = db.connect();
1438        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1439            .unwrap();
1440
1441        assert_eq!(db.catalog().num_tables(), 1);
1442        let snapshot = db.catalog().read();
1443        let entry = snapshot.find_by_name("Person").unwrap();
1444        assert!(entry.is_node_table());
1445        assert_eq!(entry.properties().len(), 2);
1446
1447        assert!(db.storage().read().unwrap().has_table(entry.table_id()));
1448    }
1449
1450    #[test]
1451    fn create_and_query_empty_table() {
1452        let db = Database::in_memory();
1453        let conn = db.connect();
1454        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1455            .unwrap();
1456        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1457        assert_eq!(result.num_rows(), 0);
1458    }
1459
1460    #[test]
1461    fn create_rel_table() {
1462        let db = Database::in_memory();
1463        let conn = db.connect();
1464        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1465            .unwrap();
1466        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
1467            .unwrap();
1468
1469        assert_eq!(db.catalog().num_tables(), 2);
1470        let snapshot = db.catalog().read();
1471        let entry = snapshot.find_by_name("KNOWS").unwrap();
1472        assert!(entry.is_rel_table());
1473    }
1474
1475    #[test]
1476    fn drop_table() {
1477        let db = Database::in_memory();
1478        let conn = db.connect();
1479        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1480            .unwrap();
1481        assert_eq!(db.catalog().num_tables(), 1);
1482
1483        conn.query("DROP TABLE Person").unwrap();
1484        assert_eq!(db.catalog().num_tables(), 0);
1485    }
1486
1487    #[test]
1488    fn create_duplicate_error() {
1489        let db = Database::in_memory();
1490        let conn = db.connect();
1491        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1492            .unwrap();
1493        let result = conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))");
1494        assert!(result.is_err());
1495    }
1496
1497    #[test]
1498    fn parse_error_propagated() {
1499        let db = Database::in_memory();
1500        let conn = db.connect();
1501        let result = conn.query("THIS IS NOT VALID CYPHER !!!");
1502        assert!(result.is_err());
1503    }
1504
1505    #[test]
1506    fn multiple_connections_share_state() {
1507        let db = Database::in_memory();
1508        let conn1 = db.connect();
1509        let conn2 = db.connect();
1510
1511        conn1
1512            .query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1513            .unwrap();
1514
1515        // conn2 should see the table created by conn1.
1516        assert_eq!(db.catalog().num_tables(), 1);
1517        let result = conn2.query("MATCH (p:Person) RETURN p.id").unwrap();
1518        assert_eq!(result.num_rows(), 0);
1519    }
1520
1521    #[test]
1522    fn create_node_via_cypher() {
1523        let db = Database::in_memory();
1524        let conn = db.connect();
1525        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1526            .unwrap();
1527
1528        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
1529            .unwrap();
1530
1531        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1532        assert_eq!(result.num_rows(), 1);
1533        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1534    }
1535
1536    #[test]
1537    fn create_multiple_nodes() {
1538        let db = Database::in_memory();
1539        let conn = db.connect();
1540        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1541            .unwrap();
1542
1543        conn.query("CREATE (a:Person {id: 1, name: 'Alice'}), (b:Person {id: 2, name: 'Bob'})")
1544            .unwrap();
1545
1546        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1547        assert_eq!(result.num_rows(), 2);
1548    }
1549
1550    #[test]
1551    fn create_node_partial_properties() {
1552        let db = Database::in_memory();
1553        let conn = db.connect();
1554        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1555            .unwrap();
1556
1557        // name not specified — should be NULL
1558        conn.query("CREATE (n:Person {id: 1})").unwrap();
1559
1560        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1561        assert_eq!(result.num_rows(), 1);
1562        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1563        assert_eq!(result.row(0)[1], TypedValue::Null);
1564    }
1565
1566    #[test]
1567    fn create_and_return() {
1568        let db = Database::in_memory();
1569        let conn = db.connect();
1570        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1571            .unwrap();
1572
1573        let result = conn
1574            .query("CREATE (n:Person {id: 1, name: 'Alice'}) RETURN n.name, n.id")
1575            .unwrap();
1576        assert_eq!(result.num_rows(), 1);
1577        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1578        assert_eq!(result.row(0)[1], TypedValue::Int64(1));
1579    }
1580
1581    #[test]
1582    fn match_set_property() {
1583        let db = Database::in_memory();
1584        let conn = db.connect();
1585        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1586            .unwrap();
1587        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 25})")
1588            .unwrap();
1589
1590        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' SET p.age = 31")
1591            .unwrap();
1592
1593        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
1594        assert_eq!(result.num_rows(), 1);
1595        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
1596    }
1597
1598    #[test]
1599    fn match_set_with_where() {
1600        let db = Database::in_memory();
1601        let conn = db.connect();
1602        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1603            .unwrap();
1604        conn.query("CREATE (a:Person {id: 1, name: 'Alice', age: 25})")
1605            .unwrap();
1606        conn.query("CREATE (b:Person {id: 2, name: 'Bob', age: 30})")
1607            .unwrap();
1608
1609        // Only update Alice's age.
1610        conn.query("MATCH (p:Person) WHERE p.id = 1 SET p.age = 26")
1611            .unwrap();
1612
1613        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
1614        assert_eq!(result.num_rows(), 2);
1615        // Find Alice's row and Bob's row.
1616        let alice_row = result
1617            .iter_rows()
1618            .find(|r| r[0] == TypedValue::String(SmolStr::new("Alice")))
1619            .unwrap();
1620        let bob_row = result
1621            .iter_rows()
1622            .find(|r| r[0] == TypedValue::String(SmolStr::new("Bob")))
1623            .unwrap();
1624        assert_eq!(alice_row[1], TypedValue::Int64(26)); // updated
1625        assert_eq!(bob_row[1], TypedValue::Int64(30)); // unchanged
1626    }
1627
1628    #[test]
1629    fn match_set_all_rows() {
1630        let db = Database::in_memory();
1631        let conn = db.connect();
1632        conn.query("CREATE NODE TABLE Person (id INT64, active INT64, PRIMARY KEY (id))")
1633            .unwrap();
1634        conn.query("CREATE (a:Person {id: 1, active: 0})").unwrap();
1635        conn.query("CREATE (b:Person {id: 2, active: 0})").unwrap();
1636
1637        // SET without WHERE — affects all rows.
1638        conn.query("MATCH (p:Person) SET p.active = 1").unwrap();
1639
1640        let result = conn.query("MATCH (p:Person) RETURN p.active").unwrap();
1641        assert_eq!(result.num_rows(), 2);
1642        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1643        assert_eq!(result.row(1)[0], TypedValue::Int64(1));
1644    }
1645
1646    #[test]
1647    fn match_delete() {
1648        let db = Database::in_memory();
1649        let conn = db.connect();
1650        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1651            .unwrap();
1652        conn.query("CREATE (a:Person {id: 1, name: 'Alice'})")
1653            .unwrap();
1654        conn.query("CREATE (b:Person {id: 2, name: 'Bob'})")
1655            .unwrap();
1656
1657        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' DELETE p")
1658            .unwrap();
1659
1660        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1661        assert_eq!(result.num_rows(), 1);
1662        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
1663    }
1664
1665    #[test]
1666    fn match_delete_all() {
1667        let db = Database::in_memory();
1668        let conn = db.connect();
1669        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1670            .unwrap();
1671        conn.query("CREATE (a:Person {id: 1})").unwrap();
1672        conn.query("CREATE (b:Person {id: 2})").unwrap();
1673
1674        conn.query("MATCH (p:Person) DELETE p").unwrap();
1675
1676        let result = conn.query("MATCH (p:Person) RETURN p.id").unwrap();
1677        assert_eq!(result.num_rows(), 0);
1678    }
1679
1680    #[test]
1681    fn storage_roundtrip_insert_scan() {
1682        let db = Database::in_memory();
1683        let conn = db.connect();
1684        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1685            .unwrap();
1686
1687        // Get table ID from catalog.
1688        let snapshot = db.catalog().read();
1689        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
1690        drop(snapshot);
1691
1692        // Insert directly via storage API (DML via Cypher deferred).
1693        db.storage()
1694            .write()
1695            .unwrap()
1696            .insert_row(
1697                table_id,
1698                &[
1699                    TypedValue::Int64(1),
1700                    TypedValue::String(SmolStr::new("Alice")),
1701                ],
1702            )
1703            .unwrap();
1704
1705        // Query reads from real NodeGroup/ColumnChunk storage.
1706        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1707        assert_eq!(result.num_rows(), 1);
1708        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1709    }
1710
1711    #[test]
1712    fn storage_roundtrip_multiple_rows() {
1713        let db = Database::in_memory();
1714        let conn = db.connect();
1715        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1716            .unwrap();
1717
1718        let snapshot = db.catalog().read();
1719        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
1720        drop(snapshot);
1721
1722        let mut storage = db.storage().write().unwrap();
1723        storage
1724            .insert_row(
1725                table_id,
1726                &[
1727                    TypedValue::Int64(1),
1728                    TypedValue::String(SmolStr::new("Alice")),
1729                    TypedValue::Int64(25),
1730                ],
1731            )
1732            .unwrap();
1733        storage
1734            .insert_row(
1735                table_id,
1736                &[
1737                    TypedValue::Int64(2),
1738                    TypedValue::String(SmolStr::new("Bob")),
1739                    TypedValue::Int64(30),
1740                ],
1741            )
1742            .unwrap();
1743        drop(storage);
1744
1745        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
1746        assert_eq!(result.num_rows(), 2);
1747    }
1748
1749    #[test]
1750    fn copy_from_csv() {
1751        use std::io::Write;
1752
1753        let dir = std::env::temp_dir().join("kyu_test_csv");
1754        let _ = std::fs::create_dir_all(&dir);
1755        let csv_path = dir.join("persons.csv");
1756        {
1757            let mut f = std::fs::File::create(&csv_path).unwrap();
1758            writeln!(f, "id,name").unwrap();
1759            writeln!(f, "1,Alice").unwrap();
1760            writeln!(f, "2,Bob").unwrap();
1761            writeln!(f, "3,Charlie").unwrap();
1762        }
1763
1764        let db = Database::in_memory();
1765        let conn = db.connect();
1766        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1767            .unwrap();
1768        conn.query(&format!("COPY Person FROM '{}'", csv_path.display()))
1769            .unwrap();
1770
1771        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1772        assert_eq!(result.num_rows(), 3);
1773
1774        // Clean up.
1775        let _ = std::fs::remove_file(&csv_path);
1776    }
1777
1778    #[test]
1779    fn copy_from_csv_multiple_types() {
1780        use std::io::Write;
1781
1782        let dir = std::env::temp_dir().join("kyu_test_csv");
1783        let _ = std::fs::create_dir_all(&dir);
1784        let csv_path = dir.join("typed.csv");
1785        {
1786            let mut f = std::fs::File::create(&csv_path).unwrap();
1787            writeln!(f, "id,name,score,active").unwrap();
1788            writeln!(f, "1,Alice,95.5,true").unwrap();
1789            writeln!(f, "2,Bob,87.3,false").unwrap();
1790        }
1791
1792        let db = Database::in_memory();
1793        let conn = db.connect();
1794        conn.query(
1795            "CREATE NODE TABLE Student (id INT64, name STRING, score DOUBLE, active BOOL, PRIMARY KEY (id))",
1796        )
1797        .unwrap();
1798        conn.query(&format!("COPY Student FROM '{}'", csv_path.display()))
1799            .unwrap();
1800
1801        let result = conn
1802            .query("MATCH (s:Student) RETURN s.name, s.score, s.active")
1803            .unwrap();
1804        assert_eq!(result.num_rows(), 2);
1805        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1806        assert_eq!(result.row(0)[1], TypedValue::Double(95.5));
1807        assert_eq!(result.row(0)[2], TypedValue::Bool(true));
1808
1809        let _ = std::fs::remove_file(&csv_path);
1810    }
1811
1812    #[test]
1813    fn copy_from_parquet() {
1814        use arrow::array::{Int64Array, StringArray};
1815        use arrow::datatypes::{DataType, Field, Schema};
1816        use arrow::record_batch::RecordBatch;
1817        use parquet::arrow::ArrowWriter;
1818        use std::sync::Arc;
1819
1820        let dir = std::env::temp_dir().join("kyu_test_parquet_copy");
1821        let _ = std::fs::create_dir_all(&dir);
1822        let parquet_path = dir.join("persons.parquet");
1823        {
1824            let schema = Arc::new(Schema::new(vec![
1825                Field::new("id", DataType::Int64, false),
1826                Field::new("name", DataType::Utf8, false),
1827            ]));
1828            let ids = Int64Array::from(vec![1, 2, 3]);
1829            let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1830            let batch =
1831                RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids), Arc::new(names)])
1832                    .unwrap();
1833            let file = std::fs::File::create(&parquet_path).unwrap();
1834            let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
1835            writer.write(&batch).unwrap();
1836            writer.close().unwrap();
1837        }
1838
1839        let db = Database::in_memory();
1840        let conn = db.connect();
1841        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1842            .unwrap();
1843        conn.query(&format!("COPY Person FROM '{}'", parquet_path.display()))
1844            .unwrap();
1845
1846        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1847        assert_eq!(result.num_rows(), 3);
1848        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1849        assert_eq!(result.row(0)[1], TypedValue::String(SmolStr::new("Alice")));
1850
1851        let _ = std::fs::remove_dir_all(&dir);
1852    }
1853
1854    #[test]
1855    fn call_extension_pagerank() {
1856        let mut db = Database::in_memory();
1857        db.register_extension(Box::new(ext_algo::AlgoExtension));
1858        let conn = db.connect();
1859
1860        // Create graph: 1->2->3->1 (cycle).
1861        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1862            .unwrap();
1863        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1864            .unwrap();
1865        conn.query("CREATE (n:Person {id: 1})").unwrap();
1866        conn.query("CREATE (n:Person {id: 2})").unwrap();
1867        conn.query("CREATE (n:Person {id: 3})").unwrap();
1868
1869        // Insert relationships directly.
1870        let snapshot = db.catalog().read();
1871        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1872        drop(snapshot);
1873        {
1874            let mut storage = db.storage().write().unwrap();
1875            storage
1876                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1877                .unwrap();
1878            storage
1879                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
1880                .unwrap();
1881            storage
1882                .insert_row(rel_table_id, &[TypedValue::Int64(3), TypedValue::Int64(1)])
1883                .unwrap();
1884        }
1885
1886        let result = conn
1887            .query("CALL algo.pageRank(0.85, 20, 0.000001)")
1888            .unwrap();
1889        assert_eq!(result.num_rows(), 3);
1890        assert_eq!(result.column_names.len(), 2);
1891        // All ranks should be positive.
1892        for row in result.iter_rows() {
1893            if let TypedValue::Double(rank) = &row[1] {
1894                assert!(*rank > 0.0);
1895            }
1896        }
1897    }
1898
1899    #[test]
1900    fn call_extension_wcc() {
1901        let mut db = Database::in_memory();
1902        db.register_extension(Box::new(ext_algo::AlgoExtension));
1903        let conn = db.connect();
1904
1905        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1906            .unwrap();
1907        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1908            .unwrap();
1909        conn.query("CREATE (n:Person {id: 1})").unwrap();
1910        conn.query("CREATE (n:Person {id: 2})").unwrap();
1911        conn.query("CREATE (n:Person {id: 10})").unwrap();
1912        conn.query("CREATE (n:Person {id: 11})").unwrap();
1913
1914        let snapshot = db.catalog().read();
1915        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1916        drop(snapshot);
1917        {
1918            let mut storage = db.storage().write().unwrap();
1919            storage
1920                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1921                .unwrap();
1922            storage
1923                .insert_row(
1924                    rel_table_id,
1925                    &[TypedValue::Int64(10), TypedValue::Int64(11)],
1926                )
1927                .unwrap();
1928        }
1929
1930        let result = conn.query("CALL algo.wcc()").unwrap();
1931        assert_eq!(result.num_rows(), 4);
1932    }
1933
1934    #[test]
1935    fn call_extension_betweenness() {
1936        let mut db = Database::in_memory();
1937        db.register_extension(Box::new(ext_algo::AlgoExtension));
1938        let conn = db.connect();
1939
1940        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1941            .unwrap();
1942        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1943            .unwrap();
1944        conn.query("CREATE (n:Person {id: 1})").unwrap();
1945        conn.query("CREATE (n:Person {id: 2})").unwrap();
1946        conn.query("CREATE (n:Person {id: 3})").unwrap();
1947
1948        let snapshot = db.catalog().read();
1949        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1950        drop(snapshot);
1951        {
1952            let mut storage = db.storage().write().unwrap();
1953            storage
1954                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1955                .unwrap();
1956            storage
1957                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
1958                .unwrap();
1959        }
1960
1961        let result = conn.query("CALL algo.betweenness()").unwrap();
1962        assert_eq!(result.num_rows(), 3);
1963    }
1964
1965    #[test]
1966    fn call_unknown_extension() {
1967        let db = Database::in_memory();
1968        let conn = db.connect();
1969        let result = conn.query("CALL nonexistent.proc()");
1970        assert!(result.is_err());
1971    }
1972
1973    #[test]
1974    fn persistence_survives_restart() {
1975        let dir = std::env::temp_dir().join("kyu_test_persist_e2e");
1976        let _ = std::fs::remove_dir_all(&dir);
1977
1978        // Phase 1: Create schema and insert data, then drop the database.
1979        {
1980            let db = Database::open(&dir).unwrap();
1981            let conn = db.connect();
1982            conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1983                .unwrap();
1984            conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
1985                .unwrap();
1986            conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
1987                .unwrap();
1988            // Drop triggers checkpoint (Drop impl).
1989        }
1990
1991        // Phase 2: Reopen and verify schema + data survived.
1992        {
1993            let db = Database::open(&dir).unwrap();
1994            let conn = db.connect();
1995
1996            // Schema should be recovered.
1997            assert_eq!(db.catalog().num_tables(), 1);
1998            let snapshot = db.catalog().read();
1999            assert!(snapshot.find_by_name("Person").is_some());
2000            drop(snapshot);
2001
2002            // Data should be recovered.
2003            let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
2004            assert_eq!(result.num_rows(), 2);
2005        }
2006
2007        let _ = std::fs::remove_dir_all(&dir);
2008    }
2009
2010    #[test]
2011    fn persistence_ddl_recovery_via_wal() {
2012        let dir = std::env::temp_dir().join("kyu_test_persist_ddl");
2013        let _ = std::fs::remove_dir_all(&dir);
2014
2015        // Phase 1: Create schema. The checkpoint on Drop will flush everything.
2016        {
2017            let db = Database::open(&dir).unwrap();
2018            let conn = db.connect();
2019            conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
2020                .unwrap();
2021            conn.query("CREATE NODE TABLE Organization (id INT64, name STRING, PRIMARY KEY (id))")
2022                .unwrap();
2023        }
2024
2025        // Phase 2: Verify both tables survived.
2026        {
2027            let db = Database::open(&dir).unwrap();
2028            assert_eq!(db.catalog().num_tables(), 2);
2029            let snapshot = db.catalog().read();
2030            assert!(snapshot.find_by_name("Person").is_some());
2031            assert!(snapshot.find_by_name("Organization").is_some());
2032        }
2033
2034        let _ = std::fs::remove_dir_all(&dir);
2035    }
2036
2037    #[test]
2038    fn persistence_empty_database() {
2039        let dir = std::env::temp_dir().join("kyu_test_persist_empty_db");
2040        let _ = std::fs::remove_dir_all(&dir);
2041
2042        // Open, do nothing, drop.
2043        {
2044            let _db = Database::open(&dir).unwrap();
2045        }
2046
2047        // Reopen — should be empty.
2048        {
2049            let db = Database::open(&dir).unwrap();
2050            assert_eq!(db.catalog().num_tables(), 0);
2051        }
2052
2053        let _ = std::fs::remove_dir_all(&dir);
2054    }
2055
2056    // ---- Parameterized query tests ----
2057
2058    #[test]
2059    fn return_param() {
2060        let db = Database::in_memory();
2061        let conn = db.connect();
2062        let mut params = std::collections::HashMap::new();
2063        params.insert("x".to_string(), TypedValue::Int64(42));
2064        let result = conn.query_with_params("RETURN $x AS val", params).unwrap();
2065        assert_eq!(result.num_rows(), 1);
2066        assert_eq!(result.row(0), vec![TypedValue::Int64(42)]);
2067    }
2068
2069    #[test]
2070    fn parameterized_where() {
2071        let db = Database::in_memory();
2072        let conn = db.connect();
2073        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
2074            .unwrap();
2075        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 30})")
2076            .unwrap();
2077        conn.query("CREATE (n:Person {id: 2, name: 'Bob', age: 20})")
2078            .unwrap();
2079
2080        let mut params = std::collections::HashMap::new();
2081        params.insert("min_age".to_string(), TypedValue::Int64(25));
2082        let result = conn
2083            .query_with_params(
2084                "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
2085                params,
2086            )
2087            .unwrap();
2088        assert_eq!(result.num_rows(), 1);
2089        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
2090    }
2091
2092    #[test]
2093    fn parameterized_create() {
2094        let db = Database::in_memory();
2095        let conn = db.connect();
2096        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2097            .unwrap();
2098
2099        let mut params = std::collections::HashMap::new();
2100        params.insert("id".to_string(), TypedValue::Int64(1));
2101        params.insert(
2102            "name".to_string(),
2103            TypedValue::String(SmolStr::new("Alice")),
2104        );
2105        conn.query_with_params("CREATE (n:Person {id: $id, name: $name})", params)
2106            .unwrap();
2107
2108        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
2109        assert_eq!(result.num_rows(), 1);
2110        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
2111    }
2112
2113    #[test]
2114    fn parameterized_set() {
2115        let db = Database::in_memory();
2116        let conn = db.connect();
2117        conn.query("CREATE NODE TABLE Person (id INT64, age INT64, PRIMARY KEY (id))")
2118            .unwrap();
2119        conn.query("CREATE (n:Person {id: 1, age: 25})").unwrap();
2120
2121        let mut params = std::collections::HashMap::new();
2122        params.insert("new_age".to_string(), TypedValue::Int64(31));
2123        conn.query_with_params(
2124            "MATCH (p:Person) WHERE p.id = 1 SET p.age = $new_age",
2125            params,
2126        )
2127        .unwrap();
2128
2129        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
2130        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
2131    }
2132
2133    #[test]
2134    fn unresolved_param_error() {
2135        let db = Database::in_memory();
2136        let conn = db.connect();
2137        let result = conn.query("RETURN $missing AS val");
2138        assert!(result.is_err());
2139        assert!(
2140            result
2141                .unwrap_err()
2142                .to_string()
2143                .contains("unresolved parameter")
2144        );
2145    }
2146
2147    #[test]
2148    fn env_resolved() {
2149        let db = Database::in_memory();
2150        let conn = db.connect();
2151        let mut env = std::collections::HashMap::new();
2152        env.insert(
2153            "GREETING".to_string(),
2154            TypedValue::String(SmolStr::new("hello")),
2155        );
2156        let result = conn
2157            .execute(
2158                "RETURN env('GREETING') AS val",
2159                std::collections::HashMap::new(),
2160                env,
2161            )
2162            .unwrap();
2163        assert_eq!(result.num_rows(), 1);
2164        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("hello")));
2165    }
2166
2167    #[test]
2168    fn env_missing_returns_null() {
2169        let db = Database::in_memory();
2170        let conn = db.connect();
2171        let result = conn
2172            .execute(
2173                "RETURN env('MISSING') AS val",
2174                std::collections::HashMap::new(),
2175                std::collections::HashMap::new(),
2176            )
2177            .unwrap();
2178        assert_eq!(result.num_rows(), 1);
2179        assert_eq!(result.row(0)[0], TypedValue::Null);
2180    }
2181
2182    // ---- apply_delta tests ----
2183
2184    #[test]
2185    fn delta_upsert_new_nodes() {
2186        use kyu_delta::DeltaBatchBuilder;
2187
2188        let db = Database::in_memory();
2189        let conn = db.connect();
2190        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
2191            .unwrap();
2192
2193        let batch = DeltaBatchBuilder::new("file:main.rs", 1)
2194            .upsert_node(
2195                "Function",
2196                "main",
2197                vec![],
2198                [("lines", TypedValue::Int64(42))],
2199            )
2200            .upsert_node(
2201                "Function",
2202                "helper",
2203                vec![],
2204                [("lines", TypedValue::Int64(10))],
2205            )
2206            .build();
2207
2208        let stats = conn.apply_delta(batch).unwrap();
2209        assert_eq!(stats.nodes_created, 2);
2210        assert_eq!(stats.nodes_updated, 0);
2211
2212        let result = conn
2213            .query("MATCH (f:Function) RETURN f.name, f.lines")
2214            .unwrap();
2215        assert_eq!(result.num_rows(), 2);
2216    }
2217
2218    #[test]
2219    fn delta_upsert_existing_node_merges() {
2220        use kyu_delta::DeltaBatchBuilder;
2221
2222        let db = Database::in_memory();
2223        let conn = db.connect();
2224        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
2225            .unwrap();
2226
2227        // Create initial node.
2228        let batch1 = DeltaBatchBuilder::new("file:main.rs", 1)
2229            .upsert_node(
2230                "Function",
2231                "main",
2232                vec![],
2233                [("lines", TypedValue::Int64(42))],
2234            )
2235            .build();
2236        conn.apply_delta(batch1).unwrap();
2237
2238        // Upsert same node with updated lines.
2239        let batch2 = DeltaBatchBuilder::new("file:main.rs", 2)
2240            .upsert_node(
2241                "Function",
2242                "main",
2243                vec![],
2244                [("lines", TypedValue::Int64(50))],
2245            )
2246            .build();
2247        let stats = conn.apply_delta(batch2).unwrap();
2248        assert_eq!(stats.nodes_created, 0);
2249        assert_eq!(stats.nodes_updated, 1);
2250
2251        let result = conn
2252            .query("MATCH (f:Function) WHERE f.name = 'main' RETURN f.lines")
2253            .unwrap();
2254        assert_eq!(result.num_rows(), 1);
2255        assert_eq!(result.row(0)[0], TypedValue::Int64(50));
2256    }
2257
2258    #[test]
2259    fn delta_delete_node() {
2260        use kyu_delta::DeltaBatchBuilder;
2261
2262        let db = Database::in_memory();
2263        let conn = db.connect();
2264        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2265            .unwrap();
2266        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
2267            .unwrap();
2268        conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
2269            .unwrap();
2270
2271        let batch = DeltaBatchBuilder::new("cleanup", 1)
2272            .delete_node("Person", "1")
2273            .build();
2274        let stats = conn.apply_delta(batch).unwrap();
2275        assert_eq!(stats.nodes_deleted, 1);
2276
2277        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
2278        assert_eq!(result.num_rows(), 1);
2279        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
2280    }
2281
2282    #[test]
2283    fn delta_upsert_and_delete_edges() {
2284        use kyu_delta::DeltaBatchBuilder;
2285
2286        let db = Database::in_memory();
2287        let conn = db.connect();
2288        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
2289            .unwrap();
2290        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
2291            .unwrap();
2292        conn.query("CREATE (n:Person {id: 1})").unwrap();
2293        conn.query("CREATE (n:Person {id: 2})").unwrap();
2294
2295        // Create edge via delta.
2296        let batch = DeltaBatchBuilder::new("social", 1)
2297            .upsert_edge(
2298                "Person",
2299                "1",
2300                "KNOWS",
2301                "Person",
2302                "2",
2303                [("since", TypedValue::Int64(2024))],
2304            )
2305            .build();
2306        let stats = conn.apply_delta(batch).unwrap();
2307        assert_eq!(stats.edges_created, 1);
2308
2309        // Verify edge exists.
2310        let storage = db.storage().read().unwrap();
2311        let catalog = db.catalog().read();
2312        let rel_table_id = catalog.find_by_name("KNOWS").unwrap().table_id();
2313        let rows = storage.scan_rows(rel_table_id).unwrap();
2314        assert_eq!(rows.len(), 1);
2315        assert_eq!(rows[0].1[0], TypedValue::Int64(1)); // src
2316        assert_eq!(rows[0].1[1], TypedValue::Int64(2)); // dst
2317        assert_eq!(rows[0].1[2], TypedValue::Int64(2024)); // since
2318        drop(storage);
2319        drop(catalog);
2320
2321        // Update edge property.
2322        let batch2 = DeltaBatchBuilder::new("social", 2)
2323            .upsert_edge(
2324                "Person",
2325                "1",
2326                "KNOWS",
2327                "Person",
2328                "2",
2329                [("since", TypedValue::Int64(2025))],
2330            )
2331            .build();
2332        let stats2 = conn.apply_delta(batch2).unwrap();
2333        assert_eq!(stats2.edges_updated, 1);
2334
2335        let storage = db.storage().read().unwrap();
2336        let rows = storage.scan_rows(rel_table_id).unwrap();
2337        assert_eq!(rows[0].1[2], TypedValue::Int64(2025));
2338        drop(storage);
2339
2340        // Delete edge.
2341        let batch3 = DeltaBatchBuilder::new("social", 3)
2342            .delete_edge("Person", "1", "KNOWS", "Person", "2")
2343            .build();
2344        let stats3 = conn.apply_delta(batch3).unwrap();
2345        assert_eq!(stats3.edges_deleted, 1);
2346
2347        let storage = db.storage().read().unwrap();
2348        let rows = storage.scan_rows(rel_table_id).unwrap();
2349        assert_eq!(rows.len(), 0);
2350    }
2351
2352    #[test]
2353    fn delta_idempotent_replay() {
2354        use kyu_delta::DeltaBatchBuilder;
2355
2356        let db = Database::in_memory();
2357        let conn = db.connect();
2358        conn.query("CREATE NODE TABLE File (path STRING, hash STRING, PRIMARY KEY (path))")
2359            .unwrap();
2360
2361        let batch = DeltaBatchBuilder::new("watcher", 100)
2362            .upsert_node(
2363                "File",
2364                "src/main.rs",
2365                vec![],
2366                [("hash", TypedValue::String(SmolStr::new("abc123")))],
2367            )
2368            .build();
2369
2370        // Apply once.
2371        let stats1 = conn.apply_delta(batch.clone()).unwrap();
2372        assert_eq!(stats1.nodes_created, 1);
2373
2374        // Apply again — same batch is idempotent (update, not create).
2375        let stats2 = conn.apply_delta(batch).unwrap();
2376        assert_eq!(stats2.nodes_created, 0);
2377        assert_eq!(stats2.nodes_updated, 1);
2378
2379        // Still only one row.
2380        let result = conn.query("MATCH (f:File) RETURN f.path").unwrap();
2381        assert_eq!(result.num_rows(), 1);
2382    }
2383
2384    #[test]
2385    fn delta_stats_correct() {
2386        use kyu_delta::DeltaBatchBuilder;
2387
2388        let db = Database::in_memory();
2389        let conn = db.connect();
2390        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2391            .unwrap();
2392        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
2393            .unwrap();
2394
2395        let batch = DeltaBatchBuilder::new("test", 1)
2396            .upsert_node(
2397                "Person",
2398                "1",
2399                vec![],
2400                [("name", TypedValue::String(SmolStr::new("Alice")))],
2401            )
2402            .upsert_node(
2403                "Person",
2404                "2",
2405                vec![],
2406                [("name", TypedValue::String(SmolStr::new("Bob")))],
2407            )
2408            .upsert_edge(
2409                "Person",
2410                "1",
2411                "KNOWS",
2412                "Person",
2413                "2",
2414                Vec::<(&str, TypedValue)>::new(),
2415            )
2416            .build();
2417
2418        let stats = conn.apply_delta(batch).unwrap();
2419        assert_eq!(stats.nodes_created, 2);
2420        assert_eq!(stats.edges_created, 1);
2421        assert_eq!(stats.total_deltas, 3);
2422        assert!(stats.elapsed_micros > 0);
2423    }
2424}