1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, JMS_BRIDGE, XML_BRIDGE};
5
/// Errors reported by the bridge subsystem.
///
/// Only `Io`, `Timeout`, `StdoutClosed` and `BadReadyMessage` are produced by the
/// process-management code in this file; the remaining variants are presumably
/// raised by download/transport code elsewhere in the crate (not visible here).
#[derive(Debug, Error)]
pub enum BridgeError {
    /// An underlying I/O operation failed; converted automatically from
    /// `std::io::Error` via `?`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// An operation did not finish in time; the payload is a human-readable
    /// description of what timed out.
    #[error("Bridge timed out: {0}")]
    Timeout(String),
    /// The child's stdout was unavailable or reached end-of-stream before a
    /// ready message was seen.
    #[error("Bridge stdout closed before ready message")]
    StdoutClosed,
    /// A ready message was received but could not be used; the payload is the
    /// offending line.
    #[error("Bridge ready message malformed: {0}")]
    BadReadyMessage(String),
    /// A download failed; the payload describes the failure.
    #[error("Download failed: {0}")]
    Download(String),
    /// A computed checksum did not match the expected one.
    #[error("Checksum mismatch: expected {expected}, got {actual}")]
    ChecksumMismatch { expected: String, actual: String },
    /// A URL was rejected; the payload is included in the message.
    #[error("URL not allowed: {0}")]
    UrlNotAllowed(String),
    /// A transport-level failure; the payload describes it.
    #[error("Transport error: {0}")]
    Transport(String),
}
25
/// Kind of message broker a JMS bridge talks to.
///
/// Deserializes from the lowercase variant name (`activemq`, `artemis`,
/// `generic`); `active_mq` is additionally accepted for [`BrokerType::ActiveMq`].
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BrokerType {
    /// ActiveMQ broker; also deserializes from the alias `active_mq`.
    #[serde(alias = "active_mq")]
    ActiveMq,
    /// Artemis broker.
    Artemis,
    /// Any other broker; also the fallback used when parsing an unknown name
    /// with `FromStr`.
    Generic,
}
34
35impl BrokerType {
36 pub fn as_env_str(&self) -> &'static str {
37 match self {
38 BrokerType::ActiveMq => "activemq",
39 BrokerType::Artemis => "artemis",
40 BrokerType::Generic => "generic",
41 }
42 }
43}
44
45impl std::str::FromStr for BrokerType {
46 type Err = std::convert::Infallible;
47
48 fn from_str(s: &str) -> Result<Self, Self::Err> {
49 Ok(match s.to_lowercase().as_str() {
50 "activemq" => BrokerType::ActiveMq,
51 "artemis" => BrokerType::Artemis,
52 _ => BrokerType::Generic,
53 })
54 }
55}
56
/// Everything needed to launch one bridge child process.
pub struct BridgeProcessConfig {
    /// Static description of the bridge being launched (its `name` is used in
    /// error messages and its `log_file_template` for the stderr log file name).
    pub spec: &'static BridgeSpec,
    /// Path of the executable to spawn.
    pub binary_path: PathBuf,
    /// Broker URL; left empty by the `xml` constructor.
    pub broker_url: String,
    /// Broker flavour; `Generic` in configurations built by the `xml` constructor.
    pub broker_type: BrokerType,
    /// Optional broker user name.
    pub username: Option<String>,
    /// Optional broker password.
    pub password: Option<String>,
    /// How long, in milliseconds, to wait for the child's ready message.
    pub start_timeout_ms: u64,
    /// Extra `(name, value)` environment variables set on the child process.
    pub env_vars: Vec<(String, String)>,
}
67
68impl BridgeProcessConfig {
69 pub fn jms(
71 binary_path: PathBuf,
72 broker_url: String,
73 broker_type: BrokerType,
74 username: Option<String>,
75 password: Option<String>,
76 start_timeout_ms: u64,
77 ) -> Self {
78 let mut env_vars = vec![
79 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
80 (
81 "BRIDGE_BROKER_TYPE".to_string(),
82 broker_type.as_env_str().to_string(),
83 ),
84 ];
85 if let Some(u) = &username {
86 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
87 }
88 if let Some(p) = &password {
89 env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
90 }
91 Self {
92 spec: &JMS_BRIDGE,
93 binary_path,
94 broker_url,
95 broker_type,
96 username,
97 password,
98 start_timeout_ms,
99 env_vars,
100 }
101 }
102
103 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
105 Self {
106 spec: &XML_BRIDGE,
107 binary_path,
108 broker_url: String::new(),
109 broker_type: BrokerType::Generic,
110 username: None,
111 password: None,
112 start_timeout_ms,
113 env_vars: vec![],
114 }
115 }
116}
117
/// Handle to a running bridge child process.
///
/// Created by `BridgeProcess::start`; the child is killed when the handle is
/// dropped (see the `Drop` impl).
pub struct BridgeProcess {
    /// The spawned child process.
    child: tokio::process::Child,
    /// Port taken from the child's ready message.
    grpc_port: u16,
}
122
123impl BridgeProcess {
124 pub fn grpc_port(&self) -> u16 {
125 self.grpc_port
126 }
127
128 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
134 use tokio::io::AsyncBufReadExt;
135 use tokio::process::Command;
136 use tokio::time::{Duration, timeout};
137
138 let free_port = {
140 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
141 listener.local_addr()?.port()
142 };
143
144 let stderr_stdio: std::process::Stdio =
146 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
147 let log_filename = config
148 .spec
149 .log_file_template
150 .replace("{pid}", &std::process::id().to_string());
151 let log_path = if log_dir.is_empty() {
152 format!("/tmp/{log_filename}")
153 } else {
154 format!("{log_dir}/{log_filename}")
155 };
156 match std::fs::File::create(&log_path) {
157 Ok(f) => {
158 eprintln!("[camel-bridge] stderr → {}", log_path);
159 f.into()
160 }
161 Err(e) => {
162 eprintln!(
163 "[camel-bridge] failed to create log file {}: {}",
164 log_path, e
165 );
166 std::process::Stdio::inherit()
167 }
168 }
169 } else {
170 std::process::Stdio::inherit()
171 };
172
173 let mut command = Command::new(&config.binary_path);
174 command
175 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
176 .env("QUARKUS_HTTP_PORT", "0")
179 .stdout(std::process::Stdio::piped())
180 .stderr(stderr_stdio);
181
182 for (key, value) in &config.env_vars {
184 command.env(key, value);
185 }
186
187 let mut child = command.spawn()?;
188
189 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
190 let mut reader = tokio::io::BufReader::new(stdout).lines();
191
192 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
193 while let Some(line) = reader.next_line().await? {
194 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
195 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
196 {
197 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
198 return Ok(p as u16);
199 }
200 return Err(BridgeError::BadReadyMessage(line));
201 }
202 }
203 Err(BridgeError::StdoutClosed)
204 })
205 .await
206 .map_err(|_| {
207 BridgeError::Timeout(format!(
208 "{} failed to start: health check timeout after {}ms",
209 config.spec.name, config.start_timeout_ms
210 ))
211 })??;
212
213 tokio::spawn(async move {
217 while let Ok(Some(line)) = reader.next_line().await {
218 tracing::debug!(target: "camel_bridge::child", "{}", line);
219 }
220 });
221
222 Ok(BridgeProcess {
223 child,
224 grpc_port: port,
225 })
226 }
227
228 pub async fn stop(mut self) -> Result<(), BridgeError> {
230 use tokio::time::{Duration, sleep};
231
232 #[cfg(unix)]
234 {
235 let pid = self.child.id().unwrap_or(0);
236 if pid > 0 {
237 unsafe {
239 libc::kill(pid as i32, libc::SIGTERM);
240 }
241 }
242 }
243
244 #[cfg(not(unix))]
246 let _ = self.child.start_kill();
247
248 tokio::select! {
250 result = self.child.wait() => {
251 result?;
252 }
253 _ = sleep(Duration::from_secs(5)) => {
254 let _ = self.child.start_kill();
255 self.child.wait().await?;
256 }
257 }
258 Ok(())
259 }
260}
261
/// Ensures the child does not outlive its handle.
impl Drop for BridgeProcess {
    fn drop(&mut self) {
        // Best effort: the result is deliberately ignored because `drop` cannot
        // report errors.
        let _ = self.child.start_kill();
    }
}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` as a broker type; the conversion is infallible.
    fn parsed(input: &str) -> BrokerType {
        input.parse::<BrokerType>().unwrap()
    }

    #[test]
    fn broker_type_from_str_activemq() {
        for spelling in ["activemq", "ACTIVEMQ"] {
            assert_eq!(parsed(spelling), BrokerType::ActiveMq);
        }
    }

    #[test]
    fn broker_type_from_str_artemis() {
        assert_eq!(parsed("artemis"), BrokerType::Artemis);
    }

    #[test]
    fn broker_type_from_str_unknown_is_generic() {
        assert_eq!(parsed("ibmmq"), BrokerType::Generic);
    }

    #[test]
    fn broker_type_env_str() {
        let expectations = [
            (BrokerType::ActiveMq, "activemq"),
            (BrokerType::Artemis, "artemis"),
            (BrokerType::Generic, "generic"),
        ];
        for (broker, expected) in expectations {
            assert_eq!(broker.as_env_str(), expected);
        }
    }

    #[test]
    fn jms_constructor_uses_jms_spec() {
        let binary = PathBuf::from("/tmp/jms-bridge");
        let url = "tcp://localhost:61616".to_string();
        let user = Some("user".to_string());
        let pass = Some("pass".to_string());

        let cfg = BridgeProcessConfig::jms(binary, url, BrokerType::ActiveMq, user, pass, 1000);

        assert_eq!(cfg.spec.name, "jms-bridge");
    }

    #[test]
    fn xml_constructor_uses_xml_spec() {
        let binary = PathBuf::from("/tmp/xml-bridge");

        let cfg = BridgeProcessConfig::xml(binary, 1000);

        assert_eq!(cfg.spec.name, "xml-bridge");
    }
}