tokio_process_tools/process/
stream_config.rs1use crate::output_stream::OutputStream;
2use crate::output_stream::backend::broadcast::BroadcastOutputStream;
3use crate::output_stream::backend::discard::DiscardedOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::config::{
6 StreamConfig, StreamConfigBuilder, StreamConfigMaxBufferedChunksBuilder,
7 StreamConfigReadChunkSizeBuilder, StreamConfigReadyBuilder, StreamConfigReplayBuilder,
8};
9use crate::output_stream::policy::{
10 BestEffortDelivery, Delivery, ReliableDelivery, Replay, ReplayEnabled,
11};
12use std::marker::PhantomData;
13use std::process::Stdio;
14use tokio::io::AsyncRead;
15
16mod process_stream_config {
17 use super::OutputStream;
18 use std::process::Stdio;
19 use tokio::io::AsyncRead;
20
21 pub trait Sealed<Stream>
22 where
23 Stream: OutputStream,
24 {
25 fn child_stdio(&self) -> Stdio;
30
31 fn into_stream<S>(self, captured: Option<S>, stream_name: &'static str) -> Stream
35 where
36 S: AsyncRead + Unpin + Send + 'static;
37 }
38}
39
40pub trait ProcessStreamConfig<Stream>: process_stream_config::Sealed<Stream>
45where
46 Stream: OutputStream,
47{
48}
49
50impl<Config, Stream> ProcessStreamConfig<Stream> for Config
51where
52 Config: process_stream_config::Sealed<Stream>,
53 Stream: OutputStream,
54{
55}
56
57impl<D, R> process_stream_config::Sealed<BroadcastOutputStream<D, R>>
58 for PipedStreamConfig<BroadcastBackend, StreamConfigReadyBuilder<D, R>>
59where
60 D: Delivery,
61 R: Replay,
62{
63 fn child_stdio(&self) -> Stdio {
64 Stdio::piped()
65 }
66
67 fn into_stream<S>(
68 self,
69 captured: Option<S>,
70 stream_name: &'static str,
71 ) -> BroadcastOutputStream<D, R>
72 where
73 S: AsyncRead + Unpin + Send + 'static,
74 {
75 let stream = captured.expect(
76 "broadcast backend requires a captured pipe; child_stdio() promised Stdio::piped()",
77 );
78 BroadcastOutputStream::from_stream(stream, stream_name, self.stage.build())
79 }
80}
81
82impl<D, R> process_stream_config::Sealed<SingleSubscriberOutputStream<D, R>>
83 for PipedStreamConfig<SingleSubscriberBackend, StreamConfigReadyBuilder<D, R>>
84where
85 D: Delivery,
86 R: Replay,
87{
88 fn child_stdio(&self) -> Stdio {
89 Stdio::piped()
90 }
91
92 fn into_stream<S>(
93 self,
94 captured: Option<S>,
95 stream_name: &'static str,
96 ) -> SingleSubscriberOutputStream<D, R>
97 where
98 S: AsyncRead + Unpin + Send + 'static,
99 {
100 let stream = captured.expect(
101 "single-subscriber backend requires a captured pipe; child_stdio() promised \
102 Stdio::piped()",
103 );
104 SingleSubscriberOutputStream::from_stream(stream, stream_name, self.stage.build())
105 }
106}
107
108impl process_stream_config::Sealed<DiscardedOutputStream> for DiscardedStreamConfig {
109 fn child_stdio(&self) -> Stdio {
110 Stdio::null()
111 }
112
113 fn into_stream<S>(
114 self,
115 _captured: Option<S>,
116 stream_name: &'static str,
117 ) -> DiscardedOutputStream
118 where
119 S: AsyncRead + Unpin + Send + 'static,
120 {
121 DiscardedOutputStream::new(stream_name)
122 }
123}
124
/// Entry point for configuring a process stream.
///
/// Its methods pick the backend: `broadcast`, `single_subscriber` or `discard`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProcessStreamBuilder;
133
/// Type-level marker used as the `Backend` parameter of `PipedStreamConfig` to select the
/// broadcast output stream.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BroadcastBackend;
137
/// Type-level marker used as the `Backend` parameter of `PipedStreamConfig` to select the
/// single-subscriber output stream.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SingleSubscriberBackend;
141
/// Stream configuration that discards the child's output.
///
/// Its `Sealed` implementation requests `Stdio::null()` for the child and yields a
/// `DiscardedOutputStream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DiscardedStreamConfig;
147
/// Staged configuration for a piped stream, tagged with the backend it will build.
///
/// `Stage` is the current builder stage being wrapped; `Backend` is a zero-sized marker
/// (`BroadcastBackend` or `SingleSubscriberBackend`) that exists only at the type level.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PipedStreamConfig<Backend, Stage> {
    /// The wrapped builder stage that each step forwards to.
    stage: Stage,
    /// Carries the `Backend` type parameter without storing a value of it.
    _backend: PhantomData<Backend>,
}
154
155impl<Backend, Stage> PipedStreamConfig<Backend, Stage> {
156 fn new(stage: Stage) -> Self {
157 Self {
158 stage,
159 _backend: PhantomData,
160 }
161 }
162}
163
164impl<Backend> PipedStreamConfig<Backend, StreamConfigBuilder> {
165 #[must_use]
167 pub fn best_effort_delivery(
168 self,
169 ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<BestEffortDelivery>> {
170 PipedStreamConfig::new(self.stage.best_effort_delivery())
171 }
172
173 #[must_use]
175 pub fn reliable_for_active_subscribers(
176 self,
177 ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<ReliableDelivery>> {
178 PipedStreamConfig::new(self.stage.reliable_for_active_subscribers())
179 }
180}
181
182impl<Backend, D> PipedStreamConfig<Backend, StreamConfigReplayBuilder<D>>
183where
184 D: Delivery,
185{
186 #[must_use]
188 pub fn no_replay(
189 self,
190 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, crate::NoReplay>> {
191 PipedStreamConfig::new(self.stage.no_replay())
192 }
193
194 #[must_use]
196 pub fn replay_last_chunks(
197 self,
198 chunks: usize,
199 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
200 PipedStreamConfig::new(self.stage.replay_last_chunks(chunks))
201 }
202
203 #[must_use]
205 pub fn replay_last_bytes(
206 self,
207 bytes: crate::NumBytes,
208 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
209 PipedStreamConfig::new(self.stage.replay_last_bytes(bytes))
210 }
211
212 #[must_use]
214 pub fn replay_all(
215 self,
216 ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
217 PipedStreamConfig::new(self.stage.replay_all())
218 }
219}
220
221impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, R>>
222where
223 D: Delivery,
224 R: Replay,
225{
226 #[must_use]
228 pub fn read_chunk_size(
229 self,
230 read_chunk_size: crate::NumBytes,
231 ) -> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>> {
232 PipedStreamConfig::new(self.stage.read_chunk_size(read_chunk_size))
233 }
234}
235
236impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>>
237where
238 D: Delivery,
239 R: Replay,
240{
241 #[must_use]
243 pub fn max_buffered_chunks(
244 self,
245 max_buffered_chunks: usize,
246 ) -> PipedStreamConfig<Backend, StreamConfigReadyBuilder<D, R>> {
247 PipedStreamConfig::new(self.stage.max_buffered_chunks(max_buffered_chunks))
248 }
249}
250
251impl ProcessStreamBuilder {
252 #[must_use]
257 pub fn broadcast(self) -> PipedStreamConfig<BroadcastBackend, StreamConfigBuilder> {
258 PipedStreamConfig::new(StreamConfig::builder())
259 }
260
261 #[must_use]
267 pub fn single_subscriber(
268 self,
269 ) -> PipedStreamConfig<SingleSubscriberBackend, StreamConfigBuilder> {
270 PipedStreamConfig::new(StreamConfig::builder())
271 }
272
273 #[must_use]
279 pub fn discard(self) -> DiscardedStreamConfig {
280 DiscardedStreamConfig
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NumBytes, NumBytesExt};
    use assertr::prelude::*;

    // Each test drives the public builder chain with one invalid value and pins the exact
    // panic message (a `String` payload) that results.

    mod read_chunk_size {
        use super::*;

        #[test]
        fn panics_on_zero_value() {
            assert_that_panic_by(|| {
                let _config = ProcessStreamBuilder
                    .single_subscriber()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(NumBytes::zero())
                    .max_buffered_chunks(1);
            })
            .has_type::<String>()
            .is_equal_to("read_chunk_size must be greater than zero bytes");
        }
    }

    mod max_buffered_chunks {
        use super::*;

        #[test]
        fn panics_on_zero_for_single_subscriber() {
            assert_that_panic_by(|| {
                let _config = ProcessStreamBuilder
                    .single_subscriber()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            })
            .has_type::<String>()
            .is_equal_to("max_buffered_chunks must be greater than zero");
        }

        #[test]
        fn panics_on_zero_for_broadcast() {
            assert_that_panic_by(|| {
                let _config = ProcessStreamBuilder
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            })
            .has_type::<String>()
            .is_equal_to("max_buffered_chunks must be greater than zero");
        }
    }
}