Skip to main content

gcloud_pubsub/
client.rs

1use std::env::var;
2
3use google_cloud_gax::conn::{ConnectionOptions, Environment};
4use google_cloud_gax::grpc::Status;
5use google_cloud_gax::retry::RetrySetting;
6use google_cloud_googleapis::pubsub::v1::{
7    DetachSubscriptionRequest, ListSnapshotsRequest, ListSubscriptionsRequest, ListTopicsRequest, Snapshot,
8};
9use token_source::NoopTokenSourceProvider;
10
11use crate::apiv1::conn_pool::{ConnectionManager, PUBSUB};
12use crate::apiv1::publisher_client::PublisherClient;
13use crate::apiv1::subscriber_client::SubscriberClient;
14use crate::subscription::{Subscription, SubscriptionConfig};
15use crate::topic::{Topic, TopicConfig};
16
17#[derive(Debug)]
18pub struct ClientConfig {
19    /// gRPC channel pool size
20    pub pool_size: usize,
21    /// Pub/Sub project_id
22    pub project_id: Option<String>,
23    /// Runtime project info
24    pub environment: Environment,
25    /// Overriding service endpoint
26    pub endpoint: String,
27    /// gRPC connection option
28    pub connection_option: ConnectionOptions,
29}
30
31/// ClientConfigs created by default will prefer to use `PUBSUB_EMULATOR_HOST`
32impl Default for ClientConfig {
33    fn default() -> Self {
34        let emulator = var("PUBSUB_EMULATOR_HOST").ok();
35        let default_project_id = emulator.as_ref().map(|_| "local-project".to_string());
36        Self {
37            pool_size: 4,
38            environment: match emulator {
39                Some(v) => Environment::Emulator(v),
40                None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
41            },
42            project_id: default_project_id,
43            endpoint: PUBSUB.to_string(),
44            connection_option: ConnectionOptions::default(),
45        }
46    }
47}
48
49#[cfg(feature = "auth")]
50pub use google_cloud_auth;
51
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Replaces the token source with a `DefaultTokenSourceProvider` built from
    /// [`Self::auth_config`].
    ///
    /// The configuration is returned unchanged when the environment is not
    /// `Environment::GoogleCloud` (for example when the emulator is in use).
    /// An already configured `project_id` is kept; otherwise the project ID
    /// reported by the token source is used.
    ///
    /// # Errors
    ///
    /// Returns the `google_cloud_auth` error if the token source cannot be created.
    pub async fn with_auth(self) -> Result<Self, google_cloud_auth::error::Error> {
        if !matches!(self.environment, Environment::GoogleCloud(_)) {
            return Ok(self);
        }
        let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
        Ok(self.with_token_source(provider))
    }

    /// Same as [`Self::with_auth`], but builds the token source from the
    /// explicitly supplied `credentials`.
    ///
    /// # Errors
    ///
    /// Returns the `google_cloud_auth` error if the token source cannot be created.
    pub async fn with_credentials(
        self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if !matches!(self.environment, Environment::GoogleCloud(_)) {
            return Ok(self);
        }
        let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::auth_config(),
            Box::new(credentials),
        )
        .await?;
        Ok(self.with_token_source(provider))
    }

    /// Installs `provider` as the Google Cloud token source and falls back to
    /// its project ID when none was configured explicitly.
    fn with_token_source(mut self, provider: google_cloud_auth::token::DefaultTokenSourceProvider) -> Self {
        self.project_id = self.project_id.or_else(|| provider.project_id.clone());
        self.environment = Environment::GoogleCloud(Box::new(provider));
        self
    }

    /// Auth configuration carrying the audience and scopes declared in
    /// `crate::apiv1::conn_pool`.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default()
            .with_audience(crate::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
    }
}
85
86#[derive(thiserror::Error, Debug)]
87pub enum Error {
88    #[error(transparent)]
89    GAX(#[from] google_cloud_gax::conn::Error),
90    #[error("Project ID was not found")]
91    ProjectIdNotFound,
92}
93
94/// Client is a Google Pub/Sub client scoped to a single project.
95///
96/// Clients should be reused rather than being created as needed.
97/// A Client may be shared by multiple tasks.
98#[derive(Clone, Debug)]
99pub struct Client {
100    project_id: String,
101    pubc: PublisherClient,
102    subc: SubscriberClient,
103}
104
105impl Client {
106    /// new creates a Pub/Sub client. See [`ClientConfig`] for more information.
107    pub async fn new(config: ClientConfig) -> Result<Self, Error> {
108        let pubc = PublisherClient::new(
109            ConnectionManager::new(
110                config.pool_size,
111                config.endpoint.as_str(),
112                &config.environment,
113                &config.connection_option,
114            )
115            .await?,
116        );
117        let subc = SubscriberClient::new(
118            ConnectionManager::new(
119                config.pool_size,
120                config.endpoint.as_str(),
121                &config.environment,
122                &config.connection_option,
123            )
124            .await?,
125            ConnectionManager::new(
126                config.pool_size,
127                config.endpoint.as_str(),
128                &config.environment,
129                &config.connection_option,
130            )
131            .await?,
132        );
133        Ok(Self {
134            project_id: config.project_id.ok_or(Error::ProjectIdNotFound)?,
135            pubc,
136            subc,
137        })
138    }
139
140    /// create_subscription creates a new subscription on a topic.
141    ///
142    /// id is the name of the subscription to create. It must start with a letter,
143    /// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
144    /// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
145    /// must be between 3 and 255 characters in length, and must not start with
146    /// "goog".
147    ///
148    /// cfg.ack_deadline is the maximum time after a subscriber receives a message before
149    /// the subscriber should acknowledge the message. It must be between 10 and 600
150    /// seconds (inclusive), and is rounded down to the nearest second. If the
151    /// provided ackDeadline is 0, then the default value of 10 seconds is used.
152    /// Note: messages which are obtained via Subscription.Receive need not be
153    /// acknowledged within this deadline, as the deadline will be automatically
154    /// extended.
155    ///
156    /// cfg.push_config may be set to configure this subscription for push delivery.
157    ///
158    /// If the subscription already exists an error will be returned.
159    pub async fn create_subscription(
160        &self,
161        id: &str,
162        topic_id: &str,
163        cfg: SubscriptionConfig,
164        retry: Option<RetrySetting>,
165    ) -> Result<Subscription, Status> {
166        let subscription = self.subscription(id);
167        subscription
168            .create(self.fully_qualified_topic_name(topic_id).as_str(), cfg, retry)
169            .await
170            .map(|_v| subscription)
171    }
172
173    /// subscriptions returns an iterator which returns all of the subscriptions for the client's project.
174    pub async fn get_subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
175        let req = ListSubscriptionsRequest {
176            project: self.fully_qualified_project_name(),
177            page_size: 0,
178            page_token: "".to_string(),
179        };
180        self.subc.list_subscriptions(req, retry).await.map(|v| {
181            v.into_iter()
182                .map(|x| Subscription::new(x.name, self.subc.clone()))
183                .collect()
184        })
185    }
186
187    /// subscription creates a reference to a subscription.
188    pub fn subscription(&self, id: &str) -> Subscription {
189        Subscription::new(self.fully_qualified_subscription_name(id), self.subc.clone())
190    }
191
192    /// detach_subscription detaches a subscription from its topic. All messages
193    /// retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull`
194    /// requests will return FAILED_PRECONDITION. If the subscription is a push
195    /// subscription, pushes to the endpoint will stop.
196    pub async fn detach_subscription(&self, fqsn: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
197        let req = DetachSubscriptionRequest {
198            subscription: fqsn.to_string(),
199        };
200        self.pubc.detach_subscription(req, retry).await.map(|_v| ())
201    }
202
203    /// create_topic creates a new topic.
204    ///
205    /// The specified topic ID must start with a letter, and contain only letters
206    /// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
207    /// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
208    /// characters in length, and must not start with "goog". For more information,
209    /// see: https://cloud.google.com/pubsub/docs/admin#resource_names
210    ///
211    /// If the topic already exists an error will be returned.
212    pub async fn create_topic(
213        &self,
214        id: &str,
215        cfg: Option<TopicConfig>,
216        retry: Option<RetrySetting>,
217    ) -> Result<Topic, Status> {
218        let topic = self.topic(id);
219        topic.create(cfg, retry).await.map(|_v| topic)
220    }
221
222    /// topics returns an iterator which returns all of the topics for the client's project.
223    pub async fn get_topics(&self, retry: Option<RetrySetting>) -> Result<Vec<String>, Status> {
224        let req = ListTopicsRequest {
225            project: self.fully_qualified_project_name(),
226            page_size: 0,
227            page_token: "".to_string(),
228        };
229        self.pubc
230            .list_topics(req, retry)
231            .await
232            .map(|v| v.into_iter().map(|x| x.name).collect())
233    }
234
235    /// topic creates a reference to a topic in the client's project.
236    ///
237    /// If a Topic's Publish method is called, it has background tasks
238    /// associated with it. Clean them up by calling topic.stop.
239    ///
240    /// Avoid creating many Topic instances if you use them to publish.
241    pub fn topic(&self, id: &str) -> Topic {
242        Topic::new(self.fully_qualified_topic_name(id), self.pubc.clone(), self.subc.clone())
243    }
244
245    /// get_snapshots lists the existing snapshots. Snapshots are used in Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations, which
246    /// allow you to manage message acknowledgments in bulk. That is, you can set
247    /// the acknowledgment state of messages in an existing subscription to the
248    /// state captured by a snapshot.
249    pub async fn get_snapshots(&self, retry: Option<RetrySetting>) -> Result<Vec<Snapshot>, Status> {
250        let req = ListSnapshotsRequest {
251            project: self.fully_qualified_project_name(),
252            page_size: 0,
253            page_token: "".to_string(),
254        };
255        self.subc.list_snapshots(req, retry).await
256    }
257
258    pub fn fully_qualified_topic_name(&self, id: &str) -> String {
259        if id.contains('/') {
260            id.to_string()
261        } else {
262            format!("projects/{}/topics/{}", self.project_id, id)
263        }
264    }
265
266    pub fn fully_qualified_subscription_name(&self, id: &str) -> String {
267        if id.contains('/') {
268            id.to_string()
269        } else {
270            format!("projects/{}/subscriptions/{}", self.project_id, id)
271        }
272    }
273
274    fn fully_qualified_project_name(&self) -> String {
275        format!("projects/{}", self.project_id)
276    }
277}
278
#[cfg(test)]
// NOTE(review): presumably silences a deprecated API used by these tests — confirm which one.
#[allow(deprecated)]
mod tests {
    use std::collections::HashMap;

