1use std::collections::{BTreeMap, VecDeque};
51use std::fs::{self, File, OpenOptions};
52use std::io::{self, Read, Seek, SeekFrom, Write};
53use std::path::{Path, PathBuf};
54use std::sync::{Arc, Condvar, Mutex, RwLock};
55use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
56
57use tracing::warn;
58
/// Four-byte magic that opens every frame in the logical-WAL spool file.
const LOGICAL_WAL_SPOOL_MAGIC: &[u8; 4] = b"RDLW";
/// Frame version 1: LSN, 8-byte payload length, payload (no timestamp, no CRC) —
/// see `read_one_v1`.
const LOGICAL_WAL_SPOOL_VERSION_V1: u8 = 1;
/// Frame version 2: LSN, timestamp, 4-byte payload length, payload, CRC — see
/// `read_one_v2`.
const LOGICAL_WAL_SPOOL_VERSION_V2: u8 = 2;
/// Frame version 3: like v2 with an 8-byte term in front of the LSN — see
/// `read_one_v3`.
const LOGICAL_WAL_SPOOL_VERSION_V3: u8 = 3;
/// Version stamped on every frame this module writes.
const LOGICAL_WAL_SPOOL_VERSION_CURRENT: u8 = LOGICAL_WAL_SPOOL_VERSION_V3;
/// Byte length of a v3 frame header: magic (4) + version (1) + term (8) +
/// LSN (8) + timestamp (8) + payload length (4).
const LOGICAL_WAL_V3_HEADER_LEN: u64 = 4 + 1 + 8 + 8 + 8 + 4;
/// Byte length of the CRC trailer; used when sizing the buffer for a v3 frame.
const LOGICAL_WAL_V2_CRC_LEN: u64 = 4;
69
70fn compute_logical_v2_crc(version: u8, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
79 use crate::storage::engine::crc32::crc32_update;
80 let mut crc = crc32_update(0, &[version]);
81 crc = crc32_update(crc, &lsn.to_le_bytes());
82 crc = crc32_update(crc, ×tamp.to_le_bytes());
83 crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
84 crc = crc32_update(crc, payload);
85 crc
86}
87
88fn compute_logical_v3_crc(version: u8, term: u64, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
89 use crate::storage::engine::crc32::crc32_update;
90 let mut crc = crc32_update(0, &[version]);
91 crc = crc32_update(crc, &term.to_le_bytes());
92 crc = crc32_update(crc, &lsn.to_le_bytes());
93 crc = crc32_update(crc, ×tamp.to_le_bytes());
94 crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
95 crc = crc32_update(crc, payload);
96 crc
97}
98
99fn term_from_payload(payload: &[u8]) -> u64 {
100 crate::replication::cdc::ChangeRecord::decode(payload)
101 .map(|record| record.term)
102 .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM)
103}
104
/// In-memory queue of logical WAL records, each tagged with its LSN.
///
/// Payloads are stored as `Arc<[u8]>` so readers can share them without
/// copying. The highest LSN ever observed is tracked separately from the
/// queue, so it survives pruning.
pub struct WalBuffer {
    records: RwLock<VecDeque<(u64, Arc<[u8]>)>>,
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Creates an empty buffer whose queue is pre-allocated for `max_size`
    /// records. The value is only a capacity hint; nothing here evicts
    /// records when it is exceeded.
    pub fn new(max_size: usize) -> Self {
        let queue = VecDeque::with_capacity(max_size);
        Self {
            records: RwLock::new(queue),
            current_lsn: RwLock::new(0),
        }
    }

    /// Pushes a record onto the back of the queue and raises the tracked
    /// high-water LSN if `lsn` exceeds it.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let payload: Arc<[u8]> = Arc::from(data.into_boxed_slice());
        let mut queue = self
            .records
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        queue.push_back((lsn, payload));

        let mut high_water = self
            .current_lsn
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Returns up to `max_count` records with an LSN strictly greater than
    /// `since_lsn`, in queue order, with each payload copied into a `Vec`.
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let shared = self.read_since_shared(since_lsn, max_count);
        let mut owned = Vec::with_capacity(shared.len());
        for (lsn, bytes) in shared {
            owned.push((lsn, bytes.to_vec()));
        }
        owned
    }

    /// Same selection as [`WalBuffer::read_since`], but hands out shared
    /// payload handles instead of copies.
    pub fn read_since_shared(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Arc<[u8]>)> {
        let queue = self
            .records
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut selected = Vec::new();
        for (lsn, bytes) in queue.iter() {
            if selected.len() == max_count {
                break;
            }
            if *lsn > since_lsn {
                selected.push((*lsn, Arc::clone(bytes)));
            }
        }
        selected
    }

    /// Highest LSN seen by `append` or `set_current_lsn`.
    pub fn current_lsn(&self) -> u64 {
        let high_water = self
            .current_lsn
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *high_water
    }

    /// Raises the tracked high-water LSN to `lsn`; never lowers it.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut high_water = self
            .current_lsn
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Drops records from the front of the queue while their LSN is at most
    /// `upto_lsn`. Stops at the first record above that LSN.
    pub fn prune_through(&self, upto_lsn: u64) {
        let mut queue = self
            .records
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        while let Some((lsn, _)) = queue.front() {
            if *lsn > upto_lsn {
                break;
            }
            queue.pop_front();
        }
    }

    /// LSN of the record at the front of the queue, or `None` when empty.
    pub fn oldest_lsn(&self) -> Option<u64> {
        let queue = self
            .records
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        queue.front().map(|&(lsn, _)| lsn)
    }
}
189
190#[derive(Debug, Clone)]
191struct LogicalWalEntry {
192 term: u64,
193 lsn: u64,
194 timestamp_ms: u64,
197 data: Vec<u8>,
198}
199
200impl LogicalWalEntry {
201 fn data_with_framing_term(&self) -> Vec<u8> {
202 match crate::replication::cdc::ChangeRecord::decode(&self.data) {
203 Ok(mut record) if record.term != self.term => {
204 record.term = self.term;
205 record.encode()
206 }
207 _ => self.data.clone(),
208 }
209 }
210}
211
/// A seek-index entry is recorded for every `SEEK_INDEX_INTERVAL`-th record.
const SEEK_INDEX_INTERVAL: u64 = 64;

/// In-memory bookkeeping for the spool file.
#[derive(Debug, Default)]
struct LogicalWalSpoolState {
    /// Highest LSN known to the spool.
    current_lsn: u64,
    /// Sparse `(lsn, byte offset)` pairs; `seek_floor_offset` binary-searches
    /// this by LSN, so entries are expected in ascending LSN order.
    seek_index: Vec<(u64, u64)>,
    /// Byte offset at which the next frame is expected to start.
    write_offset: u64,
    /// Number of records accounted for so far; used as the next ordinal.
    record_count: u64,
}

impl LogicalWalSpoolState {
    /// Records `(lsn, offset)` in the seek index when `ordinal` falls on the
    /// sampling interval, unless the newest entry already carries this LSN.
    fn note_record(&mut self, ordinal: u64, lsn: u64, offset: u64) {
        if !ordinal.is_multiple_of(SEEK_INDEX_INTERVAL) {
            return;
        }
        let repeats_last =
            matches!(self.seek_index.last(), Some((last_lsn, _)) if *last_lsn == lsn);
        if !repeats_last {
            self.seek_index.push((lsn, offset));
        }
    }

