tokio_process_tools/output_stream/visitor.rs
//! Visitor traits, the core abstraction every stream observer builds against.
//!
//! [`StreamVisitor`] and [`AsyncStreamVisitor`] describe what a chunk-level observer is, without
//! committing to a runtime: methods are plain `&mut self` calls (synchronous) or return-position
//! `impl Future` (asynchronous), and the trait bounds are `Send + 'static` only.
//!
//! The tokio-bound machinery that actually drives a visitor (task spawning, cooperative
//! cancellation, the `Consumer<S>` handle) lives in [`crate::output_stream::consumer`]. Built-in
//! implementations (`collect`, `inspect`, `wait`, `write`) live in
//! [`crate::output_stream::visitors`]. User code can implement these traits directly and pass
//! the visitor to [`Consumable::consume`](crate::Consumable::consume) /
//! [`Consumable::consume_async`](crate::Consumable::consume_async) on any backend without
//! touching the built-ins.
14
15use crate::output_stream::Next;
16use crate::output_stream::event::Chunk;
17use std::future::Future;
18
19/// A synchronous visitor that observes stream events and produces a final value.
20///
21/// `StreamVisitor` is the synchronous counterpart to [`AsyncStreamVisitor`]. Implement it on a
22/// type that needs to react to chunks, gaps, and EOF without `.await`-ing between events, then
23/// drive it via `consume` to obtain a [`Consumer`](crate::Consumer) handle that owns the
24/// resulting tokio task.
25///
26/// The built-in visitors under [`crate::visitors`] all implement this trait (or its async
27/// counterpart [`AsyncStreamVisitor`]). Implement it on your own type to plug in custom
28/// chunk-level logic without wrapping a closure in shared mutable state.
29///
30/// # Lifecycle
31///
32/// 1. [`on_chunk`](StreamVisitor::on_chunk) is invoked for every observed chunk. Return
33/// [`Next::Continue`] to keep going or [`Next::Break`] to stop early.
34/// 2. [`on_gap`](StreamVisitor::on_gap) is invoked when the stream backend reports that chunks
35/// were dropped (e.g., best-effort delivery overflow). Use it to reset partial-line buffers
36/// or other accumulated state.
37/// 3. [`on_eof`](StreamVisitor::on_eof) is invoked exactly once when the stream ends naturally.
38/// It is *not* invoked when the visitor returned [`Next::Break`], nor when the consumer task
39/// is cancelled or aborted.
40/// 4. [`into_output`](StreamVisitor::into_output) consumes `self` and returns the value the
41/// [`Consumer`](crate::Consumer)'s `wait`/`cancel` methods yield.
42///
43/// # Example
44///
45/// ```rust, no_run
46/// # use tokio_process_tools::{Chunk, Next, StreamVisitor};
47/// /// Counts chunks and stops after `limit`.
48/// struct CountUntil { count: usize, limit: usize }
49///
50/// impl StreamVisitor for CountUntil {
51/// type Output = usize;
52///
53/// fn on_chunk(&mut self, _chunk: Chunk) -> Next {
54/// self.count += 1;
55/// if self.count >= self.limit { Next::Break } else { Next::Continue }
56/// }
57///
58/// fn into_output(self) -> usize { self.count }
59/// }
60/// ```
61pub trait StreamVisitor: Send + 'static {
62 /// The value produced by [`into_output`](StreamVisitor::into_output) after the visitor has
63 /// finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait) and
64 /// [`Consumer::cancel`](crate::Consumer::cancel).
65 type Output: Send + 'static;
66
67 /// Invoked for every chunk observed on the stream.
68 ///
69 /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
70 /// further events; in the latter case [`on_eof`](StreamVisitor::on_eof) is not called.
71 fn on_chunk(&mut self, chunk: Chunk) -> Next;
72
73 /// Invoked when the stream backend reports that one or more chunks were dropped between the
74 /// last delivered chunk and the next one.
75 ///
76 /// The default implementation does nothing. Override it to reset any partial-line buffers or
77 /// other accumulated state that would be invalidated by the gap.
78 ///
79 /// Whether gaps can occur depends on the guarantees chosen for the backend.
80 fn on_gap(&mut self) {}
81
82 /// Invoked exactly once when the stream ends (EOF or write side dropped).
83 ///
84 /// Not called when the visitor returned [`Next::Break`] from
85 /// [`on_chunk`](StreamVisitor::on_chunk), nor when the consumer task is cancelled or aborted
86 /// before the stream ends.
87 fn on_eof(&mut self) {}
88
89 /// Consumes the visitor and returns its final output.
90 ///
91 /// Called after the visitor has stopped observing events (via EOF, `Break`, or cancellation).
92 /// The returned value is what the owning [`Consumer`](crate::Consumer)'s `wait`/`cancel`
93 /// methods yield.
94 #[must_use]
95 fn into_output(self) -> Self::Output;
96}
97
98/// An asynchronous visitor that observes stream events and produces a final value.
99///
100/// `AsyncStreamVisitor` is the asynchronous counterpart to [`StreamVisitor`]. Use it when
101/// observing a chunk needs to `.await` (network I/O, async writers, channel sends).
102///
103/// The trait uses return-position `impl Future` rather than `async fn` to keep the `Send` bound
104/// on the returned future expressible on stable Rust. See [`StreamVisitor`] for the lifecycle
105/// description; the only difference is that `on_chunk` and `on_eof` are async.
106///
107/// # Example
108///
109/// ```rust, no_run
110/// # use std::future::Future;
111/// # use tokio_process_tools::{AsyncStreamVisitor, Chunk, Next};
112/// /// Forwards every chunk to an mpsc channel.
113/// struct ForwardChunks { tx: tokio::sync::mpsc::Sender<Vec<u8>> }
114///
115/// impl AsyncStreamVisitor for ForwardChunks {
116/// type Output = ();
117///
118/// async fn on_chunk(&mut self, chunk: Chunk) -> Next {
119/// match self.tx.send(chunk.as_ref().to_vec()).await {
120/// Ok(()) => Next::Continue,
121/// Err(_) => Next::Break,
122/// }
123/// }
124///
125/// fn into_output(self) {}
126/// }
127/// ```
128pub trait AsyncStreamVisitor: Send + 'static {
129 /// The value produced by [`into_output`](AsyncStreamVisitor::into_output) after the visitor
130 /// has finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait)
131 /// and [`Consumer::cancel`](crate::Consumer::cancel).
132 type Output: Send + 'static;
133
134 /// Asynchronously observes a single chunk.
135 ///
136 /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
137 /// further events; in the latter case [`on_eof`](AsyncStreamVisitor::on_eof) is not called.
138 fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_;
139
140 /// Invoked when the stream backend reports that one or more chunks were dropped between the
141 /// last delivered chunk and the next one.
142 ///
143 /// Synchronous because gap notification carries no payload to await on. The default
144 /// implementation does nothing.
145 fn on_gap(&mut self) {}
146
147 /// Asynchronously observes end-of-stream.
148 ///
149 /// Not called when the visitor returned [`Next::Break`] from
150 /// [`on_chunk`](AsyncStreamVisitor::on_chunk), nor when the consumer task is cancelled or
151 /// aborted before the stream ends. The default implementation is a no-op.
152 fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
153 async {}
154 }
155
156 /// Consumes the visitor and returns its final output.
157 ///
158 /// Called after the visitor has stopped observing events. Synchronous because no further
159 /// stream interaction is required at this point.
160 #[must_use]
161 fn into_output(self) -> Self::Output;
162}