Skip to main content

reddb_server/replication/
replica.rs

1//! Replica-side replication: connects to primary, consumes WAL records.
2
3use std::time::Duration;
4
5use crate::json::Value as JsonValue;
6use crate::telemetry::admin_intent_log::{
7    AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentProgress,
8    IntentSummary,
9};
10
/// Replica-side replication state.
///
/// A plain data holder: every field is public and this type performs no I/O
/// itself. `Debug` and `Clone` are derived so the state can be logged and
/// snapshotted.
#[derive(Debug, Clone)]
pub struct ReplicaReplication {
    /// Address of the primary, stored exactly as passed to `new`.
    pub primary_addr: String,
    /// Last applied LSN; `new` initialises it to 0.
    pub last_applied_lsn: u64,
    /// Polling interval; `new` builds it from a millisecond count.
    pub poll_interval: Duration,
    /// Connection flag; `new` initialises it to `false`.
    pub connected: bool,
}
18
19impl ReplicaReplication {
20    pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
21        Self {
22            primary_addr,
23            last_applied_lsn: 0,
24            poll_interval: Duration::from_millis(poll_interval_ms),
25            connected: false,
26        }
27    }
28}
29
30// ---------------------------------------------------------------------------
31// Bootstrap resumability via AdminIntentLog
32// ---------------------------------------------------------------------------
33
/// Resume point recovered from a previously checkpointed bootstrap intent.
///
/// `Debug` and `PartialEq` are derived so resume points can be printed and
/// compared directly (e.g. `assert_eq!(resume, Some(ResumePoint { .. }))`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResumePoint {
    /// `last_applied_lsn` value read from the intent's last progress record.
    pub last_applied_lsn: u64,
}
38
/// Manages bootstrap lifecycle using [`AdminIntentLog`] for crash-resumability.
///
/// # Single-resumer policy
///
/// Each node only resumes its own intents (`args.replica_id == node_id`).
/// If multiple unfinished intents exist for this node (unexpected), none is
/// resumed — a fresh bootstrap is started and the dangling intents are left for
/// operator investigation via [`crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent`].
///
/// The type only stores the node id, so it is cheap to clone and safe to
/// print; `Debug` and `Clone` are derived for that purpose.
#[derive(Debug, Clone)]
pub struct ReplicaBootstrapper {
    /// Identifier matched against each intent's `replica_id` argument and
    /// written into intents this bootstrapper begins.
    node_id: String,
}
50
51impl ReplicaBootstrapper {
52    pub fn new(node_id: impl Into<String>) -> Self {
53        Self {
54            node_id: node_id.into(),
55        }
56    }
57
58    /// Scan `log` for unfinished bootstrap intents.
59    ///
60    /// Calls [`AdminIntentLog::scan_and_report`] first — this emits a
61    /// `DanglingAdminIntent` operator event for every unfinished intent.
62    /// Then applies the single-resumer policy: returns a [`ResumePoint`] only
63    /// if exactly one unfinished `ReplicaBootstrap` intent for this `node_id`
64    /// exists with at least one checkpoint record carrying `last_applied_lsn`.
65    pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
66        log.scan_and_report();
67
68        let mut mine: Vec<_> = log
69            .list_unfinished()
70            .into_iter()
71            .filter(|u| {
72                u.op == IntentOp::ReplicaBootstrap
73                    && u.args.get("replica_id").and_then(|v| v.as_str())
74                        == Some(self.node_id.as_str())
75            })
76            .collect();
77
78        if mine.len() != 1 {
79            return None;
80        }
81
82        let item = mine.remove(0);
83        let progress = item.last_progress?;
84        let lsn = progress
85            .get("last_applied_lsn")
86            .and_then(|v| v.as_f64())
87            .map(|f| f as u64)
88            .unwrap_or(0);
89
90        Some(ResumePoint {
91            last_applied_lsn: lsn,
92        })
93    }
94
95    /// Begin a fresh bootstrap intent.
96    ///
97    /// `source_lsn`: LSN at the primary when bootstrap starts.
98    /// `target_lsn_hint`: expected completion LSN (informational).
99    pub fn begin<'a>(
100        &self,
101        log: &'a AdminIntentLog,
102        source_lsn: u64,
103        target_lsn_hint: u64,
104    ) -> Result<BootstrapHandle<'a>, IntentLogError> {
105        let args = IntentArgs::new()
106            .insert("replica_id", JsonValue::String(self.node_id.clone()))
107            .insert("source_lsn", JsonValue::Number(source_lsn as f64))
108            .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
109        let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
110        Ok(BootstrapHandle {
111            handle,
112            checkpoint_n: 0,
113            last_applied_lsn: 0,
114        })
115    }
116}
117
118/// Active bootstrap handle. Call [`BootstrapHandle::checkpoint`] periodically
119/// during catchup. Call [`BootstrapHandle::complete`] on success.
120///
121/// Dropping without calling `complete` writes `aborted` to the intent log
122/// (guaranteed by [`IntentHandle`]'s `Drop` impl).
123pub struct BootstrapHandle<'a> {
124    handle: IntentHandle<'a>,
125    checkpoint_n: u32,
126    last_applied_lsn: u64,
127}
128
129impl<'a> BootstrapHandle<'a> {
130    pub fn last_applied_lsn(&self) -> u64 {
131        self.last_applied_lsn
132    }
133
134    /// Write a checkpoint with current progress. Checkpoint number auto-increments.
135    pub fn checkpoint(
136        &mut self,
137        last_applied_lsn: u64,
138        batches_applied: u64,
139    ) -> Result<(), IntentLogError> {
140        self.checkpoint_n += 1;
141        let progress = IntentProgress::new()
142            .insert(
143                "last_applied_lsn",
144                JsonValue::Number(last_applied_lsn as f64),
145            )
146            .insert("batches_applied", JsonValue::Number(batches_applied as f64));
147        self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
148        self.last_applied_lsn = last_applied_lsn;
149        Ok(())
150    }
151
152    /// Mark bootstrap complete. Consumes the handle.
153    pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
154        let summary = IntentSummary::new()
155            .insert("total_records", JsonValue::Number(total_records as f64))
156            .insert("duration_ms", JsonValue::Number(duration_ms as f64));
157        self.handle.complete(Some(summary))
158    }
159}
160
161// ---------------------------------------------------------------------------
162// Tests
163// ---------------------------------------------------------------------------
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Build a per-test, per-process log path under the system temp dir.
    ///
    /// The name is keyed only by `label` and the process id, so a file left
    /// behind by an aborted earlier run whose PID was recycled would leak
    /// stale intents into the test. Any such file is removed before the path
    /// is handed out; each test calls this once, before opening the log.
    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-bootstrap-{}-{}.log",
            label,
            std::process::id()
        ));
        // Best effort: the file normally does not exist, so the error is ignored.
        let _ = std::fs::remove_file(&p);
        p
    }

    /// Open (or create) the intent log at `path`, panicking on failure.
    fn open_log(path: &PathBuf) -> AdminIntentLog {
        AdminIntentLog::open(path).expect("open intent log")
    }

    // -----------------------------------------------------------------------
    // 1. From-scratch: no unfinished intent → scan_for_resume returns None
    // -----------------------------------------------------------------------
    #[test]
    fn bootstrap_from_scratch_when_no_unfinished_intent() {
        let path = tmp_path("fresh");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-1");

        assert!(bootstrapper.scan_for_resume(&log).is_none());

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        handle.complete(500, 100).unwrap();

        // Completed intent → no resume point on next boot
        let log2 = open_log(&path);
        assert!(bootstrapper.scan_for_resume(&log2).is_none());

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 2. Crash mid-catchup (mem::forget simulates no-Drop) → resume from lsn
    // -----------------------------------------------------------------------
    #[test]
    fn resume_from_checkpoint_after_crash() {
        let path = tmp_path("resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-A");

        // Phase 1: start, checkpoint at lsn=500, then "crash" (no Drop)
        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(500, 10).unwrap();
            std::mem::forget(handle);
        }

        // Phase 2: restart — resume at lsn=500, then continue to completion
        {
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 500);

            let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
            handle.checkpoint(1000, 20).unwrap();
            handle.complete(1000, 250).unwrap();
        }

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 3. Multi-replica isolation: each node sees only its own intent
    // -----------------------------------------------------------------------
    #[test]
    fn multi_replica_isolation() {
        let path = tmp_path("multi");
        let log = open_log(&path);

        let b1 = ReplicaBootstrapper::new("replica-1");
        let b2 = ReplicaBootstrapper::new("replica-2");
        let b3 = ReplicaBootstrapper::new("replica-3");

        let mut h1 = b1.begin(&log, 0, 1000).unwrap();
        h1.checkpoint(300, 5).unwrap();
        std::mem::forget(h1);

        let mut h2 = b2.begin(&log, 0, 2000).unwrap();
        h2.checkpoint(700, 12).unwrap();
        std::mem::forget(h2);

        let log2 = open_log(&path);
        let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r3 = b3.scan_for_resume(&log2);

        assert_eq!(r1, Some(300), "replica-1 resumes at 300");
        assert_eq!(r2, Some(700), "replica-2 resumes at 700");
        assert!(r3.is_none(), "replica-3 has no intent");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 4. Drop without complete → aborted (terminal) → list_unfinished empty
    // -----------------------------------------------------------------------
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("abort");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-X");

        {
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(100, 2).unwrap();
            // drop → aborted written by IntentHandle::Drop
        }

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 5. Success path: complete writes completed phase → no unfinished intents
    // -----------------------------------------------------------------------
    #[test]
    fn bootstrap_success_completes_intent() {
        let path = tmp_path("success");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Y");

        let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
        handle.checkpoint(250, 5).unwrap();
        handle.checkpoint(500, 10).unwrap();
        handle.complete(1000, 300).unwrap();

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 6. No resume when intent crashed before any checkpoint
    // -----------------------------------------------------------------------
    #[test]
    fn no_resume_when_no_checkpoint_progress() {
        let path = tmp_path("no-progress");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Z");

        // Crash before any checkpoint — no progress in the intent log
        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        std::mem::forget(handle);

        let log2 = open_log(&path);
        let resume = bootstrapper.scan_for_resume(&log2);
        assert!(resume.is_none(), "no checkpoint → no resume point");

        let _ = std::fs::remove_file(&path);
    }
}