1use crate::db::relational_db::{MutTx, RelationalDB, Tx};
2use crate::error::{DBError, PlanError};
3use anyhow::Context;
4use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
5use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
6use spacetimedb_datastore::system_tables::{StRowLevelSecurityFields, ST_ROW_LEVEL_SECURITY_ID};
7use spacetimedb_expr::check::SchemaView;
8use spacetimedb_expr::statement::compile_sql_stmt;
9use spacetimedb_lib::db::auth::StAccess;
10use spacetimedb_lib::identity::AuthCtx;
11use spacetimedb_primitives::{ColId, TableId};
12use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
13use spacetimedb_schema::def::error::RelationError;
14use spacetimedb_schema::relation::{ColExpr, FieldName};
15use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
16use spacetimedb_vm::errors::ErrorVm;
17use spacetimedb_vm::expr::{Expr, FieldExpr, FieldOp};
18use spacetimedb_vm::operator::{OpCmp, OpLogic, OpQuery};
19use spacetimedb_vm::ops::parse::{parse, parse_simple_enum};
20use sqlparser::ast::{
21 Assignment, BinaryOperator, Expr as SqlExpr, HiveDistributionStyle, Ident, JoinConstraint, JoinOperator,
22 ObjectName, Query, Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, Value, Values,
23};
24use sqlparser::dialect::PostgreSqlDialect;
25use sqlparser::parser::Parser;
26use std::ops::Deref;
27use std::sync::Arc;
28
/// Marks a piece of parsed SQL syntax as requesting a feature this compiler rejects.
///
/// The `unsupported!` macro calls [`Unsupported::unsupported`] on each AST
/// component it is given and bails out when the answer is `true`.
trait Unsupported {
    /// Returns `true` when the value means "an unsupported feature was requested".
    fn unsupported(&self) -> bool;
}

/// A boolean flag is unsupported exactly when it is set.
impl Unsupported for bool {
    fn unsupported(&self) -> bool {
        *self
    }
}

/// An optional clause is unsupported whenever it is present.
impl<T> Unsupported for Option<T> {
    fn unsupported(&self) -> bool {
        matches!(self, Some(_))
    }
}

