1use std::collections::HashMap;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use camel_bridge::{
11 channel::connect_channel,
12 download::ensure_binary,
13 health::wait_for_health,
14 process::{BridgeProcess, BridgeProcessConfig, BrokerType},
15};
16use camel_component_api::{
17 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, NetworkRetryPolicy,
18 ProducerContext,
19};
20use dashmap::DashMap;
21use tokio::sync::{Mutex, watch};
22use tonic::transport::Channel;
23use tower::Service;
24use tracing::{info, warn};
25
26use crate::config::{BrokerConfig, JmsEndpointConfig, JmsPoolConfig};
27use crate::consumer::JmsConsumer;
28use crate::health::JmsHealthCheck;
29use crate::producer::JmsProducer;
30use crate::proto::{HealthRequest, bridge_service_client::BridgeServiceClient};
31
/// Message prefix that marks a `CamelError::ProcessorError` as a bridge transport failure.
///
/// `is_bridge_transport_error` classifies errors purely by checking for this prefix.
pub const BRIDGE_TRANSPORT_ERROR_PREFIX: &str = "JMS gRPC ";
/// Number of failed restart attempts after which the health monitor marks the slot
/// `Degraded` and stops retrying.
const MAX_RESTART_ATTEMPTS: u32 = 10;
41
/// Lifecycle state of one bridge slot, published through a `tokio::sync::watch` channel.
#[derive(Debug, Clone)]
pub enum BridgeState {
    /// Initial state of a freshly created slot, before the first start attempt has finished.
    Starting,
    /// The bridge is up; `channel` is the gRPC channel used to talk to it.
    Ready { channel: Channel },
    /// The bridge is unusable; the string carries the reason.
    Degraded(String),
    /// A restart is pending or in progress. `attempt` counts the failed attempts so far and
    /// `next_at` is the earliest instant at which the next attempt may run.
    Restarting { attempt: u32, next_at: Instant },
    /// The slot has been shut down.
    Stopped,
}
52
/// Per-broker bookkeeping: connection parameters, the published bridge state, and the
/// handles needed to stop the bridge process and its health monitor.
pub struct BridgeSlot {
    /// Broker name this slot belongs to.
    pub name: String,
    /// Broker URL handed to the bridge process on every (re)start.
    pub broker_url: String,
    /// Broker flavour handed to the bridge process on every (re)start.
    pub broker_type: BrokerType,
    /// Optional `(username, password)` pair; deliberately absent from the `Debug` output.
    pub credentials: Option<(String, String)>,
    /// Receiving side of the state channel; read with `borrow()` and cloned by waiters.
    pub state_rx: watch::Receiver<BridgeState>,
    /// Sending side of the state channel.
    pub(crate) state_tx: watch::Sender<BridgeState>,
    /// The currently tracked bridge process, or `None` when there is none.
    pub process: Arc<tokio::sync::Mutex<Option<BridgeProcess>>>,
    /// Join handle of the health-monitor task; taken and awaited during pool shutdown.
    pub(crate) health_monitor_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
68
/// Hand-written `Debug` that prints only the identifying fields.
///
/// `credentials` (and the channel/process handles) are intentionally left out, so a
/// debug-printed slot never exposes the password.
impl std::fmt::Debug for BridgeSlot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BridgeSlot")
            .field("name", &self.name)
            .field("broker_url", &self.broker_url)
            .field("broker_type", &self.broker_type)
            .finish()
    }
}
78
/// Pool of bridge slots, one per configured broker, created lazily on first use.
pub struct JmsBridgePool {
    /// Live slots keyed by broker name.
    pub(crate) slots: DashMap<String, Arc<BridgeSlot>>,
    /// Declared brokers keyed by broker name.
    pub(crate) config: HashMap<String, BrokerConfig>,
    /// Upper bound, in milliseconds, for one complete bridge start.
    pub(crate) bridge_start_timeout_ms: u64,
    /// Retry policy handed to consumers created from this pool.
    pub(crate) reconnect: NetworkRetryPolicy,
    /// Pause, in milliseconds, between health probes of a `Ready` bridge.
    pub(crate) health_check_interval_ms: u64,
    /// Bridge binary version passed to `ensure_binary`.
    pub(crate) bridge_version: String,
    /// Cache directory passed to `ensure_binary`.
    pub(crate) bridge_cache_dir: PathBuf,
    /// Maximum number of slots the pool will create.
    pub(crate) max_bridges: usize,
    /// Serializes slot creation so concurrent callers cannot create the same slot twice.
    bridge_create_lock: Mutex<()>,
    /// Set once shutdown begins; health monitors read it to stop restarting bridges.
    pub(crate) shutting_down: Arc<AtomicBool>,
}
95
96impl JmsBridgePool {
97 pub fn from_config(pool_config: JmsPoolConfig) -> Result<Self, CamelError> {
98 pool_config.validate()?;
99 let mut reconnect = pool_config.reconnect;
102 if pool_config.broker_reconnect_interval_ms
103 != crate::config::default_broker_reconnect_interval_ms()
104 {
105 reconnect.initial_delay =
106 Duration::from_millis(pool_config.broker_reconnect_interval_ms);
107 }
108 Ok(Self {
109 slots: DashMap::new(),
110 config: pool_config.brokers,
111 bridge_start_timeout_ms: pool_config.bridge_start_timeout_ms,
112 reconnect,
113 health_check_interval_ms: pool_config.health_check_interval_ms,
114 bridge_version: crate::BRIDGE_VERSION.to_string(),
115 bridge_cache_dir: pool_config.bridge_cache_dir,
116 max_bridges: pool_config.max_bridges,
117 bridge_create_lock: Mutex::new(()),
118 shutting_down: Arc::new(AtomicBool::new(false)),
119 })
120 }
121
122 pub fn resolve_broker_name(&self, name: Option<&str>) -> Result<String, CamelError> {
129 match name {
130 Some(n) => {
131 if self.config.contains_key(n) {
132 Ok(n.to_string())
133 } else {
134 Err(CamelError::ProcessorError(format!(
135 "Unknown JMS broker '{n}' — declare it in [components.jms.brokers] in Camel.toml",
136 )))
137 }
138 }
139 None => match self.config.len() {
140 0 => Err(CamelError::ProcessorError(
141 "No JMS brokers configured — declare at least one in [components.jms.brokers] in Camel.toml".to_string(),
142 )),
143 1 => Ok(self.config.keys().next().unwrap().clone()), _ => Err(CamelError::ProcessorError(format!(
145 "Multiple JMS brokers configured ({}); specify one with ?broker=<name> in the URI",
146 self.config.keys().cloned().collect::<Vec<_>>().join(", ")
147 ))),
148 },
149 }
150 }
151
152 pub fn resolve_broker_type(&self, scheme: &str, broker_name: &str) -> BrokerType {
154 let config_type = self
155 .config
156 .get(broker_name)
157 .map(|c| c.broker_type.clone())
158 .unwrap_or(BrokerType::Generic);
159
160 match scheme {
161 "activemq" => {
162 if config_type != BrokerType::ActiveMq && config_type != BrokerType::Generic {
163 warn!(
164 "Scheme 'activemq' overrides configured broker_type '{:?}' for broker '{}'",
165 config_type, broker_name
166 );
167 }
168 BrokerType::ActiveMq
169 }
170 "artemis" => {
171 if config_type != BrokerType::Artemis && config_type != BrokerType::Generic {
172 warn!(
173 "Scheme 'artemis' overrides configured broker_type '{:?}' for broker '{}'",
174 config_type, broker_name
175 );
176 }
177 BrokerType::Artemis
178 }
179 _ => config_type,
180 }
181 }
182
    /// Returns the slot for `broker_name`, creating it and starting its bridge on first use.
    ///
    /// A creation lock is held from the second lookup until the function returns, so only
    /// one caller at a time creates a slot and starts a bridge process.
    ///
    /// A failed initial start is NOT reported as an error: the slot is still returned,
    /// with its state set to `Degraded`, and the health monitor is spawned regardless.
    ///
    /// # Errors
    /// * `CamelError::Config` when the pool already holds `max_bridges` slots.
    /// * `CamelError::ProcessorError` when `broker_name` is not a declared broker.
    pub async fn get_or_create_slot(
        &self,
        broker_name: &str,
    ) -> Result<Arc<BridgeSlot>, CamelError> {
        // Fast path: the slot already exists, no lock needed.
        if let Some(slot) = self.slots.get(broker_name) {
            return Ok(Arc::clone(&*slot));
        }

        let _guard = self.bridge_create_lock.lock().await;

        // Re-check under the lock: another caller may have created the slot meanwhile.
        if let Some(slot) = self.slots.get(broker_name) {
            return Ok(Arc::clone(&*slot));
        }

        let total_count = self.slots.len();
        if total_count >= self.max_bridges {
            return Err(CamelError::Config(format!(
                "JMS bridge limit reached: {total_count} bridge(s) >= max_bridges ({})",
                self.max_bridges
            )));
        }

        let broker_config = self.config.get(broker_name).ok_or_else(|| {
            CamelError::ProcessorError(format!("Unknown JMS broker '{}'", broker_name))
        })?;

        let broker_url = broker_config.broker_url.clone();
        let broker_type = broker_config.broker_type.clone();
        // Credentials are used only when BOTH username and password are present.
        let credentials = match (&broker_config.username, &broker_config.password) {
            (Some(u), Some(p)) => Some((u.clone(), p.clone())),
            _ => None,
        };

        // Publish the slot in `Starting` state before the (slow) process start, so other
        // callers can already find it and wait on its state channel.
        let slot = match self.slots.entry(broker_name.to_string()) {
            dashmap::Entry::Occupied(existing) => {
                return Ok(Arc::clone(existing.get()));
            }
            dashmap::Entry::Vacant(entry) => {
                let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
                let slot = Arc::new(BridgeSlot {
                    name: broker_name.to_string(),
                    broker_url: broker_url.clone(),
                    broker_type: broker_type.clone(),
                    credentials: credentials.clone(),
                    state_rx,
                    state_tx,
                    process: Arc::new(tokio::sync::Mutex::new(None)),
                    health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
                });
                entry.insert(Arc::clone(&slot));
                slot
            }
        };

        let start_result = Self::start_bridge_process(
            &self.bridge_version,
            &self.bridge_cache_dir,
            self.bridge_start_timeout_ms,
            &broker_url,
            &broker_type,
            &credentials,
        )
        .await;

        match start_result {
            Ok((process, channel)) => {
                {
                    let mut guard = slot.process.lock().await;
                    *guard = Some(process);
                }
                let _ = slot.state_tx.send(BridgeState::Ready { channel });
            }
            Err(e) => {
                // Not fatal here: the health monitor turns `Degraded` into restart attempts.
                let _ = slot
                    .state_tx
                    .send(BridgeState::Degraded(format!("Initial start failed: {e}")));
            }
        }

        self.spawn_health_monitor(Arc::clone(&slot)).await;

        Ok(slot)
    }
278
279 pub fn restart_slot(&self, broker_name: &str) {
281 if let Some(slot) = self.slots.get(broker_name) {
282 let _ = slot.state_tx.send(BridgeState::Restarting {
283 attempt: 0,
284 next_at: Instant::now(),
285 });
286 }
287 }
288
289 pub async fn refresh_slot_channel(&self, broker_name: &str) -> Result<(), CamelError> {
294 let slot = self
295 .slots
296 .get(broker_name)
297 .map(|s| Arc::clone(&*s))
298 .ok_or_else(|| {
299 CamelError::ProcessorError(format!("Unknown JMS broker '{}'", broker_name))
300 })?;
301
302 let port = {
303 let guard = slot.process.lock().await;
304 let process = guard.as_ref().ok_or_else(|| {
305 CamelError::ProcessorError(format!(
306 "JMS broker '{}' has no running bridge process",
307 broker_name
308 ))
309 })?;
310 process.grpc_port()
311 };
312
313 let channel = connect_channel(port).await.map_err(|e| {
314 CamelError::ProcessorError(format!(
315 "JMS broker '{}' channel refresh failed: {}",
316 broker_name, e
317 ))
318 })?;
319
320 let _ = slot.state_tx.send(BridgeState::Ready { channel });
321 Ok(())
322 }
323
    /// Raises the `shutting_down` flag without stopping anything yet.
    ///
    /// Health monitors read this flag and stop restarting bridges once it is set.
    pub fn begin_shutdown(&self) {
        self.shutting_down.store(true, Ordering::SeqCst);
    }
327
    /// Stops every bridge in the pool.
    ///
    /// For each slot, in order: remove it from the map, publish `Stopped`, stop the
    /// bridge process, then wait up to 5 seconds for the health monitor to exit and
    /// abort it if it does not.
    ///
    /// # Errors
    /// All slots are processed even if some process fails to stop; the failures are
    /// collected and returned together in one `CamelError::ProcessorError`.
    pub async fn shutdown(&self) -> Result<(), CamelError> {
        self.begin_shutdown();
        // Snapshot the keys first so the map is not iterated while entries are removed.
        let names: Vec<String> = self.slots.iter().map(|e| e.key().clone()).collect();
        let mut errors: Vec<String> = Vec::new();

        for name in names {
            if let Some((_, slot)) = self.slots.remove(&name) {
                let _ = slot.state_tx.send(BridgeState::Stopped);

                // Take the process out inside a short scope so the mutex is not held
                // while the process is being stopped.
                let process = {
                    let mut guard = slot.process.lock().await;
                    guard.take()
                };
                if let Some(p) = process
                    && let Err(e) = p.stop().await
                {
                    errors.push(format!("broker '{}': process stop failed: {e}", slot.name));
                }

                let monitor_handle = {
                    let mut guard = slot.health_monitor_handle.lock().await;
                    guard.take()
                };
                // Give the monitor a bounded time to notice `Stopped`, then abort it.
                if let Some(mut h) = monitor_handle
                    && tokio::time::timeout(Duration::from_secs(5), &mut h)
                        .await
                        .is_err()
                {
                    h.abort();
                    let _ = h.await;
                    warn!(
                        "health monitor for '{}' did not stop in 5s; aborted",
                        slot.name
                    );
                }
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            Err(CamelError::ProcessorError(format!(
                "JMS pool shutdown completed with {} error(s): {}",
                errors.len(),
                errors.join("; ")
            )))
        }
    }
380
    /// Spawns the background task that drives `slot`'s state machine and stores its join
    /// handle in `slot.health_monitor_handle`.
    ///
    /// Each loop iteration reads the current state and acts on it:
    /// * `Stopped` — the task exits.
    /// * `Ready` — sleeps for the health interval, then probes the bridge with a 3 s
    ///   timeout; a failed or timed-out probe publishes `Degraded`.
    /// * `Degraded` / `Starting` — publishes `Restarting { attempt: 0, .. }`, unless the
    ///   pool is shutting down, in which case the task exits.
    /// * `Restarting` — waits until `next_at`, stops the old process, and starts a new
    ///   one. Success publishes `Ready`; failure publishes the next `Restarting` with a
    ///   backoff of `min(5 * 2^attempt, 120)` seconds.
    ///
    /// NOTE(review): once `MAX_RESTART_ATTEMPTS` is reached the task publishes `Degraded`
    /// and exits, so nothing in this function services a later `Restarting` request for
    /// the slot — confirm this is intended.
    async fn spawn_health_monitor(&self, slot: Arc<BridgeSlot>) {
        // Copy everything the task needs; the spawned future must not borrow `self`.
        let health_interval = self.health_check_interval_ms;
        let bridge_version = self.bridge_version.clone();
        let bridge_cache_dir = self.bridge_cache_dir.clone();
        let start_timeout_ms = self.bridge_start_timeout_ms;
        let handle_ref = Arc::clone(&slot.health_monitor_handle);
        let shutting_down = Arc::clone(&self.shutting_down);
        let broker_name = slot.name.clone();

        let handle = tokio::spawn(async move {
            loop {
                // Work on a snapshot so the watch read-guard is not held across awaits.
                let state = slot.state_rx.borrow().clone();
                match state {
                    BridgeState::Stopped => {
                        info!("Health monitor for '{}' exiting (Stopped)", slot.name);
                        break;
                    }
                    BridgeState::Ready { ref channel } => {
                        // NOTE(review): the state may change during this sleep; the probe
                        // below still uses the channel from the snapshot taken above.
                        tokio::time::sleep(Duration::from_millis(health_interval)).await;
                        let mut client = BridgeServiceClient::new(channel.clone());
                        let health_timeout = Duration::from_secs(3);
                        match tokio::time::timeout(health_timeout, client.health(HealthRequest {}))
                            .await
                        {
                            Ok(Ok(_)) => {}
                            Ok(Err(e)) => {
                                warn!(
                                    "Health check failed for broker '{}': {e}. Marking Degraded.",
                                    slot.name
                                );
                                let _ = slot.state_tx.send(BridgeState::Degraded(e.to_string()));
                            }
                            Err(_) => {
                                let msg = format!(
                                    "health RPC timed out after {}ms",
                                    health_timeout.as_millis()
                                );
                                warn!(
                                    "Health check timed out for broker '{}': {}. Marking Degraded.",
                                    slot.name, msg
                                );
                                let _ = slot.state_tx.send(BridgeState::Degraded(msg));
                            }
                        }
                    }
                    BridgeState::Degraded(_) | BridgeState::Starting => {
                        // Re-read the live state: it may have become `Stopped` since the snapshot.
                        if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
                            break;
                        }
                        if shutting_down.load(Ordering::SeqCst) {
                            tracing::info!(
                                "Pool shutting down — not restarting bridge for broker '{}'",
                                broker_name
                            );
                            break;
                        }
                        let _ = slot.state_tx.send(BridgeState::Restarting {
                            attempt: 0,
                            next_at: Instant::now(),
                        });
                    }
                    BridgeState::Restarting { attempt, next_at } => {
                        if shutting_down.load(Ordering::SeqCst) {
                            tracing::info!(
                                "Pool shutting down — aborting restart for broker '{}'",
                                broker_name
                            );
                            break;
                        }

                        // Honour the backoff computed by the previous failed attempt.
                        let now = Instant::now();
                        if now < next_at {
                            tokio::time::sleep(next_at - now).await;
                        }

                        // NOTE(review): this is logged before the max-attempts check, so the
                        // final, abandoned attempt is also announced as a restart.
                        info!(
                            "Restarting bridge for broker '{}' (attempt {})",
                            slot.name,
                            attempt + 1
                        );

                        if attempt >= MAX_RESTART_ATTEMPTS {
                            tracing::error!(
                                "Max restart attempts ({}) reached for broker '{}' — staying degraded",
                                attempt,
                                broker_name
                            );
                            let _ = slot.state_tx.send(BridgeState::Degraded(format!(
                                "max restart attempts ({}) exceeded",
                                attempt
                            )));
                            break;
                        }

                        // Stop whatever process is still tracked before starting a new one.
                        let old_process = {
                            let mut guard = slot.process.lock().await;
                            guard.take()
                        };
                        if let Some(p) = old_process {
                            let _ = p.stop().await;
                        }

                        let start_result = Self::start_bridge_process(
                            &bridge_version,
                            &bridge_cache_dir,
                            start_timeout_ms,
                            &slot.broker_url,
                            &slot.broker_type,
                            &slot.credentials,
                        )
                        .await;

                        match start_result {
                            Ok((process, channel)) => {
                                // Shutdown raced with the start: discard the new process.
                                if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
                                    let _ = process.stop().await;
                                    break;
                                }
                                {
                                    let mut guard = slot.process.lock().await;
                                    *guard = Some(process);
                                }
                                let _ = slot.state_tx.send(BridgeState::Ready { channel });
                                info!("Broker '{}' bridge restarted successfully", slot.name);
                            }
                            Err(e) => {
                                if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
                                    break;
                                }
                                // Exponential backoff: 5s, 10s, 20s, ... capped at 120s.
                                let delay_secs = std::cmp::min(5 * 2u64.pow(attempt), 120);
                                let next = Instant::now() + Duration::from_secs(delay_secs);
                                warn!(
                                    "Failed to restart bridge for '{}' (attempt {}): {e}. Retry in {delay_secs}s",
                                    slot.name,
                                    attempt + 1
                                );
                                let _ = slot.state_tx.send(BridgeState::Restarting {
                                    attempt: attempt + 1,
                                    next_at: next,
                                });
                            }
                        }
                    }
                }
            }
        });

        // Publish the handle so `shutdown` can await or abort the task.
        let mut guard = handle_ref.lock().await;
        *guard = Some(handle);
    }
