tokio_process_tools/output_stream/consumer/mod.rs
1//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
2//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
3//! [`Consumer<S>`] handle with cooperative-cancel / abort semantics. Required machinery;
4//! tokio-bound by construction. The visitor traits this module drives are runtime-agnostic and
5//! live one level up at [`crate::output_stream::visitor`].
6
7pub(crate) mod driver;
8
9pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};
10
11use crate::StreamReadError;
12use std::time::Duration;
13use thiserror::Error;
14use tokio::sync::oneshot::Sender;
15use tokio::task::JoinHandle;
16use tokio::time::{Instant, sleep_until};
17
/// Errors that the [`Consumer`] infrastructure itself can raise while driving its stream.
///
/// These describe failures of the consumer task — joining, or reading the underlying stream.
/// Visitor-specific failures (for example, a write-backed visitor's sink rejecting bytes) live
/// in the visitor's own [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type, not here. So a
/// writer-backed consumer's `wait` returns
/// `Result<Result<W, SinkWriteError>, ConsumerError>`: the outer result is what `ConsumerError`
/// describes, the inner is the writer visitor's own outcome.
///
/// The enum is `#[non_exhaustive]`, so `match`es outside this crate need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ConsumerError {
    /// The consumer task could not be joined/terminated.
    ///
    /// Wraps the [`JoinError`](tokio::task::JoinError) obtained when awaiting the consumer
    /// task's join handle fails.
    #[error("Failed to join/terminate the consumer task over stream '{stream_name}': {source}")]
    TaskJoin {
        /// The name of the stream this consumer operates on.
        stream_name: &'static str,

        /// The source error.
        #[source]
        source: tokio::task::JoinError,
    },

    /// The underlying stream failed while being read.
    ///
    /// The `Display` output is exactly the wrapped [`StreamReadError`]'s own message; no
    /// additional context is prepended.
    #[error("{source}")]
    StreamRead {
        /// The source error.
        #[source]
        source: StreamReadError,
    },
}
49
/// Marker trait for values that can serve as the sink of collected stream data.
///
/// Nothing has to be implemented by hand: the blanket implementation below covers every type
/// that is `Send + 'static`.
pub trait Sink: Send + 'static {}

impl<T: Send + 'static> Sink for T {}
56
/// The result of [`Consumer::cancel`].
///
/// Distinguishes a consumer that stopped cooperatively (and handed back its sink) from one
/// that had to be aborted because the timeout elapsed. Use
/// [`into_cancelled`](Self::into_cancelled) to collapse the outcome into an `Option<S>`.
#[derive(Debug)]
pub enum ConsumerCancelOutcome<S: Sink> {
    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
    Cancelled(S),

    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
    Aborted,
}
66
67impl<S: Sink> ConsumerCancelOutcome<S> {
68 /// Returns the sink from a cooperative cancellation, or `None` if the timeout elapsed.
69 #[must_use]
70 pub fn into_cancelled(self) -> Option<S> {
71 match self {
72 Self::Cancelled(sink) => Some(sink),
73 Self::Aborted => None,
74 }
75 }
76
77 /// Returns the sink from a cooperative cancellation, panicking with `message` if the timeout
78 /// elapsed and the task was aborted instead.
79 ///
80 /// Useful in tests where cooperative cancellation is expected to win the race; production
81 /// code should match on the outcome explicitly.
82 ///
83 /// # Panics
84 ///
85 /// Panics with `message` when this outcome is [`Self::Aborted`].
86 pub fn expect_cancelled(self, message: &str) -> S {
87 self.into_cancelled().expect(message)
88 }
89}
90
/// A handle for a tokio task that consumes a stream by driving a visitor over its events.
///
/// Consumers are produced by the `inspect_*`, `collect_*`, and `wait_for_line` factory methods on
/// [`BroadcastOutputStream`](crate::BroadcastOutputStream) and
/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream). The type parameter `S`
/// is the visitor's output (a sink, a writer, `()`, or another value the visitor returns when the
/// stream ends).
///
/// For proper cleanup, call one of:
/// - `wait()`, which waits for the consumer task to complete.
/// - `cancel(timeout)`, which asks the consumer to stop, waits for cooperative completion, and
///   aborts the task if the timeout elapses first.
/// - `abort()`, which forcefully aborts the consumer task.
///
/// If none of these is called, dropping the consumer still sends the termination signal, but
/// the task is then aborted immediately (forcefully, without waiting for its regular
/// completion).
pub struct Consumer<S: Sink> {
    /// The name of the stream this consumer operates on.
    pub(crate) stream_name: &'static str,

