proc_heim/process/
manager.rs

1use std::{
2    collections::HashMap,
3    fmt::Debug,
4    io,
5    path::{Path, PathBuf},
6    time::Duration,
7};
8
9use tokio::{
10    sync::{
11        broadcast,
12        mpsc::{self},
13        oneshot::{self},
14    },
15    task::JoinHandle,
16};
17use tokio_stream::{
18    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
19    Stream, StreamExt,
20};
21use uuid::Uuid;
22
23use crate::working_dir::WorkingDir;
24
25use super::{
26    log_reader::{LogReaderError, LogsQuery, LogsQueryType},
27    message::Message,
28    scoped_process_handle::ScopedProcessHandle,
29    spawner::ProcessSpawner,
30    ProcessHandle, ProcessInfo, Runnable,
31};
32use super::{
33    model::{Process, ProcessId},
34    spawner::SpawnerError,
35};
36
37#[derive(Debug)]
38enum ProcessManagerMessage {
39    SpawnProcess {
40        cmd: Box<dyn Runnable>,
41        responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
42    },
43    SubscribeMessageStream {
44        id: ProcessId,
45        responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
46    },
47    WriteMessage {
48        id: ProcessId,
49        data: Vec<u8>,
50        responder: oneshot::Sender<Result<(), WriteMessageError>>,
51    },
52    KillProcess {
53        id: ProcessId,
54        responder: oneshot::Sender<Result<(), KillProcessError>>,
55    },
56    GetLogs {
57        id: ProcessId,
58        logs_query_type: LogsQueryType,
59        query: LogsQuery,
60        responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
61    },
62    GetProcessInfo {
63        id: ProcessId,
64        responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
65    },
66    Abort {
67        responder: oneshot::Sender<()>,
68    },
69}
70
71/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
72///
73/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
74/// `ProcessManager` is a main actor responsible for:
75/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
76/// * forwarding messages between client and other actors.
77///
78/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
79/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
80/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
81/// If the process has been killed manually, then its `process directory` is removed immediately.
82///
83/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
84/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
85///
86/// Please note that once all manager handles are dropped, then all child processes will be killed and all `process directories` will be deleted.
87
88pub struct ProcessManager {
89    working_dir: WorkingDir,
90    process_spawner: ProcessSpawner,
91    processes: HashMap<ProcessId, Process>,
92    receiver: mpsc::Receiver<ProcessManagerMessage>,
93    is_aborted: bool,
94}
95
96impl ProcessManager {
97    /// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
98    ///
99    /// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
100    /// # Examples
101    /// ```no_run
102    /// # use proc_heim::manager::ProcessManager;
103    /// # use std::path::PathBuf;
104    /// let working_directory = PathBuf::from("/some/temp/path");
105    /// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
106    /// ```
107    pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
108        Self::validate_working_dir(&working_directory)?;
109        let (sender, receiver) = mpsc::channel(32);
110        let working_dir = WorkingDir::new(working_directory);
111        let mut manager = Self {
112            working_dir: working_dir.clone(),
113            process_spawner: ProcessSpawner::new(working_dir),
114            receiver,
115            processes: HashMap::new(),
116            is_aborted: false,
117        };
118        tokio::spawn(async move { manager.run().await });
119        Ok(ProcessManagerHandle::new(sender))
120    }
121
122    fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
123        let file_path = working_directory.join(Uuid::new_v4().to_string());
124        std::fs::write(&file_path, "testing working dir")?;
125        std::fs::remove_file(file_path)
126    }
127
128    async fn run(&mut self) {
129        loop {
130            if self.is_aborted {
131                break;
132            } else if let Some(msg) = self.receiver.recv().await {
133                self.handle_message(msg).await;
134            } else {
135                self.abort().await;
136            }
137        }
138    }
139
140    async fn handle_message(&mut self, msg: ProcessManagerMessage) {
141        match msg {
142            ProcessManagerMessage::SpawnProcess { cmd, responder } => {
143                let result = self.spawn_process(cmd).await;
144                let _ = responder.send(result);
145            }
146            ProcessManagerMessage::KillProcess { id, responder } => {
147                let result = self.kill_process(id).await;
148                let _ = responder.send(result);
149            }
150            ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
151                let result = self.subscribe_to_message_stream(id).await;
152                let _ = responder.send(result);
153            }
154            ProcessManagerMessage::WriteMessage {
155                id,
156                data,
157                responder,
158            } => {
159                let result = self.send_message(id, data).await;
160                let _ = responder.send(result);
161            }
162            ProcessManagerMessage::GetLogs {
163                id,
164                logs_query_type,
165                query,
166                responder,
167            } => {
168                let result = self.get_logs(id, logs_query_type, query).await;
169                let _ = responder.send(result);
170            }
171            ProcessManagerMessage::GetProcessInfo { id, responder } => {
172                let result = self.get_process_info(id);
173                let _ = responder.send(result);
174            }
175            ProcessManagerMessage::Abort { responder } => {
176                let result = self.abort().await;
177                let _ = responder.send(result);
178            }
179        }
180    }
181
182    async fn spawn_process(
183        &mut self,
184        runnable: Box<dyn Runnable>,
185    ) -> Result<ProcessId, SpawnProcessError> {
186        let id = ProcessId::random();
187        let process = self.process_spawner.spawn_runnable(&id, runnable)?;
188        self.processes.insert(id, process);
189        Ok(id)
190    }
191
192    async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
193        let process = self
194            .processes
195            .get_mut(&id)
196            .ok_or(KillProcessError::ProcessNotFound(id))?;
197        if let Some(reader) = process.message_reader.take() {
198            reader.abort().await;
199        }
200        if let Some(writer) = process.message_writer.take() {
201            writer.abort().await;
202        }
203        if let Some(log_reader) = process.log_reader.take() {
204            log_reader.abort().await;
205        }
206        self.processes.remove(&id); // kill_on_drop() is used to kill child process
207        let process_dir = self.working_dir.process_dir(&id);
208        let _ = tokio::fs::remove_dir_all(process_dir).await;
209        Ok(())
210    }
211
212    async fn subscribe_to_message_stream(
213        &mut self,
214        id: ProcessId,
215    ) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
216        let reader = self
217            .processes
218            .get(&id)
219            .ok_or(ReadMessageError::ProcessNotFound(id))?
220            .message_reader
221            .as_ref()
222            .ok_or(ReadMessageError::MessageReaderNotFound(id))?;
223
224        let receiver = reader
225            .subscribe()
226            .await
227            .map_err(|_| ReadMessageError::MessageReaderKilled)?;
228        Ok(receiver)
229    }
230
231    async fn send_message(
232        &mut self,
233        id: ProcessId,
234        data: Vec<u8>,
235    ) -> Result<(), WriteMessageError> {
236        self.processes
237            .get(&id)
238            .ok_or(WriteMessageError::ProcessNotFound(id))?
239            .message_writer
240            .as_ref()
241            .ok_or(WriteMessageError::MessageWriterNotFound(id))?
242            .write(data)
243            .await
244            .map_err(Into::into)
245    }
246
247    async fn get_logs(
248        &mut self,
249        id: ProcessId,
250        logs_query_type: LogsQueryType,
251        query: LogsQuery,
252    ) -> Result<Vec<String>, GetLogsError> {
253        self.processes
254            .get(&id)
255            .ok_or(GetLogsError::ProcessNotFound(id))?
256            .log_reader
257            .as_ref()
258            .ok_or(GetLogsError::LogReaderNotFound(id))?
259            .read_logs(logs_query_type, query)
260            .await
261            .map_err(Into::into)
262    }
263
264    fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
265        let process = self
266            .processes
267            .get_mut(&id)
268            .ok_or(GetProcessInfoError::ProcessNotFound(id))?;
269        let pid = process.child.id();
270        let exit_status = process.child.try_wait()?;
271        Ok(ProcessInfo::new(pid, exit_status))
272    }
273
274    async fn abort(&mut self) {
275        self.is_aborted = true;
276        self.receiver.close();
277        let ids: Vec<ProcessId> = self.processes.keys().cloned().collect();
278        for id in ids {
279            let _ = self.kill_process(id).await;
280        }
281    }
282}
283
284impl Drop for ProcessManager {
285    fn drop(&mut self) {
286        self.processes.keys().for_each(|id| {
287            let process_dir = self.working_dir.process_dir(id);
288            let _ = std::fs::remove_dir_all(process_dir);
289            // All processes will be killed automatically, thanks to kill_on_drop()
290        });
291    }
292}
293
294/// Type used for communication with [`ProcessManager`] task.
295///
296/// It provides asynchronous API for:
297/// * spawning new child processes, which implements [`Runnable`] trait,
298/// * sending messages to spawned processes as:
299///     * raw bytes,
300///     * strings,
301///     * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
302/// * receiving messages from spawned processes using asynchronous streams, including:
303///     * reading messages by multiple subscribers,
304///     * buffering not read messages with configurable buffer capacity,
305///     * communication via standard I/O or named pipes.
306/// * reading logs produced by spawned processes (from standard output/error streams),
307/// * fetching information about spawned processes (OS-assigned pid, exit status),
308/// * forcefully killing processes,
309/// * waiting processes to finish.
310///
311/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
312/// The handle can be cheaply cloned and used safely by many threads.
313///
314/// Please note that once all manager handles are dropped, then all child processes will be killed and all `process directories` will be deleted.
315#[derive(Clone, Debug)]
316pub struct ProcessManagerHandle {
317    sender: mpsc::Sender<ProcessManagerMessage>,
318}
319
320impl ProcessManagerHandle {
321    fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
322        Self { sender }
323    }
324
325    /// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
326    /// # Examples
327    /// ```
328    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
329    ///
330    /// # #[tokio::main]
331    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
332    /// let working_dir = tempfile::tempdir()?.into_path();
333    /// let handle = ProcessManager::spawn(working_dir)?;
334    /// let cmd = Cmd::new("echo");
335    /// let process_id = handle.spawn(cmd).await?;
336    /// # Ok(())
    /// # }
    /// ```
