1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//! Visitor traits — the core abstraction every stream observer builds against.
//!
//! [`StreamVisitor`] and [`AsyncStreamVisitor`] describe what a chunk-level observer is, without
//! committing to a runtime: methods are plain `&mut self` calls (synchronous) or return-position
//! `impl Future` (asynchronous), and the trait bounds are `Send + 'static` only.
//!
//! The tokio-bound machinery that actually drives a visitor — task spawning, cooperative
//! cancellation, the `Consumer<S>` handle — lives in [`crate::output_stream::consumer`]. Built-in
//! implementations (`collect`, `inspect`, `wait`, `write`) live in
//! [`crate::output_stream::visitors`]. User code can implement these traits directly and pass
//! the visitor to `consume_with(...)` / `consume_with_async(...)` on any backend without
//! touching the built-ins.
use crateNext;
use crateChunk;
use Future;
/// A synchronous visitor that observes stream events and produces a final value.
///
/// `StreamVisitor` is the synchronous counterpart to [`AsyncStreamVisitor`]. Implement it on a
/// type that needs to react to chunks, gaps, and EOF without `.await`-ing between events, then
/// drive it via `consume_with` to obtain a [`Consumer`](crate::Consumer) handle that owns the
/// resulting tokio task.
///
/// All built-in consumer factories (`inspect_*`, `collect_*`, `wait_for_line`) construct a
/// built-in visitor and call `consume_with` internally; this trait is what users implement to
/// plug in custom logic without wrapping a closure in shared mutable state.
///
/// # Lifecycle
///
/// 1. [`on_chunk`](StreamVisitor::on_chunk) is invoked for every observed chunk. Return
/// [`Next::Continue`] to keep going or [`Next::Break`] to stop early.
/// 2. [`on_gap`](StreamVisitor::on_gap) is invoked when the stream backend reports that chunks
/// were dropped (e.g., best-effort delivery overflow). Use it to reset partial-line buffers
/// or other accumulated state.
/// 3. [`on_eof`](StreamVisitor::on_eof) is invoked exactly once when the stream ends naturally.
/// It is *not* invoked when the visitor returned [`Next::Break`], nor when the consumer task
/// is cancelled or aborted.
/// 4. [`into_output`](StreamVisitor::into_output) consumes `self` and returns the value the
/// [`Consumer`](crate::Consumer)'s `wait`/`cancel` methods yield.
///
/// # Example
///
/// ```rust, no_run
/// # use tokio_process_tools::{Chunk, Next, StreamVisitor};
/// /// Counts chunks and stops after `limit`.
/// struct CountUntil { count: usize, limit: usize }
///
/// impl StreamVisitor for CountUntil {
/// type Output = usize;
///
/// fn on_chunk(&mut self, _chunk: Chunk) -> Next {
/// self.count += 1;
/// if self.count >= self.limit { Next::Break } else { Next::Continue }
/// }
///
/// fn into_output(self) -> usize { self.count }
/// }
/// ```
/// An asynchronous visitor that observes stream events and produces a final value.
///
/// `AsyncStreamVisitor` is the asynchronous counterpart to [`StreamVisitor`]. Use it when
/// observing a chunk needs to `.await` (network I/O, async writers, channel sends).
///
/// The trait uses return-position `impl Future` rather than `async fn` to keep the `Send` bound
/// on the returned future expressible on stable Rust; this is the same shape used by
/// [`AsyncChunkCollector`](crate::AsyncChunkCollector) and
/// [`AsyncLineCollector`](crate::AsyncLineCollector). See [`StreamVisitor`] for the lifecycle
/// description; the only difference is that `on_chunk` and `on_eof` are async.
///
/// # Example
///
/// ```rust, no_run
/// # use std::future::Future;
/// # use tokio_process_tools::{AsyncStreamVisitor, Chunk, Next};
/// /// Forwards every chunk to an mpsc channel.
/// struct ForwardChunks { tx: tokio::sync::mpsc::Sender<Vec<u8>> }
///
/// impl AsyncStreamVisitor for ForwardChunks {
/// type Output = ();
///
/// async fn on_chunk(&mut self, chunk: Chunk) -> Next {
/// match self.tx.send(chunk.as_ref().to_vec()).await {
/// Ok(()) => Next::Continue,
/// Err(_) => Next::Break,
/// }
/// }
///
/// fn into_output(self) {}
/// }
/// ```