tokio_process_tools/
collector.rs

1use crate::output_stream::Next;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
/// Errors that can occur when collecting stream data.
///
/// Returned by [`Collector::wait`] and [`Collector::cancel`].
#[derive(Debug, Error)]
pub enum CollectorError {
    /// The collector task could not be joined/terminated.
    ///
    /// Produced when awaiting the collector's task handle yields a
    /// [`tokio::task::JoinError`].
    #[error("Failed to join/terminate the collector task over stream '{stream_name}': {source}")]
    TaskJoin {
        /// The name of the stream this collector operates on.
        stream_name: &'static str,

        /// The join error reported by tokio, exposed as this error's source.
        #[source]
        source: tokio::task::JoinError,
    },
}
23
/// Marker trait for values that collected stream data can be stored in.
///
/// There is nothing to implement by hand: the blanket implementation below covers every type
/// that is `Debug + Send + Sync + 'static`.
pub trait Sink: Debug + Send + Sync + 'static {}

/// Blanket implementation: any `Debug + Send + Sync + 'static` type is a [`Sink`].
impl<T: Debug + Send + Sync + 'static> Sink for T {}
30
// NOTE: We use Pin<Box> here to force usage of Higher-Rank Trait Bounds (HRTBs).
// The returned futures will most likely capture the `&mut T` and are therefore bound
// by its lifetime. Without the trait-object usage, this would not work.
/// A pinned, boxed, `Send` future that resolves to a [`Next`] and may borrow data for `'a`.
pub type AsyncCollectFn<'a> = Pin<Box<dyn Future<Output = Next> + Send + 'a>>;
35
/// A collector for stream data, inspecting it chunk by chunk but also providing mutable access
/// to a sink in which the data can be stored.
///
/// See the `collect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
///
/// For proper cleanup, call
/// - `wait()`, which waits for the collection task to complete.
/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
///
/// If not cleaned up, the termination signal will be sent when dropping this collector,
/// but the task will be aborted (forceful, not waiting for its regular completion).
pub struct Collector<S: Sink> {
    /// The name of the stream this collector operates on.
    ///
    /// Only used to label [`CollectorError::TaskJoin`] errors.
    pub(crate) stream_name: &'static str,

    /// Join handle of the collection task; joining it yields the sink `S`.
    ///
    /// `None` once `wait()` or `Drop` has consumed it.
    pub(crate) task: Option<JoinHandle<S>>,

    /// Sender half of the oneshot channel used to signal termination to the task.
    ///
    /// `None` once `wait()`, `cancel()` or `Drop` has consumed it.
    pub(crate) task_termination_sender: Option<Sender<()>>,
}
54
55impl<S: Sink> Collector<S> {
56    /// Checks if this task has finished.
57    pub fn is_finished(&self) -> bool {
58        self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
59    }
60
61    /// Wait for the collector to terminate naturally.
62    ///
63    /// A collector will automatically terminate when either:
64    ///
65    /// 1. The underlying write-side of the stream is dropped.
66    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
67    /// 3. The first `Next::Break` is observed.
68    ///
69    /// If none of these may occur in your case, this could/will hang forever!
70    ///
71    /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
72    /// on a collector after termination is fine:
73    ///
74    /// ```rust, no_run
75    /// # async fn test() {
76    /// # use std::time::Duration;
77    /// # use tokio_process_tools::{LineParsingOptions, Process};
78    ///
79    /// # let cmd = tokio::process::Command::new("ls");
80    /// let mut process = Process::new(cmd).spawn_broadcast().unwrap();
81    /// let collector = process.stdout().collect_lines_into_vec(LineParsingOptions::default());
82    /// process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
83    /// let collected = collector.wait().await.unwrap(); // This will return immediately.
84    /// # }
85    /// ```
86    pub async fn wait(mut self) -> Result<S, CollectorError> {
87        // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
88        // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
89        // receive-error as a signal to terminate)!
90        let tts = self.task_termination_sender.take();
91
92        let sink = self
93            .task
94            .take()
95            .expect("`task` to be present.")
96            .await
97            .map_err(|err| CollectorError::TaskJoin {
98                stream_name: self.stream_name,
99                source: err,
100            });
101
102        // Drop the termination sender, we don't need it. Task is now terminated.
103        drop(tts);
104
105        sink
106    }
107
108    /// Sends a cancellation event to the collector, letting it shut down.
109    pub async fn cancel(mut self) -> Result<S, CollectorError> {
110        // We ignore any potential error here.
111        // Sending may fail if the task is already terminated (for example, by reaching EOF),
112        // which in turn dropped the receiver end!
113        let _res = self
114            .task_termination_sender
115            .take()
116            .expect("`task_termination_sender` to be present.")
117            .send(());
118
119        self.wait().await
120    }
121}
122
123impl<S: Sink> Drop for Collector<S> {
124    fn drop(&mut self) {
125        if let Some(task_termination_sender) = self.task_termination_sender.take() {
126            // We ignore any potential error here.
127            // Sending may fail if the task is already terminated (for example, by reaching EOF),
128            // which in turn dropped the receiver end!
129            let _res = task_termination_sender.send(());
130        }
131        if let Some(task) = self.task.take() {
132            task.abort();
133        }
134    }
135}