1use base64ct::{Base64, Encoding as _};
2use s2_common::{record, types};
3use serde::{
4 Serialize,
5 ser::{SerializeSeq, SerializeStruct, SerializeTuple},
6};
7
8use crate::data::Format;
9
10pub fn serialize_read_batch(
11 format: Format,
12 batch: &types::stream::ReadBatch,
13) -> impl Serialize + '_ {
14 ReadBatchJson { format, batch }
15}
16
17struct ReadBatchJson<'a> {
18 format: Format,
19 batch: &'a types::stream::ReadBatch,
20}
21
22impl Serialize for ReadBatchJson<'_> {
23 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
24 where
25 S: serde::Serializer,
26 {
27 let mut state =
28 serializer.serialize_struct("ReadBatch", 1 + usize::from(self.batch.tail.is_some()))?;
29 state.serialize_field(
30 "records",
31 &RecordsJson {
32 format: self.format,
33 records: self.batch.records.as_slice(),
34 },
35 )?;
36 if let Some(tail) = self.batch.tail {
37 state.serialize_field("tail", &StreamPositionJson(tail))?;
38 }
39 state.end()
40 }
41}
42
43struct RecordsJson<'a> {
44 format: Format,
45 records: &'a [record::SequencedRecord],
46}
47
48impl Serialize for RecordsJson<'_> {
49 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50 where
51 S: serde::Serializer,
52 {
53 let mut seq = serializer.serialize_seq(Some(self.records.len()))?;
54 for record in self.records {
55 seq.serialize_element(&RecordJson {
56 format: self.format,
57 record,
58 })?;
59 }
60 seq.end()
61 }
62}
63
64struct RecordJson<'a> {
65 format: Format,
66 record: &'a record::SequencedRecord,
67}
68
69impl Serialize for RecordJson<'_> {
70 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71 where
72 S: serde::Serializer,
73 {
74 let mut state = serializer.serialize_struct("SequencedRecord", 4)?;
77 state.serialize_field("seq_num", &self.record.position.seq_num)?;
78 state.serialize_field("timestamp", &self.record.position.timestamp)?;
79 match &self.record.record {
80 record::Record::Command(command) => {
81 state.serialize_field(
82 "headers",
83 &CommandHeadersJson {
84 format: self.format,
85 command,
86 },
87 )?;
88 match command {
89 record::CommandRecord::Fence(token) => {
90 if !token.is_empty() {
91 state.serialize_field(
92 "body",
93 &FormattedBytes {
94 format: self.format,
95 bytes: token.as_bytes(),
96 },
97 )?;
98 }
99 }
100 record::CommandRecord::Trim(trim_point) => {
101 let bytes = trim_point.to_be_bytes();
102 state.serialize_field(
103 "body",
104 &FormattedBytes {
105 format: self.format,
106 bytes: &bytes,
107 },
108 )?;
109 }
110 }
111 }
112 record::Record::Envelope(envelope) => {
113 if !envelope.headers().is_empty() {
114 state.serialize_field(
115 "headers",
116 &HeadersJson {
117 format: self.format,
118 headers: envelope.headers(),
119 },
120 )?;
121 }
122 if !envelope.body().is_empty() {
123 state.serialize_field(
124 "body",
125 &FormattedBytes {
126 format: self.format,
127 bytes: envelope.body().as_ref(),
128 },
129 )?;
130 }
131 }
132 }
133 state.end()
134 }
135}
136
137struct HeadersJson<'a> {
138 format: Format,
139 headers: &'a [record::Header],
140}
141
142impl Serialize for HeadersJson<'_> {
143 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
144 where
145 S: serde::Serializer,
146 {
147 let mut seq = serializer.serialize_seq(Some(self.headers.len()))?;
148 for header in self.headers {
149 seq.serialize_element(&HeaderJson {
150 format: self.format,
151 header,
152 })?;
153 }
154 seq.end()
155 }
156}
157
158struct HeaderJson<'a> {
159 format: Format,
160 header: &'a record::Header,
161}
162
163impl Serialize for HeaderJson<'_> {
164 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
165 where
166 S: serde::Serializer,
167 {
168 let mut tuple = serializer.serialize_tuple(2)?;
169 tuple.serialize_element(&FormattedBytes {
170 format: self.format,
171 bytes: self.header.name.as_ref(),
172 })?;
173 tuple.serialize_element(&FormattedBytes {
174 format: self.format,
175 bytes: self.header.value.as_ref(),
176 })?;
177 tuple.end()
178 }
179}
180
181struct CommandHeadersJson<'a> {
182 format: Format,
183 command: &'a record::CommandRecord,
184}
185
186impl Serialize for CommandHeadersJson<'_> {
187 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
188 where
189 S: serde::Serializer,
190 {
191 let mut seq = serializer.serialize_seq(Some(1))?;
192 seq.serialize_element(&CommandHeaderJson {
193 format: self.format,
194 command: self.command,
195 })?;
196 seq.end()
197 }
198}
199
200struct CommandHeaderJson<'a> {
201 format: Format,
202 command: &'a record::CommandRecord,
203}
204
205impl Serialize for CommandHeaderJson<'_> {
206 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
207 where
208 S: serde::Serializer,
209 {
210 let mut tuple = serializer.serialize_tuple(2)?;
211 tuple.serialize_element(&FormattedBytes {
212 format: self.format,
213 bytes: b"",
214 })?;
215 tuple.serialize_element(&FormattedBytes {
216 format: self.format,
217 bytes: self.command.op().to_id(),
218 })?;
219 tuple.end()
220 }
221}
222
223struct StreamPositionJson(record::StreamPosition);
224
225impl Serialize for StreamPositionJson {
226 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
227 where
228 S: serde::Serializer,
229 {
230 let mut state = serializer.serialize_struct("StreamPosition", 2)?;
231 state.serialize_field("seq_num", &self.0.seq_num)?;
232 state.serialize_field("timestamp", &self.0.timestamp)?;
233 state.end()
234 }
235}
236
237struct FormattedBytes<'a> {
238 format: Format,
239 bytes: &'a [u8],
240}
241
242impl Serialize for FormattedBytes<'_> {
243 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
244 where
245 S: serde::Serializer,
246 {
247 match self.format {
248 Format::Raw => serializer.collect_str(&LossyUtf8(self.bytes)),
249 Format::Base64 => serializer.collect_str(&Base64Display(self.bytes)),
250 }
251 }
252}
253
/// Displays a byte slice as UTF-8 text, writing one U+FFFD REPLACEMENT CHARACTER for
/// each invalid sequence. The output matches [`String::from_utf8_lossy`], but it is
/// written straight to the formatter without building an intermediate string.
struct LossyUtf8<'a>(&'a [u8]);

