use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::operator::{
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::sync::Arc;
use crate::sync::Mutex;
use crate::types::IOResult;
use crate::Result;
/// Source operator of an incremental computation.
///
/// Its `IncrementalOperator` implementation hands the left input delta
/// through unchanged and requires the right input delta to be empty.
#[derive(Debug)]
pub struct InputOperator {
// `name` is not read by any code in this part of the file, hence the lint
// allowance; it is still included in the derived `Debug` output.
#[allow(dead_code)]
name: String,
}
impl InputOperator {
pub fn new(name: String) -> Self {
Self { name }
}
}
impl IncrementalOperator for InputOperator {
fn eval(
&mut self,
state: &mut EvalState,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
match state {
EvalState::Init { deltas } => {
assert!(
deltas.right.is_empty(),
"InputOperator expects right_delta to be empty"
);
let output = std::mem::take(&mut deltas.left);
*state = EvalState::Done;
Ok(IOResult::Done(output))
}
_ => unreachable!(
"InputOperator doesn't execute the state machine. Should be in Init state"
),
}
}
fn commit(
&mut self,
deltas: DeltaPair,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
assert!(
deltas.right.is_empty(),
"InputOperator expects right delta to be empty in commit"
);
Ok(IOResult::Done(deltas.left))
}
fn set_tracker(&mut self, _tracker: Arc<Mutex<ComputationTracker>>) {
}
}