#![allow(clippy::unwrap_used)]
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use futures::stream;
use futures::stream::BoxStream;
use queue_channel::channel;
use queue_core::Producer;
use schema_core::{ColumnName, GenericValue, IndexName, TableName};
use sources_core::RowKey;
use sources_core::SnapshotTable;
use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
use sources_core::document::{Document, DocumentId};
use super::*;
use crate::pipeline::{Pipeline, work};
/// Change source that serves a fixed list of changes through `live`.
///
/// The list is kept behind `Mutex<Option<..>>` so that `live`, which only has
/// `&self`, can move it out.
#[derive(Debug)]
struct MockSource {
    /// Changes still to be served; `None` once `live` has taken them.
    changes: Mutex<Option<Vec<Change>>>,
}

#[async_trait]
impl ChangeCapture for MockSource {
    /// Returns a finite stream of the stored changes, in order, each wrapped in `Ok`.
    ///
    /// The first call takes the list; any later call yields an empty stream.
    async fn live(&self) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
        let pending = self.changes.lock().unwrap().take();
        let events = pending
            .unwrap_or_default()
            .into_iter()
            .map(Ok::<Change, sources_core::SourceError>);
        Ok(Box::pin(stream::iter(events)))
    }
}
/// Ack sink that tallies confirmations in a shared counter.
#[derive(Debug)]
struct CountingAck(Arc<AtomicU64>);

impl AckSink for CountingAck {
    /// Adds one to the shared counter; the confirmed sequence number is ignored.
    fn confirm(&self, _seq: u64) {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::SeqCst);
    }
}
/// Document builder that maps every row to one document in the `users` index.
#[derive(Debug)]
struct MockDocuments;

#[async_trait]
impl DocumentBuilder for MockDocuments {
    /// Resolves any row to a single `users` document keyed by the row's own key.
    /// The table name is ignored.
    async fn resolve(
        &self,
        _table: &TableName,
        key: &RowKey,
    ) -> sources_core::Result<Vec<DocumentId>> {
        let index = IndexName::try_new("users").unwrap();
        let id = DocumentId {
            index,
            key: key.clone(),
        };
        Ok(vec![id])
    }

    /// Builds the document for `id`.
    ///
    /// A key whose first column value is `Int(2)` yields a delete; every other
    /// key yields an upsert with a default map body. The decision depends only
    /// on the key, not on the kind of change that touched the row.
    async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
        let document = match id.key.0.first() {
            Some((_, GenericValue::Int(2))) => Document::Delete { id: id.clone() },
            _ => Document::Upsert {
                id: id.clone(),
                body: GenericValue::Map(Default::default()),
            },
        };
        Ok(document)
    }

    /// Declares one backfill scope: index `users`, rooted at table `public.users`.
    fn backfill_scopes(&self) -> Vec<sources_core::document::IndexScope> {
        let index = IndexName::try_new("users").unwrap();
        let root = SnapshotTable {
            db_schema: schema_core::DatabaseSchema::try_new("public").unwrap(),
            table: TableName::try_new("users").unwrap(),
        };
        vec![sources_core::document::IndexScope { index, root }]
    }
}
/// Sink that logs each staged write as a string; flushes are not logged.
#[derive(Debug, Default)]
struct RecordingSink {
    /// Shared log of `"upsert <index> <id>"` / `"delete <index> <id>"` entries.
    ops: Arc<Mutex<Vec<String>>>,
}

