Skip to main content

contextdb_engine/
executor.rs

1use crate::database::{Database, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5    AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6    SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9    DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19    db: &Database,
20    plan: &PhysicalPlan,
21    params: &HashMap<String, Value>,
22    tx: Option<TxId>,
23) -> Result<QueryResult> {
24    match plan {
25        PhysicalPlan::CreateTable(p) => {
26            db.check_disk_budget("CREATE TABLE")?;
27            let expires_column = expires_column_name(&p.columns)?;
28            let meta = TableMeta {
29                columns: p
30                    .columns
31                    .iter()
32                    .map(|c| contextdb_core::ColumnDef {
33                        name: c.name.clone(),
34                        column_type: map_column_type(&c.data_type),
35                        nullable: c.nullable,
36                        primary_key: c.primary_key,
37                        unique: c.unique,
38                        default: c.default.as_ref().map(stored_default_expr),
39                        references: c.references.as_ref().map(|reference| {
40                            contextdb_core::ForeignKeyReference {
41                                table: reference.table.clone(),
42                                column: reference.column.clone(),
43                            }
44                        }),
45                        expires: c.expires,
46                    })
47                    .collect(),
48                immutable: p.immutable,
49                state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
50                    column: sm.column.clone(),
51                    transitions: sm
52                        .transitions
53                        .iter()
54                        .map(|(from, tos)| (from.clone(), tos.clone()))
55                        .collect(),
56                }),
57                dag_edge_types: p.dag_edge_types.clone(),
58                unique_constraints: p.unique_constraints.clone(),
59                natural_key_column: None,
60                propagation_rules: p.propagation_rules.clone(),
61                default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
62                sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
63                expires_column,
64            };
65            let metadata_bytes = meta.estimated_bytes();
66            db.accountant().try_allocate_for(
67                metadata_bytes,
68                "ddl",
69                "create_table",
70                "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
71            )?;
72            db.relational_store().create_table(&p.name, meta);
73            if let Some(table_meta) = db.table_meta(&p.name) {
74                db.persist_table_meta(&p.name, &table_meta)?;
75                db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn));
76            }
77            Ok(QueryResult::empty_with_affected(0))
78        }
79        PhysicalPlan::DropTable(name) => {
80            let bytes_to_release = estimate_drop_table_bytes(db, name);
81            db.drop_table_aux_state(name);
82            db.relational_store().drop_table(name);
83            db.remove_persisted_table(name)?;
84            db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn));
85            db.accountant().release(bytes_to_release);
86            Ok(QueryResult::empty_with_affected(0))
87        }
88        PhysicalPlan::AlterTable(p) => {
89            db.check_disk_budget("ALTER TABLE")?;
90            let store = db.relational_store();
91            match &p.action {
92                AlterAction::AddColumn(col) => {
93                    if col.primary_key {
94                        return Err(Error::Other(
95                            "adding a primary key column via ALTER TABLE is not supported"
96                                .to_string(),
97                        ));
98                    }
99                    validate_expires_column(col)?;
100                    let core_col = contextdb_core::ColumnDef {
101                        name: col.name.clone(),
102                        column_type: map_column_type(&col.data_type),
103                        nullable: col.nullable,
104                        primary_key: col.primary_key,
105                        unique: col.unique,
106                        default: col.default.as_ref().map(stored_default_expr),
107                        references: col.references.as_ref().map(|reference| {
108                            contextdb_core::ForeignKeyReference {
109                                table: reference.table.clone(),
110                                column: reference.column.clone(),
111                            }
112                        }),
113                        expires: col.expires,
114                    };
115                    store
116                        .alter_table_add_column(&p.table, core_col)
117                        .map_err(Error::Other)?;
118                    if col.expires {
119                        let mut meta = store.table_meta.write();
120                        let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
121                            Error::Other(format!("table '{}' not found", p.table))
122                        })?;
123                        table_meta.expires_column = Some(col.name.clone());
124                    }
125                }
126                AlterAction::DropColumn(name) => {
127                    store
128                        .alter_table_drop_column(&p.table, name)
129                        .map_err(Error::Other)?;
130                    let mut meta = store.table_meta.write();
131                    if let Some(table_meta) = meta.get_mut(&p.table)
132                        && table_meta.expires_column.as_deref() == Some(name.as_str())
133                    {
134                        table_meta.expires_column = None;
135                    }
136                }
137                AlterAction::RenameColumn { from, to } => {
138                    store
139                        .alter_table_rename_column(&p.table, from, to)
140                        .map_err(Error::Other)?;
141                    let mut meta = store.table_meta.write();
142                    if let Some(table_meta) = meta.get_mut(&p.table)
143                        && table_meta.expires_column.as_deref() == Some(from.as_str())
144                    {
145                        table_meta.expires_column = Some(to.clone());
146                    }
147                }
148                AlterAction::SetRetain {
149                    duration_seconds,
150                    sync_safe,
151                } => {
152                    let mut meta = store.table_meta.write();
153                    let table_meta = meta
154                        .get_mut(&p.table)
155                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
156                    if table_meta.immutable {
157                        return Err(Error::Other(
158                            "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
159                        ));
160                    }
161                    table_meta.default_ttl_seconds = Some(*duration_seconds);
162                    table_meta.sync_safe = *sync_safe;
163                }
164                AlterAction::DropRetain => {
165                    let mut meta = store.table_meta.write();
166                    let table_meta = meta
167                        .get_mut(&p.table)
168                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
169                    table_meta.default_ttl_seconds = None;
170                    table_meta.sync_safe = false;
171                }
172                AlterAction::SetSyncConflictPolicy(policy) => {
173                    let cp = parse_conflict_policy(policy)?;
174                    db.set_table_conflict_policy(&p.table, cp);
175                }
176                AlterAction::DropSyncConflictPolicy => {
177                    db.drop_table_conflict_policy(&p.table);
178                }
179            }
180            if let Some(table_meta) = db.table_meta(&p.table) {
181                db.persist_table_meta(&p.table, &table_meta)?;
182                if !matches!(
183                    p.action,
184                    AlterAction::AddColumn(_)
185                        | AlterAction::SetRetain { .. }
186                        | AlterAction::DropRetain
187                        | AlterAction::SetSyncConflictPolicy(_)
188                        | AlterAction::DropSyncConflictPolicy
189                ) {
190                    db.persist_table_rows(&p.table)?;
191                }
192                db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn));
193            }
194            Ok(QueryResult::empty_with_affected(0))
195        }
196        PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
197        PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
198        PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
199        PhysicalPlan::Scan { table, filter, .. } => {
200            if table == "dual" {
201                return Ok(QueryResult {
202                    columns: vec![],
203                    rows: vec![vec![]],
204                    rows_affected: 0,
205                });
206            }
207            let snapshot = db.snapshot();
208            let rows = db.scan(table, snapshot)?;
209            let schema_columns = db.table_meta(table).map(|meta| {
210                meta.columns
211                    .into_iter()
212                    .map(|column| column.name)
213                    .collect::<Vec<_>>()
214            });
215            let resolved_filter = filter
216                .as_ref()
217                .map(|expr| resolve_in_subqueries(db, expr, params, tx))
218                .transpose()?;
219            materialize_rows(
220                rows,
221                resolved_filter.as_ref(),
222                params,
223                schema_columns.as_deref(),
224            )
225        }
226        PhysicalPlan::GraphBfs {
227            start_alias,
228            start_expr,
229            start_candidates,
230            steps,
231            filter,
232        } => {
233            let start_uuids = match resolve_uuid(start_expr, params) {
234                Ok(start) => vec![start],
235                Err(Error::PlanError(_))
236                    if matches!(
237                        start_expr,
238                        Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
239                    ) =>
240                {
241                    // Start node not directly specified — check if a subquery or filter can help
242                    if let Some(candidate_plan) = start_candidates {
243                        resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
244                    } else if let Some(filter_expr) = filter {
245                        let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
246                        resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
247                    } else {
248                        vec![]
249                    }
250                }
251                Err(err) => return Err(err),
252            };
253            if start_uuids.is_empty() {
254                return Ok(QueryResult {
255                    columns: vec!["id".to_string(), "depth".to_string()],
256                    rows: vec![],
257                    rows_affected: 0,
258                });
259            }
260            let snapshot = db.snapshot();
261            let mut frontier = start_uuids
262                .into_iter()
263                .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
264                .collect::<Vec<_>>();
265            let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
266            db.accountant().try_allocate_for(
267                bfs_bytes,
268                "bfs_frontier",
269                "graph_bfs",
270                "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
271            )?;
272
273            let result = (|| {
274                for step in steps {
275                    let edge_types_ref = if step.edge_types.is_empty() {
276                        None
277                    } else {
278                        Some(step.edge_types.as_slice())
279                    };
280                    let mut next = Vec::new();
281
282                    for (bindings, start, base_depth) in &frontier {
283                        let res = db.graph().bfs(
284                            *start,
285                            edge_types_ref,
286                            step.direction,
287                            step.min_depth,
288                            step.max_depth,
289                            snapshot,
290                        )?;
291                        for node in res.nodes {
292                            let total_depth = base_depth.saturating_add(node.depth);
293                            let mut next_bindings = bindings.clone();
294                            next_bindings.insert(step.target_alias.clone(), node.id);
295                            next.push((next_bindings, node.id, total_depth));
296                        }
297                    }
298
299                    frontier = dedupe_graph_frontier(next, steps);
300                    if frontier.is_empty() {
301                        break;
302                    }
303                }
304
305                let mut columns =
306                    steps
307                        .iter()
308                        .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
309                            cols.push(format!("{}.id", step.target_alias));
310                            cols
311                        });
312                columns.push("id".to_string());
313                columns.push("depth".to_string());
314
315                Ok(QueryResult {
316                    columns,
317                    rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
318                    rows_affected: 0,
319                })
320            })();
321            db.accountant().release(bfs_bytes);
322
323            result
324        }
325        PhysicalPlan::VectorSearch {
326            table,
327            query_expr,
328            k,
329            candidates,
330            ..
331        }
332        | PhysicalPlan::HnswSearch {
333            table,
334            query_expr,
335            k,
336            candidates,
337            ..
338        } => {
339            let query_vec = resolve_vector_from_expr(query_expr, params)?;
340            let snapshot = db.snapshot();
341            let all_rows = db.scan(table, snapshot)?;
342            let candidate_bitmap = if let Some(cands_plan) = candidates {
343                let qr = execute_plan(db, cands_plan, params, tx)?;
344                let mut bm = RoaringTreemap::new();
345                let row_id_idx = qr.columns.iter().position(|column| {
346                    column == "row_id" || column.rsplit('.').next() == Some("row_id")
347                });
348                let id_idx = qr
349                    .columns
350                    .iter()
351                    .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
352
353                if let Some(idx) = row_id_idx {
354                    for row in qr.rows {
355                        if let Some(Value::Int64(id)) = row.get(idx) {
356                            bm.insert(*id as u64);
357                        }
358                    }
359                } else if let Some(idx) = id_idx {
360                    let uuid_to_row_id: HashMap<uuid::Uuid, u64> = all_rows
361                        .iter()
362                        .filter_map(|row| match row.values.get("id") {
363                            Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
364                            _ => None,
365                        })
366                        .collect();
367                    for row in qr.rows {
368                        if let Some(Value::Uuid(uuid)) = row.get(idx)
369                            && let Some(row_id) = uuid_to_row_id.get(uuid)
370                        {
371                            bm.insert(*row_id);
372                        }
373                    }
374                }
375                Some(bm)
376            } else {
377                None
378            };
379
380            let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
381            db.accountant().try_allocate_for(
382                vector_bytes,
383                "vector_search",
384                "search",
385                "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
386            )?;
387            let res = db.query_vector(
388                &query_vec,
389                *k as usize,
390                candidate_bitmap.as_ref(),
391                db.snapshot(),
392            );
393            db.accountant().release(vector_bytes);
394            let res = res?;
395
396            // Re-materialize: look up actual rows by row_id so SELECT * returns user columns
397            let schema_columns = db.table_meta(table).map(|meta| {
398                meta.columns
399                    .into_iter()
400                    .map(|column| column.name)
401                    .collect::<Vec<_>>()
402            });
403            let keys = if let Some(ref sc) = schema_columns {
404                sc.clone()
405            } else {
406                let mut ks = BTreeSet::new();
407                for r in &all_rows {
408                    for k in r.values.keys() {
409                        ks.insert(k.clone());
410                    }
411                }
412                ks.into_iter().collect::<Vec<_>>()
413            };
414
415            let row_map: HashMap<u64, &VersionedRow> =
416                all_rows.iter().map(|r| (r.row_id, r)).collect();
417
418            let mut columns = vec!["row_id".to_string()];
419            columns.extend(keys.iter().cloned());
420            columns.push("score".to_string());
421
422            let rows = res
423                .into_iter()
424                .filter_map(|(rid, score)| {
425                    row_map.get(&rid).map(|row| {
426                        let mut out = vec![Value::Int64(rid as i64)];
427                        for k in &keys {
428                            out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
429                        }
430                        out.push(Value::Float64(score as f64));
431                        out
432                    })
433                })
434                .collect();
435
436            Ok(QueryResult {
437                columns,
438                rows,
439                rows_affected: 0,
440            })
441        }
442        PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
443        PhysicalPlan::Project { input, columns } => {
444            let input_result = execute_plan(db, input, params, tx)?;
445            let has_aggregate = columns.iter().any(|column| {
446                matches!(
447                    &column.expr,
448                    Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
449                )
450            });
451            if has_aggregate {
452                if columns.iter().any(|column| {
453                    !matches!(
454                        &column.expr,
455                        Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
456                    )
457                }) {
458                    return Err(Error::PlanError(
459                        "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
460                    ));
461                }
462
463                let output_columns = columns
464                    .iter()
465                    .map(|column| {
466                        column.alias.clone().unwrap_or_else(|| match &column.expr {
467                            Expr::FunctionCall { name, .. } => name.clone(),
468                            _ => "expr".to_string(),
469                        })
470                    })
471                    .collect::<Vec<_>>();
472
473                let aggregate_row = columns
474                    .iter()
475                    .map(|column| match &column.expr {
476                        Expr::FunctionCall { name: _, args } => {
477                            let count = if matches!(
478                                args.as_slice(),
479                                [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
480                                if column == "*"
481                            ) {
482                                input_result.rows.len() as i64
483                            } else {
484                                input_result
485                                    .rows
486                                    .iter()
487                                    .filter_map(|row| {
488                                        args.first().map(|arg| {
489                                            eval_query_result_expr(
490                                                arg,
491                                                row,
492                                                &input_result.columns,
493                                                params,
494                                            )
495                                        })
496                                    })
497                                    .collect::<Result<Vec<_>>>()?
498                                    .into_iter()
499                                    .filter(|value| *value != Value::Null)
500                                    .count() as i64
501                            };
502                            Ok(Value::Int64(count))
503                        }
504                        _ => Err(Error::PlanError(
505                            "mixed aggregate and non-aggregate columns without GROUP BY"
506                                .to_string(),
507                        )),
508                    })
509                    .collect::<Result<Vec<_>>>()?;
510
511                return Ok(QueryResult {
512                    columns: output_columns,
513                    rows: vec![aggregate_row],
514                    rows_affected: 0,
515                });
516            }
517
518            let output_columns = columns
519                .iter()
520                .map(|c| {
521                    c.alias.clone().unwrap_or_else(|| match &c.expr {
522                        Expr::Column(col) => col.column.clone(),
523                        _ => "expr".to_string(),
524                    })
525                })
526                .collect::<Vec<_>>();
527
528            let mut output_rows = Vec::with_capacity(input_result.rows.len());
529            for row in &input_result.rows {
530                let mut projected = Vec::with_capacity(columns.len());
531                for col in columns {
532                    projected.push(eval_project_expr(
533                        &col.expr,
534                        row,
535                        &input_result.columns,
536                        params,
537                    )?);
538                }
539                output_rows.push(projected);
540            }
541
542            Ok(QueryResult {
543                columns: output_columns,
544                rows: output_rows,
545                rows_affected: 0,
546            })
547        }
548        PhysicalPlan::Sort { input, keys } => {
549            let mut input_result = execute_plan(db, input, params, tx)?;
550            input_result.rows.sort_by(|left, right| {
551                for key in keys {
552                    let Expr::Column(column_ref) = &key.expr else {
553                        return Ordering::Equal;
554                    };
555                    let left_value =
556                        match lookup_query_result_column(left, &input_result.columns, column_ref) {
557                            Ok(value) => value,
558                            Err(_) => return Ordering::Equal,
559                        };
560                    let right_value = match lookup_query_result_column(
561                        right,
562                        &input_result.columns,
563                        column_ref,
564                    ) {
565                        Ok(value) => value,
566                        Err(_) => return Ordering::Equal,
567                    };
568                    let ordering = compare_sort_values(&left_value, &right_value, key.direction);
569                    if ordering != Ordering::Equal {
570                        return ordering;
571                    }
572                }
573                Ordering::Equal
574            });
575            Ok(input_result)
576        }
577        PhysicalPlan::Limit { input, count } => {
578            let mut input_result = execute_plan(db, input, params, tx)?;
579            input_result.rows.truncate(*count as usize);
580            Ok(input_result)
581        }
582        PhysicalPlan::Filter { input, predicate } => {
583            let mut input_result = execute_plan(db, input, params, tx)?;
584            input_result.rows.retain(|row| {
585                query_result_row_matches(row, &input_result.columns, predicate, params)
586                    .unwrap_or(false)
587            });
588            Ok(input_result)
589        }
590        PhysicalPlan::Distinct { input } => {
591            let input_result = execute_plan(db, input, params, tx)?;
592            let mut seen = HashSet::<Vec<u8>>::new();
593            let rows = input_result
594                .rows
595                .into_iter()
596                .filter(|row| seen.insert(distinct_row_key(row)))
597                .collect();
598            Ok(QueryResult {
599                columns: input_result.columns,
600                rows,
601                rows_affected: input_result.rows_affected,
602            })
603        }
604        PhysicalPlan::Join {
605            left,
606            right,
607            condition,
608            join_type,
609            left_alias,
610            right_alias,
611        } => {
612            let left_result = execute_plan(db, left, params, tx)?;
613            let right_result = execute_plan(db, right, params, tx)?;
614            let right_duplicate_names =
615                duplicate_column_names(&left_result.columns, &right_result.columns);
616            let right_prefix = right_alias
617                .clone()
618                .unwrap_or_else(|| right_table_name(right));
619            let right_columns = right_result
620                .columns
621                .iter()
622                .map(|column| {
623                    if right_duplicate_names.contains(column) {
624                        format!("{right_prefix}.{column}")
625                    } else {
626                        column.clone()
627                    }
628                })
629                .collect::<Vec<_>>();
630
631            let mut columns = left_result.columns.clone();
632            columns.extend(right_columns);
633
634            let mut rows = Vec::new();
635            for left_row in &left_result.rows {
636                let mut matched = false;
637                for right_row in &right_result.rows {
638                    let combined = concatenate_rows(left_row, right_row);
639                    if query_result_row_matches(&combined, &columns, condition, params)? {
640                        matched = true;
641                        rows.push(combined);
642                    }
643                }
644
645                if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
646                    let mut combined = left_row.clone();
647                    combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
648                    rows.push(combined);
649                }
650            }
651
652            let output_columns = qualify_join_columns(
653                &columns,
654                &left_result.columns,
655                &right_result.columns,
656                left_alias,
657                &right_prefix,
658            );
659
660            Ok(QueryResult {
661                columns: output_columns,
662                rows,
663                rows_affected: 0,
664            })
665        }
666        PhysicalPlan::CreateIndex(_) => Ok(QueryResult::empty_with_affected(0)),
667        PhysicalPlan::SetMemoryLimit(val) => {
668            let limit = match val {
669                SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
670                SetMemoryLimitValue::None => None,
671            };
672            db.accountant().set_budget(limit)?;
673            db.persist_memory_limit(limit)?;
674            Ok(QueryResult::empty())
675        }
676        PhysicalPlan::ShowMemoryLimit => {
677            let usage = db.accountant().usage();
678            Ok(QueryResult {
679                columns: vec![
680                    "limit".to_string(),
681                    "used".to_string(),
682                    "available".to_string(),
683                    "startup_ceiling".to_string(),
684                ],
685                rows: vec![vec![
686                    usage
687                        .limit
688                        .map(|value| Value::Int64(value as i64))
689                        .unwrap_or_else(|| Value::Text("none".to_string())),
690                    Value::Int64(usage.used as i64),
691                    usage
692                        .available
693                        .map(|value| Value::Int64(value as i64))
694                        .unwrap_or_else(|| Value::Text("none".to_string())),
695                    usage
696                        .startup_ceiling
697                        .map(|value| Value::Int64(value as i64))
698                        .unwrap_or_else(|| Value::Text("none".to_string())),
699                ]],
700                rows_affected: 0,
701            })
702        }
703        PhysicalPlan::SetDiskLimit(val) => {
704            let limit = match val {
705                SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
706                SetDiskLimitValue::None => None,
707            };
708            db.set_disk_limit(limit)?;
709            db.persist_disk_limit(limit)?;
710            Ok(QueryResult::empty())
711        }
712        PhysicalPlan::ShowDiskLimit => {
713            let limit = db.disk_limit();
714            let used = db.disk_file_size();
715            let startup_ceiling = db.disk_limit_startup_ceiling();
716            Ok(QueryResult {
717                columns: vec![
718                    "limit".to_string(),
719                    "used".to_string(),
720                    "available".to_string(),
721                    "startup_ceiling".to_string(),
722                ],
723                rows: vec![vec![
724                    limit
725                        .map(|value| Value::Int64(value as i64))
726                        .unwrap_or_else(|| Value::Text("none".to_string())),
727                    used.map(|value| Value::Int64(value as i64))
728                        .unwrap_or(Value::Null),
729                    match (limit, used) {
730                        (Some(limit), Some(used)) => {
731                            Value::Int64(limit.saturating_sub(used) as i64)
732                        }
733                        _ => Value::Null,
734                    },
735                    startup_ceiling
736                        .map(|value| Value::Int64(value as i64))
737                        .unwrap_or_else(|| Value::Text("none".to_string())),
738                ]],
739                rows_affected: 0,
740            })
741        }
742        PhysicalPlan::SetSyncConflictPolicy(policy) => {
743            let cp = parse_conflict_policy(policy)?;
744            db.set_default_conflict_policy(cp);
745            Ok(QueryResult::empty())
746        }
747        PhysicalPlan::ShowSyncConflictPolicy => {
748            let policies = db.conflict_policies();
749            let default_str = conflict_policy_to_string(policies.default);
750            let mut rows = vec![vec![Value::Text(default_str)]];
751            for (table, policy) in &policies.per_table {
752                rows.push(vec![Value::Text(format!(
753                    "{}={}",
754                    table,
755                    conflict_policy_to_string(*policy)
756                ))]);
757            }
758            Ok(QueryResult {
759                columns: vec!["policy".to_string()],
760                rows,
761                rows_affected: 0,
762            })
763        }
764        PhysicalPlan::Pipeline(plans) => {
765            let mut last = QueryResult::empty();
766            for p in plans {
767                last = execute_plan(db, p, params, tx)?;
768            }
769            Ok(last)
770        }
771        _ => Err(Error::PlanError(
772            "unsupported plan node in executor".to_string(),
773        )),
774    }
775}
776
777fn eval_project_expr(
778    expr: &Expr,
779    row: &[Value],
780    input_columns: &[String],
781    params: &HashMap<String, Value>,
782) -> Result<Value> {
783    match expr {
784        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
785        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
786        Expr::Parameter(name) => params
787            .get(name)
788            .cloned()
789            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
790        Expr::BinaryOp { left, op, right } => {
791            let left = eval_query_result_expr(left, row, input_columns, params)?;
792            let right = eval_query_result_expr(right, row, input_columns, params)?;
793            eval_binary_op(op, &left, &right)
794        }
795        Expr::UnaryOp { op, operand } => {
796            let value = eval_query_result_expr(operand, row, input_columns, params)?;
797            match op {
798                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
799                UnaryOp::Neg => match value {
800                    Value::Int64(v) => Ok(Value::Int64(-v)),
801                    Value::Float64(v) => Ok(Value::Float64(-v)),
802                    _ => Err(Error::PlanError(
803                        "cannot negate non-numeric value".to_string(),
804                    )),
805                },
806            }
807        }
808        Expr::FunctionCall { name, args } => {
809            let values = args
810                .iter()
811                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
812                .collect::<Result<Vec<_>>>()?;
813            eval_function(name, &values)
814        }
815        Expr::IsNull { expr, negated } => {
816            let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
817            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
818        }
819        Expr::InList {
820            expr,
821            list,
822            negated,
823        } => {
824            let needle = eval_query_result_expr(expr, row, input_columns, params)?;
825            let matched = list.iter().try_fold(false, |found, item| {
826                if found {
827                    Ok(true)
828                } else {
829                    let candidate = eval_query_result_expr(item, row, input_columns, params)?;
830                    Ok(
831                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
832                            || (needle != Value::Null
833                                && candidate != Value::Null
834                                && needle == candidate),
835                    )
836                }
837            })?;
838            Ok(Value::Bool(if *negated { !matched } else { matched }))
839        }
840        Expr::Like {
841            expr,
842            pattern,
843            negated,
844        } => {
845            let matches = match (
846                eval_query_result_expr(expr, row, input_columns, params)?,
847                eval_query_result_expr(pattern, row, input_columns, params)?,
848            ) {
849                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
850                _ => false,
851            };
852            Ok(Value::Bool(if *negated { !matches } else { matches }))
853        }
854        _ => resolve_expr(expr, params),
855    }
856}
857
858fn eval_query_result_expr(
859    expr: &Expr,
860    row: &[Value],
861    input_columns: &[String],
862    params: &HashMap<String, Value>,
863) -> Result<Value> {
864    match expr {
865        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
866        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
867        Expr::Parameter(name) => params
868            .get(name)
869            .cloned()
870            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
871        Expr::FunctionCall { name, args } => {
872            let values = args
873                .iter()
874                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
875                .collect::<Result<Vec<_>>>()?;
876            eval_function(name, &values)
877        }
878        _ => resolve_expr(expr, params),
879    }
880}
881
882fn exec_insert(
883    db: &Database,
884    p: &InsertPlan,
885    params: &HashMap<String, Value>,
886    tx: Option<TxId>,
887) -> Result<QueryResult> {
888    db.check_disk_budget("INSERT")?;
889    let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
890
891    // When no column list is provided (INSERT INTO t VALUES (...)),
892    // infer column names from table metadata in declaration order.
893    let columns: Vec<String> = if p.columns.is_empty() {
894        let meta = db
895            .table_meta(&p.table)
896            .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
897        meta.columns.iter().map(|c| c.name.clone()).collect()
898    } else {
899        p.columns.clone()
900    };
901
902    let mut rows_affected = 0;
903    for row in &p.values {
904        let mut values = HashMap::new();
905        for (idx, expr) in row.iter().enumerate() {
906            let col = columns
907                .get(idx)
908                .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
909            let v = resolve_expr(expr, params)?;
910            values.insert(col.clone(), coerce_value_for_column(db, &p.table, col, v)?);
911        }
912
913        apply_missing_column_defaults(db, &p.table, &mut values)?;
914
915        validate_vector_columns(db, &p.table, &values)?;
916        let row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
917        db.accountant().try_allocate_for(
918            row_bytes,
919            "insert",
920            "row_insert",
921            "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
922        )?;
923        let checkpoint = db.write_set_checkpoint(txid)?;
924
925        let row_id = if let Some(on_conflict) = &p.on_conflict {
926            let conflict_col = &on_conflict.columns[0];
927            let conflict_value = values
928                .get(conflict_col)
929                .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
930            let existing =
931                db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
932            let existing_row_id = existing.as_ref().map(|row| row.row_id);
933            let existing_has_vector = existing
934                .as_ref()
935                .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
936            let upsert_values = if let Some(existing_row) = existing.as_ref() {
937                apply_on_conflict_updates(
938                    db,
939                    &p.table,
940                    values.clone(),
941                    existing_row,
942                    on_conflict,
943                    params,
944                )?
945            } else {
946                values.clone()
947            };
948
949            match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
950                Ok(UpsertResult::Inserted) => {
951                    db.point_lookup_in_tx(
952                        txid,
953                        &p.table,
954                        conflict_col,
955                        conflict_value,
956                        db.snapshot(),
957                    )?
958                    .ok_or_else(|| {
959                        Error::Other("inserted upsert row not visible in tx".to_string())
960                    })?
961                    .row_id
962                }
963                Ok(UpsertResult::Updated) => {
964                    if existing_has_vector && let Some(existing_row_id) = existing_row_id {
965                        db.delete_vector(txid, existing_row_id)?;
966                    }
967                    db.point_lookup_in_tx(
968                        txid,
969                        &p.table,
970                        conflict_col,
971                        conflict_value,
972                        db.snapshot(),
973                    )?
974                    .ok_or_else(|| {
975                        Error::Other("updated upsert row not visible in tx".to_string())
976                    })?
977                    .row_id
978                }
979                Ok(UpsertResult::NoOp) => {
980                    db.accountant().release(row_bytes);
981                    0
982                }
983                Err(err) => {
984                    db.accountant().release(row_bytes);
985                    return Err(err);
986                }
987            }
988        } else {
989            match db.insert_row(txid, &p.table, values.clone()) {
990                Ok(row_id) => row_id,
991                Err(err) => {
992                    db.accountant().release(row_bytes);
993                    return Err(err);
994                }
995            }
996        };
997
998        if should_route_insert_to_graph(db, &p.table)
999            && let (
1000                Some(Value::Uuid(source)),
1001                Some(Value::Uuid(target)),
1002                Some(Value::Text(edge_type)),
1003            ) = (
1004                values.get("source_id"),
1005                values.get("target_id"),
1006                values.get("edge_type"),
1007            )
1008        {
1009            match db.insert_edge(txid, *source, *target, edge_type.clone(), HashMap::new()) {
1010                Ok(true) => {}
1011                Ok(false) => {
1012                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1013                    db.accountant().release(row_bytes);
1014                    continue;
1015                }
1016                Err(err) => {
1017                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1018                    db.accountant().release(row_bytes);
1019                    return Err(err);
1020                }
1021            }
1022        }
1023
1024        if let Some(v) = vector_value_for_table(db, &p.table, &values)
1025            && row_id != 0
1026            && let Err(err) = db.insert_vector(txid, row_id, v.clone())
1027        {
1028            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1029            db.accountant().release(row_bytes);
1030            return Err(err);
1031        }
1032
1033        rows_affected += 1;
1034    }
1035
1036    Ok(QueryResult::empty_with_affected(rows_affected))
1037}
1038
1039fn exec_delete(
1040    db: &Database,
1041    p: &DeletePlan,
1042    params: &HashMap<String, Value>,
1043    tx: Option<TxId>,
1044) -> Result<QueryResult> {
1045    let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1046    let snapshot = db.snapshot();
1047    let rows = db.scan(&p.table, snapshot)?;
1048    let resolved_where = p
1049        .where_clause
1050        .as_ref()
1051        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1052        .transpose()?;
1053    let matched: Vec<_> = rows
1054        .into_iter()
1055        .filter(|r| {
1056            resolved_where
1057                .as_ref()
1058                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1059        })
1060        .collect();
1061
1062    for row in &matched {
1063        if db.has_live_vector(row.row_id, snapshot) {
1064            db.delete_vector(txid, row.row_id)?;
1065        }
1066        db.delete_row(txid, &p.table, row.row_id)?;
1067    }
1068
1069    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1070}
1071
1072fn exec_update(
1073    db: &Database,
1074    p: &UpdatePlan,
1075    params: &HashMap<String, Value>,
1076    tx: Option<TxId>,
1077) -> Result<QueryResult> {
1078    db.check_disk_budget("UPDATE")?;
1079    let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1080    let snapshot = db.snapshot();
1081    let rows = db.scan(&p.table, snapshot)?;
1082    let resolved_where = p
1083        .where_clause
1084        .as_ref()
1085        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1086        .transpose()?;
1087    let matched: Vec<_> = rows
1088        .into_iter()
1089        .filter(|r| {
1090            resolved_where
1091                .as_ref()
1092                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1093        })
1094        .collect();
1095
1096    for row in &matched {
1097        let mut values = row.values.clone();
1098        for (k, vexpr) in &p.assignments {
1099            let value = eval_assignment_expr(vexpr, &row.values, params)?;
1100            values.insert(k.clone(), coerce_value_for_column(db, &p.table, k, value)?);
1101        }
1102        validate_update_state_transition(db, &p.table, row, &values)?;
1103        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1104        let new_state = db
1105            .table_meta(&p.table)
1106            .as_ref()
1107            .and_then(|meta| meta.state_machine.as_ref())
1108            .and_then(|sm| values.get(&sm.column))
1109            .and_then(Value::as_text)
1110            .map(std::borrow::ToOwned::to_owned);
1111
1112        let old_has_vector = db.has_live_vector(row.row_id, snapshot);
1113        validate_vector_columns(db, &p.table, &values)?;
1114        let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
1115        let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1116        db.accountant().try_allocate_for(
1117            new_row_bytes,
1118            "update",
1119            "row_replace",
1120            "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1121        )?;
1122        let checkpoint = db.write_set_checkpoint(txid)?;
1123
1124        if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1125            db.accountant().release(new_row_bytes);
1126            return Err(err);
1127        }
1128        if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
1129            db.accountant().release(new_row_bytes);
1130            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1131            return Err(err);
1132        }
1133
1134        let new_row_id = match db.insert_row(txid, &p.table, values) {
1135            Ok(row_id) => row_id,
1136            Err(err) => {
1137                db.accountant().release(new_row_bytes);
1138                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1139                return Err(err);
1140            }
1141        };
1142        if let Some(vector) = new_vector
1143            && let Err(err) = db.insert_vector(txid, new_row_id, vector)
1144        {
1145            db.accountant().release(new_row_bytes);
1146            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1147            return Err(err);
1148        }
1149        if let Err(err) =
1150            db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1151        {
1152            db.accountant().release(new_row_bytes);
1153            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1154            return Err(err);
1155        }
1156    }
1157
1158    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1159}
1160
1161fn estimate_table_row_bytes(
1162    db: &Database,
1163    table: &str,
1164    values: &HashMap<String, Value>,
1165) -> Result<usize> {
1166    let meta = db
1167        .table_meta(table)
1168        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1169    Ok(estimate_row_bytes_for_meta(values, &meta, false))
1170}
1171
1172fn validate_update_state_transition(
1173    db: &Database,
1174    table: &str,
1175    existing: &VersionedRow,
1176    next_values: &HashMap<String, Value>,
1177) -> Result<()> {
1178    let Some(meta) = db.table_meta(table) else {
1179        return Ok(());
1180    };
1181    let Some(state_machine) = meta.state_machine else {
1182        return Ok(());
1183    };
1184
1185    let old_state = existing
1186        .values
1187        .get(&state_machine.column)
1188        .and_then(Value::as_text);
1189    let new_state = next_values
1190        .get(&state_machine.column)
1191        .and_then(Value::as_text);
1192
1193    let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
1194        return Ok(());
1195    };
1196
1197    if old_state == new_state
1198        || db.relational_store().validate_state_transition(
1199            table,
1200            &state_machine.column,
1201            old_state,
1202            new_state,
1203        )
1204    {
1205        return Ok(());
1206    }
1207
1208    Err(Error::InvalidStateTransition(format!(
1209        "{old_state} -> {new_state}"
1210    )))
1211}
1212
1213fn estimate_row_bytes_for_meta(
1214    values: &HashMap<String, Value>,
1215    meta: &TableMeta,
1216    include_vectors: bool,
1217) -> usize {
1218    let mut bytes = 96usize;
1219    for column in &meta.columns {
1220        let Some(value) = values.get(&column.name) else {
1221            continue;
1222        };
1223        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
1224            continue;
1225        }
1226        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
1227    }
1228    bytes
1229}
1230
1231fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
1232    k.saturating_mul(3)
1233        .saturating_mul(dimension)
1234        .saturating_mul(std::mem::size_of::<f32>())
1235}
1236
1237fn estimate_bfs_working_bytes<T>(
1238    frontier: &[T],
1239    steps: &[contextdb_planner::GraphStepPlan],
1240) -> usize {
1241    let max_hops = steps.iter().fold(0usize, |acc, step| {
1242        acc.saturating_add(step.max_depth as usize)
1243    });
1244    frontier
1245        .len()
1246        .saturating_mul(2048)
1247        .saturating_mul(max_hops.max(1))
1248}
1249
1250fn dedupe_graph_frontier(
1251    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
1252    steps: &[contextdb_planner::GraphStepPlan],
1253) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
1254    let mut best =
1255        HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
1256
1257    for (bindings, current_id, depth) in frontier {
1258        let mut key = Vec::with_capacity(steps.len());
1259        for step in steps {
1260            if let Some(id) = bindings.get(&step.target_alias) {
1261                key.push(*id);
1262            }
1263        }
1264
1265        best.entry(key)
1266            .and_modify(|existing| {
1267                if depth < existing.2 {
1268                    *existing = (bindings.clone(), current_id, depth);
1269                }
1270            })
1271            .or_insert((bindings, current_id, depth));
1272    }
1273
1274    best.into_values().collect()
1275}
1276
1277fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
1278    let meta = db.table_meta(table);
1279    let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
1280    let snapshot = db.snapshot();
1281    let rows = db.scan(table, snapshot).unwrap_or_default();
1282    let row_bytes = rows.iter().fold(0usize, |acc, row| {
1283        acc.saturating_add(meta.as_ref().map_or_else(
1284            || row.estimated_bytes(),
1285            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1286        ))
1287    });
1288    let vector_bytes = rows
1289        .iter()
1290        .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
1291        .fold(0usize, |acc, entry| {
1292            acc.saturating_add(entry.estimated_bytes())
1293        });
1294    let edge_bytes = rows.iter().fold(0usize, |acc, row| {
1295        match (
1296            row.values.get("source_id").and_then(Value::as_uuid),
1297            row.values.get("target_id").and_then(Value::as_uuid),
1298            row.values.get("edge_type").and_then(Value::as_text),
1299        ) {
1300            (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
1301                96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
1302            ),
1303            _ => acc,
1304        }
1305    });
1306    metadata_bytes
1307        .saturating_add(row_bytes)
1308        .saturating_add(vector_bytes)
1309        .saturating_add(edge_bytes)
1310}
1311
1312fn materialize_rows(
1313    rows: Vec<VersionedRow>,
1314    filter: Option<&Expr>,
1315    params: &HashMap<String, Value>,
1316    schema_columns: Option<&[String]>,
1317) -> Result<QueryResult> {
1318    let filtered: Vec<VersionedRow> = rows
1319        .into_iter()
1320        .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
1321        .collect();
1322
1323    let keys = if let Some(schema_columns) = schema_columns {
1324        schema_columns.to_vec()
1325    } else {
1326        let mut keys = BTreeSet::new();
1327        for r in &filtered {
1328            for k in r.values.keys() {
1329                keys.insert(k.clone());
1330            }
1331        }
1332        keys.into_iter().collect::<Vec<_>>()
1333    };
1334
1335    let mut columns = vec!["row_id".to_string()];
1336    columns.extend(keys.iter().cloned());
1337
1338    let rows = filtered
1339        .into_iter()
1340        .map(|r| {
1341            let mut out = vec![Value::Int64(r.row_id as i64)];
1342            for k in &keys {
1343                out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
1344            }
1345            out
1346        })
1347        .collect();
1348
1349    Ok(QueryResult {
1350        columns,
1351        rows,
1352        rows_affected: 0,
1353    })
1354}
1355
1356fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
1357    Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
1358}
1359
1360fn eval_expr_value(
1361    row: &VersionedRow,
1362    expr: &Expr,
1363    params: &HashMap<String, Value>,
1364) -> Result<Value> {
1365    match expr {
1366        Expr::Column(c) => {
1367            if c.column == "row_id" {
1368                Ok(Value::Int64(row.row_id as i64))
1369            } else {
1370                Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
1371            }
1372        }
1373        Expr::BinaryOp { left, op, right } => {
1374            let left = eval_expr_value(row, left, params)?;
1375            let right = eval_expr_value(row, right, params)?;
1376            eval_binary_op(op, &left, &right)
1377        }
1378        Expr::UnaryOp { op, operand } => {
1379            let value = eval_expr_value(row, operand, params)?;
1380            match op {
1381                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1382                UnaryOp::Neg => match value {
1383                    Value::Int64(v) => Ok(Value::Int64(-v)),
1384                    Value::Float64(v) => Ok(Value::Float64(-v)),
1385                    _ => Err(Error::PlanError(
1386                        "cannot negate non-numeric value".to_string(),
1387                    )),
1388                },
1389            }
1390        }
1391        Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
1392        Expr::IsNull { expr, negated } => {
1393            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1394            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1395        }
1396        Expr::InList {
1397            expr,
1398            list,
1399            negated,
1400        } => {
1401            let needle = eval_expr_value(row, expr, params)?;
1402            let matched = list.iter().try_fold(false, |found, item| {
1403                if found {
1404                    Ok(true)
1405                } else {
1406                    let candidate = eval_expr_value(row, item, params)?;
1407                    Ok(
1408                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1409                            || (needle != Value::Null
1410                                && candidate != Value::Null
1411                                && needle == candidate),
1412                    )
1413                }
1414            })?;
1415            Ok(Value::Bool(if *negated { !matched } else { matched }))
1416        }
1417        Expr::Like {
1418            expr,
1419            pattern,
1420            negated,
1421        } => {
1422            let matches = match (
1423                eval_expr_value(row, expr, params)?,
1424                eval_expr_value(row, pattern, params)?,
1425            ) {
1426                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1427                _ => false,
1428            };
1429            Ok(Value::Bool(if *negated { !matches } else { matches }))
1430        }
1431        _ => resolve_expr(expr, params),
1432    }
1433}
1434
1435pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
1436    match expr {
1437        Expr::Literal(l) => Ok(match l {
1438            Literal::Null => Value::Null,
1439            Literal::Bool(v) => Value::Bool(*v),
1440            Literal::Integer(v) => Value::Int64(*v),
1441            Literal::Real(v) => Value::Float64(*v),
1442            Literal::Text(v) => Value::Text(v.clone()),
1443            Literal::Vector(v) => Value::Vector(v.clone()),
1444        }),
1445        Expr::Parameter(p) => params
1446            .get(p)
1447            .cloned()
1448            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
1449        Expr::Column(c) => Ok(Value::Text(c.column.clone())),
1450        Expr::UnaryOp { op, operand } => match op {
1451            UnaryOp::Neg => match resolve_expr(operand, params)? {
1452                Value::Int64(v) => Ok(Value::Int64(-v)),
1453                Value::Float64(v) => Ok(Value::Float64(-v)),
1454                _ => Err(Error::PlanError(
1455                    "cannot negate non-numeric value".to_string(),
1456                )),
1457            },
1458            UnaryOp::Not => Err(Error::PlanError(
1459                "boolean NOT requires row context".to_string(),
1460            )),
1461        },
1462        Expr::FunctionCall { name, args } => {
1463            let values = args
1464                .iter()
1465                .map(|arg| resolve_expr(arg, params))
1466                .collect::<Result<Vec<_>>>()?;
1467            eval_function(name, &values)
1468        }
1469        Expr::CosineDistance { right, .. } => resolve_expr(right, params),
1470        _ => Err(Error::PlanError("unsupported expression".to_string())),
1471    }
1472}
1473
1474fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1475    match (a, b) {
1476        (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
1477        (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
1478        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1479        (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1480        (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
1481        (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
1482        (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
1483        (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1484        (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
1485        (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
1486        (Value::Uuid(u), Value::Text(t)) => {
1487            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1488                Some(u.cmp(&parsed))
1489            } else {
1490                None
1491            }
1492        }
1493        (Value::Text(t), Value::Uuid(u)) => {
1494            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1495                Some(parsed.cmp(u))
1496            } else {
1497                None
1498            }
1499        }
1500        (Value::Null, _) | (_, Value::Null) => None,
1501        _ => None,
1502    }
1503}
1504
1505fn eval_bool_expr(
1506    row: &VersionedRow,
1507    expr: &Expr,
1508    params: &HashMap<String, Value>,
1509) -> Result<Option<bool>> {
1510    match expr {
1511        Expr::BinaryOp { left, op, right } => match op {
1512            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1513                let left = eval_expr_value(row, left, params)?;
1514                let right = eval_expr_value(row, right, params)?;
1515                if left == Value::Null || right == Value::Null {
1516                    return Ok(None);
1517                }
1518
1519                let result = match op {
1520                    BinOp::Eq => {
1521                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
1522                    }
1523                    BinOp::Neq => {
1524                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
1525                    }
1526                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
1527                    BinOp::Lte => matches!(
1528                        compare_values(&left, &right),
1529                        Some(Ordering::Less | Ordering::Equal)
1530                    ),
1531                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
1532                    BinOp::Gte => matches!(
1533                        compare_values(&left, &right),
1534                        Some(Ordering::Greater | Ordering::Equal)
1535                    ),
1536                    BinOp::And | BinOp::Or => unreachable!(),
1537                };
1538                Ok(Some(result))
1539            }
1540            BinOp::And => {
1541                let left = eval_bool_expr(row, left, params)?;
1542                if left == Some(false) {
1543                    return Ok(Some(false));
1544                }
1545                let right = eval_bool_expr(row, right, params)?;
1546                Ok(match (left, right) {
1547                    (Some(true), Some(true)) => Some(true),
1548                    (Some(true), other) => other,
1549                    (None, Some(false)) => Some(false),
1550                    (None, Some(true)) | (None, None) => None,
1551                    (Some(false), _) => Some(false),
1552                })
1553            }
1554            BinOp::Or => {
1555                let left = eval_bool_expr(row, left, params)?;
1556                if left == Some(true) {
1557                    return Ok(Some(true));
1558                }
1559                let right = eval_bool_expr(row, right, params)?;
1560                Ok(match (left, right) {
1561                    (Some(false), Some(false)) => Some(false),
1562                    (Some(false), other) => other,
1563                    (None, Some(true)) => Some(true),
1564                    (None, Some(false)) | (None, None) => None,
1565                    (Some(true), _) => Some(true),
1566                })
1567            }
1568        },
1569        Expr::UnaryOp {
1570            op: UnaryOp::Not,
1571            operand,
1572        } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
1573        Expr::InList {
1574            expr,
1575            list,
1576            negated,
1577        } => {
1578            let needle = eval_expr_value(row, expr, params)?;
1579            if needle == Value::Null {
1580                return Ok(None);
1581            }
1582
1583            let matched = list.iter().try_fold(false, |found, item| {
1584                if found {
1585                    Ok(true)
1586                } else {
1587                    let candidate = eval_expr_value(row, item, params)?;
1588                    Ok(
1589                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1590                            || (candidate != Value::Null && needle == candidate),
1591                    )
1592                }
1593            })?;
1594            Ok(Some(if *negated { !matched } else { matched }))
1595        }
1596        Expr::InSubquery { .. } => Err(Error::PlanError(
1597            "IN (subquery) must be resolved before execution".to_string(),
1598        )),
1599        Expr::Like {
1600            expr,
1601            pattern,
1602            negated,
1603        } => {
1604            let left = eval_expr_value(row, expr, params)?;
1605            let right = eval_expr_value(row, pattern, params)?;
1606            let matched = match (left, right) {
1607                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1608                _ => false,
1609            };
1610            Ok(Some(if *negated { !matched } else { matched }))
1611        }
1612        Expr::IsNull { expr, negated } => {
1613            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1614            Ok(Some(if *negated { !is_null } else { is_null }))
1615        }
1616        Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
1617            Value::Bool(value) => Ok(Some(value)),
1618            Value::Null => Ok(None),
1619            _ => Err(Error::PlanError(format!(
1620                "unsupported WHERE expression: {:?}",
1621                expr
1622            ))),
1623        },
1624        _ => Err(Error::PlanError(format!(
1625            "unsupported WHERE expression: {:?}",
1626            expr
1627        ))),
1628    }
1629}
1630
1631fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
1632    let bool_value = match op {
1633        BinOp::Eq => {
1634            if left == &Value::Null || right == &Value::Null {
1635                false
1636            } else {
1637                compare_values(left, right) == Some(Ordering::Equal) || left == right
1638            }
1639        }
1640        BinOp::Neq => {
1641            if left == &Value::Null || right == &Value::Null {
1642                false
1643            } else {
1644                !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
1645            }
1646        }
1647        BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
1648        BinOp::Lte => matches!(
1649            compare_values(left, right),
1650            Some(Ordering::Less | Ordering::Equal)
1651        ),
1652        BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
1653        BinOp::Gte => matches!(
1654            compare_values(left, right),
1655            Some(Ordering::Greater | Ordering::Equal)
1656        ),
1657        BinOp::And => value_to_bool(left) && value_to_bool(right),
1658        BinOp::Or => value_to_bool(left) || value_to_bool(right),
1659    };
1660    Ok(Value::Bool(bool_value))
1661}
1662
1663fn value_to_bool(value: &Value) -> bool {
1664    matches!(value, Value::Bool(true))
1665}
1666
1667fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
1668    match (left, right) {
1669        (Value::Null, Value::Null) => Ordering::Equal,
1670        (Value::Null, _) => match direction {
1671            SortDirection::Asc => Ordering::Greater,
1672            SortDirection::Desc => Ordering::Less,
1673            SortDirection::CosineDistance => Ordering::Equal,
1674        },
1675        (_, Value::Null) => match direction {
1676            SortDirection::Asc => Ordering::Less,
1677            SortDirection::Desc => Ordering::Greater,
1678            SortDirection::CosineDistance => Ordering::Equal,
1679        },
1680        _ => {
1681            let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
1682            match direction {
1683                SortDirection::Asc => ordering,
1684                SortDirection::Desc => ordering.reverse(),
1685                SortDirection::CosineDistance => ordering,
1686            }
1687        }
1688    }
1689}
1690
1691fn eval_assignment_expr(
1692    expr: &Expr,
1693    row_values: &HashMap<String, Value>,
1694    params: &HashMap<String, Value>,
1695) -> Result<Value> {
1696    match expr {
1697        Expr::Literal(lit) => literal_to_value(lit),
1698        Expr::Parameter(name) => params
1699            .get(name)
1700            .cloned()
1701            .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
1702        Expr::Column(col_ref) => row_values
1703            .get(&col_ref.column)
1704            .cloned()
1705            .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
1706        Expr::BinaryOp { left, op, right } => {
1707            let left = eval_assignment_expr(left, row_values, params)?;
1708            let right = eval_assignment_expr(right, row_values, params)?;
1709            eval_binary_op(op, &left, &right)
1710        }
1711        Expr::UnaryOp { op, operand } => match op {
1712            UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
1713                Value::Int64(value) => Ok(Value::Int64(-value)),
1714                Value::Float64(value) => Ok(Value::Float64(-value)),
1715                _ => Err(Error::Other(format!(
1716                    "unsupported expression in UPDATE SET: {:?}",
1717                    expr
1718                ))),
1719            },
1720            UnaryOp::Not => Err(Error::Other(format!(
1721                "unsupported expression in UPDATE SET: {:?}",
1722                expr
1723            ))),
1724        },
1725        Expr::FunctionCall { name, args } => {
1726            let evaluated = args
1727                .iter()
1728                .map(|arg| eval_assignment_expr(arg, row_values, params))
1729                .collect::<Result<Vec<_>>>()?;
1730            eval_function(name, &evaluated)
1731        }
1732        _ => Err(Error::Other(format!(
1733            "unsupported expression in UPDATE SET: {:?}",
1734            expr
1735        ))),
1736    }
1737}
1738
1739fn apply_on_conflict_updates(
1740    db: &Database,
1741    table: &str,
1742    mut insert_values: HashMap<String, Value>,
1743    existing_row: &VersionedRow,
1744    on_conflict: &OnConflictPlan,
1745    params: &HashMap<String, Value>,
1746) -> Result<HashMap<String, Value>> {
1747    if on_conflict.update_columns.is_empty() {
1748        return Ok(insert_values);
1749    }
1750
1751    let mut merged = existing_row.values.clone();
1752    for (column, expr) in &on_conflict.update_columns {
1753        let value = eval_assignment_expr(expr, &existing_row.values, params)?;
1754        merged.insert(
1755            column.clone(),
1756            coerce_value_for_column(db, table, column, value)?,
1757        );
1758    }
1759
1760    for (column, value) in insert_values.drain() {
1761        merged.entry(column).or_insert(value);
1762    }
1763
1764    Ok(merged)
1765}
1766
1767fn literal_to_value(lit: &Literal) -> Result<Value> {
1768    Ok(match lit {
1769        Literal::Null => Value::Null,
1770        Literal::Bool(v) => Value::Bool(*v),
1771        Literal::Integer(v) => Value::Int64(*v),
1772        Literal::Real(v) => Value::Float64(*v),
1773        Literal::Text(v) => Value::Text(v.clone()),
1774        Literal::Vector(v) => Value::Vector(v.clone()),
1775    })
1776}
1777
1778fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
1779    let [left, right] = args else {
1780        return Err(Error::PlanError(format!(
1781            "function {} expects 2 arguments",
1782            name
1783        )));
1784    };
1785
1786    match (left, right) {
1787        (Value::Int64(left), Value::Int64(right)) => match name {
1788            "__add" => Ok(Value::Int64(left + right)),
1789            "__sub" => Ok(Value::Int64(left - right)),
1790            "__mul" => Ok(Value::Int64(left * right)),
1791            "__div" => Ok(Value::Int64(left / right)),
1792            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1793        },
1794        (Value::Float64(left), Value::Float64(right)) => match name {
1795            "__add" => Ok(Value::Float64(left + right)),
1796            "__sub" => Ok(Value::Float64(left - right)),
1797            "__mul" => Ok(Value::Float64(left * right)),
1798            "__div" => Ok(Value::Float64(left / right)),
1799            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1800        },
1801        (Value::Int64(left), Value::Float64(right)) => match name {
1802            "__add" => Ok(Value::Float64(*left as f64 + right)),
1803            "__sub" => Ok(Value::Float64(*left as f64 - right)),
1804            "__mul" => Ok(Value::Float64(*left as f64 * right)),
1805            "__div" => Ok(Value::Float64(*left as f64 / right)),
1806            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1807        },
1808        (Value::Float64(left), Value::Int64(right)) => match name {
1809            "__add" => Ok(Value::Float64(left + *right as f64)),
1810            "__sub" => Ok(Value::Float64(left - *right as f64)),
1811            "__mul" => Ok(Value::Float64(left * *right as f64)),
1812            "__div" => Ok(Value::Float64(left / *right as f64)),
1813            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1814        },
1815        _ => Err(Error::PlanError(format!(
1816            "function {} expects numeric arguments",
1817            name
1818        ))),
1819    }
1820}
1821
1822fn eval_function_in_row_context(
1823    row: &VersionedRow,
1824    name: &str,
1825    args: &[Expr],
1826    params: &HashMap<String, Value>,
1827) -> Result<Value> {
1828    let values = args
1829        .iter()
1830        .map(|arg| eval_expr_value(row, arg, params))
1831        .collect::<Result<Vec<_>>>()?;
1832    eval_function(name, &values)
1833}
1834
1835fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
1836    match name.to_ascii_lowercase().as_str() {
1837        "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
1838        "coalesce" => Ok(args
1839            .iter()
1840            .find(|value| **value != Value::Null)
1841            .cloned()
1842            .unwrap_or(Value::Null)),
1843        "now" => Ok(Value::Timestamp(
1844            SystemTime::now()
1845                .duration_since(UNIX_EPOCH)
1846                .map_err(|err| Error::PlanError(err.to_string()))?
1847                .as_secs() as i64,
1848        )),
1849        _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1850    }
1851}
1852
1853fn like_matches(value: &str, pattern: &str) -> bool {
1854    let value_chars = value.chars().collect::<Vec<_>>();
1855    let pattern_chars = pattern.chars().collect::<Vec<_>>();
1856    let (mut vi, mut pi) = (0usize, 0usize);
1857    let (mut star_idx, mut match_idx) = (None, 0usize);
1858
1859    while vi < value_chars.len() {
1860        if pi < pattern_chars.len()
1861            && (pattern_chars[pi] == '_' || pattern_chars[pi] == value_chars[vi])
1862        {
1863            vi += 1;
1864            pi += 1;
1865        } else if pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1866            star_idx = Some(pi);
1867            match_idx = vi;
1868            pi += 1;
1869        } else if let Some(star) = star_idx {
1870            pi = star + 1;
1871            match_idx += 1;
1872            vi = match_idx;
1873        } else {
1874            return false;
1875        }
1876    }
1877
1878    while pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1879        pi += 1;
1880    }
1881
1882    pi == pattern_chars.len()
1883}
1884
1885fn resolve_in_subqueries(
1886    db: &Database,
1887    expr: &Expr,
1888    params: &HashMap<String, Value>,
1889    tx: Option<TxId>,
1890) -> Result<Expr> {
1891    resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
1892}
1893
1894pub(crate) fn resolve_in_subqueries_with_ctes(
1895    db: &Database,
1896    expr: &Expr,
1897    params: &HashMap<String, Value>,
1898    tx: Option<TxId>,
1899    ctes: &[Cte],
1900) -> Result<Expr> {
1901    match expr {
1902        Expr::InSubquery {
1903            expr,
1904            subquery,
1905            negated,
1906        } => {
1907            // Detect correlated subqueries: WHERE references to outer tables
1908            let mut subquery_tables: std::collections::HashSet<String> = subquery
1909                .from
1910                .iter()
1911                .filter_map(|item| match item {
1912                    contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
1913                    _ => None,
1914                })
1915                .collect();
1916            // CTE names are valid table references within the subquery
1917            for cte in ctes {
1918                match cte {
1919                    Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
1920                        subquery_tables.insert(name.clone());
1921                    }
1922                }
1923            }
1924            if let Some(where_clause) = &subquery.where_clause
1925                && has_outer_table_ref(where_clause, &subquery_tables)
1926            {
1927                return Err(Error::Other(
1928                    "correlated subqueries are not supported".to_string(),
1929                ));
1930            }
1931
1932            let query_plan = plan(&Statement::Select(SelectStatement {
1933                ctes: ctes.to_vec(),
1934                body: (**subquery).clone(),
1935            }))?;
1936            let result = execute_plan(db, &query_plan, params, tx)?;
1937            let select_expr = subquery
1938                .columns
1939                .first()
1940                .map(|column| column.expr.clone())
1941                .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
1942            let list = result
1943                .rows
1944                .iter()
1945                .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
1946                .collect::<Result<Vec<_>>>()?
1947                .into_iter()
1948                .map(value_to_literal)
1949                .collect::<Result<Vec<_>>>()?;
1950            Ok(Expr::InList {
1951                expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1952                list,
1953                negated: *negated,
1954            })
1955        }
1956        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1957            left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
1958            op: *op,
1959            right: Box::new(resolve_in_subqueries_with_ctes(
1960                db, right, params, tx, ctes,
1961            )?),
1962        }),
1963        Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
1964            op: *op,
1965            operand: Box::new(resolve_in_subqueries_with_ctes(
1966                db, operand, params, tx, ctes,
1967            )?),
1968        }),
1969        Expr::InList {
1970            expr,
1971            list,
1972            negated,
1973        } => Ok(Expr::InList {
1974            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1975            list: list
1976                .iter()
1977                .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
1978                .collect::<Result<Vec<_>>>()?,
1979            negated: *negated,
1980        }),
1981        Expr::Like {
1982            expr,
1983            pattern,
1984            negated,
1985        } => Ok(Expr::Like {
1986            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1987            pattern: Box::new(resolve_in_subqueries_with_ctes(
1988                db, pattern, params, tx, ctes,
1989            )?),
1990            negated: *negated,
1991        }),
1992        Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
1993            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1994            negated: *negated,
1995        }),
1996        Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
1997            name: name.clone(),
1998            args: args
1999                .iter()
2000                .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
2001                .collect::<Result<Vec<_>>>()?,
2002        }),
2003        _ => Ok(expr.clone()),
2004    }
2005}
2006
2007fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
2008    match expr {
2009        Expr::Column(ColumnRef {
2010            table: Some(table), ..
2011        }) => !subquery_tables.contains(table),
2012        Expr::BinaryOp { left, right, .. } => {
2013            has_outer_table_ref(left, subquery_tables)
2014                || has_outer_table_ref(right, subquery_tables)
2015        }
2016        Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
2017        Expr::InList { expr, list, .. } => {
2018            has_outer_table_ref(expr, subquery_tables)
2019                || list
2020                    .iter()
2021                    .any(|item| has_outer_table_ref(item, subquery_tables))
2022        }
2023        Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
2024        Expr::Like { expr, pattern, .. } => {
2025            has_outer_table_ref(expr, subquery_tables)
2026                || has_outer_table_ref(pattern, subquery_tables)
2027        }
2028        Expr::FunctionCall { args, .. } => args
2029            .iter()
2030            .any(|arg| has_outer_table_ref(arg, subquery_tables)),
2031        _ => false,
2032    }
2033}
2034
2035fn value_to_literal(value: Value) -> Result<Expr> {
2036    Ok(Expr::Literal(match value {
2037        Value::Null => Literal::Null,
2038        Value::Bool(v) => Literal::Bool(v),
2039        Value::Int64(v) => Literal::Integer(v),
2040        Value::Float64(v) => Literal::Real(v),
2041        Value::Text(v) => Literal::Text(v),
2042        Value::Uuid(v) => Literal::Text(v.to_string()),
2043        Value::Timestamp(v) => Literal::Integer(v),
2044        other => {
2045            return Err(Error::PlanError(format!(
2046                "unsupported subquery result value: {:?}",
2047                other
2048            )));
2049        }
2050    }))
2051}
2052
2053fn query_result_row_matches(
2054    row: &[Value],
2055    columns: &[String],
2056    expr: &Expr,
2057    params: &HashMap<String, Value>,
2058) -> Result<bool> {
2059    Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
2060}
2061
2062fn eval_query_result_bool_expr(
2063    row: &[Value],
2064    columns: &[String],
2065    expr: &Expr,
2066    params: &HashMap<String, Value>,
2067) -> Result<Option<bool>> {
2068    match expr {
2069        Expr::BinaryOp { left, op, right } => match op {
2070            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
2071                let left = eval_query_result_expr(left, row, columns, params)?;
2072                let right = eval_query_result_expr(right, row, columns, params)?;
2073                if left == Value::Null || right == Value::Null {
2074                    return Ok(None);
2075                }
2076
2077                let result = match op {
2078                    BinOp::Eq => {
2079                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
2080                    }
2081                    BinOp::Neq => {
2082                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
2083                    }
2084                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
2085                    BinOp::Lte => matches!(
2086                        compare_values(&left, &right),
2087                        Some(Ordering::Less | Ordering::Equal)
2088                    ),
2089                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
2090                    BinOp::Gte => matches!(
2091                        compare_values(&left, &right),
2092                        Some(Ordering::Greater | Ordering::Equal)
2093                    ),
2094                    BinOp::And | BinOp::Or => unreachable!(),
2095                };
2096                Ok(Some(result))
2097            }
2098            BinOp::And => {
2099                let left = eval_query_result_bool_expr(row, columns, left, params)?;
2100                if left == Some(false) {
2101                    return Ok(Some(false));
2102                }
2103                let right = eval_query_result_bool_expr(row, columns, right, params)?;
2104                Ok(match (left, right) {
2105                    (Some(true), Some(true)) => Some(true),
2106                    (Some(true), other) => other,
2107                    (None, Some(false)) => Some(false),
2108                    (None, Some(true)) | (None, None) => None,
2109                    (Some(false), _) => Some(false),
2110                })
2111            }
2112            BinOp::Or => {
2113                let left = eval_query_result_bool_expr(row, columns, left, params)?;
2114                if left == Some(true) {
2115                    return Ok(Some(true));
2116                }
2117                let right = eval_query_result_bool_expr(row, columns, right, params)?;
2118                Ok(match (left, right) {
2119                    (Some(false), Some(false)) => Some(false),
2120                    (Some(false), other) => other,
2121                    (None, Some(true)) => Some(true),
2122                    (None, Some(false)) | (None, None) => None,
2123                    (Some(true), _) => Some(true),
2124                })
2125            }
2126        },
2127        Expr::UnaryOp {
2128            op: UnaryOp::Not,
2129            operand,
2130        } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
2131        Expr::InList {
2132            expr,
2133            list,
2134            negated,
2135        } => {
2136            let needle = eval_query_result_expr(expr, row, columns, params)?;
2137            if needle == Value::Null {
2138                return Ok(None);
2139            }
2140
2141            let matched = list.iter().try_fold(false, |found, item| {
2142                if found {
2143                    Ok(true)
2144                } else {
2145                    let candidate = eval_query_result_expr(item, row, columns, params)?;
2146                    Ok(
2147                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2148                            || (candidate != Value::Null && needle == candidate),
2149                    )
2150                }
2151            })?;
2152            Ok(Some(if *negated { !matched } else { matched }))
2153        }
2154        Expr::InSubquery { .. } => Err(Error::PlanError(
2155            "IN (subquery) must be resolved before execution".to_string(),
2156        )),
2157        Expr::Like {
2158            expr,
2159            pattern,
2160            negated,
2161        } => {
2162            let left = eval_query_result_expr(expr, row, columns, params)?;
2163            let right = eval_query_result_expr(pattern, row, columns, params)?;
2164            let matched = match (left, right) {
2165                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2166                _ => false,
2167            };
2168            Ok(Some(if *negated { !matched } else { matched }))
2169        }
2170        Expr::IsNull { expr, negated } => {
2171            let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
2172            Ok(Some(if *negated { !is_null } else { is_null }))
2173        }
2174        Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
2175            Value::Bool(value) => Ok(Some(value)),
2176            Value::Null => Ok(None),
2177            _ => Err(Error::PlanError(format!(
2178                "unsupported WHERE expression: {:?}",
2179                expr
2180            ))),
2181        },
2182        _ => Err(Error::PlanError(format!(
2183            "unsupported WHERE expression: {:?}",
2184            expr
2185        ))),
2186    }
2187}
2188
2189fn lookup_query_result_column(
2190    row: &[Value],
2191    input_columns: &[String],
2192    column_ref: &ColumnRef,
2193) -> Result<Value> {
2194    if let Some(table) = &column_ref.table {
2195        let qualified = format!("{table}.{}", column_ref.column);
2196        // Prioritize qualified match (e.g., "e.id") over unqualified (e.g., "id")
2197        // to avoid picking the wrong table's column in JOINs.
2198        let idx = input_columns
2199            .iter()
2200            .position(|name| name == &qualified)
2201            .or_else(|| {
2202                input_columns
2203                    .iter()
2204                    .position(|name| name == &column_ref.column)
2205            })
2206            .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
2207        return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
2208    }
2209
2210    let matches = input_columns
2211        .iter()
2212        .enumerate()
2213        .filter_map(|(idx, name)| {
2214            (name == &column_ref.column
2215                || name.rsplit('.').next() == Some(column_ref.column.as_str()))
2216            .then_some(idx)
2217        })
2218        .collect::<Vec<_>>();
2219
2220    match matches.as_slice() {
2221        [] => Err(Error::PlanError(format!(
2222            "project column not found: {}",
2223            column_ref.column
2224        ))),
2225        [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
2226        _ => Err(Error::PlanError(format!(
2227            "ambiguous column reference: {}",
2228            column_ref.column
2229        ))),
2230    }
2231}
2232
2233fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
2234    let mut combined = Vec::with_capacity(left.len() + right.len());
2235    combined.extend_from_slice(left);
2236    combined.extend_from_slice(right);
2237    combined
2238}
2239
2240fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
2241    let left_names = left
2242        .iter()
2243        .map(|column| column.rsplit('.').next().unwrap_or(column.as_str()))
2244        .collect::<BTreeSet<_>>();
2245    right
2246        .iter()
2247        .filter_map(|column| {
2248            let bare = column.rsplit('.').next().unwrap_or(column.as_str());
2249            left_names.contains(bare).then(|| bare.to_string())
2250        })
2251        .collect()
2252}
2253
2254fn qualify_join_columns(
2255    columns: &[String],
2256    left_columns: &[String],
2257    right_columns: &[String],
2258    left_alias: &Option<String>,
2259    right_prefix: &str,
2260) -> Vec<String> {
2261    let left_prefix = left_alias.as_deref();
2262    columns
2263        .iter()
2264        .enumerate()
2265        .map(|(idx, column)| {
2266            if idx < left_columns.len() {
2267                if let Some(prefix) = left_prefix {
2268                    format!(
2269                        "{prefix}.{}",
2270                        left_columns[idx].rsplit('.').next().unwrap_or(column)
2271                    )
2272                } else {
2273                    left_columns[idx].clone()
2274                }
2275            } else {
2276                let right_idx = idx - left_columns.len();
2277                let bare = right_columns[right_idx]
2278                    .rsplit('.')
2279                    .next()
2280                    .unwrap_or(right_columns[right_idx].as_str());
2281                if column == bare {
2282                    format!("{right_prefix}.{bare}")
2283                } else {
2284                    column.clone()
2285                }
2286            }
2287        })
2288        .collect()
2289}
2290
2291fn right_table_name(plan: &PhysicalPlan) -> String {
2292    match plan {
2293        PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
2294        _ => "right".to_string(),
2295    }
2296}
2297
2298fn distinct_row_key(row: &[Value]) -> Vec<u8> {
2299    bincode::serde::encode_to_vec(row, bincode::config::standard())
2300        .expect("query rows should serialize for DISTINCT")
2301}
2302
2303fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
2304    match resolve_expr(expr, params)? {
2305        Value::Uuid(u) => Ok(u),
2306        Value::Text(t) => uuid::Uuid::parse_str(&t)
2307            .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
2308        _ => Err(Error::PlanError(
2309            "graph start node must be UUID".to_string(),
2310        )),
2311    }
2312}
2313
2314/// Resolve start nodes for a graph BFS from a WHERE filter like `a.name = 'entity-0'`.
2315/// Scans all relational tables for rows matching the filter condition.
2316fn resolve_graph_start_nodes_from_filter(
2317    db: &Database,
2318    filter: &Expr,
2319    params: &HashMap<String, Value>,
2320) -> Result<Vec<uuid::Uuid>> {
2321    if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
2322        return Ok(ids);
2323    }
2324
2325    // Extract column name and expected value from the filter (e.g., a.name = 'entity-0')
2326    let (col_name, expected_value) = match filter {
2327        Expr::BinaryOp {
2328            left,
2329            op: BinOp::Eq,
2330            right,
2331        } => {
2332            if let Some(col) = extract_column_name(left) {
2333                (col, resolve_expr(right, params)?)
2334            } else if let Some(col) = extract_column_name(right) {
2335                (col, resolve_expr(left, params)?)
2336            } else {
2337                return Ok(vec![]);
2338            }
2339        }
2340        _ => return Ok(vec![]),
2341    };
2342
2343    let snapshot = db.snapshot();
2344    let mut uuids = Vec::new();
2345    for table_name in db.table_names() {
2346        let meta = match db.table_meta(&table_name) {
2347            Some(m) => m,
2348            None => continue,
2349        };
2350        // Only scan tables that have the referenced column and an id column
2351        let has_col = meta.columns.iter().any(|c| c.name == col_name);
2352        let has_id = meta.columns.iter().any(|c| c.name == "id");
2353        if !has_col || !has_id {
2354            continue;
2355        }
2356        let rows = db.scan_filter(&table_name, snapshot, &|row| {
2357            row.values.get(&col_name) == Some(&expected_value)
2358        })?;
2359        for row in rows {
2360            if let Some(Value::Uuid(id)) = row.values.get("id") {
2361                uuids.push(*id);
2362            }
2363        }
2364    }
2365    Ok(uuids)
2366}
2367
2368fn resolve_graph_start_nodes_from_plan(
2369    db: &Database,
2370    plan: &PhysicalPlan,
2371    params: &HashMap<String, Value>,
2372    tx: Option<TxId>,
2373) -> Result<Vec<uuid::Uuid>> {
2374    let result = execute_plan(db, plan, params, tx)?;
2375    result
2376        .rows
2377        .into_iter()
2378        .filter_map(|row| row.into_iter().next())
2379        .map(|value| match value {
2380            Value::Uuid(id) => Ok(id),
2381            Value::Text(text) => uuid::Uuid::parse_str(&text)
2382                .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
2383            other => Err(Error::PlanError(format!(
2384                "invalid graph start identifier from plan: {other:?}"
2385            ))),
2386        })
2387        .collect()
2388}
2389
2390fn resolve_graph_start_ids_from_filter(
2391    filter: &Expr,
2392    params: &HashMap<String, Value>,
2393) -> Result<Option<Vec<uuid::Uuid>>> {
2394    match filter {
2395        Expr::BinaryOp {
2396            left,
2397            op: BinOp::Eq,
2398            right,
2399        } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
2400            let value = if is_graph_id_ref(left) {
2401                resolve_expr(right, params)?
2402            } else {
2403                resolve_expr(left, params)?
2404            };
2405            let id = match value {
2406                Value::Uuid(id) => id,
2407                Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2408                    Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2409                })?,
2410                other => {
2411                    return Err(Error::PlanError(format!(
2412                        "invalid graph start identifier in filter: {other:?}"
2413                    )));
2414                }
2415            };
2416            Ok(Some(vec![id]))
2417        }
2418        Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
2419            let ids = list
2420                .iter()
2421                .map(|item| resolve_expr(item, params))
2422                .map(|value| match value? {
2423                    Value::Uuid(id) => Ok(id),
2424                    Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2425                        Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2426                    }),
2427                    other => Err(Error::PlanError(format!(
2428                        "invalid graph start identifier in filter: {other:?}"
2429                    ))),
2430                })
2431                .collect::<Result<Vec<_>>>()?;
2432            Ok(Some(ids))
2433        }
2434        Expr::BinaryOp { left, right, .. } => {
2435            if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
2436                return Ok(Some(ids));
2437            }
2438            resolve_graph_start_ids_from_filter(right, params)
2439        }
2440        Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
2441        _ => Ok(None),
2442    }
2443}
2444
2445fn is_graph_id_ref(expr: &Expr) -> bool {
2446    matches!(
2447        expr,
2448        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
2449    )
2450}
2451
2452/// Extract a bare column name from an Expr::Column, ignoring table alias.
2453fn extract_column_name(expr: &Expr) -> Option<String> {
2454    match expr {
2455        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
2456        _ => None,
2457    }
2458}
2459
2460fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
2461    match resolve_expr(expr, params)? {
2462        Value::Vector(v) => Ok(v),
2463        Value::Text(name) => match params.get(&name) {
2464            Some(Value::Vector(v)) => Ok(v.clone()),
2465            _ => Err(Error::PlanError("vector parameter missing".to_string())),
2466        },
2467        _ => Err(Error::PlanError(
2468            "invalid vector query expression".to_string(),
2469        )),
2470    }
2471}
2472
2473fn validate_vector_columns(
2474    db: &Database,
2475    table: &str,
2476    values: &HashMap<String, Value>,
2477) -> Result<()> {
2478    let Some(meta) = db.table_meta(table) else {
2479        return Ok(());
2480    };
2481
2482    for column in &meta.columns {
2483        if let contextdb_core::ColumnType::Vector(expected) = column.column_type
2484            && let Some(Value::Vector(vector)) = values.get(&column.name)
2485        {
2486            let got = vector.len();
2487            if got != expected {
2488                return Err(Error::VectorDimensionMismatch { expected, got });
2489            }
2490        }
2491    }
2492
2493    Ok(())
2494}
2495
2496fn vector_value_for_table<'a>(
2497    db: &Database,
2498    table: &str,
2499    values: &'a HashMap<String, Value>,
2500) -> Option<&'a Vec<f32>> {
2501    let meta = db.table_meta(table)?;
2502    meta.columns
2503        .iter()
2504        .find_map(|column| match column.column_type {
2505            contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
2506                Some(Value::Vector(vector)) => Some(vector),
2507                _ => None,
2508            },
2509            _ => None,
2510        })
2511}
2512
2513fn coerce_value_for_column(db: &Database, table: &str, col: &str, v: Value) -> Result<Value> {
2514    let Some(meta) = db.table_meta(table) else {
2515        return Ok(coerce_uuid_if_needed(col, v));
2516    };
2517    let Some(column) = meta.columns.iter().find(|c| c.name == col) else {
2518        return Ok(coerce_uuid_if_needed(col, v));
2519    };
2520
2521    match column.column_type {
2522        contextdb_core::ColumnType::Uuid => coerce_uuid_value(v),
2523        contextdb_core::ColumnType::Timestamp => coerce_timestamp_value(v),
2524        contextdb_core::ColumnType::Vector(dim) => coerce_vector_value(v, dim),
2525        _ => Ok(coerce_uuid_if_needed(col, v)),
2526    }
2527}
2528
2529fn coerce_uuid_value(v: Value) -> Result<Value> {
2530    match v {
2531        Value::Null => Ok(Value::Null),
2532        Value::Uuid(id) => Ok(Value::Uuid(id)),
2533        Value::Text(text) => uuid::Uuid::parse_str(&text)
2534            .map(Value::Uuid)
2535            .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
2536        other => Err(Error::Other(format!(
2537            "UUID column requires UUID or text literal, got {other:?}"
2538        ))),
2539    }
2540}
2541
2542fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
2543    if (col == "id" || col.ends_with("_id"))
2544        && let Value::Text(s) = &v
2545        && let Ok(u) = uuid::Uuid::parse_str(s)
2546    {
2547        return Value::Uuid(u);
2548    }
2549    v
2550}
2551
2552fn coerce_timestamp_value(v: Value) -> Result<Value> {
2553    match v {
2554        Value::Null => Ok(Value::Null),
2555        Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
2556            Ok(Value::Timestamp(i64::MAX))
2557        }
2558        Value::Text(text) => {
2559            let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
2560                Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
2561            })?;
2562            Ok(Value::Timestamp(
2563                parsed.unix_timestamp_nanos() as i64 / 1_000_000,
2564            ))
2565        }
2566        other => Ok(other),
2567    }
2568}
2569
2570fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
2571    let vector = match v {
2572        Value::Null => return Ok(Value::Null),
2573        Value::Vector(vector) => vector,
2574        Value::Text(text) => parse_text_vector_literal(&text)?,
2575        other => return Ok(other),
2576    };
2577
2578    if vector.len() != expected_dim {
2579        return Err(Error::VectorDimensionMismatch {
2580            expected: expected_dim,
2581            got: vector.len(),
2582        });
2583    }
2584
2585    Ok(Value::Vector(vector))
2586}
2587
2588fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
2589    let trimmed = text.trim();
2590    let inner = trimmed
2591        .strip_prefix('[')
2592        .and_then(|s| s.strip_suffix(']'))
2593        .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
2594
2595    if inner.trim().is_empty() {
2596        return Ok(Vec::new());
2597    }
2598
2599    inner
2600        .split(',')
2601        .map(|part| {
2602            part.trim().parse::<f32>().map_err(|err| {
2603                Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
2604            })
2605        })
2606        .collect()
2607}
2608
2609fn apply_missing_column_defaults(
2610    db: &Database,
2611    table: &str,
2612    values: &mut HashMap<String, Value>,
2613) -> Result<()> {
2614    let Some(meta) = db.table_meta(table) else {
2615        return Ok(());
2616    };
2617
2618    for column in &meta.columns {
2619        if values.contains_key(&column.name) {
2620            continue;
2621        }
2622        let Some(default) = &column.default else {
2623            continue;
2624        };
2625        let value = evaluate_stored_default_expr(default)?;
2626        values.insert(
2627            column.name.clone(),
2628            coerce_value_for_column(db, table, &column.name, value)?,
2629        );
2630    }
2631
2632    Ok(())
2633}
2634
2635fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
2636    if default.eq_ignore_ascii_case("NOW()") {
2637        return eval_function("now", &[]);
2638    }
2639    if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
2640        return eval_function("now", &[]);
2641    }
2642    if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
2643        return Ok(Value::Null);
2644    }
2645    if default.eq_ignore_ascii_case("TRUE") {
2646        return Ok(Value::Bool(true));
2647    }
2648    if default.eq_ignore_ascii_case("FALSE") {
2649        return Ok(Value::Bool(false));
2650    }
2651    if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
2652        return Ok(Value::Text(
2653            default[1..default.len() - 1].replace("''", "'"),
2654        ));
2655    }
2656    if let Some(text) = default
2657        .strip_prefix("Literal(Text(\"")
2658        .and_then(|value| value.strip_suffix("\"))"))
2659    {
2660        return Ok(Value::Text(text.to_string()));
2661    }
2662    if let Some(value) = default
2663        .strip_prefix("Literal(Integer(")
2664        .and_then(|value| value.strip_suffix("))"))
2665    {
2666        let parsed = value.parse::<i64>().map_err(|err| {
2667            Error::Other(format!("invalid stored integer default '{value}': {err}"))
2668        })?;
2669        return Ok(Value::Int64(parsed));
2670    }
2671    if let Some(value) = default
2672        .strip_prefix("Literal(Real(")
2673        .and_then(|value| value.strip_suffix("))"))
2674    {
2675        let parsed = value
2676            .parse::<f64>()
2677            .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
2678        return Ok(Value::Float64(parsed));
2679    }
2680    if let Some(value) = default
2681        .strip_prefix("Literal(Bool(")
2682        .and_then(|value| value.strip_suffix("))"))
2683    {
2684        let parsed = value
2685            .parse::<bool>()
2686            .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
2687        return Ok(Value::Bool(parsed));
2688    }
2689
2690    Err(Error::Other(format!(
2691        "unsupported stored DEFAULT expression: {default}"
2692    )))
2693}
2694
2695pub(crate) fn stored_default_expr(expr: &Expr) -> String {
2696    match expr {
2697        Expr::Literal(Literal::Null) => "NULL".to_string(),
2698        Expr::Literal(Literal::Bool(value)) => {
2699            if *value {
2700                "TRUE".to_string()
2701            } else {
2702                "FALSE".to_string()
2703            }
2704        }
2705        Expr::Literal(Literal::Integer(value)) => value.to_string(),
2706        Expr::Literal(Literal::Real(value)) => value.to_string(),
2707        Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
2708        Expr::FunctionCall { name, args }
2709            if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
2710        {
2711            "NOW()".to_string()
2712        }
2713        _ => format!("{expr:?}"),
2714    }
2715}
2716
2717fn should_route_insert_to_graph(db: &Database, table: &str) -> bool {
2718    table.eq_ignore_ascii_case("edges")
2719        || db
2720            .table_meta(table)
2721            .is_some_and(|table_meta| !table_meta.dag_edge_types.is_empty())
2722}
2723
2724fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
2725    if col.expires && !matches!(col.data_type, DataType::Timestamp) {
2726        return Err(Error::Other(
2727            "EXPIRES is only valid on TIMESTAMP columns".to_string(),
2728        ));
2729    }
2730    Ok(())
2731}
2732
2733fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
2734    let mut expires_column = None;
2735    for col in columns {
2736        validate_expires_column(col)?;
2737        if col.expires {
2738            if expires_column.is_some() {
2739                return Err(Error::Other(
2740                    "only one EXPIRES column is supported per table".to_string(),
2741                ));
2742            }
2743            expires_column = Some(col.name.clone());
2744        }
2745    }
2746    Ok(expires_column)
2747}
2748
2749pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
2750    match dtype {
2751        DataType::Uuid => contextdb_core::ColumnType::Uuid,
2752        DataType::Text => contextdb_core::ColumnType::Text,
2753        DataType::Integer => contextdb_core::ColumnType::Integer,
2754        DataType::Real => contextdb_core::ColumnType::Real,
2755        DataType::Boolean => contextdb_core::ColumnType::Boolean,
2756        DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
2757        DataType::Json => contextdb_core::ColumnType::Json,
2758        DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
2759    }
2760}
2761
2762fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
2763    match s {
2764        "latest_wins" => Ok(ConflictPolicy::LatestWins),
2765        "server_wins" => Ok(ConflictPolicy::ServerWins),
2766        "edge_wins" => Ok(ConflictPolicy::EdgeWins),
2767        "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
2768        _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
2769    }
2770}
2771
2772fn conflict_policy_to_string(p: ConflictPolicy) -> String {
2773    match p {
2774        ConflictPolicy::LatestWins => "latest_wins".to_string(),
2775        ConflictPolicy::ServerWins => "server_wins".to_string(),
2776        ConflictPolicy::EdgeWins => "edge_wins".to_string(),
2777        ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
2778    }
2779}
2780
2781fn project_graph_frontier_rows(
2782    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2783    start_alias: &str,
2784    steps: &[GraphStepPlan],
2785) -> Result<Vec<Vec<Value>>> {
2786    frontier
2787        .into_iter()
2788        .map(|(bindings, id, depth)| {
2789            let mut row = Vec::with_capacity(steps.len() + 3);
2790            let start_id = bindings.get(start_alias).ok_or_else(|| {
2791                Error::PlanError(format!(
2792                    "graph frontier missing required start alias binding '{start_alias}'"
2793                ))
2794            })?;
2795            row.push(Value::Uuid(*start_id));
2796            for step in steps {
2797                let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
2798                    Error::PlanError(format!(
2799                        "graph frontier missing required target alias binding '{}'",
2800                        step.target_alias
2801                    ))
2802                })?;
2803                row.push(Value::Uuid(*target_id));
2804            }
2805            row.push(Value::Uuid(id));
2806            row.push(Value::Int64(depth as i64));
2807            Ok(row)
2808        })
2809        .collect()
2810}
2811
2812#[cfg(test)]
2813mod tests {
2814    use super::*;
2815    use contextdb_planner::GraphStepPlan;
2816    use uuid::Uuid;
2817
2818    #[test]
2819    fn graph_01_frontier_projection_requires_complete_bindings() {
2820        let steps = vec![GraphStepPlan {
2821            edge_types: vec!["EDGE".to_string()],
2822            direction: Direction::Outgoing,
2823            min_depth: 1,
2824            max_depth: 1,
2825            target_alias: "b".to_string(),
2826        }];
2827
2828        let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];
2829        let missing_target = vec![(
2830            HashMap::from([("a".to_string(), Uuid::new_v4())]),
2831            Uuid::new_v4(),
2832            0,
2833        )];
2834
2835        let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
2836        assert!(
2837            matches!(start_result, Err(Error::PlanError(_))),
2838            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
2839        );
2840
2841        let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
2842        assert!(
2843            matches!(target_result, Err(Error::PlanError(_))),
2844            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
2845        );
2846    }
2847}