Struct KafkaStreamConsumerAdapter

pub struct KafkaStreamConsumerAdapter(/* private fields */);

An adapter that makes it easier to implement Stream for StreamConsumer.

Implementations

impl KafkaStreamConsumerAdapter

pub fn new(csm: StreamConsumer) -> Self

Creates a new adapter that wraps the given StreamConsumer.

Methods from Deref<Target = StreamConsumer>

pub fn stream(&self) -> MessageStream<'_, C>

Constructs a stream that yields messages from this consumer.

It is legal to have multiple live message streams for the same consumer, and to move those message streams across threads. Note, however, that the message streams share the same underlying state. A message received by the consumer will be delivered to only one of the live message streams. If you seek the underlying consumer, all message streams created from the consumer will begin to draw messages from the new position of the consumer.

If you want multiple independent views of a Kafka topic, create multiple consumers, not multiple message streams.
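
The sharing behaviour described above can be modelled without Kafka at all. The following is a minimal sketch using only the standard library; `ToyConsumer`, `ToyStream`, `next_message` and `drain_with_two_streams` are hypothetical stand-ins invented for illustration, not types or methods of this crate or of rdkafka. It shows two live streams drawing from one shared queue, so each message reaches exactly one of them.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Stand-in for a consumer: one queue of fetched messages (hypothetical type).
struct ToyConsumer {
    queue: Mutex<VecDeque<u32>>,
}

/// Stand-in for a message stream: a view that borrows the consumer's shared queue.
struct ToyStream<'a> {
    consumer: &'a ToyConsumer,
}

impl ToyConsumer {
    fn new(messages: impl IntoIterator<Item = u32>) -> Self {
        ToyConsumer { queue: Mutex::new(messages.into_iter().collect()) }
    }

    /// Any number of streams may be live at once; all of them share `queue`.
    fn stream(&self) -> ToyStream<'_> {
        ToyStream { consumer: self }
    }
}

impl ToyStream<'_> {
    /// Takes the next message, if any. A message taken here is gone for every other stream.
    fn next_message(&mut self) -> Option<u32> {
        self.consumer.queue.lock().unwrap().pop_front()
    }
}

/// Alternates between two live streams and records which stream saw which message.
fn drain_with_two_streams(consumer: &ToyConsumer) -> (Vec<u32>, Vec<u32>) {
    let mut a = consumer.stream();
    let mut b = consumer.stream();
    let (mut seen_a, mut seen_b) = (Vec::new(), Vec::new());
    loop {
        match a.next_message() {
            Some(m) => seen_a.push(m),
            None => break,
        }
        match b.next_message() {
            Some(m) => seen_b.push(m),
            None => break,
        }
    }
    (seen_a, seen_b)
}
```

With messages 1 through 5 in the queue, the two streams together see every message once and neither sees a message the other received. Two separate `ToyConsumer` values would be the analogue of creating multiple consumers for independent views.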

pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError>

Receives the next message from the stream.

This method will block until the next message is available or an error occurs. It is legal to call recv from multiple threads simultaneously.

This method is cancellation safe.

Note that this method is exactly as efficient as constructing a single-use message stream and extracting one message from it:

use futures::stream::StreamExt;

consumer.stream().next().await.expect("MessageStream never returns None");

pub fn split_partition_queue( self: &Arc<StreamConsumer<C, R>>, topic: &str, partition: i32, ) -> Option<StreamPartitionQueue<C, R>>

Splits messages for the specified partition into their own stream.

If the topic or partition is invalid, returns None.

After calling this method, newly-fetched messages for the specified partition will be returned via StreamPartitionQueue::recv rather than StreamConsumer::recv. Note that there may be buffered messages for the specified partition that will continue to be returned by StreamConsumer::recv. For best results, call split_partition_queue before the first call to StreamConsumer::recv.

You must periodically await StreamConsumer::recv, even if no messages are expected, to serve events. Consider using a background task like:

tokio::spawn(async move {
    let message = stream_consumer.recv().await;
    panic!("main stream consumer queue unexpectedly received message: {:?}", message);
})

Note that calling Consumer::assign will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to assign.

Beware that this method is implemented for &Arc<Self>, not &self. You will need to wrap your consumer in an Arc in order to call this method. This design permits moving the partition queue to another thread while ensuring the partition queue does not outlive the consumer.
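
The `&Arc<Self>` receiver can be illustrated with a small standard-library sketch. `ToyConsumer`, `ToyPartitionQueue` and `split_on_worker_thread` are hypothetical stand-ins, and the internals shown here are an assumption about the general pattern, not the actual rdkafka implementation. The point is that the returned queue holds its own `Arc` clone, so it can move to another thread while keeping the consumer alive.

```rust
use std::sync::Arc;

/// Stand-in for the consumer (hypothetical type, not the rdkafka one).
struct ToyConsumer {
    partitions: i32,
}

/// Stand-in for a partition queue. It owns an `Arc` clone of the consumer,
/// so the consumer cannot be dropped while the queue is alive.
struct ToyPartitionQueue {
    consumer: Arc<ToyConsumer>,
    partition: i32,
}

impl ToyConsumer {
    /// Callable only through `&Arc<ToyConsumer>`; returns `None` for an invalid partition.
    fn split_partition_queue(self: &Arc<Self>, partition: i32) -> Option<ToyPartitionQueue> {
        if partition < 0 || partition >= self.partitions {
            return None;
        }
        Some(ToyPartitionQueue { consumer: Arc::clone(self), partition })
    }
}

/// Splits partition 1, moves the queue to a worker thread, and reports
/// the partition number plus the number of `Arc` owners seen from that thread.
fn split_on_worker_thread() -> (i32, usize) {
    let consumer = Arc::new(ToyConsumer { partitions: 3 });
    let queue = consumer.split_partition_queue(1).expect("partition 1 exists");
    // The queue is `Send + 'static`, so it can be moved into the spawned thread.
    let handle = std::thread::spawn(move || {
        (queue.partition, Arc::strong_count(&queue.consumer))
    });
    handle.join().unwrap()
}
```

Calling `split_partition_queue` on a plain `ToyConsumer` value (not wrapped in an `Arc`) does not compile, which mirrors the requirement stated above.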

Trait Implementations

impl AsMut<StreamConsumer> for KafkaStreamConsumerAdapter

fn as_mut(&mut self) -> &mut StreamConsumer

Converts this type into a mutable reference of the (usually inferred) input type.

impl AsRef<StreamConsumer> for KafkaStreamConsumerAdapter

fn as_ref(&self) -> &StreamConsumer

Converts this type into a shared reference of the (usually inferred) input type.

impl Borrow<StreamConsumer> for KafkaStreamConsumerAdapter

fn borrow(&self) -> &StreamConsumer

Immutably borrows from an owned value.

impl BorrowMut<StreamConsumer> for KafkaStreamConsumerAdapter

fn borrow_mut(&mut self) -> &mut StreamConsumer

Mutably borrows from an owned value.

impl Deref for KafkaStreamConsumerAdapter

type Target = StreamConsumer

The resulting type after dereferencing.

fn deref(&self) -> &Self::Target

Dereferences the value.

impl DerefMut for KafkaStreamConsumerAdapter

fn deref_mut(&mut self) -> &mut Self::Target

Mutably dereferences the value.

impl From<StreamConsumer> for KafkaStreamConsumerAdapter

fn from(csm: StreamConsumer) -> Self

Converts to this type from the input type.
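
The conversion and dereferencing impls listed in this section follow the usual newtype pattern. The sketch below reproduces that shape with standard-library traits only; `ToyConsumer`, `ToyAdapter`, `group_id` and `group_through_adapter` are hypothetical names used for illustration and are assumptions, not part of this crate.

```rust
use std::ops::{Deref, DerefMut};

/// Stand-in for `StreamConsumer` (hypothetical, for illustration only).
struct ToyConsumer {
    group: String,
}

impl ToyConsumer {
    fn group_id(&self) -> &str {
        &self.group
    }
}

/// Newtype wrapper in the same shape as the adapter: a single private field.
struct ToyAdapter(ToyConsumer);

impl ToyAdapter {
    fn new(csm: ToyConsumer) -> Self {
        ToyAdapter(csm)
    }
}

impl From<ToyConsumer> for ToyAdapter {
    fn from(csm: ToyConsumer) -> Self {
        ToyAdapter::new(csm)
    }
}

impl Deref for ToyAdapter {
    type Target = ToyConsumer;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for ToyAdapter {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

/// Builds an adapter with `into()` and calls a wrapped-type method through it.
fn group_through_adapter() -> String {
    let adapter: ToyAdapter = ToyConsumer { group: "orders".to_string() }.into();
    // `group_id` is defined on `ToyConsumer`; auto-deref finds it through the adapter.
    adapter.group_id().to_string()
}
```

This is why the methods of the wrapped consumer appear on the adapter under "Methods from Deref": method calls on the wrapper fall through to the target type.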

impl Stream<Result<OwnedMessage, KafkaError>> for KafkaStreamConsumerAdapter

async fn next(&mut self) -> Option<KafkaResult<OwnedMessage>>

Returns the next item.
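
A consumption loop over a trait with this method shape might look like the sketch below. Everything in it is an assumption made for illustration: the local `Stream<T>` trait only mirrors the signature listed above, `ToySource` stands in for the adapter, `String` stands in for both `OwnedMessage` and `KafkaError`, and `block_on` is a deliberately tiny busy-polling executor so that the example needs nothing beyond the standard library (Rust 1.75 or later for `async fn` in traits).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait with the same method shape as the one listed above.
trait Stream<T> {
    async fn next(&mut self) -> Option<T>;
}

/// Stand-in source that yields a fixed list of results.
struct ToySource {
    items: std::vec::IntoIter<Result<String, String>>,
}

impl Stream<Result<String, String>> for ToySource {
    async fn next(&mut self) -> Option<Result<String, String>> {
        self.items.next()
    }
}

/// Pulls items until the stream ends, collecting payloads and counting errors.
async fn drain<S: Stream<Result<String, String>>>(stream: &mut S) -> (Vec<String>, usize) {
    let mut payloads = Vec::new();
    let mut errors = 0;
    while let Some(item) = stream.next().await {
        match item {
            Ok(payload) => payloads.push(payload),
            Err(_) => errors += 1,
        }
    }
    (payloads, errors)
}

/// Waker that does nothing; sufficient for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for this sketch; use a real runtime in practice.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drives the loop over three items: two payloads and one error.
fn drain_toy_source() -> (Vec<String>, usize) {
    let items = vec![Ok("a".to_string()), Err("timeout".to_string()), Ok("b".to_string())];
    let mut source = ToySource { items: items.into_iter() };
    block_on(drain(&mut source))
}
```

In a real application the loop body would run inside an async runtime such as Tokio, and whether an error should end the loop or be skipped is a policy choice for the caller.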

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<P, T> Receiver for P
where P: Deref<Target = T> + ?Sized, T: ?Sized,

type Target = T

🔬This is a nightly-only experimental API. (arbitrary_self_types)
The target type on which the method may be called.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.