use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::*;
use super::{
Error, ProducerClient,
aggregator::{self, Aggregator, StatusDeaggregator, TryPush},
broadcast::{BroadcastOnce, BroadcastOnceReceiver},
};
use crate::client::partition::Compression;
/// Result of writing one aggregated batch: on success a shared
/// [`AggregatedStatus`] (wrapped in an [`Arc`] so it can be handed to every
/// waiting [`ResultHandle`]), otherwise the [`Error`] that prevented the write.
pub(super) type BatchWriteResult<A> = Result<Arc<AggregatedStatus<A>>, Error>;
/// Batch-wide status produced by a flush, together with the helper needed to
/// split it back into per-input statuses.
#[derive(Debug)]
pub(super) struct AggregatedStatus<A: Aggregator> {
    /// Raw values returned by the client's `produce` call for this batch
    /// (empty when the flushed batch contained no data).
    // NOTE(review): presumably one record offset per produced record — confirm
    // against `ProducerClient::produce`.
    aggregated_status: Vec<i64>,
    /// Deaggregator obtained from the aggregator's `flush`; used to turn
    /// `aggregated_status` plus a tag into an individual status.
    status_deagg: <A as Aggregator>::StatusDeaggregator,
}
/// Handle given out for every input accepted into a batch.
///
/// It lets the caller wait for the batch result and then look up the status
/// belonging to that particular input.
#[derive(Debug)]
pub(crate) struct ResultHandle<A: Aggregator> {
    /// Receiving side of the batch's one-shot result broadcast.
    receiver: BroadcastOnceReceiver<BatchWriteResult<A>>,
    /// Tag returned by the aggregator when the input was accepted; passed to
    /// the deaggregator to select this input's status.
    tag: A::Tag,
}
impl<A> ResultHandle<A>
where
A: Aggregator,
{
fn new(receiver: BroadcastOnceReceiver<BatchWriteResult<A>>, tag: A::Tag) -> Self {
Self { receiver, tag }
}
pub(super) async fn wait(&mut self) -> Result<BatchWriteResult<A>, Error> {
self.receiver
.receive()
.await
.map_err(|e| Error::FlushError(e.to_string()))
}
pub(super) fn result(
self,
status: BatchWriteResult<A>,
) -> Result<<A as aggregator::AggregatorStatus>::Status, Error> {
let status = status?;
status
.status_deagg
.deaggregate(&status.aggregated_status, self.tag)
.map_err(|e| Error::Aggregator(e.into()))
}
}
/// Outcome of starting a background flush.
///
/// Both variants carry a `T` (the replacement builder when returned from
/// `BatchBuilder::background_flush`) so the caller can keep batching either way.
pub(crate) enum FlushResult<T> {
/// The flush was started. The optional [`JoinHandle`] belongs to the spawned
/// task performing the write; it is `None` when no task was spawned.
Ok(T, Option<JoinHandle<()>>),
/// The flush could not be started; the [`Error`] describes why.
Error(T, Error),
}
/// Collects inputs into an aggregator and distributes the eventual write
/// result to everyone who contributed to the batch.
#[derive(Debug)]
pub(crate) struct BatchBuilder<A: Aggregator> {
    /// Aggregator that accumulates the inputs of the current batch.
    aggregator: A,
    /// One-shot broadcast through which the batch result is published; each
    /// [`ResultHandle`] holds a receiver obtained from it.
    results: BroadcastOnce<BatchWriteResult<A>>,
}
impl<A> BatchBuilder<A>
where
A: Aggregator,
{
pub(crate) fn new(aggregator: A) -> Self {
Self {
aggregator,
results: Default::default(),
}
}
pub(super) fn try_push(
&mut self,
data: A::Input,
) -> Result<TryPush<A::Input, ResultHandle<A>>, Error> {
match self
.aggregator
.try_push(data)
.map_err(|e| Error::Aggregator(e.into()))?
{
TryPush::NoCapacity(data) => Ok(TryPush::NoCapacity(data)),
TryPush::Aggregated(tag) => Ok(TryPush::Aggregated(ResultHandle::new(
self.results.receiver(),
tag,
))),
}
}
pub(super) fn background_flush(
mut self,
client: Arc<dyn ProducerClient>,
compression: Compression,
) -> FlushResult<Self> {
let (batch, status_deagg) = match self.aggregator.flush() {
Ok(v) => v,
Err(e) => {
return FlushResult::Error(Self::new(self.aggregator), Error::Aggregator(e.into()));
}
};
if batch.is_empty() {
debug!(?client, "No data aggregated, skipping client request");
self.results.broadcast(Ok(Arc::new(AggregatedStatus {
aggregated_status: vec![],
status_deagg,
})));
return FlushResult::Ok(Self::new(self.aggregator), None);
}
let handle = tokio::spawn({
let broadcast = self.results;
async move {
let res = match client.produce(batch, compression).await {
Ok(status) => Ok(Arc::new(AggregatedStatus {
aggregated_status: status,
status_deagg,
})),
Err(e) => {
error!(?client, error=?e, "Failed to produce records");
Err(Error::Client(Arc::new(e)))
}
};
broadcast.broadcast(res);
}
});
FlushResult::Ok(Self::new(self.aggregator), Some(handle))
}
}