    /// Returns the byte offset of the newest indexed record whose LSN is at
    /// most `since_lsn`, or `0` when no such entry exists.
    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
        let slot = match self
            .seek_index
            .binary_search_by_key(&since_lsn, |&(lsn, _)| lsn)
        {
            Ok(exact) => exact,
            Err(0) => return 0,
            Err(insert_at) => insert_at - 1,
        };
        self.seek_index[slot].1
    }
}
267
268pub struct LogicalWalSpool {
272 path: PathBuf,
273 state: Mutex<LogicalWalSpoolState>,
274}
275
276impl LogicalWalSpool {
277 pub fn path_for(data_path: &Path) -> PathBuf {
278 let file_name = data_path
279 .file_name()
280 .and_then(|name| name.to_str())
281 .unwrap_or("reddb.rdb");
282 let spool_name = format!("{file_name}.logical.wal");
283 match data_path.parent() {
284 Some(parent) => parent.join(spool_name),
285 None => PathBuf::from(spool_name),
286 }
287 }
288
289 pub fn open(data_path: &Path) -> io::Result<Self> {
290 let path = Self::path_for(data_path);
291 if let Some(parent) = path.parent() {
292 fs::create_dir_all(parent)?;
293 }
294 if !path.exists() {
295 File::create(&path)?;
296 }
297 let entries = read_and_repair_entries(&path)?;
302 let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
303 let (seek_index, write_offset, record_count) = build_seek_index(&path)?;
306 Ok(Self {
307 path,
308 state: Mutex::new(LogicalWalSpoolState {
309 current_lsn,
310 seek_index,
311 write_offset,
312 record_count,
313 }),
314 })
315 }
316
317 pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
318 let timestamp_ms = SystemTime::now()
319 .duration_since(UNIX_EPOCH)
320 .map(|d| d.as_millis() as u64)
321 .unwrap_or(0);
322 self.append_with_timestamp(lsn, timestamp_ms, data)
323 }
324
325 pub fn append_with_timestamp(
329 &self,
330 lsn: u64,
331 timestamp_ms: u64,
332 data: &[u8],
333 ) -> io::Result<()> {
334 self.append_with_term_and_timestamp(term_from_payload(data), lsn, timestamp_ms, data)
335 }
336
337 pub fn append_with_term_and_timestamp(
338 &self,
339 term: u64,
340 lsn: u64,
341 timestamp_ms: u64,
342 data: &[u8],
343 ) -> io::Result<()> {
344 if data.len() > u32::MAX as usize {
345 return Err(io::Error::new(
346 io::ErrorKind::InvalidInput,
347 format!(
348 "logical WAL payload of {} bytes exceeds 4 GiB framing limit",
349 data.len()
350 ),
351 ));
352 }
353 let mut file = OpenOptions::new()
354 .create(true)
355 .append(true)
356 .open(&self.path)?;
357 let mut frame = Vec::with_capacity(
367 LOGICAL_WAL_V3_HEADER_LEN as usize + data.len() + LOGICAL_WAL_V2_CRC_LEN as usize,
368 );
369 frame.extend_from_slice(LOGICAL_WAL_SPOOL_MAGIC);
370 frame.push(LOGICAL_WAL_SPOOL_VERSION_CURRENT);
371 frame.extend_from_slice(&term.to_le_bytes());
372 frame.extend_from_slice(&lsn.to_le_bytes());
373 frame.extend_from_slice(×tamp_ms.to_le_bytes());
374 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
375 frame.extend_from_slice(data);
376 let crc = compute_logical_v3_crc(
377 LOGICAL_WAL_SPOOL_VERSION_CURRENT,
378 term,
379 lsn,
380 timestamp_ms,
381 data,
382 );
383 frame.extend_from_slice(&crc.to_le_bytes());
384
385 file.write_all(&frame)?;
386 file.sync_all()?;
391
392 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
393 state.current_lsn = state.current_lsn.max(lsn);
394 let record_start = state.write_offset;
398 let ordinal = state.record_count;
399 state.note_record(ordinal, lsn, record_start);
400 state.write_offset = record_start + frame.len() as u64;
401 state.record_count = ordinal + 1;
402 Ok(())
403 }
404
405 pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
406 let start_offset = {
412 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
413 state.seek_floor_offset(since_lsn)
414 };
415 let entries = read_entries_from(&self.path, start_offset)?;
416 Ok(entries
417 .into_iter()
418 .filter(|entry| entry.lsn > since_lsn)
419 .take(max_count)
420 .map(|entry| (entry.lsn, entry.data_with_framing_term()))
421 .collect())
422 }
423
424 #[cfg(test)]
428 fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
429 self.state
430 .lock()
431 .unwrap_or_else(|e| e.into_inner())
432 .seek_floor_offset(since_lsn)
433 }
434
435 pub fn current_lsn(&self) -> u64 {
436 self.state
437 .lock()
438 .unwrap_or_else(|e| e.into_inner())
439 .current_lsn
440 }
441
442 pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
443 Ok(read_and_repair_entries(&self.path)?
444 .into_iter()
445 .next()
446 .map(|entry| entry.lsn))
447 }
448
449 pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
450 let previous_lsn = self.current_lsn();
451 let retained: Vec<_> = read_and_repair_entries(&self.path)?
452 .into_iter()
453 .filter(|entry| entry.lsn > upto_lsn)
454 .collect();
455 let temp_path = self.path.with_extension("logical.wal.tmp");
456 let mut temp = File::create(&temp_path)?;
457 let mut current_lsn = 0;
458 for entry in retained {
459 let timestamp_ms = if entry.timestamp_ms > 0 {
467 entry.timestamp_ms
468 } else {
469 SystemTime::now()
470 .duration_since(UNIX_EPOCH)
471 .map(|d| d.as_millis() as u64)
472 .unwrap_or(0)
473 };
474 let crc = compute_logical_v3_crc(
475 LOGICAL_WAL_SPOOL_VERSION_CURRENT,
476 entry.term,
477 entry.lsn,
478 timestamp_ms,
479 &entry.data,
480 );
481 temp.write_all(LOGICAL_WAL_SPOOL_MAGIC)?;
482 temp.write_all(&[LOGICAL_WAL_SPOOL_VERSION_CURRENT])?;
483 temp.write_all(&entry.term.to_le_bytes())?;
484 temp.write_all(&entry.lsn.to_le_bytes())?;
485 temp.write_all(×tamp_ms.to_le_bytes())?;
486 temp.write_all(&(entry.data.len() as u32).to_le_bytes())?;
487 temp.write_all(&entry.data)?;
488 temp.write_all(&crc.to_le_bytes())?;
489 current_lsn = current_lsn.max(entry.lsn);
490 }
491 temp.sync_all()?;
492 fs::rename(&temp_path, &self.path)?;
493
494 let (seek_index, write_offset, record_count) = build_seek_index(&self.path)?;
497 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
498 state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
499 state.seek_index = seek_index;
500 state.write_offset = write_offset;
501 state.record_count = record_count;
502 Ok(())
503 }
504}
505
506fn read_and_repair_entries(path: &Path) -> io::Result<Vec<LogicalWalEntry>> {
531 if !path.exists() {
532 return Ok(Vec::new());
533 }
534
535 let mut file = OpenOptions::new().read(true).write(true).open(path)?;
536 let mut entries = Vec::new();
537 let mut last_good_offset: u64 = 0;
538 let mut corrupt_reason: Option<String> = None;
539
540 loop {
541 let record_start = file.stream_position()?;
542
543 let mut magic = [0u8; 4];
544 match file.read_exact(&mut magic) {
545 Ok(()) => {}
546 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
547 Err(err) => return Err(err),
548 }
549 if &magic != LOGICAL_WAL_SPOOL_MAGIC {
550 corrupt_reason = Some(format!(
551 "bad magic at offset {record_start}: got {magic:02x?}"
552 ));
553 break;
554 }
555
556 let mut version = [0u8; 1];
557 if let Err(err) = file.read_exact(&mut version) {
558 if err.kind() == io::ErrorKind::UnexpectedEof {
559 corrupt_reason = Some(format!("torn header at offset {record_start}"));
560 break;
561 }
562 return Err(err);
563 }
564
565 let entry_result = match version[0] {
566 LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(&mut file, record_start),
567 LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(&mut file, record_start),
568 LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(&mut file, record_start),
569 other => {
570 corrupt_reason = Some(format!(
571 "unsupported version {other} at offset {record_start}"
572 ));
573 break;
574 }
575 };
576
577 match entry_result {
578 Ok(entry) => {
579 entries.push(entry);
580 last_good_offset = file.stream_position()?;
581 }
582 Err(reason) => {
583 corrupt_reason = Some(reason);
584 break;
585 }
586 }
587 }
588
589 if let Some(reason) = corrupt_reason {
590 let total_len = file.metadata()?.len();
591 if last_good_offset < total_len {
592 warn!(
593 target: "reddb::replication::logical_wal",
594 path = %path.display(),
595 reason = %reason,
596 truncating_from = last_good_offset,
597 truncating_to = total_len,
598 kept_records = entries.len(),
599 "truncating logical-WAL spool to last valid record"
600 );
601 file.set_len(last_good_offset)?;
602 file.sync_all()?;
603 }
604 }
605
606 Ok(entries)
607}
608
609fn read_frame(file: &mut File, record_start: u64) -> io::Result<Option<LogicalWalEntry>> {
618 let mut magic = [0u8; 4];
619 match file.read_exact(&mut magic) {
620 Ok(()) => {}
621 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
622 Err(err) => return Err(err),
623 }
624 if &magic != LOGICAL_WAL_SPOOL_MAGIC {
625 return Ok(None);
626 }
627 let mut version = [0u8; 1];
628 match file.read_exact(&mut version) {
629 Ok(()) => {}
630 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
631 Err(err) => return Err(err),
632 }
633 let entry = match version[0] {
634 LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(file, record_start),
635 LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(file, record_start),
636 LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(file, record_start),
637 _ => return Ok(None),
638 };
639 Ok(entry.ok())
640}
641
642fn read_entries_from(path: &Path, start_offset: u64) -> io::Result<Vec<LogicalWalEntry>> {
646 if !path.exists() {
647 return Ok(Vec::new());
648 }
649 let mut file = OpenOptions::new().read(true).open(path)?;
650 file.seek(SeekFrom::Start(start_offset))?;
651 let mut entries = Vec::new();
652 loop {
653 let record_start = file.stream_position()?;
654 match read_frame(&mut file, record_start)? {
655 Some(entry) => entries.push(entry),
656 None => break,
657 }
658 }
659 Ok(entries)
660}
661
662fn build_seek_index(path: &Path) -> io::Result<(Vec<(u64, u64)>, u64, u64)> {
667 if !path.exists() {
668 return Ok((Vec::new(), 0, 0));
669 }
670 let mut file = OpenOptions::new().read(true).open(path)?;
671 let mut index = Vec::new();
672 let mut ordinal: u64 = 0;
673 let mut write_offset: u64 = 0;
674 loop {
675 let record_start = file.stream_position()?;
676 match read_frame(&mut file, record_start)? {
677 Some(entry) => {
678 if ordinal.is_multiple_of(SEEK_INDEX_INTERVAL)
679 && index.last().map(|(l, _)| *l) != Some(entry.lsn)
680 {
681 index.push((entry.lsn, record_start));
682 }
683 ordinal += 1;
684 write_offset = file.stream_position()?;
685 }
686 None => break,
687 }
688 }
689 Ok((index, write_offset, ordinal))
690}
691
692fn read_one_v3(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
695 let mut term = [0u8; 8];
696 if let Err(err) = file.read_exact(&mut term) {
697 return Err(format!("torn term at offset {record_start}: {err}"));
698 }
699 let mut lsn = [0u8; 8];
700 if let Err(err) = file.read_exact(&mut lsn) {
701 return Err(format!("torn lsn at offset {record_start}: {err}"));
702 }
703 let mut timestamp = [0u8; 8];
704 if let Err(err) = file.read_exact(&mut timestamp) {
705 return Err(format!("torn timestamp at offset {record_start}: {err}"));
706 }
707 let mut len_bytes = [0u8; 4];
708 if let Err(err) = file.read_exact(&mut len_bytes) {
709 return Err(format!(
710 "torn payload length at offset {record_start}: {err}"
711 ));
712 }
713 let payload_len = u32::from_le_bytes(len_bytes) as usize;
714 const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
715 if payload_len > MAX_PLAUSIBLE_PAYLOAD {
716 return Err(format!(
717 "implausible payload_len {payload_len} at offset {record_start}"
718 ));
719 }
720 let mut payload = vec![0u8; payload_len];
721 if let Err(err) = file.read_exact(&mut payload) {
722 return Err(format!(
723 "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
724 ));
725 }
726 let mut crc_bytes = [0u8; 4];
727 if let Err(err) = file.read_exact(&mut crc_bytes) {
728 return Err(format!("torn crc at offset {record_start}: {err}"));
729 }
730 let stored_crc = u32::from_le_bytes(crc_bytes);
731 let term = u64::from_le_bytes(term);
732 let lsn = u64::from_le_bytes(lsn);
733 let timestamp = u64::from_le_bytes(timestamp);
734 let expected_crc =
735 compute_logical_v3_crc(LOGICAL_WAL_SPOOL_VERSION_V3, term, lsn, timestamp, &payload);
736 if stored_crc != expected_crc {
737 return Err(format!(
738 "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
739 ));
740 }
741 Ok(LogicalWalEntry {
742 term,
743 lsn,
744 timestamp_ms: timestamp,
745 data: payload,
746 })
747}
748
749fn read_one_v2(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
753 let mut lsn = [0u8; 8];
754 if let Err(err) = file.read_exact(&mut lsn) {
755 return Err(format!("torn lsn at offset {record_start}: {err}"));
756 }
757 let mut timestamp = [0u8; 8];
758 if let Err(err) = file.read_exact(&mut timestamp) {
759 return Err(format!("torn timestamp at offset {record_start}: {err}"));
760 }
761 let mut len_bytes = [0u8; 4];
762 if let Err(err) = file.read_exact(&mut len_bytes) {
763 return Err(format!(
764 "torn payload length at offset {record_start}: {err}"
765 ));
766 }
767 let payload_len = u32::from_le_bytes(len_bytes) as usize;
768 const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
773 if payload_len > MAX_PLAUSIBLE_PAYLOAD {
774 return Err(format!(
775 "implausible payload_len {payload_len} at offset {record_start}"
776 ));
777 }
778 let mut payload = vec![0u8; payload_len];
779 if let Err(err) = file.read_exact(&mut payload) {
780 return Err(format!(
781 "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
782 ));
783 }
784 let mut crc_bytes = [0u8; 4];
785 if let Err(err) = file.read_exact(&mut crc_bytes) {
786 return Err(format!("torn crc at offset {record_start}: {err}"));
787 }
788 let stored_crc = u32::from_le_bytes(crc_bytes);
789 let expected_crc = compute_logical_v2_crc(
790 LOGICAL_WAL_SPOOL_VERSION_V2,
791 u64::from_le_bytes(lsn),
792 u64::from_le_bytes(timestamp),
793 &payload,
794 );
795 if stored_crc != expected_crc {
796 return Err(format!(
797 "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
798 ));
799 }
800 let term = term_from_payload(&payload);
801 Ok(LogicalWalEntry {
802 term,
803 lsn: u64::from_le_bytes(lsn),
804 timestamp_ms: u64::from_le_bytes(timestamp),
805 data: payload,
806 })
807}
808
809fn read_one_v1(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
814 let mut lsn = [0u8; 8];
815 if let Err(err) = file.read_exact(&mut lsn) {
816 return Err(format!("v1 torn lsn at offset {record_start}: {err}"));
817 }
818 let mut len_bytes = [0u8; 8];
819 if let Err(err) = file.read_exact(&mut len_bytes) {
820 return Err(format!(
821 "v1 torn payload length at offset {record_start}: {err}"
822 ));
823 }
824 let payload_len = u64::from_le_bytes(len_bytes) as usize;
825 if payload_len > 256 * 1024 * 1024 {
826 return Err(format!(
827 "v1 implausible payload_len {payload_len} at offset {record_start}"
828 ));
829 }
830 let mut payload = vec![0u8; payload_len];
831 if let Err(err) = file.read_exact(&mut payload) {
832 return Err(format!("v1 torn payload at offset {record_start}: {err}"));
833 }
834 let term = term_from_payload(&payload);
835 Ok(LogicalWalEntry {
836 term,
837 lsn: u64::from_le_bytes(lsn),
838 timestamp_ms: 0,
839 data: payload,
840 })
841}
842
/// Why a replication slot was invalidated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotInvalidationCause {
    WalRemoved,
    Horizon,
    IdleTimeout,
}

