Skip to main content

reddb_server/replication/
logical.rs

1//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{
9    change_record_from_entity, wire_json_to_server_json, ChangeOperation, ChangeRecord,
10};
11use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
12
/// Identifies which caller is driving the shared logical apply path.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyMode {
    /// Records are being applied on behalf of a replica.
    Replica,
    /// Records are being applied on behalf of a restore.
    Restore,
}
18
/// PLAN.md Phase 11.5 — one monotonically increasing counter per
/// [`ApplyErrorKind`], bumped when the replica apply loop hits a broken
/// invariant and exported as `reddb_replica_apply_errors_total`.
///
/// Decode errors are not strictly apply errors, but they are counted in
/// the same lane so a single dashboard alert covers "replica is ingesting
/// trash from primary regardless of cause".
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Number of LSN gaps observed.
    pub gap_total: AtomicU64,
    /// Number of payload divergences observed.
    pub divergence_total: AtomicU64,
    /// Number of genuine apply failures.
    pub apply_error_total: AtomicU64,
    /// Number of records that could not be decoded.
    pub decode_error_total: AtomicU64,
    /// Issue #814 — applies (typically deletes) that found no target on
    /// the replica: either the collection or the entity was missing. This
    /// is non-fatal (the LSN chain still advances so idempotent re-pull
    /// converges, see #813), but it is counted so a missed delete that
    /// drives collection-count drift leaves a trail instead of being
    /// swallowed by `let _ =`.
    pub apply_miss_total: AtomicU64,
    /// Issue #835 — records fenced at the apply boundary because their
    /// term was *behind* the replica's current term (a returning
    /// ex-primary on a stale term). Fail-closed: the record is rejected
    /// and the LSN/term chain does not advance, so the deposed primary
    /// cannot move any watermark until it re-syncs under the new term.
    pub fenced_total: AtomicU64,
}
43
44impl ReplicaApplyMetrics {
45    pub fn record(&self, kind: ApplyErrorKind) {
46        use std::sync::atomic::Ordering::Relaxed;
47        match kind {
48            ApplyErrorKind::Gap => {
49                self.gap_total.fetch_add(1, Relaxed);
50            }
51            ApplyErrorKind::Divergence => {
52                self.divergence_total.fetch_add(1, Relaxed);
53            }
54            ApplyErrorKind::Apply => {
55                self.apply_error_total.fetch_add(1, Relaxed);
56            }
57            ApplyErrorKind::Decode => {
58                self.decode_error_total.fetch_add(1, Relaxed);
59            }
60            ApplyErrorKind::Miss => {
61                self.apply_miss_total.fetch_add(1, Relaxed);
62            }
63            ApplyErrorKind::Fenced => {
64                self.fenced_total.fetch_add(1, Relaxed);
65            }
66        }
67    }
68
69    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
70        use std::sync::atomic::Ordering::Relaxed;
71        [
72            (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
73            (
74                ApplyErrorKind::Divergence,
75                self.divergence_total.load(Relaxed),
76            ),
77            (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
78            (
79                ApplyErrorKind::Decode,
80                self.decode_error_total.load(Relaxed),
81            ),
82            (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
83            (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
84        ]
85    }
86}
87
/// Classification of everything counted by the replica apply metrics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyErrorKind {
    /// An LSN gap was detected.
    Gap,
    /// A payload divergence was detected.
    Divergence,
    /// Applying the record failed.
    Apply,
    /// The record could not be decoded.
    Decode,
    /// Issue #814 — the apply ran against a missing target (a delete on
    /// an absent collection or entity). Non-fatal divergence signal.
    Miss,
    /// Issue #835 — the record came from a term behind the replica's
    /// current term (a stale ex-primary) and was fenced at the apply
    /// boundary. Fail-closed.
    Fenced,
}
101
102impl ApplyErrorKind {
103    pub fn label(self) -> &'static str {
104        match self {
105            Self::Gap => "gap",
106            Self::Divergence => "divergence",
107            Self::Apply => "apply",
108            Self::Decode => "decode",
109            Self::Miss => "apply_miss",
110            Self::Fenced => "fenced",
111        }
112    }
113}
114
115impl LogicalApplyError {
116    pub fn kind(&self) -> ApplyErrorKind {
117        match self {
118            Self::Gap { .. } => ApplyErrorKind::Gap,
119            Self::Divergence { .. } => ApplyErrorKind::Divergence,
120            Self::Apply { .. } => ApplyErrorKind::Apply,
121            Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
122        }
123    }
124}
125
/// Successful result of a single `apply` call.
///
/// Only `Applied` advances the chain; `Idempotent` and `Skipped` are
/// no-ops because an equal-or-newer LSN was already seen. The failure
/// cases (`Gap`, `Divergence`, ...) are reported through
/// `LogicalApplyError` and are fail-closed: callers (replica fetcher,
/// restore loop) should mark the instance unhealthy and stop applying.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyOutcome {
    /// The record was applied and the chain advanced monotonically.
    Applied,
    /// The record repeats the last applied LSN with the same payload
    /// hash — log + skip.
    Idempotent,
    /// The record's LSN is older than what is already applied — log + skip.
    Skipped,
}
140
141#[derive(Debug)]
142pub enum LogicalApplyError {
143    Gap {
144        last: u64,
145        next: u64,
146    },
147    Divergence {
148        expected_term: u64,
149        got_term: u64,
150        lsn: u64,
151        expected: String,
152        got: String,
153    },
154    Apply {
155        lsn: u64,
156        source: RedDBError,
157    },
158    /// Issue #835 — the record's term is behind the replica's current
159    /// term: a returning ex-primary streaming on a stale, superseded term.
160    /// Rejected at the apply boundary so the deposed primary cannot apply
161    /// or advance any watermark until it re-syncs under the new term.
162    StaleTermFenced {
163        record_term: u64,
164        current_term: u64,
165        lsn: u64,
166    },
167}
168
169impl std::fmt::Display for LogicalApplyError {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        match self {
172            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
173            Self::StaleTermFenced {
174                record_term,
175                current_term,
176                lsn,
177            } => write!(
178                f,
179                "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
180            ),
181            Self::Divergence {
182                expected_term,
183                got_term,
184                lsn,
185                expected,
186                got,
187            } => write!(
188                f,
189                "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
190            ),
191            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
192        }
193    }
194}
195
196impl std::error::Error for LogicalApplyError {}
197
/// Reasons a wait on a causal bookmark can fail.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BookmarkWaitError {
    /// The target LSN was not applied before the wait gave up.
    /// `applied_lsn` is the applied LSN observed when giving up.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The target LSN was reached, but the applied term does not match
    /// the term recorded in the bookmark.
    TermMismatch { target_term: u64, applied_term: u64 },
}
203
204impl BookmarkWaitError {
205    pub fn is_timeout(&self) -> bool {
206        matches!(self, Self::Timeout { .. })
207    }
208}
209
210impl std::fmt::Display for BookmarkWaitError {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        match self {
213            Self::Timeout {
214                target_lsn,
215                applied_lsn,
216            } => write!(
217                f,
218                "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
219            ),
220            Self::TermMismatch {
221                target_term,
222                applied_term,
223            } => write!(
224                f,
225                "causal bookmark term mismatch: target={target_term} applied={applied_term}"
226            ),
227        }
228    }
229}
230
231impl std::error::Error for BookmarkWaitError {}
232
233/// Shared logical change applier so replica replay and PITR converge on the
234/// same semantics. Stateful (PLAN.md Phase 11.5): tracks the last applied
235/// LSN + payload hash so duplicates / older LSNs / gaps / divergences are
236/// detected explicitly.
237pub struct LogicalChangeApplier {
238    last_applied_term: AtomicU64,
239    last_applied_lsn: AtomicU64,
240    received_frontier_lsn: AtomicU64,
241    last_payload_hash: Mutex<Option<[u8; 32]>>,
242    apply_wait: (Mutex<()>, Condvar),
243    /// Issue #814 — metrics the apply path bumps when a record runs
244    /// against a missing target. The production replica loop shares the
245    /// runtime's `ReplicaApplyMetrics` here so `/metrics` surfaces misses;
246    /// other callers (PITR, tests) get a private default that no one reads.
247    metrics: std::sync::Arc<ReplicaApplyMetrics>,
248}
249
250impl LogicalChangeApplier {
251    /// Build a fresh applier. `starting_lsn` is the LSN already
252    /// covered by the snapshot (or `0` for an empty replica). The
253    /// next acceptable record is any positive LSN; from there the
254    /// chain advances by 1.
255    pub fn new(starting_lsn: u64) -> Self {
256        Self::with_metrics(
257            starting_lsn,
258            std::sync::Arc::new(ReplicaApplyMetrics::default()),
259        )
260    }
261
262    /// Build an applier that records apply misses / errors into a shared
263    /// `ReplicaApplyMetrics` (issue #814). The production replica loop
264    /// passes the runtime's metrics so a swallowed delete leaves a trail
265    /// on `reddb_replica_apply_errors_total{kind="apply_miss"}`.
266    pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
267        Self {
268            last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
269            last_applied_lsn: AtomicU64::new(starting_lsn),
270            received_frontier_lsn: AtomicU64::new(starting_lsn),
271            last_payload_hash: Mutex::new(None),
272            apply_wait: (Mutex::new(()), Condvar::new()),
273            metrics,
274        }
275    }
276
277    /// The metrics handle this applier records misses/errors into.
278    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
279        &self.metrics
280    }
281
282    pub fn last_applied_lsn(&self) -> u64 {
283        self.last_applied_lsn.load(Ordering::Acquire)
284    }
285
286    pub fn received_frontier_lsn(&self) -> u64 {
287        self.received_frontier_lsn.load(Ordering::Acquire)
288    }
289
290    pub fn last_applied_term(&self) -> u64 {
291        self.last_applied_term.load(Ordering::Acquire)
292    }
293
294    pub fn wait_for_bookmark(
295        &self,
296        bookmark: &crate::replication::CausalBookmark,
297        timeout: std::time::Duration,
298    ) -> Result<(), BookmarkWaitError> {
299        let deadline = std::time::Instant::now() + timeout;
300        let target_lsn = bookmark.commit_lsn();
301        let target_term = bookmark.term();
302
303        let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
304        loop {
305            let applied_lsn = self.last_applied_lsn();
306            let applied_term = self.last_applied_term();
307            if applied_lsn >= target_lsn {
308                if applied_term == target_term {
309                    return Ok(());
310                }
311                return Err(BookmarkWaitError::TermMismatch {
312                    target_term,
313                    applied_term,
314                });
315            }
316
317            let now = std::time::Instant::now();
318            if now >= deadline {
319                return Err(BookmarkWaitError::Timeout {
320                    target_lsn,
321                    applied_lsn,
322                });
323            }
324            let remaining = deadline.saturating_duration_since(now);
325            let (next_guard, wait_result) = self
326                .apply_wait
327                .1
328                .wait_timeout(guard, remaining)
329                .expect("apply wait condvar");
330            guard = next_guard;
331            if wait_result.timed_out() {
332                return Err(BookmarkWaitError::Timeout {
333                    target_lsn,
334                    applied_lsn: self.last_applied_lsn(),
335                });
336            }
337        }
338    }
339
340    /// Apply one logical change record. The state machine:
341    /// - first record after `starting_lsn == 0` → apply, anchor.
342    /// - `lsn == last + 1` → apply, advance.
343    /// - `lsn == last` && payload hash equal → idempotent skip.
344    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
345    /// - `lsn < last` → older replay, skip with debug log.
346    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
347    pub fn apply(
348        &self,
349        db: &RedDB,
350        record: &ChangeRecord,
351        mode: ApplyMode,
352    ) -> Result<ApplyOutcome, LogicalApplyError> {
353        let last = self.last_applied_lsn.load(Ordering::Acquire);
354        let last_term = self.last_applied_term.load(Ordering::Acquire);
355
356        // Stale-term fence (issue #835, ADR 0030). A record from a term
357        // *behind* the highest term this replica has adopted is a returning
358        // ex-primary on a superseded timeline. Reject it before the LSN
359        // state machine runs — fail closed regardless of LSN, so a stale
360        // ex-primary can neither apply nor advance the chain/watermark. A
361        // record on the *same* term is admitted; a *higher* term is the new
362        // primary's timeline and is adopted on apply below. This mirrors the
363        // election-side `RefusalReason::StaleTerm` on the data path.
364        if record.term < last_term {
365            self.metrics.record(ApplyErrorKind::Fenced);
366            return Err(LogicalApplyError::StaleTermFenced {
367                record_term: record.term,
368                current_term: last_term,
369                lsn: record.lsn,
370            });
371        }
372
373        let payload_hash = record_payload_hash(record);
374        self.received_frontier_lsn
375            .fetch_max(record.lsn, Ordering::AcqRel);
376
377        if last == 0 && record.lsn > 0 {
378            self.do_apply(db, record, mode)?;
379            self.last_applied_term.store(record.term, Ordering::Release);
380            self.last_applied_lsn.store(record.lsn, Ordering::Release);
381            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
382            self.apply_wait.1.notify_all();
383            return Ok(ApplyOutcome::Applied);
384        }
385
386        if record.lsn == last {
387            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
388            return match prior {
389                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
390                Some(p) => Err(LogicalApplyError::Divergence {
391                    expected_term: last_term,
392                    got_term: record.term,
393                    lsn: record.lsn,
394                    expected: hex_digest(&p),
395                    got: hex_digest(&payload_hash),
396                }),
397                None => Ok(ApplyOutcome::Idempotent),
398            };
399        }
400        if record.lsn < last {
401            return Ok(ApplyOutcome::Skipped);
402        }
403        if record.lsn > last + 1 {
404            return Err(LogicalApplyError::Gap {
405                last,
406                next: record.lsn,
407            });
408        }
409
410        self.do_apply(db, record, mode)?;
411        self.last_applied_term.store(record.term, Ordering::Release);
412        self.last_applied_lsn.store(record.lsn, Ordering::Release);
413        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
414        self.apply_wait.1.notify_all();
415        Ok(ApplyOutcome::Applied)
416    }
417
418    fn do_apply(
419        &self,
420        db: &RedDB,
421        record: &ChangeRecord,
422        mode: ApplyMode,
423    ) -> Result<(), LogicalApplyError> {
424        Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
425            LogicalApplyError::Apply {
426                lsn: record.lsn,
427                source: err,
428            }
429        })
430    }
431
432    /// Stateless apply — applies the record without monotonicity
433    /// checks. Kept for callers that don't yet thread the stateful
434    /// applier through. New code should prefer
435    /// `LogicalChangeApplier::new()` + `apply()`. Apply misses (delete
436    /// against a missing target) are recorded into a throwaway metrics
437    /// handle; use [`apply_record_with_metrics`] to surface them.
438    pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
439        Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
440    }
441
442    /// Stateless apply that records apply misses (issue #814) into
443    /// `metrics`. A delete against a missing collection or a missing
444    /// entity is a non-fatal divergence signal: it bumps
445    /// `ApplyErrorKind::Miss` and emits a structured warn line, but still
446    /// returns `Ok(())` so the LSN chain advances and idempotent re-pull
447    /// (#813) converges. A genuine (non-missing-target) store error on a
448    /// delete propagates as a real apply error — counted, fail-closed —
449    /// rather than being swallowed by the old `let _ =`.
450    pub fn apply_record_with_metrics(
451        db: &RedDB,
452        record: &ChangeRecord,
453        _mode: ApplyMode,
454        metrics: &ReplicaApplyMetrics,
455    ) -> RedDBResult<()> {
456        let store = db.store();
457        match record.operation {
458            ChangeOperation::Delete => {
459                match store.delete(&record.collection, EntityId::new(record.entity_id)) {
460                    Ok(true) => {}
461                    Ok(false) => {
462                        // Target collection existed but no such entity —
463                        // the delete found nothing to remove.
464                        metrics.record(ApplyErrorKind::Miss);
465                        tracing::warn!(
466                            target: "reddb::replication::apply",
467                            lsn = record.lsn,
468                            collection = %record.collection,
469                            entity_id = record.entity_id,
470                            "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
471                        );
472                    }
473                    Err(crate::storage::StoreError::CollectionNotFound(name)) => {
474                        // The whole collection is absent on the replica —
475                        // a missed delete that can drive count drift.
476                        metrics.record(ApplyErrorKind::Miss);
477                        tracing::warn!(
478                            target: "reddb::replication::apply",
479                            lsn = record.lsn,
480                            collection = %name,
481                            entity_id = record.entity_id,
482                            "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
483                        );
484                    }
485                    Err(err) => {
486                        // A real store error is a genuine apply failure:
487                        // surface it instead of discarding it so the
488                        // caller counts it and the replica fails closed.
489                        return Err(RedDBError::Internal(err.to_string()));
490                    }
491                }
492            }
493            ChangeOperation::Refresh => {
494                // Issue #596 slice 9d — replica replay of
495                // `REFRESH MATERIALIZED VIEW v`. The primary
496                // emitted the serialized backing-collection
497                // contents in `refresh_records`; apply the
498                // atomic swap on the replica's local store
499                // (which also persists a `RefreshCollection`
500                // WAL action so the post-swap contents survive
501                // a replica restart).
502                let records = record.refresh_records.clone().ok_or_else(|| {
503                    RedDBError::Internal(
504                        "replication refresh record missing refresh_records payload".to_string(),
505                    )
506                })?;
507                store
508                    .refresh_collection_from_records(&record.collection, records)
509                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
510            }
511            ChangeOperation::Insert | ChangeOperation::Update => {
512                let Some(bytes) = &record.entity_bytes else {
513                    return Err(RedDBError::Internal(
514                        "replication record missing entity payload".to_string(),
515                    ));
516                };
517                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
518                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
519
520                // Issue #813 — MVCC table-row supersession on the replica.
521                //
522                // A table-row UPDATE on the primary installs a NEW physical
523                // version (fresh `EntityId`) that shares the row's stable
524                // `logical_id`, and marks the prior version superseded
525                // (`xmax != 0`) so snapshot reads skip it. Only the new
526                // version travels on the wire — the prior version's `xmax`
527                // bump is implicit. Without reproducing it here the replica
528                // leaves every prior version LIVE, so each update to a row
529                // accumulates a stale live duplicate and a full re-pull
530                // replays them all (the observed 22× inflation). Before
531                // upserting the incoming version, mark any *other* live
532                // version of the same logical id superseded — mirroring
533                // `install_versioned_table_row_update` on the primary. This
534                // is idempotent under re-pull: re-applying a record updates
535                // its version in place (resetting its `xmax` from the
536                // serialized bytes), and the last writer per logical id in
537                // LSN order wins, converging on the primary's live set.
538                if matches!(entity.kind, EntityKind::TableRow { .. }) {
539                    let logical = entity.logical_id();
540                    let new_id = entity.id;
541                    let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
542                    let stale: Vec<_> = store
543                        .table_row_versions_by_logical_id(&record.collection, logical)
544                        .into_iter()
545                        .filter(|version| version.id != new_id && version.xmax == 0)
546                        .collect();
547                    if !stale.is_empty() {
548                        let manager = store
549                            .get_collection(&record.collection)
550                            .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
551                        for mut version in stale {
552                            version.set_xmax(superseding_xid);
553                            manager
554                                .update(version)
555                                .map_err(|err| RedDBError::Internal(err.to_string()))?;
556                        }
557                    }
558                }
559
560                let exists = store
561                    .get(&record.collection, EntityId::new(record.entity_id))
562                    .is_some();
563                if exists {
564                    let manager = store
565                        .get_collection(&record.collection)
566                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
567                    manager
568                        .update(entity.clone())
569                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
570                } else {
571                    store
572                        .insert_auto(&record.collection, entity.clone())
573                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
574                }
575                if let Some(metadata_json) = &record.metadata {
576                    let metadata_json = wire_json_to_server_json(metadata_json);
577                    let metadata = metadata_from_json(&metadata_json)
578                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
579                    store
580                        .set_metadata(&record.collection, entity.id, metadata)
581                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
582                }
583                store
584                    .context_index()
585                    .index_entity(&record.collection, &entity);
586            }
587        }
588        Ok(())
589    }
590}
591
592fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
593    let mut hasher = crate::crypto::sha256::Sha256::new();
594    hasher.update(&record.term.to_le_bytes());
595    hasher.update(&record.lsn.to_le_bytes());
596    hasher.update(&[record.operation as u8]);
597    hasher.update(record.collection.as_bytes());
598    hasher.update(&record.entity_id.to_le_bytes());
599    if let Some(bytes) = &record.entity_bytes {
600        hasher.update(bytes);
601    }
602    // Issue #596 slice 9d — refresh payload participates in the
603    // payload-hash so the same-LSN-idempotent / different-payload-
604    // divergence state machine works for Refresh records too.
605    if let Some(records) = &record.refresh_records {
606        hasher.update(&(records.len() as u64).to_le_bytes());
607        for r in records {
608            hasher.update(&(r.len() as u64).to_le_bytes());
609            hasher.update(r);
610        }
611    }
612    hasher.finalize()
613}
614
615fn hex_digest(bytes: &[u8; 32]) -> String {
616    crate::utils::to_hex(bytes)
617}
618
619#[cfg(test)]
620mod tests {
621    use super::*;
622    use crate::replication::cdc::ChangeOperation;
623    use crate::storage::schema::Value;
624    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
625    use std::sync::Arc;
626
627    fn open_db() -> (RedDB, std::path::PathBuf) {
628        let path = std::env::temp_dir().join(format!(
629            "reddb_logical_apply_{}_{}",
630            std::process::id(),
631            std::time::SystemTime::now()
632                .duration_since(std::time::UNIX_EPOCH)
633                .unwrap()
634                .as_nanos()
635        ));
636        let _ = std::fs::remove_file(&path);
637        let db = RedDB::open(&path).unwrap();
638        (db, path)
639    }
640
641    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
642        let timestamp = 100 + lsn;
643        let mut entity = UnifiedEntity::new(
644            EntityId::new(lsn),
645            EntityKind::TableRow {
646                table: Arc::from("users"),
647                row_id: lsn,
648            },
649            EntityData::Row(RowData::with_names(
650                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
651                vec!["id".to_string(), "payload".to_string()],
652            )),
653        );
654        entity.created_at = timestamp;
655        entity.updated_at = timestamp;
656        entity.sequence_id = lsn;
657        change_record_from_entity(
658            lsn,
659            timestamp,
660            ChangeOperation::Insert,
661            "users",
662            "row",
663            &entity,
664            crate::api::REDDB_FORMAT_VERSION,
665            None,
666        )
667    }
668
669    fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
670        ChangeRecord {
671            term: crate::replication::DEFAULT_REPLICATION_TERM,
672            lsn,
673            timestamp: 100 + lsn,
674            operation: ChangeOperation::Delete,
675            collection: collection.to_string(),
676            entity_id,
677            entity_kind: "row".to_string(),
678            entity_bytes: None,
679            metadata: None,
680            refresh_records: None,
681        }
682    }
683
684    fn table_row_entity(id: u64) -> UnifiedEntity {
685        let mut entity = UnifiedEntity::new(
686            EntityId::new(id),
687            EntityKind::TableRow {
688                table: Arc::from("users"),
689                row_id: id,
690            },
691            EntityData::Row(RowData::with_names(
692                vec![Value::UnsignedInteger(id)],
693                vec!["id".to_string()],
694            )),
695        );
696        entity.created_at = 100 + id;
697        entity.updated_at = 100 + id;
698        entity.sequence_id = id;
699        entity
700    }
701
702    // Issue #814 — a delete against a missing collection must record an
703    // apply miss (not a silent no-op) and still return Ok so the LSN
704    // chain advances (idempotent re-pull, #813).
705    #[test]
706    fn delete_against_missing_collection_records_apply_miss() {
707        let (db, path) = open_db();
708        let metrics = ReplicaApplyMetrics::default();
709        let before = metrics.apply_miss_total.load(Ordering::Relaxed);
710
711        LogicalChangeApplier::apply_record_with_metrics(
712            &db,
713            &delete_record(1, "no_such_collection", 42),
714            ApplyMode::Replica,
715            &metrics,
716        )
717        .expect("missing-target delete is non-fatal");
718
719        assert_eq!(
720            metrics.apply_miss_total.load(Ordering::Relaxed),
721            before + 1,
722            "delete against a missing collection must bump the apply-miss signal"
723        );
724        let _ = std::fs::remove_file(path);
725    }
726
727    // Issue #814 — a delete against an existing collection but absent
728    // entity is likewise a recorded miss, not a swallowed no-op.
729    #[test]
730    fn delete_against_missing_entity_records_apply_miss() {
731        let (db, path) = open_db();
732        let _ = db.store().get_or_create_collection("users");
733        let metrics = ReplicaApplyMetrics::default();
734
735        LogicalChangeApplier::apply_record_with_metrics(
736            &db,
737            &delete_record(1, "users", 9999),
738            ApplyMode::Replica,
739            &metrics,
740        )
741        .expect("missing-entity delete is non-fatal");
742
743        assert_eq!(
744            metrics.apply_miss_total.load(Ordering::Relaxed),
745            1,
746            "delete of an absent entity must bump the apply-miss signal"
747        );
748        let _ = std::fs::remove_file(path);
749    }
750
751    // Issue #814 — the normal path (target present) deletes without
752    // firing the miss signal. No behavioral regression.
753    #[test]
754    fn delete_of_present_target_records_no_apply_miss() {
755        let (db, path) = open_db();
756        let store = db.store();
757        let _ = store.get_or_create_collection("users");
758        let id = store
759            .insert_auto("users", table_row_entity(1))
760            .expect("insert entity");
761        let metrics = ReplicaApplyMetrics::default();
762
763        LogicalChangeApplier::apply_record_with_metrics(
764            &db,
765            &delete_record(1, "users", id.raw()),
766            ApplyMode::Replica,
767            &metrics,
768        )
769        .expect("present-target delete applies");
770
771        assert_eq!(
772            metrics.apply_miss_total.load(Ordering::Relaxed),
773            0,
774            "deleting a present target must not fire the apply-miss signal"
775        );
776        assert!(
777            store.get("users", id).is_none(),
778            "the entity must actually be removed on the normal path"
779        );
780        let _ = std::fs::remove_file(path);
781    }
782
783    // Issue #814 — the shared-metrics handle on the stateful applier
784    // surfaces the miss so `/metrics` (which reads the same Arc) sees it.
785    #[test]
786    fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
787        let (db, path) = open_db();
788        let applier =
789            LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
790
791        applier
792            .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
793            .expect("missing-target delete advances the chain");
794
795        assert_eq!(
796            applier.metrics().apply_miss_total.load(Ordering::Relaxed),
797            1,
798            "the applier's shared metrics handle must record the miss"
799        );
800        assert_eq!(
801            applier.last_applied_lsn(),
802            1,
803            "a non-fatal miss still advances the LSN chain"
804        );
805        let _ = std::fs::remove_file(path);
806    }
807
808    #[test]
809    fn apply_advances_on_monotonic_lsn() {
810        let (db, path) = open_db();
811        let applier = LogicalChangeApplier::new(0);
812        assert_eq!(
813            applier
814                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
815                .unwrap(),
816            ApplyOutcome::Applied
817        );
818        assert_eq!(applier.last_applied_lsn(), 1);
819        assert_eq!(
820            applier
821                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
822                .unwrap(),
823            ApplyOutcome::Applied
824        );
825        assert_eq!(applier.last_applied_lsn(), 2);
826        let _ = std::fs::remove_file(path);
827    }
828
829    #[test]
830    fn apply_idempotent_on_duplicate_lsn_same_payload() {
831        let (db, path) = open_db();
832        let applier = LogicalChangeApplier::new(0);
833        let r = record(5, b"same");
834        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
835        assert_eq!(
836            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
837            ApplyOutcome::Idempotent
838        );
839        assert_eq!(applier.last_applied_lsn(), 5);
840        let _ = std::fs::remove_file(path);
841    }
842
843    #[test]
844    fn apply_fails_closed_on_lsn_collision_diff_payload() {
845        let (db, path) = open_db();
846        let applier = LogicalChangeApplier::new(0);
847        applier
848            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
849            .unwrap();
850        let err = applier
851            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
852            .unwrap_err();
853        assert!(
854            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
855            "got {err:?}"
856        );
857        let _ = std::fs::remove_file(path);
858    }
859
860    #[test]
861    fn apply_fails_closed_on_same_lsn_different_term() {
862        let (db, path) = open_db();
863        let applier = LogicalChangeApplier::new(0);
864        applier
865            .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
866            .unwrap();
867        let err = applier
868            .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
869            .unwrap_err();
870        assert!(
871            matches!(
872                err,
873                LogicalApplyError::Divergence {
874                    lsn: 7,
875                    expected_term: 1,
876                    got_term: 2,
877                    ..
878                }
879            ),
880            "got {err:?}"
881        );
882        assert_eq!(applier.last_applied_term(), 1);
883        assert_eq!(applier.last_applied_lsn(), 7);
884        let _ = std::fs::remove_file(path);
885    }
886
887    // Issue #835 — a record from a term behind the replica's adopted term
888    // is fenced at the apply boundary: rejected, counted, and crucially the
889    // LSN/term chain does NOT advance, so a returning ex-primary on a stale
890    // term cannot move any watermark.
891    #[test]
892    fn apply_fences_stale_term_record() {
893        let (db, path) = open_db();
894        let applier = LogicalChangeApplier::new(0);
895
896        // Replica adopts term 5 from the legitimate primary at lsn 1.
897        applier
898            .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
899            .unwrap();
900        assert_eq!(applier.last_applied_term(), 5);
901        assert_eq!(applier.last_applied_lsn(), 1);
902
903        // A returning ex-primary streams the next record on the old term 4.
904        let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
905        let err = applier
906            .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
907            .unwrap_err();
908        assert!(
909            matches!(
910                err,
911                LogicalApplyError::StaleTermFenced {
912                    record_term: 4,
913                    current_term: 5,
914                    lsn: 2,
915                }
916            ),
917            "got {err:?}"
918        );
919        assert_eq!(err.kind(), ApplyErrorKind::Fenced);
920        assert_eq!(
921            applier.metrics().fenced_total.load(Ordering::Relaxed),
922            before + 1,
923            "the fence must leave a metrics trail"
924        );
925        // The fenced record advanced nothing — no apply, no watermark move.
926        assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
927        assert_eq!(applier.last_applied_term(), 5);
928        assert_eq!(
929            applier.received_frontier_lsn(),
930            1,
931            "a fenced record must not even advance the received frontier"
932        );
933        let _ = std::fs::remove_file(path);
934    }
935
936    // The fence only bites a *behind* term. A record on the same term
937    // applies normally, and a record on a higher term is the new primary's
938    // timeline — adopted on apply.
939    #[test]
940    fn apply_admits_same_term_and_adopts_higher_term() {
941        let (db, path) = open_db();
942        let applier = LogicalChangeApplier::new(0);
943
944        applier
945            .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
946            .unwrap();
947        // Same term → admitted.
948        applier
949            .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
950            .unwrap();
951        assert_eq!(applier.last_applied_term(), 3);
952        // Higher term → adopted.
953        applier
954            .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
955            .unwrap();
956        assert_eq!(applier.last_applied_term(), 7);
957        assert_eq!(applier.last_applied_lsn(), 3);
958
959        // Now a record back on term 3 is fenced.
960        let err = applier
961            .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
962            .unwrap_err();
963        assert!(
964            matches!(err, LogicalApplyError::StaleTermFenced { .. }),
965            "got {err:?}"
966        );
967        let _ = std::fs::remove_file(path);
968    }
969
970    #[test]
971    fn apply_skips_older_lsn() {
972        let (db, path) = open_db();
973        let applier = LogicalChangeApplier::new(0);
974        applier
975            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
976            .unwrap();
977        applier
978            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
979            .unwrap();
980        assert_eq!(
981            applier
982                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
983                .unwrap(),
984            ApplyOutcome::Skipped
985        );
986        assert_eq!(applier.last_applied_lsn(), 2);
987        let _ = std::fs::remove_file(path);
988    }
989
990    #[test]
991    fn apply_returns_gap_on_future_lsn() {
992        let (db, path) = open_db();
993        let applier = LogicalChangeApplier::new(0);
994        applier
995            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
996            .unwrap();
997        let err = applier
998            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
999            .unwrap_err();
1000        assert!(
1001            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
1002            "got {err:?}"
1003        );
1004        assert_eq!(applier.last_applied_lsn(), 1);
1005        let _ = std::fs::remove_file(path);
1006    }
1007}