Skip to main content

tokio_process_tools/output_stream/consumer/
mod.rs

//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
//! [`Consumer<S>`] handle with cooperative-cancel / abort semantics. This machinery is required
//! and tokio-bound by construction. The visitor traits this module drives are runtime-agnostic
//! and live one level up at [`crate::output_stream::visitor`].
6
7pub(crate) mod driver;
8
9pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};
10
11use crate::StreamReadError;
12use std::time::Duration;
13use thiserror::Error;
14use tokio::sync::oneshot::Sender;
15use tokio::task::JoinHandle;
16use tokio::time::{Instant, sleep_until};
17
18/// Errors that the [`Consumer`] infrastructure itself can raise while driving its stream.
19///
20/// These describe failures of the consumer task — joining, or reading the underlying stream.
21/// Visitor-specific failures (for example, a write-backed visitor's sink rejecting bytes) live
22/// in the visitor's own [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
23/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type, not here. So a
24/// writer-backed consumer's `wait` returns
25/// `Result<Result<W, SinkWriteError>, ConsumerError>`: the outer result is what `ConsumerError`
26/// describes, the inner is the writer visitor's own outcome.
27#[derive(Debug, Error)]
28#[non_exhaustive]
29pub enum ConsumerError {
30    /// The consumer task could not be joined/terminated.
31    #[error("Failed to join/terminate the consumer task over stream '{stream_name}': {source}")]
32    TaskJoin {
33        /// The name of the stream this consumer operates on.
34        stream_name: &'static str,
35
36        /// The source error.
37        #[source]
38        source: tokio::task::JoinError,
39    },
40
41    /// The underlying stream failed while being read.
42    #[error("{source}")]
43    StreamRead {
44        /// The source error.
45        #[source]
46        source: StreamReadError,
47    },
48}
49
/// Marker trait for values that can act as the sink of collected stream data.
///
/// There is nothing to implement by hand: the blanket implementation below covers every type
/// that is `Send + 'static`.
pub trait Sink: Send + 'static {}

impl<T> Sink for T where T: Send + 'static {}

/// The result of [`Consumer::cancel`].
#[derive(Debug)]
pub enum ConsumerCancelOutcome<S: Sink> {
    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
    Cancelled(S),

    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
    Aborted,
}

impl<S: Sink> ConsumerCancelOutcome<S> {
    /// Converts the outcome into an `Option`: `Some(sink)` after a cooperative cancellation,
    /// `None` if the timeout elapsed and the task was aborted.
    #[must_use]
    pub fn into_cancelled(self) -> Option<S> {
        match self {
            Self::Cancelled(sink) => Some(sink),
            Self::Aborted => None,
        }
    }

