Skip to main content

camel_component_jms/
component.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{Context, Poll};
8use std::time::{Duration, Instant};
9
10use camel_bridge::{
11    channel::connect_channel,
12    download::ensure_binary,
13    health::wait_for_health,
14    process::{BridgeProcess, BridgeProcessConfig, BrokerType},
15};
16use camel_component_api::{
17    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, NetworkRetryPolicy,
18    ProducerContext,
19};
20use dashmap::DashMap;
21use tokio::sync::{Mutex, watch};
22use tonic::transport::Channel;
23use tower::Service;
24use tracing::{info, warn};
25
26use crate::config::{BrokerConfig, JmsEndpointConfig, JmsPoolConfig};
27use crate::consumer::JmsConsumer;
28use crate::health::JmsHealthCheck;
29use crate::producer::JmsProducer;
30use crate::proto::{HealthRequest, bridge_service_client::BridgeServiceClient};
31
32// ── Transport error classification ───────────────────────────────────────────
33
/// Prefix carried by every bridge transport error message.
///
/// The producer and consumer build their transport errors with this prefix,
/// and `is_bridge_transport_error` matches on it. Keeping a single definition
/// means a change to the message format cannot quietly break retry logic.
pub const BRIDGE_TRANSPORT_ERROR_PREFIX: &str = "JMS gRPC ";

