Skip to main content

reddb_server/replication/
logical.rs

1//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{
9    change_record_from_entity, wire_json_to_server_json, ChangeOperation, ChangeRecord,
10    RangeAdmitError, RangeAuthority,
11};
12use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
13
/// Which caller is driving the apply: the live replica apply loop or a
/// point-in-time restore.
///
/// NOTE(review): `LogicalChangeApplier::apply_record_with_metrics` binds this
/// value as `_mode` and never reads it, so within this file both modes apply
/// records identically.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// The record is applied on behalf of replica replay.
    Replica,
    /// The record is applied on behalf of point-in-time restore.
    Restore,
}
19
/// PLAN.md Phase 11.5 — one monotonically increasing counter per
/// `ApplyErrorKind`, bumped by the replica apply loop when an invariant
/// breaks and surfaced via `reddb_replica_apply_errors_total`.
///
/// Decode errors are not strictly apply errors, but they live in the same
/// observability lane so dashboards alert on "replica is ingesting trash
/// from primary regardless of cause".
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Count of `ApplyErrorKind::Gap` events.
    pub gap_total: AtomicU64,
    /// Count of `ApplyErrorKind::Divergence` events.
    pub divergence_total: AtomicU64,
    /// Count of `ApplyErrorKind::Apply` events.
    pub apply_error_total: AtomicU64,
    /// Count of `ApplyErrorKind::Decode` events.
    pub decode_error_total: AtomicU64,
    /// Issue #814 — count of applies that found no target on the replica
    /// (a missing collection or a missing entity, typically a delete).
    /// Non-fatal: the LSN chain still advances so idempotent re-pull
    /// converges (see #813), but the miss is counted so a missed delete
    /// that drives collection-count drift leaves a trail instead of being
    /// swallowed by `let _ =`.
    pub apply_miss_total: AtomicU64,
    /// Issue #835 — count of records fenced at the apply boundary because
    /// their term was *behind* the replica's current term (a returning
    /// ex-primary on a stale term). Fail-closed: the record is rejected and
    /// the LSN/term chain does not advance, so the deposed primary cannot
    /// move any watermark until it re-syncs under the new term.
    pub fenced_total: AtomicU64,
}
44
45impl ReplicaApplyMetrics {
46    pub fn record(&self, kind: ApplyErrorKind) {
47        use std::sync::atomic::Ordering::Relaxed;
48        match kind {
49            ApplyErrorKind::Gap => {
50                self.gap_total.fetch_add(1, Relaxed);
51            }
52            ApplyErrorKind::Divergence => {
53                self.divergence_total.fetch_add(1, Relaxed);
54            }
55            ApplyErrorKind::Apply => {
56                self.apply_error_total.fetch_add(1, Relaxed);
57            }
58            ApplyErrorKind::Decode => {
59                self.decode_error_total.fetch_add(1, Relaxed);
60            }
61            ApplyErrorKind::Miss => {
62                self.apply_miss_total.fetch_add(1, Relaxed);
63            }
64            ApplyErrorKind::Fenced => {
65                self.fenced_total.fetch_add(1, Relaxed);
66            }
67        }
68    }
69
70    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
71        use std::sync::atomic::Ordering::Relaxed;
72        [
73            (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
74            (
75                ApplyErrorKind::Divergence,
76                self.divergence_total.load(Relaxed),
77            ),
78            (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
79            (
80                ApplyErrorKind::Decode,
81                self.decode_error_total.load(Relaxed),
82            ),
83            (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
84            (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
85        ]
86    }
87}
88
/// Category of apply-path failure, used to pick a `ReplicaApplyMetrics`
/// counter and to label it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// A record arrived with an LSN gap relative to the last applied LSN.
    Gap,
    /// A record at the last applied LSN carried a different payload hash.
    Divergence,
    /// Applying the record to the store failed.
    Apply,
    /// A record could not be decoded.
    Decode,
    /// Issue #814 — apply ran against a missing target (delete on an
    /// absent collection/entity). Non-fatal divergence signal.
    Miss,
    /// Issue #835 — a record from a term behind the replica's current term
    /// was fenced at the apply boundary (a stale ex-primary). Fail-closed.
    Fenced,
}

impl ApplyErrorKind {
    /// Stable lowercase label for this kind, e.g. the `kind` value on
    /// `reddb_replica_apply_errors_total{kind="apply_miss"}`.
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
            ApplyErrorKind::Miss => "apply_miss",
            ApplyErrorKind::Fenced => "fenced",
        }
    }
}
115
116impl LogicalApplyError {
117    pub fn kind(&self) -> ApplyErrorKind {
118        match self {
119            Self::Gap { .. } => ApplyErrorKind::Gap,
120            Self::Divergence { .. } => ApplyErrorKind::Divergence,
121            Self::Apply { .. } => ApplyErrorKind::Apply,
122            Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
123            Self::RangeFenced { .. } => ApplyErrorKind::Fenced,
124        }
125    }
126}
127
/// Outcome of a single successful `apply` call. `Applied` advances the
/// chain; `Idempotent` and `Skipped` do not apply the record or advance the
/// applied LSN (an equal-or-newer LSN was already seen). Gaps, divergences,
/// fenced records and store failures are not outcomes: they are returned via
/// `LogicalApplyError`, are fail-closed, and callers (replica fetcher,
/// restore loop) should mark the instance unhealthy and stop applying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Normal monotonic advance: the record was applied and the applier's
    /// last-applied LSN/term now reflect it.
    Applied,
    /// Same LSN as the last applied record, and either the payload hash
    /// matches or no payload hash has been recorded yet — skipped.
    Idempotent,
    /// Older LSN than what we already have — skipped.
    Skipped,
}
142
/// Fail-closed errors returned by `LogicalChangeApplier::apply` /
/// `apply_fenced`. None of them advance the applied LSN or term.
#[derive(Debug)]
pub enum LogicalApplyError {
    /// The record's LSN is more than one past the last applied LSN.
    /// `last` is the last applied LSN, `next` is the offending record's LSN.
    Gap {
        last: u64,
        next: u64,
    },
    /// A record at the last applied LSN carried a payload hash different
    /// from the one recorded for that LSN. `expected_term` is the applier's
    /// last applied term and `got_term` the record's term; `expected` and
    /// `got` are the hex-encoded payload hashes.
    Divergence {
        expected_term: u64,
        got_term: u64,
        lsn: u64,
        expected: String,
        got: String,
    },
    /// Applying the record at `lsn` to the local store failed with `source`.
    Apply {
        lsn: u64,
        source: RedDBError,
    },
    /// Issue #835 — the record's term is behind the replica's current
    /// term: a returning ex-primary streaming on a stale, superseded term.
    /// Rejected at the apply boundary so the deposed primary cannot apply
    /// or advance any watermark until it re-syncs under the new term.
    StaleTermFenced {
        record_term: u64,
        current_term: u64,
        lsn: u64,
    },
    /// Issue #991 — the record is stamped for a range whose authority
    /// watermark has moved past it: a write from a deposed range owner
    /// (stale ownership epoch) or a superseded timeline (stale term) for the
    /// target range. Rejected at the apply boundary before the LSN state
    /// machine runs — fail-closed, so a stale owner cannot apply or advance
    /// the chain/watermark for that range. Shares the `Fenced` metrics lane
    /// with the global stale-term fence.
    RangeFenced {
        range_id: u64,
        lsn: u64,
        reason: RangeAdmitError,
    },
}
182
183impl std::fmt::Display for LogicalApplyError {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        match self {
186            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
187            Self::StaleTermFenced {
188                record_term,
189                current_term,
190                lsn,
191            } => write!(
192                f,
193                "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
194            ),
195            Self::RangeFenced {
196                range_id,
197                lsn,
198                reason,
199            } => match reason {
200                RangeAdmitError::StaleTerm {
201                    record_term,
202                    accepted_term,
203                } => write!(
204                    f,
205                    "range-stale record fenced at lsn={lsn} for range {range_id}: record term {record_term} is behind accepted term {accepted_term}"
206                ),
207                RangeAdmitError::StaleOwnershipEpoch {
208                    record_epoch,
209                    accepted_epoch,
210                } => write!(
211                    f,
212                    "range-stale record fenced at lsn={lsn} for range {range_id}: ownership epoch {record_epoch} is behind accepted epoch {accepted_epoch}"
213                ),
214            },
215            Self::Divergence {
216                expected_term,
217                got_term,
218                lsn,
219                expected,
220                got,
221            } => write!(
222                f,
223                "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
224            ),
225            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
226        }
227    }
228}
229
230impl std::error::Error for LogicalApplyError {}
231
/// Reason `LogicalChangeApplier::wait_for_bookmark` gave up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the applied LSN reached `target_lsn`;
    /// `applied_lsn` is the applied LSN observed when giving up.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The applied LSN reached the target, but on a term other than the
    /// bookmark's.
    TermMismatch { target_term: u64, applied_term: u64 },
}

