1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation
//! Computed write-backpressure verdict.
//!
//! The engine is a library: `insert` is synchronous and non-blocking, and flush
//! plus compaction are driven by the caller, not by an engine-owned thread. So
//! backpressure is a **verdict the caller consults**, not an internal stall: the
//! engine would deadlock if it blocked a write on compaction debt draining, since
//! the blocked thread may be the one that runs compaction. This mirrors the
//! storage-admission predicate ([`crate::AbstractTree::write_admission`]).
//!
//! [`Backpressure`] is computed from two independent signals against
//! caller-configured thresholds (see the `*_slowdown` / `*_stop` fields on
//! [`RuntimeConfig`](crate::runtime_config::RuntimeConfig)):
//!
//! - **L0 table count** — count-triggered, the same signal the leveled `choose`
//! trigger uses; a tall L0 is what spikes read amplification.
//! - **Pending compaction bytes** — the size-target debt the strategy reports.
//!
//! The verdict is the more severe of the two axes. With every threshold unset the
//! verdict is always [`Backpressure::None`], so the feature is off by default and
//! the write path is unchanged.
use core::time::Duration;
/// Caller-supplied limits from which a [`Backpressure`] verdict is derived.
///
/// Every threshold is optional and defaults to `None`; an unset threshold never
/// fires. When all four thresholds are unset the verdict is always
/// [`Backpressure::None`].
///
/// The slowdown and stop thresholds of an axis are evaluated independently, so
/// configuring only one of the pair still enables that one tier.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BackpressureThresholds {
    /// Minimum L0 table count that yields at least
    /// [`Backpressure::Slowdown`].
    pub l0_slowdown: Option<usize>,

    /// Minimum L0 table count that yields [`Backpressure::Stop`].
    pub l0_stop: Option<usize>,

    /// Minimum number of pending-compaction bytes that yields at least
    /// [`Backpressure::Slowdown`].
    pub bytes_slowdown: Option<u64>,

    /// Minimum number of pending-compaction bytes that yields
    /// [`Backpressure::Stop`].
    pub bytes_stop: Option<u64>,

    /// Upper bound for the delay carried by [`Backpressure::Slowdown`].
    ///
    /// On an axis with both thresholds set, the delay starts at zero at the
    /// slowdown threshold and grows linearly toward this bound as the signal
    /// approaches the stop threshold, so there is no cliff between healthy and
    /// stopped. On an axis without a stop threshold there is no interval to
    /// ramp over and the delay is this bound outright.
    ///
    /// `None` is treated as `Duration::ZERO`: the slowdown tier is then purely
    /// advisory and always reports a zero-length delay.
    pub max_slowdown: Option<Duration>,
}
impl BackpressureThresholds {
    /// Configuration with every field unset, so the verdict is always
    /// [`Backpressure::None`]. Equal to the derived `Default` value, and
    /// usable in `const` contexts when building a configuration.
    pub const OFF: Self = Self {
        l0_slowdown: None,
        l0_stop: None,
        bytes_slowdown: None,
        bytes_stop: None,
        max_slowdown: None,
    };

    /// Returns `true` when none of the four thresholds is set, in which case
    /// the verdict is always [`Backpressure::None`].
    ///
    /// `max_slowdown` is deliberately not consulted: on its own it enables no
    /// axis. [`Backpressure::compute`] checks this first so it can return
    /// early without evaluating either signal.
    #[must_use]
    pub const fn is_off(&self) -> bool {
        matches!(
            (
                self.l0_slowdown,
                self.l0_stop,
                self.bytes_slowdown,
                self.bytes_stop,
            ),
            (None, None, None, None)
        )
    }
}
/// The outcome of a write-backpressure computation.
///
/// The verdict is advisory. The caller applies it in its own write loop:
/// sleeping on [`Slowdown`](Backpressure::Slowdown), pausing or shedding load
/// on [`Stop`](Backpressure::Stop). The engine itself never blocks on it.
///
/// # Example
///
/// Consult the verdict before a write and act on it. Measuring how long the
/// caller spent throttled is the caller's own metric, because the engine has
/// no view of caller-side sleep time:
///
/// ```
/// use core::time::Duration;
/// use lsm_tree::Backpressure;
///
/// /// Returns how long the caller should pause before this write (zero = proceed).
/// fn honour(verdict: Backpressure) -> Duration {
///     match verdict {
///         Backpressure::None => Duration::ZERO,
///         Backpressure::Slowdown { suggested_delay } => suggested_delay,
///         // Stop: pause / shed load until a later verdict drops below stop.
///         Backpressure::Stop => Duration::from_millis(50),
///     }
/// }
///
/// assert_eq!(honour(Backpressure::None), Duration::ZERO);
/// assert_eq!(
///     honour(Backpressure::Slowdown { suggested_delay: Duration::from_micros(200) }),
///     Duration::from_micros(200),
/// );
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Backpressure {
    /// No threshold has been reached; write at full rate.
    None,

    /// A slowdown threshold has been reached, but no stop threshold has. The
    /// caller should wait `suggested_delay` before each write so compaction
    /// can catch up. The delay scales with how far the signal is past the
    /// slowdown threshold: zero at the threshold itself, rising toward the
    /// configured cap as the stop threshold gets closer.
    Slowdown {
        /// Recommended pause per write.
        suggested_delay: Duration,
    },

    /// A stop threshold has been reached or exceeded. The caller should stop
    /// admitting writes until a later verdict falls below stop. The verdict is
    /// recomputed on every call rather than latched, so it clears as soon as
    /// the signals drop.
    Stop,
}
impl Backpressure {
    /// Derives the verdict for the current L0 table count and pending
    /// compaction bytes, judged against `thresholds`.
    ///
    /// The result depends only on the arguments, so it can be unit-tested
    /// without a tree. The more severe of the two axes wins:
    ///
    /// 1. [`Backpressure::Stop`] if either signal has reached its stop
    ///    threshold.
    /// 2. Otherwise [`Backpressure::Slowdown`] if either signal has reached its
    ///    slowdown threshold; the delay is the larger of the per-axis ramped
    ///    delays.
    /// 3. Otherwise [`Backpressure::None`].
    #[must_use]
    pub fn compute(
        l0_table_count: usize,
        pending_bytes: u64,
        thresholds: &BackpressureThresholds,
    ) -> Self {
        // Fast exit: nothing is configured, so nothing can fire.
        if thresholds.is_off() {
            return Self::None;
        }

        // Stop takes priority over everything else and needs only one axis.
        let l0_at_stop = matches!(thresholds.l0_stop, Some(limit) if l0_table_count >= limit);
        let bytes_at_stop = matches!(thresholds.bytes_stop, Some(limit) if pending_bytes >= limit);
        if l0_at_stop || bytes_at_stop {
            return Self::Stop;
        }

        // Both signals are below their stop thresholds from here on, which is
        // the precondition the ramp helpers rely on.
        let cap = thresholds.max_slowdown.unwrap_or(Duration::ZERO);

        // Per-axis delay: `Some` only when that axis is in its slowdown tier.
        let l0_delay = thresholds
            .l0_slowdown
            .filter(|&soft| l0_table_count >= soft)
            .map(|soft| ramp_usize(l0_table_count, soft, thresholds.l0_stop, cap));
        let bytes_delay = thresholds
            .bytes_slowdown
            .filter(|&soft| pending_bytes >= soft)
            .map(|soft| ramp_u64(pending_bytes, soft, thresholds.bytes_stop, cap));

        let suggested_delay = match (l0_delay, bytes_delay) {
            (None, None) => return Self::None,
            (Some(only), None) | (None, Some(only)) => only,
            (Some(from_l0), Some(from_bytes)) => from_l0.max(from_bytes),
        };
        Self::Slowdown { suggested_delay }
    }

