use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
use event_listener::Event;
#[cfg(target_arch = "wasm32")]
use web_time::Instant;
use async_channel::Sender;
use async_lock::RwLock;
use tracing::trace;
use futures_util::future::{BoxFuture, Either, Shared};
use futures_util::{FutureExt, ready};
use fluvio_future::future::timeout;
use fluvio_protocol::record::Batch;
use fluvio_compression::Compression;
use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
use fluvio_spu_schema::produce::ProduceResponse;
use fluvio_protocol::record::Record;
use fluvio_socket::SocketError;
use fluvio_types::{PartitionId, Timestamp, PartitionCount};
use crate::producer::record::{BatchMetadata, FutureRecordMetadata, PartialFutureRecordMetadata};
use crate::producer::ProducerError;
use crate::error::Result;
use super::event::EventHandler;
use super::memory_batch::{MemoryBatch, MemoryBatchStatus};
/// Maximum time a producer will wait for free queue capacity before the
/// push fails with `ProducerError::BatchQueueWaitTimeout`.
const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

/// Per-partition pair: lifecycle event notifiers plus the batch queue itself.
pub(crate) type BatchHandler = (Arc<BatchEvents>, Arc<BatchesDeque>);
/// FIFO of in-flight batches for a single partition, plus an event used to
/// wake producers that are blocked waiting for queue capacity.
pub(crate) struct BatchesDeque {
    // Pending batches, oldest first; guarded by an async RwLock.
    pub batches: RwLock<VecDeque<ProducerBatch>>,
    // Notified when queue capacity becomes available (presumably fired by the
    // flush/drain path, which is not visible in this file — confirm there).
    pub free_space_event: Event,
}
impl BatchesDeque {
pub(crate) fn new() -> Self {
Self {
batches: RwLock::new(VecDeque::new()),
free_space_event: Event::new(),
}
}
pub(crate) fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
}
/// Accumulates records into per-partition batches until they are ready to
/// be flushed as produce requests.
pub(crate) struct RecordAccumulator {
    // Per-batch size limit passed to `ProducerBatch::new` as `batch_limit`.
    batch_size: usize,
    // Hard write limit passed to `ProducerBatch::new` as `write_limit`.
    max_request_size: usize,
    // Maximum number of pending batches per partition before pushes block
    // (see `wait_for_space`).
    queue_size: usize,
    // Partition id -> (event notifiers, batch queue).
    batches: Arc<RwLock<HashMap<PartitionId, BatchHandler>>>,
    // Compression applied to each in-memory batch.
    compression: Compression,
}
impl RecordAccumulator {
pub(crate) fn new(
batch_size: usize,
max_request_size: usize,
queue_size: usize,
partition_n: PartitionCount,
compression: Compression,
) -> Self {
let batches = (0..partition_n)
.map(|p| (p, (BatchEvents::shared(), BatchesDeque::shared())))
.collect::<HashMap<_, _>>();
Self {
batches: Arc::new(RwLock::new(batches)),
max_request_size,
batch_size,
compression,
queue_size,
}
}
pub(crate) async fn add_partition(
&self,
partition_id: PartitionId,
value: (Arc<BatchEvents>, Arc<BatchesDeque>),
) -> BatchHandler {
self.batches
.write()
.await
.insert(partition_id, value.clone());
value
}
pub(crate) async fn push_record(
&self,
record: Record,
partition_id: PartitionId,
) -> Result<PushRecord, ProducerError> {
let created_at = Instant::now();
let batches_lock = self.batches.read().await;
let (batch_events, batches_lock) = batches_lock
.get(&partition_id)
.ok_or(ProducerError::PartitionNotFound(partition_id))?;
self.wait_for_space(batches_lock.clone()).await?;
let mut batches = batches_lock.batches.write().await;
if let Some(batch) = batches.back_mut() {
match batch.push_record(record) {
Ok(ProduceBatchStatus::Added(push_record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Ok(ProduceBatchStatus::NotAdded(record)) => {
if batch.is_full() {
batch_events.notify_batch_full().await;
}
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record, 1, created_at)
.await?;
return Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
));
}
Err(err) => {
return Err(err);
}
}
}
trace!(partition_id, "Creating a new batch");
let push_record = self
.create_and_new_batch(batch_events, &mut batches, record, 1, created_at)
.await?;
Ok(PushRecord::new(
push_record.into_future_record_metadata(partition_id),
))
}
async fn wait_for_space(&self, batches_lock: Arc<BatchesDeque>) -> Result<(), ProducerError> {
let space_listener = batches_lock.free_space_event.listen();
loop {
let batches = batches_lock.batches.read().await;
if batches.len() < self.queue_size {
batches_lock.free_space_event.notify(1);
break;
}
}
match timeout(RECORD_ENQUEUE_TIMEOUT, space_listener).await {
Ok(_) => Ok(()),
Err(_) => Err(ProducerError::BatchQueueWaitTimeout),
}
}
async fn create_and_new_batch(
&self,
batch_events: &BatchEvents,
batches: &mut VecDeque<ProducerBatch>,
record: Record,
attempts: usize,
created_at: Instant,
) -> Result<PartialFutureRecordMetadata, ProducerError> {
if attempts > 2 {
return Err(ProducerError::Internal(
"Attempts exceeded while creating a new batch".to_string(),
));
}
let mut batch = ProducerBatch::new(
self.max_request_size,
self.batch_size,
self.compression,
created_at,
);
match batch.push_record(record) {
Ok(ProduceBatchStatus::Added(push_record)) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
}
batches.push_back(batch);
Ok(push_record)
}
Ok(ProduceBatchStatus::NotAdded(record)) => {
batch_events.notify_new_batch().await;
if batch.is_full() {
batch_events.notify_batch_full().await;
}
batches.push_back(batch);
Box::pin(self.create_and_new_batch(
batch_events,
batches,
record,
attempts + 1,
created_at,
))
.await
}
Err(err) => Err(err),
}
}
pub(crate) async fn batches(&self) -> HashMap<PartitionId, BatchHandler> {
self.batches.read().await.clone()
}
}
/// Event describing a batch whose produce request has completed; delivered to
/// a [`ProducerCallback`].
#[derive(Debug)]
pub struct ProduceCompletionBatchEvent {
    // Size of the batch in bytes.
    pub bytes_size: u64,
    // Number of records the batch carried.
    pub records_len: u64,
    // Partition the batch belonged to.
    pub partition: PartitionId,
    // When the batch was created.
    pub created_at: Instant,
    // Time between batch creation and completion.
    pub elapsed: Duration,
}
/// Thread-safe, shareable handle to a [`ProducerCallback`].
pub type SharedProducerCallback = Arc<dyn ProducerCallback + Send + Sync>;

