proc_heim/process/
manager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
use std::{
    collections::HashMap,
    fmt::Debug,
    io,
    path::{Path, PathBuf},
    time::Duration,
};

use tokio::{
    sync::{
        broadcast, mpsc,
        oneshot::{self},
    },
    task::JoinHandle,
};
use tokio_stream::{
    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
    Stream, StreamExt,
};
use uuid::Uuid;

use crate::working_dir::WorkingDir;

use super::{
    log_reader::{LogReaderError, LogsQuery, LogsQueryType},
    message::Message,
    spawner::ProcessSpawner,
    ProcessHandle, ProcessInfo, Runnable,
};
use super::{
    model::{Process, ProcessId},
    spawner::SpawnerError,
};

/// Requests sent from a [`ProcessManagerHandle`] to the [`ProcessManager`] task.
///
/// Every variant carries a `responder`: a one-shot channel on which the manager
/// sends back the result of handling the request.
#[derive(Debug)]
enum ProcessManagerMessage {
    /// Spawn a new process from `cmd`; the reply is the identifier assigned to it.
    SpawnProcess {
        cmd: Box<dyn Runnable>,
        responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
    },
    /// Subscribe to messages produced by the process `id`; the reply is a broadcast receiver of raw message bytes.
    SubscribeMessageStream {
        id: ProcessId,
        responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
    },
    /// Write `data` to the process `id` through its message writer.
    WriteMessage {
        id: ProcessId,
        data: Vec<u8>,
        responder: oneshot::Sender<Result<(), WriteMessageError>>,
    },
    /// Kill the process `id` and remove its process directory.
    KillProcess {
        id: ProcessId,
        responder: oneshot::Sender<Result<(), KillProcessError>>,
    },
    /// Read logs of the process `id`, selected by `logs_query_type` and `query`.
    GetLogs {
        id: ProcessId,
        logs_query_type: LogsQueryType,
        query: LogsQuery,
        responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
    },
    /// Fetch the pid and exit status (if any) of the process `id`.
    GetProcessInfo {
        id: ProcessId,
        responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
    },
    /// Stop the manager: close its request channel and kill every managed process.
    Abort {
        responder: oneshot::Sender<Result<(), AbortError>>,
    },
}

/// `ProcessManager` provides asynchronous API for spawning and managing multiple processes.
///
/// The implementation relies on the `Actor model` architecture which benefits from high concurrency and scalability and loose-coupling between actors (units doing some work).
/// `ProcessManager` is a main actor responsible for:
/// * spawning new actors (eg. for sending/reading messages to/from user-defined processes),
/// * forwarding messages between client and other actors.
///
/// All files needed to handle spawned processes are kept in user-specified directory, called further `working directory`.
/// Each process has own, unique subdirectory (`process directory`) inside `working directory`,
/// where `ProcessManager` creates files/named pipes used for communication, logging etc.
/// If the process has been killed manually, then its `process directory` is removed immediately.
///
/// Each spawned process has its own [`ProcessId`], which can be used to interact with it through [`ProcessManagerHandle`].
/// For convenience of interacting with one process, use a [`ProcessHandle`] wrapper.
pub struct ProcessManager {
    // Used to resolve the `process directory` of a process (removed when the process is killed).
    working_dir: WorkingDir,
    // Creates `Process` values from `Runnable`s; constructed from the same working directory.
    process_spawner: ProcessSpawner,
    // All processes currently tracked by the manager, keyed by their identifier.
    processes: HashMap<ProcessId, Process>,
    // Receiving half of the request channel; the sending half lives in `ProcessManagerHandle`.
    receiver: mpsc::Receiver<ProcessManagerMessage>,
    // Set by `abort`; checked by `run` before receiving the next request.
    is_aborted: bool,
}

impl ProcessManager {
    /// Spawns a new process manager task with a given `working_directory`, returning a handle associated with the manager's task.
    ///
    /// The `Err` value is returned, if provided `working_directory` is not a directory or is not writeable.
    /// # Examples
    /// ```no_run
    /// # use proc_heim::manager::ProcessManager;
    /// # use std::path::PathBuf;
    /// let working_directory = PathBuf::from("/some/temp/path");
    /// let handle = ProcessManager::spawn(working_directory).expect("Invalid working directory");
    /// ```
    pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
        // Fail fast, before any task is spawned, if the directory cannot be written to.
        Self::validate_working_dir(&working_directory)?;

