Skip to main content

summer_pubsub/
message.rs

1use bytes::Bytes;
2use google_cloud_pubsub::subscriber::handler::Handler;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6/// Incoming Pub/Sub payload plus shared acknowledgement state
7#[derive(Clone)]
8pub struct Message {
9    pub message_id: String,
10    pub data: Bytes,
11    pub attributes: HashMap<String, String>,
12    ack: Arc<Mutex<Option<Handler>>>,
13}
14
15impl Message {
16    pub(crate) fn new(
17        message_id: String,
18        data: Bytes,
19        attributes: HashMap<String, String>,
20        ack: Arc<Mutex<Option<Handler>>>,
21    ) -> Self {
22        Self {
23            message_id,
24            data,
25            attributes,
26            ack,
27        }
28    }
29
30    /// Acknowledge this message (at-least-once best-effort semantics from the client library).
31    pub fn ack(&self) {
32        if let Ok(mut slot) = self.ack.lock() {
33            if let Some(h) = slot.take() {
34                h.ack();
35            }
36        }
37    }
38
39    /// Negative acknowledgement: the message may be redelivered.
40    pub fn nack(&self) {
41        if let Ok(mut slot) = self.ack.lock() {
42            if let Some(h) = slot.take() {
43                h.nack();
44            }
45        }
46    }
47}
48
49impl Drop for Message {
50    fn drop(&mut self) {
51        if let Ok(mut slot) = self.ack.lock() {
52            if let Some(h) = slot.take() {
53                h.ack();
54            }
55        }
56    }
57}