1use prost_types::{DurationError, FieldMask};
2use std::collections::HashMap;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, SystemTime};
6
7use google_cloud_gax::grpc::codegen::tokio_stream::{Stream, StreamExt};
8use google_cloud_gax::grpc::{Code, Status};
9use google_cloud_gax::retry::RetrySetting;
10use google_cloud_googleapis::pubsub::v1::seek_request::Target;
11use google_cloud_googleapis::pubsub::v1::subscription::AnalyticsHubSubscriptionInfo;
12use google_cloud_googleapis::pubsub::v1::{
13 BigQueryConfig, CloudStorageConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest,
14 DeleteSubscriptionRequest, ExpirationPolicy, GetSnapshotRequest, GetSubscriptionRequest, MessageTransform,
15 PullRequest, PushConfig, RetryPolicy, SeekRequest, Snapshot, Subscription as InternalSubscription,
16 UpdateSubscriptionRequest,
17};
18
19use crate::apiv1::subscriber_client::SubscriberClient;
20use crate::subscriber::{ack, nack, ReceivedMessage, Subscriber, SubscriberConfig};
21use google_cloud_gax::grpc::codegen::tokio_stream::wrappers::ReceiverStream;
22use tokio::sync::mpsc;
23
/// Settings of a Pub/Sub subscription as exposed by this crate.
///
/// Carries the fields of the generated subscription message
/// (`InternalSubscription`) other than `name` and `topic`, with the two
/// retention durations expressed as [`std::time::Duration`].
/// `Subscription::create` copies these fields into the create request, and
/// `Subscription::config` / `Subscription::update` return them.
#[derive(Debug, Clone, Default)]
pub struct SubscriptionConfig {
    pub push_config: Option<PushConfig>,
    /// Also used, clamped to `10..=600`, as the stream ack deadline when
    /// `Subscription::subscribe` is called without a `SubscriberConfig`.
    pub ack_deadline_seconds: i32,
    pub retain_acked_messages: bool,
    pub message_retention_duration: Option<Duration>,
    pub labels: HashMap<String, String>,
    pub enable_message_ordering: bool,
    pub expiration_policy: Option<ExpirationPolicy>,
    pub filter: String,
    pub dead_letter_policy: Option<DeadLetterPolicy>,
    pub retry_policy: Option<RetryPolicy>,
    pub detached: bool,
    pub topic_message_retention_duration: Option<Duration>,
    /// When `true`, `Subscription::subscribe` without an explicit
    /// `SubscriberConfig` sets `max_outstanding_messages` to 5.
    pub enable_exactly_once_delivery: bool,
    pub bigquery_config: Option<BigQueryConfig>,
    /// Raw `i32` copied from the generated message's `state` field.
    pub state: i32,
    pub cloud_storage_config: Option<CloudStorageConfig>,
    pub analytics_hub_subscription_info: Option<AnalyticsHubSubscriptionInfo>,
    pub message_transforms: Vec<MessageTransform>,
}
45impl From<InternalSubscription> for SubscriptionConfig {
46 fn from(f: InternalSubscription) -> Self {
47 Self {
48 push_config: f.push_config,
49 bigquery_config: f.bigquery_config,
50 ack_deadline_seconds: f.ack_deadline_seconds,
51 retain_acked_messages: f.retain_acked_messages,
52 message_retention_duration: f
53 .message_retention_duration
54 .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
55 labels: f.labels,
56 enable_message_ordering: f.enable_message_ordering,
57 expiration_policy: f.expiration_policy,
58 filter: f.filter,
59 dead_letter_policy: f.dead_letter_policy,
60 retry_policy: f.retry_policy,
61 detached: f.detached,
62 topic_message_retention_duration: f
63 .topic_message_retention_duration
64 .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
65 enable_exactly_once_delivery: f.enable_exactly_once_delivery,
66 state: f.state,
67 cloud_storage_config: f.cloud_storage_config,
68 analytics_hub_subscription_info: f.analytics_hub_subscription_info,
69 message_transforms: f.message_transforms,
70 }
71 }
72}
73
/// Partial description of changes to an existing subscription, passed to
/// `Subscription::update`.
///
/// Every field is optional; a field left as `None` is not changed by
/// `Subscription::update`.
#[derive(Debug, Clone, Default)]
pub struct SubscriptionConfigToUpdate {
    pub push_config: Option<PushConfig>,
    pub bigquery_config: Option<BigQueryConfig>,
    pub ack_deadline_seconds: Option<i32>,
    pub retain_acked_messages: Option<bool>,
    pub message_retention_duration: Option<Duration>,
    pub labels: Option<HashMap<String, String>>,
    pub expiration_policy: Option<ExpirationPolicy>,
    pub dead_letter_policy: Option<DeadLetterPolicy>,
    pub retry_policy: Option<RetryPolicy>,
}
86
/// Options for `Subscription::subscribe`.
///
/// Built from `SubscribeConfig::default()` and the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct SubscribeConfig {
    /// When `true`, `subscribe` spawns `streaming_pool_size()` subscribers
    /// instead of a single one.
    enable_multiple_subscriber: bool,
    /// Capacity of the mpsc channel feeding the returned `MessageStream`;
    /// `subscribe` raises values below 1 to 1.
    channel_capacity: usize,
    /// Subscriber settings; when `None`, `subscribe` derives them from the
    /// subscription's own configuration.
    subscriber_config: Option<SubscriberConfig>,
}
93
94impl Default for SubscribeConfig {
95 fn default() -> Self {
96 Self {
97 enable_multiple_subscriber: false,
98 channel_capacity: 1000,
99 subscriber_config: None,
100 }
101 }
102}
103
104impl SubscribeConfig {
105 pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
106 self.enable_multiple_subscriber = v;
107 self
108 }
109 pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
110 self.subscriber_config = Some(v);
111 self
112 }
113 pub fn with_channel_capacity(mut self, v: usize) -> Self {
114 self.channel_capacity = v;
115 self
116 }
117}
118
/// Target of a `Subscription::seek` call.
#[derive(Debug, Clone)]
pub enum SeekTo {
    /// Seek to a point in time.
    Timestamp(SystemTime),
    /// Seek to a snapshot. `Subscription::seek` passes the value through
    /// `fully_qualified_snapshot_name`, so a bare id (no `/`) is expanded to
    /// `<project>/snapshots/<id>`.
    Snapshot(String),
}
124
125impl From<SeekTo> for Target {
126 fn from(to: SeekTo) -> Target {
127 use SeekTo::*;
128 match to {
129 Timestamp(t) => Target::Time(prost_types::Timestamp::from(t)),
130 Snapshot(s) => Target::Snapshot(s),
131 }
132 }
133}
134
/// Stream of received messages returned by `Subscription::subscribe`.
///
/// Both fields are `Some` on construction and are taken out by `dispose`
/// (or, for `inner`, by `Drop`).
pub struct MessageStream {
    /// Receiving side of the channel the subscribers send messages into.
    inner: Option<ReceiverStream<ReceivedMessage>>,
    /// Subscribers spawned for this stream.
    subscribers: Option<Vec<Subscriber>>,
}
139
140impl MessageStream {
141 pub async fn dispose(mut self) -> usize {
142 let tasks = match self.subscribers.take() {
144 Some(t) => t,
145 None => return 0,
146 };
147 let mut inner = match self.inner.take() {
148 Some(t) => t,
149 None => return 0,
150 };
151 inner.close();
152 let mut unprocessed = 0;
153 while let Some(msg) = inner.next().await {
154 let result = msg.nack().await;
155 match result {
156 Ok(_) => unprocessed += 1,
157 Err(e) => tracing::error!("nack message error: {}, {:?}", msg.ack_id(), e),
158 }
159 }
160 tracing::debug!("unprocessed messages in the buffer: {}", unprocessed);
161
162 for task in tasks {
164 let nacked = task.dispose().await;
165 tracing::debug!("unprocessed messages in the subscriber: {}", nacked);
166 unprocessed += nacked;
167 }
168 unprocessed
169 }
170}
171
172impl Drop for MessageStream {
173 fn drop(&mut self) {
174 if self.subscribers.is_none() {
175 return;
176 }
177 let mut inner = match self.inner.take() {
178 Some(t) => t,
179 None => return,
180 };
181 inner.close();
182 tracing::warn!("Call 'dispose' before drop in order to call nack for remaining messages");
183
184 let _forget = tokio::spawn(async move {
185 let mut ack_ids = vec![];
186 let mut subscription = None;
187 let mut client = None;
188 while let Some(msg) = inner.next().await {
189 ack_ids.push(msg.ack_id().to_string());
190 if subscription.is_none() {
191 subscription = Some(msg.subscription.clone());
192 }
193 if client.is_none() {
194 client = Some(msg.subscriber_client.clone());
195 }
196 }
197 if let (Some(sub), Some(cli)) = (subscription, client) {
198 tracing::debug!("nack {} unprocessed messages", ack_ids.len());
199 if let Err(err) = nack(&cli, sub, ack_ids).await {
200 tracing::error!("failed to nack message: {:?}", err);
201 }
202 }
203 });
204 }
205}
206
207impl Stream for MessageStream {
208 type Item = ReceivedMessage;
209
210 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212 match &mut self.inner {
213 None => Poll::Ready(None),
214 Some(inner) => Pin::new(inner).poll_next(cx),
215 }
216 }
217}
218
/// Handle to a single Pub/Sub subscription.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// Fully qualified subscription name; the tests in this file use the form
    /// `projects/<project>/subscriptions/<id>`.
    fqsn: String,
    /// Client used for every subscriber RPC issued by this handle.
    subc: SubscriberClient,
}
225
226impl Subscription {
227 pub(crate) fn new(fqsn: String, subc: SubscriberClient) -> Self {
228 Self { fqsn, subc }
229 }
230
    /// Returns the streaming pool size reported by the underlying client.
    ///
    /// `subscribe` uses this as the number of subscribers to spawn when
    /// multiple subscribers are enabled.
    pub(crate) fn streaming_pool_size(&self) -> usize {
        self.subc.streaming_pool_size()
    }
234
235 pub fn id(&self) -> String {
237 self.fqsn
238 .rfind('/')
239 .map_or("".to_string(), |i| self.fqsn[(i + 1)..].to_string())
240 }
241
242 pub fn fully_qualified_name(&self) -> &str {
244 self.fqsn.as_str()
245 }
246
247 pub fn fully_qualified_snapshot_name(&self, id: &str) -> String {
249 if id.contains('/') {
250 id.to_string()
251 } else {
252 format!("{}/snapshots/{}", self.fully_qualified_project_name(), id)
253 }
254 }
255
256 fn fully_qualified_project_name(&self) -> String {
257 let parts: Vec<_> = self
258 .fqsn
259 .split('/')
260 .enumerate()
261 .filter(|&(i, _)| i < 2)
262 .map(|e| e.1)
263 .collect();
264 parts.join("/")
265 }
266
267 pub fn get_client(&self) -> SubscriberClient {
268 self.subc.clone()
269 }
270
271 pub async fn create(&self, fqtn: &str, cfg: SubscriptionConfig, retry: Option<RetrySetting>) -> Result<(), Status> {
273 self.subc
274 .create_subscription(
275 InternalSubscription {
276 name: self.fully_qualified_name().to_string(),
277 topic: fqtn.to_string(),
278 push_config: cfg.push_config,
279 bigquery_config: cfg.bigquery_config,
280 cloud_storage_config: cfg.cloud_storage_config,
281 ack_deadline_seconds: cfg.ack_deadline_seconds,
282 labels: cfg.labels,
283 enable_message_ordering: cfg.enable_message_ordering,
284 expiration_policy: cfg.expiration_policy,
285 filter: cfg.filter,
286 dead_letter_policy: cfg.dead_letter_policy,
287 retry_policy: cfg.retry_policy,
288 detached: cfg.detached,
289 message_retention_duration: cfg
290 .message_retention_duration
291 .map(Duration::try_into)
292 .transpose()
293 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
294 retain_acked_messages: cfg.retain_acked_messages,
295 topic_message_retention_duration: cfg
296 .topic_message_retention_duration
297 .map(Duration::try_into)
298 .transpose()
299 .map_err(|err: DurationError| Status::internal(err.to_string()))?,
300 enable_exactly_once_delivery: cfg.enable_exactly_once_delivery,
301 state: cfg.state,
302 analytics_hub_subscription_info: cfg.analytics_hub_subscription_info,
303 message_transforms: cfg.message_transforms,
304 },
305 retry,
306 )
307 .await
308 .map(|_v| ())
309 }
310
311 pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
313 let req = DeleteSubscriptionRequest {
314 subscription: self.fqsn.to_string(),
315 };
316 self.subc.delete_subscription(req, retry).await.map(|v| v.into_inner())
317 }
318
319 pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
321 let req = GetSubscriptionRequest {
322 subscription: self.fqsn.to_string(),
323 };
324 match self.subc.get_subscription(req, retry).await {
325 Ok(_) => Ok(true),
326 Err(e) => {
327 if e.code() == Code::NotFound {
328 Ok(false)
329 } else {
330 Err(e)
331 }
332 }
333 }
334 }
335
336 pub async fn config(&self, retry: Option<RetrySetting>) -> Result<(String, SubscriptionConfig), Status> {
338 let req = GetSubscriptionRequest {
339 subscription: self.fqsn.to_string(),
340 };
341 self.subc.get_subscription(req, retry).await.map(|v| {
342 let inner = v.into_inner();
343 (inner.topic.to_string(), inner.into())
344 })
345 }
346
347 pub async fn update(
350 &self,
351 updating: SubscriptionConfigToUpdate,
352 retry: Option<RetrySetting>,
353 ) -> Result<(String, SubscriptionConfig), Status> {
354 let req = GetSubscriptionRequest {
355 subscription: self.fqsn.to_string(),
356 };
357 let mut config = self.subc.get_subscription(req, retry.clone()).await?.into_inner();
358
359 let mut paths = vec![];
360 if updating.push_config.is_some() {
361 config.push_config = updating.push_config;
362 paths.push("push_config".to_string());
363 }
364 if updating.bigquery_config.is_some() {
365 config.bigquery_config = updating.bigquery_config;
366 paths.push("bigquery_config".to_string());
367 }
368 if let Some(v) = updating.ack_deadline_seconds {
369 config.ack_deadline_seconds = v;
370 paths.push("ack_deadline_seconds".to_string());
371 }
372 if let Some(v) = updating.retain_acked_messages {
373 config.retain_acked_messages = v;
374 paths.push("retain_acked_messages".to_string());
375 }
376 if updating.message_retention_duration.is_some() {
377 config.message_retention_duration = updating
378 .message_retention_duration
379 .map(prost_types::Duration::try_from)
380 .transpose()
381 .map_err(|err| Status::internal(err.to_string()))?;
382 paths.push("message_retention_duration".to_string());
383 }
384 if updating.expiration_policy.is_some() {
385 config.expiration_policy = updating.expiration_policy;
386 paths.push("expiration_policy".to_string());
387 }
388 if let Some(v) = updating.labels {
389 config.labels = v;
390 paths.push("labels".to_string());
391 }
392 if updating.retry_policy.is_some() {
393 config.retry_policy = updating.retry_policy;
394 paths.push("retry_policy".to_string());
395 }
396
397 let update_req = UpdateSubscriptionRequest {
398 subscription: Some(config),
399 update_mask: Some(FieldMask { paths }),
400 };
401 self.subc.update_subscription(update_req, retry).await.map(|v| {
402 let inner = v.into_inner();
403 (inner.topic.to_string(), inner.into())
404 })
405 }
406
407 pub async fn pull(&self, max_messages: i32, retry: Option<RetrySetting>) -> Result<Vec<ReceivedMessage>, Status> {
409 #[allow(deprecated)]
410 let req = PullRequest {
411 subscription: self.fqsn.clone(),
412 return_immediately: false,
413 max_messages,
414 };
415 let messages = self.subc.pull(req, retry).await?.into_inner().received_messages;
416 Ok(messages
417 .into_iter()
418 .filter(|m| m.message.is_some())
419 .map(|m| {
420 ReceivedMessage::new(
421 self.fqsn.clone(),
422 self.subc.clone(),
423 m.message.unwrap(),
424 m.ack_id,
425 (m.delivery_attempt > 0).then_some(m.delivery_attempt as usize),
426 )
427 })
428 .collect())
429 }
430
431 pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
462 let opt = opt.unwrap_or_default();
463 let (tx, rx) = mpsc::channel(opt.channel_capacity.max(1));
464 let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;
465
466 let subscriber_num = if opt.enable_multiple_subscriber {
468 self.streaming_pool_size()
469 } else {
470 1
471 };
472 let mut subscribers = Vec::with_capacity(subscriber_num);
473 for _ in 0..subscriber_num {
474 subscribers.push(Subscriber::spawn(
475 self.fqsn.clone(),
476 self.subc.clone(),
477 tx.clone(),
478 sub_opt.clone(),
479 ));
480 }
481
482 Ok(MessageStream {
483 inner: Some(ReceiverStream::new(rx)),
484 subscribers: Some(subscribers),
485 })
486 }
487
488 pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status> {
492 ack(&self.subc, self.fqsn.to_string(), ack_ids).await
493 }
494
495 pub async fn seek(&self, to: SeekTo, retry: Option<RetrySetting>) -> Result<(), Status> {
497 let to = match to {
498 SeekTo::Timestamp(t) => SeekTo::Timestamp(t),
499 SeekTo::Snapshot(name) => SeekTo::Snapshot(self.fully_qualified_snapshot_name(name.as_str())),
500 };
501
502 let req = SeekRequest {
503 subscription: self.fqsn.to_owned(),
504 target: Some(to.into()),
505 };
506
507 let _ = self.subc.seek(req, retry).await?;
508 Ok(())
509 }
510
511 pub async fn get_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<Snapshot, Status> {
513 let req = GetSnapshotRequest {
514 snapshot: self.fully_qualified_snapshot_name(name),
515 };
516 Ok(self.subc.get_snapshot(req, retry).await?.into_inner())
517 }
518
519 pub async fn create_snapshot(
529 &self,
530 name: &str,
531 labels: HashMap<String, String>,
532 retry: Option<RetrySetting>,
533 ) -> Result<Snapshot, Status> {
534 let req = CreateSnapshotRequest {
535 name: self.fully_qualified_snapshot_name(name),
536 labels,
537 subscription: self.fqsn.to_owned(),
538 };
539 Ok(self.subc.create_snapshot(req, retry).await?.into_inner())
540 }
541
542 pub async fn delete_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
544 let req = DeleteSnapshotRequest {
545 snapshot: self.fully_qualified_snapshot_name(name),
546 };
547 let _ = self.subc.delete_snapshot(req, retry).await?;
548 Ok(())
549 }
550
551 async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
552 if let Some(cfg) = cfg {
553 return Ok(cfg);
554 }
555 let cfg = self.config(None).await?;
556 let mut default_cfg = SubscriberConfig {
557 stream_ack_deadline_seconds: cfg.1.ack_deadline_seconds.clamp(10, 600),
558 ..Default::default()
559 };
560 if cfg.1.enable_exactly_once_delivery {
561 default_cfg.max_outstanding_messages = 5;
562 }
563 Ok(default_cfg)
564 }
565}
566
567#[cfg(test)]
568#[allow(deprecated)]
569mod tests {
570
571 use std::collections::HashMap;
572
573 use std::time::Duration;
574
575 use futures_util::StreamExt;
576 use serial_test::serial;
577 use tokio_util::sync::CancellationToken;
578
579 use uuid::Uuid;
580
581 use google_cloud_gax::conn::{ConnectionOptions, Environment};
582 use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage};
583
584 use crate::apiv1::conn_pool::ConnectionManager;
585 use crate::apiv1::publisher_client::PublisherClient;
586 use crate::apiv1::subscriber_client::SubscriberClient;
587 use crate::subscriber::ReceivedMessage;
588 use crate::subscription::{SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate};
589 use crate::topic::Topic;
590
    /// Project id used to build topic, subscription and snapshot names in these tests.
    const PROJECT_NAME: &str = "local-project";
    /// Address handed to `Environment::Emulator` by the helpers below;
    /// presumably a locally running Pub/Sub emulator.
    const EMULATOR: &str = "localhost:8681";
