Skip to main content

camel_component_jms/
consumer.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use camel_component_api::{
6    Body, CamelError, ConcurrencyModel, Consumer, ConsumerContext, Exchange, Message,
7    NetworkRetryPolicy, RuntimeObservability,
8};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tonic::transport::Channel;
12use tracing::{error, info, warn};
13use uuid::Uuid;
14
15use crate::component::{
16    BRIDGE_TRANSPORT_ERROR_PREFIX, BridgeState, JmsBridgePool, is_bridge_transport_error,
17};
18use crate::config::{DestinationType, ExchangePattern, JmsEndpointConfig, JmsTransactionMode};
19use crate::headers::apply_jms_headers;
20use crate::proto::{JmsMessage, SubscribeRequest, bridge_service_client::BridgeServiceClient};
21
22pub struct JmsConsumer {
23    pool: Arc<JmsBridgePool>,
24    broker_name: String,
25    endpoint_config: JmsEndpointConfig,
26    reconnect: NetworkRetryPolicy,
27    cancel_token: Option<CancellationToken>,
28    task_handles: Vec<JoinHandle<Result<(), CamelError>>>,
29    /// Phase C ADR-0012: used for `metrics().increment_errors(…)` at
30    /// consumer-send failure sites.
31    runtime: Arc<dyn RuntimeObservability>,
32}
33
34impl JmsConsumer {
35    pub fn new(
36        pool: Arc<JmsBridgePool>,
37        broker_name: String,
38        endpoint_config: JmsEndpointConfig,
39        reconnect: NetworkRetryPolicy,
40        runtime: Arc<dyn RuntimeObservability>,
41    ) -> Self {
42        Self {
43            pool,
44            broker_name,
45            endpoint_config,
46            reconnect,
47            cancel_token: None,
48            task_handles: Vec::new(),
49            runtime,
50        }
51    }
52}
53
54fn build_exchange(msg: &JmsMessage, map_jms_headers: bool) -> Exchange {
55    let body_bytes = msg.body.clone();
56    let body = if msg.content_type.starts_with("text/") {
57        match String::from_utf8(body_bytes.clone()) {
58            Ok(s) => Body::Text(s),
59            Err(_) => Body::Bytes(bytes::Bytes::from(body_bytes)),
60        }
61    } else if msg.content_type.contains("json") {
62        match serde_json::from_slice::<serde_json::Value>(&body_bytes) {
63            Ok(v) => Body::Json(v),
64            Err(_) => Body::Bytes(bytes::Bytes::from(body_bytes)),
65        }
66    } else if body_bytes.is_empty() {
67        Body::Empty
68    } else {
69        Body::Bytes(bytes::Bytes::from(body_bytes))
70    };
71
72    let mut exchange = Exchange::new(Message::new(body));
73    // JMS-018: gate header mapping behind config flag
74    if map_jms_headers {
75        apply_jms_headers(&mut exchange, msg);
76    }
77    exchange
78}
79
80fn destination(endpoint_config: &JmsEndpointConfig) -> String {
81    format!(
82        "{}:{}",
83        match endpoint_config.destination_type {
84            DestinationType::Queue => "queue",
85            DestinationType::Topic => "topic",
86        },
87        endpoint_config.destination_name
88    )
89}
90
91async fn await_ready_channel(
92    pool: &JmsBridgePool,
93    broker_name: &str,
94) -> Result<Channel, CamelError> {
95    let slot = pool.get_or_create_slot(broker_name).await?;
96    let mut rx = slot.state_rx.clone();
97
98    loop {
99        match &*rx.borrow() {
100            BridgeState::Ready { channel } => return Ok(channel.clone()),
101            BridgeState::Stopped => {
102                return Err(CamelError::ProcessorError(format!(
103                    "JMS broker '{}' is stopped",
104                    broker_name
105                )));
106            }
107            _ => {}
108        }
109
110        if rx.changed().await.is_err() {
111            return Err(CamelError::ProcessorError(format!(
112                "JMS broker '{}' state channel closed",
113                broker_name
114            )));
115        }
116    }
117}
118
119/// Background consumer loop, extracted so it can be spawned multiple times
120/// for concurrent consumers (JMS-011).
121#[allow(clippy::too_many_arguments)]
122async fn consumer_loop(
123    pool: &JmsBridgePool,
124    broker_name: &str,
125    endpoint_config: &JmsEndpointConfig,
126    reconnect: &NetworkRetryPolicy,
127    cancel: CancellationToken,
128    ctx: &ConsumerContext,
129    idx: u32,
130    runtime: Arc<dyn RuntimeObservability>,
131) {
132    let destination = destination(endpoint_config);
133    let map_headers = endpoint_config.map_jms_headers;
134    let selector = endpoint_config.message_selector.clone();
135    let mut consecutive_transport_failures: u32 = 0;
136    let mut attempt: u32 = 0;
137
138    // JMS-010: message selector — pass to bridge subscription when available
139    // TODO(JMS-010): pass selector to bridge subscription
140    let _selector = selector;
141
142    // Manual retry loops (not retry_async / retry_async_cancelable) — three
143    // retry sites nested inside this outer loop:
144    // - Retries are embedded in `tokio::select!` with cancellation tokens
145    //   (cancel.cancelled(), ctx.cancelled()); retry_async_cancelable
146    //   honours a single CancellationToken but does not interleave
147    //   cancellation checks between every operation step.
148    // - Inter-attempt side effects involve async lifecycle orchestration:
149    //   transport failure counting, pool.restart_slot() for channel
150    //   refresh, attempt counter shared across channel-await, subscribe,
151    //   and stream-error retry sites. These are async operations, not
152    //   just mutable borrows. The HRTB variant (bd rc-cvq) would only
153    //   solve &mut state, not async side-effect orchestration.
154    loop {
155        let channel = tokio::select! {
156            _ = cancel.cancelled() => {
157                info!(
158                    broker = %broker_name,
159                    destination = %destination,
160                    consumer_idx = idx,
161                    "JMS consumer cancelled"
162                );
163                break;
164            }
165            _ = ctx.cancelled() => {
166                info!(
167                    broker = %broker_name,
168                    destination = %destination,
169                    consumer_idx = idx,
170                    "JMS consumer context cancelled"
171                );
172                break;
173            }
174            result = await_ready_channel(pool, broker_name) => {
175                match result {
176                    Ok(channel) => channel,
177                    Err(e) => {
178                        warn!(
179                            broker = %broker_name,
180                            destination = %destination,
181                            consumer_idx = idx,
182                            error = %e,
183                            "JMS consumer waiting for ready bridge failed"
184                        );
185                        attempt += 1;
186                        if !reconnect.should_retry(attempt) {
187                            warn!(
188                                broker = %broker_name,
189                                destination = %destination,
190                                consumer_idx = idx,
191                                attempts = attempt,
192                                "JMS consumer max reconnect attempts reached; terminating"
193                            );
194                            return;
195                        }
196                        let delay = reconnect.delay_for(attempt - 1);
197                        tokio::select! {
198                            _ = cancel.cancelled() => break,
199                            _ = ctx.cancelled() => break,
200                            _ = tokio::time::sleep(delay) => {}
201                        }
202                        continue;
203                    }
204                }
205            }
206        };
207
208        let mut client = BridgeServiceClient::new(channel);
209        let mut stream = match client
210            .subscribe(SubscribeRequest {
211                destination: destination.clone(),
212                subscription_id: Uuid::new_v4().to_string(),
213            })
214            .await
215            .map_err(|e| {
216                CamelError::ProcessorError(format!(
217                    "{BRIDGE_TRANSPORT_ERROR_PREFIX}subscribe error: {e}"
218                ))
219            }) {
220            Ok(resp) => {
221                consecutive_transport_failures = 0;
222                attempt = 0;
223                info!(
224                    broker = %broker_name,
225                    destination = %destination,
226                    consumer_idx = idx,
227                    "JMS consumer subscribed successfully"
228                );
229                resp.into_inner()
230            }
231            Err(e) => {
232                if is_bridge_transport_error(&e) {
233                    consecutive_transport_failures += 1;
234                    if consecutive_transport_failures >= 2 {
235                        warn!(
236                            broker = %broker_name,
237                            destination = %destination,
238                            consumer_idx = idx,
239                            failures = consecutive_transport_failures,
240                            "JMS subscribe transport failures exceeded threshold; refreshing channel"
241                        );
242                        if let Err(refresh_err) = pool.refresh_slot_channel(broker_name).await {
243                            warn!(
244                                broker = %broker_name,
245                                destination = %destination,
246                                consumer_idx = idx,
247                                error = %refresh_err,
248                                "JMS channel refresh failed; requesting bridge restart"
249                            );
250                            pool.restart_slot(broker_name);
251                        }
252                        consecutive_transport_failures = 0;
253                    }
254                } else {
255                    consecutive_transport_failures = 0;
256                }
257                warn!(
258                    broker = %broker_name,
259                    destination = %destination,
260                    consumer_idx = idx,
261                    error = %e,
262                    "JMS subscribe failed; retrying"
263                );
264                attempt += 1;
265                if !reconnect.should_retry(attempt) {
266                    warn!(
267                        broker = %broker_name,
268                        destination = %destination,
269                        consumer_idx = idx,
270                        attempts = attempt,
271                        "JMS consumer max subscribe attempts reached; terminating"
272                    );
273                    return;
274                }
275                let delay = reconnect.delay_for(attempt - 1);
276                tokio::select! {
277                    _ = cancel.cancelled() => break,
278                    _ = ctx.cancelled() => break,
279                    _ = tokio::time::sleep(delay) => {}
280                }
281                continue;
282            }
283        };
284
285        loop {
286            tokio::select! {
287                _ = cancel.cancelled() => {
288                    info!(
289                        broker = %broker_name,
290                        destination = %destination,
291                        consumer_idx = idx,
292                        "JMS consumer cancelled"
293                    );
294                    return;
295                }
296                _ = ctx.cancelled() => {
297                    info!(
298                        broker = %broker_name,
299                        destination = %destination,
300                        consumer_idx = idx,
301                        "JMS consumer context cancelled"
302                    );
303                    return;
304                }
305                msg = stream.message() => {
306                    match msg {
307                        Ok(Some(jms_msg)) => {
308                            consecutive_transport_failures = 0;
309                            attempt = 0;
310                            let exchange = build_exchange(&jms_msg, map_headers);
311                            if let Err(e) = ctx.send(exchange).await {
312                                runtime.metrics().increment_errors(
313                                    ctx.route_id(),
314                                    "b-prime:jms:consumer-send",
315                                );
316                                // log-policy: outside-contract
317                                error!(
318                                    broker = %broker_name,
319                                    consumer_idx = idx,
320                                    "JMS consumer route error: {e}"
321                                );
322                            }
323                        }
324                        Ok(None) => {
325                            info!(
326                                broker = %broker_name,
327                                destination = %destination,
328                                consumer_idx = idx,
329                                "JMS stream ended; reconnecting"
330                            );
331                            break;
332                        }
333                        Err(e) => {
334                            let subscribe_err = CamelError::ProcessorError(format!(
335                                "{BRIDGE_TRANSPORT_ERROR_PREFIX}subscribe error: {e}"
336                            ));
337                            if is_bridge_transport_error(&subscribe_err) {
338                                consecutive_transport_failures += 1;
339                                if consecutive_transport_failures >= 2 {
340                                    warn!(
341                                        broker = %broker_name,
342                                        destination = %destination,
343                                        consumer_idx = idx,
344                                        failures = consecutive_transport_failures,
345                                        "JMS stream transport failures exceeded threshold; refreshing channel"
346                                    );
347                                    if let Err(refresh_err) =
348                                        pool.refresh_slot_channel(broker_name).await
349                                    {
350                                        warn!(
351                                            broker = %broker_name,
352                                            destination = %destination,
353                                            consumer_idx = idx,
354                                            error = %refresh_err,
355                                            "JMS channel refresh failed; requesting bridge restart"
356                                        );
357                                        pool.restart_slot(broker_name);
358                                    }
359                                    consecutive_transport_failures = 0;
360                                }
361                            } else {
362                                consecutive_transport_failures = 0;
363                            }
364                            warn!(
365                                broker = %broker_name,
366                                destination = %destination,
367                                consumer_idx = idx,
368                                error = %subscribe_err,
369                                "JMS stream error; reconnecting"
370                            );
371                            break;
372                        }
373                    }
374                }
375            }
376        }
377
378        attempt += 1;
379        if !reconnect.should_retry(attempt) {
380            warn!(
381                broker = %broker_name,
382                destination = %destination,
383                consumer_idx = idx,
384                attempts = attempt,
385                "JMS consumer max reconnect attempts reached; terminating"
386            );
387            break;
388        }
389        let delay = reconnect.delay_for(attempt - 1);
390        tokio::select! {
391            _ = cancel.cancelled() => break,
392            _ = ctx.cancelled() => break,
393            _ = tokio::time::sleep(delay) => {}
394        }
395    }
396}
397
398#[async_trait]
399impl Consumer for JmsConsumer {
400    async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
401        // Reject double-start (JMS-006)
402        if self.cancel_token.is_some() {
403            return Err(CamelError::EndpointCreationFailed(
404                "JMS consumer already started".into(),
405            ));
406        }
407
408        // JMS-012: warn when session transaction mode requested but not implemented
409        if self.endpoint_config.transaction_mode == JmsTransactionMode::Session {
410            warn!("JMS session transaction mode not yet implemented; using None");
411        }
412
413        // JMS-005: warn when InOut pattern requested but not implemented
414        if self.endpoint_config.exchange_pattern == ExchangePattern::InOut {
415            warn!("JMS InOut pattern not yet implemented");
416        }
417
418        // JMS-017: Pre-flight check — fail fast when bridge is unavailable.
419        // Probe the bridge slot before spawning the background loop so that
420        // missing bridge binaries or unreachable brokers are reported immediately
421        // instead of looping silently.
422        {
423            let slot = self.pool.get_or_create_slot(&self.broker_name).await?;
424            match &*slot.state_rx.borrow() {
425                BridgeState::Ready { .. } => {} // bridge is up — proceed
426                BridgeState::Degraded(reason) => {
427                    return Err(CamelError::ProcessorError(format!(
428                        "JMS bridge not available: {}",
429                        reason
430                    )));
431                }
432                other => {
433                    return Err(CamelError::ProcessorError(format!(
434                        "JMS bridge not available: {:?}",
435                        other
436                    )));
437                }
438            }
439        }
440
441        let pool = Arc::clone(&self.pool);
442        let broker_name = self.broker_name.clone();
443        let endpoint_config = self.endpoint_config.clone();
444        let reconnect = self.reconnect.clone();
445        let cancel = CancellationToken::new();
446        self.cancel_token = Some(cancel.clone());
447
448        // JMS-011: spawn concurrent consumer tasks
449        let consumer_count = endpoint_config.concurrent_consumers;
450        let runtime = self.runtime.clone();
451        // allow-unwrap: consumer_count validated >= 1 in from_uri
452        let handles: Vec<JoinHandle<Result<(), CamelError>>> = (0..consumer_count)
453            .map(|idx| {
454                let pool = Arc::clone(&pool);
455                let broker_name = broker_name.clone();
456                let endpoint_config = endpoint_config.clone();
457                let cancel = cancel.clone();
458                let ctx = ctx.clone();
459                let reconnect = reconnect.clone();
460                let runtime = runtime.clone();
461
462                tokio::spawn(async move {
463                    consumer_loop(
464                        &pool,
465                        &broker_name,
466                        &endpoint_config,
467                        &reconnect,
468                        cancel,
469                        &ctx,
470                        idx,
471                        runtime,
472                    )
473                    .await;
474                    Ok(())
475                })
476            })
477            .collect();
478
479        // Store all handles so stop() can abort/join every concurrent consumer.
480        self.task_handles = handles;
481
482        Ok(())
483    }
484
485    async fn stop(&mut self) -> Result<(), CamelError> {
486        if let Some(cancel) = self.cancel_token.take() {
487            cancel.cancel();
488        }
489        let handles = std::mem::take(&mut self.task_handles);
490        for mut handle in handles {
491            if tokio::time::timeout(Duration::from_secs(5), &mut handle)
492                .await
493                .is_err()
494            {
495                handle.abort();
496                let _ = handle.await;
497                warn!("JMS consumer task did not stop in 5s; aborted");
498            }
499        }
500        Ok(())
501    }
502
503    fn concurrency_model(&self) -> ConcurrencyModel {
504        ConcurrencyModel::Sequential
505    }
506
507    fn background_task_handle(
508        &mut self,
509    ) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
510        // JMS may have multiple concurrent consumer handles; return the first.
511        // The remaining handles are joined in stop().
512        self.task_handles.pop()
513    }
514}
515
516#[cfg(test)]
517mod tests {
518    use super::*;
519    use crate::BrokerType;
520    use crate::config::{JmsPoolConfig, jms_reconnect_default};
521    use camel_component_api::test_support::PanicRuntimeObservability;
522    use tokio::sync::mpsc;
523
524    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
525        std::sync::Arc::new(PanicRuntimeObservability)
526    }
527
528    #[test]
529    fn build_exchange_text_body() {
530        let msg = JmsMessage {
531            message_id: "ID:1".to_string(),
532            body: b"hello world".to_vec(),
533            content_type: "text/plain".to_string(),
534            ..Default::default()
535        };
536        let ex = build_exchange(&msg, true);
537        assert!(matches!(ex.input.body, Body::Text(_)));
538    }
539
540    #[test]
541    fn build_exchange_binary_body() {
542        let msg = JmsMessage {
543            message_id: "ID:2".to_string(),
544            body: vec![0x00, 0x01, 0x02],
545            content_type: "application/octet-stream".to_string(),
546            ..Default::default()
547        };
548        let ex = build_exchange(&msg, true);
549        assert!(matches!(ex.input.body, Body::Bytes(_)));
550    }
551
552    #[test]
553    fn build_exchange_json_body() {
554        let msg = JmsMessage {
555            message_id: "ID:json".to_string(),
556            body: br#"{"ok":true}"#.to_vec(),
557            content_type: "application/json".to_string(),
558            ..Default::default()
559        };
560        let ex = build_exchange(&msg, true);
561        assert!(matches!(ex.input.body, Body::Json(_)));
562    }
563
564    #[test]
565    fn build_exchange_empty_body() {
566        let msg = JmsMessage {
567            message_id: "ID:3".to_string(),
568            body: vec![],
569            content_type: "".to_string(),
570            ..Default::default()
571        };
572        let ex = build_exchange(&msg, true);
573        assert!(matches!(ex.input.body, Body::Empty));
574    }
575
576    #[test]
577    fn build_exchange_without_header_mapping() {
578        // JMS-018: when map_jms_headers is false, JMS headers must not be set
579        let msg = JmsMessage {
580            message_id: "ID:header-test".to_string(),
581            correlation_id: "CORR:123".to_string(),
582            timestamp: 1700000000,
583            destination: "queue:orders".to_string(),
584            body: b"hello".to_vec(),
585            headers: Default::default(),
586            content_type: "text/plain".to_string(),
587        };
588        let ex = build_exchange(&msg, false);
589        assert!(ex.input.header("JMSMessageID").is_none());
590        assert!(ex.input.header("JMSCorrelationID").is_none());
591        assert!(ex.input.header("JMSTimestamp").is_none());
592        assert!(ex.input.header("JMSDestination").is_none());
593        assert!(ex.input.header("Content-Type").is_none());
594    }
595
596    #[tokio::test]
597    async fn stop_without_start_is_noop() {
598        let pool = Arc::new(
599            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
600                "tcp://localhost:61616",
601                BrokerType::Generic,
602            ))
603            .unwrap(),
604        );
605        let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
606        let mut consumer = JmsConsumer::new(
607            pool,
608            "default".to_string(),
609            endpoint_cfg,
610            jms_reconnect_default(),
611            rt(),
612        );
613        assert!(consumer.stop().await.is_ok());
614    }
615
616    // ── JMS-006: Consumer double-start guard ──────────────────────────────────
617
618    #[tokio::test]
619    async fn consumer_double_start_returns_error() {
620        let pool = Arc::new(
621            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
622                "tcp://localhost:61616",
623                BrokerType::Generic,
624            ))
625            .unwrap(),
626        );
627        let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
628        let mut consumer = JmsConsumer::new(
629            pool,
630            "default".to_string(),
631            endpoint_cfg,
632            jms_reconnect_default(),
633            rt(),
634        );
635
636        // Simulate an already-started state by setting a cancel token directly.
637        consumer.cancel_token = Some(CancellationToken::new());
638
639        let (route_tx, _route_rx) = mpsc::channel(16);
640        let ctx = ConsumerContext::new(
641            route_tx,
642            CancellationToken::new(),
643            "jms-test-route".to_string(),
644        );
645        let result = consumer.start(ctx).await;
646        assert!(result.is_err(), "second start must return an error");
647        let msg = result.unwrap_err().to_string();
648        assert!(
649            msg.contains("already started"),
650            "error must mention already started: {}",
651            msg
652        );
653    }
654
655    // ── JMS-002: Consumer stop joins task handle ─────────────────────────────
656
657    #[tokio::test]
658    async fn test_jms_consumer_stop_joins() {
659        let pool = Arc::new(
660            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
661                "tcp://localhost:61616",
662                BrokerType::Generic,
663            ))
664            .unwrap(),
665        );
666        let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
667        let mut consumer = JmsConsumer::new(
668            pool,
669            "default".to_string(),
670            endpoint_cfg,
671            jms_reconnect_default(),
672            rt(),
673        );
674
675        // Simulate a started consumer with a task that respects cancellation.
676        let cancel = CancellationToken::new();
677        consumer.cancel_token = Some(cancel.clone());
678        consumer.task_handles = vec![tokio::spawn({
679            let cancel = cancel.clone();
680            async move {
681                // Task waits for cancellation then exits cleanly.
682                cancel.cancelled().await;
683                Ok(())
684            }
685        })];
686
687        // stop() must not panic and must join the task.
688        let result = consumer.stop().await;
689        assert!(result.is_ok(), "stop must succeed: {:?}", result.err());
690        // Verify the handles were taken (tasks were joined).
691        assert!(
692            consumer.task_handles.is_empty(),
693            "task_handles must be cleared after stop"
694        );
695    }
696
697    // ── JMS-002: Consumer task panic is absorbed (Pattern S) ─────────────────
698
699    #[tokio::test]
700    async fn stop_absorbs_consumer_task_panic() {
701        let pool = Arc::new(
702            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
703                "tcp://localhost:61616",
704                BrokerType::Generic,
705            ))
706            .unwrap(),
707        );
708        let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
709        let mut consumer = JmsConsumer::new(
710            pool,
711            "default".to_string(),
712            endpoint_cfg,
713            jms_reconnect_default(),
714            rt(),
715        );
716
717        // Manually set a task handle that will panic.
718        consumer.task_handles = vec![tokio::spawn(async {
719            panic!("simulated consumer panic");
720        })];
721        // Give the panic time to materialize.
722        tokio::time::sleep(Duration::from_millis(50)).await;
723
724        // Pattern S: panic is absorbed, stop returns Ok.
725        let result = consumer.stop().await;
726        assert!(
727            result.is_ok(),
728            "stop must absorb panic and return Ok: {:?}",
729            result.err()
730        );
731    }
732
733    // ── JMS-017: Missing bridge returns Err immediately ───────────────────────
734
735    #[tokio::test]
736    async fn missing_bridge_returns_err_within_1s() {
737        struct EnvGuard {
738            key: &'static str,
739            prev: Option<std::ffi::OsString>,
740        }
741        impl Drop for EnvGuard {
742            fn drop(&mut self) {
743                if let Some(v) = &self.prev {
744                    unsafe { std::env::set_var(self.key, v) };
745                } else {
746                    unsafe { std::env::remove_var(self.key) };
747                }
748            }
749        }
750
751        let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
752        let _guard = EnvGuard {
753            key: env_key,
754            prev: std::env::var_os(env_key),
755        };
756        unsafe { std::env::set_var(env_key, "/bin/false") };
757
758        let pool = Arc::new(
759            JmsBridgePool::from_config(JmsPoolConfig {
760                brokers: std::collections::HashMap::from([(
761                    "default".to_string(),
762                    crate::config::BrokerConfig {
763                        broker_url: "tcp://localhost:61616".to_string(),
764                        broker_type: BrokerType::ActiveMq,
765                        username: None,
766                        password: None,
767                    },
768                )]),
769                bridge_start_timeout_ms: 100,
770                ..JmsPoolConfig::default()
771            })
772            .unwrap(),
773        );
774        let endpoint_cfg = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
775        let mut consumer = JmsConsumer::new(
776            pool,
777            "default".to_string(),
778            endpoint_cfg,
779            jms_reconnect_default(),
780            rt(),
781        );
782
783        let (route_tx, _route_rx) = mpsc::channel(16);
784        let ctx = ConsumerContext::new(
785            route_tx,
786            CancellationToken::new(),
787            "jms-test-route-2".to_string(),
788        );
789
790        let start = std::time::Instant::now();
791        let result = consumer.start(ctx).await;
792        let elapsed = start.elapsed();
793
794        assert!(result.is_err(), "expected Err when bridge missing, got Ok");
795        let msg = result.unwrap_err().to_string();
796        assert!(
797            msg.contains("JMS bridge not available"),
798            "error must mention bridge unavailability: {}",
799            msg
800        );
801        assert!(
802            elapsed < Duration::from_secs(1),
803            "must fail within 1s, took {:?}",
804            elapsed
805        );
806
807        // Cleanup: stop the health monitors etc.
808        let _ = consumer.stop().await;
809    }
810
811    /// Regression: max_attempts=N → exactly N invocations (caught OpenSearch off-by-one 1f5c4c2a).
812    /// Replicates the exact retry loop from `consumer_loop` (consumer.rs:166-177):
813    ///   attempt starts at 0, incremented on error, !should_retry(attempt), delay_for(attempt-1)
814    #[tokio::test]
815    async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
816        use std::sync::Arc;
817        use std::sync::atomic::{AtomicU32, Ordering};
818        use std::time::Duration;
819
820        let policy = NetworkRetryPolicy {
821            max_attempts: 3,
822            initial_delay: Duration::from_millis(1),
823            max_delay: Duration::from_millis(1),
824            multiplier: 1.0,
825            ..NetworkRetryPolicy::default()
826        };
827
828        let calls = Arc::new(AtomicU32::new(0));
829        let calls_clone = Arc::clone(&calls);
830        let mut attempt: u32 = 0;
831
832        loop {
833            calls_clone.fetch_add(1, Ordering::SeqCst);
834            let result: Result<(), ()> = Err(());
835            match result {
836                Ok(_) => {
837                    attempt = 0;
838                    break;
839                }
840                Err(_) => {
841                    attempt += 1;
842                    if !policy.should_retry(attempt) {
843                        break;
844                    }
845                    let delay = policy.delay_for(attempt - 1);
846                    tokio::time::sleep(delay).await;
847                    continue;
848                }
849            }
850        }
851
852        assert_eq!(
853            calls.load(Ordering::SeqCst),
854            3,
855            "max_attempts=3 must yield exactly 3 invocations"
856        );
857    }
858}