Skip to main content

reddb_server/replication/
primary.rs

1//! Primary-side replication: WAL record production and snapshot serving.
2//!
3//! ## Logical WAL spool wire format
4//!
5//! ### Version 3 (current — issue #821)
6//!
7//! ```text
8//! [magic     4 bytes  = b"RDLW"]
9//! [version   1 byte   = 0x03]
10//! [term      8 bytes  little-endian u64]
11//! [lsn       8 bytes  little-endian u64]
12//! [timestamp 8 bytes  little-endian u64 — wall-clock millis since UNIX epoch]
13//! [payload_len 4 bytes little-endian u32]
14//! [payload   payload_len bytes]
15//! [crc32     4 bytes  little-endian u32 — crc32fast of (version || term ||
16//!                                          lsn || timestamp || payload_len ||
17//!                                          payload)]
18//! ```
19//!
20//! ### Version 2 (legacy, read-only — PLAN.md Phase 2 / W2)
21//!
22//! ```text
23//! [magic     4 bytes  = b"RDLW"]
24//! [version   1 byte   = 0x02]
25//! [lsn       8 bytes  little-endian u64]
26//! [timestamp 8 bytes  little-endian u64 — wall-clock millis since UNIX epoch]
27//! [payload_len 4 bytes little-endian u32]
28//! [payload   payload_len bytes]
29//! [crc32     4 bytes  little-endian u32 — crc32fast of (version || lsn ||
30//!                                          timestamp || payload_len || payload)]
31//! ```
32//!
33//! - `sync_all()` is called after every append so an acknowledged
34//!   `append()` survives a power-loss event.
35//! - Recovery accepts the longest valid prefix and silently truncates
36//!   at the first torn header, short payload/crc, or checksum
37//!   mismatch (warning logged). No partial record is ever returned to
38//!   the replication subsystem.
39//!
40//! ### Version 1 (legacy, read-only)
41//!
42//! ```text
43//! [magic 4][version 1=0x01][lsn 8][payload_len 8][payload]
44//! ```
45//!
//! No checksum, no timestamp. Read for backward compatibility on
//! existing spools; never written. A v1 record found in a spool is
//! returned to consumers with `LogicalWalEntry::timestamp_ms` set to `0`.
49
50use std::collections::{BTreeMap, VecDeque};
51use std::fs::{self, File, OpenOptions};
52use std::io::{self, Read, Seek, SeekFrom, Write};
53use std::path::{Path, PathBuf};
54use std::sync::{Arc, Condvar, Mutex, RwLock};
55use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
56
57use tracing::warn;
58
/// Magic bytes that open every logical-WAL spool record, regardless of
/// framing version.
const LOGICAL_WAL_SPOOL_MAGIC: &[u8; 4] = b"RDLW";
/// Version byte of the legacy v1 framing (no checksum, no timestamp).
/// Accepted when reading; never written.
const LOGICAL_WAL_SPOOL_VERSION_V1: u8 = 1;
/// Version byte of the legacy v2 framing (timestamp + CRC32, no term).
/// Accepted when reading; never written.
const LOGICAL_WAL_SPOOL_VERSION_V2: u8 = 2;
/// Version byte of the v3 framing, which adds the `term` field to v2.
const LOGICAL_WAL_SPOOL_VERSION_V3: u8 = 3;
/// Version byte stamped on every record this module writes.
const LOGICAL_WAL_SPOOL_VERSION_CURRENT: u8 = LOGICAL_WAL_SPOOL_VERSION_V3;
/// Header size in bytes for a v3 record before the payload starts:
/// magic(4) + version(1) + term(8) + lsn(8) + timestamp(8) + payload_len(4) = 33.
const LOGICAL_WAL_V3_HEADER_LEN: u64 = 4 + 1 + 8 + 8 + 8 + 4;
/// CRC32 trailer size in bytes for logical spool records. Despite the
/// `V2` in the name, the same 4-byte trailer is used when sizing v3
/// frames.
const LOGICAL_WAL_V2_CRC_LEN: u64 = 4;
69
70/// Compute CRC32 over the bytes that follow the magic — version,
71/// lsn, timestamp, payload_len, and payload. Magic is excluded so
72/// torn-record detection at recovery time depends only on data the
73/// writer covered.
74///
75/// Uses the same `crate::storage::engine::crc32` polynomial as the
76/// physical WAL record format so checksums computed here are
77/// comparable to those in `src/storage/wal/record.rs`.
78fn compute_logical_v2_crc(version: u8, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
79    use crate::storage::engine::crc32::crc32_update;
80    let mut crc = crc32_update(0, &[version]);
81    crc = crc32_update(crc, &lsn.to_le_bytes());
82    crc = crc32_update(crc, &timestamp.to_le_bytes());
83    crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
84    crc = crc32_update(crc, payload);
85    crc
86}
87
88fn compute_logical_v3_crc(version: u8, term: u64, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
89    use crate::storage::engine::crc32::crc32_update;
90    let mut crc = crc32_update(0, &[version]);
91    crc = crc32_update(crc, &term.to_le_bytes());
92    crc = crc32_update(crc, &lsn.to_le_bytes());
93    crc = crc32_update(crc, &timestamp.to_le_bytes());
94    crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
95    crc = crc32_update(crc, payload);
96    crc
97}
98
99fn term_from_payload(payload: &[u8]) -> u64 {
100    crate::replication::cdc::ChangeRecord::decode(payload)
101        .map(|record| record.term)
102        .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM)
103}
104
/// In-memory replication send buffer: the primary appends serialized
/// records, replicas read them back out by LSN.
///
/// Payloads live behind `Arc<[u8]>` so that serving the same record to
/// several replicas shares one heap allocation (issue #832): a pull
/// clones the `Arc` handle, never the bytes.
///
/// Note that nothing in this type evicts records on its own; the
/// `max_size` given to [`WalBuffer::new`] only pre-sizes the queue, and
/// trimming happens through [`WalBuffer::prune_through`].
pub struct WalBuffer {
    /// `(lsn, payload)` pairs in the order they were appended.
    records: RwLock<VecDeque<(u64, Arc<[u8]>)>>,
    /// Highest LSN seen so far through `append` or `set_current_lsn`.
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Create an empty buffer whose queue is pre-allocated for
    /// `max_size` records.
    pub fn new(max_size: usize) -> Self {
        let queue = VecDeque::with_capacity(max_size);
        Self {
            records: RwLock::new(queue),
            current_lsn: RwLock::new(0),
        }
    }

    /// Append a WAL record. Called by the storage engine after each
    /// write. The tracked current LSN only ever moves forward.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let payload: Arc<[u8]> = Arc::from(data.into_boxed_slice());

        // A poisoned lock is recovered rather than propagated: the
        // guarded data is still usable after a panicking holder.
        let mut queue = self
            .records
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        queue.push_back((lsn, payload));

        let mut high_water = self
            .current_lsn
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Return up to `max_count` records with an LSN strictly greater
    /// than `since_lsn`, each payload copied into an owned `Vec<u8>`.
    ///
    /// Intended for callers (WAL archiving, retention bookkeeping) that
    /// need owned bytes; the per-replica fan-out path should prefer
    /// [`Self::read_since_shared`], which does not copy.
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let shared = self.read_since_shared(since_lsn, max_count);
        let mut owned = Vec::with_capacity(shared.len());
        for (lsn, payload) in shared {
            owned.push((lsn, payload.to_vec()));
        }
        owned
    }

    /// Return up to `max_count` records with an LSN strictly greater
    /// than `since_lsn`, sharing the stored `Arc<[u8]>` payloads.
    ///
    /// Only the reference-counted handles are cloned, so fan-out to N
    /// replicas never duplicates the buffered bytes (issue #832).
    pub fn read_since_shared(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Arc<[u8]>)> {
        let queue = self
            .records
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let mut batch = Vec::new();
        for (lsn, payload) in queue.iter() {
            if batch.len() == max_count {
                break;
            }
            if *lsn > since_lsn {
                batch.push((*lsn, Arc::clone(payload)));
            }
        }
        batch
    }

    /// Current LSN.
    pub fn current_lsn(&self) -> u64 {
        let high_water = self
            .current_lsn
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *high_water
    }

    /// Raise the current LSN to `lsn`; a lower value is ignored.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut high_water = self
            .current_lsn
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if lsn > *high_water {
            *high_water = lsn;
        }
    }

    /// Drop records from the front of the queue while their LSN is
    /// `<= upto_lsn`, stopping at the first record above it.
    pub fn prune_through(&self, upto_lsn: u64) {
        let mut queue = self
            .records
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        while matches!(queue.front(), Some((lsn, _)) if *lsn <= upto_lsn) {
            queue.pop_front();
        }
    }

    /// Oldest available LSN (for gap detection), or `None` when the
    /// buffer is empty.
    pub fn oldest_lsn(&self) -> Option<u64> {
        let queue = self
            .records
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let (lsn, _) = queue.front()?;
        Some(*lsn)
    }
}
189
190#[derive(Debug, Clone)]
191struct LogicalWalEntry {
192    term: u64,
193    lsn: u64,
194    /// Wall-clock millis at append time. `0` for legacy v1 records that
195    /// did not carry a framing timestamp.
196    timestamp_ms: u64,
197    data: Vec<u8>,
198}
199
200impl LogicalWalEntry {
201    fn data_with_framing_term(&self) -> Vec<u8> {
202        match crate::replication::cdc::ChangeRecord::decode(&self.data) {
203            Ok(mut record) if record.term != self.term => {
204                record.term = self.term;
205                record.encode()
206            }
207            _ => self.data.clone(),
208        }
209    }
210}
211
/// One in every `SEEK_INDEX_INTERVAL` records is checkpointed into the
/// spool's in-memory seek index. A briefly-disconnected replica
/// resuming from its slot LSN searches this sparse index and seeks
/// straight to the nearest preceding checkpoint, then scans forward
/// from there, turning resume from an O(n) full-file scan into a
/// sub-linear seek (issue #832). The index is rebuilt on `open` and
/// extended on every `append`.
const SEEK_INDEX_INTERVAL: u64 = 64;

/// In-memory bookkeeping for a logical-WAL spool.
#[derive(Debug, Default)]
struct LogicalWalSpoolState {
    /// Highest LSN known to have been written to the spool.
    current_lsn: u64,
    /// Sparse, strictly LSN-ascending `(lsn, byte_offset)` checkpoints
    /// into the spool file. `byte_offset` is the start of the record
    /// whose LSN is `lsn`.
    seek_index: Vec<(u64, u64)>,
    /// Byte length of the spool file as last observed, i.e. the offset
    /// at which the next append is expected to land.
    write_offset: u64,
    /// Total records appended/recovered, used to space checkpoints
    /// `SEEK_INDEX_INTERVAL` records apart.
    record_count: u64,
}

impl LogicalWalSpoolState {
    /// Push a checkpoint for the record at `offset` if it falls on a
    /// `SEEK_INDEX_INTERVAL` boundary. `ordinal` is the record's
    /// zero-based position in the spool.
    ///
    /// The checkpoint is skipped unless `lsn` is strictly greater than
    /// the last indexed LSN. LSNs are expected to ascend, but a repeated
    /// or out-of-order LSN must never reach the index: the lookup in
    /// [`Self::seek_floor_offset`] is only meaningful on a sorted slice.
    /// Skipping a checkpoint merely lengthens the forward scan.
    fn note_record(&mut self, ordinal: u64, lsn: u64, offset: u64) {
        if !ordinal.is_multiple_of(SEEK_INDEX_INTERVAL) {
            return;
        }
        let keeps_ascending = match self.seek_index.last() {
            Some(&(last_lsn, _)) => lsn > last_lsn,
            None => true,
        };
        if keeps_ascending {
            self.seek_index.push((lsn, offset));
        }
    }

