// reddb_server/replication/logical.rs

//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
10
/// Identifies which caller is driving an apply: the replica apply loop or
/// a point-in-time restore. Both share the same logical apply semantics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyMode {
    /// The record is being applied by a replica.
    Replica,
    /// The record is being applied by a point-in-time restore.
    Restore,
}
16
/// PLAN.md Phase 11.5 — counters bumped by the replica apply loop whenever
/// an invariant breaks. They are exported as
/// `reddb_replica_apply_errors_total`.
///
/// Decode errors are not apply errors in the strict sense, but they are
/// counted in the same lane so dashboards can alert on "replica is
/// ingesting trash from primary regardless of cause".
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Number of LSN gaps observed.
    pub gap_total: AtomicU64,
    /// Number of divergences observed.
    pub divergence_total: AtomicU64,
    /// Number of genuine apply failures.
    pub apply_error_total: AtomicU64,
    /// Number of decode failures.
    pub decode_error_total: AtomicU64,
    /// Issue #814 — applies (typically deletes) that found no target on
    /// the replica, i.e. a missing collection or a missing entity. These
    /// are non-fatal: the LSN chain still advances so an idempotent
    /// re-pull converges (see #813). They are counted so that a missed
    /// delete driving collection-count drift leaves a trail rather than
    /// being swallowed by `let _ =`.
    pub apply_miss_total: AtomicU64,
}
35
36impl ReplicaApplyMetrics {
37    pub fn record(&self, kind: ApplyErrorKind) {
38        use std::sync::atomic::Ordering::Relaxed;
39        match kind {
40            ApplyErrorKind::Gap => {
41                self.gap_total.fetch_add(1, Relaxed);
42            }
43            ApplyErrorKind::Divergence => {
44                self.divergence_total.fetch_add(1, Relaxed);
45            }
46            ApplyErrorKind::Apply => {
47                self.apply_error_total.fetch_add(1, Relaxed);
48            }
49            ApplyErrorKind::Decode => {
50                self.decode_error_total.fetch_add(1, Relaxed);
51            }
52            ApplyErrorKind::Miss => {
53                self.apply_miss_total.fetch_add(1, Relaxed);
54            }
55        }
56    }
57
58    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 5] {
59        use std::sync::atomic::Ordering::Relaxed;
60        [
61            (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
62            (
63                ApplyErrorKind::Divergence,
64                self.divergence_total.load(Relaxed),
65            ),
66            (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
67            (
68                ApplyErrorKind::Decode,
69                self.decode_error_total.load(Relaxed),
70            ),
71            (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
72        ]
73    }
74}
75
/// Category of a replica apply problem; selects which
/// `ReplicaApplyMetrics` counter is bumped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// An LSN gap was detected.
    Gap,
    /// A divergence was detected.
    Divergence,
    /// Applying a record failed.
    Apply,
    /// A record could not be decoded.
    Decode,
    /// Issue #814 — the apply ran against a missing target (a delete on
    /// an absent collection/entity). Non-fatal divergence signal.
    Miss,
}

impl ApplyErrorKind {
    /// Stable textual label for this kind.
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
            ApplyErrorKind::Miss => "apply_miss",
        }
    }
}
98
99impl LogicalApplyError {
100    pub fn kind(&self) -> ApplyErrorKind {
101        match self {
102            Self::Gap { .. } => ApplyErrorKind::Gap,
103            Self::Divergence { .. } => ApplyErrorKind::Divergence,
104            Self::Apply { .. } => ApplyErrorKind::Apply,
105        }
106    }
107}
108
/// Result of one successful `apply` call.
///
/// Only `Applied` advances the chain. `Idempotent` and `Skipped` are
/// no-ops because an equal-or-newer LSN has already been seen. The
/// fail-closed cases (`Gap`, `Divergence`) are not outcomes: they are
/// returned through `LogicalApplyError`, and callers (replica fetcher,
/// restore loop) should mark the instance unhealthy and stop applying.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was applied: normal monotonic advance.
    Applied,
    /// The record has the same LSN as the last applied one and the same
    /// payload hash — log + skip.
    Idempotent,
    /// The record's LSN is older than what is already applied — log + skip.
    Skipped,
}
123
124#[derive(Debug)]
125pub enum LogicalApplyError {
126    Gap {
127        last: u64,
128        next: u64,
129    },
130    Divergence {
131        expected_term: u64,
132        got_term: u64,
133        lsn: u64,
134        expected: String,
135        got: String,
136    },
137    Apply {
138        lsn: u64,
139        source: RedDBError,
140    },
141}
142
143impl std::fmt::Display for LogicalApplyError {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        match self {
146            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
147            Self::Divergence {
148                expected_term,
149                got_term,
150                lsn,
151                expected,
152                got,
153            } => write!(
154                f,
155                "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
156            ),
157            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
158        }
159    }
160}
161
162impl std::error::Error for LogicalApplyError {}
163
/// Reason a wait on a causal bookmark did not succeed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the applied LSN reached the target.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The target LSN was reached, but under a different term.
    TermMismatch { target_term: u64, applied_term: u64 },
}

