Skip to main content

reifydb_engine/vm/volcano/
inline.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeSet, HashMap, HashSet},
6	mem,
7	sync::Arc,
8};
9
10use reifydb_core::{
11	interface::{catalog::sumtype::SumType, evaluate::TargetColumn, resolved::ResolvedShape},
12	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
13};
14use reifydb_rql::expression::{AliasExpression, ConstantExpression, Expression, IdentExpression};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17	fragment::Fragment,
18	value::{Value, constraint::Constraint, r#type::Type},
19};
20
21use crate::{
22	Result,
23	expression::{cast::cast_column_data, context::EvalContext, eval::evaluate},
24	vm::volcano::query::{QueryContext, QueryNode},
25};
26
/// Query node that turns in-query rows of aliased expressions into a single
/// batch of columns.
pub(crate) struct InlineDataNode {
	/// Rows to emit; each row is a list of `name: expr` entries.
	/// `initialize` may rewrite entries in place to expand sum-type values.
	rows: Vec<Vec<AliasExpression>>,
	/// Output column headers. Fixed from the source shape in `new` when the
	/// context has a source; otherwise inferred when the batch is produced.
	headers: Option<ColumnHeaders>,
	/// Query context captured by `new` (always set to `Some` there).
	context: Option<Arc<QueryContext>>,
	/// Set once the batch has been emitted; later `next` calls return `Ok(None)`.
	executed: bool,
}
33
34impl InlineDataNode {
35	pub fn new(rows: Vec<Vec<AliasExpression>>, context: Arc<QueryContext>) -> Self {
36		// Clone the Arc to extract headers without borrowing issues
37		let cloned_context = context.clone();
38		let headers = cloned_context.source.as_ref().map(|source| {
39			let mut layout = Self::create_columns_layout_from_source(source);
40			// For series, include extra columns from input (e.g., timestamp, tag)
41			// that aren't part of the shape but are needed by the insert executor.
42			if matches!(source, ResolvedShape::Series(_)) {
43				let existing: HashSet<String> =
44					layout.columns.iter().map(|c| c.text().to_string()).collect();
45				for row in &rows {
46					for alias in row {
47						let name = alias.alias.0.text().to_string();
48						if !existing.contains(&name) {
49							layout.columns.push(Fragment::internal(&name));
50						}
51					}
52				}
53			}
54			layout
55		});
56
57		Self {
58			rows,
59			headers,
60			context: Some(context),
61			executed: false,
62		}
63	}
64
65	fn create_columns_layout_from_source(source: &ResolvedShape) -> ColumnHeaders {
66		ColumnHeaders {
67			columns: source.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
68		}
69	}
70
71	fn expand_sumtype_constructors<'a>(&mut self, txn: &mut Transaction<'a>) -> Result<()> {
72		let Some(ctx) = self.context.as_ref().cloned() else {
73			return Ok(());
74		};
75		if !rows_need_sumtype_expansion(&self.rows) {
76			return Ok(());
77		}
78		for row in &mut self.rows {
79			let original = mem::take(row);
80			let mut expanded = Vec::with_capacity(original.len());
81			for alias_expr in original {
82				match alias_expr.expression.as_ref() {
83					Expression::SumTypeConstructor(_) => {
84						expand_sumtype_ctor(&ctx, txn, alias_expr, &mut expanded)?;
85					}
86					Expression::Column(_) => {
87						expand_unit_variant_column(&ctx, txn, alias_expr, &mut expanded)?;
88					}
89					_ => expanded.push(alias_expr),
90				}
91			}
92			*row = expanded;
93		}
94		Ok(())
95	}
96}
97
98#[inline]
99fn rows_need_sumtype_expansion(rows: &[Vec<AliasExpression>]) -> bool {
100	for row in rows {
101		for alias_expr in row {
102			if matches!(
103				alias_expr.expression.as_ref(),
104				Expression::SumTypeConstructor(_) | Expression::Column(_)
105			) {
106				return true;
107			}
108		}
109	}
110	false
111}
112
113fn expand_sumtype_ctor<'a>(
114	ctx: &Arc<QueryContext>,
115	txn: &mut Transaction<'a>,
116	alias_expr: AliasExpression,
117	expanded: &mut Vec<AliasExpression>,
118) -> Result<()> {
119	let col_name = alias_expr.alias.0.text().to_string();
120	let fragment = alias_expr.fragment.clone();
121
122	let Expression::SumTypeConstructor(ctor) = *alias_expr.expression else {
123		unreachable!()
124	};
125
126	let is_unresolved = ctor.namespace.text() == ctor.variant_name.text()
127		&& ctor.sumtype_name.text() == ctor.variant_name.text();
128
129	let sumtype = if is_unresolved {
130		resolve_unresolved_sumtype(ctx, txn, &col_name)?
131	} else {
132		let ns_name = ctor.namespace.text();
133		let ns = ctx.services.catalog.find_namespace_by_name(txn, ns_name)?.unwrap();
134		let sumtype_name = ctor.sumtype_name.text();
135		ctx.services.catalog.find_sumtype_by_name(txn, ns.id(), sumtype_name)?.unwrap()
136	};
137
138	let variant_name_lower = ctor.variant_name.text().to_lowercase();
139	let variant = sumtype.variants.iter().find(|v| v.name == variant_name_lower).unwrap();
140
141	expanded.push(AliasExpression {
142		alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
143		expression: Box::new(Expression::Constant(ConstantExpression::Number {
144			fragment: Fragment::internal(variant.tag.to_string()),
145		})),
146		fragment: fragment.clone(),
147	});
148
149	for (field_name, field_expr) in ctor.columns {
150		let phys_col_name = format!("{}_{}_{}", col_name, variant_name_lower, field_name.text().to_lowercase());
151		expanded.push(AliasExpression {
152			alias: IdentExpression(Fragment::internal(phys_col_name)),
153			expression: Box::new(field_expr),
154			fragment: fragment.clone(),
155		});
156	}
157
158	Ok(())
159}
160
161#[inline]
162fn resolve_unresolved_sumtype<'a>(
163	ctx: &Arc<QueryContext>,
164	txn: &mut Transaction<'a>,
165	col_name: &str,
166) -> Result<SumType> {
167	let tag_col_name = format!("{}_tag", col_name);
168	let source = ctx.source.as_ref().expect("source required for unresolved sumtype");
169
170	if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
171		let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
172			panic!("expected SumType constraint on tag column")
173		};
174		ctx.services.catalog.get_sumtype(txn, *id)
175	} else if let ResolvedShape::Series(series) = source {
176		let tag_id = series.def().tag.expect("series tag expected");
177		ctx.services.catalog.get_sumtype(txn, tag_id)
178	} else {
179		panic!("tag column not found: {}", tag_col_name)
180	}
181}
182
183fn expand_unit_variant_column<'a>(
184	ctx: &Arc<QueryContext>,
185	txn: &mut Transaction<'a>,
186	alias_expr: AliasExpression,
187	expanded: &mut Vec<AliasExpression>,
188) -> Result<()> {
189	let col_name = alias_expr.alias.0.text().to_string();
190
191	let resolved = if let Some(source) = ctx.source.as_ref() {
192		let Expression::Column(col) = alias_expr.expression.as_ref() else {
193			unreachable!()
194		};
195		try_resolve_unit_variant(ctx, txn, source, &col_name, col.0.name.text())?
196	} else {
197		None
198	};
199
200	let Some((sumtype, tag)) = resolved else {
201		expanded.push(alias_expr);
202		return Ok(());
203	};
204
205	let fragment = alias_expr.fragment.clone();
206	expanded.push(AliasExpression {
207		alias: IdentExpression(Fragment::internal(format!("{}_tag", col_name))),
208		expression: Box::new(Expression::Constant(ConstantExpression::Number {
209			fragment: Fragment::internal(tag.to_string()),
210		})),
211		fragment: fragment.clone(),
212	});
213	for v in &sumtype.variants {
214		for field in &v.fields {
215			let phys_col_name =
216				format!("{}_{}_{}", col_name, v.name.to_lowercase(), field.name.to_lowercase());
217			expanded.push(AliasExpression {
218				alias: IdentExpression(Fragment::internal(phys_col_name)),
219				expression: Box::new(Expression::Constant(ConstantExpression::None {
220					fragment: fragment.clone(),
221				})),
222				fragment: fragment.clone(),
223			});
224		}
225	}
226	Ok(())
227}
228
229#[inline]
230fn try_resolve_unit_variant<'a>(
231	ctx: &Arc<QueryContext>,
232	txn: &mut Transaction<'a>,
233	source: &ResolvedShape,
234	col_name: &str,
235	alias_text: &str,
236) -> Result<Option<(SumType, u8)>> {
237	let tag_col_name = format!("{}_tag", col_name);
238
239	if let Some(tag_col) = source.columns().iter().find(|c| c.name == tag_col_name) {
240		let Some(Constraint::SumType(id)) = tag_col.constraint.constraint() else {
241			return Ok(None);
242		};
243		let sumtype = ctx.services.catalog.get_sumtype(txn, *id)?;
244		let variant_name_lower = alias_text.to_lowercase();
245		let maybe_tag =
246			sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
247		return Ok(maybe_tag.map(|tag| (sumtype, tag)));
248	}
249
250	if let ResolvedShape::Series(series) = source
251		&& let Some(tag_id) = series.def().tag
252	{
253		let sumtype = ctx.services.catalog.get_sumtype(txn, tag_id)?;
254		let variant_name_lower = alias_text.to_lowercase();
255		let maybe_tag =
256			sumtype.variants.iter().find(|v| v.name.to_lowercase() == variant_name_lower).map(|v| v.tag);
257		return Ok(maybe_tag.map(|tag| (sumtype, tag)));
258	}
259
260	Ok(None)
261}
262
263impl QueryNode for InlineDataNode {
264	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
265		self.expand_sumtype_constructors(rx)?;
266		Ok(())
267	}
268
269	fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
270		debug_assert!(self.context.is_some(), "InlineDataNode::next() called before initialize()");
271		let stored_ctx = self.context.as_ref().unwrap().clone();
272
273		if self.executed {
274			return Ok(None);
275		}
276
277		self.executed = true;
278
279		if self.rows.is_empty() {
280			let columns = Columns::empty();
281			if self.headers.is_none() {
282				self.headers = Some(ColumnHeaders::from_columns(&columns));
283			}
284			return Ok(Some(columns));
285		}
286
287		// Choose execution path based on whether we have table
288		// namespace
289		if self.headers.is_some() {
290			self.next_with_source(&stored_ctx)
291		} else {
292			self.next_infer_namespace(&stored_ctx)
293		}
294	}
295
296	fn headers(&self) -> Option<ColumnHeaders> {
297		self.headers.clone()
298	}
299}
300
301impl InlineDataNode {
302	/// Determines the optimal (narrowest) integer type that can hold all
303	/// values
304	fn find_optimal_integer_type(column: &ColumnBuffer) -> Type {
305		let mut min_val = i128::MAX;
306		let mut max_val = i128::MIN;
307		let mut has_values = false;
308
309		for value in column.iter() {
310			match value {
311				Value::Int16(v) => {
312					has_values = true;
313					min_val = min_val.min(v);
314					max_val = max_val.max(v);
315				}
316				Value::None {
317					..
318				} => {
319					// Skip undefined values
320				}
321				_ => {
322					// Non-integer value, keep as Int16
323					return Type::Int16;
324				}
325			}
326		}
327
328		if !has_values {
329			return Type::Int1; // Default to smallest if no values
330		}
331
332		// Determine narrowest type that can hold the range
333		if min_val >= i8::MIN as i128 && max_val <= i8::MAX as i128 {
334			Type::Int1
335		} else if min_val >= i16::MIN as i128 && max_val <= i16::MAX as i128 {
336			Type::Int2
337		} else if min_val >= i32::MIN as i128 && max_val <= i32::MAX as i128 {
338			Type::Int4
339		} else if min_val >= i64::MIN as i128 && max_val <= i64::MAX as i128 {
340			Type::Int8
341		} else {
342			Type::Int16
343		}
344	}
345
346	fn next_infer_namespace(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
347		// Collect all unique column names across all rows
348		let mut all_columns: BTreeSet<String> = BTreeSet::new();
349
350		for row in &self.rows {
351			for keyed_expr in row {
352				let column_name = keyed_expr.alias.0.text().to_string();
353				all_columns.insert(column_name);
354			}
355		}
356
357		// Convert each encoded to a HashMap for easier lookup
358		let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();
359
360		for row in &self.rows {
361			let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
362			for alias_expr in row {
363				let column_name = alias_expr.alias.0.text().to_string();
364				row_map.insert(column_name, alias_expr);
365			}
366			rows_data.push(row_map);
367		}
368
369		let session = EvalContext::from_query(ctx);
370
371		// Create columns - start with wide types
372		let mut columns = Vec::new();
373
374		for column_name in all_columns {
375			// First pass: collect all values in a wide column
376			let mut all_values = Vec::new();
377			let mut first_value_type: Option<Type> = None;
378			let mut column_fragment: Option<Fragment> = None;
379
380			for row_data in &rows_data {
381				if let Some(alias_expr) = row_data.get(&column_name) {
382					if column_fragment.is_none() {
383						column_fragment = Some(alias_expr.fragment.clone());
384					}
385					let eval_ctx = session.with_eval_empty();
386
387					let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;
388
389					// Take the first value from the
390					// evaluated result
391					let mut iter = evaluated.data().iter();
392					if let Some(value) = iter.next() {
393						// Track the first non-undefined
394						// value type we see
395						if first_value_type.is_none() && !matches!(value, Value::None { .. }) {
396							first_value_type = Some(value.get_type());
397						}
398						all_values.push(value);
399					} else {
400						all_values.push(Value::none());
401					}
402				} else {
403					all_values.push(Value::none());
404				}
405			}
406
407			// Determine the initial wide type based on what we saw
408			let wide_type = if let Some(ref fvt) = first_value_type {
409				if fvt.is_integer() {
410					Some(Type::Int16) // Start with widest integer type
411				} else if fvt.is_floating_point() {
412					Some(Type::Float8) // Start with widest float type
413				} else if *fvt == Type::Utf8 {
414					Some(Type::Utf8)
415				} else if *fvt == Type::Boolean {
416					Some(Type::Boolean)
417				} else {
418					None
419				}
420			} else {
421				None
422			};
423
424			// Create the wide column and add all values
425			let mut column_data = if wide_type.is_none() {
426				ColumnBuffer::none_typed(Type::Boolean, all_values.len())
427			} else {
428				let mut data = ColumnBuffer::with_capacity(wide_type.clone().unwrap(), 0);
429
430				// Add each value, casting to the wide
431				// type if needed
432				for value in &all_values {
433					if matches!(value, Value::None { .. }) {
434						data.push_none();
435					} else if wide_type.as_ref().is_some_and(|wt| value.get_type() == *wt) {
436						data.push_value(value.clone());
437					} else {
438						// Cast to the wide type
439						let temp_data = ColumnBuffer::from(value.clone());
440						let eval_ctx = session.with_eval_empty();
441
442						match cast_column_data(
443							&eval_ctx,
444							&temp_data,
445							wide_type.clone().unwrap(),
446							Fragment::none,
447						) {
448							Ok(casted) => {
449								if let Some(casted_value) = casted.iter().next() {
450									data.push_value(casted_value);
451								} else {
452									data.push_none();
453								}
454							}
455							Err(_) => {
456								data.push_none();
457							}
458						}
459					}
460				}
461
462				data
463			};
464
465			// Now optimize: find the narrowest type and demote if
466			// possible
467			if wide_type == Some(Type::Int16) {
468				let optimal_type = Self::find_optimal_integer_type(&column_data);
469				if optimal_type != Type::Int16 {
470					// Demote to the optimal type
471					let eval_ctx = session.with_eval(Columns::empty(), column_data.len());
472
473					if let Ok(demoted) =
474						cast_column_data(&eval_ctx, &column_data, optimal_type, || {
475							Fragment::none()
476						}) {
477						column_data = demoted;
478					}
479				}
480			}
481			// Could add similar optimization for Float8 -> Float4
482			// if needed
483
484			columns.push(ColumnWithName::new(
485				column_fragment.unwrap_or_else(|| Fragment::internal(column_name)),
486				column_data,
487			));
488		}
489
490		let columns = Columns::new(columns);
491		self.headers = Some(ColumnHeaders::from_columns(&columns));
492
493		Ok(Some(columns))
494	}
495
496	fn next_with_source(&mut self, ctx: &QueryContext) -> Result<Option<Columns>> {
497		let source = ctx.source.as_ref().unwrap(); // Safe because headers is Some
498		let headers = self.headers.as_ref().unwrap(); // Safe because we're in this path
499		let session = EvalContext::from_query(ctx);
500
501		// Convert rows to HashMap for easier column lookup
502		let mut rows_data: Vec<HashMap<String, &AliasExpression>> = Vec::new();
503
504		for row in &self.rows {
505			let mut row_map: HashMap<String, &AliasExpression> = HashMap::new();
506			for alias_expr in row {
507				let column_name = alias_expr.alias.0.text().to_string();
508				row_map.insert(column_name, alias_expr);
509			}
510			rows_data.push(row_map);
511		}
512
513		// Create columns based on table namespace
514		let mut columns = Vec::new();
515
516		for column_name in &headers.columns {
517			// Find the corresponding source column for type info and policies.
518			// May be None for extra columns like "timestamp" or "tag" in series inserts.
519			let table_column = source.columns().iter().find(|col| col.name == column_name.text());
520
521			let mut column_data = if let Some(tc) = table_column {
522				ColumnBuffer::none_typed(tc.constraint.get_type(), 0)
523			} else {
524				ColumnBuffer::with_capacity(Type::Int16, 0)
525			};
526			let mut column_fragment: Option<Fragment> = None;
527
528			for row_data in &rows_data {
529				if let Some(alias_expr) = row_data.get(column_name.text()) {
530					if column_fragment.is_none() {
531						column_fragment = Some(alias_expr.fragment.clone());
532					}
533					let mut eval_ctx = session.with_eval_empty();
534					eval_ctx.target = table_column.map(|tc| TargetColumn::Partial {
535						source_name: Some(source.identifier().text().to_string()),
536						column_name: Some(tc.name.clone()),
537						column_type: tc.constraint.get_type(),
538						properties: tc
539							.properties
540							.iter()
541							.map(|cp| cp.property.clone())
542							.collect(),
543					});
544
545					let evaluated = evaluate(&eval_ctx, &alias_expr.expression)?;
546
547					// Ensure we always add exactly one
548					// value
549					let eval_len = evaluated.data().len();
550					if table_column.is_some() {
551						// Source-defined column: types match, extend directly
552						if eval_len == 1 {
553							column_data.extend(evaluated.data().clone())?;
554						} else if eval_len == 0 {
555							column_data.push_value(Value::none());
556						} else {
557							let first_value =
558								evaluated.data().iter().next().unwrap_or(Value::none());
559							column_data.push_value(first_value);
560						}
561					} else {
562						// Extra column (e.g., timestamp): cast value to Int16
563						let value = if eval_len > 0 {
564							evaluated.data().iter().next().unwrap_or(Value::none())
565						} else {
566							Value::none()
567						};
568						match &value {
569							Value::None {
570								..
571							} => column_data.push_none(),
572							Value::Int16(_) => column_data.push_value(value),
573							_ => {
574								let temp = ColumnBuffer::from(value.clone());
575								match cast_column_data(
576									&eval_ctx,
577									&temp,
578									Type::Int16,
579									Fragment::none,
580								) {
581									Ok(casted) => {
582										if let Some(v) = casted.iter().next() {
583											column_data.push_value(v);
584										} else {
585											column_data.push_none();
586										}
587									}
588									Err(_) => column_data.push_value(value),
589								}
590							}
591						}
592					}
593				} else {
594					column_data.push_value(Value::none());
595				}
596			}
597
598			// For extra columns, narrow the integer type
599			if table_column.is_none() {
600				let optimal_type = Self::find_optimal_integer_type(&column_data);
601				if optimal_type != Type::Int16 {
602					let eval_ctx = session.with_eval(Columns::empty(), column_data.len());
603					if let Ok(demoted) =
604						cast_column_data(&eval_ctx, &column_data, optimal_type, || {
605							Fragment::none()
606						}) {
607						column_data = demoted;
608					}
609				}
610			}
611
612			columns.push(ColumnWithName::new(
613				column_fragment
614					.map(|f| f.with_text(column_name.text()))
615					.unwrap_or_else(|| column_name.clone()),
616				column_data,
617			));
618		}
619
620		let columns = Columns::new(columns);
621
622		Ok(Some(columns))
623	}
624}