pub mod detector;
pub mod histogram;
pub mod prometheus;
pub mod vllm;
pub mod warmup;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
/// Inference engine implementations the collector knows how to talk to.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
pub enum EngineType {
    /// The vLLM serving engine (handled by the `vllm` adapter module).
    Vllm,
}
/// How a detected engine process is running on the host.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
pub enum DeploymentMode {
    /// Running inside a Docker container.
    Docker,
    /// Running directly on the host.
    Native,
}
impl std::fmt::Display for EngineType {
    /// Renders the engine's human-readable name (used in log messages).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            EngineType::Vllm => "vLLM",
        };
        f.write_str(label)
    }
}
/// Health classification of an engine as seen by the collector.
///
/// Serialized adjacently tagged, e.g. `{"type": "Error", "message": "..."}`
/// for the `Error` variant.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(tag = "type", content = "message")]
pub enum EngineStatus {
    /// Engine is responding and serving.
    Running,
    /// Engine is up but not fully ready yet.
    Loading,
    /// Engine has stopped responding to probes.
    Stopped,
    /// Probe failed with a describable error; payload is the message.
    Error(String),
}
/// Descriptive information about the model an engine is serving.
#[derive(Clone, Debug, serde::Serialize)]
pub struct ModelInfo {
    /// Served model name/identifier.
    pub name: String,
    /// Parameter-count description, if the engine reports one.
    pub parameter_size: Option<String>,
    /// Quantization scheme, if the engine reports one.
    pub quantization: Option<String>,
}
/// Latency distribution summary; each percentile is in milliseconds and
/// `None` when the engine does not report it.
#[derive(Clone, Debug, serde::Serialize, Default)]
pub struct LatencyPercentiles {
    /// Median (50th percentile), ms.
    pub p50_ms: Option<f64>,
    /// 95th percentile, ms.
    pub p95_ms: Option<f64>,
    /// 99th percentile, ms.
    pub p99_ms: Option<f64>,
}
/// Service-level objective for time-to-first-token, in milliseconds.
pub const TTFT_SLO_MS: f64 = 500.0;
/// Service-level objective for inter-token latency, in milliseconds.
pub const ITL_SLO_MS: f64 = 50.0;
/// Service-level objective for end-to-end request latency, in milliseconds.
pub const E2E_SLO_MS: f64 = 5000.0;
// NOTE(review): presumably these thresholds back the `*_goodput_pct` fields
// of `EngineMetrics` — confirm against the adapter implementations.
/// Point-in-time performance metrics scraped from an engine.
///
/// Every numeric field is optional: `None` means "not reported by this
/// engine/version", not zero.
#[derive(Clone, Debug, serde::Serialize, Default)]
pub struct EngineMetrics {
    /// Current generation throughput, tokens/s.
    pub tokens_per_sec: Option<f64>,
    /// Average generation throughput, tokens/s.
    pub avg_tokens_per_sec: Option<f64>,
    /// Generation throughput per individual request, tokens/s.
    pub per_request_tps: Option<f64>,
    /// Time to first token, ms.
    pub ttft_ms: Option<f64>,
    /// Requests currently being processed.
    pub active_requests: Option<u64>,
    /// Requests waiting in the queue.
    pub queued_requests: Option<u64>,
    /// KV-cache utilization, percent.
    pub kv_cache_percent: Option<f64>,
    /// True when `kv_cache_percent` is estimated rather than engine-reported.
    pub kv_cache_is_estimated: bool,
    /// Cumulative request count.
    pub total_requests: Option<u64>,
    /// End-to-end request latency, ms.
    pub e2e_latency_ms: Option<f64>,
    /// Current prompt (prefill) throughput, tokens/s.
    pub prompt_tokens_per_sec: Option<f64>,
    /// Average prompt throughput, tokens/s.
    pub avg_prompt_tokens_per_sec: Option<f64>,
    /// Prompt throughput per individual request, tokens/s.
    pub per_request_prompt_tps: Option<f64>,
    /// Requests swapped out of GPU memory.
    pub swapped_requests: Option<u64>,
    /// Prefix-cache hit rate.
    pub prefix_cache_hit_rate: Option<f64>,
    /// Time spent queued before execution, ms.
    pub queue_time_ms: Option<f64>,
    /// Latency between consecutive generated tokens, ms.
    pub inter_token_latency_ms: Option<f64>,
    /// Cumulative scheduler preemptions.
    pub preemptions_total: Option<u64>,
    /// Average batch size.
    pub avg_batch_size: Option<f64>,
    /// Time-to-first-token percentile summary.
    pub ttft_percentiles: Option<LatencyPercentiles>,
    /// Inter-token-latency percentile summary.
    pub itl_percentiles: Option<LatencyPercentiles>,
    /// End-to-end latency percentile summary.
    pub e2e_percentiles: Option<LatencyPercentiles>,
    /// Percent of requests meeting the TTFT SLO (see `TTFT_SLO_MS`).
    pub ttft_goodput_pct: Option<f64>,
    /// Percent of requests meeting the ITL SLO (see `ITL_SLO_MS`).
    pub itl_goodput_pct: Option<f64>,
    /// Percent of requests meeting the E2E SLO (see `E2E_SLO_MS`).
    pub e2e_goodput_pct: Option<f64>,
    // Presumably set while the `warmup` module is exercising the engine —
    // TODO confirm where this flag is written.
    pub warming_up: bool,
}
/// Summary of a single recently completed request, e.g. for timeline display.
// NOTE(review): the epoch/origin of `start_ms`/`end_ms` is not visible in
// this file — confirm against whatever populates `recent_requests`.
#[derive(Clone, Debug, serde::Serialize)]
pub struct RecentRequest {
    /// Request start time, ms.
    pub start_ms: u64,
    /// Request end time, ms.
    pub end_ms: u64,
    /// Generation throughput achieved by this request, tokens/s.
    pub tokens_per_sec: f64,
    /// Time to first token for this request, ms.
    pub ttft_ms: f64,
}
/// Point-in-time view of one engine, rebuilt and published on each poll tick
/// of `engine_collector_loop`.
#[derive(Clone, Debug, serde::Serialize)]
pub struct EngineSnapshot {
    /// Which engine implementation this is.
    pub engine_type: EngineType,
    /// Base URL the engine was probed at.
    pub endpoint: String,
    /// Health status at snapshot time.
    pub status: EngineStatus,
    /// Served model info; `None` when the probe failed or info is unavailable.
    pub model: Option<ModelInfo>,
    /// Metrics at snapshot time; `None` when the probe failed or unavailable.
    pub metrics: Option<EngineMetrics>,
    /// Recently completed requests (always empty as built by the collector
    /// loop in this file).
    pub recent_requests: Vec<RecentRequest>,
    /// How the engine process is deployed.
    pub deployment_mode: DeploymentMode,
}
/// Abstraction over a single inference-engine endpoint.
///
/// Implementations must be `Send + Sync` so the collector loop can hold them
/// behind `Box<dyn EngineAdapter>` across await points.
#[async_trait]
pub trait EngineAdapter: Send + Sync {
    /// Which engine implementation this adapter speaks to.
    fn engine_type(&self) -> EngineType;
    /// Base URL of the engine.
    fn endpoint(&self) -> &str;
    /// Probes the engine and classifies its current status.
    async fn health_check(&self) -> EngineStatus;
    /// Fetches information about the currently served model, if available.
    async fn get_model_info(&self) -> Option<ModelInfo>;
    /// Scrapes current performance metrics, if available.
    async fn get_metrics(&self) -> Option<EngineMetrics>;
}
/// Mutable tracking state for one discovered (or manually overridden) engine.
pub struct EngineState {
    /// Protocol adapter used to probe this engine.
    pub adapter: Box<dyn EngineAdapter>,
    /// Probe failures since the last success; reset on success.
    pub consecutive_failures: u32,
    /// Time of the last successful probe.
    pub last_seen: Instant,
    /// Debounced status derived from probe results (see `record_probe_result`).
    pub status: EngineStatus,
    /// When the engine was first marked `Stopped`; `None` while healthy.
    pub stopped_at: Option<Instant>,
    /// How the engine process is deployed.
    pub deployment_mode: DeploymentMode,
}
impl EngineState {
    /// Consecutive failed probes before an engine is marked `Stopped`.
    const FAILURE_THRESHOLD: u32 = 3;
    /// How long a `Stopped` engine is retained before removal.
    const REMOVAL_GRACE: Duration = Duration::from_secs(30);

