use std::marker::PhantomData;
use async_trait::async_trait;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
// Zero-sized marker that mentions `T` without storing or owning a `T`.
// Wrapping `fn() -> T` (rather than `T` itself) keeps the marker `Send + Sync`
// for every `T`, because function pointers always are.
type Marker<T> = PhantomData<fn() -> T>;
/// Processor node that joins every incoming stream record against a global store.
///
/// Type parameters, as used by the `Processor` impl in this file:
/// * `K` / `V`   – key and value of the incoming stream record,
/// * `GK` / `VG` – key and value types used for the global-store lookup,
/// * `VR`        – value type of the forwarded (joined) record,
/// * `KM`        – key-mapper callable, `Fn(&K, &V) -> GK`,
/// * `J`         – joiner callable, `Fn(&V, Option<&VG>) -> VR`.
pub(crate) struct KStreamGlobalTableJoinProcessor<K, V, GK, VG, VR, KM, J> {
    /// Name of the global store that is queried for each record.
    pub store_name: String,
    /// Derives the global-store lookup key from the stream key and value.
    pub key_mapper: KM,
    /// Combines the stream value with the looked-up global value, if any.
    pub joiner: J,
    /// When `true`, a lookup miss still invokes the joiner (with `None`) and
    /// forwards its result; when `false`, a miss drops the record.
    pub emit_on_miss: bool,
    /// Zero-sized marker binding the otherwise unused type parameters.
    pub _pd: Marker<(K, V, GK, VG, VR)>,
}
#[async_trait]
impl<K, V, GK, VG, VR, KM, J> Processor<K, V, K, VR>
    for KStreamGlobalTableJoinProcessor<K, V, GK, VG, VR, KM, J>
