tokio-process-tools 0.10.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Tokio runtime adapter. Drives a [`StreamVisitor`](crate::StreamVisitor) over a
//! [`Subscription`](crate::output_stream::Subscription) on a tokio task and exposes the
//! [`Consumer<S>`] handle, which supports cooperative cancellation and forceful abort.
//!
//! This module is required machinery and is tokio-bound by construction. The visitor traits it
//! drives are runtime-agnostic and live one level up at [`crate::output_stream::visitor`].

pub(crate) mod driver;

pub(crate) use driver::{spawn_consumer_async, spawn_consumer_sync};

use crate::StreamReadError;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep_until};

/// Failures raised by the [`Consumer`] machinery itself while it drives a stream.
///
/// At this level the consumer task can fail to be joined, or reading the underlying stream can
/// fail. Anything specific to one visitor (for example a writer-backed visitor whose sink rejects
/// bytes) is reported through that visitor's own
/// [`StreamVisitor::Output`](crate::StreamVisitor::Output) /
/// [`AsyncStreamVisitor::Output`](crate::AsyncStreamVisitor::Output) type instead. For a
/// writer-backed consumer, `wait` therefore yields
/// `Result<Result<W, SinkWriteError>, ConsumerError>`. This type describes the outer layer, and
/// the inner layer is the writer visitor's own outcome.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ConsumerError {
    /// Joining (or terminating) the consumer task failed.
    #[error("Failed to join/terminate the consumer task over stream '{stream_name}': {source}")]
    TaskJoin {
        /// Name of the stream the consumer was reading.
        stream_name: &'static str,

        /// The tokio join error that caused the failure.
        #[source]
        source: tokio::task::JoinError,
    },

    /// Reading from the underlying stream failed.
    ///
    /// The displayed message is exactly the message of the wrapped [`StreamReadError`].
    #[error("{source}")]
    StreamRead {
        /// The read error reported by the stream.
        #[source]
        source: StreamReadError,
    },
}

/// Marker trait for values a consumer can hand back once its stream ends.
///
/// Every `Send + 'static` type implements it through the blanket impl below, so it never has to
/// be implemented manually.
pub trait Sink: Send + 'static {}

impl<T> Sink for T where T: Send + 'static {}

/// The result of [`Consumer::cancel`].
#[derive(Debug)]
pub enum ConsumerCancelOutcome<S: Sink> {
    /// The consumer observed cooperative cancellation before the timeout and returned its sink.
    Cancelled(S),

    /// The timeout elapsed, so the consumer task was aborted and its sink was dropped.
    Aborted,
}

impl<S: Sink> ConsumerCancelOutcome<S> {
    /// Returns the sink from a cooperative cancellation, or `None` if the timeout elapsed.
    #[must_use]
    pub fn into_cancelled(self) -> Option<S> {
        match self {
            Self::Cancelled(sink) => Some(sink),
            Self::Aborted => None,
        }
    }

    /// Returns the sink from a cooperative cancellation. If the timeout elapsed and the task was
    /// aborted instead, this panics with `message`.
    ///
    /// Useful in tests where cooperative cancellation is expected to win the race; production
    /// code should match on the outcome explicitly.
    ///
    /// The panic is reported at the caller's location (`#[track_caller]`), not inside this
    /// helper, so a failing test points at the line that made the assumption.
    ///
    /// # Panics
    ///
    /// Panics with `message` when this outcome is [`Self::Aborted`].
    #[track_caller]
    pub fn expect_cancelled(self, message: &str) -> S {
        // `Option::expect` is itself `#[track_caller]`, so the caller location propagates.
        self.into_cancelled().expect(message)
    }
}

