google_cloud_pubsub/subscriber/builder.rs
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::session::Session;
16use super::transport::Transport;
17use std::sync::Arc;
18
/// Number of bytes in one mebibyte (2^20).
const MIB: i64 = 1 << 20;
20
/// Builder for the `client::Subscriber::streaming_pull` method.
///
/// Holds the settings for one streaming pull. Adjust them with the `set_*`
/// methods, then call `start()` to hand the whole builder to a `Session`.
pub struct StreamingPull {
    /// Shared handle to the transport. It is moved into the session together
    /// with the rest of the builder when `start()` is called.
    pub(super) inner: Arc<Transport>,
    /// Subscription to receive messages from, e.g.
    /// `projects/my-project/subscriptions/my-subscription`.
    pub(super) subscription: String,
    /// Client identifier supplied to `new`.
    // NOTE(review): how the session uses this id is not visible in this file;
    // presumably it identifies this subscriber to the service — confirm.
    pub(super) client_id: String,
    /// Ack deadline for the stream, in seconds. Defaults to 10.
    pub(super) ack_deadline_seconds: i32,
    /// Flow control limit on the number of outstanding messages; a value <= 0
    /// means no limit. Defaults to 1000.
    pub(super) max_outstanding_messages: i64,
    /// Flow control limit on the number of outstanding bytes; a value <= 0
    /// means no limit. Defaults to 100 MiB.
    pub(super) max_outstanding_bytes: i64,
}
30
31impl StreamingPull {
32 pub(super) fn new(inner: Arc<Transport>, subscription: String, client_id: String) -> Self {
33 Self {
34 inner,
35 subscription,
36 client_id,
37 ack_deadline_seconds: 10,
38 max_outstanding_messages: 1000,
39 max_outstanding_bytes: 100 * MIB,
40 }
41 }
42
43 /// Creates a new session to receive messages from the subscription.
44 ///
45 /// Note that the underlying connection with the server is lazy-initialized.
46 /// It is not established until `Session::next()` is called.
47 ///
48 /// # Example
49 /// ```
50 /// # use google_cloud_pubsub::client::Subscriber;
51 /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
52 /// let mut session = client
53 /// .streaming_pull("projects/my-project/subscriptions/my-subscription")
54 /// .start();
55 /// while let Some((m, h)) = session.next().await.transpose()? {
56 /// println!("Received message m={m:?}");
57 /// h.ack();
58 /// }
59 /// # Ok(()) }
60 /// ```
61 pub fn start(self) -> Session {
62 Session::new(self)
63 }
64
65 /// Sets the ack deadline to use for the stream.
66 ///
67 /// This value represents how long the application has to ack or nack an
68 /// incoming message. Note that this value is independent of the deadline
69 /// configured on the server-side subscription.
70 ///
71 /// If the server does not hear back from the client within this deadline
72 /// (e.g. if an application crashes), it will resend any unacknowledged
73 /// messages to another subscriber.
74 ///
75 /// The minimum deadline you can specify is 10 seconds. The maximum deadline
76 /// you can specify is 600 seconds (10 minutes).
77 ///
78 /// The default value is 10 seconds.
79 ///
80 /// # Example
81 /// ```
82 /// # use google_cloud_pubsub::client::Subscriber;
83 /// # async fn sample() -> anyhow::Result<()> {
84 /// # let client = Subscriber::builder().build().await?;
85 /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
86 /// .set_ack_deadline_seconds(20)
87 /// .start();
88 /// # Ok(()) }
89 /// ```
90 pub fn set_ack_deadline_seconds<T: Into<i32>>(mut self, v: T) -> Self {
91 self.ack_deadline_seconds = v.into();
92 self
93 }
94
95 /// Flow control settings for the maximum number of outstanding messages.
96 ///
97 /// The server will stop sending messages to a client when this many
98 /// messages are outstanding (i.e. that have not been acked or nacked).
99 ///
100 /// The server resumes sending messages when the outstanding message count
101 /// drops below this value.
102 ///
103 /// Use a value <= 0 to set no limit on the number of outstanding messages.
104 ///
105 /// The default value is 1000 messages.
106 ///
107 /// # Example
108 /// ```
109 /// # use google_cloud_pubsub::client::Subscriber;
110 /// # async fn sample() -> anyhow::Result<()> {
111 /// # let client = Subscriber::builder().build().await?;
112 /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
113 /// .set_max_outstanding_messages(2000)
114 /// .start();
115 /// # Ok(()) }
116 /// ```
117 pub fn set_max_outstanding_messages<T: Into<i64>>(mut self, v: T) -> Self {
118 self.max_outstanding_messages = v.into();
119 self
120 }
121
122 /// Flow control settings for the maximum number of outstanding bytes.
123 ///
124 /// The server will stop sending messages to a client when this many bytes
125 /// of messages are outstanding (i.e. that have not been acked or nacked).
126 ///
127 /// The server resumes sending messages when the outstanding byte count
128 /// drops below this value.
129 ///
130 /// Use a value <= 0 to set no limit on the number of outstanding bytes.
131 ///
132 /// The default value is 100 MiB.
133 ///
134 /// # Example
135 /// ```no_rust
136 /// # use google_cloud_pubsub::client::Subscriber;
137 /// # async fn sample() -> anyhow::Result<()> {
138 /// # let client = Subscriber::builder().build().await?;
139 /// const MIB: i64 = 1024 * 1024;
140 /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
141 /// .set_max_outstanding_bytes(200 * MIB)
142 /// .start();
143 /// # Ok(()) }
144 /// ```
145 pub fn set_max_outstanding_bytes<T: Into<i64>>(mut self, v: T) -> Self {
146 self.max_outstanding_bytes = v.into();
147 self
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use auth::credentials::anonymous::Builder as Anonymous;
    use gaxi::options::ClientConfig;

    const KIB: i64 = 1024;
    const SUBSCRIPTION: &str = "projects/my-project/subscriptions/my-subscription";

    /// Builds a shared transport configured with anonymous credentials.
    async fn test_inner() -> anyhow::Result<Arc<Transport>> {
        let mut config = ClientConfig::default();
        config.cred = Some(Anonymous::new().build());
        Ok(Arc::new(Transport::new(config).await?))
    }

    /// Creates a builder with only the required arguments supplied.
    async fn test_builder() -> anyhow::Result<StreamingPull> {
        let inner = test_inner().await?;
        Ok(StreamingPull::new(
            inner,
            SUBSCRIPTION.to_string(),
            "client-id".to_string(),
        ))
    }

    /// A freshly created builder carries sensible default settings.
    #[tokio::test]
    async fn reasonable_defaults() -> anyhow::Result<()> {
        let defaults = test_builder().await?;

        assert_eq!(defaults.subscription, SUBSCRIPTION);
        assert_eq!(defaults.ack_deadline_seconds, 10);

        let messages = defaults.max_outstanding_messages;
        assert!(
            messages > 100 && messages < 100_000,
            "max_outstanding_messages={}",
            messages
        );

        let bytes = defaults.max_outstanding_bytes;
        assert!(bytes > 100 * KIB, "max_outstanding_bytes={}", bytes);

        Ok(())
    }

    /// Each setter overrides exactly the field it is named after.
    #[tokio::test]
    async fn options() -> anyhow::Result<()> {
        let configured = test_builder()
            .await?
            .set_ack_deadline_seconds(20)
            .set_max_outstanding_messages(12345)
            .set_max_outstanding_bytes(6789 * KIB);

        assert_eq!(configured.subscription, SUBSCRIPTION);
        assert_eq!(configured.ack_deadline_seconds, 20);
        assert_eq!(configured.max_outstanding_messages, 12345);
        assert_eq!(configured.max_outstanding_bytes, 6789 * KIB);

        Ok(())
    }
}