1use std::collections::{BTreeMap, VecDeque};
51use std::fs::{self, File, OpenOptions};
52use std::io::{self, Read, Seek, SeekFrom, Write};
53use std::path::{Path, PathBuf};
54use std::sync::{Arc, Condvar, Mutex, RwLock};
55use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
56
57use tracing::warn;
58
/// Four-byte marker that opens every frame in the logical-WAL spool file.
const LOGICAL_WAL_SPOOL_MAGIC: &[u8; 4] = b"RDLW";
/// Legacy frame layout: `lsn | payload_len (u64) | payload`, no timestamp, no CRC.
const LOGICAL_WAL_SPOOL_VERSION_V1: u8 = 1;
/// Frame layout: `lsn | timestamp | payload_len (u32) | payload | crc32`.
const LOGICAL_WAL_SPOOL_VERSION_V2: u8 = 2;
/// Frame layout: `term | lsn | timestamp | payload_len (u32) | payload | crc32`.
const LOGICAL_WAL_SPOOL_VERSION_V3: u8 = 3;
/// Version stamped on every newly written frame.
const LOGICAL_WAL_SPOOL_VERSION_CURRENT: u8 = LOGICAL_WAL_SPOOL_VERSION_V3;
/// Bytes preceding the payload in a v3 frame:
/// magic + version + term + lsn + timestamp + payload length.
const LOGICAL_WAL_V3_HEADER_LEN: u64 = (LOGICAL_WAL_SPOOL_MAGIC.len()
    + std::mem::size_of::<u8>()
    + 3 * std::mem::size_of::<u64>()
    + std::mem::size_of::<u32>()) as u64;
/// Bytes of the trailing CRC-32 (the writer reuses this for v3 frames as well).
const LOGICAL_WAL_V2_CRC_LEN: u64 = std::mem::size_of::<u32>() as u64;
69
70fn compute_logical_v2_crc(version: u8, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
79 use crate::storage::engine::crc32::crc32_update;
80 let mut crc = crc32_update(0, &[version]);
81 crc = crc32_update(crc, &lsn.to_le_bytes());
82 crc = crc32_update(crc, ×tamp.to_le_bytes());
83 crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
84 crc = crc32_update(crc, payload);
85 crc
86}
87
88fn compute_logical_v3_crc(version: u8, term: u64, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
89 use crate::storage::engine::crc32::crc32_update;
90 let mut crc = crc32_update(0, &[version]);
91 crc = crc32_update(crc, &term.to_le_bytes());
92 crc = crc32_update(crc, &lsn.to_le_bytes());
93 crc = crc32_update(crc, ×tamp.to_le_bytes());
94 crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
95 crc = crc32_update(crc, payload);
96 crc
97}
98
99fn term_from_payload(payload: &[u8]) -> u64 {
100 crate::replication::cdc::ChangeRecord::decode(payload)
101 .map(|record| record.term)
102 .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM)
103}
104
/// In-memory queue of logical WAL records, each tagged with its LSN.
///
/// Payloads are stored as `Arc<[u8]>` so readers can share them without copying.
pub struct WalBuffer {
    /// Records in insertion order as `(lsn, payload)`.
    records: RwLock<VecDeque<(u64, Arc<[u8]>)>>,
    /// Highest LSN observed so far (never decreases).
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Creates an empty buffer whose queue is pre-sized for `max_size` records.
    ///
    /// `max_size` is only used as the initial capacity; nothing here evicts
    /// records when the queue grows past it.
    pub fn new(max_size: usize) -> Self {
        let queue = VecDeque::with_capacity(max_size);
        Self {
            records: RwLock::new(queue),
            current_lsn: RwLock::new(0),
        }
    }

    /// Appends a record and raises the high-water LSN if `lsn` exceeds it.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let payload: Arc<[u8]> = Arc::from(data.into_boxed_slice());
        let mut queue = self.records.write().unwrap_or_else(|e| e.into_inner());
        queue.push_back((lsn, payload));

        // The queue lock stays held while the high-water mark is updated.
        let mut high_water = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Returns up to `max_count` records with `lsn > since_lsn`, copying each payload.
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let shared = self.read_since_shared(since_lsn, max_count);
        let mut owned = Vec::with_capacity(shared.len());
        for (lsn, payload) in shared {
            owned.push((lsn, payload.to_vec()));
        }
        owned
    }

    /// Returns up to `max_count` records with `lsn > since_lsn`, in queue order,
    /// sharing the payload allocations with the buffer.
    pub fn read_since_shared(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Arc<[u8]>)> {
        let queue = self.records.read().unwrap_or_else(|e| e.into_inner());
        let mut picked = Vec::new();
        for (lsn, payload) in queue.iter() {
            if picked.len() >= max_count {
                break;
            }
            if *lsn > since_lsn {
                picked.push((*lsn, Arc::clone(payload)));
            }
        }
        picked
    }

    /// Highest LSN ever appended or set on this buffer.
    pub fn current_lsn(&self) -> u64 {
        let high_water = self.current_lsn.read().unwrap_or_else(|e| e.into_inner());
        *high_water
    }

    /// Raises the high-water LSN to `lsn`; a smaller value is ignored.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut high_water = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Drops records from the front of the queue while their LSN is `<= upto_lsn`.
    pub fn prune_through(&self, upto_lsn: u64) {
        let mut queue = self.records.write().unwrap_or_else(|e| e.into_inner());
        while queue.front().is_some_and(|(lsn, _)| *lsn <= upto_lsn) {
            queue.pop_front();
        }
    }

    /// LSN of the record at the front of the queue, or `None` when empty.
    pub fn oldest_lsn(&self) -> Option<u64> {
        let queue = self.records.read().unwrap_or_else(|e| e.into_inner());
        let (lsn, _) = queue.front()?;
        Some(*lsn)
    }
}
189
190#[derive(Debug, Clone)]
191struct LogicalWalEntry {
192 term: u64,
193 lsn: u64,
194 timestamp_ms: u64,
197 data: Vec<u8>,
198}
199
200impl LogicalWalEntry {
201 fn data_with_framing_term(&self) -> Vec<u8> {
202 match crate::replication::cdc::ChangeRecord::decode(&self.data) {
203 Ok(mut record) if record.term != self.term => {
204 record.term = self.term;
205 record.encode()
206 }
207 _ => self.data.clone(),
208 }
209 }
210}
211
/// A seek-index entry is recorded for every record whose ordinal is a multiple of this.
const SEEK_INDEX_INTERVAL: u64 = 64;

/// In-memory bookkeeping for an open spool file.
#[derive(Debug, Default)]
struct LogicalWalSpoolState {
    /// Highest LSN known to the spool.
    current_lsn: u64,
    /// Sparse `(lsn, file offset)` pairs; searched with a binary search, so it
    /// is expected to be ordered by LSN.
    seek_index: Vec<(u64, u64)>,
    /// Offset at which the next frame is expected to start.
    write_offset: u64,
    /// Number of records counted in the file so far.
    record_count: u64,
}

impl LogicalWalSpoolState {
    /// Records `(lsn, offset)` in the seek index when `ordinal` falls on the
    /// sampling interval, skipping the push if the last indexed LSN is the same.
    fn note_record(&mut self, ordinal: u64, lsn: u64, offset: u64) {
        if !ordinal.is_multiple_of(SEEK_INDEX_INTERVAL) {
            return;
        }
        let repeats_last =
            matches!(self.seek_index.last(), Some((last_lsn, _)) if *last_lsn == lsn);
        if !repeats_last {
            self.seek_index.push((lsn, offset));
        }
    }

