Struct fluvio::PartitionConsumer
An interface for consuming events from a particular partition.
There are two ways to consume events: by "fetching" events and by "streaming" events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a one-time batch operation: you receive all of the events in your range at once. When you consume events via streaming, you specify a starting Offset and receive an object that continuously yields new events as they arrive.
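The difference between the two modes can be illustrated with a small standard-library model. This is only a sketch: `ToyPartition` and the free function `stream` are hypothetical stand-ins invented for illustration, not Fluvio types, and a channel plays the role of records arriving later.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical in-memory stand-in for a partition: an append-only log of
/// records addressed by their offset (index). Not part of Fluvio.
struct ToyPartition {
    records: Vec<String>,
}

impl ToyPartition {
    /// "Fetch": a one-time batch of everything stored at or after `offset`.
    fn fetch(&self, offset: usize) -> Vec<String> {
        self.records.iter().skip(offset).cloned().collect()
    }
}

/// "Stream": replay stored records from `offset`, then keep yielding records
/// as a producer sends them. The iterator ends when the producer hangs up.
fn stream(
    partition: &ToyPartition,
    offset: usize,
    live: mpsc::Receiver<String>,
) -> impl Iterator<Item = String> {
    partition.fetch(offset).into_iter().chain(live)
}

fn main() {
    let partition = ToyPartition {
        records: vec!["a".to_string(), "b".to_string(), "c".to_string()],
    };

    // Fetch returns immediately with whatever is already stored.
    let batch = partition.fetch(1);
    println!("fetched: {:?}", batch);

    // Stream keeps yielding while the producer is still sending.
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for event in ["d", "e"] {
            tx.send(event.to_string()).unwrap();
        }
        // `tx` is dropped here, which closes the channel and ends the stream.
    });
    for event in stream(&partition, 1, rx) {
        println!("streamed: {}", event);
    }
    producer.join().unwrap();
}
```

In this model a fetch is a finished `Vec`, while a stream is an iterator that blocks until the next record arrives.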
Creating a Consumer
You can create a PartitionConsumer via the partition_consumer method on the Fluvio client, like so:
    let consumer = fluvio.partition_consumer("my-topic", 0).await?;
    let records = consumer.fetch(Offset::beginning()).await?;
Implementations
impl PartitionConsumer
pub async fn fetch<'_>(
    &'_ self,
    offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
Fetches events from a particular offset in the consumer's partition.
A "fetch" is one of the two ways to consume events in Fluvio. It is a batch request for records from a particular offset in the partition. You specify the position of records to retrieve using an Offset, and receive the events as a list of records.
If you want more fine-grained control over how records are fetched, check out the fetch_with_config method.
Example
    let response = consumer.fetch(Offset::beginning()).await?;
    for batch in response.records.batches {
        for record in batch.records {
            if let Some(record) = record.value.inner_value() {
                let string = String::from_utf8(record)
                    .expect("record should be a string");
                println!("Got record: {}", string);
            }
        }
    }
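The example decodes each record with `String::from_utf8(...).expect(...)`, which panics if a record is not valid UTF-8. A minimal standard-library sketch of a non-panicking alternative follows; `decode_record` is a hypothetical helper written for illustration and is not part of Fluvio.

```rust
/// Decode a record payload as UTF-8, reporting invalid bytes as an error
/// instead of panicking. Hypothetical helper, not a Fluvio API.
fn decode_record(bytes: Vec<u8>) -> Result<String, String> {
    String::from_utf8(bytes).map_err(|e| format!("record is not valid UTF-8: {}", e))
}

fn main() {
    // Two sample payloads: one valid UTF-8, one not.
    let payloads: Vec<Vec<u8>> = vec![b"hello".to_vec(), vec![0xff, 0xfe]];
    for payload in payloads {
        match decode_record(payload) {
            Ok(text) => println!("Got record: {}", text),
            Err(reason) => eprintln!("Skipping record: {}", reason),
        }
    }
}
```

If replacing invalid bytes is acceptable, `String::from_utf8_lossy` is another option that never fails.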
pub async fn fetch_with_config<'_>(
    &'_ self,
    offset: Offset,
    option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
Fetches events from a consumer using a specific fetching configuration.
Most of the time, you shouldn't need to use a custom ConsumerConfig. If you don't know what these settings do, try the simpler fetch method, which uses the default fetching settings.
A "fetch" is one of the two ways to consume events in Fluvio. It is a batch request for records from a particular offset in the partition. You specify the range of records to retrieve using an Offset and a ConsumerConfig, and receive the events as a list of records.
Example
    // Use custom fetching configurations
    let fetch_config = ConsumerConfig::default()
        .with_max_bytes(1000);
    let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?;
    for batch in response.records.batches {
        for record in batch.records {
            if let Some(record) = record.value.inner_value() {
                let string = String::from_utf8(record)
                    .expect("record should be a string");
                println!("Got record: {}", string);
            }
        }
    }
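This page does not spell out what `with_max_bytes` limits. Assuming it caps the total payload size returned by a single request, the effect of such a byte budget can be sketched with plain byte vectors. `take_within_budget` is a hypothetical function written for illustration, and the real behavior may differ, for example in how an oversized first record is treated.

```rust
/// Hypothetical model of a byte budget: take records in order until adding
/// the next one would exceed `max_bytes`. Not a Fluvio API.
fn take_within_budget(records: &[Vec<u8>], max_bytes: usize) -> Vec<Vec<u8>> {
    let mut used = 0;
    let mut out = Vec::new();
    for record in records {
        if used + record.len() > max_bytes {
            break; // the next record would not fit in the remaining budget
        }
        used += record.len();
        out.push(record.clone());
    }
    out
}

fn main() {
    // Three records of 400, 500 and 300 bytes against a 1000-byte budget.
    let records = vec![vec![0u8; 400], vec![0u8; 500], vec![0u8; 300]];
    let kept = take_within_budget(&records, 1000);
    println!("kept {} of {} records", kept.len(), records.len());
}
```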
pub async fn stream<'_>(
    &'_ self,
    offset: Offset
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
Continuously streams events from a particular offset in the consumer's partition.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.
If you want more fine-grained control over how records are streamed, check out the stream_with_config method.
Example
    use futures::StreamExt;
    let mut stream = consumer.stream(Offset::beginning()).await?;
    while let Some(Ok(record)) = stream.next().await {
        if let Some(bytes) = record.try_into_bytes() {
            let string = String::from_utf8_lossy(&bytes);
            println!("Got event: {}", string);
        }
    }
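Note that a `while let Some(Ok(record))` loop ends as soon as the stream yields an `Err`, and the error itself is discarded. The sketch below shows one way to surface that error. It uses a plain iterator of `Result` values as a stand-in for the async stream, and `consume` is a hypothetical helper rather than part of Fluvio.

```rust
/// Drain a sequence of results, collecting decoded events and stopping at the
/// first error, which is returned to the caller instead of being dropped.
/// Hypothetical helper; an iterator stands in for the async stream.
fn consume<I>(events: I) -> (Vec<String>, Option<String>)
where
    I: IntoIterator<Item = Result<Vec<u8>, String>>,
{
    let mut seen = Vec::new();
    for event in events {
        match event {
            Ok(bytes) => seen.push(String::from_utf8_lossy(&bytes).into_owned()),
            Err(err) => return (seen, Some(err)),
        }
    }
    (seen, None)
}

fn main() {
    // The second item simulates a failure in the middle of the stream.
    let events = vec![
        Ok(b"one".to_vec()),
        Err("connection lost".to_string()),
        Ok(b"two".to_vec()),
    ];
    let (seen, error) = consume(events);
    println!("events before stopping: {:?}", seen);
    if let Some(err) = error {
        eprintln!("stream ended with an error: {}", err);
    }
}
```

The same `match` on each item can be used inside an async loop over `stream.next().await`.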
pub async fn stream_with_config<'_>(
    &'_ self,
    offset: Offset,
    config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
Continuously streams events from a particular offset in the consumer's partition, using a specific streaming configuration.
Most of the time, you shouldn't need to use a custom ConsumerConfig. If you don't know what these settings do, try the simpler stream method, which uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and a ConsumerConfig, and periodically receive events, either individually or in batches.
Example
    use futures::StreamExt;
    // Use a custom max_bytes value in the config
    let fetch_config = ConsumerConfig::default()
        .with_max_bytes(1000);
    let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
    while let Some(Ok(record)) = stream.next().await {
        if let Some(bytes) = record.try_into_bytes() {
            let string = String::from_utf8_lossy(&bytes);
            println!("Got event: {}", string);
        }
    }
Auto Trait Implementations
impl !RefUnwindSafe for PartitionConsumer
impl Send for PartitionConsumer
impl Sync for PartitionConsumer
impl Unpin for PartitionConsumer
impl !UnwindSafe for PartitionConsumer
Blanket Implementations
impl<T> Any for T where
    T: 'static + ?Sized,
impl<T> Borrow<T> for T where
    T: ?Sized,
impl<T> BorrowMut<T> for T where
    T: ?Sized,
    pub fn borrow_mut(&mut self) -> &mut T
impl<T> Erased for T
impl<T> From<T> for T
impl<T> Instrument for T
    fn instrument(self, span: Span) -> Instrumented<Self>
    fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
    fn instrument(self, span: Span) -> Instrumented<Self>
    fn in_current_span(self) -> Instrumented<Self>
impl<T, U> Into<U> for T where
    U: From<T>,
impl<T, U> TryFrom<U> for T where
    U: Into<T>,
    type Error = Infallible
    The type returned in the event of a conversion error.
    pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,
    type Error = <U as TryFrom<T>>::Error
    The type returned in the event of a conversion error.
    pub fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>
impl<T> WithSubscriber for T
    fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
        S: Into<Dispatch>,