        let working_dir = WorkingDir::new(working_directory);
        let process_spawner = ProcessSpawner::new(working_dir.clone());
        // Bounded request channel: the handle keeps the sender, the manager task the receiver.
        let (sender, receiver) = mpsc::channel(32);

        let mut manager = Self {
            working_dir,
            process_spawner,
            processes: HashMap::new(),
            receiver,
            is_aborted: false,
        };
        // The manager runs detached; it is controlled only through the returned handle.
        tokio::spawn(async move { manager.run().await });

        Ok(ProcessManagerHandle::new(sender))
    }

    /// Checks that `working_directory` is usable by creating and then deleting
    /// a uniquely named probe file inside it.
    ///
    /// Any I/O error from writing or removing the probe file is returned as is.
    fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
        let probe_name = Uuid::new_v4().to_string();
        let probe_path = working_directory.join(probe_name);
        std::fs::write(&probe_path, "testing working dir")?;
        std::fs::remove_file(&probe_path)
    }

    /// Main loop of the manager task.
    ///
    /// Handles requests one at a time until the manager is aborted
    /// or the request channel yields `None`.
    async fn run(&mut self) {
        while !self.is_aborted {
            match self.receiver.recv().await {
                Some(request) => self.handle_message(request).await,
                None => break,
            }
        }
    }

    /// Dispatches one request to the matching handler and sends the outcome back to the requester.
    ///
    /// A failed `responder.send` is ignored on purpose: the result is simply discarded.
    async fn handle_message(&mut self, msg: ProcessManagerMessage) {
        match msg {
            ProcessManagerMessage::SpawnProcess { cmd, responder } => {
                let _ = responder.send(self.spawn_process(cmd).await);
            }
            ProcessManagerMessage::KillProcess { id, responder } => {
                let _ = responder.send(self.kill_process(id).await);
            }
            ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
                let _ = responder.send(self.subscribe_to_message_stream(id).await);
            }
            ProcessManagerMessage::WriteMessage {
                id,
                data,
                responder,
            } => {
                let _ = responder.send(self.send_message(id, data).await);
            }
            ProcessManagerMessage::GetLogs {
                id,
                logs_query_type,
                query,
                responder,
            } => {
                let _ = responder.send(self.get_logs(id, logs_query_type, query).await);
            }
            ProcessManagerMessage::GetProcessInfo { id, responder } => {
                let _ = responder.send(self.get_process_info(id));
            }
            ProcessManagerMessage::Abort { responder } => {
                let _ = responder.send(self.abort().await);
            }
        }
    }

    /// Spawns `runnable` under a freshly generated [`ProcessId`] and starts tracking it.
    ///
    /// On spawner failure nothing is registered and the error is converted into [`SpawnProcessError`].
    async fn spawn_process(
        &mut self,
        runnable: Box<dyn Runnable>,
    ) -> Result<ProcessId, SpawnProcessError> {
        let new_id = ProcessId::random();
        match self.process_spawner.spawn_runnable(&new_id, runnable) {
            Ok(spawned) => {
                self.processes.insert(new_id, spawned);
                Ok(new_id)
            }
            Err(spawn_err) => Err(spawn_err.into()),
        }
    }

    /// Kills the process with given `id` and releases everything the manager holds for it.
    ///
    /// The message reader, message writer and log reader (when present) are aborted first,
    /// then the process entry is dropped and its process directory is removed.
    /// Returns [`KillProcessError::ProcessNotFound`] when `id` is not tracked.
    async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
        let tracked = match self.processes.get_mut(&id) {
            Some(tracked) => tracked,
            None => return Err(KillProcessError::ProcessNotFound(id)),
        };

        if let Some(message_reader) = tracked.message_reader.take() {
            message_reader.abort().await;
        }
        if let Some(message_writer) = tracked.message_writer.take() {
            message_writer.abort().await;
        }
        if let Some(logs) = tracked.log_reader.take() {
            logs.abort().await;
        }

        // kill_on_drop() is used to kill child process, so dropping the entry is enough.
        drop(self.processes.remove(&id));

        // Best-effort cleanup: a failure to remove the directory is not reported to the caller.
        let dir_to_remove = self.working_dir.process_dir(&id);
        let _ = tokio::fs::remove_dir_all(dir_to_remove).await;
        Ok(())
    }

    /// Creates a new broadcast receiver for messages produced by the process with given `id`.
    ///
    /// Fails when the process is unknown, when it has no message reader,
    /// or when subscribing through the reader fails.
    async fn subscribe_to_message_stream(
        &mut self,
        id: ProcessId,
    ) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
        let process = self
            .processes
            .get(&id)
            .ok_or(ReadMessageError::ProcessNotFound(id))?;
        let message_reader = process
            .message_reader
            .as_ref()
            .ok_or(ReadMessageError::MessageReaderNotFound(id))?;

        message_reader
            .subscribe()
            .await
            .map_err(|_| ReadMessageError::MessageReaderKilled)
    }

    /// Writes `data` to the process with given `id` through its message writer.
    ///
    /// Fails when the process is unknown, when it has no message writer,
    /// or when the write itself fails.
    async fn send_message(
        &mut self,
        id: ProcessId,
        data: Vec<u8>,
    ) -> Result<(), WriteMessageError> {
        let process = self
            .processes
            .get(&id)
            .ok_or(WriteMessageError::ProcessNotFound(id))?;
        let message_writer = process
            .message_writer
            .as_ref()
            .ok_or(WriteMessageError::MessageWriterNotFound(id))?;

        message_writer.write(data).await.map_err(Into::into)
    }

    /// Reads logs of the process with given `id` through its log reader.
    ///
    /// Fails when the process is unknown, when it has no log reader,
    /// or when reading the logs fails.
    async fn get_logs(
        &mut self,
        id: ProcessId,
        logs_query_type: LogsQueryType,
        query: LogsQuery,
    ) -> Result<Vec<String>, GetLogsError> {
        let process = self
            .processes
            .get(&id)
            .ok_or(GetLogsError::ProcessNotFound(id))?;
        let log_reader = process
            .log_reader
            .as_ref()
            .ok_or(GetLogsError::LogReaderNotFound(id))?;

        log_reader
            .read_logs(logs_query_type, query)
            .await
            .map_err(Into::into)
    }

    /// Collects the pid and the exit status (if already available) of the process with given `id`.
    ///
    /// Returns [`GetProcessInfoError::ProcessNotFound`] when `id` is not tracked.
    fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
        let child = &mut self
            .processes
            .get_mut(&id)
            .ok_or(GetProcessInfoError::ProcessNotFound(id))?
            .child;
        // The pid is read before polling the exit status, in the same order as before.
        let pid = child.id();
        let exit_status = child.try_wait()?;
        Ok(ProcessInfo::new(pid, exit_status))
    }

    /// Stops the manager: marks it as aborted, closes the request channel
    /// and kills every process that is still tracked.
    ///
    /// Errors from killing individual processes are ignored.
    async fn abort(&mut self) -> Result<(), AbortError> {
        self.is_aborted = true;
        self.receiver.close();

        // Snapshot the ids first, because `kill_process` removes entries from the map.
        let tracked_ids: Vec<ProcessId> = self.processes.keys().copied().collect();
        for process_id in tracked_ids {
            let _ = self.kill_process(process_id).await;
        }
        Ok(())
    }
}