/// Callback invoked when a batch's produce request finishes.
pub trait ProducerCallback {
    /// Handle the completion event; returns a boxed future so
    /// implementations may perform async work.
    fn finished(&self, item: ProduceCompletionBatchEvent) -> BoxFuture<'_, anyhow::Result<()>>;
}
/// Result of pushing a record into the accumulator: a future that resolves
/// to the record's final metadata (offset, etc.) once its batch is produced.
pub(crate) struct PushRecord {
    pub(crate) future: FutureRecordMetadata,
}
impl PushRecord {
    /// Wrap the future that resolves to the record's final metadata.
    /// (Removed a leftover empty `where` clause from the signature.)
    fn new(future: FutureRecordMetadata) -> Self {
        Self { future }
    }
}
/// Outcome of trying to add a record to a [`ProducerBatch`].
enum ProduceBatchStatus {
    // Record accepted; carries what is needed to await its final metadata.
    Added(PartialFutureRecordMetadata),
    // Batch had no room; the record is handed back to the caller untouched.
    NotAdded(Record),
}
/// An in-memory batch of records plus the channel used to report the
/// produce response back to metadata waiters.
pub(crate) struct ProducerBatch {
    // Sending half of a one-slot channel; presumably the flush path sends the
    // partition response future here (sender not used in this file — confirm).
    pub(crate) notify: Sender<ProducePartitionResponseFuture>,
    // Holds the receiving half plus creation time; shared with every record's
    // `PartialFutureRecordMetadata`.
    batch_metadata: Arc<BatchMetadata>,
    // The actual record storage with its size limits and compression.
    batch: MemoryBatch,
}
impl ProducerBatch {
    /// Create an empty batch. A one-slot channel links the batch to its
    /// metadata: the receiver goes into `BatchMetadata`, the sender is kept
    /// for delivering the partition response.
    fn new(
        write_limit: usize,
        batch_limit: usize,
        compression: Compression,
        created_at: Instant,
    ) -> Self {
        let (notify, receiver) = async_channel::bounded(1);
        Self {
            notify,
            batch_metadata: Arc::new(BatchMetadata::new(receiver, Some(created_at))),
            batch: MemoryBatch::new(write_limit, batch_limit, compression),
        }
    }

