1use bytes::Bytes;
4
5#[derive(Debug, thiserror::Error)]
7#[error("deserialization error: {0}")]
8pub struct SerdeError(pub String);
9
10#[diagnostic::on_unimplemented(
13 message = "the type `{Self}` is not a valid serializer/deserializer for `{T}`",
14 label = "does not implement `Serde<{T}>`",
15 note = "implement `Serde<{T}>` for `{Self}` or verify that the type parameter `{T}` matches the deserialized type of this serde"
16)]
17pub trait Serde<T>: Send + Sync + 'static {
18 fn serialize(&self, topic: &str, value: &T) -> Bytes;
19 fn deserialize(&self, topic: &str, bytes: &[u8]) -> Result<T, SerdeError>;
20
21 fn prepare(&self, _topic: &str, _role: SerdeRole) {}
25}
26
/// Which position a serde occupies, key or value; passed to `Serde::prepare`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerdeRole {
    /// The serde handles keys.
    Key,
    /// The serde handles values.
    Value,
}
34
/// Links a serde type to the model type it encodes and decodes (`Target`).
#[diagnostic::on_unimplemented(
    message = "the serde type `{Self}` is not associated with any target model type",
    label = "does not implement `SerdeAssociate`",
    note = "implement `SerdeAssociate` for `{Self}` and define `type Target = ...;` to map this Serde to its model type"
)]
pub trait SerdeAssociate {
    /// The model type handled by this serde.
    type Target: Send + Sync + 'static;
}
44
/// A key serde and a value serde bundled together.
///
/// NOTE(review): the name suggests this describes how records are read (consumed); confirm
/// against call sites.
#[derive(Debug, Clone, Copy)]
pub struct Consumed<KS, VS> {
    pub(crate) key_serde: KS,
    pub(crate) value_serde: VS,
}

impl<KS, VS> Consumed<KS, VS> {
    /// Creates a `Consumed` from an explicit key serde and value serde.
    #[must_use]
    pub fn with(key_serde: KS, value_serde: VS) -> Self {
        Consumed {
            value_serde,
            key_serde,
        }
    }
}
70
/// A key serde and a value serde bundled together.
///
/// NOTE(review): the name suggests this describes how records are written (produced); confirm
/// against call sites.
#[derive(Debug, Clone, Copy)]
pub struct Produced<KS, VS> {
    pub(crate) key_serde: KS,
    pub(crate) value_serde: VS,
}