#[async_trait]
impl Sink for RecordingSink {
    /// Logs the upsert; the document body is ignored.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        let entry = format!("upsert {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Logs the delete.
    async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
        let entry = format!("delete {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Always reports a clean flush and records nothing.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        let report = sinks_core::FlushReport::clean();
        Ok(report)
    }
}
/// Builds an upsert change for `users` row `id`, acked through the `acks` counter.
fn upsert_change(id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
    let delete = false;
    row_change(delete, id, seq, acks)
}
/// Builds a delete change for `users` row `id`, acked through the `acks` counter.
fn delete_change(id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
    let delete = true;
    row_change(delete, id, seq, acks)
}
/// Builds a change on table `users` whose key is the single column `id = Int(id)`.
///
/// `delete` picks between a delete and an upsert event. The change's ack has
/// sequence `seq` and bumps `acks` by one when confirmed.
fn row_change(delete: bool, id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
    let table = TableName::try_new("users").unwrap();
    let id_column = ColumnName::try_new("id").unwrap();
    let key = RowKey(vec![(id_column, GenericValue::Int(id))]);

    let event = if delete {
        ChangeEvent::Delete { table, key }
    } else {
        ChangeEvent::Upsert { table, key }
    };

    let counter = Arc::new(CountingAck(Arc::clone(acks)));
    Change {
        event,
        ack: Ack::new(seq, counter),
    }
}
/// One upsert (id 1) and one delete (id 2) run through a default engine must
/// reach the sink in order and each be confirmed once.
#[tokio::test]
async fn drives_changes_to_the_sink_and_acks_each() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let recorded = Arc::new(Mutex::new(Vec::new()));

    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(vec![
            upsert_change(1, 0, &ack_count),
            delete_change(2, 1, &ack_count),
        ])),
    });
    let sink = Arc::new(RecordingSink {
        ops: Arc::clone(&recorded),
    });

    Engine::new(source, Arc::new(MockDocuments), sink)
        .run()
        .await
        .unwrap();

    let expected = vec!["upsert users 1".to_owned(), "delete users 2".to_owned()];
    assert_eq!(*recorded.lock().unwrap(), expected);
    assert_eq!(ack_count.load(Ordering::SeqCst), 2);
}
/// Sink that logs staged writes and flushes into one ordered list, so tests
/// can check where flushes fall relative to writes.
#[derive(Debug, Default)]
struct FlushLogSink {
    /// Shared log of `"upsert .."`, `"delete .."` and `"flush"` entries.
    ops: Arc<Mutex<Vec<String>>>,
}

#[async_trait]
impl Sink for FlushLogSink {
    /// Logs the upsert; the document body is ignored.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        let entry = format!("upsert {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Logs the delete.
    async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
        let entry = format!("delete {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Logs a `"flush"` marker and reports a clean flush.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        let marker = "flush".to_owned();
        self.ops.lock().unwrap().push(marker);
        Ok(sinks_core::FlushReport::clean())
    }
}
/// Five upserts under a batch policy much larger than the input (256 changes,
/// 10 s) must appear at the sink as five writes followed by exactly one flush,
/// with all five changes confirmed.
#[tokio::test]
async fn batches_changes_into_a_single_flush() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let log = Arc::new(Mutex::new(Vec::new()));

    // Row ids 10..=14 carry sequence numbers 0..=4.
    let mut pending = Vec::new();
    for seq in 0..5u64 {
        pending.push(upsert_change(10 + seq as i64, seq, &ack_count));
    }

    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(pending)),
    });
    let sink = Arc::new(FlushLogSink {
        ops: Arc::clone(&log),
    });
    let policy = BatchPolicy {
        max_changes: 256,
        max_delay: Duration::from_secs(10),
    };

    Engine::new(source, Arc::new(MockDocuments), sink)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    let expected = vec![
        "upsert users 10".to_owned(),
        "upsert users 11".to_owned(),
        "upsert users 12".to_owned(),
        "upsert users 13".to_owned(),
        "upsert users 14".to_owned(),
        "flush".to_owned(),
    ];
    assert_eq!(
        *log.lock().unwrap(),
        expected,
        "all five changes batch into exactly one flush, after every upsert",
    );

    let confirmed = ack_count.load(Ordering::SeqCst);
    assert_eq!(confirmed, 5, "the whole batch is confirmed");
}
/// Document builder that resolves every row to the same document and counts
/// how often that document is built.
#[derive(Debug)]
struct CountingBuilder {
    /// Number of `build` calls so far.
    builds: Arc<AtomicU64>,
}

#[async_trait]
impl DocumentBuilder for CountingBuilder {
    /// Resolves any table and key to the single document `users` / `id = Int(1)`.
    async fn resolve(
        &self,
        _table: &TableName,
        _key: &RowKey,
    ) -> sources_core::Result<Vec<DocumentId>> {
        let index = IndexName::try_new("users").unwrap();
        let id_column = ColumnName::try_new("id").unwrap();
        let key = RowKey(vec![(id_column, GenericValue::Int(1))]);
        Ok(vec![DocumentId { index, key }])
    }

