Skip to main content

reddb_server/replication/
primary.rs

1//! Primary-side replication: WAL record production and snapshot serving.
2//!
3//! The logical WAL spool byte format is a `reddb-file` contract. This module
4//! owns runtime policy: appending after writes, syncing acknowledged records,
5//! serving replica pulls, and pruning once slots make records removable.
6
7use std::collections::{BTreeMap, VecDeque};
8use std::fs::{self, File, OpenOptions};
9use std::io::{self, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Condvar, Mutex, RwLock};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14use reddb_file::{ReplicationSlot, ReplicationSlotInvalidationCause};
15use tracing::warn;
16
17mod slots;
18use slots::{
19    load_replication_slot_catalog, load_replication_slots, persist_replication_slot_catalog,
20    persist_replication_slots,
21};
22
23fn term_from_payload(payload: &[u8]) -> u64 {
24    crate::replication::cdc::ChangeRecord::decode(payload)
25        .map(|record| record.term)
26        .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM)
27}
28
/// Replication WAL records held in memory on the primary.
///
/// The primary pushes each encoded record here after a write and replicas
/// read them back out. Payloads live behind `Arc<[u8]>`, so handing the same
/// record to several replicas bumps a reference count instead of copying the
/// bytes (issue #832).
///
/// Records are kept in append order. Nothing is evicted automatically: the
/// `max_size` given to [`WalBuffer::new`] only pre-sizes the queue, and
/// entries leave the buffer solely through [`WalBuffer::prune_through`].
pub struct WalBuffer {
    /// `(lsn, payload)` pairs in the order they were appended.
    records: RwLock<VecDeque<(u64, Arc<[u8]>)>>,
    /// Highest LSN observed through `append` or `set_current_lsn`.
    current_lsn: RwLock<u64>,
}

impl WalBuffer {
    /// Create an empty buffer whose queue is pre-allocated for `max_size`
    /// records.
    pub fn new(max_size: usize) -> Self {
        let queue = VecDeque::with_capacity(max_size);
        Self {
            records: RwLock::new(queue),
            current_lsn: RwLock::new(0),
        }
    }

    /// Push a record and raise the current LSN to at least `lsn`.
    ///
    /// The records lock stays held while the LSN is raised, so a reader
    /// never sees the current LSN ahead of the queue contents.
    pub fn append(&self, lsn: u64, data: Vec<u8>) {
        let payload: Arc<[u8]> = data.into();
        let mut queue = self.records.write().unwrap_or_else(|p| p.into_inner());
        queue.push_back((lsn, payload));

        let mut latest = self.current_lsn.write().unwrap_or_else(|p| p.into_inner());
        if lsn > *latest {
            *latest = lsn;
        }
    }