/// Type used for communication with [`ProcessManager`] task.
///
/// It provides asynchronous API for:
/// * spawning new child processes, which implements [`Runnable`] trait,
/// * sending messages to spawned processes as:
///     * raw bytes,
///     * strings,
///     * `Rust` data types serialized to raw bytes, `JSON` or `MessagePack`.
/// * receiving messages from spawned processes using asynchronous streams, including:
///     * reading messages by multiple subscribers,
///     * buffering not read messages with configurable buffer capacity,
///     * communication via standard I/O or named pipes.
/// * reading logs produced by spawned processes (from standard output/error streams),
/// * fetching information about spawned processes (OS-assigned pid, exit status),
/// * forcefully killing processes,
/// * waiting for processes to finish.
///
/// `ProcessManagerHandle` can only be created by calling [`ProcessManager::spawn`] method.
/// The handle can be cheaply cloned and used safely by many threads.
#[derive(Clone, Debug)]
pub struct ProcessManagerHandle {
    // Sending half of the request channel whose receiving half is owned by the `ProcessManager` task.
    sender: mpsc::Sender<ProcessManagerMessage>,
}

impl ProcessManagerHandle {
    fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
        Self { sender }
    }

    /// Spawn a new child process, returning its assigned identifier, which can be used to interact with the process.
    /// # Examples
    /// ```
    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let cmd = Cmd::new("echo");
    /// let process_id = handle.spawn(cmd).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::SpawnProcess {
            cmd: Box::new(runnable),
            responder,
        };
        // A send failure is not handled here: the request (and its responder) is dropped,
        // so awaiting `receiver` below reports the error.
        let _ = self.sender.send(request).await;
        receiver.await?
    }

    /// Spawn a new child process, returning a [`ProcessHandle`], which can be used to interact with the process.
    pub async fn spawn_with_handle(
        &self,
        runnable: impl Runnable,
    ) -> Result<ProcessHandle, SpawnProcessError> {
        // The process handle gets its own clone of this manager handle.
        let process_id = self.spawn(runnable).await?;
        let manager_handle = self.clone();
        Ok(ProcessHandle::new(process_id, manager_handle))
    }

    /// Send a [`Message`] to the process with given `id`.
    /// # Examples
    /// ```
    /// use futures::TryStreamExt;
    /// use proc_heim::{
    ///     manager::ProcessManager,
    ///     model::{
    ///         command::CmdOptions,
    ///         script::{Script, ScriptingLanguage},
    ///     },
    /// };
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    ///
    /// let script = Script::with_options(
    ///     ScriptingLanguage::Bash,
    ///     r#"
    ///     read -r msg < /dev/stdin
    ///     echo "Hello $msg"
    ///     "#,
    ///     CmdOptions::with_standard_io_messaging(),
    /// );
    ///
    /// let process_id = handle.spawn(script).await?;
    /// handle.send_message(process_id, "John").await?;
    /// let mut stream = handle.subscribe_message_stream(process_id).await?;
    /// let received_msg = stream.try_next().await?.unwrap();
    /// assert_eq!("Hello John", received_msg.try_into_string().unwrap());
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
    where
        M: Into<Message>,
    {
        // An empty message is not sent at all; the call still succeeds.
        let data = match Self::into_bytes_with_eol_char(message.into().bytes) {
            Some(data) => data,
            None => return Ok(()),
        };

        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::WriteMessage {
            id,
            data,
            responder,
        };
        // A send failure drops the responder, which surfaces as an error from `receiver`.
        let _ = self.sender.send(request).await;
        receiver.await?
    }

    /// Ensures that `bytes` ends with a new line character (`\n`).
    ///
    /// Returns `None` for empty input, otherwise the bytes with `\n` appended
    /// unless they already end with it.
    fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
        match bytes.last().copied() {
            None => None,
            Some(b'\n') => Some(bytes),
            Some(_) => {
                bytes.push(b'\n');
                Some(bytes)
            }
        }
    }

    /// Access asynchronous message stream from the process with given `id`.
    ///
    /// Messages read from the process are returned as [`Message`] types,
    /// allowing a convenient conversion into raw bytes, string or even `Rust` types, which implement `Deserialize` trait.
    /// If message stream was successfully subscribed, then `Ok(stream)` is returned, otherwise a [`ReadMessageError`] is returned.
    /// A stream doesn't yield raw messages, instead each message is wrapped by `Result` with [`ReceiveMessageError`] error type.
    /// For convenient stream transformation use [`TryMessageStreamExt`](trait@crate::manager::TryMessageStreamExt) and [`MessageStreamExt`](trait@crate::manager::MessageStreamExt) traits.
    ///
    /// `ProcessManager` for each child process assigns a buffer for messages received from it,
    /// which can be configured via [`CmdOptions::set_message_output_buffer_capacity`](crate::model::command::CmdOptions::set_message_output_buffer_capacity).
    // NOTE: the same doc as in CmdOptions::set_message_output_buffer_capacity below.
    /// When parent process is not reading messages produced by the process,
    /// then the messages are buffered up to the given capacity value.
    /// If the buffer limit is reached and the process sends a new message,
    /// the "oldest" buffered message will be removed. Therefore, when retrieving next message from the stream,
    /// the [`ReceiveMessageError::LostMessages`] error will be returned, indicating how many buffered messages have been removed.
    ///
    /// The message stream can be subscribed to multiple times and each subscriber will receive one copy of the original message.
    /// Notice that the buffer mentioned earlier is created not per subscriber, but per process, so when one of the subscribers does not read messages, the buffer will fill up.
    /// # Examples
    ///
    /// Reading a message via standard IO:
    ///
    /// ```
    /// use futures::TryStreamExt;
    /// use proc_heim::{
    ///     manager::{ProcessManager, TryMessageStreamExt},
    ///     model::command::{Cmd, CmdOptions, MessagingType},
    /// };
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // spawn ProcessManager
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    ///
    /// // create simple echo command
    /// let msg = "Hello world!";
    /// let options = CmdOptions::with_message_output(MessagingType::StandardIo);
    /// let cmd = Cmd::with_args_and_options("echo", [msg], options);
    ///
    /// // read a message from spawned process
    /// let process_id = handle.spawn(cmd).await?;
    /// let mut stream = handle.subscribe_message_stream(process_id).await?
    ///     .into_string_stream();
    /// let received_msg = stream.try_next().await?.unwrap();
    /// assert_eq!(msg, received_msg);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Reading messages via named pipes:
    ///
    /// ```
    /// use futures::{StreamExt, TryStreamExt};
    /// use proc_heim::{
    ///     manager::{ProcessManager, TryMessageStreamExt, ResultStreamExt},
    ///     model::{
    ///         command::CmdOptions,
    ///         script::{Script, ScriptingLanguage},
    ///     },
    /// };
    /// use std::path::PathBuf;
    ///
    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let script = Script::with_options(
    ///     ScriptingLanguage::Bash,
    ///     r#"
    ///     counter=0
    ///     while read msg; do
    ///         echo "$counter: $msg" > $OUTPUT_PIPE
    ///         counter=$((counter + 1))
    ///     done < $INPUT_PIPE
    ///     "#,
    ///     CmdOptions::with_named_pipe_messaging(), // we want to send messages bidirectionally
    /// );
    ///
    /// // We can use "spawn_with_handle" instead of "spawn" to get "ProcessHandle",
    /// // which mimics the "ProcessManagerHandle" API,
    /// // but without having to pass the process ID to each method call.
    /// let process_handle = handle.spawn_with_handle(script).await?;
    ///
    /// process_handle.send_message("First message").await?;
    /// // We can send a next message without causing a deadlock here.
    /// // This is possible because the response to the first message
    /// // will be read by a dedicated Tokio task,
    /// // spawned automatically by the Process Manager.
    /// process_handle.send_message("Second message").await?;
    ///
    /// let mut stream = process_handle
    ///     .subscribe_message_stream()
    ///     .await?
    ///     .into_string_stream();
    ///
    /// assert_eq!("0: First message", stream.try_next().await?.unwrap());
    /// assert_eq!("1: Second message", stream.try_next().await?.unwrap());
    /// # Ok(()) }
    /// ```
    pub async fn subscribe_message_stream(
        &self,
        id: ProcessId,
    ) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
        let (responder, receiver) = oneshot::channel();
        let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
        let _ = self.sender.send(msg).await;
        let message_receiver = receiver.await??;
        let stream = BroadcastStream::new(message_receiver)
            .map(|v| v.map(Message::from_bytes).map_err(Into::into));
        Ok(stream)
    }

    /// Fetch logs from standard `output` stream, produced by a process with given `id`.
    ///
    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StderrOnly`.
    /// Notice that each string ending with new line character (`\n`) is treated as one log.
    /// # Examples
    /// ```
    /// use proc_heim::{
    ///     manager::{LogsQuery, ProcessManager},
    ///     model::{
    ///         command::{CmdOptions, LoggingType},
    ///         script::{Script, ScriptingLanguage},
    ///     },
    /// };
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let script = Script::with_options(
    ///     ScriptingLanguage::Bash,
    ///     r#"
    ///     echo message1
    ///     echo message2
    ///     "#,
    ///     CmdOptions::with_logging(LoggingType::StdoutOnly),
    /// );
    ///
    /// let process_id = handle.spawn(script).await?;
    ///
    /// // We need to wait for the process to spawn and finish in order to collect logs,
    /// // otherwise returned logs will be empty.
    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
    ///
    /// let logs = handle
    ///     .get_logs_stdout(process_id, LogsQuery::fetch_all())
    ///     .await?;
    /// assert_eq!(2, logs.len());
    /// assert_eq!("message1", logs[0]);
    /// assert_eq!("message2", logs[1]);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_logs_stdout(
        &self,
        id: ProcessId,
        query: LogsQuery,
    ) -> Result<Vec<String>, GetLogsError> {
        // Thin wrapper over the shared `get_logs` request, fixed to the standard output stream.
        let stream_kind = LogsQueryType::Stdout;
        self.get_logs(id, stream_kind, query).await
    }

    /// Fetch logs from standard `error` stream, produced by a process with given `id`.
    ///
    /// The method will return [`GetLogsError::LoggingTypeWasNotConfigured`] error, if [`LoggingType`](enum@crate::model::command::LoggingType) was set to `StdoutOnly`.
    /// Notice that each string ending with new line character (`\n`) is treated as one log.
    /// # Examples
    /// ```
    /// use proc_heim::{
    ///     manager::{LogsQuery, ProcessManager},
    ///     model::{
    ///         command::{CmdOptions, LoggingType},
    ///         script::{Script, ScriptingLanguage},
    ///     },
    /// };
    /// use std::time::Duration;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let script = Script::with_options(
    ///     ScriptingLanguage::Bash,
    ///     r#"
    ///     echo message1 >&2
    ///     echo message2 >&2
    ///     echo message3 >&2
    ///     "#,
    ///     CmdOptions::with_logging(LoggingType::StderrOnly),
    /// );
    ///
    /// let process_id = handle.spawn(script).await?;
    ///
    /// // We need to wait for the process to spawn and finish in order to collect logs,
    /// // otherwise returned logs will be empty.
    /// let _ = handle.wait(process_id, Duration::from_micros(10)).await?;
    ///
    /// let logs = handle
    ///     .get_logs_stderr(process_id, LogsQuery::with_offset(1))
    ///     .await?;
    /// assert_eq!(2, logs.len());
    /// assert_eq!("message2", logs[0]);
    /// assert_eq!("message3", logs[1]);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_logs_stderr(
        &self,
        id: ProcessId,
        query: LogsQuery,
    ) -> Result<Vec<String>, GetLogsError> {
        // Thin wrapper over the shared `get_logs` request, fixed to the standard error stream.
        let stream_kind = LogsQueryType::Stderr;
        self.get_logs(id, stream_kind, query).await
    }

    /// Sends a `GetLogs` request to the manager and waits for the logs it returns.
    ///
    /// Shared implementation of `get_logs_stdout` and `get_logs_stderr`.
    async fn get_logs(
        &self,
        id: ProcessId,
        logs_query_type: LogsQueryType,
        query: LogsQuery,
    ) -> Result<Vec<String>, GetLogsError> {
        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::GetLogs {
            id,
            logs_query_type,
            query,
            responder,
        };
        // A send failure drops the responder, which surfaces as an error from `receiver`.
        let _ = self.sender.send(request).await;
        receiver.await?
    }

    /// Get information about the process with given `id`.
    /// # Examples
    /// Check information of running process:
    /// ```
    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
    ///
    /// let process_info = handle.get_process_info(process_id).await?;
    /// assert!(process_info.pid().is_some());
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_process_info(
        &self,
        id: ProcessId,
    ) -> Result<ProcessInfo, GetProcessInfoError> {
        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::GetProcessInfo { id, responder };
        // A send failure drops the responder, which surfaces as an error from `receiver`.
        let _ = self.sender.send(request).await;
        receiver.await?
    }

    /// Wait for the process with given `id` to finish.
    ///
    /// This method will spawn a `Tokio` task and check in loop, if the process exited.
    /// The `poll_interval` parameter specifies how often to check whether the process has completed or not.
    /// # Examples
    /// ```
    /// use std::time::Duration;
    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let process_id = handle.spawn(Cmd::new("env")).await?;
    ///
    /// let process_info = handle.wait(process_id, Duration::from_micros(10)).await??;
    ///
    /// assert!(!process_info.is_running());
    /// assert!(process_info.exit_status().unwrap().success());
    /// # Ok(())
    /// # }
    /// ```
    pub fn wait(
        &self,
        id: ProcessId,
        poll_interval: Duration,
    ) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
        // The polling task needs its own handle, because it may outlive `&self`.
        let manager = self.clone();
        tokio::spawn(async move {
            loop {
                let info = manager.get_process_info(id).await?;
                if !info.is_running() {
                    return Ok(info);
                }
                tokio::time::sleep(poll_interval).await;
            }
        })
    }

    /// Forcefully kills the process with given `id`.
    ///
    /// It will also abort all background tasks related to the process (eg. for messaging, logging) and remove its process directory.
    /// # Examples
    /// ```
    /// use proc_heim::{manager::ProcessManager, model::command::Cmd};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let working_dir = tempfile::tempdir()?.into_path();
    /// let handle = ProcessManager::spawn(working_dir)?;
    /// let process_id = handle.spawn(Cmd::new("cat")).await?;
    ///
    /// let result = handle.kill(process_id).await;
    ///
    /// assert!(result.is_ok());
    /// # Ok(())
    /// # }
    /// ```
    pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::KillProcess { id, responder };
        // A send failure drops the responder, which surfaces as an error from `receiver`.
        let _ = self.sender.send(request).await;
        receiver.await?
    }

    /// Abort a [`ProcessManager`] task associated with this handle.
    pub async fn abort(&self) -> Result<(), AbortError> {
        let (responder, receiver) = oneshot::channel();
        let request = ProcessManagerMessage::Abort { responder };

        // Both a failed send and a dropped responder are reported as "already aborted".
        if self.sender.send(request).await.is_err() {
            return Err(AbortError::ManagerAlreadyAborted);
        }
        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(AbortError::ManagerAlreadyAborted),
        }
    }
}

