1use super::super::ast::{
4 AskCacheClause, AskQuery, DeleteQuery, Expr, Filter, InsertEntityType, InsertQuery, QueryExpr,
5 ReturningItem, UpdateQuery,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::storage::query::sql_lowering::{filter_to_expr, fold_expr_to_value};
11use crate::storage::schema::Value;
12
/// Maximum nesting depth accepted by [`json_literal_depth_check`], where the
/// root value counts as depth 1.
pub(crate) const JSON_LITERAL_MAX_DEPTH: u32 = 128;
17
18pub(crate) fn json_literal_depth_check(
22 value: &crate::utils::json::JsonValue,
23) -> Result<(), String> {
24 use crate::utils::json::JsonValue;
25 let mut stack: Vec<(&JsonValue, u32)> = vec![(value, 1)];
26 while let Some((node, depth)) = stack.pop() {
27 if depth > JSON_LITERAL_MAX_DEPTH {
28 return Err(format!(
29 "JSON object literal exceeds JSON_LITERAL_MAX_DEPTH ({})",
30 JSON_LITERAL_MAX_DEPTH
31 ));
32 }
33 match node {
34 JsonValue::Object(entries) => {
35 for (_, v) in entries {
36 stack.push((v, depth + 1));
37 }
38 }
39 JsonValue::Array(items) => {
40 for v in items {
41 stack.push((v, depth + 1));
42 }
43 }
44 _ => {}
45 }
46 }
47 Ok(())
48}
49
50impl<'a> Parser<'a> {
51 pub fn parse_insert_query(&mut self) -> Result<QueryExpr, ParseError> {
53 self.expect(Token::Insert)?;
54 self.expect(Token::Into)?;
55 let table = self.expect_ident()?;
56
57 let entity_type = match self.peek().clone() {
59 Token::Node => {
60 self.advance()?;
61 InsertEntityType::Node
62 }
63 Token::Edge => {
64 self.advance()?;
65 InsertEntityType::Edge
66 }
67 Token::Vector => {
68 self.advance()?;
69 InsertEntityType::Vector
70 }
71 Token::Document => {
72 self.advance()?;
73 InsertEntityType::Document
74 }
75 Token::Kv => {
76 self.advance()?;
77 InsertEntityType::Kv
78 }
79 _ => InsertEntityType::Row,
80 };
81
82 self.expect(Token::LParen)?;
84 let columns = self.parse_ident_list()?;
85 self.expect(Token::RParen)?;
86
87 self.expect(Token::Values)?;
89 let mut all_values = Vec::new();
90 let mut all_value_exprs = Vec::new();
91 loop {
92 self.expect(Token::LParen)?;
93 let row_exprs = self.parse_dml_expr_list()?;
94 self.expect(Token::RParen)?;
95 let row_values = row_exprs
104 .iter()
105 .map(|expr| match fold_expr_to_value(expr.clone()) {
106 Ok(value) => Ok(value),
107 Err(msg) => {
108 if crate::storage::query::user_params::expr_contains_parameter(&expr) {
109 Ok(Value::Null)
110 } else {
111 Err(msg)
112 }
113 }
114 })
115 .collect::<Result<Vec<_>, _>>()
116 .map_err(|msg| ParseError::new(msg, self.position()))?;
117 all_value_exprs.push(row_exprs);
118 all_values.push(row_values);
119 if !self.consume(&Token::Comma)? {
120 break;
121 }
122 }
123
124 let (ttl_ms, expires_at_ms, with_metadata, auto_embed) = self.parse_with_clauses()?;
126
127 let returning = self.parse_returning_clause()?;
128
129 let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
130 self.expect_ident_ci("EVENTS")?;
131 true
132 } else {
133 false
134 };
135
136 Ok(QueryExpr::Insert(InsertQuery {
137 table,
138 entity_type,
139 columns,
140 value_exprs: all_value_exprs,
141 values: all_values,
142 returning,
143 ttl_ms,
144 expires_at_ms,
145 with_metadata,
146 auto_embed,
147 suppress_events,
148 }))
149 }
150
151 fn parse_ttl_duration(&mut self) -> Result<u64, ParseError> {
153 let ttl_value = self.parse_float()?;
155 let ttl_unit = match self.peek() {
156 Token::Ident(unit) => {
157 let unit = unit.clone();
158 self.advance()?;
159 unit
160 }
161 _ => "s".to_string(),
162 };
163
164 let multiplier_ms = match ttl_unit.to_ascii_lowercase().as_str() {
165 "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
166 "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
167 "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
168 "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
169 "d" | "day" | "days" => 86_400_000.0,
170 other => {
171 return Err(ParseError::new(
172 format!("unsupported TTL unit {other:?}"),
176 self.position(),
177 ));
178 }
179 };
180
181 Ok((ttl_value * multiplier_ms) as u64)
182 }
183
184 pub fn parse_with_clauses(
187 &mut self,
188 ) -> Result<
189 (
190 Option<u64>,
191 Option<u64>,
192 Vec<(String, Value)>,
193 Option<crate::storage::query::ast::AutoEmbedConfig>,
194 ),
195 ParseError,
196 > {
197 let mut ttl_ms = None;
198 let mut expires_at_ms = None;
199 let mut with_metadata = Vec::new();
200 let mut auto_embed = None;
201
202 while self.consume(&Token::With)? {
203 if self.consume_ident_ci("TTL")? {
204 ttl_ms = Some(self.parse_ttl_duration()?);
205 } else if self.consume_ident_ci("EXPIRES")? {
206 self.expect_ident_ci("AT")?;
207 let ts = self.parse_expires_at_value()?;
208 expires_at_ms = Some(ts);
209 } else if self.consume(&Token::Metadata)? || self.consume_ident_ci("METADATA")? {
210 with_metadata = self.parse_with_metadata_pairs()?;
211 } else if self.consume_ident_ci("AUTO")? {
212 self.consume_ident_ci("EMBED")?;
214 self.expect(Token::LParen)?;
215 let mut fields = Vec::new();
216 loop {
217 fields.push(self.expect_ident()?);
218 if !self.consume(&Token::Comma)? {
219 break;
220 }
221 }
222 self.expect(Token::RParen)?;
223 let provider = if self.consume(&Token::Using)? {
228 self.expect_ident()?
229 } else {
230 "openai".to_string()
231 };
232 let model = if self.consume_ident_ci("MODEL")? {
233 Some(self.parse_string()?)
234 } else {
235 None
236 };
237 auto_embed = Some(crate::storage::query::ast::AutoEmbedConfig {
238 fields,
239 provider,
240 model,
241 });
242 } else {
243 return Err(ParseError::expected(
244 vec!["TTL", "EXPIRES AT", "METADATA", "AUTO EMBED"],
245 self.peek(),
246 self.position(),
247 ));
248 }
249 }
250
251 Ok((ttl_ms, expires_at_ms, with_metadata, auto_embed))
252 }
253
254 fn expect_ident_ci(&mut self, expected: &str) -> Result<(), ParseError> {
256 if self.consume_ident_ci(expected)? {
257 Ok(())
258 } else {
259 Err(ParseError::expected(
260 vec![expected],
261 self.peek(),
262 self.position(),
263 ))
264 }
265 }
266
267 fn parse_expires_at_value(&mut self) -> Result<u64, ParseError> {
269 if let Ok(value) = self.parse_integer() {
271 return Ok(value as u64);
272 }
273 if let Ok(text) = self.parse_string() {
275 let trimmed = text.trim();
277 if let Ok(ts) = trimmed.parse::<u64>() {
278 return Ok(ts);
279 }
280 return Err(ParseError::new(
282 format!("EXPIRES AT requires a unix timestamp in milliseconds, got {trimmed:?}"),
286 self.position(),
287 ));
288 }
289 Err(ParseError::expected(
290 vec!["timestamp (unix ms) or 'YYYY-MM-DD'"],
291 self.peek(),
292 self.position(),
293 ))
294 }
295
296 fn parse_with_metadata_pairs(&mut self) -> Result<Vec<(String, Value)>, ParseError> {
298 self.expect(Token::LParen)?;
299 let mut pairs = Vec::new();
300 if !self.check(&Token::RParen) {
301 loop {
302 let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
303 self.expect(Token::Eq)?;
304 let value = self.parse_literal_value()?;
305 pairs.push((key, value));
306 if !self.consume(&Token::Comma)? {
307 break;
308 }
309 }
310 }
311 self.expect(Token::RParen)?;
312 Ok(pairs)
313 }
314
315 pub fn parse_update_query(&mut self) -> Result<QueryExpr, ParseError> {
317 self.expect(Token::Update)?;
318 let table = self.expect_ident()?;
319 self.expect(Token::Set)?;
320
321 let mut assignments = Vec::new();
322 let mut assignment_exprs = Vec::new();
323 loop {
324 let col = self.expect_ident()?;
325 self.expect(Token::Eq)?;
326 let expr = self.parse_expr()?;
327 let folded = fold_expr_to_value(expr.clone()).ok();
328 assignment_exprs.push((col.clone(), expr));
329 if let Some(val) = folded {
330 assignments.push((col.clone(), val));
331 }
332 if !self.consume(&Token::Comma)? {
333 break;
334 }
335 }
336
337 let filter = if self.consume(&Token::Where)? {
338 Some(self.parse_filter()?)
339 } else {
340 None
341 };
342 let where_expr = filter.as_ref().map(filter_to_expr);
343
344 let (ttl_ms, expires_at_ms, with_metadata, _auto_embed) = self.parse_with_clauses()?;
345
346 let limit = if self.consume(&Token::Limit)? {
351 Some(self.parse_integer()? as u64)
352 } else {
353 None
354 };
355
356 let returning = self.parse_returning_clause()?;
357
358 let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
359 self.expect_ident_ci("EVENTS")?;
360 true
361 } else {
362 false
363 };
364
365 Ok(QueryExpr::Update(UpdateQuery {
366 table,
367 assignment_exprs,
368 assignments,
369 where_expr,
370 filter,
371 ttl_ms,
372 expires_at_ms,
373 with_metadata,
374 returning,
375 limit,
376 suppress_events,
377 }))
378 }
379
380 pub fn parse_delete_query(&mut self) -> Result<QueryExpr, ParseError> {
382 self.expect(Token::Delete)?;
383 self.expect(Token::From)?;
384 let table = self.expect_ident()?;
385
386 let filter = if self.consume(&Token::Where)? {
387 Some(self.parse_filter()?)
388 } else {
389 None
390 };
391
392 let where_expr = filter.as_ref().map(filter_to_expr);
393
394 let returning = self.parse_returning_clause()?;
395
396 let suppress_events = if self.consume_ident_ci("SUPPRESS")? {
397 self.expect_ident_ci("EVENTS")?;
398 true
399 } else {
400 false
401 };
402
403 Ok(QueryExpr::Delete(DeleteQuery {
404 table,
405 where_expr,
406 filter,
407 returning,
408 suppress_events,
409 }))
410 }
411
412 fn parse_returning_clause(&mut self) -> Result<Option<Vec<ReturningItem>>, ParseError> {
416 if !self.consume(&Token::Returning)? {
417 return Ok(None);
418 }
419 if self.consume(&Token::Star)? {
420 return Ok(Some(vec![ReturningItem::All]));
421 }
422 let mut items = Vec::new();
423 loop {
424 let col = self.expect_ident_or_keyword()?;
425 items.push(ReturningItem::Column(col));
426 if !self.consume(&Token::Comma)? {
427 break;
428 }
429 }
430 if items.is_empty() {
431 return Err(ParseError::expected(
432 vec!["*", "column name"],
433 self.peek(),
434 self.position(),
435 ));
436 }
437 Ok(Some(items))
438 }
439
    /// Parses an `ASK` query with the `explain` flag cleared; delegates to
    /// `parse_ask_query_with_explain(false)`.
    pub fn parse_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
        self.parse_ask_query_with_explain(false)
    }