    /// Returns the sink from a cooperative cancellation, panicking with `message` if the timeout
    /// elapsed and the task was aborted instead.
    ///
    /// Useful in tests where cooperative cancellation is expected to win the race; production
    /// code should match on the outcome explicitly.
    ///
    /// # Panics
    ///
    /// Panics with `message` when this outcome is [`Self::Aborted`]. The panic is attributed to
    /// the caller's source location rather than to this method.
    // `#[track_caller]` forwards the call site to `Option::expect`, so the reported panic
    // location is the line that called `expect_cancelled`.
    #[track_caller]
    pub fn expect_cancelled(self, message: &str) -> S {
        self.into_cancelled().expect(message)
    }
}
90
91/// A handle for a tokio task that consumes a stream by driving a visitor over its events.
92///
93/// Consumers are produced by the `inspect_*`, `collect_*`, and `wait_for_line` factory methods on
94/// [`BroadcastOutputStream`](crate::BroadcastOutputStream) and
95/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream). The type parameter `S`
96/// is the visitor's output (a sink, a writer, `()`, or another value the visitor returns when the
97/// stream ends).
98///
99/// For proper cleanup, call
100/// - `wait()`, which waits for the consumer task to complete.
101/// - `cancel(timeout)`, which asks the consumer to stop, waits for cooperative completion, and
102///   aborts the task if the timeout elapses first.
103/// - `abort()`, which forcefully aborts the consumer task.
104///
105/// If not cleaned up, the termination signal will be sent when dropping this consumer,
106/// but the task will be aborted (forceful, not waiting for its regular completion).
107pub struct Consumer<S: Sink> {
108    /// The name of the stream this consumer operates on.
109    pub(crate) stream_name: &'static str,
110
111    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,
112    pub(crate) task_termination_sender: Option<Sender<()>>,
113}
114
115pub(crate) struct ConsumerWait<S: Sink> {
116    stream_name: &'static str,
117    guard: ConsumerWaitGuard<S>,
118}
119
120/// Owns a consumer task while [`Consumer::wait`] is pending.
121///
122/// `Consumer::wait` consumes the [`Consumer`] and then awaits its task. Without this guard,
123/// dropping that wait future after the task handle has been taken would detach the task instead
124/// of applying the same cleanup behavior as dropping an unused [`Consumer`]. The guard makes
125/// `wait` cancellation safe by signalling termination and aborting the task if the wait future is
126/// dropped early.
127struct ConsumerWaitGuard<S: Sink> {
128    task: Option<JoinHandle<Result<S, StreamReadError>>>,
129    task_termination_sender: Option<Sender<()>>,
130}
131
132impl<S: Sink> ConsumerWaitGuard<S> {
133    fn cancel(&mut self) {
134        let _res = self
135            .task_termination_sender
136            .take()
137            .expect("`task_termination_sender` to be present.")
138            .send(());
139    }
140
141    async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
142        let sink = self
143            .task
144            .as_mut()
145            .expect("`task` to be present.")
146            .await
147            .map_err(|err| ConsumerError::TaskJoin {
148                stream_name,
149                source: err,
150            })?
151            .map_err(|source| ConsumerError::StreamRead { source });
152
153        self.task = None;
154        self.task_termination_sender = None;
155
156        sink
157    }
158
159    async fn abort(&mut self) {
160        if let Some(task_termination_sender) = self.task_termination_sender.take() {
161            let _res = task_termination_sender.send(());
162        }
163        if let Some(task) = &self.task {
164            task.abort();
165        }
166        if let Some(task) = self.task.as_mut() {
167            let _res = task.await;
168        }
169        self.task = None;
170    }
171}
172
173impl<S: Sink> Drop for ConsumerWaitGuard<S> {
174    fn drop(&mut self) {
175        if let Some(task_termination_sender) = self.task_termination_sender.take() {
176            let _res = task_termination_sender.send(());
177        }
178        if let Some(task) = self.task.take() {
179            task.abort();
180        }
181    }
182}
183
184impl<S: Sink> Consumer<S> {
185    pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
186        ConsumerWait {
187            stream_name: self.stream_name,
188            guard: ConsumerWaitGuard {
189                task: self.task.take(),
190                task_termination_sender: self.task_termination_sender.take(),
191            },
192        }
193    }
194
195    /// Returns whether the consumer task has finished.
196    ///
197    /// This is a non-blocking task-state check. A finished consumer still owns its task result
198    /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
199    /// it.
200    #[must_use]
201    pub fn is_finished(&self) -> bool {
202        self.task.as_ref().is_none_or(JoinHandle::is_finished)
203    }
204
205    /// Waits for the consumer to terminate naturally and returns its sink.
206    ///
207    /// A consumer will automatically terminate when either:
208    ///
209    /// 1. The underlying write-side of the stream is dropped.
210    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
211    /// 3. The first `Next::Break` is observed.
212    ///
213    /// If none of these may occur in your case, this can hang forever. `wait` also waits for any
214    /// in-flight async visitor callback or writer call to complete.
215    ///
216    /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
217    /// on a consumer after termination is fine:
218    ///
219    /// ```rust, no_run
220    /// # use std::time::Duration;
221    /// # use tokio_process_tools::{
222    /// #     AutoName, CollectionOverflowBehavior, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE,
223    /// #     GracefulTimeouts, LineCollectionOptions, LineParsingOptions, NumBytesExt, Process, both,
224    /// # };
225    /// # async fn test() {
226    /// # let cmd = tokio::process::Command::new("ls");
227    /// let mut process = Process::new(cmd)
228    ///     .name(AutoName::program_only())
229    ///     .stdout_and_stderr(|stream| {
230    ///         stream
231    ///             .broadcast()
232    ///             .best_effort_delivery()
233    ///             .no_replay()
234    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
235    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
236    ///     })
237    ///     .spawn()
238    ///     .unwrap();
239    /// let consumer = process.stdout().collect_lines_into_vec(
240    ///     LineParsingOptions::default(),
241    ///     LineCollectionOptions::Bounded {
242    ///         max_bytes: 1.megabytes(),
243    ///         max_lines: 1024,
244    ///         overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
245    ///     },
246    /// );
247    /// let timeouts = GracefulTimeouts::builder()
248    ///     .unix(both(Duration::from_secs(1)))
249    ///     .windows(Duration::from_secs(2))
250    ///     .build();
251    /// process.terminate(timeouts).await.unwrap();
252    /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
253    /// # }
254    /// ```
255    ///
256    /// # Errors
257    ///
258    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
259    /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
260    /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
261    /// the returned `S`, not in [`ConsumerError`].
262    ///
263    /// # Panics
264    ///
265    /// Panics if the consumer's internal task has already been taken.
266    pub async fn wait(self) -> Result<S, ConsumerError> {
267        self.into_wait().wait().await
268    }
269
270    /// Forcefully aborts the consumer task.
271    ///
272    /// This drops any pending async visitor callback or writer future, releases the stream
273    /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
274    /// synchronous code that never yields to the async runtime.
275    ///
276    /// For single-subscriber streams, the consumer claim is released after the aborted task has
277    /// been joined during this method.
278    pub async fn abort(self) {
279        self.into_wait().abort().await;
280    }
281
282    /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
283    ///
284    /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
285    /// cancellation and exits normally before the timeout. Returns
286    /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
287    /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
288    ///
289    /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
290    /// or writer call must finish before cancellation can be observed. For single-subscriber
291    /// streams, the consumer claim is released before this method returns, both after successful
292    /// cooperative cancellation and after timeout-driven abort.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
297    /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
298    /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
299    /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
300    ///
301    /// # Panics
302    ///
303    /// Panics if the consumer's internal cancellation sender has already been taken.
304    pub async fn cancel(
305        self,
306        timeout: Duration,
307    ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
308        let mut wait = self.into_wait();
309        wait.cancel();
310        match wait.wait_until(Instant::now() + timeout).await? {
311            Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
312            None => Ok(ConsumerCancelOutcome::Aborted),
313        }
314    }
315}
316
317impl<S: Sink> ConsumerWait<S> {
318    pub(crate) fn cancel(&mut self) {
319        self.guard.cancel();
320    }
321
322    pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
323        self.guard.wait(self.stream_name).await
324    }
325
326    pub(crate) async fn wait_until(
327        &mut self,
328        deadline: Instant,
329    ) -> Result<Option<S>, ConsumerError> {
330        let timeout = sleep_until(deadline);
331        tokio::pin!(timeout);
332
333        tokio::select! {
334            result = self.wait() => result.map(Some),
335            () = &mut timeout => {
336                self.abort().await;
337                Ok(None)
338            }
339        }
340    }
341
342    pub(crate) async fn abort(&mut self) {
343        self.guard.abort().await;
344    }
345}
346
347impl<S: Sink> Drop for Consumer<S> {
348    fn drop(&mut self) {
349        if let Some(task_termination_sender) = self.task_termination_sender.take() {
350            // We ignore any potential error here.
351            // Sending may fail if the task is already terminated (for example, by reaching EOF),
352            // which in turn dropped the receiver end!
353            let _res = task_termination_sender.send(());
354        }
355        if let Some(task) = self.task.take() {
356            task.abort();
357        }
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::io;
    use tokio::sync::oneshot;

    /// `ConsumerError::StreamRead` must render exactly like the `StreamReadError` it wraps.
    #[test]
    fn stream_read_display_uses_source_context() {
        let io_error = io::Error::from(io::ErrorKind::BrokenPipe);
        let read_error = StreamReadError::new("stdout", io_error);
        let rendered_source = read_error.to_string();

        let wrapped = ConsumerError::StreamRead { source: read_error };

        assert_that!(wrapped.to_string()).is_equal_to(rendered_source);
    }

    /// A task that exits as soon as it receives the termination signal must yield
    /// `Cancelled` with its sink, well within the timeout.
    #[tokio::test]
    async fn cancel_returns_cancelled_when_cooperative() {
        let (cancel_tx, cancel_rx) = oneshot::channel();
        // The task parks on the termination signal and then returns an empty sink.
        let task = tokio::spawn(async move {
            let _res = cancel_rx.await;
            Ok(Vec::<u8>::new())
        });
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(task),
            task_termination_sender: Some(cancel_tx),
        };

        let outcome = consumer.cancel(Duration::from_secs(1)).await.unwrap();

        match outcome {
            ConsumerCancelOutcome::Aborted => {
                assert_that!(()).fail("expected cooperative cancellation");
            }
            ConsumerCancelOutcome::Cancelled(collected) => {
                assert_that!(collected).is_empty();
            }
        }
    }
}