Skip to main content

reddb_wire/replication/
catchup.rs

1use serde_json::Value as JsonValue;
2
3use super::util::{
4    get_opt_string, get_opt_u64, get_string, object_from_slice, ReplicationPayloadError, Result,
5};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum CatchupMode {
9    Wal,
10    BaseBackupThenWal,
11    Reclone,
12}
13
14impl CatchupMode {
15    pub fn as_str(self) -> &'static str {
16        match self {
17            Self::Wal => "wal-only",
18            Self::BaseBackupThenWal => "basebackup-then-wal",
19            Self::Reclone => "reclone",
20        }
21    }
22
23    pub fn parse(value: &str) -> Option<Self> {
24        match value {
25            "wal" | "wal-only" => Some(Self::Wal),
26            "basebackup-then-wal" => Some(Self::BaseBackupThenWal),
27            "reclone" => Some(Self::Reclone),
28            _ => None,
29        }
30    }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct CatchupModeReply {
35    pub mode: CatchupMode,
36    pub available_from_lsn: Option<u64>,
37    pub replica_lsn: Option<u64>,
38    pub reason: Option<String>,
39}
40
41impl CatchupModeReply {
42    pub fn encode_json(&self) -> Vec<u8> {
43        let mut obj = serde_json::Map::new();
44        obj.insert(
45            "catchup_mode".to_string(),
46            JsonValue::String(self.mode.as_str().to_string()),
47        );
48        if let Some(lsn) = self.available_from_lsn {
49            obj.insert(
50                "available_from_lsn".to_string(),
51                JsonValue::Number(lsn.into()),
52            );
53        }
54        if let Some(lsn) = self.replica_lsn {
55            obj.insert("replica_lsn".to_string(), JsonValue::Number(lsn.into()));
56        }
57        if let Some(reason) = &self.reason {
58            obj.insert("reason".to_string(), JsonValue::String(reason.clone()));
59        }
60        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
61    }
62
63    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
64        let obj = object_from_slice(bytes)?;
65        let mode = CatchupMode::parse(&get_string(&obj, "catchup_mode")?)
66            .ok_or(ReplicationPayloadError::InvalidField("catchup_mode"))?;
67        Ok(Self {
68            mode,
69            available_from_lsn: get_opt_u64(&obj, "available_from_lsn"),
70            replica_lsn: get_opt_u64(&obj, "replica_lsn"),
71            reason: get_opt_string(&obj, "reason"),
72        })
73    }
74
75    pub(crate) fn from_wal_rebootstrap_object(
76        obj: &serde_json::Map<String, JsonValue>,
77    ) -> Result<Option<Self>> {
78        let Some(mode) = get_opt_string(obj, "catchup_mode") else {
79            return Ok(None);
80        };
81        let mode = CatchupMode::parse(&mode)
82            .ok_or(ReplicationPayloadError::InvalidField("catchup_mode"))?;
83        Ok(Some(Self {
84            mode,
85            available_from_lsn: get_opt_u64(obj, "oldest_available_lsn"),
86            replica_lsn: None,
87            reason: get_opt_string(obj, "invalidation_reason"),
88        }))
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95
96    #[test]
97    fn catchup_mode_reply_round_trips() {
98        let reply = CatchupModeReply {
99            mode: CatchupMode::BaseBackupThenWal,
100            available_from_lsn: Some(10),
101            replica_lsn: Some(7),
102            reason: Some("slot-invalidated".to_string()),
103        };
104        assert_eq!(
105            CatchupModeReply::decode_json(&reply.encode_json()).unwrap(),
106            reply
107        );
108    }
109
110    #[test]
111    fn catchup_mode_rejects_unknown_mode() {
112        assert_eq!(
113            CatchupModeReply::decode_json(br#"{"catchup_mode":"x"}"#).unwrap_err(),
114            ReplicationPayloadError::InvalidField("catchup_mode")
115        );
116    }
117}