Struct rdkafka::consumer::stream_consumer::StreamConsumer [−][src]
A high-level consumer with a Stream
interface.
This consumer doesn’t need to be polled explicitly. Extracting an item from
the stream returned by the stream
will
implicitly poll the underlying Kafka consumer.
If you activate the consumer group protocol by calling
subscribe
, the stream consumer will integrate with
librdkafka’s liveness detection as described in KIP-62. You must be sure
that you attempt to extract a message from the stream consumer at least
every max.poll.interval.ms
milliseconds, or librdkafka will assume that
the processing thread is wedged and leave the consumer groups.
Implementations
impl<C, R> StreamConsumer<C, R> where
C: ConsumerContext + 'static,
[src]
C: ConsumerContext + 'static,
pub fn stream(&self) -> MessageStream<'_, C, R>
[src]
Constructs a stream that yields messages from this consumer.
It is legal to have multiple live message streams for the same consumer, and to move those message streams across threads. Note, however, that the message streams share the same underlying state. A message received by the consumer will be delivered to only one of the live message streams. If you seek the underlying consumer, all message streams created from the consumer will begin to draw messages from the new position of the consumer.
If you want multiple independent views of a Kafka topic, create multiple consumers, not multiple message streams.
pub fn start(&self) -> MessageStream<'_, C, R>
[src]
use the more clearly named “StreamConsumer::stream” method instead
Constructs a stream that yields messages from this consumer.
pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError>
[src]
Receives the next message from the stream.
This method will block until the next message is available or an error
occurs. It is legal to call recv
from multiple threads simultaneously.
Note that this method is exactly as efficient as constructing a single-use message stream and extracting one message from it:
use futures::stream::StreamExt; consumer.stream().next().await.expect("MessageStream never returns None");
Trait Implementations
impl<C, R> Consumer<StreamConsumerContext<C>> for StreamConsumer<C, R> where
C: ConsumerContext,
[src]
C: ConsumerContext,
fn client(&self) -> &Client<StreamConsumerContext<C>>
[src]
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>
[src]
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>
[src]
fn unsubscribe(&self)
[src]
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>
[src]
fn seek<T: Into<Timeout>>(
&self,
topic: &str,
partition: i32,
offset: Offset,
timeout: T
) -> KafkaResult<()>
[src]
&self,
topic: &str,
partition: i32,
offset: Offset,
timeout: T
) -> KafkaResult<()>
fn commit(
&self,
topic_partition_list: &TopicPartitionList,
mode: CommitMode
) -> KafkaResult<()>
[src]
&self,
topic_partition_list: &TopicPartitionList,
mode: CommitMode
) -> KafkaResult<()>
fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()>
[src]
fn commit_message(
&self,
message: &BorrowedMessage<'_>,
mode: CommitMode
) -> KafkaResult<()>
[src]
&self,
message: &BorrowedMessage<'_>,
mode: CommitMode
) -> KafkaResult<()>
fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>
[src]
fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>
[src]
fn subscription(&self) -> KafkaResult<TopicPartitionList>
[src]
fn assignment(&self) -> KafkaResult<TopicPartitionList>
[src]
fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
Self: Sized,
[src]
T: Into<Timeout>,
Self: Sized,
fn committed_offsets<T>(
&self,
tpl: TopicPartitionList,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
[src]
&self,
tpl: TopicPartitionList,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
fn offsets_for_timestamp<T>(
&self,
timestamp: i64,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
Self: Sized,
[src]
&self,
timestamp: i64,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
Self: Sized,
fn offsets_for_times<T>(
&self,
timestamps: TopicPartitionList,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
Self: Sized,
[src]
&self,
timestamps: TopicPartitionList,
timeout: T
) -> KafkaResult<TopicPartitionList> where
T: Into<Timeout>,
Self: Sized,
fn position(&self) -> KafkaResult<TopicPartitionList>
[src]
fn fetch_metadata<T>(
&self,
topic: Option<&str>,
timeout: T
) -> KafkaResult<Metadata> where
T: Into<Timeout>,
Self: Sized,
[src]
&self,
topic: Option<&str>,
timeout: T
) -> KafkaResult<Metadata> where
T: Into<Timeout>,
Self: Sized,
fn fetch_watermarks<T>(
&self,
topic: &str,
partition: i32,
timeout: T
) -> KafkaResult<(i64, i64)> where
T: Into<Timeout>,
Self: Sized,
[src]
&self,
topic: &str,
partition: i32,
timeout: T
) -> KafkaResult<(i64, i64)> where
T: Into<Timeout>,
Self: Sized,
fn fetch_group_list<T>(
&self,
group: Option<&str>,
timeout: T
) -> KafkaResult<GroupList> where
T: Into<Timeout>,
Self: Sized,
[src]
&self,
group: Option<&str>,
timeout: T
) -> KafkaResult<GroupList> where
T: Into<Timeout>,
Self: Sized,
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
[src]
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>
[src]
fn context(&self) -> &Arc<C>
[src]
impl<R> FromClientConfig for StreamConsumer<DefaultConsumerContext, R> where
R: AsyncRuntime,
[src]
R: AsyncRuntime,
fn from_config(config: &ClientConfig) -> KafkaResult<Self>
[src]
impl<C, R> FromClientConfigAndContext<C> for StreamConsumer<C, R> where
C: ConsumerContext + 'static,
R: AsyncRuntime,
[src]
C: ConsumerContext + 'static,
R: AsyncRuntime,
Creates a new StreamConsumer
starting from a ClientConfig
.
fn from_config_and_context(
config: &ClientConfig,
context: C
) -> KafkaResult<Self>
[src]
config: &ClientConfig,
context: C
) -> KafkaResult<Self>
Auto Trait Implementations
impl<C = DefaultConsumerContext, R = TokioRuntime> !RefUnwindSafe for StreamConsumer<C, R>
impl<C, R> Send for StreamConsumer<C, R> where
R: Send,
R: Send,
impl<C, R> Sync for StreamConsumer<C, R> where
R: Sync,
R: Sync,
impl<C, R> Unpin for StreamConsumer<C, R> where
R: Unpin,
R: Unpin,
impl<C = DefaultConsumerContext, R = TokioRuntime> !UnwindSafe for StreamConsumer<C, R>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> From<T> for T
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,