Skip to main content

kyu_api/
connection.rs

1//! Connection — executes Cypher queries and DDL against a Database.
2
3use std::sync::{Arc, RwLock};
4
5use std::collections::HashMap;
6
7use std::time::Instant;
8
9use kyu_binder::{
10    BindContext, Binder, BoundMatchClause, BoundNodePattern, BoundPatternElement, BoundQuery,
11    BoundReadingClause, BoundStatement, BoundUpdatingClause,
12};
13use kyu_catalog::{Catalog, NodeTableEntry, Property, RelTableEntry};
14use kyu_common::id::TableId;
15use kyu_common::{KyuError, KyuResult};
16use kyu_delta::{DeltaBatch, DeltaStats, GraphDelta};
17use kyu_executor::{ExecutionContext, QueryResult, Storage, execute};
18use kyu_expression::{FunctionRegistry, evaluate, evaluate_constant};
19use kyu_planner::{build_query_plan, optimize, resolve_properties};
20use kyu_transaction::{Checkpointer, TransactionManager, TransactionType, Wal};
21use kyu_types::{LogicalType, TypedValue};
22use smol_str::SmolStr;
23
24use crate::storage::NodeGroupStorage;
25
/// A connection to a KyuGraph database.
///
/// Connections share catalog and storage via `Arc`. Each query gets a
/// consistent catalog snapshot for binding and planning; DDL mutates
/// both catalog and storage atomically. Every query is wrapped in a
/// transaction for crash safety and isolation.
pub struct Connection {
    /// Shared catalog; `read()` yields the snapshot used for binding and planning.
    catalog: Arc<Catalog>,
    /// Row storage, guarded for concurrent read (scans) / write (DML) access.
    storage: Arc<RwLock<NodeGroupStorage>>,
    /// Issues per-query transactions and commits / rolls them back.
    txn_mgr: Arc<TransactionManager>,
    /// Write-ahead log handed to the transaction manager on commit.
    wal: Arc<Wal>,
    /// Executes explicit CHECKPOINT commands and post-commit auto-checkpoints.
    checkpointer: Arc<Checkpointer>,
    /// Loaded extensions; `CALL ext.proc(...)` statements are routed here.
    extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
}
40
41impl Connection {
42    pub(crate) fn new(
43        catalog: Arc<Catalog>,
44        storage: Arc<RwLock<NodeGroupStorage>>,
45        txn_mgr: Arc<TransactionManager>,
46        wal: Arc<Wal>,
47        checkpointer: Arc<Checkpointer>,
48        extensions: Arc<Vec<Box<dyn kyu_extension::Extension>>>,
49    ) -> Self {
50        Self {
51            catalog,
52            storage,
53            txn_mgr,
54            wal,
55            checkpointer,
56            extensions,
57        }
58    }
59
60    /// Execute a Cypher statement, returning a QueryResult.
61    ///
62    /// Each statement is wrapped in a transaction: read-only queries get a
63    /// `ReadOnly` transaction, write queries get a `Write` transaction.
64    /// The transaction is committed on success and rolled back on error.
65    pub fn query(&self, cypher: &str) -> KyuResult<QueryResult> {
66        self.query_internal(cypher, BindContext::empty())
67    }
68
69    /// Execute a Cypher statement with parameter bindings for `$param` placeholders.
70    ///
71    /// Parameters are resolved to literal values at bind time, before planning
72    /// and execution. This is the preferred way to pass dynamic values safely.
73    ///
74    /// ```ignore
75    /// use std::collections::HashMap;
76    /// use kyu_types::TypedValue;
77    ///
78    /// let mut params = HashMap::new();
79    /// params.insert("min_age".to_string(), TypedValue::Int64(25));
80    /// let result = conn.query_with_params(
81    ///     "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
82    ///     params,
83    /// ).unwrap();
84    /// ```
85    pub fn query_with_params(
86        &self,
87        cypher: &str,
88        params: HashMap<String, TypedValue>,
89    ) -> KyuResult<QueryResult> {
90        let ctx = BindContext {
91            params: params
92                .into_iter()
93                .map(|(k, v)| (SmolStr::new(k), v))
94                .collect(),
95            env: HashMap::new(),
96        };
97        self.query_internal(cypher, ctx)
98    }
99
100    /// Full VM-style execution with both `$param` bindings and `env()` values.
101    ///
102    /// The Cypher evaluator is treated like a virtual machine: it accepts a
103    /// query string plus two context maps that are resolved before planning:
104    ///
105    /// - `params`: `$param` placeholders → `TypedValue`
106    /// - `env`: `env('KEY')` lookups → `TypedValue`
107    ///
108    /// ```ignore
109    /// use std::collections::HashMap;
110    /// use kyu_types::TypedValue;
111    ///
112    /// let mut params = HashMap::new();
113    /// params.insert("name".to_string(), TypedValue::String("Alice".into()));
114    /// let mut env = HashMap::new();
115    /// env.insert("PREFIX".to_string(), TypedValue::String("graph_".into()));
116    /// let result = conn.execute(
117    ///     "MATCH (p:Person) WHERE p.name = $name RETURN p.name",
118    ///     params,
119    ///     env,
120    /// ).unwrap();
121    /// ```
122    pub fn execute(
123        &self,
124        cypher: &str,
125        params: HashMap<String, TypedValue>,
126        env: HashMap<String, TypedValue>,
127    ) -> KyuResult<QueryResult> {
128        let ctx = BindContext {
129            params: params
130                .into_iter()
131                .map(|(k, v)| (SmolStr::new(k), v))
132                .collect(),
133            env: env.into_iter().map(|(k, v)| (SmolStr::new(k), v)).collect(),
134        };
135        self.query_internal(cypher, ctx)
136    }
137
138    fn query_internal(&self, cypher: &str, ctx: BindContext) -> KyuResult<QueryResult> {
139        // Fast path: CHECKPOINT command.
140        if cypher.trim().eq_ignore_ascii_case("CHECKPOINT")
141            || cypher.trim().eq_ignore_ascii_case("CHECKPOINT;")
142        {
143            self.checkpointer
144                .checkpoint()
145                .map_err(|e| KyuError::Transaction(format!("checkpoint failed: {e}")))?;
146            return Ok(QueryResult::new(vec![], vec![]));
147        }
148
149        // Fast path: CALL ext.proc(...) routing to extensions.
150        if let Some(result) = self.try_call_extension(cypher)? {
151            return Ok(result);
152        }
153
154        // 1. Parse
155        let parse_result = kyu_parser::parse(cypher);
156        let stmt = parse_result
157            .ast
158            .ok_or_else(|| KyuError::Parser(format!("{:?}", parse_result.errors)))?;
159
160        // 2. Bind (against a catalog snapshot)
161        let catalog_snapshot = self.catalog.read();
162        let mut binder =
163            Binder::new(catalog_snapshot, FunctionRegistry::with_builtins()).with_context(ctx);
164        let bound = binder.bind(&stmt)?;
165
166        // 3. Determine if this is a write operation and/or DDL.
167        let is_ddl = matches!(
168            &bound,
169            BoundStatement::CreateNodeTable(_)
170                | BoundStatement::CreateRelTable(_)
171                | BoundStatement::Drop(_)
172        );
173        let is_write = match &bound {
174            BoundStatement::Query(q) => self.is_standalone_dml(q) || self.has_match_mutations(q),
175            BoundStatement::CopyFrom(_) => true,
176            _ => is_ddl,
177        };
178
179        // 4. Begin transaction.
180        let txn_type = if is_write {
181            TransactionType::Write
182        } else {
183            TransactionType::ReadOnly
184        };
185        let mut txn = self
186            .txn_mgr
187            .begin(txn_type)
188            .map_err(|e| KyuError::Transaction(e.to_string()))?;
189
190        // 5. Execute within the transaction.
191        let result = self.execute_bound(bound);
192
193        // 6. Commit on success, rollback on error.
194        match &result {
195            Ok(_) => {
196                // For DDL, snapshot the catalog to WAL for crash recovery.
197                if is_ddl {
198                    let snapshot = self.catalog.read().serialize_json();
199                    txn.log_catalog_snapshot(snapshot.into_bytes());
200                }
201                self.txn_mgr
202                    .commit(&mut txn, &self.wal, |_, _| {})
203                    .map_err(|e| KyuError::Transaction(e.to_string()))?;
204                // Auto-checkpoint after write commits if WAL exceeds threshold.
205                if is_write {
206                    let _ = self.checkpointer.try_checkpoint();
207                }
208            }
209            Err(_) => {
210                let _ = self.txn_mgr.rollback(&mut txn, |_| {});
211            }
212        }
213
214        result
215    }
216
217    /// Route a bound statement to the appropriate executor.
218    fn execute_bound(&self, bound: BoundStatement) -> KyuResult<QueryResult> {
219        match bound {
220            BoundStatement::Query(query) => {
221                if self.is_standalone_dml(&query) {
222                    return self.exec_dml(&query);
223                }
224                if self.has_match_mutations(&query) {
225                    return self.exec_match_dml(&query);
226                }
227                let catalog_snapshot = self.catalog.read();
228                let plan = build_query_plan(&query, &catalog_snapshot)?;
229                let plan = optimize(plan, &catalog_snapshot);
230                let storage_guard = self.storage.read().unwrap();
231                let ctx = ExecutionContext::new(catalog_snapshot, &*storage_guard);
232                execute(&plan, &query.output_schema, &ctx)
233            }
234            BoundStatement::CreateNodeTable(create) => self.exec_create_node_table(&create),
235            BoundStatement::CreateRelTable(create) => self.exec_create_rel_table(&create),
236            BoundStatement::Drop(drop) => self.exec_drop(&drop),
237            BoundStatement::CopyFrom(copy) => self.exec_copy_from(&copy),
238            _ => Err(KyuError::NotImplemented(
239                "statement type not yet supported".into(),
240            )),
241        }
242    }
243
244    // ---- DML execution ----
245
246    /// Returns true if the query has no reading clauses (standalone CREATE, CREATE...RETURN).
247    fn is_standalone_dml(&self, query: &BoundQuery) -> bool {
248        query
249            .parts
250            .iter()
251            .all(|part| part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
252    }
253
    /// Execute a standalone DML statement (CREATE nodes/rels, with optional RETURN).
    ///
    /// Nodes are inserted immediately via `exec_create_node`; CREATE of a
    /// relationship without a MATCH is rejected. When a part carries a RETURN
    /// projection, the created nodes' values are laid out as one flat tuple
    /// (in creation order) and the projection expressions are resolved and
    /// evaluated against it, producing a single-row result.
    ///
    /// NOTE(review): the method returns from inside the loop on the first
    /// part that has a projection — later parts are not executed in that
    /// case. Confirm that multi-part standalone DML is intended to behave
    /// this way.
    fn exec_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
        let catalog_snapshot = self.catalog.read();

        for part in &query.parts {
            // Track created nodes for potential RETURN projection.
            let mut created_nodes: Vec<(Option<u32>, TableId, Vec<TypedValue>)> = Vec::new();

            for clause in &part.updating_clauses {
                match clause {
                    BoundUpdatingClause::Create(patterns) => {
                        for pattern in patterns {
                            for element in &pattern.elements {
                                match element {
                                    BoundPatternElement::Node(node) => {
                                        // Insert the node and remember
                                        // (variable, table, values) so a later
                                        // RETURN can project from it.
                                        let values =
                                            self.exec_create_node(node, &catalog_snapshot)?;
                                        created_nodes.push((
                                            node.variable_index,
                                            node.table_id,
                                            values,
                                        ));
                                    }
                                    BoundPatternElement::Relationship(_rel) => {
                                        return Err(KyuError::NotImplemented(
                                            "CREATE relationship not yet supported".into(),
                                        ));
                                    }
                                }
                            }
                        }
                    }
                    BoundUpdatingClause::Set(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone SET without MATCH".into(),
                        ));
                    }
                    BoundUpdatingClause::Delete(_) => {
                        return Err(KyuError::NotImplemented(
                            "standalone DELETE without MATCH".into(),
                        ));
                    }
                }
            }

            // Handle RETURN projection if present.
            if let Some(ref proj) = part.projection {
                // prop_map: (variable_index, property name) → column offset in
                // the flat tuple formed by concatenating all created rows.
                let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
                let mut combined_values: Vec<TypedValue> = Vec::new();
                let mut offset = 0u32;

                for (var_idx, table_id, values) in &created_nodes {
                    if let Some(entry) = catalog_snapshot.find_by_id(*table_id) {
                        // Only named variables get prop_map entries, but the
                        // offset still advances so later variables stay aligned.
                        if let Some(vi) = var_idx {
                            for (i, prop) in entry.properties().iter().enumerate() {
                                prop_map.insert((*vi, prop.name.clone()), offset + i as u32);
                            }
                        }
                        offset += entry.properties().len() as u32;
                    }
                    // NOTE(review): if find_by_id missed, offset would not advance
                    // but values are still appended, skewing later offsets —
                    // presumably the entry always exists for a just-created node;
                    // verify.
                    combined_values.extend(values.iter().cloned());
                }

                let col_names: Vec<SmolStr> =
                    proj.items.iter().map(|item| item.alias.clone()).collect();
                let col_types: Vec<LogicalType> = proj
                    .items
                    .iter()
                    .map(|item| item.expression.result_type().clone())
                    .collect();

                // Evaluate each projection item against the combined tuple.
                let mut row: Vec<TypedValue> = Vec::with_capacity(proj.items.len());
                for item in &proj.items {
                    let resolved = resolve_properties(&item.expression, &prop_map);
                    let value = evaluate(&resolved, combined_values.as_slice())?;
                    row.push(value);
                }

                let mut result = QueryResult::new(col_names, col_types);
                result.push_row(row);
                return Ok(result);
            }
        }

        Ok(QueryResult::new(vec![], vec![]))
    }
