1use std::collections::HashMap;
2use std::fmt;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6
7use camel_component_api::NetworkRetryPolicy;
8
9use crate::BrokerType;
10
11pub fn default_bridge_cache_dir() -> PathBuf {
12 camel_bridge::download::default_cache_dir()
13}
14
/// Serde default for `JmsPoolConfig::max_bridges`.
fn default_max_bridges() -> usize {
    const DEFAULT_MAX_BRIDGES: usize = 8;
    DEFAULT_MAX_BRIDGES
}
18
/// Serde default for `JmsPoolConfig::bridge_start_timeout_ms`, in milliseconds.
fn default_bridge_start_timeout_ms() -> u64 {
    const DEFAULT_BRIDGE_START_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_BRIDGE_START_TIMEOUT_MS
}
22
/// Serde default for `JmsPoolConfig::broker_reconnect_interval_ms`, in milliseconds.
///
/// Also used as the initial delay of the policy built by `jms_reconnect_default`.
pub fn default_broker_reconnect_interval_ms() -> u64 {
    const DEFAULT_BROKER_RECONNECT_INTERVAL_MS: u64 = 5_000;
    DEFAULT_BROKER_RECONNECT_INTERVAL_MS
}
26
27pub(crate) fn jms_reconnect_default() -> NetworkRetryPolicy {
31 NetworkRetryPolicy {
32 max_attempts: 0, initial_delay: Duration::from_millis(default_broker_reconnect_interval_ms()),
34 multiplier: 2.0,
35 max_delay: Duration::from_secs(30),
36 jitter_factor: 0.0,
37 ..NetworkRetryPolicy::default()
38 }
39}
40
/// Serde default for `JmsPoolConfig::health_check_interval_ms`, in milliseconds.
fn default_health_check_interval_ms() -> u64 {
    const DEFAULT_HEALTH_CHECK_INTERVAL_MS: u64 = 5_000;
    DEFAULT_HEALTH_CHECK_INTERVAL_MS
}
44
/// Kind of JMS destination addressed by an endpoint URI.
///
/// Chosen by the `queue:` / `topic:` prefix of the URI path. The `activemq:`
/// and `artemis:` shorthand forms without a prefix resolve to [`DestinationType::Queue`].
///
/// Derives `Copy` and `Eq` like the other field-less enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DestinationType {
    /// Destination written as `queue:<name>`, or a shorthand URI.
    Queue,
    /// Destination written as `topic:<name>`.
    Topic,
}
50
/// Acknowledgement mode of an endpoint, set through the `acknowledgementMode`
/// (or `acknowledgement_mode`) URI parameter.
///
/// NOTE(review): the variant names look like the JMS session acknowledgement
/// modes — confirm the exact mapping against the bridge implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AcknowledgementMode {
    /// Used when the URI does not specify a mode.
    #[default]
    Auto,
    /// Parsed from `Client` / `client`.
    Client,
    /// Parsed from `DupsOk` / `dupsOk` / `dups_ok`.
    DupsOk,
    /// Parsed from `Transacted` / `transacted`.
    Transacted,
}
75
76impl fmt::Display for AcknowledgementMode {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 match self {
79 Self::Auto => write!(f, "Auto"),
80 Self::Client => write!(f, "Client"),
81 Self::DupsOk => write!(f, "DupsOk"),
82 Self::Transacted => write!(f, "Transacted"),
83 }
84 }
85}
86
87impl FromStr for AcknowledgementMode {
88 type Err = camel_component_api::CamelError;
89
90 fn from_str(s: &str) -> Result<Self, Self::Err> {
91 match s {
92 "Auto" | "auto" => Ok(Self::Auto),
93 "Client" | "client" => Ok(Self::Client),
94 "DupsOk" | "dupsOk" | "dups_ok" => Ok(Self::DupsOk),
95 "Transacted" | "transacted" => Ok(Self::Transacted),
96 _ => Err(camel_component_api::CamelError::ProcessorError(format!(
97 "invalid acknowledgement mode '{}': expected Auto, Client, DupsOk, or Transacted",
98 s
99 ))),
100 }
101 }
102}
103
/// Transaction mode of an endpoint, set through the `transactionMode`
/// (or `transaction_mode`) URI parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JmsTransactionMode {
    /// Used when the URI does not specify a mode.
    #[default]
    None,
    /// Parsed from `Session` / `session`.
    Session,
}
120
121impl fmt::Display for JmsTransactionMode {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 match self {
124 Self::None => write!(f, "None"),
125 Self::Session => write!(f, "Session"),
126 }
127 }
128}
129
130impl FromStr for JmsTransactionMode {
131 type Err = camel_component_api::CamelError;
132
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
134 match s {
135 "None" | "none" => Ok(Self::None),
136 "Session" | "session" => Ok(Self::Session),
137 _ => Err(camel_component_api::CamelError::ProcessorError(format!(
138 "invalid transaction mode '{}': expected None or Session",
139 s
140 ))),
141 }
142 }
143}
144
/// Exchange pattern of an endpoint, set through the `exchangePattern`
/// (or `exchange_pattern`) URI parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExchangePattern {
    /// Used when the URI does not specify a pattern.
    #[default]
    InOnly,
    /// Parsed from `InOut` / `inOut` / `in_out`.
    InOut,
}
160
161impl fmt::Display for ExchangePattern {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 match self {
164 Self::InOnly => write!(f, "InOnly"),
165 Self::InOut => write!(f, "InOut"),
166 }
167 }
168}
169
170impl FromStr for ExchangePattern {
171 type Err = camel_component_api::CamelError;
172
173 fn from_str(s: &str) -> Result<Self, Self::Err> {
174 match s {
175 "InOnly" | "inOnly" | "in_only" => Ok(Self::InOnly),
176 "InOut" | "inOut" | "in_out" => Ok(Self::InOut),
177 _ => Err(camel_component_api::CamelError::ProcessorError(format!(
178 "invalid exchange pattern '{}': expected InOnly or InOut",
179 s
180 ))),
181 }
182 }
183}
184
/// Number of concurrent consumers used when the endpoint URI does not set one.
fn default_concurrent_consumers() -> u32 {
    const DEFAULT_CONCURRENT_CONSUMERS: u32 = 1;
    DEFAULT_CONCURRENT_CONSUMERS
}
190
191#[derive(Debug, Clone)]
192pub struct JmsEndpointConfig {
193 pub destination_type: DestinationType,
194 pub destination_name: String,
195 pub broker_name: Option<String>,
196
197 pub acknowledgement_mode: AcknowledgementMode,
199
200 pub message_selector: Option<String>,
203
204 pub concurrent_consumers: u32,
207
208 pub transaction_mode: JmsTransactionMode,
210
211 pub time_to_live: Option<u64>,
214 pub priority: Option<u8>,
216 pub persistent_delivery: bool,
219
220 pub map_jms_headers: bool,
224
225 pub exchange_pattern: ExchangePattern,
227}
228
229impl JmsEndpointConfig {
230 pub fn from_uri(uri: &str) -> Result<Self, camel_component_api::CamelError> {
231 let (scheme, rest) = if let Some(r) = uri.strip_prefix("jms:") {
232 ("jms", r)
233 } else if let Some(r) = uri.strip_prefix("activemq:") {
234 ("activemq", r)
235 } else if let Some(r) = uri.strip_prefix("artemis:") {
236 ("artemis", r)
237 } else {
238 return Err(camel_component_api::CamelError::ProcessorError(
239 "expected scheme 'jms', 'activemq', or 'artemis'".to_string(),
240 ));
241 };
242
243 let (path, query) = match rest.split_once('?') {
244 Some((p, q)) => (p, Some(q)),
245 None => (rest, None),
246 };
247
248 let (destination_type, destination_name) =
249 match path.splitn(2, ':').collect::<Vec<_>>().as_slice() {
250 [name] if !name.is_empty() && scheme != "jms" => {
252 (DestinationType::Queue, name.to_string())
253 }
254 [prefix, name]
256 if (*prefix == "queue" || *prefix == "topic") && !name.is_empty() =>
257 {
258 let dt = if *prefix == "queue" {
259 DestinationType::Queue
260 } else {
261 DestinationType::Topic
262 };
263 (dt, name.to_string())
264 }
265 [name] if !name.is_empty() && scheme == "jms" => {
267 return Err(camel_component_api::CamelError::ProcessorError(format!(
268 "URI 'jms:{}' is ambiguous — use 'jms:queue:{}' or 'jms:topic:{}'",
269 name, name, name
270 )));
271 }
272 _ => {
273 return Err(camel_component_api::CamelError::ProcessorError(
274 "destination must be 'queue:<name>' or 'topic:<name>'".to_string(),
275 ));
276 }
277 };
278
279 let mut broker_name: Option<String> = None;
281 let mut acknowledgement_mode = AcknowledgementMode::default();
282 let mut message_selector: Option<String> = None;
283 let mut concurrent_consumers = default_concurrent_consumers();
284 let mut transaction_mode = JmsTransactionMode::default();
285 let mut time_to_live: Option<u64> = None;
286 let mut priority: Option<u8> = None;
287 let mut persistent_delivery = true;
288 let mut map_jms_headers = true;
289 let mut exchange_pattern = ExchangePattern::default();
290
291 if let Some(q) = query {
292 for kv in q.split('&') {
293 let Some((k, v)) = kv.split_once('=') else {
294 continue;
295 };
296 match k {
297 "broker" if !v.is_empty() => {
298 broker_name = Some(v.to_string());
299 }
300 "acknowledgementMode" | "acknowledgement_mode" => {
301 acknowledgement_mode = AcknowledgementMode::from_str(v)?;
302 }
303 "messageSelector" | "message_selector" if !v.is_empty() => {
304 message_selector = Some(v.to_string());
305 }
306 "concurrentConsumers" | "concurrent_consumers" => {
307 concurrent_consumers = v.parse::<u32>().map_err(|_| {
308 camel_component_api::CamelError::ProcessorError(format!(
309 "invalid concurrent_consumers '{}': expected positive integer",
310 v
311 ))
312 })?;
313 if concurrent_consumers == 0 {
314 return Err(camel_component_api::CamelError::ProcessorError(
315 "concurrent_consumers must be >= 1".to_string(),
316 ));
317 }
318 }
319 "transactionMode" | "transaction_mode" => {
320 transaction_mode = JmsTransactionMode::from_str(v)?;
321 }
322 "timeToLive" | "time_to_live" => {
323 time_to_live = Some(v.parse::<u64>().map_err(|_| {
324 camel_component_api::CamelError::ProcessorError(format!(
325 "invalid time_to_live '{}': expected non-negative integer (ms)",
326 v
327 ))
328 })?);
329 }
330 "priority" => {
331 let p = v.parse::<u8>().map_err(|_| {
332 camel_component_api::CamelError::ProcessorError(format!(
333 "invalid priority '{}': expected integer 0-9",
334 v
335 ))
336 })?;
337 if p > 9 {
338 return Err(camel_component_api::CamelError::ProcessorError(format!(
339 "invalid priority '{}': must be 0-9",
340 p
341 )));
342 }
343 priority = Some(p);
344 }
345 "persistentDelivery" | "persistent_delivery" => {
346 persistent_delivery = v.parse::<bool>().map_err(|_| {
347 camel_component_api::CamelError::ProcessorError(format!(
348 "invalid persistent_delivery '{}': expected true or false",
349 v
350 ))
351 })?;
352 }
353 "mapJmsHeaders" | "map_jms_headers" => {
354 map_jms_headers = v.parse::<bool>().map_err(|_| {
355 camel_component_api::CamelError::ProcessorError(format!(
356 "invalid map_jms_headers '{}': expected true or false",
357 v
358 ))
359 })?;
360 }
361 "exchangePattern" | "exchange_pattern" => {
362 exchange_pattern = ExchangePattern::from_str(v)?;
363 }
364 _ => {} }
366 }
367 }
368
369 Ok(JmsEndpointConfig {
370 destination_type,
371 destination_name,
372 broker_name,
373 acknowledgement_mode,
374 message_selector,
375 concurrent_consumers,
376 transaction_mode,
377 time_to_live,
378 priority,
379 persistent_delivery,
380 map_jms_headers,
381 exchange_pattern,
382 })
383 }
384}
385
386#[derive(Clone, PartialEq, serde::Deserialize)]
387pub struct BrokerConfig {
388 pub broker_url: String,
389 pub broker_type: BrokerType,
390 pub username: Option<String>,
391 pub password: Option<String>,
392}
393
394impl std::fmt::Debug for BrokerConfig {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 f.debug_struct("BrokerConfig")
397 .field("broker_url", &self.broker_url)
398 .field("broker_type", &self.broker_type)
399 .field("username", &self.username)
400 .field("password", &self.password.as_ref().map(|_| "<redacted>"))
401 .finish()
402 }
403}
404
405#[derive(Debug, Clone, serde::Deserialize)]
406pub struct JmsPoolConfig {
407 #[serde(default)]
408 pub brokers: HashMap<String, BrokerConfig>,
409 #[serde(default = "default_max_bridges")]
410 pub max_bridges: usize,
411 #[serde(default = "default_bridge_start_timeout_ms")]
412 pub bridge_start_timeout_ms: u64,
413 #[serde(default = "default_broker_reconnect_interval_ms")]
414 pub broker_reconnect_interval_ms: u64,
415 #[serde(default = "default_health_check_interval_ms")]
416 pub health_check_interval_ms: u64,
417 #[serde(default = "default_bridge_cache_dir")]
418 pub bridge_cache_dir: PathBuf,
419 #[serde(default = "jms_reconnect_default")]
420 pub reconnect: NetworkRetryPolicy,
421}
422
423impl Default for JmsPoolConfig {
424 fn default() -> Self {
425 Self {
426 brokers: HashMap::new(),
427 max_bridges: default_max_bridges(),
428 bridge_start_timeout_ms: default_bridge_start_timeout_ms(),
429 broker_reconnect_interval_ms: default_broker_reconnect_interval_ms(),
430 health_check_interval_ms: default_health_check_interval_ms(),
431 bridge_cache_dir: default_bridge_cache_dir(),
432 reconnect: jms_reconnect_default(),
433 }
434 }
435}
436
437impl JmsPoolConfig {
438 pub fn single_broker(broker_url: impl Into<String>, broker_type: BrokerType) -> Self {
441 let url = broker_url.into();
442 let mut brokers = HashMap::new();
443 brokers.insert(
444 "default".to_string(),
445 BrokerConfig {
446 broker_url: url,
447 broker_type,
448 username: None,
449 password: None,
450 },
451 );
452 Self {
453 brokers,
454 max_bridges: 1,
455 ..Self::default()
456 }
457 }
458
459 pub fn validate(&self) -> Result<(), camel_component_api::CamelError> {
463 use camel_component_api::CamelError;
464
465 if self.max_bridges < 1 {
466 return Err(CamelError::Config("max_bridges must be >= 1".to_string()));
467 }
468
469 let known_schemes = ["tcp://", "ssl://", "failover://", "ws://", "wss://"];
470
471 for (name, bc) in &self.brokers {
472 if bc.broker_url.is_empty() {
473 return Err(CamelError::ProcessorError(format!(
474 "broker '{}' has an empty broker_url",
475 name
476 )));
477 }
478
479 let has_known_scheme = known_schemes.iter().any(|s| bc.broker_url.starts_with(s));
480 if !has_known_scheme {
481 return Err(CamelError::ProcessorError(format!(
482 "broker '{}' has an invalid broker_url '{}': must start with one of {:?}",
483 name, bc.broker_url, known_schemes
484 )));
485 }
486 }
487
488 if self.bridge_start_timeout_ms == 0 {
489 return Err(CamelError::Config(
490 "bridge_start_timeout_ms must be > 0".to_string(),
491 ));
492 }
493 if self.health_check_interval_ms == 0 {
494 return Err(CamelError::Config(
495 "health_check_interval_ms must be > 0".to_string(),
496 ));
497 }
498 if self.broker_reconnect_interval_ms == 0 {
499 return Err(CamelError::Config(
500 "broker_reconnect_interval_ms must be > 0".to_string(),
501 ));
502 }
503
504 Ok(())
505 }
506}
507
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BrokerType;

    /// Broker URL shared by the pool-configuration tests.
    const LOCAL_BROKER_URL: &str = "tcp://localhost:61616";

    /// Parses `uri`, panicking if it is rejected.
    #[track_caller]
    fn endpoint(uri: &str) -> JmsEndpointConfig {
        JmsEndpointConfig::from_uri(uri).unwrap()
    }

    /// Parses `uri`, expecting a rejection, and returns the rendered error.
    #[track_caller]
    fn endpoint_error(uri: &str) -> String {
        JmsEndpointConfig::from_uri(uri).unwrap_err().to_string()
    }

    /// Single-broker pool pointing at `LOCAL_BROKER_URL`.
    fn local_pool() -> JmsPoolConfig {
        JmsPoolConfig::single_broker(LOCAL_BROKER_URL, BrokerType::ActiveMq)
    }

    /// Validates `cfg`, expecting a failure, and returns the rendered error.
    #[track_caller]
    fn validation_error(cfg: &JmsPoolConfig) -> String {
        cfg.validate().unwrap_err().to_string()
    }

    // ---- URI scheme and destination parsing ----

    #[test]
    fn parse_jms_queue_explicit() {
        let cfg = endpoint("jms:queue:orders");
        assert_eq!(cfg.destination_type, DestinationType::Queue);
        assert_eq!(cfg.destination_name, "orders");
        assert_eq!(cfg.broker_name, None);
    }

    #[test]
    fn parse_jms_topic_explicit() {
        let cfg = endpoint("jms:topic:events");
        assert_eq!(cfg.destination_type, DestinationType::Topic);
        assert_eq!(cfg.destination_name, "events");
    }

    #[test]
    fn parse_jms_with_broker_param() {
        let cfg = endpoint("jms:queue:orders?broker=primary");
        assert_eq!(cfg.broker_name, Some("primary".to_string()));
        assert_eq!(cfg.destination_name, "orders");
    }

    #[test]
    fn jms_shorthand_rejected() {
        let msg = endpoint_error("jms:orders");
        assert!(msg.contains("ambiguous"), "got: {}", msg);
    }

    #[test]
    fn invalid_destination_type_returns_error() {
        let msg = endpoint_error("jms:inbox:orders");
        assert!(
            msg.contains("'queue' or 'topic'") || msg.contains("queue:<name>"),
            "got: {}",
            msg
        );
    }

    #[test]
    fn parse_activemq_queue_explicit() {
        let cfg = endpoint("activemq:queue:orders");
        assert_eq!(cfg.destination_type, DestinationType::Queue);
        assert_eq!(cfg.destination_name, "orders");
    }

    #[test]
    fn parse_activemq_shorthand() {
        let cfg = endpoint("activemq:orders");
        assert_eq!(cfg.destination_type, DestinationType::Queue);
        assert_eq!(cfg.destination_name, "orders");
    }

    #[test]
    fn parse_artemis_shorthand_defaults_to_queue() {
        let cfg = endpoint("artemis:events");
        assert_eq!(cfg.destination_type, DestinationType::Queue);
        assert_eq!(cfg.destination_name, "events");
    }

    #[test]
    fn parse_jms_with_empty_broker_param_treated_as_none() {
        let cfg = endpoint("jms:queue:orders?broker=");
        assert_eq!(cfg.broker_name, None);
    }

    #[test]
    fn parse_activemq_topic() {
        let cfg = endpoint("activemq:topic:events");
        assert_eq!(cfg.destination_type, DestinationType::Topic);
        assert_eq!(cfg.destination_name, "events");
    }

    // ---- Pool configuration ----

    #[test]
    fn single_broker_convenience() {
        let cfg = local_pool();
        assert_eq!(cfg.max_bridges, 1);
        assert!(cfg.brokers.contains_key("default"));
        let bc = &cfg.brokers["default"];
        assert_eq!(bc.broker_url, "tcp://localhost:61616");
        assert_eq!(bc.broker_type, BrokerType::ActiveMq);
    }

    #[test]
    fn default_pool_config() {
        let cfg = JmsPoolConfig::default();
        assert_eq!(cfg.max_bridges, 8);
        assert!(cfg.brokers.is_empty());
        assert_eq!(cfg.bridge_start_timeout_ms, 30_000);
        assert_eq!(cfg.broker_reconnect_interval_ms, 5_000);
        assert_eq!(cfg.health_check_interval_ms, 5_000);
    }

    #[test]
    fn validate_empty_broker_url() {
        let cfg = JmsPoolConfig::single_broker("", BrokerType::ActiveMq);
        let msg = validation_error(&cfg);
        assert!(msg.contains("empty broker_url"), "got: {}", msg);
    }

    #[test]
    fn broker_config_debug_redacts_password() {
        let bc = BrokerConfig {
            broker_url: "tcp://localhost:61616".to_string(),
            broker_type: BrokerType::ActiveMq,
            username: Some("admin".to_string()),
            password: Some("secret".to_string()),
        };
        let s = format!("{bc:?}");
        assert!(s.contains("<redacted>"), "got: {s}");
        assert!(!s.contains("secret"), "got: {s}");
    }

    #[test]
    fn validate_ok() {
        local_pool().validate().unwrap();
    }

    #[test]
    fn validate_rejects_zero_bridge_start_timeout() {
        let mut cfg = local_pool();
        cfg.bridge_start_timeout_ms = 0;
        let msg = validation_error(&cfg);
        assert!(msg.contains("bridge_start_timeout_ms"), "got: {}", msg);
    }

    #[test]
    fn validate_rejects_zero_health_check_interval() {
        let mut cfg = local_pool();
        cfg.health_check_interval_ms = 0;
        let msg = validation_error(&cfg);
        assert!(msg.contains("health_check_interval_ms"), "got: {}", msg);
    }

    #[test]
    fn validate_rejects_zero_reconnect_interval() {
        let mut cfg = local_pool();
        cfg.broker_reconnect_interval_ms = 0;
        let msg = validation_error(&cfg);
        assert!(msg.contains("broker_reconnect_interval_ms"), "got: {}", msg);
    }

    #[test]
    fn rejects_zero_max_bridges() {
        let mut cfg = local_pool();
        cfg.max_bridges = 0;
        let msg = validation_error(&cfg);
        assert!(msg.contains("max_bridges"), "got: {}", msg);
    }

    #[test]
    fn rejects_zero_timeout() {
        let mut cfg = local_pool();
        cfg.bridge_start_timeout_ms = 0;
        let msg = validation_error(&cfg);
        assert!(msg.contains("bridge_start_timeout_ms"), "got: {}", msg);
    }

    #[test]
    fn rejects_empty_broker_url() {
        let cfg = JmsPoolConfig::single_broker("", BrokerType::ActiveMq);
        let msg = validation_error(&cfg);
        assert!(msg.contains("empty broker_url"), "got: {}", msg);
    }

    #[test]
    fn rejects_unknown_broker_url_scheme() {
        let cfg = JmsPoolConfig::single_broker("amqp://localhost:5672", BrokerType::ActiveMq);
        let msg = validation_error(&cfg);
        assert!(msg.contains("invalid broker_url"), "got: {}", msg);
    }

    #[test]
    fn accepts_known_broker_url_schemes() {
        let urls = [
            "tcp://localhost:61616",
            "ssl://localhost:61617",
            "failover://tcp://localhost:61616",
            "ws://localhost:61618",
            "wss://localhost:61619",
        ];
        for url in urls.iter().copied() {
            let cfg = JmsPoolConfig::single_broker(url, BrokerType::ActiveMq);
            assert!(cfg.validate().is_ok(), "scheme should be accepted: {url}");
        }
    }

    #[test]
    fn accepts_valid_config() {
        assert!(local_pool().validate().is_ok());
    }

    // ---- Endpoint query parameters ----

    #[test]
    fn default_endpoint_config_has_sensible_defaults() {
        let cfg = endpoint("jms:queue:orders");
        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Auto);
        assert_eq!(cfg.message_selector, None);
        assert_eq!(cfg.concurrent_consumers, 1);
        assert_eq!(cfg.transaction_mode, JmsTransactionMode::None);
        assert_eq!(cfg.time_to_live, None);
        assert_eq!(cfg.priority, None);
        assert!(cfg.persistent_delivery);
        assert!(cfg.map_jms_headers);
        assert_eq!(cfg.exchange_pattern, ExchangePattern::InOnly);
    }

    #[test]
    fn parse_acknowledgement_mode_client() {
        let cfg = endpoint("jms:queue:orders?acknowledgementMode=Client");
        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Client);
    }

    #[test]
    fn parse_acknowledgement_mode_dups_ok() {
        let cfg = endpoint("jms:queue:orders?acknowledgement_mode=dups_ok");
        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::DupsOk);
    }

    #[test]
    fn parse_acknowledgement_mode_invalid() {
        let msg = endpoint_error("jms:queue:orders?acknowledgementMode=invalid");
        assert!(msg.contains("invalid acknowledgement mode"), "got: {}", msg);
    }

    #[test]
    fn parse_message_selector() {
        let cfg = endpoint("jms:queue:orders?messageSelector=priority%20%3E%205");
        // The selector is stored exactly as written; no percent-decoding happens.
        assert_eq!(cfg.message_selector, Some("priority%20%3E%205".to_string()));
    }

    #[test]
    fn parse_empty_message_selector_is_none() {
        let cfg = endpoint("jms:queue:orders?message_selector=");
        assert_eq!(cfg.message_selector, None);
    }

    #[test]
    fn parse_concurrent_consumers() {
        let cfg = endpoint("jms:queue:orders?concurrentConsumers=4");
        assert_eq!(cfg.concurrent_consumers, 4);
    }

    #[test]
    fn parse_concurrent_consumers_zero_rejected() {
        let msg = endpoint_error("jms:queue:orders?concurrentConsumers=0");
        assert!(
            msg.contains("concurrent_consumers must be >= 1"),
            "got: {}",
            msg
        );
    }

    #[test]
    fn parse_concurrent_consumers_invalid() {
        let msg = endpoint_error("jms:queue:orders?concurrentConsumers=abc");
        assert!(msg.contains("invalid concurrent_consumers"), "got: {}", msg);
    }

    #[test]
    fn parse_transaction_mode_session() {
        let cfg = endpoint("jms:queue:orders?transactionMode=Session");
        assert_eq!(cfg.transaction_mode, JmsTransactionMode::Session);
    }

    #[test]
    fn parse_transaction_mode_invalid() {
        let msg = endpoint_error("jms:queue:orders?transaction_mode=invalid");
        assert!(msg.contains("invalid transaction mode"), "got: {}", msg);
    }

    #[test]
    fn parse_time_to_live() {
        let cfg = endpoint("jms:queue:orders?timeToLive=30000");
        assert_eq!(cfg.time_to_live, Some(30_000));
    }

    #[test]
    fn parse_priority() {
        let cfg = endpoint("jms:queue:orders?priority=5");
        assert_eq!(cfg.priority, Some(5));
    }

    #[test]
    fn parse_priority_above_9_rejected() {
        let msg = endpoint_error("jms:queue:orders?priority=10");
        assert!(msg.contains("must be 0-9"), "got: {}", msg);
    }

    #[test]
    fn parse_persistent_delivery_false() {
        let cfg = endpoint("jms:queue:orders?persistentDelivery=false");
        assert!(!cfg.persistent_delivery);
    }

    #[test]
    fn parse_map_jms_headers_false() {
        let cfg = endpoint("jms:queue:orders?mapJmsHeaders=false");
        assert!(!cfg.map_jms_headers);
    }

    #[test]
    fn parse_exchange_pattern_inout() {
        let cfg = endpoint("jms:queue:orders?exchangePattern=InOut");
        assert_eq!(cfg.exchange_pattern, ExchangePattern::InOut);
    }

    #[test]
    fn parse_exchange_pattern_invalid() {
        let msg = endpoint_error("jms:queue:orders?exchangePattern=invalid");
        assert!(msg.contains("invalid exchange pattern"), "got: {}", msg);
    }

    #[test]
    fn parse_multiple_query_params() {
        let cfg = endpoint(
            "jms:queue:orders?broker=primary&acknowledgementMode=Client&concurrentConsumers=3&persistentDelivery=false&priority=7",
        );
        assert_eq!(cfg.broker_name, Some("primary".to_string()));
        assert_eq!(cfg.acknowledgement_mode, AcknowledgementMode::Client);
        assert_eq!(cfg.concurrent_consumers, 3);
        assert!(!cfg.persistent_delivery);
        assert_eq!(cfg.priority, Some(7));
    }

    // ---- Display / FromStr round trips ----

    #[test]
    fn acknowledgement_mode_display_roundtrip() {
        let modes = [
            AcknowledgementMode::Auto,
            AcknowledgementMode::Client,
            AcknowledgementMode::DupsOk,
            AcknowledgementMode::Transacted,
        ];
        for mode in modes.iter().copied() {
            let parsed: AcknowledgementMode = mode.to_string().parse().unwrap();
            assert_eq!(mode, parsed);
        }
    }

    #[test]
    fn transaction_mode_display_roundtrip() {
        let modes = [JmsTransactionMode::None, JmsTransactionMode::Session];
        for mode in modes.iter().copied() {
            let parsed: JmsTransactionMode = mode.to_string().parse().unwrap();
            assert_eq!(mode, parsed);
        }
    }

    #[test]
    fn exchange_pattern_display_roundtrip() {
        let patterns = [ExchangePattern::InOnly, ExchangePattern::InOut];
        for pattern in patterns.iter().copied() {
            let parsed: ExchangePattern = pattern.to_string().parse().unwrap();
            assert_eq!(pattern, parsed);
        }
    }

    #[test]
    fn build_exchange_without_header_mapping() {
        let cfg = endpoint("jms:queue:orders?mapJmsHeaders=false");
        assert!(!cfg.map_jms_headers);
    }

    #[test]
    fn jms_pool_config_has_reconnect_policy() {
        let cfg = JmsPoolConfig::default();
        assert_eq!(cfg.reconnect.max_attempts, 0);
        assert!(cfg.reconnect.enabled);
    }
}