536
537 async fn start_bridge_process(
538 bridge_version: &str,
539 bridge_cache_dir: &std::path::Path,
540 start_timeout_ms: u64,
541 broker_url: &str,
542 broker_type: &BrokerType,
543 credentials: &Option<(String, String)>,
544 ) -> Result<(BridgeProcess, Channel), CamelError> {
545 info!(
546 "Starting JMS bridge process for {}...",
547 redact_url(broker_url)
548 );
549 let binary_path = ensure_binary(bridge_version, bridge_cache_dir)
550 .await
551 .map_err(|e| {
552 CamelError::ProcessorError(format!("JMS bridge binary unavailable: {e}"))
553 })?;
554
555 let process_config = BridgeProcessConfig::jms(
556 binary_path,
557 broker_url.to_string(),
558 broker_type.clone(),
559 credentials.as_ref().map(|(u, _)| u.clone()),
560 credentials
561 .as_ref()
562 .map(|(_, p)| camel_bridge::process::Redacted::new(p.clone())),
563 start_timeout_ms,
564 );
565
566 let total_timeout = Duration::from_millis(start_timeout_ms);
567 let result = tokio::time::timeout(total_timeout, async {
568 let process = BridgeProcess::start(&process_config)
569 .await
570 .map_err(|e| CamelError::ProcessorError(format!("JMS bridge start failed: {e}")))?;
571
572 let port = process.grpc_port();
573 let channel = connect_channel(port).await.map_err(|e| {
574 CamelError::ProcessorError(format!("JMS bridge channel connect failed: {e}"))
575 })?;
576
577 wait_for_health(&channel, Duration::from_secs(10), |ch| {
578 let mut client = BridgeServiceClient::new(ch);
579 async move {
580 let resp = client.health(HealthRequest {}).await?;
581 Ok(resp.into_inner().healthy)
582 }
583 })
584 .await
585 .map_err(|e| {
586 CamelError::ProcessorError(format!("JMS bridge health check failed: {e}"))
587 })?;
588
589 Ok::<(BridgeProcess, Channel), CamelError>((process, channel))
590 })
591 .await
592 .map_err(|_| {
593 CamelError::ProcessorError(format!(
594 "JMS bridge start timed out after {}ms",
595 start_timeout_ms
596 ))
597 })??;
598
599 Ok(result)
600 }
601}
602
/// Component that creates JMS endpoints for one URI scheme on top of a shared bridge pool.
#[derive(Clone)]
pub struct JmsComponent {
    /// URI scheme this component answers to; also fed into broker-type resolution.
    scheme: String,
    /// Bridge pool shared with every endpoint created by this component.
    pool: Arc<JmsBridgePool>,
}
610
611impl JmsComponent {
612 pub fn with_scheme(scheme: impl Into<String>, pool: Arc<JmsBridgePool>) -> Self {
613 Self {
614 scheme: scheme.into(),
615 pool,
616 }
617 }
618
619 pub fn scheme(&self) -> &str {
620 &self.scheme
621 }
622
623 #[cfg(test)]
625 pub async fn send_for_test(
626 &self,
627 destination: &str,
628 body: &[u8],
629 content_type: &str,
630 ) -> Result<String, CamelError> {
631 let broker_name = self.pool.resolve_broker_name(None)?;
632 let slot = self.pool.get_or_create_slot(&broker_name).await?;
633 let channel = match &*slot.state_rx.borrow() {
634 BridgeState::Ready { channel } => channel.clone(),
635 other => {
636 return Err(CamelError::ProcessorError(format!(
637 "Bridge not ready: {:?}",
638 other
639 )));
640 }
641 };
642 let mut client = BridgeServiceClient::new(channel);
643 let r = client
644 .send(crate::proto::SendRequest {
645 destination: destination.to_string(),
646 body: body.to_vec(),
647 headers: Default::default(),
648 content_type: content_type.to_string(),
649 })
650 .await
651 .map_err(|e| CamelError::ProcessorError(format!("test send error: {e}")))?;
652 Ok(r.into_inner().message_id)
653 }
654}
655
656impl Component for JmsComponent {
657 fn scheme(&self) -> &str {
658 &self.scheme
659 }
660
661 fn create_endpoint(
662 &self,
663 uri: &str,
664 ctx: &dyn camel_component_api::ComponentContext,
665 ) -> Result<Box<dyn Endpoint>, CamelError> {
666 let endpoint_config = JmsEndpointConfig::from_uri(uri)?;
667 let broker_name = self
668 .pool
669 .resolve_broker_name(endpoint_config.broker_name.as_deref())?;
670 let resolved_broker_type = self.pool.resolve_broker_type(&self.scheme, &broker_name);
671
672 let health_check = JmsHealthCheck::new(Arc::clone(&self.pool), broker_name.clone());
673 ctx.register_current_route_health_check(Arc::new(health_check));
674
675 Ok(Box::new(JmsEndpoint {
676 pool: Arc::clone(&self.pool),
677 uri: uri.to_string(),
678 broker_name,
679 resolved_broker_type,
680 endpoint_config,
681 }))
682 }
683}
684
/// Endpoint for one JMS URI; hands out lazily-connecting producers and consumers.
struct JmsEndpoint {
    /// Shared bridge pool.
    pool: Arc<JmsBridgePool>,
    /// The URI this endpoint was created from, returned unchanged by `uri()`.
    uri: String,
    /// Name of the broker this endpoint talks to, already resolved.
    broker_name: String,
    /// Broker type after scheme-based resolution.
    resolved_broker_type: BrokerType,
    /// Settings parsed from the URI.
    endpoint_config: JmsEndpointConfig,
}
694
695impl Endpoint for JmsEndpoint {
696 fn uri(&self) -> &str {
697 &self.uri
698 }
699
700 fn create_producer(
701 &self,
702 rt: Arc<dyn camel_component_api::RuntimeObservability>,
703 _ctx: &ProducerContext,
704 ) -> Result<BoxProcessor, CamelError> {
705 Ok(BoxProcessor::new(LazyJmsProducer {
706 pool: Arc::clone(&self.pool),
707 broker_name: self.broker_name.clone(),
708 endpoint_config: self.endpoint_config.clone(),
709 resolved_broker_type: self.resolved_broker_type.clone(),
710 runtime: rt,
711 }))
712 }
713
714 fn create_consumer(
715 &self,
716 rt: Arc<dyn camel_component_api::RuntimeObservability>,
717 ) -> Result<Box<dyn Consumer>, CamelError> {
718 Ok(Box::new(JmsConsumer::new(
719 Arc::clone(&self.pool),
720 self.broker_name.clone(),
721 self.endpoint_config.clone(),
722 self.pool.reconnect.clone(),
723 rt,
724 )))
725 }
726}
727
/// Producer that defers bridge-slot acquisition to its first `call`.
#[derive(Clone)]
struct LazyJmsProducer {
    /// Shared bridge pool.
    pool: Arc<JmsBridgePool>,
    /// Broker whose slot is used for sending.
    broker_name: String,
    /// Endpoint settings passed to every `JmsProducer` built by `call`.
    endpoint_config: JmsEndpointConfig,
    // Stored but not read by the `Service` implementation, hence the lint allowance.
    #[allow(dead_code)]
    resolved_broker_type: BrokerType,
    // Stored but not read by the `Service` implementation, hence the lint allowance.
    #[allow(dead_code)]
    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
}
740
741impl Service<Exchange> for LazyJmsProducer {
742 type Response = Exchange;
743 type Error = CamelError;
744 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
745
746 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
747 if let Some(slot) = self.pool.slots.get(&self.broker_name) {
750 match &*slot.state_rx.borrow() {
751 BridgeState::Ready { .. } => return Poll::Ready(Ok(())),
752 BridgeState::Starting | BridgeState::Restarting { .. } => {
753 let waker = cx.waker().clone();
759 let mut rx = slot.state_rx.clone();
760 if let Ok(handle) = tokio::runtime::Handle::try_current() {
761 handle.spawn(async move {
762 let _ = rx.changed().await;
763 waker.wake();
764 });
765 } else {
766 waker.wake_by_ref();
767 }
768 return Poll::Pending;
769 }
770 BridgeState::Degraded(reason) => {
771 return Poll::Ready(Err(CamelError::ProcessorError(format!(
772 "JMS broker '{}' is degraded: {}",
773 self.broker_name, reason
774 ))));
775 }
776 BridgeState::Stopped => {
777 return Poll::Ready(Err(CamelError::ProcessorError(format!(
778 "JMS broker '{}' is stopped",
779 self.broker_name
780 ))));
781 }
782 }
783 }
784 Poll::Ready(Ok(()))
785 }
786
787 fn call(&mut self, exchange: Exchange) -> Self::Future {
788 let pool = Arc::clone(&self.pool);
789 let broker_name = self.broker_name.clone();
790 let endpoint_config = self.endpoint_config.clone();
791
792 Box::pin(async move {
793 let slot = pool.get_or_create_slot(&broker_name).await?;
794 let mut rx = slot.state_rx.clone();
795
796 loop {
797 let state = rx.borrow().clone();
798 match state {
799 BridgeState::Ready { channel } => {
800 let mut producer = JmsProducer::new(channel, endpoint_config.clone());
801 match producer.call(exchange).await {
802 Ok(done) => return Ok(done),
803 Err(first_err) if is_bridge_transport_error(&first_err) => {
804 warn!(
805 broker = %broker_name,
806 error = %first_err,
807 "JMS send transport error; refreshing channel (no automatic resend)"
808 );
809
810 if let Err(refresh_err) =
811 pool.refresh_slot_channel(&broker_name).await
812 {
813 warn!(
814 broker = %broker_name,
815 error = %refresh_err,
816 "JMS channel refresh failed; requesting bridge restart"
817 );
818 pool.restart_slot(&broker_name);
819 }
820
821 return Err(first_err);
826 }
827 Err(other_err) => return Err(other_err),
828 }
829 }
830 BridgeState::Degraded(reason) => {
831 return Err(CamelError::ProcessorError(format!(
832 "JMS broker '{}' is degraded: {}",
833 broker_name, reason
834 )));
835 }
836 BridgeState::Stopped => {
837 return Err(CamelError::ProcessorError(format!(
838 "JMS broker '{}' is stopped",
839 broker_name
840 )));
841 }
842 BridgeState::Starting | BridgeState::Restarting { .. } => {
843 if rx.changed().await.is_err() {
844 return Err(CamelError::ProcessorError(format!(
845 "JMS broker '{}' state channel closed",
846 broker_name
847 )));
848 }
849 }
850 }
851 }
852 })
853 }
854}
855
/// Returns `url` with every `userinfo@` part replaced by `***@`, for safe logging.
///
/// Every `://` occurrence is treated as the start of an authority, so composite URLs
/// such as `failover:(tcp://u:p@a:1,tcp://u:p@b:2)` have all embedded credentials
/// redacted. Within an authority (which ends at the first `/`, `?` or `#`) the LAST `@`
/// is the separator, so passwords containing `@` are hidden completely, and an `@` in
/// the path or query is left untouched. URLs without credentials are returned unchanged.
fn redact_url(url: &str) -> String {
    let mut redacted = String::with_capacity(url.len());
    let mut remaining = url;

    while let Some(pos) = remaining.find("://") {
        let (head, tail) = remaining.split_at(pos + 3);
        redacted.push_str(head);

        // Userinfo can only live in the authority component.
        let authority_end = tail
            .find(|c: char| matches!(c, '/' | '?' | '#'))
            .unwrap_or(tail.len());

        remaining = match tail[..authority_end].rfind('@') {
            Some(at_pos) => {
                redacted.push_str("***@");
                &tail[at_pos + 1..]
            }
            None => tail,
        };
    }

    redacted.push_str(remaining);
    redacted
}
872
873pub fn is_bridge_transport_error(err: &CamelError) -> bool {
874 match err {
879 CamelError::ProcessorError(msg) => msg.starts_with(BRIDGE_TRANSPORT_ERROR_PREFIX),
880 _ => false,
881 }
882}
883
884#[cfg(test)]
887mod tests {
888 use camel_component_api::test_support::PanicRuntimeObservability;
    /// Runtime-observability stub for tests, backed by `PanicRuntimeObservability`.
    // NOTE(review): identical in body to `rt()` in this module; one of the two could go.
    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
        std::sync::Arc::new(PanicRuntimeObservability)
    }
    /// Runtime-observability stub for tests, backed by `PanicRuntimeObservability`.
    // NOTE(review): identical in body to `test_rt()` in this module; one of the two could go.
    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
        std::sync::Arc::new(PanicRuntimeObservability)
    }
