Skip to main content

reddb_server/replication/
replica.rs

1//! Replica-side replication: connects to primary, consumes WAL records.
2
3use std::time::Duration;
4
5use crate::json::Value as JsonValue;
6use crate::telemetry::admin_intent_log::{
7    AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentProgress,
8    IntentSummary,
9};
10
/// Replica-side replication state: which primary to follow, how far this
/// replica has applied, and how often it polls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaReplication {
    /// Address of the primary this replica connects to.
    pub primary_addr: String,
    /// LSN of the last record applied locally; [`ReplicaReplication::new`]
    /// initializes it to 0.
    pub last_applied_lsn: u64,
    /// Interval between polls of the primary.
    pub poll_interval: Duration,
    /// Connection flag; [`ReplicaReplication::new`] initializes it to `false`.
    pub connected: bool,
}

impl ReplicaReplication {
    /// Create replication state for `primary_addr` that polls every
    /// `poll_interval_ms` milliseconds, starting at LSN 0 and not connected.
    pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
        Self {
            primary_addr,
            last_applied_lsn: 0,
            poll_interval: Duration::from_millis(poll_interval_ms),
            connected: false,
        }
    }
}
29
30// ---------------------------------------------------------------------------
31// Bootstrap resumability via AdminIntentLog
32// ---------------------------------------------------------------------------
33
/// Resume point recovered from a previously checkpointed bootstrap intent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResumePoint {
    /// Last LSN recorded by the checkpoint (0 if the checkpoint did not carry one).
    pub last_applied_lsn: u64,
    /// Token of an in-flight snapshot transfer, if the checkpoint recorded one.
    pub snapshot_token: Option<String>,
    /// Last persisted byte offset of that snapshot transfer (0 if none).
    pub snapshot_offset: u64,
}
40
/// Manages the bootstrap lifecycle using [`AdminIntentLog`] for crash-resumability.
///
/// # Single-resumer policy
///
/// Each node only resumes its own intents (`args.replica_id == node_id`).
/// If multiple unfinished intents exist for this node, none is resumed:
/// [`ReplicaBootstrapper::scan_for_resume`] returns `None`, so the caller is
/// expected to start a fresh bootstrap. The dangling intents are left for
/// operator investigation via
/// [`crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent`].
///
/// NOTE(review): resuming does not close the intent it resumed from. Callers
/// call [`ReplicaBootstrapper::begin`] again, which opens a second intent. If
/// the process crashes again before that new intent finishes, two unfinished
/// intents exist for this node and the policy above refuses to resume.
/// Confirm whether the old intent should be adopted or closed on resume.
#[derive(Debug, Clone)]
pub struct ReplicaBootstrapper {
    /// Identifier written as `replica_id` into every intent this node begins.
    node_id: String,
}
52
53impl ReplicaBootstrapper {
54    pub fn new(node_id: impl Into<String>) -> Self {
55        Self {
56            node_id: node_id.into(),
57        }
58    }
59
60    /// Scan `log` for unfinished bootstrap intents.
61    ///
62    /// Calls [`AdminIntentLog::scan_and_report`] first — this emits a
63    /// `DanglingAdminIntent` operator event for every unfinished intent.
64    /// Then applies the single-resumer policy: returns a [`ResumePoint`] only
65    /// if exactly one unfinished `ReplicaBootstrap` intent for this `node_id`
66    /// exists with at least one checkpoint record carrying `last_applied_lsn`.
67    pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
68        log.scan_and_report();
69
70        let mut mine: Vec<_> = log
71            .list_unfinished()
72            .into_iter()
73            .filter(|u| {
74                u.op == IntentOp::ReplicaBootstrap
75                    && u.args.get("replica_id").and_then(|v| v.as_str())
76                        == Some(self.node_id.as_str())
77            })
78            .collect();
79
80        if mine.len() != 1 {
81            return None;
82        }
83
84        let item = mine.remove(0);
85        let progress = item.last_progress?;
86        let lsn = progress
87            .get("last_applied_lsn")
88            .and_then(|v| v.as_f64())
89            .map(|f| f as u64)
90            .unwrap_or(0);
91        let snapshot_token = progress
92            .get("snapshot_cursor")
93            .or_else(|| progress.get("snapshot_token"))
94            .and_then(|v| v.as_str())
95            .map(ToOwned::to_owned);
96        let snapshot_offset = progress
97            .get("snapshot_offset")
98            .and_then(|v| v.as_f64())
99            .map(|f| f as u64)
100            .unwrap_or(0);
101
102        Some(ResumePoint {
103            last_applied_lsn: lsn,
104            snapshot_token,
105            snapshot_offset,
106        })
107    }
108
109    /// Begin a fresh bootstrap intent.
110    ///
111    /// `source_lsn`: LSN at the primary when bootstrap starts.
112    /// `target_lsn_hint`: expected completion LSN (informational).
113    pub fn begin<'a>(
114        &self,
115        log: &'a AdminIntentLog,
116        source_lsn: u64,
117        target_lsn_hint: u64,
118    ) -> Result<BootstrapHandle<'a>, IntentLogError> {
119        let args = IntentArgs::new()
120            .insert("replica_id", JsonValue::String(self.node_id.clone()))
121            .insert("source_lsn", JsonValue::Number(source_lsn as f64))
122            .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
123        let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
124        Ok(BootstrapHandle {
125            handle,
126            checkpoint_n: 0,
127            last_applied_lsn: 0,
128        })
129    }
130}
131
132/// Active bootstrap handle. Call [`BootstrapHandle::checkpoint`] periodically
133/// during catchup. Call [`BootstrapHandle::complete`] on success.
134///
135/// Dropping without calling `complete` writes `aborted` to the intent log
136/// (guaranteed by [`IntentHandle`]'s `Drop` impl).
137pub struct BootstrapHandle<'a> {
138    handle: IntentHandle<'a>,
139    checkpoint_n: u32,
140    last_applied_lsn: u64,
141}
142
143impl<'a> BootstrapHandle<'a> {
144    pub fn last_applied_lsn(&self) -> u64 {
145        self.last_applied_lsn
146    }
147
148    /// Write a checkpoint with current progress. Checkpoint number auto-increments.
149    pub fn checkpoint(
150        &mut self,
151        last_applied_lsn: u64,
152        batches_applied: u64,
153    ) -> Result<(), IntentLogError> {
154        self.checkpoint_n += 1;
155        let progress = IntentProgress::new()
156            .insert(
157                "last_applied_lsn",
158                JsonValue::Number(last_applied_lsn as f64),
159            )
160            .insert("batches_applied", JsonValue::Number(batches_applied as f64));
161        self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
162        self.last_applied_lsn = last_applied_lsn;
163        Ok(())
164    }
165
166    /// Checkpoint an in-flight snapshot transfer so an interrupted bootstrap
167    /// can resume from the last persisted byte offset instead of restarting
168    /// from zero (issue #830).
169    ///
170    /// The snapshot token is stored under `snapshot_cursor` because
171    /// [`AdminIntentLog`] redacts progress keys containing `token`; the public
172    /// [`ResumePoint`] still surfaces it as `snapshot_token` to callers, which
173    /// also read the legacy `snapshot_token` key as a fallback.
174    pub fn checkpoint_snapshot_transfer(
175        &mut self,
176        snapshot_token: impl Into<String>,
177        snapshot_offset: u64,
178        last_applied_lsn: u64,
179        batches_applied: u64,
180    ) -> Result<(), IntentLogError> {
181        self.checkpoint_n += 1;
182        let progress = IntentProgress::new()
183            .insert("snapshot_cursor", JsonValue::String(snapshot_token.into()))
184            .insert("snapshot_offset", JsonValue::Number(snapshot_offset as f64))
185            .insert(
186                "last_applied_lsn",
187                JsonValue::Number(last_applied_lsn as f64),
188            )
189            .insert("batches_applied", JsonValue::Number(batches_applied as f64));
190        self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
191        self.last_applied_lsn = last_applied_lsn;
192        Ok(())
193    }
194
195    /// Mark bootstrap complete. Consumes the handle.
196    pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
197        let summary = IntentSummary::new()
198            .insert("total_records", JsonValue::Number(total_records as f64))
199            .insert("duration_ms", JsonValue::Number(duration_ms as f64));
200        self.handle.complete(Some(summary))
201    }
202}
203
204// ---------------------------------------------------------------------------
205// Tests
206// ---------------------------------------------------------------------------
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use std::path::PathBuf;
212
213    fn tmp_path(label: &str) -> PathBuf {
214        let mut p = std::env::temp_dir();
215        p.push(format!(
216            "reddb-bootstrap-{}-{}.log",
217            label,
218            std::process::id()
219        ));
220        p
221    }
222
223    fn open_log(path: &PathBuf) -> AdminIntentLog {
224        AdminIntentLog::open(path).expect("open intent log")
225    }
226
227    // -----------------------------------------------------------------------
228    // 1. From-scratch: no unfinished intent → scan_for_resume returns None
229    // -----------------------------------------------------------------------
230    #[test]
231    fn bootstrap_from_scratch_when_no_unfinished_intent() {
232        let path = tmp_path("fresh");
233        let log = open_log(&path);
234        let bootstrapper = ReplicaBootstrapper::new("replica-1");
235
236        assert!(bootstrapper.scan_for_resume(&log).is_none());
237
238        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
239        handle.complete(500, 100).unwrap();
240
241        // Completed intent → no resume point on next boot
242        let log2 = open_log(&path);
243        assert!(bootstrapper.scan_for_resume(&log2).is_none());
244
245        let _ = std::fs::remove_file(&path);
246    }
247
248    // -----------------------------------------------------------------------
249    // 2. Crash mid-catchup (mem::forget simulates no-Drop) → resume from lsn
250    // -----------------------------------------------------------------------
251    #[test]
252    fn resume_from_checkpoint_after_crash() {
253        let path = tmp_path("resume");
254        let bootstrapper = ReplicaBootstrapper::new("replica-A");
255
256        // Phase 1: start, checkpoint at lsn=500, then "crash" (no Drop)
257        {
258            let log = open_log(&path);
259            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
260            handle.checkpoint(500, 10).unwrap();
261            std::mem::forget(handle);
262        }
263
264        // Phase 2: restart — resume at lsn=500, then continue to completion
265        {
266            let log2 = open_log(&path);
267            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
268            assert_eq!(resume.last_applied_lsn, 500);
269
270            let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
271            handle.checkpoint(1000, 20).unwrap();
272            handle.complete(1000, 250).unwrap();
273        }
274
275        let _ = std::fs::remove_file(&path);
276    }
277
278    // -----------------------------------------------------------------------
279    // 3. Multi-replica isolation: each node sees only its own intent
280    // -----------------------------------------------------------------------
281    #[test]
282    fn multi_replica_isolation() {
283        let path = tmp_path("multi");
284        let log = open_log(&path);
285
286        let b1 = ReplicaBootstrapper::new("replica-1");
287        let b2 = ReplicaBootstrapper::new("replica-2");
288        let b3 = ReplicaBootstrapper::new("replica-3");
289
290        let mut h1 = b1.begin(&log, 0, 1000).unwrap();
291        h1.checkpoint(300, 5).unwrap();
292        std::mem::forget(h1);
293
294        let mut h2 = b2.begin(&log, 0, 2000).unwrap();
295        h2.checkpoint(700, 12).unwrap();
296        std::mem::forget(h2);
297
298        let log2 = open_log(&path);
299        let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
300        let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
301        let r3 = b3.scan_for_resume(&log2);
302
303        assert_eq!(r1, Some(300), "replica-1 resumes at 300");
304        assert_eq!(r2, Some(700), "replica-2 resumes at 700");
305        assert!(r3.is_none(), "replica-3 has no intent");
306
307        let _ = std::fs::remove_file(&path);
308    }
309
310    #[test]
311    fn resume_from_snapshot_transfer_checkpoint_after_crash() {
312        let path = tmp_path("snapshot-resume");
313        let bootstrapper = ReplicaBootstrapper::new("replica-snapshot");
314
315        {
316            let log = open_log(&path);
317            let mut handle = bootstrapper.begin(&log, 10, 1000).unwrap();
318            handle
319                .checkpoint_snapshot_transfer("snapshot-token-10", 4096, 10, 0)
320                .unwrap();
321            std::mem::forget(handle);
322        }
323
324        {
325            let log2 = open_log(&path);
326            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
327            assert_eq!(resume.last_applied_lsn, 10);
328            assert_eq!(resume.snapshot_token.as_deref(), Some("snapshot-token-10"));
329            assert_eq!(resume.snapshot_offset, 4096);
330        }
331
332        let _ = std::fs::remove_file(&path);
333    }
334
335    // -----------------------------------------------------------------------
336    // 4. Drop without complete → aborted (terminal) → list_unfinished empty
337    // -----------------------------------------------------------------------
338    #[test]
339    fn drop_without_complete_writes_aborted() {
340        let path = tmp_path("abort");
341        let log = open_log(&path);
342        let bootstrapper = ReplicaBootstrapper::new("replica-X");
343
344        {
345            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
346            handle.checkpoint(100, 2).unwrap();
347            // drop → aborted written by IntentHandle::Drop
348        }
349
350        let log2 = open_log(&path);
351        assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");
352
353        let _ = std::fs::remove_file(&path);
354    }
355
356    // -----------------------------------------------------------------------
357    // 5. Success path: complete writes completed phase → no unfinished intents
358    // -----------------------------------------------------------------------
359    #[test]
360    fn bootstrap_success_completes_intent() {
361        let path = tmp_path("success");
362        let log = open_log(&path);
363        let bootstrapper = ReplicaBootstrapper::new("replica-Y");
364
365        let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
366        handle.checkpoint(250, 5).unwrap();
367        handle.checkpoint(500, 10).unwrap();
368        handle.complete(1000, 300).unwrap();
369
370        let log2 = open_log(&path);
371        assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");
372
373        let _ = std::fs::remove_file(&path);
374    }
375
376    // -----------------------------------------------------------------------
377    // 6. No resume when intent crashed before any checkpoint
378    // -----------------------------------------------------------------------
379    #[test]
380    fn no_resume_when_no_checkpoint_progress() {
381        let path = tmp_path("no-progress");
382        let log = open_log(&path);
383        let bootstrapper = ReplicaBootstrapper::new("replica-Z");
384
385        // Crash before any checkpoint — no progress in the intent log
386        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
387        std::mem::forget(handle);
388
389        let log2 = open_log(&path);
390        let resume = bootstrapper.scan_for_resume(&log2);
391        assert!(resume.is_none(), "no checkpoint → no resume point");
392
393        let _ = std::fs::remove_file(&path);
394    }
395}