tokio_process_tools/output_stream/policy.rs
1use crate::output_stream::num_bytes::NumBytes;
2
/// Private supertraits used to seal the public policy marker traits.
///
/// This module is not `pub`, so code outside the crate cannot name these traits and therefore
/// cannot implement the marker traits that list them as supertraits.
mod sealed {
    /// Sealing supertrait required by replay marker types.
    pub trait ReplaySealed {}

    /// Sealing supertrait required by delivery marker types.
    pub trait DeliverySealed {}
}
8
9/// Marker trait implemented by supported stream delivery marker types.
10pub trait Delivery:
11 sealed::DeliverySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
12{
13 /// Returns the runtime delivery guarantee represented by this marker.
14 fn guarantee(self) -> DeliveryGuarantee;
15}
16
/// Delivery marker for lossy streams: consumers never cause backpressure on the child.
///
/// **Mechanism:** However slowly consumers read, the reader task continues to drain the child's
/// pipe. Once a subscriber's buffer is full, that subscriber misses the chunk; the child is not
/// paused.
///
/// **Cost:** Active consumers that fall behind may see gaps or dropped output. Line-aware
/// consumers throw away the partial line that was in progress and resynchronize at the next
/// newline instead of splicing across the gap.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LossyWithoutBackpressure;
28
29impl sealed::DeliverySealed for LossyWithoutBackpressure {}
30
31impl Delivery for LossyWithoutBackpressure {
32 fn guarantee(self) -> DeliveryGuarantee {
33 DeliveryGuarantee::LossyWithoutBackpressure
34 }
35}
36
/// Delivery marker for reliable streams: active subscribers stay gap-free because backpressure
/// is applied to the child.
///
/// **Mechanism:** While an active subscriber's buffer is full, the reader task holds off on
/// reading further from the child's pipe. The kernel pipe fills up as a result, and the child's
/// next write blocks. That stall is the price of reliability.
///
/// **Scope:** Only subscribers that are *currently attached* at the moment a chunk is produced
/// are covered by the guarantee. A subscriber attaching later is not handed earlier chunks by
/// this delivery policy; covering that case is the job of the replay axis.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReliableWithBackpressure;
49
50impl sealed::DeliverySealed for ReliableWithBackpressure {}
51
52impl Delivery for ReliableWithBackpressure {
53 fn guarantee(self) -> DeliveryGuarantee {
54 DeliveryGuarantee::ReliableWithBackpressure
55 }
56}
57
58/// Marker trait implemented by supported stream replay marker types.
59pub trait Replay:
60 sealed::ReplaySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
61{
62 /// Returns the replay retention represented by this marker.
63 fn replay_retention(self) -> Option<ReplayRetention>;
64
65 /// Returns whether replay-specific APIs are enabled for this marker.
66 fn replay_enabled(self) -> bool;
67}
68
/// Replay marker for streams that do not support replay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NoReplay;
72
73impl sealed::ReplaySealed for NoReplay {}
74
75impl Replay for NoReplay {
76 fn replay_retention(self) -> Option<ReplayRetention> {
77 None
78 }
79
80 fn replay_enabled(self) -> bool {
81 false
82 }
83}
84
85/// Marker for streams with replay support.
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct ReplayEnabled {
88 /// Replay history retained for subscribers created after output has already arrived.
89 pub replay_retention: ReplayRetention,
90}
91
92impl ReplayEnabled {
93 /// Creates a replay-enabled marker with the given retention.
94 #[must_use]
95 pub fn new(replay_retention: ReplayRetention) -> Self {
96 replay_retention.assert_non_zero("replay_retention");
97 Self { replay_retention }
98 }
99}
100
101impl sealed::ReplaySealed for ReplayEnabled {}
102
103impl Replay for ReplayEnabled {
104 fn replay_retention(self) -> Option<ReplayRetention> {
105 Some(self.replay_retention)
106 }
107
108 fn replay_enabled(self) -> bool {
109 true
110 }
111}
112
/// Runtime representation of the delivery behavior selected by typed stream modes.
///
/// Each variant mirrors the trade-off of the typed marker with the same name,
/// [`LossyWithoutBackpressure`] or [`ReliableWithBackpressure`]. Either active subscribers
/// receive every chunk and the child is paused whenever their buffer is full, or dropped chunks
/// are tolerated so that the child is never blocked.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryGuarantee {
    /// Output keeps being read; when bounded buffers overflow, chunks are dropped for the slow
    /// subscribers. Consumer pace never blocks the child.
    LossyWithoutBackpressure,

    /// When bounded buffers are full, reading further output waits for the active subscribers.
    /// Once the kernel pipe fills, the child's next write blocks. Reliability covers only the
    /// subscribers attached at the time a chunk is produced; subscribers that attach later rely
    /// on the replay axis for earlier output.
    ReliableWithBackpressure,
}
131
132/// Replay history retained by replay-enabled streams.
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub enum ReplayRetention {
135 /// Keep the latest number of chunks for future subscribers.
136 LastChunks(usize),
137
138 /// Keep whole chunks covering at least the latest number of bytes.
139 ///
140 /// Trimming happens at chunk boundaries: chunks are removed from the front of the retained
141 /// log only while doing so leaves the remaining size at or above the requested limit. Chunks
142 /// are never split, so the actual retained size is "rounded up to whole chunks." A single
143 /// chunk that is itself larger than the configured limit is retained in full until a newer
144 /// chunk arrives that can replace it; the limit is therefore a soft floor on retention, not
145 /// a hard upper bound on memory.
146 ///
147 /// To make the practical bound predictable, pair this with a `read_chunk_size` smaller than
148 /// the retention limit so a single chunk cannot exceed the configured budget.
149 LastBytes(NumBytes),
150
151 /// Keep all output for the stream lifetime.
152 ///
153 /// This can retain unbounded memory. Use it only when the child process and its output volume
154 /// are trusted.
155 All,
156}
157
158impl ReplayRetention {
159 pub(crate) fn assert_non_zero(self, parameter_name: &str) {
160 match self {
161 ReplayRetention::LastChunks(0) => {
162 panic!("{parameter_name} must retain at least one chunk");
163 }
164 ReplayRetention::LastBytes(bytes) if bytes.bytes() == 0 => {
165 panic!("{parameter_name} must retain at least one byte");
166 }
167 ReplayRetention::LastChunks(_)
168 | ReplayRetention::LastBytes(_)
169 | ReplayRetention::All => {}
170 }
171 }
172}