use crate::core::providers::Provider;
use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
/// Identifier of a [`Deployment`]; an alias for an owned `String`.
pub type DeploymentId = String;
/// Coarse health classification of a deployment.
///
/// The explicit discriminants are the byte encoding used when the status is
/// stored in an `AtomicU8` (see the `u8` conversions below).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum HealthStatus {
    /// No information; also what any unrecognised byte decodes to.
    Unknown = 0,
    /// Fully available.
    Healthy = 1,
    /// Has recorded failures but is still reported as healthy by `Deployment::is_healthy`.
    Degraded = 2,
    /// Reported as not healthy by `Deployment::is_healthy`.
    Unhealthy = 3,
    /// Set by `Deployment::enter_cooldown` until its deadline passes.
    Cooldown = 4,
}

impl From<u8> for HealthStatus {
    /// Decodes a byte produced by `u8::from(HealthStatus)`.
    ///
    /// The conversion is total: `0` and every value above `4` decode to
    /// [`HealthStatus::Unknown`], so it never panics.
    fn from(raw: u8) -> Self {
        match raw {
            1 => Self::Healthy,
            2 => Self::Degraded,
            3 => Self::Unhealthy,
            4 => Self::Cooldown,
            0 | 5..=u8::MAX => Self::Unknown,
        }
    }
}

impl From<HealthStatus> for u8 {
    /// Encodes a status as its `#[repr(u8)]` discriminant.
    fn from(status: HealthStatus) -> Self {
        status as Self
    }
}
/// Static limits and routing parameters for one deployment.
///
/// NOTE(review): none of these values are enforced in this file; presumably the
/// router compares the limits against the live counters — confirm.
#[derive(Debug, Clone)]
pub struct DeploymentConfig {
    /// Optional tokens-per-minute cap.
    pub tpm_limit: Option<u64>,
    /// Optional requests-per-minute cap.
    pub rpm_limit: Option<u64>,
    /// Optional cap on concurrent requests.
    pub max_parallel_requests: Option<u32>,
    /// Routing weight; defaults to `1`. Presumably used for weighted selection — confirm.
    pub weight: u32,
    /// Request timeout in seconds; defaults to `60`.
    pub timeout_secs: u64,
    /// Routing priority; defaults to `0`. Ordering direction not visible here — confirm.
    pub priority: u32,
}

impl Default for DeploymentConfig {
    /// No rate or concurrency limits, weight `1`, a 60-second timeout and priority `0`.
    fn default() -> Self {
        let (weight, timeout_secs, priority) = (1, 60, 0);
        Self {
            weight,
            timeout_secs,
            priority,
            tpm_limit: None,
            rpm_limit: None,
            max_parallel_requests: None,
        }
    }
}
/// Live counters and health for one deployment, updatable through `&self`.
///
/// Every field is an atomic. All accesses in this file use `Ordering::Relaxed`,
/// so each field is individually consistent, but fields are not synchronised
/// with one another.
#[derive(Debug)]
pub struct DeploymentState {
    /// Encoded [`HealthStatus`] byte.
    pub health: AtomicU8,
    /// Tokens counted in the current minute window; zeroed by `reset_minute`.
    pub tpm_current: AtomicU64,
    /// Requests counted in the current minute window; zeroed by `reset_minute`.
    pub rpm_current: AtomicU64,
    /// NOTE(review): not updated in this file; presumably the in-flight request count — confirm.
    pub active_requests: AtomicU32,
    /// Lifetime count of recorded requests (successes and failures).
    pub total_requests: AtomicU64,
    /// Lifetime count of recorded successes.
    pub success_requests: AtomicU64,
    /// Lifetime count of recorded failures.
    pub fail_requests: AtomicU64,
    /// Failures in the current minute window; zeroed by `reset_minute`.
    pub fails_this_minute: AtomicU32,
    /// Unix-seconds deadline of the current cooldown; `0` when none has been set.
    pub cooldown_until: AtomicU64,
    /// Unix seconds of the most recently recorded request; `0` before any.
    pub last_request_at: AtomicU64,
    /// Smoothed latency in microseconds; `0` means no sample has been folded in yet.
    pub avg_latency_us: AtomicU64,
    /// Successes recorded since the last failure.
    pub consecutive_successes: AtomicU32,
    /// Unix seconds at which the current minute window started.
    pub minute_reset_at: AtomicU64,
}

