Skip to main content

stygian_graph/adapters/
graphql_throttle.rs

1//! Proactive GraphQL cost-throttle management.
2//!
3//! `LiveBudget` tracks the rolling point budget advertised by APIs that
4//! implement the Shopify / Jobber-style cost-throttle extension envelope:
5//!
6//! ```json
7//! { "extensions": { "cost": {
8//!     "requestedQueryCost": 12,
9//!     "actualQueryCost": 12,
10//!     "throttleStatus": {
11//!         "maximumAvailable": 10000.0,
12//!         "currentlyAvailable": 9988.0,
13//!         "restoreRate": 500.0
14//!     }
15//! }}}
16//! ```
17//!
18//! Before each request a *proactive* pre-flight delay is computed: if the
19//! projected available budget (accounting for elapsed restore time and
20//! in-flight reservations) will be too low, the caller sleeps until it
21//! recovers.  After the delay, `pre_flight_reserve` atomically reserves an
22//! estimated cost against the budget so concurrent callers immediately see a
23//! reduced balance.
24//!
25//! ## `BudgetGuard` (RAII)
26//!
27//! [`BudgetGuard`] wraps `pre_flight_reserve` + `release_reservation` into a
28//! scope-based guard so callers no longer need to track every exit path
29//! manually.  Call [`BudgetGuard::acquire`] before the request and
30//! [`BudgetGuard::release`] on the success path.  If `release()` is never
31//! called (e.g. early return or `?` propagation), the `Drop` impl spawns a
32//! background task to release the reservation as a safety net.
33//!
34//! ```no_run
35//! use stygian_graph::adapters::graphql_throttle::{
36//!     CostThrottleConfig, PluginBudget, BudgetGuard,
37//! };
38//!
39//! # async fn example() {
40//! let budget = PluginBudget::new(CostThrottleConfig::default());
41//! let guard = BudgetGuard::acquire(&budget).await;
42//! // ... send request ...
43//! guard.release().await; // explicit async release on success path
44//! // If this line is never reached, Drop releases via tokio::spawn.
45//! # }
46//! ```
47
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use serde_json::Value;
52use tokio::sync::Mutex;
53
54/// Re-export from the ports layer — the canonical definition lives there.
55pub use crate::ports::graphql_plugin::CostThrottleConfig;
56
57// ─────────────────────────────────────────────────────────────────────────────
58// LiveBudget
59// ─────────────────────────────────────────────────────────────────────────────
60
61/// Mutable runtime state tracking the current point budget.
62///
63/// One `LiveBudget` should be shared across all requests to the same plugin
64/// endpoint, wrapped in `Arc<Mutex<LiveBudget>>` to serialise updates.
65#[derive(Debug)]
66pub struct LiveBudget {
67    currently_available: f64,
68    maximum_available: f64,
69    restore_rate: f64, // points/second
70    last_updated: Instant,
71    /// Points reserved for requests currently in-flight.
72    pending: f64,
73}
74
75impl LiveBudget {
76    /// Create a new budget initialised from `config` defaults.
77    #[must_use]
78    pub fn new(config: &CostThrottleConfig) -> Self {
79        Self {
80            currently_available: config.max_points,
81            maximum_available: config.max_points,
82            restore_rate: config.restore_per_sec,
83            last_updated: Instant::now(),
84            pending: 0.0,
85        }
86    }
87
88    /// Update the budget from a throttle-status object.
89    ///
90    /// The JSON path is `extensions.cost.throttleStatus` in the GraphQL response body.
91    ///
92    /// # Example
93    ///
94    /// ```rust
95    /// use serde_json::json;
96    /// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, LiveBudget};
97    ///
98    /// let config = CostThrottleConfig::default();
99    /// let mut budget = LiveBudget::new(&config);
100    ///
101    /// let status = json!({
102    ///     "maximumAvailable": 10000.0,
103    ///     "currentlyAvailable": 4200.0,
104    ///     "restoreRate": 500.0,
105    /// });
106    /// budget.update_from_response(&status);
107    /// ```
108    pub fn update_from_response(&mut self, throttle_status: &Value) {
109        if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
110            self.maximum_available = max;
111        }
112        if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
113            self.currently_available = cur;
114        }
115        if let Some(rate) = throttle_status["restoreRate"].as_f64() {
116            self.restore_rate = rate;
117        }
118        self.last_updated = Instant::now();
119    }
120
121    /// Compute the projected available budget accounting for elapsed restore
122    /// time and in-flight reservations.
123    fn projected_available(&self) -> f64 {
124        let elapsed = self.last_updated.elapsed().as_secs_f64();
125        let restored = elapsed * self.restore_rate;
126        let gross = (self.currently_available + restored).min(self.maximum_available);
127        (gross - self.pending).max(0.0)
128    }
129
130    /// Reserve `cost` points for an in-flight request.
131    fn reserve(&mut self, cost: f64) {
132        self.pending += cost;
133    }
134
135    /// Release a previous [`reserve`] once the request has completed.
136    fn release(&mut self, cost: f64) {
137        self.pending = (self.pending - cost).max(0.0);
138    }
139}
140
141// ─────────────────────────────────────────────────────────────────────────────
142// Per-plugin budget store
143// ─────────────────────────────────────────────────────────────────────────────
144
145/// A shareable, cheaply-cloneable handle to a per-plugin `LiveBudget`.
146///
147/// Create one per registered plugin and pass it to [`pre_flight_reserve`] before
148/// each request.
149///
150/// # Example
151///
152/// ```rust
153/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget};
154///
155/// let budget = PluginBudget::new(CostThrottleConfig::default());
156/// let budget2 = budget.clone(); // cheap Arc clone
157/// ```
158#[derive(Clone, Debug)]
159pub struct PluginBudget {
160    inner: Arc<Mutex<LiveBudget>>,
161    config: CostThrottleConfig,
162}
163
164impl PluginBudget {
165    /// Create a new `PluginBudget` initialised from `config`.
166    #[must_use]
167    pub fn new(config: CostThrottleConfig) -> Self {
168        let budget = LiveBudget::new(&config);
169        Self {
170            inner: Arc::new(Mutex::new(budget)),
171            config,
172        }
173    }
174
175    /// Return the `CostThrottleConfig` this budget was initialised from.
176    #[must_use]
177    pub const fn config(&self) -> &CostThrottleConfig {
178        &self.config
179    }
180}
181
182// ─────────────────────────────────────────────────────────────────────────────
183// BudgetGuard (RAII)
184// ─────────────────────────────────────────────────────────────────────────────
185
186/// RAII guard that automatically releases a budget reservation on drop.
187///
188/// Wraps [`pre_flight_reserve`] and [`release_reservation`] into a
189/// scope-based guard so callers no longer need to track every exit path
190/// manually.
191///
192/// On the **success path**, call [`release`](BudgetGuard::release) for a
193/// clean async release.  If `release()` is never called (early return, `?`
194/// propagation, panic), the [`Drop`] impl spawns a background Tokio task to
195/// release the reservation — this is the safety net.
196///
197/// Once `AsyncDrop` stabilises on stable Rust, the explicit `release()`
198/// method can be removed and the async cleanup will happen transparently.
199///
200/// # Example
201///
202/// ```no_run
203/// use stygian_graph::adapters::graphql_throttle::{
204///     CostThrottleConfig, PluginBudget, BudgetGuard,
205/// };
206///
207/// # async fn example() {
208/// let budget = PluginBudget::new(CostThrottleConfig::default());
209/// let guard = BudgetGuard::acquire(&budget).await;
210/// // ... send request ...
211/// guard.release().await; // explicit async release on success path
212/// # }
213/// ```
214pub struct BudgetGuard {
215    /// The budget handle.  Set to `None` after explicit `release()`.
216    budget: Option<PluginBudget>,
217    /// Reserved cost to release.
218    cost: f64,
219}
220
221impl BudgetGuard {
222    /// Acquire a reservation from `budget`, sleeping if the projected balance
223    /// is too low.
224    ///
225    /// Returns a guard that will release the reservation when dropped or when
226    /// [`release`](Self::release) is called.
227    ///
228    /// # Example
229    ///
230    /// ```no_run
231    /// use stygian_graph::adapters::graphql_throttle::{
232    ///     CostThrottleConfig, PluginBudget, BudgetGuard,
233    /// };
234    ///
235    /// # async fn example() {
236    /// let budget = PluginBudget::new(CostThrottleConfig::default());
237    /// let guard = BudgetGuard::acquire(&budget).await;
238    /// // reservation is now held
239    /// guard.release().await;
240    /// # }
241    /// ```
242    pub async fn acquire(budget: &PluginBudget) -> Self {
243        let cost = pre_flight_reserve(budget).await;
244        Self {
245            budget: Some(budget.clone()),
246            cost,
247        }
248    }
249
250    /// The reserved cost held by this guard.
251    #[must_use]
252    pub fn cost(&self) -> f64 {
253        self.cost
254    }
255
256    /// Explicitly release the reservation (async, preferred on success path).
257    ///
258    /// After calling this, the guard's `Drop` impl becomes a no-op.
259    ///
260    /// # Example
261    ///
262    /// ```no_run
263    /// use stygian_graph::adapters::graphql_throttle::{
264    ///     CostThrottleConfig, PluginBudget, BudgetGuard,
265    /// };
266    ///
267    /// # async fn example() {
268    /// let budget = PluginBudget::new(CostThrottleConfig::default());
269    /// let guard = BudgetGuard::acquire(&budget).await;
270    /// guard.release().await;
271    /// # }
272    /// ```
273    pub async fn release(mut self) {
274        if let Some(budget) = self.budget.take() {
275            release_reservation(&budget, self.cost).await;
276        }
277    }
278}
279
280impl Drop for BudgetGuard {
281    fn drop(&mut self) {
282        if let Some(budget) = self.budget.take() {
283            let cost = self.cost;
284            // Safety-net: spawn a background task to release the reservation.
285            // This is the fallback for early returns / `?` / panic unwinds.
286            // Once AsyncDrop stabilises this can be replaced with a proper
287            // async drop implementation.
288            tokio::spawn(async move {
289                release_reservation(&budget, cost).await;
290                tracing::debug!(
291                    cost,
292                    "BudgetGuard: reservation released via Drop safety-net"
293                );
294            });
295        }
296    }
297}
298
299// ─────────────────────────────────────────────────────────────────────────────
300// Public API
301// ─────────────────────────────────────────────────────────────────────────────
302
303/// Sleep if the projected budget is too low, then atomically reserve an
304/// estimated cost for the upcoming request.
305///
306/// Returns the reserved point amount.  **Every** exit path after this call —
307/// both success and error — must call [`release_reservation`] with the returned
308/// value to prevent the pending balance growing indefinitely.
309///
310/// The `Mutex` guard is released before the `.await` to satisfy `Send` bounds.
311///
312/// # Example
313///
314/// ```rust
315/// use stygian_graph::adapters::graphql_throttle::{
316///     CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
317/// };
318///
319/// # async fn example() {
320/// let budget = PluginBudget::new(CostThrottleConfig::default());
321/// let reserved = pre_flight_reserve(&budget).await;
322/// // ... send the request ...
323/// release_reservation(&budget, reserved).await;
324/// # }
325/// ```
326#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
327pub async fn pre_flight_reserve(budget: &PluginBudget) -> f64 {
328    let estimated_cost = budget.config.estimated_cost_per_request;
329    let delay = {
330        let mut guard = budget.inner.lock().await;
331        let projected = guard.projected_available();
332        let rate = guard.restore_rate.max(1.0);
333        let min = budget.config.min_available;
334        let delay = if projected < min + estimated_cost {
335            let deficit = (min + estimated_cost) - projected;
336            let secs = (deficit / rate) * 1.1;
337            let ms = (secs * 1_000.0) as u64;
338            Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
339        } else {
340            None
341        };
342        // Reserve while the lock is held so concurrent callers immediately
343        // see the reduced projected balance.
344        guard.reserve(estimated_cost);
345        delay
346    };
347
348    if let Some(d) = delay {
349        tracing::debug!(
350            delay_ms = d.as_millis(),
351            "graphql throttle: pre-flight delay"
352        );
353        tokio::time::sleep(d).await;
354    }
355
356    estimated_cost
357}
358
359/// Release a reservation made by [`pre_flight_reserve`].
360///
361/// Must be called on every exit path after [`pre_flight_reserve`] — both
362/// success and error — to keep the pending balance accurate.  On the success
363/// path, call [`update_budget`] first so the live balance is reconciled from
364/// the server-reported `currentlyAvailable` before the reservation is removed.
365///
366/// # Example
367///
368/// ```rust
369/// use stygian_graph::adapters::graphql_throttle::{
370///     CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
371/// };
372///
373/// # async fn example() {
374/// let budget = PluginBudget::new(CostThrottleConfig::default());
375/// let reserved = pre_flight_reserve(&budget).await;
376/// release_reservation(&budget, reserved).await;
377/// # }
378/// ```
379pub async fn release_reservation(budget: &PluginBudget, cost: f64) {
380    let mut guard = budget.inner.lock().await;
381    guard.release(cost);
382}
383
384/// Update the `PluginBudget` from a completed response body.
385///
386/// Extracts `extensions.cost.throttleStatus` if present and forwards to
387/// [`LiveBudget::update_from_response`].
388///
389/// # Example
390///
391/// ```rust
392/// use serde_json::json;
393/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, update_budget};
394///
395/// # async fn example() {
396/// let budget = PluginBudget::new(CostThrottleConfig::default());
397/// let response = json!({
398///     "data": {},
399///     "extensions": { "cost": { "throttleStatus": {
400///         "maximumAvailable": 10000.0,
401///         "currentlyAvailable": 8000.0,
402///         "restoreRate": 500.0,
403///     }}}
404/// });
405/// update_budget(&budget, &response).await;
406/// # }
407/// ```
408pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
409    let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
410        return;
411    };
412    if status.is_object() {
413        let mut guard = budget.inner.lock().await;
414        guard.update_from_response(status);
415    }
416}
417
418/// Compute the reactive back-off delay from a throttle response body.
419///
420/// Use this when `extensions.cost.throttleStatus` signals `THROTTLED` rather
421/// than projecting from the `LiveBudget`.
422///
423/// ```text
424/// deficit = max_available − currently_available
425/// base_ms = deficit / restore_rate * 1100
426/// ms      = (base_ms * 1.5^attempt).clamp(500, max_delay_ms)
427/// ```
428///
429/// # Example
430///
431/// ```rust
432/// use serde_json::json;
433/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, reactive_backoff_ms};
434///
435/// let config = CostThrottleConfig::default();
436/// let body = json!({ "extensions": { "cost": { "throttleStatus": {
437///     "maximumAvailable": 10000.0,
438///     "currentlyAvailable": 0.0,
439///     "restoreRate": 500.0,
440/// }}}});
441/// let ms = reactive_backoff_ms(&config, &body, 0);
442/// assert!(ms >= 500);
443/// ```
444#[must_use]
445#[allow(
446    clippy::cast_possible_truncation,
447    clippy::cast_sign_loss,
448    clippy::cast_possible_wrap
449)]
450pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
451    let status = body.pointer("/extensions/cost/throttleStatus");
452    let max_avail = status
453        .and_then(|s| s.get("maximumAvailable"))
454        .and_then(Value::as_f64)
455        .unwrap_or(config.max_points);
456    let cur_avail = status
457        .and_then(|s| s.get("currentlyAvailable"))
458        .and_then(Value::as_f64)
459        .unwrap_or(0.0);
460    let restore_rate = status
461        .and_then(|s| s.get("restoreRate"))
462        .and_then(Value::as_f64)
463        .unwrap_or(config.restore_per_sec)
464        .max(1.0);
465    let deficit = (max_avail - cur_avail).max(0.0);
466    let base_secs = if deficit > 0.0 {
467        (deficit / restore_rate) * 1.1
468    } else {
469        0.5
470    };
471    let backoff = base_secs * 1.5_f64.powi(attempt as i32);
472    let ms = (backoff * 1_000.0) as u64;
473    ms.clamp(500, config.max_delay_ms)
474}
475
476// ─────────────────────────────────────────────────────────────────────────────
477// Tests
478// ─────────────────────────────────────────────────────────────────────────────
479
480#[cfg(test)]
481#[allow(
482    clippy::float_cmp,
483    clippy::unwrap_used,
484    clippy::significant_drop_tightening
485)]
486mod tests {
487    use super::*;
488    use serde_json::json;
489
490    #[test]
491    fn live_budget_initialises_from_config() {
492        let config = CostThrottleConfig {
493            max_points: 5_000.0,
494            restore_per_sec: 250.0,
495            min_available: 50.0,
496            max_delay_ms: 10_000,
497            estimated_cost_per_request: 100.0,
498        };
499        let budget = LiveBudget::new(&config);
500        assert_eq!(budget.currently_available, 5_000.0);
501        assert_eq!(budget.maximum_available, 5_000.0);
502        assert_eq!(budget.restore_rate, 250.0);
503    }
504
505    #[test]
506    fn live_budget_updates_from_response() {
507        let config = CostThrottleConfig::default();
508        let mut budget = LiveBudget::new(&config);
509
510        let status = json!({
511            "maximumAvailable": 10_000.0,
512            "currentlyAvailable": 3_000.0,
513            "restoreRate": 500.0,
514        });
515        budget.update_from_response(&status);
516
517        assert_eq!(budget.currently_available, 3_000.0);
518        assert_eq!(budget.maximum_available, 10_000.0);
519    }
520
521    #[test]
522    fn projected_available_accounts_for_restore() {
523        let config = CostThrottleConfig {
524            max_points: 10_000.0,
525            restore_per_sec: 1_000.0, // fast restore for test
526            ..Default::default()
527        };
528        let mut budget = LiveBudget::new(&config);
529        // Simulate a low budget
530        budget.currently_available = 0.0;
531        // Immediately after update, projected = 0 + small_elapsed * 1000
532        // which is ~ 0 (sub-millisecond). Just confirm it doesn't panic.
533        let p = budget.projected_available();
534        assert!(p >= 0.0);
535        assert!(p <= 10_000.0);
536    }
537
538    #[test]
539    fn projected_available_caps_at_maximum() {
540        let config = CostThrottleConfig::default();
541        let budget = LiveBudget::new(&config);
542        // Fresh budget is already at maximum
543        assert!(budget.projected_available() <= budget.maximum_available);
544    }
545
546    #[tokio::test]
547    async fn pre_flight_reserve_does_not_sleep_when_budget_healthy() {
548        let budget = PluginBudget::new(CostThrottleConfig::default());
549        // Budget starts full — no delay expected.
550        let before = Instant::now();
551        let reserved = pre_flight_reserve(&budget).await;
552        assert!(before.elapsed().as_millis() < 100, "unexpected delay");
553        assert_eq!(
554            reserved,
555            CostThrottleConfig::default().estimated_cost_per_request
556        );
557        release_reservation(&budget, reserved).await;
558    }
559
560    #[tokio::test]
561    async fn update_budget_parses_throttle_status() {
562        let budget = PluginBudget::new(CostThrottleConfig::default());
563        let response = json!({
564            "data": {},
565            "extensions": { "cost": { "throttleStatus": {
566                "maximumAvailable": 10_000.0,
567                "currentlyAvailable": 2_500.0,
568                "restoreRate": 500.0,
569            }}}
570        });
571        update_budget(&budget, &response).await;
572        let guard = budget.inner.lock().await;
573        assert_eq!(guard.currently_available, 2_500.0);
574    }
575
576    #[tokio::test]
577    async fn concurrent_reservations_reduce_projected_available() {
578        let config = CostThrottleConfig {
579            max_points: 1_000.0,
580            estimated_cost_per_request: 200.0,
581            ..Default::default()
582        };
583        let budget = PluginBudget::new(config);
584
585        // Each pre_flight_reserve atomically deducts from pending, so the
586        // second caller sees a lower projected balance than the first.
587        let r1 = pre_flight_reserve(&budget).await;
588        let r2 = pre_flight_reserve(&budget).await;
589
590        {
591            let guard = budget.inner.lock().await;
592            // Two reservations of 200 → pending = 400
593            assert!((guard.pending - 400.0).abs() < f64::EPSILON);
594            // projected = 1000 - 400 = 600 (approximately, ignoring sub-ms restore)
595            let projected = guard.projected_available();
596            assert!((599.0..=601.0).contains(&projected));
597        }
598
599        release_reservation(&budget, r1).await;
600        release_reservation(&budget, r2).await;
601
602        let guard = budget.inner.lock().await;
603        assert!(guard.pending < f64::EPSILON);
604    }
605
606    #[test]
607    fn reactive_backoff_ms_clamps_to_500ms_floor() {
608        let config = CostThrottleConfig::default();
609        let body = json!({ "extensions": { "cost": { "throttleStatus": {
610            "maximumAvailable": 10_000.0,
611            "currentlyAvailable": 9_999.0,
612            "restoreRate": 500.0,
613        }}}});
614        let ms = reactive_backoff_ms(&config, &body, 0);
615        assert_eq!(ms, 500); // Very small deficit rounds up to floor
616    }
617
618    #[test]
619    fn reactive_backoff_ms_increases_with_attempt() {
620        let config = CostThrottleConfig::default();
621        let body = json!({ "extensions": { "cost": { "throttleStatus": {
622            "maximumAvailable": 10_000.0,
623            "currentlyAvailable": 5_000.0,
624            "restoreRate": 500.0,
625        }}}});
626        let ms0 = reactive_backoff_ms(&config, &body, 0);
627        let ms1 = reactive_backoff_ms(&config, &body, 1);
628        let ms2 = reactive_backoff_ms(&config, &body, 2);
629        assert!(ms1 > ms0);
630        assert!(ms2 > ms1);
631    }
632
633    #[test]
634    fn reactive_backoff_ms_caps_at_max_delay() {
635        let config = CostThrottleConfig {
636            max_delay_ms: 1_000,
637            ..Default::default()
638        };
639        let body = json!({ "extensions": { "cost": { "throttleStatus": {
640            "maximumAvailable": 10_000.0,
641            "currentlyAvailable": 0.0,
642            "restoreRate": 1.0, // very slow restore → huge deficit
643        }}}});
644        let ms = reactive_backoff_ms(&config, &body, 10);
645        assert_eq!(ms, 1_000);
646    }
647
648    #[tokio::test]
649    async fn budget_guard_releases_on_explicit_release() {
650        let budget = PluginBudget::new(CostThrottleConfig::default());
651        let guard = BudgetGuard::acquire(&budget).await;
652        let cost = guard.cost();
653        assert!(cost > 0.0);
654
655        // Pending should be non-zero while guard is held
656        {
657            let inner = budget.inner.lock().await;
658            assert!(inner.pending >= cost);
659        }
660
661        guard.release().await;
662
663        // Pending should be back to zero
664        let inner = budget.inner.lock().await;
665        assert!(inner.pending < f64::EPSILON);
666    }
667
668    #[tokio::test]
669    async fn budget_guard_releases_on_drop() {
670        let budget = PluginBudget::new(CostThrottleConfig::default());
671
672        {
673            let _guard = BudgetGuard::acquire(&budget).await;
674            // guard drops here without explicit release()
675        }
676
677        // Give the spawned task a moment to run
678        tokio::task::yield_now().await;
679        tokio::time::sleep(Duration::from_millis(10)).await;
680
681        let inner = budget.inner.lock().await;
682        assert!(inner.pending < f64::EPSILON, "Drop should have released");
683    }
684}