    /// Counts the call, then returns an upsert for `id` with a default map body.
    async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
        self.builds.fetch_add(1, Ordering::SeqCst);
        let document = Document::Upsert {
            id: id.clone(),
            body: GenericValue::Map(Default::default()),
        };
        Ok(document)
    }
}
/// Three changes that all resolve to the same document (via `CountingBuilder`)
/// must produce one build, one upsert and one flush, while all three changes
/// are still confirmed.
#[tokio::test]
async fn builds_a_repeatedly_touched_document_once_per_batch() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let build_count = Arc::new(AtomicU64::new(0));
    let log = Arc::new(Mutex::new(Vec::new()));

    // Three distinct rows (ids 100..=102); the builder collapses them to one document.
    let mut pending = Vec::new();
    for seq in 0..3u64 {
        pending.push(upsert_change(100 + seq as i64, seq, &ack_count));
    }

    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(pending)),
    });
    let documents = Arc::new(CountingBuilder {
        builds: Arc::clone(&build_count),
    });
    let sink = Arc::new(FlushLogSink {
        ops: Arc::clone(&log),
    });
    let policy = BatchPolicy {
        max_changes: 256,
        max_delay: Duration::from_secs(10),
    };

    Engine::new(source, documents, sink)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    assert_eq!(
        build_count.load(Ordering::SeqCst),
        1,
        "the document is assembled once despite three changes touching it"
    );

    let expected = vec!["upsert users 1".to_owned(), "flush".to_owned()];
    assert_eq!(*log.lock().unwrap(), expected, "one upsert, one flush");

    assert_eq!(ack_count.load(Ordering::SeqCst), 3);
}
/// Sink that discards every write and only counts flushes.
#[derive(Debug)]
struct FlushCountSink {
    /// Number of flushes performed so far.
    flushes: Arc<AtomicU64>,
}

#[async_trait]
impl Sink for FlushCountSink {
    /// Accepts and discards the upsert.
    async fn upsert(
        &self,
        _index: &IndexName,
        _id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        Ok(())
    }

    /// Accepts and discards the delete.
    async fn delete(&self, _index: &IndexName, _id: &str) -> sinks_core::Result<()> {
        Ok(())
    }

    /// Increments the flush counter, then reports a clean flush.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        self.flushes.fetch_add(1, Ordering::SeqCst);
        let report = sinks_core::FlushReport::clean();
        Ok(report)
    }
}
/// Ack sink that records, for each confirmed sequence, how many flushes had
/// been counted when the confirmation arrived.
#[derive(Debug)]
struct OrderingAck {
    /// Flush counter shared with the sink under test.
    flushes: Arc<AtomicU64>,
    /// Map from sequence number to the flush count seen at confirmation time.
    observed: Arc<Mutex<BTreeMap<u64, u64>>>,
}

impl AckSink for OrderingAck {
    /// Reads the current flush count and stores it under `seq`.
    fn confirm(&self, seq: u64) {
        let Self { flushes, observed } = self;
        let completed = flushes.load(Ordering::SeqCst);
        let mut seen = observed.lock().unwrap();
        seen.insert(seq, completed);
    }
}
/// Builds an upsert on `users` row `id = seq + 100` whose ack is an
/// `OrderingAck` wired to the given flush counter and observation map.
fn ordering_change(
    seq: u64,
    flushes: &Arc<AtomicU64>,
    observed: &Arc<Mutex<BTreeMap<u64, u64>>>,
) -> Change {
    let table = TableName::try_new("users").unwrap();
    let id_column = ColumnName::try_new("id").unwrap();
    let key = RowKey(vec![(id_column, GenericValue::Int(seq as i64 + 100))]);

    let recorder = Arc::new(OrderingAck {
        flushes: Arc::clone(flushes),
        observed: Arc::clone(observed),
    });

    Change {
        event: ChangeEvent::Upsert { table, key },
        ack: Ack::new(seq, recorder),
    }
}
/// With `max_changes = 2`, four changes must be flushed in two batches, and
/// each change's confirmation must see the flush count of its own batch
/// (1 for seq 0–1, 2 for seq 2–3).
#[tokio::test]
async fn confirms_no_ack_before_its_flush() {
    let flush_count = Arc::new(AtomicU64::new(0));
    let seen_at = Arc::new(Mutex::new(BTreeMap::new()));

    let mut pending = Vec::new();
    for seq in 0..4 {
        pending.push(ordering_change(seq, &flush_count, &seen_at));
    }

    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(pending)),
    });
    let sink = Arc::new(FlushCountSink {
        flushes: Arc::clone(&flush_count),
    });
    let policy = BatchPolicy {
        max_changes: 2,
        max_delay: Duration::from_secs(10),
    };

    Engine::new(source, Arc::new(MockDocuments), sink)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    let total_flushes = flush_count.load(Ordering::SeqCst);
    assert_eq!(total_flushes, 2, "four changes → two flushes of two");

    let seen = seen_at.lock().unwrap();
    assert_eq!(seen.get(&0), Some(&1), "seq 0 confirmed after flush 1");
    assert_eq!(seen.get(&1), Some(&1), "seq 1 confirmed after flush 1");
    assert_eq!(seen.get(&2), Some(&2), "seq 2 confirmed after flush 2");
    assert_eq!(seen.get(&3), Some(&2), "seq 3 confirmed after flush 2");
}
/// Change source whose rows are served only through `snapshot`; its live
/// stream is always empty. It records whether and for which tables a snapshot
/// was requested.
#[derive(Debug)]
struct SeedSource {
    /// Rows to serve from the snapshot; `None` once taken.
    rows: Mutex<Option<Vec<Change>>>,
    /// Set to `true` when `snapshot` is called.
    called: Arc<AtomicBool>,
    /// Tables passed to the most recent `snapshot` call.
    tables: Arc<Mutex<Vec<SnapshotTable>>>,
}

