use ahash::AHashMap as HashMap;
use std::fmt;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;
use crate::consumer::ConsumerRecord;
use crate::error::KrafkaError;
use crate::producer::{ProducerRecord, RecordMetadata};
use crate::{Offset, PartitionId};
pub type InterceptorResult = std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;
pub trait ProducerInterceptor: Send + Sync + fmt::Debug {
fn on_send(&self, _record: &mut ProducerRecord) -> InterceptorResult {
Ok(())
}
fn on_acknowledgement(
&self,
_metadata: &RecordMetadata,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
Ok(())
}
fn close(&self) -> InterceptorResult {
Ok(())
}
}
pub trait ConsumerInterceptor: Send + Sync + fmt::Debug {
fn on_consume(&self, _records: &[ConsumerRecord]) -> InterceptorResult {
Ok(())
}
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
Ok(())
}
fn close(&self) -> InterceptorResult {
Ok(())
}
}
#[derive(Debug)]
pub(crate) struct NoOpProducerInterceptor;
impl ProducerInterceptor for NoOpProducerInterceptor {}
#[derive(Debug)]
pub(crate) struct NoOpConsumerInterceptor;
impl ConsumerInterceptor for NoOpConsumerInterceptor {}
pub(crate) struct ProducerInterceptorChain {
interceptors: Vec<Arc<dyn ProducerInterceptor>>,
}
impl fmt::Debug for ProducerInterceptorChain {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProducerInterceptorChain")
.field("len", &self.interceptors.len())
.finish()
}
}
impl ProducerInterceptorChain {
pub fn new(interceptors: Vec<Arc<dyn ProducerInterceptor>>) -> Self {
Self { interceptors }
}
}
impl ProducerInterceptor for ProducerInterceptorChain {
fn on_send(&self, record: &mut ProducerRecord) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
let snapshot = record.clone();
match catch_unwind(AssertUnwindSafe(|| interceptor.on_send(record))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
topic = record.topic.as_str(),
error = %e,
"ProducerInterceptor.on_send failed",
);
}
Err(_) => {
*record = snapshot;
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
topic = record.topic.as_str(),
"ProducerInterceptor.on_send panicked — record restored (payload redacted)",
);
}
}
}
Ok(())
}
fn on_acknowledgement(
&self,
metadata: &RecordMetadata,
error: Option<&KrafkaError>,
) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
match catch_unwind(AssertUnwindSafe(|| {
interceptor.on_acknowledgement(metadata, error)
})) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
topic = metadata.topic.as_str(),
partition = metadata.partition,
error = %e,
"ProducerInterceptor.on_acknowledgement failed",
);
}
Err(_) => {
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
topic = metadata.topic.as_str(),
partition = metadata.partition,
"ProducerInterceptor.on_acknowledgement panicked (payload redacted)",
);
}
}
}
Ok(())
}
fn close(&self) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
match catch_unwind(AssertUnwindSafe(|| interceptor.close())) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
error = %e,
"ProducerInterceptor.close failed",
);
}
Err(_) => {
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
"ProducerInterceptor.close panicked (payload redacted)",
);
}
}
}
Ok(())
}
}
pub(crate) struct ConsumerInterceptorChain {
interceptors: Vec<Arc<dyn ConsumerInterceptor>>,
}
impl fmt::Debug for ConsumerInterceptorChain {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConsumerInterceptorChain")
.field("len", &self.interceptors.len())
.finish()
}
}
impl ConsumerInterceptorChain {
pub fn new(interceptors: Vec<Arc<dyn ConsumerInterceptor>>) -> Self {
Self { interceptors }
}
}
impl ConsumerInterceptor for ConsumerInterceptorChain {
fn on_consume(&self, records: &[ConsumerRecord]) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
match catch_unwind(AssertUnwindSafe(|| interceptor.on_consume(records))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
record_count = records.len(),
error = %e,
"ConsumerInterceptor.on_consume failed",
);
}
Err(_) => {
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
record_count = records.len(),
"ConsumerInterceptor.on_consume panicked (payload redacted)",
);
}
}
}
Ok(())
}
fn on_commit(
&self,
offsets: &HashMap<(String, PartitionId), Offset>,
error: Option<&KrafkaError>,
) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
match catch_unwind(AssertUnwindSafe(|| interceptor.on_commit(offsets, error))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
offset_count = offsets.len(),
error = %e,
"ConsumerInterceptor.on_commit failed",
);
}
Err(_) => {
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
offset_count = offsets.len(),
"ConsumerInterceptor.on_commit panicked (payload redacted)",
);
}
}
}
Ok(())
}
fn close(&self) -> InterceptorResult {
for (i, interceptor) in self.interceptors.iter().enumerate() {
match catch_unwind(AssertUnwindSafe(|| interceptor.close())) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
chain_index = i,
chain_len = self.interceptors.len(),
error = %e,
"ConsumerInterceptor.close failed",
);
}
Err(_) => {
tracing::error!(
chain_index = i,
chain_len = self.interceptors.len(),
"ConsumerInterceptor.close panicked (payload redacted)",
);
}
}
}
Ok(())
}
}
pub(crate) fn safe_on_send(interceptor: &dyn ProducerInterceptor, record: &mut ProducerRecord) {
match catch_unwind(AssertUnwindSafe(|| interceptor.on_send(record))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
topic = record.topic.as_str(),
error = %e,
"ProducerInterceptor.on_send failed",
);
}
Err(_) => {
tracing::error!(
topic = record.topic.as_str(),
"ProducerInterceptor.on_send panicked (payload redacted)",
);
}
}
}
pub(crate) fn safe_on_acknowledgement(
interceptor: &dyn ProducerInterceptor,
metadata: &RecordMetadata,
error: Option<&KrafkaError>,
) {
match catch_unwind(AssertUnwindSafe(|| {
interceptor.on_acknowledgement(metadata, error)
})) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
topic = metadata.topic.as_str(),
partition = metadata.partition,
error = %e,
"ProducerInterceptor.on_acknowledgement failed",
);
}
Err(_) => {
tracing::error!(
topic = metadata.topic.as_str(),
partition = metadata.partition,
"ProducerInterceptor.on_acknowledgement panicked (payload redacted)",
);
}
}
}
pub(crate) fn safe_producer_close(interceptor: &dyn ProducerInterceptor) {
match catch_unwind(AssertUnwindSafe(|| interceptor.close())) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
error = %e,
"ProducerInterceptor.close failed",
);
}
Err(_) => {
tracing::error!("ProducerInterceptor.close panicked (payload redacted)");
}
}
}
pub(crate) fn safe_on_consume(interceptor: &dyn ConsumerInterceptor, records: &[ConsumerRecord]) {
match catch_unwind(AssertUnwindSafe(|| interceptor.on_consume(records))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
record_count = records.len(),
error = %e,
"ConsumerInterceptor.on_consume failed",
);
}
Err(_) => {
tracing::error!(
record_count = records.len(),
"ConsumerInterceptor.on_consume panicked (payload redacted)",
);
}
}
}
pub(crate) fn safe_on_commit(
interceptor: &dyn ConsumerInterceptor,
offsets: &HashMap<(String, PartitionId), Offset>,
error: Option<&KrafkaError>,
) {
match catch_unwind(AssertUnwindSafe(|| interceptor.on_commit(offsets, error))) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
offset_count = offsets.len(),
error = %e,
"ConsumerInterceptor.on_commit failed",
);
}
Err(_) => {
tracing::error!(
offset_count = offsets.len(),
"ConsumerInterceptor.on_commit panicked (payload redacted)",
);
}
}
}
pub(crate) fn safe_consumer_close(interceptor: &dyn ConsumerInterceptor) {
match catch_unwind(AssertUnwindSafe(|| interceptor.close())) {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
error = %e,
"ConsumerInterceptor.close failed",
);
}
Err(_) => {
tracing::error!("ConsumerInterceptor.close panicked (payload redacted)");
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[derive(Debug)]
struct TestProducerInterceptor {
send_count: std::sync::atomic::AtomicUsize,
ack_count: std::sync::atomic::AtomicUsize,
}
impl TestProducerInterceptor {
fn new() -> Self {
Self {
send_count: std::sync::atomic::AtomicUsize::new(0),
ack_count: std::sync::atomic::AtomicUsize::new(0),
}
}
fn send_count(&self) -> usize {
self.send_count.load(std::sync::atomic::Ordering::Relaxed)
}
fn ack_count(&self) -> usize {
self.ack_count.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl ProducerInterceptor for TestProducerInterceptor {
fn on_send(&self, record: &mut ProducerRecord) -> InterceptorResult {
self.send_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
record.headers.push((
"x-intercepted".to_string(),
bytes::Bytes::from_static(b"true"),
));
Ok(())
}
fn on_acknowledgement(
&self,
_metadata: &RecordMetadata,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
self.ack_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
}
#[derive(Debug)]
struct TestConsumerInterceptor {
consume_count: std::sync::atomic::AtomicUsize,
commit_count: std::sync::atomic::AtomicUsize,
}
impl TestConsumerInterceptor {
fn new() -> Self {
Self {
consume_count: std::sync::atomic::AtomicUsize::new(0),
commit_count: std::sync::atomic::AtomicUsize::new(0),
}
}
fn consume_count(&self) -> usize {
self.consume_count
.load(std::sync::atomic::Ordering::Relaxed)
}
fn commit_count(&self) -> usize {
self.commit_count.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl ConsumerInterceptor for TestConsumerInterceptor {
fn on_consume(&self, records: &[ConsumerRecord]) -> InterceptorResult {
self.consume_count
.fetch_add(records.len(), std::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
self.commit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
}
#[test]
fn test_producer_interceptor_on_send() {
let interceptor = TestProducerInterceptor::new();
let mut record = ProducerRecord::new("test-topic", b"value".to_vec());
assert_eq!(interceptor.send_count(), 0);
assert!(record.headers.is_empty());
interceptor.on_send(&mut record).unwrap();
assert_eq!(interceptor.send_count(), 1);
assert_eq!(record.headers.len(), 1);
assert_eq!(record.headers[0].0, "x-intercepted");
assert_eq!(record.headers[0].1, bytes::Bytes::from_static(b"true"));
}
#[test]
fn test_producer_interceptor_on_acknowledgement() {
let interceptor = TestProducerInterceptor::new();
let metadata = RecordMetadata {
topic: "test-topic".to_string(),
partition: 0,
offset: 42,
timestamp: 1000,
};
interceptor.on_acknowledgement(&metadata, None).unwrap();
assert_eq!(interceptor.ack_count(), 1);
let err = KrafkaError::config("test error");
interceptor
.on_acknowledgement(&metadata, Some(&err))
.unwrap();
assert_eq!(interceptor.ack_count(), 2);
}
#[test]
fn test_consumer_interceptor_on_consume() {
let interceptor = TestConsumerInterceptor::new();
let records = vec![
ConsumerRecord::new("test-topic", 0, 0, None, Some(bytes::Bytes::from("v1"))),
ConsumerRecord::new("test-topic", 0, 1, None, Some(bytes::Bytes::from("v2"))),
];
interceptor.on_consume(&records).unwrap();
assert_eq!(interceptor.consume_count(), 2);
}
#[test]
fn test_consumer_interceptor_on_commit() {
let interceptor = TestConsumerInterceptor::new();
let mut offsets = HashMap::new();
offsets.insert(("test-topic".to_string(), 0), 10i64);
interceptor.on_commit(&offsets, None).unwrap();
assert_eq!(interceptor.commit_count(), 1);
}
#[test]
fn test_noop_interceptors() {
let producer_interceptor = NoOpProducerInterceptor;
let mut record = ProducerRecord::new("test", b"value".to_vec());
producer_interceptor.on_send(&mut record).unwrap();
assert!(record.headers.is_empty());
let consumer_interceptor = NoOpConsumerInterceptor;
consumer_interceptor.on_consume(&[]).unwrap();
consumer_interceptor
.on_commit(&HashMap::new(), None)
.unwrap();
}
#[derive(Debug)]
struct PanickingProducerInterceptor;
impl ProducerInterceptor for PanickingProducerInterceptor {
fn on_send(&self, _record: &mut ProducerRecord) -> InterceptorResult {
panic!("on_send panic");
}
fn on_acknowledgement(
&self,
_metadata: &RecordMetadata,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
panic!("on_acknowledgement panic");
}
fn close(&self) -> InterceptorResult {
panic!("producer close panic");
}
}
#[derive(Debug)]
struct PanickingConsumerInterceptor;
impl ConsumerInterceptor for PanickingConsumerInterceptor {
fn on_consume(&self, _records: &[ConsumerRecord]) -> InterceptorResult {
panic!("on_consume panic");
}
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
panic!("on_commit panic");
}
fn close(&self) -> InterceptorResult {
panic!("consumer close panic");
}
}
#[test]
fn test_safe_on_send_catches_panic() {
let interceptor = PanickingProducerInterceptor;
let mut record = ProducerRecord::new("test", b"value".to_vec());
safe_on_send(&interceptor, &mut record);
}
#[test]
fn test_safe_on_acknowledgement_catches_panic() {
let interceptor = PanickingProducerInterceptor;
let metadata = RecordMetadata {
topic: "test".to_string(),
partition: 0,
offset: 0,
timestamp: 0,
};
safe_on_acknowledgement(&interceptor, &metadata, None);
}
#[test]
fn test_safe_producer_close_catches_panic() {
let interceptor = PanickingProducerInterceptor;
safe_producer_close(&interceptor);
}
#[test]
fn test_safe_on_consume_catches_panic() {
let interceptor = PanickingConsumerInterceptor;
safe_on_consume(&interceptor, &[]);
}
#[test]
fn test_safe_on_commit_catches_panic() {
let interceptor = PanickingConsumerInterceptor;
safe_on_commit(&interceptor, &HashMap::new(), None);
}
#[test]
fn test_safe_consumer_close_catches_panic() {
let interceptor = PanickingConsumerInterceptor;
safe_consumer_close(&interceptor);
}
#[test]
fn test_close_default_noop() {
let p = NoOpProducerInterceptor;
p.close().unwrap();
let c = NoOpConsumerInterceptor;
c.close().unwrap();
}
#[derive(Debug)]
struct OrderedProducerInterceptor {
name: &'static str,
log: Arc<std::sync::Mutex<Vec<String>>>,
}
impl ProducerInterceptor for OrderedProducerInterceptor {
fn on_send(&self, _record: &mut ProducerRecord) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.on_send", self.name));
Ok(())
}
fn on_acknowledgement(
&self,
_metadata: &RecordMetadata,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.on_ack", self.name));
Ok(())
}
fn close(&self) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.close", self.name));
Ok(())
}
}
#[test]
fn test_producer_chain_executes_in_order() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "first",
log: Arc::clone(&log),
}),
Arc::new(OrderedProducerInterceptor {
name: "second",
log: Arc::clone(&log),
}),
Arc::new(OrderedProducerInterceptor {
name: "third",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
let metadata = RecordMetadata {
topic: "test".to_string(),
partition: 0,
offset: 0,
timestamp: 0,
};
chain.on_acknowledgement(&metadata, None).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"first.on_send",
"second.on_send",
"third.on_send",
"first.on_ack",
"second.on_ack",
"third.on_ack",
"first.close",
"second.close",
"third.close",
]
);
}
#[test]
fn test_producer_chain_on_send_mutations_visible_to_next() {
#[derive(Debug)]
struct HeaderAdder(&'static str);
impl ProducerInterceptor for HeaderAdder {
fn on_send(&self, record: &mut ProducerRecord) -> InterceptorResult {
record.headers.push((
self.0.to_string(),
bytes::Bytes::copy_from_slice(self.0.as_bytes()),
));
Ok(())
}
}
let chain = ProducerInterceptorChain::new(vec![
Arc::new(HeaderAdder("first")),
Arc::new(HeaderAdder("second")),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
assert_eq!(record.headers.len(), 2);
assert_eq!(record.headers[0].0, "first");
assert_eq!(record.headers[1].0, "second");
}
#[test]
fn test_producer_chain_panic_isolation() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "before",
log: Arc::clone(&log),
}),
Arc::new(PanickingProducerInterceptor),
Arc::new(OrderedProducerInterceptor {
name: "after",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
let metadata = RecordMetadata {
topic: "test".to_string(),
partition: 0,
offset: 0,
timestamp: 0,
};
chain.on_acknowledgement(&metadata, None).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"before.on_send",
"after.on_send",
"before.on_ack",
"after.on_ack",
"before.close",
"after.close",
]
);
}
#[test]
fn test_producer_chain_empty() {
let chain = ProducerInterceptorChain::new(vec![]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
chain.close().unwrap();
}
#[derive(Debug)]
struct OrderedConsumerInterceptor {
name: &'static str,
log: Arc<std::sync::Mutex<Vec<String>>>,
}
impl ConsumerInterceptor for OrderedConsumerInterceptor {
fn on_consume(&self, _records: &[ConsumerRecord]) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.on_consume", self.name));
Ok(())
}
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.on_commit", self.name));
Ok(())
}
fn close(&self) -> InterceptorResult {
self.log
.lock()
.unwrap()
.push(format!("{}.close", self.name));
Ok(())
}
}
#[test]
fn test_consumer_chain_executes_in_order() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ConsumerInterceptorChain::new(vec![
Arc::new(OrderedConsumerInterceptor {
name: "first",
log: Arc::clone(&log),
}),
Arc::new(OrderedConsumerInterceptor {
name: "second",
log: Arc::clone(&log),
}),
]);
chain.on_consume(&[]).unwrap();
chain.on_commit(&HashMap::new(), None).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"first.on_consume",
"second.on_consume",
"first.on_commit",
"second.on_commit",
"first.close",
"second.close",
]
);
}
#[test]
fn test_consumer_chain_panic_isolation() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ConsumerInterceptorChain::new(vec![
Arc::new(OrderedConsumerInterceptor {
name: "before",
log: Arc::clone(&log),
}),
Arc::new(PanickingConsumerInterceptor),
Arc::new(OrderedConsumerInterceptor {
name: "after",
log: Arc::clone(&log),
}),
]);
chain.on_consume(&[]).unwrap();
chain.on_commit(&HashMap::new(), None).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"before.on_consume",
"after.on_consume",
"before.on_commit",
"after.on_commit",
"before.close",
"after.close",
]
);
}
#[test]
fn test_consumer_chain_empty() {
let chain = ConsumerInterceptorChain::new(vec![]);
chain.on_consume(&[]).unwrap();
chain.on_commit(&HashMap::new(), None).unwrap();
chain.close().unwrap();
}
#[test]
fn test_chain_via_safe_wrappers() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "a",
log: Arc::clone(&log),
}),
Arc::new(OrderedProducerInterceptor {
name: "b",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"v".to_vec());
safe_on_send(&chain, &mut record);
let log = log.lock().unwrap();
assert_eq!(*log, vec!["a.on_send", "b.on_send"]);
}
#[derive(Debug)]
struct FailingProducerInterceptor;
impl ProducerInterceptor for FailingProducerInterceptor {
fn on_send(&self, _record: &mut ProducerRecord) -> InterceptorResult {
Err("metrics backend unavailable".into())
}
fn on_acknowledgement(
&self,
_metadata: &RecordMetadata,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
Err("ack handler failed".into())
}
fn close(&self) -> InterceptorResult {
Err("cleanup failed".into())
}
}
#[derive(Debug)]
struct FailingConsumerInterceptor;
impl ConsumerInterceptor for FailingConsumerInterceptor {
fn on_consume(&self, _records: &[ConsumerRecord]) -> InterceptorResult {
Err("consume handler failed".into())
}
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult {
Err("commit handler failed".into())
}
fn close(&self) -> InterceptorResult {
Err("cleanup failed".into())
}
}
#[test]
fn test_producer_chain_error_isolation() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "before",
log: Arc::clone(&log),
}),
Arc::new(FailingProducerInterceptor),
Arc::new(OrderedProducerInterceptor {
name: "after",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"before.on_send",
"after.on_send",
"before.close",
"after.close"
]
);
}
#[test]
fn test_consumer_chain_error_isolation() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ConsumerInterceptorChain::new(vec![
Arc::new(OrderedConsumerInterceptor {
name: "before",
log: Arc::clone(&log),
}),
Arc::new(FailingConsumerInterceptor),
Arc::new(OrderedConsumerInterceptor {
name: "after",
log: Arc::clone(&log),
}),
]);
chain.on_consume(&[]).unwrap();
chain.close().unwrap();
let log = log.lock().unwrap();
assert_eq!(
*log,
vec![
"before.on_consume",
"after.on_consume",
"before.close",
"after.close"
]
);
}
#[test]
fn test_safe_wrappers_catch_errors() {
let interceptor = FailingProducerInterceptor;
let mut record = ProducerRecord::new("test", b"v".to_vec());
safe_on_send(&interceptor, &mut record);
safe_producer_close(&interceptor);
let interceptor = FailingConsumerInterceptor;
safe_on_consume(&interceptor, &[]);
safe_consumer_close(&interceptor);
}
#[test]
fn test_producer_chain_error_at_first_position() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(FailingProducerInterceptor),
Arc::new(OrderedProducerInterceptor {
name: "second",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
let log = log.lock().unwrap();
assert_eq!(*log, vec!["second.on_send"]);
}
#[test]
fn test_producer_chain_error_at_last_position() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "first",
log: Arc::clone(&log),
}),
Arc::new(FailingProducerInterceptor),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
let log = log.lock().unwrap();
assert_eq!(*log, vec!["first.on_send"]);
}
#[test]
fn test_producer_chain_mixed_error_and_panic() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ProducerInterceptorChain::new(vec![
Arc::new(OrderedProducerInterceptor {
name: "first",
log: Arc::clone(&log),
}),
Arc::new(FailingProducerInterceptor),
Arc::new(PanickingProducerInterceptor),
Arc::new(OrderedProducerInterceptor {
name: "last",
log: Arc::clone(&log),
}),
]);
let mut record = ProducerRecord::new("test", b"value".to_vec());
chain.on_send(&mut record).unwrap();
let log = log.lock().unwrap();
assert_eq!(*log, vec!["first.on_send", "last.on_send"]);
}
#[test]
fn test_consumer_chain_mixed_error_and_panic() {
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let chain = ConsumerInterceptorChain::new(vec![
Arc::new(OrderedConsumerInterceptor {
name: "first",
log: Arc::clone(&log),
}),
Arc::new(FailingConsumerInterceptor),
Arc::new(PanickingConsumerInterceptor),
Arc::new(OrderedConsumerInterceptor {
name: "last",
log: Arc::clone(&log),
}),
]);
chain.on_consume(&[]).unwrap();
let log = log.lock().unwrap();
assert_eq!(*log, vec!["first.on_consume", "last.on_consume"]);
}
}