Skip to main content

reddb_server/replication/
replica.rs

1//! Replica-side replication: connects to primary, consumes WAL records.
2
3use std::fmt;
4use std::path::Path;
5use std::time::Duration;
6
7use crate::json::Value as JsonValue;
8use crate::telemetry::admin_intent_log::{
9    AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentPhase,
10    IntentProgress, IntentSummary,
11};
12
/// Replica-side replication state for a single primary.
///
/// Plain data carrier: every field is public and no invariants are enforced
/// here. Derives the common traits so the state can be logged, copied and
/// compared like the other public value types in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaReplication {
    /// Address of the primary this replica connects to.
    pub primary_addr: String,
    /// LSN of the last record applied on this replica.
    pub last_applied_lsn: u64,
    /// Delay between polls of the primary.
    pub poll_interval: Duration,
    /// Whether the replica currently considers itself connected.
    pub connected: bool,
}
20
21/// Result of staging one basebackup response from `replication_snapshot`.
22#[derive(Debug, Clone)]
23pub struct StagedBaseBackupChunk {
24    pub manifest: reddb_file::PrimaryReplicaBaseBackupManifest,
25    pub chunk_ordinal: Option<u32>,
26    pub snapshot_offset: u64,
27    pub next_snapshot_offset: Option<u64>,
28    pub snapshot_complete: bool,
29}
30
31#[derive(Debug)]
32pub enum ReplicaBaseBackupError {
33    MissingField(&'static str),
34    InvalidField(&'static str),
35    Decode(String),
36    File(reddb_file::RdbFileError),
37    Io(std::io::Error),
38}
39
40impl fmt::Display for ReplicaBaseBackupError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            Self::MissingField(field) => write!(f, "missing basebackup field {field}"),
44            Self::InvalidField(field) => write!(f, "invalid basebackup field {field}"),
45            Self::Decode(err) => write!(f, "decode basebackup payload: {err}"),
46            Self::File(err) => write!(f, "{err}"),
47            Self::Io(err) => write!(f, "{err}"),
48        }
49    }
50}
51
52impl std::error::Error for ReplicaBaseBackupError {}
53
54impl From<reddb_file::RdbFileError> for ReplicaBaseBackupError {
55    fn from(value: reddb_file::RdbFileError) -> Self {
56        Self::File(value)
57    }
58}
59
60impl From<std::io::Error> for ReplicaBaseBackupError {
61    fn from(value: std::io::Error) -> Self {
62        Self::Io(value)
63    }
64}
65
66impl From<reddb_wire::replication::ReplicationPayloadError> for ReplicaBaseBackupError {
67    fn from(value: reddb_wire::replication::ReplicationPayloadError) -> Self {
68        match value {
69            reddb_wire::replication::ReplicationPayloadError::MissingField(field) => {
70                Self::MissingField(field)
71            }
72            reddb_wire::replication::ReplicationPayloadError::InvalidField(field)
73            | reddb_wire::replication::ReplicationPayloadError::InvalidHex(field) => {
74                Self::InvalidField(field)
75            }
76            reddb_wire::replication::ReplicationPayloadError::NotJson
77            | reddb_wire::replication::ReplicationPayloadError::NotObject => {
78                Self::Decode(value.to_string())
79            }
80        }
81    }
82}
83
84/// Stage the basebackup chunk carried by one `replication_snapshot` response.
85///
86/// The wire payload carries a binary manifest plus at most one matching chunk
87/// for the requested snapshot offset. This helper centralizes the validation:
88/// manifest checksum, relative paths, per-chunk CRC, and atomic chunk write.
89pub fn stage_basebackup_snapshot_chunk(
90    payload: &reddb_wire::replication::BaseBackupChunk,
91    parts_root: impl AsRef<Path>,
92) -> Result<Option<StagedBaseBackupChunk>, ReplicaBaseBackupError> {
93    if !payload.basebackup_available {
94        return Ok(None);
95    }
96
97    let manifest_bytes = payload
98        .required_basebackup_manifest()?
99        .expect("basebackup_available checked above");
100    let manifest = reddb_file::PrimaryReplicaBaseBackupManifest::decode(manifest_bytes)?;
101    manifest.validate()?;
102
103    let snapshot_offset = payload.snapshot_offset;
104    let next_snapshot_offset = payload.next_snapshot_offset;
105    let snapshot_complete = payload.snapshot_complete;
106
107    let chunk_ordinal = match payload.basebackup_chunk_part()? {
108        Some(part) => {
109            let ordinal = part.ordinal;
110            if !manifest.chunks.iter().any(|chunk| chunk.ordinal == ordinal) {
111                return Err(ReplicaBaseBackupError::InvalidField(
112                    reddb_wire::replication::BASEBACKUP_CHUNK_ORDINAL_FIELD,
113                ));
114            }
115            manifest.stage_chunk_part(parts_root.as_ref(), ordinal, part.bytes)?;
116            Some(ordinal)
117        }
118        None => None,
119    };
120
121    Ok(Some(StagedBaseBackupChunk {
122        manifest,
123        chunk_ordinal,
124        snapshot_offset,
125        next_snapshot_offset,
126        snapshot_complete,
127    }))
128}
129
130pub fn recover_staged_basebackup_chunks(
131    manifest: &reddb_file::PrimaryReplicaBaseBackupManifest,
132    parts_root: impl AsRef<Path>,
133) -> Result<std::collections::BTreeSet<u32>, ReplicaBaseBackupError> {
134    manifest
135        .recover_staged_chunk_parts(parts_root)
136        .map_err(Into::into)
137}
138
139impl ReplicaReplication {
140    pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
141        Self {
142            primary_addr,
143            last_applied_lsn: 0,
144            poll_interval: Duration::from_millis(poll_interval_ms),
145            connected: false,
146        }
147    }
148}
149
150// ---------------------------------------------------------------------------
151// Bootstrap resumability via AdminIntentLog
152// ---------------------------------------------------------------------------
153
/// Where an interrupted bootstrap should pick up again, as recovered from the
/// last checkpoint of an unfinished bootstrap intent.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResumePoint {
    /// `last_applied_lsn` recorded in the checkpoint (0 when not recorded).
    pub last_applied_lsn: u64,
    /// Snapshot token recorded in the checkpoint, if any.
    pub snapshot_token: Option<String>,
    /// `snapshot_offset` recorded in the checkpoint (0 when not recorded).
    pub snapshot_offset: u64,
}
161
/// Manages bootstrap lifecycle using [`AdminIntentLog`] for crash-resumability.
///
/// # Single-resumer policy
///
/// Each node only resumes its own intents (`args.replica_id == node_id`).
/// If multiple unfinished intents exist for this node (unexpected), none is
/// resumed — a fresh bootstrap is started and the dangling intents are left for
/// operator investigation via [`crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent`].
///
/// Derives `Debug` and `Clone` so the bootstrapper can be logged and handed to
/// several call sites; it only holds the node id.
#[derive(Debug, Clone)]
pub struct ReplicaBootstrapper {
    /// Identity of this node; matched against each intent's `replica_id` arg.
    node_id: String,
}
173
174impl ReplicaBootstrapper {
175    pub fn new(node_id: impl Into<String>) -> Self {
176        Self {
177            node_id: node_id.into(),
178        }
179    }
180
181    /// Scan `log` for unfinished bootstrap intents.
182    ///
183    /// Calls [`AdminIntentLog::scan_and_report`] first — this emits a
184    /// `DanglingAdminIntent` operator event for every unfinished intent.
185    /// Then applies the single-resumer policy: returns a [`ResumePoint`] only
186    /// if exactly one unfinished `ReplicaBootstrap` intent for this `node_id`
187    /// exists with at least one checkpoint record carrying `last_applied_lsn`.
188    pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
189        log.scan_and_report();
190        let mut mine: Vec<_> = log
191            .list_unfinished()
192            .into_iter()
193            .filter(|u| {
194                u.op == IntentOp::ReplicaBootstrap
195                    && u.args.get("replica_id").and_then(|v| v.as_str())
196                        == Some(self.node_id.as_str())
197            })
198            .collect();
199
200        if mine.len() != 1 {
201            return None;
202        }
203
204        let item = mine.remove(0);
205        item.last_progress
206            .as_ref()
207            .and_then(resume_point_from_progress)
208    }
209
210    /// Resume one unfinished bootstrap intent and return a handle that appends
211    /// progress to the original intent id.
212    pub fn resume<'a>(
213        &self,
214        log: &'a AdminIntentLog,
215    ) -> Option<(ResumePoint, BootstrapHandle<'a>)> {
216        log.scan_and_report();
217
218        let mut mine: Vec<_> = log
219            .list_unfinished()
220            .into_iter()
221            .filter(|u| {
222                u.op == IntentOp::ReplicaBootstrap
223                    && u.args.get("replica_id").and_then(|v| v.as_str())
224                        == Some(self.node_id.as_str())
225            })
226            .collect();
227
228        if mine.len() != 1 {
229            return None;
230        }
231
232        let item = mine.remove(0);
233        let progress = item.last_progress.as_ref()?;
234        let resume = resume_point_from_progress(progress)?;
235        let checkpoint_n = match item.last_phase {
236            IntentPhase::Checkpoint(n) => n,
237            _ => 0,
238        };
239        let handle = log.resume_unfinished(&item);
240
241        Some((
242            resume.clone(),
243            BootstrapHandle {
244                handle,
245                checkpoint_n,
246                last_applied_lsn: resume.last_applied_lsn,
247            },
248        ))
249    }
250}
251
252fn resume_point_from_progress(
253    progress: &crate::json::Map<String, JsonValue>,
254) -> Option<ResumePoint> {
255    let lsn = progress
256        .get("last_applied_lsn")
257        .and_then(|v| v.as_f64())
258        .map(|f| f as u64)
259        .unwrap_or(0);
260    let snapshot_token = progress
261        .get("snapshot_cursor")
262        .or_else(|| progress.get("snapshot_token"))
263        .and_then(|v| v.as_str())
264        .map(ToOwned::to_owned);
265    let snapshot_offset = progress
266        .get("snapshot_offset")
267        .and_then(|v| v.as_f64())
268        .map(|f| f as u64)
269        .unwrap_or(0);
270
271    Some(ResumePoint {
272        last_applied_lsn: lsn,
273        snapshot_token,
274        snapshot_offset,
275    })
276}
277
278impl ReplicaBootstrapper {
279    /// Begin a fresh bootstrap intent.
280    ///
281    /// `source_lsn`: LSN at the primary when bootstrap starts.
282    /// `target_lsn_hint`: expected completion LSN (informational).
283    pub fn begin<'a>(
284        &self,
285        log: &'a AdminIntentLog,
286        source_lsn: u64,
287        target_lsn_hint: u64,
288    ) -> Result<BootstrapHandle<'a>, IntentLogError> {
289        let args = IntentArgs::new()
290            .insert("replica_id", JsonValue::String(self.node_id.clone()))
291            .insert("source_lsn", JsonValue::Number(source_lsn as f64))
292            .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
293        let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
294        Ok(BootstrapHandle {
295            handle,
296            checkpoint_n: 0,
297            last_applied_lsn: 0,
298        })
299    }
300}
301
302/// Active bootstrap handle. Call [`BootstrapHandle::checkpoint`] periodically
303/// during catchup. Call [`BootstrapHandle::complete`] on success.
304///
305/// Dropping without calling `complete` writes `aborted` to the intent log
306/// (guaranteed by [`IntentHandle`]'s `Drop` impl).
307pub struct BootstrapHandle<'a> {
308    handle: IntentHandle<'a>,
309    checkpoint_n: u32,
310    last_applied_lsn: u64,
311}
312
313impl<'a> BootstrapHandle<'a> {
314    pub fn last_applied_lsn(&self) -> u64 {
315        self.last_applied_lsn
316    }
317
318    /// Write a checkpoint with current progress. Checkpoint number auto-increments.
319    pub fn checkpoint(
320        &mut self,
321        last_applied_lsn: u64,
322        batches_applied: u64,
323    ) -> Result<(), IntentLogError> {
324        self.checkpoint_n += 1;
325        let progress = IntentProgress::new()
326            .insert(
327                "last_applied_lsn",
328                JsonValue::Number(last_applied_lsn as f64),
329            )
330            .insert("batches_applied", JsonValue::Number(batches_applied as f64));
331        self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
332        self.last_applied_lsn = last_applied_lsn;
333        Ok(())
334    }
335
336    /// Checkpoint an in-flight snapshot transfer so an interrupted bootstrap
337    /// can resume from the last persisted byte offset instead of restarting
338    /// from zero (issue #830).
339    ///
340    /// The snapshot token is stored under `snapshot_cursor` because
341    /// [`AdminIntentLog`] redacts progress keys containing `token`; the public
342    /// [`ResumePoint`] still surfaces it as `snapshot_token` to callers, which
343    /// also read the legacy `snapshot_token` key as a fallback.
344    pub fn checkpoint_snapshot_transfer(
345        &mut self,
346        snapshot_token: impl Into<String>,
347        snapshot_offset: u64,
348        last_applied_lsn: u64,
349        batches_applied: u64,
350    ) -> Result<(), IntentLogError> {
351        self.checkpoint_n += 1;
352        let progress = IntentProgress::new()
353            .insert("snapshot_cursor", JsonValue::String(snapshot_token.into()))
354            .insert("snapshot_offset", JsonValue::Number(snapshot_offset as f64))
355            .insert(
356                "last_applied_lsn",
357                JsonValue::Number(last_applied_lsn as f64),
358            )
359            .insert("batches_applied", JsonValue::Number(batches_applied as f64));
360        self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
361        self.last_applied_lsn = last_applied_lsn;
362        Ok(())
363    }
364
365    /// Mark bootstrap complete. Consumes the handle.
366    pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
367        let summary = IntentSummary::new()
368            .insert("total_records", JsonValue::Number(total_records as f64))
369            .insert("duration_ms", JsonValue::Number(duration_ms as f64));
370        self.handle.complete(Some(summary))
371    }
372}
373
374// ---------------------------------------------------------------------------
375// Tests
376// ---------------------------------------------------------------------------
377
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::PathBuf;

    /// Scratch path for an intent log, unique per `label` within this process.
    ///
    /// Any leftover file at that path is removed first. The tests only clean up
    /// on their last line, so a run that failed an assertion leaves its log
    /// behind; if a later run reuses the same PID it would otherwise open that
    /// stale log and see intents it never wrote.
    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-bootstrap-{}-{}.log",
            label,
            std::process::id()
        ));
        // Best-effort: normally there is nothing to remove.
        let _ = fs::remove_file(&p);
        p
    }

    /// Open (or reopen) the intent log at `path`, panicking on failure.
    fn open_log(path: &PathBuf) -> AdminIntentLog {
        AdminIntentLog::open(path).expect("open intent log")
    }

    /// The ready marker can be written, read back, and replaced even when a
    /// stale atomic-temp file is lying next to it.
    #[test]
    fn rebootstrap_ready_marker_write_is_atomic_and_readable() {
        let mut data_path = std::env::temp_dir();
        data_path.push(format!(
            "reddb-rebootstrap-marker-{}.rdb",
            std::process::id()
        ));
        let marker_path = reddb_file::layout::rebootstrap_ready_marker_path(&data_path);
        let tmp_path = reddb_file::layout::atomic_temp_path(&marker_path);
        let pending_path = reddb_file::layout::rebootstrap_pending_path(&data_path);
        let _ = fs::remove_file(&marker_path);
        let _ = fs::remove_file(&tmp_path);

        reddb_file::write_rebootstrap_ready_marker(
            &data_path,
            &reddb_file::ReplicaRebootstrapReadyMarker {
                pending_path: pending_path.clone(),
                checkpoint_lsn: 7,
                timeline: reddb_file::TimelineId::initial(),
            },
        )
        .expect("write marker");
        let ready = reddb_file::read_rebootstrap_ready_marker(&data_path).expect("read marker");
        assert_eq!(ready.checkpoint_lsn, 7);
        assert_eq!(ready.pending_path, pending_path);

        // A leftover temp file must not prevent the marker from being replaced.
        fs::write(&tmp_path, b"stale tmp").expect("write stale tmp");
        reddb_file::write_rebootstrap_ready_marker(
            &data_path,
            &reddb_file::ReplicaRebootstrapReadyMarker {
                pending_path: reddb_file::layout::rebootstrap_pending_path(&data_path),
                checkpoint_lsn: 8,
                timeline: reddb_file::TimelineId(3),
            },
        )
        .expect("replace marker");
        let ready =
            reddb_file::read_rebootstrap_ready_marker(&data_path).expect("read replaced marker");
        assert_eq!(ready.checkpoint_lsn, 8);
        assert_eq!(ready.timeline, reddb_file::TimelineId(3));

        let _ = fs::remove_file(&marker_path);
        let _ = fs::remove_file(&tmp_path);
    }

    // -----------------------------------------------------------------------
    // 1. From-scratch: no unfinished intent → scan_for_resume returns None
    // -----------------------------------------------------------------------
    #[test]
    fn bootstrap_from_scratch_when_no_unfinished_intent() {
        let path = tmp_path("fresh");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-1");

        assert!(bootstrapper.scan_for_resume(&log).is_none());

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        handle.complete(500, 100).unwrap();

        // Completed intent → no resume point on next boot
        let log2 = open_log(&path);
        assert!(bootstrapper.scan_for_resume(&log2).is_none());

        let _ = fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 2. Crash mid-catchup (mem::forget simulates no-Drop) → resume from lsn
    // -----------------------------------------------------------------------
    #[test]
    fn resume_from_checkpoint_after_crash() {
        let path = tmp_path("resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-A");

        // Phase 1: start, checkpoint at lsn=500, then "crash" (no Drop)
        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(500, 10).unwrap();
            std::mem::forget(handle);
        }

        // Phase 2: restart — resume at lsn=500, then continue to completion
        {
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 500);

            let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
            handle.checkpoint(1000, 20).unwrap();
            handle.complete(1000, 250).unwrap();
        }

        let _ = fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 3. Multi-replica isolation: each node sees only its own intent
    // -----------------------------------------------------------------------
    #[test]
    fn multi_replica_isolation() {
        let path = tmp_path("multi");
        let log = open_log(&path);

        let b1 = ReplicaBootstrapper::new("replica-1");
        let b2 = ReplicaBootstrapper::new("replica-2");
        let b3 = ReplicaBootstrapper::new("replica-3");

        let mut h1 = b1.begin(&log, 0, 1000).unwrap();
        h1.checkpoint(300, 5).unwrap();
        std::mem::forget(h1);

        let mut h2 = b2.begin(&log, 0, 2000).unwrap();
        h2.checkpoint(700, 12).unwrap();
        std::mem::forget(h2);

        let log2 = open_log(&path);
        let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r3 = b3.scan_for_resume(&log2);

        assert_eq!(r1, Some(300), "replica-1 resumes at 300");
        assert_eq!(r2, Some(700), "replica-2 resumes at 700");
        assert!(r3.is_none(), "replica-3 has no intent");

        let _ = fs::remove_file(&path);
    }

    /// A snapshot-transfer checkpoint survives a crash and surfaces its token,
    /// offset and LSN through `scan_for_resume`.
    #[test]
    fn resume_from_snapshot_transfer_checkpoint_after_crash() {
        let path = tmp_path("snapshot-resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-snapshot");

        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 0)
                .unwrap();
            std::mem::forget(handle);
        }

        {
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 10);
            assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
            assert_eq!(resume.snapshot_offset, 4096);
        }

        let _ = fs::remove_file(&path);
    }

    /// `resume` hands back a handle bound to the original intent, so completing
    /// it leaves nothing unfinished in the log.
    #[test]
    fn resume_snapshot_transfer_completes_original_intent() {
        let path = tmp_path("snapshot-resume-complete");
        let bootstrapper = ReplicaBootstrapper::new("replica-snapshot-complete");

        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 1)
                .unwrap();
            std::mem::forget(handle);
        }

        {
            let log = open_log(&path);
            let (resume, mut handle) = bootstrapper.resume(&log).expect("resume handle");
            assert_eq!(resume.last_applied_lsn, 10);
            assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
            assert_eq!(resume.snapshot_offset, 4096);

            handle
                .checkpoint_snapshot_transfer("snapshot-token-10", 8192, 10, 2)
                .unwrap();
            handle.complete(2, 25).unwrap();
        }

        let log = open_log(&path);
        assert!(
            log.list_unfinished().is_empty(),
            "resumed handle must complete the original dangling intent"
        );

        let _ = fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 4. Drop without complete → aborted (terminal) → list_unfinished empty
    // -----------------------------------------------------------------------
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("abort");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-X");

        {
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(100, 2).unwrap();
            // drop → aborted written by IntentHandle::Drop
        }

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");

        let _ = fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 5. Success path: complete writes completed phase → no unfinished intents
    // -----------------------------------------------------------------------
    #[test]
    fn bootstrap_success_completes_intent() {
        let path = tmp_path("success");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Y");

        let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
        handle.checkpoint(250, 5).unwrap();
        handle.checkpoint(500, 10).unwrap();
        handle.complete(1000, 300).unwrap();

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");

        let _ = fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 6. No resume when intent crashed before any checkpoint
    // -----------------------------------------------------------------------
    #[test]
    fn no_resume_when_no_checkpoint_progress() {
        let path = tmp_path("no-progress");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Z");

        // Crash before any checkpoint — no progress in the intent log
        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        std::mem::forget(handle);

        let log2 = open_log(&path);
        let resume = bootstrapper.scan_for_resume(&log2);
        assert!(resume.is_none(), "no checkpoint → no resume point");

        let _ = fs::remove_file(&path);
    }
}
647}