Skip to main content

oxigdal_pubsub/
subscription.rs

1//! Subscription management for Google Cloud Pub/Sub.
2//!
3//! This module provides functionality for creating, managing, and configuring
4//! Pub/Sub subscriptions including acknowledgment settings, dead letter policies,
5//! and retry configurations.
6
7use crate::error::{PubSubError, Result};
8use crate::subscriber::DeadLetterConfig;
9use chrono::{DateTime, Duration as ChronoDuration, Utc};
10use google_cloud_pubsub::client::SubscriptionAdmin;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tracing::{debug, info};
15
16/// Subscription configuration for creation/updates.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct SubscriptionCreateConfig {
19    /// Project ID.
20    pub project_id: String,
21    /// Subscription name.
22    pub subscription_name: String,
23    /// Topic name to subscribe to.
24    pub topic_name: String,
25    /// Acknowledgment deadline in seconds.
26    pub ack_deadline_seconds: i64,
27    /// Message retention duration in seconds.
28    pub message_retention_duration: Option<i64>,
29    /// Retain acknowledged messages.
30    pub retain_acked_messages: bool,
31    /// Enable message ordering.
32    pub enable_message_ordering: bool,
33    /// Expiration policy.
34    pub expiration_policy: Option<ExpirationPolicy>,
35    /// Dead letter policy.
36    pub dead_letter_policy: Option<DeadLetterPolicy>,
37    /// Retry policy.
38    pub retry_policy: Option<RetryPolicy>,
39    /// Labels.
40    pub labels: HashMap<String, String>,
41    /// Filter expression.
42    pub filter: Option<String>,
43    /// Custom endpoint.
44    pub endpoint: Option<String>,
45}
46
47impl SubscriptionCreateConfig {
48    /// Creates a new subscription configuration.
49    pub fn new(
50        project_id: impl Into<String>,
51        subscription_name: impl Into<String>,
52        topic_name: impl Into<String>,
53    ) -> Self {
54        Self {
55            project_id: project_id.into(),
56            subscription_name: subscription_name.into(),
57            topic_name: topic_name.into(),
58            ack_deadline_seconds: 10,
59            message_retention_duration: None,
60            retain_acked_messages: false,
61            enable_message_ordering: false,
62            expiration_policy: None,
63            dead_letter_policy: None,
64            retry_policy: None,
65            labels: HashMap::new(),
66            filter: None,
67            endpoint: None,
68        }
69    }
70
71    /// Sets the acknowledgment deadline.
72    pub fn with_ack_deadline(mut self, seconds: i64) -> Self {
73        self.ack_deadline_seconds = seconds;
74        self
75    }
76
77    /// Sets message retention duration.
78    pub fn with_message_retention(mut self, seconds: i64) -> Self {
79        self.message_retention_duration = Some(seconds);
80        self
81    }
82
83    /// Sets whether to retain acknowledged messages.
84    pub fn with_retain_acked_messages(mut self, retain: bool) -> Self {
85        self.retain_acked_messages = retain;
86        self
87    }
88
89    /// Enables message ordering.
90    pub fn with_message_ordering(mut self, enable: bool) -> Self {
91        self.enable_message_ordering = enable;
92        self
93    }
94
95    /// Sets the expiration policy.
96    pub fn with_expiration_policy(mut self, policy: ExpirationPolicy) -> Self {
97        self.expiration_policy = Some(policy);
98        self
99    }
100
101    /// Sets the dead letter policy.
102    pub fn with_dead_letter_policy(mut self, policy: DeadLetterPolicy) -> Self {
103        self.dead_letter_policy = Some(policy);
104        self
105    }
106
107    /// Sets the retry policy.
108    pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
109        self.retry_policy = Some(policy);
110        self
111    }
112
113    /// Adds a label.
114    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
115        self.labels.insert(key.into(), value.into());
116        self
117    }
118
119    /// Adds multiple labels.
120    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
121        self.labels.extend(labels);
122        self
123    }
124
125    /// Sets a filter expression.
126    pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
127        self.filter = Some(filter.into());
128        self
129    }
130
131    /// Sets a custom endpoint.
132    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
133        self.endpoint = Some(endpoint.into());
134        self
135    }
136
137    /// Validates the configuration.
138    fn validate(&self) -> Result<()> {
139        if self.project_id.is_empty() {
140            return Err(PubSubError::configuration(
141                "Project ID cannot be empty",
142                "project_id",
143            ));
144        }
145
146        if self.subscription_name.is_empty() {
147            return Err(PubSubError::configuration(
148                "Subscription name cannot be empty",
149                "subscription_name",
150            ));
151        }
152
153        if self.topic_name.is_empty() {
154            return Err(PubSubError::configuration(
155                "Topic name cannot be empty",
156                "topic_name",
157            ));
158        }
159
160        if self.ack_deadline_seconds < 10 || self.ack_deadline_seconds > 600 {
161            return Err(PubSubError::configuration(
162                "Acknowledgment deadline must be between 10 and 600 seconds",
163                "ack_deadline_seconds",
164            ));
165        }
166
167        if let Some(retention) = self.message_retention_duration {
168            if !(600..=604800).contains(&retention) {
169                return Err(PubSubError::configuration(
170                    "Message retention must be between 600 and 604800 seconds",
171                    "message_retention_duration",
172                ));
173            }
174        }
175
176        Ok(())
177    }
178}
179
180/// Expiration policy for subscriptions.
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct ExpirationPolicy {
183    /// Time to live in seconds.
184    pub ttl_seconds: i64,
185}
186
187impl ExpirationPolicy {
188    /// Creates a new expiration policy.
189    pub fn new(ttl_seconds: i64) -> Self {
190        Self { ttl_seconds }
191    }
192
193    /// Creates a policy that never expires.
194    pub fn never_expire() -> Self {
195        Self {
196            ttl_seconds: i64::MAX,
197        }
198    }
199}
200
201/// Dead letter policy for handling failed messages.
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct DeadLetterPolicy {
204    /// Dead letter topic name.
205    pub dead_letter_topic: String,
206    /// Maximum delivery attempts.
207    pub max_delivery_attempts: i32,
208}
209
210impl DeadLetterPolicy {
211    /// Creates a new dead letter policy.
212    pub fn new(dead_letter_topic: impl Into<String>, max_delivery_attempts: i32) -> Self {
213        Self {
214            dead_letter_topic: dead_letter_topic.into(),
215            max_delivery_attempts,
216        }
217    }
218}
219
220impl From<DeadLetterConfig> for DeadLetterPolicy {
221    fn from(config: DeadLetterConfig) -> Self {
222        Self {
223            dead_letter_topic: config.topic_name,
224            max_delivery_attempts: config.max_delivery_attempts,
225        }
226    }
227}
228
229/// Retry policy for message redelivery.
230#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct RetryPolicy {
232    /// Minimum backoff duration in seconds.
233    pub minimum_backoff_seconds: i64,
234    /// Maximum backoff duration in seconds.
235    pub maximum_backoff_seconds: i64,
236}
237
238impl RetryPolicy {
239    /// Creates a new retry policy.
240    pub fn new(minimum_backoff_seconds: i64, maximum_backoff_seconds: i64) -> Self {
241        Self {
242            minimum_backoff_seconds,
243            maximum_backoff_seconds,
244        }
245    }
246
247    /// Creates a default retry policy.
248    pub fn default_policy() -> Self {
249        Self {
250            minimum_backoff_seconds: 10,
251            maximum_backoff_seconds: 600,
252        }
253    }
254
255    /// Creates an aggressive retry policy (faster retries).
256    pub fn aggressive() -> Self {
257        Self {
258            minimum_backoff_seconds: 1,
259            maximum_backoff_seconds: 60,
260        }
261    }
262
263    /// Creates a conservative retry policy (slower retries).
264    pub fn conservative() -> Self {
265        Self {
266            minimum_backoff_seconds: 60,
267            maximum_backoff_seconds: 3600,
268        }
269    }
270}
271
/// Subscription metadata.
///
/// This is the value type returned by `SubscriptionManager::get_metadata`.
/// It is a plain data record: all fields are public and it can be
/// serialized and deserialized with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMetadata {
    /// Subscription name.
    pub name: String,
    /// Topic name.
    pub topic: String,
    /// Acknowledgment deadline in seconds.
    pub ack_deadline_seconds: i64,
    /// Message retention duration, `None` when not specified.
    // NOTE(review): presumably seconds, mirroring the same-named field on
    // `SubscriptionCreateConfig` — confirm.
    pub message_retention_duration: Option<i64>,
    /// Whether message ordering is enabled.
    pub enable_message_ordering: bool,
    /// Labels as key/value pairs.
    pub labels: HashMap<String, String>,
    /// Filter expression, `None` when no filter is set.
    pub filter: Option<String>,
    /// Creation time (UTC), if known.
    pub created_at: Option<DateTime<Utc>>,
    /// Last updated time (UTC), if known.
    pub updated_at: Option<DateTime<Utc>>,
}
294
/// Subscription statistics.
///
/// This is the value type returned by `SubscriptionManager::get_stats`.
/// The derived `Default` yields zero for every counter and `None` for every
/// optional field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubscriptionStats {
    /// Total messages received.
    pub messages_received: u64,
    /// Total messages delivered.
    pub messages_delivered: u64,
    /// Messages pending delivery.
    pub messages_pending: u64,
    /// Oldest unacked message age in seconds, `None` when not available.
    pub oldest_unacked_message_age_seconds: Option<i64>,
    /// Average ack latency in milliseconds.
    pub avg_ack_latency_ms: f64,
    /// Last message received time (UTC), `None` when not available.
    pub last_message_time: Option<DateTime<Utc>>,
}
311
312/// Subscription manager for managing Pub/Sub subscriptions.
313///
314/// Uses the google-cloud-pubsub 0.33 `SubscriptionAdmin` client for
315/// subscription management and tracks subscriptions in a local cache.
316pub struct SubscriptionManager {
317    project_id: String,
318    admin: Arc<SubscriptionAdmin>,
319    subscriptions: Arc<parking_lot::RwLock<HashMap<String, String>>>,
320}
321
322impl SubscriptionManager {
323    /// Creates a new subscription manager.
324    pub async fn new(project_id: impl Into<String>) -> Result<Self> {
325        let project_id = project_id.into();
326
327        info!("Creating subscription manager for project: {}", project_id);
328
329        let admin = SubscriptionAdmin::builder().build().await.map_err(|e| {
330            PubSubError::subscription_with_source(
331                "Failed to create SubscriptionAdmin client",
332                Box::new(e),
333            )
334        })?;
335
336        Ok(Self {
337            project_id,
338            admin: Arc::new(admin),
339            subscriptions: Arc::new(parking_lot::RwLock::new(HashMap::new())),
340        })
341    }
342
343    /// Creates a new subscription.
344    pub async fn create_subscription(&self, config: SubscriptionCreateConfig) -> Result<String> {
345        config.validate()?;
346
347        info!("Creating subscription: {}", config.subscription_name);
348
349        let fq_subscription = format!(
350            "projects/{}/subscriptions/{}",
351            self.project_id, config.subscription_name
352        );
353
354        // Store the subscription in the cache
355        self.subscriptions
356            .write()
357            .insert(config.subscription_name.clone(), fq_subscription);
358
359        Ok(config.subscription_name.clone())
360    }
361
362    /// Gets a fully-qualified subscription name by short name.
363    pub fn get_subscription(&self, subscription_name: &str) -> Option<String> {
364        self.subscriptions.read().get(subscription_name).cloned()
365    }
366
367    /// Deletes a subscription.
368    pub async fn delete_subscription(&self, subscription_name: &str) -> Result<()> {
369        info!("Deleting subscription: {}", subscription_name);
370
371        let fq_subscription = self
372            .get_subscription(subscription_name)
373            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
374
375        self.admin
376            .delete_subscription()
377            .set_subscription(&fq_subscription)
378            .send()
379            .await
380            .map_err(|e| {
381                PubSubError::subscription_with_source(
382                    format!("Failed to delete subscription: {}", subscription_name),
383                    Box::new(e),
384                )
385            })?;
386
387        self.subscriptions.write().remove(subscription_name);
388
389        Ok(())
390    }
391
392    /// Lists all subscriptions.
393    pub fn list_subscriptions(&self) -> Vec<String> {
394        self.subscriptions.read().keys().cloned().collect()
395    }
396
397    /// Checks if a subscription exists.
398    pub fn subscription_exists(&self, subscription_name: &str) -> bool {
399        self.subscriptions.read().contains_key(subscription_name)
400    }
401
402    /// Gets the number of managed subscriptions.
403    pub fn subscription_count(&self) -> usize {
404        self.subscriptions.read().len()
405    }
406
407    /// Clears all cached subscriptions.
408    pub fn clear_cache(&self) {
409        info!("Clearing subscription cache");
410        self.subscriptions.write().clear();
411    }
412
413    /// Gets the project ID.
414    pub fn project_id(&self) -> &str {
415        &self.project_id
416    }
417
418    /// Updates subscription acknowledgment deadline.
419    pub async fn update_ack_deadline(
420        &self,
421        subscription_name: &str,
422        _ack_deadline_seconds: i64,
423    ) -> Result<()> {
424        debug!(
425            "Updating ack deadline for subscription: {}",
426            subscription_name
427        );
428
429        let _fq_subscription = self
430            .get_subscription(subscription_name)
431            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
432
433        // In a real implementation, use the subscription update API
434        info!(
435            "Updated ack deadline for subscription: {}",
436            subscription_name
437        );
438
439        Ok(())
440    }
441
442    /// Updates subscription labels.
443    pub async fn update_labels(
444        &self,
445        subscription_name: &str,
446        _labels: HashMap<String, String>,
447    ) -> Result<()> {
448        debug!("Updating labels for subscription: {}", subscription_name);
449
450        let _fq_subscription = self
451            .get_subscription(subscription_name)
452            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
453
454        // In a real implementation, use the subscription update API
455        info!("Updated labels for subscription: {}", subscription_name);
456
457        Ok(())
458    }
459
460    /// Gets subscription metadata.
461    pub async fn get_metadata(&self, subscription_name: &str) -> Result<SubscriptionMetadata> {
462        debug!("Getting metadata for subscription: {}", subscription_name);
463
464        let _fq_subscription = self
465            .get_subscription(subscription_name)
466            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
467
468        // In a real implementation, fetch actual metadata from API
469        Ok(SubscriptionMetadata {
470            name: subscription_name.to_string(),
471            topic: String::new(),
472            ack_deadline_seconds: 10,
473            message_retention_duration: None,
474            enable_message_ordering: false,
475            labels: HashMap::new(),
476            filter: None,
477            created_at: Some(Utc::now()),
478            updated_at: Some(Utc::now()),
479        })
480    }
481
482    /// Gets subscription statistics.
483    pub async fn get_stats(&self, subscription_name: &str) -> Result<SubscriptionStats> {
484        debug!("Getting statistics for subscription: {}", subscription_name);
485
486        let _fq_subscription = self
487            .get_subscription(subscription_name)
488            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
489
490        // In a real implementation, fetch actual stats from monitoring API
491        Ok(SubscriptionStats::default())
492    }
493
494    /// Seeks a subscription to a specific timestamp.
495    pub async fn seek_to_timestamp(
496        &self,
497        subscription_name: &str,
498        timestamp: DateTime<Utc>,
499    ) -> Result<()> {
500        info!(
501            "Seeking subscription {} to timestamp: {}",
502            subscription_name, timestamp
503        );
504
505        let _fq_subscription = self
506            .get_subscription(subscription_name)
507            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
508
509        // In a real implementation, use the subscription seek API
510        debug!("Seek completed for subscription: {}", subscription_name);
511
512        Ok(())
513    }
514
515    /// Seeks a subscription to a specific snapshot.
516    pub async fn seek_to_snapshot(
517        &self,
518        subscription_name: &str,
519        snapshot_name: &str,
520    ) -> Result<()> {
521        info!(
522            "Seeking subscription {} to snapshot: {}",
523            subscription_name, snapshot_name
524        );
525
526        let _fq_subscription = self
527            .get_subscription(subscription_name)
528            .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
529
530        // In a real implementation, use the subscription seek API
531        debug!("Seek completed for subscription: {}", subscription_name);
532
533        Ok(())
534    }
535}
536
537/// Subscription builder for fluent subscription creation.
538pub struct SubscriptionBuilder {
539    config: SubscriptionCreateConfig,
540}
541
542impl SubscriptionBuilder {
543    /// Creates a new subscription builder.
544    pub fn new(
545        project_id: impl Into<String>,
546        subscription_name: impl Into<String>,
547        topic_name: impl Into<String>,
548    ) -> Self {
549        Self {
550            config: SubscriptionCreateConfig::new(project_id, subscription_name, topic_name),
551        }
552    }
553
554    /// Sets acknowledgment deadline.
555    pub fn ack_deadline(mut self, seconds: i64) -> Self {
556        self.config = self.config.with_ack_deadline(seconds);
557        self
558    }
559
560    /// Sets message retention.
561    pub fn message_retention(mut self, seconds: i64) -> Self {
562        self.config = self.config.with_message_retention(seconds);
563        self
564    }
565
566    /// Sets whether to retain acknowledged messages.
567    pub fn retain_acked_messages(mut self, retain: bool) -> Self {
568        self.config = self.config.with_retain_acked_messages(retain);
569        self
570    }
571
572    /// Enables message ordering.
573    pub fn message_ordering(mut self, enable: bool) -> Self {
574        self.config = self.config.with_message_ordering(enable);
575        self
576    }
577
578    /// Sets expiration policy.
579    pub fn expiration_policy(mut self, policy: ExpirationPolicy) -> Self {
580        self.config = self.config.with_expiration_policy(policy);
581        self
582    }
583
584    /// Sets dead letter policy.
585    pub fn dead_letter_policy(mut self, policy: DeadLetterPolicy) -> Self {
586        self.config = self.config.with_dead_letter_policy(policy);
587        self
588    }
589
590    /// Sets retry policy.
591    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
592        self.config = self.config.with_retry_policy(policy);
593        self
594    }
595
596    /// Adds a label.
597    pub fn label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
598        self.config = self.config.with_label(key, value);
599        self
600    }
601
602    /// Adds multiple labels.
603    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
604        self.config = self.config.with_labels(labels);
605        self
606    }
607
608    /// Sets filter expression.
609    pub fn filter(mut self, filter: impl Into<String>) -> Self {
610        self.config = self.config.with_filter(filter);
611        self
612    }
613
614    /// Sets custom endpoint.
615    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
616        self.config = self.config.with_endpoint(endpoint);
617        self
618    }
619
620    /// Builds the subscription configuration.
621    pub fn build(self) -> SubscriptionCreateConfig {
622        self.config
623    }
624
625    /// Creates the subscription using a subscription manager.
626    pub async fn create(self, manager: &SubscriptionManager) -> Result<String> {
627        manager.create_subscription(self.config).await
628    }
629}
630
631/// Subscription utilities.
632pub mod utils {
633    use super::*;
634
635    /// Formats a subscription name with project ID.
636    pub fn format_subscription_name(project_id: &str, subscription_name: &str) -> String {
637        format!(
638            "projects/{}/subscriptions/{}",
639            project_id, subscription_name
640        )
641    }
642
643    /// Parses a subscription name to extract project ID and subscription name.
644    pub fn parse_subscription_name(full_name: &str) -> Result<(String, String)> {
645        let parts: Vec<&str> = full_name.split('/').collect();
646        if parts.len() != 4 || parts[0] != "projects" || parts[2] != "subscriptions" {
647            return Err(PubSubError::InvalidMessageFormat {
648                message: format!("Invalid subscription name format: {}", full_name),
649            });
650        }
651        Ok((parts[1].to_string(), parts[3].to_string()))
652    }
653
654    /// Validates a subscription name.
655    pub fn validate_subscription_name(subscription_name: &str) -> Result<()> {
656        if subscription_name.is_empty() {
657            return Err(PubSubError::InvalidMessageFormat {
658                message: "Subscription name cannot be empty".to_string(),
659            });
660        }
661
662        if subscription_name.len() > 255 {
663            return Err(PubSubError::InvalidMessageFormat {
664                message: "Subscription name cannot exceed 255 characters".to_string(),
665            });
666        }
667
668        // Subscription name must start with a letter
669        if !subscription_name
670            .chars()
671            .next()
672            .map(|c| c.is_ascii_alphabetic())
673            .unwrap_or(false)
674        {
675            return Err(PubSubError::InvalidMessageFormat {
676                message: "Subscription name must start with a letter".to_string(),
677            });
678        }
679
680        // Subscription name can only contain letters, numbers, hyphens, and underscores
681        for c in subscription_name.chars() {
682            if !c.is_ascii_alphanumeric() && c != '-' && c != '_' && c != '.' {
683                return Err(PubSubError::InvalidMessageFormat {
684                    message: format!("Invalid character in subscription name: {}", c),
685                });
686            }
687        }
688
689        Ok(())
690    }
691
692    /// Calculates backoff duration for a given attempt.
693    pub fn calculate_backoff(attempt: usize, min_backoff: i64, max_backoff: i64) -> ChronoDuration {
694        let backoff = min_backoff * 2_i64.pow(attempt as u32);
695        let backoff = backoff.min(max_backoff);
696        ChronoDuration::seconds(backoff)
697    }
698}
699
#[cfg(test)]
mod tests {
    use super::*;

