proc_heim/process/manager.rs
1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 io,
5 path::{Path, PathBuf},
6 time::Duration,
7};
8
9use tokio::{
10 sync::{
11 broadcast, mpsc,
12 oneshot::{self},
13 },
14 task::JoinHandle,
15};
16use tokio_stream::{
17 wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
18 Stream, StreamExt,
19};
20use uuid::Uuid;
21
22use crate::working_dir::WorkingDir;
23
24use super::{
25 log_reader::{LogReaderError, LogsQuery, LogsQueryType},
26 message::Message,
27 spawner::ProcessSpawner,
28 ProcessHandle, ProcessInfo, Runnable,
29};
30use super::{
31 model::{Process, ProcessId},
32 spawner::SpawnerError,
33};
34
35#[derive(Debug)]
36enum ProcessManagerMessage {
37 SpawnProcess {
38 cmd: Box<dyn Runnable>,
39 responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
40 },
41 SubscribeMessageStream {
42 id: ProcessId,
43 responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
44 },
45 WriteMessage {
46 id: ProcessId,
47 data: Vec<u8>,
48 responder: oneshot::Sender<Result<(), WriteMessageError>>,
49 },
50 KillProcess {
51 id: ProcessId,
52 responder: oneshot::Sender<Result<(), KillProcessError>>,
53 },
54 GetLogs {
55 id: ProcessId,
56 logs_query_type: LogsQueryType,
57 query: LogsQuery,
58 responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
59 },
60 GetProcessInfo {
61 id: ProcessId,
62 responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
63 },
64 Abort {
65 responder: oneshot::Sender<Result<(), AbortError>>,
66 },
67}
68
69/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
70///
71/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
72/// `ProcessManager` is a main actor responsible for:
73/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
74/// * forwarding messages between client and other actors.
75///
76/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
77/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
78/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
79/// If the process has been killed manually, then its `process directory` is removed immediately.
80///
81/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
82/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
83pub struct ProcessManager {
84 working_dir: WorkingDir,
85 process_spawner: ProcessSpawner,
86 processes: HashMap<ProcessId, Process>,
87 receiver: mpsc::Receiver<ProcessManagerMessage>,
88 is_aborted: bool,
89}
90
91impl ProcessManager {
92 /// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
93 ///
94 /// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
95 /// # Examples
96 /// ```no_run
97 /// # use proc_heim::manager::ProcessManager;
98 /// # use std::path::PathBuf;
99 /// let working_directory = PathBuf::from("/some/temp/path");
100 /// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
101 /// ```
102 pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
103 Self::validate_working_dir(&working_directory)?;
104 let (sender, receiver) = mpsc::channel(32);
105 let working_dir = WorkingDir::new(working_directory);
106 let mut manager = Self {
107 working_dir: working_dir.clone(),
108 process_spawner: ProcessSpawner::new(working_dir),
109 receiver,
110 processes: HashMap::new(),
111 is_aborted: false,
112 };
113 tokio::spawn(async move { manager.run().await });
114 Ok(ProcessManagerHandle::new(sender))
115 }
116
117 fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
118 let file_path = working_directory.join(Uuid::new_v4().to_string());
119 std::fs::write(&file_path, "testing working dir")?;
120 std::fs::remove_file(file_path)
121 }
122
123 async fn run(&mut self) {
124 loop {
125 if self.is_aborted {
126 break;
127 } else if let Some(msg) = self.receiver.recv().await {
128 self.handle_message(msg).await;
129 } else {
130 break;
131 }
132 }
133 }
134
135 async fn handle_message(&mut self, msg: ProcessManagerMessage) {
136 match msg {
137 ProcessManagerMessage::SpawnProcess { cmd, responder } => {
138 let result = self.spawn_process(cmd).await;
139 let _ = responder.send(result);
140 }
141 ProcessManagerMessage::KillProcess { id, responder } => {
142 let result = self.kill_process(id).await;
143 let _ = responder.send(result);
144 }
145 ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
146 let result = self.subscribe_to_message_stream(id).await;
147 let _ = responder.send(result);
148 }
149 ProcessManagerMessage::WriteMessage {
150 id,
151 data,
152 responder,
153 } => {
154 let result = self.send_message(id, data).await;
155 let _ = responder.send(result);
156 }
157 ProcessManagerMessage::GetLogs {
158 id,
159 logs_query_type,
160 query,
161 responder,
162 } => {
163 let result = self.get_logs(id, logs_query_type, query).await;
164 let _ = responder.send(result);
165 }
166 ProcessManagerMessage::GetProcessInfo { id, responder } => {
167 let result = self.get_process_info(id);
168 let _ = responder.send(result);
169 }
170 ProcessManagerMessage::Abort { responder } => {
171 let result = self.abort().await;
172 let _ = responder.send(result);
173 }
174 }
175 }
176
177 async fn spawn_process(
178 &mut self,
179 runnable: Box<dyn Runnable>,
180 ) -> Result<ProcessId, SpawnProcessError> {
181 let id = ProcessId::random();
182 let process = self.process_spawner.spawn_runnable(&id, runnable)?;
183 self.processes.insert(id, process);
184 Ok(id)
185 }
186
187 async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
188 let process = self
189 .processes
190 .get_mut(&id)
191 .ok_or(KillProcessError::ProcessNotFound(id))?;
192 if let Some(reader) = process.message_reader.take() {
193 reader.abort().await;
194 }
195 if let Some(writer) = process.message_writer.take() {
196 writer.abort().await;
197 }
198 if let Some(log_reader) = process.log_reader.take() {
199 log_reader.abort().await;
200 }
201 self.processes.remove(&id); // kill_on_drop() is used to kill child process
202 let process_dir = self.working_dir.process_dir(&id);
203 let _ = tokio::fs::remove_dir_all(process_dir).await;
204 Ok(())
205 }
206
207 async fn subscribe_to_message_stream(
208 &mut self,
209 id: ProcessId,
210 ) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
211 let reader = self
212 .processes
213 .get(&id)
214 .ok_or(ReadMessageError::ProcessNotFound(id))?
215 .message_reader
216 .as_ref()
217 .ok_or(ReadMessageError::MessageReaderNotFound(id))?;
218
219 let receiver = reader
220 .subscribe()
221 .await
222 .map_err(|_| ReadMessageError::MessageReaderKilled)?;
223 Ok(receiver)
224 }
225
226 async fn send_message(
227 &mut self,
228 id: ProcessId,
229 data: Vec<u8>,
230 ) -> Result<(), WriteMessageError> {
231 self.processes
232 .get(&id)
233 .ok_or(WriteMessageError::ProcessNotFound(id))?
234 .message_writer
235 .as_ref()
236 .ok_or(WriteMessageError::MessageWriterNotFound(id))?
237 .write(data)
238 .await
239 .map_err(Into::into)
240 }
241
242 async fn get_logs(
243 &mut self,
244 id: ProcessId,
245 logs_query_type: LogsQueryType,
246 query: LogsQuery,
247 ) -> Result<Vec<String>, GetLogsError> {
248 self.processes
249 .get(&id)
250 .ok_or(GetLogsError::ProcessNotFound(id))?
251 .log_reader
252 .as_ref()
253 .ok_or(GetLogsError::LogReaderNotFound(id))?
254 .read_logs(logs_query_type, query)
255 .await
256 .map_err(Into::into)
257 }
258
259 fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
260 let process = self
261 .processes
262 .get_mut(&id)
263 .ok_or(GetProcessInfoError::ProcessNotFound(id))?;
264 let pid = process.child.id();
265 let exit_status = process.child.try_wait()?;
266 Ok(ProcessInfo::new(pid, exit_status))
267 }
268
269 async fn abort(&mut self) -> Result<(), AbortError> {
270 self.is_aborted = true;
271 self.receiver.close();
272 let ids: Vec<ProcessId> = self.processes.keys().cloned().collect();
273 for id in ids {
274 let _ = self.kill_process(id).await;
275 }
276 Ok(())
277 }
278}
279
280/// Type used for communication with [`ProcessManager`] task.
281///
282/// It provides asynchronous API for:
283/// * spawning new child processes, which implements [`Runnable`] trait,
284/// * sending messages to spawned processes as:
285/// * raw bytes,
286/// * strings,
287/// * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
288/// * receiving messages from spawned processes using asynchronous streams, including:
289/// * reading messages by multiple subscribers,
290/// * buffering not read messages with configurable buffer capacity,
291/// * communication via standard I/O or named pipes.
292/// * reading logs produced by spawned processes (from standard output/error streams),
293/// * fetching information about spawned processes (OS-assigned pid, exit status),
294/// * forcefully killing processes,
/// * waiting for processes to finish.
296///
297/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
298/// The handle can be cheaply cloned and used safely by many threads.
299#[derive(Clone, Debug)]
300pub struct ProcessManagerHandle {
301 sender: mpsc::Sender<ProcessManagerMessage>,
302}
303
304impl ProcessManagerHandle {
305 fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
306 Self { sender }
307 }
308
309 /// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
310 /// # Examples
311 /// ```
312 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
313 ///
314 /// # #[tokio::main]
315 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
316 /// let working_dir = tempfile::tempdir()?.into_path();
317 /// let handle = ProcessManager::spawn(working_dir)?;
318 /// let cmd = Cmd::new("echo");
319 /// let process_id = handle.spawn(cmd).await?;
320 /// # Ok(())
321 /// # }
322 pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
323 let (responder, receiver) = oneshot::channel();
324 let msg = ProcessManagerMessage::SpawnProcess {
325 cmd: Box::new(runnable),
326 responder,
327 };
328 let _ = self.sender.send(msg).await;
329 let process_id = receiver.await??;
330 Ok(process_id)
331 }
332
333 /// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
334 pub async fn spawn_with_handle(
335 &self,
336 runnable: impl Runnable,
337 ) -> Result<ProcessHandle, SpawnProcessError> {
338 let id = self.spawn(runnable).await?;
339 Ok(ProcessHandle::new(id, self.clone()))
340 }
341
342 /// Send a [`Message`] to the process with given `id`.
343 /// # Examples
344 /// ```
345 /// use futures::TryStreamExt;
346 /// use proc_heim::{
347 /// manager::ProcessManager,
348 /// model::{
349 /// command::CmdOptions,
350 /// script::{Script, ScriptingLanguage},
351 /// },
352 /// };
353 ///
354 /// # #[tokio::main]
355 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
356 /// let working_dir = tempfile::tempdir()?.into_path();
357 /// let handle = ProcessManager::spawn(working_dir)?;
358 ///
359 /// let script = Script::with_options(
360 /// ScriptingLanguage::Bash,
361 /// r#"
362 /// read -r msg < /dev/stdin
363 /// echo "Hello $msg"
364 /// "#,
365 /// CmdOptions::with_standard_io_messaging(),
366 /// );
367 ///
368 /// let process_id = handle.spawn(script).await?;
369 /// handle.send_message(process_id, "John").await?;
370 /// let mut stream = handle.subscribe_message_stream(process_id).await?;
371 /// let received_msg = stream.try_next().await?.unwrap();
372 /// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
373 /// # Ok(())
374 /// # }
375 /// ```
376 pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
377 where
378 M: Into<Message>,
379 {
380 if let Some(data) = Self::into_bytes_with_eol_char(message.into().bytes) {
381 let (responder, receiver) = oneshot::channel();
382 let msg = ProcessManagerMessage::WriteMessage {
383 id,
384 data,
385 responder,
386 };
387 let _ = self.sender.send(msg).await;
388 receiver.await??;
389 }
390 Ok(())
391 }
392
/// Makes sure that non-empty `bytes` end with a new line character (`\n`).
///
/// Returns `None` for empty input, otherwise the bytes with `\n` appended
/// unless it was already the last byte.
fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
    match bytes.last().copied() {
        None => None,
        Some(b'\n') => Some(bytes),
        Some(_) => {
            bytes.push(b'\n');
            Some(bytes)
        }
    }
}
403
404 /// Access asynchronous message stream from the process with given `id`.
405 ///
406 /// Messages read from the process are returned as [`Message`] types,
407 /// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
408 /// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
409 /// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
410 /// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
411 ///
412 /// `ProcessManager` for each child process assigns a buffer for messages received from it,
413 /// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
414 // NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
415 /// When parent process is not reading messages produced by the process,
416 /// then the messages are buffered up to the given capacity value.
417 /// If the buffer limit is reached and the process sends a new message,
418 /// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
419 /// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
420 ///
/// The messages stream can be subscribed to multiple times and each subscriber will receive its own copy of the original message.
/// Notice that the buffer mentioned earlier is created not per subscriber, but per process, so when one of the subscribers does not read messages, the buffer will fill up.
423 /// # Examples
424 ///
425 /// Reading a message via standard IO:
426 ///
427 /// ```
428 /// use futures::TryStreamExt;
429 /// use proc_heim::{
430 /// manager::{ProcessManager, TryMessageStreamExt},
431 /// model::command::{Cmd, CmdOptions, MessagingType},
432 /// };
433 ///
434 /// # #[tokio::main]
435 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
436 /// // spawn ProcessManager
437 /// let working_dir = tempfile::tempdir()?.into_path();
438 /// let handle = ProcessManager::spawn(working_dir)?;
439 ///
440 /// // create simple echo command
441 /// let msg = "Hello world!";
442 /// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
443 /// let cmd = Cmd::with_args_and_options("echo", [msg], options);
444 ///
445 /// // read a message from spawned process
446 /// let process_id = handle.spawn(cmd).await?;
447 /// let mut stream = handle.subscribe_message_stream(process_id).await?
448 /// .into_string_stream();
449 /// let received_msg = stream.try_next().await?.unwrap();
450 /// assert_eq!(msg, received_msg);
451 /// # Ok(())
452 /// # }
453 /// ```
454 ///
455 /// Reading messages via named pipes:
456 ///
457 /// ```
458 /// use futures::{StreamExt, TryStreamExt};
459 /// use proc_heim::{
460 /// manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
461 /// model::{
462 /// command::CmdOptions,
463 /// script::{Script, ScriptingLanguage},
464 /// },
465 /// };
466 /// use std::path::PathBuf;
467 ///
468 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
469 /// let working_dir = tempfile::tempdir()?.into_path();
470 /// let handle = ProcessManager::spawn(working_dir)?;
471 /// let script = Script::with_options(
472 /// ScriptingLanguage::Bash,
473 /// r#"
474 /// counter=0
475 /// while read msg; do
476 /// echo "$counter: $msg" > $OUTPUT_PIPE
477 /// counter=$((counter + 1))
478 /// done < $INPUT_PIPE
479 /// "#,
480 /// CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
481 /// );
482 ///
483 /// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
484 /// // which mimics the "ProcessManagerHandle" API,
485 /// // but without having to pass the process ID to each method call.
486 /// let process_handle = handle.spawn_with_handle(script).await?;
487 ///
488 /// process_handle.send_message("First message").await?;
489 /// // We can send a next message without causing a deadlock here.
490 /// // This is possible because the response to the first message
491 /// // will be read by a dedicated Tokio task,
492 /// // spawned automatically by the Process Manager.
493 /// process_handle.send_message("Second message").await?;
494 ///
495 /// let mut stream = process_handle
496 /// .subscribe_message_stream()
497 /// .await?
498 /// .into_string_stream();
499 ///
500 /// assert_eq!("0: First message", stream.try_next().await?.unwrap());
501 /// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
502 /// # Ok(()) }
503 /// ```
504 pub async fn subscribe_message_stream(
505 &self,
506 id: ProcessId,
507 ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
508 let (responder, receiver) = oneshot::channel();
509 let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
510 let _ = self.sender.send(msg).await;
511 let message_receiver = receiver.await??;
512 let stream = BroadcastStream::new(message_receiver)
513 .map(|v| v.map(Message::from_bytes).map_err(Into::into));
514 Ok(stream)
515 }
516
517 /// Fetch logs from standard `output` stream, produced by a process with given `id`.
518 ///
519 /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
520 /// Notice that each string ending with new line character (`\n`) is treated as one log.
521 /// # Examples
522 /// ```
523 /// use proc_heim::{
524 /// manager::{LogsQuery, ProcessManager},
525 /// model::{
526 /// command::{CmdOptions, LoggingType},
527 /// script::{Script, ScriptingLanguage},
528 /// },
529 /// };
530 /// use std::time::Duration;
531 ///
532 /// # #[tokio::main]
533 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
534 /// let working_dir = tempfile::tempdir()?.into_path();
535 /// let handle = ProcessManager::spawn(working_dir)?;
536 /// let script = Script::with_options(
537 /// ScriptingLanguage::Bash,
538 /// r#"
539 /// echo message1
540 /// echo message2
541 /// "#,
542 /// CmdOptions::with_logging(LoggingType::StdoutOnly),
543 /// );
544 ///
545 /// let process_id = handle.spawn(script).await?;
546 ///
547 /// // We need to wait for the process to spawn and finish in order to collect logs,
548 /// // otherwise returned logs will be empty.
549 /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
550 ///
551 /// let logs = handle
552 /// .get_logs_stdout(process_id, LogsQuery::fetch_all())
553 /// .await?;
554 /// assert_eq!(2, logs.len());
555 /// assert_eq!("message1", logs[0]);
556 /// assert_eq!("message2", logs[1]);
557 /// # Ok(())
558 /// # }
559 /// ```
560 pub async fn get_logs_stdout(
561 &self,
562 id: ProcessId,
563 query: LogsQuery,
564 ) -> Result<Vec<String>, GetLogsError> {
565 self.get_logs(id, LogsQueryType::Stdout, query).await
566 }
567
568 /// Fetch logs from standard `error` stream, produced by a process with given `id`.
569 ///
570 /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
571 /// Notice that each string ending with new line character (`\n`) is treated as one log.
572 /// # Examples
573 /// ```
574 /// use proc_heim::{
575 /// manager::{LogsQuery, ProcessManager},
576 /// model::{
577 /// command::{CmdOptions, LoggingType},
578 /// script::{Script, ScriptingLanguage},
579 /// },
580 /// };
581 /// use std::time::Duration;
582 ///
583 /// # #[tokio::main]
584 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
585 /// let working_dir = tempfile::tempdir()?.into_path();
586 /// let handle = ProcessManager::spawn(working_dir)?;
587 /// let script = Script::with_options(
588 /// ScriptingLanguage::Bash,
589 /// r#"
590 /// echo message1 >&2
591 /// echo message2 >&2
592 /// echo message3 >&2
593 /// "#,
594 /// CmdOptions::with_logging(LoggingType::StderrOnly),
595 /// );
596 ///
597 /// let process_id = handle.spawn(script).await?;
598 ///
599 /// // We need to wait for the process to spawn and finish in order to collect logs,
600 /// // otherwise returned logs will be empty.
601 /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
602 ///
603 /// let logs = handle
604 /// .get_logs_stderr(process_id, LogsQuery::with_offset(1))
605 /// .await?;
606 /// assert_eq!(2, logs.len());
607 /// assert_eq!("message2", logs[0]);
608 /// assert_eq!("message3", logs[1]);
609 /// # Ok(())
/// # }
/// ```
611 pub async fn get_logs_stderr(
612 &self,
613 id: ProcessId,
614 query: LogsQuery,
615 ) -> Result<Vec<String>, GetLogsError> {
616 self.get_logs(id, LogsQueryType::Stderr, query).await
617 }
618
619 async fn get_logs(
620 &self,
621 id: ProcessId,
622 logs_query_type: LogsQueryType,
623 query: LogsQuery,
624 ) -> Result<Vec<String>, GetLogsError> {
625 let (responder, receiver) = oneshot::channel();
626 let msg = ProcessManagerMessage::GetLogs {
627 id,
628 logs_query_type,
629 query,
630 responder,
631 };
632 let _ = self.sender.send(msg).await;
633 let logs = receiver.await??;
634 Ok(logs)
635 }
636
637 /// Get information about the process with given `id`.
638 /// # Examples
639 /// Check information of running process:
640 /// ```
641 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
642 ///
643 /// # #[tokio::main]
644 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
645 /// let working_dir = tempfile::tempdir()?.into_path();
646 /// let handle = ProcessManager::spawn(working_dir)?;
647 /// let process_id = handle.spawn(Cmd::new("cat")).await?;
648 ///
649 /// let process_info = handle.get_process_info(process_id).await?;
650 /// assert!(process_info.pid().is_some());
651 /// # Ok(())
652 /// # }
653 /// ```
654 pub async fn get_process_info(
655 &self,
656 id: ProcessId,
657 ) -> Result<ProcessInfo, GetProcessInfoError> {
658 let (responder, receiver) = oneshot::channel();
659 let msg = ProcessManagerMessage::GetProcessInfo { id, responder };
660 let _ = self.sender.send(msg).await;
661 let data = receiver.await??;
662 Ok(data)
663 }
664
665 /// Wait for the process with given `id` to finish.
666 ///
667 /// This method will spawn a `Tokio` task and check in loop, if the process exited.
668 /// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
669 /// # Examples
670 /// ```
671 /// use std::time::Duration;
672 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
673 ///
674 /// # #[tokio::main]
675 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
676 /// let working_dir = tempfile::tempdir()?.into_path();
677 /// let handle = ProcessManager::spawn(working_dir)?;
678 /// let process_id = handle.spawn(Cmd::new("env")).await?;
679 ///
680 /// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
681 ///
682 /// assert!(!process_info.is_running());
683 /// assert!(process_info.exit_status().unwrap().success());
684 /// # Ok(())
685 /// # }
686 /// ```
687 pub fn wait(
688 &self,
689 id: ProcessId,
690 poll_interval: Duration,
691 ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
692 let handle = self.clone();
693 tokio::spawn(async move {
694 loop {
695 let process_info = handle.get_process_info(id).await?;
696 if process_info.is_running() {
697 tokio::time::sleep(poll_interval).await;
698 } else {
699 return Ok(process_info);
700 }
701 }
702 })
703 }
704
705 /// Forcefully kills the process with given `id`.
706 ///
707 /// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
708 /// # Examples
709 /// ```
710 /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
711 ///
712 /// # #[tokio::main]
713 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
714 /// let working_dir = tempfile::tempdir()?.into_path();
715 /// let handle = ProcessManager::spawn(working_dir)?;
716 /// let process_id = handle.spawn(Cmd::new("cat")).await?;
717 ///
718 /// let result = handle.kill(process_id).await;
719 ///
720 /// assert!(result.is_ok());
721 /// # Ok(())
722 /// # }
723 /// ```
724 pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
725 let (responder, receiver) = oneshot::channel();
726 let msg = ProcessManagerMessage::KillProcess { id, responder };
727 let _ = self.sender.send(msg).await;
728 receiver.await??;
729 Ok(())
730 }
731
732 /// Abort a [`ProcessManager`] task associated with this handle.
733 pub async fn abort(&self) -> Result<(), AbortError> {
734 let (responder, receiver) = oneshot::channel();
735 let msg = ProcessManagerMessage::Abort { responder };
736 self.sender
737 .send(msg)
738 .await
739 .map_err(|_| AbortError::ManagerAlreadyAborted)?;
740 receiver
741 .await
742 .map_err(|_| AbortError::ManagerAlreadyAborted)??;
743 Ok(())
744 }
745}
746
747/// Error type returned when reading message bytes from a messages stream.
748#[derive(thiserror::Error, Debug)]
749pub enum ReceiveMessageError {
750 /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
751 /// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
752 /// Includes the number of deleted messages.
753 /// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
754 #[error("Lost {0} number of messages due to buffer capacity overflow")]
755 LostMessages(u64),
756}
757
758/// Error type returned when reading a deserializable message from a messages stream.
759#[derive(thiserror::Error, Debug)]
760pub enum ReceiveDeserializedMessageError {
761 /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
762 /// See [`ReceiveMessageError`] for more information.
763 #[error("Lost {0} number of messages due to buffer capacity overflow")]
764 LostMessages(u64),
765 /// Error indicating that a message cannot be deserialized from bytes. Includes error message.
766 #[error("{0}")]
767 CannotDeserializeMessage(String),
768}
769
770impl From<BroadcastStreamRecvError> for ReceiveMessageError {
771 fn from(err: BroadcastStreamRecvError) -> Self {
772 match err {
773 BroadcastStreamRecvError::Lagged(size) => ReceiveMessageError::LostMessages(size),
774 }
775 }
776}
777
778impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
779 fn from(value: ReceiveMessageError) -> Self {
780 match value {
781 ReceiveMessageError::LostMessages(number) => {
782 ReceiveDeserializedMessageError::LostMessages(number)
783 }
784 }
785 }
786}
787
788/// Error type returned when spawning a new child process.
789#[derive(thiserror::Error, Debug)]
790pub enum SpawnProcessError {
791 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
792 #[error("Cannot communicate with spawned process manager")]
793 ManagerCommunicationError(#[from] oneshot::error::RecvError),
794 /// Cannot create named pipes at process directory (first value). Second value is an error code.
795 #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
796 CannotCreateNamedPipe(PathBuf, String),
797 /// Cannot spawn process due to unexpected an IO error.
798 #[error("Cannot spawn process: {0}")]
799 CannotSpawnProcess(#[from] io::Error),
800 /// Cannot create process directory due to an IO error.
801 #[error("Cannot create process directory: {0}")]
802 CannotCreateProcessWorkingDir(io::Error),
803 /// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
804 #[error("Bootstrap process failed: {0}")]
805 BootstrapProcessFailed(String),
806}
807
808impl From<SpawnerError> for SpawnProcessError {
809 fn from(value: SpawnerError) -> Self {
810 match value {
811 SpawnerError::CannotCreateNamedPipe(pipe_path, err) => {
812 SpawnProcessError::CannotCreateNamedPipe(pipe_path, err.to_string())
813 }
814 SpawnerError::CannotSpawnProcess(err) => SpawnProcessError::CannotSpawnProcess(err),
815 SpawnerError::CannotCreateProcessWorkingDir(err) => {
816 SpawnProcessError::CannotCreateProcessWorkingDir(err)
817 }
818 SpawnerError::BootstrapProcessFailed(err) => Self::BootstrapProcessFailed(err),
819 }
820 }
821}
822
823/// Error type returned when subscribing to a messages stream.
824#[derive(thiserror::Error, Debug)]
825pub enum ReadMessageError {
826 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
827 #[error("Process with id: {0} was not found")]
828 ProcessNotFound(ProcessId),
829 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
830 #[error("Cannot communicate with spawned process manager")]
831 ManagerCommunicationError(#[from] oneshot::error::RecvError),
832 /// Cannot get messages stream because a [`MessagingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
833 #[error("Message reader for process with id: {0} was not found")]
834 MessageReaderNotFound(ProcessId),
835 /// The task used to read messages from a process has been killed.
836 /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
837 #[error("Message reader process has been killed")]
838 MessageReaderKilled,
839}
840
841/// Error type returned when sending a message to the process.
842#[derive(thiserror::Error, Debug)]
843pub enum WriteMessageError {
844 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
845 #[error("Process with id: {0} was not found")]
846 ProcessNotFound(ProcessId),
847 /// Cannot serialize message to bytes. Includes error message.
848 #[error("{0}")]
849 CannotSerializeMessage(String),
850 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
851 #[error("Cannot communicate with spawned process manager")]
852 ManagerCommunicationError(#[from] oneshot::error::RecvError),
853 /// An unexpected IO error occurred when sending a message to the process.
854 #[error("Error occurred when sending message to process: {0}")]
855 IoError(#[from] std::io::Error),
856 /// The task used to send messages to a process has been killed.
857 /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
858 #[error("Message writer for process with id: {0} was not found")]
859 MessageWriterNotFound(ProcessId),
860}
861
862/// Error type returned when trying to kill the process.
863#[derive(thiserror::Error, Debug)]
864pub enum KillProcessError {
865 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
866 #[error("Process with id: {0} was not found")]
867 ProcessNotFound(ProcessId),
868 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
869 #[error("Cannot communicate with spawned process manager")]
870 ManagerCommunicationError(#[from] oneshot::error::RecvError),
871}
872
873/// Error type returned when collecting logs from the process.
874#[derive(thiserror::Error, Debug)]
875pub enum GetLogsError {
876 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
877 #[error("Process with id: {0} was not found")]
878 ProcessNotFound(ProcessId),
879 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
880 #[error("Cannot communicate with spawned process manager")]
881 ManagerCommunicationError(#[from] oneshot::error::RecvError),
882 /// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
883 #[error("Logging type: {0} was not configured for process")]
884 LoggingTypeWasNotConfigured(String),
885 /// The task used to reading logs from a process has been killed.
886 /// This error "shouldn't" normally occur because the task is aborted when a process is being killed. And after this `ProcessNotFound` will be returned.
887 #[error("Log read for process with id: {0} was not found")]
888 LogReaderNotFound(ProcessId),
889 /// An unexpected IO error occurred when reading logs from the process.
890 #[error(transparent)]
891 UnExpectedIoError(#[from] io::Error),
892}
893
894impl From<LogReaderError> for GetLogsError {
895 fn from(err: LogReaderError) -> Self {
896 match err {
897 LogReaderError::LogTypeWasNotConfigured(log_type) => {
898 Self::LoggingTypeWasNotConfigured(log_type.to_string())
899 }
900 LogReaderError::UnExpectedIoError(err) => Self::UnExpectedIoError(err),
901 }
902 }
903}
904
905/// Error type returned when trying to get an information about the process.
906#[derive(thiserror::Error, Debug)]
907pub enum GetProcessInfoError {
908 /// The process with given ID was not found (the ID is wrong or the process has been already killed).
909 #[error("Process with id: {0} was not found")]
910 ProcessNotFound(ProcessId),
911 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
912 #[error("Cannot communicate with spawned process manager")]
913 ManagerCommunicationError(#[from] oneshot::error::RecvError),
914 /// An unexpected IO error occurred when trying to get an information about the process.
915 #[error(transparent)]
916 UnExpectedIoError(#[from] io::Error),
917}
918
919/// Error type returned when trying to abort a [`ProcessManager`] task.
920#[derive(thiserror::Error, Debug)]
921pub enum AbortError {
922 /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
923 #[error("Cannot communicate with spawned process manager")]
924 ManagerAlreadyAborted,
925}