340
341    /// Insert a single node row into storage based on a CREATE pattern.
342    /// Returns the created values in catalog property order.
343    fn exec_create_node(
344        &self,
345        node: &BoundNodePattern,
346        catalog: &kyu_catalog::CatalogContent,
347    ) -> KyuResult<Vec<TypedValue>> {
348        let entry = catalog
349            .find_by_id(node.table_id)
350            .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", node.table_id)))?;
351        let properties = entry.properties();
352
353        let mut values = Vec::with_capacity(properties.len());
354        for prop in properties {
355            let value = if let Some((_pid, expr)) =
356                node.properties.iter().find(|(pid, _)| *pid == prop.id)
357            {
358                evaluate_constant(expr)?
359            } else {
360                TypedValue::Null
361            };
362            values.push(value);
363        }
364
365        self.storage
366            .write()
367            .unwrap()
368            .insert_row(node.table_id, &values)?;
369
370        Ok(values)
371    }
372
373    /// Returns true if the query has both MATCH (reading) and SET/DELETE (updating) clauses.
374    fn has_match_mutations(&self, query: &BoundQuery) -> bool {
375        query
376            .parts
377            .iter()
378            .any(|part| !part.reading_clauses.is_empty() && !part.updating_clauses.is_empty())
379    }
380
381    /// Execute MATCH...SET/DELETE/CREATE: scan, filter, then mutate.
382    ///
383    /// Supports multiple chained MATCH clauses (e.g. MATCH (a:X) WHERE ...
384    /// MATCH (b:Y) WHERE ... CREATE (a)-[:REL]->(b)).
385    fn exec_match_dml(&self, query: &BoundQuery) -> KyuResult<QueryResult> {
386        let catalog_snapshot = self.catalog.read();
387
388        for part in &query.parts {
389            // Collect all MATCH clauses in this part.
390            let match_clauses: Vec<&BoundMatchClause> = part
391                .reading_clauses
392                .iter()
393                .filter_map(|c| match c {
394                    BoundReadingClause::Match(m) => Some(m),
395                    _ => None,
396                })
397                .collect();
398
399            if match_clauses.is_empty() {
400                return Err(KyuError::NotImplemented(
401                    "MATCH...mutation requires at least one MATCH clause".into(),
402                ));
403            }
404
405            // Build combined result: for each MATCH clause, scan + filter
406            // and store matched rows indexed by variable_index.
407            // Each "binding" maps variable_index → row values.
408            let mut bindings: Vec<HashMap<u32, Vec<TypedValue>>> = vec![HashMap::new()];
409            let mut prop_map: HashMap<(u32, SmolStr), u32> = HashMap::new();
410            let mut global_offset = 0u32;
411
412            for mc in &match_clauses {
413                let (table_id, var_idx) = self.extract_match_node(mc)?;
414                let entry = catalog_snapshot
415                    .find_by_id(table_id)
416                    .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", table_id)))?;
417                let properties = entry.properties();
418
419                // Add this node's properties to the combined prop_map.
420                if let Some(vi) = var_idx {
421                    for (i, p) in properties.iter().enumerate() {
422                        prop_map.insert((vi, p.name.clone()), global_offset + i as u32);
423                    }
424                }
425
426                // Resolve WHERE predicate against current prop_map.
427                let resolved_where = mc
428                    .where_clause
429                    .as_ref()
430                    .map(|w| resolve_properties(w, &prop_map));
431
432                // Scan this table.
433                let rows = self.storage.read().unwrap().scan_rows(table_id)?;
434
435                // Cross-product with existing bindings, filtering by WHERE.
436                let mut new_bindings = Vec::new();
437                for existing in &bindings {
438                    for (_row_idx, row_values) in &rows {
439                        // Build combined tuple for WHERE evaluation.
440                        let mut combined = Vec::new();
441                        // Gather values from previously matched variables in order.
442                        let mut entries: Vec<(u32, &Vec<TypedValue>)> =
443                            existing.iter().map(|(&k, v)| (k, v)).collect();
444                        entries.sort_by_key(|(k, _)| *k);
445                        for (_, vals) in &entries {
446                            combined.extend(vals.iter().cloned());
447                        }
448                        combined.extend(row_values.iter().cloned());
449
450                        // Evaluate WHERE against combined tuple.
451                        if let Some(ref pred) = resolved_where {
452                            let result = evaluate(pred, combined.as_slice())?;
453                            if result != TypedValue::Bool(true) {
454                                continue;
455                            }
456                        }
457
458                        let mut binding = existing.clone();
459                        if let Some(vi) = var_idx {
460                            binding.insert(vi, row_values.clone());
461                        }
462                        new_bindings.push(binding);
463                    }
464                }
465                bindings = new_bindings;
466                global_offset += properties.len() as u32;
467            }
468
469            // Now process updating clauses for each binding.
470            let mut set_mutations: Vec<(TableId, u64, usize, TypedValue)> = Vec::new();
471            let mut delete_rows: Vec<(TableId, u64)> = Vec::new();
472            let mut rel_inserts: Vec<(TableId, Vec<TypedValue>)> = Vec::new();
473
474            for binding in &bindings {
475                // Build combined flat tuple for expression evaluation.
476                let mut combined = Vec::new();
477                let mut entries: Vec<(u32, &Vec<TypedValue>)> =
478                    binding.iter().map(|(&k, v)| (k, v)).collect();
479                entries.sort_by_key(|(k, _)| *k);
480                for (_, vals) in &entries {
481                    combined.extend(vals.iter().cloned());
482                }
483
484                for clause in &part.updating_clauses {
485                    match clause {
486                        BoundUpdatingClause::Set(items) => {
487                            // For SET, we need the first MATCH clause's table.
488                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
489                            let entry = catalog_snapshot.find_by_id(table_id).unwrap();
490                            let properties = entry.properties();
491                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
492                            // Find the matching row by PK comparison.
493                            let first_var =
494                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
495                                    if let BoundPatternElement::Node(n) = e {
496                                        n.variable_index
497                                    } else {
498                                        None
499                                    }
500                                });
501                            if let Some(vi) = first_var
502                                && let Some(var_vals) = binding.get(&vi)
503                            {
504                                for (row_idx, row_values) in &rows {
505                                    if row_values == var_vals {
506                                        for item in items {
507                                            let resolved_value =
508                                                resolve_properties(&item.value, &prop_map);
509                                            let new_value =
510                                                evaluate(&resolved_value, combined.as_slice())?;
511                                            let col_idx = properties
512                                                .iter()
513                                                .position(|p| p.id == item.property_id)
514                                                .ok_or_else(|| {
515                                                    KyuError::Storage(format!(
516                                                        "property {:?} not found",
517                                                        item.property_id
518                                                    ))
519                                                })?;
520                                            set_mutations
521                                                .push((table_id, *row_idx, col_idx, new_value));
522                                        }
523                                        break;
524                                    }
525                                }
526                            }
527                        }
528                        BoundUpdatingClause::Delete(_) => {
529                            let (table_id, _) = self.extract_match_node(match_clauses[0])?;
530                            let rows = self.storage.read().unwrap().scan_rows(table_id)?;
531                            let first_var =
532                                match_clauses[0].patterns[0].elements.iter().find_map(|e| {
533                                    if let BoundPatternElement::Node(n) = e {
534                                        n.variable_index
535                                    } else {
536                                        None
537                                    }
538                                });
539                            if let Some(vi) = first_var
540                                && let Some(var_vals) = binding.get(&vi)
541                            {
542                                for (row_idx, row_values) in &rows {
543                                    if row_values == var_vals {
544                                        delete_rows.push((table_id, *row_idx));
545                                        break;
546                                    }
547                                }
548                            }
549                        }
550                        BoundUpdatingClause::Create(patterns) => {
551                            for pattern in patterns {
552                                self.exec_create_in_binding(
553                                    pattern,
554                                    binding,
555                                    &catalog_snapshot,
556                                    &mut rel_inserts,
557                                )?;
558                            }
559                        }
560                    }
561                }
562            }
563
564            // Phase 2: Write — apply mutations.
565            let mut storage = self.storage.write().unwrap();
566            for (table_id, row_idx, col_idx, value) in &set_mutations {
567                storage.update_cell(*table_id, *row_idx, *col_idx, value)?;
568            }
569            for (table_id, row_idx) in &delete_rows {
570                storage.delete_row(*table_id, *row_idx)?;
571            }
572            for (table_id, values) in &rel_inserts {
573                storage.insert_row(*table_id, values)?;
574            }
575        }
576
577        Ok(QueryResult::new(vec![], vec![]))
578    }
579
    /// Execute CREATE patterns within a binding (matched variable → row values).
    /// Handles relationship creation: extracts src/dst PK from bound variables.
    ///
    /// Assumes the pattern shape `Node, Relationship, Node, ...`: each
    /// relationship element reads its endpoints from the adjacent node
    /// elements, which must refer to variables already present in `binding`.
    /// New relationship rows are appended to `rel_inserts` as
    /// `[src_pk, dst_pk, ...user_properties]` rather than written directly,
    /// so the caller can apply them in its deferred write phase.
    fn exec_create_in_binding(
        &self,
        pattern: &kyu_binder::BoundPattern,
        binding: &HashMap<u32, Vec<TypedValue>>,
        catalog: &kyu_catalog::CatalogContent,
        rel_inserts: &mut Vec<(TableId, Vec<TypedValue>)>,
    ) -> KyuResult<()> {
        let elements = &pattern.elements;
        let mut i = 0;
        while i < elements.len() {
            match &elements[i] {
                BoundPatternElement::Node(_node) => {
                    // Node in CREATE within MATCH context is a reference to an
                    // already-matched variable — no new node creation needed.
                    i += 1;
                }
                BoundPatternElement::Relationship(rel) => {
                    // Expect: Node(src), Rel, Node(dst) — peek at the
                    // neighbouring elements for the endpoint variable indices.
                    let src_var = if i > 0 {
                        if let BoundPatternElement::Node(n) = &elements[i - 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };
                    let dst_var = if i + 1 < elements.len() {
                        if let BoundPatternElement::Node(n) = &elements[i + 1] {
                            n.variable_index
                        } else {
                            None
                        }
                    } else {
                        None
                    };

                    // Both endpoints must be named variables bound by MATCH.
                    let src_vi = src_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve source node".into())
                    })?;
                    let dst_vi = dst_var.ok_or_else(|| {
                        KyuError::Runtime("CREATE rel: cannot resolve destination node".into())
                    })?;

                    let src_vals = binding.get(&src_vi).ok_or_else(|| {
                        KyuError::Runtime(format!(
                            "CREATE rel: source var {src_vi} not in bindings"
                        ))
                    })?;
                    let dst_vals = binding.get(&dst_vi).ok_or_else(|| {
                        KyuError::Runtime(format!("CREATE rel: dest var {dst_vi} not in bindings"))
                    })?;

                    // Look up the rel table entry to find src/dst node tables.
                    let rel_entry = catalog
                        .find_by_id(rel.table_id)
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table {:?} not found", rel.table_id))
                        })?;

                    let src_node_entry = catalog
                        .find_by_id(rel_entry.from_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("source node table not found".into()))?;
                    let dst_node_entry = catalog
                        .find_by_id(rel_entry.to_table_id)
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| KyuError::Catalog("dest node table not found".into()))?;

                    // Extract primary key values from matched rows.
                    let src_pk = src_vals[src_node_entry.primary_key_idx].clone();
                    let dst_pk = dst_vals[dst_node_entry.primary_key_idx].clone();

                    // Build rel row: [src_pk, dst_pk, ...user_properties].
                    // Properties not mentioned in the pattern default to Null.
                    let mut values = vec![src_pk, dst_pk];
                    let rel_properties = rel_entry.properties.as_slice();
                    for prop in rel_properties {
                        let value = if let Some((_pid, expr)) =
                            rel.properties.iter().find(|(pid, _)| *pid == prop.id)
                        {
                            evaluate_constant(expr)?
                        } else {
                            TypedValue::Null
                        };
                        values.push(value);
                    }

                    rel_inserts.push((rel.table_id, values));
                    i += 1;
                }
            }
        }
        Ok(())
    }