/// Number of consecutive failed restarts after which the health monitor stops
/// restarting a bridge on its own and leaves the slot degraded.
const MAX_RESTART_ATTEMPTS: u32 = 10;
41
42// ── BridgeState ──────────────────────────────────────────────────────────────
43
44#[derive(Debug, Clone)]
45pub enum BridgeState {
46    Starting,
47    Ready { channel: Channel },
48    Degraded(String),
49    Restarting { attempt: u32, next_at: Instant },
50    Stopped,
51}
52
53// ── BridgeSlot ───────────────────────────────────────────────────────────────
54
55pub struct BridgeSlot {
56    pub name: String,
57    pub broker_url: String,
58    pub broker_type: BrokerType,
59    pub credentials: Option<(String, String)>,
60    pub state_rx: watch::Receiver<BridgeState>,
61    pub(crate) state_tx: watch::Sender<BridgeState>,
62    /// BridgeProcess::stop(mut self) takes ownership — Mutex<Option<>> is required.
63    pub process: Arc<tokio::sync::Mutex<Option<BridgeProcess>>>,
64    /// JoinHandle of the health monitor task for this slot.
65    /// Stored so that shutdown can await the monitor and observe panics.
66    pub(crate) health_monitor_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
67}
68
69impl std::fmt::Debug for BridgeSlot {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("BridgeSlot")
72            .field("name", &self.name)
73            .field("broker_url", &self.broker_url)
74            .field("broker_type", &self.broker_type)
75            .finish()
76    }
77}
78
79// ── JmsBridgePool ────────────────────────────────────────────────────────────
80
81pub struct JmsBridgePool {
82    pub(crate) slots: DashMap<String, Arc<BridgeSlot>>,
83    pub(crate) config: HashMap<String, BrokerConfig>,
84    pub(crate) bridge_start_timeout_ms: u64,
85    pub(crate) reconnect: NetworkRetryPolicy,
86    pub(crate) health_check_interval_ms: u64,
87    pub(crate) bridge_version: String,
88    pub(crate) bridge_cache_dir: PathBuf,
89    /// Maximum number of concurrently active (Starting/Ready) bridges.
90    pub(crate) max_bridges: usize,
91    /// Serializes bridge admission (check + insert) to prevent race on max_bridges.
92    bridge_create_lock: Mutex<()>,
93    pub(crate) shutting_down: Arc<AtomicBool>,
94}
95
96impl JmsBridgePool {
97    pub fn from_config(pool_config: JmsPoolConfig) -> Result<Self, CamelError> {
98        pool_config.validate()?;
99        // Backward compat: broker_reconnect_interval_ms overrides
100        // reconnect.initial_delay when explicitly set (non-default).
101        let mut reconnect = pool_config.reconnect;
102        if pool_config.broker_reconnect_interval_ms
103            != crate::config::default_broker_reconnect_interval_ms()
104        {
105            reconnect.initial_delay =
106                Duration::from_millis(pool_config.broker_reconnect_interval_ms);
107        }
108        Ok(Self {
109            slots: DashMap::new(),
110            config: pool_config.brokers,
111            bridge_start_timeout_ms: pool_config.bridge_start_timeout_ms,
112            reconnect,
113            health_check_interval_ms: pool_config.health_check_interval_ms,
114            bridge_version: crate::BRIDGE_VERSION.to_string(),
115            bridge_cache_dir: pool_config.bridge_cache_dir,
116            max_bridges: pool_config.max_bridges,
117            bridge_create_lock: Mutex::new(()),
118            shutting_down: Arc::new(AtomicBool::new(false)),
119        })
120    }
121
122    /// Resolve broker name from the URI `broker=` param.
123    ///
124    /// - If `Some(name)` → validate it exists in config and return it.
125    /// - If `None` and exactly one broker is configured → use it implicitly.
126    /// - If `None` and multiple brokers are configured → error asking for `?broker=`.
127    /// - If `None` and no brokers are configured → error asking to declare brokers.
128    pub fn resolve_broker_name(&self, name: Option<&str>) -> Result<String, CamelError> {
129        match name {
130            Some(n) => {
131                if self.config.contains_key(n) {
132                    Ok(n.to_string())
133                } else {
134                    Err(CamelError::ProcessorError(format!(
135                        "Unknown JMS broker '{n}' — declare it in [components.jms.brokers] in Camel.toml",
136                    )))
137                }
138            }
139            None => match self.config.len() {
140                0 => Err(CamelError::ProcessorError(
141                    "No JMS brokers configured — declare at least one in [components.jms.brokers] in Camel.toml".to_string(),
142                )),
143                1 => Ok(self.config.keys().next().unwrap().clone()), // allow-unwrap
144                _ => Err(CamelError::ProcessorError(format!(
145                    "Multiple JMS brokers configured ({}); specify one with ?broker=<name> in the URI",
146                    self.config.keys().cloned().collect::<Vec<_>>().join(", ")
147                ))),
148            },
149        }
150    }
151
152    /// Resolve broker type: activemq/artemis schemes hard-override config type; jms uses config.
153    pub fn resolve_broker_type(&self, scheme: &str, broker_name: &str) -> BrokerType {
154        let config_type = self
155            .config
156            .get(broker_name)
157            .map(|c| c.broker_type.clone())
158            .unwrap_or(BrokerType::Generic);
159
160        match scheme {
161            "activemq" => {
162                if config_type != BrokerType::ActiveMq && config_type != BrokerType::Generic {
163                    warn!(
164                        "Scheme 'activemq' overrides configured broker_type '{:?}' for broker '{}'",
165                        config_type, broker_name
166                    );
167                }
168                BrokerType::ActiveMq
169            }
170            "artemis" => {
171                if config_type != BrokerType::Artemis && config_type != BrokerType::Generic {
172                    warn!(
173                        "Scheme 'artemis' overrides configured broker_type '{:?}' for broker '{}'",
174                        config_type, broker_name
175                    );
176                }
177                BrokerType::Artemis
178            }
179            _ => config_type,
180        }
181    }
182
183    /// Get or create a BridgeSlot for the given broker name.
184    /// If the slot doesn't exist, starts the bridge process and spawns the health monitor.
185    pub async fn get_or_create_slot(
186        &self,
187        broker_name: &str,
188    ) -> Result<Arc<BridgeSlot>, CamelError> {
189        if let Some(slot) = self.slots.get(broker_name) {
190            return Ok(Arc::clone(&*slot));
191        }
192
193        // Serialize admission: check + insert in single critical section to prevent
194        // concurrent creators from both seeing count below limit and exceeding max_bridges.
195        let _guard = self.bridge_create_lock.lock().await;
196
197        // Re-check after acquiring lock (another caller may have inserted while we waited).
198        if let Some(slot) = self.slots.get(broker_name) {
199            return Ok(Arc::clone(&*slot));
200        }
201
202        // Enforce max_bridges: count ALL slots in the map under the admission lock.
203        // We count total slots (not just Starting/Ready) because any inserted slot
204        // represents an allocated bridge — even Degraded/Restarting slots hold resources
205        // and the bridge process may still be running. Counting only active states would
206        // allow a race: slot A transitions Starting→Degraded between two callers' checks,
207        // letting both pass the limit.
208        let total_count = self.slots.len();
209        if total_count >= self.max_bridges {
210            return Err(CamelError::Config(format!(
211                "JMS bridge limit reached: {total_count} bridge(s) >= max_bridges ({})",
212                self.max_bridges
213            )));
214        }
215
216        let broker_config = self.config.get(broker_name).ok_or_else(|| {
217            CamelError::ProcessorError(format!("Unknown JMS broker '{}'", broker_name))
218        })?;
219
220        // Clone all required broker data before touching DashMap::entry().
221        let broker_url = broker_config.broker_url.clone();
222        let broker_type = broker_config.broker_type.clone();
223        let credentials = match (&broker_config.username, &broker_config.password) {
224            (Some(u), Some(p)) => Some((u.clone(), p.clone())),
225            _ => None,
226        };
227
228        let slot = match self.slots.entry(broker_name.to_string()) {
229            dashmap::Entry::Occupied(existing) => {
230                return Ok(Arc::clone(existing.get()));
231            }
232            dashmap::Entry::Vacant(entry) => {
233                let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
234                let slot = Arc::new(BridgeSlot {
235                    name: broker_name.to_string(),
236                    broker_url: broker_url.clone(),
237                    broker_type: broker_type.clone(),
238                    credentials: credentials.clone(),
239                    state_rx,
240                    state_tx,
241                    process: Arc::new(tokio::sync::Mutex::new(None)),
242                    health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
243                });
244                entry.insert(Arc::clone(&slot));
245                slot
246            }
247        };
248
249        let start_result = Self::start_bridge_process(
250            &self.bridge_version,
251            &self.bridge_cache_dir,
252            self.bridge_start_timeout_ms,
253            &broker_url,
254            &broker_type,
255            &credentials,
256        )
257        .await;
258
259        match start_result {
260            Ok((process, channel)) => {
261                {
262                    let mut guard = slot.process.lock().await;
263                    *guard = Some(process);
264                }
265                let _ = slot.state_tx.send(BridgeState::Ready { channel });
266            }
267            Err(e) => {
268                let _ = slot
269                    .state_tx
270                    .send(BridgeState::Degraded(format!("Initial start failed: {e}")));
271            }
272        }
273
274        self.spawn_health_monitor(Arc::clone(&slot)).await;
275
276        Ok(slot)
277    }
278
279    /// Signal a slot to restart (called by producers on transport errors).
280    pub fn restart_slot(&self, broker_name: &str) {
281        if let Some(slot) = self.slots.get(broker_name) {
282            let _ = slot.state_tx.send(BridgeState::Restarting {
283                attempt: 0,
284                next_at: Instant::now(),
285            });
286        }
287    }
288
289    /// Recreate the tonic channel for an existing running bridge process.
290    ///
291    /// Useful when a channel becomes stale after transport-level failures while
292    /// the underlying bridge process is still alive.
293    pub async fn refresh_slot_channel(&self, broker_name: &str) -> Result<(), CamelError> {
294        let slot = self
295            .slots
296            .get(broker_name)
297            .map(|s| Arc::clone(&*s))
298            .ok_or_else(|| {
299                CamelError::ProcessorError(format!("Unknown JMS broker '{}'", broker_name))
300            })?;
301
302        let port = {
303            let guard = slot.process.lock().await;
304            let process = guard.as_ref().ok_or_else(|| {
305                CamelError::ProcessorError(format!(
306                    "JMS broker '{}' has no running bridge process",
307                    broker_name
308                ))
309            })?;
310            process.grpc_port()
311        };
312
313        let channel = connect_channel(port).await.map_err(|e| {
314            CamelError::ProcessorError(format!(
315                "JMS broker '{}' channel refresh failed: {}",
316                broker_name, e
317            ))
318        })?;
319
320        let _ = slot.state_tx.send(BridgeState::Ready { channel });
321        Ok(())
322    }
323
324    pub fn begin_shutdown(&self) {
325        self.shutting_down.store(true, Ordering::SeqCst);
326    }
327
328    /// Shutdown all slots: stop all bridge processes and await health monitors.
329    pub async fn shutdown(&self) -> Result<(), CamelError> {
330        self.begin_shutdown();
331        let names: Vec<String> = self.slots.iter().map(|e| e.key().clone()).collect();
332        let mut errors: Vec<String> = Vec::new();
333
334        for name in names {
335            if let Some((_, slot)) = self.slots.remove(&name) {
336                // Signal the health monitor to stop.
337                let _ = slot.state_tx.send(BridgeState::Stopped);
338
339                // Stop the bridge process.
340                let process = {
341                    let mut guard = slot.process.lock().await;
342                    guard.take()
343                };
344                if let Some(p) = process
345                    && let Err(e) = p.stop().await
346                {
347                    errors.push(format!("broker '{}': process stop failed: {e}", slot.name));
348                }
349
350                // Await the health monitor task with timeout; abort if it doesn't stop.
351                let monitor_handle = {
352                    let mut guard = slot.health_monitor_handle.lock().await;
353                    guard.take()
354                };
355                if let Some(mut h) = monitor_handle
356                    && tokio::time::timeout(Duration::from_secs(5), &mut h)
357                        .await
358                        .is_err()
359                {
360                    h.abort();
361                    let _ = h.await;
362                    warn!(
363                        "health monitor for '{}' did not stop in 5s; aborted",
364                        slot.name
365                    );
366                }
367            }
368        }
369
370        if errors.is_empty() {
371            Ok(())
372        } else {
373            Err(CamelError::ProcessorError(format!(
374                "JMS pool shutdown completed with {} error(s): {}",
375                errors.len(),
376                errors.join("; ")
377            )))
378        }
379    }
380
381    async fn spawn_health_monitor(&self, slot: Arc<BridgeSlot>) {
382        let health_interval = self.health_check_interval_ms;
383        let bridge_version = self.bridge_version.clone();
384        let bridge_cache_dir = self.bridge_cache_dir.clone();
385        let start_timeout_ms = self.bridge_start_timeout_ms;
386        let handle_ref = Arc::clone(&slot.health_monitor_handle);
387        let shutting_down = Arc::clone(&self.shutting_down);
388        let broker_name = slot.name.clone();
389
390        let handle = tokio::spawn(async move {
391            loop {
392                let state = slot.state_rx.borrow().clone();
393                match state {
394                    BridgeState::Stopped => {
395                        info!("Health monitor for '{}' exiting (Stopped)", slot.name);
396                        break;
397                    }
398                    BridgeState::Ready { ref channel } => {
399                        tokio::time::sleep(Duration::from_millis(health_interval)).await;
400                        let mut client = BridgeServiceClient::new(channel.clone());
401                        let health_timeout = Duration::from_secs(3);
402                        match tokio::time::timeout(health_timeout, client.health(HealthRequest {}))
403                            .await
404                        {
405                            Ok(Ok(_)) => {}
406                            Ok(Err(e)) => {
407                                warn!(
408                                    "Health check failed for broker '{}': {e}. Marking Degraded.",
409                                    slot.name
410                                );
411                                let _ = slot.state_tx.send(BridgeState::Degraded(e.to_string()));
412                            }
413                            Err(_) => {
414                                let msg = format!(
415                                    "health RPC timed out after {}ms",
416                                    health_timeout.as_millis()
417                                );
418                                warn!(
419                                    "Health check timed out for broker '{}': {}. Marking Degraded.",
420                                    slot.name, msg
421                                );
422                                let _ = slot.state_tx.send(BridgeState::Degraded(msg));
423                            }
424                        }
425                    }
426                    BridgeState::Degraded(_) | BridgeState::Starting => {
427                        if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
428                            break;
429                        }
430                        if shutting_down.load(Ordering::SeqCst) {
431                            tracing::info!(
432                                "Pool shutting down — not restarting bridge for broker '{}'",
433                                broker_name
434                            );
435                            break;
436                        }
437                        let _ = slot.state_tx.send(BridgeState::Restarting {
438                            attempt: 0,
439                            next_at: Instant::now(),
440                        });
441                    }
442                    BridgeState::Restarting { attempt, next_at } => {
443                        if shutting_down.load(Ordering::SeqCst) {
444                            tracing::info!(
445                                "Pool shutting down — aborting restart for broker '{}'",
446                                broker_name
447                            );
448                            break;
449                        }
450
451                        let now = Instant::now();
452                        if now < next_at {
453                            tokio::time::sleep(next_at - now).await;
454                        }
455
456                        info!(
457                            "Restarting bridge for broker '{}' (attempt {})",
458                            slot.name,
459                            attempt + 1
460                        );
461
462                        if attempt >= MAX_RESTART_ATTEMPTS {
463                            // log-policy: system-broken
464                            tracing::error!(
465                                "Max restart attempts ({}) reached for broker '{}' — staying degraded",
466                                attempt,
467                                broker_name
468                            );
469                            let _ = slot.state_tx.send(BridgeState::Degraded(format!(
470                                "max restart attempts ({}) exceeded",
471                                attempt
472                            )));
473                            break;
474                        }
475
476                        let old_process = {
477                            let mut guard = slot.process.lock().await;
478                            guard.take()
479                        };
480                        if let Some(p) = old_process {
481                            let _ = p.stop().await;
482                        }
483
484                        let start_result = Self::start_bridge_process(
485                            &bridge_version,
486                            &bridge_cache_dir,
487                            start_timeout_ms,
488                            &slot.broker_url,
489                            &slot.broker_type,
490                            &slot.credentials,
491                        )
492                        .await;
493
494                        match start_result {
495                            Ok((process, channel)) => {
496                                // Guard: don't resurrect a stopped slot (shutdown may have run
497                                // while this async bridge start was in-flight).
498                                if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
499                                    let _ = process.stop().await;
500                                    break;
501                                }
502                                {
503                                    let mut guard = slot.process.lock().await;
504                                    *guard = Some(process);
505                                }
506                                let _ = slot.state_tx.send(BridgeState::Ready { channel });
507                                info!("Broker '{}' bridge restarted successfully", slot.name);
508                            }
509                            Err(e) => {
510                                // Guard: don't schedule retries after shutdown.
511                                if matches!(*slot.state_rx.borrow(), BridgeState::Stopped) {
512                                    break;
513                                }
514                                let delay_secs = std::cmp::min(5 * 2u64.pow(attempt), 120);
515                                let next = Instant::now() + Duration::from_secs(delay_secs);
516                                warn!(
517                                    "Failed to restart bridge for '{}' (attempt {}): {e}. Retry in {delay_secs}s",
518                                    slot.name,
519                                    attempt + 1
520                                );
521                                let _ = slot.state_tx.send(BridgeState::Restarting {
522                                    attempt: attempt + 1,
523                                    next_at: next,
524                                });
525                            }
526                        }
527                    }
528                }
529            }
530        });
531
532        // Store the handle so shutdown can await the monitor.
533        let mut guard = handle_ref.lock().await;
534        *guard = Some(handle);
535    }
536
537    async fn start_bridge_process(
538        bridge_version: &str,
539        bridge_cache_dir: &std::path::Path,
540        start_timeout_ms: u64,
541        broker_url: &str,
542        broker_type: &BrokerType,
543        credentials: &Option<(String, String)>,
544    ) -> Result<(BridgeProcess, Channel), CamelError> {
545        info!(
546            "Starting JMS bridge process for {}...",
547            redact_url(broker_url)
548        );
549        let binary_path = ensure_binary(bridge_version, bridge_cache_dir)
550            .await
551            .map_err(|e| {
552                CamelError::ProcessorError(format!("JMS bridge binary unavailable: {e}"))
553            })?;
554
555        let process_config = BridgeProcessConfig::jms(
556            binary_path,
557            broker_url.to_string(),
558            broker_type.clone(),
559            credentials.as_ref().map(|(u, _)| u.clone()),
560            credentials
561                .as_ref()
562                .map(|(_, p)| camel_bridge::process::Redacted::new(p.clone())),
563            start_timeout_ms,
564        );
565
566        let total_timeout = Duration::from_millis(start_timeout_ms);
567        let result = tokio::time::timeout(total_timeout, async {
568            let process = BridgeProcess::start(&process_config)
569                .await
570                .map_err(|e| CamelError::ProcessorError(format!("JMS bridge start failed: {e}")))?;
571
572            let port = process.grpc_port();
573            let channel = connect_channel(port).await.map_err(|e| {
574                CamelError::ProcessorError(format!("JMS bridge channel connect failed: {e}"))
575            })?;
576
577            wait_for_health(&channel, Duration::from_secs(10), |ch| {
578                let mut client = BridgeServiceClient::new(ch);
579                async move {
580                    let resp = client.health(HealthRequest {}).await?;
581                    Ok(resp.into_inner().healthy)
582                }
583            })
584            .await
585            .map_err(|e| {
586                CamelError::ProcessorError(format!("JMS bridge health check failed: {e}"))
587            })?;
588
589            Ok::<(BridgeProcess, Channel), CamelError>((process, channel))
590        })
591        .await
592        .map_err(|_| {
593            CamelError::ProcessorError(format!(
594                "JMS bridge start timed out after {}ms",
595                start_timeout_ms
596            ))
597        })??;
598
599        Ok(result)
600    }
601}
602
603// ── JmsComponent ─────────────────────────────────────────────────────────────
604
605#[derive(Clone)]
606pub struct JmsComponent {
607    scheme: String,
608    pool: Arc<JmsBridgePool>,
609}
610
611impl JmsComponent {
612    pub fn with_scheme(scheme: impl Into<String>, pool: Arc<JmsBridgePool>) -> Self {
613        Self {
614            scheme: scheme.into(),
615            pool,
616        }
617    }
618
619    pub fn scheme(&self) -> &str {
620        &self.scheme
621    }
622
623    /// Test helper: send a message directly without going through a route.
624    #[cfg(test)]
625    pub async fn send_for_test(
626        &self,
627        destination: &str,
628        body: &[u8],
629        content_type: &str,
630    ) -> Result<String, CamelError> {
631        let broker_name = self.pool.resolve_broker_name(None)?;
632        let slot = self.pool.get_or_create_slot(&broker_name).await?;
633        let channel = match &*slot.state_rx.borrow() {
634            BridgeState::Ready { channel } => channel.clone(),
635            other => {
636                return Err(CamelError::ProcessorError(format!(
637                    "Bridge not ready: {:?}",
638                    other
639                )));
640            }
641        };
642        let mut client = BridgeServiceClient::new(channel);
643        let r = client
644            .send(crate::proto::SendRequest {
645                destination: destination.to_string(),
646                body: body.to_vec(),
647                headers: Default::default(),
648                content_type: content_type.to_string(),
649            })
650            .await
651            .map_err(|e| CamelError::ProcessorError(format!("test send error: {e}")))?;
652        Ok(r.into_inner().message_id)
653    }
654}
655
656impl Component for JmsComponent {
657    fn scheme(&self) -> &str {
658        &self.scheme
659    }
660
661    fn create_endpoint(
662        &self,
663        uri: &str,
664        ctx: &dyn camel_component_api::ComponentContext,
665    ) -> Result<Box<dyn Endpoint>, CamelError> {
666        let endpoint_config = JmsEndpointConfig::from_uri(uri)?;
667        let broker_name = self
668            .pool
669            .resolve_broker_name(endpoint_config.broker_name.as_deref())?;
670        let resolved_broker_type = self.pool.resolve_broker_type(&self.scheme, &broker_name);
671
672        let health_check = JmsHealthCheck::new(Arc::clone(&self.pool), broker_name.clone());
673        ctx.register_current_route_health_check(Arc::new(health_check));
674
675        Ok(Box::new(JmsEndpoint {
676            pool: Arc::clone(&self.pool),
677            uri: uri.to_string(),
678            broker_name,
679            resolved_broker_type,
680            endpoint_config,
681        }))
682    }
683}
684
685// ── JmsEndpoint ──────────────────────────────────────────────────────────────
686
687struct JmsEndpoint {
688    pool: Arc<JmsBridgePool>,
689    uri: String,
690    broker_name: String,
691    resolved_broker_type: BrokerType,
692    endpoint_config: JmsEndpointConfig,
693}
694
695impl Endpoint for JmsEndpoint {
696    fn uri(&self) -> &str {
697        &self.uri
698    }
699
700    fn create_producer(
701        &self,
702        rt: Arc<dyn camel_component_api::RuntimeObservability>,
703        _ctx: &ProducerContext,
704    ) -> Result<BoxProcessor, CamelError> {
705        Ok(BoxProcessor::new(LazyJmsProducer {
706            pool: Arc::clone(&self.pool),
707            broker_name: self.broker_name.clone(),
708            endpoint_config: self.endpoint_config.clone(),
709            resolved_broker_type: self.resolved_broker_type.clone(),
710            runtime: rt,
711        }))
712    }
713
714    fn create_consumer(
715        &self,
716        rt: Arc<dyn camel_component_api::RuntimeObservability>,
717    ) -> Result<Box<dyn Consumer>, CamelError> {
718        Ok(Box::new(JmsConsumer::new(
719            Arc::clone(&self.pool),
720            self.broker_name.clone(),
721            self.endpoint_config.clone(),
722            self.pool.reconnect.clone(),
723            rt,
724        )))
725    }
726}
727
728#[derive(Clone)]
729struct LazyJmsProducer {
730    pool: Arc<JmsBridgePool>,
731    broker_name: String,
732    endpoint_config: JmsEndpointConfig,
733    #[allow(dead_code)]
734    resolved_broker_type: BrokerType,
735    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
736    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
737    #[allow(dead_code)]
738    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
739}
740
741impl Service<Exchange> for LazyJmsProducer {
742    type Response = Exchange;
743    type Error = CamelError;
744    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
745
746    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
747        // Check existing slot state if one is already present.
748        // If no slot exists yet, return Ready — call() will handle async bridge start.
749        if let Some(slot) = self.pool.slots.get(&self.broker_name) {
750            match &*slot.state_rx.borrow() {
751                BridgeState::Ready { .. } => return Poll::Ready(Ok(())),
752                BridgeState::Starting | BridgeState::Restarting { .. } => {
753                    // Register a waker so the executor is notified when bridge state
754                    // changes. Without this, Poll::Pending would stall callers that use
755                    // strict Tower semantics (poll_ready loop before call()).
756                    // Guard with try_current: fall back to wake_by_ref when no Tokio
757                    // runtime is active (e.g. unit tests).
758                    let waker = cx.waker().clone();
759                    let mut rx = slot.state_rx.clone();
760                    if let Ok(handle) = tokio::runtime::Handle::try_current() {
761                        handle.spawn(async move {
762                            let _ = rx.changed().await;
763                            waker.wake();
764                        });
765                    } else {
766                        waker.wake_by_ref();
767                    }
768                    return Poll::Pending;
769                }
770                BridgeState::Degraded(reason) => {
771                    return Poll::Ready(Err(CamelError::ProcessorError(format!(
772                        "JMS broker '{}' is degraded: {}",
773                        self.broker_name, reason
774                    ))));
775                }
776                BridgeState::Stopped => {
777                    return Poll::Ready(Err(CamelError::ProcessorError(format!(
778                        "JMS broker '{}' is stopped",
779                        self.broker_name
780                    ))));
781                }
782            }
783        }
784        Poll::Ready(Ok(()))
785    }
786
787    fn call(&mut self, exchange: Exchange) -> Self::Future {
788        let pool = Arc::clone(&self.pool);
789        let broker_name = self.broker_name.clone();
790        let endpoint_config = self.endpoint_config.clone();
791
792        Box::pin(async move {
793            let slot = pool.get_or_create_slot(&broker_name).await?;
794            let mut rx = slot.state_rx.clone();
795
796            loop {
797                let state = rx.borrow().clone();
798                match state {
799                    BridgeState::Ready { channel } => {
800                        let mut producer = JmsProducer::new(channel, endpoint_config.clone());
801                        match producer.call(exchange).await {
802                            Ok(done) => return Ok(done),
803                            Err(first_err) if is_bridge_transport_error(&first_err) => {
804                                warn!(
805                                    broker = %broker_name,
806                                    error = %first_err,
807                                    "JMS send transport error; refreshing channel (no automatic resend)"
808                                );
809
810                                if let Err(refresh_err) =
811                                    pool.refresh_slot_channel(&broker_name).await
812                                {
813                                    warn!(
814                                        broker = %broker_name,
815                                        error = %refresh_err,
816                                        "JMS channel refresh failed; requesting bridge restart"
817                                    );
818                                    pool.restart_slot(&broker_name);
819                                }
820
821                                // Do NOT automatically resend — the first send may have reached
822                                // the broker even though the ack failed. Resending non-idempotent
823                                // writes causes duplicates. Return the original error so the caller
824                                // can decide whether to retry.
825                                return Err(first_err);
826                            }
827                            Err(other_err) => return Err(other_err),
828                        }
829                    }
830                    BridgeState::Degraded(reason) => {
831                        return Err(CamelError::ProcessorError(format!(
832                            "JMS broker '{}' is degraded: {}",
833                            broker_name, reason
834                        )));
835                    }
836                    BridgeState::Stopped => {
837                        return Err(CamelError::ProcessorError(format!(
838                            "JMS broker '{}' is stopped",
839                            broker_name
840                        )));
841                    }
842                    BridgeState::Starting | BridgeState::Restarting { .. } => {
843                        if rx.changed().await.is_err() {
844                            return Err(CamelError::ProcessorError(format!(
845                                "JMS broker '{}' state channel closed",
846                                broker_name
847                            )));
848                        }
849                    }
850                }
851            }
852        })
853    }
854}
855
856// ── Helpers ──────────────────────────────────────────────────────────────────
857
/// Redact userinfo (`username:password@`) from a broker URL for safe logging.
///
/// Every embedded `scheme://` authority is redacted. Composite URIs such as
/// ActiveMQ `failover:(tcp://u:p@h1:61616,tcp://u:p@h2:61616)` therefore have
/// the credentials of *all* members masked, not just the first.
///
/// For each `://`, the search region runs up to the next `://` (or the end of
/// the string). The *last* `@` in that region ends the userinfo. This keeps
/// passwords that contain a raw `@` fully masked. The trade-off is that an `@`
/// appearing later in a path or query is treated as part of the userinfo: the
/// function prefers over-redaction to leaking.
///
/// Example: `tcp://user:pass@host:61616` → `tcp://***@host:61616`. URLs
/// without userinfo are returned unchanged.
fn redact_url(url: &str) -> String {
    const SEPARATOR: &str = "://";

    let mut redacted = String::with_capacity(url.len());
    let mut rest = url;
    while let Some(sep) = rest.find(SEPARATOR) {
        // Copy everything up to and including the separator verbatim.
        let (head, tail) = rest.split_at(sep + SEPARATOR.len());
        redacted.push_str(head);

        // This URL's userinfo cannot extend past the next embedded URL.
        let region_end = tail.find(SEPARATOR).unwrap_or(tail.len());
        let (region, remainder) = tail.split_at(region_end);
        match region.rfind('@') {
            Some(at) => {
                redacted.push_str("***");
                redacted.push_str(&region[at..]); // keeps the '@'
            }
            None => redacted.push_str(region),
        }
        rest = remainder;
    }
    redacted.push_str(rest);
    redacted
}
872
873pub fn is_bridge_transport_error(err: &CamelError) -> bool {
874    // Typed variant matching: only ProcessorError messages that start with
875    // the well-known transport prefix are classified as transport errors.
876    // This rejects Config errors, business errors, and other CamelError variants
877    // without relying on the Display wrapper formatting.
878    match err {
879        CamelError::ProcessorError(msg) => msg.starts_with(BRIDGE_TRANSPORT_ERROR_PREFIX),
880        _ => false,
881    }
882}
883
884// ── Unit tests ───────────────────────────────────────────────────────────────
885
886#[cfg(test)]
887mod tests {
888    use camel_component_api::test_support::PanicRuntimeObservability;
889    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
890        std::sync::Arc::new(PanicRuntimeObservability)
891    }
892    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
893        std::sync::Arc::new(PanicRuntimeObservability)
894    }
895
896    use super::*;
897    use crate::config::{BrokerConfig, JmsPoolConfig};
898    use std::collections::HashMap;
899
900    #[test]
901    fn from_config_accepts_empty_brokers() {
902        let pool_config = JmsPoolConfig::default();
903        let result = JmsBridgePool::from_config(pool_config);
904        assert!(result.is_ok());
905    }
906
907    #[test]
908    fn resolve_broker_name_with_explicit_name() {
909        let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
910            "tcp://localhost:61616",
911            BrokerType::ActiveMq,
912        ))
913        .unwrap();
914        assert_eq!(
915            pool.resolve_broker_name(Some("default")).unwrap(),
916            "default"
917        );
918    }
919
920    #[test]
921    fn resolve_broker_name_default() {
922        let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
923            "tcp://localhost:61616",
924            BrokerType::ActiveMq,
925        ))
926        .unwrap();
927        assert_eq!(pool.resolve_broker_name(None).unwrap(), "default");
928    }
929
930    #[test]
931    fn resolve_broker_name_unknown_returns_error() {
932        let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
933            "tcp://localhost:61616",
934            BrokerType::ActiveMq,
935        ))
936        .unwrap();
937        let err = pool.resolve_broker_name(Some("unknown")).unwrap_err();
938        assert!(
939            err.to_string().contains("Unknown JMS broker 'unknown'"),
940            "got: {}",
941            err
942        );
943    }
944
945    #[test]
946    fn resolve_broker_type_scheme_overrides() {
947        let pool = JmsBridgePool::from_config(JmsPoolConfig::single_broker(
948            "tcp://localhost:61616",
949            BrokerType::Generic,
950        ))
951        .unwrap();
952        assert_eq!(
953            pool.resolve_broker_type("activemq", "default"),
954            BrokerType::ActiveMq
955        );
956        assert_eq!(
957            pool.resolve_broker_type("artemis", "default"),
958            BrokerType::Artemis
959        );
960        assert_eq!(
961            pool.resolve_broker_type("jms", "default"),
962            BrokerType::Generic
963        );
964    }
965
966    #[test]
967    fn resolve_broker_type_activemq_scheme_overrides_artemis_config() {
968        let pool = JmsBridgePool::from_config(JmsPoolConfig {
969            brokers: HashMap::from([(
970                "main".to_string(),
971                BrokerConfig {
972                    broker_url: "tcp://localhost:61616".to_string(),
973                    broker_type: BrokerType::Artemis,
974                    username: None,
975                    password: None,
976                },
977            )]),
978            ..JmsPoolConfig::default()
979        })
980        .unwrap();
981        assert_eq!(
982            pool.resolve_broker_type("activemq", "main"),
983            BrokerType::ActiveMq
984        );
985        assert_eq!(pool.resolve_broker_type("jms", "main"), BrokerType::Artemis);
986    }
987
988    #[test]
989    fn create_endpoint_resolves_broker() {
990        let pool = Arc::new(
991            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
992                "tcp://localhost:61616",
993                BrokerType::ActiveMq,
994            ))
995            .unwrap(),
996        );
997        let component = JmsComponent::with_scheme("jms", pool);
998        let endpoint = component.create_endpoint(
999            "jms:queue:orders",
1000            &camel_component_api::NoOpComponentContext,
1001        );
1002        assert!(endpoint.is_ok(), "got: {:?}", endpoint.err());
1003    }
1004
1005    #[test]
1006    fn create_endpoint_rejects_wrong_scheme() {
1007        let pool = Arc::new(
1008            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1009                "tcp://localhost:61616",
1010                BrokerType::ActiveMq,
1011            ))
1012            .unwrap(),
1013        );
1014        let component = JmsComponent::with_scheme("jms", pool);
1015        let err = component
1016            .create_endpoint("kafka:orders", &camel_component_api::NoOpComponentContext)
1017            .err()
1018            .unwrap();
1019        assert!(
1020            err.to_string()
1021                .contains("expected scheme 'jms', 'activemq', or 'artemis'"),
1022            "got: {}",
1023            err
1024        );
1025    }
1026
1027    #[test]
1028    fn create_endpoint_with_explicit_broker_param() {
1029        let pool = Arc::new(
1030            JmsBridgePool::from_config(JmsPoolConfig {
1031                brokers: HashMap::from([
1032                    (
1033                        "primary".to_string(),
1034                        BrokerConfig {
1035                            broker_url: "tcp://primary:61616".to_string(),
1036                            broker_type: BrokerType::ActiveMq,
1037                            username: None,
1038                            password: None,
1039                        },
1040                    ),
1041                    (
1042                        "secondary".to_string(),
1043                        BrokerConfig {
1044                            broker_url: "tcp://secondary:61616".to_string(),
1045                            broker_type: BrokerType::Artemis,
1046                            username: None,
1047                            password: None,
1048                        },
1049                    ),
1050                ]),
1051                ..JmsPoolConfig::default()
1052            })
1053            .unwrap(),
1054        );
1055        let component = JmsComponent::with_scheme("jms", Arc::clone(&pool));
1056        let endpoint = component.create_endpoint(
1057            "jms:queue:orders?broker=secondary",
1058            &camel_component_api::NoOpComponentContext,
1059        );
1060        assert!(endpoint.is_ok(), "got: {:?}", endpoint.err());
1061    }
1062
1063    #[tokio::test]
1064    async fn concurrent_get_or_create_slot_no_deadlock() {
1065        use tokio::time::timeout;
1066
1067        struct EnvGuard {
1068            key: &'static str,
1069            prev: Option<std::ffi::OsString>,
1070        }
1071        impl Drop for EnvGuard {
1072            fn drop(&mut self) {
1073                if let Some(v) = &self.prev {
1074                    // SAFETY: restoring process env in test scope.
1075                    unsafe { std::env::set_var(self.key, v) };
1076                } else {
1077                    // SAFETY: restoring process env in test scope.
1078                    unsafe { std::env::remove_var(self.key) };
1079                }
1080            }
1081        }
1082
1083        let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
1084        let _guard = EnvGuard {
1085            key: env_key,
1086            prev: std::env::var_os(env_key),
1087        };
1088        // SAFETY: test-scoped env mutation.
1089        unsafe { std::env::set_var(env_key, "/bin/false") };
1090
1091        let pool = Arc::new(
1092            JmsBridgePool::from_config(JmsPoolConfig {
1093                brokers: HashMap::from([(
1094                    "test".to_string(),
1095                    BrokerConfig {
1096                        broker_url: "tcp://localhost:61616".to_string(),
1097                        broker_type: BrokerType::ActiveMq,
1098                        username: None,
1099                        password: None,
1100                    },
1101                )]),
1102                bridge_start_timeout_ms: 100,
1103                ..JmsPoolConfig::default()
1104            })
1105            .unwrap(),
1106        );
1107
1108        let handles: Vec<_> = (0..5)
1109            .map(|_| {
1110                let pool = Arc::clone(&pool);
1111                tokio::spawn(async move {
1112                    let _ = pool.get_or_create_slot("test").await;
1113                })
1114            })
1115            .collect();
1116
1117        let result = timeout(Duration::from_secs(5), async {
1118            for h in handles {
1119                let _ = h.await;
1120            }
1121        })
1122        .await;
1123
1124        assert!(result.is_ok(), "Concurrent get_or_create_slot deadlocked!");
1125    }
1126
1127    #[tokio::test]
1128    async fn lazy_producer_reports_degraded_when_bridge_start_fails() {
1129        use tower::Service;
1130
1131        struct EnvGuard {
1132            key: &'static str,
1133            prev: Option<std::ffi::OsString>,
1134        }
1135        impl Drop for EnvGuard {
1136            fn drop(&mut self) {
1137                if let Some(v) = &self.prev {
1138                    // SAFETY: restoring process env in test scope.
1139                    unsafe { std::env::set_var(self.key, v) };
1140                } else {
1141                    // SAFETY: restoring process env in test scope.
1142                    unsafe { std::env::remove_var(self.key) };
1143                }
1144            }
1145        }
1146
1147        let env_key = "CAMEL_JMS_BRIDGE_BINARY_PATH";
1148        let _guard = EnvGuard {
1149            key: env_key,
1150            prev: std::env::var_os(env_key),
1151        };
1152        // SAFETY: test-scoped env mutation.
1153        unsafe { std::env::set_var(env_key, "/bin/false") };
1154
1155        let pool = Arc::new(
1156            JmsBridgePool::from_config(JmsPoolConfig {
1157                brokers: HashMap::from([(
1158                    "default".to_string(),
1159                    BrokerConfig {
1160                        broker_url: "tcp://localhost:61616".to_string(),
1161                        broker_type: BrokerType::ActiveMq,
1162                        username: None,
1163                        password: None,
1164                    },
1165                )]),
1166                bridge_start_timeout_ms: 100,
1167                ..JmsPoolConfig::default()
1168            })
1169            .unwrap(),
1170        );
1171
1172        let component = JmsComponent::with_scheme("jms", pool);
1173        let endpoint = component
1174            .create_endpoint(
1175                "jms:queue:orders",
1176                &camel_component_api::NoOpComponentContext,
1177            )
1178            .unwrap();
1179        let mut producer = endpoint
1180            .create_producer(rt(), &camel_component_api::ProducerContext::default())
1181            .unwrap();
1182
1183        let mut exchange = Exchange::default();
1184        exchange.input.body = camel_component_api::Body::Text("hello".to_string());
1185
1186        let err = producer.call(exchange).await.unwrap_err();
1187        assert!(err.to_string().contains("is degraded"), "got: {}", err);
1188    }
1189
1190    /// A send transport error should trigger a channel refresh attempt first.
1191    /// If refresh cannot be performed (e.g. no running bridge process metadata),
1192    /// the producer requests a bridge restart as fallback.
1193    #[tokio::test]
1194    async fn lazy_producer_requests_restart_when_refresh_unavailable() {
1195        use tokio::sync::watch;
1196        use tonic::transport::Endpoint as TonicEndpoint;
1197        use tower::Service;
1198
1199        // Build a lazy channel to a port where nothing is listening.
1200        // connect_lazy() succeeds immediately; the error manifests on the actual RPC call.
1201        let dead_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();
1202
1203        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1204            channel: dead_channel.clone(),
1205        });
1206
1207        let pool = Arc::new(
1208            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1209                "tcp://localhost:61616",
1210                BrokerType::ActiveMq,
1211            ))
1212            .unwrap(),
1213        );
1214
1215        // Manually insert a slot with the dead-channel in Ready state.
1216        let slot = Arc::new(BridgeSlot {
1217            name: "default".to_string(),
1218            broker_url: "tcp://localhost:61616".to_string(),
1219            broker_type: BrokerType::ActiveMq,
1220            credentials: None,
1221            state_rx: state_rx.clone(),
1222            state_tx: state_tx.clone(),
1223            process: Arc::new(tokio::sync::Mutex::new(None)),
1224            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1225        });
1226        pool.slots.insert("default".to_string(), Arc::clone(&slot));
1227
1228        let endpoint_config =
1229            crate::config::JmsEndpointConfig::from_uri("jms:queue:test-retry").unwrap();
1230
1231        let mut producer = LazyJmsProducer {
1232            pool: Arc::clone(&pool),
1233            broker_name: "default".to_string(),
1234            endpoint_config,
1235            resolved_broker_type: BrokerType::ActiveMq,
1236            runtime: test_rt(),
1237        };
1238
1239        let mut exchange = Exchange::default();
1240        exchange.input.body = camel_component_api::Body::Text("hello".to_string());
1241
1242        // The send will fail because the channel points to a dead port.
1243        let result = producer.call(exchange).await;
1244        assert!(result.is_err(), "expected send to fail");
1245
1246        // Refresh cannot run in this setup (slot has no BridgeProcess), so the
1247        // fallback path requests a restart.
1248        let state_after = state_rx.borrow().clone();
1249        assert!(
1250            matches!(state_after, BridgeState::Restarting { .. }),
1251            "slot must enter Restarting when refresh is unavailable; got: {:?}",
1252            state_after
1253        );
1254    }
1255
1256    // ── JMS-007: Transport error classification ──────────────────────────────
1257
1258    #[test]
1259    fn transport_error_detects_send_error() {
1260        let err = CamelError::ProcessorError(format!(
1261            "{}send error: connection refused",
1262            BRIDGE_TRANSPORT_ERROR_PREFIX
1263        ));
1264        assert!(
1265            is_bridge_transport_error(&err),
1266            "send error must be classified as transport"
1267        );
1268    }
1269
1270    #[test]
1271    fn transport_error_detects_subscribe_error() {
1272        let err = CamelError::ProcessorError(format!(
1273            "{}subscribe error: stream reset",
1274            BRIDGE_TRANSPORT_ERROR_PREFIX
1275        ));
1276        assert!(
1277            is_bridge_transport_error(&err),
1278            "subscribe error must be classified as transport"
1279        );
1280    }
1281
1282    #[test]
1283    fn transport_error_rejects_business_errors() {
1284        let err = CamelError::ProcessorError("JMS broker 'main' is degraded: timeout".to_string());
1285        assert!(
1286            !is_bridge_transport_error(&err),
1287            "degraded state error must NOT be transport"
1288        );
1289    }
1290
1291    #[test]
1292    fn transport_error_rejects_config_errors() {
1293        let err = CamelError::Config("bridge_start_timeout_ms must be > 0".to_string());
1294        assert!(
1295            !is_bridge_transport_error(&err),
1296            "config error must NOT be transport"
1297        );
1298    }
1299
1300    #[test]
1301    fn transport_error_prefix_is_used_by_producer_and_consumer() {
1302        // Verify the constant prefix matches what producer.rs and consumer.rs emit.
1303        // If this test fails, the constant has drifted from the error format strings.
1304        assert!(
1305            BRIDGE_TRANSPORT_ERROR_PREFIX.starts_with("JMS gRPC "),
1306            "prefix must start with 'JMS gRPC '"
1307        );
1308    }
1309
1310    // ── JMS-006: max_bridges enforcement ─────────────────────────────────────
1311
1312    #[tokio::test]
1313    async fn pool_enforces_max_bridges_limit() {
1314        use tokio::sync::watch;
1315
1316        let pool = Arc::new(
1317            JmsBridgePool::from_config(JmsPoolConfig {
1318                brokers: HashMap::from([
1319                    (
1320                        "b1".to_string(),
1321                        BrokerConfig {
1322                            broker_url: "tcp://b1:61616".to_string(),
1323                            broker_type: BrokerType::ActiveMq,
1324                            username: None,
1325                            password: None,
1326                        },
1327                    ),
1328                    (
1329                        "b2".to_string(),
1330                        BrokerConfig {
1331                            broker_url: "tcp://b2:61616".to_string(),
1332                            broker_type: BrokerType::ActiveMq,
1333                            username: None,
1334                            password: None,
1335                        },
1336                    ),
1337                    (
1338                        "b3".to_string(),
1339                        BrokerConfig {
1340                            broker_url: "tcp://b3:61616".to_string(),
1341                            broker_type: BrokerType::ActiveMq,
1342                            username: None,
1343                            password: None,
1344                        },
1345                    ),
1346                ]),
1347                max_bridges: 2,
1348                ..JmsPoolConfig::default()
1349            })
1350            .unwrap(),
1351        );
1352
1353        // Manually insert two slots to simulate existing bridges.
1354        // max_bridges counts ALL slots in the map (not just active states).
1355        for name in &["b1", "b2"] {
1356            let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1357            let slot = Arc::new(BridgeSlot {
1358                name: name.to_string(),
1359                broker_url: format!("tcp://{name}:61616"),
1360                broker_type: BrokerType::ActiveMq,
1361                credentials: None,
1362                state_rx,
1363                state_tx,
1364                process: Arc::new(tokio::sync::Mutex::new(None)),
1365                health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1366            });
1367            pool.slots.insert(name.to_string(), slot);
1368        }
1369
1370        // Attempting to create a third slot should fail.
1371        let err = pool.get_or_create_slot("b3").await.unwrap_err();
1372        assert!(
1373            err.to_string().contains("max_bridges"),
1374            "expected max_bridges error, got: {}",
1375            err
1376        );
1377    }
1378
1379    #[tokio::test]
1380    async fn pool_allows_slot_when_below_max_bridges() {
1381        use tokio::sync::watch;
1382
1383        let pool = Arc::new(
1384            JmsBridgePool::from_config(JmsPoolConfig {
1385                brokers: HashMap::from([(
1386                    "b1".to_string(),
1387                    BrokerConfig {
1388                        broker_url: "tcp://b1:61616".to_string(),
1389                        broker_type: BrokerType::ActiveMq,
1390                        username: None,
1391                        password: None,
1392                    },
1393                )]),
1394                max_bridges: 2,
1395                bridge_start_timeout_ms: 100,
1396                ..JmsPoolConfig::default()
1397            })
1398            .unwrap(),
1399        );
1400
1401        // Insert one slot in Degraded state (not counted as active).
1402        let (state_tx, state_rx) = watch::channel(BridgeState::Degraded("test".to_string()));
1403        let slot = Arc::new(BridgeSlot {
1404            name: "b1".to_string(),
1405            broker_url: "tcp://b1:61616".to_string(),
1406            broker_type: BrokerType::ActiveMq,
1407            credentials: None,
1408            state_rx,
1409            state_tx,
1410            process: Arc::new(tokio::sync::Mutex::new(None)),
1411            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1412        });
1413        pool.slots.insert("b1".to_string(), slot);
1414
1415        // b1 is Degraded (not active), so creating b1's slot returns existing.
1416        // The max_bridges check only applies to new slots.
1417        let result = pool.get_or_create_slot("b1").await;
1418        assert!(result.is_ok(), "existing slot must be returned");
1419    }
1420
1421    // ── JMS-003: poll_ready reflects bridge state ────────────────────────────
1422
1423    #[tokio::test]
1424    async fn poll_ready_returns_pending_when_starting() {
1425        use tokio::sync::watch;
1426        use tower::Service;
1427
1428        let pool = Arc::new(
1429            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1430                "tcp://localhost:61616",
1431                BrokerType::ActiveMq,
1432            ))
1433            .unwrap(),
1434        );
1435
1436        let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1437        let slot = Arc::new(BridgeSlot {
1438            name: "default".to_string(),
1439            broker_url: "tcp://localhost:61616".to_string(),
1440            broker_type: BrokerType::ActiveMq,
1441            credentials: None,
1442            state_rx,
1443            state_tx,
1444            process: Arc::new(tokio::sync::Mutex::new(None)),
1445            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1446        });
1447        pool.slots.insert("default".to_string(), slot);
1448
1449        let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1450        let mut producer = LazyJmsProducer {
1451            pool: Arc::clone(&pool),
1452            broker_name: "default".to_string(),
1453            endpoint_config,
1454            resolved_broker_type: BrokerType::ActiveMq,
1455            runtime: test_rt(),
1456        };
1457
1458        let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1459        assert!(
1460            matches!(result, Poll::Pending),
1461            "poll_ready must be Pending when Starting; got: {:?}",
1462            result
1463        );
1464    }
1465
1466    #[tokio::test]
1467    async fn poll_ready_returns_error_when_degraded() {
1468        use tokio::sync::watch;
1469        use tower::Service;
1470
1471        let pool = Arc::new(
1472            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1473                "tcp://localhost:61616",
1474                BrokerType::ActiveMq,
1475            ))
1476            .unwrap(),
1477        );
1478
1479        let (state_tx, state_rx) =
1480            watch::channel(BridgeState::Degraded("health check failed".to_string()));
1481        let slot = Arc::new(BridgeSlot {
1482            name: "default".to_string(),
1483            broker_url: "tcp://localhost:61616".to_string(),
1484            broker_type: BrokerType::ActiveMq,
1485            credentials: None,
1486            state_rx,
1487            state_tx,
1488            process: Arc::new(tokio::sync::Mutex::new(None)),
1489            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1490        });
1491        pool.slots.insert("default".to_string(), slot);
1492
1493        let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1494        let mut producer = LazyJmsProducer {
1495            pool: Arc::clone(&pool),
1496            broker_name: "default".to_string(),
1497            endpoint_config,
1498            resolved_broker_type: BrokerType::ActiveMq,
1499            runtime: test_rt(),
1500        };
1501
1502        let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1503        assert!(
1504            matches!(result, Poll::Ready(Err(_))),
1505            "poll_ready must be Err when Degraded; got: {:?}",
1506            result
1507        );
1508        let err_msg = match result {
1509            Poll::Ready(Err(e)) => e.to_string(),
1510            _ => unreachable!(),
1511        };
1512        assert!(
1513            err_msg.contains("degraded"),
1514            "error must mention degraded: {}",
1515            err_msg
1516        );
1517    }
1518
1519    #[tokio::test]
1520    async fn poll_ready_returns_error_when_stopped() {
1521        use tokio::sync::watch;
1522        use tower::Service;
1523
1524        let pool = Arc::new(
1525            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1526                "tcp://localhost:61616",
1527                BrokerType::ActiveMq,
1528            ))
1529            .unwrap(),
1530        );
1531
1532        let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
1533        let slot = Arc::new(BridgeSlot {
1534            name: "default".to_string(),
1535            broker_url: "tcp://localhost:61616".to_string(),
1536            broker_type: BrokerType::ActiveMq,
1537            credentials: None,
1538            state_rx,
1539            state_tx,
1540            process: Arc::new(tokio::sync::Mutex::new(None)),
1541            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1542        });
1543        pool.slots.insert("default".to_string(), slot);
1544
1545        let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1546        let mut producer = LazyJmsProducer {
1547            pool: Arc::clone(&pool),
1548            broker_name: "default".to_string(),
1549            endpoint_config,
1550            resolved_broker_type: BrokerType::ActiveMq,
1551            runtime: test_rt(),
1552        };
1553
1554        let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1555        assert!(
1556            matches!(result, Poll::Ready(Err(_))),
1557            "poll_ready must be Err when Stopped; got: {:?}",
1558            result
1559        );
1560    }
1561
1562    #[tokio::test]
1563    async fn poll_ready_returns_ready_when_slot_ready() {
1564        use tokio::sync::watch;
1565        use tonic::transport::Endpoint as TonicEndpoint;
1566        use tower::Service;
1567
1568        let pool = Arc::new(
1569            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1570                "tcp://localhost:61616",
1571                BrokerType::ActiveMq,
1572            ))
1573            .unwrap(),
1574        );
1575
1576        let lazy_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();
1577        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1578            channel: lazy_channel,
1579        });
1580        let slot = Arc::new(BridgeSlot {
1581            name: "default".to_string(),
1582            broker_url: "tcp://localhost:61616".to_string(),
1583            broker_type: BrokerType::ActiveMq,
1584            credentials: None,
1585            state_rx,
1586            state_tx,
1587            process: Arc::new(tokio::sync::Mutex::new(None)),
1588            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1589        });
1590        pool.slots.insert("default".to_string(), slot);
1591
1592        let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1593        let mut producer = LazyJmsProducer {
1594            pool: Arc::clone(&pool),
1595            broker_name: "default".to_string(),
1596            endpoint_config,
1597            resolved_broker_type: BrokerType::ActiveMq,
1598            runtime: test_rt(),
1599        };
1600
1601        let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1602        assert!(
1603            matches!(result, Poll::Ready(Ok(()))),
1604            "poll_ready must be Ready(Ok) when bridge is Ready; got: {:?}",
1605            result
1606        );
1607    }
1608
1609    #[tokio::test]
1610    async fn poll_ready_returns_ready_when_no_slot_exists() {
1611        use tower::Service;
1612
1613        let pool = Arc::new(
1614            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1615                "tcp://localhost:61616",
1616                BrokerType::ActiveMq,
1617            ))
1618            .unwrap(),
1619        );
1620
1621        let endpoint_config = crate::config::JmsEndpointConfig::from_uri("jms:queue:test").unwrap();
1622        let mut producer = LazyJmsProducer {
1623            pool: Arc::clone(&pool),
1624            broker_name: "default".to_string(),
1625            endpoint_config,
1626            resolved_broker_type: BrokerType::ActiveMq,
1627            runtime: test_rt(),
1628        };
1629
1630        // No slot exists yet — poll_ready should return Ready so call() can start the bridge.
1631        let result = producer.poll_ready(&mut Context::from_waker(futures::task::noop_waker_ref()));
1632        assert!(
1633            matches!(result, Poll::Ready(Ok(()))),
1634            "poll_ready must be Ready(Ok) when no slot exists; got: {:?}",
1635            result
1636        );
1637    }
1638
1639    // ── JMS-001: Health monitor lifecycle ────────────────────────────────────
1640
1641    #[tokio::test]
1642    async fn pool_shutdown_awaits_health_monitor() {
1643        use tokio::sync::watch;
1644
1645        let pool = Arc::new(
1646            JmsBridgePool::from_config(JmsPoolConfig {
1647                brokers: HashMap::from([(
1648                    "default".to_string(),
1649                    BrokerConfig {
1650                        broker_url: "tcp://localhost:61616".to_string(),
1651                        broker_type: BrokerType::ActiveMq,
1652                        username: None,
1653                        password: None,
1654                    },
1655                )]),
1656                health_check_interval_ms: 100,
1657                ..JmsPoolConfig::default()
1658            })
1659            .unwrap(),
1660        );
1661
1662        // Create a slot manually with a spawned health monitor task.
1663        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1664            channel: tonic::transport::Endpoint::from_static("http://127.0.0.1:1").connect_lazy(),
1665        });
1666        let monitor_handle_ref: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>> =
1667            Arc::new(tokio::sync::Mutex::new(None));
1668
1669        // Spawn a simple monitor that exits on Stopped.
1670        let state_rx_clone = state_rx.clone();
1671        let handle = tokio::spawn(async move {
1672            loop {
1673                if matches!(*state_rx_clone.borrow(), BridgeState::Stopped) {
1674                    break;
1675                }
1676                tokio::time::sleep(Duration::from_millis(50)).await;
1677            }
1678        });
1679        *monitor_handle_ref.lock().await = Some(handle);
1680
1681        let slot = Arc::new(BridgeSlot {
1682            name: "default".to_string(),
1683            broker_url: "tcp://localhost:61616".to_string(),
1684            broker_type: BrokerType::ActiveMq,
1685            credentials: None,
1686            state_rx,
1687            state_tx,
1688            process: Arc::new(tokio::sync::Mutex::new(None)),
1689            health_monitor_handle: monitor_handle_ref,
1690        });
1691        pool.slots.insert("default".to_string(), slot);
1692
1693        // Shutdown should complete without hanging — the monitor exits on Stopped.
1694        let result = pool.shutdown().await;
1695        // May report errors from bridge process (none in this test), but must not hang.
1696        let _ = result;
1697    }
1698
1699    #[tokio::test]
1700    async fn health_monitor_handle_stored_after_spawn() {
1701        use tokio::sync::watch;
1702
1703        let pool = Arc::new(
1704            JmsBridgePool::from_config(JmsPoolConfig {
1705                brokers: HashMap::from([(
1706                    "default".to_string(),
1707                    BrokerConfig {
1708                        broker_url: "tcp://localhost:61616".to_string(),
1709                        broker_type: BrokerType::ActiveMq,
1710                        username: None,
1711                        password: None,
1712                    },
1713                )]),
1714                health_check_interval_ms: 100,
1715                bridge_start_timeout_ms: 100,
1716                ..JmsPoolConfig::default()
1717            })
1718            .unwrap(),
1719        );
1720
1721        let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
1722        let slot = Arc::new(BridgeSlot {
1723            name: "default".to_string(),
1724            broker_url: "tcp://localhost:61616".to_string(),
1725            broker_type: BrokerType::ActiveMq,
1726            credentials: None,
1727            state_rx,
1728            state_tx,
1729            process: Arc::new(tokio::sync::Mutex::new(None)),
1730            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1731        });
1732        pool.slots.insert("default".to_string(), Arc::clone(&slot));
1733
1734        pool.spawn_health_monitor(Arc::clone(&slot)).await;
1735
1736        tokio::time::sleep(Duration::from_millis(50)).await;
1737        let guard = slot.health_monitor_handle.lock().await;
1738        assert!(
1739            guard.is_some(),
1740            "health monitor handle must be stored after spawn_health_monitor"
1741        );
1742    }
1743
1744    // ── JMS-008: URL redaction for safe logging ──────────────────────────────
1745
1746    #[test]
1747    fn redact_url_strips_userinfo_with_password() {
1748        assert_eq!(
1749            redact_url("tcp://admin:s3cret@broker:61616"),
1750            "tcp://***@broker:61616"
1751        );
1752    }
1753
1754    #[test]
1755    fn redact_url_strips_userinfo_without_password() {
1756        assert_eq!(
1757            redact_url("tcp://admin@broker:61616"),
1758            "tcp://***@broker:61616"
1759        );
1760    }
1761
1762    #[test]
1763    fn redact_url_passes_clean_url_unchanged() {
1764        assert_eq!(redact_url("tcp://localhost:61616"), "tcp://localhost:61616");
1765    }
1766
1767    #[test]
1768    fn redact_url_handles_ssl_scheme() {
1769        assert_eq!(
1770            redact_url("ssl://user:pass@secure-broker:61617"),
1771            "ssl://***@secure-broker:61617"
1772        );
1773    }
1774
1775    // ── JMS-009: max_bridges race condition under concurrency ────────────────
1776
1777    #[tokio::test]
1778    async fn concurrent_slot_creation_respects_max_bridges() {
1779        let pool = Arc::new(
1780            JmsBridgePool::from_config(JmsPoolConfig {
1781                brokers: HashMap::from([
1782                    (
1783                        "b1".to_string(),
1784                        BrokerConfig {
1785                            broker_url: "tcp://b1:61616".to_string(),
1786                            broker_type: BrokerType::ActiveMq,
1787                            username: None,
1788                            password: None,
1789                        },
1790                    ),
1791                    (
1792                        "b2".to_string(),
1793                        BrokerConfig {
1794                            broker_url: "tcp://b2:61616".to_string(),
1795                            broker_type: BrokerType::ActiveMq,
1796                            username: None,
1797                            password: None,
1798                        },
1799                    ),
1800                    (
1801                        "b3".to_string(),
1802                        BrokerConfig {
1803                            broker_url: "tcp://b3:61616".to_string(),
1804                            broker_type: BrokerType::ActiveMq,
1805                            username: None,
1806                            password: None,
1807                        },
1808                    ),
1809                ]),
1810                max_bridges: 2,
1811                bridge_start_timeout_ms: 100,
1812                ..JmsPoolConfig::default()
1813            })
1814            .unwrap(),
1815        );
1816
1817        let (state_tx, state_rx) = watch::channel(BridgeState::Starting);
1818        for name in &["b1", "b2"] {
1819            let slot = Arc::new(BridgeSlot {
1820                name: name.to_string(),
1821                broker_url: format!("tcp://{name}:61616"),
1822                broker_type: BrokerType::ActiveMq,
1823                credentials: None,
1824                state_rx: state_rx.clone(),
1825                state_tx: state_tx.clone(),
1826                process: Arc::new(tokio::sync::Mutex::new(None)),
1827                health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1828            });
1829            pool.slots.insert(name.to_string(), slot);
1830        }
1831
1832        assert_eq!(pool.slots.len(), 2);
1833
1834        let guard = pool.bridge_create_lock.lock().await;
1835        let total_count = pool.slots.len();
1836        assert!(total_count >= pool.max_bridges as usize);
1837        let result = if total_count >= pool.max_bridges as usize {
1838            Err(CamelError::Config(format!(
1839                "JMS bridge limit reached: {total_count} bridge(s) >= max_bridges ({})",
1840                pool.max_bridges
1841            )))
1842        } else {
1843            Ok(())
1844        };
1845        drop(guard);
1846
1847        assert!(result.is_err(), "3rd broker should be rejected");
1848        let err_msg = result.unwrap_err().to_string();
1849        assert!(
1850            err_msg.contains("max_bridges"),
1851            "error must mention max_bridges, got: {err_msg}"
1852        );
1853    }
1854
1855    // ── JMS-010: Transport error does NOT auto-resend ────────────────────────
1856
1857    #[tokio::test]
1858    async fn transport_error_refreshes_channel_but_does_not_resend() {
1859        use tokio::sync::watch;
1860        use tonic::transport::Endpoint as TonicEndpoint;
1861        use tower::Service;
1862
1863        let dead_channel = TonicEndpoint::from_static("http://127.0.0.1:1").connect_lazy();
1864
1865        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
1866            channel: dead_channel.clone(),
1867        });
1868
1869        let pool = Arc::new(
1870            JmsBridgePool::from_config(JmsPoolConfig::single_broker(
1871                "tcp://localhost:61616",
1872                BrokerType::ActiveMq,
1873            ))
1874            .unwrap(),
1875        );
1876
1877        let slot = Arc::new(BridgeSlot {
1878            name: "default".to_string(),
1879            broker_url: "tcp://localhost:61616".to_string(),
1880            broker_type: BrokerType::ActiveMq,
1881            credentials: None,
1882            state_rx: state_rx.clone(),
1883            state_tx: state_tx.clone(),
1884            process: Arc::new(tokio::sync::Mutex::new(None)),
1885            health_monitor_handle: Arc::new(tokio::sync::Mutex::new(None)),
1886        });
1887        pool.slots.insert("default".to_string(), Arc::clone(&slot));
1888
1889        let endpoint_config =
1890            crate::config::JmsEndpointConfig::from_uri("jms:queue:test-no-resend").unwrap();
1891
1892        let mut producer = LazyJmsProducer {
1893            pool: Arc::clone(&pool),
1894            broker_name: "default".to_string(),
1895            endpoint_config,
1896            resolved_broker_type: BrokerType::ActiveMq,
1897            runtime: test_rt(),
1898        };
1899
1900        let mut exchange = Exchange::default();
1901        exchange.input.body = camel_component_api::Body::Text("hello".to_string());
1902
1903        let result = producer.call(exchange).await;
1904        assert!(result.is_err(), "expected send to fail");
1905
1906        let state_after = state_rx.borrow().clone();
1907        assert!(
1908            matches!(state_after, BridgeState::Restarting { .. }),
1909            "slot must enter Restarting; got: {:?}",
1910            state_after
1911        );
1912
1913        let err_msg = result.unwrap_err().to_string();
1914        assert!(
1915            err_msg.contains(BRIDGE_TRANSPORT_ERROR_PREFIX),
1916            "error must be original transport error, got: {}",
1917            err_msg
1918        );
1919    }
1920}