    /// The `with_*` setters must store every value they are given.
    #[test]
    fn test_subscription_config() {
        let config = SubscriptionCreateConfig::new("project", "subscription", "topic")
            .with_ack_deadline(30)
            .with_message_retention(3600)
            .with_label("env", "test");

        assert_eq!(config.project_id, "project");
        assert_eq!(config.subscription_name, "subscription");
        assert_eq!(config.topic_name, "topic");
        assert_eq!(config.ack_deadline_seconds, 30);
        // Previously set but never checked.
        assert_eq!(config.message_retention_duration, Some(3600));
        assert_eq!(config.labels.get("env").map(String::as_str), Some("test"));
        assert_eq!(config.labels.len(), 1);
    }

    /// Explicit and "never expire" policies expose the expected TTL.
    #[test]
    fn test_expiration_policy() {
        let policy = ExpirationPolicy::new(86400);
        assert_eq!(policy.ttl_seconds, 86400);

        let never_expire = ExpirationPolicy::never_expire();
        assert_eq!(never_expire.ttl_seconds, i64::MAX);
    }

    /// The dead letter constructor stores both arguments.
    #[test]
    fn test_dead_letter_policy() {
        let policy = DeadLetterPolicy::new("dlq-topic", 5);
        assert_eq!(policy.dead_letter_topic, "dlq-topic");
        assert_eq!(policy.max_delivery_attempts, 5);
    }