impl SeedSource {
    /// Creates a source holding `rows`, with the probes in their initial
    /// state (`called` false, `tables` empty).
    fn new(rows: Vec<Change>) -> Self {
        let called = Arc::new(AtomicBool::new(false));
        let tables = Arc::new(Mutex::new(Vec::new()));
        let rows = Mutex::new(Some(rows));
        Self {
            rows,
            called,
            tables,
        }
    }
}

#[async_trait]
impl ChangeCapture for SeedSource {
    /// Returns an empty live stream.
    async fn live(&self) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
        let nothing = stream::empty();
        Ok(Box::pin(nothing))
    }

    /// Marks the snapshot as requested, stores the requested tables, and
    /// streams the stored rows in order, each wrapped in `Ok`.
    ///
    /// The rows are taken on the first call; later calls stream nothing.
    async fn snapshot(
        &self,
        tables: &[SnapshotTable],
    ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
        self.called.store(true, Ordering::SeqCst);

        let requested = tables.to_vec();
        *self.tables.lock().unwrap() = requested;

        let taken = self.rows.lock().unwrap().take();
        let events = taken
            .unwrap_or_default()
            .into_iter()
            .map(Ok::<Change, sources_core::SourceError>);
        Ok(Box::pin(stream::iter(events)))
    }
}
/// Sink with a fixed answer to `is_seeded` that records both the writes it
/// receives and the indexes it is asked to mark as seeded.
#[derive(Debug)]
struct SeedSink {
    /// Answer returned by `is_seeded` for every index.
    seeded: bool,
    /// Names of indexes passed to `mark_seeded`, in call order.
    marked: Arc<Mutex<Vec<String>>>,
    /// Log of `"upsert <index> <id>"` / `"delete <index> <id>"` entries.
    ops: Arc<Mutex<Vec<String>>>,
}

#[async_trait]
impl Sink for SeedSink {
    /// Logs the upsert; the document body is ignored.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        let entry = format!("upsert {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Logs the delete.
    async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
        let entry = format!("delete {} {id}", index.as_ref());
        self.ops.lock().unwrap().push(entry);
        Ok(())
    }

    /// Reports a clean flush without recording anything.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        let report = sinks_core::FlushReport::clean();
        Ok(report)
    }

    /// Returns the configured `seeded` flag regardless of the index.
    async fn is_seeded(&self, _index: &IndexName) -> sinks_core::Result<bool> {
        let answer = self.seeded;
        Ok(answer)
    }

    /// Records the index name as marked.
    async fn mark_seeded(&self, index: &IndexName) -> sinks_core::Result<()> {
        let name = index.as_ref().to_owned();
        self.marked.lock().unwrap().push(name);
        Ok(())
    }
}
/// When the sink reports the index as unseeded, the engine must request a
/// snapshot of exactly the `users` table, write both snapshot rows, and mark
/// `users` as seeded.
#[tokio::test]
async fn seeds_an_unseeded_index_then_marks_it() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let seed_rows = vec![
        upsert_change(1, 0, &ack_count),
        upsert_change(3, 1, &ack_count),
    ];
    let source = SeedSource::new(seed_rows);

    // Grab the probes before the source is moved into the engine.
    let snapshot_requested = Arc::clone(&source.called);
    let requested_tables = Arc::clone(&source.tables);

    let written = Arc::new(Mutex::new(Vec::new()));
    let marked = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::new(SeedSink {
        seeded: false,
        marked: Arc::clone(&marked),
        ops: Arc::clone(&written),
    });

    Engine::new(Arc::new(source), Arc::new(MockDocuments), sink)
        .run()
        .await
        .unwrap();

    assert!(
        snapshot_requested.load(Ordering::SeqCst),
        "snapshot should be requested"
    );

    let requested_tables = requested_tables.lock().unwrap();
    assert_eq!(requested_tables.len(), 1);
    assert_eq!(requested_tables.first().unwrap().table.as_ref(), "users");

    let expected_writes = vec!["upsert users 1".to_owned(), "upsert users 3".to_owned()];
    assert_eq!(*written.lock().unwrap(), expected_writes);

    let expected_marks = vec!["users".to_owned()];
    assert_eq!(*marked.lock().unwrap(), expected_marks);
}
/// When the sink reports the index as already seeded, no snapshot may be
/// requested, nothing may be written, and nothing may be marked.
#[tokio::test]
async fn skips_backfill_when_the_sink_reports_seeded() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let source = SeedSource::new(vec![upsert_change(1, 0, &ack_count)]);
    let snapshot_requested = Arc::clone(&source.called);

    let written = Arc::new(Mutex::new(Vec::new()));
    let marked = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::new(SeedSink {
        seeded: true,
        marked: Arc::clone(&marked),
        ops: Arc::clone(&written),
    });

    Engine::new(Arc::new(source), Arc::new(MockDocuments), sink)
        .run()
        .await
        .unwrap();

    let was_requested = snapshot_requested.load(Ordering::SeqCst);
    assert!(!was_requested, "a seeded index is not snapshotted");
    assert!(written.lock().unwrap().is_empty());
    assert!(marked.lock().unwrap().is_empty());
}
/// Observer that stores the engine callbacks it receives in atomics.
#[derive(Debug, Default)]
struct RecordingObserver {
    /// Count from the most recent `on_indexes_ensured` call.
    indexes_ensured: AtomicU64,
    /// Number of `on_change_captured` calls.
    captured: AtomicU64,
    /// Sum of `changes` over all committed batches.
    committed_changes: AtomicU64,
    /// Sum of `documents` over all committed batches.
    committed_documents: AtomicU64,
    /// Number of `on_batch_committed` calls.
    batches: AtomicU64,
    /// Whether `on_live_started` has been called.
    live: AtomicBool,
}

