gcloud_pubsub/
subscription.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, SystemTime};
6
7use prost_types::{DurationError, FieldMask};
8use tokio_util::sync::CancellationToken;
9
10use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
11use google_cloud_gax::grpc::{Code, Status};
12use google_cloud_gax::retry::RetrySetting;
13use google_cloud_googleapis::pubsub::v1::seek_request::Target;
14use google_cloud_googleapis::pubsub::v1::subscription::AnalyticsHubSubscriptionInfo;
15use google_cloud_googleapis::pubsub::v1::{
16    BigQueryConfig, CloudStorageConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest,
17    DeleteSubscriptionRequest, ExpirationPolicy, GetSnapshotRequest, GetSubscriptionRequest, MessageTransform,
18    PullRequest, PushConfig, RetryPolicy, SeekRequest, Snapshot, Subscription as InternalSubscription,
19    UpdateSubscriptionRequest,
20};
21
22use crate::apiv1::subscriber_client::SubscriberClient;
23
24use crate::subscriber::{ack, ReceivedMessage, Subscriber, SubscriberConfig};
25
26#[derive(Debug, Clone, Default)]
27pub struct SubscriptionConfig {
28    pub push_config: Option<PushConfig>,
29    pub ack_deadline_seconds: i32,
30    pub retain_acked_messages: bool,
31    pub message_retention_duration: Option<Duration>,
32    pub labels: HashMap<String, String>,
33    pub enable_message_ordering: bool,
34    pub expiration_policy: Option<ExpirationPolicy>,
35    pub filter: String,
36    pub dead_letter_policy: Option<DeadLetterPolicy>,
37    pub retry_policy: Option<RetryPolicy>,
38    pub detached: bool,
39    pub topic_message_retention_duration: Option<Duration>,
40    pub enable_exactly_once_delivery: bool,
41    pub bigquery_config: Option<BigQueryConfig>,
42    pub state: i32,
43    pub cloud_storage_config: Option<CloudStorageConfig>,
44    pub analytics_hub_subscription_info: Option<AnalyticsHubSubscriptionInfo>,
45    pub message_transforms: Vec<MessageTransform>,
46}
47impl From<InternalSubscription> for SubscriptionConfig {
48    fn from(f: InternalSubscription) -> Self {
49        Self {
50            push_config: f.push_config,
51            bigquery_config: f.bigquery_config,
52            ack_deadline_seconds: f.ack_deadline_seconds,
53            retain_acked_messages: f.retain_acked_messages,
54            message_retention_duration: f
55                .message_retention_duration
56                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
57            labels: f.labels,
58            enable_message_ordering: f.enable_message_ordering,
59            expiration_policy: f.expiration_policy,
60            filter: f.filter,
61            dead_letter_policy: f.dead_letter_policy,
62            retry_policy: f.retry_policy,
63            detached: f.detached,
64            topic_message_retention_duration: f
65                .topic_message_retention_duration
66                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
67            enable_exactly_once_delivery: f.enable_exactly_once_delivery,
68            state: f.state,
69            cloud_storage_config: f.cloud_storage_config,
70            analytics_hub_subscription_info: f.analytics_hub_subscription_info,
71            message_transforms: f.message_transforms,
72        }
73    }
74}
75
76#[derive(Debug, Clone, Default)]
77pub struct SubscriptionConfigToUpdate {
78    pub push_config: Option<PushConfig>,
79    pub bigquery_config: Option<BigQueryConfig>,
80    pub ack_deadline_seconds: Option<i32>,
81    pub retain_acked_messages: Option<bool>,
82    pub message_retention_duration: Option<Duration>,
83    pub labels: Option<HashMap<String, String>>,
84    pub expiration_policy: Option<ExpirationPolicy>,
85    pub dead_letter_policy: Option<DeadLetterPolicy>,
86    pub retry_policy: Option<RetryPolicy>,
87}
88
89#[derive(Debug, Clone, Default)]
90pub struct SubscribeConfig {
91    enable_multiple_subscriber: bool,
92    channel_capacity: Option<usize>,
93    subscriber_config: Option<SubscriberConfig>,
94}
95
96impl SubscribeConfig {
97    pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
98        self.enable_multiple_subscriber = v;
99        self
100    }
101    pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
102        self.subscriber_config = Some(v);
103        self
104    }
105    pub fn with_channel_capacity(mut self, v: usize) -> Self {
106        self.channel_capacity = Some(v);
107        self
108    }
109}
110
111#[derive(Debug, Clone)]
112pub struct ReceiveConfig {
113    pub worker_count: usize,
114    pub channel_capacity: Option<usize>,
115    pub subscriber_config: Option<SubscriberConfig>,
116}
117
118impl Default for ReceiveConfig {
119    fn default() -> Self {
120        Self {
121            worker_count: 10,
122            subscriber_config: None,
123            channel_capacity: None,
124        }
125    }
126}
127
/// Destination of a `Subscription::seek` call.
#[derive(Debug, Clone)]
pub enum SeekTo {
    /// Seek to a point in time.
    Timestamp(SystemTime),
    /// Seek to a snapshot. `seek` expands a name without `/` into a fully qualified
    /// snapshot name before sending the request.
    Snapshot(String),
}
133
134impl From<SeekTo> for Target {
135    fn from(to: SeekTo) -> Target {
136        use SeekTo::*;
137        match to {
138            Timestamp(t) => Target::Time(prost_types::Timestamp::from(t)),
139            Snapshot(s) => Target::Snapshot(s),
140        }
141    }
142}
143
144pub struct MessageStream {
145    queue: async_channel::Receiver<ReceivedMessage>,
146    cancel: CancellationToken,
147    tasks: Vec<Subscriber>,
148}
149
150impl MessageStream {
151    pub fn cancellable(&self) -> CancellationToken {
152        self.cancel.clone()
153    }
154
155    pub async fn dispose(&mut self) {
156        // Close streaming pull task
157        if !self.cancel.is_cancelled() {
158            self.cancel.cancel();
159        }
160
161        // Wait for all the streaming pull close.
162        for task in &mut self.tasks {
163            task.done().await;
164        }
165
166        // Nack for remaining messages.
167        while let Ok(message) = self.queue.recv().await {
168            if let Err(err) = message.nack().await {
169                tracing::warn!("failed to nack message messageId={} {:?}", message.message.message_id, err);
170            }
171        }
172    }
173
174    /// Immediately Nack on cancel
175    pub async fn read(&mut self) -> Option<ReceivedMessage> {
176        let message = tokio::select! {
177            msg = self.queue.recv() => msg.ok(),
178            _ = self.cancel.cancelled() => None
179        };
180        if message.is_none() {
181            self.dispose().await;
182        }
183        message
184    }
185}
186
187impl Drop for MessageStream {
188    fn drop(&mut self) {
189        if !self.queue.is_empty() {
190            tracing::warn!("Call 'dispose' before drop in order to call nack for remaining messages");
191        }
192        if !self.cancel.is_cancelled() {
193            self.cancel.cancel();
194        }
195    }
196}
197
198impl Stream for MessageStream {
199    type Item = ReceivedMessage;
200
201    /// Return None unless the queue is open.
202    /// Use CancellationToken for SubscribeConfig to get None
203    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204        Pin::new(&mut self.get_mut().queue).poll_next(cx)
205    }
206}
207
208/// Subscription is a reference to a PubSub subscription.
209#[derive(Clone, Debug)]
210pub struct Subscription {
211    fqsn: String,
212    subc: SubscriberClient,
213}
214
215impl Subscription {
216    pub(crate) fn new(fqsn: String, subc: SubscriberClient) -> Self {
217        Self { fqsn, subc }
218    }
219
220    pub(crate) fn streaming_pool_size(&self) -> usize {
221        self.subc.streaming_pool_size()
222    }
223
224    /// id returns the unique identifier of the subscription within its project.
225    pub fn id(&self) -> String {
226        self.fqsn
227            .rfind('/')
228            .map_or("".to_string(), |i| self.fqsn[(i + 1)..].to_string())
229    }
230
231    /// fully_qualified_name returns the globally unique printable name of the subscription.
232    pub fn fully_qualified_name(&self) -> &str {
233        self.fqsn.as_str()
234    }
235
236    /// fully_qualified_snapshot_name returns the globally unique printable name of the snapshot.
237    pub fn fully_qualified_snapshot_name(&self, id: &str) -> String {
238        if id.contains('/') {
239            id.to_string()
240        } else {
241            format!("{}/snapshots/{}", self.fully_qualified_project_name(), id)
242        }
243    }
244
245    fn fully_qualified_project_name(&self) -> String {
246        let parts: Vec<_> = self
247            .fqsn
248            .split('/')
249            .enumerate()
250            .filter(|&(i, _)| i < 2)
251            .map(|e| e.1)
252            .collect();
253        parts.join("/")
254    }
255
256    pub fn get_client(&self) -> SubscriberClient {
257        self.subc.clone()
258    }
259
260    /// create creates the subscription.
261    pub async fn create(&self, fqtn: &str, cfg: SubscriptionConfig, retry: Option<RetrySetting>) -> Result<(), Status> {
262        self.subc
263            .create_subscription(
264                InternalSubscription {
265                    name: self.fully_qualified_name().to_string(),
266                    topic: fqtn.to_string(),
267                    push_config: cfg.push_config,
268                    bigquery_config: cfg.bigquery_config,
269                    cloud_storage_config: cfg.cloud_storage_config,
270                    ack_deadline_seconds: cfg.ack_deadline_seconds,
271                    labels: cfg.labels,
272                    enable_message_ordering: cfg.enable_message_ordering,
273                    expiration_policy: cfg.expiration_policy,
274                    filter: cfg.filter,
275                    dead_letter_policy: cfg.dead_letter_policy,
276                    retry_policy: cfg.retry_policy,
277                    detached: cfg.detached,
278                    message_retention_duration: cfg
279                        .message_retention_duration
280                        .map(Duration::try_into)
281                        .transpose()
282                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
283                    retain_acked_messages: cfg.retain_acked_messages,
284                    topic_message_retention_duration: cfg
285                        .topic_message_retention_duration
286                        .map(Duration::try_into)
287                        .transpose()
288                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
289                    enable_exactly_once_delivery: cfg.enable_exactly_once_delivery,
290                    state: cfg.state,
291                    analytics_hub_subscription_info: cfg.analytics_hub_subscription_info,
292                    message_transforms: cfg.message_transforms,
293                },
294                retry,
295            )
296            .await
297            .map(|_v| ())
298    }
299
300    /// delete deletes the subscription.
301    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
302        let req = DeleteSubscriptionRequest {
303            subscription: self.fqsn.to_string(),
304        };
305        self.subc.delete_subscription(req, retry).await.map(|v| v.into_inner())
306    }
307
308    /// exists reports whether the subscription exists on the server.
309    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
310        let req = GetSubscriptionRequest {
311            subscription: self.fqsn.to_string(),
312        };
313        match self.subc.get_subscription(req, retry).await {
314            Ok(_) => Ok(true),
315            Err(e) => {
316                if e.code() == Code::NotFound {
317                    Ok(false)
318                } else {
319                    Err(e)
320                }
321            }
322        }
323    }
324
325    /// config fetches the current configuration for the subscription.
326    pub async fn config(&self, retry: Option<RetrySetting>) -> Result<(String, SubscriptionConfig), Status> {
327        let req = GetSubscriptionRequest {
328            subscription: self.fqsn.to_string(),
329        };
330        self.subc.get_subscription(req, retry).await.map(|v| {
331            let inner = v.into_inner();
332            (inner.topic.to_string(), inner.into())
333        })
334    }
335
336    /// update changes an existing subscription according to the fields set in updating.
337    /// It returns the new SubscriptionConfig.
338    pub async fn update(
339        &self,
340        updating: SubscriptionConfigToUpdate,
341        retry: Option<RetrySetting>,
342    ) -> Result<(String, SubscriptionConfig), Status> {
343        let req = GetSubscriptionRequest {
344            subscription: self.fqsn.to_string(),
345        };
346        let mut config = self.subc.get_subscription(req, retry.clone()).await?.into_inner();
347
348        let mut paths = vec![];
349        if updating.push_config.is_some() {
350            config.push_config = updating.push_config;
351            paths.push("push_config".to_string());
352        }
353        if updating.bigquery_config.is_some() {
354            config.bigquery_config = updating.bigquery_config;
355            paths.push("bigquery_config".to_string());
356        }
357        if let Some(v) = updating.ack_deadline_seconds {
358            config.ack_deadline_seconds = v;
359            paths.push("ack_deadline_seconds".to_string());
360        }
361        if let Some(v) = updating.retain_acked_messages {
362            config.retain_acked_messages = v;
363            paths.push("retain_acked_messages".to_string());
364        }
365        if updating.message_retention_duration.is_some() {
366            config.message_retention_duration = updating
367                .message_retention_duration
368                .map(prost_types::Duration::try_from)
369                .transpose()
370                .map_err(|err| Status::internal(err.to_string()))?;
371            paths.push("message_retention_duration".to_string());
372        }
373        if updating.expiration_policy.is_some() {
374            config.expiration_policy = updating.expiration_policy;
375            paths.push("expiration_policy".to_string());
376        }
377        if let Some(v) = updating.labels {
378            config.labels = v;
379            paths.push("labels".to_string());
380        }
381        if updating.retry_policy.is_some() {
382            config.retry_policy = updating.retry_policy;
383            paths.push("retry_policy".to_string());
384        }
385
386        let update_req = UpdateSubscriptionRequest {
387            subscription: Some(config),
388            update_mask: Some(FieldMask { paths }),
389        };
390        self.subc.update_subscription(update_req, retry).await.map(|v| {
391            let inner = v.into_inner();
392            (inner.topic.to_string(), inner.into())
393        })
394    }
395
396    /// pull pulls messages from the server.
397    pub async fn pull(&self, max_messages: i32, retry: Option<RetrySetting>) -> Result<Vec<ReceivedMessage>, Status> {
398        #[allow(deprecated)]
399        let req = PullRequest {
400            subscription: self.fqsn.clone(),
401            return_immediately: false,
402            max_messages,
403        };
404        let messages = self.subc.pull(req, retry).await?.into_inner().received_messages;
405        Ok(messages
406            .into_iter()
407            .filter(|m| m.message.is_some())
408            .map(|m| {
409                ReceivedMessage::new(
410                    self.fqsn.clone(),
411                    self.subc.clone(),
412                    m.message.unwrap(),
413                    m.ack_id,
414                    (m.delivery_attempt > 0).then_some(m.delivery_attempt as usize),
415                )
416            })
417            .collect())
418    }
419
420    /// subscribe creates a `Stream` of `ReceivedMessage`
421    /// ```
422    /// use google_cloud_pubsub::subscription::{SubscribeConfig, Subscription};
423    /// use tokio::select;
424    /// use google_cloud_gax::grpc::Status;
425    ///
426    /// async fn run(subscription: Subscription) -> Result<(), Status> {
427    ///     let mut iter = subscription.subscribe(None).await?;
428    ///     let ctx = iter.cancellable();
429    ///     let handler = tokio::spawn(async move {
430    ///         while let Some(message) = iter.read().await {
431    ///             let _ = message.ack().await;
432    ///         }
433    ///     });
434    ///     // Cancel and wait for nack all the pulled messages.
435    ///     ctx.cancel();
436    ///     let _ = handler.await;
437    ///     Ok(())
438    ///  }
439    /// ```
440    ///
441    /// ```
442    /// use google_cloud_pubsub::subscription::{SubscribeConfig, Subscription};
443    /// use futures_util::StreamExt;
444    /// use tokio::select;
445    /// use google_cloud_gax::grpc::Status;
446    ///
447    /// async fn run(subscription: Subscription) -> Result<(), Status> {
448    ///     let mut iter = subscription.subscribe(None).await?;
449    ///     let ctx = iter.cancellable();
450    ///     let handler = tokio::spawn(async move {
451    ///         while let Some(message) = iter.next().await {
452    ///             let _ = message.ack().await;
453    ///         }
454    ///     });
455    ///     // Cancel and wait for receive all the pulled messages.
456    ///     ctx.cancel();
457    ///     let _ = handler.await;
458    ///     Ok(())
459    ///  }
460    /// ```
461    pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
462        let opt = opt.unwrap_or_default();
463        let (tx, rx) = create_channel(opt.channel_capacity);
464        let cancel = CancellationToken::new();
465        let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;
466
467        // spawn a separate subscriber task for each connection in the pool
468        let subscribers = if opt.enable_multiple_subscriber {
469            self.streaming_pool_size()
470        } else {
471            1
472        };
473        let mut tasks = Vec::with_capacity(subscribers);
474        for _ in 0..subscribers {
475            tasks.push(Subscriber::start(
476                cancel.clone(),
477                self.fqsn.clone(),
478                self.subc.clone(),
479                tx.clone(),
480                sub_opt.clone(),
481            ));
482        }
483
484        Ok(MessageStream {
485            queue: rx,
486            cancel,
487            tasks,
488        })
489    }
490
491    /// receive calls f with the outstanding messages from the subscription.
492    /// It blocks until cancellation token is cancelled, or the service returns a non-retryable error.
493    /// The standard way to terminate a receive is to use CancellationToken.
494    ///
495    /// # Deprecated
496    /// Deprecated since 1.4.1.
497    /// This method will be removed in **1.5.0**.
498    /// Please use [`subscribe`] instead.
499    #[deprecated(
500        since = "1.4.1",
501        note = "This will be removed in 1.5.0. Please use `subscribe` method to get a stream of messages and process the messages from the stream."
502    )]
503    pub async fn receive<F>(
504        &self,
505        f: impl Fn(ReceivedMessage, CancellationToken) -> F + Send + 'static + Sync + Clone,
506        cancel: CancellationToken,
507        config: Option<ReceiveConfig>,
508    ) -> Result<(), Status>
509    where
510        F: Future<Output = ()> + Send + 'static,
511    {
512        let op = config.unwrap_or_default();
513        let mut receivers = Vec::with_capacity(op.worker_count);
514        let mut senders = Vec::with_capacity(receivers.len());
515        let sub_opt = self.unwrap_subscribe_config(op.subscriber_config).await?;
516
517        if self
518            .config(sub_opt.retry_setting.clone())
519            .await?
520            .1
521            .enable_message_ordering
522        {
523            (0..op.worker_count).for_each(|_v| {
524                let (sender, receiver) = create_channel(op.channel_capacity);
525                receivers.push(receiver);
526                senders.push(sender);
527            });
528        } else {
529            let (sender, receiver) = create_channel(op.channel_capacity);
530            (0..op.worker_count).for_each(|_v| {
531                receivers.push(receiver.clone());
532                senders.push(sender.clone());
533            });
534        }
535
536        //same ordering key is in same stream.
537        let subscribers: Vec<Subscriber> = senders
538            .into_iter()
539            .map(|queue| {
540                Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), queue, sub_opt.clone())
541            })
542            .collect();
543
544        let mut message_receivers = Vec::with_capacity(receivers.len());
545        for receiver in receivers {
546            let f_clone = f.clone();
547            let cancel_clone = cancel.clone();
548            let name = self.fqsn.clone();
549            message_receivers.push(tokio::spawn(async move {
550                while let Ok(message) = receiver.recv().await {
551                    f_clone(message, cancel_clone.clone()).await;
552                }
553                // queue is closed by subscriber when the cancellation token is cancelled
554                tracing::trace!("stop message receiver : {}", name);
555            }));
556        }
557        cancel.cancelled().await;
558
559        // wait for all the threads finish.
560        for mut subscriber in subscribers {
561            subscriber.done().await;
562        }
563
564        // wait for all the receivers process received messages
565        for mr in message_receivers {
566            let _ = mr.await;
567        }
568        Ok(())
569    }
570
571    /// Ack acknowledges the messages associated with the ack_ids in the AcknowledgeRequest.
572    /// The Pub/Sub system can remove the relevant messages from the subscription.
573    /// This method is for batch acking.
574    ///
575    /// ```
576    /// use google_cloud_pubsub::client::Client;
577    /// use google_cloud_pubsub::subscription::Subscription;
578    /// use google_cloud_gax::grpc::Status;
579    /// use std::time::Duration;
580    /// use tokio_util::sync::CancellationToken;;
581    ///
582    /// #[tokio::main]
583    /// async fn run(client: Client) -> Result<(), Status> {
584    ///     let subscription = client.subscription("test-subscription");
585    ///     let ctx = CancellationToken::new();
586    ///     let (sender, mut receiver)  = tokio::sync::mpsc::unbounded_channel();
587    ///     let subscription_for_receive = subscription.clone();
588    ///     let ctx_for_receive = ctx.clone();
589    ///     let ctx_for_ack_manager = ctx.clone();
590    ///
591    ///     // receive
592    ///     let handle = tokio::spawn(async move {
593    ///         let _ = subscription_for_receive.receive(move |message, _ctx| {
594    ///             let sender = sender.clone();
595    ///             async move {
596    ///                 let _ = sender.send(message.ack_id().to_string());
597    ///             }
598    ///         }, ctx_for_receive.clone(), None).await;
599    ///     });
600    ///
601    ///     // batch ack manager
602    ///     let ack_manager = tokio::spawn( async move {
603    ///         let mut ack_ids = Vec::new();
604    ///         loop {
605    ///             tokio::select! {
606    ///                 _ = ctx_for_ack_manager.cancelled() => {
607    ///                     return subscription.ack(ack_ids).await;
608    ///                 },
609    ///                 r = tokio::time::timeout(Duration::from_secs(10), receiver.recv()) => match r {
610    ///                     Ok(ack_id) => {
611    ///                         if let Some(ack_id) = ack_id {
612    ///                             ack_ids.push(ack_id);
613    ///                             if ack_ids.len() > 10 {
614    ///                                 let _ = subscription.ack(ack_ids).await;
615    ///                                 ack_ids = Vec::new();
616    ///                             }
617    ///                         }
618    ///                     },
619    ///                     Err(_e) => {
620    ///                         // timeout
621    ///                         let _ = subscription.ack(ack_ids).await;
622    ///                         ack_ids = Vec::new();
623    ///                     }
624    ///                 }
625    ///             }
626    ///         }
627    ///     });
628    ///
629    ///     ctx.cancel();
630    ///     Ok(())
631    ///  }
632    /// ```
633    pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status> {
634        ack(&self.subc, self.fqsn.to_string(), ack_ids).await
635    }
636
637    /// seek seeks the subscription a past timestamp or a saved snapshot.
638    pub async fn seek(&self, to: SeekTo, retry: Option<RetrySetting>) -> Result<(), Status> {
639        let to = match to {
640            SeekTo::Timestamp(t) => SeekTo::Timestamp(t),
641            SeekTo::Snapshot(name) => SeekTo::Snapshot(self.fully_qualified_snapshot_name(name.as_str())),
642        };
643
644        let req = SeekRequest {
645            subscription: self.fqsn.to_owned(),
646            target: Some(to.into()),
647        };
648
649        let _ = self.subc.seek(req, retry).await?;
650        Ok(())
651    }
652
653    /// get_snapshot fetches an existing pubsub snapshot.
654    pub async fn get_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<Snapshot, Status> {
655        let req = GetSnapshotRequest {
656            snapshot: self.fully_qualified_snapshot_name(name),
657        };
658        Ok(self.subc.get_snapshot(req, retry).await?.into_inner())
659    }
660
661    /// create_snapshot creates a new pubsub snapshot from the subscription's state at the time of calling.
662    /// The snapshot retains the messages for the topic the subscription is subscribed to, with the acknowledgment
663    /// states consistent with the subscriptions.
664    /// The created snapshot is guaranteed to retain:
665    /// - The message backlog on the subscription -- or to be specific, messages that are unacknowledged
666    ///   at the time of the subscription's creation.
667    /// - All messages published to the subscription's topic after the snapshot's creation.
668    ///   Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which
669    ///   they are discarded and any messages being retained solely due to the snapshot dropped.
670    pub async fn create_snapshot(
671        &self,
672        name: &str,
673        labels: HashMap<String, String>,
674        retry: Option<RetrySetting>,
675    ) -> Result<Snapshot, Status> {
676        let req = CreateSnapshotRequest {
677            name: self.fully_qualified_snapshot_name(name),
678            labels,
679            subscription: self.fqsn.to_owned(),
680        };
681        Ok(self.subc.create_snapshot(req, retry).await?.into_inner())
682    }
683
684    /// delete_snapshot deletes an existing pubsub snapshot.
685    pub async fn delete_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
686        let req = DeleteSnapshotRequest {
687            snapshot: self.fully_qualified_snapshot_name(name),
688        };
689        let _ = self.subc.delete_snapshot(req, retry).await?;
690        Ok(())
691    }
692
693    async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
694        if let Some(cfg) = cfg {
695            return Ok(cfg);
696        }
697        let cfg = self.config(None).await?;
698        let mut default_cfg = SubscriberConfig {
699            stream_ack_deadline_seconds: cfg.1.ack_deadline_seconds.clamp(10, 600),
700            ..Default::default()
701        };
702        if cfg.1.enable_exactly_once_delivery {
703            default_cfg.max_outstanding_messages = 5;
704        }
705        Ok(default_cfg)
706    }
707}
708
709fn create_channel(
710    channel_capacity: Option<usize>,
711) -> (async_channel::Sender<ReceivedMessage>, async_channel::Receiver<ReceivedMessage>) {
712    match channel_capacity {
713        None => async_channel::unbounded(),
714        Some(cap) => async_channel::bounded(cap),
715    }
716}
717
718#[cfg(test)]
719#[allow(deprecated)]
720mod tests {
721
722    use std::collections::HashMap;
723    use std::sync::atomic::AtomicU32;
724    use std::sync::atomic::Ordering::SeqCst;
725    use std::sync::{Arc, Mutex};
726    use std::time::Duration;
727
728    use futures_util::StreamExt;
729    use serial_test::serial;
730    use tokio_util::sync::CancellationToken;
731    use uuid::Uuid;
732
733    use google_cloud_gax::conn::{ConnectionOptions, Environment};
734    use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage};
735
736    use crate::apiv1::conn_pool::ConnectionManager;
737    use crate::apiv1::publisher_client::PublisherClient;
738    use crate::apiv1::subscriber_client::SubscriberClient;
739    use crate::subscriber::ReceivedMessage;
740    use crate::subscription::{
741        ReceiveConfig, SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate,
742    };
743
744    const PROJECT_NAME: &str = "local-project";
745    const EMULATOR: &str = "localhost:8681";
746
747    #[ctor::ctor]
748    fn init() {
749        let _ = tracing_subscriber::fmt().try_init();
750    }
751
752    async fn create_subscription(enable_exactly_once_delivery: bool) -> Subscription {
753        let cm = ConnectionManager::new(
754            4,
755            "",
756            &Environment::Emulator(EMULATOR.to_string()),
757            &ConnectionOptions::default(),
758        )
759        .await
760        .unwrap();
761        let cm2 = ConnectionManager::new(
762            4,
763            "",
764            &Environment::Emulator(EMULATOR.to_string()),
765            &ConnectionOptions::default(),
766        )
767        .await
768        .unwrap();
769        let client = SubscriberClient::new(cm, cm2);
770
771        let uuid = Uuid::new_v4().hyphenated().to_string();
772        let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
773        let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
774        let subscription = Subscription::new(subscription_name, client);
775        let config = SubscriptionConfig {
776            enable_exactly_once_delivery,
777            ..Default::default()
778        };
779        if !subscription.exists(None).await.unwrap() {
780            subscription.create(topic_name.as_str(), config, None).await.unwrap();
781        }
782        subscription
783    }
784
785    async fn publish(messages: Option<Vec<PubsubMessage>>) {
786        let pubc = PublisherClient::new(
787            ConnectionManager::new(
788                4,
789                "",
790                &Environment::Emulator(EMULATOR.to_string()),
791                &ConnectionOptions::default(),
792            )
793            .await
794            .unwrap(),
795        );
796        let messages = messages.unwrap_or(vec![PubsubMessage {
797            data: "test_message".into(),
798            ..Default::default()
799        }]);
800        let req = PublishRequest {
801            topic: format!("projects/{PROJECT_NAME}/topics/test-topic1"),
802            messages,
803        };
804        let _ = pubc.publish(req, None).await;
805    }
806
807    async fn test_subscription(enable_exactly_once_delivery: bool) {
808        let subscription = create_subscription(enable_exactly_once_delivery).await;
809
810        let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
811        let config = subscription.config(None).await.unwrap();
812        assert_eq!(config.0, topic_name);
813
814        let updating = SubscriptionConfigToUpdate {
815            ack_deadline_seconds: Some(100),
816            ..Default::default()
817        };
818        let new_config = subscription.update(updating, None).await.unwrap();
819        assert_eq!(new_config.0, topic_name);
820        assert_eq!(new_config.1.ack_deadline_seconds, 100);
821
822        let receiver_ctx = CancellationToken::new();
823        let cancel_receiver = receiver_ctx.clone();
824        let handle = tokio::spawn(async move {
825            let _ = subscription
826                .receive(
827                    |message, _ctx| async move {
828                        println!("{}", message.message.message_id);
829                        let _ = message.ack().await;
830                    },
831                    cancel_receiver,
832                    None,
833                )
834                .await;
835            subscription.delete(None).await.unwrap();
836            assert!(!subscription.exists(None).await.unwrap())
837        });
838        tokio::time::sleep(Duration::from_secs(3)).await;
839        receiver_ctx.cancel();
840        let _ = handle.await;
841    }
842
843    #[tokio::test(flavor = "multi_thread")]
844    #[serial]
845    async fn test_pull() {
846        let subscription = create_subscription(false).await;
847        let base = PubsubMessage {
848            data: "test_message".into(),
849            ..Default::default()
850        };
851        publish(Some(vec![base.clone(), base.clone(), base])).await;
852        let messages = subscription.pull(2, None).await.unwrap();
853        assert_eq!(messages.len(), 2);
854        for m in messages {
855            m.ack().await.unwrap();
856        }
857        subscription.delete(None).await.unwrap();
858    }
859
860    #[tokio::test]
861    #[serial]
862    async fn test_subscription_exactly_once() {
863        test_subscription(true).await;
864    }
865
866    #[tokio::test]
867    #[serial]
868    async fn test_subscription_at_least_once() {
869        test_subscription(false).await;
870    }
871
872    #[tokio::test(flavor = "multi_thread")]
873    #[serial]
874    async fn test_multi_subscriber_single_subscription_unbound() {
875        test_multi_subscriber_single_subscription(None).await;
876    }
877
878    #[tokio::test(flavor = "multi_thread")]
879    #[serial]
880    async fn test_multi_subscriber_single_subscription_bound() {
881        let opt = Some(ReceiveConfig {
882            channel_capacity: Some(1),
883            ..Default::default()
884        });
885        test_multi_subscriber_single_subscription(opt).await;
886    }
887
888    async fn test_multi_subscriber_single_subscription(opt: Option<ReceiveConfig>) {
889        let msg = PubsubMessage {
890            data: "test".into(),
891            ..Default::default()
892        };
893        let msg_size = 10;
894        let msgs: Vec<PubsubMessage> = (0..msg_size).map(|_v| msg.clone()).collect();
895        let subscription = create_subscription(false).await;
896        let cancellation_token = CancellationToken::new();
897        let cancel_receiver = cancellation_token.clone();
898        let v = Arc::new(AtomicU32::new(0));
899        let v2 = v.clone();
900        let handle = tokio::spawn(async move {
901            let _ = subscription
902                .receive(
903                    move |message, _ctx| {
904                        let v2 = v2.clone();
905                        async move {
906                            tracing::info!("received {}", message.message.message_id);
907                            v2.fetch_add(1, SeqCst);
908                            let _ = message.ack().await;
909                        }
910                    },
911                    cancel_receiver,
912                    opt,
913                )
914                .await;
915        });
916        publish(Some(msgs)).await;
917        tokio::time::sleep(Duration::from_secs(5)).await;
918        cancellation_token.cancel();
919        let _ = handle.await;
920        assert_eq!(v.load(SeqCst), msg_size);
921    }
922
923    #[tokio::test(flavor = "multi_thread")]
924    #[serial]
925    async fn test_multi_subscriber_multi_subscription() {
926        let mut subscriptions = vec![];
927
928        let ctx = CancellationToken::new();
929        for _ in 0..3 {
930            let subscription = create_subscription(false).await;
931            let v = Arc::new(AtomicU32::new(0));
932            let ctx = ctx.clone();
933            let v2 = v.clone();
934            let handle = tokio::spawn(async move {
935                let _ = subscription
936                    .receive(
937                        move |message, _ctx| {
938                            let v2 = v2.clone();
939                            async move {
940                                v2.fetch_add(1, SeqCst);
941                                let _ = message.ack().await;
942                            }
943                        },
944                        ctx,
945                        None,
946                    )
947                    .await;
948            });
949            subscriptions.push((handle, v))
950        }
951
952        publish(None).await;
953        tokio::time::sleep(Duration::from_secs(5)).await;
954
955        ctx.cancel();
956        for (task, v) in subscriptions {
957            let _ = task.await;
958            assert_eq!(v.load(SeqCst), 1);
959        }
960    }
961
962    #[tokio::test(flavor = "multi_thread")]
963    #[serial]
964    async fn test_batch_acking() {
965        let ctx = CancellationToken::new();
966        let subscription = create_subscription(false).await;
967        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
968        let subscription_for_receive = subscription.clone();
969        let ctx_for_receive = ctx.clone();
970        let handle = tokio::spawn(async move {
971            let _ = subscription_for_receive
972                .receive(
973                    move |message, _ctx| {
974                        let sender = sender.clone();
975                        async move {
976                            let _ = sender.send(message.ack_id().to_string());
977                        }
978                    },
979                    ctx_for_receive.clone(),
980                    None,
981                )
982                .await;
983        });
984
985        let ctx_for_ack_manager = ctx.clone();
986        let ack_manager = tokio::spawn(async move {
987            let mut ack_ids = Vec::new();
988            while !ctx_for_ack_manager.is_cancelled() {
989                match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
990                    Ok(ack_id) => {
991                        if let Some(ack_id) = ack_id {
992                            ack_ids.push(ack_id);
993                            if ack_ids.len() > 10 {
994                                subscription.ack(ack_ids).await.unwrap();
995                                ack_ids = Vec::new();
996                            }
997                        }
998                    }
999                    Err(_e) => {
1000                        // timeout
1001                        subscription.ack(ack_ids).await.unwrap();
1002                        ack_ids = Vec::new();
1003                    }
1004                }
1005            }
1006            // flush
1007            subscription.ack(ack_ids).await
1008        });
1009
1010        publish(None).await;
1011        tokio::time::sleep(Duration::from_secs(5)).await;
1012
1013        ctx.cancel();
1014        let _ = handle.await;
1015        assert!(ack_manager.await.is_ok());
1016    }
1017
1018    #[tokio::test]
1019    #[serial]
1020    async fn test_snapshots() {
1021        let subscription = create_subscription(false).await;
1022
1023        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1024        let labels: HashMap<String, String> =
1025            HashMap::from_iter([("label-1".into(), "v1".into()), ("label-2".into(), "v2".into())]);
1026        let expected_fq_snap_name = format!("projects/{PROJECT_NAME}/snapshots/{snapshot_name}");
1027
1028        // cleanup; TODO: remove?
1029        let _response = subscription.delete_snapshot(snapshot_name.as_str(), None).await;
1030
1031        // create
1032        let created_snapshot = subscription
1033            .create_snapshot(snapshot_name.as_str(), labels.clone(), None)
1034            .await
1035            .unwrap();
1036
1037        assert_eq!(created_snapshot.name, expected_fq_snap_name);
1038        // NOTE: we don't assert the labels due to lack of label support in the pubsub emulator.
1039
1040        // get
1041        let retrieved_snapshot = subscription.get_snapshot(snapshot_name.as_str(), None).await.unwrap();
1042        assert_eq!(created_snapshot, retrieved_snapshot);
1043
1044        // delete
1045        subscription
1046            .delete_snapshot(snapshot_name.as_str(), None)
1047            .await
1048            .unwrap();
1049
1050        let _deleted_snapshot_status = subscription
1051            .get_snapshot(snapshot_name.as_str(), None)
1052            .await
1053            .expect_err("snapshot should have been deleted");
1054
1055        let _delete_again = subscription
1056            .delete_snapshot(snapshot_name.as_str(), None)
1057            .await
1058            .expect_err("snapshot should already be deleted");
1059    }
1060
1061    async fn ack_all(messages: &[ReceivedMessage]) {
1062        for message in messages.iter() {
1063            message.ack().await.unwrap();
1064        }
1065    }
1066
1067    #[tokio::test]
1068    #[serial]
1069    async fn test_seek_snapshot() {
1070        let subscription = create_subscription(false).await;
1071        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1072
1073        // publish and receive a message
1074        publish(None).await;
1075        let messages = subscription.pull(100, None).await.unwrap();
1076        ack_all(&messages).await;
1077        assert_eq!(messages.len(), 1);
1078
1079        // snapshot at received = 1
1080        let _snapshot = subscription
1081            .create_snapshot(snapshot_name.as_str(), HashMap::new(), None)
1082            .await
1083            .unwrap();
1084
1085        // publish and receive another message
1086        publish(None).await;
1087        let messages = subscription.pull(100, None).await.unwrap();
1088        assert_eq!(messages.len(), 1);
1089        ack_all(&messages).await;
1090
1091        // rewind to snapshot at received = 1
1092        subscription
1093            .seek(SeekTo::Snapshot(snapshot_name.clone()), None)
1094            .await
1095            .unwrap();
1096
1097        // assert we receive the 1 message we should receive again
1098        let messages = subscription.pull(100, None).await.unwrap();
1099        assert_eq!(messages.len(), 1);
1100        ack_all(&messages).await;
1101
1102        // cleanup
1103        subscription
1104            .delete_snapshot(snapshot_name.as_str(), None)
1105            .await
1106            .unwrap();
1107        subscription.delete(None).await.unwrap();
1108    }
1109
1110    #[tokio::test]
1111    #[serial]
1112    async fn test_seek_timestamp() {
1113        let subscription = create_subscription(false).await;
1114
1115        // enable acked message retention on subscription -- required for timestamp-based seeks
1116        subscription
1117            .update(
1118                SubscriptionConfigToUpdate {
1119                    retain_acked_messages: Some(true),
1120                    message_retention_duration: Some(Duration::new(60 * 60 * 2, 0)),
1121                    ..Default::default()
1122                },
1123                None,
1124            )
1125            .await
1126            .unwrap();
1127
1128        // publish and receive a message
1129        publish(None).await;
1130        let messages = subscription.pull(100, None).await.unwrap();
1131        ack_all(&messages).await;
1132        assert_eq!(messages.len(), 1);
1133
1134        let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1135
1136        // rewind to a timestamp where message was just published
1137        subscription
1138            .seek(SeekTo::Timestamp(message_publish_time.to_owned().try_into().unwrap()), None)
1139            .await
1140            .unwrap();
1141
1142        // consume -- should receive the first message again
1143        let messages = subscription.pull(100, None).await.unwrap();
1144        ack_all(&messages).await;
1145        assert_eq!(messages.len(), 1);
1146        let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1147        assert_eq!(seek_message_publish_time, message_publish_time);
1148
1149        // cleanup
1150        subscription.delete(None).await.unwrap();
1151    }
1152
1153    #[tokio::test(flavor = "multi_thread")]
1154    #[serial]
1155    async fn test_subscribe_single_subscriber() {
1156        test_subscribe(None).await;
1157    }
1158
1159    #[tokio::test(flavor = "multi_thread")]
1160    #[serial]
1161    async fn test_subscribe_multiple_subscriber() {
1162        test_subscribe(Some(SubscribeConfig::default().with_enable_multiple_subscriber(true))).await;
1163    }
1164
1165    #[tokio::test(flavor = "multi_thread")]
1166    #[serial]
1167    async fn test_subscribe_multiple_subscriber_bound() {
1168        test_subscribe(Some(
1169            SubscribeConfig::default()
1170                .with_enable_multiple_subscriber(true)
1171                .with_channel_capacity(1),
1172        ))
1173        .await;
1174    }
1175
1176    async fn test_subscribe(opt: Option<SubscribeConfig>) {
1177        let msg = PubsubMessage {
1178            data: "test".into(),
1179            ..Default::default()
1180        };
1181        let msg_count = 10;
1182        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1183        let subscription = create_subscription(false).await;
1184        let received = Arc::new(Mutex::new(0));
1185        let checking = received.clone();
1186        let mut iter = subscription.subscribe(opt).await.unwrap();
1187        let cancellable = iter.cancellable();
1188        let handler = tokio::spawn(async move {
1189            while let Some(message) = iter.next().await {
1190                tracing::info!("received {}", message.message.message_id);
1191                *received.lock().unwrap() += 1;
1192                tokio::time::sleep(Duration::from_millis(500)).await;
1193                let _ = message.ack().await;
1194            }
1195        });
1196        publish(Some(msg)).await;
1197        tokio::time::sleep(Duration::from_secs(8)).await;
1198        cancellable.cancel();
1199        let _ = handler.await;
1200        assert_eq!(*checking.lock().unwrap(), msg_count);
1201    }
1202
1203    #[tokio::test(flavor = "multi_thread")]
1204    #[serial]
1205    async fn test_subscribe_nack_on_cancel_read() {
1206        subscribe_nack_on_cancel_read(10, true).await;
1207        subscribe_nack_on_cancel_read(0, true).await;
1208        subscribe_nack_on_cancel_read(10, false).await;
1209        subscribe_nack_on_cancel_read(0, false).await;
1210    }
1211
1212    #[tokio::test(flavor = "multi_thread")]
1213    #[serial]
1214    async fn test_subscribe_nack_on_cancel_next() {
1215        // cancel after subscribe all message
1216        subscribe_nack_on_cancel_next(10, Duration::from_secs(3)).await;
1217        // cancel after process all message
1218        subscribe_nack_on_cancel_next(10, Duration::from_millis(0)).await;
1219        // no message
1220        subscribe_nack_on_cancel_next(0, Duration::from_secs(3)).await;
1221    }
1222
1223    async fn subscribe_nack_on_cancel_read(msg_count: usize, should_cancel: bool) {
1224        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1225
1226        let msg = PubsubMessage {
1227            data: "test".into(),
1228            ..Default::default()
1229        };
1230        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1231        let subscription = create_subscription(false).await;
1232        let received = Arc::new(Mutex::new(0));
1233        let checking = received.clone();
1234
1235        let mut iter = subscription.subscribe(opt).await.unwrap();
1236        let ctx = iter.cancellable();
1237        let handler = tokio::spawn(async move {
1238            while let Some(message) = iter.read().await {
1239                tracing::info!("received {}", message.message.message_id);
1240                *received.lock().unwrap() += 1;
1241                if should_cancel {
1242                    // expect cancel
1243                    tokio::time::sleep(Duration::from_secs(10)).await;
1244                } else {
1245                    tokio::time::sleep(Duration::from_millis(1)).await;
1246                }
1247                let _ = message.ack().await;
1248            }
1249        });
1250        publish(Some(msg)).await;
1251        tokio::time::sleep(Duration::from_secs(10)).await;
1252        ctx.cancel();
1253        handler.await.unwrap();
1254        if should_cancel && msg_count > 0 {
1255            // expect nack
1256            assert!(*checking.lock().unwrap() < msg_count);
1257        } else {
1258            // all delivered
1259            assert_eq!(*checking.lock().unwrap(), msg_count);
1260        }
1261    }
1262
1263    async fn subscribe_nack_on_cancel_next(msg_count: usize, recv_time: Duration) {
1264        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1265
1266        let msg = PubsubMessage {
1267            data: "test".into(),
1268            ..Default::default()
1269        };
1270        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1271        let subscription = create_subscription(false).await;
1272        let received = Arc::new(Mutex::new(0));
1273        let checking = received.clone();
1274
1275        let mut iter = subscription.subscribe(opt).await.unwrap();
1276        let ctx = iter.cancellable();
1277        let handler = tokio::spawn(async move {
1278            while let Some(message) = iter.next().await {
1279                tracing::info!("received {}", message.message.message_id);
1280                *received.lock().unwrap() += 1;
1281                tokio::time::sleep(recv_time).await;
1282                let _ = message.ack().await;
1283            }
1284        });
1285        publish(Some(msg)).await;
1286        tokio::time::sleep(Duration::from_secs(10)).await;
1287        ctx.cancel();
1288        handler.await.unwrap();
1289        assert_eq!(*checking.lock().unwrap(), msg_count);
1290    }
1291
1292    #[tokio::test(flavor = "multi_thread")]
1293    #[serial]
1294    async fn test_message_stream_dispose() {
1295        let subscription = create_subscription(false).await;
1296        let mut iter = subscription.subscribe(None).await.unwrap();
1297        iter.dispose().await;
1298        // no effect
1299        iter.dispose().await;
1300        assert!(iter.next().await.is_none());
1301    }
1302}