// stygian_graph/adapters/graphql_throttle.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use serde_json::Value;
use tokio::sync::Mutex;

pub use crate::ports::graphql_plugin::CostThrottleConfig;
/// Point budget tracked for a cost-throttled GraphQL endpoint.
///
/// Stores the last known balance together with the instant it was recorded,
/// so the balance can be projected forward in time using the restore rate.
#[derive(Debug)]
pub struct LiveBudget {
    /// Points that were available as of `last_updated`.
    currently_available: f64,
    /// Ceiling that the projected balance is capped at.
    maximum_available: f64,
    /// Points restored per second.
    restore_rate: f64,
    /// Moment at which the values above were last set.
    last_updated: Instant,
}
47
48impl LiveBudget {
49 #[must_use]
51 pub fn new(config: &CostThrottleConfig) -> Self {
52 Self {
53 currently_available: config.max_points,
54 maximum_available: config.max_points,
55 restore_rate: config.restore_per_sec,
56 last_updated: Instant::now(),
57 }
58 }
59
60 pub fn update_from_response(&mut self, throttle_status: &Value) {
81 if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
82 self.maximum_available = max;
83 }
84 if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
85 self.currently_available = cur;
86 }
87 if let Some(rate) = throttle_status["restoreRate"].as_f64() {
88 self.restore_rate = rate;
89 }
90 self.last_updated = Instant::now();
91 }
92
93 fn projected_available(&self) -> f64 {
95 let elapsed = self.last_updated.elapsed().as_secs_f64();
96 let restored = elapsed * self.restore_rate;
97 (self.currently_available + restored).min(self.maximum_available)
98 }
99}
100
101#[derive(Clone, Debug)]
119pub struct PluginBudget {
120 inner: Arc<Mutex<LiveBudget>>,
121 config: CostThrottleConfig,
122}
123
124impl PluginBudget {
125 #[must_use]
127 pub fn new(config: CostThrottleConfig) -> Self {
128 let budget = LiveBudget::new(&config);
129 Self {
130 inner: Arc::new(Mutex::new(budget)),
131 config,
132 }
133 }
134}
135
136#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
157pub async fn pre_flight_delay(budget: &PluginBudget) {
158 let delay = {
159 let guard = budget.inner.lock().await;
160 let projected = guard.projected_available();
161 let rate = guard.restore_rate.max(1.0);
162 drop(guard);
163 let min = budget.config.min_available;
164 if projected < min {
165 let deficit = min - projected;
166 let secs = (deficit / rate) * 1.1;
167 let ms = (secs * 1_000.0) as u64;
168 Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
169 } else {
170 None
171 }
172 };
173
174 if let Some(d) = delay {
175 tracing::debug!(
176 delay_ms = d.as_millis(),
177 "graphql throttle: pre-flight delay"
178 );
179 tokio::time::sleep(d).await;
180 }
181}
182
183pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
208 let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
209 return;
210 };
211 if status.is_object() {
212 let mut guard = budget.inner.lock().await;
213 guard.update_from_response(status);
214 }
215}
216
217#[must_use]
244#[allow(
245 clippy::cast_possible_truncation,
246 clippy::cast_sign_loss,
247 clippy::cast_possible_wrap
248)]
249pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
250 let status = body.pointer("/extensions/cost/throttleStatus");
251 let max_avail = status
252 .and_then(|s| s.get("maximumAvailable"))
253 .and_then(Value::as_f64)
254 .unwrap_or(config.max_points);
255 let cur_avail = status
256 .and_then(|s| s.get("currentlyAvailable"))
257 .and_then(Value::as_f64)
258 .unwrap_or(0.0);
259 let restore_rate = status
260 .and_then(|s| s.get("restoreRate"))
261 .and_then(Value::as_f64)
262 .unwrap_or(config.restore_per_sec)
263 .max(1.0);
264 let deficit = (max_avail - cur_avail).max(0.0);
265 let base_secs = if deficit > 0.0 {
266 (deficit / restore_rate) * 1.1
267 } else {
268 0.5
269 };
270 let backoff = base_secs * 1.5_f64.powi(attempt as i32);
271 let ms = (backoff * 1_000.0) as u64;
272 ms.clamp(500, config.max_delay_ms)
273}
274
#[cfg(test)]
// The tests compare floats that were assigned verbatim, unwrap freely, and
// keep the budget lock alive across assertions, so these lints are silenced.
#[allow(
    clippy::float_cmp,
    clippy::unwrap_used,
    clippy::significant_drop_tightening
)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps throttle numbers in the `extensions.cost.throttleStatus` envelope.
    fn throttled_body(maximum: f64, current: f64, restore: f64) -> Value {
        json!({ "extensions": { "cost": { "throttleStatus": {
            "maximumAvailable": maximum,
            "currentlyAvailable": current,
            "restoreRate": restore,
        }}}})
    }

    #[test]
    fn live_budget_initialises_from_config() {
        let config = CostThrottleConfig {
            max_points: 5_000.0,
            restore_per_sec: 250.0,
            min_available: 50.0,
            max_delay_ms: 10_000,
        };
        let budget = LiveBudget::new(&config);
        assert_eq!(budget.currently_available, 5_000.0);
        assert_eq!(budget.maximum_available, 5_000.0);
        assert_eq!(budget.restore_rate, 250.0);
    }

    #[test]
    fn live_budget_updates_from_response() {
        let config = CostThrottleConfig::default();
        let mut budget = LiveBudget::new(&config);

        let status = json!({
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 3_000.0,
            "restoreRate": 500.0,
        });
        budget.update_from_response(&status);

        assert_eq!(budget.currently_available, 3_000.0);
        assert_eq!(budget.maximum_available, 10_000.0);
        assert_eq!(budget.restore_rate, 500.0);
    }

    #[test]
    fn projected_available_accounts_for_restore() {
        let config = CostThrottleConfig {
            max_points: 10_000.0,
            restore_per_sec: 1_000.0,
            ..Default::default()
        };
        let mut budget = LiveBudget::new(&config);
        budget.currently_available = 0.0;

        // The elapsed time is not deterministic, so only the bounds are checked.
        let projected = budget.projected_available();
        assert!(projected >= 0.0);
        assert!(projected <= 10_000.0);
    }

    #[test]
    fn projected_available_caps_at_maximum() {
        let config = CostThrottleConfig::default();
        let budget = LiveBudget::new(&config);
        assert!(budget.projected_available() <= budget.maximum_available);
    }

    #[tokio::test]
    async fn pre_flight_delay_does_not_sleep_when_budget_healthy() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let before = Instant::now();
        pre_flight_delay(&budget).await;
        assert!(before.elapsed().as_millis() < 100, "unexpected delay");
    }

    #[tokio::test]
    async fn update_budget_parses_throttle_status() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let response = json!({
            "data": {},
            "extensions": { "cost": { "throttleStatus": {
                "maximumAvailable": 10_000.0,
                "currentlyAvailable": 2_500.0,
                "restoreRate": 500.0,
            }}}
        });
        update_budget(&budget, &response).await;
        let guard = budget.inner.lock().await;
        assert_eq!(guard.currently_available, 2_500.0);
    }

    #[test]
    fn reactive_backoff_ms_clamps_to_500ms_floor() {
        let config = CostThrottleConfig::default();
        // A one-point deficit restores in a few milliseconds, well below the floor.
        let body = throttled_body(10_000.0, 9_999.0, 500.0);
        let ms = reactive_backoff_ms(&config, &body, 0);
        assert_eq!(ms, 500);
    }

    #[test]
    fn reactive_backoff_ms_increases_with_attempt() {
        let config = CostThrottleConfig::default();
        let body = throttled_body(10_000.0, 5_000.0, 500.0);
        let ms0 = reactive_backoff_ms(&config, &body, 0);
        let ms1 = reactive_backoff_ms(&config, &body, 1);
        let ms2 = reactive_backoff_ms(&config, &body, 2);
        assert!(ms1 > ms0);
        assert!(ms2 > ms1);
    }

    #[test]
    fn reactive_backoff_ms_caps_at_max_delay() {
        let config = CostThrottleConfig {
            max_delay_ms: 1_000,
            ..Default::default()
        };
        let body = throttled_body(10_000.0, 0.0, 1.0);
        let ms = reactive_backoff_ms(&config, &body, 10);
        assert_eq!(ms, 1_000);
    }
}