Skip to main content

tokio_process_tools/output_stream/
visitor.rs

//! Visitor traits — the core abstraction every stream observer builds against.
//!
//! [`StreamVisitor`] and [`AsyncStreamVisitor`] describe what a chunk-level observer is, without
//! committing to a runtime: methods are plain `&mut self` calls (synchronous) or return-position
//! `impl Future` (asynchronous), and the trait bounds are `Send + 'static` only.
//!
//! The tokio-bound machinery that actually drives a visitor — task spawning, cooperative
//! cancellation, the `Consumer<S>` handle — lives in [`crate::output_stream::consumer`]. Built-in
//! implementations (`collect`, `inspect`, `wait`, `write`) live in
//! [`crate::output_stream::visitors`]. User code can implement these traits directly and pass
//! the visitor to `consume_with(...)` / `consume_with_async(...)` on any backend without
//! touching the built-ins.
13
14use crate::output_stream::Next;
15use crate::output_stream::event::Chunk;
16use std::future::Future;
17
18/// A synchronous visitor that observes stream events and produces a final value.
19///
20/// `StreamVisitor` is the synchronous counterpart to [`AsyncStreamVisitor`]. Implement it on a
21/// type that needs to react to chunks, gaps, and EOF without `.await`-ing between events, then
22/// drive it via `consume_with` to obtain a [`Consumer`](crate::Consumer) handle that owns the
23/// resulting tokio task.
24///
25/// All built-in consumer factories (`inspect_*`, `collect_*`, `wait_for_line`) construct a
26/// built-in visitor and call `consume_with` internally; this trait is what users implement to
27/// plug in custom logic without wrapping a closure in shared mutable state.
28///
29/// # Lifecycle
30///
31/// 1. [`on_chunk`](StreamVisitor::on_chunk) is invoked for every observed chunk. Return
32///    [`Next::Continue`] to keep going or [`Next::Break`] to stop early.
33/// 2. [`on_gap`](StreamVisitor::on_gap) is invoked when the stream backend reports that chunks
34///    were dropped (e.g., best-effort delivery overflow). Use it to reset partial-line buffers
35///    or other accumulated state.
36/// 3. [`on_eof`](StreamVisitor::on_eof) is invoked exactly once when the stream ends naturally.
37///    It is *not* invoked when the visitor returned [`Next::Break`], nor when the consumer task
38///    is cancelled or aborted.
39/// 4. [`into_output`](StreamVisitor::into_output) consumes `self` and returns the value the
40///    [`Consumer`](crate::Consumer)'s `wait`/`cancel` methods yield.
41///
42/// # Example
43///
44/// ```rust, no_run
45/// # use tokio_process_tools::{Chunk, Next, StreamVisitor};
46/// /// Counts chunks and stops after `limit`.
47/// struct CountUntil { count: usize, limit: usize }
48///
49/// impl StreamVisitor for CountUntil {
50///     type Output = usize;
51///
52///     fn on_chunk(&mut self, _chunk: Chunk) -> Next {
53///         self.count += 1;
54///         if self.count >= self.limit { Next::Break } else { Next::Continue }
55///     }
56///
57///     fn into_output(self) -> usize { self.count }
58/// }
59/// ```
60pub trait StreamVisitor: Send + 'static {
61    /// The value produced by [`into_output`](StreamVisitor::into_output) after the visitor has
62    /// finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait) and
63    /// [`Consumer::cancel`](crate::Consumer::cancel).
64    type Output: Send + 'static;
65
66    /// Invoked for every chunk observed on the stream.
67    ///
68    /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
69    /// further events; in the latter case [`on_eof`](StreamVisitor::on_eof) is not called.
70    fn on_chunk(&mut self, chunk: Chunk) -> Next;
71
72    /// Invoked when the stream backend reports that one or more chunks were dropped between the
73    /// last delivered chunk and the next one.
74    ///
75    /// The default implementation does nothing. Override it to reset any partial-line buffers or
76    /// other accumulated state that would be invalidated by the gap.
77    ///
78    /// Whether gaps can occur depends on the guarantees chosen for the backend.
79    fn on_gap(&mut self) {}
80
81    /// Invoked exactly once when the stream ends (EOF or write side dropped).
82    ///
83    /// Not called when the visitor returned [`Next::Break`] from
84    /// [`on_chunk`](StreamVisitor::on_chunk), nor when the consumer task is cancelled or aborted
85    /// before the stream ends.
86    fn on_eof(&mut self) {}
87
88    /// Consumes the visitor and returns its final output.
89    ///
90    /// Called after the visitor has stopped observing events (via EOF, `Break`, or cancellation).
91    /// The returned value is what the owning [`Consumer`](crate::Consumer)'s `wait`/`cancel`
92    /// methods yield.
93    fn into_output(self) -> Self::Output;
94}
95
96/// An asynchronous visitor that observes stream events and produces a final value.
97///
98/// `AsyncStreamVisitor` is the asynchronous counterpart to [`StreamVisitor`]. Use it when
99/// observing a chunk needs to `.await` (network I/O, async writers, channel sends).
100///
101/// The trait uses return-position `impl Future` rather than `async fn` to keep the `Send` bound
102/// on the returned future expressible on stable Rust; this is the same shape used by
103/// [`AsyncChunkCollector`](crate::AsyncChunkCollector) and
104/// [`AsyncLineCollector`](crate::AsyncLineCollector). See [`StreamVisitor`] for the lifecycle
105/// description; the only difference is that `on_chunk` and `on_eof` are async.
106///
107/// # Example
108///
109/// ```rust, no_run
110/// # use std::future::Future;
111/// # use tokio_process_tools::{AsyncStreamVisitor, Chunk, Next};
112/// /// Forwards every chunk to an mpsc channel.
113/// struct ForwardChunks { tx: tokio::sync::mpsc::Sender<Vec<u8>> }
114///
115/// impl AsyncStreamVisitor for ForwardChunks {
116///     type Output = ();
117///
118///     async fn on_chunk(&mut self, chunk: Chunk) -> Next {
119///         match self.tx.send(chunk.as_ref().to_vec()).await {
120///             Ok(()) => Next::Continue,
121///             Err(_) => Next::Break,
122///         }
123///     }
124///
125///     fn into_output(self) {}
126/// }
127/// ```
128pub trait AsyncStreamVisitor: Send + 'static {
129    /// The value produced by [`into_output`](AsyncStreamVisitor::into_output) after the visitor
130    /// has finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait)
131    /// and [`Consumer::cancel`](crate::Consumer::cancel).
132    type Output: Send + 'static;
133
134    /// Asynchronously observes a single chunk.
135    ///
136    /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
137    /// further events; in the latter case [`on_eof`](AsyncStreamVisitor::on_eof) is not called.
138    fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_;
139
140    /// Invoked when the stream backend reports that one or more chunks were dropped between the
141    /// last delivered chunk and the next one.
142    ///
143    /// Synchronous because gap notification carries no payload to await on. The default
144    /// implementation does nothing.
145    fn on_gap(&mut self) {}
146
147    /// Asynchronously observes end-of-stream.
148    ///
149    /// Not called when the visitor returned [`Next::Break`] from
150    /// [`on_chunk`](AsyncStreamVisitor::on_chunk), nor when the consumer task is cancelled or
151    /// aborted before the stream ends. The default implementation is a no-op.
152    fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
153        async {}
154    }
155
156    /// Consumes the visitor and returns its final output.
157    ///
158    /// Called after the visitor has stopped observing events. Synchronous because no further
159    /// stream interaction is required at this point.
160    fn into_output(self) -> Self::Output;
161}