Skip to main content

reifydb_engine/expression/
lookup.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer};
5use reifydb_rql::expression::ColumnExpression;
6use reifydb_type::value::{
7	Value,
8	blob::Blob,
9	date::Date,
10	datetime::{CREATED_AT_COLUMN_NAME, DateTime, UPDATED_AT_COLUMN_NAME},
11	decimal::Decimal,
12	dictionary::DictionaryEntryId,
13	duration::Duration,
14	identity::IdentityId,
15	int::Int,
16	row_number::ROW_NUMBER_COLUMN_NAME,
17	time::Time,
18	r#type::Type,
19	uint::Uint,
20	uuid::{Uuid4, Uuid7},
21};
22
23use crate::{Result, expression::context::EvalContext, vm::stack::Variable};
24
25macro_rules! extract_typed_column {
26	($col:expr, $take:expr, $variant:ident($x:ident) => $transform:expr, $default:expr, $constructor:ident) => {{
27		let mut data = Vec::new();
28		let mut bitvec = Vec::new();
29		let mut count = 0;
30		for v in $col.data().iter() {
31			if count >= $take {
32				break;
33			}
34			match v {
35				Value::$variant($x) => {
36					data.push($transform);
37					bitvec.push(true);
38				}
39				_ => {
40					data.push($default);
41					bitvec.push(false);
42				}
43			}
44			count += 1;
45		}
46		Ok($col.with_new_data(ColumnBuffer::$constructor(data, bitvec)))
47	}};
48}
49
50pub(crate) fn column_lookup(ctx: &EvalContext, column: &ColumnExpression) -> Result<ColumnWithName> {
51	let name = column.0.name.text();
52
53	if name == ROW_NUMBER_COLUMN_NAME && !ctx.columns.row_numbers.is_empty() {
54		let row_numbers: Vec<u64> = ctx.columns.row_numbers.iter().map(|r| r.value()).collect();
55		return Ok(ColumnWithName::new(ROW_NUMBER_COLUMN_NAME.to_string(), ColumnBuffer::uint8(row_numbers)));
56	}
57
58	if name == CREATED_AT_COLUMN_NAME && !ctx.columns.created_at.is_empty() {
59		return Ok(ColumnWithName::new(
60			CREATED_AT_COLUMN_NAME.to_string(),
61			ColumnBuffer::datetime(ctx.columns.created_at.to_vec()),
62		));
63	}
64
65	if name == UPDATED_AT_COLUMN_NAME && !ctx.columns.updated_at.is_empty() {
66		return Ok(ColumnWithName::new(
67			UPDATED_AT_COLUMN_NAME.to_string(),
68			ColumnBuffer::datetime(ctx.columns.updated_at.to_vec()),
69		));
70	}
71
72	if let Some(col) = ctx.columns.iter().find(|c| c.name() == name) {
73		let owned = ColumnWithName::new(col.name().clone(), col.data().clone());
74		return extract_column_data(&owned, ctx);
75	}
76
77	if let Some(Variable::Columns {
78		columns: scalar_cols,
79	}) = ctx.symbols.get(name)
80		&& scalar_cols.is_scalar()
81		&& let Some(col) = scalar_cols.columns.first()
82	{
83		let owned = ColumnWithName::new(scalar_cols.name_at(0).clone(), col.clone());
84		return extract_column_data(&owned, ctx);
85	}
86
87	Ok(ColumnWithName::new(name.to_string(), ColumnBuffer::none_typed(Type::Boolean, ctx.row_count)))
88}
89
90fn extract_column_data(col: &ColumnWithName, ctx: &EvalContext) -> Result<ColumnWithName> {
91	let take = ctx.take.unwrap_or(usize::MAX);
92
93	// Fast path: when no truncation is required, the underlying buffer already
94	// carries its data in the correct typed form and a clone is an Arc-bump.
95	if take >= col.data().len() {
96		return Ok(col.clone());
97	}
98
99	let col_type = col.data().get_type();
100	let effective_type = match col_type {
101		Type::Option(inner) => *inner,
102		other => other,
103	};
104
105	extract_column_data_by_type(col, take, effective_type)
106}
107
108fn extract_column_data_by_type(col: &ColumnWithName, take: usize, col_type: Type) -> Result<ColumnWithName> {
109	match col_type {
110		Type::Boolean => extract_typed_column!(col, take, Boolean(b) => b, false, bool_with_bitvec),
111		Type::Float4 => extract_typed_column!(col, take, Float4(v) => v.value(), 0.0f32, float4_with_bitvec),
112		Type::Float8 => extract_typed_column!(col, take, Float8(v) => v.value(), 0.0f64, float8_with_bitvec),
113		Type::Int1 => extract_typed_column!(col, take, Int1(n) => n, 0, int1_with_bitvec),
114		Type::Int2 => extract_typed_column!(col, take, Int2(n) => n, 0, int2_with_bitvec),
115		Type::Int4 => extract_typed_column!(col, take, Int4(n) => n, 0, int4_with_bitvec),
116		Type::Int8 => extract_typed_column!(col, take, Int8(n) => n, 0, int8_with_bitvec),
117		Type::Int16 => extract_typed_column!(col, take, Int16(n) => n, 0, int16_with_bitvec),
118		Type::Utf8 => extract_typed_column!(col, take, Utf8(s) => s.clone(), "".to_string(), utf8_with_bitvec),
119		Type::Uint1 => extract_typed_column!(col, take, Uint1(n) => n, 0, uint1_with_bitvec),
120		Type::Uint2 => extract_typed_column!(col, take, Uint2(n) => n, 0, uint2_with_bitvec),
121		Type::Uint4 => extract_typed_column!(col, take, Uint4(n) => n, 0, uint4_with_bitvec),
122		Type::Uint8 => extract_typed_column!(col, take, Uint8(n) => n, 0, uint8_with_bitvec),
123		Type::Uint16 => extract_typed_column!(col, take, Uint16(n) => n, 0, uint16_with_bitvec),
124		Type::Date => extract_typed_column!(col, take, Date(d) => d, Date::default(), date_with_bitvec),
125		Type::DateTime => {
126			extract_typed_column!(col, take, DateTime(dt) => dt, DateTime::default(), datetime_with_bitvec)
127		}
128		Type::Time => extract_typed_column!(col, take, Time(t) => t, Time::default(), time_with_bitvec),
129		Type::Duration => {
130			extract_typed_column!(col, take, Duration(i) => i, Duration::default(), duration_with_bitvec)
131		}
132		Type::IdentityId => {
133			extract_typed_column!(col, take, IdentityId(i) => i, IdentityId::default(), identity_id_with_bitvec)
134		}
135		Type::Uuid4 => {
136			extract_typed_column!(col, take, Uuid4(i) => i, Uuid4::default(), uuid4_with_bitvec)
137		}
138		Type::Uuid7 => {
139			extract_typed_column!(col, take, Uuid7(i) => i, Uuid7::default(), uuid7_with_bitvec)
140		}
141		Type::DictionaryId => {
142			extract_typed_column!(col, take, DictionaryId(i) => i, DictionaryEntryId::default(), dictionary_id_with_bitvec)
143		}
144		Type::Blob => {
145			extract_typed_column!(col, take, Blob(b) => b.clone(), Blob::new(vec![]), blob_with_bitvec)
146		}
147		Type::Int => extract_typed_column!(col, take, Int(b) => b.clone(), Int::zero(), int_with_bitvec),
148		Type::Uint => extract_typed_column!(col, take, Uint(b) => b.clone(), Uint::zero(), uint_with_bitvec),
149		Type::Any => {
150			extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
151		}
152		Type::Decimal => {
153			extract_typed_column!(col, take, Decimal(b) => b.clone(), Decimal::from_i64(0), decimal_with_bitvec)
154		}
155		Type::Option(inner) => extract_column_data_by_type(col, take, *inner),
156		Type::List(_) => {
157			extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
158		}
159		Type::Record(_) => {
160			extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
161		}
162		Type::Tuple(_) => {
163			extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
164		}
165	}
166}
167
168#[cfg(test)]
169pub mod tests {
170	use reifydb_core::{
171		interface::identifier::{ColumnIdentifier, ColumnShape},
172		value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
173	};
174	use reifydb_routine::routine::registry::Routines;
175	use reifydb_rql::expression::ColumnExpression;
176	use reifydb_runtime::context::{RuntimeContext, clock::Clock};
177	use reifydb_type::{fragment::Fragment, params::Params, value::identity::IdentityId};
178
179	use super::column_lookup;
180	use crate::{expression::context::EvalContext, vm::stack::SymbolTable};
181
182	#[test]
183	fn test_column_not_found_returns_correct_row_count() {
184		// Create context with 5 rows
185		let columns = Columns::new(vec![ColumnWithName::new(
186			"existing_col".to_string(),
187			ColumnBuffer::int4([1, 2, 3, 4, 5]),
188		)]);
189
190		let runtime_ctx = RuntimeContext::with_clock(Clock::Real);
191		let routines = Routines::empty();
192		let base = EvalContext {
193			params: &Params::None,
194			symbols: &SymbolTable::new(),
195			routines: &routines,
196			runtime_context: &runtime_ctx,
197			arena: None,
198			identity: IdentityId::root(),
199			is_aggregate_context: false,
200			columns: Columns::empty(),
201			row_count: 1,
202			target: None,
203			take: None,
204		};
205		let ctx = base.with_eval(columns, 5);
206
207		// Try to access a column that doesn't exist
208		let result = column_lookup(
209			&ctx,
210			&ColumnExpression(ColumnIdentifier {
211				shape: ColumnShape::Alias(Fragment::internal("nonexistent_col")),
212				name: Fragment::internal("nonexistent_col"),
213			}),
214		)
215		.unwrap();
216
217		// The column should have 5 rows (matching row_count), not 0
218		assert_eq!(
219			result.data().len(),
220			5,
221			"Column not found should return column with ctx.row_count rows, not 0"
222		);
223	}
224}