stygian_graph/adapters/graphql_throttle.rs
1//! Proactive GraphQL cost-throttle management.
2//!
3//! `LiveBudget` tracks the rolling point budget advertised by APIs that
4//! implement the Shopify / Jobber-style cost-throttle extension envelope:
5//!
6//! ```json
7//! { "extensions": { "cost": {
8//! "requestedQueryCost": 12,
9//! "actualQueryCost": 12,
10//! "throttleStatus": {
11//! "maximumAvailable": 10000.0,
12//! "currentlyAvailable": 9988.0,
13//! "restoreRate": 500.0
14//! }
15//! }}}
16//! ```
17//!
18//! Before each request a *proactive* pre-flight delay is computed: if the
19//! projected available budget (accounting for elapsed restore time and
20//! in-flight reservations) will be too low, the caller sleeps until it
//! recovers. `pre_flight_reserve` reserves the estimated cost atomically —
//! under the same lock that computes the delay, *before* sleeping — so
//! concurrent callers immediately see a reduced balance.
24//!
25//! ## `BudgetGuard` (RAII)
26//!
27//! [`BudgetGuard`] wraps `pre_flight_reserve` + `release_reservation` into a
28//! scope-based guard so callers no longer need to track every exit path
29//! manually. Call [`BudgetGuard::acquire`] before the request and
//! [`BudgetGuard::release`] on the success path. If `release()` is never
//! called (e.g. early return, `?` propagation, or the enclosing future being
//! dropped), the `Drop` impl releases the reservation as a safety net.
33//!
34//! ```no_run
35//! use stygian_graph::adapters::graphql_throttle::{
36//! CostThrottleConfig, PluginBudget, BudgetGuard,
37//! };
38//!
39//! # async fn example() {
40//! let budget = PluginBudget::new(CostThrottleConfig::default());
41//! let guard = BudgetGuard::acquire(&budget).await;
42//! // ... send request ...
43//! guard.release().await; // explicit async release on success path
//! // If this line is never reached, Drop releases the reservation instead.
45//! # }
46//! ```
47
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use serde_json::Value;
52use tokio::sync::Mutex;
53
54/// Re-export from the ports layer — the canonical definition lives there.
55pub use crate::ports::graphql_plugin::CostThrottleConfig;
56
57// ─────────────────────────────────────────────────────────────────────────────
58// LiveBudget
59// ─────────────────────────────────────────────────────────────────────────────
60
61/// Mutable runtime state tracking the current point budget.
62///
63/// One `LiveBudget` should be shared across all requests to the same plugin
64/// endpoint, wrapped in `Arc<Mutex<LiveBudget>>` to serialise updates.
65#[derive(Debug)]
66pub struct LiveBudget {
67 currently_available: f64,
68 maximum_available: f64,
69 restore_rate: f64, // points/second
70 last_updated: Instant,
71 /// Points reserved for requests currently in-flight.
72 pending: f64,
73}
74
75impl LiveBudget {
76 /// Create a new budget initialised from `config` defaults.
77 #[must_use]
78 pub fn new(config: &CostThrottleConfig) -> Self {
79 Self {
80 currently_available: config.max_points,
81 maximum_available: config.max_points,
82 restore_rate: config.restore_per_sec,
83 last_updated: Instant::now(),
84 pending: 0.0,
85 }
86 }
87
88 /// Update the budget from a throttle-status object.
89 ///
90 /// The JSON path is `extensions.cost.throttleStatus` in the GraphQL response body.
91 ///
92 /// # Example
93 ///
94 /// ```rust
95 /// use serde_json::json;
96 /// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, LiveBudget};
97 ///
98 /// let config = CostThrottleConfig::default();
99 /// let mut budget = LiveBudget::new(&config);
100 ///
101 /// let status = json!({
102 /// "maximumAvailable": 10000.0,
103 /// "currentlyAvailable": 4200.0,
104 /// "restoreRate": 500.0,
105 /// });
106 /// budget.update_from_response(&status);
107 /// ```
108 pub fn update_from_response(&mut self, throttle_status: &Value) {
109 if let Some(max) = throttle_status["maximumAvailable"].as_f64() {
110 self.maximum_available = max;
111 }
112 if let Some(cur) = throttle_status["currentlyAvailable"].as_f64() {
113 self.currently_available = cur;
114 }
115 if let Some(rate) = throttle_status["restoreRate"].as_f64() {
116 self.restore_rate = rate;
117 }
118 self.last_updated = Instant::now();
119 }
120
121 /// Compute the projected available budget accounting for elapsed restore
122 /// time and in-flight reservations.
123 fn projected_available(&self) -> f64 {
124 let elapsed = self.last_updated.elapsed().as_secs_f64();
125 let restored = elapsed * self.restore_rate;
126 let gross = (self.currently_available + restored).min(self.maximum_available);
127 (gross - self.pending).max(0.0)
128 }
129
130 /// Reserve `cost` points for an in-flight request.
131 fn reserve(&mut self, cost: f64) {
132 self.pending += cost;
133 }
134
135 /// Release a previous [`reserve`] once the request has completed.
136 fn release(&mut self, cost: f64) {
137 self.pending = (self.pending - cost).max(0.0);
138 }
139}
140
141// ─────────────────────────────────────────────────────────────────────────────
142// Per-plugin budget store
143// ─────────────────────────────────────────────────────────────────────────────
144
145/// A shareable, cheaply-cloneable handle to a per-plugin `LiveBudget`.
146///
147/// Create one per registered plugin and pass it to [`pre_flight_reserve`] before
148/// each request.
149///
150/// # Example
151///
152/// ```rust
153/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget};
154///
155/// let budget = PluginBudget::new(CostThrottleConfig::default());
156/// let budget2 = budget.clone(); // cheap Arc clone
157/// ```
158#[derive(Clone, Debug)]
159pub struct PluginBudget {
160 inner: Arc<Mutex<LiveBudget>>,
161 config: CostThrottleConfig,
162}
163
164impl PluginBudget {
165 /// Create a new `PluginBudget` initialised from `config`.
166 #[must_use]
167 pub fn new(config: CostThrottleConfig) -> Self {
168 let budget = LiveBudget::new(&config);
169 Self {
170 inner: Arc::new(Mutex::new(budget)),
171 config,
172 }
173 }
174
175 /// Return the `CostThrottleConfig` this budget was initialised from.
176 #[must_use]
177 pub const fn config(&self) -> &CostThrottleConfig {
178 &self.config
179 }
180}
181
182// ─────────────────────────────────────────────────────────────────────────────
183// BudgetGuard (RAII)
184// ─────────────────────────────────────────────────────────────────────────────
185
186/// RAII guard that automatically releases a budget reservation on drop.
187///
188/// Wraps [`pre_flight_reserve`] and [`release_reservation`] into a
189/// scope-based guard so callers no longer need to track every exit path
190/// manually.
191///
192/// On the **success path**, call [`release`](BudgetGuard::release) for a
193/// clean async release. If `release()` is never called (early return, `?`
194/// propagation, panic), the [`Drop`] impl spawns a background Tokio task to
195/// release the reservation — this is the safety net.
196///
197/// Once `AsyncDrop` stabilises on stable Rust, the explicit `release()`
198/// method can be removed and the async cleanup will happen transparently.
199///
200/// # Example
201///
202/// ```no_run
203/// use stygian_graph::adapters::graphql_throttle::{
204/// CostThrottleConfig, PluginBudget, BudgetGuard,
205/// };
206///
207/// # async fn example() {
208/// let budget = PluginBudget::new(CostThrottleConfig::default());
209/// let guard = BudgetGuard::acquire(&budget).await;
210/// // ... send request ...
211/// guard.release().await; // explicit async release on success path
212/// # }
213/// ```
214pub struct BudgetGuard {
215 /// The budget handle. Set to `None` after explicit `release()`.
216 budget: Option<PluginBudget>,
217 /// Reserved cost to release.
218 cost: f64,
219}
220
221impl BudgetGuard {
222 /// Acquire a reservation from `budget`, sleeping if the projected balance
223 /// is too low.
224 ///
225 /// Returns a guard that will release the reservation when dropped or when
226 /// [`release`](Self::release) is called.
227 ///
228 /// # Example
229 ///
230 /// ```no_run
231 /// use stygian_graph::adapters::graphql_throttle::{
232 /// CostThrottleConfig, PluginBudget, BudgetGuard,
233 /// };
234 ///
235 /// # async fn example() {
236 /// let budget = PluginBudget::new(CostThrottleConfig::default());
237 /// let guard = BudgetGuard::acquire(&budget).await;
238 /// // reservation is now held
239 /// guard.release().await;
240 /// # }
241 /// ```
242 pub async fn acquire(budget: &PluginBudget) -> Self {
243 let cost = pre_flight_reserve(budget).await;
244 Self {
245 budget: Some(budget.clone()),
246 cost,
247 }
248 }
249
250 /// The reserved cost held by this guard.
251 #[must_use]
252 pub fn cost(&self) -> f64 {
253 self.cost
254 }
255
256 /// Explicitly release the reservation (async, preferred on success path).
257 ///
258 /// After calling this, the guard's `Drop` impl becomes a no-op.
259 ///
260 /// # Example
261 ///
262 /// ```no_run
263 /// use stygian_graph::adapters::graphql_throttle::{
264 /// CostThrottleConfig, PluginBudget, BudgetGuard,
265 /// };
266 ///
267 /// # async fn example() {
268 /// let budget = PluginBudget::new(CostThrottleConfig::default());
269 /// let guard = BudgetGuard::acquire(&budget).await;
270 /// guard.release().await;
271 /// # }
272 /// ```
273 pub async fn release(mut self) {
274 if let Some(budget) = self.budget.take() {
275 release_reservation(&budget, self.cost).await;
276 }
277 }
278}
279
280impl Drop for BudgetGuard {
281 fn drop(&mut self) {
282 if let Some(budget) = self.budget.take() {
283 let cost = self.cost;
284 // Safety-net: spawn a background task to release the reservation.
285 // This is the fallback for early returns / `?` / panic unwinds.
286 // Once AsyncDrop stabilises this can be replaced with a proper
287 // async drop implementation.
288 tokio::spawn(async move {
289 release_reservation(&budget, cost).await;
290 tracing::debug!(
291 cost,
292 "BudgetGuard: reservation released via Drop safety-net"
293 );
294 });
295 }
296 }
297}
298
299// ─────────────────────────────────────────────────────────────────────────────
300// Public API
301// ─────────────────────────────────────────────────────────────────────────────
302
303/// Sleep if the projected budget is too low, then atomically reserve an
304/// estimated cost for the upcoming request.
305///
306/// Returns the reserved point amount. **Every** exit path after this call —
307/// both success and error — must call [`release_reservation`] with the returned
308/// value to prevent the pending balance growing indefinitely.
309///
310/// The `Mutex` guard is released before the `.await` to satisfy `Send` bounds.
311///
312/// # Example
313///
314/// ```rust
315/// use stygian_graph::adapters::graphql_throttle::{
316/// CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
317/// };
318///
319/// # async fn example() {
320/// let budget = PluginBudget::new(CostThrottleConfig::default());
321/// let reserved = pre_flight_reserve(&budget).await;
322/// // ... send the request ...
323/// release_reservation(&budget, reserved).await;
324/// # }
325/// ```
326#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
327pub async fn pre_flight_reserve(budget: &PluginBudget) -> f64 {
328 let estimated_cost = budget.config.estimated_cost_per_request;
329 let delay = {
330 let mut guard = budget.inner.lock().await;
331 let projected = guard.projected_available();
332 let rate = guard.restore_rate.max(1.0);
333 let min = budget.config.min_available;
334 let delay = if projected < min + estimated_cost {
335 let deficit = (min + estimated_cost) - projected;
336 let secs = (deficit / rate) * 1.1;
337 let ms = (secs * 1_000.0) as u64;
338 Some(Duration::from_millis(ms.min(budget.config.max_delay_ms)))
339 } else {
340 None
341 };
342 // Reserve while the lock is held so concurrent callers immediately
343 // see the reduced projected balance.
344 guard.reserve(estimated_cost);
345 delay
346 };
347
348 if let Some(d) = delay {
349 tracing::debug!(
350 delay_ms = d.as_millis(),
351 "graphql throttle: pre-flight delay"
352 );
353 tokio::time::sleep(d).await;
354 }
355
356 estimated_cost
357}
358
359/// Release a reservation made by [`pre_flight_reserve`].
360///
361/// Must be called on every exit path after [`pre_flight_reserve`] — both
362/// success and error — to keep the pending balance accurate. On the success
363/// path, call [`update_budget`] first so the live balance is reconciled from
364/// the server-reported `currentlyAvailable` before the reservation is removed.
365///
366/// # Example
367///
368/// ```rust
369/// use stygian_graph::adapters::graphql_throttle::{
370/// CostThrottleConfig, PluginBudget, pre_flight_reserve, release_reservation,
371/// };
372///
373/// # async fn example() {
374/// let budget = PluginBudget::new(CostThrottleConfig::default());
375/// let reserved = pre_flight_reserve(&budget).await;
376/// release_reservation(&budget, reserved).await;
377/// # }
378/// ```
379pub async fn release_reservation(budget: &PluginBudget, cost: f64) {
380 let mut guard = budget.inner.lock().await;
381 guard.release(cost);
382}
383
384/// Update the `PluginBudget` from a completed response body.
385///
386/// Extracts `extensions.cost.throttleStatus` if present and forwards to
387/// [`LiveBudget::update_from_response`].
388///
389/// # Example
390///
391/// ```rust
392/// use serde_json::json;
393/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, PluginBudget, update_budget};
394///
395/// # async fn example() {
396/// let budget = PluginBudget::new(CostThrottleConfig::default());
397/// let response = json!({
398/// "data": {},
399/// "extensions": { "cost": { "throttleStatus": {
400/// "maximumAvailable": 10000.0,
401/// "currentlyAvailable": 8000.0,
402/// "restoreRate": 500.0,
403/// }}}
404/// });
405/// update_budget(&budget, &response).await;
406/// # }
407/// ```
408pub async fn update_budget(budget: &PluginBudget, response_body: &Value) {
409 let Some(status) = response_body.pointer("/extensions/cost/throttleStatus") else {
410 return;
411 };
412 if status.is_object() {
413 let mut guard = budget.inner.lock().await;
414 guard.update_from_response(status);
415 }
416}
417
418/// Compute the reactive back-off delay from a throttle response body.
419///
420/// Use this when `extensions.cost.throttleStatus` signals `THROTTLED` rather
421/// than projecting from the `LiveBudget`.
422///
423/// ```text
424/// deficit = max_available − currently_available
425/// base_ms = deficit / restore_rate * 1100
426/// ms = (base_ms * 1.5^attempt).clamp(500, max_delay_ms)
427/// ```
428///
429/// # Example
430///
431/// ```rust
432/// use serde_json::json;
433/// use stygian_graph::adapters::graphql_throttle::{CostThrottleConfig, reactive_backoff_ms};
434///
435/// let config = CostThrottleConfig::default();
436/// let body = json!({ "extensions": { "cost": { "throttleStatus": {
437/// "maximumAvailable": 10000.0,
438/// "currentlyAvailable": 0.0,
439/// "restoreRate": 500.0,
440/// }}}});
441/// let ms = reactive_backoff_ms(&config, &body, 0);
442/// assert!(ms >= 500);
443/// ```
444#[must_use]
445#[allow(
446 clippy::cast_possible_truncation,
447 clippy::cast_sign_loss,
448 clippy::cast_possible_wrap
449)]
450pub fn reactive_backoff_ms(config: &CostThrottleConfig, body: &Value, attempt: u32) -> u64 {
451 let status = body.pointer("/extensions/cost/throttleStatus");
452 let max_avail = status
453 .and_then(|s| s.get("maximumAvailable"))
454 .and_then(Value::as_f64)
455 .unwrap_or(config.max_points);
456 let cur_avail = status
457 .and_then(|s| s.get("currentlyAvailable"))
458 .and_then(Value::as_f64)
459 .unwrap_or(0.0);
460 let restore_rate = status
461 .and_then(|s| s.get("restoreRate"))
462 .and_then(Value::as_f64)
463 .unwrap_or(config.restore_per_sec)
464 .max(1.0);
465 let deficit = (max_avail - cur_avail).max(0.0);
466 let base_secs = if deficit > 0.0 {
467 (deficit / restore_rate) * 1.1
468 } else {
469 0.5
470 };
471 let backoff = base_secs * 1.5_f64.powi(attempt as i32);
472 let ms = (backoff * 1_000.0) as u64;
473 ms.clamp(500, config.max_delay_ms)
474}
475
476// ─────────────────────────────────────────────────────────────────────────────
477// Tests
478// ─────────────────────────────────────────────────────────────────────────────
479
#[cfg(test)]
#[allow(
    clippy::float_cmp,
    clippy::unwrap_used,
    clippy::significant_drop_tightening
)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a bare `throttleStatus` object.
    fn throttle_status(max: f64, current: f64, rate: f64) -> Value {
        json!({
            "maximumAvailable": max,
            "currentlyAvailable": current,
            "restoreRate": rate,
        })
    }

    /// Wrap a `throttleStatus` object in the `extensions.cost` envelope.
    fn cost_envelope(status: Value) -> Value {
        json!({ "extensions": { "cost": { "throttleStatus": status } } })
    }

    #[test]
    fn live_budget_initialises_from_config() {
        let budget = LiveBudget::new(&CostThrottleConfig {
            max_points: 5_000.0,
            restore_per_sec: 250.0,
            min_available: 50.0,
            max_delay_ms: 10_000,
            estimated_cost_per_request: 100.0,
        });
        assert_eq!(budget.currently_available, 5_000.0);
        assert_eq!(budget.maximum_available, 5_000.0);
        assert_eq!(budget.restore_rate, 250.0);
    }

    #[test]
    fn live_budget_updates_from_response() {
        let mut budget = LiveBudget::new(&CostThrottleConfig::default());
        budget.update_from_response(&throttle_status(10_000.0, 3_000.0, 500.0));

        assert_eq!(budget.currently_available, 3_000.0);
        assert_eq!(budget.maximum_available, 10_000.0);
    }

    #[test]
    fn projected_available_accounts_for_restore() {
        let mut budget = LiveBudget::new(&CostThrottleConfig {
            max_points: 10_000.0,
            restore_per_sec: 1_000.0, // fast restore for test
            ..Default::default()
        });
        // Drain the balance: the projection is then ~0 plus sub-millisecond
        // restore. Only check it stays within bounds and does not panic.
        budget.currently_available = 0.0;
        let projected = budget.projected_available();
        assert!((0.0..=10_000.0).contains(&projected));
    }

    #[test]
    fn projected_available_caps_at_maximum() {
        let budget = LiveBudget::new(&CostThrottleConfig::default());
        // A fresh budget already sits at its maximum.
        assert!(budget.projected_available() <= budget.maximum_available);
    }

    #[tokio::test]
    async fn pre_flight_reserve_does_not_sleep_when_budget_healthy() {
        let config = CostThrottleConfig::default();
        let expected = config.estimated_cost_per_request;
        let budget = PluginBudget::new(config);

        // A full budget must not trigger a pre-flight delay.
        let started = Instant::now();
        let reserved = pre_flight_reserve(&budget).await;
        assert!(started.elapsed().as_millis() < 100, "unexpected delay");
        assert_eq!(reserved, expected);

        release_reservation(&budget, reserved).await;
    }

    #[tokio::test]
    async fn update_budget_parses_throttle_status() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let response = json!({
            "data": {},
            "extensions": { "cost": {
                "throttleStatus": throttle_status(10_000.0, 2_500.0, 500.0),
            }},
        });

        update_budget(&budget, &response).await;

        assert_eq!(budget.inner.lock().await.currently_available, 2_500.0);
    }

    #[tokio::test]
    async fn concurrent_reservations_reduce_projected_available() {
        let budget = PluginBudget::new(CostThrottleConfig {
            max_points: 1_000.0,
            estimated_cost_per_request: 200.0,
            ..Default::default()
        });

        // Each reservation is added to `pending` under the lock, so the second
        // caller observes a lower projection than the first.
        let first = pre_flight_reserve(&budget).await;
        let second = pre_flight_reserve(&budget).await;

        {
            let live = budget.inner.lock().await;
            // 2 × 200 reserved.
            assert!((live.pending - 400.0).abs() < f64::EPSILON);
            // 1000 − 400 ≈ 600 (sub-millisecond restore is negligible).
            assert!((599.0..=601.0).contains(&live.projected_available()));
        }

        for reserved in [first, second] {
            release_reservation(&budget, reserved).await;
        }

        assert!(budget.inner.lock().await.pending < f64::EPSILON);
    }

    #[test]
    fn reactive_backoff_ms_clamps_to_500ms_floor() {
        let body = cost_envelope(throttle_status(10_000.0, 9_999.0, 500.0));
        // A one-point deficit is far below the floor, so the floor applies.
        assert_eq!(
            reactive_backoff_ms(&CostThrottleConfig::default(), &body, 0),
            500
        );
    }

    #[test]
    fn reactive_backoff_ms_increases_with_attempt() {
        let config = CostThrottleConfig::default();
        let body = cost_envelope(throttle_status(10_000.0, 5_000.0, 500.0));

        let delays: Vec<u64> = (0..3)
            .map(|attempt| reactive_backoff_ms(&config, &body, attempt))
            .collect();

        // Strictly increasing across attempts 0, 1, 2.
        assert!(delays.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn reactive_backoff_ms_caps_at_max_delay() {
        let config = CostThrottleConfig {
            max_delay_ms: 1_000,
            ..Default::default()
        };
        // Very slow restore → huge deficit → capped.
        let body = cost_envelope(throttle_status(10_000.0, 0.0, 1.0));
        assert_eq!(reactive_backoff_ms(&config, &body, 10), 1_000);
    }

    #[tokio::test]
    async fn budget_guard_releases_on_explicit_release() {
        let budget = PluginBudget::new(CostThrottleConfig::default());
        let guard = BudgetGuard::acquire(&budget).await;
        let held = guard.cost();
        assert!(held > 0.0);

        // The reservation is visible while the guard is alive...
        assert!(budget.inner.lock().await.pending >= held);

        guard.release().await;

        // ...and gone once it has been released.
        assert!(budget.inner.lock().await.pending < f64::EPSILON);
    }

    #[tokio::test]
    async fn budget_guard_releases_on_drop() {
        let budget = PluginBudget::new(CostThrottleConfig::default());

        // Acquire and drop without calling release().
        drop(BudgetGuard::acquire(&budget).await);

        // Give any spawned safety-net task a chance to run.
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_millis(10)).await;

        assert!(
            budget.inner.lock().await.pending < f64::EPSILON,
            "Drop should have released"
        );
    }
}