use std::sync::Arc;
use crate::error::ParseError;
use super::context::ExecutionContext;
use super::row::{Row, Schema};
/// Result alias used by the operator API in this module; the error side is always [`ParseError`].
pub type OperatorResult<T> = Result<T, ParseError>;
/// Lifecycle phase reported by an operator.
///
/// The type is a plain `Copy` value, so it can be compared and passed around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperatorState {
    /// The operator has been constructed but not opened yet.
    Created,
    /// The operator has been opened.
    Open,
    /// The operator has been marked as finished.
    Finished,
    /// The operator has been closed.
    Closed,
}

impl OperatorState {
    /// Returns `true` only when the state is [`OperatorState::Open`].
    #[must_use]
    pub const fn is_open(self) -> bool {
        // Exhaustive on purpose: adding a variant forces this predicate to be revisited.
        match self {
            Self::Open => true,
            Self::Created | Self::Finished | Self::Closed => false,
        }
    }

    /// Returns `true` only when the state is [`OperatorState::Finished`].
    #[must_use]
    pub const fn is_finished(self) -> bool {
        match self {
            Self::Finished => true,
            Self::Created | Self::Open | Self::Closed => false,
        }
    }

    /// Returns `true` only when the state is [`OperatorState::Closed`].
    #[must_use]
    pub const fn is_closed(self) -> bool {
        match self {
            Self::Closed => true,
            Self::Created | Self::Open | Self::Finished => false,
        }
    }
}
/// Interface for row-producing operators.
///
/// The `Send` supertrait requires every implementor to be transferable across threads.
/// NOTE(review): the method names suggest an open → next* → close call sequence;
/// the trait itself does not enforce any ordering — confirm against implementors.
pub trait Operator: Send {
/// Opens the operator, giving it access to the execution context.
///
/// # Errors
/// Returns a `ParseError` (via [`OperatorResult`]) if the implementor fails to open.
fn open(&mut self, ctx: &ExecutionContext) -> OperatorResult<()>;
/// Produces the next row, if any.
///
/// NOTE(review): `Ok(None)` presumably signals that the operator is exhausted — confirm.
///
/// # Errors
/// Returns a `ParseError` (via [`OperatorResult`]) if the implementor fails to produce a row.
fn next(&mut self) -> OperatorResult<Option<Row>>;
/// Closes the operator.
///
/// # Errors
/// Returns a `ParseError` (via [`OperatorResult`]) if the implementor fails to close.
fn close(&mut self) -> OperatorResult<()>;
/// Returns a shared handle to the operator's [`Schema`].
fn schema(&self) -> Arc<Schema>;
/// Returns the operator's current [`OperatorState`].
fn state(&self) -> OperatorState;
/// Returns a static string naming the operator.
fn name(&self) -> &'static str;
}
/// Owned, dynamically dispatched [`Operator`] trait object.
pub type BoxedOperator = Box<dyn Operator>;
/// Bookkeeping bundle holding a schema handle, a lifecycle state and a produced-row counter.
///
/// NOTE(review): presumably embedded by `Operator` implementors to share this
/// boilerplate — confirm against the concrete operators.
#[derive(Debug)]
pub struct OperatorBase {
/// Shared schema handle; `schema()` hands out clones of this `Arc`.
schema: Arc<Schema>,
/// Current lifecycle state; starts as `OperatorState::Created` in `new`.
state: OperatorState,
/// Number of times `inc_rows_produced` has been called; starts at 0.
rows_produced: u64,
}
impl OperatorBase {
    /// Creates a base in the [`OperatorState::Created`] state with a zeroed row counter.
    #[must_use]
    pub fn new(schema: Arc<Schema>) -> Self {
        let state = OperatorState::Created;
        let rows_produced = 0;
        Self {
            schema,
            state,
            rows_produced,
        }
    }

    /// Returns a new `Arc` handle to the schema (reference-count bump, no deep copy).
    #[must_use]
    pub fn schema(&self) -> Arc<Schema> {
        let shared = &self.schema;
        Arc::clone(shared)
    }

    /// Returns the current lifecycle state.
    #[must_use]
    pub const fn state(&self) -> OperatorState {
        self.state
    }

    /// Records `next` as the current state; no transition validation is performed.
    fn transition_to(&mut self, next: OperatorState) {
        self.state = next;
    }

    /// Marks the operator as [`OperatorState::Open`].
    pub fn set_open(&mut self) {
        self.transition_to(OperatorState::Open);
    }

    /// Marks the operator as [`OperatorState::Finished`].
    pub fn set_finished(&mut self) {
        self.transition_to(OperatorState::Finished);
    }

    /// Marks the operator as [`OperatorState::Closed`].
    pub fn set_closed(&mut self) {
        self.transition_to(OperatorState::Closed);
    }

    /// Adds one to the produced-row counter.
    pub fn inc_rows_produced(&mut self) {
        self.rows_produced += 1;
    }

    /// Returns how many rows have been counted so far.
    #[must_use]
    pub const fn rows_produced(&self) -> u64 {
        self.rows_produced
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `OperatorBase` over an empty schema for the tests below.
    fn fresh_base() -> OperatorBase {
        OperatorBase::new(Arc::new(Schema::empty()))
    }

    /// Walks the base through each setter and checks the reported state after every step.
    #[test]
    fn operator_state_transitions() {
        let mut op = fresh_base();
        assert_eq!(op.state(), OperatorState::Created);

        op.set_open();
        assert!(op.state().is_open());

        op.set_finished();
        assert!(op.state().is_finished());

        op.set_closed();
        assert!(op.state().is_closed());
    }

    /// The row counter starts at zero and grows by one per increment call.
    #[test]
    fn operator_base_rows() {
        let mut op = fresh_base();
        assert_eq!(op.rows_produced(), 0);

        for _ in 0..2 {
            op.inc_rows_produced();
        }
        assert_eq!(op.rows_produced(), 2);
    }
}