/// Handle to a tokio task that consumes a stream by driving a visitor over its events.
///
/// Values of this type come from the `inspect_*`, `collect_*`, and `wait_for_line` factory
/// methods on [`BroadcastOutputStream`](crate::BroadcastOutputStream) and
/// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream). `S` is whatever the
/// visitor yields once the stream ends: a sink, a writer, `()`, or some other value.
///
/// Release the consumer explicitly with one of:
/// - [`wait`](Self::wait), which waits for the task to finish on its own.
/// - [`cancel`](Self::cancel), which asks the task to stop, waits for it to comply, and aborts it
///   if the timeout elapses first.
/// - [`abort`](Self::abort), which aborts the task forcefully.
///
/// If none of these is called, dropping the handle still sends the termination signal. The task
/// is then aborted right away rather than being allowed to finish normally.
pub struct Consumer<S: Sink> {
    /// Name of the stream this consumer reads; used when reporting errors.
    pub(crate) stream_name: &'static str,

    /// The spawned consumer task; `None` once ownership has been moved out of this handle.
    pub(crate) task: Option<JoinHandle<Result<S, StreamReadError>>>,

    /// One-shot sender used to ask the task to stop cooperatively; `None` once taken.
    pub(crate) task_termination_sender: Option<Sender<()>>,
}

/// The state a [`Consumer`] is turned into once it is being waited on, cancelled, or aborted.
///
/// Pairs the stream name (for error reporting) with a guard that owns the task and the
/// termination sender, and that cleans them up if this value is dropped early.
pub(crate) struct ConsumerWait<S: Sink> {
    // Used to label `ConsumerError::TaskJoin`.
    stream_name: &'static str,
    // Owns the task for as long as the wait is in progress.
    guard: ConsumerWaitGuard<S>,
}

/// Keeps ownership of a consumer task while [`Consumer::wait`] is in progress.
///
/// `Consumer::wait` consumes the [`Consumer`] before awaiting its task. If the wait future were
/// dropped at that point while the task handle was held only by a local, the task would be
/// detached. It would not get the cleanup that dropping an unused [`Consumer`] performs. Keeping
/// the handle in this guard makes `wait` cancellation safe: when the wait future is dropped
/// early, the guard signals termination and aborts the task.
struct ConsumerWaitGuard<S: Sink> {
    /// The consumer task, while this guard is still responsible for it.
    task: Option<JoinHandle<Result<S, StreamReadError>>>,

    /// Cooperative stop signal, while it has not been sent or discarded.
    task_termination_sender: Option<Sender<()>>,
}

impl<S: Sink> ConsumerWaitGuard<S> {
    /// Sends the cooperative termination signal to the consumer task.
    ///
    /// The send result is ignored. It fails only when the task has already dropped its receiver,
    /// typically because the task has finished.
    ///
    /// # Panics
    ///
    /// Panics if the termination sender has already been taken.
    fn cancel(&mut self) {
        let sender = self
            .task_termination_sender
            .take()
            .expect("`task_termination_sender` to be present.");
        let _res = sender.send(());
    }

    /// Awaits the consumer task and returns its sink.
    ///
    /// Cancel safe: if this future is dropped before the task finishes, the handle stays in the
    /// guard, so the guard's `Drop` can still signal and abort the task.
    ///
    /// # Errors
    ///
    /// Returns [`ConsumerError::TaskJoin`] if the task could not be joined (labelled with
    /// `stream_name`). Returns [`ConsumerError::StreamRead`] if the task itself reported a stream
    /// read failure.
    ///
    /// # Panics
    ///
    /// Panics if the task handle has already been taken.
    async fn wait(&mut self, stream_name: &'static str) -> Result<S, ConsumerError> {
        let joined = self
            .task
            .as_mut()
            .expect("`task` to be present.")
            .await;

        // The handle has produced its output and must never be polled again, and there is nothing
        // left to signal. Clear both fields on success *and* on join failure, so later cleanup
        // (a subsequent `abort`, or `Drop`) never touches a consumed handle.
        self.task = None;
        self.task_termination_sender = None;

        joined
            .map_err(|source| ConsumerError::TaskJoin {
                stream_name,
                source,
            })?
            .map_err(|source| ConsumerError::StreamRead { source })
    }