    use serial_test::serial;
    use uuid::Uuid;

    use crate::client::Client;
    use crate::subscription::SubscriptionConfig;

    /// Builds a client with the default configuration after pointing
    /// `PUBSUB_EMULATOR_HOST` at `localhost:8681`.
    async fn create_client() -> Client {
        std::env::set_var("PUBSUB_EMULATOR_HOST", "localhost:8681");

        let config = Default::default();
        Client::new(config).await.unwrap()
    }

    /// Creating one topic, one subscription and one snapshot must grow each
    /// of the corresponding listings by exactly one entry.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_lifecycle() {
        let client = create_client().await;

        // A fresh UUID keeps the resource names unique across runs.
        let suffix = Uuid::new_v4().hyphenated().to_string();
        let topic_id = format!("t{suffix}");
        let subscription_id = format!("s{suffix}");
        let snapshot_id = format!("snap{suffix}");

        let topics_before = client.get_topics(None).await.unwrap();
        let subs_before = client.get_subscriptions(None).await.unwrap();
        let snapshots_before = client.get_snapshots(None).await.unwrap();

        let _topic = client.create_topic(&topic_id, None, None).await.unwrap();
        let subscription = client
            .create_subscription(&subscription_id, &topic_id, SubscriptionConfig::default(), None)
            .await
            .unwrap();
        let _ = subscription
            .create_snapshot(&snapshot_id, HashMap::default(), None)
            .await
            .unwrap();

