use reifydb_transaction::transaction::Transaction;
use crate::{
Result,
ast::{
ast::{Ast, AstFrom, AstPatch, AstUpdate},
identifier::{
MaybeQualifiedRingBufferIdentifier, MaybeQualifiedSeriesIdentifier,
MaybeQualifiedTableIdentifier,
},
},
bump::{BumpBox, BumpVec},
expression::ExpressionCompiler,
plan::logical::{
Compiler, FilterNode, LogicalPlan, PipelineNode, UpdateRingBufferNode, UpdateSeriesNode,
UpdateTableNode,
},
};
impl<'bump> Compiler<'bump> {
    /// Compiles an `UPDATE` statement into a logical plan.
    ///
    /// The statement is lowered into an input pipeline whose steps are, in order: the plan
    /// produced by `compile_from` for the target, the filter, the optional take, and the patch
    /// built from the assignments. That pipeline becomes the `input` of an update node which is
    /// selected through the catalog:
    ///
    /// * the namespace segments are not found: `LogicalPlan::Update` (table) is returned
    ///   right away, without any ring buffer or series lookup;
    /// * the target name matches a ring buffer: `LogicalPlan::UpdateRingBuffer`;
    /// * the target name matches a series: `LogicalPlan::UpdateSeries`;
    /// * anything else: `LogicalPlan::Update` (table).
    ///
    /// In every case the identifier only carries a namespace when the statement spelled one out.
    ///
    /// # Errors
    ///
    /// Propagates the first error raised while compiling the `RETURNING` expressions, the
    /// source, the filter condition, the patch or the take, and any error returned by the
    /// catalog lookups.
    ///
    /// # Panics
    ///
    /// Panics when `ast.filter` is not an `Ast::Filter`, or when `ast.take` is present but is
    /// not an `Ast::Take`.
    pub(crate) fn compile_update(
        &self,
        ast: AstUpdate<'bump>,
        tx: &mut Transaction<'_>,
    ) -> Result<LogicalPlan<'bump>> {
        // RETURNING expressions are compiled before anything else in the statement.
        let returning = match ast.returning {
            Some(nodes) => {
                let mut compiled = Vec::with_capacity(nodes.len());
                for node in nodes {
                    compiled.push(ExpressionCompiler::compile(node)?);
                }
                Some(compiled)
            }
            None => None,
        };

        // Step 1: read from the update target, expressed as a synthetic FROM clause.
        let from_plan = self.compile_from(
            AstFrom::Source {
                token: ast.token.clone(),
                source: ast.target.clone(),
                index_name: None,
            },
            tx,
        )?;

        // Step 2: the filter, which is expected to always be an `Ast::Filter`.
        let Ast::Filter(filter_ast) = BumpBox::into_inner(ast.filter) else {
            unreachable!("filter should always be Ast::Filter")
        };
        let condition = ExpressionCompiler::compile(BumpBox::into_inner(filter_ast.node))?;
        let filter_plan = LogicalPlan::Filter(FilterNode {
            condition,
            rql: filter_ast.rql.to_string(),
        });

        // Final step: the assignments, compiled as a patch with an empty rql text.
        let patch_plan = self.compile_patch(AstPatch {
            token: ast.token.clone(),
            assignments: ast.assignments,
            rql: "",
        })?;

        // Optional step: the take, placed between the filter and the patch below.
        let take_plan = ast
            .take
            .map(|take_box| {
                let Ast::Take(take_ast) = BumpBox::into_inner(take_box) else {
                    unreachable!("take should always be Ast::Take")
                };
                self.compile_take(take_ast)
            })
            .transpose()?;

        // Size the step list exactly: from + filter + patch, plus the take when present.
        let step_count = match &take_plan {
            Some(_) => 4,
            None => 3,
        };
        let mut steps = BumpVec::with_capacity_in(step_count, self.bump);
        steps.push(from_plan);
        steps.push(filter_plan);
        if let Some(take) = take_plan {
            steps.push(take);
        }
        steps.push(patch_plan);
        let pipeline = LogicalPlan::Pipeline(PipelineNode {
            steps,
        });

        // The name text is read before the identifier parts are moved out of the AST.
        let target_name = ast.target.name.text();
        let name = ast.target.name;
        let namespace = ast.target.namespace;
        let ns_segments: Vec<&str> = namespace.iter().map(|segment| segment.text()).collect();

        let namespace_id = match self.catalog.find_namespace_by_segments(tx, &ns_segments)? {
            Some(found) => found.id(),
            None => {
                // Namespace not found in the catalog: emit a table update node directly.
                let target = if namespace.is_empty() {
                    MaybeQualifiedTableIdentifier::new(name)
                } else {
                    MaybeQualifiedTableIdentifier::new(name).with_namespace(namespace)
                };
                return Ok(LogicalPlan::Update(UpdateTableNode {
                    target: Some(target),
                    input: Some(BumpBox::new_in(pipeline, self.bump)),
                    returning,
                }));
            }
        };

        // Ring buffers are checked first.
        if self.catalog.find_ringbuffer_by_name(tx, namespace_id, target_name)?.is_some() {
            let target = if namespace.is_empty() {
                MaybeQualifiedRingBufferIdentifier::new(name)
            } else {
                MaybeQualifiedRingBufferIdentifier::new(name).with_namespace(namespace)
            };
            return Ok(LogicalPlan::UpdateRingBuffer(UpdateRingBufferNode {
                target,
                input: Some(BumpBox::new_in(pipeline, self.bump)),
                returning,
            }));
        }

        // The series lookup only runs when the target is not a ring buffer.
        if self.catalog.find_series_by_name(tx, namespace_id, target_name)?.is_some() {
            let target = if namespace.is_empty() {
                MaybeQualifiedSeriesIdentifier::new(name)
            } else {
                MaybeQualifiedSeriesIdentifier::new(name).with_namespace(namespace)
            };
            return Ok(LogicalPlan::UpdateSeries(UpdateSeriesNode {
                target,
                input: Some(BumpBox::new_in(pipeline, self.bump)),
                returning,
            }));
        }

        // Neither a ring buffer nor a series: treat the target as a table.
        let target = if namespace.is_empty() {
            MaybeQualifiedTableIdentifier::new(name)
        } else {
            MaybeQualifiedTableIdentifier::new(name).with_namespace(namespace)
        };
        Ok(LogicalPlan::Update(UpdateTableNode {
            target: Some(target),
            input: Some(BumpBox::new_in(pipeline, self.bump)),
            returning,
        }))
    }
}