/// Error type returned when reading message bytes from a messages stream.
///
/// A `BroadcastStreamRecvError` can be converted into this type via the [`From`] implementation
/// defined in this module.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveMessageError {
    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
    /// Includes the number of deleted messages.
    ///
    /// If you don't care about deleted messages, you can ignore this error (manually or using [`TryMessageStreamExt::ignore_lost_messages`](crate::manager::TryMessageStreamExt::ignore_lost_messages)).
    ///
    /// See [`ProcessManagerHandle::subscribe_message_stream`] for more information.
    #[error("Lost {0} number of messages due to buffer capacity overflow")]
    LostMessages(u64),
}

/// Error type returned when reading a deserializable message from a messages stream.
///
/// A [`ReceiveMessageError`] can be converted into this type via the [`From`] implementation
/// defined in this module.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveDeserializedMessageError {
    /// Error indicating that some buffered messages were deleted due to buffer capacity overflow.
    /// Includes the number of deleted messages.
    /// See [`ReceiveMessageError`] for more information.
    #[error("Lost {0} number of messages due to buffer capacity overflow")]
    LostMessages(u64),
    /// Error indicating that a message cannot be deserialized from bytes.
    /// Includes an error message, which is also used verbatim as the display text of this variant.
    #[error("{0}")]
    CannotDeserializeMessage(String),
}