impl<KS, VS> Produced<KS, VS> {
    /// Creates a `Produced` from an explicit key serde and value serde.
    #[must_use]
    pub fn with(key_serde: KS, value_serde: VS) -> Self {
        Produced {
            value_serde,
            key_serde,
        }
    }
}
96
97pub(crate) struct SerdeArc<T>(pub(crate) std::sync::Arc<dyn Serde<T>>);
102
103impl<T> Clone for SerdeArc<T> {
104 fn clone(&self) -> Self {
105 SerdeArc(std::sync::Arc::clone(&self.0))
106 }
107}
108
109impl<T: Send + Sync + 'static> Serde<T> for SerdeArc<T> {
110 fn serialize(&self, topic: &str, value: &T) -> Bytes {
111 self.0.serialize(topic, value)
112 }
113 fn deserialize(&self, topic: &str, bytes: &[u8]) -> Result<T, SerdeError> {
114 self.0.deserialize(topic, bytes)
115 }
116 fn prepare(&self, topic: &str, role: SerdeRole) {
117 self.0.prepare(topic, role);
118 }
119}
120
121impl<T: Send + Sync + 'static> SerdeAssociate for SerdeArc<T> {
122 type Target = T;
123}
124
125#[derive(Debug, Clone, Copy, Default)]
127pub struct BytesSerde;
128impl Serde<Bytes> for BytesSerde {
129 fn serialize(&self, _topic: &str, value: &Bytes) -> Bytes {
130 value.clone()
131 }
132 fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<Bytes, SerdeError> {
133 Ok(Bytes::copy_from_slice(bytes))
134 }
135}
136
137impl SerdeAssociate for BytesSerde {
138 type Target = Bytes;
139}
140
141#[derive(Debug, Clone, Copy, Default)]
143pub struct StringSerde;
144impl Serde<String> for StringSerde {
145 fn serialize(&self, _topic: &str, value: &String) -> Bytes {
146 Bytes::copy_from_slice(value.as_bytes())
147 }
148 fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<String, SerdeError> {
149 String::from_utf8(bytes.to_vec()).map_err(|e| SerdeError(e.to_string()))
150 }
151}
152
153impl SerdeAssociate for StringSerde {
154 type Target = String;
155}
156
157#[derive(Debug, Clone, Copy, Default)]
159pub struct I64Serde;
160impl Serde<i64> for I64Serde {
161 fn serialize(&self, _topic: &str, value: &i64) -> Bytes {
162 Bytes::copy_from_slice(&value.to_be_bytes())
163 }
164 fn deserialize(&self, _topic: &str, bytes: &[u8]) -> Result<i64, SerdeError> {
165 let arr: [u8; 8] = bytes
166 .try_into()
167 .map_err(|_| SerdeError(format!("expected 8 bytes, got {}", bytes.len())))?;
168 Ok(i64::from_be_bytes(arr))
169 }
170}
171
172impl SerdeAssociate for I64Serde {
173 type Target = i64;
174}
175
176#[diagnostic::on_unimplemented(
193 message = "the type `{Self}` does not have a default Serde implementation",
194 label = "no default Serde association found for this type",
195 note = "implement `DefaultSerde` for `{Self}` or specify/override the serde explicitly using `.with_key_serde()` or `.with_value_serde()`"
196)]
197pub trait DefaultSerde: Send + Sync + 'static + Sized {
198 type Serde: SerdeAssociate<Target = Self> + Serde<Self> + Clone + Default + 'static;
199}
200
201impl DefaultSerde for String {
202 type Serde = StringSerde;
203}
204
205impl DefaultSerde for i64 {
206 type Serde = I64Serde;
207}
208
209impl DefaultSerde for bytes::Bytes {
210 type Serde = BytesSerde;
211}
212
213impl<KS, VS> From<(KS, VS)> for Consumed<KS, VS> {
214 fn from((key_serde, value_serde): (KS, VS)) -> Self {
215 Self::with(key_serde, value_serde)
216 }
217}
218
219impl<KS, VS> From<(KS, VS)> for Produced<KS, VS> {
220 fn from((key_serde, value_serde): (KS, VS)) -> Self {
221 Self::with(key_serde, value_serde)
222 }
223}
224
/// Appends `n` to `buf` as a zig-zag encoded base-128 varint.
///
/// Because `n` is non-negative, its zig-zag form is simply `2 * n`. That value is emitted
/// 7 bits at a time, least-significant group first, with the high bit set on every byte
/// except the last.
fn write_varint(n: usize, buf: &mut Vec<u8>) {
    let mut rest = (n as u64) << 1;
    while rest >= 0x80 {
        #[allow(clippy::cast_possible_truncation)] // masked to the low 7 bits first
        buf.push(((rest & 0x7F) as u8) | 0x80);
        rest >>= 7;
    }
    #[allow(clippy::cast_possible_truncation)] // loop exit guarantees rest < 0x80
    buf.push(rest as u8);
}
244
245fn read_varint(bytes: &[u8]) -> Result<(usize, usize), SerdeError> {
249 let mut value: u64 = 0;
250 let mut shift = 0u32;
251 let mut consumed = 0;
252 for &b in bytes {
253 consumed += 1;
254 let low7 = u64::from(b & 0x7F);
255 value |= low7 << shift;
256 shift += 7;
257 if b & 0x80 == 0 {
258 let decoded = (value >> 1) ^ ((value & 1).wrapping_neg());
260 let n = usize::try_from(decoded)
261 .map_err(|_| SerdeError(format!("varint value {decoded} out of usize range")))?;
262 return Ok((n, consumed));
263 }
264 if shift >= 64 {
265 return Err(SerdeError("varint overflow".to_string()));
266 }
267 }
268 Err(SerdeError("truncated varint".to_string()))
269}
270
/// Serde adapter for `Change<V>` (an optional old/new value pair) built on an inner serde
/// for `V`.
///
/// The `tests` module compares its output against JVM-produced golden bytes.
#[derive(Debug, Clone, Copy)]
pub struct Changed<S> {
    inner: S,
}

