// fsqlite_mvcc/mpc_commit_controller.rs
//! Model Predictive Control (MPC) commit pipeline controller (IMPL-26).
//!
//! Scaffolding-only — this controller is not wired into real commit paths yet.
//! The intent is to pin the API and algebra so downstream beads can replace the
//! heuristic in one place.
//!
//! The dynamics are a simple linearised single-dimensional queue model:
//!
//! ```text
//! x_{k+1} = a * x_k + b * u_k + w_k
//! ```
//!
//! where `x` is observed queue depth (pending commits), `u` is admission rate
//! (commits per tick we are willing to accept), and `w` is an unmodelled
//! disturbance. `a` captures natural drain (e.g. replication, flush), and `b`
//! is the actuator gain — i.e. how strongly admission rate moves the queue.
//!
//! The one-step horizon LQR cost is
//! `(x_{k+1} - target)^2 + rho * (u - u_prev)^2`. Ignoring the control-effort
//! term (`rho = 0`), the unconstrained optimum is closed-form:
//!
//! ```text
//! u* = (target - a * x) / b
//! ```
//!
//! (Derived from requiring `a*x + b*u = target`; note that with `b > 0`
//! admission *raises* the queue, so `u*` is positive when the drained queue
//! `a*x` is below target and negative when it is above; the `[0, max_rate]`
//! clamp handles the "above target → don't admit, let natural drain `a` do the
//! work" case.)
//!
//! After clamping into `[0, max_rate]` we smooth toward the previous control to
//! avoid chattering:
//!
//! ```text
//! u = u_prev + (1 / (1 + rho)) * (u* - u_prev)
//! ```
//!
//! For `b = 1` and an unclamped `u*` this blend is exactly the minimiser of the
//! full cost above. For other gains the exact factor would be
//! `b^2 / (b^2 + rho)`, so `1 / (1 + rho)` should be read as a heuristic there.
//!
//! This is intentionally 20-ish lines of real logic. Replace with a multi-step
//! horizon QP when we start caring about anticipated disturbance profiles.

/// Default control-effort weight. Higher values damp adjustments harder.
const DEFAULT_RHO: f64 = 1.0;

/// Smallest actuator-gain magnitude `|b|` accepted by
/// [`MpcCommitController::with_params`]. Protects against division-by-zero
/// when a caller accidentally plugs in `b = 0`. The check is on the magnitude,
/// so negative gains are accepted unchanged.
const MIN_B: f64 = 1e-9;

/// Fallback drain coefficient: used by [`MpcCommitController::new`] and
/// whenever a caller supplies a non-finite `a`.
const DEFAULT_A: f64 = 0.9;

/// Fallback actuator gain: used by [`MpcCommitController::new`] and whenever
/// a caller supplies a non-finite or near-zero `b`.
const DEFAULT_B: f64 = 1.0;

/// One-step MPC controller for the commit admission pipeline.
///
/// The controller models the queue as `x_{k+1} = a * x_k + b * u_k` and, on
/// every [`step`](Self::step), moves its admission rate part of the way toward
/// the rate that would land the predicted queue depth on `target`. The emitted
/// rate always lies in `[0, max_rate]`.
#[derive(Debug, Clone, Copy)]
pub struct MpcCommitController {
    /// Desired queue depth (setpoint). Finite and non-negative.
    target: f64,
    /// State dynamics coefficient. `x_{k+1} = a * x_k + b * u_k`.
    /// Values in `[0, 1)` model exponential decay when `u = 0`.
    a: f64,
    /// Actuator gain — admission-rate to queue-depth sensitivity.
    /// Finite with `|b| > MIN_B`, so dividing by it is always safe.
    b: f64,
    /// Previous admission rate, used for smoothing. Starts at `0.0`.
    u_prev: f64,
    /// Admission-rate cap. The controller never returns more than this.
    max_rate: f64,
    /// Control-effort weight. Higher `rho` means smaller control steps.
    rho: f64,
}

impl MpcCommitController {
    /// Build a controller with sensible defaults (`a = 0.9`, `b = 1.0`,
    /// `rho = 1.0`, `u_prev = 0.0`).
    ///
    /// `target` and `max_rate` are sanitised exactly as in
    /// [`with_params`](Self::with_params).
    #[must_use]
    pub fn new(target: f64, max_rate: f64) -> Self {
        Self::with_params(target, max_rate, DEFAULT_A, DEFAULT_B, DEFAULT_RHO)
    }

