Skip to main content

tokio_process_tools/output_stream/
policy.rs

1use crate::output_stream::num_bytes::NumBytes;
2
/// Private home of the supertraits that seal the public marker traits.
///
/// The module itself is not exported, so downstream crates cannot name these traits and
/// therefore cannot implement the marker traits that require them.
mod sealed {
    /// Sealing supertrait required by every delivery marker.
    pub trait DeliverySealed {}

    /// Sealing supertrait required by every replay marker.
    pub trait ReplaySealed {}
}
8
9/// Marker trait implemented by supported stream delivery marker types.
10pub trait Delivery:
11    sealed::DeliverySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
12{
13    /// Returns the runtime delivery guarantee represented by this marker.
14    fn guarantee(self) -> DeliveryGuarantee;
15}
16
/// Delivery marker for lossy streams: the child is never slowed down by its consumers.
///
/// **Mechanism:** The child's pipe is drained by the reader task no matter how quickly consumers
/// keep up. If a subscriber's buffer is full, that subscriber misses the chunk; the child is not
/// paused.
///
/// **Cost:** An active consumer that falls behind may see gaps or dropped output. Line-aware
/// consumers throw away the partial line they were assembling and resynchronize at the next
/// newline instead of stitching text together across the gap.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LossyWithoutBackpressure;
28
29impl sealed::DeliverySealed for LossyWithoutBackpressure {}
30
31impl Delivery for LossyWithoutBackpressure {
32    fn guarantee(self) -> DeliveryGuarantee {
33        DeliveryGuarantee::LossyWithoutBackpressure
34    }
35}
36
/// Delivery marker for reliable streams: backpressure is applied to the child so that active
/// subscribers never see gaps.
///
/// **Mechanism:** While an active subscriber's buffer is full, the reader task holds off on
/// reading further from the child's pipe. The kernel pipe then fills up and the child's next
/// write blocks. That stall is the price paid for reliability.
///
/// **Scope:** Only subscribers that are *currently attached* when a chunk is produced are
/// covered. A subscriber that attaches afterwards is not handed earlier chunks by this delivery
/// policy; the replay axis exists for that purpose.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReliableWithBackpressure;
49
50impl sealed::DeliverySealed for ReliableWithBackpressure {}
51
52impl Delivery for ReliableWithBackpressure {
53    fn guarantee(self) -> DeliveryGuarantee {
54        DeliveryGuarantee::ReliableWithBackpressure
55    }
56}
57
58/// Marker trait implemented by supported stream replay marker types.
59pub trait Replay:
60    sealed::ReplaySealed + Clone + Copy + std::fmt::Debug + PartialEq + Eq + Send + Sync + 'static
61{
62    /// Returns the replay retention represented by this marker.
63    fn replay_retention(self) -> Option<ReplayRetention>;
64
65    /// Returns whether replay-specific APIs are enabled for this marker.
66    fn replay_enabled(self) -> bool;
67}
68
/// Replay marker for streams that do not support replay.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NoReplay;
72
73impl sealed::ReplaySealed for NoReplay {}
74
75impl Replay for NoReplay {
76    fn replay_retention(self) -> Option<ReplayRetention> {
77        None
78    }
79
80    fn replay_enabled(self) -> bool {
81        false
82    }
83}
84
85/// Marker for streams with replay support.
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct ReplayEnabled {
88    /// Replay history retained for subscribers created after output has already arrived.
89    pub replay_retention: ReplayRetention,
90}
91
92impl ReplayEnabled {
93    /// Creates a replay-enabled marker with the given retention.
94    #[must_use]
95    pub fn new(replay_retention: ReplayRetention) -> Self {
96        replay_retention.assert_non_zero("replay_retention");
97        Self { replay_retention }
98    }
99}
100
101impl sealed::ReplaySealed for ReplayEnabled {}
102
103impl Replay for ReplayEnabled {
104    fn replay_retention(self) -> Option<ReplayRetention> {
105        Some(self.replay_retention)
106    }
107
108    fn replay_enabled(self) -> bool {
109        true
110    }
111}
112
/// Runtime counterpart of the delivery behavior selected by typed stream modes.
///
/// Each variant expresses the same trade-off as the typed marker of the same name,
/// [`LossyWithoutBackpressure`] or [`ReliableWithBackpressure`]: either active subscribers
/// receive every chunk and the child is paused while their buffer is full, or the child is never
/// blocked and subscribers accept that chunks may be dropped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryGuarantee {
    /// Output keeps being read; when a bounded buffer overflows, chunks are dropped for the slow
    /// subscriber. Consumer pace never blocks the child.
    LossyWithoutBackpressure,

    /// Reading more output waits for active subscribers while bounded buffers are full. Once the
    /// kernel pipe fills, the child's next write blocks. Reliability only covers subscribers that
    /// are attached when each chunk is produced; subscribers that attach late rely on the replay
    /// axis for earlier output.
    ReliableWithBackpressure,
}
131
132/// Replay history retained by replay-enabled streams.
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub enum ReplayRetention {
135    /// Keep the latest number of chunks for future subscribers.
136    LastChunks(usize),
137
138    /// Keep whole chunks covering at least the latest number of bytes.
139    ///
140    /// Trimming happens at chunk boundaries: chunks are removed from the front of the retained
141    /// log only while doing so leaves the remaining size at or above the requested limit. Chunks
142    /// are never split, so the actual retained size is "rounded up to whole chunks." A single
143    /// chunk that is itself larger than the configured limit is retained in full until a newer
144    /// chunk arrives that can replace it; the limit is therefore a soft floor on retention, not
145    /// a hard upper bound on memory.
146    ///
147    /// To make the practical bound predictable, pair this with a `read_chunk_size` smaller than
148    /// the retention limit so a single chunk cannot exceed the configured budget.
149    LastBytes(NumBytes),
150
151    /// Keep all output for the stream lifetime.
152    ///
153    /// This can retain unbounded memory. Use it only when the child process and its output volume
154    /// are trusted.
155    All,
156}
157
158impl ReplayRetention {
159    pub(crate) fn assert_non_zero(self, parameter_name: &str) {
160        match self {
161            ReplayRetention::LastChunks(0) => {
162                panic!("{parameter_name} must retain at least one chunk");
163            }
164            ReplayRetention::LastBytes(bytes) if bytes.bytes() == 0 => {
165                panic!("{parameter_name} must retain at least one byte");
166            }
167            ReplayRetention::LastChunks(_)
168            | ReplayRetention::LastBytes(_)
169            | ReplayRetention::All => {}
170        }
171    }
172}