Struct fluvio::consumer::PartitionConsumer

pub struct PartitionConsumer { /* fields omitted */ }

An interface for consuming events from a particular partition

There are two ways to consume events: by “fetching” events and by “streaming” events. Fetching involves specifying a range of events that you want to consume via their Offset. A fetch is a one-time batch operation: you receive all of the events in your range at once. When you consume events via streaming, you specify a starting Offset and receive an object that continuously yields new events as they arrive.
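The difference between the two modes can be illustrated with a toy in-memory model. This is only a sketch built on the standard library: `ToyPartition` and its methods are hypothetical stand-ins, not Fluvio types, and a real consumer talks to a cluster over the network rather than to a local vector.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical in-memory model of a partition (not a Fluvio type).
struct ToyPartition {
    log: Vec<String>,
    subscribers: Vec<Sender<String>>,
}

impl ToyPartition {
    fn new() -> Self {
        ToyPartition { log: Vec::new(), subscribers: Vec::new() }
    }

    /// Appends a record and forwards it to every live stream.
    fn produce(&mut self, value: &str) {
        self.log.push(value.to_string());
        // Drop subscribers whose receiving end has gone away.
        self.subscribers.retain(|tx| tx.send(value.to_string()).is_ok());
    }

    /// Fetch: a one-time batch of the records stored at or after `offset`.
    fn fetch(&self, offset: usize) -> Vec<String> {
        self.log.iter().skip(offset).cloned().collect()
    }

    /// Stream: replays stored records from `offset`, then keeps yielding
    /// records that are produced later.
    fn stream(&mut self, offset: usize) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        for value in self.log.iter().skip(offset) {
            tx.send(value.clone()).expect("receiver is still in scope");
        }
        self.subscribers.push(tx);
        rx
    }
}
```

In this model, a batch returned by `fetch` never changes after the call, while the receiver returned by `stream` also sees records produced after it was created.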

Creating a Consumer

You can create a PartitionConsumer via the partition_consumer method on the Fluvio client, like so:

let consumer = fluvio.partition_consumer("my-topic", 0).await?;
let records = consumer.fetch(Offset::beginning()).await?;

Implementations

impl PartitionConsumer

pub fn topic(&self) -> &str

Returns the name of the Topic that this consumer reads from

pub fn partition(&self) -> i32

Returns the ID of the partition that this consumer reads from

pub async fn fetch(
    &self,
    offset: Offset
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>

Fetches events from a particular offset in the consumer’s partition

A “fetch” is one of the two ways to consume events in Fluvio. It is a batch request for records from a particular offset in the partition. You specify the position of records to retrieve using an Offset, and receive the events as a list of records.

If you want more fine-grained control over how records are fetched, check out the fetch_with_config method.

Example

let response = consumer.fetch(Offset::beginning()).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}
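Record values are raw bytes, and the example above decodes them with `String::from_utf8_lossy`, which replaces invalid UTF-8 sequences with U+FFFD instead of failing. The sketch below uses only the standard library to contrast lossy and strict decoding. The helper names `decode_lossy` and `decode_strict` are hypothetical and not part of Fluvio.

```rust
use std::str::Utf8Error;

/// Hypothetical helper: decode a record payload, replacing invalid UTF-8
/// sequences with U+FFFD (the same behavior as `String::from_utf8_lossy`).
fn decode_lossy(value: &[u8]) -> String {
    String::from_utf8_lossy(value).into_owned()
}

/// Hypothetical helper: decode a record payload, returning an error if the
/// bytes are not valid UTF-8.
fn decode_strict(value: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(value)
}
```

Strict decoding may be the better fit when a corrupted payload should be reported rather than printed with replacement characters.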

pub async fn fetch_with_config(
    &self,
    offset: Offset,
    option: ConsumerConfig
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError>

Fetches events from a consumer using a specific fetching configuration

Most of the time, you shouldn’t need to use a custom ConsumerConfig. If you don’t know what these settings do, try checking out the simpler fetch method that uses the default fetching settings.

A “fetch” is one of the two ways to consume events in Fluvio. It is a batch request for records from a particular offset in the partition. You specify the range of records to retrieve using an Offset and a ConsumerConfig, and receive the events as a list of records.

Example

// Use custom fetching configurations
let fetch_config = ConsumerConfig::default()
    .with_max_bytes(1000);

let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?;
for batch in response.records.batches {
    for record in batch.records() {
        let string = String::from_utf8_lossy(record.value.as_ref());
        println!("Got record: {}", string);
    }
}

pub async fn stream(
    &self,
    offset: Offset
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>

Continuously streams events from a particular offset in the consumer’s partition

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and periodically receive events, either individually or in batches.

If you want more fine-grained control over how records are streamed, check out the stream_with_config method.

Example

use futures::StreamExt;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got event: key={:?}, value={}", key, value);
}
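Note that a `while let Some(Ok(record))` loop ends at the first `Err` item without reporting it. A minimal sketch of surfacing the error instead follows. It uses a plain standard-library iterator of `Result` values as a stand-in for the stream, and both the `drain` function and its string-based error type are hypothetical.

```rust
/// Hypothetical stand-in: consumes items shaped like those yielded by a
/// record stream (`Result<value, error>`), collecting values until the
/// first error and returning that error alongside them.
fn drain<I>(items: I) -> (Vec<String>, Option<String>)
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut values = Vec::new();
    for item in items {
        match item {
            Ok(value) => values.push(value),
            // Surface the error instead of silently ending the loop.
            Err(err) => return (values, Some(err)),
        }
    }
    (values, None)
}
```

The same `match` on each item applies inside a `while let Some(item) = stream.next().await` loop.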

pub async fn stream_with_config(
    &self,
    offset: Offset,
    config: ConsumerConfig
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError>

Continuously streams events from a particular offset in the consumer’s partition, using a specific streaming configuration

Most of the time, you shouldn’t need to use a custom ConsumerConfig. If you don’t know what these settings do, try checking out the simpler stream method that uses the default streaming settings.

Streaming is one of the two ways to consume events in Fluvio. It is a continuous request for new records arriving in a partition, beginning at a particular offset. You specify the starting point of the stream using an Offset and a ConsumerConfig, and periodically receive events, either individually or in batches.

Example

use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::default()
    .with_max_bytes(1000);
let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(record)) = stream.next().await {
    let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value());
    println!("Got record: key={:?}, value={}", key, value);
}

pub async fn stream_batches_with_config(
    &self,
    offset: Offset,
    config: ConsumerConfig
) -> Result<impl Stream<Item = Result<DefaultBatch, FluvioError>>, FluvioError>

Continuously streams batches of messages, starting at an offset in the consumer’s partition

use futures::StreamExt;
// Use a custom max_bytes value in the config
let fetch_config = ConsumerConfig::default()
    .with_max_bytes(1000);
let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
while let Some(Ok(batch)) = stream.next().await {
    for record in batch.records() {
        let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
        let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
        println!("Got record: key={:?}, value={}", key, value);
    }
}

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> Erased for T

impl<T> From<T> for T

impl<T> Instrument for T

impl<T> Instrument for T

impl<T, U> Into<U> for T where
    U: From<T>,

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<T> WithSubscriber for T