Skip to main content

reddb_wire/replication/
timeline.rs

1use serde_json::Value as JsonValue;
2
3use super::util::{get_opt_string, get_opt_u64, get_string, get_u64, object_from_slice, Result};
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct TimelineForkNotice {
7    pub parent_timeline: u64,
8    pub new_timeline: u64,
9    pub fork_lsn: u64,
10    pub promoted_replica_id: Option<String>,
11    pub created_at_unix_ms: Option<u64>,
12}
13
14impl TimelineForkNotice {
15    pub fn encode_json(&self) -> Vec<u8> {
16        let mut obj = serde_json::Map::new();
17        obj.insert(
18            "parent_timeline".to_string(),
19            JsonValue::Number(self.parent_timeline.into()),
20        );
21        obj.insert(
22            "new_timeline".to_string(),
23            JsonValue::Number(self.new_timeline.into()),
24        );
25        obj.insert(
26            "fork_lsn".to_string(),
27            JsonValue::Number(self.fork_lsn.into()),
28        );
29        if let Some(replica_id) = &self.promoted_replica_id {
30            obj.insert(
31                "promoted_replica_id".to_string(),
32                JsonValue::String(replica_id.clone()),
33            );
34        }
35        if let Some(created_at) = self.created_at_unix_ms {
36            obj.insert(
37                "created_at_unix_ms".to_string(),
38                JsonValue::Number(created_at.into()),
39            );
40        }
41        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
42    }
43
44    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
45        let obj = object_from_slice(bytes)?;
46        Ok(Self {
47            parent_timeline: get_u64(&obj, "parent_timeline")?,
48            new_timeline: get_u64(&obj, "new_timeline")?,
49            fork_lsn: get_u64(&obj, "fork_lsn")?,
50            promoted_replica_id: get_opt_string(&obj, "promoted_replica_id"),
51            created_at_unix_ms: get_opt_u64(&obj, "created_at_unix_ms"),
52        })
53    }
54
55    pub fn decode_legacy_rejoin_plan(bytes: &[u8]) -> Result<Self> {
56        let obj = object_from_slice(bytes)?;
57        Ok(Self {
58            parent_timeline: get_opt_u64(&obj, "rejoin_node_timeline").unwrap_or(0),
59            new_timeline: get_u64(&obj, "rejoin_target_timeline")?,
60            fork_lsn: get_u64(&obj, "rejoin_start_lsn")
61                .or_else(|_| get_u64(&obj, "rejoin_rewind_to_lsn"))?,
62            promoted_replica_id: get_opt_string(&obj, "promoted_replica_id")
63                .or_else(|| get_opt_string(&obj, "replica_id")),
64            created_at_unix_ms: None,
65        })
66    }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct RejoinPlanNotice {
71    pub state: String,
72    pub node_timeline: u64,
73    pub node_flushed_lsn: u64,
74    pub available_from_lsn: u64,
75    pub target_timeline: u64,
76    pub rewind_to_lsn: Option<u64>,
77    pub start_lsn: u64,
78}
79
80impl RejoinPlanNotice {
81    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
82        let obj = object_from_slice(bytes)?;
83        Ok(Self {
84            state: get_string(&obj, "state")?,
85            node_timeline: get_u64(&obj, "rejoin_node_timeline")?,
86            node_flushed_lsn: get_u64(&obj, "rejoin_node_flushed_lsn")?,
87            available_from_lsn: get_u64(&obj, "rejoin_available_from_lsn")?,
88            target_timeline: get_u64(&obj, "rejoin_target_timeline")?,
89            rewind_to_lsn: get_opt_u64(&obj, "rejoin_rewind_to_lsn"),
90            start_lsn: get_u64(&obj, "rejoin_start_lsn")?,
91        })
92    }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct RejoinRewindConfirmation {
97    pub target_timeline: u64,
98    pub rewind_to_lsn: u64,
99}
100
101impl RejoinRewindConfirmation {
102    pub fn encode_json(&self) -> Vec<u8> {
103        let mut obj = serde_json::Map::new();
104        obj.insert(
105            "target_timeline".to_string(),
106            JsonValue::Number(self.target_timeline.into()),
107        );
108        obj.insert(
109            "rewind_to_lsn".to_string(),
110            JsonValue::Number(self.rewind_to_lsn.into()),
111        );
112        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
113    }
114
115    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
116        let obj = object_from_slice(bytes)?;
117        Ok(Self {
118            target_timeline: get_u64(&obj, "target_timeline")?,
119            rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
120        })
121    }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct RejoinRewindConfirmationReply {
126    pub ok: bool,
127    pub target_timeline: u64,
128    pub rewind_to_lsn: u64,
129    pub next_step: String,
130}
131
132impl RejoinRewindConfirmationReply {
133    pub fn confirmed(target_timeline: u64, rewind_to_lsn: u64) -> Self {
134        Self {
135            ok: true,
136            target_timeline,
137            rewind_to_lsn,
138            next_step: "restart or resume replica apply from the confirmed LSN".to_string(),
139        }
140    }
141
142    pub fn encode_json(&self) -> Vec<u8> {
143        let mut obj = serde_json::Map::new();
144        obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
145        obj.insert(
146            "target_timeline".to_string(),
147            JsonValue::Number(self.target_timeline.into()),
148        );
149        obj.insert(
150            "rewind_to_lsn".to_string(),
151            JsonValue::Number(self.rewind_to_lsn.into()),
152        );
153        obj.insert(
154            "next_step".to_string(),
155            JsonValue::String(self.next_step.clone()),
156        );
157        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
158    }
159
160    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
161        let obj = object_from_slice(bytes)?;
162        Ok(Self {
163            ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
164            target_timeline: get_u64(&obj, "target_timeline")?,
165            rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
166            next_step: get_string(&obj, "next_step")?,
167        })
168    }
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
172pub struct FailoverPromotionRequest {
173    pub holder_id: Option<String>,
174    pub ttl_ms: Option<u64>,
175}
176
177impl FailoverPromotionRequest {
178    pub fn encode_json(&self) -> Vec<u8> {
179        let mut obj = serde_json::Map::new();
180        if let Some(holder_id) = &self.holder_id {
181            obj.insert(
182                "holder_id".to_string(),
183                JsonValue::String(holder_id.clone()),
184            );
185        }
186        if let Some(ttl_ms) = self.ttl_ms.filter(|ttl| *ttl > 0) {
187            obj.insert("ttl_ms".to_string(), JsonValue::Number(ttl_ms.into()));
188        }
189        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
190    }
191
192    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
193        let obj = object_from_slice(bytes)?;
194        Ok(Self {
195            holder_id: get_opt_string(&obj, "holder_id"),
196            ttl_ms: get_opt_u64(&obj, "ttl_ms").filter(|ttl| *ttl > 0),
197        })
198    }
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct FailoverPromotionReply {
203    pub ok: bool,
204    pub holder_id: String,
205    pub generation: u64,
206    pub acquired_at_ms: u64,
207    pub expires_at_ms: u64,
208    pub timeline: u64,
209    pub applied_lsn: u64,
210    pub next_step: String,
211}
212
213impl FailoverPromotionReply {
214    pub fn promoted(
215        holder_id: impl Into<String>,
216        generation: u64,
217        acquired_at_ms: u64,
218        expires_at_ms: u64,
219        timeline: u64,
220        applied_lsn: u64,
221    ) -> Self {
222        Self {
223            ok: true,
224            holder_id: holder_id.into(),
225            generation,
226            acquired_at_ms,
227            expires_at_ms,
228            timeline,
229            applied_lsn,
230            next_step: "restart with RED_REPLICATION_MODE=primary to start accepting writes"
231                .to_string(),
232        }
233    }
234
235    pub fn encode_json(&self) -> Vec<u8> {
236        let mut obj = serde_json::Map::new();
237        obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
238        obj.insert(
239            "holder_id".to_string(),
240            JsonValue::String(self.holder_id.clone()),
241        );
242        obj.insert(
243            "generation".to_string(),
244            JsonValue::Number(self.generation.into()),
245        );
246        obj.insert(
247            "acquired_at_ms".to_string(),
248            JsonValue::Number(self.acquired_at_ms.into()),
249        );
250        obj.insert(
251            "expires_at_ms".to_string(),
252            JsonValue::Number(self.expires_at_ms.into()),
253        );
254        obj.insert(
255            "timeline".to_string(),
256            JsonValue::Number(self.timeline.into()),
257        );
258        obj.insert(
259            "applied_lsn".to_string(),
260            JsonValue::Number(self.applied_lsn.into()),
261        );
262        obj.insert(
263            "next_step".to_string(),
264            JsonValue::String(self.next_step.clone()),
265        );
266        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
267    }
268
269    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
270        let obj = object_from_slice(bytes)?;
271        Ok(Self {
272            ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
273            holder_id: get_string(&obj, "holder_id")?,
274            generation: get_u64(&obj, "generation")?,
275            acquired_at_ms: get_u64(&obj, "acquired_at_ms")?,
276            expires_at_ms: get_u64(&obj, "expires_at_ms")?,
277            timeline: get_u64(&obj, "timeline")?,
278            applied_lsn: get_u64(&obj, "applied_lsn")?,
279            next_step: get_string(&obj, "next_step")?,
280        })
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated fork notice survives an encode/decode cycle.
    #[test]
    fn timeline_fork_notice_round_trips() {
        let original = TimelineForkNotice {
            parent_timeline: 1,
            new_timeline: 2,
            fork_lsn: 42,
            promoted_replica_id: Some(String::from("replica-a")),
            created_at_unix_ms: Some(1000),
        };

        let wire = original.encode_json();
        let decoded = TimelineForkNotice::decode_json(&wire).unwrap();

        assert_eq!(decoded, original);
    }

    /// The `rejoin_*`-keyed object shape decodes into a `RejoinPlanNotice`.
    #[test]
    fn rejoin_plan_notice_decodes_existing_runtime_shape() {
        let decoded = RejoinPlanNotice::decode_json(
            br#"{
                "state":"rejoin_rewind_required",
                "rejoin_node_timeline":1,
                "rejoin_node_flushed_lsn":60,
                "rejoin_available_from_lsn":40,
                "rejoin_target_timeline":3,
                "rejoin_rewind_to_lsn":42,
                "rejoin_start_lsn":42
            }"#,
        )
        .unwrap();

        assert_eq!(decoded.target_timeline, 3);
        assert_eq!(decoded.rewind_to_lsn, Some(42));
    }

    /// Both the rewind confirmation and its reply survive an encode/decode cycle.
    #[test]
    fn rejoin_rewind_confirmation_contract_round_trips() {
        let sent = RejoinRewindConfirmation {
            target_timeline: 3,
            rewind_to_lsn: 42,
        };
        let sent_wire = sent.encode_json();
        let received = RejoinRewindConfirmation::decode_json(&sent_wire).unwrap();
        assert_eq!(received.target_timeline, 3);
        assert_eq!(received.rewind_to_lsn, 42);

        let reply = RejoinRewindConfirmationReply::confirmed(3, 42);
        let reply_wire = reply.encode_json();
        let decoded_reply = RejoinRewindConfirmationReply::decode_json(&reply_wire).unwrap();
        assert_eq!(decoded_reply, reply);
    }

    /// Both the promotion request and its reply survive an encode/decode cycle.
    #[test]
    fn failover_promotion_payloads_round_trip() {
        let sent = FailoverPromotionRequest {
            holder_id: Some(String::from("replica-a")),
            ttl_ms: Some(30_000),
        };
        let sent_wire = sent.encode_json();
        let received = FailoverPromotionRequest::decode_json(&sent_wire).unwrap();
        assert_eq!(received.holder_id.as_deref(), Some("replica-a"));
        assert_eq!(received.ttl_ms, Some(30_000));

        let reply = FailoverPromotionReply::promoted("replica-a", 7, 100, 200, 2, 42);
        let reply_wire = reply.encode_json();
        let decoded_reply = FailoverPromotionReply::decode_json(&reply_wire).unwrap();
        assert_eq!(decoded_reply, reply);
    }
}