593
    /// Publishes three messages and checks that `pull(2, ..)` returns exactly
    /// two of them, then acks those and deletes the subscription.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_pull() {
        let (subscription, topic) = create_subscription(false, false).await;
        let base = PubsubMessage {
            data: "test_message".into(),
            ..Default::default()
        };
        publish(&topic, Some(vec![base.clone(), base.clone(), base])).await;
        let messages = subscription.pull(2, None).await.unwrap();
        assert_eq!(messages.len(), 2);
        for m in messages {
            m.ack().await.unwrap();
        }
        subscription.delete(None).await.unwrap();
    }
610
    /// Collects ack ids from a streaming subscriber and acknowledges them in
    /// one `Subscription::ack` call.
    ///
    /// One task forwards the ack id of every received message over a channel
    /// until cancelled; another gathers the ids until the channel closes and
    /// then acks them as a batch.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_batch_ack() {
        let ctx = CancellationToken::new();
        let (subscription, topic) = create_subscription(false, false).await;
        let (sender, receiver) = async_channel::unbounded();
        let subscription_for_receive = subscription.clone();
        let ctx_for_subscribe = ctx.clone();

        let subscriber = tokio::spawn(async move {
            let mut stream = subscription_for_receive.subscribe(None).await.unwrap();
            while let Some(message) = tokio::select! {
                v = stream.next() => v,
                _ = ctx_for_subscribe.cancelled() => None,
            } {
                let _ = sender.send(message.ack_id().to_string()).await;
            }
            stream.dispose().await;
            tracing::info!("finish subscriber task");
        });

        let ack_manager = tokio::spawn(async move {
            let mut ack_ids = Vec::new();
            // `recv` fails once the sender owned by the subscriber task is
            // dropped, which ends this loop.
            while let Ok(ack_id) = receiver.recv().await {
                tracing::info!("received ack_id: {}", ack_id);
                ack_ids.push(ack_id);
            }
            assert!(!ack_ids.is_empty());
            let _ = subscription.ack(ack_ids).await;
            tracing::info!("finish ack manager task");
        });

        let msg = PubsubMessage {
            data: "test".into(),
            ..Default::default()
        };
        let msg: Vec<PubsubMessage> = (0..10).map(|_v| msg.clone()).collect();
        publish(&topic, Some(msg)).await;
        // Give the subscriber time to receive the messages before cancelling.
        tokio::time::sleep(Duration::from_secs(10)).await;
        ctx.cancel();

        assert!(subscriber.await.is_ok());
        assert!(ack_manager.await.is_ok());
    }
