Skip to main content

tokio_process_tools/output_stream/consumer/
mod.rs

1//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
2//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
3//! [`Consumer<S>`] handle with cooperative-cancel / abort semantics. Required machinery;
4//! tokio-bound by construction. The visitor traits this module drives are runtime-agnostic and
5//! live one level up at [`crate::output_stream::visitor`].
6
7pub(crate) mod driver;
8
9pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};
10
11use crate::StreamReadError;
12use std::time::Duration;
13use thiserror::Error;
14use tokio::sync::oneshot::Sender;
15use tokio::task::JoinHandle;
16use tokio::time::{Instant, sleep_until};
17
18/// Errors that the [`Consumer`] infrastructure itself can raise while driving its stream.
19///
20/// These describe failures of the consumer task: joining, or reading the underlying stream.
21/// Visitor-specific failures (for example, a write-backed visitor's sink rejecting bytes) live
22/// in the visitor's own [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
23/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type, not here. So a
24/// writer-backed consumer's `wait` returns
25/// `Result<Result<W, SinkWriteError>, ConsumerError>`: the outer result is what `ConsumerError`
26/// describes, the inner is the writer visitor's own outcome.
27#[derive(Debug, Error)]
28#[non_exhaustive]
29pub enum ConsumerError {
30    /// The consumer task could not be joined/terminated.
31    #[error("Failed to join/terminate the consumer task over stream '{stream_name}'.")]
32    TaskJoin {
33        /// The name of the stream this consumer operates on.
34        stream_name: &'static str,
35
36        /// The source error.
37        #[source]
38        source: tokio::task::JoinError,
39    },
40
41    /// The underlying stream failed while being read.
42    #[error("Failed to read stream.")]
43    StreamRead {
44        /// The source error.
45        #[source]
46        source: StreamReadError,
47    },
48}
49
50/// A trait for types that can act as sinks for collected stream data.
51///
52/// This is automatically implemented for any type that is `Send + 'static`.
53pub trait Sink: Send + 'static {}
54
55impl<T> Sink for T where T: Send + 'static {}
56
57/// The result of [`Consumer::cancel`].
58#[derive(Debug)]
59#[must_use = "Discarding the outcome hides whether the consumer cancelled cooperatively \
60              (returning its sink) or was forcefully aborted (dropping its sink)."]
61pub enum ConsumerCancelOutcome<S: Sink> {
62    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
63    Cancelled(S),
64
65    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
66    Aborted,
67}
68
69impl<S: Sink> ConsumerCancelOutcome<S> {
70    /// Returns the sink from a cooperative cancellation, or `None` if the timeout elapsed.
71    #[must_use]
72    pub fn into_cancelled(self) -> Option<S> {
73        match self {
74            Self::Cancelled(sink) => Some(sink),
75            Self::Aborted => None,
76        }
77    }
78
79    /// Returns the sink from a cooperative cancellation, panicking with `message` if the timeout
80    /// elapsed and the task was aborted instead.
81    ///
82    /// Useful in tests where cooperative cancellation is expected to win the race; production
83    /// code should match on the outcome explicitly.
84    ///
85    /// # Panics
86    ///
87    /// Panics with `message` when this outcome is [`Self::Aborted`].
88    pub fn expect_cancelled(self, message: &str) -> S {
89        self.into_cancelled().expect(message)
90    }
91}
92
93/// A handle for a tokio task that consumes a stream by driving a visitor over its events.
94///
95/// Consumers are produced by [`Consumable::consume`](crate::Consumable::consume) and
96/// [`Consumable::consume_async`](crate::Consumable::consume_async) on any consumable stream
97/// (e.g. [`BroadcastOutputStream`](crate::BroadcastOutputStream),
98/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream),
99/// [`DiscardedOutputStream`](crate::DiscardedOutputStream)). Pass a bundled visitor from
100/// [`crate::visitors`] or your own [`StreamVisitor`](crate::StreamVisitor) /
101/// [`AsyncStreamVisitor`](crate::AsyncStreamVisitor) implementation. The backends additionally
102/// expose `wait_for_line(...)` as a thin timeout wrapper over
103/// [`WaitForLine`](crate::visitors::wait::WaitForLine).
104///
105/// The type parameter `S` is the visitor's output (a sink, a writer, `()`, or another value
106/// the visitor returns when the stream ends).
107///
108/// For proper cleanup, call
109/// - `wait()`, which waits for the consumer task to complete.
110/// - `cancel(timeout)`, which asks the consumer to stop, waits for cooperative completion, and
111///   aborts the task if the timeout elapses first.
112/// - `abort()`, which forcefully aborts the consumer task.
113///
114/// If not cleaned up, the termination signal will be sent when dropping this consumer,
115/// but the task will be aborted (forceful, not waiting for its regular completion).
116pub struct Consumer<S: Sink> {
117    /// The name of the stream this consumer operates on.
118    pub(crate) stream_name: &'static str,
119
120    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,
121    pub(crate) task_termination_sender: Option<Sender<()>>,
122}
123
124pub(crate) struct ConsumerWait<S: Sink> {
125    stream_name: &'static str,
126    guard: ConsumerWaitGuard<S>,
127}
128
129/// Owns a consumer task while [`Consumer::wait`] is pending.
130///
131/// `Consumer::wait` consumes the [`Consumer`] and then awaits its task. Without this guard,
132/// dropping that wait future after the task handle has been taken would detach the task instead
133/// of applying the same cleanup behavior as dropping an unused [`Consumer`]. The guard makes
134/// `wait` cancellation safe by signalling termination and aborting the task if the wait future is
135/// dropped early.
136struct ConsumerWaitGuard<S: Sink> {
137    task: Option<JoinHandle<Result<S, StreamReadError>>>,
138    task_termination_sender: Option<Sender<()>>,
139}
140
141impl<S: Sink> ConsumerWaitGuard<S> {
142    fn cancel(&mut self) {
143        let _res = self
144            .task_termination_sender
145            .take()
146            .expect("`task_termination_sender` to be present.")
147            .send(());
148    }
149
150    async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
151        let sink = self
152            .task
153            .as_mut()
154            .expect("`task` to be present.")
155            .await
156            .map_err(|err| ConsumerError::TaskJoin {
157                stream_name,
158                source: err,
159            })?
160            .map_err(|source| ConsumerError::StreamRead { source });
161
162        self.task = None;
163        self.task_termination_sender = None;
164
165        sink
166    }
167
168    async fn abort(&mut self) {
169        if let Some(task_termination_sender) = self.task_termination_sender.take() {
170            let _res = task_termination_sender.send(());
171        }
172        if let Some(task) = &self.task {
173            task.abort();
174        }
175        if let Some(task) = self.task.as_mut() {
176            let _res = task.await;
177        }
178        self.task = None;
179    }
180}
181
182impl<S: Sink> Drop for ConsumerWaitGuard<S> {
183    fn drop(&mut self) {
184        if let Some(task_termination_sender) = self.task_termination_sender.take() {
185            let _res = task_termination_sender.send(());
186        }
187        if let Some(task) = self.task.take() {
188            task.abort();
189        }
190    }
191}
192
193impl<S: Sink> Consumer<S> {
194    pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
195        ConsumerWait {
196            stream_name: self.stream_name,
197            guard: ConsumerWaitGuard {
198                task: self.task.take(),
199                task_termination_sender: self.task_termination_sender.take(),
200            },
201        }
202    }
203
204    /// Returns whether the consumer task has finished.
205    ///
206    /// This is a non-blocking task-state check. A finished consumer still owns its task result
207    /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
208    /// it.
209    #[must_use]
210    pub fn is_finished(&self) -> bool {
211        self.task.as_ref().is_none_or(JoinHandle::is_finished)
212    }
213
214    /// Waits for the consumer to terminate naturally and returns its sink.
215    ///
216    /// A consumer will automatically terminate when either:
217    ///
218    /// 1. The underlying write-side of the stream is dropped.
219    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
220    /// 3. The first `Next::Break` is observed.
221    ///
222    /// If none of these may occur in your case, this can hang forever. `wait` also waits for any
223    /// in-flight async visitor callback or writer call to complete.
224    ///
225    /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
226    /// on a consumer after termination is fine:
227    ///
228    /// ```rust, no_run
229    /// # use std::time::Duration;
230    /// # use tokio_process_tools::{
231    /// #     AutoName, CollectedLines, CollectionOverflowBehavior, Consumable,
232    /// #     DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, GracefulShutdown,
233    /// #     ParseLines, LineCollectionOptions, LineParsingOptions, NumBytesExt, Process,
234    /// # };
235    /// # async fn test() {
236    /// # let cmd = tokio::process::Command::new("ls");
237    /// let mut process = Process::new(cmd)
238    ///     .name(AutoName::program_only())
239    ///     .stdout_and_stderr(|stream| {
240    ///         stream
241    ///             .broadcast()
242    ///             .lossy_without_backpressure()
243    ///             .no_replay()
244    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
245    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
246    ///     })
247    ///     .spawn()
248    ///     .unwrap();
249    /// let consumer = process
250    ///     .stdout()
251    ///     .consume(ParseLines::collect(
252    ///         LineParsingOptions::default(),
253    ///         CollectedLines::new(),
254    ///         CollectedLines::line_collector(LineCollectionOptions::Bounded {
255    ///             max_bytes: 1.megabytes(),
256    ///             max_lines: 1024,
257    ///             overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
258    ///         }),
259    ///     ))
260    ///     .expect("no other consumer is attached yet");
261    /// let shutdown = GracefulShutdown::builder()
262    ///     .unix_sigterm(Duration::from_secs(1))
263    ///     .windows_ctrl_break(Duration::from_secs(2))
264    ///     .build();
265    /// process.terminate(shutdown).await.unwrap();
266    /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
267    /// # }
268    /// ```
269    ///
270    /// # Errors
271    ///
272    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
273    /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
274    /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
275    /// the returned `S`, not in [`ConsumerError`].
276    ///
277    /// # Panics
278    ///
279    /// Panics if the consumer's internal task has already been taken.
280    pub async fn wait(self) -> Result<S, ConsumerError> {
281        self.into_wait().wait().await
282    }
283
284    /// Forcefully aborts the consumer task.
285    ///
286    /// This drops any pending async visitor callback or writer future, releases the stream
287    /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
288    /// synchronous code that never yields to the async runtime.
289    ///
290    /// For single-subscriber streams, the consumer claim is released after the aborted task has
291    /// been joined during this method.
292    pub async fn abort(self) {
293        self.into_wait().abort().await;
294    }
295
296    /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
297    ///
298    /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
299    /// cancellation and exits normally before the timeout. Returns
300    /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
301    /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
302    ///
303    /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
304    /// or writer call must finish before cancellation can be observed. For single-subscriber
305    /// streams, the consumer claim is released before this method returns, both after successful
306    /// cooperative cancellation and after timeout-driven abort.
307    ///
308    /// # Errors
309    ///
310    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
311    /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
312    /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
313    /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
314    ///
315    /// # Panics
316    ///
317    /// Panics if the consumer's internal cancellation sender has already been taken.
318    pub async fn cancel(
319        self,
320        timeout: Duration,
321    ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
322        let mut wait = self.into_wait();
323        wait.cancel();
324        match wait.wait_until(Instant::now() + timeout).await? {
325            Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
326            None => Ok(ConsumerCancelOutcome::Aborted),
327        }
328    }
329}
330
331impl<S: Sink> ConsumerWait<S> {
332    pub(crate) fn cancel(&mut self) {
333        self.guard.cancel();
334    }
335
336    pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
337        self.guard.wait(self.stream_name).await
338    }
339
340    pub(crate) async fn wait_until(
341        &mut self,
342        deadline: Instant,
343    ) -> Result<Option<S>, ConsumerError> {
344        let timeout = sleep_until(deadline);
345        tokio::pin!(timeout);
346
347        tokio::select! {
348            result = self.wait() => result.map(Some),
349            () = &mut timeout => {
350                self.abort().await;
351                Ok(None)
352            }
353        }
354    }
355
356    pub(crate) async fn abort(&mut self) {
357        self.guard.abort().await;
358    }
359}
360
361impl<S: Sink> Drop for Consumer<S> {
362    fn drop(&mut self) {
363        if let Some(task_termination_sender) = self.task_termination_sender.take() {
364            // We ignore any potential error here.
365            // Sending may fail if the task is already terminated (for example, by reaching EOF),
366            // which in turn dropped the receiver end!
367            let _res = task_termination_sender.send(());
368        }
369        if let Some(task) = self.task.take() {
370            task.abort();
371        }
372    }
373}
374
375#[cfg(test)]
376mod tests {
377    use super::*;
378    use assertr::prelude::*;
379    use tokio::sync::oneshot;
380
381    #[tokio::test]
382    async fn cancel_returns_cancelled_when_cooperative() {
383        let (task_termination_sender, task_termination_receiver) = oneshot::channel();
384        let consumer = Consumer {
385            stream_name: "custom",
386            task: Some(tokio::spawn(async move {
387                let _res = task_termination_receiver.await;
388                Ok(Vec::<u8>::new())
389            })),
390            task_termination_sender: Some(task_termination_sender),
391        };
392
393        let outcome = consumer.cancel(Duration::from_secs(1)).await.unwrap();
394
395        match outcome {
396            ConsumerCancelOutcome::Cancelled(bytes) => {
397                assert_that!(bytes).is_empty();
398            }
399            ConsumerCancelOutcome::Aborted => {
400                assert_that!(()).fail("expected cooperative cancellation");
401            }
402        }
403    }
404}