use std::sync::Arc;
use futures_util::stream::{Stream, select_all};
use tracing::{debug, error, trace, instrument, info};
use once_cell::sync::Lazy;
use futures_util::future::{Either, err, join_all};
use futures_util::stream::{StreamExt, once, iter};
use futures_util::FutureExt;
use fluvio_spu_schema::server::stream_fetch::{
DefaultStreamFetchRequest, DefaultStreamFetchResponse, GZIP_WASM_API, SMART_MODULE_API,
LegacySmartModulePayload, SmartModuleWasmCompressed, WASM_MODULE_API, WASM_MODULE_V2_API,
};
pub use fluvio_spu_schema::server::stream_fetch::{
SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind, DerivedStreamInvocation,
};
use dataplane::Isolation;
use dataplane::ReplicaKey;
use dataplane::ErrorCode;
use dataplane::batch::Batch;
use fluvio_types::event::offsets::OffsetPublisher;
use crate::FluvioError;
use crate::offset::{Offset, fetch_offsets};
use crate::spu::{SpuDirectory, SpuPool};
use derive_builder::Builder;
pub use dataplane::record::ConsumerRecord as Record;
/// An interface for consuming events from a particular partition
///
/// A `PartitionConsumer` is bound to a single (topic, partition) pair and
/// reads records through the pool `P` (by default an [`SpuPool`]), which it
/// uses to open sockets and streams to the SPU serving that partition.
/// Streaming begins at a caller-chosen [`Offset`]; see [`partition_consumer`]
/// on [`Fluvio`] for how instances are typically obtained.
///
/// [`Offset`]: struct.Offset.html
/// [`partition_consumer`]: struct.Fluvio.html#method.partition_consumer
/// [`Fluvio`]: struct.Fluvio.html
pub struct PartitionConsumer<P = SpuPool> {
    /// Name of the topic this consumer reads from
    topic: String,
    /// Partition id within the topic
    partition: i32,
    /// Directory/pool used to create serial sockets and fetch streams to SPUs
    pool: Arc<P>,
}
impl<P> PartitionConsumer<P>
where
    P: SpuDirectory,
{
    /// Creates a consumer for `partition` of `topic`, using `pool` to open
    /// connections to the SPU that serves that partition.
    pub fn new(topic: String, partition: i32, pool: Arc<P>) -> Self {
        Self {
            topic,
            partition,
            pool,
        }
    }

    /// Returns the name of the Topic that this consumer reads from
    pub fn topic(&self) -> &str {
        &self.topic
    }

    /// Returns the ID of the partition that this consumer reads from
    pub fn partition(&self) -> i32 {
        self.partition
    }

    /// Continuously streams events from a particular offset in the consumer's partition
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in a partition,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and periodically receive events, either individually
    /// or in batches.
    ///
    /// If you want more fine-grained control over how records are streamed,
    /// check out the [`stream_with_config`] method.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// let mut stream = consumer.stream(Offset::beginning()).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value()).to_string();
    ///     println!("Got event: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    /// [`stream_with_config`]: struct.ConsumerConfig.html#method.stream_with_config
    #[instrument(skip(self, offset))]
    pub async fn stream(
        &self,
        offset: Offset,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        // Delegate to `stream_with_config` with an all-default configuration.
        let config = ConsumerConfig::builder().build()?;
        let stream = self.stream_with_config(offset, config).await?;
        Ok(stream)
    }

    /// Continuously streams events from a particular offset in the consumer's partition
    ///
    /// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
    /// If you don't know what these settings do, try checking out the simpler
    /// [`PartitionConsumer::stream`] method that uses the default streaming settings.
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in a partition,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and a [`ConsumerConfig`], and periodically
    /// receive events, either individually or in batches.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value());
    ///     println!("Got record: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    #[instrument(skip(self, offset, config))]
    pub async fn stream_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        let stream = self.stream_batches_with_config(offset, config).await?;
        let partition = self.partition;
        // Flatten each batch into individual records. Each record's absolute
        // offset is computed as `base_offset + index within the batch`.
        // NOTE(review): this assumes the batch holds a contiguous run of
        // records starting at `base_offset` — confirm against SPU semantics
        // when server-side filtering (SmartModules) is in play.
        let flattened =
            stream.flat_map(move |result: Result<Batch, _>| match result {
                // An errored batch becomes a one-item stream carrying the error.
                Err(e) => Either::Right(once(err(e))),
                Ok(batch) => {
                    let base_offset = batch.base_offset;
                    let records = batch.own_records().into_iter().enumerate().map(
                        move |(relative, record)| {
                            Ok(Record {
                                partition,
                                offset: base_offset + relative as i64,
                                record,
                            })
                        },
                    );
                    Either::Left(iter(records))
                }
            });
        Ok(flattened)
    }

    /// Continuously streams batches of messages, starting an offset in the consumer's partition
    ///
    /// ```
    /// # use fluvio::{PartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(batch)) = stream.next().await {
    ///     for record in batch.records() {
    ///         let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
    ///         let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
    ///         println!("Got record: key={:?}, value={}", key, value);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self, offset, config))]
    pub async fn stream_batches_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Batch, ErrorCode>>, FluvioError> {
        let stream = self.request_stream(offset, config).await?;
        let flattened = stream.flat_map(|batch_result: Result<DefaultStreamFetchResponse, _>| {
            let response = match batch_result {
                Ok(response) => response,
                // Transport-level error: emit it as a single-item stream.
                Err(e) => return Either::Right(once(err(e))),
            };

            // If we ever get an error_code AND batches of records, we want to first send
            // the records down the consumer stream, THEN an Err with the error inside.
            // This way the consumer always gets to read all records that were properly
            // processed before hitting an error, so that the error does not obscure those records.
            let batches = response.partition.records.batches.into_iter().map(Ok);
            let error = {
                let code = response.partition.error_code;
                match code {
                    ErrorCode::None => None,
                    _ => Some(Err(code)),
                }
            };
            // Batches first, then (at most one) trailing error item.
            let items = batches.chain(error.into_iter());
            Either::Left(iter(items))
        });
        Ok(flattened)
    }

    /// Creates a stream of `DefaultStreamFetchResponse` for older consumers who rely
    /// on the internal structure of the fetch response. New clients should use the
    /// `stream` and `stream_with_config` methods.
    #[instrument(skip(self, config))]
    async fn request_stream(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>>, FluvioError>
    {
        use fluvio_future::task::spawn;
        use futures_util::stream::empty;
        use fluvio_protocol::api::Request;

        let replica = ReplicaKey::new(&self.topic, self.partition);
        let mut serial_socket = self.pool.create_serial_socket(&replica).await?;
        // Resolve the caller's (possibly relative) `Offset` against the offsets
        // currently reported by the SPU for this replica.
        let offsets = fetch_offsets(&mut serial_socket, &replica).await?;
        let start_absolute_offset = offset.resolve(&offsets).await?;
        let end_absolute_offset = offsets.last_stable_offset;
        // Records known to exist at request time; used only by the
        // `disable_continuous` cap via `TakeRecords` at the bottom.
        let record_count = end_absolute_offset - start_absolute_offset;
        debug!(start_absolute_offset, end_absolute_offset, record_count);

        let mut stream_request = DefaultStreamFetchRequest {
            topic: self.topic.to_owned(),
            partition: self.partition,
            fetch_offset: start_absolute_offset,
            isolation: config.isolation,
            max_bytes: config.max_bytes,
            ..Default::default()
        };

        // add wasm module if SPU supports it
        // Negotiate the stream-fetch API version; if the SPU does not report
        // one, fall back to a version that predates WASM support.
        let stream_fetch_version = serial_socket
            .versions()
            .lookup_version(
                DefaultStreamFetchRequest::API_KEY,
                DefaultStreamFetchRequest::DEFAULT_API_VERSION,
            )
            .unwrap_or((WASM_MODULE_API - 1) as i16);
        debug!(%stream_fetch_version, "stream_fetch_version");

        stream_request.derivedstream = config.derivedstream;

        if let Some(smartmodule) = config.smartmodule {
            if stream_fetch_version < SMART_MODULE_API as i16 {
                // Older SPU: downgrade an ad-hoc WASM invocation into the
                // legacy payload format. Predefined (persistent) modules
                // cannot be expressed in the legacy protocol, so reject them.
                if let SmartModuleInvocationWasm::AdHoc(wasm) = smartmodule.wasm {
                    let legacy_module = LegacySmartModulePayload {
                        wasm: SmartModuleWasmCompressed::Gzip(wasm),
                        kind: smartmodule.kind,
                        params: smartmodule.params,
                    };
                    legacy_set_wasm(stream_fetch_version, &mut stream_request, legacy_module)?;
                } else {
                    return Err(FluvioError::Other(
                        "SPU does not support persistent WASM".to_owned(),
                    ));
                }
            } else {
                debug!("Using persistent WASM API");
                stream_request.smartmodule = Some(smartmodule);
            }
        }
        if let Some(module) = config.wasm_module {
            legacy_set_wasm(stream_fetch_version, &mut stream_request, module)?;
        }

        let mut stream = self
            .pool
            .create_stream_with_version(&replica, stream_request, stream_fetch_version)
            .await?;

        let ft_stream = async move {
            if let Some(Ok(response)) = stream.next().await {
                let stream_id = response.stream_id;
                trace!("first stream response: {:#?}", response);
                debug!(
                    stream_id,
                    last_offset = ?response.partition.next_offset_for_fetch(),
                    "first stream response"
                );

                // The publisher carries the highest offset consumed so far;
                // `-1` (published by `EndPublishSt` below) is the sentinel for
                // "stream ended". NOTE: `change_listner` (sic) is the upstream
                // API name.
                let publisher = OffsetPublisher::shared(0);
                let mut listener = publisher.change_listner();

                // update stream with received offsets
                // Background task: every published offset is echoed back to
                // the SPU as an UpdateOffsetsRequest; the task exits on the
                // `-1` sentinel or on a send error.
                spawn(async move {
                    use fluvio_spu_schema::server::update_offset::{UpdateOffsetsRequest, OffsetUpdate};

                    loop {
                        let fetch_last_value = listener.listen().await;
                        debug!(fetch_last_value, stream_id, "received end fetch");
                        if fetch_last_value < 0 {
                            debug!("fetch last is end, terminating");
                            break;
                        } else {
                            debug!(
                                offset = fetch_last_value,
                                session_id = stream_id,
                                "sending back offset to spu"
                            );
                            let request = UpdateOffsetsRequest {
                                offsets: vec![OffsetUpdate {
                                    offset: fetch_last_value,
                                    session_id: stream_id,
                                }],
                            };
                            debug!(?request, "Sending offset update request:");
                            let response = serial_socket.send_receive(request).await;
                            if let Err(err) = response {
                                error!("error sending offset: {:#?}", err);
                                break;
                            }
                        }
                    }
                    debug!(stream_id, "offset fetch update loop end");
                });

                // send back first offset records exists
                if let Some(last_offset) = response.partition.next_offset_for_fetch() {
                    debug!(last_offset, "notify new last offset");
                    publisher.update(last_offset);
                }

                let response_publisher = publisher.clone();
                // Each subsequent response publishes its last offset too, and
                // transport errors are mapped into `ErrorCode::Other`.
                let update_stream = stream.map(move |item| {
                    item.map(|response| {
                        if let Some(last_offset) = response.partition.next_offset_for_fetch() {
                            debug!(last_offset, stream_id, "received last offset from spu");
                            response_publisher.update(last_offset);
                        }
                        response
                    })
                    .map_err(|e| {
                        error!(?e, "error in stream");
                        ErrorCode::Other(e.to_string())
                    })
                });
                // Re-emit the first response, then the rest; `EndPublishSt`
                // publishes `-1` when the stream ends so the task above exits.
                Either::Left(
                    iter(vec![Ok(response)])
                        .chain(publish_stream::EndPublishSt::new(update_stream, publisher)),
                )
            } else {
                // No first response at all: yield an empty stream.
                info!("stream ended");
                Either::Right(empty())
            }
        };

        let stream = if config.disable_continuous {
            // Bounded mode: stop after the records known at request time.
            TakeRecords::new(ft_stream.flatten_stream().boxed(), record_count).boxed()
        } else {
            ft_stream.flatten_stream().boxed()
        };

        Ok(stream)
    }
}
/// Attaches a legacy WASM payload to `stream_request`, choosing the wire
/// format (V1 raw field, V2 raw payload, or gzip payload) that matches the
/// negotiated `stream_fetch_version`.
///
/// Returns an error when the SPU's protocol version predates WASM support.
fn legacy_set_wasm(
    stream_fetch_version: i16,
    stream_request: &mut DefaultStreamFetchRequest,
    mut module: LegacySmartModulePayload,
) -> Result<(), FluvioError> {
    // Pre-WASM SPU: nothing we can do.
    if stream_fetch_version < WASM_MODULE_API as i16 {
        return Err(FluvioError::Other("SPU does not support WASM".to_owned()));
    }

    // V1 protocol: the raw module bytes travel in the `wasm_module` field.
    if stream_fetch_version < WASM_MODULE_V2_API as i16 {
        debug!("Using WASM V1 API");
        stream_request.wasm_module = module.wasm.get_raw()?.into_owned();
        return Ok(());
    }

    // V2 protocol: the payload struct is sent, raw or gzip-compressed
    // depending on whether the SPU understands compressed modules.
    debug!("Using WASM V2 API");
    if stream_fetch_version < GZIP_WASM_API as i16 {
        module.wasm.to_raw()?;
    } else {
        debug!("Using compressed WASM API");
        module.wasm.to_gzip()?;
    }
    stream_request.wasm_payload = Some(module);
    Ok(())
}
/// Wrap an inner record stream and only stream until a given number of records have been fetched.
///
/// This is used for "disable continuous" mode. In this mode, we first make a FetchOffsetPartitionResponse
/// in order to see the starting and ending offsets currently available for this partition.
/// Based on the starting offset the caller asks for, we can figure out the "record count", or
/// how many records from the start onward we know for sure we can stream without waiting.
/// We then use `TakeRecords` to stop the stream as soon as we reach that point, so the user
/// (e.g. on the CLI) does not spend any time waiting for new records to be produced, they are
/// simply given all the records that are already available.
struct TakeRecords<S> {
    /// Records still allowed through; once this reaches zero (or starts
    /// non-positive), `poll_next` ends the stream.
    remaining: i64,
    /// Inner stream of fetch responses being counted and forwarded.
    stream: S,
}
impl<S> TakeRecords<S>
where
S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
pub fn new(stream: S, until: i64) -> Self {
Self {
remaining: until,
stream,
}
}
}
impl<S> Stream for TakeRecords<S>
where
    S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
    type Item = S::Item;

    /// Forwards items from the inner stream until `remaining` records have
    /// been seen. The quota is applied at response granularity: a response
    /// whose record count overshoots `remaining` is still yielded whole,
    /// and the stream then ends on the next poll.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::{pin::Pin, task::Poll};
        use futures_util::ready;

        // Quota exhausted (or never positive): terminate immediately.
        if self.remaining <= 0 {
            return Poll::Ready(None);
        }
        // Safe to Pin::new here because S: Unpin.
        let next = ready!(Pin::new(&mut self.as_mut().stream).poll_next(cx));
        match next {
            Some(Ok(response)) => {
                // Count how many records are present in this batch's response
                let count: usize = response
                    .partition
                    .records
                    .batches
                    .iter()
                    .map(|it| it.records().len())
                    .sum();
                // Clamp at zero so the next poll ends the stream cleanly.
                let diff = self.remaining - count as i64;
                self.remaining = diff.max(0);
                Poll::Ready(Some(Ok(response)))
            }
            // Errors and end-of-stream pass through unchanged.
            other => Poll::Ready(other),
        }
    }
}
mod publish_stream {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Poll, Context};
    use pin_project_lite::pin_project;
    use futures_util::ready;
    use super::Stream;
    use super::OffsetPublisher;

    // signal offset when stream is done
    //
    // `EndPublishSt` forwards every item of the wrapped stream unchanged and,
    // when the inner stream terminates, publishes `-1` on the shared
    // `OffsetPublisher` so listeners (e.g. the offset-update task spawned in
    // `request_stream`) know to shut down.
    pin_project! {
        pub struct EndPublishSt<St> {
            #[pin]
            stream: St,
            publisher: Arc<OffsetPublisher>
        }
    }

    impl<St> EndPublishSt<St> {
        /// Wraps `stream`, signalling `publisher` with `-1` at end-of-stream.
        pub fn new(stream: St, publisher: Arc<OffsetPublisher>) -> Self {
            Self { stream, publisher }
        }
    }

    impl<S: Stream> Stream for EndPublishSt<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            let this = self.project();
            let item = ready!(this.stream.poll_next(cx));
            // Inner stream finished: broadcast the sentinel before yielding None.
            if item.is_none() {
                this.publisher.update(-1);
            }
            Poll::Ready(item)
        }

        // Delegate the size hint to the wrapped stream.
        fn size_hint(&self) -> (usize, Option<usize>) {
            self.stream.size_hint()
        }
    }
}
/// Maximum number of bytes fetched per request. Overridable through the
/// `FLV_CLIENT_MAX_FETCH_BYTES` environment variable; any unset or
/// unparseable value falls back to 1,000,000.
static MAX_FETCH_BYTES: Lazy<i32> = Lazy::new(|| {
    std::env::var("FLV_CLIENT_MAX_FETCH_BYTES")
        .ok()
        .and_then(|value| value.parse::<i32>().ok())
        .unwrap_or(1_000_000)
});
/// Configures the behavior of consumer fetching and streaming
#[derive(Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_impl"))]
pub struct ConsumerConfig {
    // When true, the stream ends after yielding the records that existed at
    // request time instead of waiting for new ones (see `TakeRecords`).
    #[builder(default)]
    disable_continuous: bool,
    // Maximum bytes fetched per request; defaults to `MAX_FETCH_BYTES`.
    #[builder(default = "*MAX_FETCH_BYTES")]
    pub(crate) max_bytes: i32,
    // Read isolation level forwarded in the stream-fetch request.
    #[builder(default)]
    pub(crate) isolation: Isolation,
    // Legacy WASM payload; settable only through the private builder setter.
    #[builder(private, default, setter(into, strip_option))]
    pub(crate) wasm_module: Option<LegacySmartModulePayload>,
    // SmartModule invocation to apply on the SPU, if any.
    #[builder(default)]
    pub(crate) smartmodule: Option<SmartModuleInvocation>,
    // DerivedStream invocation forwarded in the stream-fetch request, if any.
    #[builder(default)]
    pub(crate) derivedstream: Option<DerivedStreamInvocation>,
}
impl ConsumerConfig {
pub fn builder() -> ConsumerConfigBuilder {
ConsumerConfigBuilder::default()
}
}
impl ConsumerConfigBuilder {
    /// Finalizes the builder, translating any builder failure into a
    /// [`FluvioError::ConsumerConfig`].
    pub fn build(&self) -> Result<ConsumerConfig, FluvioError> {
        self.build_impl().map_err(|e| {
            FluvioError::ConsumerConfig(format!("Missing required config option: {}", e))
        })
    }
}
/// Strategy used to select which partitions and from which topics should be streamed by the [`MultiplePartitionConsumer`]
pub enum PartitionSelectionStrategy {
    /// Consume from all the partitions of a given topic
    All(String),
    /// Consume from a given list of topics and partitions
    /// (each pair is `(topic name, partition id)`)
    Multiple(Vec<(String, i32)>),
}
impl PartitionSelectionStrategy {
    /// Resolves this strategy into concrete `(topic, partition)` pairs,
    /// consulting the pool's metadata to enumerate a topic's partitions.
    async fn selection(&self, spu_pool: Arc<SpuPool>) -> Result<Vec<(String, i32)>, FluvioError> {
        match self {
            Self::All(topic) => {
                // Look up the topic's spec to learn how many partitions exist.
                let spec = spu_pool
                    .metadata
                    .topics()
                    .lookup_by_key(topic)
                    .await?
                    .ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
                    .spec;
                let pairs = (0..spec.partitions())
                    .map(|partition| (topic.clone(), partition))
                    .collect::<Vec<_>>();
                Ok(pairs)
            }
            // Explicit list: hand back an owned copy of the caller's pairs.
            Self::Multiple(topic_partition) => Ok(topic_partition.clone()),
        }
    }
}
/// Consumes records from multiple partitions (possibly across topics),
/// as chosen by a [`PartitionSelectionStrategy`].
pub struct MultiplePartitionConsumer {
    /// Which (topic, partition) pairs to consume from
    strategy: PartitionSelectionStrategy,
    /// Shared SPU pool used to build the per-partition consumers
    pool: Arc<SpuPool>,
}
impl MultiplePartitionConsumer {
    /// Creates a multi-partition consumer over the partitions selected by `strategy`.
    pub(crate) fn new(strategy: PartitionSelectionStrategy, pool: Arc<SpuPool>) -> Self {
        Self { strategy, pool }
    }