    /// Join handle of the consumer task. `None` once ownership of the task has been handed
    /// over to a wait guard or taken by `Drop`.
    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,

    /// One-shot sender used to signal termination to the task. `None` once it has been handed
    /// over or fired.
    pub(crate) task_termination_sender: Option<Sender<()>>,
}
114
/// Crate-internal form of a [`Consumer`] whose task is being awaited, cancelled, or aborted.
///
/// Created by `Consumer::into_wait`. It pairs the stream name (used when reporting join
/// errors) with the guard that owns the task and its termination sender.
pub(crate) struct ConsumerWait<S: Sink> {
    /// The name of the stream the consumer operates on.
    stream_name: &'static str,
    /// Owns the task; signals termination and aborts the task if dropped before completion.
    guard: ConsumerWaitGuard<S>,
}
119
/// Owns a consumer task while [`Consumer::wait`] is pending.
///
/// `Consumer::wait` consumes the [`Consumer`] and then awaits its task. Without this guard,
/// dropping that wait future after the task handle has been taken would detach the task instead
/// of applying the same cleanup behavior as dropping an unused [`Consumer`]. The guard makes
/// `wait` cancellation safe by signalling termination and aborting the task if the wait future is
/// dropped early.
struct ConsumerWaitGuard<S: Sink> {
    /// Join handle of the consumer task; `None` once the task has been joined or aborted.
    task: Option<JoinHandle<Result<S, StreamReadError>>>,
    /// One-shot sender for the termination signal; `None` once it has been fired or discarded.
    task_termination_sender: Option<Sender<()>>,
}
131
132impl<S: Sink> ConsumerWaitGuard<S> {
133 fn cancel(&mut self) {
134 let _res = self
135 .task_termination_sender
136 .take()
137 .expect("`task_termination_sender` to be present.")
138 .send(());
139 }
140
141 async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
142 let sink = self
143 .task
144 .as_mut()
145 .expect("`task` to be present.")
146 .await
147 .map_err(|err| ConsumerError::TaskJoin {
148 stream_name,
149 source: err,
150 })?
151 .map_err(|source| ConsumerError::StreamRead { source });
152
153 self.task = None;
154 self.task_termination_sender = None;
155
156 sink
157 }
158
159 async fn abort(&mut self) {
160 if let Some(task_termination_sender) = self.task_termination_sender.take() {
161 let _res = task_termination_sender.send(());
162 }
163 if let Some(task) = &self.task {
164 task.abort();
165 }
166 if let Some(task) = self.task.as_mut() {
167 let _res = task.await;
168 }
169 self.task = None;
170 }
171}
172
173impl<S: Sink> Drop for ConsumerWaitGuard<S> {
174 fn drop(&mut self) {
175 if let Some(task_termination_sender) = self.task_termination_sender.take() {
176 let _res = task_termination_sender.send(());
177 }
178 if let Some(task) = self.task.take() {
179 task.abort();
180 }
181 }
182}
183
184impl<S: Sink> Consumer<S> {
185 pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
186 ConsumerWait {
187 stream_name: self.stream_name,
188 guard: ConsumerWaitGuard {
189 task: self.task.take(),
190 task_termination_sender: self.task_termination_sender.take(),
191 },
192 }
193 }
194
195 /// Returns whether the consumer task has finished.
196 ///
197 /// This is a non-blocking task-state check. A finished consumer still owns its task result
198 /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
199 /// it.
200 #[must_use]
201 pub fn is_finished(&self) -> bool {
202 self.task.as_ref().is_none_or(JoinHandle::is_finished)
203 }
204
205 /// Waits for the consumer to terminate naturally and returns its sink.
206 ///
207 /// A consumer will automatically terminate when either:
208 ///
209 /// 1. The underlying write-side of the stream is dropped.
210 /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
211 /// 3. The first `Next::Break` is observed.
212 ///
213 /// If none of these may occur in your case, this can hang forever. `wait` also waits for any
214 /// in-flight async visitor callback or writer call to complete.
215 ///
216 /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
217 /// on a consumer after termination is fine:
218 ///
219 /// ```rust, no_run
220 /// # async fn test() {
221 /// # use std::time::Duration;
222 /// # use tokio_process_tools::{
223 /// # AutoName, CollectionOverflowBehavior, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE,
224 /// # LineCollectionOptions, LineParsingOptions, NumBytesExt, Process,
225 /// # };
226 ///
227 /// # let cmd = tokio::process::Command::new("ls");
228 /// let mut process = Process::new(cmd)
229 /// .name(AutoName::program_only())
230 /// .stdout_and_stderr(|stream| {
231 /// stream
232 /// .broadcast()
233 /// .best_effort_delivery()
234 /// .no_replay()
235 /// .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
236 /// .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
237 /// })
238 /// .spawn()
239 /// .unwrap();
240 /// let consumer = process.stdout().collect_lines_into_vec(
241 /// LineParsingOptions::default(),
242 /// LineCollectionOptions::Bounded {
243 /// max_bytes: 1.megabytes(),
244 /// max_lines: 1024,
245 /// overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
246 /// },
247 /// );
248 /// process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
249 /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
250 /// # }
251 /// ```
252 ///
253 /// # Errors
254 ///
255 /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
256 /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
257 /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
258 /// the returned `S`, not in [`ConsumerError`].
259 ///
260 /// # Panics
261 ///
262 /// Panics if the consumer's internal task has already been taken.
263 pub async fn wait(self) -> Result<S, ConsumerError> {
264 self.into_wait().wait().await
265 }
266
267 /// Forcefully aborts the consumer task.
268 ///
269 /// This drops any pending async visitor callback or writer future, releases the stream
270 /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
271 /// synchronous code that never yields to the async runtime.
272 ///
273 /// For single-subscriber streams, the consumer claim is released after the aborted task has
274 /// been joined during this method.
275 pub async fn abort(self) {
276 self.into_wait().abort().await;
277 }
278
279 /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
280 ///
281 /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
282 /// cancellation and exits normally before the timeout. Returns
283 /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
284 /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
285 ///
286 /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
287 /// or writer call must finish before cancellation can be observed. For single-subscriber
288 /// streams, the consumer claim is released before this method returns, both after successful
289 /// cooperative cancellation and after timeout-driven abort.
290 ///
291 /// # Errors
292 ///
293 /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
294 /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
295 /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
296 /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
297 ///
298 /// # Panics
299 ///
300 /// Panics if the consumer's internal cancellation sender has already been taken.
301 pub async fn cancel(
302 self,
303 timeout: Duration,
304 ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
305 let mut wait = self.into_wait();
306 wait.cancel();
307 match wait.wait_until(Instant::now() + timeout).await? {
308 Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
309 None => Ok(ConsumerCancelOutcome::Aborted),
310 }
311 }
312}
313
314impl<S: Sink> ConsumerWait<S> {
315 pub(crate) fn cancel(&mut self) {
316 self.guard.cancel();
317 }
318
319 pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
320 self.guard.wait(self.stream_name).await
321 }
322
323 pub(crate) async fn wait_until(
324 &mut self,
325 deadline: Instant,
326 ) -> Result<Option<S>, ConsumerError> {
327 let timeout = sleep_until(deadline);
328 tokio::pin!(timeout);
329
330 tokio::select! {
331 result = self.wait() => result.map(Some),
332 () = &mut timeout => {
333 self.abort().await;
334 Ok(None)
335 }
336 }
337 }
338
339 pub(crate) async fn abort(&mut self) {
340 self.guard.abort().await;
341 }
342}
343
344impl<S: Sink> Drop for Consumer<S> {
345 fn drop(&mut self) {
346 if let Some(task_termination_sender) = self.task_termination_sender.take() {
347 // We ignore any potential error here.
348 // Sending may fail if the task is already terminated (for example, by reaching EOF),
349 // which in turn dropped the receiver end!
350 let _res = task_termination_sender.send(());
351 }
352 if let Some(task) = self.task.take() {
353 task.abort();
354 }
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::io;
    use tokio::sync::oneshot;

    /// `ConsumerError::StreamRead` must render exactly like the error it wraps.
    #[test]
    fn stream_read_display_uses_source_context() {
        let cause = StreamReadError::new("stdout", io::Error::from(io::ErrorKind::BrokenPipe));
        let rendered_cause = cause.to_string();

        let wrapped = ConsumerError::StreamRead { source: cause };

        assert_that!(wrapped.to_string()).is_equal_to(rendered_cause);
    }

    /// A task that exits as soon as it receives the termination signal must be reported as
    /// cooperatively cancelled, with its sink handed back.
    #[tokio::test]
    async fn cancel_returns_cancelled_when_cooperative() {
        let (stop_tx, stop_rx) = oneshot::channel();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async move {
                // Park until the termination signal arrives (or the sender is dropped).
                let _res = stop_rx.await;
                Ok(Vec::<u8>::new())
            })),
            task_termination_sender: Some(stop_tx),
        };

        match consumer.cancel(Duration::from_secs(1)).await.unwrap() {
            ConsumerCancelOutcome::Cancelled(bytes) => {
                assert_that!(bytes).is_empty();
            }
            ConsumerCancelOutcome::Aborted => {
                assert_that!(()).fail("expected cooperative cancellation");
            }
        }
    }
}
397}