use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use crate::processor::api::ProcessorContext;
use crate::processor::erased::Dispatch;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PunctuationType {
StreamTime,
WallClockTime,
}
#[async_trait]
pub trait Punctuator<K: Send, V: Send>: Send + 'static {
async fn punctuate(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V>, timestamp: i64);
}
#[derive(Clone)]
pub struct Cancellable(Arc<AtomicBool>);
impl Cancellable {
pub(crate) fn new(flag: Arc<AtomicBool>) -> Self {
Self(flag)
}
pub fn cancel(&self) {
self.0.store(true, Ordering::SeqCst);
}
}
#[async_trait]
pub(crate) trait ErasedPunctuator: Send {
async fn fire(&mut self, dispatch: &mut Dispatch<'_>, timestamp: i64);
}
pub(crate) struct TypedPunctuator<K, V, P> {
inner: P,
_pd: PhantomData<fn(K, V)>,
}
impl<K, V, P> TypedPunctuator<K, V, P> {
pub(crate) fn new(inner: P) -> Self {
Self {
inner,
_pd: PhantomData,
}
}
}
#[async_trait]
impl<K, V, P> ErasedPunctuator for TypedPunctuator<K, V, P>
where
K: Any + Send + Clone,
V: Any + Send + Clone,
P: Punctuator<K, V>,
{
async fn fire(&mut self, dispatch: &mut Dispatch<'_>, timestamp: i64) {
let mut ctx = ProcessorContext::<'_, '_, K, V>::new(dispatch);
self.inner.punctuate(&mut ctx, timestamp).await;
}
}
pub(crate) struct ScheduleEntry {
pub node_idx: usize,
pub interval_ms: i64,
pub ty: PunctuationType,
pub next_time: i64,
pub punctuator: Box<dyn ErasedPunctuator>,
pub cancel: Arc<AtomicBool>,
}
impl ScheduleEntry {
pub fn is_cancelled(&self) -> bool {
self.cancel.load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::check;
#[test]
fn cancellable_flips_the_flag() {
let flag = Arc::new(AtomicBool::new(false));
let c = Cancellable::new(flag.clone());
check!(!flag.load(Ordering::SeqCst));
c.cancel();
check!(flag.load(Ordering::SeqCst));
}
#[test]
fn schedule_entry_reports_cancelled() {
struct NoOp;
#[async_trait]
impl ErasedPunctuator for NoOp {
async fn fire(&mut self, _d: &mut Dispatch<'_>, _ts: i64) {}
}
let flag = Arc::new(AtomicBool::new(false));
let e = ScheduleEntry {
node_idx: 0,
interval_ms: 10,
ty: PunctuationType::StreamTime,
next_time: 0,
punctuator: Box::new(NoOp),
cancel: flag.clone(),
};
check!(!e.is_cancelled());
flag.store(true, Ordering::SeqCst);
check!(e.is_cancelled());
}
}