1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
10
/// Context in which a change record is being applied.
///
/// NOTE(review): in this file the mode is passed through
/// `LogicalChangeApplier::apply`, but `apply_record_with_metrics` binds it as
/// `_mode` and never reads it, so both variants currently take the same path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// Presumably: records applied on a live replica — confirm with callers.
    Replica,
    /// Presumably: records replayed while restoring — confirm with callers.
    Restore,
}
16
17#[derive(Debug, Default)]
23pub struct ReplicaApplyMetrics {
24 pub gap_total: std::sync::atomic::AtomicU64,
25 pub divergence_total: std::sync::atomic::AtomicU64,
26 pub apply_error_total: std::sync::atomic::AtomicU64,
27 pub decode_error_total: std::sync::atomic::AtomicU64,
28 pub apply_miss_total: std::sync::atomic::AtomicU64,
34 pub fenced_total: std::sync::atomic::AtomicU64,
40}
41
42impl ReplicaApplyMetrics {
43 pub fn record(&self, kind: ApplyErrorKind) {
44 use std::sync::atomic::Ordering::Relaxed;
45 match kind {
46 ApplyErrorKind::Gap => {
47 self.gap_total.fetch_add(1, Relaxed);
48 }
49 ApplyErrorKind::Divergence => {
50 self.divergence_total.fetch_add(1, Relaxed);
51 }
52 ApplyErrorKind::Apply => {
53 self.apply_error_total.fetch_add(1, Relaxed);
54 }
55 ApplyErrorKind::Decode => {
56 self.decode_error_total.fetch_add(1, Relaxed);
57 }
58 ApplyErrorKind::Miss => {
59 self.apply_miss_total.fetch_add(1, Relaxed);
60 }
61 ApplyErrorKind::Fenced => {
62 self.fenced_total.fetch_add(1, Relaxed);
63 }
64 }
65 }
66
67 pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
68 use std::sync::atomic::Ordering::Relaxed;
69 [
70 (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
71 (
72 ApplyErrorKind::Divergence,
73 self.divergence_total.load(Relaxed),
74 ),
75 (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
76 (
77 ApplyErrorKind::Decode,
78 self.decode_error_total.load(Relaxed),
79 ),
80 (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
81 (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
82 ]
83 }
84}
85
/// Category of an apply-side event, used as the key for
/// `ReplicaApplyMetrics` counters and as a stable text label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// An LSN gap (see `LogicalApplyError::Gap`).
    Gap,
    /// Conflicting content for an already-applied LSN
    /// (see `LogicalApplyError::Divergence`).
    Divergence,
    /// The storage layer rejected the change (see `LogicalApplyError::Apply`).
    Apply,
    /// A decode failure. No producer of this kind is visible in this file.
    Decode,
    /// A replicated delete found no collection or entity to remove.
    Miss,
    /// A record from an older term was rejected
    /// (see `LogicalApplyError::StaleTermFenced`).
    Fenced,
}

impl ApplyErrorKind {
    /// Returns the fixed lowercase label for this kind.
    ///
    /// Note that `Miss` is labelled `"apply_miss"`, not `"miss"`.
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
            ApplyErrorKind::Miss => "apply_miss",
            ApplyErrorKind::Fenced => "fenced",
        }
    }
}
112
113impl LogicalApplyError {
114 pub fn kind(&self) -> ApplyErrorKind {
115 match self {
116 Self::Gap { .. } => ApplyErrorKind::Gap,
117 Self::Divergence { .. } => ApplyErrorKind::Divergence,
118 Self::Apply { .. } => ApplyErrorKind::Apply,
119 Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
120 }
121 }
122}
123
/// Successful result of `LogicalChangeApplier::apply`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was written to the store and the applied watermark advanced.
    Applied,
    /// The record's LSN equals the last applied LSN and its payload hash
    /// matches the remembered one (or no hash is remembered); nothing written.
    Idempotent,
    /// The record's LSN is below the last applied LSN; nothing was written.
    Skipped,
}
138
139#[derive(Debug)]
140pub enum LogicalApplyError {
141 Gap {
142 last: u64,
143 next: u64,
144 },
145 Divergence {
146 expected_term: u64,
147 got_term: u64,
148 lsn: u64,
149 expected: String,
150 got: String,
151 },
152 Apply {
153 lsn: u64,
154 source: RedDBError,
155 },
156 StaleTermFenced {
161 record_term: u64,
162 current_term: u64,
163 lsn: u64,
164 },
165}
166
167impl std::fmt::Display for LogicalApplyError {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
171 Self::StaleTermFenced {
172 record_term,
173 current_term,
174 lsn,
175 } => write!(
176 f,
177 "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
178 ),
179 Self::Divergence {
180 expected_term,
181 got_term,
182 lsn,
183 expected,
184 got,
185 } => write!(
186 f,
187 "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
188 ),
189 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
190 }
191 }
192}
193
194impl std::error::Error for LogicalApplyError {}
195
/// Why `LogicalChangeApplier::wait_for_bookmark` gave up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the applied LSN reached the target.
    Timeout {
        /// LSN the caller was waiting for.
        target_lsn: u64,
        /// Applied LSN observed when the wait was abandoned.
        applied_lsn: u64,
    },
    /// The target LSN was reached, but under a different term than the
    /// bookmark's.
    TermMismatch {
        /// Term recorded in the bookmark.
        target_term: u64,
        /// Last applied term observed by the waiter.
        applied_term: u64,
    },
}

