graphrag_core/async_processing/rate_limiting.rs

//! Rate limiting for API call throttling and concurrency control.
//!
//! This module provides the [`RateLimiter`] for controlling both the concurrency and
//! frequency of API calls, helping to avoid provider-side throttling and respect
//! service limits.
//!
//! # Main Types
//!
//! - [`RateLimiter`]: Dual-level rate limiter with semaphore-based concurrency control
//!   and time-based request throttling
//!
//! # Features
//!
//! - Separate rate limiting for LLM and embedding API calls
//! - Semaphore-based concurrency control (max N simultaneous calls)
//! - Time-based rate limiting (max N calls per second)
//! - Automatic waiting when limits are reached
//! - RAII-style permit handling with automatic release
//! - Health checking for congestion detection
//! - Per-second rate window with automatic reset
//!
//! # Rate Limiting Strategy
//!
//! The rate limiter implements a two-tier approach:
//!
//! 1. **Concurrency Control**: Uses semaphores to limit how many API calls can run
//!    simultaneously. This prevents overwhelming the system with too many parallel requests.
//!
//! 2. **Time-Based Rate Limiting**: Tracks requests per second and automatically waits
//!    when the limit is reached. The counter resets every second.
//!
//! # Basic Usage
//!
//! ```rust,ignore
//! use graphrag_core::async_processing::{RateLimiter, AsyncConfig};
//!
//! let config = AsyncConfig {
//!     max_concurrent_llm_calls: 3,
//!     llm_rate_limit_per_second: 2.0,
//!     max_concurrent_embeddings: 5,
//!     embedding_rate_limit_per_second: 10.0,
//!     ..Default::default()
//! };
//!
//! let rate_limiter = RateLimiter::new(&config);
//!
//! // Acquire a permit for an LLM call (waits if limits are reached)
//! let permit = rate_limiter.acquire_llm_permit().await?;
//! // ... make LLM API call ...
//! // The permit is automatically released when dropped
//!
//! // Check available capacity
//! let available = rate_limiter.get_available_llm_permits();
//! println!("Available LLM permits: {}", available);
//!
//! // Health check
//! let status = rate_limiter.health_check();
//! ```
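//!
//! # Sharing Across Tasks
//!
//! Because permits are acquired through `&self`, a limiter wrapped in an
//! [`Arc`] can throttle many concurrent tasks at once. A minimal sketch,
//! reusing `config` from above (`call_llm` is a hypothetical API call):
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! let limiter = Arc::new(RateLimiter::new(&config));
//!
//! let handles: Vec<_> = (0..10)
//!     .map(|i| {
//!         let limiter = Arc::clone(&limiter);
//!         tokio::spawn(async move {
//!             // Waits until both concurrency and rate limits allow the call
//!             let _permit = limiter.acquire_llm_permit().await?;
//!             call_llm(i).await // made while holding the permit
//!         })
//!     })
//!     .collect();
//! ```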

use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time;

use super::{AsyncConfig, ComponentStatus};
use crate::core::GraphRAGError;

/// Rate limiter for controlling API call frequency and concurrency
///
/// Provides dual-level throttling: semaphore-based concurrency control and
/// time-based rate limiting for both LLM and embedding API calls.
#[derive(Debug)]
pub struct RateLimiter {
    /// Semaphore limiting concurrent LLM API calls
    llm_semaphore: Arc<Semaphore>,
    /// Semaphore limiting concurrent embedding API calls
    embedding_semaphore: Arc<Semaphore>,
    /// Tracker for LLM API call rate limiting
    llm_rate_tracker: Arc<tokio::sync::Mutex<RateTracker>>,
    /// Tracker for embedding API call rate limiting
    embedding_rate_tracker: Arc<tokio::sync::Mutex<RateTracker>>,
    /// Configuration settings
    config: AsyncConfig,
}

/// Internal tracker for time-based rate limiting
#[derive(Debug)]
struct RateTracker {
    /// Timestamp of the last request
    last_request: Option<Instant>,
    /// Number of requests made in the current second
    requests_this_second: u32,
    /// Maximum requests allowed per second
    rate_limit: f64,
}

impl RateTracker {
    /// Creates a new rate tracker with specified rate limit
    ///
    /// # Parameters
    /// - `rate_limit`: Maximum requests allowed per second
    fn new(rate_limit: f64) -> Self {
        Self {
            last_request: None,
            requests_this_second: 0,
            rate_limit,
        }
    }

    /// Checks rate limit and waits if necessary before allowing request
    ///
    /// Automatically resets the counter when entering a new second. If the
    /// rate limit is reached, waits until the next second before proceeding.
    ///
    /// # Returns
    /// Ok if request can proceed, or an error if rate limiting fails
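    ///
    /// # Example Trace
    ///
    /// With `rate_limit = 2.0`: requests at `t = 0.0s` and `t = 0.3s` proceed
    /// immediately; a third request at `t = 0.5s` finds the counter at the
    /// limit and sleeps for `1s - 0.2s = 0.8s` (one second measured from the
    /// previous request), proceeding at `t ≈ 1.3s` in a fresh window.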
    async fn wait_if_needed(&mut self) -> Result<(), GraphRAGError> {
        let now = Instant::now();

        if let Some(last_request) = self.last_request {
            let time_since_last = now.duration_since(last_request);

            // Reset counter if we're in a new second
            if time_since_last >= Duration::from_secs(1) {
                self.requests_this_second = 0;
            }

            // Check if we need to wait (saturating_sub avoids underflow when
            // more than a second has already elapsed)
            if self.requests_this_second as f64 >= self.rate_limit {
                let wait_time = Duration::from_secs(1).saturating_sub(time_since_last);
                if wait_time > Duration::ZERO {
                    time::sleep(wait_time).await;
                }
                self.requests_this_second = 0;
            }
        }

        // Record the request at the current time, re-read after any sleep
        // above, so the next window is measured from when the request actually
        // proceeds rather than from when it arrived.
        self.last_request = Some(Instant::now());
        self.requests_this_second += 1;

        Ok(())
    }
}

impl RateLimiter {
    /// Creates a new rate limiter from configuration
    ///
    /// Initializes semaphores and rate trackers for both LLM and embedding API calls.
    ///
    /// # Parameters
    /// - `config`: Configuration specifying concurrency and rate limits
    pub fn new(config: &AsyncConfig) -> Self {
        Self {
            llm_semaphore: Arc::new(Semaphore::new(config.max_concurrent_llm_calls)),
            embedding_semaphore: Arc::new(Semaphore::new(config.max_concurrent_embeddings)),
            llm_rate_tracker: Arc::new(tokio::sync::Mutex::new(RateTracker::new(
                config.llm_rate_limit_per_second,
            ))),
            embedding_rate_tracker: Arc::new(tokio::sync::Mutex::new(RateTracker::new(
                config.embedding_rate_limit_per_second,
            ))),
            config: config.clone(),
        }
    }

    /// Acquires a permit for making an LLM API call
    ///
    /// Waits until both concurrency and rate limits allow the call to proceed.
    /// The permit must be held for the duration of the API call and will be
    /// released when dropped.
    ///
    /// # Returns
    /// Semaphore permit on success, or an error if acquisition fails
    pub async fn acquire_llm_permit(&self) -> Result<SemaphorePermit<'_>, GraphRAGError> {
        // First acquire the semaphore permit for concurrency control
        let permit = self
            .llm_semaphore
            .acquire()
            .await
            .map_err(|e| GraphRAGError::RateLimit {
                message: format!("Failed to acquire LLM permit: {e}"),
            })?;

        // Then check rate limiting
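        // Note: the tracker mutex is held across any sleep inside
        // `wait_if_needed`, so concurrent callers queue behind a sleeping
        // caller; this serializes requests to the configured per-second rate.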
        {
            let mut rate_tracker = self.llm_rate_tracker.lock().await;
            rate_tracker.wait_if_needed().await?;
        }

        Ok(permit)
    }

    /// Acquires a permit for making an embedding API call
    ///
    /// Waits until both concurrency and rate limits allow the call to proceed.
    /// The permit must be held for the duration of the API call and will be
    /// released when dropped.
    ///
    /// # Returns
    /// Semaphore permit on success, or an error if acquisition fails
    pub async fn acquire_embedding_permit(&self) -> Result<SemaphorePermit<'_>, GraphRAGError> {
        // First acquire the semaphore permit for concurrency control
        let permit =
            self.embedding_semaphore
                .acquire()
                .await
                .map_err(|e| GraphRAGError::RateLimit {
                    message: format!("Failed to acquire embedding permit: {e}"),
                })?;

        // Then check rate limiting
        {
            let mut rate_tracker = self.embedding_rate_tracker.lock().await;
            rate_tracker.wait_if_needed().await?;
        }

        Ok(permit)
    }

    /// Returns the number of available LLM permits
    ///
    /// # Returns
    /// Number of LLM API calls that can be started immediately as far as the
    /// concurrency limit is concerned (time-based rate limiting may still apply)
    pub fn get_available_llm_permits(&self) -> usize {
        self.llm_semaphore.available_permits()
    }

    /// Returns the number of available embedding permits
    ///
    /// # Returns
    /// Number of embedding API calls that can be started immediately as far as
    /// the concurrency limit is concerned (time-based rate limiting may still apply)
    pub fn get_available_embedding_permits(&self) -> usize {
        self.embedding_semaphore.available_permits()
    }

    /// Performs a health check on the rate limiter
    ///
    /// Checks permit availability to determine if the system is healthy or
    /// experiencing congestion.
    ///
    /// # Returns
    /// Component status indicating health: `Healthy` when permits are
    /// available, or `Warning` when one or both permit pools are exhausted
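    ///
    /// # Example
    ///
    /// A minimal sketch of reacting to congestion:
    ///
    /// ```rust,ignore
    /// if let ComponentStatus::Warning(msg) = rate_limiter.health_check() {
    ///     eprintln!("Rate limiter congested: {msg}");
    /// }
    /// ```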
    pub fn health_check(&self) -> ComponentStatus {
        let llm_available = self.get_available_llm_permits();
        let embedding_available = self.get_available_embedding_permits();

        if llm_available == 0 && embedding_available == 0 {
            ComponentStatus::Warning("No permits available".to_string())
        } else if llm_available == 0 {
            ComponentStatus::Warning("No LLM permits available".to_string())
        } else if embedding_available == 0 {
            ComponentStatus::Warning("No embedding permits available".to_string())
        } else {
            ComponentStatus::Healthy
        }
    }

    /// Returns the current configuration
    ///
    /// # Returns
    /// Reference to the async processing configuration
    pub fn get_config(&self) -> &AsyncConfig {
        &self.config
    }
}
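
#[cfg(test)]
mod tests {
    //! A minimal sketch of a test for permit accounting. Assumes `AsyncConfig`
    //! implements `Default` (as used in the module example above) with a
    //! nonzero LLM concurrency limit, and that a Tokio test runtime is available.

    use super::*;

    #[tokio::test]
    async fn permit_is_released_on_drop() {
        let config = AsyncConfig::default();
        let limiter = RateLimiter::new(&config);
        let before = limiter.get_available_llm_permits();

        {
            // Holding the permit consumes one concurrency slot...
            let _permit = limiter.acquire_llm_permit().await.unwrap();
            assert_eq!(limiter.get_available_llm_permits(), before - 1);
        }

        // ...and dropping it returns the slot to the semaphore.
        assert_eq!(limiter.get_available_llm_permits(), before);
    }
}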