impl Observer for RecordingObserver {
    /// Overwrites the stored index count with `count`.
    fn on_indexes_ensured(&self, count: usize) {
        let count = count as u64;
        self.indexes_ensured.store(count, Ordering::SeqCst);
    }

    /// Flags that the live phase was reported.
    fn on_live_started(&self) {
        self.live.store(true, Ordering::SeqCst);
    }

    /// Counts one captured change.
    fn on_change_captured(&self) {
        self.captured.fetch_add(1, Ordering::SeqCst);
    }

    /// Adds the batch's change and document counts to the totals and counts the batch.
    fn on_batch_committed(&self, stats: BatchStats) {
        let changes = stats.changes as u64;
        let documents = stats.documents as u64;
        self.committed_changes.fetch_add(changes, Ordering::SeqCst);
        self.committed_documents
            .fetch_add(documents, Ordering::SeqCst);
        self.batches.fetch_add(1, Ordering::SeqCst);
    }
}
/// Five upserts in one batch must be reported to the observer as: live phase
/// started, five changes captured, and one committed batch of five changes
/// and five documents.
#[tokio::test]
async fn reports_lifecycle_and_progress_to_the_observer() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let observer = Arc::new(RecordingObserver::default());

    let mut pending = Vec::new();
    for seq in 0..5u64 {
        pending.push(upsert_change(10 + seq as i64, seq, &ack_count));
    }

    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(pending)),
    });
    let sink = Arc::new(RecordingSink::default());
    // Keep the concrete handle for the assertions; the engine gets a trait object.
    let engine_observer = Arc::clone(&observer) as Arc<dyn Observer>;
    let policy = BatchPolicy {
        max_changes: 256,
        max_delay: Duration::from_secs(10),
    };

    Engine::new(source, Arc::new(MockDocuments), sink)
        .with_observer(engine_observer)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    let read = |counter: &AtomicU64| counter.load(Ordering::SeqCst);
    assert!(observer.live.load(Ordering::SeqCst), "live phase reported");
    assert_eq!(read(&observer.captured), 5, "all captured");
    assert_eq!(read(&observer.committed_changes), 5);
    assert_eq!(read(&observer.committed_documents), 5);
    assert_eq!(read(&observer.batches), 1, "one batch");
}
/// With `skip_backfill(true)`, an index the sink reports as unseeded must
/// still not be snapshotted: no snapshot request, no writes, no seeded mark.
#[tokio::test]
async fn skip_backfill_flag_overrides_an_unseeded_index() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let source = SeedSource::new(vec![upsert_change(1, 0, &ack_count)]);
    let snapshot_requested = Arc::clone(&source.called);

    let written = Arc::new(Mutex::new(Vec::new()));
    let marked = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::new(SeedSink {
        seeded: false,
        marked: Arc::clone(&marked),
        ops: Arc::clone(&written),
    });

    Engine::new(Arc::new(source), Arc::new(MockDocuments), sink)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    let was_requested = snapshot_requested.load(Ordering::SeqCst);
    assert!(!was_requested, "skip_backfill suppresses the snapshot");
    assert!(written.lock().unwrap().is_empty());
    assert!(marked.lock().unwrap().is_empty());
}
/// A write staged in `CrashSink` that has not been flushed yet.
#[derive(Debug)]
enum CrashOp {
    /// Insert or replace the body stored under the key.
    Upsert(String, GenericValue),
    /// Remove the entry stored under the key.
    Delete(String),
}

