Skip to main content

crabka_client_streams/processor/
serde.rs

1//! `Serde<T>`: typed (de)serialization at source/sink boundaries.
2
3use bytes::Bytes;
4
/// Failure to deserialize bytes into `T`.
///
/// Wraps a human-readable description of what went wrong (for example invalid
/// UTF-8 from [`StringSerde`] or a wrong byte count from [`I64Serde`]). Its
/// `Display` output is that description prefixed with `deserialization error: `.
#[derive(Debug, thiserror::Error)]
#[error("deserialization error: {0}")]
pub struct SerdeError(pub String);
9
/// Serialize a `T` to bytes and back. Used by source nodes (deserialize) and
/// sink/repartition nodes (serialize).
///
/// The `Send + Sync + 'static` supertraits let a serde instance be shared across
/// threads and stored without borrowed data. Every method receives the topic
/// the bytes are read from or written to; the stateless serdes in this module
/// ignore it.
#[diagnostic::on_unimplemented(
    message = "the type `{Self}` is not a valid serializer/deserializer for `{T}`",
    label = "does not implement `Serde<{T}>`",
    note = "implement `Serde<{T}>` for `{Self}` or verify that the type parameter `{T}` matches the deserialized type of this serde"
)]
pub trait Serde<T>: Send + Sync + 'static {
    /// Encode `value` for `topic`. The signature makes serialization infallible.
    fn serialize(&self, topic: &str, value: &T) -> Bytes;
    /// Decode `bytes` read from `topic` into a `T`.
    ///
    /// # Errors
    ///
    /// Returns [`SerdeError`] when `bytes` is not a valid encoding of `T`.
    fn deserialize(&self, topic: &str, bytes: &[u8]) -> Result<T, SerdeError>;

    /// Pre-register any per-topic state (e.g. a schema-registry subject) before
    /// processing begins. Called once per `(topic, role)` at topology wiring
    /// time. Default: no-op (stateless serdes need nothing).
    fn prepare(&self, _topic: &str, _role: SerdeRole) {}
}
26
/// Whether a serde is wired for the record key or value — passed to
/// [`Serde::prepare`] so schema serdes can derive `<topic>-key` / `<topic>-value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerdeRole {
    /// The serde (de)serializes record keys.
    Key,
    /// The serde (de)serializes record values.
    Value,
}
34
/// Associate a concrete Serde type with its target deserialized type to enable automatic type inference.
///
/// [`StringSerde`], [`I64Serde`] and [`BytesSerde`] implement it with `Target`
/// set to the type they (de)serialize (`String`, `i64`, `Bytes`).
/// [`DefaultSerde`] requires its associated serde to implement this trait with
/// `Target = Self`.
#[diagnostic::on_unimplemented(
    message = "the serde type `{Self}` is not associated with any target model type",
    label = "does not implement `SerdeAssociate`",
    note = "implement `SerdeAssociate` for `{Self}` and define `type Target = ...;` to map this Serde to its model type"
)]
pub trait SerdeAssociate {
    /// The model type this serde reads and writes.
    type Target: Send + Sync + 'static;
}
44
/// Key and value [`Serde`]s for **reading** a topic into the topology.
///
/// Source nodes — and [`TopologyTestDriver::pipe_input`] — turn incoming bytes
/// into typed records with this pair. Bundling both serdes into one named
/// argument keeps the key/value roles obvious wherever it is passed:
/// `Consumed::with(keySerde, valueSerde)` mirrors Kafka Streams' `Consumed`.
///
/// [`TopologyTestDriver::pipe_input`]: crate::TopologyTestDriver::pipe_input
#[derive(Debug, Clone, Copy)]
pub struct Consumed<KS, VS> {
    /// Serde for the record key.
    pub(crate) key_serde: KS,
    /// Serde for the record value.
    pub(crate) value_serde: VS,
}

impl<KS, VS> Consumed<KS, VS> {
    /// Bundle `key_serde` and `value_serde`. The key comes first, just like
    /// `Consumed.with(keySerde, valueSerde)` on the JVM.
    #[must_use]
    pub fn with(key_serde: KS, value_serde: VS) -> Self {
        Consumed { key_serde, value_serde }
    }
}
70
/// Key and value [`Serde`]s for **writing** a topic from the topology.
///
/// Sink nodes encode outgoing records with this pair, and
/// [`TopologyTestDriver::read_output`] uses the same pair to decode whatever a
/// sink wrote. `Produced::with(keySerde, valueSerde)` mirrors Kafka Streams'
/// `Produced`.
///
/// [`TopologyTestDriver::read_output`]: crate::TopologyTestDriver::read_output
#[derive(Debug, Clone, Copy)]
pub struct Produced<KS, VS> {
    /// Serde for the record key.
    pub(crate) key_serde: KS,
    /// Serde for the record value.
    pub(crate) value_serde: VS,
}

