#![allow(async_fn_in_trait)]
use feldera_storage::{FileCommitter, StoragePath};
use crate::Error;
use crate::{
circuit::{
OwnershipPreference, Scope,
metadata::{OperatorLocation, OperatorMeta},
},
trace::cursor::Position,
};
use std::borrow::Cow;
use std::sync::Arc;
use super::GlobalNodeId;
/// Marker trait for values that are cloneable and contain no non-`'static`
/// borrows.
///
/// The trait has no methods of its own; it only bundles the `Clone + 'static`
/// bounds under a single name.
pub trait Data: Clone + 'static {}

/// Blanket implementation: every `Clone + 'static` type is `Data`, so the
/// trait never needs to be implemented by hand.
impl<T> Data for T where T: Clone + 'static {}
/// Base trait shared by every operator kind declared in this file.
///
/// Only [`name`](Self::name) and [`fixedpoint`](Self::fixedpoint) are
/// required; every other method has a default body here, so implementors
/// override only the hooks they need. The arity-specific traits
/// (`SourceOperator`, `UnaryOperator`, `BinaryOperator`, ...) all use
/// `Operator` as their supertrait.
pub trait Operator: 'static {
/// Returns the operator's name, either as a borrowed `'static` string or as
/// an owned `String`.
fn name(&self) -> Cow<'static, str>;
/// Returns the operator's [`OperatorLocation`]. The default is `None`.
///
/// NOTE(review): presumably the source location where the operator was
/// created -- confirm against the definition of `OperatorLocation`.
fn location(&self) -> OperatorLocation {
None
}
/// Hook that receives the operator's [`GlobalNodeId`]. The default does
/// nothing.
///
/// NOTE(review): presumably invoked once after the operator has been added
/// to a circuit -- confirm with the caller.
fn init(&mut self, _global_id: &GlobalNodeId) {}
/// Hook that lets the operator write into the supplied [`OperatorMeta`].
/// The default leaves `_meta` untouched.
fn metadata(&self, _meta: &mut OperatorMeta) {}
/// Hook taking a [`Scope`]. The default does nothing.
///
/// NOTE(review): presumably signals the start of a clock epoch for the
/// given scope -- confirm with the scheduler.
fn clock_start(&mut self, _scope: Scope) {}
/// Hook taking a [`Scope`]. The default does nothing.
///
/// NOTE(review): presumably signals the end of a clock epoch for the given
/// scope -- confirm with the scheduler.
fn clock_end(&mut self, _scope: Scope) {}
/// Reports whether the operator is asynchronous. The default is `false`.
///
/// NOTE(review): presumably only async operators are expected to override
/// [`ready`](Self::ready) and
/// [`register_ready_callback`](Self::register_ready_callback) -- confirm.
fn is_async(&self) -> bool {
false
}
/// Reports whether the operator is an input operator. The default is
/// `false`.
fn is_input(&self) -> bool {
false
}
/// Reports whether the operator is ready. The default is `true`.
fn ready(&self) -> bool {
true
}
/// Accepts a callback that is `Fn() + Send + Sync + 'static`. The default
/// discards the callback without calling or storing it.
///
/// NOTE(review): presumably implementors invoke `_cb` when the operator
/// becomes ready -- confirm.
fn register_ready_callback<F>(&mut self, _cb: F)
where
F: Fn() + Send + Sync + 'static,
{
}
/// Required method: returns a `bool` for the given [`Scope`].
///
/// NOTE(review): presumably `true` means the operator has reached a fixed
/// point with respect to `scope` -- confirm.
fn fixedpoint(&self, scope: Scope) -> bool;
/// Checkpoint hook. The default ignores all arguments and returns
/// `Ok(())`.
///
/// NOTE(review): presumably implementors persist their state under `base`,
/// keyed by `persistent_id`, and push the written files onto `files` --
/// confirm against an implementor.
// `unused_variables` is allowed so the parameters keep readable names even
// though the default body does not use them.
#[allow(unused_variables)]
fn checkpoint(
&mut self,
base: &StoragePath,
persistent_id: Option<&str>,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), Error> {
Ok(())
}
/// Restore hook. The default ignores all arguments and returns `Ok(())`.
///
/// NOTE(review): presumably the inverse of
/// [`checkpoint`](Self::checkpoint) -- confirm against an implementor.
// `unused_variables` is allowed so the parameters keep readable names even
// though the default body does not use them.
#[allow(unused_variables)]
fn restore(&mut self, base: &StoragePath, persistent_id: Option<&str>) -> Result<(), Error> {
Ok(())
}
/// Hook for clearing operator state. The default does nothing and returns
/// `Ok(())`.
fn clear_state(&mut self) -> Result<(), Error> {
Ok(())
}
/// Starts a replay.
///
/// # Panics
///
/// The default implementation always panics; operators that support
/// replay must override it.
fn start_replay(&mut self) -> Result<(), Error> {
panic!("start_replay() is not implemented for this operator")
}
/// Reports whether a replay has completed.
///
/// # Panics
///
/// The default implementation always panics; operators that support
/// replay must override it.
fn is_replay_complete(&self) -> bool {
panic!("is_replay_complete() is not implemented for this operator")
}
/// Ends a replay.
///
/// # Panics
///
/// The default implementation always panics; operators that support
/// replay must override it.
fn end_replay(&mut self) -> Result<(), Error> {
panic!("end_replay() is not implemented for this operator")
}
/// Transaction-start hook. The default does nothing.
fn start_transaction(&mut self) {}
/// Flush hook. The default does nothing.
fn flush(&mut self) {}
/// Reports whether a flush has completed. The default is `true`, which
/// matches the no-op default of [`flush`](Self::flush).
fn is_flush_complete(&self) -> bool {
true
}
/// Returns an optional [`Position`] describing flush progress. The default
/// is `None`.
fn flush_progress(&self) -> Option<Position> {
None
}
}
/// An operator that takes no inputs and yields a value of type `O` from each
/// call to `eval`.
pub trait SourceOperator<O>: Operator {
/// Asynchronously produces the next output value.
async fn eval(&mut self) -> O;
}
/// An operator that consumes a single input of type `I` and returns no value.
pub trait SinkOperator<I>: Operator {
    /// Asynchronously consumes a borrowed input.
    async fn eval(&mut self, input: &I);

