google_cloud_pubsub/subscriber/
client.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::builder::StreamingPull;
16use super::client_builder::ClientBuilder;
17use super::transport::Transport;
18use gax::client_builder::Result as BuilderResult;
19use std::sync::Arc;
20
21/// A Subscriber client for the [Cloud Pub/Sub] API.
22///
23/// Use this client to receive messages from a [pull subscription] on a topic.
24///
25/// # Example
26/// ```
27/// # use google_cloud_pubsub::client::Subscriber;
28/// # async fn sample() -> anyhow::Result<()> {
29/// let client = Subscriber::builder().build().await?;
30/// let mut session = client
31///     .streaming_pull("projects/my-project/subscriptions/my-subscription")
32///     .start();
33/// while let Some((m, h)) = session.next().await.transpose()? {
34///     println!("Received message m={m:?}");
35///     h.ack();
36/// }
37/// # Ok(()) }
38/// ```
39///
40/// # Configuration
41///
42/// To configure a `Subscriber` use the `with_*` methods in the type returned by
43/// [builder()][Subscriber::builder]. The default configuration should work for
44/// most applications. Common configuration changes include:
45///
46/// * [with_endpoint()]: by default this client uses the global default endpoint
47///   (`https://pubsub.googleapis.com`). Applications using regional endpoints
48///   or running in restricted networks (e.g. a network configured with
49///   [Private Google Access with VPC Service Controls]) may want to override
50///   this default.
51/// * [with_credentials()]: by default this client uses
52///   [Application Default Credentials]. Applications using custom
53///   authentication may need to override this default.
54///
55/// # Pooling and Cloning
56///
57/// `Subscriber` holds a connection pool internally, it is advised to
58/// create one and then reuse it.  You do not need to wrap `Subscriber` in
59/// an [Rc](std::rc::Rc) or [Arc] to reuse it, because it already uses an `Arc`
60/// internally.
61///
62/// [application default credentials]: https://cloud.google.com/docs/authentication#adc
63/// [cloud pub/sub]: https://docs.cloud.google.com/pubsub/docs/overview
64/// [private google access with vpc service controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
65/// [pull subscription]: https://docs.cloud.google.com/pubsub/docs/pull
66/// [with_endpoint()]: ClientBuilder::with_endpoint
67/// [with_credentials()]: ClientBuilder::with_credentials
68#[derive(Clone, Debug)]
69pub struct Subscriber {
70    inner: Arc<Transport>,
71    client_id: String,
72}
73
74impl Subscriber {
75    /// Returns a builder for [Subscriber].
76    ///
77    /// # Example
78    /// ```
79    /// # use google_cloud_pubsub::client::Subscriber;
80    /// # async fn sample() -> anyhow::Result<()> {
81    /// let client = Subscriber::builder().build().await?;
82    /// # Ok(()) }
83    /// ```
84    pub fn builder() -> ClientBuilder {
85        ClientBuilder::new()
86    }
87
88    /// Receive messages from a [subscription].
89    ///
90    /// The `subscription` is the full name, in the format of
91    /// `projects/*/subscriptions/*`.
92    ///
93    /// # Example
94    /// ```
95    /// # use google_cloud_pubsub::client::Subscriber;
96    /// # async fn sample(client: Subscriber) -> anyhow::Result<()> {
97    /// let mut session = client
98    ///     .streaming_pull("projects/my-project/subscriptions/my-subscription")
99    ///     .start();
100    /// while let Some((m, h)) = session.next().await.transpose()? {
101    ///     println!("Received message m={m:?}");
102    ///     h.ack();
103    /// }
104    /// # Ok(()) }
105    /// ```
106    ///
107    /// [subscription]: https://docs.cloud.google.com/pubsub/docs/subscription-overview
108    pub fn streaming_pull<T>(&self, subscription: T) -> StreamingPull
109    where
110        T: Into<String>,
111    {
112        StreamingPull::new(
113            self.inner.clone(),
114            subscription.into(),
115            self.client_id.clone(),
116        )
117    }
118
119    pub(super) async fn new(builder: ClientBuilder) -> BuilderResult<Self> {
120        let transport = Transport::new(builder.config).await?;
121        Ok(Self {
122            inner: Arc::new(transport),
123            client_id: uuid::Uuid::new_v4().to_string(),
124        })
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use auth::credentials::anonymous::Builder as Anonymous;
    use pubsub_grpc_mock::{MockSubscriber, start};

    /// Building a client with the default configuration succeeds.
    #[tokio::test]
    async fn basic() -> anyhow::Result<()> {
        drop(Subscriber::builder().build().await?);
        Ok(())
    }

    /// An error returned by the service for the streaming pull RPC is
    /// surfaced as the first item of the session, with its status intact.
    #[tokio::test]
    async fn streaming_pull() -> anyhow::Result<()> {
        // Mock service whose only streaming pull call fails with INTERNAL.
        let mut service = MockSubscriber::new();
        service
            .expect_streaming_pull()
            .return_once(|_| Err(tonic::Status::internal("fail")));
        // `_server` stays bound so the mock server outlives the client calls.
        let (endpoint, _server) = start("0.0.0.0:0", service).await?;

        let client = Subscriber::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        let err = client
            .streaming_pull("projects/p/subscriptions/s")
            .start()
            .next()
            .await
            .expect("stream should not be empty")
            .expect_err("the first streamed item should be an error");

        // The error must carry the RPC status produced by the mock.
        let Some(status) = err.status() else {
            panic!("{err:?}");
        };
        assert_eq!(status.code, gax::error::rpc::Code::Internal);
        assert_eq!(status.message, "fail");

        Ok(())
    }
}