1use std::fmt;
2use std::ops::Deref;
3use std::path::PathBuf;
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
9
/// Wrapper that keeps a sensitive value out of formatted output.
///
/// Both `Debug` and `Display` print the fixed placeholder `[REDACTED]` instead of the
/// wrapped value. The value itself stays reachable through `Deref` or
/// [`Redacted::into_inner`].
#[derive(Clone)]
pub struct Redacted<T>(T);

impl<T> Redacted<T> {
    /// Text emitted in place of the wrapped value by the formatting impls.
    const PLACEHOLDER: &'static str = "[REDACTED]";

    /// Wraps `value`.
    pub fn new(value: T) -> Self {
        Redacted(value)
    }

    /// Consumes the wrapper and hands back the wrapped value.
    pub fn into_inner(self) -> T {
        let Redacted(inner) = self;
        inner
    }
}

impl<T> fmt::Debug for Redacted<T> {
    /// Writes the placeholder; the wrapped value is never inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(Self::PLACEHOLDER)
    }
}

impl<T> fmt::Display for Redacted<T> {
    /// Same output as `Debug`: the placeholder only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl<T> Deref for Redacted<T> {
    type Target = T;

    /// Borrows the wrapped value.
    fn deref(&self) -> &T {
        let Redacted(inner) = self;
        inner
    }
}
48
/// Errors reported by the bridge subsystem.
///
/// `Display` output for each variant is the message given in its `#[error]` attribute.
#[derive(Debug, Error)]
pub enum BridgeError {
    /// An underlying I/O operation failed; converted automatically from `std::io::Error`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// The bridge did not report readiness within the configured start timeout.
    #[error("Bridge timed out: {0}")]
    Timeout(String),
    /// The child's stdout was unavailable or reached end-of-stream before a ready message.
    #[error("Bridge stdout closed before ready message")]
    StdoutClosed,
    /// A line announced `"status": "ready"` but carried no usable `port`; holds the raw line.
    #[error("Bridge ready message malformed: {0}")]
    BadReadyMessage(String),
    /// A download failed (not produced in this file; presumably used by a downloader module).
    #[error("Download failed: {0}")]
    Download(String),
    /// A computed checksum differed from the expected one (not produced in this file).
    #[error("Checksum mismatch: expected {expected}, got {actual}")]
    ChecksumMismatch { expected: String, actual: String },
    /// A URL was rejected (not produced in this file).
    #[error("URL not allowed: {0}")]
    UrlNotAllowed(String),
    /// A transport-level failure (not produced in this file).
    #[error("Transport error: {0}")]
    Transport(String),
    /// Invalid configuration, e.g. an unknown broker type or a zero start timeout.
    #[error("Config error: {0}")]
    Config(String),
}
70
/// Kind of message broker a JMS bridge talks to.
///
/// Deserializes from the lowercase variant name (`activemq`, `artemis`, `generic`).
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BrokerType {
    /// Also accepted as `active_mq` when deserializing.
    #[serde(alias = "active_mq")]
    ActiveMq,
    Artemis,
    /// Also used as the filler value by the XML and CXF configurations in this file.
    Generic,
}
79
80impl BrokerType {
81 pub fn as_env_str(&self) -> &'static str {
82 match self {
83 BrokerType::ActiveMq => "activemq",
84 BrokerType::Artemis => "artemis",
85 BrokerType::Generic => "generic",
86 }
87 }
88}
89
90impl std::str::FromStr for BrokerType {
91 type Err = BridgeError;
92
93 fn from_str(s: &str) -> Result<Self, Self::Err> {
94 match s.to_lowercase().as_str() {
95 "activemq" => Ok(BrokerType::ActiveMq),
96 "artemis" => Ok(BrokerType::Artemis),
97 "generic" => Ok(BrokerType::Generic),
98 other => Err(BridgeError::Config(format!("unknown broker type: {other}"))), }
100 }
101}
102
/// Settings of one CXF profile, exported to the bridge as environment variables.
///
/// `to_env_vars` turns every field except `name` into a variable called
/// `CXF_PROFILE_<NAME>_<FIELD>`, where `<NAME>` is `name` uppercased and `<FIELD>` is the
/// field name uppercased. Optional fields are exported only when they are `Some`.
/// Password fields are wrapped in [`Redacted`] so the derived `Debug` does not print them.
#[derive(Debug)]
pub struct CxfProfileEnvVars {
    /// Profile name; forms the variable prefix and is listed in `CXF_PROFILES`.
    pub name: String,
    // Always exported.
    pub wsdl_path: String,
    pub service_name: String,
    pub port_name: String,
    // Exported only when set.
    pub address: Option<String>,
    pub keystore_path: Option<String>,
    pub keystore_password: Option<Redacted<String>>,
    pub truststore_path: Option<String>,
    pub truststore_password: Option<Redacted<String>>,
    pub sig_username: Option<String>,
    pub sig_password: Option<Redacted<String>>,
    pub enc_username: Option<String>,
    pub security_actions_out: Option<String>,
    pub security_actions_in: Option<String>,
    pub signature_algorithm: Option<String>,
    pub signature_digest_algorithm: Option<String>,
    pub signature_c14n_algorithm: Option<String>,
    pub signature_parts: Option<String>,
}
126
127impl CxfProfileEnvVars {
128 pub fn to_env_vars(&self) -> Vec<(String, String)> {
129 let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
130 let mut vars = vec![
131 (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
132 (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
133 (format!("{}PORT_NAME", prefix), self.port_name.clone()),
134 ];
135
136 if let Some(ref v) = self.address {
137 vars.push((format!("{}ADDRESS", prefix), v.clone()));
138 }
139 if let Some(ref v) = self.keystore_path {
140 vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
141 }
142 if let Some(ref v) = self.keystore_password {
143 vars.push((format!("{}KEYSTORE_PASSWORD", prefix), (**v).clone()));
144 }
145 if let Some(ref v) = self.truststore_path {
146 vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
147 }
148 if let Some(ref v) = self.truststore_password {
149 vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), (**v).clone()));
150 }
151 if let Some(ref v) = self.sig_username {
152 vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
153 }
154 if let Some(ref v) = self.sig_password {
155 vars.push((format!("{}SIG_PASSWORD", prefix), (**v).clone()));
156 }
157 if let Some(ref v) = self.enc_username {
158 vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
159 }
160 if let Some(ref v) = self.security_actions_out {
161 vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
162 }
163 if let Some(ref v) = self.security_actions_in {
164 vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
165 }
166 if let Some(ref v) = self.signature_algorithm {
167 vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
168 }
169 if let Some(ref v) = self.signature_digest_algorithm {
170 vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
171 }
172 if let Some(ref v) = self.signature_c14n_algorithm {
173 vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
174 }
175 if let Some(ref v) = self.signature_parts {
176 vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
177 }
178
179 vars
180 }
181}
182
183#[derive(Debug)]
186pub struct BridgeProcessConfig {
187 pub spec: &'static BridgeSpec,
188 pub binary_path: PathBuf,
189 pub broker_url: String,
190 pub broker_type: BrokerType,
191 pub username: Option<String>,
192 pub password: Option<Redacted<String>>,
193 pub start_timeout_ms: u64,
194 pub env_vars: Vec<(String, String)>,
195}
196
197impl BridgeProcessConfig {
198 pub fn jms(
200 binary_path: PathBuf,
201 broker_url: String,
202 broker_type: BrokerType,
203 username: Option<String>,
204 password: Option<Redacted<String>>,
205 start_timeout_ms: u64,
206 ) -> Self {
207 let mut env_vars = vec![
208 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
209 (
210 "BRIDGE_BROKER_TYPE".to_string(),
211 broker_type.as_env_str().to_string(),
212 ),
213 ];
214 if let Some(u) = &username {
215 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
216 }
217 if let Some(p) = &password {
218 env_vars.push(("BRIDGE_PASSWORD".to_string(), (**p).clone()));
219 }
220 Self {
221 spec: &JMS_BRIDGE,
222 binary_path,
223 broker_url,
224 broker_type,
225 username,
226 password,
227 start_timeout_ms,
228 env_vars,
229 }
230 }
231
232 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
234 Self {
235 spec: &XML_BRIDGE,
236 binary_path,
237 broker_url: String::new(),
238 broker_type: BrokerType::Generic,
239 username: None,
240 password: None,
241 start_timeout_ms,
242 env_vars: vec![],
243 }
244 }
245
246 pub fn cxf_profiles(
249 binary_path: PathBuf,
250 profiles: &[CxfProfileEnvVars],
251 start_timeout_ms: u64,
252 ) -> Self {
253 let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
254 let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
255
256 for profile in profiles {
257 env_vars.extend(profile.to_env_vars());
258 }
259
260 Self {
261 spec: &CXF_BRIDGE,
262 binary_path,
263 broker_url: String::new(),
264 broker_type: BrokerType::Generic,
265 username: None,
266 password: None,
267 start_timeout_ms,
268 env_vars,
269 }
270 }
271
272 pub fn validate(&self) -> Result<(), String> {
273 if self.start_timeout_ms == 0 {
274 return Err("start_timeout_ms must be > 0".to_string());
275 }
276 Ok(())
277 }
278}
279
/// Handle to a running bridge child process.
///
/// Created by `BridgeProcess::start`. Call `stop` for a graceful shutdown; dropping the
/// handle kills the child instead.
pub struct BridgeProcess {
    /// The spawned bridge process.
    child: tokio::process::Child,
    /// Port taken from the child's ready message.
    grpc_port: u16,
    /// Cancelled to tell the stdout drain task to exit.
    token: CancellationToken,
    /// Background task that forwards the child's stdout to `tracing`; `None` once joined.
    handle: Option<JoinHandle<()>>,
}
286
287impl BridgeProcess {
288 pub fn grpc_port(&self) -> u16 {
289 self.grpc_port
290 }
291
292 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
298 use tokio::io::AsyncBufReadExt;
299 use tokio::process::Command;
300 use tokio::time::{Duration, timeout};
301
302 config.validate().map_err(BridgeError::Config)?;
303
304 let free_port = {
306 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
307 listener.local_addr()?.port()
308 };
309
310 let stderr_stdio: std::process::Stdio =
312 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
313 let log_filename = config
314 .spec
315 .log_file_template
316 .replace("{pid}", &std::process::id().to_string());
317 let log_path = if log_dir.is_empty() {
318 format!("/tmp/{log_filename}")
319 } else {
320 format!("{log_dir}/{log_filename}")
321 };
322 match std::fs::File::create(&log_path) {
323 Ok(f) => {
324 eprintln!("[camel-bridge] stderr → {}", log_path);
325 f.into()
326 }
327 Err(e) => {
328 eprintln!(
329 "[camel-bridge] failed to create log file {}: {}",
330 log_path, e
331 );
332 std::process::Stdio::inherit()
333 }
334 }
335 } else {
336 std::process::Stdio::inherit()
337 };
338
339 let mut command = Command::new(&config.binary_path);
340 command
341 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
342 .env("QUARKUS_HTTP_PORT", "0")
345 .stdout(std::process::Stdio::piped())
346 .stderr(stderr_stdio);
347
348 for (key, value) in &config.env_vars {
350 command.env(key, value);
351 }
352
353 let mut child = command.spawn()?;
354
355 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
356 let mut reader = tokio::io::BufReader::new(stdout).lines();
357
358 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
359 while let Some(line) = reader.next_line().await? {
360 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
361 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
362 {
363 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
364 return Ok(p as u16);
365 }
366 tracing::error!("bridge ready message malformed: {line}");
367 return Err(BridgeError::BadReadyMessage(line));
368 }
369 }
370 tracing::error!("bridge stdout closed before ready message");
371 Err(BridgeError::StdoutClosed)
372 })
373 .await
374 .map_err(|_| {
375 let msg = format!(
376 "{} failed to start: health check timeout after {}ms",
377 config.spec.name, config.start_timeout_ms
378 );
379 tracing::error!("{msg}");
380 BridgeError::Timeout(msg)
381 })??;
382
383 let token = CancellationToken::new();
385 let child_token = token.clone();
386 let handle = tokio::spawn(async move {
387 loop {
388 tokio::select! {
389 _ = child_token.cancelled() => break,
390 line = reader.next_line() => {
391 match line {
392 Ok(Some(line)) => tracing::debug!(target: "camel_bridge::child", "{}", line),
393 Ok(None) | Err(_) => break,
394 }
395 }
396 }
397 }
398 });
399
400 Ok(BridgeProcess {
401 child,
402 grpc_port: port,
403 token,
404 handle: Some(handle),
405 })
406 }
407
408 pub async fn stop(mut self) -> Result<(), BridgeError> {
410 use tokio::time::{Duration, sleep};
411
412 self.token.cancel();
413
414 if let Some(handle) = self.handle.take() {
415 let join_result = tokio::time::timeout(Duration::from_secs(5), handle).await;
416 if join_result.is_err() {
417 tracing::warn!("bridge stdout drain task did not exit after cancellation");
418 }
419 }
420
421 #[cfg(unix)]
423 {
424 let pid = self.child.id().unwrap_or(0);
425 if pid > 0 {
426 unsafe {
428 libc::kill(pid as i32, libc::SIGTERM);
429 }
430 }
431 }
432
433 #[cfg(not(unix))]
435 let _ = self.child.start_kill();
436
437 tokio::select! {
439 result = self.child.wait() => {
440 result?;
441 }
442 _ = sleep(Duration::from_secs(5)) => {
443 let _ = self.child.start_kill();
444 self.child.wait().await?;
445 }
446 }
447 Ok(())
448 }
449}
450
451impl Drop for BridgeProcess {
452 fn drop(&mut self) {
453 self.token.cancel();
454 let _ = self.child.start_kill();
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a profile with only the mandatory fields set; every optional field is `None`.
    fn bare_profile(
        name: &str,
        wsdl_path: &str,
        service_name: &str,
        port_name: &str,
    ) -> CxfProfileEnvVars {
        CxfProfileEnvVars {
            name: name.to_string(),
            wsdl_path: wsdl_path.to_string(),
            service_name: service_name.to_string(),
            port_name: port_name.to_string(),
            address: None,
            keystore_path: None,
            keystore_password: None,
            truststore_path: None,
            truststore_password: None,
            sig_username: None,
            sig_password: None,
            enc_username: None,
            security_actions_out: None,
            security_actions_in: None,
            signature_algorithm: None,
            signature_digest_algorithm: None,
            signature_c14n_algorithm: None,
            signature_parts: None,
        }
    }

    /// True when `cfg.env_vars` contains exactly this key/value pair.
    fn has_env(cfg: &BridgeProcessConfig, key: &str, value: &str) -> bool {
        cfg.env_vars.iter().any(|(k, v)| k == key && v == value)
    }

    /// JMS configuration with a zero start timeout, which `validate` must reject.
    fn zero_timeout_jms_config() -> BridgeProcessConfig {
        BridgeProcessConfig::jms(
            PathBuf::from("/usr/bin/echo"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            None,
            None,
            0,
        )
    }

    #[test]
    fn broker_type_from_str_activemq() {
        assert_eq!(
            "activemq".parse::<BrokerType>().unwrap(),
            BrokerType::ActiveMq
        );
        assert_eq!(
            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
            BrokerType::ActiveMq
        );
    }

    #[test]
    fn broker_type_from_str_artemis() {
        assert_eq!(
            "artemis".parse::<BrokerType>().unwrap(),
            BrokerType::Artemis
        );
    }

    #[test]
    fn broker_type_from_str_generic() {
        assert_eq!(
            "generic".parse::<BrokerType>().unwrap(),
            BrokerType::Generic
        );
    }

    #[test]
    fn broker_type_from_str_unknown_returns_err() {
        assert!("ibmmq".parse::<BrokerType>().is_err());
        assert!("UnknownBroker".parse::<BrokerType>().is_err());
    }

    #[test]
    fn broker_type_env_str() {
        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
    }

    #[test]
    fn jms_constructor_uses_jms_spec() {
        let cfg = BridgeProcessConfig::jms(
            PathBuf::from("/tmp/jms-bridge"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            Some("user".to_string()),
            Some(Redacted::new("pass".to_string())),
            1000,
        );
        assert_eq!(cfg.spec.name, "jms-bridge");
    }

    #[test]
    fn xml_constructor_uses_xml_spec() {
        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
        assert_eq!(cfg.spec.name, "xml-bridge");
    }

    #[test]
    fn cxf_profiles_generates_cxf_profiles_env_var() {
        let profiles = vec![
            bare_profile("baleares", "/a.wsdl", "Svc", "Port"),
            CxfProfileEnvVars {
                address: Some("http://host:9090/ws".to_string()),
                keystore_path: Some("/b.jks".to_string()),
                keystore_password: Some(Redacted::new("pass".to_string())),
                sig_username: Some("cert".to_string()),
                sig_password: Some(Redacted::new("sig_pass".to_string())),
                security_actions_out: Some("Timestamp Signature".to_string()),
                security_actions_in: Some("Timestamp Signature".to_string()),
                ..bare_profile("extremadura", "/b.wsdl", "Svc2", "Port2")
            },
        ];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);

        assert_eq!(cfg.spec.name, "cxf-bridge");
        assert!(cfg.broker_url.is_empty());
        assert_eq!(cfg.broker_type, BrokerType::Generic);
        assert!(cfg.username.is_none());
        assert!(cfg.password.is_none());

        let profiles_var = cfg
            .env_vars
            .iter()
            .find(|(k, _)| k == "CXF_PROFILES")
            .expect("CXF_PROFILES env var must exist");
        assert_eq!(profiles_var.1, "baleares,extremadura");

        // First profile: mandatory variables only.
        assert!(has_env(&cfg, "CXF_PROFILE_BALEARES_WSDL_PATH", "/a.wsdl"));
        assert!(has_env(&cfg, "CXF_PROFILE_BALEARES_SERVICE_NAME", "Svc"));
        assert!(has_env(&cfg, "CXF_PROFILE_BALEARES_PORT_NAME", "Port"));
        assert!(
            !cfg.env_vars
                .iter()
                .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
        );

        // Second profile: optional variables that were set.
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_WSDL_PATH",
            "/b.wsdl"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_ADDRESS",
            "http://host:9090/ws"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH",
            "/b.jks"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD",
            "pass"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_SIG_USERNAME",
            "cert"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD",
            "sig_pass"
        ));
        assert!(has_env(
            &cfg,
            "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT",
            "Timestamp Signature"
        ));
    }

    #[test]
    fn cxf_profiles_single_profile_no_security() {
        let profiles = vec![bare_profile(
            "test",
            "service.wsdl",
            "{http://example.com}Service",
            "{http://example.com}Port",
        )];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);

        assert_eq!(cfg.spec.name, "cxf-bridge");
        // CXF_PROFILES plus the three mandatory per-profile variables, in that order.
        assert_eq!(cfg.env_vars.len(), 4);
        assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
        assert_eq!(cfg.env_vars[0].1, "test");
        assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
        assert_eq!(cfg.env_vars[1].1, "service.wsdl");
        assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
        assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
        assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
        assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
    }

    #[test]
    fn profile_env_vars_to_env_vars_includes_all_fields() {
        let vars = CxfProfileEnvVars {
            address: Some("http://host:8080".to_string()),
            keystore_path: Some("/ks.jks".to_string()),
            keystore_password: Some(Redacted::new("ks_pass".to_string())),
            truststore_path: Some("/ts.jks".to_string()),
            truststore_password: Some(Redacted::new("ts_pass".to_string())),
            sig_username: Some("user".to_string()),
            sig_password: Some(Redacted::new("sig_pass".to_string())),
            security_actions_out: Some("Timestamp Signature".to_string()),
            security_actions_in: Some("Timestamp".to_string()),
            ..bare_profile("full", "/wsdl", "Svc", "Port")
        };

        let env = vars.to_env_vars();
        // 3 mandatory + 9 optional fields that are set above.
        assert_eq!(env.len(), 12);

        let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
        assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
    }

    #[test]
    fn test_start_timeout_zero_rejected() {
        let config = zero_timeout_jms_config();
        let result = config.validate();
        assert!(result.is_err());
    }

    /// `start` validates the configuration before doing any I/O, so a zero timeout must
    /// surface as a `Config` error without spawning anything.
    #[tokio::test]
    async fn test_bridge_rejects_zero_start_timeout() {
        let config = zero_timeout_jms_config();
        let result = BridgeProcess::start(&config).await;
        assert!(matches!(result, Err(BridgeError::Config(_))));
    }

    /// `stop` must terminate even a child that ignores SIGTERM.
    ///
    /// `stop` grants the child a 5 s grace period before killing it, so the outer limit
    /// has to be comfortably larger than that; with an equal limit the test can only pass
    /// when SIGTERM happens to arrive before the shell has installed its trap.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_bridge_stop_completes() {
        use tokio::process::Command;
        use tokio::time::{Duration, timeout};

        let child = Command::new("sh")
            .arg("-c")
            .arg("trap '' TERM; while true; do echo tick; sleep 1; done")
            .stdout(std::process::Stdio::null())
            .spawn()
            .expect("must spawn test child process");

        let bridge = BridgeProcess {
            child,
            grpc_port: 0,
            token: CancellationToken::new(),
            handle: None,
        };

        let result = timeout(Duration::from_secs(15), bridge.stop()).await;
        assert!(result.is_ok(), "stop() must complete within 15s");
    }

    #[test]
    fn redacted_debug_displays_redacted() {
        let r = Redacted::new("secret_password".to_string());
        assert_eq!(format!("{r:?}"), "[REDACTED]");
    }

    #[test]
    fn redacted_display_displays_redacted() {
        let r = Redacted::new("secret_password".to_string());
        assert_eq!(format!("{r}"), "[REDACTED]");
    }

    #[test]
    fn redacted_deref_gives_inner_value() {
        let r = Redacted::new("secret".to_string());
        assert_eq!(&*r, "secret");
    }

    #[test]
    fn redacted_into_inner_returns_value() {
        let r = Redacted::new("secret".to_string());
        assert_eq!(r.into_inner(), "secret");
    }

    #[test]
    fn redacted_clone_works() {
        let r = Redacted::new("secret".to_string());
        let c = r.clone();
        assert_eq!(&*c, "secret");
        assert_eq!(format!("{c:?}"), "[REDACTED]");
    }

    /// Only the `password` field is formatted here, not the whole configuration.
    #[test]
    fn bridge_process_config_debug_redacts_password() {
        let cfg = BridgeProcessConfig::jms(
            PathBuf::from("/tmp/jms-bridge"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            Some("user".to_string()),
            Some(Redacted::new("super_secret".to_string())),
            1000,
        );
        let password_debug = format!("{:?}", cfg.password);
        assert!(
            !password_debug.contains("super_secret"),
            "Password field must not leak in Debug: {password_debug}"
        );
        assert_eq!(
            password_debug, "Some([REDACTED])",
            "Password field must show [REDACTED]: {password_debug}"
        );
    }

    #[test]
    fn cxf_profile_debug_redacts_passwords() {
        let profile = CxfProfileEnvVars {
            keystore_password: Some(Redacted::new("ks_secret_val".to_string())),
            truststore_password: Some(Redacted::new("ts_secret_val".to_string())),
            sig_password: Some(Redacted::new("sig_secret_val".to_string())),
            ..bare_profile("test", "/a.wsdl", "Svc", "Port")
        };
        let debug_output = format!("{profile:?}");
        assert!(
            !debug_output.contains("ks_secret_val")
                && !debug_output.contains("ts_secret_val")
                && !debug_output.contains("sig_secret_val"),
            "Debug must not contain passwords: {debug_output}"
        );
    }
}