Skip to main content

reddb_server/replication/
logical.rs

1//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Mutex;
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, RedDB, UnifiedStore};
10
/// Identifies which pipeline is driving a logical apply: replica replay of
/// the primary's change stream, or point-in-time restore.
///
/// Both variants currently share identical semantics — the stateless
/// `LogicalChangeApplier::apply_record` accepts the mode but does not
/// branch on it.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ApplyMode {
    /// A replica replaying changes received from the primary.
    Replica,
    /// A point-in-time restore replaying archived changes.
    Restore,
}
16
/// Failure counters the replica apply loop bumps when an invariant breaks
/// (PLAN.md Phase 11.5), surfaced via `reddb_replica_apply_errors_total`.
///
/// Decode errors are not strictly apply errors, but they share this
/// observability lane so dashboards alert on "replica is ingesting trash
/// from primary" regardless of the cause.
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Number of `ApplyErrorKind::Gap` failures recorded.
    pub gap_total: AtomicU64,
    /// Number of `ApplyErrorKind::Divergence` failures recorded.
    pub divergence_total: AtomicU64,
    /// Number of `ApplyErrorKind::Apply` failures recorded.
    pub apply_error_total: AtomicU64,
    /// Number of `ApplyErrorKind::Decode` failures recorded.
    pub decode_error_total: AtomicU64,
}

impl ReplicaApplyMetrics {
    /// Counter backing the given failure kind.
    fn counter_for(&self, kind: ApplyErrorKind) -> &AtomicU64 {
        match kind {
            ApplyErrorKind::Gap => &self.gap_total,
            ApplyErrorKind::Divergence => &self.divergence_total,
            ApplyErrorKind::Apply => &self.apply_error_total,
            ApplyErrorKind::Decode => &self.decode_error_total,
        }
    }

    /// Increment the counter for `kind` by one.
    ///
    /// `Relaxed` is enough here: the counters are only incremented and read
    /// as standalone tallies, never used to order other memory operations.
    pub fn record(&self, kind: ApplyErrorKind) {
        self.counter_for(kind).fetch_add(1, Ordering::Relaxed);
    }

    /// Current value of every counter, in the fixed order gap, divergence,
    /// apply, decode.
    ///
    /// Counters are loaded one at a time, so under concurrent `record`
    /// calls the four values need not reflect a single instant.
    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 4] {
        const ORDER: [ApplyErrorKind; 4] = [
            ApplyErrorKind::Gap,
            ApplyErrorKind::Divergence,
            ApplyErrorKind::Apply,
            ApplyErrorKind::Decode,
        ];
        ORDER.map(|kind| (kind, self.counter_for(kind).load(Ordering::Relaxed)))
    }
}

/// Category of a replica apply failure; selects the counter bumped in
/// `ReplicaApplyMetrics`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyErrorKind {
    /// Incoming LSN skipped past the next expected one.
    Gap,
    /// The last applied LSN was re-delivered with a different payload.
    Divergence,
    /// The storage layer failed to apply a record.
    Apply,
    /// An incoming change could not be decoded.
    Decode,
}

