use super::*;
use crate::transport::memory::{MemoryConfig, MemoryTransport};
use crate::transport::{CommitToken, PayloadFormat, RecordMeta};
use crate::worker::engine::BatchProcessingConfig;
use bytes::Bytes;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Builds a [`BatchEngine`] whose processing configuration is entirely
/// `BatchProcessingConfig::default()`.
fn default_engine() -> BatchEngine {
    let config = BatchProcessingConfig::default();
    BatchEngine::new(config)
}
/// Creates an in-memory transport configured with `recv_timeout_ms =
/// timeout_ms`; every other setting comes from `MemoryConfig::default()`.
///
/// Panics if construction fails, which for this config is a test-setup bug.
fn mem_transport(timeout_ms: u64) -> MemoryTransport {
    let config = MemoryConfig {
        recv_timeout_ms: timeout_ms,
        ..Default::default()
    };
    MemoryTransport::new(&config).expect("memory transport with valid config must construct")
}
/// Cancels `shutdown` after `ms` milliseconds from a detached Tokio task.
///
/// Tests call this before starting an engine run so the run has a bounded
/// lifetime.
fn cancel_after(shutdown: CancellationToken, ms: u64) {
    let delay = Duration::from_millis(ms);
    // Fire-and-forget: the JoinHandle is not kept.
    tokio::spawn(async move {
        tokio::time::sleep(delay).await;
        shutdown.cancel();
    });
}
/// Repeats every element of `records` `factor` times.
///
/// The copies of one element stay adjacent, and the elements keep their
/// original order: `[a, b]` with `factor == 2` becomes `[a, a, b, b]`. A
/// `factor` of 0 yields an empty vector.
///
/// Generic over any `Clone` item, so it serves `Record` batches in the tests
/// and can also be checked on plain values.
fn fan_out<T: Clone>(records: Vec<T>, factor: usize) -> Vec<T> {
    let mut out = Vec::with_capacity(records.len() * factor);
    for r in records {
        for _ in 0..factor {
            out.push(r.clone());
        }
    }
    out
}
/// A transform that fans every input record out into two outputs must still
/// commit by *source* token.
///
/// The sink sees 2N records but only N commit tokens. The transport's
/// committed sequence ends at the last source record (`n - 1`), not at
/// anything derived from the 2N output count.
#[tokio::test]
async fn fan_out_commits_source_tokens_not_output_count() {
    let n = 5usize;
    let transport = mem_transport(50);
    for i in 0..n {
        transport
            .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
            .await
            .unwrap();
    }
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    // Bound the run: the engine loop returns once `shutdown` is cancelled.
    cancel_after(shutdown.clone(), 200);
    // Sink-side counters: records received and commit tokens carried.
    let sink_records = Arc::new(AtomicUsize::new(0));
    let sink_tokens = Arc::new(AtomicUsize::new(0));
    let sr = Arc::clone(&sink_records);
    let st = Arc::clone(&sink_tokens);
    engine
        .run_workbatch(
            &transport,
            shutdown,
            // Duplicate every record; the token assertion below expects the
            // commit tokens to stay at one per source record.
            |batch| Ok(batch.map_records(|recs| fan_out(recs, 2))),
            |out: &WorkBatch<_>| {
                let sr = Arc::clone(&sr);
                let st = Arc::clone(&st);
                // Read the counts up front so the async block captures only
                // plain numbers, not a borrow of `out`.
                let records = out.records.len();
                let tokens = out.commit_tokens.len();
                async move {
                    sr.fetch_add(records, Ordering::Relaxed);
                    st.fetch_add(tokens, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            // No ticker. The turbofish only gives the `None` a concrete type.
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        sink_records.load(Ordering::Relaxed),
        2 * n,
        "all 2N records sunk"
    );
    assert_eq!(
        sink_tokens.load(Ordering::Relaxed),
        n,
        "N source tokens carried"
    );
    assert_eq!(
        transport.committed_sequence(),
        (n - 1) as u64,
        "commit advanced to the highest of the N source acks, not the 2N output count"
    );
}
/// Under `CommitMode::Auto`, a failing sink ends the run with that sink error
/// and must not let the engine commit the batch.
#[tokio::test]
async fn sink_error_does_not_commit() {
    // Phase 1: a single record and an always-failing sink. Checks that the run
    // returns the `Sink` error itself.
    // NOTE(review): with only one record (presumably sequence 0), reading
    // `committed_sequence()` here could not tell "committed seq 0" apart from
    // the initial 0. Phase 2 exists for that reason.
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"id":1}"#.to_vec())
        .await
        .unwrap();
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let result = engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(
        matches!(result, Err(EngineError::Sink(_))),
        "sink error is terminal: the run returns the sink error, got {result:?}"
    );
    // Phase 2: a fresh transport with two records. The first is drained here
    // and never committed, so the engine only sees the second one. Committing
    // it would move the committed sequence off its initial 0.
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"a":1}"#.to_vec())
        .await
        .unwrap();
    transport
        .inject(None, br#"{"b":2}"#.to_vec())
        .await
        .unwrap();
    let _ = transport.recv(1).await.unwrap();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let result = engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(result.is_err(), "sink error is terminal");
    assert_eq!(
        transport.committed_sequence(),
        0,
        "sink error must skip commit -- sequence stays at its initial 0"
    );
}
/// Under `CommitMode::Auto`, a sink that returns `Ok` lets the engine commit.
///
/// After three injected records, the committed sequence is 2, the highest of
/// the (0-based, judging by the other tests) sequences.
#[tokio::test]
async fn auto_commits_after_sink_ok() {
    let transport = mem_transport(50);
    for i in 0..3u64 {
        transport
            .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
            .await
            .unwrap();
    }
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            // No ticker. The turbofish only gives the `None` a concrete type.
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(transport.committed_sequence(), 2);
}
/// With `CommitMode::SinkManaged`, the engine itself must not commit, even
/// when the sink succeeds.
#[tokio::test]
async fn sink_managed_does_not_commit_in_engine() {
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"a":1}"#.to_vec())
        .await
        .unwrap();
    transport
        .inject(None, br#"{"b":2}"#.to_vec())
        .await
        .unwrap();
    // Drain the first record without committing it. The engine then sees only
    // the second record, so an engine-side commit would move the sequence
    // away from its initial 0 and fail the assertion below.
    let _ = transport.recv(1).await.unwrap();
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::SinkManaged,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        transport.committed_sequence(),
        0,
        "SinkManaged: engine must not commit -- sequence stays at initial 0"
    );
}
/// The optional ticker closure keeps firing while the engine idles on an
/// empty transport. Cancelling `shutdown` then ends the run with `Ok`.
#[tokio::test]
async fn ticker_fires_and_shutdown_stops_loop() {
    // Nothing is injected: the ticker is the only source of activity.
    let transport = mem_transport(50);
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    // A 350 ms run at a 100 ms period leaves room for several ticks. The
    // assertion only requires two, presumably to tolerate scheduler jitter.
    cancel_after(shutdown.clone(), 350);
    let ticks = Arc::new(AtomicU64::new(0));
    let tc = Arc::clone(&ticks);
    let result = engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            // Ticker: (period, closure producing the future run on each tick).
            Some((Duration::from_millis(100), move || {
                let tc = Arc::clone(&tc);
                async move {
                    tc.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
            })),
        )
        .await;
    assert!(result.is_ok(), "shutdown stops the loop cleanly");
    assert!(
        ticks.load(Ordering::Relaxed) >= 2,
        "ticker fired at least twice over 350ms at 100ms interval"
    );
}
/// A plain `run_workbatch` transform can parse payloads on demand and read a
/// field from the result.
///
/// Each payload goes through `codec::parse` with its record's declared
/// format. The transform remembers the `_table` value it sees.
#[tokio::test]
async fn on_demand_transform_reads_field_via_codec_parse() {
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"_table":"events","id":1}"#.to_vec())
        .await
        .unwrap();
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    // Last `_table` value observed by the transform.
    let seen_table = Arc::new(std::sync::Mutex::new(String::new()));
    let st = Arc::clone(&seen_table);
    engine
        .run_workbatch(
            &transport,
            shutdown,
            move |batch| {
                let st = Arc::clone(&st);
                Ok(batch.map_records(move |recs| {
                    // `inspect` only observes; the records pass through unchanged.
                    recs.into_iter()
                        .inspect(|r| {
                            let parsed =
                                codec::parse(&r.payload, r.metadata.format).expect("valid json");
                            if let Some(t) = parsed.field_str("_table") {
                                *st.lock().unwrap() = t.to_string();
                            }
                        })
                        .collect()
                }))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(*seen_table.lock().unwrap(), "events");
}
/// `run_workbatch_parsed` hands the closure a `ParsedBatch` in which every
/// record already has a parsed counterpart (`records.len() ==
/// parsed.len()`). The closure routes on `_table` through an interned field
/// key, and all four acks are committed.
///
/// NOTE(review): despite the name, interner deduplication itself is not
/// asserted. `intern("_table")` is called once per closure invocation and only
/// the routing count is checked.
#[tokio::test]
async fn parsed_path_pre_parses_and_interner_dedups() {
    let transport = mem_transport(50);
    for i in 0..4 {
        transport
            .inject(
                None,
                format!(r#"{{"_table":"events","id":{i}}}"#).into_bytes(),
            )
            .await
            .unwrap();
    }
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    // Number of records whose `_table` field read back as "events".
    let tables = Arc::new(AtomicUsize::new(0));
    let tc = Arc::clone(&tables);
    engine
        .run_workbatch_parsed(
            &transport,
            shutdown,
            move |pb: ParsedBatch<'_, _>| {
                assert_eq!(pb.records.len(), pb.parsed.len());
                let field = pb.intern("_table");
                let mut hits = 0;
                for parsed in &pb.parsed {
                    if parsed.field_str(&field) == Some("events") {
                        hits += 1;
                    }
                }
                tc.fetch_add(hits, Ordering::Relaxed);
                // Rebuild a plain WorkBatch, carrying tokens and DLQ entries forward.
                Ok(WorkBatch::new(pb.records, pb.commit_tokens).with_dlq_entries(pb.dlq_entries))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        tables.load(Ordering::Relaxed),
        4,
        "all 4 records routed on _table"
    );
    assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
}
/// On the parsed path, a payload that fails to parse reaches the closure as
/// a DLQ entry, not as a record.
///
/// That entry is delivered to the configured `FilterDlqPolicy::Route`
/// callback, and the committed sequence still reaches the last of the three
/// records.
#[tokio::test]
async fn parsed_path_routes_parse_failures_to_dlq() {
    use crate::worker::engine::FilterDlqPolicy;
    // Three records; the middle one is not valid JSON.
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"id":1}"#.to_vec())
        .await
        .unwrap();
    transport
        .inject(None, b"not json {{{".to_vec())
        .await
        .unwrap();
    transport
        .inject(None, br#"{"id":3}"#.to_vec())
        .await
        .unwrap();
    // Counts entries delivered to the DLQ route callback.
    let routed = Arc::new(AtomicUsize::new(0));
    let rc = Arc::clone(&routed);
    let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
        move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
            rc.fetch_add(entries.len(), Ordering::Relaxed);
            Ok(())
        },
    )));
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    // What the process closure saw: DLQ entries vs. cleanly parsed records.
    let dlq_seen = Arc::new(AtomicUsize::new(0));
    let kept = Arc::new(AtomicUsize::new(0));
    let ds = Arc::clone(&dlq_seen);
    let kp = Arc::clone(&kept);
    engine
        .run_workbatch_parsed(
            &transport,
            shutdown,
            move |pb: ParsedBatch<'_, _>| {
                ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
                kp.fetch_add(pb.records.len(), Ordering::Relaxed);
                // Forward the DLQ entries so the engine can route them.
                Ok(WorkBatch::new(pb.records, pb.commit_tokens).with_dlq_entries(pb.dlq_entries))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(kept.load(Ordering::Relaxed), 2, "2 records parsed cleanly");
    assert_eq!(
        dlq_seen.load(Ordering::Relaxed),
        1,
        "1 parse failure carried to the process closure as a DLQ entry"
    );
    assert_eq!(
        routed.load(Ordering::Relaxed),
        1,
        "the parse-failure DLQ entry reached the DLQ route point before commit"
    );
    // Sequence 2 is the third record: the failed record did not stop the commit.
    assert_eq!(transport.committed_sequence(), 2);
}
/// With a memory guard installed, `lease_ingress_batch` charges the batch's
/// total payload bytes to the guard while the lease is alive. Dropping the
/// lease gives them back.
#[cfg(feature = "memory")]
#[tokio::test]
async fn lease_ingress_batch_accounts_and_releases() {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    let mut engine = default_engine();
    let guard_config = MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    engine.set_memory_guard_for_test(Arc::clone(&guard));

    // Four small JSON records built by hand (no transport involved).
    let mut payloads: Vec<Record> = Vec::with_capacity(4);
    for i in 0..4 {
        payloads.push(Record {
            payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        });
    }
    let batch = WorkBatch::<MemTok>::from_records(payloads);
    let expected = batch.total_payload_bytes() as u64;
    assert_eq!(guard.current_bytes(), 0);

    let lease = engine.lease_ingress_batch(&batch).expect("guard present");
    assert_eq!(guard.current_bytes(), expected, "accounted while held");
    drop(lease);
    assert_eq!(guard.current_bytes(), 0, "released on drop");
}
/// Placeholder commit token for a hand-built `WorkBatch` that the tests
/// never commit (used by the lease-accounting test).
#[cfg(feature = "memory")]
#[derive(Clone, Debug)]
struct MemTok;

