arcon 0.2.1

A runtime for writing streaming applications
Documentation
use crate::{
    data::{ArconElement, ArconNever, ArconType},
    error::*,
    index::{ArconState, EmptyState},
    stream::operator::{Operator, OperatorContext},
    util::ArconFnBounds,
};
use std::marker::PhantomData;

/// An operator that applies a user-defined function to the data of each
/// incoming element, producing exactly one output element per input.
///
/// Type parameters:
/// * `IN`  - type of the incoming element data.
/// * `OUT` - type of the produced element data.
/// * `F`   - the user-defined function; it receives the input value and a
///   mutable reference to the operator state and returns `ArconResult<OUT>`.
/// * `S`   - the operator state handed to `F` on every call.
pub struct Map<IN, OUT, F, S>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(IN, &mut S) -> ArconResult<OUT> + ArconFnBounds,
    S: ArconState,
{
    /// The user-defined mapping function.
    udf: F,
    /// Zero-sized marker that records `IN`, `S` and `OUT`, which are not
    /// otherwise used by any field. No value of those types is stored.
    _marker: PhantomData<fn(IN, S) -> ArconResult<OUT>>,
}

impl<IN, OUT> Map<IN, OUT, fn(IN, &mut EmptyState) -> ArconResult<OUT>, EmptyState>
where
    IN: ArconType,
    OUT: ArconType,
{
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        udf: impl Fn(IN) -> OUT + ArconFnBounds,
    ) -> Map<IN, OUT, impl Fn(IN, &mut EmptyState) -> ArconResult<OUT> + ArconFnBounds, EmptyState>
    {
        let udf = move |input: IN, _: &mut EmptyState| {
            let output = udf(input);
            Ok(output)
        };

        Map {
            udf,
            _marker: Default::default(),
        }
    }
}

impl<IN, OUT, F, S> Map<IN, OUT, F, S>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(IN, &mut S) -> ArconResult<OUT> + ArconFnBounds,
    S: ArconState,
{
    pub fn stateful(udf: F) -> Self {
        Map {
            udf,
            _marker: Default::default(),
        }
    }
}

/// `Operator` implementation: each input element yields one output element
/// that carries the mapped data and the input's timestamp.
impl<IN, OUT, F, S> Operator for Map<IN, OUT, F, S>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(IN, &mut S) -> ArconResult<OUT> + ArconFnBounds,
    S: ArconState,
{
    type IN = IN;
    type OUT = OUT;
    type TimerState = ArconNever;
    type OperatorState = S;
    /// A single-item iterator: one output element per handled input.
    type ElementIterator = std::iter::Once<ArconElement<Self::OUT>>;

    /// Runs the user-defined function on `element.data` with the state
    /// obtained from `ctx.state()`.
    ///
    /// # Errors
    ///
    /// Any error returned by the user-defined function is propagated to the
    /// caller and no output element is produced.
    fn handle_element(
        &mut self,
        element: ArconElement<IN>,
        ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
    ) -> ArconResult<Self::ElementIterator> {
        // Take the timestamp first so it can be attached to the output after
        // the data has been moved into the user-defined function.
        let timestamp = element.timestamp;
        let mapped = (self.udf)(element.data, ctx.state())?;
        let output = ArconElement::with_timestamp(mapped, timestamp);
        Ok(std::iter::once(output))
    }

    // Timeout handling comes from the crate's `ignore_timeout!` macro, which
    // is defined elsewhere.
    crate::ignore_timeout!();
}