338    pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
339        let (responder, receiver) = oneshot::channel();
340        let msg = ProcessManagerMessage::SpawnProcess {
341            cmd: Box::new(runnable),
342            responder,
343        };
344        let _ = self.sender.send(msg).await;
345        let process_id = receiver.await??;
346        Ok(process_id)
347    }
348
349    /// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
350    pub async fn spawn_with_handle(
351        &self,
352        runnable: impl Runnable,
353    ) -> Result<ProcessHandle, SpawnProcessError> {
354        let id = self.spawn(runnable).await?;
355        Ok(ProcessHandle::new(id, self.clone()))
356    }
357
358    /// Spawn a new child process, returning a [`ScopedProcessHandle`], which can be used to interact with the process.
359    /// Unlike [`ProcessHandle`], dropping this handle will kill the associated child process.
360    pub async fn spawn_with_scoped_handle(
361        &self,
362        runnable: impl Runnable,
363    ) -> Result<ScopedProcessHandle, SpawnProcessError> {
364        let id = self.spawn(runnable).await?;
365        Ok(ScopedProcessHandle::new(ProcessHandle::new(
366            id,
367            self.clone(),
368        )))
369    }
370
371    /// Send a [`Message`] to the process with given `id`.
372    /// # Examples
373    /// ```
374    /// use futures::TryStreamExt;
375    /// use proc_heim::{
376    ///     manager::ProcessManager,
377    ///     model::{
378    ///         command::CmdOptions,
379    ///         script::{Script, ScriptingLanguage},
380    ///     },
381    /// };
382    ///
383    /// # #[tokio::main]
384    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
385    /// let working_dir = tempfile::tempdir()?.into_path();
386    /// let handle = ProcessManager::spawn(working_dir)?;
387    ///
388    /// let script = Script::with_options(
389    ///     ScriptingLanguage::Bash,
390    ///     r#"
391    ///     read -r msg < /dev/stdin
392    ///     echo "Hello $msg"
393    ///     "#,
394    ///     CmdOptions::with_standard_io_messaging(),
395    /// );
396    ///
397    /// let process_id = handle.spawn(script).await?;
398    /// handle.send_message(process_id, "John").await?;
399    /// let mut stream = handle.subscribe_message_stream(process_id).await?;
400    /// let received_msg = stream.try_next().await?.unwrap();
401    /// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
402    /// # Ok(())
403    /// # }
404    /// ```
405    pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
406    where
407        M: Into<Message>,
408    {
409        if let Some(data) = Self::into_bytes_with_eol_char(message.into().bytes) {
410            let (responder, receiver) = oneshot::channel();
411            let msg = ProcessManagerMessage::WriteMessage {
412                id,
413                data,
414                responder,
415            };
416            let _ = self.sender.send(msg).await;
417            receiver.await??;
418        }
419        Ok(())
420    }
421
422    fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
423        if let Some(last_char) = bytes.last() {
424            if *last_char != b'\n' {
425                bytes.push(b'\n');
426            }
427            Some(bytes)
428        } else {
429            None
430        }
431    }
432
433    /// Access asynchronous message stream from the process with given `id`.
434    ///
435    /// Messages read from the process are returned as [`Message`] types,
436    /// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
437    /// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
438    /// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
439    /// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
440    ///
441    /// `ProcessManager` for each child process assigns a buffer for messages received from it,
442    /// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
443    // NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
444    /// When parent process is not reading messages produced by the process,
445    /// then the messages are buffered up to the given capacity value.
446    /// If the buffer limit is reached and the process sends a new message,
447    /// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
448    /// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
449    ///
    /// The message stream can be subscribed to multiple times and each subscriber will receive its own copy of every message.
    /// Notice that the buffer mentioned earlier is created not per subscriber, but per process, so when one of the subscribers does not read messages, the buffer will fill up.
