Skip to main content

camel_bridge/
process.rs

1use std::fmt;
2use std::ops::Deref;
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
9
10// ---------------------------------------------------------------------------
11// Redacted<T> — wrapper that never leaks inner value via Debug/Display
12// ---------------------------------------------------------------------------
13
14/// A newtype that redacts its inner value in `Debug` and `Display` output.
15/// Used for password/credential fields to prevent accidental logging.
16#[derive(Clone)]
17pub struct Redacted<T>(T);
18
19impl<T> Redacted<T> {
20    pub fn new(value: T) -> Self {
21        Self(value)
22    }
23
24    pub fn into_inner(self) -> T {
25        self.0
26    }
27}
28
29impl<T> fmt::Debug for Redacted<T> {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        write!(f, "[REDACTED]")
32    }
33}
34
35impl<T> fmt::Display for Redacted<T> {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        write!(f, "[REDACTED]")
38    }
39}
40
41impl<T> Deref for Redacted<T> {
42    type Target = T;
43
44    fn deref(&self) -> &Self::Target {
45        &self.0
46    }
47}
48
49#[derive(Debug, Error)]
50pub enum BridgeError {
51    #[error("IO error: {0}")]
52    Io(#[from] std::io::Error),
53    #[error("Bridge timed out: {0}")]
54    Timeout(String),
55    #[error("Bridge stdout closed before ready message")]
56    StdoutClosed,
57    #[error("Bridge ready message malformed: {0}")]
58    BadReadyMessage(String),
59    #[error("Download failed: {0}")]
60    Download(String),
61    #[error("Checksum mismatch: expected {expected}, got {actual}")]
62    ChecksumMismatch { expected: String, actual: String },
63    #[error("URL not allowed: {0}")]
64    UrlNotAllowed(String),
65    #[error("Transport error: {0}")]
66    Transport(String),
67    #[error("Config error: {0}")]
68    Config(String),
69}
70
71#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
72#[serde(rename_all = "lowercase")]
73pub enum BrokerType {
74    #[serde(alias = "active_mq")]
75    ActiveMq,
76    Artemis,
77    Generic,
78}
79
80impl BrokerType {
81    pub fn as_env_str(&self) -> &'static str {
82        match self {
83            BrokerType::ActiveMq => "activemq",
84            BrokerType::Artemis => "artemis",
85            BrokerType::Generic => "generic",
86        }
87    }
88}
89
90impl std::str::FromStr for BrokerType {
91    type Err = BridgeError;
92
93    fn from_str(s: &str) -> Result<Self, Self::Err> {
94        match s.to_lowercase().as_str() {
95            "activemq" => Ok(BrokerType::ActiveMq),
96            "artemis" => Ok(BrokerType::Artemis),
97            "generic" => Ok(BrokerType::Generic),
98            other => Err(BridgeError::Config(format!("unknown broker type: {other}"))), // allow-secret
99        }
100    }
101}
102
103/// Environment variables for a single CXF profile, used by the bridge Java side.
104/// Password fields use [`Redacted`] to prevent accidental credential leakage in logs.
105#[derive(Debug)]
106pub struct CxfProfileEnvVars {
107    pub name: String,
108    pub wsdl_path: String,
109    pub service_name: String,
110    pub port_name: String,
111    pub address: Option<String>,
112    pub keystore_path: Option<String>,
113    pub keystore_password: Option<Redacted<String>>,
114    pub truststore_path: Option<String>,
115    pub truststore_password: Option<Redacted<String>>,
116    pub sig_username: Option<String>,
117    pub sig_password: Option<Redacted<String>>,
118    pub enc_username: Option<String>,
119    pub security_actions_out: Option<String>,
120    pub security_actions_in: Option<String>,
121    pub signature_algorithm: Option<String>,
122    pub signature_digest_algorithm: Option<String>,
123    pub signature_c14n_algorithm: Option<String>,
124    pub signature_parts: Option<String>,
125}
126
127impl CxfProfileEnvVars {
128    pub fn to_env_vars(&self) -> Vec<(String, String)> {
129        let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
130        let mut vars = vec![
131            (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
132            (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
133            (format!("{}PORT_NAME", prefix), self.port_name.clone()),
134        ];
135
136        if let Some(ref v) = self.address {
137            vars.push((format!("{}ADDRESS", prefix), v.clone()));
138        }
139        if let Some(ref v) = self.keystore_path {
140            vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
141        }
142        if let Some(ref v) = self.keystore_password {
143            vars.push((format!("{}KEYSTORE_PASSWORD", prefix), (**v).clone()));
144        }
145        if let Some(ref v) = self.truststore_path {
146            vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
147        }
148        if let Some(ref v) = self.truststore_password {
149            vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), (**v).clone()));
150        }
151        if let Some(ref v) = self.sig_username {
152            vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
153        }
154        if let Some(ref v) = self.sig_password {
155            vars.push((format!("{}SIG_PASSWORD", prefix), (**v).clone()));
156        }
157        if let Some(ref v) = self.enc_username {
158            vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
159        }
160        if let Some(ref v) = self.security_actions_out {
161            vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
162        }
163        if let Some(ref v) = self.security_actions_in {
164            vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
165        }
166        if let Some(ref v) = self.signature_algorithm {
167            vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
168        }
169        if let Some(ref v) = self.signature_digest_algorithm {
170            vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
171        }
172        if let Some(ref v) = self.signature_c14n_algorithm {
173            vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
174        }
175        if let Some(ref v) = self.signature_parts {
176            vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
177        }
178
179        vars
180    }
181}
182
183/// Configuration for spawning a bridge subprocess.
184/// Password fields use [`Redacted`] to prevent accidental credential leakage in logs.
185#[derive(Debug)]
186pub struct BridgeProcessConfig {
187    pub spec: &'static BridgeSpec,
188    pub binary_path: PathBuf,
189    pub broker_url: String,
190    pub broker_type: BrokerType,
191    pub username: Option<String>,
192    pub password: Option<Redacted<String>>,
193    pub start_timeout_ms: u64,
194    pub env_vars: Vec<(String, String)>,
195}
196
197impl BridgeProcessConfig {
198    /// Constructor for the JMS bridge.
199    pub fn jms(
200        binary_path: PathBuf,
201        broker_url: String,
202        broker_type: BrokerType,
203        username: Option<String>,
204        password: Option<Redacted<String>>,
205        start_timeout_ms: u64,
206    ) -> Self {
207        let mut env_vars = vec![
208            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
209            (
210                "BRIDGE_BROKER_TYPE".to_string(),
211                broker_type.as_env_str().to_string(),
212            ),
213        ];
214        if let Some(u) = &username {
215            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
216        }
217        if let Some(p) = &password {
218            env_vars.push(("BRIDGE_PASSWORD".to_string(), (**p).clone()));
219        }
220        Self {
221            spec: &JMS_BRIDGE,
222            binary_path,
223            broker_url,
224            broker_type,
225            username,
226            password,
227            start_timeout_ms,
228            env_vars,
229        }
230    }
231
232    /// Constructor for the XML bridge.
233    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
234        Self {
235            spec: &XML_BRIDGE,
236            binary_path,
237            broker_url: String::new(),
238            broker_type: BrokerType::Generic,
239            username: None,
240            password: None,
241            start_timeout_ms,
242            env_vars: vec![],
243        }
244    }
245
246    /// Constructor for the CXF bridge with multi-profile support.
247    /// Generates `CXF_PROFILES=list` env var plus per-profile env vars.
248    pub fn cxf_profiles(
249        binary_path: PathBuf,
250        profiles: &[CxfProfileEnvVars],
251        start_timeout_ms: u64,
252    ) -> Self {
253        let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
254        let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
255
256        for profile in profiles {
257            env_vars.extend(profile.to_env_vars());
258        }
259
260        Self {
261            spec: &CXF_BRIDGE,
262            binary_path,
263            broker_url: String::new(),
264            broker_type: BrokerType::Generic,
265            username: None,
266            password: None,
267            start_timeout_ms,
268            env_vars,
269        }
270    }
271
272    pub fn validate(&self) -> Result<(), String> {
273        if self.start_timeout_ms == 0 {
274            return Err("start_timeout_ms must be > 0".to_string());
275        }
276        Ok(())
277    }
278}
279
280pub struct BridgeProcess {
281    child: tokio::process::Child,
282    grpc_port: u16,
283    token: CancellationToken,
284    handle: Option<JoinHandle<()>>,
285}
286
287impl BridgeProcess {
288    pub fn grpc_port(&self) -> u16 {
289        self.grpc_port
290    }
291
292    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
293    ///   {"status":"ready","port":PORT}
294    ///
295    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
296    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
297    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
298        use tokio::io::AsyncBufReadExt;
299        use tokio::process::Command;
300        use tokio::time::{Duration, timeout};
301
302        config.validate().map_err(BridgeError::Config)?;
303
304        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
305        let free_port = {
306            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
307            listener.local_addr()?.port()
308        };
309
310        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
311        let stderr_stdio: std::process::Stdio =
312            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
313                let log_filename = config
314                    .spec
315                    .log_file_template
316                    .replace("{pid}", &std::process::id().to_string());
317                let log_path = if log_dir.is_empty() {
318                    format!("/tmp/{log_filename}")
319                } else {
320                    format!("{log_dir}/{log_filename}")
321                };
322                match std::fs::File::create(&log_path) {
323                    Ok(f) => {
324                        eprintln!("[camel-bridge] stderr → {}", log_path);
325                        f.into()
326                    }
327                    Err(e) => {
328                        eprintln!(
329                            "[camel-bridge] failed to create log file {}: {}",
330                            log_path, e
331                        );
332                        std::process::Stdio::inherit()
333                    }
334                }
335            } else {
336                std::process::Stdio::inherit()
337            };
338
339        let mut command = Command::new(&config.binary_path);
340        command
341            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
342            // Let the OS pick a random HTTP port — we only use gRPC.
343            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
344            .env("QUARKUS_HTTP_PORT", "0")
345            .stdout(std::process::Stdio::piped())
346            .stderr(stderr_stdio);
347
348        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
349        for (key, value) in &config.env_vars {
350            command.env(key, value);
351        }
352
353        let mut child = command.spawn()?;
354
355        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
356        let mut reader = tokio::io::BufReader::new(stdout).lines();
357
358        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
359            while let Some(line) = reader.next_line().await? {
360                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
361                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
362                {
363                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
364                        return Ok(p as u16);
365                    }
366                    tracing::error!("bridge ready message malformed: {line}");
367                    return Err(BridgeError::BadReadyMessage(line));
368                }
369            }
370            tracing::error!("bridge stdout closed before ready message");
371            Err(BridgeError::StdoutClosed)
372        })
373        .await
374        .map_err(|_| {
375            let msg = format!(
376                "{} failed to start: health check timeout after {}ms",
377                config.spec.name, config.start_timeout_ms
378            );
379            tracing::error!("{msg}");
380            BridgeError::Timeout(msg)
381        })??;
382
383        // Keep draining stdout in background; allow cooperative cancellation.
384        let token = CancellationToken::new();
385        let child_token = token.clone();
386        let handle = tokio::spawn(async move {
387            loop {
388                tokio::select! {
389                    _ = child_token.cancelled() => break,
390                    line = reader.next_line() => {
391                        match line {
392                            Ok(Some(line)) => tracing::debug!(target: "camel_bridge::child", "{}", line),
393                            Ok(None) | Err(_) => break,
394                        }
395                    }
396                }
397            }
398        });
399
400        Ok(BridgeProcess {
401            child,
402            grpc_port: port,
403            token,
404            handle: Some(handle),
405        })
406    }
407
408    /// Gracefully stop: SIGTERM + wait for exit.
409    pub async fn stop(mut self) -> Result<(), BridgeError> {
410        use tokio::time::{Duration, sleep};
411
412        self.token.cancel();
413
414        if let Some(handle) = self.handle.take() {
415            let join_result = tokio::time::timeout(Duration::from_secs(5), handle).await;
416            if join_result.is_err() {
417                tracing::warn!("bridge stdout drain task did not exit after cancellation");
418            }
419        }
420
421        // Send SIGTERM first (graceful shutdown)
422        #[cfg(unix)]
423        {
424            let pid = self.child.id().unwrap_or(0);
425            if pid > 0 {
426                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
427                unsafe {
428                    libc::kill(pid as i32, libc::SIGTERM);
429                }
430            }
431        }
432
433        // On non-Unix (Windows), fall through to kill immediately
434        #[cfg(not(unix))]
435        let _ = self.child.start_kill();
436
437        // Wait up to 5 seconds for graceful exit, then SIGKILL
438        tokio::select! {
439            result = self.child.wait() => {
440                result?;
441            }
442            _ = sleep(Duration::from_secs(5)) => {
443                let _ = self.child.start_kill();
444                self.child.wait().await?;
445            }
446        }
447        Ok(())
448    }
449}
450
451impl Drop for BridgeProcess {
452    fn drop(&mut self) {
453        self.token.cancel();
454        // Best-effort only. Does NOT wait — cannot block in Drop.
455        let _ = self.child.start_kill();
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use super::*;
462
463    #[test]
464    fn broker_type_from_str_activemq() {
465        assert_eq!(
466            "activemq".parse::<BrokerType>().unwrap(),
467            BrokerType::ActiveMq
468        );
469        assert_eq!(
470            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
471            BrokerType::ActiveMq
472        );
473    }
474
475    #[test]
476    fn broker_type_from_str_artemis() {
477        assert_eq!(
478            "artemis".parse::<BrokerType>().unwrap(),
479            BrokerType::Artemis
480        );
481    }
482
483    #[test]
484    fn broker_type_from_str_generic() {
485        assert_eq!(
486            "generic".parse::<BrokerType>().unwrap(),
487            BrokerType::Generic
488        );
489    }
490
491    #[test]
492    fn broker_type_from_str_unknown_returns_err() {
493        assert!("ibmmq".parse::<BrokerType>().is_err());
494        assert!("UnknownBroker".parse::<BrokerType>().is_err());
495    }
496
497    #[test]
498    fn broker_type_env_str() {
499        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
500        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
501        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
502    }
503
504    #[test]
505    fn jms_constructor_uses_jms_spec() {
506        let cfg = BridgeProcessConfig::jms(
507            PathBuf::from("/tmp/jms-bridge"),
508            "tcp://localhost:61616".to_string(),
509            BrokerType::ActiveMq,
510            Some("user".to_string()),
511            Some(Redacted::new("pass".to_string())),
512            1000,
513        );
514        assert_eq!(cfg.spec.name, "jms-bridge");
515    }
516
517    #[test]
518    fn xml_constructor_uses_xml_spec() {
519        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
520        assert_eq!(cfg.spec.name, "xml-bridge");
521    }
522
523    #[test]
524    fn cxf_profiles_generates_cxf_profiles_env_var() {
525        let profiles = vec![
526            CxfProfileEnvVars {
527                name: "baleares".to_string(),
528                wsdl_path: "/a.wsdl".to_string(),
529                service_name: "Svc".to_string(),
530                port_name: "Port".to_string(),
531                address: None,
532                keystore_path: None,
533                keystore_password: None,
534                truststore_path: None,
535                truststore_password: None,
536                sig_username: None,
537                sig_password: None,
538                enc_username: None,
539                security_actions_out: None,
540                security_actions_in: None,
541                signature_algorithm: None,
542                signature_digest_algorithm: None,
543                signature_c14n_algorithm: None,
544                signature_parts: None,
545            },
546            CxfProfileEnvVars {
547                name: "extremadura".to_string(),
548                wsdl_path: "/b.wsdl".to_string(),
549                service_name: "Svc2".to_string(),
550                port_name: "Port2".to_string(),
551                address: Some("http://host:9090/ws".to_string()),
552                keystore_path: Some("/b.jks".to_string()),
553                keystore_password: Some(Redacted::new("pass".to_string())),
554                truststore_path: None,
555                truststore_password: None,
556                sig_username: Some("cert".to_string()),
557                sig_password: Some(Redacted::new("sig_pass".to_string())),
558                enc_username: None,
559                security_actions_out: Some("Timestamp Signature".to_string()),
560                security_actions_in: Some("Timestamp Signature".to_string()),
561                signature_algorithm: None,
562                signature_digest_algorithm: None,
563                signature_c14n_algorithm: None,
564                signature_parts: None,
565            },
566        ];
567
568        let cfg =
569            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
570
571        assert_eq!(cfg.spec.name, "cxf-bridge");
572        assert!(cfg.broker_url.is_empty());
573        assert_eq!(cfg.broker_type, BrokerType::Generic);
574        assert!(cfg.username.is_none());
575        assert!(cfg.password.is_none());
576
577        // Find CXF_PROFILES env var
578        let profiles_var = cfg
579            .env_vars
580            .iter()
581            .find(|(k, _)| k == "CXF_PROFILES")
582            .expect("CXF_PROFILES env var must exist");
583        assert_eq!(profiles_var.1, "baleares,extremadura");
584
585        // Check baleares profile vars (no security)
586        assert!(
587            cfg.env_vars
588                .iter()
589                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl")
590        );
591        assert!(
592            cfg.env_vars
593                .iter()
594                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc")
595        );
596        assert!(
597            cfg.env_vars
598                .iter()
599                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port")
600        );
601        assert!(
602            !cfg.env_vars
603                .iter()
604                .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
605        );
606
607        // Check extremadura profile vars (with security)
608        assert!(
609            cfg.env_vars
610                .iter()
611                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl")
612        );
613        assert!(
614            cfg.env_vars
615                .iter()
616                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws")
617        );
618        assert!(
619            cfg.env_vars
620                .iter()
621                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks")
622        );
623        assert!(
624            cfg.env_vars
625                .iter()
626                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass")
627        );
628        assert!(
629            cfg.env_vars
630                .iter()
631                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert")
632        );
633        assert!(
634            cfg.env_vars
635                .iter()
636                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass")
637        );
638        assert!(
639            cfg.env_vars
640                .iter()
641                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
642                    && v == "Timestamp Signature")
643        );
644    }
645
646    #[test]
647    fn cxf_profiles_single_profile_no_security() {
648        let profiles = vec![CxfProfileEnvVars {
649            name: "test".to_string(),
650            wsdl_path: "service.wsdl".to_string(),
651            service_name: "{http://example.com}Service".to_string(),
652            port_name: "{http://example.com}Port".to_string(),
653            address: None,
654            keystore_path: None,
655            keystore_password: None,
656            truststore_path: None,
657            truststore_password: None,
658            sig_username: None,
659            sig_password: None,
660            enc_username: None,
661            security_actions_out: None,
662            security_actions_in: None,
663            signature_algorithm: None,
664            signature_digest_algorithm: None,
665            signature_c14n_algorithm: None,
666            signature_parts: None,
667        }];
668
669        let cfg =
670            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
671
672        assert_eq!(cfg.spec.name, "cxf-bridge");
673        // CXF_PROFILES + 3 required vars (WSDL_PATH, SERVICE_NAME, PORT_NAME)
674        assert_eq!(cfg.env_vars.len(), 4);
675        assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
676        assert_eq!(cfg.env_vars[0].1, "test");
677        assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
678        assert_eq!(cfg.env_vars[1].1, "service.wsdl");
679        assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
680        assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
681        assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
682        assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
683    }
684
685    #[test]
686    fn profile_env_vars_to_env_vars_includes_all_fields() {
687        let vars = CxfProfileEnvVars {
688            name: "full".to_string(),
689            wsdl_path: "/wsdl".to_string(),
690            service_name: "Svc".to_string(),
691            port_name: "Port".to_string(),
692            address: Some("http://host:8080".to_string()),
693            keystore_path: Some("/ks.jks".to_string()),
694            keystore_password: Some(Redacted::new("ks_pass".to_string())),
695            truststore_path: Some("/ts.jks".to_string()),
696            truststore_password: Some(Redacted::new("ts_pass".to_string())),
697            sig_username: Some("user".to_string()),
698            sig_password: Some(Redacted::new("sig_pass".to_string())),
699            enc_username: None,
700            security_actions_out: Some("Timestamp Signature".to_string()),
701            security_actions_in: Some("Timestamp".to_string()),
702            signature_algorithm: None,
703            signature_digest_algorithm: None,
704            signature_c14n_algorithm: None,
705            signature_parts: None,
706        };
707
708        let env = vars.to_env_vars();
709        // 3 required + 1 address + 8 security = 12
710        assert_eq!(env.len(), 12);
711
712        let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
713        assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
714        assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
715        assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
716        assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
717        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
718        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
719        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
720        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
721        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
722        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
723        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
724        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
725    }
726
727    #[test]
728    fn test_start_timeout_zero_rejected() {
729        let config = BridgeProcessConfig::jms(
730            PathBuf::from("/usr/bin/echo"),
731            "tcp://localhost:61616".to_string(),
732            BrokerType::ActiveMq,
733            None,
734            None,
735            0,
736        );
737        let result = config.validate();
738        assert!(result.is_err());
739    }
740
741    #[test]
742    fn test_bridge_rejects_zero_start_timeout() {
743        let config = BridgeProcessConfig::jms(
744            PathBuf::from("/usr/bin/echo"),
745            "tcp://localhost:61616".to_string(),
746            BrokerType::ActiveMq,
747            None,
748            None,
749            0,
750        );
751        assert!(config.validate().is_err());
752    }
753
754    #[tokio::test]
755    async fn test_bridge_stop_completes() {
756        use tokio::process::Command;
757        use tokio::time::{Duration, timeout};
758
759        let child = Command::new("sh")
760            .arg("-c")
761            .arg("trap '' TERM; while true; do echo tick; sleep 1; done")
762            .stdout(std::process::Stdio::null())
763            .spawn()
764            .expect("must spawn test child process");
765
766        let bridge = BridgeProcess {
767            child,
768            grpc_port: 0,
769            token: CancellationToken::new(),
770            handle: None,
771        };
772
773        let result = timeout(Duration::from_secs(5), bridge.stop()).await;
774        assert!(result.is_ok(), "stop() must complete within 5s");
775    }
776
777    // --- BRG-004: Redacted<T> tests ---
778
779    #[test]
780    fn redacted_debug_displays_redacted() {
781        let r = Redacted::new("secret_password".to_string());
782        assert_eq!(format!("{r:?}"), "[REDACTED]");
783    }
784
785    #[test]
786    fn redacted_display_displays_redacted() {
787        let r = Redacted::new("secret_password".to_string());
788        assert_eq!(format!("{r}"), "[REDACTED]");
789    }
790
791    #[test]
792    fn redacted_deref_gives_inner_value() {
793        let r = Redacted::new("secret".to_string());
794        assert_eq!(&*r, "secret");
795    }
796
797    #[test]
798    fn redacted_into_inner_returns_value() {
799        let r = Redacted::new("secret".to_string());
800        assert_eq!(r.into_inner(), "secret");
801    }
802
803    #[test]
804    fn redacted_clone_works() {
805        let r = Redacted::new("secret".to_string());
806        let c = r.clone();
807        assert_eq!(&*c, "secret");
808        assert_eq!(format!("{c:?}"), "[REDACTED]");
809    }
810
811    #[test]
812    fn bridge_process_config_debug_redacts_password() {
813        let cfg = BridgeProcessConfig::jms(
814            PathBuf::from("/tmp/jms-bridge"),
815            "tcp://localhost:61616".to_string(),
816            BrokerType::ActiveMq,
817            Some("user".to_string()),
818            Some(Redacted::new("super_secret".to_string())),
819            1000,
820        );
821        // The Redacted<T> password field must show [REDACTED] in debug output.
822        // env_vars is a separate Vec<(String, String)> used for process injection
823        // and legitimately contains the raw value — that is not a Redacted leak.
824        let password_debug = format!("{:?}", cfg.password); // allow-secret
825        assert!(
826            !password_debug.contains("super_secret"),
827            "Password field must not leak in Debug: {password_debug}"
828        );
829        assert_eq!(
830            password_debug, "Some([REDACTED])",
831            "Password field must show [REDACTED]: {password_debug}"
832        );
833    }
834
835    #[test]
836    fn cxf_profile_debug_redacts_passwords() {
837        let profile = CxfProfileEnvVars {
838            name: "test".to_string(),
839            wsdl_path: "/a.wsdl".to_string(),
840            service_name: "Svc".to_string(),
841            port_name: "Port".to_string(),
842            address: None,
843            keystore_path: None,
844            keystore_password: Some(Redacted::new("ks_secret_val".to_string())),
845            truststore_path: None,
846            truststore_password: Some(Redacted::new("ts_secret_val".to_string())),
847            sig_username: None,
848            sig_password: Some(Redacted::new("sig_secret_val".to_string())),
849            enc_username: None,
850            security_actions_out: None,
851            security_actions_in: None,
852            signature_algorithm: None,
853            signature_digest_algorithm: None,
854            signature_c14n_algorithm: None,
855            signature_parts: None,
856        };
857        let debug_output = format!("{profile:?}");
858        assert!(
859            !debug_output.contains("ks_secret_val")
860                && !debug_output.contains("ts_secret_val")
861                && !debug_output.contains("sig_secret_val"),
862            "Debug must not contain passwords: {debug_output}"
863        );
864    }
865}