rivet/tuning/adaptive.rs
1//! Adaptive batch sizing — live-feedback loop that reacts to DB pressure.
2//!
3//! Both `PostgresSource` and `MysqlSource` sample pressure metrics every
4//! [`ADAPTIVE_SAMPLE_INTERVAL`] batches (`pg_stat_bgwriter.checkpoints_req` for
5//! PG; `Innodb_log_waits` for MySQL) and call [`next_adaptive_batch_size`] to
6//! pick the next fetch size.
7
/// How many batches pass between two consecutive adaptive pressure samples.
pub const ADAPTIVE_SAMPLE_INTERVAL: usize = 10;
10
/// Lower bound on the adaptive fetch size: a pressure-driven shrink that would
/// land below this value is clamped up to it instead.
pub const ADAPTIVE_MIN_BATCH: usize = 500;
13
14/// Decide the next adaptive fetch size from current pressure state.
15///
16/// - Under pressure: shrink to 75 %, but never below [`ADAPTIVE_MIN_BATCH`].
17/// - Otherwise: grow to 125 %, but never above the schema-chosen `base` ceiling
18/// (so we recover toward the initial fetch size without overshooting it).
19///
20/// Pure function — exported so adaptive batch-sizing can be unit-tested without
21/// a live database.
22pub fn next_adaptive_batch_size(current: usize, base: usize, under_pressure: bool) -> usize {
23 if under_pressure {
24 (current * 3 / 4).max(ADAPTIVE_MIN_BATCH)
25 } else {
26 (current * 5 / 4).min(base)
27 }
28}
29
#[cfg(test)]
mod tests {
    use super::*;

    /// Pressure path: the next size is 75 % of the current one.
    #[test]
    fn adaptive_shrinks_by_25_percent_under_pressure() {
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, true), 7_500);
        assert_eq!(next_adaptive_batch_size(8_000, 10_000, true), 6_000);
    }

    /// Recovery path: the next size is 125 % of the current one.
    #[test]
    fn adaptive_grows_by_25_percent_when_idle() {
        // 4_000 × 5/4 = 5_000; well under base ceiling.
        assert_eq!(next_adaptive_batch_size(4_000, 10_000, false), 5_000);
    }

    /// Recovery path never returns more than `base`.
    #[test]
    fn adaptive_recovery_caps_at_base_ceiling() {
        // 9_000 × 5/4 = 11_250, but base is 10_000 — must clamp.
        assert_eq!(next_adaptive_batch_size(9_000, 10_000, false), 10_000);
        // Already at base: stays there.
        assert_eq!(next_adaptive_batch_size(10_000, 10_000, false), 10_000);
    }

    /// Pressure path never returns less than `ADAPTIVE_MIN_BATCH`.
    #[test]
    fn adaptive_shrink_respects_min_floor() {
        // 600 × 3/4 = 450, but ADAPTIVE_MIN_BATCH = 500 — must clamp up.
        assert_eq!(
            next_adaptive_batch_size(600, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
        // Already at floor: stays at floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 10_000, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_pressure_path_ignores_base_uses_only_floor() {
        // Pressure path never consults base: shrink is computed from current,
        // then clamped only to ADAPTIVE_MIN_BATCH. A pathologically low base
        // does not artificially pin us lower than the floor.
        assert_eq!(
            next_adaptive_batch_size(ADAPTIVE_MIN_BATCH, 100, true),
            ADAPTIVE_MIN_BATCH
        );
    }

    #[test]
    fn adaptive_steady_state_oscillation_stays_bounded() {
        // Simulate 50 sample cycles under sustained pressure, then sustained
        // recovery. Every intermediate size is checked against the
        // [floor, base] window, and each phase must end at its limit.
        let base = 5_000;
        let mut size = base;

        for _ in 0..50 {
            size = next_adaptive_batch_size(size, base, true);
            assert!(
                (ADAPTIVE_MIN_BATCH..=base).contains(&size),
                "size {} left [floor, base] while under pressure",
                size
            );
        }
        assert_eq!(
            size, ADAPTIVE_MIN_BATCH,
            "sustained pressure must converge to floor"
        );

        for _ in 0..50 {
            size = next_adaptive_batch_size(size, base, false);
            assert!(
                (ADAPTIVE_MIN_BATCH..=base).contains(&size),
                "size {} left [floor, base] while recovering",
                size
            );
        }
        assert_eq!(size, base, "sustained recovery must converge to base ceiling");
    }
}