tokio_process_tools/collector.rs
1use crate::output_stream::{Chunk, Next};
2use std::borrow::Cow;
3use std::fmt::Debug;
4use std::future::Future;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
9/// Errors that can occur when collecting stream data.
10#[derive(Debug, Error)]
11pub enum CollectorError {
12 /// The collector task could not be joined/terminated.
13 #[error("Failed to join/terminate the collector task over stream '{stream_name}': {source}")]
14 TaskJoin {
15 /// The name of the stream this collector operates on.
16 stream_name: &'static str,
17
18 /// The source error.
19 #[source]
20 source: tokio::task::JoinError,
21 },
22}
23
/// Marker trait for values into which collected stream data can be accumulated.
///
/// There is nothing to implement by hand: the blanket impl directly below covers every type
/// that is `Debug + Send + Sync + 'static`.
pub trait Sink: Debug + Send + Sync + 'static {}

impl<T: Debug + Send + Sync + 'static> Sink for T {}
30
31/// An async collector for raw output chunks.
32///
33/// The collector itself may hold state via `&mut self`, but only the sink `S` is returned from
34/// [`Collector::wait`] or [`Collector::cancel`].
35///
36/// This trait-based API avoids allocating a boxed future for every collected item while still
37/// letting the returned future borrow `chunk` and `sink` across `.await`.
38///
39/// This uses a trait rather than `std::ops::AsyncFn` because stable Rust can express the lending
40/// async callback shape, but cannot yet express the `Send` bound required on an `AsyncFn`
41/// callback's returned future for use inside `tokio::spawn`.
42pub trait AsyncChunkCollector<S: Sink>: Send + 'static {
43 /// Collect a single chunk into `sink`.
44 fn collect<'a>(
45 &'a mut self,
46 chunk: Chunk,
47 sink: &'a mut S,
48 ) -> impl Future<Output = Next> + Send + 'a;
49}
50
51/// An async collector for parsed output lines.
52///
53/// The collector itself may hold state via `&mut self`, but only the sink `S` is returned from
54/// [`Collector::wait`] or [`Collector::cancel`].
55///
56/// This uses a trait rather than `std::ops::AsyncFn` because stable Rust can express the lending
57/// async callback shape, but cannot yet express the `Send` bound required on an `AsyncFn`
58/// callback's returned future for use inside `tokio::spawn`. Once that bound is expressible on
59/// stable Rust, this API can move back toward async-closure ergonomics.
60pub trait AsyncLineCollector<S: Sink>: Send + 'static {
61 /// Collect a single parsed line into `sink`.
62 fn collect<'a>(
63 &'a mut self,
64 line: Cow<'a, str>,
65 sink: &'a mut S,
66 ) -> impl Future<Output = Next> + Send + 'a;
67}
68
69/// A collector for stream data, inspecting it chunk by chunk but also providing mutable access
70/// to a sink in which the data can be stored.
71///
72/// See the `collect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
73///
74/// For proper cleanup, call
75/// - `wait()`, which waits for the collection task to complete.
76/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
77///
78/// If not cleaned up, the termination signal will be sent when dropping this collector,
79/// but the task will be aborted (forceful, not waiting for its regular completion).
80pub struct Collector<S: Sink> {
81 /// The name of the stream this collector operates on.
82 pub(crate) stream_name: &'static str,
83
84 pub(crate) task: Option<JoinHandle<S>>,
85 pub(crate) task_termination_sender: Option<Sender<()>>,
86}
87
88impl<S: Sink> Collector<S> {
89 /// Checks if this task has finished.
90 #[must_use]
91 pub fn is_finished(&self) -> bool {
92 self.task
93 .as_ref()
94 .is_none_or(tokio::task::JoinHandle::is_finished)
95 }
96
97 /// Wait for the collector to terminate naturally.
98 ///
99 /// A collector will automatically terminate when either:
100 ///
101 /// 1. The underlying write-side of the stream is dropped.
102 /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
103 /// 3. The first `Next::Break` is observed.
104 ///
105 /// If none of these may occur in your case, this could/will hang forever!
106 ///
107 /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
108 /// on a collector after termination is fine:
109 ///
110 /// ```rust, no_run
111 /// # async fn test() {
112 /// # use std::time::Duration;
113 /// # use tokio_process_tools::{LineParsingOptions, Process};
114 ///
115 /// # let cmd = tokio::process::Command::new("ls");
116 /// let mut process = Process::new(cmd).spawn_broadcast().unwrap();
117 /// let collector = process.stdout().collect_lines_into_vec(LineParsingOptions::default());
118 /// process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
119 /// let collected = collector.wait().await.unwrap(); // This will return immediately.
120 /// # }
121 /// ```
122 ///
123 /// # Errors
124 ///
125 /// Returns [`CollectorError::TaskJoin`] if the collector task cannot be joined.
126 ///
127 /// # Panics
128 ///
129 /// Panics if the collector's internal task has already been taken.
130 pub async fn wait(mut self) -> Result<S, CollectorError> {
131 // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
132 // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
133 // receive-error as a signal to terminate)!
134 let tts = self.task_termination_sender.take();
135
136 let sink = self
137 .task
138 .take()
139 .expect("`task` to be present.")
140 .await
141 .map_err(|err| CollectorError::TaskJoin {
142 stream_name: self.stream_name,
143 source: err,
144 });
145
146 // Drop the termination sender, we don't need it. Task is now terminated.
147 drop(tts);
148
149 sink
150 }
151
152 /// Sends a cancellation event to the collector, letting it shut down.
153 ///
154 /// # Errors
155 ///
156 /// Returns [`CollectorError::TaskJoin`] if the collector task cannot be joined.
157 ///
158 /// # Panics
159 ///
160 /// Panics if the collector's internal cancellation sender has already been taken.
161 pub async fn cancel(mut self) -> Result<S, CollectorError> {
162 // We ignore any potential error here.
163 // Sending may fail if the task is already terminated (for example, by reaching EOF),
164 // which in turn dropped the receiver end!
165 let _res = self
166 .task_termination_sender
167 .take()
168 .expect("`task_termination_sender` to be present.")
169 .send(());
170
171 self.wait().await
172 }
173}
174
175impl<S: Sink> Drop for Collector<S> {
176 fn drop(&mut self) {
177 if let Some(task_termination_sender) = self.task_termination_sender.take() {
178 // We ignore any potential error here.
179 // Sending may fail if the task is already terminated (for example, by reaching EOF),
180 // which in turn dropped the receiver end!
181 let _res = task_termination_sender.send(());
182 }
183 if let Some(task) = self.task.take() {
184 task.abort();
185 }
186 }
187}