    /// Return up to `max_count` records with an LSN strictly greater than
    /// `since_lsn`, each payload copied into its own `Vec<u8>`.
    ///
    /// Meant for callers that need owned bytes (archiving, retention
    /// bookkeeping). Replica fan-out should use [`Self::read_since_shared`],
    /// which avoids the copy.
    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Vec<u8>)> {
        let shared = self.read_since_shared(since_lsn, max_count);
        let mut owned = Vec::with_capacity(shared.len());
        for (lsn, bytes) in shared {
            owned.push((lsn, bytes.to_vec()));
        }
        owned
    }

    /// Return up to `max_count` records with an LSN strictly greater than
    /// `since_lsn`, in append order.
    ///
    /// Only the `Arc` handles are cloned, so serving N replicas never
    /// duplicates the stored bytes (issue #832).
    pub fn read_since_shared(&self, since_lsn: u64, max_count: usize) -> Vec<(u64, Arc<[u8]>)> {
        let queue = self.records.read().unwrap_or_else(|p| p.into_inner());
        let mut batch = Vec::new();
        for (lsn, bytes) in queue.iter() {
            if batch.len() == max_count {
                break;
            }
            if *lsn > since_lsn {
                batch.push((*lsn, Arc::clone(bytes)));
            }
        }
        batch
    }

    /// Highest LSN seen so far (0 for a fresh buffer).
    pub fn current_lsn(&self) -> u64 {
        let latest = self.current_lsn.read().unwrap_or_else(|p| p.into_inner());
        *latest
    }

    /// Raise the current LSN to `lsn`. A lower value is ignored.
    pub fn set_current_lsn(&self, lsn: u64) {
        let mut latest = self.current_lsn.write().unwrap_or_else(|p| p.into_inner());
        if lsn > *latest {
            *latest = lsn;
        }
    }

    /// Drop records from the front of the queue while their LSN is
    /// `<= upto_lsn`.
    ///
    /// Stops at the first record above the cut, even if later records
    /// would also qualify.
    pub fn prune_through(&self, upto_lsn: u64) {
        let mut queue = self.records.write().unwrap_or_else(|p| p.into_inner());
        while queue
            .front()
            .is_some_and(|(lsn, _)| *lsn <= upto_lsn)
        {
            queue.pop_front();
        }
    }

    /// LSN of the oldest buffered record, or `None` when the buffer is
    /// empty. Used for gap detection.
    pub fn oldest_lsn(&self) -> Option<u64> {
        self.records
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .front()
            .map(|&(lsn, _)| lsn)
    }
}
113
114fn logical_wal_entry_term(entry: &reddb_file::LogicalWalEntry) -> u64 {
115    if entry.term == 0 {
116        term_from_payload(&entry.data)
117    } else {
118        entry.term
119    }
120}
121
122fn logical_wal_data_with_framing_term(entry: &reddb_file::LogicalWalEntry) -> Vec<u8> {
123    let term = logical_wal_entry_term(entry);
124    match crate::replication::cdc::ChangeRecord::decode(&entry.data) {
125        Ok(mut record) if record.term != term => {
126            record.term = term;
127            record.encode()
128        }
129        _ => entry.data.clone(),
130    }
131}
132
133/// One in every `SEEK_INDEX_INTERVAL` records is checkpointed into the
134/// spool's in-memory seek index. A briefly-disconnected replica
135/// resuming from its slot LSN binary-searches this sparse index and
136/// seeks straight to the nearest preceding checkpoint, then scans
137/// forward at most `SEEK_INDEX_INTERVAL` records — turning resume from
138/// an O(n) full-file scan into a sub-linear seek (issue #832). The
139/// index is rebuilt on `open` and extended on every `append`.
140#[derive(Debug, Default)]
141struct LogicalWalSpoolState {
142    current_lsn: u64,
143    /// Sparse, strictly LSN-ascending `(lsn, byte_offset)` checkpoints
144    /// into the spool file. `byte_offset` is the start of the record
145    /// whose LSN is `lsn`.
146    seek_index: Vec<(u64, u64)>,
147    /// Byte length of the spool file (offset at which the next append
148    /// lands). Tracked so `append` can record a checkpoint's offset
149    /// without an extra `stat`.
150    write_offset: u64,
151    /// Total records appended/recovered, used to space checkpoints
152    /// `SEEK_INDEX_INTERVAL` records apart.
153    record_count: u64,
154}
155
156impl LogicalWalSpoolState {
157    /// Push a checkpoint for the record at `offset` if it falls on a
158    /// `SEEK_INDEX_INTERVAL` boundary. `ordinal` is the record's
159    /// zero-based position in the spool.
160    fn note_record(&mut self, ordinal: u64, lsn: u64, offset: u64) {
161        if ordinal.is_multiple_of(reddb_file::LOGICAL_WAL_SEEK_INDEX_INTERVAL) {
162            // Keep the index strictly ascending even if LSNs repeat
163            // (they should not, but a defensive guard keeps the binary
164            // search total).
165            if self.seek_index.last().map(|(l, _)| *l) != Some(lsn) {
166                self.seek_index.push((lsn, offset));
167            }
168        }
169    }
170
171    /// Byte offset to start a forward scan from when resuming at
172    /// `since_lsn` (exclusive). Returns the offset of the latest
173    /// checkpoint whose LSN is `<= since_lsn`, or `0` when no such
174    /// checkpoint exists.
175    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
176        match self
177            .seek_index
178            .binary_search_by(|(lsn, _)| lsn.cmp(&since_lsn))
179        {
180            Ok(idx) => self.seek_index[idx].1,
181            Err(0) => 0,
182            Err(idx) => self.seek_index[idx - 1].1,
183        }
184    }
185}
186
187/// Durable append-only logical WAL spool kept beside the main `.rdb` file.
188///
189/// This is not the storage-engine WAL; it is a structured replication/PITR log.
190pub struct LogicalWalSpool {
191    path: PathBuf,
192    state: Mutex<LogicalWalSpoolState>,
193}
194
195impl LogicalWalSpool {
196    pub fn path_for(data_path: &Path) -> PathBuf {
197        reddb_file::layout::logical_wal_path(data_path)
198    }
199
200    pub fn open(data_path: &Path) -> io::Result<Self> {
201        let path = Self::path_for(data_path);
202        if let Some(parent) = path.parent() {
203            fs::create_dir_all(parent)?;
204        }
205        if !path.exists() {
206            File::create(&path)?;
207        }
208        // Recover-or-truncate to the longest valid prefix. A torn tail
209        // from the previous process exit (power loss, OOM kill, ENOSPC
210        // mid-write) is silently dropped; the warning surfaces to the
211        // operator log but the spool stays open.
212        let entries = reddb_file::read_and_repair_logical_wal_entries(&path)?;
213        let current_lsn = entries.last().map(|entry| entry.lsn).unwrap_or(0);
214        // Rebuild the sparse seek index from the (now repaired) file so
215        // a post-restart resume is sub-linear from the first pull.
216        let (seek_index, write_offset, record_count) =
217            reddb_file::build_logical_wal_seek_index(&path)?;
218        Ok(Self {
219            path,
220            state: Mutex::new(LogicalWalSpoolState {
221                current_lsn,
222                seek_index,
223                write_offset,
224                record_count,
225            }),
226        })
227    }
228
229    pub fn append(&self, lsn: u64, data: &[u8]) -> io::Result<()> {
230        let timestamp_ms = SystemTime::now()
231            .duration_since(UNIX_EPOCH)
232            .map(|d| d.as_millis() as u64)
233            .unwrap_or(0);
234        self.append_with_timestamp(lsn, timestamp_ms, data)
235    }
236
237    /// Append a record with an explicit framing timestamp. Used in
238    /// tests to produce deterministic timestamps; production callers
239    /// should use `append`.
240    pub fn append_with_timestamp(
241        &self,
242        lsn: u64,
243        timestamp_ms: u64,
244        data: &[u8],
245    ) -> io::Result<()> {
246        self.append_with_term_and_timestamp(term_from_payload(data), lsn, timestamp_ms, data)
247    }
248
249    pub fn append_with_term_and_timestamp(
250        &self,
251        term: u64,
252        lsn: u64,
253        timestamp_ms: u64,
254        data: &[u8],
255    ) -> io::Result<()> {
256        let mut file = OpenOptions::new()
257            .create(true)
258            .append(true)
259            .open(&self.path)?;
260        // Pre-build the record in memory so a single write_all keeps
261        // the on-disk record contiguous. Two side-effects:
262        //   (a) crash recovery sees either a complete record or a torn
263        //       header, never an interleaved partial frame from two
264        //       writers (the spool is not multi-writer today, but the
265        //       single-write semantics make that future-safe);
266        //   (b) crc32 is computed exactly once over the same bytes the
267        //       reader will checksum, with zero risk of header/payload
268        //       drift from a partial flush.
269        let frame = reddb_file::encode_logical_wal_v3(term, lsn, timestamp_ms, data)?;
270
271        file.write_all(&frame)?;
272        // PLAN.md Phase 2 mandates `sync_all` for logical WAL durability.
273        // `flush()` only drains the std::io userspace buffer; without
274        // `sync_all` the kernel page cache may still be dirty when an
275        // acknowledged write supposedly committed.
276        file.sync_all()?;
277
278        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
279        state.current_lsn = state.current_lsn.max(lsn);
280        // The record we just wrote starts at the prior end-of-file.
281        // Checkpoint it into the seek index if it lands on an interval
282        // boundary, then advance the tracked write offset.
283        let record_start = state.write_offset;
284        let ordinal = state.record_count;
285        state.note_record(ordinal, lsn, record_start);
286        state.write_offset = record_start + frame.len() as u64;
287        state.record_count = ordinal + 1;
288        Ok(())
289    }
290
291    pub fn read_since(&self, since_lsn: u64, max_count: usize) -> io::Result<Vec<(u64, Vec<u8>)>> {
292        // Seek straight to the nearest indexed checkpoint at or before
293        // `since_lsn` instead of scanning the whole spool from offset 0
294        // (issue #832). The file was already repaired on `open`, so the
295        // forward scan from the checkpoint is non-repairing and simply
296        // stops at the first torn tail (left for the next `open` to fix).
297        let start_offset = {
298            let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
299            state.seek_floor_offset(since_lsn)
300        };
301        let entries = reddb_file::read_logical_wal_entries_from(&self.path, start_offset)?;
302        Ok(entries
303            .into_iter()
304            .filter(|entry| entry.lsn > since_lsn)
305            .take(max_count)
306            .map(|entry| (entry.lsn, logical_wal_data_with_framing_term(&entry)))
307            .collect())
308    }
309
310    /// Byte offset a resume at `since_lsn` would seek to before
311    /// forward-scanning. Exposed for tests asserting the resume is
312    /// sub-linear (starts past offset 0 for a mid-spool LSN).
313    #[cfg(test)]
314    fn seek_floor_offset(&self, since_lsn: u64) -> u64 {
315        self.state
316            .lock()
317            .unwrap_or_else(|e| e.into_inner())
318            .seek_floor_offset(since_lsn)
319    }
320
321    pub fn current_lsn(&self) -> u64 {
322        self.state
323            .lock()
324            .unwrap_or_else(|e| e.into_inner())
325            .current_lsn
326    }
327
328    pub fn oldest_lsn(&self) -> io::Result<Option<u64>> {
329        Ok(reddb_file::read_and_repair_logical_wal_entries(&self.path)?
330            .into_iter()
331            .next()
332            .map(|entry| entry.lsn))
333    }
334
335    pub fn prune_through(&self, upto_lsn: u64) -> io::Result<()> {
336        let previous_lsn = self.current_lsn();
337        let mut retained: Vec<_> = reddb_file::read_and_repair_logical_wal_entries(&self.path)?
338            .into_iter()
339            .filter(|entry| entry.lsn > upto_lsn)
340            .collect();
341        for entry in &mut retained {
342            entry.term = logical_wal_entry_term(entry);
343        }
344        let temp_path = reddb_file::layout::logical_wal_temp_path(&self.path);
345        for entry in &mut retained {
346            // Re-frame as v3 so the spool only ever contains current records
347            // after a prune. Legacy v1 records are upgraded by carrying
348            // their original LSN and default term forward; the framing timestamp is
349            // re-stamped to wall-clock-now because the original v1
350            // record didn't carry one — downstream consumers that need
351            // the operation's logical timestamp continue to use the
352            // payload's own ChangeRecord::timestamp field.
353            let timestamp_ms = if entry.timestamp_ms > 0 {
354                entry.timestamp_ms
355            } else {
356                SystemTime::now()
357                    .duration_since(UNIX_EPOCH)
358                    .map(|d| d.as_millis() as u64)
359                    .unwrap_or(0)
360            };
361            entry.timestamp_ms = timestamp_ms;
362        }
363        let current_lsn =
364            reddb_file::rewrite_logical_wal_entries(&self.path, &temp_path, &retained)?;
365
366        // The rewrite shifted every record's byte offset, so the old
367        // seek index is stale — rebuild it from the compacted file.
368        let (seek_index, write_offset, record_count) =
369            reddb_file::build_logical_wal_seek_index(&self.path)?;
370        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
371        state.current_lsn = previous_lsn.max(current_lsn).max(upto_lsn);
372        state.seek_index = seek_index;
373        state.write_offset = write_offset;
374        state.record_count = record_count;
375        Ok(())
376    }
377}
378
/// Registry entry for one replica, as tracked by the primary.
///
/// Progress fields follow PLAN.md Phase 11.4:
/// - `last_seen_at_unix_ms` is refreshed on every interaction (pull or ack);
/// - `last_sent_lsn` advances when the primary serves a `pull_wal_records`
///   batch;
/// - `last_durable_lsn` advances when the replica reports, via
///   `ack_replica_lsn`, that its WAL is durably written.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    /// Identifier the replica registered under.
    pub id: String,
    /// Highest LSN the replica has acknowledged.
    pub last_acked_lsn: u64,
    /// Highest LSN the primary has sent to this replica.
    pub last_sent_lsn: u64,
    /// Highest LSN the replica reports as durably written.
    pub last_durable_lsn: u64,
    /// Number of apply errors attributed to this replica.
    pub apply_error_count: u64,
    /// Number of divergences detected for this replica.
    pub divergence_count: u64,
    /// Unix-millisecond timestamp of the first registration.
    pub connected_at_unix_ms: u128,
    /// Unix-millisecond timestamp of the latest interaction.
    pub last_seen_at_unix_ms: u128,
    /// Region declared by the replica at handshake time (Phase 2.6
    /// multi-region PG parity). Stays `None` until the handshake extension
    /// lands in 2.6.2; until then the quorum coordinator's region-binding
    /// map covers the in-process case.
    pub region: Option<String>,
    /// Set while the replica is re-bootstrapping, i.e. loading a fresh
    /// snapshot to replace its dataset (issue #837). Non-causal reads keep
    /// being served from the old data. The advertiser surfaces the flag so
    /// causal (bookmark) readers route elsewhere, because `last_acked_lsn`
    /// describes data the replica is about to discard. Cleared atomically
    /// when the swap completes.
    pub rebootstrapping: bool,
}

