Skip to main content

reifydb_engine/vm/volcano/
apply_transform.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
7use reifydb_extension::transform::{Transform, context::TransformContext};
8use reifydb_transaction::transaction::Transaction;
9use tracing::instrument;
10
11use crate::{
12	Result,
13	vm::volcano::query::{QueryContext, QueryNode},
14};
15
16pub(crate) struct ApplyTransformNode {
17	input: Box<dyn QueryNode>,
18	transform: Box<dyn Transform>,
19	context: Option<Arc<QueryContext>>,
20}
21
22impl ApplyTransformNode {
23	pub fn new(input: Box<dyn QueryNode>, transform: Box<dyn Transform>) -> Self {
24		Self {
25			input,
26			transform,
27			context: None,
28		}
29	}
30}
31
32impl QueryNode for ApplyTransformNode {
33	#[instrument(level = "trace", skip_all, name = "volcano::apply_transform::initialize")]
34	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
35		self.context = Some(Arc::new(ctx.clone()));
36		self.input.initialize(rx, ctx)?;
37		Ok(())
38	}
39
40	#[instrument(level = "trace", skip_all, name = "volcano::apply_transform::next")]
41	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
42		debug_assert!(self.context.is_some(), "ApplyTransformNode::next() called before initialize()");
43		let stored_ctx = self.context.as_ref().unwrap();
44
45		if let Some(columns) = self.input.next(rx, ctx)? {
46			let transform_ctx = TransformContext {
47				routines: &ctx.services.routines,
48				runtime_context: &stored_ctx.services.runtime_context,
49				params: &stored_ctx.params,
50			};
51			let result = self.transform.apply(&transform_ctx, columns)?;
52			Ok(Some(result))
53		} else {
54			Ok(None)
55		}
56	}
57
58	fn headers(&self) -> Option<ColumnHeaders> {
59		self.input.headers()
60	}
61}