Struct fluvio::consumer::PartitionConsumer
source · [−]pub struct PartitionConsumer<P = SpuPool> { /* private fields */ }
Expand description
An interface for consuming events from a particular partition
Implementations
Continuously streams events from a particular offset in the consumer’s partition
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset
and periodically receive events, either individually
or in batches.
If you want more fine-grained control over how records are streamed,
check out the stream_with_config
method.
Example
use futures::StreamExt;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
let value = String::from_utf8_lossy(record.value()).to_string();
println!("Got event: key={:?}, value={}", key, value);
}
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError>
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError>
Continuously streams events from a particular offset in the consumer’s partition
Most of the time, you shouldn’t need to use a custom ConsumerConfig
.
If you don’t know what these settings do, try checking out the simpler
PartitionConsumer::stream
method that uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in a partition,
beginning at a particular offset. You specify the starting point of the
stream using an Offset
and a ConsumerConfig
, and periodically
receive events, either individually or in batches.
Example
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
.max_bytes(1000)
.build()?;
let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(record)) = stream.next().await {
let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
let value = String::from_utf8_lossy(record.value());
println!("Got record: key={:?}, value={}", key, value);
}
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, ErrorCode>>, FluvioError>
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Batch, ErrorCode>>, FluvioError>
Continuously streams batches of messages, starting an offset in the consumer’s partition
use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
.max_bytes(1000)
.build()?;
let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(batch)) = stream.next().await {
for record in batch.records() {
let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
println!("Got record: key={:?}, value={}", key, value);
}
}
Auto Trait Implementations
impl<P> RefUnwindSafe for PartitionConsumer<P> where
P: RefUnwindSafe,
impl<P> Send for PartitionConsumer<P> where
P: Send + Sync,
impl<P> Sync for PartitionConsumer<P> where
P: Send + Sync,
impl<P> Unpin for PartitionConsumer<P>
impl<P> UnwindSafe for PartitionConsumer<P> where
P: RefUnwindSafe,
Blanket Implementations
Mutably borrows from an owned value. Read more
Attaches the provided Subscriber
to this type, returning a
WithDispatch
wrapper. Read more
Attaches the current default Subscriber
to this type, returning a
WithDispatch
wrapper. Read more