    /// Try to append a record, translating the memory-batch status into the
    /// producer-level status and attaching batch metadata on success.
    fn push_record(&mut self, record: Record) -> Result<ProduceBatchStatus, ProducerError> {
        self.batch.push_record(record).map(|status| match status {
            MemoryBatchStatus::Added(offset) => ProduceBatchStatus::Added(
                PartialFutureRecordMetadata::new(offset, self.batch_metadata.clone()),
            ),
            MemoryBatchStatus::NotAdded(record) => ProduceBatchStatus::NotAdded(record),
        })
    }

    /// Whether the underlying memory batch has reached its size limit.
    pub(crate) fn is_full(&self) -> bool {
        self.batch.is_full()
    }

    /// Time elapsed since batch creation, as reported by the memory batch.
    pub(crate) fn elapsed(&self) -> Timestamp {
        self.batch.elapsed()
    }

    /// Consume the producer batch, converting it into a wire-format `Batch`.
    pub(crate) fn batch(self) -> Batch {
        self.batch.into()
    }

    /// Shared handle to this batch's metadata.
    pub(crate) fn metadata(&self) -> Arc<BatchMetadata> {
        self.batch_metadata.clone()
    }
}
/// Notifiers for batch lifecycle transitions, consumed by listeners elsewhere
/// in the producer (listen/notify pairs below).
pub(crate) struct BatchEvents {
    // Fired when a batch reaches its size limit.
    batch_full: EventHandler,
    // Fired when a new batch is created and queued.
    new_batch: EventHandler,
}
impl BatchEvents {
fn new() -> Self {
let batch_full = EventHandler::new();
let new_batch = EventHandler::new();
Self {
batch_full,
new_batch,
}
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
pub async fn listen_batch_full(&self) {
self.batch_full.listen().await
}
pub async fn listen_new_batch(&self) {
self.new_batch.listen().await
}
pub async fn notify_batch_full(&self) {
self.batch_full.notify().await;
}
pub async fn notify_new_batch(&self) {
self.new_batch.notify().await;
}
}
/// A whole-request produce response shared among per-partition futures;
/// `Shared` lets every partition future poll the same underlying response.
type ProduceResponseFuture = Shared<BoxFuture<'static, Arc<Result<ProduceResponse, SocketError>>>>;

/// Future resolving to `(base_offset, error_code)` for a single partition:
/// either a view into a shared produce response (plus the partition's
/// flattened index), or an already-known value.
pub(crate) struct ProducePartitionResponseFuture {
    inner: Either<(ProduceResponseFuture, usize), Option<(Offset, ErrorCode)>>,
}
impl ProducePartitionResponseFuture {
pub(crate) fn ready(offset: Offset, error: ErrorCode) -> Self {
Self {
inner: Either::Right(Some((offset, error))),
}
}
pub(crate) fn from(response_fut: ProduceResponseFuture, num: usize) -> Self {
Self {
inner: Either::Left((response_fut, num)),
}
}
}
impl Future for ProducePartitionResponseFuture {
    type Output = (Offset, ErrorCode);
    // Resolves to the `(base_offset, error_code)` pair for one partition.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.get_mut().inner {
            // Backed by the shared response: once it is ready, select the
            // `pair.1`-th partition across all topic responses, flattened in
            // order.
            Either::Left(ref mut pair) => {
                let response = ready!((pair.0).poll_unpin(cx));
                match response.as_ref() {
                    Ok(response) => Poll::Ready(
                        response
                            .responses
                            .iter()
                            .flat_map(|t| &t.partitions)
                            .nth(pair.1)
                            .map(|p| (p.base_offset, ErrorCode::None))
                            // Index out of range: report a descriptive error
                            // instead of panicking.
                            .unwrap_or_else(|| {
                                (
                                    Offset::default(),
                                    ErrorCode::Other(
                                        "partition not found during collecting async response"
                                            .to_string(),
                                    ),
                                )
                            }),
                    ),
                    // Socket-level failure: surface it as an opaque error code.
                    Err(err) => Poll::Ready((0, ErrorCode::Other(format!("{err:?}")))),
                }
            }
            // Pre-resolved value; `take` leaves `None` so a poll after
            // completion yields the "empty response" error instead of
            // re-delivering the value.
            Either::Right(ref mut maybe_pair) => match maybe_pair.take() {
                None => Poll::Ready((0, ErrorCode::Other("empty response".to_string()))),
                Some(pair) => Poll::Ready(pair),
            },
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use fluvio_protocol::record::{Record, RawRecords};
    use fluvio_spu_schema::produce::{PartitionProduceResponse, TopicProduceResponse};
    use fluvio_protocol::Encoder;

