Skip to main content

gcloud_pubsub/
subscription.rs

1use prost_types::{DurationError, FieldMask};
2use std::collections::HashMap;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, SystemTime};
6
7use google_cloud_gax::grpc::codegen::tokio_stream::{Stream, StreamExt};
8use google_cloud_gax::grpc::{Code, Status};
9use google_cloud_gax::retry::RetrySetting;
10use google_cloud_googleapis::pubsub::v1::seek_request::Target;
11use google_cloud_googleapis::pubsub::v1::subscription::AnalyticsHubSubscriptionInfo;
12use google_cloud_googleapis::pubsub::v1::{
13    BigQueryConfig, CloudStorageConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest,
14    DeleteSubscriptionRequest, ExpirationPolicy, GetSnapshotRequest, GetSubscriptionRequest, MessageTransform,
15    PullRequest, PushConfig, RetryPolicy, SeekRequest, Snapshot, Subscription as InternalSubscription,
16    UpdateSubscriptionRequest,
17};
18
19use crate::apiv1::subscriber_client::SubscriberClient;
20use crate::subscriber::{ack, nack, ReceivedMessage, Subscriber, SubscriberConfig};
21use google_cloud_gax::grpc::codegen::tokio_stream::wrappers::ReceiverStream;
22use tokio::sync::mpsc;
23
24#[derive(Debug, Clone, Default)]
25pub struct SubscriptionConfig {
26    pub push_config: Option<PushConfig>,
27    pub ack_deadline_seconds: i32,
28    pub retain_acked_messages: bool,
29    pub message_retention_duration: Option<Duration>,
30    pub labels: HashMap<String, String>,
31    pub enable_message_ordering: bool,
32    pub expiration_policy: Option<ExpirationPolicy>,
33    pub filter: String,
34    pub dead_letter_policy: Option<DeadLetterPolicy>,
35    pub retry_policy: Option<RetryPolicy>,
36    pub detached: bool,
37    pub topic_message_retention_duration: Option<Duration>,
38    pub enable_exactly_once_delivery: bool,
39    pub bigquery_config: Option<BigQueryConfig>,
40    pub state: i32,
41    pub cloud_storage_config: Option<CloudStorageConfig>,
42    pub analytics_hub_subscription_info: Option<AnalyticsHubSubscriptionInfo>,
43    pub message_transforms: Vec<MessageTransform>,
44}
45impl From<InternalSubscription> for SubscriptionConfig {
46    fn from(f: InternalSubscription) -> Self {
47        Self {
48            push_config: f.push_config,
49            bigquery_config: f.bigquery_config,
50            ack_deadline_seconds: f.ack_deadline_seconds,
51            retain_acked_messages: f.retain_acked_messages,
52            message_retention_duration: f
53                .message_retention_duration
54                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
55            labels: f.labels,
56            enable_message_ordering: f.enable_message_ordering,
57            expiration_policy: f.expiration_policy,
58            filter: f.filter,
59            dead_letter_policy: f.dead_letter_policy,
60            retry_policy: f.retry_policy,
61            detached: f.detached,
62            topic_message_retention_duration: f
63                .topic_message_retention_duration
64                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
65            enable_exactly_once_delivery: f.enable_exactly_once_delivery,
66            state: f.state,
67            cloud_storage_config: f.cloud_storage_config,
68            analytics_hub_subscription_info: f.analytics_hub_subscription_info,
69            message_transforms: f.message_transforms,
70        }
71    }
72}
73
74#[derive(Debug, Clone, Default)]
75pub struct SubscriptionConfigToUpdate {
76    pub push_config: Option<PushConfig>,
77    pub bigquery_config: Option<BigQueryConfig>,
78    pub ack_deadline_seconds: Option<i32>,
79    pub retain_acked_messages: Option<bool>,
80    pub message_retention_duration: Option<Duration>,
81    pub labels: Option<HashMap<String, String>>,
82    pub expiration_policy: Option<ExpirationPolicy>,
83    pub dead_letter_policy: Option<DeadLetterPolicy>,
84    pub retry_policy: Option<RetryPolicy>,
85}
86
87#[derive(Debug, Clone)]
88pub struct SubscribeConfig {
89    enable_multiple_subscriber: bool,
90    channel_capacity: usize,
91    subscriber_config: Option<SubscriberConfig>,
92}
93
94impl Default for SubscribeConfig {
95    fn default() -> Self {
96        Self {
97            enable_multiple_subscriber: false,
98            channel_capacity: 1000,
99            subscriber_config: None,
100        }
101    }
102}
103
104impl SubscribeConfig {
105    pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
106        self.enable_multiple_subscriber = v;
107        self
108    }
109    pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
110        self.subscriber_config = Some(v);
111        self
112    }
113    pub fn with_channel_capacity(mut self, v: usize) -> Self {
114        self.channel_capacity = v;
115        self
116    }
117}
118
119#[derive(Debug, Clone)]
120pub enum SeekTo {
121    Timestamp(SystemTime),
122    Snapshot(String),
123}
124
125impl From<SeekTo> for Target {
126    fn from(to: SeekTo) -> Target {
127        use SeekTo::*;
128        match to {
129            Timestamp(t) => Target::Time(prost_types::Timestamp::from(t)),
130            Snapshot(s) => Target::Snapshot(s),
131        }
132    }
133}
134
135pub struct MessageStream {
136    inner: Option<ReceiverStream<ReceivedMessage>>,
137    subscribers: Option<Vec<Subscriber>>,
138}
139
140impl MessageStream {
141    pub async fn dispose(mut self) -> usize {
142        // dispose buffer
143        let tasks = match self.subscribers.take() {
144            Some(t) => t,
145            None => return 0,
146        };
147        let mut inner = match self.inner.take() {
148            Some(t) => t,
149            None => return 0,
150        };
151        inner.close();
152        let mut unprocessed = 0;
153        while let Some(msg) = inner.next().await {
154            let result = msg.nack().await;
155            match result {
156                Ok(_) => unprocessed += 1,
157                Err(e) => tracing::error!("nack message error: {}, {:?}", msg.ack_id(), e),
158            }
159        }
160        tracing::debug!("unprocessed messages in the buffer: {}", unprocessed);
161
162        // stop all the subscribers
163        for task in tasks {
164            let nacked = task.dispose().await;
165            tracing::debug!("unprocessed messages in the subscriber: {}", nacked);
166            unprocessed += nacked;
167        }
168        unprocessed
169    }
170}
171
172impl Drop for MessageStream {
173    fn drop(&mut self) {
174        if self.subscribers.is_none() {
175            return;
176        }
177        let mut inner = match self.inner.take() {
178            Some(t) => t,
179            None => return,
180        };
181        inner.close();
182        tracing::warn!("Call 'dispose' before drop in order to call nack for remaining messages");
183
184        let _forget = tokio::spawn(async move {
185            let mut ack_ids = vec![];
186            let mut subscription = None;
187            let mut client = None;
188            while let Some(msg) = inner.next().await {
189                ack_ids.push(msg.ack_id().to_string());
190                if subscription.is_none() {
191                    subscription = Some(msg.subscription.clone());
192                }
193                if client.is_none() {
194                    client = Some(msg.subscriber_client.clone());
195                }
196            }
197            if let (Some(sub), Some(cli)) = (subscription, client) {
198                tracing::debug!("nack {} unprocessed messages", ack_ids.len());
199                if let Err(err) = nack(&cli, sub, ack_ids).await {
200                    tracing::error!("failed to nack message: {:?}", err);
201                }
202            }
203        });
204    }
205}
206
207impl Stream for MessageStream {
208    type Item = ReceivedMessage;
209
210    // return None when all the subscribers are stopped and the queue is empty.
211    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212        match &mut self.inner {
213            None => Poll::Ready(None),
214            Some(inner) => Pin::new(inner).poll_next(cx),
215        }
216    }
217}
218
219/// Subscription is a reference to a PubSub subscription.
220#[derive(Clone, Debug)]
221pub struct Subscription {
222    fqsn: String,
223    subc: SubscriberClient,
224}
225
226impl Subscription {
227    pub(crate) fn new(fqsn: String, subc: SubscriberClient) -> Self {
228        Self { fqsn, subc }
229    }
230
231    pub(crate) fn streaming_pool_size(&self) -> usize {
232        self.subc.streaming_pool_size()
233    }
234
235    /// id returns the unique identifier of the subscription within its project.
236    pub fn id(&self) -> String {
237        self.fqsn
238            .rfind('/')
239            .map_or("".to_string(), |i| self.fqsn[(i + 1)..].to_string())
240    }
241
242    /// fully_qualified_name returns the globally unique printable name of the subscription.
243    pub fn fully_qualified_name(&self) -> &str {
244        self.fqsn.as_str()
245    }
246
247    /// fully_qualified_snapshot_name returns the globally unique printable name of the snapshot.
248    pub fn fully_qualified_snapshot_name(&self, id: &str) -> String {
249        if id.contains('/') {
250            id.to_string()
251        } else {
252            format!("{}/snapshots/{}", self.fully_qualified_project_name(), id)
253        }
254    }
255
256    fn fully_qualified_project_name(&self) -> String {
257        let parts: Vec<_> = self
258            .fqsn
259            .split('/')
260            .enumerate()
261            .filter(|&(i, _)| i < 2)
262            .map(|e| e.1)
263            .collect();
264        parts.join("/")
265    }
266
267    pub fn get_client(&self) -> SubscriberClient {
268        self.subc.clone()
269    }
270
271    /// create creates the subscription.
272    pub async fn create(&self, fqtn: &str, cfg: SubscriptionConfig, retry: Option<RetrySetting>) -> Result<(), Status> {
273        self.subc
274            .create_subscription(
275                InternalSubscription {
276                    name: self.fully_qualified_name().to_string(),
277                    topic: fqtn.to_string(),
278                    push_config: cfg.push_config,
279                    bigquery_config: cfg.bigquery_config,
280                    cloud_storage_config: cfg.cloud_storage_config,
281                    ack_deadline_seconds: cfg.ack_deadline_seconds,
282                    labels: cfg.labels,
283                    enable_message_ordering: cfg.enable_message_ordering,
284                    expiration_policy: cfg.expiration_policy,
285                    filter: cfg.filter,
286                    dead_letter_policy: cfg.dead_letter_policy,
287                    retry_policy: cfg.retry_policy,
288                    detached: cfg.detached,
289                    message_retention_duration: cfg
290                        .message_retention_duration
291                        .map(Duration::try_into)
292                        .transpose()
293                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
294                    retain_acked_messages: cfg.retain_acked_messages,
295                    topic_message_retention_duration: cfg
296                        .topic_message_retention_duration
297                        .map(Duration::try_into)
298                        .transpose()
299                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
300                    enable_exactly_once_delivery: cfg.enable_exactly_once_delivery,
301                    state: cfg.state,
302                    analytics_hub_subscription_info: cfg.analytics_hub_subscription_info,
303                    message_transforms: cfg.message_transforms,
304                },
305                retry,
306            )
307            .await
308            .map(|_v| ())
309    }
310
311    /// delete deletes the subscription.
312    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
313        let req = DeleteSubscriptionRequest {
314            subscription: self.fqsn.to_string(),
315        };
316        self.subc.delete_subscription(req, retry).await.map(|v| v.into_inner())
317    }
318
319    /// exists reports whether the subscription exists on the server.
320    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
321        let req = GetSubscriptionRequest {
322            subscription: self.fqsn.to_string(),
323        };
324        match self.subc.get_subscription(req, retry).await {
325            Ok(_) => Ok(true),
326            Err(e) => {
327                if e.code() == Code::NotFound {
328                    Ok(false)
329                } else {
330                    Err(e)
331                }
332            }
333        }
334    }
335
336    /// config fetches the current configuration for the subscription.
337    pub async fn config(&self, retry: Option<RetrySetting>) -> Result<(String, SubscriptionConfig), Status> {
338        let req = GetSubscriptionRequest {
339            subscription: self.fqsn.to_string(),
340        };
341        self.subc.get_subscription(req, retry).await.map(|v| {
342            let inner = v.into_inner();
343            (inner.topic.to_string(), inner.into())
344        })
345    }
346
347    /// update changes an existing subscription according to the fields set in updating.
348    /// It returns the new SubscriptionConfig.
349    pub async fn update(
350        &self,
351        updating: SubscriptionConfigToUpdate,
352        retry: Option<RetrySetting>,
353    ) -> Result<(String, SubscriptionConfig), Status> {
354        let req = GetSubscriptionRequest {
355            subscription: self.fqsn.to_string(),
356        };
357        let mut config = self.subc.get_subscription(req, retry.clone()).await?.into_inner();
358
359        let mut paths = vec![];
360        if updating.push_config.is_some() {
361            config.push_config = updating.push_config;
362            paths.push("push_config".to_string());
363        }
364        if updating.bigquery_config.is_some() {
365            config.bigquery_config = updating.bigquery_config;
366            paths.push("bigquery_config".to_string());
367        }
368        if let Some(v) = updating.ack_deadline_seconds {
369            config.ack_deadline_seconds = v;
370            paths.push("ack_deadline_seconds".to_string());
371        }
372        if let Some(v) = updating.retain_acked_messages {
373            config.retain_acked_messages = v;
374            paths.push("retain_acked_messages".to_string());
375        }
376        if updating.message_retention_duration.is_some() {
377            config.message_retention_duration = updating
378                .message_retention_duration
379                .map(prost_types::Duration::try_from)
380                .transpose()
381                .map_err(|err| Status::internal(err.to_string()))?;
382            paths.push("message_retention_duration".to_string());
383        }
384        if updating.expiration_policy.is_some() {
385            config.expiration_policy = updating.expiration_policy;
386            paths.push("expiration_policy".to_string());
387        }
388        if let Some(v) = updating.labels {
389            config.labels = v;
390            paths.push("labels".to_string());
391        }
392        if updating.retry_policy.is_some() {
393            config.retry_policy = updating.retry_policy;
394            paths.push("retry_policy".to_string());
395        }
396
397        let update_req = UpdateSubscriptionRequest {
398            subscription: Some(config),
399            update_mask: Some(FieldMask { paths }),
400        };
401        self.subc.update_subscription(update_req, retry).await.map(|v| {
402            let inner = v.into_inner();
403            (inner.topic.to_string(), inner.into())
404        })
405    }
406
407    /// pull pulls messages from the server.
408    pub async fn pull(&self, max_messages: i32, retry: Option<RetrySetting>) -> Result<Vec<ReceivedMessage>, Status> {
409        #[allow(deprecated)]
410        let req = PullRequest {
411            subscription: self.fqsn.clone(),
412            return_immediately: false,
413            max_messages,
414        };
415        let messages = self.subc.pull(req, retry).await?.into_inner().received_messages;
416        Ok(messages
417            .into_iter()
418            .filter(|m| m.message.is_some())
419            .map(|m| {
420                ReceivedMessage::new(
421                    self.fqsn.clone(),
422                    self.subc.clone(),
423                    m.message.unwrap(),
424                    m.ack_id,
425                    (m.delivery_attempt > 0).then_some(m.delivery_attempt as usize),
426                )
427            })
428            .collect())
429    }
430
431    /// Subscribes to a Pub/Sub subscription and creates a `MessageStream` for consuming messages.
432    ///
433    /// This method initializes a message stream by setting up the necessary channel and spawning
434    /// subscriber tasks based on the provided configuration. It supports multiple subscribers and
435    /// configurable channel capacity.
436    ///
437    /// ```
438    /// use google_cloud_gax::grpc::Status;
439    /// use google_cloud_pubsub::subscription::{SubscribeConfig, Subscription};
440    /// use futures_util::StreamExt;
441    /// use tokio::select;
442    /// use tokio_util::sync::CancellationToken;
443    ///
444    /// async fn run(ctx: CancellationToken, subscription: Subscription) -> Result<(), Status> {
445    ///     // Start receiving messages from the subscription.
446    ///     let mut iter = subscription.subscribe(None).await?;
447    ///     // Get buffered messages.
448    ///     // To close safely, use a CancellationToken or to signal shutdown.
449    ///     while let Some(message) = tokio::select!{
450    ///         v = iter.next() => v,
451    ///         _ = ctx.cancelled() => None,
452    ///     } {
453    ///         let _ = message.ack().await;
454    ///     }
455    ///     // Wait for all the unprocessed messages to be Nack.
456    ///     // If you don't call dispose, the unprocessed messages will be Nack when the iterator is dropped.
457    ///     iter.dispose().await;
458    ///     Ok(())
459    ///  }
460    /// ```
461    pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
462        let opt = opt.unwrap_or_default();
463        let (tx, rx) = mpsc::channel(opt.channel_capacity.max(1));
464        let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;
465
466        // spawn a separate subscriber task for each connection in the pool
467        let subscriber_num = if opt.enable_multiple_subscriber {
468            self.streaming_pool_size()
469        } else {
470            1
471        };
472        let mut subscribers = Vec::with_capacity(subscriber_num);
473        for _ in 0..subscriber_num {
474            subscribers.push(Subscriber::spawn(
475                self.fqsn.clone(),
476                self.subc.clone(),
477                tx.clone(),
478                sub_opt.clone(),
479            ));
480        }
481
482        Ok(MessageStream {
483            inner: Some(ReceiverStream::new(rx)),
484            subscribers: Some(subscribers),
485        })
486    }
487
488    /// Ack acknowledges the messages associated with the ack_ids in the AcknowledgeRequest.
489    /// The Pub/Sub system can remove the relevant messages from the subscription.
490    /// This method is for batch ack.
491    pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status> {
492        ack(&self.subc, self.fqsn.to_string(), ack_ids).await
493    }
494
495    /// seek seeks the subscription a past timestamp or a saved snapshot.
496    pub async fn seek(&self, to: SeekTo, retry: Option<RetrySetting>) -> Result<(), Status> {
497        let to = match to {
498            SeekTo::Timestamp(t) => SeekTo::Timestamp(t),
499            SeekTo::Snapshot(name) => SeekTo::Snapshot(self.fully_qualified_snapshot_name(name.as_str())),
500        };
501
502        let req = SeekRequest {
503            subscription: self.fqsn.to_owned(),
504            target: Some(to.into()),
505        };
506
507        let _ = self.subc.seek(req, retry).await?;
508        Ok(())
509    }
510
511    /// get_snapshot fetches an existing pubsub snapshot.
512    pub async fn get_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<Snapshot, Status> {
513        let req = GetSnapshotRequest {
514            snapshot: self.fully_qualified_snapshot_name(name),
515        };
516        Ok(self.subc.get_snapshot(req, retry).await?.into_inner())
517    }
518
519    /// create_snapshot creates a new pubsub snapshot from the subscription's state at the time of calling.
520    /// The snapshot retains the messages for the topic the subscription is subscribed to, with the acknowledgment
521    /// states consistent with the subscriptions.
522    /// The created snapshot is guaranteed to retain:
523    /// - The message backlog on the subscription -- or to be specific, messages that are unacknowledged
524    ///   at the time of the subscription's creation.
525    /// - All messages published to the subscription's topic after the snapshot's creation.
526    ///   Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which
527    ///   they are discarded and any messages being retained solely due to the snapshot dropped.
528    pub async fn create_snapshot(
529        &self,
530        name: &str,
531        labels: HashMap<String, String>,
532        retry: Option<RetrySetting>,
533    ) -> Result<Snapshot, Status> {
534        let req = CreateSnapshotRequest {
535            name: self.fully_qualified_snapshot_name(name),
536            labels,
537            subscription: self.fqsn.to_owned(),
538        };
539        Ok(self.subc.create_snapshot(req, retry).await?.into_inner())
540    }
541
542    /// delete_snapshot deletes an existing pubsub snapshot.
543    pub async fn delete_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
544        let req = DeleteSnapshotRequest {
545            snapshot: self.fully_qualified_snapshot_name(name),
546        };
547        let _ = self.subc.delete_snapshot(req, retry).await?;
548        Ok(())
549    }
550
551    async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
552        if let Some(cfg) = cfg {
553            return Ok(cfg);
554        }
555        let cfg = self.config(None).await?;
556        let mut default_cfg = SubscriberConfig {
557            stream_ack_deadline_seconds: cfg.1.ack_deadline_seconds.clamp(10, 600),
558            ..Default::default()
559        };
560        if cfg.1.enable_exactly_once_delivery {
561            default_cfg.max_outstanding_messages = 5;
562        }
563        Ok(default_cfg)
564    }
565}
566
567#[cfg(test)]
568#[allow(deprecated)]
569mod tests {
570
571    use std::collections::HashMap;
572
573    use std::time::Duration;
574
575    use futures_util::StreamExt;
576    use serial_test::serial;
577    use tokio_util::sync::CancellationToken;
578
579    use uuid::Uuid;
580
581    use google_cloud_gax::conn::{ConnectionOptions, Environment};
582    use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage};
583
584    use crate::apiv1::conn_pool::ConnectionManager;
585    use crate::apiv1::publisher_client::PublisherClient;
586    use crate::apiv1::subscriber_client::SubscriberClient;
587    use crate::subscriber::ReceivedMessage;
588    use crate::subscription::{SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate};
589    use crate::topic::Topic;
590
591    const PROJECT_NAME: &str = "local-project";
592    const EMULATOR: &str = "localhost:8681";
593
594    #[tokio::test(flavor = "multi_thread")]
595    #[serial]
596    async fn test_pull() {
597        let (subscription, topic) = create_subscription(false, false).await;
598        let base = PubsubMessage {
599            data: "test_message".into(),
600            ..Default::default()
601        };
602        publish(&topic, Some(vec![base.clone(), base.clone(), base])).await;
603        let messages = subscription.pull(2, None).await.unwrap();
604        assert_eq!(messages.len(), 2);
605        for m in messages {
606            m.ack().await.unwrap();
607        }
608        subscription.delete(None).await.unwrap();
609    }
610
611    #[tokio::test(flavor = "multi_thread")]
612    #[serial]
613    async fn test_batch_ack() {
614        let ctx = CancellationToken::new();
615        let (subscription, topic) = create_subscription(false, false).await;
616        let (sender, receiver) = async_channel::unbounded();
617        let subscription_for_receive = subscription.clone();
618        let ctx_for_subscribe = ctx.clone();
619
620        let subscriber = tokio::spawn(async move {
621            let mut stream = subscription_for_receive.subscribe(None).await.unwrap();
622            while let Some(message) = tokio::select! {
623                v = stream.next() => v,
624                _ = ctx_for_subscribe.cancelled() => None,
625            } {
626                let _ = sender.send(message.ack_id().to_string()).await;
627            }
628            stream.dispose().await;
629            tracing::info!("finish subscriber task");
630        });
631
632        let ack_manager = tokio::spawn(async move {
633            let mut ack_ids = Vec::new();
634            while let Ok(ack_id) = receiver.recv().await {
635                tracing::info!("received ack_id: {}", ack_id);
636                ack_ids.push(ack_id);
637            }
638            assert!(!ack_ids.is_empty());
639            let _ = subscription.ack(ack_ids).await;
640            tracing::info!("finish ack manager task");
641        });
642
643        let msg = PubsubMessage {
644            data: "test".into(),
645            ..Default::default()
646        };
647        let msg: Vec<PubsubMessage> = (0..10).map(|_v| msg.clone()).collect();
648        publish(&topic, Some(msg)).await;
649        tokio::time::sleep(Duration::from_secs(10)).await;
650        ctx.cancel();
651
652        assert!(subscriber.await.is_ok());
653        assert!(ack_manager.await.is_ok());
654    }
655
656    #[tokio::test]
657    #[serial]
658    async fn test_snapshots() {
659        let (subscription, _topic) = create_subscription(false, false).await;
660
661        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
662        let labels: HashMap<String, String> =
663            HashMap::from_iter([("label-1".into(), "v1".into()), ("label-2".into(), "v2".into())]);
664        let expected_fq_snap_name = format!("projects/{PROJECT_NAME}/snapshots/{snapshot_name}");
665
666        // cleanup; TODO: remove?
667        let _response = subscription.delete_snapshot(snapshot_name.as_str(), None).await;
668
669        // create
670        let created_snapshot = subscription
671            .create_snapshot(snapshot_name.as_str(), labels.clone(), None)
672            .await
673            .unwrap();
674
675        assert_eq!(created_snapshot.name, expected_fq_snap_name);
676        // NOTE: we don't assert the labels due to lack of label support in the pubsub emulator.
677
678        // get
679        let retrieved_snapshot = subscription.get_snapshot(snapshot_name.as_str(), None).await.unwrap();
680        assert_eq!(created_snapshot, retrieved_snapshot);
681
682        // delete
683        subscription
684            .delete_snapshot(snapshot_name.as_str(), None)
685            .await
686            .unwrap();
687
688        let _deleted_snapshot_status = subscription
689            .get_snapshot(snapshot_name.as_str(), None)
690            .await
691            .expect_err("snapshot should have been deleted");
692
693        let _delete_again = subscription
694            .delete_snapshot(snapshot_name.as_str(), None)
695            .await
696            .expect_err("snapshot should already be deleted");
697    }
698
699    #[tokio::test]
700    #[serial]
701    async fn test_seek_snapshot() {
702        let (subscription, topic) = create_subscription(false, false).await;
703        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
704
705        // publish and receive a message
706        publish(&topic, None).await;
707        let messages = subscription.pull(100, None).await.unwrap();
708        ack_all(&messages).await;
709        assert_eq!(messages.len(), 1);
710
711        // snapshot at received = 1
712        let _snapshot = subscription
713            .create_snapshot(snapshot_name.as_str(), HashMap::new(), None)
714            .await
715            .unwrap();
716
717        // publish and receive another message
718        publish(&topic, None).await;
719        let messages = subscription.pull(100, None).await.unwrap();
720        assert_eq!(messages.len(), 1);
721        ack_all(&messages).await;
722
723        // rewind to snapshot at received = 1
724        subscription
725            .seek(SeekTo::Snapshot(snapshot_name.clone()), None)
726            .await
727            .unwrap();
728
729        // assert we receive the 1 message we should receive again
730        let messages = subscription.pull(100, None).await.unwrap();
731        assert_eq!(messages.len(), 1);
732        ack_all(&messages).await;
733
734        // cleanup
735        subscription
736            .delete_snapshot(snapshot_name.as_str(), None)
737            .await
738            .unwrap();
739        subscription.delete(None).await.unwrap();
740    }
741
742    #[tokio::test]
743    #[serial]
744    async fn test_seek_timestamp() {
745        let (subscription, topic) = create_subscription(false, false).await;
746
747        // enable acked message retention on subscription -- required for timestamp-based seeks
748        subscription
749            .update(
750                SubscriptionConfigToUpdate {
751                    retain_acked_messages: Some(true),
752                    message_retention_duration: Some(Duration::new(60 * 60 * 2, 0)),
753                    ..Default::default()
754                },
755                None,
756            )
757            .await
758            .unwrap();
759
760        // publish and receive a message
761        publish(&topic, None).await;
762        let messages = subscription.pull(100, None).await.unwrap();
763        ack_all(&messages).await;
764        assert_eq!(messages.len(), 1);
765
766        let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
767
768        // rewind to a timestamp where message was just published
769        subscription
770            .seek(SeekTo::Timestamp(message_publish_time.to_owned().try_into().unwrap()), None)
771            .await
772            .unwrap();
773
774        // consume -- should receive the first message again
775        let messages = subscription.pull(100, None).await.unwrap();
776        ack_all(&messages).await;
777        assert_eq!(messages.len(), 1);
778        let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
779        assert_eq!(seek_message_publish_time, message_publish_time);
780
781        // cleanup
782        subscription.delete(None).await.unwrap();
783    }
784
785    #[tokio::test(flavor = "multi_thread")]
786    #[serial]
787    async fn test_subscribe_pattern() {
788        // default
789        let opt = Some(SubscribeConfig::default());
790        test_subscribe(opt.clone(), true, true, 10, 11).await;
791        test_subscribe(opt.clone(), false, true, 10, 11).await;
792        test_subscribe(opt.clone(), true, false, 10, 10).await;
793        test_subscribe(opt.clone(), false, false, 10, 10).await;
794        test_subscribe(opt.clone(), true, true, 10, 5).await;
795        test_subscribe(opt.clone(), false, true, 10, 5).await;
796        test_subscribe(opt.clone(), true, false, 10, 1).await;
797        test_subscribe(opt.clone(), false, false, 10, 1).await;
798        test_subscribe(opt.clone(), true, true, 0, 0).await;
799        test_subscribe(opt.clone(), false, true, 0, 0).await;
800
801        // with multiple subscribers
802        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
803        test_subscribe(opt.clone(), true, false, 10, 11).await;
804        test_subscribe(opt.clone(), false, false, 10, 11).await;
805        test_subscribe(opt.clone(), true, true, 10, 10).await;
806        test_subscribe(opt.clone(), false, true, 10, 10).await;
807        test_subscribe(opt.clone(), true, false, 10, 5).await;
808        test_subscribe(opt.clone(), false, false, 10, 5).await;
809        test_subscribe(opt.clone(), true, true, 10, 1).await;
810        test_subscribe(opt.clone(), false, true, 10, 1).await;
811        test_subscribe(opt.clone(), true, false, 0, 0).await;
812        test_subscribe(opt.clone(), false, false, 0, 0).await;
813
814        // with multiple subscribers and channel capacity
815        let opt = Some(
816            SubscribeConfig::default()
817                .with_enable_multiple_subscriber(true)
818                .with_channel_capacity(1),
819        );
820        test_subscribe(opt.clone(), true, true, 10, 11).await;
821        test_subscribe(opt.clone(), false, true, 10, 11).await;
822        test_subscribe(opt.clone(), true, false, 10, 10).await;
823        test_subscribe(opt.clone(), false, false, 10, 10).await;
824        test_subscribe(opt.clone(), true, true, 10, 5).await;
825        test_subscribe(opt.clone(), false, true, 10, 5).await;
826        test_subscribe(opt.clone(), true, false, 10, 1).await;
827        test_subscribe(opt.clone(), false, false, 10, 1).await;
828        test_subscribe(opt.clone(), true, true, 0, 0).await;
829        test_subscribe(opt.clone(), false, true, 0, 0).await;
830    }
831
832    #[tokio::test(flavor = "multi_thread")]
833    #[serial]
834    async fn test_subscribe_forget() {
835        let (subscription, topic) = create_subscription(false, false).await;
836
837        // for all nack
838        let iter = subscription.subscribe(None).await.unwrap();
839
840        let msg = PubsubMessage {
841            data: "test".into(),
842            ordering_key: "order1".to_string(),
843            ..Default::default()
844        };
845        let msg: Vec<PubsubMessage> = (0..10).map(|_v| msg.clone()).collect();
846        publish(&topic, Some(msg)).await;
847        tokio::time::sleep(Duration::from_secs(5)).await;
848
849        // spawn nack task
850        drop(iter);
851        tokio::time::sleep(Duration::from_secs(3)).await;
852
853        // ensure all the messages should be redelivered
854        let ctx = CancellationToken::new();
855        let ctx_for_sub = ctx.clone();
856        let subscriber = tokio::spawn(async move {
857            let mut acked = 0;
858            let mut iter = subscription.subscribe(None).await.unwrap();
859            while let Some(message) = tokio::select! {
860                v = iter.next() => v,
861                _ = ctx_for_sub.cancelled() => None,
862            } {
863                let _ = message.ack().await;
864                tracing::info!("acked {}", message.message.message_id);
865                acked += 1;
866            }
867            let nack_msgs = iter.dispose().await;
868            assert_eq!(nack_msgs, 0);
869            tracing::info!("disposed");
870            acked
871        });
872
873        tokio::time::sleep(Duration::from_secs(20)).await;
874        ctx.cancel();
875        let acked = subscriber.await.unwrap();
876        assert_eq!(acked, 10);
877    }
878
879    #[tokio::test(flavor = "multi_thread")]
880    #[serial]
881    async fn test_subscribe_finish_on_available() {
882        test_subscribe_unavailable(SubscribeConfig::default()).await;
883
884        let cfg = SubscribeConfig::default().with_enable_multiple_subscriber(true);
885        test_subscribe_unavailable(cfg).await;
886
887        let cfg = SubscribeConfig::default()
888            .with_enable_multiple_subscriber(true)
889            .with_channel_capacity(1);
890        test_subscribe_unavailable(cfg).await;
891    }
892
893    async fn test_subscribe_unavailable(cfg: SubscribeConfig) {
894        let (subscription, _) = create_subscription(true, true).await;
895        let subscription_for_delete = subscription.clone();
896        tokio::spawn(async move {
897            tokio::time::sleep(Duration::from_secs(5)).await;
898            subscription_for_delete.delete(None).await.unwrap();
899        });
900        let mut iter = subscription.subscribe(Some(cfg)).await.unwrap();
901        while let Some(message) = tokio::select! {
902            v = iter.next() => v,
903            _ = tokio::time::sleep(Duration::from_secs(10)) => {
904                panic!("test_subscribe_finish_on_available timeout");
905            }
906        } {
907            message.ack().await.unwrap();
908        }
909        iter.dispose().await;
910    }
911
912    async fn test_subscribe(
913        opt: Option<SubscribeConfig>,
914        enable_exactly_once_delivery: bool,
915        enable_message_ordering: bool,
916        msg_count: usize,
917        limit: usize,
918    ) {
919        tracing::info!(
920            "test_subscribe: exactly_once_delivery={} msg_count={} limit={}",
921            enable_exactly_once_delivery,
922            msg_count,
923            limit
924        );
925        let (subscription, topic) = create_subscription(enable_exactly_once_delivery, enable_message_ordering).await;
926
927        let ctx = CancellationToken::new();
928        let ctx_for_pub = ctx.clone();
929
930        // publish messages
931        let publisher = tokio::spawn(async move {
932            let msg = PubsubMessage {
933                data: "test".into(),
934                ordering_key: "order1".to_string(),
935                ..Default::default()
936            };
937            let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
938            publish(&topic, Some(msg)).await;
939            tokio::time::sleep(Duration::from_secs(10)).await;
940            ctx_for_pub.cancel();
941        });
942
943        // subscribe and ack messages
944        let mut acked = 0;
945        let mut iter = subscription.subscribe(opt).await.unwrap();
946        while let Some(message) = {
947            tokio::select! {
948                v = iter.next() => v,
949                _ = ctx.cancelled() => None
950            }
951        } {
952            let _ = message.ack().await;
953            tracing::info!("acked {}", message.message.message_id);
954            acked += 1;
955            if acked >= limit {
956                // should nack rest of messages
957                break;
958            }
959        }
960        let nack_msgs = iter.dispose().await;
961        assert_eq!(nack_msgs, msg_count - limit.min(msg_count));
962
963        publisher.await.unwrap();
964        tracing::info!("disposed");
965
966        if limit > msg_count {
967            assert_eq!(acked, msg_count);
968        } else {
969            assert_eq!(acked, limit);
970        }
971    }
972
973    async fn ack_all(messages: &[ReceivedMessage]) {
974        for message in messages.iter() {
975            message.ack().await.unwrap();
976        }
977    }
978
979    async fn create_subscription(
980        enable_exactly_once_delivery: bool,
981        enable_message_ordering: bool,
982    ) -> (Subscription, Topic) {
983        let cm = ConnectionManager::new(
984            4,
985            "",
986            &Environment::Emulator(EMULATOR.to_string()),
987            &ConnectionOptions::default(),
988        )
989        .await
990        .unwrap();
991        let cm2 = ConnectionManager::new(
992            4,
993            "",
994            &Environment::Emulator(EMULATOR.to_string()),
995            &ConnectionOptions::default(),
996        )
997        .await
998        .unwrap();
999        let cm3 = ConnectionManager::new(
1000            4,
1001            "",
1002            &Environment::Emulator(EMULATOR.to_string()),
1003            &ConnectionOptions::default(),
1004        )
1005        .await
1006        .unwrap();
1007        let sub_client = SubscriberClient::new(cm, cm2);
1008        let pub_client = PublisherClient::new(cm3);
1009        let uuid = Uuid::new_v4().hyphenated().to_string();
1010
1011        let topic_name = format!("projects/{}/topics/t{}", PROJECT_NAME, &uuid);
1012        let topic = Topic::new(topic_name.clone(), pub_client, sub_client.clone());
1013        topic.create(None, None).await.unwrap();
1014
1015        let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
1016        let subscription = Subscription::new(subscription_name, sub_client);
1017        let config = SubscriptionConfig {
1018            enable_exactly_once_delivery,
1019            enable_message_ordering,
1020            ..Default::default()
1021        };
1022        subscription.create(topic_name.as_str(), config, None).await.unwrap();
1023        (subscription, topic)
1024    }
1025
1026    async fn publish(topic: &Topic, messages: Option<Vec<PubsubMessage>>) {
1027        let pubc = PublisherClient::new(
1028            ConnectionManager::new(
1029                4,
1030                "",
1031                &Environment::Emulator(EMULATOR.to_string()),
1032                &ConnectionOptions::default(),
1033            )
1034            .await
1035            .unwrap(),
1036        );
1037        let messages = messages.unwrap_or(vec![PubsubMessage {
1038            data: "test_message".into(),
1039            ..Default::default()
1040        }]);
1041        let req = PublishRequest {
1042            topic: topic.fully_qualified_name().to_string(),
1043            messages,
1044        };
1045        let _ = pubc.publish(req, None).await;
1046    }
1047}