reifydb_engine/vm/volcano/
inline.rs1use std::{
5 collections::{BTreeSet, HashMap, HashSet},
6 mem,
7 sync::Arc,
8};
9
10use reifydb_core::{
11 interface::{catalog::sumtype::SumType, evaluate::TargetColumn, resolved::ResolvedShape},
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_rql::expression::{AliasExpression, ConstantExpression, Expression, IdentExpression};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17 fragment::Fragment,
18 value::{Value, constraint::Constraint, r#type::Type},
19};
20
21use crate::{
22 Result,
23 expression::{cast::cast_column_data, context::EvalContext, eval::evaluate},
24 vm::volcano::query::{QueryContext, QueryNode},
25};
26
/// Query node backed by in-memory rows of aliased expressions.
///
/// The rows are evaluated and emitted as a single `Columns` batch by `next`;
/// afterwards the node reports exhaustion.
pub(crate) struct InlineDataNode {
    /// One entry per row; each row is a list of aliased expressions whose
    /// alias text is used as the column name.
    rows: Vec<Vec<AliasExpression>>,
    /// Column layout. Pre-computed in `new` when the query context has a
    /// source; otherwise filled in by `next` once the columns are known.
    headers: Option<ColumnHeaders>,
    /// Query context captured at construction time (`new` always stores `Some`).
    context: Option<Arc<QueryContext>>,
    /// Set to `true` by the first call to `next`; later calls return `Ok(None)`.
    executed: bool,
}
33
34impl InlineDataNode {
35 pub fn new(rows: Vec<Vec<AliasExpression>>, context: Arc<QueryContext>) -> Self {
36 let cloned_context = context.clone();
38 let headers = cloned_context.source.as_ref().map(|source| {
39 let mut layout = Self::create_columns_layout_from_source(source);
40 if matches!(source, ResolvedShape::Series(_)) {
43 let existing: HashSet<String> =
44 layout.columns.iter().map(|c| c.text().to_string()).collect();
45 for row in &rows {
46 for alias in row {
47 let name = alias.alias.0.text().to_string();
48 if !existing.contains(&name) {
49 layout.columns.push(Fragment::internal(&name));
50 }
51 }
52 }
53 }
54 layout
55 });
56
57 Self {
58 rows,
59 headers,
60 context: Some(context),
61 executed: false,
62 }
63 }
64
65 fn create_columns_layout_from_source(source: &ResolvedShape) -> ColumnHeaders {
66 ColumnHeaders {
67 columns: source.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
68 }
69 }
70
71 fn expand_sumtype_constructors<'a>(&mut self, txn: &mut Transaction<'a>) -> Result<()> {
72 let Some(ctx) = self.context.as_ref().cloned() else {
73 return Ok(());
74 };
75 if !rows_need_sumtype_expansion(&self.rows) {
76 return Ok(());
77 }
78 for row in &mut self.rows {
79 let original = mem::take(row);
80 let mut expanded = Vec::with_capacity(original.len());
81 for alias_expr in original {
82 match alias_expr.expression.as_ref() {
83 Expression::SumTypeConstructor(_) => {
84 expand_sumtype_ctor(&ctx, txn, alias_expr, &mut expanded)?;
85 }
86 Expression::Column(_) => {
87 expand_unit_variant_column(&ctx, txn, alias_expr, &mut expanded)?;
88 }
89 _ => expanded.push(alias_expr),
90 }
91 }
92 *row = expanded;
93 }
94 Ok(())
95 }
96}
97
98#[inline]
99fn rows_need_sumtype_expansion(rows: &[Vec<AliasExpression>]) -> bool {
100 for row in rows {
101 for alias_expr in row {
102 if matches!(
103 alias_expr.expression.as_ref(),
104 Expression::SumTypeConstructor(_) | Expression::Column(_)
105 ) {
106 return true;
107 }
108 }
109 }
110 false
111}
112
113fn expand_sumtype_ctor<'a>(
114 ctx: &Arc<QueryContext>,
115 txn: &mut Transaction<'a>,
116 alias_expr: AliasExpression,
117 expanded: &mut Vec<AliasExpression>,
118) -> Result<()> {
119 let col_name = alias_expr.alias.0.text().to_string();
120 let fragment = alias_expr.fragment.clone();
121
122 let Expression::SumTypeConstructor(ctor) = *alias_expr.expression else {
123 unreachable!()
124 };
125
126 let is_unresolved = ctor.namespace.text() == ctor.variant_name.text()
127 && ctor.sumtype_name.text() == ctor.variant_name.text();
128
129 let sumtype = if is_unresolved {
130 resolve_unresolved_sumtype(ctx, txn, &col_name)?
131 } else {
132 let ns_name = ctor.namespace.text();
133 let ns = ctx.services.catalog.find_namespace_by_name(txn, ns_name)?.unwrap();
134 let sumtype_name = ctor.sumtype_name.text();
135 ctx.services.catalog.find_sumtype_by_name(txn, ns.id(), sumtype_name)?.unwrap()
136 };
137
138 let variant_name_lower = ctor.variant_name.text().to_lowercase();
139 let variant = sumtype.variants.iter().find(|v| v.name == variant_name_lower).unwrap();
140
141 expanded.push(AliasExpression {
142 alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
143 expression: Box::new(Expression::Constant(ConstantExpression::Number {
144 fragment: Fragment::internal(variant.tag.to_string()),
145 })),
146 fragment: fragment.clone(),
147 });
148
149 for (field_name, field_expr) in ctor.columns {
150 let phys_col_name = format!("{}_{}_{}", col_name, variant_name_lower, field_name.text().to_lowercase());
151 expanded.push(AliasExpression {
152 alias: IdentExpression(Fragment::internal(phys_col_name)),
153 expression: Box::new(field_expr),
154 fragment: fragment.clone(),
155 });
156 }
157
158 Ok(())
159}
160
161#[inline]
162fn resolve_unresolved_sumtype<'a>(
163 ctx: &Arc<QueryContext>,
164 txn: &mut Transaction<'a>,
165 col_name: &str,
166) -> Result<SumType> {
167 let tag_col_name = format!("{}_tag", col_name);
168 let source = ctx.source.as_ref().expect("source required for unresolved sumtype");
169
170 if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
171 let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
172 panic!("expected SumType constraint on tag column")
173 };
174 ctx.services.catalog.get_sumtype(txn, *id)
175 } else if let ResolvedShape::Series(series) = source {
176 let tag_id = series.def().tag.expect("series tag expected");
177 ctx.services.catalog.get_sumtype(txn, tag_id)
178 } else {
179 panic!("tag column not found: {}", tag_col_name)
180 }
181}
182
183fn expand_unit_variant_column<'a>(
184 ctx: &Arc<QueryContext>,
185 txn: &mut Transaction<'a>,
186 alias_expr: AliasExpression,
187 expanded: &mut Vec<AliasExpression>,
188) -> Result<()> {
189 let col_name = alias_expr.alias.0.text().to_string();
190
191 let resolved = if let Some(source) = ctx.source.as_ref() {
192 let Expression::Column(col) = alias_expr.expression.as_ref() else {
193 unreachable!()
194 };
195 try_resolve_unit_variant(ctx, txn, source, &col_name, col.0.name.text())?
196 } else {
197 None
198 };
199
200 let Some((sumtype, tag)) = resolved else {
201 expanded.push(alias_expr);
202 return Ok(());
203 };
204
205 let fragment = alias_expr.fragment.clone();
206 expanded.push(AliasExpression {
207 alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
208 expression: Box::new(Expression::Constant(ConstantExpression::Number {
209 fragment: Fragment::internal(tag.to_string()),
210 })),
211 fragment: fragment.clone(),
212 });
213 for v in &sumtype.variants {
214 for field in &v.fields {
215 let phys_col_name =
216 format!("{}_{}_{}", col_name, v.name.to_lowercase(), field.name.to_lowercase());
217 expanded.push(AliasExpression {
218 alias: IdentExpression(Fragment::internal(phys_col_name)),
219 expression: Box::new(Expression::Constant(ConstantExpression::None {
220 fragment: fragment.clone(),
221 })),
222 fragment: fragment.clone(),
223 });
224 }
225 }
226 Ok(())
227}
228
229#[inline]
230fn try_resolve_unit_variant<'a>(
231 ctx: &Arc<QueryContext>,
232 txn: &mut Transaction<'a>,
233 source: &ResolvedShape,
234 col_name: &str,
235 alias_text: &str,
236) -> Result<Option<(SumType, u8)>> {
237 let tag_col_name = format!("{}_tag", col_name);
238
239 if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
240 let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
241 return Ok(None);
242 };
243 let sumtype = ctx.services.catalog.get_sumtype(txn, *id)?;
244 let variant_name_lower = alias_text.to_lowercase();
245 let maybe_tag =
246 sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
247 return Ok(maybe_tag.map(|tag| (sumtype, tag)));
248 }
249
250 if let ResolvedShape::Series(series) = source
251 && let Some(tag_id) = series.def().tag
252 {
253 let sumtype = ctx.services.catalog.get_sumtype(txn, tag_id)?;
254 let variant_name_lower = alias_text.to_lowercase();
255 let maybe_tag =
256 sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
257 return Ok(maybe_tag.map(|tag| (sumtype, tag)));
258 }
259
260 Ok(None)
261}
262
263impl QueryNode for InlineDataNode {
264 fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
265 self.expand_sumtype_constructors(rx)?;
266 Ok(())
267 }
268
269 fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
270 debug_assert!(self.context.is_some(), "InlineDataNode::next() called before initialize()");
271 let stored_ctx = self.context.as_ref().unwrap().clone();
272
273 if self.executed {
274 return Ok(None);
275 }
276
277 self.executed = true;
278
279 if self.rows.is_empty() {
280 let columns = Columns::empty();
281 if self.headers.is_none() {
282 self.headers = Some(ColumnHeaders::from_columns(&columns));
283 }
284 return Ok(Some(columns));
285 }
286
287 if self.headers.is_some() {
290 self.next_with_source(&stored_ctx)
291 } else {
292 self.next_infer_namespace(&stored_ctx)
293 }
294 }
295
296 fn headers(&self) -> Option<ColumnHeaders> {
297 self.headers.clone()
298 }
299}
300
301impl InlineDataNode {
302 fn find_optimal_integer_type(column: &ColumnBuffer) -> Type {
305 let mut min_val = i128::MAX;
306 let mut max_val = i128::MIN;
307 let mut has_values = false;
308
309 for value in column.iter() {
310 match value {
311 Value::Int16(v) => {
312 has_values = true;
313 min_val = min_val.min(v);
314 max_val = max_val.max(v);
315 }
316 Value::None {
317 ..
318 } => {
319 }
321 _ => {
322 return Type::Int16;
324 }
325 }
326 }
327
328 if !has_values {
329 return Type::Int1; }
331
332 if min_val >= i8::MIN as i128 && max_val <= i8::MAX as i128 {
334 Type::Int1
335 } else if min_val >= i16::MIN as i128 && max_val <= i16::MAX as i128 {
336 Type::Int2
337 } else if min_val >= i32::MIN as i128 && max_val <= i32::MAX as i128 {
338 Type::Int4
339 } else if min_val >= i64::MIN as i128 && max_val <= i64::MAX as i128 {
340 Type::Int8
341 } else {
342 Type::Int16
343 }
344 }
345
/// Materializes the inline rows when no header layout was pre-computed,
/// inferring each column's type from the data.
///
/// Per column: every row's expression is evaluated (rows lacking the column
/// contribute none), a wide type is chosen from the first non-none value, all
/// values are cast to that wide type, and integer columns are then narrowed to
/// the smallest type that fits. The resulting layout is stored in
/// `self.headers`.
///
/// # Errors
///
/// Propagates errors from evaluating a row expression. Cast failures are not
/// errors here: the affected value becomes none.
fn next_infer_namespace(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
    // Union of all aliases across rows. `BTreeSet` de-duplicates and iterates
    // in sorted order, so output columns are ordered by name.
    let mut all_columns: BTreeSet<String> = BTreeSet::new();

    for row in &self.rows {
        for keyed_expr in row {
            let column_name = keyed_expr.alias.0.text().to_string();
            all_columns.insert(column_name);
        }
    }

    // Per-row lookup from column name to its expression. If a row repeats an
    // alias, the later expression replaces the earlier one.
    let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();

    for row in &self.rows {
        let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
        for alias_expr in row {
            let column_name = alias_expr.alias.0.text().to_string();
            row_map.insert(column_name, alias_expr);
        }
        rows_data.push(row_map);
    }

    let session = EvalContext::from_query(ctx);

    let mut columns = Vec::new();

    for column_name in all_columns {
        // One value per row, in row order.
        let mut all_values = Vec::new();
        // Type of the first non-none value; drives the wide type below.
        let mut first_value_type: Option<Type> = None;
        // Fragment of the first row expression that defines this column.
        let mut column_fragment: Option<Fragment> = None;

        for row_data in &rows_data {
            if let Some(alias_expr) = row_data.get(&column_name) {
                if column_fragment.is_none() {
                    column_fragment = Some(alias_expr.fragment.clone());
                }
                let eval_ctx = session.with_eval_empty();

                let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;

                // Only the first value of the evaluated column is used.
                let mut iter = evaluated.data().iter();
                if let Some(value) = iter.next() {
                    if first_value_type.is_none() && !matches!(value, Value::None { .. }) {
                        first_value_type = Some(value.get_type());
                    }
                    all_values.push(value);
                } else {
                    all_values.push(Value::none());
                }
            } else {
                // Row does not define this column.
                all_values.push(Value::none());
            }
        }

        // Widest representative of the value's type family; `None` when the
        // column has no non-none value or its type is outside the families
        // listed here.
        let wide_type = if let Some(ref fvt) = first_value_type {
            if fvt.is_integer() {
                Some(Type::Int16)
            } else if fvt.is_floating_point() {
                Some(Type::Float8)
            } else if *fvt == Type::Utf8 {
                Some(Type::Utf8)
            } else if *fvt == Type::Boolean {
                Some(Type::Boolean)
            } else {
                None
            }
        } else {
            None
        };

        let mut column_data = if wide_type.is_none() {
            // NOTE(review): this branch is also taken when the first non-none
            // value has a type other than integer/float/Utf8/Boolean, in which
            // case the evaluated values are not copied into the column.
            // Presumably `none_typed` yields an all-none column — confirm this
            // is intended for such types.
            ColumnBuffer::none_typed(Type::Boolean, all_values.len())
        } else {
            let mut data = ColumnBuffer::with_capacity(wide_type.clone().unwrap(), 0);

            for value in &all_values {
                if matches!(value, Value::None { .. }) {
                    data.push_none();
                } else if wide_type.as_ref().is_some_and(|wt| value.get_type() == *wt) {
                    // Already the wide type: store as is.
                    data.push_value(value.clone());
                } else {
                    // Cast the single value through a one-element buffer.
                    let temp_data = ColumnBuffer::from(value.clone());
                    let eval_ctx = session.with_eval_empty();

                    match cast_column_data(
                        &eval_ctx,
                        &temp_data,
                        wide_type.clone().unwrap(),
                        Fragment::none,
                    ) {
                        Ok(casted) => {
                            if let Some(casted_value) = casted.iter().next() {
                                data.push_value(casted_value);
                            } else {
                                data.push_none();
                            }
                        }
                        // A value that cannot be cast is stored as none
                        // rather than failing the whole query.
                        Err(_) => {
                            data.push_none();
                        }
                    }
                }
            }

            data
        };

        // Integer columns were widened to Int16 above; narrow them to the
        // smallest type that holds every value. If the narrowing cast fails,
        // the Int16 column is kept.
        if wide_type == Some(Type::Int16) {
            let optimal_type = Self::find_optimal_integer_type(&column_data);
            if optimal_type != Type::Int16 {
                let eval_ctx = session.with_eval(Columns::empty(), column_data.len());

                if let Ok(demoted) =
                    cast_column_data(&eval_ctx, &column_data, optimal_type, || {
                        Fragment::none()
                    }) {
                    column_data = demoted;
                }
            }
        }
        columns.push(ColumnWithName::new(
            column_fragment.unwrap_or_else(|| Fragment::internal(column_name)),
            column_data,
        ));
    }

    let columns = Columns::new(columns);
    // Record the inferred layout so `headers()` can report it afterwards.
    self.headers = Some(ColumnHeaders::from_columns(&columns));

    Ok(Some(columns))
}
495
/// Materializes the inline rows against the pre-computed header layout.
///
/// Header columns that match a source column are typed from that column's
/// constraint and evaluated with the column as the evaluation target. Header
/// columns without a matching source column are collected as `Int16` and then
/// narrowed to the smallest integer type that fits.
///
/// # Panics
///
/// Panics if `ctx.source` or `self.headers` is `None`.
///
/// # Errors
///
/// Propagates errors from evaluating a row expression and from extending a
/// source-typed column with the evaluated data.
fn next_with_source(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
    let source = ctx.source.as_ref().unwrap();
    let headers = self.headers.as_ref().unwrap();
    let session = EvalContext::from_query(ctx);

    // Per-row lookup from column name to its expression. If a row repeats an
    // alias, the later expression replaces the earlier one.
    let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();

    for row in &self.rows {
        let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
        for alias_expr in row {
            let column_name = alias_expr.alias.0.text().to_string();
            row_map.insert(column_name, alias_expr);
        }
        rows_data.push(row_map);
    }

    let mut columns = Vec::new();

    // Output columns follow the header order.
    for column_name in &headers.columns {
        // `None` for header columns that are not declared on the source.
        let table_column = source.columns().iter().find(|col| col.name == column_name.text());

        let mut column_data = if let Some(tc) = table_column {
            ColumnBuffer::none_typed(tc.constraint.get_type(), 0)
        } else {
            ColumnBuffer::with_capacity(Type::Int16, 0)
        };
        // Fragment of the first row expression that defines this column.
        let mut column_fragment: Option<Fragment> = None;

        for row_data in &rows_data {
            if let Some(alias_expr) = row_data.get(column_name.text()) {
                if column_fragment.is_none() {
                    column_fragment = Some(alias_expr.fragment.clone());
                }
                // Give the evaluator the target column's name, type and
                // properties when the column is declared on the source.
                let mut eval_ctx = session.with_eval_empty();
                eval_ctx.target = table_column.map(|tc| TargetColumn::Partial {
                    source_name: Some(source.identifier().text().to_string()),
                    column_name: Some(tc.name.clone()),
                    column_type: tc.constraint.get_type(),
                    properties: tc
                        .properties
                        .iter()
                        .map(|cp| cp.property.clone())
                        .collect(),
                });

                let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;

                let eval_len = evaluated.data().len();
                if table_column.is_some() {
                    if eval_len == 1 {
                        // Exactly one value: append the evaluated buffer.
                        column_data.extend(evaluated.data().clone())?;
                    } else if eval_len == 0 {
                        column_data.push_value(Value::none());
                    } else {
                        // More than one value: only the first is kept.
                        let first_value =
                            evaluated.data().iter().next().unwrap_or(Value::none());
                        column_data.push_value(first_value);
                    }
                } else {
                    // Column not declared on the source: collect as Int16.
                    let value = if eval_len > 0 {
                        evaluated.data().iter().next().unwrap_or(Value::none())
                    } else {
                        Value::none()
                    };
                    match &value {
                        Value::None {
                            ..
                        } => column_data.push_none(),
                        Value::Int16(_) => column_data.push_value(value),
                        _ => {
                            // Cast the single value through a one-element
                            // buffer.
                            let temp = ColumnBuffer::from(value.clone());
                            match cast_column_data(
                                &eval_ctx,
                                &temp,
                                Type::Int16,
                                Fragment::none,
                            ) {
                                Ok(casted) => {
                                    if let Some(v) = casted.iter().next() {
                                        column_data.push_value(v);
                                    } else {
                                        column_data.push_none();
                                    }
                                }
                                // On cast failure the uncast value is pushed
                                // as is.
                                Err(_) => column_data.push_value(value),
                            }
                        }
                    }
                }
            } else {
                // Row does not define this column.
                column_data.push_value(Value::none());
            }
        }

        // Narrow undeclared columns to the smallest integer type that holds
        // every value. If the narrowing cast fails, the column is kept as
        // collected.
        if table_column.is_none() {
            let optimal_type = Self::find_optimal_integer_type(&column_data);
            if optimal_type != Type::Int16 {
                let eval_ctx = session.with_eval(Columns::empty(), column_data.len());
                if let Ok(demoted) =
                    cast_column_data(&eval_ctx, &column_data, optimal_type, || {
                        Fragment::none()
                    }) {
                    column_data = demoted;
                }
            }
        }

        // Name the column after the header, reusing the first defining
        // expression's fragment when one exists.
        columns.push(ColumnWithName::new(
            column_fragment
                .map(|f| f.with_text(column_name.text()))
                .unwrap_or_else(|| column_name.clone()),
            column_data,
        ));
    }

    let columns = Columns::new(columns);

    Ok(Some(columns))
}
624}