    /// Returns `true` for every verdict other than [`Backpressure::None`].
    #[must_use]
    pub const fn is_throttled(&self) -> bool {
        match self {
            Self::None => false,
            Self::Slowdown { .. } | Self::Stop => true,
        }
    }
}
/// Maps `value` from the interval `[soft, hard)` linearly onto a delay that
/// starts at zero and rises toward `cap`.
///
/// When `hard` is absent (or not above `soft`) there is no interval to ramp
/// over, so the delay is `cap` itself.
///
/// The caller must pass `value >= soft` and, when `hard` is set,
/// `value < hard`; the stop tier is handled before this is reached.
fn ramp_usize(value: usize, soft: usize, hard: Option<usize>, cap: Duration) -> Duration {
    if let Some(stop) = hard.filter(|&stop| stop > soft) {
        // Under the precondition above, `0 <= overage < span`.
        let overage = (value - soft) as u64;
        let span = (stop - soft) as u64;
        scale(cap, overage, span)
    } else {
        cap
    }
}
/// The `u64` counterpart of [`ramp_usize`]: maps `value` from `[soft, hard)`
/// linearly onto a delay that starts at zero and rises toward `cap`, or
/// returns `cap` outright when `hard` is absent or not above `soft`.
///
/// The caller must pass `value >= soft` and, when `hard` is set,
/// `value < hard`.
fn ramp_u64(value: u64, soft: u64, hard: Option<u64>, cap: Duration) -> Duration {
    if let Some(stop) = hard.filter(|&stop| stop > soft) {
        let overage = value - soft;
        let span = stop - soft;
        scale(cap, overage, span)
    } else {
        cap
    }
}
/// Computes `cap * num / den` at nanosecond resolution, rounding down.
///
/// Callers are expected to pass `num < den`. A zero `den` returns `cap`
/// unchanged instead of dividing by zero.
fn scale(cap: Duration, num: u64, den: u64) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    if den == 0 {
        return cap;
    }

    let total = cap.as_nanos();
    let (num, den) = (u128::from(num), u128::from(den));

    // Split the multiply-then-divide into a quotient part and a remainder part
    // so that `total * num` is never formed: with a cap near `Duration::MAX`
    // and a wide span that product does not fit in a u128, and saturating it
    // would flatten the ramp. With `num < den` neither part can overflow:
    //   * `(total / den) * num` is at most `total`;
    //   * `(total % den) * num` is below `den * num`, and both factors
    //     originate from u64 values.
    let from_quotient = (total / den) * num;
    let from_remainder = ((total % den) * num) / den;
    let scaled = from_quotient + from_remainder;

    // `scaled <= total`, so the whole seconds never exceed `cap.as_secs()`;
    // the saturating fallback is purely defensive.
    let secs = u64::try_from(scaled / NANOS_PER_SEC).unwrap_or(u64::MAX);
    // A remainder modulo 1e9 is always below u32::MAX, so the cast is lossless.
    let subsec_nanos = (scaled % NANOS_PER_SEC) as u32;
    Duration::new(secs, subsec_nanos)
}
// Out-of-line `tests` submodule; the `cfg(test)` gate compiles it only in test builds.
#[cfg(test)]
mod tests;