Skip to main content

rivet/tuning/
adaptive.rs

1//! Adaptive batch sizing — live-feedback loop that reacts to DB pressure.
2//!
3//! Both `PostgresSource` and `MysqlSource` sample pressure metrics every
4//! [`ADAPTIVE_SAMPLE_INTERVAL`] batches (`pg_stat_bgwriter.checkpoints_req` for
5//! PG; `Innodb_log_waits` for MySQL) and call [`next_adaptive_batch_size`] to
6//! pick the next fetch size.
7//!
8//! The OPT-2 [`Governor`] runs the same idea at the *parallelism* layer: every
9//! [`GOVERNOR_SAMPLE_INTERVAL_MS`] it samples a [`PressureSource`], folds the
10//! reading through [`GovernorState`], and emits each `(from, to)` transition
11//! through a callback. Extracted from an inline `thread::scope` closure so the
12//! decision loop is unit-testable on a fake `PressureSource` without
13//! requiring a live database + a multi-second wait for two real sample
14//! intervals. The runner binds the callback to its own semaphore-resize +
15//! log + decision-log machinery.
16
17use std::time::{Duration, Instant};
18
/// Number of batches between adaptive pressure samples.
pub const ADAPTIVE_SAMPLE_INTERVAL: usize = 10;

/// Hard floor for the adaptive fetch size — the loop never shrinks below this.
pub const ADAPTIVE_MIN_BATCH: usize = 500;

/// Decide the next adaptive fetch size from current pressure state.
///
/// - Under pressure: shrink to 75 % (rounded down), but never below
///   [`ADAPTIVE_MIN_BATCH`]. `base` is not consulted on this path, so a
///   `base` smaller than the floor still yields the floor.
/// - Otherwise: grow to 125 % (rounded down), but never above the
///   schema-chosen `base` ceiling (so we recover toward the initial fetch
///   size without overshooting it).
///
/// The arithmetic never forms `3 * current` or `5 * current`, so extreme
/// inputs cannot overflow (no debug-build panic, no release-build wrap); the
/// results are bit-for-bit the same as `current * 3 / 4` and
/// `current * 5 / 4` wherever those don't overflow.
///
/// Pure function — exported so adaptive batch-sizing can be unit-tested without
/// a live database.
pub fn next_adaptive_batch_size(current: usize, base: usize, under_pressure: bool) -> usize {
    if under_pressure {
        // floor(3c/4) == 3*(c/4) + floor(3*(c%4)/4); every term is <= c.
        let shrunk = current / 4 * 3 + current % 4 * 3 / 4;
        shrunk.max(ADAPTIVE_MIN_BATCH)
    } else {
        // floor(5c/4) == c + floor(c/4). Saturating is exact after `min`:
        // a true value above usize::MAX is also above any `base`.
        current.saturating_add(current / 4).min(base)
    }
}
40
/// Milliseconds between governor pressure samples (the parallelism control
/// loop in `pipeline::chunked::exec`). Coarser than batch-size adaptation:
/// spinning workers up/down churns connections, so the governor reacts more
/// deliberately than the per-batch fetch-size loop.
pub const GOVERNOR_SAMPLE_INTERVAL_MS: u64 = 1500;

