use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};
use crate::tasks::config::TaskConfig;
use crate::tasks::event::TaskEventEnvelope;
use crate::tasks::{
control::TaskControl,
event::{TaskEvent, TaskStopReason, TaskTerminateReason},
tokio::executor::TaskExecutor,
};
/// Sends `SIGUSR1` to a running `bash` task and checks the full lifecycle:
/// started -> ready -> signal trapped (seen via output) -> terminated -> stopped.
///
/// The script installs its `SIGUSR1` trap before printing `Ready`, so the
/// signal sent in response to the `Ready` event is handled by the trap.
#[tokio::test]
async fn send_signal_to_process() {
    use nix::sys::signal::Signal;

    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);
    let task_config = TaskConfig::new("bash")
        .args([
            "-c",
            "trap 'echo Signal received' SIGUSR1; echo Ready; while true; do sleep 0.1; done",
        ])
        .ready_indicator("Ready");
    // Only the single process is targeted here; the process-group variant
    // has its own dedicated test.
    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    executor.coordinate_start().await.unwrap();

    // Lifecycle milestones observed on the event channel.
    let mut saw_started = false;
    let mut saw_ready = false;
    let mut saw_signal_output = false;
    let mut saw_stopped = false;
    let mut observed_pid = None;

    loop {
        // The 5 second limit applies to each individual receive, not to the
        // whole loop. A timeout or a closed channel ends the loop and the
        // assertions below report which milestone was missed.
        let envelope = match timeout(Duration::from_secs(5), event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            TaskEvent::Started {
                process_id: pid, ..
            } => {
                saw_started = true;
                observed_pid = Some(pid);
            }
            TaskEvent::Ready { .. } => {
                saw_ready = true;
                executor.send_signal(Signal::SIGUSR1).unwrap();
            }
            // The trap handler echoes this line, proving the signal arrived.
            TaskEvent::Output { line, .. } if line.contains("Signal received") => {
                saw_signal_output = true;
                executor
                    .terminate_task(TaskTerminateReason::UserRequested)
                    .unwrap();
            }
            TaskEvent::Stopped { reason, .. } => {
                saw_stopped = true;
                assert!(matches!(reason, TaskStopReason::Terminated(_)));
                break;
            }
            _ => {}
        }
    }

    assert!(saw_started, "Task should have started");
    assert!(saw_ready, "Task should have been ready");
    assert!(saw_signal_output, "Signal should have been handled");
    assert!(saw_stopped, "Task should have stopped");
    assert!(observed_pid.is_some(), "Should have process ID");
}
/// With process groups enabled, a single `send_signal` call must reach both
/// the parent shell and the child shell it spawned in the background.
///
/// Each shell traps `SIGUSR1` and prints a distinct line; the test waits for
/// both lines before terminating the task and expecting a `Terminated` stop.
#[cfg(feature = "process-group")]
#[tokio::test]
async fn send_signal_to_process_group() {
    use nix::sys::signal::Signal;

    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);
    let task_config = TaskConfig::new("bash")
        .args(["-c", "trap 'echo Parent received signal' SIGUSR1; echo Ready; bash -c 'trap \"echo Child received signal >&2\" SIGUSR1; while true; do sleep 0.1; done' & while true; do sleep 0.1; done"])
        .ready_indicator("Ready")
        .use_process_group(true);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    executor.coordinate_start().await.unwrap();

    // Milestones observed on the event channel.
    let mut saw_started = false;
    let mut saw_ready = false;
    let mut parent_got_signal = false;
    let mut child_got_signal = false;
    let mut saw_stopped = false;

    loop {
        // Per-receive limit of 6 seconds; a timeout or closed channel ends
        // the loop and the assertions below report what was missed.
        let envelope = match timeout(Duration::from_secs(6), event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            TaskEvent::Started { .. } => saw_started = true,
            TaskEvent::Ready { .. } => {
                saw_ready = true;
                // NOTE(review): the pause presumably gives the backgrounded
                // child shell time to install its trap before the signal is
                // delivered - confirm.
                tokio::time::sleep(Duration::from_millis(200)).await;
                executor.send_signal(Signal::SIGUSR1).unwrap();
            }
            TaskEvent::Output { line, .. } => {
                parent_got_signal |= line.contains("Parent received signal");
                child_got_signal |= line.contains("Child received signal");
                // Only shut the task down once both handlers have reported.
                if parent_got_signal && child_got_signal {
                    executor
                        .terminate_task(TaskTerminateReason::UserRequested)
                        .unwrap();
                }
            }
            TaskEvent::Stopped { reason, .. } => {
                saw_stopped = true;
                assert!(matches!(reason, TaskStopReason::Terminated(_)));
                break;
            }
            _ => {}
        }
    }

    assert!(saw_started, "Task should have started");
    assert!(saw_ready, "Task should have been ready");
    assert!(parent_got_signal, "Parent should have received signal");
    assert!(
        child_got_signal,
        "Child process should have received signal in process group"
    );
    assert!(saw_stopped, "Task should have stopped");
}
/// Signalling a task whose process has already exited must return an error.
///
/// Runs a short-lived `echo hello`, waits for its `Stopped` event, and then
/// checks that `send_signal` reports failure.
#[tokio::test]
async fn send_signal_to_finished_process() {
    use nix::sys::signal::Signal;

    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);
    let task_config = TaskConfig::new("echo").args(["hello"]);
    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    executor.coordinate_start().await.unwrap();

    // Drain events until the task reports that it stopped. Each receive is
    // limited to 5 seconds; a timeout or closed channel ends the loop.
    let mut saw_stopped = false;
    while let Ok(Some(envelope)) = timeout(Duration::from_secs(5), event_rx.recv()).await {
        if matches!(envelope.event, TaskEvent::Stopped { .. }) {
            saw_stopped = true;
            break;
        }
    }
    assert!(saw_stopped, "Task should have stopped");

    // The process is gone, so delivering a signal has to fail.
    assert!(
        executor.send_signal(Signal::SIGUSR1).is_err(),
        "Should fail to send signal to finished process"
    );
}
/// Sends `SIGUSR1`, `SIGUSR2` and `SIGHUP` to the same task one after another
/// and checks that each one is trapped, then terminates the task.
///
/// The signals are chained: the next signal is only sent once the output line
/// produced by the previous signal's trap has been observed. After the final
/// trap fires the task is terminated, and the test requires a `Stopped` event
/// so that a hang during termination is reported instead of being hidden by
/// the receive timeout.
#[tokio::test]
async fn send_multiple_signals() {
    use nix::sys::signal::Signal;

    let (tx, mut rx) = mpsc::channel::<TaskEventEnvelope>(64);
    let config = TaskConfig::new("bash")
        .args(["-c", "trap 'echo SIGUSR1 >&2' SIGUSR1; trap 'echo SIGUSR2 >&2' SIGUSR2; trap 'echo SIGHUP >&2' SIGHUP; echo Ready; while true; do sleep 0.1; done"])
        .ready_indicator("Ready");
    #[cfg(feature = "process-group")]
    let config = config.use_process_group(false);

    let mut executor = TaskExecutor::new(config, tx);
    executor.coordinate_start().await.unwrap();

    let mut ready = false;
    let mut sigusr1_received = false;
    let mut sigusr2_received = false;
    let mut sighup_received = false;
    let mut stopped = false;
    let mut signals_sent = 0;

    // The 5 second limit applies to each individual receive. A timeout or a
    // closed channel ends the loop, and the assertions below then report
    // which step of the chain was never reached.
    while let Ok(Some(envelope)) = timeout(Duration::from_secs(5), rx.recv()).await {
        match envelope.event {
            TaskEvent::Ready { .. } => {
                ready = true;
                // Traps are installed before `Ready` is printed, so the first
                // signal can be sent immediately.
                executor.send_signal(Signal::SIGUSR1).unwrap();
                signals_sent += 1;
            }
            TaskEvent::Output { line, .. } => {
                // Each trap echoes its own signal name; seeing it triggers
                // the next step in the chain.
                if line.contains("SIGUSR1") {
                    sigusr1_received = true;
                    executor.send_signal(Signal::SIGUSR2).unwrap();
                    signals_sent += 1;
                } else if line.contains("SIGUSR2") {
                    sigusr2_received = true;
                    executor.send_signal(Signal::SIGHUP).unwrap();
                    signals_sent += 1;
                } else if line.contains("SIGHUP") {
                    sighup_received = true;
                    executor
                        .terminate_task(TaskTerminateReason::UserRequested)
                        .unwrap();
                }
            }
            TaskEvent::Stopped { .. } => {
                stopped = true;
                break;
            }
            _ => {}
        }
    }

    assert!(ready, "Task should have been ready");
    assert_eq!(signals_sent, 3, "Should have sent 3 signals");
    assert!(sigusr1_received, "SIGUSR1 should have been received");
    assert!(sigusr2_received, "SIGUSR2 should have been received");
    assert!(sighup_received, "SIGHUP should have been received");
    // Without this check a task that never stops after `terminate_task`
    // would go unnoticed, because the loop would simply exit on timeout.
    assert!(stopped, "Task should have stopped");
}