---
layout: default
title: Interceptors
nav_order: 6
description: "Producer and consumer interceptor hooks for observability and record modification"
---
# Interceptors Guide
Interceptors allow you to hook into the producer and consumer pipelines at key points.
They are modeled after the Kafka Java client's `ProducerInterceptor` and `ConsumerInterceptor`
interfaces, including support for **ordered interceptor chains**.
## Overview
| `on_send` | Producer | Before a record is partitioned and sent |
| `on_acknowledgement` | Producer | After a record is acknowledged (or fails) |
| `close` | Producer | When the producer is shutting down |
| `on_consume` | Consumer | After records are fetched, before returned to the application |
| `on_commit` | Consumer | After offsets are committed |
| `close` | Consumer | When the consumer is shutting down |
Use cases:
- **Observability**: Count records, measure latency, log errors
- **Record enrichment**: Add tracing headers, inject metadata
- **Auditing**: Track what was produced and consumed
- **Metrics collection**: Feed data into Prometheus, StatsD, etc.
## Producer Interceptor
### Trait Definition
```rust
pub type InterceptorResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
pub trait ProducerInterceptor: Send + Sync + fmt::Debug {
/// Called before a record is sent (before partitioning).
/// The record can be mutated (e.g., adding headers).
fn on_send(&self, _record: &mut ProducerRecord) -> InterceptorResult { Ok(()) }
/// Called after a record is acknowledged or fails.
/// `error` is `None` on success.
fn on_acknowledgement(&self, _metadata: &RecordMetadata, _error: Option<&KrafkaError>) -> InterceptorResult { Ok(()) }
/// Called when the producer is being closed.
/// Use this to release any resources held by the interceptor.
fn close(&self) -> InterceptorResult { Ok(()) }
}
```
All methods have default no-op implementations, so you only need to override the hooks you care about.
**Note:** `on_acknowledgement` is called for *all* send paths — both the direct send path
(linger = 0) and the batched accumulator path (linger > 0).
### Example: Tracing Headers
```rust
use krafka::interceptor::{InterceptorResult, ProducerInterceptor};
use krafka::producer::{Producer, ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Debug)]
struct TracingInterceptor;
impl ProducerInterceptor for TracingInterceptor {
fn on_send(&self, record: &mut ProducerRecord) -> InterceptorResult {
let trace_id = Uuid::new_v4().to_string();
record.headers.push(("x-trace-id".to_string(), trace_id.into_bytes()));
Ok(())
}
fn on_acknowledgement(&self, metadata: &RecordMetadata, error: Option<&KrafkaError>) -> InterceptorResult {
match error {
None => tracing::info!(
topic = %metadata.topic,
partition = metadata.partition,
offset = metadata.offset,
"record acknowledged"
),
Some(e) => tracing::error!("send failed: {}", e),
}
Ok(())
}
}
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.add_interceptor(Arc::new(TracingInterceptor))
.build()
.await?;
```
### Example: Metrics Counter
```rust
use krafka::interceptor::{InterceptorResult, ProducerInterceptor};
use krafka::producer::{ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug)]
struct MetricsInterceptor {
sent: AtomicU64,
errors: AtomicU64,
}
impl MetricsInterceptor {
fn new() -> Self {
Self {
sent: AtomicU64::new(0),
errors: AtomicU64::new(0),
}
}
}
impl ProducerInterceptor for MetricsInterceptor {
fn on_acknowledgement(&self, _metadata: &RecordMetadata, error: Option<&KrafkaError>) -> InterceptorResult {
if error.is_some() {
self.errors.fetch_add(1, Ordering::Relaxed);
} else {
self.sent.fetch_add(1, Ordering::Relaxed);
}
Ok(())
}
}
```
## Consumer Interceptor
### Trait Definition
```rust
pub trait ConsumerInterceptor: Send + Sync + fmt::Debug {
/// Called after records are fetched, before returned to the application.
fn on_consume(&self, _records: &[ConsumerRecord]) -> InterceptorResult { Ok(()) }
/// Called after offsets are committed.
/// The map keys are `(topic, partition)` and values are the committed offsets.
fn on_commit(
&self,
_offsets: &HashMap<(String, PartitionId), Offset>,
_error: Option<&KrafkaError>,
) -> InterceptorResult { Ok(()) }
/// Called when the consumer is being closed.
/// Use this to release any resources held by the interceptor.
fn close(&self) -> InterceptorResult { Ok(()) }
}
```
### Example: Consumption Logging
```rust
use krafka::interceptor::{ConsumerInterceptor, InterceptorResult};
use krafka::consumer::{Consumer, ConsumerRecord};
use std::sync::Arc;
#[derive(Debug)]
struct LoggingInterceptor;
impl ConsumerInterceptor for LoggingInterceptor {
fn on_consume(&self, records: &[ConsumerRecord]) -> InterceptorResult {
for record in records {
println!(
"Consumed: topic={}, partition={}, offset={}",
record.topic, record.partition, record.offset
);
}
Ok(())
}
}
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.add_interceptor(Arc::new(LoggingInterceptor))
.build()
.await?;
```
### Example: Commit Monitoring
```rust
use krafka::interceptor::{ConsumerInterceptor, InterceptorResult};
use krafka::error::KrafkaError;
use krafka::{Offset, PartitionId};
use std::collections::HashMap;
#[derive(Debug)]
struct CommitMonitor;
impl ConsumerInterceptor for CommitMonitor {
fn on_commit(
&self,
offsets: &HashMap<(String, PartitionId), Offset>,
error: Option<&KrafkaError>,
) -> InterceptorResult {
match error {
None => {
for ((topic, partition), offset) in offsets {
println!("Committed {}:{} at offset {}", topic, partition, offset);
}
}
Some(e) => eprintln!("Commit failed: {}", e),
}
Ok(())
}
}
```
## Wiring Interceptors
### Single Interceptor
```rust
use std::sync::Arc;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.interceptor(Arc::new(MyProducerInterceptor))
.build()
.await?;
```
### Interceptor Chain
Multiple interceptors execute in the order they are added. Each interceptor is
individually error- and panic-isolated — a failure in one interceptor will
not prevent the remaining interceptors from running.
For `on_send`, each interceptor sees the record as modified by all preceding
interceptors in the chain.
> **Error semantics:** In Java, `onSend` returns a new record — if an
> interceptor throws, the next one receives the record from the last
> *successful* interceptor. In Rust, `on_send` mutates in-place (`&mut`);
> if an interceptor returns an error or panics mid-mutation, the next
> interceptor sees a partially-mutated record. Avoid building chains
> where later interceptors depend on invariants set by earlier ones.
```rust
use std::sync::Arc;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.add_interceptor(Arc::new(TracingInterceptor))
.add_interceptor(Arc::new(MetricsInterceptor))
.add_interceptor(Arc::new(AuditInterceptor))
.build()
.await?;
```
```rust
use std::sync::Arc;
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.add_interceptor(Arc::new(LoggingInterceptor))
.add_interceptor(Arc::new(MetricsInterceptor))
.build()
.await?;
```
> **Note:** `interceptor()` replaces any previously added interceptors with a
> single one. `add_interceptor()` appends to the chain. Don't mix both in the
> same builder.
### No Interceptor (Default)
When no interceptor is configured, a no-op implementation is used internally.
There is zero overhead — the no-op methods are inlined away by the compiler.
## Pipeline Integration Points
### Producer Pipeline
```
send_record(record)
│
▼
interceptor.on_send(&mut record) ← Modify record here
│
▼
partitioner.partition()
│
▼
encode + send to broker
│
├─ success ─► interceptor.on_acknowledgement(metadata, None)
│
└─ failure ─► interceptor.on_acknowledgement(metadata, Some(error))
```
### Consumer Pipeline
```
poll()
│
▼
fetch from brokers
│
▼
interceptor.on_consume(&records) ← Observe records here
│
▼
return records to application
│
▼
commit()
│
▼
interceptor.on_commit(&offsets, error) ← Only committed offsets (filtered to assigned partitions)
```
## Thread Safety
Interceptors must implement `Send + Sync + Debug`. Use atomic types or `Mutex`/`RwLock`
for any mutable state:
```rust
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug)]
struct SafeInterceptor {
counter: AtomicU64,
}
// AtomicU64 is Send + Sync, so SafeInterceptor is too
```
## Security Considerations
- **Headers may contain credentials:** `on_send()` receives all record headers, which
may include auth tokens or API keys. Do not log full record contents without sanitization.
- **Error messages may leak secrets:** `on_acknowledgement()` error messages from auth
failures may contain broker-echoed details.
- **Debug impls may expose secrets:** Never log the interceptor instance itself
(e.g. `{:?}`) — user-provided `Debug` implementations may expose credentials.
## Error Handling & Panic Safety
All interceptor methods return `InterceptorResult` (`Result<(), Box<dyn Error + Send + Sync>>`).
Errors are **non-fatal** — the chain continues and the error is logged at `warn!`.
This gives interceptor authors a clean, idiomatic way to signal failures
(e.g. a metrics backend is down) without resorting to panics.
As a safety net, all calls are additionally wrapped in `catch_unwind`.
Panics are caught and logged at `error!` with the panic payload **redacted**
(user-provided `Debug` impls may leak secrets). The chain continues even after a panic.
| `Ok(())` | — | Yes |
| `Err(e)` | `warn!` | Yes |
| panic | `error!` (payload redacted) | Yes |
## Next Steps
- [Producer Guide](producer.md) - Producer configuration and usage
- [Consumer Guide](consumer.md) - Consumer groups and offset management
- [Admin Client](admin.md) - Cluster administration