445
446 pub fn parse_explain_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
448 self.advance()?; if !matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("ASK")) {
450 return Err(ParseError::expected(
451 vec!["ASK"],
452 self.peek(),
453 self.position(),
454 ));
455 }
456 self.parse_ask_query_with_explain(true)
457 }
458
459 fn parse_ask_query_with_explain(&mut self, explain: bool) -> Result<QueryExpr, ParseError> {
460 self.advance()?; let (question, question_param) = match self.peek() {
463 Token::String(_) => (self.parse_string()?, None),
464 Token::Dollar | Token::Question => {
465 let index = self.parse_param_slot("ASK question")?;
466 (String::new(), Some(index))
467 }
468 other => {
469 return Err(ParseError::expected(
470 vec!["string", "$N", "?"],
471 other,
472 self.position(),
473 ));
474 }
475 };
476
477 let mut provider = None;
478 let mut model = None;
479 let mut depth = None;
480 let mut limit = None;
481 let mut min_score = None;
482 let mut collection = None;
483 let mut temperature = None;
484 let mut seed = None;
485 let mut strict = true;
486 let mut stream = false;
487 let mut cache = AskCacheClause::Default;
488
489 for _ in 0..12 {
492 if self.consume(&Token::Using)? {
493 provider = Some(match &self.current.token {
494 Token::String(_) => self.parse_string()?,
495 _ => self.expect_ident()?,
496 });
497 } else if self.consume_ident_ci("MODEL")? {
498 model = Some(self.parse_string()?);
499 } else if self.consume(&Token::Depth)? {
500 depth = Some(self.parse_integer()? as usize);
501 } else if self.consume(&Token::Limit)? {
502 limit = Some(self.parse_integer()? as usize);
503 } else if self.consume(&Token::MinScore)? {
504 min_score = Some(self.parse_float()? as f32);
505 } else if self.consume(&Token::Collection)? {
506 collection = Some(self.expect_ident()?);
507 } else if self.consume_ident_ci("TEMPERATURE")? {
508 temperature = Some(self.parse_float()? as f32);
509 } else if self.consume_ident_ci("SEED")? {
510 seed = Some(self.parse_integer()? as u64);
511 } else if self.consume_ident_ci("STRICT")? {
512 let value = self.expect_ident_or_keyword()?;
513 if value.eq_ignore_ascii_case("ON") {
514 strict = true;
515 } else if value.eq_ignore_ascii_case("OFF") {
516 strict = false;
517 } else {
518 return Err(ParseError::new(
519 "Expected ON or OFF after STRICT",
520 self.position(),
521 ));
522 }
523 } else if self.consume_ident_ci("STREAM")? {
524 stream = true;
525 } else if self.consume_ident_ci("CACHE")? {
526 if !matches!(cache, AskCacheClause::Default) {
527 return Err(ParseError::new(
528 "ASK cache clause specified more than once",
529 self.position(),
530 ));
531 }
532 let ttl = self.expect_ident_or_keyword()?;
533 if !ttl.eq_ignore_ascii_case("TTL") {
534 return Err(ParseError::new("Expected TTL after CACHE", self.position()));
535 }
536 cache = AskCacheClause::CacheTtl(self.parse_string()?);
537 } else if self.consume_ident_ci("NOCACHE")? {
538 if !matches!(cache, AskCacheClause::Default) {
539 return Err(ParseError::new(
540 "ASK cache clause specified more than once",
541 self.position(),
542 ));
543 }
544 cache = AskCacheClause::NoCache;
545 } else {
546 break;
547 }
548 }
549
550 Ok(QueryExpr::Ask(AskQuery {
551 explain,
552 question,
553 question_param,
554 provider,
555 model,
556 depth,
557 limit,
558 min_score,
559 collection,
560 temperature,
561 seed,
562 strict,
563 stream,
564 cache,
565 }))
566 }
567
568 fn parse_ident_list(&mut self) -> Result<Vec<String>, ParseError> {
570 let mut idents = Vec::new();
571 loop {
572 idents.push(self.expect_ident_or_keyword()?);
573 if !self.consume(&Token::Comma)? {
574 break;
575 }
576 }
577 Ok(idents)
578 }
579
580 fn parse_dml_value_list(&mut self) -> Result<Vec<Value>, ParseError> {
582 self.parse_dml_expr_list()?
583 .into_iter()
584 .map(fold_expr_to_value)
585 .collect::<Result<Vec<_>, _>>()
586 .map_err(|msg| ParseError::new(msg, self.position()))
587 }
588
589 fn parse_dml_expr_list(&mut self) -> Result<Vec<Expr>, ParseError> {
590 let mut values = Vec::new();
591 loop {
592 values.push(self.parse_expr()?);
593 if !self.consume(&Token::Comma)? {
594 break;
595 }
596 }
597 Ok(values)
598 }
599
    /// Parses one literal value, bracketing the work with the parser's
    /// `enter_depth` / `exit_depth` calls.
    ///
    /// Nested array and object literals call back into this method, so each
    /// nesting level passes through `enter_depth` (presumably the parser's
    /// recursion guard — confirm at its definition).
    pub(crate) fn parse_literal_value(&mut self) -> Result<Value, ParseError> {
        self.enter_depth()?;
        // Hold the outcome so `exit_depth` runs on both success and failure.
        let result = self.parse_literal_value_inner();
        self.exit_depth();
        result
    }
