proc_heim/process/manager.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
use std::{
collections::HashMap,
fmt::Debug,
io,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{
sync::{
broadcast, mpsc,
oneshot::{self},
},
task::JoinHandle,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
Stream, StreamExt,
};
use uuid::Uuid;
use crate::working_dir::WorkingDir;
use super::{
log_reader::{LogReaderError, LogsQuery, LogsQueryType},
message::Message,
spawner::ProcessSpawner,
ProcessHandle, ProcessInfo, Runnable,
};
use super::{
model::{Process, ProcessId},
spawner::SpawnerError,
};
#[derive(Debug)]
enum ProcessManagerMessage {
SpawnProcess {
cmd: Box<dyn Runnable>,
responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
},
SubscribeMessageStream {
id: ProcessId,
responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
},
WriteMessage {
id: ProcessId,
data: Vec<u8>,
responder: oneshot::Sender<Result<(), WriteMessageError>>,
},
KillProcess {
id: ProcessId,
responder: oneshot::Sender<Result<(), KillProcessError>>,
},
GetLogs {
id: ProcessId,
logs_query_type: LogsQueryType,
query: LogsQuery,
responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
},
GetProcessInfo {
id: ProcessId,
responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
},
Abort {
responder: oneshot::Sender<Result<(), AbortError>>,
},
}
/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
///
/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
/// `ProcessManager` is a main actor responsible for:
/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
/// * forwarding messages between client and other actors.
///
/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
/// If the process has been killed manually, then its `process directory` is removed immediately.
///
/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
pub struct ProcessManager {
working_dir: WorkingDir,
process_spawner: ProcessSpawner,
processes: HashMap<ProcessId, Process>,
receiver: mpsc::Receiver<ProcessManagerMessage>,
is_aborted: bool,
}
impl ProcessManager {
/// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
///
/// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
/// # Examples
/// ```no_run
/// # use proc_heim::manager::ProcessManager;
/// # use std::path::PathBuf;
/// let working_directory = PathBuf::from("/some/temp/path");
/// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
/// ```
pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
Self::validate_working_dir(&working_directory)?;
let (sender, receiver) = mpsc::channel(32);
let working_dir = WorkingDir::new(working_directory);
let mut manager = Self {
working_dir: working_dir.clone(),
process_spawner: ProcessSpawner::new(working_dir),
receiver,
processes: HashMap::new(),
is_aborted: false,
};
tokio::spawn(async move { manager.run().await });
Ok(ProcessManagerHandle::new(sender))
}
fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
let file_path = working_directory.join(Uuid::new_v4().to_string());
std::fs::write(&file_path, "testing working dir")?;
std::fs::remove_file(file_path)
}
async fn run(&mut self) {
loop {
if self.is_aborted {
break;
} else if let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
} else {
break;
}
}
}
async fn handle_message(&mut self, msg: ProcessManagerMessage) {
match msg {
ProcessManagerMessage::SpawnProcess { cmd, responder } => {
let result = self.spawn_process(cmd).await;
let _ = responder.send(result);
}
ProcessManagerMessage::KillProcess { id, responder } => {
let result = self.kill_process(id).await;
let _ = responder.send(result);
}
ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
let result = self.subscribe_to_message_stream(id).await;
let _ = responder.send(result);
}
ProcessManagerMessage::WriteMessage {
id,
data,
responder,
} => {
let result = self.send_message(id, data).await;
let _ = responder.send(result);
}
ProcessManagerMessage::GetLogs {
id,
logs_query_type,
query,
responder,
} => {
let result = self.get_logs(id, logs_query_type, query).await;
let _ = responder.send(result);
}
ProcessManagerMessage::GetProcessInfo { id, responder } => {
let result = self.get_process_info(id);
let _ = responder.send(result);
}
ProcessManagerMessage::Abort { responder } => {
let result = self.abort().await;
let _ = responder.send(result);
}
}
}
async fn spawn_process(
&mut self,
runnable: Box<dyn Runnable>,
) -> Result<ProcessId, SpawnProcessError> {
let id = ProcessId::random();
let process = self.process_spawner.spawn_runnable(&id, runnable)?;
self.processes.insert(id, process);
Ok(id)
}
async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
let process = self
.processes
.get_mut(&id)
.ok_or(KillProcessError::ProcessNotFound(id))?;
if let Some(reader) = process.message_reader.take() {
reader.abort().await;
}
if let Some(writer) = process.message_writer.take() {
writer.abort().await;
}
if let Some(log_reader) = process.log_reader.take() {
log_reader.abort().await;
}
self.processes.remove(&id); // kill_on_drop() is used to kill child process
let process_dir = self.working_dir.process_dir(&id);
let _ = tokio::fs::remove_dir_all(process_dir).await;
Ok(())
}
async fn subscribe_to_message_stream(
&mut self,
id: ProcessId,
) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
let reader = self
.processes
.get(&id)
.ok_or(ReadMessageError::ProcessNotFound(id))?
.message_reader
.as_ref()
.ok_or(ReadMessageError::MessageReaderNotFound(id))?;
let receiver = reader
.subscribe()
.await
.map_err(|_| ReadMessageError::MessageReaderKilled)?;
Ok(receiver)
}
async fn send_message(
&mut self,
id: ProcessId,
data: Vec<u8>,
) -> Result<(), WriteMessageError> {
self.processes
.get(&id)
.ok_or(WriteMessageError::ProcessNotFound(id))?
.message_writer
.as_ref()
.ok_or(WriteMessageError::MessageWriterNotFound(id))?
.write(data)
.await
.map_err(Into::into)
}
async fn get_logs(
&mut self,
id: ProcessId,
logs_query_type: LogsQueryType,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
self.processes
.get(&id)
.ok_or(GetLogsError::ProcessNotFound(id))?
.log_reader
.as_ref()
.ok_or(GetLogsError::LogReaderNotFound(id))?
.read_logs(logs_query_type, query)
.await
.map_err(Into::into)
}
fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
let process = self
.processes
.get_mut(&id)
.ok_or(GetProcessInfoError::ProcessNotFound(id))?;
let pid = process.child.id();
let exit_status = process.child.try_wait()?;
Ok(ProcessInfo::new(pid, exit_status))
}
async fn abort(&mut self) -> Result<(), AbortError> {
self.is_aborted = true;
self.receiver.close();
let ids: Vec<ProcessId> = self.processes.keys().cloned().collect();
for id in ids {
let _ = self.kill_process(id).await;
}
Ok(())
}
}
/// Type used for communication with [`ProcessManager`] task.
///
/// It provides asynchronous API for:
/// * spawning new child processes, which implements [`Runnable`] trait,
/// * sending messages to spawned processes as:
/// * raw bytes,
/// * strings,
/// * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
/// * receiving messages from spawned processes using asynchronous streams, including:
/// * reading messages by multiple subscribers,
/// * buffering not read messages with configurable buffer capacity,
/// * communication via standard I/O or named pipes.
/// * reading logs produced by spawned processes (from standard output/error streams),
/// * fetching information about spawned processes (OS-assigned pid, exit status),
/// * forcefully killing processes,
/// * waiting processes to finish.
///
/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
/// The handle can be cheaply cloned and used safely by many threads.
#[derive(Clone, Debug)]
pub struct ProcessManagerHandle {
sender: mpsc::Sender<ProcessManagerMessage>,
}
impl ProcessManagerHandle {
fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
Self { sender }
}
/// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
/// # Examples
/// ```
/// use proc_heim::{manager::ProcessManager, model::command::Cmd};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let cmd = Cmd::new("echo");
/// let process_id = handle.spawn(cmd).await?;
/// # Ok(())
/// # }
pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::SpawnProcess {
cmd: Box::new(runnable),
responder,
};
let _ = self.sender.send(msg).await;
let process_id = receiver.await??;
Ok(process_id)
}
/// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
pub async fn spawn_with_handle(
&self,
runnable: impl Runnable,
) -> Result<ProcessHandle, SpawnProcessError> {
let id = self.spawn(runnable).await?;
Ok(ProcessHandle::new(id, self.clone()))
}
/// Send a [`Message`] to the process with given `id`.
/// # Examples
/// ```
/// use futures::TryStreamExt;
/// use proc_heim::{
/// manager::ProcessManager,
/// model::{
/// command::CmdOptions,
/// script::{Script, ScriptingLanguage},
/// },
/// };
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
///
/// let script = Script::with_options(
/// ScriptingLanguage::Bash,
/// r#"
/// read -r msg < /dev/stdin
/// echo "Hello $msg"
/// "#,
/// CmdOptions::with_standard_io_messaging(),
/// );
///
/// let process_id = handle.spawn(script).await?;
/// handle.send_message(process_id, "John").await?;
/// let mut stream = handle.subscribe_message_stream(process_id).await?;
/// let received_msg = stream.try_next().await?.unwrap();
/// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
/// # Ok(())
/// # }
/// ```
pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
where
M: Into<Message>,
{
if let Some(data) = Self::into_bytes_with_eol_char(message.into().bytes) {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::WriteMessage {
id,
data,
responder,
};
let _ = self.sender.send(msg).await;
receiver.await??;
}
Ok(())
}
fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
if let Some(last_char) = bytes.last() {
if *last_char != b'\n' {
bytes.push(b'\n');
}
Some(bytes)
} else {
None
}
}
/// Access asynchronous message stream from the process with given `id`.
///
/// Messages read from the process are returned as [`Message`] types,
/// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
/// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
/// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
/// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
///
/// `ProcessManager` for each child process assigns a buffer for messages received from it,
/// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
// NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
/// When parent process is not reading messages produced by the process,
/// then the messages are buffered up to the given capacity value.
/// If the buffer limit is reached and the process sends a new message,
/// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
/// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
///
/// The messages stream can be subscribed multiple times and each subscriber will receive a one copy of the original message.
/// Notice that buffer mentioned earlier is created not per subscriber, but per each process, so when one of subscribers not read messages, the buffer will fill up.
/// # Examples
///
/// Reading a message via standard IO:
///
/// ```
/// use futures::TryStreamExt;
/// use proc_heim::{
/// manager::{ProcessManager, TryMessageStreamExt},
/// model::command::{Cmd, CmdOptions, MessagingType},
/// };
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // spawn ProcessManager
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
///
/// // create simple echo command
/// let msg = "Hello world!";
/// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
/// let cmd = Cmd::with_args_and_options("echo", [msg], options);
///
/// // read a message from spawned process
/// let process_id = handle.spawn(cmd).await?;
/// let mut stream = handle.subscribe_message_stream(process_id).await?
/// .into_string_stream();
/// let received_msg = stream.try_next().await?.unwrap();
/// assert_eq!(msg, received_msg);
/// # Ok(())
/// # }
/// ```
///
/// Reading messages via named pipes:
///
/// ```
/// use futures::{StreamExt, TryStreamExt};
/// use proc_heim::{
/// manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
/// model::{
/// command::CmdOptions,
/// script::{Script, ScriptingLanguage},
/// },
/// };
/// use std::path::PathBuf;
///
/// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let script = Script::with_options(
/// ScriptingLanguage::Bash,
/// r#"
/// counter=0
/// while read msg; do
/// echo "$counter: $msg" > $OUTPUT_PIPE
/// counter=$((counter + 1))
/// done < $INPUT_PIPE
/// "#,
/// CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
/// );
///
/// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
/// // which mimics the "ProcessManagerHandle" API,
/// // but without having to pass the process ID to each method call.
/// let process_handle = handle.spawn_with_handle(script).await?;
///
/// process_handle.send_message("First message").await?;
/// // We can send a next message without causing a deadlock here.
/// // This is possible because the response to the first message
/// // will be read by a dedicated Tokio task,
/// // spawned automatically by the Process Manager.
/// process_handle.send_message("Second message").await?;
///
/// let mut stream = process_handle
/// .subscribe_message_stream()
/// .await?
/// .into_string_stream();
///
/// assert_eq!("0: First message", stream.try_next().await?.unwrap());
/// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
/// # Ok(()) }
/// ```
pub async fn subscribe_message_stream(
&self,
id: ProcessId,
) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
let _ = self.sender.send(msg).await;
let message_receiver = receiver.await??;
let stream = BroadcastStream::new(message_receiver)
.map(|v| v.map(Message::from_bytes).map_err(Into::into));
Ok(stream)
}
/// Fetch logs from standard `output` stream, produced by a process with given `id`.
///
/// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
/// Notice that each string ending with new line character (`\n`) is treated as one log.
/// # Examples
/// ```
/// use proc_heim::{
/// manager::{LogsQuery, ProcessManager},
/// model::{
/// command::{CmdOptions, LoggingType},
/// script::{Script, ScriptingLanguage},
/// },
/// };
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let script = Script::with_options(
/// ScriptingLanguage::Bash,
/// r#"
/// echo message1
/// echo message2
/// "#,
/// CmdOptions::with_logging(LoggingType::StdoutOnly),
/// );
///
/// let process_id = handle.spawn(script).await?;
///
/// // We need to wait for the process to spawn and finish in order to collect logs,
/// // otherwise returned logs will be empty.
/// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
///
/// let logs = handle
/// .get_logs_stdout(process_id, LogsQuery::fetch_all())
/// .await?;
/// assert_eq!(2, logs.len());
/// assert_eq!("message1", logs[0]);
/// assert_eq!("message2", logs[1]);
/// # Ok(())
/// # }
/// ```
pub async fn get_logs_stdout(
&self,
id: ProcessId,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
self.get_logs(id, LogsQueryType::Stdout, query).await
}
/// Fetch logs from standard `error` stream, produced by a process with given `id`.
///
/// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
/// Notice that each string ending with new line character (`\n`) is treated as one log.
/// # Examples
/// ```
/// use proc_heim::{
/// manager::{LogsQuery, ProcessManager},
/// model::{
/// command::{CmdOptions, LoggingType},
/// script::{Script, ScriptingLanguage},
/// },
/// };
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let script = Script::with_options(
/// ScriptingLanguage::Bash,
/// r#"
/// echo message1 >&2
/// echo message2 >&2
/// echo message3 >&2
/// "#,
/// CmdOptions::with_logging(LoggingType::StderrOnly),
/// );
///
/// let process_id = handle.spawn(script).await?;
///
/// // We need to wait for the process to spawn and finish in order to collect logs,
/// // otherwise returned logs will be empty.
/// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
///
/// let logs = handle
/// .get_logs_stderr(process_id, LogsQuery::with_offset(1))
/// .await?;
/// assert_eq!(2, logs.len());
/// assert_eq!("message2", logs[0]);
/// assert_eq!("message3", logs[1]);
/// # Ok(())
/// # }
pub async fn get_logs_stderr(
&self,
id: ProcessId,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
self.get_logs(id, LogsQueryType::Stderr, query).await
}
async fn get_logs(
&self,
id: ProcessId,
logs_query_type: LogsQueryType,
query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::GetLogs {
id,
logs_query_type,
query,
responder,
};
let _ = self.sender.send(msg).await;
let logs = receiver.await??;
Ok(logs)
}
/// Get information about the process with given `id`.
/// # Examples
/// Check information of running process:
/// ```
/// use proc_heim::{manager::ProcessManager, model::command::Cmd};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let process_id = handle.spawn(Cmd::new("cat")).await?;
///
/// let process_info = handle.get_process_info(process_id).await?;
/// assert!(process_info.pid().is_some());
/// # Ok(())
/// # }
/// ```
pub async fn get_process_info(
&self,
id: ProcessId,
) -> Result<ProcessInfo, GetProcessInfoError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::GetProcessInfo { id, responder };
let _ = self.sender.send(msg).await;
let data = receiver.await??;
Ok(data)
}
/// Wait for the process with given `id` to finish.
///
/// This method will spawn a `Tokio` task and check in loop, if the process exited.
/// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
/// # Examples
/// ```
/// use std::time::Duration;
/// use proc_heim::{manager::ProcessManager, model::command::Cmd};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let process_id = handle.spawn(Cmd::new("env")).await?;
///
/// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
///
/// assert!(!process_info.is_running());
/// assert!(process_info.exit_status().unwrap().success());
/// # Ok(())
/// # }
/// ```
pub fn wait(
&self,
id: ProcessId,
poll_interval: Duration,
) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
let handle = self.clone();
tokio::spawn(async move {
loop {
let process_info = handle.get_process_info(id).await?;
if process_info.is_running() {
tokio::time::sleep(poll_interval).await;
} else {
return Ok(process_info);
}
}
})
}
/// Forcefully kills the process with given `id`.
///
/// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
/// # Examples
/// ```
/// use proc_heim::{manager::ProcessManager, model::command::Cmd};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let working_dir = tempfile::tempdir()?.into_path();
/// let handle = ProcessManager::spawn(working_dir)?;
/// let process_id = handle.spawn(Cmd::new("cat")).await?;
///
/// let result = handle.kill(process_id).await;
///
/// assert!(result.is_ok());
/// # Ok(())
/// # }
/// ```
pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::KillProcess { id, responder };
let _ = self.sender.send(msg).await;
receiver.await??;
Ok(())
}
/// Abort a [`ProcessManager`] task associated with this handle.
pub async fn abort(&self) -> Result<(), AbortError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::Abort { responder };
self.sender
.send(msg)
.await
.map_err(|_| AbortError::ManagerAlreadyAborted)?;
receiver
.await
.map_err(|_| AbortError::ManagerAlreadyAborted)??;
Ok(())
}
}
/// Error type returned when reading message bytes from a messages stream.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveMessageError {
/// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
/// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
/// Includes the number of deleted messages.
/// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
#[error("Lost {0} number of messages due to buffer capacity overflow")]
LostMessages(u64),
}
/// Error type returned when reading a deserializable message from a messages stream.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveDeserializedMessageError {
/// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
/// See [`ReceiveMessageError`] for more information.
#[error("Lost {0} number of messages due to buffer capacity overflow")]
LostMessages(u64),
/// Error indicating that a message cannot be deserialized from bytes. Includes error message.
#[error("{0}")]
CannotDeserializeMessage(String),
}
impl From<BroadcastStreamRecvError> for ReceiveMessageError {
fn from(err: BroadcastStreamRecvError) -> Self {
match err {
BroadcastStreamRecvError::Lagged(size) => ReceiveMessageError::LostMessages(size),
}
}
}
impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
fn from(value: ReceiveMessageError) -> Self {
match value {
ReceiveMessageError::LostMessages(number) => {
ReceiveDeserializedMessageError::LostMessages(number)
}
}
}
}
/// Error type returned when spawning a new child process.
#[derive(thiserror::Error, Debug)]
pub enum SpawnProcessError {
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// Cannot create named pipes at process directory (first value). Second value is an error code.
#[error("Cannot create named pipe at path: {0}. Error code: {1}")]
CannotCreateNamedPipe(PathBuf, String),
/// Cannot spawn process due to unexpected an IO error.
#[error("Cannot spawn process: {0}")]
CannotSpawnProcess(#[from] io::Error),
/// Cannot create process directory due to an IO error.
#[error("Cannot create process directory: {0}")]
CannotCreateProcessWorkingDir(io::Error),
/// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
#[error("Bootstrap process failed: {0}")]
BootstrapProcessFailed(String),
}
impl From<SpawnerError> for SpawnProcessError {
fn from(value: SpawnerError) -> Self {
match value {
SpawnerError::CannotCreateNamedPipe(pipe_path, err) => {
SpawnProcessError::CannotCreateNamedPipe(pipe_path, err.to_string())
}
SpawnerError::CannotSpawnProcess(err) => SpawnProcessError::CannotSpawnProcess(err),
SpawnerError::CannotCreateProcessWorkingDir(err) => {
SpawnProcessError::CannotCreateProcessWorkingDir(err)
}
SpawnerError::BootstrapProcessFailed(err) => Self::BootstrapProcessFailed(err),
}
}
}
/// Error type returned when subscribing to a messages stream.
#[derive(thiserror::Error, Debug)]
pub enum ReadMessageError {
/// The process with given ID was not found (the ID is wrong or the process has been already killed).
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// Cannot get messages stream because a [`MessagingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
#[error("Message reader for process with id: {0} was not found")]
MessageReaderNotFound(ProcessId),
/// The task used to read messages from a process has been killed.
/// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
#[error("Message reader process has been killed")]
MessageReaderKilled,
}
/// Error type returned when sending a message to the process.
#[derive(thiserror::Error, Debug)]
pub enum WriteMessageError {
/// The process with given ID was not found (the ID is wrong or the process has been already killed).
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// Cannot serialize message to bytes. Includes error message.
#[error("{0}")]
CannotSerializeMessage(String),
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// An unexpected IO error occurred when sending a message to the process.
#[error("Error occurred when sending message to process: {0}")]
IoError(#[from] std::io::Error),
/// The task used to send messages to a process has been killed.
/// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
#[error("Message writer for process with id: {0} was not found")]
MessageWriterNotFound(ProcessId),
}
/// Error type returned when trying to kill the process.
#[derive(thiserror::Error, Debug)]
pub enum KillProcessError {
/// The process with given ID was not found (the ID is wrong or the process has been already killed).
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
}
/// Error type returned when collecting logs from the process.
#[derive(thiserror::Error, Debug)]
pub enum GetLogsError {
/// The process with given ID was not found (the ID is wrong or the process has been already killed).
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
#[error("Logging type: {0} was not configured for process")]
LoggingTypeWasNotConfigured(String),
/// The task used to reading logs from a process has been killed.
/// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
#[error("Log read for process with id: {0} was not found")]
LogReaderNotFound(ProcessId),
/// An unexpected IO error occurred when reading logs from the process.
#[error(transparent)]
UnExpectedIoError(#[from] io::Error),
}
impl From<LogReaderError> for GetLogsError {
fn from(err: LogReaderError) -> Self {
match err {
LogReaderError::LogTypeWasNotConfigured(log_type) => {
Self::LoggingTypeWasNotConfigured(log_type.to_string())
}
LogReaderError::UnExpectedIoError(err) => Self::UnExpectedIoError(err),
}
}
}
/// Error type returned when trying to get an information about the process.
#[derive(thiserror::Error, Debug)]
pub enum GetProcessInfoError {
/// The process with given ID was not found (the ID is wrong or the process has been already killed).
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// An unexpected IO error occurred when trying to get an information about the process.
#[error(transparent)]
UnExpectedIoError(#[from] io::Error),
}
/// Error type returned when trying to abort a [`ProcessManager`] task.
#[derive(thiserror::Error, Debug)]
pub enum AbortError {
/// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
#[error("Cannot communicate with spawned process manager")]
ManagerAlreadyAborted,
}