graphrag_core/async_processing/rate_limiting.rs
//! Rate limiting for API call throttling and concurrency control.
//!
//! This module provides the [`RateLimiter`] for controlling both the concurrency and
//! frequency of API calls to prevent throttling and respect service limits.
//!
//! # Main Types
//!
//! - [`RateLimiter`]: Dual-level rate limiter with semaphore-based concurrency control
//!   and time-based request throttling
//!
//! # Features
//!
//! - Separate rate limiting for LLM and embedding API calls
//! - Semaphore-based concurrency control (max N simultaneous calls)
//! - Time-based rate limiting (max N calls per second)
//! - Automatic waiting when limits are reached
//! - RAII-style permit handling with automatic release
//! - Health checking for congestion detection
//! - Per-second rate window with automatic reset
//!
//! # Rate Limiting Strategy
//!
//! The rate limiter implements a two-tier approach:
//!
//! 1. **Concurrency Control**: Uses semaphores to limit how many API calls can run
//!    simultaneously. This prevents overwhelming the system with too many parallel requests.
//!
//! 2. **Time-Based Rate Limiting**: Tracks requests per second and automatically waits
//!    when the limit is reached. The counter resets every second.
//!
//! # Basic Usage
//!
//! ```rust,ignore
//! use graphrag_core::async_processing::{RateLimiter, AsyncConfig};
//!
//! let config = AsyncConfig {
//!     max_concurrent_llm_calls: 3,
//!     llm_rate_limit_per_second: 2.0,
//!     max_concurrent_embeddings: 5,
//!     embedding_rate_limit_per_second: 10.0,
//!     ..Default::default()
//! };
//!
//! let rate_limiter = RateLimiter::new(&config);
//!
//! // Acquire permit for LLM call (blocks if needed)
//! let permit = rate_limiter.acquire_llm_permit().await?;
//! // ... make LLM API call ...
//! // Permit is automatically released when dropped
//!
//! // Check available capacity
//! let available = rate_limiter.get_available_llm_permits();
//! println!("Available LLM permits: {}", available);
//!
//! // Health check
//! let status = rate_limiter.health_check();
//! ```
58
59use std::sync::Arc;
60use std::time::{Duration, Instant};
61use tokio::sync::{Semaphore, SemaphorePermit};
62use tokio::time;
63
64use super::{AsyncConfig, ComponentStatus};
65use crate::core::GraphRAGError;
66
/// Rate limiter for controlling API call frequency and concurrency
///
/// Provides dual-level throttling: semaphore-based concurrency control and
/// time-based rate limiting for both LLM and embedding API calls. Permits are
/// RAII guards ([`SemaphorePermit`]) that release capacity when dropped.
#[derive(Debug)]
pub struct RateLimiter {
    /// Semaphore bounding simultaneous LLM API calls; sized from
    /// `AsyncConfig::max_concurrent_llm_calls` at construction.
    llm_semaphore: Arc<Semaphore>,
    /// Semaphore bounding simultaneous embedding API calls; sized from
    /// `AsyncConfig::max_concurrent_embeddings` at construction.
    embedding_semaphore: Arc<Semaphore>,
    /// Per-second request tracker for LLM calls. Uses a tokio (async-aware)
    /// `Mutex` because the tracker sleeps while the lock is held.
    llm_rate_tracker: Arc<tokio::sync::Mutex<RateTracker>>,
    /// Per-second request tracker for embedding calls (same locking rationale
    /// as `llm_rate_tracker`).
    embedding_rate_tracker: Arc<tokio::sync::Mutex<RateTracker>>,
    /// Owned copy of the configuration this limiter was built from,
    /// exposed via `get_config()`.
    config: AsyncConfig,
}
84
/// Internal tracker for time-based rate limiting
///
/// Counts requests within a rolling one-second window. Not thread-safe on its
/// own; callers wrap it in a mutex.
#[derive(Debug)]
struct RateTracker {
    /// Timestamp of the last admitted request; `None` until the first request.
    last_request: Option<Instant>,
    /// Number of requests admitted in the current one-second window.
    requests_this_second: u32,
    /// Maximum requests allowed per second (fractional values permitted).
    rate_limit: f64,
}
95
96impl RateTracker {
97 /// Creates a new rate tracker with specified rate limit
98 ///
99 /// # Parameters
100 /// - `rate_limit`: Maximum requests allowed per second
101 fn new(rate_limit: f64) -> Self {
102 Self {
103 last_request: None,
104 requests_this_second: 0,
105 rate_limit,
106 }
107 }
108
109 /// Checks rate limit and waits if necessary before allowing request
110 ///
111 /// Automatically resets the counter when entering a new second. If the
112 /// rate limit is reached, waits until the next second before proceeding.
113 ///
114 /// # Returns
115 /// Ok if request can proceed, or an error if rate limiting fails
116 async fn wait_if_needed(&mut self) -> Result<(), GraphRAGError> {
117 let now = Instant::now();
118
119 if let Some(last_request) = self.last_request {
120 let time_since_last = now.duration_since(last_request);
121
122 // Reset counter if we're in a new second
123 if time_since_last >= Duration::from_secs(1) {
124 self.requests_this_second = 0;
125 }
126
127 // Check if we need to wait
128 if self.requests_this_second as f64 >= self.rate_limit {
129 let wait_time = Duration::from_secs(1) - time_since_last;
130 if wait_time > Duration::ZERO {
131 time::sleep(wait_time).await;
132 }
133 self.requests_this_second = 0;
134 }
135 }
136
137 self.last_request = Some(now);
138 self.requests_this_second += 1;
139
140 Ok(())
141 }
142}
143
144impl RateLimiter {
145 /// Creates a new rate limiter from configuration
146 ///
147 /// Initializes semaphores and rate trackers for both LLM and embedding API calls.
148 ///
149 /// # Parameters
150 /// - `config`: Configuration specifying concurrency and rate limits
151 pub fn new(config: &AsyncConfig) -> Self {
152 Self {
153 llm_semaphore: Arc::new(Semaphore::new(config.max_concurrent_llm_calls)),
154 embedding_semaphore: Arc::new(Semaphore::new(config.max_concurrent_embeddings)),
155 llm_rate_tracker: Arc::new(tokio::sync::Mutex::new(RateTracker::new(
156 config.llm_rate_limit_per_second,
157 ))),
158 embedding_rate_tracker: Arc::new(tokio::sync::Mutex::new(RateTracker::new(
159 config.embedding_rate_limit_per_second,
160 ))),
161 config: config.clone(),
162 }
163 }
164
165 /// Acquires a permit for making an LLM API call
166 ///
167 /// Blocks until both concurrency and rate limits allow the call to proceed.
168 /// The permit must be held for the duration of the API call and will be
169 /// released when dropped.
170 ///
171 /// # Returns
172 /// Semaphore permit on success, or an error if acquisition fails
173 pub async fn acquire_llm_permit(&self) -> Result<SemaphorePermit<'_>, GraphRAGError> {
174 // First acquire the semaphore permit for concurrency control
175 let permit = self
176 .llm_semaphore
177 .acquire()
178 .await
179 .map_err(|e| GraphRAGError::RateLimit {
180 message: format!("Failed to acquire LLM permit: {e}"),
181 })?;
182
183 // Then check rate limiting
184 {
185 let mut rate_tracker = self.llm_rate_tracker.lock().await;
186 rate_tracker.wait_if_needed().await?;
187 }
188
189 Ok(permit)
190 }
191
192 /// Acquires a permit for making an embedding API call
193 ///
194 /// Blocks until both concurrency and rate limits allow the call to proceed.
195 /// The permit must be held for the duration of the API call and will be
196 /// released when dropped.
197 ///
198 /// # Returns
199 /// Semaphore permit on success, or an error if acquisition fails
200 pub async fn acquire_embedding_permit(&self) -> Result<SemaphorePermit<'_>, GraphRAGError> {
201 // First acquire the semaphore permit for concurrency control
202 let permit =
203 self.embedding_semaphore
204 .acquire()
205 .await
206 .map_err(|e| GraphRAGError::RateLimit {
207 message: format!("Failed to acquire embedding permit: {e}"),
208 })?;
209
210 // Then check rate limiting
211 {
212 let mut rate_tracker = self.embedding_rate_tracker.lock().await;
213 rate_tracker.wait_if_needed().await?;
214 }
215
216 Ok(permit)
217 }
218
219 /// Returns the number of available LLM permits
220 ///
221 /// # Returns
222 /// Number of LLM API calls that can be made immediately without waiting
223 pub fn get_available_llm_permits(&self) -> usize {
224 self.llm_semaphore.available_permits()
225 }
226
227 /// Returns the number of available embedding permits
228 ///
229 /// # Returns
230 /// Number of embedding API calls that can be made immediately without waiting
231 pub fn get_available_embedding_permits(&self) -> usize {
232 self.embedding_semaphore.available_permits()
233 }
234
235 /// Performs a health check on the rate limiter
236 ///
237 /// Checks permit availability to determine if the system is healthy or
238 /// experiencing congestion.
239 ///
240 /// # Returns
241 /// Component status indicating health (Healthy, Warning, or Error)
242 pub fn health_check(&self) -> ComponentStatus {
243 let llm_available = self.get_available_llm_permits();
244 let embedding_available = self.get_available_embedding_permits();
245
246 if llm_available == 0 && embedding_available == 0 {
247 ComponentStatus::Warning("No permits available".to_string())
248 } else if llm_available == 0 {
249 ComponentStatus::Warning("No LLM permits available".to_string())
250 } else if embedding_available == 0 {
251 ComponentStatus::Warning("No embedding permits available".to_string())
252 } else {
253 ComponentStatus::Healthy
254 }
255 }
256
257 /// Returns the current configuration
258 ///
259 /// # Returns
260 /// Reference to the async processing configuration
261 pub fn get_config(&self) -> &AsyncConfig {
262 &self.config
263 }
264}