use std::sync::Arc;
use std::time::{Duration, Instant};
use serde_json::Value;
use tokio::sync::Mutex;
pub use crate::ports::graphql_plugin::CostThrottleConfig;
/// Locally tracked snapshot of the remote cost budget.
///
/// Stores the last known point balance together with its ceiling and refill
/// rate, the instant at which that balance was recorded, and the number of
/// points currently reserved by requests that have not been released yet.
#[derive(Debug)]
pub struct LiveBudget {
    /// Point balance as of `last_updated`.
    currently_available: f64,
    /// Ceiling the projected balance is capped at.
    maximum_available: f64,
    /// Points restored per second of elapsed time.
    restore_rate: f64,
    /// Instant at which the snapshot was last refreshed.
    last_updated: Instant,
    /// Points reserved by in-flight requests; subtracted from projections.
    pending: f64,
}
impl LiveBudget {
#[must_use]
pub fn new(config: &CostThrottleConfig) -> Self {
Self {
currently_available: config.max_points,
maximum_available: config.max_points,
restore_rate: config.restore_per_sec,
last_updated: Instant::now(),
pending: 0.0,
}
}
pub fn update_from_response(&mut self, throttle_status: &Value) {
if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
self.maximum_available = max;
}
if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
self.currently_available = cur;
}
if let Some(rate) = throttle_status["restoreRate"].as_f64() {
self.restore_rate = rate;
}
self.last_updated = Instant::now();
}
fn projected_available(&self) -> f64 {
let elapsed = self.last_updated.elapsed().as_secs_f64();
let restored = elapsed * self.restore_rate;
let gross = (self.currently_available + restored).min(self.maximum_available);
(gross - self.pending).max(0.0)
}
fn reserve(&mut self, cost: f64) {
self.pending += cost;
}
fn release(&mut self, cost: f64) {
self.pending = (self.pending - cost).max(0.0);
}
}
/// Shareable handle to a plugin's live cost budget.
///
/// Cloning the handle clones the `Arc`, so every clone observes and mutates
/// the same [`LiveBudget`] behind the async mutex.
#[derive(Debug, Clone)]
pub struct PluginBudget {
    /// Shared, mutex-protected budget state.
    inner: Arc<Mutex<LiveBudget>>,
    /// Throttle configuration the budget was created from.
    config: CostThrottleConfig,
}
impl PluginBudget {
    /// Creates a handle whose live budget is initialised from `config`.
    ///
    /// The configuration is kept alongside the shared state so later
    /// throttle decisions can read it without taking the lock.
    #[must_use]
    pub fn new(config: CostThrottleConfig) -> Self {
        let inner = Arc::new(Mutex::new(LiveBudget::new(&config)));
        Self { inner, config }
    }

    /// Returns the throttle configuration this budget was built with.
    #[must_use]
    pub const fn config(&self) -> &CostThrottleConfig {
        &self.config
    }
}
/// Guard representing one outstanding reservation against a [`PluginBudget`].
///
/// The reservation is handed back either explicitly or when the guard is
/// dropped while it still holds its budget handle.
pub struct BudgetGuard {
    /// Budget the reservation was taken from; `None` once it has been released.
    budget: Option<PluginBudget>,
    /// Number of points reserved by this guard.
    cost: f64,
}
impl BudgetGuard {
    /// Reserves budget for one request and wraps the reservation in a guard.
    ///
    /// Delegates to [`pre_flight_reserve`], which may sleep before returning
    /// when the projected budget is too low, and remembers the reserved cost
    /// so it can be handed back later.
    pub async fn acquire(budget: &PluginBudget) -> Self {
        let cost = pre_flight_reserve(budget).await;
        Self {
            budget: Some(budget.clone()),
            cost,
        }
    }

    /// Returns the number of points this guard has reserved.
    #[must_use]
    pub const fn cost(&self) -> f64 {
        self.cost
    }