impl BookmarkWaitError {
    /// `true` only for the `Timeout` variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            Self::Timeout { .. } => true,
            Self::TermMismatch { .. } => false,
        }
    }
}

impl std::fmt::Display for BookmarkWaitError {
    /// Render a single-line description of why the wait failed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::TermMismatch {
                target_term,
                applied_term,
            } => write!(
                f,
                "causal bookmark term mismatch: target={target_term} applied={applied_term}"
            ),
            Self::Timeout {
                target_lsn,
                applied_lsn,
            } => write!(
                f,
                "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
            ),
        }
    }
}

impl std::error::Error for BookmarkWaitError {}
266
/// Shared logical change applier so replica replay and PITR converge on the
/// same semantics. Stateful (PLAN.md Phase 11.5): tracks the last applied
/// LSN + payload hash so duplicates / older LSNs / gaps / divergences are
/// detected explicitly.
pub struct LogicalChangeApplier {
    /// Term of the most recently applied record; starts at
    /// `DEFAULT_REPLICATION_TERM`.
    last_applied_term: AtomicU64,
    /// LSN of the most recently applied record; starts at the constructor's
    /// `starting_lsn`.
    last_applied_lsn: AtomicU64,
    /// Highest record LSN that passed the fences in `apply_fenced`, whether
    /// or not that record was then applied.
    received_frontier_lsn: AtomicU64,
    /// Payload hash of the most recently applied record; `None` until the
    /// first record is applied by this instance.
    last_payload_hash: Mutex<Option<[u8; 32]>>,
    /// Mutex/condvar pair `wait_for_bookmark` blocks on; the condvar is
    /// notified after every successful apply.
    apply_wait: (Mutex<()>, Condvar),
    /// Issue #814 — metrics the apply path bumps when a record runs
    /// against a missing target. The production replica loop shares the
    /// runtime's `ReplicaApplyMetrics` here so `/metrics` surfaces misses;
    /// other callers (PITR, tests) get a private default that no one reads.
    metrics: std::sync::Arc<ReplicaApplyMetrics>,
}
283
284impl LogicalChangeApplier {
285    /// Build a fresh applier. `starting_lsn` is the LSN already
286    /// covered by the snapshot (or `0` for an empty replica). The
287    /// next acceptable record is any positive LSN; from there the
288    /// chain advances by 1.
289    pub fn new(starting_lsn: u64) -> Self {
290        Self::with_metrics(
291            starting_lsn,
292            std::sync::Arc::new(ReplicaApplyMetrics::default()),
293        )
294    }
295
296    /// Build an applier that records apply misses / errors into a shared
297    /// `ReplicaApplyMetrics` (issue #814). The production replica loop
298    /// passes the runtime's metrics so a swallowed delete leaves a trail
299    /// on `reddb_replica_apply_errors_total{kind="apply_miss"}`.
300    pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
301        Self {
302            last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
303            last_applied_lsn: AtomicU64::new(starting_lsn),
304            received_frontier_lsn: AtomicU64::new(starting_lsn),
305            last_payload_hash: Mutex::new(None),
306            apply_wait: (Mutex::new(()), Condvar::new()),
307            metrics,
308        }
309    }
310
    /// The metrics handle this applier records misses/errors into.
    ///
    /// Returns a borrow of the `Arc`; clone it to hold the handle
    /// independently of the applier.
    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
        &self.metrics
    }
