Skip to main content

s2_api/v1/stream/
json.rs

1use base64ct::{Base64, Encoding as _};
2use s2_common::{record, types};
3use serde::{
4    Serialize,
5    ser::{SerializeSeq, SerializeStruct, SerializeTuple},
6};
7
8use crate::data::Format;
9
10pub fn serialize_read_batch(
11    format: Format,
12    batch: &types::stream::ReadBatch,
13) -> impl Serialize + '_ {
14    ReadBatchJson { format, batch }
15}
16
17struct ReadBatchJson<'a> {
18    format: Format,
19    batch: &'a types::stream::ReadBatch,
20}
21
22impl Serialize for ReadBatchJson<'_> {
23    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
24    where
25        S: serde::Serializer,
26    {
27        let mut state =
28            serializer.serialize_struct("ReadBatch", 1 + usize::from(self.batch.tail.is_some()))?;
29        state.serialize_field(
30            "records",
31            &RecordsJson {
32                format: self.format,
33                records: self.batch.records.as_slice(),
34            },
35        )?;
36        if let Some(tail) = self.batch.tail {
37            state.serialize_field("tail", &StreamPositionJson(tail))?;
38        }
39        state.end()
40    }
41}
42
43struct RecordsJson<'a> {
44    format: Format,
45    records: &'a [record::SequencedRecord],
46}
47
48impl Serialize for RecordsJson<'_> {
49    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50    where
51        S: serde::Serializer,
52    {
53        let mut seq = serializer.serialize_seq(Some(self.records.len()))?;
54        for record in self.records {
55            seq.serialize_element(&RecordJson {
56                format: self.format,
57                record,
58            })?;
59        }
60        seq.end()
61    }
62}
63
64struct RecordJson<'a> {
65    format: Format,
66    record: &'a record::SequencedRecord,
67}
68
69impl Serialize for RecordJson<'_> {
70    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71    where
72        S: serde::Serializer,
73    {
74        // Some records omit `headers` and/or `body`, but `serde_json` does not rely on an exact
75        // field count here, so keep the fixed upper bound and avoid extra bookkeeping.
76        let mut state = serializer.serialize_struct("SequencedRecord", 4)?;
77        state.serialize_field("seq_num", &self.record.position.seq_num)?;
78        state.serialize_field("timestamp", &self.record.position.timestamp)?;
79        match &self.record.record {
80            record::Record::Command(command) => {
81                state.serialize_field(
82                    "headers",
83                    &CommandHeadersJson {
84                        format: self.format,
85                        command,
86                    },
87                )?;
88                match command {
89                    record::CommandRecord::Fence(token) => {
90                        if !token.is_empty() {
91                            state.serialize_field(
92                                "body",
93                                &FormattedBytes {
94                                    format: self.format,
95                                    bytes: token.as_bytes(),
96                                },
97                            )?;
98                        }
99                    }
100                    record::CommandRecord::Trim(trim_point) => {
101                        let bytes = trim_point.to_be_bytes();
102                        state.serialize_field(
103                            "body",
104                            &FormattedBytes {
105                                format: self.format,
106                                bytes: &bytes,
107                            },
108                        )?;
109                    }
110                }
111            }
112            record::Record::Envelope(envelope) => {
113                if !envelope.headers().is_empty() {
114                    state.serialize_field(
115                        "headers",
116                        &HeadersJson {
117                            format: self.format,
118                            headers: envelope.headers(),
119                        },
120                    )?;
121                }
122                if !envelope.body().is_empty() {
123                    state.serialize_field(
124                        "body",
125                        &FormattedBytes {
126                            format: self.format,
127                            bytes: envelope.body().as_ref(),
128                        },
129                    )?;
130                }
131            }
132        }
133        state.end()
134    }
135}
136
137struct HeadersJson<'a> {
138    format: Format,
139    headers: &'a [record::Header],
140}
141
142impl Serialize for HeadersJson<'_> {
143    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
144    where
145        S: serde::Serializer,
146    {
147        let mut seq = serializer.serialize_seq(Some(self.headers.len()))?;
148        for header in self.headers {
149            seq.serialize_element(&HeaderJson {
150                format: self.format,
151                header,
152            })?;
153        }
154        seq.end()
155    }
156}
157
158struct HeaderJson<'a> {
159    format: Format,
160    header: &'a record::Header,
161}
162
163impl Serialize for HeaderJson<'_> {
164    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
165    where
166        S: serde::Serializer,
167    {
168        let mut tuple = serializer.serialize_tuple(2)?;
169        tuple.serialize_element(&FormattedBytes {
170            format: self.format,
171            bytes: self.header.name.as_ref(),
172        })?;
173        tuple.serialize_element(&FormattedBytes {
174            format: self.format,
175            bytes: self.header.value.as_ref(),
176        })?;
177        tuple.end()
178    }
179}
180
181struct CommandHeadersJson<'a> {
182    format: Format,
183    command: &'a record::CommandRecord,
184}
185
186impl Serialize for CommandHeadersJson<'_> {
187    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
188    where
189        S: serde::Serializer,
190    {
191        let mut seq = serializer.serialize_seq(Some(1))?;
192        seq.serialize_element(&CommandHeaderJson {
193            format: self.format,
194            command: self.command,
195        })?;
196        seq.end()
197    }
198}
199
200struct CommandHeaderJson<'a> {
201    format: Format,
202    command: &'a record::CommandRecord,
203}
204
205impl Serialize for CommandHeaderJson<'_> {
206    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
207    where
208        S: serde::Serializer,
209    {
210        let mut tuple = serializer.serialize_tuple(2)?;
211        tuple.serialize_element(&FormattedBytes {
212            format: self.format,
213            bytes: b"",
214        })?;
215        tuple.serialize_element(&FormattedBytes {
216            format: self.format,
217            bytes: self.command.op().to_id(),
218        })?;
219        tuple.end()
220    }
221}
222
223struct StreamPositionJson(record::StreamPosition);
224
225impl Serialize for StreamPositionJson {
226    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
227    where
228        S: serde::Serializer,
229    {
230        let mut state = serializer.serialize_struct("StreamPosition", 2)?;
231        state.serialize_field("seq_num", &self.0.seq_num)?;
232        state.serialize_field("timestamp", &self.0.timestamp)?;
233        state.end()
234    }
235}
236
237struct FormattedBytes<'a> {
238    format: Format,
239    bytes: &'a [u8],
240}
241
242impl Serialize for FormattedBytes<'_> {
243    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
244    where
245        S: serde::Serializer,
246    {
247        match self.format {
248            Format::Raw => serializer.collect_str(&LossyUtf8(self.bytes)),
249            Format::Base64 => serializer.collect_str(&Base64Display(self.bytes)),
250        }
251    }
252}
253
/// `Display` adapter that renders bytes as UTF-8, writing one U+FFFD REPLACEMENT CHARACTER per
/// invalid sequence. The output matches `String::from_utf8_lossy`, but is written straight into
/// the formatter.
struct LossyUtf8<'a>(&'a [u8]);