impl DeploymentState {
    /// Creates zeroed counters, health `Healthy`, and a minute window starting now.
    pub fn new() -> Self {
        let window_start = current_timestamp();
        Self {
            health: AtomicU8::new(HealthStatus::Healthy.into()),
            tpm_current: AtomicU64::default(),
            rpm_current: AtomicU64::default(),
            active_requests: AtomicU32::default(),
            total_requests: AtomicU64::default(),
            success_requests: AtomicU64::default(),
            fail_requests: AtomicU64::default(),
            fails_this_minute: AtomicU32::default(),
            cooldown_until: AtomicU64::default(),
            last_request_at: AtomicU64::default(),
            avg_latency_us: AtomicU64::default(),
            consecutive_successes: AtomicU32::default(),
            minute_reset_at: AtomicU64::new(window_start),
        }
    }

    /// Starts a new minute window.
    ///
    /// Zeroes the per-minute token, request and failure counters, then stamps
    /// `minute_reset_at` with the current time.
    pub fn reset_minute(&self) {
        for per_minute in [&self.tpm_current, &self.rpm_current] {
            per_minute.store(0, Ordering::Relaxed);
        }
        self.fails_this_minute.store(0, Ordering::Relaxed);
        let now = current_timestamp();
        self.minute_reset_at.store(now, Ordering::Relaxed);
    }

    /// Decodes the current health byte into a [`HealthStatus`].
    pub fn health_status(&self) -> HealthStatus {
        HealthStatus::from(self.health.load(Ordering::Relaxed))
    }
}

impl Default for DeploymentState {
    /// Same as [`DeploymentState::new`].
    fn default() -> Self {
        DeploymentState::new()
    }
}

impl Clone for DeploymentState {
    /// Snapshots every counter into fresh atomics.
    ///
    /// Each field is loaded independently, so under concurrent updates the copy
    /// need not reflect a single instant.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: a newly added field that is not copied here
        // becomes a compile error instead of a silently reset counter.
        let Self {
            health,
            tpm_current,
            rpm_current,
            active_requests,
            total_requests,
            success_requests,
            fail_requests,
            fails_this_minute,
            cooldown_until,
            last_request_at,
            avg_latency_us,
            consecutive_successes,
            minute_reset_at,
        } = self;
        let snap8 = |cell: &AtomicU8| AtomicU8::new(cell.load(Ordering::Relaxed));
        let snap32 = |cell: &AtomicU32| AtomicU32::new(cell.load(Ordering::Relaxed));
        let snap64 = |cell: &AtomicU64| AtomicU64::new(cell.load(Ordering::Relaxed));
        Self {
            health: snap8(health),
            tpm_current: snap64(tpm_current),
            rpm_current: snap64(rpm_current),
            active_requests: snap32(active_requests),
            total_requests: snap64(total_requests),
            success_requests: snap64(success_requests),
            fail_requests: snap64(fail_requests),
            fails_this_minute: snap32(fails_this_minute),
            cooldown_until: snap64(cooldown_until),
            last_request_at: snap64(last_request_at),
            avg_latency_us: snap64(avg_latency_us),
            consecutive_successes: snap32(consecutive_successes),
            minute_reset_at: snap64(minute_reset_at),
        }
    }
}
/// A routable deployment: a model served through a specific [`Provider`],
/// together with its static [`DeploymentConfig`] and live [`DeploymentState`].
#[derive(Debug, Clone)]
pub struct Deployment {
    /// Identifier of this deployment.
    pub id: DeploymentId,
    /// Provider that serves requests for this deployment.
    pub provider: Provider,
    /// NOTE(review): the distinction between `model` and `model_name` is not
    /// visible in this file (presumably provider-side id vs. public alias) — confirm.
    pub model: String,
    /// See the note on `model`.
    pub model_name: String,
    /// Limits and routing parameters; `DeploymentConfig::default()` unless replaced.
    pub config: DeploymentConfig,
    /// Live counters and health, mutated through `&self` via atomics.
    pub state: DeploymentState,
    /// Free-form tags; empty unless set with [`Deployment::with_tags`].
    pub tags: Vec<String>,
}

