proc_heim/process/manager.rs
1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 io,
5 path::{Path, PathBuf},
6 time::Duration,
7};
8
9use tokio::{
10 sync::{
11 broadcast,
12 mpsc::{self},
13 oneshot::{self},
14 },
15 task::JoinHandle,
16};
17use tokio_stream::{
18 wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
19 Stream, StreamExt,
20};
21use uuid::Uuid;
22
23use crate::working_dir::WorkingDir;
24
25use super::{
26 log_reader::{LogReaderError, LogsQuery, LogsQueryType},
27 message::Message,
28 scoped_process_handle::ScopedProcessHandle,
29 spawner::ProcessSpawner,
30 ProcessHandle, ProcessInfo, Runnable,
31};
32use super::{
33 model::{Process, ProcessId},
34 spawner::SpawnerError,
35};
36
37#[derive(Debug)]
38enum ProcessManagerMessage {
39 SpawnProcess {
40 cmd: Box<dyn Runnable>,
41 responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
42 },
43 SubscribeMessageStream {
44 id: ProcessId,
45 responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
46 },
47 WriteMessage {
48 id: ProcessId,
49 data: Vec<u8>,
50 responder: oneshot::Sender<Result<(), WriteMessageError>>,
51 },
52 KillProcess {
53 id: ProcessId,
54 responder: oneshot::Sender<Result<(), KillProcessError>>,
55 },
56 GetLogs {
57 id: ProcessId,
58 logs_query_type: LogsQueryType,
59 query: LogsQuery,
60 responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
61 },
62 GetProcessInfo {
63 id: ProcessId,
64 responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
65 },
66 Abort {
67 responder: oneshot::Sender<()>,
68 },
69}
70
71/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
72///
73/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
74/// `ProcessManager` is a main actor responsible for:
75/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
76/// * forwarding messages between client and other actors.
77///
78/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
79/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
80/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
81/// If the process has been killed manually, then its `process directory` is removed immediately.
82///
83/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
84/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
85///
86/// Please note that once all manager handles are dropped, then all child processes will be killed and all `process directories` will be deleted.
87
88pub struct ProcessManager {
89 working_dir: WorkingDir,
90 process_spawner: ProcessSpawner,
91 processes: HashMap<ProcessId, Process>,
92 receiver: mpsc::Receiver<ProcessManagerMessage>,
93 is_aborted: bool,
94}
95
96impl ProcessManager {
97 /// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
98 ///
99 /// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
100 /// # Examples
101 /// ```no_run
102 /// # use proc_heim::manager::ProcessManager;
103 /// # use std::path::PathBuf;
104 /// let working_directory = PathBuf::from("/some/temp/path");
105 /// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
106 /// ```
107 pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
108 Self::validate_working_dir(&working_directory)?;
109 let (sender, receiver) = mpsc::channel(32);
110 let working_dir = WorkingDir::new(working_directory);
111 let mut manager = Self {
112 working_dir: working_dir.clone(),
113 process_spawner: ProcessSpawner::new(working_dir),
114 receiver,
115 processes: HashMap::new(),
116 is_aborted: false,
117 };
118 tokio::spawn(async move { manager.run().await });
119 Ok(ProcessManagerHandle::new(sender))
120 }
121
122 fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
123 let file_path = working_directory.join(Uuid::new_v4().to_string());
124 std::fs::write(&file_path, "testing working dir")?;
125 std::fs::remove_file(file_path)
126 }
127
128 async fn run(&mut self) {
129 loop {
130 if self.is_aborted {
131 break;
132 } else if let Some(msg) = self.receiver.recv().await {
133 self.handle_message(msg).await;
134 } else {
135 self.abort().await;
136 }
137 }
138 }
139
140 async fn handle_message(&mut self, msg: ProcessManagerMessage) {
141 match msg {
142 ProcessManagerMessage::SpawnProcess { cmd, responder } => {
143 let result = self.spawn_process(cmd).await;
144 let _ = responder.send(result);
145 }
146 ProcessManagerMessage::KillProcess { id, responder } => {
147 let result = self.kill_process(id).await;
148 let _ = responder.send(result);
149 }
150 ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
151 let result = self.subscribe_to_message_stream(id).await;
152 let _ = responder.send(result);
153 }
154 ProcessManagerMessage::WriteMessage {
155 id,
156 data,
157 responder,
158 } => {
159 let result = self.send_message(id, data).await;
160 let _ = responder.send(result);
161 }
162 ProcessManagerMessage::GetLogs {
163 id,
164 logs_query_type,
165 query,
166 responder,
167 } => {
168 let result = self.get_logs(id, logs_query_type, query).await;
169 let _ = responder.send(result);
170 }
171 ProcessManagerMessage::GetProcessInfo { id, responder } => {
172 let result = self.get_process_info(id);
173 let _ = responder.send(result);
174 }
175 ProcessManagerMessage::Abort { responder } => {
176 let result = self.abort().await;
177 let _ = responder.send(result);
178 }
179 }
180 }
181
182 async fn spawn_process(
183 &mut self,
184 runnable: Box<dyn Runnable>,
185 ) -> Result<ProcessId, SpawnProcessError> {
186 let id = ProcessId::random();
187 let process = self.process_spawner.spawn_runnable(&id, runnable)?;
188 self.processes.insert(id, process);
189 Ok(id)
190 }
191
192 async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
193 let process = self
194 .processes
195 .get_mut(&id)
196 .ok_or(KillProcessError::ProcessNotFound(id))?;
197 if let Some(reader) = process.message_reader.take() {
198 reader.abort().await;
199 }
200 if let Some(writer) = process.message_writer.take() {
201 writer.abort().await;
202 }
203 if let Some(log_reader) = process.log_reader.take() {
204 log_reader.abort().await;
205 }
206 self.processes.remove(&id); // kill_on_drop() is used to kill child process
207 let process_dir = self.working_dir.process_dir(&id);
208 let _ = tokio::fs::remove_dir_all(process_dir).await;
209 Ok(())
210 }
211
212 async fn subscribe_to_message_stream(
213 &mut self,
214 id: ProcessId,
215 ) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
216 let reader = self
217 .processes
218 .get(&id)
219 .ok_or(ReadMessageError::ProcessNotFound(id))?
220 .message_reader
221 .as_ref()
222 .ok_or(ReadMessageError::MessageReaderNotFound(id))?;
223
224 let receiver = reader
225 .subscribe()
226 .await
227 .map_err(|_| ReadMessageError::MessageReaderKilled)?;
228 Ok(receiver)
229 }
230
231 async fn send_message(
232 &mut self,
233 id: ProcessId,
234 data: Vec<u8>,
235 ) -> Result<(), WriteMessageError> {
236 self.processes
237 .get(&id)
238 .ok_or(WriteMessageError::ProcessNotFound(id))?
239 .message_writer
240 .as_ref()
241 .ok_or(WriteMessageError::MessageWriterNotFound(id))?
242 .write(data)
243 .await
244 .map_err(Into::into)
245 }
246
247 async fn get_logs(
248 &mut self,
249 id: ProcessId,
250 logs_query_type: LogsQueryType,
251 query: LogsQuery,
252 ) -> Result<Vec<String>, GetLogsError> {
253 self.processes
254 .get(&id)
255 .ok_or(GetLogsError::ProcessNotFound(id))?
256 .log_reader
257 .as_ref()
258 .ok_or(GetLogsError::LogReaderNotFound(id))?
259 .read_logs(logs_query_type, query)
260 .await
261 .map_err(Into::into)
262 }
263
264 fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
265 let process = self
266 .processes
267 .get_mut(&id)
268 .ok_or(GetProcessInfoError::ProcessNotFound(id))?;
269 let pid = process.child.id();
270 let exit_status = process.child.try_wait()?;
271 Ok(ProcessInfo::new(pid, exit_status))
272 }
273
274 async fn abort(&mut self) {
275 self.is_aborted = true;
276 self.receiver.close();
277 let ids: Vec<ProcessId> = self.processes.keys().cloned().collect();
278 for id in ids {
279 let _ = self.kill_process(id).await;
280 }
281 }
282}
283
284impl Drop for ProcessManager {
285 fn drop(&mut self) {
286 self.processes.keys().for_each(|id| {
287 let process_dir = self.working_dir.process_dir(id);
288 let _ = std::fs::remove_dir_all(process_dir);
289 // All processes will be killed automatically, thanks to kill_on_drop()
290 });
291 }
292}
293
294/// Type used for communication with [`ProcessManager`] task.
295///
296/// It provides asynchronous API for:
297/// * spawning new child processes, which implements [`Runnable`] trait,
298/// * sending messages to spawned processes as:
299/// * raw bytes,
300/// * strings,
301/// * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
302/// * receiving messages from spawned processes using asynchronous streams, including:
303/// * reading messages by multiple subscribers,
304/// * buffering not read messages with configurable buffer capacity,
305/// * communication via standard I/O or named pipes.
306/// * reading logs produced by spawned processes (from standard output/error streams),
307/// * fetching information about spawned processes (OS-assigned pid, exit status),
308/// * forcefully killing processes,
309/// * waiting processes to finish.
310///
311/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
312/// The handle can be cheaply cloned and used safely by many threads.
313///
314/// Please note that once all manager handles are dropped, then all child processes will be killed and all `process directories` will be deleted.
315#[derive(Clone, Debug)]
316pub struct ProcessManagerHandle {
317 sender: mpsc::Sender<ProcessManagerMessage>,
318}
319
320impl ProcessManagerHandle {
321 fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
322 Self { sender }
323 }
324
325 /// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
326 /// # Examples
327 /// ```
328 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
329 ///
330 /// # #[tokio::main]
331 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
332 /// let working_dir = tempfile::tempdir()?.into_path();
333 /// let handle = ProcessManager::spawn(working_dir)?;
334 /// let cmd = Cmd::new("echo");
335 /// let process_id = handle.spawn(cmd).await?;
336 /// # Ok(())
337 /// # }
338 pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
339 let (responder, receiver) = oneshot::channel();
340 let msg = ProcessManagerMessage::SpawnProcess {
341 cmd: Box::new(runnable),
342 responder,
343 };
344 let _ = self.sender.send(msg).await;
345 let process_id = receiver.await??;
346 Ok(process_id)
347 }
348
349 /// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
350 pub async fn spawn_with_handle(
351 &self,
352 runnable: impl Runnable,
353 ) -> Result<ProcessHandle, SpawnProcessError> {
354 let id = self.spawn(runnable).await?;
355 Ok(ProcessHandle::new(id, self.clone()))
356 }
357
358 /// Spawn a new child process, returning a [`ScopedProcessHandle`], which can be used to interact with the process.
359 /// Unlike [`ProcessHandle`], dropping this handle will kill the associated child process.
360 pub async fn spawn_with_scoped_handle(
361 &self,
362 runnable: impl Runnable,
363 ) -> Result<ScopedProcessHandle, SpawnProcessError> {
364 let id = self.spawn(runnable).await?;
365 Ok(ScopedProcessHandle::new(ProcessHandle::new(
366 id,
367 self.clone(),
368 )))
369 }
370
371 /// Send a [`Message`] to the process with given `id`.
372 /// # Examples
373 /// ```
374 /// use futures::TryStreamExt;
375 /// use proc_heim::{
376 /// manager::ProcessManager,
377 /// model::{
378 /// command::CmdOptions,
379 /// script::{Script, ScriptingLanguage},
380 /// },
381 /// };
382 ///
383 /// # #[tokio::main]
384 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
385 /// let working_dir = tempfile::tempdir()?.into_path();
386 /// let handle = ProcessManager::spawn(working_dir)?;
387 ///
388 /// let script = Script::with_options(
389 /// ScriptingLanguage::Bash,
390 /// r#"
391 /// read -r msg < /dev/stdin
392 /// echo "Hello $msg"
393 /// "#,
394 /// CmdOptions::with_standard_io_messaging(),
395 /// );
396 ///
397 /// let process_id = handle.spawn(script).await?;
398 /// handle.send_message(process_id, "John").await?;
399 /// let mut stream = handle.subscribe_message_stream(process_id).await?;
400 /// let received_msg = stream.try_next().await?.unwrap();
401 /// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
402 /// # Ok(())
403 /// # }
404 /// ```
405 pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
406 where
407 M: Into<Message>,
408 {
409 if let Some(data) = Self::into_bytes_with_eol_char(message.into().bytes) {
410 let (responder, receiver) = oneshot::channel();
411 let msg = ProcessManagerMessage::WriteMessage {
412 id,
413 data,
414 responder,
415 };
416 let _ = self.sender.send(msg).await;
417 receiver.await??;
418 }
419 Ok(())
420 }
421
422 fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
423 if let Some(last_char) = bytes.last() {
424 if *last_char != b'\n' {
425 bytes.push(b'\n');
426 }
427 Some(bytes)
428 } else {
429 None
430 }
431 }
432
433 /// Access asynchronous message stream from the process with given `id`.
434 ///
435 /// Messages read from the process are returned as [`Message`] types,
436 /// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
437 /// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
438 /// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
439 /// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
440 ///
441 /// `ProcessManager` for each child process assigns a buffer for messages received from it,
442 /// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
443 // NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
444 /// When parent process is not reading messages produced by the process,
445 /// then the messages are buffered up to the given capacity value.
446 /// If the buffer limit is reached and the process sends a new message,
447 /// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
448 /// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
449 ///
450 /// The messages stream can be subscribed multiple times and each subscriber will receive a one copy of the original message.
451 /// Notice that buffer mentioned earlier is created not per subscriber, but per each process, so when one of subscribers not read messages, the buffer will fill up.
452 /// # Examples
453 ///
454 /// Reading a message via standard IO:
455 ///
456 /// ```
457 /// use futures::TryStreamExt;
458 /// use proc_heim::{
459 /// manager::{ProcessManager, TryMessageStreamExt},
460 /// model::command::{Cmd, CmdOptions, MessagingType},
461 /// };
462 ///
463 /// # #[tokio::main]
464 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
465 /// // spawn ProcessManager
466 /// let working_dir = tempfile::tempdir()?.into_path();
467 /// let handle = ProcessManager::spawn(working_dir)?;
468 ///
469 /// // create simple echo command
470 /// let msg = "Hello world!";
471 /// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
472 /// let cmd = Cmd::with_args_and_options("echo", [msg], options);
473 ///
474 /// // read a message from spawned process
475 /// let process_id = handle.spawn(cmd).await?;
476 /// let mut stream = handle.subscribe_message_stream(process_id).await?
477 /// .into_string_stream();
478 /// let received_msg = stream.try_next().await?.unwrap();
479 /// assert_eq!(msg, received_msg);
480 /// # Ok(())
481 /// # }
482 /// ```
483 ///
484 /// Reading messages via named pipes:
485 ///
486 /// ```
487 /// use futures::{StreamExt, TryStreamExt};
488 /// use proc_heim::{
489 /// manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
490 /// model::{
491 /// command::CmdOptions,
492 /// script::{Script, ScriptingLanguage},
493 /// },
494 /// };
495 /// use std::path::PathBuf;
496 ///
497 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
498 /// let working_dir = tempfile::tempdir()?.into_path();
499 /// let handle = ProcessManager::spawn(working_dir)?;
500 /// let script = Script::with_options(
501 /// ScriptingLanguage::Bash,
502 /// r#"
503 /// counter=0
504 /// while read msg; do
505 /// echo "$counter: $msg" > $OUTPUT_PIPE
506 /// counter=$((counter + 1))
507 /// done < $INPUT_PIPE
508 /// "#,
509 /// CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
510 /// );
511 ///
512 /// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
513 /// // which mimics the "ProcessManagerHandle" API,
514 /// // but without having to pass the process ID to each method call.
515 /// let process_handle = handle.spawn_with_handle(script).await?;
516 ///
517 /// process_handle.send_message("First message").await?;
518 /// // We can send a next message without causing a deadlock here.
519 /// // This is possible because the response to the first message
520 /// // will be read by a dedicated Tokio task,
521 /// // spawned automatically by the Process Manager.
522 /// process_handle.send_message("Second message").await?;
523 ///
524 /// let mut stream = process_handle
525 /// .subscribe_message_stream()
526 /// .await?
527 /// .into_string_stream();
528 ///
529 /// assert_eq!("0: First message", stream.try_next().await?.unwrap());
530 /// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
531 /// # Ok(()) }
532 /// ```
533 pub async fn subscribe_message_stream(
534 &self,
535 id: ProcessId,
536 ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
537 let (responder, receiver) = oneshot::channel();
538 let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
539 let _ = self.sender.send(msg).await;
540 let message_receiver = receiver.await??;
541 let stream = BroadcastStream::new(message_receiver)
542 .map(|v| v.map(Message::from_bytes).map_err(Into::into));
543 Ok(stream)
544 }
545
546 /// Fetch logs from standard `output` stream, produced by a process with given `id`.
547 ///
548 /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
549 /// Notice that each string ending with new line character (`\n`) is treated as one log.
550 /// # Examples
551 /// ```
552 /// use proc_heim::{
553 /// manager::{LogsQuery, ProcessManager},
554 /// model::{
555 /// command::{CmdOptions, LoggingType},
556 /// script::{Script, ScriptingLanguage},
557 /// },
558 /// };
559 /// use std::time::Duration;
560 ///
561 /// # #[tokio::main]
562 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
563 /// let working_dir = tempfile::tempdir()?.into_path();
564 /// let handle = ProcessManager::spawn(working_dir)?;
565 /// let script = Script::with_options(
566 /// ScriptingLanguage::Bash,
567 /// r#"
568 /// echo message1
569 /// echo message2
570 /// "#,
571 /// CmdOptions::with_logging(LoggingType::StdoutOnly),
572 /// );
573 ///
574 /// let process_id = handle.spawn(script).await?;
575 ///
576 /// // We need to wait for the process to spawn and finish in order to collect logs,
577 /// // otherwise returned logs will be empty.
578 /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
579 ///
580 /// let logs = handle
581 /// .get_logs_stdout(process_id, LogsQuery::fetch_all())
582 /// .await?;
583 /// assert_eq!(2, logs.len());
584 /// assert_eq!("message1", logs[0]);
585 /// assert_eq!("message2", logs[1]);
586 /// # Ok(())
587 /// # }
588 /// ```
589 pub async fn get_logs_stdout(
590 &self,
591 id: ProcessId,
592 query: LogsQuery,
593 ) -> Result<Vec<String>, GetLogsError> {
594 self.get_logs(id, LogsQueryType::Stdout, query).await
595 }
596
597 /// Fetch logs from standard `error` stream, produced by a process with given `id`.
598 ///
599 /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
600 /// Notice that each string ending with new line character (`\n`) is treated as one log.
601 /// # Examples
602 /// ```
603 /// use proc_heim::{
604 /// manager::{LogsQuery, ProcessManager},
605 /// model::{
606 /// command::{CmdOptions, LoggingType},
607 /// script::{Script, ScriptingLanguage},
608 /// },
609 /// };
610 /// use std::time::Duration;
611 ///
612 /// # #[tokio::main]
613 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
614 /// let working_dir = tempfile::tempdir()?.into_path();
615 /// let handle = ProcessManager::spawn(working_dir)?;
616 /// let script = Script::with_options(
617 /// ScriptingLanguage::Bash,
618 /// r#"
619 /// echo message1 >&2
620 /// echo message2 >&2
621 /// echo message3 >&2
622 /// "#,
623 /// CmdOptions::with_logging(LoggingType::StderrOnly),
624 /// );
625 ///
626 /// let process_id = handle.spawn(script).await?;
627 ///
628 /// // We need to wait for the process to spawn and finish in order to collect logs,
629 /// // otherwise returned logs will be empty.
630 /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
631 ///
632 /// let logs = handle
633 /// .get_logs_stderr(process_id, LogsQuery::with_offset(1))
634 /// .await?;
635 /// assert_eq!(2, logs.len());
636 /// assert_eq!("message2", logs[0]);
637 /// assert_eq!("message3", logs[1]);
638 /// # Ok(())
639 /// # }
640 pub async fn get_logs_stderr(
641 &self,
642 id: ProcessId,
643 query: LogsQuery,
644 ) -> Result<Vec<String>, GetLogsError> {
645 self.get_logs(id, LogsQueryType::Stderr, query).await
646 }
647
648 async fn get_logs(
649 &self,
650 id: ProcessId,
651 logs_query_type: LogsQueryType,
652 query: LogsQuery,
653 ) -> Result<Vec<String>, GetLogsError> {
654 let (responder, receiver) = oneshot::channel();
655 let msg = ProcessManagerMessage::GetLogs {
656 id,
657 logs_query_type,
658 query,
659 responder,
660 };
661 let _ = self.sender.send(msg).await;
662 let logs = receiver.await??;
663 Ok(logs)
664 }
665
666 /// Get information about the process with given `id`.
667 /// # Examples
668 /// Check information of running process:
669 /// ```
670 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
671 ///
672 /// # #[tokio::main]
673 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
674 /// let working_dir = tempfile::tempdir()?.into_path();
675 /// let handle = ProcessManager::spawn(working_dir)?;
676 /// let process_id = handle.spawn(Cmd::new("cat")).await?;
677 ///
678 /// let process_info = handle.get_process_info(process_id).await?;
679 /// assert!(process_info.pid().is_some());
680 /// # Ok(())
681 /// # }
682 /// ```
683 pub async fn get_process_info(
684 &self,
685 id: ProcessId,
686 ) -> Result<ProcessInfo, GetProcessInfoError> {
687 let (responder, receiver) = oneshot::channel();
688 let msg = ProcessManagerMessage::GetProcessInfo { id, responder };
689 let _ = self.sender.send(msg).await;
690 let data = receiver.await??;
691 Ok(data)
692 }
693
694 /// Wait for the process with given `id` to finish.
695 ///
696 /// This method will spawn a `Tokio` task and check in loop, if the process exited.
697 /// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
698 /// # Examples
699 /// ```
700 /// use std::time::Duration;
701 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
702 ///
703 /// # #[tokio::main]
704 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
705 /// let working_dir = tempfile::tempdir()?.into_path();
706 /// let handle = ProcessManager::spawn(working_dir)?;
707 /// let process_id = handle.spawn(Cmd::new("env")).await?;
708 ///
709 /// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
710 ///
711 /// assert!(!process_info.is_running());
712 /// assert!(process_info.exit_status().unwrap().success());
713 /// # Ok(())
714 /// # }
715 /// ```
716 pub fn wait(
717 &self,
718 id: ProcessId,
719 poll_interval: Duration,
720 ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
721 let handle = self.clone();
722 tokio::spawn(async move {
723 loop {
724 let process_info = handle.get_process_info(id).await?;
725 if process_info.is_running() {
726 tokio::time::sleep(poll_interval).await;
727 } else {
728 return Ok(process_info);
729 }
730 }
731 })
732 }
733
734 /// Forcefully kills the process with given `id`.
735 ///
736 /// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
737 /// # Examples
738 /// ```
739 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
740 ///
741 /// # #[tokio::main]
742 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
743 /// let working_dir = tempfile::tempdir()?.into_path();
744 /// let handle = ProcessManager::spawn(working_dir)?;
745 /// let process_id = handle.spawn(Cmd::new("cat")).await?;
746 ///
747 /// let result = handle.kill(process_id).await;
748 ///
749 /// assert!(result.is_ok());
750 /// # Ok(())
751 /// # }
752 /// ```
753 pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
754 let (responder, receiver) = oneshot::channel();
755 let msg = ProcessManagerMessage::KillProcess { id, responder };
756 let _ = self.sender.send(msg).await;
757 receiver.await??;
758 Ok(())
759 }
760
761 /// Abort a [`ProcessManager`] task associated with this handle. This will kill all child processes and delete their corresponding `process directories`.
762 /// Please note that [`ProcessManager`] will be aborted automatically when all of its handles are dropped.
763 pub async fn abort(&self) -> Result<(), AbortError> {
764 let (responder, receiver) = oneshot::channel();
765 let msg = ProcessManagerMessage::Abort { responder };
766 self.sender
767 .send(msg)
768 .await
769 .map_err(|_| AbortError::ManagerAlreadyAborted)?;
770 receiver
771 .await
772 .map_err(|_| AbortError::ManagerAlreadyAborted)?;
773 Ok(())
774 }
775
776 pub(crate) fn try_kill(&self, id: ProcessId) -> bool {
777 let (responder, _) = oneshot::channel();
778 let msg = ProcessManagerMessage::KillProcess { id, responder };
779 self.sender.try_send(msg).is_ok()
780 }
781
782 pub(crate) async fn kill_with_timeout(&self, id: ProcessId, duration: Duration) {
783 let (responder, receiver) = oneshot::channel();
784 let msg = ProcessManagerMessage::KillProcess { id, responder };
785 let _ = self.sender.send_timeout(msg, duration).await;
786 let _ = receiver.await;
787 }
788}
789
790/// Error type returned when reading message bytes from a messages stream.
791#[derive(thiserror::Error, Debug)]
792pub enum ReceiveMessageError {
793 /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
794 /// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
795 /// Includes the number of deleted messages.
796 /// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
797 #[error("Lost {0} number of messages due to buffer capacity overflow")]
798 LostMessages(u64),
799}
800
801/// Error type returned when reading a deserializable message from a messages stream.
802#[derive(thiserror::Error, Debug)]
803pub enum ReceiveDeserializedMessageError {
804 /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
805 /// See [`ReceiveMessageError`] for more information.
806 #[error("Lost {0} number of messages due to buffer capacity overflow")]
807 LostMessages(u64),
808 /// Error indicating that a message cannot be deserialized from bytes. Includes error message.
809 #[error("{0}")]
810 CannotDeserializeMessage(String),
811}
812
813impl From<BroadcastStreamRecvError> for ReceiveMessageError {
814 fn from(err: BroadcastStreamRecvError) -> Self {
815 match err {
816 BroadcastStreamRecvError::Lagged(size) => ReceiveMessageError::LostMessages(size),
817 }
818 }
819}
820
821impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
822 fn from(value: ReceiveMessageError) -> Self {
823 match value {
824 ReceiveMessageError::LostMessages(number) => {
825 ReceiveDeserializedMessageError::LostMessages(number)
826 }
827 }
828 }
829}
830
831/// Error type returned when spawning a new child process.
832#[derive(thiserror::Error, Debug)]
833pub enum SpawnProcessError {
834 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
835 #[error("Cannot communicate with spawned process manager")]
836 ManagerCommunicationError(#[from] oneshot::error::RecvError),
837 /// Cannot create named pipes at process directory (first value). Second value is an error code.
838 #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
839 CannotCreateNamedPipe(PathBuf, String),
840 /// Cannot spawn process due to unexpected an IO error.
841 #[error("Cannot spawn process: {0}")]
842 CannotSpawnProcess(#[from] io::Error),
843 /// Cannot create process directory due to an IO error.
844 #[error("Cannot create process directory: {0}")]
845 CannotCreateProcessWorkingDir(io::Error),
846 /// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
847 #[error("Bootstrap process failed: {0}")]
848 BootstrapProcessFailed(String),
849}
850
851impl From<SpawnerError> for SpawnProcessError {
852 fn from(value: SpawnerError) -> Self {
853 match value {
854 SpawnerError::CannotCreateNamedPipe(pipe_path, err) => {
855 SpawnProcessError::CannotCreateNamedPipe(pipe_path, err.to_string())
856 }
857 SpawnerError::CannotSpawnProcess(err) => SpawnProcessError::CannotSpawnProcess(err),
858 SpawnerError::CannotCreateProcessWorkingDir(err) => {
859 SpawnProcessError::CannotCreateProcessWorkingDir(err)
860 }
861 SpawnerError::BootstrapProcessFailed(err) => Self::BootstrapProcessFailed(err),
862 }
863 }
864}
865
/// Error type returned when subscribing to a process's message stream.
#[derive(thiserror::Error, Debug)]
pub enum ReadMessageError {
    /// The process with the given ID was not found (the ID is wrong or the process has already been killed).
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with the spawned process manager: the response channel was closed.
    /// Probably the process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// Cannot get the message stream because a [`MessagingType`](enum@crate::model::command::MessagingType)
    /// has not been configured for the process.
    #[error("Message reader for process with id: {0} was not found")]
    MessageReaderNotFound(ProcessId),
    /// The task used to read messages from the process has been killed.
    ///
    /// This error "shouldn't" normally occur, because the task is aborted when its process is killed,
    /// after which `ProcessNotFound` is returned instead.
    #[error("Message reader process has been killed")]
    MessageReaderKilled,
}
883
884/// Error type returned when sending a message to the process.
885#[derive(thiserror::Error, Debug)]
886pub enum WriteMessageError {
887 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
888 #[error("Process with id: {0} was not found")]
889 ProcessNotFound(ProcessId),
890 /// Cannot serialize message to bytes. Includes error message.
891 #[error("{0}")]
892 CannotSerializeMessage(String),
893 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
894 #[error("Cannot communicate with spawned process manager")]
895 ManagerCommunicationError(#[from] oneshot::error::RecvError),
896 /// An unexpected IO error occurred when sending a message to the process.
897 #[error("Error occurred when sending message to process: {0}")]
898 IoError(#[from] std::io::Error),
899 /// The task used to send messages to a process has been killed.
900 /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
901 #[error("Message writer for process with id: {0} was not found")]
902 MessageWriterNotFound(ProcessId),
903}
904
/// Error type returned when trying to kill a process.
#[derive(thiserror::Error, Debug)]
pub enum KillProcessError {
    /// The process with the given ID was not found (the ID is wrong or the process has already been killed).
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with the spawned process manager: the response channel was closed.
    /// Probably the process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
}
915
916/// Error type returned when collecting logs from the process.
917#[derive(thiserror::Error, Debug)]
918pub enum GetLogsError {
919 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
920 #[error("Process with id: {0} was not found")]
921 ProcessNotFound(ProcessId),
922 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
923 #[error("Cannot communicate with spawned process manager")]
924 ManagerCommunicationError(#[from] oneshot::error::RecvError),
925 /// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
926 #[error("Logging type: {0} was not configured for process")]
927 LoggingTypeWasNotConfigured(String),
928 /// The task used to reading logs from a process has been killed.
929 /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
930 #[error("Log read for process with id: {0} was not found")]
931 LogReaderNotFound(ProcessId),
932 /// An unexpected IO error occurred when reading logs from the process.
933 #[error(transparent)]
934 UnExpectedIoError(#[from] io::Error),
935}
936
937impl From<LogReaderError> for GetLogsError {
938 fn from(err: LogReaderError) -> Self {
939 match err {
940 LogReaderError::LogTypeWasNotConfigured(log_type) => {
941 Self::LoggingTypeWasNotConfigured(log_type.to_string())
942 }
943 LogReaderError::UnExpectedIoError(err) => Self::UnExpectedIoError(err),
944 }
945 }
946}
947
/// Error type returned when trying to get information about a process.
#[derive(thiserror::Error, Debug)]
pub enum GetProcessInfoError {
    /// The process with the given ID was not found (the ID is wrong or the process has already been killed).
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with the spawned process manager: the response channel was closed.
    /// Probably the process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// An unexpected IO error occurred when trying to get information about the process.
    /// Displayed transparently, i.e. as the underlying IO error's own message.
    #[error(transparent)]
    UnExpectedIoError(#[from] io::Error),
}
961
/// Error type returned when trying to abort a [`ProcessManager`] task.
#[derive(thiserror::Error, Debug)]
pub enum AbortError {
    /// Cannot communicate with the spawned process manager.
    /// Probably the process manager task has already been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerAlreadyAborted,
}