ProcessManagerHandle

Struct ProcessManagerHandle 

pub struct ProcessManagerHandle { /* private fields */ }

Type used for communicating with the ProcessManager task.

It provides an asynchronous API for:

  • spawning new child processes from any type that implements the Runnable trait,
  • sending messages to spawned processes as:
    • raw bytes,
    • strings,
    • Rust data types serialized to raw bytes, JSON or MessagePack,
  • receiving messages from spawned processes using asynchronous streams, including:
    • reading messages by multiple subscribers,
    • buffering unread messages with a configurable buffer capacity,
    • communication via standard I/O or named pipes,
  • reading logs produced by spawned processes (from the standard output/error streams),
  • fetching information about spawned processes (OS-assigned pid, exit status),
  • forcefully killing processes,
  • waiting for processes to finish.

ProcessManagerHandle can only be created by calling the ProcessManager::spawn method. The handle can be cloned cheaply and used safely from many threads.

Please note that once all manager handles are dropped, all child processes will be killed and all process directories will be deleted.
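
The handle pattern described above (a cheap, cloneable value that talks to a background task, which stops once every handle is gone) can be pictured with a std-only sketch. This is not how proc_heim is implemented: the crate runs a Tokio task, while the sketch uses a plain thread and an mpsc channel, and the names CounterHandle and spawn_counter are hypothetical.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical handle: a thin wrapper around a channel sender,
/// so cloning it only clones the sender.
#[derive(Clone)]
pub struct CounterHandle {
    tx: Sender<u64>,
}

impl CounterHandle {
    /// Send a value to the background worker.
    /// Returns false if the worker is no longer running.
    pub fn add(&self, value: u64) -> bool {
        self.tx.send(value).is_ok()
    }
}

/// Start the worker and return a handle plus the worker's join handle.
/// The worker exits once every `CounterHandle` clone has been dropped,
/// because the channel then reports that it is disconnected.
pub fn spawn_counter() -> (CounterHandle, JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel::<u64>();
    let worker = thread::spawn(move || {
        let mut total = 0;
        // Iterating over the receiver ends when all senders are dropped.
        for value in rx {
            total += value;
        }
        total
    });
    (CounterHandle { tx }, worker)
}
```

Each clone of CounterHandle can be moved to a different thread, and the worker's loop ends when the last clone is dropped, which mirrors the shutdown behaviour noted above.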

Implementations§

impl ProcessManagerHandle

pub async fn spawn( &self, runnable: impl Runnable, ) -> Result<ProcessId, SpawnProcessError>

Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.

§Examples
use proc_heim::{manager::ProcessManager, model::command::Cmd};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let cmd = Cmd::new("echo");
let process_id = handle.spawn(cmd).await?;

pub async fn spawn_with_handle( &self, runnable: impl Runnable, ) -> Result<ProcessHandle, SpawnProcessError>

Spawn a new child process, returning a ProcessHandle, which can be used to interact with the process.

pub async fn spawn_with_scoped_handle( &self, runnable: impl Runnable, ) -> Result<ScopedProcessHandle, SpawnProcessError>

Spawn a new child process, returning a ScopedProcessHandle, which can be used to interact with the process. Unlike ProcessHandle, dropping this handle will kill the associated child process.

pub async fn send_message<M>( &self, id: ProcessId, message: M, ) -> Result<(), WriteMessageError>
where M: Into<Message>,

Send a Message to the process with the given id.

§Examples
use futures::TryStreamExt;
use proc_heim::{
    manager::ProcessManager,
    model::{
        command::CmdOptions,
        script::{Script, ScriptingLanguage},
    },
};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;

let script = Script::with_options(
    ScriptingLanguage::Bash,
    r#"
    read -r msg < /dev/stdin
    echo "Hello $msg"
    "#,
    CmdOptions::with_standard_io_messaging(),
);

let process_id = handle.spawn(script).await?;
handle.send_message(process_id, "John").await?;
let mut stream = handle.subscribe_message_stream(process_id).await?;
let received_msg = stream.try_next().await?.unwrap();
assert_eq!("Hello John", received_msg.try_into_string().unwrap());
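
The call send_message(process_id, "John") above accepts a string literal because of the M: Into<Message> bound. The following std-only sketch shows that pattern in isolation. Payload and send are hypothetical stand-ins invented for illustration; they are not the crate's Message type or its sending logic.

```rust
/// Hypothetical stand-in for a message type: just owned bytes.
#[derive(Debug, PartialEq)]
pub struct Payload(Vec<u8>);

impl From<&str> for Payload {
    fn from(text: &str) -> Self {
        Payload(text.as_bytes().to_vec())
    }
}

impl From<String> for Payload {
    fn from(text: String) -> Self {
        Payload(text.into_bytes())
    }
}

impl From<Vec<u8>> for Payload {
    fn from(bytes: Vec<u8>) -> Self {
        Payload(bytes)
    }
}

impl Payload {
    /// Convert the bytes back into a string, failing on invalid UTF-8.
    pub fn try_into_string(self) -> Result<String, std::string::FromUtf8Error> {
        String::from_utf8(self.0)
    }
}

/// Accept anything convertible into a `Payload`, as an `Into` bound allows.
/// The `outbox` vector stands in for the channel to the child process.
pub fn send<M: Into<Payload>>(outbox: &mut Vec<Payload>, message: M) {
    outbox.push(message.into());
}
```

With this bound, a caller can pass a &str, a String or a Vec<u8> to the same function without converting explicitly at the call site.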

pub async fn subscribe_message_stream( &self, id: ProcessId, ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError>

Access the asynchronous message stream of the process with the given id.

Messages read from the process are returned as Message values, which convert conveniently into raw bytes, strings, or Rust types that implement the Deserialize trait. If the subscription succeeds, Ok(stream) is returned; otherwise a ReadMessageError is returned. The stream does not yield raw messages: each item is a Result wrapping a Message, with ReceiveMessageError as the error type. For convenient stream transformations, use the TryMessageStreamExt and MessageStreamExt traits.

For each child process, ProcessManager allocates a buffer for messages received from it; its capacity can be configured via CmdOptions::set_message_output_buffer_capacity. While the parent process is not reading messages produced by the child, they are buffered up to that capacity. If the buffer is full and the process sends a new message, the oldest buffered message is removed. The next attempt to retrieve a message from the stream then returns a ReceiveMessageError::LostMessages error, indicating how many buffered messages were removed.

The message stream can be subscribed to multiple times, and each subscriber receives its own copy of every message. Note that the buffer mentioned above is created per process, not per subscriber, so if any one subscriber stops reading messages, the buffer will fill up.
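
The buffering and multi-subscriber behaviour described above can be modelled with a small std-only sketch. It is an illustration under stated assumptions, not the crate's implementation: SharedBuffer, Subscriber and Recv are hypothetical names, the model is synchronous, and letting a new subscriber start at the oldest buffered message is a choice made for this sketch only.

```rust
use std::collections::VecDeque;

/// Outcome of one read attempt by a subscriber (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Recv {
    Message(String),
    /// The subscriber fell behind and this many messages were dropped.
    Lost(u64),
    Empty,
}

/// Hypothetical buffer shared by all subscribers of one process.
pub struct SharedBuffer {
    capacity: usize,
    messages: VecDeque<String>,
    /// Sequence number of the oldest message still buffered.
    first_seq: u64,
}

/// Hypothetical subscriber: remembers only the next sequence number it wants.
pub struct Subscriber {
    next_seq: u64,
}

impl SharedBuffer {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, messages: VecDeque::new(), first_seq: 0 }
    }

    /// Assumption of this sketch: start at the oldest buffered message.
    pub fn subscribe(&self) -> Subscriber {
        Subscriber { next_seq: self.first_seq }
    }

    /// Store a message, dropping the oldest one when the buffer is full.
    pub fn push(&mut self, message: &str) {
        if self.messages.len() == self.capacity {
            self.messages.pop_front();
            self.first_seq += 1;
        }
        self.messages.push_back(message.to_string());
    }

    /// Give the subscriber its own copy of the next message, or report
    /// how many messages it missed because they were already dropped.
    pub fn recv(&self, sub: &mut Subscriber) -> Recv {
        if sub.next_seq < self.first_seq {
            let lost = self.first_seq - sub.next_seq;
            sub.next_seq = self.first_seq;
            return Recv::Lost(lost);
        }
        let index = (sub.next_seq - self.first_seq) as usize;
        match self.messages.get(index) {
            Some(message) => {
                sub.next_seq += 1;
                Recv::Message(message.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

In this model a subscriber that keeps up never sees Recv::Lost, while one that falls more than capacity messages behind gets a single Recv::Lost(n) and then resumes from the oldest message still buffered.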

§Examples

Reading a message via standard I/O:

use futures::TryStreamExt;
use proc_heim::{
    manager::{ProcessManager, TryMessageStreamExt},
    model::command::{Cmd, CmdOptions, MessagingType},
};

// spawn ProcessManager
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;

// create simple echo command
let msg = "Hello world!";
let options = CmdOptions::with_message_output(MessagingType::StandardIo);
let cmd = Cmd::with_args_and_options("echo", [msg], options);

// read a message from spawned process
let process_id = handle.spawn(cmd).await?;
let mut stream = handle.subscribe_message_stream(process_id).await?
    .into_string_stream();
let received_msg = stream.try_next().await?.unwrap();
assert_eq!(msg, received_msg);

Reading messages via named pipes:

use futures::TryStreamExt;
use proc_heim::{
    manager::{ProcessManager, TryMessageStreamExt},
    model::{
        command::CmdOptions,
        script::{Script, ScriptingLanguage},
    },
};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
    ScriptingLanguage::Bash,
    r#"
    counter=0
    while read msg; do
        echo "$counter: $msg" > $OUTPUT_PIPE
        counter=$((counter + 1))
    done < $INPUT_PIPE
    "#,
    CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
);

// We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
// which mimics the "ProcessManagerHandle" API,
// but without having to pass the process ID to each method call.
let process_handle = handle.spawn_with_handle(script).await?;

process_handle.send_message("First message").await?;
// We can send the next message without causing a deadlock here.
// This is possible because the response to the first message
// will be read by a dedicated Tokio task,
// spawned automatically by the ProcessManager.
process_handle.send_message("Second message").await?;

let mut stream = process_handle
    .subscribe_message_stream()
    .await?
    .into_string_stream();

assert_eq!("0: First message", stream.try_next().await?.unwrap());
assert_eq!("1: Second message", stream.try_next().await?.unwrap());

pub async fn get_logs_stdout( &self, id: ProcessId, query: LogsQuery, ) -> Result<Vec<String>, GetLogsError>

Fetch logs from the standard output stream produced by the process with the given id.

The method returns a GetLogsError::LoggingTypeWasNotConfigured error if LoggingType was set to StderrOnly. Note that each string ending with a newline character (\n) is treated as one log.

§Examples
use proc_heim::{
    manager::{LogsQuery, ProcessManager},
    model::{
        command::{CmdOptions, LoggingType},
        script::{Script, ScriptingLanguage},
    },
};
use std::time::Duration;

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
    ScriptingLanguage::Bash,
    r#"
    echo message1
    echo message2
    "#,
    CmdOptions::with_logging(LoggingType::StdoutOnly),
);

let process_id = handle.spawn(script).await?;

// We need to wait for the process to spawn and finish in order to collect logs,
// otherwise returned logs will be empty.
let _ = handle.wait(process_id, Duration::from_micros(10)).await?;

let logs = handle
    .get_logs_stdout(process_id, LogsQuery::fetch_all())
    .await?;
assert_eq!(2, logs.len());
assert_eq!("message1", logs[0]);
assert_eq!("message2", logs[1]);

pub async fn get_logs_stderr( &self, id: ProcessId, query: LogsQuery, ) -> Result<Vec<String>, GetLogsError>

Fetch logs from the standard error stream produced by the process with the given id.

The method returns a GetLogsError::LoggingTypeWasNotConfigured error if LoggingType was set to StdoutOnly. Note that each string ending with a newline character (\n) is treated as one log.

§Examples
use proc_heim::{
    manager::{LogsQuery, ProcessManager},
    model::{
        command::{CmdOptions, LoggingType},
        script::{Script, ScriptingLanguage},
    },
};
use std::time::Duration;

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
    ScriptingLanguage::Bash,
    r#"
    echo message1 >&2
    echo message2 >&2
    echo message3 >&2
    "#,
    CmdOptions::with_logging(LoggingType::StderrOnly),
);

let process_id = handle.spawn(script).await?;

// We need to wait for the process to spawn and finish in order to collect logs,
// otherwise returned logs will be empty.
let _ = handle.wait(process_id, Duration::from_micros(10)).await?;

let logs = handle
    .get_logs_stderr(process_id, LogsQuery::with_offset(1))
    .await?;
assert_eq!(2, logs.len());
assert_eq!("message2", logs[0]);
assert_eq!("message3", logs[1]);
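
The two rules at work in the log examples above (one log per newline-terminated string, and an offset that skips the first entries) can be modelled with a std-only sketch. The helpers complete_logs and skip_logs are hypothetical and are not part of proc_heim. Dropping an unterminated trailing chunk is an assumption of the sketch, since the documentation only says that newline-terminated strings count as logs.

```rust
/// Split raw stream output into logs: one entry per newline-terminated chunk.
/// Assumption of this sketch: a trailing chunk without '\n' is not yet a log.
pub fn complete_logs(raw: &str) -> Vec<String> {
    raw.split_inclusive('\n')
        .filter_map(|chunk| chunk.strip_suffix('\n'))
        .map(str::to_string)
        .collect()
}

/// Skip the first `offset` logs, returning an empty slice when the
/// offset is past the end.
pub fn skip_logs(logs: &[String], offset: usize) -> &[String] {
    logs.get(offset..).unwrap_or(&[])
}
```

Applied to the three lines written by the script above, skip_logs with an offset of 1 leaves message2 and message3, matching the assertions in the example.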

pub async fn get_process_info( &self, id: ProcessId, ) -> Result<ProcessInfo, GetProcessInfoError>

Get information about the process with the given id.

§Examples

Check information about a running process:

use proc_heim::{manager::ProcessManager, model::command::Cmd};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("cat")).await?;

let process_info = handle.get_process_info(process_id).await?;
assert!(process_info.pid().is_some());

pub fn wait( &self, id: ProcessId, poll_interval: Duration, ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>>

Wait for the process with the given id to finish.

This method spawns a Tokio task that checks in a loop whether the process has exited. The poll_interval parameter specifies how often that check is performed.
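
The polling behaviour described above can be sketched without Tokio. The block below is a std-only analogy, not the crate's code: wait_in_background is a hypothetical helper that runs on an OS thread rather than a Tokio task, and the check closure stands in for asking the manager whether the process has exited.

```rust
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Run `check` repeatedly on a background thread, sleeping `poll_interval`
/// between attempts, until it returns `Some(result)`.
pub fn wait_in_background<T, F>(poll_interval: Duration, mut check: F) -> JoinHandle<T>
where
    T: Send + 'static,
    F: FnMut() -> Option<T> + Send + 'static,
{
    thread::spawn(move || loop {
        // Stop as soon as the check reports a final result.
        if let Some(result) = check() {
            return result;
        }
        // Otherwise wait before polling again.
        thread::sleep(poll_interval);
    })
}
```

A shorter poll_interval notices completion sooner at the cost of more frequent wake-ups, which is the trade-off the parameter exposes.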

§Examples
use std::time::Duration;
use proc_heim::{manager::ProcessManager, model::command::Cmd};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("env")).await?;

let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;

assert!(!process_info.is_running());
assert!(process_info.exit_status().unwrap().success());

pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError>

Forcefully kill the process with the given id.

It will also abort all background tasks related to the process (e.g. for messaging and logging) and remove its process directory.

§Examples
use proc_heim::{manager::ProcessManager, model::command::Cmd};

let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("cat")).await?;

let result = handle.kill(process_id).await;

assert!(result.is_ok());

pub async fn abort(&self) -> Result<(), AbortError>

Abort the ProcessManager task associated with this handle. This will kill all child processes and delete their corresponding process directories. Please note that the ProcessManager task is aborted automatically when all of its handles are dropped.

Trait Implementations§

impl Clone for ProcessManagerHandle

fn clone(&self) -> ProcessManagerHandle

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for ProcessManagerHandle

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
