1use std::fmt;
2use std::ops::Deref;
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
9
10#[derive(Clone)]
17pub struct Redacted<T>(T);
18
19impl<T> Redacted<T> {
20 pub fn new(value: T) -> Self {
21 Self(value)
22 }
23
24 pub fn into_inner(self) -> T {
25 self.0
26 }
27}
28
29impl<T> fmt::Debug for Redacted<T> {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 write!(f, "[REDACTED]")
32 }
33}
34
35impl<T> fmt::Display for Redacted<T> {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 write!(f, "[REDACTED]")
38 }
39}
40
41impl<T> Deref for Redacted<T> {
42 type Target = T;
43
44 fn deref(&self) -> &Self::Target {
45 &self.0
46 }
47}
48
49#[derive(Debug, Error)]
50pub enum BridgeError {
51 #[error("IO error: {0}")]
52 Io(#[from] std::io::Error),
53 #[error("Bridge timed out: {0}")]
54 Timeout(String),
55 #[error("Bridge stdout closed before ready message")]
56 StdoutClosed,
57 #[error("Bridge ready message malformed: {0}")]
58 BadReadyMessage(String),
59 #[error("Download failed: {0}")]
60 Download(String),
61 #[error("Checksum mismatch: expected {expected}, got {actual}")]
62 ChecksumMismatch { expected: String, actual: String },
63 #[error("URL not allowed: {0}")]
64 UrlNotAllowed(String),
65 #[error("Transport error: {0}")]
66 Transport(String),
67 #[error("Config error: {0}")]
68 Config(String),
69}
70
71#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
72#[serde(rename_all = "lowercase")]
73pub enum BrokerType {
74 #[serde(alias = "active_mq")]
75 ActiveMq,
76 Artemis,
77 Generic,
78}
79
80impl BrokerType {
81 pub fn as_env_str(&self) -> &'static str {
82 match self {
83 BrokerType::ActiveMq => "activemq",
84 BrokerType::Artemis => "artemis",
85 BrokerType::Generic => "generic",
86 }
87 }
88}
89
90impl std::str::FromStr for BrokerType {
91 type Err = BridgeError;
92
93 fn from_str(s: &str) -> Result<Self, Self::Err> {
94 match s.to_lowercase().as_str() {
95 "activemq" => Ok(BrokerType::ActiveMq),
96 "artemis" => Ok(BrokerType::Artemis),
97 "generic" => Ok(BrokerType::Generic),
98 other => Err(BridgeError::Config(format!("unknown broker type: {other}"))), }
100 }
101}
102
103#[derive(Debug)]
106pub struct CxfProfileEnvVars {
107 pub name: String,
108 pub wsdl_path: String,
109 pub service_name: String,
110 pub port_name: String,
111 pub address: Option<String>,
112 pub keystore_path: Option<String>,
113 pub keystore_password: Option<Redacted<String>>,
114 pub truststore_path: Option<String>,
115 pub truststore_password: Option<Redacted<String>>,
116 pub sig_username: Option<String>,
117 pub sig_password: Option<Redacted<String>>,
118 pub enc_username: Option<String>,
119 pub security_actions_out: Option<String>,
120 pub security_actions_in: Option<String>,
121 pub signature_algorithm: Option<String>,
122 pub signature_digest_algorithm: Option<String>,
123 pub signature_c14n_algorithm: Option<String>,
124 pub signature_parts: Option<String>,
125}
126
127impl CxfProfileEnvVars {
128 pub fn to_env_vars(&self) -> Vec<(String, String)> {
129 let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
130 let mut vars = vec![
131 (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
132 (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
133 (format!("{}PORT_NAME", prefix), self.port_name.clone()),
134 ];
135
136 if let Some(ref v) = self.address {
137 vars.push((format!("{}ADDRESS", prefix), v.clone()));
138 }
139 if let Some(ref v) = self.keystore_path {
140 vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
141 }
142 if let Some(ref v) = self.keystore_password {
143 vars.push((format!("{}KEYSTORE_PASSWORD", prefix), (**v).clone()));
144 }
145 if let Some(ref v) = self.truststore_path {
146 vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
147 }
148 if let Some(ref v) = self.truststore_password {
149 vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), (**v).clone()));
150 }
151 if let Some(ref v) = self.sig_username {
152 vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
153 }
154 if let Some(ref v) = self.sig_password {
155 vars.push((format!("{}SIG_PASSWORD", prefix), (**v).clone()));
156 }
157 if let Some(ref v) = self.enc_username {
158 vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
159 }
160 if let Some(ref v) = self.security_actions_out {
161 vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
162 }
163 if let Some(ref v) = self.security_actions_in {
164 vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
165 }
166 if let Some(ref v) = self.signature_algorithm {
167 vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
168 }
169 if let Some(ref v) = self.signature_digest_algorithm {
170 vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
171 }
172 if let Some(ref v) = self.signature_c14n_algorithm {
173 vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
174 }
175 if let Some(ref v) = self.signature_parts {
176 vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
177 }
178
179 vars
180 }
181}
182
183#[derive(Debug)]
186pub struct BridgeProcessConfig {
187 pub spec: &'static BridgeSpec,
188 pub binary_path: PathBuf,
189 pub broker_url: String,
190 pub broker_type: BrokerType,
191 pub username: Option<String>,
192 pub password: Option<Redacted<String>>,
193 pub start_timeout_ms: u64,
194 pub env_vars: Vec<(String, String)>,
195}
196
197impl BridgeProcessConfig {
198 pub fn jms(
200 binary_path: PathBuf,
201 broker_url: String,
202 broker_type: BrokerType,
203 username: Option<String>,
204 password: Option<Redacted<String>>,
205 start_timeout_ms: u64,
206 ) -> Self {
207 let mut env_vars = vec![
208 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
209 (
210 "BRIDGE_BROKER_TYPE".to_string(),
211 broker_type.as_env_str().to_string(),
212 ),
213 ];
214 if let Some(u) = &username {
215 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
216 }
217 if let Some(p) = &password {
218 env_vars.push(("BRIDGE_PASSWORD".to_string(), (**p).clone()));
219 }
220 Self {
221 spec: &JMS_BRIDGE,
222 binary_path,
223 broker_url,
224 broker_type,
225 username,
226 password,
227 start_timeout_ms,
228 env_vars,
229 }
230 }
231
232 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
234 Self {
235 spec: &XML_BRIDGE,
236 binary_path,
237 broker_url: String::new(),
238 broker_type: BrokerType::Generic,
239 username: None,
240 password: None,
241 start_timeout_ms,
242 env_vars: vec![],
243 }
244 }
245
246 pub fn cxf_profiles(
249 binary_path: PathBuf,
250 profiles: &[CxfProfileEnvVars],
251 start_timeout_ms: u64,
252 ) -> Self {
253 let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
254 let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
255
256 for profile in profiles {
257 env_vars.extend(profile.to_env_vars());
258 }
259
260 Self {
261 spec: &CXF_BRIDGE,
262 binary_path,
263 broker_url: String::new(),
264 broker_type: BrokerType::Generic,
265 username: None,
266 password: None,
267 start_timeout_ms,
268 env_vars,
269 }
270 }
271
272 pub fn validate(&self) -> Result<(), String> {
273 if self.start_timeout_ms == 0 {
274 return Err("start_timeout_ms must be > 0".to_string());
275 }
276 Ok(())
277 }
278}
279
280pub struct BridgeProcess {
281 child: tokio::process::Child,
282 grpc_port: u16,
283 token: CancellationToken,
284 handle: Option<JoinHandle<()>>,
285}
286
287impl BridgeProcess {
288 pub fn grpc_port(&self) -> u16 {
289 self.grpc_port
290 }
291
292 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
298 use tokio::io::AsyncBufReadExt;
299 use tokio::process::Command;
300 use tokio::time::{Duration, timeout};
301
302 config.validate().map_err(BridgeError::Config)?;
303
304 let free_port = {
306 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
307 listener.local_addr()?.port()
308 };
309
310 let stderr_stdio: std::process::Stdio =
312 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
313 let log_filename = config
314 .spec
315 .log_file_template
316 .replace("{pid}", &std::process::id().to_string());
317 let log_path = if log_dir.is_empty() {
318 format!("/tmp/{log_filename}")
319 } else {
320 format!("{log_dir}/{log_filename}")
321 };
322 match std::fs::File::create(&log_path) {
323 Ok(f) => {
324 eprintln!("[camel-bridge] stderr → {}", log_path);
325 f.into()
326 }
327 Err(e) => {
328 eprintln!(
329 "[camel-bridge] failed to create log file {}: {}",
330 log_path, e
331 );
332 std::process::Stdio::inherit()
333 }
334 }
335 } else {
336 std::process::Stdio::inherit()
337 };
338
339 let mut command = Command::new(&config.binary_path);
340 command
341 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
342 .env("QUARKUS_HTTP_PORT", "0")
345 .stdout(std::process::Stdio::piped())
346 .stderr(stderr_stdio);
347
348 for (key, value) in &config.env_vars {
350 command.env(key, value);
351 }
352
353 let mut child = command.spawn()?;
354
355 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
356 let mut reader = tokio::io::BufReader::new(stdout).lines();
357
358 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
359 while let Some(line) = reader.next_line().await? {
360 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
361 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
362 {
363 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
364 return Ok(p as u16);
365 }
366 tracing::error!("bridge ready message malformed: {line}");
368 return Err(BridgeError::BadReadyMessage(line));
369 }
370 }
371 tracing::error!("bridge stdout closed before ready message");
373 Err(BridgeError::StdoutClosed)
374 })
375 .await
376 .map_err(|_| {
377 let msg = format!(
378 "{} failed to start: health check timeout after {}ms",
379 config.spec.name, config.start_timeout_ms
380 );
381 tracing::error!("{msg}");
383 BridgeError::Timeout(msg)
384 })??;
385
386 let token = CancellationToken::new();
388 let child_token = token.clone();
389 let handle = tokio::spawn(async move {
390 loop {
391 tokio::select! {
392 _ = child_token.cancelled() => break,
393 line = reader.next_line() => {
394 match line {
395 Ok(Some(line)) => tracing::debug!(target: "camel_bridge::child", "{}", line),
396 Ok(None) | Err(_) => break,
397 }
398 }
399 }
400 }
401 });
402
403 Ok(BridgeProcess {
404 child,
405 grpc_port: port,
406 token,
407 handle: Some(handle),
408 })
409 }
410
411 pub async fn stop(mut self) -> Result<(), BridgeError> {
413 use tokio::time::{Duration, sleep};
414
415 self.token.cancel();
416
417 if let Some(handle) = self.handle.take() {
418 let join_result = tokio::time::timeout(Duration::from_secs(5), handle).await;
419 if join_result.is_err() {
420 tracing::warn!("bridge stdout drain task did not exit after cancellation");
421 }
422 }
423
424 #[cfg(unix)]
426 {
427 let pid = self.child.id().unwrap_or(0);
428 if pid > 0 {
429 unsafe {
431 libc::kill(pid as i32, libc::SIGTERM);
432 }
433 }
434 }
435
436 #[cfg(not(unix))]
438 let _ = self.child.start_kill();
439
440 tokio::select! {
442 result = self.child.wait() => {
443 result?;
444 }
445 _ = sleep(Duration::from_secs(5)) => {
446 let _ = self.child.start_kill();
447 self.child.wait().await?;
448 }
449 }
450 Ok(())
451 }
452}
453
454impl Drop for BridgeProcess {
455 fn drop(&mut self) {
456 self.token.cancel();
457 let _ = self.child.start_kill();
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465
466 #[test]
467 fn broker_type_from_str_activemq() {
468 assert_eq!(
469 "activemq".parse::<BrokerType>().unwrap(),
470 BrokerType::ActiveMq
471 );
472 assert_eq!(
473 "ACTIVEMQ".parse::<BrokerType>().unwrap(),
474 BrokerType::ActiveMq
475 );
476 }
477
478 #[test]
479 fn broker_type_from_str_artemis() {
480 assert_eq!(
481 "artemis".parse::<BrokerType>().unwrap(),
482 BrokerType::Artemis
483 );
484 }
485
486 #[test]
487 fn broker_type_from_str_generic() {
488 assert_eq!(
489 "generic".parse::<BrokerType>().unwrap(),
490 BrokerType::Generic
491 );
492 }
493
494 #[test]
495 fn broker_type_from_str_unknown_returns_err() {
496 assert!("ibmmq".parse::<BrokerType>().is_err());
497 assert!("UnknownBroker".parse::<BrokerType>().is_err());
498 }
499
500 #[test]
501 fn broker_type_env_str() {
502 assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
503 assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
504 assert_eq!(BrokerType::Generic.as_env_str(), "generic");
505 }
506
507 #[test]
508 fn jms_constructor_uses_jms_spec() {
509 let cfg = BridgeProcessConfig::jms(
510 PathBuf::from("/tmp/jms-bridge"),
511 "tcp://localhost:61616".to_string(),
512 BrokerType::ActiveMq,
513 Some("user".to_string()),
514 Some(Redacted::new("pass".to_string())),
515 1000,
516 );
517 assert_eq!(cfg.spec.name, "jms-bridge");
518 }
519
520 #[test]
521 fn xml_constructor_uses_xml_spec() {
522 let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
523 assert_eq!(cfg.spec.name, "xml-bridge");
524 }
525
526 #[test]
527 fn cxf_profiles_generates_cxf_profiles_env_var() {
528 let profiles = vec![
529 CxfProfileEnvVars {
530 name: "baleares".to_string(),
531 wsdl_path: "/a.wsdl".to_string(),
532 service_name: "Svc".to_string(),
533 port_name: "Port".to_string(),
534 address: None,
535 keystore_path: None,
536 keystore_password: None,
537 truststore_path: None,
538 truststore_password: None,
539 sig_username: None,
540 sig_password: None,
541 enc_username: None,
542 security_actions_out: None,
543 security_actions_in: None,
544 signature_algorithm: None,
545 signature_digest_algorithm: None,
546 signature_c14n_algorithm: None,
547 signature_parts: None,
548 },
549 CxfProfileEnvVars {
550 name: "extremadura".to_string(),
551 wsdl_path: "/b.wsdl".to_string(),
552 service_name: "Svc2".to_string(),
553 port_name: "Port2".to_string(),
554 address: Some("http://host:9090/ws".to_string()),
555 keystore_path: Some("/b.jks".to_string()),
556 keystore_password: Some(Redacted::new("pass".to_string())),
557 truststore_path: None,
558 truststore_password: None,
559 sig_username: Some("cert".to_string()),
560 sig_password: Some(Redacted::new("sig_pass".to_string())),
561 enc_username: None,
562 security_actions_out: Some("Timestamp Signature".to_string()),
563 security_actions_in: Some("Timestamp Signature".to_string()),
564 signature_algorithm: None,
565 signature_digest_algorithm: None,
566 signature_c14n_algorithm: None,
567 signature_parts: None,
568 },
569 ];
570
571 let cfg =
572 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
573
574 assert_eq!(cfg.spec.name, "cxf-bridge");
575 assert!(cfg.broker_url.is_empty());
576 assert_eq!(cfg.broker_type, BrokerType::Generic);
577 assert!(cfg.username.is_none());
578 assert!(cfg.password.is_none());
579
580 let profiles_var = cfg
582 .env_vars
583 .iter()
584 .find(|(k, _)| k == "CXF_PROFILES")
585 .expect("CXF_PROFILES env var must exist");
586 assert_eq!(profiles_var.1, "baleares,extremadura");
587
588 assert!(
590 cfg.env_vars
591 .iter()
592 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl")
593 );
594 assert!(
595 cfg.env_vars
596 .iter()
597 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc")
598 );
599 assert!(
600 cfg.env_vars
601 .iter()
602 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port")
603 );
604 assert!(
605 !cfg.env_vars
606 .iter()
607 .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
608 );
609
610 assert!(
612 cfg.env_vars
613 .iter()
614 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl")
615 );
616 assert!(
617 cfg.env_vars
618 .iter()
619 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws")
620 );
621 assert!(
622 cfg.env_vars
623 .iter()
624 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks")
625 );
626 assert!(
627 cfg.env_vars
628 .iter()
629 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass")
630 );
631 assert!(
632 cfg.env_vars
633 .iter()
634 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert")
635 );
636 assert!(
637 cfg.env_vars
638 .iter()
639 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass")
640 );
641 assert!(
642 cfg.env_vars
643 .iter()
644 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
645 && v == "Timestamp Signature")
646 );
647 }
648
649 #[test]
650 fn cxf_profiles_single_profile_no_security() {
651 let profiles = vec![CxfProfileEnvVars {
652 name: "test".to_string(),
653 wsdl_path: "service.wsdl".to_string(),
654 service_name: "{http://example.com}Service".to_string(),
655 port_name: "{http://example.com}Port".to_string(),
656 address: None,
657 keystore_path: None,
658 keystore_password: None,
659 truststore_path: None,
660 truststore_password: None,
661 sig_username: None,
662 sig_password: None,
663 enc_username: None,
664 security_actions_out: None,
665 security_actions_in: None,
666 signature_algorithm: None,
667 signature_digest_algorithm: None,
668 signature_c14n_algorithm: None,
669 signature_parts: None,
670 }];
671
672 let cfg =
673 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
674
675 assert_eq!(cfg.spec.name, "cxf-bridge");
676 assert_eq!(cfg.env_vars.len(), 4);
678 assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
679 assert_eq!(cfg.env_vars[0].1, "test");
680 assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
681 assert_eq!(cfg.env_vars[1].1, "service.wsdl");
682 assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
683 assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
684 assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
685 assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
686 }
687
688 #[test]
689 fn profile_env_vars_to_env_vars_includes_all_fields() {
690 let vars = CxfProfileEnvVars {
691 name: "full".to_string(),
692 wsdl_path: "/wsdl".to_string(),
693 service_name: "Svc".to_string(),
694 port_name: "Port".to_string(),
695 address: Some("http://host:8080".to_string()),
696 keystore_path: Some("/ks.jks".to_string()),
697 keystore_password: Some(Redacted::new("ks_pass".to_string())),
698 truststore_path: Some("/ts.jks".to_string()),
699 truststore_password: Some(Redacted::new("ts_pass".to_string())),
700 sig_username: Some("user".to_string()),
701 sig_password: Some(Redacted::new("sig_pass".to_string())),
702 enc_username: None,
703 security_actions_out: Some("Timestamp Signature".to_string()),
704 security_actions_in: Some("Timestamp".to_string()),
705 signature_algorithm: None,
706 signature_digest_algorithm: None,
707 signature_c14n_algorithm: None,
708 signature_parts: None,
709 };
710
711 let env = vars.to_env_vars();
712 assert_eq!(env.len(), 12);
714
715 let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
716 assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
717 assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
718 assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
719 assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
720 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
721 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
722 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
723 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
724 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
725 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
726 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
727 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
728 }
729
730 #[test]
731 fn test_start_timeout_zero_rejected() {
732 let config = BridgeProcessConfig::jms(
733 PathBuf::from("/usr/bin/echo"),
734 "tcp://localhost:61616".to_string(),
735 BrokerType::ActiveMq,
736 None,
737 None,
738 0,
739 );
740 let result = config.validate();
741 assert!(result.is_err());
742 }
743
744 #[test]
745 fn test_bridge_rejects_zero_start_timeout() {
746 let config = BridgeProcessConfig::jms(
747 PathBuf::from("/usr/bin/echo"),
748 "tcp://localhost:61616".to_string(),
749 BrokerType::ActiveMq,
750 None,
751 None,
752 0,
753 );
754 assert!(config.validate().is_err());
755 }
756
757 #[tokio::test]
758 async fn test_bridge_stop_completes() {
759 use tokio::process::Command;
760 use tokio::time::{Duration, timeout};
761
762 let child = Command::new("sh")
763 .arg("-c")
764 .arg("trap '' TERM; while true; do echo tick; sleep 1; done")
765 .stdout(std::process::Stdio::null())
766 .spawn()
767 .expect("must spawn test child process");
768
769 let bridge = BridgeProcess {
770 child,
771 grpc_port: 0,
772 token: CancellationToken::new(),
773 handle: None,
774 };
775
776 let result = timeout(Duration::from_secs(10), bridge.stop()).await;
777 assert!(result.is_ok(), "stop() must complete within 10s");
778 }
779
780 #[test]
783 fn redacted_debug_displays_redacted() {
784 let r = Redacted::new("secret_password".to_string());
785 assert_eq!(format!("{r:?}"), "[REDACTED]");
786 }
787
788 #[test]
789 fn redacted_display_displays_redacted() {
790 let r = Redacted::new("secret_password".to_string());
791 assert_eq!(format!("{r}"), "[REDACTED]");
792 }
793
794 #[test]
795 fn redacted_deref_gives_inner_value() {
796 let r = Redacted::new("secret".to_string());
797 assert_eq!(&*r, "secret");
798 }
799
800 #[test]
801 fn redacted_into_inner_returns_value() {
802 let r = Redacted::new("secret".to_string());
803 assert_eq!(r.into_inner(), "secret");
804 }
805
806 #[test]
807 fn redacted_clone_works() {
808 let r = Redacted::new("secret".to_string());
809 let c = r.clone();
810 assert_eq!(&*c, "secret");
811 assert_eq!(format!("{c:?}"), "[REDACTED]");
812 }
813
814 #[test]
815 fn bridge_process_config_debug_redacts_password() {
816 let cfg = BridgeProcessConfig::jms(
817 PathBuf::from("/tmp/jms-bridge"),
818 "tcp://localhost:61616".to_string(),
819 BrokerType::ActiveMq,
820 Some("user".to_string()),
821 Some(Redacted::new("super_secret".to_string())),
822 1000,
823 );
824 let password_debug = format!("{:?}", cfg.password); assert!(
829 !password_debug.contains("super_secret"),
830 "Password field must not leak in Debug: {password_debug}"
831 );
832 assert_eq!(
833 password_debug, "Some([REDACTED])",
834 "Password field must show [REDACTED]: {password_debug}"
835 );
836 }
837
838 #[test]
839 fn cxf_profile_debug_redacts_passwords() {
840 let profile = CxfProfileEnvVars {
841 name: "test".to_string(),
842 wsdl_path: "/a.wsdl".to_string(),
843 service_name: "Svc".to_string(),
844 port_name: "Port".to_string(),
845 address: None,
846 keystore_path: None,
847 keystore_password: Some(Redacted::new("ks_secret_val".to_string())),
848 truststore_path: None,
849 truststore_password: Some(Redacted::new("ts_secret_val".to_string())),
850 sig_username: None,
851 sig_password: Some(Redacted::new("sig_secret_val".to_string())),
852 enc_username: None,
853 security_actions_out: None,
854 security_actions_in: None,
855 signature_algorithm: None,
856 signature_digest_algorithm: None,
857 signature_c14n_algorithm: None,
858 signature_parts: None,
859 };
860 let debug_output = format!("{profile:?}");
861 assert!(
862 !debug_output.contains("ks_secret_val")
863 && !debug_output.contains("ts_secret_val")
864 && !debug_output.contains("sig_secret_val"),
865 "Debug must not contain passwords: {debug_output}"
866 );
867 }
868}