315
    /// LSN of the most recently applied record, or the constructor's
    /// `starting_lsn` if nothing has been applied yet (acquire load).
    pub fn last_applied_lsn(&self) -> u64 {
        self.last_applied_lsn.load(Ordering::Acquire)
    }
319
    /// Highest record LSN that has passed the fences in `apply_fenced`,
    /// whether or not the record was then applied — a gapped or skipped
    /// record still raises it. Starts at `starting_lsn` (acquire load).
    pub fn received_frontier_lsn(&self) -> u64 {
        self.received_frontier_lsn.load(Ordering::Acquire)
    }
323
    /// Term of the most recently applied record; `DEFAULT_REPLICATION_TERM`
    /// until the first record is applied (acquire load).
    pub fn last_applied_term(&self) -> u64 {
        self.last_applied_term.load(Ordering::Acquire)
    }
327
328    pub fn wait_for_bookmark(
329        &self,
330        bookmark: &crate::replication::CausalBookmark,
331        timeout: std::time::Duration,
332    ) -> Result<(), BookmarkWaitError> {
333        let deadline = std::time::Instant::now() + timeout;
334        let target_lsn = bookmark.commit_lsn();
335        let target_term = bookmark.term();
336
337        let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
338        loop {
339            let applied_lsn = self.last_applied_lsn();
340            let applied_term = self.last_applied_term();
341            if applied_lsn >= target_lsn {
342                if applied_term == target_term {
343                    return Ok(());
344                }
345                return Err(BookmarkWaitError::TermMismatch {
346                    target_term,
347                    applied_term,
348                });
349            }
350
351            let now = std::time::Instant::now();
352            if now >= deadline {
353                return Err(BookmarkWaitError::Timeout {
354                    target_lsn,
355                    applied_lsn,
356                });
357            }
358            let remaining = deadline.saturating_duration_since(now);
359            let (next_guard, wait_result) = self
360                .apply_wait
361                .1
362                .wait_timeout(guard, remaining)
363                .expect("apply wait condvar");
364            guard = next_guard;
365            if wait_result.timed_out() {
366                return Err(BookmarkWaitError::Timeout {
367                    target_lsn,
368                    applied_lsn: self.last_applied_lsn(),
369                });
370            }
371        }
372    }
373
    /// Apply one logical change record without a per-range authority fence
    /// (shorthand for `apply_fenced(db, record, mode, None)`).
    ///
    /// A record whose term is behind the last applied term is rejected with
    /// `StaleTermFenced` first. Otherwise the state machine, with `last`
    /// being the last applied LSN, is:
    /// - `last == 0` and `lsn > 0` (first record on an empty replica) →
    ///   apply, anchor.
    /// - `lsn == last + 1` → apply, advance.
    /// - `lsn == last` && payload hash equal (or none recorded yet) →
    ///   idempotent skip.
    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
    /// - `lsn < last` → older replay, skipped.
    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
    ///
    /// # Errors
    /// Returns `LogicalApplyError` for a stale term, a gap, a divergence, or
    /// a store failure while applying the record.
    pub fn apply(
        &self,
        db: &RedDB,
        record: &ChangeRecord,
        mode: ApplyMode,
    ) -> Result<ApplyOutcome, LogicalApplyError> {
        self.apply_fenced(db, record, mode, None)
    }
