Skip to main content

reddb_wire/replication/
basebackup.rs

1use serde_json::Value as JsonValue;
2
3use super::util::{
4    get_bool_default, get_opt_string, get_opt_u64, get_string, get_u64, hex_decode, hex_encode,
5    object_from_slice, ReplicationPayloadError, Result,
6};
7
/// JSON key under which `BaseBackupChunk::basebackup_manifest` is written as a hex string.
pub const BASEBACKUP_MANIFEST_HEX_FIELD: &str = "basebackup_manifest_hex";
/// JSON key under which `BaseBackupChunk::basebackup_chunk_ordinal` is written as a number.
pub const BASEBACKUP_CHUNK_ORDINAL_FIELD: &str = "basebackup_chunk_ordinal";
/// JSON key under which `BaseBackupChunk::basebackup_chunk` is written as a hex string.
pub const BASEBACKUP_CHUNK_HEX_FIELD: &str = "basebackup_chunk_hex";
/// Label used in the `InvalidField` error returned by `BaseBackupChunk::basebackup_chunk_part`
/// when only one of the ordinal / hex pair is set. It is never inserted as a JSON key in this file.
pub const BASEBACKUP_CHUNK_PAIR_FIELD: &str = "basebackup_chunk_ordinal/basebackup_chunk_hex";
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct BaseBackupRequest {
15    pub replica_id: Option<String>,
16    pub max_bytes: Option<u64>,
17    pub snapshot_offset: u64,
18    pub snapshot_token: Option<String>,
19}
20
21impl BaseBackupRequest {
22    pub fn encode_json(&self) -> Vec<u8> {
23        let mut obj = serde_json::Map::new();
24        if let Some(replica_id) = &self.replica_id {
25            obj.insert(
26                "replica_id".to_string(),
27                JsonValue::String(replica_id.clone()),
28            );
29        }
30        if let Some(max_bytes) = self.max_bytes {
31            obj.insert("max_bytes".to_string(), JsonValue::Number(max_bytes.into()));
32        }
33        obj.insert(
34            "snapshot_offset".to_string(),
35            JsonValue::Number(self.snapshot_offset.into()),
36        );
37        if let Some(token) = &self.snapshot_token {
38            obj.insert(
39                "snapshot_token".to_string(),
40                JsonValue::String(token.clone()),
41            );
42        }
43        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
44    }
45
46    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
47        let obj = object_from_slice(bytes)?;
48        Ok(Self {
49            replica_id: get_opt_string(&obj, "replica_id"),
50            max_bytes: get_opt_u64(&obj, "max_bytes"),
51            snapshot_offset: get_opt_u64(&obj, "snapshot_offset").unwrap_or(0),
52            snapshot_token: get_opt_string(&obj, "snapshot_token"),
53        })
54    }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct BaseBackupManifestChunk {
59    pub ordinal: u32,
60    pub snapshot_offset: u64,
61    pub bytes: u64,
62    pub checksum: u64,
63    pub relative_path: String,
64}
65
66impl BaseBackupManifestChunk {
67    fn to_json(&self) -> JsonValue {
68        let mut obj = serde_json::Map::new();
69        obj.insert(
70            "ordinal".to_string(),
71            JsonValue::Number(self.ordinal.into()),
72        );
73        obj.insert(
74            "snapshot_offset".to_string(),
75            JsonValue::Number(self.snapshot_offset.into()),
76        );
77        obj.insert("bytes".to_string(), JsonValue::Number(self.bytes.into()));
78        obj.insert(
79            "checksum".to_string(),
80            JsonValue::Number(self.checksum.into()),
81        );
82        obj.insert(
83            "relative_path".to_string(),
84            JsonValue::String(self.relative_path.clone()),
85        );
86        JsonValue::Object(obj)
87    }
88
89    fn from_json(value: &JsonValue) -> Result<Self> {
90        let obj = value
91            .as_object()
92            .ok_or(ReplicationPayloadError::InvalidField("basebackup_chunks"))?;
93        let ordinal = get_u64(obj, "ordinal")?;
94        Ok(Self {
95            ordinal: u32::try_from(ordinal)
96                .map_err(|_| ReplicationPayloadError::InvalidField("ordinal"))?,
97            snapshot_offset: get_u64(obj, "snapshot_offset")?,
98            bytes: get_u64(obj, "bytes")?,
99            checksum: get_u64(obj, "checksum")?,
100            relative_path: get_string(obj, "relative_path")?,
101        })
102    }
103}
104
/// Base-backup response payload, exchanged as a flat JSON object.
///
/// Byte-valued fields are carried as hex strings; the JSON key for each is noted on the field.
/// `Option` fields are omitted from the encoded object when `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BaseBackupChunk {
    /// JSON key `snapshot_available`; always written. Decoded through `get_bool_default` with a
    /// default of `false`, while `BaseBackupChunk::new` starts it at `true`.
    pub snapshot_available: bool,
    /// JSON key `replica_id`; always written and required on decode.
    pub replica_id: String,
    /// JSON key `slot_restart_lsn`; always written and required on decode.
    pub slot_restart_lsn: u64,
    /// JSON key `snapshot_lsn`.
    pub snapshot_lsn: Option<u64>,
    /// JSON key `snapshot_token`.
    pub snapshot_token: Option<String>,
    /// JSON key `snapshot_total_bytes`.
    pub snapshot_total_bytes: Option<u64>,
    /// JSON key `snapshot_offset`; always written, decodes to `0` when the key yields no value.
    pub snapshot_offset: u64,
    /// JSON key `next_snapshot_offset`.
    pub next_snapshot_offset: Option<u64>,
    /// JSON key `snapshot_complete`; always written. Decoded through `get_bool_default` with a
    /// default of `false`.
    pub snapshot_complete: bool,
    /// JSON key `snapshot_path`.
    pub snapshot_path: Option<String>,
    /// Hex-encoded under JSON key `snapshot_chunk_hex`.
    pub snapshot_chunk: Option<Vec<u8>>,
    /// Hex-encoded under JSON key `snapshot_hex`.
    pub snapshot_hex: Option<Vec<u8>>,
    /// Hex-encoded under JSON key `metadata_binary_hex`.
    pub metadata_binary: Option<Vec<u8>>,
    /// Hex-encoded under JSON key `metadata_json_hex`.
    pub metadata_json: Option<Vec<u8>>,
    /// Hex-encoded under JSON key `header_shadow_hex`.
    pub header_shadow: Option<Vec<u8>>,
    /// Hex-encoded under JSON key `metadata_shadow_hex`.
    pub metadata_shadow: Option<Vec<u8>>,
    /// JSON key `basebackup_available`; always written. Decoded through `get_bool_default` with a
    /// default of `false`. When `true`, `required_basebackup_manifest` demands a manifest.
    pub basebackup_available: bool,
    /// JSON key `basebackup_timeline`.
    pub basebackup_timeline: Option<u64>,
    /// JSON key `basebackup_start_lsn`.
    pub basebackup_start_lsn: Option<u64>,
    /// JSON key `basebackup_checkpoint_lsn`.
    pub basebackup_checkpoint_lsn: Option<u64>,
    /// JSON key `basebackup_snapshot_bytes`.
    pub basebackup_snapshot_bytes: Option<u64>,
    /// JSON key `basebackup_snapshot_checksum`.
    pub basebackup_snapshot_checksum: Option<u64>,
    /// Hex-encoded under the key named by `BASEBACKUP_MANIFEST_HEX_FIELD`.
    pub basebackup_manifest: Option<Vec<u8>>,
    /// JSON key `basebackup_chunks`; always written as an array (possibly empty). A missing key
    /// decodes to an empty vector.
    pub basebackup_chunks: Vec<BaseBackupManifestChunk>,
    /// Written under the key named by `BASEBACKUP_CHUNK_ORDINAL_FIELD`; must fit in a `u32` on
    /// decode. `basebackup_chunk_part` expects it to be set together with `basebackup_chunk`.
    pub basebackup_chunk_ordinal: Option<u32>,
    /// Hex-encoded under the key named by `BASEBACKUP_CHUNK_HEX_FIELD`.
    pub basebackup_chunk: Option<Vec<u8>>,
}
134
/// Borrowed view of a chunk's ordinal together with its payload bytes, as returned by
/// `BaseBackupChunk::basebackup_chunk_part` when both halves of the pair are set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaseBackupChunkPart<'a> {
    /// Copied from `BaseBackupChunk::basebackup_chunk_ordinal`.
    pub ordinal: u32,
    /// Borrowed from `BaseBackupChunk::basebackup_chunk`.
    pub bytes: &'a [u8],
}
140
141impl BaseBackupChunk {
142    pub fn new(replica_id: impl Into<String>, slot_restart_lsn: u64) -> Self {
143        Self {
144            snapshot_available: true,
145            replica_id: replica_id.into(),
146            slot_restart_lsn,
147            snapshot_lsn: None,
148            snapshot_token: None,
149            snapshot_total_bytes: None,
150            snapshot_offset: 0,
151            next_snapshot_offset: None,
152            snapshot_complete: false,
153            snapshot_path: None,
154            snapshot_chunk: None,
155            snapshot_hex: None,
156            metadata_binary: None,
157            metadata_json: None,
158            header_shadow: None,
159            metadata_shadow: None,
160            basebackup_available: false,
161            basebackup_timeline: None,
162            basebackup_start_lsn: None,
163            basebackup_checkpoint_lsn: None,
164            basebackup_snapshot_bytes: None,
165            basebackup_snapshot_checksum: None,
166            basebackup_manifest: None,
167            basebackup_chunks: Vec::new(),
168            basebackup_chunk_ordinal: None,
169            basebackup_chunk: None,
170        }
171    }
172
173    pub fn encode_json(&self) -> Vec<u8> {
174        let mut obj = serde_json::Map::new();
175        obj.insert(
176            "snapshot_available".to_string(),
177            JsonValue::Bool(self.snapshot_available),
178        );
179        obj.insert(
180            "replica_id".to_string(),
181            JsonValue::String(self.replica_id.clone()),
182        );
183        obj.insert(
184            "slot_restart_lsn".to_string(),
185            JsonValue::Number(self.slot_restart_lsn.into()),
186        );
187        if let Some(lsn) = self.snapshot_lsn {
188            obj.insert("snapshot_lsn".to_string(), JsonValue::Number(lsn.into()));
189        }
190        if let Some(token) = &self.snapshot_token {
191            obj.insert(
192                "snapshot_token".to_string(),
193                JsonValue::String(token.clone()),
194            );
195        }
196        if let Some(bytes) = self.snapshot_total_bytes {
197            obj.insert(
198                "snapshot_total_bytes".to_string(),
199                JsonValue::Number(bytes.into()),
200            );
201        }
202        obj.insert(
203            "snapshot_offset".to_string(),
204            JsonValue::Number(self.snapshot_offset.into()),
205        );
206        if let Some(offset) = self.next_snapshot_offset {
207            obj.insert(
208                "next_snapshot_offset".to_string(),
209                JsonValue::Number(offset.into()),
210            );
211        }
212        obj.insert(
213            "snapshot_complete".to_string(),
214            JsonValue::Bool(self.snapshot_complete),
215        );
216        if let Some(path) = &self.snapshot_path {
217            obj.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
218        }
219        if let Some(bytes) = &self.snapshot_chunk {
220            obj.insert(
221                "snapshot_chunk_hex".to_string(),
222                JsonValue::String(hex_encode(bytes)),
223            );
224        }
225        if let Some(bytes) = &self.snapshot_hex {
226            obj.insert(
227                "snapshot_hex".to_string(),
228                JsonValue::String(hex_encode(bytes)),
229            );
230        }
231        insert_opt_hex(&mut obj, "metadata_binary_hex", &self.metadata_binary);
232        insert_opt_hex(&mut obj, "metadata_json_hex", &self.metadata_json);
233        insert_opt_hex(&mut obj, "header_shadow_hex", &self.header_shadow);
234        insert_opt_hex(&mut obj, "metadata_shadow_hex", &self.metadata_shadow);
235
236        obj.insert(
237            "basebackup_available".to_string(),
238            JsonValue::Bool(self.basebackup_available),
239        );
240        insert_opt_u64(&mut obj, "basebackup_timeline", self.basebackup_timeline);
241        insert_opt_u64(&mut obj, "basebackup_start_lsn", self.basebackup_start_lsn);
242        insert_opt_u64(
243            &mut obj,
244            "basebackup_checkpoint_lsn",
245            self.basebackup_checkpoint_lsn,
246        );
247        insert_opt_u64(
248            &mut obj,
249            "basebackup_snapshot_bytes",
250            self.basebackup_snapshot_bytes,
251        );
252        insert_opt_u64(
253            &mut obj,
254            "basebackup_snapshot_checksum",
255            self.basebackup_snapshot_checksum,
256        );
257        if let Some(bytes) = &self.basebackup_manifest {
258            obj.insert(
259                BASEBACKUP_MANIFEST_HEX_FIELD.to_string(),
260                JsonValue::String(hex_encode(bytes)),
261            );
262        }
263        obj.insert(
264            "basebackup_chunks".to_string(),
265            JsonValue::Array(
266                self.basebackup_chunks
267                    .iter()
268                    .map(BaseBackupManifestChunk::to_json)
269                    .collect(),
270            ),
271        );
272        if let Some(ordinal) = self.basebackup_chunk_ordinal {
273            obj.insert(
274                BASEBACKUP_CHUNK_ORDINAL_FIELD.to_string(),
275                JsonValue::Number(ordinal.into()),
276            );
277        }
278        if let Some(bytes) = &self.basebackup_chunk {
279            obj.insert(
280                BASEBACKUP_CHUNK_HEX_FIELD.to_string(),
281                JsonValue::String(hex_encode(bytes)),
282            );
283        }
284        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
285    }
286
287    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
288        let obj = object_from_slice(bytes)?;
289        let basebackup_chunks = match obj.get("basebackup_chunks") {
290            Some(JsonValue::Array(values)) => values
291                .iter()
292                .map(BaseBackupManifestChunk::from_json)
293                .collect::<Result<Vec<_>>>()?,
294            Some(_) => return Err(ReplicationPayloadError::InvalidField("basebackup_chunks")),
295            None => Vec::new(),
296        };
297        let basebackup_chunk_ordinal = match get_opt_u64(&obj, BASEBACKUP_CHUNK_ORDINAL_FIELD) {
298            Some(value) => Some(u32::try_from(value).map_err(|_| {
299                ReplicationPayloadError::InvalidField(BASEBACKUP_CHUNK_ORDINAL_FIELD)
300            })?),
301            None => None,
302        };
303        Ok(Self {
304            snapshot_available: get_bool_default(&obj, "snapshot_available", false),
305            replica_id: get_string(&obj, "replica_id")?,
306            slot_restart_lsn: get_u64(&obj, "slot_restart_lsn")?,
307            snapshot_lsn: get_opt_u64(&obj, "snapshot_lsn"),
308            snapshot_token: get_opt_string(&obj, "snapshot_token"),
309            snapshot_total_bytes: get_opt_u64(&obj, "snapshot_total_bytes"),
310            snapshot_offset: get_opt_u64(&obj, "snapshot_offset").unwrap_or(0),
311            next_snapshot_offset: get_opt_u64(&obj, "next_snapshot_offset"),
312            snapshot_complete: get_bool_default(&obj, "snapshot_complete", false),
313            snapshot_path: get_opt_string(&obj, "snapshot_path"),
314            snapshot_chunk: decode_opt_hex(&obj, "snapshot_chunk_hex")?,
315            snapshot_hex: decode_opt_hex(&obj, "snapshot_hex")?,
316            metadata_binary: decode_opt_hex(&obj, "metadata_binary_hex")?,
317            metadata_json: decode_opt_hex(&obj, "metadata_json_hex")?,
318            header_shadow: decode_opt_hex(&obj, "header_shadow_hex")?,
319            metadata_shadow: decode_opt_hex(&obj, "metadata_shadow_hex")?,
320            basebackup_available: get_bool_default(&obj, "basebackup_available", false),
321            basebackup_timeline: get_opt_u64(&obj, "basebackup_timeline"),
322            basebackup_start_lsn: get_opt_u64(&obj, "basebackup_start_lsn"),
323            basebackup_checkpoint_lsn: get_opt_u64(&obj, "basebackup_checkpoint_lsn"),
324            basebackup_snapshot_bytes: get_opt_u64(&obj, "basebackup_snapshot_bytes"),
325            basebackup_snapshot_checksum: get_opt_u64(&obj, "basebackup_snapshot_checksum"),
326            basebackup_manifest: decode_opt_hex(&obj, BASEBACKUP_MANIFEST_HEX_FIELD)?,
327            basebackup_chunks,
328            basebackup_chunk_ordinal,
329            basebackup_chunk: decode_opt_hex(&obj, BASEBACKUP_CHUNK_HEX_FIELD)?,
330        })
331    }
332
333    pub fn required_basebackup_manifest(&self) -> Result<Option<&[u8]>> {
334        if !self.basebackup_available {
335            return Ok(None);
336        }
337        self.basebackup_manifest
338            .as_deref()
339            .map(Some)
340            .ok_or(ReplicationPayloadError::MissingField(
341                BASEBACKUP_MANIFEST_HEX_FIELD,
342            ))
343    }
344
345    pub fn basebackup_chunk_part(&self) -> Result<Option<BaseBackupChunkPart<'_>>> {
346        match (
347            self.basebackup_chunk_ordinal,
348            self.basebackup_chunk.as_deref(),
349        ) {
350            (Some(ordinal), Some(bytes)) => Ok(Some(BaseBackupChunkPart { ordinal, bytes })),
351            (None, None) => Ok(None),
352            _ => Err(ReplicationPayloadError::InvalidField(
353                BASEBACKUP_CHUNK_PAIR_FIELD,
354            )),
355        }
356    }
357}
358
359fn insert_opt_u64(obj: &mut serde_json::Map<String, JsonValue>, field: &str, value: Option<u64>) {
360    if let Some(value) = value {
361        obj.insert(field.to_string(), JsonValue::Number(value.into()));
362    }
363}
364
365fn insert_opt_hex(
366    obj: &mut serde_json::Map<String, JsonValue>,
367    field: &str,
368    value: &Option<Vec<u8>>,
369) {
370    if let Some(bytes) = value {
371        obj.insert(field.to_string(), JsonValue::String(hex_encode(bytes)));
372    }
373}
374
375fn decode_opt_hex(
376    obj: &serde_json::Map<String, JsonValue>,
377    field: &'static str,
378) -> Result<Option<Vec<u8>>> {
379    match obj.get(field).and_then(JsonValue::as_str) {
380        Some(value) => Ok(Some(hex_decode(field, value)?)),
381        None => Ok(None),
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated request survives an encode/decode cycle unchanged.
    #[test]
    fn basebackup_request_round_trips() {
        let original = BaseBackupRequest {
            replica_id: Some(String::from("replica-a")),
            max_bytes: Some(64),
            snapshot_offset: 128,
            snapshot_token: Some(String::from("snapshot:r:1:2")),
        };

        let wire = original.encode_json();
        let decoded = BaseBackupRequest::decode_json(&wire).unwrap();

        assert_eq!(decoded, original);
    }

    /// `BaseBackupChunk::new` stores its arguments and applies the documented defaults.
    #[test]
    fn basebackup_chunk_constructor_sets_wire_defaults() {
        let fresh = BaseBackupChunk::new("replica-a", 7);

        assert!(fresh.snapshot_available);
        assert_eq!(fresh.replica_id, "replica-a");
        assert_eq!(fresh.slot_restart_lsn, 7);
        assert_eq!(fresh.snapshot_offset, 0);
        assert!(!fresh.snapshot_complete);
        assert!(!fresh.basebackup_available);
        assert!(fresh.basebackup_chunks.is_empty());
    }

    /// A chunk carrying snapshot bytes, metadata, a manifest and a payload part round-trips.
    #[test]
    fn basebackup_chunk_round_trips_manifest_and_payload_chunk() {
        let manifest_entry = BaseBackupManifestChunk {
            ordinal: 0,
            snapshot_offset: 0,
            bytes: 10,
            checksum: 99,
            relative_path: String::from("base/part-000000.redbasepart"),
        };
        // Fields not listed here keep the constructor's values: `snapshot_available` true,
        // `snapshot_offset` 0, `snapshot_complete` false and `snapshot_hex` None.
        let original = BaseBackupChunk {
            snapshot_lsn: Some(9),
            snapshot_token: Some(String::from("snapshot:replica-a:9:100")),
            snapshot_total_bytes: Some(100),
            next_snapshot_offset: Some(10),
            snapshot_path: Some(String::from("/tmp/replica.rdb")),
            snapshot_chunk: Some(b"snapshot".to_vec()),
            metadata_binary: Some(b"metadata-binary".to_vec()),
            metadata_json: Some(b"metadata-json".to_vec()),
            header_shadow: Some(b"header-shadow".to_vec()),
            metadata_shadow: Some(b"metadata-shadow".to_vec()),
            basebackup_available: true,
            basebackup_timeline: Some(1),
            basebackup_start_lsn: Some(0),
            basebackup_checkpoint_lsn: Some(9),
            basebackup_snapshot_bytes: Some(100),
            basebackup_snapshot_checksum: Some(123),
            basebackup_manifest: Some(b"manifest".to_vec()),
            basebackup_chunks: vec![manifest_entry],
            basebackup_chunk_ordinal: Some(0),
            basebackup_chunk: Some(b"basebackup".to_vec()),
            ..BaseBackupChunk::new("replica-a", 7)
        };

        let wire = original.encode_json();
        let decoded = BaseBackupChunk::decode_json(&wire).unwrap();

        assert_eq!(decoded, original);
    }

    /// The accessor helpers enforce the manifest requirement and the ordinal/bytes pairing.
    #[test]
    fn basebackup_chunk_helpers_own_required_wire_field_validation() {
        let mut chunk = BaseBackupChunk::new("replica-a", 7);

        // Advertising a base backup without a manifest is an error.
        chunk.basebackup_available = true;
        let missing_manifest = chunk.required_basebackup_manifest().unwrap_err();
        assert_eq!(
            missing_manifest,
            ReplicationPayloadError::MissingField(BASEBACKUP_MANIFEST_HEX_FIELD)
        );

        // Once a manifest is set it is handed back as a borrowed slice.
        chunk.basebackup_manifest = Some(b"manifest".to_vec());
        let expected_manifest: &[u8] = b"manifest";
        assert_eq!(
            chunk.required_basebackup_manifest().unwrap(),
            Some(expected_manifest)
        );

        // An ordinal without bytes is rejected as an incomplete pair.
        chunk.basebackup_chunk_ordinal = Some(2);
        let half_pair = chunk.basebackup_chunk_part().unwrap_err();
        assert_eq!(
            half_pair,
            ReplicationPayloadError::InvalidField(BASEBACKUP_CHUNK_PAIR_FIELD)
        );

        // With both halves present the part is returned.
        chunk.basebackup_chunk = Some(b"part".to_vec());
        let expected_bytes: &[u8] = b"part";
        assert_eq!(
            chunk.basebackup_chunk_part().unwrap(),
            Some(BaseBackupChunkPart {
                ordinal: 2,
                bytes: expected_bytes,
            })
        );
    }
}