throttle/
lib.rs

1//! Resource throttling and rate limiting for file operations
2//!
3//! This crate provides throttling mechanisms to control resource usage during file operations.
4//! It helps prevent system overload and allows for controlled resource consumption when working with large filesets or in resource-constrained environments.
5//!
6//! # Overview
7//!
8//! The throttle system provides three types of rate limiting:
9//!
10//! 1. **Open Files Limit** - Controls the maximum number of simultaneously open files
11//! 2. **Operations Throttle** - Limits the number of operations per second
12//! 3. **I/O Operations Throttle** - Limits the number of I/O operations per second based on chunk size
13//!
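//! All three limits are built on semaphores. The ops and IOPS throttles act as token buckets: a background replenish task, which you spawn yourself, adds tokens at the configured interval. The open-files limit needs no replenishment because its permits are returned when the guard is dropped.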
15//!
16//! # Usage Patterns
17//!
18//! ## Open Files Limit
19//!
20//! Prevents exceeding system file descriptor limits by controlling concurrent file operations:
21//!
22//! ```rust,no_run
23//! use throttle::{set_max_open_files, open_file_permit};
24//!
25//! # async fn example() {
26//! // Configure max open files (typically 80% of system limit)
27//! set_max_open_files(8000);
28//!
29//! // Acquire permit before opening file
30//! let _guard = open_file_permit().await;
31//! // Open file here - permit is automatically released when guard is dropped
32//! # }
33//! ```
34//!
35//! ## Operations Throttling
36//!
37//! Limits general operations per second to reduce system load:
38//!
39//! ```rust,no_run
40//! use throttle::{init_ops_tokens, run_ops_replenish_thread, get_ops_token};
41//! use std::time::Duration;
42//!
43//! # async fn example() {
44//! // Initialize with 100 operations per second
45//! let ops_per_interval = 10;
46//! let interval = Duration::from_millis(100); // 10 tokens / 100ms = 100/sec
47//!
48//! init_ops_tokens(ops_per_interval);
49//!
50//! // Start replenishment in background
51//! tokio::spawn(run_ops_replenish_thread(ops_per_interval, interval));
52//!
53//! // Acquire token before each operation
54//! get_ops_token().await;
55//! // Perform operation here
56//! # }
57//! ```
58//!
59//! ## I/O Operations Throttling
60//!
61//! Limits I/O operations based on file size and chunk size, useful for bandwidth control:
62//!
63//! ```rust,no_run
64//! use throttle::{init_iops_tokens, run_iops_replenish_thread, get_file_iops_tokens};
65//! use std::time::Duration;
66//!
67//! # async fn example() {
68//! // Initialize with desired IOPS limit
69//! let iops_per_interval = 100;
70//! let interval = Duration::from_millis(100);
71//! let chunk_size = 64 * 1024; // 64 KB chunks
72//!
73//! init_iops_tokens(iops_per_interval);
74//! tokio::spawn(run_iops_replenish_thread(iops_per_interval, interval));
75//!
76//! // For a 1 MB file with 64 KB chunks: requires 16 tokens
77//! let file_size = 1024 * 1024;
78//! get_file_iops_tokens(chunk_size, file_size).await;
79//! // Copy file here
80//! # }
81//! ```
82//!
83//! # Token Calculation
84//!
85//! For I/O throttling, the number of tokens required for a file is calculated as:
86//!
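//! ```text
//! tokens = ⌈max(file_size, 1) / chunk_size⌉
//! ```
//!
//! An empty file therefore still costs one token. A `chunk_size` of 0 skips throttling for that call.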
90//!
91//! This allows throttling to be proportional to the amount of data transferred.
92//!
93//! # Replenishment Strategy
94//!
95//! Tokens are replenished using a background task that periodically adds tokens to the semaphore.
96//! The replenishment rate can be tuned by adjusting:
97//!
98//! - **`tokens_per_interval`**: Number of tokens added each interval
99//! - **interval**: Time between replenishments
100//!
101//! For example, to achieve 1000 ops/sec:
102//! - Option 1: 100 tokens every 100ms
103//! - Option 2: 10 tokens every 10ms
104//!
105//! The implementation automatically scales down to prevent excessive granularity while maintaining the target rate.
106//!
107//! # Thread Safety
108//!
109//! All throttling mechanisms are thread-safe and can be used across multiple async tasks and threads. The semaphores use efficient `parking_lot` mutexes internally.
110//!
111//! # Performance Considerations
112//!
113//! - **Open Files Limit**: No replenishment needed, permits released automatically
114//! - **Ops/IOPS Throttle**: Background task overhead is minimal (~1 task per throttle type)
//! - **Token Acquisition**: Asynchronous; the calling task is parked while no tokens are available
116//!
117//! # Examples
118//!
119//! ## Complete Throttled Copy Operation
120//!
121//! ```rust,no_run
122//! use throttle::*;
123//! use std::time::Duration;
124//!
125//! async fn setup_throttling() {
126//!     // Limit to 80% of 10000 max files
127//!     set_max_open_files(8000);
128//!
129//!     // 500 operations per second
130//!     init_ops_tokens(50);
131//!     tokio::spawn(run_ops_replenish_thread(50, Duration::from_millis(100)));
132//!
133//!     // 1000 IOPS (with 64KB chunks ≈ 64 MB/s)
134//!     init_iops_tokens(100);
135//!     tokio::spawn(run_iops_replenish_thread(100, Duration::from_millis(100)));
136//! }
137//!
138//! async fn copy_file_throttled(size: u64) {
139//!     let chunk_size = 64 * 1024;
140//!
141//!     // Acquire all required permits
142//!     get_ops_token().await;
143//!     get_file_iops_tokens(chunk_size, size).await;
144//!     let _file_guard = open_file_permit().await;
145//!
146//!     // Perform copy operation
147//!     // ...
148//! }
149//! ```
150
151mod semaphore;
152
153static OPEN_FILES_LIMIT: std::sync::LazyLock<semaphore::Semaphore> =
154    std::sync::LazyLock::new(semaphore::Semaphore::new);
155static OPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
156    std::sync::LazyLock::new(semaphore::Semaphore::new);
157static IOPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
158    std::sync::LazyLock::new(semaphore::Semaphore::new);
159
160pub fn set_max_open_files(max_open_files: usize) {
161    OPEN_FILES_LIMIT.setup(max_open_files);
162}
163
164pub struct OpenFileGuard {
165    _permit: Option<tokio::sync::SemaphorePermit<'static>>,
166}
167
168pub async fn open_file_permit() -> OpenFileGuard {
169    OpenFileGuard {
170        _permit: OPEN_FILES_LIMIT.acquire().await,
171    }
172}
173
174pub fn init_ops_tokens(ops_tokens: usize) {
175    OPS_THROTTLE.setup(ops_tokens);
176}
177
178pub fn init_iops_tokens(ops_tokens: usize) {
179    IOPS_THROTTLE.setup(ops_tokens);
180}
181
182pub async fn get_ops_token() {
183    OPS_THROTTLE.consume().await;
184}
185
186async fn get_iops_tokens(tokens: u32) {
187    IOPS_THROTTLE.consume_many(tokens).await;
188}
189
190pub async fn get_file_iops_tokens(chunk_size: u64, file_size: u64) {
191    if chunk_size > 0 {
192        let tokens = 1 + (std::cmp::max(1, file_size) - 1) / chunk_size;
193        if tokens > u64::from(u32::MAX) {
194            tracing::error!(
195                "chunk size: {} is too small to limit throughput for files this big, size: {}",
196                chunk_size,
197                file_size,
198            );
199        } else {
200            // tokens is guaranteed to be <= u32::MAX by check above
201            let tokens_u32 =
202                u32::try_from(tokens).expect("tokens should fit in u32 after bounds check");
203            get_iops_tokens(tokens_u32).await;
204        }
205    }
206}
207
208pub async fn run_ops_replenish_thread(replenish: usize, interval: std::time::Duration) {
209    OPS_THROTTLE.run_replenish_thread(replenish, interval).await;
210}
211
212pub async fn run_iops_replenish_thread(replenish: usize, interval: std::time::Duration) {
213    IOPS_THROTTLE
214        .run_replenish_thread(replenish, interval)
215        .await;
216}