Skip to main content

camel_component_jms/
config.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6
7use camel_component_api::NetworkRetryPolicy;
8
9use crate::BrokerType;
10
11pub fn default_bridge_cache_dir() -> PathBuf {
12    camel_bridge::download::default_cache_dir()
13}
14
15fn default_max_bridges() -> usize {
16    8
17}
18
19fn default_bridge_start_timeout_ms() -> u64 {
20    30_000
21}
22
23pub fn default_broker_reconnect_interval_ms() -> u64 {
24    5_000
25}
26
27/// Per-component reconnect default: unlimited retries (max_attempts=0),
28/// preserving the previous infinite-reconnect behavior via BackoffState.
29/// Operators can opt into bounded retry via TOML `[reconnect]`.
30pub(crate) fn jms_reconnect_default() -> NetworkRetryPolicy {
31    NetworkRetryPolicy {
32        max_attempts: 0, // unlimited
33        initial_delay: Duration::from_millis(default_broker_reconnect_interval_ms()),
34        multiplier: 2.0,
35        max_delay: Duration::from_secs(30),
36        jitter_factor: 0.0,
37        ..NetworkRetryPolicy::default()
38    }
39}
40
41fn default_health_check_interval_ms() -> u64 {
42    5_000
43}
44
45#[derive(Debug, Clone, PartialEq)]
46pub enum DestinationType {
47    Queue,
48    Topic,
49}
50
51// ── JMS-009: Acknowledgement mode ────────────────────────────────────────────
52
53/// JMS session acknowledgement mode.
54///
55/// Controls how the JMS provider acknowledges consumed messages.
56///
57/// - `Auto`: The session automatically acknowledges a message after it is
58///   delivered to the consumer. This is the simplest mode but may lose
59///   messages if the consumer fails before processing.
60/// - `Client`: The consumer must explicitly acknowledge each message.
61///   Provides full control over when acknowledgement occurs.
62/// - `DupsOk`: The session lazily acknowledges messages, which may result
63///   in duplicate deliveries. Optimises throughput at the cost of
64///   potential duplicates.
65/// - `Transacted`: Messages are acknowledged as part of a transaction.
66///   See [`JmsTransactionMode`] for transaction configuration.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
68pub enum AcknowledgementMode {
69    #[default]
70    Auto,
71    Client,
72    DupsOk,
73    Transacted,
74}
75
76impl fmt::Display for AcknowledgementMode {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            Self::Auto => write!(f, "Auto"),
80            Self::Client => write!(f, "Client"),
81            Self::DupsOk => write!(f, "DupsOk"),
82            Self::Transacted => write!(f, "Transacted"),
83        }
84    }
85}
86
87impl FromStr for AcknowledgementMode {
88    type Err = camel_component_api::CamelError;
89
90    fn from_str(s: &str) -> Result<Self, Self::Err> {
91        match s {
92            "Auto" | "auto" => Ok(Self::Auto),
93            "Client" | "client" => Ok(Self::Client),
94            "DupsOk" | "dupsOk" | "dups_ok" => Ok(Self::DupsOk),
95            "Transacted" | "transacted" => Ok(Self::Transacted),
96            _ => Err(camel_component_api::CamelError::ProcessorError(format!(
97                "invalid acknowledgement mode '{}': expected Auto, Client, DupsOk, or Transacted",
98                s
99            ))),
100        }
101    }
102}
103
104// ── JMS-012: Transaction mode ────────────────────────────────────────────────
105
106/// JMS transaction mode for sessions.
107///
108/// - `None`: No transaction boundaries are applied. Each operation is
109///   auto-acknowledged (subject to the acknowledgement mode).
110/// - `Session`: All send/receive operations within a session are batched
111///   into a single transaction that must be explicitly committed or rolled
112///   back. **Not yet implemented** — using this value emits a warning and
113///   falls back to `None`.
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
115pub enum JmsTransactionMode {
116    #[default]
117    None,
118    Session,
119}
120
121impl fmt::Display for JmsTransactionMode {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        match self {
124            Self::None => write!(f, "None"),
125            Self::Session => write!(f, "Session"),
126        }
127    }
128}
129
130impl FromStr for JmsTransactionMode {
131    type Err = camel_component_api::CamelError;
132
133    fn from_str(s: &str) -> Result<Self, Self::Err> {
134        match s {
135            "None" | "none" => Ok(Self::None),
136            "Session" | "session" => Ok(Self::Session),
137            _ => Err(camel_component_api::CamelError::ProcessorError(format!(
138                "invalid transaction mode '{}': expected None or Session",
139                s
140            ))),
141        }
142    }
143}
144
145// ── JMS-005: Exchange pattern ────────────────────────────────────────────────
146
147/// Message exchange pattern for JMS endpoints.
148///
149/// - `InOnly`: Fire-and-forget (default). The producer sends a message and
150///   does not wait for a reply.
151/// - `InOut`: Request-reply. The producer sends a message and waits for a
152///   correlated reply on a temporary or dedicated reply destination.
153///   **Not yet implemented** — using this value emits a warning.
154#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
155pub enum ExchangePattern {
156    #[default]
157    InOnly,
158    InOut,
159}
160
161impl fmt::Display for ExchangePattern {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        match self {
164            Self::InOnly => write!(f, "InOnly"),
165            Self::InOut => write!(f, "InOut"),
166        }
167    }
168}
169
170impl FromStr for ExchangePattern {
171    type Err = camel_component_api::CamelError;
172
173    fn from_str(s: &str) -> Result<Self, Self::Err> {
174        match s {
175            "InOnly" | "inOnly" | "in_only" => Ok(Self::InOnly),
176            "InOut" | "inOut" | "in_out" => Ok(Self::InOut),
177            _ => Err(camel_component_api::CamelError::ProcessorError(format!(
178                "invalid exchange pattern '{}': expected InOnly or InOut",
179                s
180            ))),
181        }
182    }
183}
184
185// ── Endpoint config ──────────────────────────────────────────────────────────
186
187fn default_concurrent_consumers() -> u32 {
188    1
189}
190
191#[derive(Debug, Clone)]
192pub struct JmsEndpointConfig {
193    pub destination_type: DestinationType,
194    pub destination_name: String,
195    pub broker_name: Option<String>,
196
197    // JMS-009: Acknowledgement mode for the JMS session (default: Auto).
198    pub acknowledgement_mode: AcknowledgementMode,
199
200    // JMS-010: JMS SQL-92 selector expression used to filter incoming messages.
201    // Only messages matching the selector are delivered to the consumer.
202    pub message_selector: Option<String>,
203
204    // JMS-011: Number of concurrent consumer tasks to spawn for this endpoint.
205    // Each consumer polls messages independently. Default: 1.
206    pub concurrent_consumers: u32,
207
208    // JMS-012: Transaction mode for the JMS session (default: None).
209    pub transaction_mode: JmsTransactionMode,
210
211    // JMS-013: QoS options for outbound messages.
212    /// Message time-to-live in milliseconds. `None` means no expiration.
213    pub time_to_live: Option<u64>,
214    /// JMS message priority (0-9, where 9 is highest). `None` uses broker default.
215    pub priority: Option<u8>,
216    /// Whether to use persistent delivery mode (default: true). Persistent
217    /// messages survive broker restarts; non-persistent messages may be lost.
218    pub persistent_delivery: bool,
219
220    // JMS-018: When `true` (default), JMS message properties are mapped to
221    // Camel exchange headers on receive and Camel exchange headers are mapped
222    // to JMS message properties on send.
223    pub map_jms_headers: bool,
224
225    // JMS-005: Message exchange pattern (default: InOnly).
226    pub exchange_pattern: ExchangePattern,
227}
228
229impl JmsEndpointConfig {
230    pub fn from_uri(uri: &str) -> Result<Self, camel_component_api::CamelError> {
231        let (scheme, rest) = if let Some(r) = uri.strip_prefix("jms:") {
232            ("jms", r)
233        } else if let Some(r) = uri.strip_prefix("activemq:") {
234            ("activemq", r)
235        } else if let Some(r) = uri.strip_prefix("artemis:") {
236            ("artemis", r)
237        } else {
238            return Err(camel_component_api::CamelError::ProcessorError(
239                "expected scheme 'jms', 'activemq', or 'artemis'".to_string(),
240            ));
241        };
242
243        let (path, query) = match rest.split_once('?') {
244            Some((p, q)) => (p, Some(q)),
245            None => (rest, None),
246        };
247
248        let (destination_type, destination_name) =
249            match path.splitn(2, ':').collect::<Vec<_>>().as_slice() {
250                // Shorthand (no prefix): only allowed for activemq/artemis, NOT jms
251                [name] if !name.is_empty() && scheme != "jms" => {
252                    (DestinationType::Queue, name.to_string())
253                }
254                // Explicit queue: or topic:
255                [prefix, name]
256                    if (*prefix == "queue" || *prefix == "topic") && !name.is_empty() =>
257                {
258                    let dt = if *prefix == "queue" {
259                        DestinationType::Queue
260                    } else {
261                        DestinationType::Topic
262                    };
263                    (dt, name.to_string())
264                }
265                // jms: shorthand (rejected)
266                [name] if !name.is_empty() && scheme == "jms" => {
267                    return Err(camel_component_api::CamelError::ProcessorError(format!(
268                        "URI 'jms:{}' is ambiguous — use 'jms:queue:{}' or 'jms:topic:{}'",
269                        name, name, name
270                    )));
271                }
272                _ => {
273                    return Err(camel_component_api::CamelError::ProcessorError(
274                        "destination must be 'queue:<name>' or 'topic:<name>'".to_string(),
275                    ));
276                }
277            };
278
279        // Parse query parameters
280        let mut broker_name: Option<String> = None;
281        let mut acknowledgement_mode = AcknowledgementMode::default();
282        let mut message_selector: Option<String> = None;
283        let mut concurrent_consumers = default_concurrent_consumers();
284        let mut transaction_mode = JmsTransactionMode::default();
285        let mut time_to_live: Option<u64> = None;
286        let mut priority: Option<u8> = None;
287        let mut persistent_delivery = true;
288        let mut map_jms_headers = true;
289        let mut exchange_pattern = ExchangePattern::default();
290
291        if let Some(q) = query {
292            for kv in q.split('&') {
293                let Some((k, v)) = kv.split_once('=') else {
294                    continue;
295                };
296                match k {
297                    "broker" if !v.is_empty() => {
298                        broker_name = Some(v.to_string());
299                    }
300                    "acknowledgementMode" | "acknowledgement_mode" => {
301                        acknowledgement_mode = AcknowledgementMode::from_str(v)?;
302                    }
303                    "messageSelector" | "message_selector" if !v.is_empty() => {
304                        message_selector = Some(v.to_string());
305                    }
306                    "concurrentConsumers" | "concurrent_consumers" => {
307                        concurrent_consumers = v.parse::<u32>().map_err(|_| {
308                            camel_component_api::CamelError::ProcessorError(format!(
309                                "invalid concurrent_consumers '{}': expected positive integer",
310                                v
311                            ))
312                        })?;
313                        if concurrent_consumers == 0 {
314                            return Err(camel_component_api::CamelError::ProcessorError(
315                                "concurrent_consumers must be >= 1".to_string(),
316                            ));
317                        }
318                    }
319                    "transactionMode" | "transaction_mode" => {
320                        transaction_mode = JmsTransactionMode::from_str(v)?;
321                    }
322                    "timeToLive" | "time_to_live" => {
323                        time_to_live = Some(v.parse::<u64>().map_err(|_| {
324                            camel_component_api::CamelError::ProcessorError(format!(
325                                "invalid time_to_live '{}': expected non-negative integer (ms)",
326                                v
327                            ))
328                        })?);
329                    }
330                    "priority" => {
331                        let p = v.parse::<u8>().map_err(|_| {
332                            camel_component_api::CamelError::ProcessorError(format!(
333                                "invalid priority '{}': expected integer 0-9",
334                                v
335                            ))
336                        })?;
337                        if p > 9 {
338                            return Err(camel_component_api::CamelError::ProcessorError(format!(
339                                "invalid priority '{}': must be 0-9",
340                                p
341                            )));
342                        }
343                        priority = Some(p);
344                    }
345                    "persistentDelivery" | "persistent_delivery" => {
346                        persistent_delivery = v.parse::<bool>().map_err(|_| {
347                            camel_component_api::CamelError::ProcessorError(format!(
348                                "invalid persistent_delivery '{}': expected true or false",
349                                v
350                            ))
351                        })?;
352                    }
353                    "mapJmsHeaders" | "map_jms_headers" => {
354                        map_jms_headers = v.parse::<bool>().map_err(|_| {
355                            camel_component_api::CamelError::ProcessorError(format!(
356                                "invalid map_jms_headers '{}': expected true or false",
357                                v
358                            ))
359                        })?;
360                    }
361                    "exchangePattern" | "exchange_pattern" => {
362                        exchange_pattern = ExchangePattern::from_str(v)?;
363                    }
364                    _ => {} // ignore unknown params
365                }
366            }
367        }
368
369        Ok(JmsEndpointConfig {
370            destination_type,
371            destination_name,
372            broker_name,
373            acknowledgement_mode,
374            message_selector,
375            concurrent_consumers,
376            transaction_mode,
377            time_to_live,
378            priority,
379            persistent_delivery,
380            map_jms_headers,
381            exchange_pattern,
382        })
383    }
384}
385
386#[derive(Clone, PartialEq, serde::Deserialize)]
387pub struct BrokerConfig {
388    pub broker_url: String,
389    pub broker_type: BrokerType,
390    pub username: Option<String>,
391    pub password: Option<String>,
392}
393
394impl std::fmt::Debug for BrokerConfig {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        f.debug_struct("BrokerConfig")
397            .field("broker_url", &self.broker_url)
398            .field("broker_type", &self.broker_type)
399            .field("username", &self.username)
400            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
401            .finish()
402    }
403}
404
405#[derive(Debug, Clone, serde::Deserialize)]
406pub struct JmsPoolConfig {
407    #[serde(default)]
408    pub brokers: HashMap<String, BrokerConfig>,
409    #[serde(default = "default_max_bridges")]
410    pub max_bridges: usize,
411    #[serde(default = "default_bridge_start_timeout_ms")]
412    pub bridge_start_timeout_ms: u64,
413    #[serde(default = "default_broker_reconnect_interval_ms")]
414    pub broker_reconnect_interval_ms: u64,
415    #[serde(default = "default_health_check_interval_ms")]
416    pub health_check_interval_ms: u64,
417    #[serde(default = "default_bridge_cache_dir")]
418    pub bridge_cache_dir: PathBuf,
419    #[serde(default = "jms_reconnect_default")]
420    pub reconnect: NetworkRetryPolicy,
421}
422
423impl Default for JmsPoolConfig {
424    fn default() -> Self {
425        Self {
426            brokers: HashMap::new(),
427            max_bridges: default_max_bridges(),
428            bridge_start_timeout_ms: default_bridge_start_timeout_ms(),
429            broker_reconnect_interval_ms: default_broker_reconnect_interval_ms(),
430            health_check_interval_ms: default_health_check_interval_ms(),
431            bridge_cache_dir: default_bridge_cache_dir(),
432            reconnect: jms_reconnect_default(),
433        }
434    }
435}
436
437impl JmsPoolConfig {
438    /// Convenience constructor for single-broker scenarios (tests, simple examples).
439    /// Creates a pool with one broker named "default".
440    pub fn single_broker(broker_url: impl Into<String>, broker_type: BrokerType) -> Self {
441        let url = broker_url.into();
442        let mut brokers = HashMap::new();
443        brokers.insert(
444            "default".to_string(),
445            BrokerConfig {
446                broker_url: url,
447                broker_type,
448                username: None,
449                password: None,
450            },
451        );
452        Self {
453            brokers,
454            max_bridges: 1,
455            ..Self::default()
456        }
457    }
458
459    /// Validates the config: all brokers must have non-empty URLs with a known
460    /// scheme, `max_bridges` must be >= 1, and all timing fields must be strictly
461    /// positive to prevent busy-loops.
462    pub fn validate(&self) -> Result<(), camel_component_api::CamelError> {
463        use camel_component_api::CamelError;
464
465        if self.max_bridges < 1 {
466            return Err(CamelError::Config("max_bridges must be >= 1".to_string()));
467        }
468
469        let known_schemes = ["tcp://", "ssl://", "failover://", "ws://", "wss://"];
470
471        for (name, bc) in &self.brokers {
472            if bc.broker_url.is_empty() {
473                return Err(CamelError::ProcessorError(format!(
474                    "broker '{}' has an empty broker_url",
475                    name
476                )));
477            }
478
479            let has_known_scheme = known_schemes.iter().any(|s| bc.broker_url.starts_with(s));
480            if !has_known_scheme {
481                return Err(CamelError::ProcessorError(format!(
482                    "broker '{}' has an invalid broker_url '{}': must start with one of {:?}",
483                    name, bc.broker_url, known_schemes
484                )));
485            }
486        }
487
488        if self.bridge_start_timeout_ms == 0 {
489            return Err(CamelError::Config(
490                "bridge_start_timeout_ms must be > 0".to_string(),
491            ));
492        }
493        if self.health_check_interval_ms == 0 {
494            return Err(CamelError::Config(
495                "health_check_interval_ms must be > 0".to_string(),
496            ));
497        }
498        if self.broker_reconnect_interval_ms == 0 {
499            return Err(CamelError::Config(
500                "broker_reconnect_interval_ms must be > 0".to_string(),
501            ));
502        }
503
504        Ok(())
505    }
506}
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511    use crate::BrokerType;
512
513    #[test]
514    fn parse_jms_queue_explicit() {
515        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders").unwrap();
516        assert_eq!(cfg.destination_type, DestinationType::Queue);
517        assert_eq!(cfg.destination_name, "orders");
518        assert_eq!(cfg.broker_name, None);
519    }
520
521    #[test]
522    fn parse_jms_topic_explicit() {
523        let cfg = JmsEndpointConfig::from_uri("jms:topic:events").unwrap();
524        assert_eq!(cfg.destination_type, DestinationType::Topic);
525        assert_eq!(cfg.destination_name, "events");
526    }
527
528    #[test]
529    fn parse_jms_with_broker_param() {
530        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?broker=primary").unwrap();
531        assert_eq!(cfg.broker_name, Some("primary".to_string()));
532        assert_eq!(cfg.destination_name, "orders");
533    }
534
535    #[test]
536    fn jms_shorthand_rejected() {
537        let err = JmsEndpointConfig::from_uri("jms:orders").unwrap_err();
538        assert!(err.to_string().contains("ambiguous"), "got: {}", err);
539    }
540
541    #[test]
542    fn invalid_destination_type_returns_error() {
543        let err = JmsEndpointConfig::from_uri("jms:inbox:orders").unwrap_err();
544        assert!(
545            err.to_string().contains("'queue' or 'topic'")
546                || err.to_string().contains("queue:<name>"),
547            "got: {}",
548            err
549        );
550    }
551
552    #[test]
553    fn parse_activemq_queue_explicit() {
554        let cfg = JmsEndpointConfig::from_uri("activemq:queue:orders").unwrap();
555        assert_eq!(cfg.destination_type, DestinationType::Queue);
556        assert_eq!(cfg.destination_name, "orders");
557    }
558
559    #[test]
560    fn parse_activemq_shorthand() {
561        let cfg = JmsEndpointConfig::from_uri("activemq:orders").unwrap();
562        assert_eq!(cfg.destination_type, DestinationType::Queue);
563        assert_eq!(cfg.destination_name, "orders");
564    }
565
566    #[test]
567    fn parse_artemis_shorthand_defaults_to_queue() {
568        let cfg = JmsEndpointConfig::from_uri("artemis:events").unwrap();
569        assert_eq!(cfg.destination_type, DestinationType::Queue);
570        assert_eq!(cfg.destination_name, "events");
571    }
572
573    #[test]
574    fn parse_jms_with_empty_broker_param_treated_as_none() {
575        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?broker=").unwrap();
576        assert_eq!(cfg.broker_name, None);
577    }
578
579    #[test]
580    fn parse_activemq_topic() {
581        let cfg = JmsEndpointConfig::from_uri("activemq:topic:events").unwrap();
582        assert_eq!(cfg.destination_type, DestinationType::Topic);
583        assert_eq!(cfg.destination_name, "events");
584    }
585
586    #[test]
587    fn single_broker_convenience() {
588        let cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
589        assert_eq!(cfg.max_bridges, 1);
590        assert!(cfg.brokers.contains_key("default"));
591        let bc = &cfg.brokers["default"];
592        assert_eq!(bc.broker_url, "tcp://localhost:61616");
593        assert_eq!(bc.broker_type, BrokerType::ActiveMq);
594    }
595
596    #[test]
597    fn default_pool_config() {
598        let cfg = JmsPoolConfig::default();
599        assert_eq!(cfg.max_bridges, 8);
600        assert!(cfg.brokers.is_empty());
601        assert_eq!(cfg.bridge_start_timeout_ms, 30_000);
602        assert_eq!(cfg.broker_reconnect_interval_ms, 5_000);
603        assert_eq!(cfg.health_check_interval_ms, 5_000);
604    }
605
606    #[test]
607    fn validate_empty_broker_url() {
608        let cfg = JmsPoolConfig::single_broker("", BrokerType::ActiveMq);
609        let err = cfg.validate().unwrap_err();
610        assert!(err.to_string().contains("empty broker_url"), "got: {}", err);
611    }
612
613    #[test]
614    fn broker_config_debug_redacts_password() {
615        let bc = BrokerConfig {
616            broker_url: "tcp://localhost:61616".to_string(),
617            broker_type: BrokerType::ActiveMq,
618            username: Some("admin".to_string()),
619            password: Some("secret".to_string()),
620        };
621        let s = format!("{bc:?}");
622        assert!(s.contains("<redacted>"), "got: {s}");
623        assert!(!s.contains("secret"), "got: {s}");
624    }
625
626    #[test]
627    fn validate_ok() {
628        let cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
629        cfg.validate().unwrap();
630    }
631
632    #[test]
633    fn validate_rejects_zero_bridge_start_timeout() {
634        let mut cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
635        cfg.bridge_start_timeout_ms = 0;
636        let err = cfg.validate().unwrap_err();
637        assert!(
638            err.to_string().contains("bridge_start_timeout_ms"),
639            "got: {}",
640            err
641        );
642    }
643
644    #[test]
645    fn validate_rejects_zero_health_check_interval() {
646        let mut cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
647        cfg.health_check_interval_ms = 0;
648        let err = cfg.validate().unwrap_err();
649        assert!(
650            err.to_string().contains("health_check_interval_ms"),
651            "got: {}",
652            err
653        );
654    }
655
656    #[test]
657    fn validate_rejects_zero_reconnect_interval() {
658        let mut cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
659        cfg.broker_reconnect_interval_ms = 0;
660        let err = cfg.validate().unwrap_err();
661        assert!(
662            err.to_string().contains("broker_reconnect_interval_ms"),
663            "got: {}",
664            err
665        );
666    }
667
668    // ── JMS-006: max_bridges validation ──────────────────────────────────────
669
670    #[test]
671    fn rejects_zero_max_bridges() {
672        let mut cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
673        cfg.max_bridges = 0;
674        let err = cfg.validate().unwrap_err();
675        assert!(err.to_string().contains("max_bridges"), "got: {}", err);
676    }
677
678    #[test]
679    fn rejects_zero_timeout() {
680        let mut cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
681        cfg.bridge_start_timeout_ms = 0;
682        let err = cfg.validate().unwrap_err();
683        assert!(
684            err.to_string().contains("bridge_start_timeout_ms"),
685            "got: {}",
686            err
687        );
688    }
689
690    // ── JMS-016: broker URL scheme validation ────────────────────────────────
691
692    #[test]
693    fn rejects_empty_broker_url() {
694        let cfg = JmsPoolConfig::single_broker("", BrokerType::ActiveMq);
695        let err = cfg.validate().unwrap_err();
696        assert!(err.to_string().contains("empty broker_url"), "got: {}", err);
697    }
698
699    #[test]
700    fn rejects_unknown_broker_url_scheme() {
701        let cfg = JmsPoolConfig::single_broker("amqp://localhost:5672", BrokerType::ActiveMq);
702        let err = cfg.validate().unwrap_err();
703        assert!(
704            err.to_string().contains("invalid broker_url"),
705            "got: {}",
706            err
707        );
708    }
709
710    #[test]
711    fn accepts_known_broker_url_schemes() {
712        for url in &[
713            "tcp://localhost:61616",
714            "ssl://localhost:61617",
715            "failover://tcp://localhost:61616",
716            "ws://localhost:61618",
717            "wss://localhost:61619",
718        ] {
719            let cfg = JmsPoolConfig::single_broker(*url, BrokerType::ActiveMq);
720            assert!(cfg.validate().is_ok(), "scheme should be accepted: {url}");
721        }
722    }
723
724    #[test]
725    fn accepts_valid_config() {
726        let cfg = JmsPoolConfig::single_broker("tcp://localhost:61616", BrokerType::ActiveMq);
727        // ensure max_bridges and timeout have valid defaults
728        assert!(cfg.validate().is_ok());
729    }
730
731    // ── JMS-005/JMS-009/JMS-010/JMS-011/JMS-012/JMS-013/JMS-018: endpoint config ──
732
733    #[test]
734    fn default_endpoint_config_has_sensible_defaults() {
735        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders").unwrap();
736        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Auto);
737        assert_eq!(cfg.message_selector, None);
738        assert_eq!(cfg.concurrent_consumers, 1);
739        assert_eq!(cfg.transaction_mode, JmsTransactionMode::None);
740        assert_eq!(cfg.time_to_live, None);
741        assert_eq!(cfg.priority, None);
742        assert!(cfg.persistent_delivery);
743        assert!(cfg.map_jms_headers);
744        assert_eq!(cfg.exchange_pattern, ExchangePattern::InOnly);
745    }
746
747    #[test]
748    fn parse_acknowledgement_mode_client() {
749        let cfg =
750            JmsEndpointConfig::from_uri("jms:queue:orders?acknowledgementMode=Client").unwrap();
751        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Client);
752    }
753
754    #[test]
755    fn parse_acknowledgement_mode_dups_ok() {
756        let cfg =
757            JmsEndpointConfig::from_uri("jms:queue:orders?acknowledgement_mode=dups_ok").unwrap();
758        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::DupsOk);
759    }
760
761    #[test]
762    fn parse_acknowledgement_mode_invalid() {
763        let err = JmsEndpointConfig::from_uri("jms:queue:orders?acknowledgementMode=invalid")
764            .unwrap_err();
765        assert!(
766            err.to_string().contains("invalid acknowledgement mode"),
767            "got: {}",
768            err
769        );
770    }
771
772    #[test]
773    fn parse_message_selector() {
774        let cfg =
775            JmsEndpointConfig::from_uri("jms:queue:orders?messageSelector=priority%20%3E%205")
776                .unwrap();
777        // URL encoding is NOT decoded by from_uri — the raw value is stored
778        assert_eq!(cfg.message_selector, Some("priority%20%3E%205".to_string()));
779    }
780
781    #[test]
782    fn parse_empty_message_selector_is_none() {
783        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?message_selector=").unwrap();
784        assert_eq!(cfg.message_selector, None);
785    }
786
787    #[test]
788    fn parse_concurrent_consumers() {
789        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?concurrentConsumers=4").unwrap();
790        assert_eq!(cfg.concurrent_consumers, 4);
791    }
792
793    #[test]
794    fn parse_concurrent_consumers_zero_rejected() {
795        let err =
796            JmsEndpointConfig::from_uri("jms:queue:orders?concurrentConsumers=0").unwrap_err();
797        assert!(
798            err.to_string()
799                .contains("concurrent_consumers must be >= 1"),
800            "got: {}",
801            err
802        );
803    }
804
805    #[test]
806    fn parse_concurrent_consumers_invalid() {
807        let err =
808            JmsEndpointConfig::from_uri("jms:queue:orders?concurrentConsumers=abc").unwrap_err();
809        assert!(
810            err.to_string().contains("invalid concurrent_consumers"),
811            "got: {}",
812            err
813        );
814    }
815
816    #[test]
817    fn parse_transaction_mode_session() {
818        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?transactionMode=Session").unwrap();
819        assert_eq!(cfg.transaction_mode, JmsTransactionMode::Session);
820    }
821
822    #[test]
823    fn parse_transaction_mode_invalid() {
824        let err =
825            JmsEndpointConfig::from_uri("jms:queue:orders?transaction_mode=invalid").unwrap_err();
826        assert!(
827            err.to_string().contains("invalid transaction mode"),
828            "got: {}",
829            err
830        );
831    }
832
833    #[test]
834    fn parse_time_to_live() {
835        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?timeToLive=30000").unwrap();
836        assert_eq!(cfg.time_to_live, Some(30_000));
837    }
838
839    #[test]
840    fn parse_priority() {
841        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?priority=5").unwrap();
842        assert_eq!(cfg.priority, Some(5));
843    }
844
845    #[test]
846    fn parse_priority_above_9_rejected() {
847        let err = JmsEndpointConfig::from_uri("jms:queue:orders?priority=10").unwrap_err();
848        assert!(err.to_string().contains("must be 0-9"), "got: {}", err);
849    }
850
851    #[test]
852    fn parse_persistent_delivery_false() {
853        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?persistentDelivery=false").unwrap();
854        assert!(!cfg.persistent_delivery);
855    }
856
857    #[test]
858    fn parse_map_jms_headers_false() {
859        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?mapJmsHeaders=false").unwrap();
860        assert!(!cfg.map_jms_headers);
861    }
862
863    #[test]
864    fn parse_exchange_pattern_inout() {
865        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?exchangePattern=InOut").unwrap();
866        assert_eq!(cfg.exchange_pattern, ExchangePattern::InOut);
867    }
868
869    #[test]
870    fn parse_exchange_pattern_invalid() {
871        let err =
872            JmsEndpointConfig::from_uri("jms:queue:orders?exchangePattern=invalid").unwrap_err();
873        assert!(
874            err.to_string().contains("invalid exchange pattern"),
875            "got: {}",
876            err
877        );
878    }
879
880    #[test]
881    fn parse_multiple_query_params() {
882        let cfg = JmsEndpointConfig::from_uri(
883            "jms:queue:orders?broker=primary&acknowledgementMode=Client&concurrentConsumers=3&persistentDelivery=false&priority=7",
884        )
885        .unwrap();
886        assert_eq!(cfg.broker_name, Some("primary".to_string()));
887        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Client);
888        assert_eq!(cfg.concurrent_consumers, 3);
889        assert!(!cfg.persistent_delivery);
890        assert_eq!(cfg.priority, Some(7));
891    }
892
893    #[test]
894    fn acknowledgement_mode_display_roundtrip() {
895        for mode in &[
896            AcknowledgementMode::Auto,
897            AcknowledgementMode::Client,
898            AcknowledgementMode::DupsOk,
899            AcknowledgementMode::Transacted,
900        ] {
901            let s = mode.to_string();
902            let parsed: AcknowledgementMode = s.parse().unwrap();
903            assert_eq!(*mode, parsed);
904        }
905    }
906
907    #[test]
908    fn transaction_mode_display_roundtrip() {
909        for mode in &[JmsTransactionMode::None, JmsTransactionMode::Session] {
910            let s = mode.to_string();
911            let parsed: JmsTransactionMode = s.parse().unwrap();
912            assert_eq!(*mode, parsed);
913        }
914    }
915
916    #[test]
917    fn exchange_pattern_display_roundtrip() {
918        for mode in &[ExchangePattern::InOnly, ExchangePattern::InOut] {
919            let s = mode.to_string();
920            let parsed: ExchangePattern = s.parse().unwrap();
921            assert_eq!(*mode, parsed);
922        }
923    }
924
925    #[test]
926    fn build_exchange_without_header_mapping() {
927        // Verify map_jms_headers=false prevents header mapping in consumer
928        // (tested indirectly through the consumer module's build_exchange)
929        let cfg = JmsEndpointConfig::from_uri("jms:queue:orders?mapJmsHeaders=false").unwrap();
930        assert!(!cfg.map_jms_headers);
931    }
932
933    #[test]
934    fn jms_pool_config_has_reconnect_policy() {
935        let cfg = JmsPoolConfig::default();
936        assert_eq!(cfg.reconnect.max_attempts, 0); // unlimited
937        assert!(cfg.reconnect.enabled);
938    }
939}