google_cloud_pubsub/subscriber/
handler.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use tokio::sync::mpsc::UnboundedSender;
16
17/// The action an application does with a message.
18#[derive(Debug, PartialEq)]
19pub(super) enum AckResult {
20    Ack(String),
21    Nack(String),
22    // TODO(#3964) - support exactly once acking
23}
24
25/// A handler for acknowledging or rejecting messages.
26#[derive(Debug)]
27#[non_exhaustive]
28pub enum Handler {
29    AtLeastOnce(AtLeastOnce),
30    // TODO(#3964) - support exactly once acking
31}
32
33impl Handler {
34    /// Acknowledge the message associated with this handler.
35    ///
36    /// Note that the acknowledgement is best effort. The message may still be
37    /// redelivered to this client, or another client.
38    pub fn ack(self) {
39        match self {
40            Handler::AtLeastOnce(h) => h.ack(),
41        }
42    }
43
44    /// Rejects the message associated with this handler.
45    ///
46    /// The message will be removed from this `Subscriber`'s lease management.
47    /// The service will redeliver this message, possibly to another client.
48    pub fn nack(self) {
49        match self {
50            Handler::AtLeastOnce(h) => h.nack(),
51        }
52    }
53}
54
55/// A handler for at-least-once delivery.
56#[derive(Debug)]
57pub struct AtLeastOnce {
58    pub(super) ack_id: String,
59    pub(super) ack_tx: UnboundedSender<AckResult>,
60}
61
62impl AtLeastOnce {
63    /// Acknowledge the message associated with this handler.
64    ///
65    /// Note that the acknowledgement is best effort. The message may still be
66    /// redelivered to this client, or another client.
67    pub fn ack(self) {
68        let _ = self.ack_tx.send(AckResult::Ack(self.ack_id));
69    }
70
71    /// Rejects the message associated with this handler.
72    ///
73    /// The message will be removed from this `Subscriber`'s lease management.
74    /// The service will redeliver this message, possibly to another client.
75    pub fn nack(self) {
76        let _ = self.ack_tx.send(AckResult::Nack(self.ack_id));
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use super::super::lease_state::tests::test_id;
83    use super::*;
84    use tokio::sync::mpsc::error::TryRecvError;
85    use tokio::sync::mpsc::unbounded_channel;
86
87    #[test]
88    fn handler_ack() -> anyhow::Result<()> {
89        let (ack_tx, mut ack_rx) = unbounded_channel();
90        let h = Handler::AtLeastOnce(AtLeastOnce {
91            ack_id: test_id(1),
92            ack_tx,
93        });
94        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
95
96        h.ack();
97        let ack = ack_rx.try_recv()?;
98        assert_eq!(ack, AckResult::Ack(test_id(1)));
99
100        Ok(())
101    }
102
103    #[test]
104    fn handler_nack() -> anyhow::Result<()> {
105        let (ack_tx, mut ack_rx) = unbounded_channel();
106        let h = Handler::AtLeastOnce(AtLeastOnce {
107            ack_id: test_id(1),
108            ack_tx,
109        });
110        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
111
112        h.nack();
113        let ack = ack_rx.try_recv()?;
114        assert_eq!(ack, AckResult::Nack(test_id(1)));
115
116        Ok(())
117    }
118
119    #[test]
120    fn at_least_once_ack() -> anyhow::Result<()> {
121        let (ack_tx, mut ack_rx) = unbounded_channel();
122        let h = AtLeastOnce {
123            ack_id: test_id(1),
124            ack_tx,
125        };
126        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
127
128        h.ack();
129        let ack = ack_rx.try_recv()?;
130        assert_eq!(ack, AckResult::Ack(test_id(1)));
131
132        Ok(())
133    }
134
135    #[test]
136    fn at_least_once_nack() -> anyhow::Result<()> {
137        let (ack_tx, mut ack_rx) = unbounded_channel();
138        let h = AtLeastOnce {
139            ack_id: test_id(1),
140            ack_tx,
141        };
142        assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
143
144        h.nack();
145        let ack = ack_rx.try_recv()?;
146        assert_eq!(ack, AckResult::Nack(test_id(1)));
147
148        Ok(())
149    }
150}