//! Resource throttling and rate limiting for file operations
//!
//! This crate provides throttling mechanisms to control resource usage during file operations.
//! It helps prevent system overload and allows for controlled resource consumption when working with large filesets or in resource-constrained environments.
//!
//! # Overview
//!
//! The throttle system provides three types of rate limiting:
//!
//! 1. **Open Files Limit** - Controls the maximum number of simultaneously open files
//! 2. **Operations Throttle** - Limits the number of operations per second
//! 3. **I/O Operations Throttle** - Limits the number of I/O operations per second based on chunk size
//!
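//! The two rate throttles are implemented using token-bucket semaphores that are automatically replenished at configured intervals.
//! The open files limit is a semaphore that needs no replenishment: each permit returns to the pool when its guard is dropped.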
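//!
//! The token-bucket idea can be illustrated with a minimal, std-only sketch. `TokenBucket`,
//! `try_acquire` and `replenish` are hypothetical names used only for illustration; they are not
//! this crate's types, and capping the bucket at its capacity is an assumption rather than
//! documented behavior:
//!
//! ```rust
//! use std::sync::Mutex;
//!
//! /// Hypothetical, simplified token bucket (not this crate's semaphore type).
//! pub struct TokenBucket {
//!     capacity: u64,
//!     tokens: Mutex<u64>,
//! }
//!
//! impl TokenBucket {
//!     /// Create a bucket that starts full with `capacity` tokens.
//!     pub fn new(capacity: u64) -> Self {
//!         Self { capacity, tokens: Mutex::new(capacity) }
//!     }
//!
//!     /// Take one token if available; returns `false` when the bucket is empty.
//!     pub fn try_acquire(&self) -> bool {
//!         let mut tokens = self.tokens.lock().unwrap();
//!         if *tokens == 0 {
//!             return false;
//!         }
//!         *tokens -= 1;
//!         true
//!     }
//!
//!     /// Add `n` tokens, as one replenish tick would.
//!     /// Assumption: the bucket never holds more than `capacity` tokens.
//!     pub fn replenish(&self, n: u64) {
//!         let mut tokens = self.tokens.lock().unwrap();
//!         *tokens = tokens.saturating_add(n).min(self.capacity);
//!     }
//! }
//! ```
//!
//! A real async throttle would park the waiting task instead of returning `false`; the sketch
//! only shows how acquisition drains the bucket and how a periodic tick refills it.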
//!
//! # Usage Patterns
//!
//! ## Open Files Limit
//!
//! Prevents exceeding system file descriptor limits by controlling concurrent file operations:
//!
//! ```rust,no_run
//! use throttle::{set_max_open_files, open_file_permit};
//!
//! # async fn example() {
//! // Configure max open files (typically 80% of system limit)
//! set_max_open_files(8000);
//!
//! // Acquire permit before opening file
//! let _guard = open_file_permit().await;
//! // Open file here - permit is automatically released when guard is dropped
//! # }
//! ```
//!
//! ## Operations Throttling
//!
//! Limits general operations per second to reduce system load:
//!
//! ```rust,no_run
//! use throttle::{init_ops_tokens, run_ops_replenish_thread, get_ops_token};
//! use std::time::Duration;
//!
//! # async fn example() {
//! // Initialize with 100 operations per second
//! let ops_per_interval = 10;
//! let interval = Duration::from_millis(100); // 10 tokens / 100ms = 100/sec
//!
//! init_ops_tokens(ops_per_interval);
//!
//! // Start replenishment in background
//! tokio::spawn(run_ops_replenish_thread(ops_per_interval, interval));
//!
//! // Acquire token before each operation
//! get_ops_token().await;
//! // Perform operation here
//! # }
//! ```
//!
//! ## I/O Operations Throttling
//!
//! Limits I/O operations based on file size and chunk size, useful for bandwidth control:
//!
//! ```rust,no_run
//! use throttle::{init_iops_tokens, run_iops_replenish_thread, get_file_iops_tokens};
//! use std::time::Duration;
//!
//! # async fn example() {
//! // Initialize with desired IOPS limit
//! let iops_per_interval = 100;
//! let interval = Duration::from_millis(100);
//! let chunk_size = 64 * 1024; // 64 KB chunks
//!
//! init_iops_tokens(iops_per_interval);
//! tokio::spawn(run_iops_replenish_thread(iops_per_interval, interval));
//!
//! // For a 1 MB file with 64 KB chunks: requires 16 tokens
//! let file_size = 1024 * 1024;
//! get_file_iops_tokens(chunk_size, file_size).await;
//! // Copy file here
//! # }
//! ```
//!
//! # Token Calculation
//!
//! For I/O throttling, the number of tokens required for a file is calculated as:
//!
//! ```text
//! tokens = ⌈file_size / chunk_size⌉
//! ```
//!
//! This allows throttling to be proportional to the amount of data transferred.
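//!
//! As a rough illustration of the formula, the ceiling division could be written as below.
//! `tokens_for_file` is a hypothetical helper, not part of this crate's API; its argument order
//! simply follows `get_file_iops_tokens`, and returning `None` for a zero chunk size is an
//! assumption about how that edge case might be handled:
//!
//! ```rust
//! /// Hypothetical helper: tokens = ceil(file_size / chunk_size).
//! /// Returns `None` for a zero chunk size, where the formula is undefined.
//! fn tokens_for_file(chunk_size: u64, file_size: u64) -> Option<u64> {
//!     if chunk_size == 0 {
//!         return None;
//!     }
//!     // Whole chunks, plus one more token for a trailing partial chunk.
//!     let full_chunks = file_size / chunk_size;
//!     let has_partial_chunk = file_size % chunk_size != 0;
//!     Some(full_chunks + u64::from(has_partial_chunk))
//! }
//! ```
//!
//! Under this sketch a 1 MB file with 64 KB chunks needs 16 tokens, and a single extra byte
//! raises that to 17.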
//!
//! # Replenishment Strategy
//!
//! Tokens are replenished using a background task that periodically adds tokens to the semaphore.
//! The replenishment rate can be tuned by adjusting:
//!
//! - **`tokens_per_interval`**: Number of tokens added each interval
//! - **interval**: Time between replenishments
//!
//! For example, to achieve 1000 ops/sec:
//! - Option 1: 100 tokens every 100ms
//! - Option 2: 10 tokens every 10ms
//!
//! The implementation automatically scales down to prevent excessive granularity while maintaining the target rate.
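//!
//! To check that a given pair of settings produces the intended rate, the arithmetic can be
//! sketched as follows. `effective_rate_per_sec` is a hypothetical helper, not part of this
//! crate's API; it rounds down and treats a zero interval as invalid, both of which are
//! assumptions made for the example:
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical helper: tokens added per second for a replenish setting.
//! /// Returns `None` for a zero interval; the result is rounded down.
//! fn effective_rate_per_sec(tokens_per_interval: u64, interval: Duration) -> Option<u128> {
//!     let nanos = interval.as_nanos();
//!     if nanos == 0 {
//!         return None;
//!     }
//!     // Integer arithmetic in nanoseconds avoids floating-point rounding.
//!     Some(u128::from(tokens_per_interval) * 1_000_000_000 / nanos)
//! }
//! ```
//!
//! Both options listed above (100 tokens every 100ms and 10 tokens every 10ms) evaluate to
//! 1000 tokens per second with this helper.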
//!
//! # Thread Safety
//!
//! All throttling mechanisms are thread-safe and can be used across multiple async tasks and threads. The semaphores use efficient `parking_lot` mutexes internally.
//!
//! # Performance Considerations
//!
//! - **Open Files Limit**: No replenishment needed, permits released automatically
//! - **Ops/IOPS Throttle**: Background task overhead is minimal (~1 task per throttle type)
//! - **Token Acquisition**: Async operation that parks the task when no tokens are available
//!
//! # Examples
//!
//! ## Complete Throttled Copy Operation
//!
//! ```rust,no_run
//! use throttle::*;
//! use std::time::Duration;
//!
//! async fn setup_throttling() {
//!     // Limit to 80% of 10000 max files
//!     set_max_open_files(8000);
//!
//!     // 500 operations per second
//!     init_ops_tokens(50);
//!     tokio::spawn(run_ops_replenish_thread(50, Duration::from_millis(100)));
//!
//!     // 1000 IOPS (with 64KB chunks ≈ 64 MB/s)
//!     init_iops_tokens(100);
//!     tokio::spawn(run_iops_replenish_thread(100, Duration::from_millis(100)));
//! }
//!
//! async fn copy_file_throttled(size: u64) {
//!     let chunk_size = 64 * 1024;
//!
//!     // Acquire all required permits
//!     get_ops_token().await;
//!     get_file_iops_tokens(chunk_size, size).await;
//!     let _file_guard = open_file_permit().await;
//!
//!     // Perform copy operation
//!     // ...
//! }
//! ```
/// Which filesystem side a metadata syscall touches.
///
/// Mirrors the congestion crate's `Side`; this enum is intentionally
/// independent so `throttle` has no dependency on `congestion`.
/// Which metadata syscall a permit / cap belongs to.
///
/// Mirrors `congestion::MetadataOp` variant-for-variant. Each crate
/// indexes its own flat array using its own enum, so the per-crate
/// discriminants drive routing inside that crate; the cross-crate
/// translation goes through name-based bridge functions in
/// `common::walk`. Adding a variant here without a matching one in
/// `congestion` (or vice versa) is caught at compile time by the
/// exhaustive matches in those bridges.
/// Number of [`MetadataOp`] variants. Keep in sync when adding variants.
pub const N_META_OPS: usize = 9;
/// Number of [`Side`] variants.
pub const N_SIDES: usize = 2;
/// Total number of distinct (Side, MetadataOp) controllers.
pub const N_META_RESOURCES: usize = N_META_OPS * N_SIDES;
/// Which throttled metadata resource a permit / cap belongs to.
///
/// Each `(Side, MetadataOp)` pair gets its own independent concurrency
/// cap so the controller for one syscall on one filesystem can adjust
/// without dragging others along — for example, `(Source, Stat)` and
/// `(Destination, Unlink)` are completely independent enforcement gates.
static OPEN_FILES_LIMIT: LazyLock =
new;
// Spawn-time backpressure for operations that don't actually hold open
// file descriptors but still benefit from a bound on in-flight tasks
// (rm, cmp). Kept separate from OPEN_FILES_LIMIT so that an inner rm
// triggered by a copy/link path that already holds an OPEN_FILES_LIMIT
// permit cannot deadlock against itself when the outer permit pool is
// saturated. Both semaphores are sized to the same configured limit by
// `set_max_open_files`.
static PENDING_META_LIMIT: LazyLock =
new;
static OPS_THROTTLE: LazyLock =
new;
static IOPS_THROTTLE: LazyLock =
new;
// Per-(Side, MetadataOp) concurrency caps driven by the congestion
// controller. One semaphore per [`Resource`] so each syscall on each
// side is throttled independently. Distinct from OPS_THROTTLE (rate)
// and OPEN_FILES_LIMIT (FDs).
//
// `[const { ... }; N]` initializes each slot independently — without
// the inline-const block the array repeat syntax would require `Copy`,
// which `LazyLock` is not.
static OPS_IN_FLIGHT_LIMITS: =
;
/// Configure the spawn-time concurrency caps from a single knob.
///
/// Despite the name, this sizes **two** independent semaphores:
///
/// * [`open_file_permit`] — actual file-descriptor backpressure for
/// paths that hold open fds (copy/link).
/// * [`pending_meta_permit`] — task-spawn backpressure for recursive
/// metadata-only walks (rm/cmp) that don't hold fds. Sized
/// separately so paths that compose these operations
/// (e.g. `copy_file → rm` for an overwrite of a directory
/// destination) cannot deadlock against the open-files pool.
///
/// Pass `0` to disable both caps; `setup` is idempotent and intended
/// for startup or test reset.
pub async
/// Backpressure guard for in-flight metadata-only operations (rm, cmp).
///
/// Held across a spawned task to bound the live task count under
/// recursive walk operations that don't hold open file descriptors.
/// Kept distinct from [`OpenFileGuard`] so that paths which compose
/// these operations (e.g. `copy_file → rm` for an overwrite of a
/// directory destination) don't deadlock against a saturated
/// `OPEN_FILES_LIMIT`.
pub async
/// Dynamically set the maximum number of concurrent operations in flight
/// for the given [`Resource`].
///
/// Increasing the cap is instant; decreasing it can temporarily overshoot
/// if permits are already held — they return naturally on drop. This is
/// the enforcement knob the adaptive controller drives.
///
/// Setting to 0 disables the cap for *subsequent* acquires (they return
/// immediately without a permit), but does not wake acquirers already
/// blocked inside the semaphore's wait queue — they remain parked until
/// permits become available. Callers that need a cancellable disable
/// should use a higher-level shutdown signal.
/// Acquire a permit from the ops-in-flight cap for the given [`Resource`].
/// No-op (returns immediately) when that resource's cap is not configured.
pub async
pub async
async
pub async
pub async
pub async
/// Dynamically update the ops-throttle replenish count.
///
/// Takes effect on the next iteration of the replenish loop started by
/// [`run_ops_replenish_thread`] — no loop restart, no permits forcibly
/// drained. Setting `value = 0` pauses replenishment (tokens already in
/// the bucket will be consumed but no new ones will be added).
///
/// Intended for congestion-control layers that translate a Controller's
/// decisions into dynamic rate targets. For the update to visibly gate
/// ops, two things must be true:
///
/// 1. A replenish task must be running (spawned via
/// [`run_ops_replenish_thread`]); otherwise the new value is stored
/// but no token refills happen.
/// 2. The ops-throttle must be enabled (via [`init_ops_tokens`] with a
/// non-zero value, or [`enable_ops_throttle`] after a prior
/// [`disable_ops_throttle`]); otherwise [`get_ops_token`] is a no-op
/// regardless of the replenish count.
/// Dynamically update the iops-throttle replenish count. See
/// [`set_ops_replenish`] for the semantics.
/// Disable the ops-throttle, making [`get_ops_token`] a no-op. Mirrors
/// the "unlimited on this dimension" semantics of `Decision` so an
/// adaptive controller can transition a previously-set rate back to "no
/// limit" by sending `rate_per_sec: None`.
///
/// The replenish loop keeps running (it has no mid-loop flag check) but
/// its token additions become inert until the flag is flipped back on
/// via [`enable_ops_throttle`].
/// Re-enable the ops-throttle after [`disable_ops_throttle`] — the
/// counterpart that allows a controller to toggle rate capping on and
/// off via `Decision::rate_per_sec`. Returns `true` if enablement took
/// effect, `false` if the throttle was never initialized (i.e.
/// `--ops-throttle` was not set at startup) so there is nothing to
/// enable.
/// Current in-flight concurrency cap for the given [`Resource`], for
/// metrics and integration tests. Returns `0` when the cap has been set
/// to zero (disabled) or has never been configured.