pub mod container;
pub mod local;
pub mod pepita;
pub mod ssh;
#[cfg(test)]
mod tests_container;
#[cfg(test)]
mod tests_container_b;
#[cfg(test)]
mod tests_container_c;
#[cfg(test)]
mod tests_container_d;
#[cfg(test)]
mod tests_dispatch;
#[cfg(test)]
mod tests_dispatch_b;
#[cfg(test)]
mod tests_ssh;
#[cfg(test)]
mod tests_timeout;
use crate::core::types::Machine;
/// Shared, thread-safe slot in which a transport can publish the PID of the
/// child process it spawned, so that `kill_worker_child_group` can signal its
/// process group when `exec_script_timeout` gives up.
pub(crate) type ChildPidSlot = std::sync::Arc<std::sync::Mutex<Option<u32>>>;

/// Store `child`'s PID in `slot`, if a slot was supplied.
///
/// Recording is best-effort: a poisoned mutex is skipped without error.
pub(crate) fn record_child_pid(slot: Option<&ChildPidSlot>, child: &std::process::Child) {
    let Some(slot) = slot else {
        return;
    };
    match slot.lock() {
        Ok(mut recorded) => *recorded = Some(child.id()),
        Err(_poisoned) => {}
    }
}
/// Arrange for the spawned child to lead a fresh process group (pgid = its
/// own PID, per `CommandExt::process_group(0)`), so the group can later be
/// signalled as a unit with `kill -- -<pid>` (see `kill_group_command`).
///
/// On non-Unix targets this does nothing.
pub(crate) fn configure_process_group(cmd: &mut std::process::Command) {
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        cmd.process_group(0);
    }
    #[cfg(not(unix))]
    {
        let _ = cmd;
    }
}
/// Build (but do not run) `kill -9 -- -<pid>`.
///
/// The negative PID targets the whole process group `pid`. The `--` stops
/// `kill` from parsing `-<pid>` as an option. Both output streams are
/// discarded.
fn kill_group_command(pid: u32) -> std::process::Command {
    use std::process::{Command, Stdio};

    let group_target = format!("-{pid}");
    let mut kill = Command::new("kill");
    kill.arg("-9").arg("--").arg(group_target);
    kill.stdout(Stdio::null());
    kill.stderr(Stdio::null());
    kill
}
/// Run `kill -9 -- -<pid>` and wait for it to finish.
///
/// This is best-effort. Failure to launch `kill`, or a non-zero exit from it,
/// is ignored.
fn kill_process_group(pid: u32) {
    let mut kill = kill_group_command(pid);
    let _outcome = kill.status();
}
/// Write `script` to the child's piped stdin, then close the pipe so the
/// child sees EOF.
///
/// If the child has no stdin handle (not piped, or already taken), this does
/// nothing and returns `Ok(())`.
///
/// # Errors
///
/// If the write fails, the pipe is closed and the child is killed and reaped
/// via [`kill_and_reap`]. The error is then returned as
/// `"stdin write error: …"`.
pub(crate) fn write_stdin_or_reap(
    child: &mut std::process::Child,
    script: &str,
) -> Result<(), String> {
    use std::io::Write;

    let Some(mut pipe) = child.stdin.take() else {
        return Ok(());
    };
    let written = pipe.write_all(script.as_bytes());
    // Close our end first in both outcomes: on success this delivers EOF,
    // and on failure the child is killed right after.
    drop(pipe);
    written.map_err(|e| {
        kill_and_reap(child);
        format!("stdin write error: {e}")
    })
}

/// Forcefully stop `child` with `Child::kill` (SIGKILL on Unix), then wait
/// for it so it does not linger as a zombie.
///
/// Only the direct child is signalled, not its process group. Errors from
/// either step are ignored; for example, the child may already have exited.
pub(crate) fn kill_and_reap(child: &mut std::process::Child) {
    let _kill = child.kill();
    let _reap = child.wait();
}
/// Result of running a script on a machine through one of the transports.
#[derive(Debug, Clone)]
pub struct ExecOutput {
    /// Exit status reported by the transport (0 means success).
    pub exit_code: i32,
    /// Standard output captured by the transport.
    pub stdout: String,
    /// Standard error captured by the transport.
    pub stderr: String,
}

