1use crate::error::{PubSubError, Result};
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use dashmap::DashMap;
11use google_cloud_pubsub::client::Subscriber as GcpSubscriber;
12use google_cloud_pubsub::client::SubscriptionAdmin;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::time::Duration;
19use tokio::task::JoinHandle;
20use tracing::{debug, error, info};
21
22pub const DEFAULT_MAX_OUTSTANDING_MESSAGES: usize = 1000;
24
25pub const DEFAULT_MAX_OUTSTANDING_BYTES: usize = 100_000_000; pub const DEFAULT_ACK_DEADLINE_SECONDS: i64 = 10;
30
31pub const DEFAULT_HANDLER_CONCURRENCY: usize = 10;
33
34#[derive(Debug, Clone)]
36pub struct ReceivedMessage {
37 pub message_id: String,
39 pub data: Bytes,
41 pub attributes: HashMap<String, String>,
43 pub publish_time: DateTime<Utc>,
45 pub ordering_key: Option<String>,
47 pub delivery_attempt: i32,
49 #[allow(dead_code)]
51 pub(crate) ack_id: String,
52}
53
54impl ReceivedMessage {
55 pub fn size(&self) -> usize {
57 self.data.len()
58 }
59
60 pub fn is_redelivery(&self) -> bool {
62 self.delivery_attempt > 1
63 }
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub enum SubscriptionType {
69 Pull,
71 Push,
73}
74
75#[derive(Debug, Clone)]
77pub struct FlowControlSettings {
78 pub max_outstanding_messages: usize,
80 pub max_outstanding_bytes: usize,
82 pub max_messages_per_second: u64,
84}
85
86impl Default for FlowControlSettings {
87 fn default() -> Self {
88 Self {
89 max_outstanding_messages: DEFAULT_MAX_OUTSTANDING_MESSAGES,
90 max_outstanding_bytes: DEFAULT_MAX_OUTSTANDING_BYTES,
91 max_messages_per_second: 0,
92 }
93 }
94}
95
96#[derive(Debug, Clone)]
98pub struct DeadLetterConfig {
99 pub topic_name: String,
101 pub max_delivery_attempts: i32,
103}
104
105impl DeadLetterConfig {
106 pub fn new(topic_name: impl Into<String>, max_delivery_attempts: i32) -> Self {
108 Self {
109 topic_name: topic_name.into(),
110 max_delivery_attempts,
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
117pub struct SubscriberConfig {
118 pub project_id: String,
120 pub subscription_name: String,
122 pub subscription_type: SubscriptionType,
124 pub ack_deadline_seconds: i64,
126 pub flow_control: FlowControlSettings,
128 pub handler_concurrency: usize,
130 pub enable_ordering: bool,
132 pub dead_letter_config: Option<DeadLetterConfig>,
134 pub endpoint: Option<String>,
136 pub auto_extend_deadline: bool,
138}
139
140impl Default for SubscriberConfig {
141 fn default() -> Self {
142 Self {
143 project_id: String::new(),
144 subscription_name: String::new(),
145 subscription_type: SubscriptionType::Pull,
146 ack_deadline_seconds: DEFAULT_ACK_DEADLINE_SECONDS,
147 flow_control: FlowControlSettings::default(),
148 handler_concurrency: DEFAULT_HANDLER_CONCURRENCY,
149 enable_ordering: false,
150 dead_letter_config: None,
151 endpoint: None,
152 auto_extend_deadline: true,
153 }
154 }
155}
156
157impl SubscriberConfig {
158 pub fn new(project_id: impl Into<String>, subscription_name: impl Into<String>) -> Self {
160 Self {
161 project_id: project_id.into(),
162 subscription_name: subscription_name.into(),
163 ..Default::default()
164 }
165 }
166
167 pub fn with_type(mut self, subscription_type: SubscriptionType) -> Self {
169 self.subscription_type = subscription_type;
170 self
171 }
172
173 pub fn with_ack_deadline(mut self, seconds: i64) -> Self {
175 self.ack_deadline_seconds = seconds;
176 self
177 }
178
179 pub fn with_flow_control(mut self, settings: FlowControlSettings) -> Self {
181 self.flow_control = settings;
182 self
183 }
184
185 pub fn with_handler_concurrency(mut self, concurrency: usize) -> Self {
187 self.handler_concurrency = concurrency;
188 self
189 }
190
191 pub fn with_ordering(mut self, enable: bool) -> Self {
193 self.enable_ordering = enable;
194 self
195 }
196
197 pub fn with_dead_letter(mut self, config: DeadLetterConfig) -> Self {
199 self.dead_letter_config = Some(config);
200 self
201 }
202
203 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
205 self.endpoint = Some(endpoint.into());
206 self
207 }
208
209 fn validate(&self) -> Result<()> {
211 if self.project_id.is_empty() {
212 return Err(PubSubError::configuration(
213 "Project ID cannot be empty",
214 "project_id",
215 ));
216 }
217
218 if self.subscription_name.is_empty() {
219 return Err(PubSubError::configuration(
220 "Subscription name cannot be empty",
221 "subscription_name",
222 ));
223 }
224
225 if self.ack_deadline_seconds < 10 || self.ack_deadline_seconds > 600 {
226 return Err(PubSubError::configuration(
227 "Acknowledgment deadline must be between 10 and 600 seconds",
228 "ack_deadline_seconds",
229 ));
230 }
231
232 if self.handler_concurrency == 0 {
233 return Err(PubSubError::configuration(
234 "Handler concurrency must be greater than 0",
235 "handler_concurrency",
236 ));
237 }
238
239 Ok(())
240 }
241}
242
243#[derive(Debug, Clone, Default, Serialize, Deserialize)]
245pub struct SubscriberStats {
246 pub messages_received: u64,
248 pub bytes_received: u64,
250 pub messages_acknowledged: u64,
252 pub messages_nacked: u64,
254 pub messages_to_dlq: u64,
256 pub ack_errors: u64,
258 pub outstanding_messages: u64,
260 pub outstanding_bytes: u64,
262 pub last_receive: Option<DateTime<Utc>>,
264}
265
266pub enum HandlerResult {
268 Ack,
270 Nack,
272 DeadLetter,
274}
275
276pub type MessageHandler = Arc<dyn Fn(ReceivedMessage) -> HandlerResult + Send + Sync>;
278
279pub struct Subscriber {
281 config: SubscriberConfig,
282 gcp_subscriber: Arc<GcpSubscriber>,
284 #[allow(dead_code)]
286 admin: Arc<SubscriptionAdmin>,
287 fq_subscription: String,
289 stats: Arc<RwLock<SubscriberStats>>,
290 outstanding_messages: Arc<DashMap<String, ReceivedMessage>>,
291 running: Arc<AtomicBool>,
292 message_count: Arc<AtomicU64>,
293 byte_count: Arc<AtomicU64>,
294}
295
296impl Subscriber {
297 pub async fn new(config: SubscriberConfig) -> Result<Self> {
299 config.validate()?;
300
301 info!(
302 "Creating subscriber for subscription: {}/{}",
303 config.project_id, config.subscription_name
304 );
305
306 let fq_subscription = format!(
307 "projects/{}/subscriptions/{}",
308 config.project_id, config.subscription_name
309 );
310
311 let mut sub_builder = GcpSubscriber::builder();
313 if let Some(endpoint) = &config.endpoint {
314 sub_builder = sub_builder.with_endpoint(endpoint);
315 }
316 let gcp_subscriber = sub_builder.build().await.map_err(|e| {
317 PubSubError::subscription_with_source(
318 "Failed to create Pub/Sub subscriber client",
319 Box::new(e),
320 )
321 })?;
322
323 let mut admin_builder = SubscriptionAdmin::builder();
325 if let Some(endpoint) = &config.endpoint {
326 admin_builder = admin_builder.with_endpoint(endpoint);
327 }
328 let admin = admin_builder.build().await.map_err(|e| {
329 PubSubError::subscription_with_source(
330 "Failed to create subscription admin client",
331 Box::new(e),
332 )
333 })?;
334
335 Ok(Self {
336 config,
337 gcp_subscriber: Arc::new(gcp_subscriber),
338 admin: Arc::new(admin),
339 fq_subscription,
340 stats: Arc::new(RwLock::new(SubscriberStats::default())),
341 outstanding_messages: Arc::new(DashMap::new()),
342 running: Arc::new(AtomicBool::new(false)),
343 message_count: Arc::new(AtomicU64::new(0)),
344 byte_count: Arc::new(AtomicU64::new(0)),
345 })
346 }
347
348 pub async fn pull_one(&self) -> Result<Option<ReceivedMessage>> {
353 self.check_flow_control(1, 0)?;
354
355 debug!(
356 "Pulling message from subscription: {}",
357 self.config.subscription_name
358 );
359
360 let mut stream = self.gcp_subscriber.subscribe(&self.fq_subscription).build();
362
363 let result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
365
366 match result {
367 Ok(Some(Ok((msg, handler)))) => {
368 let received = ReceivedMessage {
369 message_id: msg.message_id.clone(),
370 data: msg.data.clone(),
371 attributes: msg.attributes.clone(),
372 publish_time: Utc::now(),
373 ordering_key: if msg.ordering_key.is_empty() {
374 None
375 } else {
376 Some(msg.ordering_key.clone())
377 },
378 delivery_attempt: 0,
379 ack_id: msg.message_id.clone(),
380 };
381
382 handler.ack();
385 self.track_message(&received);
386 Ok(Some(received))
387 }
388 Ok(Some(Err(e))) => Err(PubSubError::subscription_with_source(
389 "Failed to pull message",
390 Box::new(e),
391 )),
392 Ok(None) | Err(_) => Ok(None),
393 }
394 }
395
396 pub async fn pull(&self, max_messages: i32) -> Result<Vec<ReceivedMessage>> {
400 self.check_flow_control(max_messages as usize, 0)?;
401
402 debug!(
403 "Pulling up to {} messages from subscription: {}",
404 max_messages, self.config.subscription_name
405 );
406
407 let mut stream = self.gcp_subscriber.subscribe(&self.fq_subscription).build();
408
409 let mut received = Vec::new();
410 let timeout_duration = Duration::from_millis(500);
411
412 for _i in 0..max_messages {
413 let result = tokio::time::timeout(timeout_duration, stream.next()).await;
414 match result {
415 Ok(Some(Ok((msg, handler)))) => {
416 let message = ReceivedMessage {
417 message_id: msg.message_id.clone(),
418 data: msg.data.clone(),
419 attributes: msg.attributes.clone(),
420 publish_time: Utc::now(),
421 ordering_key: if msg.ordering_key.is_empty() {
422 None
423 } else {
424 Some(msg.ordering_key.clone())
425 },
426 delivery_attempt: 0,
427 ack_id: msg.message_id.clone(),
428 };
429 handler.ack();
430 self.track_message(&message);
431 received.push(message);
432 }
433 Ok(Some(Err(e))) => {
434 return Err(PubSubError::subscription_with_source(
435 "Failed to pull messages",
436 Box::new(e),
437 ));
438 }
439 Ok(None) | Err(_) => break,
440 }
441 }
442
443 Ok(received)
444 }
445
446 pub async fn acknowledge(&self, message: &ReceivedMessage) -> Result<()> {
451 debug!("Acknowledging message: {}", message.message_id);
452
453 self.untrack_message(message);
456 self.stats.write().messages_acknowledged += 1;
457
458 Ok(())
459 }
460
461 pub async fn nack(&self, message: &ReceivedMessage) -> Result<()> {
466 debug!("Not acknowledging message: {}", message.message_id);
467
468 self.untrack_message(message);
471 self.stats.write().messages_nacked += 1;
472
473 Ok(())
474 }
475
476 pub async fn extend_deadline(&self, message: &ReceivedMessage, seconds: i32) -> Result<()> {
481 debug!(
482 "Extending acknowledgment deadline for message: {} by {} seconds",
483 message.message_id, seconds
484 );
485
486 info!(
489 "Deadline extension for {} noted (managed by streaming pull)",
490 message.message_id
491 );
492
493 Ok(())
494 }
495
496 pub async fn send_to_dead_letter(&self, message: &ReceivedMessage) -> Result<()> {
498 let dlq_config = self.config.dead_letter_config.as_ref().ok_or_else(|| {
499 PubSubError::dead_letter("Dead letter queue not configured", &message.message_id)
500 })?;
501
502 info!(
503 "Sending message {} to dead letter queue: {}",
504 message.message_id, dlq_config.topic_name
505 );
506
507 self.acknowledge(message).await?;
509
510 self.stats.write().messages_to_dlq += 1;
511
512 Ok(())
513 }
514
515 pub async fn start<F>(&self, handler: F) -> Result<JoinHandle<()>>
519 where
520 F: Fn(ReceivedMessage) -> HandlerResult + Send + Sync + 'static,
521 {
522 if self.running.swap(true, Ordering::SeqCst) {
523 return Err(PubSubError::subscription("Subscriber already running"));
524 }
525
526 info!(
527 "Starting subscriber for subscription: {}",
528 self.config.subscription_name
529 );
530
531 let handler = Arc::new(handler);
532 let running = Arc::clone(&self.running);
533 let stats = Arc::clone(&self.stats);
534 let outstanding_messages = Arc::clone(&self.outstanding_messages);
535 let message_count = Arc::clone(&self.message_count);
536 let byte_count = Arc::clone(&self.byte_count);
537 let gcp_subscriber = Arc::clone(&self.gcp_subscriber);
538 let fq_subscription = self.fq_subscription.clone();
539 let dead_letter_config = self.config.dead_letter_config.clone();
540
541 let pull_task = tokio::spawn(async move {
542 let mut stream = gcp_subscriber.subscribe(&fq_subscription).build();
543
544 while running.load(Ordering::SeqCst) {
545 let result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
546
547 match result {
548 Ok(Some(Ok((msg, stream_handler)))) => {
549 let received = ReceivedMessage {
550 message_id: msg.message_id.clone(),
551 data: msg.data.clone(),
552 attributes: msg.attributes.clone(),
553 publish_time: Utc::now(),
554 ordering_key: if msg.ordering_key.is_empty() {
555 None
556 } else {
557 Some(msg.ordering_key.clone())
558 },
559 delivery_attempt: 0,
560 ack_id: msg.message_id.clone(),
561 };
562
563 outstanding_messages.insert(received.message_id.clone(), received.clone());
565 message_count.fetch_add(1, Ordering::Relaxed);
566 byte_count.fetch_add(received.size() as u64, Ordering::Relaxed);
567 {
568 let mut s = stats.write();
569 s.messages_received += 1;
570 s.bytes_received += received.size() as u64;
571 s.outstanding_messages += 1;
572 s.outstanding_bytes += received.size() as u64;
573 s.last_receive = Some(Utc::now());
574 }
575
576 let result = handler(received.clone());
577 match result {
578 HandlerResult::Ack => {
579 stream_handler.ack();
580 stats.write().messages_acknowledged += 1;
581 }
582 HandlerResult::Nack => {
583 drop(stream_handler);
586 stats.write().messages_nacked += 1;
587 }
588 HandlerResult::DeadLetter => {
589 if dead_letter_config.is_some() {
590 stream_handler.ack();
591 stats.write().messages_to_dlq += 1;
592 } else {
593 drop(stream_handler);
594 error!(
595 "DLQ not configured for message: {}",
596 received.message_id
597 );
598 }
599 }
600 }
601
602 outstanding_messages.remove(&received.message_id);
604 message_count.fetch_sub(1, Ordering::Relaxed);
605 byte_count.fetch_sub(received.size() as u64, Ordering::Relaxed);
606 {
607 let mut s = stats.write();
608 s.outstanding_messages = s.outstanding_messages.saturating_sub(1);
609 s.outstanding_bytes =
610 s.outstanding_bytes.saturating_sub(received.size() as u64);
611 }
612 }
613 Ok(Some(Err(e))) => {
614 error!("Error receiving message: {}", e);
615 tokio::time::sleep(Duration::from_secs(1)).await;
616 }
617 Ok(None) => {
618 break;
620 }
621 Err(_) => {
622 }
624 }
625 }
626 });
627
628 Ok(pull_task)
629 }
630
631 pub fn stop(&self) {
633 info!(
634 "Stopping subscriber for subscription: {}",
635 self.config.subscription_name
636 );
637 self.running.store(false, Ordering::SeqCst);
638 }
639
640 fn check_flow_control(&self, messages: usize, bytes: usize) -> Result<()> {
642 let current_messages = self.message_count.load(Ordering::Relaxed) as usize;
643 let current_bytes = self.byte_count.load(Ordering::Relaxed) as usize;
644
645 if current_messages + messages > self.config.flow_control.max_outstanding_messages {
646 return Err(PubSubError::flow_control(
647 "Outstanding message limit exceeded",
648 current_messages + messages,
649 self.config.flow_control.max_outstanding_messages,
650 ));
651 }
652
653 if current_bytes + bytes > self.config.flow_control.max_outstanding_bytes {
654 return Err(PubSubError::flow_control(
655 "Outstanding bytes limit exceeded",
656 current_bytes + bytes,
657 self.config.flow_control.max_outstanding_bytes,
658 ));
659 }
660
661 Ok(())
662 }
663
664 fn track_message(&self, message: &ReceivedMessage) {
666 self.outstanding_messages
667 .insert(message.message_id.clone(), message.clone());
668 self.message_count.fetch_add(1, Ordering::Relaxed);
669 self.byte_count
670 .fetch_add(message.size() as u64, Ordering::Relaxed);
671
672 let mut stats = self.stats.write();
673 stats.messages_received += 1;
674 stats.bytes_received += message.size() as u64;
675 stats.outstanding_messages += 1;
676 stats.outstanding_bytes += message.size() as u64;
677 stats.last_receive = Some(Utc::now());
678 }
679
680 fn untrack_message(&self, message: &ReceivedMessage) {
682 self.outstanding_messages.remove(&message.message_id);
683 self.message_count.fetch_sub(1, Ordering::Relaxed);
684 self.byte_count
685 .fetch_sub(message.size() as u64, Ordering::Relaxed);
686
687 let mut stats = self.stats.write();
688 stats.outstanding_messages = stats.outstanding_messages.saturating_sub(1);
689 stats.outstanding_bytes = stats
690 .outstanding_bytes
691 .saturating_sub(message.size() as u64);
692 }
693
694 #[allow(dead_code)]
696 fn clone_arc(&self) -> Arc<Self> {
697 Arc::new(Self {
698 config: self.config.clone(),
699 gcp_subscriber: Arc::clone(&self.gcp_subscriber),
700 admin: Arc::clone(&self.admin),
701 fq_subscription: self.fq_subscription.clone(),
702 stats: Arc::clone(&self.stats),
703 outstanding_messages: Arc::clone(&self.outstanding_messages),
704 running: Arc::clone(&self.running),
705 message_count: Arc::clone(&self.message_count),
706 byte_count: Arc::clone(&self.byte_count),
707 })
708 }
709
710 pub fn stats(&self) -> SubscriberStats {
712 self.stats.read().clone()
713 }
714
715 pub fn reset_stats(&self) {
717 *self.stats.write() = SubscriberStats::default();
718 }
719
720 pub fn subscription_name(&self) -> &str {
722 &self.config.subscription_name
723 }
724
725 pub fn project_id(&self) -> &str {
727 &self.config.project_id
728 }
729
730 pub fn is_running(&self) -> bool {
732 self.running.load(Ordering::SeqCst)
733 }
734}
735
736#[cfg(test)]
737mod tests {
738 use super::*;
739
740 #[test]
741 fn test_subscriber_config() {
742 let config = SubscriberConfig::new("my-project", "my-subscription")
743 .with_type(SubscriptionType::Pull)
744 .with_ack_deadline(30)
745 .with_ordering(true);
746
747 assert_eq!(config.project_id, "my-project");
748 assert_eq!(config.subscription_name, "my-subscription");
749 assert_eq!(config.subscription_type, SubscriptionType::Pull);
750 assert_eq!(config.ack_deadline_seconds, 30);
751 assert!(config.enable_ordering);
752 }
753
754 #[test]
755 fn test_config_validation() {
756 let invalid_config = SubscriberConfig::default();
757 assert!(invalid_config.validate().is_err());
758
759 let valid_config = SubscriberConfig::new("project", "subscription");
760 assert!(valid_config.validate().is_ok());
761 }
762
763 #[test]
764 fn test_flow_control_settings() {
765 let settings = FlowControlSettings::default();
766 assert_eq!(
767 settings.max_outstanding_messages,
768 DEFAULT_MAX_OUTSTANDING_MESSAGES
769 );
770 assert_eq!(
771 settings.max_outstanding_bytes,
772 DEFAULT_MAX_OUTSTANDING_BYTES
773 );
774 }
775
776 #[test]
777 fn test_dead_letter_config() {
778 let config = DeadLetterConfig::new("dlq-topic", 5);
779 assert_eq!(config.topic_name, "dlq-topic");
780 assert_eq!(config.max_delivery_attempts, 5);
781 }
782
783 #[test]
784 fn test_received_message() {
785 let message = ReceivedMessage {
786 message_id: "msg-1".to_string(),
787 data: Bytes::from(b"test data".to_vec()),
788 attributes: HashMap::new(),
789 publish_time: Utc::now(),
790 ordering_key: None,
791 delivery_attempt: 1,
792 ack_id: "ack-1".to_string(),
793 };
794
795 assert_eq!(message.size(), 9);
796 assert!(!message.is_redelivery());
797
798 let redelivered = ReceivedMessage {
799 delivery_attempt: 2,
800 ..message.clone()
801 };
802 assert!(redelivered.is_redelivery());
803 }
804
805 #[test]
806 fn test_subscriber_stats() {
807 let stats = SubscriberStats::default();
808 assert_eq!(stats.messages_received, 0);
809 assert_eq!(stats.messages_acknowledged, 0);
810 assert_eq!(stats.messages_nacked, 0);
811 assert_eq!(stats.messages_to_dlq, 0);
812 }
813}