677
678    /// Extract the single node table_id and variable_index from a MATCH clause.
679    fn extract_match_node(
680        &self,
681        match_clause: &BoundMatchClause,
682    ) -> KyuResult<(TableId, Option<u32>)> {
683        for pattern in &match_clause.patterns {
684            for element in &pattern.elements {
685                if let BoundPatternElement::Node(node) = element {
686                    return Ok((node.table_id, node.variable_index));
687                }
688            }
689        }
690        Err(KyuError::NotImplemented(
691            "MATCH clause must contain at least one node pattern".into(),
692        ))
693    }
694
695    // ---- Delta Fast Path ----
696
    /// Apply a batch of conflict-free, idempotent upserts that bypass OCC.
    ///
    /// All deltas in the batch commit atomically via a single WAL append.
    /// Semantics are "last write wins" — safe only when upsert semantics
    /// are acceptable (ingestion pipelines, agentic code graphs, document
    /// processing).
    ///
    /// Returns per-kind create/update/delete counters plus elapsed wall time
    /// in microseconds.
    ///
    /// NOTE(review): storage is mutated in place while iterating; an error
    /// partway through the batch returns early with earlier mutations already
    /// applied and the transaction neither committed nor rolled back — confirm
    /// this is consistent with the "commit atomically" claim above.
    pub fn apply_delta(&self, batch: DeltaBatch) -> KyuResult<DeltaStats> {
        let start = Instant::now();
        let mut stats = DeltaStats {
            total_deltas: batch.len() as u64,
            ..DeltaStats::default()
        };

        // Begin a write transaction for WAL serialization.
        let mut txn = self
            .txn_mgr
            .begin(TransactionType::Write)
            .map_err(|e| KyuError::Transaction(e.to_string()))?;

        // Hold one catalog snapshot and an exclusive storage lock for the
        // whole batch so lookups and writes observe a consistent view.
        let catalog = self.catalog.read();
        let mut storage = self.storage.write().unwrap();

        for delta in batch.iter() {
            match delta {
                GraphDelta::UpsertNode {
                    key,
                    labels: _,
                    props,
                } => {
                    // Resolve the node table named by the delta key.
                    let entry = catalog.find_by_name(key.label.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("node table '{}' not found", key.label))
                    })?;
                    let node_entry = entry.as_node_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a node table", key.label))
                    })?;
                    let table_id = node_entry.table_id;
                    let pk_col_idx = node_entry.primary_key_idx;
                    let pk_type = &node_entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    // Linear scan for an existing row with this primary key.
                    let existing = find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?;

                    if let Some(row_idx) = existing {
                        // UPDATE: merge properties (unmentioned props unchanged).
                        for (prop_name, value) in props {
                            if let Some(col_idx) =
                                find_property_index(node_entry, prop_name.as_str())
                            {
                                storage.update_cell(table_id, row_idx, col_idx, value)?;
                            }
                            // Property names not in the schema are silently skipped.
                        }
                        stats.nodes_updated += 1;
                    } else {
                        // INSERT: build full row with PK + props, nulls for absent columns.
                        let values = build_node_row(node_entry, &pk_value, props);
                        storage.insert_row(table_id, &values)?;
                        stats.nodes_created += 1;
                    }
                }

                GraphDelta::UpsertEdge {
                    src,
                    rel_type,
                    dst,
                    props,
                } => {
                    // Resolve the rel table named by the delta.
                    let entry = catalog.find_by_name(rel_type.as_str()).ok_or_else(|| {
                        KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                    })?;
                    let rel_entry = entry.as_rel_table().ok_or_else(|| {
                        KyuError::Catalog(format!("'{}' is not a rel table", rel_type))
                    })?;
                    let rel_table_id = rel_entry.table_id;

                    // Resolve src/dst primary key types from their node tables.
                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk_type = &src_node.properties[src_node.primary_key_idx].data_type;
                    let dst_pk_type = &dst_node.properties[dst_node.primary_key_idx].data_type;
                    let src_pk = parse_primary_key(src.primary_key.as_str(), src_pk_type)?;
                    let dst_pk = parse_primary_key(dst.primary_key.as_str(), dst_pk_type)?;

                    // Rel table storage schema: [src_key, dst_key, ...user_props]
                    let existing = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?;

                    if let Some(row_idx) = existing {
                        // UPDATE: merge edge properties (offset by 2 for src/dst key cols).
                        for (prop_name, value) in props {
                            if let Some(prop_idx) =
                                find_rel_property_index(rel_entry, prop_name.as_str())
                            {
                                let col_idx = prop_idx + 2; // skip src_key, dst_key columns
                                storage.update_cell(rel_table_id, row_idx, col_idx, value)?;
                            }
                        }
                        stats.edges_updated += 1;
                    } else {
                        // INSERT: [src_pk, dst_pk, ...props]
                        let values = build_edge_row(rel_entry, &src_pk, &dst_pk, props);
                        storage.insert_row(rel_table_id, &values)?;
                        stats.edges_created += 1;
                    }
                }

                GraphDelta::DeleteNode { key } => {
                    let entry = catalog
                        .find_by_name(key.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", key.label))
                        })?;
                    let table_id = entry.table_id;
                    let pk_col_idx = entry.primary_key_idx;
                    let pk_type = &entry.properties[pk_col_idx].data_type;
                    let pk_value = parse_primary_key(key.primary_key.as_str(), pk_type)?;

                    // Deleting a nonexistent node is a no-op (idempotent delete).
                    if let Some(row_idx) =
                        find_row_by_pk(&storage, table_id, pk_col_idx, &pk_value)?
                    {
                        storage.delete_row(table_id, row_idx)?;
                        stats.nodes_deleted += 1;
                    }
                }

                GraphDelta::DeleteEdge { src, rel_type, dst } => {
                    let rel_entry = catalog
                        .find_by_name(rel_type.as_str())
                        .and_then(|e| e.as_rel_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("rel table '{}' not found", rel_type))
                        })?;
                    let rel_table_id = rel_entry.table_id;

                    let src_node = catalog
                        .find_by_name(src.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", src.label))
                        })?;
                    let dst_node = catalog
                        .find_by_name(dst.label.as_str())
                        .and_then(|e| e.as_node_table())
                        .ok_or_else(|| {
                            KyuError::Catalog(format!("node table '{}' not found", dst.label))
                        })?;

                    let src_pk = parse_primary_key(
                        src.primary_key.as_str(),
                        &src_node.properties[src_node.primary_key_idx].data_type,
                    )?;
                    let dst_pk = parse_primary_key(
                        dst.primary_key.as_str(),
                        &dst_node.properties[dst_node.primary_key_idx].data_type,
                    )?;

                    // Deleting a nonexistent edge is a no-op (idempotent delete).
                    if let Some(row_idx) = find_edge_row(&storage, rel_table_id, &src_pk, &dst_pk)?
                    {
                        storage.delete_row(rel_table_id, row_idx)?;
                        stats.edges_deleted += 1;
                    }
                }
            }
        }

        // Release both locks before committing so the WAL append does not
        // run while holding the storage write lock.
        drop(storage);
        drop(catalog);

        // Commit WAL record.
        self.txn_mgr
            .commit(&mut txn, &self.wal, |_, _| {})
            .map_err(|e| KyuError::Transaction(e.to_string()))?;
        // Best-effort checkpoint; failure is deliberately ignored here.
        let _ = self.checkpointer.try_checkpoint();

        stats.elapsed_micros = start.elapsed().as_micros() as u64;
        Ok(stats)
    }