655
    /// Exercises the snapshot lifecycle: create, get, delete, and the errors
    /// expected when getting or deleting a snapshot that no longer exists.
    #[tokio::test]
    #[serial]
    async fn test_snapshots() {
        let (subscription, _topic) = create_subscription(false, false).await;

        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
        let labels: HashMap<String, String> =
            HashMap::from_iter([("label-1".into(), "v1".into()), ("label-2".into(), "v2".into())]);
        let expected_fq_snap_name = format!("projects/{PROJECT_NAME}/snapshots/{snapshot_name}");

        // Best-effort cleanup in case the snapshot already exists; the result is ignored.
        let _response = subscription.delete_snapshot(snapshot_name.as_str(), None).await;

        let created_snapshot = subscription
            .create_snapshot(snapshot_name.as_str(), labels.clone(), None)
            .await
            .unwrap();

        // The bare id must have been expanded to the fully qualified name.
        assert_eq!(created_snapshot.name, expected_fq_snap_name);
        let retrieved_snapshot = subscription.get_snapshot(snapshot_name.as_str(), None).await.unwrap();
        assert_eq!(created_snapshot, retrieved_snapshot);

        subscription
            .delete_snapshot(snapshot_name.as_str(), None)
            .await
            .unwrap();

        let _deleted_snapshot_status = subscription
            .get_snapshot(snapshot_name.as_str(), None)
            .await
            .expect_err("snapshot should have been deleted");

        let _delete_again = subscription
            .delete_snapshot(snapshot_name.as_str(), None)
            .await
            .expect_err("snapshot should already be deleted");
    }
