use std::any::Any;
use std::marker::PhantomData;
use async_trait::async_trait;
use super::erased::{Dispatch, ErasedRecord};
use super::record::{Record, RecordContext};
#[async_trait]
pub trait Processor<KIn: Send, VIn: Send, KOut: Send, VOut: Send>: Send + 'static {
async fn init(&mut self, _ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {}
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
record: Record<KIn, VIn>,
);
async fn close(&mut self) {}
}
#[async_trait]
impl<KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut>
for Box<dyn Processor<KIn, VIn, KOut, VOut>>
where
KIn: Send + 'static,
VIn: Send + 'static,
KOut: Send + 'static,
VOut: Send + 'static,
{
async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {
(**self).init(ctx).await;
}
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
record: Record<KIn, VIn>,
) {
(**self).process(ctx, record).await;
}
async fn close(&mut self) {
(**self).close().await;
}
}
pub trait ProcessorSupplier<KIn, VIn, KOut, VOut>: Send + Sync + 'static {
fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>>;
}
impl<F, P, KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> for F
where
F: Fn() -> P + Send + Sync + 'static,
KIn: Send,
VIn: Send,
KOut: Send,
VOut: Send,
P: Processor<KIn, VIn, KOut, VOut>,
{
fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>> {
Box::new(self())
}
}
pub struct ProcessorContext<'ctx, 'd, KOut, VOut> {
dispatch: &'ctx mut Dispatch<'d>,
_pd: PhantomData<fn(KOut, VOut)>,
}
impl<'ctx, 'd, KOut, VOut> ProcessorContext<'ctx, 'd, KOut, VOut>
where
KOut: Any + Send + Clone,
VOut: Any + Send + Clone,
{
pub(crate) fn new(dispatch: &'ctx mut Dispatch<'d>) -> Self {
Self {
dispatch,
_pd: PhantomData,
}
}
pub fn forward(&mut self, record: Record<KOut, VOut>) {
let children = self.dispatch.children;
let Some((&last, rest)) = children.split_last() else {
return; };
for &child in rest {
let key: Option<Box<dyn Any + Send>> = record
.key
.clone()
.map(|k| Box::new(k) as Box<dyn Any + Send>);
let value: Box<dyn Any + Send> = Box::new(record.value.clone());
self.dispatch
.buffer
.push_back((child, ErasedRecord::new(key, value, record.timestamp)));
}
let ts = record.timestamp;
let key: Option<Box<dyn Any + Send>> =
record.key.map(|k| Box::new(k) as Box<dyn Any + Send>);
let value: Box<dyn Any + Send> = Box::new(record.value);
self.dispatch
.buffer
.push_back((last, ErasedRecord::new(key, value, ts)));
}
pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
self.dispatch.stores.get_kv::<K2, V2>(name)
}
pub async fn global_get<GK: Send + Sync + 'static, VG: Send + 'static>(
&mut self,
store: &str,
key: &GK,
) -> Option<VG> {
self.dispatch.globals.get::<GK, VG>(store, key).await
}
pub fn get_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::window::WindowStore<K2, V2>> {
self.dispatch.stores.get_window::<K2, V2>(name)
}
pub fn get_join_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::join_window::JoinWindowStore<K2, V2>> {
self.dispatch.stores.get_join_window::<K2, V2>(name)
}
pub fn get_session_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::session::SessionStore<K2, V2>> {
self.dispatch.stores.get_session::<K2, V2>(name)
}
pub fn get_versioned_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::versioned::VersionedKeyValueStore<K2, V2>> {
self.dispatch.stores.get_versioned::<K2, V2>(name)
}
pub(crate) fn get_suppress_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::suppress_store::SuppressStore<K2, V2>> {
self.dispatch.stores.get_suppress::<K2, V2>(name)
}
pub(crate) fn get_join_grace_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut crate::store::join_grace_buffer::JoinGraceBufferStore<K2, V2>> {
self.dispatch.stores.get_join_grace::<K2, V2>(name)
}
pub(crate) fn get_fk_subscription_store(
&mut self,
name: &str,
) -> Option<&mut crate::store::fk_subscription::SubscriptionBytesStore> {
self.dispatch.stores.get_fk_subscription(name)
}
#[must_use]
pub fn record_context(&self) -> &RecordContext {
self.dispatch.record_ctx
}
#[must_use]
pub fn store_is_cached(&self, name: &str) -> bool {
self.dispatch.stores.kv_is_cached(name)
}
pub fn schedule<P>(
&mut self,
interval: std::time::Duration,
ty: crate::processor::punctuation::PunctuationType,
punctuator: P,
) -> crate::processor::punctuation::Cancellable
where
P: crate::processor::punctuation::Punctuator<KOut, VOut>,
{
use crate::processor::punctuation::PunctuationType;
let interval_ms = i64::try_from(interval.as_millis()).unwrap_or(i64::MAX);
assert!(
interval_ms >= 1,
"schedule interval must be positive (>= 1ms)"
);
let base = match ty {
PunctuationType::StreamTime => self.dispatch.sched_stream_time,
PunctuationType::WallClockTime => self.dispatch.sched_wall_clock,
};
let next_time = base.saturating_add(interval_ms);
let cancel = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let erased: Box<dyn crate::processor::punctuation::ErasedPunctuator> =
Box::new(crate::processor::punctuation::TypedPunctuator::<
KOut,
VOut,
P,
>::new(punctuator));
self.dispatch
.schedules
.push(crate::processor::punctuation::ScheduleEntry {
node_idx: self.dispatch.node_idx,
interval_ms,
ty,
next_time,
punctuator: erased,
cancel: cancel.clone(),
});
crate::processor::punctuation::Cancellable::new(cancel)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processor::erased::{Dispatch, ErasedRecord};
use crate::processor::record::{Record, RecordContext};
use assert2::check;
use std::collections::VecDeque;
struct Upper;
crate::impl_processor! {
impl Upper: (String, String) -> (String, String) {
async fn process(&mut self, ctx, r) {
ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
}
}
}
struct Noop;
#[async_trait]
impl Processor<String, String, String, String> for Noop {
async fn process(
&mut self,
_ctx: &mut ProcessorContext<'_, '_, String, String>,
_r: Record<String, String>,
) {
}
}
#[tokio::test]
async fn forward_pushes_erased_record_to_each_child() {
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "t".into(),
partition: 0,
offset: 0,
timestamp: 5,
};
let children = [3usize, 4usize];
let mut stores = crate::store::registry::StoreRegistry::default();
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
Upper
.process(&mut ctx, Record::new(Some("k".into()), "hi".into(), 5))
.await;
check!(buffer.len() == 2);
let (child, rec) = buffer.pop_front().unwrap();
check!(child == 3);
check!(*rec.value.downcast::<String>().unwrap() == "HI");
}
#[tokio::test]
async fn boxed_dyn_processor_delegates_init_process_close() {
let mut boxed: Box<dyn Processor<String, String, String, String>> = Box::new(Upper);
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "t".into(),
partition: 0,
offset: 0,
timestamp: 5,
};
let children = [1usize];
let mut stores = crate::store::registry::StoreRegistry::default();
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
boxed.init(&mut ctx).await; boxed
.process(&mut ctx, Record::new(None, "hi".into(), 5))
.await; boxed.close().await; check!(buffer.len() == 1);
let (_child, rec) = buffer.pop_front().unwrap();
check!(*rec.value.downcast::<String>().unwrap() == "HI");
}
#[tokio::test]
async fn default_init_and_close_are_noops_and_forward_with_no_children_drops() {
let mut p = Noop;
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "t".into(),
partition: 0,
offset: 0,
timestamp: 9,
};
let mut stores = crate::store::registry::StoreRegistry::default();
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &[],
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
p.init(&mut ctx).await; check!(ctx.record_context().timestamp == 9);
ctx.forward(Record::new(None, "x".to_string(), 0)); check!(buffer.is_empty());
p.close().await; }
}