diidi_travel_common_queue/
queue.rs1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::future::Future;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9use crate::error::{QueueError, QueueResult};
10use crate::feature::QueueFeatures;
11
12#[derive(Debug, Clone)]
13pub struct QueueMessage {
14 pub topic: String,
15 pub payload: Bytes,
16 pub headers: BTreeMap<String, String>,
17 pub key: Option<String>,
18 pub partition: Option<i32>,
19 pub dead_letter: Option<DeadLetterTarget>,
20 pub attributes: serde_json::Value,
21}
22
23impl QueueMessage {
24 pub fn new(topic: impl Into<String>, payload: impl Into<Bytes>) -> Self {
25 Self {
26 topic: topic.into(),
27 payload: payload.into(),
28 headers: BTreeMap::new(),
29 key: None,
30 partition: None,
31 dead_letter: None,
32 attributes: serde_json::Value::Null,
33 }
34 }
35
36 pub fn with_key(mut self, key: impl Into<String>) -> Self {
37 self.key = Some(key.into());
38 self
39 }
40
41 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
42 self.headers.insert(key.into(), value.into());
43 self
44 }
45
46 pub fn with_partition(mut self, partition: i32) -> Self {
47 self.partition = Some(partition);
48 self
49 }
50
51 pub fn with_dead_letter(mut self, target: DeadLetterTarget) -> Self {
52 self.dead_letter = Some(target);
53 self
54 }
55
56 pub fn with_attributes(mut self, attributes: serde_json::Value) -> Self {
57 self.attributes = attributes;
58 self
59 }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct DeadLetterTarget {
64 pub topic: String,
65 pub routing_key: Option<String>,
66}
67
68impl DeadLetterTarget {
69 pub fn new(topic: impl Into<String>) -> Self {
70 Self { topic: topic.into(), routing_key: None }
71 }
72
73 pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
74 self.routing_key = Some(routing_key.into());
75 self
76 }
77}
78
79#[derive(Debug, Clone)]
80pub struct QueueReceipt {
81 pub token: Bytes,
82 pub attributes: serde_json::Value,
83}
84
85impl QueueReceipt {
86 pub fn new(token: impl Into<Bytes>) -> Self {
87 Self { token: token.into(), attributes: serde_json::Value::Null }
88 }
89
90 pub fn with_attributes(mut self, attributes: serde_json::Value) -> Self {
91 self.attributes = attributes;
92 self
93 }
94}
95
96#[derive(Debug, Clone)]
97pub struct QueueDelivery {
98 pub message: QueueMessage,
99 pub receipt: QueueReceipt,
100}
101
102impl QueueDelivery {
103 pub fn new(message: QueueMessage, receipt: QueueReceipt) -> Self {
104 Self { message, receipt }
105 }
106}
107
/// What a queue is asked to do with a delivery that is negatively
/// acknowledged.
///
/// The precise effect of each variant is defined by the queue backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NackAction {
    /// Put the message back on the queue.
    Requeue,
    /// Route the message to a dead-letter destination.
    DeadLetter,
    /// Discard the message.
    Drop,
}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum ConsumeAction {
117 Ack,
118 Nack(NackAction),
119}
120
121#[async_trait]
122pub trait Queue: Send + Sync + Debug + 'static {
123 fn name(&self) -> &'static str;
124
125 fn features(&self) -> QueueFeatures {
126 QueueFeatures::default()
127 }
128
129 async fn publish(&self, message: QueueMessage) -> QueueResult<()>;
130
131 async fn publish_many(&self, items: Vec<QueueMessage>) -> QueueResult<()> {
132 default_batch::publish_many(self, items).await
133 }
134
135 async fn receive(&self) -> QueueResult<QueueDelivery>;
136
137 async fn receive_many(&self, max: usize) -> QueueResult<Vec<QueueDelivery>> {
138 default_batch::receive_many(self, max).await
139 }
140
141 async fn ack(&self, receipt: QueueReceipt) -> QueueResult<()>;
142
143 async fn ack_many(&self, receipts: Vec<QueueReceipt>) -> QueueResult<()> {
144 default_batch::ack_many(self, receipts).await
145 }
146
147 async fn nack(&self, _receipt: QueueReceipt, action: NackAction) -> QueueResult<()> {
148 Err(QueueError::unsupported(self.name(), action.as_str()))
149 }
150
151 async fn nack_many(&self, receipts: Vec<QueueReceipt>, action: NackAction) -> QueueResult<()> {
152 default_batch::nack_many(self, receipts, action).await
153 }
154
155 async fn ping(&self) -> QueueResult<()> {
156 Ok(())
157 }
158}
159
160impl NackAction {
161 pub fn as_str(&self) -> &'static str {
162 match self {
163 NackAction::Requeue => "nack_requeue",
164 NackAction::DeadLetter => "nack_dead_letter",
165 NackAction::Drop => "nack_drop",
166 }
167 }
168}
169
170pub async fn consume_once<Q, F, Fut>(queue: &Q, handler: F) -> QueueResult<()>
171where
172 Q: Queue + ?Sized,
173 F: FnOnce(QueueDelivery) -> Fut + Send,
174 Fut: Future<Output = QueueResult<ConsumeAction>> + Send,
175{
176 let delivery = queue.receive().await?;
177 match handler(delivery.clone()).await? {
178 ConsumeAction::Ack => queue.ack(delivery.receipt).await,
179 ConsumeAction::Nack(action) => queue.nack(delivery.receipt, action).await,
180 }
181}
182
183pub mod default_batch {
184 use super::*;
185
186 pub async fn publish_many<Q: Queue + ?Sized>(queue: &Q, items: Vec<QueueMessage>) -> QueueResult<()> {
187 for item in items {
188 queue.publish(item).await?;
189 }
190 Ok(())
191 }
192
193 pub async fn receive_many<Q: Queue + ?Sized>(queue: &Q, max: usize) -> QueueResult<Vec<QueueDelivery>> {
194 let mut out = Vec::with_capacity(max);
195 for _ in 0..max {
196 out.push(queue.receive().await?);
197 }
198 Ok(out)
199 }
200
201 pub async fn ack_many<Q: Queue + ?Sized>(queue: &Q, receipts: Vec<QueueReceipt>) -> QueueResult<()> {
202 for receipt in receipts {
203 queue.ack(receipt).await?;
204 }
205 Ok(())
206 }
207
208 pub async fn nack_many<Q: Queue + ?Sized>(
209 queue: &Q,
210 receipts: Vec<QueueReceipt>,
211 action: NackAction,
212 ) -> QueueResult<()> {
213 for receipt in receipts {
214 queue.nack(receipt, action).await?;
215 }
216 Ok(())
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::feature::QueueFeature;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal queue that counts `publish` and `ack` calls and always has a
    /// delivery ready; `nack` is left at the trait default.
    #[derive(Debug, Default)]
    struct CountingQueue {
        publishes: AtomicUsize,
        acks: AtomicUsize,
    }

    #[async_trait]
    impl Queue for CountingQueue {
        fn name(&self) -> &'static str {
            "counting"
        }

        async fn publish(&self, _: QueueMessage) -> QueueResult<()> {
            self.publishes.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        async fn receive(&self) -> QueueResult<QueueDelivery> {
            Ok(QueueDelivery::new(QueueMessage::new("topic", Bytes::from_static(b"p")), QueueReceipt::new("r")))
        }

        async fn ack(&self, _: QueueReceipt) -> QueueResult<()> {
            self.acks.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    #[tokio::test]
    async fn default_batch_publish_and_ack() {
        let q = CountingQueue::default();
        default_batch::publish_many(&q, vec![QueueMessage::new("t", Bytes::from_static(b"a"))]).await.unwrap();
        assert_eq!(q.publishes.load(Ordering::Relaxed), 1);
        let delivery = q.receive().await.unwrap();
        // Go through the batch helper so the test covers what its name promises.
        default_batch::ack_many(&q, vec![delivery.receipt]).await.unwrap();
        assert_eq!(q.acks.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn default_batch_receive_many_returns_max_items() {
        let q = CountingQueue::default();
        assert!(default_batch::receive_many(&q, 0).await.unwrap().is_empty());
        assert_eq!(default_batch::receive_many(&q, 3).await.unwrap().len(), 3);
    }

    #[test]
    fn features_default_is_empty() {
        assert!(QueueFeatures::default().is_empty());
        assert!(!QueueFeatures::new([QueueFeature::Headers]).is_empty());
    }

    #[test]
    fn message_partition_and_dlt_builders() {
        let dlt = DeadLetterTarget::new("dlt-topic").with_routing_key("rk");
        // `dlt` is not used again, so it is moved rather than cloned.
        let msg = QueueMessage::new("topic", Bytes::from_static(b"p"))
            .with_partition(2)
            .with_dead_letter(dlt);
        assert_eq!(msg.partition, Some(2));
        assert_eq!(msg.dead_letter.as_ref().map(|d| d.topic.as_str()), Some("dlt-topic"));
        assert_eq!(msg.dead_letter.as_ref().and_then(|d| d.routing_key.as_deref()), Some("rk"));
    }

    #[tokio::test]
    async fn consume_once_ack_path() {
        let queue = CountingQueue::default();
        consume_once(&queue, |_delivery| async { Ok(ConsumeAction::Ack) }).await.unwrap();
        assert_eq!(queue.acks.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn consume_once_nack_uses_unsupported_default() {
        // `CountingQueue` does not override `nack`, so the trait default must
        // surface an error and no ack may be recorded.
        let queue = CountingQueue::default();
        let outcome =
            consume_once(&queue, |_delivery| async { Ok(ConsumeAction::Nack(NackAction::Requeue)) }).await;
        assert!(outcome.is_err());
        assert_eq!(queue.acks.load(Ordering::Relaxed), 0);
    }
}