gcloud_pubsub/
subscription.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::{Duration, SystemTime};
6
7use prost_types::{DurationError, FieldMask};
8use tokio_util::sync::CancellationToken;
9
10use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
11use google_cloud_gax::grpc::{Code, Status};
12use google_cloud_gax::retry::RetrySetting;
13use google_cloud_googleapis::pubsub::v1::seek_request::Target;
14use google_cloud_googleapis::pubsub::v1::subscription::AnalyticsHubSubscriptionInfo;
15use google_cloud_googleapis::pubsub::v1::{
16    BigQueryConfig, CloudStorageConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest,
17    DeleteSubscriptionRequest, ExpirationPolicy, GetSnapshotRequest, GetSubscriptionRequest, MessageTransform,
18    PullRequest, PushConfig, RetryPolicy, SeekRequest, Snapshot, Subscription as InternalSubscription,
19    UpdateSubscriptionRequest,
20};
21
22use crate::apiv1::subscriber_client::SubscriberClient;
23
24use crate::subscriber::{ack, ReceivedMessage, Subscriber, SubscriberConfig};
25
/// Configuration of a Pub/Sub subscription.
///
/// Holds the configurable fields of the generated subscription message
/// (imported in this file as `InternalSubscription`). The message's `name` and
/// `topic` are not stored here; `Subscription::create` supplies them separately.
#[derive(Debug, Clone, Default)]
pub struct SubscriptionConfig {
    pub push_config: Option<PushConfig>,
    pub ack_deadline_seconds: i32,
    pub retain_acked_messages: bool,
    /// Converted to and from `prost_types::Duration` when exchanged with the service.
    pub message_retention_duration: Option<Duration>,
    pub labels: HashMap<String, String>,
    pub enable_message_ordering: bool,
    pub expiration_policy: Option<ExpirationPolicy>,
    pub filter: String,
    pub dead_letter_policy: Option<DeadLetterPolicy>,
    pub retry_policy: Option<RetryPolicy>,
    pub detached: bool,
    /// Converted to and from `prost_types::Duration` when exchanged with the service.
    pub topic_message_retention_duration: Option<Duration>,
    pub enable_exactly_once_delivery: bool,
    pub bigquery_config: Option<BigQueryConfig>,
    /// Raw `state` value copied from the subscription message.
    pub state: i32,
    pub cloud_storage_config: Option<CloudStorageConfig>,
    pub analytics_hub_subscription_info: Option<AnalyticsHubSubscriptionInfo>,
    pub message_transforms: Vec<MessageTransform>,
}
47impl From<InternalSubscription> for SubscriptionConfig {
48    fn from(f: InternalSubscription) -> Self {
49        Self {
50            push_config: f.push_config,
51            bigquery_config: f.bigquery_config,
52            ack_deadline_seconds: f.ack_deadline_seconds,
53            retain_acked_messages: f.retain_acked_messages,
54            message_retention_duration: f
55                .message_retention_duration
56                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
57            labels: f.labels,
58            enable_message_ordering: f.enable_message_ordering,
59            expiration_policy: f.expiration_policy,
60            filter: f.filter,
61            dead_letter_policy: f.dead_letter_policy,
62            retry_policy: f.retry_policy,
63            detached: f.detached,
64            topic_message_retention_duration: f
65                .topic_message_retention_duration
66                .map(|v| std::time::Duration::new(v.seconds as u64, v.nanos as u32)),
67            enable_exactly_once_delivery: f.enable_exactly_once_delivery,
68            state: f.state,
69            cloud_storage_config: f.cloud_storage_config,
70            analytics_hub_subscription_info: f.analytics_hub_subscription_info,
71            message_transforms: f.message_transforms,
72        }
73    }
74}
75
/// Optional overrides passed to `Subscription::update`.
///
/// A field left as `None` is not sent as a change.
#[derive(Debug, Clone, Default)]
pub struct SubscriptionConfigToUpdate {
    pub push_config: Option<PushConfig>,
    pub bigquery_config: Option<BigQueryConfig>,
    pub ack_deadline_seconds: Option<i32>,
    pub retain_acked_messages: Option<bool>,
    pub message_retention_duration: Option<Duration>,
    pub labels: Option<HashMap<String, String>>,
    pub expiration_policy: Option<ExpirationPolicy>,
    pub dead_letter_policy: Option<DeadLetterPolicy>,
    pub retry_policy: Option<RetryPolicy>,
}
88
/// Options for `Subscription::subscribe`, built with the `with_*` methods.
#[derive(Debug, Clone, Default)]
pub struct SubscribeConfig {
    // When true, `subscribe` starts `streaming_pool_size()` subscribers instead of one.
    enable_multiple_subscriber: bool,
    // Capacity of the message channel; `None` creates an unbounded channel.
    channel_capacity: Option<usize>,
    // When `None`, `subscribe` derives a config from the subscription's server-side settings.
    subscriber_config: Option<SubscriberConfig>,
}
95
96impl SubscribeConfig {
97    pub fn with_enable_multiple_subscriber(mut self, v: bool) -> Self {
98        self.enable_multiple_subscriber = v;
99        self
100    }
101    pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
102        self.subscriber_config = Some(v);
103        self
104    }
105    pub fn with_channel_capacity(mut self, v: usize) -> Self {
106        self.channel_capacity = Some(v);
107        self
108    }
109}
110
/// Options for `Subscription::receive`.
#[derive(Debug, Clone)]
pub struct ReceiveConfig {
    /// Number of subscribers and handler tasks started by `receive`.
    pub worker_count: usize,
    /// Capacity of each message channel; `None` creates unbounded channels.
    pub channel_capacity: Option<usize>,
    /// When `None`, `receive` derives a config from the subscription's server-side settings.
    pub subscriber_config: Option<SubscriberConfig>,
}
117
118impl Default for ReceiveConfig {
119    fn default() -> Self {
120        Self {
121            worker_count: 10,
122            subscriber_config: None,
123            channel_capacity: None,
124        }
125    }
126}
127
/// Target of `Subscription::seek`.
#[derive(Debug, Clone)]
pub enum SeekTo {
    /// Seek to a point in time.
    Timestamp(SystemTime),
    /// Seek to a snapshot, given by name. `Subscription::seek` expands a name without `/`
    /// into a fully qualified snapshot name.
    Snapshot(String),
}
133
134impl From<SeekTo> for Target {
135    fn from(to: SeekTo) -> Target {
136        use SeekTo::*;
137        match to {
138            Timestamp(t) => Target::Time(prost_types::Timestamp::from(t)),
139            Snapshot(s) => Target::Snapshot(s),
140        }
141    }
142}
143
/// Handle returned by `Subscription::subscribe`: a queue of received messages together
/// with the subscribers feeding it and the token that stops them.
pub struct MessageStream {
    // Receiving side of the channel the subscribers send messages into.
    queue: async_channel::Receiver<ReceivedMessage>,
    // Cancelled by `dispose` and on drop to stop the subscribers.
    cancel: CancellationToken,
    // One entry per `Subscriber::start` call made in `subscribe`; awaited in `dispose`.
    tasks: Vec<Subscriber>,
}
149
150impl MessageStream {
151    pub fn cancellable(&self) -> CancellationToken {
152        self.cancel.clone()
153    }
154
155    pub async fn dispose(&mut self) {
156        // Close streaming pull task
157        if !self.cancel.is_cancelled() {
158            self.cancel.cancel();
159        }
160
161        // Wait for all the streaming pull close.
162        for task in &mut self.tasks {
163            task.done().await;
164        }
165
166        // Nack for remaining messages.
167        while let Ok(message) = self.queue.recv().await {
168            if let Err(err) = message.nack().await {
169                tracing::warn!("failed to nack message messageId={} {:?}", message.message.message_id, err);
170            }
171        }
172    }
173
174    /// Immediately Nack on cancel
175    pub async fn read(&mut self) -> Option<ReceivedMessage> {
176        let message = tokio::select! {
177            msg = self.queue.recv() => msg.ok(),
178            _ = self.cancel.cancelled() => None
179        };
180        if message.is_none() {
181            self.dispose().await;
182        }
183        message
184    }
185}
186
187impl Drop for MessageStream {
188    fn drop(&mut self) {
189        if !self.queue.is_empty() {
190            tracing::warn!("Call 'dispose' before drop in order to call nack for remaining messages");
191        }
192        if !self.cancel.is_cancelled() {
193            self.cancel.cancel();
194        }
195    }
196}
197
198impl Stream for MessageStream {
199    type Item = ReceivedMessage;
200
201    /// Return None unless the queue is open.
202    /// Use CancellationToken for SubscribeConfig to get None
203    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204        Pin::new(&mut self.get_mut().queue).poll_next(cx)
205    }
206}
207
/// Subscription is a reference to a PubSub subscription.
///
/// It pairs the subscription's fully qualified name with the client used for every call.
#[derive(Clone, Debug)]
pub struct Subscription {
    // Fully qualified subscription name, returned as-is by `fully_qualified_name`.
    fqsn: String,
    // Client every request of this subscription goes through.
    subc: SubscriberClient,
}
214
215impl Subscription {
    /// Creates a reference to the subscription named `fqsn` that talks through `subc`.
    /// No request is sent.
    pub(crate) fn new(fqsn: String, subc: SubscriberClient) -> Self {
        Self { fqsn, subc }
    }
219
    /// Returns the streaming pool size reported by the underlying client.
    pub(crate) fn streaming_pool_size(&self) -> usize {
        self.subc.streaming_pool_size()
    }
223
224    /// id returns the unique identifier of the subscription within its project.
225    pub fn id(&self) -> String {
226        self.fqsn
227            .rfind('/')
228            .map_or("".to_string(), |i| self.fqsn[(i + 1)..].to_string())
229    }
230
    /// fully_qualified_name returns the globally unique printable name of the subscription.
    ///
    /// This is the name the subscription was constructed with, borrowed from `self`.
    pub fn fully_qualified_name(&self) -> &str {
        self.fqsn.as_str()
    }
235
236    /// fully_qualified_snapshot_name returns the globally unique printable name of the snapshot.
237    pub fn fully_qualified_snapshot_name(&self, id: &str) -> String {
238        if id.contains('/') {
239            id.to_string()
240        } else {
241            format!("{}/snapshots/{}", self.fully_qualified_project_name(), id)
242        }
243    }
244
245    fn fully_qualified_project_name(&self) -> String {
246        let parts: Vec<_> = self
247            .fqsn
248            .split('/')
249            .enumerate()
250            .filter(|&(i, _)| i < 2)
251            .map(|e| e.1)
252            .collect();
253        parts.join("/")
254    }
255
    /// Returns a clone of the client this subscription uses.
    pub fn get_client(&self) -> SubscriberClient {
        self.subc.clone()
    }
259
260    /// create creates the subscription.
261    pub async fn create(&self, fqtn: &str, cfg: SubscriptionConfig, retry: Option<RetrySetting>) -> Result<(), Status> {
262        self.subc
263            .create_subscription(
264                InternalSubscription {
265                    name: self.fully_qualified_name().to_string(),
266                    topic: fqtn.to_string(),
267                    push_config: cfg.push_config,
268                    bigquery_config: cfg.bigquery_config,
269                    cloud_storage_config: cfg.cloud_storage_config,
270                    ack_deadline_seconds: cfg.ack_deadline_seconds,
271                    labels: cfg.labels,
272                    enable_message_ordering: cfg.enable_message_ordering,
273                    expiration_policy: cfg.expiration_policy,
274                    filter: cfg.filter,
275                    dead_letter_policy: cfg.dead_letter_policy,
276                    retry_policy: cfg.retry_policy,
277                    detached: cfg.detached,
278                    message_retention_duration: cfg
279                        .message_retention_duration
280                        .map(Duration::try_into)
281                        .transpose()
282                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
283                    retain_acked_messages: cfg.retain_acked_messages,
284                    topic_message_retention_duration: cfg
285                        .topic_message_retention_duration
286                        .map(Duration::try_into)
287                        .transpose()
288                        .map_err(|err: DurationError| Status::internal(err.to_string()))?,
289                    enable_exactly_once_delivery: cfg.enable_exactly_once_delivery,
290                    state: cfg.state,
291                    analytics_hub_subscription_info: cfg.analytics_hub_subscription_info,
292                    message_transforms: cfg.message_transforms,
293                },
294                retry,
295            )
296            .await
297            .map(|_v| ())
298    }
299
300    /// delete deletes the subscription.
301    pub async fn delete(&self, retry: Option<RetrySetting>) -> Result<(), Status> {
302        let req = DeleteSubscriptionRequest {
303            subscription: self.fqsn.to_string(),
304        };
305        self.subc.delete_subscription(req, retry).await.map(|v| v.into_inner())
306    }
307
308    /// exists reports whether the subscription exists on the server.
309    pub async fn exists(&self, retry: Option<RetrySetting>) -> Result<bool, Status> {
310        let req = GetSubscriptionRequest {
311            subscription: self.fqsn.to_string(),
312        };
313        match self.subc.get_subscription(req, retry).await {
314            Ok(_) => Ok(true),
315            Err(e) => {
316                if e.code() == Code::NotFound {
317                    Ok(false)
318                } else {
319                    Err(e)
320                }
321            }
322        }
323    }
324
325    /// config fetches the current configuration for the subscription.
326    pub async fn config(&self, retry: Option<RetrySetting>) -> Result<(String, SubscriptionConfig), Status> {
327        let req = GetSubscriptionRequest {
328            subscription: self.fqsn.to_string(),
329        };
330        self.subc.get_subscription(req, retry).await.map(|v| {
331            let inner = v.into_inner();
332            (inner.topic.to_string(), inner.into())
333        })
334    }
335
336    /// update changes an existing subscription according to the fields set in updating.
337    /// It returns the new SubscriptionConfig.
338    pub async fn update(
339        &self,
340        updating: SubscriptionConfigToUpdate,
341        retry: Option<RetrySetting>,
342    ) -> Result<(String, SubscriptionConfig), Status> {
343        let req = GetSubscriptionRequest {
344            subscription: self.fqsn.to_string(),
345        };
346        let mut config = self.subc.get_subscription(req, retry.clone()).await?.into_inner();
347
348        let mut paths = vec![];
349        if updating.push_config.is_some() {
350            config.push_config = updating.push_config;
351            paths.push("push_config".to_string());
352        }
353        if updating.bigquery_config.is_some() {
354            config.bigquery_config = updating.bigquery_config;
355            paths.push("bigquery_config".to_string());
356        }
357        if let Some(v) = updating.ack_deadline_seconds {
358            config.ack_deadline_seconds = v;
359            paths.push("ack_deadline_seconds".to_string());
360        }
361        if let Some(v) = updating.retain_acked_messages {
362            config.retain_acked_messages = v;
363            paths.push("retain_acked_messages".to_string());
364        }
365        if updating.message_retention_duration.is_some() {
366            config.message_retention_duration = updating
367                .message_retention_duration
368                .map(prost_types::Duration::try_from)
369                .transpose()
370                .map_err(|err| Status::internal(err.to_string()))?;
371            paths.push("message_retention_duration".to_string());
372        }
373        if updating.expiration_policy.is_some() {
374            config.expiration_policy = updating.expiration_policy;
375            paths.push("expiration_policy".to_string());
376        }
377        if let Some(v) = updating.labels {
378            config.labels = v;
379            paths.push("labels".to_string());
380        }
381        if updating.retry_policy.is_some() {
382            config.retry_policy = updating.retry_policy;
383            paths.push("retry_policy".to_string());
384        }
385
386        let update_req = UpdateSubscriptionRequest {
387            subscription: Some(config),
388            update_mask: Some(FieldMask { paths }),
389        };
390        self.subc.update_subscription(update_req, retry).await.map(|v| {
391            let inner = v.into_inner();
392            (inner.topic.to_string(), inner.into())
393        })
394    }
395
396    /// pull get message synchronously.
397    /// It blocks until at least one message is available.
398    pub async fn pull(&self, max_messages: i32, retry: Option<RetrySetting>) -> Result<Vec<ReceivedMessage>, Status> {
399        #[allow(deprecated)]
400        let req = PullRequest {
401            subscription: self.fqsn.clone(),
402            return_immediately: false,
403            max_messages,
404        };
405        let messages = self.subc.pull(req, retry).await?.into_inner().received_messages;
406        Ok(messages
407            .into_iter()
408            .filter(|m| m.message.is_some())
409            .map(|m| {
410                ReceivedMessage::new(
411                    self.fqsn.clone(),
412                    self.subc.clone(),
413                    m.message.unwrap(),
414                    m.ack_id,
415                    (m.delivery_attempt > 0).then_some(m.delivery_attempt as usize),
416                )
417            })
418            .collect())
419    }
420
421    /// subscribe creates a `Stream` of `ReceivedMessage`
422    /// ```
423    /// use google_cloud_pubsub::subscription::{SubscribeConfig, Subscription};
424    /// use tokio::select;
425    /// use google_cloud_gax::grpc::Status;
426    ///
427    /// async fn run(subscription: Subscription) -> Result<(), Status> {
428    ///     let mut iter = subscription.subscribe(None).await?;
429    ///     let ctx = iter.cancellable();
430    ///     let handler = tokio::spawn(async move {
431    ///         while let Some(message) = iter.read().await {
432    ///             let _ = message.ack().await;
433    ///         }
434    ///     });
435    ///     // Cancel and wait for nack all the pulled messages.
436    ///     ctx.cancel();
437    ///     let _ = handler.await;
438    ///     Ok(())
439    ///  }
440    /// ```
441    ///
442    /// ```
443    /// use google_cloud_pubsub::subscription::{SubscribeConfig, Subscription};
444    /// use futures_util::StreamExt;
445    /// use tokio::select;
446    /// use google_cloud_gax::grpc::Status;
447    ///
448    /// async fn run(subscription: Subscription) -> Result<(), Status> {
449    ///     let mut iter = subscription.subscribe(None).await?;
450    ///     let ctx = iter.cancellable();
451    ///     let handler = tokio::spawn(async move {
452    ///         while let Some(message) = iter.next().await {
453    ///             let _ = message.ack().await;
454    ///         }
455    ///     });
456    ///     // Cancel and wait for receive all the pulled messages.
457    ///     ctx.cancel();
458    ///     let _ = handler.await;
459    ///     Ok(())
460    ///  }
461    /// ```
462    pub async fn subscribe(&self, opt: Option<SubscribeConfig>) -> Result<MessageStream, Status> {
463        let opt = opt.unwrap_or_default();
464        let (tx, rx) = create_channel(opt.channel_capacity);
465        let cancel = CancellationToken::new();
466        let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;
467
468        // spawn a separate subscriber task for each connection in the pool
469        let subscribers = if opt.enable_multiple_subscriber {
470            self.streaming_pool_size()
471        } else {
472            1
473        };
474        let mut tasks = Vec::with_capacity(subscribers);
475        for _ in 0..subscribers {
476            tasks.push(Subscriber::start(
477                cancel.clone(),
478                self.fqsn.clone(),
479                self.subc.clone(),
480                tx.clone(),
481                sub_opt.clone(),
482            ));
483        }
484
485        Ok(MessageStream {
486            queue: rx,
487            cancel,
488            tasks,
489        })
490    }
491
492    /// receive calls f with the outstanding messages from the subscription.
493    /// It blocks until cancellation token is cancelled, or the service returns a non-retryable error.
494    /// The standard way to terminate a receive is to use CancellationToken.
495    pub async fn receive<F>(
496        &self,
497        f: impl Fn(ReceivedMessage, CancellationToken) -> F + Send + 'static + Sync + Clone,
498        cancel: CancellationToken,
499        config: Option<ReceiveConfig>,
500    ) -> Result<(), Status>
501    where
502        F: Future<Output = ()> + Send + 'static,
503    {
504        let op = config.unwrap_or_default();
505        let mut receivers = Vec::with_capacity(op.worker_count);
506        let mut senders = Vec::with_capacity(receivers.len());
507        let sub_opt = self.unwrap_subscribe_config(op.subscriber_config).await?;
508
509        if self
510            .config(sub_opt.retry_setting.clone())
511            .await?
512            .1
513            .enable_message_ordering
514        {
515            (0..op.worker_count).for_each(|_v| {
516                let (sender, receiver) = create_channel(op.channel_capacity);
517                receivers.push(receiver);
518                senders.push(sender);
519            });
520        } else {
521            let (sender, receiver) = create_channel(op.channel_capacity);
522            (0..op.worker_count).for_each(|_v| {
523                receivers.push(receiver.clone());
524                senders.push(sender.clone());
525            });
526        }
527
528        //same ordering key is in same stream.
529        let subscribers: Vec<Subscriber> = senders
530            .into_iter()
531            .map(|queue| {
532                Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), queue, sub_opt.clone())
533            })
534            .collect();
535
536        let mut message_receivers = Vec::with_capacity(receivers.len());
537        for receiver in receivers {
538            let f_clone = f.clone();
539            let cancel_clone = cancel.clone();
540            let name = self.fqsn.clone();
541            message_receivers.push(tokio::spawn(async move {
542                while let Ok(message) = receiver.recv().await {
543                    f_clone(message, cancel_clone.clone()).await;
544                }
545                // queue is closed by subscriber when the cancellation token is cancelled
546                tracing::trace!("stop message receiver : {}", name);
547            }));
548        }
549        cancel.cancelled().await;
550
551        // wait for all the threads finish.
552        for mut subscriber in subscribers {
553            subscriber.done().await;
554        }
555
556        // wait for all the receivers process received messages
557        for mr in message_receivers {
558            let _ = mr.await;
559        }
560        Ok(())
561    }
562
563    /// Ack acknowledges the messages associated with the ack_ids in the AcknowledgeRequest.
564    /// The Pub/Sub system can remove the relevant messages from the subscription.
565    /// This method is for batch acking.
566    ///
567    /// ```
568    /// use google_cloud_pubsub::client::Client;
569    /// use google_cloud_pubsub::subscription::Subscription;
570    /// use google_cloud_gax::grpc::Status;
571    /// use std::time::Duration;
    /// use tokio_util::sync::CancellationToken;
