pub mod function;
pub mod sink;
pub mod window;
#[cfg(feature = "metrics")]
use metrics::{gauge, increment_counter, register_counter, register_gauge};
use crate::{
application::conf::logger::ArconLogger,
data::{ArconElement, ArconType},
error::{timer::TimerResult, *},
index::ArconState,
stream::node::timer::ArconTimer,
};
use prost::Message;
/// Interface implemented by stream operators.
///
/// An operator receives `ArconElement<Self::IN>` values through
/// `handle_element`, receives timer payloads through `handle_timeout`, and
/// returns collections of `ArconElement<Self::OUT>` from both. Every callback
/// is handed a mutable `OperatorContext` parameterised by the operator's own
/// `TimerState` and `OperatorState`.
pub trait Operator: Send + Sized {
/// Payload type of the elements passed to `handle_element`.
type IN: ArconType;
/// Payload type of the elements this operator returns.
type OUT: ArconType;
/// Timer payload type: accepted by `OperatorContext::schedule_at` and
/// received by `handle_timeout`.
type TimerState: Message + Clone + Default;
/// State type made available through `OperatorContext::state`.
type OperatorState: ArconState;
/// Collection type returned by the handlers; it must be iterable into
/// `ArconElement<Self::OUT>` items and be `'static`.
type ElementIterator: IntoIterator<Item = ArconElement<Self::OUT>> + 'static;
/// Start-up hook. The default implementation ignores the context and
/// returns `Ok(())`.
///
/// NOTE(review): presumably invoked once before any element is handled;
/// the call site is not visible in this file, so confirm against the caller.
fn on_start(
&mut self,
_ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<()> {
Ok(())
}
/// Handles a single input `element`.
///
/// Returns the output elements produced for it, or an `ArconResult` error.
fn handle_element(
&mut self,
element: ArconElement<Self::IN>,
ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Self::ElementIterator>;
/// Handles a timer payload of type `Self::TimerState`.
///
/// Returns `Ok(Some(..))` with output elements, `Ok(None)`, or an error.
/// `Ok(None)` is what the `ignore_timeout!` macro generates; presumably it
/// means "nothing to emit" (confirm against the caller).
fn handle_timeout(
&mut self,
timeout: Self::TimerState,
ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Option<Self::ElementIterator>>;
}
/// Expands to a `handle_timeout` method that ignores both of its arguments
/// and returns `Ok(None)`.
///
/// The generated signature matches `Operator::handle_timeout`, so the macro
/// is meant to be invoked inside an `impl Operator for ...` block.
///
/// The expansion names `OperatorContext` and `ArconResult` without a path,
/// so both must be in scope wherever the macro is invoked.
#[macro_export]
macro_rules! ignore_timeout {
() => {
fn handle_timeout(
&mut self,
_timeout: Self::TimerState,
_ctx: &mut OperatorContext<Self::TimerState, Self::OperatorState>,
) -> ArconResult<Option<Self::ElementIterator>> {
Ok(None)
}
};
}
/// Context passed to every `Operator` callback.
///
/// Bundles the timer, the operator state, the logger and the key that is
/// applied to the state and the timer before they are used.
pub struct OperatorContext<TimerState, OperatorState>
where
TimerState: Message + Clone + Default,
OperatorState: ArconState,
{
/// Timer backend queried by `current_time` and used by `schedule_at`.
pub(crate) timer: Box<dyn ArconTimer<Value = TimerState>>,
/// Operator state; `state()` calls `set_key(current_key)` on it before
/// handing out a reference.
pub(crate) state: OperatorState,
/// Logger returned by `log()`.
pub(crate) logger: ArconLogger,
/// Key applied to the state and the timer on access. Initialised to 0 by
/// `new`. NOTE(review): presumably updated by the owning node for each
/// element; that code is not visible in this file.
pub(crate) current_key: u64,
/// Prefix prepended (with an underscore) to every metric name. Only
/// present when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
name: String,
}
impl<TimerState, OperatorState> OperatorContext<TimerState, OperatorState>
where
    TimerState: Message + Clone + Default,
    OperatorState: ArconState,
{
    /// Builds a context from its parts, with `current_key` initialised to 0.
    ///
    /// When the `metrics` feature is enabled, `name` becomes the prefix of
    /// every metric registered or updated through this context.
    #[inline]
    pub(crate) fn new(
        timer: Box<dyn ArconTimer<Value = TimerState>>,
        state: OperatorState,
        logger: ArconLogger,
        #[cfg(feature = "metrics")] name: String,
    ) -> Self {
        Self {
            timer,
            state,
            logger,
            current_key: 0,
            #[cfg(feature = "metrics")]
            name,
        }
    }

    /// Returns a mutable reference to the operator state.
    ///
    /// `set_key(current_key)` is called on the state first, so the state
    /// always sees the context's current key before the caller touches it.
    #[inline]
    pub fn state(&mut self) -> &mut OperatorState {
        self.state.set_key(self.current_key);
        &mut self.state
    }

    /// Returns the logger held by this context.
    #[inline]
    pub fn log(&self) -> &ArconLogger {
        &self.logger
    }

    /// Returns whatever the underlying timer reports from `get_time()`.
    #[inline]
    pub fn current_time(&mut self) -> StateResult<u64> {
        self.timer.get_time()
    }

    /// Schedules `entry` on the timer at `time`.
    ///
    /// The timer's active key is set to `current_key` before scheduling.
    /// The timer's own result is returned unchanged.
    #[inline]
    pub fn schedule_at(&mut self, time: u64, entry: TimerState) -> TimerResult<TimerState> {
        self.timer.active_key(self.current_key);
        self.timer.schedule_at(time, entry)
    }

    /// Builds the full metric name `<context name>_<name>` shared by all of
    /// the metric helpers below.
    #[cfg(feature = "metrics")]
    fn metric_name(&self, name: &str) -> String {
        format!("{}_{}", self.name, name)
    }

    /// Registers a gauge called `<context name>_<name>`.
    ///
    /// Takes `&self` like the other metric helpers, since nothing in the
    /// context is mutated.
    #[cfg(feature = "metrics")]
    pub fn register_gauge(&self, name: &str) {
        register_gauge!(self.metric_name(name));
    }

    /// Sets the gauge called `<context name>_<name>` to `value`.
    #[cfg(feature = "metrics")]
    pub fn update_gauge(&self, name: &str, value: f64) {
        gauge!(self.metric_name(name), value);
    }

    /// Registers a counter called `<context name>_<name>`.
    #[cfg(feature = "metrics")]
    pub fn register_counter(&self, name: &str) {
        register_counter!(self.metric_name(name));
    }

    /// Increments the counter called `<context name>_<name>`.
    #[cfg(feature = "metrics")]
    pub fn increment_counter(&self, name: &str) {
        increment_counter!(self.metric_name(name));
    }
}