    /// Asynchronously consumes an input passed by value.
    ///
    /// The default borrows `input` and forwards to [`eval`](Self::eval);
    /// override it when owning the value allows a cheaper implementation.
    async fn eval_owned(&mut self, input: I) {
        let borrowed = &input;
        self.eval(borrowed).await
    }

    /// Returns the operator's ownership preference for its input. The default
    /// is [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::INDIFFERENT
    }
}
/// An operator that consumes two inputs, of types `I1` and `I2`, and returns
/// no value.
pub trait BinarySinkOperator<I1, I2>: Operator
where
    I1: Clone,
    I2: Clone,
{
    /// Asynchronously consumes both inputs.
    ///
    /// Each input is a [`Cow`], so it may arrive either borrowed or owned.
    async fn eval<'a>(&mut self, lhs: Cow<'a, I1>, rhs: Cow<'a, I2>);

    /// Returns the ownership preference for each input, in `(lhs, rhs)`
    /// order. Both default to [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
        let lhs_preference = OwnershipPreference::INDIFFERENT;
        let rhs_preference = OwnershipPreference::INDIFFERENT;
        (lhs_preference, rhs_preference)
    }
}
/// An operator that consumes three inputs, of types `I1`, `I2` and `I3`, and
/// returns no value.
pub trait TernarySinkOperator<I1, I2, I3>: Operator
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
{
    /// Asynchronously consumes all three inputs.
    ///
    /// Each input is a [`Cow`], so it may arrive either borrowed or owned.
    async fn eval<'a>(&mut self, input1: Cow<'a, I1>, input2: Cow<'a, I2>, input3: Cow<'a, I3>);

    /// Returns the ownership preference for each input, in argument order.
    /// All three default to [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(
        &self,
    ) -> (
        OwnershipPreference,
        OwnershipPreference,
        OwnershipPreference,
    ) {
        let first = OwnershipPreference::INDIFFERENT;
        let second = OwnershipPreference::INDIFFERENT;
        let third = OwnershipPreference::INDIFFERENT;
        (first, second, third)
    }
}
/// An operator that maps a single input of type `I` to an output of type `O`.
pub trait UnaryOperator<I, O>: Operator {
    /// Asynchronously computes the output from a borrowed input.
    async fn eval(&mut self, input: &I) -> O;