impl Deployment {
    /// Creates a deployment with default config, fresh state (health `Healthy`) and no tags.
    pub fn new(id: DeploymentId, provider: Provider, model: String, model_name: String) -> Self {
        Self {
            id,
            provider,
            model,
            model_name,
            config: DeploymentConfig::default(),
            state: DeploymentState::new(),
            tags: Vec::new(),
        }
    }

    /// Builder-style setter that replaces the configuration.
    pub fn with_config(mut self, config: DeploymentConfig) -> Self {
        self.config = config;
        self
    }

    /// Builder-style setter that replaces the tag list.
    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
        self.tags = tags;
        self
    }

    /// Returns `true` when health is `Healthy` or `Degraded`.
    ///
    /// Only the health field is consulted; the cooldown deadline is checked
    /// separately by [`Deployment::is_in_cooldown`].
    pub fn is_healthy(&self) -> bool {
        matches!(
            self.state.health_status(),
            HealthStatus::Healthy | HealthStatus::Degraded
        )
    }

    /// Returns `true` while the deadline set by [`Deployment::enter_cooldown`]
    /// is still in the future.
    ///
    /// Once the deadline has passed, health is lazily moved from `Cooldown` to
    /// `Degraded`. The move happens only if health is still `Cooldown`, so a
    /// status written concurrently by another thread is not overwritten.
    pub fn is_in_cooldown(&self) -> bool {
        let cooldown_until = self.state.cooldown_until.load(Ordering::Relaxed);
        if cooldown_until == 0 {
            // No cooldown has ever been set.
            return false;
        }
        if cooldown_until > current_timestamp() {
            return true;
        }
        // Expired: best-effort transition; losing the race to another writer is fine.
        let _ = self.state.health.compare_exchange(
            HealthStatus::Cooldown as u8,
            HealthStatus::Degraded as u8,
            Ordering::Relaxed,
            Ordering::Relaxed,
        );
        false
    }

    /// Records a successful request.
    ///
    /// This:
    /// - bumps the lifetime and per-minute counters;
    /// - stamps `last_request_at`;
    /// - folds `latency_us` into the smoothed average (`(sample + 4 * avg) / 5`,
    ///   with the first sample seeding an average of `0`);
    /// - increments `consecutive_successes`.
    ///
    /// Health is not changed.
    pub fn record_success(&self, tokens: u64, latency_us: u64) {
        let state = &self.state;
        state.total_requests.fetch_add(1, Ordering::Relaxed);
        state.success_requests.fetch_add(1, Ordering::Relaxed);
        state.tpm_current.fetch_add(tokens, Ordering::Relaxed);
        state.rpm_current.fetch_add(1, Ordering::Relaxed);
        state
            .last_request_at
            .store(current_timestamp(), Ordering::Relaxed);
        // Single atomic read-modify-write: a separate load + store would let
        // concurrent completions overwrite each other's latency samples.
        let _ = state
            .avg_latency_us
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
                Some(Self::blend_latency(avg, latency_us))
            });
        state.consecutive_successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Exponential moving average step: `(sample + 4 * current_avg) / 5`.
    ///
    /// A `current_avg` of `0` means "no samples yet", so the sample is returned as-is.
    /// The arithmetic is done in `u128`, so large inputs cannot overflow.
    fn blend_latency(current_avg: u64, sample: u64) -> u64 {
        if current_avg == 0 {
            return sample;
        }
        let blended = (u128::from(sample) + 4 * u128::from(current_avg)) / 5;
        // Lossless: a weighted mean never exceeds the larger of its two u64 inputs.
        blended as u64
    }

    /// Records a failed request.
    ///
    /// Bumps the lifetime and per-minute failure counters, stamps
    /// `last_request_at` and resets `consecutive_successes`. Health is
    /// downgraded to `Degraded` only when it is currently `Unknown` or `Healthy`.
    pub fn record_failure(&self) {
        let state = &self.state;
        state.total_requests.fetch_add(1, Ordering::Relaxed);
        state.fail_requests.fetch_add(1, Ordering::Relaxed);
        state.fails_this_minute.fetch_add(1, Ordering::Relaxed);
        state
            .last_request_at
            .store(current_timestamp(), Ordering::Relaxed);
        state.consecutive_successes.store(0, Ordering::Relaxed);
        // A failure must never improve health. An unconditional store would turn
        // `Unhealthy` into `Degraded` (which `is_healthy` accepts) and would
        // clobber an active `Cooldown` marker.
        let _ = state
            .health
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |raw| {
                match HealthStatus::from(raw) {
                    HealthStatus::Unknown | HealthStatus::Healthy => {
                        Some(HealthStatus::Degraded as u8)
                    }
                    HealthStatus::Degraded
                    | HealthStatus::Unhealthy
                    | HealthStatus::Cooldown => None,
                }
            });
    }

    /// Puts the deployment into cooldown for `duration_secs` seconds from now
    /// and sets health to `Cooldown`.
    ///
    /// The deadline saturates at `u64::MAX`, so an enormous duration means
    /// "effectively forever" rather than overflowing into the past.
    pub fn enter_cooldown(&self, duration_secs: u64) {
        let cooldown_until = current_timestamp().saturating_add(duration_secs);
        self.state
            .cooldown_until
            .store(cooldown_until, Ordering::Relaxed);
        self.state
            .health
            .store(HealthStatus::Cooldown as u8, Ordering::Relaxed);
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` instead of panicking if the system clock reports a time before
/// the epoch. `SystemTime` is wall-clock time, so successive calls are not
/// guaranteed to be monotonic.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// 2020-01-01T00:00:00Z in Unix seconds; any correctly set clock reads later.
    const JAN_1_2020_UNIX_SECS: u64 = 1_577_836_800;

    #[test]
    fn test_health_status_from_u8_healthy() {
        assert_eq!(HealthStatus::from(1), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_status_from_u8_degraded() {
        assert_eq!(HealthStatus::from(2), HealthStatus::Degraded);
    }

    #[test]
    fn test_health_status_from_u8_unhealthy() {
        assert_eq!(HealthStatus::from(3), HealthStatus::Unhealthy);
    }

    #[test]
    fn test_health_status_from_u8_cooldown() {
        assert_eq!(HealthStatus::from(4), HealthStatus::Cooldown);
    }

    #[test]
    fn test_health_status_from_u8_unknown() {
        assert_eq!(HealthStatus::from(0), HealthStatus::Unknown);
        assert_eq!(HealthStatus::from(255), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_status_to_u8() {
        assert_eq!(u8::from(HealthStatus::Unknown), 0);
        assert_eq!(u8::from(HealthStatus::Healthy), 1);
        assert_eq!(u8::from(HealthStatus::Degraded), 2);
        assert_eq!(u8::from(HealthStatus::Unhealthy), 3);
        assert_eq!(u8::from(HealthStatus::Cooldown), 4);
    }

    #[test]
    fn test_health_status_round_trip() {
        for status in [
            HealthStatus::Unknown,
            HealthStatus::Healthy,
            HealthStatus::Degraded,
            HealthStatus::Unhealthy,
            HealthStatus::Cooldown,
        ] {
            assert_eq!(HealthStatus::from(u8::from(status)), status);
        }
    }

    // `HealthStatus` is `Copy`; this checks that a copied value compares equal.
    #[test]
    fn test_health_status_clone() {
        let status = HealthStatus::Healthy;
        let cloned = status;
        assert_eq!(status, cloned);
    }

    #[test]
    fn test_deployment_config_default() {
        let config = DeploymentConfig::default();
        assert!(config.tpm_limit.is_none());
        assert!(config.rpm_limit.is_none());
        assert!(config.max_parallel_requests.is_none());
        assert_eq!(config.weight, 1);
        assert_eq!(config.timeout_secs, 60);
        assert_eq!(config.priority, 0);
    }

    #[test]
    fn test_deployment_config_custom() {
        let config = DeploymentConfig {
            tpm_limit: Some(100_000),
            rpm_limit: Some(500),
            max_parallel_requests: Some(10),
            weight: 2,
            timeout_secs: 120,
            priority: 1,
        };
        assert_eq!(config.tpm_limit, Some(100_000));
        assert_eq!(config.rpm_limit, Some(500));
        assert_eq!(config.max_parallel_requests, Some(10));
        assert_eq!(config.weight, 2);
        assert_eq!(config.timeout_secs, 120);
        assert_eq!(config.priority, 1);
    }

    #[test]
    fn test_deployment_config_clone() {
        let config = DeploymentConfig {
            tpm_limit: Some(50_000),
            rpm_limit: Some(100),
            ..DeploymentConfig::default()
        };
        let cloned = config.clone();
        assert_eq!(config.tpm_limit, cloned.tpm_limit);
        assert_eq!(config.rpm_limit, cloned.rpm_limit);
    }

    #[test]
    fn test_deployment_state_new() {
        let state = DeploymentState::new();
        assert_eq!(state.health_status(), HealthStatus::Healthy);
        assert_eq!(state.tpm_current.load(Ordering::Relaxed), 0);
        assert_eq!(state.rpm_current.load(Ordering::Relaxed), 0);
        assert_eq!(state.active_requests.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_deployment_state_default() {
        let state = DeploymentState::default();
        assert_eq!(state.health_status(), HealthStatus::Healthy);
    }

    #[test]
    fn test_deployment_state_reset_minute() {
        let state = DeploymentState::new();
        state.tpm_current.store(1000, Ordering::Relaxed);
        state.rpm_current.store(50, Ordering::Relaxed);
        state.fails_this_minute.store(5, Ordering::Relaxed);
        // Force a stale window start so we can observe reset_minute re-stamping it.
        state.minute_reset_at.store(0, Ordering::Relaxed);
        state.reset_minute();
        assert_eq!(state.tpm_current.load(Ordering::Relaxed), 0);
        assert_eq!(state.rpm_current.load(Ordering::Relaxed), 0);
        assert_eq!(state.fails_this_minute.load(Ordering::Relaxed), 0);
        assert!(state.minute_reset_at.load(Ordering::Relaxed) > JAN_1_2020_UNIX_SECS);
    }

    #[test]
    fn test_deployment_state_health_status() {
        let state = DeploymentState::new();
        state
            .health
            .store(HealthStatus::Degraded as u8, Ordering::Relaxed);
        assert_eq!(state.health_status(), HealthStatus::Degraded);
    }

    #[test]
    fn test_deployment_state_clone() {
        let state = DeploymentState::new();
        state.total_requests.store(100, Ordering::Relaxed);
        state.success_requests.store(95, Ordering::Relaxed);
        let cloned = state.clone();
        assert_eq!(cloned.total_requests.load(Ordering::Relaxed), 100);
        assert_eq!(cloned.success_requests.load(Ordering::Relaxed), 95);
        // The clone is an independent snapshot, not a shared view.
        state.total_requests.store(1, Ordering::Relaxed);
        assert_eq!(cloned.total_requests.load(Ordering::Relaxed), 100);
    }

    #[test]
    fn test_current_timestamp() {
        assert!(current_timestamp() > JAN_1_2020_UNIX_SECS);
    }

    // `SystemTime` is wall-clock time and not guaranteed monotonic. Two
    // back-to-back second-resolution reads are expected to agree or advance, but
    // a clock step (e.g. NTP) could in principle make this assertion fail.
    #[test]
    fn test_current_timestamp_monotonic() {
        let ts1 = current_timestamp();
        let ts2 = current_timestamp();
        assert!(ts2 >= ts1);
    }
}