    // Batch limit leaves one spare byte beyond three records: all three are
    // accepted but the batch does not report full; a fourth is rejected.
    #[test]
    fn test_producer_batch_push_and_not_full() {
        let record = Record::from(("key", "value"));
        let size = record.write_size(0);
        let mut pb = ProducerBatch::new(
            1_048_576,
            size * 3
                + 1
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            Compression::None,
            Instant::now(),
        );
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(!pb.is_full());
        assert!(matches!(
            pb.push_record(record),
            Ok(ProduceBatchStatus::NotAdded(_))
        ));
    }

    // Batch limit is exactly three records: the batch reports full after the
    // third push, and a fourth record is rejected.
    #[test]
    fn test_producer_batch_push_and_full() {
        let record = Record::from(("key", "value"));
        let size = record.write_size(0);
        let mut pb = ProducerBatch::new(
            1_048_576,
            size * 3
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            Compression::None,
            Instant::now(),
        );
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(pb.is_full());
        assert!(matches!(
            pb.push_record(record),
            Ok(ProduceBatchStatus::NotAdded(_))
        ));
    }

    // Write limit equals batch limit here, so exceeding it is a hard error
    // rather than a NotAdded status.
    #[test]
    fn test_producer_write_limit() {
        let record = Record::from(("key", "value"));
        let size = record.write_size(0);
        let mut pb = ProducerBatch::new(
            size * 3
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            size * 3
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            Compression::None,
            Instant::now(),
        );
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(matches!(
            pb.push_record(record.clone()),
            Ok(ProduceBatchStatus::Added(_))
        ));
        assert!(pb.is_full());
        assert!(pb.push_record(record).is_err());
    }

