reddb_server/replication/
replica.rs1use std::time::Duration;
4
5use crate::json::Value as JsonValue;
6use crate::telemetry::admin_intent_log::{
7 AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentProgress,
8 IntentSummary,
9};
10
/// Plain-data record of a replica's replication state: the primary's
/// address, the last applied LSN, the poll interval and a connection flag.
///
/// Nothing in this type performs I/O; it only stores values. Use
/// [`ReplicaReplication::new`] to obtain the initial state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaReplication {
    /// Address of the primary, stored exactly as passed to `new`.
    pub primary_addr: String,
    /// Last applied LSN; `new` starts it at 0.
    pub last_applied_lsn: u64,
    /// Poll interval, built by `new` from a millisecond count.
    pub poll_interval: Duration,
    /// Connection flag; `new` starts it as `false`.
    pub connected: bool,
}

impl ReplicaReplication {
    /// Builds the initial state: nothing applied yet and not connected.
    ///
    /// * `primary_addr` - stored as-is, without validation.
    /// * `poll_interval_ms` - poll interval in milliseconds, converted to a
    ///   [`Duration`].
    pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
        let poll_interval = Duration::from_millis(poll_interval_ms);
        Self {
            primary_addr,
            last_applied_lsn: 0,
            poll_interval,
            connected: false,
        }
    }
}
29
/// Position from which an interrupted bootstrap can continue, as recovered
/// by `ReplicaBootstrapper::scan_for_resume` from the last recorded
/// checkpoint progress.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResumePoint {
    /// Value of the `last_applied_lsn` progress field (0 when absent).
    pub last_applied_lsn: u64,
    /// Value of the `snapshot_cursor` progress field, or of `snapshot_token`
    /// when no cursor key exists; `None` when neither yields a string.
    pub snapshot_token: Option<String>,
    /// Value of the `snapshot_offset` progress field (0 when absent).
    pub snapshot_offset: u64,
}
40
/// Opens and looks up `ReplicaBootstrap` intents in an `AdminIntentLog` on
/// behalf of a single node.
#[derive(Debug, Clone)]
pub struct ReplicaBootstrapper {
    /// Identifier of the node; written as the `replica_id` intent argument
    /// and used to recognise this node's own intents when scanning.
    node_id: String,
}
52
53impl ReplicaBootstrapper {
54 pub fn new(node_id: impl Into<String>) -> Self {
55 Self {
56 node_id: node_id.into(),
57 }
58 }
59
60 pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
68 log.scan_and_report();
69
70 let mut mine: Vec<_> = log
71 .list_unfinished()
72 .into_iter()
73 .filter(|u| {
74 u.op == IntentOp::ReplicaBootstrap
75 && u.args.get("replica_id").and_then(|v| v.as_str())
76 == Some(self.node_id.as_str())
77 })
78 .collect();
79
80 if mine.len() != 1 {
81 return None;
82 }
83
84 let item = mine.remove(0);
85 let progress = item.last_progress?;
86 let lsn = progress
87 .get("last_applied_lsn")
88 .and_then(|v| v.as_f64())
89 .map(|f| f as u64)
90 .unwrap_or(0);
91 let snapshot_token = progress
92 .get("snapshot_cursor")
93 .or_else(|| progress.get("snapshot_token"))
94 .and_then(|v| v.as_str())
95 .map(ToOwned::to_owned);
96 let snapshot_offset = progress
97 .get("snapshot_offset")
98 .and_then(|v| v.as_f64())
99 .map(|f| f as u64)
100 .unwrap_or(0);
101
102 Some(ResumePoint {
103 last_applied_lsn: lsn,
104 snapshot_token,
105 snapshot_offset,
106 })
107 }
108
109 pub fn begin<'a>(
114 &self,
115 log: &'a AdminIntentLog,
116 source_lsn: u64,
117 target_lsn_hint: u64,
118 ) -> Result<BootstrapHandle<'a>, IntentLogError> {
119 let args = IntentArgs::new()
120 .insert("replica_id", JsonValue::String(self.node_id.clone()))
121 .insert("source_lsn", JsonValue::Number(source_lsn as f64))
122 .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
123 let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
124 Ok(BootstrapHandle {
125 handle,
126 checkpoint_n: 0,
127 last_applied_lsn: 0,
128 })
129 }
130}
131
132pub struct BootstrapHandle<'a> {
138 handle: IntentHandle<'a>,
139 checkpoint_n: u32,
140 last_applied_lsn: u64,
141}
142
143impl<'a> BootstrapHandle<'a> {
144 pub fn last_applied_lsn(&self) -> u64 {
145 self.last_applied_lsn
146 }
147
148 pub fn checkpoint(
150 &mut self,
151 last_applied_lsn: u64,
152 batches_applied: u64,
153 ) -> Result<(), IntentLogError> {
154 self.checkpoint_n += 1;
155 let progress = IntentProgress::new()
156 .insert(
157 "last_applied_lsn",
158 JsonValue::Number(last_applied_lsn as f64),
159 )
160 .insert("batches_applied", JsonValue::Number(batches_applied as f64));
161 self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
162 self.last_applied_lsn = last_applied_lsn;
163 Ok(())
164 }
165
166 pub fn checkpoint_snapshot_transfer(
175 &mut self,
176 snapshot_token: impl Into<String>,
177 snapshot_offset: u64,
178 last_applied_lsn: u64,
179 batches_applied: u64,
180 ) -> Result<(), IntentLogError> {
181 self.checkpoint_n += 1;
182 let progress = IntentProgress::new()
183 .insert("snapshot_cursor", JsonValue::String(snapshot_token.into()))
184 .insert("snapshot_offset", JsonValue::Number(snapshot_offset as f64))
185 .insert(
186 "last_applied_lsn",
187 JsonValue::Number(last_applied_lsn as f64),
188 )
189 .insert("batches_applied", JsonValue::Number(batches_applied as f64));
190 self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
191 self.last_applied_lsn = last_applied_lsn;
192 Ok(())
193 }
194
195 pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
197 let summary = IntentSummary::new()
198 .insert("total_records", JsonValue::Number(total_records as f64))
199 .insert("duration_ms", JsonValue::Number(duration_ms as f64));
200 self.handle.complete(Some(summary))
201 }
202}
203
204#[cfg(test)]
209mod tests {
210 use super::*;
211 use std::path::PathBuf;
212
213 fn tmp_path(label: &str) -> PathBuf {
214 let mut p = std::env::temp_dir();
215 p.push(format!(
216 "reddb-bootstrap-{}-{}.log",
217 label,
218 std::process::id()
219 ));
220 p
221 }
222
223 fn open_log(path: &PathBuf) -> AdminIntentLog {
224 AdminIntentLog::open(path).expect("open intent log")
225 }
226
227 #[test]
231 fn bootstrap_from_scratch_when_no_unfinished_intent() {
232 let path = tmp_path("fresh");
233 let log = open_log(&path);
234 let bootstrapper = ReplicaBootstrapper::new("replica-1");
235
236 assert!(bootstrapper.scan_for_resume(&log).is_none());
237
238 let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
239 handle.complete(500, 100).unwrap();
240
241 let log2 = open_log(&path);
243 assert!(bootstrapper.scan_for_resume(&log2).is_none());
244
245 let _ = std::fs::remove_file(&path);
246 }
247
248 #[test]
252 fn resume_from_checkpoint_after_crash() {
253 let path = tmp_path("resume");
254 let bootstrapper = ReplicaBootstrapper::new("replica-A");
255
256 {
258 let log = open_log(&path);
259 let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
260 handle.checkpoint(500, 10).unwrap();
261 std::mem::forget(handle);
262 }
263
264 {
266 let log2 = open_log(&path);
267 let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
268 assert_eq!(resume.last_applied_lsn, 500);
269
270 let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
271 handle.checkpoint(1000, 20).unwrap();
272 handle.complete(1000, 250).unwrap();
273 }
274
275 let _ = std::fs::remove_file(&path);
276 }
277
278 #[test]
282 fn multi_replica_isolation() {
283 let path = tmp_path("multi");
284 let log = open_log(&path);
285
286 let b1 = ReplicaBootstrapper::new("replica-1");
287 let b2 = ReplicaBootstrapper::new("replica-2");
288 let b3 = ReplicaBootstrapper::new("replica-3");
289
290 let mut h1 = b1.begin(&log, 0, 1000).unwrap();
291 h1.checkpoint(300, 5).unwrap();
292 std::mem::forget(h1);
293
294 let mut h2 = b2.begin(&log, 0, 2000).unwrap();
295 h2.checkpoint(700, 12).unwrap();
296 std::mem::forget(h2);
297
298 let log2 = open_log(&path);
299 let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
300 let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
301 let r3 = b3.scan_for_resume(&log2);
302
303 assert_eq!(r1, Some(300), "replica-1 resumes at 300");
304 assert_eq!(r2, Some(700), "replica-2 resumes at 700");
305 assert!(r3.is_none(), "replica-3 has no intent");
306
307 let _ = std::fs::remove_file(&path);
308 }
309
310 #[test]
311 fn resume_from_snapshot_transfer_checkpoint_after_crash() {
312 let path = tmp_path("snapshot-resume");
313 let bootstrapper = ReplicaBootstrapper::new("replica-snapshot");
314
315 {
316 let log = open_log(&path);
317 let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
318 handle
319 .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 0)
320 .unwrap();
321 std::mem::forget(handle);
322 }
323
324 {
325 let log2 = open_log(&path);
326 let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
327 assert_eq!(resume.last_applied_lsn, 10);
328 assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
329 assert_eq!(resume.snapshot_offset, 4096);
330 }
331
332 let _ = std::fs::remove_file(&path);
333 }
334
335 #[test]
339 fn drop_without_complete_writes_aborted() {
340 let path = tmp_path("abort");
341 let log = open_log(&path);
342 let bootstrapper = ReplicaBootstrapper::new("replica-X");
343
344 {
345 let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
346 handle.checkpoint(100, 2).unwrap();
347 }
349
350 let log2 = open_log(&path);
351 assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");
352
353 let _ = std::fs::remove_file(&path);
354 }
355
356 #[test]
360 fn bootstrap_success_completes_intent() {
361 let path = tmp_path("success");
362 let log = open_log(&path);
363 let bootstrapper = ReplicaBootstrapper::new("replica-Y");
364
365 let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
366 handle.checkpoint(250, 5).unwrap();
367 handle.checkpoint(500, 10).unwrap();
368 handle.complete(1000, 300).unwrap();
369
370 let log2 = open_log(&path);
371 assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");
372
373 let _ = std::fs::remove_file(&path);
374 }
375
376 #[test]
380 fn no_resume_when_no_checkpoint_progress() {
381 let path = tmp_path("no-progress");
382 let log = open_log(&path);
383 let bootstrapper = ReplicaBootstrapper::new("replica-Z");
384
385 let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
387 std::mem::forget(handle);
388
389 let log2 = open_log(&path);
390 let resume = bootstrapper.scan_for_resume(&log2);
391 assert!(resume.is_none(), "no checkpoint → no resume point");
392
393 let _ = std::fs::remove_file(&path);
394 }
395}