proc_heim/process/
manager.rs

1use std::{
2    collections::HashMap,
3    fmt::Debug,
4    io,
5    path::{Path, PathBuf},
6    time::Duration,
7};
8
9use tokio::{
10    sync::{
11        broadcast, mpsc,
12        oneshot::{self},
13    },
14    task::JoinHandle,
15};
16use tokio_stream::{
17    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
18    Stream, StreamExt,
19};
20use uuid::Uuid;
21
22use crate::working_dir::WorkingDir;
23
24use super::{
25    log_reader::{LogReaderError, LogsQuery, LogsQueryType},
26    message::Message,
27    spawner::ProcessSpawner,
28    ProcessHandle, ProcessInfo, Runnable,
29};
30use super::{
31    model::{Process, ProcessId},
32    spawner::SpawnerError,
33};
34
35#[derive(Debug)]
36enum ProcessManagerMessage {
37    SpawnProcess {
38        cmd: Box<dyn Runnable>,
39        responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
40    },
41    SubscribeMessageStream {
42        id: ProcessId,
43        responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
44    },
45    WriteMessage {
46        id: ProcessId,
47        data: Vec<u8>,
48        responder: oneshot::Sender<Result<(), WriteMessageError>>,
49    },
50    KillProcess {
51        id: ProcessId,
52        responder: oneshot::Sender<Result<(), KillProcessError>>,
53    },
54    GetLogs {
55        id: ProcessId,
56        logs_query_type: LogsQueryType,
57        query: LogsQuery,
58        responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
59    },
60    GetProcessInfo {
61        id: ProcessId,
62        responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
63    },
64    Abort {
65        responder: oneshot::Sender<Result<(), AbortError>>,
66    },
67}
68
69/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
70///
71/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
72/// `ProcessManager` is a main actor responsible for:
73/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
74/// * forwarding messages between client and other actors.
75///
76/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
77/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
78/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
79/// If the process has been killed manually, then its `process directory` is removed immediately.
80///
81/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
82/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
83pub struct ProcessManager {
84    working_dir: WorkingDir,
85    process_spawner: ProcessSpawner,
86    processes: HashMap<ProcessId, Process>,
87    receiver: mpsc::Receiver<ProcessManagerMessage>,
88    is_aborted: bool,
89}
90
91impl ProcessManager {
92    /// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
93    ///
94    /// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
95    /// # Examples
96    /// ```no_run
97    /// # use proc_heim::manager::ProcessManager;
98    /// # use std::path::PathBuf;
99    /// let working_directory = PathBuf::from("/some/temp/path");
100    /// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
101    /// ```
102    pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
103        Self::validate_working_dir(&working_directory)?;
104        let (sender, receiver) = mpsc::channel(32);
105        let working_dir = WorkingDir::new(working_directory);
106        let mut manager = Self {
107            working_dir: working_dir.clone(),
108            process_spawner: ProcessSpawner::new(working_dir),
109            receiver,
110            processes: HashMap::new(),
111            is_aborted: false,
112        };
113        tokio::spawn(async move { manager.run().await });
114        Ok(ProcessManagerHandle::new(sender))
115    }
116
117    fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
118        let file_path = working_directory.join(Uuid::new_v4().to_string());
119        std::fs::write(&file_path, "testing working dir")?;
120        std::fs::remove_file(file_path)
121    }
122
123    async fn run(&mut self) {
124        loop {
125            if self.is_aborted {
126                break;
127            } else if let Some(msg) = self.receiver.recv().await {
128                self.handle_message(msg).await;
129            } else {
130                break;
131            }
132        }
133    }
134
135    async fn handle_message(&mut self, msg: ProcessManagerMessage) {
136        match msg {
137            ProcessManagerMessage::SpawnProcess { cmd, responder } => {
138                let result = self.spawn_process(cmd).await;
139                let _ = responder.send(result);
140            }
141            ProcessManagerMessage::KillProcess { id, responder } => {
142                let result = self.kill_process(id).await;
143                let _ = responder.send(result);
144            }
145            ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
146                let result = self.subscribe_to_message_stream(id).await;
147                let _ = responder.send(result);
148            }
149            ProcessManagerMessage::WriteMessage {
150                id,
151                data,
152                responder,
153            } => {
154                let result = self.send_message(id, data).await;
155                let _ = responder.send(result);
156            }
157            ProcessManagerMessage::GetLogs {
158                id,
159                logs_query_type,
160                query,
161                responder,
162            } => {
163                let result = self.get_logs(id, logs_query_type, query).await;
164                let _ = responder.send(result);
165            }
166            ProcessManagerMessage::GetProcessInfo { id, responder } => {
167                let result = self.get_process_info(id);
168                let _ = responder.send(result);
169            }
170            ProcessManagerMessage::Abort { responder } => {
171                let result = self.abort().await;
172                let _ = responder.send(result);
173            }
174        }
175    }
176
177    async fn spawn_process(
178        &mut self,
179        runnable: Box<dyn Runnable>,
180    ) -> Result<ProcessId, SpawnProcessError> {
181        let id = ProcessId::random();
182        let process = self.process_spawner.spawn_runnable(&id, runnable)?;
183        self.processes.insert(id, process);
184        Ok(id)
185    }
186
187    async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
188        let process = self
189            .processes
190            .get_mut(&id)
191            .ok_or(KillProcessError::ProcessNotFound(id))?;
192        if let Some(reader) = process.message_reader.take() {
193            reader.abort().await;
194        }
195        if let Some(writer) = process.message_writer.take() {
196            writer.abort().await;
197        }
198        if let Some(log_reader) = process.log_reader.take() {
199            log_reader.abort().await;
200        }
201        self.processes.remove(&id); // kill_on_drop() is used to kill child process
202        let process_dir = self.working_dir.process_dir(&id);
203        let _ = tokio::fs::remove_dir_all(process_dir).await;
204        Ok(())
205    }
206
207    async fn subscribe_to_message_stream(
208        &mut self,
209        id: ProcessId,
210    ) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
211        let reader = self
212            .processes
213            .get(&id)
214            .ok_or(ReadMessageError::ProcessNotFound(id))?
215            .message_reader
216            .as_ref()
217            .ok_or(ReadMessageError::MessageReaderNotFound(id))?;
218
219        let receiver = reader
220            .subscribe()
221            .await
222            .map_err(|_| ReadMessageError::MessageReaderKilled)?;
223        Ok(receiver)
224    }
225
226    async fn send_message(
227        &mut self,
228        id: ProcessId,
229        data: Vec<u8>,
230    ) -> Result<(), WriteMessageError> {
231        self.processes
232            .get(&id)
233            .ok_or(WriteMessageError::ProcessNotFound(id))?
234            .message_writer
235            .as_ref()
236            .ok_or(WriteMessageError::MessageWriterNotFound(id))?
237            .write(data)
238            .await
239            .map_err(Into::into)
240    }
241
242    async fn get_logs(
243        &mut self,
244        id: ProcessId,
245        logs_query_type: LogsQueryType,
246        query: LogsQuery,
247    ) -> Result<Vec<String>, GetLogsError> {
248        self.processes
249            .get(&id)
250            .ok_or(GetLogsError::ProcessNotFound(id))?
251            .log_reader
252            .as_ref()
253            .ok_or(GetLogsError::LogReaderNotFound(id))?
254            .read_logs(logs_query_type, query)
255            .await
256            .map_err(Into::into)
257    }
258
259    fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
260        let process = self
261            .processes
262            .get_mut(&id)
263            .ok_or(GetProcessInfoError::ProcessNotFound(id))?;
264        let pid = process.child.id();
265        let exit_status = process.child.try_wait()?;
266        Ok(ProcessInfo::new(pid, exit_status))
267    }
268
269    async fn abort(&mut self) -> Result<(), AbortError> {
270        self.is_aborted = true;
271        self.receiver.close();
272        let ids: Vec<ProcessId> = self.processes.keys().cloned().collect();
273        for id in ids {
274            let _ = self.kill_process(id).await;
275        }
276        Ok(())
277    }
278}
279
280/// Type used for communication with [`ProcessManager`] task.
281///
282/// It provides asynchronous API for:
283/// * spawning new child processes, which implements [`Runnable`] trait,
284/// * sending messages to spawned processes as:
285///     * raw bytes,
286///     * strings,
287///     * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
288/// * receiving messages from spawned processes using asynchronous streams, including:
289///     * reading messages by multiple subscribers,
290///     * buffering not read messages with configurable buffer capacity,
291///     * communication via standard I/O or named pipes.
292/// * reading logs produced by spawned processes (from standard output/error streams),
293/// * fetching information about spawned processes (OS-assigned pid, exit status),
294/// * forcefully killing processes,
295/// * waiting processes to finish.
296///
297/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
298/// The handle can be cheaply cloned and used safely by many threads.
299#[derive(Clone, Debug)]
300pub struct ProcessManagerHandle {
301    sender: mpsc::Sender<ProcessManagerMessage>,
302}
303
304impl ProcessManagerHandle {
305    fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
306        Self { sender }
307    }
308
309    /// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
310    /// # Examples
311    /// ```
312    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
313    ///
314    /// # #[tokio::main]
315    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
316    /// let working_dir = tempfile::tempdir()?.into_path();
317    /// let handle = ProcessManager::spawn(working_dir)?;
318    /// let cmd = Cmd::new("echo");
319    /// let process_id = handle.spawn(cmd).await?;
320    /// # Ok(())
321    /// # }
322    pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
323        let (responder, receiver) = oneshot::channel();
324        let msg = ProcessManagerMessage::SpawnProcess {
325            cmd: Box::new(runnable),
326            responder,
327        };
328        let _ = self.sender.send(msg).await;
329        let process_id = receiver.await??;
330        Ok(process_id)
331    }
332
333    /// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
334    pub async fn spawn_with_handle(
335        &self,
336        runnable: impl Runnable,
337    ) -> Result<ProcessHandle, SpawnProcessError> {
338        let id = self.spawn(runnable).await?;
339        Ok(ProcessHandle::new(id, self.clone()))
340    }
341
342    /// Send a [`Message`] to the process with given `id`.
343    /// # Examples
344    /// ```
345    /// use futures::TryStreamExt;
346    /// use proc_heim::{
347    ///     manager::ProcessManager,
348    ///     model::{
349    ///         command::CmdOptions,
350    ///         script::{Script, ScriptingLanguage},
351    ///     },
352    /// };
353    ///
354    /// # #[tokio::main]
355    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
356    /// let working_dir = tempfile::tempdir()?.into_path();
357    /// let handle = ProcessManager::spawn(working_dir)?;
358    ///
359    /// let script = Script::with_options(
360    ///     ScriptingLanguage::Bash,
361    ///     r#"
362    ///     read -r msg < /dev/stdin
363    ///     echo "Hello $msg"
364    ///     "#,
365    ///     CmdOptions::with_standard_io_messaging(),
366    /// );
367    ///
368    /// let process_id = handle.spawn(script).await?;
369    /// handle.send_message(process_id, "John").await?;
370    /// let mut stream = handle.subscribe_message_stream(process_id).await?;
371    /// let received_msg = stream.try_next().await?.unwrap();
372    /// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
373    /// # Ok(())
374    /// # }
375    /// ```
376    pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
377    where
378        M: Into<Message>,
379    {
380        if let Some(data) = Self::into_bytes_with_eol_char(message.into().bytes) {
381            let (responder, receiver) = oneshot::channel();
382            let msg = ProcessManagerMessage::WriteMessage {
383                id,
384                data,
385                responder,
386            };
387            let _ = self.sender.send(msg).await;
388            receiver.await??;
389        }
390        Ok(())
391    }
392
393    fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
394        if let Some(last_char) = bytes.last() {
395            if *last_char != b'\n' {
396                bytes.push(b'\n');
397            }
398            Some(bytes)
399        } else {
400            None
401        }
402    }
403
404    /// Access asynchronous message stream from the process with given `id`.
405    ///
406    /// Messages read from the process are returned as [`Message`] types,
407    /// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
408    /// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
409    /// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
410    /// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
411    ///
412    /// `ProcessManager` for each child process assigns a buffer for messages received from it,
413    /// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
414    // NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
415    /// When parent process is not reading messages produced by the process,
416    /// then the messages are buffered up to the given capacity value.
417    /// If the buffer limit is reached and the process sends a new message,
418    /// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
419    /// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
420    ///
421    /// The messages stream can be subscribed multiple times and each subscriber will receive a one copy of the original message.
422    /// Notice that buffer mentioned earlier is created not per subscriber, but per each process, so when one of subscribers not read messages, the buffer will fill up.
423    /// # Examples
424    ///
425    /// Reading a message via standard IO:
426    ///
427    /// ```
428    /// use futures::TryStreamExt;
429    /// use proc_heim::{
430    ///     manager::{ProcessManager, TryMessageStreamExt},
431    ///     model::command::{Cmd, CmdOptions, MessagingType},
432    /// };
433    ///
434    /// # #[tokio::main]
435    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
436    /// // spawn ProcessManager
437    /// let working_dir = tempfile::tempdir()?.into_path();
438    /// let handle = ProcessManager::spawn(working_dir)?;
439    ///
440    /// // create simple echo command
441    /// let msg = "Hello world!";
442    /// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
443    /// let cmd = Cmd::with_args_and_options("echo", [msg], options);
444    ///
445    /// // read a message from spawned process
446    /// let process_id = handle.spawn(cmd).await?;
447    /// let mut stream = handle.subscribe_message_stream(process_id).await?
448    ///     .into_string_stream();
449    /// let received_msg = stream.try_next().await?.unwrap();
450    /// assert_eq!(msg, received_msg);
451    /// # Ok(())
452    /// # }
453    /// ```
454    ///
455    /// Reading messages via named pipes:
456    ///
457    /// ```
458    /// use futures::{StreamExt, TryStreamExt};
459    /// use proc_heim::{
460    ///     manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
461    ///     model::{
462    ///         command::CmdOptions,
463    ///         script::{Script, ScriptingLanguage},
464    ///     },
465    /// };
466    /// use std::path::PathBuf;
467    ///
468    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
469    /// let working_dir = tempfile::tempdir()?.into_path();
470    /// let handle = ProcessManager::spawn(working_dir)?;
471    /// let script = Script::with_options(
472    ///     ScriptingLanguage::Bash,
473    ///     r#"
474    ///     counter=0
475    ///     while read msg; do
476    ///         echo "$counter: $msg" > $OUTPUT_PIPE
477    ///         counter=$((counter + 1))
478    ///     done < $INPUT_PIPE
479    ///     "#,
480    ///     CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
481    /// );
482    ///
483    /// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
484    /// // which mimics the "ProcessManagerHandle" API,
485    /// // but without having to pass the process ID to each method call.
486    /// let process_handle = handle.spawn_with_handle(script).await?;
487    ///
488    /// process_handle.send_message("First message").await?;
489    /// // We can send a next message without causing a deadlock here.
490    /// // This is possible because the response to the first message
491    /// // will be read by a dedicated Tokio task,
492    /// // spawned automatically by the Process Manager.
493    /// process_handle.send_message("Second message").await?;
494    ///
495    /// let mut stream = process_handle
496    ///     .subscribe_message_stream()
497    ///     .await?
498    ///     .into_string_stream();
499    ///
500    /// assert_eq!("0: First message", stream.try_next().await?.unwrap());
501    /// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
502    /// # Ok(()) }
503    /// ```
504    pub async fn subscribe_message_stream(
505        &self,
506        id: ProcessId,
507    ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
508        let (responder, receiver) = oneshot::channel();
509        let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
510        let _ = self.sender.send(msg).await;
511        let message_receiver = receiver.await??;
512        let stream = BroadcastStream::new(message_receiver)
513            .map(|v| v.map(Message::from_bytes).map_err(Into::into));
514        Ok(stream)
515    }
516
517    /// Fetch logs from standard `output` stream, produced by a process with given `id`.
518    ///
519    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
520    /// Notice that each string ending with new line character (`\n`) is treated as one log.
521    /// # Examples
522    /// ```
523    /// use proc_heim::{
524    ///     manager::{LogsQuery, ProcessManager},
525    ///     model::{
526    ///         command::{CmdOptions, LoggingType},
527    ///         script::{Script, ScriptingLanguage},
528    ///     },
529    /// };
530    /// use std::time::Duration;
531    ///
532    /// # #[tokio::main]
533    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
534    /// let working_dir = tempfile::tempdir()?.into_path();
535    /// let handle = ProcessManager::spawn(working_dir)?;
536    /// let script = Script::with_options(
537    ///     ScriptingLanguage::Bash,
538    ///     r#"
539    ///     echo message1
540    ///     echo message2
541    ///     "#,
542    ///     CmdOptions::with_logging(LoggingType::StdoutOnly),
543    /// );
544    ///
545    /// let process_id = handle.spawn(script).await?;
546    ///
547    /// // We need to wait for the process to spawn and finish in order to collect logs,
548    /// // otherwise returned logs will be empty.
549    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
550    ///
551    /// let logs = handle
552    ///     .get_logs_stdout(process_id, LogsQuery::fetch_all())
553    ///     .await?;
554    /// assert_eq!(2, logs.len());
555    /// assert_eq!("message1", logs[0]);
556    /// assert_eq!("message2", logs[1]);
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn get_logs_stdout(
561        &self,
562        id: ProcessId,
563        query: LogsQuery,
564    ) -> Result<Vec<String>, GetLogsError> {
565        self.get_logs(id, LogsQueryType::Stdout, query).await
566    }
567
568    /// Fetch logs from standard `error` stream, produced by a process with given `id`.
569    ///
570    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
571    /// Notice that each string ending with new line character (`\n`) is treated as one log.
572    /// # Examples
573    /// ```
574    /// use proc_heim::{
575    ///     manager::{LogsQuery, ProcessManager},
576    ///     model::{
577    ///         command::{CmdOptions, LoggingType},
578    ///         script::{Script, ScriptingLanguage},
579    ///     },
580    /// };
581    /// use std::time::Duration;
582    ///
583    /// # #[tokio::main]
584    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
585    /// let working_dir = tempfile::tempdir()?.into_path();
586    /// let handle = ProcessManager::spawn(working_dir)?;
587    /// let script = Script::with_options(
588    ///     ScriptingLanguage::Bash,
589    ///     r#"
590    ///     echo message1 >&2
591    ///     echo message2 >&2
592    ///     echo message3 >&2
593    ///     "#,
594    ///     CmdOptions::with_logging(LoggingType::StderrOnly),
595    /// );
596    ///
597    /// let process_id = handle.spawn(script).await?;
598    ///
599    /// // We need to wait for the process to spawn and finish in order to collect logs,
600    /// // otherwise returned logs will be empty.
601    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
602    ///
603    /// let logs = handle
604    ///     .get_logs_stderr(process_id, LogsQuery::with_offset(1))
605    ///     .await?;
606    /// assert_eq!(2, logs.len());
607    /// assert_eq!("message2", logs[0]);
608    /// assert_eq!("message3", logs[1]);
609    /// # Ok(())
610    /// # }
611    pub async fn get_logs_stderr(
612        &self,
613        id: ProcessId,
614        query: LogsQuery,
615    ) -> Result<Vec<String>, GetLogsError> {
616        self.get_logs(id, LogsQueryType::Stderr, query).await
617    }
618
619    async fn get_logs(
620        &self,
621        id: ProcessId,
622        logs_query_type: LogsQueryType,
623        query: LogsQuery,
624    ) -> Result<Vec<String>, GetLogsError> {
625        let (responder, receiver) = oneshot::channel();
626        let msg = ProcessManagerMessage::GetLogs {
627            id,
628            logs_query_type,
629            query,
630            responder,
631        };
632        let _ = self.sender.send(msg).await;
633        let logs = receiver.await??;
634        Ok(logs)
635    }
636
637    /// Get information about the process with given `id`.
638    /// # Examples
639    /// Check information of running process:
640    /// ```
641    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
642    ///
643    /// # #[tokio::main]
644    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
645    /// let working_dir = tempfile::tempdir()?.into_path();
646    /// let handle = ProcessManager::spawn(working_dir)?;
647    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
648    ///
649    /// let process_info = handle.get_process_info(process_id).await?;
650    /// assert!(process_info.pid().is_some());
651    /// # Ok(())
652    /// # }
653    /// ```
654    pub async fn get_process_info(
655        &self,
656        id: ProcessId,
657    ) -> Result<ProcessInfo, GetProcessInfoError> {
658        let (responder, receiver) = oneshot::channel();
659        let msg = ProcessManagerMessage::GetProcessInfo { id, responder };
660        let _ = self.sender.send(msg).await;
661        let data = receiver.await??;
662        Ok(data)
663    }
664
665    /// Wait for the process with given `id` to finish.
666    ///
667    /// This method will spawn a `Tokio` task and check in loop, if the process exited.
668    /// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
669    /// # Examples
670    /// ```
671    /// use std::time::Duration;
672    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
673    ///
674    /// # #[tokio::main]
675    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
676    /// let working_dir = tempfile::tempdir()?.into_path();
677    /// let handle = ProcessManager::spawn(working_dir)?;
678    /// let process_id = handle.spawn(Cmd::new("env")).await?;
679    ///
680    /// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
681    ///
682    /// assert!(!process_info.is_running());
683    /// assert!(process_info.exit_status().unwrap().success());
684    /// # Ok(())
685    /// # }
686    /// ```
687    pub fn wait(
688        &self,
689        id: ProcessId,
690        poll_interval: Duration,
691    ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
692        let handle = self.clone();
693        tokio::spawn(async move {
694            loop {
695                let process_info = handle.get_process_info(id).await?;
696                if process_info.is_running() {
697                    tokio::time::sleep(poll_interval).await;
698                } else {
699                    return Ok(process_info);
700                }
701            }
702        })
703    }
704
705    /// Forcefully kills the process with given `id`.
706    ///
707    /// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
708    /// # Examples
709    /// ```
710    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
711    ///
712    /// # #[tokio::main]
713    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
714    /// let working_dir = tempfile::tempdir()?.into_path();
715    /// let handle = ProcessManager::spawn(working_dir)?;
716    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
717    ///
718    /// let result = handle.kill(process_id).await;
719    ///
720    /// assert!(result.is_ok());
721    /// # Ok(())
722    /// # }
723    /// ```
724    pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
725        let (responder, receiver) = oneshot::channel();
726        let msg = ProcessManagerMessage::KillProcess { id, responder };
727        let _ = self.sender.send(msg).await;
728        receiver.await??;
729        Ok(())
730    }
731
732    /// Abort a [`ProcessManager`] task associated with this handle.
733    pub async fn abort(&self) -> Result<(), AbortError> {
734        let (responder, receiver) = oneshot::channel();
735        let msg = ProcessManagerMessage::Abort { responder };
736        self.sender
737            .send(msg)
738            .await
739            .map_err(|_| AbortError::ManagerAlreadyAborted)?;
740        receiver
741            .await
742            .map_err(|_| AbortError::ManagerAlreadyAborted)??;
743        Ok(())
744    }
745}
746
747/// Error type returned when reading message bytes from a messages stream.
748#[derive(thiserror::Error, Debug)]
749pub enum ReceiveMessageError {
750    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
751    /// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
752    /// Includes the number of deleted messages.
753    /// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
754    #[error("Lost {0} number of messages due to buffer capacity overflow")]
755    LostMessages(u64),
756}
757
758/// Error type returned when reading a deserializable message from a messages stream.
759#[derive(thiserror::Error, Debug)]
760pub enum ReceiveDeserializedMessageError {
761    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
762    /// See [`ReceiveMessageError`] for more information.
763    #[error("Lost {0} number of messages due to buffer capacity overflow")]
764    LostMessages(u64),
765    /// Error indicating that a message cannot be deserialized from bytes. Includes error message.
766    #[error("{0}")]
767    CannotDeserializeMessage(String),
768}
769
770impl From<BroadcastStreamRecvError> for ReceiveMessageError {
771    fn from(err: BroadcastStreamRecvError) -> Self {
772        match err {
773            BroadcastStreamRecvError::Lagged(size) => ReceiveMessageError::LostMessages(size),
774        }
775    }
776}
777
778impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
779    fn from(value: ReceiveMessageError) -> Self {
780        match value {
781            ReceiveMessageError::LostMessages(number) => {
782                ReceiveDeserializedMessageError::LostMessages(number)
783            }
784        }
785    }
786}
787
788/// Error type returned when spawning a new child process.
789#[derive(thiserror::Error, Debug)]
790pub enum SpawnProcessError {
791    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
792    #[error("Cannot communicate with spawned process manager")]
793    ManagerCommunicationError(#[from] oneshot::error::RecvError),
794    /// Cannot create named pipes at process directory (first value). Second value is an error code.
795    #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
796    CannotCreateNamedPipe(PathBuf, String),
797    /// Cannot spawn process due to unexpected an IO error.
798    #[error("Cannot spawn process: {0}")]
799    CannotSpawnProcess(#[from] io::Error),
800    /// Cannot create process directory due to an IO error.
801    #[error("Cannot create process directory: {0}")]
802    CannotCreateProcessWorkingDir(io::Error),
803    /// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
804    #[error("Bootstrap process failed: {0}")]
805    BootstrapProcessFailed(String),
806}
807
808impl From<SpawnerError> for SpawnProcessError {
809    fn from(value: SpawnerError) -> Self {
810        match value {
811            SpawnerError::CannotCreateNamedPipe(pipe_path, err) => {
812                SpawnProcessError::CannotCreateNamedPipe(pipe_path, err.to_string())
813            }
814            SpawnerError::CannotSpawnProcess(err) => SpawnProcessError::CannotSpawnProcess(err),
815            SpawnerError::CannotCreateProcessWorkingDir(err) => {
816                SpawnProcessError::CannotCreateProcessWorkingDir(err)
817            }
818            SpawnerError::BootstrapProcessFailed(err) => Self::BootstrapProcessFailed(err),
819        }
820    }
821}
822
823/// Error type returned when subscribing to a messages stream.
824#[derive(thiserror::Error, Debug)]
825pub enum ReadMessageError {
826    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
827    #[error("Process with id: {0} was not found")]
828    ProcessNotFound(ProcessId),
829    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
830    #[error("Cannot communicate with spawned process manager")]
831    ManagerCommunicationError(#[from] oneshot::error::RecvError),
832    /// Cannot get messages stream because a [`MessagingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
833    #[error("Message reader for process with id: {0} was not found")]
834    MessageReaderNotFound(ProcessId),
835    /// The task used to read messages from a process has been killed.
836    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
837    #[error("Message reader process has been killed")]
838    MessageReaderKilled,
839}
840
841/// Error type returned when sending a message to the process.
842#[derive(thiserror::Error, Debug)]
843pub enum WriteMessageError {
844    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
845    #[error("Process with id: {0} was not found")]
846    ProcessNotFound(ProcessId),
847    /// Cannot serialize message to bytes. Includes error message.
848    #[error("{0}")]
849    CannotSerializeMessage(String),
850    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
851    #[error("Cannot communicate with spawned process manager")]
852    ManagerCommunicationError(#[from] oneshot::error::RecvError),
853    /// An unexpected IO error occurred when sending a message to the process.
854    #[error("Error occurred when sending message to process: {0}")]
855    IoError(#[from] std::io::Error),
856    /// The task used to send messages to a process has been killed.
857    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
858    #[error("Message writer for process with id: {0} was not found")]
859    MessageWriterNotFound(ProcessId),
860}
861
862/// Error type returned when trying to kill the process.
863#[derive(thiserror::Error, Debug)]
864pub enum KillProcessError {
865    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
866    #[error("Process with id: {0} was not found")]
867    ProcessNotFound(ProcessId),
868    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
869    #[error("Cannot communicate with spawned process manager")]
870    ManagerCommunicationError(#[from] oneshot::error::RecvError),
871}
872
873/// Error type returned when collecting logs from the process.
874#[derive(thiserror::Error, Debug)]
875pub enum GetLogsError {
876    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
877    #[error("Process with id: {0} was not found")]
878    ProcessNotFound(ProcessId),
879    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
880    #[error("Cannot communicate with spawned process manager")]
881    ManagerCommunicationError(#[from] oneshot::error::RecvError),
882    /// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
883    #[error("Logging type: {0} was not configured for process")]
884    LoggingTypeWasNotConfigured(String),
885    /// The task used to reading logs from a process has been killed.
886    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
887    #[error("Log read for process with id: {0} was not found")]
888    LogReaderNotFound(ProcessId),
889    /// An unexpected IO error occurred when reading logs from the process.
890    #[error(transparent)]
891    UnExpectedIoError(#[from] io::Error),
892}
893
894impl From<LogReaderError> for GetLogsError {
895    fn from(err: LogReaderError) -> Self {
896        match err {
897            LogReaderError::LogTypeWasNotConfigured(log_type) => {
898                Self::LoggingTypeWasNotConfigured(log_type.to_string())
899            }
900            LogReaderError::UnExpectedIoError(err) => Self::UnExpectedIoError(err),
901        }
902    }
903}
904
905/// Error type returned when trying to get an information about the process.
906#[derive(thiserror::Error, Debug)]
907pub enum GetProcessInfoError {
908    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
909    #[error("Process with id: {0} was not found")]
910    ProcessNotFound(ProcessId),
911    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
912    #[error("Cannot communicate with spawned process manager")]
913    ManagerCommunicationError(#[from] oneshot::error::RecvError),
914    /// An unexpected IO error occurred when trying to get an information about the process.
915    #[error(transparent)]
916    UnExpectedIoError(#[from] io::Error),
917}
918
919/// Error type returned when trying to abort a [`ProcessManager`] task.
920#[derive(thiserror::Error, Debug)]
921pub enum AbortError {
922    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
923    #[error("Cannot communicate with spawned process manager")]
924    ManagerAlreadyAborted,
925}