884
885    // ---- Extension CALL routing ----
886
887    /// Try to parse and route a `CALL ext.proc(args...)` statement to a registered extension.
888    /// Returns `None` if the statement is not a CALL.
889    fn try_call_extension(&self, cypher: &str) -> KyuResult<Option<QueryResult>> {
890        let trimmed = cypher.trim();
891        if !trimmed.to_uppercase().starts_with("CALL ") {
892            return Ok(None);
893        }
894
895        // Parse: CALL <ext>.<proc>(<arg1>, <arg2>, ...)
896        let rest = trimmed[5..].trim();
897        let dot_pos = rest.find('.').ok_or_else(|| {
898            KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
899        })?;
900        let ext_name = &rest[..dot_pos];
901        let after_dot = &rest[dot_pos + 1..];
902
903        let paren_pos = after_dot.find('(').ok_or_else(|| {
904            KyuError::Binder("CALL requires <extension>.<procedure>(...) syntax".into())
905        })?;
906        let proc_name = &after_dot[..paren_pos];
907        let args_str = after_dot[paren_pos + 1..].trim_end_matches([')', ';']);
908
909        let args: Vec<String> = if args_str.trim().is_empty() {
910            Vec::new()
911        } else {
912            args_str
913                .split(',')
914                .map(|s| s.trim().trim_matches('\'').to_string())
915                .collect()
916        };
917
918        // Find matching extension.
919        let ext = self
920            .extensions
921            .iter()
922            .find(|e| e.name() == ext_name)
923            .ok_or_else(|| KyuError::Binder(format!("unknown extension '{ext_name}'")))?;
924
925        // Build adjacency only if the extension needs it (e.g., graph algorithms).
926        let adjacency = if ext.needs_graph() {
927            self.build_graph_adjacency()
928        } else {
929            std::collections::HashMap::new()
930        };
931
932        // Execute.
933        let rows = ext
934            .execute(proc_name, &args, &adjacency)
935            .map_err(|e| KyuError::Runtime(format!("extension error: {e}")))?;
936
937        // Get procedure signature to determine column names and types.
938        let proc_sig = ext
939            .procedures()
940            .into_iter()
941            .find(|p| p.name == proc_name)
942            .ok_or_else(|| {
943                KyuError::Binder(format!(
944                    "unknown procedure '{proc_name}' in extension '{ext_name}'"
945                ))
946            })?;
947
948        let col_names: Vec<SmolStr> = proc_sig
949            .columns
950            .iter()
951            .map(|c| SmolStr::new(&c.name))
952            .collect();
953        let col_types: Vec<LogicalType> = proc_sig
954            .columns
955            .iter()
956            .map(|c| c.data_type.clone())
957            .collect();
958
959        let mut result = QueryResult::new(col_names, col_types);
960        for proc_row in rows {
961            result.push_row(proc_row);
962        }
963
964        Ok(Some(result))
965    }
966
    /// Build a complete graph adjacency map from all relationship tables.
    ///
    /// Uses typed slice accessors on FlatVector columns for direct i64 buffer
    /// access, avoiding per-element get_value() dispatch and TypedValue construction.
    ///
    /// Every edge gets a fixed weight of 1.0. Rows with a NULL src or dst key
    /// (or a non-Int64 key in the fallback path) are skipped.
    ///
    /// NOTE(review): the fast path assumes columns 0 and 1 hold INT64 keys.
    /// A rel table whose endpoint node tables use STRING primary keys would
    /// reach `data_as_i64_slice()` with non-i64 data — confirm that accessor
    /// guards against the type mismatch.
    fn build_graph_adjacency(&self) -> std::collections::HashMap<i64, Vec<(i64, f64)>> {
        use kyu_executor::value_vector::ValueVector;

        let mut adjacency: std::collections::HashMap<i64, Vec<(i64, f64)>> =
            std::collections::HashMap::new();
        // Read-only traversal: catalog snapshot + shared storage lock.
        let catalog = self.catalog.read();
        let storage = self.storage.read().unwrap();

        for rel in catalog.rel_tables() {
            let table_id = rel.table_id;
            for chunk in storage.scan_table(table_id) {
                let n = chunk.num_rows();
                if n == 0 {
                    continue;
                }

                // Rel storage layout: column 0 = src key, column 1 = dst key.
                let src_col = chunk.column(0);
                let dst_col = chunk.column(1);

                // Fast path: both columns are FlatVector Int64 with identity selection.
                if chunk.selection().is_identity()
                    && let (ValueVector::Flat(src_flat), ValueVector::Flat(dst_flat)) =
                        (src_col, dst_col)
                {
                    let src_slice = src_flat.data_as_i64_slice();
                    let dst_slice = dst_flat.data_as_i64_slice();
                    let src_nm = src_flat.null_mask();
                    let dst_nm = dst_flat.null_mask();
                    for i in 0..n {
                        if !src_nm.is_null(i as u64) && !dst_nm.is_null(i as u64) {
                            adjacency
                                .entry(src_slice[i])
                                .or_default()
                                .push((dst_slice[i], 1.0));
                        }
                    }
                    continue;
                }

                // Fallback: per-element extraction.
                for row_idx in 0..n {
                    let src = chunk.get_value(row_idx, 0);
                    let dst = chunk.get_value(row_idx, 1);
                    if let (TypedValue::Int64(s), TypedValue::Int64(d)) = (src, dst) {
                        adjacency.entry(s).or_default().push((d, 1.0));
                    }
                }
            }
        }

        adjacency
    }
