// tcrm-task 0.4.2
//
// Task execution unit for the TCRM project
// Documentation
use std::time::Duration;

use tokio::{sync::mpsc, time::timeout};

use crate::tasks::config::TaskConfig;
use crate::tasks::event::TaskEventEnvelope;
use crate::tasks::{
    config::StreamSource,
    control::TaskStatusInfo,
    event::{TaskEvent, TaskStopReason},
    tokio::{
        coordinate::integration_tests::helper::{
            expected_completed_executor_state, expected_started_executor_state,
        },
        executor::TaskExecutor,
    },
};
/// Checks that a task printing `READY_INDICATOR` on stdout produces a `Ready`
/// event when the ready-indicator source is configured as `StreamSource::Stdout`.
///
/// Events are drained until a receive times out (5 s) or the channel yields
/// `None`. The test passes only if `Started`, `Output`, `Ready`, and `Stopped`
/// were all observed; an `Error` or `ProcessControl` event panics.
#[tokio::test]
async fn on_stdout() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    // Only the command line is platform specific; the indicator settings are shared.
    #[cfg(windows)]
    let config =
        TaskConfig::new("powershell").args(["-Command", "Write-Output 'READY_INDICATOR'"]);
    #[cfg(unix)]
    let config = TaskConfig::new("echo").args(["READY_INDICATOR"]);
    let config = config
        .ready_indicator("READY_INDICATOR".to_string())
        .ready_indicator_source(StreamSource::Stdout);

    #[cfg(feature = "process-group")]
    let config = config.use_process_group(false);

    let mut executor = TaskExecutor::new(config, event_tx);
    executor.coordinate_start().await.unwrap();

    // One flag per event kind that must be seen before the loop ends.
    let mut started = false;
    let mut output_received = false;
    let mut ready_event = false;
    let mut stopped = false;

    loop {
        let envelope = match timeout(Duration::from_secs(5), event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            // Receive timed out or the channel yielded `None`: stop draining.
            Ok(None) | Err(_) => break,
        };

        match envelope.event {
            TaskEvent::Started {
                process_id: pid,
                created_at: created,
                running_at: running,
            } => {
                // The event payload must agree with what the executor reports.
                expected_started_executor_state(&executor);
                assert_eq!(pid, executor.get_process_id().unwrap());
                assert_eq!(created, executor.get_create_at());
                assert_eq!(running, executor.get_running_at().unwrap());
                started = true;
            }
            TaskEvent::Output { line: text, src: origin } => {
                assert_eq!(text, "READY_INDICATOR");
                assert_eq!(origin, StreamSource::Stdout);
                output_received = true;
            }
            TaskEvent::Ready => {
                ready_event = true;
            }
            TaskEvent::Stopped {
                exit_code: code,
                reason: stop_reason,
                finished_at: finished,
                #[cfg(unix)]
                    signal: sig,
            } => {
                expected_completed_executor_state(&executor);
                assert_eq!(code, Some(0));
                assert_eq!(code, executor.get_exit_code());
                assert_eq!(finished, executor.get_finished_at().unwrap());
                assert_eq!(stop_reason, TaskStopReason::Finished);
                #[cfg(unix)]
                assert_eq!(sig, None);
                stopped = true;
            }
            TaskEvent::Error { error } => {
                panic!("Task encountered an error: {:?}", error);
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(started);
    assert!(output_received);
    assert!(ready_event);
    assert!(stopped);
}
/// Checks that a task writing `READY_INDICATOR` to stderr produces a `Ready`
/// event when the ready-indicator source is configured as `StreamSource::Stderr`.
///
/// Events are drained until a receive times out (5 s) or the channel yields
/// `None`. The test passes only if `Started`, `Output`, `Ready`, and `Stopped`
/// were all observed and at least one stderr output line contained the
/// indicator text; an `Error` or `ProcessControl` event panics.
///
/// The expected exit code is platform specific: this test expects `1` from the
/// PowerShell `Write-Error` command on Windows and `0` from `sh` on Unix.
#[tokio::test]
async fn on_stderr() {
    let (tx, mut rx) = mpsc::channel::<TaskEventEnvelope>(64);
    #[cfg(windows)]
    let config = TaskConfig::new("powershell")
        .args(["-Command", "Write-Error", "READY_INDICATOR"])
        .ready_indicator("READY_INDICATOR".to_string())
        .ready_indicator_source(StreamSource::Stderr);
    #[cfg(unix)]
    let config = TaskConfig::new("sh")
        .args(["-c", "echo READY_INDICATOR >&2"])
        .ready_indicator("READY_INDICATOR".to_string())
        .ready_indicator_source(StreamSource::Stderr);

    #[cfg(feature = "process-group")]
    let config = config.use_process_group(false);

    let mut executor = TaskExecutor::new(config, tx);
    executor.coordinate_start().await.unwrap();

    let mut started = false;
    let mut output_received = false;
    // Set once any stderr line carries the indicator text, so the test checks
    // the output content and not only its source stream.
    let mut indicator_line_seen = false;
    let mut ready_event = false;
    let mut stopped = false;

    while let Ok(Some(envelope)) = timeout(Duration::from_secs(5), rx.recv()).await {
        match envelope.event {
            TaskEvent::Started {
                process_id,
                created_at,
                running_at,
            } => {
                started = true;
                expected_started_executor_state(&executor);
                assert_eq!(process_id, executor.get_process_id().unwrap());
                assert_eq!(created_at, executor.get_create_at());
                assert_eq!(running_at, executor.get_running_at().unwrap());
            }
            TaskEvent::Output { line, src } => {
                output_received = true;
                // Printed for diagnostics; the test harness shows it on failure.
                println!("Output line: {}", line);
                assert_eq!(src, StreamSource::Stderr);
                // Substring match rather than equality: the Windows command may
                // wrap the message in a multi-line error record.
                if line.contains("READY_INDICATOR") {
                    indicator_line_seen = true;
                }
            }
            TaskEvent::Stopped {
                exit_code,
                reason,
                finished_at,
                #[cfg(unix)]
                signal,
            } => {
                expected_completed_executor_state(&executor);
                #[cfg(windows)]
                assert_eq!(exit_code, Some(1));
                #[cfg(unix)]
                assert_eq!(exit_code, Some(0));

                assert_eq!(exit_code, executor.get_exit_code());
                assert_eq!(finished_at, executor.get_finished_at().unwrap());
                assert_eq!(reason, TaskStopReason::Finished);
                #[cfg(unix)]
                assert_eq!(signal, None);
                stopped = true;
            }

            TaskEvent::Error { error } => {
                panic!("Task encountered an error: {:?}", error);
            }
            TaskEvent::Ready => {
                ready_event = true;
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(started);
    assert!(output_received);
    assert!(
        indicator_line_seen,
        "no stderr output line contained READY_INDICATOR"
    );
    assert!(ready_event);
    assert!(stopped);
}
/// Checks that no `Ready` event is emitted when the indicator text appears on
/// stdout while the ready-indicator source is configured as `StreamSource::Stderr`.
///
/// Events are drained until a receive times out (5 s) or the channel yields
/// `None`. The test passes only if `Started`, `Output`, and `Stopped` were all
/// observed; a `Ready`, `Error`, or `ProcessControl` event panics.
#[tokio::test]
async fn src_mismatch() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    // The command prints to stdout on both platforms; only the command line differs.
    #[cfg(windows)]
    let config =
        TaskConfig::new("powershell").args(["-Command", "Write-Output 'READY_INDICATOR'"]);
    #[cfg(unix)]
    let config = TaskConfig::new("echo").args(["READY_INDICATOR"]);
    // Deliberately watch stderr so the stdout line must not count as the indicator.
    let config = config
        .ready_indicator("READY_INDICATOR".to_string())
        .ready_indicator_source(StreamSource::Stderr);

    #[cfg(feature = "process-group")]
    let config = config.use_process_group(false);

    let mut executor = TaskExecutor::new(config, event_tx);
    executor.coordinate_start().await.unwrap();

    // One flag per event kind that must be seen before the loop ends.
    let mut started = false;
    let mut output_received = false;
    let mut stopped = false;

    loop {
        let envelope = match timeout(Duration::from_secs(5), event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            // Receive timed out or the channel yielded `None`: stop draining.
            Ok(None) | Err(_) => break,
        };

        match envelope.event {
            TaskEvent::Started {
                process_id: pid,
                created_at: created,
                running_at: running,
            } => {
                // The event payload must agree with what the executor reports.
                expected_started_executor_state(&executor);
                assert_eq!(pid, executor.get_process_id().unwrap());
                assert_eq!(created, executor.get_create_at());
                assert_eq!(running, executor.get_running_at().unwrap());
                started = true;
            }
            TaskEvent::Output { line: text, src: origin } => {
                assert_eq!(text, "READY_INDICATOR");
                assert_eq!(origin, StreamSource::Stdout);
                output_received = true;
            }
            TaskEvent::Ready => {
                panic!("Should not emit Ready event when source mismatches");
            }
            TaskEvent::Stopped {
                exit_code: code,
                reason: stop_reason,
                finished_at: finished,
                #[cfg(unix)]
                    signal: sig,
            } => {
                expected_completed_executor_state(&executor);
                assert_eq!(code, Some(0));
                assert_eq!(code, executor.get_exit_code());
                assert_eq!(finished, executor.get_finished_at().unwrap());
                assert_eq!(stop_reason, TaskStopReason::Finished);
                #[cfg(unix)]
                assert_eq!(sig, None);
                stopped = true;
            }
            TaskEvent::Error { error } => {
                panic!("Task encountered an error: {:?}", error);
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(started);
    assert!(output_received);
    assert!(stopped);
}