Struct fluvio::consumer::PartitionConsumer [−][src]
pub struct PartitionConsumer { /* fields omitted */ }
Expand description
An interface for consuming events from a particular partition
There are two ways to consume events: by “fetching” events
and by “streaming” events. Fetching involves specifying a
range of events that you want to consume via their Offset
.
A fetch is a sort of one-time batch operation: you’ll receive
all of the events in your range all at once. When you consume
events via Streaming, you specify a starting Offset
and
receive an object that will continuously yield new events as
they arrive.
Creating a Consumer
You can create a PartitionConsumer
via the partition_consumer
method on the Fluvio
client, like so:
let consumer = fluvio.partition_consumer("my-topic", 0).await?; let records = consumer.fetch(Offset::beginning()).await?;
Implementations
impl PartitionConsumer
[src]
impl PartitionConsumer
[src]pub async fn fetch(
&self,
offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
[src]
pub async fn fetch(
&self,
offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
[src]Fetches events from a particular offset in the consumer’s partition
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the position of records to retrieve
using an Offset
, and receive the events as a list of records.
If you want more fine-grained control over how records are fetched,
check out the fetch_with_config
method.
Example
let response = consumer.fetch(Offset::beginning()).await?; for batch in response.records.batches { for record in batch.records() { let string = String::from_utf8_lossy(record.value.as_ref()); println!("Got record: {}", string); } }
pub async fn fetch_with_config(
&self,
offset: Offset,
option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
[src]
pub async fn fetch_with_config(
&self,
offset: Offset,
option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
[src]Fetches events from a consumer using a specific fetching configuration
Most of the time, you shouldn’t need to use a custom ConsumerConfig
.
If you don’t know what these settings do, try checking out the simpler
fetch
method that uses the default fetching settings.
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the range of records to retrieve
using an Offset
and a ConsumerConfig
, and receive
the events as a list of records.
Example
// Use custom fetching configurations let fetch_config = ConsumerConfig::default() .with_max_bytes(1000); let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?; for batch in response.records.batches { for record in batch.records() { let string = String::from_utf8_lossy(record.value.as_ref()); println!("Got record: {}", string); } }
pub async fn stream(
&self,
offset: Offset
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
[src]
pub async fn stream(
&self,
offset: Offset
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
[src]Continuously streams events from a particular offset in the consumer’s partition
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset
and periodically receive events, either individually
or in batches.
If you want more fine-grained control over how records are streamed,
check out the stream_with_config
method.
Example
use futures::StreamExt; let mut stream = consumer.stream(Offset::beginning()).await?; while let Some(Ok(record)) = stream.next().await { let key = record.key().map(|key| String::from_utf8_lossy(key).to_string()); let value = String::from_utf8_lossy(record.value()).to_string(); println!("Got event: key={:?}, value={}", key, value); }
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
[src]
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
[src]Continuously streams events from a particular offset in the consumer’s partition
Most of the time, you shouldn’t need to use a custom ConsumerConfig
.
If you don’t know what these settings do, try checking out the simpler
[stream
] method that uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset
and a ConsumerConfig
, and periodically
receive events, either individually or in batches.
Example
use futures::StreamExt; // Use a custom max_bytes value in the config let fetch_config = ConsumerConfig::default() .with_max_bytes(1000); let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?; while let Some(Ok(record)) = stream.next().await { let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string()); let value = String::from_utf8_lossy(record.value()); println!("Got record: key={:?}, value={}", key, value); }
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, FluvioError>>, FluvioError>
[src]
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, FluvioError>>, FluvioError>
[src]Continuously streams batches of messages, starting an offset in the consumer’s partition
use futures::StreamExt; // Use a custom max_bytes value in the config let fetch_config = ConsumerConfig::default() .with_max_bytes(1000); let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?; while let Some(Ok(batch)) = stream.next().await { for record in batch.records() { let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string()); let value = String::from_utf8_lossy(record.value.as_ref()).to_string(); println!("Got record: key={:?}, value={}", key, value); } }
Auto Trait Implementations
impl !RefUnwindSafe for PartitionConsumer
impl Send for PartitionConsumer
impl Sync for PartitionConsumer
impl Unpin for PartitionConsumer
impl !UnwindSafe for PartitionConsumer
Blanket Implementations
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]pub fn borrow_mut(&mut self) -> &mut T
[src]
pub fn borrow_mut(&mut self) -> &mut T
[src]Mutably borrows from an owned value. Read more
impl<T> Instrument for T
[src]
impl<T> Instrument for T
[src]fn instrument(self, span: Span) -> Instrumented<Self>
[src]
fn instrument(self, span: Span) -> Instrumented<Self>
[src]Instruments this type with the provided Span
, returning an
Instrumented
wrapper. Read more
fn in_current_span(self) -> Instrumented<Self>
[src]
fn in_current_span(self) -> Instrumented<Self>
[src]impl<T> AsyncConnector for T where
T: Send + Sync,
[src]
T: Send + Sync,