389
390    /// Apply one record, first gating it against the target range's authority
391    /// watermark (issue #991). When `range_fence` is `Some`, a record stamped
392    /// for that range whose term or ownership epoch is behind the watermark is
393    /// rejected before the global term fence and the LSN state machine run —
394    /// fail-closed, so a deposed range owner cannot advance anything. Records
395    /// for a different range, or with no range identity (legacy /
396    /// non-range-replicated), pass the range fence untouched. `apply` is the
397    /// unfenced shorthand for callers that do not yet hold range authority.
398    pub fn apply_fenced(
399        &self,
400        db: &RedDB,
401        record: &ChangeRecord,
402        mode: ApplyMode,
403        range_fence: Option<&RangeAuthority>,
404    ) -> Result<ApplyOutcome, LogicalApplyError> {
405        let last = self.last_applied_lsn.load(Ordering::Acquire);
406        let last_term = self.last_applied_term.load(Ordering::Acquire);
407
408        // Per-range authority fence (issue #991). Runs before the global
409        // term fence so a stale ownership epoch is rejected even when the
410        // record's term is otherwise current. Only records stamped for the
411        // fence's range are gated; the rest fall through.
412        if let Some(fence) = range_fence {
413            if let Err(reason) = fence.admit(record) {
414                self.metrics.record(ApplyErrorKind::Fenced);
415                return Err(LogicalApplyError::RangeFenced {
416                    range_id: fence.range_id,
417                    lsn: record.lsn,
418                    reason,
419                });
420            }
421        }
422
423        // Stale-term fence (issue #835, ADR 0030). A record from a term
424        // *behind* the highest term this replica has adopted is a returning
425        // ex-primary on a superseded timeline. Reject it before the LSN
426        // state machine runs — fail closed regardless of LSN, so a stale
427        // ex-primary can neither apply nor advance the chain/watermark. A
428        // record on the *same* term is admitted; a *higher* term is the new
429        // primary's timeline and is adopted on apply below. This mirrors the
430        // election-side `RefusalReason::StaleTerm` on the data path.
431        if record.term < last_term {
432            self.metrics.record(ApplyErrorKind::Fenced);
433            return Err(LogicalApplyError::StaleTermFenced {
434                record_term: record.term,
435                current_term: last_term,
436                lsn: record.lsn,
437            });
438        }
439
440        let payload_hash = record_payload_hash(record);
441        self.received_frontier_lsn
442            .fetch_max(record.lsn, Ordering::AcqRel);
443
444        if last == 0 && record.lsn > 0 {
445            self.do_apply(db, record, mode)?;
446            self.last_applied_term.store(record.term, Ordering::Release);
447            self.last_applied_lsn.store(record.lsn, Ordering::Release);
448            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
449            self.apply_wait.1.notify_all();
450            return Ok(ApplyOutcome::Applied);
451        }
452
453        if record.lsn == last {
454            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
455            return match prior {
456                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
457                Some(p) => Err(LogicalApplyError::Divergence {
458                    expected_term: last_term,
459                    got_term: record.term,
460                    lsn: record.lsn,
461                    expected: hex_digest(&p),
462                    got: hex_digest(&payload_hash),
463                }),
464                None => Ok(ApplyOutcome::Idempotent),
465            };
466        }
467        if record.lsn < last {
468            return Ok(ApplyOutcome::Skipped);
469        }
470        if record.lsn > last + 1 {
471            return Err(LogicalApplyError::Gap {
472                last,
473                next: record.lsn,
474            });
475        }
476
477        self.do_apply(db, record, mode)?;
478        self.last_applied_term.store(record.term, Ordering::Release);
479        self.last_applied_lsn.store(record.lsn, Ordering::Release);
480        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
481        self.apply_wait.1.notify_all();
482        Ok(ApplyOutcome::Applied)
483    }
484
485    fn do_apply(
486        &self,
487        db: &RedDB,
488        record: &ChangeRecord,
489        mode: ApplyMode,
490    ) -> Result<(), LogicalApplyError> {
491        Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
492            LogicalApplyError::Apply {
493                lsn: record.lsn,
494                source: err,
495            }
496        })
497    }
498
499    /// Stateless apply — applies the record without monotonicity
500    /// checks. Kept for callers that don't yet thread the stateful
501    /// applier through. New code should prefer
502    /// `LogicalChangeApplier::new()` + `apply()`. Apply misses (delete
503    /// against a missing target) are recorded into a throwaway metrics
504    /// handle; use [`apply_record_with_metrics`] to surface them.
505    pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
506        Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
507    }
508
    /// Stateless apply that records apply misses (issue #814) into
    /// `metrics`. A delete against a missing collection or a missing
    /// entity is a non-fatal divergence signal: it bumps
    /// `ApplyErrorKind::Miss` and emits a structured warn line, but still
    /// returns `Ok(())` so the LSN chain advances and idempotent re-pull
    /// (#813) converges. A genuine (non-missing-target) store error on a
    /// delete propagates as a real apply error — counted, fail-closed —
    /// rather than being swallowed by the old `let _ =`.
    ///
    /// Per operation:
    /// - `Delete` removes the entity identified by `record.entity_id`.
    /// - `Refresh` swaps the collection's contents for
    ///   `record.refresh_records`.
    /// - `Insert` / `Update` deserialize `record.entity_bytes`, supersede
    ///   other live versions of the same table row, upsert the entity, apply
    ///   any metadata, and index the entity.
    ///
    /// The apply mode is accepted for interface symmetry but is not read
    /// here (`_mode`).
    ///
    /// # Errors
    /// Returns `RedDBError::Internal` for a store/deserialize/metadata
    /// failure or a record missing its required payload, and
    /// `RedDBError::NotFound` when the target collection cannot be resolved
    /// for an in-place update.
    pub fn apply_record_with_metrics(
        db: &RedDB,
        record: &ChangeRecord,
        _mode: ApplyMode,
        metrics: &ReplicaApplyMetrics,
    ) -> RedDBResult<()> {
        let store = db.store();
        match record.operation {
            ChangeOperation::Delete => {
                match store.delete(&record.collection, EntityId::new(record.entity_id)) {
                    // The entity existed and was removed.
                    Ok(true) => {}
                    Ok(false) => {
                        // Target collection existed but no such entity —
                        // the delete found nothing to remove.
                        metrics.record(ApplyErrorKind::Miss);
                        tracing::warn!(
                            target: "reddb::replication::apply",
                            lsn = record.lsn,
                            collection = %record.collection,
                            entity_id = record.entity_id,
                            "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
                        );
                    }
                    Err(crate::storage::StoreError::CollectionNotFound(name)) => {
                        // The whole collection is absent on the replica —
                        // a missed delete that can drive count drift.
                        metrics.record(ApplyErrorKind::Miss);
                        tracing::warn!(
                            target: "reddb::replication::apply",
                            lsn = record.lsn,
                            collection = %name,
                            entity_id = record.entity_id,
                            "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
                        );
                    }
                    Err(err) => {
                        // A real store error is a genuine apply failure:
                        // surface it instead of discarding it so the
                        // caller counts it and the replica fails closed.
                        return Err(RedDBError::Internal(err.to_string()));
                    }
                }
            }
            ChangeOperation::Refresh => {
                // Issue #596 slice 9d — replica replay of
                // `REFRESH MATERIALIZED VIEW v`. The primary
                // emitted the serialized backing-collection
                // contents in `refresh_records`; apply the
                // atomic swap on the replica's local store
                // (which also persists a `RefreshCollection`
                // WAL action so the post-swap contents survive
                // a replica restart).
                let records = record.refresh_records.clone().ok_or_else(|| {
                    RedDBError::Internal(
                        "replication refresh record missing refresh_records payload".to_string(),
                    )
                })?;
                store
                    .refresh_collection_from_records(&record.collection, records)
                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
            }
            ChangeOperation::Insert | ChangeOperation::Update => {
                // Inserts and updates must carry the serialized entity.
                let Some(bytes) = &record.entity_bytes else {
                    return Err(RedDBError::Internal(
                        "replication record missing entity payload".to_string(),
                    ));
                };
                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
                    .map_err(|err| RedDBError::Internal(err.to_string()))?;

                // Issue #813 — MVCC table-row supersession on the replica.
                //
                // A table-row UPDATE on the primary installs a NEW physical
                // version (fresh `EntityId`) that shares the row's stable
                // `logical_id`, and marks the prior version superseded
                // (`xmax != 0`) so snapshot reads skip it. Only the new
                // version travels on the wire — the prior version's `xmax`
                // bump is implicit. Without reproducing it here the replica
                // leaves every prior version LIVE, so each update to a row
                // accumulates a stale live duplicate and a full re-pull
                // replays them all (the observed 22× inflation). Before
                // upserting the incoming version, mark any *other* live
                // version of the same logical id superseded — mirroring
                // `install_versioned_table_row_update` on the primary. This
                // is idempotent under re-pull: re-applying a record updates
                // its version in place (resetting its `xmax` from the
                // serialized bytes), and the last writer per logical id in
                // LSN order wins, converging on the primary's live set.
                if matches!(entity.kind, EntityKind::TableRow { .. }) {
                    let logical = entity.logical_id();
                    let new_id = entity.id;
                    // `xmax == 0` is what marks a version live below, so the
                    // superseding id must be non-zero: fall back to 1 when
                    // the incoming version carries no `xmin`.
                    let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
                    let stale: Vec<_> = store
                        .table_row_versions_by_logical_id(&record.collection, logical)
                        .into_iter()
                        .filter(|version| version.id != new_id && version.xmax == 0)
                        .collect();
                    if !stale.is_empty() {
                        let manager = store
                            .get_collection(&record.collection)
                            .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
                        for mut version in stale {
                            version.set_xmax(superseding_xid);
                            manager
                                .update(version)
                                .map_err(|err| RedDBError::Internal(err.to_string()))?;
                        }
                    }
                }

                // Upsert: update in place when the record's entity id is
                // already present in the collection, insert otherwise.
                let exists = store
                    .get(&record.collection, EntityId::new(record.entity_id))
                    .is_some();
                if exists {
                    let manager = store
                        .get_collection(&record.collection)
                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
                    manager
                        .update(entity.clone())
                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
                } else {
                    store
                        .insert_auto(&record.collection, entity.clone())
                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
                }
                // Metadata travels in wire JSON form; convert it to the
                // server form before decoding and storing it.
                if let Some(metadata_json) = &record.metadata {
                    let metadata_json = wire_json_to_server_json(metadata_json);
                    let metadata = metadata_from_json(&metadata_json)
                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
                    store
                        .set_metadata(&record.collection, entity.id, metadata)
                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
                }
                // Finally hand the applied entity to the store's context index.
                store
                    .context_index()
                    .index_entity(&record.collection, &entity);
            }
        }
        Ok(())
    }
