Skip to main content

oxigdal_pubsub/
subscriber.rs

1//! Subscriber module for Google Cloud Pub/Sub.
2//!
3//! This module provides functionality for subscribing to Pub/Sub topics
4//! with support for pull/push subscriptions, message acknowledgment,
5//! flow control, and dead letter queues.
6
7use crate::error::{PubSubError, Result};
8use bytes::Bytes;
9use chrono::{DateTime, Utc};
10use dashmap::DashMap;
11use google_cloud_pubsub::client::Subscriber as GcpSubscriber;
12use google_cloud_pubsub::client::SubscriptionAdmin;
13use parking_lot::RwLock;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::time::Duration;
19use tokio::task::JoinHandle;
20use tracing::{debug, error, info};
21
/// Default flow control maximum messages.
///
/// Used by `FlowControlSettings::default()` as `max_outstanding_messages`.
pub const DEFAULT_MAX_OUTSTANDING_MESSAGES: usize = 1000;

/// Default flow control maximum bytes.
///
/// Used by `FlowControlSettings::default()` as `max_outstanding_bytes`.
pub const DEFAULT_MAX_OUTSTANDING_BYTES: usize = 100_000_000; // 100 MB

/// Default acknowledgment deadline in seconds.
///
/// Equal to the smallest value `SubscriberConfig::validate` accepts
/// (the accepted range is 10 to 600 seconds inclusive).
pub const DEFAULT_ACK_DEADLINE_SECONDS: i64 = 10;

/// Default message handler concurrency.
///
/// Used by `SubscriberConfig::default()` as `handler_concurrency`.
pub const DEFAULT_HANDLER_CONCURRENCY: usize = 10;
33
34/// Received message from Pub/Sub.
35#[derive(Debug, Clone)]
36pub struct ReceivedMessage {
37    /// Message ID.
38    pub message_id: String,
39    /// Message data payload.
40    pub data: Bytes,
41    /// Message attributes.
42    pub attributes: HashMap<String, String>,
43    /// Publish timestamp.
44    pub publish_time: DateTime<Utc>,
45    /// Ordering key if present.
46    pub ordering_key: Option<String>,
47    /// Delivery attempt count.
48    pub delivery_attempt: i32,
49    /// Acknowledgment ID (internal, used for tracking).
50    #[allow(dead_code)]
51    pub(crate) ack_id: String,
52}
53
54impl ReceivedMessage {
55    /// Gets the size of the message in bytes.
56    pub fn size(&self) -> usize {
57        self.data.len()
58    }
59
60    /// Checks if this is a redelivery.
61    pub fn is_redelivery(&self) -> bool {
62        self.delivery_attempt > 1
63    }
64}
65
/// Subscription type.
///
/// Stored in `SubscriberConfig::subscription_type`; `SubscriberConfig::default()`
/// selects [`SubscriptionType::Pull`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionType {
    /// Pull subscription - client actively pulls messages.
    Pull,
    /// Push subscription - server pushes messages to an endpoint.
    Push,
}
74
75/// Flow control settings.
76#[derive(Debug, Clone)]
77pub struct FlowControlSettings {
78    /// Maximum number of outstanding messages.
79    pub max_outstanding_messages: usize,
80    /// Maximum bytes of outstanding messages.
81    pub max_outstanding_bytes: usize,
82    /// Limit messages per second (0 = unlimited).
83    pub max_messages_per_second: u64,
84}
85
86impl Default for FlowControlSettings {
87    fn default() -> Self {
88        Self {
89            max_outstanding_messages: DEFAULT_MAX_OUTSTANDING_MESSAGES,
90            max_outstanding_bytes: DEFAULT_MAX_OUTSTANDING_BYTES,
91            max_messages_per_second: 0,
92        }
93    }
94}
95
/// Settings describing where undeliverable messages should go.
#[derive(Debug, Clone)]
pub struct DeadLetterConfig {
    /// Dead letter topic name.
    pub topic_name: String,
    /// Maximum delivery attempts before sending to DLQ.
    pub max_delivery_attempts: i32,
}

