reifydb_engine/vm/volcano/
apply_transform.rs1use std::sync::Arc;
5
6use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
7use reifydb_extension::transform::{Transform, context::TransformContext};
8use reifydb_transaction::transaction::Transaction;
9use tracing::instrument;
10
11use crate::{
12 Result,
13 vm::volcano::query::{QueryContext, QueryNode},
14};
15
16pub(crate) struct ApplyTransformNode {
17 input: Box<dyn QueryNode>,
18 transform: Box<dyn Transform>,
19 context: Option<Arc<QueryContext>>,
20}
21
22impl ApplyTransformNode {
23 pub fn new(input: Box<dyn QueryNode>, transform: Box<dyn Transform>) -> Self {
24 Self {
25 input,
26 transform,
27 context: None,
28 }
29 }
30}
31
32impl QueryNode for ApplyTransformNode {
33 #[instrument(level = "trace", skip_all, name = "volcano::apply_transform::initialize")]
34 fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
35 self.context = Some(Arc::new(ctx.clone()));
36 self.input.initialize(rx, ctx)?;
37 Ok(())
38 }
39
40 #[instrument(level = "trace", skip_all, name = "volcano::apply_transform::next")]
41 fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
42 debug_assert!(self.context.is_some(), "ApplyTransformNode::next() called before initialize()");
43 let stored_ctx = self.context.as_ref().unwrap();
44
45 if let Some(columns) = self.input.next(rx, ctx)? {
46 let transform_ctx = TransformContext {
47 routines: &ctx.services.routines,
48 runtime_context: &stored_ctx.services.runtime_context,
49 params: &stored_ctx.params,
50 };
51 let result = self.transform.apply(&transform_ctx, columns)?;
52 Ok(Some(result))
53 } else {
54 Ok(None)
55 }
56 }
57
58 fn headers(&self) -> Option<ColumnHeaders> {
59 self.input.headers()
60 }
61}