use std::sync::Arc;
use reifydb_core::{
encoded::{row::EncodedRow, shape::RowShape},
value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_rql::expression::Expression;
use reifydb_type::{
fragment::Fragment,
params::Params,
util::cowvec::CowVec,
value::{datetime::DateTime, identity::IdentityId, row_number::RowNumber},
};
use crate::{
Result,
expression::{
compile::{CompiledExpr, compile_expression},
context::{CompileContext, EvalContext},
},
vm::{services::Services, stack::SymbolTable},
};
/// Decodes a batch of encoded rows into a columnar [`Columns`] value.
///
/// One output column is created per field of `shape`, named after the field and
/// allocated with the type reported by the field's constraint. For every row the
/// row number and the created/updated timestamps (read as nanoseconds from the
/// encoded row) are collected alongside the decoded field values.
pub(crate) fn decode_rows_to_columns(shape: &RowShape, rows: &[(RowNumber, EncodedRow)]) -> Columns {
    let row_total = rows.len();
    let fields = shape.fields();

    // Pre-size every column for the full batch so pushes below do not regrow.
    let mut data_columns: Vec<Column> = fields
        .iter()
        .map(|field| Column {
            name: Fragment::internal(&field.name),
            data: ColumnData::with_capacity(field.constraint.get_type(), row_total),
        })
        .collect();

    let mut numbers = Vec::with_capacity(row_total);
    let mut created = Vec::with_capacity(row_total);
    let mut updated = Vec::with_capacity(row_total);

    for (number, row) in rows {
        numbers.push(*number);
        created.push(DateTime::from_nanos(row.created_at_nanos()));
        updated.push(DateTime::from_nanos(row.updated_at_nanos()));

        // Column `index` corresponds to field `index` of the shape.
        for (index, column) in data_columns.iter_mut().enumerate() {
            column.data.push_value(shape.get_value(row, index));
        }
    }

    Columns {
        row_numbers: CowVec::new(numbers),
        created_at: CowVec::new(created),
        updated_at: CowVec::new(updated),
        columns: CowVec::new(data_columns),
    }
}
/// Fast path for expression lists made up solely of plain column references.
///
/// Returns `None` as soon as an expression is not an `Expression::Column`, or
/// when a referenced column name is not found in `input`. Otherwise the matching
/// input columns are cloned in expression order. The input's row numbers and
/// timestamps are carried over whenever `input.row_numbers` is non-empty.
fn try_column_passthrough(exprs: &[Expression], input: &Columns) -> Option<Columns> {
    let mut picked = Vec::with_capacity(exprs.len());

    for expr in exprs {
        match expr {
            Expression::Column(reference) => {
                let source = input.column(reference.0.name.text())?;
                picked.push(source.clone());
            }
            // Anything other than a bare column reference needs real evaluation.
            _ => return None,
        }
    }

    if input.row_numbers.is_empty() {
        return Some(Columns::new(picked));
    }

    Some(Columns::with_system_columns(
        picked,
        input.row_numbers.to_vec(),
        input.created_at.to_vec(),
        input.updated_at.to_vec(),
    ))
}
/// Evaluates the `returning_exprs` against `input` and returns the resulting columns.
///
/// If every expression is a plain reference to a column present in `input`, the
/// columns are passed through without compiling anything. Otherwise all
/// expressions are compiled first and then executed one by one over the full
/// input. When `input` carries row numbers, they and the created/updated
/// timestamps are attached to the result.
///
/// # Errors
///
/// Returns an error if any expression fails to compile or to execute. Compile
/// failures are propagated to the caller rather than panicking.
pub(crate) fn evaluate_returning(
    services: &Arc<Services>,
    symbols: &SymbolTable,
    returning_exprs: &[Expression],
    input: Columns,
) -> Result<Columns> {
    if let Some(columns) = try_column_passthrough(returning_exprs, &input) {
        return Ok(columns);
    }

    let compile_ctx = CompileContext {
        functions: &services.functions,
        symbols,
    };

    // Compile everything up front; the first compile error aborts before any
    // expression is executed.
    let compiled = returning_exprs
        .iter()
        .map(|expr| compile_expression(&compile_ctx, expr))
        .collect::<std::result::Result<Vec<CompiledExpr>, _>>()?;

    let row_count = input.row_count();

    // Template context; each expression runs on a context derived from it via
    // `with_eval`, which is handed the real input columns and row count.
    let base = EvalContext {
        params: &Params::None,
        symbols,
        functions: &services.functions,
        runtime_context: &services.runtime_context,
        arena: None,
        identity: IdentityId::root(),
        is_aggregate_context: false,
        columns: Columns::empty(),
        row_count: 1,
        target: None,
        take: None,
    };

    let mut new_columns = Vec::with_capacity(compiled.len());
    for compiled_expr in &compiled {
        let exec_ctx = base.with_eval(input.clone(), row_count);
        new_columns.push(compiled_expr.execute(&exec_ctx)?);
    }

    if input.row_numbers.is_empty() {
        return Ok(Columns::new(new_columns));
    }

    Ok(Columns::with_system_columns(
        new_columns,
        input.row_numbers.to_vec(),
        input.created_at.to_vec(),
        input.updated_at.to_vec(),
    ))
}