1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation
//! Compaction I/O rate limiter.
//!
//! Background compaction can saturate disk bandwidth and starve user
//! point reads / range scans, spiking their P99 latency. This limiter
//! caps the rate at which the compaction worker is allowed to issue I/O
//! so user traffic keeps its share of the device. It is invoked only from
//! the compaction path, so flush and user reads are never throttled —
//! they simply never call it.
//!
//! # Model
//!
//! A leaky token bucket measured in bytes. Each request debits the
//! bucket; when the bucket goes into debt the caller must wait long
//! enough for the configured refill rate to repay it, which serialises
//! compaction I/O down to `rate_bytes_per_sec`. A rate of `0` disables
//! throttling: every request is immediate (the default, so the limiter is
//! wired unconditionally and switched on via
//! [`Config::compaction_rate_limit`](crate::Config::compaction_rate_limit)).
//!
//! # Clock injection
//!
//! The core decision function [`RateLimiter::acquire_wait`] takes the
//! current monotonic time as a `Duration` since an arbitrary origin, so
//! it is pure (no syscalls), unit-testable without sleeping, and compiles
//! without `std`. The interruptible blocking wrapper
//! [`RateLimiter::request_interruptible`] (which reads the system clock and
//! sleeps in pollable chunks) is gated behind the `std` feature.
//!
//! A priority-class extension (flush / user I/O also debiting the bucket
//! but draining ahead of compaction) is a planned refinement; this
//! revision throttles compaction alone.
use core::sync::atomic::Ordering;
use core::time::Duration;
use portable_atomic::AtomicU64;
use spin::Mutex;
// The clock-agnostic core of the limiter — this constant, `Bucket`, the rate
// fields on `RateLimiter` and `acquire_wait` — already builds without `std`.
// Until a no_std caller injects a monotonic clock, however, the no_std
// `request_interruptible` is a plain passthrough, which leaves these items
// defined but unreferenced; the lint is silenced instead of removing them.
/// Number of nanoseconds in one second, as `u128` so it can be combined
/// directly with `Duration::as_nanos` values without casting.
#[cfg_attr(
    not(feature = "std"),
    allow(
        dead_code,
        reason = "no_std-ready token-bucket core; awaits a clock-injecting no_std caller"
    )
)]
const NANOS_PER_SEC: u128 = Duration::from_secs(1).as_nanos();
/// Mutable bucket state, guarded by a single lock.
///
/// Both fields are read and written together while the limiter's mutex is
/// held, so a refill and the debit that follows it are observed as one
/// atomic step by other callers.
#[derive(Debug)]
#[cfg_attr(
not(feature = "std"),
allow(
dead_code,
reason = "no_std-ready token-bucket state; awaits a clock-injecting no_std caller"
)
)]
struct Bucket {
/// Available budget in bytes. May go negative (debt) when a request
/// debits more than is currently available; the deficit is what the
/// caller waits out. Signed precisely so that debt can be represented;
/// arithmetic on it saturates rather than wrapping.
available: i64,
/// Monotonic time of the last refill, as nanoseconds since the
/// limiter's origin: elapsed time up to this instant has already been
/// credited to `available`. Starts at `0` for a freshly built limiter.
last_refill_nanos: u128,
}
/// Compaction I/O rate limiter (leaky token bucket).
///
/// Share across compaction invocations by wrapping in `Arc`; the limiter
/// is `Sync` (all mutable state is behind a lock / atomics).
///
/// A `rate_bytes_per_sec` of `0` disables throttling entirely: every
/// request returns immediately.
#[derive(Debug)]
#[cfg_attr(
not(feature = "std"),
allow(
dead_code,
reason = "no_std-ready limiter fields read only by acquire_wait, which awaits a clock-injecting no_std caller"
)
)]
pub struct RateLimiter {
/// Refill rate in bytes per second. `0` means unlimited (disabled).
/// Stored as an atomic so the disabled check is a single relaxed load
/// that never touches the bucket lock.
rate_bytes_per_sec: AtomicU64,
/// Maximum positive budget the bucket may accumulate, in bytes: one
/// second of rate, so an idle limiter grants a one-second burst but no
/// more (prevents a long-idle compactor from dumping an unbounded
/// backlog at full speed). Fixed at construction to the initial rate.
burst_bytes: u64,
/// Current budget and last-refill timestamp, mutated together under
/// this lock on every throttled request.
bucket: Mutex<Bucket>,
}
impl RateLimiter {
/// Builds a limiter whose bucket refills at `rate_bytes_per_sec`.
///
/// Passing `0` yields a disabled limiter: every request is granted
/// immediately. Otherwise the burst ceiling equals one second of rate and
/// the bucket starts out full.
#[must_use]
pub fn new(rate_bytes_per_sec: u64) -> Self {
    // Begin with the whole one-second burst available so the very first
    // request after construction is never made to wait. Rates beyond
    // `i64::MAX` clamp to the largest representable budget.
    let initial_budget = match i64::try_from(rate_bytes_per_sec) {
        Ok(budget) => budget,
        Err(_) => i64::MAX,
    };
    let initial_state = Bucket {
        available: initial_budget,
        last_refill_nanos: 0,
    };
    Self {
        rate_bytes_per_sec: AtomicU64::new(rate_bytes_per_sec),
        burst_bytes: rate_bytes_per_sec,
        bucket: Mutex::new(initial_state),
    }
}
/// Core decision: how long the caller must wait before issuing an I/O
/// of `bytes`, given the current monotonic time `now` (a `Duration`
/// since this limiter's origin).
///
/// The call first credits the bucket with the bytes accrued since the
/// last refill (capped at the burst ceiling), then debits `bytes`. If the
/// bucket is still non-negative the request may proceed at once;
/// otherwise the returned wait is the time the refill rate needs to repay
/// the resulting debt.
///
/// Refill is credited in whole bytes. Time that has not yet paid for a
/// full byte is carried forward to the next call instead of being
/// dropped, so callers that poll more often than one byte-time
/// (`1e9 / rate` ns) still receive the configured rate.
///
/// Returns [`Duration::ZERO`] when the request may proceed
/// immediately (including when the rate is `0`). Performs no sleeping
/// and reads no clock, so it is fully deterministic for a given `now`
/// sequence and usable in `no_std` builds.
#[must_use]
#[cfg_attr(
    not(feature = "std"),
    allow(
        dead_code,
        reason = "no_std-ready token-bucket decision; awaits a clock-injecting no_std caller"
    )
)]
pub fn acquire_wait(&self, bytes: u64, now: Duration) -> Duration {
    let rate = self.rate_bytes_per_sec.load(Ordering::Relaxed);
    if rate == 0 {
        return Duration::ZERO;
    }
    let now_nanos = now.as_nanos();
    let rate_u128 = u128::from(rate);
    let mut bucket = self.bucket.lock();
    // Refill: add the bytes that accrued since the last refill, then
    // cap at the burst ceiling. `saturating_sub` guards against a
    // non-monotonic `now` (clock should be monotonic, but never let a
    // backwards step underflow).
    let elapsed = now_nanos.saturating_sub(bucket.last_refill_nanos);
    if elapsed > 0 {
        let accrued = elapsed.saturating_mul(rate_u128) / NANOS_PER_SEC;
        // The credit fits in i64 for any realistic elapsed/rate; clamp
        // defensively so an absurd elapsed can't overflow the add.
        let refilled = i64::try_from(accrued).unwrap_or(i64::MAX);
        let burst_i64 = i64::try_from(self.burst_bytes).unwrap_or(i64::MAX);
        let topped_up = bucket.available.saturating_add(refilled);
        if topped_up >= burst_i64 {
            // Bucket is full: surplus idle time buys nothing further, so
            // discard it by snapping the refill mark to `now`.
            bucket.available = burst_i64;
            bucket.last_refill_nanos = now_nanos;
        } else if refilled > 0 {
            bucket.available = topped_up;
            // Advance the refill mark only by the time that paid for the
            // whole bytes just credited. Rounding up never over-credits,
            // and the sub-byte remainder stays pending for the next call.
            let paid_nanos = u128::from(refilled.unsigned_abs())
                .saturating_mul(NANOS_PER_SEC)
                .div_ceil(rate_u128);
            bucket.last_refill_nanos = bucket
                .last_refill_nanos
                .saturating_add(paid_nanos)
                .min(now_nanos);
        }
        // `refilled == 0` below the ceiling: less than one byte has
        // accrued. Leave the refill mark alone so the elapsed time keeps
        // accumulating rather than being truncated away on every call.
    }
    // Debit the request. Going negative is the debt the caller pays
    // off by waiting.
    let debit = i64::try_from(bytes).unwrap_or(i64::MAX);
    bucket.available = bucket.available.saturating_sub(debit);
    if bucket.available >= 0 {
        return Duration::ZERO;
    }
    // Wait long enough for the refill rate to repay the deficit.
    let deficit = bucket.available.unsigned_abs();
    let wait_nanos = u128::from(deficit).saturating_mul(NANOS_PER_SEC) / rate_u128;
    Duration::from_nanos(u64::try_from(wait_nanos).unwrap_or(u64::MAX))
}
/// Blocks the current thread until an I/O of `bytes` is allowed, while
/// remaining responsive to a stop signal: `should_stop` is polled so a
/// shutdown / drop can cut a long wait short.
///
/// Returns `true` when the caller should abort — `should_stop` was either
/// already set on entry (before any work) or fired part-way through the
/// wait. Returns `false` once the full wait has elapsed and the caller
/// may proceed.
///
/// The bucket is debited at most once, through a single
/// [`acquire_wait`](Self::acquire_wait) call; the early exits for
/// `rate == 0` and for an already-pending stop skip the debit altogether.
/// The computed wait is then slept in slices of at most
/// [`POLL_INTERVAL`](Self::POLL_INTERVAL), checking `should_stop` ahead
/// of each slice, so even a multi-gigabyte item under a low limit cannot
/// hold up shutdown for longer than one poll interval. Calling
/// `acquire_wait` again inside the loop would debit the bucket once per
/// iteration, which is why the wait is computed a single time up front.
///
/// With a rate of `0` this is a no-op returning `false` and the clock is
/// never read. Available only with the `std` feature; `no_std` callers
/// drive `acquire_wait` with their own clock + interruptible wait.
// no-std: caller-provided clock + acquire_wait() + caller's wait/poll loop
#[cfg(feature = "std")]
pub fn request_interruptible(&self, bytes: u64, should_stop: impl Fn() -> bool) -> bool {
    // Decide "unthrottled" BEFORE reading any clock: the compaction merge
    // loop calls this per item, so the default (rate 0) must cost no more
    // than one relaxed atomic load.
    let throttled = self.rate_bytes_per_sec.load(Ordering::Relaxed) != 0;
    if !throttled {
        return false;
    }
    // A stop that is already pending means there is nothing worth
    // debiting or locking for; leave the bucket and clock untouched.
    if should_stop() {
        return true;
    }
    // Single debit; everything below only sleeps off the resulting wait.
    let mut left = self.acquire_wait(bytes, Self::std_now());
    loop {
        if left.is_zero() {
            return false;
        }
        if should_stop() {
            return true;
        }
        let nap = core::cmp::min(left, Self::POLL_INTERVAL);
        std::thread::sleep(nap);
        left = left.saturating_sub(nap);
    }
}
/// `no_std` variant: there is no ambient monotonic clock to throttle
/// against, so this never sleeps. It still honors the caller's stop signal
/// so a shutdown is observed promptly; rate limiting itself is a no-op.
///
/// `_bytes` is ignored and the bucket is neither locked nor debited.
/// Returns the result of a single `should_stop()` call: `true` means the
/// caller should abort, `false` means it may proceed.
// no-std: wire a caller-provided clock + `acquire_wait` poll loop to restore
// throttling.
#[cfg(not(feature = "std"))]
pub fn request_interruptible(&self, _bytes: u64, should_stop: impl Fn() -> bool) -> bool {
should_stop()
}
/// Maximum single sleep span inside
/// [`request_interruptible`](Self::request_interruptible): the upper
/// bound on how long a stop signal can go unnoticed mid-throttle.
///
/// Set to 100 ms. Longer waits are split into slices of at most this
/// length, with the stop signal checked before each slice.
#[cfg(feature = "std")]
const POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Current monotonic time for the `std` wrapper, expressed as the span
/// elapsed since a lazily initialised, process-global origin.
///
/// Sharing one origin across limiters is harmless: each bucket records
/// its own `last_refill_nanos` against the same monotonic reference, so
/// only differences between readings ever matter.
#[cfg(feature = "std")]
fn std_now() -> Duration {
    // `Instant` and `OnceLock` exist only in std, which is why this helper
    // (like the blocking `request_interruptible`) sits behind the `std`
    // gate; no_std callers pass their own clock reading to `acquire_wait`.
    static ORIGIN: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
    let origin = ORIGIN.get_or_init(std::time::Instant::now);
    origin.elapsed()
}
}
#[cfg(test)]
mod tests;