Skip to main content

reifydb_engine/bulk_insert/
coerce.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Column coercion for bulk inserts.
5
6use reifydb_core::{
7	interface::catalog::column::Column,
8	value::column::{columns::Columns, data::ColumnData},
9};
10use reifydb_routine::function::registry::Functions;
11use reifydb_runtime::context::{RuntimeContext, clock::Clock};
12use reifydb_type::{fragment::Fragment, params::Params, value::identity::IdentityId};
13
14use crate::{
15	Result,
16	expression::{cast::cast_column_data, context::EvalSession},
17	vm::stack::SymbolTable,
18};
19
20/// Coerce each column's data to the target type in batch.
21pub(super) fn coerce_columns(
22	column_data: &[ColumnData],
23	columns: &[Column],
24	num_rows: usize,
25) -> Result<Vec<ColumnData>> {
26	let runtime_ctx = RuntimeContext::with_clock(Clock::Real);
27	let session = EvalSession {
28		params: &Params::None,
29		symbols: &SymbolTable::new(),
30		functions: &Functions::empty(),
31		runtime_context: &runtime_ctx,
32		arena: None,
33		identity: IdentityId::root(),
34		is_aggregate_context: false,
35	};
36	let ctx = session.eval(Columns::empty(), num_rows);
37
38	let mut coerced_columns: Vec<ColumnData> = Vec::with_capacity(columns.len());
39
40	for (col_idx, col) in columns.iter().enumerate() {
41		let target = col.constraint.get_type();
42		// For Option(T) columns, cast to the inner type T; None values pass through unchanged
43		let cast_target = target.inner_type().clone();
44		let source_data = &column_data[col_idx];
45
46		let coerced = cast_column_data(&ctx, source_data, cast_target, || Fragment::internal(&col.name))?;
47		coerced_columns.push(coerced);
48	}
49
50	Ok(coerced_columns)
51}