1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{
9 change_record_from_entity, wire_json_to_server_json, ChangeOperation, ChangeRecord,
10};
11use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
12
/// Selects the context in which a change record is applied.
///
/// The mode is threaded from [`LogicalChangeApplier::apply`] down to
/// [`LogicalChangeApplier::apply_record_with_metrics`], which accepts it
/// without branching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyMode {
    /// Record is applied on a replica.
    /// NOTE(review): presumably the live streaming path — confirm with callers.
    Replica,
    /// Record is applied as part of a restore.
    /// NOTE(review): presumably archive/backup replay — confirm with callers.
    Restore,
}
18
/// Counters describing failures and anomalies seen while applying
/// replicated change records.
///
/// Every counter starts at zero (via `Default`) and is bumped through
/// `ReplicaApplyMetrics::record` with the matching `ApplyErrorKind`.
#[derive(Debug, Default)]
pub struct ReplicaApplyMetrics {
    /// Number of recorded `ApplyErrorKind::Gap` events.
    pub gap_total: AtomicU64,
    /// Number of recorded `ApplyErrorKind::Divergence` events.
    pub divergence_total: AtomicU64,
    /// Number of recorded `ApplyErrorKind::Apply` events.
    pub apply_error_total: AtomicU64,
    /// Number of recorded `ApplyErrorKind::Decode` events.
    pub decode_error_total: AtomicU64,
    /// Number of recorded `ApplyErrorKind::Miss` events (e.g. a replicated
    /// delete whose target entity or collection was absent).
    pub apply_miss_total: AtomicU64,
    /// Number of recorded `ApplyErrorKind::Fenced` events (stale-term records
    /// rejected before apply).
    pub fenced_total: AtomicU64,
}
43
44impl ReplicaApplyMetrics {
45 pub fn record(&self, kind: ApplyErrorKind) {
46 use std::sync::atomic::Ordering::Relaxed;
47 match kind {
48 ApplyErrorKind::Gap => {
49 self.gap_total.fetch_add(1, Relaxed);
50 }
51 ApplyErrorKind::Divergence => {
52 self.divergence_total.fetch_add(1, Relaxed);
53 }
54 ApplyErrorKind::Apply => {
55 self.apply_error_total.fetch_add(1, Relaxed);
56 }
57 ApplyErrorKind::Decode => {
58 self.decode_error_total.fetch_add(1, Relaxed);
59 }
60 ApplyErrorKind::Miss => {
61 self.apply_miss_total.fetch_add(1, Relaxed);
62 }
63 ApplyErrorKind::Fenced => {
64 self.fenced_total.fetch_add(1, Relaxed);
65 }
66 }
67 }
68
69 pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 6] {
70 use std::sync::atomic::Ordering::Relaxed;
71 [
72 (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
73 (
74 ApplyErrorKind::Divergence,
75 self.divergence_total.load(Relaxed),
76 ),
77 (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
78 (
79 ApplyErrorKind::Decode,
80 self.decode_error_total.load(Relaxed),
81 ),
82 (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
83 (ApplyErrorKind::Fenced, self.fenced_total.load(Relaxed)),
84 ]
85 }
86}
87
/// Classification of apply-path events tracked by the replica metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyErrorKind {
    /// A record arrived whose LSN skips ahead of the next expected one.
    Gap,
    /// A record collided with an already-applied LSN but differs from it.
    Divergence,
    /// The storage layer failed while applying a record.
    Apply,
    /// A record could not be decoded.
    Decode,
    /// The target of an operation was absent (non-fatal).
    Miss,
    /// A record from a stale term was rejected.
    Fenced,
}

impl ApplyErrorKind {
    /// Returns the stable, lowercase label used to identify this kind.
    pub fn label(self) -> &'static str {
        match self {
            ApplyErrorKind::Gap => "gap",
            ApplyErrorKind::Divergence => "divergence",
            ApplyErrorKind::Apply => "apply",
            ApplyErrorKind::Decode => "decode",
            ApplyErrorKind::Miss => "apply_miss",
            ApplyErrorKind::Fenced => "fenced",
        }
    }
}
114
115impl LogicalApplyError {
116 pub fn kind(&self) -> ApplyErrorKind {
117 match self {
118 Self::Gap { .. } => ApplyErrorKind::Gap,
119 Self::Divergence { .. } => ApplyErrorKind::Divergence,
120 Self::Apply { .. } => ApplyErrorKind::Apply,
121 Self::StaleTermFenced { .. } => ApplyErrorKind::Fenced,
122 }
123 }
124}
125
/// Successful result of [`LogicalChangeApplier::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The record was applied to the store and the applied watermark advanced.
    Applied,
    /// The record carries the same LSN as the last applied one and either
    /// matches its payload hash or no hash was stored; nothing was re-applied.
    Idempotent,
    /// The record's LSN is older than the last applied LSN; it was ignored.
    Skipped,
}
140
/// Failure modes of [`LogicalChangeApplier::apply`].
#[derive(Debug)]
pub enum LogicalApplyError {
    /// The record's LSN is more than one past the last applied LSN.
    Gap {
        /// Last applied LSN at the time of the check.
        last: u64,
        /// LSN of the record that was offered.
        next: u64,
    },
    /// A record with the same LSN as the last applied one hashed differently.
    Divergence {
        /// Term of the last applied record.
        expected_term: u64,
        /// Term carried by the offending record.
        got_term: u64,
        /// LSN at which the collision happened.
        lsn: u64,
        /// Hex digest of the previously applied payload.
        expected: String,
        /// Hex digest of the offending payload.
        got: String,
    },
    /// The storage layer rejected the record.
    Apply {
        /// LSN of the record that failed.
        lsn: u64,
        /// Underlying storage/API error.
        source: RedDBError,
    },
    /// The record's term is lower than the last applied term, so it was
    /// rejected before touching any state.
    StaleTermFenced {
        /// Term carried by the rejected record.
        record_term: u64,
        /// Last applied term at the time of the check.
        current_term: u64,
        /// LSN of the rejected record.
        lsn: u64,
    },
}
168
169impl std::fmt::Display for LogicalApplyError {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 match self {
172 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
173 Self::StaleTermFenced {
174 record_term,
175 current_term,
176 lsn,
177 } => write!(
178 f,
179 "stale-term record fenced at lsn={lsn}: record term {record_term} is behind current term {current_term}"
180 ),
181 Self::Divergence {
182 expected_term,
183 got_term,
184 lsn,
185 expected,
186 got,
187 } => write!(
188 f,
189 "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
190 ),
191 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
192 }
193 }
194}
195
// Marker impl: relies on the default `std::error::Error` methods, so the
// wrapped `source` of the `Apply` variant is not exposed via `Error::source`.
impl std::error::Error for LogicalApplyError {}
197
/// Reasons a wait on a causal bookmark can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BookmarkWaitError {
    /// The deadline passed before the applied LSN reached the target.
    Timeout { target_lsn: u64, applied_lsn: u64 },
    /// The target LSN was reached, but under a different term than the
    /// bookmark's.
    TermMismatch { target_term: u64, applied_term: u64 },
}

