1use base64ct::{Base64, Encoding as _};
2use s2_common::{record, types};
3use serde::{
4 Serialize,
5 ser::{SerializeSeq, SerializeStruct, SerializeTuple},
6};
7
8use crate::data::Format;
9
10pub fn serialize_read_batch(
11 format: Format,
12 batch: &types::stream::ReadBatch,
13) -> impl Serialize + '_ {
14 ReadBatchJson { format, batch }
15}
16
17struct ReadBatchJson<'a> {
18 format: Format,
19 batch: &'a types::stream::ReadBatch,
20}
21
22impl Serialize for ReadBatchJson<'_> {
23 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
24 where
25 S: serde::Serializer,
26 {
27 let mut state =
28 serializer.serialize_struct("ReadBatch", 1 + usize::from(self.batch.tail.is_some()))?;
29 state.serialize_field(
30 "records",
31 &RecordsJson {
32 format: self.format,
33 records: self.batch.records.as_slice(),
34 },
35 )?;
36 if let Some(tail) = self.batch.tail {
37 state.serialize_field("tail", &StreamPositionJson(tail))?;
38 }
39 state.end()
40 }
41}
42
43struct RecordsJson<'a> {
44 format: Format,
45 records: &'a [record::SequencedRecord],
46}
47
48impl Serialize for RecordsJson<'_> {
49 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
50 where
51 S: serde::Serializer,
52 {
53 let mut seq = serializer.serialize_seq(Some(self.records.len()))?;
54 for record in self.records {
55 seq.serialize_element(&RecordJson {
56 format: self.format,
57 record,
58 })?;
59 }
60 seq.end()
61 }
62}
63
64struct RecordJson<'a> {
65 format: Format,
66 record: &'a record::SequencedRecord,
67}
68
69impl Serialize for RecordJson<'_> {
70 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71 where
72 S: serde::Serializer,
73 {
74 let mut state = serializer.serialize_struct("SequencedRecord", 4)?;
77 let position = self.record.position();
78 state.serialize_field("seq_num", &position.seq_num)?;
79 state.serialize_field("timestamp", &position.timestamp)?;
80 match self.record.inner() {
81 record::Record::Command(command) => {
82 state.serialize_field(
83 "headers",
84 &CommandHeadersJson {
85 format: self.format,
86 command,
87 },
88 )?;
89 match command {
90 record::CommandRecord::Fence(token) => {
91 if !token.is_empty() {
92 state.serialize_field(
93 "body",
94 &FormattedBytes {
95 format: self.format,
96 bytes: token.as_bytes(),
97 },
98 )?;
99 }
100 }
101 record::CommandRecord::Trim(trim_point) => {
102 let bytes = trim_point.to_be_bytes();
103 state.serialize_field(
104 "body",
105 &FormattedBytes {
106 format: self.format,
107 bytes: &bytes,
108 },
109 )?;
110 }
111 }
112 }
113 record::Record::Envelope(envelope) => {
114 if !envelope.headers().is_empty() {
115 state.serialize_field(
116 "headers",
117 &HeadersJson {
118 format: self.format,
119 headers: envelope.headers(),
120 },
121 )?;
122 }
123 if !envelope.body().is_empty() {
124 state.serialize_field(
125 "body",
126 &FormattedBytes {
127 format: self.format,
128 bytes: envelope.body().as_ref(),
129 },
130 )?;
131 }
132 }
133 }
134 state.end()
135 }
136}
137
138struct HeadersJson<'a> {
139 format: Format,
140 headers: &'a [record::Header],
141}
142
143impl Serialize for HeadersJson<'_> {
144 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
145 where
146 S: serde::Serializer,
147 {
148 let mut seq = serializer.serialize_seq(Some(self.headers.len()))?;
149 for header in self.headers {
150 seq.serialize_element(&HeaderJson {
151 format: self.format,
152 header,
153 })?;
154 }
155 seq.end()
156 }
157}
158
159struct HeaderJson<'a> {
160 format: Format,
161 header: &'a record::Header,
162}
163
164impl Serialize for HeaderJson<'_> {
165 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
166 where
167 S: serde::Serializer,
168 {
169 let mut tuple = serializer.serialize_tuple(2)?;
170 tuple.serialize_element(&FormattedBytes {
171 format: self.format,
172 bytes: self.header.name.as_ref(),
173 })?;
174 tuple.serialize_element(&FormattedBytes {
175 format: self.format,
176 bytes: self.header.value.as_ref(),
177 })?;
178 tuple.end()
179 }
180}
181
182struct CommandHeadersJson<'a> {
183 format: Format,
184 command: &'a record::CommandRecord,
185}
186
187impl Serialize for CommandHeadersJson<'_> {
188 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
189 where
190 S: serde::Serializer,
191 {
192 let mut seq = serializer.serialize_seq(Some(1))?;
193 seq.serialize_element(&CommandHeaderJson {
194 format: self.format,
195 command: self.command,
196 })?;
197 seq.end()
198 }
199}
200
201struct CommandHeaderJson<'a> {
202 format: Format,
203 command: &'a record::CommandRecord,
204}
205
206impl Serialize for CommandHeaderJson<'_> {
207 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
208 where
209 S: serde::Serializer,
210 {
211 let mut tuple = serializer.serialize_tuple(2)?;
212 tuple.serialize_element(&FormattedBytes {
213 format: self.format,
214 bytes: b"",
215 })?;
216 tuple.serialize_element(&FormattedBytes {
217 format: self.format,
218 bytes: self.command.op().to_id(),
219 })?;
220 tuple.end()
221 }
222}
223
224struct StreamPositionJson(record::StreamPosition);
225
226impl Serialize for StreamPositionJson {
227 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
228 where
229 S: serde::Serializer,
230 {
231 let mut state = serializer.serialize_struct("StreamPosition", 2)?;
232 state.serialize_field("seq_num", &self.0.seq_num)?;
233 state.serialize_field("timestamp", &self.0.timestamp)?;
234 state.end()
235 }
236}
237
238struct FormattedBytes<'a> {
239 format: Format,
240 bytes: &'a [u8],
241}
242
243impl Serialize for FormattedBytes<'_> {
244 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
245 where
246 S: serde::Serializer,
247 {
248 match self.format {
249 Format::Raw => serializer.collect_str(&LossyUtf8(self.bytes)),
250 Format::Base64 => serializer.collect_str(&Base64Display(self.bytes)),
251 }
252 }
253}
254
/// `Display` adapter that writes bytes as UTF-8, substituting U+FFFD for each invalid
/// sequence.
///
/// The output is the same as [`String::from_utf8_lossy`], but it is written directly to the
/// formatter instead of being collected into a `String`.
struct LossyUtf8<'a>(&'a [u8]);

