use std::any::Any;
use async_trait::async_trait;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::{Record, RecordContext};
/// A key/value/timestamp triple handed to fixed-key processors.
///
/// The value can be swapped out (even for a value of another type) with
/// [`FixedKeyRecord::with_value`], which carries the key and timestamp over
/// unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixedKeyRecord<K, V> {
    /// Key of the record.
    pub key: K,
    /// Payload of the record.
    pub value: V,
    /// Timestamp associated with the record.
    pub timestamp: i64,
}

impl<K, V> FixedKeyRecord<K, V> {
    /// Consumes the record and returns a new one holding `value`.
    ///
    /// The key and timestamp are moved into the result as-is; the previous
    /// value is dropped. The value type may change from `V` to `V2`.
    #[must_use]
    pub fn with_value<V2>(self, value: V2) -> FixedKeyRecord<K, V2> {
        // Pull out the parts that survive; the old value stays behind in `self`.
        let Self { key, timestamp, .. } = self;
        FixedKeyRecord {
            key,
            value,
            timestamp,
        }
    }
}
pub struct FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut> {
inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>,
}
impl<'a, 'ctx, 'd, K, VOut> FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut>
where
K: Any + Send + Clone,
VOut: Any + Send + Clone,
{
pub(crate) fn new(inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>) -> Self {
Self { inner }
}
pub fn forward(&mut self, record: FixedKeyRecord<K, VOut>) {
self.inner.forward(Record::new(
Some(record.key),
record.value,
record.timestamp,
));
}
pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
&mut self,
name: &str,
) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
self.inner.get_state_store::<K2, V2>(name)
}
#[must_use]
pub fn record_context(&self) -> &RecordContext {
self.inner.record_context()
}
}
/// A record processor that works on [`FixedKeyRecord`]s.
///
/// Input records have key type `KIn` and value type `VIn`; the context passed
/// to [`FixedKeyProcessor::process`] forwards records with key type `KIn` and
/// value type `VOut`. Implementors must be `Send + 'static`.
#[async_trait]
pub trait FixedKeyProcessor<KIn: Send, VIn: Send, VOut: Send>: Send + 'static {
/// Handles one `record`, using `ctx` to forward results, reach state
/// stores, or read the record context.
async fn process(
&mut self,
ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
record: FixedKeyRecord<KIn, VIn>,
);
}
/// Lets a boxed `dyn FixedKeyProcessor` be used wherever a
/// [`FixedKeyProcessor`] implementation is expected.
#[async_trait]
impl<KIn, VIn, VOut> FixedKeyProcessor<KIn, VIn, VOut>
    for Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>
where
    KIn: Send + 'static,
    VIn: Send + 'static,
    VOut: Send + 'static,
{
    /// Hands `record` and `ctx` to the processor stored inside the box.
    async fn process(
        &mut self,
        ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
        record: FixedKeyRecord<KIn, VIn>,
    ) {
        // Reborrow the trait object itself (`&mut Box<dyn _>` -> `&mut dyn _`)
        // so the call dispatches to the boxed processor and not back into
        // this impl.
        let boxed_processor = &mut **self;
        boxed_processor.process(ctx, record).await;
    }
}
/// Factory for boxed [`FixedKeyProcessor`]s.
///
/// Suppliers must be `Send + Sync + 'static`.
pub trait FixedKeyProcessorSupplier<KIn, VIn, VOut>: Send + Sync + 'static {
/// Returns a boxed processor.
// NOTE(review): presumably expected to hand out a fresh instance on every
// call — confirm against the callers of this trait.
fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>;
}
/// Any `Send + Sync + 'static` closure of the form `Fn() -> P`, where `P` is a
/// [`FixedKeyProcessor`], can be used as a [`FixedKeyProcessorSupplier`].
impl<F, P, KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> for F
where
    F: Fn() -> P + Send + Sync + 'static,
    P: FixedKeyProcessor<KIn, VIn, VOut>,
    KIn: Send,
    VIn: Send,
    VOut: Send,
{
    /// Invokes the closure and boxes the processor it returns.
    fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>> {
        let processor: P = self();
        Box::new(processor)
    }
}
/// Adapts a [`FixedKeyProcessor`] to the general [`Processor`] trait.
///
/// The adapter implements `Processor<KIn, VIn, KIn, VOut>` and forwards every
/// incoming record to the wrapped processor as a [`FixedKeyRecord`].
// NOTE(review): judging by the panic message below, this is presumably the
// node installed by a `process_values` operator — confirm at the call site.
pub(crate) struct FixedKeyAdapter<P> {
    /// The wrapped fixed-key processor.
    pub inner: P,
}