452    /// # Examples
453    ///
454    /// Reading a message via standard IO:
455    ///
456    /// ```
457    /// use futures::TryStreamExt;
458    /// use proc_heim::{
459    ///     manager::{ProcessManager, TryMessageStreamExt},
460    ///     model::command::{Cmd, CmdOptions, MessagingType},
461    /// };
462    ///
463    /// # #[tokio::main]
464    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
465    /// // spawn ProcessManager
466    /// let working_dir = tempfile::tempdir()?.into_path();
467    /// let handle = ProcessManager::spawn(working_dir)?;
468    ///
469    /// // create simple echo command
470    /// let msg = "Hello world!";
471    /// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
472    /// let cmd = Cmd::with_args_and_options("echo", [msg], options);
473    ///
474    /// // read a message from spawned process
475    /// let process_id = handle.spawn(cmd).await?;
476    /// let mut stream = handle.subscribe_message_stream(process_id).await?
477    ///     .into_string_stream();
478    /// let received_msg = stream.try_next().await?.unwrap();
479    /// assert_eq!(msg, received_msg);
480    /// # Ok(())
481    /// # }
482    /// ```
483    ///
484    /// Reading messages via named pipes:
485    ///
486    /// ```
487    /// use futures::{StreamExt, TryStreamExt};
488    /// use proc_heim::{
489    ///     manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
490    ///     model::{
491    ///         command::CmdOptions,
492    ///         script::{Script, ScriptingLanguage},
493    ///     },
494    /// };
495    /// use std::path::PathBuf;
496    ///
497    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
498    /// let working_dir = tempfile::tempdir()?.into_path();
499    /// let handle = ProcessManager::spawn(working_dir)?;
500    /// let script = Script::with_options(
501    ///     ScriptingLanguage::Bash,
502    ///     r#"
503    ///     counter=0
504    ///     while read msg; do
505    ///         echo "$counter: $msg" > $OUTPUT_PIPE
506    ///         counter=$((counter + 1))
507    ///     done < $INPUT_PIPE
508    ///     "#,
509    ///     CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
510    /// );
511    ///
512    /// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
513    /// // which mimics the "ProcessManagerHandle" API,
514    /// // but without having to pass the process ID to each method call.
515    /// let process_handle = handle.spawn_with_handle(script).await?;
516    ///
517    /// process_handle.send_message("First message").await?;
518    /// // We can send a next message without causing a deadlock here.
519    /// // This is possible because the response to the first message
520    /// // will be read by a dedicated Tokio task,
521    /// // spawned automatically by the Process Manager.
522    /// process_handle.send_message("Second message").await?;
523    ///
524    /// let mut stream = process_handle
525    ///     .subscribe_message_stream()
526    ///     .await?
527    ///     .into_string_stream();
528    ///
529    /// assert_eq!("0: First message", stream.try_next().await?.unwrap());
530    /// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
531    /// # Ok(()) }
532    /// ```
533    pub async fn subscribe_message_stream(
534        &self,
535        id: ProcessId,
536    ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
537        let (responder, receiver) = oneshot::channel();
538        let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
539        let _ = self.sender.send(msg).await;
540        let message_receiver = receiver.await??;
541        let stream = BroadcastStream::new(message_receiver)
542            .map(|v| v.map(Message::from_bytes).map_err(Into::into));
543        Ok(stream)
544    }
545
546    /// Fetch logs from standard `output` stream, produced by a process with given `id`.
547    ///
548    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
549    /// Notice that each string ending with new line character (`\n`) is treated as one log.
550    /// # Examples
551    /// ```
552    /// use proc_heim::{
553    ///     manager::{LogsQuery, ProcessManager},
554    ///     model::{
555    ///         command::{CmdOptions, LoggingType},
556    ///         script::{Script, ScriptingLanguage},
557    ///     },
558    /// };
559    /// use std::time::Duration;
560    ///
561    /// # #[tokio::main]
562    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
563    /// let working_dir = tempfile::tempdir()?.into_path();
564    /// let handle = ProcessManager::spawn(working_dir)?;
565    /// let script = Script::with_options(
566    ///     ScriptingLanguage::Bash,
567    ///     r#"
568    ///     echo message1
569    ///     echo message2
570    ///     "#,
571    ///     CmdOptions::with_logging(LoggingType::StdoutOnly),
572    /// );
573    ///
574    /// let process_id = handle.spawn(script).await?;
575    ///
576    /// // We need to wait for the process to spawn and finish in order to collect logs,
577    /// // otherwise returned logs will be empty.
578    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
579    ///
580    /// let logs = handle
581    ///     .get_logs_stdout(process_id, LogsQuery::fetch_all())
582    ///     .await?;
583    /// assert_eq!(2, logs.len());
584    /// assert_eq!("message1", logs[0]);
585    /// assert_eq!("message2", logs[1]);
586    /// # Ok(())
587    /// # }
588    /// ```
589    pub async fn get_logs_stdout(
590        &self,
591        id: ProcessId,
592        query: LogsQuery,
593    ) -> Result<Vec<String>, GetLogsError> {
594        self.get_logs(id, LogsQueryType::Stdout, query).await
595    }
596
597    /// Fetch logs from standard `error` stream, produced by a process with given `id`.
598    ///
599    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
600    /// Notice that each string ending with new line character (`\n`) is treated as one log.
601    /// # Examples
602    /// ```
603    /// use proc_heim::{
604    ///     manager::{LogsQuery, ProcessManager},
605    ///     model::{
606    ///         command::{CmdOptions, LoggingType},
607    ///         script::{Script, ScriptingLanguage},
608    ///     },
609    /// };
610    /// use std::time::Duration;
611    ///
612    /// # #[tokio::main]
613    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
614    /// let working_dir = tempfile::tempdir()?.into_path();
615    /// let handle = ProcessManager::spawn(working_dir)?;
616    /// let script = Script::with_options(
617    ///     ScriptingLanguage::Bash,
618    ///     r#"
619    ///     echo message1 >&2
620    ///     echo message2 >&2
621    ///     echo message3 >&2
622    ///     "#,
623    ///     CmdOptions::with_logging(LoggingType::StderrOnly),
624    /// );
625    ///
626    /// let process_id = handle.spawn(script).await?;
627    ///
628    /// // We need to wait for the process to spawn and finish in order to collect logs,
629    /// // otherwise returned logs will be empty.
630    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
631    ///
632    /// let logs = handle
633    ///     .get_logs_stderr(process_id, LogsQuery::with_offset(1))
634    ///     .await?;
635    /// assert_eq!(2, logs.len());
636    /// assert_eq!("message2", logs[0]);
637    /// assert_eq!("message3", logs[1]);
638    /// # Ok(())
    /// # }
    /// ```