895
896 use super::*;
897 use crate::config::{BrokerConfig, JmsPoolConfig};
898 use std::collections::HashMap;
899
900 #[test]
901 fn from_config_accepts_empty_brokers() {
902 let pool_config = JmsPoolConfig::default();
903 let result = JmsBridgePool::from_config(pool_config);
904 assert!(result.is_ok());
905 }
906
907 #[test]
908 fn resolve_broker_name_with_explicit_name() {
909 let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
910 "tcp://localhost:61616",
911 BrokerType::ActiveMq,
912 ))
913 .unwrap();
914 assert_eq!(
915 pool.resolve_broker_name(Some("default")).unwrap(),
916 "default"
917 );
918 }
919
920 #[test]
921 fn resolve_broker_name_default() {
922 let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
923 "tcp://localhost:61616",
924 BrokerType::ActiveMq,
925 ))
926 .unwrap();
927 assert_eq!(pool.resolve_broker_name(None).unwrap(), "default");
928 }
929
930 #[test]
931 fn resolve_broker_name_unknown_returns_error() {
932 let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
933 "tcp://localhost:61616",
934 BrokerType::ActiveMq,
935 ))
936 .unwrap();
937 let err = pool.resolve_broker_name(Some("unknown")).unwrap_err();
938 assert!(
939 err.to_string().contains("Unknown JMS broker 'unknown'"),
940 "got: {}",
941 err
942 );
943 }
944
945 #[test]
946 fn resolve_broker_type_scheme_overrides() {
947 let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
948 "tcp://localhost:61616",
949 BrokerType::Generic,
950 ))
951 .unwrap();
952 assert_eq!(
953 pool.resolve_broker_type("activemq", "default"),
954 BrokerType::ActiveMq
955 );
956 assert_eq!(
957 pool.resolve_broker_type("artemis", "default"),
958 BrokerType::Artemis
959 );
960 assert_eq!(
961 pool.resolve_broker_type("jms", "default"),
962 BrokerType::Generic
963 );
964 }
965
966 #[test]
967 fn resolve_broker_type_activemq_scheme_overrides_artemis_config() {
968 let pool = JmsBridgePool::from_config(JmsPoolConfig {
969 brokers: HashMap::from([(
970 "main".to_string(),
971 BrokerConfig {
972 broker_url: "tcp://localhost:61616".to_string(),
973 broker_type: BrokerType::Artemis,
974 username: None,
975 password: None,
976 },
977 )]),
978 ..JmsPoolConfig::default()
979 })
980 .unwrap();
981 assert_eq!(
982 pool.resolve_broker_type("activemq", "main"),
983 BrokerType::ActiveMq
984 );
985 assert_eq!(pool.resolve_broker_type("jms", "main"), BrokerType::Artemis);
986 }
987
988 #[test]
989 fn create_endpoint_resolves_broker() {
990 let pool = Arc::new(
991 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
992 "tcp://localhost:61616",
993 BrokerType::ActiveMq,
994 ))
995 .unwrap(),
996 );
997 let component = JmsComponent::with_scheme("jms", pool);
998 let endpoint = component.create_endpoint(
999 "jms:queue:orders",
1000 &camel_component_api::NoOpComponentContext,
1001 );
1002 assert!(endpoint.is_ok(), "got: {:?}", endpoint.err());
1003 }
1004
1005 #[test]
1006 fn create_endpoint_rejects_wrong_scheme() {
1007 let pool = Arc::new(
1008 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1009 "tcp://localhost:61616",
1010 BrokerType::ActiveMq,
1011 ))
1012 .unwrap(),
1013 );
1014 let component = JmsComponent::with_scheme("jms", pool);
1015 let err = component
1016 .create_endpoint("kafka:orders", &camel_component_api::NoOpComponentContext)
1017 .err()
1018 .unwrap();
1019 assert!(
1020 err.to_string()
1021 .contains("expected scheme 'jms', 'activemq', or 'artemis'"),
1022 "got: {}",
1023 err
1024 );
1025 }
1026
1027 #[test]
1028 fn create_endpoint_with_explicit_broker_param() {
1029 let pool = Arc::new(
1030 JmsBridgePool::from_config(JmsPoolConfig {
1031 brokers: HashMap::from([
1032 (
1033 "primary".to_string(),
1034 BrokerConfig {
1035 broker_url: "tcp://primary:61616".to_string(),
1036 broker_type: BrokerType::ActiveMq,
1037 username: None,
1038 password: None,
1039 },
1040 ),
1041 (
1042 "secondary".to_string(),
1043 BrokerConfig {
1044 broker_url: "tcp://secondary:61616".to_string(),
1045 broker_type: BrokerType::Artemis,
1046 username: None,
1047 password: None,
1048 },
1049 ),
1050 ]),
1051 ..JmsPoolConfig::default()
1052 })
1053 .unwrap(),
1054 );
1055 let component = JmsComponent::with_scheme("jms", Arc::clone(&pool));
1056 let endpoint = component.create_endpoint(
1057 "jms:queue:orders?broker=secondary",
1058 &camel_component_api::NoOpComponentContext,
1059 );
1060 assert!(endpoint.is_ok(), "got: {:?}", endpoint.err());
1061 }
1062
    /// Five concurrent `get_or_create_slot` calls for the same broker must all finish
    /// within 5 seconds (results are ignored; only the absence of a deadlock is checked).
    #[tokio::test]
    async fn concurrent_get_or_create_slot_no_deadlock() {
        use tokio::time::timeout;

        /// Restores (or removes) an environment variable when dropped.
        struct EnvGuard {
            key: &'static str,
            prev: Option<std::ffi::OsString>,
        }
        impl Drop for EnvGuard {
            fn drop(&mut self) {
                // NOTE(review): env mutation is only sound while no other thread touches
                // the environment; tests run in parallel by default — confirm isolation.
                if let Some(v) = &self.prev {
                    unsafe { std::env::set_var(self.key, v) };
                } else {
                    unsafe { std::env::remove_var(self.key) };
                }
            }
        }

        // Presumably makes the bridge start fail fast by pointing the bridge binary at
        // `/bin/false` — verify against `ensure_binary`.
        let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
        let _guard = EnvGuard {
            key: env_key,
            prev: std::env::var_os(env_key),
        };
        // NOTE(review): another test in this module sets and restores the same variable;
        // when both run concurrently one guard can remove the value the other relies on.
        unsafe { std::env::set_var(env_key, "/bin/false") };

        let pool = Arc::new(
            JmsBridgePool::from_config(JmsPoolConfig {
                brokers: HashMap::from([(
                    "test".to_string(),
                    BrokerConfig {
                        broker_url: "tcp://localhost:61616".to_string(),
                        broker_type: BrokerType::ActiveMq,
                        username: None,
                        password: None,
                    },
                )]),
                bridge_start_timeout_ms: 100,
                ..JmsPoolConfig::default()
            })
            .unwrap(),
        );

        let handles: Vec<_> = (0..5)
            .map(|_| {
                let pool = Arc::clone(&pool);
                tokio::spawn(async move {
                    let _ = pool.get_or_create_slot("test").await;
                })
            })
            .collect();

        let result = timeout(Duration::from_secs(5), async {
            for h in handles {
                let _ = h.await;
            }
        })
        .await;

        assert!(result.is_ok(), "Concurrent get_or_create_slot deadlocked!");
    }
