use std::collections::BTreeMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::process::Command as TokioCommand;
use crate::Notification;
use super::TriggerError;
use super::template::{CompiledTemplate, TemplateError, compile, template_error_to_trigger_error};
#[cfg(test)]
mod tests;
const RING_CAP: usize = 4096;
const POST_EXIT_DRAIN_CAP: Duration = Duration::from_secs(5);
#[derive(Clone)]
pub(super) struct CommandConfig {
pub(super) command_template: Result<CompiledTemplate, TemplateError>,
pub(super) env: BTreeMap<String, String>,
pub(super) working_dir: Option<PathBuf>,
}
impl std::fmt::Debug for CommandConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let CommandConfig {
command_template,
env,
working_dir,
} = self;
let template_state: &dyn std::fmt::Display = match command_template {
Ok(_) => &"<compiled-template-redacted>",
Err(_) => &"<bad-template-redacted>",
};
f.debug_struct("CommandConfig")
.field("command_template", &format_args!("{template_state}"))
.field("env_count", &env.len())
.field("working_dir", working_dir)
.finish()
}
}
#[allow(
clippy::too_many_lines,
reason = "function is intrinsically sequential (template render -> working_dir check -> spawn -> drain task spawn -> child wait -> post-exit drain join -> classify exit); extracting subphases adds parameter-passing complexity without improving readability"
)]
pub(super) async fn dispatch_command(
cfg: &CommandConfig,
timeout: Option<Duration>,
notification: &Notification,
) -> Result<(), TriggerError> {
let template = cfg
.command_template
.as_ref()
.map_err(|e| template_error_to_trigger_error(e.clone(), "command"))?;
let rendered_command = template
.render(notification)
.map_err(|e| template_error_to_trigger_error(e, "command"))?;
if let Some(path) = &cfg.working_dir {
if !tokio::fs::try_exists(path)
.await
.map_err(TriggerError::Io)?
{
return Err(TriggerError::Io(std::io::Error::other(format!(
"working_dir {} does not exist",
path.display()
))));
}
let meta = tokio::fs::metadata(path).await.map_err(TriggerError::Io)?;
if !meta.is_dir() {
return Err(TriggerError::Io(std::io::Error::other(format!(
"working_dir {} is not a directory",
path.display()
))));
}
}
let injected_env = env_injection(notification)?;
let mut cmd = TokioCommand::new("/bin/sh");
cmd.arg("-c")
.arg(&rendered_command)
.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(injected_env)
.envs(&cfg.env);
if let Some(path) = &cfg.working_dir {
cmd.current_dir(path);
}
let mut child = cmd.spawn().map_err(TriggerError::Io)?;
let stdout = child.stdout.take().ok_or_else(|| {
TriggerError::Io(std::io::Error::other(
"stdout pipe missing immediately after spawn; bug in tokio::process",
))
})?;
let stderr = child.stderr.take().ok_or_else(|| {
TriggerError::Io(std::io::Error::other(
"stderr pipe missing immediately after spawn; bug in tokio::process",
))
})?;
let stdout_task = tokio::spawn(drain_to_ring(stdout, RING_CAP, "stdout"));
let stderr_task = tokio::spawn(drain_to_ring(stderr, RING_CAP, "stderr"));
let stdout_abort = stdout_task.abort_handle();
let stderr_abort = stderr_task.abort_handle();
let exit_status = match timeout {
Some(t) => {
let sleep = tokio::time::sleep(t);
tokio::pin!(sleep);
tokio::select! {
status = child.wait() => status.map_err(TriggerError::Io)?,
() = &mut sleep => {
if let Err(e) = child.start_kill() {
tracing::warn!(
event.name = "client.trigger.command.kill_failed",
error = %e,
"failed to send SIGKILL to command child; may be already dead"
);
}
if let Err(e) = child.wait().await {
tracing::warn!(
event.name = "client.trigger.command.reap_failed",
error = %e,
"failed to reap killed command child"
);
}
stdout_abort.abort();
stderr_abort.abort();
return Err(TriggerError::Timeout(t));
}
}
}
None => child.wait().await.map_err(TriggerError::Io)?,
};
let drained = tokio::time::timeout(POST_EXIT_DRAIN_CAP, async {
let s_out = stdout_task.await;
let s_err = stderr_task.await;
(s_out, s_err)
})
.await;
let (stdout_tail, stderr_tail) = match drained {
Ok((s_out, s_err)) => {
let stdout = s_out.unwrap_or_else(|e| {
tracing::warn!(
event.name = "client.trigger.command.stdout_drain_join_failed",
error = %e,
"stdout drain task join failed; captured output unavailable"
);
Vec::new()
});
let stderr = s_err.unwrap_or_else(|e| {
tracing::warn!(
event.name = "client.trigger.command.stderr_drain_join_failed",
error = %e,
"stderr drain task join failed; captured output unavailable"
);
Vec::new()
});
(stdout, stderr)
}
Err(_elapsed) => {
tracing::warn!(
event.name = "client.trigger.command.post_exit_drain_capped",
cap_seconds = POST_EXIT_DRAIN_CAP.as_secs(),
"post-exit drain capped; backgrounded descendant likely holding pipes open"
);
stdout_abort.abort();
stderr_abort.abort();
(Vec::new(), Vec::new())
}
};
tracing::debug!(
event.name = "client.trigger.command.stdout_bytes",
captured_bytes = stdout_tail.len(),
"command stdout captured (content suppressed per no-payload-logging rule)"
);
drop(stdout_tail);
if exit_status.success() {
Ok(())
} else {
let stderr_tail_str = String::from_utf8_lossy(&stderr_tail).into_owned();
Err(TriggerError::Command {
exit_code: exit_status.code().unwrap_or(-1),
stderr_tail: stderr_tail_str,
})
}
}
fn env_injection(n: &Notification) -> Result<Vec<(String, String)>, TriggerError> {
let mut out: Vec<(String, String)> = Vec::new();
out.push(("AVISO_EVENT_TYPE".to_string(), n.event_type.clone()));
out.push(("AVISO_SEQUENCE".to_string(), n.sequence.to_string()));
for (k, v) in &n.identifier {
out.push((format!("AVISO_IDENTIFIER_{}", normalize_key(k)), v.clone()));
}
let payload_json = serde_json::to_string(&n.payload).map_err(TriggerError::Encode)?;
out.push(("AVISO_PAYLOAD_JSON".to_string(), payload_json));
let notification_json = serde_json::to_string(n).map_err(TriggerError::Encode)?;
out.push(("AVISO_NOTIFICATION_JSON".to_string(), notification_json));
Ok(out)
}
pub(super) fn normalize_key(k: &str) -> String {
k.chars()
.map(|c| {
if c.is_ascii_alphanumeric() {
c.to_ascii_uppercase()
} else {
'_'
}
})
.collect()
}
pub(super) async fn drain_to_ring<R>(
mut reader: R,
cap: usize,
stream_label: &'static str,
) -> Vec<u8>
where
R: AsyncRead + Unpin,
{
let mut ring: Vec<u8> = Vec::with_capacity(cap);
let mut buf = vec![0u8; 4096];
loop {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let slice = &buf[..n];
if ring.len() + n > cap {
if n >= cap {
ring.clear();
ring.extend_from_slice(&slice[n - cap..]);
continue;
}
let overflow = ring.len() + n - cap;
ring.drain(..overflow);
}
ring.extend_from_slice(slice);
}
Err(e) => {
tracing::debug!(
event.name = "client.trigger.command.pipe_read_error",
stream = stream_label,
error = %e,
"pipe read error; degrading to partial tail"
);
break;
}
}
}
ring
}
pub(super) fn build_command_config(cmd: impl Into<String>) -> CommandConfig {
let cmd_str = cmd.into();
let compiled = compile(&cmd_str);
CommandConfig {
command_template: compiled,
env: BTreeMap::new(),
working_dir: None,
}
}