use std::sync::Arc;
use async_channel::Receiver;
use async_lock::RwLock;
use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
use fluvio_types::PartitionId;
use crate::error::Result;
use crate::producer::accumulator::ProducePartitionResponseFuture;
use super::error::ProducerError;
/// Metadata describing a record: the partition it is associated with and its offset.
///
/// Values are exposed read-only through [`RecordMetadata::partition_id`] and
/// [`RecordMetadata::offset`].
#[derive(Clone, Debug, Default)]
pub struct RecordMetadata {
/// Identifier of the partition associated with the record.
pub(crate) partition_id: PartitionId,
/// Offset associated with the record.
pub(crate) offset: Offset,
}
impl RecordMetadata {
    /// Returns the offset stored in this metadata.
    pub fn offset(&self) -> Offset {
        let Self { offset, .. } = self;
        *offset
    }

    /// Returns the partition identifier stored in this metadata.
    pub fn partition_id(&self) -> PartitionId {
        let Self { partition_id, .. } = self;
        *partition_id
    }
}
/// Resolution state of a batch, as tracked by `BatchMetadata`.
pub(crate) enum BatchMetadataState {
/// Not resolved yet: holds the channel receiver from which the produce
/// response future is obtained.
Buffered(Receiver<ProducePartitionResponseFuture>),
/// Resolved successfully: holds the offset returned by the response future.
Sent(Offset),
/// Resolved with a failure: holds the error that is returned to every caller.
Failed(ProducerError),
}
/// Lock-protected holder of a batch's [`BatchMetadataState`].
pub(crate) struct BatchMetadata {
/// Current state; replaced with `Sent` or `Failed` once the response has been awaited.
state: RwLock<BatchMetadataState>,
}
impl BatchMetadata {
pub(crate) fn new(receiver: Receiver<ProducePartitionResponseFuture>) -> Self {
Self {
state: RwLock::new(BatchMetadataState::Buffered(receiver)),
}
}
pub(crate) async fn base_offset(&self) -> Result<Offset> {
let mut state = self.state.write().await;
match &*state {
BatchMetadataState::Buffered(receiver) => {
let msg = receiver
.recv()
.await
.map_err(|err| ProducerError::GetRecordMetadata(Some(err)));
match msg {
Ok(offset_future) => {
let (offset, error) = offset_future.await;
if error == ErrorCode::None {
*state = BatchMetadataState::Sent(offset);
Ok(offset)
} else {
let error = ProducerError::SpuErrorCode(error);
*state = BatchMetadataState::Failed(error.clone());
Err(error.into())
}
}
Err(err) => {
*state = BatchMetadataState::Failed(err.clone());
Err(err.into())
}
}
}
BatchMetadataState::Sent(offset) => Ok(*offset),
BatchMetadataState::Failed(error) => Err(error.clone().into()),
}
}
}
/// A pending record-metadata handle that does not yet carry a partition id.
///
/// Converted into a [`FutureRecordMetadata`] by
/// `into_future_record_metadata` once the partition id is supplied.
pub(crate) struct PartialFutureRecordMetadata {
/// Offset that is later added to the batch base offset.
relative_offset: Offset,
/// Shared batch metadata from which the base offset is obtained.
batch_metadata: Arc<BatchMetadata>,
}
impl PartialFutureRecordMetadata {
pub(crate) fn new(relative_offset: Offset, batch_metadata: Arc<BatchMetadata>) -> Self {
Self {
relative_offset,
batch_metadata,
}
}
pub(crate) fn into_future_record_metadata(
self,
partition_id: PartitionId,
) -> FutureRecordMetadata {
FutureRecordMetadata {
partition_id,
relative_offset: self.relative_offset,
batch_metadata: self.batch_metadata,
}
}
}
/// A pending handle that resolves to a [`RecordMetadata`] through
/// [`FutureRecordMetadata::wait`].
pub struct FutureRecordMetadata {
/// Partition id copied into the resulting `RecordMetadata`.
pub(crate) partition_id: PartitionId,
/// Offset added to the batch base offset to obtain the final offset.
pub(crate) relative_offset: Offset,
/// Shared batch metadata from which the base offset is obtained.
pub(crate) batch_metadata: Arc<BatchMetadata>,
}
impl FutureRecordMetadata {
    /// Waits for the batch base offset and returns the resulting metadata.
    ///
    /// The returned offset is the batch base offset plus this handle's
    /// relative offset.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while obtaining the batch base offset.
    pub async fn wait(self) -> Result<RecordMetadata> {
        let Self {
            partition_id,
            relative_offset,
            batch_metadata,
        } = self;

        let offset = batch_metadata.base_offset().await? + relative_offset;
        Ok(RecordMetadata {
            partition_id,
            offset,
        })
    }
}