impl std::fmt::Display for LossyUtf8<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Write as _;

        // Each chunk is a (possibly empty) valid run followed by a (possibly empty)
        // invalid run; the invalid run collapses to a single replacement character.
        self.0.utf8_chunks().try_for_each(|chunk| {
            f.write_str(chunk.valid())?;
            match chunk.invalid() {
                [] => Ok(()),
                _ => f.write_char(char::REPLACEMENT_CHARACTER),
            }
        })
    }
}
269
270struct Base64Display<'a>(&'a [u8]);
271
272impl std::fmt::Display for Base64Display<'_> {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 const INPUT_CHUNK: usize = 3 * 256;
275 const OUTPUT_CHUNK: usize = 4 * 256;
276
277 let mut output = [0u8; OUTPUT_CHUNK];
278 let mut chunks = self.0.chunks_exact(INPUT_CHUNK);
279 for chunk in &mut chunks {
280 let encoded = Base64::encode(chunk, &mut output).map_err(|_| std::fmt::Error)?;
281 f.write_str(encoded)?;
282 }
283
284 let remainder = chunks.remainder();
285 if !remainder.is_empty() {
286 let encoded_len = Base64::encoded_len(remainder);
287 let encoded = Base64::encode(remainder, &mut output[..encoded_len])
288 .map_err(|_| std::fmt::Error)?;
289 f.write_str(encoded)?;
290 }
291
292 Ok(())
293 }
294}
295
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::v1::stream::ReadBatch;

    /// Small batch covering an envelope with invalid UTF-8, an empty fence, a non-empty
    /// fence and a trim command, followed by a tail position.
    fn fixture_batch() -> types::stream::ReadBatch {
        let envelope = record::Record::try_from_parts(
            vec![record::Header {
                name: Bytes::from_static(b"kind"),
                value: Bytes::from(vec![b'a', 0xff, b'z']),
            }],
            Bytes::from(vec![0xf0, 0x28, 0x8c, 0xbc]),
        )
        .expect("valid envelope");

        let empty_fence = record::Record::Command(record::CommandRecord::Fence(
            "".parse().expect("valid token"),
        ));

        let non_empty_fence = record::Record::Command(record::CommandRecord::Fence(
            "token-1".parse().expect("valid token"),
        ));

        let trim = record::Record::Command(record::CommandRecord::Trim(42));

        types::stream::ReadBatch {
            records: vec![
                record::Metered::from(envelope).sequenced(record::StreamPosition {
                    seq_num: 7,
                    timestamp: 11,
                }),
                record::Metered::from(empty_fence).sequenced(record::StreamPosition {
                    seq_num: 8,
                    timestamp: 12,
                }),
                record::Metered::from(non_empty_fence).sequenced(record::StreamPosition {
                    seq_num: 9,
                    timestamp: 13,
                }),
                record::Metered::from(trim).sequenced(record::StreamPosition {
                    seq_num: 10,
                    timestamp: 14,
                }),
            ]
            .into_iter()
            .collect(),
            tail: Some(record::StreamPosition {
                seq_num: 11,
                timestamp: 15,
            }),
        }
    }

    /// Builds a single-envelope batch whose body is `len` bytes cycling through every
    /// byte value, so the body mixes valid ASCII with invalid UTF-8.
    fn body_batch(len: usize) -> types::stream::ReadBatch {
        let body: Vec<u8> = (0..=255u8).cycle().take(len).collect();
        let envelope = record::Record::try_from_parts(
            vec![record::Header {
                name: Bytes::from_static(b"kind"),
                value: Bytes::from_static(b"bulk"),
            }],
            Bytes::from(body),
        )
        .expect("valid envelope");

        types::stream::ReadBatch {
            records: vec![
                record::Metered::from(envelope).sequenced(record::StreamPosition {
                    seq_num: 0,
                    timestamp: 0,
                }),
            ]
            .into_iter()
            .collect(),
            tail: Some(record::StreamPosition {
                seq_num: 1,
                timestamp: 1,
            }),
        }
    }

    /// Asserts that, in every format, the borrowed serializer produces exactly the same
    /// JSON value as the reference `ReadBatch::encode` path.
    fn assert_matches_reference(batch: &types::stream::ReadBatch) {
        for format in [Format::Raw, Format::Base64] {
            let expected =
                serde_json::to_value(ReadBatch::encode(format, batch.clone())).expect("json");
            let actual = serde_json::to_value(serialize_read_batch(format, batch)).expect("json");
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn serialized_batch_matches_existing_json_shape() {
        assert_matches_reference(&fixture_batch());
    }

    #[test]
    fn large_bodies_match_across_base64_chunk_boundaries() {
        // `Base64Display` encodes in 768-byte input chunks. Cover lengths on both sides
        // of one and two chunk boundaries, plus a multi-chunk body with a partial tail.
        for len in [1, 767, 768, 769, 1536, 2000] {
            assert_matches_reference(&body_batch(len));
        }
    }

    #[test]
    fn lossy_utf8_matches_std() {
        let bytes: &[u8] = &[b'o', b'k', 0xff, 0xf0, 0x28, 0x8c, 0xbc, 0xe2, 0x82];
        assert_eq!(
            LossyUtf8(bytes).to_string(),
            String::from_utf8_lossy(bytes).into_owned()
        );
    }
}