1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use camel_component_api::{
6 Body, CamelError, ConcurrencyModel, Consumer, ConsumerContext, Exchange, Message,
7 NetworkRetryPolicy, RuntimeObservability,
8};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tonic::transport::Channel;
12use tracing::{error, info, warn};
13use uuid::Uuid;
14
15use crate::component::{
16 BRIDGE_TRANSPORT_ERROR_PREFIX, BridgeState, JmsBridgePool, is_bridge_transport_error,
17};
18use crate::config::{DestinationType, ExchangePattern, JmsEndpointConfig, JmsTransactionMode};
19use crate::headers::apply_jms_headers;
20use crate::proto::{JmsMessage, SubscribeRequest, bridge_service_client::BridgeServiceClient};
21
22pub struct JmsConsumer {
23 pool: Arc<JmsBridgePool>,
24 broker_name: String,
25 endpoint_config: JmsEndpointConfig,
26 reconnect: NetworkRetryPolicy,
27 cancel_token: Option<CancellationToken>,
28 task_handles: Vec<JoinHandle<Result<(), CamelError>>>,
29 runtime: Arc<dyn RuntimeObservability>,
32}
33
34impl JmsConsumer {
35 pub fn new(
36 pool: Arc<JmsBridgePool>,
37 broker_name: String,
38 endpoint_config: JmsEndpointConfig,
39 reconnect: NetworkRetryPolicy,
40 runtime: Arc<dyn RuntimeObservability>,
41 ) -> Self {
42 Self {
43 pool,
44 broker_name,
45 endpoint_config,
46 reconnect,
47 cancel_token: None,
48 task_handles: Vec::new(),
49 runtime,
50 }
51 }
52}
53
54fn build_exchange(msg: &JmsMessage, map_jms_headers: bool) -> Exchange {
55 let body_bytes = msg.body.clone();
56 let body = if msg.content_type.starts_with("text/") {
57 match String::from_utf8(body_bytes.clone()) {
58 Ok(s) => Body::Text(s),
59 Err(_) => Body::Bytes(bytes::Bytes::from(body_bytes)),
60 }
61 } else if msg.content_type.contains("json") {
62 match serde_json::from_slice::<serde_json::Value>(&body_bytes) {
63 Ok(v) => Body::Json(v),
64 Err(_) => Body::Bytes(bytes::Bytes::from(body_bytes)),
65 }
66 } else if body_bytes.is_empty() {
67 Body::Empty
68 } else {
69 Body::Bytes(bytes::Bytes::from(body_bytes))
70 };
71
72 let mut exchange = Exchange::new(Message::new(body));
73 if map_jms_headers {
75 apply_jms_headers(&mut exchange, msg);
76 }
77 exchange
78}
79
80fn destination(endpoint_config: &JmsEndpointConfig) -> String {
81 format!(
82 "{}:{}",
83 match endpoint_config.destination_type {
84 DestinationType::Queue => "queue",
85 DestinationType::Topic => "topic",
86 },
87 endpoint_config.destination_name
88 )
89}
90
91async fn await_ready_channel(
92 pool: &JmsBridgePool,
93 broker_name: &str,
94) -> Result<Channel, CamelError> {
95 let slot = pool.get_or_create_slot(broker_name).await?;
96 let mut rx = slot.state_rx.clone();
97
98 loop {
99 match &*rx.borrow() {
100 BridgeState::Ready { channel } => return Ok(channel.clone()),
101 BridgeState::Stopped => {
102 return Err(CamelError::ProcessorError(format!(
103 "JMS broker '{}' is stopped",
104 broker_name
105 )));
106 }
107 _ => {}
108 }
109
110 if rx.changed().await.is_err() {
111 return Err(CamelError::ProcessorError(format!(
112 "JMS broker '{}' state channel closed",
113 broker_name
114 )));
115 }
116 }
117}
118
119#[allow(clippy::too_many_arguments)]
122async fn consumer_loop(
123 pool: &JmsBridgePool,
124 broker_name: &str,
125 endpoint_config: &JmsEndpointConfig,
126 reconnect: &NetworkRetryPolicy,
127 cancel: CancellationToken,
128 ctx: &ConsumerContext,
129 idx: u32,
130 runtime: Arc<dyn RuntimeObservability>,
131) {
132 let destination = destination(endpoint_config);
133 let map_headers = endpoint_config.map_jms_headers;
134 let selector = endpoint_config.message_selector.clone();
135 let mut consecutive_transport_failures: u32 = 0;
136 let mut attempt: u32 = 0;
137
138 let _selector = selector;
141
142 loop {
155 let channel = tokio::select! {
156 _ = cancel.cancelled() => {
157 info!(
158 broker = %broker_name,
159 destination = %destination,
160 consumer_idx = idx,
161 "JMS consumer cancelled"
162 );
163 break;
164 }
165 _ = ctx.cancelled() => {
166 info!(
167 broker = %broker_name,
168 destination = %destination,
169 consumer_idx = idx,
170 "JMS consumer context cancelled"
171 );
172 break;
173 }
174 result = await_ready_channel(pool, broker_name) => {
175 match result {
176 Ok(channel) => channel,
177 Err(e) => {
178 warn!(
179 broker = %broker_name,
180 destination = %destination,
181 consumer_idx = idx,
182 error = %e,
183 "JMS consumer waiting for ready bridge failed"
184 );
185 attempt += 1;
186 if !reconnect.should_retry(attempt) {
187 warn!(
188 broker = %broker_name,
189 destination = %destination,
190 consumer_idx = idx,
191 attempts = attempt,
192 "JMS consumer max reconnect attempts reached; terminating"
193 );
194 return;
195 }
196 let delay = reconnect.delay_for(attempt - 1);
197 tokio::select! {
198 _ = cancel.cancelled() => break,
199 _ = ctx.cancelled() => break,
200 _ = tokio::time::sleep(delay) => {}
201 }
202 continue;
203 }
204 }
205 }
206 };
207
208 let mut client = BridgeServiceClient::new(channel);
209 let mut stream = match client
210 .subscribe(SubscribeRequest {
211 destination: destination.clone(),
212 subscription_id: Uuid::new_v4().to_string(),
213 })
214 .await
215 .map_err(|e| {
216 CamelError::ProcessorError(format!(
217 "{BRIDGE_TRANSPORT_ERROR_PREFIX}subscribe error: {e}"
218 ))
219 }) {
220 Ok(resp) => {
221 consecutive_transport_failures = 0;
222 attempt = 0;
223 info!(
224 broker = %broker_name,
225 destination = %destination,
226 consumer_idx = idx,
227 "JMS consumer subscribed successfully"
228 );
229 resp.into_inner()
230 }
231 Err(e) => {
232 if is_bridge_transport_error(&e) {
233 consecutive_transport_failures += 1;
234 if consecutive_transport_failures >= 2 {
235 warn!(
236 broker = %broker_name,
237 destination = %destination,
238 consumer_idx = idx,
239 failures = consecutive_transport_failures,
240 "JMS subscribe transport failures exceeded threshold; refreshing channel"
241 );
242 if let Err(refresh_err) = pool.refresh_slot_channel(broker_name).await {
243 warn!(
244 broker = %broker_name,
245 destination = %destination,
246 consumer_idx = idx,
247 error = %refresh_err,
248 "JMS channel refresh failed; requesting bridge restart"
249 );
250 pool.restart_slot(broker_name);
251 }
252 consecutive_transport_failures = 0;
253 }
254 } else {
255 consecutive_transport_failures = 0;
256 }
257 warn!(
258 broker = %broker_name,
259 destination = %destination,
260 consumer_idx = idx,
261 error = %e,
262 "JMS subscribe failed; retrying"
263 );
264 attempt += 1;
265 if !reconnect.should_retry(attempt) {
266 warn!(
267 broker = %broker_name,
268 destination = %destination,
269 consumer_idx = idx,
270 attempts = attempt,
271 "JMS consumer max subscribe attempts reached; terminating"
272 );
273 return;
274 }
275 let delay = reconnect.delay_for(attempt - 1);
276 tokio::select! {
277 _ = cancel.cancelled() => break,
278 _ = ctx.cancelled() => break,
279 _ = tokio::time::sleep(delay) => {}
280 }
281 continue;
282 }
283 };
284
285 loop {
286 tokio::select! {
287 _ = cancel.cancelled() => {
288 info!(
289 broker = %broker_name,
290 destination = %destination,
291 consumer_idx = idx,
292 "JMS consumer cancelled"
293 );
294 return;
295 }
296 _ = ctx.cancelled() => {
297 info!(
298 broker = %broker_name,
299 destination = %destination,
300 consumer_idx = idx,
301 "JMS consumer context cancelled"
302 );
303 return;
304 }
305 msg = stream.message() => {
306 match msg {
307 Ok(Some(jms_msg)) => {
308 consecutive_transport_failures = 0;
309 attempt = 0;
310 let exchange = build_exchange(&jms_msg, map_headers);
311 if let Err(e) = ctx.send(exchange).await {
312 runtime.metrics().increment_errors(
313 ctx.route_id(),
314 "b-prime:jms:consumer-send",
315 );
316 error!(
318 broker = %broker_name,
319 consumer_idx = idx,
320 "JMS consumer route error: {e}"
321 );
322 }
323 }
324 Ok(None) => {
325 info!(
326 broker = %broker_name,
327 destination = %destination,
328 consumer_idx = idx,
329 "JMS stream ended; reconnecting"
330 );
331 break;
332 }
333 Err(e) => {
334 let subscribe_err = CamelError::ProcessorError(format!(
335 "{BRIDGE_TRANSPORT_ERROR_PREFIX}subscribe error: {e}"
336 ));
337 if is_bridge_transport_error(&subscribe_err) {
338 consecutive_transport_failures += 1;
339 if consecutive_transport_failures >= 2 {
340 warn!(
341 broker = %broker_name,
342 destination = %destination,
343 consumer_idx = idx,
344 failures = consecutive_transport_failures,
345 "JMS stream transport failures exceeded threshold; refreshing channel"
346 );
347 if let Err(refresh_err) =
348 pool.refresh_slot_channel(broker_name).await
349 {
350 warn!(
351 broker = %broker_name,
352 destination = %destination,
353 consumer_idx = idx,
354 error = %refresh_err,
355 "JMS channel refresh failed; requesting bridge restart"
356 );
357 pool.restart_slot(broker_name);
358 }
359 consecutive_transport_failures = 0;
360 }
361 } else {
362 consecutive_transport_failures = 0;
363 }
364 warn!(
365 broker = %broker_name,
366 destination = %destination,
367 consumer_idx = idx,
368 error = %subscribe_err,
369 "JMS stream error; reconnecting"
370 );
371 break;
372 }
373 }
374 }
375 }
376 }
377
378 attempt += 1;
379 if !reconnect.should_retry(attempt) {
380 warn!(
381 broker = %broker_name,
382 destination = %destination,
383 consumer_idx = idx,
384 attempts = attempt,
385 "JMS consumer max reconnect attempts reached; terminating"
386 );
387 break;
388 }
389 let delay = reconnect.delay_for(attempt - 1);
390 tokio::select! {
391 _ = cancel.cancelled() => break,
392 _ = ctx.cancelled() => break,
393 _ = tokio::time::sleep(delay) => {}
394 }
395 }
396}
397
398#[async_trait]
399impl Consumer for JmsConsumer {
400 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
401 if self.cancel_token.is_some() {
403 return Err(CamelError::EndpointCreationFailed(
404 "JMS consumer already started".into(),
405 ));
406 }
407
408 if self.endpoint_config.transaction_mode == JmsTransactionMode::Session {
410 warn!("JMS session transaction mode not yet implemented; using None");
411 }
412
413 if self.endpoint_config.exchange_pattern == ExchangePattern::InOut {
415 warn!("JMS InOut pattern not yet implemented");
416 }
417
418 {
423 let slot = self.pool.get_or_create_slot(&self.broker_name).await?;
424 match &*slot.state_rx.borrow() {
425 BridgeState::Ready { .. } => {} BridgeState::Degraded(reason) => {
427 return Err(CamelError::ProcessorError(format!(
428 "JMS bridge not available: {}",
429 reason
430 )));
431 }
432 other => {
433 return Err(CamelError::ProcessorError(format!(
434 "JMS bridge not available: {:?}",
435 other
436 )));
437 }
438 }
439 }
440
441 let pool = Arc::clone(&self.pool);
442 let broker_name = self.broker_name.clone();
443 let endpoint_config = self.endpoint_config.clone();
444 let reconnect = self.reconnect.clone();
445 let cancel = CancellationToken::new();
446 self.cancel_token = Some(cancel.clone());
447
448 let consumer_count = endpoint_config.concurrent_consumers;
450 let runtime = self.runtime.clone();
451 let handles: Vec<JoinHandle<Result<(), CamelError>>> = (0..consumer_count)
453 .map(|idx| {
454 let pool = Arc::clone(&pool);
455 let broker_name = broker_name.clone();
456 let endpoint_config = endpoint_config.clone();
457 let cancel = cancel.clone();
458 let ctx = ctx.clone();
459 let reconnect = reconnect.clone();
460 let runtime = runtime.clone();
461
462 tokio::spawn(async move {
463 consumer_loop(
464 &pool,
465 &broker_name,
466 &endpoint_config,
467 &reconnect,
468 cancel,
469 &ctx,
470 idx,
471 runtime,
472 )
473 .await;
474 Ok(())
475 })
476 })
477 .collect();
478
479 self.task_handles = handles;
481
482 Ok(())
483 }
484
485 async fn stop(&mut self) -> Result<(), CamelError> {
486 if let Some(cancel) = self.cancel_token.take() {
487 cancel.cancel();
488 }
489 let handles = std::mem::take(&mut self.task_handles);
490 for mut handle in handles {
491 if tokio::time::timeout(Duration::from_secs(5), &mut handle)
492 .await
493 .is_err()
494 {
495 handle.abort();
496 let _ = handle.await;
497 warn!("JMS consumer task did not stop in 5s; aborted");
498 }
499 }
500 Ok(())
501 }
502
503 fn concurrency_model(&self) -> ConcurrencyModel {
504 ConcurrencyModel::Sequential
505 }
506
507 fn background_task_handle(
508 &mut self,
509 ) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
510 self.task_handles.pop()
513 }
514}
515
516#[cfg(test)]
517mod tests {
518 use super::*;
519 use crate::BrokerType;
520 use crate::config::{JmsPoolConfig, jms_reconnect_default};
521 use camel_component_api::test_support::PanicRuntimeObservability;
522 use tokio::sync::mpsc;
523
524 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
525 std::sync::Arc::new(PanicRuntimeObservability)
526 }
527
528 #[test]
529 fn build_exchange_text_body() {
530 let msg = JmsMessage {
531 message_id: "ID:1".to_string(),
532 body: b"hello world".to_vec(),
533 content_type: "text/plain".to_string(),
534 ..Default::default()
535 };
536 let ex = build_exchange(&msg, true);
537 assert!(matches!(ex.input.body, Body::Text(_)));
538 }
539
540 #[test]
541 fn build_exchange_binary_body() {
542 let msg = JmsMessage {
543 message_id: "ID:2".to_string(),
544 body: vec![0x00, 0x01, 0x02],
545 content_type: "application/octet-stream".to_string(),
546 ..Default::default()
547 };
548 let ex = build_exchange(&msg, true);
549 assert!(matches!(ex.input.body, Body::Bytes(_)));
550 }
551
552 #[test]
553 fn build_exchange_json_body() {
554 let msg = JmsMessage {
555 message_id: "ID:json".to_string(),
556 body: br#"{"ok":true}"#.to_vec(),
557 content_type: "application/json".to_string(),
558 ..Default::default()
559 };
560 let ex = build_exchange(&msg, true);
561 assert!(matches!(ex.input.body, Body::Json(_)));
562 }
563
564 #[test]
565 fn build_exchange_empty_body() {
566 let msg = JmsMessage {
567 message_id: "ID:3".to_string(),
568 body: vec![],
569 content_type: "".to_string(),
570 ..Default::default()
571 };
572 let ex = build_exchange(&msg, true);
573 assert!(matches!(ex.input.body, Body::Empty));
574 }
575
576 #[test]
577 fn build_exchange_without_header_mapping() {
578 let msg = JmsMessage {
580 message_id: "ID:header-test".to_string(),
581 correlation_id: "CORR:123".to_string(),
582 timestamp: 1700000000,
583 destination: "queue:orders".to_string(),
584 body: b"hello".to_vec(),
585 headers: Default::default(),
586 content_type: "text/plain".to_string(),
587 };
588 let ex = build_exchange(&msg, false);
589 assert!(ex.input.header("JMSMessageID").is_none());
590 assert!(ex.input.header("JMSCorrelationID").is_none());
591 assert!(ex.input.header("JMSTimestamp").is_none());
592 assert!(ex.input.header("JMSDestination").is_none());
593 assert!(ex.input.header("Content-Type").is_none());
594 }
595
596 #[tokio::test]
597 async fn stop_without_start_is_noop() {
598 let pool = Arc::new(
599 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
600 "tcp://localhost:61616",
601 BrokerType::Generic,
602 ))
603 .unwrap(),
604 );
605 let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
606 let mut consumer = JmsConsumer::new(
607 pool,
608 "default".to_string(),
609 endpoint_cfg,
610 jms_reconnect_default(),
611 rt(),
612 );
613 assert!(consumer.stop().await.is_ok());
614 }
615
616 #[tokio::test]
619 async fn consumer_double_start_returns_error() {
620 let pool = Arc::new(
621 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
622 "tcp://localhost:61616",
623 BrokerType::Generic,
624 ))
625 .unwrap(),
626 );
627 let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
628 let mut consumer = JmsConsumer::new(
629 pool,
630 "default".to_string(),
631 endpoint_cfg,
632 jms_reconnect_default(),
633 rt(),
634 );
635
636 consumer.cancel_token = Some(CancellationToken::new());
638
639 let (route_tx, _route_rx) = mpsc::channel(16);
640 let ctx = ConsumerContext::new(
641 route_tx,
642 CancellationToken::new(),
643 "jms-test-route".to_string(),
644 );
645 let result = consumer.start(ctx).await;
646 assert!(result.is_err(), "second start must return an error");
647 let msg = result.unwrap_err().to_string();
648 assert!(
649 msg.contains("already started"),
650 "error must mention already started: {}",
651 msg
652 );
653 }
654
655 #[tokio::test]
658 async fn test_jms_consumer_stop_joins() {
659 let pool = Arc::new(
660 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
661 "tcp://localhost:61616",
662 BrokerType::Generic,
663 ))
664 .unwrap(),
665 );
666 let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
667 let mut consumer = JmsConsumer::new(
668 pool,
669 "default".to_string(),
670 endpoint_cfg,
671 jms_reconnect_default(),
672 rt(),
673 );
674
675 let cancel = CancellationToken::new();
677 consumer.cancel_token = Some(cancel.clone());
678 consumer.task_handles = vec![tokio::spawn({
679 let cancel = cancel.clone();
680 async move {
681 cancel.cancelled().await;
683 Ok(())
684 }
685 })];
686
687 let result = consumer.stop().await;
689 assert!(result.is_ok(), "stop must succeed: {:?}", result.err());
690 assert!(
692 consumer.task_handles.is_empty(),
693 "task_handles must be cleared after stop"
694 );
695 }
696
697 #[tokio::test]
700 async fn stop_absorbs_consumer_task_panic() {
701 let pool = Arc::new(
702 JmsBridgePool::from_config(JmsPoolConfig::single_broker(
703 "tcp://localhost:61616",
704 BrokerType::Generic,
705 ))
706 .unwrap(),
707 );
708 let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
709 let mut consumer = JmsConsumer::new(
710 pool,
711 "default".to_string(),
712 endpoint_cfg,
713 jms_reconnect_default(),
714 rt(),
715 );
716
717 consumer.task_handles = vec![tokio::spawn(async {
719 panic!("simulated consumer panic");
720 })];
721 tokio::time::sleep(Duration::from_millis(50)).await;
723
724 let result = consumer.stop().await;
726 assert!(
727 result.is_ok(),
728 "stop must absorb panic and return Ok: {:?}",
729 result.err()
730 );
731 }
732
733 #[tokio::test]
736 async fn missing_bridge_returns_err_within_1s() {
737 struct EnvGuard {
738 key: &'static str,
739 prev: Option<std::ffi::OsString>,
740 }
741 impl Drop for EnvGuard {
742 fn drop(&mut self) {
743 if let Some(v) = &self.prev {
744 unsafe { std::env::set_var(self.key, v) };
745 } else {
746 unsafe { std::env::remove_var(self.key) };
747 }
748 }
749 }
750
751 let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
752 let _guard = EnvGuard {
753 key: env_key,
754 prev: std::env::var_os(env_key),
755 };
756 unsafe { std::env::set_var(env_key, "/bin/false") };
757
758 let pool = Arc::new(
759 JmsBridgePool::from_config(JmsPoolConfig {
760 brokers: std::collections::HashMap::from([(
761 "default".to_string(),
762 crate::config::BrokerConfig {
763 broker_url: "tcp://localhost:61616".to_string(),
764 broker_type: BrokerType::ActiveMq,
765 username: None,
766 password: None,
767 },
768 )]),
769 bridge_start_timeout_ms: 100,
770 ..JmsPoolConfig::default()
771 })
772 .unwrap(),
773 );
774 let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
775 let mut consumer = JmsConsumer::new(
776 pool,
777 "default".to_string(),
778 endpoint_cfg,
779 jms_reconnect_default(),
780 rt(),
781 );
782
783 let (route_tx, _route_rx) = mpsc::channel(16);
784 let ctx = ConsumerContext::new(
785 route_tx,
786 CancellationToken::new(),
787 "jms-test-route-2".to_string(),
788 );
789
790 let start = std::time::Instant::now();
791 let result = consumer.start(ctx).await;
792 let elapsed = start.elapsed();
793
794 assert!(result.is_err(), "expected Err when bridge missing, got Ok");
795 let msg = result.unwrap_err().to_string();
796 assert!(
797 msg.contains("JMS bridge not available"),
798 "error must mention bridge unavailability: {}",
799 msg
800 );
801 assert!(
802 elapsed < Duration::from_secs(1),
803 "must fail within 1s, took {:?}",
804 elapsed
805 );
806
807 let _ = consumer.stop().await;
809 }
810
811 #[tokio::test]
815 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
816 use std::sync::Arc;
817 use std::sync::atomic::{AtomicU32, Ordering};
818 use std::time::Duration;
819
820 let policy = NetworkRetryPolicy {
821 max_attempts: 3,
822 initial_delay: Duration::from_millis(1),
823 max_delay: Duration::from_millis(1),
824 multiplier: 1.0,
825 ..NetworkRetryPolicy::default()
826 };
827
828 let calls = Arc::new(AtomicU32::new(0));
829 let calls_clone = Arc::clone(&calls);
830 let mut attempt: u32 = 0;
831
832 loop {
833 calls_clone.fetch_add(1, Ordering::SeqCst);
834 let result: Result<(), ()> = Err(());
835 match result {
836 Ok(_) => {
837 attempt = 0;
838 break;
839 }
840 Err(_) => {
841 attempt += 1;
842 if !policy.should_retry(attempt) {
843 break;
844 }
845 let delay = policy.delay_for(attempt - 1);
846 tokio::time::sleep(delay).await;
847 continue;
848 }
849 }
850 }
851
852 assert_eq!(
853 calls.load(Ordering::SeqCst),
854 3,
855 "max_attempts=3 must yield exactly 3 invocations"
856 );
857 }
858}