    /// Build a controller with explicit dynamics and smoothing parameters.
    ///
    /// This constructor never fails; unusable arguments are silently replaced:
    ///
    /// * `target`, `max_rate` — must be finite and `>= 0`, otherwise `0.0`.
    /// * `a` — must be finite, otherwise `0.9`.
    /// * `b` — must be finite with `|b| > MIN_B`, otherwise `1.0`.
    /// * `rho` — must be finite and `>= 0`, otherwise `DEFAULT_RHO`.
    ///
    /// Note that an infinite `max_rate` therefore becomes `0.0` (never admit),
    /// not "uncapped". The previous rate always starts at `0.0`.
    #[must_use]
    pub fn with_params(target: f64, max_rate: f64, a: f64, b: f64, rho: f64) -> Self {
        let finite_non_negative = |v: f64| v.is_finite() && v >= 0.0;
        Self {
            target: Self::sanitise(target, finite_non_negative, 0.0),
            a: Self::sanitise(a, f64::is_finite, DEFAULT_A),
            b: Self::sanitise(b, |v: f64| v.is_finite() && v.abs() > MIN_B, DEFAULT_B),
            u_prev: 0.0,
            max_rate: Self::sanitise(max_rate, finite_non_negative, 0.0),
            rho: Self::sanitise(rho, finite_non_negative, DEFAULT_RHO),
        }
    }

    /// Return `value` when `accept(value)` holds, otherwise `fallback`.
    fn sanitise(value: f64, accept: impl Fn(f64) -> bool, fallback: f64) -> f64 {
        if accept(value) {
            value
        } else {
            fallback
        }
    }

    /// Current setpoint.
    #[must_use]
    pub fn target(&self) -> f64 {
        self.target
    }

    /// Last admission rate this controller emitted (`0.0` before the first
    /// [`step`](Self::step)).
    #[must_use]
    pub fn last_rate(&self) -> f64 {
        self.u_prev
    }

