1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Mutex;
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, RedDB, UnifiedStore};
10
/// Context in which a change record is applied by [`LogicalChangeApplier`].
///
/// The mode is passed from `LogicalChangeApplier::apply` through to
/// `LogicalChangeApplier::apply_record`, which currently ignores it (`_mode`),
/// so both variants are handled identically by the code in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// Applying records as a replica (meaning inferred from the name — confirm at call sites).
    Replica,
    /// Applying records during a restore (meaning inferred from the name — confirm at call sites).
    Restore,
}
16
/// Monotonic counters of replica apply failures, one per [`ApplyErrorKind`].
///
/// Counters are updated and read with `Relaxed` ordering: each one is an
/// independent statistic, not a synchronization point, so a [`snapshot`]
/// taken while other threads are recording is not guaranteed to be a
/// consistent cut across the four counters.
///
/// [`snapshot`]: ReplicaApplyMetrics::snapshot
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Count of [`ApplyErrorKind::Gap`] events.
    pub gap_total: std::sync::atomic::AtomicU64,
    /// Count of [`ApplyErrorKind::Divergence`] events.
    pub divergence_total: std::sync::atomic::AtomicU64,
    /// Count of [`ApplyErrorKind::Apply`] events.
    pub apply_error_total: std::sync::atomic::AtomicU64,
    /// Count of [`ApplyErrorKind::Decode`] events.
    pub decode_error_total: std::sync::atomic::AtomicU64,
}

impl ReplicaApplyMetrics {
    /// Maps an error kind to the counter that tracks it.
    fn counter_for(&self, kind: ApplyErrorKind) -> &AtomicU64 {
        match kind {
            ApplyErrorKind::Gap => &self.gap_total,
            ApplyErrorKind::Divergence => &self.divergence_total,
            ApplyErrorKind::Apply => &self.apply_error_total,
            ApplyErrorKind::Decode => &self.decode_error_total,
        }
    }

    /// Increments the counter associated with `kind` by one.
    pub fn record(&self, kind: ApplyErrorKind) {
        self.counter_for(kind).fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current value of every counter, paired with its kind, in
    /// the fixed order Gap, Divergence, Apply, Decode.
    pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 4] {
        let kinds = [
            ApplyErrorKind::Gap,
            ApplyErrorKind::Divergence,
            ApplyErrorKind::Apply,
            ApplyErrorKind::Decode,
        ];
        kinds.map(|kind| (kind, self.counter_for(kind).load(Ordering::Relaxed)))
    }
}

/// Category of a replica apply failure, used to bucket [`ReplicaApplyMetrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// An LSN gap was detected.
    Gap,
    /// A record at an already-applied LSN carried a different payload.
    Divergence,
    /// Writing a record to the store failed.
    Apply,
    /// A record could not be decoded.
    Decode,
}