#[async_trait]
impl<P, KIn, VIn, VOut> Processor<KIn, VIn, KIn, VOut> for FixedKeyAdapter<P>
where
    KIn: Any + Send + Clone + 'static,
    VIn: Send + 'static,
    VOut: Any + Send + Clone + 'static,
    P: FixedKeyProcessor<KIn, VIn, VOut>,
{
    /// Converts `record` into a [`FixedKeyRecord`] and passes it, together
    /// with a [`FixedKeyProcessorContext`] wrapping `ctx`, to the inner
    /// processor.
    ///
    /// # Panics
    ///
    /// Panics if `record.key` is `None`.
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, KIn, VOut>,
        record: Record<KIn, VIn>,
    ) {
        // A `FixedKeyRecord` has no way to express a missing key, so a
        // `None` key aborts here before the inner processor is invoked.
        let fixed_record = FixedKeyRecord {
            key: record.key.expect("process_values requires a non-null key"),
            value: record.value,
            timestamp: record.timestamp,
        };
        let mut fixed_ctx = FixedKeyProcessorContext::new(ctx);
        self.inner.process(&mut fixed_ctx, fixed_record).await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::{Record, RecordContext};
    use assert2::check;
    use std::collections::VecDeque;

    /// `with_value` swaps the value (and its type) but nothing else.
    #[test]
    fn with_value_keeps_key_and_timestamp() {
        let original = FixedKeyRecord {
            key: "k".to_string(),
            value: 1i32,
            timestamp: 42,
        };
        let FixedKeyRecord {
            key,
            value,
            timestamp,
        } = original.with_value("v".to_string());
        check!(key == "k");
        check!(value == "v");
        check!(timestamp == 42);
    }

    /// Test processor: forwards every record with its value upper-cased.
    struct UpperValue;
    crate::impl_fixed_key_processor! {
        impl UpperValue: (String, String) -> String {
            async fn process(&mut self, ctx, r) {
                let shouted = r.value.clone().to_uppercase();
                ctx.forward(r.with_value(shouted));
            }
        }
    }

    /// Driving `UpperValue` through `FixedKeyAdapter` must enqueue exactly one
    /// record for the single child, with the original key and timestamp.
    #[tokio::test]
    async fn adapter_bridges_fixed_key_processor_preserving_key() {
        // Backing storage that the dispatch below borrows.
        let mut queue: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut sink = Vec::new();
        let record_ctx = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        let downstream = [1usize];
        let mut store_registry = crate::store::registry::StoreRegistry::default();
        let global_state = crate::runtime::global::GlobalStateManager::default();
        let mut pending_schedules = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut queue,
            children: &downstream,
            output: &mut sink,
            record_ctx: &record_ctx,
            stores: &mut store_registry,
            globals: &global_state,
            node_idx: 0,
            schedules: &mut pending_schedules,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);

        let mut adapter = FixedKeyAdapter { inner: UpperValue };
        let input = Record::new(Some("k".into()), "hi".into(), 5);
        adapter.process(&mut ctx, input).await;

        check!(queue.len() == 1);
        let (child, forwarded) = queue.pop_front().unwrap();
        check!(child == 1);
        let key = forwarded
            .key
            .expect("forwarded record must carry the key");
        check!(*key.downcast::<String>().unwrap() == "k");
        check!(*forwarded.value.downcast::<String>().unwrap() == "HI");
        check!(forwarded.timestamp == 5);
    }

    /// A plain closure must work as a supplier, and the boxed processor it
    /// returns must behave like the processor the closure produced.
    #[tokio::test]
    async fn supplier_blanket_impl_boxes_processor() {
        let make_processor = || UpperValue;
        let mut boxed = FixedKeyProcessorSupplier::get(&make_processor);

        // Backing storage that the dispatch below borrows.
        let mut queue: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut sink = Vec::new();
        let record_ctx = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 7,
        };
        let downstream = [2usize];
        let mut store_registry = crate::store::registry::StoreRegistry::default();
        let global_state = crate::runtime::global::GlobalStateManager::default();
        let mut pending_schedules = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut queue,
            children: &downstream,
            output: &mut sink,
            record_ctx: &record_ctx,
            stores: &mut store_registry,
            globals: &global_state,
            node_idx: 0,
            schedules: &mut pending_schedules,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        let mut fk_ctx = FixedKeyProcessorContext::new(&mut ctx);

        let input = FixedKeyRecord {
            key: "k".to_string(),
            value: "lo".to_string(),
            timestamp: 7,
        };
        boxed.process(&mut fk_ctx, input).await;

        check!(queue.len() == 1);
        let (_child, forwarded) = queue.pop_front().unwrap();
        check!(*forwarded.value.downcast::<String>().unwrap() == "LO");
    }
}