    /// Continuously streams events from a particular offset in the selected partitions
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in the selected partitions,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and periodically receive events, either individually
    /// or in batches.
    ///
    /// If you want more fine-grained control over how records are streamed,
    /// check out the [`stream_with_config`] method.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{MultiplePartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &MultiplePartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// let mut stream = consumer.stream(Offset::beginning()).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value()).to_string();
    ///     println!("Got event: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    /// [`stream_with_config`]: struct.ConsumerConfig.html#method.stream_with_config
    #[instrument(skip(self, offset))]
    pub async fn stream(
        &self,
        offset: Offset,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        // Delegate to `stream_with_config` with an all-default configuration.
        let config = ConsumerConfig::builder().build()?;
        let stream = self.stream_with_config(offset, config).await?;
        Ok(stream)
    }

    /// Continuously streams events from a particular offset in the selected partitions
    ///
    /// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
    /// If you don't know what these settings do, try checking out the simpler
    /// [`stream`] method that uses the default streaming settings.
    ///
    /// Streaming is one of the two ways to consume events in Fluvio.
    /// It is a continuous request for new records arriving in the selected partitions,
    /// beginning at a particular offset. You specify the starting point of the
    /// stream using an [`Offset`] and a [`ConsumerConfig`], and periodically
    /// receive events, either individually or in batches.
    ///
    /// # Example
    ///
    /// ```
    /// # use fluvio::{MultiplePartitionConsumer, FluvioError};
    /// # use fluvio::{Offset, ConsumerConfig};
    /// # mod futures {
    /// #     pub use futures_util::stream::StreamExt;
    /// # }
    /// # async fn example(consumer: &MultiplePartitionConsumer) -> Result<(), FluvioError> {
    /// use futures::StreamExt;
    /// // Use a custom max_bytes value in the config
    /// let fetch_config = ConsumerConfig::builder()
    ///     .max_bytes(1000)
    ///     .build()?;
    /// let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
    /// while let Some(Ok(record)) = stream.next().await {
    ///     let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    ///     let value = String::from_utf8_lossy(record.value());
    ///     println!("Got record: key={:?}, value={}", key, value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Offset`]: struct.Offset.html
    /// [`ConsumerConfig`]: struct.ConsumerConfig.html
    #[instrument(skip(self, offset, config))]
    pub async fn stream_with_config(
        &self,
        offset: Offset,
        config: ConsumerConfig,
    ) -> Result<impl Stream<Item = Result<Record, ErrorCode>>, FluvioError> {
        // Build one single-partition consumer per selected (topic, partition).
        let consumers = self
            .strategy
            .selection(self.pool.clone())
            .await?
            .into_iter()
            .map(|(topic, partition)| PartitionConsumer::new(topic, partition, self.pool.clone()))
            .collect::<Vec<_>>();

        // Open all per-partition streams concurrently; if any fails, the
        // whole call fails.
        let streams_future = consumers
            .iter()
            .map(|consumer| consumer.stream_with_config(offset.clone(), config.clone()));
        let streams_result = join_all(streams_future).await;
        let streams = streams_result.into_iter().collect::<Result<Vec<_>, _>>()?;

        // Merge the per-partition streams into one interleaved stream.
        Ok(select_all(streams))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The builder has no required fields, so building with all defaults
    // must succeed.
    #[test]
    fn test_consumer_config_default() {
        let _config = ConsumerConfig::builder().build().unwrap();
    }
}