rivet/tuning/adaptive.rs
1//! Adaptive batch sizing — live-feedback loop that reacts to DB pressure.
2//!
3//! Both `PostgresSource` and `MysqlSource` sample pressure metrics every
4//! [`ADAPTIVE_SAMPLE_INTERVAL`] batches (`pg_stat_bgwriter.checkpoints_req` for
5//! PG; `Innodb_log_waits` for MySQL) and call [`next_adaptive_batch_size`] to
6//! pick the next fetch size.
7//!
8//! The OPT-2 [`Governor`] runs the same idea at the *parallelism* layer: every
9//! [`GOVERNOR_SAMPLE_INTERVAL_MS`] it samples a [`PressureSource`], folds the
10//! reading through [`GovernorState`], and emits each `(from, to)` transition
11//! through a callback. Extracted from an inline `thread::scope` closure so the
12//! decision loop is unit-testable on a fake `PressureSource` without
13//! requiring a live database + a multi-second wait for two real sample
14//! intervals. The runner binds the callback to its own semaphore-resize +
15//! log + decision-log machinery.
16
17use std::time::{Duration, Instant};
18
/// How many batches elapse between two consecutive adaptive pressure samples.
pub const ADAPTIVE_SAMPLE_INTERVAL: usize = 10;

/// Lower bound for the adaptive fetch size: the shrink-under-pressure path
/// never returns anything smaller than this.
pub const ADAPTIVE_MIN_BATCH: usize = 500;
24
25/// Decide the next adaptive fetch size from current pressure state.
26///
27/// - Under pressure: shrink to 75 %, but never below [`ADAPTIVE_MIN_BATCH`].
28/// - Otherwise: grow to 125 %, but never above the schema-chosen `base` ceiling
29/// (so we recover toward the initial fetch size without overshooting it).
30///
31/// Pure function — exported so adaptive batch-sizing can be unit-tested without
32/// a live database.
33pub fn next_adaptive_batch_size(current: usize, base: usize, under_pressure: bool) -> usize {
34 if under_pressure {
35 (current * 3 / 4).max(ADAPTIVE_MIN_BATCH)
36 } else {
37 (current * 5 / 4).min(base)
38 }
39}
40
/// Default gap, in milliseconds, between two governor pressure samples (the
/// parallelism control loop in `pipeline::chunked::exec`).
///
/// Deliberately slower than the per-batch fetch-size loop: every step the
/// governor takes spins a worker up or down and churns a source connection,
/// so it should react more conservatively.
pub const GOVERNOR_SAMPLE_INTERVAL_MS: u64 = 1_500;
46
/// Decide the next worker/connection count from current governor state.
///
/// Steps by **one** toward the bounds — gentler than the batch loop's ±25 %
/// ratio, because permit counts are small integers and each step opens or
/// retires a real source connection:
/// - Under pressure: shed one worker, never below `min`.
/// - Otherwise: recover one worker, never above `max`.
///
/// `min` is floored at 1 (a floor of 0 would let the pool shrink to zero
/// workers and stall) and `max` is floored at `min`. `current` is first
/// clamped into the resulting range, so an out-of-range input is pulled back
/// in bounds before the step is applied. The recovery step saturates, so
/// `max == usize::MAX` cannot overflow.
///
/// Pure function — exported so the governor can be unit-tested without a live DB.
pub fn next_parallel(current: usize, min: usize, max: usize, under_pressure: bool) -> usize {
    let lo = min.max(1);
    let hi = max.max(lo);
    let cur = current.clamp(lo, hi);
    if under_pressure {
        cur.saturating_sub(1).max(lo)
    } else {
        // `saturating_add` rather than `+ 1`: at `cur == hi == usize::MAX` the
        // plain addition panics in debug and wraps to 0 (zero workers) in release.
        cur.saturating_add(1).min(hi)
    }
}
67
68/// Decision state for the concurrency governor's control loop.
69///
70/// Holds the previous pressure sample and the current target parallelism so the
71/// I/O parts (pressure sampling, semaphore resize, journaling) stay in the
72/// execution layer while this — the actual policy — is unit-testable without a
73/// live database or threads. `under_pressure` mirrors the batch loop: a sample
74/// strictly higher than the previous one means pressure is rising.
75#[derive(Debug)]
76pub struct GovernorState {
77 prev: Option<u64>,
78 current: usize,
79 floor: usize,
80 ceiling: usize,
81}
82
83impl GovernorState {
84 /// Start at `start`, clamped into `[floor, ceiling]`. `ceiling` is floored
85 /// at 1 and `floor` clamped into `[1, ceiling]`.
86 pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
87 let ceiling = ceiling.max(1);
88 let floor = floor.clamp(1, ceiling);
89 Self {
90 prev: None,
91 current: start.clamp(floor, ceiling),
92 floor,
93 ceiling,
94 }
95 }
96
97 /// Current target parallelism. Test-only observability accessor.
98 #[cfg(test)]
99 pub fn current(&self) -> usize {
100 self.current
101 }
102
103 /// Fold one pressure sample into the state. Returns `Some((from, to))` when
104 /// the target changed (caller should resize the semaphore + journal it), or
105 /// `None` when nothing changed — including when `sample` is `None` (the
106 /// engine couldn't sample, so parallelism holds flat and the baseline is
107 /// left untouched).
108 pub fn observe(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
109 let cur_p = sample?;
110 let under_pressure = self.prev.is_some_and(|p| cur_p > p);
111 self.prev = Some(cur_p);
112 let next = next_parallel(self.current, self.floor, self.ceiling, under_pressure);
113 if next == self.current {
114 None
115 } else {
116 let from = self.current;
117 self.current = next;
118 Some((from, next))
119 }
120 }
121}
122
/// Wake-up period, in milliseconds, for the governor's `run` loop to re-check
/// its stop predicate.
///
/// Much shorter than [`GOVERNOR_SAMPLE_INTERVAL_MS`] so the governor thread
/// exits soon after the run finishes, rather than idling for up to a whole
/// sample interval after the last chunk completes. `Governor::new` lowers it
/// further when the sample interval is shorter still.
pub const GOVERNOR_POLL_MS: u64 = 200;
128
/// The one capability the [`Governor`] needs from a source: produce a
/// pressure reading on demand.
///
/// It is implemented for `Box<dyn crate::source::Source>`, so the production
/// runner can pass in the monitor connection it has already built. The tests
/// in this module use a small in-memory adapter (`VecSource`) instead, which
/// lets the decision loop run without a live database.
///
/// The `Send` bound exists because the runner spawns the governor on its own
/// thread inside `thread::scope`.
pub trait PressureSource: Send {
    /// Take one pressure reading. `None` means the source could not be sampled
    /// on this tick, in which case the governor leaves parallelism unchanged
    /// (see [`GovernorState::observe`]).
    fn sample_pressure(&mut self) -> Option<u64>;
}
144
145impl PressureSource for Box<dyn crate::source::Source> {
146 fn sample_pressure(&mut self) -> Option<u64> {
147 crate::source::Source::sample_pressure(self.as_mut())
148 }
149}
150
151/// The adaptive concurrency governor — the inline `thread::scope` closure
152/// that used to live in [`crate::pipeline::chunked::exec::run_chunked_parallel`]
153/// turned into a self-contained, testable abstraction.
154///
155/// Why a struct (not just functions): the decision policy
156/// ([`GovernorState`]) and the sample cadence (sample/poll intervals,
157/// `RIVET_GOVERNOR_INTERVAL_MS` env override) are runtime-coupled — the
158/// poll interval must be clamped to the sample interval, and the decision
159/// state is mutated across ticks. Bundling them into one type makes the
160/// "what to test, what to fake" boundary obvious: the source is the
161/// dependency, the runner-side side effects are a callback.
162pub struct Governor {
163 state: GovernorState,
164 sample_interval: Duration,
165 poll_interval: Duration,
166}
167
168impl Governor {
169 /// Build a governor that starts at `start`, clamped into `[floor,
170 /// ceiling]`, and uses the env-tunable sample cadence
171 /// (`RIVET_GOVERNOR_INTERVAL_MS`; falls back to
172 /// [`GOVERNOR_SAMPLE_INTERVAL_MS`]). The poll interval is clamped to
173 /// the sample interval so a tiny override (used in deterministic live
174 /// tests) actually polls that fast, instead of being capped at the
175 /// default [`GOVERNOR_POLL_MS`].
176 pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
177 let sample_ms = sample_interval_ms_from_env();
178 let poll_ms = GOVERNOR_POLL_MS.min(sample_ms);
179 Self {
180 state: GovernorState::new(start, floor, ceiling),
181 sample_interval: Duration::from_millis(sample_ms),
182 poll_interval: Duration::from_millis(poll_ms),
183 }
184 }
185
186 /// Build a governor with explicit intervals — bypasses the env-var
187 /// read so unit tests can drive the loop deterministically without
188 /// mutating process-global state.
189 #[cfg(test)]
190 pub fn with_intervals(
191 start: usize,
192 floor: usize,
193 ceiling: usize,
194 sample_interval: Duration,
195 poll_interval: Duration,
196 ) -> Self {
197 Self {
198 state: GovernorState::new(start, floor, ceiling),
199 sample_interval,
200 poll_interval,
201 }
202 }
203
204 /// Pure decision step: fold one sample into the state. Returns
205 /// `Some((from, to))` on a parallelism transition, `None` otherwise.
206 /// Mirrors [`GovernorState::observe`]; exposed at the [`Governor`]
207 /// surface so tests can drive the policy without entering `run`.
208 pub fn tick(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
209 self.state.observe(sample)
210 }
211
212 /// Drive the sample loop until `stop` returns true. On every
213 /// parallelism transition the `on_decision(from, to)` callback fires
214 /// — the runner binds it to its semaphore-resize + log +
215 /// decision-log machinery. Polls every `poll_interval`, samples
216 /// every `sample_interval`. The stop predicate is re-checked after
217 /// each poll sleep so a finished run exits within one poll quantum.
218 pub fn run<S, Stop, Decide>(&mut self, source: &mut S, stop: Stop, mut on_decision: Decide)
219 where
220 S: PressureSource + ?Sized,
221 Stop: Fn() -> bool,
222 Decide: FnMut(usize, usize),
223 {
224 let mut last_sample = Instant::now();
225 while !stop() {
226 std::thread::sleep(self.poll_interval);
227 if stop() {
228 break;
229 }
230 if last_sample.elapsed() < self.sample_interval {
231 continue;
232 }
233 last_sample = Instant::now();
234 if let Some((from, to)) = self.tick(source.sample_pressure()) {
235 on_decision(from, to);
236 }
237 }
238 }
239}
240
241/// Read `RIVET_GOVERNOR_INTERVAL_MS` and fall back to the production default.
242/// Lives next to [`Governor`] so live tests and the production runner share
243/// one resolution path — extracted from the inline read in
244/// `run_chunked_parallel` so tests can verify the cadence policy.
245fn sample_interval_ms_from_env() -> u64 {
246 std::env::var("RIVET_GOVERNOR_INTERVAL_MS")
247 .ok()
248 .and_then(|v| v.parse::<u64>().ok())
249 .filter(|&n| n > 0)
250 .unwrap_or(GOVERNOR_SAMPLE_INTERVAL_MS)
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn adaptive_shrinks_by_25_percent_under_pressure() {
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, true), 7_500);
        assert_eq!(next_adaptive_batch_size(8_000, 10_000, true), 6_000);
    }

    #[test]
    fn adaptive_grows_by_25_percent_when_idle() {
        // 4_000 × 5/4 = 5_000; well under base ceiling.
        assert_eq!(next_adaptive_batch_size(4_000, 10_000, false), 5_000);
    }

    #[test]
    fn adaptive_recovery_caps_at_base_ceiling() {
        // 9_000 × 5/4 = 11_250, but base is 10_000 — must clamp.
        assert_eq!(next_adaptive_batch_size(9_000, 10_000, false), 10_000);
        // Already at base: stays there.
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, false), 10_000);
    }

    #[test]
    fn adaptive_shrink_respects_min_floor() {
        // 600 × 3/4 = 450, but ADAPTIVE_MIN_BATCH = 500 — must clamp up.
        assert_eq!(
            next_adaptive_batch_size(600, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
        // Already at floor: stays at floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_pressure_path_ignores_base_uses_only_floor() {
        // Pressure path never consults base: shrink is computed from current,
        // then clamped only to ADAPTIVE_MIN_BATCH. A pathologically low base
        // does not artificially pin us lower than the floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 100, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_steady_state_oscillation_stays_bounded() {
        // Simulate 50 sample cycles under sustained pressure, then sustained recovery.
        // Verifies: the loop never wanders below floor or above base, and converges.
        let base = 5_000;
        let mut s = base;
        for _ in 0..50 {
            s = next_adaptive_batch_size(s, base, true);
        }
        assert_eq!(
            s, ADAPTIVE_MIN_BATCH,
            "sustained pressure must converge to floor"
        );
        for _ in 0..50 {
            s = next_adaptive_batch_size(s, base, false);
        }
        assert_eq!(s, base, "sustained recovery must converge to base ceiling");
    }

    // ── next_parallel (governor) ──────────────────────────────────────────────

    #[test]
    fn next_parallel_sheds_one_under_pressure() {
        assert_eq!(next_parallel(8, 1, 8, true), 7);
        assert_eq!(next_parallel(4, 1, 8, true), 3);
    }

    #[test]
    fn next_parallel_recovers_one_when_idle() {
        assert_eq!(next_parallel(4, 1, 8, false), 5);
    }

    #[test]
    fn next_parallel_shrink_respects_min_floor() {
        assert_eq!(next_parallel(2, 2, 8, true), 2, "already at min stays");
        assert_eq!(next_parallel(1, 1, 8, true), 1, "never below 1");
    }

    #[test]
    fn next_parallel_grow_respects_max_ceiling() {
        assert_eq!(next_parallel(8, 1, 8, false), 8, "already at max stays");
    }

    #[test]
    fn next_parallel_min_floored_at_one() {
        // A nonsensical min=0 must not let the count drop to 0 (would stall).
        assert_eq!(next_parallel(1, 0, 8, true), 1);
    }

    #[test]
    fn next_parallel_steady_state_converges_to_bounds() {
        let (min, max) = (2, 6);
        let mut p = max;
        for _ in 0..20 {
            p = next_parallel(p, min, max, true);
        }
        assert_eq!(p, min, "sustained pressure converges to min");
        for _ in 0..20 {
            p = next_parallel(p, min, max, false);
        }
        assert_eq!(p, max, "sustained recovery converges to max");
    }

    // ── GovernorState ─────────────────────────────────────────────────────────

    #[test]
    fn governor_state_clamps_start_into_bounds() {
        assert_eq!(GovernorState::new(99, 2, 6).current(), 6);
        assert_eq!(GovernorState::new(0, 2, 6).current(), 2);
        // floor floored at 1, ceiling at 1.
        assert_eq!(GovernorState::new(5, 0, 0).current(), 1);
    }

    #[test]
    fn governor_state_first_sample_at_ceiling_reports_no_change() {
        // The first sample has no baseline to compare against, so it can never
        // read as rising: it seeds the baseline and is treated as an idle tick.
        // Starting at the ceiling, that idle recovery step is already capped,
        // so nothing changes. (Below the ceiling the first sample does grow
        // the target — see `governor_state_recovers_when_pressure_flat`.)
        let mut g = GovernorState::new(6, 2, 6);
        assert_eq!(g.observe(Some(100)), None, "at ceiling, idle ⇒ no change");
        assert_eq!(g.current(), 6);
        // A flat follow-up is still idle and still capped.
        assert_eq!(g.observe(Some(100)), None, "flat at ceiling ⇒ no change");
        assert_eq!(g.current(), 6);
    }

    #[test]
    fn governor_state_backs_off_under_rising_pressure() {
        let mut g = GovernorState::new(6, 2, 6);
        assert_eq!(g.observe(Some(100)), None); // baseline, at ceiling
        assert_eq!(g.observe(Some(200)), Some((6, 5)), "rising ⇒ shed one");
        assert_eq!(g.observe(Some(300)), Some((5, 4)));
        assert_eq!(g.current(), 4);
    }

    #[test]
    fn governor_state_recovers_when_pressure_flat() {
        let mut g = GovernorState::new(3, 2, 6);
        assert_eq!(
            g.observe(Some(100)),
            Some((3, 4)),
            "flat/idle ⇒ recover one"
        );
        assert_eq!(g.observe(Some(100)), Some((4, 5)));
    }

    #[test]
    fn governor_state_none_sample_holds_flat_and_keeps_baseline() {
        let mut g = GovernorState::new(4, 2, 6);
        assert_eq!(g.observe(Some(200)), Some((4, 5))); // baseline=200, grew
        assert_eq!(g.observe(None), None, "no sample ⇒ no change");
        // Baseline stayed 200, so a later 300 still reads as rising.
        assert_eq!(
            g.observe(Some(300)),
            Some((5, 4)),
            "rising vs preserved baseline"
        );
    }

    // ── Governor (the loop, not just the decision) ────────────────────────────

    /// In-memory [`PressureSource`] that hands out canned samples in order
    /// and reports `None` once exhausted (mimics a source that lost its
    /// connection mid-run). Bumps a shared counter on every call so a test's
    /// `stop` predicate can fire after a fixed number of samples. Keying
    /// `stop` off decisions instead would hang whenever a sample produces no
    /// transition — e.g. the first sample when the governor starts at its
    /// ceiling.
    struct VecSource {
        samples: std::collections::VecDeque<Option<u64>>,
        sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    }

    impl VecSource {
        fn new(
            samples: impl IntoIterator<Item = Option<u64>>,
            sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
        ) -> Self {
            Self {
                samples: samples.into_iter().collect(),
                sample_count,
            }
        }
    }

    impl PressureSource for VecSource {
        fn sample_pressure(&mut self) -> Option<u64> {
            self.sample_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            self.samples.pop_front().unwrap_or(None)
        }
    }

    #[test]
    fn governor_tick_mirrors_governor_state_observe() {
        // The `tick` method must be a faithful surface for the policy; if it
        // ever diverges from GovernorState::observe, the runner-side
        // unit-test guarantee breaks. Drive both with the same sequence and
        // assert identical outputs.
        let samples = [Some(100u64), Some(200), Some(150), None, Some(400)];
        let mut g =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
        let mut s = GovernorState::new(6, 2, 6);
        for sample in samples {
            assert_eq!(g.tick(sample), s.observe(sample));
        }
    }

    #[test]
    fn governor_run_emits_decisions_for_every_rising_sample_until_stop() {
        // Polls at 1 ms, samples at 1 ms — every wake takes a sample, so the
        // 5 canned samples yield exactly the transitions the policy emits.
        // The governor starts at its ceiling, so the first sample (which has
        // no baseline and therefore reads as idle) produces no transition:
        // 5 samples, 4 decisions. `stop` is keyed on the *sample* counter;
        // keying it on decisions would hang on that silent first sample.
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        let sample_count = Arc::new(AtomicUsize::new(0));
        let mut source = VecSource::new(
            [
                Some(100),
                Some(200), // rising → 6→5
                Some(300), // rising → 5→4
                Some(400), // rising → 4→3
                Some(500), // rising → 3→2 (reaches the floor)
            ],
            Arc::clone(&sample_count),
        );
        let mut gov =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
        let stop_count = Arc::clone(&sample_count);
        let stop = move || stop_count.load(Ordering::Relaxed) >= 5;
        let mut decisions: Vec<(usize, usize)> = Vec::new();
        gov.run(&mut source, stop, |from, to| {
            decisions.push((from, to));
        });

        // First sample (100) seeds the baseline; its idle step is capped at
        // the ceiling → no decision. Samples 2..5 all rise → four shed-one
        // decisions.
        assert_eq!(decisions, vec![(6, 5), (5, 4), (4, 3), (3, 2)]);
    }

    #[test]
    fn governor_run_stops_promptly_within_one_poll_quantum() {
        // `stop` reports false on its first check (so `run` enters the loop
        // and sleeps one poll quantum) and true from then on. `run` must
        // observe the flip on the post-sleep check and return, rather than
        // waiting out the 50 ms sample interval or taking a sample. A
        // regression of the deadlock-class bug from 16fc662 would surface
        // here as a non-terminating test.
        let sample_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut source = VecSource::new([Some(100)], std::sync::Arc::clone(&sample_count));
        let mut gov =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(50), Duration::from_millis(5));
        let checks = std::cell::Cell::new(0usize);
        let stop = || {
            let seen = checks.get();
            checks.set(seen + 1);
            seen >= 1
        };
        let start = std::time::Instant::now();
        gov.run(&mut source, stop, |_, _| {});
        assert!(
            start.elapsed() < Duration::from_millis(40),
            "run() must exit on stop without sleeping a full sample interval"
        );
        assert_eq!(checks.get(), 2, "one pre-sleep check, one post-sleep check");
        assert_eq!(
            sample_count.load(std::sync::atomic::Ordering::Relaxed),
            0,
            "stopping before the sample interval elapses must not sample"
        );
    }
}