698
    /// Checks that seeking to a snapshot makes a message published and acked
    /// after the snapshot was taken available again.
    #[tokio::test]
    #[serial]
    async fn test_seek_snapshot() {
        let (subscription, topic) = create_subscription(false, false).await;
        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());

        // First message: published and acked before the snapshot is created.
        publish(&topic, None).await;
        let messages = subscription.pull(100, None).await.unwrap();
        ack_all(&messages).await;
        assert_eq!(messages.len(), 1);

        let _snapshot = subscription
            .create_snapshot(snapshot_name.as_str(), HashMap::new(), None)
            .await
            .unwrap();

        // Second message: published and acked after the snapshot.
        publish(&topic, None).await;
        let messages = subscription.pull(100, None).await.unwrap();
        assert_eq!(messages.len(), 1);
        ack_all(&messages).await;

        subscription
            .seek(SeekTo::Snapshot(snapshot_name.clone()), None)
            .await
            .unwrap();

        // After the seek exactly one message is expected to be pulled again.
        let messages = subscription.pull(100, None).await.unwrap();
        assert_eq!(messages.len(), 1);
        ack_all(&messages).await;

        subscription
            .delete_snapshot(snapshot_name.as_str(), None)
            .await
            .unwrap();
        subscription.delete(None).await.unwrap();
    }
