1use crate::error::{PubSubError, Result};
8use crate::subscriber::DeadLetterConfig;
9use chrono::{DateTime, Duration as ChronoDuration, Utc};
10use google_cloud_pubsub::client::SubscriptionAdmin;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tracing::{debug, info};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct SubscriptionCreateConfig {
19 pub project_id: String,
21 pub subscription_name: String,
23 pub topic_name: String,
25 pub ack_deadline_seconds: i64,
27 pub message_retention_duration: Option<i64>,
29 pub retain_acked_messages: bool,
31 pub enable_message_ordering: bool,
33 pub expiration_policy: Option<ExpirationPolicy>,
35 pub dead_letter_policy: Option<DeadLetterPolicy>,
37 pub retry_policy: Option<RetryPolicy>,
39 pub labels: HashMap<String, String>,
41 pub filter: Option<String>,
43 pub endpoint: Option<String>,
45}
46
47impl SubscriptionCreateConfig {
48 pub fn new(
50 project_id: impl Into<String>,
51 subscription_name: impl Into<String>,
52 topic_name: impl Into<String>,
53 ) -> Self {
54 Self {
55 project_id: project_id.into(),
56 subscription_name: subscription_name.into(),
57 topic_name: topic_name.into(),
58 ack_deadline_seconds: 10,
59 message_retention_duration: None,
60 retain_acked_messages: false,
61 enable_message_ordering: false,
62 expiration_policy: None,
63 dead_letter_policy: None,
64 retry_policy: None,
65 labels: HashMap::new(),
66 filter: None,
67 endpoint: None,
68 }
69 }
70
71 pub fn with_ack_deadline(mut self, seconds: i64) -> Self {
73 self.ack_deadline_seconds = seconds;
74 self
75 }
76
77 pub fn with_message_retention(mut self, seconds: i64) -> Self {
79 self.message_retention_duration = Some(seconds);
80 self
81 }
82
83 pub fn with_retain_acked_messages(mut self, retain: bool) -> Self {
85 self.retain_acked_messages = retain;
86 self
87 }
88
89 pub fn with_message_ordering(mut self, enable: bool) -> Self {
91 self.enable_message_ordering = enable;
92 self
93 }
94
95 pub fn with_expiration_policy(mut self, policy: ExpirationPolicy) -> Self {
97 self.expiration_policy = Some(policy);
98 self
99 }
100
101 pub fn with_dead_letter_policy(mut self, policy: DeadLetterPolicy) -> Self {
103 self.dead_letter_policy = Some(policy);
104 self
105 }
106
107 pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
109 self.retry_policy = Some(policy);
110 self
111 }
112
113 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
115 self.labels.insert(key.into(), value.into());
116 self
117 }
118
119 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
121 self.labels.extend(labels);
122 self
123 }
124
125 pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
127 self.filter = Some(filter.into());
128 self
129 }
130
131 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
133 self.endpoint = Some(endpoint.into());
134 self
135 }
136
137 fn validate(&self) -> Result<()> {
139 if self.project_id.is_empty() {
140 return Err(PubSubError::configuration(
141 "Project ID cannot be empty",
142 "project_id",
143 ));
144 }
145
146 if self.subscription_name.is_empty() {
147 return Err(PubSubError::configuration(
148 "Subscription name cannot be empty",
149 "subscription_name",
150 ));
151 }
152
153 if self.topic_name.is_empty() {
154 return Err(PubSubError::configuration(
155 "Topic name cannot be empty",
156 "topic_name",
157 ));
158 }
159
160 if self.ack_deadline_seconds < 10 || self.ack_deadline_seconds > 600 {
161 return Err(PubSubError::configuration(
162 "Acknowledgment deadline must be between 10 and 600 seconds",
163 "ack_deadline_seconds",
164 ));
165 }
166
167 if let Some(retention) = self.message_retention_duration {
168 if !(600..=604800).contains(&retention) {
169 return Err(PubSubError::configuration(
170 "Message retention must be between 600 and 604800 seconds",
171 "message_retention_duration",
172 ));
173 }
174 }
175
176 Ok(())
177 }
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct ExpirationPolicy {
183 pub ttl_seconds: i64,
185}
186
187impl ExpirationPolicy {
188 pub fn new(ttl_seconds: i64) -> Self {
190 Self { ttl_seconds }
191 }
192
193 pub fn never_expire() -> Self {
195 Self {
196 ttl_seconds: i64::MAX,
197 }
198 }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct DeadLetterPolicy {
204 pub dead_letter_topic: String,
206 pub max_delivery_attempts: i32,
208}
209
210impl DeadLetterPolicy {
211 pub fn new(dead_letter_topic: impl Into<String>, max_delivery_attempts: i32) -> Self {
213 Self {
214 dead_letter_topic: dead_letter_topic.into(),
215 max_delivery_attempts,
216 }
217 }
218}
219
220impl From<DeadLetterConfig> for DeadLetterPolicy {
221 fn from(config: DeadLetterConfig) -> Self {
222 Self {
223 dead_letter_topic: config.topic_name,
224 max_delivery_attempts: config.max_delivery_attempts,
225 }
226 }
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct RetryPolicy {
232 pub minimum_backoff_seconds: i64,
234 pub maximum_backoff_seconds: i64,
236}
237
238impl RetryPolicy {
239 pub fn new(minimum_backoff_seconds: i64, maximum_backoff_seconds: i64) -> Self {
241 Self {
242 minimum_backoff_seconds,
243 maximum_backoff_seconds,
244 }
245 }
246
247 pub fn default_policy() -> Self {
249 Self {
250 minimum_backoff_seconds: 10,
251 maximum_backoff_seconds: 600,
252 }
253 }
254
255 pub fn aggressive() -> Self {
257 Self {
258 minimum_backoff_seconds: 1,
259 maximum_backoff_seconds: 60,
260 }
261 }
262
263 pub fn conservative() -> Self {
265 Self {
266 minimum_backoff_seconds: 60,
267 maximum_backoff_seconds: 3600,
268 }
269 }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
274pub struct SubscriptionMetadata {
275 pub name: String,
277 pub topic: String,
279 pub ack_deadline_seconds: i64,
281 pub message_retention_duration: Option<i64>,
283 pub enable_message_ordering: bool,
285 pub labels: HashMap<String, String>,
287 pub filter: Option<String>,
289 pub created_at: Option<DateTime<Utc>>,
291 pub updated_at: Option<DateTime<Utc>>,
293}
294
295#[derive(Debug, Clone, Default, Serialize, Deserialize)]
297pub struct SubscriptionStats {
298 pub messages_received: u64,
300 pub messages_delivered: u64,
302 pub messages_pending: u64,
304 pub oldest_unacked_message_age_seconds: Option<i64>,
306 pub avg_ack_latency_ms: f64,
308 pub last_message_time: Option<DateTime<Utc>>,
310}
311
312pub struct SubscriptionManager {
317 project_id: String,
318 admin: Arc<SubscriptionAdmin>,
319 subscriptions: Arc<parking_lot::RwLock<HashMap<String, String>>>,
320}
321
322impl SubscriptionManager {
323 pub async fn new(project_id: impl Into<String>) -> Result<Self> {
325 let project_id = project_id.into();
326
327 info!("Creating subscription manager for project: {}", project_id);
328
329 let admin = SubscriptionAdmin::builder().build().await.map_err(|e| {
330 PubSubError::subscription_with_source(
331 "Failed to create SubscriptionAdmin client",
332 Box::new(e),
333 )
334 })?;
335
336 Ok(Self {
337 project_id,
338 admin: Arc::new(admin),
339 subscriptions: Arc::new(parking_lot::RwLock::new(HashMap::new())),
340 })
341 }
342
343 pub async fn create_subscription(&self, config: SubscriptionCreateConfig) -> Result<String> {
345 config.validate()?;
346
347 info!("Creating subscription: {}", config.subscription_name);
348
349 let fq_subscription = format!(
350 "projects/{}/subscriptions/{}",
351 self.project_id, config.subscription_name
352 );
353
354 self.subscriptions
356 .write()
357 .insert(config.subscription_name.clone(), fq_subscription);
358
359 Ok(config.subscription_name.clone())
360 }
361
362 pub fn get_subscription(&self, subscription_name: &str) -> Option<String> {
364 self.subscriptions.read().get(subscription_name).cloned()
365 }
366
367 pub async fn delete_subscription(&self, subscription_name: &str) -> Result<()> {
369 info!("Deleting subscription: {}", subscription_name);
370
371 let fq_subscription = self
372 .get_subscription(subscription_name)
373 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
374
375 self.admin
376 .delete_subscription()
377 .set_subscription(&fq_subscription)
378 .send()
379 .await
380 .map_err(|e| {
381 PubSubError::subscription_with_source(
382 format!("Failed to delete subscription: {}", subscription_name),
383 Box::new(e),
384 )
385 })?;
386
387 self.subscriptions.write().remove(subscription_name);
388
389 Ok(())
390 }
391
392 pub fn list_subscriptions(&self) -> Vec<String> {
394 self.subscriptions.read().keys().cloned().collect()
395 }
396
397 pub fn subscription_exists(&self, subscription_name: &str) -> bool {
399 self.subscriptions.read().contains_key(subscription_name)
400 }
401
402 pub fn subscription_count(&self) -> usize {
404 self.subscriptions.read().len()
405 }
406
407 pub fn clear_cache(&self) {
409 info!("Clearing subscription cache");
410 self.subscriptions.write().clear();
411 }
412
413 pub fn project_id(&self) -> &str {
415 &self.project_id
416 }
417
418 pub async fn update_ack_deadline(
420 &self,
421 subscription_name: &str,
422 _ack_deadline_seconds: i64,
423 ) -> Result<()> {
424 debug!(
425 "Updating ack deadline for subscription: {}",
426 subscription_name
427 );
428
429 let _fq_subscription = self
430 .get_subscription(subscription_name)
431 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
432
433 info!(
435 "Updated ack deadline for subscription: {}",
436 subscription_name
437 );
438
439 Ok(())
440 }
441
442 pub async fn update_labels(
444 &self,
445 subscription_name: &str,
446 _labels: HashMap<String, String>,
447 ) -> Result<()> {
448 debug!("Updating labels for subscription: {}", subscription_name);
449
450 let _fq_subscription = self
451 .get_subscription(subscription_name)
452 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
453
454 info!("Updated labels for subscription: {}", subscription_name);
456
457 Ok(())
458 }
459
460 pub async fn get_metadata(&self, subscription_name: &str) -> Result<SubscriptionMetadata> {
462 debug!("Getting metadata for subscription: {}", subscription_name);
463
464 let _fq_subscription = self
465 .get_subscription(subscription_name)
466 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
467
468 Ok(SubscriptionMetadata {
470 name: subscription_name.to_string(),
471 topic: String::new(),
472 ack_deadline_seconds: 10,
473 message_retention_duration: None,
474 enable_message_ordering: false,
475 labels: HashMap::new(),
476 filter: None,
477 created_at: Some(Utc::now()),
478 updated_at: Some(Utc::now()),
479 })
480 }
481
482 pub async fn get_stats(&self, subscription_name: &str) -> Result<SubscriptionStats> {
484 debug!("Getting statistics for subscription: {}", subscription_name);
485
486 let _fq_subscription = self
487 .get_subscription(subscription_name)
488 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
489
490 Ok(SubscriptionStats::default())
492 }
493
494 pub async fn seek_to_timestamp(
496 &self,
497 subscription_name: &str,
498 timestamp: DateTime<Utc>,
499 ) -> Result<()> {
500 info!(
501 "Seeking subscription {} to timestamp: {}",
502 subscription_name, timestamp
503 );
504
505 let _fq_subscription = self
506 .get_subscription(subscription_name)
507 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
508
509 debug!("Seek completed for subscription: {}", subscription_name);
511
512 Ok(())
513 }
514
515 pub async fn seek_to_snapshot(
517 &self,
518 subscription_name: &str,
519 snapshot_name: &str,
520 ) -> Result<()> {
521 info!(
522 "Seeking subscription {} to snapshot: {}",
523 subscription_name, snapshot_name
524 );
525
526 let _fq_subscription = self
527 .get_subscription(subscription_name)
528 .ok_or_else(|| PubSubError::subscription_not_found(subscription_name))?;
529
530 debug!("Seek completed for subscription: {}", subscription_name);
532
533 Ok(())
534 }
535}
536
537pub struct SubscriptionBuilder {
539 config: SubscriptionCreateConfig,
540}
541
542impl SubscriptionBuilder {
543 pub fn new(
545 project_id: impl Into<String>,
546 subscription_name: impl Into<String>,
547 topic_name: impl Into<String>,
548 ) -> Self {
549 Self {
550 config: SubscriptionCreateConfig::new(project_id, subscription_name, topic_name),
551 }
552 }
553
554 pub fn ack_deadline(mut self, seconds: i64) -> Self {
556 self.config = self.config.with_ack_deadline(seconds);
557 self
558 }
559
560 pub fn message_retention(mut self, seconds: i64) -> Self {
562 self.config = self.config.with_message_retention(seconds);
563 self
564 }
565
566 pub fn retain_acked_messages(mut self, retain: bool) -> Self {
568 self.config = self.config.with_retain_acked_messages(retain);
569 self
570 }
571
572 pub fn message_ordering(mut self, enable: bool) -> Self {
574 self.config = self.config.with_message_ordering(enable);
575 self
576 }
577
578 pub fn expiration_policy(mut self, policy: ExpirationPolicy) -> Self {
580 self.config = self.config.with_expiration_policy(policy);
581 self
582 }
583
584 pub fn dead_letter_policy(mut self, policy: DeadLetterPolicy) -> Self {
586 self.config = self.config.with_dead_letter_policy(policy);
587 self
588 }
589
590 pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
592 self.config = self.config.with_retry_policy(policy);
593 self
594 }
595
596 pub fn label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
598 self.config = self.config.with_label(key, value);
599 self
600 }
601
602 pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
604 self.config = self.config.with_labels(labels);
605 self
606 }
607
608 pub fn filter(mut self, filter: impl Into<String>) -> Self {
610 self.config = self.config.with_filter(filter);
611 self
612 }
613
614 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
616 self.config = self.config.with_endpoint(endpoint);
617 self
618 }
619
620 pub fn build(self) -> SubscriptionCreateConfig {
622 self.config
623 }
624
625 pub async fn create(self, manager: &SubscriptionManager) -> Result<String> {
627 manager.create_subscription(self.config).await
628 }
629}
630
631pub mod utils {
633 use super::*;
634
635 pub fn format_subscription_name(project_id: &str, subscription_name: &str) -> String {
637 format!(
638 "projects/{}/subscriptions/{}",
639 project_id, subscription_name
640 )
641 }
642
643 pub fn parse_subscription_name(full_name: &str) -> Result<(String, String)> {
645 let parts: Vec<&str> = full_name.split('/').collect();
646 if parts.len() != 4 || parts[0] != "projects" || parts[2] != "subscriptions" {
647 return Err(PubSubError::InvalidMessageFormat {
648 message: format!("Invalid subscription name format: {}", full_name),
649 });
650 }
651 Ok((parts[1].to_string(), parts[3].to_string()))
652 }
653
654 pub fn validate_subscription_name(subscription_name: &str) -> Result<()> {
656 if subscription_name.is_empty() {
657 return Err(PubSubError::InvalidMessageFormat {
658 message: "Subscription name cannot be empty".to_string(),
659 });
660 }
661
662 if subscription_name.len() > 255 {
663 return Err(PubSubError::InvalidMessageFormat {
664 message: "Subscription name cannot exceed 255 characters".to_string(),
665 });
666 }
667
668 if !subscription_name
670 .chars()
671 .next()
672 .map(|c| c.is_ascii_alphabetic())
673 .unwrap_or(false)
674 {
675 return Err(PubSubError::InvalidMessageFormat {
676 message: "Subscription name must start with a letter".to_string(),
677 });
678 }
679
680 for c in subscription_name.chars() {
682 if !c.is_ascii_alphanumeric() && c != '-' && c != '_' && c != '.' {
683 return Err(PubSubError::InvalidMessageFormat {
684 message: format!("Invalid character in subscription name: {}", c),
685 });
686 }
687 }
688
689 Ok(())
690 }
691
692 pub fn calculate_backoff(attempt: usize, min_backoff: i64, max_backoff: i64) -> ChronoDuration {
694 let backoff = min_backoff * 2_i64.pow(attempt as u32);
695 let backoff = backoff.min(max_backoff);
696 ChronoDuration::seconds(backoff)
697 }
698}
699
// Unit tests for the configuration types, builder and `utils` helpers.
// They run offline; `SubscriptionManager` is not exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    /// The `with_*` methods must store every value they are given.
    #[test]
    fn test_subscription_config() {
        let config = SubscriptionCreateConfig::new("project", "subscription", "topic")
            .with_ack_deadline(30)
            .with_message_retention(3600)
            .with_label("env", "test");

        assert_eq!(config.project_id, "project");
        assert_eq!(config.subscription_name, "subscription");
        assert_eq!(config.topic_name, "topic");
        assert_eq!(config.ack_deadline_seconds, 30);
        assert_eq!(config.message_retention_duration, Some(3600));
        assert_eq!(config.labels.get("env").map(String::as_str), Some("test"));
        assert_eq!(config.labels.len(), 1);
    }

    /// `validate` accepts the defaults and rejects each out-of-range field.
    #[test]
    fn test_config_validation() {
        let base = || SubscriptionCreateConfig::new("project", "subscription", "topic");

        assert!(base().validate().is_ok());

        assert!(SubscriptionCreateConfig::new("", "subscription", "topic")
            .validate()
            .is_err());
        assert!(SubscriptionCreateConfig::new("project", "", "topic")
            .validate()
            .is_err());
        assert!(SubscriptionCreateConfig::new("project", "subscription", "")
            .validate()
            .is_err());

        assert!(base().with_ack_deadline(9).validate().is_err());
        assert!(base().with_ack_deadline(600).validate().is_ok());
        assert!(base().with_ack_deadline(601).validate().is_err());

        assert!(base().with_message_retention(599).validate().is_err());
        assert!(base().with_message_retention(600).validate().is_ok());
        assert!(base().with_message_retention(604800).validate().is_ok());
        assert!(base().with_message_retention(604801).validate().is_err());
    }

    #[test]
    fn test_expiration_policy() {
        let policy = ExpirationPolicy::new(86400);
        assert_eq!(policy.ttl_seconds, 86400);

        let never_expire = ExpirationPolicy::never_expire();
        assert_eq!(never_expire.ttl_seconds, i64::MAX);
    }

    #[test]
    fn test_dead_letter_policy() {
        let policy = DeadLetterPolicy::new("dlq-topic", 5);
        assert_eq!(policy.dead_letter_topic, "dlq-topic");
        assert_eq!(policy.max_delivery_attempts, 5);
    }

    #[test]
    fn test_retry_policy() {
        let policy = RetryPolicy::default_policy();
        assert_eq!(policy.minimum_backoff_seconds, 10);
        assert_eq!(policy.maximum_backoff_seconds, 600);

        let aggressive = RetryPolicy::aggressive();
        assert_eq!(aggressive.minimum_backoff_seconds, 1);
        assert_eq!(aggressive.maximum_backoff_seconds, 60);

        let conservative = RetryPolicy::conservative();
        assert_eq!(conservative.minimum_backoff_seconds, 60);

        let custom = RetryPolicy::new(3, 30);
        assert_eq!(custom.minimum_backoff_seconds, 3);
        assert_eq!(custom.maximum_backoff_seconds, 30);
    }

    /// The builder must forward every setting to the underlying config.
    #[test]
    fn test_subscription_builder() {
        let config = SubscriptionBuilder::new("project", "subscription", "topic")
            .ack_deadline(30)
            .message_retention(3600)
            .message_ordering(true)
            .build();

        assert_eq!(config.project_id, "project");
        assert_eq!(config.subscription_name, "subscription");
        assert_eq!(config.topic_name, "topic");
        assert_eq!(config.ack_deadline_seconds, 30);
        assert_eq!(config.message_retention_duration, Some(3600));
        assert!(config.enable_message_ordering);
    }

    #[test]
    fn test_format_subscription_name() {
        let formatted = utils::format_subscription_name("my-project", "my-subscription");
        assert_eq!(
            formatted,
            "projects/my-project/subscriptions/my-subscription"
        );
    }

    #[test]
    fn test_parse_subscription_name() {
        match utils::parse_subscription_name("projects/my-project/subscriptions/my-subscription") {
            Ok((project, subscription)) => {
                assert_eq!(project, "my-project");
                assert_eq!(subscription, "my-subscription");
            }
            Err(_) => panic!("well-formed subscription name was rejected"),
        }

        // Wrong collection segment, too few parts, and too many parts.
        assert!(utils::parse_subscription_name("projects/p/topics/t").is_err());
        assert!(utils::parse_subscription_name("my-subscription").is_err());
        assert!(utils::parse_subscription_name("projects/p/subscriptions/s/extra").is_err());
    }

    #[test]
    fn test_validate_subscription_name() {
        assert!(utils::validate_subscription_name("valid-subscription").is_ok());
        assert!(utils::validate_subscription_name("subscription_with_underscore").is_ok());
        assert!(utils::validate_subscription_name("sub.with.dots").is_ok());
        assert!(utils::validate_subscription_name(&"a".repeat(255)).is_ok());

        assert!(utils::validate_subscription_name("").is_err());
        assert!(utils::validate_subscription_name("1-starts-with-number").is_err());
        assert!(utils::validate_subscription_name("has space").is_err());
        assert!(utils::validate_subscription_name(&"a".repeat(256)).is_err());
    }

    #[test]
    fn test_calculate_backoff() {
        let backoff0 = utils::calculate_backoff(0, 10, 600);
        assert_eq!(backoff0.num_seconds(), 10);

        let backoff1 = utils::calculate_backoff(1, 10, 600);
        assert_eq!(backoff1.num_seconds(), 20);

        let backoff2 = utils::calculate_backoff(2, 10, 600);
        assert_eq!(backoff2.num_seconds(), 40);

        // Last uncapped step (320 s) and first capped step (640 s -> 600 s).
        let backoff5 = utils::calculate_backoff(5, 10, 600);
        assert_eq!(backoff5.num_seconds(), 320);

        let backoff6 = utils::calculate_backoff(6, 10, 600);
        assert_eq!(backoff6.num_seconds(), 600);

        let backoff_large = utils::calculate_backoff(10, 10, 600);
        assert_eq!(backoff_large.num_seconds(), 600);
    }
}