impl<KS, VS> Produced<KS, VS> {
    /// Bundle `key_serde` and `value_serde`. The key comes first, just like
    /// `Produced.with(keySerde, valueSerde)` on the JVM.
    #[must_use]
    pub fn with(key_serde: KS, value_serde: VS) -> Self {
        Produced { key_serde, value_serde }
    }
}
96
97/// A clonable [`Serde<T>`] over a type-erased `Arc<dyn Serde<T>>`. Lets a serde
98/// captured behind an `Arc` (e.g. a `KTable`'s stored key/value serde) be passed
99/// to `add_source`/`add_sink` (which want a `Serde<T> + Clone` value) or boxed
100/// into a processor field, without naming the concrete serde type.
101pub(crate) struct SerdeArc<T>(pub(crate) std::sync::Arc<dyn Serde<T>>);
102
103impl<T> Clone for SerdeArc<T> {
104    fn clone(&self) -> Self {
105        SerdeArc(std::sync::Arc::clone(&self.0))
106    }
107}
108
109impl<T: Send + Sync + 'static> Serde<T> for SerdeArc<T> {
110    fn serialize(&self, topic: &str, value: &T) -> Bytes {
111        self.0.serialize(topic, value)
112    }
113    fn deserialize(&self, topic: &str, bytes: &[u8]) -> Result<T, SerdeError> {
114        self.0.deserialize(topic, bytes)
115    }
116    fn prepare(&self, topic: &str, role: SerdeRole) {
117        self.0.prepare(topic, role);
118    }
119}
120
121impl<T: Send + Sync + 'static> SerdeAssociate for SerdeArc<T> {
122    type Target = T;
123}
124
125/// Identity serde for raw `Bytes`.
126#[derive(Debug, Clone, Copy, Default)]
127pub struct BytesSerde;
128impl Serde<Bytes> for BytesSerde {
129    fn serialize(&self, _topic: &str, value: &Bytes) -> Bytes {
130        value.clone()
131    }
132    fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<Bytes, SerdeError> {
133        Ok(Bytes::copy_from_slice(bytes))
134    }
135}
136
137impl SerdeAssociate for BytesSerde {
138    type Target = Bytes;
139}
140
141/// UTF-8 `String` serde.
142#[derive(Debug, Clone, Copy, Default)]
143pub struct StringSerde;
144impl Serde<String> for StringSerde {
145    fn serialize(&self, _topic: &str, value: &String) -> Bytes {
146        Bytes::copy_from_slice(value.as_bytes())
147    }
148    fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<String, SerdeError> {
149        String::from_utf8(bytes.to_vec()).map_err(|e| SerdeError(e.to_string()))
150    }
151}
152
153impl SerdeAssociate for StringSerde {
154    type Target = String;
155}
156
157/// Big-endian 8-byte `i64` serde (matches the JVM `Serdes.Long()`).
158#[derive(Debug, Clone, Copy, Default)]
159pub struct I64Serde;
160impl Serde<i64> for I64Serde {
161    fn serialize(&self, _topic: &str, value: &i64) -> Bytes {
162        Bytes::copy_from_slice(&value.to_be_bytes())
163    }
164    fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<i64, SerdeError> {
165        let arr: [u8; 8] = bytes
166            .try_into()
167            .map_err(|_| SerdeError(format!("expected 8 bytes, got {}", bytes.len())))?;
168        Ok(i64::from_be_bytes(arr))
169    }
170}
171
172impl SerdeAssociate for I64Serde {
173    type Target = i64;
174}
175
/// Mapping trait for types that have a default associated [`Serde`].
///
/// If a type does not implement this trait, using `.stream` or other ergonomic
/// methods will trigger a compiler error indicating that the default serde is not found.
///
/// This module implements it for `String` ([`StringSerde`]), `i64`
/// ([`I64Serde`]) and `Bytes` ([`BytesSerde`]).
///
/// # Examples
///
/// ```compile_fail
/// use crabka_client_streams::{StreamsBuilder, KStream};
///
/// struct CustomType;
///
/// let builder = StreamsBuilder::new();
/// // Fails because CustomType does not implement DefaultSerde:
/// let stream: KStream<CustomType, String> = builder.stream(["topic"]);
/// ```
#[diagnostic::on_unimplemented(
    message = "the type `{Self}` does not have a default Serde implementation",
    label = "no default Serde association found for this type",
    note = "implement `DefaultSerde` for `{Self}` or specify/override the serde explicitly using `.with_key_serde()` or `.with_value_serde()`"
)]
pub trait DefaultSerde: Send + Sync + 'static + Sized {
    /// The serde used when none is given explicitly. It must (de)serialize
    /// `Self`, declare `Self` as its [`SerdeAssociate::Target`], and be
    /// constructible via `Default` and duplicable via `Clone`.
    type Serde: SerdeAssociate<Target = Self> + Serde<Self> + Clone + Default + 'static;
}
200
201impl DefaultSerde for String {
202    type Serde = StringSerde;
203}
204
205impl DefaultSerde for i64 {
206    type Serde = I64Serde;
207}
208
209impl DefaultSerde for bytes::Bytes {
210    type Serde = BytesSerde;
211}
212
213impl<KS, VS> From<(KS, VS)> for Consumed<KS, VS> {
214    fn from((key_serde, value_serde): (KS, VS)) -> Self {
215        Self::with(key_serde, value_serde)
216    }
217}
218
219impl<KS, VS> From<(KS, VS)> for Produced<KS, VS> {
220    fn from((key_serde, value_serde): (KS, VS)) -> Self {
221        Self::with(key_serde, value_serde)
222    }
223}
224
/// Append `n` to `buf` as a Kafka signed/zigzag varint (the encoding of the JVM
/// `ByteUtils.writeVarint`).
///
/// Zigzag maps a signed value onto an unsigned one (`(n << 1) ^ (n >> 31)` in
/// the JVM's 32-bit form). The result is then emitted as little-endian base-128
/// groups of 7 bits, with the high bit set on every byte except the last. Values
/// up to 63 therefore take a single byte.
fn write_varint(n: usize, buf: &mut Vec<u8>) {
    // `n` is a length and never negative, so zigzag reduces to a left shift:
    // the sign term contributes nothing.
    let mut rest = (n as u64) << 1;
    while rest >= 0x80 {
        // Low 7 bits plus the continuation flag; discarding the upper bits is intended.
        #[allow(clippy::cast_possible_truncation)]
        buf.push((rest as u8) | 0x80);
        rest >>= 7;
    }
    // Final group is below 0x80, so it fits in one byte with no continuation flag.
    #[allow(clippy::cast_possible_truncation)]
    buf.push(rest as u8);
}
244
245/// Read a Kafka signed/zigzag varint.
246///
247/// Returns `(decoded_value, bytes_consumed)`.
248fn read_varint(bytes: &[u8]) -> Result<(usize, usize), SerdeError> {
249    let mut value: u64 = 0;
250    let mut shift = 0u32;
251    let mut consumed = 0;
252    for &b in bytes {
253        consumed += 1;
254        let low7 = u64::from(b & 0x7F);
255        value |= low7 << shift;
256        shift += 7;
257        if b & 0x80 == 0 {
258            // Zigzag decode: (value >>> 1) ^ -(value & 1)
259            let decoded = (value >> 1) ^ ((value & 1).wrapping_neg());
260            let n = usize::try_from(decoded)
261                .map_err(|_| SerdeError(format!("varint value {decoded} out of usize range")))?;
262            return Ok((n, consumed));
263        }
264        if shift >= 64 {
265            return Err(SerdeError("varint overflow".to_string()));
266        }
267    }
268    Err(SerdeError("truncated varint".to_string()))
269}
270
/// [`Serde`] for `Change<V>`. It wraps the serde for `V` and frames an old/new
/// pair in the byte layout of the JVM `ChangedSerializer`/`ChangedDeserializer`,
/// which Kafka Streams uses on repartition topics when a `KGroupedTable` re-keys.
///
/// # Wire format
///
/// The trailing byte is a flag recording which halves are present:
///
/// | Case | Layout |
/// |------|--------|
/// | new only | `new_bytes ++ [0x01]` |
/// | old only | `old_bytes ++ [0x00]` |
/// | both     | `varint(new_len) ++ new_bytes ++ old_bytes ++ [0x02]` |
///
/// `varint` is a Kafka signed/zigzag varint, as written by `ByteUtils.writeVarint`
/// and read by `ByteUtils.readVarint`.
// NOTE(review): the `changed_bytes.json` golden fixture pins the length-prefix
// encoding used in the "both" case; re-check it against the `ChangedSerializer`
// of the Kafka release being targeted before relying on cross-language interop.
#[derive(Debug, Clone, Copy)]
pub struct Changed<S> {
    inner: S,
}

