reddb_server/replication/
replica.rs1use std::time::Duration;
4
5use crate::json::Value as JsonValue;
6use crate::telemetry::admin_intent_log::{
7 AdminIntentLog, IntentArgs, IntentHandle, IntentLogError, IntentOp, IntentProgress,
8 IntentSummary,
9};
10
/// Replication state kept by a replica for the primary it follows.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the state can be logged,
/// snapshotted and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaReplication {
    /// Address of the primary, stored exactly as passed to [`ReplicaReplication::new`].
    pub primary_addr: String,
    /// Last applied LSN; starts at `0`.
    pub last_applied_lsn: u64,
    /// Polling interval, built from a millisecond count in [`ReplicaReplication::new`].
    pub poll_interval: Duration,
    /// Connection flag; starts as `false`.
    pub connected: bool,
}

impl ReplicaReplication {
    /// Creates the state for a replica that has not applied anything yet.
    ///
    /// `poll_interval_ms` is interpreted as milliseconds. The returned value
    /// has `last_applied_lsn == 0` and `connected == false`.
    pub fn new(primary_addr: String, poll_interval_ms: u64) -> Self {
        let poll_interval = Duration::from_millis(poll_interval_ms);
        Self {
            primary_addr,
            last_applied_lsn: 0,
            poll_interval,
            connected: false,
        }
    }
}
29
/// Position from which an interrupted replica bootstrap can continue.
///
/// This is a plain value type (a single `u64`), so it derives `Copy` and the
/// comparison/formatting traits; callers can compare or print it directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResumePoint {
    /// LSN recorded under the `"last_applied_lsn"` key of the last checkpoint's progress.
    pub last_applied_lsn: u64,
}
38
/// Drives a replica bootstrap through the admin intent log on behalf of one node.
///
/// Derives `Debug` and `Clone` so the bootstrapper can be logged and handed to
/// more than one owner.
#[derive(Debug, Clone)]
pub struct ReplicaBootstrapper {
    /// Identifier of this replica; written as the `"replica_id"` intent argument
    /// and used to pick out this node's intents when scanning the log.
    node_id: String,
}
50
51impl ReplicaBootstrapper {
52 pub fn new(node_id: impl Into<String>) -> Self {
53 Self {
54 node_id: node_id.into(),
55 }
56 }
57
58 pub fn scan_for_resume(&self, log: &AdminIntentLog) -> Option<ResumePoint> {
66 log.scan_and_report();
67
68 let mut mine: Vec<_> = log
69 .list_unfinished()
70 .into_iter()
71 .filter(|u| {
72 u.op == IntentOp::ReplicaBootstrap
73 && u.args.get("replica_id").and_then(|v| v.as_str())
74 == Some(self.node_id.as_str())
75 })
76 .collect();
77
78 if mine.len() != 1 {
79 return None;
80 }
81
82 let item = mine.remove(0);
83 let progress = item.last_progress?;
84 let lsn = progress
85 .get("last_applied_lsn")
86 .and_then(|v| v.as_f64())
87 .map(|f| f as u64)
88 .unwrap_or(0);
89
90 Some(ResumePoint {
91 last_applied_lsn: lsn,
92 })
93 }
94
95 pub fn begin<'a>(
100 &self,
101 log: &'a AdminIntentLog,
102 source_lsn: u64,
103 target_lsn_hint: u64,
104 ) -> Result<BootstrapHandle<'a>, IntentLogError> {
105 let args = IntentArgs::new()
106 .insert("replica_id", JsonValue::String(self.node_id.clone()))
107 .insert("source_lsn", JsonValue::Number(source_lsn as f64))
108 .insert("target_lsn_hint", JsonValue::Number(target_lsn_hint as f64));
109 let handle = log.begin(IntentOp::ReplicaBootstrap, &self.node_id, args)?;
110 Ok(BootstrapHandle {
111 handle,
112 checkpoint_n: 0,
113 last_applied_lsn: 0,
114 })
115 }
116}
117
118pub struct BootstrapHandle<'a> {
124 handle: IntentHandle<'a>,
125 checkpoint_n: u32,
126 last_applied_lsn: u64,
127}
128
129impl<'a> BootstrapHandle<'a> {
130 pub fn last_applied_lsn(&self) -> u64 {
131 self.last_applied_lsn
132 }
133
134 pub fn checkpoint(
136 &mut self,
137 last_applied_lsn: u64,
138 batches_applied: u64,
139 ) -> Result<(), IntentLogError> {
140 self.checkpoint_n += 1;
141 let progress = IntentProgress::new()
142 .insert(
143 "last_applied_lsn",
144 JsonValue::Number(last_applied_lsn as f64),
145 )
146 .insert("batches_applied", JsonValue::Number(batches_applied as f64));
147 self.handle.checkpoint(self.checkpoint_n, Some(progress))?;
148 self.last_applied_lsn = last_applied_lsn;
149 Ok(())
150 }
151
152 pub fn complete(self, total_records: u64, duration_ms: u64) -> Result<(), IntentLogError> {
154 let summary = IntentSummary::new()
155 .insert("total_records", JsonValue::Number(total_records as f64))
156 .insert("duration_ms", JsonValue::Number(duration_ms as f64));
157 self.handle.complete(Some(summary))
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a per-process, per-test log path inside the system temp directory.
    ///
    /// Any file already at that path is removed first. A test whose assertion
    /// fails never reaches its trailing cleanup, and process ids are reused,
    /// so without this a stale log from an earlier run could leak unfinished
    /// intents into a test that expects a fresh log.
    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-bootstrap-{}-{}.log",
            label,
            std::process::id()
        ));
        // Best-effort: normally the file does not exist, so the error is ignored.
        let _ = std::fs::remove_file(&p);
        p
    }

    /// Opens the intent log at `path`, panicking if it cannot be opened.
    fn open_log(path: &PathBuf) -> AdminIntentLog {
        AdminIntentLog::open(path).expect("open intent log")
    }

    /// A fresh log has nothing to resume, and a completed bootstrap leaves
    /// nothing to resume after the log is reopened.
    #[test]
    fn bootstrap_from_scratch_when_no_unfinished_intent() {
        let path = tmp_path("fresh");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-1");

        assert!(bootstrapper.scan_for_resume(&log).is_none());

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        handle.complete(500, 100).unwrap();

        let log2 = open_log(&path);
        assert!(bootstrapper.scan_for_resume(&log2).is_none());

        let _ = std::fs::remove_file(&path);
    }

    /// A bootstrap interrupted after a checkpoint resumes from that checkpoint's LSN.
    #[test]
    fn resume_from_checkpoint_after_crash() {
        let path = tmp_path("resume");
        let bootstrapper = ReplicaBootstrapper::new("replica-A");

        {
            let log = open_log(&path);
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(500, 10).unwrap();
            // Simulate a crash: `forget` skips the handle's destructor.
            std::mem::forget(handle);
        }

        {
            let log2 = open_log(&path);
            let resume = bootstrapper.scan_for_resume(&log2).expect("should resume");
            assert_eq!(resume.last_applied_lsn, 500);

            let mut handle = bootstrapper.begin(&log2, 500, 1000).unwrap();
            handle.checkpoint(1000, 20).unwrap();
            handle.complete(1000, 250).unwrap();
        }

        let _ = std::fs::remove_file(&path);
    }

    /// Each replica only sees its own unfinished intent in a shared log.
    #[test]
    fn multi_replica_isolation() {
        let path = tmp_path("multi");
        let log = open_log(&path);

        let b1 = ReplicaBootstrapper::new("replica-1");
        let b2 = ReplicaBootstrapper::new("replica-2");
        let b3 = ReplicaBootstrapper::new("replica-3");

        let mut h1 = b1.begin(&log, 0, 1000).unwrap();
        h1.checkpoint(300, 5).unwrap();
        std::mem::forget(h1);

        let mut h2 = b2.begin(&log, 0, 2000).unwrap();
        h2.checkpoint(700, 12).unwrap();
        std::mem::forget(h2);

        let log2 = open_log(&path);
        let r1 = b1.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r2 = b2.scan_for_resume(&log2).map(|r| r.last_applied_lsn);
        let r3 = b3.scan_for_resume(&log2);

        assert_eq!(r1, Some(300), "replica-1 resumes at 300");
        assert_eq!(r2, Some(700), "replica-2 resumes at 700");
        assert!(r3.is_none(), "replica-3 has no intent");

        let _ = std::fs::remove_file(&path);
    }

    /// A handle dropped normally (not forgotten) leaves no unfinished intent behind.
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("abort");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-X");

        {
            let mut handle = bootstrapper.begin(&log, 0, 1000).unwrap();
            handle.checkpoint(100, 2).unwrap();
            // `handle` is dropped here without `complete`.
        }

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "aborted is terminal");

        let _ = std::fs::remove_file(&path);
    }

    /// A bootstrap that checkpoints and completes leaves no unfinished intent behind.
    #[test]
    fn bootstrap_success_completes_intent() {
        let path = tmp_path("success");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Y");

        let mut handle = bootstrapper.begin(&log, 0, 500).unwrap();
        handle.checkpoint(250, 5).unwrap();
        handle.checkpoint(500, 10).unwrap();
        handle.complete(1000, 300).unwrap();

        let log2 = open_log(&path);
        assert_eq!(log2.list_unfinished().len(), 0, "completed is terminal");

        let _ = std::fs::remove_file(&path);
    }

    /// An unfinished intent without any checkpoint yields no resume point.
    #[test]
    fn no_resume_when_no_checkpoint_progress() {
        let path = tmp_path("no-progress");
        let log = open_log(&path);
        let bootstrapper = ReplicaBootstrapper::new("replica-Z");

        let handle = bootstrapper.begin(&log, 0, 1000).unwrap();
        // Simulate a crash before the first checkpoint.
        std::mem::forget(handle);

        let log2 = open_log(&path);
        let resume = bootstrapper.scan_for_resume(&log2);
        assert!(resume.is_none(), "no checkpoint → no resume point");

        let _ = std::fs::remove_file(&path);
    }
}