tokio_process_tools/output_stream/
config.rs1use crate::NumBytes;
2use crate::output_stream::policy::{
3 Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, ReliableWithBackpressure,
4 Replay, ReplayEnabled, ReplayRetention,
5};
6
7pub const DEFAULT_READ_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); pub const DEFAULT_MAX_BUFFERED_CHUNKS: usize = 128;
12
/// Validates that a buffered-chunk count is non-zero.
///
/// `parameter_name` is interpolated into the panic message so the caller can identify which
/// argument was rejected.
///
/// # Panics
///
/// Panics with `"{parameter_name} must be greater than zero"` when `chunks` is `0`.
pub(crate) fn assert_max_buffered_chunks_non_zero(chunks: usize, parameter_name: &str) {
    // For `usize`, "not zero" and "greater than zero" are the same condition.
    assert!(chunks != 0, "{parameter_name} must be greater than zero");
}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub struct StreamConfig<D = LossyWithoutBackpressure, R = NoReplay>
24where
25 D: Delivery,
26 R: Replay,
27{
28 pub read_chunk_size: NumBytes,
32
33 pub max_buffered_chunks: usize,
39
40 pub delivery: D,
42
43 pub replay: R,
45}
46
47impl StreamConfig<LossyWithoutBackpressure, NoReplay> {
48 #[must_use]
50 pub fn builder() -> StreamConfigBuilder {
51 StreamConfigBuilder
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct StreamConfigBuilder;
58
59impl StreamConfigBuilder {
60 #[must_use]
70 pub fn lossy_without_backpressure(self) -> StreamConfigReplayBuilder<LossyWithoutBackpressure> {
71 StreamConfigReplayBuilder {
72 delivery: LossyWithoutBackpressure,
73 }
74 }
75
76 #[must_use]
86 pub fn reliable_with_backpressure(self) -> StreamConfigReplayBuilder<ReliableWithBackpressure> {
87 StreamConfigReplayBuilder {
88 delivery: ReliableWithBackpressure,
89 }
90 }
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95pub struct StreamConfigReplayBuilder<D>
96where
97 D: Delivery,
98{
99 delivery: D,
100}
101
102impl<D> StreamConfigReplayBuilder<D>
103where
104 D: Delivery,
105{
106 #[must_use]
110 pub fn no_replay(self) -> StreamConfigReadChunkSizeBuilder<D, NoReplay> {
111 StreamConfigReadChunkSizeBuilder {
112 delivery: self.delivery,
113 replay: NoReplay,
114 }
115 }
116
117 #[must_use]
119 pub fn replay_last_chunks(
120 self,
121 chunks: usize,
122 ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
123 let replay_retention = ReplayRetention::LastChunks(chunks);
124 replay_retention.assert_non_zero("chunks");
125 StreamConfigReadChunkSizeBuilder {
126 delivery: self.delivery,
127 replay: ReplayEnabled::new(replay_retention),
128 }
129 }
130
131 #[must_use]
133 pub fn replay_last_bytes(
134 self,
135 bytes: NumBytes,
136 ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
137 let replay_retention = ReplayRetention::LastBytes(bytes);
138 replay_retention.assert_non_zero("bytes");
139 StreamConfigReadChunkSizeBuilder {
140 delivery: self.delivery,
141 replay: ReplayEnabled::new(replay_retention),
142 }
143 }
144
145 #[must_use]
154 pub fn replay_all(self) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
155 StreamConfigReadChunkSizeBuilder {
156 delivery: self.delivery,
157 replay: ReplayEnabled::new(ReplayRetention::All),
158 }
159 }
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
164pub struct StreamConfigReadChunkSizeBuilder<D, R>
165where
166 D: Delivery,
167 R: Replay,
168{
169 delivery: D,
170 replay: R,
171}
172
173impl<D, R> StreamConfigReadChunkSizeBuilder<D, R>
174where
175 D: Delivery,
176 R: Replay,
177{
178 #[must_use]
184 pub fn read_chunk_size(
185 self,
186 read_chunk_size: NumBytes,
187 ) -> StreamConfigMaxBufferedChunksBuilder<D, R> {
188 read_chunk_size.assert_non_zero("read_chunk_size");
189 StreamConfigMaxBufferedChunksBuilder {
190 delivery: self.delivery,
191 replay: self.replay,
192 read_chunk_size,
193 }
194 }
195}
196
197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
199pub struct StreamConfigMaxBufferedChunksBuilder<D, R>
200where
201 D: Delivery,
202 R: Replay,
203{
204 delivery: D,
205 replay: R,
206 read_chunk_size: NumBytes,
207}
208
209impl<D, R> StreamConfigMaxBufferedChunksBuilder<D, R>
210where
211 D: Delivery,
212 R: Replay,
213{
214 #[must_use]
220 pub fn max_buffered_chunks(self, max_buffered_chunks: usize) -> StreamConfigReadyBuilder<D, R> {
221 assert_max_buffered_chunks_non_zero(max_buffered_chunks, "max_buffered_chunks");
222 StreamConfigReadyBuilder {
223 config: StreamConfig {
224 read_chunk_size: self.read_chunk_size,
225 max_buffered_chunks,
226 delivery: self.delivery,
227 replay: self.replay,
228 },
229 }
230 }
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub struct StreamConfigReadyBuilder<D, R>
236where
237 D: Delivery,
238 R: Replay,
239{
240 config: StreamConfig<D, R>,
241}
242
243impl<D, R> StreamConfigReadyBuilder<D, R>
244where
245 D: Delivery,
246 R: Replay,
247{
248 #[must_use]
250 pub fn build(self) -> StreamConfig<D, R> {
251 self.config
252 }
253}
254
255impl<D, R> StreamConfig<D, R>
256where
257 D: Delivery,
258 R: Replay,
259{
260 #[must_use]
262 pub fn delivery_guarantee(self) -> DeliveryGuarantee {
263 self.delivery.guarantee()
264 }
265
266 #[must_use]
268 pub fn replay_retention(self) -> Option<ReplayRetention> {
269 self.replay.replay_retention()
270 }
271
272 #[must_use]
274 pub fn replay_enabled(self) -> bool {
275 self.replay.replay_enabled()
276 }
277
278 pub(crate) fn assert_valid(self, parameter_name: &str) {
279 self.read_chunk_size
280 .assert_non_zero(&format!("{parameter_name}.read_chunk_size"));
281 assert_max_buffered_chunks_non_zero(
282 self.max_buffered_chunks,
283 &format!("{parameter_name}.max_buffered_chunks"),
284 );
285 if let Some(replay_retention) = self.replay_retention() {
286 replay_retention.assert_non_zero(&format!("{parameter_name}.replay_retention"));
287 }
288 }
289}
290
291impl<D> StreamConfig<D, ReplayEnabled>
292where
293 D: Delivery,
294{
295 #[must_use]
297 pub fn with_replay_retention(mut self, replay_retention: ReplayRetention) -> Self {
298 replay_retention.assert_non_zero("replay_retention");
299 self.replay.replay_retention = replay_retention;
300 self
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output_stream::num_bytes::NumBytesExt;
    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
    use assertr::prelude::*;

    /// Walks the builder through each delivery/replay combination and checks the resulting
    /// configuration, including its static type.
    #[test]
    fn builder_creates_expected_delivery_and_replay_configs() {
        // Lossy delivery, replay disabled.
        let lossy_no_replay: StreamConfig<LossyWithoutBackpressure, NoReplay> =
            StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        assert_that!(lossy_no_replay.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::LossyWithoutBackpressure);
        assert_that!(lossy_no_replay.replay_enabled()).is_false();
        assert_that!(lossy_no_replay.replay_retention()).is_none();
        assert_that!(lossy_no_replay.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(lossy_no_replay.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replay disabled.
        let reliable_no_replay: StreamConfig<ReliableWithBackpressure, NoReplay> =
            StreamConfig::builder()
                .reliable_with_backpressure()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        assert_that!(reliable_no_replay.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableWithBackpressure);
        assert_that!(reliable_no_replay.replay_enabled()).is_false();
        assert_that!(reliable_no_replay.replay_retention()).is_none();
        assert_that!(reliable_no_replay.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(reliable_no_replay.max_buffered_chunks)
            .is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Lossy delivery, replaying the last two chunks.
        let lossy_replay: StreamConfig<LossyWithoutBackpressure, ReplayEnabled> =
            StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_last_chunks(2)
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        assert_that!(lossy_replay.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::LossyWithoutBackpressure);
        assert_that!(lossy_replay.replay_enabled()).is_true();
        assert_that!(lossy_replay.replay_retention())
            .is_equal_to(Some(ReplayRetention::LastChunks(2)));
        assert_that!(lossy_replay.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(lossy_replay.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replaying the last sixteen bytes.
        let reliable_replay: StreamConfig<ReliableWithBackpressure, ReplayEnabled> =
            StreamConfig::builder()
                .reliable_with_backpressure()
                .replay_last_bytes(16.bytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                .build();

        assert_that!(reliable_replay.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableWithBackpressure);
        assert_that!(reliable_replay.replay_enabled()).is_true();
        assert_that!(reliable_replay.replay_retention())
            .is_equal_to(Some(ReplayRetention::LastBytes(16.bytes())));
        assert_that!(reliable_replay.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(reliable_replay.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
    }

    /// Every rejected setting must panic with a message that names the offending parameter.
    #[test]
    fn invalid_configs_panic_with_parameter_names() {
        // Zero read chunk size is rejected by the read-chunk-size stage.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(0.bytes());
        })
        .has_type::<String>()
        .is_equal_to("read_chunk_size must be greater than zero bytes");

        // Zero buffered chunks is rejected by the max-buffered-chunks stage.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("max_buffered_chunks must be greater than zero");

        // Zero replayed chunks is rejected by the replay stage.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_last_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("chunks must retain at least one chunk");

        // Zero replayed bytes is rejected by the replay stage.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_last_bytes(NumBytes::zero());
        })
        .has_type::<String>()
        .is_equal_to("bytes must retain at least one byte");

        // Constructing the replay policy directly performs its own validation.
        assert_that_panic_by(|| {
            let _replay = ReplayEnabled::new(ReplayRetention::LastChunks(0));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one chunk");

        // Replacing the retention on a valid configuration validates the new value.
        assert_that_panic_by(|| {
            let valid = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_all()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(2)
                .build();

            let _updated =
                valid.with_replay_retention(ReplayRetention::LastBytes(NumBytes::zero()));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one byte");

        // A struct literal bypasses the builder; `assert_valid` must still catch the bad value
        // and prefix the field with the supplied parameter name.
        assert_that_panic_by(|| {
            let hand_built = StreamConfig {
                read_chunk_size: 8.bytes(),
                max_buffered_chunks: 2,
                delivery: LossyWithoutBackpressure,
                replay: ReplayEnabled {
                    replay_retention: ReplayRetention::LastBytes(NumBytes::zero()),
                },
            };

            hand_built.assert_valid("options");
        })
        .has_type::<String>()
        .is_equal_to("options.replay_retention must retain at least one byte");
    }

    /// The same configuration value must be accepted by both stream backends.
    #[tokio::test]
    async fn one_config_constructs_both_stream_backends() {
        use crate::OutputStream;
        use crate::output_stream::backend::broadcast::BroadcastOutputStream;
        use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;

        let shared_config = StreamConfig::builder()
            .lossy_without_backpressure()
            .no_replay()
            .read_chunk_size(8.bytes())
            .max_buffered_chunks(2)
            .build();

        // The config is `Copy`, so the same value can be handed to both constructors.
        let broadcast_stream =
            BroadcastOutputStream::from_stream(tokio::io::empty(), "stdout", shared_config);
        let single_subscriber_stream =
            SingleSubscriberOutputStream::from_stream(tokio::io::empty(), "stderr", shared_config);

        assert_that!(broadcast_stream.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(single_subscriber_stream.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(broadcast_stream.max_buffered_chunks()).is_equal_to(2);
        assert_that!(single_subscriber_stream.max_buffered_chunks()).is_equal_to(2);
    }
}