tokio_process_tools/output_stream/mod.rs
1//! Process output stream types and helpers.
2//!
3//! The submodules below correspond to the four conceptual layers this subsystem is built from:
4//!
5//! - **Core abstractions** — the [`OutputStream`] / [`Subscription`] / [`TrySubscribable`] /
6//! [`Next`] traits defined here, [`event`]'s [`Chunk`](event::Chunk) /
7//! [`StreamEvent`](StreamEvent), the [`policy`] / [`config`] / [`num_bytes`] / [`line`]
8//! modules, and the [`visitor`] trait pair every visitor implementation builds against. These
9//! files have no tokio dependency.
10//! - **Tokio runtime adapter** ([`consumer`]) — the [`Consumer<S>`](consumer::Consumer) handle
11//! plus the driver loops that step a visitor over a subscription on a tokio task with
12//! cooperative-cancel / abort semantics.
13//! - **Tokio stream backends** ([`backend`]) — `broadcast` and `single_subscriber`, which ingest
14//! any [`tokio::io::AsyncRead`] and emit [`StreamEvent`](StreamEvent)s.
15//! - **User-replaceable convenience layer** ([`visitors`]) — the built-in `collect`, `inspect`,
16//! `wait`, and `write` visitors plus the `inspect_*` / `collect_*` factory macro that wires
//!   them as inherent methods on each backend. `consume_with(my_visitor)` is enough to use the
//!   library; everything in [`visitors`] is sugar for the common cases.
19
20pub(crate) mod consumer;
21
22/// Output stream backend implementations.
23pub(crate) mod backend;
24
25/// Shared stream consumption configuration.
26pub(crate) mod config;
27
28pub(crate) mod event;
29
30/// Line parsing types and options.
31pub(crate) mod line;
32
33/// `NumBytes` newtype and convenience constructors used throughout the public API.
34pub(crate) mod num_bytes;
35
36/// Delivery and replay policy types shared by output stream backends.
37pub(crate) mod policy;
38
39/// Visitor traits, the runtime-agnostic contract every stream observer implements.
40pub(crate) mod visitor;
41
42/// Built-in visitors and the convenience factory macro that instantiates them.
43pub(crate) mod visitors;
44
45use crate::StreamConsumerError;
46use event::StreamEvent;
47use num_bytes::NumBytes;
48
49/// We support the following implementations:
50///
51/// - [`crate::BroadcastOutputStream`]
52/// - [`crate::SingleSubscriberOutputStream`]
53pub trait OutputStream {
54 /// The maximum size of every chunk read by the backing `stream_reader`.
55 fn read_chunk_size(&self) -> NumBytes;
56
57 /// The number of chunks held by the underlying async channel.
58 fn max_buffered_chunks(&self) -> usize;
59
60 /// Type of stream. Can be "stdout" or "stderr". But we do not guarantee this output.
61 /// It should only be used for logging/debugging.
62 fn name(&self) -> &'static str;
63}
64
65/// Stream event subscription used by built-in consumers.
66pub trait Subscription: Send + 'static {
67 /// Returns the next stream event, or `None` once the subscription is closed.
68 fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_;
69}
70
71/// Output stream backend that can reject consumer subscriptions.
72pub trait TrySubscribable: OutputStream {
73 /// Creates a new subscription for a consumer, or returns why the consumer cannot be started.
74 ///
75 /// # Errors
76 ///
77 /// Returns [`StreamConsumerError`] when the backend cannot start a new consumer, for
78 /// example because a single-subscriber backend already has an active consumer.
79 fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError>;
80}
81
/// Control flag to indicate whether processing should continue or break.
///
/// Returning [`Next::Break`] from an inspector/collector lets that instance stop visiting any
/// more data; returning [`Next::Continue`] keeps it going.
///
/// The type is a plain, field-less flag, so it derives the full set of cheap common traits
/// (including `Hash`) and can be used in sets or as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Next {
    /// Interested in receiving additional data.
    Continue,

    /// Not interested in receiving additional data. Will let the inspector/collector stop.
    Break,
}