use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bytes::Bytes;
use crate::dsl::processors::outer_join_store::{
TimeTracker, outer_key, outer_key_key_bytes, outer_key_side_left, outer_key_ts,
outer_value_decode, outer_value_left, outer_value_right,
};
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
use crate::processor::serde::Serde;
/// Zero-sized marker that mentions a type without owning a value of it.
///
/// Using `fn() -> Inner` (rather than `Inner` directly) keeps the marker
/// covariant in `Inner` while making it `Send + Sync` regardless of `Inner`,
/// so it never restricts the auto traits of the struct embedding it.
type Marker<Inner> = PhantomData<fn() -> Inner>;
/// Processor for one side ("this" side) of a windowed stream–stream join.
///
/// Each instance handles the records of one input. The opposite input is
/// observed only through its join-window store (`other_store`).
///
/// When `outer_store` is `None`, the processor only emits matched pairs
/// (inner-join behaviour). When it is `Some`, `tracker`, `key_serde` and
/// `value_serde` must also be `Some`; the processor panics otherwise.
pub(crate) struct KStreamKStreamJoinProcessor<K, VThis, VOther, VO, F> {
    /// Name of the join-window store every incoming record of this side is written to.
    pub own_store: String,
    /// Name of the opposite side's join-window store that is probed for matches.
    pub other_store: String,
    /// Lower fetch bound: other-side records with timestamps down to
    /// `t - fetch_before` match a record at `t`.
    pub fetch_before: i64,
    /// Upper fetch bound: other-side records with timestamps up to
    /// `t + fetch_after` match a record at `t`.
    ///
    /// Also decides whether an unmatched record is emitted immediately
    /// (`t + fetch_after` already before stream time) or buffered.
    pub fetch_after: i64,
    /// Builds the output from this side's value and the matched other-side
    /// value, or `None` for an unmatched (left/outer) result.
    pub joiner: F,
    /// `true` if this processor handles the left input of the join.
    ///
    /// Used to tag outer-store entries and to pick the closing look-back.
    pub side_left: bool,
    /// Whether records of this side that find no partner are emitted (or
    /// buffered until their window closes) with a `None` other value.
    pub emit_unmatched: bool,
    /// Name of a `Bytes -> Bytes` store holding not-yet-matched records.
    ///
    /// `None` disables all left/outer handling.
    pub outer_store: Option<String>,
    /// Stream-time tracker; required when `outer_store` is set.
    ///
    /// The `Arc<Mutex<_>>` wrapping suggests it is shared with the opposite
    /// side's processor — confirm at the construction site.
    pub tracker: Option<Arc<Mutex<TimeTracker>>>,
    /// Serde used to encode keys into the outer store; required when `outer_store` is set.
    pub key_serde: Option<Box<dyn Serde<K>>>,
    /// Serde used to encode this side's values into the outer store;
    /// required when `outer_store` is set.
    pub value_serde: Option<Box<dyn Serde<VThis>>>,
    /// Join-window "before" size.
    ///
    /// Used as the closing look-back for buffered records when this is the
    /// right side (`side_left == false`).
    pub before_ms: i64,
    /// Join-window "after" size.
    ///
    /// Used as the closing look-back for buffered records when this is the
    /// left side (`side_left == true`).
    pub after_ms: i64,
    /// Extra time, on top of the look-back, before a buffered record counts as closed.
    pub grace_ms: i64,
    /// Ties the otherwise-unused type parameters to the struct.
    pub _pd: Marker<(K, VThis, VOther, VO)>,
}
#[async_trait]
impl<K, VThis, VOther, VO, F> Processor<K, VThis, K, VO>
    for KStreamKStreamJoinProcessor<K, VThis, VOther, VO, F>