impl<S> Changed<S> {
    /// Build a `Changed<V>` serde around `inner`, the serde for `V` itself.
    #[must_use]
    pub fn new(inner: S) -> Self {
        Changed { inner }
    }
}
296
297impl<V, S> Serde<crate::dsl::processors::change::Change<V>> for Changed<S>
298where
299    V: Send + Sync + 'static,
300    S: Serde<V>,
301{
302    fn serialize(&self, topic: &str, value: &crate::dsl::processors::change::Change<V>) -> Bytes {
303        match (&value.new, &value.old) {
304            (Some(new), Some(old)) => {
305                let new_bytes = self.inner.serialize(topic, new);
306                let old_bytes = self.inner.serialize(topic, old);
307                let mut buf = Vec::with_capacity(1 + new_bytes.len() + old_bytes.len() + 1);
308                write_varint(new_bytes.len(), &mut buf);
309                buf.extend_from_slice(&new_bytes);
310                buf.extend_from_slice(&old_bytes);
311                buf.push(0x02);
312                Bytes::from(buf)
313            }
314            (Some(new), None) => {
315                let new_bytes = self.inner.serialize(topic, new);
316                let mut buf = Vec::with_capacity(new_bytes.len() + 1);
317                buf.extend_from_slice(&new_bytes);
318                buf.push(0x01);
319                Bytes::from(buf)
320            }
321            (None, Some(old)) => {
322                let old_bytes = self.inner.serialize(topic, old);
323                let mut buf = Vec::with_capacity(old_bytes.len() + 1);
324                buf.extend_from_slice(&old_bytes);
325                buf.push(0x00);
326                Bytes::from(buf)
327            }
328            // The repartition-map only ever forwards combined, subtract-only, or
329            // add-only changes — never a both-`None` change, and the JVM
330            // `ChangedSerializer` likewise rejects an all-null change. Fail loud
331            // rather than silently emit an unsymmetric encoding.
332            (None, None) => unreachable!("Changed serde never carries a both-None change"),
333        }
334    }
335
336    fn deserialize(
337        &self,
338        topic: &str,
339        bytes: &[u8],
340    ) -> Result<crate::dsl::processors::change::Change<V>, SerdeError> {
341        if bytes.is_empty() {
342            return Err(SerdeError("empty bytes for Change".to_string()));
343        }
344        let flag = bytes[bytes.len() - 1];
345        let payload = &bytes[..bytes.len() - 1];
346        match flag {
347            0x00 => {
348                let old = self.inner.deserialize(topic, payload)?;
349                Ok(crate::dsl::processors::change::Change {
350                    old: Some(old),
351                    new: None,
352                })
353            }
354            0x01 => {
355                let new = self.inner.deserialize(topic, payload)?;
356                Ok(crate::dsl::processors::change::Change {
357                    old: None,
358                    new: Some(new),
359                })
360            }
361            0x02 => {
362                let (new_len, consumed) = read_varint(payload)?;
363                let after_varint = &payload[consumed..];
364                if after_varint.len() < new_len {
365                    return Err(SerdeError(format!(
366                        "not enough bytes for new value: need {new_len}, have {}",
367                        after_varint.len()
368                    )));
369                }
370                let new_bytes = &after_varint[..new_len];
371                let old_bytes = &after_varint[new_len..];
372                let new = self.inner.deserialize(topic, new_bytes)?;
373                let old = self.inner.deserialize(topic, old_bytes)?;
374                Ok(crate::dsl::processors::change::Change {
375                    old: Some(old),
376                    new: Some(new),
377                })
378            }
379            other => Err(SerdeError(format!("unknown Change flag: 0x{other:02x}"))),
380        }
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    #[test]
    fn string_serde_round_trips() {
        let s = StringSerde;
        let b = s.serialize("t", &"héllo".to_string());
        check!(s.deserialize("t", &b).unwrap() == "héllo");
    }

    #[test]
    fn i64_serde_is_big_endian_8_bytes() {
        let s = I64Serde;
        let b = s.serialize("t", &1i64);
        check!(b.as_ref() == [0, 0, 0, 0, 0, 0, 0, 1]);
        check!(s.deserialize("t", &b).unwrap() == 1);
        check!(s.deserialize("t", &[0, 1]).is_err());
    }

    #[test]
    fn bytes_serde_is_identity() {
        let s = BytesSerde;
        let b = s.serialize("t", &bytes::Bytes::from_static(b"xy"));
        check!(s.deserialize("t", &b).unwrap() == bytes::Bytes::from_static(b"xy"));
    }

    /// Lower-case hex rendering, matching the golden fixture's format.
    fn hex(b: &[u8]) -> String {
        b.iter().fold(String::new(), |mut s, x| {
            use std::fmt::Write as _;
            write!(s, "{x:02x}").unwrap();
            s
        })
    }

    /// Expected `Changed<Long>` encodings, keyed by case name.
    fn golden() -> serde_json::Value {
        let raw = std::fs::read_to_string(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/testdata/kgrouped_table/changed_bytes.json"
        ))
        .expect("read changed_bytes golden");
        serde_json::from_str(&raw).unwrap()
    }

    #[test]
    fn changed_long_matches_jvm_bytes() {
        let g = golden();
        let s = Changed::new(I64Serde);
        let both = crate::dsl::processors::change::Change {
            old: Some(2i64),
            new: Some(6i64),
        };
        let new_only = crate::dsl::processors::change::Change {
            old: None,
            new: Some(5i64),
        };
        let old_only = crate::dsl::processors::change::Change {
            old: Some(4i64),
            new: None,
        };
        check!(hex(&s.serialize("topic", &both)) == g["both"].as_str().unwrap());
        check!(hex(&s.serialize("topic", &new_only)) == g["new_only"].as_str().unwrap());
        check!(hex(&s.serialize("topic", &old_only)) == g["old_only"].as_str().unwrap());
    }

    #[test]
    fn changed_round_trips() {
        let s = Changed::new(I64Serde);
        for c in [
            crate::dsl::processors::change::Change {
                old: Some(2i64),
                new: Some(6i64),
            },
            crate::dsl::processors::change::Change {
                old: None,
                new: Some(5i64),
            },
            crate::dsl::processors::change::Change {
                old: Some(4i64),
                new: None,
            },
        ] {
            let bytes = s.serialize("topic", &c);
            let back: crate::dsl::processors::change::Change<i64> =
                s.deserialize("topic", &bytes).unwrap();
            check!(back == c);
        }
    }

    #[test]
    fn varint_round_trips_lengths() {
        for n in [0usize, 1, 63, 64, 300, 1 << 20, 1 << 30] {
            let mut buf = Vec::new();
            write_varint(n, &mut buf);
            let (back, consumed) = read_varint(&buf).unwrap();
            check!(back == n);
            check!(consumed == buf.len());
        }
    }

    #[test]
    fn changed_round_trips_multi_byte_length_prefix() {
        // A new value of 64+ bytes needs a two-byte varint length prefix.
        let s = Changed::new(StringSerde);
        let c = crate::dsl::processors::change::Change {
            old: Some("old".to_string()),
            new: Some("n".repeat(200)),
        };
        let bytes = s.serialize("topic", &c);
        let back: crate::dsl::processors::change::Change<String> =
            s.deserialize("topic", &bytes).unwrap();
        check!(back == c);
    }

    #[test]
    fn changed_rejects_malformed_input() {
        let s = Changed::new(I64Serde);
        let decode =
            |bytes: &[u8]| -> Result<crate::dsl::processors::change::Change<i64>, SerdeError> {
                s.deserialize("topic", bytes)
            };
        // No flag byte at all.
        check!(decode(&[]).is_err());
        // Unknown flag.
        check!(decode(&[0x03]).is_err());
        // "both" flag with no length prefix.
        check!(decode(&[0x02]).is_err());
        // "both" flag whose prefix (8) exceeds the remaining payload (1 byte).
        check!(decode(&[0x10, 0x00, 0x02]).is_err());
        // "old only" payload of the wrong width for an i64.
        check!(decode(&[0x00, 0x01, 0x00]).is_err());
    }
}
473}