/// Sink that stages writes in memory and applies them to a shared store only
/// when a flush succeeds. It can be armed to fail one flush.
#[derive(Debug)]
struct CrashSink {
    /// Store that receives staged writes on a successful flush; shared so it
    /// can outlive one sink instance.
    store: Arc<Mutex<BTreeMap<String, GenericValue>>>,
    /// Writes accepted since the last successful flush.
    staging: Mutex<Vec<CrashOp>>,
    /// When `true`, the next flush fails and clears this flag.
    fail_next_flush: AtomicBool,
}

impl CrashSink {
    /// Creates a sink over `store` with nothing staged; `fail_first_flush`
    /// arms the one-shot flush failure.
    fn new(store: Arc<Mutex<BTreeMap<String, GenericValue>>>, fail_first_flush: bool) -> Self {
        let staging = Mutex::new(Vec::new());
        let fail_next_flush = AtomicBool::new(fail_first_flush);
        Self {
            store,
            staging,
            fail_next_flush,
        }
    }
}
/// Formats the store key for a document as `<index>/<id>`.
fn doc_key(index: &IndexName, id: &str) -> String {
    let key = format!("{}/{id}", index.as_ref());
    key
}
#[async_trait]
impl Sink for CrashSink {
    /// Stages an upsert of `document` under the document's store key.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        document: &GenericValue,
    ) -> sinks_core::Result<()> {
        let op = CrashOp::Upsert(doc_key(index, id), document.clone());
        self.staging.lock().unwrap().push(op);
        Ok(())
    }

    /// Stages a delete of the document's store key.
    async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
        let op = CrashOp::Delete(doc_key(index, id));
        self.staging.lock().unwrap().push(op);
        Ok(())
    }

    /// Applies all staged writes to the store, in staging order.
    ///
    /// If the failure flag is armed, the flush instead returns a
    /// `SinkError::Write` without touching the store or the staged writes.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        // `swap` disarms the flag, so only one flush fails per arming.
        let crash_now = self.fail_next_flush.swap(false, Ordering::SeqCst);
        if crash_now {
            let reason = "simulated crash before flush completed".to_owned();
            return Err(sinks_core::SinkError::Write(reason));
        }

        let mut durable = self.store.lock().unwrap();
        let mut staged = self.staging.lock().unwrap();
        for op in staged.drain(..) {
            match op {
                CrashOp::Upsert(key, body) => {
                    durable.insert(key, body);
                }
                CrashOp::Delete(key) => {
                    durable.remove(&key);
                }
            }
        }
        Ok(sinks_core::FlushReport::clean())
    }
}
/// Simulates a crash between staging and flushing, followed by a replay.
///
/// Run 1 uses a sink whose first flush fails: the run must error, the store
/// must stay empty, and no change may be confirmed. Run 2 replays the same two
/// changes against a healthy sink over the same store: both documents must end
/// up stored under their keys and both changes must be confirmed.
#[tokio::test]
async fn redelivers_and_reapplies_idempotently_after_a_crash_before_flush() {
    let durable = Arc::new(Mutex::new(BTreeMap::<String, GenericValue>::new()));
    let ack_count = Arc::new(AtomicU64::new(0));
    let policy = BatchPolicy {
        max_changes: 256,
        max_delay: Duration::from_secs(10),
    };
    // Each run gets a fresh source delivering the same two changes.
    let fresh_source = || {
        Arc::new(MockSource {
            changes: Mutex::new(Some(vec![
                upsert_change(1, 0, &ack_count),
                upsert_change(3, 1, &ack_count),
            ])),
        })
    };

    // Run 1: the flush fails.
    let crashing_sink = Arc::new(CrashSink::new(Arc::clone(&durable), true));
    let first_run = Engine::new(fresh_source(), Arc::new(MockDocuments), crashing_sink)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await;
    assert!(first_run.is_err(), "the crashing flush stops the run");
    assert!(
        durable.lock().unwrap().is_empty(),
        "a crash before the flush completes leaves nothing durable"
    );
    assert_eq!(
        ack_count.load(Ordering::SeqCst),
        0,
        "no change is confirmed when the flush that would persist it never completed"
    );

    // Run 2: same changes, healthy sink, same store.
    let healthy_sink = Arc::new(CrashSink::new(Arc::clone(&durable), false));
    Engine::new(fresh_source(), Arc::new(MockDocuments), healthy_sink)
        .with_batch(policy)
        .skip_backfill(true)
        .run()
        .await
        .unwrap();

    let durable = durable.lock().unwrap();
    let stored_keys = durable.keys().cloned().collect::<Vec<_>>();
    assert_eq!(
        stored_keys,
        vec!["users/1".to_owned(), "users/3".to_owned()],
        "both documents are durable exactly once after replay — no loss, no duplicate"
    );
    assert_eq!(
        ack_count.load(Ordering::SeqCst),
        2,
        "every redelivered change is confirmed once its flush completes"
    );
}
/// Sink that discards writes and records the `caught_up` flag of every flush.
#[derive(Debug, Default)]
struct CaughtUpSink {
    /// `caught_up` values received by `flush`, in call order.
    flushes: Arc<Mutex<Vec<bool>>>,
}

