// queue_runtime/provider.rs

//! Provider types and configuration.
2
use chrono::Duration;
use serde::{Deserialize, Serialize};
use std::fmt;
6
7/// Enumeration of supported queue providers
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
9pub enum ProviderType {
10    AzureServiceBus,
11    AwsSqs,
12    InMemory,
13    RabbitMq,
14    Nats,
15}
16
17impl ProviderType {
18    /// Get session support level for provider
19    pub fn supports_sessions(&self) -> SessionSupport {
20        match self {
21            Self::AzureServiceBus => SessionSupport::Native,
22            Self::AwsSqs => SessionSupport::Emulated, // Via FIFO queues
23            Self::InMemory => SessionSupport::Native,
24            Self::RabbitMq => SessionSupport::Emulated, // Via routing keys and in-memory tracking
25            Self::Nats => SessionSupport::Emulated,     // Via JetStream consumer filter subjects
26        }
27    }
28
29    /// Check if provider supports batch operations
30    pub fn supports_batching(&self) -> bool {
31        match self {
32            Self::AzureServiceBus => true,
33            Self::AwsSqs => true,
34            Self::InMemory => true,
35            Self::RabbitMq => true,
36            Self::Nats => true,
37        }
38    }
39
40    /// Get maximum message size for provider
41    ///
42    /// # Notes
43    ///
44    /// The NATS limit (1 MB) reflects the JetStream server default
45    /// (`max_payload` setting).  Operators who raise `max_payload` on the
46    /// server can publish larger messages; the client-side check here will
47    /// then be the binding limit until a matching configuration option is
48    /// added.  See the [`NatsConfig`] documentation for guidance.
49    pub fn max_message_size(&self) -> usize {
50        match self {
51            Self::AzureServiceBus => 1024 * 1024, // 1MB
52            Self::AwsSqs => 256 * 1024,           // 256KB
53            Self::InMemory => 10 * 1024 * 1024,   // 10MB
54            Self::RabbitMq => 128 * 1024 * 1024,  // 128MB (configurable, practical limit)
55            Self::Nats => 1024 * 1024,            // 1MB (NATS server default; see note above)
56        }
57    }
58}
59
/// Level of session support provided by different providers.
///
/// Returned by [`ProviderType::supports_sessions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionSupport {
    /// Provider has built-in session support (Azure Service Bus and the
    /// in-memory provider).
    Native,
    /// Provider emulates sessions via other mechanisms (AWS SQS FIFO queues,
    /// RabbitMQ routing keys with in-memory tracking, NATS JetStream consumer
    /// filter subjects).
    Emulated,
}
68
69/// Configuration for queue client initialization
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct QueueConfig {
72    pub provider: ProviderConfig,
73    pub default_timeout: Duration,
74    pub max_retry_attempts: u32,
75    pub retry_base_delay: Duration,
76    pub enable_dead_letter: bool,
77}
78
79impl Default for QueueConfig {
80    fn default() -> Self {
81        Self {
82            provider: ProviderConfig::InMemory(InMemoryConfig::default()),
83            default_timeout: Duration::seconds(30),
84            max_retry_attempts: 3,
85            retry_base_delay: Duration::seconds(1),
86            enable_dead_letter: true,
87        }
88    }
89}
90
91/// Provider-specific configuration
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub enum ProviderConfig {
94    AzureServiceBus(AzureServiceBusConfig),
95    AwsSqs(AwsSqsConfig),
96    InMemory(InMemoryConfig),
97    RabbitMq(crate::providers::rabbitmq::RabbitMqConfig),
98    Nats(crate::providers::nats::NatsConfig),
99}
100
101/// Azure Service Bus configuration
102#[derive(Clone, Serialize, Deserialize)]
103pub struct AzureServiceBusConfig {
104    #[serde(skip_serializing)]
105    pub connection_string: Option<String>,
106    pub namespace: Option<String>,
107    #[serde(skip, default = "default_auth_method")]
108    pub auth_method: crate::providers::AzureAuthMethod,
109    pub use_sessions: bool,
110    pub session_timeout: Duration,
111}
112
113fn default_auth_method() -> crate::providers::AzureAuthMethod {
114    crate::providers::AzureAuthMethod::DefaultCredential
115}
116
117impl fmt::Debug for AzureServiceBusConfig {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("AzureServiceBusConfig")
120            .field(
121                "connection_string",
122                &self.connection_string.as_ref().map(|_| "<REDACTED>"),
123            )
124            .field("namespace", &self.namespace)
125            .field("auth_method", &self.auth_method)
126            .field("use_sessions", &self.use_sessions)
127            .field("session_timeout", &self.session_timeout)
128            .finish()
129    }
130}
131
132/// AWS SQS configuration
133#[derive(Clone, Serialize, Deserialize)]
134pub struct AwsSqsConfig {
135    pub region: String,
136    pub access_key_id: Option<String>,
137    /// AWS secret access key. Not serialized to prevent accidental exposure in
138    /// config files or logs; must be supplied at runtime (e.g. from an
139    /// environment variable or secrets manager).
140    #[serde(skip)]
141    pub secret_access_key: Option<String>,
142    pub use_fifo_queues: bool,
143}
144
145impl fmt::Debug for AwsSqsConfig {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        f.debug_struct("AwsSqsConfig")
148            .field("region", &self.region)
149            .field("access_key_id", &self.access_key_id)
150            .field(
151                "secret_access_key",
152                &self.secret_access_key.as_ref().map(|_| "<REDACTED>"),
153            )
154            .field("use_fifo_queues", &self.use_fifo_queues)
155            .finish()
156    }
157}
158
159/// In-memory provider configuration
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct InMemoryConfig {
162    pub max_queue_size: usize,
163    pub enable_persistence: bool,
164    pub max_delivery_count: u32,
165    pub default_message_ttl: Option<Duration>,
166    pub enable_dead_letter_queue: bool,
167    pub session_lock_duration: Duration,
168}
169
170impl Default for InMemoryConfig {
171    fn default() -> Self {
172        Self {
173            max_queue_size: 10000,
174            enable_persistence: false,
175            max_delivery_count: 3,
176            default_message_ttl: None,
177            enable_dead_letter_queue: true,
178            session_lock_duration: Duration::minutes(5),
179        }
180    }
181}
182
// Unit tests live in the sibling file `provider_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "provider_tests.rs"]
mod tests;