Skip to main content

stygian_graph/adapters/
graphql_throttle.rs

1//! Proactive GraphQL cost-throttle management.
2//!
3//! `LiveBudget` tracks the rolling point budget advertised by APIs that
4//! implement the Shopify / Jobber-style cost-throttle extension envelope:
5//!
6//! ```json
7//! { "extensions": { "cost": {
8//!     "requestedQueryCost": 12,
9//!     "actualQueryCost": 12,
10//!     "throttleStatus": {
11//!         "maximumAvailable": 10000.0,
12//!         "currentlyAvailable": 9988.0,
13//!         "restoreRate": 500.0
14//!     }
15//! }}}
16//! ```
17//!
18//! Before each request a *proactive* pre-flight delay is computed: if the
19//! projected available budget (accounting for elapsed restore time) will be
20//! too low, the caller sleeps until it recovers.  This eliminates wasted
21//! requests that would otherwise return `THROTTLED`.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use serde_json::Value;
27use tokio::sync::Mutex;
28
29/// Re-export from the ports layer — the canonical definition lives there.
30pub use crate::ports::graphql_plugin::CostThrottleConfig;
31
32// ─────────────────────────────────────────────────────────────────────────────
33// LiveBudget
34// ─────────────────────────────────────────────────────────────────────────────
35
36/// Mutable runtime state tracking the current point budget.
37///
38/// One `LiveBudget` should be shared across all requests to the same plugin
39/// endpoint, wrapped in `Arc<Mutex<LiveBudget>>` to serialise updates.
40#[derive(Debug)]
41pub struct LiveBudget {
42    currently_available: f64,
43    maximum_available: f64,
44    restore_rate: f64, // points/second
45    last_updated: Instant,
46}
47
48impl LiveBudget {
49    /// Create a new budget initialised from `config` defaults.
50    #[must_use]
51    pub fn new(config: &CostThrottleConfig) -> Self {
52        Self {
53            currently_available: config.max_points,
54            maximum_available: config.max_points,
55            restore_rate: config.restore_per_sec,
56            last_updated: Instant::now(),
57        }
58    }
59
60    /// Update the budget from a throttle-status object.
61    ///
62    /// The JSON path is `extensions.cost.throttleStatus` in the GraphQL response body.
63    ///
64    /// # Example
65    ///
66    /// ```rust
67    /// use serde_json::json;
68    /// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, LiveBudget};
69    ///
70    /// let config = CostThrottleConfig::default();
71    /// let mut budget = LiveBudget::new(&config);
72    ///
73    /// let status = json!({
74    ///     "maximumAvailable": 10000.0,
75    ///     "currentlyAvailable": 4200.0,
76    ///     "restoreRate": 500.0,
77    /// });
78    /// budget.update_from_response(&status);
79    /// ```
80    pub fn update_from_response(&mut self, throttle_status: &Value) {
81        if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
82            self.maximum_available = max;
83        }
84        if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
85            self.currently_available = cur;
86        }
87        if let Some(rate) = throttle_status["restoreRate"].as_f64() {
88            self.restore_rate = rate;
89        }
90        self.last_updated = Instant::now();
91    }
92
93    /// Compute the projected available budget accounting for elapsed restore time.
94    fn projected_available(&self) -> f64 {
95        let elapsed = self.last_updated.elapsed().as_secs_f64();
96        let restored = elapsed * self.restore_rate;
97        (self.currently_available + restored).min(self.maximum_available)
98    }
99}
100
101// ─────────────────────────────────────────────────────────────────────────────
102// Per-plugin budget store
103// ─────────────────────────────────────────────────────────────────────────────
104
105/// A shareable, cheaply-cloneable handle to a per-plugin `LiveBudget`.
106///
107/// Create one per registered plugin and pass it to [`pre_flight_delay`] before
108/// each request.
109///
110/// # Example
111///
112/// ```rust
113/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget};
114///
115/// let budget = PluginBudget::new(CostThrottleConfig::default());
116/// let budget2 = budget.clone(); // cheap Arc clone
117/// ```
118#[derive(Clone, Debug)]
119pub struct PluginBudget {
120    inner: Arc<Mutex<LiveBudget>>,
121    config: CostThrottleConfig,
122}
123
124impl PluginBudget {
125    /// Create a new `PluginBudget` initialised from `config`.
126    #[must_use]
127    pub fn new(config: CostThrottleConfig) -> Self {
128        let budget = LiveBudget::new(&config);
129        Self {
130            inner: Arc::new(Mutex::new(budget)),
131            config,
132        }
133    }
134}
135
136// ─────────────────────────────────────────────────────────────────────────────
137// Public API
138// ─────────────────────────────────────────────────────────────────────────────
139
140/// Compute a pre-flight delay and sleep if the budget is projected to be too low.
141///
142/// Must be called **before** sending a request.  The `Mutex` guard is released
143/// before the `.await` to satisfy `Send` bounds.
144///
145/// # Example
146///
147/// ```rust
148/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, pre_flight_delay};
149///
150/// # async fn example() {
151/// let budget = PluginBudget::new(CostThrottleConfig::default());
152/// pre_flight_delay(&budget).await;
153/// // safe to send the request now
154/// # }
155/// ```
156#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
157pub async fn pre_flight_delay(budget: &PluginBudget) {
158    let delay = {
159        let guard = budget.inner.lock().await;
160        let projected = guard.projected_available();
161        let rate = guard.restore_rate.max(1.0);
162        drop(guard);
163        let min = budget.config.min_available;
164        if projected < min {
165            let deficit = min - projected;
166            let secs = (deficit / rate) * 1.1;
167            let ms = (secs * 1_000.0) as u64;
168            Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
169        } else {
170            None
171        }
172    };
173
174    if let Some(d) = delay {
175        tracing::debug!(
176            delay_ms = d.as_millis(),
177            "graphql throttle: pre-flight delay"
178        );
179        tokio::time::sleep(d).await;
180    }
181}
182
183/// Update the `PluginBudget` from a completed response body.
184///
185/// Extracts `extensions.cost.throttleStatus` if present and forwards to
186/// [`LiveBudget::update_from_response`].
187///
188/// # Example
189///
190/// ```rust
191/// use serde_json::json;
192/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, update_budget};
193///
194/// # async fn example() {
195/// let budget = PluginBudget::new(CostThrottleConfig::default());
196/// let response = json!({
197///     "data": {},
198///     "extensions": { "cost": { "throttleStatus": {
199///         "maximumAvailable": 10000.0,
200///         "currentlyAvailable": 8000.0,
201///         "restoreRate": 500.0,
202///     }}}
203/// });
204/// update_budget(&budget, &response).await;
205/// # }
206/// ```
207pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
208    let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
209        return;
210    };
211    if status.is_object() {
212        let mut guard = budget.inner.lock().await;
213        guard.update_from_response(status);
214    }
215}
216
217/// Compute the reactive back-off delay from a throttle response body.
218///
219/// Use this when `extensions.cost.throttleStatus` signals `THROTTLED` rather
220/// than projecting from the `LiveBudget`.
221///
222/// ```text
223/// deficit = max_available − currently_available
224/// base_ms = deficit / restore_rate * 1100
225/// ms      = (base_ms * 1.5^attempt).clamp(500, max_delay_ms)
226/// ```
227///
228/// # Example
229///
230/// ```rust
231/// use serde_json::json;
232/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, reactive_backoff_ms};
233///
234/// let config = CostThrottleConfig::default();
235/// let body = json!({ "extensions": { "cost": { "throttleStatus": {
236///     "maximumAvailable": 10000.0,
237///     "currentlyAvailable": 0.0,
238///     "restoreRate": 500.0,
239/// }}}});
240/// let ms = reactive_backoff_ms(&config, &body, 0);
241/// assert!(ms >= 500);
242/// ```
243#[must_use]
244#[allow(
245    clippy::cast_possible_truncation,
246    clippy::cast_sign_loss,
247    clippy::cast_possible_wrap
248)]
249pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
250    let status = body.pointer("/extensions/cost/throttleStatus");
251    let max_avail = status
252        .and_then(|s| s.get("maximumAvailable"))
253        .and_then(Value::as_f64)
254        .unwrap_or(config.max_points);
255    let cur_avail = status
256        .and_then(|s| s.get("currentlyAvailable"))
257        .and_then(Value::as_f64)
258        .unwrap_or(0.0);
259    let restore_rate = status
260        .and_then(|s| s.get("restoreRate"))
261        .and_then(Value::as_f64)
262        .unwrap_or(config.restore_per_sec)
263        .max(1.0);
264    let deficit = (max_avail - cur_avail).max(0.0);
265    let base_secs = if deficit > 0.0 {
266        (deficit / restore_rate) * 1.1
267    } else {
268        0.5
269    };
270    let backoff = base_secs * 1.5_f64.powi(attempt as i32);
271    let ms = (backoff * 1_000.0) as u64;
272    ms.clamp(500, config.max_delay_ms)
273}
274
275// ─────────────────────────────────────────────────────────────────────────────
276// Tests
277// ─────────────────────────────────────────────────────────────────────────────
278
#[cfg(test)]
#[allow(
    clippy::float_cmp,
    clippy::unwrap_used,
    clippy::significant_drop_tightening
)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a response body carrying the given throttle-status numbers.
    fn throttled_body(maximum: f64, current: f64, rate: f64) -> Value {
        json!({
            "data": {},
            "extensions": { "cost": { "throttleStatus": {
                "maximumAvailable": maximum,
                "currentlyAvailable": current,
                "restoreRate": rate,
            }}}
        })
    }

    // ── LiveBudget ──────────────────────────────────────────────────────────

    #[test]
    fn live_budget_initialises_from_config() {
        let live = LiveBudget::new(&CostThrottleConfig {
            max_points: 5_000.0,
            restore_per_sec: 250.0,
            min_available: 50.0,
            max_delay_ms: 10_000,
        });

        assert_eq!(live.currently_available, 5_000.0);
        assert_eq!(live.maximum_available, 5_000.0);
        assert_eq!(live.restore_rate, 250.0);
    }

    #[test]
    fn live_budget_updates_from_response() {
        let mut live = LiveBudget::new(&CostThrottleConfig::default());

        live.update_from_response(&json!({
            "maximumAvailable": 10_000.0,
            "currentlyAvailable": 3_000.0,
            "restoreRate": 500.0,
        }));

        assert_eq!(live.currently_available, 3_000.0);
        assert_eq!(live.maximum_available, 10_000.0);
    }

    #[test]
    fn projected_available_accounts_for_restore() {
        let mut live = LiveBudget::new(&CostThrottleConfig {
            max_points: 10_000.0,
            restore_per_sec: 1_000.0, // fast restore for test
            ..Default::default()
        });
        // Drain the budget; the projection right afterwards is roughly zero
        // (sub-millisecond of restore), so only the bounds are checked.
        live.currently_available = 0.0;

        let projected = live.projected_available();
        assert!(projected >= 0.0);
        assert!(projected <= 10_000.0);
    }

    #[test]
    fn projected_available_caps_at_maximum() {
        // A freshly created budget already sits at its maximum.
        let live = LiveBudget::new(&CostThrottleConfig::default());
        assert!(live.projected_available() <= live.maximum_available);
    }

    // ── PluginBudget helpers ────────────────────────────────────────────────

    #[tokio::test]
    async fn pre_flight_delay_does_not_sleep_when_budget_healthy() {
        // The budget starts full, so no delay is expected.
        let handle = PluginBudget::new(CostThrottleConfig::default());
        let started = Instant::now();
        pre_flight_delay(&handle).await;
        assert!(started.elapsed().as_millis() < 100, "unexpected delay");
    }

    #[tokio::test]
    async fn update_budget_parses_throttle_status() {
        let handle = PluginBudget::new(CostThrottleConfig::default());
        update_budget(&handle, &throttled_body(10_000.0, 2_500.0, 500.0)).await;

        let observed = handle.inner.lock().await.currently_available;
        assert_eq!(observed, 2_500.0);
    }

    // ── reactive_backoff_ms ─────────────────────────────────────────────────

    #[test]
    fn reactive_backoff_ms_clamps_to_500ms_floor() {
        let cfg = CostThrottleConfig::default();
        let body = throttled_body(10_000.0, 9_999.0, 500.0);
        // A very small deficit is raised to the floor.
        assert_eq!(reactive_backoff_ms(&cfg, &body, 0), 500);
    }

    #[test]
    fn reactive_backoff_ms_increases_with_attempt() {
        let cfg = CostThrottleConfig::default();
        let body = throttled_body(10_000.0, 5_000.0, 500.0);

        let first = reactive_backoff_ms(&cfg, &body, 0);
        let second = reactive_backoff_ms(&cfg, &body, 1);
        let third = reactive_backoff_ms(&cfg, &body, 2);

        assert!(second > first);
        assert!(third > second);
    }

    #[test]
    fn reactive_backoff_ms_caps_at_max_delay() {
        let cfg = CostThrottleConfig {
            max_delay_ms: 1_000,
            ..Default::default()
        };
        // Very slow restore → huge deficit-driven delay that must be capped.
        let body = throttled_body(10_000.0, 0.0, 1.0);
        assert_eq!(reactive_backoff_ms(&cfg, &body, 10), 1_000);
    }
}