venta 0.9.0

Reliable producer interface for Pulsar
Documentation
use anyhow::{format_err, Error, Result};
use chrono::DateTime;
use chrono::Utc;
use pulsar::TokioExecutor;
use std::collections::HashMap;

use crate::BackgroundProducer;

pub(crate) struct Message {
    data: Vec<u8>,
    timestamp: Option<DateTime<Utc>>,
    properties: HashMap<String, String>,
}

impl Message {
    pub(crate) async fn send(
        &self,
        pulsar_producer: &mut pulsar::Producer<TokioExecutor>,
    ) -> Result<()> {
        let message = pulsar::producer::Message {
            payload: self.data.to_vec(),
            properties: self.properties.clone(),
            event_time: self
                .timestamp
                .or_else(|| Some(Utc::now()))
                .map(|ts| ts.timestamp_millis() as u64),
            ..Default::default()
        };

        pulsar_producer
            .send(message)
            .await
            .map(drop)
            .map_err(anyhow::Error::from)
    }
}

#[derive(Default)]
pub(crate) struct MessageBuilder {
    data: Option<Result<Vec<u8>>>,
    properties: HashMap<String, String>,
    timestamp: Option<DateTime<Utc>>,
}

impl MessageBuilder {
    fn build(self) -> Result<Message> {
        let data = self
            .data
            .ok_or_else(|| format_err!("No data set"))
            .and_then(|data| data)?;
        let timestamp = self.timestamp.or_else(|| Some(Utc::now()));
        let properties = self.properties;
        Ok(Message {
            data,
            timestamp,
            properties,
        })
    }
}

pub struct ProducedMessage {
    pub(crate) message: MessageBuilder,
    pub(crate) producer: BackgroundProducer,
}

impl ProducedMessage {
    pub fn text(mut self, text: impl Into<Vec<u8>>) -> Self {
        self.message.data.replace(Ok(text.into()));
        self
    }

    pub fn json(mut self, json: &impl serde::Serialize) -> Self {
        self.message
            .data
            .replace(serde_json::to_vec(json).map_err(Error::from));
        self
    }

    pub fn timestamp(mut self, ts: DateTime<Utc>) -> Self {
        self.message.timestamp.replace(ts);
        self
    }

    pub fn property(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.message.properties.insert(name.into(), value.into());
        self
    }

    pub fn properties(mut self, properties: impl Iterator<Item = (String, String)>) -> Self {
        for (key, value) in properties {
            self.message.properties.insert(key, value);
        }
        self
    }

    fn build(self) -> Result<(BackgroundProducer, Message)> {
        let message = self
            .message
            .build()
            .map_err(|s| format_err!("Error constructing message: {}", s))?;
        let producer = self.producer;
        Ok((producer, message))
    }

    pub fn enqueue(self) -> Result<()> {
        let (producer, message) = self.build()?;

        producer.enqueue(message)
    }
}