use std::marker::PhantomData;
use async_trait::async_trait;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
/// Zero-sized type-parameter marker. Using `fn() -> T` (rather than `T`) means the
/// marker never owns a `T`, is always `Send + Sync`, and is covariant in `T`.
type Marker<T> = std::marker::PhantomData<fn() -> T>;

/// State for a KStream–KTable join that delays stream records by a grace period
/// before joining them against a versioned table.
///
/// Type parameters: `K` key, `V` stream value, `VT` table value, `VO` joined
/// output value, `F` the joiner closure.
pub(crate) struct KStreamKTableJoinGraceProcessor<K, V, VT, VO, F> {
    /// Registry name of the versioned table store queried (as of each stream
    /// record's timestamp) when a buffered record is released.
    pub table_store: String,
    /// Registry name of the join grace buffer store that holds stream records
    /// until they fall behind the release threshold.
    pub buffer_store: String,
    /// Grace period subtracted from `observed_stream_time` to compute the
    /// release threshold handed to the buffer.
    pub grace_ms: i64,
    /// Combines a stream value with the matching table value (`None` on a miss)
    /// into the output value.
    pub joiner: F,
    /// `true`: records without a table match are still forwarded, with `None`
    /// passed to the joiner (left-join behaviour). `false`: such records are
    /// dropped (inner-join behaviour).
    pub emit_on_miss: bool,
    /// Largest record timestamp seen so far; callers typically start it at
    /// `i64::MIN`.
    pub observed_stream_time: i64,
    /// Ties the otherwise-unused `K`, `V`, `VT`, `VO` parameters to the struct.
    pub _pd: Marker<(K, V, VT, VO)>,
}
/// Grace-period KStream–KTable join: each stream record is buffered, and records
/// released by the buffer are joined against the table version valid at the
/// record's own timestamp.
#[async_trait]
impl<K, V, VT, VO, F> Processor<K, V, K, VO> for KStreamKTableJoinGraceProcessor<K, V, VT, VO, F>
where
    K: std::any::Any + Send + Sync + Clone,
    V: std::any::Any + Send + Sync + Clone,
    VT: Send + Sync + 'static,
    VO: std::any::Any + Send + Clone,
    F: Fn(&V, Option<&VT>) -> VO + Send + 'static,
{
    /// Buffers `r`, advances the observed stream time, and then joins and forwards
    /// whatever the buffer's `drain_due(observed_stream_time - grace_ms)` returns.
    ///
    /// For each released `(key, value, ts)` the versioned table store is read
    /// with `get_as_of(key, ts)`. The joined record is forwarded with timestamp
    /// `ts` when a table value exists, or when `emit_on_miss` is set (the joiner
    /// then receives `None`). Otherwise the record is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `r.key` is `None`, or if no join grace buffer store named
    /// `self.buffer_store` is registered in `ctx`.
    async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, VO>, r: Record<K, V>) {
        let key = r.key.expect("join requires a non-null key");
        self.observed_stream_time = self.observed_stream_time.max(r.timestamp);

        // Saturating: with timestamps near i64::MIN (or a negative grace), a plain
        // `-` panics in debug builds and wraps in release builds. Wrapping would
        // produce a meaningless threshold that flushes everything or nothing.
        let threshold = self.observed_stream_time.saturating_sub(self.grace_ms);

        let due = {
            // One lookup serves both the insert and the drain.
            let buf = ctx
                .get_join_grace_store::<K, V>(&self.buffer_store)
                .expect("join grace buffer store not found");
            // The new record is inserted before draining. A record that is already
            // older than `threshold` can therefore be released in this same call,
            // assuming `drain_due` releases everything timestamped up to the
            // threshold.
            buf.put(key, r.value, r.timestamp).await;
            buf.drain_due(threshold).await
        };

        for (k, v, ts) in due {
            // Join against the table as of the stream record's timestamp, not the
            // latest table value.
            // NOTE(review): a missing table store is treated exactly like a table
            // miss rather than an error. Confirm this is intended.
            let vt = match ctx.get_versioned_store::<K, VT>(&self.table_store) {
                Some(s) => s.get_as_of(&k, ts).await.map(|rec| rec.value),
                None => None,
            };
            if vt.is_some() || self.emit_on_miss {
                let out = (self.joiner)(&v, vt.as_ref());
                ctx.forward(Record::new(Some(k), out, ts));
            }
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the grace-period stream–table join processor.
    use std::collections::VecDeque;
    use std::marker::PhantomData;
    use assert2::check;
    use super::*;
    use crate::processor::api::ProcessorContext;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::RecordContext;
    use crate::processor::serde::{I64Serde, StringSerde};
    use crate::store::registry::StoreRegistry;
    use crate::store::versioned::{VersionedBytesStore, VersionedKeyValueStore};

    /// Builds a registry containing:
    /// - the versioned table store `"vt"`, where key `"a"` holds 10 from ts 100
    ///   and 20 from ts 200;
    /// - an empty join grace buffer store `"buf"`.
    async fn make_stores() -> StoreRegistry {
        let mut registry = StoreRegistry::default();

        let mut table = VersionedBytesStore::<String, i64>::in_memory(
            "vt".into(),
            1_000_000,
            Box::new(StringSerde),
            Box::new(I64Serde),
            "app-vt-changelog".into(),
        );
        for (version_value, version_ts) in [(10, 100), (20, 200)] {
            table
                .put("a".to_string(), Some(version_value), version_ts)
                .await;
        }
        registry.insert(Box::new(table));

        let pending = crate::store::join_grace_buffer::JoinGraceBufferStore::<String, i64>::in_memory(
            "buf".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "app-buf-changelog".into(),
        );
        registry.insert(Box::new(pending));

        registry
    }

    /// Pushes one `(key, value, ts)` record through `proc`. Returns the
    /// `(value, timestamp)` pairs the processor forwarded during that call.
    async fn run_one(
        proc: &mut KStreamKTableJoinGraceProcessor<
            String,
            i64,
            i64,
            i64,
            impl Fn(&i64, Option<&i64>) -> i64 + Send + 'static,
        >,
        stores: &mut StoreRegistry,
        key: &str,
        value: i64,
        ts: i64,
    ) -> Vec<(i64, i64)> {
        let downstream = [0usize];
        let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut sink = Vec::new();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let source_ctx = RecordContext {
            topic: "in".into(),
            partition: 0,
            offset: 0,
            timestamp: ts,
        };
        let mut pending_schedules = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut forwarded,
            children: &downstream,
            output: &mut sink,
            record_ctx: &source_ctx,
            stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut pending_schedules,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, i64>::new(&mut dispatch);

        let input = Record::new(Some(key.to_string()), value, ts);
        proc.process(&mut ctx, input).await;

        forwarded
            .into_iter()
            .map(|(_child, erased)| {
                let emitted_at = erased.timestamp;
                let joined = *erased.value.downcast::<i64>().unwrap();
                (joined, emitted_at)
            })
            .collect()
    }

    /// Records are held for 50 time units of stream time. When released, each is
    /// joined with the table value valid at its own timestamp: 150 sees 10 and
    /// 260 sees 20. Records still inside the grace window (300, 320) are not
    /// emitted.
    #[tokio::test]
    async fn grace_drains_in_order_with_asof() {
        let mut stores = make_stores().await;
        let mut proc = KStreamKTableJoinGraceProcessor {
            table_store: "vt".into(),
            buffer_store: "buf".into(),
            grace_ms: 50,
            joiner: |stream_v: &i64, table_v: Option<&i64>| stream_v + table_v.copied().unwrap_or(0),
            emit_on_miss: false,
            observed_stream_time: i64::MIN,
            _pd: PhantomData::<fn() -> (String, i64, i64, i64)>,
        };

        let mut emitted: Vec<(i64, i64)> = Vec::new();
        for input_ts in [150, 260, 300, 320] {
            emitted.extend(run_one(&mut proc, &mut stores, "a", 1, input_ts).await);
        }

        check!(emitted == vec![(11, 150), (21, 260)]);
    }
}