1#![forbid(unsafe_code)]
2
3use std::cell::RefCell;
46use std::sync::{Arc, Mutex};
47use std::time::Instant;
48
49use crate::conformal_alert::{AlertConfig, AlertDecision, AlertStats, ConformalAlert};
50use crate::resize_coalescer::{DecisionLog, TelemetryHooks};
51use crate::voi_sampling::{VoiConfig, VoiSampler, VoiSummary};
52
53#[derive(Debug, Clone)]
55pub struct SlaConfig {
56 pub alpha: f64,
59
60 pub min_calibration: usize,
63
64 pub max_calibration: usize,
67
68 pub target_latency_ms: f64,
72
73 pub enable_logging: bool,
76
77 pub alert_cooldown: u64,
80
81 pub hysteresis: f64,
84
85 pub voi_sampling: Option<VoiConfig>,
88}
89
90impl Default for SlaConfig {
91 fn default() -> Self {
92 Self {
93 alpha: 0.05,
94 min_calibration: 20,
95 max_calibration: 200,
96 target_latency_ms: 100.0,
97 enable_logging: true,
98 alert_cooldown: 10,
99 hysteresis: 1.1,
100 voi_sampling: None,
101 }
102 }
103}
104
/// Evidence describing a single applied resize.
///
/// Plain value record; it derives `PartialEq` so instances can be compared
/// directly (e.g. in tests or de-duplication).
// NOTE(review): not constructed in the visible code — field meanings below
// are inferred from their names and the parallel `DecisionLog` fields; confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct ResizeEvidence {
    /// Moment the evidence was recorded (presumably when the resize applied).
    pub timestamp: Instant,
    /// Observed latency in milliseconds.
    pub latency_ms: f64,
    /// Terminal size that was applied, as `(cols, rows)` — TODO confirm order.
    pub applied_size: (u16, u16),
    /// Whether the resize was forced rather than coalesced.
    pub forced: bool,
    /// Name of the coalescer regime in effect (presumably `Regime` as text).
    pub regime: &'static str,
    /// Coalescing delay in milliseconds, when known.
    pub coalesce_ms: Option<f64>,
}
121
/// One record in the SLA monitor's event log.
///
/// An entry is appended for every processed latency sample while logging is
/// enabled. Derives `PartialEq` so log snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct SlaLogEntry {
    /// 1-based index of the event among all events processed since the
    /// monitor was created or last reset.
    pub event_idx: u64,
    /// `"calibrate"`, `"observe"` or `"alert"`.
    pub event_type: &'static str,
    /// Latency of this event in milliseconds.
    pub latency_ms: f64,
    /// Configured latency target in milliseconds at the time of the event.
    pub target_latency_ms: f64,
    /// Conformal threshold in milliseconds. It comes from the alerter's current
    /// threshold during calibration and from the decision evidence afterwards.
    pub threshold_ms: f64,
    /// E-process value reported alongside the event.
    pub e_value: f64,
    /// Whether the alerter flagged this event as an alert.
    pub is_alert: bool,
    /// `Debug` rendering of the alert reason; `None` unless `is_alert`.
    pub alert_reason: Option<String>,
    /// Size applied by the resize that produced this event.
    pub applied_size: (u16, u16),
    /// Whether that resize was forced.
    pub forced: bool,
}
146
/// Point-in-time snapshot of the SLA monitor and its conformal alerter.
///
/// Derives `PartialEq` so summaries can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct SlaSummary {
    /// Events processed (calibration and detection) since creation or reset.
    pub total_events: u64,
    /// Calibration samples held by the alerter, as reported by its stats.
    pub calibration_events: usize,
    /// Decisions flagged as alerts since creation or reset.
    pub total_alerts: u64,
    /// Alerter's current conformal threshold in milliseconds.
    pub current_threshold_ms: f64,
    /// Mean of the alerter's calibration latencies in milliseconds.
    pub mean_latency_ms: f64,
    /// Standard deviation of the calibration latencies in milliseconds.
    pub std_latency_ms: f64,
    /// Alerter's current e-process value.
    pub current_e_value: f64,
    /// Empirical false-positive rate as reported by the alerter's stats.
    pub empirical_fpr: f64,
    /// Configured latency target in milliseconds.
    pub target_latency_ms: f64,
}
169
170pub struct ResizeSlaMonitor {
175 config: SlaConfig,
176 alerter: RefCell<ConformalAlert>,
177 event_count: RefCell<u64>,
178 total_alerts: RefCell<u64>,
179 last_alert: RefCell<Option<AlertDecision>>,
180 logs: RefCell<Vec<SlaLogEntry>>,
181 sampler: RefCell<Option<VoiSampler>>,
182}
183
184impl ResizeSlaMonitor {
185 pub fn new(config: SlaConfig) -> Self {
187 let alert_config = AlertConfig {
188 alpha: config.alpha,
189 min_calibration: config.min_calibration,
190 max_calibration: config.max_calibration,
191 enable_logging: config.enable_logging,
192 hysteresis: config.hysteresis,
193 alert_cooldown: config.alert_cooldown,
194 ..AlertConfig::default()
195 };
196 let sampler = config.voi_sampling.clone().map(VoiSampler::new);
197
198 Self {
199 config,
200 alerter: RefCell::new(ConformalAlert::new(alert_config)),
201 event_count: RefCell::new(0),
202 total_alerts: RefCell::new(0),
203 last_alert: RefCell::new(None),
204 logs: RefCell::new(Vec::new()),
205 sampler: RefCell::new(sampler),
206 }
207 }
208
209 pub fn on_decision(&self, entry: &DecisionLog) -> Option<AlertDecision> {
211 let latency_ms = entry.coalesce_ms.unwrap_or(entry.time_since_render_ms);
213 let applied_size = entry.applied_size?;
214 if let Some(ref mut sampler) = *self.sampler.borrow_mut() {
215 let decision = sampler.decide(entry.timestamp);
216 if !decision.should_sample {
217 return None;
218 }
219 let result = self.process_latency(latency_ms, applied_size, entry.forced);
220 let violated = latency_ms > self.config.target_latency_ms;
221 sampler.observe_at(violated, entry.timestamp);
222 return result;
223 }
224
225 self.process_latency(latency_ms, applied_size, entry.forced)
226 }
227
228 fn process_latency(
230 &self,
231 latency_ms: f64,
232 applied_size: (u16, u16),
233 forced: bool,
234 ) -> Option<AlertDecision> {
235 *self.event_count.borrow_mut() += 1;
236 let event_idx = *self.event_count.borrow();
237
238 let mut alerter = self.alerter.borrow_mut();
239
240 if alerter.calibration_count() < self.config.min_calibration {
242 alerter.calibrate(latency_ms);
243
244 if self.config.enable_logging {
245 self.logs.borrow_mut().push(SlaLogEntry {
246 event_idx,
247 event_type: "calibrate",
248 latency_ms,
249 target_latency_ms: self.config.target_latency_ms,
250 threshold_ms: alerter.threshold(),
251 e_value: alerter.e_value(),
252 is_alert: false,
253 alert_reason: None,
254 applied_size,
255 forced,
256 });
257 }
258
259 return None;
260 }
261
262 let decision = alerter.observe(latency_ms);
264
265 if self.config.enable_logging {
266 self.logs.borrow_mut().push(SlaLogEntry {
267 event_idx,
268 event_type: if decision.is_alert {
269 "alert"
270 } else {
271 "observe"
272 },
273 latency_ms,
274 target_latency_ms: self.config.target_latency_ms,
275 threshold_ms: decision.evidence.conformal_threshold,
276 e_value: decision.evidence.e_value,
277 is_alert: decision.is_alert,
278 alert_reason: if decision.is_alert {
279 Some(format!("{:?}", decision.evidence.reason))
280 } else {
281 None
282 },
283 applied_size,
284 forced,
285 });
286 }
287
288 if decision.is_alert {
289 *self.total_alerts.borrow_mut() += 1;
290 *self.last_alert.borrow_mut() = Some(decision.clone());
291 }
292
293 Some(decision)
294 }
295
296 pub fn last_alert(&self) -> Option<AlertDecision> {
298 self.last_alert.borrow().clone()
299 }
300
301 pub fn summary(&self) -> SlaSummary {
303 let alerter = self.alerter.borrow();
304 let stats = alerter.stats();
305
306 SlaSummary {
307 total_events: *self.event_count.borrow(),
308 calibration_events: stats.calibration_samples,
309 total_alerts: *self.total_alerts.borrow(),
310 current_threshold_ms: stats.current_threshold,
311 mean_latency_ms: stats.calibration_mean,
312 std_latency_ms: stats.calibration_std,
313 current_e_value: stats.current_e_value,
314 empirical_fpr: stats.empirical_fpr,
315 target_latency_ms: self.config.target_latency_ms,
316 }
317 }
318
319 pub fn alerter_stats(&self) -> AlertStats {
321 self.alerter.borrow().stats()
322 }
323
324 pub fn logs(&self) -> Vec<SlaLogEntry> {
326 self.logs.borrow().clone()
327 }
328
329 pub fn logs_to_jsonl(&self) -> String {
331 let logs = self.logs.borrow();
332 let mut output = String::new();
333
334 for entry in logs.iter() {
335 let line = format!(
336 r#"{{"event":"sla","idx":{},"type":"{}","latency_ms":{:.3},"target_ms":{:.1},"threshold_ms":{:.3},"e_value":{:.6},"alert":{},"reason":{},"size":[{},{}],"forced":{}}}"#,
337 entry.event_idx,
338 entry.event_type,
339 entry.latency_ms,
340 entry.target_latency_ms,
341 entry.threshold_ms,
342 entry.e_value,
343 entry.is_alert,
344 entry
345 .alert_reason
346 .as_ref()
347 .map(|r| format!("\"{}\"", r))
348 .unwrap_or_else(|| "null".to_string()),
349 entry.applied_size.0,
350 entry.applied_size.1,
351 entry.forced
352 );
353 output.push_str(&line);
354 output.push('\n');
355 }
356
357 output
358 }
359
360 pub fn clear_logs(&self) {
362 self.logs.borrow_mut().clear();
363 }
364
365 pub fn reset(&self) {
367 let alert_config = AlertConfig {
368 alpha: self.config.alpha,
369 min_calibration: self.config.min_calibration,
370 max_calibration: self.config.max_calibration,
371 enable_logging: self.config.enable_logging,
372 hysteresis: self.config.hysteresis,
373 alert_cooldown: self.config.alert_cooldown,
374 ..AlertConfig::default()
375 };
376
377 *self.alerter.borrow_mut() = ConformalAlert::new(alert_config);
378 *self.event_count.borrow_mut() = 0;
379 *self.total_alerts.borrow_mut() = 0;
380 *self.last_alert.borrow_mut() = None;
381 self.logs.borrow_mut().clear();
382 *self.sampler.borrow_mut() = self.config.voi_sampling.clone().map(VoiSampler::new);
383 }
384
385 pub fn threshold_ms(&self) -> f64 {
387 self.alerter.borrow().threshold()
388 }
389
390 pub fn is_active(&self) -> bool {
392 self.alerter.borrow().calibration_count() >= self.config.min_calibration
393 }
394
395 pub fn calibration_count(&self) -> usize {
397 self.alerter.borrow().calibration_count()
398 }
399
400 pub fn sampling_summary(&self) -> Option<VoiSummary> {
402 self.sampler.borrow().as_ref().map(VoiSampler::summary)
403 }
404
405 pub fn sampling_logs_to_jsonl(&self) -> Option<String> {
407 self.sampler
408 .borrow()
409 .as_ref()
410 .map(|sampler| sampler.logs_to_jsonl())
411 }
412}
413
414pub fn make_sla_hooks(config: SlaConfig) -> (TelemetryHooks, Arc<Mutex<ResizeSlaMonitor>>) {
422 let monitor = Arc::new(Mutex::new(ResizeSlaMonitor::new(config)));
423 let monitor_clone = Arc::clone(&monitor);
424
425 let hooks = TelemetryHooks::new().on_resize_applied(move |entry: &DecisionLog| {
427 if (entry.action == "apply" || entry.action == "apply_forced")
429 && let Ok(monitor) = monitor_clone.lock()
430 {
431 monitor.on_decision(entry);
432 }
433 });
434
435 (hooks, monitor)
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::resize_coalescer::Regime;

    /// Configuration with a short calibration window (5 samples), a 50 ms
    /// target, no cooldown and neutral hysteresis.
    fn test_config() -> SlaConfig {
        SlaConfig {
            alpha: 0.05,
            min_calibration: 5,
            max_calibration: 50,
            target_latency_ms: 50.0,
            enable_logging: true,
            alert_cooldown: 0,
            hysteresis: 1.0,
            voi_sampling: None,
        }
    }

    /// An `"apply"` decision at `now` for an 80x24 resize whose coalesce and
    /// render latencies both equal `latency_ms`.
    fn sample_decision_log(now: Instant, latency_ms: f64) -> DecisionLog {
        DecisionLog {
            timestamp: now,
            elapsed_ms: 0.0,
            event_idx: 1,
            dt_ms: 0.0,
            event_rate: 0.0,
            regime: Regime::Steady,
            action: "apply",
            pending_size: None,
            applied_size: Some((80, 24)),
            time_since_render_ms: latency_ms,
            coalesce_ms: Some(latency_ms),
            forced: false,
        }
    }

    /// Feeds the standard calibration ramp (10, 11, 12, 13, 14 ms) and checks
    /// that every sample is consumed by calibration.
    fn calibrate_ramp(monitor: &ResizeSlaMonitor) {
        for i in 0..5_u32 {
            let result = monitor.process_latency(10.0 + f64::from(i), (80, 24), false);
            assert!(result.is_none(), "Should be in calibration phase");
        }
    }

    #[test]
    fn initial_state() {
        let monitor = ResizeSlaMonitor::new(test_config());

        assert!(!monitor.is_active());
        assert_eq!(monitor.calibration_count(), 0);
        assert!(monitor.last_alert().is_none());
        assert!(monitor.logs().is_empty());
    }

    #[test]
    fn calibration_phase() {
        let monitor = ResizeSlaMonitor::new(test_config());

        calibrate_ramp(&monitor);

        assert!(monitor.is_active());
        assert_eq!(monitor.calibration_count(), 5);
    }

    #[test]
    fn detection_phase_normal() {
        let monitor = ResizeSlaMonitor::new(test_config());
        calibrate_ramp(&monitor);

        // A latency inside the calibrated range is scored but does not alert.
        let decision = monitor
            .process_latency(12.0, (80, 24), false)
            .expect("detection phase should yield a decision");
        assert!(!decision.is_alert);
    }

    #[test]
    fn detection_phase_alert() {
        let mut config = test_config();
        config.hysteresis = 0.1;
        let monitor = ResizeSlaMonitor::new(config);

        for _ in 0..5 {
            monitor.process_latency(10.0, (80, 24), false);
        }

        let decision = monitor
            .process_latency(1000.0, (80, 24), false)
            .expect("detection phase should yield a decision");
        assert!(
            decision.evidence.conformal_alert || decision.evidence.eprocess_alert,
            "Extreme latency should trigger alert"
        );
    }

    #[test]
    fn logging_captures_events() {
        let monitor = ResizeSlaMonitor::new(test_config());
        calibrate_ramp(&monitor);

        monitor.process_latency(12.0, (80, 24), false);
        monitor.process_latency(15.0, (100, 40), true);

        let logs = monitor.logs();
        assert_eq!(logs.len(), 7);

        assert!(logs[..5].iter().all(|entry| entry.event_type == "calibrate"));
        assert_eq!(logs[5].event_type, "observe");
        assert_eq!(logs[6].applied_size, (100, 40));
        assert!(logs[6].forced);

        // Event indices are 1-based and contiguous.
        assert!(logs.iter().zip(1..).all(|(entry, idx)| entry.event_idx == idx));
        // Only alerting entries carry a reason.
        assert!(logs
            .iter()
            .filter(|entry| !entry.is_alert)
            .all(|entry| entry.alert_reason.is_none()));
    }

    #[test]
    fn logging_disabled_records_nothing() {
        let mut config = test_config();
        config.enable_logging = false;
        let monitor = ResizeSlaMonitor::new(config);

        calibrate_ramp(&monitor);
        monitor.process_latency(12.0, (80, 24), false);

        assert!(monitor.logs().is_empty());
        assert!(monitor.logs_to_jsonl().is_empty());
        assert_eq!(monitor.summary().total_events, 6);
    }

    #[test]
    fn jsonl_format() {
        let monitor = ResizeSlaMonitor::new(test_config());
        calibrate_ramp(&monitor);
        monitor.process_latency(12.0, (80, 24), false);

        let jsonl = monitor.logs_to_jsonl();
        assert!(jsonl.contains(r#""event":"sla""#));
        assert!(jsonl.contains(r#""type":"calibrate""#));
        assert!(jsonl.contains(r#""type":"observe""#));
        assert!(jsonl.contains(r#""latency_ms":"#));
        assert!(jsonl.contains(r#""threshold_ms":"#));
    }

    #[test]
    fn jsonl_emits_one_line_per_log_entry() {
        let monitor = ResizeSlaMonitor::new(test_config());
        calibrate_ramp(&monitor);
        monitor.process_latency(12.0, (80, 24), false);

        let jsonl = monitor.logs_to_jsonl();
        assert_eq!(jsonl.lines().count(), monitor.logs().len());
        for line in jsonl.lines() {
            assert!(line.starts_with('{') && line.ends_with('}'), "bad line: {line}");
        }
    }

    #[test]
    fn summary_reflects_state() {
        let monitor = ResizeSlaMonitor::new(test_config());

        for i in 0..10_u32 {
            monitor.process_latency(10.0 + f64::from(i) * 2.0, (80, 24), false);
        }

        let summary = monitor.summary();
        assert_eq!(summary.total_events, 10);
        assert!(summary.mean_latency_ms > 0.0);
        assert!(summary.current_threshold_ms > 0.0);
        assert_eq!(summary.target_latency_ms, 50.0);
    }

    #[test]
    fn clear_logs_preserves_calibration() {
        let monitor = ResizeSlaMonitor::new(test_config());
        calibrate_ramp(&monitor);
        assert!(!monitor.logs().is_empty());

        monitor.clear_logs();

        assert!(monitor.logs().is_empty());
        assert!(monitor.is_active());
        assert_eq!(monitor.calibration_count(), 5);
    }

    #[test]
    fn reset_clears_state() {
        let monitor = ResizeSlaMonitor::new(test_config());

        for i in 0..10_u32 {
            monitor.process_latency(10.0 + f64::from(i), (80, 24), false);
        }

        assert!(monitor.is_active());
        assert!(!monitor.logs().is_empty());

        monitor.reset();

        assert!(!monitor.is_active());
        assert!(monitor.logs().is_empty());
        assert_eq!(monitor.calibration_count(), 0);
        assert_eq!(monitor.summary().total_events, 0);
        assert!(monitor.last_alert().is_none());
    }

    #[test]
    fn on_decision_processes_entry() {
        let monitor = ResizeSlaMonitor::new(test_config());

        let mut first = sample_decision_log(Instant::now(), 15.0);
        first.applied_size = Some((100, 40));
        assert!(monitor.on_decision(&first).is_none());

        for i in 0..5 {
            let mut entry = sample_decision_log(Instant::now(), 15.0 + i as f64);
            entry.event_idx = 2 + i;
            entry.applied_size = Some((100, 40));
            monitor.on_decision(&entry);
        }

        assert!(monitor.is_active());
    }

    #[test]
    fn on_decision_ignores_entries_without_applied_size() {
        let monitor = ResizeSlaMonitor::new(test_config());

        let mut entry = sample_decision_log(Instant::now(), 12.0);
        entry.applied_size = None;

        assert!(monitor.on_decision(&entry).is_none());
        assert_eq!(monitor.summary().total_events, 0);
        assert_eq!(monitor.calibration_count(), 0);
        assert!(monitor.logs().is_empty());
    }

    #[test]
    fn make_sla_hooks_creates_valid_hooks() {
        let (_hooks, monitor) = make_sla_hooks(test_config());

        let monitor = monitor.lock().expect("sla monitor lock");
        assert!(!monitor.is_active());
        assert_eq!(monitor.calibration_count(), 0);
    }

    #[test]
    fn property_calibration_mean_accurate() {
        let monitor = ResizeSlaMonitor::new(test_config());

        let samples = [10.0, 20.0, 30.0, 40.0, 50.0];
        let expected_mean: f64 = samples.iter().sum::<f64>() / samples.len() as f64;

        for &sample in &samples {
            monitor.process_latency(sample, (80, 24), false);
        }

        let summary = monitor.summary();
        assert!(
            (summary.mean_latency_ms - expected_mean).abs() < 0.01,
            "Mean should be accurate: {} vs {}",
            summary.mean_latency_ms,
            expected_mean
        );
    }

    #[test]
    fn property_alert_count_nondecreasing() {
        let mut config = test_config();
        config.hysteresis = 0.1;
        config.alert_cooldown = 0;
        let monitor = ResizeSlaMonitor::new(config);

        for _ in 0..5 {
            monitor.process_latency(10.0, (80, 24), false);
        }

        let mut prev_alerts = 0u64;
        for i in 0..20 {
            let latency = if i % 3 == 0 { 1000.0 } else { 10.0 };
            monitor.process_latency(latency, (80, 24), false);

            let current_alerts = *monitor.total_alerts.borrow();
            assert!(
                current_alerts >= prev_alerts,
                "Alert count should be non-decreasing"
            );
            prev_alerts = current_alerts;
        }
    }

    #[test]
    fn deterministic_behavior() {
        let config = test_config();

        let run = || {
            let monitor = ResizeSlaMonitor::new(config.clone());
            for i in 0..10_u32 {
                monitor.process_latency(10.0 + f64::from(i), (80, 24), false);
            }
            (
                monitor.summary().mean_latency_ms,
                monitor.threshold_ms(),
                *monitor.total_alerts.borrow(),
            )
        };

        let (m1, t1, a1) = run();
        let (m2, t2, a2) = run();

        assert!((m1 - m2).abs() < 1e-10, "Mean must be deterministic");
        assert!((t1 - t2).abs() < 1e-10, "Threshold must be deterministic");
        assert_eq!(a1, a2, "Alert count must be deterministic");
    }

    #[test]
    fn voi_sampling_skips_when_policy_says_no() {
        let mut config = test_config();
        config.voi_sampling = Some(VoiConfig {
            sample_cost: 10.0,
            max_interval_events: 0,
            max_interval_ms: 0,
            ..VoiConfig::default()
        });
        let monitor = ResizeSlaMonitor::new(config);

        let entry = sample_decision_log(Instant::now(), 12.0);
        let result = monitor.on_decision(&entry);
        assert!(result.is_none(), "Sampling should skip under high cost");

        let summary = monitor.summary();
        assert_eq!(summary.total_events, 0);
        let sampling = monitor.sampling_summary().expect("sampling summary");
        assert_eq!(sampling.total_events, 1);
    }

    #[test]
    fn voi_sampling_forced_sample_records_event() {
        let mut config = test_config();
        config.min_calibration = 0;
        config.voi_sampling = Some(VoiConfig {
            sample_cost: 10.0,
            max_interval_events: 1,
            ..VoiConfig::default()
        });
        let monitor = ResizeSlaMonitor::new(config);

        let entry = sample_decision_log(Instant::now(), 12.0);
        let result = monitor.on_decision(&entry);
        assert!(result.is_some());

        let summary = monitor.summary();
        assert_eq!(summary.total_events, 1);
        let sampling = monitor.sampling_summary().expect("sampling summary");
        assert_eq!(sampling.total_samples, 1);
    }
}