1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use crate::producer::record::{RecordMetadata, FutureRecordMetadata};
use crate::producer::error::ProducerError;
use crate::error::Result;

/// Struct returned by a `TopicProduce::send` call; it is used to gather
/// the record metadata associated with each send call.
///
/// The default value holds no record metadata.
// NOTE(review): `TopicProduce::send` is kept as originally written; it may be
// meant to read `TopicProducer::send` — confirm against the producer module.
#[derive(Default)]
pub struct ProduceOutput {
    /// Pending record metadata, one entry per `add` call, in insertion order.
    record_metadata: Vec<FutureRecordMetadata>,
}

impl ProduceOutput {
    /// Appends one pending record metadata entry to this output.
    pub(crate) fn add(&mut self, record_metadata: FutureRecordMetadata) {
        self.record_metadata.push(record_metadata);
    }

    /// Waits for the record metadata of every record held by this output
    /// (all records sent using smartmodule).
    ///
    /// Entries are awaited one after another in insertion order, and the
    /// resolved metadata is returned in that same order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while waiting; entries after the
    /// failing one are not awaited.
    #[cfg(feature = "smartengine")]
    pub async fn wait_all(self) -> Result<Vec<RecordMetadata>> {
        let pending = self.record_metadata;
        let mut resolved = Vec::with_capacity(pending.len());

        for entry in pending {
            resolved.push(entry.wait().await?);
        }

        Ok(resolved)
    }

    /// Waits for the record metadata of the first entry held by this output.
    ///
    /// Only the first entry is awaited; any further entries are dropped
    /// without being awaited.
    ///
    /// # Errors
    ///
    /// Returns `ProducerError::GetRecordMetadata(None)` when the output holds
    /// no entries, or the error produced while waiting on the first entry.
    pub async fn wait(self) -> Result<RecordMetadata> {
        // Take the head in its own statement so the remaining entries are
        // released before the await below.
        let head = self.record_metadata.into_iter().next();

        let first = match head {
            Some(entry) => entry,
            None => return Err(ProducerError::GetRecordMetadata(None).into()),
        };

        let metadata = first.wait().await?;
        Ok(metadata)
    }
}