Skip to main content

reddb_server/storage/query/parser/
dml.rs

1//! DML SQL Parser: INSERT, UPDATE, DELETE
2
3use super::super::ast::{
4    AskCacheClause, AskQuery, BinOp, DeleteQuery, Expr, FieldRef, Filter, InsertEntityType,
5    InsertQuery, OrderByClause, QueryExpr, ReturningItem, UpdateQuery, UpdateTarget,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::storage::query::sql_lowering::{filter_to_expr, fold_expr_to_value};
11use crate::storage::schema::Value;
12
13/// DoS guard: maximum JSON nesting depth accepted by the parser.
14/// Mirrors typical web-server JSON limits and bails out before stack
15/// usage gets dangerous in downstream traversals.
16pub(crate) const JSON_LITERAL_MAX_DEPTH: u32 = 128;
17
18/// Walk a parsed `JsonValue` tree and bail out if nesting exceeds
19/// `JSON_LITERAL_MAX_DEPTH`. Iterative to avoid the very stack
20/// overflow we're trying to prevent.
21pub(crate) fn json_literal_depth_check(
22    value: &crate::utils::json::JsonValue,
23) -> Result<(), String> {
24    use crate::utils::json::JsonValue;
25    let mut stack: Vec<(&JsonValue, u32)> = vec![(value, 1)];
26    while let Some((node, depth)) = stack.pop() {
27        if depth > JSON_LITERAL_MAX_DEPTH {
28            return Err(format!(
29                "JSON object literal exceeds JSON_LITERAL_MAX_DEPTH ({})",
30                JSON_LITERAL_MAX_DEPTH
31            ));
32        }
33        match node {
34            JsonValue::Object(entries) => {
35                for (_, v) in entries {
36                    stack.push((v, depth + 1));
37                }
38            }
39            JsonValue::Array(items) => {
40                for v in items {
41                    stack.push((v, depth + 1));
42                }
43            }
44            _ => {}
45        }
46    }
47    Ok(())
48}
49
50impl<'a> Parser<'a> {
51    /// Parse: INSERT INTO table [NODE|EDGE|VECTOR|DOCUMENT|KV] (col1, col2) VALUES (val1, val2), (val3, val4) [RETURNING]
52    pub fn parse_insert_query(&mut self) -> Result<QueryExpr, ParseError> {
53        self.expect(Token::Insert)?;
54        self.expect(Token::Into)?;
55        // Issue #789 — Analytics v0 explicitly excludes `INSERT INTO
56        // METRIC <path>` as a raw write path (PRD #782 non-goal). Raw
57        // samples land in ordinary RedDB collections; the metric
58        // descriptor catalog is reached through `CREATE METRIC` and
59        // `red.analytics.metrics`. Reject the form here before the
60        // identifier slot so the error names the actual reason, not a
61        // generic "expected identifier".
62        if matches!(self.peek(), Token::Metric) {
63            return Err(ParseError::new(
64                "INSERT INTO METRIC is not supported in Analytics v0 — \
65                 write raw samples into an ordinary TABLE/DOCUMENT \
66                 collection; the metric descriptor catalog is reached \
67                 via CREATE METRIC and red.analytics.metrics \
68                 (PRD #782 non-goal)",
69                self.position(),
70            ));
71        }
72        let table = self.expect_ident()?;
73
74        // Check for entity type keyword
75        let entity_type = match self.peek().clone() {
76            Token::Node => {
77                self.advance()?;
78                InsertEntityType::Node
79            }
80            Token::Edge => {
81                self.advance()?;
82                InsertEntityType::Edge
83            }
84            Token::Vector => {
85                self.advance()?;
86                InsertEntityType::Vector
87            }
88            Token::Document => {
89                self.advance()?;
90                InsertEntityType::Document
91            }
92            Token::Kv => {
93                self.advance()?;
94                InsertEntityType::Kv
95            }
96            _ => InsertEntityType::Row,
97        };
98
99        // Parse column list
100        self.expect(Token::LParen)?;
101        let columns = self.parse_ident_list()?;
102        self.expect(Token::RParen)?;
103
104        // Parse VALUES
105        self.expect(Token::Values)?;
106        let mut all_values = Vec::new();
107        let mut all_value_exprs = Vec::new();
108        loop {
109            self.expect(Token::LParen)?;
110            let row_exprs = self.parse_dml_expr_list()?;
111            self.expect(Token::RParen)?;
112            // Tolerate `$N` / `?` placeholders in VALUES rows: fold to
113            // Value::Null and rely on `user_params::bind` to substitute
114            // the caller's values before execution. Issue #355.
115            // Tolerate `$N` / `?` placeholders in VALUES rows: if fold
116            // fails on an expression that contains `Expr::Parameter`,
117            // emit a `Value::Null` placeholder. `user_params::bind`
118            // substitutes the caller-supplied value before execution.
119            // Issue #355.
120            let row_values = row_exprs
121                .iter()
122                .map(|expr| match fold_expr_to_value(expr.clone()) {
123                    Ok(value) => Ok(value),
124                    Err(msg) => {
125                        if crate::storage::query::user_params::expr_contains_parameter(expr) {
126                            Ok(Value::Null)
127                        } else {
128                            Err(msg)
129                        }
130                    }
131                })
132                .collect::<Result<Vec<_>, _>>()
133                .map_err(|msg| ParseError::new(msg, self.position()))?;
134            all_value_exprs.push(row_exprs);
135            all_values.push(row_values);
136            if !self.consume(&Token::Comma)? {
137                break;
138            }
139        }
140
141        // Parse optional WITH clauses
142        let (ttl_ms, expires_at_ms, with_metadata, auto_embed) = self.parse_with_clauses()?;
143
144        let returning = self.parse_returning_clause()?;
145
146        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
147            self.expect_ident_ci("EVENTS")?;
148            true
149        } else {
150            false
151        };
152
153        Ok(QueryExpr::Insert(InsertQuery {
154            table,
155            entity_type,
156            columns,
157            value_exprs: all_value_exprs,
158            values: all_values,
159            returning,
160            ttl_ms,
161            expires_at_ms,
162            with_metadata,
163            auto_embed,
164            suppress_events,
165        }))
166    }
167
168    /// Parse TTL duration value using the same logic as CREATE TABLE ... WITH TTL.
169    fn parse_ttl_duration(&mut self) -> Result<u64, ParseError> {
170        // Reuse the DDL TTL parser: expects a number followed by optional unit
171        let ttl_value = self.parse_float()?;
172        let ttl_unit = match self.peek() {
173            Token::Ident(unit) => {
174                let unit = unit.clone();
175                self.advance()?;
176                unit
177            }
178            _ => "s".to_string(),
179        };
180
181        let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
182            "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
183            "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
184            "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
185            "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
186            "d" | "day" | "days" => 86_400_000.0,
187            other => {
188                return Err(ParseError::new(
189                    // F-05: render `other` via `{:?}` so caller-controlled
190                    // bytes (CR / LF / NUL / quotes) are escaped before
191                    // landing in the JSON/audit/log/gRPC error sinks.
192                    format!(
193                        "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
194                    ),
195                    self.position(),
196                ));
197            }
198        };
199
200        Ok((ttl_value * multiplier_ms) as u64)
201    }
202
203    /// Parse WITH clauses: WITH TTL | EXPIRES AT | METADATA | AUTO EMBED
204    /// Returns (ttl_ms, expires_at_ms, metadata, auto_embed)
205    pub fn parse_with_clauses(
206        &mut self,
207    ) -> Result<
208        (
209            Option<u64>,
210            Option<u64>,
211            Vec<(String, Value)>,
212            Option<crate::storage::query::ast::AutoEmbedConfig>,
213        ),
214        ParseError,
215    > {
216        let mut ttl_ms = None;
217        let mut expires_at_ms = None;
218        let mut with_metadata = Vec::new();
219        let mut auto_embed = None;
220
221        while self.consume(&Token::With)? {
222            if self.consume_ident_ci("TTL")? {
223                ttl_ms = Some(self.parse_ttl_duration()?);
224            } else if self.consume_ident_ci("EXPIRES")? {
225                self.expect_ident_ci("AT")?;
226                let ts = self.parse_expires_at_value()?;
227                expires_at_ms = Some(ts);
228            } else if self.consume(&Token::Metadata)? || self.consume_ident_ci("METADATA")? {
229                with_metadata = self.parse_with_metadata_pairs()?;
230            } else if self.consume_ident_ci("AUTO")? {
231                // WITH AUTO EMBED (field1, field2) [USING provider] [MODEL 'model']
232                self.consume_ident_ci("EMBED")?;
233                self.expect(Token::LParen)?;
234                let mut fields = Vec::new();
235                loop {
236                    fields.push(self.expect_ident()?);
237                    if !self.consume(&Token::Comma)? {
238                        break;
239                    }
240                }
241                self.expect(Token::RParen)?;
242                // `USING` is a reserved keyword (`Token::Using`), so
243                // `consume_ident_ci` would never match. Use the typed
244                // consumer instead. See bug #108 (mirrors the #92 fix
245                // for migration `DEPENDS ON`).
246                let provider = if self.consume(&Token::Using)? {
247                    self.expect_ident()?
248                } else {
249                    "openai".to_string()
250                };
251                let model = if self.consume_ident_ci("MODEL")? {
252                    Some(self.parse_string()?)
253                } else {
254                    None
255                };
256                auto_embed = Some(crate::storage::query::ast::AutoEmbedConfig {
257                    fields,
258                    provider,
259                    model,
260                });
261            } else {
262                return Err(ParseError::expected(
263                    vec!["TTL", "EXPIRES AT", "METADATA", "AUTO EMBED"],
264                    self.peek(),
265                    self.position(),
266                ));
267            }
268        }
269
270        Ok((ttl_ms, expires_at_ms, with_metadata, auto_embed))
271    }
272
273    /// Expect a case-insensitive identifier (error if not found)
274    fn expect_ident_ci(&mut self, expected: &str) -> Result<(), ParseError> {
275        if self.consume_ident_ci(expected)? {
276            Ok(())
277        } else {
278            Err(ParseError::expected(
279                vec![expected],
280                self.peek(),
281                self.position(),
282            ))
283        }
284    }
285
286    /// Parse an absolute expiration timestamp (unix ms or string date)
287    fn parse_expires_at_value(&mut self) -> Result<u64, ParseError> {
288        // Try integer (unix timestamp in ms)
289        if let Ok(value) = self.parse_integer() {
290            return Ok(value as u64);
291        }
292        // Try string like '2026-12-31' — convert to unix ms
293        if let Ok(text) = self.parse_string() {
294            // Simple ISO date parsing: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS
295            let trimmed = text.trim();
296            if let Ok(ts) = trimmed.parse::<u64>() {
297                return Ok(ts);
298            }
299            // Basic date parsing — delegate to chrono if available, or simple heuristic
300            return Err(ParseError::new(
301                // F-05: `trimmed` is caller-controlled string-literal bytes.
302                // Render via `{:?}` so CR/LF/NUL/quotes are escaped before
303                // the message reaches the JSON / audit / log / gRPC sinks.
304                format!("EXPIRES AT requires a unix timestamp in milliseconds, got {trimmed:?}"),
305                self.position(),
306            ));
307        }
308        Err(ParseError::expected(
309            vec!["timestamp (unix ms) or 'YYYY-MM-DD'"],
310            self.peek(),
311            self.position(),
312        ))
313    }
314
315    /// Parse WITH METADATA (key1 = 'value1', key2 = 42)
316    fn parse_with_metadata_pairs(&mut self) -> Result<Vec<(String, Value)>, ParseError> {
317        self.expect(Token::LParen)?;
318        let mut pairs = Vec::new();
319        if !self.check(&Token::RParen) {
320            loop {
321                let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
322                self.expect(Token::Eq)?;
323                let value = self.parse_literal_value()?;
324                pairs.push((key, value));
325                if !self.consume(&Token::Comma)? {
326                    break;
327                }
328            }
329        }
330        self.expect(Token::RParen)?;
331        Ok(pairs)
332    }
333
334    /// Parse: UPDATE table SET col1=val1, col2=val2 [WHERE filter] [WITH TTL|EXPIRES AT|METADATA]
335    pub fn parse_update_query(&mut self) -> Result<QueryExpr, ParseError> {
336        self.expect(Token::Update)?;
337        let table = self.expect_ident()?;
338        let target = self.parse_update_target()?;
339        self.expect(Token::Set)?;
340
341        let mut assignments = Vec::new();
342        let mut assignment_exprs = Vec::new();
343        let mut compound_assignment_ops = Vec::new();
344        loop {
345            let col = self.expect_column_ident()?;
346            let compound_op = if self.consume(&Token::Eq)? {
347                None
348            } else {
349                let op = match self.peek() {
350                    Token::Plus => BinOp::Add,
351                    Token::Dash | Token::Minus => BinOp::Sub,
352                    Token::Star => BinOp::Mul,
353                    Token::Slash => BinOp::Div,
354                    Token::Percent => BinOp::Mod,
355                    _ => {
356                        return Err(ParseError::expected(
357                            vec!["=", "+=", "-=", "*=", "/=", "%="],
358                            self.peek(),
359                            self.position(),
360                        ));
361                    }
362                };
363                self.advance()?;
364                self.expect(Token::Eq)?;
365                Some(op)
366            };
367            let expr = self.parse_expr()?;
368            let folded = fold_expr_to_value(expr.clone()).ok();
369            assignment_exprs.push((col.clone(), expr));
370            compound_assignment_ops.push(compound_op);
371            if compound_op.is_none() {
372                if let Some(val) = folded {
373                    assignments.push((col.clone(), val));
374                }
375            }
376            if !self.consume(&Token::Comma)? {
377                break;
378            }
379        }
380
381        let filter = if self.consume(&Token::Where)? {
382            Some(self.parse_filter()?)
383        } else {
384            None
385        };
386        let where_expr = filter.as_ref().map(filter_to_expr);
387
388        let (ttl_ms, expires_at_ms, with_metadata, _auto_embed) = self.parse_with_clauses()?;
389
390        let mut order_by = if self.consume(&Token::Order)? {
391            self.expect(Token::By)?;
392            let clauses = self.parse_order_by_list()?;
393            validate_update_order_by(&clauses, self.position())?;
394            clauses
395        } else {
396            Vec::new()
397        };
398
399        // Optional `LIMIT N` — used by `BATCH N ROWS` data migrations
400        // to cap a single batch. Must come after WHERE / WITH because
401        // those have their own keyword tokens that the LIMIT branch
402        // would otherwise mis-consume.
403        let limit = if self.consume(&Token::Limit)? {
404            Some(self.parse_integer()? as u64)
405        } else {
406            None
407        };
408        if !order_by.is_empty() && limit.is_none() {
409            return Err(ParseError::new(
410                "UPDATE ORDER BY requires LIMIT",
411                self.position(),
412            ));
413        }
414        if !order_by.is_empty() && !update_order_by_mentions_rid(&order_by) {
415            order_by.push(OrderByClause {
416                field: FieldRef::TableColumn {
417                    table: String::new(),
418                    column: "rid".to_string(),
419                },
420                expr: None,
421                ascending: true,
422                nulls_first: false,
423            });
424        }
425
426        let returning = self.parse_returning_clause()?;
427
428        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
429            self.expect_ident_ci("EVENTS")?;
430            true
431        } else {
432            false
433        };
434
435        Ok(QueryExpr::Update(UpdateQuery {
436            table,
437            target,
438            assignment_exprs,
439            compound_assignment_ops,
440            assignments,
441            where_expr,
442            filter,
443            ttl_ms,
444            expires_at_ms,
445            with_metadata,
446            returning,
447            order_by,
448            limit,
449            suppress_events,
450        }))
451    }
452
453    fn parse_update_target(&mut self) -> Result<UpdateTarget, ParseError> {
454        if self.consume(&Token::Kv)? {
455            return Ok(UpdateTarget::Kv);
456        }
457        if self.consume(&Token::Rows)? {
458            return Ok(UpdateTarget::Rows);
459        }
460        if self.consume_ident_ci("DOCUMENTS")? {
461            return Ok(UpdateTarget::Documents);
462        }
463        if self.consume_ident_ci("NODES")? {
464            return Ok(UpdateTarget::Nodes);
465        }
466        if self.consume_ident_ci("EDGES")? {
467            return Ok(UpdateTarget::Edges);
468        }
469        Ok(UpdateTarget::Rows)
470    }
471
472    /// Parse: DELETE FROM table [WHERE filter]
473    pub fn parse_delete_query(&mut self) -> Result<QueryExpr, ParseError> {
474        self.expect(Token::Delete)?;
475        self.expect(Token::From)?;
476        let table = self.expect_ident()?;
477
478        let filter = if self.consume(&Token::Where)? {
479            Some(self.parse_filter()?)
480        } else {
481            None
482        };
483
484        let where_expr = filter.as_ref().map(filter_to_expr);
485
486        let returning = self.parse_returning_clause()?;
487
488        let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
489            self.expect_ident_ci("EVENTS")?;
490            true
491        } else {
492            false
493        };
494
495        Ok(QueryExpr::Delete(DeleteQuery {
496            table,
497            where_expr,
498            filter,
499            returning,
500            suppress_events,
501        }))
502    }
503
504    /// Parse optional `RETURNING (* | col [, col ...])` clause.
505    /// Returns `None` if no RETURNING token, errors if RETURNING is present
506    /// but not followed by `*` or a non-empty column list.
507    fn parse_returning_clause(&mut self) -> Result<Option<Vec<ReturningItem>>, ParseError> {
508        if !self.consume(&Token::Returning)? {
509            return Ok(None);
510        }
511        if self.consume(&Token::Star)? {
512            return Ok(Some(vec![ReturningItem::All]));
513        }
514        let mut items = Vec::new();
515        loop {
516            if returning_expr_start(self.peek()) {
517                return Err(returning_expr_not_supported(self.position()));
518            }
519            let col = self.expect_update_returning_column()?;
520            items.push(ReturningItem::Column(col));
521            if returning_expr_tail(self.peek()) {
522                return Err(returning_expr_not_supported(self.position()));
523            }
524            if !self.consume(&Token::Comma)? {
525                break;
526            }
527        }
528        if items.is_empty() {
529            return Err(ParseError::expected(
530                vec!["*", "column name"],
531                self.peek(),
532                self.position(),
533            ));
534        }
535        Ok(Some(items))
536    }
537
538    fn expect_update_returning_column(&mut self) -> Result<String, ParseError> {
539        if self.consume(&Token::Weight)? {
540            return Ok("weight".to_string());
541        }
542        self.expect_ident_or_keyword()
543    }
544
545    /// Parse: ASK 'question' [USING provider] [MODEL 'model'] [DEPTH n]
546    /// [LIMIT n] [MIN_SCORE x] [COLLECTION col]
547    pub fn parse_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
548        self.parse_ask_query_with_explain(false)
549    }
550
551    /// Parse: EXPLAIN ASK 'question' ...
552    pub fn parse_explain_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
553        self.advance()?; // consume EXPLAIN
554        if !matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ASK")) {
555            return Err(ParseError::expected(
556                vec!["ASK"],
557                self.peek(),
558                self.position(),
559            ));
560        }
561        self.parse_ask_query_with_explain(true)
562    }
563
564    fn parse_ask_query_with_explain(&mut self, explain: bool) -> Result<QueryExpr, ParseError> {
565        self.advance()?; // consume ASK
566
567        let (question, question_param) = match self.peek() {
568            Token::String(_) => (self.parse_string()?, None),
569            Token::Dollar | Token::Question => {
570                let index = self.parse_param_slot("ASK question")?;
571                (String::new(), Some(index))
572            }
573            other => {
574                return Err(ParseError::expected(
575                    vec!["string", "$N", "?"],
576                    other,
577                    self.position(),
578                ));
579            }
580        };
581
582        let mut provider = None;
583        let mut model = None;
584        let mut depth = None;
585        let mut limit = None;
586        let mut min_score = None;
587        let mut collection = None;
588        let mut temperature = None;
589        let mut seed = None;
590        let mut strict = true;
591        let mut stream = false;
592        let mut cache = AskCacheClause::Default;
593
594        // Parse optional clauses in any order. Loop bound = number of
595        // clause kinds, so each can appear at most once.
596        for _ in 0..12 {
597            if self.consume(&Token::Using)? {
598                provider = Some(match &self.current.token {
599                    Token::String(_) => self.parse_string()?,
600                    _ => self.expect_ident()?,
601                });
602            } else if self.consume_ident_ci("MODEL")? {
603                model = Some(self.parse_string()?);
604            } else if self.consume(&Token::Depth)? {
605                depth = Some(self.parse_integer()? as usize);
606            } else if self.consume(&Token::Limit)? {
607                limit = Some(self.parse_integer()? as usize);
608            } else if self.consume(&Token::MinScore)? {
609                min_score = Some(self.parse_float()? as f32);
610            } else if self.consume(&Token::Collection)? {
611                collection = Some(self.expect_ident()?);
612            } else if self.consume_ident_ci("TEMPERATURE")? {
613                temperature = Some(self.parse_float()? as f32);
614            } else if self.consume_ident_ci("SEED")? {
615                seed = Some(self.parse_integer()? as u64);
616            } else if self.consume_ident_ci("STRICT")? {
617                let value = self.expect_ident_or_keyword()?;
618                if value.eq_ignore_ascii_case("ON") {
619                    strict = true;
620                } else if value.eq_ignore_ascii_case("OFF") {
621                    strict = false;
622                } else {
623                    return Err(ParseError::new(
624                        "Expected ON or OFF after STRICT",
625                        self.position(),
626                    ));
627                }
628            } else if self.consume_ident_ci("STREAM")? {
629                stream = true;
630            } else if self.consume_ident_ci("CACHE")? {
631                if !matches!(cache, AskCacheClause::Default) {
632                    return Err(ParseError::new(
633                        "ASK cache clause specified more than once",
634                        self.position(),
635                    ));
636                }
637                let ttl = self.expect_ident_or_keyword()?;
638                if !ttl.eq_ignore_ascii_case("TTL") {
639                    return Err(ParseError::new("Expected TTL after CACHE", self.position()));
640                }
641                cache = AskCacheClause::CacheTtl(self.parse_string()?);
642            } else if self.consume_ident_ci("NOCACHE")? {
643                if !matches!(cache, AskCacheClause::Default) {
644                    return Err(ParseError::new(
645                        "ASK cache clause specified more than once",
646                        self.position(),
647                    ));
648                }
649                cache = AskCacheClause::NoCache;
650            } else {
651                break;
652            }
653        }
654
655        Ok(QueryExpr::Ask(AskQuery {
656            explain,
657            question,
658            question_param,
659            provider,
660            model,
661            depth,
662            limit,
663            min_score,
664            collection,
665            temperature,
666            seed,
667            strict,
668            stream,
669            cache,
670        }))
671    }
672
673    /// Parse comma-separated identifiers (accepts keywords as column names in DML context)
674    fn parse_ident_list(&mut self) -> Result<Vec<String>, ParseError> {
675        let mut idents = Vec::new();
676        loop {
677            idents.push(self.expect_ident_or_keyword()?);
678            if !self.consume(&Token::Comma)? {
679                break;
680            }
681        }
682        Ok(idents)
683    }
684
685    /// Parse comma-separated literal values for DML statements
686    fn parse_dml_value_list(&mut self) -> Result<Vec<Value>, ParseError> {
687        self.parse_dml_expr_list()?
688            .into_iter()
689            .map(fold_expr_to_value)
690            .collect::<Result<Vec<_>, _>>()
691            .map_err(|msg| ParseError::new(msg, self.position()))
692    }
693
694    fn parse_dml_expr_list(&mut self) -> Result<Vec<Expr>, ParseError> {
695        let mut values = Vec::new();
696        loop {
697            values.push(self.parse_expr()?);
698            if !self.consume(&Token::Comma)? {
699                break;
700            }
701        }
702        Ok(values)
703    }
704
705    /// Parse a single literal value (string, number, true, false, null, array)
706    pub(crate) fn parse_literal_value(&mut self) -> Result<Value, ParseError> {
707        // Depth guard: this function recurses for nested array `[…]`
708        // and object `{…}` literals (see the LBracket / LBrace arms
709        // below). Without entering the depth counter, an adversarial
710        // payload like `[[[[…(10k×)…]]]]` would overflow the Rust
711        // stack BEFORE `ParserLimits::max_depth` fires. The
712        // `JsonLiteral` token path uses `json_literal_depth_check`
713        // (iterative) — the bare `[`/`{` path needs the recursion
714        // counter explicitly.
715        self.enter_depth()?;
716        let result = self.parse_literal_value_inner();
717        self.exit_depth();
718        result
719    }
720
721    fn parse_literal_value_inner(&mut self) -> Result<Value, ParseError> {
722        // Recognize PASSWORD('plaintext') and SECRET('plaintext') as
723        // typed literal constructors. The parser stores them as
724        // sentinel-prefixed values so that the INSERT executor can
725        // apply the crypto transform (argon2id hash / AES-256-GCM
726        // encrypt) without the parser depending on auth or crypto
727        // subsystems.
728        if let Token::Ident(name) = self.peek().clone() {
729            let upper = name.to_uppercase();
730            if upper == "PASSWORD" || upper == "SECRET" {
731                self.advance()?; // consume ident
732                self.expect(Token::LParen)?;
733                let plaintext = self.parse_string()?;
734                self.expect(Token::RParen)?;
735                return Ok(match upper.as_str() {
736                    "PASSWORD" => Value::Password(format!("@@plain@@{plaintext}")),
737                    "SECRET" => Value::Secret(format!("@@plain@@{plaintext}").into_bytes()),
738                    _ => unreachable!(),
739                });
740            }
741            if upper == "SECRET_REF" {
742                self.advance()?; // consume ident
743                self.expect(Token::LParen)?;
744                let store = self.expect_ident_or_keyword()?.to_ascii_lowercase();
745                if store != "vault" {
746                    return Err(ParseError::expected(
747                        vec!["vault"],
748                        self.peek(),
749                        self.position(),
750                    ));
751                }
752                self.expect(Token::Comma)?;
753                let (collection, key) =
754                    self.parse_kv_key(crate::catalog::CollectionModel::Vault)?;
755                self.expect(Token::RParen)?;
756                return Ok(secret_ref_value(&store, &collection, &key));
757            }
758        }
759
760        match self.peek().clone() {
761            Token::String(s) => {
762                let s = s.clone();
763                self.advance()?;
764                Ok(Value::text(s))
765            }
766            Token::JsonLiteral(raw) => {
767                // The lexer already validated brace balance and the
768                // 16 MiB payload ceiling. Parse the raw text into a
769                // canonical JsonValue then re-encode via `to_vec` so
770                // the on-disk bytes match the quoted form.
771                self.advance()?;
772                let json_value = crate::utils::json::parse_json(&raw).map_err(|err| {
773                    ParseError::new(
774                        // F-05: render the underlying parse-error string
775                        // via `{:?}` so any user fragment serde echoed
776                        // back (unexpected character, key text, …) is
777                        // Debug-escaped before reaching the downstream
778                        // JSON / audit / log / gRPC sinks.
779                        format!("invalid JSON object literal: {:?}", err.to_string()),
780                        self.position(),
781                    )
782                })?;
783                json_literal_depth_check(&json_value)
784                    .map_err(|err| ParseError::new(err, self.position()))?;
785                let canonical = crate::serde_json::Value::from(json_value);
786                let bytes = crate::json::to_vec(&canonical).map_err(|err| {
787                    ParseError::new(
788                        // F-05: escape the encoder error via `{:?}` so any
789                        // user fragment it carries cannot smuggle control
790                        // bytes through downstream serialization sinks.
791                        format!("failed to encode JSON literal: {:?}", err.to_string()),
792                        self.position(),
793                    )
794                })?;
795                Ok(Value::Json(bytes))
796            }
797            Token::Integer(n) => {
798                self.advance()?;
799                Ok(Value::Integer(n))
800            }
801            Token::Float(n) => {
802                self.advance()?;
803                Ok(Value::Float(n))
804            }
805            Token::True => {
806                self.advance()?;
807                Ok(Value::Boolean(true))
808            }
809            Token::False => {
810                self.advance()?;
811                Ok(Value::Boolean(false))
812            }
813            Token::Null => {
814                self.advance()?;
815                Ok(Value::Null)
816            }
817            Token::LBracket => {
818                // Parse array literal [val1, val2, ...]
819                // For numeric arrays, produce Value::Vector; for others, produce Value::Json
820                self.advance()?; // consume '['
821                let mut items = Vec::new();
822                if !self.check(&Token::RBracket) {
823                    loop {
824                        items.push(self.parse_literal_value()?);
825                        if !self.consume(&Token::Comma)? {
826                            break;
827                        }
828                    }
829                }
830                self.expect(Token::RBracket)?;
831
832                // Check if all items are numeric (Integer or Float) -> Value::Vector
833                let all_numeric = items
834                    .iter()
835                    .all(|v| matches!(v, Value::Integer(_) | Value::Float(_)));
836                if all_numeric && !items.is_empty() {
837                    let floats: Vec<f32> = items
838                        .iter()
839                        .map(|v| match v {
840                            Value::Float(f) => *f as f32,
841                            Value::Integer(i) => *i as f32,
842                            _ => 0.0,
843                        })
844                        .collect();
845                    Ok(Value::Vector(floats))
846                } else {
847                    // Encode as JSON bytes
848                    let json_arr: Vec<crate::json::Value> = items
849                        .iter()
850                        .map(|v| match v {
851                            Value::Null => crate::json::Value::Null,
852                            Value::Boolean(b) => crate::json::Value::Bool(*b),
853                            Value::Integer(i) => crate::json::Value::Number(*i as f64),
854                            Value::Float(f) => crate::json::Value::Number(*f),
855                            Value::Text(s) => crate::json::Value::String(s.to_string()),
856                            _ => crate::json::Value::Null,
857                        })
858                        .collect();
859                    let json_val = crate::json::Value::Array(json_arr);
860                    let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
861                    Ok(Value::Json(bytes))
862                }
863            }
864            Token::LBrace => {
865                // Parse JSON object literal {key: value, ...}
866                self.advance()?; // consume '{'
867                let mut map = crate::json::Map::new();
868                if !self.check(&Token::RBrace) {
869                    loop {
870                        // Key: string or identifier. Reserved-word
871                        // keys (`level`, `msg`, `type`, …) fall through
872                        // to `expect_ident_or_keyword`, which returns
873                        // the canonical UPPERCASE spelling; lowercase
874                        // that path so the JSON object preserves the
875                        // source casing.
876                        let key = match self.peek().clone() {
877                            Token::String(s) => {
878                                self.advance()?;
879                                s
880                            }
881                            Token::Ident(s) => {
882                                self.advance()?;
883                                s
884                            }
885                            _ => self.expect_ident_or_keyword()?.to_ascii_lowercase(),
886                        };
887                        // Separator: ':' or '='
888                        if !self.consume(&Token::Colon)? {
889                            self.expect(Token::Eq)?;
890                        }
891                        // Value: recursive
892                        let val = self.parse_literal_value()?;
893                        let json_val = match val {
894                            Value::Null => crate::json::Value::Null,
895                            Value::Boolean(b) => crate::json::Value::Bool(b),
896                            Value::Integer(i) => crate::json::Value::Number(i as f64),
897                            Value::Float(f) => crate::json::Value::Number(f),
898                            Value::Text(s) => crate::json::Value::String(s.to_string()),
899                            Value::Json(ref bytes) => {
900                                crate::json::from_slice(bytes).unwrap_or(crate::json::Value::Null)
901                            }
902                            _ => crate::json::Value::Null,
903                        };
904                        map.insert(key, json_val);
905                        if !self.consume(&Token::Comma)? {
906                            break;
907                        }
908                    }
909                }
910                self.expect(Token::RBrace)?;
911                let json_val = crate::json::Value::Object(map);
912                let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
913                Ok(Value::Json(bytes))
914            }
915            ref other => Err(ParseError::expected(
916                vec!["string", "number", "true", "false", "null", "[", "{"],
917                other,
918                self.position(),
919            )),
920        }
921    }
922}
923
924fn returning_expr_start(token: &Token) -> bool {
925    matches!(
926        token,
927        Token::Integer(_)
928            | Token::Float(_)
929            | Token::String(_)
930            | Token::JsonLiteral(_)
931            | Token::Null
932            | Token::True
933            | Token::False
934            | Token::LParen
935            | Token::Minus
936            | Token::Question
937            | Token::Dollar
938    )
939}
940
941fn returning_expr_tail(token: &Token) -> bool {
942    matches!(
943        token,
944        Token::LParen
945            | Token::Plus
946            | Token::Minus
947            | Token::Star
948            | Token::Slash
949            | Token::Percent
950            | Token::DoublePipe
951            | Token::Pipe
952            | Token::Eq
953            | Token::Ne
954            | Token::Lt
955            | Token::Le
956            | Token::Gt
957            | Token::Ge
958            | Token::Dot
959            | Token::Colon
960    )
961}
962
963fn validate_update_order_by(
964    clauses: &[OrderByClause],
965    position: super::super::lexer::Position,
966) -> Result<(), ParseError> {
967    for clause in clauses {
968        if clause.expr.is_some() {
969            return Err(ParseError::new(
970                "UPDATE ORDER BY only supports top-level fields",
971                position,
972            ));
973        }
974        match &clause.field {
975            FieldRef::TableColumn { table, column }
976                if table.is_empty() && !column.contains('.') => {}
977            _ => {
978                return Err(ParseError::new(
979                    "UPDATE ORDER BY only supports top-level fields",
980                    position,
981                ));
982            }
983        }
984    }
985    Ok(())
986}
987
988fn update_order_by_mentions_rid(clauses: &[OrderByClause]) -> bool {
989    clauses.iter().any(|clause| {
990        matches!(
991            &clause.field,
992            FieldRef::TableColumn { table, column }
993                if table.is_empty() && column.eq_ignore_ascii_case("rid")
994        )
995    })
996}
997
998fn returning_expr_not_supported(position: super::super::lexer::Position) -> ParseError {
999    ParseError::new(
1000        "NOT_YET_SUPPORTED: RETURNING expressions are not supported yet; use RETURNING * or named columns. Track a follow-up issue for RETURNING <expr>.",
1001        position,
1002    )
1003}
1004
1005fn secret_ref_value(store: &str, collection: &str, key: &str) -> Value {
1006    let mut map = crate::json::Map::new();
1007    map.insert(
1008        "type".to_string(),
1009        crate::json::Value::String("secret_ref".to_string()),
1010    );
1011    map.insert(
1012        "store".to_string(),
1013        crate::json::Value::String(store.to_string()),
1014    );
1015    map.insert(
1016        "collection".to_string(),
1017        crate::json::Value::String(collection.to_string()),
1018    );
1019    map.insert(
1020        "key".to_string(),
1021        crate::json::Value::String(key.to_string()),
1022    );
1023    Value::Json(crate::json::to_vec(&crate::json::Value::Object(map)).unwrap_or_default())
1024}