impl BookmarkWaitError {
    /// Returns `true` only for the [`BookmarkWaitError::Timeout`] variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            BookmarkWaitError::Timeout { .. } => true,
            BookmarkWaitError::TermMismatch { .. } => false,
        }
    }
}

impl std::fmt::Display for BookmarkWaitError {
    /// Renders a single-line description including the target and observed
    /// values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BookmarkWaitError::TermMismatch {
                target_term,
                applied_term,
            } => {
                write!(
                    f,
                    "causal bookmark term mismatch: target={target_term} applied={applied_term}"
                )
            }
            BookmarkWaitError::Timeout {
                target_lsn,
                applied_lsn,
            } => {
                write!(
                    f,
                    "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
                )
            }
        }
    }
}

impl std::error::Error for BookmarkWaitError {}
232
/// Stateful applier that feeds change records into a `RedDB` while tracking
/// the applied watermark (term + LSN) and the hash of the last applied record.
pub struct LogicalChangeApplier {
    /// Term of the most recently applied record; starts at
    /// `DEFAULT_REPLICATION_TERM`.
    last_applied_term: AtomicU64,
    /// LSN of the most recently applied record; starts at the LSN handed to
    /// the constructor.
    last_applied_lsn: AtomicU64,
    /// Highest LSN offered to `apply` that passed the stale-term fence,
    /// whether or not it was subsequently applied.
    received_frontier_lsn: AtomicU64,
    /// Payload hash of the last applied record, used to tell an idempotent
    /// redelivery from a divergent one at the same LSN. `None` until the
    /// first record is applied.
    last_payload_hash: Mutex<Option<[u8; 32]>>,
    /// Mutex/condvar pair that `wait_for_bookmark` parks on; the condvar is
    /// notified after every successful apply.
    apply_wait: (Mutex<()>, Condvar),
    /// Shared counters for apply-path anomalies.
    metrics: std::sync::Arc<ReplicaApplyMetrics>,
}
249
250impl LogicalChangeApplier {
251 pub fn new(starting_lsn: u64) -> Self {
256 Self::with_metrics(
257 starting_lsn,
258 std::sync::Arc::new(ReplicaApplyMetrics::default()),
259 )
260 }
261
262 pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
267 Self {
268 last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
269 last_applied_lsn: AtomicU64::new(starting_lsn),
270 received_frontier_lsn: AtomicU64::new(starting_lsn),
271 last_payload_hash: Mutex::new(None),
272 apply_wait: (Mutex::new(()), Condvar::new()),
273 metrics,
274 }
275 }
276
    /// Returns the shared metrics handle this applier records into.
    pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
        &self.metrics
    }
