use std::{
process::{Command, ExitStatus},
time::Duration,
};
use anyhow::Context;
use crate::{
pipeline::{control::request, naming::matching::SourceNamePattern, MeasurementPipeline},
plugin::event::StartConsumerMeasurement,
resources::ResourceConsumer,
};
use super::{builder::ShutdownError, RunningAgent};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum WatchError {
#[error("failed to spawn process {0}")]
ProcessSpawn(String, #[source] std::io::Error),
#[error("failed to wait for pid {0}")]
ProcessWait(u32, #[source] std::io::Error),
#[error("error in shutdown")]
Shutdown(#[source] ShutdownError),
}
pub fn watch_process(
agent: RunningAgent,
program: String,
args: Vec<String>,
shutdown_timeout: Duration,
) -> Result<(), WatchError> {
if let Err(e) = trigger_measurement_now(&agent.pipeline) {
log::error!("Could not trigger a first measurement before the child spawn: {e}");
}
let exit_status = exec_child(program, args)?;
log::info!("Child process exited with status {exit_status}, Alumet will now stop.");
if let Err(e) = trigger_measurement_now(&agent.pipeline) {
log::error!("Could not trigger one last measurement after the child exit: {e}");
}
agent.pipeline.control_handle().shutdown();
agent.wait_for_shutdown(shutdown_timeout).map_err(WatchError::Shutdown)
}
/// Spawns `external_command` with `args` and blocks until the child exits.
///
/// Once the child is spawned, a [`StartConsumerMeasurement`] event carrying
/// the child's pid is published, then the function waits for the child.
///
/// # Errors
///
/// - [`WatchError::ProcessSpawn`] if the process could not be spawned.
/// - [`WatchError::ProcessWait`] if waiting for the spawned process failed.
fn exec_child(external_command: String, args: Vec<String>) -> Result<ExitStatus, WatchError> {
    // `Command::new` only needs a borrow of the program name, and the error arm
    // returns immediately, so the name can be moved there: no clone is needed.
    let mut child = match Command::new(&external_command).args(args).spawn() {
        Ok(child) => child,
        Err(e) => return Err(WatchError::ProcessSpawn(external_command, e)),
    };

    let pid = child.id();
    log::info!("Child process '{external_command}' spawned with pid {pid}.");

    // Publish the pid of the new process as a resource consumer to measure.
    crate::plugin::event::start_consumer_measurement()
        .publish(StartConsumerMeasurement(vec![ResourceConsumer::Process { pid }]));

    child.wait().map_err(|e| WatchError::ProcessWait(pid, e))
}
/// Timeout (one second) passed to `send_wait` by `trigger_measurement_now`
/// when it sends the "trigger now" request.
const TRIGGER_TIMEOUT: Duration = Duration::from_secs(1);
/// Sends a "trigger now" request matching every source (wildcard pattern) to
/// the pipeline and blocks the calling thread until the request completes.
///
/// The request is sent with `send_wait`, using [`TRIGGER_TIMEOUT`] as its
/// timeout, and is driven to completion on the pipeline's async runtime.
///
/// # Errors
///
/// Returns the error produced by the request, with the context
/// "failed to send TriggerMessage" attached.
fn trigger_measurement_now(pipeline: &MeasurementPipeline) -> anyhow::Result<()> {
    let handle = pipeline.control_handle();
    let trigger_all_sources = request::source(SourceNamePattern::wildcard()).trigger_now();
    let pending = handle.send_wait(trigger_all_sources, TRIGGER_TIMEOUT);

    let outcome = pipeline.async_runtime().block_on(pending);
    outcome.context("failed to send TriggerMessage")
}