Skip to main content

crabka_client_streams/processor/
fixed_key.rs

1//! Fixed-key Processor API (KIP-820 `processValues`): a processor that may change
2//! the value but NOT the key. A thin typed facade over the regular [`Processor`]
3//! runtime. An internal adapter bridges a [`FixedKeyProcessor`] into a
4//! `Processor<KIn, VIn, KIn, VOut>` (the output key type equals the input key
5//! type, so the runtime keeps the partition assignment unchanged).
6//!
7//! The DSL surface `KStream::process_values` wraps a
8//! [`FixedKeyProcessorSupplier`] in that adapter; this module is the
9//! typed facade + adapter it builds on.
10
11use std::any::Any;
12
13use async_trait::async_trait;
14
15use crate::processor::api::{Processor, ProcessorContext};
16use crate::processor::record::{Record, RecordContext};
17
/// A record whose key a fixed-key processor is expected to leave unchanged.
///
/// [`with_value`](FixedKeyRecord::with_value) replaces the value (possibly
/// changing its type) while carrying the key and timestamp through untouched;
/// that is how a processor should build its output. Note that all fields are
/// `pub`, so this type does not itself prevent code from reassigning `key`:
/// preserving the key is a contract on the processor, not something enforced
/// here.
///
/// Unlike [`Record`], the key is **not** optional: a fixed-key processor is
/// only reached for records that carry a key (a null-key record cannot have its
/// key "preserved" — `process_values` requires a non-null key).
/// NOTE(review): the JVM `FixedKeyRecord` appears to allow a null key; confirm
/// this divergence is intended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixedKeyRecord<K, V> {
    /// The record key; carried through unchanged by
    /// [`with_value`](FixedKeyRecord::with_value).
    pub key: K,
    /// The record value.
    pub value: V,
    /// The record timestamp; carried through unchanged by
    /// [`with_value`](FixedKeyRecord::with_value).
    pub timestamp: i64,
}

