// reddb_server/replication/logical.rs

//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
10
/// Which caller is driving a logical apply.
///
/// Replica replay and point-in-time restore share one apply path; the mode
/// is threaded through so call sites state which of the two they are.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// Replica applying records streamed from the primary.
    Replica,
    /// Point-in-time restore replaying recorded changes.
    Restore,
}
16
/// Counters the replica apply loop bumps when an invariant breaks
/// (PLAN.md Phase 11.5), exported as `reddb_replica_apply_errors_total`.
///
/// Decode failures are not apply failures in the strict sense, but they
/// share this observability lane so one dashboard alert fires whenever the
/// replica is ingesting garbage from the primary, whatever the cause.
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// LSN gaps detected on apply (`ApplyErrorKind::Gap`).
    pub gap_total: AtomicU64,
    /// Same-LSN payload divergences (`ApplyErrorKind::Divergence`).
    pub divergence_total: AtomicU64,
    /// Genuine apply failures (`ApplyErrorKind::Apply`).
    pub apply_error_total: AtomicU64,
    /// Record decode failures (`ApplyErrorKind::Decode`).
    pub decode_error_total: AtomicU64,
    /// Issue #814 — an apply (typically a delete) that found no target on the
    /// replica: a missing collection or a missing entity. Non-fatal, because
    /// the LSN chain still advances so idempotent re-pull converges (#813).
    /// It is counted so that a missed delete driving collection-count drift
    /// leaves a trail instead of being silently discarded.
    pub apply_miss_total: AtomicU64,
    /// Issue #835 — a record whose term is *behind* the replica's current
    /// term (a returning ex-primary on a stale term) was fenced at the apply
    /// boundary. Fail-closed: the record is rejected and the LSN/term chain
    /// does not advance, so the deposed primary cannot move any watermark
    /// until it re-syncs under the new term.
    pub fenced_total: AtomicU64,
}
41
42impl ReplicaApplyMetrics {
43    pub fn record(&self, kind: ApplyErrorKind) {
44        use std::sync::atomic::Ordering::Relaxed;
45        match kind {
46            ApplyErrorKind::Gap => {
47                self.gap_total.fetch_add(1, Relaxed);
48            }
49            ApplyErrorKind::Divergence => {
50                self.divergence_total.fetch_add(1, Relaxed);
51            }
52            ApplyErrorKind::Apply => {
53                self.apply_error_total.fetch_add(1, Relaxed);
54            }
55            ApplyErrorKind::Decode => {
56                self.decode_error_total.fetch_add(1, Relaxed);
57            }
58            ApplyErrorKind::Miss => {
59                self.apply_miss_total.fetch_add(1, Relaxed);
60            }
61            ApplyErrorKind::Fenced => {
62                self.fenced_total.fetch_add(1, Relaxed);
63            }
64        }
65    }
66
67    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
68        use std::sync::atomic::Ordering::Relaxed;
69        [
70            (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
71            (
72                ApplyErrorKind::Divergence,
73                self.divergence_total.load(Relaxed),
74            ),
75            (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
76            (
77                ApplyErrorKind::Decode,
78                self.decode_error_total.load(Relaxed),
79            ),
80            (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
81            (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
82        ]
83    }
84}
85
/// Metric bucket an apply-path failure is counted under. Each variant maps
/// to one `ReplicaApplyMetrics` counter and one metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// A record arrived past `last + 1`.
    Gap,
    /// Same LSN as the last applied record, different payload hash.
    Divergence,
    /// Storage rejected the record.
    Apply,
    /// A record could not be decoded.
    Decode,
    /// Issue #814 — apply ran against a missing target (a delete on an
    /// absent collection or entity). A non-fatal divergence signal.
    Miss,
    /// Issue #835 — a record from a term behind the replica's current term
    /// (a stale ex-primary) was fenced at the apply boundary. Fail-closed.
    Fenced,
}
99
100impl ApplyErrorKind {
101    pub fn label(self) -> &'static str {
102        match self {
103            Self::Gap => "gap",
104            Self::Divergence => "divergence",
105            Self::Apply => "apply",
106            Self::Decode => "decode",
107            Self::Miss => "apply_miss",
108            Self::Fenced => "fenced",
109        }
110    }
111}
112
113impl LogicalApplyError {
114    pub fn kind(&self) -> ApplyErrorKind {
115        match self {
116            Self::Gap { .. } => ApplyErrorKind::Gap,
117            Self::Divergence { .. } => ApplyErrorKind::Divergence,
118            Self::Apply { .. } => ApplyErrorKind::Apply,
119            Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
120        }
121    }
122}
123
/// Successful result of a single `LogicalChangeApplier::apply` call.
///
/// Only `Applied` runs the record and advances the applied LSN/term.
/// `Idempotent` and `Skipped` apply nothing and leave the applied LSN, term
/// and payload hash untouched. Nothing is logged for them on the apply path;
/// callers may choose to.
///
/// Every failure is reported as a `LogicalApplyError` instead: `Gap`,
/// `Divergence`, `Apply` and `StaleTermFenced`. All of them are fail-closed,
/// so callers (replica fetcher, restore loop) should mark the instance
/// unhealthy and stop applying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was applied. The chain advanced by one, or was anchored by
    /// the first positive LSN on an empty replica.
    Applied,
    /// Same LSN as the last applied record, and either the payload hash is
    /// identical or no hash has been recorded yet (e.g. a record at the
    /// snapshot's starting LSN). Nothing is applied.
    Idempotent,
    /// The LSN is older than the last applied one. Nothing is applied.
    Skipped,
}
138
139#[derive(Debug)]
140pub enum LogicalApplyError {
141    Gap {
142        last: u64,
143        next: u64,
144    },
145    Divergence {
146        expected_term: u64,
147        got_term: u64,
148        lsn: u64,
149        expected: String,
150        got: String,
151    },
152    Apply {
153        lsn: u64,
154        source: RedDBError,
155    },
156    /// Issue #835 — the record's term is behind the replica's current
157    /// term: a returning ex-primary streaming on a stale, superseded term.
158    /// Rejected at the apply boundary so the deposed primary cannot apply
159    /// or advance any watermark until it re-syncs under the new term.
160    StaleTermFenced {
161        record_term: u64,
162        current_term: u64,
163        lsn: u64,
164    },
165}
166
167impl std::fmt::Display for LogicalApplyError {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
171            Self::StaleTermFenced {
172                record_term,
173                current_term,
174                lsn,
175            } => write!(
176                f,
177                "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
178            ),
179            Self::Divergence {
180                expected_term,
181                got_term,
182                lsn,
183                expected,
184                got,
185            } => write!(
186                f,
187                "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
188            ),
189            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
190        }
191    }
192}
193
194impl std::error::Error for LogicalApplyError {}
195
/// Reasons `LogicalChangeApplier::wait_for_bookmark` returns without
/// confirming a causal bookmark.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the target LSN was applied.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The target LSN was reached, but the applied term differs from the
    /// bookmark's term.
    TermMismatch { target_term: u64, applied_term: u64 },
}

