1use std::any::Any;
12
13use async_trait::async_trait;
14
15use crate::processor::api::{Processor, ProcessorContext};
16use crate::processor::record::{Record, RecordContext};
17
/// A record handed to a [`FixedKeyProcessor`]: a key, a value and a timestamp.
///
/// Unlike the general `Record`, the key here is always present (`K`, not
/// `Option<K>`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixedKeyRecord<K, V> {
    /// Key of the record.
    pub key: K,
    /// Value (payload) of the record.
    pub value: V,
    /// Timestamp carried with the record (unit is not visible in this file).
    pub timestamp: i64,
}

impl<K, V> FixedKeyRecord<K, V> {
    /// Consumes this record and returns one with the same key and timestamp
    /// but carrying `value` instead.
    ///
    /// The value type may change (`V` -> `V2`); the previous value is dropped.
    #[must_use]
    pub fn with_value<V2>(self, value: V2) -> FixedKeyRecord<K, V2> {
        // Pull out only what survives; the old value stays behind in `self`
        // and is dropped when this function returns.
        let Self { key, timestamp, .. } = self;
        FixedKeyRecord {
            key,
            value,
            timestamp,
        }
    }
}
41
42pub struct FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut> {
46 inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>,
47}
48
49impl<'a, 'ctx, 'd, K, VOut> FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut>
50where
51 K: Any + Send + Clone,
52 VOut: Any + Send + Clone,
53{
54 pub(crate) fn new(inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>) -> Self {
56 Self { inner }
57 }
58
59 pub fn forward(&mut self, record: FixedKeyRecord<K, VOut>) {
62 self.inner.forward(Record::new(
63 Some(record.key),
64 record.value,
65 record.timestamp,
66 ));
67 }
68
69 pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
73 &mut self,
74 name: &str,
75 ) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
76 self.inner.get_state_store::<K2, V2>(name)
77 }
78
79 #[must_use]
81 pub fn record_context(&self) -> &RecordContext {
82 self.inner.record_context()
83 }
84}
85
86#[async_trait]
90pub trait FixedKeyProcessor<KIn: Send, VIn: Send, VOut: Send>: Send + 'static {
91 async fn process(
92 &mut self,
93 ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
94 record: FixedKeyRecord<KIn, VIn>,
95 );
96}
97
98#[async_trait]
104impl<KIn, VIn, VOut> FixedKeyProcessor<KIn, VIn, VOut>
105 for Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>
106where
107 KIn: Send + 'static,
108 VIn: Send + 'static,
109 VOut: Send + 'static,
110{
111 async fn process(
112 &mut self,
113 ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
114 record: FixedKeyRecord<KIn, VIn>,
115 ) {
116 (**self).process(ctx, record).await;
117 }
118}
119
120pub trait FixedKeyProcessorSupplier<KIn, VIn, VOut>: Send + Sync + 'static {
124 fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>;
125}
126
127impl<F, P, KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> for F
128where
129 F: Fn() -> P + Send + Sync + 'static,
130 KIn: Send,
131 VIn: Send,
132 VOut: Send,
133 P: FixedKeyProcessor<KIn, VIn, VOut>,
134{
135 fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>> {
136 Box::new(self())
137 }
138}
139
140pub(crate) struct FixedKeyAdapter<P> {
144 pub inner: P,
145}
146
147#[async_trait]
148impl<P, KIn, VIn, VOut> Processor<KIn, VIn, KIn, VOut> for FixedKeyAdapter<P>
149where
150 KIn: Any + Send + Clone + 'static,
151 VIn: Send + 'static,
152 VOut: Any + Send + Clone + 'static,
153 P: FixedKeyProcessor<KIn, VIn, VOut>,
154{
155 async fn process(
156 &mut self,
157 ctx: &mut ProcessorContext<'_, '_, KIn, VOut>,
158 record: Record<KIn, VIn>,
159 ) {
160 let key = record.key.expect("process_values requires a non-null key");
161 let fkr = FixedKeyRecord {
162 key,
163 value: record.value,
164 timestamp: record.timestamp,
165 };
166 let mut fk_ctx = FixedKeyProcessorContext::new(ctx);
167 self.inner.process(&mut fk_ctx, fkr).await;
168 }
169}
170
// Unit tests for the fixed-key record, the adapter and the closure supplier.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::{Record, RecordContext};
    use assert2::check;
    use std::collections::VecDeque;

    // `with_value` may change the value type (i32 -> String here) while the
    // key and timestamp are carried over untouched.
    #[test]
    fn with_value_keeps_key_and_timestamp() {
        let r = FixedKeyRecord {
            key: "k".to_string(),
            value: 1i32,
            timestamp: 42,
        };
        let r2 = r.with_value("v".to_string());
        check!(r2.key == "k");
        check!(r2.value == "v");
        check!(r2.timestamp == 42);
    }

    // Test processor: forwards every record with its value upper-cased.
    struct UpperValue;
    // NOTE(review): the macro is defined elsewhere; it presumably expands to
    // `impl FixedKeyProcessor<String, String, String> for UpperValue` — the
    // tests below rely on that impl existing.
    crate::impl_fixed_key_processor! {
        impl UpperValue: (String, String) -> String {
            async fn process(&mut self, ctx, r) {
                let v = r.value.clone();
                ctx.forward(r.with_value(v.to_uppercase()));
            }
        }
    }

    // Drives `UpperValue` through `FixedKeyAdapter` using the general
    // `Processor` entry point and inspects what ends up in the dispatch buffer.
    #[tokio::test]
    async fn adapter_bridges_fixed_key_processor_preserving_key() {
        let mut adapter = FixedKeyAdapter { inner: UpperValue };
        // Hand-assembled dispatch state; forwarded records are expected to
        // land in `buffer`, paired with the index of the child they target.
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        // A single child with index 1, asserted on below.
        let children = [1usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        adapter
            .process(&mut ctx, Record::new(Some("k".into()), "hi".into(), 5))
            .await;

        // Exactly one record was forwarded, addressed to child 1.
        check!(buffer.len() == 1);
        let (child, rec) = buffer.pop_front().unwrap();
        check!(child == 1);
        let key = rec.key.expect("forwarded record must carry the key");
        // Key and value are type-erased in the buffer; downcast to compare.
        check!(*key.downcast::<String>().unwrap() == "k");
        check!(*rec.value.downcast::<String>().unwrap() == "HI");
        check!(rec.timestamp == 5);
    }

    // A plain closure returning a processor acts as a supplier through the
    // blanket impl, and the boxed processor it yields is itself usable as a
    // `FixedKeyProcessor`.
    #[tokio::test]
    async fn supplier_blanket_impl_boxes_processor() {
        let supplier = || UpperValue;
        // Call through the trait explicitly so the blanket impl for closures
        // is what gets exercised.
        let mut boxed = FixedKeyProcessorSupplier::get(&supplier);

        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 7,
        };
        let children = [2usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        // No adapter here: the fixed-key context is built by hand.
        let mut fk_ctx = FixedKeyProcessorContext::new(&mut ctx);

        // Goes through the `Box<dyn FixedKeyProcessor<..>>` impl.
        boxed
            .process(
                &mut fk_ctx,
                FixedKeyRecord {
                    key: "k".to_string(),
                    value: "lo".to_string(),
                    timestamp: 7,
                },
            )
            .await;

        // Only the transformed value is verified; the child index is ignored.
        check!(buffer.len() == 1);
        let (_child, rec) = buffer.pop_front().unwrap();
        check!(*rec.value.downcast::<String>().unwrap() == "LO");
    }
}