//! fluvio 0.12.3
//!
//! The official Fluvio driver for Rust
use std::sync::Arc;

use futures_util::stream::{Stream, select_all};
use tracing::{debug, error, trace, instrument, info};
use once_cell::sync::Lazy;
use futures_util::future::{Either, err, join_all};
use futures_util::stream::{StreamExt, once, iter};
use futures_util::FutureExt;

use fluvio_spu_schema::server::stream_fetch::{
    DefaultStreamFetchRequest, DefaultStreamFetchResponse, GZIP_WASM_API, SMART_MODULE_API,
    LegacySmartModulePayload, SmartModuleWasmCompressed, WASM_MODULE_API, WASM_MODULE_V2_API,
};
pub use fluvio_spu_schema::server::stream_fetch::{
    SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind, DerivedStreamInvocation,
};
use dataplane::Isolation;
use dataplane::ReplicaKey;
use dataplane::ErrorCode;
use dataplane::batch::Batch;
use fluvio_types::event::offsets::OffsetPublisher;

use crate::FluvioError;
use crate::offset::{Offset, fetch_offsets};
use crate::spu::{SpuDirectory, SpuPool};
use derive_builder::Builder;

/// The record type yielded by consumer streams
pub use dataplane::record::ConsumerRecord as Record;

/// An interface for consuming events from a particular partition
///
/// A `PartitionConsumer` is created with the [`partition_consumer`]
/// method of the [`Fluvio`] client and yields records beginning at a
/// chosen [`Offset`].
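///
/// # Example
///
/// A minimal sketch of obtaining a consumer from a connected client
/// (the topic name here is illustrative):
///
/// ```
/// # use fluvio::{Fluvio, FluvioError};
/// # async fn example() -> Result<(), FluvioError> {
/// let fluvio = Fluvio::connect().await?;
/// let consumer = fluvio.partition_consumer("my-topic", 0).await?;
/// # Ok(())
/// # }
/// ```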
/// [`Offset`]: struct.Offset.html
/// [`partition_consumer`]: struct.Fluvio.html#method.partition_consumer
/// [`Fluvio`]: struct.Fluvio.html
pub struct PartitionConsumer<P = SpuPool> {
    topic: String,
    partition: i32,
    pool: Arc<P>,
}

impl<P> PartitionConsumer<P>
where
    P: SpuDirectory,
{
    /// Creates a consumer for the given topic and partition, using the
    /// provided pool to locate and connect to SPUs
    pub fn new(topic: String, partition: i32, pool: Arc<P>) -> Self {
        Self {
            topic,
            partition,
            pool,
        }
    }

    /// Returns the name of the Topic that this consumer reads from
    pub fn topic(&self) -> &str {
        &self.topic
    }

    /// Returns the ID of the partition that this consumer reads from
    pub fn partition(&self) -> i32 {
        self.partition
    }

    /// Continuously streams events from a particular offset in the consumer's partition
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in a partition,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and periodically receive events, either individually
    /// or in batches.
    ///
    /// If you want more fine-grained control over how records are streamed,
    /// check out the [`stream_with_config`] method.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// let mut stream = consumer.stream(Offset::beginning()).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value()).to_string();
    ///     println!("Got event: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    /// [`stream_with_config`]: struct.PartitionConsumer.html#method.stream_with_config
    #[instrument(skip(self, offset))]
    pub async fn stream(
        &self,
        offset: Offset,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        let config = ConsumerConfig::builder().build()?;
        let stream = self.stream_with_config(offset, config).await?;

        Ok(stream)
    }

    /// Continuously streams events from a particular offset in the consumer's partition
    ///
    /// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
    /// If you don't know what these settings do, try checking out the simpler
    /// [`PartitionConsumer::stream`] method that uses the default streaming settings.
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in a partition,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and a [`ConsumerConfig`], and periodically
    /// receive events, either individually or in batches.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value());
    ///     println!("Got record: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    #[instrument(skip(self, offset, config))]
    pub async fn stream_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        let stream = self.stream_batches_with_config(offset, config).await?;
        let partition = self.partition;
        let flattened =
            stream.flat_map(move |result: Result<Batch, _>| match result {
                Err(e) => Either::Right(once(err(e))),
                Ok(batch) => {
                    let base_offset = batch.base_offset;
                    let records = batch.own_records().into_iter().enumerate().map(
                        move |(relative, record)| {
                            Ok(Record {
                                partition,
                                offset: base_offset + relative as i64,
                                record,
                            })
                        },
                    );
                    Either::Left(iter(records))
                }
            });

        Ok(flattened)
    }

    /// Continuously streams batches of records, starting at a particular offset in the consumer's partition
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(batch)) = stream.next().await {
    ///     for record in batch.records() {
    ///         let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
    ///         let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
    ///         println!("Got record: key={:?}, value={}", key, value);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self, offset, config))]
    pub async fn stream_batches_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Batch, ErrorCode>>, FluvioError> {
        let stream = self.request_stream(offset, config).await?;
        let flattened = stream.flat_map(|batch_result: Result<DefaultStreamFetchResponse, _>| {
            let response = match batch_result {
                Ok(response) => response,
                Err(e) => return Either::Right(once(err(e))),
            };

            // If we ever get an error_code AND batches of records, we want to first send
            // the records down the consumer stream, THEN an Err with the error inside.
            // This way the consumer always gets to read all records that were properly
            // processed before hitting an error, so that the error does not obscure those records.
            let batches = response.partition.records.batches.into_iter().map(Ok);
            let error = {
                let code = response.partition.error_code;
                match code {
                    ErrorCode::None => None,
                    _ => Some(Err(code)),
                }
            };

            let items = batches.chain(error.into_iter());
            Either::Left(iter(items))
        });

        Ok(flattened)
    }

    /// Creates a stream of `DefaultStreamFetchResponse`s for older consumers that rely
    /// on the internal structure of the fetch response. New clients should use the
    /// `stream` and `stream_with_config` methods instead.
    #[instrument(skip(self, config))]
    async fn request_stream(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>>, FluvioError>
    {
        use fluvio_future::task::spawn;
        use futures_util::stream::empty;
        use fluvio_protocol::api::Request;

        let replica = ReplicaKey::new(&self.topic, self.partition);
        let mut serial_socket = self.pool.create_serial_socket(&replica).await?;
        let offsets = fetch_offsets(&mut serial_socket, &replica).await?;

        let start_absolute_offset = offset.resolve(&offsets).await?;
        let end_absolute_offset = offsets.last_stable_offset;
        let record_count = end_absolute_offset - start_absolute_offset;

        debug!(start_absolute_offset, end_absolute_offset, record_count);

        let mut stream_request = DefaultStreamFetchRequest {
            topic: self.topic.to_owned(),
            partition: self.partition,
            fetch_offset: start_absolute_offset,
            isolation: config.isolation,
            max_bytes: config.max_bytes,
            ..Default::default()
        };

        // add wasm module if SPU supports it
        let stream_fetch_version = serial_socket
            .versions()
            .lookup_version(
                DefaultStreamFetchRequest::API_KEY,
                DefaultStreamFetchRequest::DEFAULT_API_VERSION,
            )
            .unwrap_or((WASM_MODULE_API - 1) as i16);
        debug!(%stream_fetch_version, "stream_fetch_version");

        stream_request.derivedstream = config.derivedstream;

        if let Some(smartmodule) = config.smartmodule {
            if stream_fetch_version < SMART_MODULE_API as i16 {
                if let SmartModuleInvocationWasm::AdHoc(wasm) = smartmodule.wasm {
                    let legacy_module = LegacySmartModulePayload {
                        wasm: SmartModuleWasmCompressed::Gzip(wasm),
                        kind: smartmodule.kind,
                        params: smartmodule.params,
                    };
                    legacy_set_wasm(stream_fetch_version, &mut stream_request, legacy_module)?;
                } else {
                    return Err(FluvioError::Other(
                        "SPU does not support persistent WASM".to_owned(),
                    ));
                }
            } else {
                debug!("Using persistent WASM API");
                stream_request.smartmodule = Some(smartmodule);
            }
        }

        if let Some(module) = config.wasm_module {
            legacy_set_wasm(stream_fetch_version, &mut stream_request, module)?;
        }

        let mut stream = self
            .pool
            .create_stream_with_version(&replica, stream_request, stream_fetch_version)
            .await?;

        let ft_stream = async move {
            if let Some(Ok(response)) = stream.next().await {
                let stream_id = response.stream_id;

                trace!("first stream response: {:#?}", response);
                debug!(
                    stream_id,
                    last_offset = ?response.partition.next_offset_for_fetch(),
                    "first stream response"
                );

                let publisher = OffsetPublisher::shared(0);
                let mut listener = publisher.change_listner();

                // update stream with received offsets
                spawn(async move {
                    use fluvio_spu_schema::server::update_offset::{UpdateOffsetsRequest, OffsetUpdate};

                    loop {
                        let fetch_last_value = listener.listen().await;
                        debug!(fetch_last_value, stream_id, "received end fetch");
                        if fetch_last_value < 0 {
                            debug!("fetch last is end, terminating");
                            break;
                        } else {
                            debug!(
                                offset = fetch_last_value,
                                session_id = stream_id,
                                "sending back offset to spu"
                            );
                            let request = UpdateOffsetsRequest {
                                offsets: vec![OffsetUpdate {
                                    offset: fetch_last_value,
                                    session_id: stream_id,
                                }],
                            };
                            debug!(?request, "Sending offset update request:");
                            let response = serial_socket.send_receive(request).await;
                            if let Err(err) = response {
                                error!("error sending offset: {:#?}", err);
                                break;
                            }
                        }
                    }
                    debug!(stream_id, "offset fetch update loop end");
                });

                // send back the first offset if records exist
                if let Some(last_offset) = response.partition.next_offset_for_fetch() {
                    debug!(last_offset, "notify new last offset");
                    publisher.update(last_offset);
                }

                let response_publisher = publisher.clone();
                let update_stream = stream.map(move |item| {
                    item.map(|response| {
                        if let Some(last_offset) = response.partition.next_offset_for_fetch() {
                            debug!(last_offset, stream_id, "received last offset from spu");
                            response_publisher.update(last_offset);
                        }
                        response
                    })
                    .map_err(|e| {
                        error!(?e, "error in stream");
                        ErrorCode::Other(e.to_string())
                    })
                });
                Either::Left(
                    iter(vec![Ok(response)])
                        .chain(publish_stream::EndPublishSt::new(update_stream, publisher)),
                )
            } else {
                info!("stream ended");
                Either::Right(empty())
            }
        };

        let stream = if config.disable_continuous {
            TakeRecords::new(ft_stream.flatten_stream().boxed(), record_count).boxed()
        } else {
            ft_stream.flatten_stream().boxed()
        };

        Ok(stream)
    }
}

/// Attaches a WASM payload to the stream request via the legacy APIs,
/// choosing the encoding based on the SPU's supported stream-fetch version
fn legacy_set_wasm(
    stream_fetch_version: i16,
    stream_request: &mut DefaultStreamFetchRequest,
    mut module: LegacySmartModulePayload,
) -> Result<(), FluvioError> {
    if stream_fetch_version < WASM_MODULE_API as i16 {
        return Err(FluvioError::Other("SPU does not support WASM".to_owned()));
    }

    if stream_fetch_version < WASM_MODULE_V2_API as i16 {
        debug!("Using WASM V1 API");
        let wasm = module.wasm.get_raw()?;
        stream_request.wasm_module = wasm.into_owned();
    } else {
        debug!("Using WASM V2 API");
        if stream_fetch_version < GZIP_WASM_API as i16 {
            module.wasm.to_raw()?;
        } else {
            debug!("Using compressed WASM API");
            module.wasm.to_gzip()?;
        }
        stream_request.wasm_payload = Some(module);
    }

    Ok(())
}

/// Wraps an inner record stream, ending it once a given number of records has been fetched.
///
/// This is used for "disable continuous" mode. In this mode, we first fetch the partition's
/// current offsets (a `FetchOffsetPartitionResponse`) in order to see the starting and ending
/// offsets currently available for this partition.
/// Based on the starting offset the caller asks for, we can figure out the "record count", or
/// how many records from the start onward we know for sure we can stream without waiting.
/// We then use `TakeRecords` to stop the stream as soon as we reach that point, so the user
/// (e.g. on the CLI) does not spend any time waiting for new records to be produced; they are
/// simply given all the records that are already available.
struct TakeRecords<S> {
    remaining: i64,
    stream: S,
}

impl<S> TakeRecords<S>
where
    S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
    pub fn new(stream: S, until: i64) -> Self {
        Self {
            remaining: until,
            stream,
        }
    }
}

impl<S> Stream for TakeRecords<S>
where
    S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
    type Item = S::Item;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::{pin::Pin, task::Poll};
        use futures_util::ready;
        if self.remaining <= 0 {
            return Poll::Ready(None);
        }
        let next = ready!(Pin::new(&mut self.as_mut().stream).poll_next(cx));
        match next {
            Some(Ok(response)) => {
                // Count how many records are present in this batch's response
                let count: usize = response
                    .partition
                    .records
                    .batches
                    .iter()
                    .map(|it| it.records().len())
                    .sum();
                let diff = self.remaining - count as i64;
                self.remaining = diff.max(0);
                Poll::Ready(Some(Ok(response)))
            }
            other => Poll::Ready(other),
        }
    }
}

mod publish_stream {

    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Poll, Context};

    use pin_project_lite::pin_project;
    use futures_util::ready;

    use super::Stream;
    use super::OffsetPublisher;

    // Publishes a sentinel offset (-1) when the inner stream ends,
    // signaling the offset-update loop to terminate
    pin_project! {
        pub struct EndPublishSt<St> {
            #[pin]
            stream: St,
            publisher: Arc<OffsetPublisher>
        }
    }

    impl<St> EndPublishSt<St> {
        pub fn new(stream: St, publisher: Arc<OffsetPublisher>) -> Self {
            Self { stream, publisher }
        }
    }

    impl<S: Stream> Stream for EndPublishSt<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            let this = self.project();

            let item = ready!(this.stream.poll_next(cx));
            if item.is_none() {
                this.publisher.update(-1);
            }
            Poll::Ready(item)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.stream.size_hint()
        }
    }
}

/// Maximum number of bytes to fetch per request (default 1,000,000);
/// may be overridden with the `FLV_CLIENT_MAX_FETCH_BYTES` environment variable
static MAX_FETCH_BYTES: Lazy<i32> = Lazy::new(|| {
    use std::env;
    let var_value = env::var("FLV_CLIENT_MAX_FETCH_BYTES").unwrap_or_default();
    let max_bytes: i32 = var_value.parse().unwrap_or(1000000);
    max_bytes
});

/// Configures the behavior of consumer fetching and streaming
#[derive(Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_impl"))]
pub struct ConsumerConfig {
    #[builder(default)]
    disable_continuous: bool,
    #[builder(default = "*MAX_FETCH_BYTES")]
    pub(crate) max_bytes: i32,
    #[builder(default)]
    pub(crate) isolation: Isolation,
    #[builder(private, default, setter(into, strip_option))]
    pub(crate) wasm_module: Option<LegacySmartModulePayload>,
    #[builder(default)]
    pub(crate) smartmodule: Option<SmartModuleInvocation>,
    #[builder(default)]
    pub(crate) derivedstream: Option<DerivedStreamInvocation>,
}

impl ConsumerConfig {
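    /// Returns a builder for constructing a customized `ConsumerConfig`
    ///
    /// A minimal sketch of overriding the fetch size (the value shown is
    /// illustrative, not a recommended setting):
    ///
    /// ```
    /// # use fluvio::{ConsumerConfig, FluvioError};
    /// # fn example() -> Result<(), FluvioError> {
    /// let config = ConsumerConfig::builder()
    ///     .max_bytes(64_000)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```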
    pub fn builder() -> ConsumerConfigBuilder {
        ConsumerConfigBuilder::default()
    }
}

impl ConsumerConfigBuilder {
    /// Builds the `ConsumerConfig`, converting any builder error into a `FluvioError`
    pub fn build(&self) -> Result<ConsumerConfig, FluvioError> {
        let config = self.build_impl().map_err(|e| {
            FluvioError::ConsumerConfig(format!("Missing required config option: {}", e))
        })?;
        Ok(config)
    }
}

/// Strategy used to select which partitions and from which topics should be streamed by the [`MultiplePartitionConsumer`]
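///
/// A minimal sketch of constructing each variant (the topic names are illustrative):
///
/// ```
/// use fluvio::PartitionSelectionStrategy;
///
/// // Consume from every partition of the "events" topic
/// let all = PartitionSelectionStrategy::All("events".to_string());
///
/// // Consume from partition 0 of "logs" and partition 2 of "metrics"
/// let multiple = PartitionSelectionStrategy::Multiple(vec![
///     ("logs".to_string(), 0),
///     ("metrics".to_string(), 2),
/// ]);
/// ```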
pub enum PartitionSelectionStrategy {
    /// Consume from all the partitions of a given topic
    All(String),
    /// Consume from a given list of topics and partitions
    Multiple(Vec<(String, i32)>),
}

impl PartitionSelectionStrategy {
    async fn selection(&self, spu_pool: Arc<SpuPool>) -> Result<Vec<(String, i32)>, FluvioError> {
        let pairs = match self {
            PartitionSelectionStrategy::All(topic) => {
                let topics = spu_pool.metadata.topics();
                let topic_spec = topics
                    .lookup_by_key(topic)
                    .await?
                    .ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
                    .spec;
                let partition_count = topic_spec.partitions();
                (0..partition_count)
                    .map(|partition| (topic.clone(), partition))
                    .collect::<Vec<_>>()
            }
            PartitionSelectionStrategy::Multiple(topic_partition) => topic_partition.to_owned(),
        };
        Ok(pairs)
    }
}
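
/// An interface for consuming events from multiple partitions at once
///
/// A minimal sketch of obtaining one, assuming a `Fluvio::consumer`
/// constructor that accepts a [`PartitionSelectionStrategy`] (that
/// constructor is not shown in this listing):
///
/// ```
/// # use fluvio::{Fluvio, FluvioError, PartitionSelectionStrategy};
/// # async fn example() -> Result<(), FluvioError> {
/// let fluvio = Fluvio::connect().await?;
/// let consumer = fluvio
///     .consumer(PartitionSelectionStrategy::All("events".to_string()))
///     .await?;
/// # Ok(())
/// # }
/// ```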
pub struct MultiplePartitionConsumer {
    strategy: PartitionSelectionStrategy,
    pool: Arc<SpuPool>,
}

impl MultiplePartitionConsumer {
    /// Creates a consumer over the partitions selected by the given strategy
    pub(crate) fn new(strategy: PartitionSelectionStrategy, pool: Arc<SpuPool>) -> Self {
        Self { strategy, pool }
    }

    /// Continuously streams events from a particular offset in the selected partitions
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in the selected partitions,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and periodically receive events, either individually
    /// or in batches.
    ///
    /// If you want more fine-grained control over how records are streamed,
    /// check out the [`stream_with_config`] method.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{MultiplePartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &MultiplePartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// let mut stream = consumer.stream(Offset::beginning()).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value()).to_string();
    ///     println!("Got event: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    /// [`stream_with_config`]: struct.MultiplePartitionConsumer.html#method.stream_with_config
    #[instrument(skip(self, offset))]
    pub async fn stream(
        &self,
        offset: Offset,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        let config = ConsumerConfig::builder().build()?;
        let stream = self.stream_with_config(offset, config).await?;

        Ok(stream)
    }

    /// Continuously streams events from a particular offset in the selected partitions
    ///
    /// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
    /// If you don't know what these settings do, try checking out the simpler
    /// [`stream`] method that uses the default streaming settings.
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in the selected partitions,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and a [`ConsumerConfig`], and periodically
    /// receive events, either individually or in batches.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{MultiplePartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &MultiplePartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value());
    ///     println!("Got record: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    #[instrument(skip(self, offset, config))]
    pub async fn stream_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        let consumers = self
            .strategy
            .selection(self.pool.clone())
            .await?
            .into_iter()
            .map(|(topic, partition)| PartitionConsumer::new(topic, partition, self.pool.clone()))
            .collect::<Vec<_>>();

        let streams_future = consumers
            .iter()
            .map(|consumer| consumer.stream_with_config(offset.clone(), config.clone()));

        let streams_result = join_all(streams_future).await;

        let streams = streams_result.into_iter().collect::<Result<Vec<_>, _>>()?;

        Ok(select_all(streams))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let _config = ConsumerConfig::builder().build().unwrap();
    }
}