google_cloud_pubsub/subscriber/
builder.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::session::Session;
16use super::transport::Transport;
17use std::sync::Arc;
18
19const MIB: i64 = 1024 * 1024;
20
21/// Builder for the `client::Subscriber::streaming_pull` method.
22pub struct StreamingPull {
23    pub(super) inner: Arc<Transport>,
24    pub(super) subscription: String,
25    pub(super) client_id: String,
26    pub(super) ack_deadline_seconds: i32,
27    pub(super) max_outstanding_messages: i64,
28    pub(super) max_outstanding_bytes: i64,
29}
30
31impl StreamingPull {
32    pub(super) fn new(inner: Arc<Transport>, subscription: String, client_id: String) -> Self {
33        Self {
34            inner,
35            subscription,
36            client_id,
37            ack_deadline_seconds: 10,
38            max_outstanding_messages: 1000,
39            max_outstanding_bytes: 100 * MIB,
40        }
41    }
42
43    /// Creates a new session to receive messages from the subscription.
44    ///
45    /// Note that the underlying connection with the server is lazy-initialized.
46    /// It is not established until `Session::next()` is called.
47    ///
48    /// # Example
49    /// ```
50    /// # use google_cloud_pubsub::client::Subscriber;
51    /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
52    /// let mut session = client
53    ///     .streaming_pull("projects/my-project/subscriptions/my-subscription")
54    ///     .start();
55    /// while let Some((m, h)) = session.next().await.transpose()? {
56    ///     println!("Received message m={m:?}");
57    ///     h.ack();
58    /// }
59    /// # Ok(()) }
60    /// ```
61    pub fn start(self) -> Session {
62        Session::new(self)
63    }
64
65    /// Sets the ack deadline to use for the stream.
66    ///
67    /// This value represents how long the application has to ack or nack an
68    /// incoming message. Note that this value is independent of the deadline
69    /// configured on the server-side subscription.
70    ///
71    /// If the server does not hear back from the client within this deadline
72    /// (e.g. if an application crashes), it will resend any unacknowledged
73    /// messages to another subscriber.
74    ///
75    /// The minimum deadline you can specify is 10 seconds. The maximum deadline
76    /// you can specify is 600 seconds (10 minutes).
77    ///
78    /// The default value is 10 seconds.
79    ///
80    /// # Example
81    /// ```
82    /// # use google_cloud_pubsub::client::Subscriber;
83    /// # async fn sample() -> anyhow::Result<()> {
84    /// # let client = Subscriber::builder().build().await?;
85    /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
86    ///     .set_ack_deadline_seconds(20)
87    ///     .start();
88    /// # Ok(()) }
89    /// ```
90    pub fn set_ack_deadline_seconds<T: Into<i32>>(mut self, v: T) -> Self {
91        self.ack_deadline_seconds = v.into();
92        self
93    }
94
95    /// Flow control settings for the maximum number of outstanding messages.
96    ///
97    /// The server will stop sending messages to a client when this many
98    /// messages are outstanding (i.e. that have not been acked or nacked).
99    ///
100    /// The server resumes sending messages when the outstanding message count
101    /// drops below this value.
102    ///
103    /// Use a value <= 0 to set no limit on the number of outstanding messages.
104    ///
105    /// The default value is 1000 messages.
106    ///
107    /// # Example
108    /// ```
109    /// # use google_cloud_pubsub::client::Subscriber;
110    /// # async fn sample() -> anyhow::Result<()> {
111    /// # let client = Subscriber::builder().build().await?;
112    /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
113    ///     .set_max_outstanding_messages(2000)
114    ///     .start();
115    /// # Ok(()) }
116    /// ```
117    pub fn set_max_outstanding_messages<T: Into<i64>>(mut self, v: T) -> Self {
118        self.max_outstanding_messages = v.into();
119        self
120    }
121
122    /// Flow control settings for the maximum number of outstanding bytes.
123    ///
124    /// The server will stop sending messages to a client when this many bytes
125    /// of messages are outstanding (i.e. that have not been acked or nacked).
126    ///
127    /// The server resumes sending messages when the outstanding byte count
128    /// drops below this value.
129    ///
130    /// Use a value <= 0 to set no limit on the number of outstanding bytes.
131    ///
132    /// The default value is 100 MiB.
133    ///
134    /// # Example
135    /// ```no_rust
136    /// # use google_cloud_pubsub::client::Subscriber;
137    /// # async fn sample() -> anyhow::Result<()> {
138    /// # let client = Subscriber::builder().build().await?;
139    /// const MIB: i64 = 1024 * 1024;
140    /// let session = client.streaming_pull("projects/my-project/subscriptions/my-subscription")
141    ///     .set_max_outstanding_bytes(200 * MIB)
142    ///     .start();
143    /// # Ok(()) }
144    /// ```
145    pub fn set_max_outstanding_bytes<T: Into<i64>>(mut self, v: T) -> Self {
146        self.max_outstanding_bytes = v.into();
147        self
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154    use auth::credentials::anonymous::Builder as Anonymous;
155    use gaxi::options::ClientConfig;
156
157    const KIB: i64 = 1024;
158
159    async fn test_inner() -> anyhow::Result<Arc<Transport>> {
160        let mut config = ClientConfig::default();
161        config.cred = Some(Anonymous::new().build());
162        let transport = Transport::new(config).await?;
163        Ok(Arc::new(transport))
164    }
165
166    #[tokio::test]
167    async fn reasonable_defaults() -> anyhow::Result<()> {
168        let builder = StreamingPull::new(
169            test_inner().await?,
170            "projects/my-project/subscriptions/my-subscription".to_string(),
171            "client-id".to_string(),
172        );
173        assert_eq!(
174            builder.subscription,
175            "projects/my-project/subscriptions/my-subscription"
176        );
177        assert_eq!(builder.ack_deadline_seconds, 10);
178        assert!(
179            100_000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100,
180            "max_outstanding_messages={}",
181            builder.max_outstanding_messages
182        );
183        assert!(
184            builder.max_outstanding_bytes > 100 * KIB,
185            "max_outstanding_bytes={}",
186            builder.max_outstanding_bytes
187        );
188
189        Ok(())
190    }
191
192    #[tokio::test]
193    async fn options() -> anyhow::Result<()> {
194        let builder = StreamingPull::new(
195            test_inner().await?,
196            "projects/my-project/subscriptions/my-subscription".to_string(),
197            "client-id".to_string(),
198        )
199        .set_ack_deadline_seconds(20)
200        .set_max_outstanding_messages(12345)
201        .set_max_outstanding_bytes(6789 * KIB);
202        assert_eq!(
203            builder.subscription,
204            "projects/my-project/subscriptions/my-subscription"
205        );
206        assert_eq!(builder.ack_deadline_seconds, 20);
207        assert_eq!(builder.max_outstanding_messages, 12345);
208        assert_eq!(builder.max_outstanding_bytes, 6789 * KIB);
209
210        Ok(())
211    }
212}