arcon 0.2.1

A runtime for writing streaming applications
Documentation
use super::super::WindowIndex;
use crate::data::ArconType;
use crate::error::ArconResult;
use crate::stream::operator::window::WindowContext;
use crate::util::ArconFnBounds;
use crate::{index::IndexOps, table::ImmutableTable};
use arcon_state::{backend::handles::ActiveHandle, Backend, Handle, VecState};
use std::sync::Arc;

pub struct AppenderWindow<IN, OUT, F, B>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(&[IN]) -> OUT + ArconFnBounds,
    B: Backend,
{
    handle: ActiveHandle<B, VecState<IN>, u64, u64>,
    materializer: F,
}
impl<IN, OUT, F, B> AppenderWindow<IN, OUT, F, B>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(&[IN]) -> OUT + ArconFnBounds,
    B: Backend,
{
    pub fn new(backend: Arc<B>, materializer: F) -> Self {
        let mut handle = Handle::vec("window_handle")
            .with_item_key(0)
            .with_namespace(0);

        backend.register_vec_handle(&mut handle);

        let handle = handle.activate(backend);

        Self {
            handle,
            materializer,
        }
    }
}

impl<IN, OUT, F, B> WindowIndex for AppenderWindow<IN, OUT, F, B>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(&[IN]) -> OUT + ArconFnBounds,
    B: Backend,
{
    type IN = IN;
    type OUT = OUT;

    fn on_element(&mut self, element: Self::IN, ctx: WindowContext) -> ArconResult<()> {
        self.handle.set_item_key(ctx.key);
        self.handle.set_namespace(ctx.index);

        self.handle.append(element)?;
        Ok(())
    }

    fn result(&mut self, ctx: WindowContext) -> ArconResult<Self::OUT> {
        self.handle.set_item_key(ctx.key);
        self.handle.set_namespace(ctx.index);

        let buf = self.handle.get()?;
        Ok((self.materializer)(&buf))
    }

    fn clear(&mut self, ctx: WindowContext) -> ArconResult<()> {
        self.handle.set_item_key(ctx.key);
        self.handle.set_namespace(ctx.index);

        self.handle.clear()?;
        Ok(())
    }
}
impl<IN, OUT, F, B> IndexOps for AppenderWindow<IN, OUT, F, B>
where
    IN: ArconType,
    OUT: ArconType,
    F: Fn(&[IN]) -> OUT + ArconFnBounds,
    B: Backend,
{
    fn persist(&mut self) -> ArconResult<()> {
        Ok(())
    }
    fn set_key(&mut self, _: u64) {}
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }
}