    /// Signals termination, aborts the task, and waits until the aborted task has been joined.
    ///
    /// The join result is discarded. Joining before returning ensures the task has actually
    /// stopped running by the time this completes.
    async fn abort(&mut self) {
        if let Some(task_termination_sender) = self.task_termination_sender.take() {
            let _res = task_termination_sender.send(());
        }
        // Abort and join while the handle stays in `self`. If this future is dropped mid-join,
        // the guard's `Drop` still sees the handle.
        if let Some(task) = self.task.as_mut() {
            task.abort();
            let _res = task.await;
        }
        self.task = None;
    }
}

impl<S: Sink> Drop for ConsumerWaitGuard<S> {
    fn drop(&mut self) {
        // If the guard still owns the task (for example because the wait future was dropped
        // before it finished), fall back to the same cleanup as dropping an unused `Consumer`.
        // Signal first, then abort without joining, because `drop` cannot await.
        let pending_signal = self.task_termination_sender.take();
        let pending_task = self.task.take();

        if let Some(signal) = pending_signal {
            // A closed channel just means the task has already let go of its receiver.
            let _res = signal.send(());
        }
        if let Some(handle) = pending_task {
            handle.abort();
        }
    }
}

impl<S: Sink> Consumer<S> {
    /// Moves the task handle and the termination sender into a [`ConsumerWait`], whose guard
    /// takes over responsibility for cleaning them up.
    ///
    /// Both fields are left `None`, so dropping the emptied `Consumer` afterwards does nothing.
    pub(crate) fn into_wait(mut self) -> ConsumerWait<S> {
        ConsumerWait {
            stream_name: self.stream_name,
            guard: ConsumerWaitGuard {
                task: self.task.take(),
                task_termination_sender: self.task_termination_sender.take(),
            },
        }
    }