impl std::fmt::Display for LossyUtf8<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Write as _;

        self.0.utf8_chunks().try_for_each(|chunk| {
            f.write_str(chunk.valid())?;
            match chunk.invalid() {
                [] => Ok(()),
                _ => f.write_char(char::REPLACEMENT_CHARACTER),
            }
        })
    }
}
269
270struct Base64Display<'a>(&'a [u8]);
271
272impl std::fmt::Display for Base64Display<'_> {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        const INPUT_CHUNK: usize = 3 * 256;
275        const OUTPUT_CHUNK: usize = 4 * 256;
276
277        let mut output = [0u8; OUTPUT_CHUNK];
278        let mut chunks = self.0.chunks_exact(INPUT_CHUNK);
279        for chunk in &mut chunks {
280            let encoded = Base64::encode(chunk, &mut output).map_err(|_| std::fmt::Error)?;
281            f.write_str(encoded)?;
282        }
283
284        let remainder = chunks.remainder();
285        if !remainder.is_empty() {
286            let encoded_len = Base64::encoded_len(remainder);
287            let encoded = Base64::encode(remainder, &mut output[..encoded_len])
288                .map_err(|_| std::fmt::Error)?;
289            f.write_str(encoded)?;
290        }
291
292        Ok(())
293    }
294}
295
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::v1::stream::ReadBatch;

    /// Builds a batch covering each record shape the serializer distinguishes:
    /// - an envelope whose header value and body are not valid UTF-8;
    /// - an empty fence and a non-empty fence;
    /// - a trim command.
    ///
    /// The batch also carries a tail.
    fn fixture_batch() -> types::stream::ReadBatch {
        let at = |rec: record::Record, seq_num, timestamp| {
            record::Metered::from(rec).sequenced(record::StreamPosition { seq_num, timestamp })
        };
        let fence = |token: &str| {
            record::Record::Command(record::CommandRecord::Fence(
                token.parse().expect("valid token"),
            ))
        };

        let envelope = record::Record::try_from_parts(
            vec![record::Header {
                name: Bytes::from_static(b"kind"),
                value: Bytes::from(vec![b'a', 0xff, b'z']),
            }],
            Bytes::from(vec![0xf0, 0x28, 0x8c, 0xbc]),
        )
        .expect("valid envelope");
        let trim = record::Record::Command(record::CommandRecord::Trim(42));

        types::stream::ReadBatch {
            records: vec![
                at(envelope, 7, 11),
                at(fence(""), 8, 12),
                at(fence("token-1"), 9, 13),
                at(trim, 10, 14),
            ]
            .into_iter()
            .collect(),
            tail: Some(record::StreamPosition {
                seq_num: 11,
                timestamp: 15,
            }),
        }
    }

    #[test]
    fn serialized_batch_matches_existing_json_shape() {
        let batch = fixture_batch();

        for format in [Format::Raw, Format::Base64] {
            let legacy =
                serde_json::to_value(ReadBatch::encode(format, batch.clone())).expect("json");
            let streamed =
                serde_json::to_value(serialize_read_batch(format, &batch)).expect("json");
            assert_eq!(streamed, legacy);
        }
    }
}