tokio_process_tools/output_stream/mod.rs
1//! Process output stream types and helpers.
2//!
3//! The submodules below correspond to the four conceptual layers this subsystem is built from:
4//!
//! - **Core abstractions:** the [`OutputStream`] / [`Subscription`] / [`Subscribable`] traits
//!   and the [`Next`] control-flow enum defined here, [`event`]'s [`Chunk`](event::Chunk) /
//!   [`StreamEvent`](StreamEvent), the [`policy`] / [`config`] / [`num_bytes`] / [`line`]
//!   modules, and the [`visitor`] trait pair every visitor implementation builds against. These
//!   files have no tokio dependency.
10//! - **Tokio runtime adapter** ([`consumer`]): the [`Consumer<S>`](Consumer) handle
11//! plus the driver loops that step a visitor over a subscription on a tokio task with
12//! cooperative-cancel / abort semantics.
13//! - **Tokio stream backends** ([`backend`]): `broadcast` and `single_subscriber`, which ingest
14//! any [`tokio::io::AsyncRead`] and emit [`StreamEvent`](StreamEvent)s.
15//! - **User-replaceable convenience layer** ([`visitors`]): the built-in `collect`, `inspect`,
16//! `wait`, and `write` visitor implementations. `consume(my_visitor)` /
17//! `consume_async(my_visitor)` on any [`Consumable`] stream is the single entry point;
18//! construct a bundled visitor for the common cases or implement [`StreamVisitor`] /
19//! [`AsyncStreamVisitor`] for custom ones.
20
21pub(crate) mod consumer;
22
23/// Output stream backend implementations.
24pub(crate) mod backend;
25
26/// Shared stream consumption configuration.
27pub(crate) mod config;
28
29pub(crate) mod event;
30
31/// Line parsing types and options.
32pub(crate) mod line;
33
34/// `NumBytes` newtype and convenience constructors used throughout the public API.
35pub(crate) mod num_bytes;
36
37/// Delivery and replay policy types shared by output stream backends.
38pub(crate) mod policy;
39
40/// Visitor traits, the runtime-agnostic contract every stream observer implements.
41pub(crate) mod visitor;
42
43/// Built-in [`StreamVisitor`] / [`AsyncStreamVisitor`] implementations covering the common
44/// inspect / collect / write / wait cases.
45pub mod visitors;
46
47use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
48use crate::{AsyncStreamVisitor, Consumer, StreamVisitor};
49use core::error::Error;
50use event::StreamEvent;
51use num_bytes::NumBytes;
52
53/// We support the following implementations:
54///
55/// - [`crate::BroadcastOutputStream`]
56/// - [`crate::SingleSubscriberOutputStream`]
57pub trait OutputStream: Consumable {
58 /// The maximum size of every chunk read by the backing `stream_reader`.
59 fn read_chunk_size(&self) -> NumBytes;
60
61 /// The number of chunks held by the underlying async channel.
62 fn max_buffered_chunks(&self) -> usize;
63
64 /// Type of stream. Can be "stdout" or "stderr". But we do not guarantee this output.
65 /// It should only be used for logging/debugging.
66 fn name(&self) -> &'static str;
67}
68
69/// Stream event subscription used by built-in consumers.
70pub trait Subscription: Send + 'static {
71 /// Returns the next stream event, or `None` once the subscription is closed.
72 ///
73 /// `None` is only returned after the subscription has emitted a terminal event
74 /// ([`StreamEvent::Eof`] or [`StreamEvent::ReadError`]) on this subscription, after which it
75 /// will remain `None` for every subsequent call. Implementations must not return `None` while
76 /// further chunks or gap markers are still pending.
77 fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_;
78}
79
80/// Output stream backend that can reject consumer subscriptions.
81pub trait Subscribable {
82 /// The concrete subscription handle returned by [`try_subscribe`](Self::try_subscribe).
83 type Subscription: Subscription;
84
85 /// The error returned when subscription fails.
86 type SubscribeError: Error + Send + Sync + 'static;
87
88 /// Creates a new subscription for a consumer, or returns why the consumer cannot be started.
89 ///
90 /// # Errors
91 ///
92 /// Returns [`Self::SubscribeError`] when the backend cannot start a new consumer, for
93 /// example because a single-subscriber backend already has an active consumer.
94 fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError>;
95}
96
97/// Enables a stream to be consumed by [`StreamVisitor`]s and [`AsyncStreamVisitor`]s.
98///
99/// Construct a bundled visitor under [`visitors`] (or your own [`StreamVisitor`] /
100/// [`AsyncStreamVisitor`] implementation) and pass it to [`consume`](Self::consume) or
101/// [`consume_async`](Self::consume_async). The returned [`Consumer`] owns the spawned tokio
102/// task that drives the visitor over this stream.
103///
104/// Implementors only need to specify [`Error`](Self::Error). The `consume` and `consume_async`
105/// methods have default impls that subscribe via [`Subscribable::try_subscribe`] and spawn the
106/// consumer task; those defaults additionally require `Self: OutputStream` because they label
107/// the consumer task with [`OutputStream::name`].
108pub trait Consumable: Subscribable {
109 /// Error returned when consumer creation fails. Must be constructible from the underlying
110 /// [`Subscribable::SubscribeError`] so the default `consume` / `consume_async` impls can
111 /// propagate subscription failures.
112 type Error: Error + Send + Sync + 'static + From<Self::SubscribeError>;
113
114 //noinspection RsDoubleMustUse
115 /// Tries to drive the provided synchronous [`StreamVisitor`] over this stream and returns a
116 /// [`Consumer`] that owns the spawned task.
117 ///
118 /// The returned [`Consumer`]'s [`wait`](Consumer::wait) yields whatever the visitor produces
119 /// through [`StreamVisitor::into_output`], allowing visitor implementors to give and get
120 /// back ownership of some value.
121 ///
122 /// # Errors
123 ///
124 /// Returns [`Self::Error`] if the backend rejects the consumer creation (for example,
125 /// because a single-subscriber backend already has an active consumer).
126 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
127 fn consume<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
128 where
129 V: StreamVisitor,
130 Self: OutputStream,
131 {
132 Ok(spawn_consumer_sync(
133 self.name(),
134 self.try_subscribe()?,
135 visitor,
136 ))
137 }
138
139 //noinspection RsDoubleMustUse
140 /// Tries to drive the provided asynchronous [`AsyncStreamVisitor`] over this stream and
141 /// returns a [`Consumer`] that owns the spawned task.
142 ///
143 /// Use this when observing a chunk requires `.await` (for example, forwarding chunks to an
144 /// async writer or channel). See [`consume`](Self::consume) for the synchronous variant.
145 ///
146 /// # Errors
147 ///
148 /// Returns [`Self::Error`] if the backend rejects the consumer creation.
149 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the `Consumer`-internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
150 fn consume_async<V>(&self, visitor: V) -> Result<Consumer<V::Output>, Self::Error>
151 where
152 V: AsyncStreamVisitor,
153 Self: OutputStream,
154 {
155 Ok(spawn_consumer_async(
156 self.name(),
157 self.try_subscribe()?,
158 visitor,
159 ))
160 }
161}
162
/// Control-flow signal telling the stream driver whether to keep delivering data.
///
/// A visitor callback that returns `Break` stops being invoked with any more data.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Next {
    /// Keep going: further data is wanted.
    Continue,

    /// Stop: no further data is wanted, so the visitor is allowed to finish.
    Break,
}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output_stream::backend::broadcast::BroadcastOutputStream;
    use crate::output_stream::backend::discard::DiscardedOutputStream;
    use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::config::StreamConfig;
    use crate::output_stream::event::Chunk;
    use crate::output_stream::visitors::inspect::InspectChunks;
    use crate::{ConsumerError, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
    use assertr::prelude::*;
    use tokio::io::AsyncWriteExt;

    /// Generic helper that runs an `InspectChunks` visitor against any [`Consumable`] +
    /// [`OutputStream`] backend and counts the chunks observed before the consumer finishes.
    ///
    /// The helper deliberately adds no bounds beyond the two traits: it only compiles when
    /// `Consumable` itself carries everything callers need (independent of the concrete
    /// backend's `Error` type), so it pins the trait shape against silent regressions.
    ///
    /// # Errors
    ///
    /// Returns the [`ConsumerError`] produced by waiting on the consumer.
    ///
    /// # Panics
    ///
    /// Panics if the backend rejects the consumer.
    async fn count_chunks<S>(stream: &S) -> Result<usize, ConsumerError>
    where
        S: Consumable + OutputStream,
    {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // A plain counter needs no lock: an atomic avoids the mutex and its poison path.
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_in_visitor = Arc::clone(&counter);
        let consumer = stream
            .consume(
                InspectChunks::builder()
                    .f(move |_chunk: Chunk| {
                        counter_in_visitor.fetch_add(1, Ordering::SeqCst);
                        Next::Continue
                    })
                    .build(),
            )
            .expect("consumer should start");
        consumer.wait().await?;
        Ok(counter.load(Ordering::SeqCst))
    }

    /// Drives the same generic helper over every backend to prove they all satisfy the
    /// `Consumable` + `OutputStream` contract.
    #[tokio::test]
    async fn cross_backend_consumable_smoke() {
        let stream_config: StreamConfig<crate::LossyWithoutBackpressure, crate::NoReplay> =
            StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        // Broadcast backend: write a few bytes, then close the writer so the reader hits EOF.
        let (broadcast_read, mut broadcast_write) = tokio::io::duplex(64);
        let broadcast = BroadcastOutputStream::from_stream(broadcast_read, "bcast", stream_config);
        broadcast_write.write_all(b"abc").await.unwrap();
        drop(broadcast_write);
        let broadcast_count = count_chunks(&broadcast).await.unwrap();
        assert_that!(broadcast_count).is_greater_or_equal_to(1);

        // Single-subscriber backend: same scenario.
        let (single_read, mut single_write) = tokio::io::duplex(64);
        let single =
            SingleSubscriberOutputStream::from_stream(single_read, "single", stream_config);
        single_write.write_all(b"abc").await.unwrap();
        drop(single_write);
        let single_count = count_chunks(&single).await.unwrap();
        assert_that!(single_count).is_greater_or_equal_to(1);

        // Discarding backend: the visitor is expected to observe no chunks at all.
        let discarded = DiscardedOutputStream::new("discard");
        let discarded_count = count_chunks(&discarded).await.unwrap();
        assert_that!(discarded_count).is_equal_to(0);
    }
}
246}