impl SlotInvalidationCause {
    /// Stable kebab-case name, as stored in the slot file.
    pub fn as_str(self) -> &'static str {
        match self {
            SlotInvalidationCause::WalRemoved => "wal-removed",
            SlotInvalidationCause::Horizon => "horizon",
            SlotInvalidationCause::IdleTimeout => "idle-timeout",
        }
    }

    /// Inverse of [`SlotInvalidationCause::as_str`]; `None` for any other
    /// string. Matching is exact and case-sensitive.
    fn from_str(value: &str) -> Option<Self> {
        [Self::WalRemoved, Self::Horizon, Self::IdleTimeout]
            .iter()
            .copied()
            .find(|cause| cause.as_str() == value)
    }
}
868
/// One replication slot as persisted in the slot store.
///
/// NOTE(review): the exact meaning of `restart_lsn` versus `confirmed_lsn` is
/// defined by the slot-advance code outside this view — confirm there.
#[derive(Debug, Clone)]
pub struct ReplicationSlot {
    /// Slot identifier; also the key under which the slot is stored.
    pub id: String,
    pub restart_lsn: u64,
    pub confirmed_lsn: u64,
    /// Unix time in milliseconds; defaults to the load time when the stored
    /// slot has no such field.
    pub last_seen_at_unix_ms: u128,
    /// Set when the slot has been invalidated; `None` otherwise.
    pub invalidation_reason: Option<SlotInvalidationCause>,
    /// Unix time in milliseconds of the invalidation, when recorded.
    pub invalidated_at_unix_ms: Option<u128>,
}
878
879fn load_replication_slots(path: Option<&Path>, now_ms: u128) -> BTreeMap<String, ReplicationSlot> {
880 let Some(path) = path else {
881 return BTreeMap::new();
882 };
883 let bytes = match fs::read(path) {
884 Ok(bytes) => bytes,
885 Err(err) if err.kind() == io::ErrorKind::NotFound => return BTreeMap::new(),
886 Err(err) => {
887 warn!(
888 target: "reddb::replication::slots",
889 path = %path.display(),
890 error = %err,
891 "failed to read replication slot store"
892 );
893 return BTreeMap::new();
894 }
895 };
896 match crate::serde_json::from_slice::<crate::serde_json::Value>(&bytes) {
897 Ok(value) => value
898 .get("slots")
899 .and_then(crate::serde_json::Value::as_array)
900 .unwrap_or(&[])
901 .iter()
902 .filter_map(|value| {
903 let object = value.as_object()?;
904 let id = object.get("id")?.as_str()?.to_string();
905 let restart_lsn = object.get("restart_lsn")?.as_u64()?;
906 let confirmed_lsn = object.get("confirmed_lsn")?.as_u64()?;
907 let last_seen_at_unix_ms = object
908 .get("last_seen_at_unix_ms")
909 .and_then(crate::serde_json::Value::as_u64)
910 .map(u128::from)
911 .unwrap_or(now_ms);
912 let invalidation_reason = object
913 .get("invalidation_reason")
914 .and_then(crate::serde_json::Value::as_str)
915 .and_then(SlotInvalidationCause::from_str);
916 let invalidated_at_unix_ms = object
917 .get("invalidated_at_unix_ms")
918 .and_then(crate::serde_json::Value::as_u64)
919 .map(u128::from);
920 Some((
921 id.clone(),
922 ReplicationSlot {
923 id,
924 restart_lsn,
925 confirmed_lsn,
926 last_seen_at_unix_ms,
927 invalidation_reason,
928 invalidated_at_unix_ms,
929 },
930 ))
931 })
932 .collect(),
933 Err(err) => {
934 warn!(
935 target: "reddb::replication::slots",
936 path = %path.display(),
937 error = %err,
938 "failed to decode replication slot store"
939 );
940 BTreeMap::new()
941 }
942 }
943}
944
945fn persist_replication_slots(
946 path: Option<&Path>,
947 slots: &BTreeMap<String, ReplicationSlot>,
948) -> io::Result<()> {
949 let Some(path) = path else {
950 return Ok(());
951 };
952 if let Some(parent) = path.parent() {
953 fs::create_dir_all(parent)?;
954 }
955 let temp_path = path.with_extension("logical.slots.tmp");
956 let slots_json = slots
957 .values()
958 .map(|slot| {
959 let mut object = crate::serde_json::Map::new();
960 object.insert(
961 "id".to_string(),
962 crate::serde_json::Value::String(slot.id.clone()),
963 );
964 object.insert(
965 "restart_lsn".to_string(),
966 crate::serde_json::Value::Number(slot.restart_lsn as f64),
967 );
968 object.insert(
969 "confirmed_lsn".to_string(),
970 crate::serde_json::Value::Number(slot.confirmed_lsn as f64),
971 );
972 object.insert(
973 "last_seen_at_unix_ms".to_string(),
974 crate::serde_json::Value::Number(slot.last_seen_at_unix_ms as f64),
975 );
976 if let Some(reason) = slot.invalidation_reason {
977 object.insert(
978 "invalidation_reason".to_string(),
979 crate::serde_json::Value::String(reason.as_str().to_string()),
980 );
981 }
982 if let Some(invalidated_at) = slot.invalidated_at_unix_ms {
983 object.insert(
984 "invalidated_at_unix_ms".to_string(),
985 crate::serde_json::Value::Number(invalidated_at as f64),
986 );
987 }
988 crate::serde_json::Value::Object(object)
989 })
990 .collect();
991 let mut root = crate::serde_json::Map::new();
992 root.insert(
993 "slots".to_string(),
994 crate::serde_json::Value::Array(slots_json),
995 );
996 let value = crate::serde_json::Value::Object(root);
997 let bytes = crate::serde_json::to_string_pretty(&value)
998 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
999 let mut temp = File::create(&temp_path)?;
1000 temp.write_all(bytes.as_bytes())?;
1001 temp.sync_all()?;
1002 fs::rename(&temp_path, path)?;
1003 Ok(())
1004}
1005
/// Primary-side view of one connected replica.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    pub id: String,
    pub last_acked_lsn: u64,
    pub last_sent_lsn: u64,
    pub last_durable_lsn: u64,
    pub apply_error_count: u64,
    pub divergence_count: u64,
    pub connected_at_unix_ms: u128,
    pub last_seen_at_unix_ms: u128,
    pub region: Option<String>,
    pub rebootstrapping: bool,
}

