Skip to main content

congestion/
controller.rs

1//! Core trait and supporting types for pluggable congestion-control algorithms.
2
/// Classification of how one measured operation ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The operation finished and no backpressure was signaled.
    Ok,
    /// The operation finished, but the underlying system signaled
    /// backpressure: for example `EAGAIN`, `ECONNREFUSED`, or a response
    /// slow enough to suggest saturation.
    Backpressure,
    /// The operation failed for a reason that has nothing to do with
    /// congestion (permission denied, no such file, and so on).
    Error,
}
16
17/// A single observation of an operation's behavior, fed to a [`Controller`].
18///
19/// The `started_at` / `completed_at` pair should enclose the actual syscall or
20/// network round-trip, not the permit acquisition; the controller reasons
21/// about service time at the bottleneck, not queueing time ahead of it.
22#[derive(Debug, Clone, Copy)]
23pub struct Sample {
24    /// When the operation was submitted to the underlying system.
25    pub started_at: std::time::Instant,
26    /// When the operation reported completion.
27    pub completed_at: std::time::Instant,
28    /// Bytes transferred. Zero for metadata operations.
29    pub bytes: u64,
30    /// How the operation concluded.
31    pub outcome: Outcome,
32}
33
34impl Sample {
35    /// Wall-clock duration of the measured operation.
36    pub fn latency(&self) -> std::time::Duration {
37        self.completed_at.duration_since(self.started_at)
38    }
39}
40
/// Absolute limits a controller hands to the enforcement layer.
///
/// `None` in a field means that dimension is unlimited. A controller produces
/// a new `Decision` on each tick; diffing successive decisions and applying
/// only what changed is the enforcement layer's job.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Decision {
    /// Cap on the number of operations in flight at the same time.
    pub max_in_flight: Option<u32>,
    /// Cap on the submission rate, in units appropriate to the resource:
    /// ops/sec for metadata controllers, bytes/sec for data controllers.
    pub rate_per_sec: Option<f64>,
}

impl Decision {
    /// The decision with no limit on either dimension.
    pub const UNLIMITED: Decision = Self {
        max_in_flight: None,
        rate_per_sec: None,
    };

    /// Builds a decision that caps concurrency and leaves rate unlimited.
    pub fn with_concurrency(max_in_flight: u32) -> Decision {
        Self {
            max_in_flight: Some(max_in_flight),
            ..Self::UNLIMITED
        }
    }

    /// Builds a decision that caps rate and leaves concurrency unlimited.
    pub fn with_rate(rate_per_sec: f64) -> Decision {
        Self {
            rate_per_sec: Some(rate_per_sec),
            ..Self::UNLIMITED
        }
    }

    /// Builds a decision that caps both concurrency and rate.
    pub fn with_concurrency_and_rate(max_in_flight: u32, rate_per_sec: f64) -> Decision {
        let max_in_flight = Some(max_in_flight);
        let rate_per_sec = Some(rate_per_sec);
        Self {
            max_in_flight,
            rate_per_sec,
        }
    }
}
83
/// Observability-only view of what a controller currently believes, meant
/// for the progress bar and similar diagnostic surfaces.
///
/// A snapshot is a sample and must never be treated as authoritative for
/// enforcement. It carries the quantities a ratio-based controller reasons
/// about (`cwnd`, baseline latency, current observed latency, sample count)
/// so a renderer can explain *why* `cwnd` has its current value. A
/// controller with no meaningful internal state (e.g. `Noop`) returns
/// [`ControllerSnapshot::default`]; one with no latency signal (e.g.
/// `Fixed`) fills in `cwnd` only and leaves the latency fields at zero.
///
/// There is deliberately no pre-computed `latency_ratio` field: the renderer
/// derives it as `current_latency / baseline_latency`, which keeps a single
/// source of truth.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ControllerSnapshot {
    /// Concurrency window the controller would emit on its next tick.
    /// A value of `0` stands for "no cap configured."
    pub cwnd: u32,
    /// Baseline latency over the long horizon. Ratio-based controllers
    /// report the configured baseline percentile of the long sample window
    /// here, and the renderer uses it as the "uncongested floor" reference.
    /// `Duration::ZERO` until a signal is available.
    pub baseline_latency: std::time::Duration,
    /// Current latency over the short horizon. Ratio-based controllers
    /// report the configured current percentile of the short sample window
    /// here. `Duration::ZERO` when no fresh samples have been observed.
    pub current_latency: std::time::Duration,
    /// Running total of samples the controller has consumed.
    pub samples_seen: u64,
}
117
118/// A pluggable congestion-control algorithm.
119///
120/// A `Controller` is a stateful, synchronous state machine that consumes
121/// operation-completion [`Sample`]s and, on each tick, emits an absolute
122/// [`Decision`] expressing the current permitted concurrency and/or rate.
123///
124/// Controllers must be `Send` so the enforcement layer can own them from a
125/// dedicated control task. They do not need to be `Sync`: the enforcement
126/// layer guarantees single-threaded access.
127///
128/// All methods are synchronous and do not perform I/O. Time is always passed
129/// in; controllers must not read clocks directly. This keeps algorithms
130/// deterministic under the simulator in [`crate::sim`].
131pub trait Controller: Send {
132    /// Record a completed operation.
133    fn on_sample(&mut self, sample: &Sample);
134    /// Produce the current decision. Called periodically by the enforcement
135    /// layer. The controller must return an absolute limit (not a delta).
136    fn on_tick(&mut self, now: std::time::Instant) -> Decision;
137    /// Short, stable identifier used in logs and metrics (e.g. "noop",
138    /// "fixed", "ratio").
139    fn name(&self) -> &'static str;
140    /// Snapshot of the controller's observable state for diagnostics
141    /// and progress display. Default returns an empty snapshot, which
142    /// is appropriate for controllers that have no meaningful internal
143    /// state to surface.
144    fn snapshot(&self) -> ControllerSnapshot {
145        ControllerSnapshot::default()
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// `latency()` reports exactly the gap between the two timestamps.
    #[test]
    fn sample_latency_is_difference_of_timestamps() {
        let elapsed = Duration::from_millis(5);
        let started_at = Instant::now();
        let sample = Sample {
            started_at,
            completed_at: started_at + elapsed,
            bytes: 0,
            outcome: Outcome::Ok,
        };
        assert_eq!(sample.latency(), elapsed);
    }

    /// `UNLIMITED` leaves both dimensions unset.
    #[test]
    fn decision_unlimited_imposes_no_limits() {
        let Decision {
            max_in_flight,
            rate_per_sec,
        } = Decision::UNLIMITED;
        assert_eq!(max_in_flight, None);
        assert_eq!(rate_per_sec, None);
    }

    /// Each constructor populates only the dimension(s) it is named after.
    #[test]
    fn decision_constructors_set_only_the_named_dimension() {
        assert_eq!(
            Decision::with_concurrency(8),
            Decision {
                max_in_flight: Some(8),
                rate_per_sec: None,
            }
        );
        assert_eq!(
            Decision::with_rate(100.0),
            Decision {
                max_in_flight: None,
                rate_per_sec: Some(100.0),
            }
        );
        assert_eq!(
            Decision::with_concurrency_and_rate(4, 50.0),
            Decision {
                max_in_flight: Some(4),
                rate_per_sec: Some(50.0),
            }
        );
    }
}
183}