impl ApplyErrorKind {
    /// Returns the short lowercase label for this kind (`"gap"`,
    /// `"divergence"`, `"apply"` or `"decode"`).
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
        }
    }
}
84
85impl LogicalApplyError {
86 pub fn kind(&self) -> ApplyErrorKind {
87 match self {
88 Self::Gap { .. } => ApplyErrorKind::Gap,
89 Self::Divergence { .. } => ApplyErrorKind::Divergence,
90 Self::Apply { .. } => ApplyErrorKind::Apply,
91 }
92 }
93}
94
/// Successful result of [`LogicalChangeApplier::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was written to the store and the applier advanced to its LSN.
    Applied,
    /// The record's LSN equals the last applied LSN and its payload hash matches
    /// the one recorded for that LSN (or no hash is known yet); nothing was written.
    Idempotent,
    /// The record's LSN is older than the last applied LSN; nothing was written
    /// and its payload was not checked.
    Skipped,
}
109
/// Failure returned by [`LogicalChangeApplier::apply`].
///
/// Use `kind()` to map a value onto an [`ApplyErrorKind`] for metrics.
#[derive(Debug)]
pub enum LogicalApplyError {
    /// The record's LSN is more than one past the last applied LSN (only
    /// checked once the applier has a non-zero position).
    Gap {
        /// Last LSN applied before the gap.
        last: u64,
        /// LSN of the rejected record.
        next: u64,
    },
    /// The record has the same LSN as the last applied record but a different
    /// payload hash.
    Divergence {
        /// The LSN both records claim.
        lsn: u64,
        /// Hex-encoded payload hash recorded for `lsn` when it was applied.
        expected: String,
        /// Hex-encoded payload hash of the conflicting record.
        got: String,
    },
    /// Writing the record to the store failed; the applier did not advance.
    Apply {
        /// LSN of the record that could not be applied.
        lsn: u64,
        /// Error reported by the store layer.
        source: RedDBError,
    },
}
126
127impl std::fmt::Display for LogicalApplyError {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
131 Self::Divergence { lsn, expected, got } => write!(
132 f,
133 "LSN divergence on apply at lsn={lsn}: expected payload hash {expected}, got {got}"
134 ),
135 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
136 }
137 }
138}
139
// Marker impl relying on the `Debug` derive and the `Display` impl.
// NOTE(review): `source()` is not overridden, so the `RedDBError` carried by
// `LogicalApplyError::Apply` only appears in the `Display` text, not in the
// standard error chain. If `RedDBError` implements `std::error::Error + 'static`,
// consider returning it from `source()`.
impl std::error::Error for LogicalApplyError {}
141
142pub struct LogicalChangeApplier {
147 last_applied_lsn: AtomicU64,
148 last_payload_hash: Mutex<Option<[u8; 32]>>,
149}
150
151impl LogicalChangeApplier {
152 pub fn new(starting_lsn: u64) -> Self {
157 Self {
158 last_applied_lsn: AtomicU64::new(starting_lsn),
159 last_payload_hash: Mutex::new(None),
160 }
161 }
162
163 pub fn last_applied_lsn(&self) -> u64 {
164 self.last_applied_lsn.load(Ordering::Acquire)
165 }
166
167 pub fn apply(
175 &self,
176 db: &RedDB,
177 record: &ChangeRecord,
178 mode: ApplyMode,
179 ) -> Result<ApplyOutcome, LogicalApplyError> {
180 let last = self.last_applied_lsn.load(Ordering::Acquire);
181 let payload_hash = record_payload_hash(record);
182
183 if last == 0 && record.lsn > 0 {
184 self.do_apply(db, record, mode)?;
185 self.last_applied_lsn.store(record.lsn, Ordering::Release);
186 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
187 return Ok(ApplyOutcome::Applied);
188 }
189
190 if record.lsn == last {
191 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
192 return match prior {
193 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
194 Some(p) => Err(LogicalApplyError::Divergence {
195 lsn: record.lsn,
196 expected: hex_digest(&p),
197 got: hex_digest(&payload_hash),
198 }),
199 None => Ok(ApplyOutcome::Idempotent),
200 };
201 }
202 if record.lsn < last {
203 return Ok(ApplyOutcome::Skipped);
204 }
205 if record.lsn > last + 1 {
206 return Err(LogicalApplyError::Gap {
207 last,
208 next: record.lsn,
209 });
210 }
211
212 self.do_apply(db, record, mode)?;
213 self.last_applied_lsn.store(record.lsn, Ordering::Release);
214 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
215 Ok(ApplyOutcome::Applied)
216 }
217
218 fn do_apply(
219 &self,
220 db: &RedDB,
221 record: &ChangeRecord,
222 mode: ApplyMode,
223 ) -> Result<(), LogicalApplyError> {
224 Self::apply_record(db, record, mode).map_err(|err| LogicalApplyError::Apply {
225 lsn: record.lsn,
226 source: err,
227 })
228 }
229
230 pub fn apply_record(db: &RedDB, record: &ChangeRecord, _mode: ApplyMode) -> RedDBResult<()> {
235 let store = db.store();
236 match record.operation {
237 ChangeOperation::Delete => {
238 let _ = store.delete(&record.collection, EntityId::new(record.entity_id));
239 }
240 ChangeOperation::Refresh => {
241 let records = record.refresh_records.clone().ok_or_else(|| {
250 RedDBError::Internal(
251 "replication refresh record missing refresh_records payload".to_string(),
252 )
253 })?;
254 store
255 .refresh_collection_from_records(&record.collection, records)
256 .map_err(|err| RedDBError::Internal(err.to_string()))?;
257 }
258 ChangeOperation::Insert | ChangeOperation::Update => {
259 let Some(bytes) = &record.entity_bytes else {
260 return Err(RedDBError::Internal(
261 "replication record missing entity payload".to_string(),
262 ));
263 };
264 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
265 .map_err(|err| RedDBError::Internal(err.to_string()))?;
266 let exists = store
267 .get(&record.collection, EntityId::new(record.entity_id))
268 .is_some();
269 if exists {
270 let manager = store
271 .get_collection(&record.collection)
272 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
273 manager
274 .update(entity.clone())
275 .map_err(|err| RedDBError::Internal(err.to_string()))?;
276 } else {
277 store
278 .insert_auto(&record.collection, entity.clone())
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 }
281 if let Some(metadata_json) = &record.metadata {
282 let metadata = metadata_from_json(metadata_json)
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 store
285 .set_metadata(&record.collection, entity.id, metadata)
286 .map_err(|err| RedDBError::Internal(err.to_string()))?;
287 }
288 store
289 .context_index()
290 .index_entity(&record.collection, &entity);
291 }
292 }
293 Ok(())
294 }
295}
296
297fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
298 let mut hasher = crate::crypto::sha256::Sha256::new();
299 hasher.update(&record.lsn.to_le_bytes());
300 hasher.update(&[record.operation as u8]);
301 hasher.update(record.collection.as_bytes());
302 hasher.update(&record.entity_id.to_le_bytes());
303 if let Some(bytes) = &record.entity_bytes {
304 hasher.update(bytes);
305 }
306 if let Some(records) = &record.refresh_records {
310 hasher.update(&(records.len() as u64).to_le_bytes());
311 for r in records {
312 hasher.update(&(r.len() as u64).to_le_bytes());
313 hasher.update(r);
314 }
315 }
316 hasher.finalize()
317}
318
/// Renders a 32-byte payload hash as text for `LogicalApplyError::Divergence`
/// by delegating to `crate::utils::to_hex`.
/// NOTE(review): the digit case and any separators depend on `to_hex`, which
/// is not visible here.
fn hex_digest(bytes: &[u8; 32]) -> String {
    crate::utils::to_hex(bytes)
}
322
// Unit tests for `LogicalChangeApplier::apply` LSN handling against a real,
// temp-file-backed `RedDB`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Opens a `RedDB` at a temp path made unique by process id and wall-clock
    /// nanoseconds. Returns the path too, so the test can remove it when done.
    fn open_db() -> (RedDB, std::path::PathBuf) {
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        // Best-effort removal of any stale file with the same name.
        // NOTE(review): if `RedDB::open` creates a directory or sidecar files,
        // `remove_file` will not clean them up. Confirm against the storage layout.
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, path)
    }

    /// Builds an `Insert` change record at `lsn` for one `users` table row.
    /// The entity id, row id and sequence id all equal `lsn`. The `payload`
    /// column holds `payload`, and timestamps are `100 + lsn`. The divergence
    /// test relies on different `payload` bytes at the same `lsn` producing a
    /// different payload hash.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        ChangeRecord::from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    // Consecutive LSNs from a fresh applier are each applied and advance the position.
    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    // Re-delivering the exact same record is idempotent. The first apply also
    // shows that bootstrap from 0 accepts a non-1 first LSN (5).
    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
        let _ = std::fs::remove_file(path);
    }

    // A different payload at an already-applied LSN is rejected as divergence.
    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
        let _ = std::fs::remove_file(path);
    }

    // Records older than the current position are skipped without moving it.
    #[test]
    fn apply_skips_older_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    // Jumping past `last + 1` is reported as a gap and leaves the position unchanged.
    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        let _ = std::fs::remove_file(path);
    }
}