281
    /// Returns the LSN of the most recently applied record (acquire load).
    pub fn last_applied_lsn(&self) -> u64 {
        self.last_applied_lsn.load(Ordering::Acquire)
    }
285
    /// Returns the highest LSN offered to `apply` that was not term-fenced
    /// (acquire load); it may be ahead of the applied LSN.
    pub fn received_frontier_lsn(&self) -> u64 {
        self.received_frontier_lsn.load(Ordering::Acquire)
    }
289
    /// Returns the term of the most recently applied record (acquire load).
    pub fn last_applied_term(&self) -> u64 {
        self.last_applied_term.load(Ordering::Acquire)
    }
293
294 pub fn wait_for_bookmark(
295 &self,
296 bookmark: &crate::replication::CausalBookmark,
297 timeout: std::time::Duration,
298 ) -> Result<(), BookmarkWaitError> {
299 let deadline = std::time::Instant::now() + timeout;
300 let target_lsn = bookmark.commit_lsn();
301 let target_term = bookmark.term();
302
303 let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
304 loop {
305 let applied_lsn = self.last_applied_lsn();
306 let applied_term = self.last_applied_term();
307 if applied_lsn >= target_lsn {
308 if applied_term == target_term {
309 return Ok(());
310 }
311 return Err(BookmarkWaitError::TermMismatch {
312 target_term,
313 applied_term,
314 });
315 }
316
317 let now = std::time::Instant::now();
318 if now >= deadline {
319 return Err(BookmarkWaitError::Timeout {
320 target_lsn,
321 applied_lsn,
322 });
323 }
324 let remaining = deadline.saturating_duration_since(now);
325 let (next_guard, wait_result) = self
326 .apply_wait
327 .1
328 .wait_timeout(guard, remaining)
329 .expect("apply wait condvar");
330 guard = next_guard;
331 if wait_result.timed_out() {
332 return Err(BookmarkWaitError::Timeout {
333 target_lsn,
334 applied_lsn: self.last_applied_lsn(),
335 });
336 }
337 }
338 }
339
340 pub fn apply(
348 &self,
349 db: &RedDB,
350 record: &ChangeRecord,
351 mode: ApplyMode,
352 ) -> Result<ApplyOutcome, LogicalApplyError> {
353 let last = self.last_applied_lsn.load(Ordering::Acquire);
354 let last_term = self.last_applied_term.load(Ordering::Acquire);
355
356 if record.term < last_term {
365 self.metrics.record(ApplyErrorKind::Fenced);
366 return Err(LogicalApplyError::StaleTermFenced {
367 record_term: record.term,
368 current_term: last_term,
369 lsn: record.lsn,
370 });
371 }
372
373 let payload_hash = record_payload_hash(record);
374 self.received_frontier_lsn
375 .fetch_max(record.lsn, Ordering::AcqRel);
376
377 if last == 0 && record.lsn > 0 {
378 self.do_apply(db, record, mode)?;
379 self.last_applied_term.store(record.term, Ordering::Release);
380 self.last_applied_lsn.store(record.lsn, Ordering::Release);
381 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
382 self.apply_wait.1.notify_all();
383 return Ok(ApplyOutcome::Applied);
384 }
385
386 if record.lsn == last {
387 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
388 return match prior {
389 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
390 Some(p) => Err(LogicalApplyError::Divergence {
391 expected_term: last_term,
392 got_term: record.term,
393 lsn: record.lsn,
394 expected: hex_digest(&p),
395 got: hex_digest(&payload_hash),
396 }),
397 None => Ok(ApplyOutcome::Idempotent),
398 };
399 }
400 if record.lsn < last {
401 return Ok(ApplyOutcome::Skipped);
402 }
403 if record.lsn > last + 1 {
404 return Err(LogicalApplyError::Gap {
405 last,
406 next: record.lsn,
407 });
408 }
409
410 self.do_apply(db, record, mode)?;
411 self.last_applied_term.store(record.term, Ordering::Release);
412 self.last_applied_lsn.store(record.lsn, Ordering::Release);
413 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
414 self.apply_wait.1.notify_all();
415 Ok(ApplyOutcome::Applied)
416 }
417
418 fn do_apply(
419 &self,
420 db: &RedDB,
421 record: &ChangeRecord,
422 mode: ApplyMode,
423 ) -> Result<(), LogicalApplyError> {
424 Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
425 LogicalApplyError::Apply {
426 lsn: record.lsn,
427 source: err,
428 }
429 })
430 }
431
432 pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
439 Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
440 }
441
442 pub fn apply_record_with_metrics(
451 db: &RedDB,
452 record: &ChangeRecord,
453 _mode: ApplyMode,
454 metrics: &ReplicaApplyMetrics,
455 ) -> RedDBResult<()> {
456 let store = db.store();
457 match record.operation {
458 ChangeOperation::Delete => {
459 match store.delete(&record.collection, EntityId::new(record.entity_id)) {
460 Ok(true) => {}
461 Ok(false) => {
462 metrics.record(ApplyErrorKind::Miss);
465 tracing::warn!(
466 target: "reddb::replication::apply",
467 lsn = record.lsn,
468 collection = %record.collection,
469 entity_id = record.entity_id,
470 "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
471 );
472 }
473 Err(crate::storage::StoreError::CollectionNotFound(name)) => {
474 metrics.record(ApplyErrorKind::Miss);
477 tracing::warn!(
478 target: "reddb::replication::apply",
479 lsn = record.lsn,
480 collection = %name,
481 entity_id = record.entity_id,
482 "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
483 );
484 }
485 Err(err) => {
486 return Err(RedDBError::Internal(err.to_string()));
490 }
491 }
492 }
493 ChangeOperation::Refresh => {
494 let records = record.refresh_records.clone().ok_or_else(|| {
503 RedDBError::Internal(
504 "replication refresh record missing refresh_records payload".to_string(),
505 )
506 })?;
507 store
508 .refresh_collection_from_records(&record.collection, records)
509 .map_err(|err| RedDBError::Internal(err.to_string()))?;
510 }
511 ChangeOperation::Insert | ChangeOperation::Update => {
512 let Some(bytes) = &record.entity_bytes else {
513 return Err(RedDBError::Internal(
514 "replication record missing entity payload".to_string(),
515 ));
516 };
517 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
518 .map_err(|err| RedDBError::Internal(err.to_string()))?;
519
520 if matches!(entity.kind, EntityKind::TableRow { .. }) {
539 let logical = entity.logical_id();
540 let new_id = entity.id;
541 let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
542 let stale: Vec<_> = store
543 .table_row_versions_by_logical_id(&record.collection, logical)
544 .into_iter()
545 .filter(|version| version.id != new_id && version.xmax == 0)
546 .collect();
547 if !stale.is_empty() {
548 let manager = store
549 .get_collection(&record.collection)
550 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
551 for mut version in stale {
552 version.set_xmax(superseding_xid);
553 manager
554 .update(version)
555 .map_err(|err| RedDBError::Internal(err.to_string()))?;
556 }
557 }
558 }
559
560 let exists = store
561 .get(&record.collection, EntityId::new(record.entity_id))
562 .is_some();
563 if exists {
564 let manager = store
565 .get_collection(&record.collection)
566 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
567 manager
568 .update(entity.clone())
569 .map_err(|err| RedDBError::Internal(err.to_string()))?;
570 } else {
571 store
572 .insert_auto(&record.collection, entity.clone())
573 .map_err(|err| RedDBError::Internal(err.to_string()))?;
574 }
575 if let Some(metadata_json) = &record.metadata {
576 let metadata_json = wire_json_to_server_json(metadata_json);
577 let metadata = metadata_from_json(&metadata_json)
578 .map_err(|err| RedDBError::Internal(err.to_string()))?;
579 store
580 .set_metadata(&record.collection, entity.id, metadata)
581 .map_err(|err| RedDBError::Internal(err.to_string()))?;
582 }
583 store
584 .context_index()
585 .index_entity(&record.collection, &entity);
586 }
587 }
588 Ok(())
589 }
590}
591
592fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
593 let mut hasher = crate::crypto::sha256::Sha256::new();
594 hasher.update(&record.term.to_le_bytes());
595 hasher.update(&record.lsn.to_le_bytes());
596 hasher.update(&[record.operation as u8]);
597 hasher.update(record.collection.as_bytes());
598 hasher.update(&record.entity_id.to_le_bytes());
599 if let Some(bytes) = &record.entity_bytes {
600 hasher.update(bytes);
601 }
602 if let Some(records) = &record.refresh_records {
606 hasher.update(&(records.len() as u64).to_le_bytes());
607 for r in records {
608 hasher.update(&(r.len() as u64).to_le_bytes());
609 hasher.update(r);
610 }
611 }
612 hasher.finalize()
613}
614
/// Renders a 32-byte digest as text for error messages by delegating to
/// `crate::utils::to_hex`.
fn hex_digest(bytes: &[u8; 32]) -> String {
    crate::utils::to_hex(bytes)
}
618
// Unit tests for the stateless apply path and the stateful LSN/term chain.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::cdc::ChangeOperation;
    use crate::storage::schema::Value;
    use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
    use std::sync::Arc;

    /// Opens a database at a temp path made unique by process id and the
    /// current time in nanoseconds; returns the path so the test can remove
    /// it afterwards.
    fn open_db() -> (RedDB, std::path::PathBuf) {
        let path = std::env::temp_dir().join(format!(
            "reddb_logical_apply_{}_{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _ = std::fs::remove_file(&path);
        let db = RedDB::open(&path).unwrap();
        (db, path)
    }

    /// Builds an `Insert` change record for a `users` table row whose id,
    /// row id and sequence id all equal `lsn`, carrying `payload` as a blob
    /// column.
    fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
        let timestamp = 100 + lsn;
        let mut entity = UnifiedEntity::new(
            EntityId::new(lsn),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: lsn,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
                vec!["id".to_string(), "payload".to_string()],
            )),
        );
        entity.created_at = timestamp;
        entity.updated_at = timestamp;
        entity.sequence_id = lsn;
        change_record_from_entity(
            lsn,
            timestamp,
            ChangeOperation::Insert,
            "users",
            "row",
            &entity,
            crate::api::REDDB_FORMAT_VERSION,
            None,
        )
    }

    /// Builds a `Delete` change record at the default replication term with
    /// no entity bytes, metadata or refresh payload.
    fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
        ChangeRecord {
            term: crate::replication::DEFAULT_REPLICATION_TERM,
            lsn,
            timestamp: 100 + lsn,
            operation: ChangeOperation::Delete,
            collection: collection.to_string(),
            entity_id,
            entity_kind: "row".to_string(),
            entity_bytes: None,
            metadata: None,
            refresh_records: None,
        }
    }

    /// Builds a minimal `users` table-row entity with a single `id` column.
    fn table_row_entity(id: u64) -> UnifiedEntity {
        let mut entity = UnifiedEntity::new(
            EntityId::new(id),
            EntityKind::TableRow {
                table: Arc::from("users"),
                row_id: id,
            },
            EntityData::Row(RowData::with_names(
                vec![Value::UnsignedInteger(id)],
                vec!["id".to_string()],
            )),
        );
        entity.created_at = 100 + id;
        entity.updated_at = 100 + id;
        entity.sequence_id = id;
        entity
    }

    /// A delete aimed at a collection that does not exist succeeds and bumps
    /// the apply-miss counter by one.
    #[test]
    fn delete_against_missing_collection_records_apply_miss() {
        let (db, path) = open_db();
        let metrics = ReplicaApplyMetrics::default();
        let before = metrics.apply_miss_total.load(Ordering::Relaxed);

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "no_such_collection", 42),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("missing-target delete is non-fatal");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            before + 1,
            "delete against a missing collection must bump the apply-miss signal"
        );
        let _ = std::fs::remove_file(path);
    }

    /// A delete aimed at an absent entity in an existing collection succeeds
    /// and bumps the apply-miss counter.
    #[test]
    fn delete_against_missing_entity_records_apply_miss() {
        let (db, path) = open_db();
        let _ = db.store().get_or_create_collection("users");
        let metrics = ReplicaApplyMetrics::default();

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "users", 9999),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("missing-entity delete is non-fatal");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            1,
            "delete of an absent entity must bump the apply-miss signal"
        );
        let _ = std::fs::remove_file(path);
    }

    /// Deleting an entity that exists removes it and leaves the apply-miss
    /// counter at zero.
    #[test]
    fn delete_of_present_target_records_no_apply_miss() {
        let (db, path) = open_db();
        let store = db.store();
        let _ = store.get_or_create_collection("users");
        let id = store
            .insert_auto("users", table_row_entity(1))
            .expect("insert entity");
        let metrics = ReplicaApplyMetrics::default();

        LogicalChangeApplier::apply_record_with_metrics(
            &db,
            &delete_record(1, "users", id.raw()),
            ApplyMode::Replica,
            &metrics,
        )
        .expect("present-target delete applies");

        assert_eq!(
            metrics.apply_miss_total.load(Ordering::Relaxed),
            0,
            "deleting a present target must not fire the apply-miss signal"
        );
        assert!(
            store.get("users", id).is_none(),
            "the entity must actually be removed on the normal path"
        );
        let _ = std::fs::remove_file(path);
    }

    /// Through the stateful `apply`, a delete miss is visible on the
    /// applier's metrics handle and the applied LSN still advances.
    #[test]
    fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
        let (db, path) = open_db();
        let applier =
            LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));

        applier
            .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
            .expect("missing-target delete advances the chain");

        assert_eq!(
            applier.metrics().apply_miss_total.load(Ordering::Relaxed),
            1,
            "the applier's shared metrics handle must record the miss"
        );
        assert_eq!(
            applier.last_applied_lsn(),
            1,
            "a non-fatal miss still advances the LSN chain"
        );
        let _ = std::fs::remove_file(path);
    }

    /// Consecutive LSNs 1 and 2 are both applied and move the watermark.
    #[test]
    fn apply_advances_on_monotonic_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        assert_eq!(
            applier
                .apply(&db, &record(2, b"b"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    /// Re-delivering the identical record at the last applied LSN is
    /// reported as idempotent and leaves the watermark unchanged.
    #[test]
    fn apply_idempotent_on_duplicate_lsn_same_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        let r = record(5, b"same");
        applier.apply(&db, &r, ApplyMode::Replica).unwrap();
        assert_eq!(
            applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
            ApplyOutcome::Idempotent
        );
        assert_eq!(applier.last_applied_lsn(), 5);
        let _ = std::fs::remove_file(path);
    }

    /// A different payload at the last applied LSN is rejected as divergence.
    #[test]
    fn apply_fails_closed_on_lsn_collision_diff_payload() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"first"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"different"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
            "got {err:?}"
        );
        let _ = std::fs::remove_file(path);
    }

    /// The same payload at the same LSN but with a higher term is rejected
    /// as divergence, and neither the term nor the LSN watermark moves.
    #[test]
    fn apply_fails_closed_on_same_lsn_different_term() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(
                err,
                LogicalApplyError::Divergence {
                    lsn: 7,
                    expected_term: 1,
                    got_term: 2,
                    ..
                }
            ),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_term(), 1);
        assert_eq!(applier.last_applied_lsn(), 7);
        let _ = std::fs::remove_file(path);
    }

    /// A record from a lower term is fenced: the error carries both terms,
    /// the fenced counter increments, and neither the watermark nor the
    /// received frontier advances.
    #[test]
    fn apply_fences_stale_term_record() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);

        applier
            .apply(&db, &record(1, b"a").with_term(5), ApplyMode::Replica)
            .unwrap();
        assert_eq!(applier.last_applied_term(), 5);
        assert_eq!(applier.last_applied_lsn(), 1);

        let before = applier.metrics().fenced_total.load(Ordering::Relaxed);
        let err = applier
            .apply(&db, &record(2, b"b").with_term(4), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(
                err,
                LogicalApplyError::StaleTermFenced {
                    record_term: 4,
                    current_term: 5,
                    lsn: 2,
                }
            ),
            "got {err:?}"
        );
        assert_eq!(err.kind(), ApplyErrorKind::Fenced);
        assert_eq!(
            applier.metrics().fenced_total.load(Ordering::Relaxed),
            before + 1,
            "the fence must leave a metrics trail"
        );
        assert_eq!(applier.last_applied_lsn(), 1, "watermark must not advance");
        assert_eq!(applier.last_applied_term(), 5);
        assert_eq!(
            applier.received_frontier_lsn(),
            1,
            "a fenced record must not even advance the received frontier"
        );
        let _ = std::fs::remove_file(path);
    }

    /// Records in the same term are admitted, a higher term is adopted, and
    /// a later record from the old term is then fenced.
    #[test]
    fn apply_admits_same_term_and_adopts_higher_term() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);

        applier
            .apply(&db, &record(1, b"a").with_term(3), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b").with_term(3), ApplyMode::Replica)
            .unwrap();
        assert_eq!(applier.last_applied_term(), 3);
        applier
            .apply(&db, &record(3, b"c").with_term(7), ApplyMode::Replica)
            .unwrap();
        assert_eq!(applier.last_applied_term(), 7);
        assert_eq!(applier.last_applied_lsn(), 3);

        let err = applier
            .apply(&db, &record(4, b"d").with_term(3), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::StaleTermFenced { .. }),
            "got {err:?}"
        );
        let _ = std::fs::remove_file(path);
    }

    /// A record older than the watermark is skipped without moving it.
    #[test]
    fn apply_skips_older_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        applier
            .apply(&db, &record(2, b"b"), ApplyMode::Replica)
            .unwrap();
        assert_eq!(
            applier
                .apply(&db, &record(1, b"a"), ApplyMode::Replica)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(applier.last_applied_lsn(), 2);
        let _ = std::fs::remove_file(path);
    }

    /// A record that jumps more than one LSN ahead is rejected as a gap and
    /// the watermark stays put.
    #[test]
    fn apply_returns_gap_on_future_lsn() {
        let (db, path) = open_db();
        let applier = LogicalChangeApplier::new(0);
        applier
            .apply(&db, &record(1, b"a"), ApplyMode::Replica)
            .unwrap();
        let err = applier
            .apply(&db, &record(5, b"e"), ApplyMode::Replica)
            .unwrap_err();
        assert!(
            matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
            "got {err:?}"
        );
        assert_eq!(applier.last_applied_lsn(), 1);
        let _ = std::fs::remove_file(path);
    }
}