impl BookmarkWaitError {
    /// `true` only for the `Timeout` variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            Self::Timeout { .. } => true,
            Self::TermMismatch { .. } => false,
        }
    }
}

impl std::fmt::Display for BookmarkWaitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Timeout {
                target_lsn,
                applied_lsn,
            } => {
                write!(
                    f,
                    "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
                )
            }
            Self::TermMismatch {
                target_term,
                applied_term,
            } => {
                write!(
                    f,
                    "causal bookmark term mismatch: target={target_term} applied={applied_term}"
                )
            }
        }
    }
}

impl std::error::Error for BookmarkWaitError {}
230
231/// Shared logical change applier so replica replay and PITR converge on the
232/// same semantics. Stateful (PLAN.md Phase 11.5): tracks the last applied
233/// LSN + payload hash so duplicates / older LSNs / gaps / divergences are
234/// detected explicitly.
235pub struct LogicalChangeApplier {
236    last_applied_term: AtomicU64,
237    last_applied_lsn: AtomicU64,
238    received_frontier_lsn: AtomicU64,
239    last_payload_hash: Mutex<Option<[u8; 32]>>,
240    apply_wait: (Mutex<()>, Condvar),
241    /// Issue #814 — metrics the apply path bumps when a record runs
242    /// against a missing target. The production replica loop shares the
243    /// runtime's `ReplicaApplyMetrics` here so `/metrics` surfaces misses;
244    /// other callers (PITR, tests) get a private default that no one reads.
245    metrics: std::sync::Arc<ReplicaApplyMetrics>,
246}
247
248impl LogicalChangeApplier {
249    /// Build a fresh applier. `starting_lsn` is the LSN already
250    /// covered by the snapshot (or `0` for an empty replica). The
251    /// next acceptable record is any positive LSN; from there the
252    /// chain advances by 1.
253    pub fn new(starting_lsn: u64) -> Self {
254        Self::with_metrics(
255            starting_lsn,
256            std::sync::Arc::new(ReplicaApplyMetrics::default()),
257        )
258    }
259
260    /// Build an applier that records apply misses / errors into a shared
261    /// `ReplicaApplyMetrics` (issue #814). The production replica loop
262    /// passes the runtime's metrics so a swallowed delete leaves a trail
263    /// on `reddb_replica_apply_errors_total{kind="apply_miss"}`.
264    pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
265        Self {
266            last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
267            last_applied_lsn: AtomicU64::new(starting_lsn),
268            received_frontier_lsn: AtomicU64::new(starting_lsn),
269            last_payload_hash: Mutex::new(None),
270            apply_wait: (Mutex::new(()), Condvar::new()),
271            metrics,
272        }
273    }
274
275    /// The metrics handle this applier records misses/errors into.
276    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
277        &self.metrics
278    }
279
280    pub fn last_applied_lsn(&self) -> u64 {
281        self.last_applied_lsn.load(Ordering::Acquire)
282    }
283
284    pub fn received_frontier_lsn(&self) -> u64 {
285        self.received_frontier_lsn.load(Ordering::Acquire)
286    }
287
288    pub fn last_applied_term(&self) -> u64 {
289        self.last_applied_term.load(Ordering::Acquire)
290    }
291
292    pub fn wait_for_bookmark(
293        &self,
294        bookmark: &crate::replication::CausalBookmark,
295        timeout: std::time::Duration,
296    ) -> Result<(), BookmarkWaitError> {
297        let deadline = std::time::Instant::now() + timeout;
298        let target_lsn = bookmark.commit_lsn();
299        let target_term = bookmark.term();
300
301        let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
302        loop {
303            let applied_lsn = self.last_applied_lsn();
304            let applied_term = self.last_applied_term();
305            if applied_lsn >= target_lsn {
306                if applied_term == target_term {
307                    return Ok(());
308                }
309                return Err(BookmarkWaitError::TermMismatch {
310                    target_term,
311                    applied_term,
312                });
313            }
314
315            let now = std::time::Instant::now();
316            if now >= deadline {
317                return Err(BookmarkWaitError::Timeout {
318                    target_lsn,
319                    applied_lsn,
320                });
321            }
322            let remaining = deadline.saturating_duration_since(now);
323            let (next_guard, wait_result) = self
324                .apply_wait
325                .1
326                .wait_timeout(guard, remaining)
327                .expect("apply wait condvar");
328            guard = next_guard;
329            if wait_result.timed_out() {
330                return Err(BookmarkWaitError::Timeout {
331                    target_lsn,
332                    applied_lsn: self.last_applied_lsn(),
333                });
334            }
335        }
336    }
337
338    /// Apply one logical change record. The state machine:
339    /// - first record after `starting_lsn == 0` → apply, anchor.
340    /// - `lsn == last + 1` → apply, advance.
341    /// - `lsn == last` && payload hash equal → idempotent skip.
342    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
343    /// - `lsn < last` → older replay, skip with debug log.
344    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
345    pub fn apply(
346        &self,
347        db: &RedDB,
348        record: &ChangeRecord,
349        mode: ApplyMode,
350    ) -> Result<ApplyOutcome, LogicalApplyError> {
351        let last = self.last_applied_lsn.load(Ordering::Acquire);
352        let last_term = self.last_applied_term.load(Ordering::Acquire);
353
354        // Stale-term fence (issue #835, ADR 0030). A record from a term
355        // *behind* the highest term this replica has adopted is a returning
356        // ex-primary on a superseded timeline. Reject it before the LSN
357        // state machine runs — fail closed regardless of LSN, so a stale
358        // ex-primary can neither apply nor advance the chain/watermark. A
359        // record on the *same* term is admitted; a *higher* term is the new
360        // primary's timeline and is adopted on apply below. This mirrors the
361        // election-side `RefusalReason::StaleTerm` on the data path.
362        if record.term < last_term {
363            self.metrics.record(ApplyErrorKind::Fenced);
364            return Err(LogicalApplyError::StaleTermFenced {
365                record_term: record.term,
366                current_term: last_term,
367                lsn: record.lsn,
368            });
369        }
370
371        let payload_hash = record_payload_hash(record);
372        self.received_frontier_lsn
373            .fetch_max(record.lsn, Ordering::AcqRel);
374
375        if last == 0 && record.lsn > 0 {
376            self.do_apply(db, record, mode)?;
377            self.last_applied_term.store(record.term, Ordering::Release);
378            self.last_applied_lsn.store(record.lsn, Ordering::Release);
379            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
380            self.apply_wait.1.notify_all();
381            return Ok(ApplyOutcome::Applied);
382        }
383
384        if record.lsn == last {
385            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
386            return match prior {
387                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
388                Some(p) => Err(LogicalApplyError::Divergence {
389                    expected_term: last_term,
390                    got_term: record.term,
391                    lsn: record.lsn,
392                    expected: hex_digest(&p),
393                    got: hex_digest(&payload_hash),
394                }),
395                None => Ok(ApplyOutcome::Idempotent),
396            };
397        }
398        if record.lsn < last {
399            return Ok(ApplyOutcome::Skipped);
400        }
401        if record.lsn > last + 1 {
402            return Err(LogicalApplyError::Gap {
403                last,
404                next: record.lsn,
405            });
406        }
407
408        self.do_apply(db, record, mode)?;
409        self.last_applied_term.store(record.term, Ordering::Release);
410        self.last_applied_lsn.store(record.lsn, Ordering::Release);
411        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
412        self.apply_wait.1.notify_all();
413        Ok(ApplyOutcome::Applied)
414    }
415
416    fn do_apply(
417        &self,
418        db: &RedDB,
419        record: &ChangeRecord,
420        mode: ApplyMode,
421    ) -> Result<(), LogicalApplyError> {
422        Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
423            LogicalApplyError::Apply {
424                lsn: record.lsn,
425                source: err,
426            }
427        })
428    }
429
430    /// Stateless apply — applies the record without monotonicity
431    /// checks. Kept for callers that don't yet thread the stateful
432    /// applier through. New code should prefer
433    /// `LogicalChangeApplier::new()` + `apply()`. Apply misses (delete
434    /// against a missing target) are recorded into a throwaway metrics
435    /// handle; use [`apply_record_with_metrics`] to surface them.
436    pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
437        Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
438    }
439
440    /// Stateless apply that records apply misses (issue #814) into
441    /// `metrics`. A delete against a missing collection or a missing
442    /// entity is a non-fatal divergence signal: it bumps
443    /// `ApplyErrorKind::Miss` and emits a structured warn line, but still
444    /// returns `Ok(())` so the LSN chain advances and idempotent re-pull
445    /// (#813) converges. A genuine (non-missing-target) store error on a
446    /// delete propagates as a real apply error — counted, fail-closed —
447    /// rather than being swallowed by the old `let _ =`.
448    pub fn apply_record_with_metrics(
449        db: &RedDB,
450        record: &ChangeRecord,
451        _mode: ApplyMode,
452        metrics: &ReplicaApplyMetrics,
453    ) -> RedDBResult<()> {
454        let store = db.store();
455        match record.operation {
456            ChangeOperation::Delete => {
457                match store.delete(&record.collection, EntityId::new(record.entity_id)) {
458                    Ok(true) => {}
459                    Ok(false) => {
460                        // Target collection existed but no such entity —
461                        // the delete found nothing to remove.
462                        metrics.record(ApplyErrorKind::Miss);
463                        tracing::warn!(
464                            target: "reddb::replication::apply",
465                            lsn = record.lsn,
466                            collection = %record.collection,
467                            entity_id = record.entity_id,
468                            "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
469                        );
470                    }
471                    Err(crate::storage::StoreError::CollectionNotFound(name)) => {
472                        // The whole collection is absent on the replica —
473                        // a missed delete that can drive count drift.
474                        metrics.record(ApplyErrorKind::Miss);
475                        tracing::warn!(
476                            target: "reddb::replication::apply",
477                            lsn = record.lsn,
478                            collection = %name,
479                            entity_id = record.entity_id,
480                            "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
481                        );
482                    }
483                    Err(err) => {
484                        // A real store error is a genuine apply failure:
485                        // surface it instead of discarding it so the
486                        // caller counts it and the replica fails closed.
487                        return Err(RedDBError::Internal(err.to_string()));
488                    }
489                }
490            }
491            ChangeOperation::Refresh => {
492                // Issue #596 slice 9d — replica replay of
493                // `REFRESH MATERIALIZED VIEW v`. The primary
494                // emitted the serialized backing-collection
495                // contents in `refresh_records`; apply the
496                // atomic swap on the replica's local store
497                // (which also persists a `RefreshCollection`
498                // WAL action so the post-swap contents survive
499                // a replica restart).
500                let records = record.refresh_records.clone().ok_or_else(|| {
501                    RedDBError::Internal(
502                        "replication refresh record missing refresh_records payload".to_string(),
503                    )
504                })?;
505                store
506                    .refresh_collection_from_records(&record.collection, records)
507                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
508            }
509            ChangeOperation::Insert | ChangeOperation::Update => {
510                let Some(bytes) = &record.entity_bytes else {
511                    return Err(RedDBError::Internal(
512                        "replication record missing entity payload".to_string(),
513                    ));
514                };
515                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
516                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
517
518                // Issue #813 — MVCC table-row supersession on the replica.
519                //
520                // A table-row UPDATE on the primary installs a NEW physical
521                // version (fresh `EntityId`) that shares the row's stable
522                // `logical_id`, and marks the prior version superseded
523                // (`xmax != 0`) so snapshot reads skip it. Only the new
524                // version travels on the wire — the prior version's `xmax`
525                // bump is implicit. Without reproducing it here the replica
526                // leaves every prior version LIVE, so each update to a row
527                // accumulates a stale live duplicate and a full re-pull
528                // replays them all (the observed 22× inflation). Before
529                // upserting the incoming version, mark any *other* live
530                // version of the same logical id superseded — mirroring
531                // `install_versioned_table_row_update` on the primary. This
532                // is idempotent under re-pull: re-applying a record updates
533                // its version in place (resetting its `xmax` from the
534                // serialized bytes), and the last writer per logical id in
535                // LSN order wins, converging on the primary's live set.
536                if matches!(entity.kind, EntityKind::TableRow { .. }) {
537                    let logical = entity.logical_id();
538                    let new_id = entity.id;
539                    let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
540                    let stale: Vec<_> = store
541                        .table_row_versions_by_logical_id(&record.collection, logical)
542                        .into_iter()
543                        .filter(|version| version.id != new_id && version.xmax == 0)
544                        .collect();
545                    if !stale.is_empty() {
546                        let manager = store
547                            .get_collection(&record.collection)
548                            .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
549                        for mut version in stale {
550                            version.set_xmax(superseding_xid);
551                            manager
552                                .update(version)
553                                .map_err(|err| RedDBError::Internal(err.to_string()))?;
554                        }
555                    }
556                }
557
558                let exists = store
559                    .get(&record.collection, EntityId::new(record.entity_id))
560                    .is_some();
561                if exists {
562                    let manager = store
563                        .get_collection(&record.collection)
564                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
565                    manager
566                        .update(entity.clone())
567                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
568                } else {
569                    store
570                        .insert_auto(&record.collection, entity.clone())
571                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
572                }
573                if let Some(metadata_json) = &record.metadata {
574                    let metadata = metadata_from_json(metadata_json)
575                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
576                    store
577                        .set_metadata(&record.collection, entity.id, metadata)
578                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
579                }
580                store
581                    .context_index()
582                    .index_entity(&record.collection, &entity);
583            }
584        }
585        Ok(())
586    }
587}
588
589fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
590    let mut hasher = crate::crypto::sha256::Sha256::new();
591    hasher.update(&record.term.to_le_bytes());
592    hasher.update(&record.lsn.to_le_bytes());
593    hasher.update(&[record.operation as u8]);
594    hasher.update(record.collection.as_bytes());
595    hasher.update(&record.entity_id.to_le_bytes());
596    if let Some(bytes) = &record.entity_bytes {
597        hasher.update(bytes);
598    }
599    // Issue #596 slice 9d — refresh payload participates in the
600    // payload-hash so the same-LSN-idempotent / different-payload-
601    // divergence state machine works for Refresh records too.
602    if let Some(records) = &record.refresh_records {
603        hasher.update(&(records.len() as u64).to_le_bytes());
604        for r in records {
605            hasher.update(&(r.len() as u64).to_le_bytes());
606            hasher.update(r);
607        }
608    }
609    hasher.finalize()
610}
611
612fn hex_digest(bytes: &[u8; 32]) -> String {
613    crate::utils::to_hex(bytes)
614}
615
616#[cfg(test)]
617mod tests {
618    use super::*;
619    use crate::replication::cdc::ChangeOperation;
620    use crate::storage::schema::Value;
621    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
622    use std::sync::Arc;
623
624    fn open_db() -> (RedDB, std::path::PathBuf) {
625        let path = std::env::temp_dir().join(format!(
626            "reddb_logical_apply_{}_{}",
627            std::process::id(),
628            std::time::SystemTime::now()
629                .duration_since(std::time::UNIX_EPOCH)
630                .unwrap()
631                .as_nanos()
632        ));
633        let _ = std::fs::remove_file(&path);
634        let db = RedDB::open(&path).unwrap();
635        (db, path)
636    }
637
638    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
639        let timestamp = 100 + lsn;
640        let mut entity = UnifiedEntity::new(
641            EntityId::new(lsn),
642            EntityKind::TableRow {
643                table: Arc::from("users"),
644                row_id: lsn,
645            },
646            EntityData::Row(RowData::with_names(
647                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
648                vec!["id".to_string(), "payload".to_string()],
649            )),
650        );
651        entity.created_at = timestamp;
652        entity.updated_at = timestamp;
653        entity.sequence_id = lsn;
654        ChangeRecord::from_entity(
655            lsn,
656            timestamp,
657            ChangeOperation::Insert,
658            "users",
659            "row",
660            &entity,
661            crate::api::REDDB_FORMAT_VERSION,
662            None,
663        )
664    }
665
666    fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
667        ChangeRecord {
668            term: crate::replication::DEFAULT_REPLICATION_TERM,
669            lsn,
670            timestamp: 100 + lsn,
671            operation: ChangeOperation::Delete,
672            collection: collection.to_string(),
673            entity_id,
674            entity_kind: "row".to_string(),
675            entity_bytes: None,
676            metadata: None,
677            refresh_records: None,
678        }
679    }
680
681    fn table_row_entity(id: u64) -> UnifiedEntity {
682        let mut entity = UnifiedEntity::new(
683            EntityId::new(id),
684            EntityKind::TableRow {
685                table: Arc::from("users"),
686                row_id: id,
687            },
688            EntityData::Row(RowData::with_names(
689                vec![Value::UnsignedInteger(id)],
690                vec!["id".to_string()],
691            )),
692        );
693        entity.created_at = 100 + id;
694        entity.updated_at = 100 + id;
695        entity.sequence_id = id;
696        entity
697    }
698
699    // Issue #814 — a delete against a missing collection must record an
700    // apply miss (not a silent no-op) and still return Ok so the LSN
701    // chain advances (idempotent re-pull, #813).
702    #[test]
703    fn delete_against_missing_collection_records_apply_miss() {
704        let (db, path) = open_db();
705        let metrics = ReplicaApplyMetrics::default();
706        let before = metrics.apply_miss_total.load(Ordering::Relaxed);
707
708        LogicalChangeApplier::apply_record_with_metrics(
709            &db,
710            &delete_record(1, "no_such_collection", 42),
711            ApplyMode::Replica,
712            &metrics,
713        )
714        .expect("missing-target delete is non-fatal");
715
716        assert_eq!(
717            metrics.apply_miss_total.load(Ordering::Relaxed),
718            before + 1,
719            "delete against a missing collection must bump the apply-miss signal"
720        );
721        let _ = std::fs::remove_file(path);
722    }
723
724    // Issue #814 — a delete against an existing collection but absent
725    // entity is likewise a recorded miss, not a swallowed no-op.
726    #[test]
727    fn delete_against_missing_entity_records_apply_miss() {
728        let (db, path) = open_db();
729        let _ = db.store().get_or_create_collection("users");
730        let metrics = ReplicaApplyMetrics::default();
731
732        LogicalChangeApplier::apply_record_with_metrics(
733            &db,
734            &delete_record(1, "users", 9999),
735            ApplyMode::Replica,
736            &metrics,
737        )
738        .expect("missing-entity delete is non-fatal");
739
740        assert_eq!(
741            metrics.apply_miss_total.load(Ordering::Relaxed),
742            1,
743            "delete of an absent entity must bump the apply-miss signal"
744        );
745        let _ = std::fs::remove_file(path);
746    }
747
748    // Issue #814 — the normal path (target present) deletes without
749    // firing the miss signal. No behavioral regression.
750    #[test]
751    fn delete_of_present_target_records_no_apply_miss() {
752        let (db, path) = open_db();
753        let store = db.store();
754        let _ = store.get_or_create_collection("users");
755        let id = store
756            .insert_auto("users", table_row_entity(1))
757            .expect("insert entity");
758        let metrics = ReplicaApplyMetrics::default();
759
760        LogicalChangeApplier::apply_record_with_metrics(
761            &db,
762            &delete_record(1, "users", id.raw()),
763            ApplyMode::Replica,
764            &metrics,
765        )
766        .expect("present-target delete applies");
767
768        assert_eq!(
769            metrics.apply_miss_total.load(Ordering::Relaxed),
770            0,
771            "deleting a present target must not fire the apply-miss signal"
772        );
773        assert!(
774            store.get("users", id).is_none(),
775            "the entity must actually be removed on the normal path"
776        );
777        let _ = std::fs::remove_file(path);
778    }
779
780    // Issue #814 — the shared-metrics handle on the stateful applier
781    // surfaces the miss so `/metrics` (which reads the same Arc) sees it.
782    #[test]
783    fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
784        let (db, path) = open_db();
785        let applier =
786            LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
787
788        applier
789            .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
790            .expect("missing-target delete advances the chain");
791
792        assert_eq!(
793            applier.metrics().apply_miss_total.load(Ordering::Relaxed),
794            1,
795            "the applier's shared metrics handle must record the miss"
796        );
797        assert_eq!(
798            applier.last_applied_lsn(),
799            1,
800            "a non-fatal miss still advances the LSN chain"
801        );
802        let _ = std::fs::remove_file(path);
803    }
804
805    #[test]
806    fn apply_advances_on_monotonic_lsn() {
807        let (db, path) = open_db();
808        let applier = LogicalChangeApplier::new(0);
809        assert_eq!(
810            applier
811                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
812                .unwrap(),
813            ApplyOutcome::Applied
814        );
815        assert_eq!(applier.last_applied_lsn(), 1);
816        assert_eq!(
817            applier
818                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
819                .unwrap(),
820            ApplyOutcome::Applied
821        );
822        assert_eq!(applier.last_applied_lsn(), 2);
823        let _ = std::fs::remove_file(path);
824    }
825
826    #[test]
827    fn apply_idempotent_on_duplicate_lsn_same_payload() {
828        let (db, path) = open_db();
829        let applier = LogicalChangeApplier::new(0);
830        let r = record(5, b"same");
831        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
832        assert_eq!(
833            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
834            ApplyOutcome::Idempotent
835        );
836        assert_eq!(applier.last_applied_lsn(), 5);
837        let _ = std::fs::remove_file(path);
838    }
839
840    #[test]
841    fn apply_fails_closed_on_lsn_collision_diff_payload() {
842        let (db, path) = open_db();
843        let applier = LogicalChangeApplier::new(0);
844        applier
845            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
846            .unwrap();
847        let err = applier
848            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
849            .unwrap_err();
850        assert!(
851            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
852            "got {err:?}"
853        );
854        let _ = std::fs::remove_file(path);
855    }
856
857    #[test]
858    fn apply_fails_closed_on_same_lsn_different_term() {
859        let (db, path) = open_db();
860        let applier = LogicalChangeApplier::new(0);
861        applier
862            .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
863            .unwrap();
864        let err = applier
865            .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
866            .unwrap_err();
867        assert!(
868            matches!(
869                err,
870                LogicalApplyError::Divergence {
871                    lsn: 7,
872                    expected_term: 1,
873                    got_term: 2,
874                    ..
875                }
876            ),
877            "got {err:?}"
878        );
879        assert_eq!(applier.last_applied_term(), 1);
880        assert_eq!(applier.last_applied_lsn(), 7);
881        let _ = std::fs::remove_file(path);
882    }
883
884    // Issue #835 — a record from a term behind the replica's adopted term
885    // is fenced at the apply boundary: rejected, counted, and crucially the
886    // LSN/term chain does NOT advance, so a returning ex-primary on a stale
887    // term cannot move any watermark.
888    #[test]
889    fn apply_fences_stale_term_record() {
890        let (db, path) = open_db();
891        let applier = LogicalChangeApplier::new(0);
892
893        // Replica adopts term 5 from the legitimate primary at lsn 1.
894        applier
895            .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
896            .unwrap();
897        assert_eq!(applier.last_applied_term(), 5);
898        assert_eq!(applier.last_applied_lsn(), 1);
899
900        // A returning ex-primary streams the next record on the old term 4.
901        let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
902        let err = applier
903            .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
904            .unwrap_err();
905        assert!(
906            matches!(
907                err,
908                LogicalApplyError::StaleTermFenced {
909                    record_term: 4,
910                    current_term: 5,
911                    lsn: 2,
912                }
913            ),
914            "got {err:?}"
915        );
916        assert_eq!(err.kind(), ApplyErrorKind::Fenced);
917        assert_eq!(
918            applier.metrics().fenced_total.load(Ordering::Relaxed),
919            before + 1,
920            "the fence must leave a metrics trail"
921        );
922        // The fenced record advanced nothing — no apply, no watermark move.
923        assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
924        assert_eq!(applier.last_applied_term(), 5);
925        assert_eq!(
926            applier.received_frontier_lsn(),
927            1,
928            "a fenced record must not even advance the received frontier"
929        );
930        let _ = std::fs::remove_file(path);
931    }
932
933    // The fence only bites a *behind* term. A record on the same term
934    // applies normally, and a record on a higher term is the new primary's
935    // timeline — adopted on apply.
936    #[test]
937    fn apply_admits_same_term_and_adopts_higher_term() {
938        let (db, path) = open_db();
939        let applier = LogicalChangeApplier::new(0);
940
941        applier
942            .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
943            .unwrap();
944        // Same term → admitted.
945        applier
946            .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
947            .unwrap();
948        assert_eq!(applier.last_applied_term(), 3);
949        // Higher term → adopted.
950        applier
951            .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
952            .unwrap();
953        assert_eq!(applier.last_applied_term(), 7);
954        assert_eq!(applier.last_applied_lsn(), 3);
955
956        // Now a record back on term 3 is fenced.
957        let err = applier
958            .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
959            .unwrap_err();
960        assert!(
961            matches!(err, LogicalApplyError::StaleTermFenced { .. }),
962            "got {err:?}"
963        );
964        let _ = std::fs::remove_file(path);
965    }
966
967    #[test]
968    fn apply_skips_older_lsn() {
969        let (db, path) = open_db();
970        let applier = LogicalChangeApplier::new(0);
971        applier
972            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
973            .unwrap();
974        applier
975            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
976            .unwrap();
977        assert_eq!(
978            applier
979                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
980                .unwrap(),
981            ApplyOutcome::Skipped
982        );
983        assert_eq!(applier.last_applied_lsn(), 2);
984        let _ = std::fs::remove_file(path);
985    }
986
987    #[test]
988    fn apply_returns_gap_on_future_lsn() {
989        let (db, path) = open_db();
990        let applier = LogicalChangeApplier::new(0);
991        applier
992            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
993            .unwrap();
994        let err = applier
995            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
996            .unwrap_err();
997        assert!(
998            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
999            "got {err:?}"
1000        );
1001        assert_eq!(applier.last_applied_lsn(), 1);
1002        let _ = std::fs::remove_file(path);
1003    }
1004}