/// A list of clauses is unsupported whenever it contains at least one element.
impl<T> Unsupported for Vec<T> {
    fn unsupported(&self) -> bool {
        !self.as_slice().is_empty()
    }
}
53
54impl Unsupported for HiveDistributionStyle {
55 fn unsupported(&self) -> bool {
56 !matches!(self, HiveDistributionStyle::NONE)
57 }
58}
59
60impl Unsupported for sqlparser::ast::GroupByExpr {
61 fn unsupported(&self) -> bool {
62 match self {
63 sqlparser::ast::GroupByExpr::All => true,
64 sqlparser::ast::GroupByExpr::Expressions(v) => v.unsupported(),
65 }
66 }
67}
68
/// Returns `Err(PlanError::Unsupported { .. })` from the enclosing function when
/// any of the given expressions reports `true` from `Unsupported::unsupported`.
///
/// The first argument is a literal naming the statement kind; both it and the
/// offending expression are stringified into the error message (the literal is
/// stringified as written, so its surrounding quotes appear in the message).
/// The expressions are checked in the order given and the first hit wins.
macro_rules! unsupported {
    // Single expression.
    ($name:literal, $a:expr) => {{
        if $a.unsupported() {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "Unsupported {name} with `{it}` feature.",
                    name = stringify!($name),
                    it = stringify!($a)
                ),
            });
        }
    }};
    // Several expressions (optionally with a trailing comma): check each in turn.
    ($name:literal, $($a:expr),+ $(,)?) => {{
        $( unsupported!($name, $a); )+
    }};
}
84
85pub struct Table {
87 pub(crate) name: Box<str>,
88}
89
90impl Table {
91 pub fn new(name: ObjectName) -> Self {
92 Self {
93 name: name.to_string().into(),
94 }
95 }
96}
97
98#[derive(Debug)]
99pub enum Column {
100 UnnamedExpr(Expr),
102 QualifiedWildcard { table: String },
104 Wildcard,
106}
107
108#[derive(Debug, Clone)]
110pub struct Selection {
111 pub(crate) clause: FieldOp,
112}
113
114impl Selection {
115 pub fn with_cmp(op: OpQuery, lhs: FieldOp, rhs: FieldOp) -> Self {
116 let cmp = FieldOp::new(op, lhs, rhs);
117 Selection { clause: cmp }
118 }
119}
120
121#[derive(Debug)]
122pub struct OnExpr {
123 pub op: OpCmp,
124 pub lhs: FieldName,
125 pub rhs: FieldName,
126}
127
128#[derive(Debug)]
130pub enum Join {
131 Inner { rhs: Arc<TableSchema>, on: OnExpr },
132}
133
134#[derive(Debug)]
136pub struct From {
137 pub root: Arc<TableSchema>,
138 pub joins: Vec<Join>,
139}
140
141impl From {
142 pub fn new(root: Arc<TableSchema>) -> Self {
143 Self {
144 root,
145 joins: Vec::new(),
146 }
147 }
148
149 pub fn with_inner_join(mut self, rhs: Arc<TableSchema>, on: OnExpr) -> Self {
150 let on = if on.rhs.table() == self.root.table_id && self.root.get_column_by_field(on.rhs).is_some() {
153 OnExpr {
154 op: on.op.reverse(),
155 lhs: on.rhs,
156 rhs: on.lhs,
157 }
158 } else {
159 on
160 };
161
162 self.joins.push(Join::Inner { rhs, on });
163 self
164 }
165
166 pub fn iter_tables(&self) -> impl Clone + Iterator<Item = &TableSchema> {
168 [&*self.root]
169 .into_iter()
170 .chain(self.joins.iter().map(|Join::Inner { rhs, .. }| &**rhs))
171 }
172
173 pub fn table_names(&self) -> Vec<Box<str>> {
175 self.iter_tables().map(|x| x.table_name.clone()).collect()
176 }
177
178 pub(super) fn find_field(&self, f: &str) -> Result<(FieldName, &AlgebraicType), PlanError> {
182 find_field(self.iter_tables(), f)
183 }
184
185 pub(super) fn find_field_name(&self, field: FieldName) -> Option<(&str, &ColumnSchema)> {
189 self.iter_tables().find_map(|t| {
190 if t.table_id == field.table() {
191 t.get_column_by_field(field).map(|c| (&*t.table_name, c))
192 } else {
193 None
194 }
195 })
196 }
197}
198
199pub fn find_field<'a>(
212 mut tables: impl Clone + Iterator<Item = &'a TableSchema>,
213 f: &str,
214) -> Result<(FieldName, &'a AlgebraicType), PlanError> {
215 fn extract_table_field(ident: &str) -> Result<(Option<&str>, &str), RelationError> {
216 let mut iter = ident.rsplit('.');
217 let field = iter.next();
218 let table = iter.next();
219 let more = iter.next();
220 match (field, table, more) {
221 (Some(field), table, None) => Ok((table, field)),
222 _ => Err(RelationError::FieldPathInvalid(ident.to_string())),
223 }
224 }
225
226 let (f_table, f_field) = extract_table_field(f)?;
227
228 let tables2 = tables.clone();
229 let unknown_field = || {
230 let field = match f_table {
231 Some(f_table) => format!("{f_table}.{f_field}"),
232 None => f_field.into(),
233 };
234 let tables = tables2.map(|t| t.table_name.clone()).collect();
235 Err(PlanError::UnknownField { field, tables })
236 };
237
238 if let Some(f_table) = f_table {
239 return if let Some(col) = tables
242 .find(|t| &*t.table_name == f_table)
243 .and_then(|t| t.get_column_by_name(f_field))
244 {
245 Ok((FieldName::new(col.table_id, col.col_pos), &col.col_type))
246 } else {
247 unknown_field()
248 };
249 }
250
251 let mut fields = tables
254 .flat_map(|t| t.columns().iter().map(move |col| (t, col)))
255 .filter(|(_, col)| &*col.col_name == f_field);
256
257 match (fields.next(), fields.next()) {
260 (None, _) => unknown_field(),
261 (Some((_, col)), None) => Ok((FieldName::new(col.table_id, col.col_pos), &col.col_type)),
262 (Some(f1), Some(f2)) => {
263 let found = [f1, f2]
264 .into_iter()
265 .chain(fields)
266 .map(|(table, column)| format!("{0}.{1}", &table.table_name, &column.col_name))
267 .collect();
268 Err(PlanError::AmbiguousField { field: f.into(), found })
269 }
270 }
271}
272
273#[derive(Debug)]
275pub enum SqlAst {
276 Select {
277 from: From,
278 project: Box<[Column]>,
279 selection: Option<Selection>,
280 },
281 Insert {
282 table: Arc<TableSchema>,
283 columns: Box<[ColId]>,
284 values: Box<[Box<[ColExpr]>]>,
285 },
286 Update {
287 table: Arc<TableSchema>,
288 assignments: IntMap<ColId, ColExpr>,
289 selection: Option<Selection>,
290 },
291 Delete {
292 table: Arc<TableSchema>,
293 selection: Option<Selection>,
294 },
295 SetVar {
296 name: String,
297 literal: String,
298 },
299 ReadVar {
300 name: String,
301 },
302}
303
304fn extract_field<'a>(
305 tables: impl Clone + Iterator<Item = &'a TableSchema>,
306 of: &SqlExpr,
307) -> Result<Option<&'a AlgebraicType>, PlanError> {
308 match of {
309 SqlExpr::Identifier(x) => find_field(tables, &x.value).map(|(_, ty)| Some(ty)),
310 SqlExpr::CompoundIdentifier(ident) => {
311 let col_name = compound_ident(ident);
312 find_field(tables, &col_name).map(|(_, ty)| Some(ty))
313 }
314 _ => Ok(None),
315 }
316}
317
318fn infer_number(field: Option<&AlgebraicType>, value: &str, is_long: bool) -> Result<AlgebraicValue, ErrorVm> {
323 match field {
324 None => {
325 let ty = if value.contains('.') {
326 if is_long {
327 AlgebraicType::F64
328 } else {
329 AlgebraicType::F32
330 }
331 } else if is_long {
332 AlgebraicType::I64
333 } else {
334 AlgebraicType::I32
335 };
336 parse(value, &ty)
337 }
338 Some(f) => parse(value, f),
339 }
340}
341
342fn infer_str_or_enum(field: Option<&AlgebraicType>, value: String) -> Result<AlgebraicValue, ErrorVm> {
346 if let Some(sum) = field.and_then(|x| x.as_sum()) {
347 parse_simple_enum(sum, &value)
348 } else {
349 Ok(AlgebraicValue::String(value.into()))
350 }
351}
352
353fn compile_expr_value<'a>(
355 tables: impl Clone + Iterator<Item = &'a TableSchema>,
356 field: Option<&'a AlgebraicType>,
357 of: SqlExpr,
358) -> Result<FieldOp, PlanError> {
359 Ok(FieldOp::Field(match of {
360 SqlExpr::Identifier(name) => FieldExpr::Name(find_field(tables, &name.value)?.0),
361 SqlExpr::CompoundIdentifier(ident) => {
362 let col_name = compound_ident(&ident);
363 FieldExpr::Name(find_field(tables, &col_name)?.0)
364 }
365 SqlExpr::Value(x) => FieldExpr::Value(match x {
366 Value::Number(value, is_long) => infer_number(field, &value, is_long)?,
367 Value::SingleQuotedString(s) => infer_str_or_enum(field, s)?,
368 Value::DoubleQuotedString(s) => AlgebraicValue::String(s.into()),
369 Value::HexStringLiteral(s) => infer_number(field, &s, false)?,
370 Value::Boolean(x) => AlgebraicValue::Bool(x),
371 Value::Null => AlgebraicValue::OptionNone(),
372 x => {
373 return Err(PlanError::Unsupported {
374 feature: format!("Unsupported value: {x}."),
375 });
376 }
377 }),
378 SqlExpr::BinaryOp { left, op, right } => {
379 let (op, lhs, rhs) = compile_bin_op(tables, op, left, right)?;
380
381 return Ok(FieldOp::new(op, lhs, rhs));
382 }
383 SqlExpr::Nested(x) => {
384 return compile_expr_value(tables, field, *x);
385 }
386 x => {
387 return Err(PlanError::Unsupported {
388 feature: format!("Unsupported expression: {x}"),
389 });
390 }
391 }))
392}
393
394fn compile_expr_field(table: &From, field: Option<&AlgebraicType>, of: SqlExpr) -> Result<FieldExpr, PlanError> {
395 match compile_expr_value(table.iter_tables(), field, of)? {
396 FieldOp::Field(field) => Ok(field),
397 x => Err(PlanError::Unsupported {
398 feature: format!("Complex expression {x} on insert..."),
399 }),
400 }
401}
402
403fn compile_table_factor(table: TableFactor) -> Result<Table, PlanError> {
405 match table {
406 TableFactor::Table {
407 name,
408 alias,
409 args,
410 with_hints,
411 version,
412 partitions,
413 } => {
414 unsupported!("TableFactor", alias, args, with_hints, version, partitions);
415
416 Ok(Table::new(name))
417 }
418 x => Err(PlanError::Unsupported {
419 feature: format!("TableFactor with syntax {x:?} not supported"),
420 }),
421 }
422}
423
424fn compile_bin_op<'a>(
426 tables: impl Clone + Iterator<Item = &'a TableSchema>,
427 op: BinaryOperator,
428 lhs: Box<sqlparser::ast::Expr>,
429 rhs: Box<sqlparser::ast::Expr>,
430) -> Result<(OpQuery, FieldOp, FieldOp), PlanError> {
431 let op: OpQuery = match op {
432 BinaryOperator::Gt => OpCmp::Gt.into(),
433 BinaryOperator::Lt => OpCmp::Lt.into(),
434 BinaryOperator::GtEq => OpCmp::GtEq.into(),
435 BinaryOperator::LtEq => OpCmp::LtEq.into(),
436 BinaryOperator::Eq => OpCmp::Eq.into(),
437 BinaryOperator::NotEq => OpCmp::NotEq.into(),
438 BinaryOperator::And => OpLogic::And.into(),
439 BinaryOperator::Or => OpLogic::Or.into(),
440 x => {
441 return Err(PlanError::Unsupported {
442 feature: format!("BinaryOperator not supported in WHERE: {x}."),
443 });
444 }
445 };
446
447 let field_lhs = extract_field(tables.clone(), &lhs)?;
448 let field_rhs = extract_field(tables.clone(), &rhs)?;
449 let lhs = compile_expr_value(tables.clone(), field_rhs, *lhs)?;
452 let rhs = compile_expr_value(tables, field_lhs, *rhs)?;
453
454 Ok((op, lhs, rhs))
455}
456
457fn _compile_where(table: &From, filter: SqlExpr) -> Result<Option<Selection>, PlanError> {
458 match filter {
459 SqlExpr::BinaryOp { left, op, right } => {
460 let (op, lhs, rhs) = compile_bin_op(table.iter_tables(), op, left, right)?;
461
462 Ok(Some(Selection::with_cmp(op, lhs, rhs)))
463 }
464 SqlExpr::Nested(x) => _compile_where(table, *x),
465 x => Err(PlanError::Unsupported {
466 feature: format!("Unsupported in WHERE: {x}."),
467 }),
468 }
469}
470
471fn compile_where(table: &From, filter: Option<SqlExpr>) -> Result<Option<Selection>, PlanError> {
473 if let Some(filter) = filter {
474 _compile_where(table, filter)
475 } else {
476 Ok(None)
477 }
478}
479
480pub struct SchemaViewer<'a, T> {
481 tx: &'a T,
482 auth: &'a AuthCtx,
483}
484
485impl<T> Deref for SchemaViewer<'_, T> {
486 type Target = T;
487
488 fn deref(&self) -> &Self::Target {
489 self.tx
490 }
491}
492
493impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
494 fn table_id(&self, name: &str) -> Option<TableId> {
495 let AuthCtx { owner, caller } = self.auth;
496 self.tx
498 .table_id_from_name(name)
499 .ok()
500 .flatten()
501 .and_then(|table_id| self.schema_for_table(table_id))
502 .filter(|schema| schema.table_access == StAccess::Public || caller == owner)
503 .map(|schema| schema.table_id)
504 }
505
506 fn schema_for_table(&self, table_id: TableId) -> Option<Arc<TableSchema>> {
507 let AuthCtx { owner, caller } = self.auth;
508 self.tx
509 .get_schema(table_id)
510 .filter(|schema| schema.table_access == StAccess::Public || caller == owner)
511 .cloned()
512 }
513
514 fn rls_rules_for_table(&self, table_id: TableId) -> anyhow::Result<Vec<Box<str>>> {
515 self.tx
516 .iter_by_col_eq(
517 ST_ROW_LEVEL_SECURITY_ID,
518 StRowLevelSecurityFields::TableId,
519 &AlgebraicValue::from(table_id),
520 )?
521 .map(|row| {
522 row.read_col::<AlgebraicValue>(StRowLevelSecurityFields::Sql)
523 .with_context(|| {
524 format!(
525 "Failed to read value from the `{}` column of `{}` for table_id `{}`",
526 "sql", "st_row_level_security", table_id
527 )
528 })
529 .and_then(|sql| {
530 sql.into_string().map_err(|_| {
531 anyhow::anyhow!(format!(
532 "Failed to read value from the `{}` column of `{}` for table_id `{}`",
533 "sql", "st_row_level_security", table_id
534 ))
535 })
536 })
537 })
538 .collect::<anyhow::Result<_>>()
539 }
540}
541
542impl<'a, T> SchemaViewer<'a, T> {
543 pub fn new(tx: &'a T, auth: &'a AuthCtx) -> Self {
544 Self { tx, auth }
545 }
546}
547
548pub trait TableSchemaView {
549 fn find_table(&self, db: &RelationalDB, t: Table) -> Result<Arc<TableSchema>, PlanError>;
550}
551
552impl TableSchemaView for Tx {
553 fn find_table(&self, db: &RelationalDB, t: Table) -> Result<Arc<TableSchema>, PlanError> {
554 let table_id = db
555 .table_id_from_name(self, &t.name)?
556 .ok_or(PlanError::UnknownTable { table: t.name.clone() })?;
557 if !db.table_id_exists(self, &table_id) {
558 return Err(PlanError::UnknownTable { table: t.name });
559 }
560 db.schema_for_table(self, table_id)
561 .map_err(move |e| PlanError::DatabaseInternal(Box::new(e)))
562 }
563}
564
565impl TableSchemaView for MutTx {
566 fn find_table(&self, db: &RelationalDB, t: Table) -> Result<Arc<TableSchema>, PlanError> {
567 let table_id = db
568 .table_id_from_name_mut(self, &t.name)?
569 .ok_or(PlanError::UnknownTable { table: t.name.clone() })?;
570 if !db.table_id_exists_mut(self, &table_id) {
571 return Err(PlanError::UnknownTable { table: t.name });
572 }
573 db.schema_for_table_mut(self, table_id)
574 .map_err(|e| PlanError::DatabaseInternal(Box::new(e)))
575 }
576}
577
578fn compile_from<T: TableSchemaView + StateView>(
580 db: &RelationalDB,
581 tx: &T,
582 from: &[TableWithJoins],
583) -> Result<From, PlanError> {
584 if from.len() > 1 {
585 return Err(PlanError::Unsupported {
586 feature: "Multiple tables in `FROM`.".into(),
587 });
588 }
589
590 let root_table = match from.first() {
591 Some(root_table) => root_table,
592 None => {
593 return Err(PlanError::Unstructured("Missing `FROM` expression.".into()));
594 }
595 };
596
597 let t = compile_table_factor(root_table.relation.clone())?;
598 let base = tx.find_table(db, t)?;
599 let mut base = From::new(base);
600
601 for join in &root_table.joins {
602 match &join.join_operator {
603 JoinOperator::Inner(constraint) => {
604 let t = compile_table_factor(join.relation.clone())?;
605 let join = tx.find_table(db, t)?;
606
607 match constraint {
608 JoinConstraint::On(x) => {
609 let tables = base.iter_tables().chain([&*join]);
610 let expr = compile_expr_value(tables, None, x.clone())?;
611 match expr {
612 FieldOp::Field(_) => {}
613 FieldOp::Cmp { op, lhs, rhs } => {
614 let op = match op {
615 OpQuery::Cmp(op) => op,
616 OpQuery::Logic(op) => {
617 return Err(PlanError::Unsupported {
618 feature: format!("Can't use operator {op} on JOIN clause"),
619 });
620 }
621 };
622 let (lhs, rhs) = match (*lhs, *rhs) {
623 (FieldOp::Field(FieldExpr::Name(lhs)), FieldOp::Field(FieldExpr::Name(rhs))) => {
624 (lhs, rhs)
625 }
626 (lhs, rhs) => {
627 return Err(PlanError::Unsupported {
628 feature: format!(
629 "Can't compare non-field expressions {lhs} and {rhs} in JOIN clause"
630 ),
631 });
632 }
633 };
634
635 base = base.with_inner_join(join, OnExpr { op, lhs, rhs })
636 }
637 }
638 }
639 x => {
640 return Err(PlanError::Unsupported {
641 feature: format!("JOIN constrain {x:?} is not valid, can be only on the form Table.Field [Cmp] Table.Field"),
642 });
643 }
644 }
645 }
646 x => {
647 return Err(PlanError::Unsupported {
648 feature: format!("Unsupported JOIN operator: `{x:?}`"),
649 });
650 }
651 }
652 }
653
654 Ok(base)
655}
656
657fn compound_ident(ident: &[Ident]) -> String {
658 ident.iter().map(ToString::to_string).collect::<Vec<_>>().join(".")
659}
660
661fn compile_select_item(from: &From, select_item: SelectItem) -> Result<Column, PlanError> {
662 match select_item {
663 SelectItem::UnnamedExpr(expr) => match expr {
664 sqlparser::ast::Expr::Identifier(ident) => {
665 let col_name = ident.to_string();
666
667 Ok(Column::UnnamedExpr(Expr::Ident(col_name)))
668 }
669 sqlparser::ast::Expr::CompoundIdentifier(ident) => {
670 let col_name = compound_ident(&ident);
671
672 Ok(Column::UnnamedExpr(Expr::Ident(col_name)))
673 }
674 sqlparser::ast::Expr::Value(_) => {
675 let value = compile_expr_value(from.iter_tables(), None, expr)?;
676 match value {
677 FieldOp::Field(value) => match value {
678 FieldExpr::Name(_) => Err(PlanError::Unsupported {
679 feature: "Should not be an identifier in Expr::Value".to_string(),
680 }),
681 FieldExpr::Value(x) => Ok(Column::UnnamedExpr(Expr::Value(x))),
682 },
683 x => Err(PlanError::Unsupported {
684 feature: format!("Should not be an {x} in Expr::Value"),
685 }),
686 }
687 }
688 sqlparser::ast::Expr::Nested(x) => compile_select_item(from, SelectItem::UnnamedExpr(*x)),
689 _ => Err(PlanError::Unsupported {
690 feature: "Only columns names & scalars are supported.".into(),
691 }),
692 },
693 SelectItem::ExprWithAlias { expr: _, alias: _ } => Err(PlanError::Unsupported {
694 feature: "ExprWithAlias".into(),
695 }),
696 SelectItem::QualifiedWildcard(ident, _) => Ok(Column::QualifiedWildcard {
697 table: ident.to_string(),
698 }),
699 SelectItem::Wildcard(_) => Ok(Column::Wildcard),
700 }
701}
702
703fn compile_select<T: TableSchemaView + StateView>(
705 db: &RelationalDB,
706 tx: &T,
707 select: Select,
708) -> Result<SqlAst, PlanError> {
709 let from = compile_from(db, tx, &select.from)?;
710
711 let mut project = Vec::with_capacity(select.projection.len());
713 for select_item in select.projection {
714 project.push(compile_select_item(&from, select_item)?);
715 }
716 let project = project.into();
717
718 let selection = compile_where(&from, select.selection)?;
719
720 Ok(SqlAst::Select {
721 from,
722 project,
723 selection,
724 })
725}
726
727fn compile_query<T: TableSchemaView + StateView>(db: &RelationalDB, tx: &T, query: Query) -> Result<SqlAst, PlanError> {
729 unsupported!(
730 "SELECT",
731 query.order_by,
732 query.fetch,
733 query.limit,
734 query.offset,
735 query.locks,
736 query.with
737 );
738
739 match *query.body {
740 SetExpr::Select(select) => {
741 unsupported!(
742 "SELECT",
743 select.distinct,
744 select.top,
745 select.into,
746 select.lateral_views,
747 select.group_by,
748 select.having,
749 select.sort_by
750 );
751
752 compile_select(db, tx, *select)
753 }
754 SetExpr::Query(_) => Err(PlanError::Unsupported {
755 feature: "Query".into(),
756 }),
757 SetExpr::SetOperation {
758 op: _,
759 set_quantifier: _,
760 left: _,
761 right: _,
762 } => Err(PlanError::Unsupported {
763 feature: "SetOperation".into(),
764 }),
765 SetExpr::Values(_) => Err(PlanError::Unsupported {
766 feature: "Values".into(),
767 }),
768 SetExpr::Insert(_) => Err(PlanError::Unsupported {
769 feature: "SetExpr::Insert".into(),
770 }),
771 SetExpr::Update(_) => Err(PlanError::Unsupported {
772 feature: "SetExpr::Update".into(),
773 }),
774 SetExpr::Table(_) => Err(PlanError::Unsupported {
775 feature: "SetExpr::Table".into(),
776 }),
777 }
778}
779
780fn compile_insert<T: TableSchemaView + StateView>(
782 db: &RelationalDB,
783 tx: &T,
784 table_name: ObjectName,
785 columns: Vec<Ident>,
786 data: &Values,
787) -> Result<SqlAst, PlanError> {
788 let table = tx.find_table(db, Table::new(table_name))?;
789
790 let table = From::new(table);
791
792 let columns = columns
793 .into_iter()
794 .map(|x| {
795 table
796 .find_field(&format!("{}.{}", &table.root.table_name, x))
797 .map(|(f, _)| f.col)
798 })
799 .collect::<Result<Box<[_]>, _>>()?;
800
801 let mut values = Vec::with_capacity(data.rows.len());
802 for x in &data.rows {
803 let mut row = Vec::with_capacity(x.len());
804 for (pos, v) in x.iter().enumerate() {
805 let field_ty = table.root.get_column(pos).map(|col| &col.col_type);
806 row.push(compile_expr_field(&table, field_ty, v.clone())?.strip_table());
807 }
808 values.push(row.into());
809 }
810 let values = values.into();
811
812 Ok(SqlAst::Insert {
813 table: table.root,
814 columns,
815 values,
816 })
817}
818
819fn compile_update<T: TableSchemaView + StateView>(
821 db: &RelationalDB,
822 tx: &T,
823 table: Table,
824 assignments: Vec<Assignment>,
825 selection: Option<SqlExpr>,
826) -> Result<SqlAst, PlanError> {
827 let table = From::new(tx.find_table(db, table)?);
828 let selection = compile_where(&table, selection)?;
829
830 let mut assigns = IntMap::with_capacity(assignments.len());
831 for col in assignments {
832 let name: String = col.id.iter().map(|x| x.to_string()).collect();
833 let (field_name, field_ty) = table.find_field(&name)?;
834 let col_id = field_name.col;
835
836 let value = compile_expr_field(&table, Some(field_ty), col.value)?.strip_table();
837 assigns.insert(col_id, value);
838 }
839
840 Ok(SqlAst::Update {
841 table: table.root,
842 assignments: assigns,
843 selection,
844 })
845}
846
847fn compile_delete<T: TableSchemaView + StateView>(
849 db: &RelationalDB,
850 tx: &T,
851 table: Table,
852 selection: Option<SqlExpr>,
853) -> Result<SqlAst, PlanError> {
854 let table = From::new(tx.find_table(db, table)?);
855 let selection = compile_where(&table, selection)?;
856
857 Ok(SqlAst::Delete {
858 table: table.root,
859 selection,
860 })
861}
862
863fn compile_set_config(name: ObjectName, value: Vec<SqlExpr>) -> Result<SqlAst, PlanError> {
865 let name = name.to_string();
866
867 let value = match value.as_slice() {
868 [first] => first.clone(),
869 _ => {
870 return Err(PlanError::Unsupported {
871 feature: format!("Invalid value for config: {name} => {value:?}."),
872 });
873 }
874 };
875
876 let literal = match value {
877 SqlExpr::Value(x) => match x {
878 Value::Number(value, _) => value,
879 x => {
880 return Err(PlanError::Unsupported {
881 feature: format!("Unsupported value for config: {x}."),
882 });
883 }
884 },
885 x => {
886 return Err(PlanError::Unsupported {
887 feature: format!("Unsupported expression for config: {x}"),
888 });
889 }
890 };
891
892 Ok(SqlAst::SetVar { name, literal })
893}
894
895fn compile_read_config(name: Vec<Ident>) -> Result<SqlAst, PlanError> {
897 let name = match name.as_slice() {
898 [first] => first.to_string(),
899 _ => {
900 return Err(PlanError::Unsupported {
901 feature: format!("Invalid name for config: {name:?}"),
902 });
903 }
904 };
905 Ok(SqlAst::ReadVar { name })
906}
907
908fn compile_statement<T: TableSchemaView + StateView>(
910 db: &RelationalDB,
911 tx: &T,
912 statement: Statement,
913) -> Result<SqlAst, PlanError> {
914 match statement {
915 Statement::Query(query) => Ok(compile_query(db, tx, *query)?),
916 Statement::Insert {
917 or,
918 into,
919 table_name,
920 columns,
921 overwrite,
922 source,
923 partitioned,
924 after_columns,
925 table,
926 on,
927 returning,
928 } => {
929 unsupported!(
930 "INSERT",
931 or,
932 overwrite,
933 partitioned,
934 after_columns,
935 table,
936 on,
937 returning
938 );
939 if into {
940 let values = match &*source.body {
941 SetExpr::Values(values) => values,
942 _ => {
943 return Err(PlanError::Unsupported {
944 feature: "Insert WITHOUT values".into(),
945 });
946 }
947 };
948
949 return compile_insert(db, tx, table_name, columns, values);
950 };
951
952 Err(PlanError::Unsupported {
953 feature: "INSERT without INTO".into(),
954 })
955 }
956 Statement::Update {
957 table,
958 assignments,
959 from,
960 selection,
961 returning,
962 } => {
963 unsupported!("UPDATE", from, returning);
964
965 let table_name = compile_table_factor(table.relation)?;
966 compile_update(db, tx, table_name, assignments, selection)
967 }
968 Statement::Delete {
969 tables,
970 from,
971 using,
972 selection,
973 returning,
974 } => {
975 unsupported!("DELETE", using, returning, tables);
976 if from.len() != 1 {
977 unsupported!("DELETE (multiple tables)", tables);
978 }
979
980 let table = from.first().unwrap().clone();
981 let table_name = compile_table_factor(table.relation)?;
982 compile_delete(db, tx, table_name, selection)
983 }
984 Statement::SetVariable {
985 local,
986 hivevar,
987 variable,
988 value,
989 } => {
990 unsupported!("SET", local, hivevar);
991 compile_set_config(variable, value)
992 }
993 Statement::ShowVariable { variable } => compile_read_config(variable),
994 x => Err(PlanError::Unsupported {
995 feature: format!("Syntax {x}"),
996 }),
997 }
998}
999
1000pub(crate) fn compile_to_ast<T: TableSchemaView + StateView>(
1002 db: &RelationalDB,
1003 auth: &AuthCtx,
1004 tx: &T,
1005 sql_text: &str,
1006) -> Result<Vec<SqlAst>, DBError> {
1007 compile_sql_stmt(sql_text, &SchemaViewer::new(tx, auth), auth)?;
1010
1011 let dialect = PostgreSqlDialect {};
1012 let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser {
1013 sql: sql_text.to_string(),
1014 error,
1015 })?;
1016
1017 let mut results = Vec::new();
1018 for statement in ast {
1019 let plan_result = compile_statement(db, tx, statement);
1020 let query = match plan_result {
1021 Ok(plan) => plan,
1022 Err(error) => {
1023 return Err(DBError::Plan {
1024 sql: sql_text.to_string(),
1025 error,
1026 });
1027 }
1028 };
1029 results.push(query);
1030 }
1031 Ok(results)
1032}