// NOTE(review): this `dead_code` allowance applies only to the `appender` module.
// The reason is not recorded here -- confirm it is still needed.
#[allow(dead_code)]
/// Appender indexes; provides `eager::EagerAppender`.
pub mod appender;
/// Hash table indexes; provides `HashTable` and `eager::EagerHashTable`.
pub mod hash_table;
/// Value indexes; provides `EagerValue`, `LazyValue` and `LocalValue`.
pub mod value;
/// Window indexes; provides `appender::AppenderWindow`, `arrow::ArrowWindow` and
/// `incremental::IncrementalWindow`.
pub mod window;
use crate::error::ArconResult;
use crate::stream::operator::window::WindowContext;
use crate::ArconType;
use crate::{data::arrow::ToArrow, manager::snapshot::Snapshot, table::ImmutableTable};
use arcon_state::{
data::{Key, Value},
error::Result,
Backend,
};
use std::{borrow::Cow, sync::Arc};
/// Shorthand trait bundling the [`Value`] and [`ToArrow`] bounds under one name.
///
/// It declares no methods of its own.
pub trait IndexValue: Value + ToArrow {}

// Blanket implementation: any type that is both `Value` and `ToArrow` is an `IndexValue`.
impl<T: Value + ToArrow> IndexValue for T {}
pub use self::{
appender::eager::EagerAppender,
hash_table::{eager::EagerHashTable, HashTable},
value::{EagerValue, LazyValue, LocalValue},
window::appender::AppenderWindow,
window::arrow::ArrowWindow,
window::incremental::IncrementalWindow,
};
/// Operations common to every index.
///
/// `AppenderIndex`, `ValueIndex`, `MapIndex` and `WindowIndex` all list this trait as a
/// supertrait. The exact semantics of each method are defined by the implementor; the
/// summaries below follow the method names and signatures only.
pub trait IndexOps {
    /// Persists the index.
    ///
    /// NOTE(review): presumably flushes in-memory state to the backend -- confirm against
    /// the implementors.
    ///
    /// # Errors
    /// Returns an `ArconResult` error if persisting fails.
    fn persist(&mut self) -> ArconResult<()>;

    /// Sets the `u64` key on the index.
    ///
    /// NOTE(review): presumably the active key for subsequent operations -- confirm.
    fn set_key(&mut self, key: u64);

    /// Returns the index as an [`ImmutableTable`], or `Ok(None)` when it has none to offer
    /// (this is what `EmptyState` returns).
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>>;
}
/// Contract for a state type that can be restored from a snapshot, persisted, and queried
/// for its tables.
///
/// Apart from [`ArconState::restore`], which has a default body, the semantics of each
/// method are defined by the implementor.
pub trait ArconState: Send + 'static {
    /// Identifier of this state type; `restore` passes it to `B::restore` as the name.
    const STATE_ID: &'static str;

    /// Rebuilds the state from `snapshot`.
    ///
    /// A backend of type `B` is restored from `snapshot.snapshot_path`, wrapped in an
    /// [`Arc`], and handed to the constructor closure `f`, whose result is returned.
    ///
    /// # Errors
    /// Propagates any error produced by `B::restore`.
    fn restore<B: Backend>(snapshot: Snapshot, f: Arc<dyn Fn(Arc<B>) -> Self>) -> ArconResult<Self>
    where
        Self: Sized,
    {
        let dir = std::path::Path::new(&snapshot.snapshot_path);
        let state_id = Self::STATE_ID.to_owned();
        // The same directory is supplied for both path arguments of `B::restore`.
        // NOTE(review): confirm that using one directory for both is intended.
        let restored = B::restore(dir, dir, state_id)?;
        let shared = Arc::new(restored);
        Ok(f(shared))
    }

    /// Persists the state.
    ///
    /// # Errors
    /// Returns an `ArconResult` error if persisting fails.
    fn persist(&mut self) -> ArconResult<()>;

    /// Sets the `u64` key on the state.
    ///
    /// NOTE(review): presumably forwarded to the contained indexes -- confirm.
    fn set_key(&mut self, key: u64);

    /// Returns the tables of this state as a vector of [`ImmutableTable`].
    fn tables(&mut self) -> Vec<ImmutableTable>;

    /// Returns the table identifiers of this state type (no instance required).
    fn table_ids() -> Vec<String>;

    /// Looks up a single table by `id`; `Ok(None)` signals that no table was produced.
    fn get_table(&mut self, id: &str) -> ArconResult<Option<ImmutableTable>>;

    /// Reports whether this state type has any tables (no instance required).
    fn has_tables() -> bool;
}
/// State identifier used by [`EmptyState`].
pub const EMPTY_STATE_ID: &str = "!";

/// Zero-sized state type carrying no data.
///
/// Its `ArconState` and `IndexOps` implementations are no-ops that return empty results.
#[derive(Copy, Clone)]
pub struct EmptyState;
// `ArconState` for the empty state: nothing to persist, no key to track, no tables.
impl ArconState for EmptyState {
    const STATE_ID: &'static str = EMPTY_STATE_ID;

    /// Nothing to persist; always succeeds.
    fn persist(&mut self) -> ArconResult<()> {
        Ok(())
    }

    /// The key is ignored.
    fn set_key(&mut self, _key: u64) {}

