use std::marker::PhantomData;
use async_trait::async_trait;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
/// Zero-sized marker used to "use" otherwise unused type parameters.
///
/// Wrapping `fn() -> T` instead of `T` means the marker does not claim to own
/// a `T`, and it is `Send + Sync` whatever `T` is (function pointers always are).
type Marker<T> = PhantomData<fn() -> T>;
/// Processor that joins each incoming stream record against a key-value state
/// store (the "table" side), looked up by the record's key.
///
/// Type parameters, as used by the `Processor` impl: `K` is the key shared by
/// the stream and the table, `V` the stream value, `VT` the table value, `VO`
/// the joined output value, and `F` the joiner callable
/// (`Fn(&V, Option<&VT>) -> VO`).
pub(crate) struct KStreamKTableJoinProcessor<K, V, VT, VO, F> {
/// Name of the state store the table value is looked up in.
pub table_store: String,
/// Builds the output value from the stream value and the table value
/// (`None` when the lookup found nothing).
pub joiner: F,
/// When `true`, a record whose key has no table value is still joined and
/// forwarded; when `false`, such a record produces no output.
pub emit_on_miss: bool,
/// Zero-sized marker that ties the unused type parameters to the struct.
pub _pd: Marker<(K, V, VT, VO)>,
}
/// Stream-table join: every stream record is enriched with the value stored
/// under the same key in the state store named by `table_store`.
#[async_trait]
impl<K, V, VT, VO, F> Processor<K, V, K, VO> for KStreamKTableJoinProcessor<K, V, VT, VO, F>
where
    K: std::any::Any + Send + Sync + Clone,
    V: Send + 'static,
    VT: Send + 'static,
    VO: std::any::Any + Send + Clone,
    F: Fn(&V, Option<&VT>) -> VO + Send + 'static,
{
    /// Joins one stream record and forwards the result downstream.
    ///
    /// The table value is fetched from the state store `table_store`; a store
    /// that is not available from `ctx` is treated exactly like a missing key.
    /// Nothing is forwarded on a miss unless `emit_on_miss` is set, in which
    /// case the joiner is called with `None` as the table value. The forwarded
    /// record keeps the stream record's key and timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the record has no key.
    async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, VO>, r: Record<K, V>) {
        let key = r.key.expect("join requires a non-null key");

        let table_value = if let Some(store) = ctx.get_state_store::<K, VT>(&self.table_store) {
            store.get(&key).await
        } else {
            None
        };

        // A miss only produces output when the caller asked for it.
        if table_value.is_none() && !self.emit_on_miss {
            return;
        }

        let joined = (self.joiner)(&r.value, table_value.as_ref());
        ctx.forward(Record::new(Some(key), joined, r.timestamp));
    }
}
/// Processor that joins each incoming stream record against a versioned state
/// store, using the stream record's timestamp for the lookup (`get_as_of`)
/// rather than reading the latest table value.
///
/// Type parameters, as used by the `Processor` impl: `K` is the key shared by
/// the stream and the table, `V` the stream value, `VT` the table value, `VO`
/// the joined output value, and `F` the joiner callable
/// (`Fn(&V, Option<&VT>) -> VO`).
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file;
// presumably the type is not yet constructed outside tests. Confirm and
// remove the attribute once it is wired in.
#[allow(dead_code)]
pub(crate) struct KStreamKTableJoinAsOfProcessor<K, V, VT, VO, F> {
/// Name of the versioned state store the table value is looked up in.
pub table_store: String,
/// Builds the output value from the stream value and the table value
/// (`None` when the lookup found nothing).
pub joiner: F,
/// When `true`, a record whose key has no table value for its timestamp is
/// still joined and forwarded; when `false`, such a record produces no output.
pub emit_on_miss: bool,
/// Zero-sized marker that ties the unused type parameters to the struct.
pub _pd: Marker<(K, V, VT, VO)>,
}
/// Timestamp-aware stream-table join: every stream record is enriched with the
/// version of the table value that the versioned store `table_store` reports
/// for the record's own timestamp.
#[async_trait]
impl<K, V, VT, VO, F> Processor<K, V, K, VO> for KStreamKTableJoinAsOfProcessor<K, V, VT, VO, F>
where
    K: std::any::Any + Send + Sync + Clone,
    V: Send + 'static,
    VT: Send + 'static,
    VO: std::any::Any + Send + Clone,
    F: Fn(&V, Option<&VT>) -> VO + Send + 'static,
{
    /// Joins one stream record and forwards the result downstream.
    ///
    /// The table value comes from `get_as_of(key, record timestamp)` on the
    /// versioned store `table_store`; a store that is not available from `ctx`
    /// is treated exactly like a missing version. Nothing is forwarded on a
    /// miss unless `emit_on_miss` is set, in which case the joiner is called
    /// with `None` as the table value. The forwarded record keeps the stream
    /// record's key and timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the record has no key.
    async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, VO>, r: Record<K, V>) {
        let record_ts = r.timestamp;
        let key = r.key.expect("join requires a non-null key");

        let table_value = if let Some(store) = ctx.get_versioned_store::<K, VT>(&self.table_store)
        {
            let version = store.get_as_of(&key, record_ts).await;
            // Only the payload of the versioned record takes part in the join.
            version.map(|found| found.value)
        } else {
            None
        };

        // A miss only produces output when the caller asked for it.
        if table_value.is_none() && !self.emit_on_miss {
            return;
        }

        let joined = (self.joiner)(&r.value, table_value.as_ref());
        ctx.forward(Record::new(Some(key), joined, record_ts));
    }
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::marker::PhantomData;
use assert2::check;
use super::*;
use crate::processor::api::ProcessorContext;
use crate::processor::erased::{Dispatch, ErasedRecord};
use crate::processor::record::RecordContext;
use crate::processor::serde::{I64Serde, StringSerde};
use crate::store::api::KeyValueStore;
use crate::store::kv::KeyValueBytesStore;
use crate::store::registry::StoreRegistry;
use crate::store::versioned::{VersionedBytesStore, VersionedKeyValueStore};
/// Builds a registry containing a single in-memory key-value store named
/// `"t"` that holds one entry, `"a" -> 10`.
async fn make_stores() -> StoreRegistry {
    let mut table = KeyValueBytesStore::<String, i64>::in_memory(
        "t".into(),
        Box::new(StringSerde),
        Box::new(I64Serde),
        "app-t-changelog".into(),
    );
    table.put(String::from("a"), 10_i64).await;

    let mut registry = StoreRegistry::default();
    registry.insert(Box::new(table));
    registry
}
/// Feeds one record (`key`, `value`, timestamp 0) through `proc` using a
/// hand-assembled `Dispatch` for node 0 with a single child, then returns the
/// value of the first record found in the dispatch buffer afterwards.
///
/// Returns `None` when the buffer is empty after processing. Panics if the
/// buffered value is not an `i64`.
async fn run_one(
    proc: &mut KStreamKTableJoinProcessor<
        String,
        i64,
        i64,
        i64,
        impl Fn(&i64, Option<&i64>) -> i64 + Send + 'static,
    >,
    stores: &mut StoreRegistry,
    key: &str,
    value: i64,
) -> Option<i64> {
    let record_ctx = RecordContext {
        topic: "in".into(),
        partition: 0,
        offset: 0,
        timestamp: 0,
    };
    let global_state = crate::runtime::global::GlobalStateManager::default();
    let child_nodes = [0_usize];
    let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let mut sink_output = Vec::new();
    let mut schedules = Vec::new();

    let mut dispatch = Dispatch {
        buffer: &mut forwarded,
        children: &child_nodes,
        output: &mut sink_output,
        record_ctx: &record_ctx,
        stores,
        globals: &global_state,
        node_idx: 0,
        schedules: &mut schedules,
        sched_stream_time: i64::MIN,
        sched_wall_clock: 0,
    };
    let mut ctx = ProcessorContext::<'_, '_, String, i64>::new(&mut dispatch);

    let input = Record::new(Some(key.to_owned()), value, 0);
    proc.process(&mut ctx, input).await;

    // Whatever the processor forwarded sits in the buffer as a type-erased record.
    let (_, emitted) = forwarded.pop_front()?;
    Some(*emitted.value.downcast::<i64>().unwrap())
}
/// With `emit_on_miss` off, a key present in the table yields the joined
/// value and a key absent from the table yields no output at all.
#[tokio::test]
async fn inner_join_hit_and_miss() {
    let mut registry = make_stores().await;
    let add_table_value = |v: &i64, vt: Option<&i64>| v + vt.copied().unwrap_or(0);
    let mut join = KStreamKTableJoinProcessor {
        table_store: String::from("t"),
        joiner: add_table_value,
        emit_on_miss: false,
        _pd: PhantomData::<fn() -> (String, i64, i64, i64)>,
    };

    // "a" is in the table with value 10, so 1 + 10.
    check!(run_one(&mut join, &mut registry, "a", 1).await == Some(11));
    // "b" is not in the table: nothing is forwarded.
    check!(run_one(&mut join, &mut registry, "b", 2).await == None);
}
/// With `emit_on_miss` on, a key absent from the table is still joined, with
/// the joiner seeing `None` for the table side.
#[tokio::test]
async fn left_join_hit_and_miss() {
    let mut registry = make_stores().await;
    let add_table_value = |v: &i64, vt: Option<&i64>| v + vt.copied().unwrap_or(0);
    let mut join = KStreamKTableJoinProcessor {
        table_store: String::from("t"),
        joiner: add_table_value,
        emit_on_miss: true,
        _pd: PhantomData::<fn() -> (String, i64, i64, i64)>,
    };

    // "a" is in the table with value 10, so 1 + 10.
    check!(run_one(&mut join, &mut registry, "a", 1).await == Some(11));
    // "b" is not in the table: the joiner falls back to 0, so 2 + 0.
    check!(run_one(&mut join, &mut registry, "b", 2).await == Some(2));
}
/// Builds a registry containing a single in-memory versioned store named
/// `"vt"` with two writes for key `"a"`: value 10 put with 100 and value 20
/// put with 200 (the as-of tests treat that last `put` argument as the
/// version timestamp).
async fn make_versioned_stores() -> StoreRegistry {
    let mut versioned = VersionedBytesStore::<String, i64>::in_memory(
        "vt".into(),
        1_000_000,
        Box::new(StringSerde),
        Box::new(I64Serde),
        "app-vt-changelog".into(),
    );
    for (value, ts) in [(10, 100), (20, 200)] {
        versioned.put(String::from("a"), Some(value), ts).await;
    }

    let mut registry = StoreRegistry::default();
    registry.insert(Box::new(versioned));
    registry
}
/// Feeds one record (`key`, `value`, timestamp `ts`) through the as-of join
/// `proc` using a hand-assembled `Dispatch` for node 0 with a single child,
/// then returns the value of the first record found in the dispatch buffer
/// afterwards.
///
/// `ts` is used both as the record timestamp and as the `RecordContext`
/// timestamp. Returns `None` when the buffer is empty after processing.
/// Panics if the buffered value is not an `i64`.
async fn run_one_asof(
    proc: &mut KStreamKTableJoinAsOfProcessor<
        String,
        i64,
        i64,
        i64,
        impl Fn(&i64, Option<&i64>) -> i64 + Send + 'static,
    >,
    stores: &mut StoreRegistry,
    key: &str,
    value: i64,
    ts: i64,
) -> Option<i64> {
    let record_ctx = RecordContext {
        topic: "in".into(),
        partition: 0,
        offset: 0,
        timestamp: ts,
    };
    let global_state = crate::runtime::global::GlobalStateManager::default();
    let child_nodes = [0_usize];
    let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let mut sink_output = Vec::new();
    let mut schedules = Vec::new();

    let mut dispatch = Dispatch {
        buffer: &mut forwarded,
        children: &child_nodes,
        output: &mut sink_output,
        record_ctx: &record_ctx,
        stores,
        globals: &global_state,
        node_idx: 0,
        schedules: &mut schedules,
        sched_stream_time: i64::MIN,
        sched_wall_clock: 0,
    };
    let mut ctx = ProcessorContext::<'_, '_, String, i64>::new(&mut dispatch);

    let input = Record::new(Some(key.to_owned()), value, ts);
    proc.process(&mut ctx, input).await;

    // Whatever the processor forwarded sits in the buffer as a type-erased record.
    let (_, emitted) = forwarded.pop_front()?;
    Some(*emitted.value.downcast::<i64>().unwrap())
}
#[tokio::test]
async fn asof_inner_picks_version_at_record_ts() {
let mut stores = make_versioned_stores().await;
let mut proc = KStreamKTableJoinAsOfProcessor {
table_store: "vt".into(),
joiner: |v: &i64, vt: Option<&i64>| v + vt.copied().unwrap_or(0),
emit_on_miss: false,
_pd: PhantomData::<fn() -> (String, i64, i64, i64)>,
};
check!(run_one_asof(&mut proc, &mut stores, "a", 1, 150).await == Some(11));
check!(run_one_asof(&mut proc, &mut stores, "a", 1, 250).await == Some(21));
check!(run_one_asof(&mut proc, &mut stores, "a", 1, 50).await == None);
}
#[tokio::test]
async fn asof_left_emits_none_on_miss() {
let mut stores = make_versioned_stores().await;
let mut proc = KStreamKTableJoinAsOfProcessor {
table_store: "vt".into(),
joiner: |v: &i64, vt: Option<&i64>| v + vt.copied().unwrap_or(0),
emit_on_miss: true,
_pd: PhantomData::<fn() -> (String, i64, i64, i64)>,
};
check!(run_one_asof(&mut proc, &mut stores, "a", 1, 50).await == Some(1));
}
}