#[async_trait]
impl Sink for CaughtUpSink {
    /// Accepts and discards the upsert.
    async fn upsert(
        &self,
        _index: &IndexName,
        _id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        Ok(())
    }

    /// Accepts and discards the delete.
    async fn delete(&self, _index: &IndexName, _id: &str) -> sinks_core::Result<()> {
        Ok(())
    }

    /// Records `caught_up`, then reports a clean flush.
    async fn flush(&self, caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        let mut history = self.flushes.lock().unwrap();
        history.push(caught_up);
        Ok(sinks_core::FlushReport::clean())
    }
}
/// Drives `work` directly over a queue preloaded with five changes and
/// `max_changes = 2`, and checks the `caught_up` flag of each flush: `false`
/// for the first two batches, `true` for the last.
#[tokio::test]
async fn caught_up_is_false_while_a_backlog_drains_then_true_on_the_last_batch() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let flush_flags = Arc::new(Mutex::new(Vec::new()));

    // Fill the queue up front, then drop the producer so no more changes can arrive.
    let (tx, mut rx) = channel::<Change>(16);
    for seq in 0..5 {
        let change = upsert_change(seq as i64, seq, &ack_count);
        tx.publish(change).await.unwrap();
    }
    drop(tx);

    let documents = MockDocuments;
    let sink = CaughtUpSink {
        flushes: Arc::clone(&flush_flags),
    };
    let observer: Arc<dyn Observer> = Arc::new(NoopObserver);
    let failure_policies = FailurePolicies::default();
    let batch = BatchPolicy {
        max_changes: 2,
        max_delay: Duration::from_secs(10),
    };
    let pipeline = Pipeline {
        documents: &documents,
        sink: &sink,
        observer: &observer,
        queue_capacity: 16,
        batch,
        failure_policies: &failure_policies,
    };

    work(pipeline, &mut rx, None).await.unwrap();

    let recorded = flush_flags.lock().unwrap();
    assert_eq!(
        recorded.as_slice(),
        &[false, false, true],
        "a flush is caught up only once it has drained the queue behind it",
    );
}
/// Sink whose flush succeeds at the batch level but reports every staged
/// document as rejected.
#[derive(Debug, Default)]
struct RejectingSink {
    /// `(index, id)` of every write staged since the last flush.
    staged: Mutex<Vec<(String, String)>>,
}

#[async_trait]
impl Sink for RejectingSink {
    /// Stages the document's `(index, id)`; the body is ignored.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        _document: &GenericValue,
    ) -> sinks_core::Result<()> {
        let entry = (index.as_ref().to_owned(), id.to_owned());
        self.staged.lock().unwrap().push(entry);
        Ok(())
    }

    /// Stages the document's `(index, id)`, exactly like an upsert.
    async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
        let entry = (index.as_ref().to_owned(), id.to_owned());
        self.staged.lock().unwrap().push(entry);
        Ok(())
    }

    /// Drains the staged entries and returns a report listing each one as
    /// rejected with a fixed reason.
    async fn flush(&self, _caught_up: bool) -> sinks_core::Result<sinks_core::FlushReport> {
        let mut staged = self.staged.lock().unwrap();
        let rejected = staged
            .drain(..)
            .map(|(index, id)| sinks_core::RejectedDocument {
                index,
                id,
                reason: "simulated item-level rejection".to_owned(),
            })
            .collect();
        Ok(sinks_core::FlushReport { rejected })
    }
}
/// Observer that collects every quarantined document it is told about.
#[derive(Debug, Default)]
struct QuarantineObserver {
    /// `(index, id)` of each quarantined document, in report order.
    quarantined: Mutex<Vec<(String, String)>>,
}

