google_cloud_pubsub/subscriber/builder.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::MessageStream;
16use super::ShutdownBehavior;
17use super::transport::Transport;
18use std::sync::Arc;
19use std::time::Duration;
20
/// Number of bytes in one mebibyte (2^20).
const MIB: i64 = 1 << 20;
22
23pub use super::client_builder::ClientBuilder;
24
/// Builder for the [`client::Subscriber::subscribe`][crate::client::Subscriber::subscribe] method.
///
/// Collects the per-stream settings and hands them to [`MessageStream`] when
/// [`Subscribe::build`] is called.
pub struct Subscribe {
    /// Shared handle to the transport, supplied at construction.
    pub(super) inner: Arc<Transport>,
    /// Name of the subscription to receive from, e.g.
    /// `projects/my-project/subscriptions/my-subscription`.
    pub(super) subscription: String,
    /// Client identifier supplied at construction.
    // NOTE(review): how the id is used is not visible in this file — confirm
    // against `MessageStream` / the transport.
    pub(super) client_id: String,
    /// Number of gRPC subchannels, supplied at construction. This builder has
    /// no setter for it.
    pub(super) grpc_subchannel_count: usize,
    /// Maximum lease extension, in whole seconds. Defaults to 60 and is kept
    /// within `10..=600` by [`Subscribe::set_max_lease_extension`].
    pub(super) ack_deadline_seconds: i32,
    /// Maximum total time a message is held under lease. Defaults to 60
    /// minutes; see [`Subscribe::set_max_lease`].
    pub(super) max_lease: Duration,
    /// Flow-control limit on outstanding messages. Defaults to 1000; a value
    /// `<= 0` means no limit.
    pub(super) max_outstanding_messages: i64,
    /// Flow-control limit on outstanding bytes. Defaults to 100 MiB; a value
    /// `<= 0` means no limit.
    pub(super) max_outstanding_bytes: i64,
    /// What the stream does with in-flight messages on shutdown. Defaults to
    /// [`ShutdownBehavior::WaitForProcessing`].
    pub(super) shutdown_behavior: ShutdownBehavior,
}
37
38impl Subscribe {
39 pub(super) fn new(
40 inner: Arc<Transport>,
41 subscription: String,
42 client_id: String,
43 grpc_subchannel_count: usize,
44 ) -> Self {
45 Self {
46 inner,
47 subscription,
48 client_id,
49 grpc_subchannel_count,
50 ack_deadline_seconds: 60,
51 max_lease: Duration::from_secs(60 * 60),
52 max_outstanding_messages: 1000,
53 max_outstanding_bytes: 100 * MIB,
54 shutdown_behavior: ShutdownBehavior::WaitForProcessing,
55 }
56 }
57
58 /// Creates a new stream to receive messages from the subscription.
59 ///
60 /// # Example
61 /// ```
62 /// # use google_cloud_pubsub::client::Subscriber;
63 /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
64 /// let mut stream = client
65 /// .subscribe("projects/my-project/subscriptions/my-subscription")
66 /// .build();
67 /// while let Some((m, h)) = stream.next().await.transpose()? {
68 /// println!("Received message m={m:?}");
69 /// h.ack();
70 /// }
71 /// # Ok(()) }
72 /// ```
73 ///
74 /// Note that the underlying connection with the server is lazy-initialized.
75 /// It is not established until [`MessageStream::next()`] is called.
76 pub fn build(self) -> MessageStream {
77 MessageStream::new(self)
78 }
79
80 /// Sets the maximum lease deadline for a message.
81 ///
82 /// # Example
83 /// ```
84 /// # use google_cloud_pubsub::client::Subscriber;
85 /// # use std::time::Duration;
86 /// # async fn sample() -> anyhow::Result<()> {
87 /// # let client = Subscriber::builder().build().await?;
88 /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
89 /// .set_max_lease(Duration::from_secs(3600))
90 /// .build();
91 /// # Ok(()) }
92 /// ```
93 ///
94 /// The client holds a message for at most this amount. After a message has
95 /// been held for this long, the client will stop extending its lease.
96 ///
97 /// The default value is 60 minutes. If it takes your application longer
98 /// than 60 minutes to process a message, you should increase this value.
99 pub fn set_max_lease<T: Into<Duration>>(mut self, v: T) -> Self {
100 self.max_lease = v.into();
101 self
102 }
103
104 /// Sets the maximum duration to extend lease deadlines by.
105 ///
106 /// # Example
107 /// ```
108 /// # use google_cloud_pubsub::client::Subscriber;
109 /// # use std::time::Duration;
110 /// # async fn sample() -> anyhow::Result<()> {
111 /// # let client = Subscriber::builder().build().await?;
112 /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
113 /// .set_max_lease_extension(Duration::from_secs(20))
114 /// .build();
115 /// # Ok(()) }
116 /// ```
117 ///
118 /// The client extends lease deadlines by at most this amount.
119 ///
120 /// If the server does not hear back from the client within this deadline
121 /// (e.g. if an application crashes), it will resend any unacknowledged
122 /// messages to another subscriber.
123 ///
124 /// Note that this value is independent of the ack deadline configured on
125 /// the subscription.
126 ///
127 /// The minimum deadline you can specify is 10 seconds. The maximum deadline
128 /// you can specify is 10 minutes. The client clamps the supplied value to
129 /// this range.
130 ///
131 /// The default value is 60 seconds.
132 pub fn set_max_lease_extension<T: Into<Duration>>(mut self, v: T) -> Self {
133 self.ack_deadline_seconds = v.into().as_secs().clamp(10, 600) as i32;
134 self
135 }
136
137 /// Flow control settings for the maximum number of outstanding messages.
138 ///
139 /// # Example
140 /// ```
141 /// # use google_cloud_pubsub::client::Subscriber;
142 /// # async fn sample() -> anyhow::Result<()> {
143 /// # let client = Subscriber::builder().build().await?;
144 /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
145 /// .set_max_outstanding_messages(2000)
146 /// .build();
147 /// # Ok(()) }
148 /// ```
149 ///
150 /// The server will stop sending messages to a client when this many
151 /// messages are outstanding (i.e. that have not been acked). The server
152 /// resumes sending messages when the outstanding message count drops below
153 /// this value.
154 ///
155 /// The limit applies per-stream. It is not a global limit.
156 ///
157 /// Use a value <= 0 to set no limit on the number of outstanding messages.
158 ///
159 /// The default value is 1000 messages.
160 pub fn set_max_outstanding_messages<T: Into<i64>>(mut self, v: T) -> Self {
161 self.max_outstanding_messages = v.into();
162 self
163 }
164
165 /// Flow control settings for the maximum number of outstanding bytes.
166 ///
167 /// # Example
168 /// ```
169 /// # use google_cloud_pubsub::client::Subscriber;
170 /// # async fn sample() -> anyhow::Result<()> {
171 /// # let client = Subscriber::builder().build().await?;
172 /// const MIB: i64 = 1024 * 1024;
173 /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
174 /// .set_max_outstanding_bytes(200 * MIB)
175 /// .build();
176 /// # Ok(()) }
177 /// ```
178 ///
179 /// The server will stop sending messages to a client when this many bytes
180 /// of messages are outstanding (i.e. that have not been acked). The server
181 /// resumes sending messages when the outstanding byte count drops below
182 /// this value.
183 ///
184 /// The limit applies per-stream. It is not a global limit.
185 ///
186 /// Use a value <= 0 to set no limit on the number of outstanding bytes.
187 ///
188 /// The default value is 100 MiB.
189 pub fn set_max_outstanding_bytes<T: Into<i64>>(mut self, v: T) -> Self {
190 self.max_outstanding_bytes = v.into();
191 self
192 }
193
194 /// Sets the shutdown behavior for the stream.
195 ///
196 /// # Example
197 /// ```
198 /// # use google_cloud_pubsub::client::Subscriber;
199 /// # async fn sample() -> anyhow::Result<()> {
200 /// # let client = Subscriber::builder().build().await?;
201 /// use google_cloud_pubsub::subscriber::ShutdownBehavior::NackImmediately;
202 /// let stream = client.subscribe("projects/my-project/subscriptions/my-subscription")
203 /// .set_shutdown_behavior(NackImmediately)
204 /// .build();
205 /// # Ok(()) }
206 /// ```
207 ///
208 /// The default behavior is [`WaitForProcessing`][wait].
209 ///
210 /// [wait]: crate::subscriber::ShutdownBehavior::WaitForProcessing
211 pub fn set_shutdown_behavior(mut self, v: ShutdownBehavior) -> Self {
212 self.shutdown_behavior = v;
213 self
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::options::ClientConfig;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use test_case::test_case;

    const KIB: i64 = 1024;
    /// Subscription name shared by every test in this module.
    const SUBSCRIPTION: &str = "projects/my-project/subscriptions/my-subscription";

    /// Builds a transport that uses anonymous credentials.
    async fn test_inner() -> anyhow::Result<Arc<Transport>> {
        let mut config = ClientConfig::default();
        config.cred = Some(Anonymous::new().build());
        Ok(Arc::new(Transport::new(config).await?))
    }

    /// Builds a `Subscribe` with the fixed test subscription and client id.
    async fn new_builder(grpc_subchannel_count: usize) -> anyhow::Result<Subscribe> {
        Ok(Subscribe::new(
            test_inner().await?,
            SUBSCRIPTION.to_string(),
            "client-id".to_string(),
            grpc_subchannel_count,
        ))
    }

    #[tokio::test]
    async fn reasonable_defaults() -> anyhow::Result<()> {
        let builder = new_builder(1_usize).await?;

        assert_eq!(builder.subscription, SUBSCRIPTION);
        assert_eq!(builder.grpc_subchannel_count, 1);
        assert_eq!(builder.ack_deadline_seconds, 60);
        assert!(
            Duration::from_secs(300) <= builder.max_lease,
            "max_lease={:?}",
            builder.max_lease
        );
        // Strictly between 100 and 100_000.
        assert!(
            (101..100_000).contains(&builder.max_outstanding_messages),
            "max_outstanding_messages={}",
            builder.max_outstanding_messages
        );
        assert!(
            100 * KIB < builder.max_outstanding_bytes,
            "max_outstanding_bytes={}",
            builder.max_outstanding_bytes
        );
        assert_eq!(
            builder.shutdown_behavior,
            ShutdownBehavior::WaitForProcessing
        );

        Ok(())
    }

    #[tokio::test]
    async fn options() -> anyhow::Result<()> {
        let builder = new_builder(2_usize)
            .await?
            .set_max_lease(Duration::from_secs(3600))
            .set_max_lease_extension(Duration::from_secs(20))
            .set_max_outstanding_messages(12345)
            .set_max_outstanding_bytes(6789 * KIB)
            .set_shutdown_behavior(ShutdownBehavior::NackImmediately);

        assert_eq!(builder.subscription, SUBSCRIPTION);
        assert_eq!(builder.grpc_subchannel_count, 2);
        assert_eq!(builder.max_lease, Duration::from_secs(3600));
        assert_eq!(builder.ack_deadline_seconds, 20);
        assert_eq!(builder.max_outstanding_messages, 12345);
        assert_eq!(builder.max_outstanding_bytes, 6789 * KIB);
        assert_eq!(builder.shutdown_behavior, ShutdownBehavior::NackImmediately);

        Ok(())
    }

    #[test_case(Duration::ZERO, 10)]
    #[test_case(Duration::from_secs(42), 42)]
    #[test_case(Duration::from_secs(4200), 600)]
    #[tokio::test]
    async fn clamp_ack_deadline(v: Duration, want: i32) -> anyhow::Result<()> {
        let builder = new_builder(1_usize).await?.set_max_lease_extension(v);
        assert_eq!(builder.ack_deadline_seconds, want);

        Ok(())
    }
}