use std::sync::Arc;
use futures_util::stream::Stream;
use tracing::{debug, error, trace};
use once_cell::sync::Lazy;
use futures_util::future::{Either, err};
use futures_util::stream::{StreamExt, once, iter};
use fluvio_spu_schema::server::stream_fetch::{DefaultStreamFetchRequest, DefaultStreamFetchResponse};
use dataplane::Isolation;
use dataplane::ReplicaKey;
use dataplane::ErrorCode;
use dataplane::fetch::DefaultFetchRequest;
use dataplane::fetch::FetchPartition;
use dataplane::fetch::FetchableTopic;
use dataplane::fetch::FetchablePartitionResponse;
use dataplane::record::RecordSet;
use dataplane::record::DefaultRecord;
use dataplane::batch::DefaultBatch;
use fluvio_types::event::offsets::OffsetPublisher;
use crate::FluvioError;
use crate::offset::Offset;
use crate::client::SerialFrame;
use crate::spu::SpuPool;
/// An interface for consuming events from a particular partition
///
/// There are two ways to consume events: by "fetching" events
/// and by "streaming" events. Fetching involves specifying a
/// range of events that you want to consume via their [`Offset`].
/// A fetch is a sort of one-time batch operation: you'll receive
/// all of the events in your range all at once. When you consume
/// events via Streaming, you specify a starting [`Offset`] and
/// receive an object that will continuously yield new events as
/// they arrive.
///
/// # Creating a Consumer
///
/// You can create a `PartitionConsumer` via the [`partition_consumer`]
/// method on the [`Fluvio`] client, like so:
///
/// ```
/// # use fluvio::{Fluvio, Offset, ConsumerConfig, FluvioError};
/// # async fn example(fluvio: &Fluvio) -> Result<(), FluvioError> {
/// let consumer = fluvio.partition_consumer("my-topic", 0).await?;
/// let records = consumer.fetch(Offset::beginning()).await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Offset`]: struct.Offset.html
/// [`partition_consumer`]: struct.Fluvio.html#method.partition_consumer
/// [`Fluvio`]: struct.Fluvio.html
pub struct PartitionConsumer {
    /// Name of the topic this consumer reads from
    topic: String,
    /// Index of the partition (within `topic`) this consumer reads from
    partition: i32,
    /// Shared SPU pool used to open sockets and fetch streams to the
    /// SPU serving this topic/partition replica
    pool: Arc<SpuPool>,
}
impl PartitionConsumer {
pub(crate) fn new(topic: String, partition: i32, pool: Arc<SpuPool>) -> Self {
Self {
topic,
partition,
pool,
}
}
/// Returns the name of the Topic that this consumer reads from
pub fn topic(&self) -> &str {
&self.topic
}
/// Returns the ID of the partition that this consumer reads from
pub fn partition(&self) -> i32 {
self.partition
}
/// Fetches events from a particular offset in the consumer's partition
///
/// A "fetch" is one of the two ways to consume events in Fluvio.
/// It is a batch request for records from a particular offset in
/// the partition. You specify the position of records to retrieve
/// using an [`Offset`], and receive the events as a list of records.
///
/// If you want more fine-grained control over how records are fetched,
/// check out the [`fetch_with_config`] method.
///
/// # Example
///
/// ```
/// # use fluvio::{PartitionConsumer, Offset, ConsumerConfig, FluvioError};
/// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
/// let response = consumer.fetch(Offset::beginning()).await?;
/// for batch in response.records.batches {
/// for record in batch.records() {
/// let string = String::from_utf8_lossy(record.value.as_ref());
/// println!("Got record: {}", string);
/// }
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`Offset`]: struct.Offset.html
/// [`fetch_with_config`]: struct.PartitionConsumer.html#method.fetch_with_config
pub async fn fetch(
&self,
offset: Offset,
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError> {
let records = self
.fetch_with_config(offset, ConsumerConfig::default())
.await?;
Ok(records)
}
/// Fetches events from a consumer using a specific fetching configuration
///
/// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
/// If you don't know what these settings do, try checking out the simpler
/// [`fetch`] method that uses the default fetching settings.
///
/// A "fetch" is one of the two ways to consume events in Fluvio.
/// It is a batch request for records from a particular offset in
/// the partition. You specify the range of records to retrieve
/// using an [`Offset`] and a [`ConsumerConfig`], and receive
/// the events as a list of records.
///
/// # Example
///
/// ```
/// # use fluvio::{PartitionConsumer, FluvioError, Offset, ConsumerConfig};
/// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
/// // Use custom fetching configurations
/// let fetch_config = ConsumerConfig::default()
/// .with_max_bytes(1000);
///
/// let response = consumer.fetch_with_config(Offset::beginning(), fetch_config).await?;
/// for batch in response.records.batches {
/// for record in batch.records() {
/// let string = String::from_utf8_lossy(record.value.as_ref());
/// println!("Got record: {}", string);
/// }
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`ConsumerConfig`]: struct.ConsumerConfig.html
/// [`fetch`]: struct.PartitionConsumer.html#method.fetch
/// [`Offset`]: struct.Offset.html
pub async fn fetch_with_config(
&self,
offset: Offset,
option: ConsumerConfig,
) -> Result<FetchablePartitionResponse<RecordSet>, FluvioError> {
let replica = ReplicaKey::new(&self.topic, self.partition);
debug!(
"starting fetch log once: {:#?} from replica: {}",
offset, &replica,
);
let mut leader = self.pool.create_serial_socket(&replica).await?;
debug!("found spu leader {}", leader);
let offset = offset
.to_absolute(&mut leader, &self.topic, self.partition)
.await?;
let partition = FetchPartition {
partition_index: self.partition,
fetch_offset: offset,
max_bytes: option.max_bytes,
..Default::default()
};
let topic_request = FetchableTopic {
name: self.topic.to_owned(),
fetch_partitions: vec![partition],
};
let fetch_request = DefaultFetchRequest {
topics: vec![topic_request],
isolation_level: option.isolation,
max_bytes: option.max_bytes,
..Default::default()
};
let response = leader.send_receive(fetch_request).await?;
debug!("received fetch logs for {}", &replica);
if let Some(partition_response) = response.find_partition(&self.topic, self.partition) {
debug!(
"found partition response with: {} batches: {} bytes",
partition_response.records.batches.len(),
bytes_count(&partition_response.records)
);
Ok(partition_response)
} else {
Err(FluvioError::PartitionNotFound(
self.topic.clone(),
self.partition,
))
}
}
/// Continuously streams events from a particular offset in the consumer's partition
///
/// Streaming is one of the two ways to consume events in Fluvio.
/// It is a continuous request for new records arriving in a partition,
/// beginning at a particular offset. You specify the starting point of the
/// stream using an [`Offset`] and periodically receive events, either individually
/// or in batches.
///
/// If you want more fine-grained control over how records are streamed,
/// check out the [`stream_with_config`] method.
///
/// # Example
///
/// ```
/// # use fluvio::{PartitionConsumer, FluvioError};
/// # use fluvio::{Offset, ConsumerConfig};
/// # mod futures {
/// # pub use futures_util::stream::StreamExt;
/// # }
/// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
/// use futures::StreamExt;
/// let mut stream = consumer.stream(Offset::beginning()).await?;
/// while let Some(Ok(record)) = stream.next().await {
/// let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
/// let value = String::from_utf8_lossy(record.value()).to_string();
/// println!("Got event: key={:?}, value={}", key, value);
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`Offset`]: struct.Offset.html
/// [`ConsumerConfig`]: struct.ConsumerConfig.html
/// [`stream_with_config`]: struct.ConsumerConfig.html#method.stream_with_config
pub async fn stream(
&self,
offset: Offset,
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError> {
let stream = self
.stream_with_config(offset, ConsumerConfig::default())
.await?;
Ok(stream)
}
/// Continuously streams events from a particular offset in the consumer's partition
///
/// Most of the time, you shouldn't need to use a custom [`ConsumerConfig`].
/// If you don't know what these settings do, try checking out the simpler
/// [`stream`] method that uses the default streaming settings.
///
/// Streaming is one of the two ways to consume events in Fluvio.
/// It is a continuous request for new records arriving in a partition,
/// beginning at a particular offset. You specify the starting point of the
/// stream using an [`Offset`] and a [`ConsumerConfig`], and periodically
/// receive events, either individually or in batches.
///
/// # Example
///
/// ```
/// # use fluvio::{PartitionConsumer, FluvioError};
/// # use fluvio::{Offset, ConsumerConfig};
/// # mod futures {
/// # pub use futures_util::stream::StreamExt;
/// # }
/// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
/// use futures::StreamExt;
/// // Use a custom max_bytes value in the config
/// let fetch_config = ConsumerConfig::default()
/// .with_max_bytes(1000);
/// let mut stream = consumer.stream_with_config(Offset::beginning(), fetch_config).await?;
/// while let Some(Ok(record)) = stream.next().await {
/// let key: Option<String> = record.key().map(|key| String::from_utf8_lossy(key).to_string());
/// let value = String::from_utf8_lossy(record.value());
/// println!("Got record: key={:?}, value={}", key, value);
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`Offset`]: struct.Offset.html
/// [`ConsumerConfig`]: struct.ConsumerConfig.html
pub async fn stream_with_config(
&self,
offset: Offset,
config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<Record, FluvioError>>, FluvioError> {
let stream = self.stream_batches_with_config(offset, config).await?;
let flattened =
stream.flat_map(|result: Result<DefaultBatch, _>| match result {
Err(e) => Either::Right(once(err(e))),
Ok(batch) => {
let base_offset = batch.base_offset;
let records = batch.own_records().into_iter().enumerate().map(
move |(relative, record)| {
Ok(Record {
offset: base_offset + relative as i64,
record,
})
},
);
Either::Left(iter(records))
}
});
Ok(flattened)
}
/// Continuously streams batches of messages, starting an offset in the consumer's partition
///
/// ```
/// # use fluvio::{PartitionConsumer, FluvioError};
/// # use fluvio::{Offset, ConsumerConfig};
/// # mod futures {
/// # pub use futures_util::stream::StreamExt;
/// # }
/// # async fn example(consumer: &PartitionConsumer) -> Result<(), FluvioError> {
/// use futures::StreamExt;
/// // Use a custom max_bytes value in the config
/// let fetch_config = ConsumerConfig::default()
/// .with_max_bytes(1000);
/// let mut stream = consumer.stream_batches_with_config(Offset::beginning(), fetch_config).await?;
/// while let Some(Ok(batch)) = stream.next().await {
/// for record in batch.records() {
/// let key = record.key.as_ref().map(|key| String::from_utf8_lossy(key.as_ref()).to_string());
/// let value = String::from_utf8_lossy(record.value.as_ref()).to_string();
/// println!("Got record: key={:?}, value={}", key, value);
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub async fn stream_batches_with_config(
&self,
offset: Offset,
config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<DefaultBatch, FluvioError>>, FluvioError> {
let stream = self.request_stream(offset, config).await?;
let flattened = stream.flat_map(|batch_result: Result<DefaultStreamFetchResponse, _>| {
let response: DefaultStreamFetchResponse = match batch_result {
// If error code is None, continue
Ok(response) if response.partition.error_code == ErrorCode::None => response,
// If error code is anything else, wrap it in an error
Ok(response) => {
let code = response.partition.error_code;
return Either::Right(once(err(FluvioError::ApiError(
fluvio_sc_schema::ApiError::Code(code, None),
))));
}
Err(e) => return Either::Right(once(err(e))),
};
let batches = response.partition.records.batches.into_iter().map(Ok);
Either::Left(iter(batches))
});
Ok(flattened)
}
/// Creates a stream of `DefaultStreamFetchResponse` for older consumers who rely
/// on the internal structure of the fetch response. New clients should use the
/// `stream` and `stream_with_config` methods.
async fn request_stream(
&self,
offset: Offset,
config: ConsumerConfig,
) -> Result<impl Stream<Item = Result<DefaultStreamFetchResponse, FluvioError>>, FluvioError>
{
use fluvio_future::task::spawn;
use futures_util::stream::empty;
use fluvio_spu_schema::server::stream_fetch::WASM_MODULE_API;
use fluvio_protocol::api::Request;
let replica = ReplicaKey::new(&self.topic, self.partition);
debug!(
"starting fetch log once: {:#?} from replica: {}",
offset, &replica,
);
let mut serial_socket = self.pool.create_serial_socket(&replica).await?;
trace!("created serial socket {}", serial_socket);
let start_absolute_offset = offset
.to_absolute(&mut serial_socket, &self.topic, self.partition)
.await?;
let mut stream_request = DefaultStreamFetchRequest {
topic: self.topic.to_owned(),
partition: self.partition,
fetch_offset: start_absolute_offset,
isolation: config.isolation,
max_bytes: config.max_bytes,
..Default::default()
};
// add wasm module if SPU supports it
let stream_fetch_version = serial_socket
.versions()
.lookup_version(DefaultStreamFetchRequest::API_KEY)
.unwrap_or((WASM_MODULE_API - 1) as i16);
if !config.wasm_module.is_empty() {
if stream_fetch_version >= WASM_MODULE_API as i16 {
stream_request.wasm_module = config.wasm_module;
} else {
return Err(FluvioError::Other("SPU does not support WASM".to_owned()));
}
}
let mut stream = self
.pool
.create_stream_with_version(&replica, stream_request, stream_fetch_version)
.await?;
if let Some(Ok(response)) = stream.next().await {
let stream_id = response.stream_id;
trace!("first stream response: {:#?}", response);
debug!(
stream_id,
last_offset = ?response.partition.next_offset_for_fetch(),
"first stream response"
);
let publisher = OffsetPublisher::shared(0);
let mut listener = publisher.change_listner();
// update stream with received offsets
spawn(async move {
use fluvio_spu_schema::server::update_offset::{UpdateOffsetsRequest, OffsetUpdate};
loop {
let fetch_last_value = listener.listen().await;
debug!(fetch_last_value, stream_id, "received end fetch");
if fetch_last_value < 0 {
debug!("fetch last is end, terminating");
break;
} else {
debug!(
offset = fetch_last_value,
session_id = stream_id,
"sending back offset to spu"
);
let response = serial_socket
.send_receive(UpdateOffsetsRequest {
offsets: vec![OffsetUpdate {
offset: fetch_last_value,
session_id: stream_id,
}],
})
.await;
if let Err(err) = response {
error!("error sending offset: {:#?}", err);
break;
}
}
}
debug!(stream_id, "offset fetch update loop end");
});
// send back first offset records exists
if let Some(last_offset) = response.partition.next_offset_for_fetch() {
debug!(last_offset, "notify new last offset");
publisher.update(last_offset);
}
let response_publisher = publisher.clone();
let update_stream = stream.map(move |item| {
item.map(|response| {
if let Some(last_offset) = response.partition.next_offset_for_fetch() {
debug!(last_offset, stream_id, "received last offset from spu");
response_publisher.update(last_offset);
}
response
})
.map_err(|e| e.into())
});
Ok(Either::Left(iter(vec![Ok(response)]).chain(
publish_stream::EndPublishSt::new(update_stream, publisher),
)))
} else {
Ok(Either::Right(empty()))
}
}
}
mod publish_stream {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Poll, Context};
use pin_project_lite::pin_project;
use futures_util::ready;
use super::Stream;
use super::OffsetPublisher;
// signal offset when stream is done
pin_project! {
pub struct EndPublishSt<St> {
#[pin]
stream: St,
publisher: Arc<OffsetPublisher>
}
}
impl<St> EndPublishSt<St> {
pub fn new(stream: St, publisher: Arc<OffsetPublisher>) -> Self {
Self { stream, publisher }
}
}
impl<S: Stream> Stream for EndPublishSt<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
this.publisher.update(-1);
}
Poll::Ready(item)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
}
/// Computes the total size, in bytes, of all record *values* in a record set.
///
/// Only `record.value` lengths are summed across every batch; keys and any other
/// record or batch data are not counted.
fn bytes_count(records: &RecordSet) -> usize {
    let mut total = 0;
    for batch in &records.batches {
        total += batch
            .records()
            .iter()
            .map(|record| record.value.len())
            .sum::<usize>();
    }
    total
}
/// Default maximum number of bytes requested per fetch.
///
/// Read once from the `FLV_CLIENT_MAX_FETCH_BYTES` environment variable; falls back
/// to 1000000 when the variable is unset, not valid Unicode, or not a valid `i32`.
static MAX_FETCH_BYTES: Lazy<i32> = Lazy::new(|| {
    std::env::var("FLV_CLIENT_MAX_FETCH_BYTES")
        .ok()
        .and_then(|raw| raw.parse::<i32>().ok())
        .unwrap_or(1000000)
});
/// Configures the behavior of consumer fetching and streaming
///
/// Start from `ConsumerConfig::default()` and adjust it with the `with_*` methods.
#[derive(Debug)]
pub struct ConsumerConfig {
    /// Maximum number of bytes requested per fetch / stream request
    pub(crate) max_bytes: i32,
    /// Isolation level sent with fetch and stream requests
    pub(crate) isolation: Isolation,
    /// WASM module attached to stream requests; an empty vector means none is sent
    wasm_module: Vec<u8>,
}
impl Default for ConsumerConfig {
    /// Builds a config with `MAX_FETCH_BYTES` as the byte limit, the default
    /// isolation level, and no WASM module.
    fn default() -> Self {
        let max_bytes = *MAX_FETCH_BYTES;
        Self {
            max_bytes,
            isolation: Default::default(),
            wasm_module: Vec::new(),
        }
    }
}
impl ConsumerConfig {
/// Maximum number of bytes to be fetched at a time.
pub fn with_max_bytes(mut self, max_bytes: i32) -> Self {
self.max_bytes = max_bytes;
self
}
/// set wasm filter
pub fn with_wasm_filter(mut self, bytes: Vec<u8>) -> Self {
self.wasm_module = bytes;
self
}
}
/// The individual record for a given stream.
///
/// Pairs a `DefaultRecord` with its offset in the partition. Records yielded by
/// `PartitionConsumer::stream_with_config` get their offset as the containing
/// batch's base offset plus the record's position within that batch.
pub struct Record {
    /// The offset of this Record into its partition
    offset: i64,
    /// The Record contents
    record: DefaultRecord,
}
impl Record {
    /// Returns the offset of this Record within its partition.
    ///
    /// This is not relative to the stream's starting offset. For records yielded by
    /// `PartitionConsumer::stream_with_config`, it is the containing batch's base
    /// offset plus the record's position within that batch.
    pub fn offset(&self) -> i64 {
        self.offset
    }

    /// Returns the contents of this Record's key, if it exists
    pub fn key(&self) -> Option<&[u8]> {
        self.record.key().map(|it| it.as_ref())
    }

    /// Returns the contents of this Record's value
    pub fn value(&self) -> &[u8] {
        self.record.value().as_ref()
    }

    /// Returns the inner representation of the Record, discarding its offset
    pub fn into_inner(self) -> DefaultRecord {
        self.record
    }
}
impl AsRef<[u8]> for Record {
    /// Views the record as its value bytes (the key is not included).
    fn as_ref(&self) -> &[u8] {
        self.record.value().as_ref()
    }
}