impl<K, V> FixedKeyRecord<K, V> {
    /// Replace the value (and possibly its type), preserving key and timestamp.
    #[must_use]
    pub fn with_value<V2>(self, value: V2) -> FixedKeyRecord<K, V2> {
        FixedKeyRecord {
            key: self.key,
            value,
            timestamp: self.timestamp,
        }
    }
}
41
42/// Handed to a [`FixedKeyProcessor::process`]. The only `forward` re-attaches the
43/// (unchanged) key, so a fixed-key processor cannot accidentally repartition. All
44/// other accessors delegate verbatim to the underlying [`ProcessorContext`].
45pub struct FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut> {
46    inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>,
47}
48
49impl<'a, 'ctx, 'd, K, VOut> FixedKeyProcessorContext<'a, 'ctx, 'd, K, VOut>
50where
51    K: Any + Send + Clone,
52    VOut: Any + Send + Clone,
53{
54    // Constructed by FixedKeyAdapter (wired up by KStream::process_values).
55    pub(crate) fn new(inner: &'a mut ProcessorContext<'ctx, 'd, K, VOut>) -> Self {
56        Self { inner }
57    }
58
59    /// Forward a record downstream, re-attaching the (unchanged) key. Mirrors the
60    /// JVM `FixedKeyProcessorContext.forward(FixedKeyRecord)`.
61    pub fn forward(&mut self, record: FixedKeyRecord<K, VOut>) {
62        self.inner.forward(Record::new(
63            Some(record.key),
64            record.value,
65            record.timestamp,
66        ));
67    }
68
69    /// Access a connected key/value state store, typed. Delegates to
70    /// [`ProcessorContext::get_state_store`]. (Window/session-store accessors are
71    /// omitted: no DSL path connects those to a `process_values` node yet.)
72    pub fn get_state_store<K2: Send + Sync + 'static, V2: Send + 'static>(
73        &mut self,
74        name: &str,
75    ) -> Option<&mut dyn crate::store::api::KeyValueStore<K2, V2>> {
76        self.inner.get_state_store::<K2, V2>(name)
77    }
78
79    /// Metadata of the source record currently being processed.
80    #[must_use]
81    pub fn record_context(&self) -> &RecordContext {
82        self.inner.record_context()
83    }
84}
85
86/// A fixed-key record processor (KIP-820): it may change the value but not the
87/// key. The typed analogue of [`Processor`]; one instance is created per task via
88/// [`FixedKeyProcessorSupplier::get`].
89#[async_trait]
90pub trait FixedKeyProcessor<KIn: Send, VIn: Send, VOut: Send>: Send + 'static {
91    async fn process(
92        &mut self,
93        ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
94        record: FixedKeyRecord<KIn, VIn>,
95    );
96}
97
98/// A boxed fixed-key processor is itself a [`FixedKeyProcessor`], delegating to
99/// the inner value. Mirrors the `Box<dyn Processor>` blanket impl in
100/// [`crate::processor::api`]: it lets a [`FixedKeyProcessorSupplier`] closure
101/// return `Box<dyn FixedKeyProcessor<…>>` (chosen at runtime) and still feed a
102/// the internal fixed-key adapter, which requires `P: FixedKeyProcessor`.
103#[async_trait]
104impl<KIn, VIn, VOut> FixedKeyProcessor<KIn, VIn, VOut>
105    for Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>
106where
107    KIn: Send + 'static,
108    VIn: Send + 'static,
109    VOut: Send + 'static,
110{
111    async fn process(
112        &mut self,
113        ctx: &mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>,
114        record: FixedKeyRecord<KIn, VIn>,
115    ) {
116        (**self).process(ctx, record).await;
117    }
118}
119
120/// Factory for [`FixedKeyProcessor`] instances (one per task → per-task
121/// isolation). Mirrors [`ProcessorSupplier`](crate::processor::ProcessorSupplier);
122/// a closure `|| MyFixedProc` is a supplier via the blanket impl below.
123pub trait FixedKeyProcessorSupplier<KIn, VIn, VOut>: Send + Sync + 'static {
124    fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>>;
125}
126
127impl<F, P, KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> for F
128where
129    F: Fn() -> P + Send + Sync + 'static,
130    KIn: Send,
131    VIn: Send,
132    VOut: Send,
133    P: FixedKeyProcessor<KIn, VIn, VOut>,
134{
135    fn get(&self) -> Box<dyn FixedKeyProcessor<KIn, VIn, VOut>> {
136        Box::new(self())
137    }
138}
139
140/// Bridges a [`FixedKeyProcessor`] into the regular [`Processor`] runtime with
141/// `KOut = KIn` (the key is preserved). Wrapped around a [`FixedKeyProcessorSupplier`]
142/// by [`KStream::process_values`](crate::dsl::kstream::KStream::process_values).
143pub(crate) struct FixedKeyAdapter<P> {
144    pub inner: P,
145}
146
147#[async_trait]
148impl<P, KIn, VIn, VOut> Processor<KIn, VIn, KIn, VOut> for FixedKeyAdapter<P>
149where
150    KIn: Any + Send + Clone + 'static,
151    VIn: Send + 'static,
152    VOut: Any + Send + Clone + 'static,
153    P: FixedKeyProcessor<KIn, VIn, VOut>,
154{
155    async fn process(
156        &mut self,
157        ctx: &mut ProcessorContext<'_, '_, KIn, VOut>,
158        record: Record<KIn, VIn>,
159    ) {
160        let key = record.key.expect("process_values requires a non-null key");
161        let fkr = FixedKeyRecord {
162            key,
163            value: record.value,
164            timestamp: record.timestamp,
165        };
166        let mut fk_ctx = FixedKeyProcessorContext::new(ctx);
167        self.inner.process(&mut fk_ctx, fkr).await;
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::erased::{Dispatch, ErasedRecord};
    use crate::processor::record::{Record, RecordContext};
    use assert2::check;
    use std::collections::VecDeque;

    #[test]
    fn with_value_keeps_key_and_timestamp() {
        let r = FixedKeyRecord {
            key: "k".to_string(),
            value: 1i32,
            timestamp: 42,
        };
        let r2 = r.with_value("v".to_string());
        check!(r2.key == "k");
        check!(r2.value == "v");
        check!(r2.timestamp == 42);
    }

    /// A fixed-key processor that uppercases the value, preserving the key.
    struct UpperValue;
    crate::impl_fixed_key_processor! {
        impl UpperValue: (String, String) -> String {
            async fn process(&mut self, ctx, r) {
                let v = r.value.clone();
                ctx.forward(r.with_value(v.to_uppercase()));
            }
        }
    }

    // Each async test below hand-builds a `Dispatch` over local buffers (the
    // same setup the `api.rs` tests use) and wraps it in a real
    // `ProcessorContext`. Forwarded records land in `buffer`, paired with the
    // child index they were routed to.

    #[tokio::test]
    async fn adapter_bridges_fixed_key_processor_preserving_key() {
        // Drive `UpperValue` through `FixedKeyAdapter` over a real
        // `ProcessorContext`, constructed exactly as the `api.rs` tests do.
        let mut adapter = FixedKeyAdapter { inner: UpperValue };
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        let children = [1usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        adapter
            .process(&mut ctx, Record::new(Some("k".into()), "hi".into(), 5))
            .await;

        check!(buffer.len() == 1);
        let (child, rec) = buffer.pop_front().unwrap();
        check!(child == 1);
        // The key is preserved (Some("k")) and the value uppercased.
        let key = rec.key.expect("forwarded record must carry the key");
        check!(*key.downcast::<String>().unwrap() == "k");
        check!(*rec.value.downcast::<String>().unwrap() == "HI");
        check!(rec.timestamp == 5);
    }

    #[tokio::test]
    async fn supplier_blanket_impl_boxes_processor() {
        // A closure `|| UpperValue` is a `FixedKeyProcessorSupplier`; `get()`
        // returns a boxed processor that itself impls `FixedKeyProcessor`.
        let supplier = || UpperValue;
        let mut boxed = FixedKeyProcessorSupplier::get(&supplier);

        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 7,
        };
        let children = [2usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        let mut fk_ctx = FixedKeyProcessorContext::new(&mut ctx);

        // Drive the boxed processor directly to exercise the Box<dyn …> blanket impl.
        boxed
            .process(
                &mut fk_ctx,
                FixedKeyRecord {
                    key: "k".to_string(),
                    value: "lo".to_string(),
                    timestamp: 7,
                },
            )
            .await;

        check!(buffer.len() == 1);
        let (child, rec) = buffer.pop_front().unwrap();
        // Routed to the only configured child, with key and timestamp intact.
        check!(child == 2);
        let key = rec.key.expect("forwarded record must carry the key");
        check!(*key.downcast::<String>().unwrap() == "k");
        check!(*rec.value.downcast::<String>().unwrap() == "LO");
        check!(rec.timestamp == 7);
    }

    #[tokio::test]
    #[should_panic(expected = "process_values requires a non-null key")]
    async fn adapter_panics_on_null_key() {
        // `FixedKeyRecord` has a non-optional key, so the adapter rejects a
        // null-key input by panicking; pin that contract.
        let mut adapter = FixedKeyAdapter { inner: UpperValue };
        let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
        let mut output = Vec::new();
        let rc = RecordContext {
            topic: "t".into(),
            partition: 0,
            offset: 0,
            timestamp: 5,
        };
        let children = [1usize];
        let mut stores = crate::store::registry::StoreRegistry::default();
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut scheds = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut buffer,
            children: &children,
            output: &mut output,
            record_ctx: &rc,
            stores: &mut stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut scheds,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, String, String>::new(&mut dispatch);
        adapter
            .process(&mut ctx, Record::new(None, "hi".into(), 5))
            .await;
    }
}