    /// Always an empty vector.
    fn tables(&mut self) -> Vec<ImmutableTable> {
        vec![]
    }

    /// Always an empty vector.
    fn table_ids() -> Vec<String> {
        vec![]
    }

    /// Always `Ok(None)`, whatever the requested id.
    fn get_table(&mut self, _id: &str) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }

    /// Always `false`.
    fn has_tables() -> bool {
        false
    }
}
// `IndexOps` for the empty state: every operation is a no-op.
impl IndexOps for EmptyState {
    /// Nothing to persist; always succeeds.
    fn persist(&mut self) -> ArconResult<()> {
        Ok(())
    }

    /// The key is ignored.
    fn set_key(&mut self, _key: u64) {}

    /// Always `Ok(None)`: there is no table to expose.
    fn table(&mut self) -> ArconResult<Option<ImmutableTable>> {
        Ok(None)
    }
}
/// Index interface for appending values of type `V`.
///
/// Fallible methods return the `arcon_state` `Result` type. Method semantics are defined
/// by the implementor; the summaries below follow the method names and signatures only.
pub trait AppenderIndex<V: Value>: IndexOps + Send + Sized + 'static {
    /// Appends `value` to the index.
    fn append(&mut self, value: V) -> Result<()>;

    /// Returns the stored values as a `Vec`.
    ///
    /// NOTE(review): presumably drains the index as it does so -- confirm against the
    /// implementors.
    fn consume(&mut self) -> Result<Vec<V>>;

    /// Number of elements, as reported by the implementor.
    fn len(&self) -> usize;

    /// Whether the index is empty, as reported by the implementor.
    fn is_empty(&self) -> bool;
}
/// Index interface holding a single optional value of type `V`.
///
/// Fallible methods return the `arcon_state` `Result` type. Method semantics are defined
/// by the implementor; the summaries below follow the method names and signatures only.
pub trait ValueIndex<V: Value>: IndexOps + Send + Sized + 'static {
    /// Stores `value`.
    fn put(&mut self, value: V) -> Result<()>;

    /// Reads the value without taking `&mut self`.
    ///
    /// The result is a [`Cow`], so an implementor may hand back either a borrow or an
    /// owned copy; `Ok(None)` means there is no value.
    fn get(&self) -> Result<Option<Cow<'_, V>>>;

    /// Returns the value by ownership.
    ///
    /// NOTE(review): presumably leaves the index empty afterwards -- confirm.
    fn take(&mut self) -> Result<Option<V>>;

    /// Clears the index.
    fn clear(&mut self) -> Result<()>;

    /// Read-modify-write: `f` receives a mutable reference to the value.
    ///
    /// NOTE(review): behaviour when no value is present is implementor-defined -- confirm.
    fn rmw<F: FnMut(&mut V) + Sized>(&mut self, f: F) -> Result<()>;
}
/// Index interface mapping keys of type `K` to values of type `V`.
///
/// Fallible methods return the `arcon_state` `Result` type. Method semantics are defined
/// by the implementor; the summaries below follow the method names and signatures only.
pub trait MapIndex<K: Key, V: Value>: IndexOps + Send + Sized + 'static {
    /// Stores `value` under `key`.
    fn put(&mut self, key: &K, value: V) -> Result<()>;

    /// Reads the value under `key` as an owned `V`; `Ok(None)` means no value.
    fn get(&self, key: &K) -> Result<Option<V>>;

    /// Returns the value under `key` by ownership.
    ///
    /// NOTE(review): presumably removes the entry as it does so -- confirm.
    fn take(&mut self, key: &K) -> Result<Option<V>>;

    /// Clears the entry under `key`.
    fn clear(&mut self, key: &K) -> Result<()>;

    /// Number of entries, as reported by the implementor.
    fn len(&self) -> usize;

    /// Whether the index is empty, as reported by the implementor.
    fn is_empty(&self) -> bool;

    /// Read-modify-write entry point for `key`.
    ///
    /// NOTE(review): unlike `ValueIndex::rmw`, this signature takes `value: V` rather than
    /// a closure of type `F`, and returns `()` instead of `Result<()>`. Because `F` occurs
    /// in no argument it cannot be inferred, so callers must name it explicitly
    /// (`index.rmw::<F>(key, value)`). This looks unintended; the signature is kept as-is
    /// because changing it would break existing implementors -- confirm and fix together.
    fn rmw<F: FnMut(&mut V) + Sized>(&mut self, key: &K, value: V);
}
/// Index interface for windows: consumes `IN` elements and produces an `OUT` result.
///
/// Every method receives a [`WindowContext`] by value. Method semantics are defined by
/// the implementor; the summaries below follow the method names and signatures only.
pub trait WindowIndex: IndexOps + Send + Sized + 'static {
    /// Element type fed into the window.
    type IN: ArconType;
    /// Result type produced by the window.
    type OUT: ArconType;

    /// Handles one incoming `element` for the window described by `ctx`.
    fn on_element(&mut self, element: Self::IN, ctx: WindowContext) -> ArconResult<()>;

    /// Produces the result for the window described by `ctx`.
    fn result(&mut self, ctx: WindowContext) -> ArconResult<Self::OUT>;

    /// Clears the window described by `ctx`.
    ///
    /// NOTE(review): presumably drops the state held for that window -- confirm.
    fn clear(&mut self, ctx: WindowContext) -> ArconResult<()>;
}