impl BookmarkWaitError {
    /// Returns `true` only for the `Timeout` variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            BookmarkWaitError::Timeout { .. } => true,
            BookmarkWaitError::TermMismatch { .. } => false,
        }
    }
}

impl std::fmt::Display for BookmarkWaitError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BookmarkWaitError::TermMismatch {
                target_term,
                applied_term,
            } => write!(
                f,
                "causal bookmark term mismatch: target={target_term} applied={applied_term}"
            ),
            BookmarkWaitError::Timeout {
                target_lsn,
                applied_lsn,
            } => write!(
                f,
                "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
            ),
        }
    }
}

impl std::error::Error for BookmarkWaitError {}
230
/// Stateful applier that feeds replicated `ChangeRecord`s into a `RedDB`
/// while tracking how far it has progressed.
///
/// NOTE(review): `apply` reads the watermarks and later stores them without
/// holding a lock across both steps, so it looks designed for a single
/// applying thread with concurrent readers/waiters — confirm with callers.
pub struct LogicalChangeApplier {
    /// Term of the most recently applied record; starts at
    /// `crate::replication::DEFAULT_REPLICATION_TERM`.
    last_applied_term: AtomicU64,
    /// LSN of the most recently applied record; starts at the `starting_lsn`
    /// passed to the constructor.
    last_applied_lsn: AtomicU64,
    /// Highest LSN handed to `apply` that passed the stale-term check,
    /// whether or not it was subsequently applied.
    received_frontier_lsn: AtomicU64,
    /// Payload hash of the most recently applied record; `None` until the
    /// first successful apply.
    last_payload_hash: Mutex<Option<[u8; 32]>>,
    /// Mutex/condvar pair that `wait_for_bookmark` blocks on; `apply` signals
    /// the condvar after advancing the watermark.
    apply_wait: (Mutex<()>, Condvar),
    /// Shared counters for apply-side events.
    metrics: std::sync::Arc<ReplicaApplyMetrics>,
}
247
248impl LogicalChangeApplier {
249 pub fn new(starting_lsn: u64) -> Self {
254 Self::with_metrics(
255 starting_lsn,
256 std::sync::Arc::new(ReplicaApplyMetrics::default()),
257 )
258 }
259
260 pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
265 Self {
266 last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
267 last_applied_lsn: AtomicU64::new(starting_lsn),
268 received_frontier_lsn: AtomicU64::new(starting_lsn),
269 last_payload_hash: Mutex::new(None),
270 apply_wait: (Mutex::new(()), Condvar::new()),
271 metrics,
272 }
273 }
274
    /// Returns the shared metrics handle this applier reports into.
    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
        &self.metrics
    }