where
    K: std::any::Any + Send + Sync + Clone,
    V: Send + 'static,
    GK: Send + Sync + 'static,
    VG: Send + 'static,
    VR: std::any::Any + Send + Clone,
    KM: Fn(&K, &V) -> GK + Send + 'static,
    J: Fn(&V, Option<&VG>) -> VR + Send + 'static,
{
    /// Joins one stream record against the global store named `store_name`.
    ///
    /// The lookup key is computed by `key_mapper` from the record's key and
    /// value. On a hit, or on a miss while `emit_on_miss` is set, the joiner's
    /// result is forwarded under the original stream key and timestamp.
    /// On a miss with `emit_on_miss` unset, nothing is forwarded.
    ///
    /// # Panics
    /// Panics if the incoming record has no key.
    async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, VR>, r: Record<K, V>) {
        let stream_key = r.key.expect("global join requires a non-null stream key");
        let lookup_key = (self.key_mapper)(&stream_key, &r.value);
        let global_value = ctx
            .global_get::<GK, VG>(&self.store_name, &lookup_key)
            .await;

        // Inner-join behaviour: a miss produces no output unless misses are emitted.
        if global_value.is_none() && !self.emit_on_miss {
            return;
        }

        let joined = (self.joiner)(&r.value, global_value.as_ref());
        // The forwarded record keeps the stream key, not the derived lookup key.
        ctx.forward(Record::new(Some(stream_key), joined, r.timestamp));
    }
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::marker::PhantomData;
use assert2::check;
use super::*;
use crate::processor::api::ProcessorContext;
use crate::processor::erased::{Dispatch, ErasedRecord};
use crate::processor::record::RecordContext;
use crate::processor::serde::{Consumed, StringSerde};
use crate::runtime::global::GlobalStateManager;
use crate::store::backend::StoreBackend;
use crate::store::registry::StoreRegistry;
use crate::topology::{NodeHandle, Topology};
/// Builds the global state shared by the tests in this module.
///
/// A topology containing the global store `"g-store"` and a source/sink pair
/// is built under the application id `"app"`. The resulting in-memory global
/// state is then seeded with the single entry `"v" -> "gv"`.
async fn make_globals() -> GlobalStateManager {
    let mut topology = Topology::new();
    topology.add_global_store::<String, String, _, _>(
        "g-store",
        "g-src",
        "g-topic",
        "g-proc",
        Consumed::with(StringSerde, StringSerde),
    );
    let source: NodeHandle<String, String> = topology.add_source("src", ["in"]);
    topology.add_sink("snk", "out", [&source]);
    let built = topology.build("app").unwrap();

    let manager = GlobalStateManager::build(
        built.global_store_factories(),
        built.global_store_topics(),
        &StoreBackend::InMemory,
        "app",
    )
    .await;

    // Seed the one entry the join tests look up.
    manager
        .put("g-store", String::from("v"), String::from("gv"))
        .await;
    manager
}
/// Feeds a single record through `proc` and reports what it forwarded.
///
/// The record carries `key` and `value` at timestamp 0 and is processed
/// through a throwaway dispatch context backed by `globals`. Returns the value
/// of the first record found in the forward buffer, or `None` when the
/// processor forwarded nothing.
///
/// # Panics
/// Panics if the forwarded value is not a `String`.
async fn run_one(
    proc: &mut KStreamGlobalTableJoinProcessor<
        String,
        String,
        String,
        String,
        String,
        impl Fn(&String, &String) -> String + Send + 'static,
        impl Fn(&String, Option<&String>) -> String + Send + 'static,
    >,
    globals: &GlobalStateManager,
    key: &str,
    value: &str,
) -> Option<String> {
    let child_nodes = [0usize];
    let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let mut sink_output = Vec::new();
    let mut local_stores = StoreRegistry::default();
    let mut schedules = Vec::new();
    let record_ctx = RecordContext {
        topic: "in".into(),
        partition: 0,
        offset: 0,
        timestamp: 0,
    };

    let mut dispatch = Dispatch {
        buffer: &mut forwarded,
        children: &child_nodes,
        output: &mut sink_output,
        record_ctx: &record_ctx,
        stores: &mut local_stores,
        globals,
        node_idx: 0,
        schedules: &mut schedules,
        sched_stream_time: i64::MIN,
        sched_wall_clock: 0,
    };
    let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);

    let input = Record::new(Some(key.to_owned()), value.to_owned(), 0);
    proc.process(&mut ctx, input).await;

    // Only the first forwarded record matters to callers; its child index is ignored.
    let (_, first) = forwarded.pop_front()?;
    Some(*first.value.downcast::<String>().unwrap())
}
/// Inner join, lookup hit: the global store is queried with the key derived
/// from the stream value (`"v"`), while the forwarded record keeps the stream
/// key (`"k"`) and the input timestamp.
#[tokio::test]
async fn inner_join_hit_uses_derived_key_and_keeps_stream_key() {
    let globals = make_globals().await;
    let mut local_stores = StoreRegistry::default();
    let mut proc = KStreamGlobalTableJoinProcessor {
        store_name: "g-store".into(),
        // Look up by the stream *value*, ignoring the stream key.
        key_mapper: |_stream_key: &String, stream_value: &String| stream_value.clone(),
        joiner: |left: &String, right: Option<&String>| {
            let right = match right {
                Some(found) => found.as_str(),
                None => "<none>",
            };
            format!("{left}+{right}")
        },
        emit_on_miss: false,
        _pd: PhantomData::<fn() -> (String, String, String, String, String)>,
    };

    let child_nodes = [0usize];
    let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let mut sink_output = Vec::new();
    let mut schedules = Vec::new();
    let record_ctx = RecordContext {
        topic: "in".into(),
        partition: 0,
        offset: 0,
        timestamp: 7,
    };
    let mut dispatch = Dispatch {
        buffer: &mut forwarded,
        children: &child_nodes,
        output: &mut sink_output,
        record_ctx: &record_ctx,
        stores: &mut local_stores,
        globals: &globals,
        node_idx: 0,
        schedules: &mut schedules,
        sched_stream_time: i64::MIN,
        sched_wall_clock: 0,
    };
    let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);

    let input = Record::new(Some(String::from("k")), String::from("v"), 7);
    proc.process(&mut ctx, input).await;

    let (_child, rec) = forwarded.pop_front().expect("inner hit should forward");
    let forwarded_key = rec.key.unwrap().downcast::<String>().unwrap();
    let forwarded_value = rec.value.downcast::<String>().unwrap();
    check!(*forwarded_key == "k");
    check!(*forwarded_value == "v+gv");
    check!(rec.timestamp == 7);
}
/// Inner join, lookup miss: with `emit_on_miss` unset, a record whose derived
/// key is absent from the global store forwards nothing.
#[tokio::test]
async fn inner_join_miss_drops() {
    let globals = make_globals().await;
    let mut proc = KStreamGlobalTableJoinProcessor {
        store_name: "g-store".into(),
        key_mapper: |_stream_key: &String, stream_value: &String| stream_value.clone(),
        joiner: |left: &String, right: Option<&String>| {
            let right = match right {
                Some(found) => found.as_str(),
                None => "<none>",
            };
            format!("{left}+{right}")
        },
        emit_on_miss: false,
        _pd: PhantomData::<fn() -> (String, String, String, String, String)>,
    };

    // "absent" is not a key in the seeded global store.
    let forwarded = run_one(&mut proc, &globals, "k", "absent").await;
    check!(forwarded == None);
}
/// Left join: with `emit_on_miss` set, a hit joins against the stored value
/// and a miss still forwards, with the joiner receiving `None`.
#[tokio::test]
async fn left_join_emits_on_miss_with_none() {
    let globals = make_globals().await;
    let mut proc = KStreamGlobalTableJoinProcessor {
        store_name: "g-store".into(),
        key_mapper: |_stream_key: &String, stream_value: &String| stream_value.clone(),
        joiner: |left: &String, right: Option<&String>| {
            let right = match right {
                Some(found) => found.as_str(),
                None => "<none>",
            };
            format!("{left}+{right}")
        },
        emit_on_miss: true,
        _pd: PhantomData::<fn() -> (String, String, String, String, String)>,
    };

    let on_hit = run_one(&mut proc, &globals, "k", "v").await;
    check!(on_hit.as_deref() == Some("v+gv"));

    let on_miss = run_one(&mut proc, &globals, "k", "absent").await;
    check!(on_miss.as_deref() == Some("absent+<none>"));
}
}