Skip to main content

contextdb_engine/
executor.rs

1use crate::database::{Database, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5    AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6    SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9    DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19    db: &Database,
20    plan: &PhysicalPlan,
21    params: &HashMap<String, Value>,
22    tx: Option<TxId>,
23) -> Result<QueryResult> {
24    match plan {
25        PhysicalPlan::CreateTable(p) => {
26            db.check_disk_budget("CREATE TABLE")?;
27            let expires_column = expires_column_name(&p.columns)?;
28            let meta = TableMeta {
29                columns: p
30                    .columns
31                    .iter()
32                    .map(|c| contextdb_core::ColumnDef {
33                        name: c.name.clone(),
34                        column_type: map_column_type(&c.data_type),
35                        nullable: c.nullable,
36                        primary_key: c.primary_key,
37                        unique: c.unique,
38                        default: c.default.as_ref().map(stored_default_expr),
39                        expires: c.expires,
40                    })
41                    .collect(),
42                immutable: p.immutable,
43                state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
44                    column: sm.column.clone(),
45                    transitions: sm
46                        .transitions
47                        .iter()
48                        .map(|(from, tos)| (from.clone(), tos.clone()))
49                        .collect(),
50                }),
51                dag_edge_types: p.dag_edge_types.clone(),
52                natural_key_column: None,
53                propagation_rules: p.propagation_rules.clone(),
54                default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
55                sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
56                expires_column,
57            };
58            let metadata_bytes = meta.estimated_bytes();
59            db.accountant().try_allocate_for(
60                metadata_bytes,
61                "ddl",
62                "create_table",
63                "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
64            )?;
65            db.relational_store().create_table(&p.name, meta);
66            if let Some(table_meta) = db.table_meta(&p.name) {
67                db.persist_table_meta(&p.name, &table_meta)?;
68                db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn));
69            }
70            Ok(QueryResult::empty_with_affected(0))
71        }
72        PhysicalPlan::DropTable(name) => {
73            let bytes_to_release = estimate_drop_table_bytes(db, name);
74            db.drop_table_aux_state(name);
75            db.relational_store().drop_table(name);
76            db.remove_persisted_table(name)?;
77            db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn));
78            db.accountant().release(bytes_to_release);
79            Ok(QueryResult::empty_with_affected(0))
80        }
81        PhysicalPlan::AlterTable(p) => {
82            db.check_disk_budget("ALTER TABLE")?;
83            let store = db.relational_store();
84            match &p.action {
85                AlterAction::AddColumn(col) => {
86                    if col.primary_key {
87                        return Err(Error::Other(
88                            "adding a primary key column via ALTER TABLE is not supported"
89                                .to_string(),
90                        ));
91                    }
92                    validate_expires_column(col)?;
93                    let core_col = contextdb_core::ColumnDef {
94                        name: col.name.clone(),
95                        column_type: map_column_type(&col.data_type),
96                        nullable: col.nullable,
97                        primary_key: col.primary_key,
98                        unique: col.unique,
99                        default: col.default.as_ref().map(stored_default_expr),
100                        expires: col.expires,
101                    };
102                    store
103                        .alter_table_add_column(&p.table, core_col)
104                        .map_err(Error::Other)?;
105                    if col.expires {
106                        let mut meta = store.table_meta.write();
107                        let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
108                            Error::Other(format!("table '{}' not found", p.table))
109                        })?;
110                        table_meta.expires_column = Some(col.name.clone());
111                    }
112                }
113                AlterAction::DropColumn(name) => {
114                    store
115                        .alter_table_drop_column(&p.table, name)
116                        .map_err(Error::Other)?;
117                    let mut meta = store.table_meta.write();
118                    if let Some(table_meta) = meta.get_mut(&p.table)
119                        && table_meta.expires_column.as_deref() == Some(name.as_str())
120                    {
121                        table_meta.expires_column = None;
122                    }
123                }
124                AlterAction::RenameColumn { from, to } => {
125                    store
126                        .alter_table_rename_column(&p.table, from, to)
127                        .map_err(Error::Other)?;
128                    let mut meta = store.table_meta.write();
129                    if let Some(table_meta) = meta.get_mut(&p.table)
130                        && table_meta.expires_column.as_deref() == Some(from.as_str())
131                    {
132                        table_meta.expires_column = Some(to.clone());
133                    }
134                }
135                AlterAction::SetRetain {
136                    duration_seconds,
137                    sync_safe,
138                } => {
139                    let mut meta = store.table_meta.write();
140                    let table_meta = meta
141                        .get_mut(&p.table)
142                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
143                    if table_meta.immutable {
144                        return Err(Error::Other(
145                            "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
146                        ));
147                    }
148                    table_meta.default_ttl_seconds = Some(*duration_seconds);
149                    table_meta.sync_safe = *sync_safe;
150                }
151                AlterAction::DropRetain => {
152                    let mut meta = store.table_meta.write();
153                    let table_meta = meta
154                        .get_mut(&p.table)
155                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
156                    table_meta.default_ttl_seconds = None;
157                    table_meta.sync_safe = false;
158                }
159                AlterAction::SetSyncConflictPolicy(policy) => {
160                    let cp = parse_conflict_policy(policy)?;
161                    db.set_table_conflict_policy(&p.table, cp);
162                }
163                AlterAction::DropSyncConflictPolicy => {
164                    db.drop_table_conflict_policy(&p.table);
165                }
166            }
167            if let Some(table_meta) = db.table_meta(&p.table) {
168                db.persist_table_meta(&p.table, &table_meta)?;
169                if !matches!(
170                    p.action,
171                    AlterAction::AddColumn(_)
172                        | AlterAction::SetRetain { .. }
173                        | AlterAction::DropRetain
174                        | AlterAction::SetSyncConflictPolicy(_)
175                        | AlterAction::DropSyncConflictPolicy
176                ) {
177                    db.persist_table_rows(&p.table)?;
178                }
179                db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn));
180            }
181            Ok(QueryResult::empty_with_affected(0))
182        }
183        PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
184        PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
185        PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
186        PhysicalPlan::Scan { table, filter, .. } => {
187            if table == "dual" {
188                return Ok(QueryResult {
189                    columns: vec![],
190                    rows: vec![vec![]],
191                    rows_affected: 0,
192                });
193            }
194            let snapshot = db.snapshot();
195            let rows = db.scan(table, snapshot)?;
196            let schema_columns = db.table_meta(table).map(|meta| {
197                meta.columns
198                    .into_iter()
199                    .map(|column| column.name)
200                    .collect::<Vec<_>>()
201            });
202            let resolved_filter = filter
203                .as_ref()
204                .map(|expr| resolve_in_subqueries(db, expr, params, tx))
205                .transpose()?;
206            materialize_rows(
207                rows,
208                resolved_filter.as_ref(),
209                params,
210                schema_columns.as_deref(),
211            )
212        }
213        PhysicalPlan::GraphBfs {
214            start_alias,
215            start_expr,
216            start_candidates,
217            steps,
218            filter,
219        } => {
220            let start_uuids = match resolve_uuid(start_expr, params) {
221                Ok(start) => vec![start],
222                Err(Error::PlanError(_))
223                    if matches!(
224                        start_expr,
225                        Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
226                    ) =>
227                {
228                    // Start node not directly specified — check if a subquery or filter can help
229                    if let Some(candidate_plan) = start_candidates {
230                        resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
231                    } else if let Some(filter_expr) = filter {
232                        let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
233                        resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
234                    } else {
235                        vec![]
236                    }
237                }
238                Err(err) => return Err(err),
239            };
240            if start_uuids.is_empty() {
241                return Ok(QueryResult {
242                    columns: vec!["id".to_string(), "depth".to_string()],
243                    rows: vec![],
244                    rows_affected: 0,
245                });
246            }
247            let snapshot = db.snapshot();
248            let mut frontier = start_uuids
249                .into_iter()
250                .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
251                .collect::<Vec<_>>();
252            let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
253            db.accountant().try_allocate_for(
254                bfs_bytes,
255                "bfs_frontier",
256                "graph_bfs",
257                "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
258            )?;
259
260            let result = (|| {
261                for step in steps {
262                    let edge_types_ref = if step.edge_types.is_empty() {
263                        None
264                    } else {
265                        Some(step.edge_types.as_slice())
266                    };
267                    let mut next = Vec::new();
268
269                    for (bindings, start, base_depth) in &frontier {
270                        let res = db.graph().bfs(
271                            *start,
272                            edge_types_ref,
273                            step.direction,
274                            step.min_depth,
275                            step.max_depth,
276                            snapshot,
277                        )?;
278                        for node in res.nodes {
279                            let total_depth = base_depth.saturating_add(node.depth);
280                            let mut next_bindings = bindings.clone();
281                            next_bindings.insert(step.target_alias.clone(), node.id);
282                            next.push((next_bindings, node.id, total_depth));
283                        }
284                    }
285
286                    frontier = dedupe_graph_frontier(next, steps);
287                    if frontier.is_empty() {
288                        break;
289                    }
290                }
291
292                let mut columns =
293                    steps
294                        .iter()
295                        .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
296                            cols.push(format!("{}.id", step.target_alias));
297                            cols
298                        });
299                columns.push("id".to_string());
300                columns.push("depth".to_string());
301
302                Ok(QueryResult {
303                    columns,
304                    rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
305                    rows_affected: 0,
306                })
307            })();
308            db.accountant().release(bfs_bytes);
309
310            result
311        }
312        PhysicalPlan::VectorSearch {
313            table,
314            query_expr,
315            k,
316            candidates,
317            ..
318        }
319        | PhysicalPlan::HnswSearch {
320            table,
321            query_expr,
322            k,
323            candidates,
324            ..
325        } => {
326            let query_vec = resolve_vector_from_expr(query_expr, params)?;
327            let snapshot = db.snapshot();
328            let all_rows = db.scan(table, snapshot)?;
329            let candidate_bitmap = if let Some(cands_plan) = candidates {
330                let qr = execute_plan(db, cands_plan, params, tx)?;
331                let mut bm = RoaringTreemap::new();
332                let row_id_idx = qr.columns.iter().position(|column| {
333                    column == "row_id" || column.rsplit('.').next() == Some("row_id")
334                });
335                let id_idx = qr
336                    .columns
337                    .iter()
338                    .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
339
340                if let Some(idx) = row_id_idx {
341                    for row in qr.rows {
342                        if let Some(Value::Int64(id)) = row.get(idx) {
343                            bm.insert(*id as u64);
344                        }
345                    }
346                } else if let Some(idx) = id_idx {
347                    let uuid_to_row_id: HashMap<uuid::Uuid, u64> = all_rows
348                        .iter()
349                        .filter_map(|row| match row.values.get("id") {
350                            Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
351                            _ => None,
352                        })
353                        .collect();
354                    for row in qr.rows {
355                        if let Some(Value::Uuid(uuid)) = row.get(idx)
356                            && let Some(row_id) = uuid_to_row_id.get(uuid)
357                        {
358                            bm.insert(*row_id);
359                        }
360                    }
361                }
362                Some(bm)
363            } else {
364                None
365            };
366
367            let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
368            db.accountant().try_allocate_for(
369                vector_bytes,
370                "vector_search",
371                "search",
372                "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
373            )?;
374            let res = db.query_vector(
375                &query_vec,
376                *k as usize,
377                candidate_bitmap.as_ref(),
378                db.snapshot(),
379            );
380            db.accountant().release(vector_bytes);
381            let res = res?;
382
383            // Re-materialize: look up actual rows by row_id so SELECT * returns user columns
384            let schema_columns = db.table_meta(table).map(|meta| {
385                meta.columns
386                    .into_iter()
387                    .map(|column| column.name)
388                    .collect::<Vec<_>>()
389            });
390            let keys = if let Some(ref sc) = schema_columns {
391                sc.clone()
392            } else {
393                let mut ks = BTreeSet::new();
394                for r in &all_rows {
395                    for k in r.values.keys() {
396                        ks.insert(k.clone());
397                    }
398                }
399                ks.into_iter().collect::<Vec<_>>()
400            };
401
402            let row_map: HashMap<u64, &VersionedRow> =
403                all_rows.iter().map(|r| (r.row_id, r)).collect();
404
405            let mut columns = vec!["row_id".to_string()];
406            columns.extend(keys.iter().cloned());
407            columns.push("score".to_string());
408
409            let rows = res
410                .into_iter()
411                .filter_map(|(rid, score)| {
412                    row_map.get(&rid).map(|row| {
413                        let mut out = vec![Value::Int64(rid as i64)];
414                        for k in &keys {
415                            out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
416                        }
417                        out.push(Value::Float64(score as f64));
418                        out
419                    })
420                })
421                .collect();
422
423            Ok(QueryResult {
424                columns,
425                rows,
426                rows_affected: 0,
427            })
428        }
429        PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
430        PhysicalPlan::Project { input, columns } => {
431            let input_result = execute_plan(db, input, params, tx)?;
432            let has_aggregate = columns.iter().any(|column| {
433                matches!(
434                    &column.expr,
435                    Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
436                )
437            });
438            if has_aggregate {
439                if columns.iter().any(|column| {
440                    !matches!(
441                        &column.expr,
442                        Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
443                    )
444                }) {
445                    return Err(Error::PlanError(
446                        "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
447                    ));
448                }
449
450                let output_columns = columns
451                    .iter()
452                    .map(|column| {
453                        column.alias.clone().unwrap_or_else(|| match &column.expr {
454                            Expr::FunctionCall { name, .. } => name.clone(),
455                            _ => "expr".to_string(),
456                        })
457                    })
458                    .collect::<Vec<_>>();
459
460                let aggregate_row = columns
461                    .iter()
462                    .map(|column| match &column.expr {
463                        Expr::FunctionCall { name: _, args } => {
464                            let count = if matches!(
465                                args.as_slice(),
466                                [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
467                                if column == "*"
468                            ) {
469                                input_result.rows.len() as i64
470                            } else {
471                                input_result
472                                    .rows
473                                    .iter()
474                                    .filter_map(|row| {
475                                        args.first().map(|arg| {
476                                            eval_query_result_expr(
477                                                arg,
478                                                row,
479                                                &input_result.columns,
480                                                params,
481                                            )
482                                        })
483                                    })
484                                    .collect::<Result<Vec<_>>>()?
485                                    .into_iter()
486                                    .filter(|value| *value != Value::Null)
487                                    .count() as i64
488                            };
489                            Ok(Value::Int64(count))
490                        }
491                        _ => Err(Error::PlanError(
492                            "mixed aggregate and non-aggregate columns without GROUP BY"
493                                .to_string(),
494                        )),
495                    })
496                    .collect::<Result<Vec<_>>>()?;
497
498                return Ok(QueryResult {
499                    columns: output_columns,
500                    rows: vec![aggregate_row],
501                    rows_affected: 0,
502                });
503            }
504
505            let output_columns = columns
506                .iter()
507                .map(|c| {
508                    c.alias.clone().unwrap_or_else(|| match &c.expr {
509                        Expr::Column(col) => col.column.clone(),
510                        _ => "expr".to_string(),
511                    })
512                })
513                .collect::<Vec<_>>();
514
515            let mut output_rows = Vec::with_capacity(input_result.rows.len());
516            for row in &input_result.rows {
517                let mut projected = Vec::with_capacity(columns.len());
518                for col in columns {
519                    projected.push(eval_project_expr(
520                        &col.expr,
521                        row,
522                        &input_result.columns,
523                        params,
524                    )?);
525                }
526                output_rows.push(projected);
527            }
528
529            Ok(QueryResult {
530                columns: output_columns,
531                rows: output_rows,
532                rows_affected: 0,
533            })
534        }
535        PhysicalPlan::Sort { input, keys } => {
536            let mut input_result = execute_plan(db, input, params, tx)?;
537            input_result.rows.sort_by(|left, right| {
538                for key in keys {
539                    let Expr::Column(column_ref) = &key.expr else {
540                        return Ordering::Equal;
541                    };
542                    let left_value =
543                        match lookup_query_result_column(left, &input_result.columns, column_ref) {
544                            Ok(value) => value,
545                            Err(_) => return Ordering::Equal,
546                        };
547                    let right_value = match lookup_query_result_column(
548                        right,
549                        &input_result.columns,
550                        column_ref,
551                    ) {
552                        Ok(value) => value,
553                        Err(_) => return Ordering::Equal,
554                    };
555                    let ordering = compare_sort_values(&left_value, &right_value, key.direction);
556                    if ordering != Ordering::Equal {
557                        return ordering;
558                    }
559                }
560                Ordering::Equal
561            });
562            Ok(input_result)
563        }
564        PhysicalPlan::Limit { input, count } => {
565            let mut input_result = execute_plan(db, input, params, tx)?;
566            input_result.rows.truncate(*count as usize);
567            Ok(input_result)
568        }
569        PhysicalPlan::Filter { input, predicate } => {
570            let mut input_result = execute_plan(db, input, params, tx)?;
571            input_result.rows.retain(|row| {
572                query_result_row_matches(row, &input_result.columns, predicate, params)
573                    .unwrap_or(false)
574            });
575            Ok(input_result)
576        }
577        PhysicalPlan::Distinct { input } => {
578            let input_result = execute_plan(db, input, params, tx)?;
579            let mut seen = HashSet::<Vec<u8>>::new();
580            let rows = input_result
581                .rows
582                .into_iter()
583                .filter(|row| seen.insert(distinct_row_key(row)))
584                .collect();
585            Ok(QueryResult {
586                columns: input_result.columns,
587                rows,
588                rows_affected: input_result.rows_affected,
589            })
590        }
591        PhysicalPlan::Join {
592            left,
593            right,
594            condition,
595            join_type,
596            left_alias,
597            right_alias,
598        } => {
599            let left_result = execute_plan(db, left, params, tx)?;
600            let right_result = execute_plan(db, right, params, tx)?;
601            let right_duplicate_names =
602                duplicate_column_names(&left_result.columns, &right_result.columns);
603            let right_prefix = right_alias
604                .clone()
605                .unwrap_or_else(|| right_table_name(right));
606            let right_columns = right_result
607                .columns
608                .iter()
609                .map(|column| {
610                    if right_duplicate_names.contains(column) {
611                        format!("{right_prefix}.{column}")
612                    } else {
613                        column.clone()
614                    }
615                })
616                .collect::<Vec<_>>();
617
618            let mut columns = left_result.columns.clone();
619            columns.extend(right_columns);
620
621            let mut rows = Vec::new();
622            for left_row in &left_result.rows {
623                let mut matched = false;
624                for right_row in &right_result.rows {
625                    let combined = concatenate_rows(left_row, right_row);
626                    if query_result_row_matches(&combined, &columns, condition, params)? {
627                        matched = true;
628                        rows.push(combined);
629                    }
630                }
631
632                if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
633                    let mut combined = left_row.clone();
634                    combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
635                    rows.push(combined);
636                }
637            }
638
639            let output_columns = qualify_join_columns(
640                &columns,
641                &left_result.columns,
642                &right_result.columns,
643                left_alias,
644                &right_prefix,
645            );
646
647            Ok(QueryResult {
648                columns: output_columns,
649                rows,
650                rows_affected: 0,
651            })
652        }
653        PhysicalPlan::CreateIndex(_) => Ok(QueryResult::empty_with_affected(0)),
654        PhysicalPlan::SetMemoryLimit(val) => {
655            let limit = match val {
656                SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
657                SetMemoryLimitValue::None => None,
658            };
659            db.accountant().set_budget(limit)?;
660            db.persist_memory_limit(limit)?;
661            Ok(QueryResult::empty())
662        }
663        PhysicalPlan::ShowMemoryLimit => {
664            let usage = db.accountant().usage();
665            Ok(QueryResult {
666                columns: vec![
667                    "limit".to_string(),
668                    "used".to_string(),
669                    "available".to_string(),
670                    "startup_ceiling".to_string(),
671                ],
672                rows: vec![vec![
673                    usage
674                        .limit
675                        .map(|value| Value::Int64(value as i64))
676                        .unwrap_or_else(|| Value::Text("none".to_string())),
677                    Value::Int64(usage.used as i64),
678                    usage
679                        .available
680                        .map(|value| Value::Int64(value as i64))
681                        .unwrap_or_else(|| Value::Text("none".to_string())),
682                    usage
683                        .startup_ceiling
684                        .map(|value| Value::Int64(value as i64))
685                        .unwrap_or_else(|| Value::Text("none".to_string())),
686                ]],
687                rows_affected: 0,
688            })
689        }
690        PhysicalPlan::SetDiskLimit(val) => {
691            let limit = match val {
692                SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
693                SetDiskLimitValue::None => None,
694            };
695            db.set_disk_limit(limit)?;
696            db.persist_disk_limit(limit)?;
697            Ok(QueryResult::empty())
698        }
699        PhysicalPlan::ShowDiskLimit => {
700            let limit = db.disk_limit();
701            let used = db.disk_file_size();
702            let startup_ceiling = db.disk_limit_startup_ceiling();
703            Ok(QueryResult {
704                columns: vec![
705                    "limit".to_string(),
706                    "used".to_string(),
707                    "available".to_string(),
708                    "startup_ceiling".to_string(),
709                ],
710                rows: vec![vec![
711                    limit
712                        .map(|value| Value::Int64(value as i64))
713                        .unwrap_or_else(|| Value::Text("none".to_string())),
714                    used.map(|value| Value::Int64(value as i64))
715                        .unwrap_or(Value::Null),
716                    match (limit, used) {
717                        (Some(limit), Some(used)) => {
718                            Value::Int64(limit.saturating_sub(used) as i64)
719                        }
720                        _ => Value::Null,
721                    },
722                    startup_ceiling
723                        .map(|value| Value::Int64(value as i64))
724                        .unwrap_or_else(|| Value::Text("none".to_string())),
725                ]],
726                rows_affected: 0,
727            })
728        }
729        PhysicalPlan::SetSyncConflictPolicy(policy) => {
730            let cp = parse_conflict_policy(policy)?;
731            db.set_default_conflict_policy(cp);
732            Ok(QueryResult::empty())
733        }
734        PhysicalPlan::ShowSyncConflictPolicy => {
735            let policies = db.conflict_policies();
736            let default_str = conflict_policy_to_string(policies.default);
737            let mut rows = vec![vec![Value::Text(default_str)]];
738            for (table, policy) in &policies.per_table {
739                rows.push(vec![Value::Text(format!(
740                    "{}={}",
741                    table,
742                    conflict_policy_to_string(*policy)
743                ))]);
744            }
745            Ok(QueryResult {
746                columns: vec!["policy".to_string()],
747                rows,
748                rows_affected: 0,
749            })
750        }
751        PhysicalPlan::Pipeline(plans) => {
752            let mut last = QueryResult::empty();
753            for p in plans {
754                last = execute_plan(db, p, params, tx)?;
755            }
756            Ok(last)
757        }
758        _ => Err(Error::PlanError(
759            "unsupported plan node in executor".to_string(),
760        )),
761    }
762}
763
764fn eval_project_expr(
765    expr: &Expr,
766    row: &[Value],
767    input_columns: &[String],
768    params: &HashMap<String, Value>,
769) -> Result<Value> {
770    match expr {
771        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
772        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
773        Expr::Parameter(name) => params
774            .get(name)
775            .cloned()
776            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
777        Expr::BinaryOp { left, op, right } => {
778            let left = eval_query_result_expr(left, row, input_columns, params)?;
779            let right = eval_query_result_expr(right, row, input_columns, params)?;
780            eval_binary_op(op, &left, &right)
781        }
782        Expr::UnaryOp { op, operand } => {
783            let value = eval_query_result_expr(operand, row, input_columns, params)?;
784            match op {
785                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
786                UnaryOp::Neg => match value {
787                    Value::Int64(v) => Ok(Value::Int64(-v)),
788                    Value::Float64(v) => Ok(Value::Float64(-v)),
789                    _ => Err(Error::PlanError(
790                        "cannot negate non-numeric value".to_string(),
791                    )),
792                },
793            }
794        }
795        Expr::FunctionCall { name, args } => {
796            let values = args
797                .iter()
798                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
799                .collect::<Result<Vec<_>>>()?;
800            eval_function(name, &values)
801        }
802        Expr::IsNull { expr, negated } => {
803            let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
804            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
805        }
806        Expr::InList {
807            expr,
808            list,
809            negated,
810        } => {
811            let needle = eval_query_result_expr(expr, row, input_columns, params)?;
812            let matched = list.iter().try_fold(false, |found, item| {
813                if found {
814                    Ok(true)
815                } else {
816                    let candidate = eval_query_result_expr(item, row, input_columns, params)?;
817                    Ok(
818                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
819                            || (needle != Value::Null
820                                && candidate != Value::Null
821                                && needle == candidate),
822                    )
823                }
824            })?;
825            Ok(Value::Bool(if *negated { !matched } else { matched }))
826        }
827        Expr::Like {
828            expr,
829            pattern,
830            negated,
831        } => {
832            let matches = match (
833                eval_query_result_expr(expr, row, input_columns, params)?,
834                eval_query_result_expr(pattern, row, input_columns, params)?,
835            ) {
836                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
837                _ => false,
838            };
839            Ok(Value::Bool(if *negated { !matches } else { matches }))
840        }
841        _ => resolve_expr(expr, params),
842    }
843}
844
845fn eval_query_result_expr(
846    expr: &Expr,
847    row: &[Value],
848    input_columns: &[String],
849    params: &HashMap<String, Value>,
850) -> Result<Value> {
851    match expr {
852        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
853        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
854        Expr::Parameter(name) => params
855            .get(name)
856            .cloned()
857            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
858        Expr::FunctionCall { name, args } => {
859            let values = args
860                .iter()
861                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
862                .collect::<Result<Vec<_>>>()?;
863            eval_function(name, &values)
864        }
865        _ => resolve_expr(expr, params),
866    }
867}
868
869fn exec_insert(
870    db: &Database,
871    p: &InsertPlan,
872    params: &HashMap<String, Value>,
873    tx: Option<TxId>,
874) -> Result<QueryResult> {
875    db.check_disk_budget("INSERT")?;
876    let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
877
878    // When no column list is provided (INSERT INTO t VALUES (...)),
879    // infer column names from table metadata in declaration order.
880    let columns: Vec<String> = if p.columns.is_empty() {
881        let meta = db
882            .table_meta(&p.table)
883            .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
884        meta.columns.iter().map(|c| c.name.clone()).collect()
885    } else {
886        p.columns.clone()
887    };
888
889    let mut rows_affected = 0;
890    for row in &p.values {
891        let mut values = HashMap::new();
892        for (idx, expr) in row.iter().enumerate() {
893            let col = columns
894                .get(idx)
895                .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
896            let v = resolve_expr(expr, params)?;
897            values.insert(col.clone(), coerce_value_for_column(db, &p.table, col, v)?);
898        }
899
900        apply_missing_column_defaults(db, &p.table, &mut values)?;
901
902        validate_vector_columns(db, &p.table, &values)?;
903        let row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
904        db.accountant().try_allocate_for(
905            row_bytes,
906            "insert",
907            "row_insert",
908            "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
909        )?;
910        let checkpoint = db.write_set_checkpoint(txid)?;
911
912        let row_id = if let Some(on_conflict) = &p.on_conflict {
913            let conflict_col = &on_conflict.columns[0];
914            let conflict_value = values
915                .get(conflict_col)
916                .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
917            let existing =
918                db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
919            let existing_row_id = existing.as_ref().map(|row| row.row_id);
920            let existing_has_vector = existing
921                .as_ref()
922                .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
923            let upsert_values = if let Some(existing_row) = existing.as_ref() {
924                apply_on_conflict_updates(
925                    db,
926                    &p.table,
927                    values.clone(),
928                    existing_row,
929                    on_conflict,
930                    params,
931                )?
932            } else {
933                values.clone()
934            };
935
936            match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
937                Ok(UpsertResult::Inserted) => {
938                    db.point_lookup_in_tx(
939                        txid,
940                        &p.table,
941                        conflict_col,
942                        conflict_value,
943                        db.snapshot(),
944                    )?
945                    .ok_or_else(|| {
946                        Error::Other("inserted upsert row not visible in tx".to_string())
947                    })?
948                    .row_id
949                }
950                Ok(UpsertResult::Updated) => {
951                    if existing_has_vector && let Some(existing_row_id) = existing_row_id {
952                        db.delete_vector(txid, existing_row_id)?;
953                    }
954                    db.point_lookup_in_tx(
955                        txid,
956                        &p.table,
957                        conflict_col,
958                        conflict_value,
959                        db.snapshot(),
960                    )?
961                    .ok_or_else(|| {
962                        Error::Other("updated upsert row not visible in tx".to_string())
963                    })?
964                    .row_id
965                }
966                Ok(UpsertResult::NoOp) => {
967                    db.accountant().release(row_bytes);
968                    0
969                }
970                Err(err) => {
971                    db.accountant().release(row_bytes);
972                    return Err(err);
973                }
974            }
975        } else {
976            match db.insert_row(txid, &p.table, values.clone()) {
977                Ok(row_id) => row_id,
978                Err(err) => {
979                    db.accountant().release(row_bytes);
980                    return Err(err);
981                }
982            }
983        };
984
985        if should_route_insert_to_graph(db, &p.table)
986            && let (
987                Some(Value::Uuid(source)),
988                Some(Value::Uuid(target)),
989                Some(Value::Text(edge_type)),
990            ) = (
991                values.get("source_id"),
992                values.get("target_id"),
993                values.get("edge_type"),
994            )
995        {
996            match db.insert_edge(txid, *source, *target, edge_type.clone(), HashMap::new()) {
997                Ok(true) => {}
998                Ok(false) => {
999                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1000                    db.accountant().release(row_bytes);
1001                    continue;
1002                }
1003                Err(err) => {
1004                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1005                    db.accountant().release(row_bytes);
1006                    return Err(err);
1007                }
1008            }
1009        }
1010
1011        if let Some(v) = vector_value_for_table(db, &p.table, &values)
1012            && row_id != 0
1013            && let Err(err) = db.insert_vector(txid, row_id, v.clone())
1014        {
1015            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1016            db.accountant().release(row_bytes);
1017            return Err(err);
1018        }
1019
1020        rows_affected += 1;
1021    }
1022
1023    Ok(QueryResult::empty_with_affected(rows_affected))
1024}
1025
1026fn exec_delete(
1027    db: &Database,
1028    p: &DeletePlan,
1029    params: &HashMap<String, Value>,
1030    tx: Option<TxId>,
1031) -> Result<QueryResult> {
1032    let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1033    let snapshot = db.snapshot();
1034    let rows = db.scan(&p.table, snapshot)?;
1035    let resolved_where = p
1036        .where_clause
1037        .as_ref()
1038        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1039        .transpose()?;
1040    let matched: Vec<_> = rows
1041        .into_iter()
1042        .filter(|r| {
1043            resolved_where
1044                .as_ref()
1045                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1046        })
1047        .collect();
1048
1049    for row in &matched {
1050        if db.has_live_vector(row.row_id, snapshot) {
1051            db.delete_vector(txid, row.row_id)?;
1052        }
1053        db.delete_row(txid, &p.table, row.row_id)?;
1054    }
1055
1056    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1057}
1058
1059fn exec_update(
1060    db: &Database,
1061    p: &UpdatePlan,
1062    params: &HashMap<String, Value>,
1063    tx: Option<TxId>,
1064) -> Result<QueryResult> {
1065    db.check_disk_budget("UPDATE")?;
1066    let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1067    let snapshot = db.snapshot();
1068    let rows = db.scan(&p.table, snapshot)?;
1069    let resolved_where = p
1070        .where_clause
1071        .as_ref()
1072        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1073        .transpose()?;
1074    let matched: Vec<_> = rows
1075        .into_iter()
1076        .filter(|r| {
1077            resolved_where
1078                .as_ref()
1079                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1080        })
1081        .collect();
1082
1083    for row in &matched {
1084        let mut values = row.values.clone();
1085        for (k, vexpr) in &p.assignments {
1086            let value = eval_assignment_expr(vexpr, &row.values, params)?;
1087            values.insert(k.clone(), coerce_value_for_column(db, &p.table, k, value)?);
1088        }
1089        validate_update_state_transition(db, &p.table, row, &values)?;
1090        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1091        let new_state = db
1092            .table_meta(&p.table)
1093            .as_ref()
1094            .and_then(|meta| meta.state_machine.as_ref())
1095            .and_then(|sm| values.get(&sm.column))
1096            .and_then(Value::as_text)
1097            .map(std::borrow::ToOwned::to_owned);
1098
1099        let old_has_vector = db.has_live_vector(row.row_id, snapshot);
1100        validate_vector_columns(db, &p.table, &values)?;
1101        let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
1102        let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1103        db.accountant().try_allocate_for(
1104            new_row_bytes,
1105            "update",
1106            "row_replace",
1107            "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1108        )?;
1109        let checkpoint = db.write_set_checkpoint(txid)?;
1110
1111        if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1112            db.accountant().release(new_row_bytes);
1113            return Err(err);
1114        }
1115        if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
1116            db.accountant().release(new_row_bytes);
1117            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1118            return Err(err);
1119        }
1120
1121        let new_row_id = match db.insert_row(txid, &p.table, values) {
1122            Ok(row_id) => row_id,
1123            Err(err) => {
1124                db.accountant().release(new_row_bytes);
1125                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1126                return Err(err);
1127            }
1128        };
1129        if let Some(vector) = new_vector
1130            && let Err(err) = db.insert_vector(txid, new_row_id, vector)
1131        {
1132            db.accountant().release(new_row_bytes);
1133            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1134            return Err(err);
1135        }
1136        if let Err(err) =
1137            db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1138        {
1139            db.accountant().release(new_row_bytes);
1140            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1141            return Err(err);
1142        }
1143    }
1144
1145    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1146}
1147
1148fn estimate_table_row_bytes(
1149    db: &Database,
1150    table: &str,
1151    values: &HashMap<String, Value>,
1152) -> Result<usize> {
1153    let meta = db
1154        .table_meta(table)
1155        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1156    Ok(estimate_row_bytes_for_meta(values, &meta, false))
1157}
1158
1159fn validate_update_state_transition(
1160    db: &Database,
1161    table: &str,
1162    existing: &VersionedRow,
1163    next_values: &HashMap<String, Value>,
1164) -> Result<()> {
1165    let Some(meta) = db.table_meta(table) else {
1166        return Ok(());
1167    };
1168    let Some(state_machine) = meta.state_machine else {
1169        return Ok(());
1170    };
1171
1172    let old_state = existing
1173        .values
1174        .get(&state_machine.column)
1175        .and_then(Value::as_text);
1176    let new_state = next_values
1177        .get(&state_machine.column)
1178        .and_then(Value::as_text);
1179
1180    let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
1181        return Ok(());
1182    };
1183
1184    if old_state == new_state
1185        || db.relational_store().validate_state_transition(
1186            table,
1187            &state_machine.column,
1188            old_state,
1189            new_state,
1190        )
1191    {
1192        return Ok(());
1193    }
1194
1195    Err(Error::InvalidStateTransition(format!(
1196        "{old_state} -> {new_state}"
1197    )))
1198}
1199
1200fn estimate_row_bytes_for_meta(
1201    values: &HashMap<String, Value>,
1202    meta: &TableMeta,
1203    include_vectors: bool,
1204) -> usize {
1205    let mut bytes = 96usize;
1206    for column in &meta.columns {
1207        let Some(value) = values.get(&column.name) else {
1208            continue;
1209        };
1210        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
1211            continue;
1212        }
1213        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
1214    }
1215    bytes
1216}
1217
1218fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
1219    k.saturating_mul(3)
1220        .saturating_mul(dimension)
1221        .saturating_mul(std::mem::size_of::<f32>())
1222}
1223
1224fn estimate_bfs_working_bytes<T>(
1225    frontier: &[T],
1226    steps: &[contextdb_planner::GraphStepPlan],
1227) -> usize {
1228    let max_hops = steps.iter().fold(0usize, |acc, step| {
1229        acc.saturating_add(step.max_depth as usize)
1230    });
1231    frontier
1232        .len()
1233        .saturating_mul(2048)
1234        .saturating_mul(max_hops.max(1))
1235}
1236
1237fn dedupe_graph_frontier(
1238    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
1239    steps: &[contextdb_planner::GraphStepPlan],
1240) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
1241    let mut best =
1242        HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
1243
1244    for (bindings, current_id, depth) in frontier {
1245        let mut key = Vec::with_capacity(steps.len());
1246        for step in steps {
1247            if let Some(id) = bindings.get(&step.target_alias) {
1248                key.push(*id);
1249            }
1250        }
1251
1252        best.entry(key)
1253            .and_modify(|existing| {
1254                if depth < existing.2 {
1255                    *existing = (bindings.clone(), current_id, depth);
1256                }
1257            })
1258            .or_insert((bindings, current_id, depth));
1259    }
1260
1261    best.into_values().collect()
1262}
1263
1264fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
1265    let meta = db.table_meta(table);
1266    let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
1267    let snapshot = db.snapshot();
1268    let rows = db.scan(table, snapshot).unwrap_or_default();
1269    let row_bytes = rows.iter().fold(0usize, |acc, row| {
1270        acc.saturating_add(meta.as_ref().map_or_else(
1271            || row.estimated_bytes(),
1272            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1273        ))
1274    });
1275    let vector_bytes = rows
1276        .iter()
1277        .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
1278        .fold(0usize, |acc, entry| {
1279            acc.saturating_add(entry.estimated_bytes())
1280        });
1281    let edge_bytes = rows.iter().fold(0usize, |acc, row| {
1282        match (
1283            row.values.get("source_id").and_then(Value::as_uuid),
1284            row.values.get("target_id").and_then(Value::as_uuid),
1285            row.values.get("edge_type").and_then(Value::as_text),
1286        ) {
1287            (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
1288                96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
1289            ),
1290            _ => acc,
1291        }
1292    });
1293    metadata_bytes
1294        .saturating_add(row_bytes)
1295        .saturating_add(vector_bytes)
1296        .saturating_add(edge_bytes)
1297}
1298
1299fn materialize_rows(
1300    rows: Vec<VersionedRow>,
1301    filter: Option<&Expr>,
1302    params: &HashMap<String, Value>,
1303    schema_columns: Option<&[String]>,
1304) -> Result<QueryResult> {
1305    let filtered: Vec<VersionedRow> = rows
1306        .into_iter()
1307        .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
1308        .collect();
1309
1310    let keys = if let Some(schema_columns) = schema_columns {
1311        schema_columns.to_vec()
1312    } else {
1313        let mut keys = BTreeSet::new();
1314        for r in &filtered {
1315            for k in r.values.keys() {
1316                keys.insert(k.clone());
1317            }
1318        }
1319        keys.into_iter().collect::<Vec<_>>()
1320    };
1321
1322    let mut columns = vec!["row_id".to_string()];
1323    columns.extend(keys.iter().cloned());
1324
1325    let rows = filtered
1326        .into_iter()
1327        .map(|r| {
1328            let mut out = vec![Value::Int64(r.row_id as i64)];
1329            for k in &keys {
1330                out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
1331            }
1332            out
1333        })
1334        .collect();
1335
1336    Ok(QueryResult {
1337        columns,
1338        rows,
1339        rows_affected: 0,
1340    })
1341}
1342
1343fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
1344    Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
1345}
1346
1347fn eval_expr_value(
1348    row: &VersionedRow,
1349    expr: &Expr,
1350    params: &HashMap<String, Value>,
1351) -> Result<Value> {
1352    match expr {
1353        Expr::Column(c) => {
1354            if c.column == "row_id" {
1355                Ok(Value::Int64(row.row_id as i64))
1356            } else {
1357                Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
1358            }
1359        }
1360        Expr::BinaryOp { left, op, right } => {
1361            let left = eval_expr_value(row, left, params)?;
1362            let right = eval_expr_value(row, right, params)?;
1363            eval_binary_op(op, &left, &right)
1364        }
1365        Expr::UnaryOp { op, operand } => {
1366            let value = eval_expr_value(row, operand, params)?;
1367            match op {
1368                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1369                UnaryOp::Neg => match value {
1370                    Value::Int64(v) => Ok(Value::Int64(-v)),
1371                    Value::Float64(v) => Ok(Value::Float64(-v)),
1372                    _ => Err(Error::PlanError(
1373                        "cannot negate non-numeric value".to_string(),
1374                    )),
1375                },
1376            }
1377        }
1378        Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
1379        Expr::IsNull { expr, negated } => {
1380            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1381            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1382        }
1383        Expr::InList {
1384            expr,
1385            list,
1386            negated,
1387        } => {
1388            let needle = eval_expr_value(row, expr, params)?;
1389            let matched = list.iter().try_fold(false, |found, item| {
1390                if found {
1391                    Ok(true)
1392                } else {
1393                    let candidate = eval_expr_value(row, item, params)?;
1394                    Ok(
1395                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1396                            || (needle != Value::Null
1397                                && candidate != Value::Null
1398                                && needle == candidate),
1399                    )
1400                }
1401            })?;
1402            Ok(Value::Bool(if *negated { !matched } else { matched }))
1403        }
1404        Expr::Like {
1405            expr,
1406            pattern,
1407            negated,
1408        } => {
1409            let matches = match (
1410                eval_expr_value(row, expr, params)?,
1411                eval_expr_value(row, pattern, params)?,
1412            ) {
1413                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1414                _ => false,
1415            };
1416            Ok(Value::Bool(if *negated { !matches } else { matches }))
1417        }
1418        _ => resolve_expr(expr, params),
1419    }
1420}
1421
1422pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
1423    match expr {
1424        Expr::Literal(l) => Ok(match l {
1425            Literal::Null => Value::Null,
1426            Literal::Bool(v) => Value::Bool(*v),
1427            Literal::Integer(v) => Value::Int64(*v),
1428            Literal::Real(v) => Value::Float64(*v),
1429            Literal::Text(v) => Value::Text(v.clone()),
1430            Literal::Vector(v) => Value::Vector(v.clone()),
1431        }),
1432        Expr::Parameter(p) => params
1433            .get(p)
1434            .cloned()
1435            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
1436        Expr::Column(c) => Ok(Value::Text(c.column.clone())),
1437        Expr::UnaryOp { op, operand } => match op {
1438            UnaryOp::Neg => match resolve_expr(operand, params)? {
1439                Value::Int64(v) => Ok(Value::Int64(-v)),
1440                Value::Float64(v) => Ok(Value::Float64(-v)),
1441                _ => Err(Error::PlanError(
1442                    "cannot negate non-numeric value".to_string(),
1443                )),
1444            },
1445            UnaryOp::Not => Err(Error::PlanError(
1446                "boolean NOT requires row context".to_string(),
1447            )),
1448        },
1449        Expr::FunctionCall { name, args } => {
1450            let values = args
1451                .iter()
1452                .map(|arg| resolve_expr(arg, params))
1453                .collect::<Result<Vec<_>>>()?;
1454            eval_function(name, &values)
1455        }
1456        Expr::CosineDistance { right, .. } => resolve_expr(right, params),
1457        _ => Err(Error::PlanError("unsupported expression".to_string())),
1458    }
1459}
1460
1461fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1462    match (a, b) {
1463        (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
1464        (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
1465        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1466        (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1467        (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
1468        (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
1469        (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
1470        (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1471        (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
1472        (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
1473        (Value::Uuid(u), Value::Text(t)) => {
1474            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1475                Some(u.cmp(&parsed))
1476            } else {
1477                None
1478            }
1479        }
1480        (Value::Text(t), Value::Uuid(u)) => {
1481            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1482                Some(parsed.cmp(u))
1483            } else {
1484                None
1485            }
1486        }
1487        (Value::Null, _) | (_, Value::Null) => None,
1488        _ => None,
1489    }
1490}
1491
1492fn eval_bool_expr(
1493    row: &VersionedRow,
1494    expr: &Expr,
1495    params: &HashMap<String, Value>,
1496) -> Result<Option<bool>> {
1497    match expr {
1498        Expr::BinaryOp { left, op, right } => match op {
1499            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1500                let left = eval_expr_value(row, left, params)?;
1501                let right = eval_expr_value(row, right, params)?;
1502                if left == Value::Null || right == Value::Null {
1503                    return Ok(None);
1504                }
1505
1506                let result = match op {
1507                    BinOp::Eq => {
1508                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
1509                    }
1510                    BinOp::Neq => {
1511                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
1512                    }
1513                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
1514                    BinOp::Lte => matches!(
1515                        compare_values(&left, &right),
1516                        Some(Ordering::Less | Ordering::Equal)
1517                    ),
1518                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
1519                    BinOp::Gte => matches!(
1520                        compare_values(&left, &right),
1521                        Some(Ordering::Greater | Ordering::Equal)
1522                    ),
1523                    BinOp::And | BinOp::Or => unreachable!(),
1524                };
1525                Ok(Some(result))
1526            }
1527            BinOp::And => {
1528                let left = eval_bool_expr(row, left, params)?;
1529                if left == Some(false) {
1530                    return Ok(Some(false));
1531                }
1532                let right = eval_bool_expr(row, right, params)?;
1533                Ok(match (left, right) {
1534                    (Some(true), Some(true)) => Some(true),
1535                    (Some(true), other) => other,
1536                    (None, Some(false)) => Some(false),
1537                    (None, Some(true)) | (None, None) => None,
1538                    (Some(false), _) => Some(false),
1539                })
1540            }
1541            BinOp::Or => {
1542                let left = eval_bool_expr(row, left, params)?;
1543                if left == Some(true) {
1544                    return Ok(Some(true));
1545                }
1546                let right = eval_bool_expr(row, right, params)?;
1547                Ok(match (left, right) {
1548                    (Some(false), Some(false)) => Some(false),
1549                    (Some(false), other) => other,
1550                    (None, Some(true)) => Some(true),
1551                    (None, Some(false)) | (None, None) => None,
1552                    (Some(true), _) => Some(true),
1553                })
1554            }
1555        },
1556        Expr::UnaryOp {
1557            op: UnaryOp::Not,
1558            operand,
1559        } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
1560        Expr::InList {
1561            expr,
1562            list,
1563            negated,
1564        } => {
1565            let needle = eval_expr_value(row, expr, params)?;
1566            if needle == Value::Null {
1567                return Ok(None);
1568            }
1569
1570            let matched = list.iter().try_fold(false, |found, item| {
1571                if found {
1572                    Ok(true)
1573                } else {
1574                    let candidate = eval_expr_value(row, item, params)?;
1575                    Ok(
1576                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1577                            || (candidate != Value::Null && needle == candidate),
1578                    )
1579                }
1580            })?;
1581            Ok(Some(if *negated { !matched } else { matched }))
1582        }
1583        Expr::InSubquery { .. } => Err(Error::PlanError(
1584            "IN (subquery) must be resolved before execution".to_string(),
1585        )),
1586        Expr::Like {
1587            expr,
1588            pattern,
1589            negated,
1590        } => {
1591            let left = eval_expr_value(row, expr, params)?;
1592            let right = eval_expr_value(row, pattern, params)?;
1593            let matched = match (left, right) {
1594                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1595                _ => false,
1596            };
1597            Ok(Some(if *negated { !matched } else { matched }))
1598        }
1599        Expr::IsNull { expr, negated } => {
1600            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1601            Ok(Some(if *negated { !is_null } else { is_null }))
1602        }
1603        Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
1604            Value::Bool(value) => Ok(Some(value)),
1605            Value::Null => Ok(None),
1606            _ => Err(Error::PlanError(format!(
1607                "unsupported WHERE expression: {:?}",
1608                expr
1609            ))),
1610        },
1611        _ => Err(Error::PlanError(format!(
1612            "unsupported WHERE expression: {:?}",
1613            expr
1614        ))),
1615    }
1616}
1617
1618fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
1619    let bool_value = match op {
1620        BinOp::Eq => {
1621            if left == &Value::Null || right == &Value::Null {
1622                false
1623            } else {
1624                compare_values(left, right) == Some(Ordering::Equal) || left == right
1625            }
1626        }
1627        BinOp::Neq => {
1628            if left == &Value::Null || right == &Value::Null {
1629                false
1630            } else {
1631                !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
1632            }
1633        }
1634        BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
1635        BinOp::Lte => matches!(
1636            compare_values(left, right),
1637            Some(Ordering::Less | Ordering::Equal)
1638        ),
1639        BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
1640        BinOp::Gte => matches!(
1641            compare_values(left, right),
1642            Some(Ordering::Greater | Ordering::Equal)
1643        ),
1644        BinOp::And => value_to_bool(left) && value_to_bool(right),
1645        BinOp::Or => value_to_bool(left) || value_to_bool(right),
1646    };
1647    Ok(Value::Bool(bool_value))
1648}
1649
1650fn value_to_bool(value: &Value) -> bool {
1651    matches!(value, Value::Bool(true))
1652}
1653
1654fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
1655    match (left, right) {
1656        (Value::Null, Value::Null) => Ordering::Equal,
1657        (Value::Null, _) => match direction {
1658            SortDirection::Asc => Ordering::Greater,
1659            SortDirection::Desc => Ordering::Less,
1660            SortDirection::CosineDistance => Ordering::Equal,
1661        },
1662        (_, Value::Null) => match direction {
1663            SortDirection::Asc => Ordering::Less,
1664            SortDirection::Desc => Ordering::Greater,
1665            SortDirection::CosineDistance => Ordering::Equal,
1666        },
1667        _ => {
1668            let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
1669            match direction {
1670                SortDirection::Asc => ordering,
1671                SortDirection::Desc => ordering.reverse(),
1672                SortDirection::CosineDistance => ordering,
1673            }
1674        }
1675    }
1676}
1677
1678fn eval_assignment_expr(
1679    expr: &Expr,
1680    row_values: &HashMap<String, Value>,
1681    params: &HashMap<String, Value>,
1682) -> Result<Value> {
1683    match expr {
1684        Expr::Literal(lit) => literal_to_value(lit),
1685        Expr::Parameter(name) => params
1686            .get(name)
1687            .cloned()
1688            .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
1689        Expr::Column(col_ref) => row_values
1690            .get(&col_ref.column)
1691            .cloned()
1692            .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
1693        Expr::BinaryOp { left, op, right } => {
1694            let left = eval_assignment_expr(left, row_values, params)?;
1695            let right = eval_assignment_expr(right, row_values, params)?;
1696            eval_binary_op(op, &left, &right)
1697        }
1698        Expr::UnaryOp { op, operand } => match op {
1699            UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
1700                Value::Int64(value) => Ok(Value::Int64(-value)),
1701                Value::Float64(value) => Ok(Value::Float64(-value)),
1702                _ => Err(Error::Other(format!(
1703                    "unsupported expression in UPDATE SET: {:?}",
1704                    expr
1705                ))),
1706            },
1707            UnaryOp::Not => Err(Error::Other(format!(
1708                "unsupported expression in UPDATE SET: {:?}",
1709                expr
1710            ))),
1711        },
1712        Expr::FunctionCall { name, args } => {
1713            let evaluated = args
1714                .iter()
1715                .map(|arg| eval_assignment_expr(arg, row_values, params))
1716                .collect::<Result<Vec<_>>>()?;
1717            eval_function(name, &evaluated)
1718        }
1719        _ => Err(Error::Other(format!(
1720            "unsupported expression in UPDATE SET: {:?}",
1721            expr
1722        ))),
1723    }
1724}
1725
1726fn apply_on_conflict_updates(
1727    db: &Database,
1728    table: &str,
1729    mut insert_values: HashMap<String, Value>,
1730    existing_row: &VersionedRow,
1731    on_conflict: &OnConflictPlan,
1732    params: &HashMap<String, Value>,
1733) -> Result<HashMap<String, Value>> {
1734    if on_conflict.update_columns.is_empty() {
1735        return Ok(insert_values);
1736    }
1737
1738    let mut merged = existing_row.values.clone();
1739    for (column, expr) in &on_conflict.update_columns {
1740        let value = eval_assignment_expr(expr, &existing_row.values, params)?;
1741        merged.insert(
1742            column.clone(),
1743            coerce_value_for_column(db, table, column, value)?,
1744        );
1745    }
1746
1747    for (column, value) in insert_values.drain() {
1748        merged.entry(column).or_insert(value);
1749    }
1750
1751    Ok(merged)
1752}
1753
1754fn literal_to_value(lit: &Literal) -> Result<Value> {
1755    Ok(match lit {
1756        Literal::Null => Value::Null,
1757        Literal::Bool(v) => Value::Bool(*v),
1758        Literal::Integer(v) => Value::Int64(*v),
1759        Literal::Real(v) => Value::Float64(*v),
1760        Literal::Text(v) => Value::Text(v.clone()),
1761        Literal::Vector(v) => Value::Vector(v.clone()),
1762    })
1763}
1764
1765fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
1766    let [left, right] = args else {
1767        return Err(Error::PlanError(format!(
1768            "function {} expects 2 arguments",
1769            name
1770        )));
1771    };
1772
1773    match (left, right) {
1774        (Value::Int64(left), Value::Int64(right)) => match name {
1775            "__add" => Ok(Value::Int64(left + right)),
1776            "__sub" => Ok(Value::Int64(left - right)),
1777            "__mul" => Ok(Value::Int64(left * right)),
1778            "__div" => Ok(Value::Int64(left / right)),
1779            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1780        },
1781        (Value::Float64(left), Value::Float64(right)) => match name {
1782            "__add" => Ok(Value::Float64(left + right)),
1783            "__sub" => Ok(Value::Float64(left - right)),
1784            "__mul" => Ok(Value::Float64(left * right)),
1785            "__div" => Ok(Value::Float64(left / right)),
1786            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1787        },
1788        (Value::Int64(left), Value::Float64(right)) => match name {
1789            "__add" => Ok(Value::Float64(*left as f64 + right)),
1790            "__sub" => Ok(Value::Float64(*left as f64 - right)),
1791            "__mul" => Ok(Value::Float64(*left as f64 * right)),
1792            "__div" => Ok(Value::Float64(*left as f64 / right)),
1793            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1794        },
1795        (Value::Float64(left), Value::Int64(right)) => match name {
1796            "__add" => Ok(Value::Float64(left + *right as f64)),
1797            "__sub" => Ok(Value::Float64(left - *right as f64)),
1798            "__mul" => Ok(Value::Float64(left * *right as f64)),
1799            "__div" => Ok(Value::Float64(left / *right as f64)),
1800            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1801        },
1802        _ => Err(Error::PlanError(format!(
1803            "function {} expects numeric arguments",
1804            name
1805        ))),
1806    }
1807}
1808
1809fn eval_function_in_row_context(
1810    row: &VersionedRow,
1811    name: &str,
1812    args: &[Expr],
1813    params: &HashMap<String, Value>,
1814) -> Result<Value> {
1815    let values = args
1816        .iter()
1817        .map(|arg| eval_expr_value(row, arg, params))
1818        .collect::<Result<Vec<_>>>()?;
1819    eval_function(name, &values)
1820}
1821
1822fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
1823    match name.to_ascii_lowercase().as_str() {
1824        "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
1825        "coalesce" => Ok(args
1826            .iter()
1827            .find(|value| **value != Value::Null)
1828            .cloned()
1829            .unwrap_or(Value::Null)),
1830        "now" => Ok(Value::Timestamp(
1831            SystemTime::now()
1832                .duration_since(UNIX_EPOCH)
1833                .map_err(|err| Error::PlanError(err.to_string()))?
1834                .as_secs() as i64,
1835        )),
1836        _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1837    }
1838}
1839
1840fn like_matches(value: &str, pattern: &str) -> bool {
1841    let value_chars = value.chars().collect::<Vec<_>>();
1842    let pattern_chars = pattern.chars().collect::<Vec<_>>();
1843    let (mut vi, mut pi) = (0usize, 0usize);
1844    let (mut star_idx, mut match_idx) = (None, 0usize);
1845
1846    while vi < value_chars.len() {
1847        if pi < pattern_chars.len()
1848            && (pattern_chars[pi] == '_' || pattern_chars[pi] == value_chars[vi])
1849        {
1850            vi += 1;
1851            pi += 1;
1852        } else if pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1853            star_idx = Some(pi);
1854            match_idx = vi;
1855            pi += 1;
1856        } else if let Some(star) = star_idx {
1857            pi = star + 1;
1858            match_idx += 1;
1859            vi = match_idx;
1860        } else {
1861            return false;
1862        }
1863    }
1864
1865    while pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1866        pi += 1;
1867    }
1868
1869    pi == pattern_chars.len()
1870}
1871
1872fn resolve_in_subqueries(
1873    db: &Database,
1874    expr: &Expr,
1875    params: &HashMap<String, Value>,
1876    tx: Option<TxId>,
1877) -> Result<Expr> {
1878    resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
1879}
1880
1881pub(crate) fn resolve_in_subqueries_with_ctes(
1882    db: &Database,
1883    expr: &Expr,
1884    params: &HashMap<String, Value>,
1885    tx: Option<TxId>,
1886    ctes: &[Cte],
1887) -> Result<Expr> {
1888    match expr {
1889        Expr::InSubquery {
1890            expr,
1891            subquery,
1892            negated,
1893        } => {
1894            // Detect correlated subqueries: WHERE references to outer tables
1895            let mut subquery_tables: std::collections::HashSet<String> = subquery
1896                .from
1897                .iter()
1898                .filter_map(|item| match item {
1899                    contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
1900                    _ => None,
1901                })
1902                .collect();
1903            // CTE names are valid table references within the subquery
1904            for cte in ctes {
1905                match cte {
1906                    Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
1907                        subquery_tables.insert(name.clone());
1908                    }
1909                }
1910            }
1911            if let Some(where_clause) = &subquery.where_clause
1912                && has_outer_table_ref(where_clause, &subquery_tables)
1913            {
1914                return Err(Error::Other(
1915                    "correlated subqueries are not supported".to_string(),
1916                ));
1917            }
1918
1919            let query_plan = plan(&Statement::Select(SelectStatement {
1920                ctes: ctes.to_vec(),
1921                body: (**subquery).clone(),
1922            }))?;
1923            let result = execute_plan(db, &query_plan, params, tx)?;
1924            let select_expr = subquery
1925                .columns
1926                .first()
1927                .map(|column| column.expr.clone())
1928                .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
1929            let list = result
1930                .rows
1931                .iter()
1932                .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
1933                .collect::<Result<Vec<_>>>()?
1934                .into_iter()
1935                .map(value_to_literal)
1936                .collect::<Result<Vec<_>>>()?;
1937            Ok(Expr::InList {
1938                expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1939                list,
1940                negated: *negated,
1941            })
1942        }
1943        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1944            left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
1945            op: *op,
1946            right: Box::new(resolve_in_subqueries_with_ctes(
1947                db, right, params, tx, ctes,
1948            )?),
1949        }),
1950        Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
1951            op: *op,
1952            operand: Box::new(resolve_in_subqueries_with_ctes(
1953                db, operand, params, tx, ctes,
1954            )?),
1955        }),
1956        Expr::InList {
1957            expr,
1958            list,
1959            negated,
1960        } => Ok(Expr::InList {
1961            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1962            list: list
1963                .iter()
1964                .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
1965                .collect::<Result<Vec<_>>>()?,
1966            negated: *negated,
1967        }),
1968        Expr::Like {
1969            expr,
1970            pattern,
1971            negated,
1972        } => Ok(Expr::Like {
1973            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1974            pattern: Box::new(resolve_in_subqueries_with_ctes(
1975                db, pattern, params, tx, ctes,
1976            )?),
1977            negated: *negated,
1978        }),
1979        Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
1980            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1981            negated: *negated,
1982        }),
1983        Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
1984            name: name.clone(),
1985            args: args
1986                .iter()
1987                .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
1988                .collect::<Result<Vec<_>>>()?,
1989        }),
1990        _ => Ok(expr.clone()),
1991    }
1992}
1993
1994fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
1995    match expr {
1996        Expr::Column(ColumnRef {
1997            table: Some(table), ..
1998        }) => !subquery_tables.contains(table),
1999        Expr::BinaryOp { left, right, .. } => {
2000            has_outer_table_ref(left, subquery_tables)
2001                || has_outer_table_ref(right, subquery_tables)
2002        }
2003        Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
2004        Expr::InList { expr, list, .. } => {
2005            has_outer_table_ref(expr, subquery_tables)
2006                || list
2007                    .iter()
2008                    .any(|item| has_outer_table_ref(item, subquery_tables))
2009        }
2010        Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
2011        Expr::Like { expr, pattern, .. } => {
2012            has_outer_table_ref(expr, subquery_tables)
2013                || has_outer_table_ref(pattern, subquery_tables)
2014        }
2015        Expr::FunctionCall { args, .. } => args
2016            .iter()
2017            .any(|arg| has_outer_table_ref(arg, subquery_tables)),
2018        _ => false,
2019    }
2020}
2021
2022fn value_to_literal(value: Value) -> Result<Expr> {
2023    Ok(Expr::Literal(match value {
2024        Value::Null => Literal::Null,
2025        Value::Bool(v) => Literal::Bool(v),
2026        Value::Int64(v) => Literal::Integer(v),
2027        Value::Float64(v) => Literal::Real(v),
2028        Value::Text(v) => Literal::Text(v),
2029        Value::Uuid(v) => Literal::Text(v.to_string()),
2030        Value::Timestamp(v) => Literal::Integer(v),
2031        other => {
2032            return Err(Error::PlanError(format!(
2033                "unsupported subquery result value: {:?}",
2034                other
2035            )));
2036        }
2037    }))
2038}
2039
2040fn query_result_row_matches(
2041    row: &[Value],
2042    columns: &[String],
2043    expr: &Expr,
2044    params: &HashMap<String, Value>,
2045) -> Result<bool> {
2046    Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
2047}
2048
2049fn eval_query_result_bool_expr(
2050    row: &[Value],
2051    columns: &[String],
2052    expr: &Expr,
2053    params: &HashMap<String, Value>,
2054) -> Result<Option<bool>> {
2055    match expr {
2056        Expr::BinaryOp { left, op, right } => match op {
2057            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
2058                let left = eval_query_result_expr(left, row, columns, params)?;
2059                let right = eval_query_result_expr(right, row, columns, params)?;
2060                if left == Value::Null || right == Value::Null {
2061                    return Ok(None);
2062                }
2063
2064                let result = match op {
2065                    BinOp::Eq => {
2066                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
2067                    }
2068                    BinOp::Neq => {
2069                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
2070                    }
2071                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
2072                    BinOp::Lte => matches!(
2073                        compare_values(&left, &right),
2074                        Some(Ordering::Less | Ordering::Equal)
2075                    ),
2076                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
2077                    BinOp::Gte => matches!(
2078                        compare_values(&left, &right),
2079                        Some(Ordering::Greater | Ordering::Equal)
2080                    ),
2081                    BinOp::And | BinOp::Or => unreachable!(),
2082                };
2083                Ok(Some(result))
2084            }
2085            BinOp::And => {
2086                let left = eval_query_result_bool_expr(row, columns, left, params)?;
2087                if left == Some(false) {
2088                    return Ok(Some(false));
2089                }
2090                let right = eval_query_result_bool_expr(row, columns, right, params)?;
2091                Ok(match (left, right) {
2092                    (Some(true), Some(true)) => Some(true),
2093                    (Some(true), other) => other,
2094                    (None, Some(false)) => Some(false),
2095                    (None, Some(true)) | (None, None) => None,
2096                    (Some(false), _) => Some(false),
2097                })
2098            }
2099            BinOp::Or => {
2100                let left = eval_query_result_bool_expr(row, columns, left, params)?;
2101                if left == Some(true) {
2102                    return Ok(Some(true));
2103                }
2104                let right = eval_query_result_bool_expr(row, columns, right, params)?;
2105                Ok(match (left, right) {
2106                    (Some(false), Some(false)) => Some(false),
2107                    (Some(false), other) => other,
2108                    (None, Some(true)) => Some(true),
2109                    (None, Some(false)) | (None, None) => None,
2110                    (Some(true), _) => Some(true),
2111                })
2112            }
2113        },
2114        Expr::UnaryOp {
2115            op: UnaryOp::Not,
2116            operand,
2117        } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
2118        Expr::InList {
2119            expr,
2120            list,
2121            negated,
2122        } => {
2123            let needle = eval_query_result_expr(expr, row, columns, params)?;
2124            if needle == Value::Null {
2125                return Ok(None);
2126            }
2127
2128            let matched = list.iter().try_fold(false, |found, item| {
2129                if found {
2130                    Ok(true)
2131                } else {
2132                    let candidate = eval_query_result_expr(item, row, columns, params)?;
2133                    Ok(
2134                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2135                            || (candidate != Value::Null && needle == candidate),
2136                    )
2137                }
2138            })?;
2139            Ok(Some(if *negated { !matched } else { matched }))
2140        }
2141        Expr::InSubquery { .. } => Err(Error::PlanError(
2142            "IN (subquery) must be resolved before execution".to_string(),
2143        )),
2144        Expr::Like {
2145            expr,
2146            pattern,
2147            negated,
2148        } => {
2149            let left = eval_query_result_expr(expr, row, columns, params)?;
2150            let right = eval_query_result_expr(pattern, row, columns, params)?;
2151            let matched = match (left, right) {
2152                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2153                _ => false,
2154            };
2155            Ok(Some(if *negated { !matched } else { matched }))
2156        }
2157        Expr::IsNull { expr, negated } => {
2158            let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
2159            Ok(Some(if *negated { !is_null } else { is_null }))
2160        }
2161        Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
2162            Value::Bool(value) => Ok(Some(value)),
2163            Value::Null => Ok(None),
2164            _ => Err(Error::PlanError(format!(
2165                "unsupported WHERE expression: {:?}",
2166                expr
2167            ))),
2168        },
2169        _ => Err(Error::PlanError(format!(
2170            "unsupported WHERE expression: {:?}",
2171            expr
2172        ))),
2173    }
2174}
2175
2176fn lookup_query_result_column(
2177    row: &[Value],
2178    input_columns: &[String],
2179    column_ref: &ColumnRef,
2180) -> Result<Value> {
2181    if let Some(table) = &column_ref.table {
2182        let qualified = format!("{table}.{}", column_ref.column);
2183        // Prioritize qualified match (e.g., "e.id") over unqualified (e.g., "id")
2184        // to avoid picking the wrong table's column in JOINs.
2185        let idx = input_columns
2186            .iter()
2187            .position(|name| name == &qualified)
2188            .or_else(|| {
2189                input_columns
2190                    .iter()
2191                    .position(|name| name == &column_ref.column)
2192            })
2193            .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
2194        return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
2195    }
2196
2197    let matches = input_columns
2198        .iter()
2199        .enumerate()
2200        .filter_map(|(idx, name)| {
2201            (name == &column_ref.column
2202                || name.rsplit('.').next() == Some(column_ref.column.as_str()))
2203            .then_some(idx)
2204        })
2205        .collect::<Vec<_>>();
2206
2207    match matches.as_slice() {
2208        [] => Err(Error::PlanError(format!(
2209            "project column not found: {}",
2210            column_ref.column
2211        ))),
2212        [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
2213        _ => Err(Error::PlanError(format!(
2214            "ambiguous column reference: {}",
2215            column_ref.column
2216        ))),
2217    }
2218}
2219
2220fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
2221    let mut combined = Vec::with_capacity(left.len() + right.len());
2222    combined.extend_from_slice(left);
2223    combined.extend_from_slice(right);
2224    combined
2225}
2226
2227fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
2228    let left_names = left
2229        .iter()
2230        .map(|column| column.rsplit('.').next().unwrap_or(column.as_str()))
2231        .collect::<BTreeSet<_>>();
2232    right
2233        .iter()
2234        .filter_map(|column| {
2235            let bare = column.rsplit('.').next().unwrap_or(column.as_str());
2236            left_names.contains(bare).then(|| bare.to_string())
2237        })
2238        .collect()
2239}
2240
2241fn qualify_join_columns(
2242    columns: &[String],
2243    left_columns: &[String],
2244    right_columns: &[String],
2245    left_alias: &Option<String>,
2246    right_prefix: &str,
2247) -> Vec<String> {
2248    let left_prefix = left_alias.as_deref();
2249    columns
2250        .iter()
2251        .enumerate()
2252        .map(|(idx, column)| {
2253            if idx < left_columns.len() {
2254                if let Some(prefix) = left_prefix {
2255                    format!(
2256                        "{prefix}.{}",
2257                        left_columns[idx].rsplit('.').next().unwrap_or(column)
2258                    )
2259                } else {
2260                    left_columns[idx].clone()
2261                }
2262            } else {
2263                let right_idx = idx - left_columns.len();
2264                let bare = right_columns[right_idx]
2265                    .rsplit('.')
2266                    .next()
2267                    .unwrap_or(right_columns[right_idx].as_str());
2268                if column == bare {
2269                    format!("{right_prefix}.{bare}")
2270                } else {
2271                    column.clone()
2272                }
2273            }
2274        })
2275        .collect()
2276}
2277
2278fn right_table_name(plan: &PhysicalPlan) -> String {
2279    match plan {
2280        PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
2281        _ => "right".to_string(),
2282    }
2283}
2284
2285fn distinct_row_key(row: &[Value]) -> Vec<u8> {
2286    bincode::serde::encode_to_vec(row, bincode::config::standard())
2287        .expect("query rows should serialize for DISTINCT")
2288}
2289
2290fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
2291    match resolve_expr(expr, params)? {
2292        Value::Uuid(u) => Ok(u),
2293        Value::Text(t) => uuid::Uuid::parse_str(&t)
2294            .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
2295        _ => Err(Error::PlanError(
2296            "graph start node must be UUID".to_string(),
2297        )),
2298    }
2299}
2300
2301/// Resolve start nodes for a graph BFS from a WHERE filter like `a.name = 'entity-0'`.
2302/// Scans all relational tables for rows matching the filter condition.
2303fn resolve_graph_start_nodes_from_filter(
2304    db: &Database,
2305    filter: &Expr,
2306    params: &HashMap<String, Value>,
2307) -> Result<Vec<uuid::Uuid>> {
2308    if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
2309        return Ok(ids);
2310    }
2311
2312    // Extract column name and expected value from the filter (e.g., a.name = 'entity-0')
2313    let (col_name, expected_value) = match filter {
2314        Expr::BinaryOp {
2315            left,
2316            op: BinOp::Eq,
2317            right,
2318        } => {
2319            if let Some(col) = extract_column_name(left) {
2320                (col, resolve_expr(right, params)?)
2321            } else if let Some(col) = extract_column_name(right) {
2322                (col, resolve_expr(left, params)?)
2323            } else {
2324                return Ok(vec![]);
2325            }
2326        }
2327        _ => return Ok(vec![]),
2328    };
2329
2330    let snapshot = db.snapshot();
2331    let mut uuids = Vec::new();
2332    for table_name in db.table_names() {
2333        let meta = match db.table_meta(&table_name) {
2334            Some(m) => m,
2335            None => continue,
2336        };
2337        // Only scan tables that have the referenced column and an id column
2338        let has_col = meta.columns.iter().any(|c| c.name == col_name);
2339        let has_id = meta.columns.iter().any(|c| c.name == "id");
2340        if !has_col || !has_id {
2341            continue;
2342        }
2343        let rows = db.scan_filter(&table_name, snapshot, &|row| {
2344            row.values.get(&col_name) == Some(&expected_value)
2345        })?;
2346        for row in rows {
2347            if let Some(Value::Uuid(id)) = row.values.get("id") {
2348                uuids.push(*id);
2349            }
2350        }
2351    }
2352    Ok(uuids)
2353}
2354
2355fn resolve_graph_start_nodes_from_plan(
2356    db: &Database,
2357    plan: &PhysicalPlan,
2358    params: &HashMap<String, Value>,
2359    tx: Option<TxId>,
2360) -> Result<Vec<uuid::Uuid>> {
2361    let result = execute_plan(db, plan, params, tx)?;
2362    result
2363        .rows
2364        .into_iter()
2365        .filter_map(|row| row.into_iter().next())
2366        .map(|value| match value {
2367            Value::Uuid(id) => Ok(id),
2368            Value::Text(text) => uuid::Uuid::parse_str(&text)
2369                .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
2370            other => Err(Error::PlanError(format!(
2371                "invalid graph start identifier from plan: {other:?}"
2372            ))),
2373        })
2374        .collect()
2375}
2376
2377fn resolve_graph_start_ids_from_filter(
2378    filter: &Expr,
2379    params: &HashMap<String, Value>,
2380) -> Result<Option<Vec<uuid::Uuid>>> {
2381    match filter {
2382        Expr::BinaryOp {
2383            left,
2384            op: BinOp::Eq,
2385            right,
2386        } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
2387            let value = if is_graph_id_ref(left) {
2388                resolve_expr(right, params)?
2389            } else {
2390                resolve_expr(left, params)?
2391            };
2392            let id = match value {
2393                Value::Uuid(id) => id,
2394                Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2395                    Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2396                })?,
2397                other => {
2398                    return Err(Error::PlanError(format!(
2399                        "invalid graph start identifier in filter: {other:?}"
2400                    )));
2401                }
2402            };
2403            Ok(Some(vec![id]))
2404        }
2405        Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
2406            let ids = list
2407                .iter()
2408                .map(|item| resolve_expr(item, params))
2409                .map(|value| match value? {
2410                    Value::Uuid(id) => Ok(id),
2411                    Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2412                        Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2413                    }),
2414                    other => Err(Error::PlanError(format!(
2415                        "invalid graph start identifier in filter: {other:?}"
2416                    ))),
2417                })
2418                .collect::<Result<Vec<_>>>()?;
2419            Ok(Some(ids))
2420        }
2421        Expr::BinaryOp { left, right, .. } => {
2422            if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
2423                return Ok(Some(ids));
2424            }
2425            resolve_graph_start_ids_from_filter(right, params)
2426        }
2427        Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
2428        _ => Ok(None),
2429    }
2430}
2431
2432fn is_graph_id_ref(expr: &Expr) -> bool {
2433    matches!(
2434        expr,
2435        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
2436    )
2437}
2438
2439/// Extract a bare column name from an Expr::Column, ignoring table alias.
2440fn extract_column_name(expr: &Expr) -> Option<String> {
2441    match expr {
2442        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
2443        _ => None,
2444    }
2445}
2446
2447fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
2448    match resolve_expr(expr, params)? {
2449        Value::Vector(v) => Ok(v),
2450        Value::Text(name) => match params.get(&name) {
2451            Some(Value::Vector(v)) => Ok(v.clone()),
2452            _ => Err(Error::PlanError("vector parameter missing".to_string())),
2453        },
2454        _ => Err(Error::PlanError(
2455            "invalid vector query expression".to_string(),
2456        )),
2457    }
2458}
2459
2460fn validate_vector_columns(
2461    db: &Database,
2462    table: &str,
2463    values: &HashMap<String, Value>,
2464) -> Result<()> {
2465    let Some(meta) = db.table_meta(table) else {
2466        return Ok(());
2467    };
2468
2469    for column in &meta.columns {
2470        if let contextdb_core::ColumnType::Vector(expected) = column.column_type
2471            && let Some(Value::Vector(vector)) = values.get(&column.name)
2472        {
2473            let got = vector.len();
2474            if got != expected {
2475                return Err(Error::VectorDimensionMismatch { expected, got });
2476            }
2477        }
2478    }
2479
2480    Ok(())
2481}
2482
2483fn vector_value_for_table<'a>(
2484    db: &Database,
2485    table: &str,
2486    values: &'a HashMap<String, Value>,
2487) -> Option<&'a Vec<f32>> {
2488    let meta = db.table_meta(table)?;
2489    meta.columns
2490        .iter()
2491        .find_map(|column| match column.column_type {
2492            contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
2493                Some(Value::Vector(vector)) => Some(vector),
2494                _ => None,
2495            },
2496            _ => None,
2497        })
2498}
2499
2500fn coerce_value_for_column(db: &Database, table: &str, col: &str, v: Value) -> Result<Value> {
2501    let Some(meta) = db.table_meta(table) else {
2502        return Ok(coerce_uuid_if_needed(col, v));
2503    };
2504    let Some(column) = meta.columns.iter().find(|c| c.name == col) else {
2505        return Ok(coerce_uuid_if_needed(col, v));
2506    };
2507
2508    match column.column_type {
2509        contextdb_core::ColumnType::Uuid => coerce_uuid_value(v),
2510        contextdb_core::ColumnType::Timestamp => coerce_timestamp_value(v),
2511        contextdb_core::ColumnType::Vector(dim) => coerce_vector_value(v, dim),
2512        _ => Ok(coerce_uuid_if_needed(col, v)),
2513    }
2514}
2515
2516fn coerce_uuid_value(v: Value) -> Result<Value> {
2517    match v {
2518        Value::Null => Ok(Value::Null),
2519        Value::Uuid(id) => Ok(Value::Uuid(id)),
2520        Value::Text(text) => uuid::Uuid::parse_str(&text)
2521            .map(Value::Uuid)
2522            .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
2523        other => Err(Error::Other(format!(
2524            "UUID column requires UUID or text literal, got {other:?}"
2525        ))),
2526    }
2527}
2528
2529fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
2530    if (col == "id" || col.ends_with("_id"))
2531        && let Value::Text(s) = &v
2532        && let Ok(u) = uuid::Uuid::parse_str(s)
2533    {
2534        return Value::Uuid(u);
2535    }
2536    v
2537}
2538
2539fn coerce_timestamp_value(v: Value) -> Result<Value> {
2540    match v {
2541        Value::Null => Ok(Value::Null),
2542        Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
2543            Ok(Value::Timestamp(i64::MAX))
2544        }
2545        Value::Text(text) => {
2546            let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
2547                Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
2548            })?;
2549            Ok(Value::Timestamp(
2550                parsed.unix_timestamp_nanos() as i64 / 1_000_000,
2551            ))
2552        }
2553        other => Ok(other),
2554    }
2555}
2556
2557fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
2558    let vector = match v {
2559        Value::Null => return Ok(Value::Null),
2560        Value::Vector(vector) => vector,
2561        Value::Text(text) => parse_text_vector_literal(&text)?,
2562        other => return Ok(other),
2563    };
2564
2565    if vector.len() != expected_dim {
2566        return Err(Error::VectorDimensionMismatch {
2567            expected: expected_dim,
2568            got: vector.len(),
2569        });
2570    }
2571
2572    Ok(Value::Vector(vector))
2573}
2574
2575fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
2576    let trimmed = text.trim();
2577    let inner = trimmed
2578        .strip_prefix('[')
2579        .and_then(|s| s.strip_suffix(']'))
2580        .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
2581
2582    if inner.trim().is_empty() {
2583        return Ok(Vec::new());
2584    }
2585
2586    inner
2587        .split(',')
2588        .map(|part| {
2589            part.trim().parse::<f32>().map_err(|err| {
2590                Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
2591            })
2592        })
2593        .collect()
2594}
2595
2596fn apply_missing_column_defaults(
2597    db: &Database,
2598    table: &str,
2599    values: &mut HashMap<String, Value>,
2600) -> Result<()> {
2601    let Some(meta) = db.table_meta(table) else {
2602        return Ok(());
2603    };
2604
2605    for column in &meta.columns {
2606        if values.contains_key(&column.name) {
2607            continue;
2608        }
2609        let Some(default) = &column.default else {
2610            continue;
2611        };
2612        let value = evaluate_stored_default_expr(default)?;
2613        values.insert(
2614            column.name.clone(),
2615            coerce_value_for_column(db, table, &column.name, value)?,
2616        );
2617    }
2618
2619    Ok(())
2620}
2621
2622fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
2623    if default.eq_ignore_ascii_case("NOW()") {
2624        return eval_function("now", &[]);
2625    }
2626    if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
2627        return eval_function("now", &[]);
2628    }
2629    if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
2630        return Ok(Value::Null);
2631    }
2632    if default.eq_ignore_ascii_case("TRUE") {
2633        return Ok(Value::Bool(true));
2634    }
2635    if default.eq_ignore_ascii_case("FALSE") {
2636        return Ok(Value::Bool(false));
2637    }
2638    if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
2639        return Ok(Value::Text(
2640            default[1..default.len() - 1].replace("''", "'"),
2641        ));
2642    }
2643    if let Some(text) = default
2644        .strip_prefix("Literal(Text(\"")
2645        .and_then(|value| value.strip_suffix("\"))"))
2646    {
2647        return Ok(Value::Text(text.to_string()));
2648    }
2649    if let Some(value) = default
2650        .strip_prefix("Literal(Integer(")
2651        .and_then(|value| value.strip_suffix("))"))
2652    {
2653        let parsed = value.parse::<i64>().map_err(|err| {
2654            Error::Other(format!("invalid stored integer default '{value}': {err}"))
2655        })?;
2656        return Ok(Value::Int64(parsed));
2657    }
2658    if let Some(value) = default
2659        .strip_prefix("Literal(Real(")
2660        .and_then(|value| value.strip_suffix("))"))
2661    {
2662        let parsed = value
2663            .parse::<f64>()
2664            .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
2665        return Ok(Value::Float64(parsed));
2666    }
2667    if let Some(value) = default
2668        .strip_prefix("Literal(Bool(")
2669        .and_then(|value| value.strip_suffix("))"))
2670    {
2671        let parsed = value
2672            .parse::<bool>()
2673            .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
2674        return Ok(Value::Bool(parsed));
2675    }
2676
2677    Err(Error::Other(format!(
2678        "unsupported stored DEFAULT expression: {default}"
2679    )))
2680}
2681
2682pub(crate) fn stored_default_expr(expr: &Expr) -> String {
2683    match expr {
2684        Expr::Literal(Literal::Null) => "NULL".to_string(),
2685        Expr::Literal(Literal::Bool(value)) => {
2686            if *value {
2687                "TRUE".to_string()
2688            } else {
2689                "FALSE".to_string()
2690            }
2691        }
2692        Expr::Literal(Literal::Integer(value)) => value.to_string(),
2693        Expr::Literal(Literal::Real(value)) => value.to_string(),
2694        Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
2695        Expr::FunctionCall { name, args }
2696            if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
2697        {
2698            "NOW()".to_string()
2699        }
2700        _ => format!("{expr:?}"),
2701    }
2702}
2703
2704fn should_route_insert_to_graph(db: &Database, table: &str) -> bool {
2705    table.eq_ignore_ascii_case("edges")
2706        || db
2707            .table_meta(table)
2708            .is_some_and(|table_meta| !table_meta.dag_edge_types.is_empty())
2709}
2710
2711fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
2712    if col.expires && !matches!(col.data_type, DataType::Timestamp) {
2713        return Err(Error::Other(
2714            "EXPIRES is only valid on TIMESTAMP columns".to_string(),
2715        ));
2716    }
2717    Ok(())
2718}
2719
2720fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
2721    let mut expires_column = None;
2722    for col in columns {
2723        validate_expires_column(col)?;
2724        if col.expires {
2725            if expires_column.is_some() {
2726                return Err(Error::Other(
2727                    "only one EXPIRES column is supported per table".to_string(),
2728                ));
2729            }
2730            expires_column = Some(col.name.clone());
2731        }
2732    }
2733    Ok(expires_column)
2734}
2735
2736pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
2737    match dtype {
2738        DataType::Uuid => contextdb_core::ColumnType::Uuid,
2739        DataType::Text => contextdb_core::ColumnType::Text,
2740        DataType::Integer => contextdb_core::ColumnType::Integer,
2741        DataType::Real => contextdb_core::ColumnType::Real,
2742        DataType::Boolean => contextdb_core::ColumnType::Boolean,
2743        DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
2744        DataType::Json => contextdb_core::ColumnType::Json,
2745        DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
2746    }
2747}
2748
2749fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
2750    match s {
2751        "latest_wins" => Ok(ConflictPolicy::LatestWins),
2752        "server_wins" => Ok(ConflictPolicy::ServerWins),
2753        "edge_wins" => Ok(ConflictPolicy::EdgeWins),
2754        "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
2755        _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
2756    }
2757}
2758
2759fn conflict_policy_to_string(p: ConflictPolicy) -> String {
2760    match p {
2761        ConflictPolicy::LatestWins => "latest_wins".to_string(),
2762        ConflictPolicy::ServerWins => "server_wins".to_string(),
2763        ConflictPolicy::EdgeWins => "edge_wins".to_string(),
2764        ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
2765    }
2766}
2767
2768fn project_graph_frontier_rows(
2769    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2770    start_alias: &str,
2771    steps: &[GraphStepPlan],
2772) -> Result<Vec<Vec<Value>>> {
2773    frontier
2774        .into_iter()
2775        .map(|(bindings, id, depth)| {
2776            let mut row = Vec::with_capacity(steps.len() + 3);
2777            let start_id = bindings.get(start_alias).ok_or_else(|| {
2778                Error::PlanError(format!(
2779                    "graph frontier missing required start alias binding '{start_alias}'"
2780                ))
2781            })?;
2782            row.push(Value::Uuid(*start_id));
2783            for step in steps {
2784                let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
2785                    Error::PlanError(format!(
2786                        "graph frontier missing required target alias binding '{}'",
2787                        step.target_alias
2788                    ))
2789                })?;
2790                row.push(Value::Uuid(*target_id));
2791            }
2792            row.push(Value::Uuid(id));
2793            row.push(Value::Int64(depth as i64));
2794            Ok(row)
2795        })
2796        .collect()
2797}
2798
2799#[cfg(test)]
2800mod tests {
2801    use super::*;
2802    use contextdb_planner::GraphStepPlan;
2803    use uuid::Uuid;
2804
2805    #[test]
2806    fn graph_01_frontier_projection_requires_complete_bindings() {
2807        let steps = vec![GraphStepPlan {
2808            edge_types: vec!["EDGE".to_string()],
2809            direction: Direction::Outgoing,
2810            min_depth: 1,
2811            max_depth: 1,
2812            target_alias: "b".to_string(),
2813        }];
2814
2815        let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];
2816        let missing_target = vec![(
2817            HashMap::from([("a".to_string(), Uuid::new_v4())]),
2818            Uuid::new_v4(),
2819            0,
2820        )];
2821
2822        let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
2823        assert!(
2824            matches!(start_result, Err(Error::PlanError(_))),
2825            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
2826        );
2827
2828        let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
2829        assert!(
2830            matches!(target_result, Err(Error::PlanError(_))),
2831            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
2832        );
2833    }
2834}