/// Decide the next worker/connection count from current governor state.
///
/// Steps by **one** toward the bounds — gentler than the batch loop's ±25 %
/// ratio, because permit counts are small integers and each step opens or
/// retires a real source connection:
/// - Under pressure: shed one worker, never below `min`.
/// - Otherwise: recover one worker, never above `max`.
///
/// Bounds are normalised first: `min` is floored at 1 (a floor of 0 would let
/// the count reach 0 and stall the pool) and `max` is floored at that `min`.
/// An out-of-range `current` is clamped into `[min, max]` before stepping.
/// The upward step saturates, so `max == usize::MAX` cannot overflow.
/// Pure function — exported so the governor can be unit-tested without a live DB.
pub fn next_parallel(current: usize, min: usize, max: usize, under_pressure: bool) -> usize {
    let lo = min.max(1);
    let hi = max.max(lo);
    let cur = current.clamp(lo, hi);
    if under_pressure {
        // cur >= lo >= 1, so this never underflows; `max(lo)` keeps the floor.
        cur.saturating_sub(1).max(lo)
    } else {
        cur.saturating_add(1).min(hi)
    }
}
67
68/// Decision state for the concurrency governor's control loop.
69///
70/// Holds the previous pressure sample and the current target parallelism so the
71/// I/O parts (pressure sampling, semaphore resize, journaling) stay in the
72/// execution layer while this — the actual policy — is unit-testable without a
73/// live database or threads. `under_pressure` mirrors the batch loop: a sample
74/// strictly higher than the previous one means pressure is rising.
75#[derive(Debug)]
76pub struct GovernorState {
77    prev: Option<u64>,
78    current: usize,
79    floor: usize,
80    ceiling: usize,
81}
82
83impl GovernorState {
84    /// Start at `start`, clamped into `[floor, ceiling]`. `ceiling` is floored
85    /// at 1 and `floor` clamped into `[1, ceiling]`.
86    pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
87        let ceiling = ceiling.max(1);
88        let floor = floor.clamp(1, ceiling);
89        Self {
90            prev: None,
91            current: start.clamp(floor, ceiling),
92            floor,
93            ceiling,
94        }
95    }
96
97    /// Current target parallelism. Test-only observability accessor.
98    #[cfg(test)]
99    pub fn current(&self) -> usize {
100        self.current
101    }
102
103    /// Fold one pressure sample into the state. Returns `Some((from, to))` when
104    /// the target changed (caller should resize the semaphore + journal it), or
105    /// `None` when nothing changed — including when `sample` is `None` (the
106    /// engine couldn't sample, so parallelism holds flat and the baseline is
107    /// left untouched).
108    pub fn observe(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
109        let cur_p = sample?;
110        let under_pressure = self.prev.is_some_and(|p| cur_p > p);
111        self.prev = Some(cur_p);
112        let next = next_parallel(self.current, self.floor, self.ceiling, under_pressure);
113        if next == self.current {
114            None
115        } else {
116            let from = self.current;
117            self.current = next;
118            Some((from, next))
119        }
120    }
121}
122
/// Period, in milliseconds, at which the governor's `run` loop wakes up to
/// re-check its stop condition. It is deliberately much shorter than
/// [`GOVERNOR_SAMPLE_INTERVAL_MS`]. Once the last chunk completes, the thread
/// then exits after about one poll period rather than idling for up to a
/// whole sample interval.
pub const GOVERNOR_POLL_MS: u64 = 200;
128
/// The single capability the [`Governor`] needs from a source: producing one
/// pressure reading on demand.
///
/// The production runner passes its already-built monitor connection through
/// the implementation for `Box<dyn crate::source::Source>`. Unit tests
/// substitute a tiny in-memory adapter (`VecSource` in this module's
/// `tests`), so the decision loop runs without touching a live database.
///
/// The `Send` bound is required because the runner runs the governor on a
/// thread of its own inside `thread::scope`.
pub trait PressureSource: Send {
    /// Take one pressure reading. `None` means no sample could be taken this
    /// tick. The governor then keeps parallelism where it is (see
    /// [`GovernorState::observe`]).
    fn sample_pressure(&mut self) -> Option<u64>;
}
144
145impl PressureSource for Box<dyn crate::source::Source> {
146    fn sample_pressure(&mut self) -> Option<u64> {
147        crate::source::Source::sample_pressure(self.as_mut())
148    }
149}
150
151/// The adaptive concurrency governor — the inline `thread::scope` closure
152/// that used to live in [`crate::pipeline::chunked::exec::run_chunked_parallel`]
153/// turned into a self-contained, testable abstraction.
154///
155/// Why a struct (not just functions): the decision policy
156/// ([`GovernorState`]) and the sample cadence (sample/poll intervals,
157/// `RIVET_GOVERNOR_INTERVAL_MS` env override) are runtime-coupled — the
158/// poll interval must be clamped to the sample interval, and the decision
159/// state is mutated across ticks. Bundling them into one type makes the
160/// "what to test, what to fake" boundary obvious: the source is the
161/// dependency, the runner-side side effects are a callback.
162pub struct Governor {
163    state: GovernorState,
164    sample_interval: Duration,
165    poll_interval: Duration,
166}
167
168impl Governor {
169    /// Build a governor that starts at `start`, clamped into `[floor,
170    /// ceiling]`, and uses the env-tunable sample cadence
171    /// (`RIVET_GOVERNOR_INTERVAL_MS`; falls back to
172    /// [`GOVERNOR_SAMPLE_INTERVAL_MS`]). The poll interval is clamped to
173    /// the sample interval so a tiny override (used in deterministic live
174    /// tests) actually polls that fast, instead of being capped at the
175    /// default [`GOVERNOR_POLL_MS`].
176    pub fn new(start: usize, floor: usize, ceiling: usize) -> Self {
177        let sample_ms = sample_interval_ms_from_env();
178        let poll_ms = GOVERNOR_POLL_MS.min(sample_ms);
179        Self {
180            state: GovernorState::new(start, floor, ceiling),
181            sample_interval: Duration::from_millis(sample_ms),
182            poll_interval: Duration::from_millis(poll_ms),
183        }
184    }
185
186    /// Build a governor with explicit intervals — bypasses the env-var
187    /// read so unit tests can drive the loop deterministically without
188    /// mutating process-global state.
189    #[cfg(test)]
190    pub fn with_intervals(
191        start: usize,
192        floor: usize,
193        ceiling: usize,
194        sample_interval: Duration,
195        poll_interval: Duration,
196    ) -> Self {
197        Self {
198            state: GovernorState::new(start, floor, ceiling),
199            sample_interval,
200            poll_interval,
201        }
202    }
203
204    /// Pure decision step: fold one sample into the state. Returns
205    /// `Some((from, to))` on a parallelism transition, `None` otherwise.
206    /// Mirrors [`GovernorState::observe`]; exposed at the [`Governor`]
207    /// surface so tests can drive the policy without entering `run`.
208    pub fn tick(&mut self, sample: Option<u64>) -> Option<(usize, usize)> {
209        self.state.observe(sample)
210    }
211
212    /// Drive the sample loop until `stop` returns true. On every
213    /// parallelism transition the `on_decision(from, to)` callback fires
214    /// — the runner binds it to its semaphore-resize + log +
215    /// decision-log machinery. Polls every `poll_interval`, samples
216    /// every `sample_interval`. The stop predicate is re-checked after
217    /// each poll sleep so a finished run exits within one poll quantum.
218    pub fn run<S, Stop, Decide>(&mut self, source: &mut S, stop: Stop, mut on_decision: Decide)
219    where
220        S: PressureSource + ?Sized,
221        Stop: Fn() -> bool,
222        Decide: FnMut(usize, usize),
223    {
224        let mut last_sample = Instant::now();
225        while !stop() {
226            std::thread::sleep(self.poll_interval);
227            if stop() {
228                break;
229            }
230            if last_sample.elapsed() < self.sample_interval {
231                continue;
232            }
233            last_sample = Instant::now();
234            if let Some((from, to)) = self.tick(source.sample_pressure()) {
235                on_decision(from, to);
236            }
237        }
238    }
239}
240
241/// Read `RIVET_GOVERNOR_INTERVAL_MS` and fall back to the production default.
242/// Lives next to [`Governor`] so live tests and the production runner share
243/// one resolution path — extracted from the inline read in
244/// `run_chunked_parallel` so tests can verify the cadence policy.
245fn sample_interval_ms_from_env() -> u64 {
246    std::env::var("RIVET_GOVERNOR_INTERVAL_MS")
247        .ok()
248        .and_then(|v| v.parse::<u64>().ok())
249        .filter(|&n| n > 0)
250        .unwrap_or(GOVERNOR_SAMPLE_INTERVAL_MS)
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn adaptive_shrinks_by_25_percent_under_pressure() {
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, true), 7_500);
        assert_eq!(next_adaptive_batch_size(8_000, 10_000, true), 6_000);
    }

    #[test]
    fn adaptive_grows_by_25_percent_when_idle() {
        // 4_000 × 5/4 = 5_000; well under base ceiling.
        assert_eq!(next_adaptive_batch_size(4_000, 10_000, false), 5_000);
    }

    #[test]
    fn adaptive_recovery_caps_at_base_ceiling() {
        // 9_000 × 5/4 = 11_250, but base is 10_000 — must clamp.
        assert_eq!(next_adaptive_batch_size(9_000, 10_000, false), 10_000);
        // Already at base: stays there.
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, false), 10_000);
    }

    #[test]
    fn adaptive_shrink_respects_min_floor() {
        // 600 × 3/4 = 450, but ADAPTIVE_MIN_BATCH = 500 — must clamp up.
        assert_eq!(
            next_adaptive_batch_size(600, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
        // Already at floor: stays at floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_pressure_path_ignores_base_uses_only_floor() {
        // Pressure path never consults base: shrink is computed from current,
        // then clamped only to ADAPTIVE_MIN_BATCH. A pathologically low base
        // does not artificially pin us lower than the floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 100, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_steady_state_oscillation_stays_bounded() {
        // Simulate 50 sample cycles under sustained pressure, then sustained recovery.
        // Verifies: the loop never wanders below floor or above base, and converges.
        let base = 5_000;
        let mut s = base;
        for _ in 0..50 {
            s = next_adaptive_batch_size(s, base, true);
        }
        assert_eq!(
            s, ADAPTIVE_MIN_BATCH,
            "sustained pressure must converge to floor"
        );
        for _ in 0..50 {
            s = next_adaptive_batch_size(s, base, false);
        }
        assert_eq!(s, base, "sustained recovery must converge to base ceiling");
    }

    // ── next_parallel (governor) ──────────────────────────────────────────────

    #[test]
    fn next_parallel_sheds_one_under_pressure() {
        assert_eq!(next_parallel(8, 1, 8, true), 7);
        assert_eq!(next_parallel(4, 1, 8, true), 3);
    }

    #[test]
    fn next_parallel_recovers_one_when_idle() {
        assert_eq!(next_parallel(4, 1, 8, false), 5);
    }

    #[test]
    fn next_parallel_shrink_respects_min_floor() {
        assert_eq!(next_parallel(2, 2, 8, true), 2, "already at min stays");
        assert_eq!(next_parallel(1, 1, 8, true), 1, "never below 1");
    }

    #[test]
    fn next_parallel_grow_respects_max_ceiling() {
        assert_eq!(next_parallel(8, 1, 8, false), 8, "already at max stays");
    }

    #[test]
    fn next_parallel_min_floored_at_one() {
        // A nonsensical min=0 must not let the count drop to 0 (would stall).
        assert_eq!(next_parallel(1, 0, 8, true), 1);
    }

    #[test]
    fn next_parallel_steady_state_converges_to_bounds() {
        let (min, max) = (2, 6);
        let mut p = max;
        for _ in 0..20 {
            p = next_parallel(p, min, max, true);
        }
        assert_eq!(p, min, "sustained pressure converges to min");
        for _ in 0..20 {
            p = next_parallel(p, min, max, false);
        }
        assert_eq!(p, max, "sustained recovery converges to max");
    }

    // ── GovernorState ─────────────────────────────────────────────────────────

    #[test]
    fn governor_state_clamps_start_into_bounds() {
        assert_eq!(GovernorState::new(99, 2, 6).current(), 6);
        assert_eq!(GovernorState::new(0, 2, 6).current(), 2);
        // floor floored at 1, ceiling at 1.
        assert_eq!(GovernorState::new(5, 0, 0).current(), 1);
    }

    #[test]
    fn governor_state_first_sample_only_sets_baseline_then_recovers() {
        // With no baseline yet, the first sample can never read as "rising",
        // so it is treated as idle. Starting at the ceiling, the idle step
        // is already capped and no change is reported.
        let mut g = GovernorState::new(6, 2, 6);
        assert_eq!(g.observe(Some(100)), None, "at ceiling, idle ⇒ no change");
        assert_eq!(g.current(), 6);

        // Below the ceiling the same first sample recovers one worker — even
        // an extreme reading, since there is nothing to compare it against.
        let mut g = GovernorState::new(4, 2, 6);
        assert_eq!(
            g.observe(Some(u64::MAX)),
            Some((4, 5)),
            "first sample never counts as rising"
        );
        // With the baseline now set, an equal reading is still idle ⇒ recover.
        assert_eq!(g.observe(Some(u64::MAX)), Some((5, 6)));
        assert_eq!(g.current(), 6);
    }

    #[test]
    fn governor_state_backs_off_under_rising_pressure() {
        let mut g = GovernorState::new(6, 2, 6);
        assert_eq!(g.observe(Some(100)), None); // baseline, at ceiling
        assert_eq!(g.observe(Some(200)), Some((6, 5)), "rising ⇒ shed one");
        assert_eq!(g.observe(Some(300)), Some((5, 4)));
        assert_eq!(g.current(), 4);
    }

    #[test]
    fn governor_state_recovers_when_pressure_flat() {
        let mut g = GovernorState::new(3, 2, 6);
        assert_eq!(
            g.observe(Some(100)),
            Some((3, 4)),
            "flat/idle ⇒ recover one"
        );
        assert_eq!(g.observe(Some(100)), Some((4, 5)));
    }

    #[test]
    fn governor_state_none_sample_holds_flat_and_keeps_baseline() {
        let mut g = GovernorState::new(4, 2, 6);
        assert_eq!(g.observe(Some(200)), Some((4, 5))); // baseline=200, grew
        assert_eq!(g.observe(None), None, "no sample ⇒ no change");
        // Baseline stayed 200, so a later 300 still reads as rising.
        assert_eq!(
            g.observe(Some(300)),
            Some((5, 4)),
            "rising vs preserved baseline"
        );
    }

    // ── Governor (the loop, not just the decision) ────────────────────────────

    /// In-memory [`PressureSource`] that hands out canned samples in order
    /// and reports `None` once exhausted (mimics a source that lost its
    /// connection mid-run). Bumps a shared counter on every call so the
    /// test's `stop` predicate can fire after a fixed number of samples
    /// — keying off decisions instead would deadlock the loop when the
    /// first sample only sets the baseline (no decision → no signal).
    struct VecSource {
        samples: std::collections::VecDeque<Option<u64>>,
        sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    }

    impl VecSource {
        fn new(
            samples: impl IntoIterator<Item = Option<u64>>,
            sample_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
        ) -> Self {
            Self {
                samples: samples.into_iter().collect(),
                sample_count,
            }
        }
    }

    impl PressureSource for VecSource {
        fn sample_pressure(&mut self) -> Option<u64> {
            self.sample_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            // Exhausted queue and an explicit `None` entry both read as "no sample".
            self.samples.pop_front().flatten()
        }
    }

    #[test]
    fn governor_tick_mirrors_governor_state_observe() {
        // The `tick` method must be a faithful surface for the policy; if it
        // ever diverges from GovernorState::observe, the runner-side
        // unit-test guarantee breaks. Drive both with the same sequence and
        // assert identical outputs.
        let samples = [Some(100u64), Some(200), Some(150), None, Some(400)];
        let mut g =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
        let mut s = GovernorState::new(6, 2, 6);
        for sample in samples {
            assert_eq!(g.tick(sample), s.observe(sample));
        }
    }

    #[test]
    fn governor_run_emits_decisions_for_every_rising_sample_until_stop() {
        // Polls at 1 ms, samples at 1 ms — every wake fires a sample so the
        // 5 canned samples produce exactly the 4 transitions the policy
        // would emit (the first sample only sets the baseline, hence one
        // fewer decision than samples). Stop predicate is keyed on the
        // *sample* counter — keying it on decisions would deadlock because
        // the first sample never emits one.
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        let sample_count = Arc::new(AtomicUsize::new(0));
        let mut source = VecSource::new(
            [
                Some(100),
                Some(200), // rising → 6→5
                Some(300), // rising → 5→4
                Some(400), // rising → 4→3
                Some(500), // rising → 3→2 (clamped at floor)
            ],
            Arc::clone(&sample_count),
        );
        let mut gov =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(1), Duration::from_millis(1));
        let stop_count = Arc::clone(&sample_count);
        let stop = move || stop_count.load(Ordering::Relaxed) >= 5;
        let mut decisions: Vec<(usize, usize)> = Vec::new();
        gov.run(&mut source, stop, |from, to| {
            decisions.push((from, to));
        });

        // First sample (100) only seeds the baseline → no decision.
        // Samples 2..5 all rise → four shed-one decisions.
        assert_eq!(decisions, vec![(6, 5), (5, 4), (4, 3), (3, 2)]);
    }

    #[test]
    fn governor_run_stops_promptly_within_one_poll_quantum() {
        // `stop` is already true when `run` is entered, so the loop must
        // return at its very first stop check — without sleeping a poll and
        // certainly without waiting a full sample interval. Asserts the loop
        // honours the stop predicate as a hot exit condition (the
        // deadlock-class bug from 16fc662 would re-surface here as a
        // non-terminating test if regressed).
        let sample_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut source = VecSource::new([Some(100)], sample_count);
        let mut gov =
            Governor::with_intervals(6, 2, 6, Duration::from_millis(50), Duration::from_millis(5));
        let start = std::time::Instant::now();
        gov.run(&mut source, || true, |_, _| {});
        assert!(
            start.elapsed() < Duration::from_millis(40),
            "run() must exit on stop without sleeping a full sample interval"
        );
    }

    #[test]
    fn governor_run_notices_stop_flip_during_sleep() {
        // `stop` turns true ~10 ms in, i.e. while `run` is between polls.
        // With a 5 ms poll and a 10 s sample interval, `run` must return
        // after about one more poll — nowhere near a full sample interval —
        // and must not have sampled at all. The bound is deliberately loose
        // so scheduler jitter on busy CI hosts cannot make it flaky.
        let sample_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut source = VecSource::new([Some(100)], std::sync::Arc::clone(&sample_count));
        let mut gov =
            Governor::with_intervals(6, 2, 6, Duration::from_secs(10), Duration::from_millis(5));
        let start = std::time::Instant::now();
        gov.run(
            &mut source,
            || start.elapsed() >= Duration::from_millis(10),
            |_, _| {},
        );
        assert!(
            start.elapsed() < Duration::from_secs(5),
            "run() must notice a stop flip within a poll quantum"
        );
        assert_eq!(
            sample_count.load(std::sync::atomic::Ordering::Relaxed),
            0,
            "no sample is due before the 10 s interval elapses"
        );
    }
}