Skip to main content

rivet/tuning/
adaptive.rs

1//! Adaptive batch sizing — live-feedback loop that reacts to DB pressure.
2//!
3//! Both `PostgresSource` and `MysqlSource` sample pressure metrics every
4//! [`ADAPTIVE_SAMPLE_INTERVAL`] batches (`pg_stat_bgwriter.checkpoints_req` for
5//! PG; `Innodb_log_waits` for MySQL) and call [`next_adaptive_batch_size`] to
6//! pick the next fetch size.
7
8/// Number of batches between adaptive pressure samples.
9pub const ADAPTIVE_SAMPLE_INTERVAL: usize = 10;
10
11/// Hard floor for the adaptive fetch size — the loop never shrinks below this.
12pub const ADAPTIVE_MIN_BATCH: usize = 500;
13
14/// Decide the next adaptive fetch size from current pressure state.
15///
16/// - Under pressure: shrink to 75 %, but never below [`ADAPTIVE_MIN_BATCH`].
17/// - Otherwise: grow to 125 %, but never above the schema-chosen `base` ceiling
18///   (so we recover toward the initial fetch size without overshooting it).
19///
20/// Pure function — exported so adaptive batch-sizing can be unit-tested without
21/// a live database.
22pub fn next_adaptive_batch_size(current: usize, base: usize, under_pressure: bool) -> usize {
23    if under_pressure {
24        (current * 3 / 4).max(ADAPTIVE_MIN_BATCH)
25    } else {
26        (current * 5 / 4).min(base)
27    }
28}
29
30#[cfg(test)]
31mod tests {
32    use super::*;
33
34    #[test]
35    fn adaptive_shrinks_by_25_percent_under_pressure() {
36        assert_eq!(next_adaptive_batch_size(10_000, 10_000, true), 7_500);
37        assert_eq!(next_adaptive_batch_size(8_000, 10_000, true), 6_000);
38    }
39
40    #[test]
41    fn adaptive_grows_by_25_percent_when_idle() {
42        // 4_000 × 5/4 = 5_000; well under base ceiling.
43        assert_eq!(next_adaptive_batch_size(4_000, 10_000, false), 5_000);
44    }
45
46    #[test]
47    fn adaptive_recovery_caps_at_base_ceiling() {
48        // 9_000 × 5/4 = 11_250, but base is 10_000 — must clamp.
49        assert_eq!(next_adaptive_batch_size(9_000, 10_000, false), 10_000);
50        // Already at base: stays there.
51        assert_eq!(next_adaptive_batch_size(10_000, 10_000, false), 10_000);
52    }
53
54    #[test]
55    fn adaptive_shrink_respects_min_floor() {
56        // 600 × 3/4 = 450, but ADAPTIVE_MIN_BATCH = 500 — must clamp up.
57        assert_eq!(
58            next_adaptive_batch_size(600, 10_000, true),
59            ADAPTIVE_MIN_BATCH
60        );
61        // Already at floor: stays at floor.
62        assert_eq!(
63            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 10_000, true),
64            ADAPTIVE_MIN_BATCH
65        );
66    }
67
68    #[test]
69    fn adaptive_pressure_path_ignores_base_uses_only_floor() {
70        // Pressure path never consults base: shrink is computed from current,
71        // then clamped only to ADAPTIVE_MIN_BATCH. A pathologically low base
72        // does not artificially pin us lower than the floor.
73        assert_eq!(
74            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 100, true),
75            ADAPTIVE_MIN_BATCH
76        );
77    }
78
79    #[test]
80    fn adaptive_steady_state_oscillation_stays_bounded() {
81        // Simulate 50 sample cycles under sustained pressure, then sustained recovery.
82        // Verifies: the loop never wanders below floor or above base, and converges.
83        let base = 5_000;
84        let mut s = base;
85        for _ in 0..50 {
86            s = next_adaptive_batch_size(s, base, true);
87        }
88        assert_eq!(
89            s, ADAPTIVE_MIN_BATCH,
90            "sustained pressure must converge to floor"
91        );
92        for _ in 0..50 {
93            s = next_adaptive_batch_size(s, base, false);
94        }
95        assert_eq!(s, base, "sustained recovery must converge to base ceiling");
96    }
97}