    /// Returns the file offset of the last indexed record whose LSN is
    /// `<= since_lsn`, or `0` when no indexed record qualifies.
    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
        let floor_slot = match self
            .seek_index
            .binary_search_by_key(&since_lsn, |&(lsn, _)| lsn)
        {
            Ok(exact) => Some(exact),
            // `insert_at` is the first entry above `since_lsn`; step back one.
            Err(insert_at) => insert_at.checked_sub(1),
        };
        floor_slot.map_or(0, |slot| self.seek_index[slot].1)
    }
}
267
268pub struct LogicalWalSpool {
272 path: PathBuf,
273 state: Mutex<LogicalWalSpoolState>,
274}
275
276impl LogicalWalSpool {
277 pub fn path_for(data_path: &Path) -> PathBuf {
278 let file_name = data_path
279 .file_name()
280 .and_then(|name| name.to_str())
281 .unwrap_or("reddb.rdb");
282 let spool_name = format!("{file_name}.logical.wal");
283 match data_path.parent() {
284 Some(parent) => parent.join(spool_name),
285 None => PathBuf::from(spool_name),
286 }
287 }
288
289 pub fn open(data_path: &Path) -> io::Result<Self> {
290 let path = Self::path_for(data_path);
291 if let Some(parent) = path.parent() {
292 fs::create_dir_all(parent)?;
293 }
294 if !path.exists() {
295 File::create(&path)?;
296 }
297 let entries = read_and_repair_entries(&path)?;
302 let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
303 let (seek_index, write_offset, record_count) = build_seek_index(&path)?;
306 Ok(Self {
307 path,
308 state: Mutex::new(LogicalWalSpoolState {
309 current_lsn,
310 seek_index,
311 write_offset,
312 record_count,
313 }),
314 })
315 }
316
317 pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
318 let timestamp_ms = SystemTime::now()
319 .duration_since(UNIX_EPOCH)
320 .map(|d| d.as_millis() as u64)
321 .unwrap_or(0);
322 self.append_with_timestamp(lsn, timestamp_ms, data)
323 }
324
325 pub fn append_with_timestamp(
329 &self,
330 lsn: u64,
331 timestamp_ms: u64,
332 data: &[u8],
333 ) -> io::Result<()> {
334 self.append_with_term_and_timestamp(term_from_payload(data), lsn, timestamp_ms, data)
335 }
336
337 pub fn append_with_term_and_timestamp(
338 &self,
339 term: u64,
340 lsn: u64,
341 timestamp_ms: u64,
342 data: &[u8],
343 ) -> io::Result<()> {
344 if data.len() > u32::MAX as usize {
345 return Err(io::Error::new(
346 io::ErrorKind::InvalidInput,
347 format!(
348 "logical WAL payload of {} bytes exceeds 4 GiB framing limit",
349 data.len()
350 ),
351 ));
352 }
353 let mut file = OpenOptions::new()
354 .create(true)
355 .append(true)
356 .open(&self.path)?;
357 let mut frame = Vec::with_capacity(
367 LOGICAL_WAL_V3_HEADER_LEN as usize + data.len() + LOGICAL_WAL_V2_CRC_LEN as usize,
368 );
369 frame.extend_from_slice(LOGICAL_WAL_SPOOL_MAGIC);
370 frame.push(LOGICAL_WAL_SPOOL_VERSION_CURRENT);
371 frame.extend_from_slice(&term.to_le_bytes());
372 frame.extend_from_slice(&lsn.to_le_bytes());
373 frame.extend_from_slice(×tamp_ms.to_le_bytes());
374 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
375 frame.extend_from_slice(data);
376 let crc = compute_logical_v3_crc(
377 LOGICAL_WAL_SPOOL_VERSION_CURRENT,
378 term,
379 lsn,
380 timestamp_ms,
381 data,
382 );
383 frame.extend_from_slice(&crc.to_le_bytes());
384
385 file.write_all(&frame)?;
386 file.sync_all()?;
391
392 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
393 state.current_lsn = state.current_lsn.max(lsn);
394 let record_start = state.write_offset;
398 let ordinal = state.record_count;
399 state.note_record(ordinal, lsn, record_start);
400 state.write_offset = record_start + frame.len() as u64;
401 state.record_count = ordinal + 1;
402 Ok(())
403 }
404
405 pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
406 let start_offset = {
412 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
413 state.seek_floor_offset(since_lsn)
414 };
415 let entries = read_entries_from(&self.path, start_offset)?;
416 Ok(entries
417 .into_iter()
418 .filter(|entry| entry.lsn > since_lsn)
419 .take(max_count)
420 .map(|entry| (entry.lsn, entry.data_with_framing_term()))
421 .collect())
422 }
423
424 #[cfg(test)]
428 fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
429 self.state
430 .lock()
431 .unwrap_or_else(|e| e.into_inner())
432 .seek_floor_offset(since_lsn)
433 }
434
435 pub fn current_lsn(&self) -> u64 {
436 self.state
437 .lock()
438 .unwrap_or_else(|e| e.into_inner())
439 .current_lsn
440 }
441
442 pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
443 Ok(read_and_repair_entries(&self.path)?
444 .into_iter()
445 .next()
446 .map(|entry| entry.lsn))
447 }
448
449 pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
450 let previous_lsn = self.current_lsn();
451 let retained: Vec<_> = read_and_repair_entries(&self.path)?
452 .into_iter()
453 .filter(|entry| entry.lsn > upto_lsn)
454 .collect();
455 let temp_path = self.path.with_extension("logical.wal.tmp");
456 let mut temp = File::create(&temp_path)?;
457 let mut current_lsn = 0;
458 for entry in retained {
459 let timestamp_ms = if entry.timestamp_ms > 0 {
467 entry.timestamp_ms
468 } else {
469 SystemTime::now()
470 .duration_since(UNIX_EPOCH)
471 .map(|d| d.as_millis() as u64)
472 .unwrap_or(0)
473 };
474 let crc = compute_logical_v3_crc(
475 LOGICAL_WAL_SPOOL_VERSION_CURRENT,
476 entry.term,
477 entry.lsn,
478 timestamp_ms,
479 &entry.data,
480 );
481 temp.write_all(LOGICAL_WAL_SPOOL_MAGIC)?;
482 temp.write_all(&[LOGICAL_WAL_SPOOL_VERSION_CURRENT])?;
483 temp.write_all(&entry.term.to_le_bytes())?;
484 temp.write_all(&entry.lsn.to_le_bytes())?;
485 temp.write_all(×tamp_ms.to_le_bytes())?;
486 temp.write_all(&(entry.data.len() as u32).to_le_bytes())?;
487 temp.write_all(&entry.data)?;
488 temp.write_all(&crc.to_le_bytes())?;
489 current_lsn = current_lsn.max(entry.lsn);
490 }
491 temp.sync_all()?;
492 fs::rename(&temp_path, &self.path)?;
493
494 let (seek_index, write_offset, record_count) = build_seek_index(&self.path)?;
497 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
498 state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
499 state.seek_index = seek_index;
500 state.write_offset = write_offset;
501 state.record_count = record_count;
502 Ok(())
503 }
504}
505
506fn read_and_repair_entries(path: &Path) -> io::Result<Vec<LogicalWalEntry>> {
531 if !path.exists() {
532 return Ok(Vec::new());
533 }
534
535 let mut file = OpenOptions::new().read(true).write(true).open(path)?;
536 let mut entries = Vec::new();
537 let mut last_good_offset: u64 = 0;
538 let mut corrupt_reason: Option<String> = None;
539
540 loop {
541 let record_start = file.stream_position()?;
542
543 let mut magic = [0u8; 4];
544 match file.read_exact(&mut magic) {
545 Ok(()) => {}
546 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
547 Err(err) => return Err(err),
548 }
549 if &magic != LOGICAL_WAL_SPOOL_MAGIC {
550 corrupt_reason = Some(format!(
551 "bad magic at offset {record_start}: got {magic:02x?}"
552 ));
553 break;
554 }
555
556 let mut version = [0u8; 1];
557 if let Err(err) = file.read_exact(&mut version) {
558 if err.kind() == io::ErrorKind::UnexpectedEof {
559 corrupt_reason = Some(format!("torn header at offset {record_start}"));
560 break;
561 }
562 return Err(err);
563 }
564
565 let entry_result = match version[0] {
566 LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(&mut file, record_start),
567 LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(&mut file, record_start),
568 LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(&mut file, record_start),
569 other => {
570 corrupt_reason = Some(format!(
571 "unsupported version {other} at offset {record_start}"
572 ));
573 break;
574 }
575 };
576
577 match entry_result {
578 Ok(entry) => {
579 entries.push(entry);
580 last_good_offset = file.stream_position()?;
581 }
582 Err(reason) => {
583 corrupt_reason = Some(reason);
584 break;
585 }
586 }
587 }
588
589 if let Some(reason) = corrupt_reason {
590 let total_len = file.metadata()?.len();
591 if last_good_offset < total_len {
592 warn!(
593 target: "reddb::replication::logical_wal",
594 path = %path.display(),
595 reason = %reason,
596 truncating_from = last_good_offset,
597 truncating_to = total_len,
598 kept_records = entries.len(),
599 "truncating logical-WAL spool to last valid record"
600 );
601 file.set_len(last_good_offset)?;
602 file.sync_all()?;
603 }
604 }
605
606 Ok(entries)
607}
608
609fn read_frame(file: &mut File, record_start: u64) -> io::Result<Option<LogicalWalEntry>> {
618 let mut magic = [0u8; 4];
619 match file.read_exact(&mut magic) {
620 Ok(()) => {}
621 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
622 Err(err) => return Err(err),
623 }
624 if &magic != LOGICAL_WAL_SPOOL_MAGIC {
625 return Ok(None);
626 }
627 let mut version = [0u8; 1];
628 match file.read_exact(&mut version) {
629 Ok(()) => {}
630 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
631 Err(err) => return Err(err),
632 }
633 let entry = match version[0] {
634 LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(file, record_start),
635 LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(file, record_start),
636 LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(file, record_start),
637 _ => return Ok(None),
638 };
639 Ok(entry.ok())
640}
641
642fn read_entries_from(path: &Path, start_offset: u64) -> io::Result<Vec<LogicalWalEntry>> {
646 if !path.exists() {
647 return Ok(Vec::new());
648 }
649 let mut file = OpenOptions::new().read(true).open(path)?;
650 file.seek(SeekFrom::Start(start_offset))?;
651 let mut entries = Vec::new();
652 loop {
653 let record_start = file.stream_position()?;
654 match read_frame(&mut file, record_start)? {
655 Some(entry) => entries.push(entry),
656 None => break,
657 }
658 }
659 Ok(entries)
660}
661
662fn build_seek_index(path: &Path) -> io::Result<(Vec<(u64, u64)>, u64, u64)> {
667 if !path.exists() {
668 return Ok((Vec::new(), 0, 0));
669 }
670 let mut file = OpenOptions::new().read(true).open(path)?;
671 let mut index = Vec::new();
672 let mut ordinal: u64 = 0;
673 let mut write_offset: u64 = 0;
674 loop {
675 let record_start = file.stream_position()?;
676 match read_frame(&mut file, record_start)? {
677 Some(entry) => {
678 if ordinal.is_multiple_of(SEEK_INDEX_INTERVAL)
679 && index.last().map(|(l, _)| *l) != Some(entry.lsn)
680 {
681 index.push((entry.lsn, record_start));
682 }
683 ordinal += 1;
684 write_offset = file.stream_position()?;
685 }
686 None => break,
687 }
688 }
689 Ok((index, write_offset, ordinal))
690}
691
692fn read_one_v3(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
695 let mut term = [0u8; 8];
696 if let Err(err) = file.read_exact(&mut term) {
697 return Err(format!("torn term at offset {record_start}: {err}"));
698 }
699 let mut lsn = [0u8; 8];
700 if let Err(err) = file.read_exact(&mut lsn) {
701 return Err(format!("torn lsn at offset {record_start}: {err}"));
702 }
703 let mut timestamp = [0u8; 8];
704 if let Err(err) = file.read_exact(&mut timestamp) {
705 return Err(format!("torn timestamp at offset {record_start}: {err}"));
706 }
707 let mut len_bytes = [0u8; 4];
708 if let Err(err) = file.read_exact(&mut len_bytes) {
709 return Err(format!(
710 "torn payload length at offset {record_start}: {err}"
711 ));
712 }
713 let payload_len = u32::from_le_bytes(len_bytes) as usize;
714 const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
715 if payload_len > MAX_PLAUSIBLE_PAYLOAD {
716 return Err(format!(
717 "implausible payload_len {payload_len} at offset {record_start}"
718 ));
719 }
720 let mut payload = vec![0u8; payload_len];
721 if let Err(err) = file.read_exact(&mut payload) {
722 return Err(format!(
723 "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
724 ));
725 }
726 let mut crc_bytes = [0u8; 4];
727 if let Err(err) = file.read_exact(&mut crc_bytes) {
728 return Err(format!("torn crc at offset {record_start}: {err}"));
729 }
730 let stored_crc = u32::from_le_bytes(crc_bytes);
731 let term = u64::from_le_bytes(term);
732 let lsn = u64::from_le_bytes(lsn);
733 let timestamp = u64::from_le_bytes(timestamp);
734 let expected_crc =
735 compute_logical_v3_crc(LOGICAL_WAL_SPOOL_VERSION_V3, term, lsn, timestamp, &payload);
736 if stored_crc != expected_crc {
737 return Err(format!(
738 "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
739 ));
740 }
741 Ok(LogicalWalEntry {
742 term,
743 lsn,
744 timestamp_ms: timestamp,
745 data: payload,
746 })
747}
748
749fn read_one_v2(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
753 let mut lsn = [0u8; 8];
754 if let Err(err) = file.read_exact(&mut lsn) {
755 return Err(format!("torn lsn at offset {record_start}: {err}"));
756 }
757 let mut timestamp = [0u8; 8];
758 if let Err(err) = file.read_exact(&mut timestamp) {
759 return Err(format!("torn timestamp at offset {record_start}: {err}"));
760 }
761 let mut len_bytes = [0u8; 4];
762 if let Err(err) = file.read_exact(&mut len_bytes) {
763 return Err(format!(
764 "torn payload length at offset {record_start}: {err}"
765 ));
766 }
767 let payload_len = u32::from_le_bytes(len_bytes) as usize;
768 const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
773 if payload_len > MAX_PLAUSIBLE_PAYLOAD {
774 return Err(format!(
775 "implausible payload_len {payload_len} at offset {record_start}"
776 ));
777 }
778 let mut payload = vec![0u8; payload_len];
779 if let Err(err) = file.read_exact(&mut payload) {
780 return Err(format!(
781 "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
782 ));
783 }
784 let mut crc_bytes = [0u8; 4];
785 if let Err(err) = file.read_exact(&mut crc_bytes) {
786 return Err(format!("torn crc at offset {record_start}: {err}"));
787 }
788 let stored_crc = u32::from_le_bytes(crc_bytes);
789 let expected_crc = compute_logical_v2_crc(
790 LOGICAL_WAL_SPOOL_VERSION_V2,
791 u64::from_le_bytes(lsn),
792 u64::from_le_bytes(timestamp),
793 &payload,
794 );
795 if stored_crc != expected_crc {
796 return Err(format!(
797 "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
798 ));
799 }
800 let term = term_from_payload(&payload);
801 Ok(LogicalWalEntry {
802 term,
803 lsn: u64::from_le_bytes(lsn),
804 timestamp_ms: u64::from_le_bytes(timestamp),
805 data: payload,
806 })
807}
808
809fn read_one_v1(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
814 let mut lsn = [0u8; 8];
815 if let Err(err) = file.read_exact(&mut lsn) {
816 return Err(format!("v1 torn lsn at offset {record_start}: {err}"));
817 }
818 let mut len_bytes = [0u8; 8];
819 if let Err(err) = file.read_exact(&mut len_bytes) {
820 return Err(format!(
821 "v1 torn payload length at offset {record_start}: {err}"
822 ));
823 }
824 let payload_len = u64::from_le_bytes(len_bytes) as usize;
825 if payload_len > 256 * 1024 * 1024 {
826 return Err(format!(
827 "v1 implausible payload_len {payload_len} at offset {record_start}"
828 ));
829 }
830 let mut payload = vec![0u8; payload_len];
831 if let Err(err) = file.read_exact(&mut payload) {
832 return Err(format!("v1 torn payload at offset {record_start}: {err}"));
833 }
834 let term = term_from_payload(&payload);
835 Ok(LogicalWalEntry {
836 term,
837 lsn: u64::from_le_bytes(lsn),
838 timestamp_ms: 0,
839 data: payload,
840 })
841}
842
/// Reason a replication slot has been marked invalid.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotInvalidationCause {
    /// Serialised as `"wal-removed"`.
    WalRemoved,
    /// Serialised as `"horizon"`.
    Horizon,
    /// Serialised as `"idle-timeout"`.
    IdleTimeout,
}