/// Fleet-wide replication progress derived from the replica registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationProgress {
    /// Highest sent LSN minus the lowest acknowledged LSN, floored at 0.
    pub lag_lsn: u64,
    /// Lowest durable LSN across all replicas.
    pub safe_replay_lsn: u64,
}

impl ReplicationProgress {
    /// Summarize `replicas` in a single pass.
    ///
    /// Returns `None` when the registry is empty.
    pub fn from_replicas(replicas: &[ReplicaState]) -> Option<Self> {
        let (head, tail) = replicas.split_first()?;
        let seed = (head.last_sent_lsn, head.last_acked_lsn, head.last_durable_lsn);
        let (max_sent, min_acked, min_durable) =
            tail.iter().fold(seed, |(sent, acked, durable), replica| {
                (
                    sent.max(replica.last_sent_lsn),
                    acked.min(replica.last_acked_lsn),
                    durable.min(replica.last_durable_lsn),
                )
            });
        Some(Self {
            lag_lsn: max_sent.saturating_sub(min_acked),
            safe_replay_lsn: min_durable,
        })
    }
}
433
434/// Primary replication manager.
435pub struct PrimaryReplication {
436    pub wal_buffer: Arc<WalBuffer>,
437    pub logical_wal_spool: Option<Arc<LogicalWalSpool>>,
438    pub replicas: RwLock<Vec<ReplicaState>>,
439    wal_appended: (Mutex<u64>, Condvar),
440    slot_path: Option<PathBuf>,
441    slot_catalog_path: Option<PathBuf>,
442    primary_replica_file_plan: Option<reddb_file::PrimaryReplicaFilePlan>,
443    primary_replica_wal_lock: Mutex<()>,
444    slots: RwLock<BTreeMap<String, ReplicationSlot>>,
445    slot_retention_max_lag_lsn: u64,
446    slot_idle_timeout_ms: u64,
447    /// PLAN.md Phase 11.4 — ack-driven commit synchronization. Always
448    /// allocated so the policy enum can flip from `Local` to
449    /// `AckN`/`Quorum` without touching this struct's shape.
450    pub commit_waiter: Arc<crate::replication::commit_waiter::CommitWaiter>,
451    /// Monotonic registry-change counter consumed by the
452    /// `TopologyAdvertiser` (issue #167). Bumps on register,
453    /// unregister, and the periodic health sweep when a replica
454    /// flips between healthy/unhealthy. Clients use the epoch to
455    /// detect stale advertisements without comparing the full
456    /// replica list element-wise.
457    topology_epoch: std::sync::atomic::AtomicU64,
458    /// Count of pulls served as a partial resync — a replica resuming
459    /// incrementally from its retained slot position rather than
460    /// triggering a full re-bootstrap (issue #832). Surfaced as a
461    /// replication metric so a brief disconnect that recovers via
462    /// partial resync is observable.
463    partial_resync_count: std::sync::atomic::AtomicU64,
464    /// Count of pulls that forced a full re-bootstrap — the replica's
465    /// retained WAL no longer covers its requested position, so it must
466    /// discard its dataset and reload a fresh snapshot (issue #839).
467    /// This is the primary alert signal: a healthy cluster re-bootstraps
468    /// rarely, so any sustained rise means slots are being invalidated
469    /// faster than replicas can keep up.
470    full_resync_count: std::sync::atomic::AtomicU64,
471}
472
473/// How a replica's pull should be served, decided from its slot state.
474#[derive(Debug, Clone, Copy, PartialEq, Eq)]
475pub enum ResumeMode {
476    /// Resume incrementally from `resume_lsn` (the replica's slot
477    /// position, never behind it). The retained WAL still covers the
478    /// gap, so a brief disconnect costs only a partial resync.
479    PartialResync { resume_lsn: u64 },
480    /// The slot is past the retention cap (or otherwise invalidated);
481    /// the replica must discard and re-bootstrap from a fresh snapshot.
482    FullRebootstrap {
483        cause: ReplicationSlotInvalidationCause,
484    },
485}
486
487impl PrimaryReplication {
488    pub fn slot_path_for(data_path: &Path) -> PathBuf {
489        reddb_file::layout::legacy_logical_slots_path(data_path)
490    }
491
492    pub fn primary_replica_root_for(data_path: &Path) -> PathBuf {
493        reddb_file::layout::primary_replica_root(data_path)
494    }
495
496    pub fn slot_catalog_path_for(data_path: &Path) -> PathBuf {
497        Self::primary_replica_file_plan_for(data_path).slots_path()
498    }
499
500    fn primary_replica_file_plan_for(data_path: &Path) -> reddb_file::PrimaryReplicaFilePlan {
501        let root = Self::primary_replica_root_for(data_path);
502        let timeline =
503            Self::primary_replica_current_timeline_for_root(&root).unwrap_or_else(|err| {
504                warn!(
505                    target: "reddb::replication::primary",
506                    error = %err,
507                    "failed to read primary-replica timeline history; using initial timeline"
508                );
509                reddb_file::TimelineId::initial()
510            });
511        reddb_file::PrimaryReplicaFilePlan::new(root, timeline)
512    }
513
514    fn primary_replica_current_timeline_for_root(
515        root: &Path,
516    ) -> Result<reddb_file::TimelineId, reddb_file::RdbFileError> {
517        let path = reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
518            .timeline_history_path();
519        match reddb_file::TimelineHistory::read_from_path(&path) {
520            Ok(history) => Ok(history
521                .current()
522                .unwrap_or_else(reddb_file::TimelineId::initial)),
523            Err(reddb_file::RdbFileError::Io(err))
524                if err.kind() == std::io::ErrorKind::NotFound =>
525            {
526                Ok(reddb_file::TimelineId::initial())
527            }
528            Err(err) => Err(err),
529        }
530    }
531
532    pub fn new(data_path: Option<&Path>) -> Self {
533        Self::new_with_config(data_path, &crate::replication::ReplicationConfig::primary())
534    }
535
536    pub fn new_with_config(
537        data_path: Option<&Path>,
538        config: &crate::replication::ReplicationConfig,
539    ) -> Self {
540        let now_ms = crate::utils::now_unix_millis() as u128;
541        let slot_path = data_path.map(Self::slot_path_for);
542        let slot_catalog_path = data_path.map(Self::slot_catalog_path_for);
543        let primary_replica_file_plan = data_path.map(Self::primary_replica_file_plan_for);
544        let mut slots = load_replication_slot_catalog(slot_catalog_path.as_deref(), now_ms);
545        slots.extend(load_replication_slots(slot_path.as_deref(), now_ms));
546        let logical_wal_spool = data_path
547            .and_then(|path| LogicalWalSpool::open(path).ok())
548            .map(Arc::new);
549        let current_lsn = logical_wal_spool
550            .as_ref()
551            .map(|spool| spool.current_lsn())
552            .unwrap_or(0);
553        Self {
554            wal_buffer: Arc::new(WalBuffer::new(100_000)),
555            logical_wal_spool,
556            replicas: RwLock::new(Vec::new()),
557            wal_appended: (Mutex::new(current_lsn), Condvar::new()),
558            slot_path,
559            slot_catalog_path,
560            primary_replica_file_plan,
561            primary_replica_wal_lock: Mutex::new(()),
562            slots: RwLock::new(slots),
563            slot_retention_max_lag_lsn: config.slot_retention_max_lag_lsn,
564            slot_idle_timeout_ms: config.slot_idle_timeout_ms,
565            commit_waiter: Arc::new(crate::replication::commit_waiter::CommitWaiter::new()),
566            topology_epoch: std::sync::atomic::AtomicU64::new(0),
567            partial_resync_count: std::sync::atomic::AtomicU64::new(0),
568            full_resync_count: std::sync::atomic::AtomicU64::new(0),
569        }
570    }
571
572    pub fn append_logical_record(&self, lsn: u64, encoded: Vec<u8>) {
573        self.wal_buffer.append(lsn, encoded.clone());
574        if let Some(spool) = &self.logical_wal_spool {
575            let _ = spool.append(lsn, &encoded);
576        }
577        if let Some(plan) = &self.primary_replica_file_plan {
578            let _guard = self
579                .primary_replica_wal_lock
580                .lock()
581                .unwrap_or_else(|err| err.into_inner());
582            match Self::primary_replica_current_timeline_for_root(&plan.root) {
583                Ok(timeline) => {
584                    let plan = reddb_file::PrimaryReplicaFilePlan::new(plan.root.clone(), timeline);
585                    if let Err(err) = plan.append_wal_record(lsn, &encoded) {
586                        warn!(
587                            target: "reddb::replication::primary",
588                            lsn,
589                            error = %err,
590                            "failed to append primary-replica WAL segment"
591                        );
592                    }
593                }
594                Err(err) => {
595                    warn!(
596                        target: "reddb::replication::primary",
597                        lsn,
598                        error = %err,
599                        "failed to read primary-replica timeline history; skipping WAL append"
600                    );
601                }
602            }
603        }
604        let (lock, cvar) = &self.wal_appended;
605        let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
606        *latest = (*latest).max(lsn);
607        cvar.notify_all();
608    }
609
610    pub fn wait_for_logical_lsn_after(&self, since_lsn: u64, timeout: Duration) -> bool {
611        if self.current_logical_lsn() > since_lsn {
612            return true;
613        }
614        let deadline = Instant::now() + timeout;
615        let (lock, cvar) = &self.wal_appended;
616        let mut latest = lock.lock().unwrap_or_else(|e| e.into_inner());
617        while *latest <= since_lsn {
618            let now = Instant::now();
619            if now >= deadline {
620                return false;
621            }
622            let remaining = deadline.saturating_duration_since(now);
623            let (guard, result) = cvar
624                .wait_timeout(latest, remaining)
625                .unwrap_or_else(|e| e.into_inner());
626            latest = guard;
627            if result.timed_out() && *latest <= since_lsn {
628                return false;
629            }
630        }
631        true
632    }
633
634    pub fn register_replica(&self, id: String) -> u64 {
635        self.register_replica_with_region(id, None)
636    }
637
638    /// Register a replica with an explicit region tag (Phase 2.6 multi-region).
639    ///
640    /// Preferred when the replica handshake declares a region — the quorum
641    /// coordinator uses this field to decide whether the replica counts
642    /// toward a `QuorumMode::Regions` commit.
643    ///
644    /// Idempotent on reconnect (issue #812): if a replica with `id` is
645    /// already registered, the existing entry is *updated in place* rather
646    /// than duplicated — progress LSNs (`last_acked_lsn`, `last_sent_lsn`,
647    /// `last_durable_lsn`) are preserved so a reconnecting replica is not
648    /// rewound, only `last_seen_at_unix_ms` is refreshed (and `region` when
649    /// a non-`None` value is supplied). A re-registration is not a
650    /// registry-shape change, so it does **not** bump the topology epoch.
651    /// Returns the slot `restart_lsn` the replica should resume streaming from:
652    /// the current WAL LSN for a fresh registration, or the durable slot
653    /// restart point for a reconnect.
654    pub fn register_replica_with_region(&self, id: String, region: Option<String>) -> u64 {
655        let now_ms = crate::utils::now_unix_millis() as u128;
656        let resume_lsn = self.ensure_slot(&id, self.current_logical_lsn());
657        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
658        if let Some(existing) = replicas.iter_mut().find(|r| r.id == id) {
659            existing.last_seen_at_unix_ms = now_ms;
660            if region.is_some() {
661                existing.region = region;
662            }
663            return resume_lsn;
664        }
665        replicas.push(ReplicaState {
666            id,
667            last_acked_lsn: resume_lsn,
668            last_sent_lsn: resume_lsn,
669            last_durable_lsn: resume_lsn,
670            apply_error_count: 0,
671            divergence_count: 0,
672            connected_at_unix_ms: now_ms,
673            last_seen_at_unix_ms: now_ms,
674            region,
675            rebootstrapping: false,
676        });
677        drop(replicas);
678        self.bump_topology_epoch();
679        resume_lsn
680    }
681
682    /// Mark (or clear) a replica's re-bootstrap state (issue #837).
683    ///
684    /// While `rebootstrapping` is `true` the replica keeps serving
685    /// non-causal reads from its existing data, but the advertiser
686    /// surfaces the flag so causal (bookmark) reads route to a
687    /// caught-up peer instead — the rebuilding replica's applied
688    /// frontier describes data it is about to discard. The primary
689    /// flips this back to `false` when the replica reports its atomic
690    /// snapshot swap complete.
691    ///
692    /// A change to the flag is a registry-shape change for routing
693    /// purposes, so it bumps the topology epoch to force consumers to
694    /// re-read the advertisement. Returns `true` when a replica with
695    /// `id` was present and updated.
696    pub fn set_replica_rebootstrapping(&self, id: &str, rebootstrapping: bool) -> bool {
697        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
698        let Some(state) = replicas.iter_mut().find(|r| r.id == id) else {
699            return false;
700        };
701        if state.rebootstrapping == rebootstrapping {
702            return true;
703        }
704        state.rebootstrapping = rebootstrapping;
705        drop(replicas);
706        self.bump_topology_epoch();
707        true
708    }
709
710    /// Ensure a replica identifying itself with `id` is present in the
711    /// registry (issue #812). This is the production self-registration hook
712    /// used by the `pull_wal_records` path: the first time a replica sends
713    /// its `replica_id` on a pull, the primary registers it so it is no
714    /// longer blind to that replica's existence; subsequent pulls are
715    /// idempotent no-ops. Returns `true` when a new registration was
716    /// created. Delegates to `register_replica_with_region`, so reconnects
717    /// preserve progress and do not bump the topology epoch.
718    pub fn ensure_replica_registered(&self, id: &str) -> bool {
719        let already = self
720            .replicas
721            .read()
722            .unwrap_or_else(|e| e.into_inner())
723            .iter()
724            .any(|r| r.id == id);
725        if already {
726            return false;
727        }
728        self.register_replica(id.to_string());
729        true
730    }
731
732    /// Unregister a replica by id. Returns `true` when the replica
733    /// was present (and removed). Bumps the topology epoch so a
734    /// pending advertisement reflects the new fleet size.
735    pub fn unregister_replica(&self, id: &str) -> bool {
736        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
737        let before = replicas.len();
738        replicas.retain(|r| r.id != id);
739        let removed = replicas.len() != before;
740        drop(replicas);
741        if removed {
742            self.commit_waiter.drop_replica(id);
743            self.bump_topology_epoch();
744        }
745        removed
746    }
747
748    /// Current topology epoch. Strictly monotonic, bumps on every
749    /// registry-shape change consumed by `TopologyAdvertiser`.
750    pub fn topology_epoch(&self) -> u64 {
751        self.topology_epoch
752            .load(std::sync::atomic::Ordering::Relaxed)
753    }
754
755    /// Advance the topology epoch. Call sites: register, unregister,
756    /// and the health-sweep tick that flips a replica between
757    /// healthy/unhealthy. Wrapping is not a concern in practice
758    /// (`u64::MAX` events would take centuries at any realistic ack
759    /// rate) but `fetch_add` saturates implicitly via wrap-around;
760    /// the consumer treats epoch as opaque so a wrap is still
761    /// strictly "different" from the previous value.
762    pub fn bump_topology_epoch(&self) {
763        self.topology_epoch
764            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
765    }
766
767    pub fn ack_replica(&self, id: &str, lsn: u64) {
768        let now_ms = crate::utils::now_unix_millis() as u128;
769        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
770        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
771            r.last_acked_lsn = r.last_acked_lsn.max(lsn);
772            r.last_durable_lsn = r.last_durable_lsn.max(lsn);
773            r.last_seen_at_unix_ms = now_ms;
774        }
775        drop(replicas);
776        self.commit_waiter.record_replica_ack(id, lsn);
777    }
778
779    /// PLAN.md Phase 11.4 — replica reports applied + durable LSN
780    /// after persisting a batch. Idempotent: only advances LSNs
781    /// monotonically. `last_seen_at_unix_ms` always refreshes.
782    /// Also signals `commit_waiter` so any thread blocked on
783    /// `ack_n` / `quorum` can wake and re-check its threshold.
784    pub fn ack_replica_lsn(&self, id: &str, applied_lsn: u64, durable_lsn: u64) {
785        self.ack_replica_lsn_with_observability(id, applied_lsn, durable_lsn, 0, 0);
786    }
787
788    pub fn ack_replica_lsn_with_observability(
789        &self,
790        id: &str,
791        applied_lsn: u64,
792        durable_lsn: u64,
793        apply_error_count: u64,
794        divergence_count: u64,
795    ) {
796        let now_ms = crate::utils::now_unix_millis() as u128;
797        self.advance_slot(id, applied_lsn, durable_lsn, now_ms);
798        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
799        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
800            r.last_acked_lsn = r.last_acked_lsn.max(applied_lsn);
801            r.last_durable_lsn = r.last_durable_lsn.max(durable_lsn);
802            r.apply_error_count = r.apply_error_count.max(apply_error_count);
803            r.divergence_count = r.divergence_count.max(divergence_count);
804            r.last_seen_at_unix_ms = now_ms;
805        }
806        // Drop the write lock before signaling so a waiter that
807        // wakes immediately can read replica state without
808        // contending against us.
809        drop(replicas);
810        self.commit_waiter.record_replica_ack(id, durable_lsn);
811    }
812
813    /// PLAN.md Phase 11.4 — primary records the LSN it last sent to a
814    /// replica via pull_wal_records. Helpful for `lag_records =
815    /// last_sent_lsn - last_acked_lsn` to distinguish pull-side delay
816    /// from apply-side delay.
817    pub fn note_replica_pull(&self, id: &str, last_sent_lsn: u64) {
818        let now_ms = crate::utils::now_unix_millis() as u128;
819        self.touch_slot(id, now_ms);
820        let mut replicas = self.replicas.write().unwrap_or_else(|e| e.into_inner());
821        if let Some(r) = replicas.iter_mut().find(|r| r.id == id) {
822            r.last_sent_lsn = r.last_sent_lsn.max(last_sent_lsn);
823            r.last_seen_at_unix_ms = now_ms;
824        }
825    }
826
827    /// Snapshot of all currently registered replicas, for /metrics +
828    /// /admin/status. Returns owned clones so callers don't hold the
829    /// lock during serialization.
830    pub fn replica_snapshots(&self) -> Vec<ReplicaState> {
831        self.replicas
832            .read()
833            .unwrap_or_else(|e| e.into_inner())
834            .clone()
835    }
836
837    pub fn replication_progress(&self) -> Option<ReplicationProgress> {
838        let replicas = self.replicas.read().unwrap_or_else(|e| e.into_inner());
839        ReplicationProgress::from_replicas(&replicas)
840    }
841
842    pub fn slot_snapshots(&self) -> Vec<ReplicationSlot> {
843        self.slots
844            .read()
845            .unwrap_or_else(|e| e.into_inner())
846            .values()
847            .cloned()
848            .collect()
849    }
850
851    pub fn retention_floor_lsn(&self) -> Option<u64> {
852        self.slots
853            .read()
854            .unwrap_or_else(|e| e.into_inner())
855            .values()
856            .filter(|slot| slot.invalidation_reason.is_none())
857            .map(|slot| slot.restart_lsn)
858            .min()
859    }
860
861    pub fn prune_retained_wal_through(&self, archived_lsn: u64) -> io::Result<u64> {
862        self.enforce_retention_limits(crate::utils::now_unix_millis() as u128);
863        let prune_lsn = self
864            .retention_floor_lsn()
865            .map(|floor| floor.min(archived_lsn))
866            .unwrap_or(archived_lsn);
867        if prune_lsn > 0 {
868            if let Some(spool) = &self.logical_wal_spool {
869                spool.prune_through(prune_lsn)?;
870            }
871            self.wal_buffer.prune_through(prune_lsn);
872        }
873        Ok(prune_lsn)
874    }
875
876    pub fn replica_count(&self) -> usize {
877        self.replicas
878            .read()
879            .unwrap_or_else(|e| e.into_inner())
880            .len()
881    }
882
883    /// Current primary write position (logical WAL LSN, falling back to
884    /// the in-memory WAL buffer). Used as the reference point for
885    /// per-replica lag — including issue #826 flow control.
886    pub fn current_logical_lsn(&self) -> u64 {
887        self.logical_wal_spool
888            .as_ref()
889            .map(|spool| spool.current_lsn())
890            .unwrap_or_else(|| self.wal_buffer.current_lsn())
891    }
892
893    fn ensure_slot(&self, id: &str, initial_lsn: u64) -> u64 {
894        let now_ms = crate::utils::now_unix_millis() as u128;
895        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
896        if let Some(slot) = slots.get_mut(id) {
897            slot.last_seen_at_unix_ms = now_ms;
898            let restart_lsn = slot.restart_lsn;
899            self.persist_slots_locked(&slots);
900            return restart_lsn;
901        }
902        let mut slot = ReplicationSlot::new(
903            id.to_string(),
904            reddb_file::TimelineId::initial(),
905            initial_lsn,
906        );
907        slot.last_seen_at_unix_ms = now_ms;
908        slots.insert(id.to_string(), slot);
909        let restart_lsn = initial_lsn;
910        self.persist_slots_locked(&slots);
911        restart_lsn
912    }
913
914    fn advance_slot(&self, id: &str, confirmed_lsn: u64, restart_lsn: u64, now_ms: u128) {
915        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
916        let slot = slots.entry(id.to_string()).or_insert_with(|| {
917            let mut slot =
918                ReplicationSlot::new(id.to_string(), reddb_file::TimelineId::initial(), 0);
919            slot.last_seen_at_unix_ms = now_ms;
920            slot
921        });
922        if slot.invalidation_reason.is_some() {
923            return;
924        }
925        slot.confirmed_write_lsn = slot.confirmed_lsn().max(confirmed_lsn).max(restart_lsn);
926        slot.restart_lsn = slot.restart_lsn.max(restart_lsn);
927        slot.confirmed_flush_lsn = slot.confirmed_flush_lsn.max(slot.restart_lsn);
928        slot.confirmed_apply_lsn = slot.confirmed_apply_lsn.max(slot.restart_lsn);
929        slot.last_seen_at_unix_ms = now_ms;
930        self.persist_slots_locked(&slots);
931    }
932
933    pub fn touch_slot(&self, id: &str, now_ms: u128) {
934        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
935        let mut changed = false;
936        if let Some(slot) = slots.get_mut(id) {
937            if slot.invalidation_reason.is_none() {
938                slot.last_seen_at_unix_ms = now_ms;
939                changed = true;
940            }
941        }
942        if changed {
943            self.persist_slots_locked(&slots);
944        }
945    }
946
947    pub fn enforce_retention_limits(
948        &self,
949        now_ms: u128,
950    ) -> Vec<(String, ReplicationSlotInvalidationCause)> {
951        let current_lsn = self.current_logical_lsn();
952        let mut invalidated = Vec::new();
953        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
954        for slot in slots.values_mut() {
955            if slot.invalidation_reason.is_some() {
956                continue;
957            }
958            let reason = if self.slot_retention_max_lag_lsn > 0
959                && current_lsn.saturating_sub(slot.restart_lsn) > self.slot_retention_max_lag_lsn
960            {
961                Some(ReplicationSlotInvalidationCause::Horizon)
962            } else if self.slot_idle_timeout_ms > 0
963                && now_ms.saturating_sub(slot.last_seen_at_unix_ms)
964                    > u128::from(self.slot_idle_timeout_ms)
965            {
966                Some(ReplicationSlotInvalidationCause::IdleTimeout)
967            } else {
968                None
969            };
970            if let Some(reason) = reason {
971                slot.invalidation_reason = Some(reason);
972                slot.invalidated_at_unix_ms = Some(now_ms);
973                invalidated.push((slot.replica_id.clone(), reason));
974            }
975        }
976        if !invalidated.is_empty() {
977            self.persist_slots_locked(&slots);
978        }
979        invalidated
980    }
981
982    pub fn slot_rebootstrap_reason(
983        &self,
984        id: &str,
985        requested_since_lsn: u64,
986        oldest_available_lsn: Option<u64>,
987    ) -> Option<ReplicationSlotInvalidationCause> {
988        let now_ms = crate::utils::now_unix_millis() as u128;
989        let mut slots = self.slots.write().unwrap_or_else(|e| e.into_inner());
990        let slot = slots.get_mut(id)?;
991        if let Some(reason) = slot.invalidation_reason {
992            return Some(reason);
993        }
994        let slot_floor = slot.restart_lsn.max(requested_since_lsn);
995        if oldest_available_lsn
996            .map(|oldest| oldest > slot_floor.saturating_add(1))
997            .unwrap_or(false)
998        {
999            slot.invalidation_reason = Some(ReplicationSlotInvalidationCause::WalRemoved);
1000            slot.invalidated_at_unix_ms = Some(now_ms);
1001            self.persist_slots_locked(&slots);
1002            return Some(ReplicationSlotInvalidationCause::WalRemoved);
1003        }
1004        None
1005    }
1006
1007    /// Decide how a reconnecting replica's pull should be served
1008    /// (issue #832). If the slot is invalidated or the requested
1009    /// position has fallen behind the retained WAL floor, the replica
1010    /// must re-bootstrap; otherwise it resumes via a partial resync
1011    /// from its slot position (never rewound behind it). Every
1012    /// partial-resync decision bumps the `partial_resync_count` metric
1013    /// so a brief disconnect that recovers without a full re-bootstrap
1014    /// is observable.
1015    pub fn plan_replica_resume(
1016        &self,
1017        id: &str,
1018        requested_since_lsn: u64,
1019        oldest_available_lsn: Option<u64>,
1020    ) -> ResumeMode {
1021        if let Some(cause) =
1022            self.slot_rebootstrap_reason(id, requested_since_lsn, oldest_available_lsn)
1023        {
1024            self.full_resync_count
1025                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1026            return ResumeMode::FullRebootstrap { cause };
1027        }
1028        let resume_lsn = self
1029            .slot_snapshots()
1030            .into_iter()
1031            .find(|slot| slot.replica_id == id)
1032            .map(|slot| requested_since_lsn.max(slot.restart_lsn))
1033            .unwrap_or(requested_since_lsn);
1034        self.partial_resync_count
1035            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1036        ResumeMode::PartialResync { resume_lsn }
1037    }
1038
1039    /// Number of pulls served as a partial resync since process start.
1040    /// Surfaced in the replication metrics/status payload (issue #832).
1041    pub fn partial_resync_count(&self) -> u64 {
1042        self.partial_resync_count
1043            .load(std::sync::atomic::Ordering::Relaxed)
1044    }
1045
1046    /// Number of pulls that forced a full re-bootstrap since process
1047    /// start (issue #839). Surfaced as `reddb_replication_full_resync_total`
1048    /// and in `/replication/status` — the primary operator alert signal.
1049    pub fn full_resync_count(&self) -> u64 {
1050        self.full_resync_count
1051            .load(std::sync::atomic::Ordering::Relaxed)
1052    }
1053
1054    fn persist_slots_locked(&self, slots: &BTreeMap<String, ReplicationSlot>) {
1055        if let Err(err) = persist_replication_slots(self.slot_path.as_deref(), slots) {
1056            warn!(
1057                target: "reddb::replication::slots",
1058                error = %err,
1059                "failed to persist replication slots"
1060            );
1061        }
1062        if let Err(err) = persist_replication_slot_catalog(self.slot_catalog_path.as_deref(), slots)
1063        {
1064            warn!(
1065                target: "reddb::replication::slots",
1066                error = %err,
1067                "failed to persist binary replication slot catalog"
1068            );
1069        }
1070    }
1071}
1072
1073#[cfg(test)]
1074mod tests;