Skip to main content

venta/
producer.rs

1use crate::message::{Message, ProducedMessage};
2use anyhow::format_err;
3use anyhow::Result;
4use futures::Future;
5use pulsar::Producer;
6use pulsar::TokioExecutor;
7use std::error::Error as StdError;
8use std::sync::atomic::AtomicU64;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::Instant;
13use tokio::sync::mpsc::{Receiver, Sender};
14
15const SEND_TIMEOUT: Duration = Duration::from_secs(30);
16const RETRY_DELAY: Duration = Duration::from_secs(1);
17
18#[derive(Clone)]
19pub struct BackgroundProducer {
20    #[cfg(feature = "metrics")]
21    topic_name: String,
22    tx: Sender<Message>,
23    pending: Arc<AtomicU64>,
24}
25
26struct RetryQueue {
27    rx: Receiver<Message>,
28    unsent: Option<Message>,
29    next_retry: Option<Instant>,
30}
31
32impl RetryQueue {
33    async fn next(&mut self) -> Option<Message> {
34        if let Some(ts) = self.next_retry.take() {
35            tokio::time::sleep_until(ts.into()).await;
36        }
37        if let Some(message) = self.unsent.take() {
38            Some(message)
39        } else {
40            self.rx.recv().await
41        }
42    }
43
44    fn schedule_retry(&mut self, message: Message) {
45        assert!(
46            self.unsent.is_none(),
47            "schedule_retry called with an already pending message"
48        );
49        self.unsent.replace(message);
50        self.next_retry = Some(Instant::now() + RETRY_DELAY);
51    }
52}
53
54impl BackgroundProducer {
55    pub async fn spawn_simple(
56        url: impl Into<String>,
57        topic: impl Into<String>,
58        producer_name: Option<String>,
59    ) -> Result<Self> {
60        let url: String = url.into();
61        let topic: String = topic.into();
62
63        Self::spawn(move || {
64            let url = url.clone();
65            let topic = topic.clone();
66            let producer_name = producer_name.clone();
67            async move {
68                let mut returned = pulsar::Pulsar::builder(url.clone(), TokioExecutor)
69                    .build()
70                    .await?
71                    .producer()
72                    .with_topic(topic);
73                if let Some(producer_name) = producer_name.clone() {
74                    returned = returned.with_name(&producer_name);
75                }
76
77                returned.build().await
78            }
79        })
80        .await
81    }
82
83    pub async fn spawn<Fut, F, E>(producer_factory: F) -> Result<Self>
84    where
85        Fut: Future<Output = Result<Producer<TokioExecutor>, E>> + Send,
86        E: Into<anyhow::Error> + StdError,
87        F: Fn() -> Fut + Send + Sync + 'static,
88    {
89        let mut producer = Some(producer_factory().await.map_err(Into::into)?);
90
91        #[cfg(feature = "metrics")]
92        let topic_name = producer.as_ref().unwrap().topic().to_owned();
93
94        let (tx, rx) = tokio::sync::mpsc::channel::<Message>(1000);
95        let pending = Arc::new(AtomicU64::new(0));
96        let pending_msgs = pending.clone();
97        let mut queue = RetryQueue {
98            rx,
99            unsent: None,
100            next_retry: None,
101        };
102        #[cfg(feature = "metrics")]
103        let topic_name_label = topic_name.clone();
104
105        tokio::task::spawn(async move {
106            loop {
107                let message = match queue.next().await {
108                    None => break,
109                    Some(message) => message,
110                };
111
112                if producer.is_none() {
113                    match producer_factory().await {
114                        Ok(p) => producer = Some(p),
115                        Err(e) => {
116                            log::error!("Failed recreting producer: {:?}", e);
117                            queue.schedule_retry(message);
118                            continue;
119                        }
120                    }
121                }
122
123                let res =
124                    tokio::time::timeout(SEND_TIMEOUT, message.send(producer.as_mut().unwrap()))
125                        .await;
126
127                let needs_producer_recreate = res.is_err();
128
129                if let Err(e) = res
130                    .map_err(|elapsed| format_err!("Timeout sending message after {:?}", elapsed))
131                    .and_then(|r| r.map_err(anyhow::Error::from))
132                {
133                    log::error!("Sending message failed: {:?}", e);
134                    queue.schedule_retry(message);
135                    if needs_producer_recreate {
136                        producer = None
137                    }
138                } else {
139                    pending_msgs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
140                    #[cfg(feature = "metrics")]
141                    crate::metrics::NUM_MSGS_SENT
142                        .with_label_values(&[&topic_name_label])
143                        .inc();
144                }
145            }
146        });
147        Ok(Self {
148            #[cfg(feature = "metrics")]
149            topic_name,
150            tx,
151            pending,
152        })
153    }
154
155    pub fn produce(&self) -> ProducedMessage {
156        ProducedMessage {
157            message: Default::default(),
158            producer: self.clone(),
159        }
160    }
161
162    pub fn has_pending_messages(&self) -> bool {
163        self.num_pending_messages() > 0
164    }
165
166    pub fn num_pending_messages(&self) -> u64 {
167        self.pending.load(Ordering::Relaxed)
168    }
169
170    pub(crate) fn enqueue(&self, msg: Message) -> Result<()> {
171        self.tx
172            .try_send(msg)
173            .map_err(|_| format_err!("Cannot enqueue message"))
174            .map(|()| {
175                self.pending
176                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177                #[cfg(feature = "metrics")]
178                crate::metrics::NUM_MSGS_QUEUED
179                    .with_label_values(&[&self.topic_name])
180                    .inc();
181            })
182    }
183}