1126
    /// When the initial bridge start fails, the first producer call must return an
    /// error containing "is degraded".
    #[tokio::test]
    async fn lazy_producer_reports_degraded_when_bridge_start_fails() {
        use tower::Service;

        /// Restores (or removes) an environment variable when dropped.
        struct EnvGuard {
            key: &'static str,
            prev: Option<std::ffi::OsString>,
        }
        impl Drop for EnvGuard {
            fn drop(&mut self) {
                // NOTE(review): env mutation is only sound while no other thread touches
                // the environment; tests run in parallel by default — confirm isolation.
                if let Some(v) = &self.prev {
                    unsafe { std::env::set_var(self.key, v) };
                } else {
                    unsafe { std::env::remove_var(self.key) };
                }
            }
        }

        // Presumably makes the bridge start fail fast by pointing the bridge binary at
        // `/bin/false` — verify against `ensure_binary`.
        let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
        let _guard = EnvGuard {
            key: env_key,
            prev: std::env::var_os(env_key),
        };
        // NOTE(review): another test in this module sets and restores the same variable;
        // when both run concurrently one guard can remove the value the other relies on.
        unsafe { std::env::set_var(env_key, "/bin/false") };

        let pool = Arc::new(
            JmsBridgePool::from_config(JmsPoolConfig {
                brokers: HashMap::from([(
                    "default".to_string(),
                    BrokerConfig {
                        broker_url: "tcp://localhost:61616".to_string(),
                        broker_type: BrokerType::ActiveMq,
                        username: None,
                        password: None,
                    },
                )]),
                bridge_start_timeout_ms: 100,
                ..JmsPoolConfig::default()
            })
            .unwrap(),
        );

        let component = JmsComponent::with_scheme("jms", pool);
        let endpoint = component
            .create_endpoint(
                "jms:queue:orders",
                &camel_component_api::NoOpComponentContext,
            )
            .unwrap();
        let mut producer = endpoint
            .create_producer(rt(), &camel_component_api::ProducerContext::default())
            .unwrap();

        let mut exchange = Exchange::default();
        exchange.input.body = camel_component_api::Body::Text("hello".to_string());

        // `call` is used without `poll_ready`: the slot does not exist yet and is
        // created (and its start attempted) inside `call`.
        let err = producer.call(exchange).await.unwrap_err();
        assert!(err.to_string().contains("is degraded"), "got: {}", err);
    }
