1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::sync::Arc;

use async_channel::Receiver;
use async_lock::RwLock;

use fluvio_protocol::record::Offset;
use fluvio_protocol::link::ErrorCode;
use fluvio_types::PartitionId;

use crate::error::Result;
use crate::producer::accumulator::ProducePartitionResponseFuture;

use super::error::ProducerError;

/// Metadata of a record send to a topic
#[derive(Clone, Debug, Default)]
pub struct RecordMetadata {
    /// The partition the record was sent to
    pub(crate) partition_id: PartitionId,
    /// The offset of the record in the topic/partition.
    pub(crate) offset: Offset,
}

impl RecordMetadata {
    /// The offset of the record in the topic/partition.
    pub fn offset(&self) -> Offset {
        self.offset
    }

    /// Partition index the record was sent to
    pub fn partition_id(&self) -> PartitionId {
        self.partition_id
    }
}

/// Possible states of a batch in the accumulator
pub(crate) enum BatchMetadataState {
    /// The batch is buffered and ready to be sent to the SPU
    Buffered(Receiver<ProducePartitionResponseFuture>),
    /// The batch was sent to the SPU. Base offset is known
    Sent(Offset),
    /// There was an error sending the batch to the SPU
    Failed(ProducerError),
}

pub(crate) struct BatchMetadata {
    state: RwLock<BatchMetadataState>,
}

impl BatchMetadata {
    pub(crate) fn new(receiver: Receiver<ProducePartitionResponseFuture>) -> Self {
        Self {
            state: RwLock::new(BatchMetadataState::Buffered(receiver)),
        }
    }

    /// Wait for the base offset of the batch. This is the offset of the first
    /// record in the batch and it is known once the batch is sent to the server.
    pub(crate) async fn base_offset(&self) -> Result<Offset> {
        let mut state = self.state.write().await;
        match &*state {
            BatchMetadataState::Buffered(receiver) => {
                let msg = receiver
                    .recv()
                    .await
                    .map_err(|err| ProducerError::GetRecordMetadata(Some(err)));

                match msg {
                    Ok(offset_future) => {
                        let (offset, error) = offset_future.await;
                        if error == ErrorCode::None {
                            *state = BatchMetadataState::Sent(offset);
                            Ok(offset)
                        } else {
                            let error = ProducerError::SpuErrorCode(error);
                            *state = BatchMetadataState::Failed(error.clone());
                            Err(error.into())
                        }
                    }
                    Err(err) => {
                        *state = BatchMetadataState::Failed(err.clone());
                        Err(err.into())
                    }
                }
            }
            BatchMetadataState::Sent(offset) => Ok(*offset),
            BatchMetadataState::Failed(error) => Err(error.clone().into()),
        }
    }
}

/// Partial information about record metadata.
/// Used to create FutureRecordMetadata once we have the partition id.
pub(crate) struct PartialFutureRecordMetadata {
    /// The offset of the record in the topic/partition.
    relative_offset: Offset,
    batch_metadata: Arc<BatchMetadata>,
}

impl PartialFutureRecordMetadata {
    pub(crate) fn new(relative_offset: Offset, batch_metadata: Arc<BatchMetadata>) -> Self {
        Self {
            relative_offset,
            batch_metadata,
        }
    }

    pub(crate) fn into_future_record_metadata(
        self,
        partition_id: PartitionId,
    ) -> FutureRecordMetadata {
        FutureRecordMetadata {
            partition_id,
            relative_offset: self.relative_offset,
            batch_metadata: self.batch_metadata,
        }
    }
}

/// Output of `TopicProducer::send`
/// Used to wait the `RecordMetadata` of the record being sent.
/// See `FutureRecordMetadata::wait`
pub struct FutureRecordMetadata {
    /// The partition the record was sent to
    pub(crate) partition_id: PartitionId,
    /// The offset of the record in the topic/partition.
    pub(crate) relative_offset: Offset,
    /// Handler to get base offset of the batch
    pub(crate) batch_metadata: Arc<BatchMetadata>,
}

impl FutureRecordMetadata {
    /// wait for the record metadata to be available
    pub async fn wait(self) -> Result<RecordMetadata> {
        let base_offset = self.batch_metadata.base_offset().await?;
        Ok(RecordMetadata {
            partition_id: self.partition_id,
            offset: base_offset + self.relative_offset,
        })
    }
}