Struct fluvio::consumer::PartitionConsumer
pub struct PartitionConsumer { /* fields omitted */ }
An interface for consuming events from a particular partition
There are two ways to consume events: by “fetching” events
and by “streaming” events. Fetching involves specifying a
range of events that you want to consume via their Offset.
A fetch is a sort of one-time batch operation: you’ll receive
all of the events in your range all at once. When you consume
events via streaming, you specify a starting Offset and
receive an object that will continuously yield new events as
they arrive.
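The difference between the two modes can be sketched with a toy model that uses only the standard library. This is not the Fluvio client: `Partition`, `produce`, `fetch_from`, and `stream_from` are hypothetical names, an in-memory `Vec` stands in for the partition, and an `mpsc` channel stands in for the object that yields new events.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy stand-in for a partition: an append-only log of records.
/// (Hypothetical type, not part of the fluvio crate.)
pub struct Partition {
    log: Vec<String>,
    subscribers: Vec<Sender<String>>,
}

impl Partition {
    pub fn new() -> Self {
        Partition { log: Vec::new(), subscribers: Vec::new() }
    }

    /// Appends a record and forwards it to every live stream.
    pub fn produce(&mut self, record: &str) {
        self.log.push(record.to_string());
        // Drop subscribers whose receiving end has gone away.
        self.subscribers.retain(|tx| tx.send(record.to_string()).is_ok());
    }

    /// "Fetch": a one-time batch of everything stored at or after `offset`.
    pub fn fetch_from(&self, offset: usize) -> Vec<String> {
        self.log.iter().skip(offset).cloned().collect()
    }

    /// "Stream": replays stored records from `offset`, then keeps
    /// delivering records produced later.
    pub fn stream_from(&mut self, offset: usize) -> Receiver<String> {
        let (tx, rx) = channel();
        for record in self.log.iter().skip(offset) {
            // The receiver is still in scope, so this send cannot fail.
            tx.send(record.clone()).expect("receiver is alive");
        }
        self.subscribers.push(tx);
        rx
    }
}
```

In this model, a batch returned by `fetch_from` never grows after the call returns, while the receiver returned by `stream_from` also sees records produced afterwards.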
Creating a Consumer
You can create a PartitionConsumer via the partition_consumer
method on the Fluvio client, like so:
let consumer = fluvio.partition_consumer("my-topic", 0).await?;
let records = consumer.fetch(Offset::beginning()).await?;
Implementations
pub async fn fetch(
&self,
offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
👎 Deprecated since 0.9.2: Use ‘stream’ instead
Fetches events from a particular offset in the consumer’s partition
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the position of records to retrieve
using an Offset, and receive the events as a list of records.
If you want more fine-grained control over how records are fetched,
check out the fetch_with_config method.
Example
let response = consumer.fetch(Offset::beginning()).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}
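The example above decodes each record value with `String::from_utf8_lossy`, which never fails but replaces invalid byte sequences with U+FFFD. The sketch below contrasts that with strict decoding using only the standard library; `decode_lossy` and `decode_strict` are hypothetical helper names, not part of the fluvio crate.

```rust
use std::borrow::Cow;
use std::str::Utf8Error;

/// Lossy decoding, as in the example above: never fails, and replaces
/// each invalid byte sequence with U+FFFD (the replacement character).
pub fn decode_lossy(value: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(value)
}

/// Strict decoding: returns an error instead of altering the payload.
pub fn decode_strict(value: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(value)
}
```

Lossy decoding is convenient for printing; strict decoding may be the better fit when records can carry binary payloads that must not be silently altered.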
pub async fn fetch_with_config(
&self,
offset: Offset,
option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>
👎 Deprecated since 0.9.2: Use ‘stream_with_config’ instead
Fetches events from a consumer using a specific fetching configuration
Most of the time, you shouldn’t need to use a custom ConsumerConfig.
If you don’t know what these settings do, try checking out the simpler
fetch method that uses the default fetching settings.
A “fetch” is one of the two ways to consume events in Fluvio.
It is a batch request for records from a particular offset in
the partition. You specify the range of records to retrieve
using an Offset and a ConsumerConfig, and receive
the events as a list of records.
Example
// Use custom fetching configurations
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}
pub async fn stream(
&self,
offset: Offset
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
Continuously streams events from a particular offset in the consumer’s partition
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and periodically receive events, either individually
or in batches.
If you want more fine-grained control over how records are streamed,
check out the stream_with_config method.
Example
use futures::StreamExt;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got event: key={:?}, value={}", key, value);
}
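Because the stream yields `Result` items, the `while let Some(Ok(record))` pattern in the example ends the loop at the first `Err` without reporting it. The sketch below shows the difference using a plain iterator as a stand-in for the async stream; `Drained`, `drain_ignoring_errors`, and `drain_reporting_errors` are hypothetical names used only for illustration.

```rust
/// Outcome of draining a sequence of results.
#[derive(Debug, PartialEq)]
pub struct Drained<T, E> {
    pub records: Vec<T>,
    pub error: Option<E>,
}

/// Mirrors `while let Some(Ok(record)) = ...`: stops at the first error
/// and discards it, so the caller cannot tell an error from end-of-stream.
pub fn drain_ignoring_errors<T, E>(items: impl IntoIterator<Item = Result<T, E>>) -> Vec<T> {
    let mut iter = items.into_iter();
    let mut records = Vec::new();
    while let Some(Ok(record)) = iter.next() {
        records.push(record);
    }
    records
}

/// Matches on each item instead, so the first error is returned to the caller.
pub fn drain_reporting_errors<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
) -> Drained<T, E> {
    let mut records = Vec::new();
    for item in items {
        match item {
            Ok(record) => records.push(record),
            Err(error) => return Drained { records, error: Some(error) },
        }
    }
    Drained { records, error: None }
}
```

With the real stream, the same `match` would sit inside a `while let Some(item) = stream.next().await` loop, where `item` is a `Result<Record, FluvioError>`.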
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>
Continuously streams events from a particular offset in the consumer’s partition, using a specific streaming configuration
Most of the time, you shouldn’t need to use a custom ConsumerConfig.
If you don’t know what these settings do, try checking out the simpler
stream method that uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and a ConsumerConfig, and periodically
receive events, either individually or in batches.
Example
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(record)) = stream.next().await {
    let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value());
    println!("Got record: key={:?}, value={}", key, value);
}
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, FluvioError>>, FluvioError>
Continuously streams batches of messages, starting at an offset in the consumer’s partition
Example
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(batch)) = stream.next().await {
    for record in batch.records() {
        let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
        let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
        println!("Got record: key={:?}, value={}", key, value);
    }
}
Auto Trait Implementations
impl !RefUnwindSafe for PartitionConsumer
impl Send for PartitionConsumer
impl Sync for PartitionConsumer
impl Unpin for PartitionConsumer
impl !UnwindSafe for PartitionConsumer
Blanket Implementations
Mutably borrows from an owned value.
Instruments this type with the provided Span, returning an Instrumented wrapper.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.