741
    /// Checks that seeking to a message's publish time makes that already
    /// acked message available again.
    #[tokio::test]
    #[serial]
    async fn test_seek_timestamp() {
        let (subscription, topic) = create_subscription(false, false).await;

        // Retain acked messages (for two hours) so the seek below has
        // something to replay.
        subscription
            .update(
                SubscriptionConfigToUpdate {
                    retain_acked_messages: Some(true),
                    message_retention_duration: Some(Duration::new(60 * 60 * 2, 0)),
                    ..Default::default()
                },
                None,
            )
            .await
            .unwrap();

        publish(&topic, None).await;
        let messages = subscription.pull(100, None).await.unwrap();
        ack_all(&messages).await;
        assert_eq!(messages.len(), 1);

        let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();

        subscription
            .seek(SeekTo::Timestamp(message_publish_time.to_owned().try_into().unwrap()), None)
            .await
            .unwrap();

        // The same message (identified by its publish time) must come back.
        let messages = subscription.pull(100, None).await.unwrap();
        ack_all(&messages).await;
        assert_eq!(messages.len(), 1);
        let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
        assert_eq!(seek_message_publish_time, message_publish_time);

        subscription.delete(None).await.unwrap();
    }
784
    /// Runs `test_subscribe` over a matrix of subscribe options, exactly-once
    /// and ordering flags, published message counts and ack limits.
    ///
    /// Arguments of each call: `(opt, enable_exactly_once_delivery,
    /// enable_message_ordering, msg_count, limit)`.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_subscribe_pattern() {
        // Default options: single subscriber.
        let opt = Some(SubscribeConfig::default());
        test_subscribe(opt.clone(), true, true, 10, 11).await;
        test_subscribe(opt.clone(), false, true, 10, 11).await;
        test_subscribe(opt.clone(), true, false, 10, 10).await;
        test_subscribe(opt.clone(), false, false, 10, 10).await;
        test_subscribe(opt.clone(), true, true, 10, 5).await;
        test_subscribe(opt.clone(), false, true, 10, 5).await;
        test_subscribe(opt.clone(), true, false, 10, 1).await;
        test_subscribe(opt.clone(), false, false, 10, 1).await;
        test_subscribe(opt.clone(), true, true, 0, 0).await;
        test_subscribe(opt.clone(), false, true, 0, 0).await;

        // Multiple subscribers.
        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
        test_subscribe(opt.clone(), true, false, 10, 11).await;
        test_subscribe(opt.clone(), false, false, 10, 11).await;
        test_subscribe(opt.clone(), true, true, 10, 10).await;
        test_subscribe(opt.clone(), false, true, 10, 10).await;
        test_subscribe(opt.clone(), true, false, 10, 5).await;
        test_subscribe(opt.clone(), false, false, 10, 5).await;
        test_subscribe(opt.clone(), true, true, 10, 1).await;
        test_subscribe(opt.clone(), false, true, 10, 1).await;
        test_subscribe(opt.clone(), true, false, 0, 0).await;
        test_subscribe(opt.clone(), false, false, 0, 0).await;

        // Multiple subscribers with the smallest channel capacity.
        let opt = Some(
            SubscribeConfig::default()
                .with_enable_multiple_subscriber(true)
                .with_channel_capacity(1),
        );
        test_subscribe(opt.clone(), true, true, 10, 11).await;
        test_subscribe(opt.clone(), false, true, 10, 11).await;
        test_subscribe(opt.clone(), true, false, 10, 10).await;
        test_subscribe(opt.clone(), false, false, 10, 10).await;
        test_subscribe(opt.clone(), true, true, 10, 5).await;
        test_subscribe(opt.clone(), false, true, 10, 5).await;
        test_subscribe(opt.clone(), true, false, 10, 1).await;
        test_subscribe(opt.clone(), false, false, 10, 1).await;
        test_subscribe(opt.clone(), true, true, 0, 0).await;
        test_subscribe(opt.clone(), false, true, 0, 0).await;
    }