impl<S> Changed<S> {
    /// Wraps `inner`, the serde applied to each individual old or new value.
    #[must_use]
    pub fn new(inner: S) -> Self {
        Changed { inner }
    }
}
296
297impl<V, S> Serde<crate::dsl::processors::change::Change<V>> for Changed<S>
298where
299 V: Send + Sync + 'static,
300 S: Serde<V>,
301{
302 fn serialize(&self, topic: &str, value: &crate::dsl::processors::change::Change<V>) -> Bytes {
303 match (&value.new, &value.old) {
304 (Some(new), Some(old)) => {
305 let new_bytes = self.inner.serialize(topic, new);
306 let old_bytes = self.inner.serialize(topic, old);
307 let mut buf = Vec::with_capacity(1 + new_bytes.len() + old_bytes.len() + 1);
308 write_varint(new_bytes.len(), &mut buf);
309 buf.extend_from_slice(&new_bytes);
310 buf.extend_from_slice(&old_bytes);
311 buf.push(0x02);
312 Bytes::from(buf)
313 }
314 (Some(new), None) => {
315 let new_bytes = self.inner.serialize(topic, new);
316 let mut buf = Vec::with_capacity(new_bytes.len() + 1);
317 buf.extend_from_slice(&new_bytes);
318 buf.push(0x01);
319 Bytes::from(buf)
320 }
321 (None, Some(old)) => {
322 let old_bytes = self.inner.serialize(topic, old);
323 let mut buf = Vec::with_capacity(old_bytes.len() + 1);
324 buf.extend_from_slice(&old_bytes);
325 buf.push(0x00);
326 Bytes::from(buf)
327 }
328 (None, None) => unreachable!("Changed serde never carries a both-None change"),
333 }
334 }
335
336 fn deserialize(
337 &self,
338 topic: &str,
339 bytes: &[u8],
340 ) -> Result<crate::dsl::processors::change::Change<V>, SerdeError> {
341 if bytes.is_empty() {
342 return Err(SerdeError("empty bytes for Change".to_string()));
343 }
344 let flag = bytes[bytes.len() - 1];
345 let payload = &bytes[..bytes.len() - 1];
346 match flag {
347 0x00 => {
348 let old = self.inner.deserialize(topic, payload)?;
349 Ok(crate::dsl::processors::change::Change {
350 old: Some(old),
351 new: None,
352 })
353 }
354 0x01 => {
355 let new = self.inner.deserialize(topic, payload)?;
356 Ok(crate::dsl::processors::change::Change {
357 old: None,
358 new: Some(new),
359 })
360 }
361 0x02 => {
362 let (new_len, consumed) = read_varint(payload)?;
363 let after_varint = &payload[consumed..];
364 if after_varint.len() < new_len {
365 return Err(SerdeError(format!(
366 "not enough bytes for new value: need {new_len}, have {}",
367 after_varint.len()
368 )));
369 }
370 let new_bytes = &after_varint[..new_len];
371 let old_bytes = &after_varint[new_len..];
372 let new = self.inner.deserialize(topic, new_bytes)?;
373 let old = self.inner.deserialize(topic, old_bytes)?;
374 Ok(crate::dsl::processors::change::Change {
375 old: Some(old),
376 new: Some(new),
377 })
378 }
379 other => Err(SerdeError(format!("unknown Change flag: 0x{other:02x}"))),
380 }
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dsl::processors::change::Change;
    use assert2::check;

    #[test]
    fn string_serde_round_trips() {
        let s = StringSerde;
        let b = s.serialize("t", &"héllo".to_string());
        check!(s.deserialize("t", &b).unwrap() == "héllo");
    }

    #[test]
    fn string_serde_rejects_invalid_utf8() {
        check!(StringSerde.deserialize("t", &[0xff, 0xfe]).is_err());
    }

    #[test]
    fn i64_serde_is_big_endian_8_bytes() {
        let s = I64Serde;
        let b = s.serialize("t", &1i64);
        check!(b.as_ref() == [0, 0, 0, 0, 0, 0, 0, 1]);
        check!(s.deserialize("t", &b).unwrap() == 1);
        check!(s.deserialize("t", &[0, 1]).is_err());
    }

    #[test]
    fn bytes_serde_is_identity() {
        let s = BytesSerde;
        let b = s.serialize("t", &bytes::Bytes::from_static(b"xy"));
        check!(s.deserialize("t", &b).unwrap() == bytes::Bytes::from_static(b"xy"));
    }

    /// Lowercase hex rendering used to compare against the golden file.
    fn hex(b: &[u8]) -> String {
        b.iter().fold(String::new(), |mut s, x| {
            use std::fmt::Write as _;
            write!(s, "{x:02x}").unwrap();
            s
        })
    }

    /// Loads the JVM-produced golden encodings for `Changed<I64Serde>`.
    fn golden() -> serde_json::Value {
        let raw = std::fs::read_to_string(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/testdata/kgrouped_table/changed_bytes.json"
        ))
        .expect("read changed_bytes golden");
        serde_json::from_str(&raw).unwrap()
    }

    #[test]
    fn changed_long_matches_jvm_bytes() {
        let g = golden();
        let s = Changed::new(I64Serde);
        let both = Change {
            old: Some(2i64),
            new: Some(6i64),
        };
        let new_only = Change {
            old: None,
            new: Some(5i64),
        };
        let old_only = Change {
            old: Some(4i64),
            new: None,
        };
        check!(hex(&s.serialize("topic", &both)) == g["both"].as_str().unwrap());
        check!(hex(&s.serialize("topic", &new_only)) == g["new_only"].as_str().unwrap());
        check!(hex(&s.serialize("topic", &old_only)) == g["old_only"].as_str().unwrap());
    }

    #[test]
    fn changed_round_trips() {
        let s = Changed::new(I64Serde);
        for c in [
            Change {
                old: Some(2i64),
                new: Some(6i64),
            },
            Change {
                old: None,
                new: Some(5i64),
            },
            Change {
                old: Some(4i64),
                new: None,
            },
        ] {
            let bytes = s.serialize("topic", &c);
            let back: Change<i64> = s.deserialize("topic", &bytes).unwrap();
            check!(back == c);
        }
    }

    #[test]
    fn changed_round_trips_multi_byte_length_prefix() {
        let s = Changed::new(StringSerde);
        let c = Change {
            old: Some("old".to_string()),
            new: Some("n".repeat(200)),
        };
        let bytes = s.serialize("topic", &c);
        // 200 zig-zags to 400, which needs two varint bytes: 0x90 0x03.
        check!(&bytes[..2] == [0x90, 0x03]);
        check!(bytes.len() == 2 + 200 + 3 + 1);
        check!(bytes[bytes.len() - 1] == 0x02);
        let back: Change<String> = s.deserialize("topic", &bytes).unwrap();
        check!(back == c);
    }

    #[test]
    fn changed_rejects_malformed_input() {
        fn de(bytes: &[u8]) -> Result<Change<i64>, SerdeError> {
            Changed::new(I64Serde).deserialize("topic", bytes)
        }
        // No trailing flag byte at all.
        check!(de(&[]).is_err());
        // Unknown flag value.
        check!(de(&[0x07]).is_err());
        // Both-values flag, but the prefix (zig-zag 8 => 4) exceeds the 1 remaining byte.
        check!(de(&[0x08, 0x00, 0x02]).is_err());
        // Old-only flag with a 3-byte payload, which I64Serde rejects.
        check!(de(&[0x00, 0x01, 0x02, 0x00]).is_err());
    }
}