use super::error::ParseError;
use super::Parser;
use crate::ast::{
AskCacheClause, AskQuery, BinOp, DeleteQuery, Expr, FieldRef, Filter, InsertEntityType,
InsertQuery, OrderByClause, QueryExpr, ReturningItem, UpdateQuery, UpdateTarget,
};
use crate::lexer::Token;
use crate::sql_lowering::{filter_to_expr, fold_expr_to_value};
use reddb_types::types::Value;
pub(crate) use crate::limits::JSON_LITERAL_MAX_DEPTH;
/// Verifies that a parsed JSON literal nests no deeper than
/// `JSON_LITERAL_MAX_DEPTH`.
///
/// The root value counts as depth 1; every object member or array element is
/// one level below its container. The walk uses an explicit work list rather
/// than recursion, so the call stack does not grow with the nesting of the
/// input.
///
/// # Errors
///
/// Returns a message naming the limit as soon as a node beyond it is visited.
pub(crate) fn json_literal_depth_check(
    value: &reddb_types::utils::json::JsonValue,
) -> Result<(), String> {
    use reddb_types::utils::json::JsonValue;

    let mut pending: Vec<(&JsonValue, u32)> = Vec::new();
    pending.push((value, 1));
    loop {
        // An empty work list means every node was within the limit.
        let Some((current, level)) = pending.pop() else {
            return Ok(());
        };
        if level > JSON_LITERAL_MAX_DEPTH {
            return Err(format!(
                "JSON object literal exceeds JSON_LITERAL_MAX_DEPTH ({})",
                JSON_LITERAL_MAX_DEPTH
            ));
        }
        match current {
            JsonValue::Object(members) => {
                for (_, child) in members {
                    pending.push((child, level + 1));
                }
            }
            JsonValue::Array(elements) => {
                for child in elements {
                    pending.push((child, level + 1));
                }
            }
            // Scalars have no children to schedule.
            _ => {}
        }
    }
}
impl<'a> Parser<'a> {
/// Parses `INSERT INTO <table> [NODE|EDGE|VECTOR|DOCUMENT|KV] (cols) VALUES (row)[, (row)...]`
/// followed by optional `WITH ...` clauses, `RETURNING`, and `SUPPRESS EVENTS`.
///
/// Every row is recorded twice: as the raw expressions (`value_exprs`) and as
/// folded constants (`values`). An expression that cannot be folded but
/// contains a parameter placeholder is stored as `Value::Null` in `values`;
/// any other folding failure becomes a parse error.
///
/// # Errors
///
/// Fails on `INSERT INTO METRIC`, on any missing or unexpected token, and on
/// a non-parameterised value expression that cannot be folded to a constant.
pub fn parse_insert_query(&mut self) -> Result<QueryExpr, ParseError> {
    self.expect(Token::Insert)?;
    self.expect(Token::Into)?;

    let targets_metric = matches!(self.peek(), Token::Metric);
    if targets_metric {
        return Err(ParseError::new(
            "INSERT INTO METRIC is not supported in Analytics v0 — \
             write raw samples into an ordinary TABLE/DOCUMENT \
             collection; the metric descriptor catalog is reached \
             via CREATE METRIC and red.analytics.metrics \
             (PRD #782 non-goal)",
            self.position(),
        ));
    }

    let table = self.expect_ident()?;

    // An entity keyword right after the table name is optional; without one
    // the statement inserts plain rows.
    let explicit_type = match self.peek() {
        Token::Node => Some(InsertEntityType::Node),
        Token::Edge => Some(InsertEntityType::Edge),
        Token::Vector => Some(InsertEntityType::Vector),
        Token::Document => Some(InsertEntityType::Document),
        Token::Kv => Some(InsertEntityType::Kv),
        _ => None,
    };
    let entity_type = match explicit_type {
        Some(kind) => {
            self.advance()?;
            kind
        }
        None => InsertEntityType::Row,
    };

    self.expect(Token::LParen)?;
    let columns = self.parse_ident_list()?;
    self.expect(Token::RParen)?;
    self.expect(Token::Values)?;

    let mut all_value_exprs = Vec::new();
    let mut all_values = Vec::new();
    loop {
        self.expect(Token::LParen)?;
        let row_exprs = self.parse_dml_expr_list()?;
        self.expect(Token::RParen)?;

        let mut row_values = Vec::with_capacity(row_exprs.len());
        for expr in &row_exprs {
            match fold_expr_to_value(expr.clone()) {
                Ok(value) => row_values.push(value),
                // Parameterised expressions cannot be folded at parse time;
                // a null placeholder keeps `values` aligned with the columns.
                Err(_) if crate::sql_lowering::expr_contains_parameter(expr) => {
                    row_values.push(Value::Null);
                }
                Err(msg) => return Err(ParseError::new(msg, self.position())),
            }
        }

        all_value_exprs.push(row_exprs);
        all_values.push(row_values);
        if !self.consume(&Token::Comma)? {
            break;
        }
    }

    let (ttl_ms, expires_at_ms, with_metadata, auto_embed) = self.parse_with_clauses()?;
    let returning = self.parse_returning_clause()?;

    // `SUPPRESS` must be followed by `EVENTS`.
    let suppress_events = self.consume_ident_ci("SUPPRESS")?;
    if suppress_events {
        self.expect_ident_ci("EVENTS")?;
    }

    Ok(QueryExpr::Insert(InsertQuery {
        table,
        entity_type,
        columns,
        value_exprs: all_value_exprs,
        values: all_values,
        returning,
        ttl_ms,
        expires_at_ms,
        with_metadata,
        auto_embed,
        suppress_events,
    }))
}
/// Parses the duration after `WITH TTL` and returns it in milliseconds.
///
/// The amount is read with `parse_float`. If the next token is an
/// identifier it is consumed as the unit (matched case-insensitively);
/// otherwise the unit defaults to seconds. The product is converted with an
/// `as u64` cast, so any fractional millisecond is dropped.
///
/// # Errors
///
/// Fails when the amount is not a number or the unit is not one of the
/// recognised millisecond/second/minute/hour/day spellings.
fn parse_ttl_duration(&mut self) -> Result<u64, ParseError> {
    let amount = self.parse_float()?;

    let unit = if let Token::Ident(text) = self.peek() {
        let text = text.clone();
        self.advance()?;
        text
    } else {
        "s".to_string()
    };

    let normalized = unit.to_ascii_lowercase();
    let ms_per_unit = match normalized.as_str() {
        "ms" | "msec" | "millisecond" | "milliseconds" => 1.0,
        "s" | "sec" | "secs" | "second" | "seconds" => 1_000.0,
        "m" | "min" | "mins" | "minute" | "minutes" => 60_000.0,
        "h" | "hr" | "hrs" | "hour" | "hours" => 3_600_000.0,
        "d" | "day" | "days" => 86_400_000.0,
        other => {
            let message = format!(
                "unsupported TTL unit {other:?}; supported units: ms, s, m, h, d (e.g. `WITH TTL 30 m`)"
            );
            return Err(ParseError::new(message, self.position()));
        }
    };

    let total_ms = amount * ms_per_unit;
    Ok(total_ms as u64)
}
/// Parses zero or more trailing `WITH ...` clauses.
///
/// Returns `(ttl_ms, expires_at_ms, with_metadata, auto_embed)`. Recognised
/// forms after each `WITH` keyword:
///
/// * `TTL <amount> [unit]`
/// * `EXPIRES AT <timestamp>`
/// * `METADATA (key = value, ...)`
/// * `AUTO EMBED (field, ...) [USING provider] [MODEL 'name']`
///
/// A clause that appears more than once overwrites the earlier occurrence.
///
/// # Errors
///
/// Fails when `WITH` is followed by anything other than the forms above, or
/// when a clause body is malformed.
pub fn parse_with_clauses(
    &mut self,
) -> Result<
    (
        Option<u64>,
        Option<u64>,
        Vec<(String, Value)>,
        Option<crate::ast::AutoEmbedConfig>,
    ),
    ParseError,
> {
    let mut ttl_ms = None;
    let mut expires_at_ms = None;
    let mut with_metadata = Vec::new();
    let mut auto_embed = None;

    while self.consume(&Token::With)? {
        if self.consume_ident_ci("TTL")? {
            ttl_ms = Some(self.parse_ttl_duration()?);
            continue;
        }

        if self.consume_ident_ci("EXPIRES")? {
            self.expect_ident_ci("AT")?;
            expires_at_ms = Some(self.parse_expires_at_value()?);
            continue;
        }

        // METADATA is accepted both as a keyword token and as an identifier.
        if self.consume(&Token::Metadata)? || self.consume_ident_ci("METADATA")? {
            with_metadata = self.parse_with_metadata_pairs()?;
            continue;
        }

        if !self.consume_ident_ci("AUTO")? {
            return Err(ParseError::expected(
                vec!["TTL", "EXPIRES AT", "METADATA", "AUTO EMBED"],
                self.peek(),
                self.position(),
            ));
        }

        // NOTE(review): the result is discarded, so `EMBED` is effectively
        // optional after `AUTO` — confirm whether that leniency is intended.
        self.consume_ident_ci("EMBED")?;

        self.expect(Token::LParen)?;
        let mut fields = vec![self.expect_ident()?];
        while self.consume(&Token::Comma)? {
            fields.push(self.expect_ident()?);
        }
        self.expect(Token::RParen)?;

        // Without an explicit `USING`, the provider falls back to "openai".
        let has_provider = self.consume(&Token::Using)?;
        let provider = if has_provider {
            self.expect_ident()?
        } else {
            "openai".to_string()
        };

        let has_model = self.consume_ident_ci("MODEL")?;
        let model = if has_model {
            Some(self.parse_string()?)
        } else {
            None
        };

        auto_embed = Some(crate::ast::AutoEmbedConfig {
            fields,
            provider,
            model,
        });
    }

    Ok((ttl_ms, expires_at_ms, with_metadata, auto_embed))
}
/// Consumes the identifier `expected` (matched via `consume_ident_ci`) or
/// reports it as the expected token.
///
/// # Errors
///
/// Returns an "expected" error describing the current token and position
/// when the identifier is not present.
fn expect_ident_ci(&mut self, expected: &str) -> Result<(), ParseError> {
    let matched = self.consume_ident_ci(expected)?;
    if !matched {
        return Err(ParseError::expected(
            vec![expected],
            self.peek(),
            self.position(),
        ));
    }
    Ok(())
}
/// Parses the operand of `WITH EXPIRES AT` as a unix timestamp in
/// milliseconds.
///
/// Two spellings are accepted: an integer literal, or a string literal whose
/// trimmed content parses as an unsigned integer.
///
/// # Errors
///
/// Fails when the integer is negative, when the string is not a
/// non-negative integer, or when the next token is neither an integer nor a
/// string.
fn parse_expires_at_value(&mut self) -> Result<u64, ParseError> {
    if let Ok(value) = self.parse_integer() {
        // A plain `as u64` cast would wrap a negative value into a timestamp
        // in the far future, so the conversion is checked instead.
        return u64::try_from(value).map_err(|_| {
            ParseError::new(
                format!(
                    "EXPIRES AT requires a non-negative unix timestamp in milliseconds, got {value}"
                ),
                self.position(),
            )
        });
    }
    if let Ok(text) = self.parse_string() {
        let trimmed = text.trim();
        if let Ok(ts) = trimmed.parse::<u64>() {
            return Ok(ts);
        }
        return Err(ParseError::new(
            format!("EXPIRES AT requires a unix timestamp in milliseconds, got {trimmed:?}"),
            self.position(),
        ));
    }
    // Date strings are rejected by the branch above, so the hint names only
    // the form that is actually accepted.
    Err(ParseError::expected(
        vec!["unix timestamp in milliseconds"],
        self.peek(),
        self.position(),
    ))
}
/// Parses the parenthesised `key = value` list of a `WITH METADATA` clause.
///
/// Keys may be identifiers or keywords and are lower-cased; values are read
/// with `parse_literal_value`. An empty list `()` yields no pairs.
///
/// # Errors
///
/// Fails on a missing parenthesis, a missing `=`, or an invalid literal.
fn parse_with_metadata_pairs(&mut self) -> Result<Vec<(String, Value)>, ParseError> {
    self.expect(Token::LParen)?;

    let mut pairs = Vec::new();
    let mut has_entry = !self.check(&Token::RParen);
    while has_entry {
        let key = self.expect_ident_or_keyword()?.to_ascii_lowercase();
        self.expect(Token::Eq)?;
        let value = self.parse_literal_value()?;
        pairs.push((key, value));
        // A comma announces another pair; anything else ends the list.
        has_entry = self.consume(&Token::Comma)?;
    }

    self.expect(Token::RParen)?;
    Ok(pairs)
}
/// Parses `UPDATE <table> [target] SET <assignments> [WHERE ...] [WITH ...]
/// [ORDER BY ... LIMIT n] [RETURNING ...] [SUPPRESS EVENTS]`.
///
/// Assignments accept `=` and the compound forms `+=`, `-=`, `*=`, `/=`,
/// `%=`. Every assignment is recorded in `assignment_exprs` together with
/// its entry in `compound_assignment_ops`; `assignments` additionally holds
/// the folded constant for plain `=` assignments whose right-hand side folds
/// to a value.
///
/// `ORDER BY` is only accepted together with `LIMIT`, is restricted to
/// top-level fields, and gets an ascending `rid` clause appended when it
/// does not already mention `rid`.
///
/// # Errors
///
/// Fails on malformed syntax, an unknown assignment operator, an invalid
/// `ORDER BY` field, or `ORDER BY` without `LIMIT`.
pub fn parse_update_query(&mut self) -> Result<QueryExpr, ParseError> {
    self.expect(Token::Update)?;
    let table = self.expect_ident()?;
    let target = self.parse_update_target()?;
    self.expect(Token::Set)?;

    let mut assignment_exprs = Vec::new();
    let mut compound_assignment_ops = Vec::new();
    let mut assignments = Vec::new();
    loop {
        let column = self.expect_column_ident()?;

        let compound_op = if self.consume(&Token::Eq)? {
            None
        } else {
            // Not a plain `=`: the operator token must be followed by `=`.
            let detected = match self.peek() {
                Token::Plus => Some(BinOp::Add),
                Token::Dash | Token::Minus => Some(BinOp::Sub),
                Token::Star => Some(BinOp::Mul),
                Token::Slash => Some(BinOp::Div),
                Token::Percent => Some(BinOp::Mod),
                _ => None,
            };
            let Some(op) = detected else {
                return Err(ParseError::expected(
                    vec!["=", "+=", "-=", "*=", "/=", "%="],
                    self.peek(),
                    self.position(),
                ));
            };
            self.advance()?;
            self.expect(Token::Eq)?;
            Some(op)
        };

        let expr = self.parse_expr()?;
        let folded = fold_expr_to_value(expr.clone()).ok();
        // Only plain assignments with a constant right-hand side are
        // mirrored into `assignments`.
        if let (None, Some(value)) = (compound_op, folded) {
            assignments.push((column.clone(), value));
        }
        assignment_exprs.push((column, expr));
        compound_assignment_ops.push(compound_op);

        if !self.consume(&Token::Comma)? {
            break;
        }
    }

    let has_where = self.consume(&Token::Where)?;
    let filter = if has_where {
        Some(self.parse_filter()?)
    } else {
        None
    };
    let where_expr = filter.as_ref().map(filter_to_expr);

    // The auto-embed component of the WITH clauses is not used by UPDATE.
    let (ttl_ms, expires_at_ms, with_metadata, _auto_embed) = self.parse_with_clauses()?;

    let mut order_by = Vec::new();
    if self.consume(&Token::Order)? {
        self.expect(Token::By)?;
        order_by = self.parse_order_by_list()?;
        validate_update_order_by(&order_by, self.position())?;
    }

    let has_limit = self.consume(&Token::Limit)?;
    let limit = if has_limit {
        Some(self.parse_integer()? as u64)
    } else {
        None
    };

    if !order_by.is_empty() {
        if limit.is_none() {
            return Err(ParseError::new(
                "UPDATE ORDER BY requires LIMIT",
                self.position(),
            ));
        }
        if !update_order_by_mentions_rid(&order_by) {
            // Append `rid` as the last ordering key when the statement did
            // not name it itself.
            order_by.push(OrderByClause {
                field: FieldRef::TableColumn {
                    table: String::new(),
                    column: "rid".to_string(),
                },
                expr: None,
                ascending: true,
                nulls_first: false,
            });
        }
    }

    let returning = self.parse_returning_clause()?;

    let suppress_events = self.consume_ident_ci("SUPPRESS")?;
    if suppress_events {
        self.expect_ident_ci("EVENTS")?;
    }

    Ok(QueryExpr::Update(UpdateQuery {
        table,
        target,
        assignment_exprs,
        compound_assignment_ops,
        assignments,
        where_expr,
        filter,
        ttl_ms,
        expires_at_ms,
        with_metadata,
        returning,
        order_by,
        limit,
        suppress_events,
    }))
}
/// Parses the optional target keyword that may follow the table name in
/// `UPDATE`: `KV`, `ROWS`, `DOCUMENTS`, `NODES`, or `EDGES`.
///
/// When none of them is present nothing is consumed and the target is
/// `UpdateTarget::Rows`.
fn parse_update_target(&mut self) -> Result<UpdateTarget, ParseError> {
    // `KV` and `ROWS` are dedicated tokens; the remaining targets are
    // matched as case-insensitive identifiers.
    let target = if self.consume(&Token::Kv)? {
        UpdateTarget::Kv
    } else if self.consume(&Token::Rows)? {
        UpdateTarget::Rows
    } else if self.consume_ident_ci("DOCUMENTS")? {
        UpdateTarget::Documents
    } else if self.consume_ident_ci("NODES")? {
        UpdateTarget::Nodes
    } else if self.consume_ident_ci("EDGES")? {
        UpdateTarget::Edges
    } else {
        UpdateTarget::Rows
    };
    Ok(target)
}
/// Parses `DELETE FROM <table> [WHERE ...] [RETURNING ...] [SUPPRESS EVENTS]`.
///
/// The `WHERE` clause is stored both as the parsed filter and as the
/// expression produced from it by `filter_to_expr`.
///
/// # Errors
///
/// Fails on any missing keyword or malformed clause.
pub fn parse_delete_query(&mut self) -> Result<QueryExpr, ParseError> {
    self.expect(Token::Delete)?;
    self.expect(Token::From)?;
    let table = self.expect_ident()?;

    let has_where = self.consume(&Token::Where)?;
    let filter = if has_where {
        Some(self.parse_filter()?)
    } else {
        None
    };
    let where_expr = filter.as_ref().map(filter_to_expr);

    let returning = self.parse_returning_clause()?;

    // `SUPPRESS` must be followed by `EVENTS`.
    let suppress_events = self.consume_ident_ci("SUPPRESS")?;
    if suppress_events {
        self.expect_ident_ci("EVENTS")?;
    }

    Ok(QueryExpr::Delete(DeleteQuery {
        table,
        where_expr,
        filter,
        returning,
        suppress_events,
    }))
}
/// Parses an optional `RETURNING` clause.
///
/// Returns `None` when the keyword is absent, `[ReturningItem::All]` for
/// `RETURNING *`, and otherwise the comma-separated list of column names.
///
/// # Errors
///
/// Tokens that would begin or continue an expression are rejected with the
/// "RETURNING expressions are not supported" error; anything else that is
/// not a column name produces the usual "expected" error.
fn parse_returning_clause(&mut self) -> Result<Option<Vec<ReturningItem>>, ParseError> {
    if !self.consume(&Token::Returning)? {
        return Ok(None);
    }
    if self.consume(&Token::Star)? {
        return Ok(Some(vec![ReturningItem::All]));
    }

    let mut items = Vec::new();
    let mut expect_more = true;
    while expect_more {
        // Reject literals, parentheses, and placeholders up front.
        if returning_expr_start(self.peek()) {
            return Err(returning_expr_not_supported(self.position()));
        }
        let column = self.expect_update_returning_column()?;
        items.push(ReturningItem::Column(column));
        // A column followed by an operator or call syntax is an expression.
        if returning_expr_tail(self.peek()) {
            return Err(returning_expr_not_supported(self.position()));
        }
        expect_more = self.consume(&Token::Comma)?;
    }

    // Defensive guard: the loop above always records at least one column
    // before it can finish.
    if items.is_empty() {
        return Err(ParseError::expected(
            vec!["*", "column name"],
            self.peek(),
            self.position(),
        ));
    }
    Ok(Some(items))
}
/// Reads one column name for a `RETURNING` list.
///
/// `WEIGHT` is a dedicated token, so it is mapped to the column name
/// `"weight"` explicitly; every other name goes through
/// `expect_ident_or_keyword`.
fn expect_update_returning_column(&mut self) -> Result<String, ParseError> {
    let is_weight = self.consume(&Token::Weight)?;
    if is_weight {
        Ok("weight".to_string())
    } else {
        self.expect_ident_or_keyword()
    }
}
/// Parses an `ASK` statement with the `explain` flag cleared.
///
/// Delegates to `parse_ask_query_with_explain(false)`, which consumes the
/// current token before reading the question.
pub fn parse_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
self.parse_ask_query_with_explain(false)
}
/// Parses `EXPLAIN ASK ...` and returns the query with `explain` set.
///
/// The current token is skipped unconditionally (presumably the `EXPLAIN`
/// keyword — the caller is expected to have checked it). The following token
/// must be the identifier `ASK`, matched case-insensitively.
///
/// # Errors
///
/// Returns an "expected ASK" error when the second token is anything else.
pub fn parse_explain_ask_query(&mut self) -> Result<QueryExpr, ParseError> {
    self.advance()?;

    let followed_by_ask = match self.peek() {
        Token::Ident(word) => word.eq_ignore_ascii_case("ASK"),
        _ => false,
    };
    if followed_by_ask {
        return self.parse_ask_query_with_explain(true);
    }

    Err(ParseError::expected(
        vec!["ASK"],
        self.peek(),
        self.position(),
    ))
}
/// Shared implementation behind `parse_ask_query` and
/// `parse_explain_ask_query`.
///
/// Skips the current token (presumably the `ASK` keyword; it is not checked
/// here), reads the question as either a string literal or a parameter
/// placeholder (`$N` / `?`), then reads optional clauses in any order:
/// `USING`, `MODEL`, `DEPTH`, `LIMIT`, `MIN_SCORE`, `COLLECTION`,
/// `TEMPERATURE`, `SEED`, `STRICT ON|OFF`, `STREAM`, `CACHE TTL '<dur>'`,
/// `NOCACHE`, `AS RQL`, and `EXECUTE`.
///
/// Defaults when a clause is absent: `strict = true`, `stream = false`,
/// `cache = AskCacheClause::Default`, `as_rql = false`, `execute = false`,
/// and `None` for every optional value.
///
/// # Errors
///
/// Fails when the question is neither a string nor a placeholder, when a
/// clause operand is malformed, when `STRICT` is not followed by `ON`/`OFF`,
/// when `CACHE` is not followed by `TTL`, when `AS` is not followed by
/// `RQL`, or when a cache clause, `AS RQL`, or `EXECUTE` is repeated.
fn parse_ask_query_with_explain(&mut self, explain: bool) -> Result<QueryExpr, ParseError> {
// Skip the leading keyword token.
self.advance()?;
// A parameterised question leaves `question` empty and records the slot.
let (question, question_param) = match self.peek() {
Token::String(_) => (self.parse_string()?, None),
Token::Dollar | Token::Question => {
let index = self.parse_param_slot("ASK question")?;
(String::new(), Some(index))
}
other => {
return Err(ParseError::expected(
vec!["string", "$N", "?"],
other,
self.position(),
));
}
};
let mut provider = None;
let mut model = None;
let mut depth = None;
let mut limit = None;
let mut min_score = None;
let mut collection = None;
let mut temperature = None;
let mut seed = None;
let mut strict = true;
let mut stream = false;
let mut cache = AskCacheClause::Default;
let mut as_rql = false;
let mut execute = false;
// At most 14 clauses are read; the loop also stops at the first token that
// starts no known clause, leaving that token for the caller.
for _ in 0..14 {
if self.consume(&Token::Using)? {
// The provider may be written as a string literal or a bare identifier.
provider = Some(match &self.current.token {
Token::String(_) => self.parse_string()?,
_ => self.expect_ident()?,
});
} else if self.consume_ident_ci("MODEL")? {
model = Some(self.parse_string()?);
} else if self.consume(&Token::Depth)? {
depth = Some(self.parse_integer()? as usize);
} else if self.consume(&Token::Limit)? {
limit = Some(self.parse_integer()? as usize);
} else if self.consume(&Token::MinScore)? {
min_score = Some(self.parse_float()? as f32);
} else if self.consume(&Token::Collection)? {
collection = Some(self.expect_ident()?);
} else if self.consume_ident_ci("TEMPERATURE")? {
temperature = Some(self.parse_float()? as f32);
} else if self.consume_ident_ci("SEED")? {
seed = Some(self.parse_integer()? as u64);
} else if self.consume_ident_ci("STRICT")? {
let value = self.expect_ident_or_keyword()?;
if value.eq_ignore_ascii_case("ON") {
strict = true;
} else if value.eq_ignore_ascii_case("OFF") {
strict = false;
} else {
return Err(ParseError::new(
"Expected ON or OFF after STRICT",
self.position(),
));
}
} else if self.consume_ident_ci("STREAM")? {
stream = true;
} else if self.consume_ident_ci("CACHE")? {
// `CACHE TTL` and `NOCACHE` are mutually exclusive and may appear once.
if !matches!(cache, AskCacheClause::Default) {
return Err(ParseError::new(
"ASK cache clause specified more than once",
self.position(),
));
}
let ttl = self.expect_ident_or_keyword()?;
if !ttl.eq_ignore_ascii_case("TTL") {
return Err(ParseError::new("Expected TTL after CACHE", self.position()));
}
// The TTL is kept as the raw string; it is not interpreted here.
cache = AskCacheClause::CacheTtl(self.parse_string()?);
} else if self.consume_ident_ci("NOCACHE")? {
if !matches!(cache, AskCacheClause::Default) {
return Err(ParseError::new(
"ASK cache clause specified more than once",
self.position(),
));
}
cache = AskCacheClause::NoCache;
} else if self.consume(&Token::As)? {
if as_rql {
return Err(ParseError::new(
"ASK AS RQL specified more than once",
self.position(),
));
}
let output = self.expect_ident_or_keyword()?;
if !output.eq_ignore_ascii_case("RQL") {
return Err(ParseError::new(
"Expected RQL after ASK AS",
self.position(),
));
}
as_rql = true;
} else if self.consume_ident_ci("EXECUTE")? {
if execute {
return Err(ParseError::new(
"ASK EXECUTE specified more than once",
self.position(),
));
}
execute = true;
} else {
break;
}
}
Ok(QueryExpr::Ask(AskQuery {
explain,
question,
question_param,
provider,
model,
depth,
limit,
min_score,
collection,
temperature,
seed,
strict,
stream,
cache,
as_rql,
execute,
}))
}
/// Parses a non-empty, comma-separated list of names.
///
/// Each name is read with `expect_ident_or_keyword`, so keywords are
/// accepted as column names.
fn parse_ident_list(&mut self) -> Result<Vec<String>, ParseError> {
    let mut idents = vec![self.expect_ident_or_keyword()?];
    while self.consume(&Token::Comma)? {
        idents.push(self.expect_ident_or_keyword()?);
    }
    Ok(idents)
}
fn parse_dml_value_list(&mut self) -> Result<Vec<Value>, ParseError> {
self.parse_dml_expr_list()?
.into_iter()
.map(fold_expr_to_value)
.collect::<Result<Vec<_>, _>>()
.map_err(|msg| ParseError::new(msg, self.position()))
}
/// Parses a non-empty, comma-separated list of expressions with `parse_expr`.
fn parse_dml_expr_list(&mut self) -> Result<Vec<Expr>, ParseError> {
    let mut exprs = vec![self.parse_expr()?];
    while self.consume(&Token::Comma)? {
        exprs.push(self.parse_expr()?);
    }
    Ok(exprs)
}
/// Parses one literal value, bracketing the work with the parser's depth
/// bookkeeping.
///
/// `enter_depth` runs first and its error, if any, is returned before any
/// parsing happens. `exit_depth` then runs whether or not the inner parse
/// succeeded, so the two calls stay balanced on error paths.
/// NOTE(review): presumably `enter_depth` enforces a nesting limit for
/// recursive literals — confirm against its definition.
pub(crate) fn parse_literal_value(&mut self) -> Result<Value, ParseError> {
self.enter_depth()?;
// Keep the result unexamined until the depth counter has been restored.
let result = self.parse_literal_value_inner();
self.exit_depth();
result
}
fn parse_literal_value_inner(&mut self) -> Result<Value, ParseError> {
if let Token::Ident(name) = self.peek().clone() {
let upper = name.to_uppercase();
if upper == "PASSWORD" || upper == "SECRET" {
self.advance()?; self.expect(Token::LParen)?;
let plaintext = self.parse_string()?;
self.expect(Token::RParen)?;
return Ok(match upper.as_str() {
"PASSWORD" => Value::Password(format!("@@plain@@{plaintext}")),
"SECRET" => Value::Secret(format!("@@plain@@{plaintext}").into_bytes()),
_ => unreachable!(),
});
}
if upper == "SECRET_REF" {
self.advance()?; self.expect(Token::LParen)?;
let store = self.expect_ident_or_keyword()?.to_ascii_lowercase();
if store != "vault" {
return Err(ParseError::expected(
vec!["vault"],
self.peek(),
self.position(),
));
}
self.expect(Token::Comma)?;
let (collection, key) =
self.parse_kv_key(reddb_types::catalog::CollectionModel::Vault)?;
self.expect(Token::RParen)?;
return Ok(secret_ref_value(&store, &collection, &key));
}
}
match self.peek().clone() {
Token::String(s) => {
let s = s.clone();
self.advance()?;
Ok(Value::text(s))
}
Token::JsonLiteral(raw) => {
self.advance()?;
let json_value = reddb_types::utils::json::parse_json(&raw).map_err(|err| {
ParseError::new(
format!("invalid JSON object literal: {:?}", err.to_string()),
self.position(),
)
})?;
json_literal_depth_check(&json_value)
.map_err(|err| ParseError::new(err, self.position()))?;
let canonical = reddb_types::serde_json::Value::from(json_value);
let bytes = reddb_types::json::to_vec(&canonical).map_err(|err| {
ParseError::new(
format!("failed to encode JSON literal: {:?}", err.to_string()),
self.position(),
)
})?;
Ok(Value::Json(bytes))
}
Token::Integer(n) => {
self.advance()?;
Ok(Value::Integer(n))
}
Token::Float(n) => {
self.advance()?;
Ok(Value::Float(n))
}
Token::True => {
self.advance()?;
Ok(Value::Boolean(true))
}
Token::False => {
self.advance()?;
Ok(Value::Boolean(false))
}
Token::Null => {
self.advance()?;
Ok(Value::Null)
}
Token::LBracket => {
self.advance()?; let mut items = Vec::new();
if !self.check(&Token::RBracket) {
loop {
items.push(self.parse_literal_value()?);
if !self.consume(&Token::Comma)? {
break;
}
}
}
self.expect(Token::RBracket)?;
let all_numeric = items
.iter()
.all(|v| matches!(v, Value::Integer(_) | Value::Float(_)));
if all_numeric && !items.is_empty() {
let floats: Vec<f32> = items
.iter()
.map(|v| match v {
Value::Float(f) => *f as f32,
Value::Integer(i) => *i as f32,
_ => 0.0,
})
.collect();
Ok(Value::Vector(floats))
} else {
let json_arr: Vec<reddb_types::json::Value> = items
.iter()
.map(|v| match v {
Value::Null => reddb_types::json::Value::Null,
Value::Boolean(b) => reddb_types::json::Value::Bool(*b),
Value::Integer(i) => reddb_types::json::Value::Number(*i as f64),
Value::Float(f) => reddb_types::json::Value::Number(*f),
Value::Text(s) => reddb_types::json::Value::String(s.to_string()),
_ => reddb_types::json::Value::Null,
})
.collect();
let json_val = reddb_types::json::Value::Array(json_arr);
let bytes = reddb_types::json::to_vec(&json_val).unwrap_or_default();
Ok(Value::Json(bytes))
}
}
Token::LBrace => {
self.advance()?; let mut map = reddb_types::json::Map::new();
if !self.check(&Token::RBrace) {
loop {
let key = match self.peek().clone() {
Token::String(s) => {
self.advance()?;
s
}
Token::Ident(s) => {
self.advance()?;
s
}
_ => self.expect_ident_or_keyword()?.to_ascii_lowercase(),
};
if !self.consume(&Token::Colon)? {
self.expect(Token::Eq)?;
}
let val = self.parse_literal_value()?;
let json_val = match val {
Value::Null => reddb_types::json::Value::Null,
Value::Boolean(b) => reddb_types::json::Value::Bool(b),
Value::Integer(i) => reddb_types::json::Value::Number(i as f64),
Value::Float(f) => reddb_types::json::Value::Number(f),
Value::Text(s) => reddb_types::json::Value::String(s.to_string()),
Value::Json(ref bytes) => reddb_types::json::from_slice(bytes)
.unwrap_or(reddb_types::json::Value::Null),
_ => reddb_types::json::Value::Null,
};
map.insert(key, json_val);
if !self.consume(&Token::Comma)? {
break;
}
}
}
self.expect(Token::RBrace)?;
let json_val = reddb_types::json::Value::Object(map);
let bytes = reddb_types::json::to_vec(&json_val).unwrap_or_default();
Ok(Value::Json(bytes))
}
ref other => Err(ParseError::expected(
vec!["string", "number", "true", "false", "null", "[", "{"],
other,
self.position(),
)),
}
}
}
/// Reports whether `token` can only begin an expression (never a plain
/// column name) in a `RETURNING` list.
fn returning_expr_start(token: &Token) -> bool {
    let is_literal = matches!(
        token,
        Token::Integer(_)
            | Token::Float(_)
            | Token::String(_)
            | Token::JsonLiteral(_)
            | Token::Null
            | Token::True
            | Token::False
    );
    // Grouping, unary minus, and parameter placeholders.
    let is_prefix_or_placeholder = matches!(
        token,
        Token::LParen | Token::Minus | Token::Question | Token::Dollar
    );
    is_literal || is_prefix_or_placeholder
}
/// Reports whether `token`, seen right after a column name in a `RETURNING`
/// list, shows that the item is an expression rather than a bare column.
fn returning_expr_tail(token: &Token) -> bool {
    let is_call = matches!(token, Token::LParen);
    let is_arithmetic = matches!(
        token,
        Token::Plus | Token::Minus | Token::Star | Token::Slash | Token::Percent
    );
    let is_pipe = matches!(token, Token::DoublePipe | Token::Pipe);
    let is_comparison = matches!(
        token,
        Token::Eq | Token::Ne | Token::Lt | Token::Le | Token::Gt | Token::Ge
    );
    let is_access = matches!(token, Token::Dot | Token::Colon);
    is_call || is_arithmetic || is_pipe || is_comparison || is_access
}
/// Ensures every `UPDATE ... ORDER BY` clause names a plain top-level field.
///
/// A clause is accepted only when it carries no expression and its field is
/// a `FieldRef::TableColumn` with an empty table qualifier and a column name
/// without a `.`.
///
/// # Errors
///
/// Returns a parse error at `position` when any clause violates the rule.
fn validate_update_order_by(
    clauses: &[OrderByClause],
    position: crate::lexer::Position,
) -> Result<(), ParseError> {
    let all_top_level = clauses.iter().all(|clause| {
        clause.expr.is_none()
            && matches!(
                &clause.field,
                FieldRef::TableColumn { table, column }
                    if table.is_empty() && !column.contains('.')
            )
    });
    if all_top_level {
        Ok(())
    } else {
        Err(ParseError::new(
            "UPDATE ORDER BY only supports top-level fields",
            position,
        ))
    }
}
/// Reports whether any clause orders by the unqualified `rid` column
/// (compared case-insensitively).
fn update_order_by_mentions_rid(clauses: &[OrderByClause]) -> bool {
    for clause in clauses {
        if let FieldRef::TableColumn { table, column } = &clause.field {
            if table.is_empty() && column.eq_ignore_ascii_case("rid") {
                return true;
            }
        }
    }
    false
}
/// Builds the error reported when a `RETURNING` item is an expression
/// rather than `*` or a column name.
fn returning_expr_not_supported(position: crate::lexer::Position) -> ParseError {
    const MESSAGE: &str = "NOT_YET_SUPPORTED: RETURNING expressions are not supported yet; use RETURNING * or named columns. Track a follow-up issue for RETURNING <expr>.";
    ParseError::new(MESSAGE, position)
}
/// Encodes a secret reference as a JSON object with the string fields
/// `type` (always `"secret_ref"`), `store`, `collection`, and `key`.
///
/// If encoding fails, the resulting `Value::Json` holds empty bytes.
fn secret_ref_value(store: &str, collection: &str, key: &str) -> Value {
    let entries = [
        ("type", "secret_ref"),
        ("store", store),
        ("collection", collection),
        ("key", key),
    ];

    let mut object = reddb_types::json::Map::new();
    for (name, text) in entries {
        object.insert(
            name.to_string(),
            reddb_types::json::Value::String(text.to_string()),
        );
    }

    let document = reddb_types::json::Value::Object(object);
    Value::Json(reddb_types::json::to_vec(&document).unwrap_or_default())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::{InsertEntityType, ReturningItem, UpdateTarget};
// Builds a parser over `input`; panics with "lexer" if `Parser::new` fails.
fn make_parser(input: &str) -> Parser<'_> {
Parser::new(input).expect("lexer")
}
// Parses `input` with `parse_insert_query` and unwraps the INSERT node,
// panicking on a parse error or any other query kind.
fn insert(input: &str) -> InsertQuery {
    let mut parser = make_parser(input);
    match parser.parse_insert_query().expect("insert") {
        QueryExpr::Insert(query) => query,
        _ => panic!("expected insert query"),
    }
}
// Parses `input` with `parse_update_query` and unwraps the UPDATE node,
// panicking on a parse error or any other query kind.
fn update(input: &str) -> UpdateQuery {
    let mut parser = make_parser(input);
    match parser.parse_update_query().expect("update") {
        QueryExpr::Update(query) => query,
        _ => panic!("expected update query"),
    }
}
// Parses `input` with `parse_delete_query` and unwraps the DELETE node,
// panicking on a parse error or any other query kind.
fn delete(input: &str) -> DeleteQuery {
    let mut parser = make_parser(input);
    match parser.parse_delete_query().expect("delete") {
        QueryExpr::Delete(query) => query,
        _ => panic!("expected delete query"),
    }
}
// Parses `input` with `parse_ask_query` and unwraps the ASK node,
// panicking on a parse error or any other query kind.
fn ask(input: &str) -> AskQuery {
    let mut parser = make_parser(input);
    match parser.parse_ask_query().expect("ask") {
        QueryExpr::Ask(query) => query,
        _ => panic!("expected ask query"),
    }
}
// Covers every explicit INSERT entity keyword, then one statement that
// combines multi-row VALUES, all four WITH clauses, RETURNING *, and
// SUPPRESS EVENTS.
#[test]
fn insert_entity_types_with_options_returning_and_suppress_events() {
// Each statement names one entity keyword between the table and the columns.
let cases = [
(
"INSERT INTO items NODE (id) VALUES (1)",
InsertEntityType::Node,
),
(
"INSERT INTO items EDGE (id) VALUES (1)",
InsertEntityType::Edge,
),
(
"INSERT INTO items VECTOR (id) VALUES (1)",
InsertEntityType::Vector,
),
(
"INSERT INTO items DOCUMENT (id) VALUES (1)",
InsertEntityType::Document,
),
("INSERT INTO items KV (id) VALUES (1)", InsertEntityType::Kv),
];
for (input, expected) in cases {
assert_eq!(insert(input).entity_type, expected, "{input}");
}
// The second row contains a `?` placeholder, which must not fail folding.
let query = insert(
"INSERT INTO docs (id, body) VALUES (1, 'red'), (2, ?) \
WITH TTL 2 h WITH EXPIRES AT 999 \
WITH METADATA (source = 'test', score = 3) \
WITH AUTO EMBED (body, title) USING openai MODEL 'text-embedding-3-small' \
RETURNING * SUPPRESS EVENTS",
);
assert_eq!(query.table, "docs");
// No entity keyword means a plain row insert.
assert_eq!(query.entity_type, InsertEntityType::Row);
assert_eq!(query.columns, vec!["id", "body"]);
assert_eq!(query.values.len(), 2);
assert_eq!(query.value_exprs.len(), 2);
// 2 hours expressed in milliseconds.
assert_eq!(query.ttl_ms, Some(7_200_000));
assert_eq!(query.expires_at_ms, Some(999));
assert_eq!(query.with_metadata.len(), 2);
assert_eq!(
query.returning.as_deref(),
Some([ReturningItem::All].as_slice())
);
let auto_embed = query.auto_embed.expect("auto embed");
assert_eq!(auto_embed.fields, vec!["body", "title"]);
assert_eq!(auto_embed.provider, "openai");
assert_eq!(auto_embed.model.as_deref(), Some("text-embedding-3-small"));
assert!(query.suppress_events);
}
// Checks three INSERT failure paths: the METRIC target, an unknown TTL
// unit, and an unknown keyword after WITH.
#[test]
fn insert_rejects_metric_and_bad_with_clause() {
let mut parser = make_parser("INSERT INTO METRIC cpu.usage (value) VALUES (1)");
let err = parser
.parse_insert_query()
.expect_err("metric insert should fail");
assert!(err.to_string().contains("INSERT INTO METRIC"));
// `fortnight` is not among the supported TTL units.
let mut parser = make_parser("INSERT INTO docs (id) VALUES (1) WITH TTL 1 fortnight");
let err = parser
.parse_insert_query()
.expect_err("bad ttl unit should fail");
assert!(err.to_string().contains("unsupported TTL unit"));
let mut parser = make_parser("INSERT INTO docs (id) VALUES (1) WITH UNKNOWN");
let err = parser
.parse_insert_query()
.expect_err("bad WITH should fail");
assert!(err.to_string().contains("expected"));
}
// Covers every UPDATE target keyword, then one statement combining a
// compound assignment, a non-constant assignment, WHERE, WITH clauses,
// ORDER BY + LIMIT, RETURNING, and SUPPRESS EVENTS.
#[test]
fn update_targets_compound_assignments_order_limit_returning() {
let cases = [
("UPDATE docs KV SET count = 1", UpdateTarget::Kv),
("UPDATE docs ROWS SET count = 1", UpdateTarget::Rows),
(
"UPDATE docs DOCUMENTS SET count = 1",
UpdateTarget::Documents,
),
("UPDATE docs NODES SET count = 1", UpdateTarget::Nodes),
("UPDATE docs EDGES SET count = 1", UpdateTarget::Edges),
];
for (input, expected) in cases {
assert_eq!(update(input).target, expected, "{input}");
}
let query = update(
"UPDATE docs DOCUMENTS SET count += 2, title = UPPER(title) \
WHERE id = 1 WITH TTL 30 s WITH METADATA (source = 'update') \
ORDER BY updated_at DESC LIMIT 5 RETURNING weight, title SUPPRESS EVENTS",
);
assert_eq!(query.table, "docs");
assert_eq!(query.target, UpdateTarget::Documents);
assert_eq!(query.assignment_exprs.len(), 2);
assert_eq!(query.compound_assignment_ops, vec![Some(BinOp::Add), None]);
// Neither assignment is a plain `=` with a constant right-hand side, so
// no folded value is recorded.
assert_eq!(query.assignments.len(), 0);
assert!(query.filter.is_some());
assert!(query.where_expr.is_some());
assert_eq!(query.ttl_ms, Some(30_000));
assert_eq!(query.with_metadata.len(), 1);
assert_eq!(query.limit, Some(5));
// The parser appends a `rid` clause after the user-supplied ordering.
assert_eq!(query.order_by.len(), 2);
assert!(matches!(
&query.order_by[1].field,
FieldRef::TableColumn { column, .. } if column == "rid"
));
assert_eq!(
query.returning.as_deref(),
Some(
[
ReturningItem::Column("weight".to_string()),
ReturningItem::Column("title".to_string())
]
.as_slice()
)
);
assert!(query.suppress_events);
}
// Checks three UPDATE failure paths: an unsupported compound operator,
// ORDER BY without LIMIT, and ORDER BY on an expression.
#[test]
fn update_rejects_invalid_assignment_and_order_by_forms() {
let mut parser = make_parser("UPDATE docs SET count ^= 1");
let err = parser
.parse_update_query()
.expect_err("unknown compound assignment should fail");
assert!(err.to_string().contains("expected"));
let mut parser = make_parser("UPDATE docs SET count = 1 ORDER BY updated_at");
let err = parser
.parse_update_query()
.expect_err("ORDER BY without LIMIT should fail");
assert!(err.to_string().contains("requires LIMIT"));
let mut parser = make_parser("UPDATE docs SET count = 1 ORDER BY updated_at + 1 LIMIT 1");
let err = parser
.parse_update_query()
.expect_err("ORDER BY expression should fail");
assert!(err.to_string().contains("top-level fields"));
}
// Checks DELETE with WHERE, a column RETURNING list, and SUPPRESS EVENTS,
// then DELETE with RETURNING * and no WHERE.
#[test]
fn delete_returning_and_suppress_events() {
let query = delete("DELETE FROM docs WHERE id = 1 RETURNING id, title SUPPRESS EVENTS");
assert_eq!(query.table, "docs");
// The WHERE clause is exposed both as a filter and as an expression.
assert!(query.filter.is_some());
assert!(query.where_expr.is_some());
assert_eq!(
query.returning.as_deref(),
Some(
[
ReturningItem::Column("id".to_string()),
ReturningItem::Column("title".to_string())
]
.as_slice()
)
);
assert!(query.suppress_events);
let query = delete("DELETE FROM docs RETURNING *");
assert_eq!(
query.returning.as_deref(),
Some([ReturningItem::All].as_slice())
);
}
// RETURNING accepts only `*` or column names: a literal, a function call,
// and a binary operator must each be rejected with the dedicated message.
#[test]
fn returning_rejects_expression_forms() {
for input in [
"DELETE FROM docs RETURNING 1",
"DELETE FROM docs RETURNING UPPER(title)",
"DELETE FROM docs RETURNING title || body",
] {
let mut parser = make_parser(input);
let err = parser
.parse_delete_query()
.expect_err("RETURNING expression should fail");
assert!(err.to_string().contains("RETURNING expressions"));
}
}
// Parses an ASK statement that uses most optional clauses at once, then a
// parameterised question with NOCACHE.
#[test]
fn ask_parses_all_optional_clauses_and_cache_modes() {
let query = ask(
"ASK 'what changed?' USING 'openai' MODEL 'gpt' DEPTH 3 LIMIT 4 \
MIN_SCORE 0.7 COLLECTION docs TEMPERATURE 0.2 SEED 42 STRICT OFF \
STREAM CACHE TTL '10m'",
);
assert_eq!(query.question, "what changed?");
assert_eq!(query.provider.as_deref(), Some("openai"));
assert_eq!(query.model.as_deref(), Some("gpt"));
assert_eq!(query.depth, Some(3));
assert_eq!(query.limit, Some(4));
assert_eq!(query.min_score, Some(0.7));
assert_eq!(query.collection.as_deref(), Some("docs"));
assert_eq!(query.temperature, Some(0.2));
assert_eq!(query.seed, Some(42));
assert!(!query.strict);
assert!(query.stream);
// The cache TTL is stored as the raw string.
assert_eq!(query.cache, AskCacheClause::CacheTtl("10m".to_string()));
// A `?` question leaves the text empty and records parameter slot 0.
let query = ask("ASK ? NOCACHE");
assert_eq!(query.question, "");
assert_eq!(query.question_param, Some(0));
assert_eq!(query.cache, AskCacheClause::NoCache);
}
// Checks EXPLAIN ASK parsing plus four error paths: EXPLAIN without ASK,
// an invalid STRICT operand, a repeated cache clause, and CACHE without TTL.
#[test]
fn explain_ask_and_ask_error_paths() {
let mut parser = make_parser("EXPLAIN ASK $2 STRICT ON");
let QueryExpr::Ask(query) = parser.parse_explain_ask_query().expect("explain ask") else {
panic!("expected ask query");
};
assert!(query.explain);
// `$2` is reported as slot index 1.
assert_eq!(query.question_param, Some(1));
assert!(query.strict);
let mut parser = make_parser("EXPLAIN SELECT 1");
let err = parser
.parse_explain_ask_query()
.expect_err("missing ASK should fail");
assert!(err.to_string().contains("expected"));
let mut parser = make_parser("ASK 'q' STRICT MAYBE");
let err = parser
.parse_ask_query()
.expect_err("bad strict should fail");
assert!(err.to_string().contains("Expected ON or OFF"));
// CACHE TTL followed by NOCACHE counts as a duplicate cache clause.
let mut parser = make_parser("ASK 'q' CACHE TTL '10m' NOCACHE");
let err = parser
.parse_ask_query()
.expect_err("duplicate cache should fail");
assert!(err.to_string().contains("cache clause"));
let mut parser = make_parser("ASK 'q' CACHE FOREVER '10m'");
let err = parser
.parse_ask_query()
.expect_err("bad cache ttl keyword should fail");
assert!(err.to_string().contains("Expected TTL"));
}
// Exercises `parse_literal_value` on the special constructors
// (PASSWORD, SECRET, SECRET_REF) and on array and object literals.
#[test]
fn literal_value_special_constructors_arrays_and_objects() {
// Plaintext secrets are stored with the `@@plain@@` prefix.
let mut parser = make_parser("PASSWORD('pw')");
assert!(matches!(
parser.parse_literal_value().expect("password"),
Value::Password(secret) if secret == "@@plain@@pw"
));
let mut parser = make_parser("SECRET('pw')");
assert!(matches!(
parser.parse_literal_value().expect("secret"),
Value::Secret(bytes) if bytes == b"@@plain@@pw"
));
let mut parser = make_parser("SECRET_REF(vault, red.vault.api_key)");
let value = parser.parse_literal_value().expect("secret ref");
assert!(matches!(value, Value::Json(_)));
// An all-numeric array becomes a vector.
let mut parser = make_parser("[1, 2.5]");
assert!(matches!(
parser.parse_literal_value().expect("vector"),
Value::Vector(values) if values == vec![1.0, 2.5]
));
// A mixed array becomes JSON.
let mut parser = make_parser("['a', 2]");
assert!(matches!(
parser.parse_literal_value().expect("json array"),
Value::Json(_)
));
// Object entries may use either `=` or `:` between key and value.
let mut parser = make_parser("{level = 'info', count: 2}");
assert!(matches!(
parser.parse_literal_value().expect("json object"),
Value::Json(_)
));
}
// Checks two literal failure paths: a SECRET_REF store other than `vault`,
// and a token that cannot start a literal.
#[test]
fn literal_value_rejects_invalid_secret_ref_and_scalar_start() {
let mut parser = make_parser("SECRET_REF(config, red.vault.api_key)");
let err = parser
.parse_literal_value()
.expect_err("non-vault secret ref should fail");
assert!(err.to_string().contains("expected"));
let mut parser = make_parser("ORDER");
let err = parser
.parse_literal_value()
.expect_err("non literal should fail");
assert!(err.to_string().contains("expected"));
}
// Builds an array nested one level deeper than the limit and expects the
// depth check to reject it.
#[test]
fn json_depth_check_rejects_deep_literals() {
// Start with one array (depth 1) and wrap it JSON_LITERAL_MAX_DEPTH times.
let mut deep = reddb_types::utils::json::JsonValue::Array(vec![]);
for _ in 0..JSON_LITERAL_MAX_DEPTH {
deep = reddb_types::utils::json::JsonValue::Array(vec![deep]);
}
let err = json_literal_depth_check(&deep).expect_err("depth should fail");
assert!(err.contains("JSON_LITERAL_MAX_DEPTH"));
}
}