tokio_process_tools/output_stream/visitor.rs
1//! Visitor traits — the core abstraction every stream observer builds against.
2//!
3//! [`StreamVisitor`] and [`AsyncStreamVisitor`] describe what a chunk-level observer is, without
4//! committing to a runtime: methods are plain `&mut self` calls (synchronous) or return-position
5//! `impl Future` (asynchronous), and the trait bounds are `Send + 'static` only.
6//!
7//! The tokio-bound machinery that actually drives a visitor — task spawning, cooperative
8//! cancellation, the `Consumer<S>` handle — lives in [`crate::output_stream::consumer`]. Built-in
9//! implementations (`collect`, `inspect`, `wait`, `write`) live in
10//! [`crate::output_stream::visitors`]. User code can implement these traits directly and pass
11//! the visitor to `consume_with(...)` / `consume_with_async(...)` on any backend without
12//! touching the built-ins.
13
14use crate::output_stream::Next;
15use crate::output_stream::event::Chunk;
16use std::future::Future;
17
18/// A synchronous visitor that observes stream events and produces a final value.
19///
20/// `StreamVisitor` is the synchronous counterpart to [`AsyncStreamVisitor`]. Implement it on a
21/// type that needs to react to chunks, gaps, and EOF without `.await`-ing between events, then
22/// drive it via `consume_with` to obtain a [`Consumer`](crate::Consumer) handle that owns the
23/// resulting tokio task.
24///
25/// All built-in consumer factories (`inspect_*`, `collect_*`, `wait_for_line`) construct a
26/// built-in visitor and call `consume_with` internally; this trait is what users implement to
27/// plug in custom logic without wrapping a closure in shared mutable state.
28///
29/// # Lifecycle
30///
31/// 1. [`on_chunk`](StreamVisitor::on_chunk) is invoked for every observed chunk. Return
32/// [`Next::Continue`] to keep going or [`Next::Break`] to stop early.
33/// 2. [`on_gap`](StreamVisitor::on_gap) is invoked when the stream backend reports that chunks
34/// were dropped (e.g., best-effort delivery overflow). Use it to reset partial-line buffers
35/// or other accumulated state.
36/// 3. [`on_eof`](StreamVisitor::on_eof) is invoked exactly once when the stream ends naturally.
37/// It is *not* invoked when the visitor returned [`Next::Break`], nor when the consumer task
38/// is cancelled or aborted.
39/// 4. [`into_output`](StreamVisitor::into_output) consumes `self` and returns the value the
40/// [`Consumer`](crate::Consumer)'s `wait`/`cancel` methods yield.
41///
42/// # Example
43///
44/// ```rust, no_run
45/// # use tokio_process_tools::{Chunk, Next, StreamVisitor};
46/// /// Counts chunks and stops after `limit`.
47/// struct CountUntil { count: usize, limit: usize }
48///
49/// impl StreamVisitor for CountUntil {
50/// type Output = usize;
51///
52/// fn on_chunk(&mut self, _chunk: Chunk) -> Next {
53/// self.count += 1;
54/// if self.count >= self.limit { Next::Break } else { Next::Continue }
55/// }
56///
57/// fn into_output(self) -> usize { self.count }
58/// }
59/// ```
60pub trait StreamVisitor: Send + 'static {
61 /// The value produced by [`into_output`](StreamVisitor::into_output) after the visitor has
62 /// finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait) and
63 /// [`Consumer::cancel`](crate::Consumer::cancel).
64 type Output: Send + 'static;
65
66 /// Invoked for every chunk observed on the stream.
67 ///
68 /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
69 /// further events; in the latter case [`on_eof`](StreamVisitor::on_eof) is not called.
70 fn on_chunk(&mut self, chunk: Chunk) -> Next;
71
72 /// Invoked when the stream backend reports that one or more chunks were dropped between the
73 /// last delivered chunk and the next one.
74 ///
75 /// The default implementation does nothing. Override it to reset any partial-line buffers or
76 /// other accumulated state that would be invalidated by the gap.
77 ///
78 /// Whether gaps can occur depends on the guarantees chosen for the backend.
79 fn on_gap(&mut self) {}
80
81 /// Invoked exactly once when the stream ends (EOF or write side dropped).
82 ///
83 /// Not called when the visitor returned [`Next::Break`] from
84 /// [`on_chunk`](StreamVisitor::on_chunk), nor when the consumer task is cancelled or aborted
85 /// before the stream ends.
86 fn on_eof(&mut self) {}
87
88 /// Consumes the visitor and returns its final output.
89 ///
90 /// Called after the visitor has stopped observing events (via EOF, `Break`, or cancellation).
91 /// The returned value is what the owning [`Consumer`](crate::Consumer)'s `wait`/`cancel`
92 /// methods yield.
93 fn into_output(self) -> Self::Output;
94}
95
96/// An asynchronous visitor that observes stream events and produces a final value.
97///
98/// `AsyncStreamVisitor` is the asynchronous counterpart to [`StreamVisitor`]. Use it when
99/// observing a chunk needs to `.await` (network I/O, async writers, channel sends).
100///
101/// The trait uses return-position `impl Future` rather than `async fn` to keep the `Send` bound
102/// on the returned future expressible on stable Rust; this is the same shape used by
103/// [`AsyncChunkCollector`](crate::AsyncChunkCollector) and
104/// [`AsyncLineCollector`](crate::AsyncLineCollector). See [`StreamVisitor`] for the lifecycle
105/// description; the only difference is that `on_chunk` and `on_eof` are async.
106///
107/// # Example
108///
109/// ```rust, no_run
110/// # use std::future::Future;
111/// # use tokio_process_tools::{AsyncStreamVisitor, Chunk, Next};
112/// /// Forwards every chunk to an mpsc channel.
113/// struct ForwardChunks { tx: tokio::sync::mpsc::Sender<Vec<u8>> }
114///
115/// impl AsyncStreamVisitor for ForwardChunks {
116/// type Output = ();
117///
118/// async fn on_chunk(&mut self, chunk: Chunk) -> Next {
119/// match self.tx.send(chunk.as_ref().to_vec()).await {
120/// Ok(()) => Next::Continue,
121/// Err(_) => Next::Break,
122/// }
123/// }
124///
125/// fn into_output(self) {}
126/// }
127/// ```
128pub trait AsyncStreamVisitor: Send + 'static {
129 /// The value produced by [`into_output`](AsyncStreamVisitor::into_output) after the visitor
130 /// has finished observing the stream. Returned via [`Consumer::wait`](crate::Consumer::wait)
131 /// and [`Consumer::cancel`](crate::Consumer::cancel).
132 type Output: Send + 'static;
133
134 /// Asynchronously observes a single chunk.
135 ///
136 /// Return [`Next::Continue`] to keep visiting, or [`Next::Break`] to stop without consuming
137 /// further events; in the latter case [`on_eof`](AsyncStreamVisitor::on_eof) is not called.
138 fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_;
139
140 /// Invoked when the stream backend reports that one or more chunks were dropped between the
141 /// last delivered chunk and the next one.
142 ///
143 /// Synchronous because gap notification carries no payload to await on. The default
144 /// implementation does nothing.
145 fn on_gap(&mut self) {}
146
147 /// Asynchronously observes end-of-stream.
148 ///
149 /// Not called when the visitor returned [`Next::Break`] from
150 /// [`on_chunk`](AsyncStreamVisitor::on_chunk), nor when the consumer task is cancelled or
151 /// aborted before the stream ends. The default implementation is a no-op.
152 fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
153 async {}
154 }
155
156 /// Consumes the visitor and returns its final output.
157 ///
158 /// Called after the visitor has stopped observing events. Synchronous because no further
159 /// stream interaction is required at this point.
160 fn into_output(self) -> Self::Output;
161}