reifydb_engine/vm/volcano/
inline.rs1use std::{
5 collections::{BTreeSet, HashMap, HashSet},
6 mem,
7 sync::Arc,
8};
9
10use reifydb_core::{
11 interface::{catalog::sumtype::SumType, evaluate::TargetColumn, resolved::ResolvedShape},
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_rql::expression::{AliasExpression, ConstantExpression, Expression, IdentExpression};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_value::{
17 fragment::Fragment,
18 value::{Value, constraint::Constraint, value_type::ValueType},
19};
20
21use crate::{
22 Result,
23 expression::{cast::cast_column_data, context::EvalContext, eval::evaluate},
24 vm::volcano::query::{QueryContext, QueryNode},
25};
26
27pub(crate) struct InlineDataNode {
28 rows: Vec<Vec<AliasExpression>>,
29 headers: Option<ColumnHeaders>,
30 context: Option<Arc<QueryContext>>,
31 executed: bool,
32}
33
34impl InlineDataNode {
35 pub fn new(rows: Vec<Vec<AliasExpression>>, context: Arc<QueryContext>) -> Self {
36 let cloned_context = context.clone();
37 let headers = cloned_context.source.as_ref().map(|source| {
38 let mut layout = Self::create_columns_layout_from_source(source);
39
40 if matches!(source, ResolvedShape::Series(_)) {
41 let existing: HashSet<String> =
42 layout.columns.iter().map(|c| c.text().to_string()).collect();
43 for row in &rows {
44 for alias in row {
45 let name = alias.alias.0.text().to_string();
46 if !existing.contains(&name) {
47 layout.columns.push(Fragment::internal(&name));
48 }
49 }
50 }
51 }
52 layout
53 });
54
55 Self {
56 rows,
57 headers,
58 context: Some(context),
59 executed: false,
60 }
61 }
62
63 fn create_columns_layout_from_source(source: &ResolvedShape) -> ColumnHeaders {
64 ColumnHeaders {
65 columns: source.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
66 }
67 }
68
69 fn expand_sumtype_constructors<'a>(&mut self, txn: &mut Transaction<'a>) -> Result<()> {
70 let Some(ctx) = self.context.as_ref().cloned() else {
71 return Ok(());
72 };
73 if !rows_need_sumtype_expansion(&self.rows) {
74 return Ok(());
75 }
76 for row in &mut self.rows {
77 let original = mem::take(row);
78 let mut expanded = Vec::with_capacity(original.len());
79 for alias_expr in original {
80 match alias_expr.expression.as_ref() {
81 Expression::SumTypeConstructor(_) => {
82 expand_sumtype_ctor(&ctx, txn, alias_expr, &mut expanded)?;
83 }
84 Expression::Column(_) => {
85 expand_unit_variant_column(&ctx, txn, alias_expr, &mut expanded)?;
86 }
87 _ => expanded.push(alias_expr),
88 }
89 }
90 *row = expanded;
91 }
92 Ok(())
93 }
94}
95
96#[inline]
97fn rows_need_sumtype_expansion(rows: &[Vec<AliasExpression>]) -> bool {
98 for row in rows {
99 for alias_expr in row {
100 if matches!(
101 alias_expr.expression.as_ref(),
102 Expression::SumTypeConstructor(_) | Expression::Column(_)
103 ) {
104 return true;
105 }
106 }
107 }
108 false
109}
110
111fn expand_sumtype_ctor<'a>(
112 ctx: &Arc<QueryContext>,
113 txn: &mut Transaction<'a>,
114 alias_expr: AliasExpression,
115 expanded: &mut Vec<AliasExpression>,
116) -> Result<()> {
117 let col_name = alias_expr.alias.0.text().to_string();
118 let fragment = alias_expr.fragment.clone();
119
120 let Expression::SumTypeConstructor(ctor) = *alias_expr.expression else {
121 unreachable!()
122 };
123
124 let is_unresolved = ctor.namespace.text() == ctor.variant_name.text()
125 && ctor.sumtype_name.text() == ctor.variant_name.text();
126
127 let sumtype = if is_unresolved {
128 resolve_unresolved_sumtype(ctx, txn, &col_name)?
129 } else {
130 let ns_name = ctor.namespace.text();
131 let ns = ctx.services.catalog.find_namespace_by_name(txn, ns_name)?.unwrap();
132 let sumtype_name = ctor.sumtype_name.text();
133 ctx.services.catalog.find_sumtype_by_name(txn, ns.id(), sumtype_name)?.unwrap()
134 };
135
136 let variant_name_lower = ctor.variant_name.text().to_lowercase();
137 let variant = sumtype.variants.iter().find(|v| v.name == variant_name_lower).unwrap();
138
139 expanded.push(AliasExpression {
140 alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
141 expression: Box::new(Expression::Constant(ConstantExpression::Number {
142 fragment: Fragment::internal(variant.tag.to_string()),
143 })),
144 fragment: fragment.clone(),
145 });
146
147 for (field_name, field_expr) in ctor.columns {
148 let phys_col_name = format!("{}_{}_{}", col_name, variant_name_lower, field_name.text().to_lowercase());
149 expanded.push(AliasExpression {
150 alias: IdentExpression(Fragment::internal(phys_col_name)),
151 expression: Box::new(field_expr),
152 fragment: fragment.clone(),
153 });
154 }
155
156 Ok(())
157}
158
159#[inline]
160fn resolve_unresolved_sumtype<'a>(
161 ctx: &Arc<QueryContext>,
162 txn: &mut Transaction<'a>,
163 col_name: &str,
164) -> Result<SumType> {
165 let tag_col_name = format!("{}_tag", col_name);
166 let source = ctx.source.as_ref().expect("source required for unresolved sumtype");
167
168 if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
169 let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
170 panic!("expected SumType constraint on tag column")
171 };
172 ctx.services.catalog.get_sumtype(txn, *id)
173 } else if let ResolvedShape::Series(series) = source {
174 let tag_id = series.def().tag.expect("series tag expected");
175 ctx.services.catalog.get_sumtype(txn, tag_id)
176 } else {
177 panic!("tag column not found: {}", tag_col_name)
178 }
179}
180
181fn expand_unit_variant_column<'a>(
182 ctx: &Arc<QueryContext>,
183 txn: &mut Transaction<'a>,
184 alias_expr: AliasExpression,
185 expanded: &mut Vec<AliasExpression>,
186) -> Result<()> {
187 let col_name = alias_expr.alias.0.text().to_string();
188
189 let resolved = if let Some(source) = ctx.source.as_ref() {
190 let Expression::Column(col) = alias_expr.expression.as_ref() else {
191 unreachable!()
192 };
193 try_resolve_unit_variant(ctx, txn, source, &col_name, col.0.name.text())?
194 } else {
195 None
196 };
197
198 let Some((sumtype, tag)) = resolved else {
199 expanded.push(alias_expr);
200 return Ok(());
201 };
202
203 let fragment = alias_expr.fragment.clone();
204 expanded.push(AliasExpression {
205 alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
206 expression: Box::new(Expression::Constant(ConstantExpression::Number {
207 fragment: Fragment::internal(tag.to_string()),
208 })),
209 fragment: fragment.clone(),
210 });
211 for v in &sumtype.variants {
212 for field in &v.fields {
213 let phys_col_name =
214 format!("{}_{}_{}", col_name, v.name.to_lowercase(), field.name.to_lowercase());
215 expanded.push(AliasExpression {
216 alias: IdentExpression(Fragment::internal(phys_col_name)),
217 expression: Box::new(Expression::Constant(ConstantExpression::None {
218 fragment: fragment.clone(),
219 })),
220 fragment: fragment.clone(),
221 });
222 }
223 }
224 Ok(())
225}
226
227#[inline]
228fn try_resolve_unit_variant<'a>(
229 ctx: &Arc<QueryContext>,
230 txn: &mut Transaction<'a>,
231 source: &ResolvedShape,
232 col_name: &str,
233 alias_text: &str,
234) -> Result<Option<(SumType, u8)>> {
235 let tag_col_name = format!("{}_tag", col_name);
236
237 if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
238 let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
239 return Ok(None);
240 };
241 let sumtype = ctx.services.catalog.get_sumtype(txn, *id)?;
242 let variant_name_lower = alias_text.to_lowercase();
243 let maybe_tag =
244 sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
245 return Ok(maybe_tag.map(|tag| (sumtype, tag)));
246 }
247
248 if let ResolvedShape::Series(series) = source
249 && let Some(tag_id) = series.def().tag
250 {
251 let sumtype = ctx.services.catalog.get_sumtype(txn, tag_id)?;
252 let variant_name_lower = alias_text.to_lowercase();
253 let maybe_tag =
254 sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
255 return Ok(maybe_tag.map(|tag| (sumtype, tag)));
256 }
257
258 Ok(None)
259}
260
261impl QueryNode for InlineDataNode {
262 fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
263 self.expand_sumtype_constructors(rx)?;
264 Ok(())
265 }
266
267 fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
268 debug_assert!(self.context.is_some(), "InlineDataNode::next() called before initialize()");
269 let stored_ctx = self.context.as_ref().unwrap().clone();
270
271 if self.executed {
272 return Ok(None);
273 }
274
275 self.executed = true;
276
277 if self.rows.is_empty() {
278 let columns = Columns::empty();
279 if self.headers.is_none() {
280 self.headers = Some(ColumnHeaders::from_columns(&columns));
281 }
282 return Ok(Some(columns));
283 }
284
285 if self.headers.is_some() {
286 self.next_with_source(&stored_ctx)
287 } else {
288 self.next_infer_namespace(&stored_ctx)
289 }
290 }
291
292 fn headers(&self) -> Option<ColumnHeaders> {
293 self.headers.clone()
294 }
295}
296
297impl InlineDataNode {
298 fn find_optimal_integer_type(column: &ColumnBuffer) -> ValueType {
299 let mut min_val = i128::MAX;
300 let mut max_val = i128::MIN;
301 let mut has_values = false;
302
303 for value in column.iter() {
304 match value {
305 Value::Int16(v) => {
306 has_values = true;
307 min_val = min_val.min(v);
308 max_val = max_val.max(v);
309 }
310 Value::None {
311 ..
312 } => {}
313 _ => {
314 return ValueType::Int16;
315 }
316 }
317 }
318
319 if !has_values {
320 return ValueType::Int1;
321 }
322
323 if min_val >= i8::MIN as i128 && max_val <= i8::MAX as i128 {
324 ValueType::Int1
325 } else if min_val >= i16::MIN as i128 && max_val <= i16::MAX as i128 {
326 ValueType::Int2
327 } else if min_val >= i32::MIN as i128 && max_val <= i32::MAX as i128 {
328 ValueType::Int4
329 } else if min_val >= i64::MIN as i128 && max_val <= i64::MAX as i128 {
330 ValueType::Int8
331 } else {
332 ValueType::Int16
333 }
334 }
335
336 fn next_infer_namespace(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
337 let mut all_columns: BTreeSet<String> = BTreeSet::new();
338
339 for row in &self.rows {
340 for keyed_expr in row {
341 let column_name = keyed_expr.alias.0.text().to_string();
342 all_columns.insert(column_name);
343 }
344 }
345
346 let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();
347
348 for row in &self.rows {
349 let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
350 for alias_expr in row {
351 let column_name = alias_expr.alias.0.text().to_string();
352 row_map.insert(column_name, alias_expr);
353 }
354 rows_data.push(row_map);
355 }
356
357 let session = EvalContext::from_query(ctx);
358
359 let mut columns = Vec::new();
360
361 for column_name in all_columns {
362 let mut all_values = Vec::new();
363 let mut first_value_type: Option<ValueType> = None;
364 let mut column_fragment: Option<Fragment> = None;
365
366 for row_data in &rows_data {
367 if let Some(alias_expr) = row_data.get(&column_name) {
368 if column_fragment.is_none() {
369 column_fragment = Some(alias_expr.fragment.clone());
370 }
371 let eval_ctx = session.with_eval_empty();
372
373 let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;
374
375 let mut iter = evaluated.data().iter();
376 if let Some(value) = iter.next() {
377 if first_value_type.is_none() && !matches!(value, Value::None { .. }) {
378 first_value_type = Some(value.get_type());
379 }
380 all_values.push(value);
381 } else {
382 all_values.push(Value::none());
383 }
384 } else {
385 all_values.push(Value::none());
386 }
387 }
388
389 let wide_type = if let Some(ref fvt) = first_value_type {
390 if *fvt == ValueType::Decimal {
391 Some(ValueType::Decimal)
392 } else if *fvt == ValueType::Int {
393 Some(ValueType::Int)
394 } else if *fvt == ValueType::Uint {
395 Some(ValueType::Uint)
396 } else if fvt.is_integer() {
397 Some(ValueType::Int16)
398 } else if fvt.is_floating_point() {
399 Some(ValueType::Float8)
400 } else if *fvt == ValueType::Utf8 {
401 Some(ValueType::Utf8)
402 } else if *fvt == ValueType::Boolean {
403 Some(ValueType::Boolean)
404 } else {
405 None
406 }
407 } else {
408 None
409 };
410
411 let mut column_data = if wide_type.is_none() {
412 ColumnBuffer::none_typed(ValueType::Boolean, all_values.len())
413 } else {
414 let mut data = ColumnBuffer::with_capacity(wide_type.clone().unwrap(), 0);
415
416 for value in &all_values {
417 if matches!(value, Value::None { .. }) {
418 data.push_none();
419 } else if wide_type.as_ref().is_some_and(|wt| value.get_type() == *wt) {
420 data.push_value(value.clone());
421 } else {
422 let temp_data = ColumnBuffer::from(value.clone());
423 let eval_ctx = session.with_eval_empty();
424
425 match cast_column_data(
426 &eval_ctx,
427 &temp_data,
428 wide_type.clone().unwrap(),
429 Fragment::none,
430 ) {
431 Ok(casted) => {
432 if let Some(casted_value) = casted.iter().next() {
433 data.push_value(casted_value);
434 } else {
435 data.push_none();
436 }
437 }
438 Err(_) => {
439 data.push_none();
440 }
441 }
442 }
443 }
444
445 data
446 };
447
448 if wide_type == Some(ValueType::Int16) {
449 let optimal_type = Self::find_optimal_integer_type(&column_data);
450 if optimal_type != ValueType::Int16 {
451 let eval_ctx = session.with_eval(Columns::empty(), column_data.len());
452
453 if let Ok(demoted) =
454 cast_column_data(&eval_ctx, &column_data, optimal_type, || {
455 Fragment::none()
456 }) {
457 column_data = demoted;
458 }
459 }
460 }
461
462 columns.push(ColumnWithName::new(
463 column_fragment.unwrap_or_else(|| Fragment::internal(column_name)),
464 column_data,
465 ));
466 }
467
468 let columns = Columns::new(columns);
469 self.headers = Some(ColumnHeaders::from_columns(&columns));
470
471 Ok(Some(columns))
472 }
473
474 fn next_with_source(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
475 let source = ctx.source.as_ref().unwrap();
476 let headers = self.headers.as_ref().unwrap();
477 let session = EvalContext::from_query(ctx);
478
479 let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();
480
481 for row in &self.rows {
482 let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
483 for alias_expr in row {
484 let column_name = alias_expr.alias.0.text().to_string();
485 row_map.insert(column_name, alias_expr);
486 }
487 rows_data.push(row_map);
488 }
489
490 let mut columns = Vec::new();
491
492 for column_name in &headers.columns {
493 let table_column = source.columns().iter().find(|col| col.name == column_name.text());
494
495 let mut column_data = if let Some(tc) = table_column {
496 ColumnBuffer::none_typed(tc.constraint.get_type(), 0)
497 } else {
498 ColumnBuffer::with_capacity(ValueType::Int16, 0)
499 };
500 let mut column_fragment: Option<Fragment> = None;
501
502 for row_data in &rows_data {
503 if let Some(alias_expr) = row_data.get(column_name.text()) {
504 if column_fragment.is_none() {
505 column_fragment = Some(alias_expr.fragment.clone());
506 }
507 let mut eval_ctx = session.with_eval_empty();
508 eval_ctx.target = table_column.map(|tc| TargetColumn::Partial {
509 source_name: Some(source.identifier().text().to_string()),
510 column_name: Some(tc.name.clone()),
511 column_type: tc.constraint.get_type(),
512 properties: tc
513 .properties
514 .iter()
515 .map(|cp| cp.property.clone())
516 .collect(),
517 });
518
519 let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;
520
521 let eval_len = evaluated.data().len();
522 if table_column.is_some() {
523 if eval_len == 1 {
524 column_data.extend(evaluated.data().clone())?;
525 } else if eval_len == 0 {
526 column_data.push_value(Value::none());
527 } else {
528 let first_value =
529 evaluated.data().iter().next().unwrap_or(Value::none());
530 column_data.push_value(first_value);
531 }
532 } else {
533 let value = if eval_len > 0 {
534 evaluated.data().iter().next().unwrap_or(Value::none())
535 } else {
536 Value::none()
537 };
538 match &value {
539 Value::None {
540 ..
541 } => column_data.push_none(),
542 Value::Int16(_) => column_data.push_value(value),
543 _ => {
544 let temp = ColumnBuffer::from(value.clone());
545 match cast_column_data(
546 &eval_ctx,
547 &temp,
548 ValueType::Int16,
549 Fragment::none,
550 ) {
551 Ok(casted) => {
552 if let Some(v) = casted.iter().next() {
553 column_data.push_value(v);
554 } else {
555 column_data.push_none();
556 }
557 }
558 Err(_) => column_data.push_value(value),
559 }
560 }
561 }
562 }
563 } else {
564 column_data.push_value(Value::none());
565 }
566 }
567
568 if table_column.is_none() {
569 let optimal_type = Self::find_optimal_integer_type(&column_data);
570 if optimal_type != ValueType::Int16 {
571 let eval_ctx = session.with_eval(Columns::empty(), column_data.len());
572 if let Ok(demoted) =
573 cast_column_data(&eval_ctx, &column_data, optimal_type, || {
574 Fragment::none()
575 }) {
576 column_data = demoted;
577 }
578 }
579 }
580
581 columns.push(ColumnWithName::new(
582 column_fragment
583 .map(|f| f.with_text(column_name.text()))
584 .unwrap_or_else(|| column_name.clone()),
585 column_data,
586 ));
587 }
588
589 let columns = Columns::new(columns);
590
591 Ok(Some(columns))
592 }
593}