1use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer};
5use reifydb_rql::expression::ColumnExpression;
6use reifydb_value::value::{
7 Value,
8 blob::Blob,
9 date::Date,
10 datetime::{CREATED_AT_COLUMN_NAME, DateTime, UPDATED_AT_COLUMN_NAME},
11 decimal::Decimal,
12 dictionary::DictionaryEntryId,
13 duration::Duration,
14 identity::IdentityId,
15 int::Int,
16 row_number::ROW_NUMBER_COLUMN_NAME,
17 time::Time,
18 uint::Uint,
19 uuid::{Uuid4, Uuid7},
20 value_type::ValueType,
21};
22
23use crate::{Result, expression::context::EvalContext, vm::stack::Variable};
24
/// Builds a typed `ColumnBuffer` from at most `$take` leading values of `$col`.
///
/// Arguments:
/// - `$col`: expression exposing `data()` (walked with `.iter()`) and `with_new_data(..)`.
/// - `$take`: maximum number of values consumed from the front of the column.
/// - `$variant($x) => $transform`: the `Value` variant to accept, the binding for its
///   payload, and the expression producing the stored element.
/// - `$default`: placeholder stored for every value that is not `Value::$variant`.
/// - `$constructor`: `ColumnBuffer` constructor receiving `(elements, validity)`.
///
/// Every consumed value contributes one element and one validity flag: `true` when it
/// matched `$variant`, `false` when the placeholder was stored instead. The macro
/// expands to `Ok($col.with_new_data(..))`, so it must be used where a `Result` is
/// expected.
macro_rules! extract_typed_column {
    ($col:expr, $take:expr, $variant:ident($x:ident) => $transform:expr, $default:expr, $constructor:ident) => {{
        let (elements, validity): (Vec<_>, Vec<bool>) = $col
            .data()
            .iter()
            .take($take)
            .map(|value| match value {
                Value::$variant($x) => ($transform, true),
                _ => ($default, false),
            })
            .unzip();
        Ok($col.with_new_data(ColumnBuffer::$constructor(elements, validity)))
    }};
}
49
50pub(crate) fn column_lookup(ctx: &EvalContext, column: &ColumnExpression) -> Result<ColumnWithName> {
51 let name = column.0.name.text();
52
53 if name == ROW_NUMBER_COLUMN_NAME && !ctx.columns.row_numbers.is_empty() {
54 let row_numbers: Vec<u64> = ctx.columns.row_numbers.iter().map(|r| r.value()).collect();
55 return Ok(ColumnWithName::new(ROW_NUMBER_COLUMN_NAME.to_string(), ColumnBuffer::uint8(row_numbers)));
56 }
57
58 if name == CREATED_AT_COLUMN_NAME && !ctx.columns.created_at.is_empty() {
59 return Ok(ColumnWithName::new(
60 CREATED_AT_COLUMN_NAME.to_string(),
61 ColumnBuffer::datetime(ctx.columns.created_at.to_vec()),
62 ));
63 }
64
65 if name == UPDATED_AT_COLUMN_NAME && !ctx.columns.updated_at.is_empty() {
66 return Ok(ColumnWithName::new(
67 UPDATED_AT_COLUMN_NAME.to_string(),
68 ColumnBuffer::datetime(ctx.columns.updated_at.to_vec()),
69 ));
70 }
71
72 if let Some(col) = ctx.columns.iter().find(|c| c.name() == name) {
73 let owned = ColumnWithName::new(col.name().clone(), col.data().clone());
74 return extract_column_data(&owned, ctx);
75 }
76
77 if let Some(Variable::Columns {
78 columns: scalar_cols,
79 }) = ctx.symbols.get(name)
80 && scalar_cols.is_scalar()
81 && let Some(col) = scalar_cols.columns.first()
82 {
83 let owned = ColumnWithName::new(scalar_cols.name_at(0).clone(), col.clone());
84 return extract_column_data(&owned, ctx);
85 }
86
87 Ok(ColumnWithName::new(name.to_string(), ColumnBuffer::none_typed(ValueType::Boolean, ctx.row_count)))
88}
89
90fn extract_column_data(col: &ColumnWithName, ctx: &EvalContext) -> Result<ColumnWithName> {
91 let take = ctx.take.unwrap_or(usize::MAX);
92
93 if take >= col.data().len() {
94 return Ok(col.clone());
95 }
96
97 let col_type = col.data().get_type();
98 let effective_type = match col_type {
99 ValueType::Option(inner) => *inner,
100 other => other,
101 };
102
103 extract_column_data_by_type(col, take, effective_type)
104}
105
106fn extract_column_data_by_type(col: &ColumnWithName, take: usize, col_type: ValueType) -> Result<ColumnWithName> {
107 match col_type {
108 ValueType::Boolean => extract_typed_column!(col, take, Boolean(b) => b, false, bool_with_bitvec),
109 ValueType::Float4 => {
110 extract_typed_column!(col, take, Float4(v) => v.value(), 0.0f32, float4_with_bitvec)
111 }
112 ValueType::Float8 => {
113 extract_typed_column!(col, take, Float8(v) => v.value(), 0.0f64, float8_with_bitvec)
114 }
115 ValueType::Int1 => extract_typed_column!(col, take, Int1(n) => n, 0, int1_with_bitvec),
116 ValueType::Int2 => extract_typed_column!(col, take, Int2(n) => n, 0, int2_with_bitvec),
117 ValueType::Int4 => extract_typed_column!(col, take, Int4(n) => n, 0, int4_with_bitvec),
118 ValueType::Int8 => extract_typed_column!(col, take, Int8(n) => n, 0, int8_with_bitvec),
119 ValueType::Int16 => extract_typed_column!(col, take, Int16(n) => n, 0, int16_with_bitvec),
120 ValueType::Utf8 => {
121 extract_typed_column!(col, take, Utf8(s) => s.clone(), "".to_string(), utf8_with_bitvec)
122 }
123 ValueType::Uint1 => extract_typed_column!(col, take, Uint1(n) => n, 0, uint1_with_bitvec),
124 ValueType::Uint2 => extract_typed_column!(col, take, Uint2(n) => n, 0, uint2_with_bitvec),
125 ValueType::Uint4 => extract_typed_column!(col, take, Uint4(n) => n, 0, uint4_with_bitvec),
126 ValueType::Uint8 => extract_typed_column!(col, take, Uint8(n) => n, 0, uint8_with_bitvec),
127 ValueType::Uint16 => extract_typed_column!(col, take, Uint16(n) => n, 0, uint16_with_bitvec),
128 ValueType::Date => extract_typed_column!(col, take, Date(d) => d, Date::default(), date_with_bitvec),
129 ValueType::DateTime => {
130 extract_typed_column!(col, take, DateTime(dt) => dt, DateTime::default(), datetime_with_bitvec)
131 }
132 ValueType::Time => extract_typed_column!(col, take, Time(t) => t, Time::default(), time_with_bitvec),
133 ValueType::Duration => {
134 extract_typed_column!(col, take, Duration(i) => i, Duration::default(), duration_with_bitvec)
135 }
136 ValueType::IdentityId => {
137 extract_typed_column!(col, take, IdentityId(i) => i, IdentityId::default(), identity_id_with_bitvec)
138 }
139 ValueType::Uuid4 => {
140 extract_typed_column!(col, take, Uuid4(i) => i, Uuid4::default(), uuid4_with_bitvec)
141 }
142 ValueType::Uuid7 => {
143 extract_typed_column!(col, take, Uuid7(i) => i, Uuid7::default(), uuid7_with_bitvec)
144 }
145 ValueType::DictionaryId => {
146 extract_typed_column!(col, take, DictionaryId(i) => i, DictionaryEntryId::default(), dictionary_id_with_bitvec)
147 }
148 ValueType::Blob => {
149 extract_typed_column!(col, take, Blob(b) => b.clone(), Blob::new(vec![]), blob_with_bitvec)
150 }
151 ValueType::Int => extract_typed_column!(col, take, Int(b) => b.clone(), Int::zero(), int_with_bitvec),
152 ValueType::Uint => {
153 extract_typed_column!(col, take, Uint(b) => b.clone(), Uint::zero(), uint_with_bitvec)
154 }
155 ValueType::Any => {
156 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
157 }
158 ValueType::Decimal => {
159 extract_typed_column!(col, take, Decimal(b) => b.clone(), Decimal::from_i64(0), decimal_with_bitvec)
160 }
161 ValueType::Option(inner) => extract_column_data_by_type(col, take, *inner),
162 ValueType::List(_) => {
163 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
164 }
165 ValueType::Record(_) => {
166 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
167 }
168 ValueType::Tuple(_) => {
169 extract_typed_column!(col, take, Any(boxed) => Box::new(*boxed.clone()), Box::new(Value::none()), any_with_bitvec)
170 }
171 }
172}
173
#[cfg(test)]
pub mod tests {
    use reifydb_core::{
        interface::identifier::{ColumnIdentifier, ColumnShape},
        value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
    };
    use reifydb_routine::routine::registry::Routines;
    use reifydb_rql::expression::ColumnExpression;
    use reifydb_runtime::context::{RuntimeContext, clock::Clock};
    use reifydb_value::{fragment::Fragment, params::Params, value::identity::IdentityId};

