throttle/lib.rs
1//! Resource throttling and rate limiting for file operations
2//!
3//! This crate provides throttling mechanisms to control resource usage during file operations.
4//! It helps prevent system overload and allows for controlled resource consumption when working with large filesets or in resource-constrained environments.
5//!
6//! # Overview
7//!
8//! The throttle system provides three types of rate limiting:
9//!
10//! 1. **Open Files Limit** - Controls the maximum number of simultaneously open files
11//! 2. **Operations Throttle** - Limits the number of operations per second
12//! 3. **I/O Operations Throttle** - Limits the number of I/O operations per second based on chunk size
13//!
14//! All throttling is implemented using token-bucket semaphores that are automatically replenished at configured intervals.
15//!
16//! # Usage Patterns
17//!
18//! ## Open Files Limit
19//!
20//! Prevents exceeding system file descriptor limits by controlling concurrent file operations:
21//!
22//! ```rust,no_run
23//! use throttle::{set_max_open_files, open_file_permit};
24//!
25//! # async fn example() {
26//! // Configure max open files (typically 80% of system limit)
27//! set_max_open_files(8000);
28//!
29//! // Acquire permit before opening file
30//! let _guard = open_file_permit().await;
31//! // Open file here - permit is automatically released when guard is dropped
32//! # }
33//! ```
34//!
35//! ## Operations Throttling
36//!
37//! Limits general operations per second to reduce system load:
38//!
39//! ```rust,no_run
40//! use throttle::{init_ops_tokens, run_ops_replenish_thread, get_ops_token};
41//! use std::time::Duration;
42//!
43//! # async fn example() {
44//! // Initialize with 100 operations per second
45//! let ops_per_interval = 10;
46//! let interval = Duration::from_millis(100); // 10 tokens / 100ms = 100/sec
47//!
48//! init_ops_tokens(ops_per_interval);
49//!
50//! // Start replenishment in background
51//! tokio::spawn(run_ops_replenish_thread(ops_per_interval, interval));
52//!
53//! // Acquire token before each operation
54//! get_ops_token().await;
55//! // Perform operation here
56//! # }
57//! ```
58//!
59//! ## I/O Operations Throttling
60//!
61//! Limits I/O operations based on file size and chunk size, useful for bandwidth control:
62//!
63//! ```rust,no_run
64//! use throttle::{init_iops_tokens, run_iops_replenish_thread, get_file_iops_tokens};
65//! use std::time::Duration;
66//!
67//! # async fn example() {
68//! // Initialize with desired IOPS limit
69//! let iops_per_interval = 100;
70//! let interval = Duration::from_millis(100);
71//! let chunk_size = 64 * 1024; // 64 KB chunks
72//!
73//! init_iops_tokens(iops_per_interval);
74//! tokio::spawn(run_iops_replenish_thread(iops_per_interval, interval));
75//!
76//! // For a 1 MB file with 64 KB chunks: requires 16 tokens
77//! let file_size = 1024 * 1024;
78//! get_file_iops_tokens(chunk_size, file_size).await;
79//! // Copy file here
80//! # }
81//! ```
82//!
83//! # Token Calculation
84//!
85//! For I/O throttling, the number of tokens required for a file is calculated as:
86//!
87//! ```text
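//! tokens = max(1, ⌈file_size / chunk_size⌉)
//! ```
//!
//! This allows throttling to be proportional to the amount of data transferred. An empty file still costs one token. A `chunk_size` of 0 skips I/O throttling for that call, and a file that would need more than `u32::MAX` tokens is logged as an error and not throttled.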
92//!
93//! # Replenishment Strategy
94//!
95//! Tokens are replenished using a background task that periodically adds tokens to the semaphore.
96//! The replenishment rate can be tuned by adjusting:
97//!
98//! - **`tokens_per_interval`**: Number of tokens added each interval
99//! - **interval**: Time between replenishments
100//!
101//! For example, to achieve 1000 ops/sec:
102//! - Option 1: 100 tokens every 100ms
103//! - Option 2: 10 tokens every 10ms
104//!
105//! The implementation automatically scales down to prevent excessive granularity while maintaining the target rate.
106//!
107//! # Thread Safety
108//!
109//! All throttling mechanisms are thread-safe and can be used across multiple async tasks and threads. The semaphores use efficient `parking_lot` mutexes internally.
110//!
111//! # Performance Considerations
112//!
113//! - **Open Files Limit**: No replenishment needed, permits released automatically
114//! - **Ops/IOPS Throttle**: Background task overhead is minimal (~1 task per throttle type)
115//! - **Token Acquisition**: Async operation that parks task when no tokens available
116//!
117//! # Examples
118//!
119//! ## Complete Throttled Copy Operation
120//!
121//! ```rust,no_run
122//! use throttle::*;
123//! use std::time::Duration;
124//!
125//! async fn setup_throttling() {
126//! // Limit to 80% of 10000 max files
127//! set_max_open_files(8000);
128//!
129//! // 500 operations per second
130//! init_ops_tokens(50);
131//! tokio::spawn(run_ops_replenish_thread(50, Duration::from_millis(100)));
132//!
133//! // 1000 IOPS (with 64KB chunks ≈ 64 MB/s)
134//! init_iops_tokens(100);
135//! tokio::spawn(run_iops_replenish_thread(100, Duration::from_millis(100)));
136//! }
137//!
138//! async fn copy_file_throttled(size: u64) {
139//! let chunk_size = 64 * 1024;
140//!
141//! // Acquire all required permits
142//! get_ops_token().await;
143//! get_file_iops_tokens(chunk_size, size).await;
144//! let _file_guard = open_file_permit().await;
145//!
146//! // Perform copy operation
147//! // ...
148//! }
149//! ```
150
151mod semaphore;
152
153static OPEN_FILES_LIMIT: std::sync::LazyLock<semaphore::Semaphore> =
154 std::sync::LazyLock::new(semaphore::Semaphore::new);
155static OPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
156 std::sync::LazyLock::new(semaphore::Semaphore::new);
157static IOPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
158 std::sync::LazyLock::new(semaphore::Semaphore::new);
159
160pub fn set_max_open_files(max_open_files: usize) {
161 OPEN_FILES_LIMIT.setup(max_open_files);
162}
163
164pub struct OpenFileGuard {
165 _permit: Option<tokio::sync::SemaphorePermit<'static>>,
166}
167
168pub async fn open_file_permit() -> OpenFileGuard {
169 OpenFileGuard {
170 _permit: OPEN_FILES_LIMIT.acquire().await,
171 }
172}
173
174pub fn init_ops_tokens(ops_tokens: usize) {
175 OPS_THROTTLE.setup(ops_tokens);
176}
177
178pub fn init_iops_tokens(ops_tokens: usize) {
179 IOPS_THROTTLE.setup(ops_tokens);
180}
181
182pub async fn get_ops_token() {
183 OPS_THROTTLE.consume().await;
184}
185
186async fn get_iops_tokens(tokens: u32) {
187 IOPS_THROTTLE.consume_many(tokens).await;
188}
189
190pub async fn get_file_iops_tokens(chunk_size: u64, file_size: u64) {
191 if chunk_size > 0 {
192 let tokens = 1 + (std::cmp::max(1, file_size) - 1) / chunk_size;
193 if tokens > u64::from(u32::MAX) {
194 tracing::error!(
195 "chunk size: {} is too small to limit throughput for files this big, size: {}",
196 chunk_size,
197 file_size,
198 );
199 } else {
200 // tokens is guaranteed to be <= u32::MAX by check above
201 let tokens_u32 =
202 u32::try_from(tokens).expect("tokens should fit in u32 after bounds check");
203 get_iops_tokens(tokens_u32).await;
204 }
205 }
206}
207
208pub async fn run_ops_replenish_thread(replenish: usize, interval: std::time::Duration) {
209 OPS_THROTTLE.run_replenish_thread(replenish, interval).await;
210}
211
212pub async fn run_iops_replenish_thread(replenish: usize, interval: std::time::Duration) {
213 IOPS_THROTTLE
214 .run_replenish_thread(replenish, interval)
215 .await;
216}