tokio_process_tools/output_stream/consumer/mod.rs
1//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
2//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
3//! [`Consumer<S>`] handle with cooperative-cancel / abort semantics. Required machinery;
4//! tokio-bound by construction. The visitor traits this module drives are runtime-agnostic and
5//! live one level up at [`crate::output_stream::visitor`].
6
7pub(crate) mod driver;
8
9pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};
10
11use crate::StreamReadError;
12use std::time::Duration;
13use thiserror::Error;
14use tokio::sync::oneshot::Sender;
15use tokio::task::JoinHandle;
16use tokio::time::{Instant, sleep_until};
17
/// Errors that the [`Consumer`] infrastructure itself can raise while driving its stream.
///
/// These describe failures of the consumer task: joining, or reading the underlying stream.
/// Visitor-specific failures (for example, a write-backed visitor's sink rejecting bytes) live
/// in the visitor's own [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type, not here. So a
/// writer-backed consumer's `wait` returns
/// `Result<Result<W, SinkWriteError>, ConsumerError>`: the outer result is what `ConsumerError`
/// describes, the inner is the writer visitor's own outcome.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ConsumerError {
    /// The consumer task could not be joined/terminated.
    ///
    /// Produced when awaiting the consumer task's join handle yields a
    /// [`JoinError`](tokio::task::JoinError).
    #[error("Failed to join/terminate the consumer task over stream '{stream_name}'.")]
    TaskJoin {
        /// The name of the stream this consumer operates on.
        stream_name: &'static str,

        /// The source error.
        #[source]
        source: tokio::task::JoinError,
    },

    /// The underlying stream failed while being read.
    ///
    /// Produced when the consumer task was joined successfully but itself finished with a
    /// [`StreamReadError`].
    #[error("Failed to read stream.")]
    StreamRead {
        /// The source error.
        #[source]
        source: StreamReadError,
    },
}
49
/// Marker trait for values a consumer task can own and hand back as its output.
///
/// There is nothing to implement by hand: the blanket implementation below makes every
/// `Send + 'static` type a `Sink`.
pub trait Sink: Send + 'static {}

impl<T: Send + 'static> Sink for T {}
56
/// The result of [`Consumer::cancel`].
///
/// Distinguishes a consumer that stopped cooperatively before the timeout (and handed back its
/// sink) from one that had to be aborted because the timeout elapsed (its sink is gone).
#[derive(Debug)]
#[must_use = "Discarding the outcome hides whether the consumer cancelled cooperatively \
              (returning its sink) or was forcefully aborted (dropping its sink)."]
pub enum ConsumerCancelOutcome<S: Sink> {
    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
    Cancelled(S),

    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
    Aborted,
}
68
69impl<S: Sink> ConsumerCancelOutcome<S> {
70 /// Returns the sink from a cooperative cancellation, or `None` if the timeout elapsed.
71 #[must_use]
72 pub fn into_cancelled(self) -> Option<S> {
73 match self {
74 Self::Cancelled(sink) => Some(sink),
75 Self::Aborted => None,
76 }
77 }
78
79 /// Returns the sink from a cooperative cancellation, panicking with `message` if the timeout
80 /// elapsed and the task was aborted instead.
81 ///
82 /// Useful in tests where cooperative cancellation is expected to win the race; production
83 /// code should match on the outcome explicitly.
84 ///
85 /// # Panics
86 ///
87 /// Panics with `message` when this outcome is [`Self::Aborted`].
88 pub fn expect_cancelled(self, message: &str) -> S {
89 self.into_cancelled().expect(message)
90 }
91}
92
/// A handle for a tokio task that consumes a stream by driving a visitor over its events.
///
/// Consumers are produced by [`Consumable::consume`](crate::Consumable::consume) and
/// [`Consumable::consume_async`](crate::Consumable::consume_async) on any consumable stream
/// (e.g. [`BroadcastOutputStream`](crate::BroadcastOutputStream),
/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream),
/// [`DiscardedOutputStream`](crate::DiscardedOutputStream)). Pass a bundled visitor from
/// [`crate::visitors`] or your own [`StreamVisitor`](crate::StreamVisitor) /
/// [`AsyncStreamVisitor`](crate::AsyncStreamVisitor) implementation. The backends additionally
/// expose `wait_for_line(...)` as a thin timeout wrapper over
/// [`WaitForLine`](crate::visitors::wait::WaitForLine).
///
/// The type parameter `S` is the visitor's output (a sink, a writer, `()`, or another value
/// the visitor returns when the stream ends).
///
/// For proper cleanup, call
/// - `wait()`, which waits for the consumer task to complete.
/// - `cancel(timeout)`, which asks the consumer to stop, waits for cooperative completion, and
///   aborts the task if the timeout elapses first.
/// - `abort()`, which forcefully aborts the consumer task.
///
/// If none of these is called, dropping the consumer sends the termination signal and then
/// immediately aborts the task; the drop is forceful and does not wait for the task to complete
/// on its own.
pub struct Consumer<S: Sink> {
    /// The name of the stream this consumer operates on.
    pub(crate) stream_name: &'static str,