/// Aggregate replication progress across a set of replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationProgress {
    /// Highest sent LSN minus lowest acked LSN, saturating at zero.
    pub lag_lsn: u64,
    /// Lowest durable LSN across the replicas.
    pub safe_replay_lsn: u64,
}

impl ReplicationProgress {
    /// Summarizes `replicas` in a single pass; returns `None` when the slice
    /// is empty.
    pub fn from_replicas(replicas: &[ReplicaState]) -> Option<Self> {
        let (first, rest) = replicas.split_first()?;

        let mut max_sent = first.last_sent_lsn;
        let mut min_acked = first.last_acked_lsn;
        let mut min_durable = first.last_durable_lsn;
        for replica in rest {
            max_sent = max_sent.max(replica.last_sent_lsn);
            min_acked = min_acked.min(replica.last_acked_lsn);
            min_durable = min_durable.min(replica.last_durable_lsn);
        }

        Some(Self {
            lag_lsn: max_sent.saturating_sub(min_acked),
            safe_replay_lsn: min_durable,
        })
    }
}
1060
/// Primary-side replication state: the in-memory WAL buffer, the optional
/// on-disk spool, connected replicas and persisted replication slots.
pub struct PrimaryReplication {
    /// In-memory queue of recent logical WAL records.
    pub wal_buffer: Arc<WalBuffer>,
    /// On-disk spool; `None` when no data path was given or the spool could
    /// not be opened.
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    /// Currently registered replicas.
    pub replicas: RwLock<Vec<ReplicaState>>,
    /// Highest appended LSN plus the condvar notified on every append; used
    /// by `wait_for_logical_lsn_after`.
    wal_appended: (Mutex<u64>, Condvar),
    /// Location of the slot store, derived from the data path.
    slot_path: Option<PathBuf>,
    /// Replication slots keyed by slot id, loaded from `slot_path` at startup.
    slots: RwLock<BTreeMap<String, ReplicationSlot>>,
    /// Copied from `ReplicationConfig::slot_retention_max_lag_lsn`.
    slot_retention_max_lag_lsn: u64,
    /// Copied from `ReplicationConfig::slot_idle_timeout_ms`.
    slot_idle_timeout_ms: u64,
    /// Receives replica acknowledgements and replica removals.
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    /// Counter bumped whenever a replica is added or removed, or its
    /// rebootstrapping flag changes.
    topology_epoch: std::sync::atomic::AtomicU64,
    /// Starts at zero. NOTE(review): presumably counts partial resyncs
    /// served; the code that increments it is outside this view.
    partial_resync_count: std::sync::atomic::AtomicU64,
}
1089
/// How a replica should resume replication.
///
/// NOTE(review): the code that chooses between the variants is outside this
/// view; the descriptions below follow the variant and field names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResumeMode {
    /// Resume streaming from `resume_lsn`.
    PartialResync { resume_lsn: u64 },
    /// Start over from a full bootstrap; `cause` says why the slot could not
    /// be resumed.
    FullRebootstrap { cause: SlotInvalidationCause },
}
1101
1102impl PrimaryReplication {
1103 pub fn slot_path_for(data_path: &Path) -> PathBuf {
1104 let file_name = data_path
1105 .file_name()
1106 .and_then(|name| name.to_str())
1107 .unwrap_or("reddb.rdb");
1108 let slot_name = format!("{file_name}.logical.slots.json");
1109 match data_path.parent() {
1110 Some(parent) => parent.join(slot_name),
1111 None => PathBuf::from(slot_name),
1112 }
1113 }
1114
    /// Builds the primary-side replication state using
    /// `ReplicationConfig::primary()` as the configuration.
    pub fn new(data_path: Option<&Path>) -> Self {
        Self::new_with_config(data_path, &crate::replication::ReplicationConfig::primary())
    }
1118
1119 pub fn new_with_config(
1120 data_path: Option<&Path>,
1121 config: &crate::replication::ReplicationConfig,
1122 ) -> Self {
1123 let now_ms = crate::utils::now_unix_millis() as u128;
1124 let slot_path = data_path.map(Self::slot_path_for);
1125 let slots = load_replication_slots(slot_path.as_deref(), now_ms);
1126 let logical_wal_spool = data_path
1127 .and_then(|path| LogicalWalSpool::open(path).ok())
1128 .map(Arc::new);
1129 let current_lsn = logical_wal_spool
1130 .as_ref()
1131 .map(|spool| spool.current_lsn())
1132 .unwrap_or(0);
1133 Self {
1134 wal_buffer: Arc::new(WalBuffer::new(100_000)),
1135 logical_wal_spool,
1136 replicas: RwLock::new(Vec::new()),
1137 wal_appended: (Mutex::new(current_lsn), Condvar::new()),
1138 slot_path,
1139 slots: RwLock::new(slots),
1140 slot_retention_max_lag_lsn: config.slot_retention_max_lag_lsn,
1141 slot_idle_timeout_ms: config.slot_idle_timeout_ms,
1142 commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
1143 topology_epoch: std::sync::atomic::AtomicU64::new(0),
1144 partial_resync_count: std::sync::atomic::AtomicU64::new(0),
1145 }
1146 }
1147
1148 pub fn append_logical_record(&self, lsn: u64, encoded: Vec<u8>) {
1149 self.wal_buffer.append(lsn, encoded.clone());
1150 if let Some(spool) = &self.logical_wal_spool {
1151 let _ = spool.append(lsn, &encoded);
1152 }
1153 let (lock, cvar) = &self.wal_appended;
1154 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1155 *latest = (*latest).max(lsn);
1156 cvar.notify_all();
1157 }
1158
1159 pub fn wait_for_logical_lsn_after(&self, since_lsn: u64, timeout: Duration) -> bool {
1160 if self.current_logical_lsn() > since_lsn {
1161 return true;
1162 }
1163 let deadline = Instant::now() + timeout;
1164 let (lock, cvar) = &self.wal_appended;
1165 let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1166 while *latest <= since_lsn {
1167 let now = Instant::now();
1168 if now >= deadline {
1169 return false;
1170 }
1171 let remaining = deadline.saturating_duration_since(now);
1172 let (guard, result) = cvar
1173 .wait_timeout(latest, remaining)
1174 .unwrap_or_else(|e| e.into_inner());
1175 latest = guard;
1176 if result.timed_out() && *latest <= since_lsn {
1177 return false;
1178 }
1179 }
1180 true
1181 }
1182
    /// Registers replica `id` without a region and returns the LSN it should
    /// resume from. See `register_replica_with_region`.
    pub fn register_replica(&self, id: String) -> u64 {
        self.register_replica_with_region(id, None)
    }
1186
1187 pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
1204 let now_ms = crate::utils::now_unix_millis() as u128;
1205 let resume_lsn = self.ensure_slot(&id, self.current_logical_lsn());
1206 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1207 if let Some(existing) = replicas.iter_mut().find(|r| r.id == id) {
1208 existing.last_seen_at_unix_ms = now_ms;
1209 if region.is_some() {
1210 existing.region = region;
1211 }
1212 return resume_lsn;
1213 }
1214 replicas.push(ReplicaState {
1215 id,
1216 last_acked_lsn: resume_lsn,
1217 last_sent_lsn: resume_lsn,
1218 last_durable_lsn: resume_lsn,
1219 apply_error_count: 0,
1220 divergence_count: 0,
1221 connected_at_unix_ms: now_ms,
1222 last_seen_at_unix_ms: now_ms,
1223 region,
1224 rebootstrapping: false,
1225 });
1226 drop(replicas);
1227 self.bump_topology_epoch();
1228 resume_lsn
1229 }
1230
1231 pub fn set_replica_rebootstrapping(&self, id: &str, rebootstrapping: bool) -> bool {
1246 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1247 let Some(state) = replicas.iter_mut().find(|r| r.id == id) else {
1248 return false;
1249 };
1250 if state.rebootstrapping == rebootstrapping {
1251 return true;
1252 }
1253 state.rebootstrapping = rebootstrapping;
1254 drop(replicas);
1255 self.bump_topology_epoch();
1256 true
1257 }
1258
1259 pub fn ensure_replica_registered(&self, id: &str) -> bool {
1268 let already = self
1269 .replicas
1270 .read()
1271 .unwrap_or_else(|e| e.into_inner())
1272 .iter()
1273 .any(|r| r.id == id);
1274 if already {
1275 return false;
1276 }
1277 self.register_replica(id.to_string());
1278 true
1279 }
1280
1281 pub fn unregister_replica(&self, id: &str) -> bool {
1285 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1286 let before = replicas.len();
1287 replicas.retain(|r| r.id != id);
1288 let removed = replicas.len() != before;
1289 drop(replicas);
1290 if removed {
1291 self.commit_waiter.drop_replica(id);
1292 self.bump_topology_epoch();
1293 }
1294 removed
1295 }
1296
    /// Current value of the topology epoch counter (relaxed atomic load).
    pub fn topology_epoch(&self) -> u64 {
        self.topology_epoch
            .load(std::sync::atomic::Ordering::Relaxed)
    }
