Skip to main content

fsqlite_types/
eprocess.rs

//! Lightweight e-process oracle for statistical query shedding.
//!
//! An e-process is a non-negative supermartingale under H₀ with E₀ = 1.
//! Once the running e-value reaches or exceeds 1/α we reject H₀ (anomaly
//! detected); by Ville's inequality this happens with probability at most α
//! under H₀, no matter when the process is inspected.
//! This module provides a self-contained implementation used by [`Cx`] to
//! shed low-priority queries when anomaly pressure is high.
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9
/// Configuration for the e-process martingale.
///
/// Values are not validated here; [`EProcessOracle::new`] sanitizes them
/// (clamping out-of-range values and replacing non-finite ones with fallbacks)
/// before they are used.
#[derive(Debug, Clone, Copy)]
pub struct EProcessConfig {
    /// Null hypothesis anomaly rate bound: the highest anomaly rate still
    /// considered healthy under H₀.
    pub p0: f64,
    /// Betting parameter in `E_{t+1} = E_t * (1 + lambda * (x_t - p0))`.
    ///
    /// For positive `lambda`, larger values grow the e-value faster on
    /// anomalies but also shrink it faster on normal observations.
    pub lambda: f64,
    /// Significance level (reject when e-value >= 1/alpha).
    pub alpha: f64,
    /// Cap on e-value to prevent overflow.
    pub max_evalue: f64,
}
22
23/// Weighted health telemetry feeding the e-process update.
24#[derive(Debug, Clone, Copy, PartialEq)]
25pub struct EProcessSignal {
26    /// First-committer-wins abort rate or equivalent write-conflict signal.
27    pub fcw_abort_rate: f64,
28    /// Pager/cache miss ratio on the `[0, 1]` interval.
29    pub cache_miss_ratio: f64,
30    /// Memory or cache pressure on the `[0, 1]` interval.
31    pub memory_pressure: f64,
32    /// Final anomaly score used to classify the observation.
33    pub anomaly_score: f64,
34}
35
36impl EProcessSignal {
37    /// Create a signal from raw telemetry, using the connection default weights.
38    #[must_use]
39    pub fn new(fcw_abort_rate: f64, cache_miss_ratio: f64, memory_pressure: f64) -> Self {
40        let fcw_abort_rate = sanitize_unit_interval(fcw_abort_rate);
41        let cache_miss_ratio = sanitize_unit_interval(cache_miss_ratio);
42        let memory_pressure = sanitize_unit_interval(memory_pressure);
43        let anomaly_score =
44            fcw_abort_rate.mul_add(0.5, cache_miss_ratio.mul_add(0.3, memory_pressure * 0.2));
45        Self {
46            fcw_abort_rate,
47            cache_miss_ratio,
48            memory_pressure,
49            anomaly_score: sanitize_unit_interval(anomaly_score),
50        }
51    }
52
53    /// Override the derived anomaly score with an externally computed one.
54    #[must_use]
55    pub fn with_anomaly_score(mut self, anomaly_score: f64) -> Self {
56        self.anomaly_score = sanitize_unit_interval(anomaly_score);
57        self
58    }
59
60    #[must_use]
61    fn is_anomalous(self) -> bool {
62        self.anomaly_score >= 0.5
63    }
64}
65
66/// Snapshot of oracle state for diagnostics.
67#[derive(Debug, Clone, PartialEq)]
68pub struct EProcessSnapshot {
69    /// Current e-value (encoded as f64 bits).
70    pub evalue: f64,
71    /// Total observations processed.
72    pub observations: u64,
73    /// Current rejection threshold (`1 / alpha`).
74    pub rejection_threshold: f64,
75    /// Priority must be strictly greater than this threshold to be shed.
76    pub priority_threshold: u8,
77    /// Most recent telemetry signal, when one was recorded.
78    pub last_signal: Option<EProcessSignal>,
79}
80
/// Decision artifact captured when a checkpoint consults the oracle.
///
/// Produced by [`EProcessOracle::decision`]; it keeps the snapshot that fed the
/// decision alongside the outcome.
#[derive(Debug, Clone, PartialEq)]
pub struct EProcessDecision {
    /// Snapshot used to make the decision.
    pub snapshot: EProcessSnapshot,
    /// Priority level of the checked context.
    pub priority: u8,
    /// Whether the oracle recommends shedding the checked context, i.e.
    /// `priority > snapshot.priority_threshold` and
    /// `snapshot.evalue >= snapshot.rejection_threshold`.
    pub should_shed: bool,
}
91
92/// Lock-free bridge for publishing e-process telemetry from runtime code.
93#[derive(Debug, Default)]
94pub struct EProcessTelemetryBridge {
95    signal_present: AtomicBool,
96    fcw_abort_rate_bits: AtomicU64,
97    cache_miss_ratio_bits: AtomicU64,
98    memory_pressure_bits: AtomicU64,
99    anomaly_score_bits: AtomicU64,
100}
101
102impl EProcessTelemetryBridge {
103    /// Create an empty bridge.
104    #[must_use]
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    /// Publish a fully materialized signal.
110    pub fn record_signal(&self, signal: EProcessSignal) {
111        self.fcw_abort_rate_bits
112            .store(signal.fcw_abort_rate.to_bits(), Ordering::Relaxed);
113        self.cache_miss_ratio_bits
114            .store(signal.cache_miss_ratio.to_bits(), Ordering::Relaxed);
115        self.memory_pressure_bits
116            .store(signal.memory_pressure.to_bits(), Ordering::Relaxed);
117        self.anomaly_score_bits
118            .store(signal.anomaly_score.to_bits(), Ordering::Relaxed);
119        self.signal_present.store(true, Ordering::Release);
120    }
121
122    /// Publish telemetry components using the default anomaly-score weights.
123    pub fn record_components(
124        &self,
125        fcw_abort_rate: f64,
126        cache_miss_ratio: f64,
127        memory_pressure: f64,
128    ) {
129        self.record_signal(EProcessSignal::new(
130            fcw_abort_rate,
131            cache_miss_ratio,
132            memory_pressure,
133        ));
134    }
135
136    /// Read the most recently published signal.
137    #[must_use]
138    pub fn snapshot(&self) -> Option<EProcessSignal> {
139        if !self.signal_present.load(Ordering::Acquire) {
140            return None;
141        }
142        Some(EProcessSignal {
143            fcw_abort_rate: f64::from_bits(self.fcw_abort_rate_bits.load(Ordering::Relaxed)),
144            cache_miss_ratio: f64::from_bits(self.cache_miss_ratio_bits.load(Ordering::Relaxed)),
145            memory_pressure: f64::from_bits(self.memory_pressure_bits.load(Ordering::Relaxed)),
146            anomaly_score: f64::from_bits(self.anomaly_score_bits.load(Ordering::Relaxed)),
147        })
148    }
149}
150
151/// Anytime-valid anomaly oracle that signals when to shed low-priority work.
152///
153/// Thread-safe: all state is stored in atomics.
154#[derive(Debug)]
155pub struct EProcessOracle {
156    config: EProcessConfig,
157    /// Priority threshold: only shed contexts with priority strictly above this value.
158    priority_threshold: u8,
159    /// Running e-value stored as f64 bits in an AtomicU64.
160    evalue_bits: AtomicU64,
161    /// Total observation count.
162    observations: AtomicU64,
163    /// Most recent telemetry signal, when supplied.
164    signal_present: AtomicBool,
165    fcw_abort_rate_bits: AtomicU64,
166    cache_miss_ratio_bits: AtomicU64,
167    memory_pressure_bits: AtomicU64,
168    anomaly_score_bits: AtomicU64,
169}
170
171impl EProcessOracle {
172    /// Create a new oracle.
173    #[must_use]
174    pub fn new(config: EProcessConfig, priority_threshold: u8) -> Self {
175        let config = sanitize_config(config);
176        Self {
177            config,
178            priority_threshold,
179            evalue_bits: AtomicU64::new(1.0_f64.to_bits()),
180            observations: AtomicU64::new(0),
181            signal_present: AtomicBool::new(false),
182            fcw_abort_rate_bits: AtomicU64::new(0.0_f64.to_bits()),
183            cache_miss_ratio_bits: AtomicU64::new(0.0_f64.to_bits()),
184            memory_pressure_bits: AtomicU64::new(0.0_f64.to_bits()),
185            anomaly_score_bits: AtomicU64::new(0.0_f64.to_bits()),
186        }
187    }
188
189    /// Record an observation. `anomaly = true` means an anomaly was observed.
190    pub fn observe_sample(&self, anomaly: bool) {
191        self.observations.fetch_add(1, Ordering::Relaxed);
192
193        // Betting-martingale update (anytime-valid under H0):
194        //   E_{t+1} = E_t * (1 + lambda * (x_t - p0))
195        // where x_t = 1 for anomaly and 0 for normal.
196        let x_t = if anomaly { 1.0 } else { 0.0 };
197        let factor = self
198            .config
199            .lambda
200            .mul_add(x_t - self.config.p0, 1.0)
201            .max(0.0);
202
203        // Atomic CAS loop to update the e-value.
204        loop {
205            let old_bits = self.evalue_bits.load(Ordering::Relaxed);
206            let old_val = f64::from_bits(old_bits);
207            let mut new_val = old_val * factor;
208            if !new_val.is_finite() {
209                new_val = self.config.max_evalue;
210            }
211            new_val = new_val.min(self.config.max_evalue).max(0.0);
212            let new_bits = new_val.to_bits();
213            if self
214                .evalue_bits
215                .compare_exchange_weak(old_bits, new_bits, Ordering::Release, Ordering::Relaxed)
216                .is_ok()
217            {
218                break;
219            }
220        }
221    }
222
223    /// Record a full telemetry signal and update the e-process classification.
224    pub fn observe_signal(&self, signal: EProcessSignal) {
225        self.store_signal(signal);
226        self.observe_sample(signal.is_anomalous());
227    }
228
229    /// Record the latest signal published into a telemetry bridge.
230    ///
231    /// Returns `true` when a signal was present and consumed.
232    pub fn observe_bridge(&self, bridge: &EProcessTelemetryBridge) -> bool {
233        let Some(signal) = bridge.snapshot() else {
234            return false;
235        };
236        self.observe_signal(signal);
237        true
238    }
239
240    /// Returns `true` if the oracle recommends shedding a context at the given
241    /// priority level. Only sheds when priority > threshold AND e-value >= 1/alpha.
242    #[must_use]
243    pub fn should_shed(&self, priority: u8) -> bool {
244        if priority <= self.priority_threshold {
245            return false;
246        }
247        let evalue = f64::from_bits(self.evalue_bits.load(Ordering::Acquire));
248        evalue >= self.rejection_threshold()
249    }
250
251    /// Build a decision artifact for a specific priority level.
252    #[must_use]
253    pub fn decision(&self, priority: u8) -> EProcessDecision {
254        let snapshot = self.snapshot();
255        let should_shed = priority > snapshot.priority_threshold
256            && snapshot.evalue >= snapshot.rejection_threshold;
257        EProcessDecision {
258            snapshot,
259            priority,
260            should_shed,
261        }
262    }
263
264    /// Current e-value for the oracle.
265    #[must_use]
266    pub fn e_value(&self) -> f64 {
267        f64::from_bits(self.evalue_bits.load(Ordering::Acquire))
268    }
269
270    /// Rejection threshold `1/alpha` for the current oracle configuration.
271    #[must_use]
272    pub fn rejection_threshold(&self) -> f64 {
273        1.0 / self.config.alpha
274    }
275
276    /// Priority threshold for shedding.
277    #[must_use]
278    pub const fn priority_threshold(&self) -> u8 {
279        self.priority_threshold
280    }
281
282    /// Snapshot current oracle state.
283    #[must_use]
284    pub fn snapshot(&self) -> EProcessSnapshot {
285        EProcessSnapshot {
286            evalue: self.e_value(),
287            observations: self.observations.load(Ordering::Relaxed),
288            rejection_threshold: self.rejection_threshold(),
289            priority_threshold: self.priority_threshold,
290            last_signal: self.last_signal(),
291        }
292    }
293
294    fn store_signal(&self, signal: EProcessSignal) {
295        self.fcw_abort_rate_bits
296            .store(signal.fcw_abort_rate.to_bits(), Ordering::Relaxed);
297        self.cache_miss_ratio_bits
298            .store(signal.cache_miss_ratio.to_bits(), Ordering::Relaxed);
299        self.memory_pressure_bits
300            .store(signal.memory_pressure.to_bits(), Ordering::Relaxed);
301        self.anomaly_score_bits
302            .store(signal.anomaly_score.to_bits(), Ordering::Relaxed);
303        self.signal_present.store(true, Ordering::Release);
304    }
305
306    #[must_use]
307    fn last_signal(&self) -> Option<EProcessSignal> {
308        if !self.signal_present.load(Ordering::Acquire) {
309            return None;
310        }
311        Some(EProcessSignal {
312            fcw_abort_rate: f64::from_bits(self.fcw_abort_rate_bits.load(Ordering::Relaxed)),
313            cache_miss_ratio: f64::from_bits(self.cache_miss_ratio_bits.load(Ordering::Relaxed)),
314            memory_pressure: f64::from_bits(self.memory_pressure_bits.load(Ordering::Relaxed)),
315            anomaly_score: f64::from_bits(self.anomaly_score_bits.load(Ordering::Relaxed)),
316        })
317    }
318}
319
320fn sanitize_config(mut config: EProcessConfig) -> EProcessConfig {
321    const EPS: f64 = 1e-9;
322
323    if !config.p0.is_finite() {
324        config.p0 = 0.1;
325    }
326    config.p0 = config.p0.clamp(EPS, 1.0 - EPS);
327
328    if !config.alpha.is_finite() || config.alpha <= 0.0 {
329        config.alpha = 0.05;
330    }
331    config.alpha = config.alpha.clamp(EPS, 1.0);
332
333    if !config.max_evalue.is_finite() || config.max_evalue < 1.0 {
334        config.max_evalue = 1.0;
335    }
336
337    let lambda_min = -1.0 / (1.0 - config.p0) + EPS;
338    let lambda_max = 1.0 / config.p0 - EPS;
339    if !config.lambda.is_finite() {
340        config.lambda = 0.0;
341    }
342    config.lambda = config.lambda.clamp(lambda_min, lambda_max);
343
344    config
345}
346
/// Project `value` onto the closed unit interval `[0, 1]`.
///
/// NaN and both infinities carry no usable magnitude here and map to `0.0`.
fn sanitize_unit_interval(value: f64) -> f64 {
    if value.is_finite() {
        value.clamp(0.0, 1.0)
    } else {
        0.0
    }
}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Advance a 64-bit linear congruential generator (wrapping multiply, then
    /// add 1) and return the new state. Deterministic, so the statistical
    /// tests below are reproducible.
    fn lcg_next(state: &mut u64) -> u64 {
        *state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        *state
    }

    /// Draw a Bernoulli(`p`) sample. The top 53 bits of the next LCG state are
    /// scaled into a uniform value in `[0, 1)`; the high bits are used because
    /// the low-order bits of a power-of-two-modulus LCG are poorly distributed.
    fn bernoulli_sample(state: &mut u64, p: f64) -> bool {
        let raw = (lcg_next(state) >> 11) as f64 / ((1_u64 << 53) as f64);
        raw < p
    }

    /// Aggressive configuration: each anomaly multiplies the e-value by
    /// `1 + 5 * (1 - 0.1) = 5.5`, so two anomalies (30.25) cross the
    /// rejection threshold `1 / 0.05 = 20`.
    fn test_config() -> EProcessConfig {
        EProcessConfig {
            p0: 0.1,
            lambda: 5.0,
            alpha: 0.05,
            max_evalue: 1e12,
        }
    }

    #[test]
    fn eprocess_threshold_crossing_triggers_shed() {
        let oracle = EProcessOracle::new(test_config(), 1);
        // 1.0 * 5.5 * 5.5 = 30.25 >= 20.
        oracle.observe_sample(true);
        oracle.observe_sample(true);
        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue >= oracle.rejection_threshold());
        assert_eq!(snapshot.rejection_threshold, oracle.rejection_threshold());
        assert_eq!(snapshot.priority_threshold, 1);
        assert!(oracle.should_shed(3));
    }

    #[test]
    fn eprocess_priority_threshold_blocks_shed() {
        let oracle = EProcessOracle::new(test_config(), 1);
        oracle.observe_sample(true);
        oracle.observe_sample(true);
        // The e-value has crossed the threshold, but priority 1 is not
        // strictly above the priority threshold of 1.
        assert!(!oracle.should_shed(1));
    }

    #[test]
    fn eprocess_healthy_stream_does_not_false_alarm() {
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 0.1,
                lambda: 0.5,
                alpha: 0.01,
                max_evalue: 1e12,
            },
            0,
        );

        // Each normal sample multiplies the e-value by 1 - 0.5 * 0.1 = 0.95,
        // so it only decays.
        for _ in 0..500 {
            oracle.observe_sample(false);
        }

        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue < oracle.rejection_threshold());
        assert!(!oracle.should_shed(2));
    }

    #[test]
    fn eprocess_null_rate_stream_stays_below_threshold() {
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 0.1,
                lambda: 0.5,
                alpha: 0.01,
                max_evalue: 1e12,
            },
            0,
        );

        // True anomaly rate 0.02 is below p0 = 0.1, so H0 holds. The fixed
        // seed makes the sampled stream (and the outcome) deterministic.
        let mut state = 0x5eed_u64;
        for _ in 0..2_000 {
            let anomaly = bernoulli_sample(&mut state, 0.02);
            oracle.observe_sample(anomaly);
        }

        assert!(oracle.snapshot().evalue < oracle.rejection_threshold());
    }

    #[test]
    fn eprocess_snapshot_tracks_observations() {
        let oracle = EProcessOracle::new(test_config(), 1);
        // Both anomalous and normal samples are counted.
        oracle.observe_sample(true);
        oracle.observe_sample(false);
        oracle.observe_sample(true);
        assert_eq!(oracle.snapshot().observations, 3);
    }

    #[test]
    fn eprocess_signal_snapshot_records_diagnostics() {
        let oracle = EProcessOracle::new(test_config(), 1);
        let signal = EProcessSignal::new(0.8, 0.5, 0.25);
        oracle.observe_signal(signal);
        let snapshot = oracle.snapshot();
        assert_eq!(snapshot.last_signal, Some(signal));
        assert_eq!(snapshot.priority_threshold, 1);
        // 1 / alpha = 1 / 0.05.
        assert_eq!(snapshot.rejection_threshold, 20.0);
    }

    #[test]
    fn eprocess_bridge_ingestion_updates_last_signal() {
        let oracle = EProcessOracle::new(test_config(), 1);
        let bridge = EProcessTelemetryBridge::new();
        bridge.record_components(0.9, 0.6, 0.5);
        assert!(oracle.observe_bridge(&bridge));
        // The bridge still holds its signal after being observed, so it can be
        // re-read here for comparison.
        let signal = bridge
            .snapshot()
            .expect("bridge should hold the latest signal");
        assert_eq!(oracle.snapshot().last_signal, Some(signal));
    }

    #[test]
    fn eprocess_decision_captures_priority_and_snapshot() {
        let oracle = EProcessOracle::new(test_config(), 1);
        // All-ones telemetry scores 1.0 (anomalous); two such signals push the
        // e-value to 30.25 >= 20.
        oracle.observe_signal(EProcessSignal::new(1.0, 1.0, 1.0));
        oracle.observe_signal(EProcessSignal::new(1.0, 1.0, 1.0));
        let decision = oracle.decision(3);
        assert_eq!(decision.priority, 3);
        assert!(decision.should_shed);
        assert_eq!(decision.snapshot.priority_threshold, 1);
    }

    #[test]
    fn eprocess_sanitizes_invalid_config() {
        // Every configuration field is invalid (out of range, infinite, zero,
        // or negative); construction must still yield a usable oracle.
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 5.0,
                lambda: f64::INFINITY,
                alpha: 0.0,
                max_evalue: -1.0,
            },
            0,
        );

        // Should remain finite and non-negative after updates.
        oracle.observe_sample(false);
        oracle.observe_sample(true);
        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue.is_finite());
        assert!(snapshot.evalue >= 0.0);
        assert!(oracle.rejection_threshold().is_finite());
    }
}