where
    K: std::any::Any + Send + Sync + Clone,
    VThis: std::any::Any + Send + Sync + Clone,
    VOther: std::any::Any + Send + Sync + Clone,
    VO: std::any::Any + Send + Clone,
    F: Fn(&VThis, Option<&VOther>) -> VO + Send + 'static,
{
    /// Joins `r` against the other side's window and maintains the
    /// left/outer-join buffer.
    ///
    /// Steps, in order:
    /// 1. `r` is written to `own_store` so later other-side records can join it.
    /// 2. Other-side records for the same key with timestamps in
    ///    `[t - fetch_before, t + fetch_after]` are fetched. One joined record
    ///    is forwarded per match, stamped with the later of the two timestamps.
    /// 3. Only when `outer_store` is set:
    ///    - the tracker is advanced with `t` and its stream time is read once;
    ///    - buffered other-side entries at each matched timestamp are deleted;
    ///    - if `emit_unmatched` is set and nothing matched, `r` is either
    ///      forwarded immediately with a `None` other value (when
    ///      `t + fetch_after` is already before stream time) or buffered;
    ///    - buffered entries of this side whose
    ///      `timestamp + look-back + grace_ms` is before stream time are
    ///      deleted and forwarded with a `None` other value.
    ///
    /// Window arithmetic saturates at the `i64` bounds. Very large window or
    /// grace sizes therefore cannot overflow, which would otherwise wrap
    /// negative and flush buffered records prematurely.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - the record key is `None`;
    /// - a configured store is missing;
    /// - `outer_store` is set without `tracker`, `key_serde` or `value_serde`;
    /// - the tracker mutex is poisoned;
    /// - a buffered outer-store entry fails to deserialize.
    // One sequential pipeline sharing several locals; splitting it would mostly
    // shuffle `key`, `os`, the serdes and the stream time between helpers.
    #[allow(clippy::too_many_lines)]
    async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, VO>, r: Record<K, VThis>) {
        let key = r.key.expect("stream-stream join requires a non-null key");
        let t = r.timestamp;

        // 1. Remember this record so later records of the other side can join it.
        {
            let own = ctx
                .get_join_window_store::<K, VThis>(&self.own_store)
                .expect("own join store not found");
            own.put(key.clone(), t, r.value.clone()).await;
        }

        // 2. Probe the other side's window and emit one result per match.
        let matches: Vec<(i64, VOther)> = {
            let other = ctx
                .get_join_window_store::<K, VOther>(&self.other_store)
                .expect("other join store not found");
            other
                .fetch(
                    &key,
                    t.saturating_sub(self.fetch_before),
                    t.saturating_add(self.fetch_after),
                )
                .await
        };
        for (t_other, v_other) in &matches {
            let out = (self.joiner)(&r.value, Some(v_other));
            ctx.forward(Record::new(Some(key.clone()), out, t.max(*t_other)));
        }

        // 3. Left/outer handling; without an outer store this is an inner join.
        let Some(os) = self.outer_store.clone() else {
            return;
        };
        // Advance and read stream time under a single short lock. The guard is
        // dropped before any `.await` below.
        let st = {
            let mut clock = self
                .tracker
                .as_ref()
                .expect("left/outer requires a time tracker")
                .lock()
                .expect("tracker lock");
            clock.advance(t);
            clock.stream_time
        };
        let key_serde = self
            .key_serde
            .as_ref()
            .expect("left/outer requires a key serde");
        let value_serde = self
            .value_serde
            .as_ref()
            .expect("left/outer requires a value serde");
        let kb = key_serde.serialize(&os, &key);

        // 3a. Other-side records that just found a partner are no longer
        //     unmatched: drop their buffered entries.
        if !matches.is_empty() {
            let osr = ctx
                .get_state_store::<Bytes, Bytes>(&os)
                .expect("outer join store not found");
            for (t_other, _) in &matches {
                osr.delete(&outer_key(*t_other, !self.side_left, &kb)).await;
            }
        }

        // 3b. This record found no partner: emit now if its window already
        //     ended before stream time, otherwise buffer it.
        if self.emit_unmatched && matches.is_empty() {
            if t.saturating_add(self.fetch_after) < st {
                let out = (self.joiner)(&r.value, None);
                ctx.forward(Record::new(Some(key.clone()), out, t));
            } else {
                let raw = value_serde.serialize(&os, &r.value);
                let tagged = if self.side_left {
                    outer_value_left(&raw)
                } else {
                    outer_value_right(&raw)
                };
                let osr = ctx
                    .get_state_store::<Bytes, Bytes>(&os)
                    .expect("outer join store");
                osr.put(outer_key(t, self.side_left, &kb), tagged).await;
            }
        }

        // 3c. Flush buffered records of this side whose window + grace closed.
        //     The left side looks back by `after_ms`, the right side by `before_ms`.
        let lookback = if self.side_left {
            self.after_ms
        } else {
            self.before_ms
        };
        // NOTE(review): assumes outer keys start with the entry timestamp in
        // big-endian form, so this range covers timestamps in [0, st] —
        // confirm against `outer_key`.
        let zero = Bytes::copy_from_slice(&0i64.to_be_bytes());
        let hi = Bytes::copy_from_slice(&st.saturating_add(1).to_be_bytes());
        let closed: Vec<(i64, Bytes, VThis, K)> = {
            let osr = ctx
                .get_state_store::<Bytes, Bytes>(&os)
                .expect("outer join store");
            osr.range(&zero, &hi)
                .await
                .into_iter()
                .filter(|(k, _)| outer_key_side_left(k) == self.side_left)
                .filter(|(k, _)| {
                    outer_key_ts(k)
                        .saturating_add(lookback)
                        .saturating_add(self.grace_ms)
                        < st
                })
                .map(|(k, v)| {
                    let (_is_left, raw) = outer_value_decode(&v);
                    let val = value_serde
                        .deserialize(&os, raw)
                        .expect("outer value deserialize");
                    let ekey = key_serde
                        .deserialize(&os, outer_key_key_bytes(&k))
                        .expect("outer key deserialize");
                    (outer_key_ts(&k), k, val, ekey)
                })
                .collect()
        };
        for (ets, ck, val, ekey) in closed {
            {
                let osr = ctx
                    .get_state_store::<Bytes, Bytes>(&os)
                    .expect("outer join store");
                osr.delete(&ck).await;
            }
            ctx.forward(Record::new(Some(ekey), (self.joiner)(&val, None), ets));
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::marker::PhantomData;
    use std::sync::{Arc, Mutex};

    use bytes::Bytes;

    use super::*;
    use crate::processor::api::ProcessorContext;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::{Record, RecordContext};
    use crate::processor::serde::{BytesSerde, StringSerde};
    use crate::store::join_window::JoinWindowBytesStore;
    use crate::store::kv::KeyValueBytesStore;
    use crate::store::registry::StoreRegistry;

    /// Registry with the two in-memory join-window stores "this" and "other".
    fn make_stores() -> StoreRegistry {
        let mut stores = StoreRegistry::default();
        stores.insert(Box::new(JoinWindowBytesStore::<String, String>::in_memory(
            "this".into(),
            Box::new(StringSerde),
            Box::new(StringSerde),
            "app-this-changelog".into(),
        )));
        stores.insert(Box::new(JoinWindowBytesStore::<String, String>::in_memory(
            "other".into(),
            Box::new(StringSerde),
            Box::new(StringSerde),
            "app-other-changelog".into(),
        )));
        stores
    }

    /// Inner-join processor on the left side with a ±10 fetch window that
    /// concatenates the two values.
    fn make_proc() -> KStreamKStreamJoinProcessor<
        String,
        String,
        String,
        String,
        impl Fn(&String, Option<&String>) -> String,
    > {
        KStreamKStreamJoinProcessor {
            own_store: "this".into(),
            other_store: "other".into(),
            fetch_before: 10,
            fetch_after: 10,
            joiner: |a: &String, b: Option<&String>| {
                format!("{a}{}", b.cloned().unwrap_or_default())
            },
            side_left: true,
            emit_unmatched: false,
            outer_store: None,
            tracker: None,
            key_serde: None,
            value_serde: None,
            before_ms: 0,
            after_ms: 0,
            grace_ms: 0,
            _pd: PhantomData::<fn() -> (String, String, String, String)>,
        }
    }

    /// Feeds `record` through `proc` once, collecting forwarded records in `buffer`.
    ///
    /// Builds the single-child `Dispatch` wiring shared by every test, so each
    /// test only describes its stores, processor, inputs and expectations.
    async fn run_once<F>(
        proc: &mut KStreamKStreamJoinProcessor<String, String, String, String, F>,
        stores: &mut StoreRegistry,
        buffer: &mut VecDeque<(usize, ErasedRecord)>,
        record: Record<String, String>,
    ) where
        F: Fn(&String, Option<&String>) -> String + Send + 'static,
    {
        let children = [0usize];
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "left".into(),
            partition: 0,
            offset: 0,
            timestamp: 0,
        };
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut d = Dispatch {
            buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut d);
        proc.process(&mut ctx, record).await;
    }

    #[tokio::test]
    async fn this_side_joins_records_in_window() {
        let mut stores = make_stores();
        {
            let s = stores.get_join_window::<String, String>("other").unwrap();
            s.put("k".into(), 3, "b1".into()).await;
            s.put("k".into(), 50, "b2".into()).await;
        }
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut proc = make_proc();
        run_once(
            &mut proc,
            &mut stores,
            &mut buffer,
            Record::new(Some("k".into()), "a".into(), 5),
        )
        .await;
        assert_eq!(buffer.len(), 1);
        let (_, rec) = buffer.pop_front().unwrap();
        assert_eq!(*rec.value.downcast::<String>().unwrap(), "ab1");
        assert_eq!(rec.timestamp, 5);
    }

    #[tokio::test]
    async fn duplicates_emit_one_per_match() {
        let mut stores = make_stores();
        {
            let s = stores.get_join_window::<String, String>("other").unwrap();
            s.put("k".into(), 4, "x".into()).await;
            s.put("k".into(), 4, "y".into()).await;
        }
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut proc = make_proc();
        run_once(
            &mut proc,
            &mut stores,
            &mut buffer,
            Record::new(Some("k".into()), "a".into(), 5),
        )
        .await;
        assert_eq!(buffer.len(), 2);
        let (_, rec1) = buffer.pop_front().unwrap();
        assert_eq!(*rec1.value.downcast::<String>().unwrap(), "ax");
        assert_eq!(rec1.timestamp, 5);
        let (_, rec2) = buffer.pop_front().unwrap();
        assert_eq!(*rec2.value.downcast::<String>().unwrap(), "ay");
        assert_eq!(rec2.timestamp, 5);
    }

    #[tokio::test]
    async fn inner_join_without_match_emits_nothing() {
        let mut stores = make_stores();
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut proc = make_proc();
        run_once(
            &mut proc,
            &mut stores,
            &mut buffer,
            Record::new(Some("k".into()), "a".into(), 5),
        )
        .await;
        assert!(buffer.is_empty(), "expected no output, got {}", buffer.len());
    }

    #[tokio::test]
    async fn left_buffers_then_emits_on_close() {
        let mut stores = make_stores();
        stores.insert(Box::new(KeyValueBytesStore::<Bytes, Bytes>::in_memory(
            "outer".into(),
            Box::new(BytesSerde),
            Box::new(BytesSerde),
            "app-outer-changelog".into(),
        )));
        let tracker = Arc::new(Mutex::new(TimeTracker::default()));
        let mut proc = KStreamKStreamJoinProcessor {
            own_store: "this".into(),
            other_store: "other".into(),
            fetch_before: 10,
            fetch_after: 10,
            joiner: |a: &String, b: Option<&String>| {
                format!("{a}{}", b.cloned().unwrap_or_default())
            },
            side_left: true,
            emit_unmatched: true,
            outer_store: Some("outer".into()),
            tracker: Some(Arc::clone(&tracker)),
            key_serde: Some(Box::new(StringSerde)),
            value_serde: Some(Box::new(StringSerde)),
            before_ms: 10,
            after_ms: 10,
            grace_ms: 0,
            _pd: PhantomData::<fn() -> (String, String, String, String)>,
        };
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();

        // No partner for t=5 and its window is still open: it must be buffered.
        run_once(
            &mut proc,
            &mut stores,
            &mut buffer,
            Record::new(Some("k".into()), "a".into(), 5),
        )
        .await;
        assert!(buffer.is_empty(), "expected buffered, got {}", buffer.len());

        // Stream time jumps to 100, closing the t=5 record's window.
        run_once(
            &mut proc,
            &mut stores,
            &mut buffer,
            Record::new(Some("k".into()), "z".into(), 100),
        )
        .await;
        assert_eq!(buffer.len(), 1, "expected 1 close-emit");
        let (_, rec) = buffer.pop_front().unwrap();
        assert_eq!(*rec.value.downcast::<String>().unwrap(), "a");
        assert_eq!(rec.timestamp, 5);
    }
}