use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Tuning parameters for an [`EProcessOracle`].
///
/// [`EProcessOracle::new`] passes the value through `sanitize_config`, so non-finite or
/// out-of-range fields are replaced or clamped instead of being rejected.
#[derive(Debug, Clone, Copy)]
pub struct EProcessConfig {
    /// Baseline rate subtracted from each 0/1 sample in the update factor
    /// `1 + lambda * (x - p0)`.
    pub p0: f64,
    /// Multiplier applied to `(x - p0)` in the update factor.
    pub lambda: f64,
    /// Level used to derive the rejection threshold, which is `1 / alpha`.
    pub alpha: f64,
    /// Upper cap applied to the running e-value after every update.
    pub max_evalue: f64,
}

/// One telemetry sample: three component metrics plus a combined anomaly score.
///
/// [`EProcessSignal::new`] clamps every field into `[0, 1]`. Building the struct with a
/// literal bypasses that sanitisation because all fields are public.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct EProcessSignal {
    /// FCW abort-rate component (weight 0.5 in the score computed by `new`).
    pub fcw_abort_rate: f64,
    /// Cache-miss-ratio component (weight 0.3 in the score computed by `new`).
    pub cache_miss_ratio: f64,
    /// Memory-pressure component (weight 0.2 in the score computed by `new`).
    pub memory_pressure: f64,
    /// Combined score; a signal counts as anomalous when this is at least 0.5.
    pub anomaly_score: f64,
}

36impl EProcessSignal {
37 #[must_use]
39 pub fn new(fcw_abort_rate: f64, cache_miss_ratio: f64, memory_pressure: f64) -> Self {
40 let fcw_abort_rate = sanitize_unit_interval(fcw_abort_rate);
41 let cache_miss_ratio = sanitize_unit_interval(cache_miss_ratio);
42 let memory_pressure = sanitize_unit_interval(memory_pressure);
43 let anomaly_score =
44 fcw_abort_rate.mul_add(0.5, cache_miss_ratio.mul_add(0.3, memory_pressure * 0.2));
45 Self {
46 fcw_abort_rate,
47 cache_miss_ratio,
48 memory_pressure,
49 anomaly_score: sanitize_unit_interval(anomaly_score),
50 }
51 }
52
53 #[must_use]
55 pub fn with_anomaly_score(mut self, anomaly_score: f64) -> Self {
56 self.anomaly_score = sanitize_unit_interval(anomaly_score);
57 self
58 }
59
60 #[must_use]
61 fn is_anomalous(self) -> bool {
62 self.anomaly_score >= 0.5
63 }
64}
65
66#[derive(Debug, Clone, PartialEq)]
68pub struct EProcessSnapshot {
69 pub evalue: f64,
71 pub observations: u64,
73 pub rejection_threshold: f64,
75 pub priority_threshold: u8,
77 pub last_signal: Option<EProcessSignal>,
79}
80
81#[derive(Debug, Clone, PartialEq)]
83pub struct EProcessDecision {
84 pub snapshot: EProcessSnapshot,
86 pub priority: u8,
88 pub should_shed: bool,
90}
91
/// Holder for the most recently recorded [`EProcessSignal`], stored as atomics so it can
/// be written and read through a shared reference.
///
/// Each `f64` component is kept as its raw bit pattern in an `AtomicU64`. The derived
/// `Default` yields all-zero bits and a cleared presence flag.
///
/// NOTE(review): the four components live in independent atomics, so a reader that races
/// with a writer could observe fields from two different signals. Confirm that this
/// best-effort behaviour is acceptable for consumers.
#[derive(Debug, Default)]
pub struct EProcessTelemetryBridge {
    /// Set once the first signal has been recorded; never cleared by this type.
    signal_present: AtomicBool,
    /// Bit pattern of the last recorded `fcw_abort_rate`.
    fcw_abort_rate_bits: AtomicU64,
    /// Bit pattern of the last recorded `cache_miss_ratio`.
    cache_miss_ratio_bits: AtomicU64,
    /// Bit pattern of the last recorded `memory_pressure`.
    memory_pressure_bits: AtomicU64,
    /// Bit pattern of the last recorded `anomaly_score`.
    anomaly_score_bits: AtomicU64,
}

