1use serde_json::Value as JsonValue;
2
3use super::util::{
4 get_bool_default, get_opt_string, get_opt_u64, get_string, get_u64, hex_decode, hex_encode,
5 object_from_slice, ReplicationPayloadError, Result,
6};
7
/// JSON key under which `BaseBackupChunk::basebackup_manifest` is written as a hex string.
/// Also the field name reported by `BaseBackupChunk::required_basebackup_manifest` when the
/// manifest is missing.
pub const BASEBACKUP_MANIFEST_HEX_FIELD: &str = "basebackup_manifest_hex";
/// JSON key under which `BaseBackupChunk::basebackup_chunk_ordinal` is written as a number.
pub const BASEBACKUP_CHUNK_ORDINAL_FIELD: &str = "basebackup_chunk_ordinal";
/// JSON key under which `BaseBackupChunk::basebackup_chunk` is written as a hex string.
pub const BASEBACKUP_CHUNK_HEX_FIELD: &str = "basebackup_chunk_hex";
/// Error label only, never used as a JSON key in this file: reported by
/// `BaseBackupChunk::basebackup_chunk_part` when exactly one of the ordinal/hex pair is set.
pub const BASEBACKUP_CHUNK_PAIR_FIELD: &str = "basebackup_chunk_ordinal/basebackup_chunk_hex";
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct BaseBackupRequest {
15 pub replica_id: Option<String>,
16 pub max_bytes: Option<u64>,
17 pub snapshot_offset: u64,
18 pub snapshot_token: Option<String>,
19}
20
21impl BaseBackupRequest {
22 pub fn encode_json(&self) -> Vec<u8> {
23 let mut obj = serde_json::Map::new();
24 if let Some(replica_id) = &self.replica_id {
25 obj.insert(
26 "replica_id".to_string(),
27 JsonValue::String(replica_id.clone()),
28 );
29 }
30 if let Some(max_bytes) = self.max_bytes {
31 obj.insert("max_bytes".to_string(), JsonValue::Number(max_bytes.into()));
32 }
33 obj.insert(
34 "snapshot_offset".to_string(),
35 JsonValue::Number(self.snapshot_offset.into()),
36 );
37 if let Some(token) = &self.snapshot_token {
38 obj.insert(
39 "snapshot_token".to_string(),
40 JsonValue::String(token.clone()),
41 );
42 }
43 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
44 }
45
46 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
47 let obj = object_from_slice(bytes)?;
48 Ok(Self {
49 replica_id: get_opt_string(&obj, "replica_id"),
50 max_bytes: get_opt_u64(&obj, "max_bytes"),
51 snapshot_offset: get_opt_u64(&obj, "snapshot_offset").unwrap_or(0),
52 snapshot_token: get_opt_string(&obj, "snapshot_token"),
53 })
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct BaseBackupManifestChunk {
59 pub ordinal: u32,
60 pub snapshot_offset: u64,
61 pub bytes: u64,
62 pub checksum: u64,
63 pub relative_path: String,
64}
65
66impl BaseBackupManifestChunk {
67 fn to_json(&self) -> JsonValue {
68 let mut obj = serde_json::Map::new();
69 obj.insert(
70 "ordinal".to_string(),
71 JsonValue::Number(self.ordinal.into()),
72 );
73 obj.insert(
74 "snapshot_offset".to_string(),
75 JsonValue::Number(self.snapshot_offset.into()),
76 );
77 obj.insert("bytes".to_string(), JsonValue::Number(self.bytes.into()));
78 obj.insert(
79 "checksum".to_string(),
80 JsonValue::Number(self.checksum.into()),
81 );
82 obj.insert(
83 "relative_path".to_string(),
84 JsonValue::String(self.relative_path.clone()),
85 );
86 JsonValue::Object(obj)
87 }
88
89 fn from_json(value: &JsonValue) -> Result<Self> {
90 let obj = value
91 .as_object()
92 .ok_or(ReplicationPayloadError::InvalidField("basebackup_chunks"))?;
93 let ordinal = get_u64(obj, "ordinal")?;
94 Ok(Self {
95 ordinal: u32::try_from(ordinal)
96 .map_err(|_| ReplicationPayloadError::InvalidField("ordinal"))?,
97 snapshot_offset: get_u64(obj, "snapshot_offset")?,
98 bytes: get_u64(obj, "bytes")?,
99 checksum: get_u64(obj, "checksum")?,
100 relative_path: get_string(obj, "relative_path")?,
101 })
102 }
103}
104
/// Reply payload for the base backup exchange, serialized as one flat JSON object by
/// `BaseBackupChunk::encode_json` and parsed by `BaseBackupChunk::decode_json`.
///
/// `Option` fields are omitted from the JSON when `None`. Byte-vector fields travel as
/// hex strings under the keys noted on each field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BaseBackupChunk {
    /// Always written; decoded with a `false` default, whereas `new` starts it at `true`.
    pub snapshot_available: bool,
    /// Always written; required on decode.
    pub replica_id: String,
    /// Always written; required on decode.
    pub slot_restart_lsn: u64,
    pub snapshot_lsn: Option<u64>,
    pub snapshot_token: Option<String>,
    pub snapshot_total_bytes: Option<u64>,
    /// Always written; decodes to `0` when the lookup yields nothing.
    pub snapshot_offset: u64,
    pub next_snapshot_offset: Option<u64>,
    /// Always written; decoded with a `false` default.
    pub snapshot_complete: bool,
    pub snapshot_path: Option<String>,
    /// Raw bytes; hex-encoded under the `snapshot_chunk_hex` key.
    pub snapshot_chunk: Option<Vec<u8>>,
    /// Raw (already decoded) bytes; hex-encoded under the `snapshot_hex` key.
    pub snapshot_hex: Option<Vec<u8>>,
    /// Raw bytes; hex-encoded under the `metadata_binary_hex` key.
    pub metadata_binary: Option<Vec<u8>>,
    /// Raw bytes; hex-encoded under the `metadata_json_hex` key.
    pub metadata_json: Option<Vec<u8>>,
    /// Raw bytes; hex-encoded under the `header_shadow_hex` key.
    pub header_shadow: Option<Vec<u8>>,
    /// Raw bytes; hex-encoded under the `metadata_shadow_hex` key.
    pub metadata_shadow: Option<Vec<u8>>,
    /// Always written; decoded with a `false` default. When `true`,
    /// `required_basebackup_manifest` insists that `basebackup_manifest` is present.
    pub basebackup_available: bool,
    pub basebackup_timeline: Option<u64>,
    pub basebackup_start_lsn: Option<u64>,
    pub basebackup_checkpoint_lsn: Option<u64>,
    pub basebackup_snapshot_bytes: Option<u64>,
    pub basebackup_snapshot_checksum: Option<u64>,
    /// Raw bytes; hex-encoded under `BASEBACKUP_MANIFEST_HEX_FIELD`.
    pub basebackup_manifest: Option<Vec<u8>>,
    /// Always written as the `basebackup_chunks` array (possibly empty); a missing key
    /// decodes to an empty vector.
    pub basebackup_chunks: Vec<BaseBackupManifestChunk>,
    /// Written under `BASEBACKUP_CHUNK_ORDINAL_FIELD`; `basebackup_chunk_part` expects it
    /// to be set exactly when `basebackup_chunk` is.
    pub basebackup_chunk_ordinal: Option<u32>,
    /// Raw bytes; hex-encoded under `BASEBACKUP_CHUNK_HEX_FIELD`.
    pub basebackup_chunk: Option<Vec<u8>>,
}
134
/// Borrowed view of a chunk ordinal together with its bytes, as returned by
/// `BaseBackupChunk::basebackup_chunk_part` when both halves of the pair are present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaseBackupChunkPart<'a> {
    /// Copied from `BaseBackupChunk::basebackup_chunk_ordinal`.
    pub ordinal: u32,
    /// Borrowed from `BaseBackupChunk::basebackup_chunk`.
    pub bytes: &'a [u8],
}
140
141impl BaseBackupChunk {
142 pub fn new(replica_id: impl Into<String>, slot_restart_lsn: u64) -> Self {
143 Self {
144 snapshot_available: true,
145 replica_id: replica_id.into(),
146 slot_restart_lsn,
147 snapshot_lsn: None,
148 snapshot_token: None,
149 snapshot_total_bytes: None,
150 snapshot_offset: 0,
151 next_snapshot_offset: None,
152 snapshot_complete: false,
153 snapshot_path: None,
154 snapshot_chunk: None,
155 snapshot_hex: None,
156 metadata_binary: None,
157 metadata_json: None,
158 header_shadow: None,
159 metadata_shadow: None,
160 basebackup_available: false,
161 basebackup_timeline: None,
162 basebackup_start_lsn: None,
163 basebackup_checkpoint_lsn: None,
164 basebackup_snapshot_bytes: None,
165 basebackup_snapshot_checksum: None,
166 basebackup_manifest: None,
167 basebackup_chunks: Vec::new(),
168 basebackup_chunk_ordinal: None,
169 basebackup_chunk: None,
170 }
171 }
172
173 pub fn encode_json(&self) -> Vec<u8> {
174 let mut obj = serde_json::Map::new();
175 obj.insert(
176 "snapshot_available".to_string(),
177 JsonValue::Bool(self.snapshot_available),
178 );
179 obj.insert(
180 "replica_id".to_string(),
181 JsonValue::String(self.replica_id.clone()),
182 );
183 obj.insert(
184 "slot_restart_lsn".to_string(),
185 JsonValue::Number(self.slot_restart_lsn.into()),
186 );
187 if let Some(lsn) = self.snapshot_lsn {
188 obj.insert("snapshot_lsn".to_string(), JsonValue::Number(lsn.into()));
189 }
190 if let Some(token) = &self.snapshot_token {
191 obj.insert(
192 "snapshot_token".to_string(),
193 JsonValue::String(token.clone()),
194 );
195 }
196 if let Some(bytes) = self.snapshot_total_bytes {
197 obj.insert(
198 "snapshot_total_bytes".to_string(),
199 JsonValue::Number(bytes.into()),
200 );
201 }
202 obj.insert(
203 "snapshot_offset".to_string(),
204 JsonValue::Number(self.snapshot_offset.into()),
205 );
206 if let Some(offset) = self.next_snapshot_offset {
207 obj.insert(
208 "next_snapshot_offset".to_string(),
209 JsonValue::Number(offset.into()),
210 );
211 }
212 obj.insert(
213 "snapshot_complete".to_string(),
214 JsonValue::Bool(self.snapshot_complete),
215 );
216 if let Some(path) = &self.snapshot_path {
217 obj.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
218 }
219 if let Some(bytes) = &self.snapshot_chunk {
220 obj.insert(
221 "snapshot_chunk_hex".to_string(),
222 JsonValue::String(hex_encode(bytes)),
223 );
224 }
225 if let Some(bytes) = &self.snapshot_hex {
226 obj.insert(
227 "snapshot_hex".to_string(),
228 JsonValue::String(hex_encode(bytes)),
229 );
230 }
231 insert_opt_hex(&mut obj, "metadata_binary_hex", &self.metadata_binary);
232 insert_opt_hex(&mut obj, "metadata_json_hex", &self.metadata_json);
233 insert_opt_hex(&mut obj, "header_shadow_hex", &self.header_shadow);
234 insert_opt_hex(&mut obj, "metadata_shadow_hex", &self.metadata_shadow);
235
236 obj.insert(
237 "basebackup_available".to_string(),
238 JsonValue::Bool(self.basebackup_available),
239 );
240 insert_opt_u64(&mut obj, "basebackup_timeline", self.basebackup_timeline);
241 insert_opt_u64(&mut obj, "basebackup_start_lsn", self.basebackup_start_lsn);
242 insert_opt_u64(
243 &mut obj,
244 "basebackup_checkpoint_lsn",
245 self.basebackup_checkpoint_lsn,
246 );
247 insert_opt_u64(
248 &mut obj,
249 "basebackup_snapshot_bytes",
250 self.basebackup_snapshot_bytes,
251 );
252 insert_opt_u64(
253 &mut obj,
254 "basebackup_snapshot_checksum",
255 self.basebackup_snapshot_checksum,
256 );
257 if let Some(bytes) = &self.basebackup_manifest {
258 obj.insert(
259 BASEBACKUP_MANIFEST_HEX_FIELD.to_string(),
260 JsonValue::String(hex_encode(bytes)),
261 );
262 }
263 obj.insert(
264 "basebackup_chunks".to_string(),
265 JsonValue::Array(
266 self.basebackup_chunks
267 .iter()
268 .map(BaseBackupManifestChunk::to_json)
269 .collect(),
270 ),
271 );
272 if let Some(ordinal) = self.basebackup_chunk_ordinal {
273 obj.insert(
274 BASEBACKUP_CHUNK_ORDINAL_FIELD.to_string(),
275 JsonValue::Number(ordinal.into()),
276 );
277 }
278 if let Some(bytes) = &self.basebackup_chunk {
279 obj.insert(
280 BASEBACKUP_CHUNK_HEX_FIELD.to_string(),
281 JsonValue::String(hex_encode(bytes)),
282 );
283 }
284 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
285 }
286
287 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
288 let obj = object_from_slice(bytes)?;
289 let basebackup_chunks = match obj.get("basebackup_chunks") {
290 Some(JsonValue::Array(values)) => values
291 .iter()
292 .map(BaseBackupManifestChunk::from_json)
293 .collect::<Result<Vec<_>>>()?,
294 Some(_) => return Err(ReplicationPayloadError::InvalidField("basebackup_chunks")),
295 None => Vec::new(),
296 };
297 let basebackup_chunk_ordinal = match get_opt_u64(&obj, BASEBACKUP_CHUNK_ORDINAL_FIELD) {
298 Some(value) => Some(u32::try_from(value).map_err(|_| {
299 ReplicationPayloadError::InvalidField(BASEBACKUP_CHUNK_ORDINAL_FIELD)
300 })?),
301 None => None,
302 };
303 Ok(Self {
304 snapshot_available: get_bool_default(&obj, "snapshot_available", false),
305 replica_id: get_string(&obj, "replica_id")?,
306 slot_restart_lsn: get_u64(&obj, "slot_restart_lsn")?,
307 snapshot_lsn: get_opt_u64(&obj, "snapshot_lsn"),
308 snapshot_token: get_opt_string(&obj, "snapshot_token"),
309 snapshot_total_bytes: get_opt_u64(&obj, "snapshot_total_bytes"),
310 snapshot_offset: get_opt_u64(&obj, "snapshot_offset").unwrap_or(0),
311 next_snapshot_offset: get_opt_u64(&obj, "next_snapshot_offset"),
312 snapshot_complete: get_bool_default(&obj, "snapshot_complete", false),
313 snapshot_path: get_opt_string(&obj, "snapshot_path"),
314 snapshot_chunk: decode_opt_hex(&obj, "snapshot_chunk_hex")?,
315 snapshot_hex: decode_opt_hex(&obj, "snapshot_hex")?,
316 metadata_binary: decode_opt_hex(&obj, "metadata_binary_hex")?,
317 metadata_json: decode_opt_hex(&obj, "metadata_json_hex")?,
318 header_shadow: decode_opt_hex(&obj, "header_shadow_hex")?,
319 metadata_shadow: decode_opt_hex(&obj, "metadata_shadow_hex")?,
320 basebackup_available: get_bool_default(&obj, "basebackup_available", false),
321 basebackup_timeline: get_opt_u64(&obj, "basebackup_timeline"),
322 basebackup_start_lsn: get_opt_u64(&obj, "basebackup_start_lsn"),
323 basebackup_checkpoint_lsn: get_opt_u64(&obj, "basebackup_checkpoint_lsn"),
324 basebackup_snapshot_bytes: get_opt_u64(&obj, "basebackup_snapshot_bytes"),
325 basebackup_snapshot_checksum: get_opt_u64(&obj, "basebackup_snapshot_checksum"),
326 basebackup_manifest: decode_opt_hex(&obj, BASEBACKUP_MANIFEST_HEX_FIELD)?,
327 basebackup_chunks,
328 basebackup_chunk_ordinal,
329 basebackup_chunk: decode_opt_hex(&obj, BASEBACKUP_CHUNK_HEX_FIELD)?,
330 })
331 }
332
333 pub fn required_basebackup_manifest(&self) -> Result<Option<&[u8]>> {
334 if !self.basebackup_available {
335 return Ok(None);
336 }
337 self.basebackup_manifest
338 .as_deref()
339 .map(Some)
340 .ok_or(ReplicationPayloadError::MissingField(
341 BASEBACKUP_MANIFEST_HEX_FIELD,
342 ))
343 }
344
345 pub fn basebackup_chunk_part(&self) -> Result<Option<BaseBackupChunkPart<'_>>> {
346 match (
347 self.basebackup_chunk_ordinal,
348 self.basebackup_chunk.as_deref(),
349 ) {
350 (Some(ordinal), Some(bytes)) => Ok(Some(BaseBackupChunkPart { ordinal, bytes })),
351 (None, None) => Ok(None),
352 _ => Err(ReplicationPayloadError::InvalidField(
353 BASEBACKUP_CHUNK_PAIR_FIELD,
354 )),
355 }
356 }
357}
358
359fn insert_opt_u64(obj: &mut serde_json::Map<String, JsonValue>, field: &str, value: Option<u64>) {
360 if let Some(value) = value {
361 obj.insert(field.to_string(), JsonValue::Number(value.into()));
362 }
363}
364
365fn insert_opt_hex(
366 obj: &mut serde_json::Map<String, JsonValue>,
367 field: &str,
368 value: &Option<Vec<u8>>,
369) {
370 if let Some(bytes) = value {
371 obj.insert(field.to_string(), JsonValue::String(hex_encode(bytes)));
372 }
373}
374
375fn decode_opt_hex(
376 obj: &serde_json::Map<String, JsonValue>,
377 field: &'static str,
378) -> Result<Option<Vec<u8>>> {
379 match obj.get(field).and_then(JsonValue::as_str) {
380 Some(value) => Ok(Some(hex_decode(field, value)?)),
381 None => Ok(None),
382 }
383}
384
// Unit tests for the request/chunk JSON round trips and the chunk validation helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // A request with every optional field set survives encode -> decode unchanged.
    #[test]
    fn basebackup_request_round_trips() {
        let request = BaseBackupRequest {
            replica_id: Some("replica-a".to_string()),
            max_bytes: Some(64),
            snapshot_offset: 128,
            snapshot_token: Some("snapshot:r:1:2".to_string()),
        };
        assert_eq!(
            BaseBackupRequest::decode_json(&request.encode_json()).unwrap(),
            request
        );
    }

    // `new` stores its arguments and starts the flags/offset at their fixed initial values.
    #[test]
    fn basebackup_chunk_constructor_sets_wire_defaults() {
        let chunk = BaseBackupChunk::new("replica-a", 7);
        assert!(chunk.snapshot_available);
        assert_eq!(chunk.replica_id, "replica-a");
        assert_eq!(chunk.slot_restart_lsn, 7);
        assert_eq!(chunk.snapshot_offset, 0);
        assert!(!chunk.snapshot_complete);
        assert!(!chunk.basebackup_available);
        assert!(chunk.basebackup_chunks.is_empty());
    }

    // A chunk with nearly every field populated (only `snapshot_hex` is left `None`)
    // survives encode -> decode unchanged, including the manifest entry list and the
    // hex-encoded byte fields.
    #[test]
    fn basebackup_chunk_round_trips_manifest_and_payload_chunk() {
        let chunk = BaseBackupChunk {
            snapshot_available: true,
            replica_id: "replica-a".to_string(),
            slot_restart_lsn: 7,
            snapshot_lsn: Some(9),
            snapshot_token: Some("snapshot:replica-a:9:100".to_string()),
            snapshot_total_bytes: Some(100),
            snapshot_offset: 0,
            next_snapshot_offset: Some(10),
            snapshot_complete: false,
            snapshot_path: Some("/tmp/replica.rdb".to_string()),
            snapshot_chunk: Some(b"snapshot".to_vec()),
            snapshot_hex: None,
            metadata_binary: Some(b"metadata-binary".to_vec()),
            metadata_json: Some(b"metadata-json".to_vec()),
            header_shadow: Some(b"header-shadow".to_vec()),
            metadata_shadow: Some(b"metadata-shadow".to_vec()),
            basebackup_available: true,
            basebackup_timeline: Some(1),
            basebackup_start_lsn: Some(0),
            basebackup_checkpoint_lsn: Some(9),
            basebackup_snapshot_bytes: Some(100),
            basebackup_snapshot_checksum: Some(123),
            basebackup_manifest: Some(b"manifest".to_vec()),
            basebackup_chunks: vec![BaseBackupManifestChunk {
                ordinal: 0,
                snapshot_offset: 0,
                bytes: 10,
                checksum: 99,
                relative_path: "base/part-000000.redbasepart".to_string(),
            }],
            basebackup_chunk_ordinal: Some(0),
            basebackup_chunk: Some(b"basebackup".to_vec()),
        };
        assert_eq!(
            BaseBackupChunk::decode_json(&chunk.encode_json()).unwrap(),
            chunk
        );
    }

    // Walks the two validation helpers through their failure and success states.
    #[test]
    fn basebackup_chunk_helpers_own_required_wire_field_validation() {
        let mut chunk = BaseBackupChunk::new("replica-a", 7);
        // Base backup advertised without a manifest -> MissingField.
        chunk.basebackup_available = true;
        assert_eq!(
            chunk.required_basebackup_manifest().unwrap_err(),
            ReplicationPayloadError::MissingField(BASEBACKUP_MANIFEST_HEX_FIELD)
        );

        // Once the manifest is set, it is returned as a borrowed slice.
        chunk.basebackup_manifest = Some(b"manifest".to_vec());
        assert_eq!(
            chunk.required_basebackup_manifest().unwrap(),
            Some(&b"manifest"[..])
        );

        // Ordinal without chunk bytes -> the pair is rejected as invalid.
        chunk.basebackup_chunk_ordinal = Some(2);
        assert_eq!(
            chunk.basebackup_chunk_part().unwrap_err(),
            ReplicationPayloadError::InvalidField(BASEBACKUP_CHUNK_PAIR_FIELD)
        );

        // With both halves present, the borrowed pair is returned.
        chunk.basebackup_chunk = Some(b"part".to_vec());
        assert_eq!(
            chunk.basebackup_chunk_part().unwrap(),
            Some(BaseBackupChunkPart {
                ordinal: 2,
                bytes: &b"part"[..],
            })
        );
    }
}