    /// Advance the controller by one observation and return the next admission
    /// rate. Callers typically interpret this as "commits allowed per tick".
    ///
    /// The returned rate is always finite and within `[0, max_rate]`, and is
    /// remembered as the starting point for the next call.
    ///
    /// Unusable measurements are handled so that they can never *raise*
    /// admission on their own:
    ///
    /// * NaN carries no information — the previous rate is returned and the
    ///   controller state is left untouched.
    /// * Negative depths (including `-inf`) are treated as an empty queue.
    /// * `+inf` is treated as a saturated queue (`f64::MAX`), not an empty one.
    pub fn step(&mut self, measured_queue_depth: f64) -> f64 {
        if measured_queue_depth.is_nan() {
            return self.u_prev;
        }
        // Saturate into the finite, non-negative range so the arithmetic
        // below can overflow to +/-inf at worst but never produce NaN.
        let x = measured_queue_depth.clamp(0.0, f64::MAX);

        // Pick u so that the predicted next state lands exactly on target:
        //     a * x + b * u_star = target
        //  => u_star = (target - a * x) / b
        //
        // With b > 0, `u_star` is positive while the drained queue `a * x`
        // sits below target, so we admit more to drive the queue up toward
        // the setpoint. Once `a * x` exceeds target the optimum turns
        // negative and the `[0, max_rate]` clamp pins it to zero, letting the
        // natural drain coefficient `a` do the work.
        let u_star_raw = self.a.mul_add(-x, self.target) / self.b;
        let u_star = u_star_raw.clamp(0.0, self.max_rate);

        // Smooth toward u_star. `rho = 0` means "jump immediately", larger
        // `rho` means slower response. For b = 1 the factor 1 / (1 + rho) is
        // the exact minimiser of
        //     (x_next - target)^2 + rho * (u - u_prev)^2;
        // for other gains the exact factor would be b^2 / (b^2 + rho), so
        // this is a heuristic there.
        let alpha = 1.0 / (1.0 + self.rho);
        let blended = self.u_prev + alpha * (u_star - self.u_prev);
        // Both endpoints are already in range; the final clamp only guards
        // against rounding pushing the blend marginally outside.
        let u = blended.clamp(0.0, self.max_rate);
        self.u_prev = u;
        u
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Simulate the plant `x_{k+1} = a*x_k + b*u_k + w_k` using the controller's
    /// own dynamics parameters so the test doesn't have to guess.
    ///
    /// Returns the queue-depth trace, starting with `x0` and followed by one
    /// entry per step (`steps + 1` values in total). `disturbance(k)` supplies
    /// `w_k`; the simulated depth is floored at zero.
    fn simulate(
        ctrl: &mut MpcCommitController,
        x0: f64,
        steps: usize,
        disturbance: impl Fn(usize) -> f64,
    ) -> Vec<f64> {
        let a = ctrl.a;
        let b = ctrl.b;
        let mut x = x0;
        let mut trace = Vec::with_capacity(steps + 1);
        trace.push(x);
        for k in 0..steps {
            let u = ctrl.step(x);
            // Plant update with exogenous disturbance.
            x = (b.mul_add(u, a * x) + disturbance(k)).max(0.0);
            trace.push(x);
        }
        trace
    }

    #[test]
    fn mpc_commit_converges_to_target() {
        // Start at x=10, target=5, drain a=0.9, gain b=1.0, max_rate=10.
        let mut ctrl = MpcCommitController::with_params(5.0, 10.0, 0.9, 1.0, 1.0);
        let trace = simulate(&mut ctrl, 10.0, 20, |_| 0.0);
        let final_x = *trace.last().expect("non-empty trace");
        // After 20 steps we should be comfortably within 10% of target.
        assert!(
            (final_x - 5.0).abs() < 0.5,
            "expected convergence toward 5.0, got {final_x}; trace={trace:?}"
        );
        // And we should be strictly closer than the starting error.
        assert!((final_x - 5.0).abs() < (10.0_f64 - 5.0).abs());
    }

    #[test]
    fn mpc_commit_recovers_from_disturbance() {
        let mut ctrl = MpcCommitController::with_params(5.0, 10.0, 0.9, 1.0, 1.0);
        // Settle at target, and make sure we actually did.
        let settle = simulate(&mut ctrl, 5.0, 40, |_| 0.0);
        let settled_x = *settle.last().expect("non-empty trace");
        assert!(
            (settled_x - 5.0).abs() < 1e-3,
            "expected to settle at 5.0, got {settled_x}; trace={settle:?}"
        );
        // Inject a one-shot disturbance of +5 at k=0, then let it decay.
        let trace = simulate(&mut ctrl, 5.0, 30, |k| if k == 0 { 5.0 } else { 0.0 });
        let final_x = *trace.last().expect("non-empty trace");
        // Within 20% of target after recovery window.
        assert!(
            (final_x - 5.0).abs() < 1.0,
            "expected recovery within 20% of target, got {final_x}; trace={trace:?}"
        );
    }

    #[test]
    fn mpc_commit_rate_is_clamped() {
        // Lower bound: a huge queue far above target makes the raw optimum
        // strongly negative; the clamp must pin the rate at zero.
        let mut ctrl = MpcCommitController::with_params(0.0, 3.0, 0.9, 1.0, 0.0);
        let u = ctrl.step(1_000.0);
        assert!((0.0..=3.0).contains(&u), "expected clamp to [0,3], got {u}");
        assert_eq!(u, 0.0);

        // Upper bound: an empty queue far below target with rho=0 would want
        // u = 1000; the clamp must cap it at max_rate.
        let mut ctrl = MpcCommitController::with_params(1_000.0, 3.0, 0.9, 1.0, 0.0);
        let u = ctrl.step(0.0);
        assert_eq!(u, 3.0, "expected rate capped at max_rate");
    }

    #[test]
    fn mpc_commit_sanitises_bad_inputs() {
        // NaN / infinite / negative inputs all get replaced with safe defaults.
        let mut ctrl = MpcCommitController::with_params(
            f64::NAN,
            f64::INFINITY,
            f64::NAN,
            0.0, // below MIN_B -> defaults to 1.0
            -1.0,
        );
        assert_eq!(ctrl.target(), 0.0);
        assert_eq!(ctrl.last_rate(), 0.0);
        assert_eq!(ctrl.max_rate, 0.0);
        assert_eq!(ctrl.a, 0.9);
        assert_eq!(ctrl.b, 1.0);
        assert_eq!(ctrl.rho, DEFAULT_RHO);
        // With max_rate sanitised to zero the controller never admits.
        assert_eq!(ctrl.step(10.0), 0.0);
    }
}