    /// Asynchronously computes the output from an input passed by value.
    ///
    /// The default borrows `input` and forwards to [`eval`](Self::eval);
    /// override it when owning the value allows a cheaper implementation.
    async fn eval_owned(&mut self, input: I) -> O {
        let borrowed = &input;
        self.eval(borrowed).await
    }

    /// Returns the operator's ownership preference for its input. The default
    /// is [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::INDIFFERENT
    }
}
/// An operator that combines two inputs, of types `I1` and `I2`, into an
/// output of type `O`.
///
/// Only the all-borrowed [`eval`](Self::eval) is required. The three
/// owned/mixed variants default to borrowing whatever they own and delegating
/// to it.
pub trait BinaryOperator<I1, I2, O>: Operator {
    /// Asynchronously computes the output from two borrowed inputs.
    async fn eval(&mut self, lhs: &I1, rhs: &I2) -> O;

    /// Variant of [`eval`](Self::eval) that takes both inputs by value.
    async fn eval_owned(&mut self, lhs: I1, rhs: I2) -> O {
        let (left, right) = (&lhs, &rhs);
        self.eval(left, right).await
    }

    /// Variant of [`eval`](Self::eval) that takes `lhs` by value and `rhs` by
    /// reference.
    async fn eval_owned_and_ref(&mut self, lhs: I1, rhs: &I2) -> O {
        let left = &lhs;
        self.eval(left, rhs).await
    }

    /// Variant of [`eval`](Self::eval) that takes `lhs` by reference and `rhs`
    /// by value.
    async fn eval_ref_and_owned(&mut self, lhs: &I1, rhs: I2) -> O {
        let right = &rhs;
        self.eval(lhs, right).await
    }

