#![allow(dead_code)]
mod config;
mod stream;
mod offset;
mod retry;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use adaptive_backoff::prelude::{
Backoff, BackoffBuilder, ExponentialBackoff, ExponentialBackoffBuilder,
};
use anyhow::Result;
use async_channel::Sender;
use async_lock::Mutex;
use fluvio_future::timer::sleep;
use fluvio_socket::VersionedSerialSocket;
use fluvio_spu_schema::server::consumer_offset::{
FetchConsumerOffsetsRequest, UpdateConsumerOffsetRequest,
};
use tracing::{debug, error, trace, instrument, info, warn};
use futures_util::stream::{Stream, select_all};
use once_cell::sync::Lazy;
use futures_util::future::{Either, err, try_join_all};
use futures_util::stream::{StreamExt, once, iter};
use futures_util::FutureExt;
use fluvio_types::PartitionId;
use fluvio_types::defaults::{
CONSUMER_REPLICA_KEY, FLUVIO_CLIENT_MAX_FETCH_BYTES, FLUVIO_MAX_SIZE_TOPIC_NAME,
RECONNECT_BACKOFF_FACTOR, RECONNECT_BACKOFF_MAX_DURATION, RECONNECT_BACKOFF_MIN_DURATION,
};
use fluvio_spu_schema::server::stream_fetch::{
DefaultStreamFetchRequest, DefaultStreamFetchResponse, CHAIN_SMARTMODULE_API,
OFFSET_MANAGEMENT_API,
};
use fluvio_protocol::record::ReplicaKey;
use fluvio_protocol::link::ErrorCode;
use fluvio_protocol::record::Batch;
use crate::FluvioError;
use crate::metrics::ClientMetrics;
use crate::offset::{Offset, fetch_offsets};
use crate::spu::{SpuDirectory, SpuSocketPool};
pub use config::{ConsumerConfig, ConsumerConfigBuilder};
pub use config::{ConsumerConfigExt, ConsumerConfigExtBuilder, OffsetManagementStrategy, RetryMode};
pub use stream::{
ConsumerStream, MultiplePartitionConsumerStream, SinglePartitionConsumerStream,
ConsumerBoxFuture,
};
pub use offset::ConsumerOffset;
pub use retry::ConsumerRetryStream;
pub use fluvio_protocol::record::ConsumerRecord;
pub use fluvio_protocol::record::ConsumerRecord as Record;
pub use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;
pub use fluvio_spu_schema::server::smartmodule::SmartModuleInvocationWasm;
pub use fluvio_spu_schema::server::smartmodule::SmartModuleKind;
pub use fluvio_spu_schema::server::smartmodule::SmartModuleContextData;
pub use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
// Capacity of the bounded channel carrying offset updates/flushes from the
// consumer stream back to the SPU-facing background task.
const STREAM_TO_SERVER_CHANNEL_SIZE: usize = 100;
// Upper bound on attempts when creating the consumer-offset serial socket.
const MAX_ATTEMPTS_CONSUMER_OFFSET: usize = 30;
// Boxed consumer stream. The wasm32 variant drops the `Send` bound because
// that target's runtime is single-threaded.
#[cfg(target_arch = "wasm32")]
pub type BoxConsumerStream =
Pin<Box<dyn ConsumerStream<Item = Result<ConsumerRecord, ErrorCode>> + 'static>>;
#[cfg(not(target_arch = "wasm32"))]
pub type BoxConsumerStream =
Pin<Box<dyn ConsumerStream<Item = Result<ConsumerRecord, ErrorCode>> + Send + 'static>>;
// NOTE(review): "Sharared" looks like a typo for "Shared". Left as-is here
// because child modules (stream/retry) may reference this alias via `super::`.
type ShararedConsumerStream = Arc<Mutex<BoxConsumerStream>>;
// Output of a boxed consumer future: the shared stream handle plus an
// optional next result of (record, optional offset).
type ConsumerFutureOutput = (
ShararedConsumerStream,
Option<Result<(ConsumerRecord, Option<i64>), ErrorCode>>,
);
#[cfg(target_arch = "wasm32")]
type BoxConsumerFuture = Pin<Box<dyn Future<Output = ConsumerFutureOutput> + 'static>>;
#[cfg(not(target_arch = "wasm32"))]
type BoxConsumerFuture = Pin<Box<dyn Future<Output = ConsumerFutureOutput> + Send + 'static>>;
/// Consumer of records from a single `(topic, partition)` pair.
///
/// Generic over the socket pool `P` (defaults to [`SpuSocketPool`]) that is
/// used to open serial sockets and fetch streams to the owning SPU.
pub struct PartitionConsumer<P = SpuSocketPool> {
// Topic this consumer reads from.
topic: String,
// Partition within the topic.
partition: PartitionId,
// Shared pool used to create sockets/streams to SPUs.
pool: Arc<P>,
// Shared client metrics; updated with records/bytes consumed.
metrics: Arc<ClientMetrics>,
}
// Manual `Clone` impl: `#[derive(Clone)]` would add an unnecessary `P: Clone`
// bound, while only the `Arc<P>` handle actually needs to be cloned.
impl<P> Clone for PartitionConsumer<P> {
fn clone(&self) -> Self {
Self {
topic: self.topic.clone(),
partition: self.partition,
pool: self.pool.clone(),
metrics: self.metrics.clone(),
}
}
}
impl<P> PartitionConsumer<P>
where
P: SpuDirectory + 'static,
{
pub fn new(
topic: String,
partition: PartitionId,
pool: Arc<P>,
metrics: Arc<ClientMetrics>,
) -> Self {
Self {
topic,
partition,
pool,
metrics,
}
}
/// Returns the topic name this consumer reads from.
pub fn topic(&self) -> &str {
&self.topic
}
/// Returns the partition id this consumer reads from.
pub fn partition(&self) -> PartitionId {
self.partition
}
/// Returns a shared handle to the client metrics.
pub fn metrics(&self) -> Arc<ClientMetrics> {
self.metrics.clone()
}
/// Continuously streams records from this partition starting at `offset`,
/// using the default [`ConsumerConfig`].
#[instrument(skip(self, offset))]
#[deprecated(
since = "0.21.8",
note = "use `Fluvio::consumer_with_config()` instead"
)]
#[allow(deprecated)]
pub async fn stream(
&self,
offset: Offset,
) -> Result<impl Stream<Item = Result<Record, ErrorCode>> + use<P>> {
let config = ConsumerConfig::builder().build()?;
let stream = self.stream_with_config(offset, config).await?;
Ok(stream)
}
/// Continuously streams records from this partition starting at `offset`,
/// using the supplied `config`.
#[instrument(skip(self, offset, config))]
#[deprecated(
    since = "0.21.8",
    note = "use `Fluvio::consumer_with_config()` instead"
)]
pub async fn stream_with_config(
    &self,
    offset: Offset,
    config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<Record, ErrorCode>> + use<P>> {
    let (batch_stream, start_offset, _) = self
        .inner_stream_batches_with_config(offset, config, None)
        .await?;
    let partition = self.partition;
    // Explode each batch into individual records, dropping any record that
    // precedes the resolved start offset (a batch may begin earlier).
    let record_stream = batch_stream.flat_map(move |batch_result: Result<Batch, _>| {
        match batch_result {
            Ok(batch) => {
                let records = batch
                    .into_consumer_records_iter(partition)
                    .filter(move |record| record.offset >= start_offset)
                    .map(Ok);
                Either::Left(iter(records))
            }
            Err(e) => Either::Right(once(err(e))),
        }
    });
    Ok(record_stream)
}
/// Continuously streams record batches (rather than individual records) from
/// this partition, starting at `offset`.
#[instrument(skip(self, offset, config))]
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<Batch, ErrorCode>> + use<P>> {
let (stream, _start_offset, _) = self
.inner_stream_batches_with_config(offset, config, None)
.await?;
Ok(stream)
}
/// Opens a batch-level stream for this partition.
///
/// Returns the decoded-batch stream, the resolved absolute start offset, and
/// the sender used to push [`StreamToServer`] messages (offset updates and
/// flushes) to the background server task created by `request_stream`.
#[instrument(skip(self, offset, config))]
async fn inner_stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig,
consumer_id: Option<String>,
) -> Result<(
impl Stream<Item = Result<Batch, ErrorCode>> + use<P>,
fluvio_protocol::record::Offset,
Sender<StreamToServer>,
)> {
let (stream, start_offset, stream_to_server) =
self.request_stream(offset, config, consumer_id).await?;
let metrics = self.metrics.clone();
// Decode each fetch response into its batches, recording consumer metrics
// per raw batch. A non-`None` partition error code is surfaced as a
// trailing `Err` item after the batches of that response.
let flattened =
stream.flat_map(move |batch_result: Result<DefaultStreamFetchResponse, _>| {
let response = match batch_result {
Ok(response) => response,
Err(e) => return Either::Right(once(err(e))),
};
let inner_metrics = metrics.clone();
let batches =
response
.partition
.records
.batches
.into_iter()
.map(move |raw_batch| {
// Record counts/bytes before decoding, so metrics reflect
// everything received even if decoding fails.
inner_metrics
.consumer()
.add_records(raw_batch.records_len() as u64);
inner_metrics
.consumer()
.add_bytes(raw_batch.batch_len() as u64);
let batch: Result<Batch, _> = raw_batch.try_into();
match batch {
Ok(batch) => Ok(batch),
Err(err) => {
tracing::error!("{err:?}");
Err(ErrorCode::Other(err.to_string()))
}
}
});
let error = {
let code = response.partition.error_code;
match code {
ErrorCode::None => None,
_ => Some(Err(code)),
}
};
let items = batches.chain(error.into_iter());
Either::Left(iter(items))
});
Ok((flattened, start_offset, stream_to_server))
}
/// Opens the raw stream-fetch stream for this partition.
///
/// Resolves the absolute start offset (consulting the stored consumer offset
/// when `consumer_id` is given), negotiates the stream-fetch API version,
/// issues the `DefaultStreamFetchRequest`, and spawns a background task that
/// forwards offset updates / managed-offset flushes from the returned channel
/// back to the SPU over a serial socket.
///
/// Returns the response stream, the resolved absolute start offset, and the
/// sender used to push [`StreamToServer`] messages to the background task.
#[instrument(skip(self, config))]
async fn request_stream(
&self,
offset: Offset,
config: ConsumerConfig,
consumer_id: Option<String>,
) -> Result<(
impl Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + use<P>,
fluvio_protocol::record::Offset,
Sender<StreamToServer>,
)> {
use fluvio_future::task::spawn;
use futures_util::stream::empty;
let replica = ReplicaKey::new(&self.topic, self.partition);
let mut serial_socket = self.pool.create_serial_socket(&replica).await?;
// When a consumer id is provided, fetch its stored offset so the stream
// can resume from one past the last committed offset.
let consumer_offset = if let Some(ref consumer_id) = consumer_id {
let consumer_offset_socket = self.create_serial_socket_retry().await?;
let response = consumer_offset_socket
.send_receive(FetchConsumerOffsetsRequest::with_opts(
Some((self.topic.to_owned(), self.partition).into()),
Some(consumer_id.clone()),
))
.await?;
if response.error_code != ErrorCode::None {
error!("Error getting consumer offset: {:#?}", response.error_code);
return Err(response.error_code.into());
}
// Take the first returned consumer entry, if any; +1 resumes after
// the committed offset.
response
.consumers
.iter()
.map(|consumer| consumer.offset + 1)
.next()
} else {
None
};
let offsets = fetch_offsets(&mut serial_socket, &replica).await?;
let start_absolute_offset = offset.resolve(&offsets, consumer_offset).await?;
let end_absolute_offset = offsets.last_stable_offset;
// Number of records expected when running in non-continuous mode; used to
// bound the stream via `TakeRecords` below.
let record_count = end_absolute_offset - start_absolute_offset;
debug!(start_absolute_offset, end_absolute_offset, record_count);
let with_consumer_id = consumer_id.is_some();
let stream_request = DefaultStreamFetchRequest::builder()
.topic(self.topic.to_owned())
.partition(self.partition)
.fetch_offset(start_absolute_offset)
.isolation(config.isolation)
.max_bytes(config.max_bytes)
.smartmodules(config.smartmodule)
.consumer_id(consumer_id)
.build()?;
// Negotiate the stream-fetch API version; warn when the SPU is too old
// for SmartModule chaining or offset management.
let stream_fetch_version = serial_socket
.versions()
.lookup_version::<DefaultStreamFetchRequest>()
.unwrap_or(CHAIN_SMARTMODULE_API - 1);
debug!(%stream_fetch_version, "stream_fetch_version");
if stream_fetch_version < CHAIN_SMARTMODULE_API {
warn!(
"SPU does not support SmartModule chaining. SmartModules will not be applied to the stream"
);
}
if with_consumer_id && stream_fetch_version < OFFSET_MANAGEMENT_API {
warn!("SPU does not support Offset Management API");
}
let mut stream = self
.pool
.create_stream_with_version(&replica, stream_request, stream_fetch_version)
.await?;
// Channel through which the consumer stream pushes offset updates and
// flush requests to the SPU-facing task spawned below.
let (server_sender, server_recv) =
async_channel::bounded::<StreamToServer>(STREAM_TO_SERVER_CHANNEL_SIZE);
let server_sender_clone = server_sender.clone();
let ft_stream = async move {
if let Some(Ok(raw_response)) = stream.next().await {
let response: DefaultStreamFetchResponse = raw_response;
let stream_id = response.stream_id;
trace!("first stream response: {:#?}", response);
debug!(
stream_id,
last_offset = ?response.partition.next_offset_for_fetch(),
"first stream response"
);
// Background task: forwards offset updates and managed-offset
// flushes to the SPU until the channel closes or a send fails.
spawn(async move {
use fluvio_spu_schema::server::update_offset::{UpdateOffsetsRequest, OffsetUpdate};
loop {
match server_recv.recv().await {
Ok(StreamToServer::UpdateOffset(fetch_last_value)) => {
debug!(fetch_last_value, stream_id, "received end fetch");
debug!(
offset = fetch_last_value,
session_id = stream_id,
"sending back offset to spu"
);
let request = UpdateOffsetsRequest {
offsets: vec![OffsetUpdate {
offset: fetch_last_value,
session_id: stream_id,
}],
};
debug!(?request, "Sending offset update request:");
let response = serial_socket.send_receive(request).await;
if let Err(err) = response {
error!("error sending offset: {:#?}", err);
break;
}
}
Ok(StreamToServer::Close) => {
debug!("fetch last is end, terminating");
break;
}
Ok(StreamToServer::FlushManagedOffset { offset, callback }) => {
debug!(offset, stream_id, "flush offset request");
let request = UpdateConsumerOffsetRequest {
session_id: stream_id,
offset,
};
let response = serial_socket.send_receive(request).await;
match response {
Ok(response) => callback.send(response.error_code).await,
Err(err) => {
error!("offset flush request error: {:?}", err);
callback
.send(ErrorCode::OffsetFlushRequestError(
err.to_string(),
))
.await;
break;
}
};
}
Err(err) => {
debug!("stream to server channel closed: {err:?}");
break;
}
}
}
debug!(stream_id, "offset fetch update loop end");
});
// Acknowledge the first response's offset immediately, if present.
if let Some(last_offset) = response.partition.next_offset_for_fetch() {
debug!(last_offset, "notify new last offset");
let _ = server_sender_clone
.send(StreamToServer::UpdateOffset(last_offset))
.await;
}
let server_sender_clone2 = server_sender_clone.clone();
// For every subsequent response, push its last offset to the update
// task. `try_send` is best-effort: updates are dropped when the
// channel is full rather than blocking the stream.
let update_stream = StreamExt::map(stream, move |item| {
item.inspect(|response| {
if let Some(last_offset) = response.partition.next_offset_for_fetch() {
debug!(last_offset, stream_id, "received last offset from spu");
let _ = server_sender_clone
.try_send(StreamToServer::UpdateOffset(last_offset));
}
})
.map_err(|e| {
error!(?e, "error in stream");
ErrorCode::Other(e.to_string())
})
});
// Re-emit the first response, then the rest; `EndPublishSt` sends
// `Close` to the update task when the stream terminates.
Either::Left(
iter(vec![Ok(response)]).chain(publish_stream::EndPublishSt::new(
update_stream,
server_sender_clone2,
)),
)
} else {
info!("stream ended");
Either::Right(empty())
}
};
// In non-continuous mode, stop after the snapshot of records that existed
// when the stream was requested.
let stream = if config.disable_continuous {
TakeRecords::new(ft_stream.flatten_stream().boxed(), record_count).boxed()
} else {
ft_stream.flatten_stream().boxed()
};
Ok((stream, start_absolute_offset, server_sender))
}
/// Creates a serial socket to the SPU hosting the consumer-offset replica,
/// retrying with exponential backoff.
///
/// Makes up to `MAX_ATTEMPTS_CONSUMER_OFFSET` attempts. Unlike the previous
/// version, it does not sleep a full backoff period after the final failed
/// attempt before giving up.
///
/// # Errors
/// Returns an error when the backoff policy cannot be built or when every
/// attempt to create the socket fails.
async fn create_serial_socket_retry(&self) -> Result<VersionedSerialSocket> {
    let mut backoff = create_backoff()?;
    for attempt in 1..=MAX_ATTEMPTS_CONSUMER_OFFSET {
        match self
            .pool
            .create_serial_socket(&CONSUMER_REPLICA_KEY.into())
            .await
        {
            Ok(socket) => return Ok(socket),
            Err(err) => {
                error!("Failed to create consumer offset socket: {:#?}", err);
                // Only back off when another attempt will follow; waiting
                // after the last attempt would just delay the error.
                if attempt < MAX_ATTEMPTS_CONSUMER_OFFSET {
                    backoff_and_wait(&mut backoff).await;
                }
            }
        }
    }
    Err(ErrorCode::Other("Failed to create consumer offset socket".to_string()).into())
}
/// Builds a [`SinglePartitionConsumerStream`] from the extended config.
///
/// Splits `config` into its offset / base-config / consumer-id /
/// offset-management parts, opens the batch stream, and flattens batches into
/// individual records, skipping records before the resolved start offset.
#[instrument(skip(self, config))]
pub(crate) async fn consumer_stream_with_config(
self,
config: ConsumerConfigExt,
) -> Result<SinglePartitionConsumerStream<impl Stream<Item = Result<Record, ErrorCode>> + use<P>>>
{
let (offset, config, consumer_id, strategy, flush_period, flusher_check_period) =
config.into_parts();
let (stream, start_offset, stream_to_server) = self
.inner_stream_batches_with_config(offset, config, consumer_id)
.await?;
let partition = self.partition;
// Flatten batches into records; a batch may begin before the requested
// start offset, so earlier records are filtered out.
let flattened = stream.flat_map(move |result: Result<Batch, _>| match result {
Err(e) => Either::Right(once(err(e))),
Ok(batch) => {
let records =
batch
.into_consumer_records_iter(partition)
.filter_map(move |record| {
if record.offset >= start_offset {
Some(Ok(record))
} else {
None
}
});
Either::Left(iter(records))
}
});
Ok(SinglePartitionConsumerStream::new(
flattened,
strategy,
flush_period,
flusher_check_period,
stream_to_server,
))
}
}
/// Stream adapter that terminates after roughly `remaining` records have
/// passed through (used when `disable_continuous` is set).
struct TakeRecords<S> {
// Records still allowed through. Responses are forwarded whole, so the
// final response may overshoot this budget before the stream ends.
remaining: i64,
stream: S,
}
impl<S> TakeRecords<S>
where
    S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
    /// Wraps `stream`, allowing approximately `until` records through before
    /// the adapter ends the stream.
    pub fn new(stream: S, until: i64) -> Self {
        Self { remaining: until, stream }
    }
}
impl<S> Stream for TakeRecords<S>
where
S: Stream<Item = Result<DefaultStreamFetchResponse, ErrorCode>> + std::marker::Unpin,
{
type Item = S::Item;
// Forwards items from the inner stream, decrementing the remaining budget by
// the number of records in each successful response. Ends the stream once
// the budget reaches zero; responses are forwarded whole, so the final one
// may overshoot the budget.
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::{pin::Pin, task::Poll};
use futures_util::ready;
if self.remaining <= 0 {
return Poll::Ready(None);
}
let next = ready!(Pin::new(&mut self.as_mut().stream).poll_next(cx));
match next {
Some(Ok(response)) => {
// Total records across all batches in this response.
let count: usize = response
.partition
.records
.batches
.iter()
.map(|it| it.records_len())
.sum();
// Clamp at zero so the next poll terminates the stream.
let diff = self.remaining - count as i64;
self.remaining = diff.max(0);
Poll::Ready(Some(Ok(response)))
}
other => Poll::Ready(other),
}
}
}
/// Stream adapter that notifies a channel when the wrapped stream ends.
mod publish_stream {
use std::pin::Pin;
use std::task::{Poll, Context};
use async_channel::Sender;
use pin_project::pin_project;
use futures_util::ready;
use super::{Stream, StreamToServer};
/// Wraps a stream and sends [`StreamToServer::Close`] to `publisher` once
/// the inner stream yields `None`.
#[pin_project]
pub struct EndPublishSt<St> {
#[pin]
stream: St,
publisher: Sender<StreamToServer>,
}
impl<St> EndPublishSt<St> {
pub fn new(stream: St, publisher: Sender<StreamToServer>) -> Self {
Self { stream, publisher }
}
}
impl<S: Stream> Stream for EndPublishSt<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
let item = ready!(this.stream.poll_next(cx));
// End-of-stream: tell the server-facing task to shut down. `try_send`
// is best-effort; a full or closed channel is ignored.
if item.is_none() {
let _ = this.publisher.try_send(StreamToServer::Close);
}
Poll::Ready(item)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
}
/// Maximum fetch size (bytes) for client fetch requests.
///
/// Taken from the `FLV_CLIENT_MAX_FETCH_BYTES` environment variable when it
/// parses as an `i32`; otherwise computed from the encoded sizes of empty
/// fetch-response frames plus the topic-name and client fetch-bytes defaults.
static MAX_FETCH_BYTES: Lazy<i32> = Lazy::new(|| {
    use std::env;
    use fluvio_protocol::Encoder;
    use fluvio_protocol::record::MemoryRecords;
    use fluvio_spu_schema::fetch::{
        FetchResponse, FetchableTopicResponse, FetchablePartitionResponse,
    };
    env::var("FLV_CLIENT_MAX_FETCH_BYTES")
        .unwrap_or_default()
        .parse()
        .unwrap_or_else(|_| {
            FetchResponse::<MemoryRecords>::default().write_size(0) as i32
                + FetchableTopicResponse::<MemoryRecords>::default().write_size(0) as i32
                + FetchablePartitionResponse::<MemoryRecords>::default().write_size(0) as i32
                + FLUVIO_MAX_SIZE_TOPIC_NAME as i32
                + FLUVIO_CLIENT_MAX_FETCH_BYTES
        })
});
/// Chooses which `(topic, partition)` pairs a multi-partition consumer reads.
#[derive(Clone)]
pub enum PartitionSelectionStrategy {
/// Consume every partition of the named topic.
All(String),
/// Consume exactly the listed `(topic, partition)` pairs.
Multiple(Vec<(String, PartitionId)>),
}
impl PartitionSelectionStrategy {
/// Resolves the strategy into concrete `(topic, partition)` pairs.
///
/// `All` looks the topic up in the cluster metadata and enumerates every
/// partition id from 0; `Multiple` returns the explicit list as given.
///
/// # Errors
/// Fails when the metadata lookup fails or the topic does not exist.
async fn selection(&self, spu_pool: Arc<SpuSocketPool>) -> Result<Vec<(String, PartitionId)>> {
let pairs = match self {
PartitionSelectionStrategy::All(topic) => {
let topics = spu_pool.metadata.topics();
let topic_spec = topics
.lookup_by_key(topic)
.await?
.ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
.spec;
let partition_count = topic_spec.partitions();
(0..(partition_count as PartitionId))
.map(|partition| (topic.clone(), partition))
.collect::<Vec<_>>()
}
PartitionSelectionStrategy::Multiple(topic_partition) => topic_partition.to_owned(),
};
Ok(pairs)
}
}
/// Consumer that reads from multiple partitions (possibly across topics),
/// merging the per-partition streams into one.
#[derive(Clone)]
pub struct MultiplePartitionConsumer {
// Determines which (topic, partition) pairs to consume.
strategy: PartitionSelectionStrategy,
pool: Arc<SpuSocketPool>,
metrics: Arc<ClientMetrics>,
}
impl MultiplePartitionConsumer {
/// Creates a multi-partition consumer for the given selection strategy,
/// sharing the socket pool and metrics handles.
pub(crate) fn new(
    strategy: PartitionSelectionStrategy,
    pool: Arc<SpuSocketPool>,
    metrics: Arc<ClientMetrics>,
) -> Self {
    Self { strategy, pool, metrics }
}
/// Streams records from all selected partitions starting at `offset`, using
/// the default [`ConsumerConfig`].
#[instrument(skip(self, offset))]
#[deprecated(
since = "0.21.8",
note = "use `Fluvio::consumer_with_config()` instead"
)]
#[allow(deprecated)]
pub async fn stream(
&self,
offset: Offset,
) -> Result<impl Stream<Item = Result<Record, ErrorCode>> + use<>> {
let config = ConsumerConfig::builder().build()?;
let stream = self.stream_with_config(offset, config).await?;
Ok(stream)
}
/// Streams records from all selected partitions starting at `offset` with the
/// supplied `config`, merging the per-partition streams via `select_all`.
///
/// # Errors
/// Fails when partition selection fails or any per-partition stream cannot
/// be opened.
#[instrument(skip(self, offset, config))]
#[deprecated(
    since = "0.21.8",
    note = "use `Fluvio::consumer_with_config()` instead"
)]
#[allow(deprecated)]
pub async fn stream_with_config(
    &self,
    offset: Offset,
    config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<Record, ErrorCode>> + use<>> {
    // One PartitionConsumer per (topic, partition) pair chosen by the
    // strategy. `selection` already yields `PartitionId`, so the previous
    // `partition as PartitionId` no-op cast was removed.
    let consumers: Vec<_> = self
        .strategy
        .selection(self.pool.clone())
        .await?
        .into_iter()
        .map(|(topic, partition)| {
            PartitionConsumer::new(topic, partition, self.pool.clone(), self.metrics.clone())
        })
        .collect();
    // Open all partition streams concurrently; fail fast if any one fails.
    let stream_futures = consumers.into_iter().map(|consumer| {
        let offset = offset.clone();
        let config = config.clone();
        async move { consumer.stream_with_config(offset, config).await }
    });
    let streams = try_join_all(stream_futures).await?;
    Ok(select_all(streams))
}
}
/// Messages sent from the consumer stream to the background task (spawned in
/// `request_stream`) that talks to the SPU over a serial socket.
#[derive(Debug, Clone)]
pub(crate) enum StreamToServer {
/// Report the last fetched offset for the stream session.
UpdateOffset(i64),
/// Persist the managed consumer offset; the resulting error code is
/// delivered through `callback`.
FlushManagedOffset {
offset: i64,
callback: StreamToServerCallback<ErrorCode>,
},
/// Terminate the background task.
Close,
}
/// Destination for the result of a [`StreamToServer::FlushManagedOffset`]
/// request.
#[derive(Debug, Clone)]
pub(crate) enum StreamToServerCallback<T> {
/// Discard the result.
NoOp,
/// Deliver the result over the channel.
Channel(Sender<T>),
}
impl<T> StreamToServerCallback<T> {
    /// Delivers `value` to the callback target; `NoOp` discards it.
    /// Channel send failures are logged and otherwise ignored.
    pub(crate) async fn send(&self, value: T) {
        let Self::Channel(channel) = self else {
            return;
        };
        if let Err(err) = channel.send(value).await {
            error!("stream callback error: {err:?}");
        }
    }
}
/// Builds the exponential backoff policy used for SPU reconnect attempts,
/// configured from the client-wide reconnect defaults.
fn create_backoff() -> Result<ExponentialBackoff> {
ExponentialBackoffBuilder::default()
.factor(RECONNECT_BACKOFF_FACTOR)
.min(RECONNECT_BACKOFF_MIN_DURATION)
.max(RECONNECT_BACKOFF_MAX_DURATION)
.build()
}
/// Sleeps for the next wait period produced by `backoff`.
async fn backoff_and_wait(backoff: &mut ExponentialBackoff) {
    sleep(backoff.wait()).await;
}
#[cfg(test)]
mod tests {
use super::*;
// Ensures the default `ConsumerConfig` builder produces a valid config.
#[test]
fn test_consumer_config_default() {
let _config = ConsumerConfig::builder().build().unwrap();
}
}