    /// Join handle of the consumer task, which resolves to the visitor output or a stream read
    /// error. Stored as an `Option` so the handle can be moved out by `into_wait`, leaving
    /// nothing for `Drop` to abort.
    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,

    /// One-shot sender used to signal termination to the consumer task. Stored as an `Option`
    /// because sending consumes the sender.
    pub(crate) task_termination_sender: Option<Sender<()>>,
}
123
/// Crate-internal, in-progress form of a [`Consumer`], created by `Consumer::into_wait`.
///
/// Carries the stream name (used when reporting join errors) next to the guard that owns the
/// task handle and termination sender, and exposes `cancel`, `wait`, `wait_until` and `abort`
/// on `&mut self`.
pub(crate) struct ConsumerWait<S: Sink> {
    /// The name of the stream the consumer operates on.
    stream_name: &'static str,
    /// Owner of the task handle and termination sender; cleans up if dropped early.
    guard: ConsumerWaitGuard<S>,
}
128
/// Owns a consumer task while [`Consumer::wait`] is pending.
///
/// `Consumer::wait` consumes the [`Consumer`] and then awaits its task. Without this guard,
/// dropping that wait future after the task handle has been taken would detach the task instead
/// of applying the same cleanup behavior as dropping an unused [`Consumer`]. The guard makes
/// `wait` cancellation safe by signalling termination and aborting the task if the wait future is
/// dropped early.
struct ConsumerWaitGuard<S: Sink> {
    /// Join handle of the consumer task; `None` once the task has been fully handled.
    task: Option<JoinHandle<Result<S, StreamReadError>>>,
    /// One-shot sender for the termination signal; `None` once it has been used or discarded.
    task_termination_sender: Option<Sender<()>>,
}
140
141impl<S: Sink> ConsumerWaitGuard<S> {
142 fn cancel(&mut self) {
143 let _res = self
144 .task_termination_sender
145 .take()
146 .expect("`task_termination_sender` to be present.")
147 .send(());
148 }
149
150 async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
151 let sink = self
152 .task
153 .as_mut()
154 .expect("`task` to be present.")
155 .await
156 .map_err(|err| ConsumerError::TaskJoin {
157 stream_name,
158 source: err,
159 })?
160 .map_err(|source| ConsumerError::StreamRead { source });
161
162 self.task = None;
163 self.task_termination_sender = None;
164
165 sink
166 }
167
168 async fn abort(&mut self) {
169 if let Some(task_termination_sender) = self.task_termination_sender.take() {
170 let _res = task_termination_sender.send(());
171 }
172 if let Some(task) = &self.task {
173 task.abort();
174 }
175 if let Some(task) = self.task.as_mut() {
176 let _res = task.await;
177 }
178 self.task = None;
179 }
180}
181
182impl<S: Sink> Drop for ConsumerWaitGuard<S> {
183 fn drop(&mut self) {
184 if let Some(task_termination_sender) = self.task_termination_sender.take() {
185 let _res = task_termination_sender.send(());
186 }
187 if let Some(task) = self.task.take() {
188 task.abort();
189 }
190 }
191}
192
193impl<S: Sink> Consumer<S> {
194 pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
195 ConsumerWait {
196 stream_name: self.stream_name,
197 guard: ConsumerWaitGuard {
198 task: self.task.take(),
199 task_termination_sender: self.task_termination_sender.take(),
200 },
201 }
202 }
203
204 /// Returns whether the consumer task has finished.
205 ///
206 /// This is a non-blocking task-state check. A finished consumer still owns its task result
207 /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
208 /// it.
209 #[must_use]
210 pub fn is_finished(&self) -> bool {
211 self.task.as_ref().is_none_or(JoinHandle::is_finished)
212 }
213
214 /// Waits for the consumer to terminate naturally and returns its sink.
215 ///
216 /// A consumer will automatically terminate when either:
217 ///
218 /// 1. The underlying write-side of the stream is dropped.
219 /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
220 /// 3. The first `Next::Break` is observed.
221 ///
222 /// If none of these may occur in your case, this can hang forever. `wait` also waits for any
223 /// in-flight async visitor callback or writer call to complete.
224 ///
225 /// The stdout/stderr streams naturally close when the process is terminated, so `wait`ing
226 /// on a consumer after termination is fine:
227 ///
228 /// ```rust, no_run
229 /// # use std::time::Duration;
230 /// # use tokio_process_tools::{
231 /// # AutoName, CollectedLines, CollectionOverflowBehavior, Consumable,
232 /// # DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, GracefulShutdown,
233 /// # ParseLines, LineCollectionOptions, LineParsingOptions, NumBytesExt, Process,
234 /// # };
235 /// # async fn test() {
236 /// # let cmd = tokio::process::Command::new("ls");
237 /// let mut process = Process::new(cmd)
238 /// .name(AutoName::program_only())
239 /// .stdout_and_stderr(|stream| {
240 /// stream
241 /// .broadcast()
242 /// .lossy_without_backpressure()
243 /// .no_replay()
244 /// .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
245 /// .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
246 /// })
247 /// .spawn()
248 /// .unwrap();
249 /// let consumer = process
250 /// .stdout()
251 /// .consume(ParseLines::collect(
252 /// LineParsingOptions::default(),
253 /// CollectedLines::new(),
254 /// CollectedLines::line_collector(LineCollectionOptions::Bounded {
255 /// max_bytes: 1.megabytes(),
256 /// max_lines: 1024,
257 /// overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
258 /// }),
259 /// ))
260 /// .expect("no other consumer is attached yet");
261 /// let shutdown = GracefulShutdown::builder()
262 /// .unix_sigterm(Duration::from_secs(1))
263 /// .windows_ctrl_break(Duration::from_secs(2))
264 /// .build();
265 /// process.terminate(shutdown).await.unwrap();
266 /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
267 /// # }
268 /// ```
269 ///
270 /// # Errors
271 ///
272 /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
273 /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
274 /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
275 /// the returned `S`, not in [`ConsumerError`].
276 ///
277 /// # Panics
278 ///
279 /// Panics if the consumer's internal task has already been taken.
280 pub async fn wait(self) -> Result<S, ConsumerError> {
281 self.into_wait().wait().await
282 }
283
284 /// Forcefully aborts the consumer task.
285 ///
286 /// This drops any pending async visitor callback or writer future, releases the stream
287 /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
288 /// synchronous code that never yields to the async runtime.
289 ///
290 /// For single-subscriber streams, the consumer claim is released after the aborted task has
291 /// been joined during this method.
292 pub async fn abort(self) {
293 self.into_wait().abort().await;
294 }
295
296 /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
297 ///
298 /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
299 /// cancellation and exits normally before the timeout. Returns
300 /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
301 /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
302 ///
303 /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
304 /// or writer call must finish before cancellation can be observed. For single-subscriber
305 /// streams, the consumer claim is released before this method returns, both after successful
306 /// cooperative cancellation and after timeout-driven abort.
307 ///
308 /// # Errors
309 ///
310 /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
311 /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
312 /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
313 /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
314 ///
315 /// # Panics
316 ///
317 /// Panics if the consumer's internal cancellation sender has already been taken.
318 pub async fn cancel(
319 self,
320 timeout: Duration,
321 ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
322 let mut wait = self.into_wait();
323 wait.cancel();
324 match wait.wait_until(Instant::now() + timeout).await? {
325 Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
326 None => Ok(ConsumerCancelOutcome::Aborted),
327 }
328 }
329}
330
331impl<S: Sink> ConsumerWait<S> {
332 pub(crate) fn cancel(&mut self) {
333 self.guard.cancel();
334 }
335
336 pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
337 self.guard.wait(self.stream_name).await
338 }
339
340 pub(crate) async fn wait_until(
341 &mut self,
342 deadline: Instant,
343 ) -> Result<Option<S>, ConsumerError> {
344 let timeout = sleep_until(deadline);
345 tokio::pin!(timeout);
346
347 tokio::select! {
348 result = self.wait() => result.map(Some),
349 () = &mut timeout => {
350 self.abort().await;
351 Ok(None)
352 }
353 }
354 }
355
356 pub(crate) async fn abort(&mut self) {
357 self.guard.abort().await;
358 }
359}
360
361impl<S: Sink> Drop for Consumer<S> {
362 fn drop(&mut self) {
363 if let Some(task_termination_sender) = self.task_termination_sender.take() {
364 // We ignore any potential error here.
365 // Sending may fail if the task is already terminated (for example, by reaching EOF),
366 // which in turn dropped the receiver end!
367 let _res = task_termination_sender.send(());
368 }
369 if let Some(task) = self.task.take() {
370 task.abort();
371 }
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use tokio::sync::oneshot;

    /// A task that exits as soon as it receives the termination signal must be reported as
    /// cooperatively cancelled, with its (empty) output handed back.
    #[tokio::test]
    async fn cancel_returns_cancelled_when_cooperative() {
        let (stop_tx, stop_rx) = oneshot::channel();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async move {
                // Park until the consumer is cancelled (or the sender is dropped).
                let _signal = stop_rx.await;
                Ok(Vec::<u8>::new())
            })),
            task_termination_sender: Some(stop_tx),
        };

        let outcome = consumer.cancel(Duration::from_secs(1)).await.unwrap();

        match outcome {
            ConsumerCancelOutcome::Aborted => {
                assert_that!(()).fail("expected cooperative cancellation");
            }
            ConsumerCancelOutcome::Cancelled(collected) => {
                assert_that!(collected).is_empty();
            }
        }
    }
}