1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Mutex;
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, RedDB, UnifiedStore};
10
/// Context in which a [`ChangeRecord`] is being applied.
///
/// NOTE(review): `LogicalChangeApplier::apply_record` currently ignores this
/// value (its parameter is `_mode`), so both variants take the same path today.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// Presumably: applying changes streamed from a primary — confirm.
    Replica,
    /// Presumably: replaying changes while restoring a database — confirm.
    Restore,
}
16
17#[derive(Debug, Default)]
23pub struct ReplicaApplyMetrics {
24 pub gap_total: std::sync::atomic::AtomicU64,
25 pub divergence_total: std::sync::atomic::AtomicU64,
26 pub apply_error_total: std::sync::atomic::AtomicU64,
27 pub decode_error_total: std::sync::atomic::AtomicU64,
28}
29
30impl ReplicaApplyMetrics {
31 pub fn record(&self, kind: ApplyErrorKind) {
32 use std::sync::atomic::Ordering::Relaxed;
33 match kind {
34 ApplyErrorKind::Gap => {
35 self.gap_total.fetch_add(1, Relaxed);
36 }
37 ApplyErrorKind::Divergence => {
38 self.divergence_total.fetch_add(1, Relaxed);
39 }
40 ApplyErrorKind::Apply => {
41 self.apply_error_total.fetch_add(1, Relaxed);
42 }
43 ApplyErrorKind::Decode => {
44 self.decode_error_total.fetch_add(1, Relaxed);
45 }
46 }
47 }
48
49 pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 4] {
50 use std::sync::atomic::Ordering::Relaxed;
51 [
52 (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
53 (
54 ApplyErrorKind::Divergence,
55 self.divergence_total.load(Relaxed),
56 ),
57 (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
58 (
59 ApplyErrorKind::Decode,
60 self.decode_error_total.load(Relaxed),
61 ),
62 ]
63 }
64}
65
/// Category of a replica apply failure; selects a counter in
/// [`ReplicaApplyMetrics`] and provides a stable text label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// A record's LSN skipped past the next expected LSN.
    Gap,
    /// A different payload arrived for an LSN that was already applied.
    Divergence,
    /// Writing a record into the store failed.
    Apply,
    /// A record could not be decoded. `LogicalApplyError::kind` never yields
    /// this variant; presumably callers report it directly — confirm.
    Decode,
}

