reifydb-rql 0.4.12

ReifyDB Query Language (RQL) parser and AST
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_transaction::transaction::Transaction;

use crate::{
	Result,
	ast::{
		ast::{Ast, AstFrom, AstPatch, AstUpdate},
		identifier::{
			MaybeQualifiedRingBufferIdentifier, MaybeQualifiedSeriesIdentifier,
			MaybeQualifiedTableIdentifier,
		},
	},
	bump::{BumpBox, BumpVec},
	expression::ExpressionCompiler,
	plan::logical::{
		Compiler, FilterNode, LogicalPlan, PipelineNode, UpdateRingBufferNode, UpdateSeriesNode,
		UpdateTableNode,
	},
};

impl<'bump> Compiler<'bump> {
	pub(crate) fn compile_update(
		&self,
		ast: AstUpdate<'bump>,
		tx: &mut Transaction<'_>,
	) -> Result<LogicalPlan<'bump>> {
		let returning = if let Some(returning_asts) = ast.returning {
			let mut exprs = Vec::with_capacity(returning_asts.len());
			for ast_node in returning_asts {
				exprs.push(ExpressionCompiler::compile(ast_node)?);
			}
			Some(exprs)
		} else {
			None
		};

		// Build internal pipeline: FROM -> FILTER -> MAP

		// 1. Create FROM scan from target
		let from_ast = AstFrom::Source {
			token: ast.token,
			source: ast.target.clone(),
			index_name: None,
		};
		let from_plan = self.compile_from(from_ast, tx)?;

		// 2. Create FILTER node from the filter clause
		let filter_ast = match BumpBox::into_inner(ast.filter) {
			Ast::Filter(f) => f,
			_ => unreachable!("filter should always be Ast::Filter"),
		};
		let filter_plan = LogicalPlan::Filter(FilterNode {
			condition: ExpressionCompiler::compile(BumpBox::into_inner(filter_ast.node))?,
			rql: filter_ast.rql.to_string(),
		});

		// 3. Create PATCH node from assignments (merges with original row)
		let patch_ast = AstPatch {
			token: ast.token,
			assignments: ast.assignments,
			rql: "",
		};
		let patch_plan = self.compile_patch(patch_ast)?;

		// 4. Build pipeline: FROM -> FILTER -> [TAKE] -> PATCH
		let take_plan = if let Some(take_box) = ast.take {
			let take_ast = match BumpBox::into_inner(take_box) {
				Ast::Take(t) => t,
				_ => unreachable!("take should always be Ast::Take"),
			};
			Some(self.compile_take(take_ast)?)
		} else {
			None
		};

		let capacity = if take_plan.is_some() {
			4
		} else {
			3
		};
		let mut steps = BumpVec::with_capacity_in(capacity, self.bump);
		steps.push(from_plan);
		steps.push(filter_plan);
		if let Some(take) = take_plan {
			steps.push(take);
		}
		steps.push(patch_plan);
		let pipeline = LogicalPlan::Pipeline(PipelineNode {
			steps,
		});

		// 5. Wrap in UPDATE node
		// Check in the catalog whether the target is a table or ring buffer
		let target_name = ast.target.name.text();
		let name = ast.target.name;
		let namespace = ast.target.namespace;
		let ns_segments: Vec<&str> = namespace.iter().map(|n| n.text()).collect();

		// Try to find namespace
		let namespace_id = if let Some(ns) = self.catalog.find_namespace_by_segments(tx, &ns_segments)? {
			ns.id()
		} else {
			// If namespace doesn't exist, default to table (will error during physical plan)
			let mut target = MaybeQualifiedTableIdentifier::new(name);
			if !namespace.is_empty() {
				target = target.with_namespace(namespace);
			}
			return Ok(LogicalPlan::Update(UpdateTableNode {
				target: Some(target),
				input: Some(BumpBox::new_in(pipeline, self.bump)),
				returning,
			}));
		};

		// Check if it's a ring buffer first
		if self.catalog.find_ringbuffer_by_name(tx, namespace_id, target_name)?.is_some() {
			let mut target = MaybeQualifiedRingBufferIdentifier::new(name);
			if !namespace.is_empty() {
				target = target.with_namespace(namespace);
			}
			Ok(LogicalPlan::UpdateRingBuffer(UpdateRingBufferNode {
				target,
				input: Some(BumpBox::new_in(pipeline, self.bump)),
				returning,
			}))
		} else if self.catalog.find_series_by_name(tx, namespace_id, target_name)?.is_some() {
			let mut target = MaybeQualifiedSeriesIdentifier::new(name);
			if !namespace.is_empty() {
				target = target.with_namespace(namespace);
			}
			Ok(LogicalPlan::UpdateSeries(UpdateSeriesNode {
				target,
				input: Some(BumpBox::new_in(pipeline, self.bump)),
				returning,
			}))
		} else {
			// Assume it's a table (will error during physical plan if not found)
			let mut target = MaybeQualifiedTableIdentifier::new(name);
			if !namespace.is_empty() {
				target = target.with_namespace(namespace);
			}
			Ok(LogicalPlan::Update(UpdateTableNode {
				target: Some(target),
				input: Some(BumpBox::new_in(pipeline, self.bump)),
				returning,
			}))
		}
	}
}