Skip to main content

tokio_process_tools/output_stream/
mod.rs

1//! Process output stream types and helpers.
2//!
3//! The submodules below correspond to the four conceptual layers this subsystem is built from:
4//!
//! - **Core abstractions:** the [`OutputStream`] / [`Subscription`] / [`Subscribable`] traits
//!   and the [`Next`] enum defined here, [`event`]'s [`Chunk`](event::Chunk) /
//!   [`StreamEvent`], the [`policy`] / [`config`] / [`num_bytes`] / [`line`] modules, and the
//!   [`visitor`] trait pair every visitor implementation builds against. These files have no
//!   tokio dependency.
10//! - **Tokio runtime adapter** ([`consumer`]): the [`Consumer<S>`](Consumer) handle
11//!   plus the driver loops that step a visitor over a subscription on a tokio task with
12//!   cooperative-cancel / abort semantics.
13//! - **Tokio stream backends** ([`backend`]): `broadcast` and `single_subscriber`, which ingest
14//!   any [`tokio::io::AsyncRead`] and emit [`StreamEvent`](StreamEvent)s.
15//! - **User-replaceable convenience layer** ([`visitors`]): the built-in `collect`, `inspect`,
16//!   `wait`, and `write` visitor implementations. `consume(my_visitor)` /
17//!   `consume_async(my_visitor)` on any [`Consumable`] stream is the single entry point;
18//!   construct a bundled visitor for the common cases or implement [`StreamVisitor`] /
19//!   [`AsyncStreamVisitor`] for custom ones.
20
/// Tokio runtime adapter: the `Consumer` handle plus the driver loops
/// (`spawn_consumer_sync` / `spawn_consumer_async`) that step a visitor over a subscription on a
/// tokio task.
pub(crate) mod consumer;

/// Output stream backend implementations.
pub(crate) mod backend;

/// Shared stream consumption configuration.
pub(crate) mod config;

/// Stream event types, such as `Chunk` and `StreamEvent`, delivered through a subscription.
pub(crate) mod event;

/// Line parsing types and options.
pub(crate) mod line;

/// `NumBytes` newtype and convenience constructors used throughout the public API.
pub(crate) mod num_bytes;

/// Delivery and replay policy types shared by output stream backends.
pub(crate) mod policy;

/// Visitor traits, the runtime-agnostic contract every stream observer implements.
pub(crate) mod visitor;