831
    /// Checks that messages are not lost when a `MessageStream` is dropped
    /// without calling `dispose`.
    ///
    /// Ten messages are published while a first stream is open; the stream is
    /// dropped, and a second subscriber is then expected to receive and ack
    /// all ten.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_subscribe_forget() {
        let (subscription, topic) = create_subscription(false, false).await;

        let iter = subscription.subscribe(None).await.unwrap();

        let msg = PubsubMessage {
            data: "test".into(),
            ordering_key: "order1".to_string(),
            ..Default::default()
        };
        let msg: Vec<PubsubMessage> = (0..10).map(|_v| msg.clone()).collect();
        publish(&topic, Some(msg)).await;
        tokio::time::sleep(Duration::from_secs(5)).await;

        // Drop without `dispose`, then wait for the background cleanup.
        drop(iter);
        tokio::time::sleep(Duration::from_secs(3)).await;

        let ctx = CancellationToken::new();
        let ctx_for_sub = ctx.clone();
        let subscriber = tokio::spawn(async move {
            let mut acked = 0;
            let mut iter = subscription.subscribe(None).await.unwrap();
            while let Some(message) = tokio::select! {
                v = iter.next() => v,
                _ = ctx_for_sub.cancelled() => None,
            } {
                let _ = message.ack().await;
                tracing::info!("acked {}", message.message.message_id);
                acked += 1;
            }
            // Everything was acked above, so nothing should be left to nack.
            let nack_msgs = iter.dispose().await;
            assert_eq!(nack_msgs, 0);
            tracing::info!("disposed");
            acked
        });

        tokio::time::sleep(Duration::from_secs(20)).await;
        ctx.cancel();
        let acked = subscriber.await.unwrap();
        assert_eq!(acked, 10);
    }
