Skip to main content

camel_bridge/
process.rs

1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8    #[error("IO error: {0}")]
9    Io(#[from] std::io::Error),
10    #[error("Bridge timed out: {0}")]
11    Timeout(String),
12    #[error("Bridge stdout closed before ready message")]
13    StdoutClosed,
14    #[error("Bridge ready message malformed: {0}")]
15    BadReadyMessage(String),
16    #[error("Download failed: {0}")]
17    Download(String),
18    #[error("Checksum mismatch: expected {expected}, got {actual}")]
19    ChecksumMismatch { expected: String, actual: String },
20    #[error("URL not allowed: {0}")]
21    UrlNotAllowed(String),
22    #[error("Transport error: {0}")]
23    Transport(String),
24}
25
26#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum BrokerType {
29    #[serde(alias = "active_mq")]
30    ActiveMq,
31    Artemis,
32    Generic,
33}
34
35impl BrokerType {
36    pub fn as_env_str(&self) -> &'static str {
37        match self {
38            BrokerType::ActiveMq => "activemq",
39            BrokerType::Artemis => "artemis",
40            BrokerType::Generic => "generic",
41        }
42    }
43}
44
45impl std::str::FromStr for BrokerType {
46    type Err = std::convert::Infallible;
47
48    fn from_str(s: &str) -> Result<Self, Self::Err> {
49        Ok(match s.to_lowercase().as_str() {
50            "activemq" => BrokerType::ActiveMq,
51            "artemis" => BrokerType::Artemis,
52            _ => BrokerType::Generic,
53        })
54    }
55}
56
57pub struct BridgeProcessConfig {
58    pub spec: &'static BridgeSpec,
59    pub binary_path: PathBuf,
60    pub broker_url: String,
61    pub broker_type: BrokerType,
62    pub username: Option<String>,
63    pub password: Option<String>,
64    pub start_timeout_ms: u64,
65    pub env_vars: Vec<(String, String)>,
66}
67
68impl BridgeProcessConfig {
69    /// Constructor for the JMS bridge.
70    pub fn jms(
71        binary_path: PathBuf,
72        broker_url: String,
73        broker_type: BrokerType,
74        username: Option<String>,
75        password: Option<String>,
76        start_timeout_ms: u64,
77    ) -> Self {
78        let mut env_vars = vec![
79            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
80            (
81                "BRIDGE_BROKER_TYPE".to_string(),
82                broker_type.as_env_str().to_string(),
83            ),
84        ];
85        if let Some(u) = &username {
86            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
87        }
88        if let Some(p) = &password {
89            env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
90        }
91        Self {
92            spec: &JMS_BRIDGE,
93            binary_path,
94            broker_url,
95            broker_type,
96            username,
97            password,
98            start_timeout_ms,
99            env_vars,
100        }
101    }
102
103    /// Constructor for the XML bridge.
104    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
105        Self {
106            spec: &XML_BRIDGE,
107            binary_path,
108            broker_url: String::new(),
109            broker_type: BrokerType::Generic,
110            username: None,
111            password: None,
112            start_timeout_ms,
113            env_vars: vec![],
114        }
115    }
116}
117
118pub struct BridgeProcess {
119    child: tokio::process::Child,
120    grpc_port: u16,
121}
122
123impl BridgeProcess {
124    pub fn grpc_port(&self) -> u16 {
125        self.grpc_port
126    }
127
128    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
129    ///   {"status":"ready","port":PORT}
130    ///
131    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
132    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
133    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
134        use tokio::io::AsyncBufReadExt;
135        use tokio::process::Command;
136        use tokio::time::{Duration, timeout};
137
138        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
139        let free_port = {
140            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
141            listener.local_addr()?.port()
142        };
143
144        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
145        let stderr_stdio: std::process::Stdio =
146            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
147                let log_filename = config
148                    .spec
149                    .log_file_template
150                    .replace("{pid}", &std::process::id().to_string());
151                let log_path = if log_dir.is_empty() {
152                    format!("/tmp/{log_filename}")
153                } else {
154                    format!("{log_dir}/{log_filename}")
155                };
156                match std::fs::File::create(&log_path) {
157                    Ok(f) => {
158                        eprintln!("[camel-bridge] stderr → {}", log_path);
159                        f.into()
160                    }
161                    Err(e) => {
162                        eprintln!(
163                            "[camel-bridge] failed to create log file {}: {}",
164                            log_path, e
165                        );
166                        std::process::Stdio::inherit()
167                    }
168                }
169            } else {
170                std::process::Stdio::inherit()
171            };
172
173        let mut command = Command::new(&config.binary_path);
174        command
175            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
176            // Let the OS pick a random HTTP port — we only use gRPC.
177            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
178            .env("QUARKUS_HTTP_PORT", "0")
179            .stdout(std::process::Stdio::piped())
180            .stderr(stderr_stdio);
181
182        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
183        for (key, value) in &config.env_vars {
184            command.env(key, value);
185        }
186
187        let mut child = command.spawn()?;
188
189        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
190        let mut reader = tokio::io::BufReader::new(stdout).lines();
191
192        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
193            while let Some(line) = reader.next_line().await? {
194                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
195                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
196                {
197                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
198                        return Ok(p as u16);
199                    }
200                    return Err(BridgeError::BadReadyMessage(line));
201                }
202            }
203            Err(BridgeError::StdoutClosed)
204        })
205        .await
206        .map_err(|_| {
207            BridgeError::Timeout(format!(
208                "{} failed to start: health check timeout after {}ms",
209                config.spec.name, config.start_timeout_ms
210            ))
211        })??;
212
213        // Keep draining the bridge's stdout in the background so the pipe
214        // buffer never fills up.  If the pipe blocks, the bridge process
215        // blocks on its next stdout write and silently stops responding.
216        tokio::spawn(async move {
217            while let Ok(Some(line)) = reader.next_line().await {
218                tracing::debug!(target: "camel_bridge::child", "{}", line);
219            }
220        });
221
222        Ok(BridgeProcess {
223            child,
224            grpc_port: port,
225        })
226    }
227
228    /// Gracefully stop: SIGTERM + wait for exit.
229    pub async fn stop(mut self) -> Result<(), BridgeError> {
230        use tokio::time::{Duration, sleep};
231
232        // Send SIGTERM first (graceful shutdown)
233        #[cfg(unix)]
234        {
235            let pid = self.child.id().unwrap_or(0);
236            if pid > 0 {
237                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
238                unsafe {
239                    libc::kill(pid as i32, libc::SIGTERM);
240                }
241            }
242        }
243
244        // On non-Unix (Windows), fall through to kill immediately
245        #[cfg(not(unix))]
246        let _ = self.child.start_kill();
247
248        // Wait up to 5 seconds for graceful exit, then SIGKILL
249        tokio::select! {
250            result = self.child.wait() => {
251                result?;
252            }
253            _ = sleep(Duration::from_secs(5)) => {
254                let _ = self.child.start_kill();
255                self.child.wait().await?;
256            }
257        }
258        Ok(())
259    }
260}
261
262impl Drop for BridgeProcess {
263    fn drop(&mut self) {
264        // Best-effort only. Does NOT wait — cannot block in Drop.
265        let _ = self.child.start_kill();
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272
273    #[test]
274    fn broker_type_from_str_activemq() {
275        assert_eq!(
276            "activemq".parse::<BrokerType>().unwrap(),
277            BrokerType::ActiveMq
278        );
279        assert_eq!(
280            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
281            BrokerType::ActiveMq
282        );
283    }
284
285    #[test]
286    fn broker_type_from_str_artemis() {
287        assert_eq!(
288            "artemis".parse::<BrokerType>().unwrap(),
289            BrokerType::Artemis
290        );
291    }
292
293    #[test]
294    fn broker_type_from_str_unknown_is_generic() {
295        assert_eq!("ibmmq".parse::<BrokerType>().unwrap(), BrokerType::Generic);
296    }
297
298    #[test]
299    fn broker_type_env_str() {
300        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
301        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
302        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
303    }
304
305    #[test]
306    fn jms_constructor_uses_jms_spec() {
307        let cfg = BridgeProcessConfig::jms(
308            PathBuf::from("/tmp/jms-bridge"),
309            "tcp://localhost:61616".to_string(),
310            BrokerType::ActiveMq,
311            Some("user".to_string()),
312            Some("pass".to_string()),
313            1000,
314        );
315        assert_eq!(cfg.spec.name, "jms-bridge");
316    }
317
318    #[test]
319    fn xml_constructor_uses_xml_spec() {
320        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
321        assert_eq!(cfg.spec.name, "xml-bridge");
322    }
323}