tokio_process_tools/output_stream/
config.rs1use crate::NumBytes;
2use crate::output_stream::policy::{
3 BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, ReliableDelivery, Replay,
4 ReplayEnabled, ReplayRetention,
5};
6
7pub const DEFAULT_READ_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); pub const DEFAULT_MAX_BUFFERED_CHUNKS: usize = 128;
12
/// Validates that a buffered-chunk count is usable.
///
/// `parameter_name` is interpolated into the panic message so the caller can
/// tell which argument was rejected.
///
/// # Panics
///
/// Panics with `"{parameter_name} must be greater than zero"` when `chunks` is `0`.
pub(crate) fn assert_max_buffered_chunks_non_zero(chunks: usize, parameter_name: &str) {
    // `usize` is unsigned, so "not zero" and "greater than zero" are the same check.
    assert!(chunks != 0, "{parameter_name} must be greater than zero");
}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub struct StreamConfig<D = BestEffortDelivery, R = NoReplay>
24where
25 D: Delivery,
26 R: Replay,
27{
28 pub read_chunk_size: NumBytes,
32
33 pub max_buffered_chunks: usize,
39
40 pub delivery: D,
42
43 pub replay: R,
45}
46
47impl StreamConfig<BestEffortDelivery, NoReplay> {
48 #[must_use]
50 pub fn builder() -> StreamConfigBuilder {
51 StreamConfigBuilder
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct StreamConfigBuilder;
58
59impl StreamConfigBuilder {
60 #[must_use]
66 pub fn best_effort_delivery(self) -> StreamConfigReplayBuilder<BestEffortDelivery> {
67 StreamConfigReplayBuilder {
68 delivery: BestEffortDelivery,
69 }
70 }
71
72 #[must_use]
78 pub fn reliable_for_active_subscribers(self) -> StreamConfigReplayBuilder<ReliableDelivery> {
79 StreamConfigReplayBuilder {
80 delivery: ReliableDelivery,
81 }
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct StreamConfigReplayBuilder<D>
88where
89 D: Delivery,
90{
91 delivery: D,
92}
93
94impl<D> StreamConfigReplayBuilder<D>
95where
96 D: Delivery,
97{
98 #[must_use]
102 pub fn no_replay(self) -> StreamConfigReadChunkSizeBuilder<D, NoReplay> {
103 StreamConfigReadChunkSizeBuilder {
104 delivery: self.delivery,
105 replay: NoReplay,
106 }
107 }
108
109 #[must_use]
111 pub fn replay_last_chunks(
112 self,
113 chunks: usize,
114 ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
115 let replay_retention = ReplayRetention::LastChunks(chunks);
116 replay_retention.assert_non_zero("chunks");
117 StreamConfigReadChunkSizeBuilder {
118 delivery: self.delivery,
119 replay: ReplayEnabled::new(replay_retention),
120 }
121 }
122
123 #[must_use]
125 pub fn replay_last_bytes(
126 self,
127 bytes: NumBytes,
128 ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
129 let replay_retention = ReplayRetention::LastBytes(bytes);
130 replay_retention.assert_non_zero("bytes");
131 StreamConfigReadChunkSizeBuilder {
132 delivery: self.delivery,
133 replay: ReplayEnabled::new(replay_retention),
134 }
135 }
136
137 #[must_use]
144 pub fn replay_all(self) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
145 StreamConfigReadChunkSizeBuilder {
146 delivery: self.delivery,
147 replay: ReplayEnabled::new(ReplayRetention::All),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub struct StreamConfigReadChunkSizeBuilder<D, R>
155where
156 D: Delivery,
157 R: Replay,
158{
159 delivery: D,
160 replay: R,
161}
162
163impl<D, R> StreamConfigReadChunkSizeBuilder<D, R>
164where
165 D: Delivery,
166 R: Replay,
167{
168 #[must_use]
174 pub fn read_chunk_size(
175 self,
176 read_chunk_size: NumBytes,
177 ) -> StreamConfigMaxBufferedChunksBuilder<D, R> {
178 read_chunk_size.assert_non_zero("read_chunk_size");
179 StreamConfigMaxBufferedChunksBuilder {
180 delivery: self.delivery,
181 replay: self.replay,
182 read_chunk_size,
183 }
184 }
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub struct StreamConfigMaxBufferedChunksBuilder<D, R>
190where
191 D: Delivery,
192 R: Replay,
193{
194 delivery: D,
195 replay: R,
196 read_chunk_size: NumBytes,
197}
198
199impl<D, R> StreamConfigMaxBufferedChunksBuilder<D, R>
200where
201 D: Delivery,
202 R: Replay,
203{
204 #[must_use]
210 pub fn max_buffered_chunks(self, max_buffered_chunks: usize) -> StreamConfigReadyBuilder<D, R> {
211 assert_max_buffered_chunks_non_zero(max_buffered_chunks, "max_buffered_chunks");
212 StreamConfigReadyBuilder {
213 config: StreamConfig {
214 read_chunk_size: self.read_chunk_size,
215 max_buffered_chunks,
216 delivery: self.delivery,
217 replay: self.replay,
218 },
219 }
220 }
221}
222
223#[derive(Debug, Clone, Copy, PartialEq, Eq)]
225pub struct StreamConfigReadyBuilder<D, R>
226where
227 D: Delivery,
228 R: Replay,
229{
230 config: StreamConfig<D, R>,
231}
232
233impl<D, R> StreamConfigReadyBuilder<D, R>
234where
235 D: Delivery,
236 R: Replay,
237{
238 #[must_use]
240 pub fn build(self) -> StreamConfig<D, R> {
241 self.config
242 }
243}
244
245impl<D, R> StreamConfig<D, R>
246where
247 D: Delivery,
248 R: Replay,
249{
250 #[must_use]
252 pub fn delivery_guarantee(self) -> DeliveryGuarantee {
253 self.delivery.guarantee()
254 }
255
256 #[must_use]
258 pub fn replay_retention(self) -> Option<ReplayRetention> {
259 self.replay.replay_retention()
260 }
261
262 #[must_use]
264 pub fn replay_enabled(self) -> bool {
265 self.replay.replay_enabled()
266 }
267
268 pub(crate) fn assert_valid(self, parameter_name: &str) {
269 self.read_chunk_size
270 .assert_non_zero(&format!("{parameter_name}.read_chunk_size"));
271 assert_max_buffered_chunks_non_zero(
272 self.max_buffered_chunks,
273 &format!("{parameter_name}.max_buffered_chunks"),
274 );
275 if let Some(replay_retention) = self.replay_retention() {
276 replay_retention.assert_non_zero(&format!("{parameter_name}.replay_retention"));
277 }
278 }
279}
280
281impl<D> StreamConfig<D, ReplayEnabled>
282where
283 D: Delivery,
284{
285 #[must_use]
287 pub fn with_replay_retention(mut self, replay_retention: ReplayRetention) -> Self {
288 replay_retention.assert_non_zero("replay_retention");
289 self.replay.replay_retention = replay_retention;
290 self
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output_stream::num_bytes::NumBytesExt;
    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
    use assertr::prelude::*;

    /// Walks the builder through each delivery/replay pairing and checks what the
    /// resulting config reports. The explicit type annotations double as compile-time
    /// checks of the produced `StreamConfig` instantiation.
    #[test]
    fn builder_creates_expected_delivery_and_replay_configs() {
        // Best-effort delivery, replay disabled.
        let best_effort: StreamConfig<BestEffortDelivery, NoReplay> = StreamConfig::builder()
            .best_effort_delivery()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(best_effort.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
        assert_that!(best_effort.replay_enabled()).is_false();
        assert_that!(best_effort.replay_retention()).is_none();
        assert_that!(best_effort.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(best_effort.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replay disabled.
        let reliable: StreamConfig<ReliableDelivery, NoReplay> = StreamConfig::builder()
            .reliable_for_active_subscribers()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(reliable.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
        assert_that!(reliable.replay_enabled()).is_false();
        assert_that!(reliable.replay_retention()).is_none();
        assert_that!(reliable.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(reliable.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Best-effort delivery, replaying the last two chunks.
        let by_chunks: StreamConfig<BestEffortDelivery, ReplayEnabled> = StreamConfig::builder()
            .best_effort_delivery()
            .replay_last_chunks(2)
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(by_chunks.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
        assert_that!(by_chunks.replay_enabled()).is_true();
        assert_that!(by_chunks.replay_retention())
            .is_equal_to(Some(ReplayRetention::LastChunks(2)));
        assert_that!(by_chunks.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(by_chunks.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replaying the last sixteen bytes.
        let by_bytes: StreamConfig<ReliableDelivery, ReplayEnabled> = StreamConfig::builder()
            .reliable_for_active_subscribers()
            .replay_last_bytes(16.bytes())
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(by_bytes.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
        assert_that!(by_bytes.replay_enabled()).is_true();
        assert_that!(by_bytes.replay_retention())
            .is_equal_to(Some(ReplayRetention::LastBytes(16.bytes())));
        assert_that!(by_bytes.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(by_bytes.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
    }

    /// Every validation point must panic with a message naming the rejected parameter.
    #[test]
    fn invalid_configs_panic_with_parameter_names() {
        // Zero read chunk size is rejected by the builder stage that accepts it.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(0.bytes());
        })
        .has_type::<String>()
        .is_equal_to("read_chunk_size must be greater than zero bytes");

        // Zero buffered chunks is rejected by the builder stage that accepts it.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("max_buffered_chunks must be greater than zero");

        // Zero-sized chunk retention names the builder method's own parameter.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .best_effort_delivery()
                .replay_last_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("chunks must retain at least one chunk");

        // Zero-sized byte retention names the builder method's own parameter.
        assert_that_panic_by(|| {
            let _builder = StreamConfig::builder()
                .best_effort_delivery()
                .replay_last_bytes(NumBytes::zero());
        })
        .has_type::<String>()
        .is_equal_to("bytes must retain at least one byte");

        // Constructing the replay policy directly validates as well.
        assert_that_panic_by(|| {
            let _policy = ReplayEnabled::new(ReplayRetention::LastChunks(0));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one chunk");

        // Replacing the retention on a finished config validates the new value.
        assert_that_panic_by(|| {
            let valid = StreamConfig::builder()
                .best_effort_delivery()
                .replay_all()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(2)
                .build();

            let _updated =
                valid.with_replay_retention(ReplayRetention::LastBytes(NumBytes::zero()));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one byte");

        // A struct literal bypasses the builder; `assert_valid` must still catch it.
        assert_that_panic_by(|| {
            let hand_built = StreamConfig {
                read_chunk_size: 8.bytes(),
                max_buffered_chunks: 2,
                delivery: BestEffortDelivery,
                replay: ReplayEnabled {
                    replay_retention: ReplayRetention::LastBytes(NumBytes::zero()),
                },
            };

            hand_built.assert_valid("options");
        })
        .has_type::<String>()
        .is_equal_to("options.replay_retention must retain at least one byte");
    }

    /// The same config value is accepted by both stream backends.
    #[tokio::test]
    async fn one_config_constructs_both_stream_backends() {
        use crate::OutputStream;
        use crate::output_stream::backend::broadcast::BroadcastOutputStream;
        use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;

        let shared = StreamConfig::builder()
            .best_effort_delivery()
            .no_replay()
            .read_chunk_size(8.bytes())
            .max_buffered_chunks(2)
            .build();

        // `StreamConfig` is `Copy`, so the same value can be passed to both constructors.
        let broadcast = BroadcastOutputStream::from_stream(tokio::io::empty(), "stdout", shared);
        let single_subscriber =
            SingleSubscriberOutputStream::from_stream(tokio::io::empty(), "stderr", shared);

        assert_that!(broadcast.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(single_subscriber.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(broadcast.max_buffered_chunks()).is_equal_to(2);
        assert_that!(single_subscriber.max_buffered_chunks()).is_equal_to(2);
    }
}