use std::collections::{HashMap, VecDeque};
use std::fmt::Display;
use std::marker::PhantomData;
pub use descr::*;
use crate::block::{GroupHasherBuilder, OperatorStructure, Replication};
use crate::operator::{Data, DataKey, ExchangeData, Operator, StreamElement, Timestamp};
use crate::stream::{KeyedStream, Stream, WindowedStream};
mod aggr;
mod descr;
/// Describes how windows are formed over elements of type `T`.
///
/// A description is turned into a concrete [`WindowManager`] by pairing it with a
/// [`WindowAccumulator`] through [`build`](WindowDescription::build).
pub trait WindowDescription<T> {
/// Manager type produced for accumulator `A`: it consumes `T` and emits `A::Out`.
type Manager<A: WindowAccumulator<In = T>>: WindowManager<In = T, Out = A::Out> + 'static;
/// Builds the manager for this description, handing it `accumulator`.
///
/// The description is only borrowed, so it can be used to build more than one manager.
fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A>;
}
/// State that absorbs elements one at a time and is finally turned into a single result.
///
/// Implementors must be `Clone + Send + 'static`.
pub trait WindowAccumulator: Clone + Send + 'static {
/// Element type accepted by [`process`](WindowAccumulator::process).
type In: Data;
/// Result type returned by [`output`](WindowAccumulator::output).
type Out: Data;
/// Feeds one element into the accumulator, updating it in place.
fn process(&mut self, el: Self::In);
/// Consumes the accumulator and returns its result.
fn output(self) -> Self::Out;
}
/// Per-key collection of window managers used by `WindowOperator`.
///
/// Each key gets its own manager of type `W`; `init` is the prototype from which new
/// per-key managers are cloned.
#[derive(Clone)]
pub(crate) struct KeyedWindowManager<Key, In, Out, W: WindowManager> {
/// Manager currently associated with each key, hashed with `GroupHasherBuilder`.
windows: HashMap<Key, W, GroupHasherBuilder>,
/// Prototype manager cloned for a key that has no entry in `windows` yet.
init: W,
/// Zero-sized marker carrying the input element type.
_in: PhantomData<In>,
/// Zero-sized marker carrying the output element type.
_out: PhantomData<Out>,
}
/// Drives the windows for a single key: receives stream elements and emits window results.
pub trait WindowManager: Clone + Send {
/// Element type carried by the incoming stream elements.
type In: Data;
/// Payload type of the emitted window results.
type Out: Data;
/// Collection returned by [`process`](WindowManager::process); iterating it yields the
/// results produced by that call.
type Output: IntoIterator<Item = WindowResult<Self::Out>>;
/// Handles one stream element and returns the window results it produced.
///
/// `WindowOperator` calls this with data elements for the matching key and with every
/// non-data element other than `FlushBatch` for all keys.
fn process(&mut self, el: StreamElement<Self::In>) -> Self::Output;
/// Reports whether this manager may be dropped.
///
/// `WindowOperator` checks this after forwarding a non-data element and removes the
/// per-key manager when it returns `true`. The default implementation never asks to be
/// removed.
fn recycle(&self) -> bool {
false
}
}
/// A value emitted by a window, optionally tagged with a timestamp.
///
/// Converts into the `StreamElement` variant of the same name through `From`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WindowResult<T> {
/// Result without a timestamp.
Item(T),
/// Result together with its timestamp.
Timestamped(T, Timestamp),
}
impl<T> WindowResult<T> {
#[inline]
pub fn new(item: T, timestamp: Option<Timestamp>) -> Self {
match timestamp {
Some(ts) => WindowResult::Timestamped(item, ts),
None => WindowResult::Item(item),
}
}
#[inline]
pub fn item(&self) -> &T {
match self {
WindowResult::Item(item) => item,
WindowResult::Timestamped(item, _) => item,
}
}
#[inline]
pub fn unwrap_item(self) -> T {
match self {
WindowResult::Item(item) => item,
WindowResult::Timestamped(item, _) => item,
}
}
}
impl<T> From<WindowResult<T>> for StreamElement<T> {
    /// Maps each window result onto the stream element variant of the same name,
    /// carrying the payload (and timestamp, when present) across unchanged.
    #[inline]
    fn from(value: WindowResult<T>) -> Self {
        match value {
            WindowResult::Timestamped(payload, ts) => Self::Timestamped(payload, ts),
            WindowResult::Item(payload) => Self::Item(payload),
        }
    }
}
/// Operator that routes keyed elements from `prev` into per-key window managers and emits
/// the keyed results they produce.
#[derive(Clone)]
pub(crate) struct WindowOperator<Key, In, Out, Prev, W>
where
W: WindowManager,
{
/// Upstream operator the elements are pulled from.
prev: Prev,
/// Name shown by the `Display` impl and recorded in the block structure.
name: String,
/// Per-key window managers plus the prototype used to create new ones.
manager: KeyedWindowManager<Key, In, Out, W>,
/// Elements that are ready to be returned by `next`, in emission order.
output_buffer: VecDeque<StreamElement<(Key, Out)>>,
}
impl<Key, In, Out, Prev, W> Display for WindowOperator<Key, In, Out, Prev, W>
where
    W: WindowManager,
    Prev: Display,
{
    /// Writes the upstream chain, then the manager type, then this operator's name and
    /// output type.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let manager_type = std::any::type_name::<W>();
        let output_type = std::any::type_name::<Out>();
        write!(
            f,
            "{} -> {} -> WindowOperator[{}]<{}>",
            self.prev, manager_type, self.name, output_type,
        )
    }
}
impl<Key, In, Out, Prev, W> Operator for WindowOperator<Key, In, Out, Prev, W>
where
    W: WindowManager<In = In, Out = Out> + Send,
    Prev: Operator<Out = (Key, In)>,
    Key: DataKey,
    In: Data,
    Out: Data,
{
    type Out = (Key, Out);

    /// Forwards setup to the upstream operator; this operator keeps no metadata itself.
    fn setup(&mut self, metadata: &mut crate::ExecutionMetadata) {
        self.prev.setup(metadata);
    }

    /// Returns the next output element, pulling from upstream until one is available.
    ///
    /// * Buffered output is always returned first.
    /// * A data element (`Item` / `Timestamped`) is handed to the manager of its key,
    ///   creating that manager from the prototype if needed; results are re-keyed and
    ///   buffered.
    /// * `FlushBatch` is returned immediately without touching any manager.
    /// * Any other element is handed to every manager, managers that ask to be recycled
    ///   are dropped, and the element itself is buffered after the results it triggered.
    ///
    /// # Panics
    ///
    /// Panics if a data element arrives without a key.
    fn next(&mut self) -> StreamElement<(Key, Out)> {
        loop {
            if let Some(ready) = self.output_buffer.pop_front() {
                return ready;
            }

            match self.prev.next() {
                // The buffer is known to be empty here, so nothing can be overtaken.
                StreamElement::FlushBatch => return StreamElement::FlushBatch,
                data @ (StreamElement::Item(_) | StreamElement::Timestamped(_, _)) => {
                    let (key, data) = data.take_key();
                    let key = key.unwrap();
                    let prototype = &self.manager.init;
                    let window = self
                        .manager
                        .windows
                        .entry(key.clone())
                        .or_insert_with(|| prototype.clone());
                    let keyed_results = window
                        .process(data)
                        .into_iter()
                        .map(|result| StreamElement::from(result).add_key(key.clone()));
                    self.output_buffer.extend(keyed_results);
                }
                control => {
                    let (_, control) = control.take_key();
                    let pending = &mut self.output_buffer;
                    // Every manager sees the element; those that report `recycle()` are
                    // removed right after their results have been collected.
                    self.manager.windows.retain(|key, window| {
                        let results = window.process(control.clone());
                        pending.extend(
                            results
                                .into_iter()
                                .map(|result| StreamElement::from(result).add_key(key.clone())),
                        );
                        !window.recycle()
                    });
                    // Re-tag the element with the output type and queue it behind the
                    // results it caused.
                    let forwarded = match control {
                        StreamElement::Watermark(w) => StreamElement::Watermark(w),
                        StreamElement::Terminate => StreamElement::Terminate,
                        StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
                        _ => unreachable!(),
                    };
                    self.output_buffer.push_back(forwarded);
                }
            }
        }
    }

    /// Appends this operator, under its name and output type, to the upstream structure.
    fn structure(&self) -> crate::block::BlockStructure {
        let upstream = self.prev.structure();
        let this_operator = OperatorStructure::new::<(Key, Out), _>(&self.name);
        upstream.add_operator(this_operator)
    }
}
impl<Key, In, Out, Prev, W> WindowOperator<Key, In, Out, Prev, W>
where
W: WindowManager,
{
pub(crate) fn new(
prev: Prev,
name: String,
manager: KeyedWindowManager<Key, In, Out, W>,
) -> Self {
Self {
prev,
name,
manager,
output_buffer: Default::default(),
}
}
}
impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where
    WindowDescr: WindowDescription<Out>,
    OperatorChain: Operator<Out = (Key, Out)> + 'static,
    Key: DataKey,
    Out: Data,
{
    /// Appends a [`WindowOperator`] named `name` to the underlying keyed stream.
    ///
    /// The window description builds a prototype manager around `accumulator`; the
    /// operator starts with no per-key managers and clones the prototype as keys appear.
    /// The resulting stream carries `(Key, NewOut)` pairs.
    pub(crate) fn add_window_operator<A, NewOut>(
        self,
        name: &str,
        accumulator: A,
    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
    where
        NewOut: Data,
        A: WindowAccumulator<In = Out, Out = NewOut>,
    {
        let manager: KeyedWindowManager<Key, Out, NewOut, WindowDescr::Manager<A>> =
            KeyedWindowManager {
                windows: HashMap::default(),
                init: self.descr.build::<A>(accumulator),
                _in: PhantomData,
                _out: PhantomData,
            };
        let operator_name = String::from(name);
        self.inner
            .add_operator(move |prev| WindowOperator::new(prev, operator_name, manager))
    }
}
impl<Key: DataKey, Out: Data, OperatorChain> KeyedStream<OperatorChain>
where
    OperatorChain: Operator<Out = (Key, Out)> + 'static,
{
    /// Attaches the window description `descr` to this keyed stream.
    ///
    /// No operator is added here: the stream and the description are only bundled into a
    /// [`WindowedStream`]. `WinOut` is not tied to any argument; it is carried by the
    /// `_win_out` marker of the returned value.
    pub fn window<WinOut: Data, WinDescr: WindowDescription<Out>>(
        self,
        descr: WinDescr,
    ) -> WindowedStream<impl Operator<Out = (Key, Out)>, WinOut, WinDescr> {
        let inner = self;
        WindowedStream {
            inner,
            descr,
            _win_out: PhantomData,
        }
    }
}
impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
where
    OperatorChain: Operator<Out = Out> + 'static,
{
    /// Applies the window description `descr` to the whole stream rather than per key.
    ///
    /// The stream is first set to `Replication::new_one()` and every element is keyed by
    /// `()`, so all elements share one key before [`KeyedStream::window`] is applied.
    pub fn window_all<WinOut: Data, WinDescr: WindowDescription<Out>>(
        self,
        descr: WinDescr,
    ) -> WindowedStream<impl Operator<Out = ((), Out)>, WinOut, WinDescr> {
        let single_replica = self.replication(Replication::new_one());
        let unit_keyed = single_replica.key_by(|_| ());
        unit_keyed.window(descr)
    }
}