1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, SystemTime};
6
7use prost_types::{DurationError, FieldMask};
8use tokio_util::sync::CancellationToken;
9
10use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
11use google_cloud_gax::grpc::{Code, Status};
12use google_cloud_gax::retry::RetrySetting;
13use google_cloud_googleapis::pubsub::v1::seek_request::Target;
14use google_cloud_googleapis::pubsub::v1::subscription::AnalyticsHubSubscriptionInfo;
15use google_cloud_googleapis::pubsub::v1::{
16 BigQueryConfig, CloudStorageConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest,
17 DeleteSubscriptionRequest, ExpirationPolicy, GetSnapshotRequest, GetSubscriptionRequest, MessageTransform,
18 PullRequest, PushConfig, RetryPolicy, SeekRequest, Snapshot, Subscription as InternalSubscription,
19 UpdateSubscriptionRequest,
20};
21
22use crate::apiv1::subscriber_client::SubscriberClient;
23
24use crate::subscriber::{ack, ReceivedMessage, Subscriber, SubscriberConfig};
25
26#[derive(Debug, Clone, Default)]
27pub struct SubscriptionConfig {
28 pub push_config: Option<PushConfig>,
29 pub ack_deadline_seconds: i32,
30 pub retain_acked_messages: bool,
31 pub message_retention_duration: Option<Duration>,
32 pub labels: HashMap<String, String>,
33 pub enable_message_ordering: bool,
34 pub expiration_policy: Option<ExpirationPolicy>,
35 pub filter: String,
36 pub dead_letter_policy: Option<DeadLetterPolicy>,
37 pub retry_policy: Option<RetryPolicy>,
38 pub detached: bool,
39 pub topic_message_retention_duration: Option<Duration>,
40 pub enable_exactly_once_delivery: bool,
41 pub bigquery_config: Option<BigQueryConfig>,
42 pub state: i32,
43 pub cloud_storage_config: Option<CloudStorageConfig>,
44 pub analytics_hub_subscription_info: Option<AnalyticsHubSubscriptionInfo>,
45 pub message_transforms: Vec<MessageTransform>,
46}
47impl From<InternalSubscription> for SubscriptionConfig {
48 fn from(f: InternalSubscription) -> Self {
49 Self {
50 push_config: f.push_config,
51 bigquery_config: f.bigquery_config,
52 ack_deadline_seconds: f.ack_deadline_seconds,
53 retain_acked_messages: f.retain_acked_messages,
54 message_retention_duration: f
55 .message_retention_duration
56 .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
57 labels: f.labels,
58 enable_message_ordering: f.enable_message_ordering,
59 expiration_policy: f.expiration_policy,
60 filter: f.filter,
61 dead_letter_policy: f.dead_letter_policy,
62 retry_policy: f.retry_policy,
63 detached: f.detached,
64 topic_message_retention_duration: f
65 .topic_message_retention_duration
66 .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
67 enable_exactly_once_delivery: f.enable_exactly_once_delivery,
68 state: f.state,
69 cloud_storage_config: f.cloud_storage_config,
70 analytics_hub_subscription_info: f.analytics_hub_subscription_info,
71 message_transforms: f.message_transforms,
72 }
73 }
74}
75
76#[derive(Debug, Clone, Default)]
77pub struct SubscriptionConfigToUpdate {
78 pub push_config: Option<PushConfig>,
79 pub bigquery_config: Option<BigQueryConfig>,
80 pub ack_deadline_seconds: Option<i32>,
81 pub retain_acked_messages: Option<bool>,
82 pub message_retention_duration: Option<Duration>,
83 pub labels: Option<HashMap<String, String>>,
84 pub expiration_policy: Option<ExpirationPolicy>,
85 pub dead_letter_policy: Option<DeadLetterPolicy>,
86 pub retry_policy: Option<RetryPolicy>,
87}
88
89#[derive(Debug, Clone, Default)]
90pub struct SubscribeConfig {
91 enable_multiple_subscriber: bool,
92 channel_capacity: Option<usize>,
93 subscriber_config: Option<SubscriberConfig>,
94}
95
96impl SubscribeConfig {
97 pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
98 self.enable_multiple_subscriber = v;
99 self
100 }
101 pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
102 self.subscriber_config = Some(v);
103 self
104 }
105 pub fn with_channel_capacity(mut self, v: usize) -> Self {
106 self.channel_capacity = Some(v);
107 self
108 }
109}
110
111#[derive(Debug, Clone)]
112pub struct ReceiveConfig {
113 pub worker_count: usize,
114 pub channel_capacity: Option<usize>,
115 pub subscriber_config: Option<SubscriberConfig>,
116}
117
118impl Default for ReceiveConfig {
119 fn default() -> Self {
120 Self {
121 worker_count: 10,
122 subscriber_config: None,
123 channel_capacity: None,
124 }
125 }
126}
127
/// Target of a [`Subscription::seek`] call.
#[derive(Debug, Clone)]
pub enum SeekTo {
    /// Seek to a point in time.
    Timestamp(SystemTime),
    /// Seek to a snapshot, given either as a bare id or as a fully qualified
    /// name (anything containing a `/`).
    Snapshot(String),
}
133
134impl From<SeekTo> for Target {
135 fn from(to: SeekTo) -> Target {
136 use SeekTo::*;
137 match to {
138 Timestamp(t) => Target::Time(prost_types::Timestamp::from(t)),
139 Snapshot(s) => Target::Snapshot(s),
140 }
141 }
142}
143
144pub struct MessageStream {
145 queue: async_channel::Receiver<ReceivedMessage>,
146 cancel: CancellationToken,
147 tasks: Vec<Subscriber>,
148}
149
150impl MessageStream {
151 pub fn cancellable(&self) -> CancellationToken {
152 self.cancel.clone()
153 }
154
155 pub async fn dispose(&mut self) {
156 if !self.cancel.is_cancelled() {
158 self.cancel.cancel();
159 }
160
161 for task in &mut self.tasks {
163 task.done().await;
164 }
165
166 while let Ok(message) = self.queue.recv().await {
168 if let Err(err) = message.nack().await {
169 tracing::warn!("failed to nack message messageId={} {:?}", message.message.message_id, err);
170 }
171 }
172 }
173
174 pub async fn read(&mut self) -> Option<ReceivedMessage> {
176 let message = tokio::select! {
177 msg = self.queue.recv() => msg.ok(),
178 _ = self.cancel.cancelled() => None
179 };
180 if message.is_none() {
181 self.dispose().await;
182 }
183 message
184 }
185}
186
187impl Drop for MessageStream {
188 fn drop(&mut self) {
189 if !self.queue.is_empty() {
190 tracing::warn!("Call 'dispose' before drop in order to call nack for remaining messages");
191 }
192 if !self.cancel.is_cancelled() {
193 self.cancel.cancel();
194 }
195 }
196}
197
198impl Stream for MessageStream {
199 type Item = ReceivedMessage;
200
201 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204 Pin::new(&mut self.get_mut().queue).poll_next(cx)
205 }
206}
207
208#[derive(Clone, Debug)]
210pub struct Subscription {
211 fqsn: String,
212 subc: SubscriberClient,
213}
214
215impl Subscription {
216 pub(crate) fn new(fqsn: String, subc: SubscriberClient) -> Self {
217 Self { fqsn, subc }
218 }
219
220 pub(crate) fn streaming_pool_size(&self) -> usize {
221 self.subc.streaming_pool_size()
222 }
223
224 pub fn id(&self) -> String {
226 self.fqsn
227 .rfind('/')
228 .map_or("".to_string(), |i| self.fqsn[(i + 1)..].to_string())
229 }
230
231 pub fn fully_qualified_name(&self) -> &str {
233 self.fqsn.as_str()
234 }
235
236 pub fn fully_qualified_snapshot_name(&self, id: &str) -> String {
238 if id.contains('/') {
239 id.to_string()
240 } else {
241 format!("{}/snapshots/{}", self.fully_qualified_project_name(), id)
242 }
243 }
244
245 fn fully_qualified_project_name(&self) -> String {
246 let parts: Vec<_> = self
247 .fqsn
248 .split('/')
249 .enumerate()
250 .filter(|&(i, _)| i < 2)
251 .map(|e| e.1)
252 .collect();
253 parts.join("/")
254 }
255
256 pub fn get_client(&self) -> SubscriberClient {
257 self.subc.clone()
258 }
259
260 pub async fn create(&self, fqtn: &str, cfg: SubscriptionConfig, retry: Option<RetrySetting>) -> Result<(), Status> {
262 self.subc
263 .create_subscription(
264 InternalSubscription {
265 name: self.fully_qualified_name().to_string(),
266 topic: fqtn.to_string(),
267 push_config: cfg.push_config,
268 bigquery_config: cfg.bigquery_config,
269 cloud_storage_config: cfg.cloud_storage_config,
270 ack_deadline_seconds: cfg.ack_deadline_seconds,
271 labels: cfg.labels,
272 enable_message_ordering: cfg.enable_message_ordering,
273 expiration_policy: cfg.expiration_policy,
274 filter: cfg.filter,
275 dead_letter_policy: cfg.dead_letter_policy,
276 retry_policy: cfg.retry_policy,
277 detached: cfg.detached,
278 message_retention_duration: cfg
279 .message_retention_duration
280 .map(Duration::try_into)
281 .transpose()
282 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
283 retain_acked_messages: cfg.retain_acked_messages,
284 topic_message_retention_duration: cfg
285 .topic_message_retention_duration
286 .map(Duration::try_into)
287 .transpose()
288 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
289 enable_exactly_once_delivery: cfg.enable_exactly_once_delivery,
290 state: cfg.state,
291 analytics_hub_subscription_info: cfg.analytics_hub_subscription_info,
292 message_transforms: cfg.message_transforms,
293 },
294 retry,
295 )
296 .await
297 .map(|_v| ())
298 }
299
300 pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
302 let req = DeleteSubscriptionRequest {
303 subscription: self.fqsn.to_string(),
304 };
305 self.subc.delete_subscription(req, retry).await.map(|v| v.into_inner())
306 }
307
308 pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
310 let req = GetSubscriptionRequest {
311 subscription: self.fqsn.to_string(),
312 };
313 match self.subc.get_subscription(req, retry).await {
314 Ok(_) => Ok(true),
315 Err(e) => {
316 if e.code() == Code::NotFound {
317 Ok(false)
318 } else {
319 Err(e)
320 }
321 }
322 }
323 }
324
325 pub async fn config(&self, retry: Option<RetrySetting>) -> Result<(String, SubscriptionConfig), Status> {
327 let req = GetSubscriptionRequest {
328 subscription: self.fqsn.to_string(),
329 };
330 self.subc.get_subscription(req, retry).await.map(|v| {
331 let inner = v.into_inner();
332 (inner.topic.to_string(), inner.into())
333 })
334 }
335
336 pub async fn update(
339 &self,
340 updating: SubscriptionConfigToUpdate,
341 retry: Option<RetrySetting>,
342 ) -> Result<(String, SubscriptionConfig), Status> {
343 let req = GetSubscriptionRequest {
344 subscription: self.fqsn.to_string(),
345 };
346 let mut config = self.subc.get_subscription(req, retry.clone()).await?.into_inner();
347
348 let mut paths = vec![];
349 if updating.push_config.is_some() {
350 config.push_config = updating.push_config;
351 paths.push("push_config".to_string());
352 }
353 if updating.bigquery_config.is_some() {
354 config.bigquery_config = updating.bigquery_config;
355 paths.push("bigquery_config".to_string());
356 }
357 if let Some(v) = updating.ack_deadline_seconds {
358 config.ack_deadline_seconds = v;
359 paths.push("ack_deadline_seconds".to_string());
360 }
361 if let Some(v) = updating.retain_acked_messages {
362 config.retain_acked_messages = v;
363 paths.push("retain_acked_messages".to_string());
364 }
365 if updating.message_retention_duration.is_some() {
366 config.message_retention_duration = updating
367 .message_retention_duration
368 .map(prost_types::Duration::try_from)
369 .transpose()
370 .map_err(|err| Status::internal(err.to_string()))?;
371 paths.push("message_retention_duration".to_string());
372 }
373 if updating.expiration_policy.is_some() {
374 config.expiration_policy = updating.expiration_policy;
375 paths.push("expiration_policy".to_string());
376 }
377 if let Some(v) = updating.labels {
378 config.labels = v;
379 paths.push("labels".to_string());
380 }
381 if updating.retry_policy.is_some() {
382 config.retry_policy = updating.retry_policy;
383 paths.push("retry_policy".to_string());
384 }
385
386 let update_req = UpdateSubscriptionRequest {
387 subscription: Some(config),
388 update_mask: Some(FieldMask { paths }),
389 };
390 self.subc.update_subscription(update_req, retry).await.map(|v| {
391 let inner = v.into_inner();
392 (inner.topic.to_string(), inner.into())
393 })
394 }
395
396 pub async fn pull(&self, max_messages: i32, retry: Option<RetrySetting>) -> Result<Vec<ReceivedMessage>, Status> {
398 #[allow(deprecated)]
399 let req = PullRequest {
400 subscription: self.fqsn.clone(),
401 return_immediately: false,
402 max_messages,
403 };
404 let messages = self.subc.pull(req, retry).await?.into_inner().received_messages;
405 Ok(messages
406 .into_iter()
407 .filter(|m| m.message.is_some())
408 .map(|m| {
409 ReceivedMessage::new(
410 self.fqsn.clone(),
411 self.subc.clone(),
412 m.message.unwrap(),
413 m.ack_id,
414 (m.delivery_attempt > 0).then_some(m.delivery_attempt as usize),
415 )
416 })
417 .collect())
418 }
419
420 pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
462 let opt = opt.unwrap_or_default();
463 let (tx, rx) = create_channel(opt.channel_capacity);
464 let cancel = CancellationToken::new();
465 let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;
466
467 let subscribers = if opt.enable_multiple_subscriber {
469 self.streaming_pool_size()
470 } else {
471 1
472 };
473 let mut tasks = Vec::with_capacity(subscribers);
474 for _ in 0..subscribers {
475 tasks.push(Subscriber::start(
476 cancel.clone(),
477 self.fqsn.clone(),
478 self.subc.clone(),
479 tx.clone(),
480 sub_opt.clone(),
481 ));
482 }
483
484 Ok(MessageStream {
485 queue: rx,
486 cancel,
487 tasks,
488 })
489 }
490
491 #[deprecated(
500 since = "1.4.1",
501 note = "This will be removed in 1.5.0. Please use `subscribe` method to get a stream of messages and process the messages from the stream."
502 )]
503 pub async fn receive<F>(
504 &self,
505 f: impl Fn(ReceivedMessage, CancellationToken) -> F + Send + 'static + Sync + Clone,
506 cancel: CancellationToken,
507 config: Option<ReceiveConfig>,
508 ) -> Result<(), Status>
509 where
510 F: Future<Output = ()> + Send + 'static,
511 {
512 let op = config.unwrap_or_default();
513 let mut receivers = Vec::with_capacity(op.worker_count);
514 let mut senders = Vec::with_capacity(receivers.len());
515 let sub_opt = self.unwrap_subscribe_config(op.subscriber_config).await?;
516
517 if self
518 .config(sub_opt.retry_setting.clone())
519 .await?
520 .1
521 .enable_message_ordering
522 {
523 (0..op.worker_count).for_each(|_v| {
524 let (sender, receiver) = create_channel(op.channel_capacity);
525 receivers.push(receiver);
526 senders.push(sender);
527 });
528 } else {
529 let (sender, receiver) = create_channel(op.channel_capacity);
530 (0..op.worker_count).for_each(|_v| {
531 receivers.push(receiver.clone());
532 senders.push(sender.clone());
533 });
534 }
535
536 let subscribers: Vec<Subscriber> = senders
538 .into_iter()
539 .map(|queue| {
540 Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), queue, sub_opt.clone())
541 })
542 .collect();
543
544 let mut message_receivers = Vec::with_capacity(receivers.len());
545 for receiver in receivers {
546 let f_clone = f.clone();
547 let cancel_clone = cancel.clone();
548 let name = self.fqsn.clone();
549 message_receivers.push(tokio::spawn(async move {
550 while let Ok(message) = receiver.recv().await {
551 f_clone(message, cancel_clone.clone()).await;
552 }
553 tracing::trace!("stop message receiver : {}", name);
555 }));
556 }
557 cancel.cancelled().await;
558
559 for mut subscriber in subscribers {
561 subscriber.done().await;
562 }
563
564 for mr in message_receivers {
566 let _ = mr.await;
567 }
568 Ok(())
569 }
570
571 pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status> {
634 ack(&self.subc, self.fqsn.to_string(), ack_ids).await
635 }
636
637 pub async fn seek(&self, to: SeekTo, retry: Option<RetrySetting>) -> Result<(), Status> {
639 let to = match to {
640 SeekTo::Timestamp(t) => SeekTo::Timestamp(t),
641 SeekTo::Snapshot(name) => SeekTo::Snapshot(self.fully_qualified_snapshot_name(name.as_str())),
642 };
643
644 let req = SeekRequest {
645 subscription: self.fqsn.to_owned(),
646 target: Some(to.into()),
647 };
648
649 let _ = self.subc.seek(req, retry).await?;
650 Ok(())
651 }
652
653 pub async fn get_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<Snapshot, Status> {
655 let req = GetSnapshotRequest {
656 snapshot: self.fully_qualified_snapshot_name(name),
657 };
658 Ok(self.subc.get_snapshot(req, retry).await?.into_inner())
659 }
660
661 pub async fn create_snapshot(
671 &self,
672 name: &str,
673 labels: HashMap<String, String>,
674 retry: Option<RetrySetting>,
675 ) -> Result<Snapshot, Status> {
676 let req = CreateSnapshotRequest {
677 name: self.fully_qualified_snapshot_name(name),
678 labels,
679 subscription: self.fqsn.to_owned(),
680 };
681 Ok(self.subc.create_snapshot(req, retry).await?.into_inner())
682 }
683
684 pub async fn delete_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
686 let req = DeleteSnapshotRequest {
687 snapshot: self.fully_qualified_snapshot_name(name),
688 };
689 let _ = self.subc.delete_snapshot(req, retry).await?;
690 Ok(())
691 }
692
693 async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
694 if let Some(cfg) = cfg {
695 return Ok(cfg);
696 }
697 let cfg = self.config(None).await?;
698 let mut default_cfg = SubscriberConfig {
699 stream_ack_deadline_seconds: cfg.1.ack_deadline_seconds.clamp(10, 600),
700 ..Default::default()
701 };
702 if cfg.1.enable_exactly_once_delivery {
703 default_cfg.max_outstanding_messages = 5;
704 }
705 Ok(default_cfg)
706 }
707}
708
709fn create_channel(
710 channel_capacity: Option<usize>,
711) -> (async_channel::Sender<ReceivedMessage>, async_channel::Receiver<ReceivedMessage>) {
712 match channel_capacity {
713 None => async_channel::unbounded(),
714 Some(cap) => async_channel::bounded(cap),
715 }
716}
717
718#[cfg(test)]
719#[allow(deprecated)]
720mod tests {
721
722 use std::collections::HashMap;
723 use std::sync::atomic::AtomicU32;
724 use std::sync::atomic::Ordering::SeqCst;
725 use std::sync::{Arc, Mutex};
726 use std::time::Duration;
727
728 use futures_util::StreamExt;
729 use serial_test::serial;
730 use tokio_util::sync::CancellationToken;
731 use uuid::Uuid;
732
733 use google_cloud_gax::conn::{ConnectionOptions, Environment};
734 use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage};
735
736 use crate::apiv1::conn_pool::ConnectionManager;
737 use crate::apiv1::publisher_client::PublisherClient;
738 use crate::apiv1::subscriber_client::SubscriberClient;
739 use crate::subscriber::ReceivedMessage;
740 use crate::subscription::{
741 ReceiveConfig, SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate,
742 };
743
744 const PROJECT_NAME: &str = "local-project";
745 const EMULATOR: &str = "localhost:8681";
746
747 #[ctor::ctor]
748 fn init() {
749 let _ = tracing_subscriber::fmt().try_init();
750 }
751
752 async fn create_subscription(enable_exactly_once_delivery: bool) -> Subscription {
753 let cm = ConnectionManager::new(
754 4,
755 "",
756 &Environment::Emulator(EMULATOR.to_string()),
757 &ConnectionOptions::default(),
758 )
759 .await
760 .unwrap();
761 let cm2 = ConnectionManager::new(
762 4,
763 "",
764 &Environment::Emulator(EMULATOR.to_string()),
765 &ConnectionOptions::default(),
766 )
767 .await
768 .unwrap();
769 let client = SubscriberClient::new(cm, cm2);
770
771 let uuid = Uuid::new_v4().hyphenated().to_string();
772 let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
773 let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
774 let subscription = Subscription::new(subscription_name, client);
775 let config = SubscriptionConfig {
776 enable_exactly_once_delivery,
777 ..Default::default()
778 };
779 if !subscription.exists(None).await.unwrap() {
780 subscription.create(topic_name.as_str(), config, None).await.unwrap();
781 }
782 subscription
783 }
784
785 async fn publish(messages: Option<Vec<PubsubMessage>>) {
786 let pubc = PublisherClient::new(
787 ConnectionManager::new(
788 4,
789 "",
790 &Environment::Emulator(EMULATOR.to_string()),
791 &ConnectionOptions::default(),
792 )
793 .await
794 .unwrap(),
795 );
796 let messages = messages.unwrap_or(vec![PubsubMessage {
797 data: "test_message".into(),
798 ..Default::default()
799 }]);
800 let req = PublishRequest {
801 topic: format!("projects/{PROJECT_NAME}/topics/test-topic1"),
802 messages,
803 };
804 let _ = pubc.publish(req, None).await;
805 }
806
807 async fn test_subscription(enable_exactly_once_delivery: bool) {
808 let subscription = create_subscription(enable_exactly_once_delivery).await;
809
810 let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
811 let config = subscription.config(None).await.unwrap();
812 assert_eq!(config.0, topic_name);
813
814 let updating = SubscriptionConfigToUpdate {
815 ack_deadline_seconds: Some(100),
816 ..Default::default()
817 };
818 let new_config = subscription.update(updating, None).await.unwrap();
819 assert_eq!(new_config.0, topic_name);
820 assert_eq!(new_config.1.ack_deadline_seconds, 100);
821
822 let receiver_ctx = CancellationToken::new();
823 let cancel_receiver = receiver_ctx.clone();
824 let handle = tokio::spawn(async move {
825 let _ = subscription
826 .receive(
827 |message, _ctx| async move {
828 println!("{}", message.message.message_id);
829 let _ = message.ack().await;
830 },
831 cancel_receiver,
832 None,
833 )
834 .await;
835 subscription.delete(None).await.unwrap();
836 assert!(!subscription.exists(None).await.unwrap())
837 });
838 tokio::time::sleep(Duration::from_secs(3)).await;
839 receiver_ctx.cancel();
840 let _ = handle.await;
841 }
842
843 #[tokio::test(flavor = "multi_thread")]
844 #[serial]
845 async fn test_pull() {
846 let subscription = create_subscription(false).await;
847 let base = PubsubMessage {
848 data: "test_message".into(),
849 ..Default::default()
850 };
851 publish(Some(vec![base.clone(), base.clone(), base])).await;
852 let messages = subscription.pull(2, None).await.unwrap();
853 assert_eq!(messages.len(), 2);
854 for m in messages {
855 m.ack().await.unwrap();
856 }
857 subscription.delete(None).await.unwrap();
858 }
859
860 #[tokio::test]
861 #[serial]
862 async fn test_subscription_exactly_once() {
863 test_subscription(true).await;
864 }
865
866 #[tokio::test]
867 #[serial]
868 async fn test_subscription_at_least_once() {
869 test_subscription(false).await;
870 }
871
872 #[tokio::test(flavor = "multi_thread")]
873 #[serial]
874 async fn test_multi_subscriber_single_subscription_unbound() {
875 test_multi_subscriber_single_subscription(None).await;
876 }
877
878 #[tokio::test(flavor = "multi_thread")]
879 #[serial]
880 async fn test_multi_subscriber_single_subscription_bound() {
881 let opt = Some(ReceiveConfig {
882 channel_capacity: Some(1),
883 ..Default::default()
884 });
885 test_multi_subscriber_single_subscription(opt).await;
886 }
887
888 async fn test_multi_subscriber_single_subscription(opt: Option<ReceiveConfig>) {
889 let msg = PubsubMessage {
890 data: "test".into(),
891 ..Default::default()
892 };
893 let msg_size = 10;
894 let msgs: Vec<PubsubMessage> = (0..msg_size).map(|_v| msg.clone()).collect();
895 let subscription = create_subscription(false).await;
896 let cancellation_token = CancellationToken::new();
897 let cancel_receiver = cancellation_token.clone();
898 let v = Arc::new(AtomicU32::new(0));
899 let v2 = v.clone();
900 let handle = tokio::spawn(async move {
901 let _ = subscription
902 .receive(
903 move |message, _ctx| {
904 let v2 = v2.clone();
905 async move {
906 tracing::info!("received {}", message.message.message_id);
907 v2.fetch_add(1, SeqCst);
908 let _ = message.ack().await;
909 }
910 },
911 cancel_receiver,
912 opt,
913 )
914 .await;
915 });
916 publish(Some(msgs)).await;
917 tokio::time::sleep(Duration::from_secs(5)).await;
918 cancellation_token.cancel();
919 let _ = handle.await;
920 assert_eq!(v.load(SeqCst), msg_size);
921 }
922
923 #[tokio::test(flavor = "multi_thread")]
924 #[serial]
925 async fn test_multi_subscriber_multi_subscription() {
926 let mut subscriptions = vec![];
927
928 let ctx = CancellationToken::new();
929 for _ in 0..3 {
930 let subscription = create_subscription(false).await;
931 let v = Arc::new(AtomicU32::new(0));
932 let ctx = ctx.clone();
933 let v2 = v.clone();
934 let handle = tokio::spawn(async move {
935 let _ = subscription
936 .receive(
937 move |message, _ctx| {
938 let v2 = v2.clone();
939 async move {
940 v2.fetch_add(1, SeqCst);
941 let _ = message.ack().await;
942 }
943 },
944 ctx,
945 None,
946 )
947 .await;
948 });
949 subscriptions.push((handle, v))
950 }
951
952 publish(None).await;
953 tokio::time::sleep(Duration::from_secs(5)).await;
954
955 ctx.cancel();
956 for (task, v) in subscriptions {
957 let _ = task.await;
958 assert_eq!(v.load(SeqCst), 1);
959 }
960 }
961
962 #[tokio::test(flavor = "multi_thread")]
963 #[serial]
964 async fn test_batch_acking() {
965 let ctx = CancellationToken::new();
966 let subscription = create_subscription(false).await;
967 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
968 let subscription_for_receive = subscription.clone();
969 let ctx_for_receive = ctx.clone();
970 let handle = tokio::spawn(async move {
971 let _ = subscription_for_receive
972 .receive(
973 move |message, _ctx| {
974 let sender = sender.clone();
975 async move {
976 let _ = sender.send(message.ack_id().to_string());
977 }
978 },
979 ctx_for_receive.clone(),
980 None,
981 )
982 .await;
983 });
984
985 let ctx_for_ack_manager = ctx.clone();
986 let ack_manager = tokio::spawn(async move {
987 let mut ack_ids = Vec::new();
988 while !ctx_for_ack_manager.is_cancelled() {
989 match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
990 Ok(ack_id) => {
991 if let Some(ack_id) = ack_id {
992 ack_ids.push(ack_id);
993 if ack_ids.len() > 10 {
994 subscription.ack(ack_ids).await.unwrap();
995 ack_ids = Vec::new();
996 }
997 }
998 }
999 Err(_e) => {
1000 subscription.ack(ack_ids).await.unwrap();
1002 ack_ids = Vec::new();
1003 }
1004 }
1005 }
1006 subscription.ack(ack_ids).await
1008 });
1009
1010 publish(None).await;
1011 tokio::time::sleep(Duration::from_secs(5)).await;
1012
1013 ctx.cancel();
1014 let _ = handle.await;
1015 assert!(ack_manager.await.is_ok());
1016 }
1017
1018 #[tokio::test]
1019 #[serial]
1020 async fn test_snapshots() {
1021 let subscription = create_subscription(false).await;
1022
1023 let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1024 let labels: HashMap<String, String> =
1025 HashMap::from_iter([("label-1".into(), "v1".into()), ("label-2".into(), "v2".into())]);
1026 let expected_fq_snap_name = format!("projects/{PROJECT_NAME}/snapshots/{snapshot_name}");
1027
1028 let _response = subscription.delete_snapshot(snapshot_name.as_str(), None).await;
1030
1031 let created_snapshot = subscription
1033 .create_snapshot(snapshot_name.as_str(), labels.clone(), None)
1034 .await
1035 .unwrap();
1036
1037 assert_eq!(created_snapshot.name, expected_fq_snap_name);
1038 let retrieved_snapshot = subscription.get_snapshot(snapshot_name.as_str(), None).await.unwrap();
1042 assert_eq!(created_snapshot, retrieved_snapshot);
1043
1044 subscription
1046 .delete_snapshot(snapshot_name.as_str(), None)
1047 .await
1048 .unwrap();
1049
1050 let _deleted_snapshot_status = subscription
1051 .get_snapshot(snapshot_name.as_str(), None)
1052 .await
1053 .expect_err("snapshot should have been deleted");
1054
1055 let _delete_again = subscription
1056 .delete_snapshot(snapshot_name.as_str(), None)
1057 .await
1058 .expect_err("snapshot should already be deleted");
1059 }
1060
1061 async fn ack_all(messages: &[ReceivedMessage]) {
1062 for message in messages.iter() {
1063 message.ack().await.unwrap();
1064 }
1065 }
1066
1067 #[tokio::test]
1068 #[serial]
1069 async fn test_seek_snapshot() {
1070 let subscription = create_subscription(false).await;
1071 let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1072
1073 publish(None).await;
1075 let messages = subscription.pull(100, None).await.unwrap();
1076 ack_all(&messages).await;
1077 assert_eq!(messages.len(), 1);
1078
1079 let _snapshot = subscription
1081 .create_snapshot(snapshot_name.as_str(), HashMap::new(), None)
1082 .await
1083 .unwrap();
1084
1085 publish(None).await;
1087 let messages = subscription.pull(100, None).await.unwrap();
1088 assert_eq!(messages.len(), 1);
1089 ack_all(&messages).await;
1090
1091 subscription
1093 .seek(SeekTo::Snapshot(snapshot_name.clone()), None)
1094 .await
1095 .unwrap();
1096
1097 let messages = subscription.pull(100, None).await.unwrap();
1099 assert_eq!(messages.len(), 1);
1100 ack_all(&messages).await;
1101
1102 subscription
1104 .delete_snapshot(snapshot_name.as_str(), None)
1105 .await
1106 .unwrap();
1107 subscription.delete(None).await.unwrap();
1108 }
1109
1110 #[tokio::test]
1111 #[serial]
1112 async fn test_seek_timestamp() {
1113 let subscription = create_subscription(false).await;
1114
1115 subscription
1117 .update(
1118 SubscriptionConfigToUpdate {
1119 retain_acked_messages: Some(true),
1120 message_retention_duration: Some(Duration::new(60 * 60 * 2, 0)),
1121 ..Default::default()
1122 },
1123 None,
1124 )
1125 .await
1126 .unwrap();
1127
1128 publish(None).await;
1130 let messages = subscription.pull(100, None).await.unwrap();
1131 ack_all(&messages).await;
1132 assert_eq!(messages.len(), 1);
1133
1134 let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1135
1136 subscription
1138 .seek(SeekTo::Timestamp(message_publish_time.to_owned().try_into().unwrap()), None)
1139 .await
1140 .unwrap();
1141
1142 let messages = subscription.pull(100, None).await.unwrap();
1144 ack_all(&messages).await;
1145 assert_eq!(messages.len(), 1);
1146 let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1147 assert_eq!(seek_message_publish_time, message_publish_time);
1148
1149 subscription.delete(None).await.unwrap();
1151 }
1152
1153 #[tokio::test(flavor = "multi_thread")]
1154 #[serial]
1155 async fn test_subscribe_single_subscriber() {
1156 test_subscribe(None).await;
1157 }
1158
1159 #[tokio::test(flavor = "multi_thread")]
1160 #[serial]
1161 async fn test_subscribe_multiple_subscriber() {
1162 test_subscribe(Some(SubscribeConfig::default().with_enable_multiple_subscriber(true))).await;
1163 }
1164
1165 #[tokio::test(flavor = "multi_thread")]
1166 #[serial]
1167 async fn test_subscribe_multiple_subscriber_bound() {
1168 test_subscribe(Some(
1169 SubscribeConfig::default()
1170 .with_enable_multiple_subscriber(true)
1171 .with_channel_capacity(1),
1172 ))
1173 .await;
1174 }
1175
1176 async fn test_subscribe(opt: Option<SubscribeConfig>) {
1177 let msg = PubsubMessage {
1178 data: "test".into(),
1179 ..Default::default()
1180 };
1181 let msg_count = 10;
1182 let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1183 let subscription = create_subscription(false).await;
1184 let received = Arc::new(Mutex::new(0));
1185 let checking = received.clone();
1186 let mut iter = subscription.subscribe(opt).await.unwrap();
1187 let cancellable = iter.cancellable();
1188 let handler = tokio::spawn(async move {
1189 while let Some(message) = iter.next().await {
1190 tracing::info!("received {}", message.message.message_id);
1191 *received.lock().unwrap() += 1;
1192 tokio::time::sleep(Duration::from_millis(500)).await;
1193 let _ = message.ack().await;
1194 }
1195 });
1196 publish(Some(msg)).await;
1197 tokio::time::sleep(Duration::from_secs(8)).await;
1198 cancellable.cancel();
1199 let _ = handler.await;
1200 assert_eq!(*checking.lock().unwrap(), msg_count);
1201 }
1202
1203 #[tokio::test(flavor = "multi_thread")]
1204 #[serial]
1205 async fn test_subscribe_nack_on_cancel_read() {
1206 subscribe_nack_on_cancel_read(10, true).await;
1207 subscribe_nack_on_cancel_read(0, true).await;
1208 subscribe_nack_on_cancel_read(10, false).await;
1209 subscribe_nack_on_cancel_read(0, false).await;
1210 }
1211
1212 #[tokio::test(flavor = "multi_thread")]
1213 #[serial]
1214 async fn test_subscribe_nack_on_cancel_next() {
1215 subscribe_nack_on_cancel_next(10, Duration::from_secs(3)).await;
1217 subscribe_nack_on_cancel_next(10, Duration::from_millis(0)).await;
1219 subscribe_nack_on_cancel_next(0, Duration::from_secs(3)).await;
1221 }
1222
1223 async fn subscribe_nack_on_cancel_read(msg_count: usize, should_cancel: bool) {
1224 let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1225
1226 let msg = PubsubMessage {
1227 data: "test".into(),
1228 ..Default::default()
1229 };
1230 let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1231 let subscription = create_subscription(false).await;
1232 let received = Arc::new(Mutex::new(0));
1233 let checking = received.clone();
1234
1235 let mut iter = subscription.subscribe(opt).await.unwrap();
1236 let ctx = iter.cancellable();
1237 let handler = tokio::spawn(async move {
1238 while let Some(message) = iter.read().await {
1239 tracing::info!("received {}", message.message.message_id);
1240 *received.lock().unwrap() += 1;
1241 if should_cancel {
1242 tokio::time::sleep(Duration::from_secs(10)).await;
1244 } else {
1245 tokio::time::sleep(Duration::from_millis(1)).await;
1246 }
1247 let _ = message.ack().await;
1248 }
1249 });
1250 publish(Some(msg)).await;
1251 tokio::time::sleep(Duration::from_secs(10)).await;
1252 ctx.cancel();
1253 handler.await.unwrap();
1254 if should_cancel && msg_count > 0 {
1255 assert!(*checking.lock().unwrap() < msg_count);
1257 } else {
1258 assert_eq!(*checking.lock().unwrap(), msg_count);
1260 }
1261 }
1262
1263 async fn subscribe_nack_on_cancel_next(msg_count: usize, recv_time: Duration) {
1264 let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1265
1266 let msg = PubsubMessage {
1267 data: "test".into(),
1268 ..Default::default()
1269 };
1270 let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1271 let subscription = create_subscription(false).await;
1272 let received = Arc::new(Mutex::new(0));
1273 let checking = received.clone();
1274
1275 let mut iter = subscription.subscribe(opt).await.unwrap();
1276 let ctx = iter.cancellable();
1277 let handler = tokio::spawn(async move {
1278 while let Some(message) = iter.next().await {
1279 tracing::info!("received {}", message.message.message_id);
1280 *received.lock().unwrap() += 1;
1281 tokio::time::sleep(recv_time).await;
1282 let _ = message.ack().await;
1283 }
1284 });
1285 publish(Some(msg)).await;
1286 tokio::time::sleep(Duration::from_secs(10)).await;
1287 ctx.cancel();
1288 handler.await.unwrap();
1289 assert_eq!(*checking.lock().unwrap(), msg_count);
1290 }
1291
1292 #[tokio::test(flavor = "multi_thread")]
1293 #[serial]
1294 async fn test_message_stream_dispose() {
1295 let subscription = create_subscription(false).await;
1296 let mut iter = subscription.subscribe(None).await.unwrap();
1297 iter.dispose().await;
1298 iter.dispose().await;
1300 assert!(iter.next().await.is_none());
1301 }
1302}