    /// Byte offset to start a forward scan from when resuming at
    /// `since_lsn` (exclusive). Returns the offset of the latest
    /// checkpoint whose LSN is `<= since_lsn`, or `0` when no such
    /// checkpoint exists.
    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
        // Number of checkpoints at or below `since_lsn`; the floor is
        // the last of them.
        let covered = self
            .seek_index
            .partition_point(|&(lsn, _)| lsn <= since_lsn);
        match covered.checked_sub(1) {
            Some(floor) => self.seek_index[floor].1,
            None => 0,
        }
    }
}
267
/// Durable append-only logical WAL spool kept beside the main `.rdb` file.
///
/// This is not the storage-engine WAL; it is a structured replication/PITR log.
pub struct LogicalWalSpool {
    /// Location of the spool file, as computed by
    /// [`LogicalWalSpool::path_for`] when the spool is opened.
    path: PathBuf,
    /// In-memory bookkeeping (current LSN, sparse seek index, write
    /// offset, record count) shared behind a mutex.
    state: Mutex<LogicalWalSpoolState>,
}
275
276impl LogicalWalSpool {
277    pub fn path_for(data_path: &Path) -> PathBuf {
278        let file_name = data_path
279            .file_name()
280            .and_then(|name| name.to_str())
281            .unwrap_or("reddb.rdb");
282        let spool_name = format!("{file_name}.logical.wal");
283        match data_path.parent() {
284            Some(parent) => parent.join(spool_name),
285            None => PathBuf::from(spool_name),
286        }
287    }
288
289    pub fn open(data_path: &Path) -> io::Result<Self> {
290        let path = Self::path_for(data_path);
291        if let Some(parent) = path.parent() {
292            fs::create_dir_all(parent)?;
293        }
294        if !path.exists() {
295            File::create(&path)?;
296        }
297        // Recover-or-truncate to the longest valid prefix. A torn tail
298        // from the previous process exit (power loss, OOM kill, ENOSPC
299        // mid-write) is silently dropped; the warning surfaces to the
300        // operator log but the spool stays open.
301        let entries = read_and_repair_entries(&path)?;
302        let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
303        // Rebuild the sparse seek index from the (now repaired) file so
304        // a post-restart resume is sub-linear from the first pull.
305        let (seek_index, write_offset, record_count) = build_seek_index(&path)?;
306        Ok(Self {
307            path,
308            state: Mutex::new(LogicalWalSpoolState {
309                current_lsn,
310                seek_index,
311                write_offset,
312                record_count,
313            }),
314        })
315    }
316
317    pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
318        let timestamp_ms = SystemTime::now()
319            .duration_since(UNIX_EPOCH)
320            .map(|d| d.as_millis() as u64)
321            .unwrap_or(0);
322        self.append_with_timestamp(lsn, timestamp_ms, data)
323    }
324
325    /// Append a record with an explicit framing timestamp. Used in
326    /// tests to produce deterministic timestamps; production callers
327    /// should use `append`.
328    pub fn append_with_timestamp(
329        &self,
330        lsn: u64,
331        timestamp_ms: u64,
332        data: &[u8],
333    ) -> io::Result<()> {
334        self.append_with_term_and_timestamp(term_from_payload(data), lsn, timestamp_ms, data)
335    }
336
337    pub fn append_with_term_and_timestamp(
338        &self,
339        term: u64,
340        lsn: u64,
341        timestamp_ms: u64,
342        data: &[u8],
343    ) -> io::Result<()> {
344        if data.len() > u32::MAX as usize {
345            return Err(io::Error::new(
346                io::ErrorKind::InvalidInput,
347                format!(
348                    "logical WAL payload of {} bytes exceeds 4 GiB framing limit",
349                    data.len()
350                ),
351            ));
352        }
353        let mut file = OpenOptions::new()
354            .create(true)
355            .append(true)
356            .open(&self.path)?;
357        // Pre-build the record in memory so a single write_all keeps
358        // the on-disk record contiguous. Two side-effects:
359        //   (a) crash recovery sees either a complete record or a torn
360        //       header, never an interleaved partial frame from two
361        //       writers (the spool is not multi-writer today, but the
362        //       single-write semantics make that future-safe);
363        //   (b) crc32 is computed exactly once over the same bytes the
364        //       reader will checksum, with zero risk of header/payload
365        //       drift from a partial flush.
366        let mut frame = Vec::with_capacity(
367            LOGICAL_WAL_V3_HEADER_LEN as usize + data.len() + LOGICAL_WAL_V2_CRC_LEN as usize,
368        );
369        frame.extend_from_slice(LOGICAL_WAL_SPOOL_MAGIC);
370        frame.push(LOGICAL_WAL_SPOOL_VERSION_CURRENT);
371        frame.extend_from_slice(&term.to_le_bytes());
372        frame.extend_from_slice(&lsn.to_le_bytes());
373        frame.extend_from_slice(&timestamp_ms.to_le_bytes());
374        frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
375        frame.extend_from_slice(data);
376        let crc = compute_logical_v3_crc(
377            LOGICAL_WAL_SPOOL_VERSION_CURRENT,
378            term,
379            lsn,
380            timestamp_ms,
381            data,
382        );
383        frame.extend_from_slice(&crc.to_le_bytes());
384
385        file.write_all(&frame)?;
386        // PLAN.md Phase 2 mandates `sync_all` for logical WAL durability.
387        // `flush()` only drains the std::io userspace buffer; without
388        // `sync_all` the kernel page cache may still be dirty when an
389        // acknowledged write supposedly committed.
390        file.sync_all()?;
391
392        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
393        state.current_lsn = state.current_lsn.max(lsn);
394        // The record we just wrote starts at the prior end-of-file.
395        // Checkpoint it into the seek index if it lands on an interval
396        // boundary, then advance the tracked write offset.
397        let record_start = state.write_offset;
398        let ordinal = state.record_count;
399        state.note_record(ordinal, lsn, record_start);
400        state.write_offset = record_start + frame.len() as u64;
401        state.record_count = ordinal + 1;
402        Ok(())
403    }
404
405    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
406        // Seek straight to the nearest indexed checkpoint at or before
407        // `since_lsn` instead of scanning the whole spool from offset 0
408        // (issue #832). The file was already repaired on `open`, so the
409        // forward scan from the checkpoint is non-repairing and simply
410        // stops at the first torn tail (left for the next `open` to fix).
411        let start_offset = {
412            let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
413            state.seek_floor_offset(since_lsn)
414        };
415        let entries = read_entries_from(&self.path, start_offset)?;
416        Ok(entries
417            .into_iter()
418            .filter(|entry| entry.lsn > since_lsn)
419            .take(max_count)
420            .map(|entry| (entry.lsn, entry.data_with_framing_term()))
421            .collect())
422    }
423
424    /// Byte offset a resume at `since_lsn` would seek to before
425    /// forward-scanning. Exposed for tests asserting the resume is
426    /// sub-linear (starts past offset 0 for a mid-spool LSN).
427    #[cfg(test)]
428    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
429        self.state
430            .lock()
431            .unwrap_or_else(|e| e.into_inner())
432            .seek_floor_offset(since_lsn)
433    }
434
435    pub fn current_lsn(&self) -> u64 {
436        self.state
437            .lock()
438            .unwrap_or_else(|e| e.into_inner())
439            .current_lsn
440    }
441
442    pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
443        Ok(read_and_repair_entries(&self.path)?
444            .into_iter()
445            .next()
446            .map(|entry| entry.lsn))
447    }
448
449    pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
450        let previous_lsn = self.current_lsn();
451        let retained: Vec<_> = read_and_repair_entries(&self.path)?
452            .into_iter()
453            .filter(|entry| entry.lsn > upto_lsn)
454            .collect();
455        let temp_path = self.path.with_extension("logical.wal.tmp");
456        let mut temp = File::create(&temp_path)?;
457        let mut current_lsn = 0;
458        for entry in retained {
459            // Re-frame as v3 so the spool only ever contains current records
460            // after a prune. Legacy v1 records are upgraded by carrying
461            // their original LSN and default term forward; the framing timestamp is
462            // re-stamped to wall-clock-now because the original v1
463            // record didn't carry one — downstream consumers that need
464            // the operation's logical timestamp continue to use the
465            // payload's own ChangeRecord::timestamp field.
466            let timestamp_ms = if entry.timestamp_ms > 0 {
467                entry.timestamp_ms
468            } else {
469                SystemTime::now()
470                    .duration_since(UNIX_EPOCH)
471                    .map(|d| d.as_millis() as u64)
472                    .unwrap_or(0)
473            };
474            let crc = compute_logical_v3_crc(
475                LOGICAL_WAL_SPOOL_VERSION_CURRENT,
476                entry.term,
477                entry.lsn,
478                timestamp_ms,
479                &entry.data,
480            );
481            temp.write_all(LOGICAL_WAL_SPOOL_MAGIC)?;
482            temp.write_all(&[LOGICAL_WAL_SPOOL_VERSION_CURRENT])?;
483            temp.write_all(&entry.term.to_le_bytes())?;
484            temp.write_all(&entry.lsn.to_le_bytes())?;
485            temp.write_all(&timestamp_ms.to_le_bytes())?;
486            temp.write_all(&(entry.data.len() as u32).to_le_bytes())?;
487            temp.write_all(&entry.data)?;
488            temp.write_all(&crc.to_le_bytes())?;
489            current_lsn = current_lsn.max(entry.lsn);
490        }
491        temp.sync_all()?;
492        fs::rename(&temp_path, &self.path)?;
493
494        // The rewrite shifted every record's byte offset, so the old
495        // seek index is stale — rebuild it from the compacted file.
496        let (seek_index, write_offset, record_count) = build_seek_index(&self.path)?;
497        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
498        state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
499        state.seek_index = seek_index;
500        state.write_offset = write_offset;
501        state.record_count = record_count;
502        Ok(())
503    }
504}
505
506/// Reads every logical-WAL record from `path`, accepting the longest
507/// valid prefix and *truncating* the file at the first torn or
508/// corrupt record. Designed for crash recovery: a process killed
509/// mid-write leaves a partial frame that this function silently drops
510/// so the spool can resume appending without ambiguity.
511///
512/// Detection of "stop here" cases:
513///   1. `UnexpectedEof` while reading any header field, payload, or
514///      crc → torn write at end of file.
515///   2. Magic mismatch (any 4 bytes that aren't `RDLW`) → corrupt or
516///      foreign data; treated as if the file ended at the start of
517///      this record.
518///   3. v2 record with unsupported version byte → same.
519///   4. v2 CRC mismatch → record corrupt; truncated.
520///
521/// The truncation only fires when at least one valid record precedes
522/// the corrupt region (or when the corrupt region is the very first
523/// record — in which case the spool becomes empty). Either way the
524/// invariant that callers see only fully-checksummed payloads is
525/// preserved.
526///
527/// v1 records (legacy, no checksum) are accepted for read-only
528/// compatibility. They never receive a checksum; a v1 read that hits
529/// `UnexpectedEof` mid-payload also triggers truncation.
530fn read_and_repair_entries(path: &Path) -> io::Result<Vec<LogicalWalEntry>> {
531    if !path.exists() {
532        return Ok(Vec::new());
533    }
534
535    let mut file = OpenOptions::new().read(true).write(true).open(path)?;
536    let mut entries = Vec::new();
537    let mut last_good_offset: u64 = 0;
538    let mut corrupt_reason: Option<String> = None;
539
540    loop {
541        let record_start = file.stream_position()?;
542
543        let mut magic = [0u8; 4];
544        match file.read_exact(&mut magic) {
545            Ok(()) => {}
546            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
547            Err(err) => return Err(err),
548        }
549        if &magic != LOGICAL_WAL_SPOOL_MAGIC {
550            corrupt_reason = Some(format!(
551                "bad magic at offset {record_start}: got {magic:02x?}"
552            ));
553            break;
554        }
555
556        let mut version = [0u8; 1];
557        if let Err(err) = file.read_exact(&mut version) {
558            if err.kind() == io::ErrorKind::UnexpectedEof {
559                corrupt_reason = Some(format!("torn header at offset {record_start}"));
560                break;
561            }
562            return Err(err);
563        }
564
565        let entry_result = match version[0] {
566            LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(&mut file, record_start),
567            LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(&mut file, record_start),
568            LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(&mut file, record_start),
569            other => {
570                corrupt_reason = Some(format!(
571                    "unsupported version {other} at offset {record_start}"
572                ));
573                break;
574            }
575        };
576
577        match entry_result {
578            Ok(entry) => {
579                entries.push(entry);
580                last_good_offset = file.stream_position()?;
581            }
582            Err(reason) => {
583                corrupt_reason = Some(reason);
584                break;
585            }
586        }
587    }
588
589    if let Some(reason) = corrupt_reason {
590        let total_len = file.metadata()?.len();
591        if last_good_offset < total_len {
592            warn!(
593                target: "reddb::replication::logical_wal",
594                path = %path.display(),
595                reason = %reason,
596                truncating_from = last_good_offset,
597                truncating_to = total_len,
598                kept_records = entries.len(),
599                "truncating logical-WAL spool to last valid record"
600            );
601            file.set_len(last_good_offset)?;
602            file.sync_all()?;
603        }
604    }
605
606    Ok(entries)
607}
608
609/// Read the single record whose magic byte begins at the current file
610/// cursor. Returns `Ok(Some(entry))` on a valid record, `Ok(None)`
611/// when the cursor is at a clean EOF or a torn/corrupt frame (the
612/// caller stops scanning), and `Err` only on an unexpected I/O fault.
613///
614/// Unlike [`read_and_repair_entries`] this never truncates: it is the
615/// read-time forward-scan primitive used after a seek, where the file
616/// was already repaired at `open`.
617fn read_frame(file: &mut File, record_start: u64) -> io::Result<Option<LogicalWalEntry>> {
618    let mut magic = [0u8; 4];
619    match file.read_exact(&mut magic) {
620        Ok(()) => {}
621        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
622        Err(err) => return Err(err),
623    }
624    if &magic != LOGICAL_WAL_SPOOL_MAGIC {
625        return Ok(None);
626    }
627    let mut version = [0u8; 1];
628    match file.read_exact(&mut version) {
629        Ok(()) => {}
630        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
631        Err(err) => return Err(err),
632    }
633    let entry = match version[0] {
634        LOGICAL_WAL_SPOOL_VERSION_V3 => read_one_v3(file, record_start),
635        LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(file, record_start),
636        LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(file, record_start),
637        _ => return Ok(None),
638    };
639    Ok(entry.ok())
640}
641
642/// Forward-scan valid records starting at `start_offset`, stopping at
643/// the first clean EOF or torn/corrupt frame. Non-repairing — used by
644/// the seek-based [`LogicalWalSpool::read_since`] resume path.
645fn read_entries_from(path: &Path, start_offset: u64) -> io::Result<Vec<LogicalWalEntry>> {
646    if !path.exists() {
647        return Ok(Vec::new());
648    }
649    let mut file = OpenOptions::new().read(true).open(path)?;
650    file.seek(SeekFrom::Start(start_offset))?;
651    let mut entries = Vec::new();
652    loop {
653        let record_start = file.stream_position()?;
654        match read_frame(&mut file, record_start)? {
655            Some(entry) => entries.push(entry),
656            None => break,
657        }
658    }
659    Ok(entries)
660}
661
662/// Build the sparse seek index by walking the (repaired) spool once,
663/// checkpointing every `SEEK_INDEX_INTERVAL`-th record. Returns the
664/// index, the byte offset just past the last valid record, and the
665/// total record count.
666fn build_seek_index(path: &Path) -> io::Result<(Vec<(u64, u64)>, u64, u64)> {
667    if !path.exists() {
668        return Ok((Vec::new(), 0, 0));
669    }
670    let mut file = OpenOptions::new().read(true).open(path)?;
671    let mut index = Vec::new();
672    let mut ordinal: u64 = 0;
673    let mut write_offset: u64 = 0;
674    loop {
675        let record_start = file.stream_position()?;
676        match read_frame(&mut file, record_start)? {
677            Some(entry) => {
678                if ordinal.is_multiple_of(SEEK_INDEX_INTERVAL)
679                    && index.last().map(|(l, _)| *l) != Some(entry.lsn)
680                {
681                    index.push((entry.lsn, record_start));
682                }
683                ordinal += 1;
684                write_offset = file.stream_position()?;
685            }
686            None => break,
687        }
688    }
689    Ok((index, write_offset, ordinal))
690}
691
692/// Read a v3 record assuming the magic + version byte have already
693/// been consumed and the file cursor sits at the term field.
694fn read_one_v3(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
695    let mut term = [0u8; 8];
696    if let Err(err) = file.read_exact(&mut term) {
697        return Err(format!("torn term at offset {record_start}: {err}"));
698    }
699    let mut lsn = [0u8; 8];
700    if let Err(err) = file.read_exact(&mut lsn) {
701        return Err(format!("torn lsn at offset {record_start}: {err}"));
702    }
703    let mut timestamp = [0u8; 8];
704    if let Err(err) = file.read_exact(&mut timestamp) {
705        return Err(format!("torn timestamp at offset {record_start}: {err}"));
706    }
707    let mut len_bytes = [0u8; 4];
708    if let Err(err) = file.read_exact(&mut len_bytes) {
709        return Err(format!(
710            "torn payload length at offset {record_start}: {err}"
711        ));
712    }
713    let payload_len = u32::from_le_bytes(len_bytes) as usize;
714    const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
715    if payload_len > MAX_PLAUSIBLE_PAYLOAD {
716        return Err(format!(
717            "implausible payload_len {payload_len} at offset {record_start}"
718        ));
719    }
720    let mut payload = vec![0u8; payload_len];
721    if let Err(err) = file.read_exact(&mut payload) {
722        return Err(format!(
723            "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
724        ));
725    }
726    let mut crc_bytes = [0u8; 4];
727    if let Err(err) = file.read_exact(&mut crc_bytes) {
728        return Err(format!("torn crc at offset {record_start}: {err}"));
729    }
730    let stored_crc = u32::from_le_bytes(crc_bytes);
731    let term = u64::from_le_bytes(term);
732    let lsn = u64::from_le_bytes(lsn);
733    let timestamp = u64::from_le_bytes(timestamp);
734    let expected_crc =
735        compute_logical_v3_crc(LOGICAL_WAL_SPOOL_VERSION_V3, term, lsn, timestamp, &payload);
736    if stored_crc != expected_crc {
737        return Err(format!(
738            "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
739        ));
740    }
741    Ok(LogicalWalEntry {
742        term,
743        lsn,
744        timestamp_ms: timestamp,
745        data: payload,
746    })
747}
748
749/// Read a v2 record assuming the magic + version byte have already
750/// been consumed and the file cursor sits at the LSN field. Returns
751/// `Err(reason)` for any condition that should trigger truncation.
752fn read_one_v2(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
753    let mut lsn = [0u8; 8];
754    if let Err(err) = file.read_exact(&mut lsn) {
755        return Err(format!("torn lsn at offset {record_start}: {err}"));
756    }
757    let mut timestamp = [0u8; 8];
758    if let Err(err) = file.read_exact(&mut timestamp) {
759        return Err(format!("torn timestamp at offset {record_start}: {err}"));
760    }
761    let mut len_bytes = [0u8; 4];
762    if let Err(err) = file.read_exact(&mut len_bytes) {
763        return Err(format!(
764            "torn payload length at offset {record_start}: {err}"
765        ));
766    }
767    let payload_len = u32::from_le_bytes(len_bytes) as usize;
768    // Sanity guard against a runaway length encoded by a partially-
769    // corrupted header. 256 MiB is well above any plausible single
770    // ChangeRecord and well below memory we'd allocate from a torn
771    // header that happens to look like a real frame.
772    const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
773    if payload_len > MAX_PLAUSIBLE_PAYLOAD {
774        return Err(format!(
775            "implausible payload_len {payload_len} at offset {record_start}"
776        ));
777    }
778    let mut payload = vec![0u8; payload_len];
779    if let Err(err) = file.read_exact(&mut payload) {
780        return Err(format!(
781            "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
782        ));
783    }
784    let mut crc_bytes = [0u8; 4];
785    if let Err(err) = file.read_exact(&mut crc_bytes) {
786        return Err(format!("torn crc at offset {record_start}: {err}"));
787    }
788    let stored_crc = u32::from_le_bytes(crc_bytes);
789    let expected_crc = compute_logical_v2_crc(
790        LOGICAL_WAL_SPOOL_VERSION_V2,
791        u64::from_le_bytes(lsn),
792        u64::from_le_bytes(timestamp),
793        &payload,
794    );
795    if stored_crc != expected_crc {
796        return Err(format!(
797            "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
798        ));
799    }
800    let term = term_from_payload(&payload);
801    Ok(LogicalWalEntry {
802        term,
803        lsn: u64::from_le_bytes(lsn),
804        timestamp_ms: u64::from_le_bytes(timestamp),
805        data: payload,
806    })
807}
808
809/// Read a v1 record (legacy, no checksum). Layout after magic+version:
810/// [lsn 8][payload_len 8][payload]. v1 spools were written before
811/// PLAN.md Phase 2 hardened the format; we read them so existing dev
812/// installs don't drop history on upgrade.
813fn read_one_v1(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
814    let mut lsn = [0u8; 8];
815    if let Err(err) = file.read_exact(&mut lsn) {
816        return Err(format!("v1 torn lsn at offset {record_start}: {err}"));
817    }
818    let mut len_bytes = [0u8; 8];
819    if let Err(err) = file.read_exact(&mut len_bytes) {
820        return Err(format!(
821            "v1 torn payload length at offset {record_start}: {err}"
822        ));
823    }
824    let payload_len = u64::from_le_bytes(len_bytes) as usize;
825    if payload_len > 256 * 1024 * 1024 {
826        return Err(format!(
827            "v1 implausible payload_len {payload_len} at offset {record_start}"
828        ));
829    }
830    let mut payload = vec![0u8; payload_len];
831    if let Err(err) = file.read_exact(&mut payload) {
832        return Err(format!("v1 torn payload at offset {record_start}: {err}"));
833    }
834    let term = term_from_payload(&payload);
835    Ok(LogicalWalEntry {
836        term,
837        lsn: u64::from_le_bytes(lsn),
838        timestamp_ms: 0,
839        data: payload,
840    })
841}
842
/// Why a replication slot was invalidated.
///
/// Each variant has a stable kebab-case string form (see [`Self::as_str`]),
/// which is what the slot store writes and reads back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotInvalidationCause {
    /// String form `"wal-removed"`. NOTE(review): presumably means the WAL
    /// the slot needed is gone; it is assigned outside this section.
    WalRemoved,
    /// String form `"horizon"`. Assigned when the slot's `restart_lsn`
    /// trails the current LSN by more than the configured maximum lag.
    Horizon,
    /// String form `"idle-timeout"`. Assigned when the slot has not been
    /// seen for longer than the configured idle timeout.
    IdleTimeout,
}

