Skip to main content

reddb_server/storage/query/parser/
dml.rs

1//! DML SQL Parser: INSERT, UPDATE, DELETE
2
3use super::super::ast::{
4    AskCacheClause, AskQuery, BinOp, DeleteQuery, Expr, FieldRef, Filter, InsertEntityType,
5    InsertQuery, OrderByClause, QueryExpr, ReturningItem, UpdateQuery, UpdateTarget,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::storage::query::sql_lowering::{filter_to_expr, fold_expr_to_value};
11use crate::storage::schema::Value;
12
13/// DoS guard: maximum JSON nesting depth accepted by the parser.
14/// Mirrors typical web-server JSON limits and bails out before stack
15/// usage gets dangerous in downstream traversals.
16pub(crate) const JSON_LITERAL_MAX_DEPTH: u32 = 128;
17
18/// Walk a parsed `JsonValue` tree and bail out if nesting exceeds
19/// `JSON_LITERAL_MAX_DEPTH`. Iterative to avoid the very stack
20/// overflow we're trying to prevent.
21pub(crate) fn json_literal_depth_check(
22    value: &crate::utils::json::JsonValue,
23) -> Result<(), String> {
24    use crate::utils::json::JsonValue;
25    let mut stack: Vec<(&JsonValue, u32)> = vec![(value, 1)];
26    while let Some((node, depth)) = stack.pop() {
27        if depth > JSON_LITERAL_MAX_DEPTH {
28            return Err(format!(
29                "JSON object literal exceeds JSON_LITERAL_MAX_DEPTH ({})",
30                JSON_LITERAL_MAX_DEPTH
31            ));
32        }
33        match node {
34            JsonValue::Object(entries) => {
35                for (_, v) in entries {
36                    stack.push((v, depth + 1));
37                }
38            }
39            JsonValue::Array(items) => {
40                for v in items {
41                    stack.push((v, depth + 1));
42                }
43            }
44            _ => {}
45        }
46    }
47    Ok(())
48}
49
50impl<'a> Parser<'a> {
51    /// Parse: INSERT INTO table [NODE|EDGE|VECTOR|DOCUMENT|KV] (col1, col2) VALUES (val1, val2), (val3, val4) [RETURNING]
52    pub fn parse_insert_query(&mut self) -> Result<QueryExpr, ParseError> {
53        self.expect(Token::Insert)?;
54        self.expect(Token::Into)?;
55        let table = self.expect_ident()?;
56
57        // Check for entity type keyword
58        let entity_type = match self.peek().clone() {
59            Token::Node => {
60                self.advance()?;
61                InsertEntityType::Node
62            }
63            Token::Edge => {
64                self.advance()?;
65                InsertEntityType::Edge
66            }
67            Token::Vector => {
68                self.advance()?;
69                InsertEntityType::Vector
70            }
71            Token::Document => {
72                self.advance()?;
73                InsertEntityType::Document
74            }
75            Token::Kv => {
76                self.advance()?;
77                InsertEntityType::Kv
78            }
79            _ => InsertEntityType::Row,
80        };
81
82        // Parse column list
83        self.expect(Token::LParen)?;
84        let columns = self.parse_ident_list()?;
85        self.expect(Token::RParen)?;
86
87        // Parse VALUES
88        self.expect(Token::Values)?;
89        let mut all_values = Vec::new();
90        let mut all_value_exprs = Vec::new();
91        loop {
92            self.expect(Token::LParen)?;
93            let row_exprs = self.parse_dml_expr_list()?;
94            self.expect(Token::RParen)?;
95            // Tolerate `$N` / `?` placeholders in VALUES rows: fold to
96            // Value::Null and rely on `user_params::bind` to substitute
97            // the caller's values before execution. Issue #355.
98            // Tolerate `$N` / `?` placeholders in VALUES rows: if fold
99            // fails on an expression that contains `Expr::Parameter`,
100            // emit a `Value::Null` placeholder. `user_params::bind`
101            // substitutes the caller-supplied value before execution.
102            // Issue #355.
103            let row_values = row_exprs
104                .iter()
105                .map(|expr| match fold_expr_to_value(expr.clone()) {
106                    Ok(value) => Ok(value),
107                    Err(msg) => {
108                        if crate::storage::query::user_params::expr_contains_parameter(&expr) {
109                            Ok(Value::Null)
110                        } else {
111                            Err(msg)
112                        }
113                    }
114                })
115                .collect::<Result<Vec<_>, _>>()
116                .map_err(|msg| ParseError::new(msg, self.position()))?;
117            all_value_exprs.push(row_exprs);
118            all_values.push(row_values);
119            if !self.consume(&Token::Comma)? {
120                break;
121            }
122        }
123
124        // Parse optional WITH clauses
125        let (ttl_ms, expires_at_ms, with_metadata, auto_embed) = self.parse_with_clauses()?;
126
127        let returning = self.parse_returning_clause()?;
128
129        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
130            self.expect_ident_ci("EVENTS")?;
131            true
132        } else {
133            false
134        };
135
136        Ok(QueryExpr::Insert(InsertQuery {
137            table,
138            entity_type,
139            columns,
140            value_exprs: all_value_exprs,
141            values: all_values,
142            returning,
143            ttl_ms,
144            expires_at_ms,
145            with_metadata,
146            auto_embed,
147            suppress_events,
148        }))
149    }
150
151    /// Parse TTL duration value using the same logic as CREATE TABLE ... WITH TTL.
152    fn parse_ttl_duration(&mut self) -> Result<u64, ParseError> {
153        // Reuse the DDL TTL parser: expects a number followed by optional unit
154        let ttl_value = self.parse_float()?;
155        let ttl_unit = match self.peek() {
156            Token::Ident(unit) => {
157                let unit = unit.clone();
158                self.advance()?;
159                unit
160            }
161            _ => "s".to_string(),
162        };
163
164        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
165            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
166            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
167            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
168            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
169            "d" | "day" | "days" => 86_400_000.0,
170            other => {
171                return Err(ParseError::new(
172                    // F-05: render `other` via `{:?}` so caller-controlled
173                    // bytes (CR / LF / NUL / quotes) are escaped before
174                    // landing in the JSON/audit/log/gRPC error sinks.
175                    format!("unsupported TTL unit {other:?}"),
176                    self.position(),
177                ));
178            }
179        };
180
181        Ok((ttl_value * multiplier_ms) as u64)
182    }
183
184    /// Parse WITH clauses: WITH TTL | EXPIRES AT | METADATA | AUTO EMBED
185    /// Returns (ttl_ms, expires_at_ms, metadata, auto_embed)
186    pub fn parse_with_clauses(
187        &mut self,
188    ) -> Result<
189        (
190            Option<u64>,
191            Option<u64>,
192            Vec<(String, Value)>,
193            Option<crate::storage::query::ast::AutoEmbedConfig>,
194        ),
195        ParseError,
196    > {
197        let mut ttl_ms = None;
198        let mut expires_at_ms = None;
199        let mut with_metadata = Vec::new();
200        let mut auto_embed = None;
201
202        while self.consume(&Token::With)? {
203            if self.consume_ident_ci("TTL")? {
204                ttl_ms = Some(self.parse_ttl_duration()?);
205            } else if self.consume_ident_ci("EXPIRES")? {
206                self.expect_ident_ci("AT")?;
207                let ts = self.parse_expires_at_value()?;
208                expires_at_ms = Some(ts);
209            } else if self.consume(&Token::Metadata)? || self.consume_ident_ci("METADATA")? {
210                with_metadata = self.parse_with_metadata_pairs()?;
211            } else if self.consume_ident_ci("AUTO")? {
212                // WITH AUTO EMBED (field1, field2) [USING provider] [MODEL 'model']
213                self.consume_ident_ci("EMBED")?;
214                self.expect(Token::LParen)?;
215                let mut fields = Vec::new();
216                loop {
217                    fields.push(self.expect_ident()?);
218                    if !self.consume(&Token::Comma)? {
219                        break;
220                    }
221                }
222                self.expect(Token::RParen)?;
223                // `USING` is a reserved keyword (`Token::Using`), so
224                // `consume_ident_ci` would never match. Use the typed
225                // consumer instead. See bug #108 (mirrors the #92 fix
226                // for migration `DEPENDS ON`).
227                let provider = if self.consume(&Token::Using)? {
228                    self.expect_ident()?
229                } else {
230                    "openai".to_string()
231                };
232                let model = if self.consume_ident_ci("MODEL")? {
233                    Some(self.parse_string()?)
234                } else {
235                    None
236                };
237                auto_embed = Some(crate::storage::query::ast::AutoEmbedConfig {
238                    fields,
239                    provider,
240                    model,
241                });
242            } else {
243                return Err(ParseError::expected(
244                    vec!["TTL", "EXPIRES AT", "METADATA", "AUTO EMBED"],
245                    self.peek(),
246                    self.position(),
247                ));
248            }
249        }
250
251        Ok((ttl_ms, expires_at_ms, with_metadata, auto_embed))
252    }
253
254    /// Expect a case-insensitive identifier (error if not found)
255    fn expect_ident_ci(&mut self, expected: &str) -> Result<(), ParseError> {
256        if self.consume_ident_ci(expected)? {
257            Ok(())
258        } else {
259            Err(ParseError::expected(
260                vec![expected],
261                self.peek(),
262                self.position(),
263            ))
264        }
265    }
266
267    /// Parse an absolute expiration timestamp (unix ms or string date)
268    fn parse_expires_at_value(&mut self) -> Result<u64, ParseError> {
269        // Try integer (unix timestamp in ms)
270        if let Ok(value) = self.parse_integer() {
271            return Ok(value as u64);
272        }
273        // Try string like '2026-12-31' — convert to unix ms
274        if let Ok(text) = self.parse_string() {
275            // Simple ISO date parsing: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS
276            let trimmed = text.trim();
277            if let Ok(ts) = trimmed.parse::<u64>() {
278                return Ok(ts);
279            }
280            // Basic date parsing — delegate to chrono if available, or simple heuristic
281            return Err(ParseError::new(
282                // F-05: `trimmed` is caller-controlled string-literal bytes.
283                // Render via `{:?}` so CR/LF/NUL/quotes are escaped before
284                // the message reaches the JSON / audit / log / gRPC sinks.
285                format!("EXPIRES AT requires a unix timestamp in milliseconds, got {trimmed:?}"),
286                self.position(),
287            ));
288        }
289        Err(ParseError::expected(
290            vec!["timestamp (unix ms) or 'YYYY-MM-DD'"],
291            self.peek(),
292            self.position(),
293        ))
294    }
295
296    /// Parse WITH METADATA (key1 = 'value1', key2 = 42)
297    fn parse_with_metadata_pairs(&mut self) -> Result<Vec<(String, Value)>, ParseError> {
298        self.expect(Token::LParen)?;
299        let mut pairs = Vec::new();
300        if !self.check(&Token::RParen) {
301            loop {
302                let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
303                self.expect(Token::Eq)?;
304                let value = self.parse_literal_value()?;
305                pairs.push((key, value));
306                if !self.consume(&Token::Comma)? {
307                    break;
308                }
309            }
310        }
311        self.expect(Token::RParen)?;
312        Ok(pairs)
313    }
314
315    /// Parse: UPDATE table SET col1=val1, col2=val2 [WHERE filter] [WITH TTL|EXPIRES AT|METADATA]
316    pub fn parse_update_query(&mut self) -> Result<QueryExpr, ParseError> {
317        self.expect(Token::Update)?;
318        let table = self.expect_ident()?;
319        let target = self.parse_update_target()?;
320        self.expect(Token::Set)?;
321
322        let mut assignments = Vec::new();
323        let mut assignment_exprs = Vec::new();
324        let mut compound_assignment_ops = Vec::new();
325        loop {
326            let col = self.expect_column_ident()?;
327            let compound_op = if self.consume(&Token::Eq)? {
328                None
329            } else {
330                let op = match self.peek() {
331                    Token::Plus => BinOp::Add,
332                    Token::Dash | Token::Minus => BinOp::Sub,
333                    Token::Star => BinOp::Mul,
334                    Token::Slash => BinOp::Div,
335                    Token::Percent => BinOp::Mod,
336                    _ => {
337                        return Err(ParseError::expected(
338                            vec!["=", "+=", "-=", "*=", "/=", "%="],
339                            self.peek(),
340                            self.position(),
341                        ));
342                    }
343                };
344                self.advance()?;
345                self.expect(Token::Eq)?;
346                Some(op)
347            };
348            let expr = self.parse_expr()?;
349            let folded = fold_expr_to_value(expr.clone()).ok();
350            assignment_exprs.push((col.clone(), expr));
351            compound_assignment_ops.push(compound_op);
352            if compound_op.is_none() {
353                if let Some(val) = folded {
354                    assignments.push((col.clone(), val));
355                }
356            }
357            if !self.consume(&Token::Comma)? {
358                break;
359            }
360        }
361
362        let filter = if self.consume(&Token::Where)? {
363            Some(self.parse_filter()?)
364        } else {
365            None
366        };
367        let where_expr = filter.as_ref().map(filter_to_expr);
368
369        let (ttl_ms, expires_at_ms, with_metadata, _auto_embed) = self.parse_with_clauses()?;
370
371        let mut order_by = if self.consume(&Token::Order)? {
372            self.expect(Token::By)?;
373            let clauses = self.parse_order_by_list()?;
374            validate_update_order_by(&clauses, self.position())?;
375            clauses
376        } else {
377            Vec::new()
378        };
379
380        // Optional `LIMIT N` — used by `BATCH N ROWS` data migrations
381        // to cap a single batch. Must come after WHERE / WITH because
382        // those have their own keyword tokens that the LIMIT branch
383        // would otherwise mis-consume.
384        let limit = if self.consume(&Token::Limit)? {
385            Some(self.parse_integer()? as u64)
386        } else {
387            None
388        };
389        if !order_by.is_empty() && limit.is_none() {
390            return Err(ParseError::new(
391                "UPDATE ORDER BY requires LIMIT",
392                self.position(),
393            ));
394        }
395        if !order_by.is_empty() && !update_order_by_mentions_rid(&order_by) {
396            order_by.push(OrderByClause {
397                field: FieldRef::TableColumn {
398                    table: String::new(),
399                    column: "rid".to_string(),
400                },
401                expr: None,
402                ascending: true,
403                nulls_first: false,
404            });
405        }
406
407        let returning = self.parse_returning_clause()?;
408
409        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
410            self.expect_ident_ci("EVENTS")?;
411            true
412        } else {
413            false
414        };
415
416        Ok(QueryExpr::Update(UpdateQuery {
417            table,
418            target,
419            assignment_exprs,
420            compound_assignment_ops,
421            assignments,
422            where_expr,
423            filter,
424            ttl_ms,
425            expires_at_ms,
426            with_metadata,
427            returning,
428            order_by,
429            limit,
430            suppress_events,
431        }))
432    }
433
434    fn parse_update_target(&mut self) -> Result<UpdateTarget, ParseError> {
435        if self.consume(&Token::Kv)? {
436            return Ok(UpdateTarget::Kv);
437        }
438        if self.consume_ident_ci("ROWS")? {
439            return Ok(UpdateTarget::Rows);
440        }
441        if self.consume_ident_ci("DOCUMENTS")? {
442            return Ok(UpdateTarget::Documents);
443        }
444        if self.consume_ident_ci("NODES")? {
445            return Ok(UpdateTarget::Nodes);
446        }
447        if self.consume_ident_ci("EDGES")? {
448            return Ok(UpdateTarget::Edges);
449        }
450        Ok(UpdateTarget::Rows)
451    }
452
453    /// Parse: DELETE FROM table [WHERE filter]
454    pub fn parse_delete_query(&mut self) -> Result<QueryExpr, ParseError> {
455        self.expect(Token::Delete)?;
456        self.expect(Token::From)?;
457        let table = self.expect_ident()?;
458
459        let filter = if self.consume(&Token::Where)? {
460            Some(self.parse_filter()?)
461        } else {
462            None
463        };
464
465        let where_expr = filter.as_ref().map(filter_to_expr);
466
467        let returning = self.parse_returning_clause()?;
468
469        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
470            self.expect_ident_ci("EVENTS")?;
471            true
472        } else {
473            false
474        };
475
476        Ok(QueryExpr::Delete(DeleteQuery {
477            table,
478            where_expr,
479            filter,
480            returning,
481            suppress_events,
482        }))
483    }
484
485    /// Parse optional `RETURNING (* | col [, col ...])` clause.
486    /// Returns `None` if no RETURNING token, errors if RETURNING is present
487    /// but not followed by `*` or a non-empty column list.
488    fn parse_returning_clause(&mut self) -> Result<Option<Vec<ReturningItem>>, ParseError> {
489        if !self.consume(&Token::Returning)? {
490            return Ok(None);
491        }
492        if self.consume(&Token::Star)? {
493            return Ok(Some(vec![ReturningItem::All]));
494        }
495        let mut items = Vec::new();
496        loop {
497            if returning_expr_start(self.peek()) {
498                return Err(returning_expr_not_supported(self.position()));
499            }
500            let col = self.expect_update_returning_column()?;
501            items.push(ReturningItem::Column(col));
502            if returning_expr_tail(self.peek()) {
503                return Err(returning_expr_not_supported(self.position()));
504            }
505            if !self.consume(&Token::Comma)? {
506                break;
507            }
508        }
509        if items.is_empty() {
510            return Err(ParseError::expected(
511                vec!["*", "column name"],
512                self.peek(),
513                self.position(),
514            ));
515        }
516        Ok(Some(items))
517    }
518
519    fn expect_update_returning_column(&mut self) -> Result<String, ParseError> {
520        if self.consume(&Token::Weight)? {
521            return Ok("weight".to_string());
522        }
523        self.expect_ident_or_keyword()
524    }
525
526    /// Parse: ASK 'question' [USING provider] [MODEL 'model'] [DEPTH n]
527    /// [LIMIT n] [MIN_SCORE x] [COLLECTION col]
528    pub fn parse_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
529        self.parse_ask_query_with_explain(false)
530    }
531
532    /// Parse: EXPLAIN ASK 'question' ...
533    pub fn parse_explain_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
534        self.advance()?; // consume EXPLAIN
535        if !matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ASK")) {
536            return Err(ParseError::expected(
537                vec!["ASK"],
538                self.peek(),
539                self.position(),
540            ));
541        }
542        self.parse_ask_query_with_explain(true)
543    }
544
545    fn parse_ask_query_with_explain(&mut self, explain: bool) -> Result<QueryExpr, ParseError> {
546        self.advance()?; // consume ASK
547
548        let (question, question_param) = match self.peek() {
549            Token::String(_) => (self.parse_string()?, None),
550            Token::Dollar | Token::Question => {
551                let index = self.parse_param_slot("ASK question")?;
552                (String::new(), Some(index))
553            }
554            other => {
555                return Err(ParseError::expected(
556                    vec!["string", "$N", "?"],
557                    other,
558                    self.position(),
559                ));
560            }
561        };
562
563        let mut provider = None;
564        let mut model = None;
565        let mut depth = None;
566        let mut limit = None;
567        let mut min_score = None;
568        let mut collection = None;
569        let mut temperature = None;
570        let mut seed = None;
571        let mut strict = true;
572        let mut stream = false;
573        let mut cache = AskCacheClause::Default;
574
575        // Parse optional clauses in any order. Loop bound = number of
576        // clause kinds, so each can appear at most once.
577        for _ in 0..12 {
578            if self.consume(&Token::Using)? {
579                provider = Some(match &self.current.token {
580                    Token::String(_) => self.parse_string()?,
581                    _ => self.expect_ident()?,
582                });
583            } else if self.consume_ident_ci("MODEL")? {
584                model = Some(self.parse_string()?);
585            } else if self.consume(&Token::Depth)? {
586                depth = Some(self.parse_integer()? as usize);
587            } else if self.consume(&Token::Limit)? {
588                limit = Some(self.parse_integer()? as usize);
589            } else if self.consume(&Token::MinScore)? {
590                min_score = Some(self.parse_float()? as f32);
591            } else if self.consume(&Token::Collection)? {
592                collection = Some(self.expect_ident()?);
593            } else if self.consume_ident_ci("TEMPERATURE")? {
594                temperature = Some(self.parse_float()? as f32);
595            } else if self.consume_ident_ci("SEED")? {
596                seed = Some(self.parse_integer()? as u64);
597            } else if self.consume_ident_ci("STRICT")? {
598                let value = self.expect_ident_or_keyword()?;
599                if value.eq_ignore_ascii_case("ON") {
600                    strict = true;
601                } else if value.eq_ignore_ascii_case("OFF") {
602                    strict = false;
603                } else {
604                    return Err(ParseError::new(
605                        "Expected ON or OFF after STRICT",
606                        self.position(),
607                    ));
608                }
609            } else if self.consume_ident_ci("STREAM")? {
610                stream = true;
611            } else if self.consume_ident_ci("CACHE")? {
612                if !matches!(cache, AskCacheClause::Default) {
613                    return Err(ParseError::new(
614                        "ASK cache clause specified more than once",
615                        self.position(),
616                    ));
617                }
618                let ttl = self.expect_ident_or_keyword()?;
619                if !ttl.eq_ignore_ascii_case("TTL") {
620                    return Err(ParseError::new("Expected TTL after CACHE", self.position()));
621                }
622                cache = AskCacheClause::CacheTtl(self.parse_string()?);
623            } else if self.consume_ident_ci("NOCACHE")? {
624                if !matches!(cache, AskCacheClause::Default) {
625                    return Err(ParseError::new(
626                        "ASK cache clause specified more than once",
627                        self.position(),
628                    ));
629                }
630                cache = AskCacheClause::NoCache;
631            } else {
632                break;
633            }
634        }
635
636        Ok(QueryExpr::Ask(AskQuery {
637            explain,
638            question,
639            question_param,
640            provider,
641            model,
642            depth,
643            limit,
644            min_score,
645            collection,
646            temperature,
647            seed,
648            strict,
649            stream,
650            cache,
651        }))
652    }
653
654    /// Parse comma-separated identifiers (accepts keywords as column names in DML context)
655    fn parse_ident_list(&mut self) -> Result<Vec<String>, ParseError> {
656        let mut idents = Vec::new();
657        loop {
658            idents.push(self.expect_ident_or_keyword()?);
659            if !self.consume(&Token::Comma)? {
660                break;
661            }
662        }
663        Ok(idents)
664    }
665
666    /// Parse comma-separated literal values for DML statements
667    fn parse_dml_value_list(&mut self) -> Result<Vec<Value>, ParseError> {
668        self.parse_dml_expr_list()?
669            .into_iter()
670            .map(fold_expr_to_value)
671            .collect::<Result<Vec<_>, _>>()
672            .map_err(|msg| ParseError::new(msg, self.position()))
673    }
674
675    fn parse_dml_expr_list(&mut self) -> Result<Vec<Expr>, ParseError> {
676        let mut values = Vec::new();
677        loop {
678            values.push(self.parse_expr()?);
679            if !self.consume(&Token::Comma)? {
680                break;
681            }
682        }
683        Ok(values)
684    }
685
686    /// Parse a single literal value (string, number, true, false, null, array)
687    pub(crate) fn parse_literal_value(&mut self) -> Result<Value, ParseError> {
688        // Depth guard: this function recurses for nested array `[…]`
689        // and object `{…}` literals (see the LBracket / LBrace arms
690        // below). Without entering the depth counter, an adversarial
691        // payload like `[[[[…(10k×)…]]]]` would overflow the Rust
692        // stack BEFORE `ParserLimits::max_depth` fires. The
693        // `JsonLiteral` token path uses `json_literal_depth_check`
694        // (iterative) — the bare `[`/`{` path needs the recursion
695        // counter explicitly.
696        self.enter_depth()?;
697        let result = self.parse_literal_value_inner();
698        self.exit_depth();
699        result
700    }
701
702    fn parse_literal_value_inner(&mut self) -> Result<Value, ParseError> {
703        // Recognize PASSWORD('plaintext') and SECRET('plaintext') as
704        // typed literal constructors. The parser stores them as
705        // sentinel-prefixed values so that the INSERT executor can
706        // apply the crypto transform (argon2id hash / AES-256-GCM
707        // encrypt) without the parser depending on auth or crypto
708        // subsystems.
709        if let Token::Ident(name) = self.peek().clone() {
710            let upper = name.to_uppercase();
711            if upper == "PASSWORD" || upper == "SECRET" {
712                self.advance()?; // consume ident
713                self.expect(Token::LParen)?;
714                let plaintext = self.parse_string()?;
715                self.expect(Token::RParen)?;
716                return Ok(match upper.as_str() {
717                    "PASSWORD" => Value::Password(format!("@@plain@@{plaintext}")),
718                    "SECRET" => Value::Secret(format!("@@plain@@{plaintext}").into_bytes()),
719                    _ => unreachable!(),
720                });
721            }
722            if upper == "SECRET_REF" {
723                self.advance()?; // consume ident
724                self.expect(Token::LParen)?;
725                let store = self.expect_ident_or_keyword()?.to_ascii_lowercase();
726                if store != "vault" {
727                    return Err(ParseError::expected(
728                        vec!["vault"],
729                        self.peek(),
730                        self.position(),
731                    ));
732                }
733                self.expect(Token::Comma)?;
734                let (collection, key) =
735                    self.parse_kv_key(crate::catalog::CollectionModel::Vault)?;
736                self.expect(Token::RParen)?;
737                return Ok(secret_ref_value(&store, &collection, &key));
738            }
739        }
740
741        match self.peek().clone() {
742            Token::String(s) => {
743                let s = s.clone();
744                self.advance()?;
745                Ok(Value::text(s))
746            }
747            Token::JsonLiteral(raw) => {
748                // The lexer already validated brace balance and the
749                // 16 MiB payload ceiling. Parse the raw text into a
750                // canonical JsonValue then re-encode via `to_vec` so
751                // the on-disk bytes match the quoted form.
752                self.advance()?;
753                let json_value = crate::utils::json::parse_json(&raw).map_err(|err| {
754                    ParseError::new(
755                        // F-05: render the underlying parse-error string
756                        // via `{:?}` so any user fragment serde echoed
757                        // back (unexpected character, key text, …) is
758                        // Debug-escaped before reaching the downstream
759                        // JSON / audit / log / gRPC sinks.
760                        format!("invalid JSON object literal: {:?}", err.to_string()),
761                        self.position(),
762                    )
763                })?;
764                json_literal_depth_check(&json_value)
765                    .map_err(|err| ParseError::new(err, self.position()))?;
766                let canonical = crate::serde_json::Value::from(json_value);
767                let bytes = crate::json::to_vec(&canonical).map_err(|err| {
768                    ParseError::new(
769                        // F-05: escape the encoder error via `{:?}` so any
770                        // user fragment it carries cannot smuggle control
771                        // bytes through downstream serialization sinks.
772                        format!("failed to encode JSON literal: {:?}", err.to_string()),
773                        self.position(),
774                    )
775                })?;
776                Ok(Value::Json(bytes))
777            }
778            Token::Integer(n) => {
779                self.advance()?;
780                Ok(Value::Integer(n))
781            }
782            Token::Float(n) => {
783                self.advance()?;
784                Ok(Value::Float(n))
785            }
786            Token::True => {
787                self.advance()?;
788                Ok(Value::Boolean(true))
789            }
790            Token::False => {
791                self.advance()?;
792                Ok(Value::Boolean(false))
793            }
794            Token::Null => {
795                self.advance()?;
796                Ok(Value::Null)
797            }
798            Token::LBracket => {
799                // Parse array literal [val1, val2, ...]
800                // For numeric arrays, produce Value::Vector; for others, produce Value::Json
801                self.advance()?; // consume '['
802                let mut items = Vec::new();
803                if !self.check(&Token::RBracket) {
804                    loop {
805                        items.push(self.parse_literal_value()?);
806                        if !self.consume(&Token::Comma)? {
807                            break;
808                        }
809                    }
810                }
811                self.expect(Token::RBracket)?;
812
813                // Check if all items are numeric (Integer or Float) -> Value::Vector
814                let all_numeric = items
815                    .iter()
816                    .all(|v| matches!(v, Value::Integer(_) | Value::Float(_)));
817                if all_numeric && !items.is_empty() {
818                    let floats: Vec<f32> = items
819                        .iter()
820                        .map(|v| match v {
821                            Value::Float(f) => *f as f32,
822                            Value::Integer(i) => *i as f32,
823                            _ => 0.0,
824                        })
825                        .collect();
826                    Ok(Value::Vector(floats))
827                } else {
828                    // Encode as JSON bytes
829                    let json_arr: Vec<crate::json::Value> = items
830                        .iter()
831                        .map(|v| match v {
832                            Value::Null => crate::json::Value::Null,
833                            Value::Boolean(b) => crate::json::Value::Bool(*b),
834                            Value::Integer(i) => crate::json::Value::Number(*i as f64),
835                            Value::Float(f) => crate::json::Value::Number(*f),
836                            Value::Text(s) => crate::json::Value::String(s.to_string()),
837                            _ => crate::json::Value::Null,
838                        })
839                        .collect();
840                    let json_val = crate::json::Value::Array(json_arr);
841                    let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
842                    Ok(Value::Json(bytes))
843                }
844            }
845            Token::LBrace => {
846                // Parse JSON object literal {key: value, ...}
847                self.advance()?; // consume '{'
848                let mut map = crate::json::Map::new();
849                if !self.check(&Token::RBrace) {
850                    loop {
851                        // Key: string or identifier. Reserved-word
852                        // keys (`level`, `msg`, `type`, …) fall through
853                        // to `expect_ident_or_keyword`, which returns
854                        // the canonical UPPERCASE spelling; lowercase
855                        // that path so the JSON object preserves the
856                        // source casing.
857                        let key = match self.peek().clone() {
858                            Token::String(s) => {
859                                self.advance()?;
860                                s
861                            }
862                            Token::Ident(s) => {
863                                self.advance()?;
864                                s
865                            }
866                            _ => self.expect_ident_or_keyword()?.to_ascii_lowercase(),
867                        };
868                        // Separator: ':' or '='
869                        if !self.consume(&Token::Colon)? {
870                            self.expect(Token::Eq)?;
871                        }
872                        // Value: recursive
873                        let val = self.parse_literal_value()?;
874                        let json_val = match val {
875                            Value::Null => crate::json::Value::Null,
876                            Value::Boolean(b) => crate::json::Value::Bool(b),
877                            Value::Integer(i) => crate::json::Value::Number(i as f64),
878                            Value::Float(f) => crate::json::Value::Number(f),
879                            Value::Text(s) => crate::json::Value::String(s.to_string()),
880                            Value::Json(ref bytes) => {
881                                crate::json::from_slice(bytes).unwrap_or(crate::json::Value::Null)
882                            }
883                            _ => crate::json::Value::Null,
884                        };
885                        map.insert(key, json_val);
886                        if !self.consume(&Token::Comma)? {
887                            break;
888                        }
889                    }
890                }
891                self.expect(Token::RBrace)?;
892                let json_val = crate::json::Value::Object(map);
893                let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
894                Ok(Value::Json(bytes))
895            }
896            ref other => Err(ParseError::expected(
897                vec!["string", "number", "true", "false", "null", "[", "{"],
898                other,
899                self.position(),
900            )),
901        }
902    }
903}
904
905fn returning_expr_start(token: &Token) -> bool {
906    matches!(
907        token,
908        Token::Integer(_)
909            | Token::Float(_)
910            | Token::String(_)
911            | Token::JsonLiteral(_)
912            | Token::Null
913            | Token::True
914            | Token::False
915            | Token::LParen
916            | Token::Minus
917            | Token::Question
918            | Token::Dollar
919    )
920}
921
922fn returning_expr_tail(token: &Token) -> bool {
923    matches!(
924        token,
925        Token::LParen
926            | Token::Plus
927            | Token::Minus
928            | Token::Star
929            | Token::Slash
930            | Token::Percent
931            | Token::DoublePipe
932            | Token::Pipe
933            | Token::Eq
934            | Token::Ne
935            | Token::Lt
936            | Token::Le
937            | Token::Gt
938            | Token::Ge
939            | Token::Dot
940            | Token::Colon
941    )
942}
943
944fn validate_update_order_by(
945    clauses: &[OrderByClause],
946    position: super::super::lexer::Position,
947) -> Result<(), ParseError> {
948    for clause in clauses {
949        if clause.expr.is_some() {
950            return Err(ParseError::new(
951                "UPDATE ORDER BY only supports top-level fields",
952                position,
953            ));
954        }
955        match &clause.field {
956            FieldRef::TableColumn { table, column }
957                if table.is_empty() && !column.contains('.') => {}
958            _ => {
959                return Err(ParseError::new(
960                    "UPDATE ORDER BY only supports top-level fields",
961                    position,
962                ));
963            }
964        }
965    }
966    Ok(())
967}
968
969fn update_order_by_mentions_rid(clauses: &[OrderByClause]) -> bool {
970    clauses.iter().any(|clause| {
971        matches!(
972            &clause.field,
973            FieldRef::TableColumn { table, column }
974                if table.is_empty() && column.eq_ignore_ascii_case("rid")
975        )
976    })
977}
978
979fn returning_expr_not_supported(position: super::super::lexer::Position) -> ParseError {
980    ParseError::new(
981        "NOT_YET_SUPPORTED: RETURNING expressions are not supported yet; use RETURNING * or named columns. Track a follow-up issue for RETURNING <expr>.",
982        position,
983    )
984}
985
986fn secret_ref_value(store: &str, collection: &str, key: &str) -> Value {
987    let mut map = crate::json::Map::new();
988    map.insert(
989        "type".to_string(),
990        crate::json::Value::String("secret_ref".to_string()),
991    );
992    map.insert(
993        "store".to_string(),
994        crate::json::Value::String(store.to_string()),
995    );
996    map.insert(
997        "collection".to_string(),
998        crate::json::Value::String(collection.to_string()),
999    );
1000    map.insert(
1001        "key".to_string(),
1002        crate::json::Value::String(key.to_string()),
1003    );
1004    Value::Json(crate::json::to_vec(&crate::json::Value::Object(map)).unwrap_or_default())
1005}