Skip to main content

reddb_server/replication/
logical.rs

1//! Logical replication helpers shared by replica apply and point-in-time restore.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Mutex;
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, RedDB, UnifiedStore};
10
/// Identifies which caller is driving a logical apply.
///
/// The module serves both replica apply and point-in-time restore; this
/// enum lets a caller state which of the two it is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// The record is being applied on behalf of a replica.
    Replica,
    /// The record is being applied on behalf of a restore.
    Restore,
}
16
/// Counters the replica apply loop bumps whenever an invariant breaks
/// (PLAN.md Phase 11.5). They are surfaced through the
/// `reddb_replica_apply_errors_total` metric.
///
/// Decode failures are not apply errors in the strict sense, but they are
/// counted in the same place so dashboards alert on "replica is ingesting
/// trash from primary regardless of cause".
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Count of recorded `ApplyErrorKind::Gap` events.
    pub gap_total: AtomicU64,
    /// Count of recorded `ApplyErrorKind::Divergence` events.
    pub divergence_total: AtomicU64,
    /// Count of recorded `ApplyErrorKind::Apply` events.
    pub apply_error_total: AtomicU64,
    /// Count of recorded `ApplyErrorKind::Decode` events.
    pub decode_error_total: AtomicU64,
}
29
30impl ReplicaApplyMetrics {
31    pub fn record(&self, kind: ApplyErrorKind) {
32        use std::sync::atomic::Ordering::Relaxed;
33        match kind {
34            ApplyErrorKind::Gap => {
35                self.gap_total.fetch_add(1, Relaxed);
36            }
37            ApplyErrorKind::Divergence => {
38                self.divergence_total.fetch_add(1, Relaxed);
39            }
40            ApplyErrorKind::Apply => {
41                self.apply_error_total.fetch_add(1, Relaxed);
42            }
43            ApplyErrorKind::Decode => {
44                self.decode_error_total.fetch_add(1, Relaxed);
45            }
46        }
47    }
48
49    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 4] {
50        use std::sync::atomic::Ordering::Relaxed;
51        [
52            (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
53            (
54                ApplyErrorKind::Divergence,
55                self.divergence_total.load(Relaxed),
56            ),
57            (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
58            (
59                ApplyErrorKind::Decode,
60                self.decode_error_total.load(Relaxed),
61            ),
62        ]
63    }
64}
65
/// Category of a replica-side failure, used to select a metrics counter
/// and its label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// An LSN gap was detected.
    Gap,
    /// A record diverged from what was already applied.
    Divergence,
    /// Applying a record failed.
    Apply,
    /// A record could not be decoded.
    Decode,
}

