reifydb-engine 0.4.12

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::sync::Arc;

use reifydb_core::{
	encoded::{row::EncodedRow, shape::RowShape},
	value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_rql::expression::Expression;
use reifydb_type::{
	fragment::Fragment,
	params::Params,
	util::cowvec::CowVec,
	value::{datetime::DateTime, identity::IdentityId, row_number::RowNumber},
};

use crate::{
	Result,
	expression::{
		compile::{CompiledExpr, compile_expression},
		context::{CompileContext, EvalContext},
	},
	vm::{services::Services, stack::SymbolTable},
};

/// Decode multiple encoded rows into a single Columns structure using the shape.
pub(crate) fn decode_rows_to_columns(shape: &RowShape, rows: &[(RowNumber, EncodedRow)]) -> Columns {
	let fields = shape.fields();

	let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
	for field in fields.iter() {
		columns_vec.push(Column {
			name: Fragment::internal(&field.name),
			data: ColumnData::with_capacity(field.constraint.get_type(), rows.len()),
		});
	}

	let mut row_numbers = Vec::with_capacity(rows.len());
	let mut created_at = Vec::with_capacity(rows.len());
	let mut updated_at = Vec::with_capacity(rows.len());
	for (row_number, encoded) in rows {
		row_numbers.push(*row_number);
		created_at.push(DateTime::from_nanos(encoded.created_at_nanos()));
		updated_at.push(DateTime::from_nanos(encoded.updated_at_nanos()));
		for (i, _) in fields.iter().enumerate() {
			columns_vec[i].data.push_value(shape.get_value(encoded, i));
		}
	}

	Columns {
		row_numbers: CowVec::new(row_numbers),
		created_at: CowVec::new(created_at),
		updated_at: CowVec::new(updated_at),
		columns: CowVec::new(columns_vec),
	}
}

/// If every RETURNING expression is a simple `Expression::Column`, extract
/// those columns from `input` by name and return a new `Columns`.
/// Returns `None` if any expression is not a plain column reference or
/// if a referenced column is missing from `input`.
fn try_column_passthrough(exprs: &[Expression], input: &Columns) -> Option<Columns> {
	let mut cols = Vec::with_capacity(exprs.len());
	for expr in exprs {
		let Expression::Column(col_expr) = expr else {
			return None;
		};
		let name = col_expr.0.name.text();
		let col = input.column(name)?;
		cols.push(col.clone());
	}
	if !input.row_numbers.is_empty() {
		Some(Columns::with_system_columns(
			cols,
			input.row_numbers.to_vec(),
			input.created_at.to_vec(),
			input.updated_at.to_vec(),
		))
	} else {
		Some(Columns::new(cols))
	}
}

/// Evaluate RETURNING expressions against the given columns.
pub(crate) fn evaluate_returning(
	services: &Arc<Services>,
	symbols: &SymbolTable,
	returning_exprs: &[Expression],
	input: Columns,
) -> Result<Columns> {
	if let Some(columns) = try_column_passthrough(returning_exprs, &input) {
		return Ok(columns);
	}

	let compile_ctx = CompileContext {
		functions: &services.functions,
		symbols,
	};

	let compiled: Vec<CompiledExpr> = returning_exprs
		.iter()
		.map(|e| compile_expression(&compile_ctx, e).expect("compile returning expression"))
		.collect();

	let row_count = input.row_count();
	let base = EvalContext {
		params: &Params::None,
		symbols,
		functions: &services.functions,
		runtime_context: &services.runtime_context,
		arena: None,
		identity: IdentityId::root(),
		is_aggregate_context: false,
		columns: Columns::empty(),
		row_count: 1,
		target: None,
		take: None,
	};

	let mut new_columns = Vec::with_capacity(compiled.len());
	for compiled_expr in &compiled {
		let exec_ctx = base.with_eval(input.clone(), row_count);
		let column = compiled_expr.execute(&exec_ctx)?;
		new_columns.push(column);
	}

	if !input.row_numbers.is_empty() {
		Ok(Columns::with_system_columns(
			new_columns,
			input.row_numbers.to_vec(),
			input.created_at.to_vec(),
			input.updated_at.to_vec(),
		))
	} else {
		Ok(Columns::new(new_columns))
	}
}