impl Observer for QuarantineObserver {
    /// Records the document's index and id; the reason is ignored.
    fn on_document_quarantined(&self, index: &str, id: &str, _reason: &str) {
        let entry = (index.to_owned(), id.to_owned());
        let mut log = self.quarantined.lock().unwrap();
        log.push(entry);
    }
}
/// Builds an engine over `changes` that uses `MockDocuments` and a
/// `RejectingSink`, with the given observer and failure policies attached.
fn engine_over(
    changes: Vec<Change>,
    observer: Arc<dyn Observer>,
    policies: FailurePolicies,
) -> Engine {
    let source = Arc::new(MockSource {
        changes: Mutex::new(Some(changes)),
    });
    let documents = Arc::new(MockDocuments);
    let sink = Arc::new(RejectingSink::default());

    Engine::new(source, documents, sink)
        .with_observer(observer)
        .with_failure_policies(policies)
}
/// `resolve` must return the per-index override when one exists and the
/// default policy otherwise.
#[test]
fn failure_policies_resolve_override_then_default() {
    let default_stop = FailurePolicies::new(FailurePolicy::Stop);
    let policies = default_stop.with_override("analytics", FailurePolicy::Skip);

    let overridden = policies.resolve("analytics");
    let fallback = policies.resolve("users");

    assert_eq!(overridden, FailurePolicy::Skip);
    assert_eq!(fallback, FailurePolicy::Stop);
}
/// Under a global `Skip` policy, two rejected documents must both be reported
/// to the observer as quarantined (index `users`), the run must succeed, and
/// both changes must still be confirmed.
#[tokio::test]
async fn skip_policy_quarantines_rejected_documents_and_acks_the_batch() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let observer = Arc::new(QuarantineObserver::default());
    let pending = vec![
        upsert_change(1, 0, &ack_count),
        upsert_change(3, 1, &ack_count),
    ];
    let engine_observer = Arc::clone(&observer) as Arc<dyn Observer>;
    let skip_all = FailurePolicies::new(FailurePolicy::Skip);

    engine_over(pending, engine_observer, skip_all)
        .run()
        .await
        .unwrap();

    let quarantined = observer.quarantined.lock().unwrap();
    assert_eq!(quarantined.len(), 2, "both rejected documents quarantined");
    assert!(quarantined.iter().all(|(index, _)| index == "users"));

    let confirmed = ack_count.load(Ordering::SeqCst);
    assert_eq!(confirmed, 2, "the batch is acked despite rejections");
}
/// Under the default failure policies, one rejected document must fail the
/// run with `EngineError::DocumentsRejected(1, _)` and leave the change
/// unconfirmed.
#[tokio::test]
async fn stop_policy_errors_and_leaves_the_batch_unconfirmed() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let pending = vec![upsert_change(1, 0, &ack_count)];
    let engine = engine_over(pending, Arc::new(NoopObserver), FailurePolicies::default());

    let outcome = engine.run().await;
    let err = outcome.unwrap_err();

    assert!(matches!(err, EngineError::DocumentsRejected(1, _)));
    let confirmed = ack_count.load(Ordering::SeqCst);
    assert_eq!(confirmed, 0, "nothing is acked when the run stops");
}
/// A `Stop` override on index `users` must take precedence over a global
/// `Skip`: the run fails with `EngineError::DocumentsRejected` and nothing is
/// confirmed.
#[tokio::test]
async fn per_index_stop_override_halts_even_when_global_is_skip() {
    let ack_count = Arc::new(AtomicU64::new(0));
    let pending = vec![upsert_change(1, 0, &ack_count)];
    let policies =
        FailurePolicies::new(FailurePolicy::Skip).with_override("users", FailurePolicy::Stop);
    let engine = engine_over(pending, Arc::new(NoopObserver), policies);

    let outcome = engine.run().await;
    let err = outcome.unwrap_err();

    assert!(matches!(err, EngineError::DocumentsRejected(..)));
    assert_eq!(ack_count.load(Ordering::SeqCst), 0);
}