Skip to main content

tokio_process_tools/output_stream/
visitor.rs

//! Visitor traits, the core abstraction every stream observer builds against.
//!
//! [`StreamVisitor`] and [`AsyncStreamVisitor`] describe what a chunk-level observer is, without
//! committing to a runtime: methods are plain `&mut self` calls (synchronous) or return-position
//! `impl Future` (asynchronous), and the trait bounds are `Send + 'static` only.
//!
//! The tokio-bound machinery that actually drives a visitor (task spawning, cooperative
//! cancellation, the `Consumer<S>` handle) lives in [`crate::output_stream::consumer`]. Built-in
//! implementations (`collect`, `inspect`, `wait`, `write`) live in
//! [`crate::output_stream::visitors`]. User code can implement these traits directly and pass
//! the visitor to [`Consumable::consume`](crate::Consumable::consume) /
//! [`Consumable::consume_async`](crate::Consumable::consume_async) on any backend without
//! touching the built-ins.
14
15use crate::output_stream::Next;
16use crate::output_stream::event::Chunk;
17use std::future::Future;
18
19/// A synchronous visitor that observes stream events and produces a final value.
20///
21/// `StreamVisitor` is the synchronous counterpart to [`AsyncStreamVisitor`]. Implement it on a
22/// type that needs to react to chunks, gaps, and EOF without `.await`-ing between events, then
23/// drive it via `consume` to obtain a [`Consumer`](crate::Consumer) handle that owns the
24/// resulting tokio task.
25///
26/// The built-in visitors under [`crate::visitors`] all implement this trait (or its async
27/// counterpart [`AsyncStreamVisitor`]). Implement it on your own type to plug in custom
28/// chunk-level logic without wrapping a closure in shared mutable state.
29///
30/// # Lifecycle
31///
32/// 1. [`on_chunk`](StreamVisitor::on_chunk) is invoked for every observed chunk. Return
33///    [`Next::Continue`] to keep going or [`Next::Break`] to stop early.
34/// 2. [`on_gap`](StreamVisitor::on_gap) is invoked when the stream backend reports that chunks
35///    were dropped (e.g., best-effort delivery overflow). Use it to reset partial-line buffers
36///    or other accumulated state.
37/// 3. [`on_eof`](StreamVisitor::on_eof) is invoked exactly once when the stream ends naturally.
38///    It is *not* invoked when the visitor returned [`Next::Break`], nor when the consumer task
39///    is cancelled or aborted.
40/// 4. [`into_output`](StreamVisitor::into_output) consumes `self` and returns the value the
41///    [`Consumer`](crate::Consumer)'s `wait`/`cancel` methods yield.
42///
43/// # Example
44///
45/// ```rust, no_run
46/// # use tokio_process_tools::{Chunk, Next, StreamVisitor};
47/// /// Counts chunks and stops after `limit`.
48/// struct CountUntil { count: usize, limit: usize }
49///
50/// impl StreamVisitor for CountUntil {
51///     type Output = usize;
52///
53///     fn on_chunk(&mut self, _chunk: Chunk) -> Next {
54///         self.count += 1;
55///         if self.count >= self.limit { Next::Break } else { Next::Continue }
56///     }
57///
58///     fn into_output(self) -> usize { self.count }
59/// }
60/// ```
61pub trait StreamVisitor: Send + 'static {
62    /// The value produced by [`into_output`](StreamVisitor::into_output) after the visitor has
63    /// finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait) and
64    /// [`Consumer::cancel`](crate::Consumer::cancel).
65    type Output: Send + 'static;
66
67    /// Invoked for every chunk observed on the stream.
68    ///
69    /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
70    /// further events; in the latter case [`on_eof`](StreamVisitor::on_eof) is not called.
71    fn on_chunk(&mut self, chunk: Chunk) -> Next;
72
73    /// Invoked when the stream backend reports that one or more chunks were dropped between the
74    /// last delivered chunk and the next one.
75    ///
76    /// The default implementation does nothing. Override it to reset any partial-line buffers or
77    /// other accumulated state that would be invalidated by the gap.
78    ///
79    /// Whether gaps can occur depends on the guarantees chosen for the backend.
80    fn on_gap(&mut self) {}
81
82    /// Invoked exactly once when the stream ends (EOF or write side dropped).
83    ///
84    /// Not called when the visitor returned [`Next::Break`] from
85    /// [`on_chunk`](StreamVisitor::on_chunk), nor when the consumer task is cancelled or aborted
86    /// before the stream ends.
87    fn on_eof(&mut self) {}
88
89    /// Consumes the visitor and returns its final output.
90    ///
91    /// Called after the visitor has stopped observing events (via EOF, `Break`, or cancellation).
92    /// The returned value is what the owning [`Consumer`](crate::Consumer)'s `wait`/`cancel`
93    /// methods yield.
94    #[must_use]
95    fn into_output(self) -> Self::Output;
96}
97
98/// An asynchronous visitor that observes stream events and produces a final value.
99///
100/// `AsyncStreamVisitor` is the asynchronous counterpart to [`StreamVisitor`]. Use it when
101/// observing a chunk needs to `.await` (network I/O, async writers, channel sends).
102///
103/// The trait uses return-position `impl Future` rather than `async fn` to keep the `Send` bound
104/// on the returned future expressible on stable Rust. See [`StreamVisitor`] for the lifecycle
105/// description; the only difference is that `on_chunk` and `on_eof` are async.
106///
107/// # Example
108///
109/// ```rust, no_run
110/// # use std::future::Future;
111/// # use tokio_process_tools::{AsyncStreamVisitor, Chunk, Next};
112/// /// Forwards every chunk to an mpsc channel.
113/// struct ForwardChunks { tx: tokio::sync::mpsc::Sender<Vec<u8>> }
114///
115/// impl AsyncStreamVisitor for ForwardChunks {
116///     type Output = ();
117///
118///     async fn on_chunk(&mut self, chunk: Chunk) -> Next {
119///         match self.tx.send(chunk.as_ref().to_vec()).await {
120///             Ok(()) => Next::Continue,
121///             Err(_) => Next::Break,
122///         }
123///     }
124///
125///     fn into_output(self) {}
126/// }
127/// ```
128pub trait AsyncStreamVisitor: Send + 'static {
129    /// The value produced by [`into_output`](AsyncStreamVisitor::into_output) after the visitor
130    /// has finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait)
131    /// and [`Consumer::cancel`](crate::Consumer::cancel).
132    type Output: Send + 'static;
133
134    /// Asynchronously observes a single chunk.
135    ///
136    /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
137    /// further events; in the latter case [`on_eof`](AsyncStreamVisitor::on_eof) is not called.
138    fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_;
139
140    /// Invoked when the stream backend reports that one or more chunks were dropped between the
141    /// last delivered chunk and the next one.
142    ///
143    /// Synchronous because gap notification carries no payload to await on. The default
144    /// implementation does nothing.
145    fn on_gap(&mut self) {}
146
147    /// Asynchronously observes end-of-stream.
148    ///
149    /// Not called when the visitor returned [`Next::Break`] from
150    /// [`on_chunk`](AsyncStreamVisitor::on_chunk), nor when the consumer task is cancelled or
151    /// aborted before the stream ends. The default implementation is a no-op.
152    fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
153        async {}
154    }
155
156    /// Consumes the visitor and returns its final output.
157    ///
158    /// Called after the visitor has stopped observing events. Synchronous because no further
159    /// stream interaction is required at this point.
160    #[must_use]
161    fn into_output(self) -> Self::Output;
162}