use crate::{
data::StateID,
dataflow::conf::{DefaultBackend, OperatorConf, SourceConf},
index::{ArconState, EMPTY_STATE_ID},
stream::{
operator::Operator,
source::Source,
time::{ArconTime, Time},
},
};
use arcon_state::Backend;
use std::sync::Arc;
/// Bundles the closures and configuration needed to instantiate an operator.
///
/// `Clone` is implemented by hand instead of derived: a derived impl would
/// demand `OP: Clone` and `Backend: Clone`, even though cloning only bumps the
/// reference counts of the two `Arc`s and clones `conf`.
pub struct OperatorBuilder<OP: Operator, Backend = DefaultBackend> {
    /// Factory closure that produces a fresh `OP` each time it is called.
    pub operator: Arc<dyn Fn() -> OP + Send + Sync + 'static>,
    /// Closure that builds the operator's state from a shared backend handle.
    pub state: Arc<dyn Fn(Arc<Backend>) -> OP::OperatorState + Send + Sync + 'static>,
    /// Configuration carried along with the operator.
    pub conf: OperatorConf,
}

impl<OP: Operator, B> Clone for OperatorBuilder<OP, B> {
    /// Returns a builder sharing the same closures (via `Arc`) and a cloned `conf`.
    fn clone(&self) -> Self {
        Self {
            operator: Arc::clone(&self.operator),
            state: Arc::clone(&self.state),
            conf: self.conf.clone(),
        }
    }
}
impl<OP: Operator, Backend: arcon_state::Backend> OperatorBuilder<OP, Backend> {
    /// Creates a backend through `Backend::create` using `state_dir` and `name`,
    /// and returns it wrapped in an `Arc`.
    ///
    /// # Panics
    ///
    /// Panics if `Backend::create` returns an error.
    pub(crate) fn create_backend(
        &self,
        state_dir: std::path::PathBuf,
        name: String,
    ) -> Arc<Backend> {
        let backend = Backend::create(&state_dir, name).unwrap();
        Arc::new(backend)
    }

    /// Returns the state id declared by the operator's state type.
    ///
    /// When the declared id equals `EMPTY_STATE_ID`, a `_` followed by a newly
    /// generated UUID is appended; a new UUID is generated on every such call.
    pub(crate) fn _state_id(&self) -> StateID {
        let declared = OP::OperatorState::STATE_ID.to_owned();
        if declared != EMPTY_STATE_ID {
            return declared;
        }
        let suffix = uuid::Uuid::new_v4().to_string();
        format!("{}_{}", declared, suffix)
    }
}
/// Type of the second argument passed to a `ParallelSourceBuilder` constructor.
// NOTE(review): presumably the position of one source instance among its peers — confirm.
type SourceIndex = usize;
/// Type of the third argument passed to a `ParallelSourceBuilder` constructor.
// NOTE(review): presumably the overall number of parallel source instances — confirm.
type TotalSources = usize;
/// Either a single-instance source builder or a parallel source builder.
///
/// `Clone` is implemented by hand so that it is available whenever both inner
/// builder types are `Clone`, rather than requiring `S: Clone` and `B: Clone`
/// as a derived impl would.
pub enum SourceBuilderType<S, B>
where
    S: Source,
    B: Backend,
{
    /// Builder for a source that is not parallelised.
    Single(SourceBuilder<S, B>),
    /// Builder for a source that carries its own `parallelism` setting.
    Parallel(ParallelSourceBuilder<S, B>),
}