impl SlotInvalidationCause {
    /// Stable string form used when the cause is written to the slot store.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::IdleTimeout => "idle-timeout",
            Self::Horizon => "horizon",
            Self::WalRemoved => "wal-removed",
        }
    }

    /// Inverse of [`Self::as_str`]; unknown strings yield `None`.
    fn from_str(value: &str) -> Option<Self> {
        [Self::WalRemoved, Self::Horizon, Self::IdleTimeout]
            .into_iter()
            .find(|cause| cause.as_str() == value)
    }
}
868
/// Persistent per-consumer replication slot, as stored in the slot JSON file.
#[derive(Debug, Clone)]
pub struct ReplicationSlot {
    /// Slot identifier; also the key under which the slot is kept in the slot map.
    pub id: String,
    /// NOTE(review): presumably the LSN from which the consumer restarts — confirm against `ensure_slot`.
    pub restart_lsn: u64,
    /// NOTE(review): presumably the highest LSN the consumer has confirmed — confirm against `advance_slot`.
    pub confirmed_lsn: u64,
    /// Unix time in milliseconds at which the slot was last seen; defaults to the
    /// load time when the stored slot has no such field.
    pub last_seen_at_unix_ms: u128,
    /// Why the slot was invalidated, or `None` when no (recognised) reason is stored.
    pub invalidation_reason: Option<SlotInvalidationCause>,
    /// Unix time in milliseconds of the invalidation, if recorded.
    pub invalidated_at_unix_ms: Option<u128>,
}
878
879fn load_replication_slots(path: Option<&Path>, now_ms: u128) -> BTreeMap<String, ReplicationSlot> {
880 let Some(path) = path else {
881 return BTreeMap::new();
882 };
883 let bytes = match fs::read(path) {
884 Ok(bytes) => bytes,
885 Err(err) if err.kind() == io::ErrorKind::NotFound => return BTreeMap::new(),
886 Err(err) => {
887 warn!(
888 target: "reddb::replication::slots",
889 path = %path.display(),
890 error = %err,
891 "failed to read replication slot store"
892 );
893 return BTreeMap::new();
894 }
895 };
896 match crate::serde_json::from_slice::<crate::serde_json::Value>(&bytes) {
897 Ok(value) => value
898 .get("slots")
899 .and_then(crate::serde_json::Value::as_array)
900 .unwrap_or(&[])
901 .iter()
902 .filter_map(|value| {
903 let object = value.as_object()?;
904 let id = object.get("id")?.as_str()?.to_string();
905 let restart_lsn = object.get("restart_lsn")?.as_u64()?;
906 let confirmed_lsn = object.get("confirmed_lsn")?.as_u64()?;
907 let last_seen_at_unix_ms = object
908 .get("last_seen_at_unix_ms")
909 .and_then(crate::serde_json::Value::as_u64)
910 .map(u128::from)
911 .unwrap_or(now_ms);
912 let invalidation_reason = object
913 .get("invalidation_reason")
914 .and_then(crate::serde_json::Value::as_str)
915 .and_then(SlotInvalidationCause::from_str);
916 let invalidated_at_unix_ms = object
917 .get("invalidated_at_unix_ms")
918 .and_then(crate::serde_json::Value::as_u64)
919 .map(u128::from);
920 Some((
921 id.clone(),
922 ReplicationSlot {
923 id,
924 restart_lsn,
925 confirmed_lsn,
926 last_seen_at_unix_ms,
927 invalidation_reason,
928 invalidated_at_unix_ms,
929 },
930 ))
931 })
932 .collect(),
933 Err(err) => {
934 warn!(
935 target: "reddb::replication::slots",
936 path = %path.display(),
937 error = %err,
938 "failed to decode replication slot store"
939 );
940 BTreeMap::new()
941 }
942 }
943}
944
945fn persist_replication_slots(
946 path: Option<&Path>,
947 slots: &BTreeMap<String, ReplicationSlot>,
948) -> io::Result<()> {
949 let Some(path) = path else {
950 return Ok(());
951 };
952 if let Some(parent) = path.parent() {
953 fs::create_dir_all(parent)?;
954 }
955 let temp_path = path.with_extension("logical.slots.tmp");
956 let slots_json = slots
957 .values()
958 .map(|slot| {
959 let mut object = crate::serde_json::Map::new();
960 object.insert(
961 "id".to_string(),
962 crate::serde_json::Value::String(slot.id.clone()),
963 );
964 object.insert(
965 "restart_lsn".to_string(),
966 crate::serde_json::Value::Number(slot.restart_lsn as f64),
967 );
968 object.insert(
969 "confirmed_lsn".to_string(),
970 crate::serde_json::Value::Number(slot.confirmed_lsn as f64),
971 );
972 object.insert(
973 "last_seen_at_unix_ms".to_string(),
974 crate::serde_json::Value::Number(slot.last_seen_at_unix_ms as f64),
975 );
976 if let Some(reason) = slot.invalidation_reason {
977 object.insert(
978 "invalidation_reason".to_string(),
979 crate::serde_json::Value::String(reason.as_str().to_string()),
980 );
981 }
982 if let Some(invalidated_at) = slot.invalidated_at_unix_ms {
983 object.insert(
984 "invalidated_at_unix_ms".to_string(),
985 crate::serde_json::Value::Number(invalidated_at as f64),
986 );
987 }
988 crate::serde_json::Value::Object(object)
989 })
990 .collect();
991 let mut root = crate::serde_json::Map::new();
992 root.insert(
993 "slots".to_string(),
994 crate::serde_json::Value::Array(slots_json),
995 );
996 let value = crate::serde_json::Value::Object(root);
997 let bytes = crate::serde_json::to_string_pretty(&value)
998 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
999 let mut temp = File::create(&temp_path)?;
1000 temp.write_all(bytes.as_bytes())?;
1001 temp.sync_all()?;
1002 fs::rename(&temp_path, path)?;
1003 Ok(())
1004}
1005
/// Primary-side view of one connected replica.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    /// Replica identifier.
    pub id: String,
    /// Highest LSN the replica has acknowledged.
    pub last_acked_lsn: u64,
    /// Highest LSN recorded as sent to the replica.
    pub last_sent_lsn: u64,
    /// Highest LSN the replica has reported as durable.
    pub last_durable_lsn: u64,
    /// Largest apply-error count reported by the replica.
    pub apply_error_count: u64,
    /// Largest divergence count reported by the replica.
    pub divergence_count: u64,
    /// Unix time in milliseconds at which the replica was first registered.
    pub connected_at_unix_ms: u128,
    /// Unix time in milliseconds of the most recent interaction with the replica.
    pub last_seen_at_unix_ms: u128,
    /// Optional region label supplied at registration.
    pub region: Option<String>,
    /// Whether the replica is currently flagged as rebootstrapping.
    pub rebootstrapping: bool,
}

