1use serde_json::Value as JsonValue;
2
3use super::util::{get_opt_string, get_opt_u64, get_string, get_u64, object_from_slice, Result};
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct TimelineForkNotice {
7 pub parent_timeline: u64,
8 pub new_timeline: u64,
9 pub fork_lsn: u64,
10 pub promoted_replica_id: Option<String>,
11 pub created_at_unix_ms: Option<u64>,
12}
13
14impl TimelineForkNotice {
15 pub fn encode_json(&self) -> Vec<u8> {
16 let mut obj = serde_json::Map::new();
17 obj.insert(
18 "parent_timeline".to_string(),
19 JsonValue::Number(self.parent_timeline.into()),
20 );
21 obj.insert(
22 "new_timeline".to_string(),
23 JsonValue::Number(self.new_timeline.into()),
24 );
25 obj.insert(
26 "fork_lsn".to_string(),
27 JsonValue::Number(self.fork_lsn.into()),
28 );
29 if let Some(replica_id) = &self.promoted_replica_id {
30 obj.insert(
31 "promoted_replica_id".to_string(),
32 JsonValue::String(replica_id.clone()),
33 );
34 }
35 if let Some(created_at) = self.created_at_unix_ms {
36 obj.insert(
37 "created_at_unix_ms".to_string(),
38 JsonValue::Number(created_at.into()),
39 );
40 }
41 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
42 }
43
44 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
45 let obj = object_from_slice(bytes)?;
46 Ok(Self {
47 parent_timeline: get_u64(&obj, "parent_timeline")?,
48 new_timeline: get_u64(&obj, "new_timeline")?,
49 fork_lsn: get_u64(&obj, "fork_lsn")?,
50 promoted_replica_id: get_opt_string(&obj, "promoted_replica_id"),
51 created_at_unix_ms: get_opt_u64(&obj, "created_at_unix_ms"),
52 })
53 }
54
55 pub fn decode_legacy_rejoin_plan(bytes: &[u8]) -> Result<Self> {
56 let obj = object_from_slice(bytes)?;
57 Ok(Self {
58 parent_timeline: get_opt_u64(&obj, "rejoin_node_timeline").unwrap_or(0),
59 new_timeline: get_u64(&obj, "rejoin_target_timeline")?,
60 fork_lsn: get_u64(&obj, "rejoin_start_lsn")
61 .or_else(|_| get_u64(&obj, "rejoin_rewind_to_lsn"))?,
62 promoted_replica_id: get_opt_string(&obj, "promoted_replica_id")
63 .or_else(|| get_opt_string(&obj, "replica_id")),
64 created_at_unix_ms: None,
65 })
66 }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct RejoinPlanNotice {
71 pub state: String,
72 pub node_timeline: u64,
73 pub node_flushed_lsn: u64,
74 pub available_from_lsn: u64,
75 pub target_timeline: u64,
76 pub rewind_to_lsn: Option<u64>,
77 pub start_lsn: u64,
78}
79
80impl RejoinPlanNotice {
81 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
82 let obj = object_from_slice(bytes)?;
83 Ok(Self {
84 state: get_string(&obj, "state")?,
85 node_timeline: get_u64(&obj, "rejoin_node_timeline")?,
86 node_flushed_lsn: get_u64(&obj, "rejoin_node_flushed_lsn")?,
87 available_from_lsn: get_u64(&obj, "rejoin_available_from_lsn")?,
88 target_timeline: get_u64(&obj, "rejoin_target_timeline")?,
89 rewind_to_lsn: get_opt_u64(&obj, "rejoin_rewind_to_lsn"),
90 start_lsn: get_u64(&obj, "rejoin_start_lsn")?,
91 })
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct RejoinRewindConfirmation {
97 pub target_timeline: u64,
98 pub rewind_to_lsn: u64,
99}
100
101impl RejoinRewindConfirmation {
102 pub fn encode_json(&self) -> Vec<u8> {
103 let mut obj = serde_json::Map::new();
104 obj.insert(
105 "target_timeline".to_string(),
106 JsonValue::Number(self.target_timeline.into()),
107 );
108 obj.insert(
109 "rewind_to_lsn".to_string(),
110 JsonValue::Number(self.rewind_to_lsn.into()),
111 );
112 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
113 }
114
115 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
116 let obj = object_from_slice(bytes)?;
117 Ok(Self {
118 target_timeline: get_u64(&obj, "target_timeline")?,
119 rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
120 })
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct RejoinRewindConfirmationReply {
126 pub ok: bool,
127 pub target_timeline: u64,
128 pub rewind_to_lsn: u64,
129 pub next_step: String,
130}
131
132impl RejoinRewindConfirmationReply {
133 pub fn confirmed(target_timeline: u64, rewind_to_lsn: u64) -> Self {
134 Self {
135 ok: true,
136 target_timeline,
137 rewind_to_lsn,
138 next_step: "restart or resume replica apply from the confirmed LSN".to_string(),
139 }
140 }
141
142 pub fn encode_json(&self) -> Vec<u8> {
143 let mut obj = serde_json::Map::new();
144 obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
145 obj.insert(
146 "target_timeline".to_string(),
147 JsonValue::Number(self.target_timeline.into()),
148 );
149 obj.insert(
150 "rewind_to_lsn".to_string(),
151 JsonValue::Number(self.rewind_to_lsn.into()),
152 );
153 obj.insert(
154 "next_step".to_string(),
155 JsonValue::String(self.next_step.clone()),
156 );
157 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
158 }
159
160 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
161 let obj = object_from_slice(bytes)?;
162 Ok(Self {
163 ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
164 target_timeline: get_u64(&obj, "target_timeline")?,
165 rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
166 next_step: get_string(&obj, "next_step")?,
167 })
168 }
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
172pub struct FailoverPromotionRequest {
173 pub holder_id: Option<String>,
174 pub ttl_ms: Option<u64>,
175}
176
177impl FailoverPromotionRequest {
178 pub fn encode_json(&self) -> Vec<u8> {
179 let mut obj = serde_json::Map::new();
180 if let Some(holder_id) = &self.holder_id {
181 obj.insert(
182 "holder_id".to_string(),
183 JsonValue::String(holder_id.clone()),
184 );
185 }
186 if let Some(ttl_ms) = self.ttl_ms.filter(|ttl| *ttl > 0) {
187 obj.insert("ttl_ms".to_string(), JsonValue::Number(ttl_ms.into()));
188 }
189 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
190 }
191
192 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
193 let obj = object_from_slice(bytes)?;
194 Ok(Self {
195 holder_id: get_opt_string(&obj, "holder_id"),
196 ttl_ms: get_opt_u64(&obj, "ttl_ms").filter(|ttl| *ttl > 0),
197 })
198 }
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct FailoverPromotionReply {
203 pub ok: bool,
204 pub holder_id: String,
205 pub generation: u64,
206 pub acquired_at_ms: u64,
207 pub expires_at_ms: u64,
208 pub timeline: u64,
209 pub applied_lsn: u64,
210 pub next_step: String,
211}
212
213impl FailoverPromotionReply {
214 pub fn promoted(
215 holder_id: impl Into<String>,
216 generation: u64,
217 acquired_at_ms: u64,
218 expires_at_ms: u64,
219 timeline: u64,
220 applied_lsn: u64,
221 ) -> Self {
222 Self {
223 ok: true,
224 holder_id: holder_id.into(),
225 generation,
226 acquired_at_ms,
227 expires_at_ms,
228 timeline,
229 applied_lsn,
230 next_step: "restart with RED_REPLICATION_MODE=primary to start accepting writes"
231 .to_string(),
232 }
233 }
234
235 pub fn encode_json(&self) -> Vec<u8> {
236 let mut obj = serde_json::Map::new();
237 obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
238 obj.insert(
239 "holder_id".to_string(),
240 JsonValue::String(self.holder_id.clone()),
241 );
242 obj.insert(
243 "generation".to_string(),
244 JsonValue::Number(self.generation.into()),
245 );
246 obj.insert(
247 "acquired_at_ms".to_string(),
248 JsonValue::Number(self.acquired_at_ms.into()),
249 );
250 obj.insert(
251 "expires_at_ms".to_string(),
252 JsonValue::Number(self.expires_at_ms.into()),
253 );
254 obj.insert(
255 "timeline".to_string(),
256 JsonValue::Number(self.timeline.into()),
257 );
258 obj.insert(
259 "applied_lsn".to_string(),
260 JsonValue::Number(self.applied_lsn.into()),
261 );
262 obj.insert(
263 "next_step".to_string(),
264 JsonValue::String(self.next_step.clone()),
265 );
266 serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
267 }
268
269 pub fn decode_json(bytes: &[u8]) -> Result<Self> {
270 let obj = object_from_slice(bytes)?;
271 Ok(Self {
272 ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
273 holder_id: get_string(&obj, "holder_id")?,
274 generation: get_u64(&obj, "generation")?,
275 acquired_at_ms: get_u64(&obj, "acquired_at_ms")?,
276 expires_at_ms: get_u64(&obj, "expires_at_ms")?,
277 timeline: get_u64(&obj, "timeline")?,
278 applied_lsn: get_u64(&obj, "applied_lsn")?,
279 next_step: get_string(&obj, "next_step")?,
280 })
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287
288 #[test]
289 fn timeline_fork_notice_round_trips() {
290 let notice = TimelineForkNotice {
291 parent_timeline: 1,
292 new_timeline: 2,
293 fork_lsn: 42,
294 promoted_replica_id: Some("replica-a".to_string()),
295 created_at_unix_ms: Some(1000),
296 };
297 assert_eq!(
298 TimelineForkNotice::decode_json(¬ice.encode_json()).unwrap(),
299 notice
300 );
301 }
302
303 #[test]
304 fn rejoin_plan_notice_decodes_existing_runtime_shape() {
305 let plan = RejoinPlanNotice::decode_json(
306 br#"{
307 "state":"rejoin_rewind_required",
308 "rejoin_node_timeline":1,
309 "rejoin_node_flushed_lsn":60,
310 "rejoin_available_from_lsn":40,
311 "rejoin_target_timeline":3,
312 "rejoin_rewind_to_lsn":42,
313 "rejoin_start_lsn":42
314 }"#,
315 )
316 .unwrap();
317 assert_eq!(plan.target_timeline, 3);
318 assert_eq!(plan.rewind_to_lsn, Some(42));
319 }
320
321 #[test]
322 fn rejoin_rewind_confirmation_contract_round_trips() {
323 let request = RejoinRewindConfirmation {
324 target_timeline: 3,
325 rewind_to_lsn: 42,
326 };
327 let request = RejoinRewindConfirmation::decode_json(&request.encode_json()).unwrap();
328 assert_eq!(request.target_timeline, 3);
329 assert_eq!(request.rewind_to_lsn, 42);
330
331 let reply = RejoinRewindConfirmationReply::confirmed(3, 42);
332 assert_eq!(
333 RejoinRewindConfirmationReply::decode_json(&reply.encode_json()).unwrap(),
334 reply
335 );
336 }
337
338 #[test]
339 fn failover_promotion_payloads_round_trip() {
340 let request = FailoverPromotionRequest {
341 holder_id: Some("replica-a".to_string()),
342 ttl_ms: Some(30_000),
343 };
344 let request = FailoverPromotionRequest::decode_json(&request.encode_json()).unwrap();
345 assert_eq!(request.holder_id.as_deref(), Some("replica-a"));
346 assert_eq!(request.ttl_ms, Some(30_000));
347
348 let reply = FailoverPromotionReply::promoted("replica-a", 7, 100, 200, 2, 42);
349 assert_eq!(
350 FailoverPromotionReply::decode_json(&reply.encode_json()).unwrap(),
351 reply
352 );
353 }
354}