impl ExecOutput {
    /// `true` exactly when `exit_code` is 0.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, 0)
    }
}
/// Enforce invariant I8: a script must pass bashrs validation before it runs.
///
/// Known generated payloads are stripped first (see `strip_data_payloads`),
/// so only the script's own logic is linted.
///
/// # Errors
///
/// Returns `"I8 violation — script failed bashrs validation: …"` carrying the
/// validator's message.
fn validate_before_exec(script: &str) -> Result<(), String> {
    let lint_view = strip_data_payloads(script);
    match crate::core::purifier::validate_script(&lint_view) {
        Ok(()) => Ok(()),
        Err(reason) => Err(format!(
            "I8 violation — script failed bashrs validation: {reason}"
        )),
    }
}
/// Replace forjar-generated data payloads and temp-file plumbing with inert
/// placeholders, so that only the script's control logic reaches the bashrs
/// validator.
///
/// The rewrites are applied in this order:
/// 1. `echo '<base64>' | base64 -d > '<path>'` becomes
///    `echo 'FORJAR_BASE64_STRIPPED' > '<path>'` (the target path is kept).
/// 2. The body of a quoted `<<'FORJAR_EOF'` heredoc becomes a single comment line.
/// 3. The copia temp-file cleanup `rm -f "$TMPFILE"` becomes a comment.
/// 4. The copia atomic replace `mv "$TMPFILE" '<path>'` becomes a comment.
/// 5. `cp` / `mkdir -p` / `rm -r` / `rm -rf` lines that mention `$_STAGING`,
///    `$_CACHE_DIR` or `$_CARGO_BIN` become a comment.
///
/// The result is only used for linting and is never executed.
fn strip_data_payloads(script: &str) -> String {
    // Compile the patterns once per process. This function runs before every
    // executed script, and rebuilding five regexes on each call was pure
    // overhead.
    static RULES: std::sync::OnceLock<[(regex::Regex, &'static str); 5]> =
        std::sync::OnceLock::new();
    let rules = RULES.get_or_init(|| {
        [
            (
                regex::Regex::new(r"echo '([A-Za-z0-9+/=\n]+)' \| base64 -d > '([^']+)'")
                    .expect("base64 regex is valid"),
                // `$2` re-inserts the captured target path.
                "echo 'FORJAR_BASE64_STRIPPED' > '$2'",
            ),
            (
                // `(?s)` plus a lazy `.*?` lets the body span lines and stop at
                // the first terminator.
                regex::Regex::new(r"(?s)<<'FORJAR_EOF'\n.*?\nFORJAR_EOF")
                    .expect("heredoc regex is valid"),
                "<<'FORJAR_EOF'\n# payload stripped for lint\nFORJAR_EOF",
            ),
            (
                regex::Regex::new(r#"rm -f "\$TMPFILE""#).expect("copia rm regex is valid"),
                "# forjar-copia: tmpfile cleanup stripped",
            ),
            (
                regex::Regex::new(r#"mv "\$TMPFILE" '[^']+'"#)
                    .expect("copia mv regex is valid"),
                "# forjar-copia: atomic replace stripped",
            ),
            (
                // NOTE(review): `\s*` after `^` can also match newlines, so
                // blank lines directly above such a command are folded into
                // the replacement.
                regex::Regex::new(
                    r#"(?m)^\s*(?:cp|mkdir -p|rm -rf?)\s+.*\$_(?:STAGING|CACHE_DIR|CARGO_BIN).*$"#,
                )
                .expect("cargo ops regex is valid"),
                "# forjar-cargo: cache op stripped",
            ),
        ]
    });

    let mut text = script.to_owned();
    for (pattern, replacement) in rules {
        text = pattern.replace_all(&text, *replacement).into_owned();
    }
    text
}
/// Validate `script` and run it on `machine` using the transport the machine
/// selects, with no timeout.
///
/// # Errors
///
/// Returns an `"I8 violation — …"` message if validation fails. Otherwise
/// returns whatever error the chosen transport reports.
pub fn exec_script(machine: &Machine, script: &str) -> Result<ExecOutput, String> {
    let untracked: Option<&ChildPidSlot> = None;
    exec_script_tracked(machine, script, untracked)
}

/// Validate `script` (invariant I8), then dispatch it to a transport.
///
/// Transports are tried in this order:
/// 1. pepita;
/// 2. container;
/// 3. local execution, when `machine.addr` names this host (see `is_local_addr`);
/// 4. SSH.
///
/// `pid_slot` is passed through unchanged to the chosen transport. The
/// transport is presumably expected to record its child's PID there (see
/// `record_child_pid`); this is what the timeout path relies on.
fn exec_script_tracked(
    machine: &Machine,
    script: &str,
    pid_slot: Option<&ChildPidSlot>,
) -> Result<ExecOutput, String> {
    validate_before_exec(script)?;
    if machine.is_pepita_transport() {
        pepita::exec_pepita(machine, script, pid_slot)
    } else if machine.is_container_transport() {
        container::exec_container(machine, script, pid_slot)
    } else if is_local_addr(&machine.addr) {
        // `is_local_addr` already covers the "127.0.0.1" / "localhost" literals.
        local::exec_local(script, pid_slot)
    } else {
        ssh::exec_ssh(machine, script, pid_slot)
    }
}
/// Run `script` on `machine`, giving up after `timeout_secs` seconds.
///
/// With `timeout_secs == None` this is exactly [`exec_script`].
///
/// Otherwise:
/// - The script runs on a worker thread, which hands the transport a shared
///   PID slot.
/// - If no result arrives in time, the recorded child's process group is
///   killed (unless the worker already finished).
/// - The worker thread is then joined before an error is returned.
///
/// # Errors
///
/// - Any error from [`exec_script`] (validation or transport failure).
/// - `"transport timeout: …"` when the time limit elapses.
/// - `"transport worker for '…' panicked …"` when the worker thread ends
///   without sending a result. This happens only if it panicked. The message
///   deliberately does not say "timeout", so that `exec_script_retry` does
///   not treat a panic as a transient failure and re-run the script.
pub fn exec_script_timeout(
    machine: &Machine,
    script: &str,
    timeout_secs: Option<u64>,
) -> Result<ExecOutput, String> {
    use std::sync::mpsc::RecvTimeoutError;

    let Some(secs) = timeout_secs else {
        return exec_script(machine, script);
    };
    let hostname = machine.hostname.clone();
    let machine = machine.clone();
    let script = script.to_string();
    let pid_slot: ChildPidSlot = std::sync::Arc::new(std::sync::Mutex::new(None));
    let worker_slot = pid_slot.clone();
    let worker_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let worker_done_w = worker_done.clone();
    let (tx, rx) = std::sync::mpsc::channel();
    let handle = std::thread::spawn(move || {
        let result = exec_script_tracked(&machine, &script, Some(&worker_slot));
        worker_done_w.store(true, std::sync::atomic::Ordering::SeqCst);
        // The receiver may already be gone after a timeout; nothing to do then.
        let _ = tx.send(result);
    });
    match rx.recv_timeout(std::time::Duration::from_secs(secs)) {
        Ok(result) => result,
        Err(reason) => {
            // Kill the child's process group if a PID was recorded and the
            // worker has not finished. This is also done in the panic case,
            // to clean up a child the worker may have left running.
            kill_worker_child_group(&pid_slot, &worker_done);
            // NOTE(review): if no PID was recorded (or the group kill had no
            // effect), this join waits for the script to finish on its own.
            let _ = handle.join();
            match reason {
                RecvTimeoutError::Timeout => Err(format!(
                    "transport timeout: script on '{hostname}' exceeded {secs}s limit"
                )),
                // The sender is dropped without sending only when the worker
                // closure unwinds before its final `send`.
                RecvTimeoutError::Disconnected => Err(format!(
                    "transport worker for '{hostname}' panicked before returning a result"
                )),
            }
        }
    }
}
/// Kill the process group of the child recorded in `pid_slot`, unless the
/// worker has already reported completion via `worker_done`.
///
/// Returns `true` only when a kill was actually issued. If no PID was
/// recorded, or the slot's mutex is poisoned, nothing is done and `false` is
/// returned.
fn kill_worker_child_group(
    pid_slot: &ChildPidSlot,
    worker_done: &std::sync::atomic::AtomicBool,
) -> bool {
    use std::sync::atomic::Ordering;

    // Once the worker is done, its transport call has returned, so do not
    // signal a group whose leader has presumably been reaped already.
    if worker_done.load(Ordering::SeqCst) {
        return false;
    }
    let recorded_pid = match pid_slot.lock() {
        Ok(guard) => *guard,
        Err(_) => None,
    };
    let Some(pid) = recorded_pid else {
        return false;
    };
    kill_process_group(pid);
    true
}
/// Whether `machine` would be reached over SSH.
///
/// That is the case when it is neither a pepita nor a container transport
/// and its address does not name this host (see `is_local_addr`, which also
/// accepts "127.0.0.1" and "localhost").
pub fn is_ssh_transport(machine: &Machine) -> bool {
    let handled_elsewhere = machine.is_pepita_transport()
        || machine.is_container_transport()
        || is_local_addr(&machine.addr);
    !handled_elsewhere
}
/// Run `script` via `exec_script_timeout`, retrying transient SSH failures.
///
/// Retry rules:
/// - Only SSH machines (see `is_ssh_transport`) are retried. Every other
///   transport gets exactly one attempt.
/// - For SSH, `ssh_retries` is the total number of attempts, clamped to
///   `1..=4`.
/// - Before attempt `k` (counting from 0, with `k ≥ 1`), the thread sleeps
///   `200 << (k - 1)` ms and then logs a `[retry k/N]` line to stderr.
///
/// # Errors
///
/// An error is retried only while attempts remain and `is_transient_ssh_error`
/// accepts it. Any other error is returned unchanged.
pub fn exec_script_retry(
    machine: &Machine,
    script: &str,
    timeout_secs: Option<u64>,
    ssh_retries: u32,
) -> Result<ExecOutput, String> {
    let max_attempts = if is_ssh_transport(machine) {
        ssh_retries.clamp(1, 4)
    } else {
        1
    };
    let mut last_err = String::new();
    for attempt in 0..max_attempts {
        if attempt > 0 {
            let backoff_ms = 200u64 << (attempt - 1);
            std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
            eprintln!(
                " [retry {}/{}] retrying SSH to {} after {}ms backoff",
                attempt,
                max_attempts - 1,
                machine.addr,
                backoff_ms
            );
        }
        let err = match exec_script_timeout(machine, script, timeout_secs) {
            Ok(output) => return Ok(output),
            Err(err) => err,
        };
        let attempts_remain = attempt + 1 < max_attempts;
        if !attempts_remain || !is_transient_ssh_error(&err) {
            return Err(err);
        }
        last_err = err;
    }
    // Not reached in practice: max_attempts >= 1 and the final attempt
    // always returns above.
    Err(last_err)
}
/// Whether a transport error message looks like a transient SSH or network
/// failure that is worth retrying.
///
/// The check is a case-insensitive substring match against a fixed list of
/// markers.
fn is_transient_ssh_error(err: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 7] = [
        "connection refused",
        "connection reset",
        "connection timed out",
        "broken pipe",
        "no route to host",
        "transport timeout",
        "failed to spawn ssh",
    ];
    let message = err.to_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|&marker| message.contains(marker))
}
/// Run the command `cmd` on `machine` and return its captured output.
///
/// `cmd` goes through the same I8 validation as any script.
/// [`exec_script`] already runs that validation, so it is not repeated here.
/// Doing it twice used to strip and lint every query twice, with an
/// identical result.
///
/// # Errors
///
/// Same as [`exec_script`]: an `"I8 violation — …"` message if validation
/// fails, otherwise whatever the selected transport reports.
pub fn query(machine: &Machine, cmd: &str) -> Result<ExecOutput, String> {
    exec_script(machine, cmd)
}
/// Whether `addr` names the machine forjar is running on.
///
/// Matching rules:
/// - The literals `127.0.0.1` and `::1` match.
/// - `localhost` matches, compared ASCII case-insensitively.
/// - Otherwise `addr` is compared with the static hostname from
///   `/etc/hostname`.
///
/// Reading `/etc/hostname`:
/// - Per hostname(5), the file holds a single hostname and lines starting with
///   `#` are comments.
/// - So the first non-blank, non-comment line (trimmed) is used.
/// - Host names are compared ASCII case-insensitively, since DNS names are
///   case-insensitive.
/// - A missing or unreadable file, or one with no hostname line, never
///   matches. In particular, an empty `addr` is never treated as local.
fn is_local_addr(addr: &str) -> bool {
    if addr == "127.0.0.1" || addr == "::1" || addr.eq_ignore_ascii_case("localhost") {
        return true;
    }
    let Ok(contents) = std::fs::read_to_string("/etc/hostname") else {
        return false;
    };
    let static_hostname = contents
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty() && !line.starts_with('#'));
    matches!(static_hostname, Some(host) if host.eq_ignore_ascii_case(addr))
}