1use regex::Regex;
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9
10use crate::error::{PgmqNotifyError, Result};
11
12const MAX_PG_IDENTIFIER_LENGTH: usize = 63;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct PgmqNotifyConfig {
51 pub queue_naming_pattern: String,
55
56 pub channels_prefix: Option<String>,
59
60 pub enable_triggers: bool,
63
64 pub default_namespaces: HashSet<String>,
66
67 pub max_payload_size: usize,
69
70 pub include_metadata: bool,
72}
73
74impl Default for PgmqNotifyConfig {
75 fn default() -> Self {
76 Self {
77 queue_naming_pattern: r"(?P<namespace>\w+)_queue".to_string(),
78 channels_prefix: None,
79 enable_triggers: false,
80 default_namespaces: HashSet::new(),
81 max_payload_size: 7800, include_metadata: true,
83 }
84 }
85}
86
87impl PgmqNotifyConfig {
88 #[must_use]
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn with_queue_naming_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
96 self.queue_naming_pattern = pattern.into();
97 self
98 }
99
100 pub fn with_channels_prefix<S: Into<String>>(mut self, prefix: S) -> Self {
102 self.channels_prefix = Some(prefix.into());
103 self
104 }
105
106 #[must_use]
108 pub fn with_triggers_enabled(mut self, enabled: bool) -> Self {
109 self.enable_triggers = enabled;
110 self
111 }
112
113 pub fn with_default_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
115 self.default_namespaces.insert(namespace.into());
116 self
117 }
118
119 pub fn with_default_namespaces<I, S>(mut self, namespaces: I) -> Self
121 where
122 I: IntoIterator<Item = S>,
123 S: Into<String>,
124 {
125 for namespace in namespaces {
126 self.default_namespaces.insert(namespace.into());
127 }
128 self
129 }
130
131 #[must_use]
133 pub fn with_max_payload_size(mut self, size: usize) -> Self {
134 self.max_payload_size = size.min(7800); self
136 }
137
138 #[must_use]
140 pub fn with_metadata_included(mut self, include: bool) -> Self {
141 self.include_metadata = include;
142 self
143 }
144
145 pub fn validate(&self) -> Result<()> {
147 self.compiled_pattern()?;
149
150 if self.max_payload_size > 8000 {
152 return Err(PgmqNotifyError::config(
153 "max_payload_size cannot exceed 8000 bytes (pg_notify limit)",
154 ));
155 }
156
157 if let Some(ref prefix) = self.channels_prefix {
159 if prefix.is_empty() || prefix.len() > 20 {
160 return Err(PgmqNotifyError::config(
161 "channels_prefix must be 1-20 characters",
162 ));
163 }
164 }
165
166 Ok(())
167 }
168
169 pub fn compiled_pattern(&self) -> Result<Regex> {
171 Regex::new(&self.queue_naming_pattern)
172 .map_err(|_| PgmqNotifyError::invalid_pattern(&self.queue_naming_pattern))
173 }
174
175 pub fn extract_namespace(&self, queue_name: &str) -> Result<String> {
177 let regex = self.compiled_pattern()?;
178
179 if let Some(captures) = regex.captures(queue_name) {
180 if let Some(namespace_match) = captures.name("namespace") {
181 return Ok(namespace_match.as_str().to_string());
182 }
183 }
184
185 Err(PgmqNotifyError::Configuration {
186 message: format!("Invalid namespace: {queue_name}"),
187 })
188 }
189
190 pub fn build_channel_name(&self, base_channel: &str) -> Result<String> {
192 let name = match &self.channels_prefix {
193 Some(prefix) => format!("{}.{}", prefix, base_channel),
194 None => base_channel.to_string(),
195 };
196 validate_channel_name(&name)?;
197 Ok(name)
198 }
199
200 pub fn build_namespace_channel(&self, base_channel: &str, namespace: &str) -> Result<String> {
202 let channel = format!("{}.{}", base_channel, namespace);
203 self.build_channel_name(&channel)
204 }
205
206 pub fn queue_created_channel(&self) -> Result<String> {
208 self.build_channel_name("pgmq_queue_created")
209 }
210
211 pub fn message_ready_channel(&self, namespace: &str) -> Result<String> {
213 self.build_namespace_channel("pgmq_message_ready", namespace)
214 }
215
216 pub fn global_message_ready_channel(&self) -> Result<String> {
218 self.build_channel_name("pgmq_message_ready")
219 }
220}
221
222pub fn validate_channel_name(channel: &str) -> Result<()> {
234 if channel.is_empty() {
235 return Err(PgmqNotifyError::invalid_channel(
236 "Channel name cannot be empty",
237 ));
238 }
239
240 if channel.len() > MAX_PG_IDENTIFIER_LENGTH {
241 return Err(PgmqNotifyError::invalid_channel(format!(
242 "Channel name '{}' exceeds maximum length of {} characters",
243 channel, MAX_PG_IDENTIFIER_LENGTH
244 )));
245 }
246
247 let valid_start = channel
248 .chars()
249 .next()
250 .is_some_and(|c| c.is_ascii_alphabetic() || c == '_');
251 if !valid_start {
252 return Err(PgmqNotifyError::invalid_channel(format!(
253 "Channel name '{}' must start with a letter or underscore",
254 channel
255 )));
256 }
257
258 if !channel
259 .chars()
260 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.')
261 {
262 return Err(PgmqNotifyError::invalid_channel(format!(
263 "Channel name '{}' contains invalid characters. \
264 Only alphanumeric characters, underscores, and dots are allowed.",
265 channel
266 )));
267 }
268
269 Ok(())
270}
271
272#[cfg(test)]
273mod tests {
274 use super::*;
275
276 #[test]
277 fn test_default_config() {
278 let config = PgmqNotifyConfig::default();
279 assert!(config.validate().is_ok());
280 assert_eq!(config.queue_naming_pattern, r"(?P<namespace>\w+)_queue");
281 assert!(config.channels_prefix.is_none());
282 assert!(!config.enable_triggers);
283 }
284
285 #[test]
286 fn test_namespace_extraction() {
287 let config = PgmqNotifyConfig::default();
288
289 assert_eq!(config.extract_namespace("orders_queue").unwrap(), "orders");
290 assert_eq!(
291 config.extract_namespace("inventory_queue").unwrap(),
292 "inventory"
293 );
294 assert!(
295 config.extract_namespace("no_match").is_err(),
296 "Should return error for non-matching pattern"
297 );
298 }
299
300 #[test]
301 fn test_channel_naming() {
302 let config = PgmqNotifyConfig::new().with_channels_prefix("app1");
303
304 assert_eq!(
305 config.queue_created_channel().unwrap(),
306 "app1.pgmq_queue_created"
307 );
308 assert_eq!(
309 config.message_ready_channel("orders").unwrap(),
310 "app1.pgmq_message_ready.orders"
311 );
312 }
313
314 #[test]
315 fn test_validation() {
316 let config = PgmqNotifyConfig::new().with_queue_naming_pattern("[invalid");
318 assert!(config.validate().is_err());
319
320 let config = PgmqNotifyConfig::new().with_max_payload_size(10000);
322 assert!(config.validate().is_ok());
323 assert_eq!(config.max_payload_size, 7800); let config = PgmqNotifyConfig::new()
327 .with_channels_prefix("test")
328 .with_default_namespace("orders");
329 assert!(config.validate().is_ok());
330 }
331
332 #[test]
337 fn test_validate_channel_name_valid() {
338 assert!(validate_channel_name("pgmq_queue_created").is_ok());
339 assert!(validate_channel_name("app1.pgmq_message_ready").is_ok());
340 assert!(validate_channel_name("app1.pgmq_message_ready.orders").is_ok());
341 assert!(validate_channel_name("_private_channel").is_ok());
342 assert!(validate_channel_name("a").is_ok());
343 }
344
345 #[test]
346 fn test_validate_channel_name_invalid_chars() {
347 assert!(validate_channel_name("channel;DROP TABLE").is_err());
348 assert!(validate_channel_name("channel name").is_err());
349 assert!(validate_channel_name("channel-name").is_err());
350 assert!(validate_channel_name("channel@name").is_err());
351 assert!(validate_channel_name("channel'name").is_err());
352 }
353
354 #[test]
355 fn test_validate_channel_name_empty() {
356 assert!(validate_channel_name("").is_err());
357 }
358
359 #[test]
360 fn test_validate_channel_name_too_long() {
361 let long_name = "a".repeat(MAX_PG_IDENTIFIER_LENGTH + 1);
362 assert!(validate_channel_name(&long_name).is_err());
363
364 let exact_name = "a".repeat(MAX_PG_IDENTIFIER_LENGTH);
366 assert!(validate_channel_name(&exact_name).is_ok());
367 }
368
369 #[test]
370 fn test_validate_channel_name_must_start_with_letter_or_underscore() {
371 assert!(validate_channel_name("1channel").is_err());
372 assert!(validate_channel_name(".channel").is_err());
373 assert!(validate_channel_name("_channel").is_ok());
374 assert!(validate_channel_name("channel").is_ok());
375 }
376
377 #[test]
378 fn test_build_channel_name_validates() {
379 let config = PgmqNotifyConfig::default();
380 assert!(config.build_channel_name("pgmq_queue_created").is_ok());
382
383 let config = PgmqNotifyConfig::new().with_channels_prefix("app1");
385 assert!(config.build_channel_name("pgmq_queue_created").is_ok());
386 }
387
388 #[test]
389 fn test_build_namespace_channel_validates() {
390 let config = PgmqNotifyConfig::default();
391 assert!(config
392 .build_namespace_channel("pgmq_message_ready", "orders")
393 .is_ok());
394
395 assert!(config
397 .build_namespace_channel("pgmq_message_ready", "bad;name")
398 .is_err());
399 }
400}