impl SlotInvalidationCause {
    /// Stable textual name of this cause.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::IdleTimeout => "idle-timeout",
            Self::Horizon => "horizon",
            Self::WalRemoved => "wal-removed",
        }
    }

    /// Inverse of [`Self::as_str`]; returns `None` for any other string.
    fn from_str(value: &str) -> Option<Self> {
        // Derive the parser from `as_str` so the two mappings stay in sync.
        [Self::WalRemoved, Self::Horizon, Self::IdleTimeout]
            .iter()
            .copied()
            .find(|cause| cause.as_str() == value)
    }
}
868
/// Per-replica replication slot, stored in the primary's slot map under
/// its `id` and round-tripped through the JSON slot store by
/// `load_replication_slots` / `persist_replication_slots`.
#[derive(Debug, Clone)]
pub struct ReplicationSlot {
    /// Slot identifier; `ensure_slot` creates slots under the replica id.
    pub id: String,
    /// LSN handed back to a (re)registering replica as its resume point.
    /// The minimum over non-invalidated slots is the retention floor.
    pub restart_lsn: u64,
    /// Highest LSN confirmed through `advance_slot`; only ever moves
    /// forward and is kept at or above the restart LSN reported there.
    pub confirmed_lsn: u64,
    /// Wall-clock millis of the last register, pull, or ack that touched
    /// the slot; compared against the idle timeout.
    pub last_seen_at_unix_ms: u128,
    /// `Some(cause)` once the slot has been invalidated. Invalidated slots
    /// are excluded from the retention floor and are no longer advanced
    /// or touched.
    pub invalidation_reason: Option<SlotInvalidationCause>,
    /// Wall-clock millis at which the slot was invalidated, if it was.
    pub invalidated_at_unix_ms: Option<u128>,
}
878
879fn load_replication_slots(path: Option<&Path>, now_ms: u128) -> BTreeMap<String, ReplicationSlot> {
880    let Some(path) = path else {
881        return BTreeMap::new();
882    };
883    let bytes = match fs::read(path) {
884        Ok(bytes) => bytes,
885        Err(err) if err.kind() == io::ErrorKind::NotFound => return BTreeMap::new(),
886        Err(err) => {
887            warn!(
888                target: "reddb::replication::slots",
889                path = %path.display(),
890                error = %err,
891                "failed to read replication slot store"
892            );
893            return BTreeMap::new();
894        }
895    };
896    match crate::serde_json::from_slice::<crate::serde_json::Value>(&bytes) {
897        Ok(value) => value
898            .get("slots")
899            .and_then(crate::serde_json::Value::as_array)
900            .unwrap_or(&[])
901            .iter()
902            .filter_map(|value| {
903                let object = value.as_object()?;
904                let id = object.get("id")?.as_str()?.to_string();
905                let restart_lsn = object.get("restart_lsn")?.as_u64()?;
906                let confirmed_lsn = object.get("confirmed_lsn")?.as_u64()?;
907                let last_seen_at_unix_ms = object
908                    .get("last_seen_at_unix_ms")
909                    .and_then(crate::serde_json::Value::as_u64)
910                    .map(u128::from)
911                    .unwrap_or(now_ms);
912                let invalidation_reason = object
913                    .get("invalidation_reason")
914                    .and_then(crate::serde_json::Value::as_str)
915                    .and_then(SlotInvalidationCause::from_str);
916                let invalidated_at_unix_ms = object
917                    .get("invalidated_at_unix_ms")
918                    .and_then(crate::serde_json::Value::as_u64)
919                    .map(u128::from);
920                Some((
921                    id.clone(),
922                    ReplicationSlot {
923                        id,
924                        restart_lsn,
925                        confirmed_lsn,
926                        last_seen_at_unix_ms,
927                        invalidation_reason,
928                        invalidated_at_unix_ms,
929                    },
930                ))
931            })
932            .collect(),
933        Err(err) => {
934            warn!(
935                target: "reddb::replication::slots",
936                path = %path.display(),
937                error = %err,
938                "failed to decode replication slot store"
939            );
940            BTreeMap::new()
941        }
942    }
943}
944
945fn persist_replication_slots(
946    path: Option<&Path>,
947    slots: &BTreeMap<String, ReplicationSlot>,
948) -> io::Result<()> {
949    let Some(path) = path else {
950        return Ok(());
951    };
952    if let Some(parent) = path.parent() {
953        fs::create_dir_all(parent)?;
954    }
955    let temp_path = path.with_extension("logical.slots.tmp");
956    let slots_json = slots
957        .values()
958        .map(|slot| {
959            let mut object = crate::serde_json::Map::new();
960            object.insert(
961                "id".to_string(),
962                crate::serde_json::Value::String(slot.id.clone()),
963            );
964            object.insert(
965                "restart_lsn".to_string(),
966                crate::serde_json::Value::Number(slot.restart_lsn as f64),
967            );
968            object.insert(
969                "confirmed_lsn".to_string(),
970                crate::serde_json::Value::Number(slot.confirmed_lsn as f64),
971            );
972            object.insert(
973                "last_seen_at_unix_ms".to_string(),
974                crate::serde_json::Value::Number(slot.last_seen_at_unix_ms as f64),
975            );
976            if let Some(reason) = slot.invalidation_reason {
977                object.insert(
978                    "invalidation_reason".to_string(),
979                    crate::serde_json::Value::String(reason.as_str().to_string()),
980                );
981            }
982            if let Some(invalidated_at) = slot.invalidated_at_unix_ms {
983                object.insert(
984                    "invalidated_at_unix_ms".to_string(),
985                    crate::serde_json::Value::Number(invalidated_at as f64),
986                );
987            }
988            crate::serde_json::Value::Object(object)
989        })
990        .collect();
991    let mut root = crate::serde_json::Map::new();
992    root.insert(
993        "slots".to_string(),
994        crate::serde_json::Value::Array(slots_json),
995    );
996    let value = crate::serde_json::Value::Object(root);
997    let bytes = crate::serde_json::to_string_pretty(&value)
998        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
999    let mut temp = File::create(&temp_path)?;
1000    temp.write_all(bytes.as_bytes())?;
1001    temp.sync_all()?;
1002    fs::rename(&temp_path, path)?;
1003    Ok(())
1004}
1005
/// State of a connected replica. PLAN.md Phase 11.4 fields:
/// `last_seen_at_unix_ms` updates on every interaction (pull or ack);
/// `last_sent_lsn` updates when the primary serves a `pull_wal_records`
/// batch; `last_durable_lsn` updates when the replica reports its WAL
/// is durably written via `ack_replica_lsn`.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    /// Replica identifier; registry lookups match on this value.
    pub id: String,
    /// Highest applied LSN the replica has acknowledged. Only ever moves
    /// forward; seeded with the slot resume LSN at first registration.
    pub last_acked_lsn: u64,
    /// Highest LSN recorded by `note_replica_pull`. Only ever moves forward.
    pub last_sent_lsn: u64,
    /// Highest LSN the replica has reported as durable. Only ever moves
    /// forward.
    pub last_durable_lsn: u64,
    /// Largest apply-error count the replica has reported so far.
    pub apply_error_count: u64,
    /// Largest divergence count the replica has reported so far.
    pub divergence_count: u64,
    /// Wall-clock millis at first registration; not refreshed when the
    /// same id registers again.
    pub connected_at_unix_ms: u128,
    /// Wall-clock millis of the most recent register, pull, or ack.
    pub last_seen_at_unix_ms: u128,
    /// Region identifier declared by the replica at handshake time
    /// (Phase 2.6 multi-region PG parity). `None` until the replica
    /// handshake extension lands in 2.6.2; the quorum coordinator's
    /// region-binding map covers the in-process case meanwhile.
    pub region: Option<String>,
    /// `true` while this replica is re-bootstrapping — loading a fresh
    /// snapshot to replace its current dataset (issue #837). It keeps
    /// serving non-causal reads from the old data, but the advertiser
    /// surfaces this flag so a causal reader routes bookmark reads
    /// elsewhere: the replica's `last_acked_lsn` describes data it is
    /// about to discard. Cleared atomically when the swap completes.
    pub rebootstrapping: bool,
}
1034
1035/// Primary-side replication progress derived from the replica registry.
1036#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1037pub struct ReplicationProgress {
1038    pub lag_lsn: u64,
1039    pub safe_replay_lsn: u64,
1040}
1041
1042impl ReplicationProgress {
1043    pub fn from_replicas(replicas: &[ReplicaState]) -> Option<Self> {
1044        let max_sent_lsn = replicas.iter().map(|replica| replica.last_sent_lsn).max()?;
1045        let min_acked_lsn = replicas
1046            .iter()
1047            .map(|replica| replica.last_acked_lsn)
1048            .min()?;
1049        let safe_replay_lsn = replicas
1050            .iter()
1051            .map(|replica| replica.last_durable_lsn)
1052            .min()?;
1053
1054        Some(Self {
1055            lag_lsn: max_sent_lsn.saturating_sub(min_acked_lsn),
1056            safe_replay_lsn,
1057        })
1058    }
1059}
1060
/// Primary replication manager.
///
/// Owns the in-memory WAL buffer, the optional on-disk logical WAL spool,
/// the replica registry, and the replication slots.
pub struct PrimaryReplication {
    /// In-memory WAL buffer; every `append_logical_record` call appends
    /// here.
    pub wal_buffer: Arc<WalBuffer>,
    /// On-disk logical WAL spool. `None` when no data path was supplied
    /// or the spool could not be opened.
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    /// Registry of known replicas, one entry per replica id.
    pub replicas: RwLock<Vec<ReplicaState>>,
    /// Highest LSN seen by `append_logical_record` (seeded from the
    /// spool's current LSN), paired with the condvar that
    /// `wait_for_logical_lsn_after` blocks on.
    wal_appended: (Mutex<u64>, Condvar),
    /// Location of the JSON slot store; `None` without a data path.
    slot_path: Option<PathBuf>,
    /// Replication slots keyed by slot id.
    slots: RwLock<BTreeMap<String, ReplicationSlot>>,
    /// Maximum LSN distance a slot's `restart_lsn` may trail the current
    /// LSN before the slot is invalidated; `0` disables the check.
    slot_retention_max_lag_lsn: u64,
    /// Maximum time in milliseconds a slot may go unseen before it is
    /// invalidated; `0` disables the check.
    slot_idle_timeout_ms: u64,
    /// PLAN.md Phase 11.4 — ack-driven commit synchronization. Always
    /// allocated so the policy enum can flip from `Local` to
    /// `AckN`/`Quorum` without touching this struct's shape.
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    /// Monotonic registry-change counter consumed by the
    /// `TopologyAdvertiser` (issue #167). Bumps on register,
    /// unregister, and the periodic health sweep when a replica
    /// flips between healthy/unhealthy. Clients use the epoch to
    /// detect stale advertisements without comparing the full
    /// replica list element-wise.
    topology_epoch: std::sync::atomic::AtomicU64,
    /// Count of pulls served as a partial resync — a replica resuming
    /// incrementally from its retained slot position rather than
    /// triggering a full re-bootstrap (issue #832). Surfaced as a
    /// replication metric so a brief disconnect that recovers via
    /// partial resync is observable.
    partial_resync_count: std::sync::atomic::AtomicU64,
}
1089
/// How a replica's pull should be served, decided from its slot state.
///
/// Exactly one of two outcomes: continue from a retained position, or
/// start over from a snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResumeMode {
    /// Resume incrementally from `resume_lsn` (the replica's slot
    /// position, never behind it). The retained WAL still covers the
    /// gap, so a brief disconnect costs only a partial resync.
    PartialResync { resume_lsn: u64 },
    /// The slot is past the retention cap (or otherwise invalidated);
    /// the replica must discard and re-bootstrap from a fresh snapshot.
    /// `cause` records which `SlotInvalidationCause` applied.
    FullRebootstrap { cause: SlotInvalidationCause },
}
1101
1102impl PrimaryReplication {
1103    pub fn slot_path_for(data_path: &Path) -> PathBuf {
1104        let file_name = data_path
1105            .file_name()
1106            .and_then(|name| name.to_str())
1107            .unwrap_or("reddb.rdb");
1108        let slot_name = format!("{file_name}.logical.slots.json");
1109        match data_path.parent() {
1110            Some(parent) => parent.join(slot_name),
1111            None => PathBuf::from(slot_name),
1112        }
1113    }
1114
1115    pub fn new(data_path: Option<&Path>) -> Self {
1116        Self::new_with_config(data_path, &crate::replication::ReplicationConfig::primary())
1117    }
1118
1119    pub fn new_with_config(
1120        data_path: Option<&Path>,
1121        config: &crate::replication::ReplicationConfig,
1122    ) -> Self {
1123        let now_ms = crate::utils::now_unix_millis() as u128;
1124        let slot_path = data_path.map(Self::slot_path_for);
1125        let slots = load_replication_slots(slot_path.as_deref(), now_ms);
1126        let logical_wal_spool = data_path
1127            .and_then(|path| LogicalWalSpool::open(path).ok())
1128            .map(Arc::new);
1129        let current_lsn = logical_wal_spool
1130            .as_ref()
1131            .map(|spool| spool.current_lsn())
1132            .unwrap_or(0);
1133        Self {
1134            wal_buffer: Arc::new(WalBuffer::new(100_000)),
1135            logical_wal_spool,
1136            replicas: RwLock::new(Vec::new()),
1137            wal_appended: (Mutex::new(current_lsn), Condvar::new()),
1138            slot_path,
1139            slots: RwLock::new(slots),
1140            slot_retention_max_lag_lsn: config.slot_retention_max_lag_lsn,
1141            slot_idle_timeout_ms: config.slot_idle_timeout_ms,
1142            commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
1143            topology_epoch: std::sync::atomic::AtomicU64::new(0),
1144            partial_resync_count: std::sync::atomic::AtomicU64::new(0),
1145        }
1146    }
1147
1148    pub fn append_logical_record(&self, lsn: u64, encoded: Vec<u8>) {
1149        self.wal_buffer.append(lsn, encoded.clone());
1150        if let Some(spool) = &self.logical_wal_spool {
1151            let _ = spool.append(lsn, &encoded);
1152        }
1153        let (lock, cvar) = &self.wal_appended;
1154        let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1155        *latest = (*latest).max(lsn);
1156        cvar.notify_all();
1157    }
1158
1159    pub fn wait_for_logical_lsn_after(&self, since_lsn: u64, timeout: Duration) -> bool {
1160        if self.current_logical_lsn() > since_lsn {
1161            return true;
1162        }
1163        let deadline = Instant::now() + timeout;
1164        let (lock, cvar) = &self.wal_appended;
1165        let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
1166        while *latest <= since_lsn {
1167            let now = Instant::now();
1168            if now >= deadline {
1169                return false;
1170            }
1171            let remaining = deadline.saturating_duration_since(now);
1172            let (guard, result) = cvar
1173                .wait_timeout(latest, remaining)
1174                .unwrap_or_else(|e| e.into_inner());
1175            latest = guard;
1176            if result.timed_out() && *latest <= since_lsn {
1177                return false;
1178            }
1179        }
1180        true
1181    }
1182
1183    pub fn register_replica(&self, id: String) -> u64 {
1184        self.register_replica_with_region(id, None)
1185    }
1186
1187    /// Register a replica with an explicit region tag (Phase 2.6 multi-region).
1188    ///
1189    /// Preferred when the replica handshake declares a region — the quorum
1190    /// coordinator uses this field to decide whether the replica counts
1191    /// toward a `QuorumMode::Regions` commit.
1192    ///
1193    /// Idempotent on reconnect (issue #812): if a replica with `id` is
1194    /// already registered, the existing entry is *updated in place* rather
1195    /// than duplicated — progress LSNs (`last_acked_lsn`, `last_sent_lsn`,
1196    /// `last_durable_lsn`) are preserved so a reconnecting replica is not
1197    /// rewound, only `last_seen_at_unix_ms` is refreshed (and `region` when
1198    /// a non-`None` value is supplied). A re-registration is not a
1199    /// registry-shape change, so it does **not** bump the topology epoch.
1200    /// Returns the slot `restart_lsn` the replica should resume streaming from:
1201    /// the current WAL LSN for a fresh registration, or the durable slot
1202    /// restart point for a reconnect.
1203    pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
1204        let now_ms = crate::utils::now_unix_millis() as u128;
1205        let resume_lsn = self.ensure_slot(&id, self.current_logical_lsn());
1206        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1207        if let Some(existing) = replicas.iter_mut().find(|r| r.id == id) {
1208            existing.last_seen_at_unix_ms = now_ms;
1209            if region.is_some() {
1210                existing.region = region;
1211            }
1212            return resume_lsn;
1213        }
1214        replicas.push(ReplicaState {
1215            id,
1216            last_acked_lsn: resume_lsn,
1217            last_sent_lsn: resume_lsn,
1218            last_durable_lsn: resume_lsn,
1219            apply_error_count: 0,
1220            divergence_count: 0,
1221            connected_at_unix_ms: now_ms,
1222            last_seen_at_unix_ms: now_ms,
1223            region,
1224            rebootstrapping: false,
1225        });
1226        drop(replicas);
1227        self.bump_topology_epoch();
1228        resume_lsn
1229    }
1230
1231    /// Mark (or clear) a replica's re-bootstrap state (issue #837).
1232    ///
1233    /// While `rebootstrapping` is `true` the replica keeps serving
1234    /// non-causal reads from its existing data, but the advertiser
1235    /// surfaces the flag so causal (bookmark) reads route to a
1236    /// caught-up peer instead — the rebuilding replica's applied
1237    /// frontier describes data it is about to discard. The primary
1238    /// flips this back to `false` when the replica reports its atomic
1239    /// snapshot swap complete.
1240    ///
1241    /// A change to the flag is a registry-shape change for routing
1242    /// purposes, so it bumps the topology epoch to force consumers to
1243    /// re-read the advertisement. Returns `true` when a replica with
1244    /// `id` was present and updated.
1245    pub fn set_replica_rebootstrapping(&self, id: &str, rebootstrapping: bool) -> bool {
1246        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1247        let Some(state) = replicas.iter_mut().find(|r| r.id == id) else {
1248            return false;
1249        };
1250        if state.rebootstrapping == rebootstrapping {
1251            return true;
1252        }
1253        state.rebootstrapping = rebootstrapping;
1254        drop(replicas);
1255        self.bump_topology_epoch();
1256        true
1257    }
1258
1259    /// Ensure a replica identifying itself with `id` is present in the
1260    /// registry (issue #812). This is the production self-registration hook
1261    /// used by the `pull_wal_records` path: the first time a replica sends
1262    /// its `replica_id` on a pull, the primary registers it so it is no
1263    /// longer blind to that replica's existence; subsequent pulls are
1264    /// idempotent no-ops. Returns `true` when a new registration was
1265    /// created. Delegates to `register_replica_with_region`, so reconnects
1266    /// preserve progress and do not bump the topology epoch.
1267    pub fn ensure_replica_registered(&self, id: &str) -> bool {
1268        let already = self
1269            .replicas
1270            .read()
1271            .unwrap_or_else(|e| e.into_inner())
1272            .iter()
1273            .any(|r| r.id == id);
1274        if already {
1275            return false;
1276        }
1277        self.register_replica(id.to_string());
1278        true
1279    }
1280
1281    /// Unregister a replica by id. Returns `true` when the replica
1282    /// was present (and removed). Bumps the topology epoch so a
1283    /// pending advertisement reflects the new fleet size.
1284    pub fn unregister_replica(&self, id: &str) -> bool {
1285        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1286        let before = replicas.len();
1287        replicas.retain(|r| r.id != id);
1288        let removed = replicas.len() != before;
1289        drop(replicas);
1290        if removed {
1291            self.commit_waiter.drop_replica(id);
1292            self.bump_topology_epoch();
1293        }
1294        removed
1295    }
1296
1297    /// Current topology epoch. Strictly monotonic, bumps on every
1298    /// registry-shape change consumed by `TopologyAdvertiser`.
1299    pub fn topology_epoch(&self) -> u64 {
1300        self.topology_epoch
1301            .load(std::sync::atomic::Ordering::Relaxed)
1302    }
1303
1304    /// Advance the topology epoch. Call sites: register, unregister,
1305    /// and the health-sweep tick that flips a replica between
1306    /// healthy/unhealthy. Wrapping is not a concern in practice
1307    /// (`u64::MAX` events would take centuries at any realistic ack
1308    /// rate) but `fetch_add` saturates implicitly via wrap-around;
1309    /// the consumer treats epoch as opaque so a wrap is still
1310    /// strictly "different" from the previous value.
1311    pub fn bump_topology_epoch(&self) {
1312        self.topology_epoch
1313            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1314    }
1315
1316    pub fn ack_replica(&self, id: &str, lsn: u64) {
1317        let now_ms = crate::utils::now_unix_millis() as u128;
1318        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1319        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1320            r.last_acked_lsn = r.last_acked_lsn.max(lsn);
1321            r.last_durable_lsn = r.last_durable_lsn.max(lsn);
1322            r.last_seen_at_unix_ms = now_ms;
1323        }
1324        drop(replicas);
1325        self.commit_waiter.record_replica_ack(id, lsn);
1326    }
1327
1328    /// PLAN.md Phase 11.4 — replica reports applied + durable LSN
1329    /// after persisting a batch. Idempotent: only advances LSNs
1330    /// monotonically. `last_seen_at_unix_ms` always refreshes.
1331    /// Also signals `commit_waiter` so any thread blocked on
1332    /// `ack_n` / `quorum` can wake and re-check its threshold.
1333    pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
1334        self.ack_replica_lsn_with_observability(id, applied_lsn, durable_lsn, 0, 0);
1335    }
1336
1337    pub fn ack_replica_lsn_with_observability(
1338        &self,
1339        id: &str,
1340        applied_lsn: u64,
1341        durable_lsn: u64,
1342        apply_error_count: u64,
1343        divergence_count: u64,
1344    ) {
1345        let now_ms = crate::utils::now_unix_millis() as u128;
1346        self.advance_slot(id, applied_lsn, durable_lsn, now_ms);
1347        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1348        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1349            r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
1350            r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
1351            r.apply_error_count = r.apply_error_count.max(apply_error_count);
1352            r.divergence_count = r.divergence_count.max(divergence_count);
1353            r.last_seen_at_unix_ms = now_ms;
1354        }
1355        // Drop the write lock before signaling so a waiter that
1356        // wakes immediately can read replica state without
1357        // contending against us.
1358        drop(replicas);
1359        self.commit_waiter.record_replica_ack(id, durable_lsn);
1360    }
1361
1362    /// PLAN.md Phase 11.4 — primary records the LSN it last sent to a
1363    /// replica via pull_wal_records. Helpful for `lag_records =
1364    /// last_sent_lsn - last_acked_lsn` to distinguish pull-side delay
1365    /// from apply-side delay.
1366    pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
1367        let now_ms = crate::utils::now_unix_millis() as u128;
1368        self.touch_slot(id, now_ms);
1369        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
1370        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
1371            r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
1372            r.last_seen_at_unix_ms = now_ms;
1373        }
1374    }
1375
1376    /// Snapshot of all currently registered replicas, for /metrics +
1377    /// /admin/status. Returns owned clones so callers don't hold the
1378    /// lock during serialization.
1379    pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
1380        self.replicas
1381            .read()
1382            .unwrap_or_else(|e| e.into_inner())
1383            .clone()
1384    }
1385
1386    pub fn replication_progress(&self) -> Option<ReplicationProgress> {
1387        let replicas = self.replicas.read().unwrap_or_else(|e| e.into_inner());
1388        ReplicationProgress::from_replicas(&replicas)
1389    }
1390
1391    pub fn slot_snapshots(&self) -> Vec<ReplicationSlot> {
1392        self.slots
1393            .read()
1394            .unwrap_or_else(|e| e.into_inner())
1395            .values()
1396            .cloned()
1397            .collect()
1398    }
1399
1400    pub fn retention_floor_lsn(&self) -> Option<u64> {
1401        self.slots
1402            .read()
1403            .unwrap_or_else(|e| e.into_inner())
1404            .values()
1405            .filter(|slot| slot.invalidation_reason.is_none())
1406            .map(|slot| slot.restart_lsn)
1407            .min()
1408    }
1409
1410    pub fn prune_retained_wal_through(&self, archived_lsn: u64) -> io::Result<u64> {
1411        self.enforce_retention_limits(crate::utils::now_unix_millis() as u128);
1412        let prune_lsn = self
1413            .retention_floor_lsn()
1414            .map(|floor| floor.min(archived_lsn))
1415            .unwrap_or(archived_lsn);
1416        if prune_lsn > 0 {
1417            if let Some(spool) = &self.logical_wal_spool {
1418                spool.prune_through(prune_lsn)?;
1419            }
1420            self.wal_buffer.prune_through(prune_lsn);
1421        }
1422        Ok(prune_lsn)
1423    }
1424
1425    pub fn replica_count(&self) -> usize {
1426        self.replicas
1427            .read()
1428            .unwrap_or_else(|e| e.into_inner())
1429            .len()
1430    }
1431
1432    /// Current primary write position (logical WAL LSN, falling back to
1433    /// the in-memory WAL buffer). Used as the reference point for
1434    /// per-replica lag — including issue #826 flow control.
1435    pub fn current_logical_lsn(&self) -> u64 {
1436        self.logical_wal_spool
1437            .as_ref()
1438            .map(|spool| spool.current_lsn())
1439            .unwrap_or_else(|| self.wal_buffer.current_lsn())
1440    }
1441
1442    fn ensure_slot(&self, id: &str, initial_lsn: u64) -> u64 {
1443        let now_ms = crate::utils::now_unix_millis() as u128;
1444        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1445        if let Some(slot) = slots.get_mut(id) {
1446            slot.last_seen_at_unix_ms = now_ms;
1447            let restart_lsn = slot.restart_lsn;
1448            self.persist_slots_locked(&slots);
1449            return restart_lsn;
1450        }
1451        slots.insert(
1452            id.to_string(),
1453            ReplicationSlot {
1454                id: id.to_string(),
1455                restart_lsn: initial_lsn,
1456                confirmed_lsn: initial_lsn,
1457                last_seen_at_unix_ms: now_ms,
1458                invalidation_reason: None,
1459                invalidated_at_unix_ms: None,
1460            },
1461        );
1462        let restart_lsn = initial_lsn;
1463        self.persist_slots_locked(&slots);
1464        restart_lsn
1465    }
1466
1467    fn advance_slot(&self, id: &str, confirmed_lsn: u64, restart_lsn: u64, now_ms: u128) {
1468        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1469        let slot = slots
1470            .entry(id.to_string())
1471            .or_insert_with(|| ReplicationSlot {
1472                id: id.to_string(),
1473                restart_lsn: 0,
1474                confirmed_lsn: 0,
1475                last_seen_at_unix_ms: now_ms,
1476                invalidation_reason: None,
1477                invalidated_at_unix_ms: None,
1478            });
1479        if slot.invalidation_reason.is_some() {
1480            return;
1481        }
1482        slot.confirmed_lsn = slot.confirmed_lsn.max(confirmed_lsn).max(restart_lsn);
1483        slot.restart_lsn = slot.restart_lsn.max(restart_lsn);
1484        slot.last_seen_at_unix_ms = now_ms;
1485        self.persist_slots_locked(&slots);
1486    }
1487
1488    pub fn touch_slot(&self, id: &str, now_ms: u128) {
1489        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1490        let mut changed = false;
1491        if let Some(slot) = slots.get_mut(id) {
1492            if slot.invalidation_reason.is_none() {
1493                slot.last_seen_at_unix_ms = now_ms;
1494                changed = true;
1495            }
1496        }
1497        if changed {
1498            self.persist_slots_locked(&slots);
1499        }
1500    }
1501
1502    pub fn enforce_retention_limits(&self, now_ms: u128) -> Vec<(String, SlotInvalidationCause)> {
1503        let current_lsn = self.current_logical_lsn();
1504        let mut invalidated = Vec::new();
1505        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1506        for slot in slots.values_mut() {
1507            if slot.invalidation_reason.is_some() {
1508                continue;
1509            }
1510            let reason = if self.slot_retention_max_lag_lsn > 0
1511                && current_lsn.saturating_sub(slot.restart_lsn) > self.slot_retention_max_lag_lsn
1512            {
1513                Some(SlotInvalidationCause::Horizon)
1514            } else if self.slot_idle_timeout_ms > 0
1515                && now_ms.saturating_sub(slot.last_seen_at_unix_ms)
1516                    > u128::from(self.slot_idle_timeout_ms)
1517            {
1518                Some(SlotInvalidationCause::IdleTimeout)
1519            } else {
1520                None
1521            };
1522            if let Some(reason) = reason {
1523                slot.invalidation_reason = Some(reason);
1524                slot.invalidated_at_unix_ms = Some(now_ms);
1525                invalidated.push((slot.id.clone(), reason));
1526            }
1527        }
1528        if !invalidated.is_empty() {
1529            self.persist_slots_locked(&slots);
1530        }
1531        invalidated
1532    }
1533
1534    pub fn slot_rebootstrap_reason(
1535        &self,
1536        id: &str,
1537        requested_since_lsn: u64,
1538        oldest_available_lsn: Option<u64>,
1539    ) -> Option<SlotInvalidationCause> {
1540        let now_ms = crate::utils::now_unix_millis() as u128;
1541        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
1542        let slot = slots.get_mut(id)?;
1543        if let Some(reason) = slot.invalidation_reason {
1544            return Some(reason);
1545        }
1546        let slot_floor = slot.restart_lsn.max(requested_since_lsn);
1547        if oldest_available_lsn
1548            .map(|oldest| oldest > slot_floor.saturating_add(1))
1549            .unwrap_or(false)
1550        {
1551            slot.invalidation_reason = Some(SlotInvalidationCause::WalRemoved);
1552            slot.invalidated_at_unix_ms = Some(now_ms);
1553            self.persist_slots_locked(&slots);
1554            return Some(SlotInvalidationCause::WalRemoved);
1555        }
1556        None
1557    }
1558
1559    /// Decide how a reconnecting replica's pull should be served
1560    /// (issue #832). If the slot is invalidated or the requested
1561    /// position has fallen behind the retained WAL floor, the replica
1562    /// must re-bootstrap; otherwise it resumes via a partial resync
1563    /// from its slot position (never rewound behind it). Every
1564    /// partial-resync decision bumps the `partial_resync_count` metric
1565    /// so a brief disconnect that recovers without a full re-bootstrap
1566    /// is observable.
1567    pub fn plan_replica_resume(
1568        &self,
1569        id: &str,
1570        requested_since_lsn: u64,
1571        oldest_available_lsn: Option<u64>,
1572    ) -> ResumeMode {
1573        if let Some(cause) =
1574            self.slot_rebootstrap_reason(id, requested_since_lsn, oldest_available_lsn)
1575        {
1576            return ResumeMode::FullRebootstrap { cause };
1577        }
1578        let resume_lsn = self
1579            .slot_snapshots()
1580            .into_iter()
1581            .find(|slot| slot.id == id)
1582            .map(|slot| requested_since_lsn.max(slot.restart_lsn))
1583            .unwrap_or(requested_since_lsn);
1584        self.partial_resync_count
1585            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1586        ResumeMode::PartialResync { resume_lsn }
1587    }
1588
1589    /// Number of pulls served as a partial resync since process start.
1590    /// Surfaced in the replication metrics/status payload (issue #832).
1591    pub fn partial_resync_count(&self) -> u64 {
1592        self.partial_resync_count
1593            .load(std::sync::atomic::Ordering::Relaxed)
1594    }
1595
1596    fn persist_slots_locked(&self, slots: &BTreeMap<String, ReplicationSlot>) {
1597        if let Err(err) = persist_replication_slots(self.slot_path.as_deref(), slots) {
1598            warn!(
1599                target: "reddb::replication::slots",
1600                error = %err,
1601                "failed to persist replication slots"
1602            );
1603        }
1604    }
1605}
1606
1607#[cfg(test)]
1608mod tests {
1609    use super::*;
1610    use crate::replication::cdc::{ChangeOperation, ChangeRecord};
1611    use std::time::{SystemTime, UNIX_EPOCH};
1612
1613    fn temp_data_path(name: &str) -> PathBuf {
1614        let suffix = SystemTime::now()
1615            .duration_since(UNIX_EPOCH)
1616            .unwrap()
1617            .as_nanos();
1618        std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
1619    }
1620
1621    #[test]
1622    fn logical_wal_spool_roundtrip_and_prune() {
1623        let data_path = temp_data_path("logical_spool");
1624        let spool_path = LogicalWalSpool::path_for(&data_path);
1625        let spool = LogicalWalSpool::open(&data_path).expect("open spool");
1626
1627        let record1 = ChangeRecord {
1628            term: 2,
1629            lsn: 7,
1630            timestamp: 1,
1631            operation: ChangeOperation::Insert,
1632            collection: "users".to_string(),
1633            entity_id: 10,
1634            entity_kind: "row".to_string(),
1635            entity_bytes: Some(vec![1, 2, 3]),
1636            metadata: None,
1637            refresh_records: None,
1638        };
1639        let record2 = ChangeRecord {
1640            term: 2,
1641            lsn: 8,
1642            timestamp: 2,
1643            operation: ChangeOperation::Update,
1644            collection: "users".to_string(),
1645            entity_id: 10,
1646            entity_kind: "row".to_string(),
1647            entity_bytes: Some(vec![4, 5, 6]),
1648            metadata: None,
1649            refresh_records: None,
1650        };
1651
1652        spool
1653            .append_with_term_and_timestamp(record1.term, record1.lsn, 11, &record1.encode())
1654            .expect("append 1");
1655        spool
1656            .append_with_term_and_timestamp(record2.term, record2.lsn, 12, &record2.encode())
1657            .expect("append 2");
1658
1659        let entries = spool.read_since(0, usize::MAX).expect("read");
1660        assert_eq!(entries.len(), 2);
1661        assert_eq!(entries[0].0, 7);
1662        assert_eq!(entries[1].0, 8);
1663        assert_eq!(ChangeRecord::decode(&entries[0].1).unwrap().term, 2);
1664
1665        let framed = read_and_repair_entries(&spool_path).expect("read framed entries");
1666        assert_eq!(framed[0].term, 2);
1667        assert_eq!(framed[0].timestamp_ms, 11);
1668
1669        spool.prune_through(7).expect("prune");
1670        let retained = spool.read_since(0, usize::MAX).expect("read retained");
1671        assert_eq!(retained.len(), 1);
1672        assert_eq!(retained[0].0, 8);
1673        assert_eq!(ChangeRecord::decode(&retained[0].1).unwrap().term, 2);
1674
1675        let _ = fs::remove_file(spool_path);
1676    }
1677
1678    #[test]
1679    fn logical_wal_spool_reads_v2_without_term() {
1680        let data_path = temp_data_path("logical_spool_v2");
1681        let spool_path = LogicalWalSpool::path_for(&data_path);
1682        let payload = br#"{"lsn":3,"timestamp":44,"operation":"delete","collection":"users","rid":9,"kind":"row"}"#;
1683        let lsn = 3u64;
1684        let timestamp = 44u64;
1685        let crc = compute_logical_v2_crc(LOGICAL_WAL_SPOOL_VERSION_V2, lsn, timestamp, payload);
1686
1687        let mut file = File::create(&spool_path).expect("create v2 spool");
1688        file.write_all(LOGICAL_WAL_SPOOL_MAGIC).unwrap();
1689        file.write_all(&[LOGICAL_WAL_SPOOL_VERSION_V2]).unwrap();
1690        file.write_all(&lsn.to_le_bytes()).unwrap();
1691        file.write_all(&timestamp.to_le_bytes()).unwrap();
1692        file.write_all(&(payload.len() as u32).to_le_bytes())
1693            .unwrap();
1694        file.write_all(payload).unwrap();
1695        file.write_all(&crc.to_le_bytes()).unwrap();
1696        file.sync_all().unwrap();
1697
1698        let spool = LogicalWalSpool::open(&data_path).expect("open v2 spool");
1699        let records = spool.read_since(0, usize::MAX).expect("read v2 spool");
1700        assert_eq!(records.len(), 1);
1701        assert_eq!(records[0].0, 3);
1702        let decoded = ChangeRecord::decode(&records[0].1).expect("decode v2 payload");
1703        assert_eq!(decoded.term, crate::replication::DEFAULT_REPLICATION_TERM);
1704        assert_eq!(decoded.lsn, 3);
1705
1706        let framed = read_and_repair_entries(&spool_path).expect("read framed v2 entries");
1707        assert_eq!(framed[0].term, crate::replication::DEFAULT_REPLICATION_TERM);
1708
1709        let _ = fs::remove_file(spool_path);
1710    }
1711
1712    #[test]
1713    fn topology_epoch_monotonic_on_register_and_unregister() {
1714        // Issue #167 acceptance: the epoch consumed by
1715        // TopologyAdvertiser is strictly monotonic across registry
1716        // shape changes. Pin it here so a future refactor doesn't
1717        // accidentally swallow the bump.
1718        let primary = PrimaryReplication::new(None);
1719        let e0 = primary.topology_epoch();
1720        primary.register_replica("r1".to_string());
1721        let e1 = primary.topology_epoch();
1722        primary.register_replica("r2".to_string());
1723        let e2 = primary.topology_epoch();
1724        assert!(e1 > e0, "register must bump epoch ({e0} -> {e1})");
1725        assert!(e2 > e1, "second register must bump epoch ({e1} -> {e2})");
1726
1727        let removed = primary.unregister_replica("r1");
1728        assert!(removed);
1729        let e3 = primary.topology_epoch();
1730        assert!(e3 > e2, "unregister must bump epoch ({e2} -> {e3})");
1731
1732        // Unknown id is a no-op and does not bump the epoch — keep
1733        // the monotonicity tied to actual registry shape changes.
1734        let absent = primary.unregister_replica("ghost");
1735        assert!(!absent);
1736        assert_eq!(
1737            primary.topology_epoch(),
1738            e3,
1739            "unregistering a missing replica must not bump the epoch"
1740        );
1741    }
1742
1743    #[test]
1744    fn register_replica_is_idempotent_on_reconnect() {
1745        // Issue #812 acceptance: registration is the production foundation
1746        // for per-replica progress tracking. A reconnect must update the
1747        // existing registry entry rather than create a duplicate or rewind
1748        // progress. Uses the `None` data-path fake — no engine boot.
1749        let primary = PrimaryReplication::new(None);
1750
1751        // First registration creates exactly one entry and bumps the epoch.
1752        primary.register_replica("r1".to_string());
1753        assert_eq!(
1754            primary.replica_count(),
1755            1,
1756            "first register creates an entry"
1757        );
1758        let epoch_after_first = primary.topology_epoch();
1759
1760        // Advance the replica's progress as a real pull/ack would.
1761        primary.note_replica_pull("r1", 42);
1762        primary.ack_replica_lsn("r1", 40, 40);
1763        let before = primary
1764            .replica_snapshots()
1765            .into_iter()
1766            .find(|r| r.id == "r1")
1767            .expect("r1 present");
1768        assert_eq!(before.last_sent_lsn, 42);
1769        assert_eq!(before.last_acked_lsn, 40);
1770        assert_eq!(before.last_durable_lsn, 40);
1771
1772        // Reconnect: re-register the same id.
1773        let resume_lsn = primary.register_replica("r1".to_string());
1774
1775        // No duplicate entry.
1776        assert_eq!(
1777            primary.replica_count(),
1778            1,
1779            "reconnect must not create a duplicate registry entry"
1780        );
1781        // Re-registration is not a registry-shape change — epoch is untouched.
1782        assert_eq!(
1783            primary.topology_epoch(),
1784            epoch_after_first,
1785            "reconnect must not bump the topology epoch"
1786        );
1787        // Progress preserved, not rewound to the current WAL LSN.
1788        let after = primary
1789            .replica_snapshots()
1790            .into_iter()
1791            .find(|r| r.id == "r1")
1792            .expect("r1 still present");
1793        assert_eq!(after.last_sent_lsn, 42, "last_sent_lsn preserved");
1794        assert_eq!(after.last_acked_lsn, 40, "last_acked_lsn preserved");
1795        assert_eq!(after.last_durable_lsn, 40, "last_durable_lsn preserved");
1796        // Reconnect returns the slot restart point, not the last sent LSN.
1797        assert_eq!(resume_lsn, 40, "reconnect returns the slot restart LSN");
1798    }
1799
1800    #[test]
1801    fn replica_slot_persists_and_reconnect_resumes_from_restart_lsn() {
1802        let data_path = temp_data_path("replication_slots");
1803        let spool_path = LogicalWalSpool::path_for(&data_path);
1804        let slot_path = PrimaryReplication::slot_path_for(&data_path);
1805
1806        {
1807            let primary = PrimaryReplication::new(Some(&data_path));
1808            primary.register_replica("r1".to_string());
1809            primary.note_replica_pull("r1", 12);
1810            primary.ack_replica_lsn("r1", 10, 8);
1811
1812            let slot = primary
1813                .slot_snapshots()
1814                .into_iter()
1815                .find(|slot| slot.id == "r1")
1816                .expect("r1 slot present");
1817            assert_eq!(slot.restart_lsn, 8);
1818            assert_eq!(slot.confirmed_lsn, 10);
1819        }
1820
1821        let reopened = PrimaryReplication::new(Some(&data_path));
1822        let slot = reopened
1823            .slot_snapshots()
1824            .into_iter()
1825            .find(|slot| slot.id == "r1")
1826            .expect("r1 slot loaded after reopen");
1827        assert_eq!(slot.restart_lsn, 8);
1828        assert_eq!(slot.confirmed_lsn, 10);
1829        assert_eq!(
1830            reopened.register_replica("r1".to_string()),
1831            8,
1832            "reconnect resumes from the durable slot restart LSN"
1833        );
1834
1835        let _ = fs::remove_file(spool_path);
1836        let _ = fs::remove_file(slot_path);
1837    }
1838
1839    #[test]
1840    fn retention_floor_follows_slowest_slot_and_prunes_wal() {
1841        let primary = PrimaryReplication::new(None);
1842        primary.register_replica("fast".to_string());
1843        primary.register_replica("slow".to_string());
1844
1845        for lsn in 1..=6 {
1846            primary.wal_buffer.append(lsn, vec![lsn as u8]);
1847        }
1848
1849        primary.ack_replica_lsn("fast", 5, 5);
1850        primary.ack_replica_lsn("slow", 3, 2);
1851
1852        assert_eq!(
1853            primary.retention_floor_lsn(),
1854            Some(2),
1855            "slowest slot restart_lsn sets the retention floor"
1856        );
1857        assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 2);
1858        let retained: Vec<_> = primary
1859            .wal_buffer
1860            .read_since(0, usize::MAX)
1861            .into_iter()
1862            .map(|(lsn, _)| lsn)
1863            .collect();
1864        assert_eq!(retained, vec![3, 4, 5, 6]);
1865
1866        primary.ack_replica_lsn("slow", 6, 6);
1867        assert_eq!(
1868            primary.retention_floor_lsn(),
1869            Some(5),
1870            "slot confirmation advances the retention floor"
1871        );
1872        assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 5);
1873        let retained: Vec<_> = primary
1874            .wal_buffer
1875            .read_since(0, usize::MAX)
1876            .into_iter()
1877            .map(|(lsn, _)| lsn)
1878            .collect();
1879        assert_eq!(retained, vec![6]);
1880    }
1881
1882    #[test]
1883    fn bootstrap_slot_pin_prevents_wal_removed_rebootstrap_after_prune() {
1884        let primary = PrimaryReplication::new(None);
1885        for lsn in 1..=5 {
1886            primary.wal_buffer.append(lsn, vec![lsn as u8]);
1887        }
1888
1889        let slot_lsn = primary.register_replica("bootstrapping".to_string());
1890        assert_eq!(slot_lsn, 5, "bootstrap pins the current frontier");
1891
1892        for lsn in 6..=8 {
1893            primary.wal_buffer.append(lsn, vec![lsn as u8]);
1894        }
1895
1896        assert_eq!(
1897            primary.prune_retained_wal_through(8).unwrap(),
1898            5,
1899            "bootstrap slot keeps the frontier retained"
1900        );
1901        assert_eq!(
1902            primary.slot_rebootstrap_reason("bootstrapping", 0, primary.wal_buffer.oldest_lsn()),
1903            None,
1904            "a caller resuming from its slot must not see wal-removed after slot-aware pruning"
1905        );
1906    }
1907
1908    #[test]
1909    fn default_config_enables_finite_slot_retention_cap() {
1910        let config = crate::replication::ReplicationConfig::primary();
1911
1912        assert!(
1913            config.slot_retention_max_lag_lsn > 0,
1914            "primary replication must default to a finite slot retention cap"
1915        );
1916    }
1917
1918    #[test]
1919    fn retention_cap_invalidates_slow_slot_and_releases_wal_floor() {
1920        let primary = PrimaryReplication::new_with_config(
1921            None,
1922            &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
1923        );
1924        primary.register_replica("fast".to_string());
1925        primary.register_replica("slow".to_string());
1926
1927        for lsn in 1..=6 {
1928            primary.wal_buffer.append(lsn, vec![lsn as u8]);
1929        }
1930        primary.ack_replica_lsn("fast", 6, 6);
1931
1932        assert_eq!(primary.prune_retained_wal_through(6).unwrap(), 6);
1933
1934        let slow = primary
1935            .slot_snapshots()
1936            .into_iter()
1937            .find(|slot| slot.id == "slow")
1938            .expect("slow slot present");
1939        assert_eq!(
1940            slow.invalidation_reason,
1941            Some(SlotInvalidationCause::Horizon)
1942        );
1943
1944        let retained: Vec<_> = primary
1945            .wal_buffer
1946            .read_since(0, usize::MAX)
1947            .into_iter()
1948            .map(|(lsn, _)| lsn)
1949            .collect();
1950        assert!(
1951            retained.is_empty(),
1952            "invalidated slow slot must not pin WAL"
1953        );
1954    }
1955
1956    #[test]
1957    fn slot_invalidation_cause_codes_cover_wal_removed_horizon_and_idle_timeout() {
1958        let wal_removed = PrimaryReplication::new_with_config(
1959            None,
1960            &crate::replication::ReplicationConfig::primary()
1961                .with_slot_retention_max_lag_lsn(3)
1962                .with_slot_idle_timeout_ms(10),
1963        );
1964        wal_removed.register_replica("wal".to_string());
1965        assert_eq!(
1966            wal_removed.slot_rebootstrap_reason("wal", 0, Some(2)),
1967            Some(SlotInvalidationCause::WalRemoved)
1968        );
1969
1970        let horizon = PrimaryReplication::new_with_config(
1971            None,
1972            &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
1973        );
1974        horizon.register_replica("horizon".to_string());
1975        for lsn in 1..=4 {
1976            horizon.wal_buffer.append(lsn, vec![lsn as u8]);
1977        }
1978        horizon.enforce_retention_limits(0);
1979        assert_eq!(
1980            horizon
1981                .slot_snapshots()
1982                .into_iter()
1983                .find(|slot| slot.id == "horizon")
1984                .and_then(|slot| slot.invalidation_reason),
1985            Some(SlotInvalidationCause::Horizon)
1986        );
1987
1988        let idle = PrimaryReplication::new_with_config(
1989            None,
1990            &crate::replication::ReplicationConfig::primary().with_slot_idle_timeout_ms(10),
1991        );
1992        idle.register_replica("idle".to_string());
1993        idle.touch_slot("idle", 1);
1994        idle.enforce_retention_limits(12);
1995        assert_eq!(
1996            idle.slot_snapshots()
1997                .into_iter()
1998                .find(|slot| slot.id == "idle")
1999                .and_then(|slot| slot.invalidation_reason),
2000            Some(SlotInvalidationCause::IdleTimeout)
2001        );
2002    }
2003
2004    #[test]
2005    fn wal_buffer_fan_out_shares_refcounted_payload() {
2006        // Issue #832 acceptance: fan-out to multiple replicas must share
2007        // one ref-counted buffer — a second reader gets the *same*
2008        // allocation, not a per-replica byte copy.
2009        let buffer = WalBuffer::new(8);
2010        buffer.append(1, vec![0xDE, 0xAD, 0xBE, 0xEF]);
2011
2012        let replica_a = buffer.read_since_shared(0, usize::MAX);
2013        let replica_b = buffer.read_since_shared(0, usize::MAX);
2014        assert_eq!(replica_a.len(), 1);
2015        assert_eq!(replica_b.len(), 1);
2016
2017        assert!(
2018            Arc::ptr_eq(&replica_a[0].1, &replica_b[0].1),
2019            "two replicas must share one ref-counted payload allocation"
2020        );
2021        assert_eq!(&*replica_a[0].1, &[0xDE, 0xAD, 0xBE, 0xEF]);
2022        assert!(
2023            Arc::strong_count(&replica_a[0].1) >= 3,
2024            "buffer + both replica handles reference the same payload"
2025        );
2026
2027        // The owned-bytes compatibility path still yields the payload.
2028        let owned = buffer.read_since(0, usize::MAX);
2029        assert_eq!(owned, vec![(1u64, vec![0xDE, 0xAD, 0xBE, 0xEF])]);
2030    }
2031
2032    #[test]
2033    fn spool_seek_index_resume_is_sublinear() {
2034        // Issue #832 acceptance: a replica resuming from a mid-spool slot
2035        // position seeks to the nearest checkpoint rather than scanning
2036        // the whole spool from offset 0 (partial-resync seek).
2037        let data_path = temp_data_path("seek_index");
2038        let spool_path = LogicalWalSpool::path_for(&data_path);
2039        let spool = LogicalWalSpool::open(&data_path).expect("open spool");
2040
2041        for lsn in 1..=200u64 {
2042            spool
2043                .append_with_term_and_timestamp(1, lsn, lsn, &[(lsn % 251) as u8, 0xAB])
2044                .expect("append");
2045        }
2046
2047        // Full scan from the start covers every record and seeks to 0.
2048        assert_eq!(spool.read_since(0, usize::MAX).expect("full").len(), 200);
2049        assert_eq!(spool.seek_floor_offset(0), 0);
2050
2051        // Resuming from a mid LSN returns exactly the tail and seeks past
2052        // offset 0 — proof the scan started from a checkpoint, not byte 0.
2053        let resumed = spool.read_since(130, usize::MAX).expect("resume");
2054        assert_eq!(resumed.first().map(|(lsn, _)| *lsn), Some(131));
2055        assert_eq!(resumed.last().map(|(lsn, _)| *lsn), Some(200));
2056        assert_eq!(resumed.len(), 70);
2057        assert!(
2058            spool.seek_floor_offset(130) > 0,
2059            "mid-spool resume must seek past offset 0"
2060        );
2061
2062        // Reopening rebuilds the seek index from disk, so resume stays
2063        // sub-linear across a restart.
2064        drop(spool);
2065        let reopened = LogicalWalSpool::open(&data_path).expect("reopen spool");
2066        assert!(reopened.seek_floor_offset(130) > 0);
2067        assert_eq!(
2068            reopened
2069                .read_since(130, usize::MAX)
2070                .expect("resume reopen")
2071                .len(),
2072            70
2073        );
2074
2075        let _ = fs::remove_file(spool_path);
2076    }
2077
2078    #[test]
2079    fn plan_replica_resume_partial_within_window_full_past_cap() {
2080        // Issue #832 acceptance: within the retention window a brief blip
2081        // resumes via partial resync (counted as a metric); only a slot
2082        // past the retention cap forces a full re-bootstrap.
2083        let within = PrimaryReplication::new(None);
2084        within.register_replica("blip".to_string());
2085        for lsn in 1..=5 {
2086            within.wal_buffer.append(lsn, vec![lsn as u8]);
2087        }
2088        let before = within.partial_resync_count();
2089        match within.plan_replica_resume("blip", 2, within.wal_buffer.oldest_lsn()) {
2090            ResumeMode::PartialResync { resume_lsn } => assert_eq!(resume_lsn, 2),
2091            other => panic!("brief blip must resume via partial resync, got {other:?}"),
2092        }
2093        assert_eq!(
2094            within.partial_resync_count(),
2095            before + 1,
2096            "partial resync must be observable via the metric"
2097        );
2098
2099        // A slot driven past the retention cap is invalidated and must
2100        // re-bootstrap — and that decision must NOT count as a partial
2101        // resync.
2102        let past_cap = PrimaryReplication::new_with_config(
2103            None,
2104            &crate::replication::ReplicationConfig::primary().with_slot_retention_max_lag_lsn(3),
2105        );
2106        past_cap.register_replica("slow".to_string());
2107        for lsn in 1..=6 {
2108            past_cap.wal_buffer.append(lsn, vec![lsn as u8]);
2109        }
2110        past_cap.enforce_retention_limits(0);
2111        let before_full = past_cap.partial_resync_count();
2112        match past_cap.plan_replica_resume("slow", 0, past_cap.wal_buffer.oldest_lsn()) {
2113            ResumeMode::FullRebootstrap { cause } => {
2114                assert_eq!(cause, SlotInvalidationCause::Horizon)
2115            }
2116            other => panic!("slot past the cap must re-bootstrap, got {other:?}"),
2117        }
2118        assert_eq!(
2119            past_cap.partial_resync_count(),
2120            before_full,
2121            "a full re-bootstrap must not be counted as a partial resync"
2122        );
2123    }
2124
2125    #[test]
2126    fn ensure_replica_registered_self_registers_then_is_a_noop() {
2127        // Issue #812 acceptance: the production pull path auto-registers a
2128        // replica the first time it identifies itself, then advances its
2129        // per-replica state on subsequent pulls without duplicating it.
2130        let primary = PrimaryReplication::new(None);
2131
2132        // First pull-with-id self-registers.
2133        assert!(
2134            primary.ensure_replica_registered("r1"),
2135            "first identification registers the replica"
2136        );
2137        assert_eq!(primary.replica_count(), 1);
2138        let epoch_after_register = primary.topology_epoch();
2139
2140        // Per-replica state advances on pull for the now-registered replica.
2141        primary.note_replica_pull("r1", 7);
2142        assert_eq!(
2143            primary
2144                .replica_snapshots()
2145                .into_iter()
2146                .find(|r| r.id == "r1")
2147                .map(|r| r.last_sent_lsn),
2148            Some(7),
2149            "primary tracks last_sent_lsn for a registered replica's pull"
2150        );
2151
2152        // Subsequent identification is an idempotent no-op: no duplicate,
2153        // no epoch bump, progress preserved.
2154        assert!(
2155            !primary.ensure_replica_registered("r1"),
2156            "already-registered replica is not re-registered"
2157        );
2158        assert_eq!(primary.replica_count(), 1);
2159        assert_eq!(primary.topology_epoch(), epoch_after_register);
2160        assert_eq!(
2161            primary
2162                .replica_snapshots()
2163                .into_iter()
2164                .find(|r| r.id == "r1")
2165                .map(|r| r.last_sent_lsn),
2166            Some(7),
2167            "no-op registration preserves progress"
2168        );
2169    }
2170
2171    #[test]
2172    fn replication_progress_uses_sent_applied_and_durable_registry_lsns() {
2173        let now = crate::utils::now_unix_millis() as u128;
2174        let replicas = vec![
2175            ReplicaState {
2176                id: "fast".to_string(),
2177                last_acked_lsn: 90,
2178                last_sent_lsn: 120,
2179                last_durable_lsn: 80,
2180                apply_error_count: 0,
2181                divergence_count: 0,
2182                connected_at_unix_ms: now,
2183                last_seen_at_unix_ms: now,
2184                region: None,
2185                rebootstrapping: false,
2186            },
2187            ReplicaState {
2188                id: "slow".to_string(),
2189                last_acked_lsn: 70,
2190                last_sent_lsn: 100,
2191                last_durable_lsn: 60,
2192                apply_error_count: 0,
2193                divergence_count: 0,
2194                connected_at_unix_ms: now,
2195                last_seen_at_unix_ms: now,
2196                region: None,
2197                rebootstrapping: false,
2198            },
2199        ];
2200
2201        let progress = ReplicationProgress::from_replicas(&replicas).expect("registered replicas");
2202
2203        assert_eq!(progress.lag_lsn, 50);
2204        assert_eq!(progress.safe_replay_lsn, 60);
2205    }
2206}