impl ApplyErrorKind {
    /// Lowercase name of this kind: `"gap"`, `"divergence"`, `"apply"`, or
    /// `"decode"`.
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
        }
    }
}
84
85impl LogicalApplyError {
86    pub fn kind(&self) -> ApplyErrorKind {
87        match self {
88            Self::Gap { .. } => ApplyErrorKind::Gap,
89            Self::Divergence { .. } => ApplyErrorKind::Divergence,
90            Self::Apply { .. } => ApplyErrorKind::Apply,
91        }
92    }
93}
94
/// Successful result of a single `LogicalChangeApplier::apply` call.
///
/// Only [`ApplyOutcome::Applied`] moves the applied-LSN chain forward. The
/// other two variants mean the record was already covered (its LSN is equal
/// to or older than the last applied one) and nothing was written.
///
/// Failures are not represented here: gaps and divergences come back as
/// `LogicalApplyError` and are fail-closed — callers (replica fetcher,
/// restore loop) should mark the instance unhealthy and stop applying.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ApplyOutcome {
    /// The record was next in sequence (or anchored an empty start) and its
    /// change was written.
    Applied,
    /// The record repeats the last applied LSN and either matches the
    /// recorded payload hash or no hash has been recorded yet (the LSN is
    /// the starting LSN); it is skipped.
    Idempotent,
    /// The record's LSN is older than the last applied one; it is skipped.
    Skipped,
}
109
110#[derive(Debug)]
111pub enum LogicalApplyError {
112    Gap {
113        last: u64,
114        next: u64,
115    },
116    Divergence {
117        lsn: u64,
118        expected: String,
119        got: String,
120    },
121    Apply {
122        lsn: u64,
123        source: RedDBError,
124    },
125}
126
127impl std::fmt::Display for LogicalApplyError {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        match self {
130            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
131            Self::Divergence { lsn, expected, got } => write!(
132                f,
133                "LSN divergence on apply at lsn={lsn}: expected payload hash {expected}, got {got}"
134            ),
135            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
136        }
137    }
138}
139
140impl std::error::Error for LogicalApplyError {}
141
142/// Shared logical change applier so replica replay and PITR converge on the
143/// same semantics. Stateful (PLAN.md Phase 11.5): tracks the last applied
144/// LSN + payload hash so duplicates / older LSNs / gaps / divergences are
145/// detected explicitly.
146pub struct LogicalChangeApplier {
147    last_applied_lsn: AtomicU64,
148    last_payload_hash: Mutex<Option<[u8; 32]>>,
149}
150
151impl LogicalChangeApplier {
152    /// Build a fresh applier. `starting_lsn` is the LSN already
153    /// covered by the snapshot (or `0` for an empty replica). The
154    /// next acceptable record is any positive LSN; from there the
155    /// chain advances by 1.
156    pub fn new(starting_lsn: u64) -> Self {
157        Self {
158            last_applied_lsn: AtomicU64::new(starting_lsn),
159            last_payload_hash: Mutex::new(None),
160        }
161    }
162
163    pub fn last_applied_lsn(&self) -> u64 {
164        self.last_applied_lsn.load(Ordering::Acquire)
165    }
166
167    /// Apply one logical change record. The state machine:
168    /// - first record after `starting_lsn == 0` → apply, anchor.
169    /// - `lsn == last + 1` → apply, advance.
170    /// - `lsn == last` && payload hash equal → idempotent skip.
171    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
172    /// - `lsn < last` → older replay, skip with debug log.
173    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
174    pub fn apply(
175        &self,
176        db: &RedDB,
177        record: &ChangeRecord,
178        mode: ApplyMode,
179    ) -> Result<ApplyOutcome, LogicalApplyError> {
180        let last = self.last_applied_lsn.load(Ordering::Acquire);
181        let payload_hash = record_payload_hash(record);
182
183        if last == 0 && record.lsn > 0 {
184            self.do_apply(db, record, mode)?;
185            self.last_applied_lsn.store(record.lsn, Ordering::Release);
186            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
187            return Ok(ApplyOutcome::Applied);
188        }
189
190        if record.lsn == last {
191            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
192            return match prior {
193                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
194                Some(p) => Err(LogicalApplyError::Divergence {
195                    lsn: record.lsn,
196                    expected: hex_digest(&p),
197                    got: hex_digest(&payload_hash),
198                }),
199                None => Ok(ApplyOutcome::Idempotent),
200            };
201        }
202        if record.lsn < last {
203            return Ok(ApplyOutcome::Skipped);
204        }
205        if record.lsn > last + 1 {
206            return Err(LogicalApplyError::Gap {
207                last,
208                next: record.lsn,
209            });
210        }
211
212        self.do_apply(db, record, mode)?;
213        self.last_applied_lsn.store(record.lsn, Ordering::Release);
214        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
215        Ok(ApplyOutcome::Applied)
216    }
217
218    fn do_apply(
219        &self,
220        db: &RedDB,
221        record: &ChangeRecord,
222        mode: ApplyMode,
223    ) -> Result<(), LogicalApplyError> {
224        Self::apply_record(db, record, mode).map_err(|err| LogicalApplyError::Apply {
225            lsn: record.lsn,
226            source: err,
227        })
228    }
229
230    /// Stateless apply — applies the record without monotonicity
231    /// checks. Kept for callers that don't yet thread the stateful
232    /// applier through. New code should prefer
233    /// `LogicalChangeApplier::new()` + `apply()`.
234    pub fn apply_record(db: &RedDB, record: &ChangeRecord, _mode: ApplyMode) -> RedDBResult<()> {
235        let store = db.store();
236        match record.operation {
237            ChangeOperation::Delete => {
238                let _ = store.delete(&record.collection, EntityId::new(record.entity_id));
239            }
240            ChangeOperation::Refresh => {
241                // Issue #596 slice 9d — replica replay of
242                // `REFRESH MATERIALIZED VIEW v`. The primary
243                // emitted the serialized backing-collection
244                // contents in `refresh_records`; apply the
245                // atomic swap on the replica's local store
246                // (which also persists a `RefreshCollection`
247                // WAL action so the post-swap contents survive
248                // a replica restart).
249                let records = record.refresh_records.clone().ok_or_else(|| {
250                    RedDBError::Internal(
251                        "replication refresh record missing refresh_records payload".to_string(),
252                    )
253                })?;
254                store
255                    .refresh_collection_from_records(&record.collection, records)
256                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
257            }
258            ChangeOperation::Insert | ChangeOperation::Update => {
259                let Some(bytes) = &record.entity_bytes else {
260                    return Err(RedDBError::Internal(
261                        "replication record missing entity payload".to_string(),
262                    ));
263                };
264                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
265                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
266                let exists = store
267                    .get(&record.collection, EntityId::new(record.entity_id))
268                    .is_some();
269                if exists {
270                    let manager = store
271                        .get_collection(&record.collection)
272                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
273                    manager
274                        .update(entity.clone())
275                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
276                } else {
277                    store
278                        .insert_auto(&record.collection, entity.clone())
279                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
280                }
281                if let Some(metadata_json) = &record.metadata {
282                    let metadata = metadata_from_json(metadata_json)
283                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
284                    store
285                        .set_metadata(&record.collection, entity.id, metadata)
286                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
287                }
288                store
289                    .context_index()
290                    .index_entity(&record.collection, &entity);
291            }
292        }
293        Ok(())
294    }
295}
296
297fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
298    let mut hasher = crate::crypto::sha256::Sha256::new();
299    hasher.update(&record.lsn.to_le_bytes());
300    hasher.update(&[record.operation as u8]);
301    hasher.update(record.collection.as_bytes());
302    hasher.update(&record.entity_id.to_le_bytes());
303    if let Some(bytes) = &record.entity_bytes {
304        hasher.update(bytes);
305    }
306    // Issue #596 slice 9d — refresh payload participates in the
307    // payload-hash so the same-LSN-idempotent / different-payload-
308    // divergence state machine works for Refresh records too.
309    if let Some(records) = &record.refresh_records {
310        hasher.update(&(records.len() as u64).to_le_bytes());
311        for r in records {
312            hasher.update(&(r.len() as u64).to_le_bytes());
313            hasher.update(r);
314        }
315    }
316    hasher.finalize()
317}
318
319fn hex_digest(bytes: &[u8; 32]) -> String {
320    crate::utils::to_hex(bytes)
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Removes the temp database file when dropped, so it is cleaned up even
    /// if an assertion panics part-way through a test.
    struct TempDbPath(std::path::PathBuf);

    impl Drop for TempDbPath {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Open a fresh database at a unique temp path. The per-process counter
    /// keeps parallel tests apart even when the clock is too coarse to give
    /// them distinct nanosecond stamps. Keep the returned guard alive (bind it
    /// to a named variable, not `_`) for the whole test.
    fn open_db() -> (RedDB, TempDbPath) {
        static NEXT_DB: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let seq = NEXT_DB.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}_{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos(),
            seq
        ));
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, TempDbPath(path))
    }

    /// Build an Insert change record for a `users` row whose id, row id and
    /// sequence all equal `lsn`, carrying `payload` as a blob column.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        ChangeRecord::from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_anchors_first_record_after_empty_start() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(10, b"x"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 10);
        assert_eq!(
            applier
                .apply(&db, &record(11, b"y"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 11);
    }

    #[test]
    fn apply_treats_starting_lsn_as_already_covered() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(3);
        assert_eq!(
            applier
                .apply(&db, &record(3, b"snap"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(
            applier
                .apply(&db, &record(2, b"old"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Restore)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 3, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(
            applier
                .apply(&db, &record(4, b"d"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 4);
    }

    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
    }

    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
    }

    #[test]
    fn divergence_leaves_applier_state_unchanged() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let original = record(7, b"first");
        applier.apply(&db, &original, ApplyMode::Replica).unwrap();
        assert!(applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .is_err());
        assert_eq!(applier.last_applied_lsn(), 7);
        assert_eq!(
            applier.apply(&db, &original, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
    }

    #[test]
    fn apply_skips_older_lsn() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, _tmp) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
    }

    #[test]
    fn error_kind_and_display_match_variant() {
        let gap = LogicalApplyError::Gap { last: 1, next: 3 };
        assert_eq!(gap.kind(), ApplyErrorKind::Gap);
        assert_eq!(gap.to_string(), "LSN gap on apply: last=1 next=3");
        let divergence = LogicalApplyError::Divergence {
            lsn: 2,
            expected: "aa".to_string(),
            got: "bb".to_string(),
        };
        assert_eq!(divergence.kind(), ApplyErrorKind::Divergence);
        assert_eq!(
            divergence.to_string(),
            "LSN divergence on apply at lsn=2: expected payload hash aa, got bb"
        );
    }

    #[test]
    fn replica_apply_metrics_count_per_kind() {
        let metrics = ReplicaApplyMetrics::default();
        metrics.record(ApplyErrorKind::Gap);
        metrics.record(ApplyErrorKind::Gap);
        metrics.record(ApplyErrorKind::Decode);
        assert_eq!(
            metrics.snapshot(),
            [
                (ApplyErrorKind::Gap, 2),
                (ApplyErrorKind::Divergence, 0),
                (ApplyErrorKind::Apply, 0),
                (ApplyErrorKind::Decode, 1),
            ]
        );
        let labels: Vec<&str> = metrics
            .snapshot()
            .iter()
            .map(|(kind, _)| kind.label())
            .collect();
        assert_eq!(labels, ["gap", "divergence", "apply", "decode"]);
    }
}