#[cfg(feature = "memory")]
impl std::fmt::Display for MemTok {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "memtok")
    }
}

#[cfg(feature = "memory")]
impl CommitToken for MemTok {}
/// Scripted receiver for the ordered-commit tests.
///
/// Each `recv` yields one `{"seq":N}` record for N = 0 .. `total - 1`. Shared
/// counters let a test observe what the engine committed.
struct OrderedReceiver {
    /// Next sequence number to hand out.
    next_seq: Arc<AtomicU64>,
    /// How many records the receiver will ever produce.
    total: u64,
    /// Commit high-water mark. It starts at the `u64::MAX` sentinel, which
    /// the tests treat as "nothing committed".
    committed_hwm: Arc<AtomicU64>,
    /// Number of `commit` invocations.
    commit_calls: Arc<AtomicUsize>,
    /// When set, a commit whose highest token has this sequence fails.
    fail_commit_on_seq: Option<u64>,
}

impl OrderedReceiver {
    /// Fixture producing `total` records, with nothing committed yet and no
    /// injected commit failure.
    fn new(total: u64) -> Self {
        let next_seq = Arc::new(AtomicU64::new(0));
        let committed_hwm = Arc::new(AtomicU64::new(u64::MAX));
        let commit_calls = Arc::new(AtomicUsize::new(0));
        Self {
            next_seq,
            total,
            committed_hwm,
            commit_calls,
            fail_commit_on_seq: None,
        }
    }
}
impl crate::transport::TransportBase for OrderedReceiver {
    /// The fixture holds no resources, so closing succeeds immediately.
    fn close(
        &self,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send {
        core::future::ready(Ok(()))
    }

    /// Always reports healthy.
    fn is_healthy(&self) -> bool {
        true
    }

    /// Static label identifying this fixture.
    fn name(&self) -> &'static str {
        "ordered-test"
    }
}
impl TransportReceiver for OrderedReceiver {
    type Token = crate::transport::memory::MemoryToken;