impl ApplyErrorKind {
    /// Returns the lowercase label for this kind (e.g. `"gap"`).
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
        }
    }
}
84
85impl LogicalApplyError {
86 pub fn kind(&self) -> ApplyErrorKind {
87 match self {
88 Self::Gap { .. } => ApplyErrorKind::Gap,
89 Self::Divergence { .. } => ApplyErrorKind::Divergence,
90 Self::Apply { .. } => ApplyErrorKind::Apply,
91 }
92 }
93}
94
/// Successful result of [`LogicalChangeApplier::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was written to the store and the applier advanced to its LSN.
    Applied,
    /// The record's LSN equals the last applied LSN and its payload hash
    /// matched (or no hash was known); nothing was written.
    Idempotent,
    /// The record's LSN is older than the last applied LSN; nothing was written.
    Skipped,
}
109
110#[derive(Debug)]
111pub enum LogicalApplyError {
112 Gap {
113 last: u64,
114 next: u64,
115 },
116 Divergence {
117 lsn: u64,
118 expected: String,
119 got: String,
120 },
121 Apply {
122 lsn: u64,
123 source: RedDBError,
124 },
125}
126
127impl std::fmt::Display for LogicalApplyError {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
131 Self::Divergence { lsn, expected, got } => write!(
132 f,
133 "LSN divergence on apply at lsn={lsn}: expected payload hash {expected}, got {got}"
134 ),
135 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
136 }
137 }
138}
139
140impl std::error::Error for LogicalApplyError {}
141
142pub struct LogicalChangeApplier {
147 last_applied_lsn: AtomicU64,
148 last_payload_hash: Mutex<Option<[u8; 32]>>,
149}
150
151impl LogicalChangeApplier {
152 pub fn new(starting_lsn: u64) -> Self {
157 Self {
158 last_applied_lsn: AtomicU64::new(starting_lsn),
159 last_payload_hash: Mutex::new(None),
160 }
161 }
162
163 pub fn last_applied_lsn(&self) -> u64 {
164 self.last_applied_lsn.load(Ordering::Acquire)
165 }
166
167 pub fn apply(
175 &self,
176 db: &RedDB,
177 record: &ChangeRecord,
178 mode: ApplyMode,
179 ) -> Result<ApplyOutcome, LogicalApplyError> {
180 let last = self.last_applied_lsn.load(Ordering::Acquire);
181 let payload_hash = record_payload_hash(record);
182
183 if last == 0 && record.lsn > 0 {
184 self.do_apply(db, record, mode)?;
185 self.last_applied_lsn.store(record.lsn, Ordering::Release);
186 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
187 return Ok(ApplyOutcome::Applied);
188 }
189
190 if record.lsn == last {
191 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
192 return match prior {
193 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
194 Some(p) => Err(LogicalApplyError::Divergence {
195 lsn: record.lsn,
196 expected: hex_digest(&p),
197 got: hex_digest(&payload_hash),
198 }),
199 None => Ok(ApplyOutcome::Idempotent),
200 };
201 }
202 if record.lsn < last {
203 return Ok(ApplyOutcome::Skipped);
204 }
205 if record.lsn > last + 1 {
206 return Err(LogicalApplyError::Gap {
207 last,
208 next: record.lsn,
209 });
210 }
211
212 self.do_apply(db, record, mode)?;
213 self.last_applied_lsn.store(record.lsn, Ordering::Release);
214 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
215 Ok(ApplyOutcome::Applied)
216 }
217
218 fn do_apply(
219 &self,
220 db: &RedDB,
221 record: &ChangeRecord,
222 mode: ApplyMode,
223 ) -> Result<(), LogicalApplyError> {
224 Self::apply_record(db, record, mode).map_err(|err| LogicalApplyError::Apply {
225 lsn: record.lsn,
226 source: err,
227 })
228 }
229
230 pub fn apply_record(db: &RedDB, record: &ChangeRecord, _mode: ApplyMode) -> RedDBResult<()> {
235 let store = db.store();
236 match record.operation {
237 ChangeOperation::Delete => {
238 let _ = store.delete(&record.collection, EntityId::new(record.entity_id));
239 }
240 ChangeOperation::Insert | ChangeOperation::Update => {
241 let Some(bytes) = &record.entity_bytes else {
242 return Err(RedDBError::Internal(
243 "replication record missing entity payload".to_string(),
244 ));
245 };
246 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
247 .map_err(|err| RedDBError::Internal(err.to_string()))?;
248 let exists = store
249 .get(&record.collection, EntityId::new(record.entity_id))
250 .is_some();
251 if exists {
252 let manager = store
253 .get_collection(&record.collection)
254 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
255 manager
256 .update(entity.clone())
257 .map_err(|err| RedDBError::Internal(err.to_string()))?;
258 } else {
259 store
260 .insert_auto(&record.collection, entity.clone())
261 .map_err(|err| RedDBError::Internal(err.to_string()))?;
262 }
263 if let Some(metadata_json) = &record.metadata {
264 let metadata = metadata_from_json(metadata_json)
265 .map_err(|err| RedDBError::Internal(err.to_string()))?;
266 store
267 .set_metadata(&record.collection, entity.id, metadata)
268 .map_err(|err| RedDBError::Internal(err.to_string()))?;
269 }
270 store
271 .context_index()
272 .index_entity(&record.collection, &entity);
273 }
274 }
275 Ok(())
276 }
277}
278
279fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
280 let mut hasher = crate::crypto::sha256::Sha256::new();
281 hasher.update(&record.lsn.to_le_bytes());
282 hasher.update(&[record.operation as u8]);
283 hasher.update(record.collection.as_bytes());
284 hasher.update(&record.entity_id.to_le_bytes());
285 if let Some(bytes) = &record.entity_bytes {
286 hasher.update(bytes);
287 }
288 hasher.finalize()
289}
290
/// Renders a 32-byte payload digest as text for `LogicalApplyError::Divergence`.
///
/// Delegates to `crate::utils::to_hex`; presumably lowercase hex with no
/// separators — confirm against that helper.
fn hex_digest(bytes: &[u8; 32]) -> String {
    crate::utils::to_hex(bytes)
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Deletes the database file on drop, so the temp file is removed even
    /// when an assertion panics part-way through a test.
    ///
    /// NOTE(review): only the main path is removed; if `RedDB::open` creates
    /// sidecar files they are not cleaned up here.
    struct Cleanup(std::path::PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Opens a database at a temp path made unique by process id and the
    /// current wall-clock time in nanoseconds.
    fn open_db() -> (RedDB, std::path::PathBuf) {
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, path)
    }

    /// Builds an `Insert` record for table `users` whose entity id, row id
    /// and sequence id all equal `lsn`, storing `payload` as a blob column.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        ChangeRecord::from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
    }

    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(0);
        let first = record(7, b"first");
        applier.apply(&db, &first, ApplyMode::Replica).unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
        // A rejected record must neither advance the LSN nor replace the
        // remembered hash of the record that was actually applied.
        assert_eq!(applier.last_applied_lsn(), 7);
        assert_eq!(
            applier.apply(&db, &first, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
    }

    #[test]
    fn apply_skips_older_lsn() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        // The expected next LSN is still accepted after a gap was reported.
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
    }

    #[test]
    fn apply_resumes_after_starting_lsn() {
        let (db, path) = open_db();
        let _cleanup = Cleanup(path);
        let applier = LogicalChangeApplier::new(3);
        // The starting LSN counts as applied, with no known payload hash.
        assert_eq!(
            applier
                .apply(&db, &record(3, b"c"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(
            applier
                .apply(&db, &record(4, b"d"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 4);
    }

    #[test]
    fn error_kind_mapping_and_display() {
        assert_eq!(
            LogicalApplyError::Gap { last: 1, next: 3 }.kind(),
            ApplyErrorKind::Gap
        );
        assert_eq!(
            LogicalApplyError::Divergence {
                lsn: 2,
                expected: "a".to_string(),
                got: "b".to_string(),
            }
            .kind(),
            ApplyErrorKind::Divergence
        );
        assert_eq!(
            LogicalApplyError::Apply {
                lsn: 2,
                source: RedDBError::Internal("boom".to_string()),
            }
            .kind(),
            ApplyErrorKind::Apply
        );
        assert_eq!(
            LogicalApplyError::Gap { last: 1, next: 5 }.to_string(),
            "LSN gap on apply: last=1 next=5"
        );
        assert_eq!(ApplyErrorKind::Decode.label(), "decode");
    }

    #[test]
    fn metrics_count_per_kind() {
        let metrics = ReplicaApplyMetrics::default();
        metrics.record(ApplyErrorKind::Gap);
        metrics.record(ApplyErrorKind::Gap);
        metrics.record(ApplyErrorKind::Decode);
        assert_eq!(
            metrics.snapshot(),
            [
                (ApplyErrorKind::Gap, 2),
                (ApplyErrorKind::Divergence, 0),
                (ApplyErrorKind::Apply, 0),
                (ApplyErrorKind::Decode, 1),
            ]
        );
    }
}