    /// Returns whether the consumer task has finished.
    ///
    /// This is a non-blocking task-state check. A finished consumer still owns its task result
    /// until [`wait`](Self::wait), [`cancel`](Self::cancel), or [`abort`](Self::abort) consumes
    /// it.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.task.as_ref().is_none_or(JoinHandle::is_finished)
    }

    /// Waits for the consumer to terminate naturally and returns its sink.
    ///
    /// A consumer terminates on its own when any of the following happens:
    ///
    /// 1. The underlying write-side of the stream is dropped.
    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
    /// 3. The first `Next::Break` is observed.
    ///
    /// If none of these can happen, `wait` never returns. `wait` also waits for any in-flight
    /// async visitor callback or writer call to complete.
    ///
    /// The stdout/stderr streams close naturally once the process has terminated, so `wait`ing
    /// on a consumer after termination is fine:
    ///
    /// ```rust, no_run
    /// # use std::time::Duration;
    /// # use tokio_process_tools::{
    /// #     AutoName, CollectionOverflowBehavior, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE,
    /// #     GracefulTimeouts, LineCollectionOptions, LineParsingOptions, NumBytesExt, Process,
    /// # };
    /// # async fn test() {
    /// # let cmd = tokio::process::Command::new("ls");
    /// let mut process = Process::new(cmd)
    ///     .name(AutoName::program_only())
    ///     .stdout_and_stderr(|stream| {
    ///         stream
    ///             .broadcast()
    ///             .best_effort_delivery()
    ///             .no_replay()
    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
    ///     })
    ///     .spawn()
    ///     .unwrap();
    /// let consumer = process.stdout().collect_lines_into_vec(
    ///     LineParsingOptions::default(),
    ///     LineCollectionOptions::Bounded {
    ///         max_bytes: 1.megabytes(),
    ///         max_lines: 1024,
    ///         overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
    ///     },
    /// );
    /// # #[cfg(unix)]
    /// let timeouts = GracefulTimeouts {
    ///     interrupt_timeout: Duration::from_secs(1),
    ///     terminate_timeout: Duration::from_secs(1),
    /// };
    /// # #[cfg(windows)]
    /// # let timeouts = GracefulTimeouts { graceful_timeout: Duration::from_secs(2) };
    /// process.terminate(timeouts).await.unwrap();
    /// let collected = consumer.wait().await.unwrap(); // This will return immediately.
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined, or
    /// [`ConsumerError::StreamRead`] if the underlying stream fails while being read.
    /// Visitor-specific outcomes (e.g. a writer-backed visitor's sink failure) appear inside
    /// the returned `S`, not in [`ConsumerError`].
    ///
    /// # Panics
    ///
    /// Panics if the consumer's internal task has already been taken.
    pub async fn wait(self) -> Result<S, ConsumerError> {
        self.into_wait().wait().await
    }

    /// Forcefully aborts the consumer task.
    ///
    /// This drops any pending async visitor callback or writer future, releases the stream
    /// subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking
    /// synchronous code that never yields to the async runtime.
    ///
    /// For single-subscriber streams, the consumer claim is released after the aborted task has
    /// been joined during this method.
    pub async fn abort(self) {
        self.into_wait().abort().await;
    }

    /// Cooperatively cancels the consumer, aborting it if `timeout` elapses first.
    ///
    /// Returns [`ConsumerCancelOutcome::Cancelled`] with the sink when the consumer observes
    /// cancellation and exits normally before the timeout. Returns
    /// [`ConsumerCancelOutcome::Aborted`] when the timeout elapses; in that case the task is
    /// aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
    ///
    /// Cancellation is still cooperative until the timeout boundary: an in-flight async callback
    /// or writer call must finish before cancellation can be observed. For single-subscriber
    /// streams, the consumer claim is released before this method returns, both after successful
    /// cooperative cancellation and after timeout-driven abort.
    ///
    /// A `timeout` so large that no deadline can be computed from it (for example
    /// [`Duration::MAX`]) is treated as unbounded. The consumer is asked to stop, and this method
    /// waits for it without ever aborting.
    ///
    /// # Errors
    ///
    /// Returns [`ConsumerError::TaskJoin`] if the consumer task cannot be joined before the
    /// timeout, or [`ConsumerError::StreamRead`] if the underlying stream fails while being read
    /// before cancellation is observed. Visitor-specific outcomes appear inside the returned
    /// `S` (carried by [`ConsumerCancelOutcome::Cancelled`]).
    ///
    /// # Panics
    ///
    /// Panics if the consumer's internal cancellation sender has already been taken.
    pub async fn cancel(
        self,
        timeout: Duration,
    ) -> Result<ConsumerCancelOutcome<S>, ConsumerError> {
        let mut wait = self.into_wait();
        wait.cancel();

        // `Instant + Duration` panics when the sum is not representable (e.g. `Duration::MAX`).
        // Compute the deadline fallibly and treat an unrepresentable one as "no deadline".
        let Some(deadline) = Instant::now().checked_add(timeout) else {
            return wait.wait().await.map(ConsumerCancelOutcome::Cancelled);
        };

        match wait.wait_until(deadline).await? {
            Some(sink) => Ok(ConsumerCancelOutcome::Cancelled(sink)),
            None => Ok(ConsumerCancelOutcome::Aborted),
        }
    }
}

impl<S: Sink> ConsumerWait<S> {
    /// Asks the consumer task to stop cooperatively.
    ///
    /// # Panics
    ///
    /// Panics if the termination sender is no longer present (for example because `cancel` was
    /// already called).
    pub(crate) fn cancel(&mut self) {
        self.guard.cancel();
    }

    /// Waits for the consumer task to finish and returns its sink.
    ///
    /// # Errors
    ///
    /// Propagates the [`ConsumerError`] produced while joining the task.
    pub(crate) async fn wait(&mut self) -> Result<S, ConsumerError> {
        self.guard.wait(self.stream_name).await
    }

    /// Waits for the consumer task until `deadline`.
    ///
    /// Returns `Ok(Some(sink))` if the task finishes first. Once the deadline passes, it aborts
    /// and joins the task and returns `Ok(None)`.
    ///
    /// The completion branch is polled first (`biased;`). A task that has already finished when
    /// the deadline is reached therefore has its result returned, rather than being discarded by
    /// `select!`'s default random branch order.
    ///
    /// # Errors
    ///
    /// Propagates the [`ConsumerError`] produced while joining the task before the deadline.
    pub(crate) async fn wait_until(
        &mut self,
        deadline: Instant,
    ) -> Result<Option<S>, ConsumerError> {
        tokio::select! {
            biased;
            result = self.wait() => result.map(Some),
            // `select!` pins this future itself, so no separate `pin!` is needed.
            () = sleep_until(deadline) => {
                self.abort().await;
                Ok(None)
            }
        }
    }

