google_cloud_pubsub/subscriber/client.rs
1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::StreamingPull;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use gax::client_builder::Result as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut session = client
31/// .streaming_pull("projects/my-project/subscriptions/my-subscription")
32/// .start();
33/// while let Some((m, h)) = session.next().await.transpose()? {
34/// println!("Received message m={m:?}");
35/// h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Configuration
41///
42/// To configure a `Subscriber` use the `with_*` methods in the type returned by
43/// [builder()][Subscriber::builder]. The default configuration should work for
44/// most applications. Common configuration changes include:
45///
46/// * [with_endpoint()]: by default this client uses the global default endpoint
47/// (`https://pubsub.googleapis.com`). Applications using regional endpoints
48/// or running in restricted networks (e.g. a network configured with
49/// [Private Google Access with VPC Service Controls]) may want to override
50/// this default.
51/// * [with_credentials()]: by default this client uses
52/// [Application Default Credentials]. Applications using custom
53/// authentication may need to override this default.
54///
55/// # Pooling and Cloning
56///
57/// `Subscriber` holds a connection pool internally, it is advised to
58/// create one and then reuse it. You do not need to wrap `Subscriber` in
59/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
60/// internally.
61///
62/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
63/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
64/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
65/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
66/// [with_endpoint()]: ClientBuilder::with_endpoint
67/// [with_credentials()]: ClientBuilder::with_credentials
68#[derive(Clone, Debug)]
69pub struct Subscriber {
70 inner: Arc<Transport>,
71 client_id: String,
72}
73
74impl Subscriber {
75 /// Returns a builder for [Subscriber].
76 ///
77 /// # Example
78 /// ```
79 /// # use google_cloud_pubsub::client::Subscriber;
80 /// # async fn sample() -> anyhow::Result<()> {
81 /// let client = Subscriber::builder().build().await?;
82 /// # Ok(()) }
83 /// ```
84 pub fn builder() -> ClientBuilder {
85 ClientBuilder::new()
86 }
87
88 /// Receive messages from a [subscription].
89 ///
90 /// The `subscription` is the full name, in the format of
91 /// `projects/*/subscriptions/*`.
92 ///
93 /// # Example
94 /// ```
95 /// # use google_cloud_pubsub::client::Subscriber;
96 /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
97 /// let mut session = client
98 /// .streaming_pull("projects/my-project/subscriptions/my-subscription")
99 /// .start();
100 /// while let Some((m, h)) = session.next().await.transpose()? {
101 /// println!("Received message m={m:?}");
102 /// h.ack();
103 /// }
104 /// # Ok(()) }
105 /// ```
106 ///
107 /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
108 pub fn streaming_pull<T>(&self, subscription: T) -> StreamingPull
109 where
110 T: Into<String>,
111 {
112 StreamingPull::new(
113 self.inner.clone(),
114 subscription.into(),
115 self.client_id.clone(),
116 )
117 }
118
119 pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
120 let transport = Transport::new(builder.config).await?;
121 Ok(Self {
122 inner: Arc::new(transport),
123 client_id: uuid::Uuid::new_v4().to_string(),
124 })
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use auth::credentials::anonymous::Builder as Anonymous;
    use gax::error::rpc::Code;
    use pubsub_grpc_mock::{MockSubscriber, start};

    /// A client can be built from the default configuration.
    #[tokio::test]
    async fn basic() -> anyhow::Result<()> {
        let client = Subscriber::builder().build().await?;
        drop(client);
        Ok(())
    }

    /// An error status returned by the service for `StreamingPull` is
    /// surfaced as the first item of the stream, with its code and message
    /// intact.
    #[tokio::test]
    async fn streaming_pull() -> anyhow::Result<()> {
        // Mock service whose streaming pull fails immediately.
        let mut service = MockSubscriber::new();
        service
            .expect_streaming_pull()
            .return_once(|_| Err(tonic::Status::internal("fail")));
        let (endpoint, _server) = start("0.0.0.0:0", service).await?;

        let subscriber = Subscriber::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        // Scope the stream so it is dropped once the first item is read.
        let first = {
            let mut stream = subscriber
                .streaming_pull("projects/p/subscriptions/s")
                .start();
            stream.next().await
        };
        let err = first
            .expect("stream should not be empty")
            .expect_err("the first streamed item should be an error");

        let Some(status) = err.status() else {
            panic!("{err:?}");
        };
        assert_eq!(status.code, Code::Internal);
        assert_eq!(status.message, "fail");

        Ok(())
    }
}
165}