Skip to main content

tokio_process_tools/output_stream/
policy.rs

1use crate::output_stream::num_bytes::NumBytes;
2
/// Private supertraits that keep the public marker traits closed to outside implementors.
mod sealed {
    /// Sealing supertrait required by replay marker types.
    pub trait ReplaySealed {}

    /// Sealing supertrait required by delivery marker types.
    pub trait DeliverySealed {}
}
8
9/// Marker trait implemented by supported stream delivery marker types.
10pub trait Delivery:
11    sealed::DeliverySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
12{
13    /// Returns the runtime delivery guarantee represented by this marker.
14    fn guarantee(self) -> DeliveryGuarantee;
15}
16
/// Marker type selecting best-effort stream delivery.
///
/// When bounded buffers overflow, active consumers that are too slow may see gaps or lose output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BestEffortDelivery;
22
23impl sealed::DeliverySealed for BestEffortDelivery {}
24
25impl Delivery for BestEffortDelivery {
26    fn guarantee(self) -> DeliveryGuarantee {
27        DeliveryGuarantee::BestEffort
28    }
29}
30
/// Marker type selecting reliable delivery to active subscribers.
///
/// Once an active consumer's buffer is full, that consumer applies backpressure. A consumer
/// attaching later still relies on the replay settings to see output produced earlier.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReliableDelivery;
37
38impl sealed::DeliverySealed for ReliableDelivery {}
39
40impl Delivery for ReliableDelivery {
41    fn guarantee(self) -> DeliveryGuarantee {
42        DeliveryGuarantee::ReliableForActiveSubscribers
43    }
44}
45
46/// Marker trait implemented by supported stream replay marker types.
47pub trait Replay:
48    sealed::ReplaySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
49{
50    /// Returns the replay retention represented by this marker.
51    fn replay_retention(self) -> Option<ReplayRetention>;
52
53    /// Returns whether replay-specific APIs are enabled for this marker.
54    fn replay_enabled(self) -> bool;
55}
56
/// Marker type for streams that do not support replay.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NoReplay;
60
61impl sealed::ReplaySealed for NoReplay {}
62
63impl Replay for NoReplay {
64    fn replay_retention(self) -> Option<ReplayRetention> {
65        None
66    }
67
68    fn replay_enabled(self) -> bool {
69        false
70    }
71}
72
73/// Marker for streams with replay support.
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub struct ReplayEnabled {
76    /// Replay history retained for subscribers created after output has already arrived.
77    pub replay_retention: ReplayRetention,
78}
79
80impl ReplayEnabled {
81    /// Creates a replay-enabled marker with the given retention.
82    #[must_use]
83    pub fn new(replay_retention: ReplayRetention) -> Self {
84        replay_retention.assert_non_zero("replay_retention");
85        Self { replay_retention }
86    }
87}
88
89impl sealed::ReplaySealed for ReplayEnabled {}
90
91impl Replay for ReplayEnabled {
92    fn replay_retention(self) -> Option<ReplayRetention> {
93        Some(self.replay_retention)
94    }
95
96    fn replay_enabled(self) -> bool {
97        true
98    }
99}
100
/// Delivery behavior that typed stream modes resolve to at runtime.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeliveryGuarantee {
    /// Never stop reading output; when bounded buffers overflow, slow consumers see gaps or
    /// lose output.
    BestEffort,

    /// When bounded buffers are full, wait for active consumers before reading more output.
    ReliableForActiveSubscribers,
}
111
112/// Replay history retained by replay-enabled streams.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum ReplayRetention {
115    /// Keep the latest number of chunks for future subscribers.
116    LastChunks(usize),
117
118    /// Keep whole chunks covering at least the latest number of bytes.
119    ///
120    /// Trimming happens at chunk boundaries: chunks are removed from the front of the retained
121    /// log only while doing so leaves the remaining size at or above the requested limit. Chunks
122    /// are never split, so the actual retained size is "rounded up to whole chunks." A single
123    /// chunk that is itself larger than the configured limit is retained in full until a newer
124    /// chunk arrives that can replace it; the limit is therefore a soft floor on retention, not
125    /// a hard upper bound on memory.
126    ///
127    /// To make the practical bound predictable, pair this with a `read_chunk_size` smaller than
128    /// the retention limit so a single chunk cannot exceed the configured budget.
129    LastBytes(NumBytes),
130
131    /// Keep all output for the stream lifetime.
132    ///
133    /// This can retain unbounded memory. Use it only when the child process and its output volume
134    /// are trusted.
135    All,
136}
137
138impl ReplayRetention {
139    pub(crate) fn assert_non_zero(self, parameter_name: &str) {
140        match self {
141            ReplayRetention::LastChunks(0) => {
142                panic!("{parameter_name} must retain at least one chunk");
143            }
144            ReplayRetention::LastBytes(bytes) if bytes.bytes() == 0 => {
145                panic!("{parameter_name} must retain at least one byte");
146            }
147            ReplayRetention::LastChunks(_)
148            | ReplayRetention::LastBytes(_)
149            | ReplayRetention::All => {}
150        }
151    }
152}