congestion/controller.rs
1//! Core trait and supporting types for pluggable congestion-control algorithms.
2
/// How one measured operation ended, as seen by the congestion controller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Outcome {
    /// The operation finished and no backpressure signal was observed.
    Ok,
    /// The operation finished, but the underlying system pushed back:
    /// for example with `EAGAIN` or `ECONNREFUSED`, or with a response
    /// slow enough to suggest saturation.
    Backpressure,
    /// The operation failed for a reason that has nothing to do with
    /// congestion (permission denied, no such file, and so on).
    Error,
}
16
17/// A single observation of an operation's behavior, fed to a [`Controller`].
18///
19/// The `started_at` / `completed_at` pair should enclose the actual syscall or
20/// network round-trip, not the permit acquisition; the controller reasons
21/// about service time at the bottleneck, not queueing time ahead of it.
22#[derive(Debug, Clone, Copy)]
23pub struct Sample {
24 /// When the operation was submitted to the underlying system.
25 pub started_at: std::time::Instant,
26 /// When the operation reported completion.
27 pub completed_at: std::time::Instant,
28 /// Bytes transferred. Zero for metadata operations.
29 pub bytes: u64,
30 /// How the operation concluded.
31 pub outcome: Outcome,
32}
33
34impl Sample {
35 /// Wall-clock duration of the measured operation.
36 pub fn latency(&self) -> std::time::Duration {
37 self.completed_at.duration_since(self.started_at)
38 }
39}
40
/// Absolute limits that a controller hands to the enforcement layer.
///
/// The two fields are independent, and `None` leaves that dimension
/// unlimited. A controller produces a new `Decision` on every tick; it is
/// the enforcement layer's job to diff consecutive decisions and apply only
/// the parts that changed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Decision {
    /// Upper bound on the number of concurrently in-flight operations.
    pub max_in_flight: Option<u32>,
    /// Upper bound on the submission rate, in units that depend on the
    /// resource: ops/sec for metadata controllers, bytes/sec for data
    /// controllers.
    pub rate_per_sec: Option<f64>,
}
54
55impl Decision {
56 /// A decision that imposes no limits.
57 pub const UNLIMITED: Decision = Decision {
58 max_in_flight: None,
59 rate_per_sec: None,
60 };
61 /// Decision bounded only by concurrency.
62 pub fn with_concurrency(max_in_flight: u32) -> Decision {
63 Decision {
64 max_in_flight: Some(max_in_flight),
65 rate_per_sec: None,
66 }
67 }
68 /// Decision bounded only by rate.
69 pub fn with_rate(rate_per_sec: f64) -> Decision {
70 Decision {
71 max_in_flight: None,
72 rate_per_sec: Some(rate_per_sec),
73 }
74 }
75 /// Decision bounded by both concurrency and rate.
76 pub fn with_concurrency_and_rate(max_in_flight: u32, rate_per_sec: f64) -> Decision {
77 Decision {
78 max_in_flight: Some(max_in_flight),
79 rate_per_sec: Some(rate_per_sec),
80 }
81 }
82}
83
/// Point-in-time, read-only copy of a controller's internal state, meant
/// for the progress bar and other observability surfaces.
///
/// A snapshot is a sample and is never authoritative for enforcement. It
/// carries the quantities a ratio-based controller reasons about (`cwnd`,
/// baseline latency, current latency, sample count) so that a renderer can
/// explain *why* `cwnd` has its current value. A controller with no
/// meaningful internal state (e.g. `Noop`) returns
/// [`ControllerSnapshot::default`]; a controller with no latency signal
/// (e.g. `Fixed`) fills in `cwnd` only and leaves both latency fields at
/// zero.
///
/// There is deliberately no `latency_ratio` field: the renderer computes it
/// as `current_latency / baseline_latency`, which keeps a single source of
/// truth.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ControllerSnapshot {
    /// Concurrency window the controller would emit on its next tick.
    /// A value of `0` means "no cap configured."
    pub cwnd: u32,
    /// Long-horizon baseline latency. For ratio-based controllers this is
    /// the configured baseline percentile over the long sample window,
    /// and the renderer treats it as the "uncongested floor" reference.
    /// `Duration::ZERO` while there is no signal yet.
    pub baseline_latency: std::time::Duration,
    /// Short-horizon current latency. For ratio-based controllers this is
    /// the configured current percentile over the short sample window.
    /// `Duration::ZERO` when no fresh samples have been observed.
    pub current_latency: std::time::Duration,
    /// Running total of samples the controller has consumed.
    pub samples_seen: u64,
}
117
118/// A pluggable congestion-control algorithm.
119///
120/// A `Controller` is a stateful, synchronous state machine that consumes
121/// operation-completion [`Sample`]s and, on each tick, emits an absolute
122/// [`Decision`] expressing the current permitted concurrency and/or rate.
123///
124/// Controllers must be `Send` so the enforcement layer can own them from a
125/// dedicated control task. They do not need to be `Sync`: the enforcement
126/// layer guarantees single-threaded access.
127///
128/// All methods are synchronous and do not perform I/O. Time is always passed
129/// in; controllers must not read clocks directly. This keeps algorithms
130/// deterministic under the simulator in [`crate::sim`].
131pub trait Controller: Send {
132 /// Record a completed operation.
133 fn on_sample(&mut self, sample: &Sample);
134 /// Produce the current decision. Called periodically by the enforcement
135 /// layer. The controller must return an absolute limit (not a delta).
136 fn on_tick(&mut self, now: std::time::Instant) -> Decision;
137 /// Short, stable identifier used in logs and metrics (e.g. "noop",
138 /// "fixed", "ratio").
139 fn name(&self) -> &'static str;
140 /// Snapshot of the controller's observable state for diagnostics
141 /// and progress display. Default returns an empty snapshot, which
142 /// is appropriate for controllers that have no meaningful internal
143 /// state to surface.
144 fn snapshot(&self) -> ControllerSnapshot {
145 ControllerSnapshot::default()
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// `latency` reports exactly the gap between the two timestamps.
    #[test]
    fn sample_latency_is_difference_of_timestamps() {
        let elapsed = Duration::from_millis(5);
        let started_at = Instant::now();
        let sample = Sample {
            started_at,
            completed_at: started_at + elapsed,
            bytes: 0,
            outcome: Outcome::Ok,
        };

        assert_eq!(sample.latency(), elapsed);
    }

    /// `UNLIMITED` leaves both dimensions unset.
    #[test]
    fn decision_unlimited_imposes_no_limits() {
        let Decision {
            max_in_flight,
            rate_per_sec,
        } = Decision::UNLIMITED;

        assert_eq!(max_in_flight, None);
        assert_eq!(rate_per_sec, None);
    }

    /// Each constructor fills in the dimensions it names and nothing else.
    #[test]
    fn decision_constructors_set_only_the_named_dimension() {
        assert_eq!(
            Decision::with_concurrency(8),
            Decision {
                max_in_flight: Some(8),
                rate_per_sec: None,
            }
        );
        assert_eq!(
            Decision::with_rate(100.0),
            Decision {
                max_in_flight: None,
                rate_per_sec: Some(100.0),
            }
        );
        assert_eq!(
            Decision::with_concurrency_and_rate(4, 50.0),
            Decision {
                max_in_flight: Some(4),
                rate_per_sec: Some(50.0),
            }
        );
    }
}