use std::collections::BTreeMap;
use std::sync::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use crate::SearchError;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DocumentOp {
Add {
doc_id: String,
content: String,
metadata: BTreeMap<String, String>,
},
Update {
doc_id: String,
content: String,
metadata: BTreeMap<String, String>,
},
Delete {
doc_id: String,
},
}
impl DocumentOp {
#[must_use]
pub fn doc_id(&self) -> &str {
match self {
Self::Add { doc_id, .. } | Self::Update { doc_id, .. } | Self::Delete { doc_id } => {
doc_id
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommitEntry {
pub commit_seq: u64,
pub commit_id: String,
pub operations: Vec<DocumentOp>,
pub timestamp: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplayWatermark {
pub last_applied_seq: u64,
pub last_applied_at: u64,
pub total_commits_applied: u64,
pub total_ops_applied: u64,
}
impl ReplayWatermark {
#[must_use]
pub const fn empty() -> Self {
Self {
last_applied_seq: 0,
last_applied_at: 0,
total_commits_applied: 0,
total_ops_applied: 0,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum CommitOutcome {
Applied {
commit_seq: u64,
ops_applied: usize,
},
Skipped {
commit_seq: u64,
reason: SkipReason,
},
Failed {
commit_seq: u64,
error: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SkipReason {
AlreadyApplied,
OutOfOrder {
expected: u64,
got: u64,
},
EmptyCommit,
}
pub trait ReplayConsumer: Send + Sync {
fn apply_add(
&self,
doc_id: &str,
content: &str,
metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError>;
fn apply_update(
&self,
doc_id: &str,
content: &str,
metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError>;
fn apply_delete(&self, doc_id: &str) -> Result<(), SearchError>;
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::struct_excessive_bools)]
pub struct ReplayPolicy {
pub strict_ordering: bool,
pub skip_duplicates: bool,
pub skip_empty: bool,
pub stop_on_failure: bool,
}
impl Default for ReplayPolicy {
fn default() -> Self {
Self {
strict_ordering: true,
skip_duplicates: true,
skip_empty: true,
stop_on_failure: true,
}
}
}
pub struct CommitReplayEngine {
watermark: RwLock<ReplayWatermark>,
policy: ReplayPolicy,
apply_lock: Mutex<()>,
}
impl CommitReplayEngine {
#[must_use]
pub const fn new(policy: ReplayPolicy) -> Self {
Self {
watermark: RwLock::new(ReplayWatermark::empty()),
policy,
apply_lock: Mutex::new(()),
}
}
#[must_use]
pub const fn resume_from(watermark: ReplayWatermark, policy: ReplayPolicy) -> Self {
Self {
watermark: RwLock::new(watermark),
policy,
apply_lock: Mutex::new(()),
}
}
pub fn apply(
&self,
entry: &CommitEntry,
consumer: &dyn ReplayConsumer,
now_millis: u64,
) -> CommitOutcome {
let _guard = self
.apply_lock
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(skip) = self.check_skip(entry) {
return skip;
}
let mut ops_applied = 0usize;
for op in &entry.operations {
let result = match op {
DocumentOp::Add {
doc_id,
content,
metadata,
} => consumer.apply_add(doc_id, content, metadata),
DocumentOp::Update {
doc_id,
content,
metadata,
} => consumer.apply_update(doc_id, content, metadata),
DocumentOp::Delete { doc_id } => consumer.apply_delete(doc_id),
};
match result {
Ok(()) => ops_applied += 1,
Err(e) => {
return CommitOutcome::Failed {
commit_seq: entry.commit_seq,
error: e.to_string(),
};
}
}
}
self.advance_watermark(entry.commit_seq, now_millis, ops_applied);
CommitOutcome::Applied {
commit_seq: entry.commit_seq,
ops_applied,
}
}
pub fn replay_batch(
&self,
entries: &[CommitEntry],
consumer: &dyn ReplayConsumer,
now_millis: u64,
) -> Vec<CommitOutcome> {
let mut outcomes = Vec::with_capacity(entries.len());
for entry in entries {
let outcome = self.apply(entry, consumer, now_millis);
let failed = matches!(outcome, CommitOutcome::Failed { .. });
outcomes.push(outcome);
if failed && self.policy.stop_on_failure {
break;
}
}
outcomes
}
#[must_use]
pub fn watermark(&self) -> ReplayWatermark {
self.watermark
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
pub fn reset(&self) {
let mut wm = self
.watermark
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*wm = ReplayWatermark::empty();
}
fn check_skip(&self, entry: &CommitEntry) -> Option<CommitOutcome> {
let last_applied_seq = {
let wm = self
.watermark
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
wm.last_applied_seq
};
if entry.commit_seq <= last_applied_seq && self.policy.skip_duplicates {
return Some(CommitOutcome::Skipped {
commit_seq: entry.commit_seq,
reason: SkipReason::AlreadyApplied,
});
}
if self.policy.strict_ordering && last_applied_seq > 0 {
let expected = last_applied_seq + 1;
if entry.commit_seq != expected {
return Some(CommitOutcome::Skipped {
commit_seq: entry.commit_seq,
reason: SkipReason::OutOfOrder {
expected,
got: entry.commit_seq,
},
});
}
}
if entry.operations.is_empty() && self.policy.skip_empty {
return Some(CommitOutcome::Skipped {
commit_seq: entry.commit_seq,
reason: SkipReason::EmptyCommit,
});
}
None
}
fn advance_watermark(&self, commit_seq: u64, now_millis: u64, ops: usize) {
let mut wm = self
.watermark
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if commit_seq > wm.last_applied_seq {
wm.last_applied_seq = commit_seq;
}
wm.last_applied_at = now_millis;
wm.total_commits_applied += 1;
wm.total_ops_applied += ops as u64;
drop(wm);
}
}
impl Default for CommitReplayEngine {
fn default() -> Self {
Self::new(ReplayPolicy::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
struct RecordingConsumer {
log: Mutex<Vec<String>>,
}
impl RecordingConsumer {
fn new() -> Self {
Self {
log: Mutex::new(Vec::new()),
}
}
fn entries(&self) -> Vec<String> {
self.log.lock().unwrap().clone()
}
}
impl ReplayConsumer for RecordingConsumer {
fn apply_add(
&self,
doc_id: &str,
content: &str,
_metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
self.log
.lock()
.unwrap()
.push(format!("ADD:{doc_id}:{content}"));
Ok(())
}
fn apply_update(
&self,
doc_id: &str,
content: &str,
_metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
self.log
.lock()
.unwrap()
.push(format!("UPDATE:{doc_id}:{content}"));
Ok(())
}
fn apply_delete(&self, doc_id: &str) -> Result<(), SearchError> {
self.log.lock().unwrap().push(format!("DELETE:{doc_id}"));
Ok(())
}
}
struct FailingConsumer;
impl ReplayConsumer for FailingConsumer {
fn apply_add(
&self,
_doc_id: &str,
_content: &str,
_metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
Err(SearchError::SubsystemError {
subsystem: "replay",
source: "test failure".into(),
})
}
fn apply_update(
&self,
_doc_id: &str,
_content: &str,
_metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
Err(SearchError::SubsystemError {
subsystem: "replay",
source: "test failure".into(),
})
}
fn apply_delete(&self, _doc_id: &str) -> Result<(), SearchError> {
Err(SearchError::SubsystemError {
subsystem: "replay",
source: "test failure".into(),
})
}
}
fn make_commit(seq: u64, ops: Vec<DocumentOp>) -> CommitEntry {
CommitEntry {
commit_seq: seq,
commit_id: format!("commit-{seq}"),
operations: ops,
timestamp: 1_700_000_000_000 + seq * 1000,
}
}
fn add_op(doc_id: &str, content: &str) -> DocumentOp {
DocumentOp::Add {
doc_id: doc_id.into(),
content: content.into(),
metadata: BTreeMap::new(),
}
}
fn update_op(doc_id: &str, content: &str) -> DocumentOp {
DocumentOp::Update {
doc_id: doc_id.into(),
content: content.into(),
metadata: BTreeMap::new(),
}
}
fn delete_op(doc_id: &str) -> DocumentOp {
DocumentOp::Delete {
doc_id: doc_id.into(),
}
}
#[test]
fn empty_engine_has_zero_watermark() {
let engine = CommitReplayEngine::default();
let wm = engine.watermark();
assert_eq!(wm.last_applied_seq, 0);
assert_eq!(wm.total_commits_applied, 0);
assert_eq!(wm.total_ops_applied, 0);
}
#[test]
fn single_commit_apply() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let commit = make_commit(1, vec![add_op("doc-1", "hello world")]);
let outcome = engine.apply(&commit, &consumer, 5000);
assert!(matches!(
outcome,
CommitOutcome::Applied {
commit_seq: 1,
ops_applied: 1
}
));
assert_eq!(consumer.entries(), vec!["ADD:doc-1:hello world"]);
assert_eq!(engine.watermark().last_applied_seq, 1);
assert_eq!(engine.watermark().total_commits_applied, 1);
}
#[test]
fn multi_op_commit() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let commit = make_commit(
1,
vec![
add_op("doc-1", "first"),
add_op("doc-2", "second"),
update_op("doc-1", "first-updated"),
delete_op("doc-3"),
],
);
let outcome = engine.apply(&commit, &consumer, 5000);
assert!(matches!(
outcome,
CommitOutcome::Applied { ops_applied: 4, .. }
));
assert_eq!(engine.watermark().total_ops_applied, 4);
assert_eq!(consumer.entries().len(), 4);
}
#[test]
fn sequential_commits() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
let c2 = make_commit(2, vec![update_op("doc-1", "v2")]);
let c3 = make_commit(3, vec![delete_op("doc-1")]);
engine.apply(&c1, &consumer, 1000);
engine.apply(&c2, &consumer, 2000);
engine.apply(&c3, &consumer, 3000);
assert_eq!(engine.watermark().last_applied_seq, 3);
assert_eq!(engine.watermark().total_commits_applied, 3);
assert_eq!(engine.watermark().total_ops_applied, 3);
assert_eq!(
consumer.entries(),
vec!["ADD:doc-1:v1", "UPDATE:doc-1:v2", "DELETE:doc-1"]
);
}
#[test]
fn skip_duplicate_commit() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
engine.apply(&c1, &consumer, 1000);
let outcome = engine.apply(&c1, &consumer, 2000);
assert!(matches!(
outcome,
CommitOutcome::Skipped {
reason: SkipReason::AlreadyApplied,
..
}
));
assert_eq!(consumer.entries().len(), 1);
assert_eq!(engine.watermark().total_commits_applied, 1);
}
#[test]
fn strict_ordering_rejects_gap() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
engine.apply(&c1, &consumer, 1000);
let c3 = make_commit(3, vec![add_op("doc-3", "v1")]);
let outcome = engine.apply(&c3, &consumer, 2000);
assert!(matches!(
outcome,
CommitOutcome::Skipped {
reason: SkipReason::OutOfOrder {
expected: 2,
got: 3,
},
..
}
));
}
#[test]
fn relaxed_ordering_allows_gap() {
let policy = ReplayPolicy {
strict_ordering: false,
..ReplayPolicy::default()
};
let engine = CommitReplayEngine::new(policy);
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
engine.apply(&c1, &consumer, 1000);
let c3 = make_commit(3, vec![add_op("doc-3", "v1")]);
let outcome = engine.apply(&c3, &consumer, 2000);
assert!(matches!(outcome, CommitOutcome::Applied { .. }));
assert_eq!(engine.watermark().last_applied_seq, 3);
}
#[test]
fn skip_empty_commit() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![]);
let outcome = engine.apply(&c1, &consumer, 1000);
assert!(matches!(
outcome,
CommitOutcome::Skipped {
reason: SkipReason::EmptyCommit,
..
}
));
assert_eq!(engine.watermark().last_applied_seq, 0);
}
#[test]
fn allow_empty_commit_when_policy_permits() {
let policy = ReplayPolicy {
skip_empty: false,
..ReplayPolicy::default()
};
let engine = CommitReplayEngine::new(policy);
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![]);
let outcome = engine.apply(&c1, &consumer, 1000);
assert!(matches!(
outcome,
CommitOutcome::Applied { ops_applied: 0, .. }
));
assert_eq!(engine.watermark().last_applied_seq, 1);
}
#[test]
fn consumer_failure_returns_failed_outcome() {
let engine = CommitReplayEngine::default();
let consumer = FailingConsumer;
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
let outcome = engine.apply(&c1, &consumer, 1000);
assert!(matches!(outcome, CommitOutcome::Failed { .. }));
assert_eq!(engine.watermark().last_applied_seq, 0);
}
#[test]
fn partial_failure_in_multi_op_commit() {
struct FailOnSecond {
count: Mutex<usize>,
}
impl ReplayConsumer for FailOnSecond {
fn apply_add(
&self,
_doc_id: &str,
_content: &str,
_metadata: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
let mut c = self.count.lock().unwrap();
*c += 1;
if *c >= 2 {
Err(SearchError::SubsystemError {
subsystem: "replay",
source: "second op fails".into(),
})
} else {
Ok(())
}
}
fn apply_update(
&self,
_: &str,
_: &str,
_: &BTreeMap<String, String>,
) -> Result<(), SearchError> {
Ok(())
}
fn apply_delete(&self, _: &str) -> Result<(), SearchError> {
Ok(())
}
}
let engine = CommitReplayEngine::default();
let consumer = FailOnSecond {
count: Mutex::new(0),
};
let commit = make_commit(1, vec![add_op("doc-1", "ok"), add_op("doc-2", "will-fail")]);
let outcome = engine.apply(&commit, &consumer, 1000);
assert!(matches!(
outcome,
CommitOutcome::Failed { commit_seq: 1, .. }
));
assert_eq!(engine.watermark().last_applied_seq, 0);
}
#[test]
fn batch_replay_sequential() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let entries = vec![
make_commit(1, vec![add_op("a", "1")]),
make_commit(2, vec![add_op("b", "2")]),
make_commit(3, vec![add_op("c", "3")]),
];
let outcomes = engine.replay_batch(&entries, &consumer, 5000);
assert_eq!(outcomes.len(), 3);
assert!(
outcomes
.iter()
.all(|o| matches!(o, CommitOutcome::Applied { .. }))
);
assert_eq!(engine.watermark().last_applied_seq, 3);
assert_eq!(engine.watermark().total_commits_applied, 3);
}
#[test]
fn batch_replay_stops_on_failure() {
let engine = CommitReplayEngine::default();
let consumer = FailingConsumer;
let entries = vec![
make_commit(1, vec![add_op("a", "1")]),
make_commit(2, vec![add_op("b", "2")]),
];
let outcomes = engine.replay_batch(&entries, &consumer, 5000);
assert_eq!(outcomes.len(), 1);
assert!(matches!(outcomes[0], CommitOutcome::Failed { .. }));
}
#[test]
fn batch_replay_continues_on_failure_when_policy_permits() {
let policy = ReplayPolicy {
stop_on_failure: false,
..ReplayPolicy::default()
};
let engine = CommitReplayEngine::new(policy);
let consumer = FailingConsumer;
let entries = vec![
make_commit(1, vec![add_op("a", "1")]),
make_commit(2, vec![add_op("b", "2")]),
];
let outcomes = engine.replay_batch(&entries, &consumer, 5000);
assert_eq!(outcomes.len(), 2);
}
#[test]
fn resume_from_watermark() {
let saved = ReplayWatermark {
last_applied_seq: 42,
last_applied_at: 1000,
total_commits_applied: 42,
total_ops_applied: 100,
};
let engine = CommitReplayEngine::resume_from(saved, ReplayPolicy::default());
let consumer = RecordingConsumer::new();
let old = make_commit(40, vec![add_op("old", "stale")]);
let outcome = engine.apply(&old, &consumer, 2000);
assert!(matches!(
outcome,
CommitOutcome::Skipped {
reason: SkipReason::AlreadyApplied,
..
}
));
let next = make_commit(43, vec![add_op("new", "fresh")]);
let outcome = engine.apply(&next, &consumer, 3000);
assert!(matches!(outcome, CommitOutcome::Applied { .. }));
assert_eq!(engine.watermark().last_applied_seq, 43);
assert_eq!(engine.watermark().total_commits_applied, 43);
}
#[test]
fn reset_clears_watermark() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("doc-1", "v1")]);
engine.apply(&c1, &consumer, 1000);
assert_eq!(engine.watermark().last_applied_seq, 1);
engine.reset();
assert_eq!(engine.watermark().last_applied_seq, 0);
assert_eq!(engine.watermark().total_commits_applied, 0);
}
#[test]
fn document_op_doc_id_accessor() {
let add = add_op("id-add", "content");
assert_eq!(add.doc_id(), "id-add");
let upd = update_op("id-upd", "content");
assert_eq!(upd.doc_id(), "id-upd");
let del = delete_op("id-del");
assert_eq!(del.doc_id(), "id-del");
}
#[test]
fn watermark_serde_roundtrip() {
let wm = ReplayWatermark {
last_applied_seq: 99,
last_applied_at: 1_700_000_000_000,
total_commits_applied: 99,
total_ops_applied: 500,
};
let json = serde_json::to_string(&wm).expect("serialize");
let back: ReplayWatermark = serde_json::from_str(&json).expect("deserialize");
assert_eq!(wm, back);
}
#[test]
fn commit_entry_serde_roundtrip() {
let entry = make_commit(
1,
vec![
add_op("doc-1", "hello"),
update_op("doc-2", "world"),
delete_op("doc-3"),
],
);
let json = serde_json::to_string(&entry).expect("serialize");
let back: CommitEntry = serde_json::from_str(&json).expect("deserialize");
assert_eq!(entry, back);
}
#[test]
fn replay_policy_serde_roundtrip() {
let policy = ReplayPolicy {
strict_ordering: false,
skip_duplicates: true,
skip_empty: false,
stop_on_failure: true,
};
let json = serde_json::to_string(&policy).expect("serialize");
let back: ReplayPolicy = serde_json::from_str(&json).expect("deserialize");
assert_eq!(policy, back);
}
#[test]
fn first_commit_with_strict_ordering_accepts_any_seq() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c = make_commit(42, vec![add_op("doc-1", "v1")]);
let outcome = engine.apply(&c, &consumer, 1000);
assert!(matches!(outcome, CommitOutcome::Applied { .. }));
assert_eq!(engine.watermark().last_applied_seq, 42);
}
#[test]
fn document_op_debug_clone_eq() {
let add = add_op("d1", "content");
let cloned = add.clone();
assert_eq!(add, cloned);
let dbg = format!("{add:?}");
assert!(dbg.contains("Add"));
assert!(dbg.contains("d1"));
let upd = update_op("d2", "new");
assert_ne!(
add,
DocumentOp::Delete {
doc_id: "d1".into()
}
);
let dbg_upd = format!("{upd:?}");
assert!(dbg_upd.contains("Update"));
}
#[test]
fn document_op_serde_each_variant() {
let add = add_op("a", "content-a");
let json = serde_json::to_string(&add).unwrap();
let decoded: DocumentOp = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, add);
let upd = update_op("b", "content-b");
let json = serde_json::to_string(&upd).unwrap();
let decoded: DocumentOp = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, upd);
let del = delete_op("c");
let json = serde_json::to_string(&del).unwrap();
let decoded: DocumentOp = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, del);
}
#[test]
fn document_op_with_metadata() {
let mut meta = BTreeMap::new();
meta.insert("lang".to_string(), "en".to_string());
meta.insert("source".to_string(), "test".to_string());
let op = DocumentOp::Add {
doc_id: "m1".into(),
content: "hello".into(),
metadata: meta.clone(),
};
assert_eq!(op.doc_id(), "m1");
let json = serde_json::to_string(&op).unwrap();
let decoded: DocumentOp = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, op);
if let DocumentOp::Add { metadata: m, .. } = decoded {
assert_eq!(m.len(), 2);
assert_eq!(m["lang"], "en");
}
}
#[test]
fn commit_entry_debug_clone() {
let entry = make_commit(5, vec![add_op("d1", "text")]);
let cloned = entry.clone();
assert_eq!(entry, cloned);
let dbg = format!("{entry:?}");
assert!(dbg.contains("CommitEntry"));
assert!(dbg.contains("commit-5"));
}
#[test]
fn replay_watermark_empty_and_debug_clone() {
let wm = ReplayWatermark::empty();
assert_eq!(wm.last_applied_seq, 0);
assert_eq!(wm.last_applied_at, 0);
assert_eq!(wm.total_commits_applied, 0);
assert_eq!(wm.total_ops_applied, 0);
let cloned = wm.clone();
assert_eq!(wm, cloned);
let dbg = format!("{wm:?}");
assert!(dbg.contains("ReplayWatermark"));
}
#[test]
fn commit_outcome_applied_serde() {
let outcome = CommitOutcome::Applied {
commit_seq: 10,
ops_applied: 3,
};
let json = serde_json::to_string(&outcome).unwrap();
let decoded: CommitOutcome = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, outcome);
}
#[test]
fn commit_outcome_skipped_serde() {
let outcome = CommitOutcome::Skipped {
commit_seq: 5,
reason: SkipReason::AlreadyApplied,
};
let json = serde_json::to_string(&outcome).unwrap();
let decoded: CommitOutcome = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, outcome);
}
#[test]
fn commit_outcome_failed_serde() {
let outcome = CommitOutcome::Failed {
commit_seq: 7,
error: "something broke".into(),
};
let json = serde_json::to_string(&outcome).unwrap();
let decoded: CommitOutcome = serde_json::from_str(&json).unwrap();
assert_eq!(decoded, outcome);
}
#[test]
fn commit_outcome_debug_clone() {
let outcomes = vec![
CommitOutcome::Applied {
commit_seq: 1,
ops_applied: 2,
},
CommitOutcome::Skipped {
commit_seq: 2,
reason: SkipReason::EmptyCommit,
},
CommitOutcome::Failed {
commit_seq: 3,
error: "err".into(),
},
];
for o in &outcomes {
let cloned = o.clone();
assert_eq!(o, &cloned);
let dbg = format!("{o:?}");
assert!(!dbg.is_empty());
}
}
#[test]
fn skip_reason_all_variants_serde() {
let variants = vec![
SkipReason::AlreadyApplied,
SkipReason::OutOfOrder {
expected: 5,
got: 8,
},
SkipReason::EmptyCommit,
];
for v in &variants {
let json = serde_json::to_string(v).unwrap();
let decoded: SkipReason = serde_json::from_str(&json).unwrap();
assert_eq!(&decoded, v);
}
}
#[test]
fn skip_reason_debug_clone() {
let r = SkipReason::OutOfOrder {
expected: 2,
got: 5,
};
let cloned = r.clone();
assert_eq!(r, cloned);
let dbg = format!("{r:?}");
assert!(dbg.contains("OutOfOrder"));
}
#[test]
fn replay_policy_debug_default() {
let policy = ReplayPolicy::default();
assert!(policy.strict_ordering);
assert!(policy.skip_duplicates);
assert!(policy.skip_empty);
assert!(policy.stop_on_failure);
let dbg = format!("{policy:?}");
assert!(dbg.contains("ReplayPolicy"));
}
#[test]
fn watermark_timestamp_tracking() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("d1", "v1")]);
engine.apply(&c1, &consumer, 42_000);
assert_eq!(engine.watermark().last_applied_at, 42_000);
let c2 = make_commit(2, vec![add_op("d2", "v2")]);
engine.apply(&c2, &consumer, 99_000);
assert_eq!(engine.watermark().last_applied_at, 99_000);
}
#[test]
fn batch_replay_empty_entries() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let outcomes = engine.replay_batch(&[], &consumer, 1000);
assert!(outcomes.is_empty());
assert_eq!(engine.watermark().last_applied_seq, 0);
}
#[test]
fn batch_replay_mixed_skip_and_apply() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("a", "1")]);
engine.apply(&c1, &consumer, 1000);
let batch = vec![
make_commit(1, vec![add_op("a", "1")]),
make_commit(2, vec![add_op("b", "2")]),
make_commit(3, vec![]),
];
let outcomes = engine.replay_batch(&batch, &consumer, 2000);
assert_eq!(outcomes.len(), 3);
assert!(matches!(
outcomes[0],
CommitOutcome::Skipped {
reason: SkipReason::AlreadyApplied,
..
}
));
assert!(matches!(outcomes[1], CommitOutcome::Applied { .. }));
assert!(matches!(
outcomes[2],
CommitOutcome::Skipped {
reason: SkipReason::EmptyCommit,
..
}
));
}
#[test]
fn reset_then_replay_full_cycle() {
let engine = CommitReplayEngine::default();
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("d1", "v1")]);
engine.apply(&c1, &consumer, 1000);
assert_eq!(engine.watermark().last_applied_seq, 1);
engine.reset();
assert_eq!(engine.watermark(), ReplayWatermark::empty());
let outcome = engine.apply(&c1, &consumer, 2000);
assert!(matches!(outcome, CommitOutcome::Applied { .. }));
assert_eq!(engine.watermark().last_applied_seq, 1);
assert_eq!(engine.watermark().total_commits_applied, 1);
}
#[test]
fn skip_duplicates_disabled_allows_reapply() {
let policy = ReplayPolicy {
skip_duplicates: false,
strict_ordering: false,
..ReplayPolicy::default()
};
let engine = CommitReplayEngine::new(policy);
let consumer = RecordingConsumer::new();
let c1 = make_commit(1, vec![add_op("d1", "v1")]);
engine.apply(&c1, &consumer, 1000);
let outcome = engine.apply(&c1, &consumer, 2000);
assert!(matches!(outcome, CommitOutcome::Applied { .. }));
assert_eq!(engine.watermark().total_commits_applied, 2);
assert_eq!(consumer.entries().len(), 2);
}
}