impl ApplyErrorKind {
    /// Returns the lower-case static label for this kind
    /// (`"gap"`, `"divergence"`, `"apply"` or `"decode"`).
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
        }
    }
}
84
85impl LogicalApplyError {
86    pub fn kind(&self) -> ApplyErrorKind {
87        match self {
88            Self::Gap { .. } => ApplyErrorKind::Gap,
89            Self::Divergence { .. } => ApplyErrorKind::Divergence,
90            Self::Apply { .. } => ApplyErrorKind::Apply,
91        }
92    }
93}
94
/// Successful result of a single `apply` call.
///
/// `Applied` advances the chain. `Idempotent` and `Skipped` are no-ops: an
/// equal-or-newer LSN has already been seen. Gaps and divergences are not
/// outcomes; they are reported through `LogicalApplyError` and are
/// fail-closed — callers (replica fetcher, restore loop) should mark the
/// instance unhealthy and stop applying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was applied and the chain advanced monotonically.
    Applied,
    /// The record carries the last applied LSN and the same payload hash;
    /// it is logged and skipped.
    Idempotent,
    /// The record carries an LSN older than the last applied one; it is
    /// logged and skipped.
    Skipped,
}
109
110#[derive(Debug)]
111pub enum LogicalApplyError {
112    Gap {
113        last: u64,
114        next: u64,
115    },
116    Divergence {
117        lsn: u64,
118        expected: String,
119        got: String,
120    },
121    Apply {
122        lsn: u64,
123        source: RedDBError,
124    },
125}
126
127impl std::fmt::Display for LogicalApplyError {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        match self {
130            Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
131            Self::Divergence { lsn, expected, got } => write!(
132                f,
133                "LSN divergence on apply at lsn={lsn}: expected payload hash {expected}, got {got}"
134            ),
135            Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
136        }
137    }
138}
139
140impl std::error::Error for LogicalApplyError {}
141
/// Shared logical change applier, so replica replay and PITR converge on
/// the same semantics.
///
/// The applier is stateful (PLAN.md Phase 11.5): it remembers the last
/// applied LSN together with that record's payload hash, which lets it
/// detect duplicates, older LSNs, gaps and divergences explicitly.
pub struct LogicalChangeApplier {
    /// LSN of the most recently applied record (or the starting LSN).
    last_applied_lsn: AtomicU64,
    /// Payload hash of the most recently applied record; `None` until a
    /// record has been applied through this instance.
    last_payload_hash: Mutex<Option<[u8; 32]>>,
}
150
151impl LogicalChangeApplier {
152    /// Build a fresh applier. `starting_lsn` is the LSN already
153    /// covered by the snapshot (or `0` for an empty replica). The
154    /// next acceptable record is any positive LSN; from there the
155    /// chain advances by 1.
156    pub fn new(starting_lsn: u64) -> Self {
157        Self {
158            last_applied_lsn: AtomicU64::new(starting_lsn),
159            last_payload_hash: Mutex::new(None),
160        }
161    }
162
163    pub fn last_applied_lsn(&self) -> u64 {
164        self.last_applied_lsn.load(Ordering::Acquire)
165    }
166
167    /// Apply one logical change record. The state machine:
168    /// - first record after `starting_lsn == 0` → apply, anchor.
169    /// - `lsn == last + 1` → apply, advance.
170    /// - `lsn == last` && payload hash equal → idempotent skip.
171    /// - `lsn == last` && payload hash differs → `Divergence` (fail closed).
172    /// - `lsn < last` → older replay, skip with debug log.
173    /// - `lsn > last + 1` → `Gap` (fail closed; caller marks unhealthy).
174    pub fn apply(
175        &self,
176        db: &RedDB,
177        record: &ChangeRecord,
178        mode: ApplyMode,
179    ) -> Result<ApplyOutcome, LogicalApplyError> {
180        let last = self.last_applied_lsn.load(Ordering::Acquire);
181        let payload_hash = record_payload_hash(record);
182
183        if last == 0 && record.lsn > 0 {
184            self.do_apply(db, record, mode)?;
185            self.last_applied_lsn.store(record.lsn, Ordering::Release);
186            *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
187            return Ok(ApplyOutcome::Applied);
188        }
189
190        if record.lsn == last {
191            let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
192            return match prior {
193                Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
194                Some(p) => Err(LogicalApplyError::Divergence {
195                    lsn: record.lsn,
196                    expected: hex_digest(&p),
197                    got: hex_digest(&payload_hash),
198                }),
199                None => Ok(ApplyOutcome::Idempotent),
200            };
201        }
202        if record.lsn < last {
203            return Ok(ApplyOutcome::Skipped);
204        }
205        if record.lsn > last + 1 {
206            return Err(LogicalApplyError::Gap {
207                last,
208                next: record.lsn,
209            });
210        }
211
212        self.do_apply(db, record, mode)?;
213        self.last_applied_lsn.store(record.lsn, Ordering::Release);
214        *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
215        Ok(ApplyOutcome::Applied)
216    }
217
218    fn do_apply(
219        &self,
220        db: &RedDB,
221        record: &ChangeRecord,
222        mode: ApplyMode,
223    ) -> Result<(), LogicalApplyError> {
224        Self::apply_record(db, record, mode).map_err(|err| LogicalApplyError::Apply {
225            lsn: record.lsn,
226            source: err,
227        })
228    }
229
230    /// Stateless apply — applies the record without monotonicity
231    /// checks. Kept for callers that don't yet thread the stateful
232    /// applier through. New code should prefer
233    /// `LogicalChangeApplier::new()` + `apply()`.
234    pub fn apply_record(db: &RedDB, record: &ChangeRecord, _mode: ApplyMode) -> RedDBResult<()> {
235        let store = db.store();
236        match record.operation {
237            ChangeOperation::Delete => {
238                let _ = store.delete(&record.collection, EntityId::new(record.entity_id));
239            }
240            ChangeOperation::Insert | ChangeOperation::Update => {
241                let Some(bytes) = &record.entity_bytes else {
242                    return Err(RedDBError::Internal(
243                        "replication record missing entity payload".to_string(),
244                    ));
245                };
246                let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
247                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
248                let exists = store
249                    .get(&record.collection, EntityId::new(record.entity_id))
250                    .is_some();
251                if exists {
252                    let manager = store
253                        .get_collection(&record.collection)
254                        .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
255                    manager
256                        .update(entity.clone())
257                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
258                } else {
259                    store
260                        .insert_auto(&record.collection, entity.clone())
261                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
262                }
263                if let Some(metadata_json) = &record.metadata {
264                    let metadata = metadata_from_json(metadata_json)
265                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
266                    store
267                        .set_metadata(&record.collection, entity.id, metadata)
268                        .map_err(|err| RedDBError::Internal(err.to_string()))?;
269                }
270                store
271                    .context_index()
272                    .index_entity(&record.collection, &entity);
273            }
274        }
275        Ok(())
276    }
277}
278
279fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
280    let mut hasher = crate::crypto::sha256::Sha256::new();
281    hasher.update(&record.lsn.to_le_bytes());
282    hasher.update(&[record.operation as u8]);
283    hasher.update(record.collection.as_bytes());
284    hasher.update(&record.entity_id.to_le_bytes());
285    if let Some(bytes) = &record.entity_bytes {
286        hasher.update(bytes);
287    }
288    hasher.finalize()
289}
290
291fn hex_digest(bytes: &[u8; 32]) -> String {
292    crate::utils::to_hex(bytes)
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Scratch database path that is removed (best effort) when dropped, so
    /// the file is cleaned up even when a test assertion panics.
    struct ScratchPath(std::path::PathBuf);

    impl Drop for ScratchPath {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Opens a database on a fresh scratch path and returns it together
    /// with the guard that cleans the path up.
    fn open_db() -> (RedDB, ScratchPath) {
        use std::sync::atomic::{AtomicU64, Ordering};

        // A process-wide counter keeps paths unique even when two tests
        // read the same clock value on a coarse-resolution timer.
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let scratch = ScratchPath(std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}_{}",
            std::process::id(),
            nanos,
            NEXT_ID.fetch_add(1, Ordering::Relaxed)
        )));
        let _ = std::fs::remove_file(&scratch.0);
        let db = RedDB::open(&scratch.0).unwrap();
        (db, scratch)
    }

    /// Builds an insert record for table `users` whose LSN, entity id and
    /// row id are all `lsn`, carrying `payload` as a blob column.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        ChangeRecord::from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
    }

    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
    }

    #[test]
    fn apply_skips_older_lsn() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
    }

    #[test]
    fn apply_enforces_chain_after_nonzero_start() {
        let (db, _scratch) = open_db();
        let applier = LogicalChangeApplier::new(10);
        // Anything at or below the snapshot LSN is a no-op.
        assert_eq!(
            applier
                .apply(&db, &record(9, b"old"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(
            applier
                .apply(&db, &record(10, b"snap"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Idempotent
        );
        // Jumping past `starting_lsn + 1` is a gap and leaves state alone.
        let err = applier
            .apply(&db, &record(12, b"skip"), ApplyMode::Restore)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 10, next: 12 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 10);
        // The direct successor is accepted.
        assert_eq!(
            applier
                .apply(&db, &record(11, b"next"), ApplyMode::Restore)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 11);
    }
}