manifoldb_query/exec/
operator.rs

1//! Operator trait and base types.
2//!
3//! This module defines the [`Operator`] trait that all execution
4//! operators implement.
5
6use std::sync::Arc;
7
8use crate::error::ParseError;
9
10use super::context::ExecutionContext;
11use super::row::{Row, Schema};
12
13/// Result type for operator operations.
14pub type OperatorResult<T> = Result<T, ParseError>;
15
16/// The state of an operator.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum OperatorState {
19    /// Operator has not been opened yet.
20    Created,
21    /// Operator is open and ready to produce rows.
22    Open,
23    /// Operator has finished producing rows.
24    Finished,
25    /// Operator has been closed.
26    Closed,
27}
28
29impl OperatorState {
30    /// Returns true if the operator is open.
31    #[must_use]
32    pub const fn is_open(self) -> bool {
33        matches!(self, Self::Open)
34    }
35
36    /// Returns true if the operator has finished.
37    #[must_use]
38    pub const fn is_finished(self) -> bool {
39        matches!(self, Self::Finished)
40    }
41
42    /// Returns true if the operator is closed.
43    #[must_use]
44    pub const fn is_closed(self) -> bool {
45        matches!(self, Self::Closed)
46    }
47}
48
49/// The operator trait for pull-based query execution.
50///
51/// Operators are organized in a tree structure matching the physical plan.
52/// Data flows from leaf operators (scans) up through intermediate operators
53/// (filter, project, join) to the root.
54///
55/// # Lifecycle
56///
57/// 1. **Created**: Initial state after construction
58/// 2. **Open**: After `open()` is called; ready to produce rows
59/// 3. **Finished**: After `next()` returns `None`; no more rows
60/// 4. **Closed**: After `close()` is called; resources released
61///
62/// # Thread Safety
63///
64/// The `Send` bound allows operators to be passed between threads,
65/// but operators are not required to be `Sync` - they maintain mutable
66/// internal state.
67pub trait Operator: Send {
68    /// Opens the operator and prepares it to produce rows.
69    ///
70    /// This method recursively opens any child operators.
71    fn open(&mut self, ctx: &ExecutionContext) -> OperatorResult<()>;
72
73    /// Returns the next row, or `None` if there are no more rows.
74    ///
75    /// This method should be called repeatedly until it returns `None`.
76    fn next(&mut self) -> OperatorResult<Option<Row>>;
77
78    /// Closes the operator and releases resources.
79    ///
80    /// This method recursively closes any child operators.
81    fn close(&mut self) -> OperatorResult<()>;
82
83    /// Returns the output schema of this operator.
84    fn schema(&self) -> Arc<Schema>;
85
86    /// Returns the current state of this operator.
87    fn state(&self) -> OperatorState;
88
89    /// Returns the name of this operator type.
90    fn name(&self) -> &'static str;
91}
92
93/// A boxed operator for dynamic dispatch.
94pub type BoxedOperator = Box<dyn Operator>;
95
96/// Base implementation for operators.
97///
98/// This struct provides common functionality that operators can use.
99#[derive(Debug)]
100pub struct OperatorBase {
101    /// The output schema.
102    schema: Arc<Schema>,
103    /// The current state.
104    state: OperatorState,
105    /// Number of rows produced.
106    rows_produced: u64,
107}
108
109impl OperatorBase {
110    /// Creates a new operator base with the given schema.
111    #[must_use]
112    pub fn new(schema: Arc<Schema>) -> Self {
113        Self { schema, state: OperatorState::Created, rows_produced: 0 }
114    }
115
116    /// Returns the schema.
117    #[must_use]
118    pub fn schema(&self) -> Arc<Schema> {
119        Arc::clone(&self.schema)
120    }
121
122    /// Returns the current state.
123    #[must_use]
124    pub const fn state(&self) -> OperatorState {
125        self.state
126    }
127
128    /// Sets the state to open.
129    pub fn set_open(&mut self) {
130        self.state = OperatorState::Open;
131    }
132
133    /// Sets the state to finished.
134    pub fn set_finished(&mut self) {
135        self.state = OperatorState::Finished;
136    }
137
138    /// Sets the state to closed.
139    pub fn set_closed(&mut self) {
140        self.state = OperatorState::Closed;
141    }
142
143    /// Increments the rows produced counter.
144    pub fn inc_rows_produced(&mut self) {
145        self.rows_produced += 1;
146    }
147
148    /// Returns the number of rows produced.
149    #[must_use]
150    pub const fn rows_produced(&self) -> u64 {
151        self.rows_produced
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158
159    #[test]
160    fn operator_state_transitions() {
161        let mut base = OperatorBase::new(Arc::new(Schema::empty()));
162
163        assert_eq!(base.state(), OperatorState::Created);
164
165        base.set_open();
166        assert!(base.state().is_open());
167
168        base.set_finished();
169        assert!(base.state().is_finished());
170
171        base.set_closed();
172        assert!(base.state().is_closed());
173    }
174
175    #[test]
176    fn operator_base_rows() {
177        let mut base = OperatorBase::new(Arc::new(Schema::empty()));
178        assert_eq!(base.rows_produced(), 0);
179
180        base.inc_rows_produced();
181        base.inc_rows_produced();
182        assert_eq!(base.rows_produced(), 2);
183    }
184}