/// Aggregate replication figures computed across a set of replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationProgress {
    /// Largest `last_sent_lsn` minus smallest `last_acked_lsn`, saturating at zero.
    pub lag_lsn: u64,
    /// Smallest `last_durable_lsn` across the replicas.
    pub safe_replay_lsn: u64,
}

impl ReplicationProgress {
    /// Summarises `replicas`, or returns `None` when the slice is empty.
    pub fn from_replicas(replicas: &[ReplicaState]) -> Option<Self> {
        let (head, tail) = replicas.split_first()?;
        let seed = (
            head.last_sent_lsn,
            head.last_acked_lsn,
            head.last_durable_lsn,
        );
        // One pass tracking (max sent, min acked, min durable).
        let (max_sent_lsn, min_acked_lsn, safe_replay_lsn) =
            tail.iter().fold(seed, |(sent, acked, durable), replica| {
                (
                    sent.max(replica.last_sent_lsn),
                    acked.min(replica.last_acked_lsn),
                    durable.min(replica.last_durable_lsn),
                )
            });

        Some(Self {
            lag_lsn: max_sent_lsn.saturating_sub(min_acked_lsn),
            safe_replay_lsn,
        })
    }
}
1060
/// Primary-side replication state: record buffers, replica registry and slots.
pub struct PrimaryReplication {
    /// In-memory buffer of logical WAL records.
    pub wal_buffer: Arc<WalBuffer>,
    /// Durable spool of logical WAL records; `None` when no data path was given
    /// or the spool could not be opened.
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    /// Currently registered replicas.
    pub replicas: RwLock<Vec<ReplicaState>>,
    /// Highest appended LSN plus the condvar notified on every append.
    wal_appended: (Mutex<u64>, Condvar),
    /// Location of the JSON slot store, derived from the data path.
    slot_path: Option<PathBuf>,
    /// Replication slots keyed by slot id.
    slots: RwLock<BTreeMap<String, ReplicationSlot>>,
    /// Copied from `ReplicationConfig::slot_retention_max_lag_lsn`.
    slot_retention_max_lag_lsn: u64,
    /// Copied from `ReplicationConfig::slot_idle_timeout_ms`.
    slot_idle_timeout_ms: u64,
    /// Receives replica acknowledgements and replica removals.
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    /// Counter bumped whenever the replica set or a replica's rebootstrap flag changes.
    topology_epoch: std::sync::atomic::AtomicU64,
    /// NOTE(review): presumably counts partial resyncs served — confirm where it is incremented.
    partial_resync_count: std::sync::atomic::AtomicU64,
    /// NOTE(review): presumably counts full rebootstraps served — confirm where it is incremented.
    full_resync_count: std::sync::atomic::AtomicU64,
}
1096
/// How a reconnecting replica resumes replication.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResumeMode {
    /// Continue streaming from `resume_lsn`.
    PartialResync { resume_lsn: u64 },
    /// Start over from a full bootstrap; `cause` carries the slot invalidation reason.
    FullRebootstrap { cause: SlotInvalidationCause },
}
1108
1109impl PrimaryReplication {
1110 pub fn slot_path_for(data_path: &Path) -> PathBuf {
1111 let file_name = data_path
1112 .file_name()
1113 .and_then(|name| name.to_str())
1114 .unwrap_or("reddb.rdb");
1115 let slot_name = format!("{file_name}.logical.slots.json");
1116 match data_path.parent() {
1117 Some(parent) => parent.join(slot_name),
1118 None => PathBuf::from(slot_name),
1119 }
1120 }
1121
    /// Builds a primary using `ReplicationConfig::primary()` as its configuration.
    ///
    /// See [`Self::new_with_config`] for how `data_path` is used.
    pub fn new(data_path: Option<&Path>) -> Self {
        Self::new_with_config(data_path, &crate::replication::ReplicationConfig::primary())
    }
1125
1126 pub fn new_with_config(
1127 data_path: Option<&Path>,
1128 config: &crate::replication::ReplicationConfig,
1129 ) -> Self {
1130 let now_ms = crate::utils::now_unix_millis() as u128;
1131 let slot_path = data_path.map(Self::slot_path_for);
1132 let slots = load_replication_slots(slot_path.as_deref(), now_ms);
1133 let logical_wal_spool = data_path
1134 .and_then(|path| LogicalWalSpool::open(path).ok())
1135 .map(Arc::new);
1136 let current_lsn = logical_wal_spool
1137 .as_ref()
1138 .map(|spool| spool.current_lsn())
1139 .unwrap_or(0);
1140 Self {
1141 wal_buffer: Arc::new(WalBuffer::new(100_000)),
1142 logical_wal_spool,
1143 replicas: RwLock::new(Vec::new()),
1144 wal_appended: (Mutex::new(current_lsn), Condvar::new()),
1145 slot_path,
1146 slots: RwLock::new(slots),
1147 slot_retention_max_lag_lsn: config.slot_retention_max_lag_lsn,
1148 slot_idle_timeout_ms: config.slot_idle_timeout_ms,
1149 commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
1150 topology_epoch: std::sync::atomic::AtomicU64::new(0),
1151 partial_resync_count: std::sync::atomic::AtomicU64::new(0),
1152 full_resync_count: std::sync::atomic::AtomicU64::new(0),
1153 }
1154 }
1155
1156 pub fn append_logical_record(&self, lsn: u64, encoded: Vec<u8>) {
1157 self.wal_buffer.append(lsn, encoded.clone());
1158 if let Some(spool) = &self.logical_wal_spool {
1159 let _ = spool.append(lsn, &encoded);
1160 }
1161 let (lock, cvar) = &self.wal_appended;
1162 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1163 *latest = (*latest).max(lsn);
1164 cvar.notify_all();
1165 }
1166
1167 pub fn wait_for_logical_lsn_after(&self, since_lsn: u64, timeout: Duration) -> bool {
1168 if self.current_logical_lsn() > since_lsn {
1169 return true;
1170 }
1171 let deadline = Instant::now() + timeout;
1172 let (lock, cvar) = &self.wal_appended;
1173 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1174 while *latest <= since_lsn {
1175 let now = Instant::now();
1176 if now >= deadline {
1177 return false;
1178 }
1179 let remaining = deadline.saturating_duration_since(now);
1180 let (guard, result) = cvar
1181 .wait_timeout(latest, remaining)
1182 .unwrap_or_else(|e| e.into_inner());
1183 latest = guard;
1184 if result.timed_out() && *latest <= since_lsn {
1185 return false;
1186 }
1187 }
1188 true
1189 }
1190
    /// Registers a replica without a region label and returns the LSN it should
    /// resume from. See [`Self::register_replica_with_region`].
    pub fn register_replica(&self, id: String) -> u64 {
        self.register_replica_with_region(id, None)
    }
1194
1195 pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
1212 let now_ms = crate::utils::now_unix_millis() as u128;
1213 let resume_lsn = self.ensure_slot(&id, self.current_logical_lsn());
1214 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1215 if let Some(existing) = replicas.iter_mut().find(|r| r.id == id) {
1216 existing.last_seen_at_unix_ms = now_ms;
1217 if region.is_some() {
1218 existing.region = region;
1219 }
1220 return resume_lsn;
1221 }
1222 replicas.push(ReplicaState {
1223 id,
1224 last_acked_lsn: resume_lsn,
1225 last_sent_lsn: resume_lsn,
1226 last_durable_lsn: resume_lsn,
1227 apply_error_count: 0,
1228 divergence_count: 0,
1229 connected_at_unix_ms: now_ms,
1230 last_seen_at_unix_ms: now_ms,
1231 region,
1232 rebootstrapping: false,
1233 });
1234 drop(replicas);
1235 self.bump_topology_epoch();
1236 resume_lsn
1237 }
1238
1239 pub fn set_replica_rebootstrapping(&self, id: &str, rebootstrapping: bool) -> bool {
1254 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1255 let Some(state) = replicas.iter_mut().find(|r| r.id == id) else {
1256 return false;
1257 };
1258 if state.rebootstrapping == rebootstrapping {
1259 return true;
1260 }
1261 state.rebootstrapping = rebootstrapping;
1262 drop(replicas);
1263 self.bump_topology_epoch();
1264 true
1265 }
1266
1267 pub fn ensure_replica_registered(&self, id: &str) -> bool {
1276 let already = self
1277 .replicas
1278 .read()
1279 .unwrap_or_else(|e| e.into_inner())
1280 .iter()
1281 .any(|r| r.id == id);
1282 if already {
1283 return false;
1284 }
1285 self.register_replica(id.to_string());
1286 true
1287 }
1288
1289 pub fn unregister_replica(&self, id: &str) -> bool {
1293 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1294 let before = replicas.len();
1295 replicas.retain(|r| r.id != id);
1296 let removed = replicas.len() != before;
1297 drop(replicas);
1298 if removed {
1299 self.commit_waiter.drop_replica(id);
1300 self.bump_topology_epoch();
1301 }
1302 removed
1303 }
1304
    /// Current value of the topology epoch counter (relaxed atomic load).
    pub fn topology_epoch(&self) -> u64 {
        self.topology_epoch
            .load(std::sync::atomic::Ordering::Relaxed)
    }