1303
    /// Increments the topology epoch counter by one (relaxed atomic add).
    pub fn bump_topology_epoch(&self) {
        self.topology_epoch
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
1315
1316 pub fn ack_replica(&self, id: &str, lsn: u64) {
1317 let now_ms = crate::utils::now_unix_millis() as u128;
1318 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1319 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1320 r.last_acked_lsn = r.last_acked_lsn.max(lsn);
1321 r.last_durable_lsn = r.last_durable_lsn.max(lsn);
1322 r.last_seen_at_unix_ms = now_ms;
1323 }
1324 drop(replicas);
1325 self.commit_waiter.record_replica_ack(id, lsn);
1326 }
1327
    /// Records separate applied and durable LSNs for replica `id`, passing
    /// zero for both observability counters. See
    /// `ack_replica_lsn_with_observability`.
    pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
        self.ack_replica_lsn_with_observability(id, applied_lsn, durable_lsn, 0, 0);
    }
1336
1337 pub fn ack_replica_lsn_with_observability(
1338 &self,
1339 id: &str,
1340 applied_lsn: u64,
1341 durable_lsn: u64,
1342 apply_error_count: u64,
1343 divergence_count: u64,
1344 ) {
1345 let now_ms = crate::utils::now_unix_millis() as u128;
1346 self.advance_slot(id, applied_lsn, durable_lsn, now_ms);
1347 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1348 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1349 r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
1350 r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
1351 r.apply_error_count = r.apply_error_count.max(apply_error_count);
1352 r.divergence_count = r.divergence_count.max(divergence_count);
1353 r.last_seen_at_unix_ms = now_ms;
1354 }
1355 drop(replicas);
1359 self.commit_waiter.record_replica_ack(id, durable_lsn);
1360 }
1361
1362 pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
1367 let now_ms = crate::utils::now_unix_millis() as u128;
1368 self.touch_slot(id, now_ms);
1369 let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1370 if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1371 r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
1372 r.last_seen_at_unix_ms = now_ms;
1373 }
1374 }
1375
1376 pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
1380 self.replicas
1381 .read()
1382 .unwrap_or_else(|e| e.into_inner())
1383 .clone()
1384 }
1385
1386 pub fn replication_progress(&self) -> Option<ReplicationProgress> {
1387 let replicas = self.replicas.read().unwrap_or_else(|e| e.into_inner());
1388 ReplicationProgress::from_replicas(&replicas)
1389 }
1390
1391 pub fn slot_snapshots(&self) -> Vec<ReplicationSlot> {
1392 self.slots
1393 .read()
1394 .unwrap_or_else(|e| e.into_inner())
1395 .values()
1396 .cloned()
1397 .collect()
1398 }
1399
1400 pub fn retention_floor_lsn(&self) -> Option<u64> {
1401 self.slots
1402 .read()
1403 .unwrap_or_else(|e| e.into_inner())
1404 .values()
1405 .filter(|slot| slot.invalidation_reason.is_none())
1406 .map(|slot| slot.restart_lsn)
1407 .min()
1408 }
1409
1410 pub fn prune_retained_wal_through(&self, archived_lsn: u64) -> io::Result<u64> {
1411 self.enforce_retention_limits(crate::utils::now_unix_millis() as u128);
1412 let prune_lsn = self
1413 .retention_floor_lsn()
1414 .map(|floor| floor.min(archived_lsn))
1415 .unwrap_or(archived_lsn);
1416 if prune_lsn > 0 {
1417 if let Some(spool) = &self.logical_wal_spool {
1418 spool.prune_through(prune_lsn)?;
1419 }
1420 self.wal_buffer.prune_through(prune_lsn);
1421 }
1422 Ok(prune_lsn)
1423 }
1424
1425 pub fn replica_count(&self) -> usize {
1426 self.replicas
1427 .read()
1428 .unwrap_or_else(|e| e.into_inner())
1429 .len()
1430 }
1431
1432 pub fn current_logical_lsn(&self) -> u64 {
1436 self.logical_wal_spool
1437 .as_ref()
1438 .map(|spool| spool.current_lsn())
1439 .unwrap_or_else(|| self.wal_buffer.current_lsn())
1440 }
1441
1442 fn ensure_slot(&self, id: &str, initial_lsn: u64) -> u64 {
1443 let now_ms = crate::utils::now_unix_millis() as u128;
1444 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1445 if let Some(slot) = slots.get_mut(id) {
1446 slot.last_seen_at_unix_ms = now_ms;
1447 let restart_lsn = slot.restart_lsn;
1448 self.persist_slots_locked(&slots);
1449 return restart_lsn;
1450 }
1451 slots.insert(
1452 id.to_string(),
1453 ReplicationSlot {
1454 id: id.to_string(),
1455 restart_lsn: initial_lsn,
1456 confirmed_lsn: initial_lsn,
1457 last_seen_at_unix_ms: now_ms,
1458 invalidation_reason: None,
1459 invalidated_at_unix_ms: None,
1460 },
1461 );
1462 let restart_lsn = initial_lsn;
1463 self.persist_slots_locked(&slots);
1464 restart_lsn
1465 }
1466
1467 fn advance_slot(&self, id: &str, confirmed_lsn: u64, restart_lsn: u64, now_ms: u128) {
1468 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1469 let slot = slots
1470 .entry(id.to_string())
1471 .or_insert_with(|| ReplicationSlot {
1472 id: id.to_string(),
1473 restart_lsn: 0,
1474 confirmed_lsn: 0,
1475 last_seen_at_unix_ms: now_ms,
1476 invalidation_reason: None,
1477 invalidated_at_unix_ms: None,
1478 });
1479 if slot.invalidation_reason.is_some() {
1480 return;
1481 }
1482 slot.confirmed_lsn = slot.confirmed_lsn.max(confirmed_lsn).max(restart_lsn);
1483 slot.restart_lsn = slot.restart_lsn.max(restart_lsn);
1484 slot.last_seen_at_unix_ms = now_ms;
1485 self.persist_slots_locked(&slots);
1486 }
1487
1488 pub fn touch_slot(&self, id: &str, now_ms: u128) {
1489 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1490 let mut changed = false;
1491 if let Some(slot) = slots.get_mut(id) {
1492 if slot.invalidation_reason.is_none() {
1493 slot.last_seen_at_unix_ms = now_ms;
1494 changed = true;
1495 }
1496 }
1497 if changed {
1498 self.persist_slots_locked(&slots);
1499 }
1500 }
1501
1502 pub fn enforce_retention_limits(&self, now_ms: u128) -> Vec<(String, SlotInvalidationCause)> {
1503 let current_lsn = self.current_logical_lsn();
1504 let mut invalidated = Vec::new();
1505 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1506 for slot in slots.values_mut() {
1507 if slot.invalidation_reason.is_some() {
1508 continue;
1509 }
1510 let reason = if self.slot_retention_max_lag_lsn > 0
1511 && current_lsn.saturating_sub(slot.restart_lsn) > self.slot_retention_max_lag_lsn
1512 {
1513 Some(SlotInvalidationCause::Horizon)
1514 } else if self.slot_idle_timeout_ms > 0
1515 && now_ms.saturating_sub(slot.last_seen_at_unix_ms)
1516 > u128::from(self.slot_idle_timeout_ms)
1517 {
1518 Some(SlotInvalidationCause::IdleTimeout)
1519 } else {
1520 None
1521 };
1522 if let Some(reason) = reason {
1523 slot.invalidation_reason = Some(reason);
1524 slot.invalidated_at_unix_ms = Some(now_ms);
1525 invalidated.push((slot.id.clone(), reason));
1526 }
1527 }
1528 if !invalidated.is_empty() {
1529 self.persist_slots_locked(&slots);
1530 }
1531 invalidated
1532 }
1533
1534 pub fn slot_rebootstrap_reason(
1535 &self,
1536 id: &str,
1537 requested_since_lsn: u64,
1538 oldest_available_lsn: Option<u64>,
1539 ) -> Option<SlotInvalidationCause> {
1540 let now_ms = crate::utils::now_unix_millis() as u128;
1541 let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1542 let slot = slots.get_mut(id)?;
1543 if let Some(reason) = slot.invalidation_reason {
1544 return Some(reason);
1545 }
1546 let slot_floor = slot.restart_lsn.max(requested_since_lsn);
1547 if oldest_available_lsn
1548 .map(|oldest| oldest > slot_floor.saturating_add(1))
1549 .unwrap_or(false)
1550 {
1551 slot.invalidation_reason = Some(SlotInvalidationCause::WalRemoved);
1552 slot.invalidated_at_unix_ms = Some(now_ms);
1553 self.persist_slots_locked(&slots);
1554 return Some(SlotInvalidationCause::WalRemoved);
1555 }
1556 None
1557 }
1558
1559 pub fn plan_replica_resume(
1568 &self,
1569 id: &str,
1570 requested_since_lsn: u64,
1571 oldest_available_lsn: Option<u64>,
1572 ) -> ResumeMode {
1573 if let Some(cause) =
1574 self.slot_rebootstrap_reason(id, requested_since_lsn, oldest_available_lsn)
1575 {
1576 return ResumeMode::FullRebootstrap { cause };
1577 }
1578 let resume_lsn = self
1579 .slot_snapshots()
1580 .into_iter()
1581 .find(|slot| slot.id == id)
1582 .map(|slot| requested_since_lsn.max(slot.restart_lsn))
1583 .unwrap_or(requested_since_lsn);
1584 self.partial_resync_count
1585 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1586 ResumeMode::PartialResync { resume_lsn }
1587 }
1588
1589 pub fn partial_resync_count(&self) -> u64 {
1592 self.partial_resync_count
1593 .load(std::sync::atomic::Ordering::Relaxed)
1594 }
1595
1596 fn persist_slots_locked(&self, slots: &BTreeMap<String, ReplicationSlot>) {
1597 if let Err(err) = persist_replication_slots(self.slot_path.as_deref(), slots) {
1598 warn!(
1599 target: "reddb::replication::slots",
1600 error = %err,
1601 "failed to persist replication slots"
1602 );
1603 }
1604 }
1605}
1606
1607#[cfg(test)]
1608mod tests {
1609 use super::*;
1610 use crate::replication::cdc::{ChangeOperation, ChangeRecord};
1611 use std::time::{SystemTime, UNIX_EPOCH};
1612
1613 fn temp_data_path(name: &str) -> PathBuf {
1614 let suffix = SystemTime::now()
1615 .duration_since(UNIX_EPOCH)
1616 .unwrap()
1617 .as_nanos();
1618 std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
1619 }
1620
1621 #[test]
1622 fn logical_wal_spool_roundtrip_and_prune() {
1623 let data_path = temp_data_path("logical_spool");
1624 let spool_path = LogicalWalSpool::path_for(&data_path);
1625 let spool = LogicalWalSpool::open(&data_path).expect("open spool");
1626
1627 let record1 = ChangeRecord {
1628 term: 2,
1629 lsn: 7,
1630 timestamp: 1,
1631 operation: ChangeOperation::Insert,
1632 collection: "users".to_string(),
1633 entity_id: 10,
1634 entity_kind: "row".to_string(),
1635 entity_bytes: Some(vec![1, 2, 3]),
1636 metadata: None,
1637 refresh_records: None,
1638 };
1639 let record2 = ChangeRecord {
1640 term: 2,
1641 lsn: 8,
1642 timestamp: 2,
1643 operation: ChangeOperation::Update,
1644 collection: "users".to_string(),
1645 entity_id: 10,
1646 entity_kind: "row".to_string(),
1647 entity_bytes: Some(vec![4, 5, 6]),
1648 metadata: None,
1649 refresh_records: None,
1650 };
1651
1652 spool
1653 .append_with_term_and_timestamp(record1.term, record1.lsn, 11, &record1.encode())
1654 .expect("append 1");
1655 spool
1656 .append_with_term_and_timestamp(record2.term, record2.lsn, 12, &record2.encode())
1657 .expect("append 2");
1658
1659 let entries = spool.read_since(0, usize::MAX).expect("read");
1660 assert_eq!(entries.len(), 2);
1661 assert_eq!(entries[0].0, 7);
1662 assert_eq!(entries[1].0, 8);
1663 assert_eq!(ChangeRecord::decode(&entries[0].1).unwrap().term, 2);
1664
1665 let framed = read_and_repair_entries(&spool_path).expect("read framed entries");
1666 assert_eq!(framed[0].term, 2);
1667 assert_eq!(framed[0].timestamp_ms, 11);
1668
1669 spool.prune_through(7).expect("prune");
1670 let retained = spool.read_since(0, usize::MAX).expect("read retained");
1671 assert_eq!(retained.len(), 1);
1672 assert_eq!(retained[0].0, 8);
1673 assert_eq!(ChangeRecord::decode(&retained[0].1).unwrap().term, 2);
1674
1675 let _ = fs::remove_file(spool_path);
1676 }
1677
1678 #[test]
1679 fn logical_wal_spool_reads_v2_without_term() {
1680 let data_path = temp_data_path("logical_spool_v2");
1681 let spool_path = LogicalWalSpool::path_for(&data_path);
1682 let payload = br#"{"lsn":3,"timestamp":44,"operation":"delete","collection":"users","rid":9,"kind":"row"}"#;
1683 let lsn = 3u64;
1684 let timestamp = 44u64;
1685 let crc = compute_logical_v2_crc(LOGICAL_WAL_SPOOL_VERSION_V2, lsn, timestamp, payload);
1686
1687 let mut file = File::create(&spool_path).expect("create v2 spool");
1688 file.write_all(LOGICAL_WAL_SPOOL_MAGIC).unwrap();
1689 file.write_all(&[LOGICAL_WAL_SPOOL_VERSION_V2]).unwrap();
1690 file.write_all(&lsn.to_le_bytes()).unwrap();
1691 file.write_all(×tamp.to_le_bytes()).unwrap();
1692 file.write_all(&(payload.len() as u32).to_le_bytes())
1693 .unwrap();
1694 file.write_all(payload).unwrap();
1695 file.write_all(&crc.to_le_bytes()).unwrap();
1696 file.sync_all().unwrap();
1697
1698 let spool = LogicalWalSpool::open(&data_path).expect("open v2 spool");
1699 let records = spool.read_since(0, usize::MAX).expect("read v2 spool");
1700 assert_eq!(records.len(), 1);
1701 assert_eq!(records[0].0, 3);
1702 let decoded = ChangeRecord::decode(&records[0].1).expect("decode v2 payload");
1703 assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
1704 assert_eq!(decoded.lsn, 3);
1705
1706 let framed = read_and_repair_entries(&spool_path).expect("read framed v2 entries");
1707 assert_eq!(framed[0].term, crate::replication::DEFAULT_REPLICATION_TERM);
1708
1709 let _ = fs::remove_file(spool_path);
1710 }
1711
1712 #[test]
1713 fn topology_epoch_monotonic_on_register_and_unregister() {
1714 let primary = PrimaryReplication::new(None);
1719 let e0 = primary.topology_epoch();
1720 primary.register_replica("r1".to_string());
1721 let e1 = primary.topology_epoch();
1722 primary.register_replica("r2".to_string());
1723 let e2 = primary.topology_epoch();
1724 assert!(e1 > e0, "register must bump epoch ({e0} -> {e1})");
1725 assert!(e2 > e1, "second register must bump epoch ({e1} -> {e2})");
1726
1727 let removed = primary.unregister_replica("r1");
1728 assert!(removed);
1729 let e3 = primary.topology_epoch();
1730 assert!(e3 > e2, "unregister must bump epoch ({e2} -> {e3})");
1731
1732 let absent = primary.unregister_replica("ghost");
1735 assert!(!absent);
1736 assert_eq!(
1737 primary.topology_epoch(),
1738 e3,
1739 "unregistering a missing replica must not bump the epoch"
1740 );
1741 }
1742
1743 #[test]
1744 fn register_replica_is_idempotent_on_reconnect() {
1745 let primary = PrimaryReplication::new(None);
1750
1751 primary.register_replica("r1".to_string());
1753 assert_eq!(
1754 primary.replica_count(),
1755 1,
1756 "first register creates an entry"
1757 );
1758 let epoch_after_first = primary.topology_epoch();
1759
1760 primary.note_replica_pull("r1", 42);
1762 primary.ack_replica_lsn("r1", 40, 40);
1763 let before = primary
1764 .replica_snapshots()
1765 .into_iter()
1766 .find(|r| r.id == "r1")
1767 .expect("r1 present");
1768 assert_eq!(before.last_sent_lsn, 42);
1769 assert_eq!(before.last_acked_lsn, 40);
1770 assert_eq!(before.last_durable_lsn, 40);
1771
1772 let resume_lsn = primary.register_replica("r1".to_string());
1774
1775 assert_eq!(
1777 primary.replica_count(),
1778 1,
1779 "reconnect must not create a duplicate registry entry"
1780 );
1781 assert_eq!(
1783 primary.topology_epoch(),
1784 epoch_after_first,
1785 "reconnect must not bump the topology epoch"
1786 );
1787 let after = primary
1789 .replica_snapshots()
1790 .into_iter()
1791 .find(|r| r.id == "r1")
1792 .expect("r1 still present");
1793 assert_eq!(after.last_sent_lsn, 42, "last_sent_lsn preserved");
1794 assert_eq!(after.last_acked_lsn, 40, "last_acked_lsn preserved");
1795 assert_eq!(after.last_durable_lsn, 40, "last_durable_lsn preserved");
1796 assert_eq!(resume_lsn, 40, "reconnect returns the slot restart LSN");
1798 }
1799
1800 #[test]
1801 fn replica_slot_persists_and_reconnect_resumes_from_restart_lsn() {
1802 let data_path = temp_data_path("replication_slots");
1803 let spool_path = LogicalWalSpool::path_for(&data_path);
1804 let slot_path = PrimaryReplication::slot_path_for(&data_path);
1805
1806 {
1807 let primary = PrimaryReplication::new(Some(&data_path));
1808 primary.register_replica("r1".to_string());
1809 primary.note_replica_pull("r1", 12);
1810 primary.ack_replica_lsn("r1", 10, 8);
1811
1812 let slot = primary
1813 .slot_snapshots()
1814 .into_iter()
1815 .find(|slot| slot.id == "r1")
1816 .expect("r1 slot present");
1817 assert_eq!(slot.restart_lsn, 8);
1818 assert_eq!(slot.confirmed_lsn, 10);
1819 }
1820
1821 let reopened = PrimaryReplication::new(Some(&data_path));
1822 let slot = reopened
1823 .slot_snapshots()
1824 .into_iter()
1825 .find(|slot| slot.id == "r1")
1826 .expect("r1 slot loaded after reopen");
1827 assert_eq!(slot.restart_lsn, 8);
1828 assert_eq!(slot.confirmed_lsn, 10);
1829 assert_eq!(
1830 reopened.register_replica("r1".to_string()),
1831 8,
1832 "reconnect resumes from the durable slot restart LSN"
1833 );
1834
1835 let _ = fs::remove_file(spool_path);
1836 let _ = fs::remove_file(slot_path);
1837 }
1838
1839 #[test]
1840 fn retention_floor_follows_slowest_slot_and_prunes_wal() {
1841 let primary = PrimaryReplication::new(None);
1842 primary.register_replica("fast".to_string());
1843 primary.register_replica("slow".to_string());
1844
1845 for lsn in 1..=6 {
1846 primary.wal_buffer.append(lsn, vec![lsn as u8]);
1847 }
1848
1849 primary.ack_replica_lsn("fast", 5, 5);
1850 primary.ack_replica_lsn("slow", 3, 2);
1851
1852 assert_eq!(
1853 primary.retention_floor_lsn(),
1854 Some(2),
1855 "slowest slot restart_lsn sets the retention floor"
1856 );
1857 assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 2);
1858 let retained: Vec<_> = primary
1859 .wal_buffer
1860 .read_since(0, usize::MAX)
1861 .into_iter()
1862 .map(|(lsn, _)| lsn)
1863 .collect();
1864 assert_eq!(retained, vec![3, 4, 5, 6]);
1865
1866 primary.ack_replica_lsn("slow", 6, 6);
1867 assert_eq!(
1868 primary.retention_floor_lsn(),
1869 Some(5),
1870 "slot confirmation advances the retention floor"
1871 );
1872 assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 5);
1873 let retained: Vec<_> = primary
1874 .wal_buffer
1875 .read_since(0, usize::MAX)
1876 .into_iter()
1877 .map(|(lsn, _)| lsn)
1878 .collect();
1879 assert_eq!(retained, vec![6]);
1880 }
1881
1882 #[test]
1883 fn bootstrap_slot_pin_prevents_wal_removed_rebootstrap_after_prune() {
1884 let primary = PrimaryReplication::new(None);
1885 for lsn in 1..=5 {
1886 primary.wal_buffer.append(lsn, vec![lsn as u8]);
1887 }
1888
1889 let slot_lsn = primary.register_replica("bootstrapping".to_string());
1890 assert_eq!(slot_lsn, 5, "bootstrap pins the current frontier");
1891
1892 for lsn in 6..=8 {
1893 primary.wal_buffer.append(lsn, vec![lsn as u8]);
1894 }
1895
1896 assert_eq!(
1897 primary.prune_retained_wal_through(8).unwrap(),
1898 5,
1899 "bootstrap slot keeps the frontier retained"
1900 );
1901 assert_eq!(
1902 primary.slot_rebootstrap_reason("bootstrapping", 0, primary.wal_buffer.oldest_lsn()),
1903 None,
1904 "a caller resuming from its slot must not see wal-removed after slot-aware pruning"
1905 );
1906 }
1907
1908 #[test]
1909 fn default_config_enables_finite_slot_retention_cap() {
1910 let config = crate::replication::ReplicationConfig::primary();
1911
1912 assert!(
1913 config.slot_retention_max_lag_lsn > 0,
1914 "primary replication must default to a finite slot retention cap"
1915 );
1916 }
1917
1918 #[test]
1919 fn retention_cap_invalidates_slow_slot_and_releases_wal_floor() {
1920 let primary = PrimaryReplication::new_with_config(
1921 None,
1922 &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
1923 );
1924 primary.register_replica("fast".to_string());
1925 primary.register_replica("slow".to_string());
1926
1927 for lsn in 1..=6 {
1928 primary.wal_buffer.append(lsn, vec![lsn as u8]);
1929 }
1930 primary.ack_replica_lsn("fast", 6, 6);
1931
1932 assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 6);
1933
1934 let slow = primary
1935 .slot_snapshots()
1936 .into_iter()
1937 .find(|slot| slot.id == "slow")
1938 .expect("slow slot present");
1939 assert_eq!(
1940 slow.invalidation_reason,
1941 Some(SlotInvalidationCause::Horizon)
1942 );
1943
1944 let retained: Vec<_> = primary
1945 .wal_buffer
1946 .read_since(0, usize::MAX)
1947 .into_iter()
1948 .map(|(lsn, _)| lsn)
1949 .collect();
1950 assert!(
1951 retained.is_empty(),
1952 "invalidated slow slot must not pin WAL"
1953 );
1954 }
1955
1956 #[test]
1957 fn slot_invalidation_cause_codes_cover_wal_removed_horizon_and_idle_timeout() {
1958 let wal_removed = PrimaryReplication::new_with_config(
1959 None,
1960 &crate::replication::ReplicationConfig::primary()
1961 .with_slot_retention_max_lag_lsn(3)
1962 .with_slot_idle_timeout_ms(10),
1963 );
1964 wal_removed.register_replica("wal".to_string());
1965 assert_eq!(
1966 wal_removed.slot_rebootstrap_reason("wal", 0, Some(2)),
1967 Some(SlotInvalidationCause::WalRemoved)
1968 );
1969
1970 let horizon = PrimaryReplication::new_with_config(
1971 None,
1972 &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
1973 );
1974 horizon.register_replica("horizon".to_string());
1975 for lsn in 1..=4 {
1976 horizon.wal_buffer.append(lsn, vec![lsn as u8]);
1977 }
1978 horizon.enforce_retention_limits(0);
1979 assert_eq!(
1980 horizon
1981 .slot_snapshots()
1982 .into_iter()
1983 .find(|slot| slot.id == "horizon")
1984 .and_then(|slot| slot.invalidation_reason),
1985 Some(SlotInvalidationCause::Horizon)
1986 );
1987
1988 let idle = PrimaryReplication::new_with_config(
1989 None,
1990 &crate::replication::ReplicationConfig::primary().with_slot_idle_timeout_ms(10),
1991 );
1992 idle.register_replica("idle".to_string());
1993 idle.touch_slot("idle", 1);
1994 idle.enforce_retention_limits(12);
1995 assert_eq!(
1996 idle.slot_snapshots()
1997 .into_iter()
1998 .find(|slot| slot.id == "idle")
1999 .and_then(|slot| slot.invalidation_reason),
2000 Some(SlotInvalidationCause::IdleTimeout)
2001 );
2002 }
2003
2004 #[test]
2005 fn wal_buffer_fan_out_shares_refcounted_payload() {
2006 let buffer = WalBuffer::new(8);
2010 buffer.append(1, vec![0xDE, 0xAD, 0xBE, 0xEF]);
2011
2012 let replica_a = buffer.read_since_shared(0, usize::MAX);
2013 let replica_b = buffer.read_since_shared(0, usize::MAX);
2014 assert_eq!(replica_a.len(), 1);
2015 assert_eq!(replica_b.len(), 1);
2016
2017 assert!(
2018 Arc::ptr_eq(&replica_a[0].1, &replica_b[0].1),
2019 "two replicas must share one ref-counted payload allocation"
2020 );
2021 assert_eq!(&*replica_a[0].1, &[0xDE, 0xAD, 0xBE, 0xEF]);
2022 assert!(
2023 Arc::strong_count(&replica_a[0].1) >= 3,
2024 "buffer + both replica handles reference the same payload"
2025 );
2026
2027 let owned = buffer.read_since(0, usize::MAX);
2029 assert_eq!(owned, vec![(1u64, vec![0xDE, 0xAD, 0xBE, 0xEF])]);
2030 }
2031
2032 #[test]
2033 fn spool_seek_index_resume_is_sublinear() {
2034 let data_path = temp_data_path("seek_index");
2038 let spool_path = LogicalWalSpool::path_for(&data_path);
2039 let spool = LogicalWalSpool::open(&data_path).expect("open spool");
2040
2041 for lsn in 1..=200u64 {
2042 spool
2043 .append_with_term_and_timestamp(1, lsn, lsn, &[(lsn % 251) as u8, 0xAB])
2044 .expect("append");
2045 }
2046
2047 assert_eq!(spool.read_since(0, usize::MAX).expect("full").len(), 200);
2049 assert_eq!(spool.seek_floor_offset(0), 0);
2050
2051 let resumed = spool.read_since(130, usize::MAX).expect("resume");
2054 assert_eq!(resumed.first().map(|(lsn, _)| *lsn), Some(131));
2055 assert_eq!(resumed.last().map(|(lsn, _)| *lsn), Some(200));
2056 assert_eq!(resumed.len(), 70);
2057 assert!(
2058 spool.seek_floor_offset(130) > 0,
2059 "mid-spool resume must seek past offset 0"
2060 );
2061
2062 drop(spool);
2065 let reopened = LogicalWalSpool::open(&data_path).expect("reopen spool");
2066 assert!(reopened.seek_floor_offset(130) > 0);
2067 assert_eq!(
2068 reopened
2069 .read_since(130, usize::MAX)
2070 .expect("resume reopen")
2071 .len(),
2072 70
2073 );
2074
2075 let _ = fs::remove_file(spool_path);
2076 }
2077
2078 #[test]
2079 fn plan_replica_resume_partial_within_window_full_past_cap() {
2080 let within = PrimaryReplication::new(None);
2084 within.register_replica("blip".to_string());
2085 for lsn in 1..=5 {
2086 within.wal_buffer.append(lsn, vec![lsn as u8]);
2087 }
2088 let before = within.partial_resync_count();
2089 match within.plan_replica_resume("blip", 2, within.wal_buffer.oldest_lsn()) {
2090 ResumeMode::PartialResync { resume_lsn } => assert_eq!(resume_lsn, 2),
2091 other => panic!("brief blip must resume via partial resync, got {other:?}"),
2092 }
2093 assert_eq!(
2094 within.partial_resync_count(),
2095 before + 1,
2096 "partial resync must be observable via the metric"
2097 );
2098
2099 let past_cap = PrimaryReplication::new_with_config(
2103 None,
2104 &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
2105 );
2106 past_cap.register_replica("slow".to_string());
2107 for lsn in 1..=6 {
2108 past_cap.wal_buffer.append(lsn, vec![lsn as u8]);
2109 }
2110 past_cap.enforce_retention_limits(0);
2111 let before_full = past_cap.partial_resync_count();
2112 match past_cap.plan_replica_resume("slow", 0, past_cap.wal_buffer.oldest_lsn()) {
2113 ResumeMode::FullRebootstrap { cause } => {
2114 assert_eq!(cause, SlotInvalidationCause::Horizon)
2115 }
2116 other => panic!("slot past the cap must re-bootstrap, got {other:?}"),
2117 }
2118 assert_eq!(
2119 past_cap.partial_resync_count(),
2120 before_full,
2121 "a full re-bootstrap must not be counted as a partial resync"
2122 );
2123 }
2124
2125 #[test]
2126 fn ensure_replica_registered_self_registers_then_is_a_noop() {
2127 let primary = PrimaryReplication::new(None);
2131
2132 assert!(
2134 primary.ensure_replica_registered("r1"),
2135 "first identification registers the replica"
2136 );
2137 assert_eq!(primary.replica_count(), 1);
2138 let epoch_after_register = primary.topology_epoch();
2139
2140 primary.note_replica_pull("r1", 7);
2142 assert_eq!(
2143 primary
2144 .replica_snapshots()
2145 .into_iter()
2146 .find(|r| r.id == "r1")
2147 .map(|r| r.last_sent_lsn),
2148 Some(7),
2149 "primary tracks last_sent_lsn for a registered replica's pull"
2150 );
2151
2152 assert!(
2155 !primary.ensure_replica_registered("r1"),
2156 "already-registered replica is not re-registered"
2157 );
2158 assert_eq!(primary.replica_count(), 1);
2159 assert_eq!(primary.topology_epoch(), epoch_after_register);
2160 assert_eq!(
2161 primary
2162 .replica_snapshots()
2163 .into_iter()
2164 .find(|r| r.id == "r1")
2165 .map(|r| r.last_sent_lsn),
2166 Some(7),
2167 "no-op registration preserves progress"
2168 );
2169 }
2170
2171 #[test]
2172 fn replication_progress_uses_sent_applied_and_durable_registry_lsns() {
2173 let now = crate::utils::now_unix_millis() as u128;
2174 let replicas = vec![
2175 ReplicaState {
2176 id: "fast".to_string(),
2177 last_acked_lsn: 90,
2178 last_sent_lsn: 120,
2179 last_durable_lsn: 80,
2180 apply_error_count: 0,
2181 divergence_count: 0,
2182 connected_at_unix_ms: now,
2183 last_seen_at_unix_ms: now,
2184 region: None,
2185 rebootstrapping: false,
2186 },
2187 ReplicaState {
2188 id: "slow".to_string(),
2189 last_acked_lsn: 70,
2190 last_sent_lsn: 100,
2191 last_durable_lsn: 60,
2192 apply_error_count: 0,
2193 divergence_count: 0,
2194 connected_at_unix_ms: now,
2195 last_seen_at_unix_ms: now,
2196 region: None,
2197 rebootstrapping: false,
2198 },
2199 ];
2200
2201 let progress = ReplicationProgress::from_replicas(&replicas).expect("registered replicas");
2202
2203 assert_eq!(progress.lag_lsn, 50);
2204 assert_eq!(progress.safe_replay_lsn, 60);
2205 }
2206}