proc-heim 0.1.5

Library for running and managing short-lived and long-lived processes using an asynchronous API
Documentation
use std::time::Duration;

use crate::common::create_process_manager;

use futures::{FutureExt, StreamExt, TryStreamExt};
use proc_heim::{
    manager::{GetProcessInfoError, LogsQuery, TryMessageStreamExt},
    model::script::ScriptingLanguage,
};
use test_utils::{
    cmd_collection::{hanging_forever_cmd, std_io::echo_cmd},
    scripts_collection::*,
};

mod common;
mod test_cases;

#[cfg(feature = "json")]
use crate::test_cases::ExampleMessage;
#[cfg(feature = "json")]
use proc_heim::model::command::CmdOptions;

/// Exercises the process handle end to end: spawns the Bash echo script,
/// sends it a single message, reads the message back from the message
/// stream, checks the captured stdout/stderr logs and finally waits for a
/// successful exit.
#[tokio::test]
async fn test_process_handle_wrapper() {
    let (_dir, manager_handle) = create_process_manager();

    let arg: &str = "example argument with spaces";
    let args = vec![arg.to_owned()];
    let message_to_send = "Test message";

    let script = build_echo_script(ScriptingLanguage::Bash, BASH_ECHO_SCRIPT, &args);

    // The script is not needed afterwards, so it is moved instead of cloned.
    let handle = manager_handle.spawn_with_handle(script).await.unwrap();

    handle.send_message(message_to_send).await.unwrap();

    let mut stream = handle
        .subscribe_message_stream()
        .await
        .unwrap()
        .into_string_stream();
    let message = stream.try_next().await.unwrap().unwrap();
    assert_eq!(message_to_send, message);
    // No further message must be immediately available on the stream.
    assert!(stream.next().now_or_never().is_none());

    let stdout = handle
        .get_logs_stdout(LogsQuery::fetch_all())
        .await
        .unwrap();

    // Expect exactly two stdout lines: the argument line, then the received message.
    assert_eq!(2, stdout.len());
    assert_eq!(format!("First Argument: {arg}"), *stdout.first().unwrap());
    assert_eq!(
        format!("Received: {message_to_send}"),
        *stdout.get(1).unwrap()
    );

    let errors = handle
        .get_logs_stderr(LogsQuery::fetch_all())
        .await
        .unwrap();

    // Expect exactly one stderr line carrying the same message.
    assert_eq!(1, errors.len());
    assert_eq!(
        format!("Error: {message_to_send}"),
        *errors.first().unwrap()
    );

    // Two result layers are unwrapped before the process data is available.
    let process_data = handle
        .wait(Duration::from_millis(500))
        .await
        .unwrap()
        .unwrap();
    assert!(process_data.exit_status().unwrap().success());
}

/// Killing a running process through its handle must succeed, remove the
/// process directory and make later info lookups fail with
/// `GetProcessInfoError::ProcessNotFound`.
#[tokio::test]
async fn should_kill_process() {
    let (dir, manager_handle) = create_process_manager();

    let script_args = ["arg".to_owned()];
    let handle = manager_handle
        .spawn_with_handle(build_echo_script(
            ScriptingLanguage::Bash,
            BASH_ECHO_SCRIPT,
            &script_args,
        ))
        .await
        .unwrap();

    // Presumably gives the spawned script time to start before it is inspected.
    tokio::time::sleep(Duration::from_secs(1)).await;
    assert!(handle.get_process_info().await.unwrap().is_running());

    // The process directory is named after the process id inside the manager directory.
    let process_dir = dir.path().join(handle.id().to_string());
    assert!(process_dir.exists());

    let kill_result = handle.kill().await;
    assert!(kill_result.is_ok());
    assert!(!process_dir.exists());

    // After the kill, the process info can no longer be fetched.
    let lookup = handle.get_process_info().await;
    let is_not_found = matches!(lookup, Err(GetProcessInfoError::ProcessNotFound(_)));
    assert!(is_not_found);
}

/// Sends a JSON-serialized `ExampleMessage` to a script that reads one line
/// from stdin and echoes it, then checks that the message deserialized from
/// the message stream equals the one that was sent.
#[cfg(feature = "json")]
#[tokio::test]
async fn should_write_and_read_json() {
    use proc_heim::manager::serde::MessageFormat;
    use proc_heim::manager::{Message, ResultStreamExt};
    use proc_heim::model::script::Script;

    let (_dir, manager_handle) = create_process_manager();

    // The script reads a single line from stdin and prints it back unchanged.
    let handle = manager_handle
        .spawn_with_handle(Script::with_options(
            ScriptingLanguage::Bash,
            r#"
            read msg < /dev/stdin
            echo "$msg"
            "#,
            CmdOptions::with_standard_io_messaging(),
        ))
        .await
        .unwrap();

    let expected = ExampleMessage {
        data1: "Some message".into(),
        ..Default::default()
    };

    let outgoing = Message::from_serializable(&expected, &MessageFormat::Json).unwrap();
    handle.send_message(outgoing).await.unwrap();

    // Decode incoming messages as JSON; `filter_ok` is applied to the decoded stream.
    let mut replies = handle
        .subscribe_message_stream()
        .await
        .unwrap()
        .into_deserialized_stream::<ExampleMessage>(&MessageFormat::Json)
        .filter_ok();

    let received = replies.next().await.unwrap();
    assert_eq!(received, expected);
}

/// Waiting on the handle of an echo command must yield process data that
/// carries an exit status.
#[tokio::test]
async fn should_wait_for_process_completion() {
    let (_dir, manager_handle) = create_process_manager();

    let command = echo_cmd("some message");
    let handle = manager_handle.spawn_with_handle(command).await.unwrap();

    // Awaiting the wait call yields two nested results; both must be successful.
    let waited = handle.wait(Duration::from_micros(100)).await;
    let finished = waited.unwrap().unwrap();

    assert!(finished.exit_status().is_some());
}

/// Dropping a process handle must leave the underlying process running:
/// one second after the drop the manager still reports it as running.
#[tokio::test]
async fn should_not_kill_process_on_drop() {
    let (_dir, manager_handle) = create_process_manager();

    // The handle only lives inside this scope; it is dropped at the closing
    // brace, after its process id has been copied out.
    let process_id = {
        let handle = manager_handle
            .spawn_with_handle(hanging_forever_cmd())
            .await
            .unwrap();
        *handle.id()
    };

    tokio::time::sleep(Duration::from_secs(1)).await;

    // Query the manager directly, since the handle no longer exists.
    let still_running = manager_handle
        .get_process_info(process_id)
        .await
        .unwrap()
        .is_running();
    assert!(still_running);
}