/// One throttle mutation, produced by `apply_decision` and carried out by `run_adapter`.
///
/// The type is a small `Copy` value so a list of actions can be compared directly
/// with `assert_eq!` in tests.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum AdapterAction {
    /// Value handed to `throttle::set_max_ops_in_flight` for the adapter's resource.
    /// `apply_decision` emits `0` when the new decision carries no concurrency cap.
    SetMaxInFlight(usize),
    /// Value handed to `throttle::set_ops_replenish`; skipped by `run_adapter` when
    /// it was started with `apply_rate == false`.
    SetOpsReplenish(usize),
    /// Calls `throttle::enable_ops_throttle`; skipped when `apply_rate == false`.
    EnableOpsThrottle,
    /// Calls `throttle::disable_ops_throttle`; skipped when `apply_rate == false`.
    DisableOpsThrottle,
}
/// Window length, in seconds, used by `apply_decision` to turn a per-second rate
/// into a per-replenish count (`rate * REPLENISH_INTERVAL_SECS`).
// NOTE(review): presumably mirrors the throttle's own replenish period (100 ms) —
// confirm against the throttle module.
const REPLENISH_INTERVAL_SECS: f64 = 0.1;
/// Period of the timer on which `run_adapter` checks the sink's dropped-sample
/// counter; the same value is printed in the warning it logs.
pub(crate) const DROP_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
/// Applies controller decisions to the throttle and periodically reports dropped samples.
///
/// The task loops over two event sources until the decision channel reports an error
/// from `changed()`:
///
/// * **Decision updates** — the latest value is copied out of `decision_rx`, diffed
///   against the previously applied decision with `apply_decision`, and every
///   resulting [`AdapterAction`] is executed against the `throttle` module.
/// * **Drop-report timer** — every [`DROP_REPORT_INTERVAL`] the sink's
///   `dropped_samples()` counter is read and a warning is logged if it grew since
///   the last report.
///
/// # Parameters
/// * `resource` — the throttle resource whose in-flight limit is updated.
/// * `apply_rate` — when `false`, `SetOpsReplenish`, `EnableOpsThrottle` and
///   `DisableOpsThrottle` actions are ignored; `SetMaxInFlight` is always applied.
/// * `decision_rx` — watch receiver carrying the controller's decisions.
/// * `sink` — routing sink whose dropped-sample counter is monitored.
pub(crate) async fn run_adapter(
    resource: throttle::Resource,
    apply_rate: bool,
    mut decision_rx: tokio::sync::watch::Receiver<congestion::Decision>,
    sink: std::sync::Arc<congestion::RoutingSink>,
) {
    let mut report_timer = tokio::time::interval(DROP_REPORT_INTERVAL);
    report_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // Consume one tick before entering the loop so the first drop check inside the
    // loop happens on a subsequent tick rather than right at start-up.
    report_timer.tick().await;

    // Counter value at the time of the most recent warning.
    let mut reported_drops = 0u64;
    // Baseline for diffing; the adapter starts from the "no limits" decision.
    let mut in_effect: congestion::Decision = congestion::Decision::UNLIMITED;

    loop {
        tokio::select! {
            update = decision_rx.changed() => {
                if update.is_err() {
                    break;
                }
                let incoming = *decision_rx.borrow();
                for step in apply_decision(in_effect, incoming) {
                    match step {
                        AdapterAction::SetMaxInFlight(limit) => {
                            throttle::set_max_ops_in_flight(resource, limit);
                        }
                        AdapterAction::SetOpsReplenish(tokens) if apply_rate => {
                            throttle::set_ops_replenish(tokens);
                        }
                        AdapterAction::EnableOpsThrottle if apply_rate => {
                            throttle::enable_ops_throttle();
                        }
                        AdapterAction::DisableOpsThrottle if apply_rate => {
                            throttle::disable_ops_throttle();
                        }
                        // Rate actions are dropped when this adapter does not own
                        // the rate dimension (`apply_rate == false`).
                        AdapterAction::SetOpsReplenish(_)
                        | AdapterAction::EnableOpsThrottle
                        | AdapterAction::DisableOpsThrottle => {}
                    }
                }
                in_effect = incoming;
            }
            _ = report_timer.tick() => {
                let dropped_total = sink.dropped_samples();
                if dropped_total > reported_drops {
                    let newly_dropped = dropped_total - reported_drops;
                    tracing::warn!(
                        "auto-meta-throttle ({resource:?}): RoutingSink dropped {} samples in \
                         the last {:?} (total: {}); the control-loop channel is under-scaled \
                         or the task is stalling",
                        newly_dropped,
                        DROP_REPORT_INTERVAL,
                        dropped_total,
                    );
                    reported_drops = dropped_total;
                }
            }
        }
    }

    tracing::debug!(
        "auto-meta-throttle ({resource:?}): adapter/monitor exiting (decision channel closed)",
    );
}
/// Computes the throttle actions needed to move from decision `prev` to decision `new`.
///
/// The concurrency and rate dimensions are diffed independently; a dimension whose
/// value did not change contributes no actions, so identical decisions yield an
/// empty vector. At most three actions are returned.
///
/// # Concurrency (`max_in_flight`)
/// * `None` → `SetMaxInFlight(0)`; `0` is the sentinel this module uses for "no cap".
/// * `Some(n)` → `SetMaxInFlight(max(n, 1))`. A literal `Some(0)` is raised to `1`
///   because passing `0` through would alias the "no cap" sentinel and *remove* the
///   limit instead of tightening it.
///
/// # Rate (`rate_per_sec`)
/// * `None` → `DisableOpsThrottle`.
/// * `Some(rate)` → `SetOpsReplenish(tokens)` followed by `EnableOpsThrottle`, where
///   `tokens` is `rate * REPLENISH_INTERVAL_SECS` truncated toward zero and raised to
///   at least `1` for any positive rate. Zero, negative and NaN rates map to `0`.
pub(crate) fn apply_decision(
    prev: congestion::Decision,
    new: congestion::Decision,
) -> Vec<AdapterAction> {
    // `Vec::new()` does not allocate, which keeps the common "nothing changed" path free.
    let mut actions = Vec::new();

    if new.max_in_flight != prev.max_in_flight {
        let limit = match new.max_in_flight {
            // Keep a real cap away from the `0` sentinel (see doc comment above).
            Some(max) => (max as usize).max(1),
            None => 0,
        };
        actions.push(AdapterAction::SetMaxInFlight(limit));
    }

    if new.rate_per_sec != prev.rate_per_sec {
        match new.rate_per_sec {
            Some(rate) => {
                // `rate > 0.0` is false for NaN, so NaN falls into the `0` branch.
                // The float-to-integer `as` cast truncates and saturates, so very
                // large rates cannot wrap around.
                let replenish = if rate > 0.0 {
                    ((rate * REPLENISH_INTERVAL_SECS) as usize).max(1)
                } else {
                    0
                };
                // Order matters to callers/tests: set the budget first, then enable.
                actions.push(AdapterAction::SetOpsReplenish(replenish));
                actions.push(AdapterAction::EnableOpsThrottle);
            }
            None => actions.push(AdapterAction::DisableOpsThrottle),
        }
    }

    actions
}
// Tests for this module: plain unit tests and property tests for `apply_decision`,
// followed by end-to-end tests (`feedback`) that run `run_adapter` against a real
// `ControlUnit` and observe the throttle.
#[cfg(test)]
mod tests {
use super::*;
use congestion::Decision;
// Diffing a decision against itself must produce no actions.
#[test]
fn identical_decisions_emit_no_actions() {
let d = Decision::with_concurrency(8);
assert!(apply_decision(d, d).is_empty());
}
// Same as above for the UNLIMITED decision.
#[test]
fn unlimited_to_unlimited_is_noop() {
let d = Decision::UNLIMITED;
assert!(apply_decision(d, d).is_empty());
}
// Introducing a concurrency cap emits a single SetMaxInFlight with that cap.
#[test]
fn max_in_flight_none_to_some_applies_cap() {
let actions = apply_decision(Decision::UNLIMITED, Decision::with_concurrency(10));
assert_eq!(actions, vec![AdapterAction::SetMaxInFlight(10)]);
}
// Changing an existing cap emits the new cap only.
#[test]
fn max_in_flight_some_to_some_applies_new_cap() {
let actions = apply_decision(
Decision::with_concurrency(5),
Decision::with_concurrency(20),
);
assert_eq!(actions, vec![AdapterAction::SetMaxInFlight(20)]);
}
// Removing the cap is expressed as SetMaxInFlight(0), the "no cap" sentinel.
#[test]
fn max_in_flight_some_to_none_disables_via_zero_sentinel() {
let actions = apply_decision(Decision::with_concurrency(10), Decision::UNLIMITED);
assert_eq!(actions, vec![AdapterAction::SetMaxInFlight(0)]);
}
// Introducing a rate sets the replenish count first, then enables the throttle.
// 1_000 ops/s over the 0.1 s replenish window gives 100.
#[test]
fn rate_none_to_some_enables_after_setting_replenish() {
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(1_000.0));
assert_eq!(
actions,
vec![
AdapterAction::SetOpsReplenish(100),
AdapterAction::EnableOpsThrottle,
],
);
}
// A changed rate re-emits both the replenish value and the enable action.
#[test]
fn rate_some_to_some_updates_replenish_and_re_enables() {
let actions = apply_decision(Decision::with_rate(100.0), Decision::with_rate(500.0));
assert_eq!(
actions,
vec![
AdapterAction::SetOpsReplenish(50),
AdapterAction::EnableOpsThrottle,
],
);
}
// Removing the rate emits only DisableOpsThrottle.
#[test]
fn rate_some_to_none_disables_throttle() {
let actions = apply_decision(Decision::with_rate(1_000.0), Decision::UNLIMITED);
assert_eq!(actions, vec![AdapterAction::DisableOpsThrottle]);
}
// When both dimensions change, the concurrency action comes first, then the
// two rate actions.
#[test]
fn both_dimensions_diff_independently() {
let actions = apply_decision(
Decision::with_concurrency_and_rate(4, 100.0),
Decision::with_concurrency_and_rate(8, 500.0),
);
assert_eq!(
actions,
vec![
AdapterAction::SetMaxInFlight(8),
AdapterAction::SetOpsReplenish(50),
AdapterAction::EnableOpsThrottle,
],
);
}
// A dimension that did not change contributes no actions.
#[test]
fn only_changed_dimension_emits_action() {
// Only the cap changes (4 -> 8); the rate stays at 100.
let actions = apply_decision(
Decision::with_concurrency_and_rate(4, 100.0),
Decision::with_concurrency_and_rate(8, 100.0),
);
assert_eq!(actions, vec![AdapterAction::SetMaxInFlight(8)]);
// Only the rate changes (100 -> 500); the cap stays at 4.
let actions = apply_decision(
Decision::with_concurrency_and_rate(4, 100.0),
Decision::with_concurrency_and_rate(4, 500.0),
);
assert_eq!(
actions,
vec![
AdapterAction::SetOpsReplenish(50),
AdapterAction::EnableOpsThrottle,
],
);
}
// 10_000 ops/s * 0.1 s = 1_000 per replenish.
#[test]
fn rate_conversion_uses_100ms_interval() {
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(10_000.0));
assert_eq!(actions[0], AdapterAction::SetOpsReplenish(1_000));
}
// 125 * 0.1 = 12.5, which the integer cast truncates to 12.
#[test]
fn rate_conversion_rounds_down() {
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(125.0));
assert_eq!(actions[0], AdapterAction::SetOpsReplenish(12));
}
// Positive rates never convert to 0: 5.0 and 0.1 would truncate to 0 and are
// raised to 1; 10.0 converts to exactly 1.
#[test]
fn rate_conversion_floors_positive_rates_at_one_token() {
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(5.0));
assert_eq!(actions[0], AdapterAction::SetOpsReplenish(1));
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(0.1));
assert_eq!(actions[0], AdapterAction::SetOpsReplenish(1));
let actions = apply_decision(Decision::UNLIMITED, Decision::with_rate(10.0));
assert_eq!(actions[0], AdapterAction::SetOpsReplenish(1));
}
// Neither a negative rate nor NaN satisfies `rate > 0.0`, so both map to 0.
#[test]
fn rate_conversion_clamps_negative_and_nan_to_zero() {
let actions_neg = apply_decision(Decision::UNLIMITED, Decision::with_rate(-5.0));
assert_eq!(actions_neg[0], AdapterAction::SetOpsReplenish(0));
let actions_nan = apply_decision(Decision::UNLIMITED, Decision::with_rate(f64::NAN));
assert_eq!(actions_nan[0], AdapterAction::SetOpsReplenish(0));
}
// Property tests for `apply_decision` over generated decisions.
mod properties {
use super::*;
use proptest::prelude::*;
// Generates decisions with an optional cap in 1..10_000 and an optional rate
// in 0.0..100_000.0. A cap of 0 and NaN/negative rates are never generated.
fn any_decision() -> impl Strategy<Value = Decision> {
(
prop::option::of(1u32..10_000),
prop::option::of(0.0f64..100_000.0),
)
.prop_map(|(max, rate)| Decision {
max_in_flight: max,
rate_per_sec: rate,
})
}
proptest! {
// Any generated decision diffed against itself yields no actions.
#[test]
fn identity_emits_no_actions(d in any_decision()) {
prop_assert!(apply_decision(d, d).is_empty());
}
// A concurrency action is present exactly when the cap changed, and a rate
// action is present exactly when the rate changed.
#[test]
fn only_changed_dimensions_produce_actions(
prev in any_decision(),
new in any_decision(),
) {
let actions = apply_decision(prev, new);
let max_changed = prev.max_in_flight != new.max_in_flight;
let rate_changed = prev.rate_per_sec != new.rate_per_sec;
let has_max_action = actions
.iter()
.any(|a| matches!(a, AdapterAction::SetMaxInFlight(_)));
let has_rate_action = actions.iter().any(|a| {
matches!(
a,
AdapterAction::SetOpsReplenish(_)
| AdapterAction::EnableOpsThrottle
| AdapterAction::DisableOpsThrottle
)
});
prop_assert_eq!(has_max_action, max_changed);
prop_assert_eq!(has_rate_action, rate_changed);
}
// Moving to `Some(rate)` with the cap held constant: no replenish/enable
// actions if the rate is unchanged, otherwise exactly SetOpsReplenish
// followed by EnableOpsThrottle.
#[test]
fn rate_some_transitions_pair_replenish_and_enable(
prev in any_decision(),
rate in 0.0f64..100_000.0,
) {
let new = Decision {
max_in_flight: prev.max_in_flight,
rate_per_sec: Some(rate),
};
let actions = apply_decision(prev, new);
if prev.rate_per_sec == Some(rate) {
prop_assert!(!actions.iter().any(|a| matches!(a,
AdapterAction::SetOpsReplenish(_)
| AdapterAction::EnableOpsThrottle
)));
} else {
let rate_actions: Vec<_> = actions
.iter()
.filter(|a| matches!(
a,
AdapterAction::SetOpsReplenish(_)
| AdapterAction::EnableOpsThrottle
| AdapterAction::DisableOpsThrottle
))
.copied()
.collect();
prop_assert_eq!(rate_actions.len(), 2);
prop_assert!(matches!(
rate_actions[0],
AdapterAction::SetOpsReplenish(_)
));
prop_assert_eq!(rate_actions[1], AdapterAction::EnableOpsThrottle);
}
}
// Moving from `Some(rate)` to `None` with the cap held constant yields
// exactly one rate action: DisableOpsThrottle.
#[test]
fn rate_none_transitions_only_disable(
prev_rate in 0.0f64..100_000.0,
prev_max in prop::option::of(1u32..10_000),
) {
let prev = Decision {
max_in_flight: prev_max,
rate_per_sec: Some(prev_rate),
};
let new = Decision {
max_in_flight: prev_max,
rate_per_sec: None,
};
let actions = apply_decision(prev, new);
let rate_actions: Vec<_> = actions
.iter()
.filter(|a| matches!(
a,
AdapterAction::SetOpsReplenish(_)
| AdapterAction::EnableOpsThrottle
| AdapterAction::DisableOpsThrottle
))
.copied()
.collect();
prop_assert_eq!(rate_actions, vec![AdapterAction::DisableOpsThrottle]);
}
// One concurrency action plus at most two rate actions.
#[test]
fn action_count_bounded(
prev in any_decision(),
new in any_decision(),
) {
let actions = apply_decision(prev, new);
prop_assert!(actions.len() <= 3);
}
}
}
// Disabling the rate and then setting a new one emits the enable action again.
// 200 ops/s * 0.1 s = 20 per replenish.
#[test]
fn full_cycle_disable_then_re_enable_applies_enable() {
let disabled = apply_decision(Decision::with_rate(100.0), Decision::UNLIMITED);
assert_eq!(disabled, vec![AdapterAction::DisableOpsThrottle]);
let re_enabled = apply_decision(Decision::UNLIMITED, Decision::with_rate(200.0));
assert_eq!(
re_enabled,
vec![
AdapterAction::SetOpsReplenish(20),
AdapterAction::EnableOpsThrottle,
],
);
}
// End-to-end tests: a controller drives a ControlUnit, whose decisions are
// consumed by `run_adapter`, and the tests read the resulting limit back via
// `throttle::current_ops_in_flight_limit`.
mod feedback {
use super::*;
use congestion::{
ControlUnit, Controller, Decision, FixedController, RoutingSinkBuilder, Sample, Side,
};
// Every test in this module takes this lock first, so they run one at a time.
// NOTE(review): presumably needed because the throttle limits and the installed
// sample sink are process-wide state — confirm in the throttle/congestion modules.
static FEEDBACK_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
// Sets the in-flight limit of every (side, op) metadata resource to 0, disables
// the ops throttle and clears the installed sample sink. Called at the start and
// at the end of each test. Contains no `.await`; it is `async` only by signature.
async fn reset_globals() {
for &side in &throttle::Side::ALL {
for &op in &throttle::MetadataOp::ALL {
throttle::set_max_ops_in_flight(throttle::Resource::meta(side, op), 0);
}
}
throttle::disable_ops_throttle();
congestion::clear_sample_sink();
}
// Builds a RoutingSink with a metadata receiver for (side, op), installs it as
// the sample sink, creates a ControlUnit around `controller` with the given
// tick, and spawns both the unit and `run_adapter` (with `apply_rate = true`).
// Returns (unit task handle, adapter task handle).
async fn wire_adapter<C: congestion::Controller + 'static>(
side: Side,
op: congestion::MetadataOp,
resource: throttle::Resource,
controller: C,
tick: std::time::Duration,
) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
let mut builder = RoutingSinkBuilder::new();
let metadata_rx = builder.metadata_receiver(side, op);
let sink = std::sync::Arc::new(builder.build());
congestion::install_sample_sink(sink.clone());
let (unit, decision_rx, _snapshot_rx) =
ControlUnit::new("test", controller, metadata_rx, tick);
let unit_handle = unit.spawn();
let adapter_handle = tokio::spawn(run_adapter(resource, true, decision_rx, sink));
(unit_handle, adapter_handle)
}
// A fixed controller with concurrency 42 must result in the throttle limit for
// the resource reading 42 within 500 ms (polled every 10 ms).
#[tokio::test]
async fn fixed_controller_initial_decision_reaches_throttle() {
let _g = FEEDBACK_GUARD.lock().await;
reset_globals().await;
let side = Side::Source;
let op = congestion::MetadataOp::Stat;
let resource =
throttle::Resource::meta(throttle::Side::Source, throttle::MetadataOp::Stat);
let (unit, adapter) = wire_adapter(
side,
op,
resource,
FixedController::with_concurrency(42),
std::time::Duration::from_millis(10),
)
.await;
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
loop {
if throttle::current_ops_in_flight_limit(resource) == 42 {
break;
}
if std::time::Instant::now() >= deadline {
panic!(
"adapter did not apply initial cwnd=42 within 500ms; \
current limit = {}",
throttle::current_ops_in_flight_limit(resource),
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// Stop both background tasks before restoring the shared state.
unit.abort();
adapter.abort();
reset_globals().await;
}
// Test controller that replays a fixed list of decisions, one per tick.
struct ScriptedController {
script: Vec<Decision>,
idx: usize,
}
impl Controller for ScriptedController {
// Samples are ignored; the output depends only on the script position.
fn on_sample(&mut self, _s: &Sample) {}
// Returns the current script entry and advances, except at the last entry,
// which is then returned on every subsequent tick. Panics (index out of
// bounds) if `script` is empty.
fn on_tick(&mut self, _now: std::time::Instant) -> Decision {
let d = self.script[self.idx];
if self.idx + 1 < self.script.len() {
self.idx += 1;
}
d
}
fn name(&self) -> &'static str {
"scripted"
}
}
// The throttle limit must pass through 5, 25 and 3 in that order; each value
// gets a budget of 20 ticks and is polled every quarter tick.
#[tokio::test]
async fn scripted_decisions_propagate_in_order_to_throttle() {
let _g = FEEDBACK_GUARD.lock().await;
reset_globals().await;
let side = Side::Source;
let op = congestion::MetadataOp::Stat;
let resource =
throttle::Resource::meta(throttle::Side::Source, throttle::MetadataOp::Stat);
let tick = std::time::Duration::from_millis(20);
let controller = ScriptedController {
script: vec![
Decision::with_concurrency(5),
Decision::with_concurrency(25),
Decision::with_concurrency(3),
],
idx: 0,
};
let (unit, adapter) = wire_adapter(side, op, resource, controller, tick).await;
for target in [5_usize, 25, 3] {
let deadline = std::time::Instant::now() + tick * 20;
while throttle::current_ops_in_flight_limit(resource) != target {
if std::time::Instant::now() >= deadline {
panic!(
"cwnd did not reach scripted value {target} within {:?}; \
observed = {}",
tick * 20,
throttle::current_ops_in_flight_limit(resource),
);
}
tokio::time::sleep(tick / 4).await;
}
}
unit.abort();
adapter.abort();
reset_globals().await;
}
// A capped decision followed by UNLIMITED: the limit must first read 15 and
// then return to 0.
#[tokio::test]
async fn decision_with_none_clears_cap_at_throttle() {
let _g = FEEDBACK_GUARD.lock().await;
reset_globals().await;
let side = Side::Source;
let op = congestion::MetadataOp::Stat;
let resource =
throttle::Resource::meta(throttle::Side::Source, throttle::MetadataOp::Stat);
let tick = std::time::Duration::from_millis(20);
let controller = ScriptedController {
script: vec![Decision::with_concurrency(15), Decision::UNLIMITED],
idx: 0,
};
let (unit, adapter) = wire_adapter(side, op, resource, controller, tick).await;
// Phase 1: wait (up to 10 ticks) for the cap of 15 to be applied.
let deadline = std::time::Instant::now() + tick * 10;
while throttle::current_ops_in_flight_limit(resource) != 15 {
if std::time::Instant::now() >= deadline {
panic!("cwnd=15 never landed");
}
tokio::time::sleep(tick / 4).await;
}
// Phase 2: wait (up to 20 ticks) for the limit to go back to 0.
let deadline = std::time::Instant::now() + tick * 20;
while throttle::current_ops_in_flight_limit(resource) != 0 {
if std::time::Instant::now() >= deadline {
panic!(
"cwnd did not clear to 0 after None decision; observed {}",
throttle::current_ops_in_flight_limit(resource),
);
}
tokio::time::sleep(tick / 4).await;
}
unit.abort();
adapter.abort();
reset_globals().await;
}
// After the control unit task is aborted, the adapter task must finish on its
// own within one second. This test wires things by hand instead of using
// `wire_adapter`, but with the same steps.
#[tokio::test]
async fn adapter_exits_when_decision_channel_closes() {
let _g = FEEDBACK_GUARD.lock().await;
reset_globals().await;
let side = Side::Source;
let op = congestion::MetadataOp::Stat;
let resource =
throttle::Resource::meta(throttle::Side::Source, throttle::MetadataOp::Stat);
let mut builder = RoutingSinkBuilder::new();
let metadata_rx = builder.metadata_receiver(side, op);
let sink = std::sync::Arc::new(builder.build());
congestion::install_sample_sink(sink.clone());
let (unit, decision_rx, _snapshot_rx) = ControlUnit::new(
"test",
FixedController::with_concurrency(1),
metadata_rx,
std::time::Duration::from_millis(5),
);
let unit_handle = unit.spawn();
let adapter_handle = tokio::spawn(run_adapter(resource, true, decision_rx, sink));
// Let the unit and the adapter run for a few ticks first.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
congestion::clear_sample_sink();
// NOTE(review): assumes the aborted unit task holds the only decision Sender,
// so aborting it closes the channel — confirm in ControlUnit.
unit_handle.abort();
// Only the timeout is checked here; the adapter's JoinHandle result itself
// is not inspected.
let adapter_result =
tokio::time::timeout(std::time::Duration::from_secs(1), adapter_handle).await;
assert!(
adapter_result.is_ok(),
"adapter did not exit within 1s of decision channel close"
);
reset_globals().await;
}
}
}