use std::time::Duration;
use reifydb_core::sort::SortDirection;
use reifydb_type::{
error::{AstErrorKind, Error, TypeError},
fragment::Fragment,
};
use crate::{
Result,
ast::{
ast::{
AstColumnProperty, AstColumnPropertyEntry, AstColumnPropertyKind, AstColumnToCreate, AstCreate,
AstCreateColumnProperty, AstCreateDeferredView, AstCreateDictionary, AstCreateEvent,
AstCreateHandler, AstCreateMigration, AstCreateNamespace, AstCreatePrimaryKey,
AstCreateProcedure, AstCreateRemoteNamespace, AstCreateRingBuffer, AstCreateSeries,
AstCreateSubscription, AstCreateSumType, AstCreateTable, AstCreateTag, AstCreateTest,
AstCreateTransactionalView, AstIndexColumn, AstPolicyTargetType, AstPrimaryKey,
AstProcedureParam, AstRowTtl, AstStatement, AstTimestampPrecision, AstType, AstVariant,
AstViewStorageKind,
},
identifier::{
MaybeQualifiedDeferredViewIdentifier, MaybeQualifiedDictionaryIdentifier,
MaybeQualifiedNamespaceIdentifier, MaybeQualifiedProcedureIdentifier,
MaybeQualifiedRingBufferIdentifier, MaybeQualifiedSeriesIdentifier,
MaybeQualifiedSumTypeIdentifier, MaybeQualifiedTableIdentifier, MaybeQualifiedTestIdentifier,
MaybeQualifiedTransactionalViewIdentifier,
},
parse::{Parser, Precedence},
},
bump::BumpBox,
plan::logical::Compiler,
token::{
keyword::{
Keyword,
Keyword::{
Create, Deferred, Dictionary, Exists, For, If, Namespace, Remote, Replace, Ringbuffer,
Series, Subscription, Table, Tag, Test, Transactional, View,
},
},
operator::{
Operator,
Operator::{Colon, Not, Or},
},
separator::{Separator, Separator::Comma},
token::{Literal, Token, TokenKind},
},
};
impl<'bump> Parser<'bump> {
pub(crate) fn parse_create(&mut self) -> Result<AstCreate<'bump>> {
let token = self.consume_keyword(Create)?;
let or_replace = if (self.consume_if(TokenKind::Operator(Or))?).is_some() {
self.consume_keyword(Replace)?;
true
} else {
false
};
if or_replace {
let fragment = self.current()?.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "FLOW after CREATE OR REPLACE".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"FLOW after CREATE OR REPLACE",
fragment.text()
),
fragment,
}));
}
if (self.consume_if(TokenKind::Keyword(Remote))?).is_some() {
self.consume_keyword(Namespace)?;
return self.parse_remote_namespace(token);
}
if (self.consume_if(TokenKind::Keyword(Namespace))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Namespace);
}
return self.parse_namespace(token);
}
if (self.consume_if(TokenKind::Keyword(View))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::View);
}
return self.parse_transactional_view(token);
}
if (self.consume_if(TokenKind::Keyword(Deferred))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Ringbuffer))?).is_some() {
self.consume_keyword(View)?;
return self.parse_deferred_view_with_storage(token, ViewStorageKindHint::RingBuffer);
}
if (self.consume_if(TokenKind::Keyword(Series))?).is_some() {
self.consume_keyword(View)?;
return self.parse_deferred_view_with_storage(token, ViewStorageKindHint::Series);
}
if (self.consume_if(TokenKind::Keyword(View))?).is_some() {
return self.parse_deferred_view(token);
}
unimplemented!()
}
if (self.consume_if(TokenKind::Keyword(Transactional))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Ringbuffer))?).is_some() {
self.consume_keyword(View)?;
return self
.parse_transactional_view_with_storage(token, ViewStorageKindHint::RingBuffer);
}
if (self.consume_if(TokenKind::Keyword(Series))?).is_some() {
self.consume_keyword(View)?;
return self.parse_transactional_view_with_storage(token, ViewStorageKindHint::Series);
}
if (self.consume_if(TokenKind::Keyword(View))?).is_some() {
return self.parse_transactional_view(token);
}
unimplemented!()
}
if (self.consume_if(TokenKind::Keyword(Table))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Table);
}
return self.parse_table(token);
}
if (self.consume_if(TokenKind::Keyword(Ringbuffer))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::RingBuffer);
}
return self.parse_ringbuffer(token);
}
if (self.consume_if(TokenKind::Keyword(Dictionary))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Dictionary);
}
return self.parse_dictionary(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Enum))?).is_some() {
return self.parse_enum(token);
}
if (self.consume_if(TokenKind::Keyword(Series))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Series);
}
return self.parse_series(token);
}
if (self.consume_if(TokenKind::Keyword(Subscription))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Subscription);
}
return self.parse_subscription(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Primary))?).is_some() {
self.consume_keyword(Keyword::Key)?;
return self.parse_create_primary_key(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Column))?).is_some() {
self.consume_keyword(Keyword::Property)?;
return self.parse_create_column_property(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Procedure))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Policy))?).is_some() {
return self.parse_create_policy(token, AstPolicyTargetType::Procedure);
}
return self.parse_procedure(token);
}
if (self.consume_if(TokenKind::Keyword(Test))?).is_some() {
if (self.consume_if(TokenKind::Keyword(Keyword::Procedure))?).is_some() {
return self.parse_test_procedure(token);
}
return self.parse_test(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Event))?).is_some() {
return self.parse_event(token);
}
if (self.consume_if(TokenKind::Keyword(Tag))?).is_some() {
return self.parse_tag(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Handler))?).is_some() {
return self.parse_handler(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Authentication))?).is_some() {
return self.parse_create_authentication(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::User))?).is_some() {
return self.parse_create_identity(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Role))?).is_some() {
return self.parse_create_role(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Session))?).is_some() {
self.consume_keyword(Keyword::Policy)?;
return self.parse_create_policy(token, AstPolicyTargetType::Session);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Feature))?).is_some() {
self.consume_keyword(Keyword::Policy)?;
return self.parse_create_policy(token, AstPolicyTargetType::Feature);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Function))?).is_some() {
self.consume_keyword(Keyword::Policy)?;
return self.parse_create_policy(token, AstPolicyTargetType::Function);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Migration))?).is_some() {
return self.parse_migration(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Source))?).is_some() {
return self.parse_source(token);
}
if (self.consume_if(TokenKind::Keyword(Keyword::Sink))?).is_some() {
return self.parse_sink(token);
}
if self.peek_is_index_creation()? {
return self.parse_create_index(token);
}
unimplemented!();
}
fn parse_procedure(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let proc_ident = MaybeQualifiedProcedureIdentifier::new(name).with_namespace(namespace);
let params = if self.current()?.is_operator(Operator::OpenCurly) {
self.parse_procedure_params()?
} else {
Vec::new()
};
self.consume_operator(Operator::As)?;
self.consume_operator(Operator::OpenCurly)?;
let body_start_pos = self.position;
let mut body = Vec::new();
loop {
self.skip_new_line()?;
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
body.push(node);
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
self.consume_if(TokenKind::Separator(Separator::Semicolon))?;
}
let body_end_pos = self.position;
let body_source = if body_start_pos < body_end_pos {
let start = self.tokens[body_start_pos].fragment.offset();
let end = self.tokens[body_end_pos - 1].fragment.source_end();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Procedure(AstCreateProcedure {
token,
name: proc_ident,
params,
body,
body_source,
is_test: false,
}))
}
fn parse_test_procedure(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let proc_ident = MaybeQualifiedProcedureIdentifier::new(name).with_namespace(namespace);
let params = if self.current()?.is_operator(Operator::OpenCurly) {
self.parse_procedure_params()?
} else {
Vec::new()
};
self.consume_operator(Operator::As)?;
self.consume_operator(Operator::OpenCurly)?;
let body_start_pos = self.position;
let mut body = Vec::new();
loop {
self.skip_new_line()?;
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
body.push(node);
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
self.consume_if(TokenKind::Separator(Separator::Semicolon))?;
}
let body_end_pos = self.position;
let body_source = if body_start_pos < body_end_pos {
let start = self.tokens[body_start_pos].fragment.offset();
let end = self.tokens[body_end_pos - 1].fragment.source_end();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Procedure(AstCreateProcedure {
token,
name: proc_ident,
params,
body,
body_source,
is_test: true,
}))
}
fn parse_test(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let test_ident = MaybeQualifiedTestIdentifier::new(name).with_namespace(namespace);
let cases = if !self.is_eof() && self.current()?.kind == TokenKind::Operator(Operator::OpenBracket) {
let params_start_pos = self.position;
let mut depth = 0u32;
loop {
let tok = &self.tokens[self.position];
match tok.kind {
TokenKind::Operator(Operator::OpenBracket) => depth += 1,
TokenKind::Operator(Operator::CloseBracket) => {
depth -= 1;
if depth == 0 {
self.position += 1; break;
}
}
_ => {}
}
self.position += 1;
}
let params_end_pos = self.position;
let start = self.tokens[params_start_pos].fragment.offset();
let end = self.tokens[params_end_pos - 1].fragment.offset()
+ self.tokens[params_end_pos - 1].fragment.text().len();
Some(self.source[start..end].to_string())
} else {
None
};
self.consume_operator(Operator::OpenCurly)?;
let body_start_pos = self.position;
let mut body = Vec::new();
loop {
self.skip_new_line()?;
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
body.push(node);
if !self.is_eof() && self.current()?.is_operator(Operator::Pipe) {
self.advance()?;
continue;
}
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
self.consume_if(TokenKind::Separator(Separator::Semicolon))?;
}
let body_end_pos = self.position;
let body_source = if body_start_pos < body_end_pos {
let start = self.tokens[body_start_pos].fragment.offset();
let end = self.tokens[body_end_pos - 1].fragment.source_end();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Test(AstCreateTest {
token,
name: test_ident,
cases,
body,
body_source,
}))
}
fn parse_procedure_params(&mut self) -> Result<Vec<AstProcedureParam<'bump>>> {
let mut params = Vec::new();
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let name = self.parse_identifier_with_hyphens()?.into_fragment();
self.consume_operator(Colon)?;
let param_type = self.parse_type()?;
params.push(AstProcedureParam {
name,
param_type,
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(params)
}
fn parse_namespace(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut if_not_exists = if (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
true
} else {
false
};
let segments = self.parse_double_colon_separated_identifiers()?;
if !if_not_exists && (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
if_not_exists = true;
}
let namespace = MaybeQualifiedNamespaceIdentifier::new(
segments.into_iter().map(|s| s.into_fragment()).collect(),
);
Ok(AstCreate::Namespace(AstCreateNamespace {
token,
namespace,
if_not_exists,
}))
}
fn parse_remote_namespace(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut if_not_exists = if (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
true
} else {
false
};
let segments = self.parse_double_colon_separated_identifiers()?;
if !if_not_exists && (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
if_not_exists = true;
}
let namespace = MaybeQualifiedNamespaceIdentifier::new(
segments.into_iter().map(|s| s.into_fragment()).collect(),
);
self.consume_keyword(Keyword::With)?;
self.consume_operator(Operator::OpenCurly)?;
let mut grpc = None;
let mut remote_token = None;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = self.consume(TokenKind::Identifier)?;
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"grpc" => {
let value = self.consume_literal(Literal::Text)?;
grpc = Some(value.fragment);
}
"token" => {
let value = self.consume_literal(Literal::Text)?;
remote_token = Some(value.fragment);
}
_other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'grpc' or 'token'".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'grpc' or 'token'",
fragment.text()
),
fragment,
}));
}
}
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
let grpc = grpc.ok_or_else(|| {
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'grpc' key in WITH block".to_string(),
},
message: "CREATE REMOTE NAMESPACE requires 'grpc' in WITH block".to_string(),
fragment: token.fragment.to_owned(),
})
})?;
Ok(AstCreate::RemoteNamespace(AstCreateRemoteNamespace {
token,
namespace,
if_not_exists,
grpc,
token_value: remote_token,
}))
}
fn parse_series(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let series = MaybeQualifiedSeriesIdentifier::new(name).with_namespace(namespace);
let mut tag = None;
let mut key_field = None;
let mut precision = None;
let mut ttl = None;
if self.consume_if(TokenKind::Keyword(Keyword::With))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let with_key = {
let current = self.current()?;
match current.kind {
TokenKind::Identifier => self.consume(TokenKind::Identifier)?,
TokenKind::Keyword(Keyword::Tag) => {
let token = self.advance()?;
Token {
kind: TokenKind::Identifier,
..token
}
}
_ => {
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'key', 'tag', 'precision', or 'ttl'"
.to_string(),
},
message: format!(
"expected 'key', 'tag', 'precision', or 'ttl', found `{}`",
current.fragment.text()
),
fragment: current.fragment.to_owned(),
}));
}
}
};
self.consume_operator(Operator::Colon)?;
match with_key.fragment.text() {
"key" => {
let key_token = self.consume(TokenKind::Identifier)?;
key_field = Some(key_token.fragment);
}
"tag" => {
let mut tag_segments =
self.parse_double_colon_separated_identifiers()?;
let tag_name = tag_segments.pop().unwrap().into_fragment();
let tag_namespace: Vec<_> =
tag_segments.into_iter().map(|s| s.into_fragment()).collect();
tag = Some(MaybeQualifiedSumTypeIdentifier::new(tag_name)
.with_namespace(tag_namespace));
}
"precision" => {
let prec_token = self.consume(TokenKind::Identifier)?;
precision = Some(match prec_token.fragment.text() {
"second" => AstTimestampPrecision::Second,
"millisecond" => AstTimestampPrecision::Millisecond,
"microsecond" => AstTimestampPrecision::Microsecond,
"nanosecond" => AstTimestampPrecision::Nanosecond,
_ => {
let fragment = prec_token.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'second', 'millisecond', 'microsecond', or 'nanosecond'"
.to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'second', 'millisecond', 'microsecond', or 'nanosecond'",
fragment.text()
),
fragment,
}));
}
});
}
"ttl" => {
ttl = Some(self.parse_row_ttl()?);
}
_other => {
let fragment = with_key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'key', 'tag', 'precision', or 'ttl'"
.to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'key', 'tag', 'precision', or 'ttl'",
fragment.text()
),
fragment,
}));
}
}
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
}
let key_fragment = match key_field {
Some(k) => Some(k),
None => {
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "WITH block containing 'key' field".to_string(),
},
message: "CREATE SERIES requires a WITH block with a 'key' field specifying the ordering column".to_string(),
fragment: token.fragment.to_owned(),
}));
}
};
Ok(AstCreate::Series(AstCreateSeries {
token,
series,
columns,
tag,
key: key_fragment,
precision,
ttl,
}))
}
fn parse_subscription(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let columns = if self.current()?.is_operator(Operator::As) {
Vec::new()
} else if self.current()?.is_operator(Operator::OpenCurly) {
self.parse_columns()?
} else {
let fragment = self.current()?.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'{' or 'AS'".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'{' or 'AS'",
fragment.text()
),
fragment,
}));
};
let as_clause = if self.consume_if(TokenKind::Operator(Operator::As))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
let mut query_nodes = Vec::new();
let mut has_pipes = false;
loop {
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
query_nodes.push(node);
if !self.is_eof() && self.current()?.is_operator(Operator::Pipe) {
self.advance()?; has_pipes = true;
} else {
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
}
}
self.consume_operator(Operator::CloseCurly)?;
Some(AstStatement {
nodes: query_nodes,
has_pipes,
is_output: false,
})
} else {
None
};
if columns.is_empty() && as_clause.is_none() {
let fragment = self
.current()
.ok()
.map(|t| t.fragment.to_owned())
.unwrap_or_else(|| Fragment::internal("end of input"));
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "AS clause (shape-less CREATE SUBSCRIPTION requires AS clause)"
.to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"AS clause (shape-less CREATE SUBSCRIPTION requires AS clause)",
fragment.text()
),
fragment,
}));
}
Ok(AstCreate::Subscription(AstCreateSubscription {
token,
columns,
as_clause,
}))
}
fn parse_deferred_view(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let view = MaybeQualifiedDeferredViewIdentifier::new(name).with_namespace(namespace);
let tick = if !self.is_eof() && self.current()?.is_keyword(Keyword::With) {
self.advance()?;
self.parse_view_tick_with_clause()?
} else {
None
};
let as_clause = if self.consume_if(TokenKind::Operator(Operator::As))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
let mut query_nodes = Vec::new();
let mut has_pipes = false;
loop {
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
query_nodes.push(node);
if !self.is_eof() && self.current()?.is_operator(Operator::Pipe) {
self.advance()?; has_pipes = true;
} else {
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
}
}
self.consume_operator(Operator::CloseCurly)?;
Some(AstStatement {
nodes: query_nodes,
has_pipes,
is_output: false,
})
} else {
None
};
Ok(AstCreate::DeferredView(AstCreateDeferredView {
token,
view,
columns,
as_clause,
storage_kind: AstViewStorageKind::Table,
tick,
}))
}
fn parse_deferred_view_with_storage(
&mut self,
token: Token<'bump>,
hint: ViewStorageKindHint,
) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let view = MaybeQualifiedDeferredViewIdentifier::new(name).with_namespace(namespace);
let (storage_kind, tick) = self.parse_view_storage_with_clause(hint)?;
let as_clause = self.parse_view_as_clause()?;
Ok(AstCreate::DeferredView(AstCreateDeferredView {
token,
view,
columns,
as_clause,
storage_kind,
tick,
}))
}
fn parse_transactional_view(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let view = MaybeQualifiedTransactionalViewIdentifier::new(name).with_namespace(namespace);
let tick = if !self.is_eof() && self.current()?.is_keyword(Keyword::With) {
self.advance()?;
self.parse_view_tick_with_clause()?
} else {
None
};
let as_clause = if self.consume_if(TokenKind::Operator(Operator::As))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
let mut query_nodes = Vec::new();
let mut has_pipes = false;
loop {
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
query_nodes.push(node);
if !self.is_eof() && self.current()?.is_operator(Operator::Pipe) {
self.advance()?; has_pipes = true;
} else {
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
}
}
self.consume_operator(Operator::CloseCurly)?;
Some(AstStatement {
nodes: query_nodes,
has_pipes,
is_output: false,
})
} else {
None
};
Ok(AstCreate::TransactionalView(AstCreateTransactionalView {
token,
view,
columns,
as_clause,
storage_kind: AstViewStorageKind::Table,
tick,
}))
}
fn parse_transactional_view_with_storage(
&mut self,
token: Token<'bump>,
hint: ViewStorageKindHint,
) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let view = MaybeQualifiedTransactionalViewIdentifier::new(name).with_namespace(namespace);
let (storage_kind, tick) = self.parse_view_storage_with_clause(hint)?;
let as_clause = self.parse_view_as_clause()?;
Ok(AstCreate::TransactionalView(AstCreateTransactionalView {
token,
view,
columns,
as_clause,
storage_kind,
tick,
}))
}
fn parse_table(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let if_not_exists = if (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
true
} else {
false
};
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
let table = MaybeQualifiedTableIdentifier::new(name).with_namespace(namespace);
let mut ttl = None;
if self.consume_if(TokenKind::Keyword(Keyword::With))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = self.consume(TokenKind::Identifier)?;
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"ttl" => {
ttl = Some(self.parse_row_ttl()?);
}
_other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'ttl'".to_string(),
},
message: format!("expected 'ttl', found `{}`", fragment.text()),
fragment,
}));
}
}
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
}
Ok(AstCreate::Table(AstCreateTable {
token,
table,
if_not_exists,
columns,
ttl,
}))
}
fn parse_ringbuffer(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let columns = self.parse_columns()?;
self.consume_keyword(Keyword::With)?;
self.consume_operator(Operator::OpenCurly)?;
let mut capacity: Option<u64> = None;
let mut partition_by: Vec<String> = Vec::new();
let mut ttl = None;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = {
let current = self.current()?;
match current.kind {
TokenKind::Identifier => self.consume(TokenKind::Identifier)?,
TokenKind::Keyword(Keyword::Tag) => {
let token = self.advance()?;
Token {
kind: TokenKind::Identifier,
..token
}
}
_ => {
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'capacity', 'partition_by', or 'ttl'"
.to_string(),
},
message: format!(
"expected 'capacity', 'partition_by', or 'ttl', found `{}`",
current.fragment.text()
),
fragment: current.fragment.to_owned(),
}));
}
}
};
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"capacity" => {
let capacity_token = self.consume(TokenKind::Literal(Literal::Number))?;
capacity =
Some(capacity_token.fragment.text().parse::<u64>().map_err(|_| {
let fragment = capacity_token.fragment.to_owned();
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "valid capacity number".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"valid capacity number",
fragment.text()
),
fragment,
})
})?);
}
"partition_by" => {
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let col = self.consume(TokenKind::Identifier)?;
partition_by.push(col.fragment.text().to_string());
if self.consume_if(TokenKind::Separator(Comma))?.is_none() {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
}
"ttl" => {
ttl = Some(self.parse_row_ttl()?);
}
_other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'capacity', 'partition_by', or 'ttl'".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'capacity', 'partition_by', or 'ttl'",
fragment.text()
),
fragment,
}));
}
}
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
let capacity = capacity.ok_or_else(|| {
let fragment = self
.current()
.ok()
.map(|t| t.fragment.to_owned())
.unwrap_or_else(|| Fragment::internal("end of input"));
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'capacity' is required for RINGBUFFER".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"'capacity' is required for RINGBUFFER",
fragment.text()
),
fragment,
})
})?;
let ringbuffer = MaybeQualifiedRingBufferIdentifier::new(name).with_namespace(namespace);
Ok(AstCreate::RingBuffer(AstCreateRingBuffer {
token,
ringbuffer,
columns,
capacity,
partition_by,
ttl,
}))
}
fn parse_primary_keyinition(&mut self) -> Result<AstPrimaryKey<'bump>> {
let mut columns = Vec::new();
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let column = self.parse_column_identifier()?;
let sort_direction = if self.current()?.is_operator(Operator::Colon) {
self.consume_operator(Operator::Colon)?;
if self.current()?.is_keyword(Keyword::Asc) {
self.consume_keyword(Keyword::Asc)?;
SortDirection::Asc
} else if self.current()?.is_keyword(Keyword::Desc) {
self.consume_keyword(Keyword::Desc)?;
SortDirection::Desc
} else {
SortDirection::Desc
}
} else {
SortDirection::Desc
};
columns.push(AstIndexColumn {
column,
order: Some(sort_direction),
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Separator::Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
if columns.is_empty() {
let fragment = self.current()?.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "at least one column in primary key".to_string(),
},
message: format!(
"Unexpected token: expected {}, got {}",
"at least one column in primary key",
fragment.text()
),
fragment,
}));
}
Ok(AstPrimaryKey {
columns,
})
}
fn parse_create_primary_key(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
self.consume_keyword(Keyword::On)?;
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let table = MaybeQualifiedTableIdentifier::new(name).with_namespace(namespace);
let pk_def = self.parse_primary_keyinition()?;
Ok(AstCreate::PrimaryKey(AstCreatePrimaryKey {
token,
table,
columns: pk_def.columns,
}))
}
fn parse_create_column_property(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
self.consume_keyword(Keyword::On)?;
let column = self.parse_column_identifier()?;
self.consume_operator(Operator::OpenCurly)?;
let mut properties = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let kind_token = self.consume(TokenKind::Identifier)?;
let kind = match kind_token.fragment.text() {
"saturation" => AstColumnPropertyKind::Saturation,
"default" => AstColumnPropertyKind::Default,
_ => {
let fragment = kind_token.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::InvalidPolicy,
message: format!("Invalid property token: {}", fragment.text()),
fragment,
}));
}
};
self.consume_operator(Operator::Colon)?;
let value = BumpBox::new_in(self.parse_node(Precedence::None)?, self.bump());
properties.push(AstColumnPropertyEntry {
kind,
value,
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::ColumnProperty(AstCreateColumnProperty {
token,
column,
properties,
}))
}
fn parse_dictionary(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let if_not_exists = if (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
true
} else {
false
};
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let dictionary = if namespace.is_empty() {
MaybeQualifiedDictionaryIdentifier::new(name)
} else {
MaybeQualifiedDictionaryIdentifier::new(name).with_namespace(namespace)
};
self.consume_keyword(For)?;
let value_type = self.parse_type()?;
self.consume_operator(Operator::As)?;
let id_type = self.parse_type()?;
Ok(AstCreate::Dictionary(AstCreateDictionary {
token,
if_not_exists,
dictionary,
value_type,
id_type,
}))
}
fn parse_enum(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let if_not_exists = if (self.consume_if(TokenKind::Keyword(If))?).is_some() {
self.consume_operator(Not)?;
self.consume_keyword(Exists)?;
true
} else {
false
};
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name_frag = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let sumtype_ident = if namespace.is_empty() {
MaybeQualifiedSumTypeIdentifier::new(name_frag)
} else {
MaybeQualifiedSumTypeIdentifier::new(name_frag).with_namespace(namespace)
};
self.consume_operator(Operator::OpenCurly)?;
let mut variants = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let variant_name = self.parse_identifier_with_hyphens()?.into_fragment();
let columns = if !self.is_eof() && self.current()?.is_operator(Operator::OpenCurly) {
self.parse_columns()?
} else {
Vec::new()
};
variants.push(AstVariant {
name: variant_name,
columns,
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_none() {
self.skip_new_line()?;
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Enum(AstCreateSumType {
token,
if_not_exists,
name: sumtype_ident,
variants,
}))
}
fn parse_type(&mut self) -> Result<AstType<'bump>> {
let ty_token = self.consume(TokenKind::Identifier)?;
if ty_token.fragment.text().eq_ignore_ascii_case("option") {
self.consume_operator(Operator::OpenParen)?;
let inner = self.parse_type()?;
self.consume_operator(Operator::CloseParen)?;
return Ok(AstType::Optional(Box::new(inner)));
}
if !self.is_eof() && self.current()?.is_operator(Operator::DoubleColon) {
self.consume_operator(Operator::DoubleColon)?;
let name_token = self.consume(TokenKind::Identifier)?;
return Ok(AstType::Qualified {
namespace: ty_token.fragment,
name: name_token.fragment,
});
}
if !self.is_eof() && self.current()?.is_operator(Operator::OpenParen) {
self.consume_operator(Operator::OpenParen)?;
let mut params = Vec::new();
params.push(self.parse_literal_number()?);
while self.consume_if(TokenKind::Separator(Comma))?.is_some() {
params.push(self.parse_literal_number()?);
}
self.consume_operator(Operator::CloseParen)?;
Ok(AstType::Constrained {
name: ty_token.fragment,
params,
})
} else {
Ok(AstType::Unconstrained(ty_token.fragment))
}
}
fn parse_columns(&mut self) -> Result<Vec<AstColumnToCreate<'bump>>> {
let mut result = Vec::new();
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
result.push(self.parse_column()?);
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
if self.consume_if(TokenKind::Separator(Comma))?.is_none() {
break;
};
}
self.consume_operator(Operator::CloseCurly)?;
Ok(result)
}
pub(crate) fn parse_column(&mut self) -> Result<AstColumnToCreate<'bump>> {
let name_identifier = self.parse_identifier_with_hyphens()?;
self.consume_operator(Colon)?;
let ty_token = self.consume(TokenKind::Identifier)?;
let name = name_identifier.into_fragment();
let ty = if ty_token.fragment.text().eq_ignore_ascii_case("option") {
self.consume_operator(Operator::OpenParen)?;
let inner = self.parse_type()?;
self.consume_operator(Operator::CloseParen)?;
AstType::Optional(Box::new(inner))
} else if !self.is_eof() && self.current()?.is_operator(Operator::DoubleColon) {
self.consume_operator(Operator::DoubleColon)?;
let name_token = self.consume(TokenKind::Identifier)?;
AstType::Qualified {
namespace: ty_token.fragment,
name: name_token.fragment,
}
} else if !self.is_eof() && self.current()?.is_operator(Operator::OpenParen) {
self.consume_operator(Operator::OpenParen)?;
let mut params = Vec::new();
params.push(self.parse_literal_number()?);
while self.consume_if(TokenKind::Separator(Comma))?.is_some() {
params.push(self.parse_literal_number()?);
}
self.consume_operator(Operator::CloseParen)?;
AstType::Constrained {
name: ty_token.fragment,
params,
}
} else {
AstType::Unconstrained(ty_token.fragment)
};
let properties = if !self.is_eof() && self.current()?.is_keyword(Keyword::With) {
self.parse_column_properties()?
} else {
vec![]
};
Ok(AstColumnToCreate {
name,
ty,
properties,
})
}
fn parse_column_properties(&mut self) -> Result<Vec<AstColumnProperty<'bump>>> {
self.consume_keyword(Keyword::With)?;
self.consume_operator(Operator::OpenCurly)?;
let mut properties = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key_token = {
let current = self.current()?;
match current.kind {
TokenKind::Identifier => self.consume(TokenKind::Identifier)?,
TokenKind::Keyword(Keyword::Dictionary) => {
let token = self.advance()?;
Token {
kind: TokenKind::Identifier,
..token
}
}
_ => {
let fragment = current.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::InvalidColumnProperty,
message: format!(
"Invalid column property: {}",
fragment.text()
),
fragment,
}));
}
}
};
let key = key_token.fragment.text();
let property = match key {
"auto_increment" => AstColumnProperty::AutoIncrement,
"dictionary" => {
self.consume_operator(Colon)?;
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> =
segments.into_iter().map(|s| s.into_fragment()).collect();
let dict_ident = if namespace.is_empty() {
MaybeQualifiedDictionaryIdentifier::new(name)
} else {
MaybeQualifiedDictionaryIdentifier::new(name).with_namespace(namespace)
};
AstColumnProperty::Dictionary(dict_ident)
}
"saturation" => {
self.consume_operator(Colon)?;
let value = BumpBox::new_in(self.parse_node(Precedence::None)?, self.bump());
AstColumnProperty::Saturation(value)
}
"default" => {
self.consume_operator(Colon)?;
let value = BumpBox::new_in(self.parse_node(Precedence::None)?, self.bump());
AstColumnProperty::Default(value)
}
_ => {
let fragment = key_token.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::InvalidColumnProperty,
message: format!("Invalid column property: {}", fragment.text()),
fragment,
}));
}
};
properties.push(property);
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(properties)
}
pub(crate) fn parse_event(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name_frag = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let sumtype_ident = if namespace.is_empty() {
MaybeQualifiedSumTypeIdentifier::new(name_frag)
} else {
MaybeQualifiedSumTypeIdentifier::new(name_frag).with_namespace(namespace)
};
self.consume_operator(Operator::OpenCurly)?;
let mut variants = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let variant_name = self.parse_identifier_with_hyphens()?.into_fragment();
let columns = if !self.is_eof() && self.current()?.is_operator(Operator::OpenCurly) {
self.parse_columns()?
} else {
Vec::new()
};
variants.push(AstVariant {
name: variant_name,
columns,
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_none() {
self.skip_new_line()?;
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Event(AstCreateEvent {
token,
name: sumtype_ident,
variants,
}))
}
pub(crate) fn parse_tag(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name_frag = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let sumtype_ident = if namespace.is_empty() {
MaybeQualifiedSumTypeIdentifier::new(name_frag)
} else {
MaybeQualifiedSumTypeIdentifier::new(name_frag).with_namespace(namespace)
};
self.consume_operator(Operator::OpenCurly)?;
let mut variants = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let variant_name = self.parse_identifier_with_hyphens()?.into_fragment();
let columns = if !self.is_eof() && self.current()?.is_operator(Operator::OpenCurly) {
self.parse_columns()?
} else {
Vec::new()
};
variants.push(AstVariant {
name: variant_name,
columns,
});
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_none() {
self.skip_new_line()?;
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Tag(AstCreateTag {
token,
name: sumtype_ident,
variants,
}))
}
pub(crate) fn parse_handler(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let mut segments = self.parse_double_colon_separated_identifiers()?;
let name_frag = segments.pop().unwrap().into_fragment();
let namespace: Vec<_> = segments.into_iter().map(|s| s.into_fragment()).collect();
let handler_name = MaybeQualifiedTableIdentifier::new(name_frag).with_namespace(namespace);
self.consume_keyword(Keyword::On)?;
let mut event_segments = self.parse_double_colon_separated_identifiers()?;
let on_variant = event_segments.pop().unwrap().into_fragment();
let event_name_frag = event_segments.pop().unwrap().into_fragment();
let event_namespace: Vec<_> = event_segments.into_iter().map(|s| s.into_fragment()).collect();
let on_event = if event_namespace.is_empty() {
MaybeQualifiedSumTypeIdentifier::new(event_name_frag)
} else {
MaybeQualifiedSumTypeIdentifier::new(event_name_frag).with_namespace(event_namespace)
};
self.consume_operator(Operator::OpenCurly)?;
let body_start_pos = self.position;
let mut body = Vec::new();
loop {
self.skip_new_line()?;
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
body.push(node);
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
self.consume_if(TokenKind::Separator(Separator::Semicolon))?;
}
let body_end_pos = self.position;
let body_source = if body_start_pos < body_end_pos {
let start = self.tokens[body_start_pos].fragment.offset();
let end = self.tokens[body_end_pos - 1].fragment.source_end();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
Ok(AstCreate::Handler(AstCreateHandler {
token,
name: handler_name,
on_event,
on_variant,
body,
body_source,
}))
}
fn parse_migration(&mut self, token: Token<'bump>) -> Result<AstCreate<'bump>> {
let name = match &self.current()?.kind {
TokenKind::Literal(Literal::Text) => {
let text = self.current()?.fragment.text().to_string();
self.advance()?;
text
}
_ => {
let fragment = self.current()?.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "migration name as string literal".to_string(),
},
message: format!(
"Expected migration name as string literal, got {}",
fragment.text()
),
fragment,
}));
}
};
self.consume_operator(Operator::OpenCurly)?;
let body_start_pos = self.position;
let mut depth = 1u32;
while depth > 0 {
if self.is_eof() {
let fragment = self.tokens[body_start_pos].fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "closing '}'".to_string(),
},
message: "Unexpected end of input while parsing migration body".to_string(),
fragment,
}));
}
match self.current()?.kind {
TokenKind::Operator(Operator::OpenCurly) => {
depth += 1;
self.advance()?;
}
TokenKind::Operator(Operator::CloseCurly) => {
depth -= 1;
if depth > 0 {
self.advance()?;
}
}
_ => {
self.advance()?;
}
}
}
let body_end_pos = self.position;
let body_source = if body_start_pos < body_end_pos {
let start = self.tokens[body_start_pos].fragment.offset();
let end = self.tokens[body_end_pos - 1].fragment.source_end();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
let rollback_body_source = if (self.consume_if(TokenKind::Keyword(Keyword::Rollback))?).is_some() {
self.consume_operator(Operator::OpenCurly)?;
let rb_start_pos = self.position;
let mut depth = 1u32;
while depth > 0 {
if self.is_eof() {
let fragment = self.tokens[rb_start_pos].fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "closing '}'".to_string(),
},
message: "Unexpected end of input while parsing rollback body"
.to_string(),
fragment,
}));
}
match self.current()?.kind {
TokenKind::Operator(Operator::OpenCurly) => {
depth += 1;
self.advance()?;
}
TokenKind::Operator(Operator::CloseCurly) => {
depth -= 1;
if depth > 0 {
self.advance()?;
}
}
_ => {
self.advance()?;
}
}
}
let rb_end_pos = self.position;
let rb_source = if rb_start_pos < rb_end_pos {
let start = self.tokens[rb_start_pos].fragment.offset();
let end = self.tokens[rb_end_pos - 1].fragment.offset()
+ self.tokens[rb_end_pos - 1].fragment.text().len();
self.source[start..end].trim().to_string()
} else {
String::new()
};
self.consume_operator(Operator::CloseCurly)?;
Some(rb_source)
} else {
None
};
Ok(AstCreate::Migration(AstCreateMigration {
token,
name,
body_source,
rollback_body_source,
}))
}
fn parse_view_as_clause(&mut self) -> Result<Option<AstStatement<'bump>>> {
if self.consume_if(TokenKind::Operator(Operator::As))?.is_some() {
self.consume_operator(Operator::OpenCurly)?;
let mut query_nodes = Vec::new();
let mut has_pipes = false;
loop {
if self.is_eof() || self.current()?.kind == TokenKind::Operator(Operator::CloseCurly) {
break;
}
let node = self.parse_node(Precedence::None)?;
query_nodes.push(node);
if !self.is_eof() && self.current()?.is_operator(Operator::Pipe) {
self.advance()?;
has_pipes = true;
} else {
self.consume_if(TokenKind::Separator(Separator::NewLine))?;
}
}
self.consume_operator(Operator::CloseCurly)?;
Ok(Some(AstStatement {
nodes: query_nodes,
has_pipes,
is_output: false,
}))
} else {
Ok(None)
}
}
fn parse_view_storage_with_clause(
&mut self,
hint: ViewStorageKindHint,
) -> Result<(AstViewStorageKind, Option<Duration>)> {
self.consume_keyword(Keyword::With)?;
self.consume_operator(Operator::OpenCurly)?;
let mut tick: Option<Duration> = None;
match hint {
ViewStorageKindHint::RingBuffer => {
let mut capacity: Option<u64> = None;
let mut propagate_evictions: Option<bool> = None;
let mut partition_by: Vec<String> = Vec::new();
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = self.consume(TokenKind::Identifier)?;
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"capacity" => {
let token =
self.consume(TokenKind::Literal(Literal::Number))?;
capacity =
Some(token.fragment.text().parse::<u64>().map_err(
|_| {
let fragment =
token.fragment.to_owned();
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "valid capacity number".to_string(),
},
message: format!("expected valid capacity number, got {}", fragment.text()),
fragment,
})
},
)?);
}
"propagate_evictions" => {
let token = self.consume(TokenKind::Identifier)?;
propagate_evictions =
Some(match token.fragment.text() {
"true" => true,
"false" => false,
_ => {
let fragment =
token.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "true or false".to_string(),
},
message: format!("expected true or false, got {}", fragment.text()),
fragment,
}));
}
});
}
"partition_by" => {
self.consume_operator(Operator::OpenCurly)?;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let col = self.consume(TokenKind::Identifier)?;
partition_by.push(col.fragment.text().to_string());
if self.consume_if(TokenKind::Separator(Comma))?
.is_none()
{
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
}
"tick" => {
tick = Some(self.parse_tick_duration()?);
}
other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'capacity', 'propagate_evictions', 'partition_by', or 'tick'"
.to_string(),
},
message: format!(
"unexpected key '{}' in WITH clause",
other
),
fragment,
}));
}
}
self.consume_if(TokenKind::Separator(Comma))?;
}
self.consume_operator(Operator::CloseCurly)?;
let capacity = capacity.ok_or_else(|| {
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "capacity".to_string(),
},
message: "ringbuffer view requires 'capacity' in WITH clause"
.to_string(),
fragment: Fragment::internal("".to_string()),
})
})?;
Ok((
AstViewStorageKind::RingBuffer {
capacity,
propagate_evictions,
partition_by,
},
tick,
))
}
ViewStorageKindHint::Series => {
let mut key_column: Option<String> = None;
let mut precision: Option<AstTimestampPrecision> = None;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = self.consume(TokenKind::Identifier)?;
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"key" => {
let token = self.consume(TokenKind::Identifier)?;
key_column = Some(token.fragment.text().to_string());
}
"precision" => {
let token = self.consume(TokenKind::Identifier)?;
precision = Some(match token.fragment.text() {
"second" => AstTimestampPrecision::Second,
"millisecond" => AstTimestampPrecision::Millisecond,
"microsecond" => AstTimestampPrecision::Microsecond,
"nanosecond" => AstTimestampPrecision::Nanosecond,
_ => {
let fragment = token.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "second, millisecond, microsecond, or nanosecond".to_string(),
},
message: format!("unexpected precision '{}'", fragment.text()),
fragment,
}));
}
});
}
"tick" => {
tick = Some(self.parse_tick_duration()?);
}
other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'key', 'precision', or 'tick'"
.to_string(),
},
message: format!(
"unexpected key '{}' in WITH clause",
other
),
fragment,
}));
}
}
self.consume_if(TokenKind::Separator(Comma))?;
}
self.consume_operator(Operator::CloseCurly)?;
let key_column = key_column.unwrap_or_default();
Ok((
AstViewStorageKind::Series {
key_column,
precision,
},
tick,
))
}
}
}
fn parse_view_tick_with_clause(&mut self) -> Result<Option<Duration>> {
self.consume_operator(Operator::OpenCurly)?;
let mut tick: Option<Duration> = None;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = self.consume(TokenKind::Identifier)?;
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"tick" => {
tick = Some(self.parse_tick_duration()?);
}
other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'tick'".to_string(),
},
message: format!("unexpected key '{}' in WITH clause", other),
fragment,
}));
}
}
self.consume_if(TokenKind::Separator(Comma))?;
}
self.consume_operator(Operator::CloseCurly)?;
Ok(tick)
}
fn parse_tick_duration(&mut self) -> Result<Duration> {
let token = self.consume(TokenKind::Literal(Literal::Text))?;
let duration_str = token.fragment.text();
Compiler::parse_duration(duration_str)
}
fn parse_row_ttl(&mut self) -> Result<AstRowTtl<'bump>> {
self.consume_operator(Operator::OpenCurly)?;
let mut duration = None;
let mut anchor = None;
let mut mode = None;
loop {
self.skip_new_line()?;
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
let key = {
let current = self.current()?;
match current.kind {
TokenKind::Identifier => self.consume(TokenKind::Identifier)?,
TokenKind::Keyword(Keyword::On) => {
let token = self.advance()?;
Token {
kind: TokenKind::Identifier,
..token
}
}
_ => {
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'duration', 'on', or 'mode'".to_string(),
},
message: format!(
"expected 'duration', 'on', or 'mode', found `{}`",
current.fragment.text()
),
fragment: current.fragment.to_owned(),
}));
}
}
};
self.consume_operator(Operator::Colon)?;
match key.fragment.text() {
"duration" => {
let token = self.consume(TokenKind::Literal(Literal::Text))?;
duration = Some(token);
}
"on" => {
let token = self.consume(TokenKind::Identifier)?;
anchor = Some(token);
}
"mode" => {
let current = self.current()?;
let token = match current.kind {
TokenKind::Identifier => self.consume(TokenKind::Identifier)?,
TokenKind::Keyword(Keyword::Delete)
| TokenKind::Keyword(Keyword::Drop) => {
let token = self.advance()?;
Token {
kind: TokenKind::Identifier,
..token
}
}
_ => {
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'delete' or 'drop'".to_string(),
},
message: format!(
"expected 'delete' or 'drop', found `{}`",
current.fragment.text()
),
fragment: current.fragment.to_owned(),
}));
}
};
mode = Some(token);
}
_other => {
let fragment = key.fragment.to_owned();
return Err(Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'duration', 'on', or 'mode'".to_string(),
},
message: format!(
"expected 'duration', 'on', or 'mode', found `{}`",
fragment.text()
),
fragment,
}));
}
}
self.skip_new_line()?;
if self.consume_if(TokenKind::Separator(Comma))?.is_some() {
continue;
}
if self.current()?.is_operator(Operator::CloseCurly) {
break;
}
}
self.consume_operator(Operator::CloseCurly)?;
let duration = duration.ok_or_else(|| {
let fragment = self
.current()
.ok()
.map(|t| t.fragment.to_owned())
.unwrap_or_else(|| Fragment::internal("end of input"));
Error::from(TypeError::Ast {
kind: AstErrorKind::UnexpectedToken {
expected: "'duration' is required in ttl config".to_string(),
},
message: "'duration' is required in ttl config".to_string(),
fragment,
})
})?;
Ok(AstRowTtl {
duration,
anchor,
mode,
})
}
}
enum ViewStorageKindHint {
RingBuffer,
Series,
}
#[cfg(test)]
pub mod tests {
use crate::{
ast::{
ast::{
Ast, AstColumnProperty, AstCreate, AstCreateDeferredView, AstCreateDictionary,
AstCreateNamespace, AstCreateRingBuffer, AstCreateSeries, AstCreateSubscription,
AstCreateSumType, AstCreateTable, AstCreateTransactionalView, AstType,
},
parse::Parser,
},
bump::Bump,
token::tokenize,
};
#[test]
fn test_create_namespace() {
let bump = Bump::new();
let source = "CREATE NAMESPACE REIFYDB";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "REIFYDB");
assert!(!if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_with_hyphen() {
let bump = Bump::new();
let source = "CREATE NAMESPACE my-namespace";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-namespace");
assert!(!if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_if_not_exists() {
let bump = Bump::new();
let source = "CREATE NAMESPACE IF NOT EXISTS my_namespace";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my_namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_if_not_exists_with_hyphen() {
let bump = Bump::new();
let source = "CREATE NAMESPACE IF NOT EXISTS my-test-namespace";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-test-namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_if_not_exists_with_backtick() {
let bump = Bump::new();
let source = "CREATE NAMESPACE IF NOT EXISTS `my-namespace`";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_name_if_not_exists() {
let bump = Bump::new();
let source = "CREATE NAMESPACE my_namespace IF NOT EXISTS";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my_namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_name_if_not_exists_with_hyphen() {
let bump = Bump::new();
let source = "CREATE NAMESPACE my-test-namespace IF NOT EXISTS";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-test-namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_name_if_not_exists_with_backtick() {
let bump = Bump::new();
let source = "CREATE NAMESPACE `my-namespace` IF NOT EXISTS";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
if_not_exists,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-namespace");
assert!(if_not_exists);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_table_with_hyphen() {
let bump = Bump::new();
let source = "CREATE TABLE my-shape::my-table { id: Int4 }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Table(AstCreateTable {
table,
..
}) => {
assert_eq!(table.namespace[0].text(), "my-shape");
assert_eq!(table.name.text(), "my-table");
}
_ => unreachable!(),
}
}
#[test]
fn test_create_ringbuffer_with_hyphen() {
let bump = Bump::new();
let source = "CREATE RINGBUFFER my-ns::my-buffer { id: Int4 } WITH { capacity: 100 }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::RingBuffer(AstCreateRingBuffer {
ringbuffer,
capacity,
..
}) => {
assert_eq!(ringbuffer.namespace[0].text(), "my-ns");
assert_eq!(ringbuffer.name.text(), "my-buffer");
assert_eq!(*capacity, 100);
}
_ => unreachable!(),
}
}
#[test]
fn test_create_dictionary_with_hyphen() {
let bump = Bump::new();
let source = "CREATE DICTIONARY my-dict FOR Text AS Int4";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Dictionary(AstCreateDictionary {
dictionary,
..
}) => {
assert_eq!(dictionary.name.text(), "my-dict");
}
_ => unreachable!(),
}
}
#[test]
fn test_create_table_with_hyphenated_columns() {
let bump = Bump::new();
let source = "CREATE TABLE test::user-data { user-id: Int4, user-name: Text }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Table(AstCreateTable {
table,
columns,
..
}) => {
assert_eq!(table.name.text(), "user-data");
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name.text(), "user-id");
assert_eq!(columns[1].name.text(), "user-name");
}
_ => unreachable!(),
}
}
#[test]
fn test_create_namespace_with_backtick() {
let bump = Bump::new();
let source = "CREATE NAMESPACE `my-namespace`";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Namespace(AstCreateNamespace {
namespace,
..
}) => {
assert_eq!(namespace.segments[0].text(), "my-namespace");
}
_ => unreachable!(),
}
}
#[test]
fn test_create_series() {
let bump = Bump::new();
let source = r#"
create series test::metrics{ts: datetime, value: Int2} WITH { key: ts }
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Series(AstCreateSeries {
series,
columns,
key,
..
}) => {
assert_eq!(series.namespace[0].text(), "test");
assert_eq!(series.name.text(), "metrics");
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name.text(), "ts");
match &columns[0].ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "datetime")
}
_ => panic!("Expected simple type"),
}
assert_eq!(columns[1].name.text(), "value");
match &columns[1].ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "Int2")
}
_ => panic!("Expected simple type"),
}
assert!(columns[1].properties.is_empty());
assert!(key.is_some());
assert_eq!(key.as_ref().unwrap().text(), "ts");
}
_ => unreachable!(),
}
}
#[test]
fn test_create_table() {
let bump = Bump::new();
let source = r#"
create table test::users{id: int2, name: text, is_premium: bool}
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Table(AstCreateTable {
table,
columns,
..
}) => {
assert_eq!(table.namespace[0].text(), "test");
assert_eq!(table.name.text(), "users");
assert_eq!(columns.len(), 3);
{
let col = &columns[0];
assert_eq!(col.name.text(), "id");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "int2")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
{
let col = &columns[1];
assert_eq!(col.name.text(), "name");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "text")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
{
let col = &columns[2];
assert_eq!(col.name.text(), "is_premium");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "bool")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
}
_ => unreachable!(),
}
}
#[test]
fn test_create_table_with_auto_increment() {
let bump = Bump::new();
let source = r#"
create table test::users { id: int4 with { auto_increment }, name: utf8 }
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Table(AstCreateTable {
table,
columns,
..
}) => {
assert_eq!(table.namespace[0].text(), "test");
assert_eq!(table.name.text(), "users");
assert_eq!(columns.len(), 2);
{
let col = &columns[0];
assert_eq!(col.name.text(), "id");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "int4")
}
_ => panic!("Expected simple type"),
}
assert_eq!(col.properties.len(), 1);
assert!(matches!(col.properties[0], AstColumnProperty::AutoIncrement));
}
{
let col = &columns[1];
assert_eq!(col.name.text(), "name");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "utf8")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
}
_ => unreachable!(),
}
}
#[test]
fn test_create_deferred_view() {
let bump = Bump::new();
let source = r#"
create deferred view test::views{field: int2}
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::DeferredView(AstCreateDeferredView {
view,
columns,
..
}) => {
assert_eq!(view.namespace[0].text(), "test");
assert_eq!(view.name.text(), "views");
assert_eq!(columns.len(), 1);
let col = &columns[0];
assert_eq!(col.name.text(), "field");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "int2")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
_ => unreachable!(),
}
}
#[test]
fn test_create_transactional_view() {
let bump = Bump::new();
let source = r#"
create transactional view test::myview{id: int4, name: utf8}
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::TransactionalView(AstCreateTransactionalView {
view,
columns,
..
}) => {
assert_eq!(view.namespace[0].text(), "test");
assert_eq!(view.name.text(), "myview");
assert_eq!(columns.len(), 2);
{
let col = &columns[0];
assert_eq!(col.name.text(), "id");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "int4")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
{
let col = &columns[1];
assert_eq!(col.name.text(), "name");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "utf8")
}
_ => panic!("Expected simple type"),
}
assert!(col.properties.is_empty());
}
}
_ => unreachable!(),
}
}
#[test]
fn test_create_ringbuffer() {
let bump = Bump::new();
let source = r#"
create ringbuffer test::events { id: int4, data: utf8 } with { capacity: 10 }
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::RingBuffer(AstCreateRingBuffer {
ringbuffer,
columns,
capacity,
..
}) => {
assert_eq!(ringbuffer.namespace[0].text(), "test");
assert_eq!(ringbuffer.name.text(), "events");
assert_eq!(*capacity, 10);
assert_eq!(columns.len(), 2);
{
let col = &columns[0];
assert_eq!(col.name.text(), "id");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "int4")
}
_ => panic!("Expected simple type"),
}
}
{
let col = &columns[1];
assert_eq!(col.name.text(), "data");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "utf8")
}
_ => panic!("Expected simple type"),
}
}
}
_ => unreachable!("Expected ring buffer create"),
}
}
#[test]
fn test_create_transactional_view_with_query() {
let bump = Bump::new();
let source = r#"
create transactional view test::myview{id: int4, name: utf8} as {
from test::users
where age > 18
}
"#;
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::TransactionalView(AstCreateTransactionalView {
view,
columns,
as_clause,
..
}) => {
assert_eq!(view.namespace[0].text(), "test");
assert_eq!(view.name.text(), "myview");
assert_eq!(columns.len(), 2);
assert!(as_clause.is_some());
if let Some(as_statement) = as_clause {
assert!(as_statement.len() > 0);
}
}
_ => unreachable!(),
}
}
#[test]
fn test_create_dictionary_basic() {
let bump = Bump::new();
let source = "CREATE DICTIONARY token_mints FOR Utf8 AS Uint2";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Dictionary(dict) => {
assert!(dict.dictionary.namespace.is_empty());
assert_eq!(dict.dictionary.name.text(), "token_mints");
match &dict.value_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Utf8"),
_ => panic!("Expected unconstrained type"),
}
match &dict.id_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Uint2"),
_ => panic!("Expected unconstrained type"),
}
}
_ => unreachable!("Expected Dictionary create"),
}
}
#[test]
fn test_create_dictionary_qualified() {
let bump = Bump::new();
let source = "CREATE DICTIONARY analytics::token_mints FOR Utf8 AS Uint4";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Dictionary(dict) => {
assert_eq!(dict.dictionary.namespace[0].text(), "analytics");
assert_eq!(dict.dictionary.name.text(), "token_mints");
match &dict.value_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Utf8"),
_ => panic!("Expected unconstrained type"),
}
match &dict.id_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Uint4"),
_ => panic!("Expected unconstrained type"),
}
}
_ => unreachable!("Expected Dictionary create"),
}
}
#[test]
fn test_create_dictionary_blob_value() {
let bump = Bump::new();
let source = "CREATE DICTIONARY hashes FOR Blob AS Uint8";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Dictionary(dict) => {
assert_eq!(dict.dictionary.name.text(), "hashes");
match &dict.value_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Blob"),
_ => panic!("Expected unconstrained type"),
}
match &dict.id_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Uint8"),
_ => panic!("Expected unconstrained type"),
}
}
_ => unreachable!("Expected Dictionary create"),
}
}
#[test]
fn test_create_dictionary_if_not_exists() {
let bump = Bump::new();
let source = "CREATE DICTIONARY IF NOT EXISTS token_mints FOR Utf8 AS Uint4";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Dictionary(dict) => {
assert!(dict.if_not_exists);
assert!(dict.dictionary.namespace.is_empty());
assert_eq!(dict.dictionary.name.text(), "token_mints");
match &dict.value_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Utf8"),
_ => panic!("Expected unconstrained type"),
}
match &dict.id_type {
AstType::Unconstrained(ty) => assert_eq!(ty.text(), "Uint4"),
_ => panic!("Expected unconstrained type"),
}
}
_ => unreachable!("Expected Dictionary create"),
}
}
#[test]
fn test_create_enum_basic() {
let bump = Bump::new();
let source = "CREATE ENUM Status { Active, Inactive, Pending }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Enum(AstCreateSumType {
if_not_exists,
name,
variants,
..
}) => {
assert!(!if_not_exists);
assert!(name.namespace.is_empty());
assert_eq!(name.name.text(), "Status");
assert_eq!(variants.len(), 3);
assert_eq!(variants[0].name.text(), "Active");
assert_eq!(variants[1].name.text(), "Inactive");
assert_eq!(variants[2].name.text(), "Pending");
assert!(variants[0].columns.is_empty());
assert!(variants[1].columns.is_empty());
assert!(variants[2].columns.is_empty());
}
_ => unreachable!("Expected Enum create"),
}
}
#[test]
fn test_create_enum_with_fields() {
let bump = Bump::new();
let source =
"CREATE ENUM Shape { Circle { radius: Float8 }, Rectangle { width: Float8, height: Float8 } }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Enum(AstCreateSumType {
name,
variants,
..
}) => {
assert_eq!(name.name.text(), "Shape");
assert_eq!(variants.len(), 2);
assert_eq!(variants[0].name.text(), "Circle");
assert_eq!(variants[0].columns.len(), 1);
assert_eq!(variants[0].columns[0].name.text(), "radius");
assert_eq!(variants[1].name.text(), "Rectangle");
assert_eq!(variants[1].columns.len(), 2);
assert_eq!(variants[1].columns[0].name.text(), "width");
assert_eq!(variants[1].columns[1].name.text(), "height");
}
_ => unreachable!("Expected Enum create"),
}
}
#[test]
fn test_create_enum_qualified_name() {
let bump = Bump::new();
let source = "CREATE ENUM analytics::Status { Active, Inactive }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Enum(AstCreateSumType {
name,
variants,
..
}) => {
assert_eq!(name.namespace[0].text(), "analytics");
assert_eq!(name.name.text(), "Status");
assert_eq!(variants.len(), 2);
}
_ => unreachable!("Expected Enum create"),
}
}
#[test]
fn test_create_enum_if_not_exists() {
let bump = Bump::new();
let source = "CREATE ENUM IF NOT EXISTS Status { Active }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Enum(AstCreateSumType {
if_not_exists,
name,
variants,
..
}) => {
assert!(if_not_exists);
assert!(name.namespace.is_empty());
assert_eq!(name.name.text(), "Status");
assert_eq!(variants.len(), 1);
assert_eq!(variants[0].name.text(), "Active");
}
_ => unreachable!("Expected Enum create"),
}
}
#[test]
fn test_create_subscription_basic() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { id: Int4, name: Utf8 }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
..
}) => {
assert_eq!(columns.len(), 2);
{
let col = &columns[0];
assert_eq!(col.name.text(), "id");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "Int4")
}
_ => panic!("Expected simple type"),
}
}
{
let col = &columns[1];
assert_eq!(col.name.text(), "name");
match &col.ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "Utf8")
}
_ => panic!("Expected simple type"),
}
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_single_column() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { value: Float8 }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
..
}) => {
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].name.text(), "value");
match &columns[0].ty {
AstType::Unconstrained(ident) => {
assert_eq!(ident.text(), "Float8")
}
_ => panic!("Expected simple type"),
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_with_simple_query() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { id: Int4, name: Utf8 } AS { from test::products }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
as_clause,
..
}) => {
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name.text(), "id");
assert_eq!(columns[1].name.text(), "name");
assert!(as_clause.is_some(), "AS clause should be present");
let as_clause = as_clause.as_ref().unwrap();
assert_eq!(as_clause.nodes.len(), 1, "Should have one FROM node");
match &as_clause.nodes[0] {
Ast::From(_) => {
}
_ => panic!("Expected FROM node in AS clause"),
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_with_piped_query() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { id: Int4, price: Float8 } AS { from test::products | filter {price > 50} | filter {stock > 0} }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
as_clause,
..
}) => {
assert_eq!(columns.len(), 2);
assert!(as_clause.is_some(), "AS clause should be present");
let as_clause = as_clause.as_ref().unwrap();
assert!(as_clause.nodes.len() >= 1, "Should have at least FROM node");
match &as_clause.nodes[0] {
Ast::From(_) => {}
_ => panic!("Expected FROM node as first node in AS clause"),
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_without_as_clause() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { value: Float8 }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
as_clause,
..
}) => {
assert!(as_clause.is_none(), "AS clause should not be present");
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_shapeless() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION AS { FROM demo::events }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
as_clause,
..
}) => {
assert_eq!(columns.len(), 0, "Shape-less should have no columns");
assert!(as_clause.is_some(), "AS clause should be present");
let as_clause = as_clause.as_ref().unwrap();
assert_eq!(as_clause.nodes.len(), 1, "Should have one FROM node");
match &as_clause.nodes[0] {
Ast::From(_) => {
}
_ => panic!("Expected FROM node in AS clause"),
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_shapeless_with_filter() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION AS { FROM demo::events | FILTER {id > 1 and id < 3} }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
as_clause,
..
}) => {
assert_eq!(columns.len(), 0, "Shape-less should have no columns");
assert!(as_clause.is_some(), "AS clause should be present");
let as_clause = as_clause.as_ref().unwrap();
assert!(as_clause.nodes.len() >= 1, "Should have at least FROM node");
assert!(as_clause.has_pipes, "Should have pipes");
match &as_clause.nodes[0] {
Ast::From(_) => {}
_ => panic!("Expected FROM node as first node"),
}
}
_ => unreachable!("Expected Subscription create"),
}
}
#[test]
fn test_create_subscription_shapeless_missing_as_fails() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let result = parser.parse();
assert!(result.is_err(), "Shape-less subscription without AS should fail");
}
#[test]
fn test_create_subscription_backward_compat_with_columns() {
let bump = Bump::new();
let source = "CREATE SUBSCRIPTION { id: Int4 } AS { FROM demo::events }";
let tokens = tokenize(&bump, source).unwrap().into_iter().collect();
let mut parser = Parser::new(&bump, source, tokens);
let mut result = parser.parse().unwrap();
assert_eq!(result.len(), 1);
let result = result.pop().unwrap();
let create = result.first_unchecked().as_create();
match create {
AstCreate::Subscription(AstCreateSubscription {
columns,
as_clause,
..
}) => {
assert_eq!(columns.len(), 1, "Should have one column");
assert_eq!(columns[0].name.text(), "id");
assert!(as_clause.is_some(), "AS clause should be present");
}
_ => unreachable!("Expected Subscription create"),
}
}
}