manifoldb_query/exec/operator.rs
1//! Operator trait and base types.
2//!
3//! This module defines the [`Operator`] trait that all execution
4//! operators implement.
5
6use std::sync::Arc;
7
8use crate::error::ParseError;
9
10use super::context::ExecutionContext;
11use super::row::{Row, Schema};
12
13/// Result type returned by the fallible methods of [`Operator`].
///
/// NOTE(review): the error type is [`ParseError`], even though these results
/// come from executing operators rather than parsing — confirm this is the
/// intended error type.
14pub type OperatorResult<T> = Result<T, ParseError>;
15
/// The lifecycle state of an operator.
///
/// The variants are listed in the order an operator is expected to pass
/// through them: `Created`, `Open`, `Finished`, `Closed`. The enum itself does
/// not enforce any transition order; it is a plain `Copy` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperatorState {
    /// Operator has not been opened yet.
    Created,
    /// Operator is open and ready to produce rows.
    Open,
    /// Operator has finished producing rows.
    Finished,
    /// Operator has been closed.
    Closed,
}

impl OperatorState {
    /// Returns `true` if the operator has been constructed but not yet opened.
    ///
    /// Completes the set of per-variant predicates so callers never have to
    /// fall back to `state == OperatorState::Created` for this one variant.
    #[must_use]
    pub const fn is_created(self) -> bool {
        matches!(self, Self::Created)
    }

    /// Returns `true` if the operator is open.
    #[must_use]
    pub const fn is_open(self) -> bool {
        matches!(self, Self::Open)
    }

    /// Returns `true` if the operator has finished producing rows.
    #[must_use]
    pub const fn is_finished(self) -> bool {
        matches!(self, Self::Finished)
    }

    /// Returns `true` if the operator has been closed.
    #[must_use]
    pub const fn is_closed(self) -> bool {
        matches!(self, Self::Closed)
    }
}
48
49/// Common interface implemented by every operator in the pull-based executor.
50///
51/// Operators are organized in a tree structure matching the physical plan.
52/// Rows flow from leaf operators (scans) up through intermediate operators
53/// (filter, project, join) to the root, one `next()` call at a time.
54///
55/// # Lifecycle
56///
57/// 1. **Created**: initial state after construction
58/// 2. **Open**: after `open()` is called; ready to produce rows
59/// 3. **Finished**: after `next()` returns `None`; no more rows
60/// 4. **Closed**: after `close()` is called; resources released
61///
62/// # Thread Safety
63///
64/// The `Send` supertrait allows operators to be moved between threads.
65/// Operators are not required to be `Sync`: they maintain mutable internal
66/// state, and every state-changing method takes `&mut self`.
67pub trait Operator: Send {
68 /// Opens the operator and prepares it to produce rows.
69 ///
70 /// Implementations are expected to open their child operators as well.
 ///
 /// # Errors
 ///
 /// Returns an `Err` if the operator cannot be opened; the concrete failure
 /// conditions are defined by each implementation.
71 fn open(&mut self, ctx: &ExecutionContext) -> OperatorResult<()>;
72
73 /// Returns the next row, or `Ok(None)` once there are no more rows.
74 ///
75 /// Callers should keep calling this method until it returns `Ok(None)`.
 ///
 /// # Errors
 ///
 /// Returns an `Err` if producing the row fails; the concrete failure
 /// conditions are defined by each implementation.
76 fn next(&mut self) -> OperatorResult<Option<Row>>;
77
78 /// Closes the operator and releases its resources.
79 ///
80 /// Implementations are expected to close their child operators as well.
 ///
 /// # Errors
 ///
 /// Returns an `Err` if closing fails; the concrete failure conditions are
 /// defined by each implementation.
81 fn close(&mut self) -> OperatorResult<()>;
82
83 /// Returns a shared handle to the schema of the rows this operator outputs.
84 fn schema(&self) -> Arc<Schema>;
85
86 /// Returns the current lifecycle state of this operator.
87 fn state(&self) -> OperatorState;
88
89 /// Returns the static name of this operator type.
90 fn name(&self) -> &'static str;
91}
92
93/// An owned, heap-allocated [`Operator`] trait object.
///
/// Lets code hold an operator without naming its concrete type; every method
/// call on it goes through dynamic dispatch.
94pub type BoxedOperator = Box<dyn Operator>;
95
96/// Shared bookkeeping that operator implementations can embed.
97///
98/// Bundles the output schema, the lifecycle state, and a produced-row counter.
/// All fields are private; they are read and updated through the methods on
/// this type.
99#[derive(Debug)]
100pub struct OperatorBase {
101 /// The output schema, held behind an `Arc` so handles can be shared.
102 schema: Arc<Schema>,
103 /// The current lifecycle state; `OperatorState::Created` after `new`.
104 state: OperatorState,
105 /// Number of rows produced so far; zero after `new`.
106 rows_produced: u64,
107}
108
109impl OperatorBase {
110 /// Creates a new operator base with the given schema.
111 #[must_use]
112 pub fn new(schema: Arc<Schema>) -> Self {
113 Self { schema, state: OperatorState::Created, rows_produced: 0 }
114 }
115
116 /// Returns the schema.
117 #[must_use]
118 pub fn schema(&self) -> Arc<Schema> {
119 Arc::clone(&self.schema)
120 }
121
122 /// Returns the current state.
123 #[must_use]
124 pub const fn state(&self) -> OperatorState {
125 self.state
126 }
127
128 /// Sets the state to open.
129 pub fn set_open(&mut self) {
130 self.state = OperatorState::Open;
131 }
132
133 /// Sets the state to finished.
134 pub fn set_finished(&mut self) {
135 self.state = OperatorState::Finished;
136 }
137
138 /// Sets the state to closed.
139 pub fn set_closed(&mut self) {
140 self.state = OperatorState::Closed;
141 }
142
143 /// Increments the rows produced counter.
144 pub fn inc_rows_produced(&mut self) {
145 self.rows_produced += 1;
146 }
147
148 /// Returns the number of rows produced.
149 #[must_use]
150 pub const fn rows_produced(&self) -> u64 {
151 self.rows_produced
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `OperatorBase` over an empty schema for use as a fixture.
    fn empty_base() -> OperatorBase {
        let schema = Arc::new(Schema::empty());
        OperatorBase::new(schema)
    }

    /// Walks a base through open, finished, and closed, checking each state.
    #[test]
    fn operator_state_transitions() {
        let mut base = empty_base();
        assert_eq!(base.state(), OperatorState::Created);

        base.set_open();
        let after_open = base.state();
        assert!(after_open.is_open());

        base.set_finished();
        let after_finish = base.state();
        assert!(after_finish.is_finished());

        base.set_closed();
        let after_close = base.state();
        assert!(after_close.is_closed());
    }

    /// The row counter starts at zero and grows by one per increment.
    #[test]
    fn operator_base_rows() {
        let mut base = empty_base();
        assert_eq!(base.rows_produced(), 0);

        for _ in 0..2 {
            base.inc_rows_produced();
        }
        assert_eq!(base.rows_produced(), 2);
    }
}