pub mod alter;
pub mod create;
pub mod drop;
pub mod mutate;
use std::{collections, fmt, iter::once, marker, time::Duration};
use reifydb_catalog::catalog::{
Catalog, subscription::SubscriptionColumnToCreate, table::TableColumnToCreate, view::ViewColumnToCreate,
};
use reifydb_core::{
common::{JoinType, WindowKind},
error::diagnostic::catalog::{
dictionary_not_found, namespace_not_found, ringbuffer_not_found, series_not_found, table_not_found,
},
interface::{
catalog::{
column::{Column, ColumnIndex},
id::{ColumnId, NamespaceId, TableId},
namespace::Namespace,
table::Table,
},
resolved::{
ResolvedColumn, ResolvedDictionary, ResolvedNamespace, ResolvedRingBuffer, ResolvedSchema,
ResolvedSeries, ResolvedTable, ResolvedView,
},
},
sort::SortKey,
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{
fragment::Fragment,
return_error,
value::{constraint::TypeConstraint, r#type::Type},
};
use tracing::instrument;
use crate::{
Result,
ast::ast::{AstAlterPolicyAction, AstPolicyScope, AstViewStorageKind},
bump::{Bump, BumpBox, FragmentInterner},
error::RqlError,
expression::{
ConstantExpression, Expression, Expression::Constant, VariableExpression, extract_variable_names,
},
nodes::{
self, AlterSequenceNode, CreateDictionaryNode, CreateNamespaceNode, CreateRingBufferNode,
CreateSumTypeNode, CreateTableNode, DictionaryScanNode, EnvironmentNode, GeneratorNode, IndexScanNode,
InlineDataNode, RingBufferScanNode, RowListLookupNode, RowPointLookupNode, RowRangeScanNode,
SeriesScanNode, TableScanNode, TableVirtualScanNode, VariableNode, ViewScanNode,
},
plan::{
logical,
logical::{
LogicalPlan,
row_predicate::{RowPredicate, extract_row_predicate},
series_predicate::extract_series_predicate,
},
},
};
#[derive(Debug)]
pub enum PhysicalPlan<'bump> {
CreateDeferredView(CreateDeferredViewNode<'bump>),
CreateTransactionalView(CreateTransactionalViewNode<'bump>),
CreateNamespace(CreateNamespaceNode),
CreateRemoteNamespace(nodes::CreateRemoteNamespaceNode),
CreateTable(CreateTableNode),
CreateRingBuffer(CreateRingBufferNode),
CreateDictionary(CreateDictionaryNode),
CreateSumType(CreateSumTypeNode),
CreateSubscription(CreateSubscriptionNode<'bump>),
CreatePrimaryKey(nodes::CreatePrimaryKeyNode),
CreateColumnProperty(nodes::CreateColumnPropertyNode),
CreateProcedure(nodes::CreateProcedureNode),
CreateEvent(nodes::CreateEventNode),
CreateSeries(nodes::CreateSeriesNode),
CreateTag(nodes::CreateTagNode),
CreateSource(nodes::CreateSourceNode),
CreateSink(nodes::CreateSinkNode),
CreateTest(nodes::CreateTestNode),
RunTests(nodes::RunTestsNode),
CreateMigration(nodes::CreateMigrationNode),
Migrate(nodes::MigrateNode),
RollbackMigration(nodes::RollbackMigrationNode),
Dispatch(nodes::DispatchNode),
DropNamespace(nodes::DropNamespaceNode),
DropTable(nodes::DropTableNode),
DropView(nodes::DropViewNode),
DropRingBuffer(nodes::DropRingBufferNode),
DropDictionary(nodes::DropDictionaryNode),
DropSumType(nodes::DropSumTypeNode),
DropSubscription(nodes::DropSubscriptionNode),
DropSeries(nodes::DropSeriesNode),
DropSource(nodes::DropSourceNode),
DropSink(nodes::DropSinkNode),
AlterSequence(AlterSequenceNode),
AlterTable(AlterTableNode<'bump>),
AlterRemoteNamespace(nodes::AlterRemoteNamespaceNode),
Delete(DeleteTableNode<'bump>),
DeleteRingBuffer(DeleteRingBufferNode<'bump>),
DeleteSeries(DeleteSeriesNode<'bump>),
InsertTable(InsertTableNode<'bump>),
InsertRingBuffer(InsertRingBufferNode<'bump>),
InsertDictionary(InsertDictionaryNode<'bump>),
InsertSeries(InsertSeriesNode<'bump>),
Update(UpdateTableNode<'bump>),
UpdateRingBuffer(UpdateRingBufferNode<'bump>),
UpdateSeries(UpdateSeriesNode<'bump>),
Declare(DeclareNode<'bump>),
Assign(AssignNode<'bump>),
Append(AppendPhysicalNode<'bump>),
Variable(VariableNode),
Environment(EnvironmentNode),
Conditional(ConditionalNode<'bump>),
Loop(LoopNode<'bump>),
While(WhileNode<'bump>),
For(ForNode<'bump>),
Break,
Continue,
DefineFunction(DefineFunctionNode<'bump>),
Return(ReturnNode),
CallFunction(CallFunctionNode),
DefineClosure(DefineClosureNode<'bump>),
Aggregate(AggregateNode<'bump>),
Assert(AssertNode<'bump>),
AssertBlock(AssertBlockNode),
Distinct(DistinctNode<'bump>),
Filter(FilterNode<'bump>),
Gate(GateNode<'bump>),
IndexScan(IndexScanNode),
RowPointLookup(RowPointLookupNode),
RowListLookup(RowListLookupNode),
RowRangeScan(RowRangeScanNode),
JoinInner(JoinInnerNode<'bump>),
JoinLeft(JoinLeftNode<'bump>),
JoinNatural(JoinNaturalNode<'bump>),
Take(TakeNode<'bump>),
Sort(SortNode<'bump>),
Map(MapNode<'bump>),
Extend(ExtendNode<'bump>),
Patch(PatchNode<'bump>),
Apply(ApplyNode<'bump>),
InlineData(InlineDataNode),
RemoteScan(nodes::RemoteScanNode),
TableScan(TableScanNode),
TableVirtualScan(TableVirtualScanNode),
ViewScan(ViewScanNode),
RingBufferScan(RingBufferScanNode),
DictionaryScan(DictionaryScanNode),
SeriesScan(SeriesScanNode),
Generator(GeneratorNode),
Window(WindowNode<'bump>),
Scalarize(ScalarizeNode<'bump>),
CreateIdentity(nodes::CreateIdentityNode),
CreateRole(nodes::CreateRoleNode),
Grant(nodes::GrantNode),
Revoke(nodes::RevokeNode),
DropIdentity(nodes::DropIdentityNode),
DropRole(nodes::DropRoleNode),
CreateAuthentication(nodes::CreateAuthenticationNode),
DropAuthentication(nodes::DropAuthenticationNode),
CreatePolicy(nodes::CreatePolicyNode),
AlterPolicy(nodes::AlterPolicyNode),
DropPolicy(nodes::DropPolicyNode),
}
#[derive(Debug)]
pub struct CreateDeferredViewNode<'bump> {
pub namespace: Namespace,
pub view: Fragment,
pub if_not_exists: bool,
pub columns: Vec<ViewColumnToCreate>,
pub as_clause: BumpBox<'bump, PhysicalPlan<'bump>>,
pub storage_kind: AstViewStorageKind,
pub tick: Option<Duration>,
}
#[derive(Debug)]
pub struct CreateTransactionalViewNode<'bump> {
pub namespace: Namespace,
pub view: Fragment,
pub if_not_exists: bool,
pub columns: Vec<ViewColumnToCreate>,
pub as_clause: BumpBox<'bump, PhysicalPlan<'bump>>,
pub storage_kind: AstViewStorageKind,
pub tick: Option<Duration>,
}
#[derive(Debug)]
pub struct CreateSubscriptionNode<'bump> {
pub columns: Vec<SubscriptionColumnToCreate>,
pub as_clause: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
}
#[derive(Debug)]
pub struct AlterTableNode<'bump> {
pub namespace: ResolvedNamespace,
pub table: Fragment,
pub action: AlterTableAction,
pub _phantom: marker::PhantomData<&'bump ()>,
}
#[derive(Debug)]
pub enum AlterTableAction {
AddColumn {
column: TableColumnToCreate,
},
DropColumn {
column: Fragment,
},
RenameColumn {
old_name: Fragment,
new_name: Fragment,
},
}
#[derive(Debug)]
pub struct DeleteTableNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub target: Option<ResolvedTable>,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct DeleteRingBufferNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub target: ResolvedRingBuffer,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct InsertTableNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedTable,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct InsertRingBufferNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedRingBuffer,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct InsertDictionaryNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedDictionary,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct InsertSeriesNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedSeries,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct DeleteSeriesNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub target: ResolvedSeries,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct UpdateTableNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: Option<ResolvedTable>,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct UpdateRingBufferNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedRingBuffer,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub struct UpdateSeriesNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub target: ResolvedSeries,
pub returning: Option<Vec<Expression>>,
}
#[derive(Debug)]
pub enum LetValue<'bump> {
Expression(Expression),
Statement(BumpBox<'bump, PhysicalPlan<'bump>>),
EmptyFrame,
}
impl fmt::Display for LetValue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LetValue::Expression(expr) => write!(f, "{}", expr),
LetValue::Statement(plan) => write!(f, "Statement({:?})", plan),
LetValue::EmptyFrame => write!(f, "EmptyFrame"),
}
}
}
#[derive(Debug)]
pub struct DeclareNode<'bump> {
pub name: Fragment,
pub value: LetValue<'bump>,
}
#[derive(Debug)]
pub enum AssignValue<'bump> {
Expression(Expression),
Statement(BumpBox<'bump, PhysicalPlan<'bump>>),
}
impl fmt::Display for AssignValue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AssignValue::Expression(expr) => write!(f, "{}", expr),
AssignValue::Statement(plan) => write!(f, "Statement({:?})", plan),
}
}
}
#[derive(Debug)]
pub struct AssignNode<'bump> {
pub name: Fragment,
pub value: AssignValue<'bump>,
}
#[derive(Debug)]
pub enum AppendPhysicalNode<'bump> {
IntoVariable {
target: Fragment,
source: AppendPhysicalSource<'bump>,
},
Query {
left: BumpBox<'bump, PhysicalPlan<'bump>>,
right: BumpBox<'bump, PhysicalPlan<'bump>>,
},
}
#[derive(Debug)]
pub enum AppendPhysicalSource<'bump> {
Statement(Vec<PhysicalPlan<'bump>>),
Inline(InlineDataNode),
}
#[derive(Debug)]
pub struct ConditionalNode<'bump> {
pub condition: Expression,
pub then_branch: BumpBox<'bump, PhysicalPlan<'bump>>,
pub else_ifs: Vec<ElseIfBranch<'bump>>,
pub else_branch: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
}
#[derive(Debug)]
pub struct ElseIfBranch<'bump> {
pub condition: Expression,
pub then_branch: BumpBox<'bump, PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct LoopNode<'bump> {
pub body: Vec<PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct WhileNode<'bump> {
pub condition: Expression,
pub body: Vec<PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct ForNode<'bump> {
pub variable_name: Fragment,
pub iterable: BumpBox<'bump, PhysicalPlan<'bump>>,
pub body: Vec<PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct DefineFunctionNode<'bump> {
pub name: Fragment,
pub parameters: Vec<nodes::FunctionParameter>,
pub return_type: Option<TypeConstraint>,
pub body: Vec<PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct ReturnNode {
pub value: Option<Expression>,
}
#[derive(Debug)]
pub struct CallFunctionNode {
pub name: Fragment,
pub arguments: Vec<Expression>,
pub is_procedure_call: bool,
}
#[derive(Debug)]
pub struct DefineClosureNode<'bump> {
pub parameters: Vec<nodes::FunctionParameter>,
pub body: Vec<PhysicalPlan<'bump>>,
}
#[derive(Debug)]
pub struct AggregateNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub by: Vec<Expression>,
pub map: Vec<Expression>,
}
#[derive(Debug)]
pub struct DistinctNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub columns: Vec<ResolvedColumn>,
}
#[derive(Debug)]
pub struct AssertNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub conditions: Vec<Expression>,
pub message: Option<String>,
}
#[derive(Debug)]
pub struct AssertBlockNode {
pub rql: String,
pub expect_error: bool,
pub message: Option<String>,
}
#[derive(Debug)]
pub struct FilterNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub conditions: Vec<Expression>,
}
#[derive(Debug)]
pub struct GateNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub conditions: Vec<Expression>,
}
#[derive(Debug)]
pub struct JoinInnerNode<'bump> {
pub left: BumpBox<'bump, PhysicalPlan<'bump>>,
pub right: BumpBox<'bump, PhysicalPlan<'bump>>,
pub on: Vec<Expression>,
pub alias: Option<Fragment>,
}
#[derive(Debug)]
pub struct JoinLeftNode<'bump> {
pub left: BumpBox<'bump, PhysicalPlan<'bump>>,
pub right: BumpBox<'bump, PhysicalPlan<'bump>>,
pub on: Vec<Expression>,
pub alias: Option<Fragment>,
}
#[derive(Debug)]
pub struct JoinNaturalNode<'bump> {
pub left: BumpBox<'bump, PhysicalPlan<'bump>>,
pub right: BumpBox<'bump, PhysicalPlan<'bump>>,
pub join_type: JoinType,
pub alias: Option<Fragment>,
}
#[derive(Debug)]
pub struct TakeNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub take: nodes::TakeLimit,
}
#[derive(Debug)]
pub struct SortNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub by: Vec<SortKey>,
}
#[derive(Debug)]
pub struct MapNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub map: Vec<Expression>,
}
#[derive(Debug)]
pub struct ExtendNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub extend: Vec<Expression>,
}
#[derive(Debug)]
pub struct PatchNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub assignments: Vec<Expression>,
}
#[derive(Debug)]
pub struct ApplyNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub operator: Fragment,
pub expressions: Vec<Expression>,
}
#[derive(Debug)]
pub struct WindowNode<'bump> {
pub input: Option<BumpBox<'bump, PhysicalPlan<'bump>>>,
pub kind: WindowKind,
pub group_by: Vec<Expression>,
pub aggregations: Vec<Expression>,
pub ts: Option<String>,
}
#[derive(Debug)]
pub struct ScalarizeNode<'bump> {
pub input: BumpBox<'bump, PhysicalPlan<'bump>>,
pub fragment: Fragment,
}
pub(crate) struct Compiler<'bump> {
pub catalog: Catalog,
pub interner: FragmentInterner,
pub bump: &'bump Bump,
}
#[instrument(name = "rql::compile::physical", level = "trace", skip(bump, catalog, rx, logical))]
pub fn compile_physical<'b>(
bump: &'b Bump,
catalog: &Catalog,
rx: &mut Transaction<'_>,
logical: impl IntoIterator<Item = LogicalPlan<'b>>,
) -> Result<Option<PhysicalPlan<'b>>> {
Compiler {
catalog: catalog.clone(),
interner: FragmentInterner::new(),
bump,
}
.compile(rx, logical)
}
impl<'bump> Compiler<'bump> {
fn bump_box(&self, plan: PhysicalPlan<'bump>) -> BumpBox<'bump, PhysicalPlan<'bump>> {
BumpBox::new_in(plan, self.bump)
}
pub fn compile(
&mut self,
rx: &mut Transaction<'_>,
logical: impl IntoIterator<Item = LogicalPlan<'bump>>,
) -> Result<Option<PhysicalPlan<'bump>>> {
let mut stack: Vec<PhysicalPlan<'bump>> = Vec::new();
for plan in logical {
match plan {
LogicalPlan::Aggregate(aggregate) => {
let input = stack.pop().unwrap();
let mut vars = Vec::new();
for expr in aggregate.by.iter().chain(aggregate.map.iter()) {
vars.extend(extract_variable_names(expr));
}
if let Some(pushed) = try_remote_push_down_with_vars(
&input,
|| Some(aggregate.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
stack.push(PhysicalPlan::Aggregate(AggregateNode {
by: aggregate.by,
map: aggregate.map,
input: self.bump_box(input),
}));
}
LogicalPlan::CreateNamespace(create) => {
stack.push(self.compile_create_namespace(rx, create)?);
}
LogicalPlan::CreateRemoteNamespace(create) => {
stack.push(self.compile_create_remote_namespace(rx, create)?);
}
LogicalPlan::CreateTable(create) => {
stack.push(self.compile_create_table(rx, create)?);
}
LogicalPlan::CreateRingBuffer(create) => {
stack.push(self.compile_create_ringbuffer(rx, create)?);
}
LogicalPlan::CreateDeferredView(create) => {
stack.push(self.compile_create_deferred(rx, create)?);
}
LogicalPlan::CreateTransactionalView(create) => {
stack.push(self.compile_create_transactional(rx, create)?);
}
LogicalPlan::CreateDictionary(create) => {
stack.push(self.compile_create_dictionary(rx, create)?);
}
LogicalPlan::CreateSumType(create) => {
stack.push(self.compile_create_sumtype(rx, create)?);
}
LogicalPlan::CreateSubscription(create) => {
stack.push(self.compile_create_subscription(rx, create)?);
}
LogicalPlan::AlterSequence(alter) => {
stack.push(self.compile_alter_sequence(rx, alter)?);
}
LogicalPlan::CreatePrimaryKey(create) => {
stack.push(self.compile_create_primary_key(rx, create)?);
}
LogicalPlan::CreateColumnProperty(create) => {
stack.push(self.compile_create_column_property(rx, create)?);
}
LogicalPlan::CreateProcedure(create) => {
stack.push(self.compile_create_procedure(rx, create)?);
}
LogicalPlan::CreateTest(create) => {
stack.push(self.compile_create_test(rx, create)?);
}
LogicalPlan::RunTests(scope) => match scope {
logical::RunTestsNode::All => {
stack.push(PhysicalPlan::RunTests(nodes::RunTestsNode {
scope: nodes::RunTestsScope::All,
}));
}
logical::RunTestsNode::Namespace(ns) => {
let ns_segments: Vec<&str> =
ns.segments.iter().map(|n| n.text()).collect();
let namespace_name = ns_segments.join("::");
let Some(namespace_def) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_fragment = if let Some(n) = ns.segments.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(&namespace_name)
} else {
Fragment::internal("default".to_string())
};
return_error!(namespace_not_found(
ns_fragment,
&namespace_name
));
};
let namespace_id = if let Some(n) = ns.segments.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(&namespace_name)
} else {
Fragment::internal(namespace_name)
};
let resolved_namespace =
ResolvedNamespace::new(namespace_id, namespace_def);
stack.push(PhysicalPlan::RunTests(nodes::RunTestsNode {
scope: nodes::RunTestsScope::Namespace(resolved_namespace),
}));
}
logical::RunTestsNode::Single(test) => {
let ns_segments: Vec<&str> =
test.namespace.iter().map(|n| n.text()).collect();
let namespace_name = if ns_segments.is_empty() {
"default".to_string()
} else {
ns_segments.join("::")
};
let Some(namespace_def) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_fragment = if let Some(n) = test.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(&namespace_name)
} else {
Fragment::internal("default".to_string())
};
return_error!(namespace_not_found(
ns_fragment,
&namespace_name
));
};
let namespace_id = if let Some(n) = test.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(&namespace_name)
} else {
Fragment::internal(namespace_name)
};
let resolved_namespace =
ResolvedNamespace::new(namespace_id, namespace_def);
stack.push(PhysicalPlan::RunTests(nodes::RunTestsNode {
scope: nodes::RunTestsScope::Single(
resolved_namespace,
self.interner
.intern_fragment(&test.name)
.text()
.to_string(),
),
}));
}
},
LogicalPlan::CreateSeries(create) => {
stack.push(self.compile_create_series(rx, create)?);
}
LogicalPlan::CreateEvent(create) => {
stack.push(self.compile_create_event(rx, create)?);
}
LogicalPlan::CreateTag(create) => {
stack.push(self.compile_create_tag(rx, create)?);
}
LogicalPlan::CreateSource(create) => {
stack.push(self.compile_create_source(rx, create)?);
}
LogicalPlan::CreateSink(create) => {
stack.push(self.compile_create_sink(rx, create)?);
}
LogicalPlan::CreateMigration(create) => {
stack.push(PhysicalPlan::CreateMigration(nodes::CreateMigrationNode {
name: create.name,
body_source: create.body_source,
rollback_body_source: create.rollback_body_source,
}));
}
LogicalPlan::Migrate(node) => {
stack.push(PhysicalPlan::Migrate(nodes::MigrateNode {
target: node.target,
}));
}
LogicalPlan::RollbackMigration(node) => {
stack.push(PhysicalPlan::RollbackMigration(nodes::RollbackMigrationNode {
target: node.target,
}));
}
LogicalPlan::Dispatch(dispatch) => {
stack.push(self.compile_dispatch(rx, dispatch)?);
}
LogicalPlan::AlterTable(alter) => {
stack.push(self.compile_alter_table(rx, alter)?);
}
LogicalPlan::AlterRemoteNamespace(alter) => {
let ns_name: String =
alter.namespace.iter().map(|s| s.text()).collect::<Vec<_>>().join("::");
let namespace = Fragment::internal(ns_name);
stack.push(PhysicalPlan::AlterRemoteNamespace(
nodes::AlterRemoteNamespaceNode {
namespace,
grpc: self.interner.intern_fragment(&alter.grpc),
},
));
}
LogicalPlan::DropNamespace(drop) => {
stack.push(self.compile_drop_namespace(rx, drop)?);
}
LogicalPlan::DropTable(drop) => {
stack.push(self.compile_drop_table(rx, drop)?);
}
LogicalPlan::DropView(drop) => {
stack.push(self.compile_drop_view(rx, drop)?);
}
LogicalPlan::DropRingBuffer(drop) => {
stack.push(self.compile_drop_ringbuffer(rx, drop)?);
}
LogicalPlan::DropDictionary(drop) => {
stack.push(self.compile_drop_dictionary(rx, drop)?);
}
LogicalPlan::DropSumType(drop) => {
stack.push(self.compile_drop_sumtype(rx, drop)?);
}
LogicalPlan::DropSubscription(drop) => {
stack.push(self.compile_drop_subscription(rx, drop)?);
}
LogicalPlan::DropSeries(drop) => {
stack.push(self.compile_drop_series(rx, drop)?);
}
LogicalPlan::DropSource(drop) => {
stack.push(self.compile_drop_source(rx, drop)?);
}
LogicalPlan::DropSink(drop) => {
stack.push(self.compile_drop_sink(rx, drop)?);
}
LogicalPlan::CreateIdentity(node) => {
stack.push(PhysicalPlan::CreateIdentity(nodes::CreateIdentityNode {
name: self.interner.intern_fragment(&node.name),
}));
}
LogicalPlan::CreateRole(node) => {
stack.push(PhysicalPlan::CreateRole(nodes::CreateRoleNode {
name: self.interner.intern_fragment(&node.name),
}));
}
LogicalPlan::Grant(node) => {
stack.push(PhysicalPlan::Grant(nodes::GrantNode {
role: self.interner.intern_fragment(&node.role),
user: self.interner.intern_fragment(&node.user),
}));
}
LogicalPlan::Revoke(node) => {
stack.push(PhysicalPlan::Revoke(nodes::RevokeNode {
role: self.interner.intern_fragment(&node.role),
user: self.interner.intern_fragment(&node.user),
}));
}
LogicalPlan::DropIdentity(node) => {
stack.push(PhysicalPlan::DropIdentity(nodes::DropIdentityNode {
name: self.interner.intern_fragment(&node.name),
if_exists: node.if_exists,
}));
}
LogicalPlan::DropRole(node) => {
stack.push(PhysicalPlan::DropRole(nodes::DropRoleNode {
name: self.interner.intern_fragment(&node.name),
if_exists: node.if_exists,
}));
}
LogicalPlan::CreateAuthentication(node) => {
let mut method_str = String::new();
let mut config = collections::HashMap::new();
for entry in &node.entries {
let key = entry.key.text().to_string();
let value = entry.value.value().to_string();
if key == "method" {
method_str = value;
} else {
config.insert(key, value);
}
}
stack.push(PhysicalPlan::CreateAuthentication(
nodes::CreateAuthenticationNode {
user: self.interner.intern_fragment(&node.user),
method: Fragment::internal(&method_str),
config,
},
));
}
LogicalPlan::DropAuthentication(node) => {
stack.push(PhysicalPlan::DropAuthentication(nodes::DropAuthenticationNode {
user: self.interner.intern_fragment(&node.user),
method: self.interner.intern_fragment(&node.method),
if_exists: node.if_exists,
}));
}
LogicalPlan::CreatePolicy(node) => {
let name = node.name.map(|n| self.interner.intern_fragment(&n));
let target_type = node.target_type.as_str().to_string();
let (scope_namespace, scope_object) = match &node.scope {
AstPolicyScope::Specific(segments) => {
if segments.len() >= 2 {
let seg_strs: Vec<&str> =
segments.iter().map(|s| s.text()).collect();
let full_path = seg_strs.join("::");
if self.catalog
.find_namespace_by_segments(rx, &seg_strs)?
.is_some()
{
let ns_fragment = self
.interner
.intern_fragment(&segments[0]);
(Some(ns_fragment.with_text(&full_path)), None)
} else {
let ns_name = segments[..segments.len() - 1]
.iter()
.map(|s| s.text())
.collect::<Vec<_>>()
.join("::");
let ns_fragment = self
.interner
.intern_fragment(&segments[0]);
(
Some(ns_fragment.with_text(&ns_name)),
Some(self.interner.intern_fragment(
&segments[segments.len() - 1],
)),
)
}
} else if segments.len() == 1 {
(
Some(self
.interner
.intern_fragment(&segments[0])),
None,
)
} else {
(None, None)
}
}
AstPolicyScope::NamespaceWide(ns) => {
(Some(self.interner.intern_fragment(ns)), None)
}
AstPolicyScope::Global => (None, None),
};
let operations = node
.operations
.iter()
.map(|op| nodes::PolicyOperationNode {
operation: op.operation.text().to_string(),
body_source: op.body_source.clone(),
})
.collect();
stack.push(PhysicalPlan::CreatePolicy(nodes::CreatePolicyNode {
name,
target_type,
scope_namespace,
scope_object,
operations,
}));
}
LogicalPlan::AlterPolicy(node) => {
let enable = node.action == AstAlterPolicyAction::Enable;
stack.push(PhysicalPlan::AlterPolicy(nodes::AlterPolicyNode {
target_type: node.target_type.as_str().to_string(),
name: self.interner.intern_fragment(&node.name),
enable,
}));
}
LogicalPlan::DropPolicy(node) => {
stack.push(PhysicalPlan::DropPolicy(nodes::DropPolicyNode {
target_type: node.target_type.as_str().to_string(),
name: self.interner.intern_fragment(&node.name),
if_exists: node.if_exists,
}));
}
LogicalPlan::Assert(assert_node) => {
let input = stack.pop().map(|p| self.bump_box(p));
if let Some(ref boxed_input) = input {
let vars = extract_variable_names(&assert_node.condition);
if let Some(pushed) = try_remote_push_down_with_vars(
boxed_input,
|| Some(assert_node.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
}
stack.push(PhysicalPlan::Assert(AssertNode {
conditions: vec![assert_node.condition],
message: assert_node.message,
input,
}));
}
LogicalPlan::AssertBlock(node) => {
stack.push(PhysicalPlan::AssertBlock(AssertBlockNode {
rql: node.rql,
expect_error: node.expect_error,
message: node.message,
}));
}
LogicalPlan::Filter(filter) => {
let input = stack.pop().unwrap();
let vars = extract_variable_names(&filter.condition);
if let Some(pushed) = try_remote_push_down_with_vars(
&input,
|| Some(filter.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
if let Some(predicate) = extract_row_predicate(&filter.condition) {
let source = match &input {
PhysicalPlan::TableScan(scan) => {
Some(ResolvedSchema::Table(scan.source.clone()))
}
PhysicalPlan::ViewScan(scan) => {
Some(ResolvedSchema::View(scan.source.clone()))
}
PhysicalPlan::RingBufferScan(scan) => {
Some(ResolvedSchema::RingBuffer(scan.source.clone()))
}
_ => None,
};
if let Some(source) = source {
match predicate {
RowPredicate::Point(row_number) => {
stack.push(PhysicalPlan::RowPointLookup(
RowPointLookupNode {
source,
row_number,
},
));
continue;
}
RowPredicate::List(row_numbers) => {
stack.push(PhysicalPlan::RowListLookup(
RowListLookupNode {
source,
row_numbers,
},
));
continue;
}
RowPredicate::Range {
start,
end,
} => {
stack.push(PhysicalPlan::RowRangeScan(
RowRangeScanNode {
source,
start,
end,
},
));
continue;
}
}
}
}
if let PhysicalPlan::SeriesScan(ref scan) = input {
let key_col_name = scan.source.def().key.column();
if let Some(sp) =
extract_series_predicate(&filter.condition, key_col_name)
{
let rewritten = PhysicalPlan::SeriesScan(SeriesScanNode {
source: scan.source.clone(),
key_range_start: sp.key_start.or(scan.key_range_start),
key_range_end: sp.key_end.or(scan.key_range_end),
variant_tag: sp.variant_tag.or(scan.variant_tag),
});
if sp.remaining.is_empty() {
stack.push(rewritten);
} else {
stack.push(PhysicalPlan::Filter(FilterNode {
conditions: sp.remaining,
input: self.bump_box(rewritten),
}));
}
continue;
}
}
stack.push(PhysicalPlan::Filter(FilterNode {
conditions: vec![filter.condition],
input: self.bump_box(input),
}));
}
LogicalPlan::Gate(gate) => {
let input = stack.pop().unwrap();
let vars = extract_variable_names(&gate.condition);
if let Some(pushed) =
try_remote_push_down_with_vars(&input, || Some(gate.rql.clone()), vars)
{
stack.push(pushed);
continue;
}
stack.push(PhysicalPlan::Gate(GateNode {
conditions: vec![gate.condition],
input: self.bump_box(input),
}));
}
LogicalPlan::InlineData(inline) => {
stack.push(PhysicalPlan::InlineData(InlineDataNode {
rows: inline.rows,
}));
}
LogicalPlan::Generator(generator) => {
stack.push(PhysicalPlan::Generator(GeneratorNode {
name: self.interner.intern_fragment(&generator.name),
expressions: generator.expressions,
}));
}
LogicalPlan::DeleteTable(delete) => {
let input = if let Some(delete_input) = delete.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(delete_input)))?
.expect("Delete input must produce a plan");
Some(self.bump_box(sub_plan))
} else {
stack.pop().map(|i| self.bump_box(i))
};
let target = if let Some(table_id) = delete.target {
let ns_segments: Vec<&str> =
table_id.namespace.iter().map(|n| n.text()).collect();
let namespace = self
.catalog
.find_namespace_by_segments(rx, &ns_segments)?
.unwrap();
let Some(table_def) = self.catalog.find_table_by_name(
rx,
namespace.id(),
table_id.name.text(),
)?
else {
return_error!(table_not_found(
self.interner.intern_fragment(&table_id.name),
namespace.name(),
table_id.name.text()
));
};
let namespace_id = if let Some(n) = table_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace =
ResolvedNamespace::new(namespace_id, namespace);
Some(ResolvedTable::new(
self.interner.intern_fragment(&table_id.name),
resolved_namespace,
table_def,
))
} else {
None
};
stack.push(PhysicalPlan::Delete(DeleteTableNode {
input,
target,
returning: delete.returning,
}))
}
LogicalPlan::DeleteRingBuffer(delete) => {
let input = if let Some(delete_input) = delete.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(delete_input)))?
.expect("Delete input must produce a plan");
Some(self.bump_box(sub_plan))
} else {
stack.pop().map(|i| self.bump_box(i))
};
let ringbuffer_id = delete.target;
let ns_segments: Vec<&str> =
ringbuffer_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = ringbuffer_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(ringbuffer_def) = self.catalog.find_ringbuffer_by_name(
rx,
namespace.id(),
ringbuffer_id.name.text(),
)?
else {
return_error!(ringbuffer_not_found(
self.interner.intern_fragment(&ringbuffer_id.name),
namespace.name(),
ringbuffer_id.name.text()
));
};
let namespace_id = if let Some(n) = ringbuffer_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedRingBuffer::new(
self.interner.intern_fragment(&ringbuffer_id.name),
resolved_namespace,
ringbuffer_def,
);
stack.push(PhysicalPlan::DeleteRingBuffer(DeleteRingBufferNode {
input,
target,
returning: delete.returning,
}))
}
LogicalPlan::InsertTable(insert) => {
let input = self
.compile(rx, once(BumpBox::into_inner(insert.source)))?
.expect("Insert source must produce a plan");
let table = insert.target;
let ns_segments: Vec<&str> = table.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = table
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(table_def) = self.catalog.find_table_by_name(
rx,
namespace.id(),
table.name.text(),
)?
else {
return_error!(table_not_found(
self.interner.intern_fragment(&table.name),
namespace.name(),
table.name.text()
));
};
let namespace_id = if let Some(n) = table.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedTable::new(
self.interner.intern_fragment(&table.name),
resolved_namespace,
table_def,
);
stack.push(PhysicalPlan::InsertTable(InsertTableNode {
input: self.bump_box(input),
target,
returning: insert.returning,
}))
}
LogicalPlan::InsertRingBuffer(insert_rb) => {
let input = self
.compile(rx, once(BumpBox::into_inner(insert_rb.source)))?
.expect("Insert source must produce a plan");
let ringbuffer_id = insert_rb.target;
let ns_segments: Vec<&str> =
ringbuffer_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = ringbuffer_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(ringbuffer_def) = self.catalog.find_ringbuffer_by_name(
rx,
namespace.id(),
ringbuffer_id.name.text(),
)?
else {
return_error!(ringbuffer_not_found(
self.interner.intern_fragment(&ringbuffer_id.name),
namespace.name(),
ringbuffer_id.name.text()
));
};
let namespace_id = if let Some(n) = ringbuffer_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedRingBuffer::new(
self.interner.intern_fragment(&ringbuffer_id.name),
resolved_namespace,
ringbuffer_def,
);
stack.push(PhysicalPlan::InsertRingBuffer(InsertRingBufferNode {
input: self.bump_box(input),
target,
returning: insert_rb.returning,
}))
}
LogicalPlan::InsertDictionary(insert_dict) => {
let input = self
.compile(rx, once(BumpBox::into_inner(insert_dict.source)))?
.expect("Insert source must produce a plan");
let dictionary_id = insert_dict.target;
let ns_segments: Vec<&str> =
dictionary_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = dictionary_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(dictionary_def) = self.catalog.find_dictionary_by_name(
rx,
namespace.id(),
dictionary_id.name.text(),
)?
else {
return_error!(dictionary_not_found(
self.interner.intern_fragment(&dictionary_id.name),
namespace.name(),
dictionary_id.name.text()
));
};
let namespace_id = if let Some(n) = dictionary_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedDictionary::new(
self.interner.intern_fragment(&dictionary_id.name),
resolved_namespace,
dictionary_def,
);
stack.push(PhysicalPlan::InsertDictionary(InsertDictionaryNode {
input: self.bump_box(input),
target,
returning: insert_dict.returning,
}))
}
LogicalPlan::InsertSeries(insert_series) => {
let input = self
.compile(rx, once(BumpBox::into_inner(insert_series.source)))?
.expect("Insert source must produce a plan");
let series_id = insert_series.target;
let ns_segments: Vec<&str> =
series_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = series_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(series_def) = self.catalog.find_series_by_name(
rx,
namespace.id(),
series_id.name.text(),
)?
else {
return_error!(series_not_found(
self.interner.intern_fragment(&series_id.name),
namespace.name(),
series_id.name.text()
));
};
let namespace_id = if let Some(n) = series_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedSeries::new(
self.interner.intern_fragment(&series_id.name),
resolved_namespace,
series_def,
);
stack.push(PhysicalPlan::InsertSeries(InsertSeriesNode {
input: self.bump_box(input),
target,
returning: insert_series.returning,
}))
}
LogicalPlan::DeleteSeries(delete_series) => {
let input = if let Some(delete_input) = delete_series.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(delete_input)))?
.expect("Delete input must produce a plan");
Some(self.bump_box(sub_plan))
} else {
stack.pop().map(|i| self.bump_box(i))
};
let series_id = delete_series.target;
let ns_segments: Vec<&str> =
series_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = series_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(series_def) = self.catalog.find_series_by_name(
rx,
namespace.id(),
series_id.name.text(),
)?
else {
return_error!(series_not_found(
self.interner.intern_fragment(&series_id.name),
namespace.name(),
series_id.name.text()
));
};
let namespace_id = if let Some(n) = series_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedSeries::new(
self.interner.intern_fragment(&series_id.name),
resolved_namespace,
series_def,
);
stack.push(PhysicalPlan::DeleteSeries(DeleteSeriesNode {
input,
target,
returning: delete_series.returning,
}))
}
LogicalPlan::Update(update) => {
let input = if let Some(update_input) = update.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(update_input)))?
.expect("Update input must produce a plan");
self.bump_box(sub_plan)
} else {
self.bump_box(stack.pop().expect("Update requires input"))
};
let target = if let Some(table_id) = update.target {
let ns_segments: Vec<&str> =
table_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = table_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(table_def) = self.catalog.find_table_by_name(
rx,
namespace.id(),
table_id.name.text(),
)?
else {
return_error!(table_not_found(
self.interner.intern_fragment(&table_id.name),
namespace.name(),
table_id.name.text()
));
};
let namespace_id = if let Some(n) = table_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace =
ResolvedNamespace::new(namespace_id, namespace);
Some(ResolvedTable::new(
self.interner.intern_fragment(&table_id.name),
resolved_namespace,
table_def,
))
} else {
None
};
stack.push(PhysicalPlan::Update(UpdateTableNode {
input,
target,
returning: update.returning,
}))
}
LogicalPlan::UpdateRingBuffer(update_rb) => {
let input = if let Some(update_input) = update_rb.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(update_input)))?
.expect("UpdateRingBuffer input must produce a plan");
self.bump_box(sub_plan)
} else {
self.bump_box(stack.pop().expect("UpdateRingBuffer requires input"))
};
let ringbuffer_id = update_rb.target;
let ns_segments: Vec<&str> =
ringbuffer_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = ringbuffer_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(ringbuffer_def) = self.catalog.find_ringbuffer_by_name(
rx,
namespace.id(),
ringbuffer_id.name.text(),
)?
else {
return_error!(ringbuffer_not_found(
self.interner.intern_fragment(&ringbuffer_id.name),
namespace.name(),
ringbuffer_id.name.text()
));
};
let namespace_id = if let Some(n) = ringbuffer_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedRingBuffer::new(
self.interner.intern_fragment(&ringbuffer_id.name),
resolved_namespace,
ringbuffer_def,
);
stack.push(PhysicalPlan::UpdateRingBuffer(UpdateRingBufferNode {
input,
target,
returning: update_rb.returning,
}))
}
LogicalPlan::UpdateSeries(update_series) => {
let input = if let Some(update_input) = update_series.input {
let sub_plan = self
.compile(rx, once(BumpBox::into_inner(update_input)))?
.expect("UpdateSeries input must produce a plan");
self.bump_box(sub_plan)
} else {
self.bump_box(stack.pop().expect("UpdateSeries requires input"))
};
let series_id = update_series.target;
let ns_segments: Vec<&str> =
series_id.namespace.iter().map(|n| n.text()).collect();
let Some(namespace) =
self.catalog.find_namespace_by_segments(rx, &ns_segments)?
else {
let ns_name = ns_segments.join("::");
let fragment = series_id
.namespace
.first()
.map(|n| self.interner.intern_fragment(n))
.unwrap_or_else(|| Fragment::internal(&ns_name));
return_error!(namespace_not_found(fragment, &ns_name));
};
let Some(series_def) = self.catalog.find_series_by_name(
rx,
namespace.id(),
series_id.name.text(),
)?
else {
return_error!(series_not_found(
self.interner.intern_fragment(&series_id.name),
namespace.name(),
series_id.name.text()
));
};
let namespace_id = if let Some(n) = series_id.namespace.first() {
let interned = self.interner.intern_fragment(n);
interned.with_text(namespace.name())
} else {
Fragment::internal(namespace.name().to_string())
};
let resolved_namespace = ResolvedNamespace::new(namespace_id, namespace);
let target = ResolvedSeries::new(
self.interner.intern_fragment(&series_id.name),
resolved_namespace,
series_def,
);
stack.push(PhysicalPlan::UpdateSeries(UpdateSeriesNode {
input,
target,
returning: update_series.returning,
}))
}
LogicalPlan::JoinInner(join) => {
let left = stack.pop().unwrap(); let right = self.compile(rx, join.with)?.unwrap();
if let (PhysicalPlan::RemoteScan(l), PhysicalPlan::RemoteScan(r)) =
(&left, &right)
{
if l.address == r.address {
let mut pushed = l.clone();
pushed.remote_rql =
format!("{} | {}", pushed.remote_rql, join.rql);
pushed.variables.extend(r.variables.iter().cloned());
stack.push(PhysicalPlan::RemoteScan(pushed));
continue;
}
}
let alias = join.alias.map(|a| self.interner.intern_fragment(&a));
stack.push(PhysicalPlan::JoinInner(JoinInnerNode {
left: self.bump_box(left),
right: self.bump_box(right),
on: join.on,
alias,
}));
}
LogicalPlan::JoinLeft(join) => {
let left = stack.pop().unwrap(); let right = self.compile(rx, join.with)?.unwrap();
if let (PhysicalPlan::RemoteScan(l), PhysicalPlan::RemoteScan(r)) =
(&left, &right)
{
if l.address == r.address {
let mut pushed = l.clone();
pushed.remote_rql =
format!("{} | {}", pushed.remote_rql, join.rql);
pushed.variables.extend(r.variables.iter().cloned());
stack.push(PhysicalPlan::RemoteScan(pushed));
continue;
}
}
let alias = join.alias.map(|a| self.interner.intern_fragment(&a));
stack.push(PhysicalPlan::JoinLeft(JoinLeftNode {
left: self.bump_box(left),
right: self.bump_box(right),
on: join.on,
alias,
}));
}
LogicalPlan::JoinNatural(join) => {
let left = stack.pop().unwrap(); let right = self.compile(rx, join.with)?.unwrap();
if let (PhysicalPlan::RemoteScan(l), PhysicalPlan::RemoteScan(r)) =
(&left, &right)
{
if l.address == r.address {
let mut pushed = l.clone();
pushed.remote_rql =
format!("{} | {}", pushed.remote_rql, join.rql);
pushed.variables.extend(r.variables.iter().cloned());
stack.push(PhysicalPlan::RemoteScan(pushed));
continue;
}
}
let alias = join.alias.map(|a| self.interner.intern_fragment(&a));
stack.push(PhysicalPlan::JoinNatural(JoinNaturalNode {
left: self.bump_box(left),
right: self.bump_box(right),
join_type: join.join_type,
alias,
}));
}
LogicalPlan::Order(order) => {
let input = stack.pop().unwrap();
if let Some(pushed) = try_remote_push_down(&input, || Some(order.rql.clone())) {
stack.push(pushed);
continue;
}
stack.push(PhysicalPlan::Sort(SortNode {
by: order.by,
input: self.bump_box(input),
}));
}
LogicalPlan::Distinct(distinct) => {
let input = stack.pop().unwrap();
if let Some(pushed) =
try_remote_push_down(&input, || Some(distinct.rql.clone()))
{
stack.push(pushed);
continue;
}
let mut resolved_columns = Vec::with_capacity(distinct.columns.len());
for col in distinct.columns {
let namespace = ResolvedNamespace::new(
Fragment::internal("_context"),
Namespace::Local {
id: NamespaceId::SYSTEM,
name: "_context".to_string(),
local_name: "_context".to_string(),
parent_id: NamespaceId::ROOT,
},
);
let table_def = Table {
id: TableId(1),
namespace: NamespaceId::SYSTEM,
name: "_context".to_string(),
columns: vec![],
primary_key: None,
};
let resolved_table = ResolvedTable::new(
Fragment::internal("_context"),
namespace,
table_def,
);
let resolved_source = ResolvedSchema::Table(resolved_table);
let column_def = Column {
id: ColumnId(1),
name: col.name.text().to_string(),
constraint: TypeConstraint::unconstrained(Type::Utf8),
properties: vec![],
index: ColumnIndex(0),
auto_increment: false,
dictionary_id: None,
};
resolved_columns.push(ResolvedColumn::new(
self.interner.intern_fragment(&col.name),
resolved_source,
column_def,
));
}
stack.push(PhysicalPlan::Distinct(DistinctNode {
columns: resolved_columns,
input: self.bump_box(input),
}));
}
LogicalPlan::Map(map) => {
let input = stack.pop().map(|p| self.bump_box(p));
if let Some(ref boxed_input) = input {
let mut vars = Vec::new();
for expr in &map.map {
vars.extend(extract_variable_names(expr));
}
if let Some(pushed) = try_remote_push_down_with_vars(
boxed_input,
|| Some(map.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
}
stack.push(PhysicalPlan::Map(MapNode {
map: map.map,
input,
}));
}
LogicalPlan::Extend(extend) => {
let input = stack.pop().map(|p| self.bump_box(p));
if let Some(ref boxed_input) = input {
let mut vars = Vec::new();
for expr in &extend.extend {
vars.extend(extract_variable_names(expr));
}
if let Some(pushed) = try_remote_push_down_with_vars(
boxed_input,
|| Some(extend.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
}
stack.push(PhysicalPlan::Extend(ExtendNode {
extend: extend.extend,
input,
}));
}
LogicalPlan::Patch(patch) => {
let input = stack.pop().map(|p| self.bump_box(p));
if let Some(ref boxed_input) = input {
let mut vars = Vec::new();
for expr in &patch.assignments {
vars.extend(extract_variable_names(expr));
}
if let Some(pushed) = try_remote_push_down_with_vars(
boxed_input,
|| Some(patch.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
}
stack.push(PhysicalPlan::Patch(PatchNode {
assignments: patch.assignments,
input,
}));
}
LogicalPlan::Apply(apply) => {
let input = stack.pop().map(|p| self.bump_box(p));
if let Some(ref boxed_input) = input {
let mut vars = Vec::new();
for expr in &apply.arguments {
vars.extend(extract_variable_names(expr));
}
if let Some(pushed) = try_remote_push_down_with_vars(
boxed_input,
|| Some(apply.rql.clone()),
vars,
) {
stack.push(pushed);
continue;
}
}
stack.push(PhysicalPlan::Apply(ApplyNode {
operator: self.interner.intern_fragment(&apply.operator),
expressions: apply.arguments,
input,
}));
}
LogicalPlan::PrimitiveScan(scan) => match &scan.source {
ResolvedSchema::Table(resolved_table) => {
if let Some(index) = &scan.index {
stack.push(PhysicalPlan::IndexScan(IndexScanNode {
source: resolved_table.clone(),
index_name: index.identifier().text().to_string(),
}));
} else {
stack.push(PhysicalPlan::TableScan(TableScanNode {
source: resolved_table.clone(),
}));
}
}
ResolvedSchema::View(resolved_view) => {
if scan.index.is_some() {
unimplemented!("views do not support indexes yet");
}
stack.push(PhysicalPlan::ViewScan(ViewScanNode {
source: resolved_view.clone(),
}));
}
ResolvedSchema::DeferredView(resolved_view) => {
if scan.index.is_some() {
unimplemented!("views do not support indexes yet");
}
let view = ResolvedView::new(
resolved_view.identifier().clone(),
resolved_view.namespace().clone(),
resolved_view.def().clone(),
);
stack.push(PhysicalPlan::ViewScan(ViewScanNode {
source: view,
}));
}
ResolvedSchema::TransactionalView(resolved_view) => {
if scan.index.is_some() {
unimplemented!("views do not support indexes yet");
}
let view = ResolvedView::new(
resolved_view.identifier().clone(),
resolved_view.namespace().clone(),
resolved_view.def().clone(),
);
stack.push(PhysicalPlan::ViewScan(ViewScanNode {
source: view,
}));
}
ResolvedSchema::TableVirtual(resolved_virtual) => {
if scan.index.is_some() {
unimplemented!("virtual tables do not support indexes yet");
}
stack.push(PhysicalPlan::TableVirtualScan(TableVirtualScanNode {
source: resolved_virtual.clone(),
pushdown_context: None,
}));
}
ResolvedSchema::RingBuffer(resolved_ringbuffer) => {
if scan.index.is_some() {
unimplemented!("ring buffers do not support indexes yet");
}
stack.push(PhysicalPlan::RingBufferScan(RingBufferScanNode {
source: resolved_ringbuffer.clone(),
}));
}
ResolvedSchema::Dictionary(resolved_dictionary) => {
if scan.index.is_some() {
unimplemented!("dictionaries do not support indexes");
}
stack.push(PhysicalPlan::DictionaryScan(DictionaryScanNode {
source: resolved_dictionary.clone(),
}));
}
ResolvedSchema::Series(resolved_series) => {
if scan.index.is_some() {
unimplemented!("series do not support indexes");
}
stack.push(PhysicalPlan::SeriesScan(SeriesScanNode {
source: resolved_series.clone(),
key_range_start: None,
key_range_end: None,
variant_tag: None,
}));
}
},
LogicalPlan::RemoteScan(scan) => {
stack.push(PhysicalPlan::RemoteScan(nodes::RemoteScanNode {
address: scan.address,
token: scan.token,
remote_rql: format!(
"FROM {}::{}",
scan.local_namespace, scan.remote_name
),
local_namespace: scan.local_namespace,
remote_name: scan.remote_name,
variables: Vec::new(),
}));
}
LogicalPlan::Take(take) => {
let input = stack.pop().unwrap();
if let Some(pushed) =
try_remote_push_down(&input, || Some(format!("TAKE {}", take.take)))
{
stack.push(pushed);
continue;
}
stack.push(PhysicalPlan::Take(TakeNode {
take: take.take,
input: self.bump_box(input),
}));
}
LogicalPlan::Window(window) => {
let input = stack.pop().map(|p| self.bump_box(p));
stack.push(PhysicalPlan::Window(WindowNode {
kind: window.kind,
group_by: window.group_by,
aggregations: window.aggregations,
ts: window.ts,
input,
}));
}
LogicalPlan::Pipeline(pipeline) => {
let pipeline_result = self.compile(rx, pipeline.steps)?;
if let Some(result) = pipeline_result {
stack.push(result);
}
}
LogicalPlan::Declare(declare_node) => {
let value = match declare_node.value {
logical::LetValue::Expression(expr) => LetValue::Expression(expr),
logical::LetValue::Statement(logical_plans) => {
let mut last_plan = None;
for logical_plan in logical_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
last_plan = Some(physical_plan);
}
}
match last_plan {
Some(plan) => LetValue::Statement(self.bump_box(plan)),
None => LetValue::Expression(Constant(
ConstantExpression::None {
fragment: Fragment::internal("none"),
},
)),
}
}
logical::LetValue::EmptyFrame => LetValue::EmptyFrame,
};
stack.push(PhysicalPlan::Declare(DeclareNode {
name: self.interner.intern_fragment(&declare_node.name),
value,
}));
}
LogicalPlan::Assign(assign_node) => {
let value = match assign_node.value {
logical::AssignValue::Expression(expr) => AssignValue::Expression(expr),
logical::AssignValue::Statement(logical_plans) => {
let mut last_plan = None;
for logical_plan in logical_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
last_plan = Some(physical_plan);
}
}
match last_plan {
Some(plan) => {
AssignValue::Statement(self.bump_box(plan))
}
None => AssignValue::Expression(Constant(
ConstantExpression::None {
fragment: Fragment::internal("none"),
},
)),
}
}
};
stack.push(PhysicalPlan::Assign(AssignNode {
name: self.interner.intern_fragment(&assign_node.name),
value,
}));
}
LogicalPlan::Append(append_node) => match append_node {
logical::AppendNode::IntoVariable {
target,
source,
} => {
let source = match source {
logical::AppendSourcePlan::Statement(logical_plans) => {
let mut physical_plans = Vec::new();
for logical_plan in logical_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
physical_plans.push(physical_plan);
}
}
AppendPhysicalSource::Statement(physical_plans)
}
logical::AppendSourcePlan::Inline(inline) => {
AppendPhysicalSource::Inline(InlineDataNode {
rows: inline.rows,
})
}
};
stack.push(PhysicalPlan::Append(AppendPhysicalNode::IntoVariable {
target: self.interner.intern_fragment(&target),
source,
}));
}
logical::AppendNode::Query {
with,
} => {
let left = stack.pop().unwrap();
let right = self.compile(rx, with)?.unwrap();
stack.push(PhysicalPlan::Append(AppendPhysicalNode::Query {
left: self.bump_box(left),
right: self.bump_box(right),
}));
}
},
LogicalPlan::VariableSource(source) => {
let variable_expr = VariableExpression {
fragment: self.interner.intern_fragment(&source.name),
};
stack.push(PhysicalPlan::Variable(VariableNode {
variable_expr,
}));
}
LogicalPlan::Environment(_) => {
stack.push(PhysicalPlan::Environment(EnvironmentNode {}));
}
LogicalPlan::Conditional(conditional_node) => {
let then_branch = if let Some(then_plan) = self
.compile(rx, once(BumpBox::into_inner(conditional_node.then_branch)))?
{
self.bump_box(then_plan)
} else {
return Err(RqlError::InternalFunctionError {
name: "compile_physical".to_string(),
fragment: Fragment::internal("compile_physical"),
details: "Failed to compile conditional then branch"
.to_string(),
}
.into());
};
let mut else_ifs = Vec::new();
for else_if in conditional_node.else_ifs {
let condition = else_if.condition;
let then_branch = if let Some(plan) = self
.compile(rx, once(BumpBox::into_inner(else_if.then_branch)))?
{
self.bump_box(plan)
} else {
return Err(RqlError::InternalFunctionError {
name: "compile_physical".to_string(),
fragment: Fragment::internal("compile_physical"),
details: "Failed to compile conditional else if branch"
.to_string(),
}
.into());
};
else_ifs.push(ElseIfBranch {
condition,
then_branch,
});
}
let else_branch =
if let Some(else_logical) = conditional_node.else_branch {
if let Some(plan) = self
.compile(rx, once(BumpBox::into_inner(else_logical)))?
{
Some(self.bump_box(plan))
} else {
return Err(RqlError::InternalFunctionError {
name: "compile_physical".to_string(),
fragment: Fragment::internal("compile_physical"),
details: "Failed to compile conditional else branch".to_string(),
}.into());
}
} else {
None
};
stack.push(PhysicalPlan::Conditional(ConditionalNode {
condition: conditional_node.condition,
then_branch,
else_ifs,
else_branch,
}));
}
LogicalPlan::Scalarize(scalarize_node) => {
let input_plan = if let Some(plan) =
self.compile(rx, once(BumpBox::into_inner(scalarize_node.input)))?
{
self.bump_box(plan)
} else {
return Err(RqlError::InternalFunctionError {
name: "compile_physical".to_string(),
fragment: Fragment::internal("compile_physical"),
details: "Failed to compile scalarize input".to_string(),
}
.into());
};
stack.push(PhysicalPlan::Scalarize(ScalarizeNode {
input: input_plan,
fragment: self.interner.intern_fragment(&scalarize_node.fragment),
}));
}
LogicalPlan::Loop(loop_node) => {
let mut body = Vec::new();
for statement_plans in loop_node.body {
for logical_plan in statement_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
body.push(physical_plan);
}
}
}
stack.push(PhysicalPlan::Loop(LoopNode {
body,
}));
}
LogicalPlan::While(while_node) => {
let mut body = Vec::new();
for statement_plans in while_node.body {
for logical_plan in statement_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
body.push(physical_plan);
}
}
}
stack.push(PhysicalPlan::While(WhileNode {
condition: while_node.condition,
body,
}));
}
LogicalPlan::For(for_node) => {
let iterable = self
.compile(rx, for_node.iterable)?
.expect("For iterable must produce a plan");
let mut body = Vec::new();
for statement_plans in for_node.body {
for logical_plan in statement_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
body.push(physical_plan);
}
}
}
stack.push(PhysicalPlan::For(ForNode {
variable_name: self.interner.intern_fragment(&for_node.variable_name),
iterable: self.bump_box(iterable),
body,
}));
}
LogicalPlan::Break => {
stack.push(PhysicalPlan::Break);
}
LogicalPlan::Continue => {
stack.push(PhysicalPlan::Continue);
}
LogicalPlan::DefineFunction(def_node) => {
let mut parameters = Vec::with_capacity(def_node.parameters.len());
for p in def_node.parameters {
parameters.push(nodes::FunctionParameter {
name: self.interner.intern_fragment(&p.name),
type_constraint: p.type_constraint,
});
}
let mut body = Vec::new();
for statement_plans in def_node.body {
for logical_plan in statement_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
body.push(physical_plan);
}
}
}
stack.push(PhysicalPlan::DefineFunction(DefineFunctionNode {
name: self.interner.intern_fragment(&def_node.name),
parameters,
return_type: def_node.return_type,
body,
}));
}
LogicalPlan::Return(ret_node) => {
stack.push(PhysicalPlan::Return(ReturnNode {
value: ret_node.value,
}));
}
LogicalPlan::CallFunction(call_node) => {
stack.push(PhysicalPlan::CallFunction(CallFunctionNode {
name: self.interner.intern_fragment(&call_node.name),
arguments: call_node.arguments,
is_procedure_call: call_node.is_procedure_call,
}));
}
LogicalPlan::DefineClosure(closure_node) => {
let mut parameters = Vec::with_capacity(closure_node.parameters.len());
for p in closure_node.parameters {
parameters.push(nodes::FunctionParameter {
name: self.interner.intern_fragment(&p.name),
type_constraint: p.type_constraint,
});
}
let mut body = Vec::new();
for statement_plans in closure_node.body {
for logical_plan in statement_plans {
if let Some(physical_plan) =
self.compile(rx, once(logical_plan))?
{
body.push(physical_plan);
}
}
}
stack.push(PhysicalPlan::DefineClosure(DefineClosureNode {
parameters,
body,
}));
}
_ => unimplemented!(),
}
}
if stack.is_empty() {
return Ok(None);
}
if stack.len() != 1 {
return Err(RqlError::MissingSemicolon {
fragment: Fragment::internal("compile_physical"),
count: stack.len(),
}
.into());
}
Ok(Some(stack.pop().unwrap()))
}
}
fn try_remote_push_down<'a>(
input: &PhysicalPlan<'a>,
rql_suffix: impl FnOnce() -> Option<String>,
) -> Option<PhysicalPlan<'a>> {
try_remote_push_down_with_vars(input, rql_suffix, Vec::new())
}
fn try_remote_push_down_with_vars<'a>(
input: &PhysicalPlan<'a>,
rql_suffix: impl FnOnce() -> Option<String>,
variables: Vec<String>,
) -> Option<PhysicalPlan<'a>> {
if let PhysicalPlan::RemoteScan(remote) = input {
if let Some(suffix) = rql_suffix() {
let mut pushed = remote.clone();
pushed.remote_rql = format!("{} | {}", pushed.remote_rql, suffix);
for var in variables {
if !pushed.variables.contains(&var) {
pushed.variables.push(var);
}
}
return Some(PhysicalPlan::RemoteScan(pushed));
}
}
None
}