tokio_process_tools/process/
stream_config.rs1use crate::output_stream::OutputStream;
2use crate::output_stream::backend::broadcast::BroadcastOutputStream;
3use crate::output_stream::backend::discard::DiscardedOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::config::{
6 StreamConfig, StreamConfigBuilder, StreamConfigMaxBufferedChunksBuilder,
7 StreamConfigReadChunkSizeBuilder, StreamConfigReadyBuilder, StreamConfigReplayBuilder,
8};
9use crate::output_stream::policy::{
10 Delivery, LossyWithoutBackpressure, ReliableWithBackpressure, Replay, ReplayEnabled,
11};
12use std::marker::PhantomData;
13use std::process::Stdio;
14use tokio::io::AsyncRead;
15
16mod process_stream_config {
17 use super::OutputStream;
18 use std::process::Stdio;
19 use tokio::io::AsyncRead;
20
21 pub trait Sealed<Stream>
22 where
23 Stream: OutputStream,
24 {
25 fn child_stdio(&self) -> Stdio;
30
31 fn into_stream<S>(self, captured: Option<S>, stream_name: &'static str) -> Stream
35 where
36 S: AsyncRead + Unpin + Send + 'static;
37 }
38}
39
40pub trait ProcessStreamConfig<Stream>: process_stream_config::Sealed<Stream>
45where
46 Stream: OutputStream,
47{
48}
49
50impl<Config, Stream> ProcessStreamConfig<Stream> for Config
51where
52 Config: process_stream_config::Sealed<Stream>,
53 Stream: OutputStream,
54{
55}
56
57impl<D, R> process_stream_config::Sealed<BroadcastOutputStream<D, R>>
58 for PipedStreamConfig<BroadcastBackend, StreamConfigReadyBuilder<D, R>>
59where
60 D: Delivery,
61 R: Replay,
62{
63 fn child_stdio(&self) -> Stdio {
64 Stdio::piped()
65 }
66
67 fn into_stream<S>(
68 self,
69 captured: Option<S>,
70 stream_name: &'static str,
71 ) -> BroadcastOutputStream<D, R>
72 where
73 S: AsyncRead + Unpin + Send + 'static,
74 {
75 let stream = captured.expect(
76 "broadcast backend requires a captured pipe; child_stdio() promised Stdio::piped()",
77 );
78 BroadcastOutputStream::from_stream(stream, stream_name, self.stage.build())
79 }
80}
81
82impl<D, R> process_stream_config::Sealed<SingleSubscriberOutputStream<D, R>>
83 for PipedStreamConfig<SingleSubscriberBackend, StreamConfigReadyBuilder<D, R>>
84where
85 D: Delivery,
86 R: Replay,
87{
88 fn child_stdio(&self) -> Stdio {
89 Stdio::piped()
90 }
91
92 fn into_stream<S>(
93 self,
94 captured: Option<S>,
95 stream_name: &'static str,
96 ) -> SingleSubscriberOutputStream<D, R>
97 where
98 S: AsyncRead + Unpin + Send + 'static,
99 {
100 let stream = captured.expect(
101 "single-subscriber backend requires a captured pipe; child_stdio() promised \
102 Stdio::piped()",
103 );
104 SingleSubscriberOutputStream::from_stream(stream, stream_name, self.stage.build())
105 }
106}
107
108impl process_stream_config::Sealed<DiscardedOutputStream> for DiscardedStreamConfig {
109 fn child_stdio(&self) -> Stdio {
110 Stdio::null()
111 }
112
113 fn into_stream<S>(
114 self,
115 _captured: Option<S>,
116 stream_name: &'static str,
117 ) -> DiscardedOutputStream
118 where
119 S: AsyncRead + Unpin + Send + 'static,
120 {
121 DiscardedOutputStream::new(stream_name)
122 }
123}
124
/// Entry point for choosing how one output stream of a child process is handled.
///
/// Pick a mode with `broadcast`, `single_subscriber` or `discard`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ProcessStreamBuilder;
133
/// Type-level marker that selects the broadcast backend in `PipedStreamConfig`.
///
/// Carries no data; it only appears as a type parameter.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BroadcastBackend;
137
/// Type-level marker that selects the single-subscriber backend in `PipedStreamConfig`.
///
/// Carries no data; it only appears as a type parameter.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SingleSubscriberBackend;
141
/// Stream configuration that discards the child's output.
///
/// Returned by `ProcessStreamBuilder::discard`. Its sealed implementation wires the child's
/// stream to `Stdio::null()` and yields a `DiscardedOutputStream`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DiscardedStreamConfig;
147
/// Typed builder wrapper for a piped (captured) stream.
///
/// `Backend` is a marker type naming the stream backend. `Stage` is the wrapped
/// `StreamConfig` builder stage, which decides which builder methods are currently available.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PipedStreamConfig<Backend, Stage> {
    /// The wrapped builder stage that every builder method forwards to.
    stage: Stage,
    /// Records the chosen backend at the type level without storing a value.
    _backend: PhantomData<Backend>,
}
154
155impl<Backend, Stage> PipedStreamConfig<Backend, Stage> {
156 fn new(stage: Stage) -> Self {
157 Self {
158 stage,
159 _backend: PhantomData,
160 }
161 }
162}
163
164impl<Backend> PipedStreamConfig<Backend, StreamConfigBuilder> {
165 #[must_use]
168 pub fn lossy_without_backpressure(
169 self,
170 ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<LossyWithoutBackpressure>> {
171 PipedStreamConfig::new(self.stage.lossy_without_backpressure())
172 }
173
174 #[must_use]
178 pub fn reliable_with_backpressure(
179 self,
180 ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<ReliableWithBackpressure>> {
181 PipedStreamConfig::new(self.stage.reliable_with_backpressure())
182 }
183}
184
185impl<Backend, D> PipedStreamConfig<Backend, StreamConfigReplayBuilder<D>>
186where
187 D: Delivery,
188{
189 #[must_use]
191 pub fn no_replay(
192 self,
193 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, crate::NoReplay>> {
194 PipedStreamConfig::new(self.stage.no_replay())
195 }
196
197 #[must_use]
199 pub fn replay_last_chunks(
200 self,
201 chunks: usize,
202 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
203 PipedStreamConfig::new(self.stage.replay_last_chunks(chunks))
204 }
205
206 #[must_use]
208 pub fn replay_last_bytes(
209 self,
210 bytes: crate::NumBytes,
211 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
212 PipedStreamConfig::new(self.stage.replay_last_bytes(bytes))
213 }
214
215 #[must_use]
217 pub fn replay_all(
218 self,
219 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
220 PipedStreamConfig::new(self.stage.replay_all())
221 }
222}
223
/// Third builder step: choose the read chunk size.
impl<Backend, D: Delivery, R: Replay>
    PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, R>>
{
    /// Sets the read chunk size and moves on to the max-buffered-chunks step.
    ///
    /// Forwards to `StreamConfigReadChunkSizeBuilder::read_chunk_size`.
    ///
    /// NOTE(review): the tests in this file show that a zero size makes the builder chain
    /// panic with "read_chunk_size must be greater than zero bytes". Whether the check runs
    /// in this call or in the following step is not visible from this file.
    #[must_use]
    pub fn read_chunk_size(
        self,
        read_chunk_size: crate::NumBytes,
    ) -> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>> {
        let Self { stage, .. } = self;
        PipedStreamConfig::new(stage.read_chunk_size(read_chunk_size))
    }
}
238
239impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>>
240where
241 D: Delivery,
242 R: Replay,
243{
244 #[must_use]
246 pub fn max_buffered_chunks(
247 self,
248 max_buffered_chunks: usize,
249 ) -> PipedStreamConfig<Backend, StreamConfigReadyBuilder<D, R>> {
250 PipedStreamConfig::new(self.stage.max_buffered_chunks(max_buffered_chunks))
251 }
252}
253
254impl ProcessStreamBuilder {
255 #[must_use]
260 pub fn broadcast(self) -> PipedStreamConfig<BroadcastBackend, StreamConfigBuilder> {
261 PipedStreamConfig::new(StreamConfig::builder())
262 }
263
264 #[must_use]
270 pub fn single_subscriber(
271 self,
272 ) -> PipedStreamConfig<SingleSubscriberBackend, StreamConfigBuilder> {
273 PipedStreamConfig::new(StreamConfig::builder())
274 }
275
276 #[must_use]
282 pub fn discard(self) -> DiscardedStreamConfig {
283 DiscardedStreamConfig
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NumBytes, NumBytesExt};
    use assertr::prelude::*;

    /// Validation of the read chunk size passed through the builder chain.
    mod read_chunk_size {
        use super::*;

        /// A zero-byte read chunk size must abort the builder chain with a descriptive panic.
        #[test]
        fn panics_on_zero_value() {
            let build_with_zero_chunk_size = || {
                let _rejected = ProcessStreamBuilder
                    .single_subscriber()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(NumBytes::zero())
                    .max_buffered_chunks(1);
            };

            assert_that_panic_by(build_with_zero_chunk_size)
                .has_type::<String>()
                .is_equal_to("read_chunk_size must be greater than zero bytes");
        }
    }

    /// Validation of the buffered-chunk limit, checked once per piped backend.
    mod max_buffered_chunks {
        use super::*;

        /// The single-subscriber backend rejects a limit of zero buffered chunks.
        #[test]
        fn panics_on_zero_for_single_subscriber() {
            let build_with_zero_limit = || {
                let _rejected = ProcessStreamBuilder
                    .single_subscriber()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_limit)
                .has_type::<String>()
                .is_equal_to("max_buffered_chunks must be greater than zero");
        }

        /// The broadcast backend rejects a limit of zero buffered chunks.
        #[test]
        fn panics_on_zero_for_broadcast() {
            let build_with_zero_limit = || {
                let _rejected = ProcessStreamBuilder
                    .broadcast()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_limit)
                .has_type::<String>()
                .is_equal_to("max_buffered_chunks must be greater than zero");
        }
    }
}