    /// Returns a one-record batch `{"seq":N}` carrying `MemoryToken { seq: N }`
    /// for N = 0, 1, ..., `total - 1`.
    ///
    /// Once the sequence is exhausted, the returned future never resolves, so
    /// the engine can only leave its loop through shutdown.
    fn recv(
        &self,
        _max: usize,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<WorkBatch<Self::Token>>>
           + Send {
        let next_seq = Arc::clone(&self.next_seq);
        let total = self.total;
        async move {
            let seq = next_seq.fetch_add(1, Ordering::Relaxed);
            if seq >= total {
                // Give the reserved number back so the counter stays at
                // `total`, then park forever.
                next_seq.fetch_sub(1, Ordering::Relaxed);
                std::future::pending::<()>().await;
            }
            let record = Record {
                payload: Bytes::from(format!(r#"{{"seq":{seq}}}"#)),
                key: None,
                headers: vec![],
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            };
            Ok(WorkBatch::new(
                vec![record],
                vec![crate::transport::memory::MemoryToken { seq }],
            ))
        }
    }

    /// Counts the call, then handles the highest token sequence in `tokens`.
    ///
    /// An empty token slice is a successful no-op. If the highest sequence
    /// equals `fail_commit_on_seq`, the commit fails with
    /// `TransportError::Commit`. Otherwise `committed_hwm` is raised to that
    /// sequence.
    async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
        self.commit_calls.fetch_add(1, Ordering::Relaxed);
        let Some(max_seq) = tokens.iter().map(|t| t.seq).max() else {
            return Ok(());
        };
        if self.fail_commit_on_seq == Some(max_seq) {
            return Err(crate::transport::TransportError::Commit(format!(
                "broker commit failed for seq {max_seq}"
            )));
        }
        // `committed_hwm` starts at the `u64::MAX` "nothing committed"
        // sentinel. A plain `fetch_max` would keep it at `u64::MAX` forever,
        // which would make every "watermark unmoved" assertion vacuous.
        // Replace the sentinel on the first successful commit, then keep the
        // maximum.
        self.committed_hwm
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(if current == u64::MAX {
                    max_seq
                } else {
                    current.max(max_seq)
                })
            })
            .expect("watermark update closure always returns Some");
        Ok(())
    }
}
/// Ordered-commit barrier, sink side.
///
/// When the sink fails for the block carrying token 0, no commit may fire at
/// all, so no later token can be committed past the failed offset. The run
/// must end with an error after exactly one sink call.
#[tokio::test]
async fn sink_error_blocks_later_ordered_commits() {
    // Serves {"seq":0}, {"seq":1}, {"seq":2}, one record per recv.
    let receiver = OrderedReceiver::new(3);
    let committed = Arc::clone(&receiver.committed_hwm);
    let commit_calls = Arc::clone(&receiver.commit_calls);
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 500);
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sc = Arc::clone(&sink_calls);
    let result = engine
        .run_workbatch(
            &receiver,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                // Decide before the async block so it does not borrow `out`.
                let carries_zero = out.commit_tokens.iter().any(|t| t.seq == 0);
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    if carries_zero {
                        Err(EngineError::Sink("boom on token 0".into()))
                    } else {
                        Ok(())
                    }
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    // The watermark must still hold the receiver's initial `u64::MAX` sentinel.
    assert_eq!(
        committed.load(Ordering::Relaxed),
        u64::MAX,
        "sink error on token 0 must leave the committed watermark unmoved -- \
         a later token must NOT be committed past the failed offset"
    );
    assert_eq!(
        commit_calls.load(Ordering::Relaxed),
        0,
        "no commit may fire while token 0's block is unsent"
    );
    assert!(
        result.is_err(),
        "sink failure under Auto must be a terminal engine error (ack barrier), \
         not a logged continue that drains later blocks"
    );
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        1,
        "loop must stop at the failed block -- token 1 must not be fetched+sunk"
    );
}
/// Ordered-commit barrier, commit side.
///
/// When committing token 0 fails at the transport, the run must end with an
/// error before the next record is processed. The watermark must stay at its
/// initial sentinel.
#[tokio::test]
async fn commit_error_blocks_later_ordered_commits() {
    let mut receiver = OrderedReceiver::new(3);
    // Any commit whose highest token is seq 0 is rejected by the receiver.
    receiver.fail_commit_on_seq = Some(0);
    let committed = Arc::clone(&receiver.committed_hwm);
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 500);
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sc = Arc::clone(&sink_calls);
    let result = engine
        .run_workbatch(
            &receiver,
            shutdown,
            |batch| Ok(batch),
            move |_out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert_eq!(
        committed.load(Ordering::Relaxed),
        u64::MAX,
        "failed commit must not leave a later commit to advance past it"
    );
    assert!(
        result.is_err(),
        "commit failure must be a terminal ack-barrier error"
    );
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        1,
        "loop must stop at the failed commit -- token 1 must not be processed"
    );
}
/// Streaming variant of the sink-side ordered-commit barrier.
///
/// A sink failure on the sub-block holding `{"seq":0}` must stop the run with
/// an error. No commit may fire, and no further sub-block may be sunk.
#[tokio::test]
async fn streaming_sink_error_blocks_later_ordered_commits() {
    let receiver = OrderedReceiver::new(3);
    let committed = Arc::clone(&receiver.committed_hwm);
    let commit_calls = Arc::clone(&receiver.commit_calls);
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 500);
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sc = Arc::clone(&sink_calls);
    let result = engine
        .run_workbatch_streaming(
            &receiver,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                // Recognise token 0's block by its payload bytes.
                let carries_zero = out
                    .records
                    .iter()
                    .any(|r| r.payload.as_ref() == br#"{"seq":0}"#);
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    if carries_zero {
                        Err(EngineError::Sink("boom on token 0 (streaming)".into()))
                    } else {
                        Ok(())
                    }
                }
            },
            CommitMode::Auto,
            // Sub-block byte target.
            64,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert_eq!(
        committed.load(Ordering::Relaxed),
        u64::MAX,
        "streaming sink error on token 0 must not let a later token commit ahead"
    );
    assert_eq!(
        commit_calls.load(Ordering::Relaxed),
        0,
        "no commit may fire while token 0's block is unsent (streaming)"
    );
    assert!(
        result.is_err(),
        "streaming sink failure under Auto must be a terminal ack-barrier error"
    );
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        1,
        "streaming loop must stop at the failed block"
    );
}
/// Ten-byte records against a 25-byte target pack two per sub-block (20
/// bytes; a third would reach 30). The fifth record ends up alone.
#[test]
fn split_groups_by_byte_target() {
    let ten_bytes = Record {
        payload: Bytes::from_static(b"0123456789"),
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    let blocks = BatchEngine::split_into_sub_blocks(vec![ten_bytes; 5], 25);
    let lens: Vec<usize> = blocks.iter().map(|block| block.len()).collect();
    assert_eq!(lens, vec![2, 2, 1], "20<=25 per block, never overshoot 25");
}
/// A record larger than the byte target still gets a sub-block to itself
/// (one-record floor).
///
/// Both payloads here (35 and 5 bytes) exceed the 4-byte target, so each
/// lands in its own block.
#[test]
fn split_floor_one_oversized_record() {
    fn json_record(payload: &'static [u8]) -> Record {
        Record {
            payload: Bytes::from_static(payload),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }
    }
    let records = vec![
        json_record(b"this-payload-is-way-over-the-target"),
        json_record(b"small"),
    ];
    let blocks = BatchEngine::split_into_sub_blocks(records, 4);
    let lens: Vec<usize> = blocks.iter().map(|block| block.len()).collect();
    assert_eq!(lens, vec![1, 1], "oversized record floors to one-per-block");
}
/// An empty input produces no sub-blocks at all, not a single empty block.
#[test]
fn split_empty_yields_no_sub_blocks() {
    let no_records: Vec<Record> = Vec::new();
    let blocks = BatchEngine::split_into_sub_blocks(no_records, 100);
    assert!(blocks.is_empty());
}
/// Three 3-byte records (9 bytes in total) under a 10 000-byte target stay
/// together in one sub-block.
#[test]
fn split_smaller_than_target_is_one_sub_block() {
    let abc = Record {
        payload: Bytes::from_static(b"abc"),
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    let blocks = BatchEngine::split_into_sub_blocks(vec![abc; 3], 10_000);
    assert_eq!(blocks.len(), 1, "whole batch under target -> single sub-block");
    assert_eq!(blocks[0].len(), 3);
}
/// On the streaming path, the memory guard should hold at most one
/// sub-block's worth of leased bytes at a time, never the whole received
/// batch.
#[cfg(feature = "memory")]
#[tokio::test]
async fn streaming_peak_lease_bounded_to_one_sub_block() {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    const RECORD_BYTES: usize = 64;
    const N: usize = 16;
    // 16 x 64 = 1024 payload bytes in total.
    let total: u64 = (RECORD_BYTES * N) as u64;
    let payload = vec![b'x'; RECORD_BYTES];
    let transport = mem_transport(50);
    for _ in 0..N {
        transport.inject(None, payload.clone()).await.unwrap();
    }
    let mut engine = default_engine();
    let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    }));
    engine.set_memory_guard_for_test(Arc::clone(&guard));
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    // Sub-block target is a quarter of the input: 256 bytes = 4 records.
    let sub_block_bytes = total / 4;
    let one_sub_block_bytes = sub_block_bytes;
    // Highest `guard.current_bytes()` sampled from inside the sink.
    let high_water = Arc::new(AtomicU64::new(0));
    let guard_for_sink = Arc::clone(&guard);
    let hw = Arc::clone(&high_water);
    engine
        .run_workbatch_streaming(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |_out: &WorkBatch<_>| {
                let guard = Arc::clone(&guard_for_sink);
                let hw = Arc::clone(&hw);
                async move {
                    let now = guard.current_bytes();
                    hw.fetch_max(now, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            sub_block_bytes,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    let peak = high_water.load(Ordering::Relaxed);
    assert!(
        peak <= one_sub_block_bytes,
        "peak lease {peak} exceeded one sub-block {one_sub_block_bytes} \
         (a whole-batch lease would be {total})"
    );
    assert!(
        peak > 0 && peak < total,
        "peak {peak} must be a partial sub-block, strictly less than the \
         whole batch {total}"
    );
    assert_eq!(guard.current_bytes(), 0, "all leases released after run");
}
/// Wraps a [`MemoryTransport`] and records what the engine does with it.
///
/// `recv` is forwarded unchanged. Each `commit` updates the counters below
/// before forwarding to the inner transport. `sink_calls` is bumped by the
/// test's own sink closure, not by this type.
struct CountingReceiver {
    /// Transport that actually stores and serves the records.
    inner: MemoryTransport,
    /// Sink invocation counter shared with the test's sink closure.
    sink_calls: Arc<AtomicUsize>,
    /// Number of `commit` invocations.
    commit_calls: Arc<AtomicUsize>,
    /// Sum of `tokens.len()` over all `commit` invocations.
    commit_token_count: Arc<AtomicUsize>,
    /// Value of `sink_calls` captured at the most recent `commit`.
    sink_calls_at_commit: Arc<AtomicUsize>,
}
/// Lifecycle and identity are delegated entirely to the wrapped transport.
impl crate::transport::TransportBase for CountingReceiver {
    fn close(
        &self,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send {
        let wrapped = &self.inner;
        wrapped.close()
    }

    fn is_healthy(&self) -> bool {
        let wrapped = &self.inner;
        wrapped.is_healthy()
    }

    fn name(&self) -> &'static str {
        let wrapped = &self.inner;
        wrapped.name()
    }
}
impl TransportReceiver for CountingReceiver {
    type Token = <MemoryTransport as TransportReceiver>::Token;

    /// Passes straight through to the wrapped transport; nothing is counted.
    fn recv(
        &self,
        max: usize,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<WorkBatch<Self::Token>>>
           + Send {
        self.inner.recv(max)
    }

    /// Bumps the call counter, adds this call's token count, and snapshots
    /// the current sink counter. Then forwards the commit to the wrapped
    /// transport and returns its result.
    async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
        let token_total = tokens.len();
        self.commit_calls.fetch_add(1, Ordering::Relaxed);
        self.commit_token_count.fetch_add(token_total, Ordering::Relaxed);
        let sinks_so_far = self.sink_calls.load(Ordering::Relaxed);
        self.sink_calls_at_commit.store(sinks_so_far, Ordering::Relaxed);
        self.inner.commit(tokens).await
    }
}
/// The streaming path sinks a received batch in several sub-blocks but
/// commits it exactly once.
///
/// That single commit carries every source token and fires only after the
/// last sub-block has been sunk.
#[tokio::test]
async fn streaming_commits_once_after_final_sub_block() {
    const N: usize = 12;
    const RECORD_BYTES: usize = 32;
    let payload = vec![b'y'; RECORD_BYTES];
    let inner = mem_transport(50);
    for _ in 0..N {
        inner.inject(None, payload.clone()).await.unwrap();
    }
    let commit_calls = Arc::new(AtomicUsize::new(0));
    let commit_token_count = Arc::new(AtomicUsize::new(0));
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
    let receiver = CountingReceiver {
        inner,
        commit_calls: Arc::clone(&commit_calls),
        commit_token_count: Arc::clone(&commit_token_count),
        sink_calls: Arc::clone(&sink_calls),
        sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
    };
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let sc = Arc::clone(&sink_calls);
    // Three records per sub-block, so 12 records -> 4 sub-blocks.
    // NOTE(review): "commit fires once" assumes one recv returns all 12
    // records; that depends on the default chunk size -- confirm.
    let sub_block_bytes = (RECORD_BYTES * 3) as u64;
    engine
        .run_workbatch_streaming(
            &receiver,
            shutdown,
            |batch| Ok(batch),
            move |_out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            sub_block_bytes,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    let total_sinks = sink_calls.load(Ordering::Relaxed);
    assert!(
        total_sinks >= 4,
        "expected multiple sub-block sinks, got {total_sinks}"
    );
    assert_eq!(commit_calls.load(Ordering::Relaxed), 1, "commit fires once");
    assert_eq!(
        commit_token_count.load(Ordering::Relaxed),
        N,
        "commit carried all N source tokens"
    );
    // The snapshot taken inside `commit` equals the final sink count, so no
    // sink ran after the commit.
    assert_eq!(
        sink_calls_at_commit.load(Ordering::Relaxed),
        total_sinks,
        "commit fired after the last sub-block sink"
    );
}
/// If the sink fails on a middle sub-block, the streaming run must end with
/// that `Sink` error.
///
/// It must not sink the remaining sub-blocks and must not commit anything.
#[tokio::test]
async fn streaming_mid_sub_block_sink_error_skips_commit() {
    const N: usize = 9;
    const RECORD_BYTES: usize = 32;
    let payload = vec![b'z'; RECORD_BYTES];
    let inner = mem_transport(50);
    for _ in 0..N {
        inner.inject(None, payload.clone()).await.unwrap();
    }
    let commit_calls = Arc::new(AtomicUsize::new(0));
    let commit_token_count = Arc::new(AtomicUsize::new(0));
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
    let receiver = CountingReceiver {
        inner,
        commit_calls: Arc::clone(&commit_calls),
        commit_token_count: Arc::clone(&commit_token_count),
        sink_calls: Arc::clone(&sink_calls),
        sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
    };
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let sc = Arc::clone(&sink_calls);
    // Three records per sub-block: 9 records -> 3 sub-blocks.
    let sub_block_bytes = (RECORD_BYTES * 3) as u64;
    let result = engine
        .run_workbatch_streaming(
            &receiver,
            shutdown,
            |batch| Ok(batch),
            move |_out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                async move {
                    // 1-based index of this sink call; the second one fails.
                    let nth = sc.fetch_add(1, Ordering::Relaxed) + 1;
                    if nth == 2 {
                        Err(EngineError::Sink("boom on middle sub-block".into()))
                    } else {
                        Ok(())
                    }
                }
            },
            CommitMode::Auto,
            sub_block_bytes,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(
        matches!(result, Err(EngineError::Sink(_))),
        "mid sub-block sink error is terminal, got {result:?}"
    );
    assert_eq!(
        commit_calls.load(Ordering::Relaxed),
        0,
        "mid sub-block sink error must skip commit"
    );
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        2,
        "stopped after the failing 2nd sub-block (3rd never sunk)"
    );
}
/// A batch well under the sub-block byte target is sunk in one call with all
/// of its records, and all of its acks are committed.
#[tokio::test]
async fn streaming_small_batch_is_single_sub_block() {
    let transport = mem_transport(50);
    for i in 0..3u64 {
        transport
            .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
            .await
            .unwrap();
    }
    let engine = default_engine();
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sink_records = Arc::new(AtomicUsize::new(0));
    let scz = Arc::clone(&sink_calls);
    let srz = Arc::clone(&sink_records);
    engine
        .run_workbatch_streaming(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let scz = Arc::clone(&scz);
                let srz = Arc::clone(&srz);
                let n = out.records.len();
                async move {
                    scz.fetch_add(1, Ordering::Relaxed);
                    srz.fetch_add(n, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            // Sub-block byte target, far above the three tiny payloads.
            10_000,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        1,
        "under-target batch sinks once (single sub-block)"
    );
    assert_eq!(
        sink_records.load(Ordering::Relaxed),
        3,
        "all 3 records sunk"
    );
    assert_eq!(
        transport.committed_sequence(),
        2,
        "all 3 acks committed once"
    );
}
/// The streaming path does not support `CommitMode::SinkManaged`. It must
/// refuse up front with `EngineError::SinkManagedUnsupported`.
#[tokio::test]
async fn streaming_rejects_sink_managed_commit() {
    let transport = mem_transport(50);
    let engine = default_engine();
    // Deliberately never cancelled: the run is expected to return the error
    // right away. If it entered its receive loop instead, presumably nothing
    // here would ever stop it.
    let shutdown = CancellationToken::new();
    let result = engine
        .run_workbatch_streaming(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |_out: &WorkBatch<_>| async move { Ok(()) },
            CommitMode::SinkManaged,
            10_000,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(
        matches!(result, Err(EngineError::SinkManagedUnsupported)),
        "SinkManaged on the streaming path must fail fast, got {result:?}"
    );
}
/// Builds a default engine whose byte budget comes from a self-regulation
/// governor backed by a 1 MiB memory guard.
///
/// The governor is returned too; callers hold it (e.g. as `_gov`) for the
/// rest of the test.
#[cfg(feature = "governor")]
fn governed_engine() -> (BatchEngine, crate::governor::SelfRegulationGovernor) {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    let guard_config = MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    let governor = crate::governor::SelfRegulationConfig::default()
        .build(guard)
        .expect("enabled by default");
    let mut engine = default_engine();
    engine.set_byte_budget(governor.budget());
    (engine, governor)
}
/// With a byte budget wired in, `run_governed` takes the governed path and
/// still delivers every record to the sink and commits every ack.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_on_streams_and_commits_via_memory_transport() {
    let transport = mem_transport(50);
    for i in 0..6u64 {
        transport
            .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
            .await
            .unwrap();
    }
    // `_gov` (not `_`) keeps the governor alive until the end of the test.
    let (engine, _gov) = governed_engine();
    assert!(engine.is_self_regulated(), "budget wired -> governed path");
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let sink_records = Arc::new(AtomicUsize::new(0));
    let sr = Arc::clone(&sink_records);
    engine
        .run_governed(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let sr = Arc::clone(&sr);
                let n = out.records.len();
                async move {
                    sr.fetch_add(n, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        sink_records.load(Ordering::Relaxed),
        6,
        "all records streamed to the sink under the governor"
    );
    assert_eq!(transport.committed_sequence(), 5, "all 6 acks committed");
}
/// Without a byte budget, `run_governed` falls back to whole-batch
/// processing. The four records reach the sink in a single call and all acks
/// are committed.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_off_is_whole_batch_passthrough() {
    let transport = mem_transport(50);
    for i in 0..4u64 {
        transport
            .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
            .await
            .unwrap();
    }
    let engine = default_engine();
    assert!(
        !engine.is_self_regulated(),
        "no budget wired -> whole-batch path"
    );
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sink_records = Arc::new(AtomicUsize::new(0));
    let sc = Arc::clone(&sink_calls);
    let sr = Arc::clone(&sink_records);
    engine
        .run_governed(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                let sr = Arc::clone(&sr);
                let n = out.records.len();
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    sr.fetch_add(n, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        1,
        "OFF path = whole-batch: the block sinks ONCE (not per sub-block)"
    );
    assert_eq!(sink_records.load(Ordering::Relaxed), 4, "all records sunk");
    assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
}
/// The inbound gate and the byte budget come from the same governor.
///
/// Pushing the memory guard above its pressure threshold both holds the gate
/// and shrinks the budget.
#[cfg(feature = "governor")]
#[test]
fn governed_gate_and_budget_share_pressure() {
    use crate::governor::{Admit, InboundGate, NoopActuator};
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
        limit_bytes: 1000,
        pressure_threshold: 0.80,
        ..Default::default()
    }));
    let gov = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled");
    let gate = InboundGate::new(gov.pressure(), Box::new(NoopActuator));
    let budget = gov.budget();
    let start = budget.byte_budget();
    assert_eq!(gate.evaluate(), Admit::Yes, "low pressure admits");
    // 950 of 1000 bytes (95%) is above the 0.80 pressure threshold.
    guard.add_bytes(950);
    assert_eq!(gate.evaluate(), Admit::Hold, "high pressure holds the gate");
    // NOTE(review): the meaning of `observe`'s three arguments is defined by
    // the budget API, which is not visible here. The test only needs one
    // observation taken while memory pressure is high.
    budget.observe(0, Duration::from_millis(1), Duration::from_millis(100));
    assert!(
        budget.byte_budget() < start,
        "high memory shrinks the shared budget (HARD override)"
    );
}
/// Memory pressure gates only the inbound side.
///
/// With pressure pinned high, `send` and `send_batch` on the memory transport
/// still succeed, sending leaves the pressure latch untouched, and every sent
/// record can be received afterwards.
#[cfg(feature = "governor")]
#[tokio::test]
async fn send_unaffected_by_pressure_pinned_high() {
    use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    use crate::transport::TransportSender;
    let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
        limit_bytes: 1000,
        pressure_threshold: 0.80,
        ..Default::default()
    }));
    // 950 of 1000 bytes, never released, keeps pressure pinned high.
    guard.add_bytes(950);
    // NOTE(review): the two hysteresis values are presumably the engage (0.80)
    // and release (0.65) levels -- confirm against `Hysteresis::new`.
    let pressure = Arc::new(UnifiedPressure::new(
        vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>],
        Hysteresis::new(0.80, 0.65).expect("valid band"),
    ));
    assert!(
        pressure.should_hold(),
        "pinned-high governor must hold the INBOUND gate"
    );
    let transport = mem_transport(50);
    let single = transport
        .send("k", Bytes::from_static(br#"{"id":1}"#))
        .await;
    assert!(
        single.is_ok(),
        "single send must succeed under pressure (sink never gated), got {single:?}"
    );
    let records: Vec<Record> = (0..5)
        .map(|i| Record {
            payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
            key: Some(Arc::from(format!("k{i}").as_str())),
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        })
        .collect();
    let batch_res = transport.send_batch(&records).await;
    assert!(
        batch_res.is_ok(),
        "send_batch must succeed under pressure (sink never gated), got {batch_res:?}"
    );
    assert!(
        pressure.should_hold(),
        "send does not touch the pressure latch"
    );
    // Everything sent is readable back from the same memory transport.
    let got = transport.recv(10).await.unwrap().records;
    assert_eq!(got.len(), 6, "1 single + 5 batched records all drained");
}
/// Builds a governed engine over a memory guard of `limit_bytes` with a 0.80
/// pressure threshold, using `max_chunk_size = 16`.
///
/// The same guard feeds the governor and is installed as the engine's memory
/// guard. It is also returned so the test can inspect its byte accounting.
#[cfg(all(feature = "governor", feature = "memory"))]
fn governed_engine_low_limit(
    limit_bytes: u64,
) -> (
    BatchEngine,
    crate::governor::SelfRegulationGovernor,
    Arc<crate::memory::MemoryGuard>,
) {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    let guard_config = MemoryGuardConfig {
        limit_bytes,
        pressure_threshold: 0.80,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    let governor = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled by default");
    let engine_config = BatchProcessingConfig {
        max_chunk_size: 16,
        ..Default::default()
    };
    let mut engine = BatchEngine::new(engine_config);
    engine.set_byte_budget(governor.budget());
    engine.set_memory_guard_for_test(Arc::clone(&guard));
    (engine, governor, guard)
}
/// End-to-end "never OOM" check for the governed pipeline.
///
/// Injects `N` records of `RECORD_BYTES` each into a memory transport. It then
/// drives them through `run_governed` on an engine whose memory guard is
/// limited to `LIMIT`, and cancels the run after ~600 ms. At every call the
/// sink does two things: it folds the guard's `current_bytes()` into a
/// high-water mark, and it records whether the inbound gate evaluated to
/// `Admit::Hold`.
///
/// Afterwards the test asserts:
/// - every record was sunk and every source ack committed;
/// - the gate engaged at least once;
/// - the peak accounted bytes stayed above 0 and below half of the total payload;
/// - the byte budget stayed >= 1;
/// - the guard is back to 0 bytes.
#[cfg(all(feature = "governor", feature = "memory"))]
#[tokio::test]
async fn operational_never_oom_governed_pipeline_bounds_memory() {
    use crate::governor::{Admit, InboundGate, NoopActuator};
    // LIMIT (18 KiB) is far below the total payload N * RECORD_BYTES (256 KiB).
    const LIMIT: u64 = 18 * 1024; const RECORD_BYTES: usize = 1024; const N: usize = 256; let payload = vec![b'q'; RECORD_BYTES];
    let total_payload: u64 = (RECORD_BYTES * N) as u64;
    let transport = mem_transport(50);
    for _ in 0..N {
        transport.inject(None, payload.clone()).await.unwrap();
    }
    let (engine, gov, guard) = governed_engine_low_limit(LIMIT);
    assert!(engine.is_self_regulated(), "budget wired -> governed path");
    // Gate fed by the governor's pressure signal. NoopActuator presumably takes
    // no external action on Hold -- confirm.
    let gate = Arc::new(InboundGate::new(gov.pressure(), Box::new(NoopActuator)));
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 600);
    // Observations collected inside the sink.
    let sink_records = Arc::new(AtomicUsize::new(0));
    let high_water = Arc::new(AtomicU64::new(0));
    let gate_held_ever = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let sr = Arc::clone(&sink_records);
    let hw = Arc::clone(&high_water);
    let geh = Arc::clone(&gate_held_ever);
    let guard_for_sink = Arc::clone(&guard);
    let gate_for_sink = Arc::clone(&gate);
    engine
        .run_governed(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let sr = Arc::clone(&sr);
                let hw = Arc::clone(&hw);
                let geh = Arc::clone(&geh);
                let guard = Arc::clone(&guard_for_sink);
                let gate = Arc::clone(&gate_for_sink);
                let n = out.records.len();
                async move {
                    // Sample accounted bytes while this sink call is running.
                    hw.fetch_max(guard.current_bytes(), Ordering::Relaxed);
                    if gate.evaluate() == Admit::Hold {
                        geh.store(true, Ordering::Relaxed);
                    }
                    sr.fetch_add(n, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        sink_records.load(Ordering::Relaxed),
        N,
        "all {N} records drained through the governed sink"
    );
    // Sequences are 0-based, so the last of N records is N - 1.
    assert_eq!(
        transport.committed_sequence(),
        (N - 1) as u64,
        "all source acks committed (drain never stalled)"
    );
    assert!(
        gate_held_ever.load(Ordering::Relaxed),
        "inbound gate must engage (Admit::Hold) under sustained pressure"
    );
    let peak = high_water.load(Ordering::Relaxed);
    assert!(
        peak > 0,
        "some bytes must be accounted while a sub-block is in flight"
    );
    assert!(
        peak < total_payload / 2,
        "peak in-flight {peak} must stay well under half the whole payload \
         {total_payload} (streaming peak-lease bounds it, never OOM)"
    );
    assert!(
        gov.budget().byte_budget() >= 1,
        "byte budget never collapses below its floor"
    );
    // Leak check: every lease taken during the run must have been released.
    assert_eq!(
        guard.current_bytes(),
        0,
        "all ingress leases released after the run -- no leak"
    );
}
/// In-test receiver that hands out a fixed queue of records and honours an
/// optional per-pull byte cap.
///
/// Each pull folds the payload bytes it handed out into `recv_high_water`, a
/// running maximum. A test can therefore check how many bytes a single receive
/// retained.
struct ByteAwareSource {
    /// Records not yet handed out, in delivery order.
    remaining: std::sync::Mutex<std::collections::VecDeque<Record>>,
    /// Running maximum of payload bytes returned by a single pull.
    recv_high_water: Arc<AtomicU64>,
    /// Highest sequence number seen by `commit` (starts at 0).
    committed: Arc<AtomicU64>,
    /// Sequence number to assign to the next record handed out.
    next_seq: AtomicU64,
}
impl ByteAwareSource {
    /// Creates a source that will deliver `records` in order and report
    /// per-pull byte peaks into `recv_high_water`.
    fn new(records: Vec<Record>, recv_high_water: Arc<AtomicU64>) -> Self {
        Self {
            remaining: std::sync::Mutex::new(records.into_iter().collect()),
            recv_high_water,
            committed: Arc::new(AtomicU64::new(0)),
            next_seq: AtomicU64::new(0),
        }
    }
    /// Pops the next batch off the queue. Returns `None` once the queue is empty.
    ///
    /// Takes up to `max_records` records. When `max_bytes` is `Some(cap)`, it
    /// stops before any record that would push the batch's payload total past
    /// `cap`. The first record is always taken, so one oversized record cannot
    /// wedge the queue. The batch's payload byte count is folded into
    /// `recv_high_water`. Every record gets a sequence number that is never
    /// reused across pulls.
    fn pull(&self, max_records: usize, max_bytes: Option<u64>) -> Option<WorkBatch<MemTok2>> {
        let mut queue = self
            .remaining
            .lock()
            .expect("ByteAwareSource queue mutex poisoned");
        if queue.is_empty() {
            return None;
        }
        let mut records = Vec::new();
        let mut bytes: u64 = 0;
        while records.len() < max_records {
            let Some(front) = queue.front() else { break };
            let record_bytes = front.payload.len() as u64;
            // Always admit the first record; after that, respect the byte cap.
            let over_cap = max_bytes.is_some_and(|cap| {
                !records.is_empty() && bytes.saturating_add(record_bytes) > cap
            });
            if over_cap {
                break;
            }
            bytes = bytes.saturating_add(record_bytes);
            records.push(queue.pop_front().expect("front exists"));
        }
        self.recv_high_water.fetch_max(bytes, Ordering::Relaxed);
        // Reserve a fresh, contiguous range of sequence numbers. Deriving the
        // base from the commit watermark would re-issue the last committed
        // sequence on the next pull. It would also repeat whole ranges when
        // several pulls happen before a commit.
        let n = records.len() as u64;
        let base = self.next_seq.fetch_add(n, Ordering::Relaxed);
        let tokens: Vec<MemTok2> = (base..base + n).map(|seq| MemTok2 { seq }).collect();
        Some(WorkBatch::new(records, tokens))
    }
}
/// Sequence-number commit token produced by `ByteAwareSource::pull`.
///
/// Displays as `memtok2:<seq>`.
#[derive(Debug, Clone, Copy)]
struct MemTok2 {
    /// Sequence number assigned when the token was handed out.
    seq: u64,
}
impl std::fmt::Display for MemTok2 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq } = *self;
        f.write_fmt(format_args!("memtok2:{seq}"))
    }
}
// Marker impl with no methods. Presumably required because `MemTok2` is the
// `Token` type of the `ByteAwareSource` receiver -- confirm the trait bound.
impl crate::transport::CommitToken for MemTok2 {}
/// Minimal `TransportBase` impl for the in-test `ByteAwareSource`.
impl crate::transport::TransportBase for ByteAwareSource {
    /// Nothing to release; resolves immediately with `Ok(())`.
    fn close(
        &self,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send {
        std::future::ready(Ok(()))
    }
    /// Always reports healthy.
    fn is_healthy(&self) -> bool {
        true
    }
    /// Fixed transport name.
    fn name(&self) -> &'static str {
        "byte-aware-source"
    }
}
/// Receiver side of `ByteAwareSource`.
///
/// Both receive methods pull from the queue eagerly, at call time rather than
/// at first poll. Once the queue is exhausted they return a future that never
/// resolves, so the caller is expected to stop via cancellation.
impl TransportReceiver for ByteAwareSource {
    type Token = MemTok2;
    /// Record-bounded receive: up to `max` records, no byte cap.
    fn recv(
        &self,
        max: usize,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<WorkBatch<Self::Token>>>
    + Send {
        let next_batch = self.pull(max, None);
        async move {
            if let Some(batch) = next_batch {
                Ok(batch)
            } else {
                std::future::pending().await
            }
        }
    }
    /// Byte-bounded receive: honours both `limits.max_records` and `limits.max_bytes`.
    fn recv_limited(
        &self,
        limits: crate::transport::RecvLimits,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<WorkBatch<Self::Token>>>
    + Send {
        let byte_cap = Some(limits.max_bytes);
        let next_batch = self.pull(limits.max_records, byte_cap);
        async move {
            if let Some(batch) = next_batch {
                Ok(batch)
            } else {
                std::future::pending().await
            }
        }
    }
    /// Raises the `committed` watermark to the highest sequence in `tokens`;
    /// an empty slice is a no-op.
    async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
        let highest = tokens.iter().map(|token| token.seq).max();
        if let Some(seq) = highest {
            self.committed.fetch_max(seq, Ordering::Relaxed);
        }
        Ok(())
    }
}
/// Checks that the governed receive path is bounded by the byte budget rather
/// than by a record count.
///
/// Feeds `N` records of `RECORD_BYTES` each (`total` bytes) through a
/// `ByteAwareSource`. The byte budget is pinned at `BUDGET`
/// (`start_bytes == max_bytes`). The source remembers the most payload bytes it
/// handed out in a single pull. The test asserts that this peak stayed within
/// the budget plus one record. The slack exists because the source always
/// admits the first record of a pull.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_recv_is_byte_bounded_not_record_bounded() {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    const RECORD_BYTES: usize = 4 * 1024;
    const N: usize = 64;
    const BUDGET: u64 = 16 * 1024;
    let total: u64 = (RECORD_BYTES * N) as u64; let payload = vec![b'b'; RECORD_BYTES];
    let records: Vec<Record> = (0..N)
        .map(|_| Record {
            payload: Bytes::from(payload.clone()),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        })
        .collect();
    // 1 MiB guard limit, well above the 256 KiB total. Presumably chosen so
    // memory pressure does not shrink the budget during the test -- confirm.
    let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    }));
    // Budget fixed at BUDGET (start == max); record_cap set far above N.
    let cfg = crate::governor::ByteBudgetConfig {
        start_bytes: BUDGET,
        max_bytes: BUDGET, floor_records: 1,
        nominal_record_bytes: RECORD_BYTES as u64,
        record_cap: 4096, ..Default::default()
    };
    let pressure = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled")
        .pressure();
    let budget = Arc::new(crate::governor::ByteBudgetController::new(
        cfg,
        Arc::clone(&pressure),
    ));
    let recv_high_water = Arc::new(AtomicU64::new(0));
    let source = ByteAwareSource::new(records, Arc::clone(&recv_high_water));
    // max_chunk_size is far above N, presumably so a record limit never binds
    // and only the byte budget can bound a receive.
    let mut engine = BatchEngine::new(BatchProcessingConfig {
        max_chunk_size: 4096,
        ..Default::default()
    });
    engine.set_byte_budget(budget);
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 250);
    // Identity process step and a no-op sink: only receive sizing is under test.
    engine
        .run_governed(
            &source,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    let peak = recv_high_water.load(Ordering::Relaxed);
    assert!(
        peak <= BUDGET + RECORD_BYTES as u64,
        "governed recv retained {peak} bytes at recv time -- must be bounded \
         by the byte budget {BUDGET} (+ one record {RECORD_BYTES}), not the \
         whole {total}-byte block (record-bounded recv would retain all of it)"
    );
    assert!(
        peak > 0,
        "the source did hand out records (sanity: the loop ran)"
    );
}
#[test]
fn sub_block_drain_yields_incrementally() {
    /// One 10-byte JSON-tagged record with no key, headers or timestamp.
    fn ten_byte_record() -> Record {
        Record {
            payload: Bytes::from_static(b"0123456789"),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        }
    }

    // Six 10-byte records against a 25-byte budget. Two records (20 bytes) fit
    // in a sub-block and a third (30 bytes) would not, so the drain should
    // yield three pairs and then stop.
    let records: Vec<Record> = std::iter::repeat_with(ten_byte_record).take(6).collect();
    let mut drain = SubBlockDrain::new(records, 25);

    let sub_block_a = drain.next_sub_block().expect("first sub-block");
    assert_eq!(sub_block_a.len(), 2, "first sub-block is one budget's worth");

    let sub_block_b = drain.next_sub_block().expect("second sub-block");
    assert_eq!(sub_block_b.len(), 2);

    let sub_block_c = drain.next_sub_block().expect("third sub-block");
    assert_eq!(sub_block_c.len(), 2);

    assert!(drain.next_sub_block().is_none(), "drain exhausted");
}
use crate::worker::engine::FilterDlqPolicy;
use crate::worker::engine::config::ParseErrorAction;
/// Default-configured `BatchEngine` except for the given parse-error policy.
fn engine_with_parse_action(action: ParseErrorAction) -> BatchEngine {
    let config = BatchProcessingConfig {
        parse_error_action: action,
        ..Default::default()
    };
    BatchEngine::new(config)
}
/// `ParseErrorAction::Skip`: a record that fails to parse is dropped silently.
///
/// Injects three payloads, the middle one invalid JSON. The test checks that
/// the two valid records reach the process step and that the failure yields no
/// DLQ entry and never reaches the DLQ route callback. It also checks that all
/// three source acks are still committed (watermark 2).
#[tokio::test]
async fn parsed_parse_error_skip_drops_without_dlq_and_commits_survivors() {
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"id":1}"#.to_vec())
        .await
        .unwrap(); transport
        .inject(None, b"not json {{{".to_vec())
        .await
        .unwrap(); transport
        .inject(None, br#"{"id":3}"#.to_vec())
        .await
        .unwrap();
    // Counts entries delivered to the DLQ route callback.
    let routed = Arc::new(AtomicUsize::new(0));
    let rc = Arc::clone(&routed);
    let engine = engine_with_parse_action(ParseErrorAction::Skip).with_filter_dlq_policy(
        FilterDlqPolicy::Route(Arc::new(
            move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
                rc.fetch_add(entries.len(), Ordering::Relaxed);
                Ok(())
            },
        )),
    );
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    // What the process step observed: DLQ entries vs. surviving records.
    let dlq_seen = Arc::new(AtomicUsize::new(0));
    let kept = Arc::new(AtomicUsize::new(0));
    let ds = Arc::clone(&dlq_seen);
    let kp = Arc::clone(&kept);
    engine
        .run_workbatch_parsed(
            &transport,
            shutdown,
            move |pb: ParsedBatch<'_, _>| {
                ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
                kp.fetch_add(pb.records.len(), Ordering::Relaxed);
                // Forward any DLQ entries so the route point would see them too.
                Ok(WorkBatch::new(pb.records, pb.commit_tokens).with_dlq_entries(pb.dlq_entries))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(kept.load(Ordering::Relaxed), 2, "2 survivors kept");
    assert_eq!(
        dlq_seen.load(Ordering::Relaxed),
        0,
        "Skip: parse failure produces NO DLQ entry (dropped, not dead-lettered)"
    );
    assert_eq!(
        routed.load(Ordering::Relaxed),
        0,
        "Skip: nothing reaches the DLQ route point"
    );
    // All three acks, including the skipped record's, are committed.
    assert_eq!(transport.committed_sequence(), 2);
}
/// `ParseErrorAction::FailBatch`: a parse failure aborts the run.
///
/// `OrderedReceiverBad::new()` delivers a single invalid-JSON record. The test
/// expects three outcomes:
/// - the engine returns `EngineError::ParseBatchFailed`;
/// - the sink is never called;
/// - the receiver's commit watermark keeps its `u64::MAX` starting value.
///
/// NOTE(review): the watermark assertion is only meaningful if
/// `OrderedReceiverBad::commit` can move `committed_hwm` away from `u64::MAX`.
/// Confirm that it does.
#[tokio::test]
async fn parsed_parse_error_fail_batch_skips_commit() {
    let receiver = OrderedReceiverBad::new();
    let committed = Arc::clone(&receiver.committed_hwm);
    let engine = engine_with_parse_action(ParseErrorAction::FailBatch);
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 500);
    // Counts sink invocations; expected to stay 0.
    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sc = Arc::clone(&sink_calls);
    let result = engine
        .run_workbatch_parsed(
            &receiver,
            shutdown,
            |pb: ParsedBatch<'_, _>| {
                Ok(WorkBatch::new(pb.records, pb.commit_tokens).with_dlq_entries(pb.dlq_entries))
            },
            move |_out: &WorkBatch<_>| {
                let sc = Arc::clone(&sc);
                async move {
                    sc.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(
        matches!(result, Err(EngineError::ParseBatchFailed(_))),
        "FailBatch: a parse failure is a terminal engine error, got {result:?}"
    );
    assert_eq!(
        committed.load(Ordering::Relaxed),
        u64::MAX,
        "FailBatch: the whole block fails its commit -- watermark unmoved"
    );
    assert_eq!(
        sink_calls.load(Ordering::Relaxed),
        0,
        "FailBatch: the block never reaches the sink (parse fails first)"
    );
}
/// `ParseErrorAction::Dlq`: a parse failure is dead-lettered before the
/// source commit.
///
/// Injects three payloads, the middle one invalid JSON. The DLQ route callback
/// snapshots `transport.committed_sequence()` at the moment it runs. The test
/// asserts that one entry was routed and that the snapshot is 0. It also
/// asserts that the final watermark is 2.
///
/// NOTE(review): assumes `committed_sequence()` reads 0 before any commit, so a
/// 0 snapshot means "route ran first" -- confirm against `MemoryTransport`.
#[tokio::test]
async fn parsed_parse_error_dlq_routes_before_commit() {
    // Arc so the DLQ route callback can read the watermark while the engine runs.
    let transport = Arc::new(mem_transport(50));
    transport
        .inject(None, br#"{"id":1}"#.to_vec())
        .await
        .unwrap(); transport
        .inject(None, b"not json {{{".to_vec())
        .await
        .unwrap(); transport
        .inject(None, br#"{"id":3}"#.to_vec())
        .await
        .unwrap();
    let routed = Arc::new(AtomicUsize::new(0));
    // u64::MAX until the route callback stores a real snapshot.
    let committed_at_route = Arc::new(AtomicU64::new(u64::MAX));
    let rc = Arc::clone(&routed);
    let car = Arc::clone(&committed_at_route);
    let transport_for_route = Arc::clone(&transport);
    let engine = engine_with_parse_action(ParseErrorAction::Dlq).with_filter_dlq_policy(
        FilterDlqPolicy::Route(Arc::new(
            move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
                car.store(transport_for_route.committed_sequence(), Ordering::Relaxed);
                rc.fetch_add(entries.len(), Ordering::Relaxed);
                Ok(())
            },
        )),
    );
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    engine
        .run_workbatch_parsed(
            &*transport,
            shutdown,
            |pb: ParsedBatch<'_, _>| {
                Ok(WorkBatch::new(pb.records, pb.commit_tokens).with_dlq_entries(pb.dlq_entries))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    assert_eq!(
        routed.load(Ordering::Relaxed),
        1,
        "Dlq: the parse failure reached the DLQ route point"
    );
    assert_eq!(
        committed_at_route.load(Ordering::Relaxed),
        0,
        "DLQ route ran BEFORE the source commit advanced the watermark"
    );
    assert_eq!(
        transport.committed_sequence(),
        2,
        "all 3 acks committed after"
    );
}
/// DLQ entries emitted by the process step of `run_workbatch` must reach the
/// DLQ route callback.
///
/// Injects two valid records. The process closure attaches one synthetic DLQ
/// entry to every batch it handles. The test asserts that at least one entry
/// was routed and that both acks are committed (watermark 1).
#[tokio::test]
async fn standard_send_batch_sink_does_not_silently_drop_dlq_entries() {
    let transport = mem_transport(50);
    transport
        .inject(None, br#"{"id":1}"#.to_vec())
        .await
        .unwrap();
    transport
        .inject(None, br#"{"id":2}"#.to_vec())
        .await
        .unwrap();
    let routed = Arc::new(AtomicUsize::new(0));
    let rc = Arc::clone(&routed);
    let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
        move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
            rc.fetch_add(entries.len(), Ordering::Relaxed);
            Ok(())
        },
    )));
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 200);
    engine
        .run_workbatch(
            &transport,
            shutdown,
            |batch| {
                // One DLQ entry per processed batch, independent of the records.
                let dlq = vec![crate::transport::filter::FilteredDlqEntry {
                    payload: b"process-emitted dead-letter".to_vec(),
                    key: None,
                    reason: "process decided this record is bad".to_string(),
                }];
                let tokens = batch.commit_tokens;
                let records = batch.records;
                Ok(WorkBatch::new(records, tokens).with_dlq_entries(dlq))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();
    // `>= 1`: the routed count equals the number of batches processed, which
    // depends on how the two records were grouped.
    assert!(
        routed.load(Ordering::Relaxed) >= 1,
        "process-emitted DLQ entry must reach the DLQ route point, not be \
         silently dropped on the path to commit"
    );
    assert_eq!(transport.committed_sequence(), 1);
}
/// A failing DLQ route callback must abort the run and block the commit.
///
/// `OrderedReceiverBad::without_parse_fail()` delivers one valid record. The
/// process closure attaches a DLQ entry to it, and the route callback always
/// returns `EngineError::Sink`. The test asserts that the run returns an error
/// and that the receiver's watermark keeps its `u64::MAX` starting value.
///
/// NOTE(review): the watermark assertion is only meaningful if
/// `OrderedReceiverBad::commit` can move `committed_hwm` away from `u64::MAX`.
/// Confirm that it does.
#[tokio::test]
async fn dlq_route_failure_is_terminal_and_blocks_commit() {
    let receiver = OrderedReceiverBad::without_parse_fail();
    let committed = Arc::clone(&receiver.committed_hwm);
    // Route callback that always fails.
    let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
        |_e: Vec<crate::transport::filter::FilteredDlqEntry>| {
            Err(EngineError::Sink("dlq transport down".into()))
        },
    )));
    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 500);
    let result = engine
        .run_workbatch(
            &receiver,
            shutdown,
            |batch| {
                let dlq = vec![crate::transport::filter::FilteredDlqEntry {
                    payload: b"bad".to_vec(),
                    key: None,
                    reason: "process dlq".to_string(),
                }];
                Ok(WorkBatch::new(batch.records, batch.commit_tokens).with_dlq_entries(dlq))
            },
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await;
    assert!(
        result.is_err(),
        "DLQ route failure must be a terminal ack-barrier error, got {result:?}"
    );
    assert_eq!(
        committed.load(Ordering::Relaxed),
        u64::MAX,
        "DLQ route failure must skip the commit -- watermark unmoved"
    );
}
/// Test receiver that delivers a single one-record batch and then parks.
///
/// Used by the parse-failure (`FailBatch`) and DLQ-route-failure tests, which
/// inspect `committed_hwm` to check whether a commit happened.
struct OrderedReceiverBad {
    /// Delivery cursor shared with the futures returned by `recv`.
    next: Arc<AtomicU64>,
    /// Commit watermark. The constructors start it at `u64::MAX`, which the
    /// tests treat as "nothing committed".
    committed_hwm: Arc<AtomicU64>,
    /// `true` delivers a valid JSON payload; `false` delivers malformed JSON.
    good_payload: bool,
}
impl OrderedReceiverBad {
    /// Receiver whose record payload is malformed JSON (`good_payload: false`).
    fn new() -> Self {
        Self::with_good_payload(false)
    }
    /// Receiver whose record payload is valid JSON (`good_payload: true`).
    fn without_parse_fail() -> Self {
        Self::with_good_payload(true)
    }
    /// Shared constructor: cursor at 0, commit watermark at `u64::MAX`.
    fn with_good_payload(good_payload: bool) -> Self {
        let next = Arc::new(AtomicU64::new(0));
        let committed_hwm = Arc::new(AtomicU64::new(u64::MAX));
        Self {
            next,
            committed_hwm,
            good_payload,
        }
    }
}
/// Minimal `TransportBase` impl for the in-test `OrderedReceiverBad`.
impl crate::transport::TransportBase for OrderedReceiverBad {
    /// Nothing to release; resolves immediately with `Ok(())`.
    fn close(
        &self,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send {
        std::future::ready(Ok(()))
    }
    /// Always reports healthy.
    fn is_healthy(&self) -> bool {
        true
    }
    /// Fixed transport name.
    fn name(&self) -> &'static str {
        "ordered-bad-test"
    }
}
impl TransportReceiver for OrderedReceiverBad {
    type Token = crate::transport::memory::MemoryToken;
    /// Yields one single-record batch (token sequence 0) on the first call.
    /// Every later call returns a future that never resolves.
    ///
    /// The payload is valid JSON when `good_payload` is set and malformed JSON
    /// otherwise. `_max` is ignored.
    fn recv(
        &self,
        _max: usize,
    ) -> impl std::future::Future<Output = crate::transport::TransportResult<WorkBatch<Self::Token>>>
    + Send {
        let next = Arc::clone(&self.next);
        let good = self.good_payload;
        async move {
            let seq = next.fetch_add(1, Ordering::Relaxed);
            if seq >= 1 {
                // Undo this call's increment, then park forever: only the first
                // receive ever produces a record.
                next.fetch_sub(1, Ordering::Relaxed);
                std::future::pending::<()>().await;
            }
            let payload = if good {
                Bytes::from_static(br#"{"ok":1}"#)
            } else {
                Bytes::from_static(b"not json {{{")
            };
            let record = Record {
                payload,
                key: None,
                headers: vec![],
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            };
            Ok(WorkBatch::new(
                vec![record],
                vec![crate::transport::memory::MemoryToken { seq }],
            ))
        }
    }
    /// Advances `committed_hwm` to the highest sequence in `tokens`. An empty
    /// slice is a no-op.
    ///
    /// `committed_hwm` starts at `u64::MAX`, which acts as a "nothing committed"
    /// sentinel. A plain `fetch_max` can never move a value that is already
    /// `u64::MAX`. That would make every "watermark unmoved" assertion pass even
    /// after a commit, so the first commit replaces the sentinel outright.
    async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
        if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
            // The closure always returns `Some`, so `fetch_update` cannot fail.
            let _ = self.committed_hwm.fetch_update(
                Ordering::Relaxed,
                Ordering::Relaxed,
                |current| {
                    Some(if current == u64::MAX {
                        max_seq
                    } else {
                        current.max(max_seq)
                    })
                },
            );
        }
        Ok(())
    }
}