use super::super::*;
use crate::operator::{Data, StreamElement};
#[derive(Default)]
pub enum TransactionOp {
#[default]
Continue,
Commit,
CommitAfter(Timestamp),
Discard,
}
#[derive(Clone)]
pub struct TransactionWindowManager<A, F>
where
A: WindowAccumulator,
F: Fn(&A::In) -> TransactionOp,
{
init: A,
f: F,
w: Option<Slot<A>>,
}
#[derive(Clone)]
struct Slot<A> {
acc: A,
close: Option<Timestamp>,
}
impl<A> Slot<A> {
#[inline]
fn new(acc: A) -> Self {
Self { acc, close: None }
}
}
impl<A: WindowAccumulator, F: Fn(&A::In) -> TransactionOp + Clone + Send + 'static> WindowManager
for TransactionWindowManager<A, F>
where
A::In: Data,
A::Out: Data,
{
type In = A::In;
type Out = A::Out;
type Output = Option<WindowResult<A::Out>>;
#[inline]
fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
macro_rules! return_current {
() => {
return Some(WindowResult::Item(self.w.take().unwrap().acc.output()))
};
}
match el {
StreamElement::Timestamped(item, _ts) => {
let slot = self.w.get_or_insert_with(|| Slot::new(self.init.clone()));
let command = (self.f)(&item);
slot.acc.process(item);
match command {
TransactionOp::Commit => return_current!(),
TransactionOp::CommitAfter(t) => slot.close = Some(t),
TransactionOp::Discard => self.w = None,
TransactionOp::Continue => {}
}
}
StreamElement::Watermark(ts) => {
if let Some(close) = self.w.as_ref().and_then(|w| w.close) {
if close < ts {
return_current!()
}
}
}
StreamElement::Terminate | StreamElement::FlushAndRestart
if self.w.as_ref().and_then(|w| w.close).is_some() =>
{
return_current!()
}
StreamElement::Item(_) => panic!(
"Non timestamped streams are not currently supported with transaction windows!"
),
_ => {}
}
None
}
fn recycle(&self) -> bool {
self.w.is_none()
}
}
#[derive(Clone)]
pub struct TransactionWindow<T, F: Fn(&T) -> TransactionOp> {
logic: F,
_t: PhantomData<T>,
}
impl<T, F: Fn(&T) -> TransactionOp> TransactionWindow<T, F> {
#[inline]
pub fn new(logic: F) -> Self {
Self {
logic,
_t: PhantomData,
}
}
}
impl<T: Data, F: Fn(&T) -> TransactionOp + Data> WindowDescription<T> for TransactionWindow<T, F> {
type Manager<A: WindowAccumulator<In = T>> = TransactionWindowManager<A, F>;
#[inline]
fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
TransactionWindowManager {
init: accumulator,
f: self.logic.clone(),
w: None,
}
}
}