impl BookmarkWaitError {
    /// `true` only for the `Timeout` variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            BookmarkWaitError::Timeout { .. } => true,
            BookmarkWaitError::TermMismatch { .. } => false,
        }
    }
}

impl std::fmt::Display for BookmarkWaitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BookmarkWaitError::Timeout {
                target_lsn,
                applied_lsn,
            } => {
                write!(
                    f,
                    "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
                )
            }
            BookmarkWaitError::TermMismatch {
                target_term,
                applied_term,
            } => {
                write!(
                    f,
                    "causal bookmark term mismatch: target={target_term} applied={applied_term}"
                )
            }
        }
    }
}

impl std::error::Error for BookmarkWaitError {}
198
199/// Shared logical change applier so replica replay and PITR converge on the
200/// same semantics. Stateful (PLAN.md Phase 11.5): tracks the last applied
201/// LSN + payload hash so duplicates / older LSNs / gaps / divergences are
202/// detected explicitly.
203pub struct LogicalChangeApplier {
204    last_applied_term: AtomicU64,
205    last_applied_lsn: AtomicU64,
206    received_frontier_lsn: AtomicU64,
207    last_payload_hash: Mutex<Option<[u8; 32]>>,
208    apply_wait: (Mutex<()>, Condvar),
209    /// Issue #814 — metrics the apply path bumps when a record runs
210    /// against a missing target. The production replica loop shares the
211    /// runtime's `ReplicaApplyMetrics` here so `/metrics` surfaces misses;
212    /// other callers (PITR, tests) get a private default that no one reads.
213    metrics: std::sync::Arc<ReplicaApplyMetrics>,
214}
215
216impl LogicalChangeApplier {
217    /// Build a fresh applier. `starting_lsn` is the LSN already
218    /// covered by the snapshot (or `0` for an empty replica). The
219    /// next acceptable record is any positive LSN; from there the
220    /// chain advances by 1.
221    pub fn new(starting_lsn: u64) -> Self {
222        Self::with_metrics(
223            starting_lsn,
224            std::sync::Arc::new(ReplicaApplyMetrics::default()),
225        )
226    }
227
228    /// Build an applier that records apply misses / errors into a shared
229    /// `ReplicaApplyMetrics` (issue #814). The production replica loop
230    /// passes the runtime's metrics so a swallowed delete leaves a trail
231    /// on `reddb_replica_apply_errors_total{kind="apply_miss"}`.
232    pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
233        Self {
234            last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
235            last_applied_lsn: AtomicU64::new(starting_lsn),
236            received_frontier_lsn: AtomicU64::new(starting_lsn),
237            last_payload_hash: Mutex::new(None),
238            apply_wait: (Mutex::new(()), Condvar::new()),
239            metrics,
240        }
241    }
242
243    /// The metrics handle this applier records misses/errors into.
244    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
245        &self.metrics
246    }
247
248    pub fn last_applied_lsn(&self) -> u64 {
249        self.last_applied_lsn.load(Ordering::Acquire)
250    }
251
252    pub fn received_frontier_lsn(&self) -> u64 {
253        self.received_frontier_lsn.load(Ordering::Acquire)
254    }
255
256    pub fn last_applied_term(&self) -> u64 {
257        self.last_applied_term.load(Ordering::Acquire)
258    }
259
260    pub fn wait_for_bookmark(
261        &self,
262        bookmark: &crate::replication::CausalBookmark,
263        timeout: std::time::Duration,
264    ) -> Result<(), BookmarkWaitError> {
265        let deadline = std::time::Instant::now() + timeout;
266        let target_lsn = bookmark.commit_lsn();
267        let target_term = bookmark.term();
268
269        let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
270        loop {
271            let applied_lsn = self.last_applied_lsn();
272            let applied_term = self.last_applied_term();
273            if applied_lsn >= target_lsn {
274                if applied_term == target_term {
275                    return Ok(());
276                }
277                return Err(BookmarkWaitError::TermMismatch {
278                    target_term,
279                    applied_term,
280                });
281            }
282
283            let now = std::time::Instant::now();
284            if now >= deadline {
285                return Err(BookmarkWaitError::Timeout {
286                    target_lsn,
287                    applied_lsn,
288                });
289            }
290            let remaining = deadline.saturating_duration_since(now);
291            let (next_guard, wait_result) = self
292                .apply_wait
293                .1
294                .wait_timeout(guard, remaining)
295                .expect("apply wait condvar");
296            guard = next_guard;
297            if wait_result.timed_out() {
298                return Err(BookmarkWaitError::Timeout {
299                    target_lsn,
300                    applied_lsn: self.last_applied_lsn(),
301                });
302            }
303        }
304    }
305
306    /// Apply one logical change record. The state machine:
307    /// - first record after `starting_lsn == 0` → apply, anchor.
308    /// - `lsn == last + 1` → apply, advance.
309    /// - `lsn == last` && payload hash equal → idempotent skip.
310    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
311    /// - `lsn < last` → older replay, skip with debug log.
312    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
313    pub fn apply(
314        &self,
315        db: &RedDB,
316        record: &ChangeRecord,
317        mode: ApplyMode,
318    ) -> Result<ApplyOutcome, LogicalApplyError> {
319        let last = self.last_applied_lsn.load(Ordering::Acquire);
320        let last_term = self.last_applied_term.load(Ordering::Acquire);
321        let payload_hash = record_payload_hash(record);
322        self.received_frontier_lsn
323            .fetch_max(record.lsn, Ordering::AcqRel);
324
325        if last == 0 && record.lsn > 0 {
326            self.do_apply(db, record, mode)?;
327            self.last_applied_term.store(record.term, Ordering::Release);
328            self.last_applied_lsn.store(record.lsn, Ordering::Release);
329            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
330            self.apply_wait.1.notify_all();
331            return Ok(ApplyOutcome::Applied);
332        }
333
334        if record.lsn == last {
335            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
336            return match prior {
337                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
338                Some(p) => Err(LogicalApplyError::Divergence {
339                    expected_term: last_term,
340                    got_term: record.term,
341                    lsn: record.lsn,
342                    expected: hex_digest(&p),
343                    got: hex_digest(&payload_hash),
344                }),
345                None => Ok(ApplyOutcome::Idempotent),
346            };
347        }
348        if record.lsn < last {
349            return Ok(ApplyOutcome::Skipped);
350        }
351        if record.lsn > last + 1 {
352            return Err(LogicalApplyError::Gap {
353                last,
354                next: record.lsn,
355            });
356        }
357
358        self.do_apply(db, record, mode)?;
359        self.last_applied_term.store(record.term, Ordering::Release);
360        self.last_applied_lsn.store(record.lsn, Ordering::Release);
361        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
362        self.apply_wait.1.notify_all();
363        Ok(ApplyOutcome::Applied)
364    }
365
366    fn do_apply(
367        &self,
368        db: &RedDB,
369        record: &ChangeRecord,
370        mode: ApplyMode,
371    ) -> Result<(), LogicalApplyError> {
372        Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
373            LogicalApplyError::Apply {
374                lsn: record.lsn,
375                source: err,
376            }
377        })
378    }
379
380    /// Stateless apply — applies the record without monotonicity
381    /// checks. Kept for callers that don't yet thread the stateful
382    /// applier through. New code should prefer
383    /// `LogicalChangeApplier::new()` + `apply()`. Apply misses (delete
384    /// against a missing target) are recorded into a throwaway metrics
385    /// handle; use [`apply_record_with_metrics`] to surface them.
386    pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
387        Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
388    }
389
390    /// Stateless apply that records apply misses (issue #814) into
391    /// `metrics`. A delete against a missing collection or a missing
392    /// entity is a non-fatal divergence signal: it bumps
393    /// `ApplyErrorKind::Miss` and emits a structured warn line, but still
394    /// returns `Ok(())` so the LSN chain advances and idempotent re-pull
395    /// (#813) converges. A genuine (non-missing-target) store error on a
396    /// delete propagates as a real apply error — counted, fail-closed —
397    /// rather than being swallowed by the old `let _ =`.
398    pub fn apply_record_with_metrics(
399        db: &RedDB,
400        record: &ChangeRecord,
401        _mode: ApplyMode,
402        metrics: &ReplicaApplyMetrics,
403    ) -> RedDBResult<()> {
404        let store = db.store();
405        match record.operation {
406            ChangeOperation::Delete => {
407                match store.delete(&record.collection, EntityId::new(record.entity_id)) {
408                    Ok(true) => {}
409                    Ok(false) => {
410                        // Target collection existed but no such entity —
411                        // the delete found nothing to remove.
412                        metrics.record(ApplyErrorKind::Miss);
413                        tracing::warn!(
414                            target: "reddb::replication::apply",
415                            lsn = record.lsn,
416                            collection = %record.collection,
417                            entity_id = record.entity_id,
418                            "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
419                        );
420                    }
421                    Err(crate::storage::StoreError::CollectionNotFound(name)) => {
422                        // The whole collection is absent on the replica —
423                        // a missed delete that can drive count drift.
424                        metrics.record(ApplyErrorKind::Miss);
425                        tracing::warn!(
426                            target: "reddb::replication::apply",
427                            lsn = record.lsn,
428                            collection = %name,
429                            entity_id = record.entity_id,
430                            "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
431                        );
432                    }
433                    Err(err) => {
434                        // A real store error is a genuine apply failure:
435                        // surface it instead of discarding it so the
436                        // caller counts it and the replica fails closed.
437                        return Err(RedDBError::Internal(err.to_string()));
438                    }
439                }
440            }
441            ChangeOperation::Refresh => {
442                // Issue #596 slice 9d — replica replay of
443                // `REFRESH MATERIALIZED VIEW v`. The primary
444                // emitted the serialized backing-collection
445                // contents in `refresh_records`; apply the
446                // atomic swap on the replica's local store
447                // (which also persists a `RefreshCollection`
448                // WAL action so the post-swap contents survive
449                // a replica restart).
450                let records = record.refresh_records.clone().ok_or_else(|| {
451                    RedDBError::Internal(
452                        "replication refresh record missing refresh_records payload".to_string(),
453                    )
454                })?;
455                store
456                    .refresh_collection_from_records(&record.collection, records)
457                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
458            }
459            ChangeOperation::Insert | ChangeOperation::Update => {
460                let Some(bytes) = &record.entity_bytes else {
461                    return Err(RedDBError::Internal(
462                        "replication record missing entity payload".to_string(),
463                    ));
464                };
465                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
466                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
467
468                // Issue #813 — MVCC table-row supersession on the replica.
469                //
470                // A table-row UPDATE on the primary installs a NEW physical
471                // version (fresh `EntityId`) that shares the row's stable
472                // `logical_id`, and marks the prior version superseded
473                // (`xmax != 0`) so snapshot reads skip it. Only the new
474                // version travels on the wire — the prior version's `xmax`
475                // bump is implicit. Without reproducing it here the replica
476                // leaves every prior version LIVE, so each update to a row
477                // accumulates a stale live duplicate and a full re-pull
478                // replays them all (the observed 22× inflation). Before
479                // upserting the incoming version, mark any *other* live
480                // version of the same logical id superseded — mirroring
481                // `install_versioned_table_row_update` on the primary. This
482                // is idempotent under re-pull: re-applying a record updates
483                // its version in place (resetting its `xmax` from the
484                // serialized bytes), and the last writer per logical id in
485                // LSN order wins, converging on the primary's live set.
486                if matches!(entity.kind, EntityKind::TableRow { .. }) {
487                    let logical = entity.logical_id();
488                    let new_id = entity.id;
489                    let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
490                    let stale: Vec<_> = store
491                        .table_row_versions_by_logical_id(&record.collection, logical)
492                        .into_iter()
493                        .filter(|version| version.id != new_id && version.xmax == 0)
494                        .collect();
495                    if !stale.is_empty() {
496                        let manager = store
497                            .get_collection(&record.collection)
498                            .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
499                        for mut version in stale {
500                            version.set_xmax(superseding_xid);
501                            manager
502                                .update(version)
503                                .map_err(|err| RedDBError::Internal(err.to_string()))?;
504                        }
505                    }
506                }
507
508                let exists = store
509                    .get(&record.collection, EntityId::new(record.entity_id))
510                    .is_some();
511                if exists {
512                    let manager = store
513                        .get_collection(&record.collection)
514                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
515                    manager
516                        .update(entity.clone())
517                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
518                } else {
519                    store
520                        .insert_auto(&record.collection, entity.clone())
521                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
522                }
523                if let Some(metadata_json) = &record.metadata {
524                    let metadata = metadata_from_json(metadata_json)
525                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
526                    store
527                        .set_metadata(&record.collection, entity.id, metadata)
528                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
529                }
530                store
531                    .context_index()
532                    .index_entity(&record.collection, &entity);
533            }
534        }
535        Ok(())
536    }
537}
538
539fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
540    let mut hasher = crate::crypto::sha256::Sha256::new();
541    hasher.update(&record.term.to_le_bytes());
542    hasher.update(&record.lsn.to_le_bytes());
543    hasher.update(&[record.operation as u8]);
544    hasher.update(record.collection.as_bytes());
545    hasher.update(&record.entity_id.to_le_bytes());
546    if let Some(bytes) = &record.entity_bytes {
547        hasher.update(bytes);
548    }
549    // Issue #596 slice 9d — refresh payload participates in the
550    // payload-hash so the same-LSN-idempotent / different-payload-
551    // divergence state machine works for Refresh records too.
552    if let Some(records) = &record.refresh_records {
553        hasher.update(&(records.len() as u64).to_le_bytes());
554        for r in records {
555            hasher.update(&(r.len() as u64).to_le_bytes());
556            hasher.update(r);
557        }
558    }
559    hasher.finalize()
560}
561
562fn hex_digest(bytes: &[u8; 32]) -> String {
563    crate::utils::to_hex(bytes)
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Open a database at a fresh path under the system temp directory.
    ///
    /// The path combines the process id, the current time in nanoseconds
    /// and a process-wide counter. The counter matters because tests run
    /// on parallel threads of one process: on a coarse clock two of them
    /// can read the same timestamp and would otherwise share a path.
    fn open_db() -> (RedDB, std::path::PathBuf) {
        static NEXT_DB: AtomicU64 = AtomicU64::new(0);
        let sequence = NEXT_DB.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}_{}",
            std::process::id(),
            nanos,
            sequence
        ));
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, path)
    }

    /// Build an `Insert` record for a `users` table row whose entity id,
    /// row id and sequence id all equal `lsn`, carrying `payload` as a blob.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        ChangeRecord::from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    /// Build a payload-less `Delete` record at the default term.
    fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
        ChangeRecord {
            term: crate::replication::DEFAULT_REPLICATION_TERM,
            lsn,
            timestamp: 100 + lsn,
            operation: ChangeOperation::Delete,
            collection: collection.to_string(),
            entity_id,
            entity_kind: "row".to_string(),
            entity_bytes: None,
            metadata: None,
            refresh_records: None,
        }
    }

    /// Build a minimal `users` table-row entity with a single `id` column.
    fn table_row_entity(id: u64) -> UnifiedEntity {
        let mut entity = UnifiedEntity::new(
            EntityId::new(id),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: id,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(id)],
                vec!["id".to_string()],
            )),
        );
        entity.created_at = 100 + id;
        entity.updated_at = 100 + id;
        entity.sequence_id = id;
        entity
    }

    // Issue #814 — a delete against a missing collection must record an
    // apply miss (not a silent no-op) and still return Ok so the LSN
    // chain advances (idempotent re-pull, #813).
    #[test]
    fn delete_against_missing_collection_records_apply_miss() {
        let (db, path) = open_db();
        let metrics = ReplicaApplyMetrics::default();
        let before = metrics.apply_miss_total.load(Ordering::Relaxed);

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "no_such_collection", 42),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("missing-target delete is non-fatal");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            before + 1,
            "delete against a missing collection must bump the apply-miss signal"
        );
        let _ = std::fs::remove_file(path);
    }

    // Issue #814 — a delete against an existing collection but absent
    // entity is likewise a recorded miss, not a swallowed no-op.
    #[test]
    fn delete_against_missing_entity_records_apply_miss() {
        let (db, path) = open_db();
        let _ = db.store().get_or_create_collection("users");
        let metrics = ReplicaApplyMetrics::default();

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "users", 9999),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("missing-entity delete is non-fatal");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            1,
            "delete of an absent entity must bump the apply-miss signal"
        );
        let _ = std::fs::remove_file(path);
    }

    // Issue #814 — the normal path (target present) deletes without
    // firing the miss signal. No behavioral regression.
    #[test]
    fn delete_of_present_target_records_no_apply_miss() {
        let (db, path) = open_db();
        let store = db.store();
        let _ = store.get_or_create_collection("users");
        let id = store
            .insert_auto("users", table_row_entity(1))
            .expect("insert entity");
        let metrics = ReplicaApplyMetrics::default();

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "users", id.raw()),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("present-target delete applies");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            0,
            "deleting a present target must not fire the apply-miss signal"
        );
        assert!(
            store.get("users", id).is_none(),
            "the entity must actually be removed on the normal path"
        );
        let _ = std::fs::remove_file(path);
    }

    // Issue #814 — the shared-metrics handle on the stateful applier
    // surfaces the miss so `/metrics` (which reads the same Arc) sees it.
    #[test]
    fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
        let (db, path) = open_db();
        let applier =
            LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));

        applier
            .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
            .expect("missing-target delete advances the chain");

        assert_eq!(
            applier.metrics().apply_miss_total.load(Ordering::Relaxed),
            1,
            "the applier's shared metrics handle must record the miss"
        );
        assert_eq!(
            applier.last_applied_lsn(),
            1,
            "a non-fatal miss still advances the LSN chain"
        );
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_fails_closed_on_same_lsn_different_term() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(
                err,
                LogicalApplyError::Divergence {
                    lsn: 7,
                    expected_term: 1,
                    got_term: 2,
                    ..
                }
            ),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_term(), 1);
        assert_eq!(applier.last_applied_lsn(), 7);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_skips_older_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        let _ = std::fs::remove_file(path);
    }
}
871}