1use crate::message::{Message, ProducedMessage};
2use anyhow::format_err;
3use anyhow::Result;
4use futures::Future;
5use pulsar::Producer;
6use pulsar::TokioExecutor;
7use std::error::Error as StdError;
8use std::sync::atomic::AtomicU64;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use std::time::Instant;
13use tokio::sync::mpsc::{Receiver, Sender};
14
/// Upper bound on how long a single send attempt may take before it is
/// treated as timed out.
const SEND_TIMEOUT: Duration = Duration::from_secs(30);

/// Delay applied before a message that was scheduled for retry is handed out
/// again.
const RETRY_DELAY: Duration = Duration::from_secs(1);
17
18#[derive(Clone)]
19pub struct BackgroundProducer {
20 #[cfg(feature = "metrics")]
21 topic_name: String,
22 tx: Sender<Message>,
23 pending: Arc<AtomicU64>,
24}
25
26struct RetryQueue {
27 rx: Receiver<Message>,
28 unsent: Option<Message>,
29 next_retry: Option<Instant>,
30}
31
32impl RetryQueue {
33 async fn next(&mut self) -> Option<Message> {
34 if let Some(ts) = self.next_retry.take() {
35 tokio::time::sleep_until(ts.into()).await;
36 }
37 if let Some(message) = self.unsent.take() {
38 Some(message)
39 } else {
40 self.rx.recv().await
41 }
42 }
43
44 fn schedule_retry(&mut self, message: Message) {
45 assert!(
46 self.unsent.is_none(),
47 "schedule_retry called with an already pending message"
48 );
49 self.unsent.replace(message);
50 self.next_retry = Some(Instant::now() + RETRY_DELAY);
51 }
52}
53
54impl BackgroundProducer {
55 pub async fn spawn_simple(
56 url: impl Into<String>,
57 topic: impl Into<String>,
58 producer_name: Option<String>,
59 ) -> Result<Self> {
60 let url: String = url.into();
61 let topic: String = topic.into();
62
63 Self::spawn(move || {
64 let url = url.clone();
65 let topic = topic.clone();
66 let producer_name = producer_name.clone();
67 async move {
68 let mut returned = pulsar::Pulsar::builder(url.clone(), TokioExecutor)
69 .build()
70 .await?
71 .producer()
72 .with_topic(topic);
73 if let Some(producer_name) = producer_name.clone() {
74 returned = returned.with_name(&producer_name);
75 }
76
77 returned.build().await
78 }
79 })
80 .await
81 }
82
83 pub async fn spawn<Fut, F, E>(producer_factory: F) -> Result<Self>
84 where
85 Fut: Future<Output = Result<Producer<TokioExecutor>, E>> + Send,
86 E: Into<anyhow::Error> + StdError,
87 F: Fn() -> Fut + Send + Sync + 'static,
88 {
89 let mut producer = Some(producer_factory().await.map_err(Into::into)?);
90
91 #[cfg(feature = "metrics")]
92 let topic_name = producer.as_ref().unwrap().topic().to_owned();
93
94 let (tx, rx) = tokio::sync::mpsc::channel::<Message>(1000);
95 let pending = Arc::new(AtomicU64::new(0));
96 let pending_msgs = pending.clone();
97 let mut queue = RetryQueue {
98 rx,
99 unsent: None,
100 next_retry: None,
101 };
102 #[cfg(feature = "metrics")]
103 let topic_name_label = topic_name.clone();
104
105 tokio::task::spawn(async move {
106 loop {
107 let message = match queue.next().await {
108 None => break,
109 Some(message) => message,
110 };
111
112 if producer.is_none() {
113 match producer_factory().await {
114 Ok(p) => producer = Some(p),
115 Err(e) => {
116 log::error!("Failed recreting producer: {:?}", e);
117 queue.schedule_retry(message);
118 continue;
119 }
120 }
121 }
122
123 let res =
124 tokio::time::timeout(SEND_TIMEOUT, message.send(producer.as_mut().unwrap()))
125 .await;
126
127 let needs_producer_recreate = res.is_err();
128
129 if let Err(e) = res
130 .map_err(|elapsed| format_err!("Timeout sending message after {:?}", elapsed))
131 .and_then(|r| r.map_err(anyhow::Error::from))
132 {
133 log::error!("Sending message failed: {:?}", e);
134 queue.schedule_retry(message);
135 if needs_producer_recreate {
136 producer = None
137 }
138 } else {
139 pending_msgs.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
140 #[cfg(feature = "metrics")]
141 crate::metrics::NUM_MSGS_SENT
142 .with_label_values(&[&topic_name_label])
143 .inc();
144 }
145 }
146 });
147 Ok(Self {
148 #[cfg(feature = "metrics")]
149 topic_name,
150 tx,
151 pending,
152 })
153 }
154
155 pub fn produce(&self) -> ProducedMessage {
156 ProducedMessage {
157 message: Default::default(),
158 producer: self.clone(),
159 }
160 }
161
162 pub fn has_pending_messages(&self) -> bool {
163 self.num_pending_messages() > 0
164 }
165
166 pub fn num_pending_messages(&self) -> u64 {
167 self.pending.load(Ordering::Relaxed)
168 }
169
170 pub(crate) fn enqueue(&self, msg: Message) -> Result<()> {
171 self.tx
172 .try_send(msg)
173 .map_err(|_| format_err!("Cannot enqueue message"))
174 .map(|()| {
175 self.pending
176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177 #[cfg(feature = "metrics")]
178 crate::metrics::NUM_MSGS_QUEUED
179 .with_label_values(&[&self.topic_name])
180 .inc();
181 })
182 }
183}