        let topics_after = client.get_topics(None).await.unwrap();
        let subs_after = client.get_subscriptions(None).await.unwrap();
        let snapshots_after = client.get_snapshots(None).await.unwrap();

        assert_eq!(1, topics_after.len() - topics_before.len());
        assert_eq!(1, subs_after.len() - subs_before.len());
        assert_eq!(1, snapshots_after.len() - snapshots_before.len());
    }
}
328
// Every test in this module calls `ClientConfig::with_auth`, which only exists
// when the `auth` feature is enabled. Gating on the feature as well keeps test
// builds without that feature compiling.
#[cfg(all(test, feature = "auth"))]
mod tests_in_gcp {
    use std::collections::HashMap;
    use std::time::Duration;

    use google_cloud_gax::conn::Environment;
    use google_cloud_gax::grpc::codegen::tokio_stream::StreamExt;
    use google_cloud_googleapis::pubsub::v1::PubsubMessage;
    use serial_test::serial;
    use tokio::select;
    use tokio_util::sync::CancellationToken;

    use crate::client::{Client, ClientConfig};
    use crate::publisher::PublisherConfig;
    use crate::subscription::SubscribeConfig;

    /// Builds a client from the default configuration after `with_auth`.
    async fn create_client() -> Client {
        let config = ClientConfig::default().with_auth().await.unwrap();
        Client::new(config).await.unwrap()
    }

