Skip to main content

reifydb_engine/expression/
lookup.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{Column, data::ColumnData};
5use reifydb_rql::expression::ColumnExpression;
6use reifydb_type::value::{
7	Value,
8	blob::Blob,
9	date::Date,
10	datetime::DateTime,
11	decimal::Decimal,
12	dictionary::DictionaryEntryId,
13	duration::Duration,
14	identity::IdentityId,
15	int::Int,
16	row_number::ROW_NUMBER_COLUMN_NAME,
17	time::Time,
18	r#type::Type,
19	uint::Uint,
20	uuid::{Uuid4, Uuid7},
21};
22
23use crate::expression::context::EvalContext;
24
/// Builds a typed copy of a column, keeping at most `$take` leading values.
///
/// Arguments:
/// - `$col`: the source column; it must offer `data()` (with `len()` and
///   `iter()`) and `with_new_data(..)`.
/// - `$take`: maximum number of values to copy, as a `usize`.
/// - `$variant($x) => $transform`: the `Value` variant to accept and the
///   expression (which may use `$x`) stored for it.
/// - `$default`: placeholder stored for every value of another variant.
/// - `$constructor`: the `ColumnData` constructor that receives the collected
///   values and the parallel `Vec<bool>` of "value matched" flags.
///
/// Evaluates to `Ok(..)` wrapping the result of `with_new_data`.
macro_rules! extract_typed_column {
	($col:expr, $take:expr, $variant:ident($x:ident) => $transform:expr, $default:expr, $constructor:ident) => {{
		// Evaluate both operands exactly once instead of re-expanding them at every use.
		let source = $col;
		let limit: usize = $take;

		// The output length is known up front, so size both vectors once
		// rather than letting them regrow while pushing.
		let capacity = source.data().len().min(limit);
		let mut data = Vec::with_capacity(capacity);
		let mut bitvec = Vec::with_capacity(capacity);

		for v in source.data().iter().take(limit) {
			match v {
				Value::$variant($x) => {
					data.push($transform);
					bitvec.push(true);
				}
				// Any other variant keeps its slot but is flagged as not set.
				_ => {
					data.push($default);
					bitvec.push(false);
				}
			}
		}
		Ok(source.with_new_data(ColumnData::$constructor(data, bitvec)))
	}};
}
49
50pub(crate) fn column_lookup(ctx: &EvalContext, column: &ColumnExpression) -> crate::Result<Column> {
51	let name = column.0.name.text();
52
53	// Check for rownum pseudo-column first
54	if name == ROW_NUMBER_COLUMN_NAME && !ctx.columns.row_numbers.is_empty() {
55		let row_numbers: Vec<u64> = ctx.columns.row_numbers.iter().map(|r| r.value()).collect();
56		return Ok(Column::new(ROW_NUMBER_COLUMN_NAME.to_string(), ColumnData::uint8(row_numbers)));
57	}
58
59	if let Some(col) = ctx.columns.iter().find(|c| c.name() == name) {
60		return extract_column_data(col, ctx);
61	}
62
63	Ok(Column::new(name.to_string(), ColumnData::none_typed(Type::Boolean, ctx.row_count)))
64}
65
66fn extract_column_data(col: &Column, ctx: &EvalContext) -> crate::Result<Column> {
67	let take = ctx.take.unwrap_or(usize::MAX);
68
69	// Use the column's actual data type instead of checking the first value
70	// This handles cases where the first value is Undefined
71	let col_type = col.data().get_type();
72
73	// Unwrap Option to get the effective data type
74	let effective_type = match col_type {
75		Type::Option(inner) => *inner,
76		other => other,
77	};
78
79	extract_column_data_by_type(col, take, effective_type)
80}
81
82fn extract_column_data_by_type(col: &Column, take: usize, col_type: Type) -> crate::Result<Column> {
83	match col_type {
84		Type::Boolean => extract_typed_column!(col, take, Boolean(b) => b, false, bool_with_bitvec),
85		Type::Float4 => extract_typed_column!(col, take, Float4(v) => v.value(), 0.0f32, float4_with_bitvec),
86		Type::Float8 => extract_typed_column!(col, take, Float8(v) => v.value(), 0.0f64, float8_with_bitvec),
87		Type::Int1 => extract_typed_column!(col, take, Int1(n) => n, 0, int1_with_bitvec),
88		Type::Int2 => extract_typed_column!(col, take, Int2(n) => n, 0, int2_with_bitvec),
89		Type::Int4 => extract_typed_column!(col, take, Int4(n) => n, 0, int4_with_bitvec),
90		Type::Int8 => extract_typed_column!(col, take, Int8(n) => n, 0, int8_with_bitvec),
91		Type::Int16 => extract_typed_column!(col, take, Int16(n) => n, 0, int16_with_bitvec),
92		Type::Utf8 => extract_typed_column!(col, take, Utf8(s) => s.clone(), "".to_string(), utf8_with_bitvec),
93		Type::Uint1 => extract_typed_column!(col, take, Uint1(n) => n, 0, uint1_with_bitvec),
94		Type::Uint2 => extract_typed_column!(col, take, Uint2(n) => n, 0, uint2_with_bitvec),
95		Type::Uint4 => extract_typed_column!(col, take, Uint4(n) => n, 0, uint4_with_bitvec),
96		Type::Uint8 => extract_typed_column!(col, take, Uint8(n) => n, 0, uint8_with_bitvec),
97		Type::Uint16 => extract_typed_column!(col, take, Uint16(n) => n, 0, uint16_with_bitvec),
98		Type::Date => extract_typed_column!(col, take, Date(d) => d.clone(), Date::default(), date_with_bitvec),
99		Type::DateTime => {
100			extract_typed_column!(col, take, DateTime(dt) => dt.clone(), DateTime::default(), datetime_with_bitvec)
101		}
102		Type::Time => extract_typed_column!(col, take, Time(t) => t.clone(), Time::default(), time_with_bitvec),
103		Type::Duration => {
104			extract_typed_column!(col, take, Duration(i) => i.clone(), Duration::default(), duration_with_bitvec)
105		}
106		Type::IdentityId => {
107			extract_typed_column!(col, take, IdentityId(i) => i.clone(), IdentityId::default(), identity_id_with_bitvec)
108		}
109		Type::Uuid4 => {
110			extract_typed_column!(col, take, Uuid4(i) => i.clone(), Uuid4::default(), uuid4_with_bitvec)
111		}
112		Type::Uuid7 => {
113			extract_typed_column!(col, take, Uuid7(i) => i.clone(), Uuid7::default(), uuid7_with_bitvec)
114		}
115		Type::DictionaryId => {
116			extract_typed_column!(col, take, DictionaryId(i) => i.clone(), DictionaryEntryId::default(), dictionary_id_with_bitvec)
117		}
118		Type::Blob => {
119			extract_typed_column!(col, take, Blob(b) => b.clone(), Blob::new(vec![]), blob_with_bitvec)
120		}
121		Type::Int => extract_typed_column!(col, take, Int(b) => b.clone(), Int::zero(), int_with_bitvec),
122		Type::Uint => extract_typed_column!(col, take, Uint(b) => b.clone(), Uint::zero(), uint_with_bitvec),
123		Type::Any => {
124			extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
125		}
126		Type::Decimal => {
127			extract_typed_column!(col, take, Decimal(b) => b.clone(), Decimal::from_i64(0), decimal_with_bitvec)
128		}
129		Type::Option(inner) => extract_column_data_by_type(col, take, *inner),
130	}
131}
132
#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::identifier::{ColumnIdentifier, ColumnPrimitive},
		value::column::{Column, columns::Columns, data::ColumnData},
	};
	use reifydb_function::registry::Functions;
	use reifydb_rql::expression::ColumnExpression;
	use reifydb_runtime::clock::Clock;
	use reifydb_type::{fragment::Fragment, params::Params};

	use crate::{expression::context::EvalContext, vm::stack::SymbolTable};

	/// A lookup for a column that is not part of the context must still
	/// produce a column whose length equals `ctx.row_count`.
	#[test]
	fn test_column_not_found_returns_correct_row_count() {
		// Fixtures the context only borrows.
		let params = Params::None;
		let symbol_table = SymbolTable::new();
		let functions = Functions::empty();
		let clock = Clock::default();

		// A context holding one real column with five rows.
		let existing = Column::new("existing_col".to_string(), ColumnData::int4([1, 2, 3, 4, 5]));
		let ctx = EvalContext {
			target: None,
			columns: Columns::new(vec![existing]),
			row_count: 5,
			take: None,
			params: &params,
			symbol_table: &symbol_table,
			is_aggregate_context: false,
			functions: &functions,
			clock: &clock,
			arena: None,
		};

		// Ask for a name that no column in the context carries.
		let missing = ColumnExpression(ColumnIdentifier {
			primitive: ColumnPrimitive::Alias(Fragment::internal("nonexistent_col")),
			name: Fragment::internal("nonexistent_col"),
		});
		let looked_up = super::column_lookup(&ctx, &missing).unwrap();

		// The placeholder must span all five rows rather than being empty.
		assert_eq!(
			looked_up.data().len(),
			5,
			"Column not found should return column with ctx.row_count rows, not 0"
		);
	}
}