impl HeijunkaController {
    /// Creates a controller that reads its limits from `config`.
    #[must_use]
    pub fn new(config: HeijunkaConfig) -> Self {
        Self { config }
    }

    /// Estimates how many requests should be in flight to sustain
    /// `arrival_rate` when each request takes `latency_ms`.
    ///
    /// Computes `ceil(arrival_rate * latency_ms / 1000)` — the Little's-law
    /// product `L = λ·W`, assuming `arrival_rate` is per second — and clamps
    /// the result to `[1, max_concurrency]`. Negative or NaN inputs saturate
    /// to 0 in the `as usize` cast and therefore yield the floor of 1.
    #[must_use]
    // The f64 -> usize cast is deliberate: `as` saturates (NaN -> 0,
    // +inf -> usize::MAX) and the value is clamped immediately afterwards.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_sign_loss)]
    pub fn optimal_concurrency(&self, arrival_rate: f64, latency_ms: f64) -> usize {
        let optimal = (arrival_rate * latency_ms / 1000.0).ceil() as usize;
        self.clamp_concurrency(optimal)
    }

    /// Decides whether to shed load and recommends a concurrency level.
    ///
    /// `shed_load` is `true` only when `current_latency_ms` is above the
    /// configured target *and* `current_concurrency` has reached
    /// `max_concurrency`.
    ///
    /// `recommended_concurrency` is computed regardless of the shed decision.
    /// It is the current concurrency scaled by
    /// `target_latency_ms / current_latency_ms`, rounded up and clamped to
    /// `[1, max_concurrency]`. For positive latencies, that ratio is below 1
    /// when latency is over target and above 1 when it is under.
    #[must_use]
    // Casts: usize -> f64 can only lose precision above 2^53, and the
    // f64 -> usize cast saturates (NaN -> 0, inf -> usize::MAX) before the
    // clamp, so a zero `current_latency_ms` cannot produce an out-of-range value.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_sign_loss)]
    #[allow(clippy::cast_precision_loss)]
    pub fn should_shed_load(
        &self,
        current_latency_ms: f64,
        current_concurrency: usize,
    ) -> LoadSheddingDecision {
        let should_shed = current_latency_ms > self.config.target_latency_ms
            && current_concurrency >= self.config.max_concurrency;
        let ratio = self.config.target_latency_ms / current_latency_ms;
        let recommended = (current_concurrency as f64 * ratio).ceil() as usize;
        LoadSheddingDecision {
            shed_load: should_shed,
            recommended_concurrency: self.clamp_concurrency(recommended),
        }
    }

    /// Returns the configured target latency in milliseconds.
    #[must_use]
    pub fn target_latency_ms(&self) -> f64 {
        self.config.target_latency_ms
    }

    /// Clamps `n` into `[1, max_concurrency]`.
    ///
    /// `usize::clamp` panics when `min > max`. The ceiling is therefore raised
    /// to at least 1, so a configured `max_concurrency` of 0 yields 1
    /// instead of crashing the controller.
    fn clamp_concurrency(&self, n: usize) -> usize {
        n.clamp(1, self.config.max_concurrency.max(1))
    }
}
/// A monitored condition that raises an andon alert.
///
/// Each variant carries the observed value(s) that caused it, together with
/// the configured limit where one applies.
#[derive(Clone, Debug, PartialEq)]
pub enum AndonTrigger {
    /// Checksum mismatch for the model identified by `model_id`.
    ModelChecksumMismatch { model_id: String },
    /// Observed p99 latency (`p99_ms`) alongside its limit (`threshold_ms`).
    LatencyExceeded { p99_ms: f64, threshold_ms: f64 },
    /// Observed error `rate` alongside its limit (`threshold`).
    ErrorRateThreshold { rate: f64, threshold: f64 },
    /// Imbalance across experts, expressed as `imbalance_ratio`.
    ExpertImbalance { imbalance_ratio: f64 },
}
/// The action an [`AndonTrigger`] calls for.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AndonResponse {
    /// Roll back; returned for model checksum mismatches.
    Rollback,
    /// Notify only; returned for latency, expert-imbalance and moderate
    /// error-rate triggers.
    Notify,
    /// Quarantine; returned when the error rate exceeds twice its threshold.
    Quarantine,
}
impl AndonTrigger {
    /// Maps this trigger to the response it calls for.
    ///
    /// - `ModelChecksumMismatch` → `Rollback`
    /// - `ErrorRateThreshold` → `Quarantine` when `rate` is strictly greater
    ///   than twice `threshold`, otherwise `Notify` (including when either
    ///   value is NaN, since the comparison is then false)
    /// - `LatencyExceeded`, `ExpertImbalance` → `Notify`
    #[must_use]
    pub fn response(&self) -> AndonResponse {
        match self {
            Self::ModelChecksumMismatch { .. } => AndonResponse::Rollback,
            Self::ErrorRateThreshold { rate, threshold } if *rate > 2.0 * *threshold => {
                AndonResponse::Quarantine
            },
            Self::ErrorRateThreshold { .. }
            | Self::LatencyExceeded { .. }
            | Self::ExpertImbalance { .. } => AndonResponse::Notify,
        }
    }

    /// Returns `true` when [`Self::response`] is `Rollback` or `Quarantine`,
    /// and `false` for `Notify`.
    #[must_use]
    pub fn is_critical(&self) -> bool {
        let action = self.response();
        action == AndonResponse::Rollback || action == AndonResponse::Quarantine
    }
}
#[cfg(test)]
mod tests;