    /// Creates tracking state for a newly registered engine.
    ///
    /// The engine starts out optimistically as `Running`; the first probes
    /// will correct this if it is actually unreachable.
    pub fn new(adapter: Box<dyn EngineAdapter>, deployment_mode: DeploymentMode) -> Self {
        Self {
            adapter,
            consecutive_failures: 0,
            last_seen: Instant::now(),
            status: EngineStatus::Running,
            stopped_at: None,
            deployment_mode,
        }
    }

    /// Records the outcome of one health probe.
    ///
    /// Success resets the failure counter, refreshes `last_seen`, and clears
    /// any stop marker. Failure increments the counter; once
    /// `FAILURE_THRESHOLD` consecutive failures accumulate, the engine is
    /// marked `Stopped` (debouncing transient probe misses).
    pub fn record_probe_result(&mut self, success: bool) {
        if success {
            self.consecutive_failures = 0;
            self.last_seen = Instant::now();
            self.status = EngineStatus::Running;
            self.stopped_at = None;
        } else {
            self.consecutive_failures += 1;
            if self.consecutive_failures >= Self::FAILURE_THRESHOLD {
                // Remember when the engine first transitioned to Stopped so
                // the removal grace period is measured from that moment.
                if self.stopped_at.is_none() {
                    self.stopped_at = Some(Instant::now());
                }
                self.status = EngineStatus::Stopped;
            }
        }
    }

