tcrm-task 0.4.2

Task execution unit for TCRM project
Documentation
use core::panic;
use std::time::Duration;

use tokio::{sync::mpsc, time::timeout};

use crate::tasks::{
    config::TaskConfig,
    control::TaskStatusInfo,
    error::TaskError,
    event::{TaskEvent, TaskEventEnvelope, TaskStopReason},
    tokio::{
        coordinate::integration_tests::helper::expected_stopped_executor_state,
        executor::TaskExecutor,
    },
};

/// An empty command string must be rejected as an invalid configuration.
///
/// The start call is expected to return `TaskError::InvalidConfiguration`, and the
/// event channel must deliver both an `Error` and a `Stopped` event carrying that
/// error. Any `Started`, `Output`, `Ready` or `ProcessControl` event fails the test.
#[tokio::test]
async fn empty_command() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    let task_config = TaskConfig::new("");

    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    let start_result = executor.coordinate_start().await;
    assert!(matches!(
        start_result,
        Err(TaskError::InvalidConfiguration(_))
    ));

    // Each receive waits at most this long before the drain loop gives up.
    let recv_window = Duration::from_secs(5);
    let mut saw_error = false;
    let mut saw_stopped = false;

    loop {
        // Leave the loop when the channel closes or the receive times out.
        let envelope = match timeout(recv_window, event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            // Expected events.
            TaskEvent::Error { error } => {
                saw_error = true;
                assert!(matches!(error, TaskError::InvalidConfiguration(_)));
            }
            TaskEvent::Stopped {
                exit_code,
                reason,
                finished_at,
                #[cfg(unix)]
                signal,
            } => {
                saw_stopped = true;
                expected_stopped_executor_state(&executor);
                // The event payload has to agree with what the executor reports.
                assert_eq!(exit_code, None);
                assert_eq!(exit_code, executor.get_exit_code());
                assert_eq!(finished_at, executor.get_finished_at().unwrap());
                assert!(matches!(
                    reason,
                    TaskStopReason::Error(TaskError::InvalidConfiguration(_))
                ));
                #[cfg(unix)]
                assert_eq!(signal, None);
            }

            // Everything below must never be emitted for a rejected configuration.
            TaskEvent::Started {
                process_id,
                created_at,
                running_at,
            } => {
                panic!(
                    "Unexpected start event: pid {}, created at {:?}, running at {:?}",
                    process_id, created_at, running_at
                );
            }
            TaskEvent::Output { line, src } => {
                panic!("Unexpected output: {} from {:?}", line, src);
            }
            TaskEvent::Ready => {
                panic!("Unexpected Ready event");
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(saw_stopped);
    assert!(saw_error);
}

/// Starting a command that does not exist must surface an I/O error.
///
/// The start call is expected to return `TaskError::IO`, and the event channel must
/// deliver both an `Error` and a `Stopped` event carrying that error. Any `Started`,
/// `Output`, `Ready` or `ProcessControl` event fails the test.
#[tokio::test]
async fn command_not_found() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    let task_config = TaskConfig::new("non_existent_command");

    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    let start_result = executor.coordinate_start().await;
    assert!(matches!(start_result, Err(TaskError::IO(_))));

    // Each receive waits at most this long before the drain loop gives up.
    let recv_window = Duration::from_secs(5);
    let mut saw_error = false;
    let mut saw_stopped = false;

    loop {
        // Leave the loop when the channel closes or the receive times out.
        let envelope = match timeout(recv_window, event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            // Expected events.
            TaskEvent::Error { error } => {
                saw_error = true;
                assert!(matches!(error, TaskError::IO(_)));
            }
            TaskEvent::Stopped {
                exit_code,
                reason,
                finished_at,
                #[cfg(unix)]
                signal,
            } => {
                saw_stopped = true;
                expected_stopped_executor_state(&executor);
                // The event payload has to agree with what the executor reports.
                assert_eq!(exit_code, None);
                assert_eq!(exit_code, executor.get_exit_code());
                assert_eq!(finished_at, executor.get_finished_at().unwrap());
                assert!(matches!(reason, TaskStopReason::Error(TaskError::IO(_))));
                #[cfg(unix)]
                assert_eq!(signal, None);
            }

            // Everything below must never be emitted when the spawn fails.
            TaskEvent::Started {
                process_id,
                created_at,
                running_at,
            } => {
                panic!(
                    "Unexpected start event: pid {}, created at {:?}, running at {:?}",
                    process_id, created_at, running_at
                );
            }
            TaskEvent::Output { line, src } => {
                panic!("Unexpected output: {} from {:?}", line, src);
            }
            TaskEvent::Ready => {
                panic!("Unexpected Ready event");
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(saw_stopped);
    assert!(saw_error);
}

/// A valid command pointed at a missing working directory must surface an I/O error.
///
/// The start call is expected to return `TaskError::IO`, and the event channel must
/// deliver both an `Error` and a `Stopped` event carrying that error. Any `Started`,
/// `Output`, `Ready` or `ProcessControl` event fails the test.
#[tokio::test]
async fn not_exist_dir() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    // Same bogus directory on every platform; only the echo command differs.
    let missing_dir = "/non/existent/directory";

    #[cfg(windows)]
    let task_config = TaskConfig::new("cmd")
        .args(["/C", "echo test"])
        .working_dir(missing_dir);
    #[cfg(unix)]
    let task_config = TaskConfig::new("echo")
        .args(["test"])
        .working_dir(missing_dir);

    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    let start_result = executor.coordinate_start().await;
    assert!(matches!(start_result, Err(TaskError::IO(_))));

    // Each receive waits at most this long before the drain loop gives up.
    let recv_window = Duration::from_secs(5);
    let mut saw_error = false;
    let mut saw_stopped = false;

    loop {
        // Leave the loop when the channel closes or the receive times out.
        let envelope = match timeout(recv_window, event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            // Expected events.
            TaskEvent::Error { error } => {
                saw_error = true;
                assert!(matches!(error, TaskError::IO(_)));
            }
            TaskEvent::Stopped {
                exit_code,
                reason,
                finished_at,
                #[cfg(unix)]
                signal,
            } => {
                saw_stopped = true;
                expected_stopped_executor_state(&executor);
                // The event payload has to agree with what the executor reports.
                assert_eq!(exit_code, None);
                assert_eq!(exit_code, executor.get_exit_code());
                assert_eq!(finished_at, executor.get_finished_at().unwrap());
                assert!(matches!(reason, TaskStopReason::Error(TaskError::IO(_))));
                #[cfg(unix)]
                assert_eq!(signal, None);
            }

            // Everything below must never be emitted when the spawn fails.
            TaskEvent::Started {
                process_id,
                created_at,
                running_at,
            } => {
                panic!(
                    "Unexpected start event: pid {}, created at {:?}, running at {:?}",
                    process_id, created_at, running_at
                );
            }
            TaskEvent::Output { line, src } => {
                panic!("Unexpected output: {} from {:?}", line, src);
            }
            TaskEvent::Ready => {
                panic!("Unexpected Ready event");
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(saw_stopped);
    assert!(saw_error);
}

/// A timeout of zero milliseconds must be rejected as an invalid configuration.
///
/// The start call is expected to return `TaskError::InvalidConfiguration`, and the
/// event channel must deliver both an `Error` and a `Stopped` event carrying that
/// error. Any `Started`, `Output`, `Ready` or `ProcessControl` event fails the test.
#[tokio::test]
async fn zero_timeout() {
    let (event_tx, mut event_rx) = mpsc::channel::<TaskEventEnvelope>(64);

    // Platform-specific one-second sleep command.
    #[cfg(windows)]
    let task_config = TaskConfig::new("powershell").args(["-Command", "Start-Sleep -Seconds 1"]);
    #[cfg(unix)]
    let task_config = TaskConfig::new("sleep").args(["1"]);

    let task_config = task_config.timeout_ms(0);
    #[cfg(feature = "process-group")]
    let task_config = task_config.use_process_group(false);

    let mut executor = TaskExecutor::new(task_config, event_tx);
    let start_result = executor.coordinate_start().await;
    assert!(matches!(
        start_result,
        Err(TaskError::InvalidConfiguration(_))
    ));

    // Each receive waits at most this long before the drain loop gives up.
    let recv_window = Duration::from_secs(5);
    let mut saw_error = false;
    let mut saw_stopped = false;

    loop {
        // Leave the loop when the channel closes or the receive times out.
        let envelope = match timeout(recv_window, event_rx.recv()).await {
            Ok(Some(envelope)) => envelope,
            _ => break,
        };

        match envelope.event {
            // Expected events.
            TaskEvent::Error { error } => {
                saw_error = true;
                assert!(matches!(error, TaskError::InvalidConfiguration(_)));
            }
            TaskEvent::Stopped {
                exit_code,
                reason,
                finished_at,
                #[cfg(unix)]
                signal,
            } => {
                saw_stopped = true;
                expected_stopped_executor_state(&executor);
                // The event payload has to agree with what the executor reports.
                assert_eq!(exit_code, None);
                assert_eq!(exit_code, executor.get_exit_code());
                assert_eq!(finished_at, executor.get_finished_at().unwrap());
                assert!(matches!(
                    reason,
                    TaskStopReason::Error(TaskError::InvalidConfiguration(_))
                ));
                #[cfg(unix)]
                assert_eq!(signal, None);
            }

            // Everything below must never be emitted for a rejected configuration.
            TaskEvent::Started {
                process_id,
                created_at,
                running_at,
            } => {
                panic!(
                    "Unexpected start event: pid {}, created at {:?}, running at {:?}",
                    process_id, created_at, running_at
                );
            }
            TaskEvent::Output { line, src } => {
                panic!("Unexpected output: {} from {:?}", line, src);
            }
            TaskEvent::Ready => {
                panic!("Unexpected Ready event");
            }
            TaskEvent::ProcessControl { action } => {
                panic!("Unexpected ProcessControl event: {:?}", action);
            }
        }
    }

    assert!(saw_stopped);
    assert!(saw_error);
}