1023
1024    // ---- DDL execution ----
1025
1026    fn exec_create_node_table(
1027        &self,
1028        create: &kyu_binder::BoundCreateNodeTable,
1029    ) -> KyuResult<QueryResult> {
1030        let mut catalog = self.catalog.begin_write();
1031
1032        let table_id = catalog.alloc_table_id();
1033        let properties: Vec<Property> = create
1034            .columns
1035            .iter()
1036            .map(|col| {
1037                let prop_id = catalog.alloc_property_id();
1038                Property::new(
1039                    prop_id,
1040                    col.name.clone(),
1041                    col.data_type.clone(),
1042                    col.property_id.0 as usize == create.primary_key_idx,
1043                )
1044            })
1045            .collect();
1046
1047        let schema: Vec<LogicalType> = create.columns.iter().map(|c| c.data_type.clone()).collect();
1048
1049        catalog.add_node_table(NodeTableEntry {
1050            table_id,
1051            name: create.name.clone(),
1052            properties,
1053            primary_key_idx: create.primary_key_idx,
1054            num_rows: 0,
1055            comment: None,
1056        })?;
1057
1058        self.catalog.commit_write(catalog);
1059
1060        // Create corresponding storage table.
1061        self.storage.write().unwrap().create_table(table_id, schema);
1062
1063        Ok(QueryResult::new(vec![], vec![]))
1064    }
1065
1066    fn exec_create_rel_table(
1067        &self,
1068        create: &kyu_binder::BoundCreateRelTable,
1069    ) -> KyuResult<QueryResult> {
1070        let mut catalog = self.catalog.begin_write();
1071
1072        let table_id = catalog.alloc_table_id();
1073        let properties: Vec<Property> = create
1074            .columns
1075            .iter()
1076            .map(|col| {
1077                let prop_id = catalog.alloc_property_id();
1078                Property::new(prop_id, col.name.clone(), col.data_type.clone(), false)
1079            })
1080            .collect();
1081
1082        // Storage schema: src_key_type, dst_key_type, then user properties.
1083        let from_key_type = catalog
1084            .find_by_id(create.from_table_id)
1085            .and_then(|e| e.as_node_table())
1086            .map(|n| n.primary_key_property().data_type.clone())
1087            .unwrap_or(LogicalType::Int64);
1088        let to_key_type = catalog
1089            .find_by_id(create.to_table_id)
1090            .and_then(|e| e.as_node_table())
1091            .map(|n| n.primary_key_property().data_type.clone())
1092            .unwrap_or(LogicalType::Int64);
1093        let mut schema = vec![from_key_type, to_key_type];
1094        schema.extend(create.columns.iter().map(|c| c.data_type.clone()));
1095
1096        catalog.add_rel_table(RelTableEntry {
1097            table_id,
1098            name: create.name.clone(),
1099            from_table_id: create.from_table_id,
1100            to_table_id: create.to_table_id,
1101            properties,
1102            num_rows: 0,
1103            comment: None,
1104        })?;
1105
1106        self.catalog.commit_write(catalog);
1107
1108        self.storage.write().unwrap().create_table(table_id, schema);
1109
1110        Ok(QueryResult::new(vec![], vec![]))
1111    }
1112
1113    fn exec_drop(&self, drop: &kyu_binder::BoundDrop) -> KyuResult<QueryResult> {
1114        let mut catalog = self.catalog.begin_write();
1115        catalog
1116            .remove_by_id(drop.table_id)
1117            .ok_or_else(|| KyuError::Catalog(format!("table '{}' not found", drop.name)))?;
1118        self.catalog.commit_write(catalog);
1119
1120        self.storage.write().unwrap().drop_table(drop.table_id);
1121
1122        Ok(QueryResult::new(vec![], vec![]))
1123    }
1124
1125    // ---- COPY FROM ----
1126
1127    fn exec_copy_from(&self, copy: &kyu_binder::BoundCopyFrom) -> KyuResult<QueryResult> {
1128        // Evaluate source expression to get file path.
1129        let path_val = evaluate_constant(&copy.source)?;
1130        let path = match &path_val {
1131            TypedValue::String(s) => s.as_str().to_string(),
1132            _ => {
1133                return Err(KyuError::Copy(
1134                    "COPY FROM source must be a string path".into(),
1135                ));
1136            }
1137        };
1138
1139        // Get the table schema from catalog.
1140        let catalog_snapshot = self.catalog.read();
1141        let entry = catalog_snapshot
1142            .find_by_id(copy.table_id)
1143            .ok_or_else(|| KyuError::Catalog(format!("table {:?} not found", copy.table_id)))?;
1144        let properties = entry.properties();
1145        let schema: Vec<LogicalType> = properties.iter().map(|p| p.data_type.clone()).collect();
1146        drop(catalog_snapshot);
1147
1148        // Open reader (auto-detects format by extension: .csv, .parquet, .arrow, .ipc).
1149        let reader = kyu_copy::open_reader(&path, &schema)?;
1150
1151        let mut storage = self.storage.write().unwrap();
1152        for row_result in reader {
1153            let values = row_result?;
1154            storage.insert_row(copy.table_id, &values)?;
1155        }
1156
1157        Ok(QueryResult::new(vec![], vec![]))
1158    }
1159}
1160
1161// ---- Delta helpers (module-level, used by Connection::apply_delta) ----
1162
1163/// Parse a string primary key into the correct TypedValue for the given column type.
1164fn parse_primary_key(value: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
1165    match ty {
1166        LogicalType::Int8 => value
1167            .parse::<i8>()
1168            .map(TypedValue::Int8)
1169            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT8: {e}"))),
1170        LogicalType::Int16 => value
1171            .parse::<i16>()
1172            .map(TypedValue::Int16)
1173            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT16: {e}"))),
1174        LogicalType::Int32 => value
1175            .parse::<i32>()
1176            .map(TypedValue::Int32)
1177            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT32: {e}"))),
1178        LogicalType::Int64 | LogicalType::Serial => value
1179            .parse::<i64>()
1180            .map(TypedValue::Int64)
1181            .map_err(|e| KyuError::Delta(format!("cannot parse PK '{value}' as INT64: {e}"))),
1182        LogicalType::String => Ok(TypedValue::String(SmolStr::new(value))),
1183        _ => Err(KyuError::Delta(format!(
1184            "unsupported primary key type '{}' for delta upsert",
1185            ty.type_name()
1186        ))),
1187    }
1188}
1189
1190/// Find the global row index of the first live row matching a primary key value.
1191fn find_row_by_pk(
1192    storage: &crate::storage::NodeGroupStorage,
1193    table_id: TableId,
1194    pk_col_idx: usize,
1195    pk_value: &TypedValue,
1196) -> KyuResult<Option<u64>> {
1197    let rows = storage.scan_rows(table_id)?;
1198    for (row_idx, row_values) in &rows {
1199        if row_values.get(pk_col_idx) == Some(pk_value) {
1200            return Ok(Some(*row_idx));
1201        }
1202    }
1203    Ok(None)
1204}
1205
1206/// Find the global row index of an edge row matching src and dst primary keys.
1207/// Rel table storage schema: [src_key, dst_key, ...user_props].
1208fn find_edge_row(
1209    storage: &crate::storage::NodeGroupStorage,
1210    rel_table_id: TableId,
1211    src_pk: &TypedValue,
1212    dst_pk: &TypedValue,
1213) -> KyuResult<Option<u64>> {
1214    let rows = storage.scan_rows(rel_table_id)?;
1215    for (row_idx, row_values) in &rows {
1216        if row_values.first() == Some(src_pk) && row_values.get(1) == Some(dst_pk) {
1217            return Ok(Some(*row_idx));
1218        }
1219    }
1220    Ok(None)
1221}
1222
1223/// Find a property's column index in a node table entry by name.
1224fn find_property_index(entry: &NodeTableEntry, name: &str) -> Option<usize> {
1225    let lower = name.to_lowercase();
1226    entry
1227        .properties
1228        .iter()
1229        .position(|p| p.name.to_lowercase() == lower)
1230}
1231
1232/// Find a property's index in a rel table entry by name (0-based within user properties).
1233fn find_rel_property_index(entry: &RelTableEntry, name: &str) -> Option<usize> {
1234    let lower = name.to_lowercase();
1235    entry
1236        .properties
1237        .iter()
1238        .position(|p| p.name.to_lowercase() == lower)
1239}
1240
1241/// Build a full node row from delta properties. Columns not in `props` default to Null.
1242fn build_node_row(
1243    entry: &NodeTableEntry,
1244    pk_value: &TypedValue,
1245    props: &hashbrown::HashMap<SmolStr, TypedValue>,
1246) -> Vec<TypedValue> {
1247    entry
1248        .properties
1249        .iter()
1250        .enumerate()
1251        .map(|(i, prop)| {
1252            if i == entry.primary_key_idx {
1253                pk_value.clone()
1254            } else if let Some(val) = props.get(&prop.name) {
1255                val.clone()
1256            } else {
1257                TypedValue::Null
1258            }
1259        })
1260        .collect()
1261}
1262
1263/// Build a full edge row: [src_pk, dst_pk, ...user_props].
1264fn build_edge_row(
1265    entry: &RelTableEntry,
1266    src_pk: &TypedValue,
1267    dst_pk: &TypedValue,
1268    props: &hashbrown::HashMap<SmolStr, TypedValue>,
1269) -> Vec<TypedValue> {
1270    let mut row = vec![src_pk.clone(), dst_pk.clone()];
1271    for prop in &entry.properties {
1272        if let Some(val) = props.get(&prop.name) {
1273            row.push(val.clone());
1274        } else {
1275            row.push(TypedValue::Null);
1276        }
1277    }
1278    row
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283    use crate::database::Database;
1284    use kyu_types::TypedValue;
1285    use smol_str::SmolStr;
1286
1287    #[test]
1288    fn create_database_and_connect() {
1289        let db = Database::in_memory();
1290        let _conn = db.connect();
1291        assert_eq!(db.catalog().num_tables(), 0);
1292    }
1293
1294    #[test]
1295    fn return_literal() {
1296        let db = Database::in_memory();
1297        let conn = db.connect();
1298        let result = conn.query("RETURN 1 AS x").unwrap();
1299        assert_eq!(result.num_rows(), 1);
1300        assert_eq!(result.row(0), vec![TypedValue::Int64(1)]);
1301    }
1302
1303    #[test]
1304    fn return_arithmetic() {
1305        let db = Database::in_memory();
1306        let conn = db.connect();
1307        let result = conn.query("RETURN 2 + 3 AS sum").unwrap();
1308        assert_eq!(result.row(0), vec![TypedValue::Int64(5)]);
1309    }
1310
1311    #[test]
1312    fn create_node_table() {
1313        let db = Database::in_memory();
1314        let conn = db.connect();
1315        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1316            .unwrap();
1317
1318        assert_eq!(db.catalog().num_tables(), 1);
1319        let snapshot = db.catalog().read();
1320        let entry = snapshot.find_by_name("Person").unwrap();
1321        assert!(entry.is_node_table());
1322        assert_eq!(entry.properties().len(), 2);
1323
1324        assert!(db.storage().read().unwrap().has_table(entry.table_id()));
1325    }
1326
1327    #[test]
1328    fn create_and_query_empty_table() {
1329        let db = Database::in_memory();
1330        let conn = db.connect();
1331        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1332            .unwrap();
1333        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1334        assert_eq!(result.num_rows(), 0);
1335    }
1336
1337    #[test]
1338    fn create_rel_table() {
1339        let db = Database::in_memory();
1340        let conn = db.connect();
1341        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1342            .unwrap();
1343        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
1344            .unwrap();
1345
1346        assert_eq!(db.catalog().num_tables(), 2);
1347        let snapshot = db.catalog().read();
1348        let entry = snapshot.find_by_name("KNOWS").unwrap();
1349        assert!(entry.is_rel_table());
1350    }
1351
1352    #[test]
1353    fn drop_table() {
1354        let db = Database::in_memory();
1355        let conn = db.connect();
1356        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1357            .unwrap();
1358        assert_eq!(db.catalog().num_tables(), 1);
1359
1360        conn.query("DROP TABLE Person").unwrap();
1361        assert_eq!(db.catalog().num_tables(), 0);
1362    }
1363
1364    #[test]
1365    fn create_duplicate_error() {
1366        let db = Database::in_memory();
1367        let conn = db.connect();
1368        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1369            .unwrap();
1370        let result = conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))");
1371        assert!(result.is_err());
1372    }
1373
1374    #[test]
1375    fn parse_error_propagated() {
1376        let db = Database::in_memory();
1377        let conn = db.connect();
1378        let result = conn.query("THIS IS NOT VALID CYPHER !!!");
1379        assert!(result.is_err());
1380    }
1381
1382    #[test]
1383    fn multiple_connections_share_state() {
1384        let db = Database::in_memory();
1385        let conn1 = db.connect();
1386        let conn2 = db.connect();
1387
1388        conn1
1389            .query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1390            .unwrap();
1391
1392        // conn2 should see the table created by conn1.
1393        assert_eq!(db.catalog().num_tables(), 1);
1394        let result = conn2.query("MATCH (p:Person) RETURN p.id").unwrap();
1395        assert_eq!(result.num_rows(), 0);
1396    }
1397
1398    #[test]
1399    fn create_node_via_cypher() {
1400        let db = Database::in_memory();
1401        let conn = db.connect();
1402        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1403            .unwrap();
1404
1405        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
1406            .unwrap();
1407
1408        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1409        assert_eq!(result.num_rows(), 1);
1410        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1411    }
1412
1413    #[test]
1414    fn create_multiple_nodes() {
1415        let db = Database::in_memory();
1416        let conn = db.connect();
1417        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1418            .unwrap();
1419
1420        conn.query("CREATE (a:Person {id: 1, name: 'Alice'}), (b:Person {id: 2, name: 'Bob'})")
1421            .unwrap();
1422
1423        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1424        assert_eq!(result.num_rows(), 2);
1425    }
1426
1427    #[test]
1428    fn create_node_partial_properties() {
1429        let db = Database::in_memory();
1430        let conn = db.connect();
1431        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1432            .unwrap();
1433
1434        // name not specified — should be NULL
1435        conn.query("CREATE (n:Person {id: 1})").unwrap();
1436
1437        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1438        assert_eq!(result.num_rows(), 1);
1439        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1440        assert_eq!(result.row(0)[1], TypedValue::Null);
1441    }
1442
1443    #[test]
1444    fn create_and_return() {
1445        let db = Database::in_memory();
1446        let conn = db.connect();
1447        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1448            .unwrap();
1449
1450        let result = conn
1451            .query("CREATE (n:Person {id: 1, name: 'Alice'}) RETURN n.name, n.id")
1452            .unwrap();
1453        assert_eq!(result.num_rows(), 1);
1454        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1455        assert_eq!(result.row(0)[1], TypedValue::Int64(1));
1456    }
1457
1458    #[test]
1459    fn match_set_property() {
1460        let db = Database::in_memory();
1461        let conn = db.connect();
1462        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1463            .unwrap();
1464        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 25})")
1465            .unwrap();
1466
1467        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' SET p.age = 31")
1468            .unwrap();
1469
1470        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
1471        assert_eq!(result.num_rows(), 1);
1472        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
1473    }
1474
1475    #[test]
1476    fn match_set_with_where() {
1477        let db = Database::in_memory();
1478        let conn = db.connect();
1479        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1480            .unwrap();
1481        conn.query("CREATE (a:Person {id: 1, name: 'Alice', age: 25})")
1482            .unwrap();
1483        conn.query("CREATE (b:Person {id: 2, name: 'Bob', age: 30})")
1484            .unwrap();
1485
1486        // Only update Alice's age.
1487        conn.query("MATCH (p:Person) WHERE p.id = 1 SET p.age = 26")
1488            .unwrap();
1489
1490        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
1491        assert_eq!(result.num_rows(), 2);
1492        // Find Alice's row and Bob's row.
1493        let alice_row = result
1494            .iter_rows()
1495            .find(|r| r[0] == TypedValue::String(SmolStr::new("Alice")))
1496            .unwrap();
1497        let bob_row = result
1498            .iter_rows()
1499            .find(|r| r[0] == TypedValue::String(SmolStr::new("Bob")))
1500            .unwrap();
1501        assert_eq!(alice_row[1], TypedValue::Int64(26)); // updated
1502        assert_eq!(bob_row[1], TypedValue::Int64(30)); // unchanged
1503    }
1504
1505    #[test]
1506    fn match_set_all_rows() {
1507        let db = Database::in_memory();
1508        let conn = db.connect();
1509        conn.query("CREATE NODE TABLE Person (id INT64, active INT64, PRIMARY KEY (id))")
1510            .unwrap();
1511        conn.query("CREATE (a:Person {id: 1, active: 0})").unwrap();
1512        conn.query("CREATE (b:Person {id: 2, active: 0})").unwrap();
1513
1514        // SET without WHERE — affects all rows.
1515        conn.query("MATCH (p:Person) SET p.active = 1").unwrap();
1516
1517        let result = conn.query("MATCH (p:Person) RETURN p.active").unwrap();
1518        assert_eq!(result.num_rows(), 2);
1519        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1520        assert_eq!(result.row(1)[0], TypedValue::Int64(1));
1521    }
1522
1523    #[test]
1524    fn match_delete() {
1525        let db = Database::in_memory();
1526        let conn = db.connect();
1527        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1528            .unwrap();
1529        conn.query("CREATE (a:Person {id: 1, name: 'Alice'})")
1530            .unwrap();
1531        conn.query("CREATE (b:Person {id: 2, name: 'Bob'})")
1532            .unwrap();
1533
1534        conn.query("MATCH (p:Person) WHERE p.name = 'Alice' DELETE p")
1535            .unwrap();
1536
1537        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1538        assert_eq!(result.num_rows(), 1);
1539        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
1540    }
1541
1542    #[test]
1543    fn match_delete_all() {
1544        let db = Database::in_memory();
1545        let conn = db.connect();
1546        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1547            .unwrap();
1548        conn.query("CREATE (a:Person {id: 1})").unwrap();
1549        conn.query("CREATE (b:Person {id: 2})").unwrap();
1550
1551        conn.query("MATCH (p:Person) DELETE p").unwrap();
1552
1553        let result = conn.query("MATCH (p:Person) RETURN p.id").unwrap();
1554        assert_eq!(result.num_rows(), 0);
1555    }
1556
1557    #[test]
1558    fn storage_roundtrip_insert_scan() {
1559        let db = Database::in_memory();
1560        let conn = db.connect();
1561        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1562            .unwrap();
1563
1564        // Get table ID from catalog.
1565        let snapshot = db.catalog().read();
1566        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
1567        drop(snapshot);
1568
1569        // Insert directly via storage API (DML via Cypher deferred).
1570        db.storage()
1571            .write()
1572            .unwrap()
1573            .insert_row(
1574                table_id,
1575                &[
1576                    TypedValue::Int64(1),
1577                    TypedValue::String(SmolStr::new("Alice")),
1578                ],
1579            )
1580            .unwrap();
1581
1582        // Query reads from real NodeGroup/ColumnChunk storage.
1583        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1584        assert_eq!(result.num_rows(), 1);
1585        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1586    }
1587
1588    #[test]
1589    fn storage_roundtrip_multiple_rows() {
1590        let db = Database::in_memory();
1591        let conn = db.connect();
1592        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1593            .unwrap();
1594
1595        let snapshot = db.catalog().read();
1596        let table_id = snapshot.find_by_name("Person").unwrap().table_id();
1597        drop(snapshot);
1598
1599        let mut storage = db.storage().write().unwrap();
1600        storage
1601            .insert_row(
1602                table_id,
1603                &[
1604                    TypedValue::Int64(1),
1605                    TypedValue::String(SmolStr::new("Alice")),
1606                    TypedValue::Int64(25),
1607                ],
1608            )
1609            .unwrap();
1610        storage
1611            .insert_row(
1612                table_id,
1613                &[
1614                    TypedValue::Int64(2),
1615                    TypedValue::String(SmolStr::new("Bob")),
1616                    TypedValue::Int64(30),
1617                ],
1618            )
1619            .unwrap();
1620        drop(storage);
1621
1622        let result = conn.query("MATCH (p:Person) RETURN p.name, p.age").unwrap();
1623        assert_eq!(result.num_rows(), 2);
1624    }
1625
1626    #[test]
1627    fn copy_from_csv() {
1628        use std::io::Write;
1629
1630        let dir = std::env::temp_dir().join("kyu_test_csv");
1631        let _ = std::fs::create_dir_all(&dir);
1632        let csv_path = dir.join("persons.csv");
1633        {
1634            let mut f = std::fs::File::create(&csv_path).unwrap();
1635            writeln!(f, "id,name").unwrap();
1636            writeln!(f, "1,Alice").unwrap();
1637            writeln!(f, "2,Bob").unwrap();
1638            writeln!(f, "3,Charlie").unwrap();
1639        }
1640
1641        let db = Database::in_memory();
1642        let conn = db.connect();
1643        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1644            .unwrap();
1645        conn.query(&format!("COPY Person FROM '{}'", csv_path.display()))
1646            .unwrap();
1647
1648        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1649        assert_eq!(result.num_rows(), 3);
1650
1651        // Clean up.
1652        let _ = std::fs::remove_file(&csv_path);
1653    }
1654
1655    #[test]
1656    fn copy_from_csv_multiple_types() {
1657        use std::io::Write;
1658
1659        let dir = std::env::temp_dir().join("kyu_test_csv");
1660        let _ = std::fs::create_dir_all(&dir);
1661        let csv_path = dir.join("typed.csv");
1662        {
1663            let mut f = std::fs::File::create(&csv_path).unwrap();
1664            writeln!(f, "id,name,score,active").unwrap();
1665            writeln!(f, "1,Alice,95.5,true").unwrap();
1666            writeln!(f, "2,Bob,87.3,false").unwrap();
1667        }
1668
1669        let db = Database::in_memory();
1670        let conn = db.connect();
1671        conn.query(
1672            "CREATE NODE TABLE Student (id INT64, name STRING, score DOUBLE, active BOOL, PRIMARY KEY (id))",
1673        )
1674        .unwrap();
1675        conn.query(&format!("COPY Student FROM '{}'", csv_path.display()))
1676            .unwrap();
1677
1678        let result = conn
1679            .query("MATCH (s:Student) RETURN s.name, s.score, s.active")
1680            .unwrap();
1681        assert_eq!(result.num_rows(), 2);
1682        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1683        assert_eq!(result.row(0)[1], TypedValue::Double(95.5));
1684        assert_eq!(result.row(0)[2], TypedValue::Bool(true));
1685
1686        let _ = std::fs::remove_file(&csv_path);
1687    }
1688
1689    #[test]
1690    fn copy_from_parquet() {
1691        use arrow::array::{Int64Array, StringArray};
1692        use arrow::datatypes::{DataType, Field, Schema};
1693        use arrow::record_batch::RecordBatch;
1694        use parquet::arrow::ArrowWriter;
1695        use std::sync::Arc;
1696
1697        let dir = std::env::temp_dir().join("kyu_test_parquet_copy");
1698        let _ = std::fs::create_dir_all(&dir);
1699        let parquet_path = dir.join("persons.parquet");
1700        {
1701            let schema = Arc::new(Schema::new(vec![
1702                Field::new("id", DataType::Int64, false),
1703                Field::new("name", DataType::Utf8, false),
1704            ]));
1705            let ids = Int64Array::from(vec![1, 2, 3]);
1706            let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
1707            let batch =
1708                RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids), Arc::new(names)])
1709                    .unwrap();
1710            let file = std::fs::File::create(&parquet_path).unwrap();
1711            let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
1712            writer.write(&batch).unwrap();
1713            writer.close().unwrap();
1714        }
1715
1716        let db = Database::in_memory();
1717        let conn = db.connect();
1718        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1719            .unwrap();
1720        conn.query(&format!("COPY Person FROM '{}'", parquet_path.display()))
1721            .unwrap();
1722
1723        let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1724        assert_eq!(result.num_rows(), 3);
1725        assert_eq!(result.row(0)[0], TypedValue::Int64(1));
1726        assert_eq!(result.row(0)[1], TypedValue::String(SmolStr::new("Alice")));
1727
1728        let _ = std::fs::remove_dir_all(&dir);
1729    }
1730
1731    #[test]
1732    fn call_extension_pagerank() {
1733        let mut db = Database::in_memory();
1734        db.register_extension(Box::new(ext_algo::AlgoExtension));
1735        let conn = db.connect();
1736
1737        // Create graph: 1->2->3->1 (cycle).
1738        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1739            .unwrap();
1740        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1741            .unwrap();
1742        conn.query("CREATE (n:Person {id: 1})").unwrap();
1743        conn.query("CREATE (n:Person {id: 2})").unwrap();
1744        conn.query("CREATE (n:Person {id: 3})").unwrap();
1745
1746        // Insert relationships directly.
1747        let snapshot = db.catalog().read();
1748        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1749        drop(snapshot);
1750        {
1751            let mut storage = db.storage().write().unwrap();
1752            storage
1753                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1754                .unwrap();
1755            storage
1756                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
1757                .unwrap();
1758            storage
1759                .insert_row(rel_table_id, &[TypedValue::Int64(3), TypedValue::Int64(1)])
1760                .unwrap();
1761        }
1762
1763        let result = conn
1764            .query("CALL algo.pageRank(0.85, 20, 0.000001)")
1765            .unwrap();
1766        assert_eq!(result.num_rows(), 3);
1767        assert_eq!(result.column_names.len(), 2);
1768        // All ranks should be positive.
1769        for row in result.iter_rows() {
1770            if let TypedValue::Double(rank) = &row[1] {
1771                assert!(*rank > 0.0);
1772            }
1773        }
1774    }
1775
1776    #[test]
1777    fn call_extension_wcc() {
1778        let mut db = Database::in_memory();
1779        db.register_extension(Box::new(ext_algo::AlgoExtension));
1780        let conn = db.connect();
1781
1782        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1783            .unwrap();
1784        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1785            .unwrap();
1786        conn.query("CREATE (n:Person {id: 1})").unwrap();
1787        conn.query("CREATE (n:Person {id: 2})").unwrap();
1788        conn.query("CREATE (n:Person {id: 10})").unwrap();
1789        conn.query("CREATE (n:Person {id: 11})").unwrap();
1790
1791        let snapshot = db.catalog().read();
1792        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1793        drop(snapshot);
1794        {
1795            let mut storage = db.storage().write().unwrap();
1796            storage
1797                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1798                .unwrap();
1799            storage
1800                .insert_row(
1801                    rel_table_id,
1802                    &[TypedValue::Int64(10), TypedValue::Int64(11)],
1803                )
1804                .unwrap();
1805        }
1806
1807        let result = conn.query("CALL algo.wcc()").unwrap();
1808        assert_eq!(result.num_rows(), 4);
1809    }
1810
1811    #[test]
1812    fn call_extension_betweenness() {
1813        let mut db = Database::in_memory();
1814        db.register_extension(Box::new(ext_algo::AlgoExtension));
1815        let conn = db.connect();
1816
1817        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1818            .unwrap();
1819        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
1820            .unwrap();
1821        conn.query("CREATE (n:Person {id: 1})").unwrap();
1822        conn.query("CREATE (n:Person {id: 2})").unwrap();
1823        conn.query("CREATE (n:Person {id: 3})").unwrap();
1824
1825        let snapshot = db.catalog().read();
1826        let rel_table_id = snapshot.find_by_name("KNOWS").unwrap().table_id();
1827        drop(snapshot);
1828        {
1829            let mut storage = db.storage().write().unwrap();
1830            storage
1831                .insert_row(rel_table_id, &[TypedValue::Int64(1), TypedValue::Int64(2)])
1832                .unwrap();
1833            storage
1834                .insert_row(rel_table_id, &[TypedValue::Int64(2), TypedValue::Int64(3)])
1835                .unwrap();
1836        }
1837
1838        let result = conn.query("CALL algo.betweenness()").unwrap();
1839        assert_eq!(result.num_rows(), 3);
1840    }
1841
1842    #[test]
1843    fn call_unknown_extension() {
1844        let db = Database::in_memory();
1845        let conn = db.connect();
1846        let result = conn.query("CALL nonexistent.proc()");
1847        assert!(result.is_err());
1848    }
1849
1850    #[test]
1851    fn persistence_survives_restart() {
1852        let dir = std::env::temp_dir().join("kyu_test_persist_e2e");
1853        let _ = std::fs::remove_dir_all(&dir);
1854
1855        // Phase 1: Create schema and insert data, then drop the database.
1856        {
1857            let db = Database::open(&dir).unwrap();
1858            let conn = db.connect();
1859            conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1860                .unwrap();
1861            conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
1862                .unwrap();
1863            conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
1864                .unwrap();
1865            // Drop triggers checkpoint (Drop impl).
1866        }
1867
1868        // Phase 2: Reopen and verify schema + data survived.
1869        {
1870            let db = Database::open(&dir).unwrap();
1871            let conn = db.connect();
1872
1873            // Schema should be recovered.
1874            assert_eq!(db.catalog().num_tables(), 1);
1875            let snapshot = db.catalog().read();
1876            assert!(snapshot.find_by_name("Person").is_some());
1877            drop(snapshot);
1878
1879            // Data should be recovered.
1880            let result = conn.query("MATCH (p:Person) RETURN p.id, p.name").unwrap();
1881            assert_eq!(result.num_rows(), 2);
1882        }
1883
1884        let _ = std::fs::remove_dir_all(&dir);
1885    }
1886
1887    #[test]
1888    fn persistence_ddl_recovery_via_wal() {
1889        let dir = std::env::temp_dir().join("kyu_test_persist_ddl");
1890        let _ = std::fs::remove_dir_all(&dir);
1891
1892        // Phase 1: Create schema. The checkpoint on Drop will flush everything.
1893        {
1894            let db = Database::open(&dir).unwrap();
1895            let conn = db.connect();
1896            conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
1897                .unwrap();
1898            conn.query("CREATE NODE TABLE Organization (id INT64, name STRING, PRIMARY KEY (id))")
1899                .unwrap();
1900        }
1901
1902        // Phase 2: Verify both tables survived.
1903        {
1904            let db = Database::open(&dir).unwrap();
1905            assert_eq!(db.catalog().num_tables(), 2);
1906            let snapshot = db.catalog().read();
1907            assert!(snapshot.find_by_name("Person").is_some());
1908            assert!(snapshot.find_by_name("Organization").is_some());
1909        }
1910
1911        let _ = std::fs::remove_dir_all(&dir);
1912    }
1913
1914    #[test]
1915    fn persistence_empty_database() {
1916        let dir = std::env::temp_dir().join("kyu_test_persist_empty_db");
1917        let _ = std::fs::remove_dir_all(&dir);
1918
1919        // Open, do nothing, drop.
1920        {
1921            let _db = Database::open(&dir).unwrap();
1922        }
1923
1924        // Reopen — should be empty.
1925        {
1926            let db = Database::open(&dir).unwrap();
1927            assert_eq!(db.catalog().num_tables(), 0);
1928        }
1929
1930        let _ = std::fs::remove_dir_all(&dir);
1931    }
1932
1933    // ---- Parameterized query tests ----
1934
1935    #[test]
1936    fn return_param() {
1937        let db = Database::in_memory();
1938        let conn = db.connect();
1939        let mut params = std::collections::HashMap::new();
1940        params.insert("x".to_string(), TypedValue::Int64(42));
1941        let result = conn.query_with_params("RETURN $x AS val", params).unwrap();
1942        assert_eq!(result.num_rows(), 1);
1943        assert_eq!(result.row(0), vec![TypedValue::Int64(42)]);
1944    }
1945
1946    #[test]
1947    fn parameterized_where() {
1948        let db = Database::in_memory();
1949        let conn = db.connect();
1950        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, age INT64, PRIMARY KEY (id))")
1951            .unwrap();
1952        conn.query("CREATE (n:Person {id: 1, name: 'Alice', age: 30})")
1953            .unwrap();
1954        conn.query("CREATE (n:Person {id: 2, name: 'Bob', age: 20})")
1955            .unwrap();
1956
1957        let mut params = std::collections::HashMap::new();
1958        params.insert("min_age".to_string(), TypedValue::Int64(25));
1959        let result = conn
1960            .query_with_params(
1961                "MATCH (p:Person) WHERE p.age > $min_age RETURN p.name",
1962                params,
1963            )
1964            .unwrap();
1965        assert_eq!(result.num_rows(), 1);
1966        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1967    }
1968
1969    #[test]
1970    fn parameterized_create() {
1971        let db = Database::in_memory();
1972        let conn = db.connect();
1973        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
1974            .unwrap();
1975
1976        let mut params = std::collections::HashMap::new();
1977        params.insert("id".to_string(), TypedValue::Int64(1));
1978        params.insert(
1979            "name".to_string(),
1980            TypedValue::String(SmolStr::new("Alice")),
1981        );
1982        conn.query_with_params("CREATE (n:Person {id: $id, name: $name})", params)
1983            .unwrap();
1984
1985        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
1986        assert_eq!(result.num_rows(), 1);
1987        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Alice")));
1988    }
1989
1990    #[test]
1991    fn parameterized_set() {
1992        let db = Database::in_memory();
1993        let conn = db.connect();
1994        conn.query("CREATE NODE TABLE Person (id INT64, age INT64, PRIMARY KEY (id))")
1995            .unwrap();
1996        conn.query("CREATE (n:Person {id: 1, age: 25})").unwrap();
1997
1998        let mut params = std::collections::HashMap::new();
1999        params.insert("new_age".to_string(), TypedValue::Int64(31));
2000        conn.query_with_params(
2001            "MATCH (p:Person) WHERE p.id = 1 SET p.age = $new_age",
2002            params,
2003        )
2004        .unwrap();
2005
2006        let result = conn.query("MATCH (p:Person) RETURN p.age").unwrap();
2007        assert_eq!(result.row(0)[0], TypedValue::Int64(31));
2008    }
2009
2010    #[test]
2011    fn unresolved_param_error() {
2012        let db = Database::in_memory();
2013        let conn = db.connect();
2014        let result = conn.query("RETURN $missing AS val");
2015        assert!(result.is_err());
2016        assert!(
2017            result
2018                .unwrap_err()
2019                .to_string()
2020                .contains("unresolved parameter")
2021        );
2022    }
2023
2024    #[test]
2025    fn env_resolved() {
2026        let db = Database::in_memory();
2027        let conn = db.connect();
2028        let mut env = std::collections::HashMap::new();
2029        env.insert(
2030            "GREETING".to_string(),
2031            TypedValue::String(SmolStr::new("hello")),
2032        );
2033        let result = conn
2034            .execute(
2035                "RETURN env('GREETING') AS val",
2036                std::collections::HashMap::new(),
2037                env,
2038            )
2039            .unwrap();
2040        assert_eq!(result.num_rows(), 1);
2041        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("hello")));
2042    }
2043
2044    #[test]
2045    fn env_missing_returns_null() {
2046        let db = Database::in_memory();
2047        let conn = db.connect();
2048        let result = conn
2049            .execute(
2050                "RETURN env('MISSING') AS val",
2051                std::collections::HashMap::new(),
2052                std::collections::HashMap::new(),
2053            )
2054            .unwrap();
2055        assert_eq!(result.num_rows(), 1);
2056        assert_eq!(result.row(0)[0], TypedValue::Null);
2057    }
2058
2059    // ---- apply_delta tests ----
2060
2061    #[test]
2062    fn delta_upsert_new_nodes() {
2063        use kyu_delta::DeltaBatchBuilder;
2064
2065        let db = Database::in_memory();
2066        let conn = db.connect();
2067        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
2068            .unwrap();
2069
2070        let batch = DeltaBatchBuilder::new("file:main.rs", 1)
2071            .upsert_node(
2072                "Function",
2073                "main",
2074                vec![],
2075                [("lines", TypedValue::Int64(42))],
2076            )
2077            .upsert_node(
2078                "Function",
2079                "helper",
2080                vec![],
2081                [("lines", TypedValue::Int64(10))],
2082            )
2083            .build();
2084
2085        let stats = conn.apply_delta(batch).unwrap();
2086        assert_eq!(stats.nodes_created, 2);
2087        assert_eq!(stats.nodes_updated, 0);
2088
2089        let result = conn
2090            .query("MATCH (f:Function) RETURN f.name, f.lines")
2091            .unwrap();
2092        assert_eq!(result.num_rows(), 2);
2093    }
2094
2095    #[test]
2096    fn delta_upsert_existing_node_merges() {
2097        use kyu_delta::DeltaBatchBuilder;
2098
2099        let db = Database::in_memory();
2100        let conn = db.connect();
2101        conn.query("CREATE NODE TABLE Function (name STRING, lines INT64, PRIMARY KEY (name))")
2102            .unwrap();
2103
2104        // Create initial node.
2105        let batch1 = DeltaBatchBuilder::new("file:main.rs", 1)
2106            .upsert_node(
2107                "Function",
2108                "main",
2109                vec![],
2110                [("lines", TypedValue::Int64(42))],
2111            )
2112            .build();
2113        conn.apply_delta(batch1).unwrap();
2114
2115        // Upsert same node with updated lines.
2116        let batch2 = DeltaBatchBuilder::new("file:main.rs", 2)
2117            .upsert_node(
2118                "Function",
2119                "main",
2120                vec![],
2121                [("lines", TypedValue::Int64(50))],
2122            )
2123            .build();
2124        let stats = conn.apply_delta(batch2).unwrap();
2125        assert_eq!(stats.nodes_created, 0);
2126        assert_eq!(stats.nodes_updated, 1);
2127
2128        let result = conn
2129            .query("MATCH (f:Function) WHERE f.name = 'main' RETURN f.lines")
2130            .unwrap();
2131        assert_eq!(result.num_rows(), 1);
2132        assert_eq!(result.row(0)[0], TypedValue::Int64(50));
2133    }
2134
2135    #[test]
2136    fn delta_delete_node() {
2137        use kyu_delta::DeltaBatchBuilder;
2138
2139        let db = Database::in_memory();
2140        let conn = db.connect();
2141        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2142            .unwrap();
2143        conn.query("CREATE (n:Person {id: 1, name: 'Alice'})")
2144            .unwrap();
2145        conn.query("CREATE (n:Person {id: 2, name: 'Bob'})")
2146            .unwrap();
2147
2148        let batch = DeltaBatchBuilder::new("cleanup", 1)
2149            .delete_node("Person", "1")
2150            .build();
2151        let stats = conn.apply_delta(batch).unwrap();
2152        assert_eq!(stats.nodes_deleted, 1);
2153
2154        let result = conn.query("MATCH (p:Person) RETURN p.name").unwrap();
2155        assert_eq!(result.num_rows(), 1);
2156        assert_eq!(result.row(0)[0], TypedValue::String(SmolStr::new("Bob")));
2157    }
2158
2159    #[test]
2160    fn delta_upsert_and_delete_edges() {
2161        use kyu_delta::DeltaBatchBuilder;
2162
2163        let db = Database::in_memory();
2164        let conn = db.connect();
2165        conn.query("CREATE NODE TABLE Person (id INT64, PRIMARY KEY (id))")
2166            .unwrap();
2167        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person, since INT64)")
2168            .unwrap();
2169        conn.query("CREATE (n:Person {id: 1})").unwrap();
2170        conn.query("CREATE (n:Person {id: 2})").unwrap();
2171
2172        // Create edge via delta.
2173        let batch = DeltaBatchBuilder::new("social", 1)
2174            .upsert_edge(
2175                "Person",
2176                "1",
2177                "KNOWS",
2178                "Person",
2179                "2",
2180                [("since", TypedValue::Int64(2024))],
2181            )
2182            .build();
2183        let stats = conn.apply_delta(batch).unwrap();
2184        assert_eq!(stats.edges_created, 1);
2185
2186        // Verify edge exists.
2187        let storage = db.storage().read().unwrap();
2188        let catalog = db.catalog().read();
2189        let rel_table_id = catalog.find_by_name("KNOWS").unwrap().table_id();
2190        let rows = storage.scan_rows(rel_table_id).unwrap();
2191        assert_eq!(rows.len(), 1);
2192        assert_eq!(rows[0].1[0], TypedValue::Int64(1)); // src
2193        assert_eq!(rows[0].1[1], TypedValue::Int64(2)); // dst
2194        assert_eq!(rows[0].1[2], TypedValue::Int64(2024)); // since
2195        drop(storage);
2196        drop(catalog);
2197
2198        // Update edge property.
2199        let batch2 = DeltaBatchBuilder::new("social", 2)
2200            .upsert_edge(
2201                "Person",
2202                "1",
2203                "KNOWS",
2204                "Person",
2205                "2",
2206                [("since", TypedValue::Int64(2025))],
2207            )
2208            .build();
2209        let stats2 = conn.apply_delta(batch2).unwrap();
2210        assert_eq!(stats2.edges_updated, 1);
2211
2212        let storage = db.storage().read().unwrap();
2213        let rows = storage.scan_rows(rel_table_id).unwrap();
2214        assert_eq!(rows[0].1[2], TypedValue::Int64(2025));
2215        drop(storage);
2216
2217        // Delete edge.
2218        let batch3 = DeltaBatchBuilder::new("social", 3)
2219            .delete_edge("Person", "1", "KNOWS", "Person", "2")
2220            .build();
2221        let stats3 = conn.apply_delta(batch3).unwrap();
2222        assert_eq!(stats3.edges_deleted, 1);
2223
2224        let storage = db.storage().read().unwrap();
2225        let rows = storage.scan_rows(rel_table_id).unwrap();
2226        assert_eq!(rows.len(), 0);
2227    }
2228
2229    #[test]
2230    fn delta_idempotent_replay() {
2231        use kyu_delta::DeltaBatchBuilder;
2232
2233        let db = Database::in_memory();
2234        let conn = db.connect();
2235        conn.query("CREATE NODE TABLE File (path STRING, hash STRING, PRIMARY KEY (path))")
2236            .unwrap();
2237
2238        let batch = DeltaBatchBuilder::new("watcher", 100)
2239            .upsert_node(
2240                "File",
2241                "src/main.rs",
2242                vec![],
2243                [("hash", TypedValue::String(SmolStr::new("abc123")))],
2244            )
2245            .build();
2246
2247        // Apply once.
2248        let stats1 = conn.apply_delta(batch.clone()).unwrap();
2249        assert_eq!(stats1.nodes_created, 1);
2250
2251        // Apply again — same batch is idempotent (update, not create).
2252        let stats2 = conn.apply_delta(batch).unwrap();
2253        assert_eq!(stats2.nodes_created, 0);
2254        assert_eq!(stats2.nodes_updated, 1);
2255
2256        // Still only one row.
2257        let result = conn.query("MATCH (f:File) RETURN f.path").unwrap();
2258        assert_eq!(result.num_rows(), 1);
2259    }
2260
2261    #[test]
2262    fn delta_stats_correct() {
2263        use kyu_delta::DeltaBatchBuilder;
2264
2265        let db = Database::in_memory();
2266        let conn = db.connect();
2267        conn.query("CREATE NODE TABLE Person (id INT64, name STRING, PRIMARY KEY (id))")
2268            .unwrap();
2269        conn.query("CREATE REL TABLE KNOWS (FROM Person TO Person)")
2270            .unwrap();
2271
2272        let batch = DeltaBatchBuilder::new("test", 1)
2273            .upsert_node(
2274                "Person",
2275                "1",
2276                vec![],
2277                [("name", TypedValue::String(SmolStr::new("Alice")))],
2278            )
2279            .upsert_node(
2280                "Person",
2281                "2",
2282                vec![],
2283                [("name", TypedValue::String(SmolStr::new("Bob")))],
2284            )
2285            .upsert_edge(
2286                "Person",
2287                "1",
2288                "KNOWS",
2289                "Person",
2290                "2",
2291                Vec::<(&str, TypedValue)>::new(),
2292            )
2293            .build();
2294
2295        let stats = conn.apply_delta(batch).unwrap();
2296        assert_eq!(stats.nodes_created, 2);
2297        assert_eq!(stats.edges_created, 1);
2298        assert_eq!(stats.total_deltas, 3);
2299        assert!(stats.elapsed_micros > 0);
2300    }
2301}