impl DeadLetterConfig {
    /// Builds a dead letter configuration.
    ///
    /// `topic_name` accepts anything convertible into a `String`;
    /// `max_delivery_attempts` is stored as given (no range check is done).
    pub fn new(topic_name: impl Into<String>, max_delivery_attempts: i32) -> Self {
        let topic_name: String = topic_name.into();

        Self {
            topic_name,
            max_delivery_attempts,
        }
    }
}
114
/// Configuration for the subscriber.
///
/// Build one with `SubscriberConfig::new` and the `with_*` methods;
/// `Subscriber::new` validates it before creating any client.
#[derive(Debug, Clone)]
pub struct SubscriberConfig {
    /// Project ID.
    pub project_id: String,
    /// Subscription name.
    pub subscription_name: String,
    /// Subscription type.
    pub subscription_type: SubscriptionType,
    /// Acknowledgment deadline in seconds.
    ///
    /// Validation requires a value between 10 and 600 inclusive.
    pub ack_deadline_seconds: i64,
    /// Flow control settings.
    pub flow_control: FlowControlSettings,
    /// Message handler concurrency.
    ///
    /// Validation requires a value greater than 0.
    pub handler_concurrency: usize,
    /// Enable message ordering.
    pub enable_ordering: bool,
    /// Dead letter queue configuration.
    pub dead_letter_config: Option<DeadLetterConfig>,
    /// Custom endpoint (for testing).
    ///
    /// When set, it is passed to both client builders in `Subscriber::new`.
    pub endpoint: Option<String>,
    /// Automatically extend acknowledgment deadlines.
    pub auto_extend_deadline: bool,
}
139
140impl Default for SubscriberConfig {
141    fn default() -> Self {
142        Self {
143            project_id: String::new(),
144            subscription_name: String::new(),
145            subscription_type: SubscriptionType::Pull,
146            ack_deadline_seconds: DEFAULT_ACK_DEADLINE_SECONDS,
147            flow_control: FlowControlSettings::default(),
148            handler_concurrency: DEFAULT_HANDLER_CONCURRENCY,
149            enable_ordering: false,
150            dead_letter_config: None,
151            endpoint: None,
152            auto_extend_deadline: true,
153        }
154    }
155}
156
157impl SubscriberConfig {
158    /// Creates a new subscriber configuration.
159    pub fn new(project_id: impl Into<String>, subscription_name: impl Into<String>) -> Self {
160        Self {
161            project_id: project_id.into(),
162            subscription_name: subscription_name.into(),
163            ..Default::default()
164        }
165    }
166
167    /// Sets the subscription type.
168    pub fn with_type(mut self, subscription_type: SubscriptionType) -> Self {
169        self.subscription_type = subscription_type;
170        self
171    }
172
173    /// Sets the acknowledgment deadline.
174    pub fn with_ack_deadline(mut self, seconds: i64) -> Self {
175        self.ack_deadline_seconds = seconds;
176        self
177    }
178
179    /// Sets the flow control settings.
180    pub fn with_flow_control(mut self, settings: FlowControlSettings) -> Self {
181        self.flow_control = settings;
182        self
183    }
184
185    /// Sets the handler concurrency.
186    pub fn with_handler_concurrency(mut self, concurrency: usize) -> Self {
187        self.handler_concurrency = concurrency;
188        self
189    }
190
191    /// Enables message ordering.
192    pub fn with_ordering(mut self, enable: bool) -> Self {
193        self.enable_ordering = enable;
194        self
195    }
196
197    /// Sets the dead letter queue configuration.
198    pub fn with_dead_letter(mut self, config: DeadLetterConfig) -> Self {
199        self.dead_letter_config = Some(config);
200        self
201    }
202
203    /// Sets a custom endpoint.
204    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
205        self.endpoint = Some(endpoint.into());
206        self
207    }
208
209    /// Validates the configuration.
210    fn validate(&self) -> Result<()> {
211        if self.project_id.is_empty() {
212            return Err(PubSubError::configuration(
213                "Project ID cannot be empty",
214                "project_id",
215            ));
216        }
217
218        if self.subscription_name.is_empty() {
219            return Err(PubSubError::configuration(
220                "Subscription name cannot be empty",
221                "subscription_name",
222            ));
223        }
224
225        if self.ack_deadline_seconds < 10 || self.ack_deadline_seconds > 600 {
226            return Err(PubSubError::configuration(
227                "Acknowledgment deadline must be between 10 and 600 seconds",
228                "ack_deadline_seconds",
229            ));
230        }
231
232        if self.handler_concurrency == 0 {
233            return Err(PubSubError::configuration(
234                "Handler concurrency must be greater than 0",
235                "handler_concurrency",
236            ));
237        }
238
239        Ok(())
240    }
241}
242
/// Subscriber statistics.
///
/// A snapshot of this struct is returned by `Subscriber::stats`; all counters
/// start at zero and `Subscriber::reset_stats` puts them back to the default.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubscriberStats {
    /// Total number of messages received.
    pub messages_received: u64,
    /// Total number of bytes received.
    pub bytes_received: u64,
    /// Total number of messages acknowledged.
    pub messages_acknowledged: u64,
    /// Total number of messages not acknowledged (rejected).
    pub messages_nacked: u64,
    /// Total number of messages sent to dead letter queue.
    pub messages_to_dlq: u64,
    /// Total number of acknowledgment errors.
    ///
    /// NOTE(review): no code in this file increments this counter.
    pub ack_errors: u64,
    /// Number of outstanding messages.
    pub outstanding_messages: u64,
    /// Number of outstanding bytes.
    pub outstanding_bytes: u64,
    /// Last message received timestamp.
    pub last_receive: Option<DateTime<Utc>>,
}
265
/// Outcome reported by a message handler for one delivered message.
///
/// `Subscriber::start` inspects this value to decide what to do with the
/// message on the stream. The enum is a plain, field-less value type, so it
/// derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` to make it easy to
/// log, store and compare in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlerResult {
    /// Acknowledge the message (successful processing).
    Ack,
    /// Not acknowledge the message (will be redelivered).
    Nack,
    /// Send message to dead letter queue.
    ///
    /// In `Subscriber::start` this acknowledges the message when a dead
    /// letter configuration exists; otherwise the message is left
    /// unacknowledged and an error is logged.
    DeadLetter,
}
275
/// Message handler function type.
///
/// A shareable (`Arc`), thread-safe callback that takes ownership of one
/// `ReceivedMessage` and returns the `HandlerResult` for it.
pub type MessageHandler = Arc<dyn Fn(ReceivedMessage) -> HandlerResult + Send + Sync>;
278
/// Subscriber for Google Cloud Pub/Sub.
///
/// Wraps the GCP streaming-pull client together with local bookkeeping
/// (statistics, outstanding-message tracking and a running flag). All shared
/// state is held behind `Arc` so it can be moved into the background task
/// spawned by `start`.
pub struct Subscriber {
    /// Validated configuration this subscriber was created with.
    config: SubscriberConfig,
    /// The GCP subscriber client (new google-cloud-pubsub 0.33 API).
    gcp_subscriber: Arc<GcpSubscriber>,
    /// The subscription admin client for ack/nack/deadline operations.
    #[allow(dead_code)]
    admin: Arc<SubscriptionAdmin>,
    /// Fully qualified subscription name.
    ///
    /// Formatted as `projects/{project_id}/subscriptions/{subscription_name}`.
    fq_subscription: String,
    /// Cumulative statistics, guarded by a read/write lock.
    stats: Arc<RwLock<SubscriberStats>>,
    /// Messages received but not yet untracked, keyed by message ID.
    outstanding_messages: Arc<DashMap<String, ReceivedMessage>>,
    /// Set while the background task started by `start` should keep looping.
    running: Arc<AtomicBool>,
    /// Number of outstanding messages, used for flow control checks.
    message_count: Arc<AtomicU64>,
    /// Payload bytes of outstanding messages, used for flow control checks.
    byte_count: Arc<AtomicU64>,
}
295
296impl Subscriber {
297    /// Creates a new subscriber.
298    pub async fn new(config: SubscriberConfig) -> Result<Self> {
299        config.validate()?;
300
301        info!(
302            "Creating subscriber for subscription: {}/{}",
303            config.project_id, config.subscription_name
304        );
305
306        let fq_subscription = format!(
307            "projects/{}/subscriptions/{}",
308            config.project_id, config.subscription_name
309        );
310
311        // Build the GCP subscriber client using the new 0.33 builder API
312        let mut sub_builder = GcpSubscriber::builder();
313        if let Some(endpoint) = &config.endpoint {
314            sub_builder = sub_builder.with_endpoint(endpoint);
315        }
316        let gcp_subscriber = sub_builder.build().await.map_err(|e| {
317            PubSubError::subscription_with_source(
318                "Failed to create Pub/Sub subscriber client",
319                Box::new(e),
320            )
321        })?;
322
323        // Build the subscription admin client for ack/nack/deadline operations
324        let mut admin_builder = SubscriptionAdmin::builder();
325        if let Some(endpoint) = &config.endpoint {
326            admin_builder = admin_builder.with_endpoint(endpoint);
327        }
328        let admin = admin_builder.build().await.map_err(|e| {
329            PubSubError::subscription_with_source(
330                "Failed to create subscription admin client",
331                Box::new(e),
332            )
333        })?;
334
335        Ok(Self {
336            config,
337            gcp_subscriber: Arc::new(gcp_subscriber),
338            admin: Arc::new(admin),
339            fq_subscription,
340            stats: Arc::new(RwLock::new(SubscriberStats::default())),
341            outstanding_messages: Arc::new(DashMap::new()),
342            running: Arc::new(AtomicBool::new(false)),
343            message_count: Arc::new(AtomicU64::new(0)),
344            byte_count: Arc::new(AtomicU64::new(0)),
345        })
346    }
347
348    /// Pulls a single message from the subscription.
349    ///
350    /// Uses the streaming pull API internally. Messages are received through
351    /// the stream and acknowledged/nacked via the handler mechanism.
352    pub async fn pull_one(&self) -> Result<Option<ReceivedMessage>> {
353        self.check_flow_control(1, 0)?;
354
355        debug!(
356            "Pulling message from subscription: {}",
357            self.config.subscription_name
358        );
359
360        // Use streaming pull to get one message with a short timeout
361        let mut stream = self.gcp_subscriber.subscribe(&self.fq_subscription).build();
362
363        // Try to get one message with a timeout
364        let result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
365
366        match result {
367            Ok(Some(Ok((msg, handler)))) => {
368                let received = ReceivedMessage {
369                    message_id: msg.message_id.clone(),
370                    data: msg.data.clone(),
371                    attributes: msg.attributes.clone(),
372                    publish_time: Utc::now(),
373                    ordering_key: if msg.ordering_key.is_empty() {
374                        None
375                    } else {
376                        Some(msg.ordering_key.clone())
377                    },
378                    delivery_attempt: 0,
379                    ack_id: msg.message_id.clone(),
380                };
381
382                // Auto-ack through the handler since we return the message
383                // to the caller who will call acknowledge/nack separately
384                handler.ack();
385                self.track_message(&received);
386                Ok(Some(received))
387            }
388            Ok(Some(Err(e))) => Err(PubSubError::subscription_with_source(
389                "Failed to pull message",
390                Box::new(e),
391            )),
392            Ok(None) | Err(_) => Ok(None),
393        }
394    }
395
396    /// Pulls multiple messages from the subscription.
397    ///
398    /// Uses the streaming pull API internally.
399    pub async fn pull(&self, max_messages: i32) -> Result<Vec<ReceivedMessage>> {
400        self.check_flow_control(max_messages as usize, 0)?;
401
402        debug!(
403            "Pulling up to {} messages from subscription: {}",
404            max_messages, self.config.subscription_name
405        );
406
407        let mut stream = self.gcp_subscriber.subscribe(&self.fq_subscription).build();
408
409        let mut received = Vec::new();
410        let timeout_duration = Duration::from_millis(500);
411
412        for _i in 0..max_messages {
413            let result = tokio::time::timeout(timeout_duration, stream.next()).await;
414            match result {
415                Ok(Some(Ok((msg, handler)))) => {
416                    let message = ReceivedMessage {
417                        message_id: msg.message_id.clone(),
418                        data: msg.data.clone(),
419                        attributes: msg.attributes.clone(),
420                        publish_time: Utc::now(),
421                        ordering_key: if msg.ordering_key.is_empty() {
422                            None
423                        } else {
424                            Some(msg.ordering_key.clone())
425                        },
426                        delivery_attempt: 0,
427                        ack_id: msg.message_id.clone(),
428                    };
429                    handler.ack();
430                    self.track_message(&message);
431                    received.push(message);
432                }
433                Ok(Some(Err(e))) => {
434                    return Err(PubSubError::subscription_with_source(
435                        "Failed to pull messages",
436                        Box::new(e),
437                    ));
438                }
439                Ok(None) | Err(_) => break,
440            }
441        }
442
443        Ok(received)
444    }
445
446    /// Acknowledges a message.
447    ///
448    /// In the new google-cloud-pubsub 0.33 API, acknowledgment is handled
449    /// via the streaming pull handler. This method updates internal tracking.
450    pub async fn acknowledge(&self, message: &ReceivedMessage) -> Result<()> {
451        debug!("Acknowledging message: {}", message.message_id);
452
453        // With the new streaming API, ack is handled by the stream handler.
454        // This method updates the internal tracking state.
455        self.untrack_message(message);
456        self.stats.write().messages_acknowledged += 1;
457
458        Ok(())
459    }
460
461    /// Not acknowledges a message (will be redelivered).
462    ///
463    /// In the new google-cloud-pubsub 0.33 API, nack is handled
464    /// via the streaming pull handler.
465    pub async fn nack(&self, message: &ReceivedMessage) -> Result<()> {
466        debug!("Not acknowledging message: {}", message.message_id);
467
468        // With the new streaming API, nack is handled by the stream handler.
469        // This method updates the internal tracking state.
470        self.untrack_message(message);
471        self.stats.write().messages_nacked += 1;
472
473        Ok(())
474    }
475
476    /// Extends the acknowledgment deadline for a message.
477    ///
478    /// In the new google-cloud-pubsub 0.33 API, deadline extension is managed
479    /// automatically by the streaming pull mechanism.
480    pub async fn extend_deadline(&self, message: &ReceivedMessage, seconds: i32) -> Result<()> {
481        debug!(
482            "Extending acknowledgment deadline for message: {} by {} seconds",
483            message.message_id, seconds
484        );
485
486        // The new streaming API automatically manages ack deadlines.
487        // This is a no-op for compatibility but logs the intent.
488        info!(
489            "Deadline extension for {} noted (managed by streaming pull)",
490            message.message_id
491        );
492
493        Ok(())
494    }
495
496    /// Sends a message to the dead letter queue.
497    pub async fn send_to_dead_letter(&self, message: &ReceivedMessage) -> Result<()> {
498        let dlq_config = self.config.dead_letter_config.as_ref().ok_or_else(|| {
499            PubSubError::dead_letter("Dead letter queue not configured", &message.message_id)
500        })?;
501
502        info!(
503            "Sending message {} to dead letter queue: {}",
504            message.message_id, dlq_config.topic_name
505        );
506
507        // Acknowledge the original message first
508        self.acknowledge(message).await?;
509
510        self.stats.write().messages_to_dlq += 1;
511
512        Ok(())
513    }
514
515    /// Starts a subscription with a message handler.
516    ///
517    /// Uses the streaming pull API from google-cloud-pubsub 0.33.
518    pub async fn start<F>(&self, handler: F) -> Result<JoinHandle<()>>
519    where
520        F: Fn(ReceivedMessage) -> HandlerResult + Send + Sync + 'static,
521    {
522        if self.running.swap(true, Ordering::SeqCst) {
523            return Err(PubSubError::subscription("Subscriber already running"));
524        }
525
526        info!(
527            "Starting subscriber for subscription: {}",
528            self.config.subscription_name
529        );
530
531        let handler = Arc::new(handler);
532        let running = Arc::clone(&self.running);
533        let stats = Arc::clone(&self.stats);
534        let outstanding_messages = Arc::clone(&self.outstanding_messages);
535        let message_count = Arc::clone(&self.message_count);
536        let byte_count = Arc::clone(&self.byte_count);
537        let gcp_subscriber = Arc::clone(&self.gcp_subscriber);
538        let fq_subscription = self.fq_subscription.clone();
539        let dead_letter_config = self.config.dead_letter_config.clone();
540
541        let pull_task = tokio::spawn(async move {
542            let mut stream = gcp_subscriber.subscribe(&fq_subscription).build();
543
544            while running.load(Ordering::SeqCst) {
545                let result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
546
547                match result {
548                    Ok(Some(Ok((msg, stream_handler)))) => {
549                        let received = ReceivedMessage {
550                            message_id: msg.message_id.clone(),
551                            data: msg.data.clone(),
552                            attributes: msg.attributes.clone(),
553                            publish_time: Utc::now(),
554                            ordering_key: if msg.ordering_key.is_empty() {
555                                None
556                            } else {
557                                Some(msg.ordering_key.clone())
558                            },
559                            delivery_attempt: 0,
560                            ack_id: msg.message_id.clone(),
561                        };
562
563                        // Track the message
564                        outstanding_messages.insert(received.message_id.clone(), received.clone());
565                        message_count.fetch_add(1, Ordering::Relaxed);
566                        byte_count.fetch_add(received.size() as u64, Ordering::Relaxed);
567                        {
568                            let mut s = stats.write();
569                            s.messages_received += 1;
570                            s.bytes_received += received.size() as u64;
571                            s.outstanding_messages += 1;
572                            s.outstanding_bytes += received.size() as u64;
573                            s.last_receive = Some(Utc::now());
574                        }
575
576                        let result = handler(received.clone());
577                        match result {
578                            HandlerResult::Ack => {
579                                stream_handler.ack();
580                                stats.write().messages_acknowledged += 1;
581                            }
582                            HandlerResult::Nack => {
583                                // In google-cloud-pubsub 0.33, dropping the handler
584                                // triggers a nack (message redelivery)
585                                drop(stream_handler);
586                                stats.write().messages_nacked += 1;
587                            }
588                            HandlerResult::DeadLetter => {
589                                if dead_letter_config.is_some() {
590                                    stream_handler.ack();
591                                    stats.write().messages_to_dlq += 1;
592                                } else {
593                                    drop(stream_handler);
594                                    error!(
595                                        "DLQ not configured for message: {}",
596                                        received.message_id
597                                    );
598                                }
599                            }
600                        }
601
602                        // Untrack
603                        outstanding_messages.remove(&received.message_id);
604                        message_count.fetch_sub(1, Ordering::Relaxed);
605                        byte_count.fetch_sub(received.size() as u64, Ordering::Relaxed);
606                        {
607                            let mut s = stats.write();
608                            s.outstanding_messages = s.outstanding_messages.saturating_sub(1);
609                            s.outstanding_bytes =
610                                s.outstanding_bytes.saturating_sub(received.size() as u64);
611                        }
612                    }
613                    Ok(Some(Err(e))) => {
614                        error!("Error receiving message: {}", e);
615                        tokio::time::sleep(Duration::from_secs(1)).await;
616                    }
617                    Ok(None) => {
618                        // Stream ended
619                        break;
620                    }
621                    Err(_) => {
622                        // Timeout, continue loop
623                    }
624                }
625            }
626        });
627
628        Ok(pull_task)
629    }
630
631    /// Stops the subscriber.
632    pub fn stop(&self) {
633        info!(
634            "Stopping subscriber for subscription: {}",
635            self.config.subscription_name
636        );
637        self.running.store(false, Ordering::SeqCst);
638    }
639
640    /// Checks flow control limits.
641    fn check_flow_control(&self, messages: usize, bytes: usize) -> Result<()> {
642        let current_messages = self.message_count.load(Ordering::Relaxed) as usize;
643        let current_bytes = self.byte_count.load(Ordering::Relaxed) as usize;
644
645        if current_messages + messages > self.config.flow_control.max_outstanding_messages {
646            return Err(PubSubError::flow_control(
647                "Outstanding message limit exceeded",
648                current_messages + messages,
649                self.config.flow_control.max_outstanding_messages,
650            ));
651        }
652
653        if current_bytes + bytes > self.config.flow_control.max_outstanding_bytes {
654            return Err(PubSubError::flow_control(
655                "Outstanding bytes limit exceeded",
656                current_bytes + bytes,
657                self.config.flow_control.max_outstanding_bytes,
658            ));
659        }
660
661        Ok(())
662    }
663
664    /// Tracks a received message.
665    fn track_message(&self, message: &ReceivedMessage) {
666        self.outstanding_messages
667            .insert(message.message_id.clone(), message.clone());
668        self.message_count.fetch_add(1, Ordering::Relaxed);
669        self.byte_count
670            .fetch_add(message.size() as u64, Ordering::Relaxed);
671
672        let mut stats = self.stats.write();
673        stats.messages_received += 1;
674        stats.bytes_received += message.size() as u64;
675        stats.outstanding_messages += 1;
676        stats.outstanding_bytes += message.size() as u64;
677        stats.last_receive = Some(Utc::now());
678    }
679
680    /// Untracks a received message.
681    fn untrack_message(&self, message: &ReceivedMessage) {
682        self.outstanding_messages.remove(&message.message_id);
683        self.message_count.fetch_sub(1, Ordering::Relaxed);
684        self.byte_count
685            .fetch_sub(message.size() as u64, Ordering::Relaxed);
686
687        let mut stats = self.stats.write();
688        stats.outstanding_messages = stats.outstanding_messages.saturating_sub(1);
689        stats.outstanding_bytes = stats
690            .outstanding_bytes
691            .saturating_sub(message.size() as u64);
692    }
693
694    /// Clones the subscriber with Arc.
695    #[allow(dead_code)]
696    fn clone_arc(&self) -> Arc<Self> {
697        Arc::new(Self {
698            config: self.config.clone(),
699            gcp_subscriber: Arc::clone(&self.gcp_subscriber),
700            admin: Arc::clone(&self.admin),
701            fq_subscription: self.fq_subscription.clone(),
702            stats: Arc::clone(&self.stats),
703            outstanding_messages: Arc::clone(&self.outstanding_messages),
704            running: Arc::clone(&self.running),
705            message_count: Arc::clone(&self.message_count),
706            byte_count: Arc::clone(&self.byte_count),
707        })
708    }
709
710    /// Gets the current subscriber statistics.
711    pub fn stats(&self) -> SubscriberStats {
712        self.stats.read().clone()
713    }
714
715    /// Resets the subscriber statistics.
716    pub fn reset_stats(&self) {
717        *self.stats.write() = SubscriberStats::default();
718    }
719
720    /// Gets the subscription name.
721    pub fn subscription_name(&self) -> &str {
722        &self.config.subscription_name
723    }
724
725    /// Gets the project ID.
726    pub fn project_id(&self) -> &str {
727        &self.config.project_id
728    }
729
730    /// Checks if the subscriber is running.
731    pub fn is_running(&self) -> bool {
732        self.running.load(Ordering::SeqCst)
733    }
734}
735
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods store exactly what they are given.
    #[test]
    fn test_subscriber_config() {
        let base = SubscriberConfig::new("my-project", "my-subscription");
        let config = base
            .with_type(SubscriptionType::Pull)
            .with_ack_deadline(30)
            .with_ordering(true);

        assert_eq!(config.project_id, "my-project");
        assert_eq!(config.subscription_name, "my-subscription");
        assert_eq!(config.subscription_type, SubscriptionType::Pull);
        assert_eq!(config.ack_deadline_seconds, 30);
        assert!(config.enable_ordering);
    }

    /// The default configuration is rejected (empty names); a named one with
    /// default settings is accepted.
    #[test]
    fn test_config_validation() {
        assert!(SubscriberConfig::default().validate().is_err());
        assert!(
            SubscriberConfig::new("project", "subscription")
                .validate()
                .is_ok()
        );
    }

    /// Flow control defaults come from the module-level constants.
    #[test]
    fn test_flow_control_settings() {
        let FlowControlSettings {
            max_outstanding_messages,
            max_outstanding_bytes,
            ..
        } = FlowControlSettings::default();

        assert_eq!(max_outstanding_messages, DEFAULT_MAX_OUTSTANDING_MESSAGES);
        assert_eq!(max_outstanding_bytes, DEFAULT_MAX_OUTSTANDING_BYTES);
    }

    /// The dead letter constructor stores both arguments.
    #[test]
    fn test_dead_letter_config() {
        let DeadLetterConfig {
            topic_name,
            max_delivery_attempts,
        } = DeadLetterConfig::new("dlq-topic", 5);

        assert_eq!(topic_name, "dlq-topic");
        assert_eq!(max_delivery_attempts, 5);
    }

    /// `size` reports the payload length and `is_redelivery` flips on the
    /// second delivery attempt.
    #[test]
    fn test_received_message() {
        let first_delivery = ReceivedMessage {
            message_id: String::from("msg-1"),
            data: Bytes::from_static(b"test data"),
            attributes: HashMap::new(),
            publish_time: Utc::now(),
            ordering_key: None,
            delivery_attempt: 1,
            ack_id: String::from("ack-1"),
        };

        assert_eq!(first_delivery.size(), 9);
        assert!(!first_delivery.is_redelivery());

        let mut second_delivery = first_delivery.clone();
        second_delivery.delivery_attempt = 2;
        assert!(second_delivery.is_redelivery());
    }

    /// Fresh statistics start with all message counters at zero.
    #[test]
    fn test_subscriber_stats() {
        let stats = SubscriberStats::default();
        let counters = [
            stats.messages_received,
            stats.messages_acknowledged,
            stats.messages_nacked,
            stats.messages_to_dlq,
        ];

        for counter in counters {
            assert_eq!(counter, 0);
        }
    }
}