Skip to main content

crabka_client_streams/processor/
api.rs

1//! The typed Processor API: `Processor`, `ProcessorSupplier`, and the
2//! `ProcessorContext` users call `forward` on.
3
4use std::any::Any;
5use std::marker::PhantomData;
6
7use async_trait::async_trait;
8
9use super::erased::{Dispatch, ErasedRecord};
10use super::record::{Record, RecordContext};
11
/// A stateless record processor. One instance is created per task via
/// [`ProcessorSupplier::get`]. Mirrors `org.apache.kafka.streams.processor.api.Processor`.
///
/// ## Type parameters
///
/// `KIn`/`VIn` are the key and value types of the [`Record`] passed to
/// `process`; `KOut`/`VOut` are the key and value types of the
/// [`ProcessorContext`] handed to `init` and `process`, i.e. of the records the
/// processor may forward downstream.
///
/// ## Lifecycle
///
/// The runtime invokes `init` once before the first record and `close` once at
/// task shutdown. [`TopologyTestDriver`](crate::TopologyTestDriver) invokes
/// `init` when it instantiates a topology for tests.
#[async_trait]
pub trait Processor<KIn: Send, VIn: Send, KOut: Send, VOut: Send>: Send + 'static {
    /// Optional setup hook. The default implementation does nothing.
    async fn init(&mut self, _ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {}
    /// Handle one input `record`. This is the only method an implementor must
    /// provide; output records are emitted through `ctx`.
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
        record: Record<KIn, VIn>,
    );
    /// Optional teardown hook. The default implementation does nothing.
    async fn close(&mut self) {}
}
30
31/// A boxed processor is itself a [`Processor`], delegating to the inner value.
32///
33/// This is what lets a [`ProcessorSupplier`] closure return `Box<dyn
34/// Processor<…>>` when the concrete type is chosen at runtime: the boxed value
35/// still satisfies the supplier blanket impl (which only requires the closure's
36/// return type to be *some* `Processor`). For the common case, return the
37/// concrete processor directly (`|| MyProc`) and skip the box entirely.
38#[async_trait]
39impl<KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut>
40    for Box<dyn Processor<KIn, VIn, KOut, VOut>>
41where
42    KIn: Send + 'static,
43    VIn: Send + 'static,
44    KOut: Send + 'static,
45    VOut: Send + 'static,
46{
47    async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, KOut, VOut>) {
48        (**self).init(ctx).await;
49    }
50    async fn process(
51        &mut self,
52        ctx: &mut ProcessorContext<'_, '_, KOut, VOut>,
53        record: Record<KIn, VIn>,
54    ) {
55        (**self).process(ctx, record).await;
56    }
57    async fn close(&mut self) {
58        (**self).close().await;
59    }
60}
61
/// Factory for [`Processor`] instances (one per task → per-task isolation).
///
/// Any `Fn() -> P` closure that is `Send + Sync + 'static` and whose return
/// type `P` implements [`Processor`] is a supplier through the blanket impl in
/// this module, so most callers never implement this trait by hand.
pub trait ProcessorSupplier<KIn, VIn, KOut, VOut>: Send + Sync + 'static {
    /// Produce a processor as a boxed trait object. Takes `&self`, so one
    /// supplier can be asked for any number of processors.
    fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>>;
}
66
67// Blanket impl so a closure `|| MyProc` is a supplier. The closure returns a
68// *concrete* `P: Processor`, which we box. Because `P` is concrete, the four KV
69// type parameters are inferred from `P`'s single `Processor` impl — callers
70// never annotate them. A closure returning `Box<dyn Processor<…>>` also works
71// (the boxed value is itself a `Processor`, see the impl above), covering the
72// rarer case of picking the concrete processor type at runtime.
73impl<F, P, KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> for F
74where
75    F: Fn() -> P + Send + Sync + 'static,
76    KIn: Send,
77    VIn: Send,
78    KOut: Send,
79    VOut: Send,
80    P: Processor<KIn, VIn, KOut, VOut>,
81{
82    fn get(&self) -> Box<dyn Processor<KIn, VIn, KOut, VOut>> {
83        Box::new(self())
84    }
85}
86
/// Handed to [`Processor::process`]. `forward` boxes the record and queues it
/// for each child node (the driver drains the queue).
///
/// `KOut`/`VOut` are the key and value types of the records `forward` accepts.
///
/// Two lifetimes: `'ctx` is the borrow of the `Dispatch` reference itself;
/// `'d` is the lifetime of the data inside `Dispatch` (buffers, slices, etc.).
/// Keeping them separate avoids lifetime-invariance issues when constructing a
/// `ProcessorContext` from a `&mut Dispatch<'d>` with an independently-scoped
/// outer borrow `'ctx`.
pub struct ProcessorContext<'ctx, 'd, KOut, VOut> {
    /// Exclusive borrow of the untyped dispatch state that every method on
    /// this context reads from or writes to.
    dispatch: &'ctx mut Dispatch<'d>,
    /// Zero-sized marker that ties the context to its output key/value types.
    /// Spelled as a function-pointer type, so the context is not treated as
    /// owning any `KOut` or `VOut` value.
    _pd: PhantomData<fn(KOut, VOut)>,
}
99
100impl<'ctx, 'd, KOut, VOut> ProcessorContext<'ctx, 'd, KOut, VOut>
101where
102    KOut: Any + Send + Clone,
103    VOut: Any + Send + Clone,
104{
105    pub(crate) fn new(dispatch: &'ctx mut Dispatch<'d>) -> Self {
106        Self {
107            dispatch,
108            _pd: PhantomData,
109        }
110    }
111
112    /// Forward a record to all child nodes. The record is cloned per child for
113    /// fan-out; the last child receives the original by move (so the common
114    /// single-child case performs zero clones). Mirrors the JVM
115    /// `ProcessorContext.forward(Record)`, which takes the record by value.
116    pub fn forward(&mut self, record: Record<KOut, VOut>) {
117        // Copy the child-slice reference out so we can mutably borrow `buffer`.
118        let children = self.dispatch.children;
119        let Some((&last, rest)) = children.split_last() else {
120            return; // no children — drop the record
121        };
122        for &child in rest {
123            let key: Option<Box<dyn Any + Send>> = record
124                .key
125                .clone()
126                .map(|k| Box::new(k) as Box<dyn Any + Send>);
127            let value: Box<dyn Any + Send> = Box::new(record.value.clone());
128            self.dispatch
129                .buffer
130                .push_back((child, ErasedRecord::new(key, value, record.timestamp)));
131        }
132        let ts = record.timestamp;
133        let key: Option<Box<dyn Any + Send>> =
134            record.key.map(|k| Box::new(k) as Box<dyn Any + Send>);
135        let value: Box<dyn Any + Send> = Box::new(record.value);
136        self.dispatch
137            .buffer
138            .push_back((last, ErasedRecord::new(key, value, ts)));
139    }
140
141    /// Access a connected state store, typed. `None` if absent or the K/V types
142    /// don't match. Fetch it per-record (do not hold across `process` calls).
143    pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
144        &mut self,
145        name: &str,
146    ) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
147        self.dispatch.stores.get_kv::<K2, V2>(name)
148    }
149
150    /// Look up a value in a connected GLOBAL store (fully-replicated, shared across
151    /// tasks). Returns an owned value — no borrow escapes the shared manager's lock,
152    /// so the lookup future need not be held across `forward`. `None` on miss /
153    /// type mismatch. Fetch it per-record (do not hold across `process` calls).
154    pub async fn global_get<GK: Send + Sync + 'static, VG: Send + 'static>(
155        &mut self,
156        store: &str,
157        key: &GK,
158    ) -> Option<VG> {
159        self.dispatch.globals.get::<GK, VG>(store, key).await
160    }
161
162    /// Access a connected window store, typed. `None` if absent or the K/V types
163    /// don't match. Fetch it per-record (do not hold across `process` calls).
164    pub fn get_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
165        &mut self,
166        name: &str,
167    ) -> Option<&mut dyn crate::store::window::WindowStore<K2, V2>> {
168        self.dispatch.stores.get_window::<K2, V2>(name)
169    }
170
171    /// Access a connected join-window store (retainDuplicates), typed. `None` if
172    /// absent or the K/V types don't match. Fetch it per-record (do not hold
173    /// across `process` calls).
174    pub fn get_join_window_store<K2: Send + Sync + 'static, V2: Send + 'static>(
175        &mut self,
176        name: &str,
177    ) -> Option<&mut dyn crate::store::join_window::JoinWindowStore<K2, V2>> {
178        self.dispatch.stores.get_join_window::<K2, V2>(name)
179    }
180
181    /// Access a connected session store, typed. `None` if absent or the K/V types
182    /// don't match. Fetch it per-record (do not hold across `process` calls).
183    pub fn get_session_store<K2: Send + Sync + 'static, V2: Send + 'static>(
184        &mut self,
185        name: &str,
186    ) -> Option<&mut dyn crate::store::session::SessionStore<K2, V2>> {
187        self.dispatch.stores.get_session::<K2, V2>(name)
188    }
189
190    /// Access a connected versioned store (KIP-889), typed. `None` if absent or
191    /// the K/V types don't match. Fetch it per-record (do not hold across
192    /// `process` calls).
193    pub fn get_versioned_store<K2: Send + Sync + 'static, V2: Send + 'static>(
194        &mut self,
195        name: &str,
196    ) -> Option<&mut dyn crate::store::versioned::VersionedKeyValueStore<K2, V2>> {
197        self.dispatch.stores.get_versioned::<K2, V2>(name)
198    }
199
200    /// Access a connected suppress store, typed. `None` if absent or the K/V types
201    /// don't match. Fetch it per-record (do not hold across `process` calls).
202    ///
203    /// `pub(crate)`: the returned trait surfaces `Change<V>` (crate-internal) and
204    /// the suppress store is a built-in DSL mechanism, not a user-facing store.
205    pub(crate) fn get_suppress_store<K2: Send + Sync + 'static, V2: Send + 'static>(
206        &mut self,
207        name: &str,
208    ) -> Option<&mut dyn crate::store::suppress_store::SuppressStore<K2, V2>> {
209        self.dispatch.stores.get_suppress::<K2, V2>(name)
210    }
211
212    /// Access a connected join-grace buffer store (KIP-923), typed. `None` if
213    /// absent or the K/V types don't match. Fetch it per-record (do not hold
214    /// across `process` calls).
215    ///
216    /// `pub(crate)`: the grace buffer is a built-in DSL mechanism the stream–table
217    /// join's grace-flush processor reaches via the context, not a user-facing
218    /// store.
219    pub(crate) fn get_join_grace_store<K2: Send + Sync + 'static, V2: Send + 'static>(
220        &mut self,
221        name: &str,
222    ) -> Option<&mut crate::store::join_grace_buffer::JoinGraceBufferStore<K2, V2>> {
223        self.dispatch.stores.get_join_grace::<K2, V2>(name)
224    }
225
226    /// Access the connected FK subscription store. `None` if absent.
227    ///
228    /// `pub(crate)`: the subscription store is an internal KIP-213 FK-join
229    /// mechanism, not a user-facing store.
230    pub(crate) fn get_fk_subscription_store(
231        &mut self,
232        name: &str,
233    ) -> Option<&mut crate::store::fk_subscription::SubscriptionBytesStore> {
234        self.dispatch.stores.get_fk_subscription(name)
235    }
236
237    /// Metadata of the source record currently being processed.
238    #[must_use]
239    pub fn record_context(&self) -> &RecordContext {
240        self.dispatch.record_ctx
241    }
242
243    /// Whether the named KV state store is record-cached (so this processor should
244    /// suppress its immediate forward and let the cache flush forward the deduped
245    /// change). False for absent/non-KV/uncached stores.
246    #[must_use]
247    pub fn store_is_cached(&self, name: &str) -> bool {
248        self.dispatch.stores.kv_is_cached(name)
249    }
250
251    /// Schedule a periodic [`Punctuator`]. Callable from `init` or `process`.
252    /// `interval` must be positive. Returns a [`Cancellable`] to stop it.
253    ///
254    /// [`Punctuator`]: crate::processor::punctuation::Punctuator
255    /// [`Cancellable`]: crate::processor::punctuation::Cancellable
256    pub fn schedule<P>(
257        &mut self,
258        interval: std::time::Duration,
259        ty: crate::processor::punctuation::PunctuationType,
260        punctuator: P,
261    ) -> crate::processor::punctuation::Cancellable
262    where
263        P: crate::processor::punctuation::Punctuator<KOut, VOut>,
264    {
265        use crate::processor::punctuation::PunctuationType;
266        let interval_ms = i64::try_from(interval.as_millis()).unwrap_or(i64::MAX);
267        assert!(
268            interval_ms >= 1,
269            "schedule interval must be positive (>= 1ms)"
270        );
271        let base = match ty {
272            PunctuationType::StreamTime => self.dispatch.sched_stream_time,
273            PunctuationType::WallClockTime => self.dispatch.sched_wall_clock,
274        };
275        let next_time = base.saturating_add(interval_ms);
276        let cancel = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
277        let erased: Box<dyn crate::processor::punctuation::ErasedPunctuator> =
278            Box::new(crate::processor::punctuation::TypedPunctuator::<
279                KOut,
280                VOut,
281                P,
282            >::new(punctuator));
283        self.dispatch
284            .schedules
285            .push(crate::processor::punctuation::ScheduleEntry {
286                node_idx: self.dispatch.node_idx,
287                interval_ms,
288                ty,
289                next_time,
290                punctuator: erased,
291                cancel: cancel.clone(),
292            });
293        crate::processor::punctuation::Cancellable::new(cancel)
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::{Record, RecordContext};
    use assert2::check;
    use std::collections::VecDeque;

    /// Test processor that forwards each record with its value upper-cased.
    struct Upper;
    crate::impl_processor! {
        impl Upper: (String, String) -> (String, String) {
            async fn process(&mut self, ctx, r) {
                ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
            }
        }
    }

    /// Test processor that ignores every record and relies on the default
    /// `init`/`close` implementations.
    struct Noop;
    #[async_trait]
    impl Processor<String, String, String, String> for Noop {
        async fn process(
            &mut self,
            _ctx: &mut ProcessorContext<'_, '_, String, String>,
            _r: Record<String, String>,
        ) {
        }
    }

    #[tokio::test]
    async fn forward_pushes_erased_record_to_each_child() {
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        let children = [3usize, 4usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        Upper
            .process(&mut ctx, Record::new(Some("k".into()), "hi".into(), 5))
            .await;
        check!(buffer.len() == 2);
        // First child: receives a clone of the forwarded record.
        let (child, rec) = buffer.pop_front().unwrap();
        check!(child == 3);
        check!(*rec.value.downcast::<String>().unwrap() == "HI");
        // Last child: receives the original record by move. Checking it too
        // covers both halves of the fan-out, not just the cloned one.
        let (child, rec) = buffer.pop_front().unwrap();
        check!(child == 4);
        check!(*rec.value.downcast::<String>().unwrap() == "HI");
        check!(buffer.is_empty());
    }

    #[tokio::test]
    async fn boxed_dyn_processor_delegates_init_process_close() {
        // A `Box<dyn Processor>` is itself a `Processor`, forwarding every method
        // to the inner value. This is the runtime-dispatch path a
        // `ProcessorSupplier` closure takes when it returns `Box<dyn Processor<…>>`
        // instead of a concrete processor.
        let mut boxed: Box<dyn Processor<String, String, String, String>> = Box::new(Upper);
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        let children = [1usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        boxed.init(&mut ctx).await; // forwards to Upper's default no-op
        boxed
            .process(&mut ctx, Record::new(None, "hi".into(), 5))
            .await; // forwards → uppercases
        boxed.close().await; // forwards to Upper's default no-op
        check!(buffer.len() == 1);
        let (child, rec) = buffer.pop_front().unwrap();
        // The single child is the one the record must be addressed to.
        check!(child == 1);
        check!(*rec.value.downcast::<String>().unwrap() == "HI");
    }

    #[tokio::test]
    async fn default_init_and_close_are_noops_and_forward_with_no_children_drops() {
        let mut p = Noop;
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 9,
        };
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &[],
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        p.init(&mut ctx).await; // default no-op
        check!(ctx.record_context().timestamp == 9);
        ctx.forward(Record::new(None, "x".to_string(), 0)); // no children → dropped, no panic
        check!(buffer.is_empty());
        p.close().await; // default no-op
    }
}
436}