Skip to main content

s2_api/v1/stream/
json.rs

1use base64ct::{Base64, Encoding as _};
2use s2_common::{record, types};
3use serde::{
4    Serialize,
5    ser::{SerializeSeq, SerializeStruct, SerializeTuple},
6};
7
8use crate::data::Format;
9
10pub fn serialize_read_batch(
11    format: Format,
12    batch: &types::stream::ReadBatch,
13) -> impl Serialize + '_ {
14    ReadBatchJson { format, batch }
15}
16
17struct ReadBatchJson<'a> {
18    format: Format,
19    batch: &'a types::stream::ReadBatch,
20}
21
22impl Serialize for ReadBatchJson<'_> {
23    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
24    where
25        S: serde::Serializer,
26    {
27        let mut state =
28            serializer.serialize_struct("ReadBatch", 1 + usize::from(self.batch.tail.is_some()))?;
29        state.serialize_field(
30            "records",
31            &RecordsJson {
32                format: self.format,
33                records: self.batch.records.as_slice(),
34            },
35        )?;
36        if let Some(tail) = self.batch.tail {
37            state.serialize_field("tail", &StreamPositionJson(tail))?;
38        }
39        state.end()
40    }
41}
42
43struct RecordsJson<'a> {
44    format: Format,
45    records: &'a [record::SequencedRecord],
46}
47
48impl Serialize for RecordsJson<'_> {
49    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50    where
51        S: serde::Serializer,
52    {
53        let mut seq = serializer.serialize_seq(Some(self.records.len()))?;
54        for record in self.records {
55            seq.serialize_element(&RecordJson {
56                format: self.format,
57                record,
58            })?;
59        }
60        seq.end()
61    }
62}
63
64struct RecordJson<'a> {
65    format: Format,
66    record: &'a record::SequencedRecord,
67}
68
69impl Serialize for RecordJson<'_> {
70    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71    where
72        S: serde::Serializer,
73    {
74        // Some records omit `headers` and/or `body`, but `serde_json` does not rely on an exact
75        // field count here, so keep the fixed upper bound and avoid extra bookkeeping.
76        let mut state = serializer.serialize_struct("SequencedRecord", 4)?;
77        let position = self.record.position();
78        state.serialize_field("seq_num", &position.seq_num)?;
79        state.serialize_field("timestamp", &position.timestamp)?;
80        match self.record.inner() {
81            record::Record::Command(command) => {
82                state.serialize_field(
83                    "headers",
84                    &CommandHeadersJson {
85                        format: self.format,
86                        command,
87                    },
88                )?;
89                match command {
90                    record::CommandRecord::Fence(token) => {
91                        if !token.is_empty() {
92                            state.serialize_field(
93                                "body",
94                                &FormattedBytes {
95                                    format: self.format,
96                                    bytes: token.as_bytes(),
97                                },
98                            )?;
99                        }
100                    }
101                    record::CommandRecord::Trim(trim_point) => {
102                        let bytes = trim_point.to_be_bytes();
103                        state.serialize_field(
104                            "body",
105                            &FormattedBytes {
106                                format: self.format,
107                                bytes: &bytes,
108                            },
109                        )?;
110                    }
111                }
112            }
113            record::Record::Envelope(envelope) => {
114                if !envelope.headers().is_empty() {
115                    state.serialize_field(
116                        "headers",
117                        &HeadersJson {
118                            format: self.format,
119                            headers: envelope.headers(),
120                        },
121                    )?;
122                }
123                if !envelope.body().is_empty() {
124                    state.serialize_field(
125                        "body",
126                        &FormattedBytes {
127                            format: self.format,
128                            bytes: envelope.body().as_ref(),
129                        },
130                    )?;
131                }
132            }
133        }
134        state.end()
135    }
136}
137
138struct HeadersJson<'a> {
139    format: Format,
140    headers: &'a [record::Header],
141}
142
143impl Serialize for HeadersJson<'_> {
144    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
145    where
146        S: serde::Serializer,
147    {
148        let mut seq = serializer.serialize_seq(Some(self.headers.len()))?;
149        for header in self.headers {
150            seq.serialize_element(&HeaderJson {
151                format: self.format,
152                header,
153            })?;
154        }
155        seq.end()
156    }
157}
158
159struct HeaderJson<'a> {
160    format: Format,
161    header: &'a record::Header,
162}
163
164impl Serialize for HeaderJson<'_> {
165    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
166    where
167        S: serde::Serializer,
168    {
169        let mut tuple = serializer.serialize_tuple(2)?;
170        tuple.serialize_element(&FormattedBytes {
171            format: self.format,
172            bytes: self.header.name.as_ref(),
173        })?;
174        tuple.serialize_element(&FormattedBytes {
175            format: self.format,
176            bytes: self.header.value.as_ref(),
177        })?;
178        tuple.end()
179    }
180}
181
182struct CommandHeadersJson<'a> {
183    format: Format,
184    command: &'a record::CommandRecord,
185}
186
187impl Serialize for CommandHeadersJson<'_> {
188    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
189    where
190        S: serde::Serializer,
191    {
192        let mut seq = serializer.serialize_seq(Some(1))?;
193        seq.serialize_element(&CommandHeaderJson {
194            format: self.format,
195            command: self.command,
196        })?;
197        seq.end()
198    }
199}
200
201struct CommandHeaderJson<'a> {
202    format: Format,
203    command: &'a record::CommandRecord,
204}
205
206impl Serialize for CommandHeaderJson<'_> {
207    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
208    where
209        S: serde::Serializer,
210    {
211        let mut tuple = serializer.serialize_tuple(2)?;
212        tuple.serialize_element(&FormattedBytes {
213            format: self.format,
214            bytes: b"",
215        })?;
216        tuple.serialize_element(&FormattedBytes {
217            format: self.format,
218            bytes: self.command.op().to_id(),
219        })?;
220        tuple.end()
221    }
222}
223
224struct StreamPositionJson(record::StreamPosition);
225
226impl Serialize for StreamPositionJson {
227    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
228    where
229        S: serde::Serializer,
230    {
231        let mut state = serializer.serialize_struct("StreamPosition", 2)?;
232        state.serialize_field("seq_num", &self.0.seq_num)?;
233        state.serialize_field("timestamp", &self.0.timestamp)?;
234        state.end()
235    }
236}
237
238struct FormattedBytes<'a> {
239    format: Format,
240    bytes: &'a [u8],
241}
242
243impl Serialize for FormattedBytes<'_> {
244    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
245    where
246        S: serde::Serializer,
247    {
248        match self.format {
249            Format::Raw => serializer.collect_str(&LossyUtf8(self.bytes)),
250            Format::Base64 => serializer.collect_str(&Base64Display(self.bytes)),
251        }
252    }
253}
254
/// Displays a byte slice as text: valid UTF-8 runs are written unchanged and each
/// invalid sequence is written as a single U+FFFD replacement character.
struct LossyUtf8<'a>(&'a [u8]);