/// Translates a broadcast stream receive error into [`ReceiveMessageError`].
impl From<BroadcastStreamRecvError> for ReceiveMessageError {
    /// Forwards the count carried by `Lagged` unchanged into [`ReceiveMessageError::LostMessages`].
    fn from(err: BroadcastStreamRecvError) -> Self {
        // `Lagged` is the only variant, so this pattern is irrefutable. Should a new variant
        // ever appear, this line stops compiling and forces the mapping to be revisited.
        let BroadcastStreamRecvError::Lagged(lost_count) = err;
        Self::LostMessages(lost_count)
    }
}

/// Lifts a raw-bytes stream error into its deserializing counterpart.
impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
    /// Carries the number of lost messages over unchanged.
    fn from(value: ReceiveMessageError) -> Self {
        // `LostMessages` is the only variant, so this pattern is irrefutable. Adding a variant
        // to `ReceiveMessageError` makes this line fail to compile until it is handled.
        let ReceiveMessageError::LostMessages(lost_count) = value;
        Self::LostMessages(lost_count)
    }
}

/// Error type returned when spawning a new child process.
///
/// A `SpawnerError` can be converted into this type via the [`From`] implementation
/// defined in this module.
#[derive(thiserror::Error, Debug)]
pub enum SpawnProcessError {
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// Cannot create named pipes at process directory (first value).
    /// Second value is an error code, stored as text (when converting from `SpawnerError` it is the
    /// string representation of the underlying error).
    #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
    CannotCreateNamedPipe(PathBuf, String),
    /// Cannot spawn process due to an unexpected IO error.
    #[error("Cannot spawn process: {0}")]
    CannotSpawnProcess(#[from] io::Error),
    /// Cannot create process directory due to an IO error.
    // No `#[from]` here: `CannotSpawnProcess` already claims the automatic `io::Error` conversion.
    #[error("Cannot create process directory: {0}")]
    CannotCreateProcessWorkingDir(io::Error),
    /// Error was returned by [`Runnable::bootstrap_cmd`] method. Includes an error message returned from this method.
    #[error("Bootstrap process failed: {0}")]
    BootstrapProcessFailed(String),
}

/// Maps every internal spawner failure onto the public [`SpawnProcessError`] variant of the same name.
impl From<SpawnerError> for SpawnProcessError {
    fn from(value: SpawnerError) -> Self {
        match value {
            // The pipe error is flattened to its textual form; the path is passed through.
            SpawnerError::CannotCreateNamedPipe(path, cause) => {
                Self::CannotCreateNamedPipe(path, cause.to_string())
            }
            SpawnerError::CannotSpawnProcess(cause) => Self::CannotSpawnProcess(cause),
            SpawnerError::CannotCreateProcessWorkingDir(cause) => {
                Self::CannotCreateProcessWorkingDir(cause)
            }
            SpawnerError::BootstrapProcessFailed(reason) => Self::BootstrapProcessFailed(reason),
        }
    }
}

/// Error type returned when subscribing to a messages stream.
#[derive(thiserror::Error, Debug)]
pub enum ReadMessageError {
    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
    /// Includes the requested process ID.
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// Cannot get messages stream because a [`MessagingType`](enum@crate::model::command::MessagingType) has not been configured for the process.
    /// Includes the ID of the affected process.
    #[error("Message reader for process with id: {0} was not found")]
    MessageReaderNotFound(ProcessId),
    /// The task used to read messages from a process has been killed.
    ///
    /// This error "shouldn't" normally occur, because the task is aborted when a process is being killed,
    /// and after that `ProcessNotFound` will be returned instead.
    #[error("Message reader process has been killed")]
    MessageReaderKilled,
}

/// Error type returned when sending a message to the process.
#[derive(thiserror::Error, Debug)]
pub enum WriteMessageError {
    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
    /// Includes the requested process ID.
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot serialize message to bytes.
    /// Includes an error message, which is also used verbatim as the display text of this variant.
    #[error("{0}")]
    CannotSerializeMessage(String),
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// An unexpected IO error occurred when sending a message to the process.
    #[error("Error occurred when sending message to process: {0}")]
    IoError(#[from] std::io::Error),
    /// The task used to send messages to a process has been killed.
    /// Includes the ID of the affected process.
    ///
    /// This error "shouldn't" normally occur, because the task is aborted when a process is being killed,
    /// and after that `ProcessNotFound` will be returned instead.
    #[error("Message writer for process with id: {0} was not found")]
    MessageWriterNotFound(ProcessId),
}

/// Error type returned when trying to kill the process.
#[derive(thiserror::Error, Debug)]
pub enum KillProcessError {
    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
    /// Includes the requested process ID.
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    /// Wraps the receive error of the closed response channel.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
}

/// Error type returned when collecting logs from the process.
///
/// A `LogReaderError` can be converted into this type via the [`From`] implementation
/// defined in this module.
#[derive(thiserror::Error, Debug)]
pub enum GetLogsError {
    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
    /// Includes the requested process ID.
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// Cannot get logs because a proper [`LoggingType`](enum@crate::model::command::LoggingType) has not been configured for the process.
    /// Includes the textual name of the requested logging type.
    #[error("Logging type: {0} was not configured for process")]
    LoggingTypeWasNotConfigured(String),
    /// The task used to read logs from a process has been killed.
    /// Includes the ID of the affected process.
    ///
    /// This error "shouldn't" normally occur, because the task is aborted when a process is being killed,
    /// and after that `ProcessNotFound` will be returned instead.
    #[error("Log read for process with id: {0} was not found")]
    LogReaderNotFound(ProcessId),
    /// An unexpected IO error occurred when reading logs from the process.
    /// The display text is forwarded from the wrapped [`io::Error`].
    #[error(transparent)]
    UnExpectedIoError(#[from] io::Error),
}

/// Maps internal log reader failures onto the public [`GetLogsError`] variants.
impl From<LogReaderError> for GetLogsError {
    fn from(err: LogReaderError) -> Self {
        match err {
            // IO failures are passed through untouched.
            LogReaderError::UnExpectedIoError(io_err) => Self::UnExpectedIoError(io_err),
            // The logging type is exposed by its textual name only.
            LogReaderError::LogTypeWasNotConfigured(kind) => {
                let kind_name = kind.to_string();
                Self::LoggingTypeWasNotConfigured(kind_name)
            }
        }
    }
}

/// Error type returned when trying to get information about the process.
#[derive(thiserror::Error, Debug)]
pub enum GetProcessInfoError {
    /// The process with given ID was not found (the ID is wrong or the process has been already killed).
    /// Includes the requested process ID.
    #[error("Process with id: {0} was not found")]
    ProcessNotFound(ProcessId),
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    #[error("Cannot communicate with spawned process manager")]
    ManagerCommunicationError(#[from] oneshot::error::RecvError),
    /// An unexpected IO error occurred when trying to get information about the process.
    /// The display text is forwarded from the wrapped [`io::Error`].
    #[error(transparent)]
    UnExpectedIoError(#[from] io::Error),
}

/// Error type returned when trying to abort a [`ProcessManager`] task.
#[derive(thiserror::Error, Debug)]
pub enum AbortError {
    /// Cannot communicate with spawned process manager. Probably process manager task has been aborted.
    ///
    /// Returned when the abort request cannot be sent to the manager or when the response channel
    /// is closed before an answer arrives.
    #[error("Cannot communicate with spawned process manager")]
    ManagerAlreadyAborted,
}