throttle/lib.rs
1//! Resource throttling and rate limiting for file operations
2//!
3//! This crate provides throttling mechanisms to control resource usage during file operations.
4//! It helps prevent system overload and allows for controlled resource consumption when working
5//! with large filesets or in resource-constrained environments.
6//!
7//! # Overview
8//!
//! The throttle system provides three kinds of limits:
10//!
11//! 1. **Open Files Limit** - Controls the maximum number of simultaneously open files
12//! 2. **Operations Throttle** - Limits the number of operations per second
13//! 3. **I/O Operations Throttle** - Limits the number of I/O operations per second based on chunk size
14//!
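//! All three limits are built on semaphores. The two rate throttles act as token buckets:
//! each operation consumes tokens, and a replenish task — which the caller must start, as
//! shown below — adds tokens at the configured interval. The open-files limit needs no
//! replenishment; its permits are returned when the guard is dropped.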
17//!
18//! # Usage Patterns
19//!
20//! ## Open Files Limit
21//!
22//! Prevents exceeding system file descriptor limits by controlling concurrent file operations:
23//!
24//! ```rust,no_run
25//! use throttle::{set_max_open_files, open_file_permit};
26//!
27//! # async fn example() {
28//! // Configure max open files (typically 80% of system limit)
29//! set_max_open_files(8000);
30//!
31//! // Acquire permit before opening file
32//! let _guard = open_file_permit().await;
33//! // Open file here - permit is automatically released when guard is dropped
34//! # }
35//! ```
36//!
37//! ## Operations Throttling
38//!
39//! Limits general operations per second to reduce system load:
40//!
41//! ```rust,no_run
42//! use throttle::{init_ops_tokens, run_ops_replenish_thread, get_ops_token};
43//! use std::time::Duration;
44//!
45//! # async fn example() {
46//! // Initialize with 100 operations per second
47//! let ops_per_interval = 10;
48//! let interval = Duration::from_millis(100); // 10 tokens / 100ms = 100/sec
49//!
50//! init_ops_tokens(ops_per_interval);
51//!
52//! // Start replenishment in background
53//! tokio::spawn(run_ops_replenish_thread(ops_per_interval, interval));
54//!
55//! // Acquire token before each operation
56//! get_ops_token().await;
57//! // Perform operation here
58//! # }
59//! ```
60//!
61//! ## I/O Operations Throttling
62//!
63//! Limits I/O operations based on file size and chunk size, useful for bandwidth control:
64//!
65//! ```rust,no_run
66//! use throttle::{init_iops_tokens, run_iops_replenish_thread, get_file_iops_tokens};
67//! use std::time::Duration;
68//!
69//! # async fn example() {
70//! // Initialize with desired IOPS limit
71//! let iops_per_interval = 100;
72//! let interval = Duration::from_millis(100);
73//! let chunk_size = 64 * 1024; // 64 KB chunks
74//!
75//! init_iops_tokens(iops_per_interval);
76//! tokio::spawn(run_iops_replenish_thread(iops_per_interval, interval));
77//!
78//! // For a 1 MB file with 64 KB chunks: requires 16 tokens
79//! let file_size = 1024 * 1024;
80//! get_file_iops_tokens(chunk_size, file_size).await;
81//! // Copy file here
82//! # }
83//! ```
84//!
85//! # Token Calculation
86//!
87//! For I/O throttling, the number of tokens required for a file is calculated as:
88//!
89//! ```text
90//! tokens = ⌈file_size / chunk_size⌉
91//! ```
92//!
93//! This allows throttling to be proportional to the amount of data transferred.
94//!
95//! # Replenishment Strategy
96//!
97//! Tokens are replenished using a background task that periodically adds tokens to the semaphore.
98//! The replenishment rate can be tuned by adjusting:
99//!
//! - **`replenish`**: Number of tokens added each interval
//! - **`interval`**: Time between replenishments
102//!
103//! For example, to achieve 1000 ops/sec:
104//! - Option 1: 100 tokens every 100ms
105//! - Option 2: 10 tokens every 10ms
106//!
107//! The implementation automatically scales down to prevent excessive granularity while
108//! maintaining the target rate.
109//!
110//! # Thread Safety
111//!
112//! All throttling mechanisms are thread-safe and can be used across multiple async tasks
113//! and threads. The semaphores use efficient `parking_lot` mutexes internally.
114//!
115//! # Performance Considerations
116//!
117//! - **Open Files Limit**: No replenishment needed, permits released automatically
118//! - **Ops/IOPS Throttle**: Background task overhead is minimal (~1 task per throttle type)
119//! - **Token Acquisition**: Async operation that parks task when no tokens available
120//!
121//! # Examples
122//!
123//! ## Complete Throttled Copy Operation
124//!
125//! ```rust,no_run
126//! use throttle::*;
127//! use std::time::Duration;
128//!
129//! async fn setup_throttling() {
130//! // Limit to 80% of 10000 max files
131//! set_max_open_files(8000);
132//!
133//! // 500 operations per second
134//! init_ops_tokens(50);
135//! tokio::spawn(run_ops_replenish_thread(50, Duration::from_millis(100)));
136//!
137//! // 1000 IOPS (with 64KB chunks ≈ 64 MB/s)
138//! init_iops_tokens(100);
139//! tokio::spawn(run_iops_replenish_thread(100, Duration::from_millis(100)));
140//! }
141//!
142//! async fn copy_file_throttled(size: u64) {
143//! let chunk_size = 64 * 1024;
144//!
145//! // Acquire all required permits
146//! get_ops_token().await;
147//! get_file_iops_tokens(chunk_size, size).await;
148//! let _file_guard = open_file_permit().await;
149//!
150//! // Perform copy operation
151//! // ...
152//! }
153//! ```
154
155mod semaphore;
156
157static OPEN_FILES_LIMIT: std::sync::LazyLock<semaphore::Semaphore> =
158 std::sync::LazyLock::new(semaphore::Semaphore::new);
159static OPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
160 std::sync::LazyLock::new(semaphore::Semaphore::new);
161static IOPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
162 std::sync::LazyLock::new(semaphore::Semaphore::new);
163
164pub fn set_max_open_files(max_open_files: usize) {
165 OPEN_FILES_LIMIT.setup(max_open_files);
166}
167
168pub struct OpenFileGuard {
169 _permit: Option<tokio::sync::SemaphorePermit<'static>>,
170}
171
172pub async fn open_file_permit() -> OpenFileGuard {
173 OpenFileGuard {
174 _permit: OPEN_FILES_LIMIT.acquire().await,
175 }
176}
177
178pub fn init_ops_tokens(ops_tokens: usize) {
179 OPS_THROTTLE.setup(ops_tokens);
180}
181
182pub fn init_iops_tokens(ops_tokens: usize) {
183 IOPS_THROTTLE.setup(ops_tokens);
184}
185
186pub async fn get_ops_token() {
187 OPS_THROTTLE.consume().await;
188}
189
190async fn get_iops_tokens(tokens: u32) {
191 IOPS_THROTTLE.consume_many(tokens).await;
192}
193
194pub async fn get_file_iops_tokens(chunk_size: u64, file_size: u64) {
195 if chunk_size > 0 {
196 let tokens = 1 + (std::cmp::max(1, file_size) - 1) / chunk_size;
197 if tokens > u64::from(u32::MAX) {
198 tracing::error!(
199 "chunk size: {} is too small to limit throughput for files this big, size: {}",
200 chunk_size,
201 file_size,
202 );
203 } else {
204 // tokens is guaranteed to be <= u32::MAX by check above
205 let tokens_u32 =
206 u32::try_from(tokens).expect("tokens should fit in u32 after bounds check");
207 get_iops_tokens(tokens_u32).await;
208 }
209 }
210}
211
212pub async fn run_ops_replenish_thread(replenish: usize, interval: std::time::Duration) {
213 OPS_THROTTLE.run_replenish_thread(replenish, interval).await;
214}
215
216pub async fn run_iops_replenish_thread(replenish: usize, interval: std::time::Duration) {
217 IOPS_THROTTLE
218 .run_replenish_thread(replenish, interval)
219 .await;
220}