throttle/lib.rs
1//! Resource throttling and rate limiting for file operations
2//!
3//! This crate provides throttling mechanisms to control resource usage during file operations.
4//! It helps prevent system overload and allows for controlled resource consumption when working with large filesets or in resource-constrained environments.
5//!
6//! # Overview
7//!
8//! The throttle system provides three types of rate limiting:
9//!
10//! 1. **Open Files Limit** - Controls the maximum number of simultaneously open files
11//! 2. **Operations Throttle** - Limits the number of operations per second
12//! 3. **I/O Operations Throttle** - Limits the number of I/O operations per second based on chunk size
13//!
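//! The ops and I/O-ops throttles are token-bucket semaphores: a background task that you spawn replenishes their tokens at the configured interval. The open-files limit is a plain concurrency cap whose permits are returned when the guard is dropped, so it needs no replenishment.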
15//!
16//! # Usage Patterns
17//!
18//! ## Open Files Limit
19//!
20//! Prevents exceeding system file descriptor limits by controlling concurrent file operations:
21//!
22//! ```rust,no_run
23//! use throttle::{set_max_open_files, open_file_permit};
24//!
25//! # async fn example() {
26//! // Configure max open files (typically 80% of system limit)
27//! set_max_open_files(8000);
28//!
29//! // Acquire permit before opening file
30//! let _guard = open_file_permit().await;
31//! // Open file here - permit is automatically released when guard is dropped
32//! # }
33//! ```
34//!
35//! ## Operations Throttling
36//!
37//! Limits general operations per second to reduce system load:
38//!
39//! ```rust,no_run
40//! use throttle::{init_ops_tokens, run_ops_replenish_thread, get_ops_token};
41//! use std::time::Duration;
42//!
43//! # async fn example() {
44//! // Initialize with 100 operations per second
45//! let ops_per_interval = 10;
46//! let interval = Duration::from_millis(100); // 10 tokens / 100ms = 100/sec
47//!
48//! init_ops_tokens(ops_per_interval);
49//!
50//! // Start replenishment in background
51//! tokio::spawn(run_ops_replenish_thread(ops_per_interval, interval));
52//!
53//! // Acquire token before each operation
54//! get_ops_token().await;
55//! // Perform operation here
56//! # }
57//! ```
58//!
59//! ## I/O Operations Throttling
60//!
61//! Limits I/O operations based on file size and chunk size, useful for bandwidth control:
62//!
63//! ```rust,no_run
64//! use throttle::{init_iops_tokens, run_iops_replenish_thread, get_file_iops_tokens};
65//! use std::time::Duration;
66//!
67//! # async fn example() {
68//! // Initialize with desired IOPS limit
69//! let iops_per_interval = 100;
70//! let interval = Duration::from_millis(100);
71//! let chunk_size = 64 * 1024; // 64 KB chunks
72//!
73//! init_iops_tokens(iops_per_interval);
74//! tokio::spawn(run_iops_replenish_thread(iops_per_interval, interval));
75//!
76//! // For a 1 MB file with 64 KB chunks: requires 16 tokens
77//! let file_size = 1024 * 1024;
78//! get_file_iops_tokens(chunk_size, file_size).await;
79//! // Copy file here
80//! # }
81//! ```
82//!
83//! # Token Calculation
84//!
85//! For I/O throttling, the number of tokens required for a file is calculated as:
86//!
87//! ```text
//! tokens = max(1, ⌈file_size / chunk_size⌉)
89//! ```
90//!
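//! This allows throttling to be proportional to the amount of data transferred. An empty file still costs one token. If `chunk_size` is 0, or the token count would exceed `u32::MAX`, no tokens are consumed (the latter case is logged as an error).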
92//!
93//! # Replenishment Strategy
94//!
95//! Tokens are replenished using a background task that periodically adds tokens to the semaphore.
96//! The replenishment rate can be tuned by adjusting:
97//!
98//! - **`tokens_per_interval`**: Number of tokens added each interval
99//! - **interval**: Time between replenishments
100//!
101//! For example, to achieve 1000 ops/sec:
102//! - Option 1: 100 tokens every 100ms
103//! - Option 2: 10 tokens every 10ms
104//!
105//! The implementation automatically scales down to prevent excessive granularity while maintaining the target rate.
106//!
107//! # Thread Safety
108//!
109//! All throttling mechanisms are thread-safe and can be used across multiple async tasks and threads. The semaphores use efficient `parking_lot` mutexes internally.
110//!
111//! # Performance Considerations
112//!
113//! - **Open Files Limit**: No replenishment needed, permits released automatically
114//! - **Ops/IOPS Throttle**: Background task overhead is minimal (~1 task per throttle type)
115//! - **Token Acquisition**: Async operation that parks task when no tokens available
116//!
117//! # Examples
118//!
119//! ## Complete Throttled Copy Operation
120//!
121//! ```rust,no_run
122//! use throttle::*;
123//! use std::time::Duration;
124//!
125//! async fn setup_throttling() {
126//! // Limit to 80% of 10000 max files
127//! set_max_open_files(8000);
128//!
129//! // 500 operations per second
130//! init_ops_tokens(50);
131//! tokio::spawn(run_ops_replenish_thread(50, Duration::from_millis(100)));
132//!
133//! // 1000 IOPS (with 64KB chunks ≈ 64 MB/s)
134//! init_iops_tokens(100);
135//! tokio::spawn(run_iops_replenish_thread(100, Duration::from_millis(100)));
136//! }
137//!
138//! async fn copy_file_throttled(size: u64) {
139//! let chunk_size = 64 * 1024;
140//!
141//! // Acquire all required permits
142//! get_ops_token().await;
143//! get_file_iops_tokens(chunk_size, size).await;
144//! let _file_guard = open_file_permit().await;
145//!
146//! // Perform copy operation
147//! // ...
148//! }
149//! ```
150
151mod semaphore;
152
/// Which filesystem side a metadata syscall touches.
///
/// Mirrors the congestion crate's `Side`; this enum is intentionally
/// independent so `throttle` has no dependency on `congestion`.
///
/// The explicit discriminants are load-bearing: `Resource::index` casts
/// them to `usize` and uses them as the major axis of the flat
/// per-resource semaphore array.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Side {
    /// Source side; maps to slots `0..N_META_OPS` of the per-resource array.
    Source = 0,
    /// Destination side; maps to slots `N_META_OPS..2 * N_META_OPS`.
    Destination = 1,
}
163
/// Which metadata syscall a permit / cap belongs to.
///
/// Mirrors `congestion::MetadataOp` variant-for-variant. Each crate
/// indexes its own flat array using its own enum, so the per-crate
/// discriminants drive routing inside that crate; the cross-crate
/// translation goes through name-based bridge functions in
/// `common::walk`. Adding a variant here without a matching one in
/// `congestion` (or vice versa) is caught at compile time by the
/// exhaustive matches in those bridges.
///
/// Within this crate the discriminant is the minor-axis offset used by
/// `Resource::index`, so the values must stay contiguous, start at 0 and
/// remain below [`N_META_OPS`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MetadataOp {
    // Variant names follow the operation they gate; the numeric value is the
    // slot offset within a side's bank of semaphores.
    Stat = 0,
    ReadLink = 1,
    MkDir = 2,
    RmDir = 3,
    Unlink = 4,
    HardLink = 5,
    Symlink = 6,
    Chmod = 7,
    OpenCreate = 8,
}
186
/// Number of [`MetadataOp`] variants. Keep in sync when adding variants.
///
/// This is also the stride between sides in the flat per-resource array
/// (see `Resource::index`): if it is smaller than the real variant count,
/// slots of different resources alias or the index runs past the array.
pub const N_META_OPS: usize = 9;
/// Number of [`Side`] variants.
pub const N_SIDES: usize = 2;
/// Total number of distinct (Side, MetadataOp) controllers.
///
/// Used as the length of the per-resource semaphore array.
pub const N_META_RESOURCES: usize = N_META_OPS * N_SIDES;
193
194impl MetadataOp {
195 /// All op variants, in discriminant order.
196 pub const ALL: [Self; N_META_OPS] = [
197 Self::Stat,
198 Self::ReadLink,
199 Self::MkDir,
200 Self::RmDir,
201 Self::Unlink,
202 Self::HardLink,
203 Self::Symlink,
204 Self::Chmod,
205 Self::OpenCreate,
206 ];
207}
208
209impl Side {
210 /// All side variants, in discriminant order.
211 pub const ALL: [Self; N_SIDES] = [Self::Source, Self::Destination];
212}
213
/// Which throttled metadata resource a permit / cap belongs to.
///
/// Each `(Side, MetadataOp)` pair gets its own independent concurrency
/// cap so the controller for one syscall on one filesystem can adjust
/// without dragging others along — for example, `(Source, Stat)` and
/// `(Destination, Unlink)` are completely independent enforcement gates.
///
/// The type is a small `Copy` key; it is passed by value to the
/// per-resource functions in this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Resource {
    /// Filesystem side the operation runs against (major axis of the slot index).
    pub side: Side,
    /// Metadata operation being throttled (minor axis of the slot index).
    pub op: MetadataOp,
}
225
226impl Resource {
227 /// Construct a metadata resource for the given side + op.
228 pub const fn meta(side: Side, op: MetadataOp) -> Self {
229 Self { side, op }
230 }
231 /// Map the resource to its slot in the per-resource semaphore array.
232 /// Side is the major axis, op the minor — matches the corresponding
233 /// fan-out layout in `congestion::RoutingSink`.
234 fn index(self) -> usize {
235 (self.side as usize) * N_META_OPS + (self.op as usize)
236 }
237}
238
// Cap on simultaneously open files. Sized by `set_max_open_files` and
// acquired through `open_file_permit`. Created lazily on first use.
static OPEN_FILES_LIMIT: std::sync::LazyLock<semaphore::Semaphore> =
    std::sync::LazyLock::new(semaphore::Semaphore::new);
// Spawn-time backpressure for operations that don't actually hold open
// file descriptors but still benefit from a bound on in-flight tasks
// (rm, cmp). Kept separate from OPEN_FILES_LIMIT so that an inner rm
// triggered by a copy/link path that already holds an OPEN_FILES_LIMIT
// permit cannot deadlock against itself when the outer permit pool is
// saturated. Both semaphores are sized to the same configured limit by
// `set_max_open_files`.
static PENDING_META_LIMIT: std::sync::LazyLock<semaphore::Semaphore> =
    std::sync::LazyLock::new(semaphore::Semaphore::new);
// Token bucket for general operations: sized by `init_ops_tokens`, drained
// by `get_ops_token`, refilled by `run_ops_replenish_thread`.
static OPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
    std::sync::LazyLock::new(semaphore::Semaphore::new);
// Token bucket for I/O operations: sized by `init_iops_tokens`, drained by
// `get_file_iops_tokens`, refilled by `run_iops_replenish_thread`.
static IOPS_THROTTLE: std::sync::LazyLock<semaphore::Semaphore> =
    std::sync::LazyLock::new(semaphore::Semaphore::new);
// Per-(Side, MetadataOp) concurrency caps driven by the congestion
// controller. One semaphore per [`Resource`] so each syscall on each
// side is throttled independently. Distinct from OPS_THROTTLE (rate)
// and OPEN_FILES_LIMIT (FDs).
//
// `[const { ... }; N]` initializes each slot independently — without
// the inline-const block the array repeat syntax would require `Copy`,
// which `LazyLock` is not.
//
// Slots are addressed with `Resource::index` (side-major layout).
static OPS_IN_FLIGHT_LIMITS: [std::sync::LazyLock<semaphore::Semaphore>; N_META_RESOURCES] =
    [const { std::sync::LazyLock::new(semaphore::Semaphore::new) }; N_META_RESOURCES];
264
265fn ops_in_flight_limit(resource: Resource) -> &'static semaphore::Semaphore {
266 &OPS_IN_FLIGHT_LIMITS[resource.index()]
267}
268
269/// Configure the spawn-time concurrency caps from a single knob.
270///
271/// Despite the name, this sizes **two** independent semaphores:
272///
273/// * [`open_file_permit`] — actual file-descriptor backpressure for
274/// paths that hold open fds (copy/link).
275/// * [`pending_meta_permit`] — task-spawn backpressure for recursive
276/// metadata-only walks (rm/cmp) that don't hold fds. Sized
277/// separately so paths that compose these operations
278/// (e.g. `copy_file → rm` for an overwrite of a directory
279/// destination) cannot deadlock against the open-files pool.
280///
281/// Pass `0` to disable both caps; `setup` is idempotent and intended
282/// for startup or test reset.
283pub fn set_max_open_files(max_open_files: usize) {
284 OPEN_FILES_LIMIT.setup(max_open_files);
285 PENDING_META_LIMIT.setup(max_open_files);
286}
287
288pub struct OpenFileGuard {
289 _permit: Option<semaphore::Permit<'static>>,
290}
291
292pub async fn open_file_permit() -> OpenFileGuard {
293 OpenFileGuard {
294 _permit: OPEN_FILES_LIMIT.acquire().await,
295 }
296}
297
298/// Backpressure guard for in-flight metadata-only operations (rm, cmp).
299///
300/// Held across a spawned task to bound the live task count under
301/// recursive walk operations that don't hold open file descriptors.
302/// Kept distinct from [`OpenFileGuard`] so that paths which compose
303/// these operations (e.g. `copy_file → rm` for an overwrite of a
304/// directory destination) don't deadlock against a saturated
305/// `OPEN_FILES_LIMIT`.
306pub struct PendingMetaGuard {
307 _permit: Option<semaphore::Permit<'static>>,
308}
309
310pub async fn pending_meta_permit() -> PendingMetaGuard {
311 PendingMetaGuard {
312 _permit: PENDING_META_LIMIT.acquire().await,
313 }
314}
315
316/// Dynamically set the maximum number of concurrent operations in flight
317/// for the given [`Resource`].
318///
319/// Increasing the cap is instant; decreasing it can temporarily overshoot
320/// if permits are already held — they return naturally on drop. This is
321/// the enforcement knob the adaptive controller drives.
322///
323/// Setting to 0 disables the cap for *subsequent* acquires (they return
324/// immediately without a permit), but does not wake acquirers already
325/// blocked inside the semaphore's wait queue — they remain parked until
326/// permits become available. Callers that need a cancellable disable
327/// should use a higher-level shutdown signal.
328pub fn set_max_ops_in_flight(resource: Resource, max_in_flight: usize) {
329 ops_in_flight_limit(resource).set_max(max_in_flight);
330}
331
332pub struct OpsInFlightGuard {
333 _permit: Option<semaphore::Permit<'static>>,
334}
335
336/// Acquire a permit from the ops-in-flight cap for the given [`Resource`].
337/// No-op (returns immediately) when that resource's cap is not configured.
338pub async fn ops_in_flight_permit(resource: Resource) -> OpsInFlightGuard {
339 OpsInFlightGuard {
340 _permit: ops_in_flight_limit(resource).acquire().await,
341 }
342}
343
/// Set up the ops throttle with `ops_tokens` tokens.
///
/// As noted on [`set_ops_replenish`], a non-zero value is what enables the
/// throttle. Tokens are only refilled while a task running
/// [`run_ops_replenish_thread`] is alive.
pub fn init_ops_tokens(ops_tokens: usize) {
    OPS_THROTTLE.setup(ops_tokens);
}
347
/// Set up the I/O-ops throttle with `ops_tokens` tokens.
///
/// Counterpart of [`init_ops_tokens`] for the bucket drained by
/// [`get_file_iops_tokens`]. Tokens are only refilled while a task running
/// [`run_iops_replenish_thread`] is alive.
pub fn init_iops_tokens(ops_tokens: usize) {
    IOPS_THROTTLE.setup(ops_tokens);
}
351
/// Consume one token from the ops throttle before performing an operation.
///
/// Waits asynchronously while the bucket is empty. This is a no-op when
/// the ops throttle is not enabled (see [`set_ops_replenish`] and
/// [`disable_ops_throttle`]).
pub async fn get_ops_token() {
    OPS_THROTTLE.consume().await;
}
355
/// Consume `tokens` tokens from the I/O-ops throttle in a single request.
///
/// Private helper behind [`get_file_iops_tokens`], which computes the count.
async fn get_iops_tokens(tokens: u32) {
    IOPS_THROTTLE.consume_many(tokens).await;
}
359
360pub async fn get_file_iops_tokens(chunk_size: u64, file_size: u64) {
361 if let Some(div) = (std::cmp::max(1, file_size) - 1).checked_div(chunk_size) {
362 let tokens = 1 + div;
363 if tokens > u64::from(u32::MAX) {
364 tracing::error!(
365 "chunk size: {} is too small to limit throughput for files this big, size: {}",
366 chunk_size,
367 file_size,
368 );
369 } else {
370 // tokens is guaranteed to be <= u32::MAX by check above
371 let tokens_u32 =
372 u32::try_from(tokens).expect("tokens should fit in u32 after bounds check");
373 get_iops_tokens(tokens_u32).await;
374 }
375 }
376}
377
/// Run the replenish loop of the ops throttle.
///
/// `replenish` is the number of tokens added per `interval`. The module
/// docs show this future being handed to `tokio::spawn` so it runs in the
/// background. The per-interval count can be changed later with
/// [`set_ops_replenish`] without restarting the loop.
pub async fn run_ops_replenish_thread(replenish: usize, interval: std::time::Duration) {
    OPS_THROTTLE.run_replenish_thread(replenish, interval).await;
}
381
/// Run the replenish loop of the I/O-ops throttle.
///
/// `replenish` is the number of tokens added per `interval`. The module
/// docs show this future being handed to `tokio::spawn` so it runs in the
/// background. The per-interval count can be changed later with
/// [`set_iops_replenish`].
pub async fn run_iops_replenish_thread(replenish: usize, interval: std::time::Duration) {
    IOPS_THROTTLE
        .run_replenish_thread(replenish, interval)
        .await;
}
387
/// Dynamically update the ops-throttle replenish count.
///
/// Takes effect on the next iteration of the replenish loop started by
/// [`run_ops_replenish_thread`] — no loop restart, no permits forcibly
/// drained. Setting `value = 0` pauses replenishment (tokens already in
/// the bucket will be consumed but no new ones will be added).
///
/// Only the per-interval token count is passed on; this function does not
/// touch the replenish interval.
///
/// Intended for congestion-control layers that translate a Controller's
/// decisions into dynamic rate targets. For the update to visibly gate
/// ops, two things must be true:
///
/// 1. A replenish task must be running (spawned via
///    [`run_ops_replenish_thread`]); otherwise the new value is stored
///    but no token refills happen.
/// 2. The ops-throttle must be enabled (via [`init_ops_tokens`] with a
///    non-zero value, or [`enable_ops_throttle`] after a prior
///    [`disable_ops_throttle`]); otherwise [`get_ops_token`] is a no-op
///    regardless of the replenish count.
pub fn set_ops_replenish(value: usize) {
    OPS_THROTTLE.set_replenish(value);
}
409
/// Dynamically update the iops-throttle replenish count. See
/// [`set_ops_replenish`] for the semantics.
///
/// The corresponding loop for this throttle is the one started by
/// [`run_iops_replenish_thread`].
pub fn set_iops_replenish(value: usize) {
    IOPS_THROTTLE.set_replenish(value);
}
415
/// Disable the ops-throttle, making [`get_ops_token`] a no-op. Mirrors
/// the "unlimited on this dimension" semantics of `Decision` so an
/// adaptive controller can transition a previously-set rate back to "no
/// limit" by sending `rate_per_sec: None`.
///
/// The replenish loop keeps running (it has no mid-loop flag check) but
/// its token additions become inert until the flag is flipped back on
/// via [`enable_ops_throttle`].
///
/// Only the ops throttle is affected; this function does not touch the
/// I/O-ops throttle.
pub fn disable_ops_throttle() {
    OPS_THROTTLE.disable();
}
427
/// Re-enable the ops-throttle after [`disable_ops_throttle`] — the
/// counterpart that allows a controller to toggle rate capping on and
/// off via `Decision::rate_per_sec`.
///
/// Returns `true` if enablement took effect, `false` if the throttle was
/// never initialized (i.e. `--ops-throttle` was not set at startup) so
/// there is nothing to enable. Callers that treat a rate cap as mandatory
/// should check the result rather than assume the throttle is active.
pub fn enable_ops_throttle() -> bool {
    OPS_THROTTLE.enable()
}
437
438/// Current in-flight concurrency cap for the given [`Resource`], for
439/// metrics and integration tests. Returns `0` when the cap has been set
440/// to zero (disabled) or has never been configured.
441#[must_use]
442pub fn current_ops_in_flight_limit(resource: Resource) -> usize {
443 ops_in_flight_limit(resource).current_limit()
444}