1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Condvar, Mutex};
5
6use crate::api::{RedDBError, RedDBResult};
7use crate::application::entity::metadata_from_json;
8use crate::replication::cdc::{ChangeOperation, ChangeRecord};
9use crate::storage::{EntityId, EntityKind, RedDB, UnifiedStore};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum ApplyMode {
13 Replica,
14 Restore,
15}
16
17#[derive(Debug, Default)]
23pub struct ReplicaApplyMetrics {
24 pub gap_total: std::sync::atomic::AtomicU64,
25 pub divergence_total: std::sync::atomic::AtomicU64,
26 pub apply_error_total: std::sync::atomic::AtomicU64,
27 pub decode_error_total: std::sync::atomic::AtomicU64,
28 pub apply_miss_total: std::sync::atomic::AtomicU64,
34}
35
36impl ReplicaApplyMetrics {
37 pub fn record(&self, kind: ApplyErrorKind) {
38 use std::sync::atomic::Ordering::Relaxed;
39 match kind {
40 ApplyErrorKind::Gap => {
41 self.gap_total.fetch_add(1, Relaxed);
42 }
43 ApplyErrorKind::Divergence => {
44 self.divergence_total.fetch_add(1, Relaxed);
45 }
46 ApplyErrorKind::Apply => {
47 self.apply_error_total.fetch_add(1, Relaxed);
48 }
49 ApplyErrorKind::Decode => {
50 self.decode_error_total.fetch_add(1, Relaxed);
51 }
52 ApplyErrorKind::Miss => {
53 self.apply_miss_total.fetch_add(1, Relaxed);
54 }
55 }
56 }
57
58 pub fn snapshot(&self) -> [(ApplyErrorKind, u64); 5] {
59 use std::sync::atomic::Ordering::Relaxed;
60 [
61 (ApplyErrorKind::Gap, self.gap_total.load(Relaxed)),
62 (
63 ApplyErrorKind::Divergence,
64 self.divergence_total.load(Relaxed),
65 ),
66 (ApplyErrorKind::Apply, self.apply_error_total.load(Relaxed)),
67 (
68 ApplyErrorKind::Decode,
69 self.decode_error_total.load(Relaxed),
70 ),
71 (ApplyErrorKind::Miss, self.apply_miss_total.load(Relaxed)),
72 ]
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub enum ApplyErrorKind {
78 Gap,
79 Divergence,
80 Apply,
81 Decode,
82 Miss,
85}
86
87impl ApplyErrorKind {
88 pub fn label(self) -> &'static str {
89 match self {
90 Self::Gap => "gap",
91 Self::Divergence => "divergence",
92 Self::Apply => "apply",
93 Self::Decode => "decode",
94 Self::Miss => "apply_miss",
95 }
96 }
97}
98
99impl LogicalApplyError {
100 pub fn kind(&self) -> ApplyErrorKind {
101 match self {
102 Self::Gap { .. } => ApplyErrorKind::Gap,
103 Self::Divergence { .. } => ApplyErrorKind::Divergence,
104 Self::Apply { .. } => ApplyErrorKind::Apply,
105 }
106 }
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115pub enum ApplyOutcome {
116 Applied,
118 Idempotent,
120 Skipped,
122}
123
124#[derive(Debug)]
125pub enum LogicalApplyError {
126 Gap {
127 last: u64,
128 next: u64,
129 },
130 Divergence {
131 expected_term: u64,
132 got_term: u64,
133 lsn: u64,
134 expected: String,
135 got: String,
136 },
137 Apply {
138 lsn: u64,
139 source: RedDBError,
140 },
141}
142
143impl std::fmt::Display for LogicalApplyError {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 match self {
146 Self::Gap { last, next } => write!(f, "LSN gap on apply: last={last} next={next}"),
147 Self::Divergence {
148 expected_term,
149 got_term,
150 lsn,
151 expected,
152 got,
153 } => write!(
154 f,
155 "LSN divergence on apply at term/lsn=({got_term},{lsn}): expected term {expected_term} payload hash {expected}, got {got}"
156 ),
157 Self::Apply { lsn, source } => write!(f, "apply error at lsn={lsn}: {source}"),
158 }
159 }
160}
161
162impl std::error::Error for LogicalApplyError {}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub enum BookmarkWaitError {
166 Timeout { target_lsn: u64, applied_lsn: u64 },
167 TermMismatch { target_term: u64, applied_term: u64 },
168}
169
170impl BookmarkWaitError {
171 pub fn is_timeout(&self) -> bool {
172 matches!(self, Self::Timeout { .. })
173 }
174}
175
176impl std::fmt::Display for BookmarkWaitError {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 Self::Timeout {
180 target_lsn,
181 applied_lsn,
182 } => write!(
183 f,
184 "timed out waiting for causal bookmark lsn {target_lsn}; applied={applied_lsn}"
185 ),
186 Self::TermMismatch {
187 target_term,
188 applied_term,
189 } => write!(
190 f,
191 "causal bookmark term mismatch: target={target_term} applied={applied_term}"
192 ),
193 }
194 }
195}
196
197impl std::error::Error for BookmarkWaitError {}
198
199pub struct LogicalChangeApplier {
204 last_applied_term: AtomicU64,
205 last_applied_lsn: AtomicU64,
206 received_frontier_lsn: AtomicU64,
207 last_payload_hash: Mutex<Option<[u8; 32]>>,
208 apply_wait: (Mutex<()>, Condvar),
209 metrics: std::sync::Arc<ReplicaApplyMetrics>,
214}
215
216impl LogicalChangeApplier {
217 pub fn new(starting_lsn: u64) -> Self {
222 Self::with_metrics(
223 starting_lsn,
224 std::sync::Arc::new(ReplicaApplyMetrics::default()),
225 )
226 }
227
228 pub fn with_metrics(starting_lsn: u64, metrics: std::sync::Arc<ReplicaApplyMetrics>) -> Self {
233 Self {
234 last_applied_term: AtomicU64::new(crate::replication::DEFAULT_REPLICATION_TERM),
235 last_applied_lsn: AtomicU64::new(starting_lsn),
236 received_frontier_lsn: AtomicU64::new(starting_lsn),
237 last_payload_hash: Mutex::new(None),
238 apply_wait: (Mutex::new(()), Condvar::new()),
239 metrics,
240 }
241 }
242
243 pub fn metrics(&self) -> &std::sync::Arc<ReplicaApplyMetrics> {
245 &self.metrics
246 }
247
248 pub fn last_applied_lsn(&self) -> u64 {
249 self.last_applied_lsn.load(Ordering::Acquire)
250 }
251
252 pub fn received_frontier_lsn(&self) -> u64 {
253 self.received_frontier_lsn.load(Ordering::Acquire)
254 }
255
256 pub fn last_applied_term(&self) -> u64 {
257 self.last_applied_term.load(Ordering::Acquire)
258 }
259
260 pub fn wait_for_bookmark(
261 &self,
262 bookmark: &crate::replication::CausalBookmark,
263 timeout: std::time::Duration,
264 ) -> Result<(), BookmarkWaitError> {
265 let deadline = std::time::Instant::now() + timeout;
266 let target_lsn = bookmark.commit_lsn();
267 let target_term = bookmark.term();
268
269 let mut guard = self.apply_wait.0.lock().expect("apply wait mutex");
270 loop {
271 let applied_lsn = self.last_applied_lsn();
272 let applied_term = self.last_applied_term();
273 if applied_lsn >= target_lsn {
274 if applied_term == target_term {
275 return Ok(());
276 }
277 return Err(BookmarkWaitError::TermMismatch {
278 target_term,
279 applied_term,
280 });
281 }
282
283 let now = std::time::Instant::now();
284 if now >= deadline {
285 return Err(BookmarkWaitError::Timeout {
286 target_lsn,
287 applied_lsn,
288 });
289 }
290 let remaining = deadline.saturating_duration_since(now);
291 let (next_guard, wait_result) = self
292 .apply_wait
293 .1
294 .wait_timeout(guard, remaining)
295 .expect("apply wait condvar");
296 guard = next_guard;
297 if wait_result.timed_out() {
298 return Err(BookmarkWaitError::Timeout {
299 target_lsn,
300 applied_lsn: self.last_applied_lsn(),
301 });
302 }
303 }
304 }
305
306 pub fn apply(
314 &self,
315 db: &RedDB,
316 record: &ChangeRecord,
317 mode: ApplyMode,
318 ) -> Result<ApplyOutcome, LogicalApplyError> {
319 let last = self.last_applied_lsn.load(Ordering::Acquire);
320 let last_term = self.last_applied_term.load(Ordering::Acquire);
321 let payload_hash = record_payload_hash(record);
322 self.received_frontier_lsn
323 .fetch_max(record.lsn, Ordering::AcqRel);
324
325 if last == 0 && record.lsn > 0 {
326 self.do_apply(db, record, mode)?;
327 self.last_applied_term.store(record.term, Ordering::Release);
328 self.last_applied_lsn.store(record.lsn, Ordering::Release);
329 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
330 self.apply_wait.1.notify_all();
331 return Ok(ApplyOutcome::Applied);
332 }
333
334 if record.lsn == last {
335 let prior = *self.last_payload_hash.lock().expect("payload hash mutex");
336 return match prior {
337 Some(p) if p == payload_hash => Ok(ApplyOutcome::Idempotent),
338 Some(p) => Err(LogicalApplyError::Divergence {
339 expected_term: last_term,
340 got_term: record.term,
341 lsn: record.lsn,
342 expected: hex_digest(&p),
343 got: hex_digest(&payload_hash),
344 }),
345 None => Ok(ApplyOutcome::Idempotent),
346 };
347 }
348 if record.lsn < last {
349 return Ok(ApplyOutcome::Skipped);
350 }
351 if record.lsn > last + 1 {
352 return Err(LogicalApplyError::Gap {
353 last,
354 next: record.lsn,
355 });
356 }
357
358 self.do_apply(db, record, mode)?;
359 self.last_applied_term.store(record.term, Ordering::Release);
360 self.last_applied_lsn.store(record.lsn, Ordering::Release);
361 *self.last_payload_hash.lock().expect("payload hash mutex") = Some(payload_hash);
362 self.apply_wait.1.notify_all();
363 Ok(ApplyOutcome::Applied)
364 }
365
366 fn do_apply(
367 &self,
368 db: &RedDB,
369 record: &ChangeRecord,
370 mode: ApplyMode,
371 ) -> Result<(), LogicalApplyError> {
372 Self::apply_record_with_metrics(db, record, mode, &self.metrics).map_err(|err| {
373 LogicalApplyError::Apply {
374 lsn: record.lsn,
375 source: err,
376 }
377 })
378 }
379
380 pub fn apply_record(db: &RedDB, record: &ChangeRecord, mode: ApplyMode) -> RedDBResult<()> {
387 Self::apply_record_with_metrics(db, record, mode, &ReplicaApplyMetrics::default())
388 }
389
390 pub fn apply_record_with_metrics(
399 db: &RedDB,
400 record: &ChangeRecord,
401 _mode: ApplyMode,
402 metrics: &ReplicaApplyMetrics,
403 ) -> RedDBResult<()> {
404 let store = db.store();
405 match record.operation {
406 ChangeOperation::Delete => {
407 match store.delete(&record.collection, EntityId::new(record.entity_id)) {
408 Ok(true) => {}
409 Ok(false) => {
410 metrics.record(ApplyErrorKind::Miss);
413 tracing::warn!(
414 target: "reddb::replication::apply",
415 lsn = record.lsn,
416 collection = %record.collection,
417 entity_id = record.entity_id,
418 "replica delete found no matching entity; recorded apply miss (non-fatal divergence signal)"
419 );
420 }
421 Err(crate::storage::StoreError::CollectionNotFound(name)) => {
422 metrics.record(ApplyErrorKind::Miss);
425 tracing::warn!(
426 target: "reddb::replication::apply",
427 lsn = record.lsn,
428 collection = %name,
429 entity_id = record.entity_id,
430 "replica delete against missing collection; recorded apply miss (non-fatal divergence signal)"
431 );
432 }
433 Err(err) => {
434 return Err(RedDBError::Internal(err.to_string()));
438 }
439 }
440 }
441 ChangeOperation::Refresh => {
442 let records = record.refresh_records.clone().ok_or_else(|| {
451 RedDBError::Internal(
452 "replication refresh record missing refresh_records payload".to_string(),
453 )
454 })?;
455 store
456 .refresh_collection_from_records(&record.collection, records)
457 .map_err(|err| RedDBError::Internal(err.to_string()))?;
458 }
459 ChangeOperation::Insert | ChangeOperation::Update => {
460 let Some(bytes) = &record.entity_bytes else {
461 return Err(RedDBError::Internal(
462 "replication record missing entity payload".to_string(),
463 ));
464 };
465 let entity = UnifiedStore::deserialize_entity(bytes, store.format_version())
466 .map_err(|err| RedDBError::Internal(err.to_string()))?;
467
468 if matches!(entity.kind, EntityKind::TableRow { .. }) {
487 let logical = entity.logical_id();
488 let new_id = entity.id;
489 let superseding_xid = if entity.xmin != 0 { entity.xmin } else { 1 };
490 let stale: Vec<_> = store
491 .table_row_versions_by_logical_id(&record.collection, logical)
492 .into_iter()
493 .filter(|version| version.id != new_id && version.xmax == 0)
494 .collect();
495 if !stale.is_empty() {
496 let manager = store
497 .get_collection(&record.collection)
498 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
499 for mut version in stale {
500 version.set_xmax(superseding_xid);
501 manager
502 .update(version)
503 .map_err(|err| RedDBError::Internal(err.to_string()))?;
504 }
505 }
506 }
507
508 let exists = store
509 .get(&record.collection, EntityId::new(record.entity_id))
510 .is_some();
511 if exists {
512 let manager = store
513 .get_collection(&record.collection)
514 .ok_or_else(|| RedDBError::NotFound(record.collection.clone()))?;
515 manager
516 .update(entity.clone())
517 .map_err(|err| RedDBError::Internal(err.to_string()))?;
518 } else {
519 store
520 .insert_auto(&record.collection, entity.clone())
521 .map_err(|err| RedDBError::Internal(err.to_string()))?;
522 }
523 if let Some(metadata_json) = &record.metadata {
524 let metadata = metadata_from_json(metadata_json)
525 .map_err(|err| RedDBError::Internal(err.to_string()))?;
526 store
527 .set_metadata(&record.collection, entity.id, metadata)
528 .map_err(|err| RedDBError::Internal(err.to_string()))?;
529 }
530 store
531 .context_index()
532 .index_entity(&record.collection, &entity);
533 }
534 }
535 Ok(())
536 }
537}
538
539fn record_payload_hash(record: &ChangeRecord) -> [u8; 32] {
540 let mut hasher = crate::crypto::sha256::Sha256::new();
541 hasher.update(&record.term.to_le_bytes());
542 hasher.update(&record.lsn.to_le_bytes());
543 hasher.update(&[record.operation as u8]);
544 hasher.update(record.collection.as_bytes());
545 hasher.update(&record.entity_id.to_le_bytes());
546 if let Some(bytes) = &record.entity_bytes {
547 hasher.update(bytes);
548 }
549 if let Some(records) = &record.refresh_records {
553 hasher.update(&(records.len() as u64).to_le_bytes());
554 for r in records {
555 hasher.update(&(r.len() as u64).to_le_bytes());
556 hasher.update(r);
557 }
558 }
559 hasher.finalize()
560}
561
562fn hex_digest(bytes: &[u8; 32]) -> String {
563 crate::utils::to_hex(bytes)
564}
565
566#[cfg(test)]
567mod tests {
568 use super::*;
569 use crate::replication::cdc::ChangeOperation;
570 use crate::storage::schema::Value;
571 use crate::storage::{EntityData, EntityId, EntityKind, RedDB, RowData, UnifiedEntity};
572 use std::sync::Arc;
573
574 fn open_db() -> (RedDB, std::path::PathBuf) {
575 let path = std::env::temp_dir().join(format!(
576 "reddb_logical_apply_{}_{}",
577 std::process::id(),
578 std::time::SystemTime::now()
579 .duration_since(std::time::UNIX_EPOCH)
580 .unwrap()
581 .as_nanos()
582 ));
583 let _ = std::fs::remove_file(&path);
584 let db = RedDB::open(&path).unwrap();
585 (db, path)
586 }
587
588 fn record(lsn: u64, payload: &[u8]) -> ChangeRecord {
589 let timestamp = 100 + lsn;
590 let mut entity = UnifiedEntity::new(
591 EntityId::new(lsn),
592 EntityKind::TableRow {
593 table: Arc::from("users"),
594 row_id: lsn,
595 },
596 EntityData::Row(RowData::with_names(
597 vec![Value::UnsignedInteger(lsn), Value::Blob(payload.to_vec())],
598 vec!["id".to_string(), "payload".to_string()],
599 )),
600 );
601 entity.created_at = timestamp;
602 entity.updated_at = timestamp;
603 entity.sequence_id = lsn;
604 ChangeRecord::from_entity(
605 lsn,
606 timestamp,
607 ChangeOperation::Insert,
608 "users",
609 "row",
610 &entity,
611 crate::api::REDDB_FORMAT_VERSION,
612 None,
613 )
614 }
615
616 fn delete_record(lsn: u64, collection: &str, entity_id: u64) -> ChangeRecord {
617 ChangeRecord {
618 term: crate::replication::DEFAULT_REPLICATION_TERM,
619 lsn,
620 timestamp: 100 + lsn,
621 operation: ChangeOperation::Delete,
622 collection: collection.to_string(),
623 entity_id,
624 entity_kind: "row".to_string(),
625 entity_bytes: None,
626 metadata: None,
627 refresh_records: None,
628 }
629 }
630
631 fn table_row_entity(id: u64) -> UnifiedEntity {
632 let mut entity = UnifiedEntity::new(
633 EntityId::new(id),
634 EntityKind::TableRow {
635 table: Arc::from("users"),
636 row_id: id,
637 },
638 EntityData::Row(RowData::with_names(
639 vec![Value::UnsignedInteger(id)],
640 vec!["id".to_string()],
641 )),
642 );
643 entity.created_at = 100 + id;
644 entity.updated_at = 100 + id;
645 entity.sequence_id = id;
646 entity
647 }
648
649 #[test]
653 fn delete_against_missing_collection_records_apply_miss() {
654 let (db, path) = open_db();
655 let metrics = ReplicaApplyMetrics::default();
656 let before = metrics.apply_miss_total.load(Ordering::Relaxed);
657
658 LogicalChangeApplier::apply_record_with_metrics(
659 &db,
660 &delete_record(1, "no_such_collection", 42),
661 ApplyMode::Replica,
662 &metrics,
663 )
664 .expect("missing-target delete is non-fatal");
665
666 assert_eq!(
667 metrics.apply_miss_total.load(Ordering::Relaxed),
668 before + 1,
669 "delete against a missing collection must bump the apply-miss signal"
670 );
671 let _ = std::fs::remove_file(path);
672 }
673
674 #[test]
677 fn delete_against_missing_entity_records_apply_miss() {
678 let (db, path) = open_db();
679 let _ = db.store().get_or_create_collection("users");
680 let metrics = ReplicaApplyMetrics::default();
681
682 LogicalChangeApplier::apply_record_with_metrics(
683 &db,
684 &delete_record(1, "users", 9999),
685 ApplyMode::Replica,
686 &metrics,
687 )
688 .expect("missing-entity delete is non-fatal");
689
690 assert_eq!(
691 metrics.apply_miss_total.load(Ordering::Relaxed),
692 1,
693 "delete of an absent entity must bump the apply-miss signal"
694 );
695 let _ = std::fs::remove_file(path);
696 }
697
698 #[test]
701 fn delete_of_present_target_records_no_apply_miss() {
702 let (db, path) = open_db();
703 let store = db.store();
704 let _ = store.get_or_create_collection("users");
705 let id = store
706 .insert_auto("users", table_row_entity(1))
707 .expect("insert entity");
708 let metrics = ReplicaApplyMetrics::default();
709
710 LogicalChangeApplier::apply_record_with_metrics(
711 &db,
712 &delete_record(1, "users", id.raw()),
713 ApplyMode::Replica,
714 &metrics,
715 )
716 .expect("present-target delete applies");
717
718 assert_eq!(
719 metrics.apply_miss_total.load(Ordering::Relaxed),
720 0,
721 "deleting a present target must not fire the apply-miss signal"
722 );
723 assert!(
724 store.get("users", id).is_none(),
725 "the entity must actually be removed on the normal path"
726 );
727 let _ = std::fs::remove_file(path);
728 }
729
730 #[test]
733 fn stateful_apply_surfaces_delete_miss_via_metrics_handle() {
734 let (db, path) = open_db();
735 let applier =
736 LogicalChangeApplier::with_metrics(0, Arc::new(ReplicaApplyMetrics::default()));
737
738 applier
739 .apply(&db, &delete_record(1, "ghost", 7), ApplyMode::Replica)
740 .expect("missing-target delete advances the chain");
741
742 assert_eq!(
743 applier.metrics().apply_miss_total.load(Ordering::Relaxed),
744 1,
745 "the applier's shared metrics handle must record the miss"
746 );
747 assert_eq!(
748 applier.last_applied_lsn(),
749 1,
750 "a non-fatal miss still advances the LSN chain"
751 );
752 let _ = std::fs::remove_file(path);
753 }
754
755 #[test]
756 fn apply_advances_on_monotonic_lsn() {
757 let (db, path) = open_db();
758 let applier = LogicalChangeApplier::new(0);
759 assert_eq!(
760 applier
761 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
762 .unwrap(),
763 ApplyOutcome::Applied
764 );
765 assert_eq!(applier.last_applied_lsn(), 1);
766 assert_eq!(
767 applier
768 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
769 .unwrap(),
770 ApplyOutcome::Applied
771 );
772 assert_eq!(applier.last_applied_lsn(), 2);
773 let _ = std::fs::remove_file(path);
774 }
775
776 #[test]
777 fn apply_idempotent_on_duplicate_lsn_same_payload() {
778 let (db, path) = open_db();
779 let applier = LogicalChangeApplier::new(0);
780 let r = record(5, b"same");
781 applier.apply(&db, &r, ApplyMode::Replica).unwrap();
782 assert_eq!(
783 applier.apply(&db, &r, ApplyMode::Replica).unwrap(),
784 ApplyOutcome::Idempotent
785 );
786 assert_eq!(applier.last_applied_lsn(), 5);
787 let _ = std::fs::remove_file(path);
788 }
789
790 #[test]
791 fn apply_fails_closed_on_lsn_collision_diff_payload() {
792 let (db, path) = open_db();
793 let applier = LogicalChangeApplier::new(0);
794 applier
795 .apply(&db, &record(7, b"first"), ApplyMode::Replica)
796 .unwrap();
797 let err = applier
798 .apply(&db, &record(7, b"different"), ApplyMode::Replica)
799 .unwrap_err();
800 assert!(
801 matches!(err, LogicalApplyError::Divergence { lsn: 7, .. }),
802 "got {err:?}"
803 );
804 let _ = std::fs::remove_file(path);
805 }
806
807 #[test]
808 fn apply_fails_closed_on_same_lsn_different_term() {
809 let (db, path) = open_db();
810 let applier = LogicalChangeApplier::new(0);
811 applier
812 .apply(&db, &record(7, b"same").with_term(1), ApplyMode::Replica)
813 .unwrap();
814 let err = applier
815 .apply(&db, &record(7, b"same").with_term(2), ApplyMode::Replica)
816 .unwrap_err();
817 assert!(
818 matches!(
819 err,
820 LogicalApplyError::Divergence {
821 lsn: 7,
822 expected_term: 1,
823 got_term: 2,
824 ..
825 }
826 ),
827 "got {err:?}"
828 );
829 assert_eq!(applier.last_applied_term(), 1);
830 assert_eq!(applier.last_applied_lsn(), 7);
831 let _ = std::fs::remove_file(path);
832 }
833
834 #[test]
835 fn apply_skips_older_lsn() {
836 let (db, path) = open_db();
837 let applier = LogicalChangeApplier::new(0);
838 applier
839 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
840 .unwrap();
841 applier
842 .apply(&db, &record(2, b"b"), ApplyMode::Replica)
843 .unwrap();
844 assert_eq!(
845 applier
846 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
847 .unwrap(),
848 ApplyOutcome::Skipped
849 );
850 assert_eq!(applier.last_applied_lsn(), 2);
851 let _ = std::fs::remove_file(path);
852 }
853
854 #[test]
855 fn apply_returns_gap_on_future_lsn() {
856 let (db, path) = open_db();
857 let applier = LogicalChangeApplier::new(0);
858 applier
859 .apply(&db, &record(1, b"a"), ApplyMode::Replica)
860 .unwrap();
861 let err = applier
862 .apply(&db, &record(5, b"e"), ApplyMode::Replica)
863 .unwrap_err();
864 assert!(
865 matches!(err, LogicalApplyError::Gap { last: 1, next: 5 }),
866 "got {err:?}"
867 );
868 assert_eq!(applier.last_applied_lsn(), 1);
869 let _ = std::fs::remove_file(path);
870 }
871}