657}
658
659fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
660    let mut hasher = crate::crypto::sha256::Sha256::new();
661    hasher.update(&record.term.to_le_bytes());
662    hasher.update(&record.lsn.to_le_bytes());
663    hasher.update(&[record.operation as u8]);
664    hasher.update(record.collection.as_bytes());
665    hasher.update(&record.entity_id.to_le_bytes());
666    // Issue #991 — range authority participates in the payload hash so two
667    // records at the same LSN that differ only in range identity or ownership
668    // epoch are flagged divergent rather than silently treated as idempotent.
669    // `u64::MAX` stands in for an absent field (a value real epochs/ids never
670    // reach) so `None` and `Some(MAX)` stay distinguishable.
671    hasher.update(&record.range_id.unwrap_or(u64::MAX).to_le_bytes());
672    hasher.update(&record.ownership_epoch.unwrap_or(u64::MAX).to_le_bytes());
673    if let Some(bytes) = &record.entity_bytes {
674        hasher.update(bytes);
675    }
676    // Issue #596 slice 9d — refresh payload participates in the
677    // payload-hash so the same-LSN-idempotent / different-payload-
678    // divergence state machine works for Refresh records too.
679    if let Some(records) = &record.refresh_records {
680        hasher.update(&(records.len() as u64).to_le_bytes());
681        for r in records {
682            hasher.update(&(r.len() as u64).to_le_bytes());
683            hasher.update(r);
684        }
685    }
686    hasher.finalize()
687}
688
/// Render a 32-byte digest as a string via `crate::utils::to_hex`, for use in
/// `LogicalApplyError::Divergence` messages.
fn hex_digest(bytes: &[u8; 32]) -> String {
    crate::utils::to_hex(bytes)
}
692
693#[cfg(test)]
694mod tests {
695    use super::*;
696    use crate::replication::cdc::ChangeOperation;
697    use crate::storage::schema::Value;
698    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
699    use std::sync::Arc;
700
    /// Open a `RedDB` at a fresh path under the system temp directory.
    ///
    /// The file name combines the process id with the current time in
    /// nanoseconds since the Unix epoch; any file already at that path is
    /// removed first (errors ignored). Returns the database together with
    /// the path — presumably so the caller can clean it up afterwards.
    fn open_db() -> (RedDB, std::path::PathBuf) {
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        // Best-effort removal of a leftover file; a missing file is fine.
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, path)
    }
714
    /// Build an `Insert` change record for the `users` collection at `lsn`.
    ///
    /// The entity is a table row whose entity id, row id, `id` column and
    /// sequence id all equal `lsn`, with `payload` stored as a blob column;
    /// timestamps are `100 + lsn`. Two calls with the same `lsn` but
    /// different `payload` therefore differ only in the entity bytes.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        change_record_from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }
