1use std::any::Any;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8
9use super::erased::{Dispatch, ErasedRecord};
10use super::record::{Record, RecordContext};
11
12#[async_trait]
21pub trait Processor<KIn: Send, VIn: Send, KOut: Send, VOut: Send>: Send + 'static {
22 async fn init(&mut self, _ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {}
23 async fn process(
24 &mut self,
25 ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
26 record: Record<KIn, VIn>,
27 );
28 async fn close(&mut self) {}
29}
30
31#[async_trait]
39impl<KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut>
40 for Box<dyn Processor<KIn, VIn, KOut, VOut>>
41where
42 KIn: Send + 'static,
43 VIn: Send + 'static,
44 KOut: Send + 'static,
45 VOut: Send + 'static,
46{
47 async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {
48 (**self).init(ctx).await;
49 }
50 async fn process(
51 &mut self,
52 ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
53 record: Record<KIn, VIn>,
54 ) {
55 (**self).process(ctx, record).await;
56 }
57 async fn close(&mut self) {
58 (**self).close().await;
59 }
60}
61
62pub trait ProcessorSupplier<KIn, VIn, KOut, VOut>: Send + Sync + 'static {
64 fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>>;
65}
66
67impl<F, P, KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> for F
74where
75 F: Fn() -> P + Send + Sync + 'static,
76 KIn: Send,
77 VIn: Send,
78 KOut: Send,
79 VOut: Send,
80 P: Processor<KIn, VIn, KOut, VOut>,
81{
82 fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>> {
83 Box::new(self())
84 }
85}
86
87pub struct ProcessorContext<'ctx, 'd, KOut, VOut> {
96 dispatch: &'ctx mut Dispatch<'d>,
97 _pd: PhantomData<fn(KOut, VOut)>,
98}
99
100impl<'ctx, 'd, KOut, VOut> ProcessorContext<'ctx, 'd, KOut, VOut>
101where
102 KOut: Any + Send + Clone,
103 VOut: Any + Send + Clone,
104{
105 pub(crate) fn new(dispatch: &'ctx mut Dispatch<'d>) -> Self {
106 Self {
107 dispatch,
108 _pd: PhantomData,
109 }
110 }
111
112 pub fn forward(&mut self, record: Record<KOut, VOut>) {
117 let children = self.dispatch.children;
119 let Some((&last, rest)) = children.split_last() else {
120 return; };
122 for &child in rest {
123 let key: Option<Box<dyn Any + Send>> = record
124 .key
125 .clone()
126 .map(|k| Box::new(k) as Box<dyn Any + Send>);
127 let value: Box<dyn Any + Send> = Box::new(record.value.clone());
128 self.dispatch
129 .buffer
130 .push_back((child, ErasedRecord::new(key, value, record.timestamp)));
131 }
132 let ts = record.timestamp;
133 let key: Option<Box<dyn Any + Send>> =
134 record.key.map(|k| Box::new(k) as Box<dyn Any + Send>);
135 let value: Box<dyn Any + Send> = Box::new(record.value);
136 self.dispatch
137 .buffer
138 .push_back((last, ErasedRecord::new(key, value, ts)));
139 }
140
141 pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
144 &mut self,
145 name: &str,
146 ) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
147 self.dispatch.stores.get_kv::<K2, V2>(name)
148 }
149
150 pub async fn global_get<GK: Send + Sync + 'static, VG: Send + 'static>(
155 &mut self,
156 store: &str,
157 key: &GK,
158 ) -> Option<VG> {
159 self.dispatch.globals.get::<GK, VG>(store, key).await
160 }
161
162 pub fn get_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
165 &mut self,
166 name: &str,
167 ) -> Option<&mut dyn crate::store::window::WindowStore<K2, V2>> {
168 self.dispatch.stores.get_window::<K2, V2>(name)
169 }
170
171 pub fn get_join_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
175 &mut self,
176 name: &str,
177 ) -> Option<&mut dyn crate::store::join_window::JoinWindowStore<K2, V2>> {
178 self.dispatch.stores.get_join_window::<K2, V2>(name)
179 }
180
181 pub fn get_session_store<K2: Send + Sync + 'static, V2: Send + 'static>(
184 &mut self,
185 name: &str,
186 ) -> Option<&mut dyn crate::store::session::SessionStore<K2, V2>> {
187 self.dispatch.stores.get_session::<K2, V2>(name)
188 }
189
190 pub fn get_versioned_store<K2: Send + Sync + 'static, V2: Send + 'static>(
194 &mut self,
195 name: &str,
196 ) -> Option<&mut dyn crate::store::versioned::VersionedKeyValueStore<K2, V2>> {
197 self.dispatch.stores.get_versioned::<K2, V2>(name)
198 }
199
200 pub(crate) fn get_suppress_store<K2: Send + Sync + 'static, V2: Send + 'static>(
206 &mut self,
207 name: &str,
208 ) -> Option<&mut dyn crate::store::suppress_store::SuppressStore<K2, V2>> {
209 self.dispatch.stores.get_suppress::<K2, V2>(name)
210 }
211
212 pub(crate) fn get_join_grace_store<K2: Send + Sync + 'static, V2: Send + 'static>(
220 &mut self,
221 name: &str,
222 ) -> Option<&mut crate::store::join_grace_buffer::JoinGraceBufferStore<K2, V2>> {
223 self.dispatch.stores.get_join_grace::<K2, V2>(name)
224 }
225
226 pub(crate) fn get_fk_subscription_store(
231 &mut self,
232 name: &str,
233 ) -> Option<&mut crate::store::fk_subscription::SubscriptionBytesStore> {
234 self.dispatch.stores.get_fk_subscription(name)
235 }
236
237 #[must_use]
239 pub fn record_context(&self) -> &RecordContext {
240 self.dispatch.record_ctx
241 }
242
243 #[must_use]
247 pub fn store_is_cached(&self, name: &str) -> bool {
248 self.dispatch.stores.kv_is_cached(name)
249 }
250
251 pub fn schedule<P>(
257 &mut self,
258 interval: std::time::Duration,
259 ty: crate::processor::punctuation::PunctuationType,
260 punctuator: P,
261 ) -> crate::processor::punctuation::Cancellable
262 where
263 P: crate::processor::punctuation::Punctuator<KOut, VOut>,
264 {
265 use crate::processor::punctuation::PunctuationType;
266 let interval_ms = i64::try_from(interval.as_millis()).unwrap_or(i64::MAX);
267 assert!(
268 interval_ms >= 1,
269 "schedule interval must be positive (>= 1ms)"
270 );
271 let base = match ty {
272 PunctuationType::StreamTime => self.dispatch.sched_stream_time,
273 PunctuationType::WallClockTime => self.dispatch.sched_wall_clock,
274 };
275 let next_time = base.saturating_add(interval_ms);
276 let cancel = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
277 let erased: Box<dyn crate::processor::punctuation::ErasedPunctuator> =
278 Box::new(crate::processor::punctuation::TypedPunctuator::<
279 KOut,
280 VOut,
281 P,
282 >::new(punctuator));
283 self.dispatch
284 .schedules
285 .push(crate::processor::punctuation::ScheduleEntry {
286 node_idx: self.dispatch.node_idx,
287 interval_ms,
288 ty,
289 next_time,
290 punctuator: erased,
291 cancel: cancel.clone(),
292 });
293 crate::processor::punctuation::Cancellable::new(cancel)
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use crate::processor::erased::{Dispatch, ErasedRecord};
301 use crate::processor::record::{Record, RecordContext};
302 use assert2::check;
303 use std::collections::VecDeque;
304
305 struct Upper;
306 crate::impl_processor! {
307 impl Upper: (String, String) -> (String, String) {
308 async fn process(&mut self, ctx, r) {
309 ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
310 }
311 }
312 }
313
314 struct Noop;
315 #[async_trait]
316 impl Processor<String, String, String, String> for Noop {
317 async fn process(
318 &mut self,
319 _ctx: &mut ProcessorContext<'_, '_, String, String>,
320 _r: Record<String, String>,
321 ) {
322 }
323 }
324
325 #[tokio::test]
326 async fn forward_pushes_erased_record_to_each_child() {
327 let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
328 let mut output = Vec::new();
329 let rc = RecordContext {
330 topic: "t".into(),
331 partition: 0,
332 offset: 0,
333 timestamp: 5,
334 };
335 let children = [3usize, 4usize];
336 let mut stores = crate::store::registry::StoreRegistry::default();
337 let globals = crate::runtime::global::GlobalStateManager::default();
338 let mut scheds = Vec::new();
339 let mut dispatch = Dispatch {
340 buffer: &mut buffer,
341 children: &children,
342 output: &mut output,
343 record_ctx: &rc,
344 stores: &mut stores,
345 globals: &globals,
346 node_idx: 0,
347 schedules: &mut scheds,
348 sched_stream_time: i64::MIN,
349 sched_wall_clock: 0,
350 };
351 let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
352 Upper
353 .process(&mut ctx, Record::new(Some("k".into()), "hi".into(), 5))
354 .await;
355 check!(buffer.len() == 2);
356 let (child, rec) = buffer.pop_front().unwrap();
357 check!(child == 3);
358 check!(*rec.value.downcast::<String>().unwrap() == "HI");
359 }
360
361 #[tokio::test]
362 async fn boxed_dyn_processor_delegates_init_process_close() {
363 let mut boxed: Box<dyn Processor<String, String, String, String>> = Box::new(Upper);
368 let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
369 let mut output = Vec::new();
370 let rc = RecordContext {
371 topic: "t".into(),
372 partition: 0,
373 offset: 0,
374 timestamp: 5,
375 };
376 let children = [1usize];
377 let mut stores = crate::store::registry::StoreRegistry::default();
378 let globals = crate::runtime::global::GlobalStateManager::default();
379 let mut scheds = Vec::new();
380 let mut dispatch = Dispatch {
381 buffer: &mut buffer,
382 children: &children,
383 output: &mut output,
384 record_ctx: &rc,
385 stores: &mut stores,
386 globals: &globals,
387 node_idx: 0,
388 schedules: &mut scheds,
389 sched_stream_time: i64::MIN,
390 sched_wall_clock: 0,
391 };
392 let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
393 boxed.init(&mut ctx).await; boxed
395 .process(&mut ctx, Record::new(None, "hi".into(), 5))
396 .await; boxed.close().await; check!(buffer.len() == 1);
399 let (_child, rec) = buffer.pop_front().unwrap();
400 check!(*rec.value.downcast::<String>().unwrap() == "HI");
401 }
402
403 #[tokio::test]
404 async fn default_init_and_close_are_noops_and_forward_with_no_children_drops() {
405 let mut p = Noop;
406 let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
407 let mut output = Vec::new();
408 let rc = RecordContext {
409 topic: "t".into(),
410 partition: 0,
411 offset: 0,
412 timestamp: 9,
413 };
414 let mut stores = crate::store::registry::StoreRegistry::default();
415 let globals = crate::runtime::global::GlobalStateManager::default();
416 let mut scheds = Vec::new();
417 let mut dispatch = Dispatch {
418 buffer: &mut buffer,
419 children: &[],
420 output: &mut output,
421 record_ctx: &rc,
422 stores: &mut stores,
423 globals: &globals,
424 node_idx: 0,
425 schedules: &mut scheds,
426 sched_stream_time: i64::MIN,
427 sched_wall_clock: 0,
428 };
429 let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
430 p.init(&mut ctx).await; check!(ctx.record_context().timestamp == 9);
432 ctx.forward(Record::new(None, "x".to_string(), 0)); check!(buffer.is_empty());
434 p.close().await; }
436}