Struct fluvio::consumer::PartitionConsumer
pub struct PartitionConsumer<P = SpuPool> { /* fields omitted */ }
An interface for consuming events from a particular partition.
There are two ways to consume events: by “fetching” events
and by “streaming” events. Fetching involves specifying a
range of events that you want to consume via their Offset.
A fetch is a sort of one-time batch operation: you’ll receive
all of the events in your range all at once. When you consume
events via streaming, you specify a starting Offset and
receive an object that will continuously yield new events as
they arrive.
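The difference between the two modes can be sketched with the standard library alone. This is only an illustrative model, not Fluvio code: `Log`, `fetch_from`, and `stream_from` are hypothetical names, a `Vec<String>` stands in for a partition, and a channel stands in for the stream object. Unlike a real stream, the model ends once the stored records run out.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Hypothetical stand-in for a partition: an append-only list of records.
// None of these names come from the Fluvio API.
type Log = Vec<String>;

/// "Fetch"-style read: return everything from `offset` onward as one batch.
fn fetch_from(log: &Log, offset: usize) -> Vec<String> {
    log.iter().skip(offset).cloned().collect()
}

/// "Stream"-style read: hand back a receiver that yields records one by one
/// as a background producer sends them.
fn stream_from(log: Log, offset: usize) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for record in log.into_iter().skip(offset) {
            // Stop early if the consumer dropped the receiver.
            if tx.send(record).is_err() {
                break;
            }
        }
    });
    rx
}
```

With a log of `a`, `b`, `c` and an offset of 1, `fetch_from` returns `b` and `c` in a single `Vec`, while iterating the receiver from `stream_from` yields the same two records one at a time.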
Creating a Consumer
You can create a PartitionConsumer via the partition_consumer
method on the Fluvio client, like so:
let consumer = fluvio.partition_consumer("my-topic", 0).await?;
let records = consumer.fetch(Offset::beginning()).await?;
Implementations
pub async fn fetch(
&self,
offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
Deprecated since 0.9.2: Use ‘stream’ instead
Fetches events from a particular offset in the consumer’s partition
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the position of records to retrieve
using an Offset, and receive the events as a list of records.
If you want more fine-grained control over how records are fetched,
check out the fetch_with_config method.
Example
let response = consumer.fetch(Offset::beginning()).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}
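The example above decodes record values with `String::from_utf8_lossy`, which never fails: invalid byte sequences are replaced with U+FFFD. The sketch below contrasts that with strict decoding for payloads that might not be text. The helper names `display_value` and `strict_value` are hypothetical and are not part of Fluvio.

```rust
use std::borrow::Cow;

/// Decode a record payload for display, replacing invalid UTF-8 with U+FFFD.
/// Borrows the input when it is already valid UTF-8.
fn display_value(bytes: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(bytes)
}

/// Decode strictly, returning `None` when the payload is not valid UTF-8.
fn strict_value(bytes: &[u8]) -> Option<&str> {
    std::str::from_utf8(bytes).ok()
}
```

Lossy decoding suits logging and debugging output. Strict decoding is the safer choice when a corrupted payload should be detected rather than silently altered.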
pub async fn fetch_with_config(
&self,
offset: Offset,
option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
Deprecated since 0.9.2: Use ‘stream_with_config’ instead
Fetches events from a consumer using a specific fetching configuration
Most of the time, you shouldn’t need to use a custom ConsumerConfig.
If you don’t know what these settings do, try checking out the simpler
fetch method that uses the default fetching settings.
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the range of records to retrieve
using an Offset and a ConsumerConfig, and receive
the events as a list of records.
Example
// Use custom fetching configurations
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}
Continuously streams events from a particular offset in the consumer’s partition
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and periodically receive events, either
individually or in batches.
If you want more fine-grained control over how records are streamed,
check out the stream_with_config method.
Example
use futures::StreamExt;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got event: key={:?}, value={}", key, value);
}
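Note that a `while let Some(Ok(record))` loop ends quietly at the first `Err` item, so the error is never seen. The sketch below shows the same pattern next to one that reports the error, using a plain iterator of `Result` values in place of an async stream. Both function names, and the use of `String` for the record and error types, are assumptions made for illustration only.

```rust
/// Consume items until the first error, the way `while let Some(Ok(..))` does.
/// Returns the number of records seen; the error itself is discarded.
fn consume_until_error<I>(items: I) -> usize
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut iter = items.into_iter();
    let mut seen = 0;
    while let Some(Ok(_record)) = iter.next() {
        seen += 1;
    }
    seen
}

/// Consume items, returning the first error to the caller instead of dropping it.
fn consume_reporting_error<I>(items: I) -> Result<usize, String>
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut seen = 0;
    for item in items {
        // `?` propagates the error rather than ending the loop silently.
        let _record = item?;
        seen += 1;
    }
    Ok(seen)
}
```

The same idea carries over to an async stream by matching on each item from `stream.next().await` and handling the `Err` arm explicitly.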
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError>
Continuously streams events from a particular offset in the consumer’s partition
Most of the time, you shouldn’t need to use a custom ConsumerConfig.
If you don’t know what these settings do, try checking out the simpler
stream method that uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and a ConsumerConfig, and periodically
receive events, either individually or in batches.
Example
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(record)) = stream.next().await {
    let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value());
    println!("Got record: key={:?}, value={}", key, value);
}
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, ErrorCode>>, FluvioError>
Continuously streams batches of messages, starting at an offset in the consumer’s partition
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(batch)) = stream.next().await {
    for record in batch.records() {
        let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
        let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
        println!("Got record: key={:?}, value={}", key, value);
    }
}
Auto Trait Implementations
impl<P> RefUnwindSafe for PartitionConsumer<P> where
P: RefUnwindSafe,
impl<P> Send for PartitionConsumer<P> where
P: Send + Sync,
impl<P> Sync for PartitionConsumer<P> where
P: Send + Sync,
impl<P> Unpin for PartitionConsumer<P>
impl<P> UnwindSafe for PartitionConsumer<P> where
P: RefUnwindSafe,
Blanket Implementations
Mutably borrows from an owned value.
Instruments this type with the provided Span, returning an Instrumented wrapper.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.