/// Built-in [`StreamVisitor`] / [`AsyncStreamVisitor`] implementations covering the common
/// inspect / collect / write / wait cases.
pub mod visitors;
46
47use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
48use crate::{AsyncStreamVisitor, Consumer, StreamVisitor};
49use core::error::Error;
50use event::StreamEvent;
51use num_bytes::NumBytes;
52
53/// We support the following implementations:
54///
55/// - [`crate::BroadcastOutputStream`]
56/// - [`crate::SingleSubscriberOutputStream`]
57pub trait OutputStream: Consumable {
58    /// The maximum size of every chunk read by the backing `stream_reader`.
59    fn read_chunk_size(&self) -> NumBytes;
60
61    /// The number of chunks held by the underlying async channel.
62    fn max_buffered_chunks(&self) -> usize;
63
64    /// Type of stream. Can be "stdout" or "stderr". But we do not guarantee this output.
65    /// It should only be used for logging/debugging.
66    fn name(&self) -> &'static str;
67}
68
69/// Stream event subscription used by built-in consumers.
70pub trait Subscription: Send + 'static {
71    /// Returns the next stream event, or `None` once the subscription is closed.
72    ///
73    /// `None` is only returned after the subscription has emitted a terminal event
74    /// ([`StreamEvent::Eof`] or [`StreamEvent::ReadError`]) on this subscription, after which it
75    /// will remain `None` for every subsequent call. Implementations must not return `None` while
76    /// further chunks or gap markers are still pending.
77    fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_;
78}
79
80/// Output stream backend that can reject consumer subscriptions.
81pub trait Subscribable {
82    /// The concrete subscription handle returned by [`try_subscribe`](Self::try_subscribe).
83    type Subscription: Subscription;
84
85    /// The error returned when subscription fails.
86    type SubscribeError: Error + Send + Sync + 'static;
87
88    /// Creates a new subscription for a consumer, or returns why the consumer cannot be started.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`Self::SubscribeError`] when the backend cannot start a new consumer, for
93    /// example because a single-subscriber backend already has an active consumer.
94    fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError>;
95}
96
97/// Enables a stream to be consumed by [`StreamVisitor`]s and [`AsyncStreamVisitor`]s.
98///
99/// Construct a bundled visitor under [`visitors`] (or your own [`StreamVisitor`] /
100/// [`AsyncStreamVisitor`] implementation) and pass it to [`consume`](Self::consume) or
101/// [`consume_async`](Self::consume_async). The returned [`Consumer`] owns the spawned tokio
102/// task that drives the visitor over this stream.
103///
104/// Implementors only need to specify [`Error`](Self::Error). The `consume` and `consume_async`
105/// methods have default impls that subscribe via [`Subscribable::try_subscribe`] and spawn the
106/// consumer task; those defaults additionally require `Self: OutputStream` because they label
107/// the consumer task with [`OutputStream::name`].
108pub trait Consumable: Subscribable {
109    /// Error returned when consumer creation fails. Must be constructible from the underlying
110    /// [`Subscribable::SubscribeError`] so the default `consume` / `consume_async` impls can
111    /// propagate subscription failures.
112    type Error: Error + Send + Sync + 'static + From<Self::SubscribeError>;
113
114    //noinspection RsDoubleMustUse
115    /// Tries to drive the provided synchronous [`StreamVisitor`] over this stream and returns a
116    /// [`Consumer`] that owns the spawned task.
117    ///
118    /// The returned [`Consumer`]'s [`wait`](Consumer::wait) yields whatever the visitor produces
119    /// through [`StreamVisitor::into_output`], allowing visitor implementors to give and get
120    /// back ownership of some value.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`Self::Error`] if the backend rejects the consumer creation (for example,
125    /// because a single-subscriber backend already has an active consumer).
126    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
127    fn consume<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
128    where
129        V: StreamVisitor,
130        Self: OutputStream,
131    {
132        Ok(spawn_consumer_sync(
133            self.name(),
134            self.try_subscribe()?,
135            visitor,
136        ))
137    }
138
139    //noinspection RsDoubleMustUse
140    /// Tries to drive the provided asynchronous [`AsyncStreamVisitor`] over this stream and
141    /// returns a [`Consumer`] that owns the spawned task.
142    ///
143    /// Use this when observing a chunk requires `.await` (for example, forwarding chunks to an
144    /// async writer or channel). See [`consume`](Self::consume) for the synchronous variant.
145    ///
146    /// # Errors
147    ///
148    /// Returns [`Self::Error`] if the backend rejects the consumer creation.
149    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
150    fn consume_async<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
151    where
152        V: AsyncStreamVisitor,
153        Self: OutputStream,
154    {
155        Ok(spawn_consumer_async(
156            self.name(),
157            self.try_subscribe()?,
158            visitor,
159        ))
160    }
161}
162
/// Control flag a visitor callback returns to say whether it wants more data.
///
/// Returned, for example, by the closure handed to the built-in `InspectChunks` visitor.
/// Returning [`Next::Break`] makes the visitor stop visiting any more data. Returning
/// [`Next::Continue`] keeps it going.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Next {
    /// Interested in receiving additional data.
    Continue,

    /// Not interested in receiving additional data; the visitor stops visiting the stream.
    Break,
}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output_stream::backend::broadcast::BroadcastOutputStream;
    use crate::output_stream::backend::discard::DiscardedOutputStream;
    use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::config::StreamConfig;
    use crate::output_stream::event::Chunk;
    use crate::output_stream::visitors::inspect::InspectChunks;
    use crate::{ConsumerError, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
    use assertr::prelude::*;
    use tokio::io::AsyncWriteExt;

    /// Generic helper that runs an `InspectChunks` visitor against any [`Consumable`] +
    /// [`OutputStream`] backend and returns the number of chunks observed before EOF.
    ///
    /// `S` is deliberately bounded by nothing beyond `Consumable + OutputStream`. If those
    /// traits ever stop carrying everything a generic caller needs, this helper stops
    /// compiling, which pins the trait shape against silent regressions. One example is the
    /// `Debug` that `expect` needs on `S::Error`, already implied by `Consumable::Error: Error`.
    async fn count_chunks<S>(stream: &S) -> Result<usize, ConsumerError>
    where
        S: Consumable + OutputStream,
    {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let counter = Arc::new(AtomicUsize::new(0));
        let counter_in_visitor = Arc::clone(&counter);
        let consumer = stream
            .consume(
                InspectChunks::builder()
                    .f(move |_chunk: Chunk| {
                        counter_in_visitor.fetch_add(1, Ordering::SeqCst);
                        Next::Continue
                    })
                    .build(),
            )
            .expect("consumer should start");
        // `wait` hands back the visitor's final output, so the closure has already run for
        // every observed chunk by the time the counter is read.
        consumer.wait().await?;
        Ok(counter.load(Ordering::SeqCst))
    }

    #[tokio::test]
    async fn cross_backend_consumable_smoke() {
        let stream_config: StreamConfig<crate::LossyWithoutBackpressure, crate::NoReplay> =
            StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        // NOTE(review): each payload is written before `count_chunks` subscribes. With
        // `no_replay`, this presumably relies on the backend's reader task not having run yet.
        // That holds on tokio's default current-thread test runtime, which only switches tasks
        // when the test task yields. Confirm this if the test flavor ever changes.
        let (broadcast_read, mut broadcast_write) = tokio::io::duplex(64);
        let broadcast = BroadcastOutputStream::from_stream(broadcast_read, "bcast", stream_config);
        broadcast_write.write_all(b"abc").await.unwrap();
        drop(broadcast_write);
        let broadcast_count = count_chunks(&broadcast).await.unwrap();
        assert_that!(broadcast_count).is_greater_or_equal_to(1);

        let (single_read, mut single_write) = tokio::io::duplex(64);
        let single =
            SingleSubscriberOutputStream::from_stream(single_read, "single", stream_config);
        single_write.write_all(b"abc").await.unwrap();
        drop(single_write);
        let single_count = count_chunks(&single).await.unwrap();
        assert_that!(single_count).is_greater_or_equal_to(1);

        let discarded = DiscardedOutputStream::new("discard");
        let discarded_count = count_chunks(&discarded).await.unwrap();
        assert_that!(discarded_count).is_equal_to(0);
    }
}