impl<S, B> Clone for SourceBuilderType<S, B>
where
    S: Source,
    B: Backend,
    SourceBuilder<S, B>: Clone,
    ParallelSourceBuilder<S, B>: Clone,
{
    /// Clones the wrapped builder and rewraps it in the same variant.
    fn clone(&self) -> Self {
        match self {
            Self::Single(builder) => Self::Single(builder.clone()),
            Self::Parallel(builder) => Self::Parallel(builder.clone()),
        }
    }
}
impl<S, B> SourceBuilderType<S, B>
where
S: Source,
B: Backend,
{
pub fn parallelism(&self) -> usize {
match self {
SourceBuilderType::Single(_) => 1,
SourceBuilderType::Parallel(builder) => builder.parallelism,
}
}
pub fn time(&self) -> ArconTime {
match self {
SourceBuilderType::Single(builder) => builder.conf.time,
SourceBuilderType::Parallel(builder) => builder.conf.time,
}
}
}
/// Bundles the constructor closure and configuration for a single source.
///
/// `Clone` is implemented by hand instead of derived: a derived impl would
/// demand `S: Clone` and `Backend: Clone`, even though the only generic data
/// cloned is an `Arc` handle and the source configuration.
pub struct SourceBuilder<S: Source, Backend = DefaultBackend> {
    /// Closure that builds the source from a shared backend handle.
    pub constructor: Arc<dyn Fn(Arc<Backend>) -> S + Send + Sync + 'static>,
    /// Configuration for the source, parameterised by the source's item type.
    pub conf: SourceConf<S::Item>,
}

impl<S: Source, B> Clone for SourceBuilder<S, B>
where
    SourceConf<S::Item>: Clone,
{
    /// Returns a builder sharing the same constructor (via `Arc`) and a cloned `conf`.
    fn clone(&self) -> Self {
        Self {
            constructor: Arc::clone(&self.constructor),
            conf: self.conf.clone(),
        }
    }
}
/// Bundles the constructor closure, configuration and parallelism for a
/// parallel source.
///
/// `Clone` is implemented by hand instead of derived: a derived impl would
/// demand `S: Clone` and `Backend: Clone`, even though the only generic data
/// cloned is an `Arc` handle and the source configuration.
pub struct ParallelSourceBuilder<S: Source, Backend = DefaultBackend> {
    /// Closure that builds one source from a shared backend handle plus a
    /// `SourceIndex` and a `TotalSources` value.
    pub constructor:
        Arc<dyn Fn(Arc<Backend>, SourceIndex, TotalSources) -> S + Send + Sync + 'static>,
    /// Configuration for the source, parameterised by the source's item type.
    pub conf: SourceConf<S::Item>,
    /// Parallelism reported for this builder.
    pub parallelism: usize,
}

impl<S: Source, B> Clone for ParallelSourceBuilder<S, B>
where
    SourceConf<S::Item>: Clone,
{
    /// Returns a builder sharing the same constructor (via `Arc`), with a
    /// cloned `conf` and the same `parallelism`.
    fn clone(&self) -> Self {
        Self {
            constructor: Arc::clone(&self.constructor),
            conf: self.conf.clone(),
            parallelism: self.parallelism,
        }
    }
}
/// Holds a shared closure that maps a reference to `T` onto a `u64` key.
///
/// `Clone` is implemented by hand instead of derived: a derived impl would
/// demand `T: Clone`, even though cloning only bumps the `Arc` reference count.
pub struct KeyBuilder<T> {
    /// Closure invoked to compute the key for an event.
    pub extractor: Arc<(dyn Fn(&T) -> u64 + Send + Sync)>,
}

impl<T> Clone for KeyBuilder<T> {
    /// Returns a builder that shares the same extractor closure.
    fn clone(&self) -> Self {
        Self {
            extractor: Arc::clone(&self.extractor),
        }
    }
}
impl<T> KeyBuilder<T> {
    /// Runs the stored extractor closure on `event` and returns the `u64` it yields.
    pub fn get_key(&self, event: &T) -> u64 {
        // Borrow the closure out of the `Arc` before calling it.
        let extract = &*self.extractor;
        extract(event)
    }
}
/// Window assigner configuration with two variants, `Sliding` and `Tumbling`.
///
/// NOTE(review): the per-field notes below are inferred from the variant and
/// field names only — confirm against the code that consumes this enum.
#[derive(Clone, Copy)]
pub enum Assigner {
/// Presumably overlapping windows: each spans `length` and a new one starts every `slide`.
Sliding {
/// Presumably the span covered by each window.
length: Time,
/// Presumably the interval between consecutive window starts.
slide: Time,
/// Presumably how long late events are still accepted — TODO confirm.
late_arrival: Time,
},
/// Presumably back-to-back, non-overlapping windows that each span `length`.
Tumbling {
/// Presumably the span covered by each window.
length: Time,
/// Presumably how long late events are still accepted — TODO confirm.
late_arrival: Time,
},
}