Struct Consumer

pub struct Consumer { /* private fields */ }

A Kafka consumer.

Implementations§

impl Consumer

pub fn builder() -> ConsumerBuilder

Create a new consumer builder.

pub async fn subscribe(&self, topics: &[&str]) -> Result<()>

Subscribe to topics.

Replaces the current subscription with the given topics (matching the Kafka Java client’s replace semantics).
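With replace semantics, a second call discards any topic it does not repeat. A minimal std-only model of that behaviour; replace_subscription is a hypothetical helper, not krafka code:

```rust
use std::collections::HashSet;

/// Model of replace semantics: the topics passed in become the entire
/// subscription, so a topic subscribed earlier but omitted here is dropped.
pub fn replace_subscription(current: &mut HashSet<String>, topics: &[&str]) {
    *current = topics.iter().map(|t| t.to_string()).collect();
}
```

Under this model, adding one topic to an existing subscription means passing the full list again, for example the topics from subscription() plus the new one.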

pub async fn assign(&self, topic: &str, partitions: Vec<PartitionId>) -> Result<()>

Assign specific partitions manually.

Manual assignment and group subscription are mutually exclusive. This method returns an error if a group coordinator is active.

pub async fn seek(&self, topic: &str, partition: PartitionId, offset: Offset) -> Result<()>

Seek to a specific offset.

pub async fn seek_many(&self, offsets: &HashMap<(String, PartitionId), Offset>) -> Result<()>

Seek multiple partitions to the given offsets in one atomic update.

Equivalent to calling seek for each entry, but acquires the offset lock only once and recomputes lag metrics once at the end.
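The single-lock behaviour can be sketched with a Mutex<HashMap>. This is an illustration under assumptions, not krafka's implementation: Positions, get, and the recompute callback (standing in for the lag-metric refresh) are hypothetical, and i32/i64 stand in for PartitionId/Offset.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical position store keyed by (topic, partition).
pub struct Positions {
    inner: Mutex<HashMap<(String, i32), i64>>,
}

impl Positions {
    pub fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()) }
    }

    /// Apply all seeks while holding the lock once, so other lockers observe
    /// either none or all of the updates. `recompute` stands in for the
    /// lag-metric refresh and runs a single time, after every entry is applied.
    pub fn seek_many(
        &self,
        offsets: &HashMap<(String, i32), i64>,
        mut recompute: impl FnMut(&HashMap<(String, i32), i64>),
    ) {
        let mut positions = self.inner.lock().unwrap();
        for (tp, &offset) in offsets {
            positions.insert(tp.clone(), offset);
        }
        recompute(&*positions);
    }

    /// Current position for one partition, if any.
    pub fn get(&self, topic: &str, partition: i32) -> Option<i64> {
        self.inner
            .lock()
            .unwrap()
            .get(&(topic.to_string(), partition))
            .copied()
    }
}
```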

§Example
use std::collections::HashMap;

consumer
    .seek_many(&HashMap::from([
        (("orders".to_string(), 0), 1_000),
        (("orders".to_string(), 1), 2_000),
    ]))
    .await?;

pub async fn seek_to_beginning(&self, topic: &str, partition: PartitionId) -> Result<()>

Seek to the beginning.

pub async fn seek_to_end(&self, topic: &str, partition: PartitionId) -> Result<()>

Seek to the end (latest offset).

Sets the consumer position to the high watermark, so subsequent polls will only return new messages produced after this call.

This resolves the actual latest offset via a ListOffsets RPC to the partition leader. The Kafka Fetch API does not interpret special offset values like -1; those are only meaningful in the ListOffsets API.
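The split between the two APIs can be pictured as a small, hypothetical OffsetSpec enum: the sentinel values (-1 for latest, -2 for earliest, as used by fetch_watermarks) exist only on the ListOffsets side, and only the concrete offset the broker returns ends up in a Fetch request. The type and method names are illustrative assumptions, not part of krafka's API.

```rust
/// Hypothetical description of what to ask ListOffsets for. Only the resolved,
/// concrete offset that comes back is ever placed in a Fetch request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetSpec {
    /// Log start offset (low watermark).
    Earliest,
    /// High watermark, which is what seek_to_end resolves.
    Latest,
    /// Earliest offset whose timestamp is >= this value (ms since the epoch).
    Timestamp(i64),
}

impl OffsetSpec {
    /// Value for the timestamp field of a ListOffsets request partition.
    pub fn list_offsets_timestamp(self) -> i64 {
        match self {
            OffsetSpec::Latest => -1,
            OffsetSpec::Earliest => -2,
            OffsetSpec::Timestamp(ms) => ms,
        }
    }
}
```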

pub async fn offsets_for_times(&self, partitions: &[(&str, PartitionId)], timestamp: i64) -> HashMap<(String, PartitionId), Result<Offset>>

Look up the earliest offset whose message timestamp is greater than or equal to the given timestamp, for each listed (topic, partition).

Uses the ListOffsets API. Requests are batched by leader broker so each broker receives at most one RPC.

Every input partition appears in the returned map:

  • Ok(offset) — the broker returned a valid offset (-1 means no message exists at or after the timestamp for that partition).
  • Err(e) — a partition-level broker error (e.g. NotLeaderForPartition) or a transport failure prevented resolution for this partition.
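Batching by leader amounts to grouping the requested partitions by broker before sending. A std-only sketch, assuming broker ids and partition ids are i32 and that leader_of stands in for a metadata lookup; group_by_leader is a hypothetical name:

```rust
use std::collections::HashMap;

/// Group (topic, partition) pairs by leader broker id so each broker gets at
/// most one request. Partitions with no known leader are returned separately.
pub fn group_by_leader<'a>(
    partitions: &[(&'a str, i32)],
    leader_of: impl Fn(&str, i32) -> Option<i32>,
) -> (HashMap<i32, Vec<(&'a str, i32)>>, Vec<(&'a str, i32)>) {
    let mut by_leader: HashMap<i32, Vec<(&'a str, i32)>> = HashMap::new();
    let mut no_leader = Vec::new();
    for &(topic, partition) in partitions {
        match leader_of(topic, partition) {
            Some(broker) => by_leader.entry(broker).or_default().push((topic, partition)),
            None => no_leader.push((topic, partition)),
        }
    }
    (by_leader, no_leader)
}
```

In a design like this, partitions with no known leader would be reported as Err entries, consistent with every input partition appearing in the returned map.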
§Example
let results = consumer
    .offsets_for_times(&[("orders", 0), ("orders", 1)], 1_700_000_000_000)
    .await;
for ((topic, partition), result) in &results {
    match result {
        Ok(offset) => println!("{topic}-{partition}: {offset}"),
        Err(e) => eprintln!("{topic}-{partition}: error {e}"),
    }
}

pub async fn offsets_for_times_for_topic(&self, topic: &str, timestamp: i64) -> Result<HashMap<PartitionId, Result<Offset>>>

Look up the earliest offset whose message timestamp is greater than or equal to the given timestamp, for every partition of a single topic.

Convenience wrapper around Consumer::offsets_for_times that resolves the topic’s partitions from metadata so callers don’t have to list them. Before deriving the partition list it always asks the metadata layer to refresh the topic, so the results reflect the latest leader assignments; the metadata layer skips the network call if its cached metadata is still fresh.

Returns Err if the topic cannot be found after the metadata refresh. On success, each PartitionId maps to Ok(offset) or Err(e) — see Consumer::offsets_for_times for per-partition semantics.

pub async fn fetch_watermarks(&self, topic: &str, partition: PartitionId) -> Result<(Offset, Offset)>

Fetch the low (log start) and high (latest) watermarks for a partition.

Issues two ListOffsets RPCs to the partition leader — one for the earliest offset (timestamp = -2) and one for the latest (timestamp = -1) — and returns (low, high). Both RPCs are issued concurrently.

pub async fn fetch_metadata(&self, topic: Option<&str>) -> Result<FetchMetadataResult>

Return a snapshot of cluster metadata (brokers and topics).

If topic is Some, only that topic is returned and the metadata layer is asked to refresh that topic first (the network call is skipped if cached metadata is still fresh). If None, a snapshot of all currently cached topics is returned without triggering a refresh (cached data may be partial or stale).

pub async fn poll(&self, timeout: Duration) -> Result<Vec<ConsumerRecord>>

Poll for new records.

Performs one fetch round-trip to each broker that leads an assigned partition, waits up to timeout for records to arrive, and returns all records received in that single round. max_poll_records (from ConsumerConfig) caps the number of records returned.

When to use poll: for simple event loops where low per-call overhead matters and a single broker round-trip per iteration is acceptable. Processing happens synchronously in the loop; the fetch latency equals your per-iteration latency.

When to use batch_recv instead: when you need a fixed batch size for downstream batching (e.g., bulk database inserts or transactional exactly-once pipelines). batch_recv drains the internal buffer first and keeps fetching until max_records are collected or the deadline elapses — it is the throughput-optimised path.

§Example
use std::time::Duration;

use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .build()
    .await?;

consumer.subscribe(&["my-topic"]).await?;

loop {
    // Waits up to one second for records from a single fetch round.
    let records = consumer.poll(Duration::from_secs(1)).await?;
    for record in records {
        println!("Received: {:?}", record);
    }
}

pub async fn recv(&self) -> Result<ConsumerRecord, RecvError>

Receive the next record.

This is a convenience method that returns one record at a time. Internally it buffers the records returned by poll() and hands them out one by one, so records after the first in each batch are not lost.
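The buffering can be modelled with a VecDeque that is refilled only when empty. A simplified synchronous sketch: RecordBuffer and next_record are hypothetical, fetch_batch stands in for one poll() call, and unlike recv() this model returns None when a refill comes back empty instead of polling again.

```rust
use std::collections::VecDeque;

/// Hypothetical one-at-a-time reader over batch fetches.
pub struct RecordBuffer<T> {
    buffered: VecDeque<T>,
}

impl<T> RecordBuffer<T> {
    pub fn new() -> Self {
        Self { buffered: VecDeque::new() }
    }

    /// Hand out the oldest buffered record, refilling from fetch_batch only
    /// once the buffer is empty, so every record in a batch is returned.
    pub fn next_record(&mut self, fetch_batch: impl FnOnce() -> Vec<T>) -> Option<T> {
        if self.buffered.is_empty() {
            self.buffered.extend(fetch_batch());
        }
        self.buffered.pop_front()
    }
}
```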

Returns Err(RecvError::Closed) when the consumer is shut down. Returns Err(RecvError::Error(e)) on broker or network failures.

§Example
loop {
    match consumer.recv().await {
        Ok(record)               => process(record),
        Err(RecvError::Closed)   => break,
        Err(RecvError::Error(e)) => return Err(e),
        _ => break, // future variants (non_exhaustive)
    }
}

pub async fn batch_recv(&self, max_records: usize, timeout: Duration) -> Result<BatchRecvOutcome>

Collect up to max_records records, waiting at most timeout.

Returns as soon as max_records have been collected or timeout elapses, whichever comes first.

If the consumer closes after some records were already buffered or fetched, those records are returned as a partial batch.

When to use batch_recv: for throughput-optimised pipelines that need fixed-size batches — bulk database inserts, transactional exactly-once produce-consume loops, or processing frameworks that benefit from amortising per-batch overhead. batch_recv drains the internal record buffer before issuing new fetches and keeps looping until max_records are collected or the deadline elapses.

When to use poll instead: for simple event loops that process records as they arrive and where the number of records per iteration does not need to be bounded.
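The drain-then-fetch loop can be sketched with std types alone. This is an assumption-laden model, not krafka's code: collect_batch is hypothetical, fetch stands in for one broker fetch (in the real consumer it would await network I/O rather than return immediately), and records left over from an oversized fetch stay buffered for the next call.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Collect up to max_records items: drain `buffer` first, then call `fetch`
/// until the batch is full or `timeout` has elapsed.
pub fn collect_batch<T>(
    buffer: &mut VecDeque<T>,
    max_records: usize,
    timeout: Duration,
    mut fetch: impl FnMut() -> Vec<T>,
) -> Vec<T> {
    let deadline = Instant::now() + timeout;
    let mut batch = Vec::new();
    loop {
        // Drain already-buffered records before fetching more.
        while batch.len() < max_records {
            let Some(record) = buffer.pop_front() else { break };
            batch.push(record);
        }
        // Stop when full or out of time; a partial batch is still returned.
        if batch.len() >= max_records || Instant::now() >= deadline {
            return batch;
        }
        // Surplus records from this fetch remain in `buffer`.
        buffer.extend(fetch());
    }
}
```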

§BatchRecvOutcome

The enum is #[non_exhaustive], so match arms must include a catch-all (_ => {}) to remain forward-compatible with new variants.

§Errors

Returns Err on broker or network errors.

§Example
use std::time::Duration;

use krafka::consumer::BatchRecvOutcome;

loop {
    match consumer.batch_recv(100, Duration::from_millis(200)).await? {
        BatchRecvOutcome::Records(records) => {
            for record in records {
                println!("{}: {:?}", record.offset, record.value);
            }
        }
        BatchRecvOutcome::TimedOut => {}
        // The consumer was closed: leave the receive loop.
        BatchRecvOutcome::Closed => break,
        BatchRecvOutcome::EmptyRequest => {}
        _ => {} // required: BatchRecvOutcome is #[non_exhaustive]
    }
}

pub fn stream(&self) -> ConsumerStream<'_>

Create an async Stream of records.

Each element is a Result<ConsumerRecord>. The stream terminates when the consumer is closed (returns None). Broker and network errors are propagated as Some(Err(...)).

Internally delegates to recv(), which handles polling, buffering, auto-commit, rebalancing, and shutdown.

§Example
use tokio_stream::StreamExt;

let mut stream = consumer.stream();
while let Some(result) = stream.next().await {
    let record = result?;
    println!("{}: {}", record.topic, record.offset);
}

pub async fn commit(&self) -> Result<()>

Commit offsets for all consumed records.

This stores the current offsets for assigned partitions only. When using a consumer group, this sends an OffsetCommit request to the group coordinator. Offsets for revoked partitions are excluded to avoid overwriting the new owner’s progress.
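Excluding revoked partitions boils down to filtering the consumed positions by the current assignment before building the commit request. A minimal sketch with a hypothetical offsets_to_commit helper, using (topic, partition) tuples and i64 offsets:

```rust
use std::collections::{HashMap, HashSet};

/// Keep only offsets for currently assigned partitions, so a commit issued
/// after a rebalance never overwrites a new owner's progress.
pub fn offsets_to_commit(
    positions: &HashMap<(String, i32), i64>,
    assigned: &HashSet<(String, i32)>,
) -> HashMap<(String, i32), i64> {
    positions
        .iter()
        .filter(|(tp, _)| assigned.contains(*tp))
        .map(|(tp, &offset)| (tp.clone(), offset))
        .collect()
}
```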

pub async fn commit_sync(&self) -> Result<()>

Commit offsets synchronously.

pub fn commit_async(&self) -> OffsetCommitHandle

Commit offsets asynchronously.

Spawns the offset commit as a background task.

Await the returned handle to observe offset-snapshot, transport, and broker errors. Retriable coordinator failures use the same short backoff loop as Consumer::commit. If the handle is dropped, the task continues in the background and its result is discarded.
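The handle semantics mirror those of a spawned thread or task. As an analogy only (commit_async is async and returns an OffsetCommitHandle, not a thread handle), the sketch below uses std::thread: joining the JoinHandle observes the result, and dropping it detaches the thread, which still runs to completion. spawn_commit is a hypothetical name.

```rust
use std::thread::{self, JoinHandle};

/// Run `commit` on a background thread. Joining the handle yields the commit
/// result; dropping it detaches the thread and discards the result.
pub fn spawn_commit<F>(commit: F) -> JoinHandle<Result<(), String>>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    thread::spawn(commit)
}
```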

pub async fn commit_with_metadata(&self, offsets: HashMap<TopicPartition, OffsetAndMetadata>) -> Result<()>

Commit specific offsets with metadata.

Allows committing offsets for specific topic-partitions with optional metadata. This is useful for checkpointing or storing application-specific context.

§Example
use std::collections::HashMap;
use krafka::consumer::{Consumer, OffsetAndMetadata, TopicPartition};

let mut offsets = HashMap::new();
offsets.insert(
    TopicPartition::new("my-topic", 0),
    OffsetAndMetadata::with_metadata(100, "checkpoint-abc123"),
);
consumer.commit_with_metadata(offsets).await?;

pub async fn position(&self, topic: &str, partition: PartitionId) -> Option<Offset>

Get the current position for a partition.

pub async fn assignment(&self) -> HashMap<String, Vec<PartitionId>>

Returns a snapshot of the current partition assignments.

The returned HashMap is a clone — modifying it has no effect on the consumer’s internal state. Assignments may change asynchronously due to rebalances.

pub async fn subscription(&self) -> HashSet<String>

Get all subscribed topics.

pub async fn current_lag(&self, topic: &str, partition: PartitionId) -> Option<u64>

Get the current lag for a specific partition.

Returns the difference between the high watermark (latest offset on the broker) and the consumer’s current position. Returns None if the high watermark or position is not yet known (e.g., no fetch has completed for this partition).

This uses cached high watermarks from the most recent fetch response — no additional network calls are made.
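The lag arithmetic is a subtraction guarded by two Option values. A sketch with a hypothetical partition_lag function; clamping to zero when the position is past the watermark is an assumption, since the method description does not cover that case:

```rust
/// Lag = cached high watermark minus current position, or None if either
/// value is unknown. Negative differences are clamped to 0 (an assumption).
pub fn partition_lag(high_watermark: Option<i64>, position: Option<i64>) -> Option<u64> {
    let high_watermark = high_watermark?;
    let position = position?;
    Some(high_watermark.saturating_sub(position).max(0) as u64)
}
```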

pub async fn lag(&self) -> LagResult

Get per-partition lag for all assigned partitions.

Returns a LagResult containing per-partition lag values and a list of partitions whose cached high watermark is older than crate::consumer::ConsumerConfigBuilder::lag_staleness_threshold (default: 60 s).

Partitions whose high watermark or position is not yet known are omitted from LagResult::lag entirely.
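A std-only sketch of how such a result could be assembled from cached state. PartitionState and compute_lag are hypothetical, and flagging staleness for any partition with a cached watermark (even without a known position) is an assumption:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical cached state for one partition.
pub struct PartitionState {
    /// Last high watermark seen in a fetch response, and when it was seen.
    pub high_watermark: Option<(i64, Instant)>,
    /// The consumer's current position, if known.
    pub position: Option<i64>,
}

/// Returns (lag per partition, partitions whose watermark is older than the
/// threshold). Partitions missing a watermark or position get no lag entry.
pub fn compute_lag(
    states: &HashMap<(String, i32), PartitionState>,
    staleness_threshold: Duration,
    now: Instant,
) -> (HashMap<(String, i32), u64>, Vec<(String, i32)>) {
    let mut lag = HashMap::new();
    let mut stale = Vec::new();
    for (tp, state) in states {
        let Some((high_watermark, seen_at)) = state.high_watermark else {
            continue; // no fetch has completed for this partition yet
        };
        if now.duration_since(seen_at) > staleness_threshold {
            stale.push(tp.clone());
        }
        if let Some(position) = state.position {
            lag.insert(tp.clone(), high_watermark.saturating_sub(position).max(0) as u64);
        }
    }
    (lag, stale)
}
```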

pub async fn cached_beginning_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset>

Get the cached beginning (log start) offset for a partition.

Returns the earliest available offset on the broker, cached from fetch responses. Returns None if no fetch has completed for this partition yet. No network calls are made.

pub async fn cached_end_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset>

Get the cached end (high watermark) offset for a partition.

Returns the latest offset on the broker, cached from fetch responses. Returns None if no fetch has completed for this partition yet. No network calls are made.

pub async fn fetch_end_offset(&self, topic: &str, partition: PartitionId) -> Result<Offset>

Fetch the current end (high-watermark) offset for a single partition with a live ListOffsets RPC.

Unlike cached_end_offset, which returns a value from the most-recent fetch response, this method always contacts the partition leader and returns a fresh value. Use it when staleness is unacceptable — for example, before the first poll, when a partition is paused, or when computing precise consumer-lag metrics.

§Errors

Returns Err if the topic name is invalid, no leader is available, or the broker returns an error.

pub async fn is_caught_up(&self) -> bool

Returns true if the consumer’s current position has reached or exceeded the high-watermark (end offset) on every assigned partition.

“Caught up” means there are no more records available to consume right now. This check uses cached high-watermarks updated on each successful fetch response; the values are not refreshed by this call.

Returns false if:

  • Any assigned partition’s high-watermark has not yet been cached (i.e. no successful fetch has completed for that partition).
  • The consumer’s position on any partition is behind its cached high-watermark.

For a precise check against fresh broker state, call fetch_end_offset on each partition and compare it to position.
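The rules above reduce to an all() over the assigned partitions. A hypothetical model taking (cached high watermark, position) pairs; treating an unknown position as not caught up, and an empty assignment as caught up, are assumptions of this sketch:

```rust
/// Each entry is (cached high watermark, position) for one assigned
/// partition; None means the value is not known yet.
pub fn all_caught_up(partitions: &[(Option<i64>, Option<i64>)]) -> bool {
    partitions.iter().all(|&(high_watermark, position)| {
        matches!((high_watermark, position), (Some(hw), Some(pos)) if pos >= hw)
    })
}
```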

pub async fn unsubscribe(&self) -> Result<()>

Unsubscribe from all topics.

Notifies the rebalance listener, leaves the consumer group, clears stored offsets and the paused set, and drains the recv buffer.

If leaving the group fails, that error is returned, but local state is cleared regardless.

pub async fn pause(&self, topic: &str, partitions: &[PartitionId])

Pause consumption of specific partitions.

Paused partitions will be skipped during poll() until resumed.

pub async fn resume(&self, topic: &str, partitions: &[PartitionId])

Resume consumption of specific partitions.

Resumes polling for previously paused partitions.
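Pausing can be thought of as a filter applied when choosing which partitions to fetch. A sketch with a hypothetical fetchable_partitions helper; in this model pause and resume only change the paused set, not the assignment:

```rust
use std::collections::HashSet;

/// Partitions to include in the next fetch: the assignment minus the paused set.
pub fn fetchable_partitions(
    assigned: &[(String, i32)],
    paused: &HashSet<(String, i32)>,
) -> Vec<(String, i32)> {
    assigned
        .iter()
        .filter(|tp| !paused.contains(*tp))
        .cloned()
        .collect()
}
```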

pub async fn paused_partitions(&self) -> HashSet<(String, PartitionId)>

Get the set of paused partitions.

pub fn update_seed_brokers(&self, servers: Vec<String>) -> Result<()>

Replace the bootstrap server list at runtime (KIP-899).

The new addresses are used on the next metadata refresh that falls back to bootstrap servers. Does not close existing connections.

§Errors

Returns an error if servers is empty.

pub async fn rebootstrap(&self)

Force a rebootstrap: close all connections, clear the metadata cache, and fall back to bootstrap servers (KIP-899).

pub async fn close(&self) -> Result<()>

Close the consumer.

Commits offsets (if auto-commit is enabled), leaves the consumer group, and tears down connections. Calling close() more than once is a no-op.

If any cleanup step fails, the first error is returned; local state and connections are still torn down.
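The close contract (idempotent, run every cleanup step, report the first failure) can be sketched with an AtomicBool. Closer and the boxed cleanup steps are hypothetical stand-ins for committing offsets, leaving the group, and tearing down connections:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical idempotent shutdown guard.
pub struct Closer {
    closed: AtomicBool,
}

impl Closer {
    pub fn new() -> Self {
        Self { closed: AtomicBool::new(false) }
    }

    /// Run every step even if an earlier one fails; return the first error.
    pub fn close(&self, steps: Vec<Box<dyn FnOnce() -> Result<(), String>>>) -> Result<(), String> {
        // swap returns the previous value, so only the first caller sees false.
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(()); // already closed: no-op
        }
        let mut first_err = None;
        for step in steps {
            if let Err(e) = step() {
                first_err.get_or_insert(e);
            }
        }
        first_err.map_or(Ok(()), Err)
    }

    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }
}
```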

pub fn is_closed(&self) -> bool

Check if the consumer is closed.

pub fn group_coordinator(&self) -> Option<&Arc<GroupCoordinator>>

Get the group coordinator, if one is configured.

pub fn metrics(&self) -> &Arc<ConsumerMetrics>

Get the consumer's shared metrics handle.

pub fn connection_metrics(&self) -> Arc<ConnectionMetrics>

Get the shared connection metrics handle used by this consumer’s broker pool.

Trait Implementations§

impl Drop for Consumer

fn drop(&mut self)

Executes the destructor for this type.

fn pin_drop(self: Pin<&mut Self>)

This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.