Skip to main content

google_cloud_pubsub/subscriber/
builder.rs

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use super::MessageStream;
16use super::ShutdownBehavior;
17use super::transport::Transport;
18use std::sync::Arc;
19use std::time::Duration;
20
21const MIB: i64 = 1024 * 1024;
22
23pub use super::client_builder::ClientBuilder;
24
25/// Builder for the [`client::Subscriber::subscribe`][crate::client::Subscriber::subscribe] method.
26pub struct Subscribe {
27    pub(super) inner: Arc<Transport>,
28    pub(super) subscription: String,
29    pub(super) client_id: String,
30    pub(super) grpc_subchannel_count: usize,
31    pub(super) ack_deadline_seconds: i32,
32    pub(super) max_lease: Duration,
33    pub(super) max_outstanding_messages: i64,
34    pub(super) max_outstanding_bytes: i64,
35    pub(super) shutdown_behavior: ShutdownBehavior,
36}
37
38impl Subscribe {
39    pub(super) fn new(
40        inner: Arc<Transport>,
41        subscription: String,
42        client_id: String,
43        grpc_subchannel_count: usize,
44    ) -> Self {
45        Self {
46            inner,
47            subscription,
48            client_id,
49            grpc_subchannel_count,
50            ack_deadline_seconds: 60,
51            max_lease: Duration::from_secs(60 * 60),
52            max_outstanding_messages: 1000,
53            max_outstanding_bytes: 100 * MIB,
54            shutdown_behavior: ShutdownBehavior::WaitForProcessing,
55        }
56    }
57
58    /// Creates a new stream to receive messages from the subscription.
59    ///
60    /// # Example
61    /// ```
62    /// # use google_cloud_pubsub::client::Subscriber;
63    /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
64    /// let mut stream = client
65    ///     .subscribe("projects/my-project/subscriptions/my-subscription")
66    ///     .build();
67    /// while let Some((m, h)) = stream.next().await.transpose()? {
68    ///     println!("Received message m={m:?}");
69    ///     h.ack();
70    /// }
71    /// # Ok(()) }
72    /// ```
73    ///
74    /// Note that the underlying connection with the server is lazy-initialized.
75    /// It is not established until [`MessageStream::next()`] is called.
76    pub fn build(self) -> MessageStream {
77        MessageStream::new(self)
78    }
79
80    /// Sets the maximum lease deadline for a message.
81    ///
82    /// # Example
83    /// ```
84    /// # use google_cloud_pubsub::client::Subscriber;
85    /// # use std::time::Duration;
86    /// # async fn sample() -> anyhow::Result<()> {
87    /// # let client = Subscriber::builder().build().await?;
88    /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
89    ///     .set_max_lease(Duration::from_secs(3600))
90    ///     .build();
91    /// # Ok(()) }
92    /// ```
93    ///
94    /// The client holds a message for at most this amount. After a message has
95    /// been held for this long, the client will stop extending its lease.
96    ///
97    /// The default value is 60 minutes. If it takes your application longer
98    /// than 60 minutes to process a message, you should increase this value.
99    pub fn set_max_lease<T: Into<Duration>>(mut self, v: T) -> Self {
100        self.max_lease = v.into();
101        self
102    }
103
104    /// Sets the maximum duration to extend lease deadlines by.
105    ///
106    /// # Example
107    /// ```
108    /// # use google_cloud_pubsub::client::Subscriber;
109    /// # use std::time::Duration;
110    /// # async fn sample() -> anyhow::Result<()> {
111    /// # let client = Subscriber::builder().build().await?;
112    /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
113    ///     .set_max_lease_extension(Duration::from_secs(20))
114    ///     .build();
115    /// # Ok(()) }
116    /// ```
117    ///
118    /// The client extends lease deadlines by at most this amount.
119    ///
120    /// If the server does not hear back from the client within this deadline
121    /// (e.g. if an application crashes), it will resend any unacknowledged
122    /// messages to another subscriber.
123    ///
124    /// Note that this value is independent of the ack deadline configured on
125    /// the subscription.
126    ///
127    /// The minimum deadline you can specify is 10 seconds. The maximum deadline
128    /// you can specify is 10 minutes. The client clamps the supplied value to
129    /// this range.
130    ///
131    /// The default value is 60 seconds.
132    pub fn set_max_lease_extension<T: Into<Duration>>(mut self, v: T) -> Self {
133        self.ack_deadline_seconds = v.into().as_secs().clamp(10, 600) as i32;
134        self
135    }
136
137    /// Flow control settings for the maximum number of outstanding messages.
138    ///
139    /// # Example
140    /// ```
141    /// # use google_cloud_pubsub::client::Subscriber;
142    /// # async fn sample() -> anyhow::Result<()> {
143    /// # let client = Subscriber::builder().build().await?;
144    /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
145    ///     .set_max_outstanding_messages(2000)
146    ///     .build();
147    /// # Ok(()) }
148    /// ```
149    ///
150    /// The server will stop sending messages to a client when this many
151    /// messages are outstanding (i.e. that have not been acked). The server
152    /// resumes sending messages when the outstanding message count drops below
153    /// this value.
154    ///
155    /// The limit applies per-stream. It is not a global limit.
156    ///
157    /// Use a value <= 0 to set no limit on the number of outstanding messages.
158    ///
159    /// The default value is 1000 messages.
160    pub fn set_max_outstanding_messages<T: Into<i64>>(mut self, v: T) -> Self {
161        self.max_outstanding_messages = v.into();
162        self
163    }
164
165    /// Flow control settings for the maximum number of outstanding bytes.
166    ///
167    /// # Example
168    /// ```
169    /// # use google_cloud_pubsub::client::Subscriber;
170    /// # async fn sample() -> anyhow::Result<()> {
171    /// # let client = Subscriber::builder().build().await?;
172    /// const MIB: i64 = 1024 * 1024;
173    /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
174    ///     .set_max_outstanding_bytes(200 * MIB)
175    ///     .build();
176    /// # Ok(()) }
177    /// ```
178    ///
179    /// The server will stop sending messages to a client when this many bytes
180    /// of messages are outstanding (i.e. that have not been acked). The server
181    /// resumes sending messages when the outstanding byte count drops below
182    /// this value.
183    ///
184    /// The limit applies per-stream. It is not a global limit.
185    ///
186    /// Use a value <= 0 to set no limit on the number of outstanding bytes.
187    ///
188    /// The default value is 100 MiB.
189    pub fn set_max_outstanding_bytes<T: Into<i64>>(mut self, v: T) -> Self {
190        self.max_outstanding_bytes = v.into();
191        self
192    }
193
194    /// Sets the shutdown behavior for the stream.
195    ///
196    /// # Example
197    /// ```
198    /// # use google_cloud_pubsub::client::Subscriber;
199    /// # async fn sample() -> anyhow::Result<()> {
200    /// # let client = Subscriber::builder().build().await?;
201    /// use google_cloud_pubsub::subscriber::ShutdownBehavior::NackImmediately;
202    /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
203    ///     .set_shutdown_behavior(NackImmediately)
204    ///     .build();
205    /// # Ok(()) }
206    /// ```
207    ///
208    /// The default behavior is [`WaitForProcessing`][wait].
209    ///
210    /// [wait]: crate::subscriber::ShutdownBehavior::WaitForProcessing
211    pub fn set_shutdown_behavior(mut self, v: ShutdownBehavior) -> Self {
212        self.shutdown_behavior = v;
213        self
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::options::ClientConfig;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use test_case::test_case;

    const KIB: i64 = 1024;
    const SUBSCRIPTION: &str = "projects/my-project/subscriptions/my-subscription";
    const CLIENT_ID: &str = "client-id";

    /// Creates a transport configured with anonymous credentials.
    async fn test_inner() -> anyhow::Result<Arc<Transport>> {
        let mut config = ClientConfig::default();
        config.cred = Some(Anonymous::new().build());
        let transport = Transport::new(config).await?;
        Ok(Arc::new(transport))
    }

    /// Creates a builder with default options for the test subscription.
    async fn test_builder(grpc_subchannel_count: usize) -> anyhow::Result<Subscribe> {
        Ok(Subscribe::new(
            test_inner().await?,
            SUBSCRIPTION.to_string(),
            CLIENT_ID.to_string(),
            grpc_subchannel_count,
        ))
    }

    #[tokio::test]
    async fn reasonable_defaults() -> anyhow::Result<()> {
        let builder = test_builder(1_usize).await?;
        assert_eq!(builder.subscription, SUBSCRIPTION);
        assert_eq!(builder.client_id, CLIENT_ID);
        assert_eq!(builder.grpc_subchannel_count, 1);
        assert_eq!(builder.ack_deadline_seconds, 60);
        assert!(
            builder.max_lease >= Duration::from_secs(300),
            "max_lease={:?}",
            builder.max_lease
        );
        assert!(
            100_000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100,
            "max_outstanding_messages={}",
            builder.max_outstanding_messages
        );
        assert!(
            builder.max_outstanding_bytes > 100 * KIB,
            "max_outstanding_bytes={}",
            builder.max_outstanding_bytes
        );
        assert_eq!(
            builder.shutdown_behavior,
            ShutdownBehavior::WaitForProcessing
        );

        Ok(())
    }

    #[tokio::test]
    async fn options() -> anyhow::Result<()> {
        let builder = test_builder(2_usize)
            .await?
            .set_max_lease(Duration::from_secs(3600))
            .set_max_lease_extension(Duration::from_secs(20))
            .set_max_outstanding_messages(12345)
            .set_max_outstanding_bytes(6789 * KIB)
            .set_shutdown_behavior(ShutdownBehavior::NackImmediately);
        assert_eq!(builder.subscription, SUBSCRIPTION);
        assert_eq!(builder.grpc_subchannel_count, 2);
        assert_eq!(builder.max_lease, Duration::from_secs(3600));
        assert_eq!(builder.ack_deadline_seconds, 20);
        assert_eq!(builder.max_outstanding_messages, 12345);
        assert_eq!(builder.max_outstanding_bytes, 6789 * KIB);
        assert_eq!(builder.shutdown_behavior, ShutdownBehavior::NackImmediately);

        Ok(())
    }

    // Covers values below, inside, and above the accepted range, the exact
    // range boundaries, and a duration with a fractional second.
    #[test_case(Duration::ZERO, 10)]
    #[test_case(Duration::from_secs(10), 10)]
    #[test_case(Duration::from_secs(42), 42)]
    #[test_case(Duration::from_millis(42_900), 42)]
    #[test_case(Duration::from_secs(600), 600)]
    #[test_case(Duration::from_secs(4200), 600)]
    #[tokio::test]
    async fn clamp_ack_deadline(v: Duration, want: i32) -> anyhow::Result<()> {
        let builder = test_builder(1_usize).await?.set_max_lease_extension(v);
        assert_eq!(builder.ack_deadline_seconds, want);

        Ok(())
    }
}