    use super::column_lookup;
    use crate::{expression::context::EvalContext, vm::stack::SymbolTable};

    /// Looking up a name that is not among the evaluated columns must yield a column
    /// with as many rows as the evaluation context reports, not an empty one.
    #[test]
    fn test_column_not_found_returns_correct_row_count() {
        // Inputs the context borrows; they are named so they outlive it.
        let params = Params::None;
        let symbols = SymbolTable::new();
        let routines = Routines::empty();
        let runtime_ctx = RuntimeContext::with_clock(Clock::Real);

        let base = EvalContext {
            params: &params,
            symbols: &symbols,
            routines: &routines,
            runtime_context: &runtime_ctx,
            arena: None,
            identity: IdentityId::root(),
            is_aggregate_context: false,
            columns: Columns::empty(),
            row_count: 1,
            target: None,
            take: None,
        };

        // Five rows in a single column whose name differs from the one looked up.
        let existing = ColumnWithName::new("existing_col".to_string(), ColumnBuffer::int4([1, 2, 3, 4, 5]));
        let ctx = base.with_eval(Columns::new(vec![existing]), 5);

        let missing = ColumnExpression(ColumnIdentifier {
            shape: ColumnShape::Alias(Fragment::internal("nonexistent_col")),
            name: Fragment::internal("nonexistent_col"),
        });

        let result = column_lookup(&ctx, &missing).unwrap();

        assert_eq!(
            result.data().len(),
            5,
            "Column not found should return column with ctx.row_count rows, not 0"
        );
    }
}