Skip to main content

tokio_process_tools/output_stream/consumer/
mod.rs

1//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
2//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
3//! [`Consumer<S>`] handle with cooperative-cancel / abort semantics. Required machinery;
4//! tokio-bound by construction. The visitor traits this module drives are runtime-agnostic and
5//! live one level up at [`crate::output_stream::visitor`].
6
7pub(crate) mod driver;
8
9pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};
10
11use crate::StreamReadError;
12use std::time::Duration;
13use thiserror::Error;
14use tokio::sync::oneshot::Sender;
15use tokio::task::JoinHandle;
16use tokio::time::{Instant, sleep_until};
17
18/// Errors that the [`Consumer`] infrastructure itself can raise while driving its stream.
19///
20/// These describe failures of the consumer task — joining, or reading the underlying stream.
21/// Visitor-specific failures (for example, a write-backed visitor's sink rejecting bytes) live
22/// in the visitor's own [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
23/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type, not here. So a
24/// writer-backed consumer's `wait` returns
25/// `Result<Result<W, SinkWriteError>, ConsumerError>`: the outer result is what `ConsumerError`
26/// describes, the inner is the writer visitor's own outcome.
27#[derive(Debug, Error)]
28#[non_exhaustive]
29pub enum ConsumerError {
30    /// The consumer task could not be joined/terminated.
31    #[error("Failed to join/terminate the consumer task over stream '{stream_name}': {source}")]
32    TaskJoin {
33        /// The name of the stream this consumer operates on.
34        stream_name: &'static str,
35
36        /// The source error.
37        #[source]
38        source: tokio::task::JoinError,
39    },
40
41    /// The underlying stream failed while being read.
42    #[error("{source}")]
43    StreamRead {
44        /// The source error.
45        #[source]
46        source: StreamReadError,
47    },
48}
49
/// Marker trait for types that can act as sinks for collected stream data.
///
/// There is nothing to implement by hand: the blanket impl right below covers every type that
/// is `Send + 'static`.
pub trait Sink: Send + 'static {}