742
743    fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
744        ChangeRecord {
745            term: crate::replication::DEFAULT_REPLICATION_TERM,
746            lsn,
747            timestamp: 100 + lsn,
748            operation: ChangeOperation::Delete,
749            collection: collection.to_string(),
750            entity_id,
751            entity_kind: "row".to_string(),
752            entity_bytes: None,
753            metadata: None,
754            refresh_records: None,
755            range_id: None,
756            ownership_epoch: None,
757        }
758    }
759
760    fn table_row_entity(id: u64) -> UnifiedEntity {
761        let mut entity = UnifiedEntity::new(
762            EntityId::new(id),
763            EntityKind::TableRow {
764                table: Arc::from("users"),
765                row_id: id,
766            },
767            EntityData::Row(RowData::with_names(
768                vec![Value::UnsignedInteger(id)],
769                vec!["id".to_string()],
770            )),
771        );
772        entity.created_at = 100 + id;
773        entity.updated_at = 100 + id;
774        entity.sequence_id = id;
775        entity
776    }
777
778    // Issue #814 — a delete against a missing collection must record an
779    // apply miss (not a silent no-op) and still return Ok so the LSN
780    // chain advances (idempotent re-pull, #813).
781    #[test]
782    fn delete_against_missing_collection_records_apply_miss() {
783        let (db, path) = open_db();
784        let metrics = ReplicaApplyMetrics::default();
785        let before = metrics.apply_miss_total.load(Ordering::Relaxed);
786
787        LogicalChangeApplier::apply_record_with_metrics(
788            &db,
789            &delete_record(1, "no_such_collection", 42),
790            ApplyMode::Replica,
791            &metrics,
792        )
793        .expect("missing-target delete is non-fatal");
794
795        assert_eq!(
796            metrics.apply_miss_total.load(Ordering::Relaxed),
797            before + 1,
798            "delete against a missing collection must bump the apply-miss signal"
799        );
800        let _ = std::fs::remove_file(path);
801    }
802
803    // Issue #814 — a delete against an existing collection but absent
804    // entity is likewise a recorded miss, not a swallowed no-op.
805    #[test]
806    fn delete_against_missing_entity_records_apply_miss() {
807        let (db, path) = open_db();
808        let _ = db.store().get_or_create_collection("users");
809        let metrics = ReplicaApplyMetrics::default();
810
811        LogicalChangeApplier::apply_record_with_metrics(
812            &db,
813            &delete_record(1, "users", 9999),
814            ApplyMode::Replica,
815            &metrics,
816        )
817        .expect("missing-entity delete is non-fatal");
818
819        assert_eq!(
820            metrics.apply_miss_total.load(Ordering::Relaxed),
821            1,
822            "delete of an absent entity must bump the apply-miss signal"
823        );
824        let _ = std::fs::remove_file(path);
825    }
826
827    // Issue #814 — the normal path (target present) deletes without
828    // firing the miss signal. No behavioral regression.
829    #[test]
830    fn delete_of_present_target_records_no_apply_miss() {
831        let (db, path) = open_db();
832        let store = db.store();
833        let _ = store.get_or_create_collection("users");
834        let id = store
835            .insert_auto("users", table_row_entity(1))
836            .expect("insert entity");
837        let metrics = ReplicaApplyMetrics::default();
838
839        LogicalChangeApplier::apply_record_with_metrics(
840            &db,
841            &delete_record(1, "users", id.raw()),
842            ApplyMode::Replica,
843            &metrics,
844        )
845        .expect("present-target delete applies");
846
847        assert_eq!(
848            metrics.apply_miss_total.load(Ordering::Relaxed),
849            0,
850            "deleting a present target must not fire the apply-miss signal"
851        );
852        assert!(
853            store.get("users", id).is_none(),
854            "the entity must actually be removed on the normal path"
855        );
856        let _ = std::fs::remove_file(path);
857    }
858
859    // Issue #814 — the shared-metrics handle on the stateful applier
860    // surfaces the miss so `/metrics` (which reads the same Arc) sees it.
861    #[test]
862    fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
863        let (db, path) = open_db();
864        let applier =
865            LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
866
867        applier
868            .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
869            .expect("missing-target delete advances the chain");
870
871        assert_eq!(
872            applier.metrics().apply_miss_total.load(Ordering::Relaxed),
873            1,
874            "the applier's shared metrics handle must record the miss"
875        );
876        assert_eq!(
877            applier.last_applied_lsn(),
878            1,
879            "a non-fatal miss still advances the LSN chain"
880        );
881        let _ = std::fs::remove_file(path);
882    }
883
884    #[test]
885    fn apply_advances_on_monotonic_lsn() {
886        let (db, path) = open_db();
887        let applier = LogicalChangeApplier::new(0);
888        assert_eq!(
889            applier
890                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
891                .unwrap(),
892            ApplyOutcome::Applied
893        );
894        assert_eq!(applier.last_applied_lsn(), 1);
895        assert_eq!(
896            applier
897                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
898                .unwrap(),
899            ApplyOutcome::Applied
900        );
901        assert_eq!(applier.last_applied_lsn(), 2);
902        let _ = std::fs::remove_file(path);
903    }
904
905    #[test]
906    fn apply_idempotent_on_duplicate_lsn_same_payload() {
907        let (db, path) = open_db();
908        let applier = LogicalChangeApplier::new(0);
909        let r = record(5, b"same");
910        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
911        assert_eq!(
912            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
913            ApplyOutcome::Idempotent
914        );
915        assert_eq!(applier.last_applied_lsn(), 5);
916        let _ = std::fs::remove_file(path);
917    }
918
919    #[test]
920    fn apply_fails_closed_on_lsn_collision_diff_payload() {
921        let (db, path) = open_db();
922        let applier = LogicalChangeApplier::new(0);
923        applier
924            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
925            .unwrap();
926        let err = applier
927            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
928            .unwrap_err();
929        assert!(
930            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
931            "got {err:?}"
932        );
933        let _ = std::fs::remove_file(path);
934    }
935
936    #[test]
937    fn apply_fails_closed_on_same_lsn_different_term() {
938        let (db, path) = open_db();
939        let applier = LogicalChangeApplier::new(0);
940        applier
941            .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
942            .unwrap();
943        let err = applier
944            .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
945            .unwrap_err();
946        assert!(
947            matches!(
948                err,
949                LogicalApplyError::Divergence {
950                    lsn: 7,
951                    expected_term: 1,
952                    got_term: 2,
953                    ..
954                }
955            ),
956            "got {err:?}"
957        );
958        assert_eq!(applier.last_applied_term(), 1);
959        assert_eq!(applier.last_applied_lsn(), 7);
960        let _ = std::fs::remove_file(path);
961    }
962
963    // Issue #835 — a record from a term behind the replica's adopted term
964    // is fenced at the apply boundary: rejected, counted, and crucially the
965    // LSN/term chain does NOT advance, so a returning ex-primary on a stale
966    // term cannot move any watermark.
967    #[test]
968    fn apply_fences_stale_term_record() {
969        let (db, path) = open_db();
970        let applier = LogicalChangeApplier::new(0);
971
972        // Replica adopts term 5 from the legitimate primary at lsn 1.
973        applier
974            .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
975            .unwrap();
976        assert_eq!(applier.last_applied_term(), 5);
977        assert_eq!(applier.last_applied_lsn(), 1);
978
979        // A returning ex-primary streams the next record on the old term 4.
980        let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
981        let err = applier
982            .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
983            .unwrap_err();
984        assert!(
985            matches!(
986                err,
987                LogicalApplyError::StaleTermFenced {
988                    record_term: 4,
989                    current_term: 5,
990                    lsn: 2,
991                }
992            ),
993            "got {err:?}"
994        );
995        assert_eq!(err.kind(), ApplyErrorKind::Fenced);
996        assert_eq!(
997            applier.metrics().fenced_total.load(Ordering::Relaxed),
998            before + 1,
999            "the fence must leave a metrics trail"
1000        );
1001        // The fenced record advanced nothing — no apply, no watermark move.
1002        assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
1003        assert_eq!(applier.last_applied_term(), 5);
1004        assert_eq!(
1005            applier.received_frontier_lsn(),
1006            1,
1007            "a fenced record must not even advance the received frontier"
1008        );
1009        let _ = std::fs::remove_file(path);
1010    }
1011
1012    // The fence only bites a *behind* term. A record on the same term
1013    // applies normally, and a record on a higher term is the new primary's
1014    // timeline — adopted on apply.
1015    #[test]
1016    fn apply_admits_same_term_and_adopts_higher_term() {
1017        let (db, path) = open_db();
1018        let applier = LogicalChangeApplier::new(0);
1019
1020        applier
1021            .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
1022            .unwrap();
1023        // Same term → admitted.
1024        applier
1025            .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
1026            .unwrap();
1027        assert_eq!(applier.last_applied_term(), 3);
1028        // Higher term → adopted.
1029        applier
1030            .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
1031            .unwrap();
1032        assert_eq!(applier.last_applied_term(), 7);
1033        assert_eq!(applier.last_applied_lsn(), 3);
1034
1035        // Now a record back on term 3 is fenced.
1036        let err = applier
1037            .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
1038            .unwrap_err();
1039        assert!(
1040            matches!(err, LogicalApplyError::StaleTermFenced { .. }),
1041            "got {err:?}"
1042        );
1043        let _ = std::fs::remove_file(path);
1044    }
1045
1046    // Issue #991 — a record stamped for the target range but carrying an
1047    // ownership epoch behind the range's accepted epoch is a write from a
1048    // deposed owner. It is fenced at the apply boundary: rejected, counted on
1049    // the Fenced lane, and the LSN/term chain does not advance.
1050    #[test]
1051    fn apply_fenced_rejects_stale_ownership_epoch() {
1052        let (db, path) = open_db();
1053        let applier = LogicalChangeApplier::new(0);
1054        let fence = RangeAuthority {
1055            range_id: 7,
1056            min_term: 1,
1057            min_ownership_epoch: 5,
1058        };
1059
1060        let stale = record(1, b"a").with_range_authority(7, 4);
1061        let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
1062        let err = applier
1063            .apply_fenced(&db, &stale, ApplyMode::Replica, Some(&fence))
1064            .unwrap_err();
1065        assert!(
1066            matches!(
1067                err,
1068                LogicalApplyError::RangeFenced {
1069                    range_id: 7,
1070                    lsn: 1,
1071                    reason: RangeAdmitError::StaleOwnershipEpoch {
1072                        record_epoch: 4,
1073                        accepted_epoch: 5,
1074                    },
1075                }
1076            ),
1077            "got {err:?}"
1078        );
1079        assert_eq!(err.kind(), ApplyErrorKind::Fenced);
1080        assert_eq!(
1081            applier.metrics().fenced_total.load(Ordering::Relaxed),
1082            before + 1
1083        );
1084        assert_eq!(applier.last_applied_lsn(), 0, "watermark must not advance");
1085        assert_eq!(
1086            applier.received_frontier_lsn(),
1087            0,
1088            "a range-fenced record must not advance the received frontier"
1089        );
1090        let _ = std::fs::remove_file(path);
1091    }
1092
1093    // Issue #991 — the same fence applies on the recovery/restore path: a
1094    // record on a stale term for the target range is rejected there too.
1095    #[test]
1096    fn apply_fenced_rejects_stale_range_term_on_restore() {
1097        let (db, path) = open_db();
1098        let applier = LogicalChangeApplier::new(0);
1099        let fence = RangeAuthority {
1100            range_id: 3,
1101            min_term: 6,
1102            min_ownership_epoch: 1,
1103        };
1104
1105        let stale = record(1, b"a").with_term(4).with_range_authority(3, 9);
1106        let err = applier
1107            .apply_fenced(&db, &stale, ApplyMode::Restore, Some(&fence))
1108            .unwrap_err();
1109        assert!(
1110            matches!(
1111                err,
1112                LogicalApplyError::RangeFenced {
1113                    range_id: 3,
1114                    reason: RangeAdmitError::StaleTerm {
1115                        record_term: 4,
1116                        accepted_term: 6,
1117                    },
1118                    ..
1119                }
1120            ),
1121            "got {err:?}"
1122        );
1123        let _ = std::fs::remove_file(path);
1124    }
1125
1126    // Issue #991 — a record current for the range applies through the fence,
1127    // and a record for a *different* range is not gated by this fence.
1128    #[test]
1129    fn apply_fenced_admits_current_and_ignores_other_ranges() {
1130        let (db, path) = open_db();
1131        let applier = LogicalChangeApplier::new(0);
1132        let fence = RangeAuthority {
1133            range_id: 7,
1134            min_term: 1,
1135            min_ownership_epoch: 5,
1136        };
1137
1138        // Current epoch for the fenced range → applies and advances.
1139        applier
1140            .apply_fenced(
1141                &db,
1142                &record(1, b"a").with_range_authority(7, 5),
1143                ApplyMode::Replica,
1144                Some(&fence),
1145            )
1146            .expect("current record applies");
1147        assert_eq!(applier.last_applied_lsn(), 1);
1148
1149        // A record stamped for a different (stale-looking) range is not this
1150        // fence's concern and still applies.
1151        applier
1152            .apply_fenced(
1153                &db,
1154                &record(2, b"b").with_range_authority(99, 1),
1155                ApplyMode::Replica,
1156                Some(&fence),
1157            )
1158            .expect("other-range record bypasses this fence");
1159        assert_eq!(applier.last_applied_lsn(), 2);
1160        let _ = std::fs::remove_file(path);
1161    }
1162
1163    // Issue #992 — end-to-end range-indexed catch-up over the single physical
1164    // WAL: a follower plans its range's work out of the shared stream, then
1165    // applies exactly that work through the same `apply_fenced` gate. The other
1166    // range's records and a deposed-owner write never reach apply.
1167    #[test]
1168    fn range_catchup_plan_applies_only_its_range_through_the_fence() {
1169        use crate::replication::cdc::{plan_range_catchup, RangeStreamPosition, RangeStreamReject};
1170
1171        let (db, path) = open_db();
1172        let applier = LogicalChangeApplier::new(0);
1173
1174        // One sequential WAL slice: range 7 at LSN 1..3 (epoch 5), range 9 at
1175        // 4..5, and a returning ex-owner of range 7 at LSN 6 on a stale epoch.
1176        let stream = vec![
1177            record(1, b"a").with_range_authority(7, 5),
1178            record(2, b"b").with_range_authority(7, 5),
1179            record(3, b"c").with_range_authority(7, 5),
1180            record(4, b"d").with_range_authority(9, 2),
1181            record(5, b"e").with_range_authority(9, 2),
1182            record(6, b"f").with_range_authority(7, 4),
1183        ];
1184
1185        // Range-7 follower resumes from origin, already knowing owner epoch 5.
1186        let position = RangeStreamPosition::new(7, 0, 1, 5);
1187        let plan = plan_range_catchup(&position, &stream);
1188
1189        // Only range 7's current records were selected; the stale-epoch write
1190        // is rejected, not selected.
1191        assert_eq!(plan.apply, vec![0, 1, 2]);
1192        assert_eq!(
1193            plan.rejected,
1194            vec![RangeStreamReject {
1195                lsn: 6,
1196                error: RangeAdmitError::StaleOwnershipEpoch {
1197                    record_epoch: 4,
1198                    accepted_epoch: 5,
1199                },
1200            }]
1201        );
1202        assert_eq!(plan.resume.applied_lsn, 3);
1203
1204        // Apply exactly the planned records through the per-range fence.
1205        let fence = position.authority();
1206        for index in &plan.apply {
1207            applier
1208                .apply_fenced(&db, &stream[*index], ApplyMode::Replica, Some(&fence))
1209                .expect("planned record applies through the fence");
1210        }
1211        assert_eq!(applier.last_applied_lsn(), 3);
1212
1213        // The deposed-owner record the plan refused is exactly what the apply
1214        // fence would also reject, never advancing the chain.
1215        let stale = &stream[5];
1216        let err = applier
1217            .apply_fenced(&db, stale, ApplyMode::Replica, Some(&fence))
1218            .unwrap_err();
1219        assert!(
1220            matches!(err, LogicalApplyError::RangeFenced { range_id: 7, .. }),
1221            "got {err:?}"
1222        );
1223        assert_eq!(
1224            applier.last_applied_lsn(),
1225            3,
1226            "stale write must not advance"
1227        );
1228        let _ = std::fs::remove_file(path);
1229    }
1230
1231    #[test]
1232    fn apply_skips_older_lsn() {
1233        let (db, path) = open_db();
1234        let applier = LogicalChangeApplier::new(0);
1235        applier
1236            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1237            .unwrap();
1238        applier
1239            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
1240            .unwrap();
1241        assert_eq!(
1242            applier
1243                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1244                .unwrap(),
1245            ApplyOutcome::Skipped
1246        );
1247        assert_eq!(applier.last_applied_lsn(), 2);
1248        let _ = std::fs::remove_file(path);
1249    }
1250
1251    #[test]
1252    fn apply_returns_gap_on_future_lsn() {
1253        let (db, path) = open_db();
1254        let applier = LogicalChangeApplier::new(0);
1255        applier
1256            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1257            .unwrap();
1258        let err = applier
1259            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
1260            .unwrap_err();
1261        assert!(
1262            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
1263            "got {err:?}"
1264        );
1265        assert_eq!(applier.last_applied_lsn(), 1);
1266        let _ = std::fs::remove_file(path);
1267    }
1268}