1use std::env::var;
2
3use google_cloud_gax::conn::{ConnectionOptions, Environment};
4use google_cloud_gax::grpc::Status;
5use google_cloud_gax::retry::RetrySetting;
6use google_cloud_googleapis::pubsub::v1::{
7 DetachSubscriptionRequest, ListSnapshotsRequest, ListSubscriptionsRequest, ListTopicsRequest, Snapshot,
8};
9use token_source::NoopTokenSourceProvider;
10
11use crate::apiv1::conn_pool::{ConnectionManager, PUBSUB};
12use crate::apiv1::publisher_client::PublisherClient;
13use crate::apiv1::subscriber_client::SubscriberClient;
14use crate::subscription::{Subscription, SubscriptionConfig};
15use crate::topic::{Topic, TopicConfig};
16
17#[derive(Debug)]
18pub struct ClientConfig {
19 pub pool_size: usize,
21 pub project_id: Option<String>,
23 pub environment: Environment,
25 pub endpoint: String,
27 pub connection_option: ConnectionOptions,
29}
30
31impl Default for ClientConfig {
33 fn default() -> Self {
34 let emulator = var("PUBSUB_EMULATOR_HOST").ok();
35 let default_project_id = emulator.as_ref().map(|_| "local-project".to_string());
36 Self {
37 pool_size: 4,
38 environment: match emulator {
39 Some(v) => Environment::Emulator(v),
40 None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
41 },
42 project_id: default_project_id,
43 endpoint: PUBSUB.to_string(),
44 connection_option: ConnectionOptions::default(),
45 }
46 }
47}
48
49#[cfg(feature = "auth")]
50pub use google_cloud_auth;
51
52#[cfg(feature = "auth")]
53impl ClientConfig {
54 pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
55 if let Environment::GoogleCloud(_) = self.environment {
56 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
57 self.project_id = self.project_id.or(ts.project_id.clone());
58 self.environment = Environment::GoogleCloud(Box::new(ts))
59 }
60 Ok(self)
61 }
62
63 pub async fn with_credentials(
64 mut self,
65 credentials: google_cloud_auth::credentials::CredentialsFile,
66 ) -> Result<Self, google_cloud_auth::error::Error> {
67 if let Environment::GoogleCloud(_) = self.environment {
68 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
69 Self::auth_config(),
70 Box::new(credentials),
71 )
72 .await?;
73 self.project_id = self.project_id.or(ts.project_id.clone());
74 self.environment = Environment::GoogleCloud(Box::new(ts))
75 }
76 Ok(self)
77 }
78
79 fn auth_config() -> google_cloud_auth::project::Config<'static> {
80 google_cloud_auth::project::Config::default()
81 .with_audience(crate::apiv1::conn_pool::AUDIENCE)
82 .with_scopes(&crate::apiv1::conn_pool::SCOPES)
83 }
84}
85
86#[derive(thiserror::Error, Debug)]
87pub enum Error {
88 #[error(transparent)]
89 GAX(#[from] google_cloud_gax::conn::Error),
90 #[error("Project ID was not found")]
91 ProjectIdNotFound,
92}
93
94#[derive(Clone, Debug)]
99pub struct Client {
100 project_id: String,
101 pubc: PublisherClient,
102 subc: SubscriberClient,
103}
104
105impl Client {
106 pub async fn new(config: ClientConfig) -> Result<Self, Error> {
108 let pubc = PublisherClient::new(
109 ConnectionManager::new(
110 config.pool_size,
111 config.endpoint.as_str(),
112 &config.environment,
113 &config.connection_option,
114 )
115 .await?,
116 );
117 let subc = SubscriberClient::new(
118 ConnectionManager::new(
119 config.pool_size,
120 config.endpoint.as_str(),
121 &config.environment,
122 &config.connection_option,
123 )
124 .await?,
125 ConnectionManager::new(
126 config.pool_size,
127 config.endpoint.as_str(),
128 &config.environment,
129 &config.connection_option,
130 )
131 .await?,
132 );
133 Ok(Self {
134 project_id: config.project_id.ok_or(Error::ProjectIdNotFound)?,
135 pubc,
136 subc,
137 })
138 }
139
140 pub async fn create_subscription(
160 &self,
161 id: &str,
162 topic_id: &str,
163 cfg: SubscriptionConfig,
164 retry: Option<RetrySetting>,
165 ) -> Result<Subscription, Status> {
166 let subscription = self.subscription(id);
167 subscription
168 .create(self.fully_qualified_topic_name(topic_id).as_str(), cfg, retry)
169 .await
170 .map(|_v| subscription)
171 }
172
173 pub async fn get_subscriptions(&self, retry: Option<RetrySetting>) -> Result<Vec<Subscription>, Status> {
175 let req = ListSubscriptionsRequest {
176 project: self.fully_qualified_project_name(),
177 page_size: 0,
178 page_token: "".to_string(),
179 };
180 self.subc.list_subscriptions(req, retry).await.map(|v| {
181 v.into_iter()
182 .map(|x| Subscription::new(x.name, self.subc.clone()))
183 .collect()
184 })
185 }
186
187 pub fn subscription(&self, id: &str) -> Subscription {
189 Subscription::new(self.fully_qualified_subscription_name(id), self.subc.clone())
190 }
191
192 pub async fn detach_subscription(&self, fqsn: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
197 let req = DetachSubscriptionRequest {
198 subscription: fqsn.to_string(),
199 };
200 self.pubc.detach_subscription(req, retry).await.map(|_v| ())
201 }
202
203 pub async fn create_topic(
213 &self,
214 id: &str,
215 cfg: Option<TopicConfig>,
216 retry: Option<RetrySetting>,
217 ) -> Result<Topic, Status> {
218 let topic = self.topic(id);
219 topic.create(cfg, retry).await.map(|_v| topic)
220 }
221
222 pub async fn get_topics(&self, retry: Option<RetrySetting>) -> Result<Vec<String>, Status> {
224 let req = ListTopicsRequest {
225 project: self.fully_qualified_project_name(),
226 page_size: 0,
227 page_token: "".to_string(),
228 };
229 self.pubc
230 .list_topics(req, retry)
231 .await
232 .map(|v| v.into_iter().map(|x| x.name).collect())
233 }
234
235 pub fn topic(&self, id: &str) -> Topic {
242 Topic::new(self.fully_qualified_topic_name(id), self.pubc.clone(), self.subc.clone())
243 }
244
245 pub async fn get_snapshots(&self, retry: Option<RetrySetting>) -> Result<Vec<Snapshot>, Status> {
250 let req = ListSnapshotsRequest {
251 project: self.fully_qualified_project_name(),
252 page_size: 0,
253 page_token: "".to_string(),
254 };
255 self.subc.list_snapshots(req, retry).await
256 }
257
258 pub fn fully_qualified_topic_name(&self, id: &str) -> String {
259 if id.contains('/') {
260 id.to_string()
261 } else {
262 format!("projects/{}/topics/{}", self.project_id, id)
263 }
264 }
265
266 pub fn fully_qualified_subscription_name(&self, id: &str) -> String {
267 if id.contains('/') {
268 id.to_string()
269 } else {
270 format!("projects/{}/subscriptions/{}", self.project_id, id)
271 }
272 }
273
274 fn fully_qualified_project_name(&self) -> String {
275 format!("projects/{}", self.project_id)
276 }
277}
278
279#[cfg(test)]
280#[allow(deprecated)]
281mod tests {
282 use std::collections::HashMap;
283
284 use serial_test::serial;
285
286 use uuid::Uuid;
287
288 use crate::client::Client;
289 use crate::subscription::SubscriptionConfig;
290
291 async fn create_client() -> Client {
292 std::env::set_var("PUBSUB_EMULATOR_HOST", "localhost:8681");
293
294 Client::new(Default::default()).await.unwrap()
295 }
296
297 #[tokio::test(flavor = "multi_thread")]
298 #[serial]
299 async fn test_lifecycle() {
300 let client = create_client().await;
301
302 let uuid = Uuid::new_v4().hyphenated().to_string();
303 let topic_id = &format!("t{}", &uuid);
304 let subscription_id = &format!("s{}", &uuid);
305 let snapshot_id = &format!("snap{}", &uuid);
306 let topics = client.get_topics(None).await.unwrap();
307 let subs = client.get_subscriptions(None).await.unwrap();
308 let snapshots = client.get_snapshots(None).await.unwrap();
309 let _topic = client.create_topic(topic_id.as_str(), None, None).await.unwrap();
310 let subscription = client
311 .create_subscription(subscription_id.as_str(), topic_id.as_str(), SubscriptionConfig::default(), None)
312 .await
313 .unwrap();
314
315 let _ = subscription
316 .create_snapshot(snapshot_id, HashMap::default(), None)
317 .await
318 .unwrap();
319
320 let topics_after = client.get_topics(None).await.unwrap();
321 let subs_after = client.get_subscriptions(None).await.unwrap();
322 let snapshots_after = client.get_snapshots(None).await.unwrap();
323 assert_eq!(1, topics_after.len() - topics.len());
324 assert_eq!(1, subs_after.len() - subs.len());
325 assert_eq!(1, snapshots_after.len() - snapshots.len());
326 }
327}
328
329#[cfg(test)]
330mod tests_in_gcp {
331 use crate::client::{Client, ClientConfig};
332 use crate::publisher::PublisherConfig;
333 use google_cloud_gax::conn::Environment;
334 use google_cloud_gax::grpc::codegen::tokio_stream::StreamExt;
335 use google_cloud_googleapis::pubsub::v1::PubsubMessage;
336 use serial_test::serial;
337 use std::collections::HashMap;
338
339 use crate::subscription::SubscribeConfig;
340 use std::time::Duration;
341 use tokio::select;
342 use tokio_util::sync::CancellationToken;
343
344 fn make_msg(key: &str) -> PubsubMessage {
345 PubsubMessage {
346 data: if key.is_empty() {
347 "empty".into()
348 } else {
349 key.to_string().into()
350 },
351 ordering_key: key.into(),
352 ..Default::default()
353 }
354 }
355
356 #[tokio::test]
357 #[ignore]
358 async fn test_with_auth() {
359 let config = ClientConfig::default().with_auth().await.unwrap();
360 if let Environment::Emulator(_) = config.environment {
361 unreachable!()
362 }
363 }
364
365 #[tokio::test]
366 #[serial]
367 #[ignore]
368 async fn test_publish_ordering_in_gcp_flush_buffer() {
369 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
370 .await
371 .unwrap();
372 let topic = client.topic("test-topic2");
373 let publisher = topic.new_publisher(Some(PublisherConfig {
374 flush_interval: Duration::from_secs(3),
375 workers: 3,
376 ..Default::default()
377 }));
378
379 let mut awaiters = vec![];
380 for key in ["", "key1", "key2", "key3", "key3"] {
381 awaiters.push(publisher.publish(make_msg(key)).await);
382 }
383 for awaiter in awaiters.into_iter() {
384 tracing::info!("msg id {}", awaiter.get().await.unwrap());
385 }
386
387 let mut awaiters = vec![];
389 for key in ["", "key1", "key2", "key3", "key3"] {
390 awaiters.push(publisher.publish(make_msg(key)).await);
391 }
392 for awaiter in awaiters.into_iter() {
393 tracing::info!("msg id {}", awaiter.get().await.unwrap());
394 }
395 }
396
397 #[tokio::test]
398 #[serial]
399 #[ignore]
400 async fn test_publish_ordering_in_gcp_limit_exceed() {
401 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
402 .await
403 .unwrap();
404 let topic = client.topic("test-topic2");
405 let publisher = topic.new_publisher(Some(PublisherConfig {
406 flush_interval: Duration::from_secs(30),
407 workers: 1,
408 bundle_size: 8,
409 ..Default::default()
410 }));
411
412 let mut awaiters = vec![];
413 for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
414 awaiters.push(publisher.publish(make_msg(key)).await);
415 }
416 for awaiter in awaiters.into_iter() {
417 tracing::info!("msg id {}", awaiter.get().await.unwrap());
418 }
419
420 let mut awaiters = vec![];
422 for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
423 awaiters.push(publisher.publish(make_msg(key)).await);
424 }
425 for awaiter in awaiters.into_iter() {
426 tracing::info!("msg id {}", awaiter.get().await.unwrap());
427 }
428 }
429
430 #[tokio::test]
431 #[serial]
432 #[ignore]
433 async fn test_publish_ordering_in_gcp_bulk() {
434 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
435 .await
436 .unwrap();
437 let topic = client.topic("test-topic2");
438 let publisher = topic.new_publisher(Some(PublisherConfig {
439 flush_interval: Duration::from_secs(30),
440 workers: 2,
441 bundle_size: 8,
442 ..Default::default()
443 }));
444
445 let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
446 .map(make_msg)
447 .to_vec();
448 for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
449 tracing::info!("msg id {}", awaiter.get().await.unwrap());
450 }
451
452 let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
454 .map(make_msg)
455 .to_vec();
456 for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
457 tracing::info!("msg id {}", awaiter.get().await.unwrap());
458 }
459 }
460 #[tokio::test]
461 #[serial]
462 #[ignore]
463 async fn test_publish_subscribe_exactly_once_delivery() {
464 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
465 .await
466 .unwrap();
467
468 let subscription = client.subscription("eod-test");
470 let config = subscription.config(None).await.unwrap().1;
471 assert!(config.enable_exactly_once_delivery);
472
473 let ctx = CancellationToken::new();
475 let ctx_pub = ctx.clone();
476 let publisher = client.topic("eod-test").new_publisher(None);
477 let pub_task = tokio::spawn(async move {
478 tracing::info!("start publisher");
479 loop {
480 if ctx_pub.is_cancelled() {
481 tracing::info!("finish publisher");
482 return;
483 }
484 publisher
485 .publish_blocking(PubsubMessage {
486 data: "msg".into(),
487 ..Default::default()
488 })
489 .get()
490 .await
491 .unwrap();
492 }
493 });
494
495 let ctx_sub = ctx.child_token();
497 let sub_task = tokio::spawn(async move {
498 tracing::info!("start subscriber");
499 let mut stream = subscription.subscribe(None).await.unwrap();
500 let mut msgs = HashMap::new();
501 while let Some(message) = select! {
502 msg = stream.next() => msg,
503 _ = ctx_sub.cancelled() => None
504 } {
505 let msg_id = &message.message.message_id;
506 tokio::time::sleep(Duration::from_secs(1)).await;
508 *msgs.entry(msg_id.clone()).or_insert(0) += 1;
509 message.ack().await.unwrap();
510 }
511 stream.dispose().await;
512 tracing::info!("finish subscriber");
513 msgs
514 });
515
516 tokio::time::sleep(Duration::from_secs(60)).await;
517
518 ctx.cancel();
520 pub_task.await.unwrap();
521 let received_msgs = sub_task.await.unwrap();
522 assert!(!received_msgs.is_empty());
523
524 tracing::info!("Number of received messages = {}", received_msgs.len());
525 for (msg_id, count) in received_msgs {
526 assert_eq!(count, 1, "msg_id = {msg_id}, count = {count}");
527 }
528 }
529
530 #[tokio::test]
531 #[serial]
532 #[ignore]
533 async fn test_publish_subscribe_ordering() {
534 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
535 .await
536 .unwrap();
537 let subscription = client.subscription("order-test");
538 let config = subscription.config(None).await.unwrap().1;
539 assert!(config.enable_message_ordering);
540
541 let msg_len = 10;
542 let ctx = CancellationToken::new();
543 let ctx_sub = ctx.clone();
544
545 tracing::info!("publish messages: size = {msg_len}");
547 let publisher = client.topic("order-test").new_publisher(None);
548 for i in 0..msg_len {
549 publisher
550 .publish(PubsubMessage {
551 data: i.to_string().into(),
552 ordering_key: "key1".into(),
553 ..Default::default()
554 })
555 .await
556 .get()
557 .await
558 .unwrap();
559 }
560
561 let checker = tokio::spawn(async move {
562 tokio::time::sleep(Duration::from_secs(60)).await;
563 ctx.cancel();
564 });
565
566 tracing::info!("start subscriber");
568 let option = SubscribeConfig::default().with_enable_multiple_subscriber(true);
569 let mut stream = subscription.subscribe(Some(option)).await.unwrap();
570 let mut msgs = vec![];
571 while let Some(message) = select! {
572 msg = stream.next() => msg,
573 _ = ctx_sub.cancelled() => None
574 } {
575 let data = message.message.data.clone().to_vec();
576 let i: u8 = String::from_utf8(data).unwrap().parse().unwrap();
577 msgs.push(i);
578 message.ack().await.unwrap();
579 }
580 tracing::info!("finish subscriber");
581 let _ = checker.await;
582 let nack = stream.dispose().await;
583 assert_eq!(nack, 0);
584 assert_eq!(msgs.len(), msg_len as usize);
585 for i in 0..msg_len {
586 assert_eq!(msgs[i as usize], i);
587 }
588 }
589
590 #[tokio::test]
591 #[serial]
592 #[ignore]
593 async fn test_pull_empty() {
594 let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
595 .await
596 .unwrap();
597 let subscription = client.subscription("pull-test");
598 let messages = subscription.pull(10, None).await.unwrap();
599 assert!(messages.is_empty());
600 }
601}