    // End-to-end accumulator flow: new-batch and batch-full events fire at
    // the expected points, and a partition added at runtime works too.
    #[fluvio_future::test]
    async fn test_record_accumulator() {
        let record = Record::from(("key", "value"));
        let size = record.write_size(0);
        let accumulator = RecordAccumulator::new(
            size * 3
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            1_048_576,
            10,
            1,
            Compression::None,
        );
        let timeout = std::time::Duration::from_millis(200);
        let batches = accumulator
            .batches()
            .await
            .get(&0)
            .expect("failed to get batch info")
            .0
            .clone();
        // First push creates a batch -> new-batch event, but not full yet.
        accumulator
            .push_record(record.clone(), 0)
            .await
            .expect("failed push");
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_new_batch())
                .await
                .is_ok()
        );
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_batch_full())
                .await
                .is_err()
        );
        accumulator
            .push_record(record.clone(), 0)
            .await
            .expect("failed push");
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_batch_full())
                .await
                .is_err()
        );
        // Third record fills the batch -> batch-full event fires.
        accumulator
            .push_record(record, 0)
            .await
            .expect("failed push");
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_batch_full())
                .await
                .is_ok()
        );
        // Dynamically added partition accepts records and fires new-batch.
        let record_2 = Record::from(("key_2", "value_2"));
        let batch_events = BatchEvents::shared();
        let batches_deque = BatchesDeque::shared();
        accumulator
            .add_partition(1, (batch_events.clone(), batches_deque.clone()))
            .await;
        accumulator
            .push_record(record_2.clone(), 1)
            .await
            .expect("failed push");
        let batches = accumulator
            .batches()
            .await
            .get(&1)
            .expect("failed to get batch info")
            .0
            .clone();
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_new_batch())
                .await
                .is_ok()
        );
        assert!(
            fluvio_future::future::timeout(timeout, batches.listen_batch_full())
                .await
                .is_err()
        );
    }

    // A pre-resolved future yields exactly the values it was built with.
    #[fluvio_future::test]
    async fn test_produce_partition_response_future_ready() {
        let offset = 10;
        let error_code = ErrorCode::default();
        let fut = ProducePartitionResponseFuture::ready(offset, error_code.clone());
        let (resolved_offset, resolved_error) = fut.await;
        assert_eq!(offset, resolved_offset);
        assert_eq!(error_code, resolved_error);
    }

    // A socket error in the shared response maps to offset 0 plus an
    // `ErrorCode::Other` carrying the debug representation.
    #[fluvio_future::test]
    async fn test_produce_partition_response_future_on_error() {
        let num = 0;
        let fut = async { Arc::new(Err(SocketError::SocketClosed)) }
            .boxed()
            .shared();
        let fut = ProducePartitionResponseFuture::from(fut, num);
        let (resolved_offset, resolved_error) = fut.await;
        assert_eq!(resolved_offset, 0);
        assert_eq!(resolved_error, ErrorCode::Other("SocketClosed".to_string()));
    }

    // Partition index 2, flattened across two topics (2 + 1 partitions),
    // selects the third partition overall (base_offset 3).
    #[fluvio_future::test]
    async fn test_produce_partition_response_future_resolved() {
        let num = 2;
        let fut = async {
            Arc::new(Ok(ProduceResponse {
                responses: vec![
                    TopicProduceResponse {
                        name: "".to_string(),
                        partitions: vec![
                            PartitionProduceResponse {
                                base_offset: 1,
                                ..Default::default()
                            },
                            PartitionProduceResponse {
                                base_offset: 2,
                                ..Default::default()
                            },
                        ],
                    },
                    TopicProduceResponse {
                        name: "".to_string(),
                        partitions: vec![PartitionProduceResponse {
                            base_offset: 3,
                            ..Default::default()
                        }],
                    },
                ],
                throttle_time_ms: 0,
            }))
        }
        .boxed()
        .shared();
        let fut = ProducePartitionResponseFuture::from(fut, num);
        let (resolved_offset, resolved_error) = fut.await;
        assert_eq!(resolved_offset, 3);
        assert_eq!(resolved_error, ErrorCode::None);
    }

    // Index past the end of the flattened partitions yields the
    // "partition not found" error rather than a panic.
    #[fluvio_future::test]
    async fn test_produce_partition_response_future_not_found() {
        let num = 2;
        let fut = async {
            Arc::new(Ok(ProduceResponse {
                responses: vec![TopicProduceResponse {
                    name: "".to_string(),
                    partitions: vec![PartitionProduceResponse {
                        base_offset: 3,
                        ..Default::default()
                    }],
                }],
                throttle_time_ms: 0,
            }))
        }
        .boxed()
        .shared();
        let fut = ProducePartitionResponseFuture::from(fut, num);
        let (resolved_offset, resolved_error) = fut.await;
        assert_eq!(resolved_offset, 0);
        assert_eq!(
            resolved_error,
            ErrorCode::Other("partition not found during collecting async response".to_string())
        );
    }
}