Struct fluvio::consumer::MultiplePartitionConsumer
pub struct MultiplePartitionConsumer { /* fields omitted */ }
Implementations
Continuously streams events from a particular offset in the selected partitions.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in the selected partitions,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and periodically receive events, either individually
or in batches.
If you want more fine-grained control over how records are streamed,
check out the stream_with_config method.
Example
use futures::StreamExt;

let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got event: key={:?}, value={}", key, value);
}
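One detail of the loop above is worth spelling out: `while let Some(Ok(record))` ends as soon as the stream yields an `Err`, and that error is dropped without being reported. The following is a minimal sketch of the difference, using a plain iterator in place of the async stream. The `Record` and `ErrorCode` types here are simplified stand-ins invented for illustration, not Fluvio's real definitions, and both function names are hypothetical.

```rust
// Stand-in types for illustration only; these are NOT Fluvio's real
// `Record` and `ErrorCode` definitions.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub value: Vec<u8>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ErrorCode {
    Other(String),
}

/// Drains a sequence of results the way `while let Some(Ok(..))` does:
/// the loop stops at the first `Err` and the error itself is discarded.
pub fn drain_ignoring_errors<I>(items: I) -> Vec<Record>
where
    I: IntoIterator<Item = Result<Record, ErrorCode>>,
{
    let mut seen = Vec::new();
    let mut iter = items.into_iter();
    while let Some(Ok(record)) = iter.next() {
        seen.push(record);
    }
    seen
}

/// Drains the same sequence but hands the first error back to the caller
/// instead of silently ending the loop.
pub fn drain_reporting_errors<I>(items: I) -> Result<Vec<Record>, ErrorCode>
where
    I: IntoIterator<Item = Result<Record, ErrorCode>>,
{
    let mut seen = Vec::new();
    for item in items {
        // `?` returns early with the error when `item` is `Err`.
        seen.push(item?);
    }
    Ok(seen)
}
```

With a real stream, the same idea would likely take the form of matching on each item inside `while let Some(item) = stream.next().await` and handling the `Err` arm explicitly.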
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError>
Continuously streams events from a particular offset in the selected partitions.
Most of the time, you shouldn’t need to use a custom ConsumerConfig.
If you don’t know what these settings do, try checking out the simpler
stream method that uses the default streaming settings.
Streaming is one of the two ways to consume events in Fluvio.
It is a continuous request for new records arriving in the selected partitions,
beginning at a particular offset. You specify the starting point of the
stream using an Offset and a ConsumerConfig, and periodically
receive events, either individually or in batches.
Example
use futures::StreamExt;

// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::builder()
    .max_bytes(1000)
    .build()?;
let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(record)) = stream.next().await {
    let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value());
    println!("Got record: key={:?}, value={}", key, value);
}
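Both examples decode keys and values with `String::from_utf8_lossy`, which never fails: any byte sequence that is not valid UTF-8 is replaced with U+FFFD. The sketch below isolates that step in a small helper. The name `decode_record` is hypothetical, and its parameter types are an assumption based on how `record.key()` and `record.value()` are used in the examples (an optional byte slice and a byte slice).

```rust
/// Converts a record's optional key and its value into owned strings.
///
/// Assumes the key arrives as `Option<&[u8]>` and the value as `&[u8]`.
/// Invalid UTF-8 is replaced with U+FFFD instead of producing an error.
pub fn decode_record(key: Option<&[u8]>, value: &[u8]) -> (Option<String>, String) {
    let key = key.map(|k| String::from_utf8_lossy(k).into_owned());
    let value = String::from_utf8_lossy(value).into_owned();
    (key, value)
}
```

If a payload must be valid UTF-8, `std::str::from_utf8` returns a `Result` and may be a better fit than the lossy conversion.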
Auto Trait Implementations
impl !RefUnwindSafe for MultiplePartitionConsumer
impl Send for MultiplePartitionConsumer
impl Sync for MultiplePartitionConsumer
impl Unpin for MultiplePartitionConsumer
impl !UnwindSafe for MultiplePartitionConsumer
Blanket Implementations
Mutably borrows from an owned value.
Instruments this type with the provided Span, returning an Instrumented wrapper.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.