    /// Explicitly hands the reservation back to the budget.
    ///
    /// The guard stays armed while the release is in progress: if this future
    /// is dropped before the budget lock has been obtained, the guard's `Drop`
    /// implementation still sees the budget handle and releases the
    /// reservation, so it cannot leak.
    pub async fn release(mut self) {
        if let Some(budget) = self.budget.as_ref() {
            release_reservation(budget, self.cost).await;
        }
        // Disarm only after the release has completed so that `Drop` does not
        // release the same reservation a second time.
        self.budget = None;
    }
}
/// Safety net: hands the reservation back if the guard is dropped while it
/// still holds its budget handle.
impl Drop for BudgetGuard {
    fn drop(&mut self) {
        let Some(budget) = self.budget.take() else {
            return;
        };
        let cost = self.cost;

        // Fast path: when the lock is uncontended the reservation can be
        // released right here, without a spawned task and without requiring
        // a Tokio runtime on the current thread.
        if let Ok(mut live) = budget.inner.try_lock() {
            live.release(cost);
            tracing::debug!(
                cost,
                "BudgetGuard: reservation released via Drop safety-net"
            );
            return;
        }

        // Slow path: the lock is held elsewhere, so the release has to wait
        // asynchronously. `tokio::spawn` panics outside a runtime context
        // (and a panic inside `drop` during unwinding aborts the process),
        // so look the runtime up fallibly instead.
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                runtime.spawn(async move {
                    release_reservation(&budget, cost).await;
                    tracing::debug!(
                        cost,
                        "BudgetGuard: reservation released via Drop safety-net"
                    );
                });
            }
            Err(_) => {
                tracing::warn!(
                    cost,
                    "BudgetGuard: dropped outside a Tokio runtime while the budget was locked; reservation not released"
                );
            }
        }
    }
}
/// Reserves the configured per-request cost and, when the projected budget is
/// too low, sleeps before returning.
///
/// Under the budget lock the function projects the currently available
/// points, computes a delay if that projection is below
/// `min_available + estimated_cost_per_request`, and records the reservation
/// so concurrent callers see it immediately. The delay is the time needed to
/// restore the deficit, padded by 10% and capped at `max_delay_ms`. The sleep
/// happens after the lock has been dropped.
///
/// Returns the reserved cost, which the caller must hand back through
/// [`release_reservation`].
///
/// # Cancellation
///
/// If the returned future is dropped while it is sleeping, the caller never
/// receives the cost and so cannot release it. In that case the reservation
/// is handed back automatically.
// The float-to-integer cast below saturates for out-of-range values and maps
// NaN to 0; the result is then capped at `max_delay_ms`.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub async fn pre_flight_reserve(budget: &PluginBudget) -> f64 {
    /// Hands the reservation back if it is dropped while still armed.
    struct ReleaseOnCancel<'a> {
        budget: &'a PluginBudget,
        cost: f64,
        armed: bool,
    }

    impl Drop for ReleaseOnCancel<'_> {
        fn drop(&mut self) {
            if !self.armed {
                return;
            }
            let cost = self.cost;
            // Release synchronously when the lock is free.
            if let Ok(mut live) = self.budget.inner.try_lock() {
                live.release(cost);
                return;
            }
            // Otherwise defer to a task, provided a runtime is still around.
            let budget = self.budget.clone();
            if let Ok(runtime) = tokio::runtime::Handle::try_current() {
                runtime.spawn(async move {
                    budget.inner.lock().await.release(cost);
                });
            }
        }
    }

    let estimated_cost = budget.config.estimated_cost_per_request;
    let delay = {
        let mut live = budget.inner.lock().await;
        let projected = live.projected_available();
        // Floor the rate at 1.0 so the division below is always well-defined.
        let rate = live.restore_rate.max(1.0);
        let min = budget.config.min_available;
        let delay = if projected < min + estimated_cost {
            let deficit = (min + estimated_cost) - projected;
            let secs = (deficit / rate) * 1.1;
            let ms = (secs * 1_000.0) as u64;
            Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
        } else {
            None
        };
        // Reserve before sleeping so concurrent callers account for this request.
        live.reserve(estimated_cost);
        delay
    };
    if let Some(wait) = delay {
        tracing::debug!(
            delay_ms = wait.as_millis(),
            "graphql throttle: pre-flight delay"
        );
        let mut cancel_guard = ReleaseOnCancel {
            budget,
            cost: estimated_cost,
            armed: true,
        };
        tokio::time::sleep(wait).await;
        // The sleep completed: ownership of the reservation passes to the caller.
        cancel_guard.armed = false;
    }
    estimated_cost
}
/// Hands `cost` previously reserved points back to `budget`.
///
/// Waits for the budget lock, subtracts `cost` from the pending total, and
/// drops the lock again before returning.
pub async fn release_reservation(budget: &PluginBudget, cost: f64) {
    budget.inner.lock().await.release(cost);
}
/// Refreshes `budget` from the throttle status embedded in a response body.
///
/// Looks up `/extensions/cost/throttleStatus` in `response_body`. When the
/// value is missing or is not a JSON object the budget is left untouched and
/// the lock is never taken.
pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
    let throttle_status = response_body
        .pointer("/extensions/cost/throttleStatus")
        .filter(|candidate| candidate.is_object());
    if let Some(throttle_status) = throttle_status {
        budget
            .inner
            .lock()
            .await
            .update_from_response(throttle_status);
    }
}
/// Computes how long to back off, in milliseconds, after a throttled response.
///
/// The throttle status is read from `/extensions/cost/throttleStatus` in
/// `body`. Missing values fall back to `config.max_points` for the ceiling,
/// `0.0` for the current balance, and `config.restore_per_sec` for the
/// restore rate, which is floored at 1.0.
///
/// The base delay is the time needed to restore the gap between the ceiling
/// and the current balance, padded by 10%, or half a second when there is no
/// gap. It grows by a factor of 1.5 per `attempt`.
///
/// The result is at least 500 ms and at most `config.max_delay_ms`. When the
/// configured maximum is below 500 ms the maximum wins, so the function never
/// panics.
// The float-to-integer cast saturates for out-of-range values and maps NaN
// to 0; the result is clamped immediately afterwards.
#[must_use]
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
    /// Lower bound for any backoff, unless the configured maximum is smaller.
    const FLOOR_MS: u64 = 500;

    let status = body.pointer("/extensions/cost/throttleStatus");
    let field = |key: &str| status.and_then(|s| s.get(key)).and_then(Value::as_f64);

    let max_avail = field("maximumAvailable").unwrap_or(config.max_points);
    let cur_avail = field("currentlyAvailable").unwrap_or(0.0);
    let restore_rate = field("restoreRate")
        .unwrap_or(config.restore_per_sec)
        .max(1.0);

    let deficit = (max_avail - cur_avail).max(0.0);
    let base_secs = if deficit > 0.0 {
        (deficit / restore_rate) * 1.1
    } else {
        0.5
    };

    // Saturate rather than wrap: a wrapped (negative) exponent would shrink
    // the backoff for very large attempt counts.
    let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
    let backoff = base_secs * 1.5_f64.powi(exponent);
    let ms = (backoff * 1_000.0) as u64;

    // `clamp` panics when its lower bound exceeds its upper bound, so cap the
    // floor at the configured maximum first.
    let floor = FLOOR_MS.min(config.max_delay_ms);
    ms.clamp(floor, config.max_delay_ms)
}
// Unit tests for the budget bookkeeping, the pre-flight reservation flow, the
// reactive backoff calculation, and the BudgetGuard release paths.
#[cfg(test)]
#[allow(
clippy::float_cmp,
clippy::unwrap_used,
clippy::significant_drop_tightening
)]
mod tests {
use super::*;
use serde_json::json;
// A new LiveBudget starts full and copies its ceiling and restore rate from the config.
#[test]
fn live_budget_initialises_from_config() {
let config = CostThrottleConfig {
max_points: 5_000.0,
restore_per_sec: 250.0,
min_available: 50.0,
max_delay_ms: 10_000,
estimated_cost_per_request: 100.0,
};
let budget = LiveBudget::new(&config);
assert_eq!(budget.currently_available, 5_000.0);
assert_eq!(budget.maximum_available, 5_000.0);
assert_eq!(budget.restore_rate, 250.0);
}
// Values reported in a throttle status object overwrite the stored snapshot.
#[test]
fn live_budget_updates_from_response() {
let config = CostThrottleConfig::default();
let mut budget = LiveBudget::new(&config);
let status = json!({
"maximumAvailable": 10_000.0,
"currentlyAvailable": 3_000.0,
"restoreRate": 500.0,
});
budget.update_from_response(&status);
assert_eq!(budget.currently_available, 3_000.0);
assert_eq!(budget.maximum_available, 10_000.0);
}
// Starting from an empty balance, the projection must stay within [0, maximum].
// The elapsed time is not controlled here, so only the bounds are checked.
#[test]
fn projected_available_accounts_for_restore() {
let config = CostThrottleConfig {
max_points: 10_000.0,
restore_per_sec: 1_000.0, ..Default::default()
};
let mut budget = LiveBudget::new(&config);
budget.currently_available = 0.0;
let p = budget.projected_available();
assert!(p >= 0.0);
assert!(p <= 10_000.0);
}
// A full budget never projects above its ceiling, however much time has elapsed.
#[test]
fn projected_available_caps_at_maximum() {
let config = CostThrottleConfig::default();
let budget = LiveBudget::new(&config);
assert!(budget.projected_available() <= budget.maximum_available);
}
// With a fresh default budget the reservation should return without a throttle
// sleep and report the configured per-request cost.
#[tokio::test]
async fn pre_flight_reserve_does_not_sleep_when_budget_healthy() {
let budget = PluginBudget::new(CostThrottleConfig::default());
let before = Instant::now();
let reserved = pre_flight_reserve(&budget).await;
assert!(before.elapsed().as_millis() < 100, "unexpected delay");
assert_eq!(
reserved,
CostThrottleConfig::default().estimated_cost_per_request
);
release_reservation(&budget, reserved).await;
}
// update_budget extracts the throttle status at /extensions/cost/throttleStatus
// and applies it to the shared budget.
#[tokio::test]
async fn update_budget_parses_throttle_status() {
let budget = PluginBudget::new(CostThrottleConfig::default());
let response = json!({
"data": {},
"extensions": { "cost": { "throttleStatus": {
"maximumAvailable": 10_000.0,
"currentlyAvailable": 2_500.0,
"restoreRate": 500.0,
}}}
});
update_budget(&budget, &response).await;
let guard = budget.inner.lock().await;
assert_eq!(guard.currently_available, 2_500.0);
}
// Two outstanding 200-point reservations against a 1000-point budget leave
// 400 points pending and roughly 600 projected; releasing both clears pending.
#[tokio::test]
async fn concurrent_reservations_reduce_projected_available() {
let config = CostThrottleConfig {
max_points: 1_000.0,
estimated_cost_per_request: 200.0,
..Default::default()
};
let budget = PluginBudget::new(config);
let r1 = pre_flight_reserve(&budget).await;
let r2 = pre_flight_reserve(&budget).await;
// Scope the lock so it is dropped before release_reservation locks again.
{
let guard = budget.inner.lock().await;
assert!((guard.pending - 400.0).abs() < f64::EPSILON);
let projected = guard.projected_available();
assert!((599.0..=601.0).contains(&projected));
}
release_reservation(&budget, r1).await;
release_reservation(&budget, r2).await;
let guard = budget.inner.lock().await;
assert!(guard.pending < f64::EPSILON);
}
// A 1-point deficit at 500 points/sec gives a raw backoff of about 2 ms, which
// must be raised to the 500 ms floor (assumes the default max_delay_ms is at
// least 500).
#[test]
fn reactive_backoff_ms_clamps_to_500ms_floor() {
let config = CostThrottleConfig::default();
let body = json!({ "extensions": { "cost": { "throttleStatus": {
"maximumAvailable": 10_000.0,
"currentlyAvailable": 9_999.0,
"restoreRate": 500.0,
}}}});
let ms = reactive_backoff_ms(&config, &body, 0);
assert_eq!(ms, 500); }
// Each additional attempt multiplies the backoff by 1.5, so it must grow
// strictly (assumes the default max_delay_ms does not cap these values).
#[test]
fn reactive_backoff_ms_increases_with_attempt() {
let config = CostThrottleConfig::default();
let body = json!({ "extensions": { "cost": { "throttleStatus": {
"maximumAvailable": 10_000.0,
"currentlyAvailable": 5_000.0,
"restoreRate": 500.0,
}}}});
let ms0 = reactive_backoff_ms(&config, &body, 0);
let ms1 = reactive_backoff_ms(&config, &body, 1);
let ms2 = reactive_backoff_ms(&config, &body, 2);
assert!(ms1 > ms0);
assert!(ms2 > ms1);
}
// A huge deficit at a slow restore rate and a high attempt count is capped at
// the configured max_delay_ms.
#[test]
fn reactive_backoff_ms_caps_at_max_delay() {
let config = CostThrottleConfig {
max_delay_ms: 1_000,
..Default::default()
};
let body = json!({ "extensions": { "cost": { "throttleStatus": {
"maximumAvailable": 10_000.0,
"currentlyAvailable": 0.0,
"restoreRate": 1.0, }}}});
let ms = reactive_backoff_ms(&config, &body, 10);
assert_eq!(ms, 1_000);
}
// Acquiring a guard records a pending reservation; explicit release clears it.
#[tokio::test]
async fn budget_guard_releases_on_explicit_release() {
let budget = PluginBudget::new(CostThrottleConfig::default());
let guard = BudgetGuard::acquire(&budget).await;
let cost = guard.cost();
assert!(cost > 0.0);
// Scope the lock so it is dropped before release() needs it.
{
let inner = budget.inner.lock().await;
assert!(inner.pending >= cost);
}
guard.release().await;
let inner = budget.inner.lock().await;
assert!(inner.pending < f64::EPSILON);
}
// Dropping a guard without calling release() must still hand the reservation back.
#[tokio::test]
async fn budget_guard_releases_on_drop() {
let budget = PluginBudget::new(CostThrottleConfig::default());
{
let _guard = BudgetGuard::acquire(&budget).await;
}
// Give any release task spawned by Drop a chance to run before inspecting the budget.
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(10)).await;
let inner = budget.inner.lock().await;
assert!(inner.pending < f64::EPSILON, "Drop should have released");
}
}