615
616 fn parse_literal_value_inner(&mut self) -> Result<Value, ParseError> {
617 if let Token::Ident(name) = self.peek().clone() {
624 let upper = name.to_uppercase();
625 if upper == "PASSWORD" || upper == "SECRET" {
626 self.advance()?; self.expect(Token::LParen)?;
628 let plaintext = self.parse_string()?;
629 self.expect(Token::RParen)?;
630 return Ok(match upper.as_str() {
631 "PASSWORD" => Value::Password(format!("@@plain@@{plaintext}")),
632 "SECRET" => Value::Secret(format!("@@plain@@{plaintext}").into_bytes()),
633 _ => unreachable!(),
634 });
635 }
636 if upper == "SECRET_REF" {
637 self.advance()?; self.expect(Token::LParen)?;
639 let store = self.expect_ident_or_keyword()?.to_ascii_lowercase();
640 if store != "vault" {
641 return Err(ParseError::expected(
642 vec!["vault"],
643 self.peek(),
644 self.position(),
645 ));
646 }
647 self.expect(Token::Comma)?;
648 let (collection, key) =
649 self.parse_kv_key(crate::catalog::CollectionModel::Vault)?;
650 self.expect(Token::RParen)?;
651 return Ok(secret_ref_value(&store, &collection, &key));
652 }
653 }
654
655 match self.peek().clone() {
656 Token::String(s) => {
657 let s = s.clone();
658 self.advance()?;
659 Ok(Value::text(s))
660 }
661 Token::JsonLiteral(raw) => {
662 self.advance()?;
667 let json_value = crate::utils::json::parse_json(&raw).map_err(|err| {
668 ParseError::new(
669 format!("invalid JSON object literal: {:?}", err.to_string()),
675 self.position(),
676 )
677 })?;
678 json_literal_depth_check(&json_value)
679 .map_err(|err| ParseError::new(err, self.position()))?;
680 let canonical = crate::serde_json::Value::from(json_value);
681 let bytes = crate::json::to_vec(&canonical).map_err(|err| {
682 ParseError::new(
683 format!("failed to encode JSON literal: {:?}", err.to_string()),
687 self.position(),
688 )
689 })?;
690 Ok(Value::Json(bytes))
691 }
692 Token::Integer(n) => {
693 self.advance()?;
694 Ok(Value::Integer(n))
695 }
696 Token::Float(n) => {
697 self.advance()?;
698 Ok(Value::Float(n))
699 }
700 Token::True => {
701 self.advance()?;
702 Ok(Value::Boolean(true))
703 }
704 Token::False => {
705 self.advance()?;
706 Ok(Value::Boolean(false))
707 }
708 Token::Null => {
709 self.advance()?;
710 Ok(Value::Null)
711 }
712 Token::LBracket => {
713 self.advance()?; let mut items = Vec::new();
717 if !self.check(&Token::RBracket) {
718 loop {
719 items.push(self.parse_literal_value()?);
720 if !self.consume(&Token::Comma)? {
721 break;
722 }
723 }
724 }
725 self.expect(Token::RBracket)?;
726
727 let all_numeric = items
729 .iter()
730 .all(|v| matches!(v, Value::Integer(_) | Value::Float(_)));
731 if all_numeric && !items.is_empty() {
732 let floats: Vec<f32> = items
733 .iter()
734 .map(|v| match v {
735 Value::Float(f) => *f as f32,
736 Value::Integer(i) => *i as f32,
737 _ => 0.0,
738 })
739 .collect();
740 Ok(Value::Vector(floats))
741 } else {
742 let json_arr: Vec<crate::json::Value> = items
744 .iter()
745 .map(|v| match v {
746 Value::Null => crate::json::Value::Null,
747 Value::Boolean(b) => crate::json::Value::Bool(*b),
748 Value::Integer(i) => crate::json::Value::Number(*i as f64),
749 Value::Float(f) => crate::json::Value::Number(*f),
750 Value::Text(s) => crate::json::Value::String(s.to_string()),
751 _ => crate::json::Value::Null,
752 })
753 .collect();
754 let json_val = crate::json::Value::Array(json_arr);
755 let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
756 Ok(Value::Json(bytes))
757 }
758 }
759 Token::LBrace => {
760 self.advance()?; let mut map = crate::json::Map::new();
763 if !self.check(&Token::RBrace) {
764 loop {
765 let key = match self.peek().clone() {
772 Token::String(s) => {
773 self.advance()?;
774 s
775 }
776 Token::Ident(s) => {
777 self.advance()?;
778 s
779 }
780 _ => self.expect_ident_or_keyword()?.to_ascii_lowercase(),
781 };
782 if !self.consume(&Token::Colon)? {
784 self.expect(Token::Eq)?;
785 }
786 let val = self.parse_literal_value()?;
788 let json_val = match val {
789 Value::Null => crate::json::Value::Null,
790 Value::Boolean(b) => crate::json::Value::Bool(b),
791 Value::Integer(i) => crate::json::Value::Number(i as f64),
792 Value::Float(f) => crate::json::Value::Number(f),
793 Value::Text(s) => crate::json::Value::String(s.to_string()),
794 Value::Json(ref bytes) => {
795 crate::json::from_slice(bytes).unwrap_or(crate::json::Value::Null)
796 }
797 _ => crate::json::Value::Null,
798 };
799 map.insert(key, json_val);
800 if !self.consume(&Token::Comma)? {
801 break;
802 }
803 }
804 }
805 self.expect(Token::RBrace)?;
806 let json_val = crate::json::Value::Object(map);
807 let bytes = crate::json::to_vec(&json_val).unwrap_or_default();
808 Ok(Value::Json(bytes))
809 }
810 ref other => Err(ParseError::expected(
811 vec!["string", "number", "true", "false", "null", "[", "{"],
812 other,
813 self.position(),
814 )),
815 }
816 }
817}
818
819fn secret_ref_value(store: &str, collection: &str, key: &str) -> Value {
820 let mut map = crate::json::Map::new();
821 map.insert(
822 "type".to_string(),
823 crate::json::Value::String("secret_ref".to_string()),
824 );
825 map.insert(
826 "store".to_string(),
827 crate::json::Value::String(store.to_string()),
828 );
829 map.insert(
830 "collection".to_string(),
831 crate::json::Value::String(collection.to_string()),
832 );
833 map.insert(
834 "key".to_string(),
835 crate::json::Value::String(key.to_string()),
836 );
837 Value::Json(crate::json::to_vec(&crate::json::Value::Object(map)).unwrap_or_default())
838}