Skip to main content

tasker_pgmq/
config.rs

1//! # Configuration for tasker-pgmq
2//!
3//! This module provides configuration structures for customizing PGMQ notification
4//! behavior, including queue naming patterns, channel prefixes, and notification settings.
5
6use regex::Regex;
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9
10use crate::error::{PgmqNotifyError, Result};
11
12/// Maximum length for PostgreSQL identifiers (NAMEDATALEN - 1)
13const MAX_PG_IDENTIFIER_LENGTH: usize = 63;
14
15/// Configuration for PGMQ notification behavior
16///
17/// This struct controls how PGMQ notifications are generated, formatted, and delivered.
18/// It allows customization of queue naming patterns, notification channels, and
19/// automatic listening behavior.
20///
21/// # Examples
22///
23/// ```rust
24/// use tasker_pgmq::PgmqNotifyConfig;
25/// use std::collections::HashSet;
26///
27/// // Basic configuration with defaults
28/// let config = PgmqNotifyConfig::new();
29/// assert_eq!(config.queue_naming_pattern, r"(?P<namespace>\w+)_queue");
30/// assert!(!config.enable_triggers); // Triggers disabled by default
31///
32/// // Custom configuration with namespace listening
33/// let mut namespaces = HashSet::new();
34/// namespaces.insert("orders".to_string());
35/// namespaces.insert("inventory".to_string());
36///
37/// let config = PgmqNotifyConfig {
38///     queue_naming_pattern: r"(?P<namespace>\w+)_messages".to_string(),
39///     channels_prefix: Some("prod".to_string()),
40///     enable_triggers: true,
41///     default_namespaces: namespaces,
42///     max_payload_size: 4000,
43///     include_metadata: false,
44/// };
45///
46/// assert_eq!(config.channels_prefix, Some("prod".to_string()));
47/// assert!(config.default_namespaces.contains("orders"));
48/// ```
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct PgmqNotifyConfig {
51    /// Pattern for extracting namespace from queue names
52    /// Should contain a named capture group "namespace"
53    /// Default: `r"(?P<namespace>\w+)_queue"` matches "`orders_queue`" -> "orders"
54    pub queue_naming_pattern: String,
55
56    /// Optional prefix for all notification channels to avoid conflicts
57    /// Example: "app1" results in channels like "`app1.pgmq_queue_created`"
58    pub channels_prefix: Option<String>,
59
60    /// Whether to enable database triggers for automatic notifications
61    /// If false, relies on application-level emitters
62    pub enable_triggers: bool,
63
64    /// Default namespaces to auto-listen for `message_ready` events
65    pub default_namespaces: HashSet<String>,
66
67    /// Maximum payload size in bytes (`pg_notify` limit is 8000)
68    pub max_payload_size: usize,
69
70    /// Whether to include queue metadata in notifications
71    pub include_metadata: bool,
72}
73
74impl Default for PgmqNotifyConfig {
75    fn default() -> Self {
76        Self {
77            queue_naming_pattern: r"(?P<namespace>\w+)_queue".to_string(),
78            channels_prefix: None,
79            enable_triggers: false,
80            default_namespaces: HashSet::new(),
81            max_payload_size: 7800, // Leave buffer under 8KB limit
82            include_metadata: true,
83        }
84    }
85}
86
87impl PgmqNotifyConfig {
88    /// Create a new configuration with defaults
89    #[must_use]
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Set the queue naming pattern for namespace extraction
95    pub fn with_queue_naming_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
96        self.queue_naming_pattern = pattern.into();
97        self
98    }
99
100    /// Set the channels prefix to avoid conflicts
101    pub fn with_channels_prefix<S: Into<String>>(mut self, prefix: S) -> Self {
102        self.channels_prefix = Some(prefix.into());
103        self
104    }
105
106    /// Enable or disable database triggers
107    #[must_use]
108    pub fn with_triggers_enabled(mut self, enabled: bool) -> Self {
109        self.enable_triggers = enabled;
110        self
111    }
112
113    /// Add a default namespace to auto-listen
114    pub fn with_default_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
115        self.default_namespaces.insert(namespace.into());
116        self
117    }
118
119    /// Add multiple default namespaces
120    pub fn with_default_namespaces<I, S>(mut self, namespaces: I) -> Self
121    where
122        I: IntoIterator<Item = S>,
123        S: Into<String>,
124    {
125        for namespace in namespaces {
126            self.default_namespaces.insert(namespace.into());
127        }
128        self
129    }
130
131    /// Set maximum payload size
132    #[must_use]
133    pub fn with_max_payload_size(mut self, size: usize) -> Self {
134        self.max_payload_size = size.min(7800); // Enforce pg_notify limit
135        self
136    }
137
138    /// Enable/disable metadata inclusion
139    #[must_use]
140    pub fn with_metadata_included(mut self, include: bool) -> Self {
141        self.include_metadata = include;
142        self
143    }
144
145    /// Validate the configuration
146    pub fn validate(&self) -> Result<()> {
147        // Test regex compilation
148        self.compiled_pattern()?;
149
150        // Validate payload size
151        if self.max_payload_size > 8000 {
152            return Err(PgmqNotifyError::config(
153                "max_payload_size cannot exceed 8000 bytes (pg_notify limit)",
154            ));
155        }
156
157        // Validate channel prefix
158        if let Some(ref prefix) = self.channels_prefix {
159            if prefix.is_empty() || prefix.len() > 20 {
160                return Err(PgmqNotifyError::config(
161                    "channels_prefix must be 1-20 characters",
162                ));
163            }
164        }
165
166        Ok(())
167    }
168
169    /// Compile the queue naming pattern regex
170    pub fn compiled_pattern(&self) -> Result<Regex> {
171        Regex::new(&self.queue_naming_pattern)
172            .map_err(|_| PgmqNotifyError::invalid_pattern(&self.queue_naming_pattern))
173    }
174
175    /// Extract namespace from queue name using the configured pattern
176    pub fn extract_namespace(&self, queue_name: &str) -> Result<String> {
177        let regex = self.compiled_pattern()?;
178
179        if let Some(captures) = regex.captures(queue_name) {
180            if let Some(namespace_match) = captures.name("namespace") {
181                return Ok(namespace_match.as_str().to_string());
182            }
183        }
184
185        Err(PgmqNotifyError::Configuration {
186            message: format!("Invalid namespace: {queue_name}"),
187        })
188    }
189
190    /// Build channel name with optional prefix, validating the result
191    pub fn build_channel_name(&self, base_channel: &str) -> Result<String> {
192        let name = match &self.channels_prefix {
193            Some(prefix) => format!("{}.{}", prefix, base_channel),
194            None => base_channel.to_string(),
195        };
196        validate_channel_name(&name)?;
197        Ok(name)
198    }
199
200    /// Build namespace-specific channel name, validating the result
201    pub fn build_namespace_channel(&self, base_channel: &str, namespace: &str) -> Result<String> {
202        let channel = format!("{}.{}", base_channel, namespace);
203        self.build_channel_name(&channel)
204    }
205
206    /// Get the queue created channel name
207    pub fn queue_created_channel(&self) -> Result<String> {
208        self.build_channel_name("pgmq_queue_created")
209    }
210
211    /// Get the message ready channel name for a namespace
212    pub fn message_ready_channel(&self, namespace: &str) -> Result<String> {
213        self.build_namespace_channel("pgmq_message_ready", namespace)
214    }
215
216    /// Get the global message ready channel name
217    pub fn global_message_ready_channel(&self) -> Result<String> {
218        self.build_channel_name("pgmq_message_ready")
219    }
220}
221
222/// Validate that a fully-constructed channel name is safe for PostgreSQL NOTIFY/LISTEN.
223///
224/// Channel names are interpolated directly into SQL (NOTIFY/LISTEN don't support
225/// parameterized identifiers), so this validation prevents SQL injection and
226/// malformed identifiers.
227///
228/// Rules:
229/// - Must start with a letter or underscore
230/// - May contain only `[a-zA-Z0-9_.]` (dots used as prefix separator)
231/// - Maximum length: 63 characters (PostgreSQL identifier limit)
232/// - Must not be empty
233pub fn validate_channel_name(channel: &str) -> Result<()> {
234    if channel.is_empty() {
235        return Err(PgmqNotifyError::invalid_channel(
236            "Channel name cannot be empty",
237        ));
238    }
239
240    if channel.len() > MAX_PG_IDENTIFIER_LENGTH {
241        return Err(PgmqNotifyError::invalid_channel(format!(
242            "Channel name '{}' exceeds maximum length of {} characters",
243            channel, MAX_PG_IDENTIFIER_LENGTH
244        )));
245    }
246
247    let valid_start = channel
248        .chars()
249        .next()
250        .is_some_and(|c| c.is_ascii_alphabetic() || c == '_');
251    if !valid_start {
252        return Err(PgmqNotifyError::invalid_channel(format!(
253            "Channel name '{}' must start with a letter or underscore",
254            channel
255        )));
256    }
257
258    if !channel
259        .chars()
260        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.')
261    {
262        return Err(PgmqNotifyError::invalid_channel(format!(
263            "Channel name '{}' contains invalid characters. \
264             Only alphanumeric characters, underscores, and dots are allowed.",
265            channel
266        )));
267    }
268
269    Ok(())
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275
276    #[test]
277    fn test_default_config() {
278        let config = PgmqNotifyConfig::default();
279        assert!(config.validate().is_ok());
280        assert_eq!(config.queue_naming_pattern, r"(?P<namespace>\w+)_queue");
281        assert!(config.channels_prefix.is_none());
282        assert!(!config.enable_triggers);
283    }
284
285    #[test]
286    fn test_namespace_extraction() {
287        let config = PgmqNotifyConfig::default();
288
289        assert_eq!(config.extract_namespace("orders_queue").unwrap(), "orders");
290        assert_eq!(
291            config.extract_namespace("inventory_queue").unwrap(),
292            "inventory"
293        );
294        assert!(
295            config.extract_namespace("no_match").is_err(),
296            "Should return error for non-matching pattern"
297        );
298    }
299
300    #[test]
301    fn test_channel_naming() {
302        let config = PgmqNotifyConfig::new().with_channels_prefix("app1");
303
304        assert_eq!(
305            config.queue_created_channel().unwrap(),
306            "app1.pgmq_queue_created"
307        );
308        assert_eq!(
309            config.message_ready_channel("orders").unwrap(),
310            "app1.pgmq_message_ready.orders"
311        );
312    }
313
314    #[test]
315    fn test_validation() {
316        // Invalid regex
317        let config = PgmqNotifyConfig::new().with_queue_naming_pattern("[invalid");
318        assert!(config.validate().is_err());
319
320        // Payload size gets capped, so validation passes
321        let config = PgmqNotifyConfig::new().with_max_payload_size(10000);
322        assert!(config.validate().is_ok());
323        assert_eq!(config.max_payload_size, 7800); // Should be capped
324
325        // Valid config
326        let config = PgmqNotifyConfig::new()
327            .with_channels_prefix("test")
328            .with_default_namespace("orders");
329        assert!(config.validate().is_ok());
330    }
331
332    // =========================================================================
333    // TAS-226: Channel Name Validation Tests
334    // =========================================================================
335
336    #[test]
337    fn test_validate_channel_name_valid() {
338        assert!(validate_channel_name("pgmq_queue_created").is_ok());
339        assert!(validate_channel_name("app1.pgmq_message_ready").is_ok());
340        assert!(validate_channel_name("app1.pgmq_message_ready.orders").is_ok());
341        assert!(validate_channel_name("_private_channel").is_ok());
342        assert!(validate_channel_name("a").is_ok());
343    }
344
345    #[test]
346    fn test_validate_channel_name_invalid_chars() {
347        assert!(validate_channel_name("channel;DROP TABLE").is_err());
348        assert!(validate_channel_name("channel name").is_err());
349        assert!(validate_channel_name("channel-name").is_err());
350        assert!(validate_channel_name("channel@name").is_err());
351        assert!(validate_channel_name("channel'name").is_err());
352    }
353
354    #[test]
355    fn test_validate_channel_name_empty() {
356        assert!(validate_channel_name("").is_err());
357    }
358
359    #[test]
360    fn test_validate_channel_name_too_long() {
361        let long_name = "a".repeat(MAX_PG_IDENTIFIER_LENGTH + 1);
362        assert!(validate_channel_name(&long_name).is_err());
363
364        // Exactly at limit should pass
365        let exact_name = "a".repeat(MAX_PG_IDENTIFIER_LENGTH);
366        assert!(validate_channel_name(&exact_name).is_ok());
367    }
368
369    #[test]
370    fn test_validate_channel_name_must_start_with_letter_or_underscore() {
371        assert!(validate_channel_name("1channel").is_err());
372        assert!(validate_channel_name(".channel").is_err());
373        assert!(validate_channel_name("_channel").is_ok());
374        assert!(validate_channel_name("channel").is_ok());
375    }
376
377    #[test]
378    fn test_build_channel_name_validates() {
379        let config = PgmqNotifyConfig::default();
380        // Valid base channel
381        assert!(config.build_channel_name("pgmq_queue_created").is_ok());
382
383        // With prefix
384        let config = PgmqNotifyConfig::new().with_channels_prefix("app1");
385        assert!(config.build_channel_name("pgmq_queue_created").is_ok());
386    }
387
388    #[test]
389    fn test_build_namespace_channel_validates() {
390        let config = PgmqNotifyConfig::default();
391        assert!(config
392            .build_namespace_channel("pgmq_message_ready", "orders")
393            .is_ok());
394
395        // Invalid namespace with special chars should fail
396        assert!(config
397            .build_namespace_channel("pgmq_message_ready", "bad;name")
398            .is_err());
399    }
400}