Skip to main content

camel_bridge/
process.rs

1use std::fmt;
2use std::ops::Deref;
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
9
10// ---------------------------------------------------------------------------
11// Redacted<T> — wrapper that never leaks inner value via Debug/Display
12// ---------------------------------------------------------------------------
13
14/// A newtype that redacts its inner value in `Debug` and `Display` output.
15/// Used for password/credential fields to prevent accidental logging.
16#[derive(Clone)]
17pub struct Redacted<T>(T);
18
19impl<T> Redacted<T> {
20    pub fn new(value: T) -> Self {
21        Self(value)
22    }
23
24    pub fn into_inner(self) -> T {
25        self.0
26    }
27}
28
29impl<T> fmt::Debug for Redacted<T> {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        write!(f, "[REDACTED]")
32    }
33}
34
35impl<T> fmt::Display for Redacted<T> {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        write!(f, "[REDACTED]")
38    }
39}
40
41impl<T> Deref for Redacted<T> {
42    type Target = T;
43
44    fn deref(&self) -> &Self::Target {
45        &self.0
46    }
47}
48
49#[derive(Debug, Error)]
50pub enum BridgeError {
51    #[error("IO error: {0}")]
52    Io(#[from] std::io::Error),
53    #[error("Bridge timed out: {0}")]
54    Timeout(String),
55    #[error("Bridge stdout closed before ready message")]
56    StdoutClosed,
57    #[error("Bridge ready message malformed: {0}")]
58    BadReadyMessage(String),
59    #[error("Download failed: {0}")]
60    Download(String),
61    #[error("Checksum mismatch: expected {expected}, got {actual}")]
62    ChecksumMismatch { expected: String, actual: String },
63    #[error("URL not allowed: {0}")]
64    UrlNotAllowed(String),
65    #[error("Transport error: {0}")]
66    Transport(String),
67    #[error("Config error: {0}")]
68    Config(String),
69}
70
71#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
72#[serde(rename_all = "lowercase")]
73pub enum BrokerType {
74    #[serde(alias = "active_mq")]
75    ActiveMq,
76    Artemis,
77    Generic,
78}
79
80impl BrokerType {
81    pub fn as_env_str(&self) -> &'static str {
82        match self {
83            BrokerType::ActiveMq => "activemq",
84            BrokerType::Artemis => "artemis",
85            BrokerType::Generic => "generic",
86        }
87    }
88}
89
90impl std::str::FromStr for BrokerType {
91    type Err = BridgeError;
92
93    fn from_str(s: &str) -> Result<Self, Self::Err> {
94        match s.to_lowercase().as_str() {
95            "activemq" => Ok(BrokerType::ActiveMq),
96            "artemis" => Ok(BrokerType::Artemis),
97            "generic" => Ok(BrokerType::Generic),
98            other => Err(BridgeError::Config(format!("unknown broker type: {other}"))), // allow-secret
99        }
100    }
101}
102
103/// Environment variables for a single CXF profile, used by the bridge Java side.
104/// Password fields use [`Redacted`] to prevent accidental credential leakage in logs.
105#[derive(Debug)]
106pub struct CxfProfileEnvVars {
107    pub name: String,
108    pub wsdl_path: String,
109    pub service_name: String,
110    pub port_name: String,
111    pub address: Option<String>,
112    pub keystore_path: Option<String>,
113    pub keystore_password: Option<Redacted<String>>,
114    pub truststore_path: Option<String>,
115    pub truststore_password: Option<Redacted<String>>,
116    pub sig_username: Option<String>,
117    pub sig_password: Option<Redacted<String>>,
118    pub enc_username: Option<String>,
119    pub security_actions_out: Option<String>,
120    pub security_actions_in: Option<String>,
121    pub signature_algorithm: Option<String>,
122    pub signature_digest_algorithm: Option<String>,
123    pub signature_c14n_algorithm: Option<String>,
124    pub signature_parts: Option<String>,
125}
126
127impl CxfProfileEnvVars {
128    pub fn to_env_vars(&self) -> Vec<(String, String)> {
129        let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
130        let mut vars = vec![
131            (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
132            (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
133            (format!("{}PORT_NAME", prefix), self.port_name.clone()),
134        ];
135
136        if let Some(ref v) = self.address {
137            vars.push((format!("{}ADDRESS", prefix), v.clone()));
138        }
139        if let Some(ref v) = self.keystore_path {
140            vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
141        }
142        if let Some(ref v) = self.keystore_password {
143            vars.push((format!("{}KEYSTORE_PASSWORD", prefix), (**v).clone()));
144        }
145        if let Some(ref v) = self.truststore_path {
146            vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
147        }
148        if let Some(ref v) = self.truststore_password {
149            vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), (**v).clone()));
150        }
151        if let Some(ref v) = self.sig_username {
152            vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
153        }
154        if let Some(ref v) = self.sig_password {
155            vars.push((format!("{}SIG_PASSWORD", prefix), (**v).clone()));
156        }
157        if let Some(ref v) = self.enc_username {
158            vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
159        }
160        if let Some(ref v) = self.security_actions_out {
161            vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
162        }
163        if let Some(ref v) = self.security_actions_in {
164            vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
165        }
166        if let Some(ref v) = self.signature_algorithm {
167            vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
168        }
169        if let Some(ref v) = self.signature_digest_algorithm {
170            vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
171        }
172        if let Some(ref v) = self.signature_c14n_algorithm {
173            vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
174        }
175        if let Some(ref v) = self.signature_parts {
176            vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
177        }
178
179        vars
180    }
181}
182
183/// Configuration for spawning a bridge subprocess.
184/// Password fields use [`Redacted`] to prevent accidental credential leakage in logs.
185#[derive(Debug)]
186pub struct BridgeProcessConfig {
187    pub spec: &'static BridgeSpec,
188    pub binary_path: PathBuf,
189    pub broker_url: String,
190    pub broker_type: BrokerType,
191    pub username: Option<String>,
192    pub password: Option<Redacted<String>>,
193    pub start_timeout_ms: u64,
194    pub env_vars: Vec<(String, String)>,
195}
196
197impl BridgeProcessConfig {
198    /// Constructor for the JMS bridge.
199    pub fn jms(
200        binary_path: PathBuf,
201        broker_url: String,
202        broker_type: BrokerType,
203        username: Option<String>,
204        password: Option<Redacted<String>>,
205        start_timeout_ms: u64,
206    ) -> Self {
207        let mut env_vars = vec![
208            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
209            (
210                "BRIDGE_BROKER_TYPE".to_string(),
211                broker_type.as_env_str().to_string(),
212            ),
213        ];
214        if let Some(u) = &username {
215            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
216        }
217        if let Some(p) = &password {
218            env_vars.push(("BRIDGE_PASSWORD".to_string(), (**p).clone()));
219        }
220        Self {
221            spec: &JMS_BRIDGE,
222            binary_path,
223            broker_url,
224            broker_type,
225            username,
226            password,
227            start_timeout_ms,
228            env_vars,
229        }
230    }
231
232    /// Constructor for the XML bridge.
233    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
234        Self {
235            spec: &XML_BRIDGE,
236            binary_path,
237            broker_url: String::new(),
238            broker_type: BrokerType::Generic,
239            username: None,
240            password: None,
241            start_timeout_ms,
242            env_vars: vec![],
243        }
244    }
245
246    /// Constructor for the CXF bridge with multi-profile support.
247    /// Generates `CXF_PROFILES=list` env var plus per-profile env vars.
248    pub fn cxf_profiles(
249        binary_path: PathBuf,
250        profiles: &[CxfProfileEnvVars],
251        start_timeout_ms: u64,
252    ) -> Self {
253        let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
254        let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
255
256        for profile in profiles {
257            env_vars.extend(profile.to_env_vars());
258        }
259
260        Self {
261            spec: &CXF_BRIDGE,
262            binary_path,
263            broker_url: String::new(),
264            broker_type: BrokerType::Generic,
265            username: None,
266            password: None,
267            start_timeout_ms,
268            env_vars,
269        }
270    }
271
272    pub fn validate(&self) -> Result<(), String> {
273        if self.start_timeout_ms == 0 {
274            return Err("start_timeout_ms must be > 0".to_string());
275        }
276        Ok(())
277    }
278}
279
280pub struct BridgeProcess {
281    child: tokio::process::Child,
282    grpc_port: u16,
283    token: CancellationToken,
284    handle: Option<JoinHandle<()>>,
285}
286
287impl BridgeProcess {
288    pub fn grpc_port(&self) -> u16 {
289        self.grpc_port
290    }
291
292    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
293    ///   {"status":"ready","port":PORT}
294    ///
295    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
296    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
297    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
298        use tokio::io::AsyncBufReadExt;
299        use tokio::process::Command;
300        use tokio::time::{Duration, timeout};
301
302        config.validate().map_err(BridgeError::Config)?;
303
304        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
305        let free_port = {
306            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
307            listener.local_addr()?.port()
308        };
309
310        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
311        let stderr_stdio: std::process::Stdio =
312            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
313                let log_filename = config
314                    .spec
315                    .log_file_template
316                    .replace("{pid}", &std::process::id().to_string());
317                let log_path = if log_dir.is_empty() {
318                    format!("/tmp/{log_filename}")
319                } else {
320                    format!("{log_dir}/{log_filename}")
321                };
322                match std::fs::File::create(&log_path) {
323                    Ok(f) => {
324                        eprintln!("[camel-bridge] stderr → {}", log_path);
325                        f.into()
326                    }
327                    Err(e) => {
328                        eprintln!(
329                            "[camel-bridge] failed to create log file {}: {}",
330                            log_path, e
331                        );
332                        std::process::Stdio::inherit()
333                    }
334                }
335            } else {
336                std::process::Stdio::inherit()
337            };
338
339        let mut command = Command::new(&config.binary_path);
340        command
341            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
342            // Let the OS pick a random HTTP port — we only use gRPC.
343            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
344            .env("QUARKUS_HTTP_PORT", "0")
345            .stdout(std::process::Stdio::piped())
346            .stderr(stderr_stdio);
347
348        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
349        for (key, value) in &config.env_vars {
350            command.env(key, value);
351        }
352
353        let mut child = command.spawn()?;
354
355        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
356        let mut reader = tokio::io::BufReader::new(stdout).lines();
357
358        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
359            while let Some(line) = reader.next_line().await? {
360                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
361                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
362                {
363                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
364                        return Ok(p as u16);
365                    }
366                    // log-policy: system-broken
367                    tracing::error!("bridge ready message malformed: {line}");
368                    return Err(BridgeError::BadReadyMessage(line));
369                }
370            }
371            // log-policy: system-broken
372            tracing::error!("bridge stdout closed before ready message");
373            Err(BridgeError::StdoutClosed)
374        })
375        .await
376        .map_err(|_| {
377            let msg = format!(
378                "{} failed to start: health check timeout after {}ms",
379                config.spec.name, config.start_timeout_ms
380            );
381            // log-policy: system-broken
382            tracing::error!("{msg}");
383            BridgeError::Timeout(msg)
384        })??;
385
386        // Keep draining stdout in background; allow cooperative cancellation.
387        let token = CancellationToken::new();
388        let child_token = token.clone();
389        let handle = tokio::spawn(async move {
390            loop {
391                tokio::select! {
392                    _ = child_token.cancelled() => break,
393                    line = reader.next_line() => {
394                        match line {
395                            Ok(Some(line)) => tracing::debug!(target: "camel_bridge::child", "{}", line),
396                            Ok(None) | Err(_) => break,
397                        }
398                    }
399                }
400            }
401        });
402
403        Ok(BridgeProcess {
404            child,
405            grpc_port: port,
406            token,
407            handle: Some(handle),
408        })
409    }
410
411    /// Gracefully stop: SIGTERM + wait for exit.
412    pub async fn stop(mut self) -> Result<(), BridgeError> {
413        use tokio::time::{Duration, sleep};
414
415        self.token.cancel();
416
417        if let Some(handle) = self.handle.take() {
418            let join_result = tokio::time::timeout(Duration::from_secs(5), handle).await;
419            if join_result.is_err() {
420                tracing::warn!("bridge stdout drain task did not exit after cancellation");
421            }
422        }
423
424        // Send SIGTERM first (graceful shutdown)
425        #[cfg(unix)]
426        {
427            let pid = self.child.id().unwrap_or(0);
428            if pid > 0 {
429                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
430                unsafe {
431                    libc::kill(pid as i32, libc::SIGTERM);
432                }
433            }
434        }
435
436        // On non-Unix (Windows), fall through to kill immediately
437        #[cfg(not(unix))]
438        let _ = self.child.start_kill();
439
440        // Wait up to 5 seconds for graceful exit, then SIGKILL
441        tokio::select! {
442            result = self.child.wait() => {
443                result?;
444            }
445            _ = sleep(Duration::from_secs(5)) => {
446                let _ = self.child.start_kill();
447                self.child.wait().await?;
448            }
449        }
450        Ok(())
451    }
452}
453
454impl Drop for BridgeProcess {
455    fn drop(&mut self) {
456        self.token.cancel();
457        // Best-effort only. Does NOT wait — cannot block in Drop.
458        let _ = self.child.start_kill();
459    }
460}
461
462#[cfg(test)]
463mod tests {
464    use super::*;
465
466    #[test]
467    fn broker_type_from_str_activemq() {
468        assert_eq!(
469            "activemq".parse::<BrokerType>().unwrap(),
470            BrokerType::ActiveMq
471        );
472        assert_eq!(
473            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
474            BrokerType::ActiveMq
475        );
476    }
477
478    #[test]
479    fn broker_type_from_str_artemis() {
480        assert_eq!(
481            "artemis".parse::<BrokerType>().unwrap(),
482            BrokerType::Artemis
483        );
484    }
485
486    #[test]
487    fn broker_type_from_str_generic() {
488        assert_eq!(
489            "generic".parse::<BrokerType>().unwrap(),
490            BrokerType::Generic
491        );
492    }
493
494    #[test]
495    fn broker_type_from_str_unknown_returns_err() {
496        assert!("ibmmq".parse::<BrokerType>().is_err());
497        assert!("UnknownBroker".parse::<BrokerType>().is_err());
498    }
499
500    #[test]
501    fn broker_type_env_str() {
502        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
503        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
504        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
505    }
506
507    #[test]
508    fn jms_constructor_uses_jms_spec() {
509        let cfg = BridgeProcessConfig::jms(
510            PathBuf::from("/tmp/jms-bridge"),
511            "tcp://localhost:61616".to_string(),
512            BrokerType::ActiveMq,
513            Some("user".to_string()),
514            Some(Redacted::new("pass".to_string())),
515            1000,
516        );
517        assert_eq!(cfg.spec.name, "jms-bridge");
518    }
519
520    #[test]
521    fn xml_constructor_uses_xml_spec() {
522        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
523        assert_eq!(cfg.spec.name, "xml-bridge");
524    }
525
526    #[test]
527    fn cxf_profiles_generates_cxf_profiles_env_var() {
528        let profiles = vec![
529            CxfProfileEnvVars {
530                name: "baleares".to_string(),
531                wsdl_path: "/a.wsdl".to_string(),
532                service_name: "Svc".to_string(),
533                port_name: "Port".to_string(),
534                address: None,
535                keystore_path: None,
536                keystore_password: None,
537                truststore_path: None,
538                truststore_password: None,
539                sig_username: None,
540                sig_password: None,
541                enc_username: None,
542                security_actions_out: None,
543                security_actions_in: None,
544                signature_algorithm: None,
545                signature_digest_algorithm: None,
546                signature_c14n_algorithm: None,
547                signature_parts: None,
548            },
549            CxfProfileEnvVars {
550                name: "extremadura".to_string(),
551                wsdl_path: "/b.wsdl".to_string(),
552                service_name: "Svc2".to_string(),
553                port_name: "Port2".to_string(),
554                address: Some("http://host:9090/ws".to_string()),
555                keystore_path: Some("/b.jks".to_string()),
556                keystore_password: Some(Redacted::new("pass".to_string())),
557                truststore_path: None,
558                truststore_password: None,
559                sig_username: Some("cert".to_string()),
560                sig_password: Some(Redacted::new("sig_pass".to_string())),
561                enc_username: None,
562                security_actions_out: Some("Timestamp Signature".to_string()),
563                security_actions_in: Some("Timestamp Signature".to_string()),
564                signature_algorithm: None,
565                signature_digest_algorithm: None,
566                signature_c14n_algorithm: None,
567                signature_parts: None,
568            },
569        ];
570
571        let cfg =
572            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
573
574        assert_eq!(cfg.spec.name, "cxf-bridge");
575        assert!(cfg.broker_url.is_empty());
576        assert_eq!(cfg.broker_type, BrokerType::Generic);
577        assert!(cfg.username.is_none());
578        assert!(cfg.password.is_none());
579
580        // Find CXF_PROFILES env var
581        let profiles_var = cfg
582            .env_vars
583            .iter()
584            .find(|(k, _)| k == "CXF_PROFILES")
585            .expect("CXF_PROFILES env var must exist");
586        assert_eq!(profiles_var.1, "baleares,extremadura");
587
588        // Check baleares profile vars (no security)
589        assert!(
590            cfg.env_vars
591                .iter()
592                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl")
593        );
594        assert!(
595            cfg.env_vars
596                .iter()
597                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc")
598        );
599        assert!(
600            cfg.env_vars
601                .iter()
602                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port")
603        );
604        assert!(
605            !cfg.env_vars
606                .iter()
607                .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
608        );
609
610        // Check extremadura profile vars (with security)
611        assert!(
612            cfg.env_vars
613                .iter()
614                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl")
615        );
616        assert!(
617            cfg.env_vars
618                .iter()
619                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws")
620        );
621        assert!(
622            cfg.env_vars
623                .iter()
624                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks")
625        );
626        assert!(
627            cfg.env_vars
628                .iter()
629                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass")
630        );
631        assert!(
632            cfg.env_vars
633                .iter()
634                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert")
635        );
636        assert!(
637            cfg.env_vars
638                .iter()
639                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass")
640        );
641        assert!(
642            cfg.env_vars
643                .iter()
644                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
645                    && v == "Timestamp Signature")
646        );
647    }
648
649    #[test]
650    fn cxf_profiles_single_profile_no_security() {
651        let profiles = vec![CxfProfileEnvVars {
652            name: "test".to_string(),
653            wsdl_path: "service.wsdl".to_string(),
654            service_name: "{http://example.com}Service".to_string(),
655            port_name: "{http://example.com}Port".to_string(),
656            address: None,
657            keystore_path: None,
658            keystore_password: None,
659            truststore_path: None,
660            truststore_password: None,
661            sig_username: None,
662            sig_password: None,
663            enc_username: None,
664            security_actions_out: None,
665            security_actions_in: None,
666            signature_algorithm: None,
667            signature_digest_algorithm: None,
668            signature_c14n_algorithm: None,
669            signature_parts: None,
670        }];
671
672        let cfg =
673            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
674
675        assert_eq!(cfg.spec.name, "cxf-bridge");
676        // CXF_PROFILES + 3 required vars (WSDL_PATH, SERVICE_NAME, PORT_NAME)
677        assert_eq!(cfg.env_vars.len(), 4);
678        assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
679        assert_eq!(cfg.env_vars[0].1, "test");
680        assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
681        assert_eq!(cfg.env_vars[1].1, "service.wsdl");
682        assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
683        assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
684        assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
685        assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
686    }
687
688    #[test]
689    fn profile_env_vars_to_env_vars_includes_all_fields() {
690        let vars = CxfProfileEnvVars {
691            name: "full".to_string(),
692            wsdl_path: "/wsdl".to_string(),
693            service_name: "Svc".to_string(),
694            port_name: "Port".to_string(),
695            address: Some("http://host:8080".to_string()),
696            keystore_path: Some("/ks.jks".to_string()),
697            keystore_password: Some(Redacted::new("ks_pass".to_string())),
698            truststore_path: Some("/ts.jks".to_string()),
699            truststore_password: Some(Redacted::new("ts_pass".to_string())),
700            sig_username: Some("user".to_string()),
701            sig_password: Some(Redacted::new("sig_pass".to_string())),
702            enc_username: None,
703            security_actions_out: Some("Timestamp Signature".to_string()),
704            security_actions_in: Some("Timestamp".to_string()),
705            signature_algorithm: None,
706            signature_digest_algorithm: None,
707            signature_c14n_algorithm: None,
708            signature_parts: None,
709        };
710
711        let env = vars.to_env_vars();
712        // 3 required + 1 address + 8 security = 12
713        assert_eq!(env.len(), 12);
714
715        let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
716        assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
717        assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
718        assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
719        assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
720        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
721        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
722        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
723        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
724        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
725        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
726        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
727        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
728    }
729
730    #[test]
731    fn test_start_timeout_zero_rejected() {
732        let config = BridgeProcessConfig::jms(
733            PathBuf::from("/usr/bin/echo"),
734            "tcp://localhost:61616".to_string(),
735            BrokerType::ActiveMq,
736            None,
737            None,
738            0,
739        );
740        let result = config.validate();
741        assert!(result.is_err());
742    }
743
744    #[test]
745    fn test_bridge_rejects_zero_start_timeout() {
746        let config = BridgeProcessConfig::jms(
747            PathBuf::from("/usr/bin/echo"),
748            "tcp://localhost:61616".to_string(),
749            BrokerType::ActiveMq,
750            None,
751            None,
752            0,
753        );
754        assert!(config.validate().is_err());
755    }
756
757    #[tokio::test]
758    async fn test_bridge_stop_completes() {
759        use tokio::process::Command;
760        use tokio::time::{Duration, timeout};
761
762        let child = Command::new("sh")
763            .arg("-c")
764            .arg("trap '' TERM; while true; do echo tick; sleep 1; done")
765            .stdout(std::process::Stdio::null())
766            .spawn()
767            .expect("must spawn test child process");
768
769        let bridge = BridgeProcess {
770            child,
771            grpc_port: 0,
772            token: CancellationToken::new(),
773            handle: None,
774        };
775
776        let result = timeout(Duration::from_secs(10), bridge.stop()).await;
777        assert!(result.is_ok(), "stop() must complete within 10s");
778    }
779
780    // --- BRG-004: Redacted<T> tests ---
781
782    #[test]
783    fn redacted_debug_displays_redacted() {
784        let r = Redacted::new("secret_password".to_string());
785        assert_eq!(format!("{r:?}"), "[REDACTED]");
786    }
787
788    #[test]
789    fn redacted_display_displays_redacted() {
790        let r = Redacted::new("secret_password".to_string());
791        assert_eq!(format!("{r}"), "[REDACTED]");
792    }
793
794    #[test]
795    fn redacted_deref_gives_inner_value() {
796        let r = Redacted::new("secret".to_string());
797        assert_eq!(&*r, "secret");
798    }
799
800    #[test]
801    fn redacted_into_inner_returns_value() {
802        let r = Redacted::new("secret".to_string());
803        assert_eq!(r.into_inner(), "secret");
804    }
805
806    #[test]
807    fn redacted_clone_works() {
808        let r = Redacted::new("secret".to_string());
809        let c = r.clone();
810        assert_eq!(&*c, "secret");
811        assert_eq!(format!("{c:?}"), "[REDACTED]");
812    }
813
814    #[test]
815    fn bridge_process_config_debug_redacts_password() {
816        let cfg = BridgeProcessConfig::jms(
817            PathBuf::from("/tmp/jms-bridge"),
818            "tcp://localhost:61616".to_string(),
819            BrokerType::ActiveMq,
820            Some("user".to_string()),
821            Some(Redacted::new("super_secret".to_string())),
822            1000,
823        );
824        // The Redacted<T> password field must show [REDACTED] in debug output.
825        // env_vars is a separate Vec<(String, String)> used for process injection
826        // and legitimately contains the raw value — that is not a Redacted leak.
827        let password_debug = format!("{:?}", cfg.password); // allow-secret
828        assert!(
829            !password_debug.contains("super_secret"),
830            "Password field must not leak in Debug: {password_debug}"
831        );
832        assert_eq!(
833            password_debug, "Some([REDACTED])",
834            "Password field must show [REDACTED]: {password_debug}"
835        );
836    }
837
838    #[test]
839    fn cxf_profile_debug_redacts_passwords() {
840        let profile = CxfProfileEnvVars {
841            name: "test".to_string(),
842            wsdl_path: "/a.wsdl".to_string(),
843            service_name: "Svc".to_string(),
844            port_name: "Port".to_string(),
845            address: None,
846            keystore_path: None,
847            keystore_password: Some(Redacted::new("ks_secret_val".to_string())),
848            truststore_path: None,
849            truststore_password: Some(Redacted::new("ts_secret_val".to_string())),
850            sig_username: None,
851            sig_password: Some(Redacted::new("sig_secret_val".to_string())),
852            enc_username: None,
853            security_actions_out: None,
854            security_actions_in: None,
855            signature_algorithm: None,
856            signature_digest_algorithm: None,
857            signature_c14n_algorithm: None,
858            signature_parts: None,
859        };
860        let debug_output = format!("{profile:?}");
861        assert!(
862            !debug_output.contains("ks_secret_val")
863                && !debug_output.contains("ts_secret_val")
864                && !debug_output.contains("sig_secret_val"),
865            "Debug must not contain passwords: {debug_output}"
866        );
867    }
868}