/// Online controller that adapts a residual threshold with three gain-weighted terms.
///
/// Every accepted observation contributes its absolute residual
/// `|prediction - target|` (the "score"). The score is compared with the
/// current threshold `alpha_t` to decide whether the observation was covered,
/// then the threshold is moved by the sum of the `k_i`, `k_p` and `k_d` terms
/// and clamped to `[0.001, 10.0]`.
#[derive(Debug, Clone)]
pub struct ConformalPID {
    /// Target level given at construction; also the initial value of `alpha_t`.
    alpha_target: f64,
    /// Gain multiplying the current score.
    k_p: f64,
    /// Gain multiplying `alpha_target - err`, where `err` is 1 on a miss and 0 otherwise.
    k_i: f64,
    /// Gain multiplying the change in score since the previous accepted observation.
    k_d: f64,
    /// Current threshold; a score strictly above it counts as a miss.
    alpha_t: f64,
    /// Score of the previous accepted observation (0 before the first one).
    s_prev: f64,
    /// Number of accepted observations since construction or the last reset.
    n_updates: u64,
    /// Running mean of the "covered" indicator over accepted observations.
    cumulative_coverage: f64,
}

impl ConformalPID {
    /// Lower clamp bound applied to the threshold after every step.
    const ALPHA_MIN: f64 = 0.001;
    /// Upper clamp bound applied to the threshold after every step.
    const ALPHA_MAX: f64 = 10.0;

    /// Creates a controller whose threshold starts at `alpha_target`.
    ///
    /// # Panics
    ///
    /// Panics if `alpha_target` is not strictly inside `(0, 1)`, or if any gain
    /// is negative, NaN or infinite.
    pub fn new(alpha_target: f64, k_p: f64, k_i: f64, k_d: f64) -> Self {
        assert!(
            alpha_target > 0.0 && alpha_target < 1.0,
            "alpha_target must be in (0, 1), got {alpha_target}"
        );
        assert!(k_p >= 0.0, "k_p must be >= 0, got {k_p}");
        assert!(k_i >= 0.0, "k_i must be >= 0, got {k_i}");
        assert!(k_d >= 0.0, "k_d must be >= 0, got {k_d}");
        // `>= 0.0` already rejects NaN and negatives. +inf has to be rejected
        // separately: an infinite gain times a zero score is NaN, which would
        // defeat the clamp in `update`.
        assert!(k_p.is_finite(), "k_p must be finite, got {k_p}");
        assert!(k_i.is_finite(), "k_i must be finite, got {k_i}");
        assert!(k_d.is_finite(), "k_d must be finite, got {k_d}");
        Self {
            alpha_target,
            k_p,
            k_i,
            k_d,
            alpha_t: alpha_target,
            s_prev: 0.0,
            n_updates: 0,
            cumulative_coverage: 0.0,
        }
    }

    /// Controller with `alpha_target = 0.1` and gains `k_p = 0.05`, `k_i = 0.01`, `k_d = 0.005`.
    pub fn default_90() -> Self {
        Self::new(0.1, 0.05, 0.01, 0.005)
    }

    /// Controller with `alpha_target = 0.05` and gains `k_p = 0.05`, `k_i = 0.01`, `k_d = 0.005`.
    pub fn default_95() -> Self {
        Self::new(0.05, 0.05, 0.01, 0.005)
    }

    /// Feeds one `(prediction, target)` pair into the controller.
    ///
    /// The score is `|prediction - target|`. The observation is a miss when the
    /// score is strictly greater than the current threshold. The threshold then
    /// moves by
    /// `k_i * (alpha_target - err) + k_p * score + k_d * (score - previous_score)`
    /// and is clamped to `[0.001, 10.0]`.
    ///
    /// Observations whose score is NaN or infinite (including finite inputs
    /// whose difference overflows) are ignored entirely: they change neither the
    /// threshold, the coverage estimate, the update counter nor the stored
    /// previous score. Without this, a single bad sample would turn the
    /// threshold into NaN permanently, because clamping NaN yields NaN.
    ///
    /// NOTE(review): the `k_i` term *lowers* the threshold after a miss, while
    /// `interval_width` exposes the same threshold as a width. That sign matches
    /// an update on a miscoverage level rather than on a width; confirm this is
    /// intended. Behaviour is left unchanged here.
    pub fn update(&mut self, prediction: f64, target: f64) {
        let score = (prediction - target).abs();
        if !score.is_finite() {
            return;
        }

        let miscovered = score > self.alpha_t;
        let err = if miscovered { 1.0 } else { 0.0 };

        let delta = self.k_i * (self.alpha_target - err)
            + self.k_p * score
            + self.k_d * (score - self.s_prev);
        let next = self.alpha_t + delta;
        // With very large gains the terms can overflow to +inf and -inf and sum
        // to NaN. Hold the previous threshold in that case instead of storing NaN.
        if !next.is_nan() {
            self.alpha_t = next.clamp(Self::ALPHA_MIN, Self::ALPHA_MAX);
        }

        self.n_updates += 1;
        let covered = if miscovered { 0.0 } else { 1.0 };
        // Incremental mean: avoids keeping a separate sum of indicators.
        self.cumulative_coverage += (covered - self.cumulative_coverage) / self.n_updates as f64;
        self.s_prev = score;
    }

