reddb_server/replication/primary.rs

//! Primary-side replication: WAL record production and snapshot serving.
//!
//! ## Logical WAL spool wire format
//!
//! ### Version 2 (current — PLAN.md Phase 2 / W2)
//!
//! ```text
//! [magic     4 bytes  = b"RDLW"]
//! [version   1 byte   = 0x02]
//! [lsn       8 bytes  little-endian u64]
//! [timestamp 8 bytes  little-endian u64 — wall-clock millis since UNIX epoch]
//! [payload_len 4 bytes little-endian u32]
//! [payload   payload_len bytes]
//! [crc32     4 bytes  little-endian u32 — crc32 of (version || lsn ||
//!                                         timestamp || payload_len || payload)]
//! ```
//!
//! - `sync_all()` is called after every append so an acknowledged
//!   `append()` survives a power-loss event.
//! - Recovery accepts the longest valid prefix and truncates the spool
//!   at the first torn header, short payload/crc, or checksum mismatch
//!   (a warning is logged). No partial record is ever returned to the
//!   replication subsystem.
//!
//! ### Version 1 (legacy, read-only)
//!
//! ```text
//! [magic 4][version 1=0x01][lsn 8][payload_len 8][payload]
//! ```
//!
//! No checksum, no timestamp. Read for backward compatibility on
//! existing spools; never written. A v1 record found in a spool is
//! returned to consumers with `LogicalWalEntry::timestamp_ms` set to
//! `0`, since v1 framing carried no timestamp.
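//!
//! As a minimal sketch of the v2 layout above (not a doctest; `frame`
//! stands in for a hypothetical in-memory copy of one complete record):
//!
//! ```rust,ignore
//! assert_eq!(&frame[0..4], b"RDLW"); // magic
//! assert_eq!(frame[4], 0x02);        // version
//! let lsn         = u64::from_le_bytes(frame[5..13].try_into().unwrap());
//! let timestamp   = u64::from_le_bytes(frame[13..21].try_into().unwrap());
//! let payload_len = u32::from_le_bytes(frame[21..25].try_into().unwrap()) as usize;
//! let payload     = &frame[25..25 + payload_len];
//! let crc         = u32::from_le_bytes(frame[25 + payload_len..29 + payload_len].try_into().unwrap());
//! // The crc covers version || lsn || timestamp || payload_len || payload,
//! // i.e. everything after the magic.
//! ```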

use std::collections::VecDeque;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};

use tracing::warn;

const LOGICAL_WAL_SPOOL_MAGIC: &[u8; 4] = b"RDLW";
const LOGICAL_WAL_SPOOL_VERSION_V1: u8 = 1;
const LOGICAL_WAL_SPOOL_VERSION_V2: u8 = 2;
const LOGICAL_WAL_SPOOL_VERSION_CURRENT: u8 = LOGICAL_WAL_SPOOL_VERSION_V2;
/// Header size in bytes for a v2 record before the payload starts:
/// magic(4) + version(1) + lsn(8) + timestamp(8) + payload_len(4) = 25.
const LOGICAL_WAL_V2_HEADER_LEN: u64 = 4 + 1 + 8 + 8 + 4;
/// CRC32 trailer size in bytes for a v2 record.
const LOGICAL_WAL_V2_CRC_LEN: u64 = 4;

/// Compute CRC32 over the bytes that follow the magic — version,
/// lsn, timestamp, payload_len, and payload. Magic is excluded so
/// torn-record detection at recovery time depends only on data the
/// writer covered.
///
/// Uses the same `crate::storage::engine::crc32` polynomial as the
/// physical WAL record format so checksums computed here are
/// comparable to those in `src/storage/wal/record.rs`.
fn compute_logical_v2_crc(version: u8, lsn: u64, timestamp: u64, payload: &[u8]) -> u32 {
    use crate::storage::engine::crc32::crc32_update;
    let mut crc = crc32_update(0, &[version]);
    crc = crc32_update(crc, &lsn.to_le_bytes());
    crc = crc32_update(crc, &timestamp.to_le_bytes());
    crc = crc32_update(crc, &(payload.len() as u32).to_le_bytes());
    crc = crc32_update(crc, payload);
    crc
}

/// In-memory WAL buffer for replication.
/// Primary appends records here; replicas consume from it.
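///
/// A minimal usage sketch (illustrative only; the capacity and LSNs
/// below are made up):
///
/// ```rust,ignore
/// let buffer = WalBuffer::new(1024);
/// buffer.append(1, b"record one".to_vec());
/// buffer.append(2, b"record two".to_vec());
/// // A replica that has applied LSN 1 pulls everything newer.
/// let batch = buffer.read_since(1, 100);
/// assert_eq!(batch.len(), 1);
/// assert_eq!(buffer.current_lsn(), 2);
/// ```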
pub struct WalBuffer {
    /// Circular buffer of (lsn, serialized_record) pairs.
    records: RwLock<VecDeque<(u64, Vec<u8>)>>,
    /// Maximum records to keep in buffer.
    max_size: usize,
    /// Current write LSN.
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    pub fn new(max_size: usize) -> Self {
        Self {
            records: RwLock::new(VecDeque::with_capacity(max_size)),
            max_size,
            current_lsn: RwLock::new(0),
        }
    }

    /// Append a WAL record. Called by the storage engine after each write.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let mut records = self.records.write().unwrap_or_else(|e| e.into_inner());
        records.push_back((lsn, data));
        while records.len() > self.max_size {
            records.pop_front();
        }

        let mut current = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        *current = (*current).max(lsn);
    }

    /// Read records since the given LSN (exclusive).
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let records = self.records.read().unwrap_or_else(|e| e.into_inner());
        records
            .iter()
            .filter(|(lsn, _)| *lsn > since_lsn)
            .take(max_count)
            .cloned()
            .collect()
    }

    /// Current LSN.
    pub fn current_lsn(&self) -> u64 {
        *self.current_lsn.read().unwrap_or_else(|e| e.into_inner())
    }

    pub fn set_current_lsn(&self, lsn: u64) {
        let mut current = self.current_lsn.write().unwrap_or_else(|e| e.into_inner());
        *current = (*current).max(lsn);
    }

    /// Oldest available LSN (for gap detection).
    pub fn oldest_lsn(&self) -> Option<u64> {
        let records = self.records.read().unwrap_or_else(|e| e.into_inner());
        records.front().map(|(lsn, _)| *lsn)
    }
}

#[derive(Debug, Clone)]
struct LogicalWalEntry {
    lsn: u64,
    /// Wall-clock millis at append time. `0` for legacy v1 records that
    /// did not carry a framing timestamp.
    timestamp_ms: u64,
    data: Vec<u8>,
}

#[derive(Debug, Default)]
struct LogicalWalSpoolState {
    current_lsn: u64,
}

/// Durable append-only logical WAL spool kept beside the main `.rdb` file.
///
/// This is not the storage-engine WAL; it is a structured replication/PITR log.
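///
/// A minimal usage sketch (illustrative only; the data path and LSNs are
/// made up, and errors are propagated with `?`):
///
/// ```rust,ignore
/// let spool = LogicalWalSpool::open(Path::new("/var/lib/reddb/reddb.rdb"))?;
/// spool.append(42, b"encoded ChangeRecord bytes")?;
/// // Replication / PITR consumers read strictly after the LSN they hold.
/// let newer = spool.read_since(41, usize::MAX)?;
/// // Once every consumer is past LSN 42, the prefix can be dropped.
/// spool.prune_through(42)?;
/// ```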
pub struct LogicalWalSpool {
    path: PathBuf,
    state: Mutex<LogicalWalSpoolState>,
}

impl LogicalWalSpool {
    pub fn path_for(data_path: &Path) -> PathBuf {
        let file_name = data_path
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("reddb.rdb");
        let spool_name = format!("{file_name}.logical.wal");
        match data_path.parent() {
            Some(parent) => parent.join(spool_name),
            None => PathBuf::from(spool_name),
        }
    }

    pub fn open(data_path: &Path) -> io::Result<Self> {
        let path = Self::path_for(data_path);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        if !path.exists() {
            File::create(&path)?;
        }
        // Recover-or-truncate to the longest valid prefix. A torn tail
        // from the previous process exit (power loss, OOM kill, ENOSPC
        // mid-write) is dropped; a warning surfaces in the operator log
        // and the spool stays open for appends.
        let entries = read_and_repair_entries(&path)?;
        let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
        Ok(Self {
            path,
            state: Mutex::new(LogicalWalSpoolState { current_lsn }),
        })
    }

    pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        self.append_with_timestamp(lsn, timestamp_ms, data)
    }

    /// Append a record with an explicit framing timestamp. Used in
    /// tests to produce deterministic timestamps; production callers
    /// should use `append`.
    pub fn append_with_timestamp(
        &self,
        lsn: u64,
        timestamp_ms: u64,
        data: &[u8],
    ) -> io::Result<()> {
        if data.len() > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "logical WAL payload of {} bytes exceeds 4 GiB framing limit",
                    data.len()
                ),
            ));
        }
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        // Pre-build the record in memory so a single write_all keeps
        // the on-disk record contiguous. Two benefits:
        //   (a) crash recovery sees either a complete record or a torn
        //       header, never an interleaved partial frame from two
        //       writers (the spool is not multi-writer today, but the
        //       single-write semantics make that future-safe);
        //   (b) crc32 is computed exactly once over the same bytes the
        //       reader will checksum, with zero risk of header/payload
        //       drift from a partial flush.
        let mut frame = Vec::with_capacity(
            LOGICAL_WAL_V2_HEADER_LEN as usize + data.len() + LOGICAL_WAL_V2_CRC_LEN as usize,
        );
        frame.extend_from_slice(LOGICAL_WAL_SPOOL_MAGIC);
        frame.push(LOGICAL_WAL_SPOOL_VERSION_CURRENT);
        frame.extend_from_slice(&lsn.to_le_bytes());
        frame.extend_from_slice(&timestamp_ms.to_le_bytes());
        frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
        frame.extend_from_slice(data);
        let crc =
            compute_logical_v2_crc(LOGICAL_WAL_SPOOL_VERSION_CURRENT, lsn, timestamp_ms, data);
        frame.extend_from_slice(&crc.to_le_bytes());

        file.write_all(&frame)?;
        // PLAN.md Phase 2 mandates `sync_all` for logical WAL durability.
        // `flush()` only drains the std::io userspace buffer; without
        // `sync_all` the kernel page cache may still be dirty when an
        // acknowledged write supposedly committed.
        file.sync_all()?;

        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        state.current_lsn = state.current_lsn.max(lsn);
        Ok(())
    }

    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
        let entries = read_and_repair_entries(&self.path)?;
        Ok(entries
            .into_iter()
            .filter(|entry| entry.lsn > since_lsn)
            .take(max_count)
            .map(|entry| (entry.lsn, entry.data))
            .collect())
    }

    pub fn current_lsn(&self) -> u64 {
        self.state
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .current_lsn
    }

    pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
        Ok(read_and_repair_entries(&self.path)?
            .into_iter()
            .next()
            .map(|entry| entry.lsn))
    }

    pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
        let previous_lsn = self.current_lsn();
        let retained: Vec<_> = read_and_repair_entries(&self.path)?
            .into_iter()
            .filter(|entry| entry.lsn > upto_lsn)
            .collect();
        let temp_path = self.path.with_extension("logical.wal.tmp");
        let mut temp = File::create(&temp_path)?;
        let mut current_lsn = 0;
        for entry in retained {
            // Re-frame as v2 so the spool only ever contains v2 records
            // after a prune. Legacy v1 records are upgraded by carrying
            // their original LSN forward; the framing timestamp is
            // re-stamped to wall-clock-now because the original v1
            // record didn't carry one — downstream consumers that need
            // the operation's logical timestamp continue to use the
            // payload's own ChangeRecord::timestamp field.
            let timestamp_ms = if entry.timestamp_ms > 0 {
                entry.timestamp_ms
            } else {
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .map(|d| d.as_millis() as u64)
                    .unwrap_or(0)
            };
            let crc = compute_logical_v2_crc(
                LOGICAL_WAL_SPOOL_VERSION_CURRENT,
                entry.lsn,
                timestamp_ms,
                &entry.data,
            );
            temp.write_all(LOGICAL_WAL_SPOOL_MAGIC)?;
            temp.write_all(&[LOGICAL_WAL_SPOOL_VERSION_CURRENT])?;
            temp.write_all(&entry.lsn.to_le_bytes())?;
            temp.write_all(&timestamp_ms.to_le_bytes())?;
            temp.write_all(&(entry.data.len() as u32).to_le_bytes())?;
            temp.write_all(&entry.data)?;
            temp.write_all(&crc.to_le_bytes())?;
            current_lsn = current_lsn.max(entry.lsn);
        }
        temp.sync_all()?;
        fs::rename(&temp_path, &self.path)?;

        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
        Ok(())
    }
}

/// Reads every logical-WAL record from `path`, accepting the longest
/// valid prefix and *truncating* the file at the first torn or
/// corrupt record. Designed for crash recovery: a process killed
/// mid-write leaves a partial frame that this function silently drops
/// so the spool can resume appending without ambiguity.
///
/// Detection of "stop here" cases:
///   1. `UnexpectedEof` while reading any header field, payload, or
///      crc → torn write at end of file.
///   2. Magic mismatch (any 4 bytes that aren't `RDLW`) → corrupt or
///      foreign data; treated as if the file ended at the start of
///      this record.
///   3. v2 record with unsupported version byte → same.
///   4. v2 CRC mismatch → record corrupt; truncated.
///
/// Truncation fires whenever corrupt or torn bytes follow the last
/// valid record, including when the very first record is corrupt, in
/// which case the spool becomes empty. Either way the invariant that
/// callers see only fully-checksummed payloads is preserved.
///
/// v1 records (legacy, no checksum) are accepted for read-only
/// compatibility. They never receive a checksum; a v1 read that hits
/// `UnexpectedEof` mid-payload also triggers truncation.
fn read_and_repair_entries(path: &Path) -> io::Result<Vec<LogicalWalEntry>> {
    if !path.exists() {
        return Ok(Vec::new());
    }

    let mut file = OpenOptions::new().read(true).write(true).open(path)?;
    let mut entries = Vec::new();
    let mut last_good_offset: u64 = 0;
    let mut corrupt_reason: Option<String> = None;

    loop {
        let record_start = file.stream_position()?;

        let mut magic = [0u8; 4];
        match file.read_exact(&mut magic) {
            Ok(()) => {}
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(err) => return Err(err),
        }
        if &magic != LOGICAL_WAL_SPOOL_MAGIC {
            corrupt_reason = Some(format!(
                "bad magic at offset {record_start}: got {magic:02x?}"
            ));
            break;
        }

        let mut version = [0u8; 1];
        if let Err(err) = file.read_exact(&mut version) {
            if err.kind() == io::ErrorKind::UnexpectedEof {
                corrupt_reason = Some(format!("torn header at offset {record_start}"));
                break;
            }
            return Err(err);
        }

        let entry_result = match version[0] {
            LOGICAL_WAL_SPOOL_VERSION_V2 => read_one_v2(&mut file, record_start),
            LOGICAL_WAL_SPOOL_VERSION_V1 => read_one_v1(&mut file, record_start),
            other => {
                corrupt_reason = Some(format!(
                    "unsupported version {other} at offset {record_start}"
                ));
                break;
            }
        };

        match entry_result {
            Ok(entry) => {
                entries.push(entry);
                last_good_offset = file.stream_position()?;
            }
            Err(reason) => {
                corrupt_reason = Some(reason);
                break;
            }
        }
    }

    if let Some(reason) = corrupt_reason {
        let total_len = file.metadata()?.len();
        if last_good_offset < total_len {
            warn!(
                target: "reddb::replication::logical_wal",
                path = %path.display(),
                reason = %reason,
                truncating_from = last_good_offset,
                truncating_to = total_len,
                kept_records = entries.len(),
                "truncating logical-WAL spool to last valid record"
            );
            file.set_len(last_good_offset)?;
            file.sync_all()?;
        }
    }

    Ok(entries)
}

/// Read a v2 record assuming the magic + version byte have already
/// been consumed and the file cursor sits at the LSN field. Returns
/// `Err(reason)` for any condition that should trigger truncation.
fn read_one_v2(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
    let mut lsn = [0u8; 8];
    if let Err(err) = file.read_exact(&mut lsn) {
        return Err(format!("torn lsn at offset {record_start}: {err}"));
    }
    let mut timestamp = [0u8; 8];
    if let Err(err) = file.read_exact(&mut timestamp) {
        return Err(format!("torn timestamp at offset {record_start}: {err}"));
    }
    let mut len_bytes = [0u8; 4];
    if let Err(err) = file.read_exact(&mut len_bytes) {
        return Err(format!(
            "torn payload length at offset {record_start}: {err}"
        ));
    }
    let payload_len = u32::from_le_bytes(len_bytes) as usize;
    // Sanity guard against a runaway length encoded by a partially
    // corrupted header. 256 MiB is well above any plausible single
    // ChangeRecord, and capping here avoids a huge allocation when a
    // torn header happens to look like a real frame.
    const MAX_PLAUSIBLE_PAYLOAD: usize = 256 * 1024 * 1024;
    if payload_len > MAX_PLAUSIBLE_PAYLOAD {
        return Err(format!(
            "implausible payload_len {payload_len} at offset {record_start}"
        ));
    }
    let mut payload = vec![0u8; payload_len];
    if let Err(err) = file.read_exact(&mut payload) {
        return Err(format!(
            "torn payload at offset {record_start} (expected {payload_len} bytes): {err}"
        ));
    }
    let mut crc_bytes = [0u8; 4];
    if let Err(err) = file.read_exact(&mut crc_bytes) {
        return Err(format!("torn crc at offset {record_start}: {err}"));
    }
    let stored_crc = u32::from_le_bytes(crc_bytes);
    let expected_crc = compute_logical_v2_crc(
        LOGICAL_WAL_SPOOL_VERSION_V2,
        u64::from_le_bytes(lsn),
        u64::from_le_bytes(timestamp),
        &payload,
    );
    if stored_crc != expected_crc {
        return Err(format!(
            "crc mismatch at offset {record_start}: stored {stored_crc:#010x}, expected {expected_crc:#010x}"
        ));
    }
    Ok(LogicalWalEntry {
        lsn: u64::from_le_bytes(lsn),
        timestamp_ms: u64::from_le_bytes(timestamp),
        data: payload,
    })
}

/// Read a v1 record (legacy, no checksum). Layout after magic+version:
/// [lsn 8][payload_len 8][payload]. v1 spools were written before
/// PLAN.md Phase 2 hardened the format; we read them so existing dev
/// installs don't drop history on upgrade.
fn read_one_v1(file: &mut File, record_start: u64) -> Result<LogicalWalEntry, String> {
    let mut lsn = [0u8; 8];
    if let Err(err) = file.read_exact(&mut lsn) {
        return Err(format!("v1 torn lsn at offset {record_start}: {err}"));
    }
    let mut len_bytes = [0u8; 8];
    if let Err(err) = file.read_exact(&mut len_bytes) {
        return Err(format!(
            "v1 torn payload length at offset {record_start}: {err}"
        ));
    }
    let payload_len = u64::from_le_bytes(len_bytes) as usize;
    if payload_len > 256 * 1024 * 1024 {
        return Err(format!(
            "v1 implausible payload_len {payload_len} at offset {record_start}"
        ));
    }
    let mut payload = vec![0u8; payload_len];
    if let Err(err) = file.read_exact(&mut payload) {
        return Err(format!("v1 torn payload at offset {record_start}: {err}"));
    }
    Ok(LogicalWalEntry {
        lsn: u64::from_le_bytes(lsn),
        timestamp_ms: 0,
        data: payload,
    })
}

/// State of a connected replica. PLAN.md Phase 11.4 fields:
/// `last_seen_at_unix_ms` updates on every interaction (pull or ack);
/// `last_sent_lsn` updates when the primary serves a `pull_wal_records`
/// batch; `last_durable_lsn` updates when the replica reports its WAL
/// is durably written via `ack_replica_lsn`.
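///
/// A sketch of the lag split these fields enable (`primary_lsn` is a
/// hypothetical stand-in for `WalBuffer::current_lsn`):
///
/// ```rust,ignore
/// // Pull-side lag: records the primary has but has not yet shipped.
/// let pull_lag  = primary_lsn.saturating_sub(replica.last_sent_lsn);
/// // Apply-side lag: records shipped but not yet acknowledged as applied.
/// let apply_lag = replica.last_sent_lsn.saturating_sub(replica.last_acked_lsn);
/// ```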
#[derive(Debug, Clone)]
pub struct ReplicaState {
    pub id: String,
    pub last_acked_lsn: u64,
    pub last_sent_lsn: u64,
    pub last_durable_lsn: u64,
    pub connected_at_unix_ms: u128,
    pub last_seen_at_unix_ms: u128,
    /// Region identifier declared by the replica at handshake time
    /// (Phase 2.6 multi-region PG parity). `None` until the replica
    /// handshake extension lands in 2.6.2; the quorum coordinator's
    /// region-binding map covers the in-process case meanwhile.
    pub region: Option<String>,
}

/// Primary replication manager.
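///
/// A sketch of the per-replica lifecycle as the methods below are meant
/// to be called (ids and LSNs are made up):
///
/// ```rust,ignore
/// let primary = PrimaryReplication::new(None);
/// primary.register_replica("replica-a".to_string());
/// // Primary serves a pull_wal_records batch ending at LSN 120.
/// primary.note_replica_pull("replica-a", 120);
/// // Replica reports it has applied and durably persisted up to LSN 120.
/// primary.ack_replica_lsn("replica-a", 120, 120);
/// let snapshots = primary.replica_snapshots();
/// assert_eq!(snapshots.len(), 1);
/// ```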
pub struct PrimaryReplication {
    pub wal_buffer: Arc<WalBuffer>,
    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
    pub replicas: RwLock<Vec<ReplicaState>>,
    /// PLAN.md Phase 11.4 — ack-driven commit synchronization. Always
    /// allocated so the policy enum can flip from `Local` to
    /// `AckN`/`Quorum` without touching this struct's shape.
    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
    /// Monotonic registry-change counter consumed by the
    /// `TopologyAdvertiser` (issue #167). Bumps on register,
    /// unregister, and the periodic health sweep when a replica
    /// flips between healthy/unhealthy. Clients use the epoch to
    /// detect stale advertisements without comparing the full
    /// replica list element-wise.
    topology_epoch: std::sync::atomic::AtomicU64,
}

impl PrimaryReplication {
    pub fn new(data_path: Option<&Path>) -> Self {
        Self {
            wal_buffer: Arc::new(WalBuffer::new(100_000)),
            logical_wal_spool: data_path
                .and_then(|path| LogicalWalSpool::open(path).ok())
                .map(Arc::new),
            replicas: RwLock::new(Vec::new()),
            commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
            topology_epoch: std::sync::atomic::AtomicU64::new(0),
        }
    }

    pub fn register_replica(&self, id: String) -> u64 {
        self.register_replica_with_region(id, None)
    }

    /// Register a replica with an explicit region tag (Phase 2.6 multi-region).
    ///
    /// Preferred when the replica handshake declares a region — the quorum
    /// coordinator uses this field to decide whether the replica counts
    /// toward a `QuorumMode::Regions` commit.
    pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
        let lsn = self.wal_buffer.current_lsn();
        let now_ms = crate::utils::now_unix_millis() as u128;
        let state = ReplicaState {
            id,
            last_acked_lsn: lsn,
            last_sent_lsn: lsn,
            last_durable_lsn: lsn,
            connected_at_unix_ms: now_ms,
            last_seen_at_unix_ms: now_ms,
            region,
        };
        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
        replicas.push(state);
        drop(replicas);
        self.bump_topology_epoch();
        lsn
    }

    /// Unregister a replica by id. Returns `true` when the replica
    /// was present (and removed). Bumps the topology epoch so a
    /// pending advertisement reflects the new fleet size.
    pub fn unregister_replica(&self, id: &str) -> bool {
        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
        let before = replicas.len();
        replicas.retain(|r| r.id != id);
        let removed = replicas.len() != before;
        drop(replicas);
        if removed {
            self.bump_topology_epoch();
        }
        removed
    }

    /// Current topology epoch. Strictly monotonic; bumps on every
    /// registry-shape change and is consumed by `TopologyAdvertiser`.
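    ///
    /// A sketch of how a poller can use the epoch to skip unchanged
    /// topologies (the `advertise` sink is hypothetical):
    ///
    /// ```rust,ignore
    /// let mut last_seen = primary.topology_epoch();
    /// loop {
    ///     let now = primary.topology_epoch();
    ///     if now != last_seen {
    ///         last_seen = now;
    ///         advertise(primary.replica_snapshots()); // hypothetical sink
    ///     }
    ///     std::thread::sleep(std::time::Duration::from_secs(1));
    /// }
    /// ```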
    pub fn topology_epoch(&self) -> u64 {
        self.topology_epoch
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Advance the topology epoch. Call sites: register, unregister,
    /// and the health-sweep tick that flips a replica between
    /// healthy/unhealthy. Overflow is not a concern in practice
    /// (`u64::MAX` events would take centuries at any realistic ack
    /// rate); `fetch_add` wraps on overflow, and because the consumer
    /// treats the epoch as opaque, even a wrapped value still reads as
    /// "different" from the previous one.
    pub fn bump_topology_epoch(&self) {
        self.topology_epoch
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    pub fn ack_replica(&self, id: &str, lsn: u64) {
        let now_ms = crate::utils::now_unix_millis() as u128;
        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
            r.last_acked_lsn = r.last_acked_lsn.max(lsn);
            r.last_seen_at_unix_ms = now_ms;
        }
    }

    /// PLAN.md Phase 11.4 — replica reports applied + durable LSN
    /// after persisting a batch. Idempotent: only advances LSNs
    /// monotonically. `last_seen_at_unix_ms` always refreshes.
    /// Also signals `commit_waiter` so any thread blocked on
    /// `ack_n` / `quorum` can wake and re-check its threshold.
    pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
        let now_ms = crate::utils::now_unix_millis() as u128;
        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
            r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
            r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
            r.last_seen_at_unix_ms = now_ms;
        }
        // Drop the write lock before signaling so a waiter that
        // wakes immediately can read replica state without
        // contending against us.
        drop(replicas);
        self.commit_waiter.record_replica_ack(id, durable_lsn);
    }

    /// PLAN.md Phase 11.4 — primary records the LSN it last sent to a
    /// replica via pull_wal_records. Helpful for `lag_records =
    /// last_sent_lsn - last_acked_lsn` to distinguish pull-side delay
    /// from apply-side delay.
    pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
        let now_ms = crate::utils::now_unix_millis() as u128;
        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
            r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
            r.last_seen_at_unix_ms = now_ms;
        }
    }

    /// Snapshot of all currently registered replicas, for /metrics +
    /// /admin/status. Returns owned clones so callers don't hold the
    /// lock during serialization.
    pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
        self.replicas
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }

    pub fn replica_count(&self) -> usize {
        self.replicas
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::{ChangeOperation, ChangeRecord};
    use std::time::{SystemTime, UNIX_EPOCH};

    fn temp_data_path(name: &str) -> PathBuf {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("reddb_{name}_{suffix}.rdb"))
    }

    #[test]
    fn logical_wal_spool_roundtrip_and_prune() {
        let data_path = temp_data_path("logical_spool");
        let spool_path = LogicalWalSpool::path_for(&data_path);
        let spool = LogicalWalSpool::open(&data_path).expect("open spool");

        let record1 = ChangeRecord {
            lsn: 7,
            timestamp: 1,
            operation: ChangeOperation::Insert,
            collection: "users".to_string(),
            entity_id: 10,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: None,
        };
        let record2 = ChangeRecord {
            lsn: 8,
            timestamp: 2,
            operation: ChangeOperation::Update,
            collection: "users".to_string(),
            entity_id: 10,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![4, 5, 6]),
            metadata: None,
        };

        spool
            .append(record1.lsn, &record1.encode())
            .expect("append 1");
        spool
            .append(record2.lsn, &record2.encode())
            .expect("append 2");

        let entries = spool.read_since(0, usize::MAX).expect("read");
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].0, 7);
        assert_eq!(entries[1].0, 8);

        spool.prune_through(7).expect("prune");
        let retained = spool.read_since(0, usize::MAX).expect("read retained");
        assert_eq!(retained.len(), 1);
        assert_eq!(retained[0].0, 8);

        let _ = fs::remove_file(spool_path);
    }

    #[test]
    fn topology_epoch_monotonic_on_register_and_unregister() {
        // Issue #167 acceptance: the epoch consumed by
        // TopologyAdvertiser is strictly monotonic across registry
        // shape changes. Pin it here so a future refactor doesn't
        // accidentally swallow the bump.
        let primary = PrimaryReplication::new(None);
        let e0 = primary.topology_epoch();
        primary.register_replica("r1".to_string());
        let e1 = primary.topology_epoch();
        primary.register_replica("r2".to_string());
        let e2 = primary.topology_epoch();
        assert!(e1 > e0, "register must bump epoch ({e0} -> {e1})");
        assert!(e2 > e1, "second register must bump epoch ({e1} -> {e2})");

        let removed = primary.unregister_replica("r1");
        assert!(removed);
        let e3 = primary.topology_epoch();
        assert!(e3 > e2, "unregister must bump epoch ({e2} -> {e3})");

        // Unknown id is a no-op and does not bump the epoch — keep
        // the monotonicity tied to actual registry shape changes.
        let absent = primary.unregister_replica("ghost");
        assert!(!absent);
        assert_eq!(
            primary.topology_epoch(),
            e3,
            "unregistering a missing replica must not bump the epoch"
        );
    }
}