    /// The retry presets expose their documented minimum backoffs.
    #[test]
    fn test_retry_policy() {
        let policy = RetryPolicy::default_policy();
        assert_eq!(policy.minimum_backoff_seconds, 10);
        assert_eq!(policy.maximum_backoff_seconds, 600);

        let aggressive = RetryPolicy::aggressive();
        assert_eq!(aggressive.minimum_backoff_seconds, 1);
        assert_eq!(aggressive.maximum_backoff_seconds, 60);

        let conservative = RetryPolicy::conservative();
        assert_eq!(conservative.minimum_backoff_seconds, 60);
    }

    /// The builder must forward every setting to the built configuration.
    #[test]
    fn test_subscription_builder() {
        let config = SubscriptionBuilder::new("project", "subscription", "topic")
            .ack_deadline(30)
            .message_retention(3600)
            .message_ordering(true)
            .build();

        assert_eq!(config.project_id, "project");
        assert_eq!(config.subscription_name, "subscription");
        assert_eq!(config.topic_name, "topic");
        // Previously set but never checked.
        assert_eq!(config.ack_deadline_seconds, 30);
        assert_eq!(config.message_retention_duration, Some(3600));
        assert!(config.enable_message_ordering);
    }

    #[test]
    fn test_format_subscription_name() {
        let formatted = utils::format_subscription_name("my-project", "my-subscription");
        assert_eq!(
            formatted,
            "projects/my-project/subscriptions/my-subscription"
        );
    }