impl std::fmt::Display for LossyUtf8<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Write as _;

        self.0.utf8_chunks().try_for_each(|chunk| {
            f.write_str(chunk.valid())?;
            // One replacement character per maximal invalid run, as reported by `utf8_chunks`.
            match chunk.invalid() {
                [] => Ok(()),
                _ => f.write_char(char::REPLACEMENT_CHARACTER),
            }
        })
    }
}
270
271struct Base64Display<'a>(&'a [u8]);
272
273impl std::fmt::Display for Base64Display<'_> {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 const INPUT_CHUNK: usize = 3 * 256;
276 const OUTPUT_CHUNK: usize = 4 * 256;
277
278 let mut output = [0u8; OUTPUT_CHUNK];
279 let mut chunks = self.0.chunks_exact(INPUT_CHUNK);
280 for chunk in &mut chunks {
281 let encoded = Base64::encode(chunk, &mut output).map_err(|_| std::fmt::Error)?;
282 f.write_str(encoded)?;
283 }
284
285 let remainder = chunks.remainder();
286 if !remainder.is_empty() {
287 let encoded_len = Base64::encoded_len(remainder);
288 let encoded = Base64::encode(remainder, &mut output[..encoded_len])
289 .map_err(|_| std::fmt::Error)?;
290 f.write_str(encoded)?;
291 }
292
293 Ok(())
294 }
295}
296
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use s2_common::record::MeteredExt;

    use super::*;
    use crate::v1::stream::ReadBatch;

    /// Builds a batch covering every record shape the serializer distinguishes:
    /// - an envelope with non-UTF-8 header value and body;
    /// - an empty fence;
    /// - a non-empty fence;
    /// - a trim.
    ///
    /// The batch also has a tail position.
    fn fixture_batch() -> types::stream::ReadBatch {
        let envelope = record::Record::try_from_parts(
            vec![record::Header {
                name: Bytes::from_static(b"kind"),
                value: Bytes::from(vec![b'a', 0xff, b'z']),
            }],
            Bytes::from(vec![0xf0, 0x28, 0x8c, 0xbc]),
        )
        .expect("valid envelope");

        let empty_fence = record::Record::Command(record::CommandRecord::Fence(
            "".parse().expect("valid token"),
        ));

        let non_empty_fence = record::Record::Command(record::CommandRecord::Fence(
            "token-1".parse().expect("valid token"),
        ));

        let trim = record::Record::Command(record::CommandRecord::Trim(42));

        types::stream::ReadBatch {
            records: vec![
                envelope.metered().sequenced(record::StreamPosition {
                    seq_num: 7,
                    timestamp: 11,
                }),
                empty_fence.metered().sequenced(record::StreamPosition {
                    seq_num: 8,
                    timestamp: 12,
                }),
                non_empty_fence.metered().sequenced(record::StreamPosition {
                    seq_num: 9,
                    timestamp: 13,
                }),
                trim.metered().sequenced(record::StreamPosition {
                    seq_num: 10,
                    timestamp: 14,
                }),
            ]
            .into_iter()
            .collect(),
            tail: Some(record::StreamPosition {
                seq_num: 11,
                timestamp: 15,
            }),
        }
    }

    /// Asserts, for every format, that `serialize_read_batch` produces exactly the JSON value
    /// that the `v1` `ReadBatch` encoding produces for the same batch.
    fn assert_matches_existing_json_shape(batch: &types::stream::ReadBatch) {
        for format in [Format::Raw, Format::Base64] {
            let expected =
                serde_json::to_value(ReadBatch::encode(format, batch.clone())).expect("json");
            let actual = serde_json::to_value(serialize_read_batch(format, batch)).expect("json");
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn serialized_batch_matches_existing_json_shape() {
        assert_matches_existing_json_shape(&fixture_batch());
    }

    #[test]
    fn serialized_batch_without_tail_matches_existing_json_shape() {
        // Exercises the branch that omits the `tail` field entirely.
        let batch = types::stream::ReadBatch {
            tail: None,
            ..fixture_batch()
        };
        assert_matches_existing_json_shape(&batch);
    }
}