Skip to main content

diidi_travel_common_queue/
queue.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::future::Future;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9use crate::error::{QueueError, QueueResult};
10use crate::feature::QueueFeatures;
11
12#[derive(Debug, Clone)]
13pub struct QueueMessage {
14  pub topic: String,
15  pub payload: Bytes,
16  pub headers: BTreeMap<String, String>,
17  pub key: Option<String>,
18  pub partition: Option<i32>,
19  pub dead_letter: Option<DeadLetterTarget>,
20  pub attributes: serde_json::Value,
21}
22
23impl QueueMessage {
24  pub fn new(topic: impl Into<String>, payload: impl Into<Bytes>) -> Self {
25    Self {
26      topic: topic.into(),
27      payload: payload.into(),
28      headers: BTreeMap::new(),
29      key: None,
30      partition: None,
31      dead_letter: None,
32      attributes: serde_json::Value::Null,
33    }
34  }
35
36  pub fn with_key(mut self, key: impl Into<String>) -> Self {
37    self.key = Some(key.into());
38    self
39  }
40
41  pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
42    self.headers.insert(key.into(), value.into());
43    self
44  }
45
46  pub fn with_partition(mut self, partition: i32) -> Self {
47    self.partition = Some(partition);
48    self
49  }
50
51  pub fn with_dead_letter(mut self, target: DeadLetterTarget) -> Self {
52    self.dead_letter = Some(target);
53    self
54  }
55
56  pub fn with_attributes(mut self, attributes: serde_json::Value) -> Self {
57    self.attributes = attributes;
58    self
59  }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct DeadLetterTarget {
64  pub topic: String,
65  pub routing_key: Option<String>,
66}
67
68impl DeadLetterTarget {
69  pub fn new(topic: impl Into<String>) -> Self {
70    Self { topic: topic.into(), routing_key: None }
71  }
72
73  pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
74    self.routing_key = Some(routing_key.into());
75    self
76  }
77}
78
79#[derive(Debug, Clone)]
80pub struct QueueReceipt {
81  pub token: Bytes,
82  pub attributes: serde_json::Value,
83}
84
85impl QueueReceipt {
86  pub fn new(token: impl Into<Bytes>) -> Self {
87    Self { token: token.into(), attributes: serde_json::Value::Null }
88  }
89
90  pub fn with_attributes(mut self, attributes: serde_json::Value) -> Self {
91    self.attributes = attributes;
92    self
93  }
94}
95
96#[derive(Debug, Clone)]
97pub struct QueueDelivery {
98  pub message: QueueMessage,
99  pub receipt: QueueReceipt,
100}
101
102impl QueueDelivery {
103  pub fn new(message: QueueMessage, receipt: QueueReceipt) -> Self {
104    Self { message, receipt }
105  }
106}
107
/// What to request for a delivery that is being negatively acknowledged.
///
/// How a backend carries out each action is not defined in this file; the
/// default `Queue::nack` rejects all of them as unsupported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NackAction {
  /// Reported by `as_str` as `"nack_requeue"`.
  Requeue,
  /// Reported by `as_str` as `"nack_dead_letter"`.
  DeadLetter,
  /// Reported by `as_str` as `"nack_drop"`.
  Drop,
}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum ConsumeAction {
117  Ack,
118  Nack(NackAction),
119}
120
121#[async_trait]
122pub trait Queue: Send + Sync + Debug + 'static {
123  fn name(&self) -> &'static str;
124
125  fn features(&self) -> QueueFeatures {
126    QueueFeatures::default()
127  }
128
129  async fn publish(&self, message: QueueMessage) -> QueueResult<()>;
130
131  async fn publish_many(&self, items: Vec<QueueMessage>) -> QueueResult<()> {
132    default_batch::publish_many(self, items).await
133  }
134
135  async fn receive(&self) -> QueueResult<QueueDelivery>;
136
137  async fn receive_many(&self, max: usize) -> QueueResult<Vec<QueueDelivery>> {
138    default_batch::receive_many(self, max).await
139  }
140
141  async fn ack(&self, receipt: QueueReceipt) -> QueueResult<()>;
142
143  async fn ack_many(&self, receipts: Vec<QueueReceipt>) -> QueueResult<()> {
144    default_batch::ack_many(self, receipts).await
145  }
146
147  async fn nack(&self, _receipt: QueueReceipt, action: NackAction) -> QueueResult<()> {
148    Err(QueueError::unsupported(self.name(), action.as_str()))
149  }
150
151  async fn nack_many(&self, receipts: Vec<QueueReceipt>, action: NackAction) -> QueueResult<()> {
152    default_batch::nack_many(self, receipts, action).await
153  }
154
155  async fn ping(&self) -> QueueResult<()> {
156    Ok(())
157  }
158}
159
160impl NackAction {
161  pub fn as_str(&self) -> &'static str {
162    match self {
163      NackAction::Requeue => "nack_requeue",
164      NackAction::DeadLetter => "nack_dead_letter",
165      NackAction::Drop => "nack_drop",
166    }
167  }
168}
169
170pub async fn consume_once<Q, F, Fut>(queue: &Q, handler: F) -> QueueResult<()>
171where
172  Q: Queue + ?Sized,
173  F: FnOnce(QueueDelivery) -> Fut + Send,
174  Fut: Future<Output = QueueResult<ConsumeAction>> + Send,
175{
176  let delivery = queue.receive().await?;
177  match handler(delivery.clone()).await? {
178    ConsumeAction::Ack => queue.ack(delivery.receipt).await,
179    ConsumeAction::Nack(action) => queue.nack(delivery.receipt, action).await,
180  }
181}
182
183pub mod default_batch {
184  use super::*;
185
186  pub async fn publish_many<Q: Queue + ?Sized>(queue: &Q, items: Vec<QueueMessage>) -> QueueResult<()> {
187    for item in items {
188      queue.publish(item).await?;
189    }
190    Ok(())
191  }
192
193  pub async fn receive_many<Q: Queue + ?Sized>(queue: &Q, max: usize) -> QueueResult<Vec<QueueDelivery>> {
194    let mut out = Vec::with_capacity(max);
195    for _ in 0..max {
196      out.push(queue.receive().await?);
197    }
198    Ok(out)
199  }
200
201  pub async fn ack_many<Q: Queue + ?Sized>(queue: &Q, receipts: Vec<QueueReceipt>) -> QueueResult<()> {
202    for receipt in receipts {
203      queue.ack(receipt).await?;
204    }
205    Ok(())
206  }
207
208  pub async fn nack_many<Q: Queue + ?Sized>(
209    queue: &Q,
210    receipts: Vec<QueueReceipt>,
211    action: NackAction,
212  ) -> QueueResult<()> {
213    for receipt in receipts {
214      queue.nack(receipt, action).await?;
215    }
216    Ok(())
217  }
218}
219
#[cfg(test)]
mod tests {
  use super::*;
  use crate::feature::QueueFeature;
  use std::sync::atomic::{AtomicUsize, Ordering};