1189
    /// A send over a dead channel, on a slot that has no bridge process to refresh
    /// against, must fail and leave the slot in `Restarting`.
    #[tokio::test]
    async fn lazy_producer_requests_restart_when_refresh_unavailable() {
        use tokio::sync::watch;
        use tonic::transport::Endpoint as TonicEndpoint;
        use tower::Service;

        // Lazily-connected channel to port 1; the test expects sends over it to fail.
        let dead_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();

        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
            channel: dead_channel.clone(),
        });

        let pool = Arc::new(
            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
                "tcp://localhost:61616",
                BrokerType::ActiveMq,
            ))
            .unwrap(),
        );

        // Hand-built slot: `Ready` state but `process` is `None`, so the channel
        // refresh cannot succeed and the producer has to fall back to a restart request.
        let slot = Arc::new(BridgeSlot {
            name: "default".to_string(),
            broker_url: "tcp://localhost:61616".to_string(),
            broker_type: BrokerType::ActiveMq,
            credentials: None,
            state_rx: state_rx.clone(),
            state_tx: state_tx.clone(),
            process: Arc::new(tokio::sync::Mutex::new(None)),
            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
        });
        // Inserted directly, bypassing `get_or_create_slot`, so no health monitor runs.
        pool.slots.insert("default".to_string(), Arc::clone(&slot));

        let endpoint_config =
            crate::config::JmsEndpointConfig::from_uri("jms:queue:test-retry").unwrap();

        let mut producer = LazyJmsProducer {
            pool: Arc::clone(&pool),
            broker_name: "default".to_string(),
            endpoint_config,
            resolved_broker_type: BrokerType::ActiveMq,
            runtime: test_rt(),
        };

        let mut exchange = Exchange::default();
        exchange.input.body = camel_component_api::Body::Text("hello".to_string());

        let result = producer.call(exchange).await;
        assert!(result.is_err(), "expected send to fail");

        let state_after = state_rx.borrow().clone();
        assert!(
            matches!(state_after, BridgeState::Restarting { .. }),
            "slot must enter Restarting when refresh is unavailable; got: {:?}",
            state_after
        );
    }