    /// Builds a message whose ordering key is `key` and whose payload is the
    /// key itself, or `"empty"` for the empty key.
    fn make_msg(key: &str) -> PubsubMessage {
        let payload = if key.is_empty() { "empty" } else { key };
        PubsubMessage {
            data: payload.to_string().into(),
            ordering_key: key.into(),
            ..Default::default()
        }
    }

    /// `with_auth` must not turn a default configuration into an emulator one.
    #[tokio::test]
    #[ignore]
    async fn test_with_auth() {
        let config = ClientConfig::default().with_auth().await.unwrap();
        if let Environment::Emulator(_) = config.environment {
            unreachable!()
        }
    }

    /// Publishes a mix of ordering keys with a short flush interval.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_publish_ordering_in_gcp_flush_buffer() {
        let client = create_client().await;
        let topic = client.topic("test-topic2");
        let publisher = topic.new_publisher(Some(PublisherConfig {
            flush_interval: Duration::from_secs(3),
            workers: 3,
            ..Default::default()
        }));

        // The second round re-publishes the same keys.
        for _round in 0..2 {
            let mut awaiters = vec![];
            for key in ["", "key1", "key2", "key3", "key3"] {
                awaiters.push(publisher.publish(make_msg(key)).await);
            }
            for awaiter in awaiters {
                tracing::info!("msg id {}", awaiter.get().await.unwrap());
            }
        }
    }

    /// Publishes exactly `bundle_size` messages per round with a long flush interval.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_publish_ordering_in_gcp_limit_exceed() {
        let client = create_client().await;
        let topic = client.topic("test-topic2");
        let publisher = topic.new_publisher(Some(PublisherConfig {
            flush_interval: Duration::from_secs(30),
            workers: 1,
            bundle_size: 8,
            ..Default::default()
        }));

        // The second round re-publishes the same keys.
        for _round in 0..2 {
            let mut awaiters = vec![];
            for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
                awaiters.push(publisher.publish(make_msg(key)).await);
            }
            for awaiter in awaiters {
                tracing::info!("msg id {}", awaiter.get().await.unwrap());
            }
        }
    }

    /// Publishes a batch of keyed messages through `publish_bulk`.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_publish_ordering_in_gcp_bulk() {
        let client = create_client().await;
        let topic = client.topic("test-topic2");
        let publisher = topic.new_publisher(Some(PublisherConfig {
            flush_interval: Duration::from_secs(30),
            workers: 2,
            bundle_size: 8,
            ..Default::default()
        }));

        // The second round re-publishes the same keys.
        for _round in 0..2 {
            let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
                .map(make_msg)
                .to_vec();
            for awaiter in publisher.publish_bulk(msgs).await {
                tracing::info!("msg id {}", awaiter.get().await.unwrap());
            }
        }
    }

    /// Publishes continuously for 60 seconds while a slow subscriber acks, then
    /// asserts that no message ID was received more than once.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_publish_subscribe_exactly_once_delivery() {
        let client = create_client().await;

        // The test is only meaningful on an exactly-once-delivery subscription.
        let subscription = client.subscription("eod-test");
        let config = subscription.config(None).await.unwrap().1;
        assert!(config.enable_exactly_once_delivery);

        // Publisher: keeps publishing until the token is cancelled.
        let ctx = CancellationToken::new();
        let ctx_pub = ctx.clone();
        let publisher = client.topic("eod-test").new_publisher(None);
        let pub_task = tokio::spawn(async move {
            tracing::info!("start publisher");
            while !ctx_pub.is_cancelled() {
                publisher
                    .publish_blocking(PubsubMessage {
                        data: "msg".into(),
                        ..Default::default()
                    })
                    .get()
                    .await
                    .unwrap();
            }
            tracing::info!("finish publisher");
        });

        // Subscriber: counts deliveries per message ID until cancelled.
        let ctx_sub = ctx.child_token();
        let sub_task = tokio::spawn(async move {
            tracing::info!("start subscriber");
            let mut stream = subscription.subscribe(None).await.unwrap();
            let mut deliveries = HashMap::new();
            while let Some(message) = select! {
                msg = stream.next() => msg,
                _ = ctx_sub.cancelled() => None
            } {
                let msg_id = message.message.message_id.clone();
                // Simulate a heavy task before acknowledging.
                tokio::time::sleep(Duration::from_secs(1)).await;
                *deliveries.entry(msg_id).or_insert(0) += 1;
                message.ack().await.unwrap();
            }
            stream.dispose().await;
            tracing::info!("finish subscriber");
            deliveries
        });

        tokio::time::sleep(Duration::from_secs(60)).await;

        // Stop both tasks and check for redelivered messages.
        ctx.cancel();
        pub_task.await.unwrap();
        let received_msgs = sub_task.await.unwrap();
        assert!(!received_msgs.is_empty());

        tracing::info!("Number of received messages = {}", received_msgs.len());
        for (msg_id, count) in received_msgs {
            assert_eq!(count, 1, "msg_id = {msg_id}, count = {count}");
        }
    }

    /// Publishes ten messages under one ordering key and asserts that they are
    /// received in publish order.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_publish_subscribe_ordering() {
        let client = create_client().await;
        let subscription = client.subscription("order-test");
        let config = subscription.config(None).await.unwrap().1;
        assert!(config.enable_message_ordering);

        let msg_len: u8 = 10;
        let ctx = CancellationToken::new();
        let ctx_sub = ctx.clone();

        // Publish one message at a time, waiting for each to be accepted.
        tracing::info!("publish messages: size = {msg_len}");
        let publisher = client.topic("order-test").new_publisher(None);
        for i in 0..msg_len {
            publisher
                .publish(PubsubMessage {
                    data: i.to_string().into(),
                    ordering_key: "key1".into(),
                    ..Default::default()
                })
                .await
                .get()
                .await
                .unwrap();
        }

        // Stops the subscriber loop below after 60 seconds.
        let checker = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(60)).await;
            ctx.cancel();
        });

        // Collect payloads in arrival order until cancelled.
        tracing::info!("start subscriber");
        let option = SubscribeConfig::default().with_enable_multiple_subscriber(true);
        let mut stream = subscription.subscribe(Some(option)).await.unwrap();
        let mut msgs = vec![];
        while let Some(message) = select! {
            msg = stream.next() => msg,
            _ = ctx_sub.cancelled() => None
        } {
            let data = message.message.data.clone().to_vec();
            let i: u8 = String::from_utf8(data).unwrap().parse().unwrap();
            msgs.push(i);
            message.ack().await.unwrap();
        }
        tracing::info!("finish subscriber");
        let _ = checker.await;
        let nack = stream.dispose().await;
        assert_eq!(nack, 0);
        assert_eq!(msgs.len(), usize::from(msg_len));
        let expected: Vec<u8> = (0..msg_len).collect();
        assert_eq!(msgs, expected);
    }

    /// Pulling from the `pull-test` subscription is expected to yield no messages.
    #[tokio::test]
    #[serial]
    #[ignore]
    async fn test_pull_empty() {
        let client = create_client().await;
        let subscription = client.subscription("pull-test");
        let messages = subscription.pull(10, None).await.unwrap();
        assert!(messages.is_empty());
    }
}