  /// Minimal `Queue` that counts publishes and acks, hands out a fixed delivery
  /// on every `receive`, and keeps all default method implementations.
  #[derive(Debug, Default)]
  struct CountingQueue {
    publishes: AtomicUsize,
    acks: AtomicUsize,
  }

  #[async_trait]
  impl Queue for CountingQueue {
    fn name(&self) -> &'static str {
      "counting"
    }

    async fn publish(&self, _: QueueMessage) -> QueueResult<()> {
      self.publishes.fetch_add(1, Ordering::Relaxed);
      Ok(())
    }

    async fn receive(&self) -> QueueResult<QueueDelivery> {
      Ok(QueueDelivery::new(QueueMessage::new("topic", Bytes::from_static(b"p")), QueueReceipt::new("r")))
    }

    async fn ack(&self, _: QueueReceipt) -> QueueResult<()> {
      self.acks.fetch_add(1, Ordering::Relaxed);
      Ok(())
    }
  }

  // Exercises the batch helpers themselves, so the ack goes through `ack_many`
  // and the deliveries come from `receive_many`.
  #[tokio::test]
  async fn default_batch_publish_and_ack() {
    let q = CountingQueue::default();
    let batch = vec![
      QueueMessage::new("t", Bytes::from_static(b"a")),
      QueueMessage::new("t", Bytes::from_static(b"b")),
    ];
    default_batch::publish_many(&q, batch).await.unwrap();
    assert_eq!(q.publishes.load(Ordering::Relaxed), 2);

    let deliveries = default_batch::receive_many(&q, 2).await.unwrap();
    assert_eq!(deliveries.len(), 2);

    let receipts: Vec<QueueReceipt> = deliveries.into_iter().map(|d| d.receipt).collect();
    default_batch::ack_many(&q, receipts).await.unwrap();
    assert_eq!(q.acks.load(Ordering::Relaxed), 2);
  }

  #[tokio::test]
  async fn default_batch_receive_zero_is_empty() {
    let q = CountingQueue::default();
    let deliveries = default_batch::receive_many(&q, 0).await.unwrap();
    assert!(deliveries.is_empty());
  }

  // `CountingQueue` does not override `nack`, so the trait default applies.
  #[tokio::test]
  async fn default_nack_is_unsupported() {
    let q = CountingQueue::default();
    let delivery = q.receive().await.unwrap();
    assert!(q.nack(delivery.receipt, NackAction::Requeue).await.is_err());
    assert_eq!(q.acks.load(Ordering::Relaxed), 0);
  }

  #[test]
  fn nack_action_labels() {
    assert_eq!(NackAction::Requeue.as_str(), "nack_requeue");
    assert_eq!(NackAction::DeadLetter.as_str(), "nack_dead_letter");
    assert_eq!(NackAction::Drop.as_str(), "nack_drop");
  }

  #[test]
  fn features_default_is_empty() {
    assert!(QueueFeatures::default().is_empty());
    assert!(!QueueFeatures::new([QueueFeature::Headers]).is_empty());
  }

  #[test]
  fn message_partition_and_dlt_builders() {
    let dlt = DeadLetterTarget::new("dlt-topic").with_routing_key("rk");
    let msg = QueueMessage::new("topic", Bytes::from_static(b"p"))
      .with_partition(2)
      .with_dead_letter(dlt.clone());
    assert_eq!(msg.partition, Some(2));
    assert_eq!(msg.dead_letter.as_ref().map(|d| d.topic.as_str()), Some("dlt-topic"));
    assert_eq!(msg.dead_letter.as_ref().and_then(|d| d.routing_key.as_deref()), Some("rk"));
  }

  #[tokio::test]
  async fn consume_once_ack_path() {
    let queue = CountingQueue::default();
    consume_once(&queue, |_delivery| async { Ok(ConsumeAction::Ack) }).await.unwrap();
    assert_eq!(queue.acks.load(Ordering::Relaxed), 1);
  }

  // A nack decision is routed to `Queue::nack`, whose default reports the
  // action as unsupported; nothing must be acked along the way.
  #[tokio::test]
  async fn consume_once_nack_path_uses_nack() {
    let queue = CountingQueue::default();
    let outcome = consume_once(&queue, |_delivery| async { Ok(ConsumeAction::Nack(NackAction::Drop)) }).await;
    assert!(outcome.is_err());
    assert_eq!(queue.acks.load(Ordering::Relaxed), 0);
  }
}