640    pub async fn get_logs_stderr(
641        &self,
642        id: ProcessId,
643        query: LogsQuery,
644    ) -> Result<Vec<String>, GetLogsError> {
645        self.get_logs(id, LogsQueryType::Stderr, query).await
646    }
647
648    async fn get_logs(
649        &self,
650        id: ProcessId,
651        logs_query_type: LogsQueryType,
652        query: LogsQuery,
653    ) -> Result<Vec<String>, GetLogsError> {
654        let (responder, receiver) = oneshot::channel();
655        let msg = ProcessManagerMessage::GetLogs {
656            id,
657            logs_query_type,
658            query,
659            responder,
660        };
661        let _ = self.sender.send(msg).await;
662        let logs = receiver.await??;
663        Ok(logs)
664    }
665
666    /// Get information about the process with given `id`.
667    /// # Examples
668    /// Check information of running process:
669    /// ```
670    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
671    ///
672    /// # #[tokio::main]
673    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
674    /// let working_dir = tempfile::tempdir()?.into_path();
675    /// let handle = ProcessManager::spawn(working_dir)?;
676    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
677    ///
678    /// let process_info = handle.get_process_info(process_id).await?;
679    /// assert!(process_info.pid().is_some());
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub async fn get_process_info(
684        &self,
685        id: ProcessId,
686    ) -> Result<ProcessInfo, GetProcessInfoError> {
687        let (responder, receiver) = oneshot::channel();
688        let msg = ProcessManagerMessage::GetProcessInfo { id, responder };
689        let _ = self.sender.send(msg).await;
690        let data = receiver.await??;
691        Ok(data)
692    }
693
694    /// Wait for the process with given `id` to finish.
695    ///
    /// This method will spawn a `Tokio` task, which checks in a loop whether the process has exited.
