pub struct Consumer { /* private fields */ }
A Kafka consumer.
Implementations
impl Consumer
pub fn builder() -> ConsumerBuilder
Create a new consumer builder.
pub async fn subscribe(&self, topics: &[&str]) -> Result<()>
Subscribe to topics.
Replaces the current subscription with the given topics (matching the Kafka Java client’s replace semantics).
pub async fn assign(&self, topic: &str, partitions: Vec<PartitionId>) -> Result<()>
Assign specific partitions manually.
Manual assignment and group subscription are mutually exclusive. This method returns an error if a group coordinator is active.
pub async fn seek(&self, topic: &str, partition: PartitionId, offset: Offset) -> Result<()>
Seek one partition to a specific offset: subsequent fetches for that partition start from offset.
pub async fn seek_many(&self, offsets: &HashMap<(String, PartitionId), Offset>) -> Result<()>
Seek multiple partitions to the given offsets in one atomic update.
Equivalent to calling seek for each entry, but acquires
the offset lock only once and recomputes lag metrics once at the end.
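The single-lock update described above can be modelled with standard-library types. This is a hypothetical sketch, not krafka's implementation: `Positions`, `recompute_lag`, and the recomputation counter are illustrative names, `i32` stands in for `PartitionId`, and `i64` for `Offset`. It shows the point of batching: however many entries are passed, the lock is taken once and lag is recomputed once.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

// (topic, partition); i32 stands in for PartitionId and i64 for Offset (assumption).
type Key = (String, i32);

/// Hypothetical model of the consumer's offset state, guarded by one lock.
struct Positions {
    offsets: Mutex<HashMap<Key, i64>>,
    high_watermarks: HashMap<Key, i64>,
    lag_recomputations: AtomicU32, // how many times lag was recomputed
}

impl Positions {
    /// Apply every seek under a single lock acquisition, then recompute lag once.
    fn seek_many(&self, updates: &HashMap<Key, i64>) -> HashMap<Key, i64> {
        let mut offsets = self.offsets.lock().unwrap();
        for (key, offset) in updates {
            offsets.insert(key.clone(), *offset);
        }
        self.recompute_lag(&offsets)
    }

    /// Lag = high watermark - position, for partitions where both are known.
    fn recompute_lag(&self, offsets: &HashMap<Key, i64>) -> HashMap<Key, i64> {
        self.lag_recomputations.fetch_add(1, Ordering::SeqCst);
        offsets
            .iter()
            .filter_map(|(k, pos)| self.high_watermarks.get(k).map(|hw| (k.clone(), hw - pos)))
            .collect()
    }
}

fn main() {
    let state = Positions {
        offsets: Mutex::new(HashMap::new()),
        high_watermarks: HashMap::from([
            (("orders".to_string(), 0), 1_500),
            (("orders".to_string(), 1), 2_000),
        ]),
        lag_recomputations: AtomicU32::new(0),
    };
    // Mirrors the seek_many example below: two partitions, one call.
    let lag = state.seek_many(&HashMap::from([
        (("orders".to_string(), 0), 1_000),
        (("orders".to_string(), 1), 2_000),
    ]));
    println!("lag = {lag:?}");
}
```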
§Example
use std::collections::HashMap;
consumer
.seek_many(&HashMap::from([
(("orders".to_string(), 0), 1_000),
(("orders".to_string(), 1), 2_000),
]))
.await?;
pub async fn seek_to_beginning(&self, topic: &str, partition: PartitionId) -> Result<()>
Seek to the beginning of the partition (the earliest offset still available on the broker, i.e. the log start offset).
pub async fn seek_to_end(&self, topic: &str, partition: PartitionId) -> Result<()>
Seek to the end (latest offset).
Sets the consumer position to the high watermark, so subsequent polls will only return new messages produced after this call.
This resolves the actual latest offset via a ListOffsets RPC to the partition leader. The Kafka Fetch API does not interpret special offset values like -1; those are only meaningful in the ListOffsets API.
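In a Kafka ListOffsets request, a timestamp of -1 asks for the latest offset and -2 for the earliest, while a non-negative value is a timestamp in milliseconds. The sketch below shows that mapping using a hypothetical `OffsetSpec` enum (not a krafka type). Only the resolved offset, never the sentinel, would then be used as a fetch position.

```rust
/// Hypothetical description of what a caller wants ListOffsets to resolve.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OffsetSpec {
    Earliest,       // log start offset
    Latest,         // high watermark
    Timestamp(i64), // first offset whose timestamp is >= this value (ms since epoch)
}

/// Map a spec to the `timestamp` field of a ListOffsets request.
/// -1 and -2 are sentinels understood by ListOffsets; Fetch does not interpret them.
fn list_offsets_timestamp(spec: OffsetSpec) -> i64 {
    match spec {
        OffsetSpec::Latest => -1,
        OffsetSpec::Earliest => -2,
        OffsetSpec::Timestamp(ts) => ts,
    }
}

fn main() {
    for spec in [OffsetSpec::Earliest, OffsetSpec::Latest, OffsetSpec::Timestamp(1_700_000_000_000)] {
        println!("{spec:?} -> timestamp field {}", list_offsets_timestamp(spec));
    }
}
```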
pub async fn offsets_for_times(&self, partitions: &[(&str, PartitionId)], timestamp: i64) -> HashMap<(String, PartitionId), Result<Offset>>
Look up the earliest offset whose message timestamp is greater than or
equal to the given timestamp, for each listed (topic, partition).
Uses the ListOffsets API. Requests are batched by leader broker so each broker receives at most one RPC.
Every input partition appears in the returned map:
- Ok(offset): the broker returned a valid offset (-1 means no message exists at or after the timestamp for that partition).
- Err(e): a partition-level broker error (e.g. NotLeaderForPartition) or a transport failure prevented resolution for this partition.
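The per-leader batching described above amounts to grouping the requested partitions by their current leader. The following is a hypothetical std-only sketch, not krafka's code: `Tp` and `BrokerId` are stand-in aliases, and the leader map would come from cluster metadata. The docs do not say how a partition with no known leader is handled; here it is set aside so a caller could report an `Err` for it, keeping every input partition in the result.

```rust
use std::collections::{BTreeMap, HashMap};

type BrokerId = i32;
type Tp = (String, i32); // (topic, partition); i32 stands in for PartitionId

/// Group requested partitions by leader so each broker receives one request.
/// Partitions without a known leader are returned separately.
fn batch_by_leader(
    partitions: &[(&str, i32)],
    leaders: &HashMap<Tp, BrokerId>,
) -> (BTreeMap<BrokerId, Vec<Tp>>, Vec<Tp>) {
    let mut batches: BTreeMap<BrokerId, Vec<Tp>> = BTreeMap::new();
    let mut no_leader = Vec::new();
    for &(topic, partition) in partitions {
        let tp = (topic.to_string(), partition);
        match leaders.get(&tp) {
            Some(&leader) => batches.entry(leader).or_default().push(tp),
            None => no_leader.push(tp),
        }
    }
    (batches, no_leader)
}

fn main() {
    let leaders = HashMap::from([
        (("orders".to_string(), 0), 1),
        (("orders".to_string(), 1), 2),
        (("orders".to_string(), 2), 1),
    ]);
    let (batches, no_leader) =
        batch_by_leader(&[("orders", 0), ("orders", 1), ("orders", 2), ("orders", 3)], &leaders);
    // Broker 1 gets one request covering partitions 0 and 2; broker 2 gets partition 1.
    println!("{batches:?} unresolved = {no_leader:?}");
}
```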
§Example
let results = consumer
.offsets_for_times(&[("orders", 0), ("orders", 1)], 1_700_000_000_000)
.await;
for ((topic, partition), result) in &results {
match result {
Ok(offset) => println!("{topic}-{partition}: {offset}"),
Err(e) => eprintln!("{topic}-{partition}: error {e}"),
}
}
pub async fn offsets_for_times_for_topic(&self, topic: &str, timestamp: i64) -> Result<HashMap<PartitionId, Result<Offset>>>
Look up the earliest offset whose message timestamp is greater than or equal to the given timestamp, for every partition of a single topic.
Convenience wrapper around Consumer::offsets_for_times that resolves
the topic’s partitions from metadata so callers don’t have to list
them. Before deriving the partition list it always requests a metadata
refresh for the topic, so the results reflect current leader assignments;
the metadata layer skips the network call if its cached metadata is still fresh.
Returns Err if the topic cannot be found after the metadata refresh.
On success, each PartitionId maps to Ok(offset) or Err(e) —
see Consumer::offsets_for_times for per-partition semantics.
pub async fn fetch_watermarks(&self, topic: &str, partition: PartitionId) -> Result<(Offset, Offset)>
Fetch the low (log start) and high (latest) watermarks for a partition.
Issues two ListOffsets RPCs to the partition leader — one for the
earliest offset (timestamp = -2) and one for the latest
(timestamp = -1) — and returns (low, high). Both RPCs are issued
concurrently.
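Issuing the two lookups concurrently means the call costs roughly one round-trip instead of two. krafka's method is async; to stay self-contained, this sketch uses scoped threads instead, and `list_offsets` is a hypothetical stand-in for the RPC that returns canned values.

```rust
use std::thread;

/// Hypothetical stand-in for a ListOffsets RPC to the partition leader.
/// -2 asks for the earliest offset, -1 for the latest.
fn list_offsets(timestamp: i64) -> Result<i64, String> {
    match timestamp {
        -2 => Ok(100),   // pretend log start offset
        -1 => Ok(1_250), // pretend high watermark
        other => Err(format!("unexpected timestamp {other}")),
    }
}

/// Run both lookups at the same time and return (low, high).
fn fetch_watermarks() -> Result<(i64, i64), String> {
    thread::scope(|s| -> Result<(i64, i64), String> {
        let low = s.spawn(|| list_offsets(-2));
        let high = s.spawn(|| list_offsets(-1));
        // join() only fails if a thread panicked; `?` propagates RPC errors.
        let low = low.join().expect("low lookup panicked")?;
        let high = high.join().expect("high lookup panicked")?;
        Ok((low, high))
    })
}

fn main() {
    println!("{:?}", fetch_watermarks());
}
```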
pub async fn fetch_metadata(&self, topic: Option<&str>) -> Result<FetchMetadataResult>
Return a snapshot of cluster metadata (brokers and topics).
If topic is Some, only that topic is returned and the metadata
layer is asked to refresh that topic first (the network call is
skipped if cached metadata is still fresh). If None, a snapshot of
all currently cached topics is returned without triggering a refresh
(cached data may be partial or stale).
pub async fn poll(&self, timeout: Duration) -> Result<Vec<ConsumerRecord>>
Poll for new records.
Performs one fetch round-trip to each broker that leads an assigned
partition, waits up to timeout for records to arrive, and returns all
records received in that single round. max_poll_records (from
ConsumerConfig) caps the number of records returned.
When to use poll: for simple event loops where low per-call
overhead matters and a single broker round-trip per iteration is
acceptable. Processing happens synchronously in the loop; the fetch
latency equals your per-iteration latency.
When to use batch_recv instead: when you need
a fixed batch size for downstream batching (e.g., bulk database inserts
or transactional exactly-once pipelines). batch_recv drains the
internal buffer first and keeps fetching until max_records are
collected or the deadline elapses — it is the throughput-optimised
path.
§Example
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.build()
.await?;
consumer.subscribe(&["my-topic"]).await?;
loop {
let records = consumer.poll(std::time::Duration::from_secs(1)).await?;
for record in records {
println!("Received: {:?}", record);
}
}
pub async fn recv(&self) -> Result<ConsumerRecord, RecvError>
Receive the next record.
This is a convenience method that returns one record at a time.
Internally buffers records from poll() and returns them one by one,
ensuring no records are lost.
Returns Err(RecvError::Closed) when the consumer is shut down.
Returns Err(RecvError::Error(e)) on broker or network failures.
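The buffering behaviour can be pictured as a queue that is refilled from poll() only when it runs dry, so every record of a fetched batch is eventually handed out. This is a hypothetical std-only model, not krafka's code: records are plain `u64` offsets, the batch source is a closure standing in for poll(), and `None` from it stands in for `RecvError::Closed` (errors are not modelled).

```rust
use std::collections::VecDeque;

/// Hypothetical model of recv(): serve records one at a time from a buffer,
/// refilling it from a batch source (standing in for poll()).
struct Receiver<F: FnMut() -> Option<Vec<u64>>> {
    buffer: VecDeque<u64>,
    poll: F, // returns None once the consumer is closed
}

impl<F: FnMut() -> Option<Vec<u64>>> Receiver<F> {
    fn recv(&mut self) -> Option<u64> {
        loop {
            // Serve buffered records first so nothing from a batch is dropped.
            if let Some(record) = self.buffer.pop_front() {
                return Some(record);
            }
            // Buffer is empty: fetch the next batch, or stop if closed.
            let batch = (self.poll)()?;
            self.buffer.extend(batch);
        }
    }
}

fn main() {
    // Three poll rounds: a full batch, an empty round, then one record.
    let mut batches = vec![vec![0, 1, 2], vec![], vec![3]].into_iter();
    let mut rx = Receiver { buffer: VecDeque::new(), poll: move || batches.next() };
    while let Some(offset) = rx.recv() {
        println!("record at offset {offset}");
    }
}
```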
§Example
loop {
match consumer.recv().await {
Ok(record) => process(record),
Err(RecvError::Closed) => break,
Err(RecvError::Error(e)) => return Err(e),
_ => break, // future variants (non_exhaustive)
}
}
pub async fn batch_recv(&self, max_records: usize, timeout: Duration) -> Result<BatchRecvOutcome>
Collect up to max_records records, waiting at most timeout.
Returns as soon as max_records have been collected or timeout
elapses, whichever comes first.
If the consumer closes after some records were already buffered or fetched, those records are returned as a partial batch.
When to use batch_recv: for throughput-optimised pipelines that
need fixed-size batches — bulk database inserts, transactional
exactly-once produce-consume loops, or processing frameworks that
benefit from amortising per-batch overhead. batch_recv drains the
internal record buffer before issuing new fetches and keeps looping
until max_records are collected or the deadline elapses.
When to use poll instead: for simple event loops
that process records as they arrive and where the number of records
per iteration does not need to be bounded.
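The collection loop described above (drain the buffer, then fetch until the batch is full, the deadline passes, or the consumer closes) can be sketched with std types. This is a hypothetical model, not krafka's implementation: `Outcome` is a simplified stand-in for `BatchRecvOutcome` that omits `EmptyRequest` (the sketch simply requires `max_records > 0`), the fetch closure stands in for a broker round-trip, and returning `TimedOut` only when nothing was collected is an assumption of this sketch.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Simplified stand-in for BatchRecvOutcome (EmptyRequest not modelled).
#[derive(Debug, PartialEq)]
enum Outcome {
    Records(Vec<u64>),
    TimedOut,
    Closed,
}

/// `fetch` returns Some(batch) while open and None once the consumer closes.
fn batch_recv(
    buffer: &mut VecDeque<u64>,
    mut fetch: impl FnMut() -> Option<Vec<u64>>,
    max_records: usize,
    timeout: Duration,
) -> Outcome {
    assert!(max_records > 0, "sketch assumes a non-empty request");
    let deadline = Instant::now() + timeout;
    let mut out = Vec::with_capacity(max_records);
    loop {
        // 1. Drain already-buffered records before fetching more.
        while out.len() < max_records {
            match buffer.pop_front() {
                Some(r) => out.push(r),
                None => break,
            }
        }
        if out.len() == max_records {
            return Outcome::Records(out); // surplus stays buffered for next call
        }
        // 2. Deadline reached: return a partial batch, or TimedOut if empty.
        if Instant::now() >= deadline {
            return if out.is_empty() { Outcome::TimedOut } else { Outcome::Records(out) };
        }
        // 3. Fetch more; on close, hand back whatever was already collected.
        match fetch() {
            Some(batch) => buffer.extend(batch),
            None => return if out.is_empty() { Outcome::Closed } else { Outcome::Records(out) },
        }
    }
}

fn main() {
    let mut buffer = VecDeque::from(vec![10, 11]);
    let mut batches = vec![vec![12, 13, 14]].into_iter();
    let outcome = batch_recv(&mut buffer, move || batches.next(), 3, Duration::from_millis(200));
    println!("{outcome:?}, still buffered: {buffer:?}");
}
```

Keeping the surplus in the buffer is what lets the next call start with a drain instead of a network round-trip.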
§BatchRecvOutcome
The enum is #[non_exhaustive], so match arms must include a catch-all
(_ => {}) to remain forward-compatible with new variants.
§Errors
Returns Err on broker or network errors.
§Example
use std::time::Duration;
use krafka::consumer::BatchRecvOutcome;
loop {
    match consumer.batch_recv(100, Duration::from_millis(200)).await? {
        BatchRecvOutcome::Records(records) => {
            for record in records {
                println!("{}: {:?}", record.offset, record.value);
            }
        }
        BatchRecvOutcome::TimedOut => {}
        // `break` needs an enclosing loop: stop once the consumer is closed.
        BatchRecvOutcome::Closed => break,
        BatchRecvOutcome::EmptyRequest => {}
        _ => {} // required: BatchRecvOutcome is #[non_exhaustive]
    }
}
pub fn stream(&self) -> ConsumerStream<'_>
Create an async Stream of records.
Each element is a Result<ConsumerRecord>. The stream terminates
when the consumer is closed (returns None). Broker and network
errors are propagated as Some(Err(...)).
Internally delegates to recv(), which handles
polling, buffering, auto-commit, rebalancing, and shutdown.
§Example
use tokio_stream::StreamExt;
let mut stream = consumer.stream();
while let Some(result) = stream.next().await {
let record = result?;
println!("{}: {}", record.topic, record.offset);
}
pub async fn commit(&self) -> Result<()>
Commit offsets for all consumed records.
This stores the current offsets for assigned partitions only. When using a consumer group, this sends an OffsetCommit request to the group coordinator. Offsets for revoked partitions are excluded to avoid overwriting the new owner’s progress.
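The "assigned partitions only" rule is essentially a filter applied before the OffsetCommit request is built. A hypothetical std-only sketch, not krafka's code: `Tp` is a stand-in alias, `consumed` holds the stored positions, and which exact offset value gets committed is not modelled.

```rust
use std::collections::{HashMap, HashSet};

type Tp = (String, i32); // (topic, partition); i32 stands in for PartitionId

/// Keep only offsets for partitions this member currently owns, so a commit
/// cannot overwrite progress made by a revoked partition's new owner.
fn offsets_to_commit(consumed: &HashMap<Tp, i64>, assigned: &HashSet<Tp>) -> HashMap<Tp, i64> {
    consumed
        .iter()
        .filter(|(tp, _)| assigned.contains(*tp))
        .map(|(tp, offset)| (tp.clone(), *offset))
        .collect()
}

fn main() {
    let consumed = HashMap::from([
        (("orders".to_string(), 0), 1_000),
        (("orders".to_string(), 1), 2_000), // partition 1 was just revoked
    ]);
    let assigned = HashSet::from([("orders".to_string(), 0)]);
    println!("{:?}", offsets_to_commit(&consumed, &assigned));
}
```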
pub async fn commit_sync(&self) -> Result<()>
Commit offsets synchronously.
pub fn commit_async(&self) -> OffsetCommitHandle
Commit offsets asynchronously.
Spawns the offset commit as a background task.
Await the returned handle to observe offset-snapshot, transport, and
broker errors. Retriable coordinator failures use the same short
backoff loop as Consumer::commit. If the handle is dropped, the
task continues in the background and its result is discarded.
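The handle semantics mirror a detached background task: joining surfaces the result, dropping discards it. krafka's OffsetCommitHandle is awaited on an async runtime; this std-only sketch uses `std::thread::JoinHandle`, whose drop likewise detaches the thread. `send_offset_commit` is a hypothetical stand-in, and the retry loop is not modelled.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for one OffsetCommit round-trip.
fn send_offset_commit(fail: bool) -> Result<(), String> {
    if fail {
        Err("coordinator not available".to_string())
    } else {
        Ok(())
    }
}

/// Spawn the commit in the background and hand back a handle to its result.
fn commit_async(fail: bool) -> JoinHandle<Result<(), String>> {
    thread::spawn(move || send_offset_commit(fail))
}

fn main() {
    // Joining (the analogue of awaiting) surfaces the commit's error.
    let result = commit_async(true).join().expect("commit task panicked");
    println!("observed: {result:?}");

    // Dropping the handle detaches the task: it still runs, but its result
    // is discarded and no error is observed.
    drop(commit_async(true));
}
```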
pub async fn commit_with_metadata(&self, offsets: HashMap<TopicPartition, OffsetAndMetadata>) -> Result<()>
Commit specific offsets with metadata.
Allows committing offsets for specific topic-partitions with optional metadata. This is useful for checkpointing or storing application-specific context.
§Example
use std::collections::HashMap;
use krafka::consumer::{Consumer, OffsetAndMetadata, TopicPartition};
let mut offsets = HashMap::new();
offsets.insert(
TopicPartition::new("my-topic", 0),
OffsetAndMetadata::with_metadata(100, "checkpoint-abc123"),
);
consumer.commit_with_metadata(offsets).await?;
pub async fn position(&self, topic: &str, partition: PartitionId) -> Option<Offset>
Get the current position for a partition.
pub async fn assignment(&self) -> HashMap<String, Vec<PartitionId>>
Returns a snapshot of the current partition assignments.
The returned HashMap is a clone — modifying it has no effect on the
consumer’s internal state. Assignments may change asynchronously due
to rebalances.
pub async fn subscription(&self) -> HashSet<String>
Get all subscribed topics.
pub async fn current_lag(&self, topic: &str, partition: PartitionId) -> Option<u64>
Get the current lag for a specific partition.
Returns the difference between the high watermark (latest offset on the
broker) and the consumer’s current position. Returns None if the high
watermark or position is not yet known (e.g., no fetch has completed for
this partition).
This uses cached high watermarks from the most recent fetch response — no additional network calls are made.
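The lag computation itself is a subtraction over two optional cached values. A hypothetical sketch with plain `i64` offsets; clamping a negative difference to 0 (a position ahead of a stale cached watermark) is an assumption of this sketch, not documented krafka behaviour.

```rust
/// Lag = high watermark - position; None until both values are known.
/// Negative differences are clamped to 0 (assumption of this sketch).
fn current_lag(high_watermark: Option<i64>, position: Option<i64>) -> Option<u64> {
    let (hw, pos) = (high_watermark?, position?);
    Some(hw.saturating_sub(pos).max(0) as u64)
}

fn main() {
    println!("{:?}", current_lag(Some(1_250), Some(1_000))); // both cached
    println!("{:?}", current_lag(None, Some(1_000)));        // no fetch completed yet
}
```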
pub async fn lag(&self) -> LagResult
Get per-partition lag for all assigned partitions.
Returns a LagResult containing per-partition lag values and a list
of partitions whose cached high watermark is older than
crate::consumer::ConsumerConfigBuilder::lag_staleness_threshold
(default: 60 s).
Partitions whose high watermark or position is not yet known are
omitted from LagResult::lag entirely.
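Combining per-partition lag with the staleness check can be sketched as follows. This is a hypothetical model, not krafka's code: `Cached` and `LagSketch` are illustrative stand-ins for the internal cache and `LagResult`, each cached watermark carries the `Instant` it was recorded, and "older than" is read as a strict comparison against the threshold. Skipping staleness reporting for partitions whose position is unknown is an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

type Tp = (String, i32); // (topic, partition)

/// Hypothetical cached per-partition state.
struct Cached {
    position: Option<i64>,
    high_watermark: Option<(i64, Instant)>, // value and when it was cached
}

/// Simplified stand-in for LagResult.
#[derive(Debug, Default)]
struct LagSketch {
    lag: HashMap<Tp, u64>,
    stale: Vec<Tp>,
}

fn lag(partitions: &HashMap<Tp, Cached>, threshold: Duration, now: Instant) -> LagSketch {
    let mut out = LagSketch::default();
    for (tp, c) in partitions {
        // Partitions missing a position or a watermark are omitted entirely.
        let (pos, hw, cached_at) = match (c.position, c.high_watermark) {
            (Some(pos), Some((hw, at))) => (pos, hw, at),
            _ => continue,
        };
        out.lag.insert(tp.clone(), hw.saturating_sub(pos).max(0) as u64);
        // Flag watermarks cached longer ago than the threshold.
        if now.saturating_duration_since(cached_at) > threshold {
            out.stale.push(tp.clone());
        }
    }
    out
}

fn main() {
    let cached_at = Instant::now();
    let mut partitions = HashMap::new();
    partitions.insert(("orders".to_string(), 0), Cached { position: Some(900), high_watermark: Some((1_000, cached_at)) });
    partitions.insert(("orders".to_string(), 1), Cached { position: None, high_watermark: None });
    // Evaluate 90 s later against the documented 60 s default threshold.
    let result = lag(&partitions, Duration::from_secs(60), cached_at + Duration::from_secs(90));
    println!("{result:?}");
}
```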
pub async fn cached_beginning_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset>
Get the cached beginning (log start) offset for a partition.
Returns the earliest available offset on the broker, cached from
fetch responses. Returns None if no fetch has completed for this
partition yet. No network calls are made.
pub async fn cached_end_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset>
Get the cached end (high watermark) offset for a partition.
Returns the latest offset on the broker, cached from fetch responses.
Returns None if no fetch has completed for this partition yet.
No network calls are made.
pub async fn fetch_end_offset(&self, topic: &str, partition: PartitionId) -> Result<Offset>
Fetch the current end (high-watermark) offset for a single partition
with a live ListOffsets RPC.
Unlike cached_end_offset, which returns a
value from the most-recent fetch response, this method always contacts
the partition leader and returns a fresh value. Use it when staleness
is unacceptable — for example, before the first poll, when a partition
is paused, or when computing precise consumer-lag metrics.
§Errors
Returns Err if the topic name is invalid, no leader is available, or
the broker returns an error.
pub async fn is_caught_up(&self) -> bool
Returns true if the consumer’s current position has reached or
exceeded the high-watermark (end offset) on every assigned partition.
“Caught up” means there are no more records available to consume right now. This check uses cached high-watermarks updated on each successful fetch response; the values are not refreshed by this call.
Returns false if:
- Any assigned partition’s high-watermark has not yet been cached (i.e. no successful fetch has completed for that partition).
- The consumer’s position on any partition is behind its cached high-watermark.
For a precise check against fresh broker state, call
fetch_end_offset on each partition and
compare it to position.
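The two false conditions listed above reduce to a single `all` over the assignment. A hypothetical std-only sketch, not krafka's code: positions and watermarks are plain maps, and treating an unknown position as "not caught up" is an assumption. Note that `all` on an empty assignment returns `true`; the docs do not say what krafka returns in that case.

```rust
use std::collections::HashMap;

type Tp = (String, i32); // (topic, partition)

/// True only if every assigned partition has a cached high watermark and a
/// position that has reached it. Unknown positions count as "behind".
fn is_caught_up(
    assigned: &[Tp],
    positions: &HashMap<Tp, i64>,
    high_watermarks: &HashMap<Tp, i64>,
) -> bool {
    assigned.iter().all(|tp| match (positions.get(tp), high_watermarks.get(tp)) {
        (Some(pos), Some(hw)) => pos >= hw,
        _ => false,
    })
}

fn main() {
    let tp = ("orders".to_string(), 0);
    let positions = HashMap::from([(tp.clone(), 1_000)]);
    let high_watermarks = HashMap::from([(tp.clone(), 1_000)]);
    println!("{}", is_caught_up(&[tp], &positions, &high_watermarks));
}
```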
pub async fn unsubscribe(&self) -> Result<()>
Unsubscribe from all topics.
Notifies the rebalance listener, leaves the consumer group, clears stored offsets and the set of paused partitions, and drains the recv buffer.
If leaving the group fails, that error is returned; local state has been cleared regardless.
pub async fn pause(&self, topic: &str, partitions: &[PartitionId])
Pause consumption of specific partitions.
Paused partitions will be skipped during poll() until resumed.
pub async fn resume(&self, topic: &str, partitions: &[PartitionId])
Resume consumption of specific partitions.
Resumes polling for previously paused partitions.
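Pausing is conceptually a set of excluded partitions that each fetch round subtracts from the assignment. A hypothetical std-only model, not krafka's code: krafka's methods take `&self` and are async (so the set presumably lives behind interior mutability), whereas this sketch uses `&mut self` for brevity; `fetchable` is an illustrative name.

```rust
use std::collections::HashSet;

type Tp = (String, i32); // (topic, partition)

#[derive(Default)]
struct PauseState {
    paused: HashSet<Tp>,
}

impl PauseState {
    fn pause(&mut self, topic: &str, partitions: &[i32]) {
        for &p in partitions {
            self.paused.insert((topic.to_string(), p));
        }
    }

    fn resume(&mut self, topic: &str, partitions: &[i32]) {
        for &p in partitions {
            self.paused.remove(&(topic.to_string(), p));
        }
    }

    /// Partitions a fetch round would include: assigned minus paused.
    fn fetchable(&self, assigned: &[Tp]) -> Vec<Tp> {
        assigned.iter().filter(|tp| !self.paused.contains(*tp)).cloned().collect()
    }
}

fn main() {
    let assigned = vec![("orders".to_string(), 0), ("orders".to_string(), 1)];
    let mut state = PauseState::default();
    state.pause("orders", &[1]);
    println!("while paused: {:?}", state.fetchable(&assigned));
    state.resume("orders", &[1]);
    println!("after resume: {:?}", state.fetchable(&assigned));
}
```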
pub async fn paused_partitions(&self) -> HashSet<(String, PartitionId)>
Get the set of paused partitions.
pub fn update_seed_brokers(&self, servers: Vec<String>) -> Result<()>
Replace the bootstrap server list at runtime (KIP-899).
The new addresses are used on the next metadata refresh that falls back to bootstrap servers. Does not close existing connections.
§Errors
Returns an error if servers is empty.
pub async fn rebootstrap(&self)
Force a rebootstrap: close all connections, clear the metadata cache, and fall back to bootstrap servers (KIP-899).
pub async fn close(&self) -> Result<()>
Close the consumer.
Commits offsets (if auto-commit is enabled), leaves the consumer group,
and tears down connections. Calling close() more than once is a no-op.
If any cleanup step fails, the first error is returned, but only after local state and connections have been torn down.
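The two guarantees above (repeat calls are no-ops, and every cleanup step runs before the first error is reported) can be modelled with an atomic flag and a list of steps. This is a hypothetical sketch, not krafka's implementation: `Closer`, `Step`, and the step functions are illustrative stand-ins for committing, leaving the group, and closing connections.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

type Step = fn() -> Result<(), String>;

/// Hypothetical model of an idempotent close().
struct Closer {
    closed: AtomicBool,
    steps_run: AtomicU32, // how many cleanup steps actually executed
}

impl Closer {
    fn close(&self, cleanup: &[Step]) -> Result<(), String> {
        // swap returns the previous value: only the first caller sees `false`.
        if self.closed.swap(true, Ordering::AcqRel) {
            return Ok(()); // already closed: no-op
        }
        // Run every step even after a failure, remembering only the first error.
        let mut first_err = None;
        for step in cleanup {
            self.steps_run.fetch_add(1, Ordering::SeqCst);
            if let Err(e) = step() {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }
        first_err.map_or(Ok(()), Err)
    }
}

fn commit_offsets() -> Result<(), String> { Ok(()) }
fn leave_group() -> Result<(), String> { Err("leave group failed".to_string()) }
fn close_connections() -> Result<(), String> { Ok(()) }

fn main() {
    let consumer = Closer { closed: AtomicBool::new(false), steps_run: AtomicU32::new(0) };
    let steps: [Step; 3] = [commit_offsets, leave_group, close_connections];
    println!("first close: {:?}", consumer.close(&steps));
    println!("second close: {:?}", consumer.close(&steps));
}
```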
pub fn group_coordinator(&self) -> Option<&Arc<GroupCoordinator>>
Get the group coordinator, if one is configured.
pub fn metrics(&self) -> &Arc<ConsumerMetrics>
Get the shared consumer metrics handle.
pub fn connection_metrics(&self) -> Arc<ConnectionMetrics>
Get the shared connection metrics handle used by this consumer’s broker pool.
Trait Implementations
Auto Trait Implementations
impl !Freeze for Consumer
impl !RefUnwindSafe for Consumer
impl Send for Consumer
impl Sync for Consumer
impl Unpin for Consumer
impl UnsafeUnpin for Consumer
impl !UnwindSafe for Consumer
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.