pub mod http;
pub mod script;
pub mod shell;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use sqlx::SqlitePool;
use tokio::fs;
use crate::config::{ExecutorConfig, HttpMethod};
use crate::interpolation::interpolate_command;
use crate::models::ExecutionStatus;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScriptRuntime {
Direct,
JavaScript,
Python,
}
#[derive(Clone)]
pub enum ResolvedExecutor {
Shell {
command: String,
},
Script {
path: PathBuf,
runtime: ScriptRuntime,
},
Http {
method: HttpMethod,
url: String,
headers: HashMap<String, String>,
body: Option<String>,
follow_redirects: bool,
},
}
pub fn resolve_executor(config: &ExecutorConfig, payload_json: &str) -> ResolvedExecutor {
match config {
ExecutorConfig::Shell { command } => {
let interpolated = if let Ok(payload_value) =
serde_json::from_str::<serde_json::Value>(payload_json)
{
interpolate_command(command, &payload_value).into_owned()
} else {
command.clone()
};
ResolvedExecutor::Shell {
command: interpolated,
}
}
ExecutorConfig::Script { path } => ResolvedExecutor::Script {
path: PathBuf::from(path),
runtime: ScriptRuntime::Direct,
},
ExecutorConfig::JavaScript { path } => ResolvedExecutor::Script {
path: PathBuf::from(path),
runtime: ScriptRuntime::JavaScript,
},
ExecutorConfig::Python { path } => ResolvedExecutor::Script {
path: PathBuf::from(path),
runtime: ScriptRuntime::Python,
},
ExecutorConfig::Http {
method,
url,
headers,
body,
follow_redirects,
} => {
let payload_value: serde_json::Value = serde_json::from_str(payload_json)
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
let interpolated_url = interpolate_command(url, &payload_value).into_owned();
let interpolated_body = body
.as_deref()
.map(|b| interpolate_command(b, &payload_value).into_owned());
ResolvedExecutor::Http {
method: *method,
url: interpolated_url,
headers: headers.clone(),
body: interpolated_body,
follow_redirects: *follow_redirects,
}
}
}
}
#[derive(Clone)]
pub struct ExecutionContext {
pub execution_id: String,
pub hook_slug: String,
pub executor: ResolvedExecutor,
pub env: HashMap<String, String>,
pub cwd: Option<String>,
pub timeout: Duration,
pub logs_dir: String,
pub payload_json: String,
pub http_client: Option<reqwest::Client>,
}
pub struct ExecutionResult {
pub status: ExecutionStatus,
pub exit_code: Option<i32>,
pub log_dir: String,
}
pub(crate) async fn prepare_log_files(
logs_dir: &str,
execution_id: &str,
payload_json: &str,
) -> std::io::Result<(PathBuf, fs::File, fs::File)> {
let log_dir = Path::new(logs_dir).join(execution_id);
fs::create_dir_all(&log_dir).await?;
fs::write(log_dir.join("payload.json"), payload_json.as_bytes()).await?;
let stdout_file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.join("stdout.log"))
.await?;
let stderr_file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.join("stderr.log"))
.await?;
Ok((log_dir, stdout_file, stderr_file))
}
pub(crate) fn system_env_vars() -> HashMap<String, String> {
const INHERIT_VARS: &[&str] = &["PATH", "HOME", "USER", "LANG"];
let mut vars = HashMap::with_capacity(INHERIT_VARS.len());
for &name in INHERIT_VARS {
if let Ok(val) = std::env::var(name) {
vars.insert(name.into(), val);
}
}
vars
}
pub async fn run(pool: &SqlitePool, ctx: ExecutionContext) -> ExecutionResult {
match &ctx.executor {
ResolvedExecutor::Shell { command } => shell::run_shell(pool, &ctx, command).await,
ResolvedExecutor::Script { path, runtime } => {
script::run_script(pool, &ctx, path, *runtime).await
}
ResolvedExecutor::Http { .. } => {
let client = ctx.http_client.clone().unwrap_or_default();
http::run_http(pool, &ctx, &client).await
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::Db;
use crate::models::execution;
use sqlx::SqlitePool;
#[tokio::test]
async fn prepare_log_files_creates_directory_and_files() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let exec_id = "test-exec-001";
let (log_dir, _stdout, _stderr) = prepare_log_files(logs_dir, exec_id, "{}")
.await
.expect("prepare_log_files");
assert!(log_dir.exists());
assert!(log_dir.join("stdout.log").exists());
assert!(log_dir.join("stderr.log").exists());
}
#[test]
fn system_env_vars_includes_path() {
let vars = system_env_vars();
assert!(
vars.contains_key("PATH"),
"PATH should be present in system env vars"
);
}
#[test]
fn system_env_vars_excludes_arbitrary_vars() {
unsafe { std::env::set_var("SENDWORD_TEST_ARBITRARY_XYZ_999", "leaked") };
let vars = system_env_vars();
assert!(
!vars.contains_key("SENDWORD_TEST_ARBITRARY_XYZ_999"),
"arbitrary env vars should not be inherited"
);
unsafe { std::env::remove_var("SENDWORD_TEST_ARBITRARY_XYZ_999") };
}
#[test]
fn resolve_shell_interpolates_payload_fields() {
let resolved = resolve_executor(
&ExecutorConfig::Shell {
command: "deploy {{ action }}".into(),
},
r#"{"action":"prod"}"#,
);
let ResolvedExecutor::Shell { command } = resolved else {
panic!("expected shell executor");
};
assert_eq!(command, "deploy 'prod'");
}
#[test]
fn resolve_direct_script_runtime() {
let resolved = resolve_executor(
&ExecutorConfig::Script {
path: "data/scripts/deploy.sh".into(),
},
"{}",
);
let ResolvedExecutor::Script { path, runtime } = resolved else {
panic!("expected script executor");
};
assert_eq!(path, PathBuf::from("data/scripts/deploy.sh"));
assert_eq!(runtime, ScriptRuntime::Direct);
}
#[test]
fn resolve_javascript_script_runtime() {
let resolved = resolve_executor(
&ExecutorConfig::JavaScript {
path: "data/scripts/deploy.js".into(),
},
"{}",
);
let ResolvedExecutor::Script { path, runtime } = resolved else {
panic!("expected script executor");
};
assert_eq!(path, PathBuf::from("data/scripts/deploy.js"));
assert_eq!(runtime, ScriptRuntime::JavaScript);
}
#[test]
fn resolve_python_script_runtime() {
let resolved = resolve_executor(
&ExecutorConfig::Python {
path: "data/scripts/deploy.py".into(),
},
"{}",
);
let ResolvedExecutor::Script { path, runtime } = resolved else {
panic!("expected script executor");
};
assert_eq!(path, PathBuf::from("data/scripts/deploy.py"));
assert_eq!(runtime, ScriptRuntime::Python);
}
#[test]
fn resolve_http_interpolates_url_and_body() {
let mut headers = HashMap::new();
headers.insert("X-Test".into(), "static".into());
let resolved = resolve_executor(
&ExecutorConfig::Http {
method: HttpMethod::Post,
url: "https://example.test/{{ action }}".into(),
headers: headers.clone(),
body: Some(r#"{"action":"{{ action }}"}"#.into()),
follow_redirects: false,
},
r#"{"action":"deploy"}"#,
);
let ResolvedExecutor::Http {
method,
url,
headers: resolved_headers,
body,
follow_redirects,
} = resolved
else {
panic!("expected http executor");
};
assert_eq!(method, HttpMethod::Post);
assert_eq!(url, "https://example.test/'deploy'");
assert_eq!(resolved_headers, headers);
assert_eq!(body.as_deref(), Some(r#"{"action":"'deploy'"}"#));
assert!(!follow_redirects);
}
#[tokio::test]
async fn prepare_log_files_creates_nested_parents() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let nested = tmp.path().join("a").join("b").join("logs");
let logs_dir = nested.to_str().expect("utf-8 path");
let exec_id = "test-exec-002";
let (log_dir, _stdout, _stderr) = prepare_log_files(logs_dir, exec_id, "{}")
.await
.expect("prepare_log_files");
assert!(log_dir.exists());
assert!(log_dir.join("stdout.log").exists());
assert!(log_dir.join("stderr.log").exists());
}
async fn test_pool() -> SqlitePool {
let db = Db::new_in_memory().await.expect("in-memory db");
db.migrate().await.expect("migration");
db.pool().clone()
}
async fn setup_execution(pool: &SqlitePool, logs_dir: &str, command: &str) -> ExecutionContext {
let exec = execution::create(
pool,
&execution::NewExecution {
id: None,
hook_slug: "test-hook",
log_path: logs_dir,
trigger_source: "127.0.0.1",
request_payload: "{}",
retry_of: None,
status: None,
},
)
.await
.expect("create execution");
ExecutionContext {
execution_id: exec.id,
hook_slug: "test-hook".into(),
executor: ResolvedExecutor::Shell {
command: command.into(),
},
env: HashMap::new(),
cwd: None,
timeout: Duration::from_secs(10),
logs_dir: logs_dir.into(),
payload_json: "{}".into(),
http_client: None,
}
}
async fn read_log(logs_dir: &str, exec_id: &str, file: &str) -> String {
let path = Path::new(logs_dir).join(exec_id).join(file);
tokio::fs::read_to_string(path).await.unwrap_or_default()
}
#[tokio::test]
async fn successful_command_returns_success() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(&pool, logs_dir, "echo hello").await;
let exec_id = ctx.execution_id.clone();
let result = run(&pool, ctx).await;
assert_eq!(result.status, ExecutionStatus::Success);
assert_eq!(result.exit_code, Some(0));
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(stdout.trim(), "hello");
let stderr = read_log(logs_dir, &exec_id, "stderr.log").await;
assert!(stderr.is_empty());
let exec = execution::get_by_id(&pool, &exec_id).await.expect("get");
assert_eq!(exec.status, ExecutionStatus::Success);
assert!(exec.started_at.is_some());
assert!(exec.completed_at.is_some());
assert_eq!(exec.exit_code, Some(0));
}
#[tokio::test]
async fn failing_command_returns_failed_with_exit_code() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(&pool, logs_dir, "exit 42").await;
let exec_id = ctx.execution_id.clone();
let result = run(&pool, ctx).await;
assert_eq!(result.status, ExecutionStatus::Failed);
assert_eq!(result.exit_code, Some(42));
let exec = execution::get_by_id(&pool, &exec_id).await.expect("get");
assert_eq!(exec.status, ExecutionStatus::Failed);
assert_eq!(exec.exit_code, Some(42));
}
#[tokio::test]
async fn stderr_output_is_captured() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(&pool, logs_dir, "echo error >&2").await;
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stderr = read_log(logs_dir, &exec_id, "stderr.log").await;
assert_eq!(stderr.trim(), "error");
}
#[tokio::test]
async fn timeout_kills_long_running_command() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let mut ctx = setup_execution(&pool, logs_dir, "sleep 60").await;
ctx.timeout = Duration::from_secs(1);
let exec_id = ctx.execution_id.clone();
let start = std::time::Instant::now();
let result = run(&pool, ctx).await;
let elapsed = start.elapsed();
assert_eq!(result.status, ExecutionStatus::TimedOut);
assert!(result.exit_code.is_none());
assert!(
elapsed < Duration::from_secs(5),
"timeout test should complete quickly, took {elapsed:?}"
);
let exec = execution::get_by_id(&pool, &exec_id).await.expect("get");
assert_eq!(exec.status, ExecutionStatus::TimedOut);
}
#[tokio::test]
async fn hook_env_vars_are_passed_to_command() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let mut ctx = setup_execution(&pool, logs_dir, "echo $MY_VAR").await;
ctx.env.insert("MY_VAR".into(), "hello".into());
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(stdout.trim(), "hello");
}
#[tokio::test]
async fn sendword_env_vars_are_set() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(
&pool,
logs_dir,
"echo $SENDWORD_EXECUTION_ID $SENDWORD_HOOK_SLUG",
)
.await;
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
let parts: Vec<&str> = stdout.trim().split_whitespace().collect();
assert_eq!(parts.len(), 2);
assert_eq!(parts[0], exec_id);
assert_eq!(parts[1], "test-hook");
}
#[tokio::test]
async fn working_directory_is_respected() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let work_dir = tempfile::TempDir::new().expect("work dir");
let work_path = work_dir.path().canonicalize().expect("canonical path");
let mut ctx = setup_execution(&pool, logs_dir, "pwd").await;
ctx.cwd = Some(work_path.to_str().expect("utf-8").into());
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(stdout.trim(), work_path.to_str().expect("utf-8"));
}
#[tokio::test]
async fn invalid_cwd_results_in_failed() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let mut ctx = setup_execution(&pool, logs_dir, "echo should-not-run").await;
ctx.cwd = Some("/nonexistent/path/that/does/not/exist".into());
let exec_id = ctx.execution_id.clone();
let result = run(&pool, ctx).await;
assert_eq!(result.status, ExecutionStatus::Failed);
let stderr = read_log(logs_dir, &exec_id, "stderr.log").await;
assert!(
!stderr.is_empty(),
"stderr.log should contain spawn error message"
);
}
#[tokio::test]
async fn environment_is_clean_no_inherited_server_vars() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
unsafe { std::env::set_var("SENDWORD_TEST_UNIQUE_VAR_12345", "leaked") };
let ctx = setup_execution(
&pool,
logs_dir,
"echo ${SENDWORD_TEST_UNIQUE_VAR_12345:-clean}",
)
.await;
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
unsafe { std::env::remove_var("SENDWORD_TEST_UNIQUE_VAR_12345") };
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(
stdout.trim(),
"clean",
"server env vars should not leak to child process"
);
}
#[tokio::test]
async fn prepare_log_files_creates_payload_json() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let exec_id = "test-exec-payload";
let payload = r#"{"test":true}"#;
let (log_dir, _stdout, _stderr) = prepare_log_files(logs_dir, exec_id, payload)
.await
.expect("prepare_log_files");
assert!(log_dir.join("payload.json").exists());
let contents = tokio::fs::read_to_string(log_dir.join("payload.json"))
.await
.expect("read payload.json");
assert_eq!(contents, payload);
}
#[tokio::test]
async fn payload_json_file_is_written_to_log_dir() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let mut ctx = setup_execution(&pool, logs_dir, "echo ok").await;
ctx.payload_json = r#"{"action":"deploy","count":3}"#.into();
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let payload_path = Path::new(logs_dir).join(&exec_id).join("payload.json");
let contents = tokio::fs::read_to_string(&payload_path)
.await
.expect("payload.json should exist");
assert_eq!(contents, r#"{"action":"deploy","count":3}"#);
}
#[tokio::test]
async fn sendword_payload_env_var_is_set() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let mut ctx = setup_execution(&pool, logs_dir, "echo $SENDWORD_PAYLOAD").await;
ctx.payload_json = r#"{"key":"value"}"#.into();
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(stdout.trim(), r#"{"key":"value"}"#);
}
#[tokio::test]
async fn empty_payload_json_written_to_log_dir() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(&pool, logs_dir, "echo ok").await;
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let payload_path = Path::new(logs_dir).join(&exec_id).join("payload.json");
let contents = tokio::fs::read_to_string(&payload_path)
.await
.expect("payload.json should exist even for empty payload");
assert_eq!(contents, "{}");
}
#[tokio::test]
async fn sendword_payload_env_var_set_for_empty_payload() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let pool = test_pool().await;
let ctx = setup_execution(&pool, logs_dir, "echo $SENDWORD_PAYLOAD").await;
let exec_id = ctx.execution_id.clone();
let _result = run(&pool, ctx).await;
let stdout = read_log(logs_dir, &exec_id, "stdout.log").await;
assert_eq!(stdout.trim(), "{}");
}
#[tokio::test]
async fn payload_json_overwritten_not_appended_on_retry() {
let tmp = tempfile::TempDir::new().expect("temp dir");
let logs_dir = tmp.path().to_str().expect("utf-8 path");
let exec_id = "test-exec-overwrite";
let payload_v1 = r#"{"version":1}"#;
let payload_v2 = r#"{"version":2}"#;
let _ = prepare_log_files(logs_dir, exec_id, payload_v1)
.await
.expect("first prepare");
let (log_dir, _, _) = prepare_log_files(logs_dir, exec_id, payload_v2)
.await
.expect("second prepare");
let contents = tokio::fs::read_to_string(log_dir.join("payload.json"))
.await
.expect("read payload.json");
assert_eq!(
contents, payload_v2,
"payload.json should be overwritten, not appended"
);
}
}