    /// Returns the ownership preference for each input, in `(lhs, rhs)`
    /// order. Both default to [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
        let lhs_preference = OwnershipPreference::INDIFFERENT;
        let rhs_preference = OwnershipPreference::INDIFFERENT;
        (lhs_preference, rhs_preference)
    }
}
/// An operator that combines three inputs, of types `I1`, `I2` and `I3`, into
/// an output of type `O`.
pub trait TernaryOperator<I1, I2, I3, O>: Operator
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
{
    /// Asynchronously computes the output from the three inputs.
    ///
    /// Each input is a [`Cow`], so it may arrive either borrowed or owned.
    async fn eval(&mut self, i1: Cow<'_, I1>, i2: Cow<'_, I2>, i3: Cow<'_, I3>) -> O;

    /// Returns the ownership preference for each input, in argument order.
    /// All three default to [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(
        &self,
    ) -> (
        OwnershipPreference,
        OwnershipPreference,
        OwnershipPreference,
    ) {
        let first = OwnershipPreference::INDIFFERENT;
        let second = OwnershipPreference::INDIFFERENT;
        let third = OwnershipPreference::INDIFFERENT;
        (first, second, third)
    }
}
/// An operator that combines four inputs, of types `I1` through `I4`, into an
/// output of type `O`.
pub trait QuaternaryOperator<I1, I2, I3, I4, O>: Operator
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
    I4: Clone,
{
    /// Asynchronously computes the output from the four inputs.
    ///
    /// Each input is a [`Cow`], so it may arrive either borrowed or owned.
    async fn eval(
        &mut self,
        i1: Cow<'_, I1>,
        i2: Cow<'_, I2>,
        i3: Cow<'_, I3>,
        i4: Cow<'_, I4>,
    ) -> O;

    /// Returns the ownership preference for each input, in argument order.
    /// All four default to [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(
        &self,
    ) -> (
        OwnershipPreference,
        OwnershipPreference,
        OwnershipPreference,
        OwnershipPreference,
    ) {
        let first = OwnershipPreference::INDIFFERENT;
        let second = OwnershipPreference::INDIFFERENT;
        let third = OwnershipPreference::INDIFFERENT;
        let fourth = OwnershipPreference::INDIFFERENT;
        (first, second, third, fourth)
    }
}
/// An operator that takes any number of inputs, all of the same type `I`, and
/// produces an output of type `O`.
pub trait NaryOperator<I, O>: Operator
where
I: Clone + 'static,
{
/// Asynchronously computes the output from an iterator over the inputs.
///
/// Each item is a [`Cow`], so an individual input may arrive either
/// borrowed or owned. The item lifetime `'a` is the same as the lifetime of
/// the `&'a mut self` borrow.
async fn eval<'a, Iter>(&'a mut self, inputs: Iter) -> O
where
Iter: Iterator<Item = Cow<'a, I>>;
/// Returns a single ownership preference for the operator's inputs. The
/// default is [`OwnershipPreference::INDIFFERENT`].
///
/// NOTE(review): presumably the one value applies to every input --
/// confirm with the caller.
fn input_preference(&self) -> OwnershipPreference {
OwnershipPreference::INDIFFERENT
}
}
/// An operator whose output of type `O` is obtained through dedicated getter
/// methods rather than as the return value of an `eval` call.
///
/// NOTE(review): presumably "strict" means the output for a step is available
/// before the input for that step has been supplied (as in a delay) --
/// confirm against the implementors.
pub trait StrictOperator<O>: Operator {
/// Returns the operator's output.
fn get_output(&mut self) -> O;
/// Returns the operator's final output.
///
/// NOTE(review): presumably called once at the end of a clock epoch in
/// place of [`get_output`](Self::get_output) -- confirm with the scheduler.
fn get_final_output(&mut self) -> O;
}
/// A [`StrictOperator`] that additionally accepts a single input of type `I`.
///
/// The input is fed in through `eval_strict`/`eval_strict_owned`, which return
/// no value; the output is read through the `StrictOperator` getters.
pub trait StrictUnaryOperator<I, O>: StrictOperator<O> {
    /// Asynchronously feeds a borrowed input to the operator.
    async fn eval_strict(&mut self, input: &I);

    /// Asynchronously feeds an input passed by value to the operator.
    ///
    /// The default borrows `input` and forwards to
    /// [`eval_strict`](Self::eval_strict).
    async fn eval_strict_owned(&mut self, input: I) {
        let borrowed = &input;
        self.eval_strict(borrowed).await
    }

    /// Returns the operator's ownership preference for its input. The default
    /// is [`OwnershipPreference::INDIFFERENT`].
    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::INDIFFERENT
    }
}
/// An operator that receives a value of type `I` through `import` or
/// `import_owned` and separately produces outputs of type `O` through `eval`.
///
/// NOTE(review): presumably used to bring a value from a parent circuit into
/// a nested one, with `import` called once per nested clock epoch and `eval`
/// called on every step -- confirm against the scheduler.
pub trait ImportOperator<I, O>: Operator {
/// Supplies the imported value by reference.
fn import(&mut self, val: &I);
/// Supplies the imported value by ownership.
fn import_owned(&mut self, val: I);
/// Asynchronously produces the next output value; takes no input argument.
async fn eval(&mut self) -> O;
/// Returns the operator's ownership preference for the imported value. The
/// default is [`OwnershipPreference::INDIFFERENT`].
fn input_preference(&self) -> OwnershipPreference {
OwnershipPreference::INDIFFERENT
}
}