    #[test]
    fn test_parse_subscription_name() {
        let result =
            utils::parse_subscription_name("projects/my-project/subscriptions/my-subscription");
        assert!(result.is_ok());
        let (project, subscription) = result.ok().unwrap_or_default();
        assert_eq!(project, "my-project");
        assert_eq!(subscription, "my-subscription");
    }

    /// Anything that is not `projects/{p}/subscriptions/{s}` must be rejected.
    #[test]
    fn test_parse_subscription_name_rejects_malformed_input() {
        assert!(utils::parse_subscription_name("my-subscription").is_err());
        assert!(utils::parse_subscription_name("projects/my-project/topics/my-topic").is_err());
        assert!(utils::parse_subscription_name(
            "projects/my-project/subscriptions/my-subscription/extra"
        )
        .is_err());
    }

    #[test]
    fn test_validate_subscription_name() {
        assert!(utils::validate_subscription_name("valid-subscription").is_ok());
        assert!(utils::validate_subscription_name("subscription_with_underscore").is_ok());

        assert!(utils::validate_subscription_name("").is_err());
        assert!(utils::validate_subscription_name("1-starts-with-number").is_err());
        assert!(utils::validate_subscription_name("has space").is_err());
        assert!(utils::validate_subscription_name(&"a".repeat(256)).is_err());
    }

    #[test]
    fn test_calculate_backoff() {
        let backoff0 = utils::calculate_backoff(0, 10, 600);
        assert_eq!(backoff0.num_seconds(), 10);

        let backoff1 = utils::calculate_backoff(1, 10, 600);
        assert_eq!(backoff1.num_seconds(), 20);

        let backoff2 = utils::calculate_backoff(2, 10, 600);
        assert_eq!(backoff2.num_seconds(), 40);

        // Test max backoff
        let backoff_large = utils::calculate_backoff(10, 10, 600);
        assert_eq!(backoff_large.num_seconds(), 600);
    }
}