use super::builder::Assigner;
use crate::{data::ArconType, stream::time::ArconTime};
use std::sync::Arc;
#[cfg(all(feature = "hardware_counters", target_os = "linux"))]
use crate::metrics::perf_event::PerfEvents;
// Resolves `DefaultBackend` at compile time:
// * `rocksdb` feature enabled and not a test build -> `arcon_state::Rocks`
// * every other configuration (feature disabled, or any test build) -> `arcon_state::Sled`
cfg_if::cfg_if! {
    if #[cfg(all(feature = "rocksdb", not(test)))] {
        /// State backend alias; resolves to `Rocks` in this build configuration.
        pub type DefaultBackend = arcon_state::Rocks;
    } else {
        /// State backend alias; resolves to `Sled` in this build configuration.
        pub type DefaultBackend = arcon_state::Sled;
    }
}
/// Strategy describing how an operator's parallelism is chosen.
///
/// The default value is `ParallelismStrategy::Static(1)`.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub enum ParallelismStrategy {
    /// A fixed, caller-supplied value.
    // NOTE(review): presumably the number of parallel operator instances -- confirm
    // against the code that consumes this value.
    Static(usize),
    /// No explicit value is carried by this variant.
    // NOTE(review): presumably the runtime decides the parallelism -- confirm.
    Managed,
}

impl Default for ParallelismStrategy {
    /// Returns `Static(1)`.
    fn default() -> Self {
        Self::Static(1)
    }
}
/// Kind of stream an operator works on.
///
/// The default value is `StreamKind::Keyed`.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub enum StreamKind {
    // NOTE(review): presumably a stream partitioned by key -- confirm with the consumers.
    /// Keyed stream (the default).
    Keyed,
    // NOTE(review): presumably a stream without key partitioning -- confirm.
    /// Local stream.
    Local,
}

impl Default for StreamKind {
    /// Returns `Keyed`.
    fn default() -> Self {
        Self::Keyed
    }
}
/// Configuration of a single operator.
///
/// `Default` is derived, so a default configuration uses the default of each
/// field. With the `serde` feature enabled the struct can also be deserialized.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct OperatorConf {
    /// How the operator's parallelism is chosen.
    pub parallelism_strategy: ParallelismStrategy,
    /// Kind of stream the operator works on.
    pub stream_kind: StreamKind,
    /// Perf events setting; the field only exists on Linux builds with the
    /// `hardware_counters` feature enabled.
    #[cfg(all(feature = "hardware_counters", target_os = "linux"))]
    pub perf_events: PerfEvents,
}
impl OperatorConf {
    /// Loads an `OperatorConf` from the HOCON file at `path`.
    ///
    /// The file is read into memory, handed to `hocon::HoconLoader`, and the
    /// loaded document is resolved (deserialized) into an `OperatorConf`.
    ///
    /// Only compiled when both the `serde` and `hocon` features are enabled.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be read, if the loader rejects its contents,
    /// or if the loaded document cannot be resolved into an `OperatorConf`.
    /// Every panic message names the offending file and the stage that failed.
    #[cfg(all(feature = "serde", feature = "hocon"))]
    pub fn from_file(path: impl AsRef<std::path::Path>) -> OperatorConf {
        // Borrow the path so it is still available for the error messages below.
        let path = path.as_ref();

        let data = std::fs::read_to_string(path).unwrap_or_else(|err| {
            panic!(
                "failed to read operator config file {}: {}",
                path.display(),
                err
            )
        });

        let loader = hocon::HoconLoader::new()
            .load_str(&data)
            .unwrap_or_else(|err| {
                panic!(
                    "failed to load operator config file {} as HOCON: {:?}",
                    path.display(),
                    err
                )
            });

        loader.resolve().unwrap_or_else(|err| {
            panic!(
                "failed to resolve operator config file {} into OperatorConf: {:?}",
                path.display(),
                err
            )
        })
    }
}
/// Reference-counted, `Send + Sync` function that takes a `&T` and returns a
/// `u64` timestamp.
pub type TimestampExtractor<T> = Arc<dyn Fn(&T) -> u64 + Send + Sync>;
/// Configuration of a source producing elements of type `S`.
#[derive(Clone)]
pub struct SourceConf<S>
where
    S: ArconType,
{
    /// Optional function extracting a `u64` timestamp from an element;
    /// `None` when no extractor has been set.
    pub extractor: Option<TimestampExtractor<S>>,
    /// Time setting of the source.
    pub time: ArconTime,
    /// Batch size of the source.
    // NOTE(review): presumably the number of elements handled per batch -- confirm.
    pub batch_size: usize,
    /// Name of the source.
    pub name: String,
}
impl<S> SourceConf<S>
where
    S: ArconType,
{
    /// Replaces the configured `ArconTime`.
    pub fn set_arcon_time(&mut self, time: ArconTime) {
        self.time = time;
    }

    /// Installs `f` as the timestamp extractor, replacing any previous one.
    ///
    /// The closure is stored behind an `Arc`.
    pub fn set_timestamp_extractor(&mut self, f: impl Fn(&S) -> u64 + Send + Sync + 'static) {
        let shared = Arc::new(f);
        self.extractor = Some(shared);
    }

    /// Replaces the configured batch size.
    pub fn set_batch_size(&mut self, size: usize) {
        self.batch_size = size;
    }

    /// Replaces the configured source name.
    pub fn set_source_name(&mut self, name: String) {
        self.name = name;
    }
}
impl<S: ArconType> Default for SourceConf<S> {
fn default() -> Self {
Self {
extractor: None,
time: Default::default(),
batch_size: 1024,
name: format!("source_{}", uuid::Uuid::new_v4()),
}
}
}
/// Configuration of a window; it carries only the window's `Assigner`.
#[derive(Copy, Clone)]
pub struct WindowConf {
    /// Assigner used by the window.
    pub assigner: Assigner,
}