    /// Signals termination, aborts the consumer task, and waits for it to be joined.
    pub(crate) async fn abort(&mut self) {
        self.guard.abort().await;
    }
}

impl<S: Sink> Drop for Consumer<S> {
    fn drop(&mut self) {
        // `wait`, `cancel`, and `abort` move both fields out first. Only a consumer that was
        // never cleaned up explicitly still owns anything here.
        //
        // The send result is deliberately discarded. It fails when the task has already ended
        // (for example by reaching EOF) and dropped its receiver.
        let _res = self
            .task_termination_sender
            .take()
            .map(|sender| sender.send(()));

        // `drop` cannot await, so the task is aborted instead of being joined.
        if let Some(handle) = self.task.take() {
            handle.abort();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::io;
    use tokio::sync::oneshot;

    #[test]
    fn stream_read_display_uses_source_context() {
        let source = StreamReadError::new("stdout", io::Error::from(io::ErrorKind::BrokenPipe));
        let expected = source.to_string();
        let err = ConsumerError::StreamRead { source };

        assert_that!(err.to_string()).is_equal_to(expected);
    }

    #[tokio::test]
    async fn cancel_returns_cancelled_when_cooperative() {
        let (task_termination_sender, task_termination_receiver) = oneshot::channel();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async move {
                let _res = task_termination_receiver.await;
                Ok(Vec::<u8>::new())
            })),
            task_termination_sender: Some(task_termination_sender),
        };

        let outcome = consumer.cancel(Duration::from_secs(1)).await.unwrap();

        match outcome {
            ConsumerCancelOutcome::Cancelled(bytes) => {
                assert_that!(bytes).is_empty();
            }
            ConsumerCancelOutcome::Aborted => {
                assert_that!(()).fail("expected cooperative cancellation");
            }
        }
    }

    /// A task that ignores the termination signal must be aborted once the timeout elapses.
    #[tokio::test]
    async fn cancel_returns_aborted_when_timeout_elapses() {
        // Keep the receiver alive so the signal is delivered, yet never observed by the task.
        let (task_termination_sender, _task_termination_receiver) = oneshot::channel();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async move {
                std::future::pending::<()>().await;
                Ok(Vec::<u8>::new())
            })),
            task_termination_sender: Some(task_termination_sender),
        };

        let outcome = consumer.cancel(Duration::from_millis(10)).await.unwrap();

        assert!(matches!(outcome, ConsumerCancelOutcome::Aborted));
    }

    #[tokio::test]
    async fn wait_returns_sink_of_completed_task() {
        let (task_termination_sender, _task_termination_receiver) = oneshot::channel();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async { Ok(vec![1u8, 2, 3]) })),
            task_termination_sender: Some(task_termination_sender),
        };

        let bytes = consumer.wait().await.unwrap();

        assert_that!(bytes).is_equal_to(vec![1u8, 2, 3]);
    }

    /// `abort` joins the aborted task, so everything the task owned is dropped before it returns.
    #[tokio::test]
    async fn abort_drops_task_before_returning() {
        let (task_termination_sender, _task_termination_receiver) = oneshot::channel();
        let (alive_sender, mut alive_receiver) = oneshot::channel::<()>();
        let consumer = Consumer {
            stream_name: "custom",
            task: Some(tokio::spawn(async move {
                let _alive = alive_sender;
                std::future::pending::<()>().await;
                Ok(Vec::<u8>::new())
            })),
            task_termination_sender: Some(task_termination_sender),
        };

        consumer.abort().await;

        // The sender lived inside the task; it is gone only if the task future was dropped.
        assert!(matches!(
            alive_receiver.try_recv(),
            Err(oneshot::error::TryRecvError::Closed)
        ));
    }
}