    /// Returns the current threshold.
    pub fn current_alpha(&self) -> f64 {
        self.alpha_t
    }

    /// Returns the current threshold; identical to [`ConformalPID::current_alpha`].
    pub fn interval_width(&self) -> f64 {
        self.alpha_t
    }

    /// Returns the fraction of accepted observations that were covered (0.0 before any update).
    pub fn coverage(&self) -> f64 {
        self.cumulative_coverage
    }

    /// Returns the number of accepted observations since construction or the last reset.
    pub fn n_updates(&self) -> u64 {
        self.n_updates
    }

    /// Restores the threshold to `alpha_target` and clears all running statistics.
    ///
    /// The target and the gains are kept.
    pub fn reset(&mut self) {
        self.alpha_t = self.alpha_target;
        self.s_prev = 0.0;
        self.n_updates = 0;
        self.cumulative_coverage = 0.0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Mostly tiny residuals with a large one every tenth step: more than half
    /// of the observations must end up covered.
    #[test]
    fn coverage_converges_to_target() {
        let mut controller = ConformalPID::default_90();
        for step in 0..1000 {
            let truth = (step as f64) * 0.01;
            let residual = match step % 10 {
                0 => 50.0,
                _ => 0.01,
            };
            controller.update(truth + residual, truth);
        }
        let observed = controller.coverage();
        assert!(observed > 0.5, "coverage {} should be > 0.5", observed);
    }

    /// A steady stream of small residuals must move the threshold away from
    /// its starting value.
    #[test]
    fn pid_tighter_than_constant_threshold() {
        let mut controller = ConformalPID::new(0.1, 0.05, 0.01, 0.005);
        let initial_alpha = controller.current_alpha();
        for truth in (0..200).map(|step| step as f64) {
            controller.update(truth + 0.01, truth);
        }
        let drift = (controller.current_alpha() - initial_alpha).abs();
        assert!(
            drift > 1e-6,
            "PID should adapt alpha_t, but it stayed at {initial_alpha}"
        );
    }

    /// `reset` must bring the counter, threshold and coverage back to their
    /// construction-time values.
    #[test]
    fn reset_clears_state() {
        let mut controller = ConformalPID::default_90();
        for step in 0..50 {
            let truth = step as f64;
            controller.update(truth + 1.0, truth);
        }
        assert!(controller.n_updates() > 0);

        controller.reset();

        assert_eq!(controller.n_updates(), 0);
        let alpha_gap = (controller.current_alpha() - 0.1).abs();
        assert!(alpha_gap < 1e-10);
        let coverage_gap = (controller.coverage()).abs();
        assert!(coverage_gap < 1e-10);
    }

    /// Two controllers that differ only in `k_d` must disagree after a sudden
    /// jump in the score.
    #[test]
    fn score_derivative_term_reacts_to_changes() {
        let mut with_d = ConformalPID::new(0.1, 0.0, 0.01, 0.05);
        let mut without_d = ConformalPID::new(0.1, 0.0, 0.01, 0.0);
        for controller in [&mut with_d, &mut without_d] {
            // Warm-up with a constant residual, then a single large spike.
            for step in 0..50 {
                let truth = step as f64;
                controller.update(truth + 0.1, truth);
            }
            controller.update(100.0, 0.0);
        }
        let gap = (with_d.current_alpha() - without_d.current_alpha()).abs();
        assert!(
            gap > 1e-6,
            "D-term should cause different alpha after score spike"
        );
    }

    /// The threshold must stay inside `[0.001, 10.0]` both when pushed up by
    /// huge residuals and when driven by zero residuals.
    #[test]
    fn saturate_prevents_runaway() {
        let mut pushed_up = ConformalPID::new(0.1, 1.0, 1.0, 1.0);
        (0..1000).for_each(|_| pushed_up.update(1e6, 0.0));
        assert!(
            pushed_up.current_alpha() <= 10.0,
            "alpha_t {} should be <= 10.0",
            pushed_up.current_alpha()
        );
        assert!(
            pushed_up.current_alpha() >= 0.001,
            "alpha_t {} should be >= 0.001",
            pushed_up.current_alpha()
        );

        let mut zero_residual = ConformalPID::new(0.1, 0.0, 1.0, 0.0);
        (0..10000).for_each(|_| zero_residual.update(0.0, 0.0));
        assert!(zero_residual.current_alpha() >= 0.001);
        assert!(zero_residual.current_alpha() <= 10.0);
    }

    /// The update counter must increase by exactly one per call.
    #[test]
    fn n_updates_tracks_correctly() {
        let mut controller = ConformalPID::default_95();
        assert_eq!(controller.n_updates(), 0);
        for expected in 1..=37 {
            controller.update(expected as f64, 0.0);
            assert_eq!(controller.n_updates(), expected);
        }
    }
}