1255
1256 #[test]
1259 fn transport_error_detects_send_error() {
1260 let err = CamelError::ProcessorError(format!(
1261 "{}send error: connection refused",
1262 BRIDGE_TRANSPORT_ERROR_PREFIX
1263 ));
1264 assert!(
1265 is_bridge_transport_error(&err),
1266 "send error must be classified as transport"
1267 );
1268 }
1269
1270 #[test]
1271 fn transport_error_detects_subscribe_error() {
1272 let err = CamelError::ProcessorError(format!(
1273 "{}subscribe error: stream reset",
1274 BRIDGE_TRANSPORT_ERROR_PREFIX
1275 ));
1276 assert!(
1277 is_bridge_transport_error(&err),
1278 "subscribe error must be classified as transport"
1279 );
1280 }
1281
1282 #[test]
1283 fn transport_error_rejects_business_errors() {
1284 let err = CamelError::ProcessorError("JMS broker 'main' is degraded: timeout".to_string());
1285 assert!(
1286 !is_bridge_transport_error(&err),
1287 "degraded state error must NOT be transport"
1288 );
1289 }
1290
1291 #[test]
1292 fn transport_error_rejects_config_errors() {
1293 let err = CamelError::Config("bridge_start_timeout_ms must be > 0".to_string());
1294 assert!(
1295 !is_bridge_transport_error(&err),
1296 "config error must NOT be transport"
1297 );
1298 }
1299
1300 #[test]
1301 fn transport_error_prefix_is_used_by_producer_and_consumer() {
1302 assert!(
1305 BRIDGE_TRANSPORT_ERROR_PREFIX.starts_with("JMS gRPC "),
1306 "prefix must start with 'JMS gRPC '"
1307 );
1308 }
1309
1310 #[tokio::test]
1313 async fn pool_enforces_max_bridges_limit() {
1314 use tokio::sync::watch;
1315
1316 let pool = Arc::new(
1317 JmsBridgePool::from_config(JmsPoolConfig {
1318 brokers: HashMap::from([
1319 (
1320 "b1".to_string(),
1321 BrokerConfig {
1322 broker_url: "tcp://b1:61616".to_string(),
1323 broker_type: BrokerType::ActiveMq,
1324 username: None,
1325 password: None,
1326 },
1327 ),
1328 (
1329 "b2".to_string(),
1330 BrokerConfig {
1331 broker_url: "tcp://b2:61616".to_string(),
1332 broker_type: BrokerType::ActiveMq,
1333 username: None,
1334 password: None,
1335 },
1336 ),
1337 (
1338 "b3".to_string(),
1339 BrokerConfig {
1340 broker_url: "tcp://b3:61616".to_string(),
1341 broker_type: BrokerType::ActiveMq,
1342 username: None,
1343 password: None,
1344 },
1345 ),
1346 ]),
1347 max_bridges: 2,
1348 ..JmsPoolConfig::default()
1349 })
1350 .unwrap(),
1351 );
1352
1353 for name in &["b1", "b2"] {
1356 let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1357 let slot = Arc::new(BridgeSlot {
1358 name: name.to_string(),
1359 broker_url: format!("tcp://{name}:61616"),
1360 broker_type: BrokerType::ActiveMq,
1361 credentials: None,
1362 state_rx,
1363 state_tx,
1364 process: Arc::new(tokio::sync::Mutex::new(None)),
1365 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1366 });
1367 pool.slots.insert(name.to_string(), slot);
1368 }
1369
1370 let err = pool.get_or_create_slot("b3").await.unwrap_err();
1372 assert!(
1373 err.to_string().contains("max_bridges"),
1374 "expected max_bridges error, got: {}",
1375 err
1376 );
1377 }
1378
1379 #[tokio::test]
1380 async fn pool_allows_slot_when_below_max_bridges() {
1381 use tokio::sync::watch;
1382
1383 let pool = Arc::new(
1384 JmsBridgePool::from_config(JmsPoolConfig {
1385 brokers: HashMap::from([(
1386 "b1".to_string(),
1387 BrokerConfig {
1388 broker_url: "tcp://b1:61616".to_string(),
1389 broker_type: BrokerType::ActiveMq,
1390 username: None,
1391 password: None,
1392 },
1393 )]),
1394 max_bridges: 2,
1395 bridge_start_timeout_ms: 100,
1396 ..JmsPoolConfig::default()
1397 })
1398 .unwrap(),
1399 );
1400
1401 let (state_tx, state_rx) = watch::channel(BridgeState::Degraded("test".to_string()));
1403 let slot = Arc::new(BridgeSlot {
1404 name: "b1".to_string(),
1405 broker_url: "tcp://b1:61616".to_string(),
1406 broker_type: BrokerType::ActiveMq,
1407 credentials: None,
1408 state_rx,
1409 state_tx,
1410 process: Arc::new(tokio::sync::Mutex::new(None)),
1411 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1412 });
1413 pool.slots.insert("b1".to_string(), slot);
1414
1415 let result = pool.get_or_create_slot("b1").await;
1418 assert!(result.is_ok(), "existing slot must be returned");
1419 }
1420
1421 #[tokio::test]
1424 async fn poll_ready_returns_pending_when_starting() {
1425 use tokio::sync::watch;
1426 use tower::Service;
1427
1428 let pool = Arc::new(
1429 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1430 "tcp://localhost:61616",
1431 BrokerType::ActiveMq,
1432 ))
1433 .unwrap(),
1434 );
1435
1436 let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1437 let slot = Arc::new(BridgeSlot {
1438 name: "default".to_string(),
1439 broker_url: "tcp://localhost:61616".to_string(),
1440 broker_type: BrokerType::ActiveMq,
1441 credentials: None,
1442 state_rx,
1443 state_tx,
1444 process: Arc::new(tokio::sync::Mutex::new(None)),
1445 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1446 });
1447 pool.slots.insert("default".to_string(), slot);
1448
1449 let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1450 let mut producer = LazyJmsProducer {
1451 pool: Arc::clone(&pool),
1452 broker_name: "default".to_string(),
1453 endpoint_config,
1454 resolved_broker_type: BrokerType::ActiveMq,
1455 runtime: test_rt(),
1456 };
1457
1458 let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1459 assert!(
1460 matches!(result, Poll::Pending),
1461 "poll_ready must be Pending when Starting; got: {:?}",
1462 result
1463 );
1464 }
1465
1466 #[tokio::test]
1467 async fn poll_ready_returns_error_when_degraded() {
1468 use tokio::sync::watch;
1469 use tower::Service;
1470
1471 let pool = Arc::new(
1472 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1473 "tcp://localhost:61616",
1474 BrokerType::ActiveMq,
1475 ))
1476 .unwrap(),
1477 );
1478
1479 let (state_tx, state_rx) =
1480 watch::channel(BridgeState::Degraded("health check failed".to_string()));
1481 let slot = Arc::new(BridgeSlot {
1482 name: "default".to_string(),
1483 broker_url: "tcp://localhost:61616".to_string(),
1484 broker_type: BrokerType::ActiveMq,
1485 credentials: None,
1486 state_rx,
1487 state_tx,
1488 process: Arc::new(tokio::sync::Mutex::new(None)),
1489 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1490 });
1491 pool.slots.insert("default".to_string(), slot);
1492
1493 let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1494 let mut producer = LazyJmsProducer {
1495 pool: Arc::clone(&pool),
1496 broker_name: "default".to_string(),
1497 endpoint_config,
1498 resolved_broker_type: BrokerType::ActiveMq,
1499 runtime: test_rt(),
1500 };
1501
1502 let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1503 assert!(
1504 matches!(result, Poll::Ready(Err(_))),
1505 "poll_ready must be Err when Degraded; got: {:?}",
1506 result
1507 );
1508 let err_msg = match result {
1509 Poll::Ready(Err(e)) => e.to_string(),
1510 _ => unreachable!(),
1511 };
1512 assert!(
1513 err_msg.contains("degraded"),
1514 "error must mention degraded: {}",
1515 err_msg
1516 );
1517 }
1518
1519 #[tokio::test]
1520 async fn poll_ready_returns_error_when_stopped() {
1521 use tokio::sync::watch;
1522 use tower::Service;
1523
1524 let pool = Arc::new(
1525 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1526 "tcp://localhost:61616",
1527 BrokerType::ActiveMq,
1528 ))
1529 .unwrap(),
1530 );
1531
1532 let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
1533 let slot = Arc::new(BridgeSlot {
1534 name: "default".to_string(),
1535 broker_url: "tcp://localhost:61616".to_string(),
1536 broker_type: BrokerType::ActiveMq,
1537 credentials: None,
1538 state_rx,
1539 state_tx,
1540 process: Arc::new(tokio::sync::Mutex::new(None)),
1541 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1542 });
1543 pool.slots.insert("default".to_string(), slot);
1544
1545 let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1546 let mut producer = LazyJmsProducer {
1547 pool: Arc::clone(&pool),
1548 broker_name: "default".to_string(),
1549 endpoint_config,
1550 resolved_broker_type: BrokerType::ActiveMq,
1551 runtime: test_rt(),
1552 };
1553
1554 let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1555 assert!(
1556 matches!(result, Poll::Ready(Err(_))),
1557 "poll_ready must be Err when Stopped; got: {:?}",
1558 result
1559 );
1560 }
1561
1562 #[tokio::test]
1563 async fn poll_ready_returns_ready_when_slot_ready() {
1564 use tokio::sync::watch;
1565 use tonic::transport::Endpoint as TonicEndpoint;
1566 use tower::Service;
1567
1568 let pool = Arc::new(
1569 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1570 "tcp://localhost:61616",
1571 BrokerType::ActiveMq,
1572 ))
1573 .unwrap(),
1574 );
1575
1576 let lazy_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();
1577 let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1578 channel: lazy_channel,
1579 });
1580 let slot = Arc::new(BridgeSlot {
1581 name: "default".to_string(),
1582 broker_url: "tcp://localhost:61616".to_string(),
1583 broker_type: BrokerType::ActiveMq,
1584 credentials: None,
1585 state_rx,
1586 state_tx,
1587 process: Arc::new(tokio::sync::Mutex::new(None)),
1588 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1589 });
1590 pool.slots.insert("default".to_string(), slot);
1591
1592 let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1593 let mut producer = LazyJmsProducer {
1594 pool: Arc::clone(&pool),
1595 broker_name: "default".to_string(),
1596 endpoint_config,
1597 resolved_broker_type: BrokerType::ActiveMq,
1598 runtime: test_rt(),
1599 };
1600
1601 let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1602 assert!(
1603 matches!(result, Poll::Ready(Ok(()))),
1604 "poll_ready must be Ready(Ok) when bridge is Ready; got: {:?}",
1605 result
1606 );
1607 }
1608
1609 #[tokio::test]
1610 async fn poll_ready_returns_ready_when_no_slot_exists() {
1611 use tower::Service;
1612
1613 let pool = Arc::new(
1614 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1615 "tcp://localhost:61616",
1616 BrokerType::ActiveMq,
1617 ))
1618 .unwrap(),
1619 );
1620
1621 let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1622 let mut producer = LazyJmsProducer {
1623 pool: Arc::clone(&pool),
1624 broker_name: "default".to_string(),
1625 endpoint_config,
1626 resolved_broker_type: BrokerType::ActiveMq,
1627 runtime: test_rt(),
1628 };
1629
1630 let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1632 assert!(
1633 matches!(result, Poll::Ready(Ok(()))),
1634 "poll_ready must be Ready(Ok) when no slot exists; got: {:?}",
1635 result
1636 );
1637 }
1638
1639 #[tokio::test]
1642 async fn pool_shutdown_awaits_health_monitor() {
1643 use tokio::sync::watch;
1644
1645 let pool = Arc::new(
1646 JmsBridgePool::from_config(JmsPoolConfig {
1647 brokers: HashMap::from([(
1648 "default".to_string(),
1649 BrokerConfig {
1650 broker_url: "tcp://localhost:61616".to_string(),
1651 broker_type: BrokerType::ActiveMq,
1652 username: None,
1653 password: None,
1654 },
1655 )]),
1656 health_check_interval_ms: 100,
1657 ..JmsPoolConfig::default()
1658 })
1659 .unwrap(),
1660 );
1661
1662 let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1664 channel: tonic::transport::Endpoint::from_static("http://127.0.0.1:1").connect_lazy(),
1665 });
1666 let monitor_handle_ref: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>> =
1667 Arc::new(tokio::sync::Mutex::new(None));
1668
1669 let state_rx_clone = state_rx.clone();
1671 let handle = tokio::spawn(async move {
1672 loop {
1673 if matches!(*state_rx_clone.borrow(), BridgeState::Stopped) {
1674 break;
1675 }
1676 tokio::time::sleep(Duration::from_millis(50)).await;
1677 }
1678 });
1679 *monitor_handle_ref.lock().await = Some(handle);
1680
1681 let slot = Arc::new(BridgeSlot {
1682 name: "default".to_string(),
1683 broker_url: "tcp://localhost:61616".to_string(),
1684 broker_type: BrokerType::ActiveMq,
1685 credentials: None,
1686 state_rx,
1687 state_tx,
1688 process: Arc::new(tokio::sync::Mutex::new(None)),
1689 health_monitor_handle: monitor_handle_ref,
1690 });
1691 pool.slots.insert("default".to_string(), slot);
1692
1693 let result = pool.shutdown().await;
1695 let _ = result;
1697 }
1698
1699 #[tokio::test]
1700 async fn health_monitor_handle_stored_after_spawn() {
1701 use tokio::sync::watch;
1702
1703 let pool = Arc::new(
1704 JmsBridgePool::from_config(JmsPoolConfig {
1705 brokers: HashMap::from([(
1706 "default".to_string(),
1707 BrokerConfig {
1708 broker_url: "tcp://localhost:61616".to_string(),
1709 broker_type: BrokerType::ActiveMq,
1710 username: None,
1711 password: None,
1712 },
1713 )]),
1714 health_check_interval_ms: 100,
1715 bridge_start_timeout_ms: 100,
1716 ..JmsPoolConfig::default()
1717 })
1718 .unwrap(),
1719 );
1720
1721 let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
1722 let slot = Arc::new(BridgeSlot {
1723 name: "default".to_string(),
1724 broker_url: "tcp://localhost:61616".to_string(),
1725 broker_type: BrokerType::ActiveMq,
1726 credentials: None,
1727 state_rx,
1728 state_tx,
1729 process: Arc::new(tokio::sync::Mutex::new(None)),
1730 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1731 });
1732 pool.slots.insert("default".to_string(), Arc::clone(&slot));
1733
1734 pool.spawn_health_monitor(Arc::clone(&slot)).await;
1735
1736 tokio::time::sleep(Duration::from_millis(50)).await;
1737 let guard = slot.health_monitor_handle.lock().await;
1738 assert!(
1739 guard.is_some(),
1740 "health monitor handle must be stored after spawn_health_monitor"
1741 );
1742 }
1743
1744 #[test]
1747 fn redact_url_strips_userinfo_with_password() {
1748 assert_eq!(
1749 redact_url("tcp://admin:s3cret@broker:61616"),
1750 "tcp://***@broker:61616"
1751 );
1752 }
1753
1754 #[test]
1755 fn redact_url_strips_userinfo_without_password() {
1756 assert_eq!(
1757 redact_url("tcp://admin@broker:61616"),
1758 "tcp://***@broker:61616"
1759 );
1760 }
1761
1762 #[test]
1763 fn redact_url_passes_clean_url_unchanged() {
1764 assert_eq!(redact_url("tcp://localhost:61616"), "tcp://localhost:61616");
1765 }
1766
1767 #[test]
1768 fn redact_url_handles_ssl_scheme() {
1769 assert_eq!(
1770 redact_url("ssl://user:pass@secure-broker:61617"),
1771 "ssl://***@secure-broker:61617"
1772 );
1773 }
1774
1775 #[tokio::test]
1778 async fn concurrent_slot_creation_respects_max_bridges() {
1779 let pool = Arc::new(
1780 JmsBridgePool::from_config(JmsPoolConfig {
1781 brokers: HashMap::from([
1782 (
1783 "b1".to_string(),
1784 BrokerConfig {
1785 broker_url: "tcp://b1:61616".to_string(),
1786 broker_type: BrokerType::ActiveMq,
1787 username: None,
1788 password: None,
1789 },
1790 ),
1791 (
1792 "b2".to_string(),
1793 BrokerConfig {
1794 broker_url: "tcp://b2:61616".to_string(),
1795 broker_type: BrokerType::ActiveMq,
1796 username: None,
1797 password: None,
1798 },
1799 ),
1800 (
1801 "b3".to_string(),
1802 BrokerConfig {
1803 broker_url: "tcp://b3:61616".to_string(),
1804 broker_type: BrokerType::ActiveMq,
1805 username: None,
1806 password: None,
1807 },
1808 ),
1809 ]),
1810 max_bridges: 2,
1811 bridge_start_timeout_ms: 100,
1812 ..JmsPoolConfig::default()
1813 })
1814 .unwrap(),
1815 );
1816
1817 let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1818 for name in &["b1", "b2"] {
1819 let slot = Arc::new(BridgeSlot {
1820 name: name.to_string(),
1821 broker_url: format!("tcp://{name}:61616"),
1822 broker_type: BrokerType::ActiveMq,
1823 credentials: None,
1824 state_rx: state_rx.clone(),
1825 state_tx: state_tx.clone(),
1826 process: Arc::new(tokio::sync::Mutex::new(None)),
1827 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1828 });
1829 pool.slots.insert(name.to_string(), slot);
1830 }
1831
1832 assert_eq!(pool.slots.len(), 2);
1833
1834 let guard = pool.bridge_create_lock.lock().await;
1835 let total_count = pool.slots.len();
1836 assert!(total_count >= pool.max_bridges as usize);
1837 let result = if total_count >= pool.max_bridges as usize {
1838 Err(CamelError::Config(format!(
1839 "JMS bridge limit reached: {total_count} bridge(s) >= max_bridges ({})",
1840 pool.max_bridges
1841 )))
1842 } else {
1843 Ok(())
1844 };
1845 drop(guard);
1846
1847 assert!(result.is_err(), "3rd broker should be rejected");
1848 let err_msg = result.unwrap_err().to_string();
1849 assert!(
1850 err_msg.contains("max_bridges"),
1851 "error must mention max_bridges, got: {err_msg}"
1852 );
1853 }
1854
1855 #[tokio::test]
1858 async fn transport_error_refreshes_channel_but_does_not_resend() {
1859 use tokio::sync::watch;
1860 use tonic::transport::Endpoint as TonicEndpoint;
1861 use tower::Service;
1862
1863 let dead_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();
1864
1865 let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1866 channel: dead_channel.clone(),
1867 });
1868
1869 let pool = Arc::new(
1870 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1871 "tcp://localhost:61616",
1872 BrokerType::ActiveMq,
1873 ))
1874 .unwrap(),
1875 );
1876
1877 let slot = Arc::new(BridgeSlot {
1878 name: "default".to_string(),
1879 broker_url: "tcp://localhost:61616".to_string(),
1880 broker_type: BrokerType::ActiveMq,
1881 credentials: None,
1882 state_rx: state_rx.clone(),
1883 state_tx: state_tx.clone(),
1884 process: Arc::new(tokio::sync::Mutex::new(None)),
1885 health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1886 });
1887 pool.slots.insert("default".to_string(), Arc::clone(&slot));
1888
1889 let endpoint_config =
1890 crate::config::JmsEndpointConfig::from_uri("jms:queue:test-no-resend").unwrap();
1891
1892 let mut producer = LazyJmsProducer {
1893 pool: Arc::clone(&pool),
1894 broker_name: "default".to_string(),
1895 endpoint_config,
1896 resolved_broker_type: BrokerType::ActiveMq,
1897 runtime: test_rt(),
1898 };
1899
1900 let mut exchange = Exchange::default();
1901 exchange.input.body = camel_component_api::Body::Text("hello".to_string());
1902
1903 let result = producer.call(exchange).await;
1904 assert!(result.is_err(), "expected send to fail");
1905
1906 let state_after = state_rx.borrow().clone();
1907 assert!(
1908 matches!(state_after, BridgeState::Restarting { .. }),
1909 "slot must enter Restarting; got: {:?}",
1910 state_after
1911 );
1912
1913 let err_msg = result.unwrap_err().to_string();
1914 assert!(
1915 err_msg.contains(BRIDGE_TRANSPORT_ERROR_PREFIX),
1916 "error must be original transport error, got: {}",
1917 err_msg
1918 );
1919 }
1920}