1311
    /// Increments the topology epoch counter by one (relaxed atomic add).
    pub fn bump_topology_epoch(&self) {
        self.topology_epoch
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
1323
1324 pub fn ack_replica(&self, id: &str, lsn: u64) {
1325 let now_ms = crate::utils::now_unix_millis() as u128;
1326 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1327 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1328 r.last_acked_lsn = r.last_acked_lsn.max(lsn);
1329 r.last_durable_lsn = r.last_durable_lsn.max(lsn);
1330 r.last_seen_at_unix_ms = now_ms;
1331 }
1332 drop(replicas);
1333 self.commit_waiter.record_replica_ack(id, lsn);
1334 }
1335
1336 pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
1342 self.ack_replica_lsn_with_observability(id, applied_lsn, durable_lsn, 0, 0);
1343 }
1344
1345 pub fn ack_replica_lsn_with_observability(
1346 &self,
1347 id: &str,
1348 applied_lsn: u64,
1349 durable_lsn: u64,
1350 apply_error_count: u64,
1351 divergence_count: u64,
1352 ) {
1353 let now_ms = crate::utils::now_unix_millis() as u128;
1354 self.advance_slot(id, applied_lsn, durable_lsn, now_ms);
1355 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1356 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1357 r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
1358 r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
1359 r.apply_error_count = r.apply_error_count.max(apply_error_count);
1360 r.divergence_count = r.divergence_count.max(divergence_count);
1361 r.last_seen_at_unix_ms = now_ms;
1362 }
1363 drop(replicas);
1367 self.commit_waiter.record_replica_ack(id, durable_lsn);
1368 }
1369
1370 pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
1375 let now_ms = crate::utils::now_unix_millis() as u128;
1376 self.touch_slot(id, now_ms);
1377 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1378 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1379 r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
1380 r.last_seen_at_unix_ms = now_ms;
1381 }
1382 }
1383
1384 pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
1388 self.replicas
1389 .read()
1390 .unwrap_or_else(|e| e.into_inner())
1391 .clone()
1392 }
1393
1394 pub fn replication_progress(&self) -> Option<ReplicationProgress> {
1395 let replicas = self.replicas.read().unwrap_or_else(|e| e.into_inner());
1396 ReplicationProgress::from_replicas(&replicas)
1397 }
1398
1399 pub fn slot_snapshots(&self) -> Vec<ReplicationSlot> {
1400 self.slots
1401 .read()
1402 .unwrap_or_else(|e| e.into_inner())
1403 .values()
1404 .cloned()
1405 .collect()
1406 }
1407
1408 pub fn retention_floor_lsn(&self) -> Option<u64> {
1409 self.slots
1410 .read()
1411 .unwrap_or_else(|e| e.into_inner())
1412 .values()
1413 .filter(|slot| slot.invalidation_reason.is_none())
1414 .map(|slot| slot.restart_lsn)
1415 .min()
1416 }
1417
1418 pub fn prune_retained_wal_through(&self, archived_lsn: u64) -> io::Result<u64> {
1419 self.enforce_retention_limits(crate::utils::now_unix_millis() as u128);
1420 let prune_lsn = self
1421 .retention_floor_lsn()
1422 .map(|floor| floor.min(archived_lsn))
1423 .unwrap_or(archived_lsn);
1424 if prune_lsn > 0 {
1425 if let Some(spool) = &self.logical_wal_spool {
1426 spool.prune_through(prune_lsn)?;
1427 }
1428 self.wal_buffer.prune_through(prune_lsn);
1429 }
1430 Ok(prune_lsn)
1431 }
1432
1433 pub fn replica_count(&self) -> usize {
1434 self.replicas
1435 .read()
1436 .unwrap_or_else(|e| e.into_inner())
1437 .len()
1438 }
1439
1440 pub fn current_logical_lsn(&self) -> u64 {
1444 self.logical_wal_spool
1445 .as_ref()
1446 .map(|spool| spool.current_lsn())
1447 .unwrap_or_else(|| self.wal_buffer.current_lsn())
1448 }
1449
1450 fn ensure_slot(&self, id: &str, initial_lsn: u64) -> u64 {
1451 let now_ms = crate::utils::now_unix_millis() as u128;
1452 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1453 if let Some(slot) = slots.get_mut(id) {
1454 slot.last_seen_at_unix_ms = now_ms;
1455 let restart_lsn = slot.restart_lsn;
1456 self.persist_slots_locked(&slots);
1457 return restart_lsn;
1458 }
1459 slots.insert(
1460 id.to_string(),
1461 ReplicationSlot {
1462 id: id.to_string(),
1463 restart_lsn: initial_lsn,
1464 confirmed_lsn: initial_lsn,
1465 last_seen_at_unix_ms: now_ms,
1466 invalidation_reason: None,
1467 invalidated_at_unix_ms: None,
1468 },
1469 );
1470 let restart_lsn = initial_lsn;
1471 self.persist_slots_locked(&slots);
1472 restart_lsn
1473 }
1474
1475 fn advance_slot(&self, id: &str, confirmed_lsn: u64, restart_lsn: u64, now_ms: u128) {
1476 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1477 let slot = slots
1478 .entry(id.to_string())
1479 .or_insert_with(|| ReplicationSlot {
1480 id: id.to_string(),
1481 restart_lsn: 0,
1482 confirmed_lsn: 0,
1483 last_seen_at_unix_ms: now_ms,
1484 invalidation_reason: None,
1485 invalidated_at_unix_ms: None,
1486 });
1487 if slot.invalidation_reason.is_some() {
1488 return;
1489 }
1490 slot.confirmed_lsn = slot.confirmed_lsn.max(confirmed_lsn).max(restart_lsn);
1491 slot.restart_lsn = slot.restart_lsn.max(restart_lsn);
1492 slot.last_seen_at_unix_ms = now_ms;
1493 self.persist_slots_locked(&slots);
1494 }
1495
1496 pub fn touch_slot(&self, id: &str, now_ms: u128) {
1497 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1498 let mut changed = false;
1499 if let Some(slot) = slots.get_mut(id) {
1500 if slot.invalidation_reason.is_none() {
1501 slot.last_seen_at_unix_ms = now_ms;
1502 changed = true;
1503 }
1504 }
1505 if changed {
1506 self.persist_slots_locked(&slots);
1507 }
1508 }
1509
1510 pub fn enforce_retention_limits(&self, now_ms: u128) -> Vec<(String, SlotInvalidationCause)> {
1511 let current_lsn = self.current_logical_lsn();
1512 let mut invalidated = Vec::new();
1513 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1514 for slot in slots.values_mut() {
1515 if slot.invalidation_reason.is_some() {
1516 continue;
1517 }
1518 let reason = if self.slot_retention_max_lag_lsn > 0
1519 && current_lsn.saturating_sub(slot.restart_lsn) > self.slot_retention_max_lag_lsn
1520 {
1521 Some(SlotInvalidationCause::Horizon)
1522 } else if self.slot_idle_timeout_ms > 0
1523 && now_ms.saturating_sub(slot.last_seen_at_unix_ms)
1524 > u128::from(self.slot_idle_timeout_ms)
1525 {
1526 Some(SlotInvalidationCause::IdleTimeout)
1527 } else {
1528 None
1529 };
1530 if let Some(reason) = reason {
1531 slot.invalidation_reason = Some(reason);
1532 slot.invalidated_at_unix_ms = Some(now_ms);
1533 invalidated.push((slot.id.clone(), reason));
1534 }
1535 }
1536 if !invalidated.is_empty() {
1537 self.persist_slots_locked(&slots);
1538 }
1539 invalidated
1540 }
1541
1542 pub fn slot_rebootstrap_reason(
1543 &self,
1544 id: &str,
1545 requested_since_lsn: u64,
1546 oldest_available_lsn: Option<u64>,
1547 ) -> Option<SlotInvalidationCause> {
1548 let now_ms = crate::utils::now_unix_millis() as u128;
1549 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1550 let slot = slots.get_mut(id)?;
1551 if let Some(reason) = slot.invalidation_reason {
1552 return Some(reason);
1553 }
1554 let slot_floor = slot.restart_lsn.max(requested_since_lsn);
1555 if oldest_available_lsn
1556 .map(|oldest| oldest > slot_floor.saturating_add(1))
1557 .unwrap_or(false)
1558 {
1559 slot.invalidation_reason = Some(SlotInvalidationCause::WalRemoved);
1560 slot.invalidated_at_unix_ms = Some(now_ms);
1561 self.persist_slots_locked(&slots);
1562 return Some(SlotInvalidationCause::WalRemoved);
1563 }
1564 None
1565 }
1566
1567 pub fn plan_replica_resume(
1576 &self,
1577 id: &str,
1578 requested_since_lsn: u64,
1579 oldest_available_lsn: Option<u64>,
1580 ) -> ResumeMode {
1581 if let Some(cause) =
1582 self.slot_rebootstrap_reason(id, requested_since_lsn, oldest_available_lsn)
1583 {
1584 self.full_resync_count
1585 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1586 return ResumeMode::FullRebootstrap { cause };
1587 }
1588 let resume_lsn = self
1589 .slot_snapshots()
1590 .into_iter()
1591 .find(|slot| slot.id == id)
1592 .map(|slot| requested_since_lsn.max(slot.restart_lsn))
1593 .unwrap_or(requested_since_lsn);
1594 self.partial_resync_count
1595 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1596 ResumeMode::PartialResync { resume_lsn }
1597 }
1598
1599 pub fn partial_resync_count(&self) -> u64 {
1602 self.partial_resync_count
1603 .load(std::sync::atomic::Ordering::Relaxed)
1604 }
1605
1606 pub fn full_resync_count(&self) -> u64 {
1610 self.full_resync_count
1611 .load(std::sync::atomic::Ordering::Relaxed)
1612 }
1613
1614 fn persist_slots_locked(&self, slots: &BTreeMap<String, ReplicationSlot>) {
1615 if let Err(err) = persist_replication_slots(self.slot_path.as_deref(), slots) {
1616 warn!(
1617 target: "reddb::replication::slots",
1618 error = %err,
1619 "failed to persist replication slots"
1620 );
1621 }
1622 }
1623}
1624
1625#[cfg(test)]
1626mod tests {
1627 use super::*;
1628 use crate::replication::cdc::{ChangeOperation, ChangeRecord};
1629 use std::time::{SystemTime, UNIX_EPOCH};
1630
// Builds `<temp dir>/reddb_<name>_<nanoseconds since the Unix epoch>.rdb`.
// Panics if the system clock reads earlier than the Unix epoch.
fn temp_data_path(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let file_name = format!("reddb_{name}_{}.rdb", since_epoch.as_nanos());
    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
1638
#[test]
fn logical_wal_spool_roundtrip_and_prune() {
    let data_path = temp_data_path("logical_spool");
    let spool_path = LogicalWalSpool::path_for(&data_path);
    let spool = LogicalWalSpool::open(&data_path).expect("open spool");

    // Both records target the same "users" row at term 2; only the LSN,
    // timestamp, operation, and body differ.
    let user_row_change = |lsn, timestamp, operation, body| ChangeRecord {
        term: 2,
        lsn,
        timestamp,
        operation,
        collection: "users".to_string(),
        entity_id: 10,
        entity_kind: "row".to_string(),
        entity_bytes: Some(body),
        metadata: None,
        refresh_records: None,
    };
    let insert = user_row_change(7, 1, ChangeOperation::Insert, vec![1, 2, 3]);
    let update = user_row_change(8, 2, ChangeOperation::Update, vec![4, 5, 6]);

    // The frame timestamps (11, 12) intentionally differ from the record timestamps.
    spool
        .append_with_term_and_timestamp(insert.term, insert.lsn, 11, &insert.encode())
        .expect("append 1");
    spool
        .append_with_term_and_timestamp(update.term, update.lsn, 12, &update.encode())
        .expect("append 2");

    // Round trip: both records come back in LSN order with their term intact.
    let entries = spool.read_since(0, usize::MAX).expect("read");
    let entry_lsns: Vec<_> = entries.iter().map(|entry| entry.0).collect();
    assert_eq!(entry_lsns, vec![7, 8]);
    let first_decoded = ChangeRecord::decode(&entries[0].1).unwrap();
    assert_eq!(first_decoded.term, 2);

    // The framed view exposes the term and the frame timestamp.
    let framed = read_and_repair_entries(&spool_path).expect("read framed entries");
    let first_frame = &framed[0];
    assert_eq!(first_frame.term, 2);
    assert_eq!(first_frame.timestamp_ms, 11);

    // Pruning through LSN 7 leaves only the LSN 8 record.
    spool.prune_through(7).expect("prune");
    let retained = spool.read_since(0, usize::MAX).expect("read retained");
    assert_eq!(retained.len(), 1);
    let survivor = &retained[0];
    assert_eq!(survivor.0, 8);
    assert_eq!(ChangeRecord::decode(&survivor.1).unwrap().term, 2);

    let _ = fs::remove_file(spool_path);
}
1695
#[test]
fn logical_wal_spool_reads_v2_without_term() {
    let data_path = temp_data_path("logical_spool_v2");
    let spool_path = LogicalWalSpool::path_for(&data_path);
    // A payload with no `term` field.
    let payload = br#"{"lsn":3,"timestamp":44,"operation":"delete","collection":"users","rid":9,"kind":"row"}"#;
    let lsn = 3u64;
    let timestamp = 44u64;
    let crc = compute_logical_v2_crc(LOGICAL_WAL_SPOOL_VERSION_V2, lsn, timestamp, payload);

    // Hand-write one V2 frame:
    // magic | version | lsn | timestamp | payload length | payload | crc.
    let mut file = File::create(&spool_path).expect("create v2 spool");
    file.write_all(LOGICAL_WAL_SPOOL_MAGIC).unwrap();
    file.write_all(&[LOGICAL_WAL_SPOOL_VERSION_V2]).unwrap();
    file.write_all(&lsn.to_le_bytes()).unwrap();
    // Fixed: this argument was the garbled token `×tamp`, which does not compile.
    file.write_all(&timestamp.to_le_bytes()).unwrap();
    file.write_all(&(payload.len() as u32).to_le_bytes())
        .unwrap();
    file.write_all(payload).unwrap();
    file.write_all(&crc.to_le_bytes()).unwrap();
    file.sync_all().unwrap();

    // The spool must open the V2 file and hand the payload back unchanged.
    let spool = LogicalWalSpool::open(&data_path).expect("open v2 spool");
    let records = spool.read_since(0, usize::MAX).expect("read v2 spool");
    assert_eq!(records.len(), 1);
    assert_eq!(records[0].0, 3);
    let decoded = ChangeRecord::decode(&records[0].1).expect("decode v2 payload");
    assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
    assert_eq!(decoded.lsn, 3);

    // The framed view must also report the default term for a V2 frame.
    let framed = read_and_repair_entries(&spool_path).expect("read framed v2 entries");
    assert_eq!(framed[0].term, crate::replication::DEFAULT_REPLICATION_TERM);

    let _ = fs::remove_file(spool_path);
}
1729
#[test]
fn topology_epoch_monotonic_on_register_and_unregister() {
    let primary = PrimaryReplication::new(None);

    // Each new registration must move the epoch forward.
    let e0 = primary.topology_epoch();
    primary.register_replica("r1".to_string());
    let e1 = primary.topology_epoch();
    assert!(e0 < e1, "register must bump epoch ({e0} -> {e1})");

    primary.register_replica("r2".to_string());
    let e2 = primary.topology_epoch();
    assert!(e1 < e2, "second register must bump epoch ({e1} -> {e2})");

    // Removing a registered replica bumps the epoch as well.
    assert!(primary.unregister_replica("r1"));
    let e3 = primary.topology_epoch();
    assert!(e2 < e3, "unregister must bump epoch ({e2} -> {e3})");

    // Removing an unknown replica reports `false` and leaves the epoch alone.
    assert!(!primary.unregister_replica("ghost"));
    assert_eq!(
        primary.topology_epoch(),
        e3,
        "unregistering a missing replica must not bump the epoch"
    );
}
1760
#[test]
fn register_replica_is_idempotent_on_reconnect() {
    let primary = PrimaryReplication::new(None);
    // Fetches the registry entry for "r1", panicking with `missing` when absent.
    let r1_entry = |missing: &str| {
        primary
            .replica_snapshots()
            .into_iter()
            .find(|r| r.id == "r1")
            .expect(missing)
    };

    primary.register_replica("r1".to_string());
    assert_eq!(
        primary.replica_count(),
        1,
        "first register creates an entry"
    );
    let epoch_after_first = primary.topology_epoch();

    // Record some progress so the reconnect has something to preserve.
    primary.note_replica_pull("r1", 42);
    primary.ack_replica_lsn("r1", 40, 40);
    let before = r1_entry("r1 present");
    assert_eq!(
        (
            before.last_sent_lsn,
            before.last_acked_lsn,
            before.last_durable_lsn
        ),
        (42, 40, 40)
    );

    // Registering the same id again models a reconnect.
    let resume_lsn = primary.register_replica("r1".to_string());

    assert_eq!(
        primary.replica_count(),
        1,
        "reconnect must not create a duplicate registry entry"
    );
    assert_eq!(
        primary.topology_epoch(),
        epoch_after_first,
        "reconnect must not bump the topology epoch"
    );
    let after = r1_entry("r1 still present");
    assert_eq!(after.last_sent_lsn, 42, "last_sent_lsn preserved");
    assert_eq!(after.last_acked_lsn, 40, "last_acked_lsn preserved");
    assert_eq!(after.last_durable_lsn, 40, "last_durable_lsn preserved");
    assert_eq!(resume_lsn, 40, "reconnect returns the slot restart LSN");
}
1817
#[test]
fn replica_slot_persists_and_reconnect_resumes_from_restart_lsn() {
    let data_path = temp_data_path("replication_slots");
    let spool_path = LogicalWalSpool::path_for(&data_path);
    let slot_path = PrimaryReplication::slot_path_for(&data_path);

    // Fetches the "r1" slot from `node`, panicking with `missing` when absent.
    let r1_slot = |node: &PrimaryReplication, missing: &str| {
        node.slot_snapshots()
            .into_iter()
            .find(|slot| slot.id == "r1")
            .expect(missing)
    };

    {
        // First instance: register, pull, and ack applied=10 / durable=8.
        let primary = PrimaryReplication::new(Some(&data_path));
        primary.register_replica("r1".to_string());
        primary.note_replica_pull("r1", 12);
        primary.ack_replica_lsn("r1", 10, 8);

        let live = r1_slot(&primary, "r1 slot present");
        assert_eq!(live.restart_lsn, 8);
        assert_eq!(live.confirmed_lsn, 10);
    }

    // A second instance over the same data path must load the slot back.
    let reopened = PrimaryReplication::new(Some(&data_path));
    let reloaded = r1_slot(&reopened, "r1 slot loaded after reopen");
    assert_eq!(reloaded.restart_lsn, 8);
    assert_eq!(reloaded.confirmed_lsn, 10);

    let resume_lsn = reopened.register_replica("r1".to_string());
    assert_eq!(
        resume_lsn, 8,
        "reconnect resumes from the durable slot restart LSN"
    );

    let _ = fs::remove_file(spool_path);
    let _ = fs::remove_file(slot_path);
}
1856
#[test]
fn retention_floor_follows_slowest_slot_and_prunes_wal() {
    let primary = PrimaryReplication::new(None);
    // LSNs still held by the in-memory WAL buffer.
    let retained_lsns = || -> Vec<u64> {
        primary
            .wal_buffer
            .read_since(0, usize::MAX)
            .into_iter()
            .map(|(lsn, _)| lsn)
            .collect()
    };

    primary.register_replica("fast".to_string());
    primary.register_replica("slow".to_string());

    for lsn in 1..=6 {
        primary.wal_buffer.append(lsn, vec![lsn as u8]);
    }

    primary.ack_replica_lsn("fast", 5, 5);
    primary.ack_replica_lsn("slow", 3, 2);

    // The slow slot (restart LSN 2) holds the floor, so only LSNs 1-2 go.
    assert_eq!(
        primary.retention_floor_lsn(),
        Some(2),
        "slowest slot restart_lsn sets the retention floor"
    );
    let pruned_through = primary.prune_retained_wal_through(6).unwrap();
    assert_eq!(pruned_through, 2);
    assert_eq!(retained_lsns(), vec![3, 4, 5, 6]);

    // Once "slow" catches up, "fast" (restart LSN 5) becomes the floor.
    primary.ack_replica_lsn("slow", 6, 6);
    assert_eq!(
        primary.retention_floor_lsn(),
        Some(5),
        "slot confirmation advances the retention floor"
    );
    let pruned_through = primary.prune_retained_wal_through(6).unwrap();
    assert_eq!(pruned_through, 5);
    assert_eq!(retained_lsns(), vec![6]);
}
1899
#[test]
fn bootstrap_slot_pin_prevents_wal_removed_rebootstrap_after_prune() {
    let primary = PrimaryReplication::new(None);
    // Appends one single-byte record per LSN in `range`.
    let append_range = |range: std::ops::RangeInclusive<u64>| {
        for lsn in range {
            primary.wal_buffer.append(lsn, vec![lsn as u8]);
        }
    };

    append_range(1..=5);
    let slot_lsn = primary.register_replica("bootstrapping".to_string());
    assert_eq!(slot_lsn, 5, "bootstrap pins the current frontier");

    // More writes arrive after the slot has been created.
    append_range(6..=8);

    let pruned_through = primary.prune_retained_wal_through(8).unwrap();
    assert_eq!(
        pruned_through, 5,
        "bootstrap slot keeps the frontier retained"
    );

    let oldest_retained = primary.wal_buffer.oldest_lsn();
    assert_eq!(
        primary.slot_rebootstrap_reason("bootstrapping", 0, oldest_retained),
        None,
        "a caller resuming from its slot must not see wal-removed after slot-aware pruning"
    );
}
1925
#[test]
fn default_config_enables_finite_slot_retention_cap() {
    // A cap of zero disables the lag check in `enforce_retention_limits`.
    let default_cap = crate::replication::ReplicationConfig::primary().slot_retention_max_lag_lsn;

    assert!(
        default_cap > 0,
        "primary replication must default to a finite slot retention cap"
    );
}
1935
#[test]
fn retention_cap_invalidates_slow_slot_and_releases_wal_floor() {
    // Slots may lag the frontier by at most 3 LSNs.
    let config =
        crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3);
    let primary = PrimaryReplication::new_with_config(None, &config);
    primary.register_replica("fast".to_string());
    primary.register_replica("slow".to_string());

    for lsn in 1..=6 {
        primary.wal_buffer.append(lsn, vec![lsn as u8]);
    }
    // Only "fast" acknowledges; "slow" never does.
    primary.ack_replica_lsn("fast", 6, 6);

    let pruned_through = primary.prune_retained_wal_through(6).unwrap();
    assert_eq!(pruned_through, 6);

    let slow_reason = primary
        .slot_snapshots()
        .into_iter()
        .find(|slot| slot.id == "slow")
        .expect("slow slot present")
        .invalidation_reason;
    assert_eq!(slow_reason, Some(SlotInvalidationCause::Horizon));

    let retained_lsns: Vec<u64> = primary
        .wal_buffer
        .read_since(0, usize::MAX)
        .into_iter()
        .map(|(lsn, _)| lsn)
        .collect();
    assert!(
        retained_lsns.is_empty(),
        "invalidated slow slot must not pin WAL"
    );
}
1973
#[test]
fn slot_invalidation_cause_codes_cover_wal_removed_horizon_and_idle_timeout() {
    // Reads the stored invalidation reason of slot `slot_id` on `node`.
    let reason_of = |node: &PrimaryReplication, slot_id: &str| {
        node.slot_snapshots()
            .into_iter()
            .find(|slot| slot.id == slot_id)
            .and_then(|slot| slot.invalidation_reason)
    };

    // WalRemoved: the oldest available LSN (2) is beyond the slot floor plus one.
    let wal_removed_config = crate::replication::ReplicationConfig::primary()
        .with_slot_retention_max_lag_lsn(3)
        .with_slot_idle_timeout_ms(10);
    let wal_removed = PrimaryReplication::new_with_config(None, &wal_removed_config);
    wal_removed.register_replica("wal".to_string());
    assert_eq!(
        wal_removed.slot_rebootstrap_reason("wal", 0, Some(2)),
        Some(SlotInvalidationCause::WalRemoved)
    );

    // Horizon: four appended LSNs exceed the lag cap of three.
    let horizon_config =
        crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3);
    let horizon = PrimaryReplication::new_with_config(None, &horizon_config);
    horizon.register_replica("horizon".to_string());
    for lsn in 1..=4 {
        horizon.wal_buffer.append(lsn, vec![lsn as u8]);
    }
    horizon.enforce_retention_limits(0);
    assert_eq!(
        reason_of(&horizon, "horizon"),
        Some(SlotInvalidationCause::Horizon)
    );

    // IdleTimeout: last seen at 1 ms, checked at 12 ms with a 10 ms timeout.
    let idle_config =
        crate::replication::ReplicationConfig::primary().with_slot_idle_timeout_ms(10);
    let idle = PrimaryReplication::new_with_config(None, &idle_config);
    idle.register_replica("idle".to_string());
    idle.touch_slot("idle", 1);
    idle.enforce_retention_limits(12);
    assert_eq!(
        reason_of(&idle, "idle"),
        Some(SlotInvalidationCause::IdleTimeout)
    );
}
2021
#[test]
fn wal_buffer_fan_out_shares_refcounted_payload() {
    const PAYLOAD: [u8; 4] = [0xDE, 0xAD, 0xBE, 0xEF];

    let buffer = WalBuffer::new(8);
    buffer.append(1, PAYLOAD.to_vec());

    // Two independent shared reads of the same record.
    let replica_a = buffer.read_since_shared(0, usize::MAX);
    let replica_b = buffer.read_since_shared(0, usize::MAX);
    assert_eq!(replica_a.len(), 1);
    assert_eq!(replica_b.len(), 1);

    let shared_a = &replica_a[0].1;
    let shared_b = &replica_b[0].1;
    assert!(
        Arc::ptr_eq(shared_a, shared_b),
        "two replicas must share one ref-counted payload allocation"
    );
    assert_eq!(&**shared_a, &PAYLOAD);
    assert!(
        Arc::strong_count(shared_a) >= 3,
        "buffer + both replica handles reference the same payload"
    );

    // The owned read path returns the same bytes as a plain `Vec<u8>`.
    let owned = buffer.read_since(0, usize::MAX);
    assert_eq!(owned, vec![(1u64, PAYLOAD.to_vec())]);
}
2049
#[test]
fn spool_seek_index_resume_is_sublinear() {
    let data_path = temp_data_path("seek_index");
    let spool_path = LogicalWalSpool::path_for(&data_path);
    let spool = LogicalWalSpool::open(&data_path).expect("open spool");

    // 200 consecutive LSNs, each carrying a two-byte payload.
    for lsn in 1..=200u64 {
        let payload = [(lsn % 251) as u8, 0xAB];
        spool
            .append_with_term_and_timestamp(1, lsn, lsn, &payload)
            .expect("append");
    }

    // A read from LSN 0 returns everything and seeks to offset 0.
    let everything = spool.read_since(0, usize::MAX).expect("full");
    assert_eq!(everything.len(), 200);
    assert_eq!(spool.seek_floor_offset(0), 0);

    // Resuming after LSN 130 yields exactly LSNs 131..=200.
    let resumed = spool.read_since(130, usize::MAX).expect("resume");
    assert_eq!(resumed.first().map(|entry| entry.0), Some(131));
    assert_eq!(resumed.last().map(|entry| entry.0), Some(200));
    assert_eq!(resumed.len(), 70);
    assert!(
        spool.seek_floor_offset(130) > 0,
        "mid-spool resume must seek past offset 0"
    );

    // After a reopen, the seek floor and the resumed read must still hold.
    drop(spool);
    let reopened = LogicalWalSpool::open(&data_path).expect("reopen spool");
    assert!(reopened.seek_floor_offset(130) > 0);
    let resumed_after_reopen = reopened
        .read_since(130, usize::MAX)
        .expect("resume reopen");
    assert_eq!(resumed_after_reopen.len(), 70);

    let _ = fs::remove_file(spool_path);
}
2095
#[test]
fn plan_replica_resume_partial_within_window_full_past_cap() {
    // Case 1: the requested LSN is still retained, so a partial resync is planned.
    let within = PrimaryReplication::new(None);
    within.register_replica("blip".to_string());
    for lsn in 1..=5 {
        within.wal_buffer.append(lsn, vec![lsn as u8]);
    }
    let partial_before = within.partial_resync_count();
    let oldest_within = within.wal_buffer.oldest_lsn();
    match within.plan_replica_resume("blip", 2, oldest_within) {
        ResumeMode::PartialResync { resume_lsn } => assert_eq!(resume_lsn, 2),
        other => panic!("brief blip must resume via partial resync, got {other:?}"),
    }
    assert_eq!(
        within.partial_resync_count(),
        partial_before + 1,
        "partial resync must be observable via the metric"
    );
    assert_eq!(
        within.full_resync_count(),
        0,
        "a partial resync must not bump the full-resync counter"
    );

    // Case 2: the slot is invalidated by the lag cap, forcing a full re-bootstrap.
    let capped_config =
        crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3);
    let past_cap = PrimaryReplication::new_with_config(None, &capped_config);
    past_cap.register_replica("slow".to_string());
    for lsn in 1..=6 {
        past_cap.wal_buffer.append(lsn, vec![lsn as u8]);
    }
    past_cap.enforce_retention_limits(0);

    let partial_baseline = past_cap.partial_resync_count();
    let full_baseline = past_cap.full_resync_count();
    let oldest_past_cap = past_cap.wal_buffer.oldest_lsn();
    match past_cap.plan_replica_resume("slow", 0, oldest_past_cap) {
        ResumeMode::FullRebootstrap { cause } => {
            assert_eq!(cause, SlotInvalidationCause::Horizon)
        }
        other => panic!("slot past the cap must re-bootstrap, got {other:?}"),
    }
    assert_eq!(
        past_cap.partial_resync_count(),
        partial_baseline,
        "a full re-bootstrap must not be counted as a partial resync"
    );
    assert_eq!(
        past_cap.full_resync_count(),
        full_baseline + 1,
        "a full re-bootstrap must bump the full-resync alert counter (issue #839)"
    );
}
2153
#[test]
fn ensure_replica_registered_self_registers_then_is_a_noop() {
    let primary = PrimaryReplication::new(None);
    // `last_sent_lsn` of the "r1" registry entry, if one exists.
    let r1_last_sent = || {
        primary
            .replica_snapshots()
            .into_iter()
            .find(|r| r.id == "r1")
            .map(|r| r.last_sent_lsn)
    };

    let newly_registered = primary.ensure_replica_registered("r1");
    assert!(
        newly_registered,
        "first identification registers the replica"
    );
    assert_eq!(primary.replica_count(), 1);
    let epoch_after_register = primary.topology_epoch();

    primary.note_replica_pull("r1", 7);
    assert_eq!(
        r1_last_sent(),
        Some(7),
        "primary tracks last_sent_lsn for a registered replica's pull"
    );

    // A second call must leave the count, the epoch, and the progress untouched.
    let registered_again = primary.ensure_replica_registered("r1");
    assert!(
        !registered_again,
        "already-registered replica is not re-registered"
    );
    assert_eq!(primary.replica_count(), 1);
    assert_eq!(primary.topology_epoch(), epoch_after_register);
    assert_eq!(
        r1_last_sent(),
        Some(7),
        "no-op registration preserves progress"
    );
}
2199
#[test]
fn replication_progress_uses_sent_applied_and_durable_registry_lsns() {
    let now = crate::utils::now_unix_millis() as u128;
    // Builds a registry entry that differs from its sibling only in id and LSNs.
    let replica = |id: &str, acked, sent, durable| ReplicaState {
        id: id.to_string(),
        last_acked_lsn: acked,
        last_sent_lsn: sent,
        last_durable_lsn: durable,
        apply_error_count: 0,
        divergence_count: 0,
        connected_at_unix_ms: now,
        last_seen_at_unix_ms: now,
        region: None,
        rebootstrapping: false,
    };
    let replicas = vec![replica("fast", 90, 120, 80), replica("slow", 70, 100, 60)];

    let progress = ReplicationProgress::from_replicas(&replicas).expect("registered replicas");

    assert_eq!(progress.lag_lsn, 50);
    assert_eq!(progress.safe_replay_lsn, 60);
}
2235}