1use std::fmt;
4use std::path::Path;
5use std::time::Duration;
6
7use crate::json::Value as JsonValue;
8use crate::telemetry::admin_intent_log::{
9 AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentPhase,
10 IntentProgress, IntentSummary,
11};
12
/// Replication state a replica keeps about the primary it follows.
///
/// Derives `Debug`/`Clone` so the state can be logged and snapshotted; every
/// field type already supports both.
#[derive(Debug, Clone)]
pub struct ReplicaReplication {
    /// Address of the primary this replica replicates from.
    pub primary_addr: String,
    /// Highest LSN applied locally (`ReplicaReplication::new` starts it at 0).
    pub last_applied_lsn: u64,
    /// Poll interval (`ReplicaReplication::new` builds it from milliseconds).
    pub poll_interval: Duration,
    /// Connection flag (`ReplicaReplication::new` starts it as `false`).
    pub connected: bool,
}
20
/// Outcome of staging one basebackup payload on a replica, as produced by
/// `stage_basebackup_snapshot_chunk`.
#[derive(Debug, Clone)]
pub struct StagedBaseBackupChunk {
    /// Basebackup manifest decoded from the payload and already validated.
    pub manifest: reddb_file::PrimaryReplicaBaseBackupManifest,
    /// Ordinal of the chunk part staged from this payload, or `None` when the
    /// payload carried no chunk part.
    pub chunk_ordinal: Option<u32>,
    /// `snapshot_offset` copied verbatim from the payload.
    pub snapshot_offset: u64,
    /// `next_snapshot_offset` copied verbatim from the payload, if present.
    pub next_snapshot_offset: Option<u64>,
    /// `snapshot_complete` flag copied verbatim from the payload.
    pub snapshot_complete: bool,
}
30
/// Errors raised while staging or recovering basebackup chunks on a replica.
#[derive(Debug)]
pub enum ReplicaBaseBackupError {
    /// A required basebackup payload field was absent (carries the field name).
    MissingField(&'static str),
    /// A payload field was present but malformed, including invalid hex, or a
    /// chunk ordinal not listed in the manifest (carries the field name).
    InvalidField(&'static str),
    /// The payload could not be decoded at all (not JSON / not a JSON object);
    /// carries the rendered wire error.
    Decode(String),
    /// Error reported by the `reddb_file` crate.
    File(reddb_file::RdbFileError),
    /// Underlying I/O error.
    Io(std::io::Error),
}
39
40impl fmt::Display for ReplicaBaseBackupError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 Self::MissingField(field) => write!(f, "missing basebackup field {field}"),
44 Self::InvalidField(field) => write!(f, "invalid basebackup field {field}"),
45 Self::Decode(err) => write!(f, "decode basebackup payload: {err}"),
46 Self::File(err) => write!(f, "{err}"),
47 Self::Io(err) => write!(f, "{err}"),
48 }
49 }
50}
51
// `source()` keeps its default (`None`). `Display` already renders the wrapped
// file/I/O error verbatim, so also exposing it as the source would print the
// same message twice in chained error reports.
impl std::error::Error for ReplicaBaseBackupError {}
53
54impl From<reddb_file::RdbFileError> for ReplicaBaseBackupError {
55 fn from(value: reddb_file::RdbFileError) -> Self {
56 Self::File(value)
57 }
58}
59
60impl From<std::io::Error> for ReplicaBaseBackupError {
61 fn from(value: std::io::Error) -> Self {
62 Self::Io(value)
63 }
64}
65
66impl From<reddb_wire::replication::ReplicationPayloadError> for ReplicaBaseBackupError {
67 fn from(value: reddb_wire::replication::ReplicationPayloadError) -> Self {
68 match value {
69 reddb_wire::replication::ReplicationPayloadError::MissingField(field) => {
70 Self::MissingField(field)
71 }
72 reddb_wire::replication::ReplicationPayloadError::InvalidField(field)
73 | reddb_wire::replication::ReplicationPayloadError::InvalidHex(field) => {
74 Self::InvalidField(field)
75 }
76 reddb_wire::replication::ReplicationPayloadError::NotJson
77 | reddb_wire::replication::ReplicationPayloadError::NotObject => {
78 Self::Decode(value.to_string())
79 }
80 }
81 }
82}
83
84pub fn stage_basebackup_snapshot_chunk(
90 payload: &reddb_wire::replication::BaseBackupChunk,
91 parts_root: impl AsRef<Path>,
92) -> Result<Option<StagedBaseBackupChunk>, ReplicaBaseBackupError> {
93 if !payload.basebackup_available {
94 return Ok(None);
95 }
96
97 let manifest_bytes = payload
98 .required_basebackup_manifest()?
99 .expect("basebackup_available checked above");
100 let manifest = reddb_file::PrimaryReplicaBaseBackupManifest::decode(manifest_bytes)?;
101 manifest.validate()?;
102
103 let snapshot_offset = payload.snapshot_offset;
104 let next_snapshot_offset = payload.next_snapshot_offset;
105 let snapshot_complete = payload.snapshot_complete;
106
107 let chunk_ordinal = match payload.basebackup_chunk_part()? {
108 Some(part) => {
109 let ordinal = part.ordinal;
110 if !manifest.chunks.iter().any(|chunk| chunk.ordinal == ordinal) {
111 return Err(ReplicaBaseBackupError::InvalidField(
112 reddb_wire::replication::BASEBACKUP_CHUNK_ORDINAL_FIELD,
113 ));
114 }
115 manifest.stage_chunk_part(parts_root.as_ref(), ordinal, part.bytes)?;
116 Some(ordinal)
117 }
118 None => None,
119 };
120
121 Ok(Some(StagedBaseBackupChunk {
122 manifest,
123 chunk_ordinal,
124 snapshot_offset,
125 next_snapshot_offset,
126 snapshot_complete,
127 }))
128}
129
130pub fn recover_staged_basebackup_chunks(
131 manifest: &reddb_file::PrimaryReplicaBaseBackupManifest,
132 parts_root: impl AsRef<Path>,
133) -> Result<std::collections::BTreeSet<u32>, ReplicaBaseBackupError> {
134 manifest
135 .recover_staged_chunk_parts(parts_root)
136 .map_err(Into::into)
137}
138
139impl ReplicaReplication {
140 pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
141 Self {
142 primary_addr,
143 last_applied_lsn: 0,
144 poll_interval: Duration::from_millis(poll_interval_ms),
145 connected: false,
146 }
147 }
148}
149
/// Position an interrupted replica bootstrap had reached, rebuilt from the
/// progress map of its last checkpoint in the admin intent log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResumePoint {
    /// Last applied LSN recorded by the checkpoint (0 when not recorded).
    pub last_applied_lsn: u64,
    /// Snapshot transfer cursor, present only for checkpoints written by
    /// `checkpoint_snapshot_transfer`.
    pub snapshot_token: Option<String>,
    /// Snapshot transfer offset recorded by the checkpoint (0 when not recorded).
    pub snapshot_offset: u64,
}
161
/// Manages `ReplicaBootstrap` intents in the admin intent log on behalf of one
/// replica, so an interrupted bootstrap can be found and resumed after restart.
pub struct ReplicaBootstrapper {
    // Replica identifier. It is stored as the `replica_id` intent argument and
    // used to tell this replica's intents apart from other replicas' intents in
    // a shared log.
    node_id: String,
}
173
174impl ReplicaBootstrapper {
175 pub fn new(node_id: impl Into<String>) -> Self {
176 Self {
177 node_id: node_id.into(),
178 }
179 }
180
181 pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
189 log.scan_and_report();
190 let mut mine: Vec<_> = log
191 .list_unfinished()
192 .into_iter()
193 .filter(|u| {
194 u.op == IntentOp::ReplicaBootstrap
195 && u.args.get("replica_id").and_then(|v| v.as_str())
196 == Some(self.node_id.as_str())
197 })
198 .collect();
199
200 if mine.len() != 1 {
201 return None;
202 }
203
204 let item = mine.remove(0);
205 item.last_progress
206 .as_ref()
207 .and_then(resume_point_from_progress)
208 }
209
210 pub fn resume<'a>(
213 &self,
214 log: &'a AdminIntentLog,
215 ) -> Option<(ResumePoint, BootstrapHandle<'a>)> {
216 log.scan_and_report();
217
218 let mut mine: Vec<_> = log
219 .list_unfinished()
220 .into_iter()
221 .filter(|u| {
222 u.op == IntentOp::ReplicaBootstrap
223 && u.args.get("replica_id").and_then(|v| v.as_str())
224 == Some(self.node_id.as_str())
225 })
226 .collect();
227
228 if mine.len() != 1 {
229 return None;
230 }
231
232 let item = mine.remove(0);
233 let progress = item.last_progress.as_ref()?;
234 let resume = resume_point_from_progress(progress)?;
235 let checkpoint_n = match item.last_phase {
236 IntentPhase::Checkpoint(n) => n,
237 _ => 0,
238 };
239 let handle = log.resume_unfinished(&item);
240
241 Some((
242 resume.clone(),
243 BootstrapHandle {
244 handle,
245 checkpoint_n,
246 last_applied_lsn: resume.last_applied_lsn,
247 },
248 ))
249 }
250}
251
252fn resume_point_from_progress(
253 progress: &crate::json::Map<String, JsonValue>,
254) -> Option<ResumePoint> {
255 let lsn = progress
256 .get("last_applied_lsn")
257 .and_then(|v| v.as_f64())
258 .map(|f| f as u64)
259 .unwrap_or(0);
260 let snapshot_token = progress
261 .get("snapshot_cursor")
262 .or_else(|| progress.get("snapshot_token"))
263 .and_then(|v| v.as_str())
264 .map(ToOwned::to_owned);
265 let snapshot_offset = progress
266 .get("snapshot_offset")
267 .and_then(|v| v.as_f64())
268 .map(|f| f as u64)
269 .unwrap_or(0);
270
271 Some(ResumePoint {
272 last_applied_lsn: lsn,
273 snapshot_token,
274 snapshot_offset,
275 })
276}
277
278impl ReplicaBootstrapper {
279 pub fn begin<'a>(
284 &self,
285 log: &'a AdminIntentLog,
286 source_lsn: u64,
287 target_lsn_hint: u64,
288 ) -> Result<BootstrapHandle<'a>, IntentLogError> {
289 let args = IntentArgs::new()
290 .insert("replica_id", JsonValue::String(self.node_id.clone()))
291 .insert("source_lsn", JsonValue::Number(source_lsn as f64))
292 .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
293 let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
294 Ok(BootstrapHandle {
295 handle,
296 checkpoint_n: 0,
297 last_applied_lsn: 0,
298 })
299 }
300}
301
/// Live handle to an in-progress `ReplicaBootstrap` intent, obtained from
/// `ReplicaBootstrapper::begin` or `ReplicaBootstrapper::resume`.
///
/// Call `complete` to finish the intent. Dropping the handle without completing
/// it leaves the outcome to the inner `IntentHandle`. The
/// `drop_without_complete_writes_aborted` test expects the intent to end up
/// terminal (aborted) in that case.
pub struct BootstrapHandle<'a> {
    // Underlying intent-log handle the checkpoints and completion are written to.
    handle: IntentHandle<'a>,
    // Number of the most recent checkpoint; incremented before each new one.
    checkpoint_n: u32,
    // LSN from the last successful checkpoint (or the resume point).
    last_applied_lsn: u64,
}
312
313impl<'a> BootstrapHandle<'a> {
314 pub fn last_applied_lsn(&self) -> u64 {
315 self.last_applied_lsn
316 }
317
318 pub fn checkpoint(
320 &mut self,
321 last_applied_lsn: u64,
322 batches_applied: u64,
323 ) -> Result<(), IntentLogError> {
324 self.checkpoint_n += 1;
325 let progress = IntentProgress::new()
326 .insert(
327 "last_applied_lsn",
328 JsonValue::Number(last_applied_lsn as f64),
329 )
330 .insert("batches_applied", JsonValue::Number(batches_applied as f64));
331 self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
332 self.last_applied_lsn = last_applied_lsn;
333 Ok(())
334 }
335
336 pub fn checkpoint_snapshot_transfer(
345 &mut self,
346 snapshot_token: impl Into<String>,
347 snapshot_offset: u64,
348 last_applied_lsn: u64,
349 batches_applied: u64,
350 ) -> Result<(), IntentLogError> {
351 self.checkpoint_n += 1;
352 let progress = IntentProgress::new()
353 .insert("snapshot_cursor", JsonValue::String(snapshot_token.into()))
354 .insert("snapshot_offset", JsonValue::Number(snapshot_offset as f64))
355 .insert(
356 "last_applied_lsn",
357 JsonValue::Number(last_applied_lsn as f64),
358 )
359 .insert("batches_applied", JsonValue::Number(batches_applied as f64));
360 self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
361 self.last_applied_lsn = last_applied_lsn;
362 Ok(())
363 }
364
365 pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
367 let summary = IntentSummary::new()
368 .insert("total_records", JsonValue::Number(total_records as f64))
369 .insert("duration_ms", JsonValue::Number(duration_ms as f64));
370 self.handle.complete(Some(summary))
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::PathBuf;

    /// Per-process temp path for an intent log, distinct per `label`.
    ///
    /// Any file already at that path is removed first. PIDs get reused, so a
    /// log left behind by an earlier aborted run could otherwise leak unfinished
    /// intents into assertions such as `scan_for_resume(..).is_none()`.
    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-bootstrap-{}-{}.log",
            label,
            std::process::id()
        ));
        let _ = fs::remove_file(&p);
        p
    }

    /// Opens (or creates) the intent log at `path`, panicking on failure.
    fn open_log(path: &PathBuf) -> AdminIntentLog {
        AdminIntentLog::open(path).expect("open intent log")
    }

    /// Writing the ready marker, then rewriting it over a stale temp file,
    /// must leave the latest marker readable.
    #[test]
    fn rebootstrap_ready_marker_write_is_atomic_and_readable() {
        let mut data_path = std::env::temp_dir();
        data_path.push(format!(
            "reddb-rebootstrap-marker-{}.rdb",
            std::process::id()
        ));
        let marker_path = reddb_file::layout::rebootstrap_ready_marker_path(&data_path);
        let tmp_path = reddb_file::layout::atomic_temp_path(&marker_path);
        let pending_path = reddb_file::layout::rebootstrap_pending_path(&data_path);
        let _ = fs::remove_file(&marker_path);
        let _ = fs::remove_file(&tmp_path);

        reddb_file::write_rebootstrap_ready_marker(
            &data_path,
            &reddb_file::ReplicaRebootstrapReadyMarker {
                pending_path: pending_path.clone(),
                checkpoint_lsn: 7,
                timeline: reddb_file::TimelineId::initial(),
            },
        )
        .expect("write marker");
        let ready = reddb_file::read_rebootstrap_ready_marker(&data_path).expect("read marker");
        assert_eq!(ready.checkpoint_lsn, 7);
        assert_eq!(ready.pending_path, pending_path);

        // Simulate a leftover temp file from an interrupted earlier write.
        fs::write(&tmp_path, b"stale tmp").expect("write stale tmp");
        reddb_file::write_rebootstrap_ready_marker(
            &data_path,
            &reddb_file::ReplicaRebootstrapReadyMarker {
                pending_path: reddb_file::layout::rebootstrap_pending_path(&data_path),
                checkpoint_lsn: 8,
                timeline: reddb_file::TimelineId(3),
            },
        )
        .expect("replace marker");
        let ready =
            reddb_file::read_rebootstrap_ready_marker(&data_path).expect("read replaced marker");
        assert_eq!(ready.checkpoint_lsn, 8);
        assert_eq!(ready.timeline, reddb_file::TimelineId(3));

        let _ = fs::remove_file(&marker_path);
        let _ = fs::remove_file(&tmp_path);
    }

    /// With no dangling intent there is nothing to resume. A completed
    /// bootstrap leaves nothing to resume after reopening either.
    #[test]
    fn bootstrap_from_scratch_when_no_unfinished_intent() {
        let path = tmp_path("fresh");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-1");

        assert!(bootstrapper.scan_for_resume(&log).is_none());

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        handle.complete(500, 100).unwrap();

        // Reopen: the completed intent must not be offered for resume.
        let log2 = open_log(&path);
        assert!(bootstrapper.scan_for_resume(&log2).is_none());

        let _ = std::fs::remove_file(&path);
    }

    /// A checkpointed intent that was never completed is resumable at the
    /// checkpointed LSN after reopening the log.
    #[test]
    fn resume_from_checkpoint_after_crash() {
        let path = tmp_path("resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-A");

        {
            // `forget` skips the handle's Drop, simulating a crash mid-bootstrap.
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(500, 10).unwrap();
            std::mem::forget(handle);
        }

        {
            // "Restart": reopen the log and look for the resume point.
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 500);

            let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
            handle.checkpoint(1000, 20).unwrap();
            handle.complete(1000, 250).unwrap();
        }

        let _ = std::fs::remove_file(&path);
    }

    /// Intents are matched by `replica_id`, so replicas sharing a log never see
    /// each other's resume points.
    #[test]
    fn multi_replica_isolation() {
        let path = tmp_path("multi");
        let log = open_log(&path);

        let b1 = ReplicaBootstrapper::new("replica-1");
        let b2 = ReplicaBootstrapper::new("replica-2");
        let b3 = ReplicaBootstrapper::new("replica-3");

        let mut h1 = b1.begin(&log, 0, 1000).unwrap();
        h1.checkpoint(300, 5).unwrap();
        std::mem::forget(h1);

        let mut h2 = b2.begin(&log, 0, 2000).unwrap();
        h2.checkpoint(700, 12).unwrap();
        std::mem::forget(h2);

        let log2 = open_log(&path);
        let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r3 = b3.scan_for_resume(&log2);

        assert_eq!(r1, Some(300), "replica-1 resumes at 300");
        assert_eq!(r2, Some(700), "replica-2 resumes at 700");
        assert!(r3.is_none(), "replica-3 has no intent");

        let _ = std::fs::remove_file(&path);
    }

    /// Snapshot-transfer checkpoints round-trip the cursor and offset.
    #[test]
    fn resume_from_snapshot_transfer_checkpoint_after_crash() {
        let path = tmp_path("snapshot-resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-snapshot");

        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 0)
                .unwrap();
            std::mem::forget(handle);
        }

        {
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 10);
            assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
            assert_eq!(resume.snapshot_offset, 4096);
        }

        let _ = std::fs::remove_file(&path);
    }

    /// `resume` re-attaches to the dangling intent, so completing the resumed
    /// handle leaves no unfinished intents behind.
    #[test]
    fn resume_snapshot_transfer_completes_original_intent() {
        let path = tmp_path("snapshot-resume-complete");
        let bootstrapper = ReplicaBootstrapper::new("replica-snapshot-complete");

        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 1)
                .unwrap();
            std::mem::forget(handle);
        }

        {
            let log = open_log(&path);
            let (resume, mut handle) = bootstrapper.resume(&log).expect("resume handle");
            assert_eq!(resume.last_applied_lsn, 10);
            assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
            assert_eq!(resume.snapshot_offset, 4096);

            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 8192, 10, 2)
                .unwrap();
            handle.complete(2, 25).unwrap();
        }

        let log = open_log(&path);
        assert!(
            log.list_unfinished().is_empty(),
            "resumed handle must complete the original dangling intent"
        );

        let _ = std::fs::remove_file(&path);
    }

    /// Dropping a handle without `complete` ends the intent in a terminal
    /// (aborted) state rather than leaving it resumable.
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("abort");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-X");

        {
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(100, 2).unwrap();
        }
        // `handle` was dropped at the end of the scope above.

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");

        let _ = std::fs::remove_file(&path);
    }

    /// A bootstrap that checkpoints and then completes leaves nothing unfinished.
    #[test]
    fn bootstrap_success_completes_intent() {
        let path = tmp_path("success");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Y");

        let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
        handle.checkpoint(250, 5).unwrap();
        handle.checkpoint(500, 10).unwrap();
        handle.complete(1000, 300).unwrap();

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");

        let _ = std::fs::remove_file(&path);
    }

    /// An intent that crashed before its first checkpoint has no progress, so
    /// no resume point is offered.
    #[test]
    fn no_resume_when_no_checkpoint_progress() {
        let path = tmp_path("no-progress");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Z");

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        // Crash before any checkpoint is written.
        std::mem::forget(handle);

        let log2 = open_log(&path);
        let resume = bootstrapper.scan_for_resume(&log2);
        assert!(resume.is_none(), "no checkpoint → no resume point");

        let _ = std::fs::remove_file(&path);
    }
}