Skip to main content

congestion/
control_loop.rs

//! Async control-loop plumbing that ties a [`Controller`] to the hot path.
//!
//! The pipeline per controlled resource is:
//!
//! ```text
//!     Probe::complete_ok(_) ──► RoutingSink ──mpsc──► ControlUnit ──watch──► enforcement
//!                                    (Arc)                (task)               (throttle)
//! ```
//!
//! [`RoutingSink`] receives samples from the global `SampleSink` and fans
//! them out to per-resource bounded MPSC channels. Each resource has its
//! own [`ControlUnit`] running as a tokio task: it owns a `Controller`,
//! drains samples, ticks on a configurable interval, and publishes each
//! [`Decision`] on a `tokio::sync::watch` channel so the enforcement layer
//! can apply it.
//!
//! # Backpressure
//!
//! Samples are a lossy signal: dropping a small fraction does not harm the
//! controller's estimate. To keep a stalled or slow control task from
//! leaking memory under a heavy probe rate, the per-resource channels are
//! bounded (see [`DEFAULT_CHANNEL_CAPACITY`]) and overflow samples are
//! silently dropped. The count of dropped samples is exposed via
//! [`RoutingSink::dropped_samples`] for diagnostics.
25
26use crate::controller::{Controller, ControllerSnapshot, Decision, Sample};
27use crate::measurement::{
28    MetadataOp, N_META_OPS, N_META_RESOURCES, ResourceKind, SampleSink, Side,
29};
30
/// Default period between successive `on_tick` calls made by a control
/// unit (50 milliseconds).
pub const DEFAULT_TICK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
33
/// Default number of samples each per-resource channel can buffer.
///
/// At a probe rate of 100k samples/sec, 4096 slots is roughly 40ms of
/// headroom: enough to ride out a brief stall of the control task without
/// dropping samples.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 4 * 1024;
38
39/// A single resource's control task.
40///
41/// Construct with [`ControlUnit::new`], receive the decision and snapshot
42/// watches via the returned `(unit, decision_rx, snapshot_rx)` tuple, then
43/// spawn the unit with [`ControlUnit::spawn`]. The task runs until the
44/// sample channel's senders are all dropped (typically because
45/// [`clear_sample_sink`][crate::clear_sample_sink] was called and the
46/// `RoutingSink` went away).
47///
48/// # Watches
49///
50/// Two watches keep enforcement and observability separate:
51///
52/// - `decision_rx` carries the [`Decision`] the enforcement layer applies.
53///   Subscribers wake on every change so caps land promptly.
54/// - `snapshot_rx` carries a [`ControllerSnapshot`] for diagnostics —
55///   used by the progress bar and other UI surfaces. Snapshot-only
56///   changes (e.g. baseline drift on an unchanged `cwnd`) do not wake
57///   enforcement, and a busy enforcement subscriber does not stall
58///   snapshot publication.
59///
60/// # Logging
61///
62/// Each unit emits structured `tracing` events keyed by its [`label`] so
63/// multiple units can be told apart in mixed logs:
64///
65/// - `tracing::trace!` on every tick with the published [`Decision`].
66/// - `tracing::debug!` whenever the published decision differs from the
67///   prior tick's. This is the right level for watching cwnd evolve in
68///   production without drowning in per-tick noise.
69///
70/// [`label`]: ControlUnit::label
71pub struct ControlUnit<C: Controller> {
72    label: &'static str,
73    controller: C,
74    sample_rx: tokio::sync::mpsc::Receiver<Sample>,
75    decision_tx: tokio::sync::watch::Sender<Decision>,
76    snapshot_tx: tokio::sync::watch::Sender<ControllerSnapshot>,
77    tick_interval: std::time::Duration,
78}
79
80impl<C: Controller + 'static> ControlUnit<C> {
81    /// Build a new control unit. Returns the unit, a receiver for its
82    /// decision stream, and a receiver for its snapshot stream. The
83    /// initial decision is [`Decision::UNLIMITED`] and the initial
84    /// snapshot is [`ControllerSnapshot::default`] until the first tick
85    /// fires.
86    ///
87    /// `label` is a short, stable string that identifies this unit in
88    /// log events and in the snapshot registry (e.g. `"meta-src"`,
89    /// `"meta-dst"`). Multiple units of the same controller type are
90    /// typical, so using only the controller name would make logs
91    /// ambiguous.
92    pub fn new(
93        label: &'static str,
94        controller: C,
95        sample_rx: tokio::sync::mpsc::Receiver<Sample>,
96        tick_interval: std::time::Duration,
97    ) -> (
98        Self,
99        tokio::sync::watch::Receiver<Decision>,
100        tokio::sync::watch::Receiver<ControllerSnapshot>,
101    ) {
102        let (decision_tx, decision_rx) = tokio::sync::watch::channel(Decision::UNLIMITED);
103        let (snapshot_tx, snapshot_rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
104        (
105            Self {
106                label,
107                controller,
108                sample_rx,
109                decision_tx,
110                snapshot_tx,
111                tick_interval,
112            },
113            decision_rx,
114            snapshot_rx,
115        )
116    }
117
118    /// The label this unit was constructed with, used as the
119    /// `unit` field on every emitted tracing event.
120    pub fn label(&self) -> &'static str {
121        self.label
122    }
123
124    /// Spawn the control loop on the current tokio runtime. Returns a
125    /// `JoinHandle` that resolves when the sample channel closes.
126    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
127        tokio::spawn(self.run())
128    }
129
130    /// Run the control loop until the sample channel closes.
131    pub async fn run(mut self) {
132        let mut interval = tokio::time::interval(self.tick_interval);
133        // the first `interval.tick()` resolves immediately (at t=0); use it
134        // to publish the controller's initial decision deterministically.
135        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
136        interval.tick().await;
137        let initial = self.controller.on_tick(std::time::Instant::now());
138        Self::log_tick(self.label, self.controller.name(), &initial, None);
139        let _ = self.decision_tx.send(initial);
140        let _ = self.snapshot_tx.send(self.controller.snapshot());
141        let mut last = initial;
142        loop {
143            tokio::select! {
144                _ = interval.tick() => {
145                    let decision = self.controller.on_tick(std::time::Instant::now());
146                    Self::log_tick(self.label, self.controller.name(), &decision, Some(&last));
147                    let _ = self.decision_tx.send(decision);
148                    let _ = self.snapshot_tx.send(self.controller.snapshot());
149                    last = decision;
150                }
151                sample = self.sample_rx.recv() => {
152                    match sample {
153                        Some(s) => {
154                            self.controller.on_sample(&s);
155                            // drain any other immediately-available samples
156                            // so a high sample rate doesn't starve the tick.
157                            while let Ok(s) = self.sample_rx.try_recv() {
158                                self.controller.on_sample(&s);
159                            }
160                        }
161                        None => break,
162                    }
163                }
164            }
165        }
166        tracing::debug!(
167            unit = %self.label,
168            controller = %self.controller.name(),
169            "control loop exiting: sample channel closed",
170        );
171    }
172
173    fn log_tick(
174        label: &'static str,
175        controller: &'static str,
176        decision: &Decision,
177        previous: Option<&Decision>,
178    ) {
179        // every tick at trace; only changes at debug, so production logs
180        // can stay at debug without drowning in unchanged-cwnd churn.
181        tracing::trace!(
182            unit = %label,
183            controller = %controller,
184            max_in_flight = ?decision.max_in_flight,
185            rate_per_sec = ?decision.rate_per_sec,
186            "control tick",
187        );
188        if previous.is_none_or(|p| p != decision) {
189            tracing::debug!(
190                unit = %label,
191                controller = %controller,
192                max_in_flight = ?decision.max_in_flight,
193                rate_per_sec = ?decision.rate_per_sec,
194                prev_max_in_flight = ?previous.and_then(|p| p.max_in_flight),
195                prev_rate_per_sec = ?previous.and_then(|p| p.rate_per_sec),
196                "decision changed",
197            );
198        }
199    }
200}
201
202/// Compute the flat array index for a `(Side, MetadataOp)` pair.
203///
204/// Side is the major axis (so all source-side ops cluster contiguously
205/// and the same for destination), op is the minor axis. The mapping is
206/// the canonical inverse of `RoutingSink::metadata`'s slot layout.
207fn metadata_index(side: Side, op: MetadataOp) -> usize {
208    (side as usize) * N_META_OPS + (op as usize)
209}
210
211/// A [`SampleSink`] that fans samples out to per-resource bounded MPSC
212/// channels, typically each drained by one [`ControlUnit`].
213///
214/// Built via [`RoutingSinkBuilder`]. When a channel is full, samples are
215/// dropped rather than blocking or allocating; the drop count is exposed
216/// by [`RoutingSink::dropped_samples`].
217pub struct RoutingSink {
218    /// One slot per `(Side, MetadataOp)` pair, indexed by
219    /// [`metadata_index`]. `None` slots are silently dropped — that's
220    /// how a tool that doesn't exercise a particular op opts out.
221    metadata: [Option<tokio::sync::mpsc::Sender<Sample>>; N_META_RESOURCES],
222    /// Parallel to `metadata`: optional histogram accumulator per
223    /// `(Side, MetadataOp)` pair. Recorded synchronously inside
224    /// [`SampleSink::record`] so every probe completion lands in the
225    /// accumulator before the function returns — eliminating the drain
226    /// race on shutdown.
227    histograms: [Option<std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>>;
228        N_META_RESOURCES],
229    read: Option<tokio::sync::mpsc::Sender<Sample>>,
230    write: Option<tokio::sync::mpsc::Sender<Sample>>,
231    dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
232}
233
234impl RoutingSink {
235    /// Cumulative count of samples dropped because the destination channel
236    /// was full. Closed-channel drops (receiver went away) are not counted.
237    pub fn dropped_samples(&self) -> u64 {
238        self.dropped.load(std::sync::atomic::Ordering::Relaxed)
239    }
240}
241
242impl SampleSink for RoutingSink {
243    /// Records a sample into the histogram accumulator and forwards it to
244    /// the controller's mpsc channel.
245    ///
246    /// The histogram update happens SYNCHRONOUSLY before the `try_send` to
247    /// the controller. This is intentional: the histogram is the source of
248    /// truth for "what probes completed", which must include every probe
249    /// regardless of whether the controller's queue drained. The
250    /// controller's own `samples_seen` counter (visible in
251    /// [`ControllerSnapshot`]) reflects what the controller actually acted
252    /// on, which may be less when the per-resource mpsc is saturated; the
253    /// difference is the count surfaced via [`RoutingSink::dropped_samples`]
254    /// and the periodic warning log. By design, n (histogram) >=
255    /// samples_seen (controller); their difference is the queue-saturation
256    /// signal. Recording only on `try_send` success would cause histogram
257    /// data loss precisely when the user most needs it: under heavy load,
258    /// the controller's queue can saturate, and that's exactly when the
259    /// distribution is most informative.
260    fn record(&self, kind: ResourceKind, sample: &Sample) {
261        // Synchronously update the histogram accumulator (if any) before
262        // the async hop to the control unit. This way every probe that
263        // completes is reflected in the accumulator immediately, so the
264        // logger's final snapshot at shutdown captures every sample —
265        // even ones that haven't yet been drained from the per-resource
266        // mpsc by the control-unit task.
267        if let ResourceKind::Metadata(side, op) = kind
268            && let Some(acc) = &self.histograms[metadata_index(side, op)]
269        {
270            acc.lock()
271                .expect("histogram accumulator mutex poisoned")
272                .record(sample.latency());
273        }
274        let tx = match kind {
275            ResourceKind::Metadata(side, op) => self.metadata[metadata_index(side, op)].as_ref(),
276            ResourceKind::DataRead => self.read.as_ref(),
277            ResourceKind::DataWrite => self.write.as_ref(),
278        };
279        if let Some(tx) = tx {
280            match tx.try_send(*sample) {
281                Ok(()) => {}
282                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
283                    self.dropped
284                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
285                }
286                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
287                    // ControlUnit exited; nothing to do.
288                }
289            }
290        }
291    }
292}
293
294/// Incrementally opt resources into the routing sink. Each `*_receiver`
295/// call registers a channel for the corresponding [`ResourceKind`] and
296/// returns the receiver the caller must hand to a [`ControlUnit`].
297pub struct RoutingSinkBuilder {
298    metadata: [Option<tokio::sync::mpsc::Sender<Sample>>; N_META_RESOURCES],
299    histograms: [Option<std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>>;
300        N_META_RESOURCES],
301    read: Option<tokio::sync::mpsc::Sender<Sample>>,
302    write: Option<tokio::sync::mpsc::Sender<Sample>>,
303    capacity: usize,
304    dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
305}
306
307impl Default for RoutingSinkBuilder {
308    fn default() -> Self {
309        Self {
310            metadata: [const { None }; N_META_RESOURCES],
311            histograms: [const { None }; N_META_RESOURCES],
312            read: None,
313            write: None,
314            capacity: DEFAULT_CHANNEL_CAPACITY,
315            dropped: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
316        }
317    }
318}
319
320impl RoutingSinkBuilder {
321    pub fn new() -> Self {
322        Self::default()
323    }
324    /// Override the per-channel capacity. Must be at least 1.
325    pub fn with_capacity(mut self, capacity: usize) -> Self {
326        self.capacity = capacity.max(1);
327        self
328    }
329    /// Register a channel for per-file metadata samples of the given
330    /// `(Side, MetadataOp)` pair and return its receiver. Each registered
331    /// pair gets an independent control unit; ops that aren't registered
332    /// have their samples silently dropped (no controller to feed).
333    pub fn metadata_receiver(
334        &mut self,
335        side: Side,
336        op: MetadataOp,
337    ) -> tokio::sync::mpsc::Receiver<Sample> {
338        let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
339        self.metadata[metadata_index(side, op)] = Some(tx);
340        rx
341    }
342
343    /// Register a histogram accumulator for the given `(Side, MetadataOp)`
344    /// pair. Synchronous: every probe completion will record into this
345    /// accumulator before `record` returns — eliminating the drain race on
346    /// shutdown that arises when the accumulator lives in the `ControlUnit`
347    /// task instead.
348    pub fn metadata_histogram(
349        &mut self,
350        side: Side,
351        op: MetadataOp,
352        accumulator: std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>,
353    ) {
354        self.histograms[metadata_index(side, op)] = Some(accumulator);
355    }
356    /// Register a channel for read-throughput samples and return its receiver.
357    pub fn read_receiver(&mut self) -> tokio::sync::mpsc::Receiver<Sample> {
358        let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
359        self.read = Some(tx);
360        rx
361    }
362    /// Register a channel for write-throughput samples and return its receiver.
363    pub fn write_receiver(&mut self) -> tokio::sync::mpsc::Receiver<Sample> {
364        let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
365        self.write = Some(tx);
366        rx
367    }
368    pub fn build(self) -> RoutingSink {
369        RoutingSink {
370            metadata: self.metadata,
371            histograms: self.histograms,
372            read: self.read,
373            write: self.write,
374            dropped: self.dropped,
375        }
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::controller::Outcome;
    use crate::measurement::{Probe, clear_sample_sink, install_sample_sink};
    use crate::{FixedController, NoopController};

    /// Clears the process-wide sample sink when dropped, so a test that
    /// installs a sink restores the global state even if one of its
    /// assertions panics.
    struct ClearSinkOnDrop;

    impl Drop for ClearSinkOnDrop {
        fn drop(&mut self) {
            clear_sample_sink();
        }
    }

    /// Build a successful zero-byte sample whose latency is `latency_ms`.
    fn make_sample(latency_ms: u64) -> Sample {
        let start = std::time::Instant::now();
        Sample {
            started_at: start,
            completed_at: start + std::time::Duration::from_millis(latency_ms),
            bytes: 0,
            outcome: Outcome::Ok,
        }
    }

    #[tokio::test]
    async fn control_unit_publishes_initial_decision_on_spawn() {
        // `_tx` keeps the sample channel open so the loop doesn't exit
        let (_tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
        let controller = FixedController::with_concurrency(42);
        let (unit, mut decision_rx, _snapshot_rx) =
            ControlUnit::new("test", controller, rx, std::time::Duration::from_millis(10));
        unit.spawn();
        // initial decision published by the first tick
        decision_rx
            .changed()
            .await
            .expect("initial decision delivered");
        let decision = *decision_rx.borrow();
        assert_eq!(decision.max_in_flight, Some(42));
    }

    #[tokio::test]
    async fn control_unit_feeds_samples_through_to_controller() {
        // a controller that just counts samples
        struct CountingController {
            count: std::sync::Arc<std::sync::atomic::AtomicU64>,
        }
        impl Controller for CountingController {
            fn on_sample(&mut self, _s: &Sample) {
                self.count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
            fn on_tick(&mut self, _now: std::time::Instant) -> Decision {
                Decision::UNLIMITED
            }
            fn name(&self) -> &'static str {
                "counting"
            }
        }
        let count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        let (tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
        let controller = CountingController {
            count: count.clone(),
        };
        let (unit, _decision_rx, _snapshot_rx) = ControlUnit::new(
            "test",
            controller,
            rx,
            std::time::Duration::from_millis(100),
        );
        let handle = unit.spawn();
        for _ in 0..5 {
            tx.send(make_sample(1)).await.expect("send sample");
        }
        // closing the sender terminates the control loop
        drop(tx);
        handle.await.expect("control loop exits cleanly");
        assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 5);
    }

    #[tokio::test]
    async fn routing_sink_dispatches_by_resource_kind() {
        let mut builder = RoutingSinkBuilder::new();
        let mut meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
        let mut read_rx = builder.read_receiver();
        let sink = builder.build();
        // metadata sample reaches meta_rx
        sink.record(
            ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
            &make_sample(2),
        );
        let s = meta_rx.recv().await.expect("metadata sample delivered");
        assert_eq!(s.bytes, 0);
        // read sample reaches read_rx
        sink.record(ResourceKind::DataRead, &make_sample(2));
        let s = read_rx.recv().await.expect("read sample delivered");
        assert_eq!(s.bytes, 0);
        // write sample has no registered receiver; it is silently dropped
        sink.record(ResourceKind::DataWrite, &make_sample(2));
        // no assertion needed — the drop must not panic or block
    }

    #[tokio::test]
    async fn routing_sink_separates_op_kinds() {
        // Two metadata ops on the same side land in distinct channels.
        // This is the property that lets per-syscall controllers maintain
        // independent baselines.
        let mut builder = RoutingSinkBuilder::new();
        let mut stat_rx = builder.metadata_receiver(Side::Destination, MetadataOp::Stat);
        let mut unlink_rx = builder.metadata_receiver(Side::Destination, MetadataOp::Unlink);
        let sink = builder.build();
        sink.record(
            ResourceKind::Metadata(Side::Destination, MetadataOp::Stat),
            &make_sample(1),
        );
        sink.record(
            ResourceKind::Metadata(Side::Destination, MetadataOp::Unlink),
            &make_sample(2),
        );
        sink.record(
            ResourceKind::Metadata(Side::Destination, MetadataOp::Unlink),
            &make_sample(3),
        );
        // Stat channel sees exactly 1 sample; unlink channel sees 2.
        assert!(stat_rx.recv().await.is_some());
        assert!(stat_rx.try_recv().is_err());
        assert!(unlink_rx.recv().await.is_some());
        assert!(unlink_rx.recv().await.is_some());
        assert!(unlink_rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn routing_sink_counts_dropped_samples_when_channel_is_full() {
        // tight capacity + no receiver draining → excess samples are dropped.
        let mut builder = RoutingSinkBuilder::new().with_capacity(2);
        let _meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
        let sink = builder.build();
        for _ in 0..5 {
            sink.record(
                ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
                &make_sample(1),
            );
        }
        // first 2 fit in the channel buffer; remaining 3 are dropped.
        assert_eq!(sink.dropped_samples(), 3);
    }

    #[tokio::test]
    async fn routing_sink_integrates_with_global_probe_api() {
        // guard the process-wide SampleSink mutation so this test can't race
        // with measurement::tests or other sink-touching tests under
        // `cargo test`'s threaded runner.
        let _guard = crate::measurement::SINK_GUARD.lock().await;
        let mut builder = RoutingSinkBuilder::new();
        let mut meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
        let sink = builder.build();
        install_sample_sink(std::sync::Arc::new(sink));
        // declared after `_guard`, so it is dropped first: the sink is
        // cleared while the guard is still held, on success and on panic.
        let _clear_sink = ClearSinkOnDrop;
        Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
        let s = meta_rx.recv().await.expect("sample flowed through");
        assert_eq!(s.bytes, 0);
    }

    #[tokio::test]
    async fn control_unit_exits_when_all_senders_dropped() {
        let (tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
        let (unit, _decision_rx, _snapshot_rx) = ControlUnit::new(
            "test",
            NoopController::new(),
            rx,
            std::time::Duration::from_millis(10),
        );
        let handle = unit.spawn();
        drop(tx);
        tokio::time::timeout(std::time::Duration::from_secs(1), handle)
            .await
            .expect("control loop exits within timeout")
            .expect("control loop joins without panic");
    }

    #[tokio::test]
    async fn routing_sink_records_to_histogram_synchronously() {
        // Synchronous histogram capture: a sample sent to the sink must
        // land in the accumulator before record() returns, regardless of
        // whether the corresponding ControlUnit's mpsc has been drained.
        use crate::histogram::HistogramAccumulator;
        let mut builder = RoutingSinkBuilder::new();
        let _meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
        let acc = std::sync::Arc::new(std::sync::Mutex::new(HistogramAccumulator::new()));
        builder.metadata_histogram(Side::Source, MetadataOp::Stat, acc.clone());
        let sink = builder.build();
        sink.record(
            ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
            &make_sample(5),
        );
        sink.record(
            ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
            &make_sample(7),
        );
        // Synchronous capture: the accumulator already has both samples,
        // even though the control-unit task (if any) has not been polled.
        let snap = acc.lock().unwrap().snapshot_and_reset();
        assert_eq!(snap.len(), 2);
    }
}