102impl EProcessTelemetryBridge {
103 #[must_use]
105 pub fn new() -> Self {
106 Self::default()
107 }
108
109 pub fn record_signal(&self, signal: EProcessSignal) {
111 self.fcw_abort_rate_bits
112 .store(signal.fcw_abort_rate.to_bits(), Ordering::Relaxed);
113 self.cache_miss_ratio_bits
114 .store(signal.cache_miss_ratio.to_bits(), Ordering::Relaxed);
115 self.memory_pressure_bits
116 .store(signal.memory_pressure.to_bits(), Ordering::Relaxed);
117 self.anomaly_score_bits
118 .store(signal.anomaly_score.to_bits(), Ordering::Relaxed);
119 self.signal_present.store(true, Ordering::Release);
120 }
121
122 pub fn record_components(
124 &self,
125 fcw_abort_rate: f64,
126 cache_miss_ratio: f64,
127 memory_pressure: f64,
128 ) {
129 self.record_signal(EProcessSignal::new(
130 fcw_abort_rate,
131 cache_miss_ratio,
132 memory_pressure,
133 ));
134 }
135
136 #[must_use]
138 pub fn snapshot(&self) -> Option<EProcessSignal> {
139 if !self.signal_present.load(Ordering::Acquire) {
140 return None;
141 }
142 Some(EProcessSignal {
143 fcw_abort_rate: f64::from_bits(self.fcw_abort_rate_bits.load(Ordering::Relaxed)),
144 cache_miss_ratio: f64::from_bits(self.cache_miss_ratio_bits.load(Ordering::Relaxed)),
145 memory_pressure: f64::from_bits(self.memory_pressure_bits.load(Ordering::Relaxed)),
146 anomaly_score: f64::from_bits(self.anomaly_score_bits.load(Ordering::Relaxed)),
147 })
148 }
149}
150
151#[derive(Debug)]
155pub struct EProcessOracle {
156 config: EProcessConfig,
157 priority_threshold: u8,
159 evalue_bits: AtomicU64,
161 observations: AtomicU64,
163 signal_present: AtomicBool,
165 fcw_abort_rate_bits: AtomicU64,
166 cache_miss_ratio_bits: AtomicU64,
167 memory_pressure_bits: AtomicU64,
168 anomaly_score_bits: AtomicU64,
169}
170
171impl EProcessOracle {
172 #[must_use]
174 pub fn new(config: EProcessConfig, priority_threshold: u8) -> Self {
175 let config = sanitize_config(config);
176 Self {
177 config,
178 priority_threshold,
179 evalue_bits: AtomicU64::new(1.0_f64.to_bits()),
180 observations: AtomicU64::new(0),
181 signal_present: AtomicBool::new(false),
182 fcw_abort_rate_bits: AtomicU64::new(0.0_f64.to_bits()),
183 cache_miss_ratio_bits: AtomicU64::new(0.0_f64.to_bits()),
184 memory_pressure_bits: AtomicU64::new(0.0_f64.to_bits()),
185 anomaly_score_bits: AtomicU64::new(0.0_f64.to_bits()),
186 }
187 }
188
189 pub fn observe_sample(&self, anomaly: bool) {
191 self.observations.fetch_add(1, Ordering::Relaxed);
192
193 let x_t = if anomaly { 1.0 } else { 0.0 };
197 let factor = self
198 .config
199 .lambda
200 .mul_add(x_t - self.config.p0, 1.0)
201 .max(0.0);
202
203 loop {
205 let old_bits = self.evalue_bits.load(Ordering::Relaxed);
206 let old_val = f64::from_bits(old_bits);
207 let mut new_val = old_val * factor;
208 if !new_val.is_finite() {
209 new_val = self.config.max_evalue;
210 }
211 new_val = new_val.min(self.config.max_evalue).max(0.0);
212 let new_bits = new_val.to_bits();
213 if self
214 .evalue_bits
215 .compare_exchange_weak(old_bits, new_bits, Ordering::Release, Ordering::Relaxed)
216 .is_ok()
217 {
218 break;
219 }
220 }
221 }
222
223 pub fn observe_signal(&self, signal: EProcessSignal) {
225 self.store_signal(signal);
226 self.observe_sample(signal.is_anomalous());
227 }
228
229 pub fn observe_bridge(&self, bridge: &EProcessTelemetryBridge) -> bool {
233 let Some(signal) = bridge.snapshot() else {
234 return false;
235 };
236 self.observe_signal(signal);
237 true
238 }
239
240 #[must_use]
243 pub fn should_shed(&self, priority: u8) -> bool {
244 if priority <= self.priority_threshold {
245 return false;
246 }
247 let evalue = f64::from_bits(self.evalue_bits.load(Ordering::Acquire));
248 evalue >= self.rejection_threshold()
249 }
250
251 #[must_use]
253 pub fn decision(&self, priority: u8) -> EProcessDecision {
254 let snapshot = self.snapshot();
255 let should_shed = priority > snapshot.priority_threshold
256 && snapshot.evalue >= snapshot.rejection_threshold;
257 EProcessDecision {
258 snapshot,
259 priority,
260 should_shed,
261 }
262 }
263
264 #[must_use]
266 pub fn e_value(&self) -> f64 {
267 f64::from_bits(self.evalue_bits.load(Ordering::Acquire))
268 }
269
270 #[must_use]
272 pub fn rejection_threshold(&self) -> f64 {
273 1.0 / self.config.alpha
274 }
275
276 #[must_use]
278 pub const fn priority_threshold(&self) -> u8 {
279 self.priority_threshold
280 }
281
282 #[must_use]
284 pub fn snapshot(&self) -> EProcessSnapshot {
285 EProcessSnapshot {
286 evalue: self.e_value(),
287 observations: self.observations.load(Ordering::Relaxed),
288 rejection_threshold: self.rejection_threshold(),
289 priority_threshold: self.priority_threshold,
290 last_signal: self.last_signal(),
291 }
292 }
293
294 fn store_signal(&self, signal: EProcessSignal) {
295 self.fcw_abort_rate_bits
296 .store(signal.fcw_abort_rate.to_bits(), Ordering::Relaxed);
297 self.cache_miss_ratio_bits
298 .store(signal.cache_miss_ratio.to_bits(), Ordering::Relaxed);
299 self.memory_pressure_bits
300 .store(signal.memory_pressure.to_bits(), Ordering::Relaxed);
301 self.anomaly_score_bits
302 .store(signal.anomaly_score.to_bits(), Ordering::Relaxed);
303 self.signal_present.store(true, Ordering::Release);
304 }
305
306 #[must_use]
307 fn last_signal(&self) -> Option<EProcessSignal> {
308 if !self.signal_present.load(Ordering::Acquire) {
309 return None;
310 }
311 Some(EProcessSignal {
312 fcw_abort_rate: f64::from_bits(self.fcw_abort_rate_bits.load(Ordering::Relaxed)),
313 cache_miss_ratio: f64::from_bits(self.cache_miss_ratio_bits.load(Ordering::Relaxed)),
314 memory_pressure: f64::from_bits(self.memory_pressure_bits.load(Ordering::Relaxed)),
315 anomaly_score: f64::from_bits(self.anomaly_score_bits.load(Ordering::Relaxed)),
316 })
317 }
318}
319
320fn sanitize_config(mut config: EProcessConfig) -> EProcessConfig {
321 const EPS: f64 = 1e-9;
322
323 if !config.p0.is_finite() {
324 config.p0 = 0.1;
325 }
326 config.p0 = config.p0.clamp(EPS, 1.0 - EPS);
327
328 if !config.alpha.is_finite() || config.alpha <= 0.0 {
329 config.alpha = 0.05;
330 }
331 config.alpha = config.alpha.clamp(EPS, 1.0);
332
333 if !config.max_evalue.is_finite() || config.max_evalue < 1.0 {
334 config.max_evalue = 1.0;
335 }
336
337 let lambda_min = -1.0 / (1.0 - config.p0) + EPS;
338 let lambda_max = 1.0 / config.p0 - EPS;
339 if !config.lambda.is_finite() {
340 config.lambda = 0.0;
341 }
342 config.lambda = config.lambda.clamp(lambda_min, lambda_max);
343
344 config
345}
346
/// Maps an arbitrary `f64` into `[0, 1]`.
///
/// NaN and both infinities become `0.0`; every finite value is clamped to the interval.
fn sanitize_unit_interval(value: f64) -> f64 {
    if value.is_finite() {
        value.clamp(0.0, 1.0)
    } else {
        0.0
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Advances a 64-bit linear congruential generator and returns the new state.
    fn lcg_next(state: &mut u64) -> u64 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        *state
    }

    /// Draws a deterministic Bernoulli(`p`) sample from the top 53 bits of the next state.
    fn bernoulli_sample(state: &mut u64, p: f64) -> bool {
        let raw = (lcg_next(state) >> 11) as f64 / ((1_u64 << 53) as f64);
        raw < p
    }

    /// Aggressive configuration: each anomaly multiplies the e-value by 5.5.
    fn test_config() -> EProcessConfig {
        EProcessConfig {
            p0: 0.1,
            lambda: 5.0,
            alpha: 0.05,
            max_evalue: 1e12,
        }
    }

    #[test]
    fn eprocess_threshold_crossing_triggers_shed() {
        let oracle = EProcessOracle::new(test_config(), 1);
        // Two anomalies: 5.5 * 5.5 = 30.25, above the threshold 1 / 0.05 = 20.
        oracle.observe_sample(true);
        oracle.observe_sample(true);
        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue >= oracle.rejection_threshold());
        assert_eq!(snapshot.rejection_threshold, oracle.rejection_threshold());
        assert_eq!(snapshot.priority_threshold, 1);
        assert!(oracle.should_shed(3));
    }

    #[test]
    fn eprocess_priority_threshold_blocks_shed() {
        let oracle = EProcessOracle::new(test_config(), 1);
        oracle.observe_sample(true);
        oracle.observe_sample(true);
        // Priority equal to the threshold is protected even above the rejection level.
        assert!(!oracle.should_shed(1));
    }

    #[test]
    fn eprocess_healthy_stream_does_not_false_alarm() {
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 0.1,
                lambda: 0.5,
                alpha: 0.01,
                max_evalue: 1e12,
            },
            0,
        );

        // Every non-anomalous sample multiplies the e-value by 1 - 0.5 * 0.1 = 0.95.
        for _ in 0..500 {
            oracle.observe_sample(false);
        }

        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue < oracle.rejection_threshold());
        assert!(!oracle.should_shed(2));
    }

    #[test]
    fn eprocess_null_rate_stream_stays_below_threshold() {
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 0.1,
                lambda: 0.5,
                alpha: 0.01,
                max_evalue: 1e12,
            },
            0,
        );

        // Deterministic pseudo-random stream with a 2% anomaly rate.
        let mut state = 0x5eed_u64;
        for _ in 0..2_000 {
            let anomaly = bernoulli_sample(&mut state, 0.02);
            oracle.observe_sample(anomaly);
        }

        assert!(oracle.snapshot().evalue < oracle.rejection_threshold());
    }

    #[test]
    fn eprocess_snapshot_tracks_observations() {
        let oracle = EProcessOracle::new(test_config(), 1);
        oracle.observe_sample(true);
        oracle.observe_sample(false);
        oracle.observe_sample(true);
        assert_eq!(oracle.snapshot().observations, 3);
    }

    #[test]
    fn eprocess_signal_snapshot_records_diagnostics() {
        let oracle = EProcessOracle::new(test_config(), 1);
        let signal = EProcessSignal::new(0.8, 0.5, 0.25);
        oracle.observe_signal(signal);
        let snapshot = oracle.snapshot();
        assert_eq!(snapshot.last_signal, Some(signal));
        assert_eq!(snapshot.priority_threshold, 1);
        assert_eq!(snapshot.rejection_threshold, 20.0);
    }

    #[test]
    fn eprocess_bridge_ingestion_updates_last_signal() {
        let oracle = EProcessOracle::new(test_config(), 1);
        let bridge = EProcessTelemetryBridge::new();
        bridge.record_components(0.9, 0.6, 0.5);
        assert!(oracle.observe_bridge(&bridge));
        // The bridge keeps its signal after being observed.
        let signal = bridge
            .snapshot()
            .expect("bridge should hold the latest signal");
        assert_eq!(oracle.snapshot().last_signal, Some(signal));
    }

    #[test]
    fn eprocess_decision_captures_priority_and_snapshot() {
        let oracle = EProcessOracle::new(test_config(), 1);
        oracle.observe_signal(EProcessSignal::new(1.0, 1.0, 1.0));
        oracle.observe_signal(EProcessSignal::new(1.0, 1.0, 1.0));
        let decision = oracle.decision(3);
        assert_eq!(decision.priority, 3);
        assert!(decision.should_shed);
        assert_eq!(decision.snapshot.priority_threshold, 1);
    }

    #[test]
    fn eprocess_sanitizes_invalid_config() {
        // Every field is out of range or non-finite; construction must still succeed.
        let oracle = EProcessOracle::new(
            EProcessConfig {
                p0: 5.0,
                lambda: f64::INFINITY,
                alpha: 0.0,
                max_evalue: -1.0,
            },
            0,
        );

        oracle.observe_sample(false);
        oracle.observe_sample(true);
        let snapshot = oracle.snapshot();
        assert!(snapshot.evalue.is_finite());
        assert!(snapshot.evalue >= 0.0);
        assert!(oracle.rejection_threshold().is_finite());
    }
}