573    ///
574    /// #[tokio::main]
575    /// async fn run(client: Client) -> Result<(), Status> {
576    ///     let subscription = client.subscription("test-subscription");
577    ///     let ctx = CancellationToken::new();
578    ///     let (sender, mut receiver)  = tokio::sync::mpsc::unbounded_channel();
579    ///     let subscription_for_receive = subscription.clone();
580    ///     let ctx_for_receive = ctx.clone();
581    ///     let ctx_for_ack_manager = ctx.clone();
582    ///
583    ///     // receive
584    ///     let handle = tokio::spawn(async move {
585    ///         let _ = subscription_for_receive.receive(move |message, _ctx| {
586    ///             let sender = sender.clone();
587    ///             async move {
588    ///                 let _ = sender.send(message.ack_id().to_string());
589    ///             }
590    ///         }, ctx_for_receive.clone(), None).await;
591    ///     });
592    ///
593    ///     // batch ack manager
594    ///     let ack_manager = tokio::spawn( async move {
595    ///         let mut ack_ids = Vec::new();
596    ///         loop {
597    ///             tokio::select! {
598    ///                 _ = ctx_for_ack_manager.cancelled() => {
599    ///                     return subscription.ack(ack_ids).await;
600    ///                 },
601    ///                 r = tokio::time::timeout(Duration::from_secs(10), receiver.recv()) => match r {
602    ///                     Ok(ack_id) => {
603    ///                         if let Some(ack_id) = ack_id {
604    ///                             ack_ids.push(ack_id);
605    ///                             if ack_ids.len() > 10 {
606    ///                                 let _ = subscription.ack(ack_ids).await;
607    ///                                 ack_ids = Vec::new();
608    ///                             }
609    ///                         }
610    ///                     },
611    ///                     Err(_e) => {
612    ///                         // timeout
613    ///                         let _ = subscription.ack(ack_ids).await;
614    ///                         ack_ids = Vec::new();
615    ///                     }
616    ///                 }
617    ///             }
618    ///         }
619    ///     });
620    ///
621    ///     ctx.cancel();
622    ///     Ok(())
623    ///  }
624    /// ```
    pub async fn ack(&self, ack_ids: Vec<String>) -> Result<(), Status> {
        // Delegates to the `ack` helper imported from `crate::subscriber`, passing this
        // subscription's client and fully qualified name.
        ack(&self.subc, self.fqsn.to_string(), ack_ids).await
    }
