pub struct ProcessManagerHandle { /* private fields */ }
Expand description
Type used for communication with ProcessManager
task.
It provides asynchronous API for:
- spawning new child processes, which implements
Runnable
trait, - sending messages to spawned processes as:
- raw bytes,
- strings,
Rust
data types serialized to raw bytes,JSON
orMessagePack
.
- receiving messages from spawned processes using asynchronous streams, including:
- reading messages by multiple subscribers,
- buffering not read messages with configurable buffer capacity,
- communication via standard I/O or named pipes.
- reading logs produced by spawned processes (from standard output/error streams),
- fetching information about spawned processes (OS-assigned pid, exit status),
- forcefully killing processes,
- waiting processes to finish.
ProcessManagerHandle
can only be created by calling ProcessManager::spawn
method.
The handle can be cheaply cloned and used safely by many threads.
Please note that once all manager handles are dropped, then all child processes will be killed and all process directories
will be deleted.
Implementations§
Source§impl ProcessManagerHandle
impl ProcessManagerHandle
Sourcepub async fn spawn(
&self,
runnable: impl Runnable,
) -> Result<ProcessId, SpawnProcessError>
pub async fn spawn( &self, runnable: impl Runnable, ) -> Result<ProcessId, SpawnProcessError>
Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
§Examples
use proc_heim::{manager::ProcessManager, model::command::Cmd};
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let cmd = Cmd::new("echo");
let process_id = handle.spawn(cmd).await?;
Sourcepub async fn spawn_with_handle(
&self,
runnable: impl Runnable,
) -> Result<ProcessHandle, SpawnProcessError>
pub async fn spawn_with_handle( &self, runnable: impl Runnable, ) -> Result<ProcessHandle, SpawnProcessError>
Spawn a new child process, returning a ProcessHandle
, which can be used to interact with the process.
Sourcepub async fn spawn_with_scoped_handle(
&self,
runnable: impl Runnable,
) -> Result<ScopedProcessHandle, SpawnProcessError>
pub async fn spawn_with_scoped_handle( &self, runnable: impl Runnable, ) -> Result<ScopedProcessHandle, SpawnProcessError>
Spawn a new child process, returning a ScopedProcessHandle
, which can be used to interact with the process.
Unlike ProcessHandle
, dropping this handle will kill the associated child process.
Sourcepub async fn send_message<M>(
&self,
id: ProcessId,
message: M,
) -> Result<(), WriteMessageError>
pub async fn send_message<M>( &self, id: ProcessId, message: M, ) -> Result<(), WriteMessageError>
Send a Message
to the process with given id
.
§Examples
use futures::TryStreamExt;
use proc_heim::{
manager::ProcessManager,
model::{
command::CmdOptions,
script::{Script, ScriptingLanguage},
},
};
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
ScriptingLanguage::Bash,
r#"
read -r msg < /dev/stdin
echo "Hello $msg"
"#,
CmdOptions::with_standard_io_messaging(),
);
let process_id = handle.spawn(script).await?;
handle.send_message(process_id, "John").await?;
let mut stream = handle.subscribe_message_stream(process_id).await?;
let received_msg = stream.try_next().await?.unwrap();
assert_eq!("Hello John", received_msg.try_into_string().unwrap());
Sourcepub async fn subscribe_message_stream(
&self,
id: ProcessId,
) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError>
pub async fn subscribe_message_stream( &self, id: ProcessId, ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError>
Access asynchronous message stream from the process with given id
.
Messages read from the process are returned as Message
types,
allowing a convenient conversion into raw bytes, string or even Rust
types, which implement Deserialize
trait.
If message stream was successfully subscribed, then Ok(stream)
is returned, otherwise a ReadMessageError
is returned.
A stream doesn’t yield raw messages, instead each message is wrapped by Result
with ReceiveMessageError
error type.
For convenient stream transformation use TryMessageStreamExt
and MessageStreamExt
traits.
ProcessManager
for each child process assigns a buffer for messages received from it,
which can be configured via CmdOptions::set_message_output_buffer_capacity
.
When parent process is not reading messages produced by the process,
then the messages are buffered up to the given capacity value.
If the buffer limit is reached and the process sends a new message,
the “oldest” buffered message will be removed. Therefore, when retrieving next message from the stream,
the ReceiveMessageError::LostMessages
error will be returned, indicating how many buffered messages have been removed.
The messages stream can be subscribed multiple times and each subscriber will receive a one copy of the original message. Notice that buffer mentioned earlier is created not per subscriber, but per each process, so when one of subscribers not read messages, the buffer will fill up.
§Examples
Reading a message via standard IO:
use futures::TryStreamExt;
use proc_heim::{
manager::{ProcessManager, TryMessageStreamExt},
model::command::{Cmd, CmdOptions, MessagingType},
};
// spawn ProcessManager
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
// create simple echo command
let msg = "Hello world!";
let options = CmdOptions::with_message_output(MessagingType::StandardIo);
let cmd = Cmd::with_args_and_options("echo", [msg], options);
// read a message from spawned process
let process_id = handle.spawn(cmd).await?;
let mut stream = handle.subscribe_message_stream(process_id).await?
.into_string_stream();
let received_msg = stream.try_next().await?.unwrap();
assert_eq!(msg, received_msg);
Reading messages via named pipes:
use futures::{StreamExt, TryStreamExt};
use proc_heim::{
manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
model::{
command::CmdOptions,
script::{Script, ScriptingLanguage},
},
};
use std::path::PathBuf;
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
ScriptingLanguage::Bash,
r#"
counter=0
while read msg; do
echo "$counter: $msg" > $OUTPUT_PIPE
counter=$((counter + 1))
done < $INPUT_PIPE
"#,
CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
);
// We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
// which mimics the "ProcessManagerHandle" API,
// but without having to pass the process ID to each method call.
let process_handle = handle.spawn_with_handle(script).await?;
process_handle.send_message("First message").await?;
// We can send a next message without causing a deadlock here.
// This is possible because the response to the first message
// will be read by a dedicated Tokio task,
// spawned automatically by the Process Manager.
process_handle.send_message("Second message").await?;
let mut stream = process_handle
.subscribe_message_stream()
.await?
.into_string_stream();
assert_eq!("0: First message", stream.try_next().await?.unwrap());
assert_eq!("1: Second message", stream.try_next().await?.unwrap());
Sourcepub async fn get_logs_stdout(
&self,
id: ProcessId,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError>
pub async fn get_logs_stdout( &self, id: ProcessId, query: LogsQuery, ) -> Result<Vec<String>, GetLogsError>
Fetch logs from standard output
stream, produced by a process with given id
.
The method will return GetLogsError::LoggingTypeWasNotConfigured
error, if LoggingType
was set to StderrOnly
.
Notice that each string ending with new line character (\n
) is treated as one log.
§Examples
use proc_heim::{
manager::{LogsQuery, ProcessManager},
model::{
command::{CmdOptions, LoggingType},
script::{Script, ScriptingLanguage},
},
};
use std::time::Duration;
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
ScriptingLanguage::Bash,
r#"
echo message1
echo message2
"#,
CmdOptions::with_logging(LoggingType::StdoutOnly),
);
let process_id = handle.spawn(script).await?;
// We need to wait for the process to spawn and finish in order to collect logs,
// otherwise returned logs will be empty.
let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
let logs = handle
.get_logs_stdout(process_id, LogsQuery::fetch_all())
.await?;
assert_eq!(2, logs.len());
assert_eq!("message1", logs[0]);
assert_eq!("message2", logs[1]);
Sourcepub async fn get_logs_stderr(
&self,
id: ProcessId,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError>
pub async fn get_logs_stderr( &self, id: ProcessId, query: LogsQuery, ) -> Result<Vec<String>, GetLogsError>
Fetch logs from standard error
stream, produced by a process with given id
.
The method will return GetLogsError::LoggingTypeWasNotConfigured
error, if LoggingType
was set to StdoutOnly
.
Notice that each string ending with new line character (\n
) is treated as one log.
§Examples
use proc_heim::{
manager::{LogsQuery, ProcessManager},
model::{
command::{CmdOptions, LoggingType},
script::{Script, ScriptingLanguage},
},
};
use std::time::Duration;
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let script = Script::with_options(
ScriptingLanguage::Bash,
r#"
echo message1 >&2
echo message2 >&2
echo message3 >&2
"#,
CmdOptions::with_logging(LoggingType::StderrOnly),
);
let process_id = handle.spawn(script).await?;
// We need to wait for the process to spawn and finish in order to collect logs,
// otherwise returned logs will be empty.
let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
let logs = handle
.get_logs_stderr(process_id, LogsQuery::with_offset(1))
.await?;
assert_eq!(2, logs.len());
assert_eq!("message2", logs[0]);
assert_eq!("message3", logs[1]);
Sourcepub async fn get_process_info(
&self,
id: ProcessId,
) -> Result<ProcessInfo, GetProcessInfoError>
pub async fn get_process_info( &self, id: ProcessId, ) -> Result<ProcessInfo, GetProcessInfoError>
Get information about the process with given id
.
§Examples
Check information of running process:
use proc_heim::{manager::ProcessManager, model::command::Cmd};
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("cat")).await?;
let process_info = handle.get_process_info(process_id).await?;
assert!(process_info.pid().is_some());
Sourcepub fn wait(
&self,
id: ProcessId,
poll_interval: Duration,
) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>>
pub fn wait( &self, id: ProcessId, poll_interval: Duration, ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>>
Wait for the process with given id
to finish.
This method will spawn a Tokio
task and check in loop, if the process exited.
The poll_interval
parameter specifies how often to check whether the process has completed or not.
§Examples
use std::time::Duration;
use proc_heim::{manager::ProcessManager, model::command::Cmd};
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("env")).await?;
let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
assert!(!process_info.is_running());
assert!(process_info.exit_status().unwrap().success());
Sourcepub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError>
pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError>
Forcefully kills the process with given id
.
It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
§Examples
use proc_heim::{manager::ProcessManager, model::command::Cmd};
let working_dir = tempfile::tempdir()?.into_path();
let handle = ProcessManager::spawn(working_dir)?;
let process_id = handle.spawn(Cmd::new("cat")).await?;
let result = handle.kill(process_id).await;
assert!(result.is_ok());
Sourcepub async fn abort(&self) -> Result<(), AbortError>
pub async fn abort(&self) -> Result<(), AbortError>
Abort a ProcessManager
task associated with this handle. This will kill all child processes and delete their corresponding process directories
.
Please note that ProcessManager
will be aborted automatically when all of its handles are dropped.
Trait Implementations§
Source§impl Clone for ProcessManagerHandle
impl Clone for ProcessManagerHandle
Source§fn clone(&self) -> ProcessManagerHandle
fn clone(&self) -> ProcessManagerHandle
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read more