279
    /// Returns the LSN of the most recently applied record (acquire load).
    pub fn last_applied_lsn(&self) -> u64 {
        self.last_applied_lsn.load(Ordering::Acquire)
    }
283
    /// Returns the highest LSN that `apply` has accepted past the stale-term
    /// check, applied or not (acquire load).
    pub fn received_frontier_lsn(&self) -> u64 {
        self.received_frontier_lsn.load(Ordering::Acquire)
    }
287
    /// Returns the term of the most recently applied record (acquire load).
    pub fn last_applied_term(&self) -> u64 {
        self.last_applied_term.load(Ordering::Acquire)
    }
291
292 pub fn wait_for_bookmark(
293 &self,
294 bookmark: &crate::replication::CausalBookmark,
295 timeout: std::time::Duration,
296 ) -> Result<(), BookmarkWaitError> {
297 let deadline = std::time::Instant::now() + timeout;
298 let target_lsn = bookmark.commit_lsn();
299 let target_term = bookmark.term();
300
301 let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
302 loop {
303 let applied_lsn = self.last_applied_lsn();
304 let applied_term = self.last_applied_term();
305 if applied_lsn >= target_lsn {
306 if applied_term == target_term {
307 return Ok(());
308 }
309 return Err(BookmarkWaitError::TermMismatch {
310 target_term,
311 applied_term,
312 });
313 }
314
315 let now = std::time::Instant::now();
316 if now >= deadline {
317 return Err(BookmarkWaitError::Timeout {
318 target_lsn,
319 applied_lsn,
320 });
321 }
322 let remaining = deadline.saturating_duration_since(now);
323 let (next_guard, wait_result) = self
324 .apply_wait
325 .1
326 .wait_timeout(guard, remaining)
327 .expect("apply wait condvar");
328 guard = next_guard;
329 if wait_result.timed_out() {
330 return Err(BookmarkWaitError::Timeout {
331 target_lsn,
332 applied_lsn: self.last_applied_lsn(),
333 });
334 }
335 }
336 }
337
338 pub fn apply(
346 &self,
347 db: &RedDB,
348 record: &ChangeRecord,
349 mode: ApplyMode,
350 ) -> Result<ApplyOutcome, LogicalApplyError> {
351 let last = self.last_applied_lsn.load(Ordering::Acquire);
352 let last_term = self.last_applied_term.load(Ordering::Acquire);
353
354 if record.term < last_term {
363 self.metrics.record(ApplyErrorKind::Fenced);
364 return Err(LogicalApplyError::StaleTermFenced {
365 record_term: record.term,
366 current_term: last_term,
367 lsn: record.lsn,
368 });
369 }
370
371 let payload_hash = record_payload_hash(record);
372 self.received_frontier_lsn
373 .fetch_max(record.lsn, Ordering::AcqRel);
374
375 if last == 0 && record.lsn > 0 {
376 self.do_apply(db, record, mode)?;
377 self.last_applied_term.store(record.term, Ordering::Release);
378 self.last_applied_lsn.store(record.lsn, Ordering::Release);
379 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
380 self.apply_wait.1.notify_all();
381 return Ok(ApplyOutcome::Applied);
382 }
383
384 if record.lsn == last {
385 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
386 return match prior {
387 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
388 Some(p) => Err(LogicalApplyError::Divergence {
389 expected_term: last_term,
390 got_term: record.term,
391 lsn: record.lsn,
392 expected: hex_digest(&p),
393 got: hex_digest(&payload_hash),
394 }),
395 None => Ok(ApplyOutcome::Idempotent),
396 };
397 }
398 if record.lsn < last {
399 return Ok(ApplyOutcome::Skipped);
400 }
401 if record.lsn > last + 1 {
402 return Err(LogicalApplyError::Gap {
403 last,
404 next: record.lsn,
405 });
406 }
407
408 self.do_apply(db, record, mode)?;
409 self.last_applied_term.store(record.term, Ordering::Release);
410 self.last_applied_lsn.store(record.lsn, Ordering::Release);
411 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
412 self.apply_wait.1.notify_all();
413 Ok(ApplyOutcome::Applied)
414 }
415
416 fn do_apply(
417 &self,
418 db: &RedDB,
419 record: &ChangeRecord,
420 mode: ApplyMode,
421 ) -> Result<(), LogicalApplyError> {
422 Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
423 LogicalApplyError::Apply {
424 lsn: record.lsn,
425 source: err,
426 }
427 })
428 }
429
430 pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
437 Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
438 }
439
440 pub fn apply_record_with_metrics(
449 db: &RedDB,
450 record: &ChangeRecord,
451 _mode: ApplyMode,
452 metrics: &ReplicaApplyMetrics,
453 ) -> RedDBResult<()> {
454 let store = db.store();
455 match record.operation {
456 ChangeOperation::Delete => {
457 match store.delete(&record.collection, EntityId::new(record.entity_id)) {
458 Ok(true) => {}
459 Ok(false) => {
460 metrics.record(ApplyErrorKind::Miss);
463 tracing::warn!(
464 target: "reddb::replication::apply",
465 lsn = record.lsn,
466 collection = %record.collection,
467 entity_id = record.entity_id,
468 "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
469 );
470 }
471 Err(crate::storage::StoreError::CollectionNotFound(name)) => {
472 metrics.record(ApplyErrorKind::Miss);
475 tracing::warn!(
476 target: "reddb::replication::apply",
477 lsn = record.lsn,
478 collection = %name,
479 entity_id = record.entity_id,
480 "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
481 );
482 }
483 Err(err) => {
484 return Err(RedDBError::Internal(err.to_string()));
488 }
489 }
490 }
491 ChangeOperation::Refresh => {
492 let records = record.refresh_records.clone().ok_or_else(|| {
501 RedDBError::Internal(
502 "replication refresh record missing refresh_records payload".to_string(),
503 )
504 })?;
505 store
506 .refresh_collection_from_records(&record.collection, records)
507 .map_err(|err| RedDBError::Internal(err.to_string()))?;
508 }
509 ChangeOperation::Insert | ChangeOperation::Update => {
510 let Some(bytes) = &record.entity_bytes else {
511 return Err(RedDBError::Internal(
512 "replication record missing entity payload".to_string(),
513 ));
514 };
515 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
516 .map_err(|err| RedDBError::Internal(err.to_string()))?;
517
518 if matches!(entity.kind, EntityKind::TableRow { .. }) {
537 let logical = entity.logical_id();
538 let new_id = entity.id;
539 let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
540 let stale: Vec<_> = store
541 .table_row_versions_by_logical_id(&record.collection, logical)
542 .into_iter()
543 .filter(|version| version.id != new_id && version.xmax == 0)
544 .collect();
545 if !stale.is_empty() {
546 let manager = store
547 .get_collection(&record.collection)
548 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
549 for mut version in stale {
550 version.set_xmax(superseding_xid);
551 manager
552 .update(version)
553 .map_err(|err| RedDBError::Internal(err.to_string()))?;
554 }
555 }
556 }
557
558 let exists = store
559 .get(&record.collection, EntityId::new(record.entity_id))
560 .is_some();
561 if exists {
562 let manager = store
563 .get_collection(&record.collection)
564 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
565 manager
566 .update(entity.clone())
567 .map_err(|err| RedDBError::Internal(err.to_string()))?;
568 } else {
569 store
570 .insert_auto(&record.collection, entity.clone())
571 .map_err(|err| RedDBError::Internal(err.to_string()))?;
572 }
573 if let Some(metadata_json) = &record.metadata {
574 let metadata = metadata_from_json(metadata_json)
575 .map_err(|err| RedDBError::Internal(err.to_string()))?;
576 store
577 .set_metadata(&record.collection, entity.id, metadata)
578 .map_err(|err| RedDBError::Internal(err.to_string()))?;
579 }
580 store
581 .context_index()
582 .index_entity(&record.collection, &entity);
583 }
584 }
585 Ok(())
586 }
587}
588
589fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
590 let mut hasher = crate::crypto::sha256::Sha256::new();
591 hasher.update(&record.term.to_le_bytes());
592 hasher.update(&record.lsn.to_le_bytes());
593 hasher.update(&[record.operation as u8]);
594 hasher.update(record.collection.as_bytes());
595 hasher.update(&record.entity_id.to_le_bytes());
596 if let Some(bytes) = &record.entity_bytes {
597 hasher.update(bytes);
598 }
599 if let Some(records) = &record.refresh_records {
603 hasher.update(&(records.len() as u64).to_le_bytes());
604 for r in records {
605 hasher.update(&(r.len() as u64).to_le_bytes());
606 hasher.update(r);
607 }
608 }
609 hasher.finalize()
610}
611
/// Formats a 32-byte digest as text via `crate::utils::to_hex`, for use in
/// `LogicalApplyError::Divergence` messages.
fn hex_digest(bytes: &[u8; 32]) -> String {
    crate::utils::to_hex(bytes)
}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619 use crate::replication::cdc::ChangeOperation;
620 use crate::storage::schema::Value;
621 use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
622 use std::sync::Arc;
623
624 fn open_db() -> (RedDB, std::path::PathBuf) {
625 let path = std::env::temp_dir().join(format!(
626 "reddb_logical_apply_{}_{}",
627 std::process::id(),
628 std::time::SystemTime::now()
629 .duration_since(std::time::UNIX_EPOCH)
630 .unwrap()
631 .as_nanos()
632 ));
633 let _ = std::fs::remove_file(&path);
634 let db = RedDB::open(&path).unwrap();
635 (db, path)
636 }
637
638 fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
639 let timestamp = 100 + lsn;
640 let mut entity = UnifiedEntity::new(
641 EntityId::new(lsn),
642 EntityKind::TableRow {
643 table: Arc::from("users"),
644 row_id: lsn,
645 },
646 EntityData::Row(RowData::with_names(
647 vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
648 vec!["id".to_string(), "payload".to_string()],
649 )),
650 );
651 entity.created_at = timestamp;
652 entity.updated_at = timestamp;
653 entity.sequence_id = lsn;
654 ChangeRecord::from_entity(
655 lsn,
656 timestamp,
657 ChangeOperation::Insert,
658 "users",
659 "row",
660 &entity,
661 crate::api::REDDB_FORMAT_VERSION,
662 None,
663 )
664 }
665
666 fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
667 ChangeRecord {
668 term: crate::replication::DEFAULT_REPLICATION_TERM,
669 lsn,
670 timestamp: 100 + lsn,
671 operation: ChangeOperation::Delete,
672 collection: collection.to_string(),
673 entity_id,
674 entity_kind: "row".to_string(),
675 entity_bytes: None,
676 metadata: None,
677 refresh_records: None,
678 }
679 }
680
681 fn table_row_entity(id: u64) -> UnifiedEntity {
682 let mut entity = UnifiedEntity::new(
683 EntityId::new(id),
684 EntityKind::TableRow {
685 table: Arc::from("users"),
686 row_id: id,
687 },
688 EntityData::Row(RowData::with_names(
689 vec![Value::UnsignedInteger(id)],
690 vec!["id".to_string()],
691 )),
692 );
693 entity.created_at = 100 + id;
694 entity.updated_at = 100 + id;
695 entity.sequence_id = id;
696 entity
697 }
698
699 #[test]
703 fn delete_against_missing_collection_records_apply_miss() {
704 let (db, path) = open_db();
705 let metrics = ReplicaApplyMetrics::default();
706 let before = metrics.apply_miss_total.load(Ordering::Relaxed);
707
708 LogicalChangeApplier::apply_record_with_metrics(
709 &db,
710 &delete_record(1, "no_such_collection", 42),
711 ApplyMode::Replica,
712 &metrics,
713 )
714 .expect("missing-target delete is non-fatal");
715
716 assert_eq!(
717 metrics.apply_miss_total.load(Ordering::Relaxed),
718 before + 1,
719 "delete against a missing collection must bump the apply-miss signal"
720 );
721 let _ = std::fs::remove_file(path);
722 }
723
724 #[test]
727 fn delete_against_missing_entity_records_apply_miss() {
728 let (db, path) = open_db();
729 let _ = db.store().get_or_create_collection("users");
730 let metrics = ReplicaApplyMetrics::default();
731
732 LogicalChangeApplier::apply_record_with_metrics(
733 &db,
734 &delete_record(1, "users", 9999),
735 ApplyMode::Replica,
736 &metrics,
737 )
738 .expect("missing-entity delete is non-fatal");
739
740 assert_eq!(
741 metrics.apply_miss_total.load(Ordering::Relaxed),
742 1,
743 "delete of an absent entity must bump the apply-miss signal"
744 );
745 let _ = std::fs::remove_file(path);
746 }
747
748 #[test]
751 fn delete_of_present_target_records_no_apply_miss() {
752 let (db, path) = open_db();
753 let store = db.store();
754 let _ = store.get_or_create_collection("users");
755 let id = store
756 .insert_auto("users", table_row_entity(1))
757 .expect("insert entity");
758 let metrics = ReplicaApplyMetrics::default();
759
760 LogicalChangeApplier::apply_record_with_metrics(
761 &db,
762 &delete_record(1, "users", id.raw()),
763 ApplyMode::Replica,
764 &metrics,
765 )
766 .expect("present-target delete applies");
767
768 assert_eq!(
769 metrics.apply_miss_total.load(Ordering::Relaxed),
770 0,
771 "deleting a present target must not fire the apply-miss signal"
772 );
773 assert!(
774 store.get("users", id).is_none(),
775 "the entity must actually be removed on the normal path"
776 );
777 let _ = std::fs::remove_file(path);
778 }
779
780 #[test]
783 fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
784 let (db, path) = open_db();
785 let applier =
786 LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
787
788 applier
789 .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
790 .expect("missing-target delete advances the chain");
791
792 assert_eq!(
793 applier.metrics().apply_miss_total.load(Ordering::Relaxed),
794 1,
795 "the applier's shared metrics handle must record the miss"
796 );
797 assert_eq!(
798 applier.last_applied_lsn(),
799 1,
800 "a non-fatal miss still advances the LSN chain"
801 );
802 let _ = std::fs::remove_file(path);
803 }
804
805 #[test]
806 fn apply_advances_on_monotonic_lsn() {
807 let (db, path) = open_db();
808 let applier = LogicalChangeApplier::new(0);
809 assert_eq!(
810 applier
811 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
812 .unwrap(),
813 ApplyOutcome::Applied
814 );
815 assert_eq!(applier.last_applied_lsn(), 1);
816 assert_eq!(
817 applier
818 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
819 .unwrap(),
820 ApplyOutcome::Applied
821 );
822 assert_eq!(applier.last_applied_lsn(), 2);
823 let _ = std::fs::remove_file(path);
824 }
825
826 #[test]
827 fn apply_idempotent_on_duplicate_lsn_same_payload() {
828 let (db, path) = open_db();
829 let applier = LogicalChangeApplier::new(0);
830 let r = record(5, b"same");
831 applier.apply(&db, &r, ApplyMode::Replica).unwrap();
832 assert_eq!(
833 applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
834 ApplyOutcome::Idempotent
835 );
836 assert_eq!(applier.last_applied_lsn(), 5);
837 let _ = std::fs::remove_file(path);
838 }
839
840 #[test]
841 fn apply_fails_closed_on_lsn_collision_diff_payload() {
842 let (db, path) = open_db();
843 let applier = LogicalChangeApplier::new(0);
844 applier
845 .apply(&db, &record(7, b"first"), ApplyMode::Replica)
846 .unwrap();
847 let err = applier
848 .apply(&db, &record(7, b"different"), ApplyMode::Replica)
849 .unwrap_err();
850 assert!(
851 matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
852 "got {err:?}"
853 );
854 let _ = std::fs::remove_file(path);
855 }
856
857 #[test]
858 fn apply_fails_closed_on_same_lsn_different_term() {
859 let (db, path) = open_db();
860 let applier = LogicalChangeApplier::new(0);
861 applier
862 .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
863 .unwrap();
864 let err = applier
865 .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
866 .unwrap_err();
867 assert!(
868 matches!(
869 err,
870 LogicalApplyError::Divergence {
871 lsn: 7,
872 expected_term: 1,
873 got_term: 2,
874 ..
875 }
876 ),
877 "got {err:?}"
878 );
879 assert_eq!(applier.last_applied_term(), 1);
880 assert_eq!(applier.last_applied_lsn(), 7);
881 let _ = std::fs::remove_file(path);
882 }
883
884 #[test]
889 fn apply_fences_stale_term_record() {
890 let (db, path) = open_db();
891 let applier = LogicalChangeApplier::new(0);
892
893 applier
895 .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
896 .unwrap();
897 assert_eq!(applier.last_applied_term(), 5);
898 assert_eq!(applier.last_applied_lsn(), 1);
899
900 let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
902 let err = applier
903 .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
904 .unwrap_err();
905 assert!(
906 matches!(
907 err,
908 LogicalApplyError::StaleTermFenced {
909 record_term: 4,
910 current_term: 5,
911 lsn: 2,
912 }
913 ),
914 "got {err:?}"
915 );
916 assert_eq!(err.kind(), ApplyErrorKind::Fenced);
917 assert_eq!(
918 applier.metrics().fenced_total.load(Ordering::Relaxed),
919 before + 1,
920 "the fence must leave a metrics trail"
921 );
922 assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
924 assert_eq!(applier.last_applied_term(), 5);
925 assert_eq!(
926 applier.received_frontier_lsn(),
927 1,
928 "a fenced record must not even advance the received frontier"
929 );
930 let _ = std::fs::remove_file(path);
931 }
932
933 #[test]
937 fn apply_admits_same_term_and_adopts_higher_term() {
938 let (db, path) = open_db();
939 let applier = LogicalChangeApplier::new(0);
940
941 applier
942 .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
943 .unwrap();
944 applier
946 .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
947 .unwrap();
948 assert_eq!(applier.last_applied_term(), 3);
949 applier
951 .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
952 .unwrap();
953 assert_eq!(applier.last_applied_term(), 7);
954 assert_eq!(applier.last_applied_lsn(), 3);
955
956 let err = applier
958 .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
959 .unwrap_err();
960 assert!(
961 matches!(err, LogicalApplyError::StaleTermFenced { .. }),
962 "got {err:?}"
963 );
964 let _ = std::fs::remove_file(path);
965 }
966
967 #[test]
968 fn apply_skips_older_lsn() {
969 let (db, path) = open_db();
970 let applier = LogicalChangeApplier::new(0);
971 applier
972 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
973 .unwrap();
974 applier
975 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
976 .unwrap();
977 assert_eq!(
978 applier
979 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
980 .unwrap(),
981 ApplyOutcome::Skipped
982 );
983 assert_eq!(applier.last_applied_lsn(), 2);
984 let _ = std::fs::remove_file(path);
985 }
986
987 #[test]
988 fn apply_returns_gap_on_future_lsn() {
989 let (db, path) = open_db();
990 let applier = LogicalChangeApplier::new(0);
991 applier
992 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
993 .unwrap();
994 let err = applier
995 .apply(&db, &record(5, b"e"), ApplyMode::Replica)
996 .unwrap_err();
997 assert!(
998 matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
999 "got {err:?}"
1000 );
1001 assert_eq!(applier.last_applied_lsn(), 1);
1002 let _ = std::fs::remove_file(path);
1003 }
1004}