Skip to main content

fsqlite_mvcc/
mpc_commit_controller.rs

1//! Model Predictive Control (MPC) commit pipeline controller (IMPL-26).
2//!
3//! Scaffolding-only — this controller is not wired into real commit paths yet.
4//! The intent is to pin the API and algebra so downstream beads can replace the
5//! heuristic in one place.
6//!
7//! The dynamics are a simple linearised single-dimensional queue model:
8//!
9//! ```text
10//!     x_{k+1} = a * x_k + b * u_k + w_k
11//! ```
12//!
13//! where `x` is observed queue depth (pending commits), `u` is admission rate
14//! (commits per tick we are willing to accept), and `w` is an unmodelled
15//! disturbance. `a` captures natural drain (e.g. replication, flush), and `b`
16//! is the actuator gain — i.e. how strongly admission rate moves the queue.
17//!
18//! For a one-step horizon LQR with cost `(x_{k+1} - target)^2 + rho * (u - u_prev)^2`
19//! the unconstrained optimum is closed-form:
20//!
21//! ```text
22//!     u* = (target - a * x) / b
23//! ```
24//!
25//! (Derived from requiring `a*x + b*u = target`; note admission *raises* the
26//! queue so u is positive when the queue is below target and negative when
27//! above; the `[0, max_rate]` clamp handles the "above target → don't admit,
28//! let natural drain `a` do the work" case.)
29//!
30//! After clamping into `[0, max_rate]` we smooth toward the previous control to
31//! avoid chattering:
32//!
33//! ```text
34//!     u = u_prev + (1 / (1 + rho)) * (u* - u_prev)
35//! ```
36//!
37//! This is intentionally 20-ish lines of real logic. Replace with a multi-step
38//! horizon QP when we start caring about anticipated disturbance profiles.
39
40/// Default control-effort weight. Higher values damp adjustments harder.
41const DEFAULT_RHO: f64 = 1.0;
42
43/// Minimum positive actuator gain. Protects against division-by-zero when
44/// a caller accidentally plugs in `b = 0`.
45const MIN_B: f64 = 1e-9;
46
47/// One-step MPC controller for the commit admission pipeline.
48#[derive(Debug, Clone, Copy)]
49pub struct MpcCommitController {
50    /// Desired queue depth (setpoint).
51    target: f64,
52    /// State dynamics coefficient. `x_{k+1} = a * x_k + b * u_k`.
53    /// Values in `[0, 1)` model exponential decay when `u = 0`.
54    a: f64,
55    /// Actuator gain — admission-rate to queue-depth sensitivity.
56    b: f64,
57    /// Previous admission rate, used for smoothing.
58    u_prev: f64,
59    /// Admission-rate cap. The controller never returns more than this.
60    max_rate: f64,
61    /// Control-effort weight. Higher `rho` means smaller control steps.
62    rho: f64,
63}
64
65impl MpcCommitController {
66    /// Build a controller with sensible defaults (`a = 0.9`, `b = 1.0`,
67    /// `rho = 1.0`, `u_prev = 0.0`).
68    #[must_use]
69    pub fn new(target: f64, max_rate: f64) -> Self {
70        Self::with_params(target, max_rate, 0.9, 1.0, DEFAULT_RHO)
71    }
72
73    /// Build a controller with explicit dynamics and smoothing parameters.
74    #[must_use]
75    pub fn with_params(target: f64, max_rate: f64, a: f64, b: f64, rho: f64) -> Self {
76        let target = if target.is_finite() && target >= 0.0 {
77            target
78        } else {
79            0.0
80        };
81        let max_rate = if max_rate.is_finite() && max_rate >= 0.0 {
82            max_rate
83        } else {
84            0.0
85        };
86        let a = if a.is_finite() { a } else { 0.9 };
87        let b = if b.is_finite() && b.abs() > MIN_B {
88            b
89        } else {
90            1.0
91        };
92        let rho = if rho.is_finite() && rho >= 0.0 {
93            rho
94        } else {
95            DEFAULT_RHO
96        };
97        Self {
98            target,
99            a,
100            b,
101            u_prev: 0.0,
102            max_rate,
103            rho,
104        }
105    }
106
107    /// Current setpoint.
108    #[must_use]
109    pub fn target(&self) -> f64 {
110        self.target
111    }
112
113    /// Last admission rate this controller emitted.
114    #[must_use]
115    pub fn last_rate(&self) -> f64 {
116        self.u_prev
117    }
118
119    /// Advance the controller by one observation and return the next admission
120    /// rate. Callers typically interpret this as "commits allowed per tick".
121    pub fn step(&mut self, measured_queue_depth: f64) -> f64 {
122        let x = if measured_queue_depth.is_finite() {
123            measured_queue_depth.max(0.0)
124        } else {
125            0.0
126        };
127
128        // Unconstrained one-step LQR optimum: pick u so that the predicted
129        // next state lands exactly on target, i.e.
130        //      a * x + b * u_star = target
131        //   => u_star = (target - a * x) / b
132        //
133        // When the queue is *below* target, `u_star` is positive and we admit
134        // more to drive the queue up toward the setpoint. When the queue is
135        // above target the optimum becomes negative and the `[0, max_rate]`
136        // clamp pins it to zero, letting the natural drain coefficient `a` do
137        // the work.
138        let u_star_raw = self.a.mul_add(-x, self.target) / self.b;
139        let u_star = u_star_raw.clamp(0.0, self.max_rate);
140
141        // Smooth toward u_star. The factor 1 / (1 + rho) is the closed-form
142        // minimiser of the smoothed cost once the unconstrained optimum has
143        // been picked; `rho = 0` means "jump immediately", larger `rho` means
144        // slower response.
145        let alpha = 1.0 / (1.0 + self.rho);
146        let u = self.u_prev + alpha * (u_star - self.u_prev);
147        let u = u.clamp(0.0, self.max_rate);
148        self.u_prev = u;
149        u
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156
157    /// Simulate the plant `x_{k+1} = a*x_k + b*u_k + w_k` using the controller's
158    /// own dynamics parameters so the test doesn't have to guess.
159    fn simulate(
160        ctrl: &mut MpcCommitController,
161        x0: f64,
162        steps: usize,
163        disturbance: impl Fn(usize) -> f64,
164    ) -> Vec<f64> {
165        let a = ctrl.a;
166        let b = ctrl.b;
167        let mut x = x0;
168        let mut trace = Vec::with_capacity(steps + 1);
169        trace.push(x);
170        for k in 0..steps {
171            let u = ctrl.step(x);
172            // Plant update with exogenous disturbance.
173            x = (b.mul_add(u, a * x) + disturbance(k)).max(0.0);
174            trace.push(x);
175        }
176        trace
177    }
178
179    #[test]
180    fn mpc_commit_converges_to_target() {
181        // Start at x=10, target=5, drain a=0.9, gain b=1.0, max_rate=10.
182        let mut ctrl = MpcCommitController::with_params(5.0, 10.0, 0.9, 1.0, 1.0);
183        let trace = simulate(&mut ctrl, 10.0, 20, |_| 0.0);
184        let final_x = *trace.last().expect("non-empty trace");
185        // After 20 steps we should be comfortably within 10% of target.
186        assert!(
187            (final_x - 5.0).abs() < 0.5,
188            "expected convergence toward 5.0, got {final_x}; trace={trace:?}"
189        );
190        // And we should be strictly closer than the starting error.
191        assert!((final_x - 5.0).abs() < (10.0_f64 - 5.0).abs());
192    }
193
194    #[test]
195    fn mpc_commit_recovers_from_disturbance() {
196        let mut ctrl = MpcCommitController::with_params(5.0, 10.0, 0.9, 1.0, 1.0);
197        // Settle at target.
198        let _ = simulate(&mut ctrl, 5.0, 40, |_| 0.0);
199        // Inject a one-shot disturbance of +5 at k=0, then let it decay.
200        let trace = simulate(&mut ctrl, 5.0, 30, |k| if k == 0 { 5.0 } else { 0.0 });
201        let final_x = *trace.last().expect("non-empty trace");
202        // Within 20% of target after recovery window.
203        assert!(
204            (final_x - 5.0).abs() < 1.0,
205            "expected recovery within 20% of target, got {final_x}; trace={trace:?}"
206        );
207    }
208
209    #[test]
210    fn mpc_commit_rate_is_clamped() {
211        let mut ctrl = MpcCommitController::with_params(0.0, 3.0, 0.9, 1.0, 0.0);
212        // Huge queue with rho=0 would want a massive u; clamp must kick in.
213        let u = ctrl.step(1_000.0);
214        assert!((0.0..=3.0).contains(&u), "expected clamp to [0,3], got {u}");
215    }
216
217    #[test]
218    fn mpc_commit_sanitises_bad_inputs() {
219        // NaN / infinite / negative inputs all get replaced with safe defaults.
220        let ctrl = MpcCommitController::with_params(
221            f64::NAN,
222            f64::INFINITY,
223            f64::NAN,
224            0.0, // below MIN_B -> defaults to 1.0
225            -1.0,
226        );
227        assert_eq!(ctrl.target(), 0.0);
228        assert_eq!(ctrl.last_rate(), 0.0);
229    }
230}