Skip to main content

google_cloud_pubsub/subscriber/
client_builder.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::Subscriber;
16use crate::ClientBuilderResult as BuilderResult;
17use gaxi::options::ClientConfig;
18use google_cloud_auth::credentials::Credentials;
19
20// This is to handle large metadata when errors are returned for exactly once delivery.
21const MAX_INBOUND_METADATA_SIZE: u32 = 4 * 1024 * 1024; // 4MB API maximum metadata size
22
23/// A builder for [Subscriber].
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let builder = Subscriber::builder();
30/// let client = builder
31///     .with_endpoint("https://pubsub.googleapis.com")
32///     .build()
33///     .await?;
34/// # Ok(()) }
35/// ```
36pub struct ClientBuilder {
37    pub(super) config: ClientConfig,
38}
39
40impl ClientBuilder {
41    pub(super) fn new() -> Self {
42        let mut config = ClientConfig::default();
43        config.grpc_max_header_list_size = Some(MAX_INBOUND_METADATA_SIZE);
44        Self { config }
45    }
46
47    /// Creates a new client.
48    ///
49    /// # Example
50    /// ```
51    /// # use google_cloud_pubsub::client::Subscriber;
52    /// # async fn sample() -> anyhow::Result<()> {
53    /// let client = Subscriber::builder().build().await?;
54    /// # Ok(()) }
55    /// ```
56    pub async fn build(self) -> BuilderResult<Subscriber> {
57        Subscriber::new(self).await
58    }
59
60    /// Sets the endpoint.
61    ///
62    /// # Example
63    /// ```
64    /// # use google_cloud_pubsub::client::Subscriber;
65    /// # async fn sample() -> anyhow::Result<()> {
66    /// let client = Subscriber::builder()
67    ///     .with_endpoint("https://private.googleapis.com")
68    ///     .build()
69    ///     .await?;
70    /// # Ok(()) }
71    /// ```
72    pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
73        self.config.endpoint = Some(v.into());
74        self
75    }
76
77    /// Configures the authentication credentials.
78    ///
79    /// More information about valid credentials types can be found in the
80    /// [google-cloud-auth] crate documentation.
81    ///
82    /// # Example
83    /// ```
84    /// # use google_cloud_pubsub::client::Subscriber;
85    /// # async fn sample() -> anyhow::Result<()> {
86    /// use google_cloud_auth::credentials::mds;
87    /// let client = Subscriber::builder()
88    ///     .with_credentials(
89    ///         mds::Builder::default()
90    ///             .with_scopes(["https://www.googleapis.com/auth/cloud-platform.read-only"])
91    ///             .build()?)
92    ///     .build()
93    ///     .await?;
94    /// # Ok(()) }
95    /// ```
96    ///
97    /// [google-cloud-auth]: https://docs.rs/google-cloud-auth
98    pub fn with_credentials<V: Into<Credentials>>(mut self, v: V) -> Self {
99        self.config.cred = Some(v.into());
100        self
101    }
102
103    /// Configure the number of subchannels used by the client.
104    ///
105    /// # Example
106    /// ```
107    /// # use google_cloud_pubsub::client::Subscriber;
108    /// # async fn sample() -> anyhow::Result<()> {
109    /// let count = std::thread::available_parallelism()?.get();
110    /// let client = Subscriber::builder()
111    ///     .with_grpc_subchannel_count(count)
112    ///     .build()
113    ///     .await?;
114    /// # Ok(()) }
115    /// ```
116    ///
117    /// gRPC-based clients may exhibit high latency if many requests need to be
118    /// demuxed over a single HTTP/2 connection (often called a *subchannel* in
119    /// gRPC).
120    ///
121    /// Consider using more subchannels if your application opens many message
122    /// streams. Consider using fewer subchannels if your application needs the
123    /// file descriptors for other purposes.
124    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
125        self.config.grpc_subchannel_count = Some(v);
126        self
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
134
135    #[test]
136    fn defaults() {
137        let builder = ClientBuilder::new();
138        assert!(builder.config.endpoint.is_none(), "{:?}", builder.config);
139        assert!(builder.config.cred.is_none(), "{:?}", builder.config);
140        assert!(
141            builder.config.grpc_subchannel_count.is_none(),
142            "{:?}",
143            builder.config
144        );
145        assert_eq!(
146            builder.config.grpc_max_header_list_size,
147            Some(MAX_INBOUND_METADATA_SIZE)
148        );
149    }
150
151    #[test]
152    fn setters() {
153        let builder = ClientBuilder::new()
154            .with_endpoint("test-endpoint.com")
155            .with_credentials(Anonymous::new().build())
156            .with_grpc_subchannel_count(16);
157        assert_eq!(
158            builder.config.endpoint,
159            Some("test-endpoint.com".to_string())
160        );
161        assert!(builder.config.cred.is_some(), "{:?}", builder.config);
162        assert_eq!(builder.config.grpc_subchannel_count, Some(16));
163    }
164}