use std::sync::Arc;
use std::time::Instant;
use crate::adaptive_load_shedding::{LoadSheddingManager, LoadSheddingStats};
use crate::error::StreamError;
use crate::event::StreamEvent;
use super::admission::{StreamAdmissionController, StreamAdmissionDecision};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureAction {
Pass,
Drop,
Throttle,
}
#[derive(Debug)]
pub enum SlaBackpressureDecision {
Admit { tokens_left: f64, lag_ms: i64 },
Shed,
Throttle,
Reject(StreamError),
}
impl SlaBackpressureDecision {
pub fn is_admit(&self) -> bool {
matches!(self, Self::Admit { .. })
}
pub fn is_reject(&self) -> bool {
matches!(self, Self::Reject(_))
}
pub fn is_shed(&self) -> bool {
matches!(self, Self::Shed)
}
pub fn is_throttle(&self) -> bool {
matches!(self, Self::Throttle)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SlaBackpressurePolicy {
#[default]
Strict,
PreferThrottle,
BypassShedder,
}
pub struct SlaBackpressureCoordinator {
admission: Arc<StreamAdmissionController>,
shedder: Arc<LoadSheddingManager>,
policy: SlaBackpressurePolicy,
}
impl SlaBackpressureCoordinator {
pub fn new(
admission: Arc<StreamAdmissionController>,
shedder: Arc<LoadSheddingManager>,
) -> Self {
Self {
admission,
shedder,
policy: SlaBackpressurePolicy::Strict,
}
}
pub fn with_policy(mut self, policy: SlaBackpressurePolicy) -> Self {
self.policy = policy;
self
}
pub async fn evaluate(
&self,
stream_id: &str,
event: &StreamEvent,
event_ts: Instant,
now: Instant,
) -> SlaBackpressureDecision {
let admit = match self.admission.try_admit(stream_id, event_ts, now) {
Ok(decision) => decision,
Err(e) => return SlaBackpressureDecision::Reject(e),
};
if matches!(self.policy, SlaBackpressurePolicy::BypassShedder) {
return decision_from_admit(admit);
}
let drop = self.shedder.should_drop_event(event).await;
if drop {
self.shedder.record_dropped_event(event).await;
match self.policy {
SlaBackpressurePolicy::Strict => return SlaBackpressureDecision::Shed,
SlaBackpressurePolicy::PreferThrottle => return SlaBackpressureDecision::Throttle,
SlaBackpressurePolicy::BypassShedder => {} }
}
decision_from_admit(admit)
}
pub async fn shedder_stats(&self) -> LoadSheddingStats {
self.shedder.get_stats().await
}
pub fn admission(&self) -> &Arc<StreamAdmissionController> {
&self.admission
}
}
fn decision_from_admit(admit: StreamAdmissionDecision) -> SlaBackpressureDecision {
match admit {
StreamAdmissionDecision::Admit {
tokens_left,
lag_ms,
} => SlaBackpressureDecision::Admit {
tokens_left,
lag_ms,
},
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adaptive_load_shedding::LoadSheddingConfig;
use crate::event::EventMetadata;
use crate::sla::StreamSlaConfig;
use chrono::Utc;
use oxirs_core::sla::SlaClass;
use std::time::Duration;
fn heartbeat() -> StreamEvent {
StreamEvent::Heartbeat {
timestamp: Utc::now(),
source: "t".to_string(),
metadata: EventMetadata::default(),
}
}
fn coordinator() -> SlaBackpressureCoordinator {
let admission = Arc::new(StreamAdmissionController::new());
admission.register_stream("orders", StreamSlaConfig::for_class(SlaClass::Platinum));
let shedder_cfg = LoadSheddingConfig {
enable_load_shedding: false, ..Default::default()
};
let shedder = Arc::new(LoadSheddingManager::new(shedder_cfg).expect("shedder ok"));
SlaBackpressureCoordinator::new(admission, shedder)
}
#[tokio::test]
async fn test_admit_when_within_sla_and_no_overload() {
let c = coordinator();
let now = Instant::now();
let dec = c.evaluate("orders", &heartbeat(), now, now).await;
match dec {
SlaBackpressureDecision::Admit { tokens_left, .. } => assert!(tokens_left > 0.0),
other => panic!("expected Admit, got {other:?}"),
}
}
#[tokio::test]
async fn test_reject_when_unregistered_stream() {
let c = coordinator();
let now = Instant::now();
let dec = c.evaluate("ghost", &heartbeat(), now, now).await;
assert!(dec.is_reject());
}
#[tokio::test]
async fn test_reject_takes_precedence_over_shedder() {
let admission = Arc::new(StreamAdmissionController::new());
admission.register_stream("br", StreamSlaConfig::for_class(SlaClass::Bronze));
let shedder_cfg = LoadSheddingConfig {
enable_load_shedding: true,
..Default::default()
};
let shedder = Arc::new(LoadSheddingManager::new(shedder_cfg).unwrap_or_else(|_| {
LoadSheddingManager::new(LoadSheddingConfig {
enable_load_shedding: false,
..Default::default()
})
.expect("fallback shedder")
}));
let coord = SlaBackpressureCoordinator::new(admission, shedder);
let mut admitted = 0;
let mut rejected = 0;
for _ in 0..40 {
let n = Instant::now();
let dec = coord.evaluate("br", &heartbeat(), n, n).await;
if dec.is_admit() {
admitted += 1;
} else if dec.is_reject() {
rejected += 1;
}
}
assert!(admitted >= 1);
assert!(rejected >= 1);
}
#[tokio::test]
async fn test_lag_violation_rejects_via_admission() {
let admission = Arc::new(StreamAdmissionController::new());
admission.register_stream(
"lag",
StreamSlaConfig::for_class(SlaClass::Platinum).with_max_lag(Duration::from_millis(20)),
);
let shedder = Arc::new(
LoadSheddingManager::new(LoadSheddingConfig {
enable_load_shedding: false,
..Default::default()
})
.expect("shedder"),
);
let coord = SlaBackpressureCoordinator::new(admission, shedder);
let event_ts = Instant::now();
tokio::time::sleep(Duration::from_millis(50)).await;
let now = Instant::now();
let dec = coord.evaluate("lag", &heartbeat(), event_ts, now).await;
assert!(dec.is_reject());
}
#[tokio::test]
async fn test_bypass_shedder_policy_skips_shedder() {
let admission = Arc::new(StreamAdmissionController::new());
admission.register_stream("p", StreamSlaConfig::for_class(SlaClass::Platinum));
let shedder = Arc::new(
LoadSheddingManager::new(LoadSheddingConfig {
enable_load_shedding: true,
..Default::default()
})
.expect("shedder"),
);
let coord = SlaBackpressureCoordinator::new(admission, shedder)
.with_policy(SlaBackpressurePolicy::BypassShedder);
let now = Instant::now();
let dec = coord.evaluate("p", &heartbeat(), now, now).await;
assert!(dec.is_admit());
}
}