697    /// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
698    /// # Examples
699    /// ```
700    /// use std::time::Duration;
701    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
702    ///
703    /// # #[tokio::main]
704    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
705    /// let working_dir = tempfile::tempdir()?.into_path();
706    /// let handle = ProcessManager::spawn(working_dir)?;
707    /// let process_id = handle.spawn(Cmd::new("env")).await?;
708    ///
709    /// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
710    ///
711    /// assert!(!process_info.is_running());
712    /// assert!(process_info.exit_status().unwrap().success());
713    /// # Ok(())
714    /// # }
715    /// ```
716    pub fn wait(
717        &self,
718        id: ProcessId,
719        poll_interval: Duration,
720    ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
721        let handle = self.clone();
722        tokio::spawn(async move {
723            loop {
724                let process_info = handle.get_process_info(id).await?;
725                if process_info.is_running() {
726                    tokio::time::sleep(poll_interval).await;
727                } else {
728                    return Ok(process_info);
729                }
730            }
731        })
732    }
733
734    /// Forcefully kills the process with given `id`.
735    ///
736    /// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
737    /// # Examples
738    /// ```
739    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
740    ///
741    /// # #[tokio::main]
742    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
743    /// let working_dir = tempfile::tempdir()?.into_path();
744    /// let handle = ProcessManager::spawn(working_dir)?;
745    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
746    ///
747    /// let result = handle.kill(process_id).await;
748    ///
749    /// assert!(result.is_ok());
750    /// # Ok(())
751    /// # }
752    /// ```
753    pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
754        let (responder, receiver) = oneshot::channel();
755        let msg = ProcessManagerMessage::KillProcess { id, responder };
756        let _ = self.sender.send(msg).await;
757        receiver.await??;
758        Ok(())
759    }
760
761    /// Abort a [`ProcessManager`] task associated with this handle. This will kill all child processes and delete their corresponding `process directories`.
762    /// Please note that [`ProcessManager`] will be aborted automatically when all of its handles are dropped.
763    pub async fn abort(&self) -> Result<(), AbortError> {
764        let (responder, receiver) = oneshot::channel();
765        let msg = ProcessManagerMessage::Abort { responder };
766        self.sender
767            .send(msg)
768            .await
769            .map_err(|_| AbortError::ManagerAlreadyAborted)?;
770        receiver
771            .await
772            .map_err(|_| AbortError::ManagerAlreadyAborted)?;
773        Ok(())
774    }
775
776    pub(crate) fn try_kill(&self, id: ProcessId) -> bool {
777        let (responder, _) = oneshot::channel();
778        let msg = ProcessManagerMessage::KillProcess { id, responder };
779        self.sender.try_send(msg).is_ok()
780    }
781
782    pub(crate) async fn kill_with_timeout(&self, id: ProcessId, duration: Duration) {
783        let (responder, receiver) = oneshot::channel();
784        let msg = ProcessManagerMessage::KillProcess { id, responder };
785        let _ = self.sender.send_timeout(msg, duration).await;
786        let _ = receiver.await;
787    }
788}
789
790/// Error type returned when reading message bytes from a messages stream.
791#[derive(thiserror::Error, Debug)]
792pub enum ReceiveMessageError {
793    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
794    /// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
795    /// Includes the number of deleted messages.
796    /// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
797    #[error("Lost {0} number of messages due to buffer capacity overflow")]
798    LostMessages(u64),
799}
800
801/// Error type returned when reading a deserializable message from a messages stream.
802#[derive(thiserror::Error, Debug)]
803pub enum ReceiveDeserializedMessageError {
804    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
805    /// See [`ReceiveMessageError`] for more information.
806    #[error("Lost {0} number of messages due to buffer capacity overflow")]
807    LostMessages(u64),
808    /// Error indicating that a message cannot be deserialized from bytes. Includes error message.
809    #[error("{0}")]
810    CannotDeserializeMessage(String),
811}
812
813impl From<BroadcastStreamRecvError> for ReceiveMessageError {
814    fn from(err: BroadcastStreamRecvError) -> Self {
815        match err {
816            BroadcastStreamRecvError::Lagged(size) => ReceiveMessageError::LostMessages(size),
817        }
818    }
819}
820
821impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
822    fn from(value: ReceiveMessageError) -> Self {
823        match value {
824            ReceiveMessageError::LostMessages(number) => {
825                ReceiveDeserializedMessageError::LostMessages(number)
826            }
827        }
828    }
829}
830
831/// Error type returned when spawning a new child process.
832#[derive(thiserror::Error, Debug)]
833pub enum SpawnProcessError {
834    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
835    #[error("Cannot communicate with spawned process manager")]
836    ManagerCommunicationError(#[from] oneshot::error::RecvError),
837    /// Cannot create named pipes at process directory (first value). Second value is an error code.
838    #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
839    CannotCreateNamedPipe(PathBuf, String),
840    /// Cannot spawn process due to unexpected an IO error.
841    #[error("Cannot spawn process: {0}")]
842    CannotSpawnProcess(#[from] io::Error),
843    /// Cannot create process directory due to an IO error.
844    #[error("Cannot create process directory: {0}")]
845    CannotCreateProcessWorkingDir(io::Error),
846    /// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
847    #[error("Bootstrap process failed: {0}")]
848    BootstrapProcessFailed(String),
849}
850
851impl From<SpawnerError> for SpawnProcessError {
852    fn from(value: SpawnerError) -> Self {
853        match value {
854            SpawnerError::CannotCreateNamedPipe(pipe_path, err) => {
855                SpawnProcessError::CannotCreateNamedPipe(pipe_path, err.to_string())
856            }
857            SpawnerError::CannotSpawnProcess(err) => SpawnProcessError::CannotSpawnProcess(err),
858            SpawnerError::CannotCreateProcessWorkingDir(err) => {
859                SpawnProcessError::CannotCreateProcessWorkingDir(err)
860            }
861            SpawnerError::BootstrapProcessFailed(err) => Self::BootstrapProcessFailed(err),
862        }
863    }
864}
865
866/// Error type returned when subscribing to a messages stream.
867#[derive(thiserror::Error, Debug)]
868pub enum ReadMessageError {
869    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
870    #[error("Process with id: {0} was not found")]
871    ProcessNotFound(ProcessId),
872    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
873    #[error("Cannot communicate with spawned process manager")]
874    ManagerCommunicationError(#[from] oneshot::error::RecvError),
875    /// Cannot get messages stream because a [`MessagingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
876    #[error("Message reader for process with id: {0} was not found")]
877    MessageReaderNotFound(ProcessId),
878    /// The task used to read messages from a process has been killed.
879    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
880    #[error("Message reader process has been killed")]
881    MessageReaderKilled,
882}
883
884/// Error type returned when sending a message to the process.
885#[derive(thiserror::Error, Debug)]
886pub enum WriteMessageError {
887    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
888    #[error("Process with id: {0} was not found")]
889    ProcessNotFound(ProcessId),
890    /// Cannot serialize message to bytes. Includes error message.
891    #[error("{0}")]
892    CannotSerializeMessage(String),
893    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
894    #[error("Cannot communicate with spawned process manager")]
895    ManagerCommunicationError(#[from] oneshot::error::RecvError),
896    /// An unexpected IO error occurred when sending a message to the process.
897    #[error("Error occurred when sending message to process: {0}")]
898    IoError(#[from] std::io::Error),
899    /// The task used to send messages to a process has been killed.
900    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
901    #[error("Message writer for process with id: {0} was not found")]
902    MessageWriterNotFound(ProcessId),
903}
904
905/// Error type returned when trying to kill the process.
906#[derive(thiserror::Error, Debug)]
907pub enum KillProcessError {
908    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
909    #[error("Process with id: {0} was not found")]
910    ProcessNotFound(ProcessId),
911    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
912    #[error("Cannot communicate with spawned process manager")]
913    ManagerCommunicationError(#[from] oneshot::error::RecvError),
914}
915
916/// Error type returned when collecting logs from the process.
917#[derive(thiserror::Error, Debug)]
918pub enum GetLogsError {
919    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
920    #[error("Process with id: {0} was not found")]
921    ProcessNotFound(ProcessId),
922    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
923    #[error("Cannot communicate with spawned process manager")]
924    ManagerCommunicationError(#[from] oneshot::error::RecvError),
925    /// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
926    #[error("Logging type: {0} was not configured for process")]
927    LoggingTypeWasNotConfigured(String),
928    /// The task used to reading logs from a process has been killed.
929    /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
930    #[error("Log read for process with id: {0} was not found")]
931    LogReaderNotFound(ProcessId),
932    /// An unexpected IO error occurred when reading logs from the process.
933    #[error(transparent)]
934    UnExpectedIoError(#[from] io::Error),
935}
936
937impl From<LogReaderError> for GetLogsError {
938    fn from(err: LogReaderError) -> Self {
939        match err {
940            LogReaderError::LogTypeWasNotConfigured(log_type) => {
941                Self::LoggingTypeWasNotConfigured(log_type.to_string())
942            }
943            LogReaderError::UnExpectedIoError(err) => Self::UnExpectedIoError(err),
944        }
945    }
946}
947
948/// Error type returned when trying to get an information about the process.
949#[derive(thiserror::Error, Debug)]
950pub enum GetProcessInfoError {
951    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
952    #[error("Process with id: {0} was not found")]
953    ProcessNotFound(ProcessId),
954    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
955    #[error("Cannot communicate with spawned process manager")]
956    ManagerCommunicationError(#[from] oneshot::error::RecvError),
957    /// An unexpected IO error occurred when trying to get an information about the process.
958    #[error(transparent)]
959    UnExpectedIoError(#[from] io::Error),
960}
961
962/// Error type returned when trying to abort a [`ProcessManager`] task.
963#[derive(thiserror::Error, Debug)]
964pub enum AbortError {
965    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
966    #[error("Cannot communicate with spawned process manager")]
967    ManagerAlreadyAborted,
968}