impl<T: Send + 'static> Sink for T {}
56
57/// The result of [`Consumer::cancel`].
58#[derive(Debug)]
59pub enum ConsumerCancelOutcome<S: Sink> {
60    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
61    Cancelled(S),
62
63    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
64    Aborted,
65}
66
67impl<S: Sink> ConsumerCancelOutcome<S> {
68    /// Returns the sink from a cooperative cancellation, or `None` if the timeout elapsed.
69    #[must_use]
70    pub fn into_cancelled(self) -> Option<S> {
71        match self {
72            Self::Cancelled(sink) => Some(sink),
73            Self::Aborted => None,
74        }
75    }
76
77    /// Returns the sink from a cooperative cancellation, panicking with `message` if the timeout
78    /// elapsed and the task was aborted instead.
79    ///
80    /// Useful in tests where cooperative cancellation is expected to win the race; production
81    /// code should match on the outcome explicitly.
82    ///
83    /// # Panics
84    ///
85    /// Panics with `message` when this outcome is [`Self::Aborted`].
86    pub fn expect_cancelled(self, message: &str) -> S {
87        self.into_cancelled().expect(message)
88    }
89}
90
91/// A handle for a tokio task that consumes a stream by driving a visitor over its events.
92///
93/// Consumers are produced by the `inspect_*`, `collect_*`, and `wait_for_line` factory methods on
94/// [`BroadcastOutputStream`](crate::BroadcastOutputStream) and
95/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream). The type parameter `S`
96/// is the visitor's output (a sink, a writer, `()`, or another value the visitor returns when the
97/// stream ends).
98///
99/// For proper cleanup, call
100/// - `wait()`, which waits for the consumer task to complete.
101/// - `cancel(timeout)`, which asks the consumer to stop, waits for cooperative completion, and
102///   aborts the task if the timeout elapses first.
103/// - `abort()`, which forcefully aborts the consumer task.
104///
105/// If not cleaned up, the termination signal will be sent when dropping this consumer,
106/// but the task will be aborted (forceful, not waiting for its regular completion).
107pub struct Consumer<S: Sink> {
108    /// The name of the stream this consumer operates on.
109    pub(crate) stream_name: &'static str,
110
111    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,
112    pub(crate) task_termination_sender: Option<Sender<()>>,
113}
114
115pub(crate) struct ConsumerWait<S: Sink> {
116    stream_name: &'static str,
117    guard: ConsumerWaitGuard<S>,
118}
119
120/// Owns a consumer task while [`Consumer::wait`] is pending.
121///
122/// `Consumer::wait` consumes the [`Consumer`] and then awaits its task. Without this guard,
123/// dropping that wait future after the task handle has been taken would detach the task instead
124/// of applying the same cleanup behavior as dropping an unused [`Consumer`]. The guard makes
125/// `wait` cancellation safe by signalling termination and aborting the task if the wait future is
126/// dropped early.
127struct ConsumerWaitGuard<S: Sink> {
128    task: Option<JoinHandle<Result<S, StreamReadError>>>,
129    task_termination_sender: Option<Sender<()>>,
130}
131
132impl<S: Sink> ConsumerWaitGuard<S> {
133    fn cancel(&mut self) {
134        let _res = self
135            .task_termination_sender
136            .take()
137            .expect("`task_termination_sender` to be present.")
138            .send(());
139    }
140
141    async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
142        let sink = self
143            .task
144            .as_mut()
145            .expect("`task` to be present.")
146            .await
147            .map_err(|err| ConsumerError::TaskJoin {
148                stream_name,
149                source: err,
150            })?
151            .map_err(|source| ConsumerError::StreamRead { source });
152
153        self.task = None;
154        self.task_termination_sender = None;
155
156        sink
157    }
158
159    async fn abort(&mut self) {
160        if let Some(task_termination_sender) = self.task_termination_sender.take() {
161            let _res = task_termination_sender.send(());
162        }
163        if let Some(task) = &self.task {
164            task.abort();
165        }
166        if let Some(task) = self.task.as_mut() {
167            let _res = task.await;
168        }
169        self.task = None;
170    }
171}
172
173impl<S: Sink> Drop for ConsumerWaitGuard<S> {
174    fn drop(&mut self) {
175        if let Some(task_termination_sender) = self.task_termination_sender.take() {
176            let _res = task_termination_sender.send(());
177        }
178        if let Some(task) = self.task.take() {
179            task.abort();
180        }
181    }
182}
183
184impl<S: Sink> Consumer<S> {
185    pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
186        ConsumerWait {
187            stream_name: self.stream_name,
188            guard: ConsumerWaitGuard {
189                task: self.task.take(),
190                task_termination_sender: self.task_termination_sender.take(),
191            },
192        }
193    }
194
195    /// Returns whether the consumer task has finished.
196    ///
197    /// This is a non-blocking task-state check. A finished consumer still owns its task result
198    /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
199    /// it.
200    #[must_use]
201    pub fn is_finished(&self) -> bool {
202        self.task.as_ref().is_none_or(JoinHandle::is_finished)
203    }
204
205    /// Waits for the consumer to terminate naturally and returns its sink.
206    ///
207    /// A consumer will automatically terminate when either:
208    ///
209    /// 1. The underlying write-side of the stream is dropped.
210    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
211    /// 3. The first `Next::Break` is observed.
212    ///
213    /// If none of these may occur in your case, this can hang forever. `wait` also waits for any
214    /// in-flight async visitor callback or writer call to complete.
215    ///
216    /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
217    /// on a consumer after termination is fine:
218    ///
219    /// ```rust, no_run
220    /// # async fn test() {
221    /// # use std::time::Duration;
222    /// # use tokio_process_tools::{
223    /// #     AutoName, CollectionOverflowBehavior, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE,
224    /// #     LineCollectionOptions, LineParsingOptions, NumBytesExt, Process,
225    /// # };
226    ///
227    /// # let cmd = tokio::process::Command::new("ls");
228    /// let mut process = Process::new(cmd)
229    ///     .name(AutoName::program_only())
230    ///     .stdout_and_stderr(|stream| {
231    ///         stream
232    ///             .broadcast()
233    ///             .best_effort_delivery()
234    ///             .no_replay()
235    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
236    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
237    ///     })
238    ///     .spawn()
239    ///     .unwrap();
240    /// let consumer = process.stdout().collect_lines_into_vec(
241    ///     LineParsingOptions::default(),
242    ///     LineCollectionOptions::Bounded {
243    ///         max_bytes: 1.megabytes(),
244    ///         max_lines: 1024,
245    ///         overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
246    ///     },
247    /// );
248    /// process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
249    /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
250    /// # }
251    /// ```
252    ///
253    /// # Errors
254    ///
255    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
256    /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
257    /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
258    /// the returned `S`, not in [`ConsumerError`].
259    ///
260    /// # Panics
261    ///
262    /// Panics if the consumer's internal task has already been taken.
263    pub async fn wait(self) -> Result<S, ConsumerError> {
264        self.into_wait().wait().await
265    }
266
267    /// Forcefully aborts the consumer task.
268    ///
269    /// This drops any pending async visitor callback or writer future, releases the stream
270    /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
271    /// synchronous code that never yields to the async runtime.
272    ///
273    /// For single-subscriber streams, the consumer claim is released after the aborted task has
274    /// been joined during this method.
275    pub async fn abort(self) {
276        self.into_wait().abort().await;
277    }
278
279    /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
280    ///
281    /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
282    /// cancellation and exits normally before the timeout. Returns
283    /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
284    /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
285    ///
286    /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
287    /// or writer call must finish before cancellation can be observed. For single-subscriber
288    /// streams, the consumer claim is released before this method returns, both after successful
289    /// cooperative cancellation and after timeout-driven abort.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
294    /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
295    /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
296    /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
297    ///
298    /// # Panics
299    ///
300    /// Panics if the consumer's internal cancellation sender has already been taken.
301    pub async fn cancel(
302        self,
303        timeout: Duration,
304    ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
305        let mut wait = self.into_wait();
306        wait.cancel();
307        match wait.wait_until(Instant::now() + timeout).await? {
308            Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
309            None => Ok(ConsumerCancelOutcome::Aborted),
310        }
311    }
312}
313
314impl<S: Sink> ConsumerWait<S> {
315    pub(crate) fn cancel(&mut self) {
316        self.guard.cancel();
317    }
318
319    pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
320        self.guard.wait(self.stream_name).await
321    }
322
323    pub(crate) async fn wait_until(
324        &mut self,
325        deadline: Instant,
326    ) -> Result<Option<S>, ConsumerError> {
327        let timeout = sleep_until(deadline);
328        tokio::pin!(timeout);
329
330        tokio::select! {
331            result = self.wait() => result.map(Some),
332            () = &mut timeout => {
333                self.abort().await;
334                Ok(None)
335            }
336        }
337    }
338
339    pub(crate) async fn abort(&mut self) {
340        self.guard.abort().await;
341    }
342}
343
344impl<S: Sink> Drop for Consumer<S> {
345    fn drop(&mut self) {
346        if let Some(task_termination_sender) = self.task_termination_sender.take() {
347            // We ignore any potential error here.
348            // Sending may fail if the task is already terminated (for example, by reaching EOF),
349            // which in turn dropped the receiver end!
350            let _res = task_termination_sender.send(());
351        }
352        if let Some(task) = self.task.take() {
353            task.abort();
354        }
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::io;
    use tokio::sync::oneshot;

    /// `ConsumerError::StreamRead` must render exactly like the read error it wraps.
    #[test]
    fn stream_read_display_uses_source_context() {
        let io_error = io::Error::from(io::ErrorKind::BrokenPipe);
        let read_error = StreamReadError::new("stdout", io_error);
        let rendered_source = read_error.to_string();

        let rendered = ConsumerError::StreamRead { source: read_error }.to_string();

        assert_that!(rendered).is_equal_to(rendered_source);
    }

    /// A task that exits as soon as it receives the termination signal must yield
    /// `Cancelled` together with its sink.
    #[tokio::test]
    async fn cancel_returns_cancelled_when_cooperative() {
        let (stop_sender, stop_receiver) = oneshot::channel();
        let cooperative_task = tokio::spawn(async move {
            let _res = stop_receiver.await;
            Ok::<_, StreamReadError>(Vec::<u8>::new())
        });
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(cooperative_task),
            task_termination_sender: Some(stop_sender),
        };

        let outcome = consumer.cancel(Duration::from_secs(1)).await.unwrap();

        match outcome {
            ConsumerCancelOutcome::Aborted => {
                assert_that!(()).fail("expected cooperative cancellation");
            }
            ConsumerCancelOutcome::Cancelled(collected) => {
                assert_that!(collected).is_empty();
            }
        }
    }
}