Skip to main content

reifydb_engine/vm/volcano/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	interface::resolved::ResolvedShape,
8	value::{
9		batch::lazy::LazyBatch,
10		column::{columns::Columns, headers::ColumnHeaders},
11	},
12};
13use reifydb_transaction::transaction::Transaction;
14use reifydb_type::{params::Params, value::identity::IdentityId};
15
16use crate::{
17	Result,
18	vm::{services::Services, stack::SymbolTable},
19};
20
21/// Unified trait for query execution nodes following the volcano iterator pattern
22pub trait QueryNode: Send + Sync {
23	/// Initialize the operator with execution context
24	/// Called once before iteration begins
25	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()>;
26
27	/// Get the next batch of results (volcano iterator pattern)
28	/// Returns None when exhausted
29	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>>;
30
31	/// Get the next batch as a LazyBatch for deferred materialization
32	/// Returns None if this node doesn't support lazy evaluation or is exhausted
33	/// Default implementation returns None (falls back to materialized evaluation)
34	fn next_lazy<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<LazyBatch>> {
35		Ok(None)
36	}
37
38	/// Get the headers of columns this node produces
39	fn headers(&self) -> Option<ColumnHeaders>;
40
41	/// Hint the maximum number of rows this scan needs to produce.
42	/// Scan operators override this to cap their batch size.
43	/// Non-scan operators ignore it (default no-op).
44	fn set_scan_limit(&mut self, _limit: usize) {}
45}
46
47#[derive(Clone)]
48pub struct QueryContext {
49	pub services: Arc<Services>,
50	pub source: Option<ResolvedShape>,
51	pub batch_size: u64,
52	pub params: Params,
53	pub symbols: SymbolTable,
54	pub identity: IdentityId,
55}
56
57impl QueryNode for Box<dyn QueryNode> {
58	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
59		(**self).initialize(rx, ctx)
60	}
61
62	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
63		(**self).next(rx, ctx)
64	}
65
66	fn next_lazy<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<LazyBatch>> {
67		(**self).next_lazy(rx, ctx)
68	}
69
70	fn headers(&self) -> Option<ColumnHeaders> {
71		(**self).headers()
72	}
73
74	fn set_scan_limit(&mut self, limit: usize) {
75		(**self).set_scan_limit(limit)
76	}
77}