impl std::fmt::Display for LossyUtf8<'_> {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Write as _;

        self.0.utf8_chunks().try_for_each(|piece| {
            out.write_str(piece.valid())?;
            // A chunk with no invalid tail is either fully valid or the final chunk.
            if piece.invalid().is_empty() {
                Ok(())
            } else {
                out.write_char(char::REPLACEMENT_CHARACTER)
            }
        })
    }
}
270
271struct Base64Display<'a>(&'a [u8]);
272
273impl std::fmt::Display for Base64Display<'_> {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        const INPUT_CHUNK: usize = 3 * 256;
276        const OUTPUT_CHUNK: usize = 4 * 256;
277
278        let mut output = [0u8; OUTPUT_CHUNK];
279        let mut chunks = self.0.chunks_exact(INPUT_CHUNK);
280        for chunk in &mut chunks {
281            let encoded = Base64::encode(chunk, &mut output).map_err(|_| std::fmt::Error)?;
282            f.write_str(encoded)?;
283        }
284
285        let remainder = chunks.remainder();
286        if !remainder.is_empty() {
287            let encoded_len = Base64::encoded_len(remainder);
288            let encoded = Base64::encode(remainder, &mut output[..encoded_len])
289                .map_err(|_| std::fmt::Error)?;
290            f.write_str(encoded)?;
291        }
292
293        Ok(())
294    }
295}
296
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use s2_common::record::MeteredExt;

    use super::*;
    use crate::v1::stream::ReadBatch;

    /// Builds a batch with one record of each shape the serializer distinguishes:
    /// an envelope whose header value and body are not valid UTF-8, a fence with an
    /// empty token, a fence with a non-empty token, and a trim; plus a tail position.
    fn fixture_batch() -> types::stream::ReadBatch {
        let at = |seq_num, timestamp| record::StreamPosition { seq_num, timestamp };
        let fence = |token: &str| {
            record::Record::Command(record::CommandRecord::Fence(
                token.parse().expect("valid token"),
            ))
        };

        let header = record::Header {
            name: Bytes::from_static(b"kind"),
            value: Bytes::from(vec![b'a', 0xff, b'z']),
        };
        let envelope = record::Record::try_from_parts(
            vec![header],
            Bytes::from(vec![0xf0, 0x28, 0x8c, 0xbc]),
        )
        .expect("valid envelope");
        let trim = record::Record::Command(record::CommandRecord::Trim(42));

        let records = vec![
            envelope.metered().sequenced(at(7, 11)),
            fence("").metered().sequenced(at(8, 12)),
            fence("token-1").metered().sequenced(at(9, 13)),
            trim.metered().sequenced(at(10, 14)),
        ];

        types::stream::ReadBatch {
            records: records.into_iter().collect(),
            tail: Some(at(11, 15)),
        }
    }

    /// The borrowing serializer must produce the same JSON value as the existing
    /// `ReadBatch::encode` conversion, for every format.
    #[test]
    fn serialized_batch_matches_existing_json_shape() {
        let batch = fixture_batch();

        for format in [Format::Raw, Format::Base64] {
            let actual = serde_json::to_value(serialize_read_batch(format, &batch)).expect("json");
            let expected =
                serde_json::to_value(ReadBatch::encode(format, batch.clone())).expect("json");
            assert_eq!(actual, expected);
        }
    }
}