878
    /// Runs `test_subscribe_unavailable` for the default options, for multiple
    /// subscribers, and for multiple subscribers with a channel capacity of 1.
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_subscribe_finish_on_available() {
        test_subscribe_unavailable(SubscribeConfig::default()).await;

        let cfg = SubscribeConfig::default().with_enable_multiple_subscriber(true);
        test_subscribe_unavailable(cfg).await;

        let cfg = SubscribeConfig::default()
            .with_enable_multiple_subscriber(true)
            .with_channel_capacity(1);
        test_subscribe_unavailable(cfg).await;
    }
892
    /// Checks that a message stream ends once its subscription is deleted.
    ///
    /// The subscription is deleted from a background task after five seconds;
    /// the test panics if a `stream.next()` call has not completed within ten
    /// seconds.
    async fn test_subscribe_unavailable(cfg: SubscribeConfig) {
        let (subscription, _) = create_subscription(true, true).await;
        let subscription_for_delete = subscription.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(5)).await;
            subscription_for_delete.delete(None).await.unwrap();
        });
        let mut iter = subscription.subscribe(Some(cfg)).await.unwrap();
        while let Some(message) = tokio::select! {
            v = iter.next() => v,
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                panic!("test_subscribe_finish_on_available timeout");
            }
        } {
            message.ack().await.unwrap();
        }
        iter.dispose().await;
    }
