1use reifydb_core::value::column::{Column, data::ColumnData};
5use reifydb_rql::expression::ColumnExpression;
6use reifydb_type::value::{
7 Value,
8 blob::Blob,
9 date::Date,
10 datetime::DateTime,
11 decimal::Decimal,
12 dictionary::DictionaryEntryId,
13 duration::Duration,
14 identity::IdentityId,
15 int::Int,
16 row_number::ROW_NUMBER_COLUMN_NAME,
17 time::Time,
18 r#type::Type,
19 uint::Uint,
20 uuid::{Uuid4, Uuid7},
21};
22
23use crate::{Result, expression::context::EvalContext, vm::stack::Variable};
24
/// Rebuilds a column as a typed `ColumnData` together with a per-row flag vector.
///
/// Reads at most `$take` values from `$col.data().iter()`. A value matching
/// `Value::$variant($x)` contributes `$transform` and a `true` flag; every other
/// value contributes `$default` and a `false` flag. Both vectors are passed to
/// `ColumnData::$constructor`, and the result of `$col.with_new_data(..)` is
/// returned wrapped in `Ok`.
macro_rules! extract_typed_column {
    ($col:expr, $take:expr, $variant:ident($x:ident) => $transform:expr, $default:expr, $constructor:ident) => {{
        // One pass produces the payloads and their flags in lockstep.
        let (data, bitvec): (Vec<_>, Vec<bool>) = $col
            .data()
            .iter()
            .take($take)
            .map(|value| match value {
                Value::$variant($x) => ($transform, true),
                _ => ($default, false),
            })
            .unzip();
        Ok($col.with_new_data(ColumnData::$constructor(data, bitvec)))
    }};
}
49
50pub(crate) fn column_lookup(ctx: &EvalContext, column: &ColumnExpression) -> Result<Column> {
51 let name = column.0.name.text();
52
53 if name == ROW_NUMBER_COLUMN_NAME && !ctx.columns.row_numbers.is_empty() {
55 let row_numbers: Vec<u64> = ctx.columns.row_numbers.iter().map(|r| r.value()).collect();
56 return Ok(Column::new(ROW_NUMBER_COLUMN_NAME.to_string(), ColumnData::uint8(row_numbers)));
57 }
58
59 if let Some(col) = ctx.columns.iter().find(|c| c.name() == name) {
60 return extract_column_data(col, ctx);
61 }
62
63 if let Some(Variable::Scalar(scalar_cols)) = ctx.symbol_table.get(name) {
64 if let Some(col) = scalar_cols.columns.first() {
65 return extract_column_data(col, ctx);
66 }
67 }
68
69 Ok(Column::new(name.to_string(), ColumnData::none_typed(Type::Boolean, ctx.row_count)))
70}
71
72fn extract_column_data(col: &Column, ctx: &EvalContext) -> Result<Column> {
73 let take = ctx.take.unwrap_or(usize::MAX);
74
75 let col_type = col.data().get_type();
78
79 let effective_type = match col_type {
81 Type::Option(inner) => *inner,
82 other => other,
83 };
84
85 extract_column_data_by_type(col, take, effective_type)
86}
87
88fn extract_column_data_by_type(col: &Column, take: usize, col_type: Type) -> Result<Column> {
89 match col_type {
90 Type::Boolean => extract_typed_column!(col, take, Boolean(b) => b, false, bool_with_bitvec),
91 Type::Float4 => extract_typed_column!(col, take, Float4(v) => v.value(), 0.0f32, float4_with_bitvec),
92 Type::Float8 => extract_typed_column!(col, take, Float8(v) => v.value(), 0.0f64, float8_with_bitvec),
93 Type::Int1 => extract_typed_column!(col, take, Int1(n) => n, 0, int1_with_bitvec),
94 Type::Int2 => extract_typed_column!(col, take, Int2(n) => n, 0, int2_with_bitvec),
95 Type::Int4 => extract_typed_column!(col, take, Int4(n) => n, 0, int4_with_bitvec),
96 Type::Int8 => extract_typed_column!(col, take, Int8(n) => n, 0, int8_with_bitvec),
97 Type::Int16 => extract_typed_column!(col, take, Int16(n) => n, 0, int16_with_bitvec),
98 Type::Utf8 => extract_typed_column!(col, take, Utf8(s) => s.clone(), "".to_string(), utf8_with_bitvec),
99 Type::Uint1 => extract_typed_column!(col, take, Uint1(n) => n, 0, uint1_with_bitvec),
100 Type::Uint2 => extract_typed_column!(col, take, Uint2(n) => n, 0, uint2_with_bitvec),
101 Type::Uint4 => extract_typed_column!(col, take, Uint4(n) => n, 0, uint4_with_bitvec),
102 Type::Uint8 => extract_typed_column!(col, take, Uint8(n) => n, 0, uint8_with_bitvec),
103 Type::Uint16 => extract_typed_column!(col, take, Uint16(n) => n, 0, uint16_with_bitvec),
104 Type::Date => extract_typed_column!(col, take, Date(d) => d.clone(), Date::default(), date_with_bitvec),
105 Type::DateTime => {
106 extract_typed_column!(col, take, DateTime(dt) => dt.clone(), DateTime::default(), datetime_with_bitvec)
107 }
108 Type::Time => extract_typed_column!(col, take, Time(t) => t.clone(), Time::default(), time_with_bitvec),
109 Type::Duration => {
110 extract_typed_column!(col, take, Duration(i) => i.clone(), Duration::default(), duration_with_bitvec)
111 }
112 Type::IdentityId => {
113 extract_typed_column!(col, take, IdentityId(i) => i.clone(), IdentityId::default(), identity_id_with_bitvec)
114 }
115 Type::Uuid4 => {
116 extract_typed_column!(col, take, Uuid4(i) => i.clone(), Uuid4::default(), uuid4_with_bitvec)
117 }
118 Type::Uuid7 => {
119 extract_typed_column!(col, take, Uuid7(i) => i.clone(), Uuid7::default(), uuid7_with_bitvec)
120 }
121 Type::DictionaryId => {
122 extract_typed_column!(col, take, DictionaryId(i) => i.clone(), DictionaryEntryId::default(), dictionary_id_with_bitvec)
123 }
124 Type::Blob => {
125 extract_typed_column!(col, take, Blob(b) => b.clone(), Blob::new(vec![]), blob_with_bitvec)
126 }
127 Type::Int => extract_typed_column!(col, take, Int(b) => b.clone(), Int::zero(), int_with_bitvec),
128 Type::Uint => extract_typed_column!(col, take, Uint(b) => b.clone(), Uint::zero(), uint_with_bitvec),
129 Type::Any => {
130 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
131 }
132 Type::Decimal => {
133 extract_typed_column!(col, take, Decimal(b) => b.clone(), Decimal::from_i64(0), decimal_with_bitvec)
134 }
135 Type::Option(inner) => extract_column_data_by_type(col, take, *inner),
136 Type::List(_) => {
137 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
138 }
139 }
140}
141
#[cfg(test)]
pub mod tests {
    use reifydb_core::{
        interface::identifier::{ColumnIdentifier, ColumnPrimitive},
        value::column::{Column, columns::Columns, data::ColumnData},
    };
    use reifydb_function::registry::Functions;
    use reifydb_rql::expression::ColumnExpression;
    use reifydb_runtime::clock::Clock;
    use reifydb_type::{fragment::Fragment, params::Params, value::identity::IdentityId};

