arcon 0.2.1

A runtime for writing streaming applications
Documentation
use crate::{
    error::ArconResult,
    index::{AppenderIndex, IndexOps},
    table::ImmutableTable,
};
use arcon_state::{
    backend::{
        handles::{ActiveHandle, Handle},
        Backend, VecState,
    },
    data::Value,
    error::*,
};
use std::sync::Arc;

#[derive(Debug)]
pub struct EagerAppender<V, B>
where
    V: Value,
    B: Backend,
{
    /// A handle to the VecState
    handle: ActiveHandle<B, VecState<V>, u64>,
}

impl<V, B> EagerAppender<V, B>
where
    V: Value,
    B: Backend,
{
    /// Creates an EagerAppender
    pub fn new(id: impl Into<String>, backend: Arc<B>) -> Self {
        let mut handle = Handle::vec(id.into()).with_item_key(0);
        backend.register_vec_handle(&mut handle);
        let handle: ActiveHandle<B, VecState<V>, u64> = handle.activate(backend);
        EagerAppender { handle }
    }
}

impl<V, B> IndexOps for EagerAppender<V, B>
where
    V: Value,
    B: Backend,
{
    fn persist(&mut self) -> ArconResult<()> {
        Ok(())
    }
    fn set_key(&mut self, key: u64) {
        self.handle.set_item_key(key);
    }
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }
}

impl<V, B> AppenderIndex<V> for EagerAppender<V, B>
where
    V: Value,
    B: Backend,
{
    #[inline]
    fn append(&mut self, data: V) -> Result<()> {
        self.handle.append(data)
    }
    #[inline]
    fn consume(&mut self) -> Result<Vec<V>> {
        let stored = self.handle.get()?;
        self.handle.clear()?;
        Ok(stored)
    }
    #[inline]
    fn len(&self) -> usize {
        self.handle.len().unwrap_or(0)
    }
    #[inline]
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}