911
    /// Publishes `msg_count` messages, acks up to `limit` of them from a
    /// message stream, and checks the counts.
    ///
    /// Expects `dispose` to report `msg_count - min(limit, msg_count)` nacked
    /// messages and the number of acked messages to be `min(limit, msg_count)`.
    /// The publisher task cancels the receive loop ten seconds after
    /// publishing, which ends the loop when fewer than `limit` messages arrive.
    async fn test_subscribe(
        opt: Option<SubscribeConfig>,
        enable_exactly_once_delivery: bool,
        enable_message_ordering: bool,
        msg_count: usize,
        limit: usize,
    ) {
        tracing::info!(
            "test_subscribe: exactly_once_delivery={} msg_count={} limit={}",
            enable_exactly_once_delivery,
            msg_count,
            limit
        );
        let (subscription, topic) = create_subscription(enable_exactly_once_delivery, enable_message_ordering).await;

        let ctx = CancellationToken::new();
        let ctx_for_pub = ctx.clone();

        let publisher = tokio::spawn(async move {
            let msg = PubsubMessage {
                data: "test".into(),
                ordering_key: "order1".to_string(),
                ..Default::default()
            };
            let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
            publish(&topic, Some(msg)).await;
            tokio::time::sleep(Duration::from_secs(10)).await;
            ctx_for_pub.cancel();
        });

        let mut acked = 0;
        let mut iter = subscription.subscribe(opt).await.unwrap();
        while let Some(message) = {
            tokio::select! {
                v = iter.next() => v,
                _ = ctx.cancelled() => None
            }
        } {
            let _ = message.ack().await;
            tracing::info!("acked {}", message.message.message_id);
            acked += 1;
            // Stop early so the remaining messages are left for `dispose` to nack.
            if acked >= limit {
                break;
            }
        }
        let nack_msgs = iter.dispose().await;
        assert_eq!(nack_msgs, msg_count - limit.min(msg_count));

        publisher.await.unwrap();
        tracing::info!("disposed");

        if limit > msg_count {
            assert_eq!(acked, msg_count);
        } else {
            assert_eq!(acked, limit);
        }
    }
972
973 async fn ack_all(messages: &[ReceivedMessage]) {
974 for message in messages.iter() {
975 message.ack().await.unwrap();
976 }
977 }
978
979 async fn create_subscription(
980 enable_exactly_once_delivery: bool,
981 enable_message_ordering: bool,
982 ) -> (Subscription, Topic) {
983 let cm = ConnectionManager::new(
984 4,
985 "",
986 &Environment::Emulator(EMULATOR.to_string()),
987 &ConnectionOptions::default(),
988 )
989 .await
990 .unwrap();
991 let cm2 = ConnectionManager::new(
992 4,
993 "",
994 &Environment::Emulator(EMULATOR.to_string()),
995 &ConnectionOptions::default(),
996 )
997 .await
998 .unwrap();
999 let cm3 = ConnectionManager::new(
1000 4,
1001 "",
1002 &Environment::Emulator(EMULATOR.to_string()),
1003 &ConnectionOptions::default(),
1004 )
1005 .await
1006 .unwrap();
1007 let sub_client = SubscriberClient::new(cm, cm2);
1008 let pub_client = PublisherClient::new(cm3);
1009 let uuid = Uuid::new_v4().hyphenated().to_string();
1010
1011 let topic_name = format!("projects/{}/topics/t{}", PROJECT_NAME, &uuid);
1012 let topic = Topic::new(topic_name.clone(), pub_client, sub_client.clone());
1013 topic.create(None, None).await.unwrap();
1014
1015 let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
1016 let subscription = Subscription::new(subscription_name, sub_client);
1017 let config = SubscriptionConfig {
1018 enable_exactly_once_delivery,
1019 enable_message_ordering,
1020 ..Default::default()
1021 };
1022 subscription.create(topic_name.as_str(), config, None).await.unwrap();
1023 (subscription, topic)
1024 }
1025
1026 async fn publish(topic: &Topic, messages: Option<Vec<PubsubMessage>>) {
1027 let pubc = PublisherClient::new(
1028 ConnectionManager::new(
1029 4,
1030 "",
1031 &Environment::Emulator(EMULATOR.to_string()),
1032 &ConnectionOptions::default(),
1033 )
1034 .await
1035 .unwrap(),
1036 );
1037 let messages = messages.unwrap_or(vec![PubsubMessage {
1038 data: "test_message".into(),
1039 ..Default::default()
1040 }]);
1041 let req = PublishRequest {
1042 topic: topic.fully_qualified_name().to_string(),
1043 messages,
1044 };
1045 let _ = pubc.publish(req, None).await;
1046 }
1047}