628
629    /// seek seeks the subscription a past timestamp or a saved snapshot.
630    pub async fn seek(&self, to: SeekTo, retry: Option<RetrySetting>) -> Result<(), Status> {
631        let to = match to {
632            SeekTo::Timestamp(t) => SeekTo::Timestamp(t),
633            SeekTo::Snapshot(name) => SeekTo::Snapshot(self.fully_qualified_snapshot_name(name.as_str())),
634        };
635
636        let req = SeekRequest {
637            subscription: self.fqsn.to_owned(),
638            target: Some(to.into()),
639        };
640
641        let _ = self.subc.seek(req, retry).await?;
642        Ok(())
643    }
644
645    /// get_snapshot fetches an existing pubsub snapshot.
646    pub async fn get_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<Snapshot, Status> {
647        let req = GetSnapshotRequest {
648            snapshot: self.fully_qualified_snapshot_name(name),
649        };
650        Ok(self.subc.get_snapshot(req, retry).await?.into_inner())
651    }
652
653    /// create_snapshot creates a new pubsub snapshot from the subscription's state at the time of calling.
654    /// The snapshot retains the messages for the topic the subscription is subscribed to, with the acknowledgment
655    /// states consistent with the subscriptions.
656    /// The created snapshot is guaranteed to retain:
657    /// - The message backlog on the subscription -- or to be specific, messages that are unacknowledged
658    ///   at the time of the subscription's creation.
659    /// - All messages published to the subscription's topic after the snapshot's creation.
660    ///   Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which
661    ///   they are discarded and any messages being retained solely due to the snapshot dropped.
662    pub async fn create_snapshot(
663        &self,
664        name: &str,
665        labels: HashMap<String, String>,
666        retry: Option<RetrySetting>,
667    ) -> Result<Snapshot, Status> {
668        let req = CreateSnapshotRequest {
669            name: self.fully_qualified_snapshot_name(name),
670            labels,
671            subscription: self.fqsn.to_owned(),
672        };
673        Ok(self.subc.create_snapshot(req, retry).await?.into_inner())
674    }
675
676    /// delete_snapshot deletes an existing pubsub snapshot.
677    pub async fn delete_snapshot(&self, name: &str, retry: Option<RetrySetting>) -> Result<(), Status> {
678        let req = DeleteSnapshotRequest {
679            snapshot: self.fully_qualified_snapshot_name(name),
680        };
681        let _ = self.subc.delete_snapshot(req, retry).await?;
682        Ok(())
683    }
684
685    async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
686        if let Some(cfg) = cfg {
687            return Ok(cfg);
688        }
689        let cfg = self.config(None).await?;
690        let mut default_cfg = SubscriberConfig {
691            stream_ack_deadline_seconds: cfg.1.ack_deadline_seconds.clamp(10, 600),
692            ..Default::default()
693        };
694        if cfg.1.enable_exactly_once_delivery {
695            default_cfg.max_outstanding_messages = 5;
696        }
697        Ok(default_cfg)
698    }
699}
700
701fn create_channel(
702    channel_capacity: Option<usize>,
703) -> (async_channel::Sender<ReceivedMessage>, async_channel::Receiver<ReceivedMessage>) {
704    match channel_capacity {
705        None => async_channel::unbounded(),
706        Some(cap) => async_channel::bounded(cap),
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use std::collections::HashMap;
713    use std::sync::atomic::AtomicU32;
714    use std::sync::atomic::Ordering::SeqCst;
715    use std::sync::{Arc, Mutex};
716    use std::time::Duration;
717
718    use futures_util::StreamExt;
719    use serial_test::serial;
720    use tokio_util::sync::CancellationToken;
721    use uuid::Uuid;
722
723    use google_cloud_gax::conn::{ConnectionOptions, Environment};
724    use google_cloud_googleapis::pubsub::v1::{PublishRequest, PubsubMessage};
725
726    use crate::apiv1::conn_pool::ConnectionManager;
727    use crate::apiv1::publisher_client::PublisherClient;
728    use crate::apiv1::subscriber_client::SubscriberClient;
729    use crate::subscriber::ReceivedMessage;
730    use crate::subscription::{
731        ReceiveConfig, SeekTo, SubscribeConfig, Subscription, SubscriptionConfig, SubscriptionConfigToUpdate,
732    };
733
734    const PROJECT_NAME: &str = "local-project";
735    const EMULATOR: &str = "localhost:8681";
736
    /// Installs the `tracing_subscriber` fmt subscriber for the tests.
    /// The result of `try_init` is ignored on purpose.
    #[ctor::ctor]
    fn init() {
        let _ = tracing_subscriber::fmt().try_init();
    }
741
    /// Connects to the emulator and returns a `Subscription` with a fresh, UUID-based name
    /// on `test-topic1`, creating it on the server when it does not exist yet.
    async fn create_subscription(enable_exactly_once_delivery: bool) -> Subscription {
        // `SubscriberClient::new` takes two connection managers; both point at the emulator.
        let cm = ConnectionManager::new(
            4,
            "",
            &Environment::Emulator(EMULATOR.to_string()),
            &ConnectionOptions::default(),
        )
        .await
        .unwrap();
        let cm2 = ConnectionManager::new(
            4,
            "",
            &Environment::Emulator(EMULATOR.to_string()),
            &ConnectionOptions::default(),
        )
        .await
        .unwrap();
        let client = SubscriberClient::new(cm, cm2);

        // A random name keeps test runs from sharing a subscription.
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
        let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
        let subscription = Subscription::new(subscription_name, client);
        let config = SubscriptionConfig {
            enable_exactly_once_delivery,
            ..Default::default()
        };
        if !subscription.exists(None).await.unwrap() {
            subscription.create(topic_name.as_str(), config, None).await.unwrap();
        }
        subscription
    }
774
775    async fn publish(messages: Option<Vec<PubsubMessage>>) {
776        let pubc = PublisherClient::new(
777            ConnectionManager::new(
778                4,
779                "",
780                &Environment::Emulator(EMULATOR.to_string()),
781                &ConnectionOptions::default(),
782            )
783            .await
784            .unwrap(),
785        );
786        let messages = messages.unwrap_or(vec![PubsubMessage {
787            data: "test_message".into(),
788            ..Default::default()
789        }]);
790        let req = PublishRequest {
791            topic: format!("projects/{PROJECT_NAME}/topics/test-topic1"),
792            messages,
793        };
794        let _ = pubc.publish(req, None).await;
795    }
796
    /// Shared scenario: create a subscription, check `config` and `update`, run `receive`
    /// for three seconds, then delete the subscription and check that it is gone.
    async fn test_subscription(enable_exactly_once_delivery: bool) {
        let subscription = create_subscription(enable_exactly_once_delivery).await;

        let topic_name = format!("projects/{PROJECT_NAME}/topics/test-topic1");
        let config = subscription.config(None).await.unwrap();
        assert_eq!(config.0, topic_name);

        let updating = SubscriptionConfigToUpdate {
            ack_deadline_seconds: Some(100),
            ..Default::default()
        };
        let new_config = subscription.update(updating, None).await.unwrap();
        assert_eq!(new_config.0, topic_name);
        assert_eq!(new_config.1.ack_deadline_seconds, 100);

        let receiver_ctx = CancellationToken::new();
        let cancel_receiver = receiver_ctx.clone();
        // `receive` only returns after cancellation, so it runs in its own task.
        let handle = tokio::spawn(async move {
            let _ = subscription
                .receive(
                    |message, _ctx| async move {
                        println!("{}", message.message.message_id);
                        let _ = message.ack().await;
                    },
                    cancel_receiver,
                    None,
                )
                .await;
            subscription.delete(None).await.unwrap();
            assert!(!subscription.exists(None).await.unwrap())
        });
        tokio::time::sleep(Duration::from_secs(3)).await;
        receiver_ctx.cancel();
        // NOTE(review): the join result is discarded, so a panic inside the task (for
        // example a failed assert) does not fail this test — confirm that is intended.
        let _ = handle.await;
    }
832
833    #[tokio::test(flavor = "multi_thread")]
834    #[serial]
835    async fn test_pull() {
836        let subscription = create_subscription(false).await;
837        let base = PubsubMessage {
838            data: "test_message".into(),
839            ..Default::default()
840        };
841        publish(Some(vec![base.clone(), base.clone(), base])).await;
842        let messages = subscription.pull(2, None).await.unwrap();
843        assert_eq!(messages.len(), 2);
844        for m in messages {
845            m.ack().await.unwrap();
846        }
847        subscription.delete(None).await.unwrap();
848    }
849
850    #[tokio::test]
851    #[serial]
852    async fn test_subscription_exactly_once() {
853        test_subscription(true).await;
854    }
855
856    #[tokio::test]
857    #[serial]
858    async fn test_subscription_at_least_once() {
859        test_subscription(false).await;
860    }
861
862    #[tokio::test(flavor = "multi_thread")]
863    #[serial]
864    async fn test_multi_subscriber_single_subscription_unbound() {
865        test_multi_subscriber_single_subscription(None).await;
866    }
867
868    #[tokio::test(flavor = "multi_thread")]
869    #[serial]
870    async fn test_multi_subscriber_single_subscription_bound() {
871        let opt = Some(ReceiveConfig {
872            channel_capacity: Some(1),
873            ..Default::default()
874        });
875        test_multi_subscriber_single_subscription(opt).await;
876    }
877
878    async fn test_multi_subscriber_single_subscription(opt: Option<ReceiveConfig>) {
879        let msg = PubsubMessage {
880            data: "test".into(),
881            ..Default::default()
882        };
883        let msg_size = 10;
884        let msgs: Vec<PubsubMessage> = (0..msg_size).map(|_v| msg.clone()).collect();
885        let subscription = create_subscription(false).await;
886        let cancellation_token = CancellationToken::new();
887        let cancel_receiver = cancellation_token.clone();
888        let v = Arc::new(AtomicU32::new(0));
889        let v2 = v.clone();
890        let handle = tokio::spawn(async move {
891            let _ = subscription
892                .receive(
893                    move |message, _ctx| {
894                        let v2 = v2.clone();
895                        async move {
896                            tracing::info!("received {}", message.message.message_id);
897                            v2.fetch_add(1, SeqCst);
898                            let _ = message.ack().await;
899                        }
900                    },
901                    cancel_receiver,
902                    opt,
903                )
904                .await;
905        });
906        publish(Some(msgs)).await;
907        tokio::time::sleep(Duration::from_secs(5)).await;
908        cancellation_token.cancel();
909        let _ = handle.await;
910        assert_eq!(v.load(SeqCst), msg_size);
911    }
912
913    #[tokio::test(flavor = "multi_thread")]
914    #[serial]
915    async fn test_multi_subscriber_multi_subscription() {
916        let mut subscriptions = vec![];
917
918        let ctx = CancellationToken::new();
919        for _ in 0..3 {
920            let subscription = create_subscription(false).await;
921            let v = Arc::new(AtomicU32::new(0));
922            let ctx = ctx.clone();
923            let v2 = v.clone();
924            let handle = tokio::spawn(async move {
925                let _ = subscription
926                    .receive(
927                        move |message, _ctx| {
928                            let v2 = v2.clone();
929                            async move {
930                                v2.fetch_add(1, SeqCst);
931                                let _ = message.ack().await;
932                            }
933                        },
934                        ctx,
935                        None,
936                    )
937                    .await;
938            });
939            subscriptions.push((handle, v))
940        }
941
942        publish(None).await;
943        tokio::time::sleep(Duration::from_secs(5)).await;
944
945        ctx.cancel();
946        for (task, v) in subscriptions {
947            let _ = task.await;
948            assert_eq!(v.load(SeqCst), 1);
949        }
950    }
951
952    #[tokio::test(flavor = "multi_thread")]
953    #[serial]
954    async fn test_batch_acking() {
955        let ctx = CancellationToken::new();
956        let subscription = create_subscription(false).await;
957        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
958        let subscription_for_receive = subscription.clone();
959        let ctx_for_receive = ctx.clone();
960        let handle = tokio::spawn(async move {
961            let _ = subscription_for_receive
962                .receive(
963                    move |message, _ctx| {
964                        let sender = sender.clone();
965                        async move {
966                            let _ = sender.send(message.ack_id().to_string());
967                        }
968                    },
969                    ctx_for_receive.clone(),
970                    None,
971                )
972                .await;
973        });
974
975        let ctx_for_ack_manager = ctx.clone();
976        let ack_manager = tokio::spawn(async move {
977            let mut ack_ids = Vec::new();
978            while !ctx_for_ack_manager.is_cancelled() {
979                match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
980                    Ok(ack_id) => {
981                        if let Some(ack_id) = ack_id {
982                            ack_ids.push(ack_id);
983                            if ack_ids.len() > 10 {
984                                subscription.ack(ack_ids).await.unwrap();
985                                ack_ids = Vec::new();
986                            }
987                        }
988                    }
989                    Err(_e) => {
990                        // timeout
991                        subscription.ack(ack_ids).await.unwrap();
992                        ack_ids = Vec::new();
993                    }
994                }
995            }
996            // flush
997            subscription.ack(ack_ids).await
998        });
999
1000        publish(None).await;
1001        tokio::time::sleep(Duration::from_secs(5)).await;
1002
1003        ctx.cancel();
1004        let _ = handle.await;
1005        assert!(ack_manager.await.is_ok());
1006    }
1007
1008    #[tokio::test]
1009    #[serial]
1010    async fn test_snapshots() {
1011        let subscription = create_subscription(false).await;
1012
1013        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1014        let labels: HashMap<String, String> =
1015            HashMap::from_iter([("label-1".into(), "v1".into()), ("label-2".into(), "v2".into())]);
1016        let expected_fq_snap_name = format!("projects/{PROJECT_NAME}/snapshots/{snapshot_name}");
1017
1018        // cleanup; TODO: remove?
1019        let _response = subscription.delete_snapshot(snapshot_name.as_str(), None).await;
1020
1021        // create
1022        let created_snapshot = subscription
1023            .create_snapshot(snapshot_name.as_str(), labels.clone(), None)
1024            .await
1025            .unwrap();
1026
1027        assert_eq!(created_snapshot.name, expected_fq_snap_name);
1028        // NOTE: we don't assert the labels due to lack of label support in the pubsub emulator.
1029
1030        // get
1031        let retrieved_snapshot = subscription.get_snapshot(snapshot_name.as_str(), None).await.unwrap();
1032        assert_eq!(created_snapshot, retrieved_snapshot);
1033
1034        // delete
1035        subscription
1036            .delete_snapshot(snapshot_name.as_str(), None)
1037            .await
1038            .unwrap();
1039
1040        let _deleted_snapshot_status = subscription
1041            .get_snapshot(snapshot_name.as_str(), None)
1042            .await
1043            .expect_err("snapshot should have been deleted");
1044
1045        let _delete_again = subscription
1046            .delete_snapshot(snapshot_name.as_str(), None)
1047            .await
1048            .expect_err("snapshot should already be deleted");
1049    }
1050
1051    async fn ack_all(messages: &[ReceivedMessage]) {
1052        for message in messages.iter() {
1053            message.ack().await.unwrap();
1054        }
1055    }
1056
1057    #[tokio::test]
1058    #[serial]
1059    async fn test_seek_snapshot() {
1060        let subscription = create_subscription(false).await;
1061        let snapshot_name = format!("snapshot-{}", rand::random::<u64>());
1062
1063        // publish and receive a message
1064        publish(None).await;
1065        let messages = subscription.pull(100, None).await.unwrap();
1066        ack_all(&messages).await;
1067        assert_eq!(messages.len(), 1);
1068
1069        // snapshot at received = 1
1070        let _snapshot = subscription
1071            .create_snapshot(snapshot_name.as_str(), HashMap::new(), None)
1072            .await
1073            .unwrap();
1074
1075        // publish and receive another message
1076        publish(None).await;
1077        let messages = subscription.pull(100, None).await.unwrap();
1078        assert_eq!(messages.len(), 1);
1079        ack_all(&messages).await;
1080
1081        // rewind to snapshot at received = 1
1082        subscription
1083            .seek(SeekTo::Snapshot(snapshot_name.clone()), None)
1084            .await
1085            .unwrap();
1086
1087        // assert we receive the 1 message we should receive again
1088        let messages = subscription.pull(100, None).await.unwrap();
1089        assert_eq!(messages.len(), 1);
1090        ack_all(&messages).await;
1091
1092        // cleanup
1093        subscription
1094            .delete_snapshot(snapshot_name.as_str(), None)
1095            .await
1096            .unwrap();
1097        subscription.delete(None).await.unwrap();
1098    }
1099
1100    #[tokio::test]
1101    #[serial]
1102    async fn test_seek_timestamp() {
1103        let subscription = create_subscription(false).await;
1104
1105        // enable acked message retention on subscription -- required for timestamp-based seeks
1106        subscription
1107            .update(
1108                SubscriptionConfigToUpdate {
1109                    retain_acked_messages: Some(true),
1110                    message_retention_duration: Some(Duration::new(60 * 60 * 2, 0)),
1111                    ..Default::default()
1112                },
1113                None,
1114            )
1115            .await
1116            .unwrap();
1117
1118        // publish and receive a message
1119        publish(None).await;
1120        let messages = subscription.pull(100, None).await.unwrap();
1121        ack_all(&messages).await;
1122        assert_eq!(messages.len(), 1);
1123
1124        let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1125
1126        // rewind to a timestamp where message was just published
1127        subscription
1128            .seek(SeekTo::Timestamp(message_publish_time.to_owned().try_into().unwrap()), None)
1129            .await
1130            .unwrap();
1131
1132        // consume -- should receive the first message again
1133        let messages = subscription.pull(100, None).await.unwrap();
1134        ack_all(&messages).await;
1135        assert_eq!(messages.len(), 1);
1136        let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
1137        assert_eq!(seek_message_publish_time, message_publish_time);
1138
1139        // cleanup
1140        subscription.delete(None).await.unwrap();
1141    }
1142
1143    #[tokio::test(flavor = "multi_thread")]
1144    #[serial]
1145    async fn test_subscribe_single_subscriber() {
1146        test_subscribe(None).await;
1147    }
1148
1149    #[tokio::test(flavor = "multi_thread")]
1150    #[serial]
1151    async fn test_subscribe_multiple_subscriber() {
1152        test_subscribe(Some(SubscribeConfig::default().with_enable_multiple_subscriber(true))).await;
1153    }
1154
1155    #[tokio::test(flavor = "multi_thread")]
1156    #[serial]
1157    async fn test_subscribe_multiple_subscriber_bound() {
1158        test_subscribe(Some(
1159            SubscribeConfig::default()
1160                .with_enable_multiple_subscriber(true)
1161                .with_channel_capacity(1),
1162        ))
1163        .await;
1164    }
1165
1166    async fn test_subscribe(opt: Option<SubscribeConfig>) {
1167        let msg = PubsubMessage {
1168            data: "test".into(),
1169            ..Default::default()
1170        };
1171        let msg_count = 10;
1172        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1173        let subscription = create_subscription(false).await;
1174        let received = Arc::new(Mutex::new(0));
1175        let checking = received.clone();
1176        let mut iter = subscription.subscribe(opt).await.unwrap();
1177        let cancellable = iter.cancellable();
1178        let handler = tokio::spawn(async move {
1179            while let Some(message) = iter.next().await {
1180                tracing::info!("received {}", message.message.message_id);
1181                *received.lock().unwrap() += 1;
1182                tokio::time::sleep(Duration::from_millis(500)).await;
1183                let _ = message.ack().await;
1184            }
1185        });
1186        publish(Some(msg)).await;
1187        tokio::time::sleep(Duration::from_secs(8)).await;
1188        cancellable.cancel();
1189        let _ = handler.await;
1190        assert_eq!(*checking.lock().unwrap(), msg_count);
1191    }
1192
1193    #[tokio::test(flavor = "multi_thread")]
1194    #[serial]
1195    async fn test_subscribe_nack_on_cancel_read() {
1196        subscribe_nack_on_cancel_read(10, true).await;
1197        subscribe_nack_on_cancel_read(0, true).await;
1198        subscribe_nack_on_cancel_read(10, false).await;
1199        subscribe_nack_on_cancel_read(0, false).await;
1200    }
1201
1202    #[tokio::test(flavor = "multi_thread")]
1203    #[serial]
1204    async fn test_subscribe_nack_on_cancel_next() {
1205        // cancel after subscribe all message
1206        subscribe_nack_on_cancel_next(10, Duration::from_secs(3)).await;
1207        // cancel after process all message
1208        subscribe_nack_on_cancel_next(10, Duration::from_millis(0)).await;
1209        // no message
1210        subscribe_nack_on_cancel_next(0, Duration::from_secs(3)).await;
1211    }
1212
1213    async fn subscribe_nack_on_cancel_read(msg_count: usize, should_cancel: bool) {
1214        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1215
1216        let msg = PubsubMessage {
1217            data: "test".into(),
1218            ..Default::default()
1219        };
1220        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1221        let subscription = create_subscription(false).await;
1222        let received = Arc::new(Mutex::new(0));
1223        let checking = received.clone();
1224
1225        let mut iter = subscription.subscribe(opt).await.unwrap();
1226        let ctx = iter.cancellable();
1227        let handler = tokio::spawn(async move {
1228            while let Some(message) = iter.read().await {
1229                tracing::info!("received {}", message.message.message_id);
1230                *received.lock().unwrap() += 1;
1231                if should_cancel {
1232                    // expect cancel
1233                    tokio::time::sleep(Duration::from_secs(10)).await;
1234                } else {
1235                    tokio::time::sleep(Duration::from_millis(1)).await;
1236                }
1237                let _ = message.ack().await;
1238            }
1239        });
1240        publish(Some(msg)).await;
1241        tokio::time::sleep(Duration::from_secs(10)).await;
1242        ctx.cancel();
1243        handler.await.unwrap();
1244        if should_cancel && msg_count > 0 {
1245            // expect nack
1246            assert!(*checking.lock().unwrap() < msg_count);
1247        } else {
1248            // all delivered
1249            assert_eq!(*checking.lock().unwrap(), msg_count);
1250        }
1251    }
1252
1253    async fn subscribe_nack_on_cancel_next(msg_count: usize, recv_time: Duration) {
1254        let opt = Some(SubscribeConfig::default().with_enable_multiple_subscriber(true));
1255
1256        let msg = PubsubMessage {
1257            data: "test".into(),
1258            ..Default::default()
1259        };
1260        let msg: Vec<PubsubMessage> = (0..msg_count).map(|_v| msg.clone()).collect();
1261        let subscription = create_subscription(false).await;
1262        let received = Arc::new(Mutex::new(0));
1263        let checking = received.clone();
1264
1265        let mut iter = subscription.subscribe(opt).await.unwrap();
1266        let ctx = iter.cancellable();
1267        let handler = tokio::spawn(async move {
1268            while let Some(message) = iter.next().await {
1269                tracing::info!("received {}", message.message.message_id);
1270                *received.lock().unwrap() += 1;
1271                tokio::time::sleep(recv_time).await;
1272                let _ = message.ack().await;
1273            }
1274        });
1275        publish(Some(msg)).await;
1276        tokio::time::sleep(Duration::from_secs(10)).await;
1277        ctx.cancel();
1278        handler.await.unwrap();
1279        assert_eq!(*checking.lock().unwrap(), msg_count);
1280    }
1281
1282    #[tokio::test(flavor = "multi_thread")]
1283    #[serial]
1284    async fn test_message_stream_dispose() {
1285        let subscription = create_subscription(false).await;
1286        let mut iter = subscription.subscribe(None).await.unwrap();
1287        iter.dispose().await;
1288        // no effect
1289        iter.dispose().await;
1290        assert!(iter.next().await.is_none());
1291    }
1292}