1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{
9 change_record_from_entity, wire_json_to_server_json, ChangeOperation, ChangeRecord,
10 RangeAdmitError, RangeAuthority,
11};
12use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
13
/// Context in which a change record is being applied.
///
/// The mode is threaded from [`LogicalChangeApplier::apply`] down to
/// [`LogicalChangeApplier::apply_record_with_metrics`], which currently
/// accepts it without branching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// The record is being applied on a replica.
    Replica,
    /// The record is being applied as part of a restore.
    Restore,
}
19
/// Counters for the events the apply path reports, one per [`ApplyErrorKind`].
///
/// All counters start at zero (`Default`) and are bumped through
/// [`ReplicaApplyMetrics::record`]. The fields are public so callers can read
/// an individual counter directly.
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Count of [`ApplyErrorKind::Gap`] events.
    pub gap_total: AtomicU64,
    /// Count of [`ApplyErrorKind::Divergence`] events.
    pub divergence_total: AtomicU64,
    /// Count of [`ApplyErrorKind::Apply`] events.
    pub apply_error_total: AtomicU64,
    /// Count of [`ApplyErrorKind::Decode`] events.
    pub decode_error_total: AtomicU64,
    /// Count of [`ApplyErrorKind::Miss`] events (e.g. a delete that found
    /// nothing to delete).
    pub apply_miss_total: AtomicU64,
    /// Count of [`ApplyErrorKind::Fenced`] events (stale-term or range fence
    /// rejections).
    pub fenced_total: AtomicU64,
}
44
45impl ReplicaApplyMetrics {
46 pub fn record(&self, kind: ApplyErrorKind) {
47 use std::sync::atomic::Ordering::Relaxed;
48 match kind {
49 ApplyErrorKind::Gap => {
50 self.gap_total.fetch_add(1, Relaxed);
51 }
52 ApplyErrorKind::Divergence => {
53 self.divergence_total.fetch_add(1, Relaxed);
54 }
55 ApplyErrorKind::Apply => {
56 self.apply_error_total.fetch_add(1, Relaxed);
57 }
58 ApplyErrorKind::Decode => {
59 self.decode_error_total.fetch_add(1, Relaxed);
60 }
61 ApplyErrorKind::Miss => {
62 self.apply_miss_total.fetch_add(1, Relaxed);
63 }
64 ApplyErrorKind::Fenced => {
65 self.fenced_total.fetch_add(1, Relaxed);
66 }
67 }
68 }
69
70 pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
71 use std::sync::atomic::Ordering::Relaxed;
72 [
73 (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
74 (
75 ApplyErrorKind::Divergence,
76 self.divergence_total.load(Relaxed),
77 ),
78 (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
79 (
80 ApplyErrorKind::Decode,
81 self.decode_error_total.load(Relaxed),
82 ),
83 (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
84 (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
85 ]
86 }
87}
88
/// Classification of events counted by [`ReplicaApplyMetrics`].
///
/// [`LogicalApplyError::kind`] maps apply errors onto these kinds, and
/// [`ApplyErrorKind::label`] gives each kind a stable string name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// A record's LSN skipped ahead of the next expected LSN.
    Gap,
    /// A record re-used an already applied LSN with a different payload hash.
    Divergence,
    /// Applying the record to the store failed.
    Apply,
    /// A decode failure. No code visible in this file records this kind.
    Decode,
    /// A delete found no matching entity or collection (non-fatal).
    Miss,
    /// A record was rejected by the stale-term or range fence.
    Fenced,
}
102
103impl ApplyErrorKind {
104 pub fn label(self) -> &'static str {
105 match self {
106 Self::Gap => "gap",
107 Self::Divergence => "divergence",
108 Self::Apply => "apply",
109 Self::Decode => "decode",
110 Self::Miss => "apply_miss",
111 Self::Fenced => "fenced",
112 }
113 }
114}
115
116impl LogicalApplyError {
117 pub fn kind(&self) -> ApplyErrorKind {
118 match self {
119 Self::Gap { .. } => ApplyErrorKind::Gap,
120 Self::Divergence { .. } => ApplyErrorKind::Divergence,
121 Self::Apply { .. } => ApplyErrorKind::Apply,
122 Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
123 Self::RangeFenced { .. } => ApplyErrorKind::Fenced,
124 }
125 }
126}
127
/// Successful result of offering a record to [`LogicalChangeApplier`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was written to the store and the watermark advanced.
    Applied,
    /// The record's LSN equals the last applied LSN and its payload hash
    /// matches (or no hash has been recorded yet); nothing was written.
    Idempotent,
    /// The record's LSN is older than the last applied LSN; it was ignored.
    Skipped,
}
142
143#[derive(Debug)]
144pub enum LogicalApplyError {
145 Gap {
146 last: u64,
147 next: u64,
148 },
149 Divergence {
150 expected_term: u64,
151 got_term: u64,
152 lsn: u64,
153 expected: String,
154 got: String,
155 },
156 Apply {
157 lsn: u64,
158 source: RedDBError,
159 },
160 StaleTermFenced {
165 record_term: u64,
166 current_term: u64,
167 lsn: u64,
168 },
169 RangeFenced {
177 range_id: u64,
178 lsn: u64,
179 reason: RangeAdmitError,
180 },
181}
182
183impl std::fmt::Display for LogicalApplyError {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 match self {
186 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
187 Self::StaleTermFenced {
188 record_term,
189 current_term,
190 lsn,
191 } => write!(
192 f,
193 "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
194 ),
195 Self::RangeFenced {
196 range_id,
197 lsn,
198 reason,
199 } => match reason {
200 RangeAdmitError::StaleTerm {
201 record_term,
202 accepted_term,
203 } => write!(
204 f,
205 "range-stale record fenced at lsn={lsn} for range {range_id}: record term {record_term} is behind accepted term {accepted_term}"
206 ),
207 RangeAdmitError::StaleOwnershipEpoch {
208 record_epoch,
209 accepted_epoch,
210 } => write!(
211 f,
212 "range-stale record fenced at lsn={lsn} for range {range_id}: ownership epoch {record_epoch} is behind accepted epoch {accepted_epoch}"
213 ),
214 },
215 Self::Divergence {
216 expected_term,
217 got_term,
218 lsn,
219 expected,
220 got,
221 } => write!(
222 f,
223 "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
224 ),
225 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
226 }
227 }
228}
229
230impl std::error::Error for LogicalApplyError {}
231
/// Failure modes of [`LogicalChangeApplier::wait_for_bookmark`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the applied LSN reached `target_lsn`.
    /// `applied_lsn` is the watermark observed when giving up.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The applied LSN reached the target, but under a different term than
    /// the bookmark names.
    TermMismatch { target_term: u64, applied_term: u64 },
}
237
238impl BookmarkWaitError {
239 pub fn is_timeout(&self) -> bool {
240 matches!(self, Self::Timeout { .. })
241 }
242}
243
244impl std::fmt::Display for BookmarkWaitError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 Self::Timeout {
248 target_lsn,
249 applied_lsn,
250 } => write!(
251 f,
252 "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
253 ),
254 Self::TermMismatch {
255 target_term,
256 applied_term,
257 } => write!(
258 f,
259 "causal bookmark term mismatch: target={target_term} applied={applied_term}"
260 ),
261 }
262 }
263}
264
265impl std::error::Error for BookmarkWaitError {}
266
267pub struct LogicalChangeApplier {
272 last_applied_term: AtomicU64,
273 last_applied_lsn: AtomicU64,
274 received_frontier_lsn: AtomicU64,
275 last_payload_hash: Mutex<Option<[u8; 32]>>,
276 apply_wait: (Mutex<()>, Condvar),
277 metrics: std::sync::Arc<ReplicaApplyMetrics>,
282}
283
284impl LogicalChangeApplier {
285 pub fn new(starting_lsn: u64) -> Self {
290 Self::with_metrics(
291 starting_lsn,
292 std::sync::Arc::new(ReplicaApplyMetrics::default()),
293 )
294 }
295
296 pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
301 Self {
302 last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
303 last_applied_lsn: AtomicU64::new(starting_lsn),
304 received_frontier_lsn: AtomicU64::new(starting_lsn),
305 last_payload_hash: Mutex::new(None),
306 apply_wait: (Mutex::new(()), Condvar::new()),
307 metrics,
308 }
309 }
310
311 pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
313 &self.metrics
314 }
315
316 pub fn last_applied_lsn(&self) -> u64 {
317 self.last_applied_lsn.load(Ordering::Acquire)
318 }
319
320 pub fn received_frontier_lsn(&self) -> u64 {
321 self.received_frontier_lsn.load(Ordering::Acquire)
322 }
323
324 pub fn last_applied_term(&self) -> u64 {
325 self.last_applied_term.load(Ordering::Acquire)
326 }
327
328 pub fn wait_for_bookmark(
329 &self,
330 bookmark: &crate::replication::CausalBookmark,
331 timeout: std::time::Duration,
332 ) -> Result<(), BookmarkWaitError> {
333 let deadline = std::time::Instant::now() + timeout;
334 let target_lsn = bookmark.commit_lsn();
335 let target_term = bookmark.term();
336
337 let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
338 loop {
339 let applied_lsn = self.last_applied_lsn();
340 let applied_term = self.last_applied_term();
341 if applied_lsn >= target_lsn {
342 if applied_term == target_term {
343 return Ok(());
344 }
345 return Err(BookmarkWaitError::TermMismatch {
346 target_term,
347 applied_term,
348 });
349 }
350
351 let now = std::time::Instant::now();
352 if now >= deadline {
353 return Err(BookmarkWaitError::Timeout {
354 target_lsn,
355 applied_lsn,
356 });
357 }
358 let remaining = deadline.saturating_duration_since(now);
359 let (next_guard, wait_result) = self
360 .apply_wait
361 .1
362 .wait_timeout(guard, remaining)
363 .expect("apply wait condvar");
364 guard = next_guard;
365 if wait_result.timed_out() {
366 return Err(BookmarkWaitError::Timeout {
367 target_lsn,
368 applied_lsn: self.last_applied_lsn(),
369 });
370 }
371 }
372 }
373
374 pub fn apply(
382 &self,
383 db: &RedDB,
384 record: &ChangeRecord,
385 mode: ApplyMode,
386 ) -> Result<ApplyOutcome, LogicalApplyError> {
387 self.apply_fenced(db, record, mode, None)
388 }
389
390 pub fn apply_fenced(
399 &self,
400 db: &RedDB,
401 record: &ChangeRecord,
402 mode: ApplyMode,
403 range_fence: Option<&RangeAuthority>,
404 ) -> Result<ApplyOutcome, LogicalApplyError> {
405 let last = self.last_applied_lsn.load(Ordering::Acquire);
406 let last_term = self.last_applied_term.load(Ordering::Acquire);
407
408 if let Some(fence) = range_fence {
413 if let Err(reason) = fence.admit(record) {
414 self.metrics.record(ApplyErrorKind::Fenced);
415 return Err(LogicalApplyError::RangeFenced {
416 range_id: fence.range_id,
417 lsn: record.lsn,
418 reason,
419 });
420 }
421 }
422
423 if record.term < last_term {
432 self.metrics.record(ApplyErrorKind::Fenced);
433 return Err(LogicalApplyError::StaleTermFenced {
434 record_term: record.term,
435 current_term: last_term,
436 lsn: record.lsn,
437 });
438 }
439
440 let payload_hash = record_payload_hash(record);
441 self.received_frontier_lsn
442 .fetch_max(record.lsn, Ordering::AcqRel);
443
444 if last == 0 && record.lsn > 0 {
445 self.do_apply(db, record, mode)?;
446 self.last_applied_term.store(record.term, Ordering::Release);
447 self.last_applied_lsn.store(record.lsn, Ordering::Release);
448 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
449 self.apply_wait.1.notify_all();
450 return Ok(ApplyOutcome::Applied);
451 }
452
453 if record.lsn == last {
454 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
455 return match prior {
456 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
457 Some(p) => Err(LogicalApplyError::Divergence {
458 expected_term: last_term,
459 got_term: record.term,
460 lsn: record.lsn,
461 expected: hex_digest(&p),
462 got: hex_digest(&payload_hash),
463 }),
464 None => Ok(ApplyOutcome::Idempotent),
465 };
466 }
467 if record.lsn < last {
468 return Ok(ApplyOutcome::Skipped);
469 }
470 if record.lsn > last + 1 {
471 return Err(LogicalApplyError::Gap {
472 last,
473 next: record.lsn,
474 });
475 }
476
477 self.do_apply(db, record, mode)?;
478 self.last_applied_term.store(record.term, Ordering::Release);
479 self.last_applied_lsn.store(record.lsn, Ordering::Release);
480 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
481 self.apply_wait.1.notify_all();
482 Ok(ApplyOutcome::Applied)
483 }
484
485 fn do_apply(
486 &self,
487 db: &RedDB,
488 record: &ChangeRecord,
489 mode: ApplyMode,
490 ) -> Result<(), LogicalApplyError> {
491 Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
492 LogicalApplyError::Apply {
493 lsn: record.lsn,
494 source: err,
495 }
496 })
497 }
498
499 pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
506 Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
507 }
508
509 pub fn apply_record_with_metrics(
518 db: &RedDB,
519 record: &ChangeRecord,
520 _mode: ApplyMode,
521 metrics: &ReplicaApplyMetrics,
522 ) -> RedDBResult<()> {
523 let store = db.store();
524 match record.operation {
525 ChangeOperation::Delete => {
526 match store.delete(&record.collection, EntityId::new(record.entity_id)) {
527 Ok(true) => {}
528 Ok(false) => {
529 metrics.record(ApplyErrorKind::Miss);
532 tracing::warn!(
533 target: "reddb::replication::apply",
534 lsn = record.lsn,
535 collection = %record.collection,
536 entity_id = record.entity_id,
537 "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
538 );
539 }
540 Err(crate::storage::StoreError::CollectionNotFound(name)) => {
541 metrics.record(ApplyErrorKind::Miss);
544 tracing::warn!(
545 target: "reddb::replication::apply",
546 lsn = record.lsn,
547 collection = %name,
548 entity_id = record.entity_id,
549 "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
550 );
551 }
552 Err(err) => {
553 return Err(RedDBError::Internal(err.to_string()));
557 }
558 }
559 }
560 ChangeOperation::Refresh => {
561 let records = record.refresh_records.clone().ok_or_else(|| {
570 RedDBError::Internal(
571 "replication refresh record missing refresh_records payload".to_string(),
572 )
573 })?;
574 store
575 .refresh_collection_from_records(&record.collection, records)
576 .map_err(|err| RedDBError::Internal(err.to_string()))?;
577 }
578 ChangeOperation::Insert | ChangeOperation::Update => {
579 let Some(bytes) = &record.entity_bytes else {
580 return Err(RedDBError::Internal(
581 "replication record missing entity payload".to_string(),
582 ));
583 };
584 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
585 .map_err(|err| RedDBError::Internal(err.to_string()))?;
586
587 if matches!(entity.kind, EntityKind::TableRow { .. }) {
606 let logical = entity.logical_id();
607 let new_id = entity.id;
608 let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
609 let stale: Vec<_> = store
610 .table_row_versions_by_logical_id(&record.collection, logical)
611 .into_iter()
612 .filter(|version| version.id != new_id && version.xmax == 0)
613 .collect();
614 if !stale.is_empty() {
615 let manager = store
616 .get_collection(&record.collection)
617 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
618 for mut version in stale {
619 version.set_xmax(superseding_xid);
620 manager
621 .update(version)
622 .map_err(|err| RedDBError::Internal(err.to_string()))?;
623 }
624 }
625 }
626
627 let exists = store
628 .get(&record.collection, EntityId::new(record.entity_id))
629 .is_some();
630 if exists {
631 let manager = store
632 .get_collection(&record.collection)
633 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
634 manager
635 .update(entity.clone())
636 .map_err(|err| RedDBError::Internal(err.to_string()))?;
637 } else {
638 store
639 .insert_auto(&record.collection, entity.clone())
640 .map_err(|err| RedDBError::Internal(err.to_string()))?;
641 }
642 if let Some(metadata_json) = &record.metadata {
643 let metadata_json = wire_json_to_server_json(metadata_json);
644 let metadata = metadata_from_json(&metadata_json)
645 .map_err(|err| RedDBError::Internal(err.to_string()))?;
646 store
647 .set_metadata(&record.collection, entity.id, metadata)
648 .map_err(|err| RedDBError::Internal(err.to_string()))?;
649 }
650 store
651 .context_index()
652 .index_entity(&record.collection, &entity);
653 }
654 }
655 Ok(())
656 }
657}
658
659fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
660 let mut hasher = crate::crypto::sha256::Sha256::new();
661 hasher.update(&record.term.to_le_bytes());
662 hasher.update(&record.lsn.to_le_bytes());
663 hasher.update(&[record.operation as u8]);
664 hasher.update(record.collection.as_bytes());
665 hasher.update(&record.entity_id.to_le_bytes());
666 hasher.update(&record.range_id.unwrap_or(u64::MAX).to_le_bytes());
672 hasher.update(&record.ownership_epoch.unwrap_or(u64::MAX).to_le_bytes());
673 if let Some(bytes) = &record.entity_bytes {
674 hasher.update(bytes);
675 }
676 if let Some(records) = &record.refresh_records {
680 hasher.update(&(records.len() as u64).to_le_bytes());
681 for r in records {
682 hasher.update(&(r.len() as u64).to_le_bytes());
683 hasher.update(r);
684 }
685 }
686 hasher.finalize()
687}
688
689fn hex_digest(bytes: &[u8; 32]) -> String {
690 crate::utils::to_hex(bytes)
691}
692
693#[cfg(test)]
694mod tests {
695 use super::*;
696 use crate::replication::cdc::ChangeOperation;
697 use crate::storage::schema::Value;
698 use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
699 use std::sync::Arc;
700
701 fn open_db() -> (RedDB, std::path::PathBuf) {
702 let path = std::env::temp_dir().join(format!(
703 "reddb_logical_apply_{}_{}",
704 std::process::id(),
705 std::time::SystemTime::now()
706 .duration_since(std::time::UNIX_EPOCH)
707 .unwrap()
708 .as_nanos()
709 ));
710 let _ = std::fs::remove_file(&path);
711 let db = RedDB::open(&path).unwrap();
712 (db, path)
713 }
714
715 fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
716 let timestamp = 100 + lsn;
717 let mut entity = UnifiedEntity::new(
718 EntityId::new(lsn),
719 EntityKind::TableRow {
720 table: Arc::from("users"),
721 row_id: lsn,
722 },
723 EntityData::Row(RowData::with_names(
724 vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
725 vec!["id".to_string(), "payload".to_string()],
726 )),
727 );
728 entity.created_at = timestamp;
729 entity.updated_at = timestamp;
730 entity.sequence_id = lsn;
731 change_record_from_entity(
732 lsn,
733 timestamp,
734 ChangeOperation::Insert,
735 "users",
736 "row",
737 &entity,
738 crate::api::REDDB_FORMAT_VERSION,
739 None,
740 )
741 }
742
743 fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
744 ChangeRecord {
745 term: crate::replication::DEFAULT_REPLICATION_TERM,
746 lsn,
747 timestamp: 100 + lsn,
748 operation: ChangeOperation::Delete,
749 collection: collection.to_string(),
750 entity_id,
751 entity_kind: "row".to_string(),
752 entity_bytes: None,
753 metadata: None,
754 refresh_records: None,
755 range_id: None,
756 ownership_epoch: None,
757 }
758 }
759
760 fn table_row_entity(id: u64) -> UnifiedEntity {
761 let mut entity = UnifiedEntity::new(
762 EntityId::new(id),
763 EntityKind::TableRow {
764 table: Arc::from("users"),
765 row_id: id,
766 },
767 EntityData::Row(RowData::with_names(
768 vec![Value::UnsignedInteger(id)],
769 vec!["id".to_string()],
770 )),
771 );
772 entity.created_at = 100 + id;
773 entity.updated_at = 100 + id;
774 entity.sequence_id = id;
775 entity
776 }
777
778 #[test]
782 fn delete_against_missing_collection_records_apply_miss() {
783 let (db, path) = open_db();
784 let metrics = ReplicaApplyMetrics::default();
785 let before = metrics.apply_miss_total.load(Ordering::Relaxed);
786
787 LogicalChangeApplier::apply_record_with_metrics(
788 &db,
789 &delete_record(1, "no_such_collection", 42),
790 ApplyMode::Replica,
791 &metrics,
792 )
793 .expect("missing-target delete is non-fatal");
794
795 assert_eq!(
796 metrics.apply_miss_total.load(Ordering::Relaxed),
797 before + 1,
798 "delete against a missing collection must bump the apply-miss signal"
799 );
800 let _ = std::fs::remove_file(path);
801 }
802
803 #[test]
806 fn delete_against_missing_entity_records_apply_miss() {
807 let (db, path) = open_db();
808 let _ = db.store().get_or_create_collection("users");
809 let metrics = ReplicaApplyMetrics::default();
810
811 LogicalChangeApplier::apply_record_with_metrics(
812 &db,
813 &delete_record(1, "users", 9999),
814 ApplyMode::Replica,
815 &metrics,
816 )
817 .expect("missing-entity delete is non-fatal");
818
819 assert_eq!(
820 metrics.apply_miss_total.load(Ordering::Relaxed),
821 1,
822 "delete of an absent entity must bump the apply-miss signal"
823 );
824 let _ = std::fs::remove_file(path);
825 }
826
827 #[test]
830 fn delete_of_present_target_records_no_apply_miss() {
831 let (db, path) = open_db();
832 let store = db.store();
833 let _ = store.get_or_create_collection("users");
834 let id = store
835 .insert_auto("users", table_row_entity(1))
836 .expect("insert entity");
837 let metrics = ReplicaApplyMetrics::default();
838
839 LogicalChangeApplier::apply_record_with_metrics(
840 &db,
841 &delete_record(1, "users", id.raw()),
842 ApplyMode::Replica,
843 &metrics,
844 )
845 .expect("present-target delete applies");
846
847 assert_eq!(
848 metrics.apply_miss_total.load(Ordering::Relaxed),
849 0,
850 "deleting a present target must not fire the apply-miss signal"
851 );
852 assert!(
853 store.get("users", id).is_none(),
854 "the entity must actually be removed on the normal path"
855 );
856 let _ = std::fs::remove_file(path);
857 }
858
859 #[test]
862 fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
863 let (db, path) = open_db();
864 let applier =
865 LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
866
867 applier
868 .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
869 .expect("missing-target delete advances the chain");
870
871 assert_eq!(
872 applier.metrics().apply_miss_total.load(Ordering::Relaxed),
873 1,
874 "the applier's shared metrics handle must record the miss"
875 );
876 assert_eq!(
877 applier.last_applied_lsn(),
878 1,
879 "a non-fatal miss still advances the LSN chain"
880 );
881 let _ = std::fs::remove_file(path);
882 }
883
884 #[test]
885 fn apply_advances_on_monotonic_lsn() {
886 let (db, path) = open_db();
887 let applier = LogicalChangeApplier::new(0);
888 assert_eq!(
889 applier
890 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
891 .unwrap(),
892 ApplyOutcome::Applied
893 );
894 assert_eq!(applier.last_applied_lsn(), 1);
895 assert_eq!(
896 applier
897 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
898 .unwrap(),
899 ApplyOutcome::Applied
900 );
901 assert_eq!(applier.last_applied_lsn(), 2);
902 let _ = std::fs::remove_file(path);
903 }
904
905 #[test]
906 fn apply_idempotent_on_duplicate_lsn_same_payload() {
907 let (db, path) = open_db();
908 let applier = LogicalChangeApplier::new(0);
909 let r = record(5, b"same");
910 applier.apply(&db, &r, ApplyMode::Replica).unwrap();
911 assert_eq!(
912 applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
913 ApplyOutcome::Idempotent
914 );
915 assert_eq!(applier.last_applied_lsn(), 5);
916 let _ = std::fs::remove_file(path);
917 }
918
919 #[test]
920 fn apply_fails_closed_on_lsn_collision_diff_payload() {
921 let (db, path) = open_db();
922 let applier = LogicalChangeApplier::new(0);
923 applier
924 .apply(&db, &record(7, b"first"), ApplyMode::Replica)
925 .unwrap();
926 let err = applier
927 .apply(&db, &record(7, b"different"), ApplyMode::Replica)
928 .unwrap_err();
929 assert!(
930 matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
931 "got {err:?}"
932 );
933 let _ = std::fs::remove_file(path);
934 }
935
936 #[test]
937 fn apply_fails_closed_on_same_lsn_different_term() {
938 let (db, path) = open_db();
939 let applier = LogicalChangeApplier::new(0);
940 applier
941 .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
942 .unwrap();
943 let err = applier
944 .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
945 .unwrap_err();
946 assert!(
947 matches!(
948 err,
949 LogicalApplyError::Divergence {
950 lsn: 7,
951 expected_term: 1,
952 got_term: 2,
953 ..
954 }
955 ),
956 "got {err:?}"
957 );
958 assert_eq!(applier.last_applied_term(), 1);
959 assert_eq!(applier.last_applied_lsn(), 7);
960 let _ = std::fs::remove_file(path);
961 }
962
963 #[test]
968 fn apply_fences_stale_term_record() {
969 let (db, path) = open_db();
970 let applier = LogicalChangeApplier::new(0);
971
972 applier
974 .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
975 .unwrap();
976 assert_eq!(applier.last_applied_term(), 5);
977 assert_eq!(applier.last_applied_lsn(), 1);
978
979 let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
981 let err = applier
982 .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
983 .unwrap_err();
984 assert!(
985 matches!(
986 err,
987 LogicalApplyError::StaleTermFenced {
988 record_term: 4,
989 current_term: 5,
990 lsn: 2,
991 }
992 ),
993 "got {err:?}"
994 );
995 assert_eq!(err.kind(), ApplyErrorKind::Fenced);
996 assert_eq!(
997 applier.metrics().fenced_total.load(Ordering::Relaxed),
998 before + 1,
999 "the fence must leave a metrics trail"
1000 );
1001 assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
1003 assert_eq!(applier.last_applied_term(), 5);
1004 assert_eq!(
1005 applier.received_frontier_lsn(),
1006 1,
1007 "a fenced record must not even advance the received frontier"
1008 );
1009 let _ = std::fs::remove_file(path);
1010 }
1011
1012 #[test]
1016 fn apply_admits_same_term_and_adopts_higher_term() {
1017 let (db, path) = open_db();
1018 let applier = LogicalChangeApplier::new(0);
1019
1020 applier
1021 .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
1022 .unwrap();
1023 applier
1025 .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
1026 .unwrap();
1027 assert_eq!(applier.last_applied_term(), 3);
1028 applier
1030 .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
1031 .unwrap();
1032 assert_eq!(applier.last_applied_term(), 7);
1033 assert_eq!(applier.last_applied_lsn(), 3);
1034
1035 let err = applier
1037 .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
1038 .unwrap_err();
1039 assert!(
1040 matches!(err, LogicalApplyError::StaleTermFenced { .. }),
1041 "got {err:?}"
1042 );
1043 let _ = std::fs::remove_file(path);
1044 }
1045
1046 #[test]
1051 fn apply_fenced_rejects_stale_ownership_epoch() {
1052 let (db, path) = open_db();
1053 let applier = LogicalChangeApplier::new(0);
1054 let fence = RangeAuthority {
1055 range_id: 7,
1056 min_term: 1,
1057 min_ownership_epoch: 5,
1058 };
1059
1060 let stale = record(1, b"a").with_range_authority(7, 4);
1061 let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
1062 let err = applier
1063 .apply_fenced(&db, &stale, ApplyMode::Replica, Some(&fence))
1064 .unwrap_err();
1065 assert!(
1066 matches!(
1067 err,
1068 LogicalApplyError::RangeFenced {
1069 range_id: 7,
1070 lsn: 1,
1071 reason: RangeAdmitError::StaleOwnershipEpoch {
1072 record_epoch: 4,
1073 accepted_epoch: 5,
1074 },
1075 }
1076 ),
1077 "got {err:?}"
1078 );
1079 assert_eq!(err.kind(), ApplyErrorKind::Fenced);
1080 assert_eq!(
1081 applier.metrics().fenced_total.load(Ordering::Relaxed),
1082 before + 1
1083 );
1084 assert_eq!(applier.last_applied_lsn(), 0, "watermark must not advance");
1085 assert_eq!(
1086 applier.received_frontier_lsn(),
1087 0,
1088 "a range-fenced record must not advance the received frontier"
1089 );
1090 let _ = std::fs::remove_file(path);
1091 }
1092
1093 #[test]
1096 fn apply_fenced_rejects_stale_range_term_on_restore() {
1097 let (db, path) = open_db();
1098 let applier = LogicalChangeApplier::new(0);
1099 let fence = RangeAuthority {
1100 range_id: 3,
1101 min_term: 6,
1102 min_ownership_epoch: 1,
1103 };
1104
1105 let stale = record(1, b"a").with_term(4).with_range_authority(3, 9);
1106 let err = applier
1107 .apply_fenced(&db, &stale, ApplyMode::Restore, Some(&fence))
1108 .unwrap_err();
1109 assert!(
1110 matches!(
1111 err,
1112 LogicalApplyError::RangeFenced {
1113 range_id: 3,
1114 reason: RangeAdmitError::StaleTerm {
1115 record_term: 4,
1116 accepted_term: 6,
1117 },
1118 ..
1119 }
1120 ),
1121 "got {err:?}"
1122 );
1123 let _ = std::fs::remove_file(path);
1124 }
1125
1126 #[test]
1129 fn apply_fenced_admits_current_and_ignores_other_ranges() {
1130 let (db, path) = open_db();
1131 let applier = LogicalChangeApplier::new(0);
1132 let fence = RangeAuthority {
1133 range_id: 7,
1134 min_term: 1,
1135 min_ownership_epoch: 5,
1136 };
1137
1138 applier
1140 .apply_fenced(
1141 &db,
1142 &record(1, b"a").with_range_authority(7, 5),
1143 ApplyMode::Replica,
1144 Some(&fence),
1145 )
1146 .expect("current record applies");
1147 assert_eq!(applier.last_applied_lsn(), 1);
1148
1149 applier
1152 .apply_fenced(
1153 &db,
1154 &record(2, b"b").with_range_authority(99, 1),
1155 ApplyMode::Replica,
1156 Some(&fence),
1157 )
1158 .expect("other-range record bypasses this fence");
1159 assert_eq!(applier.last_applied_lsn(), 2);
1160 let _ = std::fs::remove_file(path);
1161 }
1162
1163 #[test]
1168 fn range_catchup_plan_applies_only_its_range_through_the_fence() {
1169 use crate::replication::cdc::{plan_range_catchup, RangeStreamPosition, RangeStreamReject};
1170
1171 let (db, path) = open_db();
1172 let applier = LogicalChangeApplier::new(0);
1173
1174 let stream = vec![
1177 record(1, b"a").with_range_authority(7, 5),
1178 record(2, b"b").with_range_authority(7, 5),
1179 record(3, b"c").with_range_authority(7, 5),
1180 record(4, b"d").with_range_authority(9, 2),
1181 record(5, b"e").with_range_authority(9, 2),
1182 record(6, b"f").with_range_authority(7, 4),
1183 ];
1184
1185 let position = RangeStreamPosition::new(7, 0, 1, 5);
1187 let plan = plan_range_catchup(&position, &stream);
1188
1189 assert_eq!(plan.apply, vec![0, 1, 2]);
1192 assert_eq!(
1193 plan.rejected,
1194 vec![RangeStreamReject {
1195 lsn: 6,
1196 error: RangeAdmitError::StaleOwnershipEpoch {
1197 record_epoch: 4,
1198 accepted_epoch: 5,
1199 },
1200 }]
1201 );
1202 assert_eq!(plan.resume.applied_lsn, 3);
1203
1204 let fence = position.authority();
1206 for index in &plan.apply {
1207 applier
1208 .apply_fenced(&db, &stream[*index], ApplyMode::Replica, Some(&fence))
1209 .expect("planned record applies through the fence");
1210 }
1211 assert_eq!(applier.last_applied_lsn(), 3);
1212
1213 let stale = &stream[5];
1216 let err = applier
1217 .apply_fenced(&db, stale, ApplyMode::Replica, Some(&fence))
1218 .unwrap_err();
1219 assert!(
1220 matches!(err, LogicalApplyError::RangeFenced { range_id: 7, .. }),
1221 "got {err:?}"
1222 );
1223 assert_eq!(
1224 applier.last_applied_lsn(),
1225 3,
1226 "stale write must not advance"
1227 );
1228 let _ = std::fs::remove_file(path);
1229 }
1230
1231 #[test]
1232 fn apply_skips_older_lsn() {
1233 let (db, path) = open_db();
1234 let applier = LogicalChangeApplier::new(0);
1235 applier
1236 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1237 .unwrap();
1238 applier
1239 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
1240 .unwrap();
1241 assert_eq!(
1242 applier
1243 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1244 .unwrap(),
1245 ApplyOutcome::Skipped
1246 );
1247 assert_eq!(applier.last_applied_lsn(), 2);
1248 let _ = std::fs::remove_file(path);
1249 }
1250
1251 #[test]
1252 fn apply_returns_gap_on_future_lsn() {
1253 let (db, path) = open_db();
1254 let applier = LogicalChangeApplier::new(0);
1255 applier
1256 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
1257 .unwrap();
1258 let err = applier
1259 .apply(&db, &record(5, b"e"), ApplyMode::Replica)
1260 .unwrap_err();
1261 assert!(
1262 matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
1263 "got {err:?}"
1264 );
1265 assert_eq!(applier.last_applied_lsn(), 1);
1266 let _ = std::fs::remove_file(path);
1267 }
1268}