Skip to main content

contextdb_parser/
parser.rs

1use crate::ast::*;
2use contextdb_core::{Error, Result};
3use pest::Parser;
4use pest::iterators::Pair;
5use pest_derive::Parser;
6
/// Parser type whose implementation is derived by `pest_derive` from the
/// grammar file `grammar.pest`; it also provides the `Rule` enum that the
/// builder functions in this file match on.
#[derive(Parser)]
#[grammar = "grammar.pest"]
struct ContextDbParser;
10
11pub fn parse(input: &str) -> Result<Statement> {
12    let sql = input.trim();
13
14    if starts_with_keywords(sql, &["CREATE", "PROCEDURE"])
15        || starts_with_keywords(sql, &["CREATE", "FUNCTION"])
16    {
17        return Err(Error::StoredProcNotSupported);
18    }
19    if starts_with_keywords(sql, &["WITH", "RECURSIVE"]) {
20        return Err(Error::RecursiveCteNotSupported);
21    }
22    if contains_keyword_sequence_outside_strings(sql, &["GROUP", "BY"]) {
23        return Err(Error::ParseError("GROUP BY is not supported".to_string()));
24    }
25    if contains_token_outside_strings(sql, "OVER") {
26        return Err(Error::WindowFunctionNotSupported);
27    }
28    if contains_where_match_operator(sql) {
29        return Err(Error::FullTextSearchNotSupported);
30    }
31
32    let mut pairs = ContextDbParser::parse(Rule::statement, sql)
33        .map_err(|e| Error::ParseError(e.to_string()))?;
34    let statement = pairs
35        .next()
36        .ok_or_else(|| Error::ParseError("empty statement".to_string()))?;
37    let inner = statement
38        .into_inner()
39        .next()
40        .ok_or_else(|| Error::ParseError("missing statement body".to_string()))?;
41
42    let stmt = match inner.as_rule() {
43        Rule::begin_stmt => Statement::Begin,
44        Rule::commit_stmt => Statement::Commit,
45        Rule::rollback_stmt => Statement::Rollback,
46        Rule::create_table_stmt => Statement::CreateTable(build_create_table(inner)?),
47        Rule::alter_table_stmt => Statement::AlterTable(build_alter_table(inner)?),
48        Rule::drop_table_stmt => Statement::DropTable(build_drop_table(inner)?),
49        Rule::create_index_stmt => Statement::CreateIndex(build_create_index(inner)?),
50        Rule::drop_index_stmt => Statement::DropIndex(build_drop_index(inner)?),
51        Rule::insert_stmt => Statement::Insert(build_insert(inner)?),
52        Rule::delete_stmt => Statement::Delete(build_delete(inner)?),
53        Rule::update_stmt => Statement::Update(build_update(inner)?),
54        Rule::select_stmt => Statement::Select(build_select(inner)?),
55        Rule::set_sync_conflict_policy => {
56            let policy = inner
57                .into_inner()
58                .find(|p| p.as_rule() == Rule::conflict_policy_value)
59                .ok_or_else(|| Error::ParseError("missing conflict policy value".to_string()))?
60                .as_str()
61                .to_lowercase();
62            Statement::SetSyncConflictPolicy(policy)
63        }
64        Rule::show_sync_conflict_policy => Statement::ShowSyncConflictPolicy,
65        Rule::show_vector_indexes_stmt => Statement::ShowVectorIndexes,
66        Rule::set_memory_limit => Statement::SetMemoryLimit(build_set_memory_limit(inner)?),
67        Rule::show_memory_limit => Statement::ShowMemoryLimit,
68        Rule::set_disk_limit => Statement::SetDiskLimit(build_set_disk_limit(inner)?),
69        Rule::show_disk_limit => Statement::ShowDiskLimit,
70        _ => return Err(Error::ParseError("unsupported statement".to_string())),
71    };
72
73    validate_statement(&stmt)?;
74    Ok(stmt)
75}
76
77fn build_select(pair: Pair<'_, Rule>) -> Result<SelectStatement> {
78    let mut ctes = Vec::new();
79    let mut body = None;
80
81    for p in pair.into_inner() {
82        match p.as_rule() {
83            Rule::with_clause => {
84                for item in p.into_inner() {
85                    match item.as_rule() {
86                        Rule::recursive_kw => return Err(Error::RecursiveCteNotSupported),
87                        Rule::cte_def => ctes.push(build_cte(item)?),
88                        other => return Err(unexpected_rule(other, "build_select.with_clause")),
89                    }
90                }
91            }
92            Rule::select_core => body = Some(build_select_core(p)?),
93            other => return Err(unexpected_rule(other, "build_select")),
94        }
95    }
96
97    Ok(SelectStatement {
98        ctes,
99        body: body.ok_or_else(|| Error::ParseError("missing SELECT body".to_string()))?,
100    })
101}
102
103fn build_cte(pair: Pair<'_, Rule>) -> Result<Cte> {
104    let mut name = None;
105    let mut query = None;
106
107    for p in pair.into_inner() {
108        match p.as_rule() {
109            Rule::identifier if name.is_none() => name = Some(parse_identifier(p.as_str())),
110            Rule::select_core => query = Some(build_select_core(p)?),
111            other => return Err(unexpected_rule(other, "build_cte")),
112        }
113    }
114
115    Ok(Cte::SqlCte {
116        name: name.ok_or_else(|| Error::ParseError("CTE missing name".to_string()))?,
117        query: query.ok_or_else(|| Error::ParseError("CTE missing query".to_string()))?,
118    })
119}
120
121fn build_select_core(pair: Pair<'_, Rule>) -> Result<SelectBody> {
122    let mut distinct = false;
123    let mut columns = Vec::new();
124    let mut from = Vec::new();
125    let mut joins = Vec::new();
126    let mut where_clause = None;
127    let mut order_by = Vec::new();
128    let mut use_rank = None;
129    let mut limit = None;
130
131    for p in pair.into_inner() {
132        match p.as_rule() {
133            Rule::distinct_kw => distinct = true,
134            Rule::select_list => {
135                columns = build_select_list(p)?;
136            }
137            Rule::from_clause => {
138                from = build_from_clause(p)?;
139            }
140            Rule::join_clause => {
141                joins.push(build_join_clause(p)?);
142            }
143            Rule::where_clause => {
144                where_clause = Some(build_where_clause(p)?);
145            }
146            Rule::order_by_clause => {
147                order_by = build_order_by_clause(p)?;
148            }
149            Rule::use_rank_clause => {
150                use_rank = Some(build_use_rank_clause(p)?);
151            }
152            Rule::limit_clause => {
153                limit = Some(build_limit_clause(p)?);
154            }
155            other => return Err(unexpected_rule(other, "build_select_core")),
156        }
157    }
158
159    Ok(SelectBody {
160        distinct,
161        columns,
162        from,
163        joins,
164        where_clause,
165        order_by,
166        use_rank,
167        limit,
168    })
169}
170
171fn build_select_list(pair: Pair<'_, Rule>) -> Result<Vec<SelectColumn>> {
172    let mut cols = Vec::new();
173
174    for p in pair.into_inner() {
175        match p.as_rule() {
176            Rule::star => cols.push(SelectColumn {
177                expr: Expr::Column(ColumnRef {
178                    table: None,
179                    column: "*".to_string(),
180                }),
181                alias: None,
182            }),
183            Rule::select_item => cols.push(build_select_item(p)?),
184            other => return Err(unexpected_rule(other, "build_select_list")),
185        }
186    }
187
188    Ok(cols)
189}
190
191fn build_select_item(pair: Pair<'_, Rule>) -> Result<SelectColumn> {
192    let mut expr = None;
193    let mut alias = None;
194
195    for p in pair.into_inner() {
196        match p.as_rule() {
197            Rule::expr => expr = Some(build_expr(p)?),
198            Rule::identifier => alias = Some(parse_identifier(p.as_str())),
199            other => return Err(unexpected_rule(other, "build_select_item")),
200        }
201    }
202
203    Ok(SelectColumn {
204        expr: expr
205            .ok_or_else(|| Error::ParseError("SELECT item missing expression".to_string()))?,
206        alias,
207    })
208}
209
210fn build_from_clause(pair: Pair<'_, Rule>) -> Result<Vec<FromItem>> {
211    let mut items = Vec::new();
212    for p in pair.into_inner() {
213        if p.as_rule() == Rule::from_item {
214            items.push(build_from_item(p)?);
215        }
216    }
217    Ok(items)
218}
219
220fn build_from_item(pair: Pair<'_, Rule>) -> Result<FromItem> {
221    let inner = pair
222        .into_inner()
223        .next()
224        .ok_or_else(|| Error::ParseError("missing FROM item".to_string()))?;
225
226    match inner.as_rule() {
227        Rule::table_ref => build_table_ref(inner),
228        Rule::graph_table => build_graph_table(inner),
229        _ => Err(Error::ParseError("invalid FROM item".to_string())),
230    }
231}
232
233fn build_join_clause(pair: Pair<'_, Rule>) -> Result<JoinClause> {
234    let mut join_type = None;
235    let mut table = None;
236    let mut alias = None;
237    let mut on = None;
238
239    for p in pair.into_inner() {
240        match p.as_rule() {
241            Rule::join_type => {
242                join_type = Some(if p.as_str().to_ascii_uppercase().starts_with("LEFT") {
243                    JoinType::Left
244                } else {
245                    JoinType::Inner
246                });
247            }
248            Rule::join_table_ref => {
249                let mut inner = p.into_inner();
250                table = Some(parse_identifier(inner.next().unwrap().as_str()));
251                if let Some(alias_pair) = inner.next() {
252                    alias = Some(parse_identifier(alias_pair.as_str()));
253                }
254            }
255            Rule::expr => on = Some(build_expr(p)?),
256            other => return Err(unexpected_rule(other, "build_join_clause")),
257        }
258    }
259
260    Ok(JoinClause {
261        join_type: join_type.ok_or_else(|| Error::ParseError("JOIN missing type".to_string()))?,
262        table: table.ok_or_else(|| Error::ParseError("JOIN missing table".to_string()))?,
263        alias,
264        on: on.ok_or_else(|| Error::ParseError("JOIN missing ON expression".to_string()))?,
265    })
266}
267
268fn build_table_ref(pair: Pair<'_, Rule>) -> Result<FromItem> {
269    let mut name = None;
270    let mut alias = None;
271
272    for part in pair.into_inner() {
273        match part.as_rule() {
274            Rule::identifier if name.is_none() => name = Some(parse_identifier(part.as_str())),
275            Rule::identifier | Rule::table_alias if alias.is_none() => {
276                alias = Some(parse_identifier(part.as_str()))
277            }
278            other => return Err(unexpected_rule(other, "build_table_ref")),
279        }
280    }
281
282    let name = name.ok_or_else(|| Error::ParseError("table name missing".to_string()))?;
283
284    Ok(FromItem::Table { name, alias })
285}
286
287fn build_graph_table(pair: Pair<'_, Rule>) -> Result<FromItem> {
288    let mut graph_name = None;
289    let mut pattern = None;
290    let mut where_clause = None;
291    let mut columns: Vec<GraphTableColumn> = Vec::new();
292
293    for p in pair.into_inner() {
294        match p.as_rule() {
295            Rule::graph_table_kw => {}
296            Rule::identifier if graph_name.is_none() => {
297                graph_name = Some(parse_identifier(p.as_str()))
298            }
299            Rule::graph_match_clause => pattern = Some(build_match_pattern(p)?),
300            Rule::graph_where_clause => {
301                let expr_pair = p
302                    .into_inner()
303                    .find(|i| i.as_rule() == Rule::expr)
304                    .ok_or_else(|| {
305                        Error::ParseError("MATCH WHERE missing expression".to_string())
306                    })?;
307                where_clause = Some(build_expr(expr_pair)?);
308            }
309            Rule::columns_clause => columns = build_columns_clause(p)?,
310            other => return Err(unexpected_rule(other, "build_graph_table")),
311        }
312    }
313
314    let graph_name = graph_name
315        .ok_or_else(|| Error::ParseError("GRAPH_TABLE requires graph name".to_string()))?;
316    let graph_pattern = pattern
317        .ok_or_else(|| Error::ParseError("GRAPH_TABLE missing MATCH pattern".to_string()))?;
318    let return_cols = columns
319        .iter()
320        .map(|c| ReturnCol {
321            expr: c.expr.clone(),
322            alias: Some(c.alias.clone()),
323        })
324        .collect::<Vec<_>>();
325
326    let match_clause = MatchClause {
327        graph_name: Some(graph_name.clone()),
328        pattern: graph_pattern,
329        where_clause,
330        return_cols,
331    };
332
333    Ok(FromItem::GraphTable {
334        graph_name,
335        match_clause,
336        columns,
337    })
338}
339
340fn build_match_pattern(pair: Pair<'_, Rule>) -> Result<GraphPattern> {
341    let inner = pair
342        .into_inner()
343        .find(|p| p.as_rule() == Rule::graph_pattern)
344        .ok_or_else(|| Error::ParseError("MATCH pattern missing".to_string()))?;
345
346    let mut nodes_and_edges = inner.into_inner();
347    let start_pair = nodes_and_edges
348        .next()
349        .ok_or_else(|| Error::ParseError("pattern start node missing".to_string()))?;
350    let start = build_node_pattern(start_pair)?;
351
352    let mut edges = Vec::new();
353    for p in nodes_and_edges {
354        if p.as_rule() == Rule::edge_step {
355            edges.push(build_edge_step(p)?);
356        }
357    }
358
359    if edges.is_empty() {
360        return Err(Error::ParseError(
361            "MATCH requires at least one edge step".to_string(),
362        ));
363    }
364
365    Ok(GraphPattern { start, edges })
366}
367
368fn build_node_pattern(pair: Pair<'_, Rule>) -> Result<NodePattern> {
369    let mut alias = None;
370    let mut label = None;
371
372    for p in pair.into_inner() {
373        if p.as_rule() == Rule::identifier {
374            if alias.is_none() {
375                alias = Some(parse_identifier(p.as_str()));
376            } else if label.is_none() {
377                label = Some(parse_identifier(p.as_str()));
378            }
379        }
380    }
381
382    Ok(NodePattern {
383        alias: alias.unwrap_or_default(),
384        label,
385        properties: Vec::new(),
386    })
387}
388
389fn build_edge_step(pair: Pair<'_, Rule>) -> Result<EdgeStep> {
390    let edge = pair
391        .into_inner()
392        .next()
393        .ok_or_else(|| Error::ParseError("edge step missing".to_string()))?;
394
395    let (direction, inner_rule) = match edge.as_rule() {
396        Rule::outgoing_edge => (EdgeDirection::Outgoing, edge),
397        Rule::incoming_edge => (EdgeDirection::Incoming, edge),
398        Rule::both_edge => (EdgeDirection::Both, edge),
399        _ => return Err(Error::ParseError("invalid edge direction".to_string())),
400    };
401
402    let mut alias = None;
403    let mut edge_type = None;
404    let mut min_hops = 1_u32;
405    let mut max_hops = 1_u32;
406    let mut target = None;
407
408    for p in inner_rule.into_inner() {
409        match p.as_rule() {
410            Rule::edge_bracket => {
411                let (a, t) = build_edge_bracket(p)?;
412                alias = a;
413                edge_type = t;
414            }
415            Rule::quantifier => {
416                let (min, max) = build_quantifier(p)?;
417                min_hops = min;
418                max_hops = max;
419            }
420            Rule::node_pattern => target = Some(build_node_pattern(p)?),
421            other => return Err(unexpected_rule(other, "build_edge_step")),
422        }
423    }
424
425    Ok(EdgeStep {
426        direction,
427        edge_type,
428        min_hops,
429        max_hops,
430        alias,
431        target: target.ok_or_else(|| Error::ParseError("edge target node missing".to_string()))?,
432    })
433}
434
435fn build_edge_bracket(pair: Pair<'_, Rule>) -> Result<(Option<String>, Option<String>)> {
436    let mut alias = None;
437    let mut edge_type = None;
438
439    for p in pair.into_inner() {
440        if p.as_rule() == Rule::edge_spec {
441            let raw = p.as_str().trim().to_string();
442            let ids: Vec<String> = p
443                .into_inner()
444                .filter(|i| i.as_rule() == Rule::identifier)
445                .map(|i| parse_identifier(i.as_str()))
446                .collect();
447
448            if raw.starts_with(':') {
449                if let Some(t) = ids.first() {
450                    edge_type = Some(t.clone());
451                }
452            } else if ids.len() == 1 {
453                alias = Some(ids[0].clone());
454            } else if ids.len() >= 2 {
455                alias = Some(ids[0].clone());
456                edge_type = Some(ids[1].clone());
457            }
458        }
459    }
460
461    Ok((alias, edge_type))
462}
463
464fn build_quantifier(pair: Pair<'_, Rule>) -> Result<(u32, u32)> {
465    let inner = pair
466        .into_inner()
467        .next()
468        .ok_or_else(|| Error::ParseError("invalid quantifier".to_string()))?;
469
470    match inner.as_rule() {
471        Rule::plus_quantifier | Rule::star_quantifier => Ok((1, 0)),
472        Rule::bounded_quantifier => {
473            let nums: Vec<u32> = inner
474                .into_inner()
475                .filter(|p| p.as_rule() == Rule::integer)
476                .map(|p| parse_u32(p.as_str(), "invalid quantifier number"))
477                .collect::<Result<Vec<_>>>()?;
478
479            if nums.is_empty() {
480                return Err(Error::ParseError("invalid quantifier".to_string()));
481            }
482
483            let min = nums[0];
484            let max = if nums.len() > 1 { nums[1] } else { 0 };
485            Ok((min, max))
486        }
487        _ => Err(Error::ParseError("invalid quantifier".to_string())),
488    }
489}
490
491fn build_columns_clause(pair: Pair<'_, Rule>) -> Result<Vec<GraphTableColumn>> {
492    let mut cols = Vec::new();
493
494    for p in pair.into_inner() {
495        if p.as_rule() == Rule::graph_column {
496            let mut expr = None;
497            let mut alias = None;
498
499            for inner in p.into_inner() {
500                match inner.as_rule() {
501                    Rule::expr => expr = Some(build_expr(inner)?),
502                    Rule::identifier => alias = Some(parse_identifier(inner.as_str())),
503                    other => {
504                        return Err(unexpected_rule(other, "build_columns_clause.graph_column"));
505                    }
506                }
507            }
508
509            let expr = expr
510                .ok_or_else(|| Error::ParseError("COLUMNS item missing expression".to_string()))?;
511            let alias = alias.unwrap_or_else(|| match &expr {
512                Expr::Column(c) => c.column.clone(),
513                _ => "expr".to_string(),
514            });
515            cols.push(GraphTableColumn { expr, alias });
516        }
517    }
518
519    Ok(cols)
520}
521
522fn build_where_clause(pair: Pair<'_, Rule>) -> Result<Expr> {
523    let expr_pair = pair
524        .into_inner()
525        .find(|p| p.as_rule() == Rule::expr)
526        .ok_or_else(|| Error::ParseError("WHERE missing expression".to_string()))?;
527    build_expr(expr_pair)
528}
529
530fn build_order_by_clause(pair: Pair<'_, Rule>) -> Result<Vec<OrderByItem>> {
531    let mut items = Vec::new();
532    for p in pair.into_inner() {
533        if p.as_rule() == Rule::order_item {
534            items.push(build_order_item(p)?);
535        }
536    }
537    Ok(items)
538}
539
540fn build_order_item(pair: Pair<'_, Rule>) -> Result<OrderByItem> {
541    let mut direction = SortDirection::Asc;
542    let mut expr = None;
543
544    for p in pair.into_inner() {
545        match p.as_rule() {
546            Rule::cosine_expr => {
547                let mut it = p.into_inner();
548                let left = build_additive_expr(
549                    it.next()
550                        .ok_or_else(|| Error::ParseError("invalid cosine expr".to_string()))?,
551                )?;
552                let right = build_additive_expr(
553                    it.next()
554                        .ok_or_else(|| Error::ParseError("invalid cosine expr".to_string()))?,
555                )?;
556                expr = Some(Expr::CosineDistance {
557                    left: Box::new(left),
558                    right: Box::new(right),
559                });
560                direction = SortDirection::CosineDistance;
561            }
562            Rule::expr => expr = Some(build_expr(p)?),
563            Rule::sort_dir => {
564                direction = if p.as_str().eq_ignore_ascii_case("DESC") {
565                    SortDirection::Desc
566                } else {
567                    SortDirection::Asc
568                };
569            }
570            other => return Err(unexpected_rule(other, "build_order_item")),
571        }
572    }
573
574    Ok(OrderByItem {
575        expr: expr
576            .ok_or_else(|| Error::ParseError("ORDER BY item missing expression".to_string()))?,
577        direction,
578    })
579}
580
581fn build_limit_clause(pair: Pair<'_, Rule>) -> Result<u64> {
582    let num = pair
583        .into_inner()
584        .find(|p| p.as_rule() == Rule::integer)
585        .ok_or_else(|| Error::ParseError("LIMIT missing value".to_string()))?;
586    parse_u64(num.as_str(), "invalid LIMIT value")
587}
588
589fn build_use_rank_clause(pair: Pair<'_, Rule>) -> Result<String> {
590    pair.into_inner()
591        .find(|p| p.as_rule() == Rule::identifier)
592        .map(|p| parse_identifier(p.as_str()))
593        .ok_or_else(|| Error::ParseError("USE RANK missing sort key".to_string()))
594}
595
596fn build_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
597    let inner = pair
598        .into_inner()
599        .next()
600        .ok_or_else(|| Error::ParseError("invalid expression".to_string()))?;
601    build_or_expr(inner)
602}
603
604fn build_or_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
605    let mut inner = pair.into_inner();
606    let first = inner
607        .next()
608        .ok_or_else(|| Error::ParseError("invalid OR expression".to_string()))?;
609    let mut expr = build_and_expr(first)?;
610
611    while let Some(op_or_next) = inner.next() {
612        if op_or_next.as_rule() == Rule::or_op {
613            let rhs_pair = inner
614                .next()
615                .ok_or_else(|| Error::ParseError("OR missing right operand".to_string()))?;
616            let rhs = build_and_expr(rhs_pair)?;
617            expr = Expr::BinaryOp {
618                left: Box::new(expr),
619                op: BinOp::Or,
620                right: Box::new(rhs),
621            };
622        }
623    }
624
625    Ok(expr)
626}
627
628fn build_and_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
629    let mut inner = pair.into_inner();
630    let first = inner
631        .next()
632        .ok_or_else(|| Error::ParseError("invalid AND expression".to_string()))?;
633    let mut expr = build_unary_bool_expr(first)?;
634
635    while let Some(op_or_next) = inner.next() {
636        if op_or_next.as_rule() == Rule::and_op {
637            let rhs_pair = inner
638                .next()
639                .ok_or_else(|| Error::ParseError("AND missing right operand".to_string()))?;
640            let rhs = build_unary_bool_expr(rhs_pair)?;
641            expr = Expr::BinaryOp {
642                left: Box::new(expr),
643                op: BinOp::And,
644                right: Box::new(rhs),
645            };
646        }
647    }
648
649    Ok(expr)
650}
651
652fn build_unary_bool_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
653    let mut not_count = 0usize;
654    let mut cmp = None;
655
656    for p in pair.into_inner() {
657        match p.as_rule() {
658            Rule::not_op => not_count += 1,
659            Rule::comparison_expr => cmp = Some(build_comparison_expr(p)?),
660            other => return Err(unexpected_rule(other, "build_unary_bool_expr")),
661        }
662    }
663
664    let mut expr =
665        cmp.ok_or_else(|| Error::ParseError("invalid unary boolean expression".to_string()))?;
666    for _ in 0..not_count {
667        expr = Expr::UnaryOp {
668            op: UnaryOp::Not,
669            operand: Box::new(expr),
670        };
671    }
672    Ok(expr)
673}
674
675fn build_comparison_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
676    let mut inner = pair.into_inner();
677    let left_pair = inner
678        .next()
679        .ok_or_else(|| Error::ParseError("comparison missing left operand".to_string()))?;
680    let left = build_additive_expr(left_pair)?;
681
682    if let Some(suffix) = inner.next() {
683        build_comparison_suffix(left, suffix)
684    } else {
685        Ok(left)
686    }
687}
688
689fn build_comparison_suffix(left: Expr, pair: Pair<'_, Rule>) -> Result<Expr> {
690    let suffix = pair
691        .into_inner()
692        .next()
693        .ok_or_else(|| Error::ParseError("invalid comparison suffix".to_string()))?;
694
695    match suffix.as_rule() {
696        Rule::cmp_suffix => {
697            let mut it = suffix.into_inner();
698            let op_pair = it
699                .next()
700                .ok_or_else(|| Error::ParseError("comparison missing operator".to_string()))?;
701            let rhs_pair = it
702                .next()
703                .ok_or_else(|| Error::ParseError("comparison missing right operand".to_string()))?;
704            let op = match op_pair.as_str() {
705                "=" => BinOp::Eq,
706                "!=" | "<>" => BinOp::Neq,
707                "<" => BinOp::Lt,
708                "<=" => BinOp::Lte,
709                ">" => BinOp::Gt,
710                ">=" => BinOp::Gte,
711                _ => {
712                    return Err(Error::ParseError(
713                        "unsupported comparison operator".to_string(),
714                    ));
715                }
716            };
717            let right = build_additive_expr(rhs_pair)?;
718            Ok(Expr::BinaryOp {
719                left: Box::new(left),
720                op,
721                right: Box::new(right),
722            })
723        }
724        Rule::is_null_suffix => {
725            let negated = suffix.into_inner().any(|p| p.as_rule() == Rule::not_op);
726            Ok(Expr::IsNull {
727                expr: Box::new(left),
728                negated,
729            })
730        }
731        Rule::like_suffix => {
732            let mut negated = false;
733            let mut pattern = None;
734            for p in suffix.into_inner() {
735                match p.as_rule() {
736                    Rule::not_op => negated = true,
737                    Rule::additive_expr => pattern = Some(build_additive_expr(p)?),
738                    other => return Err(unexpected_rule(other, "build_comparison_suffix.like")),
739                }
740            }
741            Ok(Expr::Like {
742                expr: Box::new(left),
743                pattern: Box::new(
744                    pattern.ok_or_else(|| Error::ParseError("LIKE missing pattern".to_string()))?,
745                ),
746                negated,
747            })
748        }
749        Rule::between_suffix => {
750            let mut negated = false;
751            let mut vals = Vec::new();
752            for p in suffix.into_inner() {
753                match p.as_rule() {
754                    Rule::not_op => negated = true,
755                    Rule::additive_expr => vals.push(build_additive_expr(p)?),
756                    other => {
757                        return Err(unexpected_rule(other, "build_comparison_suffix.between"));
758                    }
759                }
760            }
761
762            if vals.len() != 2 {
763                return Err(Error::ParseError(
764                    "BETWEEN requires lower and upper bounds".to_string(),
765                ));
766            }
767
768            let upper = vals.pop().expect("checked len");
769            let lower = vals.pop().expect("checked len");
770            let gte = Expr::BinaryOp {
771                left: Box::new(left.clone()),
772                op: BinOp::Gte,
773                right: Box::new(lower),
774            };
775            let lte = Expr::BinaryOp {
776                left: Box::new(left),
777                op: BinOp::Lte,
778                right: Box::new(upper),
779            };
780            let between = Expr::BinaryOp {
781                left: Box::new(gte),
782                op: BinOp::And,
783                right: Box::new(lte),
784            };
785
786            if negated {
787                Ok(Expr::UnaryOp {
788                    op: UnaryOp::Not,
789                    operand: Box::new(between),
790                })
791            } else {
792                Ok(between)
793            }
794        }
795        Rule::in_suffix => {
796            let mut negated = false;
797            let mut list = Vec::new();
798            let mut subquery = None;
799
800            for p in suffix.into_inner() {
801                match p.as_rule() {
802                    Rule::not_op => negated = true,
803                    Rule::in_contents => {
804                        let mut parts = p.into_inner();
805                        let first = parts.next().ok_or_else(|| {
806                            Error::ParseError("IN list cannot be empty".to_string())
807                        })?;
808                        match first.as_rule() {
809                            Rule::select_core => subquery = Some(build_select_core(first)?),
810                            Rule::expr => {
811                                list.push(build_expr(first)?);
812                                for rest in parts {
813                                    if rest.as_rule() == Rule::expr {
814                                        list.push(build_expr(rest)?);
815                                    }
816                                }
817                            }
818                            _ => return Err(Error::ParseError("invalid IN contents".to_string())),
819                        }
820                    }
821                    other => return Err(unexpected_rule(other, "build_comparison_suffix.in")),
822                }
823            }
824
825            if let Some(sq) = subquery {
826                Ok(Expr::InSubquery {
827                    expr: Box::new(left),
828                    subquery: Box::new(sq),
829                    negated,
830                })
831            } else {
832                Ok(Expr::InList {
833                    expr: Box::new(left),
834                    list,
835                    negated,
836                })
837            }
838        }
839        _ => Err(Error::ParseError(
840            "unsupported comparison suffix".to_string(),
841        )),
842    }
843}
844
845fn build_additive_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
846    let mut inner = pair.into_inner();
847    let first = inner
848        .next()
849        .ok_or_else(|| Error::ParseError("invalid additive expression".to_string()))?;
850    let mut expr = build_multiplicative_expr(first)?;
851
852    while let Some(op) = inner.next() {
853        let rhs_pair = inner
854            .next()
855            .ok_or_else(|| Error::ParseError("arithmetic missing right operand".to_string()))?;
856        let rhs = build_multiplicative_expr(rhs_pair)?;
857        let func = if op.as_str() == "+" { "__add" } else { "__sub" };
858        expr = Expr::FunctionCall {
859            name: func.to_string(),
860            args: vec![expr, rhs],
861        };
862    }
863
864    Ok(expr)
865}
866
867fn build_multiplicative_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
868    let mut inner = pair.into_inner();
869    let first = inner
870        .next()
871        .ok_or_else(|| Error::ParseError("invalid multiplicative expression".to_string()))?;
872    let mut expr = build_unary_math_expr(first)?;
873
874    while let Some(op) = inner.next() {
875        let rhs_pair = inner
876            .next()
877            .ok_or_else(|| Error::ParseError("arithmetic missing right operand".to_string()))?;
878        let rhs = build_unary_math_expr(rhs_pair)?;
879        let func = if op.as_str() == "*" { "__mul" } else { "__div" };
880        expr = Expr::FunctionCall {
881            name: func.to_string(),
882            args: vec![expr, rhs],
883        };
884    }
885
886    Ok(expr)
887}
888
889fn build_unary_math_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
890    let mut neg_count = 0usize;
891    let mut primary = None;
892
893    for p in pair.into_inner() {
894        match p.as_rule() {
895            Rule::unary_minus => neg_count += 1,
896            Rule::primary_expr => primary = Some(build_primary_expr(p)?),
897            other => return Err(unexpected_rule(other, "build_unary_math_expr")),
898        }
899    }
900
901    let mut expr =
902        primary.ok_or_else(|| Error::ParseError("invalid unary expression".to_string()))?;
903    for _ in 0..neg_count {
904        expr = Expr::UnaryOp {
905            op: UnaryOp::Neg,
906            operand: Box::new(expr),
907        };
908    }
909
910    Ok(expr)
911}
912
913fn build_primary_expr(pair: Pair<'_, Rule>) -> Result<Expr> {
914    let mut inner = pair.into_inner();
915    let first = inner
916        .next()
917        .ok_or_else(|| Error::ParseError("invalid primary expression".to_string()))?;
918
919    match first.as_rule() {
920        Rule::function_call => build_function_call(first),
921        Rule::parameter => Ok(Expr::Parameter(
922            first.as_str().trim_start_matches('$').to_string(),
923        )),
924        Rule::null_lit => Ok(Expr::Literal(Literal::Null)),
925        Rule::bool_lit => Ok(Expr::Literal(Literal::Bool(
926            first.as_str().eq_ignore_ascii_case("true"),
927        ))),
928        Rule::float => Ok(Expr::Literal(Literal::Real(parse_f64(
929            first.as_str(),
930            "invalid float literal",
931        )?))),
932        Rule::integer => Ok(Expr::Literal(Literal::Integer(parse_i64(
933            first.as_str(),
934            "invalid integer literal",
935        )?))),
936        Rule::string => Ok(Expr::Literal(Literal::Text(parse_string_literal(
937            first.as_str(),
938        )))),
939        Rule::vector_lit => {
940            let values: Vec<f32> = first
941                .into_inner()
942                .map(|p| {
943                    p.as_str()
944                        .parse::<f32>()
945                        .map_err(|_| Error::ParseError("invalid vector component".to_string()))
946                })
947                .collect::<Result<_>>()?;
948            Ok(Expr::Literal(Literal::Vector(values)))
949        }
950        Rule::column_ref => build_column_ref(first),
951        Rule::expr => build_expr(first),
952        _ => Err(Error::ParseError(
953            "unsupported primary expression".to_string(),
954        )),
955    }
956}
957
958fn build_function_call(pair: Pair<'_, Rule>) -> Result<Expr> {
959    let mut name = None;
960    let mut args = Vec::new();
961
962    for p in pair.into_inner() {
963        match p.as_rule() {
964            Rule::identifier if name.is_none() => name = Some(parse_identifier(p.as_str())),
965            Rule::star => args.push(Expr::Column(ColumnRef {
966                table: None,
967                column: "*".to_string(),
968            })),
969            Rule::expr => args.push(build_expr(p)?),
970            other => return Err(unexpected_rule(other, "build_function_call")),
971        }
972    }
973
974    Ok(Expr::FunctionCall {
975        name: name.ok_or_else(|| Error::ParseError("function name missing".to_string()))?,
976        args,
977    })
978}
979
980fn build_column_ref(pair: Pair<'_, Rule>) -> Result<Expr> {
981    let ids: Vec<String> = pair
982        .into_inner()
983        .filter(|p| p.as_rule() == Rule::identifier)
984        .map(|p| parse_identifier(p.as_str()))
985        .collect();
986
987    match ids.as_slice() {
988        [column] => Ok(Expr::Column(ColumnRef {
989            table: None,
990            column: column.clone(),
991        })),
992        [table, column] => Ok(Expr::Column(ColumnRef {
993            table: Some(table.clone()),
994            column: column.clone(),
995        })),
996        _ => Err(Error::ParseError("invalid column reference".to_string())),
997    }
998}
999
1000fn build_create_table(pair: Pair<'_, Rule>) -> Result<CreateTable> {
1001    let mut name = None;
1002    let mut if_not_exists = false;
1003    let mut columns = Vec::new();
1004    let mut unique_constraints = Vec::new();
1005    let mut immutable = false;
1006    let mut state_machine = None;
1007    let mut dag_edge_types = Vec::new();
1008    let mut propagation_rules = Vec::new();
1009    let mut has_propagation = false;
1010    let mut retain = None;
1011
1012    for p in pair.into_inner() {
1013        match p.as_rule() {
1014            Rule::if_not_exists => if_not_exists = true,
1015            Rule::identifier if name.is_none() => name = Some(parse_identifier(p.as_str())),
1016            Rule::table_element => {
1017                let element = p
1018                    .into_inner()
1019                    .next()
1020                    .ok_or_else(|| Error::ParseError("invalid table element".to_string()))?;
1021                match element.as_rule() {
1022                    Rule::column_def => {
1023                        let (col, inline_sm) = build_column_def(element)?;
1024                        if col
1025                            .references
1026                            .as_ref()
1027                            .is_some_and(|fk| !fk.propagation_rules.is_empty())
1028                        {
1029                            has_propagation = true;
1030                        }
1031                        columns.push(col);
1032                        if let Some(sm) = inline_sm {
1033                            if state_machine.is_some() {
1034                                return Err(Error::ParseError(
1035                                    "duplicate STATE MACHINE clause".to_string(),
1036                                ));
1037                            }
1038                            state_machine = Some(sm);
1039                        }
1040                    }
1041                    Rule::unique_table_constraint => {
1042                        unique_constraints.push(build_unique_table_constraint(element)?);
1043                    }
1044                    other => {
1045                        return Err(unexpected_rule(other, "build_create_table.table_element"));
1046                    }
1047                }
1048            }
1049            Rule::table_option => {
1050                let opt = p
1051                    .into_inner()
1052                    .next()
1053                    .ok_or_else(|| Error::ParseError("invalid table option".to_string()))?;
1054                match opt.as_rule() {
1055                    Rule::immutable_option => {
1056                        if immutable {
1057                            return Err(Error::ParseError(
1058                                "duplicate IMMUTABLE clause".to_string(),
1059                            ));
1060                        }
1061                        immutable = true;
1062                    }
1063                    Rule::state_machine_option => {
1064                        if state_machine.is_some() {
1065                            return Err(Error::ParseError(
1066                                "duplicate STATE MACHINE clause".to_string(),
1067                            ));
1068                        }
1069                        state_machine = Some(build_state_machine_option(opt)?)
1070                    }
1071                    Rule::dag_option => {
1072                        if !dag_edge_types.is_empty() {
1073                            return Err(Error::ParseError("duplicate DAG clause".to_string()));
1074                        }
1075                        dag_edge_types = build_dag_option(opt)?;
1076                    }
1077                    Rule::propagate_edge_option => {
1078                        has_propagation = true;
1079                        propagation_rules.push(build_edge_propagation_option(opt)?);
1080                    }
1081                    Rule::propagate_state_option => {
1082                        has_propagation = true;
1083                        propagation_rules.push(build_vector_propagation_option(opt)?);
1084                    }
1085                    Rule::retain_option => {
1086                        if retain.is_some() {
1087                            return Err(Error::ParseError("duplicate RETAIN clause".to_string()));
1088                        }
1089                        retain = Some(build_retain_option(opt)?);
1090                    }
1091                    other => return Err(unexpected_rule(other, "build_create_table.table_option")),
1092                }
1093            }
1094            other => return Err(unexpected_rule(other, "build_create_table")),
1095        }
1096    }
1097
1098    let options_count = [
1099        immutable,
1100        state_machine.is_some(),
1101        !dag_edge_types.is_empty(),
1102    ]
1103    .into_iter()
1104    .filter(|v| *v)
1105    .count();
1106
1107    if options_count > 1 {
1108        return Err(Error::ParseError(
1109            "IMMUTABLE, STATE MACHINE, and DAG cannot be used together".to_string(),
1110        ));
1111    }
1112
1113    if has_propagation && (immutable || !dag_edge_types.is_empty()) {
1114        return Err(Error::ParseError(
1115            "propagation clauses require STATE MACHINE tables".to_string(),
1116        ));
1117    }
1118
1119    if immutable && retain.is_some() {
1120        return Err(Error::ParseError(
1121            "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
1122        ));
1123    }
1124
1125    // A column declared both in the STATE MACHINE status position AND IMMUTABLE is
1126    // contradictory — STATE MACHINE permits transitions, IMMUTABLE refuses them.
1127    if let Some(sm) = &state_machine
1128        && let Some(col) = columns.iter().find(|c| c.name == sm.column)
1129        && col.immutable
1130    {
1131        return Err(Error::ParseError(format!(
1132            "column '{}' cannot be both IMMUTABLE and the STATE MACHINE status column",
1133            sm.column
1134        )));
1135    }
1136
1137    // Propagation rules that write into a column (edge propagation and
1138    // FK propagation `PROPAGATE SET <col>`) cannot target a column declared
1139    // IMMUTABLE on the same table.
1140    for rule in &propagation_rules {
1141        if let AstPropagationRule::EdgeState { target_state, .. } = rule
1142            && let Some(col) = columns.iter().find(|c| c.name == *target_state)
1143            && col.immutable
1144        {
1145            return Err(Error::ParseError(format!(
1146                "propagation rule cannot target column '{}' declared IMMUTABLE",
1147                target_state
1148            )));
1149        }
1150    }
1151    for col in &columns {
1152        let Some(fk) = &col.references else { continue };
1153        for rule in &fk.propagation_rules {
1154            if let AstPropagationRule::FkState { target_state, .. } = rule
1155                && let Some(target_col) = columns.iter().find(|c| c.name == *target_state)
1156                && target_col.immutable
1157            {
1158                return Err(Error::ParseError(format!(
1159                    "FK propagation rule cannot target column '{}' declared IMMUTABLE",
1160                    target_state
1161                )));
1162            }
1163        }
1164    }
1165
1166    for columns_in_constraint in &unique_constraints {
1167        for column_name in columns_in_constraint {
1168            if !columns.iter().any(|column| column.name == *column_name) {
1169                return Err(Error::ParseError(format!(
1170                    "UNIQUE constraint references unknown column '{}'",
1171                    column_name
1172                )));
1173            }
1174        }
1175    }
1176
1177    Ok(CreateTable {
1178        name: name.ok_or_else(|| Error::ParseError("missing table name".to_string()))?,
1179        columns,
1180        unique_constraints,
1181        if_not_exists,
1182        immutable,
1183        state_machine,
1184        dag_edge_types,
1185        propagation_rules,
1186        retain,
1187    })
1188}
1189
1190fn build_alter_table(pair: Pair<'_, Rule>) -> Result<AlterTable> {
1191    let mut table = None;
1192    let mut action = None;
1193
1194    for p in pair.into_inner() {
1195        match p.as_rule() {
1196            Rule::identifier if table.is_none() => table = Some(parse_identifier(p.as_str())),
1197            Rule::alter_action => action = Some(build_alter_action(p)?),
1198            other => return Err(unexpected_rule(other, "build_alter_table")),
1199        }
1200    }
1201
1202    Ok(AlterTable {
1203        table: table.ok_or_else(|| Error::ParseError("missing table name".to_string()))?,
1204        action: action
1205            .ok_or_else(|| Error::ParseError("missing ALTER TABLE action".to_string()))?,
1206    })
1207}
1208
1209fn build_alter_action(pair: Pair<'_, Rule>) -> Result<AlterAction> {
1210    let action = pair
1211        .into_inner()
1212        .next()
1213        .ok_or_else(|| Error::ParseError("missing ALTER TABLE action".to_string()))?;
1214
1215    match action.as_rule() {
1216        Rule::add_column_action => {
1217            let (column, _) = action
1218                .into_inner()
1219                .find(|part| part.as_rule() == Rule::column_def)
1220                .ok_or_else(|| {
1221                    Error::ParseError("ADD COLUMN missing column definition".to_string())
1222                })
1223                .and_then(build_column_def)?;
1224            Ok(AlterAction::AddColumn(column))
1225        }
1226        Rule::drop_column_action => {
1227            let mut column: Option<String> = None;
1228            let mut cascade = false;
1229            for part in action.into_inner() {
1230                match part.as_rule() {
1231                    Rule::identifier if column.is_none() => {
1232                        column = Some(parse_identifier(part.as_str()));
1233                    }
1234                    Rule::drop_column_modifier => {
1235                        let token = part.as_str().to_ascii_uppercase();
1236                        if token == "CASCADE" {
1237                            cascade = true;
1238                        }
1239                        // RESTRICT is the default; no-op.
1240                    }
1241                    other => return Err(unexpected_rule(other, "build_alter_action/drop_column")),
1242                }
1243            }
1244            let column = column
1245                .ok_or_else(|| Error::ParseError("DROP COLUMN missing column name".to_string()))?;
1246            Ok(AlterAction::DropColumn { column, cascade })
1247        }
1248        Rule::rename_column_action => {
1249            let mut identifiers = action
1250                .into_inner()
1251                .filter(|part| part.as_rule() == Rule::identifier)
1252                .map(|part| parse_identifier(part.as_str()));
1253            let from = identifiers.next().ok_or_else(|| {
1254                Error::ParseError("RENAME COLUMN missing source name".to_string())
1255            })?;
1256            let to = identifiers.next().ok_or_else(|| {
1257                Error::ParseError("RENAME COLUMN missing target name".to_string())
1258            })?;
1259            Ok(AlterAction::RenameColumn { from, to })
1260        }
1261        Rule::set_retain_action => {
1262            let retain = build_retain_option(action)?;
1263            Ok(AlterAction::SetRetain {
1264                duration_seconds: retain.duration_seconds,
1265                sync_safe: retain.sync_safe,
1266            })
1267        }
1268        Rule::drop_retain_action => Ok(AlterAction::DropRetain),
1269        Rule::set_table_conflict_policy => {
1270            let policy = action
1271                .into_inner()
1272                .find(|p| p.as_rule() == Rule::conflict_policy_value)
1273                .ok_or_else(|| Error::ParseError("missing conflict policy value".to_string()))?
1274                .as_str()
1275                .to_lowercase();
1276            Ok(AlterAction::SetSyncConflictPolicy(policy))
1277        }
1278        Rule::drop_table_conflict_policy => Ok(AlterAction::DropSyncConflictPolicy),
1279        _ => Err(Error::ParseError(
1280            "unsupported ALTER TABLE action".to_string(),
1281        )),
1282    }
1283}
1284
1285fn build_column_def(pair: Pair<'_, Rule>) -> Result<(ColumnDef, Option<StateMachineDef>)> {
1286    let mut name = None;
1287    let mut data_type = None;
1288    let mut nullable = true;
1289    let mut primary_key = false;
1290    let mut unique = false;
1291    let mut default = None;
1292    let mut references = None;
1293    let mut fk_propagation_rules = Vec::new();
1294    let mut inline_state_machine = None;
1295    let mut expires = false;
1296    let mut immutable_flag = false;
1297    let mut quantization = VectorQuantization::F32;
1298    let mut rank_policy = None;
1299    // Track if we saw the type token before column_constraints. If IMMUTABLE appears
1300    // as the column name (i.e. before the data_type position), Pest will parse it as
1301    // the identifier rule; we detect that case by the column name.
1302    let mut column_name_text: Option<String> = None;
1303
1304    for p in pair.into_inner() {
1305        match p.as_rule() {
1306            Rule::identifier if name.is_none() => {
1307                let ident = parse_identifier(p.as_str());
1308                column_name_text = Some(ident.clone());
1309                name = Some(ident);
1310            }
1311            Rule::data_type => {
1312                quantization = vector_quantization_for_data_type(&p)?;
1313                data_type = Some(build_data_type(p)?);
1314            }
1315            Rule::column_constraint => {
1316                let c = p
1317                    .into_inner()
1318                    .next()
1319                    .ok_or_else(|| Error::ParseError("invalid column constraint".to_string()))?;
1320                match c.as_rule() {
1321                    Rule::not_null => {
1322                        if !nullable {
1323                            return Err(Error::ParseError(
1324                                "duplicate NOT NULL constraint".to_string(),
1325                            ));
1326                        }
1327                        nullable = false;
1328                    }
1329                    Rule::nullable_marker => {
1330                        // Explicit NULL — column remains nullable. Idempotent.
1331                    }
1332                    Rule::primary_key => {
1333                        if primary_key {
1334                            return Err(Error::ParseError(
1335                                "duplicate PRIMARY KEY constraint".to_string(),
1336                            ));
1337                        }
1338                        primary_key = true;
1339                    }
1340                    Rule::unique => {
1341                        if unique {
1342                            return Err(Error::ParseError(
1343                                "duplicate UNIQUE constraint".to_string(),
1344                            ));
1345                        }
1346                        unique = true;
1347                    }
1348                    Rule::default_clause => {
1349                        if default.is_some() {
1350                            return Err(Error::ParseError("duplicate DEFAULT clause".to_string()));
1351                        }
1352                        let expr = c
1353                            .into_inner()
1354                            .find(|i| i.as_rule() == Rule::expr)
1355                            .ok_or_else(|| {
1356                                Error::ParseError("DEFAULT missing expression".to_string())
1357                            })?;
1358                        default = Some(build_expr(expr)?);
1359                    }
1360                    Rule::references_clause => {
1361                        if references.is_some() {
1362                            return Err(Error::ParseError(
1363                                "duplicate REFERENCES clause".to_string(),
1364                            ));
1365                        }
1366                        references = Some(build_references_clause(c)?);
1367                    }
1368                    Rule::fk_propagation_clause => {
1369                        fk_propagation_rules.push(build_fk_propagation_clause(c)?);
1370                    }
1371                    Rule::expires_constraint => {
1372                        if expires {
1373                            return Err(Error::ParseError(
1374                                "duplicate EXPIRES constraint".to_string(),
1375                            ));
1376                        }
1377                        expires = true;
1378                    }
1379                    Rule::immutable_constraint => {
1380                        if immutable_flag {
1381                            let col = column_name_text.as_deref().unwrap_or("column");
1382                            return Err(Error::ParseError(format!(
1383                                "duplicate IMMUTABLE constraint on column '{col}'"
1384                            )));
1385                        }
1386                        immutable_flag = true;
1387                    }
1388                    Rule::rank_policy_clause => {
1389                        if rank_policy.is_some() {
1390                            let col = column_name_text.as_deref().unwrap_or("column");
1391                            return Err(Error::ParseError(format!(
1392                                "duplicate RANK_POLICY constraint on column '{col}'"
1393                            )));
1394                        }
1395                        rank_policy = Some(Box::new(build_rank_policy_clause(c)?));
1396                    }
1397                    Rule::state_machine_option => {
1398                        if inline_state_machine.is_some() {
1399                            return Err(Error::ParseError(
1400                                "duplicate STATE MACHINE clause".to_string(),
1401                            ));
1402                        }
1403                        inline_state_machine = Some(build_state_machine_option(c)?);
1404                    }
1405                    other => {
1406                        return Err(unexpected_rule(other, "build_column_def.column_constraint"));
1407                    }
1408                }
1409            }
1410            other => return Err(unexpected_rule(other, "build_column_def")),
1411        }
1412    }
1413
1414    if !fk_propagation_rules.is_empty() {
1415        let fk = references.as_mut().ok_or_else(|| {
1416            Error::ParseError("FK propagation requires REFERENCES constraint".to_string())
1417        })?;
1418        fk.propagation_rules = fk_propagation_rules;
1419    }
1420
1421    Ok((
1422        ColumnDef {
1423            name: name.ok_or_else(|| Error::ParseError("column name missing".to_string()))?,
1424            data_type: data_type
1425                .ok_or_else(|| Error::ParseError("column type missing".to_string()))?,
1426            nullable,
1427            primary_key,
1428            unique,
1429            default,
1430            references,
1431            expires,
1432            immutable: immutable_flag,
1433            quantization,
1434            rank_policy,
1435        },
1436        inline_state_machine,
1437    ))
1438}
1439
1440fn build_rank_policy_clause(pair: Pair<'_, Rule>) -> Result<RankPolicyAst> {
1441    let mut joined_table = None;
1442    let mut joined_column = None;
1443    let mut formula = None;
1444    let mut sort_key = None;
1445
1446    for p in pair.into_inner() {
1447        match p.as_rule() {
1448            Rule::rank_policy_join => {
1449                let mut identifiers = p
1450                    .into_inner()
1451                    .filter(|part| part.as_rule() == Rule::identifier)
1452                    .map(|part| parse_identifier(part.as_str()));
1453                joined_table = identifiers.next();
1454                joined_column = identifiers.next();
1455            }
1456            Rule::rank_policy_formula => {
1457                let raw = p
1458                    .into_inner()
1459                    .find(|part| part.as_rule() == Rule::string)
1460                    .ok_or_else(|| {
1461                        Error::ParseError("RANK_POLICY FORMULA missing string".to_string())
1462                    })?;
1463                formula = Some(parse_string_literal(raw.as_str()));
1464            }
1465            Rule::rank_policy_sort_key => {
1466                sort_key = p
1467                    .into_inner()
1468                    .find(|part| part.as_rule() == Rule::identifier)
1469                    .map(|part| parse_identifier(part.as_str()));
1470            }
1471            other => return Err(unexpected_rule(other, "build_rank_policy_clause")),
1472        }
1473    }
1474
1475    Ok(RankPolicyAst {
1476        joined_table: joined_table
1477            .ok_or_else(|| Error::ParseError("RANK_POLICY JOIN missing table".to_string()))?,
1478        joined_column: joined_column
1479            .ok_or_else(|| Error::ParseError("RANK_POLICY JOIN missing column".to_string()))?,
1480        formula: formula
1481            .ok_or_else(|| Error::ParseError("RANK_POLICY FORMULA missing string".to_string()))?,
1482        sort_key: sort_key
1483            .ok_or_else(|| Error::ParseError("RANK_POLICY SORT_KEY missing key".to_string()))?,
1484    })
1485}
1486
1487fn build_unique_table_constraint(pair: Pair<'_, Rule>) -> Result<Vec<String>> {
1488    let columns: Vec<String> = pair
1489        .into_inner()
1490        .filter(|part| part.as_rule() == Rule::identifier)
1491        .map(|part| parse_identifier(part.as_str()))
1492        .collect();
1493
1494    if columns.len() < 2 {
1495        return Err(Error::ParseError(
1496            "table-level UNIQUE requires at least two columns".to_string(),
1497        ));
1498    }
1499
1500    let mut seen = std::collections::HashSet::new();
1501    for column in &columns {
1502        if !seen.insert(column.clone()) {
1503            return Err(Error::ParseError(format!(
1504                "duplicate column '{}' in UNIQUE constraint",
1505                column
1506            )));
1507        }
1508    }
1509
1510    Ok(columns)
1511}
1512
1513fn build_retain_option(pair: Pair<'_, Rule>) -> Result<RetainOption> {
1514    let mut amount = None;
1515    let mut unit = None;
1516    let mut sync_safe = false;
1517
1518    for part in pair.into_inner() {
1519        match part.as_rule() {
1520            Rule::integer => {
1521                amount = Some(part.as_str().parse::<u64>().map_err(|err| {
1522                    Error::ParseError(format!(
1523                        "invalid RETAIN duration '{}': {err}",
1524                        part.as_str()
1525                    ))
1526                })?);
1527            }
1528            Rule::retain_unit => unit = Some(part.as_str().to_ascii_uppercase()),
1529            Rule::sync_safe_option => sync_safe = true,
1530            other => return Err(unexpected_rule(other, "build_retain_option")),
1531        }
1532    }
1533
1534    let amount = amount.ok_or_else(|| Error::ParseError("RETAIN missing duration".to_string()))?;
1535    let unit = unit.ok_or_else(|| Error::ParseError("RETAIN missing unit".to_string()))?;
1536    let duration_seconds = match unit.as_str() {
1537        "SECONDS" | "SECOND" => amount,
1538        "MINUTES" | "MINUTE" => amount.saturating_mul(60),
1539        "HOURS" | "HOUR" => amount.saturating_mul(60 * 60),
1540        "DAYS" | "DAY" => amount.saturating_mul(24 * 60 * 60),
1541        _ => {
1542            return Err(Error::ParseError(format!(
1543                "unsupported RETAIN unit: {unit}"
1544            )));
1545        }
1546    };
1547
1548    Ok(RetainOption {
1549        duration_seconds,
1550        sync_safe,
1551    })
1552}
1553
1554fn build_references_clause(pair: Pair<'_, Rule>) -> Result<ForeignKey> {
1555    let ids: Vec<String> = pair
1556        .into_inner()
1557        .filter(|p| p.as_rule() == Rule::identifier)
1558        .map(|p| parse_identifier(p.as_str()))
1559        .collect();
1560
1561    if ids.len() < 2 {
1562        return Err(Error::ParseError(
1563            "REFERENCES requires table and column".to_string(),
1564        ));
1565    }
1566
1567    Ok(ForeignKey {
1568        table: ids[0].clone(),
1569        column: ids[1].clone(),
1570        propagation_rules: Vec::new(),
1571    })
1572}
1573
1574fn build_fk_propagation_clause(pair: Pair<'_, Rule>) -> Result<AstPropagationRule> {
1575    let mut trigger_state = None;
1576    let mut target_state = None;
1577    let mut max_depth = None;
1578    let mut abort_on_failure = false;
1579
1580    for p in pair.into_inner() {
1581        match p.as_rule() {
1582            Rule::identifier if trigger_state.is_none() => {
1583                trigger_state = Some(parse_identifier(p.as_str()))
1584            }
1585            Rule::identifier if target_state.is_none() => {
1586                target_state = Some(parse_identifier(p.as_str()))
1587            }
1588            Rule::max_depth_clause => max_depth = Some(parse_max_depth_clause(p)?),
1589            Rule::abort_on_failure_clause => abort_on_failure = true,
1590            other => return Err(unexpected_rule(other, "build_fk_propagation_clause")),
1591        }
1592    }
1593
1594    Ok(AstPropagationRule::FkState {
1595        trigger_state: trigger_state
1596            .ok_or_else(|| Error::ParseError("FK propagation missing trigger state".to_string()))?,
1597        target_state: target_state
1598            .ok_or_else(|| Error::ParseError("FK propagation missing target state".to_string()))?,
1599        max_depth,
1600        abort_on_failure,
1601    })
1602}
1603
1604fn build_edge_propagation_option(pair: Pair<'_, Rule>) -> Result<AstPropagationRule> {
1605    let mut edge_type = None;
1606    let mut direction = None;
1607    let mut trigger_state = None;
1608    let mut target_state = None;
1609    let mut max_depth = None;
1610    let mut abort_on_failure = false;
1611
1612    for p in pair.into_inner() {
1613        match p.as_rule() {
1614            Rule::identifier if edge_type.is_none() => {
1615                edge_type = Some(parse_identifier(p.as_str()))
1616            }
1617            Rule::direction_kw => direction = Some(parse_identifier(p.as_str())),
1618            Rule::identifier if trigger_state.is_none() => {
1619                trigger_state = Some(parse_identifier(p.as_str()))
1620            }
1621            Rule::identifier if target_state.is_none() => {
1622                target_state = Some(parse_identifier(p.as_str()))
1623            }
1624            Rule::max_depth_clause => max_depth = Some(parse_max_depth_clause(p)?),
1625            Rule::abort_on_failure_clause => abort_on_failure = true,
1626            other => return Err(unexpected_rule(other, "build_edge_propagation_option")),
1627        }
1628    }
1629
1630    Ok(AstPropagationRule::EdgeState {
1631        edge_type: edge_type
1632            .ok_or_else(|| Error::ParseError("EDGE propagation missing edge type".to_string()))?,
1633        direction: direction
1634            .ok_or_else(|| Error::ParseError("EDGE propagation missing direction".to_string()))?,
1635        trigger_state: trigger_state.ok_or_else(|| {
1636            Error::ParseError("EDGE propagation missing trigger state".to_string())
1637        })?,
1638        target_state: target_state.ok_or_else(|| {
1639            Error::ParseError("EDGE propagation missing target state".to_string())
1640        })?,
1641        max_depth,
1642        abort_on_failure,
1643    })
1644}
1645
1646fn build_vector_propagation_option(pair: Pair<'_, Rule>) -> Result<AstPropagationRule> {
1647    let trigger_state = pair
1648        .into_inner()
1649        .find(|p| p.as_rule() == Rule::identifier)
1650        .map(|p| parse_identifier(p.as_str()))
1651        .ok_or_else(|| Error::ParseError("VECTOR propagation missing trigger state".to_string()))?;
1652
1653    Ok(AstPropagationRule::VectorExclusion { trigger_state })
1654}
1655
1656fn parse_max_depth_clause(pair: Pair<'_, Rule>) -> Result<u32> {
1657    let depth = pair
1658        .into_inner()
1659        .find(|p| p.as_rule() == Rule::integer)
1660        .ok_or_else(|| Error::ParseError("MAX DEPTH missing value".to_string()))?;
1661    parse_u32(depth.as_str(), "invalid MAX DEPTH value")
1662}
1663
1664fn build_data_type(pair: Pair<'_, Rule>) -> Result<DataType> {
1665    let txt = pair.as_str().to_string();
1666    let mut inner = pair.into_inner();
1667    if let Some(v) = inner.find(|p| p.as_rule() == Rule::vector_type) {
1668        let dim = v
1669            .into_inner()
1670            .find(|p| p.as_rule() == Rule::integer)
1671            .ok_or_else(|| Error::ParseError("VECTOR dimension missing".to_string()))?;
1672        let dim = parse_u32(dim.as_str(), "invalid VECTOR dimension")?;
1673        return Ok(DataType::Vector(dim));
1674    }
1675
1676    if txt.eq_ignore_ascii_case("UUID") {
1677        Ok(DataType::Uuid)
1678    } else if txt.eq_ignore_ascii_case("TEXT") {
1679        Ok(DataType::Text)
1680    } else if txt.eq_ignore_ascii_case("INTEGER") || txt.eq_ignore_ascii_case("INT") {
1681        Ok(DataType::Integer)
1682    } else if txt.eq_ignore_ascii_case("REAL") || txt.eq_ignore_ascii_case("FLOAT") {
1683        Ok(DataType::Real)
1684    } else if txt.eq_ignore_ascii_case("BOOLEAN") || txt.eq_ignore_ascii_case("BOOL") {
1685        Ok(DataType::Boolean)
1686    } else if txt.eq_ignore_ascii_case("TIMESTAMP") {
1687        Ok(DataType::Timestamp)
1688    } else if txt.eq_ignore_ascii_case("JSON") {
1689        Ok(DataType::Json)
1690    } else if txt.eq_ignore_ascii_case("TXID") {
1691        Ok(DataType::TxId)
1692    } else {
1693        Err(Error::ParseError(format!("unsupported data type: {txt}")))
1694    }
1695}
1696
1697fn vector_quantization_for_data_type(pair: &Pair<'_, Rule>) -> Result<VectorQuantization> {
1698    let Some(vector_type) = pair
1699        .clone()
1700        .into_inner()
1701        .find(|p| p.as_rule() == Rule::vector_type)
1702    else {
1703        return Ok(VectorQuantization::F32);
1704    };
1705
1706    for p in vector_type.into_inner() {
1707        if p.as_rule() == Rule::vector_quantization_clause {
1708            let value = p
1709                .into_inner()
1710                .find(|part| part.as_rule() == Rule::vector_quantization_value)
1711                .ok_or_else(|| Error::ParseError("missing vector quantization value".to_string()))?
1712                .as_str()
1713                .trim_matches('\'')
1714                .to_ascii_uppercase();
1715            return match value.as_str() {
1716                "F32" => Ok(VectorQuantization::F32),
1717                "SQ8" => Ok(VectorQuantization::SQ8),
1718                "SQ4" => Ok(VectorQuantization::SQ4),
1719                _ => Err(Error::ParseError(format!(
1720                    "unsupported vector quantization '{value}'"
1721                ))),
1722            };
1723        }
1724    }
1725
1726    Ok(VectorQuantization::F32)
1727}
1728
1729fn build_state_machine_option(pair: Pair<'_, Rule>) -> Result<StateMachineDef> {
1730    let entries = pair
1731        .into_inner()
1732        .find(|p| p.as_rule() == Rule::state_machine_entries)
1733        .ok_or_else(|| Error::ParseError("invalid STATE MACHINE clause".to_string()))?;
1734
1735    let mut column = None;
1736    let mut transitions: Vec<(String, Vec<String>)> = Vec::new();
1737
1738    for entry in entries
1739        .into_inner()
1740        .filter(|p| p.as_rule() == Rule::state_machine_entry)
1741    {
1742        let has_column_prefix = entry.as_str().contains(':');
1743        let ids: Vec<String> = entry
1744            .into_inner()
1745            .filter(|p| p.as_rule() == Rule::identifier)
1746            .map(|p| parse_identifier(p.as_str()))
1747            .collect();
1748
1749        if ids.len() < 2 {
1750            return Err(Error::ParseError(
1751                "invalid STATE MACHINE transition".to_string(),
1752            ));
1753        }
1754
1755        let (from, to_targets) = if has_column_prefix {
1756            if column.is_none() {
1757                column = Some(ids[0].clone());
1758            }
1759            (ids[1].clone(), ids[2..].to_vec())
1760        } else {
1761            (ids[0].clone(), ids[1..].to_vec())
1762        };
1763
1764        if let Some((_, existing)) = transitions.iter_mut().find(|(src, _)| src == &from) {
1765            for t in to_targets {
1766                if !existing.iter().any(|v| v == &t) {
1767                    existing.push(t);
1768                }
1769            }
1770        } else {
1771            transitions.push((from, to_targets));
1772        }
1773    }
1774
1775    Ok(StateMachineDef {
1776        column: column.unwrap_or_else(|| "status".to_string()),
1777        transitions,
1778    })
1779}
1780
1781fn build_dag_option(pair: Pair<'_, Rule>) -> Result<Vec<String>> {
1782    let edge_types = pair
1783        .into_inner()
1784        .filter(|p| p.as_rule() == Rule::string)
1785        .map(|p| parse_string_literal(p.as_str()))
1786        .collect::<Vec<_>>();
1787
1788    if edge_types.is_empty() {
1789        return Err(Error::ParseError(
1790            "DAG requires at least one edge type".to_string(),
1791        ));
1792    }
1793
1794    Ok(edge_types)
1795}
1796
1797fn build_drop_table(pair: Pair<'_, Rule>) -> Result<DropTable> {
1798    let mut if_exists = false;
1799    let mut name = None;
1800
1801    for p in pair.into_inner() {
1802        match p.as_rule() {
1803            Rule::if_exists => if_exists = true,
1804            Rule::identifier => name = Some(parse_identifier(p.as_str())),
1805            other => return Err(unexpected_rule(other, "build_drop_table")),
1806        }
1807    }
1808
1809    Ok(DropTable {
1810        name: name.ok_or_else(|| Error::ParseError("missing table name".to_string()))?,
1811        if_exists,
1812    })
1813}
1814
1815fn build_create_index(pair: Pair<'_, Rule>) -> Result<CreateIndex> {
1816    let mut name: Option<String> = None;
1817    let mut table: Option<String> = None;
1818    let mut columns: Vec<(String, SortDirection)> = Vec::new();
1819
1820    for p in pair.into_inner() {
1821        match p.as_rule() {
1822            Rule::identifier if name.is_none() => {
1823                name = Some(parse_identifier(p.as_str()));
1824            }
1825            Rule::identifier if table.is_none() => {
1826                table = Some(parse_identifier(p.as_str()));
1827            }
1828            Rule::indexed_column => {
1829                let mut col_name: Option<String> = None;
1830                let mut direction = SortDirection::Asc;
1831                for inner in p.into_inner() {
1832                    match inner.as_rule() {
1833                        Rule::identifier if col_name.is_none() => {
1834                            col_name = Some(parse_identifier(inner.as_str()));
1835                        }
1836                        Rule::index_sort_direction => {
1837                            let token = inner.as_str().to_ascii_uppercase();
1838                            direction = if token == "DESC" {
1839                                SortDirection::Desc
1840                            } else {
1841                                SortDirection::Asc
1842                            };
1843                        }
1844                        other => return Err(unexpected_rule(other, "build_create_index/column")),
1845                    }
1846                }
1847                let col = col_name
1848                    .ok_or_else(|| Error::ParseError("CREATE INDEX missing column".to_string()))?;
1849                columns.push((col, direction));
1850            }
1851            other => return Err(unexpected_rule(other, "build_create_index")),
1852        }
1853    }
1854
1855    Ok(CreateIndex {
1856        name: name.ok_or_else(|| Error::ParseError("CREATE INDEX missing name".to_string()))?,
1857        table: table.ok_or_else(|| Error::ParseError("CREATE INDEX missing table".to_string()))?,
1858        columns,
1859    })
1860}
1861
1862fn build_drop_index(pair: Pair<'_, Rule>) -> Result<DropIndex> {
1863    let mut if_exists = false;
1864    let mut idents: Vec<String> = Vec::new();
1865    for p in pair.into_inner() {
1866        match p.as_rule() {
1867            Rule::if_exists => if_exists = true,
1868            Rule::identifier => idents.push(parse_identifier(p.as_str())),
1869            other => return Err(unexpected_rule(other, "build_drop_index")),
1870        }
1871    }
1872    if idents.len() < 2 {
1873        return Err(Error::ParseError(
1874            "DROP INDEX requires `<index_name> ON <table>`".to_string(),
1875        ));
1876    }
1877    Ok(DropIndex {
1878        name: idents[0].clone(),
1879        table: idents[1].clone(),
1880        if_exists,
1881    })
1882}
1883
1884fn build_insert(pair: Pair<'_, Rule>) -> Result<Insert> {
1885    let mut table = None;
1886    let mut columns = Vec::new();
1887    let mut values = Vec::new();
1888    let mut on_conflict = None;
1889    let mut seen_table = false;
1890
1891    for p in pair.into_inner() {
1892        match p.as_rule() {
1893            Rule::identifier if !seen_table => {
1894                table = Some(parse_identifier(p.as_str()));
1895                seen_table = true;
1896            }
1897            Rule::identifier => columns.push(parse_identifier(p.as_str())),
1898            Rule::values_row => values.push(build_values_row(p)?),
1899            Rule::on_conflict_clause => on_conflict = Some(build_on_conflict(p)?),
1900            other => return Err(unexpected_rule(other, "build_insert")),
1901        }
1902    }
1903
1904    Ok(Insert {
1905        table: table.ok_or_else(|| Error::ParseError("INSERT missing table".to_string()))?,
1906        columns,
1907        values,
1908        on_conflict,
1909    })
1910}
1911
1912fn build_values_row(pair: Pair<'_, Rule>) -> Result<Vec<Expr>> {
1913    pair.into_inner()
1914        .filter(|p| p.as_rule() == Rule::expr)
1915        .map(build_expr)
1916        .collect()
1917}
1918
1919fn build_on_conflict(pair: Pair<'_, Rule>) -> Result<OnConflict> {
1920    let mut columns = Vec::new();
1921    let mut update_columns = Vec::new();
1922
1923    for p in pair.into_inner() {
1924        match p.as_rule() {
1925            Rule::identifier => columns.push(parse_identifier(p.as_str())),
1926            Rule::assignment => update_columns.push(build_assignment(p)?),
1927            other => return Err(unexpected_rule(other, "build_on_conflict")),
1928        }
1929    }
1930
1931    Ok(OnConflict {
1932        columns,
1933        update_columns,
1934    })
1935}
1936
1937fn build_assignment(pair: Pair<'_, Rule>) -> Result<(String, Expr)> {
1938    let mut name = None;
1939    let mut value = None;
1940
1941    for p in pair.into_inner() {
1942        match p.as_rule() {
1943            Rule::identifier if name.is_none() => name = Some(parse_identifier(p.as_str())),
1944            Rule::expr => value = Some(build_expr(p)?),
1945            other => return Err(unexpected_rule(other, "build_assignment")),
1946        }
1947    }
1948
1949    Ok((
1950        name.ok_or_else(|| Error::ParseError("assignment missing column".to_string()))?,
1951        value.ok_or_else(|| Error::ParseError("assignment missing value".to_string()))?,
1952    ))
1953}
1954
1955fn build_delete(pair: Pair<'_, Rule>) -> Result<Delete> {
1956    let mut table = None;
1957    let mut where_clause = None;
1958
1959    for p in pair.into_inner() {
1960        match p.as_rule() {
1961            Rule::identifier => table = Some(parse_identifier(p.as_str())),
1962            Rule::where_clause => where_clause = Some(build_where_clause(p)?),
1963            other => return Err(unexpected_rule(other, "build_delete")),
1964        }
1965    }
1966
1967    Ok(Delete {
1968        table: table.ok_or_else(|| Error::ParseError("DELETE missing table".to_string()))?,
1969        where_clause,
1970    })
1971}
1972
1973fn build_update(pair: Pair<'_, Rule>) -> Result<Update> {
1974    let mut table = None;
1975    let mut assignments = Vec::new();
1976    let mut where_clause = None;
1977
1978    for p in pair.into_inner() {
1979        match p.as_rule() {
1980            Rule::identifier if table.is_none() => table = Some(parse_identifier(p.as_str())),
1981            Rule::assignment => assignments.push(build_assignment(p)?),
1982            Rule::where_clause => where_clause = Some(build_where_clause(p)?),
1983            other => return Err(unexpected_rule(other, "build_update")),
1984        }
1985    }
1986
1987    Ok(Update {
1988        table: table.ok_or_else(|| Error::ParseError("UPDATE missing table".to_string()))?,
1989        assignments,
1990        where_clause,
1991    })
1992}
1993
1994fn validate_statement(stmt: &Statement) -> Result<()> {
1995    if let Statement::Select(sel) = stmt {
1996        validate_select(sel)?;
1997    }
1998    Ok(())
1999}
2000
2001fn validate_select(sel: &SelectStatement) -> Result<()> {
2002    for cte in &sel.ctes {
2003        if let Cte::SqlCte { query, .. } = cte {
2004            validate_select_body(query)?;
2005        }
2006    }
2007
2008    validate_select_body(&sel.body)?;
2009
2010    let cte_names = sel
2011        .ctes
2012        .iter()
2013        .map(|c| match c {
2014            Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => name.as_str(),
2015        })
2016        .collect::<Vec<_>>();
2017
2018    if let Some(expr) = &sel.body.where_clause {
2019        validate_subquery_expr(expr, &cte_names)?;
2020    }
2021
2022    Ok(())
2023}
2024
2025fn validate_select_body(body: &SelectBody) -> Result<()> {
2026    if body
2027        .order_by
2028        .iter()
2029        .any(|o| matches!(o.direction, SortDirection::CosineDistance))
2030        && body.limit.is_none()
2031    {
2032        return if body.use_rank.is_some() {
2033            Err(Error::UseRankRequiresLimit)
2034        } else {
2035            Err(Error::UnboundedVectorSearch)
2036        };
2037    }
2038
2039    for from in &body.from {
2040        if let FromItem::GraphTable { match_clause, .. } = from {
2041            validate_match_clause(match_clause)?;
2042        }
2043    }
2044
2045    if let Some(expr) = &body.where_clause {
2046        validate_expr(expr)?;
2047    }
2048
2049    Ok(())
2050}
2051
2052fn validate_match_clause(mc: &MatchClause) -> Result<()> {
2053    if mc.graph_name.as_ref().is_none_or(|g| g.trim().is_empty()) {
2054        return Err(Error::ParseError(
2055            "GRAPH_TABLE requires graph name".to_string(),
2056        ));
2057    }
2058    if mc.pattern.start.alias.trim().is_empty() {
2059        return Err(Error::ParseError(
2060            "MATCH start node alias is required".to_string(),
2061        ));
2062    }
2063
2064    for edge in &mc.pattern.edges {
2065        if edge.min_hops == 0 && edge.max_hops == 0 {
2066            return Err(Error::UnboundedTraversal);
2067        }
2068        if edge.max_hops == 0 {
2069            return Err(Error::UnboundedTraversal);
2070        }
2071        if edge.min_hops == 0 {
2072            return Err(Error::ParseError(
2073                "graph quantifier minimum hop must be >= 1".to_string(),
2074            ));
2075        }
2076        if edge.min_hops > edge.max_hops {
2077            return Err(Error::ParseError(
2078                "graph quantifier minimum cannot exceed maximum".to_string(),
2079            ));
2080        }
2081        if edge.max_hops > 10 {
2082            return Err(Error::BfsDepthExceeded(edge.max_hops));
2083        }
2084    }
2085
2086    if let Some(expr) = &mc.where_clause {
2087        validate_expr(expr)?;
2088    }
2089
2090    Ok(())
2091}
2092
2093fn validate_expr(expr: &Expr) -> Result<()> {
2094    match expr {
2095        Expr::InSubquery { subquery, .. } => {
2096            if subquery.from.is_empty() {
2097                return Err(Error::SubqueryNotSupported);
2098            }
2099        }
2100        Expr::BinaryOp { left, right, .. } => {
2101            validate_expr(left)?;
2102            validate_expr(right)?;
2103        }
2104        Expr::UnaryOp { operand, .. } => validate_expr(operand)?,
2105        Expr::InList { expr, list, .. } => {
2106            validate_expr(expr)?;
2107            for item in list {
2108                validate_expr(item)?;
2109            }
2110        }
2111        Expr::Like { expr, pattern, .. } => {
2112            validate_expr(expr)?;
2113            validate_expr(pattern)?;
2114        }
2115        Expr::IsNull { expr, .. } => validate_expr(expr)?,
2116        Expr::CosineDistance { left, right } => {
2117            validate_expr(left)?;
2118            validate_expr(right)?;
2119        }
2120        Expr::FunctionCall { args, .. } => {
2121            for arg in args {
2122                validate_expr(arg)?;
2123            }
2124        }
2125        Expr::Column(_) | Expr::Literal(_) | Expr::Parameter(_) => {}
2126    }
2127    Ok(())
2128}
2129
2130fn validate_subquery_expr(expr: &Expr, cte_names: &[&str]) -> Result<()> {
2131    match expr {
2132        Expr::InSubquery { subquery, .. } => {
2133            if subquery.columns.len() != 1 || subquery.from.is_empty() {
2134                return Err(Error::SubqueryNotSupported);
2135            }
2136
2137            let referenced = subquery.from.iter().find_map(|f| match f {
2138                FromItem::Table { name, .. } => Some(name.as_str()),
2139                FromItem::GraphTable { .. } => None,
2140            });
2141            if let Some(name) = referenced {
2142                if cte_names.iter().any(|n| n.eq_ignore_ascii_case(name)) {
2143                    return Ok(());
2144                }
2145                return Ok(());
2146            }
2147            return Err(Error::SubqueryNotSupported);
2148        }
2149        Expr::BinaryOp { left, right, .. } => {
2150            validate_subquery_expr(left, cte_names)?;
2151            validate_subquery_expr(right, cte_names)?;
2152        }
2153        Expr::UnaryOp { operand, .. } => validate_subquery_expr(operand, cte_names)?,
2154        Expr::InList { expr, list, .. } => {
2155            validate_subquery_expr(expr, cte_names)?;
2156            for item in list {
2157                validate_subquery_expr(item, cte_names)?;
2158            }
2159        }
2160        Expr::Like { expr, pattern, .. } => {
2161            validate_subquery_expr(expr, cte_names)?;
2162            validate_subquery_expr(pattern, cte_names)?;
2163        }
2164        Expr::IsNull { expr, .. } => validate_subquery_expr(expr, cte_names)?,
2165        Expr::CosineDistance { left, right } => {
2166            validate_subquery_expr(left, cte_names)?;
2167            validate_subquery_expr(right, cte_names)?;
2168        }
2169        Expr::FunctionCall { args, .. } => {
2170            for arg in args {
2171                validate_subquery_expr(arg, cte_names)?;
2172            }
2173        }
2174        Expr::Column(_) | Expr::Literal(_) | Expr::Parameter(_) => {}
2175    }
2176
2177    Ok(())
2178}
2179
2180fn unexpected_rule(rule: Rule, context: &str) -> Error {
2181    Error::ParseError(format!("unexpected rule {:?} in {}", rule, context))
2182}
2183
/// Normalises an identifier token.
///
/// Surrounding whitespace is trimmed. If the remainder is wrapped in double
/// quotes, the quotes are removed and each doubled quote (`""`) inside becomes
/// a single `"`. Anything else, including a lone `"`, is returned trimmed but
/// otherwise untouched.
fn parse_identifier(raw: &str) -> String {
    let token = raw.trim();
    let quoted_body = token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'));

    match quoted_body {
        Some(body) => body.replace("\"\"", "\""),
        None => token.to_string(),
    }
}
2192
/// Decodes a string-literal token.
///
/// Surrounding whitespace is trimmed. If the remainder is wrapped in single
/// quotes, the quotes are removed and each doubled quote (`''`) inside becomes
/// a single `'`. Anything else, including a lone `'`, is returned trimmed but
/// otherwise untouched.
fn parse_string_literal(raw: &str) -> String {
    let token = raw.trim();
    let quoted_body = token
        .strip_prefix('\'')
        .and_then(|rest| rest.strip_suffix('\''));

    match quoted_body {
        Some(body) => body.replace("''", "'"),
        None => token.to_string(),
    }
}
2201
2202fn parse_u32(s: &str, err: &str) -> Result<u32> {
2203    s.parse::<u32>()
2204        .map_err(|_| Error::ParseError(err.to_string()))
2205}
2206
2207fn parse_u64(s: &str, err: &str) -> Result<u64> {
2208    s.parse::<u64>()
2209        .map_err(|_| Error::ParseError(err.to_string()))
2210}
2211
2212fn parse_i64(s: &str, err: &str) -> Result<i64> {
2213    s.parse::<i64>()
2214        .map_err(|_| Error::ParseError(err.to_string()))
2215}
2216
2217fn parse_f64(s: &str, err: &str) -> Result<f64> {
2218    s.parse::<f64>()
2219        .map_err(|_| Error::ParseError(err.to_string()))
2220}
2221
/// Reports whether the leading whitespace-separated tokens of `input` equal
/// `words`, compared ASCII case-insensitively.
///
/// Tokens are split on whitespace only, so a keyword glued to punctuation
/// (for example `RECURSIVE(`) does not match. An empty `words` slice always
/// matches.
fn starts_with_keywords(input: &str, words: &[&str]) -> bool {
    let mut tokens = input.split_whitespace();

    // Each expected word must be met by the next token; running out of tokens
    // early counts as a mismatch.
    words.iter().all(|word| {
        tokens
            .next()
            .map_or(false, |token| token.eq_ignore_ascii_case(word))
    })
}
2234
2235fn contains_token_outside_strings(input: &str, token: &str) -> bool {
2236    let mut in_str = false;
2237    let mut chars = input.char_indices().peekable();
2238
2239    while let Some((idx, ch)) = chars.next() {
2240        if ch == '\'' {
2241            if in_str {
2242                if let Some((_, next_ch)) = chars.peek()
2243                    && *next_ch == '\''
2244                {
2245                    let _ = chars.next();
2246                    continue;
2247                }
2248                in_str = false;
2249            } else {
2250                in_str = true;
2251            }
2252            continue;
2253        }
2254
2255        if in_str {
2256            continue;
2257        }
2258
2259        if is_word_boundary(input, idx.saturating_sub(1))
2260            && input[idx..].len() >= token.len()
2261            && input[idx..idx + token.len()].eq_ignore_ascii_case(token)
2262            && is_word_boundary(input, idx + token.len())
2263        {
2264            return true;
2265        }
2266    }
2267
2268    false
2269}
2270
/// Reports whether the keywords in `words` appear consecutively, in order, in
/// `input` outside single-quoted string literals.
///
/// The input is split into word tokens made of ASCII alphanumerics and `_`;
/// every other character, and every string literal, only separates tokens.
/// A doubled quote (`''`) inside a literal is an escaped quote. Tokens are
/// compared with `words` ASCII case-insensitively.
///
/// An empty `words` slice yields `false` (there is nothing to look for);
/// without this guard `slice::windows(0)` would panic.
fn contains_keyword_sequence_outside_strings(input: &str, words: &[&str]) -> bool {
    if words.is_empty() {
        return false;
    }

    let mut tokens: Vec<String> = Vec::new();
    let mut current = String::new();
    let mut in_str = false;
    let mut chars = input.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch == '\'' {
            if in_str && chars.peek() == Some(&'\'') {
                // Escaped quote: swallow its second half, stay in the literal.
                chars.next();
                continue;
            }
            in_str = !in_str;
            // A quote always terminates the word being accumulated.
            if !current.is_empty() {
                tokens.push(std::mem::take(&mut current));
            }
            continue;
        }

        if in_str {
            continue;
        }

        if ch.is_ascii_alphanumeric() || ch == '_' {
            current.push(ch);
        } else if !current.is_empty() {
            tokens.push(std::mem::take(&mut current));
        }
    }

    if !current.is_empty() {
        tokens.push(current);
    }

    tokens.windows(words.len()).any(|window| {
        window
            .iter()
            .zip(words)
            .all(|(token, word)| token.eq_ignore_ascii_case(word))
    })
}
2316
/// Reports whether a `MATCH` keyword follows a `WHERE` keyword in `input`,
/// outside single-quoted string literals.
///
/// The scan splits the input into words of ASCII alphanumerics and `_`
/// (compared case-insensitively). Seeing `WHERE` arms the detector. While
/// armed, `MATCH` reports `true`, and `GRAPH_TABLE`, `GROUP`, `ORDER` or
/// `LIMIT` disarm it again. A word terminated directly by a quote character is
/// only checked for `WHERE` / `MATCH`; it never disarms the detector.
fn contains_where_match_operator(input: &str) -> bool {
    let mut inside_literal = false;
    let mut after_where = false;
    let mut token = String::new();

    for c in input.chars() {
        let is_quote = c == '\'';

        if is_quote {
            // Each quote toggles literal state; a doubled quote toggles twice
            // and therefore leaves the scan inside the literal.
            inside_literal = !inside_literal;
        } else {
            if inside_literal {
                continue;
            }
            if c.is_ascii_alphanumeric() || c == '_' {
                token.push(c);
                continue;
            }
        }

        // `c` ended a word (if one was being accumulated).
        if token.is_empty() {
            continue;
        }

        if token.eq_ignore_ascii_case("WHERE") {
            after_where = true;
        } else if after_where {
            if token.eq_ignore_ascii_case("MATCH") {
                return true;
            }
            // A later graph traversal can legitimately contain MATCH, and
            // GROUP / ORDER / LIMIT end the WHERE clause, so stop tracking the
            // prior WHERE across those boundaries.
            let ends_where_scope = ["GRAPH_TABLE", "GROUP", "ORDER", "LIMIT"]
                .iter()
                .any(|keyword| token.eq_ignore_ascii_case(keyword));
            if !is_quote && ends_where_scope {
                after_where = false;
            }
        }
        token.clear();
    }

    // A trailing word at end of input has not been examined by the loop.
    after_where && token.eq_ignore_ascii_case("MATCH")
}
2371
/// Reports whether byte offset `idx` of `s` is a word boundary: either past
/// the end of the string, or a byte that is neither an ASCII alphanumeric nor
/// `_`.
fn is_word_boundary(s: &str, idx: usize) -> bool {
    match s.as_bytes().get(idx) {
        None => true,
        Some(&byte) => !(byte.is_ascii_alphanumeric() || byte == b'_'),
    }
}
2378
2379fn build_set_memory_limit(pair: Pair<'_, Rule>) -> Result<SetMemoryLimitValue> {
2380    let inner = pair
2381        .into_inner()
2382        .find(|p| p.as_rule() == Rule::memory_limit_value)
2383        .ok_or_else(|| Error::ParseError("missing memory_limit_value".to_string()))?;
2384
2385    if inner.as_str().eq_ignore_ascii_case("none") {
2386        return Ok(SetMemoryLimitValue::None);
2387    }
2388
2389    let value_inner = inner
2390        .into_inner()
2391        .next()
2392        .ok_or_else(|| Error::ParseError("empty memory_limit_value".to_string()))?;
2393
2394    match value_inner.as_rule() {
2395        Rule::size_with_unit => Ok(SetMemoryLimitValue::Bytes(parse_size_with_unit(
2396            value_inner.as_str(),
2397        )? as usize)),
2398        _ => Ok(SetMemoryLimitValue::None),
2399    }
2400}
2401
2402fn build_set_disk_limit(pair: Pair<'_, Rule>) -> Result<SetDiskLimitValue> {
2403    let inner = pair
2404        .into_inner()
2405        .find(|p| p.as_rule() == Rule::disk_limit_value)
2406        .ok_or_else(|| Error::ParseError("missing disk_limit_value".to_string()))?;
2407
2408    if inner.as_str().eq_ignore_ascii_case("none") {
2409        return Ok(SetDiskLimitValue::None);
2410    }
2411
2412    let value_inner = inner
2413        .into_inner()
2414        .next()
2415        .ok_or_else(|| Error::ParseError("empty disk_limit_value".to_string()))?;
2416
2417    match value_inner.as_rule() {
2418        Rule::size_with_unit => Ok(SetDiskLimitValue::Bytes(parse_size_with_unit(
2419            value_inner.as_str(),
2420        )?)),
2421        _ => Ok(SetDiskLimitValue::None),
2422    }
2423}
2424
2425fn parse_size_with_unit(text: &str) -> Result<u64> {
2426    let (digits, suffix) = text.split_at(text.len() - 1);
2427    let base: u64 = digits
2428        .parse()
2429        .map_err(|e| Error::ParseError(format!("invalid size number: {e}")))?;
2430    let multiplier = match suffix {
2431        "G" | "g" => 1024 * 1024 * 1024,
2432        "M" | "m" => 1024 * 1024,
2433        "K" | "k" => 1024,
2434        _ => return Err(Error::ParseError(format!("unknown size suffix: {suffix}"))),
2435    };
2436    Ok(base * multiplier)
2437}