// reddb_wire/replication/catchup.rs
use serde_json::Value as JsonValue;

use super::util::{
    get_opt_string, get_opt_u64, get_string, object_from_slice, ReplicationPayloadError, Result,
};
6
/// Catch-up mode identifier together with its textual wire representation.
///
/// Use [`CatchupMode::as_str`] to obtain the canonical string and
/// [`CatchupMode::parse`] to go back from a string to a variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CatchupMode {
    /// Written as `"wal-only"`; `"wal"` is also accepted when parsing.
    Wal,
    /// Written as `"basebackup-then-wal"`.
    BaseBackupThenWal,
    /// Written as `"reclone"`.
    Reclone,
}

impl CatchupMode {
    /// Returns the canonical string for this mode.
    ///
    /// Every string returned here is accepted by [`CatchupMode::parse`] and
    /// maps back to the same variant.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Wal => "wal-only",
            Self::BaseBackupThenWal => "basebackup-then-wal",
            Self::Reclone => "reclone",
        }
    }

    /// Parses a mode string, returning `None` for anything unrecognized.
    ///
    /// Matching is exact (case-sensitive, no trimming). Besides the canonical
    /// names produced by [`CatchupMode::as_str`], the short alias `"wal"` is
    /// accepted for [`CatchupMode::Wal`].
    pub fn parse(value: &str) -> Option<Self> {
        match value {
            "wal" | "wal-only" => Some(Self::Wal),
            "basebackup-then-wal" => Some(Self::BaseBackupThenWal),
            "reclone" => Some(Self::Reclone),
            _ => None,
        }
    }
}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct CatchupModeReply {
35 pub mode: CatchupMode,
36 pub available_from_lsn: Option<u64>,
37 pub replica_lsn: Option<u64>,
38 pub reason: Option<String>,
39}
40
41impl CatchupModeReply {
42 pub fn encode_json(&self) -> Vec<u8> {
43 let mut obj = serde_json::Map::new();
44 obj.insert(
45 "catchup_mode".to_string(),
46 JsonValue::String(self.mode.as_str().to_string()),
47 );
48 if let Some(lsn) = self.available_from_lsn {
49 obj.insert(
50 "available_from_lsn".to_string(),
51 JsonValue::Number(lsn.into()),
52 );
53 }
54 if let Some(lsn) = self.replica_lsn {
55 obj.insert("replica_lsn".to_string(), JsonValue::Number(lsn.into()));
56 }
57 if let Some(reason) = &self.reason {
58 obj.insert("reason".to_string(), JsonValue::String(reason.clone()));
59 }
60 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
61 }
62
63 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
64 let obj = object_from_slice(bytes)?;
65 let mode = CatchupMode::parse(&get_string(&obj, "catchup_mode")?)
66 .ok_or(ReplicationPayloadError::InvalidField("catchup_mode"))?;
67 Ok(Self {
68 mode,
69 available_from_lsn: get_opt_u64(&obj, "available_from_lsn"),
70 replica_lsn: get_opt_u64(&obj, "replica_lsn"),
71 reason: get_opt_string(&obj, "reason"),
72 })
73 }
74
75 pub(crate) fn from_wal_rebootstrap_object(
76 obj: &serde_json::Map<String, JsonValue>,
77 ) -> Result<Option<Self>> {
78 let Some(mode) = get_opt_string(obj, "catchup_mode") else {
79 return Ok(None);
80 };
81 let mode = CatchupMode::parse(&mode)
82 .ok_or(ReplicationPayloadError::InvalidField("catchup_mode"))?;
83 Ok(Some(Self {
84 mode,
85 available_from_lsn: get_opt_u64(obj, "oldest_available_lsn"),
86 replica_lsn: None,
87 reason: get_opt_string(obj, "invalidation_reason"),
88 }))
89 }
90}
91
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated reply survives an encode/decode cycle unchanged.
    #[test]
    fn catchup_mode_reply_round_trips() {
        let reply = CatchupModeReply {
            mode: CatchupMode::BaseBackupThenWal,
            available_from_lsn: Some(10),
            replica_lsn: Some(7),
            reason: Some("slot-invalidated".to_string()),
        };
        assert_eq!(
            CatchupModeReply::decode_json(&reply.encode_json()).unwrap(),
            reply
        );
    }

    /// An unrecognized `catchup_mode` string is reported as an invalid field.
    #[test]
    fn catchup_mode_rejects_unknown_mode() {
        assert_eq!(
            CatchupModeReply::decode_json(br#"{"catchup_mode":"x"}"#).unwrap_err(),
            ReplicationPayloadError::InvalidField("catchup_mode")
        );
    }
}