    /// Returns `true` once the engine has been `Stopped` for longer than the
    /// grace period and should be dropped from the registry.
    pub fn should_remove(&self) -> bool {
        self.status == EngineStatus::Stopped
            && self
                .stopped_at
                .map_or(false, |stopped| stopped.elapsed() > Self::REMOVAL_GRACE)
    }
}
/// Manually configured engine registration that bypasses auto-detection.
#[derive(Clone, Debug)]
pub struct EngineOverride {
    /// Engine implementation expected at `endpoint`.
    pub engine_type: EngineType,
    /// Base URL of the engine.
    pub endpoint: String,
}
/// Constructs the protocol adapter for `engine_type` targeting `endpoint`.
///
/// `model_hint` is an optional served-model name (e.g. from process
/// detection) forwarded to the adapter.
pub fn create_adapter(
    engine_type: EngineType,
    endpoint: String,
    client: reqwest::Client,
    model_hint: Option<String>,
) -> Box<dyn EngineAdapter> {
    match engine_type {
        EngineType::Vllm => {
            let adapter = vllm::VllmAdapter::new(client, endpoint, model_hint);
            Box::new(adapter)
        }
    }
}
/// Long-running collector task: discovers inference engines, polls each one
/// for health/model/metrics, and publishes the resulting snapshot list into
/// `shared_snapshots` (overwritten wholesale on every poll tick).
///
/// `overrides` are registered immediately as `Native` deployments and then
/// polled like detected engines. Detection runs every 5 s, polling every 1 s.
/// Never returns; intended to be spawned on the async runtime.
pub async fn engine_collector_loop(
    shared_snapshots: Arc<RwLock<Vec<EngineSnapshot>>>,
    overrides: Vec<EngineOverride>,
) {
    // Short per-request timeout so a hung engine cannot stall a poll tick for
    // long. NOTE(review): unwrap_or_default() silently falls back to a client
    // WITHOUT the 2 s timeout if the builder fails — confirm that's intended.
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(2))
        .build()
        .unwrap_or_default();
    let mut sys = sysinfo::System::new();
    // Registry keyed by (engine type, endpoint) so repeated detection of the
    // same engine is idempotent and preserves accumulated state.
    let mut engine_map: HashMap<(EngineType, String), EngineState> = HashMap::new();
    // Manual overrides are registered up front, assumed to run natively.
    for ov in &overrides {
        let adapter = create_adapter(
            ov.engine_type.clone(),
            ov.endpoint.clone(),
            client.clone(),
            None,
        );
        let key = (ov.engine_type.clone(), ov.endpoint.clone());
        engine_map.insert(key, EngineState::new(adapter, DeploymentMode::Native));
        tracing::info!(
            "Manual engine override registered: {} at {}",
            ov.engine_type,
            ov.endpoint
        );
    }
    let mut detection_interval = tokio::time::interval(Duration::from_secs(5));
    let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
    loop {
        tokio::select! {
            // Slow path: rescan host processes for newly started engines.
            _ = detection_interval.tick() => {
                sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
                let detected = detector::detect_engines(&sys, &client).await;
                for d in &detected {
                    let key = (d.engine_type.clone(), d.endpoint.clone());
                    // Only insert engines not seen before; existing state
                    // (failure counters, stop markers) is left untouched.
                    engine_map.entry(key).or_insert_with(|| {
                        let adapter = create_adapter(
                            d.engine_type.clone(),
                            d.endpoint.clone(),
                            client.clone(),
                            d.served_model.clone(),
                        );
                        tracing::info!(
                            "Detected engine: {} at {} (model={:?})",
                            d.engine_type,
                            d.endpoint,
                            d.served_model,
                        );
                        EngineState::new(adapter, d.deployment_mode.clone())
                    });
                }
            }
            // Fast path: probe every known engine and rebuild the snapshots.
            _ = poll_interval.tick() => {
                let mut snapshots = Vec::new();
                // Clone the keys first so the map can be mutably borrowed
                // inside the loop.
                let keys: Vec<_> = engine_map.keys().cloned().collect();
                for key in &keys {
                    if let Some(state) = engine_map.get_mut(key) {
                        let health = state.adapter.health_check().await;
                        // Loading counts as alive for debouncing purposes.
                        let success = matches!(health, EngineStatus::Running | EngineStatus::Loading);
                        state.record_probe_result(success);
                        // On success report the live status (may be Loading);
                        // on failure report the debounced stored status so one
                        // missed probe does not flap the published state.
                        let status = if success { health } else { state.status.clone() };
                        let model = if success {
                            state.adapter.get_model_info().await
                        } else {
                            None
                        };
                        let metrics = if success {
                            state.adapter.get_metrics().await
                        } else {
                            None
                        };
                        snapshots.push(EngineSnapshot {
                            engine_type: state.adapter.engine_type(),
                            endpoint: state.adapter.endpoint().to_string(),
                            status,
                            model,
                            metrics,
                            // Not populated by this loop.
                            recent_requests: Vec::new(),
                            deployment_mode: state.deployment_mode.clone(),
                        });
                    }
                }
                // Drop engines that have been stopped past their grace period.
                engine_map.retain(|_key, state| !state.should_remove());
                // Hold the write lock only for the assignment.
                let mut lock = shared_snapshots.write().await;
                *lock = snapshots;
            }
        }
    }
}