use std::marker::PhantomData;
use async_trait::async_trait;
use crate::dsl::processors::change::Change;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
type Marker<T> = PhantomData<fn() -> T>;
#[allow(dead_code)]
pub(crate) struct KTableSourceProcessor<K, V> {
pub store_name: String,
pub _pd: Marker<(K, V)>,
}
#[async_trait]
impl<K, V> Processor<K, V, K, Change<V>> for KTableSourceProcessor<K, V>
where
K: std::any::Any + Send + Sync + Clone,
V: std::any::Any + Send + Clone,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, Change<V>>, r: Record<K, V>) {
let key = r.key.expect("KTable source requires a non-null key");
let old = {
let store = ctx
.get_state_store::<K, V>(&self.store_name)
.expect("KTable source store not found");
let old = store.get(&key).await;
store.put(key.clone(), r.value.clone()).await;
old
};
ctx.forward(Record::new(
Some(key),
Change::update(old, r.value),
r.timestamp,
));
}
}
#[allow(dead_code)]
pub(crate) struct KStreamToTableProcessor<K, V> {
pub store_name: String,
pub _pd: Marker<(K, V)>,
}
#[async_trait]
impl<K, V> Processor<K, V, K, Change<V>> for KStreamToTableProcessor<K, V>
where
K: std::any::Any + Send + Sync + Clone,
V: std::any::Any + Send + Clone,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, Change<V>>, r: Record<K, V>) {
let key = r.key.expect("to_table requires a non-null key");
let old = {
let store = ctx
.get_state_store::<K, V>(&self.store_name)
.expect("to_table store not found");
let old = store.get(&key).await;
store.put(key.clone(), r.value.clone()).await;
old
};
ctx.forward(Record::new(
Some(key),
Change::update(old, r.value),
r.timestamp,
));
}
}
#[allow(dead_code)]
pub(crate) struct KTableToStreamProcessor<K, V> {
pub _pd: Marker<(K, V)>,
}
#[async_trait]
impl<K, V> Processor<K, Change<V>, K, V> for KTableToStreamProcessor<K, V>
where
K: std::any::Any + Send + Sync + Clone,
V: std::any::Any + Send + Clone,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V>, r: Record<K, Change<V>>) {
if let Some(new) = r.value.new {
ctx.forward(Record::new(r.key, new, r.timestamp));
}
}
}
#[allow(dead_code)]
pub(crate) struct KTableMapValuesProcessor<K, V, V2, F> {
pub f: F,
pub store_name: String,
pub _pd: Marker<(K, V, V2)>,
}
#[async_trait]
impl<K, V, V2, F> Processor<K, Change<V>, K, Change<V2>> for KTableMapValuesProcessor<K, V, V2, F>
where
K: std::any::Any + Send + Sync + Clone,
V: Send + 'static,
V2: std::any::Any + Send + Clone,
F: Fn(&V) -> V2 + Send + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, Change<V2>>,
r: Record<K, Change<V>>,
) {
let key = r.key.expect("KTable map_values requires a non-null key");
let mapped = r.value.map(|v| (self.f)(v));
{
let store = ctx
.get_state_store::<K, V2>(&self.store_name)
.expect("KTable map_values store not found");
match &mapped.new {
Some(nv) => {
store.put(key.clone(), nv.clone()).await;
}
None => {
store.delete(&key).await;
}
}
}
ctx.forward(Record::new(Some(key), mapped, r.timestamp));
}
}
#[allow(dead_code)]
pub(crate) struct KTableMapValuesViewProcessor<K, V, V2, F> {
pub f: F,
pub _pd: Marker<(K, V, V2)>,
}
#[async_trait]
impl<K, V, V2, F> Processor<K, Change<V>, K, Change<V2>>
for KTableMapValuesViewProcessor<K, V, V2, F>
where
K: std::any::Any + Send + Sync + Clone,
V: Send + 'static,
V2: std::any::Any + Send + Clone,
F: Fn(&V) -> V2 + Send + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, Change<V2>>,
r: Record<K, Change<V>>,
) {
ctx.forward(Record::new(
r.key,
r.value.map(|v| (self.f)(v)),
r.timestamp,
));
}
}
#[allow(dead_code)]
pub(crate) struct KTableFilterProcessor<K, V, P> {
pub predicate: P,
pub store_name: String,
pub _pd: Marker<(K, V)>,
}
#[async_trait]
impl<K, V, P> Processor<K, Change<V>, K, Change<V>> for KTableFilterProcessor<K, V, P>
where
K: std::any::Any + Send + Sync + Clone,
V: std::any::Any + Send + Clone,
P: Fn(&K, &V) -> bool + Send + 'static,
{
async fn process(
&mut self,
ctx: &mut ProcessorContext<'_, '_, K, Change<V>>,
r: Record<K, Change<V>>,
) {
let key = r.key.expect("KTable filter requires a non-null key");
let pred = &self.predicate;
let old_p = r.value.old.filter(|v| pred(&key, v));
let new_p = r.value.new.filter(|v| pred(&key, v));
{
let store = ctx
.get_state_store::<K, V>(&self.store_name)
.expect("KTable filter store not found");
match &new_p {
Some(nv) => {
store.put(key.clone(), nv.clone()).await;
}
None => {
store.delete(&key).await;
}
}
}
if new_p.is_some() || old_p.is_some() {
ctx.forward(Record::new(
Some(key),
Change {
old: old_p,
new: new_p,
},
r.timestamp,
));
}
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use assert2::check;
use super::*;
use crate::processor::api::ProcessorContext;
use crate::processor::erased::{Dispatch, ErasedRecord};
use crate::processor::record::RecordContext;
use crate::processor::serde::{I64Serde, StringSerde};
use crate::store::kv::KeyValueBytesStore;
use crate::store::registry::StoreRegistry;
fn make_stores() -> StoreRegistry {
let mut stores = StoreRegistry::default();
stores.insert(Box::new(KeyValueBytesStore::<String, i64>::in_memory(
"tbl".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"tbl-changelog".into(),
)));
stores
}
fn rc() -> RecordContext {
RecordContext {
topic: "in".into(),
partition: 0,
offset: 0,
timestamp: 0,
}
}
#[tokio::test]
async fn ktable_source_materializes_and_forwards() {
let mut stores = make_stores();
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = rc();
let mut proc = KTableSourceProcessor::<String, i64> {
store_name: "tbl".into(),
_pd: PhantomData,
};
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<i64>>::new(&mut dispatch);
proc.process(&mut ctx, Record::new(Some("k".into()), 42i64, 1))
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<i64>>().unwrap();
check!(change.old.is_none());
check!(change.new == Some(42i64));
check!(
stores
.get_kv::<String, i64>("tbl")
.unwrap()
.get(&"k".to_string())
.await
== Some(42)
);
}
#[tokio::test]
async fn ktable_to_stream_extracts_new_and_drops_tombstones() {
let mut stores = make_stores();
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = rc();
let mut proc = KTableToStreamProcessor::<String, i64> { _pd: PhantomData };
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, i64>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("k".into()), Change::update(Some(1), 7i64), 5),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
check!(*rec.value.downcast::<i64>().unwrap() == 7i64);
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, i64>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("k".into()), Change::tombstone(Some(7i64)), 6),
)
.await;
}
check!(buffer.is_empty(), "tombstone must not reach the KStream");
}
#[tokio::test]
async fn ktable_map_values_rewrites_and_materializes() {
let mut stores = StoreRegistry::default();
stores.insert(Box::new(KeyValueBytesStore::<String, String>::in_memory(
"mv".into(),
Box::new(StringSerde),
Box::new(StringSerde),
"mv-changelog".into(),
)));
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = rc();
let mut proc = KTableMapValuesProcessor::<String, i64, String, _> {
f: |v: &i64| v.to_string(),
store_name: "mv".into(),
_pd: PhantomData,
};
let mut stores2 = StoreRegistry::default();
stores2.insert(Box::new(KeyValueBytesStore::<String, String>::in_memory(
"mv".into(),
Box::new(StringSerde),
Box::new(StringSerde),
"mv-changelog".into(),
)));
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores2,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<String>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("k".into()), Change::update(Some(8i64), 9i64), 0),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<String>>().unwrap();
check!(change.old == Some("8".to_string()));
check!(change.new == Some("9".to_string()));
check!(
stores2
.get_kv::<String, String>("mv")
.unwrap()
.get(&"k".to_string())
.await
== Some("9".to_string())
);
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores2,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<String>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("k".into()), Change::tombstone(Some(9i64)), 1),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<String>>().unwrap();
check!(change.new.is_none());
check!(
stores2
.get_kv::<String, String>("mv")
.unwrap()
.get(&"k".to_string())
.await
.is_none()
);
}
#[tokio::test]
async fn ktable_map_values_view_rewrites_without_a_store() {
let mut stores = StoreRegistry::default();
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = rc();
let mut proc = KTableMapValuesViewProcessor::<String, i64, String, _> {
f: |v: &i64| v.to_string(),
_pd: PhantomData,
};
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<String>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("k".into()), Change::update(Some(8i64), 9i64), 0),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<String>>().unwrap();
check!(change.old == Some("8".to_string()));
check!(change.new == Some("9".to_string()));
check!(stores.names().is_empty());
}
#[tokio::test]
#[allow(clippy::too_many_lines)]
async fn ktable_filter_materializes_matches_and_emits_tombstones() {
let mut stores = make_stores();
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = rc();
stores
.get_kv::<String, i64>("tbl")
.unwrap()
.put("b".into(), 99)
.await;
let mut proc = KTableFilterProcessor::<String, i64, _> {
predicate: |_k: &String, v: &i64| *v > 10,
store_name: "tbl".into(),
_pd: PhantomData,
};
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<i64>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("a".into()), Change::update(None, 42i64), 1),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<i64>>().unwrap();
check!(change.old.is_none());
check!(change.new == Some(42i64));
check!(
stores
.get_kv::<String, i64>("tbl")
.unwrap()
.get(&"a".to_string())
.await
== Some(42)
);
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<i64>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("b".into()), Change::update(Some(99i64), 5i64), 2),
)
.await;
}
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<i64>>().unwrap();
check!(change.old == Some(99i64), "old side survived the predicate");
check!(change.new.is_none(), "new side filtered out → tombstone");
check!(
stores
.get_kv::<String, i64>("tbl")
.unwrap()
.get(&"b".to_string())
.await
.is_none()
);
{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut dispatch = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx = ProcessorContext::<'_, '_, String, Change<i64>>::new(&mut dispatch);
proc.process(
&mut ctx,
Record::new(Some("c".into()), Change::update(Some(3i64), 4i64), 3),
)
.await;
}
check!(
buffer.is_empty(),
"never-matching row must not be forwarded"
);
}
}