    use super::column_lookup;
    use crate::{expression::context::EvalContext, vm::stack::SymbolTable};

    /// Looking up a name that matches no column must still produce a column with
    /// `ctx.row_count` rows rather than an empty one.
    #[test]
    fn test_column_not_found_returns_correct_row_count() {
        // Context with a single five-row column whose name differs from the lookup.
        let existing = Column::new("existing_col".to_string(), ColumnData::int4([1, 2, 3, 4, 5]));
        let params = Params::None;
        let symbol_table = SymbolTable::new();
        let functions = Functions::empty();
        let clock = Clock::default();

        let ctx = EvalContext {
            target: None,
            columns: Columns::new(vec![existing]),
            row_count: 5,
            take: None,
            params: &params,
            symbol_table: &symbol_table,
            is_aggregate_context: false,
            functions: &functions,
            clock: &clock,
            arena: None,
            identity: IdentityId::root(),
        };

        let missing = ColumnExpression(ColumnIdentifier {
            primitive: ColumnPrimitive::Alias(Fragment::internal("nonexistent_col")),
            name: Fragment::internal("nonexistent_col"),
        });

        let result = column_lookup(&ctx, &missing).unwrap();
        let rows = result.data().len();

        assert_eq!(rows, 5, "Column not found should return column with ctx.row_count rows, not 0");
    }
}