use std::{
convert::{Infallible, TryFrom},
sync::Arc,
};
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::sse::{Event, Sse},
routing::{get, post},
Json, Router,
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use crate::{
apr::{AprModel, AprModelType, ModelWeights, HEADER_SIZE, MAGIC},
audit::{AuditLogger, AuditRecord, InMemoryAuditSink},
cache::{CacheKey, ModelCache},
error::RealizarError,
explain::ShapExplanation,
generate::{GenerationConfig, SamplingStrategy},
layers::{Model, ModelConfig},
metrics::MetricsCollector,
registry::{ModelInfo, ModelRegistry},
tokenizer::BPETokenizer,
};
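/// Shared application state passed to every handler via axum's `State` extractor.
///
/// At most one model backend is populated per constructor: a plain in-process
/// `Model`, a `ModelRegistry`, an APR model, or (behind the `gpu` feature) a
/// quantized or GPU-cached model. Metrics and audit logging are always present.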
#[derive(Clone)]
pub struct AppState {
model: Option<Arc<Model>>,
tokenizer: Option<Arc<BPETokenizer>>,
#[allow(dead_code)]
cache: Option<Arc<ModelCache>>,
#[allow(dead_code)]
cache_key: Option<CacheKey>,
metrics: Arc<MetricsCollector>,
registry: Option<Arc<ModelRegistry>>,
default_model_id: Option<String>,
apr_model: Option<Arc<AprModel>>,
audit_logger: Arc<AuditLogger>,
audit_sink: Arc<InMemoryAuditSink>,
#[cfg(feature = "gpu")]
gpu_model: Option<Arc<std::sync::RwLock<crate::gpu::GpuModel>>>,
quantized_model: Option<Arc<crate::gguf::OwnedQuantizedModel>>,
#[cfg(feature = "gpu")]
cached_model: Option<Arc<crate::gguf::OwnedQuantizedModelCachedSync>>,
#[cfg(feature = "gpu")]
dispatch_metrics: Option<Arc<crate::gguf::DispatchMetrics>>,
#[cfg(feature = "gpu")]
batch_request_tx: Option<tokio::sync::mpsc::Sender<ContinuousBatchRequest>>,
#[cfg(feature = "gpu")]
batch_config: Option<BatchConfig>,
}
fn create_audit_state() -> (Arc<AuditLogger>, Arc<InMemoryAuditSink>) {
let sink = Arc::new(InMemoryAuditSink::new());
let logger = AuditLogger::new(Box::new(InMemorySinkWrapper(sink.clone())))
.with_model_hash("demo-model-hash");
(Arc::new(logger), sink)
}
struct InMemorySinkWrapper(Arc<InMemoryAuditSink>);
impl crate::audit::AuditSink for InMemorySinkWrapper {
fn write_batch(&self, records: &[AuditRecord]) -> Result<(), crate::audit::AuditError> {
self.0.write_batch(records)
}
fn flush(&self) -> Result<(), crate::audit::AuditError> {
self.0.flush()
}
}
impl AppState {
#[must_use]
pub fn new(model: Model, tokenizer: BPETokenizer) -> Self {
let (audit_logger, audit_sink) = create_audit_state();
Self {
model: Some(Arc::new(model)),
tokenizer: Some(Arc::new(tokenizer)),
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: None,
audit_logger,
audit_sink,
#[cfg(feature = "gpu")]
gpu_model: None,
quantized_model: None,
#[cfg(feature = "gpu")]
cached_model: None,
#[cfg(feature = "gpu")]
dispatch_metrics: None,
#[cfg(feature = "gpu")]
batch_request_tx: None,
#[cfg(feature = "gpu")]
batch_config: None,
}
}
pub fn with_registry(
registry: ModelRegistry,
default_model_id: &str,
) -> Result<Self, RealizarError> {
if !registry.contains(default_model_id) {
return Err(RealizarError::ModelNotFound(default_model_id.to_string()));
}
let (audit_logger, audit_sink) = create_audit_state();
Ok(Self {
model: None,
tokenizer: None,
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: Some(Arc::new(registry)),
default_model_id: Some(default_model_id.to_string()),
apr_model: None,
audit_logger,
audit_sink,
#[cfg(feature = "gpu")]
gpu_model: None,
quantized_model: None,
#[cfg(feature = "gpu")]
cached_model: None,
#[cfg(feature = "gpu")]
dispatch_metrics: None,
#[cfg(feature = "gpu")]
batch_request_tx: None,
#[cfg(feature = "gpu")]
batch_config: None,
})
}
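/// Resolves a `(model, tokenizer)` pair: consults the registry (using
/// `model_id` or the configured default) when one is present, otherwise falls
/// back to the single in-process model and tokenizer.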
#[allow(clippy::type_complexity)]
fn get_model(
&self,
model_id: Option<&str>,
) -> Result<(Arc<Model>, Arc<BPETokenizer>), RealizarError> {
if let Some(registry) = &self.registry {
let id = model_id
.or(self.default_model_id.as_deref())
.ok_or_else(|| RealizarError::RegistryError("No model ID specified".to_string()))?;
return registry.get(id);
}
let model = self
.model
.clone()
.ok_or_else(|| RealizarError::RegistryError("No model available".to_string()))?;
let tokenizer = self
.tokenizer
.clone()
.ok_or_else(|| RealizarError::RegistryError("No tokenizer available".to_string()))?;
Ok((model, tokenizer))
}
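/// Constructs state with a `ModelCache` of the given capacity plus a tiny
/// placeholder model and numeric-token vocabulary; intended for tests and
/// demos (panics if the placeholder model or tokenizer cannot be built).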
#[must_use]
pub fn with_cache(cache_capacity: usize) -> Self {
let config = ModelConfig {
vocab_size: 100,
hidden_dim: 32,
num_heads: 1,
num_layers: 1,
intermediate_dim: 64,
eps: 1e-5,
};
let model = Model::new(config).expect("Failed to create placeholder model");
let vocab: Vec<String> = (0..100)
.map(|i| {
if i == 0 {
"<unk>".to_string()
} else {
format!("token{i}")
}
})
.collect();
let tokenizer =
BPETokenizer::new(vocab, vec![], "<unk>").expect("Failed to create tokenizer");
let (audit_logger, audit_sink) = create_audit_state();
Self {
model: Some(Arc::new(model)),
tokenizer: Some(Arc::new(tokenizer)),
cache: Some(Arc::new(ModelCache::new(cache_capacity))),
cache_key: Some(CacheKey::new("default".to_string())),
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: None,
audit_logger,
audit_sink,
#[cfg(feature = "gpu")]
gpu_model: None,
quantized_model: None,
#[cfg(feature = "gpu")]
cached_model: None,
#[cfg(feature = "gpu")]
dispatch_metrics: None,
#[cfg(feature = "gpu")]
batch_request_tx: None,
#[cfg(feature = "gpu")]
batch_config: None,
}
}
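/// Demo constructor: a tiny transformer, a numeric-token vocabulary, and a
/// small linear-regression APR model (see `create_demo_apr_model`).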
pub fn demo() -> Result<Self, RealizarError> {
let config = ModelConfig {
vocab_size: 100,
hidden_dim: 32,
num_heads: 1,
num_layers: 1,
intermediate_dim: 64,
eps: 1e-5,
};
let model = Model::new(config)?;
let vocab: Vec<String> = (0..100)
.map(|i| {
if i == 0 {
"<unk>".to_string()
} else {
format!("token{i}")
}
})
.collect();
let tokenizer = BPETokenizer::new(vocab, vec![], "<unk>")?;
let apr_model = create_demo_apr_model(4)?;
let (audit_logger, audit_sink) = create_audit_state();
Ok(Self {
model: Some(Arc::new(model)),
tokenizer: Some(Arc::new(tokenizer)),
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: Some(Arc::new(apr_model)),
audit_logger,
audit_sink,
#[cfg(feature = "gpu")]
gpu_model: None,
quantized_model: None,
#[cfg(feature = "gpu")]
cached_model: None,
#[cfg(feature = "gpu")]
dispatch_metrics: None,
#[cfg(feature = "gpu")]
batch_request_tx: None,
#[cfg(feature = "gpu")]
batch_config: None,
})
}
#[cfg(feature = "gpu")]
pub fn with_gpu_model(gpu_model: crate::gpu::GpuModel) -> Result<Self, RealizarError> {
let vocab_size = gpu_model.config().vocab_size;
let vocab: Vec<String> = (0..vocab_size)
.map(|i| {
if i == 0 {
"<unk>".to_string()
} else {
format!("token{i}")
}
})
.collect();
let tokenizer = BPETokenizer::new(vocab, vec![], "<unk>")?;
let (audit_logger, audit_sink) = create_audit_state();
Ok(Self {
model: None,
tokenizer: Some(Arc::new(tokenizer)),
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: None,
audit_logger,
audit_sink,
gpu_model: Some(Arc::new(std::sync::RwLock::new(gpu_model))),
quantized_model: None,
cached_model: None,
dispatch_metrics: None,
batch_request_tx: None,
batch_config: None,
})
}
pub fn with_quantized_model(
quantized_model: crate::gguf::OwnedQuantizedModel,
) -> Result<Self, RealizarError> {
let vocab_size = quantized_model.config.vocab_size;
let vocab: Vec<String> = (0..vocab_size)
.map(|i| {
if i == 0 {
"<unk>".to_string()
} else {
format!("token{i}")
}
})
.collect();
let tokenizer = BPETokenizer::new(vocab, vec![], "<unk>")?;
let (audit_logger, audit_sink) = create_audit_state();
Ok(Self {
model: None,
tokenizer: Some(Arc::new(tokenizer)),
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: None,
audit_logger,
audit_sink,
#[cfg(feature = "gpu")]
gpu_model: None,
quantized_model: Some(Arc::new(quantized_model)),
#[cfg(feature = "gpu")]
cached_model: None,
#[cfg(feature = "gpu")]
dispatch_metrics: None,
#[cfg(feature = "gpu")]
batch_request_tx: None,
#[cfg(feature = "gpu")]
batch_config: None,
})
}
#[cfg(feature = "gpu")]
pub fn with_cached_model(
cached_model: crate::gguf::OwnedQuantizedModelCachedSync,
) -> Result<Self, RealizarError> {
let vocab_size = cached_model.model().config.vocab_size;
let vocab: Vec<String> = (0..vocab_size)
.map(|i| {
if i == 0 {
"<unk>".to_string()
} else {
format!("token{i}")
}
})
.collect();
let tokenizer = BPETokenizer::new(vocab, vec![], "<unk>")?;
let (audit_logger, audit_sink) = create_audit_state();
Ok(Self {
model: None,
tokenizer: Some(Arc::new(tokenizer)),
cache: None,
cache_key: None,
metrics: Arc::new(MetricsCollector::new()),
registry: None,
default_model_id: None,
apr_model: None,
audit_logger,
audit_sink,
gpu_model: None,
quantized_model: None,
cached_model: Some(Arc::new(cached_model)),
dispatch_metrics: Some(Arc::new(crate::gguf::DispatchMetrics::new())),
batch_request_tx: None,
batch_config: None,
})
}
#[must_use]
pub fn has_quantized_model(&self) -> bool {
self.quantized_model.is_some()
}
pub fn quantized_model(&self) -> Option<&Arc<crate::gguf::OwnedQuantizedModel>> {
self.quantized_model.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn has_gpu_model(&self) -> bool {
self.gpu_model.is_some()
}
#[cfg(feature = "gpu")]
pub fn gpu_model(&self) -> Option<&Arc<std::sync::RwLock<crate::gpu::GpuModel>>> {
self.gpu_model.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn has_cached_model(&self) -> bool {
self.cached_model.is_some()
}
#[cfg(feature = "gpu")]
pub fn cached_model(&self) -> Option<&Arc<crate::gguf::OwnedQuantizedModelCachedSync>> {
self.cached_model.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn dispatch_metrics(&self) -> Option<&Arc<crate::gguf::DispatchMetrics>> {
self.dispatch_metrics.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn batch_request_tx(&self) -> Option<&tokio::sync::mpsc::Sender<ContinuousBatchRequest>> {
self.batch_request_tx.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn batch_config(&self) -> Option<&BatchConfig> {
self.batch_config.as_ref()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn batch_enabled(&self) -> bool {
self.batch_request_tx.is_some() && self.batch_config.is_some()
}
#[cfg(feature = "gpu")]
#[must_use]
pub fn with_batch_config(
mut self,
batch_request_tx: tokio::sync::mpsc::Sender<ContinuousBatchRequest>,
batch_config: BatchConfig,
) -> Self {
self.batch_request_tx = Some(batch_request_tx);
self.batch_config = Some(batch_config);
self
}
}
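/// Builds a minimal linear-regression APR model entirely in memory: unit
/// weights and zero bias, serialized to JSON and prefixed with a hand-rolled
/// APR header (`MAGIC`, model type, payload length fields).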
fn create_demo_apr_model(input_dim: usize) -> Result<AprModel, RealizarError> {
let weights = ModelWeights {
weights: vec![vec![1.0; input_dim]],
biases: vec![vec![0.0]],
dimensions: vec![input_dim, 1],
};
let payload = serde_json::to_vec(&weights).map_err(|e| RealizarError::FormatError {
reason: format!("Failed to serialize model weights: {e}"),
})?;
let mut data = Vec::with_capacity(HEADER_SIZE + payload.len());
data.extend_from_slice(&MAGIC);
data.push(1);
data.push(0);
data.push(0);
data.push(0);
data.extend_from_slice(&(AprModelType::LinearRegression as u16).to_le_bytes());
data.extend_from_slice(&0u32.to_le_bytes());
data.extend_from_slice(&(payload.len() as u32).to_le_bytes());
data.extend_from_slice(&(payload.len() as u32).to_le_bytes());
data.extend_from_slice(&[0u8; 10]);
data.extend_from_slice(&payload);
AprModel::from_bytes(&data)
}
#[derive(Serialize, Deserialize)]
pub struct HealthResponse {
pub status: String,
pub version: String,
}
#[derive(Serialize, Deserialize)]
pub struct TokenizeRequest {
pub text: String,
pub model_id: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct TokenizeResponse {
pub token_ids: Vec<u32>,
pub num_tokens: usize,
}
#[derive(Serialize, Deserialize)]
pub struct GenerateRequest {
pub prompt: String,
#[serde(default = "default_max_tokens")]
pub max_tokens: usize,
#[serde(default = "default_temperature")]
pub temperature: f32,
#[serde(default = "default_strategy")]
pub strategy: String,
#[serde(default = "default_top_k")]
pub top_k: usize,
#[serde(default = "default_top_p")]
pub top_p: f32,
pub seed: Option<u64>,
pub model_id: Option<String>,
}
fn default_max_tokens() -> usize {
50
}
fn default_temperature() -> f32 {
1.0
}
fn default_strategy() -> String {
"greedy".to_string()
}
fn default_top_k() -> usize {
50
}
fn default_top_p() -> f32 {
0.9
}
#[derive(Serialize, Deserialize)]
pub struct GenerateResponse {
pub token_ids: Vec<u32>,
pub text: String,
pub num_generated: usize,
}
#[derive(Serialize)]
pub struct ErrorResponse {
pub error: String,
}
#[derive(Serialize, Deserialize)]
pub struct BatchTokenizeRequest {
pub texts: Vec<String>,
}
#[derive(Serialize, Deserialize)]
pub struct BatchTokenizeResponse {
pub results: Vec<TokenizeResponse>,
}
#[derive(Serialize, Deserialize)]
pub struct BatchGenerateRequest {
pub prompts: Vec<String>,
#[serde(default = "default_max_tokens")]
pub max_tokens: usize,
#[serde(default = "default_temperature")]
pub temperature: f32,
#[serde(default = "default_strategy")]
pub strategy: String,
#[serde(default = "default_top_k")]
pub top_k: usize,
#[serde(default = "default_top_p")]
pub top_p: f32,
pub seed: Option<u64>,
}
#[derive(Serialize, Deserialize)]
pub struct BatchGenerateResponse {
pub results: Vec<GenerateResponse>,
}
#[derive(Serialize, Deserialize)]
pub struct StreamTokenEvent {
pub token_id: u32,
pub text: String,
}
#[derive(Serialize, Deserialize)]
pub struct StreamDoneEvent {
pub num_generated: usize,
}
#[derive(Serialize, Deserialize)]
pub struct ModelsResponse {
pub models: Vec<ModelInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletionRequest {
pub model: String,
pub messages: Vec<ChatMessage>,
#[serde(default)]
pub max_tokens: Option<usize>,
#[serde(default)]
pub temperature: Option<f32>,
#[serde(default)]
pub top_p: Option<f32>,
#[serde(default = "default_n")]
pub n: usize,
#[serde(default)]
pub stream: bool,
#[serde(default)]
pub stop: Option<Vec<String>>,
#[serde(default)]
pub user: Option<String>,
}
fn default_n() -> usize {
1
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
pub role: String,
pub content: String,
#[serde(default)]
pub name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletionResponse {
pub id: String,
pub object: String,
pub created: i64,
pub model: String,
pub choices: Vec<ChatChoice>,
pub usage: Usage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatChoice {
pub index: usize,
pub message: ChatMessage,
pub finish_reason: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Usage {
pub prompt_tokens: usize,
pub completion_tokens: usize,
pub total_tokens: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenAIModelsResponse {
pub object: String,
pub data: Vec<OpenAIModel>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenAIModel {
pub id: String,
pub object: String,
pub created: i64,
pub owned_by: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletionChunk {
pub id: String,
pub object: String,
pub created: i64,
pub model: String,
pub choices: Vec<ChatChunkChoice>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatChunkChoice {
pub index: usize,
pub delta: ChatDelta,
pub finish_reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatDelta {
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
}
impl ChatCompletionChunk {
fn new(id: &str, model: &str, content: Option<String>, finish_reason: Option<String>) -> Self {
Self {
id: id.to_string(),
object: "chat.completion.chunk".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
model: model.to_string(),
choices: vec![ChatChunkChoice {
index: 0,
delta: ChatDelta {
role: if content.is_none() && finish_reason.is_none() {
Some("assistant".to_string())
} else {
None
},
content,
},
finish_reason,
}],
}
}
fn initial(id: &str, model: &str) -> Self {
Self::new(id, model, None, None)
}
fn content(id: &str, model: &str, text: &str) -> Self {
Self::new(id, model, Some(text.to_string()), None)
}
fn done(id: &str, model: &str) -> Self {
Self::new(id, model, None, Some("stop".to_string()))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictRequest {
#[serde(default)]
pub model: Option<String>,
pub features: Vec<f32>,
#[serde(default)]
pub feature_names: Option<Vec<String>>,
#[serde(default)]
pub top_k: Option<usize>,
#[serde(default = "default_true")]
pub include_confidence: bool,
}
fn default_true() -> bool {
true
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictResponse {
pub request_id: String,
pub model: String,
pub prediction: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub confidence: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub top_k_predictions: Option<Vec<PredictionWithScore>>,
pub latency_ms: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictionWithScore {
pub label: String,
pub score: f32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExplainRequest {
#[serde(default)]
pub model: Option<String>,
pub features: Vec<f32>,
pub feature_names: Vec<String>,
#[serde(default = "default_top_k_features")]
pub top_k_features: usize,
#[serde(default = "default_explain_method")]
pub method: String,
}
fn default_top_k_features() -> usize {
5
}
fn default_explain_method() -> String {
"shap".to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExplainResponse {
pub request_id: String,
pub model: String,
pub prediction: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub confidence: Option<f32>,
pub explanation: ShapExplanation,
pub summary: String,
pub latency_ms: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditResponse {
pub record: AuditRecord,
}
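/// Assembles the axum `Router`: native endpoints (`/tokenize`, `/generate`,
/// batch and streaming variants, `/realize/*`), OpenAI-compatible `/v1/*`
/// routes, APR prediction/explanation/audit routes, and metrics plus GPU
/// management endpoints, all sharing the given `AppState`.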
pub fn create_router(state: AppState) -> Router {
Router::new()
.route("/health", get(health_handler))
.route("/metrics", get(metrics_handler))
.route("/metrics/dispatch", get(dispatch_metrics_handler))
.route("/metrics/dispatch/reset", post(dispatch_reset_handler))
.route("/models", get(models_handler))
.route("/tokenize", post(tokenize_handler))
.route("/generate", post(generate_handler))
.route("/batch/tokenize", post(batch_tokenize_handler))
.route("/batch/generate", post(batch_generate_handler))
.route("/stream/generate", post(stream_generate_handler))
.route("/realize/generate", post(stream_generate_handler))
.route("/realize/batch", post(batch_generate_handler))
.route("/realize/embed", post(realize_embed_handler))
.route("/realize/model", get(realize_model_handler))
.route("/realize/reload", post(realize_reload_handler))
.route("/v1/models", get(openai_models_handler))
.route("/v1/completions", post(openai_completions_handler))
.route(
"/v1/chat/completions",
post(openai_chat_completions_handler),
)
.route(
"/v1/chat/completions/stream",
post(openai_chat_completions_stream_handler),
)
.route("/v1/embeddings", post(openai_embeddings_handler))
.route("/v1/predict", post(apr_predict_handler))
.route("/v1/explain", post(apr_explain_handler))
.route("/v1/audit/:request_id", get(apr_audit_handler))
.route("/v1/gpu/warmup", post(gpu_warmup_handler))
.route("/v1/gpu/status", get(gpu_status_handler))
.route("/v1/batch/completions", post(gpu_batch_completions_handler))
.route("/v1/metrics", get(server_metrics_handler))
.with_state(state)
}
async fn health_handler() -> Json<HealthResponse> {
Json(HealthResponse {
status: "healthy".to_string(),
version: crate::VERSION.to_string(),
})
}
async fn metrics_handler(State(state): State<AppState>) -> String {
state.metrics.to_prometheus()
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct DispatchMetricsResponse {
pub cpu_dispatches: usize,
pub gpu_dispatches: usize,
pub total_dispatches: usize,
pub gpu_ratio: f64,
pub cpu_latency_p50_us: f64,
pub cpu_latency_p95_us: f64,
pub cpu_latency_p99_us: f64,
pub gpu_latency_p50_us: f64,
pub gpu_latency_p95_us: f64,
pub gpu_latency_p99_us: f64,
pub cpu_latency_mean_us: f64,
pub gpu_latency_mean_us: f64,
pub cpu_latency_min_us: u64,
pub cpu_latency_max_us: u64,
pub gpu_latency_min_us: u64,
pub gpu_latency_max_us: u64,
pub cpu_latency_variance_us: f64,
pub cpu_latency_stddev_us: f64,
pub gpu_latency_variance_us: f64,
pub gpu_latency_stddev_us: f64,
pub bucket_boundaries_us: Vec<String>,
pub cpu_latency_bucket_counts: Vec<usize>,
pub gpu_latency_bucket_counts: Vec<usize>,
pub throughput_rps: f64,
pub elapsed_seconds: f64,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerMetricsResponse {
pub throughput_tok_per_sec: f64,
pub latency_p50_ms: f64,
pub latency_p95_ms: f64,
pub latency_p99_ms: f64,
pub gpu_memory_used_bytes: u64,
pub gpu_memory_total_bytes: u64,
pub gpu_utilization_percent: u32,
pub cuda_path_active: bool,
pub batch_size: usize,
pub queue_depth: usize,
pub total_tokens: u64,
pub total_requests: u64,
pub uptime_secs: u64,
pub model_name: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct DispatchMetricsQuery {
#[serde(default)]
pub format: Option<String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DispatchResetResponse {
pub success: bool,
pub message: String,
}
#[cfg(feature = "gpu")]
async fn dispatch_reset_handler(State(state): State<AppState>) -> axum::response::Response {
use axum::response::IntoResponse;
if let Some(metrics) = state.dispatch_metrics() {
metrics.reset();
Json(DispatchResetResponse {
success: true,
message: "Metrics reset successfully".to_string(),
})
.into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "Dispatch metrics not available. No GPU model configured.".to_string(),
}),
)
.into_response()
}
}
#[cfg(not(feature = "gpu"))]
async fn dispatch_reset_handler(State(_state): State<AppState>) -> axum::response::Response {
use axum::response::IntoResponse;
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "Dispatch metrics not available. GPU feature not enabled.".to_string(),
}),
)
.into_response()
}
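/// `/v1/metrics` (GPU build): reports throughput from the metrics snapshot and
/// latency percentiles from dispatch metrics, preferring GPU figures when any
/// GPU dispatches have occurred. GPU memory totals use a fixed 24 GiB figure,
/// and the model name is reported as `phi-2-q4_k_m` when a cached model is
/// loaded.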
#[cfg(feature = "gpu")]
async fn server_metrics_handler(State(state): State<AppState>) -> Json<ServerMetricsResponse> {
let snapshot = state.metrics.snapshot();
let (latency_p50_ms, latency_p95_ms, latency_p99_ms, gpu_dispatches, cuda_path_active) =
if let Some(dispatch) = state.dispatch_metrics() {
let gpu_p50 = dispatch.gpu_latency_p50_us();
let gpu_p95 = dispatch.gpu_latency_p95_us();
let gpu_p99 = dispatch.gpu_latency_p99_us();
let gpu_count = dispatch.gpu_dispatches();
if gpu_count > 0 {
(
gpu_p50 / 1000.0,
gpu_p95 / 1000.0,
gpu_p99 / 1000.0,
gpu_count,
true,
)
} else {
let cpu_p50 = dispatch.cpu_latency_p50_us();
let cpu_p95 = dispatch.cpu_latency_p95_us();
let cpu_p99 = dispatch.cpu_latency_p99_us();
(
cpu_p50 / 1000.0,
cpu_p95 / 1000.0,
cpu_p99 / 1000.0,
0,
false,
)
}
} else {
(0.0, 0.0, 0.0, 0, false)
};
let (gpu_memory_used_bytes, gpu_memory_total_bytes): (u64, u64) =
if let Some(model) = state.cached_model() {
let used = model.gpu_cache_memory() as u64;
let total = 24 * 1024 * 1024 * 1024u64;
(used, total)
} else {
(0, 0)
};
let gpu_utilization_percent = if let Some(dispatch) = state.dispatch_metrics() {
let total = dispatch.total_dispatches();
if total > 0 {
((gpu_dispatches as f64 / total as f64) * 100.0) as u32
} else {
0
}
} else {
0
};
let (batch_size, queue_depth) = if let Some(config) = state.batch_config() {
(config.optimal_batch, config.queue_size)
} else {
(1, 0)
};
let model_name = if state.cached_model().is_some() {
"phi-2-q4_k_m".to_string()
} else {
"N/A".to_string()
};
Json(ServerMetricsResponse {
throughput_tok_per_sec: snapshot.tokens_per_sec,
latency_p50_ms,
latency_p95_ms,
latency_p99_ms,
gpu_memory_used_bytes,
gpu_memory_total_bytes,
gpu_utilization_percent,
cuda_path_active,
batch_size,
queue_depth,
total_tokens: snapshot.total_tokens as u64,
total_requests: snapshot.total_requests as u64,
uptime_secs: snapshot.uptime_secs,
model_name,
})
}
#[cfg(not(feature = "gpu"))]
async fn server_metrics_handler(State(state): State<AppState>) -> Json<ServerMetricsResponse> {
let snapshot = state.metrics.snapshot();
Json(ServerMetricsResponse {
throughput_tok_per_sec: snapshot.tokens_per_sec,
latency_p50_ms: snapshot.avg_latency_ms,
latency_p95_ms: snapshot.avg_latency_ms * 1.5,
latency_p99_ms: snapshot.avg_latency_ms * 2.0,
gpu_memory_used_bytes: 0,
gpu_memory_total_bytes: 0,
gpu_utilization_percent: 0,
cuda_path_active: false,
batch_size: 1,
queue_depth: 0,
total_tokens: snapshot.total_tokens as u64,
total_requests: snapshot.total_requests as u64,
uptime_secs: snapshot.uptime_secs,
model_name: "N/A".to_string(),
})
}
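/// `/metrics/dispatch` (GPU build): returns dispatch counters and latency
/// statistics as JSON, or as Prometheus exposition text when the request uses
/// `?format=prometheus`.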
#[cfg(feature = "gpu")]
async fn dispatch_metrics_handler(
State(state): State<AppState>,
Query(query): Query<DispatchMetricsQuery>,
) -> axum::response::Response {
use axum::response::IntoResponse;
if let Some(metrics) = state.dispatch_metrics() {
let format = query.format.as_deref().unwrap_or("json");
if format == "prometheus" {
let cpu_buckets = metrics.cpu_latency_buckets();
let gpu_buckets = metrics.gpu_latency_buckets();
let cpu_cumulative = [
cpu_buckets[0],
cpu_buckets[0] + cpu_buckets[1],
cpu_buckets[0] + cpu_buckets[1] + cpu_buckets[2],
cpu_buckets[0] + cpu_buckets[1] + cpu_buckets[2] + cpu_buckets[3],
cpu_buckets[0] + cpu_buckets[1] + cpu_buckets[2] + cpu_buckets[3] + cpu_buckets[4],
];
let gpu_cumulative = [
gpu_buckets[0],
gpu_buckets[0] + gpu_buckets[1],
gpu_buckets[0] + gpu_buckets[1] + gpu_buckets[2],
gpu_buckets[0] + gpu_buckets[1] + gpu_buckets[2] + gpu_buckets[3],
gpu_buckets[0] + gpu_buckets[1] + gpu_buckets[2] + gpu_buckets[3] + gpu_buckets[4],
];
let prometheus_output = format!(
"# HELP realizar_dispatch_cpu_total Total CPU dispatch decisions\n\
# TYPE realizar_dispatch_cpu_total counter\n\
realizar_dispatch_cpu_total {}\n\
# HELP realizar_dispatch_gpu_total Total GPU dispatch decisions\n\
# TYPE realizar_dispatch_gpu_total counter\n\
realizar_dispatch_gpu_total {}\n\
# HELP realizar_dispatch_gpu_ratio Ratio of GPU dispatches (0.0 to 1.0)\n\
# TYPE realizar_dispatch_gpu_ratio gauge\n\
realizar_dispatch_gpu_ratio {:.6}\n\
# HELP realizar_dispatch_throughput_rps Requests per second since start or reset\n\
# TYPE realizar_dispatch_throughput_rps gauge\n\
realizar_dispatch_throughput_rps {:.6}\n\
# HELP realizar_dispatch_elapsed_seconds Seconds since start or last reset\n\
# TYPE realizar_dispatch_elapsed_seconds gauge\n\
realizar_dispatch_elapsed_seconds {:.6}\n\
# HELP realizar_dispatch_cpu_latency CPU dispatch latency in microseconds\n\
# TYPE realizar_dispatch_cpu_latency histogram\n\
realizar_dispatch_cpu_latency_bucket{{le=\"100\"}} {}\n\
realizar_dispatch_cpu_latency_bucket{{le=\"500\"}} {}\n\
realizar_dispatch_cpu_latency_bucket{{le=\"1000\"}} {}\n\
realizar_dispatch_cpu_latency_bucket{{le=\"5000\"}} {}\n\
realizar_dispatch_cpu_latency_bucket{{le=\"+Inf\"}} {}\n\
realizar_dispatch_cpu_latency_sum {}\n\
realizar_dispatch_cpu_latency_count {}\n\
# HELP realizar_dispatch_gpu_latency GPU dispatch latency in microseconds\n\
# TYPE realizar_dispatch_gpu_latency histogram\n\
realizar_dispatch_gpu_latency_bucket{{le=\"100\"}} {}\n\
realizar_dispatch_gpu_latency_bucket{{le=\"500\"}} {}\n\
realizar_dispatch_gpu_latency_bucket{{le=\"1000\"}} {}\n\
realizar_dispatch_gpu_latency_bucket{{le=\"5000\"}} {}\n\
realizar_dispatch_gpu_latency_bucket{{le=\"+Inf\"}} {}\n\
realizar_dispatch_gpu_latency_sum {}\n\
realizar_dispatch_gpu_latency_count {}\n",
metrics.cpu_dispatches(),
metrics.gpu_dispatches(),
metrics.gpu_ratio(),
metrics.throughput_rps(),
metrics.elapsed_seconds(),
cpu_cumulative[0],
cpu_cumulative[1],
cpu_cumulative[2],
cpu_cumulative[3],
cpu_cumulative[4],
metrics.cpu_latency_sum_us(),
metrics.cpu_latency_count(),
gpu_cumulative[0],
gpu_cumulative[1],
gpu_cumulative[2],
gpu_cumulative[3],
gpu_cumulative[4],
metrics.gpu_latency_sum_us(),
metrics.gpu_latency_count(),
);
(
StatusCode::OK,
[("content-type", "text/plain; charset=utf-8")],
prometheus_output,
)
.into_response()
} else {
Json(DispatchMetricsResponse {
cpu_dispatches: metrics.cpu_dispatches(),
gpu_dispatches: metrics.gpu_dispatches(),
total_dispatches: metrics.total_dispatches(),
gpu_ratio: metrics.gpu_ratio(),
cpu_latency_p50_us: metrics.cpu_latency_p50_us(),
cpu_latency_p95_us: metrics.cpu_latency_p95_us(),
cpu_latency_p99_us: metrics.cpu_latency_p99_us(),
gpu_latency_p50_us: metrics.gpu_latency_p50_us(),
gpu_latency_p95_us: metrics.gpu_latency_p95_us(),
gpu_latency_p99_us: metrics.gpu_latency_p99_us(),
cpu_latency_mean_us: metrics.cpu_latency_mean_us(),
gpu_latency_mean_us: metrics.gpu_latency_mean_us(),
cpu_latency_min_us: metrics.cpu_latency_min_us(),
cpu_latency_max_us: metrics.cpu_latency_max_us(),
gpu_latency_min_us: metrics.gpu_latency_min_us(),
gpu_latency_max_us: metrics.gpu_latency_max_us(),
cpu_latency_variance_us: metrics.cpu_latency_variance_us(),
cpu_latency_stddev_us: metrics.cpu_latency_stddev_us(),
gpu_latency_variance_us: metrics.gpu_latency_variance_us(),
gpu_latency_stddev_us: metrics.gpu_latency_stddev_us(),
bucket_boundaries_us: metrics.bucket_boundaries_us(),
cpu_latency_bucket_counts: metrics.cpu_latency_buckets().to_vec(),
gpu_latency_bucket_counts: metrics.gpu_latency_buckets().to_vec(),
throughput_rps: metrics.throughput_rps(),
elapsed_seconds: metrics.elapsed_seconds(),
})
.into_response()
}
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "Dispatch metrics not available. No GPU model configured.".to_string(),
}),
)
.into_response()
}
}
#[cfg(not(feature = "gpu"))]
async fn dispatch_metrics_handler(
State(_state): State<AppState>,
Query(_query): Query<DispatchMetricsQuery>,
) -> axum::response::Response {
use axum::response::IntoResponse;
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "Dispatch metrics not available. GPU feature not enabled.".to_string(),
}),
)
.into_response()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuBatchRequest {
pub prompts: Vec<String>,
#[serde(default = "default_max_tokens")]
pub max_tokens: usize,
#[serde(default)]
pub temperature: f32,
#[serde(default = "default_top_k")]
pub top_k: usize,
#[serde(default)]
pub stop: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuBatchResponse {
pub results: Vec<GpuBatchResult>,
pub stats: GpuBatchStats,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuBatchResult {
pub index: usize,
pub token_ids: Vec<u32>,
pub text: String,
pub num_generated: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuBatchStats {
pub batch_size: usize,
pub gpu_used: bool,
pub total_tokens: usize,
pub processing_time_ms: f64,
pub throughput_tps: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuWarmupResponse {
pub success: bool,
pub memory_bytes: usize,
pub num_layers: usize,
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuStatusResponse {
pub cache_ready: bool,
pub cache_memory_bytes: usize,
pub batch_threshold: usize,
pub recommended_min_batch: usize,
}
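/// Tuning parameters for the continuous batching queue (GPU builds only):
/// batching window, batch size bounds, channel capacity, and the batch size
/// at which the GPU path is preferred.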
#[derive(Debug, Clone)]
#[cfg(feature = "gpu")]
pub struct BatchConfig {
pub window_ms: u64,
pub min_batch: usize,
pub optimal_batch: usize,
pub max_batch: usize,
pub queue_size: usize,
pub gpu_threshold: usize,
}
#[cfg(feature = "gpu")]
impl Default for BatchConfig {
fn default() -> Self {
Self {
window_ms: 50,
min_batch: 4,
optimal_batch: 32,
max_batch: 64,
queue_size: 1024,
gpu_threshold: 32,
}
}
}
#[cfg(feature = "gpu")]
impl BatchConfig {
pub fn low_latency() -> Self {
Self {
window_ms: 5,
min_batch: 2,
optimal_batch: 8,
max_batch: 16,
queue_size: 512,
gpu_threshold: 32,
}
}
pub fn high_throughput() -> Self {
Self {
window_ms: 100,
min_batch: 8,
optimal_batch: 32,
max_batch: 128,
queue_size: 2048,
gpu_threshold: 32,
}
}
pub fn should_process(&self, batch_size: usize) -> bool {
batch_size >= self.optimal_batch
}
pub fn meets_minimum(&self, batch_size: usize) -> bool {
batch_size >= self.min_batch
}
}
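/// A single generation request queued for continuous batching; the outcome is
/// delivered back to the caller through `response_tx`.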
#[cfg(feature = "gpu")]
pub struct ContinuousBatchRequest {
pub prompt_tokens: Vec<u32>,
pub max_tokens: usize,
pub temperature: f32,
pub top_k: usize,
pub response_tx: tokio::sync::oneshot::Sender<ContinuousBatchResponse>,
pub submitted_at: std::time::Instant,
}
#[cfg(feature = "gpu")]
#[derive(Debug, Clone)]
pub struct ContinuousBatchResponse {
pub token_ids: Vec<u32>,
pub prompt_len: usize,
pub batched: bool,
pub batch_size: usize,
pub latency_ms: f64,
}
#[cfg(feature = "gpu")]
impl ContinuousBatchResponse {
pub fn single(token_ids: Vec<u32>, prompt_len: usize, latency_ms: f64) -> Self {
Self {
token_ids,
prompt_len,
batched: false,
batch_size: 1,
latency_ms,
}
}
pub fn batched(
token_ids: Vec<u32>,
prompt_len: usize,
batch_size: usize,
latency_ms: f64,
) -> Self {
Self {
token_ids,
prompt_len,
batched: true,
batch_size,
latency_ms,
}
}
pub fn generated_tokens(&self) -> &[u32] {
if self.token_ids.len() > self.prompt_len {
&self.token_ids[self.prompt_len..]
} else {
&[]
}
}
}
#[derive(Debug, Clone, Default)]
#[cfg(feature = "gpu")]
pub struct BatchQueueStats {
pub total_queued: u64,
pub total_batches: u64,
pub total_single: u64,
pub avg_batch_size: f64,
pub avg_wait_ms: f64,
}
#[cfg(feature = "gpu")]
#[derive(Debug)]
pub struct BatchProcessResult {
pub requests_processed: usize,
pub was_batched: bool,
pub total_time_ms: f64,
pub avg_latency_ms: f64,
}
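/// Spawns the background batching task and returns the sender half of its
/// bounded request channel (capacity `config.queue_size`).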
#[cfg(feature = "gpu")]
pub fn spawn_batch_processor(
model: std::sync::Arc<crate::gguf::OwnedQuantizedModelCachedSync>,
config: BatchConfig,
) -> tokio::sync::mpsc::Sender<ContinuousBatchRequest> {
let (tx, rx) = tokio::sync::mpsc::channel(config.queue_size);
tokio::spawn(batch_processor_task(rx, model, config));
tx
}
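/// Core batching loop: accumulates requests for up to `config.window_ms`,
/// flushes early once `config.optimal_batch` requests are queued, flushes on
/// window expiry, and drains any remainder when the channel closes.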
#[cfg(feature = "gpu")]
async fn batch_processor_task(
mut rx: tokio::sync::mpsc::Receiver<ContinuousBatchRequest>,
model: std::sync::Arc<crate::gguf::OwnedQuantizedModelCachedSync>,
config: BatchConfig,
) {
use std::time::{Duration, Instant};
use tokio::time::timeout;
let mut batch: Vec<ContinuousBatchRequest> = Vec::with_capacity(config.max_batch);
let mut window_start = Instant::now();
loop {
let elapsed = window_start.elapsed();
let remaining = Duration::from_millis(config.window_ms).saturating_sub(elapsed);
match timeout(remaining, rx.recv()).await {
Ok(Some(request)) => {
batch.push(request);
if batch.len() >= config.optimal_batch {
process_batch(&model, &config, &mut batch).await;
window_start = Instant::now();
}
},
Ok(None) => {
if !batch.is_empty() {
process_batch(&model, &config, &mut batch).await;
}
break;
},
Err(_) => {
if !batch.is_empty() {
process_batch(&model, &config, &mut batch).await;
}
window_start = Instant::now();
},
}
}
}
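/// Processes one collected batch. Large batches (>= `config.gpu_threshold`)
/// with a warm GPU cache go through `batch_generate_gpu`; everything else runs
/// per-request via `generate_with_cache` on spawned tasks. On failure each
/// request gets its prompt tokens echoed back so the waiting oneshot receiver
/// is always completed.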
#[cfg(feature = "gpu")]
async fn process_batch(
model: &std::sync::Arc<crate::gguf::OwnedQuantizedModelCachedSync>,
config: &BatchConfig,
batch: &mut Vec<ContinuousBatchRequest>,
) {
use std::time::Instant;
if batch.is_empty() {
return;
}
let batch_size = batch.len();
let batch_start = Instant::now();
let gpu_threshold = config.gpu_threshold;
if batch_size >= gpu_threshold && model.is_gpu_cache_warm() {
let prompts: Vec<Vec<u32>> = batch.iter().map(|r| r.prompt_tokens.clone()).collect();
let first = &batch[0];
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: first.max_tokens,
temperature: first.temperature,
top_k: first.top_k,
stop_tokens: Vec::new(),
};
let results = model.batch_generate_gpu(&prompts, &gen_config);
let total_latency_ms = batch_start.elapsed().as_secs_f64() * 1000.0;
let per_request_latency_ms = total_latency_ms / batch_size as f64;
match results {
Ok(all_token_ids) => {
for (request, token_ids) in batch.drain(..).zip(all_token_ids.into_iter()) {
let response = ContinuousBatchResponse {
token_ids,
prompt_len: request.prompt_tokens.len(),
batched: true,
batch_size,
latency_ms: per_request_latency_ms,
};
let _ = request.response_tx.send(response);
}
},
Err(_) => {
for request in batch.drain(..) {
let response = ContinuousBatchResponse {
token_ids: request.prompt_tokens.clone(),
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size,
latency_ms: per_request_latency_ms,
};
let _ = request.response_tx.send(response);
}
},
}
} else {
let mut handles = Vec::with_capacity(batch_size);
for request in batch.drain(..) {
let model = model.clone();
let handle = tokio::spawn(async move {
let start = Instant::now();
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: request.max_tokens,
temperature: request.temperature,
top_k: request.top_k,
stop_tokens: Vec::new(),
};
let result = model.generate_with_cache(&request.prompt_tokens, &gen_config);
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
let response = match result {
Ok(token_ids) => ContinuousBatchResponse {
token_ids,
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size: 1,
latency_ms,
},
Err(_) => ContinuousBatchResponse {
token_ids: request.prompt_tokens.clone(),
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size: 1,
latency_ms,
},
};
let _ = request.response_tx.send(response);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
}
}
#[cfg(feature = "gpu")]
async fn gpu_warmup_handler(
State(state): State<AppState>,
) -> Result<Json<GpuWarmupResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(cached_model) = state.cached_model() {
match cached_model.warmup_gpu_cache() {
Ok((memory_bytes, num_layers)) => Ok(Json(GpuWarmupResponse {
success: true,
memory_bytes,
num_layers,
message: format!(
"GPU cache warmed up: {} layers, {:.2} GB",
num_layers,
memory_bytes as f64 / 1e9
),
})),
Err(e) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("GPU warmup failed: {e}"),
}),
)),
}
} else {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "No GPU-capable model loaded. Use with_cached_model() to enable."
.to_string(),
}),
))
}
}
#[cfg(not(feature = "gpu"))]
async fn gpu_warmup_handler(
State(_state): State<AppState>,
) -> Result<Json<GpuWarmupResponse>, (StatusCode, Json<ErrorResponse>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "GPU feature not enabled. Build with --features gpu".to_string(),
}),
))
}
#[cfg(feature = "gpu")]
async fn gpu_status_handler(
State(state): State<AppState>,
) -> Result<Json<GpuStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(cached_model) = state.cached_model() {
Ok(Json(GpuStatusResponse {
cache_ready: cached_model.is_gpu_cache_warm(),
cache_memory_bytes: cached_model.gpu_cache_memory(),
batch_threshold: 32,
recommended_min_batch: 32,
}))
} else {
Ok(Json(GpuStatusResponse {
cache_ready: false,
cache_memory_bytes: 0,
batch_threshold: 32,
recommended_min_batch: 32,
}))
}
}
#[cfg(not(feature = "gpu"))]
async fn gpu_status_handler(
State(_state): State<AppState>,
) -> Result<Json<GpuStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
Ok(Json(GpuStatusResponse {
cache_ready: false,
cache_memory_bytes: 0,
batch_threshold: 32,
recommended_min_batch: 32,
}))
}
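/// `/v1/batch/completions` (GPU build): tokenizes prompts byte-for-byte (each
/// byte becomes a token ID), chooses the GPU batch path when the cache is warm
/// and the batch reaches the threshold, and otherwise generates sequentially
/// with the cached CPU path; responses decode tokens back to bytes.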
#[cfg(feature = "gpu")]
async fn gpu_batch_completions_handler(
State(state): State<AppState>,
Json(request): Json<GpuBatchRequest>,
) -> Result<Json<GpuBatchResponse>, (StatusCode, Json<ErrorResponse>)> {
use std::time::Instant;
if request.prompts.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompts array cannot be empty".to_string(),
}),
));
}
let Some(cached_model) = state.cached_model() else {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "No GPU-capable model loaded".to_string(),
}),
));
};
let gpu_ready = cached_model.is_gpu_cache_warm();
let batch_size = request.prompts.len();
let prompts_tokens: Vec<Vec<u32>> = request
.prompts
.iter()
.map(|p| {
p.bytes().map(|b| b as u32).collect()
})
.collect();
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: request.max_tokens,
temperature: request.temperature,
top_k: request.top_k,
stop_tokens: vec![],
};
let start = Instant::now();
let gpu_threshold = 32;
let use_gpu = gpu_ready && batch_size >= gpu_threshold;
let results = if use_gpu {
match cached_model.batch_generate_gpu(&prompts_tokens, &gen_config) {
Ok(generated) => generated,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("GPU batch generation failed: {e}"),
}),
));
},
}
} else {
let mut results = Vec::with_capacity(batch_size);
for prompt in &prompts_tokens {
match cached_model.generate_with_cache(prompt, &gen_config) {
Ok(tokens) => results.push(tokens),
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Generation failed: {e}"),
}),
));
},
}
}
results
};
let elapsed = start.elapsed();
let total_tokens: usize = results.iter().map(Vec::len).sum();
let throughput_tps = total_tokens as f64 / elapsed.as_secs_f64();
let batch_results: Vec<GpuBatchResult> = results
.into_iter()
.enumerate()
.map(|(idx, tokens)| {
let prompt_len = prompts_tokens.get(idx).map_or(0, Vec::len);
let num_generated = tokens.len().saturating_sub(prompt_len);
GpuBatchResult {
index: idx,
token_ids: tokens.clone(),
text: tokens.iter().map(|&t| t as u8 as char).collect(),
num_generated,
}
})
.collect();
Ok(Json(GpuBatchResponse {
results: batch_results,
stats: GpuBatchStats {
batch_size,
gpu_used: use_gpu,
total_tokens,
processing_time_ms: elapsed.as_secs_f64() * 1000.0,
throughput_tps,
},
}))
}
#[cfg(not(feature = "gpu"))]
async fn gpu_batch_completions_handler(
State(_state): State<AppState>,
Json(_request): Json<GpuBatchRequest>,
) -> Result<Json<GpuBatchResponse>, (StatusCode, Json<ErrorResponse>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "GPU feature not enabled. Build with --features gpu".to_string(),
}),
))
}
async fn models_handler(
State(state): State<AppState>,
) -> Result<Json<ModelsResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(registry) = &state.registry {
let models = registry.list();
Ok(Json(ModelsResponse { models }))
} else {
Ok(Json(ModelsResponse {
models: vec![ModelInfo {
id: "default".to_string(),
name: "Default Model".to_string(),
description: "Single model deployment".to_string(),
format: "unknown".to_string(),
loaded: true,
}],
}))
}
}
async fn tokenize_handler(
State(state): State<AppState>,
Json(request): Json<TokenizeRequest>,
) -> Result<Json<TokenizeResponse>, (StatusCode, Json<ErrorResponse>)> {
let (_model, tokenizer) = state.get_model(request.model_id.as_deref()).map_err(|e| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids = tokenizer.encode(&request.text);
let num_tokens = token_ids.len();
Ok(Json(TokenizeResponse {
token_ids,
num_tokens,
}))
}
async fn generate_handler(
State(state): State<AppState>,
Json(request): Json<GenerateRequest>,
) -> Result<Json<GenerateResponse>, (StatusCode, Json<ErrorResponse>)> {
use std::time::Instant;
let start = Instant::now();
let (model, tokenizer) = state.get_model(request.model_id.as_deref()).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let strategy = match request.strategy.as_str() {
"greedy" => SamplingStrategy::Greedy,
"top_k" => SamplingStrategy::TopK { k: request.top_k },
"top_p" => SamplingStrategy::TopP { p: request.top_p },
_ => {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Invalid strategy: {}", request.strategy),
}),
));
},
};
let mut config = GenerationConfig::default()
.with_max_tokens(request.max_tokens)
.with_temperature(request.temperature);
config.strategy = strategy;
if let Some(seed) = request.seed {
config = config.with_seed(seed);
}
let generated = model.generate(&prompt, &config).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.map(|&id| {
u32::try_from(id).map_err(|_| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Token ID {id} exceeds u32 range"),
}),
)
})
})
.collect::<Result<Vec<_>, _>>()?;
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let num_generated = generated.len() - prompt.len();
let duration = start.elapsed();
state.metrics.record_success(num_generated, duration);
Ok(Json(GenerateResponse {
token_ids,
text,
num_generated,
}))
}
async fn batch_tokenize_handler(
State(state): State<AppState>,
Json(request): Json<BatchTokenizeRequest>,
) -> Result<Json<BatchTokenizeResponse>, (StatusCode, Json<ErrorResponse>)> {
if request.texts.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Texts array cannot be empty".to_string(),
}),
));
}
let (_model, tokenizer) = state.get_model(None).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let results: Vec<TokenizeResponse> = request
.texts
.iter()
.map(|text| {
let token_ids = tokenizer.encode(text);
let num_tokens = token_ids.len();
TokenizeResponse {
token_ids,
num_tokens,
}
})
.collect();
Ok(Json(BatchTokenizeResponse { results }))
}
async fn batch_generate_handler(
State(state): State<AppState>,
Json(request): Json<BatchGenerateRequest>,
) -> Result<Json<BatchGenerateResponse>, (StatusCode, Json<ErrorResponse>)> {
if request.prompts.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompts array cannot be empty".to_string(),
}),
));
}
let (model, tokenizer) = state.get_model(None).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let strategy = match request.strategy.as_str() {
"greedy" => SamplingStrategy::Greedy,
"top_k" => SamplingStrategy::TopK { k: request.top_k },
"top_p" => SamplingStrategy::TopP { p: request.top_p },
_ => {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Invalid strategy: {}", request.strategy),
}),
));
},
};
let mut config = GenerationConfig::default()
.with_max_tokens(request.max_tokens)
.with_temperature(request.temperature);
config.strategy = strategy;
if let Some(seed) = request.seed {
config = config.with_seed(seed);
}
let mut results = Vec::with_capacity(request.prompts.len());
for prompt_text in &request.prompts {
let prompt_ids = tokenizer.encode(prompt_text);
if prompt_ids.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Prompt '{prompt_text}' tokenizes to empty sequence"),
}),
));
}
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let generated = model.generate(&prompt, &config).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.map(|&id| {
u32::try_from(id).map_err(|_| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Token ID {id} exceeds u32 range"),
}),
)
})
})
.collect::<Result<Vec<_>, _>>()?;
let text = tokenizer.decode(&token_ids).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let num_generated = generated.len() - prompt.len();
results.push(GenerateResponse {
token_ids,
text,
num_generated,
});
}
Ok(Json(BatchGenerateResponse { results }))
}
async fn stream_generate_handler(
State(state): State<AppState>,
Json(request): Json<GenerateRequest>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, Json<ErrorResponse>)> {
let (model, tokenizer) = state.get_model(request.model_id.as_deref()).map_err(|e| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let prompt_len = prompt.len();
let strategy = match request.strategy.as_str() {
"greedy" => SamplingStrategy::Greedy,
"top_k" => SamplingStrategy::TopK { k: request.top_k },
"top_p" => SamplingStrategy::TopP { p: request.top_p },
_ => {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Invalid strategy: {}", request.strategy),
}),
));
},
};
let mut config = GenerationConfig::default()
.with_max_tokens(request.max_tokens)
.with_temperature(request.temperature);
config.strategy = strategy;
if let Some(seed) = request.seed {
config = config.with_seed(seed);
}
let generated = match model.generate(&prompt, &config) {
Ok(tokens) => tokens,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
));
},
};
let token_ids: Vec<u32> = generated
.iter()
.map(|&id| {
u32::try_from(id).map_err(|_| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Token ID {id} exceeds u32 range"),
}),
)
})
})
.collect::<Result<Vec<_>, _>>()?;
let tokenizer_clone = tokenizer;
let stream = async_stream::stream! {
for &token_id in &token_ids[prompt_len..] {
let text = match tokenizer_clone.decode(&[token_id]) {
Ok(t) => t,
Err(_) => String::from("<error>"),
};
let event = StreamTokenEvent { token_id, text };
let data = serde_json::to_string(&event)
.unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string());
yield Ok::<_, Infallible>(Event::default().event("token").data(data));
}
let done_event = StreamDoneEvent {
num_generated: token_ids.len() - prompt_len,
};
let data = serde_json::to_string(&done_event)
.unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string());
yield Ok(Event::default().event("done").data(data));
};
Ok(Sse::new(stream))
}
async fn openai_models_handler(State(state): State<AppState>) -> Json<OpenAIModelsResponse> {
let models = if let Some(registry) = &state.registry {
registry
.list()
.into_iter()
.map(|m| OpenAIModel {
id: m.id,
object: "model".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
owned_by: "realizar".to_string(),
})
.collect()
} else {
vec![OpenAIModel {
id: "default".to_string(),
object: "model".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
owned_by: "realizar".to_string(),
}]
};
Json(OpenAIModelsResponse {
object: "list".to_string(),
data: models,
})
}
async fn openai_chat_completions_handler(
State(state): State<AppState>,
Json(request): Json<ChatCompletionRequest>,
) -> Result<Json<ChatCompletionResponse>, (StatusCode, Json<ErrorResponse>)> {
use std::time::Instant;
let start = Instant::now();
let model_id = if request.model == "default" || request.model.is_empty() {
None
} else {
Some(request.model.as_str())
};
let (model, tokenizer) = state.get_model(model_id).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let prompt_text = format_chat_messages(&request.messages);
let prompt_ids = tokenizer.encode(&prompt_text);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Messages cannot be empty".to_string(),
}),
));
}
let prompt_tokens = prompt_ids.len();
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let max_tokens = request.max_tokens.unwrap_or(256);
let temperature = request.temperature.unwrap_or(0.7);
let mut config = GenerationConfig::default()
.with_max_tokens(max_tokens)
.with_temperature(temperature);
if let Some(top_p) = request.top_p {
config.strategy = SamplingStrategy::TopP { p: top_p };
}
let generated = model.generate(&prompt, &config).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.map(|&id| {
u32::try_from(id).map_err(|_| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Token ID {id} exceeds u32 range"),
}),
)
})
})
.collect::<Result<Vec<_>, _>>()?;
let generated_ids = &token_ids[prompt.len()..];
let response_text = tokenizer.decode(generated_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let completion_tokens = generated_ids.len();
let duration = start.elapsed();
state.metrics.record_success(completion_tokens, duration);
let request_id = format!(
"chatcmpl-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
);
Ok(Json(ChatCompletionResponse {
id: request_id,
object: "chat.completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
model: request.model.clone(),
choices: vec![ChatChoice {
index: 0,
message: ChatMessage {
role: "assistant".to_string(),
content: response_text,
name: None,
},
finish_reason: "stop".to_string(),
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}))
}
async fn openai_chat_completions_stream_handler(
State(state): State<AppState>,
Json(request): Json<ChatCompletionRequest>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, Json<ErrorResponse>)> {
let model_id = if request.model == "default" || request.model.is_empty() {
None
} else {
Some(request.model.as_str())
};
let (model, tokenizer) = state.get_model(model_id).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let prompt_text = format_chat_messages(&request.messages);
let prompt_ids = tokenizer.encode(&prompt_text);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Messages cannot be empty".to_string(),
}),
));
}
let prompt_len = prompt_ids.len();
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let max_tokens = request.max_tokens.unwrap_or(256);
let temperature = request.temperature.unwrap_or(0.7);
let mut config = GenerationConfig::default()
.with_max_tokens(max_tokens)
.with_temperature(temperature);
if let Some(top_p) = request.top_p {
config.strategy = SamplingStrategy::TopP { p: top_p };
}
let request_id = format!(
"chatcmpl-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
);
let generated = model.generate(&prompt, &config).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.filter_map(|&id| u32::try_from(id).ok())
.collect();
let generated_ids = token_ids[prompt_len..].to_vec();
let model_name = request.model.clone();
let request_id_clone = request_id.clone();
let tokenizer_clone = tokenizer;
let stream = async_stream::stream! {
let initial = ChatCompletionChunk::initial(&request_id_clone, &model_name);
let data = serde_json::to_string(&initial).unwrap_or_default();
// `Event::default().data(...)` adds the SSE "data: " prefix itself, so yield the raw JSON payload.
yield Ok(Event::default().data(data));
for &token_id in &generated_ids {
let text = match tokenizer_clone.decode(&[token_id]) {
Ok(t) => t,
Err(_) => continue,
};
let chunk = ChatCompletionChunk::content(&request_id_clone, &model_name, &text);
let data = serde_json::to_string(&chunk).unwrap_or_default();
yield Ok(Event::default().data(data));
}
let done = ChatCompletionChunk::done(&request_id_clone, &model_name);
let data = serde_json::to_string(&done).unwrap_or_default();
yield Ok(Event::default().data(data));
yield Ok(Event::default().data("[DONE]"));
};
Ok(Sse::new(stream))
}
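/// Token budget for fitting chat history into a model's context window:
/// `max_tokens` overall, with `reserved_output_tokens` held back for the
/// completion and an option to always keep system messages.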
#[derive(Debug, Clone)]
pub struct ContextWindowConfig {
pub max_tokens: usize,
pub reserved_output_tokens: usize,
pub preserve_system: bool,
}
impl Default for ContextWindowConfig {
fn default() -> Self {
Self {
max_tokens: 4096,
reserved_output_tokens: 256,
preserve_system: true,
}
}
}
impl ContextWindowConfig {
#[must_use]
pub fn new(max_tokens: usize) -> Self {
Self {
max_tokens,
..Default::default()
}
}
#[must_use]
pub fn with_reserved_output(mut self, tokens: usize) -> Self {
self.reserved_output_tokens = tokens;
self
}
fn available_tokens(&self) -> usize {
self.max_tokens.saturating_sub(self.reserved_output_tokens)
}
}
pub struct ContextWindowManager {
config: ContextWindowConfig,
}
impl ContextWindowManager {
#[must_use]
pub fn new(config: ContextWindowConfig) -> Self {
Self { config }
}
#[must_use]
pub fn default_manager() -> Self {
Self::new(ContextWindowConfig::default())
}
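/// Heuristic token estimate: roughly four bytes per token plus a fixed
/// per-message overhead for the role prefix. No tokenizer is involved.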
fn estimate_tokens(text: &str) -> usize {
const ROLE_OVERHEAD: usize = 10;
text.len().div_ceil(4) + ROLE_OVERHEAD
}
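/// Returns the (possibly truncated) messages and whether truncation
/// occurred. System messages are retained first when `preserve_system` is
/// set, then the newest non-system messages that still fit the budget.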
pub fn truncate_messages(&self, messages: &[ChatMessage]) -> (Vec<ChatMessage>, bool) {
let available = self.config.available_tokens();
let total_tokens: usize = messages
.iter()
.map(|m| Self::estimate_tokens(&m.content))
.sum();
if total_tokens <= available {
return (messages.to_vec(), false);
}
let mut result = Vec::new();
let mut used_tokens = 0;
let (system_msgs, other_msgs): (Vec<_>, Vec<_>) = messages
.iter()
.partition(|m| m.role == "system" && self.config.preserve_system);
for msg in &system_msgs {
let tokens = Self::estimate_tokens(&msg.content);
if used_tokens + tokens <= available {
result.push((*msg).clone());
used_tokens += tokens;
}
}
let mut temp_msgs: Vec<ChatMessage> = Vec::new();
for msg in other_msgs.iter().rev() {
let tokens = Self::estimate_tokens(&msg.content);
if used_tokens + tokens <= available {
temp_msgs.push((*msg).clone());
used_tokens += tokens;
} else {
break;
}
}
temp_msgs.reverse();
result.extend(temp_msgs);
(result, true)
}
pub fn needs_truncation(&self, messages: &[ChatMessage]) -> bool {
let available = self.config.available_tokens();
let total_tokens: usize = messages
.iter()
.map(|m| Self::estimate_tokens(&m.content))
.sum();
total_tokens > available
}
pub fn estimate_total_tokens(&self, messages: &[ChatMessage]) -> usize {
messages
.iter()
.map(|m| Self::estimate_tokens(&m.content))
.sum()
}
}
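/// Flattens chat messages into a plain-text prompt using "System:",
/// "User:", and "Assistant:" prefixes, ending with "Assistant: " so the
/// model continues the assistant turn.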
fn format_chat_messages(messages: &[ChatMessage]) -> String {
let mut prompt = String::new();
for msg in messages {
match msg.role.as_str() {
"system" => {
prompt.push_str("System: ");
prompt.push_str(&msg.content);
prompt.push('\n');
},
"user" => {
prompt.push_str("User: ");
prompt.push_str(&msg.content);
prompt.push('\n');
},
"assistant" => {
prompt.push_str("Assistant: ");
prompt.push_str(&msg.content);
prompt.push('\n');
},
_ => {
prompt.push_str(&msg.content);
prompt.push('\n');
},
}
}
prompt.push_str("Assistant: ");
prompt
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingRequest {
pub input: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingResponse {
pub object: String,
pub data: Vec<EmbeddingData>,
pub model: String,
pub usage: EmbeddingUsage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingData {
pub object: String,
pub index: usize,
pub embedding: Vec<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingUsage {
pub prompt_tokens: usize,
pub total_tokens: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelMetadataResponse {
pub id: String,
pub name: String,
pub format: String,
pub size_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub quantization: Option<String>,
pub context_length: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub lineage: Option<ModelLineage>,
pub loaded: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelLineage {
pub uri: String,
pub version: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub recipe: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent: Option<String>,
pub content_hash: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResponse {
pub success: bool,
pub message: String,
pub reload_time_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletionRequest {
pub model: String,
pub prompt: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_tokens: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub temperature: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub top_p: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stop: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletionResponse {
pub id: String,
pub object: String,
pub created: u64,
pub model: String,
pub choices: Vec<CompletionChoice>,
pub usage: Usage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletionChoice {
pub text: String,
pub index: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub logprobs: Option<serde_json::Value>,
pub finish_reason: String,
}
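/// Embedding endpoint. The current implementation returns a deterministic,
/// L2-normalized 384-dimensional bag-of-tokens projection rather than
/// embeddings from the loaded model, so it is a stable-shape placeholder.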
async fn realize_embed_handler(
State(state): State<AppState>,
Json(request): Json<EmbeddingRequest>,
) -> Result<Json<EmbeddingResponse>, (StatusCode, Json<ErrorResponse>)> {
let model_id = request.model.as_deref();
let (_model, tokenizer) = state.get_model(model_id).map_err(|e| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids = tokenizer.encode(&request.input);
let prompt_tokens = token_ids.len();
let mut embedding = vec![0.0f32; 384];
for (i, &token_id) in token_ids.iter().enumerate() {
let idx = (token_id as usize) % embedding.len();
let pos_weight = 1.0 / (1.0 + i as f32);
embedding[idx] += pos_weight;
}
let norm: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
if norm > 0.0 {
for v in &mut embedding {
*v /= norm;
}
}
Ok(Json(EmbeddingResponse {
object: "list".to_string(),
data: vec![EmbeddingData {
object: "embedding".to_string(),
index: 0,
embedding,
}],
model: request.model.unwrap_or_else(|| "default".to_string()),
usage: EmbeddingUsage {
prompt_tokens,
total_tokens: prompt_tokens,
},
}))
}
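/// Model metadata endpoint. Reports the first registry entry when a
/// registry is configured, otherwise a single default model; size,
/// quantization, context length, and lineage are static placeholder values.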
async fn realize_model_handler(
State(state): State<AppState>,
) -> Result<Json<ModelMetadataResponse>, (StatusCode, Json<ErrorResponse>)> {
let model_info = if let Some(registry) = &state.registry {
let models = registry.list();
models.first().cloned()
} else {
Some(ModelInfo {
id: "default".to_string(),
name: "Default Model".to_string(),
description: "Single model deployment".to_string(),
format: "gguf".to_string(),
loaded: true,
})
};
let info = model_info.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: "No model loaded".to_string(),
}),
)
})?;
Ok(Json(ModelMetadataResponse {
id: info.id.clone(),
name: info.name,
format: info.format,
size_bytes: 0,
quantization: Some("Q4_K_M".to_string()),
context_length: 4096,
lineage: Some(ModelLineage {
uri: format!("pacha://{}:latest", info.id),
version: "1.0.0".to_string(),
recipe: None,
parent: None,
content_hash: "blake3:0".repeat(16),
}),
loaded: info.loaded,
}))
}
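/// Hot-reload endpoint. Requires registry mode and a `path` to the new
/// model file. This handler only validates that the model is registered and
/// that the file exists; the atomic swap itself is not performed here.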
async fn realize_reload_handler(
State(state): State<AppState>,
Json(request): Json<ReloadRequest>,
) -> Result<Json<ReloadResponse>, (StatusCode, Json<ErrorResponse>)> {
let start = std::time::Instant::now();
let model_id = request.model.unwrap_or_else(|| "default".to_string());
let registry = state.registry.as_ref().ok_or_else(|| {
(
StatusCode::NOT_IMPLEMENTED,
Json(ErrorResponse {
error: "Hot-reload requires registry mode. Start server with --registry flag."
.to_string(),
}),
)
})?;
let model_path = request.path.ok_or_else(|| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Model path is required for reload. Provide 'path' field with path to model file.".to_string(),
}),
)
})?;
if !registry.contains(&model_id) {
return Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!(
"Model '{}' not found in registry. Use POST /realize/models to register first.",
model_id
),
}),
));
}
if !std::path::Path::new(&model_path).exists() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Model file not found: {}", model_path),
}),
));
}
Ok(Json(ReloadResponse {
success: true,
message: format!(
"Model '{}' reload validated from '{}'. Atomic swap ready.",
model_id, model_path
),
reload_time_ms: start.elapsed().as_millis() as u64,
}))
}
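/// OpenAI-compatible text completions endpoint.
///
/// Dispatch order: the cached quantized model (GPU feature, routed through
/// the continuous-batching queue when enabled), then the quantized model,
/// then the GPU model, and finally the registry or default model.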
async fn openai_completions_handler(
State(state): State<AppState>,
Json(request): Json<CompletionRequest>,
) -> Result<Json<CompletionResponse>, (StatusCode, Json<ErrorResponse>)> {
let start = std::time::Instant::now();
let max_tokens = request.max_tokens.unwrap_or(256);
let temperature = request.temperature.unwrap_or(0.7) as f32;
#[cfg(feature = "gpu")]
if let Some(cached_model) = state.cached_model() {
use crate::gguf::QuantizedGenerateConfig;
let tokenizer = state.tokenizer.clone().ok_or_else(|| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: "No tokenizer available".to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt_tokens = prompt_ids.len();
if state.batch_enabled() {
if let Some(batch_tx) = state.batch_request_tx() {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let batch_request = ContinuousBatchRequest {
prompt_tokens: prompt_ids.clone(),
max_tokens,
temperature,
top_k: if temperature == 0.0 { 1 } else { 40 },
response_tx,
submitted_at: std::time::Instant::now(),
};
if batch_tx.send(batch_request).await.is_ok() {
match response_rx.await {
Ok(batch_response) => {
let token_ids = batch_response.generated_tokens().to_vec();
let completion_tokens = token_ids.len();
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let latency = start.elapsed();
state.metrics.record_success(completion_tokens, latency);
let response_id = format!(
"cmpl-batch-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
return Ok(Json(CompletionResponse {
id: response_id,
object: "text_completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
model: format!("batch-q4k-{}", batch_response.batch_size),
choices: vec![CompletionChoice {
text,
index: 0,
logprobs: None,
finish_reason: if completion_tokens >= max_tokens {
"length".to_string()
} else {
"stop".to_string()
},
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}));
},
Err(_) => {
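// Batch worker dropped the response channel; fall through to the
// non-batched cached-model path below.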
},
}
}
}
}
let q_config = QuantizedGenerateConfig {
max_tokens,
temperature,
top_k: if temperature == 0.0 { 1 } else { 40 },
stop_tokens: Vec::new(),
};
let generated = if let Some(metrics) = state.dispatch_metrics() {
cached_model
.generate_with_cache_adaptive(&prompt_ids, &q_config, metrics)
.map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?
} else {
cached_model
.generate_with_cache(&prompt_ids, &q_config)
.map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?
};
let token_ids: Vec<u32> = generated.iter().skip(prompt_tokens).copied().collect();
let completion_tokens = token_ids.len();
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let latency = start.elapsed();
state.metrics.record_success(completion_tokens, latency);
let response_id = format!(
"cmpl-cached-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
return Ok(Json(CompletionResponse {
id: response_id,
object: "text_completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
model: "cached-q4k".to_string(),
choices: vec![CompletionChoice {
text,
index: 0,
logprobs: None,
finish_reason: if completion_tokens >= max_tokens {
"length".to_string()
} else {
"stop".to_string()
},
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}));
}
if let Some(quantized_model) = state.quantized_model() {
use crate::gguf::QuantizedGenerateConfig;
let tokenizer = state.tokenizer.clone().ok_or_else(|| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: "No tokenizer available".to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt_tokens = prompt_ids.len();
let q_config = QuantizedGenerateConfig {
max_tokens,
temperature,
top_k: if temperature == 0.0 { 1 } else { 40 },
stop_tokens: Vec::new(),
};
let generated = quantized_model
.generate_with_cache(&prompt_ids, &q_config)
.map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated.iter().skip(prompt_tokens).copied().collect();
let completion_tokens = token_ids.len();
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let latency = start.elapsed();
state.metrics.record_success(completion_tokens, latency);
let response_id = format!(
"cmpl-q4k-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
return Ok(Json(CompletionResponse {
id: response_id,
object: "text_completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
model: request.model.clone(),
choices: vec![CompletionChoice {
text,
index: 0,
logprobs: None,
finish_reason: "stop".to_string(),
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}));
}
#[cfg(feature = "gpu")]
if let Some(gpu_model_lock) = state.gpu_model() {
use crate::gpu::GpuGenerateConfig;
let tokenizer = state.tokenizer.clone().ok_or_else(|| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: "No tokenizer available".to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt_tokens = prompt_ids.len();
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let gpu_config = GpuGenerateConfig {
max_tokens,
temperature,
top_k: 1,
stop_tokens: Vec::new(),
};
let mut gpu_model = gpu_model_lock.write().map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to acquire GPU model lock: {e}"),
}),
)
})?;
let generated = gpu_model.generate(&prompt, &gpu_config).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.skip(prompt_tokens)
.filter_map(|&id| u32::try_from(id).ok())
.collect();
let completion_tokens = token_ids.len();
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let latency = start.elapsed();
state.metrics.record_success(completion_tokens, latency);
let response_id = format!("cmpl-{}", &uuid::Uuid::new_v4().to_string()[..8]);
return Ok(Json(CompletionResponse {
id: response_id,
object: "text_completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
model: request.model.clone(),
choices: vec![CompletionChoice {
text,
index: 0,
logprobs: None,
finish_reason: "stop".to_string(),
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}));
}
let model_id = if request.model == "default" || request.model.is_empty() {
None
} else {
Some(request.model.as_str())
};
let (model, tokenizer) = state.get_model(model_id).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let prompt_ids = tokenizer.encode(&request.prompt);
if prompt_ids.is_empty() {
state.metrics.record_failure();
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompt cannot be empty".to_string(),
}),
));
}
let prompt_tokens = prompt_ids.len();
let prompt: Vec<usize> = prompt_ids.iter().map(|&id| id as usize).collect();
let mut config = GenerationConfig::default()
.with_max_tokens(max_tokens)
.with_temperature(temperature);
if let Some(top_p) = request.top_p {
config.strategy = SamplingStrategy::TopP { p: top_p as f32 };
}
let generated = model.generate(&prompt, &config).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids: Vec<u32> = generated
.iter()
.skip(prompt_tokens)
.filter_map(|&id| u32::try_from(id).ok())
.collect();
let completion_tokens = token_ids.len();
let text = tokenizer.decode(&token_ids).map_err(|e| {
state.metrics.record_failure();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let latency = start.elapsed();
state.metrics.record_success(completion_tokens, latency);
let response_id = format!(
"cmpl-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
);
Ok(Json(CompletionResponse {
id: response_id,
object: "text_completion".to_string(),
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
model: request.model,
choices: vec![CompletionChoice {
text,
index: 0,
logprobs: None,
finish_reason: "stop".to_string(),
}],
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
},
}))
}
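/// OpenAI-compatible embeddings endpoint; delegates to `realize_embed_handler`.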
async fn openai_embeddings_handler(
State(state): State<AppState>,
Json(request): Json<EmbeddingRequest>,
) -> Result<Json<EmbeddingResponse>, (StatusCode, Json<ErrorResponse>)> {
realize_embed_handler(State(state), Json(request)).await
}
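/// APR prediction endpoint. Logs request and response to the audit logger,
/// applies a softmax to multi-class outputs for confidence and top-k scores,
/// and treats a single output as a regression value.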
async fn apr_predict_handler(
State(state): State<AppState>,
Json(request): Json<PredictRequest>,
) -> Result<Json<PredictResponse>, (StatusCode, Json<ErrorResponse>)> {
let start = std::time::Instant::now();
if request.features.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Input features cannot be empty".to_string(),
}),
));
}
let apr_model = state.apr_model.as_ref().ok_or_else(|| {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "No APR model loaded. Use AppState::demo() or load a .apr model."
.to_string(),
}),
)
})?;
let request_id = state.audit_logger.log_request(
&format!("{:?}", apr_model.model_type()),
&[request.features.len()],
);
let output = apr_model.predict(&request.features).map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Inference failed: {e}"),
}),
)
})?;
let prediction = if output.len() == 1 {
serde_json::json!(output[0])
} else {
let max_idx = output
.iter()
.enumerate()
.max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
.map_or(0, |(i, _)| i);
serde_json::json!(format!("class_{}", max_idx))
};
let confidence = if output.len() > 1 {
let max_val = output.iter().copied().fold(f32::NEG_INFINITY, f32::max);
let exp_sum: f32 = output.iter().map(|x| (x - max_val).exp()).sum();
let probs: Vec<f32> = output
.iter()
.map(|x| (x - max_val).exp() / exp_sum)
.collect();
probs.into_iter().fold(0.0_f32, f32::max)
} else {
1.0
};
let top_k_predictions = request.top_k.map(|k| {
if output.len() > 1 {
let max_val = output.iter().copied().fold(f32::NEG_INFINITY, f32::max);
let exp_sum: f32 = output.iter().map(|x| (x - max_val).exp()).sum();
let mut probs: Vec<(usize, f32)> = output
.iter()
.enumerate()
.map(|(i, x)| (i, (x - max_val).exp() / exp_sum))
.collect();
probs.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
probs
.into_iter()
.take(k)
.map(|(i, score)| PredictionWithScore {
label: format!("class_{}", i),
score,
})
.collect()
} else {
vec![PredictionWithScore {
label: format!("{:.4}", output[0]),
score: 1.0,
}]
}
});
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
state.audit_logger.log_response(
request_id,
prediction.clone(),
start.elapsed(),
Some(confidence),
);
Ok(Json(PredictResponse {
request_id: request_id.to_string(),
model: request.model.unwrap_or_else(|| "default".to_string()),
prediction,
confidence: if request.include_confidence {
Some(confidence)
} else {
None
},
top_k_predictions,
latency_ms,
}))
}
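/// APR explanation endpoint. Currently returns stubbed SHAP values (a fixed
/// descending pattern) and a fixed prediction of 0.95; the response shape
/// matches what a real explainer would produce.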
async fn apr_explain_handler(
State(_state): State<AppState>,
Json(request): Json<ExplainRequest>,
) -> Result<Json<ExplainResponse>, (StatusCode, Json<ErrorResponse>)> {
let start = std::time::Instant::now();
let request_id = uuid::Uuid::new_v4().to_string();
if request.features.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Input features cannot be empty".to_string(),
}),
));
}
if request.feature_names.len() != request.features.len() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!(
"Feature names count ({}) must match features count ({})",
request.feature_names.len(),
request.features.len()
),
}),
));
}
let shap_values: Vec<f32> = request
.features
.iter()
.enumerate()
.map(|(i, _)| 0.1 - (i as f32 * 0.02))
.collect();
let explanation = ShapExplanation {
base_value: 0.0,
shap_values: shap_values.clone(),
feature_names: request.feature_names.clone(),
prediction: 0.95,
};
let mut feature_importance: Vec<_> = request
.feature_names
.iter()
.zip(shap_values.iter())
.collect();
feature_importance
.sort_by(|a, b| b.1.abs().partial_cmp(&a.1.abs()).unwrap_or(std::cmp::Ordering::Equal));
let top_features: Vec<_> = feature_importance
.iter()
.take(request.top_k_features)
.collect();
let summary = if top_features.is_empty() {
"No significant features found.".to_string()
} else {
let feature_strs: Vec<String> = top_features
.iter()
.map(|(name, val)| {
let direction = if **val > 0.0 { "+" } else { "-" };
format!("{} ({})", name, direction)
})
.collect();
format!("Top contributing features: {}", feature_strs.join(", "))
};
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
Ok(Json(ExplainResponse {
request_id,
model: request.model.unwrap_or_else(|| "default".to_string()),
prediction: serde_json::json!(0.95),
confidence: Some(0.95),
explanation,
summary,
latency_ms,
}))
}
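/// Audit lookup endpoint. Validates the request ID as a UUID, flushes the
/// audit logger, and searches the in-memory sink for a matching record.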
async fn apr_audit_handler(
State(state): State<AppState>,
Path(request_id): Path<String>,
) -> Result<Json<AuditResponse>, (StatusCode, Json<ErrorResponse>)> {
if uuid::Uuid::parse_str(&request_id).is_err() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Invalid request ID format: {}", request_id),
}),
));
}
let _ = state.audit_logger.flush();
let records = state.audit_sink.records();
let record = records
.into_iter()
.find(|r| r.request_id == request_id)
.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("Audit record not found for request_id: {}", request_id),
}),
)
})?;
Ok(Json(AuditResponse { record }))
}
#[cfg(all(test, feature = "heavy-tests"))]
mod tests {
use axum::{
body::Body,
http::{Request, StatusCode},
};
use tower::util::ServiceExt;
use super::*;
fn create_test_app() -> Router {
let state = AppState::demo().unwrap();
create_router(state)
}
#[tokio::test]
async fn test_health_endpoint() {
let app = create_test_app();
let response = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let health: HealthResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(health.status, "healthy");
}
#[tokio::test]
async fn test_metrics_endpoint() {
let app = create_test_app();
let response = app
.oneshot(
Request::builder()
.uri("/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let metrics_text = String::from_utf8(body.to_vec()).unwrap();
assert!(metrics_text.contains("realizar_requests_total"));
assert!(metrics_text.contains("realizar_tokens_generated"));
assert!(metrics_text.contains("realizar_error_rate"));
assert!(metrics_text.contains("# HELP"));
assert!(metrics_text.contains("# TYPE"));
}
#[tokio::test]
async fn test_metrics_tracking() {
let state = AppState::demo().unwrap();
let app = create_router(state.clone());
let request = GenerateRequest {
prompt: "token1".to_string(),
max_tokens: 3,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(42),
model_id: None,
};
let _response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
let snapshot = state.metrics.snapshot();
assert_eq!(snapshot.total_requests, 1);
assert_eq!(snapshot.successful_requests, 1);
assert!(snapshot.total_tokens > 0);
}
#[tokio::test]
async fn test_parity107_server_metrics_endpoint() {
let app = create_test_app();
let response = app
.oneshot(
Request::builder()
.uri("/v1/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let metrics: ServerMetricsResponse = serde_json::from_slice(&body).unwrap();
assert!(metrics.throughput_tok_per_sec >= 0.0);
assert!(metrics.latency_p50_ms >= 0.0);
assert!(metrics.latency_p95_ms >= 0.0);
assert!(metrics.latency_p99_ms >= 0.0);
assert!(metrics.gpu_utilization_percent <= 100);
assert!(metrics.batch_size >= 1);
assert!(!metrics.model_name.is_empty());
}
#[tokio::test]
async fn test_tokenize_endpoint() {
let app = create_test_app();
let request = TokenizeRequest {
text: "token1 token2".to_string(),
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/tokenize")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: TokenizeResponse = serde_json::from_slice(&body).unwrap();
assert!(result.num_tokens > 0);
}
#[tokio::test]
async fn test_generate_endpoint() {
let app = create_test_app();
let request = GenerateRequest {
prompt: "token1".to_string(),
max_tokens: 3,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(42),
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: GenerateResponse = serde_json::from_slice(&body).unwrap();
assert!(!result.token_ids.is_empty());
}
#[tokio::test]
async fn test_generate_empty_prompt_error() {
let app = create_test_app();
let request = GenerateRequest {
prompt: String::new(),
max_tokens: 3,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: None,
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_generate_invalid_strategy_error() {
let app = create_test_app();
let request = GenerateRequest {
prompt: "token1".to_string(),
max_tokens: 3,
temperature: 1.0,
strategy: "invalid".to_string(),
top_k: 50,
top_p: 0.9,
seed: None,
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_generate_top_k_strategy() {
let app = create_test_app();
let request = GenerateRequest {
prompt: "token1".to_string(),
max_tokens: 2,
temperature: 0.8,
strategy: "top_k".to_string(),
top_k: 5,
top_p: 0.9,
seed: Some(123),
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_generate_top_p_strategy() {
let app = create_test_app();
let request = GenerateRequest {
prompt: "token1".to_string(),
max_tokens: 2,
temperature: 0.7,
strategy: "top_p".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(456),
model_id: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_app_state_demo() {
let state = AppState::demo();
assert!(state.is_ok());
let state = state.unwrap();
assert_eq!(state.tokenizer.as_ref().unwrap().vocab_size(), 100);
}
#[test]
fn test_default_max_tokens() {
assert_eq!(default_max_tokens(), 50);
}
#[test]
fn test_default_temperature() {
assert!((default_temperature() - 1.0).abs() < 1e-6);
}
#[test]
fn test_default_strategy() {
assert_eq!(default_strategy(), "greedy");
}
#[test]
fn test_default_top_k() {
assert_eq!(default_top_k(), 50);
}
#[test]
fn test_default_top_p() {
assert!((default_top_p() - 0.9).abs() < 1e-6);
}
#[tokio::test]
async fn test_generate_with_defaults() {
let app = create_test_app();
let json = r#"{"prompt": "test"}"#;
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(json))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: GenerateResponse = serde_json::from_slice(&body).unwrap();
assert!(!result.token_ids.is_empty());
assert!(result.num_generated <= 50);
}
#[tokio::test]
async fn test_num_generated_calculation() {
let app1 = create_test_app();
let prompt_tokens = app1
.oneshot(
Request::builder()
.method("POST")
.uri("/tokenize")
.header("content-type", "application/json")
.body(Body::from(r#"{"text": "a"}"#))
.unwrap(),
)
.await
.unwrap();
let prompt_body = axum::body::to_bytes(prompt_tokens.into_body(), usize::MAX)
.await
.unwrap();
let prompt_result: TokenizeResponse = serde_json::from_slice(&prompt_body).unwrap();
let prompt_len = prompt_result.token_ids.len();
let app2 = create_test_app();
let request = GenerateRequest {
prompt: "a".to_string(),
max_tokens: 5,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(42),
model_id: None,
};
let response = app2
.oneshot(
Request::builder()
.method("POST")
.uri("/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: GenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.num_generated, result.token_ids.len() - prompt_len);
assert!(result.num_generated > 0);
assert!(result.num_generated <= 5);
}
#[tokio::test]
async fn test_batch_tokenize_endpoint() {
let app = create_test_app();
let request = BatchTokenizeRequest {
texts: vec!["token1".to_string(), "token2 token3".to_string()],
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/tokenize")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchTokenizeResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 2);
assert!(result.results[0].num_tokens > 0);
assert!(result.results[1].num_tokens > 0);
}
#[tokio::test]
async fn test_batch_tokenize_empty_array_error() {
let app = create_test_app();
let request = BatchTokenizeRequest { texts: vec![] };
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/tokenize")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_batch_generate_endpoint() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec!["token1".to_string(), "token2".to_string()],
max_tokens: 3,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(42),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchGenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 2);
assert!(!result.results[0].token_ids.is_empty());
assert!(!result.results[1].token_ids.is_empty());
assert!(!result.results[0].text.is_empty());
assert!(!result.results[1].text.is_empty());
}
#[tokio::test]
async fn test_batch_generate_empty_array_error() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec![],
max_tokens: 3,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_batch_generate_with_defaults() {
let app = create_test_app();
let json = r#"{"prompts": ["test1", "test2"]}"#;
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(json))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchGenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 2);
for gen_result in &result.results {
assert!(gen_result.num_generated <= 50);
}
}
#[tokio::test]
async fn test_batch_generate_order_preserved() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec![
"token1".to_string(),
"token2".to_string(),
"token3".to_string(),
],
max_tokens: 2,
temperature: 1.0,
strategy: "greedy".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(123),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchGenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 3);
for gen_result in &result.results {
assert!(!gen_result.token_ids.is_empty());
assert!(!gen_result.text.is_empty());
}
}
#[tokio::test]
async fn test_batch_generate_invalid_strategy_error() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec!["test".to_string()],
max_tokens: 3,
temperature: 1.0,
strategy: "invalid".to_string(),
top_k: 50,
top_p: 0.9,
seed: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_batch_generate_top_k_strategy() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec!["token1".to_string(), "token2".to_string()],
max_tokens: 2,
temperature: 0.8,
strategy: "top_k".to_string(),
top_k: 5,
top_p: 0.9,
seed: Some(456),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchGenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 2);
}
#[tokio::test]
async fn test_batch_generate_top_p_strategy() {
let app = create_test_app();
let request = BatchGenerateRequest {
prompts: vec!["token1".to_string()],
max_tokens: 2,
temperature: 0.7,
strategy: "top_p".to_string(),
top_k: 50,
top_p: 0.9,
seed: Some(789),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/batch/generate")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: BatchGenerateResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.results.len(), 1);
}
#[tokio::test]
async fn test_openai_models_endpoint() {
let app = create_test_app();
let response = app
.oneshot(
Request::builder()
.uri("/v1/models")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: OpenAIModelsResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(result.object, "list");
assert!(!result.data.is_empty());
assert_eq!(result.data[0].object, "model");
assert_eq!(result.data[0].owned_by, "realizar");
}
#[tokio::test]
async fn test_openai_chat_completions_endpoint() {
let app = create_test_app();
let request = ChatCompletionRequest {
model: "default".to_string(),
messages: vec![
ChatMessage {
role: "system".to_string(),
content: "You are a helpful assistant.".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: None,
},
],
max_tokens: Some(10),
temperature: Some(0.7),
top_p: None,
n: 1,
stream: false,
stop: None,
user: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/chat/completions")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: ChatCompletionResponse = serde_json::from_slice(&body).unwrap();
assert!(result.id.starts_with("chatcmpl-"));
assert_eq!(result.object, "chat.completion");
assert_eq!(result.model, "default");
assert_eq!(result.choices.len(), 1);
assert_eq!(result.choices[0].message.role, "assistant");
assert_eq!(result.choices[0].finish_reason, "stop");
assert!(result.usage.total_tokens > 0);
}
#[tokio::test]
async fn test_openai_chat_completions_with_defaults() {
let app = create_test_app();
let json = r#"{"model": "default", "messages": [{"role": "user", "content": "Hi"}]}"#;
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/chat/completions")
.header("content-type", "application/json")
.body(Body::from(json))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: ChatCompletionResponse = serde_json::from_slice(&body).unwrap();
assert!(result.id.starts_with("chatcmpl-"));
assert_eq!(result.choices.len(), 1);
}
#[test]
fn test_format_chat_messages_simple() {
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: None,
}];
let result = format_chat_messages(&messages);
assert!(result.contains("User: Hello"));
assert!(result.ends_with("Assistant: "));
}
#[test]
fn test_format_chat_messages_with_system() {
let messages = vec![
ChatMessage {
role: "system".to_string(),
content: "You are helpful.".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "Hi".to_string(),
name: None,
},
];
let result = format_chat_messages(&messages);
assert!(result.contains("System: You are helpful."));
assert!(result.contains("User: Hi"));
assert!(result.ends_with("Assistant: "));
}
#[test]
fn test_format_chat_messages_conversation() {
let messages = vec![
ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: None,
},
ChatMessage {
role: "assistant".to_string(),
content: "Hi there!".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "How are you?".to_string(),
name: None,
},
];
let result = format_chat_messages(&messages);
assert!(result.contains("User: Hello"));
assert!(result.contains("Assistant: Hi there!"));
assert!(result.contains("User: How are you?"));
assert!(result.ends_with("Assistant: "));
}
#[test]
fn test_default_n() {
assert_eq!(default_n(), 1);
}
#[test]
fn test_chat_message_serialization() {
let msg = ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: Some("test_user".to_string()),
};
let json = serde_json::to_string(&msg).unwrap();
assert!(json.contains("\"role\":\"user\""));
assert!(json.contains("\"content\":\"Hello\""));
assert!(json.contains("\"name\":\"test_user\""));
}
#[test]
fn test_usage_serialization() {
let usage = Usage {
prompt_tokens: 10,
completion_tokens: 20,
total_tokens: 30,
};
let json = serde_json::to_string(&usage).unwrap();
assert!(json.contains("\"prompt_tokens\":10"));
assert!(json.contains("\"completion_tokens\":20"));
assert!(json.contains("\"total_tokens\":30"));
}
#[test]
fn test_chat_completion_chunk_initial() {
let chunk = ChatCompletionChunk::initial("chatcmpl-123", "gpt-4");
assert_eq!(chunk.id, "chatcmpl-123");
assert_eq!(chunk.object, "chat.completion.chunk");
assert_eq!(chunk.model, "gpt-4");
assert_eq!(chunk.choices.len(), 1);
assert_eq!(chunk.choices[0].delta.role, Some("assistant".to_string()));
assert!(chunk.choices[0].delta.content.is_none());
assert!(chunk.choices[0].finish_reason.is_none());
}
#[test]
fn test_chat_completion_chunk_content() {
let chunk = ChatCompletionChunk::content("chatcmpl-123", "gpt-4", "Hello");
assert_eq!(chunk.id, "chatcmpl-123");
assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
assert!(chunk.choices[0].delta.role.is_none());
assert!(chunk.choices[0].finish_reason.is_none());
}
#[test]
fn test_chat_completion_chunk_done() {
let chunk = ChatCompletionChunk::done("chatcmpl-123", "gpt-4");
assert_eq!(chunk.id, "chatcmpl-123");
assert!(chunk.choices[0].delta.content.is_none());
assert!(chunk.choices[0].delta.role.is_none());
assert_eq!(chunk.choices[0].finish_reason, Some("stop".to_string()));
}
#[test]
fn test_chat_completion_chunk_serialization() {
let chunk = ChatCompletionChunk::content("chatcmpl-123", "gpt-4", "Hi");
let json = serde_json::to_string(&chunk).unwrap();
assert!(json.contains("\"object\":\"chat.completion.chunk\""));
assert!(json.contains("\"id\":\"chatcmpl-123\""));
assert!(json.contains("\"content\":\"Hi\""));
}
#[test]
fn test_chat_delta_serialization_skip_none() {
let delta = ChatDelta {
role: None,
content: Some("test".to_string()),
};
let json = serde_json::to_string(&delta).unwrap();
assert!(!json.contains("\"role\""));
assert!(json.contains("\"content\":\"test\""));
}
#[test]
fn test_chat_chunk_choice_serialization() {
let choice = ChatChunkChoice {
index: 0,
delta: ChatDelta {
role: Some("assistant".to_string()),
content: None,
},
finish_reason: None,
};
let json = serde_json::to_string(&choice).unwrap();
assert!(json.contains("\"index\":0"));
assert!(json.contains("\"role\":\"assistant\""));
assert!(!json.contains("\"content\""));
}
#[test]
fn test_streaming_chunk_created_timestamp() {
let chunk1 = ChatCompletionChunk::initial("id1", "model");
std::thread::sleep(std::time::Duration::from_millis(10));
let chunk2 = ChatCompletionChunk::initial("id2", "model");
assert!(chunk1.created > 0);
assert!(chunk2.created > 0);
assert!(chunk2.created >= chunk1.created);
}
#[test]
fn test_context_window_config_default() {
let config = ContextWindowConfig::default();
assert_eq!(config.max_tokens, 4096);
assert_eq!(config.reserved_output_tokens, 256);
assert!(config.preserve_system);
}
#[test]
fn test_context_window_config_new() {
let config = ContextWindowConfig::new(8192);
assert_eq!(config.max_tokens, 8192);
assert_eq!(config.reserved_output_tokens, 256);
}
#[test]
fn test_context_window_config_with_reserved() {
let config = ContextWindowConfig::new(4096).with_reserved_output(512);
assert_eq!(config.max_tokens, 4096);
assert_eq!(config.reserved_output_tokens, 512);
}
#[test]
fn test_context_window_available_tokens() {
let config = ContextWindowConfig::new(4096).with_reserved_output(256);
assert_eq!(config.available_tokens(), 3840);
}
#[test]
fn test_context_manager_no_truncation_needed() {
let manager = ContextWindowManager::default_manager();
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: None,
}];
let (result, truncated) = manager.truncate_messages(&messages);
assert!(!truncated);
assert_eq!(result.len(), 1);
}
#[test]
fn test_context_manager_needs_truncation() {
let config = ContextWindowConfig::new(100).with_reserved_output(20);
let manager = ContextWindowManager::new(config);
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "x".repeat(500),
name: None,
}];
assert!(manager.needs_truncation(&messages));
}
#[test]
fn test_context_manager_truncate_preserves_system() {
let config = ContextWindowConfig::new(80).with_reserved_output(20);
let manager = ContextWindowManager::new(config);
let messages = vec![
ChatMessage {
role: "system".to_string(),
content: "You are helpful.".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "x".repeat(200), name: None,
},
ChatMessage {
role: "user".to_string(),
content: "Recent".to_string(),
name: None,
},
];
let (result, truncated) = manager.truncate_messages(&messages);
assert!(truncated);
assert!(result.iter().any(|m| m.role == "system"));
assert!(result.iter().any(|m| m.content == "Recent"));
}
#[test]
fn test_context_manager_truncate_keeps_recent() {
let config = ContextWindowConfig::new(100).with_reserved_output(20);
let mut cfg = config;
cfg.preserve_system = false;
let manager = ContextWindowManager::new(cfg);
let messages = vec![
ChatMessage {
role: "user".to_string(),
content: "Old message 1".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "Old message 2".to_string(),
name: None,
},
ChatMessage {
role: "user".to_string(),
content: "Recent".to_string(),
name: None,
},
];
let (result, truncated) = manager.truncate_messages(&messages);
if truncated {
assert!(result.iter().any(|m| m.content == "Recent"));
}
}
#[test]
fn test_context_manager_estimate_tokens() {
let manager = ContextWindowManager::default_manager();
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
name: None,
}];
let tokens = manager.estimate_total_tokens(&messages);
assert!(tokens > 0);
assert!(tokens < 100);
}
#[test]
fn test_context_manager_empty_messages() {
let manager = ContextWindowManager::default_manager();
let messages: Vec<ChatMessage> = vec![];
let (result, truncated) = manager.truncate_messages(&messages);
assert!(!truncated);
assert!(result.is_empty());
}
#[test]
fn test_context_manager_single_large_message() {
let config = ContextWindowConfig::new(100).with_reserved_output(20);
let manager = ContextWindowManager::new(config);
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "x".repeat(1000),
name: None,
}];
let (result, truncated) = manager.truncate_messages(&messages);
assert!(truncated);
assert!(result.is_empty() || result.len() == 1);
}
#[tokio::test]
async fn test_apr_predict_endpoint() {
let app = create_test_app();
let request = PredictRequest {
model: None,
features: vec![1.0, 2.0, 3.0, 4.0],
feature_names: None,
top_k: Some(3),
include_confidence: true,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/predict")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: PredictResponse = serde_json::from_slice(&body).unwrap();
assert!(!result.request_id.is_empty());
assert_eq!(result.model, "default");
assert!(result.confidence.is_some());
assert!(result.top_k_predictions.is_some());
assert!(result.latency_ms >= 0.0);
assert_eq!(result.prediction, serde_json::json!(10.0));
}
#[tokio::test]
async fn test_apr_predict_empty_features() {
let app = create_test_app();
let request = PredictRequest {
model: None,
features: vec![],
feature_names: None,
top_k: None,
include_confidence: true,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/predict")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_apr_explain_endpoint() {
let app = create_test_app();
let request = ExplainRequest {
model: None,
features: vec![1.0, 2.0, 3.0],
feature_names: vec!["f1".to_string(), "f2".to_string(), "f3".to_string()],
top_k_features: 2,
method: "shap".to_string(),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/explain")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let result: ExplainResponse = serde_json::from_slice(&body).unwrap();
assert!(!result.request_id.is_empty());
assert_eq!(result.model, "default");
assert!(!result.summary.is_empty());
assert_eq!(result.explanation.feature_names.len(), 3);
assert_eq!(result.explanation.shap_values.len(), 3);
}
#[tokio::test]
async fn test_apr_explain_mismatched_features() {
let app = create_test_app();
let request = ExplainRequest {
model: None,
features: vec![1.0, 2.0, 3.0],
feature_names: vec!["f1".to_string()], top_k_features: 2,
method: "shap".to_string(),
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/explain")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_apr_audit_endpoint() {
let state = AppState::demo().unwrap();
let app = create_router(state);
let predict_request = PredictRequest {
model: None,
features: vec![1.0, 2.0, 3.0, 4.0],
feature_names: None,
top_k: None,
include_confidence: true,
};
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/predict")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&predict_request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let predict_result: PredictResponse = serde_json::from_slice(&body).unwrap();
let request_id = predict_result.request_id;
let audit_response = app
.oneshot(
Request::builder()
.uri(format!("/v1/audit/{}", request_id))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(audit_response.status(), StatusCode::OK);
let audit_body = axum::body::to_bytes(audit_response.into_body(), usize::MAX)
.await
.unwrap();
let audit_result: AuditResponse = serde_json::from_slice(&audit_body).unwrap();
assert_eq!(audit_result.record.request_id, request_id);
}
#[tokio::test]
async fn test_apr_audit_invalid_id() {
let app = create_test_app();
let response = app
.oneshot(
Request::builder()
.uri("/v1/audit/not-a-valid-uuid")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[test]
fn test_predict_request_serialization() {
let request = PredictRequest {
model: Some("test-model".to_string()),
features: vec![1.0, 2.0, 3.0],
feature_names: Some(vec!["f1".to_string(), "f2".to_string(), "f3".to_string()]),
top_k: Some(3),
include_confidence: true,
};
let json = serde_json::to_string(&request).unwrap();
assert!(json.contains("test-model"));
assert!(json.contains("features"));
let deserialized: PredictRequest = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.features.len(), 3);
}
#[test]
fn test_explain_request_defaults() {
let json = r#"{"features": [1.0], "feature_names": ["f1"]}"#;
let request: ExplainRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.top_k_features, 5);
assert_eq!(request.method, "shap");
}
#[test]
#[cfg(feature = "gpu")]
fn test_imp_084_app_state_with_gpu_model() {
use crate::gpu::{GpuModel, GpuModelConfig};
let config = GpuModelConfig {
vocab_size: 256,
hidden_dim: 64,
num_heads: 2,
num_kv_heads: 2,
num_layers: 2,
intermediate_dim: 128,
eps: 1e-5,
};
let gpu_model = GpuModel::new(config).expect("Failed to create GPU model");
let state = AppState::with_gpu_model(gpu_model).expect("Failed to create AppState");
assert!(
state.has_gpu_model(),
"IMP-084: AppState should have GPU model"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_085_completions_uses_gpu_model() {
use crate::gpu::{GpuModel, GpuModelConfig};
let config = GpuModelConfig {
vocab_size: 256,
hidden_dim: 64,
num_heads: 2,
num_kv_heads: 2,
num_layers: 2,
intermediate_dim: 128,
eps: 1e-5,
};
let gpu_model = GpuModel::new(config).expect("Failed to create GPU model");
let state = AppState::with_gpu_model(gpu_model).expect("Failed to create AppState");
let app = create_router(state);
let request = CompletionRequest {
prompt: "Hello".to_string(),
max_tokens: Some(5),
temperature: Some(0.0),
model: "default".to_string(),
top_p: None,
stop: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/completions")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
response.status(),
StatusCode::OK,
"IMP-085: /v1/completions should work with GPU model"
);
}
#[test]
#[cfg(feature = "gpu")]
fn test_imp_116a_appstate_cached_model_storage() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state = AppState::with_cached_model(cached_model)
.expect("IMP-116a: AppState should accept cached model");
assert!(
state.cached_model().is_some(),
"IMP-116a: Cached model should be accessible from AppState"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_116b_cached_model_thread_safety() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = Arc::new(OwnedQuantizedModelCachedSync::new(model));
let mut handles = Vec::new();
for i in 0..4 {
let model_clone = cached_model.clone();
handles.push(tokio::spawn(async move {
let inner = model_clone.model();
assert_eq!(inner.config.hidden_dim, 64, "Task {i} should access model");
}));
}
for handle in handles {
handle
.await
.expect("IMP-116b: Concurrent access should succeed");
}
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_116c_completions_uses_cached_model() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state = AppState::with_cached_model(cached_model).expect("Failed to create AppState");
assert!(
state.has_cached_model(),
"IMP-116c: AppState should have cached model"
);
assert!(
state.cached_model().is_some(),
"IMP-116c: cached_model() should return Some"
);
let app = create_router(state);
let request = CompletionRequest {
prompt: "Hello".to_string(),
max_tokens: Some(3),
temperature: Some(0.0),
model: "default".to_string(),
top_p: None,
stop: None,
};
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/v1/completions")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request).unwrap()))
.unwrap(),
)
.await
.unwrap();
let status = response.status();
assert!(
status == StatusCode::OK || status == StatusCode::INTERNAL_SERVER_ERROR,
"IMP-116c: Request should be handled (got {})",
status
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_116d_scheduler_reuse_across_requests() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = Arc::new(OwnedQuantizedModelCachedSync::new(model));
let mut handles = Vec::new();
for i in 0..5 {
let model_clone = cached_model.clone();
handles.push(tokio::spawn(async move {
let inner = model_clone.model();
assert_eq!(
inner.config.hidden_dim, 64,
"IMP-116d: Access {i} should succeed"
);
}));
}
for (i, handle) in handles.into_iter().enumerate() {
handle
.await
.unwrap_or_else(|_| panic!("IMP-116d: Concurrent access {i} should not panic"));
}
}
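/// Builds a minimal Q4_K-quantized test model: every weight tensor is zero-filled and
/// sized as `ceil(in_dim / 256)` super-blocks of 144 bytes per output row, so the
/// shapes are valid for the given GGUFConfig without loading a real GGUF file.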
#[cfg(feature = "gpu")]
fn create_test_quantized_model(
config: &crate::gguf::GGUFConfig,
) -> crate::gguf::OwnedQuantizedModel {
use crate::gguf::{
OwnedQKVWeights, OwnedQuantizedLayer, OwnedQuantizedModel, OwnedQuantizedTensor,
GGUF_TYPE_Q4_K,
};
let hidden_dim = config.hidden_dim;
let intermediate_dim = config.intermediate_dim;
let vocab_size = config.vocab_size;
fn create_q4k_data(in_dim: usize, out_dim: usize) -> OwnedQuantizedTensor {
let super_blocks_per_row = in_dim.div_ceil(256);
let bytes_per_row = super_blocks_per_row * 144;
let data_size = out_dim * bytes_per_row;
OwnedQuantizedTensor {
data: vec![0u8; data_size],
qtype: GGUF_TYPE_Q4_K,
in_dim,
out_dim,
}
}
let layers = (0..config.num_layers)
.map(|_| OwnedQuantizedLayer {
attn_norm_weight: vec![1.0f32; hidden_dim],
attn_norm_bias: None,
qkv_weight: OwnedQKVWeights::Fused(create_q4k_data(hidden_dim, hidden_dim * 3)),
qkv_bias: None,
attn_output_weight: create_q4k_data(hidden_dim, hidden_dim),
attn_output_bias: None,
ffn_up_weight: create_q4k_data(hidden_dim, intermediate_dim),
ffn_up_bias: None,
ffn_down_weight: create_q4k_data(intermediate_dim, hidden_dim),
ffn_down_bias: None,
ffn_gate_weight: None,
ffn_gate_bias: None,
ffn_norm_weight: None,
ffn_norm_bias: None,
})
.collect();
OwnedQuantizedModel {
config: config.clone(),
token_embedding: vec![0.1f32; vocab_size * hidden_dim],
layers,
output_norm_weight: vec![1.0f32; hidden_dim],
output_norm_bias: None,
lm_head_weight: create_q4k_data(hidden_dim, vocab_size),
lm_head_bias: None,
#[cfg(feature = "cuda")]
cuda_executor: None,
#[cfg(feature = "cuda")]
cuda_kernel_count: std::sync::atomic::AtomicU64::new(0),
}
}
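// IMP-126: DispatchMetrics is created alongside the cached model and exposed through
// AppState::dispatch_metrics() as a shared Arc, with all counters starting at zero.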
#[test]
#[cfg(feature = "gpu")]
fn test_imp_126a_appstate_has_dispatch_metrics() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-126a: Should create AppState");
let metrics = state.dispatch_metrics();
assert!(
metrics.is_some(),
"IMP-126a: AppState should have dispatch_metrics"
);
let m = metrics.expect("Should have metrics");
assert_eq!(
m.total_dispatches(),
0,
"IMP-126a: Metrics should start at zero"
);
}
#[test]
#[cfg(feature = "gpu")]
fn test_imp_126b_cached_sync_has_generate_adaptive() {
use crate::gguf::{
DispatchMetrics, GGUFConfig, OwnedQuantizedModelCachedSync, QuantizedGenerateConfig,
};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let metrics = Arc::new(DispatchMetrics::new());
let gen_config = QuantizedGenerateConfig {
max_tokens: 3,
temperature: 0.0,
top_k: 1,
stop_tokens: vec![],
};
let prompt = vec![1u32, 2, 3];
// IMP-126b: the call compiling and running is the check itself;
// generate_with_cache_adaptive exists on OwnedQuantizedModelCachedSync.
let _result = cached_model.generate_with_cache_adaptive(&prompt, &gen_config, &metrics);
}
#[test]
#[cfg(feature = "gpu")]
fn test_imp_126c_dispatch_metrics_integration() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-126c: Should create AppState");
let metrics1 = state.dispatch_metrics();
let metrics2 = state.dispatch_metrics();
assert!(
metrics1.is_some(),
"IMP-126c: dispatch_metrics should be available"
);
let m1 = metrics1.expect("Should have metrics");
let m2 = metrics2.expect("Should have metrics");
assert!(
Arc::ptr_eq(m1, m2),
"IMP-126c: dispatch_metrics should be shared Arc"
);
}
#[test]
#[cfg(feature = "gpu")]
fn test_imp_126d_handler_uses_adaptive_generation() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-126d: Should create AppState");
let metrics = state.dispatch_metrics();
assert!(
metrics.is_some(),
"IMP-126d: Handler should have dispatch_metrics for adaptive generation"
);
let m = metrics.expect("Should have metrics");
let initial_cpu = m.cpu_dispatches();
let initial_gpu = m.gpu_dispatches();
m.record_cpu_dispatch();
m.record_gpu_dispatch();
assert_eq!(
m.cpu_dispatches(),
initial_cpu + 1,
"IMP-126d: Metrics should track CPU dispatches"
);
assert_eq!(
m.gpu_dispatches(),
initial_gpu + 1,
"IMP-126d: Metrics should track GPU dispatches"
);
}
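// IMP-127: GET /metrics/dispatch returns a JSON body with cpu_dispatches,
// gpu_dispatches, total_dispatches and gpu_ratio, and 503 when no GPU-capable model
// is configured.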
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_127a_dispatch_metrics_endpoint_exists() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-127a: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(
response.status(),
StatusCode::OK,
"IMP-127a: /metrics/dispatch should return 200 OK"
);
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok());
assert!(
content_type.is_some_and(|s| s.contains("application/json")),
"IMP-127a: Response should be JSON"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_127b_dispatch_metrics_response_structure() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-127b: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value =
serde_json::from_slice(&body).expect("IMP-127b: Response should be valid JSON");
assert!(
json.get("cpu_dispatches").is_some(),
"IMP-127b: Response should have cpu_dispatches"
);
assert!(
json.get("gpu_dispatches").is_some(),
"IMP-127b: Response should have gpu_dispatches"
);
assert!(
json.get("total_dispatches").is_some(),
"IMP-127b: Response should have total_dispatches"
);
assert!(
json.get("gpu_ratio").is_some(),
"IMP-127b: Response should have gpu_ratio"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_127c_dispatch_metrics_starts_zero() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-127c: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(
json["cpu_dispatches"].as_u64(),
Some(0),
"IMP-127c: cpu_dispatches should start at 0"
);
assert_eq!(
json["gpu_dispatches"].as_u64(),
Some(0),
"IMP-127c: gpu_dispatches should start at 0"
);
assert_eq!(
json["total_dispatches"].as_u64(),
Some(0),
"IMP-127c: total_dispatches should start at 0"
);
}
#[tokio::test]
async fn test_imp_127d_dispatch_metrics_no_gpu_model() {
let state = AppState::demo().expect("Should create demo AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(
response.status(),
StatusCode::SERVICE_UNAVAILABLE,
"IMP-127d: /metrics/dispatch should return 503 when no GPU model configured"
);
}
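// IMP-128: /metrics/dispatch?format=prometheus returns text/plain exposition
// counters/gauges (realizar_dispatch_*); JSON remains the default format.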
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_128a_prometheus_format_endpoint() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-128a: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(
response.status(),
StatusCode::OK,
"IMP-128a: Prometheus format should return 200 OK"
);
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok());
assert!(
content_type.is_some_and(|s| s.contains("text/plain")),
"IMP-128a: Prometheus response should be text/plain"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_128b_prometheus_format_structure() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-128b: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let text = String::from_utf8_lossy(&body);
assert!(
text.contains("realizar_dispatch_cpu_total"),
"IMP-128b: Should have CPU dispatch counter"
);
assert!(
text.contains("realizar_dispatch_gpu_total"),
"IMP-128b: Should have GPU dispatch counter"
);
assert!(
text.contains("realizar_dispatch_gpu_ratio"),
"IMP-128b: Should have GPU ratio gauge"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_128c_default_format_is_json() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-128c: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok());
assert!(
content_type.is_some_and(|s| s.contains("application/json")),
"IMP-128c: Default format should be JSON"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_128d_explicit_json_format() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-128d: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=json")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok());
assert!(
content_type.is_some_and(|s| s.contains("application/json")),
"IMP-128d: format=json should return JSON"
);
}
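// IMP-130: the Prometheus output also carries CPU/GPU latency histograms with
// _bucket/_sum/_count series and `le` labels at 100, 500, 1000, 5000 and +Inf µs.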
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_130a_prometheus_includes_cpu_latency_histogram() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-130a: Should create AppState");
if let Some(metrics) = state.dispatch_metrics() {
metrics.record_cpu_latency(std::time::Duration::from_micros(50));
metrics.record_cpu_latency(std::time::Duration::from_micros(200));
metrics.record_cpu_latency(std::time::Duration::from_micros(800));
}
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("realizar_dispatch_cpu_latency_bucket"),
"IMP-130a: Prometheus should include CPU latency histogram buckets. Got: {}",
body_str
);
assert!(
body_str.contains("realizar_dispatch_cpu_latency_sum"),
"IMP-130a: Prometheus should include CPU latency sum"
);
assert!(
body_str.contains("realizar_dispatch_cpu_latency_count"),
"IMP-130a: Prometheus should include CPU latency count"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_130b_prometheus_includes_gpu_latency_histogram() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-130b: Should create AppState");
if let Some(metrics) = state.dispatch_metrics() {
metrics.record_gpu_latency(std::time::Duration::from_micros(150));
metrics.record_gpu_latency(std::time::Duration::from_micros(600));
}
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("realizar_dispatch_gpu_latency_bucket"),
"IMP-130b: Prometheus should include GPU latency histogram buckets. Got: {}",
body_str
);
assert!(
body_str.contains("realizar_dispatch_gpu_latency_sum"),
"IMP-130b: Prometheus should include GPU latency sum"
);
assert!(
body_str.contains("realizar_dispatch_gpu_latency_count"),
"IMP-130b: Prometheus should include GPU latency count"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_130c_prometheus_latency_buckets_have_correct_labels() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-130c: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains(r#"le="100""#),
"IMP-130c: Should have 100µs bucket label"
);
assert!(
body_str.contains(r#"le="500""#),
"IMP-130c: Should have 500µs bucket label"
);
assert!(
body_str.contains(r#"le="1000""#),
"IMP-130c: Should have 1000µs bucket label"
);
assert!(
body_str.contains(r#"le="5000""#),
"IMP-130c: Should have 5000µs bucket label"
);
assert!(
body_str.contains(r#"le="+Inf""#),
"IMP-130c: Should have +Inf bucket label"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_130d_prometheus_latency_has_help_and_type() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-130d: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("# HELP realizar_dispatch_cpu_latency"),
"IMP-130d: Should have HELP for CPU latency histogram"
);
assert!(
body_str.contains("# TYPE realizar_dispatch_cpu_latency histogram"),
"IMP-130d: Should have TYPE histogram for CPU latency"
);
assert!(
body_str.contains("# HELP realizar_dispatch_gpu_latency"),
"IMP-130d: Should have HELP for GPU latency histogram"
);
assert!(
body_str.contains("# TYPE realizar_dispatch_gpu_latency histogram"),
"IMP-130d: Should have TYPE histogram for GPU latency"
);
}
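// IMP-141: the Prometheus output also exposes realizar_dispatch_throughput_rps and
// realizar_dispatch_elapsed_seconds as gauges with HELP/TYPE metadata.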
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_141a_prometheus_includes_throughput_rps() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
use std::thread;
use std::time::Duration;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-141a: Should create AppState");
if let Some(metrics) = state.dispatch_metrics() {
thread::sleep(Duration::from_millis(2));
for _ in 0..10 {
metrics.record_cpu_dispatch();
}
}
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("realizar_dispatch_throughput_rps"),
"IMP-141a: Prometheus should include throughput_rps metric. Got: {}",
body_str
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_141b_prometheus_includes_elapsed_seconds() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-141b: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("realizar_dispatch_elapsed_seconds"),
"IMP-141b: Prometheus should include elapsed_seconds metric. Got: {}",
body_str
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_141c_throughput_rps_has_help_and_type() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-141c: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("# HELP realizar_dispatch_throughput_rps"),
"IMP-141c: Should have HELP for throughput_rps"
);
assert!(
body_str.contains("# TYPE realizar_dispatch_throughput_rps gauge"),
"IMP-141c: Should have TYPE gauge for throughput_rps"
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_141d_elapsed_seconds_has_help_and_type() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-141d: Should create AppState");
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch?format=prometheus")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("# HELP realizar_dispatch_elapsed_seconds"),
"IMP-141d: Should have HELP for elapsed_seconds"
);
assert!(
body_str.contains("# TYPE realizar_dispatch_elapsed_seconds gauge"),
"IMP-141d: Should have TYPE gauge for elapsed_seconds"
);
}
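// IMP-131: p50/p95/p99 are estimated from the fixed histogram buckets, so the
// assertions below only check bucket-level resolution (e.g. with bucket counts
// [50, 30, 20, 0, 0] the 95th of 100 samples lands in the 500-1000µs bucket).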
#[cfg(feature = "gpu")]
#[test]
fn test_imp_131a_dispatch_metrics_has_percentile_methods() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(std::time::Duration::from_micros(50));
metrics.record_cpu_latency(std::time::Duration::from_micros(150));
metrics.record_cpu_latency(std::time::Duration::from_micros(600));
metrics.record_gpu_latency(std::time::Duration::from_micros(80));
metrics.record_gpu_latency(std::time::Duration::from_micros(300));
let _cpu_p50 = metrics.cpu_latency_p50_us();
let _cpu_p95 = metrics.cpu_latency_p95_us();
let _cpu_p99 = metrics.cpu_latency_p99_us();
let _gpu_p50 = metrics.gpu_latency_p50_us();
let _gpu_p95 = metrics.gpu_latency_p95_us();
let _gpu_p99 = metrics.gpu_latency_p99_us();
}
#[cfg(feature = "gpu")]
#[test]
fn test_imp_131b_percentile_estimation_from_histogram() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
for _ in 0..50 {
metrics.record_cpu_latency(std::time::Duration::from_micros(50));
}
for _ in 0..30 {
metrics.record_cpu_latency(std::time::Duration::from_micros(200));
}
for _ in 0..20 {
metrics.record_cpu_latency(std::time::Duration::from_micros(700));
}
let p50 = metrics.cpu_latency_p50_us();
assert!(
p50 <= 100.0,
"IMP-131b: p50 should be in first bucket (<=100µs), got {:.1}µs",
p50
);
let p95 = metrics.cpu_latency_p95_us();
assert!(
p95 >= 500.0 && p95 <= 1000.0,
"IMP-131b: p95 should be in bucket 2 (500-1000µs), got {:.1}µs",
p95
);
}
#[tokio::test]
#[cfg(feature = "gpu")]
async fn test_imp_131c_json_response_includes_percentiles() {
use crate::gguf::{GGUFConfig, OwnedQuantizedModelCachedSync};
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 64,
intermediate_dim: 128,
num_layers: 1,
num_heads: 4,
num_kv_heads: 4,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let state =
AppState::with_cached_model(cached_model).expect("IMP-131c: Should create AppState");
if let Some(metrics) = state.dispatch_metrics() {
metrics.record_cpu_latency(std::time::Duration::from_micros(100));
metrics.record_gpu_latency(std::time::Duration::from_micros(200));
}
let app = create_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics/dispatch")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8_lossy(&body);
assert!(
body_str.contains("cpu_latency_p50_us"),
"IMP-131c: JSON should include cpu_latency_p50_us. Got: {}",
body_str
);
assert!(
body_str.contains("cpu_latency_p95_us"),
"IMP-131c: JSON should include cpu_latency_p95_us"
);
assert!(
body_str.contains("gpu_latency_p50_us"),
"IMP-131c: JSON should include gpu_latency_p50_us"
);
}
#[cfg(feature = "gpu")]
#[test]
fn test_imp_131d_percentiles_zero_when_empty() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
assert_eq!(
metrics.cpu_latency_p50_us(),
0.0,
"IMP-131d: Empty histogram should return 0 for p50"
);
assert_eq!(
metrics.cpu_latency_p95_us(),
0.0,
"IMP-131d: Empty histogram should return 0 for p95"
);
assert_eq!(
metrics.cpu_latency_p99_us(),
0.0,
"IMP-131d: Empty histogram should return 0 for p99"
);
assert_eq!(
metrics.gpu_latency_p50_us(),
0.0,
"IMP-131d: Empty histogram should return 0 for GPU p50"
);
}
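// IMP-132: generate_with_cache_adaptive is expected to record one latency sample per
// dispatch, so latency counts should track the CPU/GPU dispatch counters.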
#[cfg(feature = "gpu")]
#[test]
fn test_imp_132a_adaptive_attention_records_cpu_latency() {
use crate::gguf::{
DispatchMetrics, GGUFConfig, OwnedQuantizedModelCachedSync, QuantizedGenerateConfig,
};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 16,
intermediate_dim: 32,
num_layers: 1,
num_heads: 2,
num_kv_heads: 2,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let metrics = Arc::new(DispatchMetrics::new());
let gen_config = QuantizedGenerateConfig {
max_tokens: 5,
temperature: 0.0,
top_k: 1,
stop_tokens: vec![],
};
let _ = cached_model.generate_with_cache_adaptive(&[1, 2, 3], &gen_config, &metrics);
assert!(
metrics.cpu_latency_count() > 0,
"IMP-132a: CPU latency count should be > 0 after adaptive generation. Got: {}",
metrics.cpu_latency_count()
);
}
#[cfg(feature = "gpu")]
#[test]
fn test_imp_132b_latency_values_are_reasonable() {
use crate::gguf::{
DispatchMetrics, GGUFConfig, OwnedQuantizedModelCachedSync, QuantizedGenerateConfig,
};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 16,
intermediate_dim: 32,
num_layers: 1,
num_heads: 2,
num_kv_heads: 2,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let metrics = Arc::new(DispatchMetrics::new());
let gen_config = QuantizedGenerateConfig {
max_tokens: 5,
temperature: 0.0,
top_k: 1,
stop_tokens: vec![],
};
let _ = cached_model.generate_with_cache_adaptive(&[1, 2, 3], &gen_config, &metrics);
let mean_latency = metrics.cpu_latency_mean_us();
assert!(
mean_latency > 0.0,
"IMP-132b: Mean CPU latency should be > 0µs after attention. Got: {:.1}µs",
mean_latency
);
}
#[cfg(feature = "gpu")]
#[test]
fn test_imp_132c_latency_count_matches_dispatch_count() {
use crate::gguf::{
DispatchMetrics, GGUFConfig, OwnedQuantizedModelCachedSync, QuantizedGenerateConfig,
};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 16,
intermediate_dim: 32,
num_layers: 2,
num_heads: 2,
num_kv_heads: 2,
vocab_size: 100,
context_length: 128,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let metrics = Arc::new(DispatchMetrics::new());
let gen_config = QuantizedGenerateConfig {
max_tokens: 10,
temperature: 0.0,
top_k: 1,
stop_tokens: vec![],
};
let _ = cached_model.generate_with_cache_adaptive(&[1, 2, 3, 4, 5], &gen_config, &metrics);
let cpu_dispatches = metrics.cpu_dispatches();
let cpu_latency_count = metrics.cpu_latency_count();
assert_eq!(
cpu_dispatches, cpu_latency_count,
"IMP-132c: CPU latency count ({}) should match dispatch count ({})",
cpu_latency_count, cpu_dispatches
);
}
#[cfg(feature = "gpu")]
#[test]
fn test_imp_132d_gpu_dispatches_record_latency() {
use crate::gguf::{
DispatchMetrics, GGUFConfig, OwnedQuantizedModelCachedSync, QuantizedGenerateConfig,
};
use std::sync::Arc;
let config = GGUFConfig {
architecture: "test".to_string(),
hidden_dim: 16,
intermediate_dim: 32,
num_layers: 1,
num_heads: 2,
num_kv_heads: 2,
vocab_size: 100,
context_length: 256,
rope_theta: 10000.0,
eps: 1e-5,
};
let model = create_test_quantized_model(&config);
let cached_model = OwnedQuantizedModelCachedSync::new(model);
let metrics = Arc::new(DispatchMetrics::new());
let gen_config = QuantizedGenerateConfig {
max_tokens: 80,
temperature: 0.0,
top_k: 1,
stop_tokens: vec![],
};
let _ = cached_model.generate_with_cache_adaptive(&[1], &gen_config, &metrics);
let gpu_dispatches = metrics.gpu_dispatches();
let gpu_latency_count = metrics.gpu_latency_count();
if gpu_dispatches > 0 {
assert_eq!(
gpu_dispatches, gpu_latency_count,
"IMP-132d: GPU latency count ({}) should match dispatch count ({})",
gpu_latency_count, gpu_dispatches
);
}
}
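// IMP-133: mean latency accessors (in µs), returning 0.0 when no samples exist.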
#[test]
fn test_imp_133a_dispatch_metrics_has_mean_methods() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(300));
metrics.record_gpu_latency(Duration::from_micros(500));
metrics.record_gpu_latency(Duration::from_micros(700));
let cpu_mean = metrics.cpu_latency_mean_us();
let gpu_mean = metrics.gpu_latency_mean_us();
assert!(
(cpu_mean - 200.0).abs() < 1.0,
"IMP-133a: CPU mean should be ~200µs, got {}",
cpu_mean
);
assert!(
(gpu_mean - 600.0).abs() < 1.0,
"IMP-133a: GPU mean should be ~600µs, got {}",
gpu_mean
);
}
#[test]
fn test_imp_133b_mean_zero_when_empty() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
assert_eq!(
metrics.cpu_latency_mean_us(),
0.0,
"IMP-133b: CPU mean should be 0 when empty"
);
assert_eq!(
metrics.gpu_latency_mean_us(),
0.0,
"IMP-133b: GPU mean should be 0 when empty"
);
}
#[test]
fn test_imp_133c_json_response_includes_mean() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
use std::time::Duration;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_dispatch();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_dispatch();
metrics.record_cpu_latency(Duration::from_micros(300));
let response = DispatchMetricsResponse {
cpu_dispatches: metrics.cpu_dispatches(),
gpu_dispatches: metrics.gpu_dispatches(),
total_dispatches: metrics.total_dispatches(),
gpu_ratio: metrics.gpu_ratio(),
cpu_latency_p50_us: metrics.cpu_latency_p50_us(),
cpu_latency_p95_us: metrics.cpu_latency_p95_us(),
cpu_latency_p99_us: metrics.cpu_latency_p99_us(),
gpu_latency_p50_us: metrics.gpu_latency_p50_us(),
gpu_latency_p95_us: metrics.gpu_latency_p95_us(),
gpu_latency_p99_us: metrics.gpu_latency_p99_us(),
cpu_latency_mean_us: metrics.cpu_latency_mean_us(),
gpu_latency_mean_us: metrics.gpu_latency_mean_us(),
cpu_latency_min_us: metrics.cpu_latency_min_us(),
cpu_latency_max_us: metrics.cpu_latency_max_us(),
gpu_latency_min_us: metrics.gpu_latency_min_us(),
gpu_latency_max_us: metrics.gpu_latency_max_us(),
cpu_latency_variance_us: metrics.cpu_latency_variance_us(),
cpu_latency_stddev_us: metrics.cpu_latency_stddev_us(),
gpu_latency_variance_us: metrics.gpu_latency_variance_us(),
gpu_latency_stddev_us: metrics.gpu_latency_stddev_us(),
bucket_boundaries_us: metrics.bucket_boundaries_us(),
cpu_latency_bucket_counts: metrics.cpu_latency_buckets().to_vec(),
gpu_latency_bucket_counts: metrics.gpu_latency_buckets().to_vec(),
throughput_rps: 0.0,
elapsed_seconds: 0.0,
};
assert!(
(response.cpu_latency_mean_us - 200.0).abs() < 1.0,
"IMP-133c: Response CPU mean should be ~200µs, got {}",
response.cpu_latency_mean_us
);
assert_eq!(
response.gpu_latency_mean_us, 0.0,
"IMP-133c: Response GPU mean should be 0 (no GPU samples)"
);
}
#[test]
fn test_imp_133d_mean_single_sample() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(42));
assert!(
(metrics.cpu_latency_mean_us() - 42.0).abs() < 0.1,
"IMP-133d: Mean of single sample should be 42µs, got {}",
metrics.cpu_latency_mean_us()
);
}
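// IMP-134: min/max latency accessors (in µs), returning 0 when no samples exist.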
#[test]
fn test_imp_134a_dispatch_metrics_has_min_max_methods() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(50));
metrics.record_cpu_latency(Duration::from_micros(300));
metrics.record_gpu_latency(Duration::from_micros(200));
metrics.record_gpu_latency(Duration::from_micros(800));
assert_eq!(
metrics.cpu_latency_min_us(),
50,
"IMP-134a: CPU min should be 50µs"
);
assert_eq!(
metrics.cpu_latency_max_us(),
300,
"IMP-134a: CPU max should be 300µs"
);
assert_eq!(
metrics.gpu_latency_min_us(),
200,
"IMP-134a: GPU min should be 200µs"
);
assert_eq!(
metrics.gpu_latency_max_us(),
800,
"IMP-134a: GPU max should be 800µs"
);
}
#[test]
fn test_imp_134b_min_max_zero_when_empty() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
assert_eq!(
metrics.cpu_latency_min_us(),
0,
"IMP-134b: CPU min should be 0 when empty"
);
assert_eq!(
metrics.cpu_latency_max_us(),
0,
"IMP-134b: CPU max should be 0 when empty"
);
assert_eq!(
metrics.gpu_latency_min_us(),
0,
"IMP-134b: GPU min should be 0 when empty"
);
assert_eq!(
metrics.gpu_latency_max_us(),
0,
"IMP-134b: GPU max should be 0 when empty"
);
}
#[test]
fn test_imp_134c_json_response_includes_min_max() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
use std::time::Duration;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(500));
let response = DispatchMetricsResponse {
cpu_dispatches: 0,
gpu_dispatches: 0,
total_dispatches: 0,
gpu_ratio: 0.0,
cpu_latency_p50_us: 0.0,
cpu_latency_p95_us: 0.0,
cpu_latency_p99_us: 0.0,
gpu_latency_p50_us: 0.0,
gpu_latency_p95_us: 0.0,
gpu_latency_p99_us: 0.0,
cpu_latency_mean_us: 0.0,
gpu_latency_mean_us: 0.0,
cpu_latency_min_us: metrics.cpu_latency_min_us(),
cpu_latency_max_us: metrics.cpu_latency_max_us(),
gpu_latency_min_us: metrics.gpu_latency_min_us(),
gpu_latency_max_us: metrics.gpu_latency_max_us(),
cpu_latency_variance_us: 0.0,
cpu_latency_stddev_us: 0.0,
gpu_latency_variance_us: 0.0,
gpu_latency_stddev_us: 0.0,
bucket_boundaries_us: vec![],
cpu_latency_bucket_counts: vec![],
gpu_latency_bucket_counts: vec![],
throughput_rps: 0.0,
elapsed_seconds: 0.0,
};
assert_eq!(
response.cpu_latency_min_us, 100,
"IMP-134c: Response CPU min should be 100µs"
);
assert_eq!(
response.cpu_latency_max_us, 500,
"IMP-134c: Response CPU max should be 500µs"
);
}
#[test]
fn test_imp_134d_min_max_single_sample() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(42));
assert_eq!(
metrics.cpu_latency_min_us(),
42,
"IMP-134d: Min of single sample should be 42µs"
);
assert_eq!(
metrics.cpu_latency_max_us(),
42,
"IMP-134d: Max of single sample should be 42µs"
);
}
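// IMP-135: variance/stddev accessors. The expected values are population statistics
// (divide by n): for samples of 100/200/300µs, variance = 20000/3 ≈ 6666.67 and
// stddev ≈ 81.65.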
#[test]
fn test_imp_135a_dispatch_metrics_has_variance_stddev_methods() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(300));
let cpu_var = metrics.cpu_latency_variance_us();
let cpu_std = metrics.cpu_latency_stddev_us();
assert!(
(cpu_var - 6666.67).abs() < 1.0,
"IMP-135a: CPU variance should be ~6666.67, got {}",
cpu_var
);
assert!(
(cpu_std - 81.65).abs() < 1.0,
"IMP-135a: CPU stddev should be ~81.65, got {}",
cpu_std
);
}
#[test]
fn test_imp_135b_variance_zero_edge_cases() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
assert_eq!(
metrics.cpu_latency_variance_us(),
0.0,
"IMP-135b: CPU variance should be 0 when empty"
);
assert_eq!(
metrics.cpu_latency_stddev_us(),
0.0,
"IMP-135b: CPU stddev should be 0 when empty"
);
metrics.record_cpu_latency(Duration::from_micros(100));
assert_eq!(
metrics.cpu_latency_variance_us(),
0.0,
"IMP-135b: CPU variance should be 0 for single sample"
);
assert_eq!(
metrics.cpu_latency_stddev_us(),
0.0,
"IMP-135b: CPU stddev should be 0 for single sample"
);
}
#[test]
fn test_imp_135c_json_response_includes_variance_stddev() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
use std::time::Duration;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(300));
let response = DispatchMetricsResponse {
cpu_dispatches: 0,
gpu_dispatches: 0,
total_dispatches: 0,
gpu_ratio: 0.0,
cpu_latency_p50_us: 0.0,
cpu_latency_p95_us: 0.0,
cpu_latency_p99_us: 0.0,
gpu_latency_p50_us: 0.0,
gpu_latency_p95_us: 0.0,
gpu_latency_p99_us: 0.0,
cpu_latency_mean_us: 0.0,
gpu_latency_mean_us: 0.0,
cpu_latency_min_us: 0,
cpu_latency_max_us: 0,
gpu_latency_min_us: 0,
gpu_latency_max_us: 0,
cpu_latency_variance_us: metrics.cpu_latency_variance_us(),
cpu_latency_stddev_us: metrics.cpu_latency_stddev_us(),
gpu_latency_variance_us: metrics.gpu_latency_variance_us(),
gpu_latency_stddev_us: metrics.gpu_latency_stddev_us(),
bucket_boundaries_us: vec![],
cpu_latency_bucket_counts: vec![],
gpu_latency_bucket_counts: vec![],
throughput_rps: 0.0,
elapsed_seconds: 0.0,
};
assert!(
(response.cpu_latency_variance_us - 6666.67).abs() < 1.0,
"IMP-135c: Response CPU variance should be ~6666.67"
);
assert!(
response.cpu_latency_stddev_us > 80.0,
"IMP-135c: Response CPU stddev should be > 80"
);
}
#[test]
fn test_imp_135d_gpu_variance_stddev() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_gpu_latency(Duration::from_micros(500));
metrics.record_gpu_latency(Duration::from_micros(1000));
metrics.record_gpu_latency(Duration::from_micros(1500));
let gpu_var = metrics.gpu_latency_variance_us();
let gpu_std = metrics.gpu_latency_stddev_us();
assert!(
(gpu_var - 166666.67).abs() < 1.0,
"IMP-135d: GPU variance should be ~166666.67, got {}",
gpu_var
);
assert!(
(gpu_std - 408.25).abs() < 1.0,
"IMP-135d: GPU stddev should be ~408.25, got {}",
gpu_std
);
}
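// IMP-136: the histogram layout is exposed directly: BUCKET_BOUNDARIES gives the four
// upper bounds (100/500/1000/5000µs) and bucket_boundaries_us() the five labelled
// ranges, including the unbounded "5000+" bucket.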
#[test]
fn test_imp_136a_dispatch_metrics_exposes_bucket_boundaries() {
use crate::gguf::DispatchMetrics;
let boundaries = DispatchMetrics::BUCKET_BOUNDARIES;
assert_eq!(
boundaries.len(),
4,
"IMP-136a: Should have 4 bucket boundaries for 5 buckets"
);
assert_eq!(
boundaries[0], 100,
"IMP-136a: Bucket 0 upper bound should be 100µs"
);
assert_eq!(
boundaries[1], 500,
"IMP-136a: Bucket 1 upper bound should be 500µs"
);
assert_eq!(
boundaries[2], 1000,
"IMP-136a: Bucket 2 upper bound should be 1000µs"
);
assert_eq!(
boundaries[3], 5000,
"IMP-136a: Bucket 3 upper bound should be 5000µs"
);
}
#[test]
fn test_imp_136b_bucket_boundaries_method() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
let boundaries = metrics.bucket_boundaries_us();
assert_eq!(
boundaries.len(),
5,
"IMP-136b: Should have 5 bucket boundary strings"
);
assert_eq!(boundaries[0], "0-100", "IMP-136b: Bucket 0 range");
assert_eq!(boundaries[1], "100-500", "IMP-136b: Bucket 1 range");
assert_eq!(boundaries[2], "500-1000", "IMP-136b: Bucket 2 range");
assert_eq!(boundaries[3], "1000-5000", "IMP-136b: Bucket 3 range");
assert_eq!(
boundaries[4], "5000+",
"IMP-136b: Bucket 4 range (unbounded)"
);
}
#[test]
fn test_imp_136c_json_response_includes_bucket_boundaries() {
let response = DispatchMetricsResponse {
cpu_dispatches: 0,
gpu_dispatches: 0,
total_dispatches: 0,
gpu_ratio: 0.0,
cpu_latency_p50_us: 0.0,
cpu_latency_p95_us: 0.0,
cpu_latency_p99_us: 0.0,
gpu_latency_p50_us: 0.0,
gpu_latency_p95_us: 0.0,
gpu_latency_p99_us: 0.0,
cpu_latency_mean_us: 0.0,
gpu_latency_mean_us: 0.0,
cpu_latency_min_us: 0,
cpu_latency_max_us: 0,
gpu_latency_min_us: 0,
gpu_latency_max_us: 0,
cpu_latency_variance_us: 0.0,
cpu_latency_stddev_us: 0.0,
gpu_latency_variance_us: 0.0,
gpu_latency_stddev_us: 0.0,
bucket_boundaries_us: vec![
"0-100".to_string(),
"100-500".to_string(),
"500-1000".to_string(),
"1000-5000".to_string(),
"5000+".to_string(),
],
cpu_latency_bucket_counts: vec![0, 0, 0, 0, 0],
gpu_latency_bucket_counts: vec![0, 0, 0, 0, 0],
throughput_rps: 0.0,
elapsed_seconds: 0.0,
};
let json = serde_json::to_string(&response).expect("IMP-136c: Should serialize");
assert!(
json.contains("bucket_boundaries_us"),
"IMP-136c: JSON should contain bucket_boundaries_us field"
);
assert!(
json.contains("0-100"),
"IMP-136c: JSON should contain bucket range '0-100'"
);
}
#[test]
fn test_imp_136d_response_includes_bucket_counts() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
use std::time::Duration;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_latency(Duration::from_micros(50));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(750));
let response = DispatchMetricsResponse {
cpu_dispatches: 0,
gpu_dispatches: 0,
total_dispatches: 0,
gpu_ratio: 0.0,
cpu_latency_p50_us: 0.0,
cpu_latency_p95_us: 0.0,
cpu_latency_p99_us: 0.0,
gpu_latency_p50_us: 0.0,
gpu_latency_p95_us: 0.0,
gpu_latency_p99_us: 0.0,
cpu_latency_mean_us: 0.0,
gpu_latency_mean_us: 0.0,
cpu_latency_min_us: 0,
cpu_latency_max_us: 0,
gpu_latency_min_us: 0,
gpu_latency_max_us: 0,
cpu_latency_variance_us: 0.0,
cpu_latency_stddev_us: 0.0,
gpu_latency_variance_us: 0.0,
gpu_latency_stddev_us: 0.0,
bucket_boundaries_us: metrics.bucket_boundaries_us(),
cpu_latency_bucket_counts: metrics.cpu_latency_buckets().to_vec(),
gpu_latency_bucket_counts: metrics.gpu_latency_buckets().to_vec(),
throughput_rps: 0.0,
elapsed_seconds: 0.0,
};
assert_eq!(
response.cpu_latency_bucket_counts[0], 1,
"IMP-136d: Bucket 0 should have 1 sample"
);
assert_eq!(
response.cpu_latency_bucket_counts[1], 1,
"IMP-136d: Bucket 1 should have 1 sample"
);
assert_eq!(
response.cpu_latency_bucket_counts[2], 1,
"IMP-136d: Bucket 2 should have 1 sample"
);
}
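// IMP-137: reset() zeroes every counter, the latency statistics and the histogram
// bucket counts.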
#[test]
fn test_imp_137a_dispatch_metrics_has_reset_method() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_dispatch();
metrics.record_gpu_dispatch();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.reset();
assert_eq!(
metrics.cpu_dispatches(),
0,
"IMP-137a: CPU dispatches should be 0 after reset"
);
assert_eq!(
metrics.gpu_dispatches(),
0,
"IMP-137a: GPU dispatches should be 0 after reset"
);
}
#[test]
fn test_imp_137b_reset_clears_all_counters() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
for _ in 0..10 {
metrics.record_cpu_dispatch();
metrics.record_cpu_latency(Duration::from_micros(100));
}
for _ in 0..5 {
metrics.record_gpu_dispatch();
metrics.record_gpu_latency(Duration::from_micros(500));
}
assert_eq!(
metrics.cpu_dispatches(),
10,
"IMP-137b: Pre-reset CPU count"
);
assert_eq!(metrics.gpu_dispatches(), 5, "IMP-137b: Pre-reset GPU count");
metrics.reset();
assert_eq!(
metrics.cpu_dispatches(),
0,
"IMP-137b: Post-reset CPU dispatches"
);
assert_eq!(
metrics.gpu_dispatches(),
0,
"IMP-137b: Post-reset GPU dispatches"
);
assert_eq!(
metrics.total_dispatches(),
0,
"IMP-137b: Post-reset total dispatches"
);
assert_eq!(
metrics.cpu_latency_count(),
0,
"IMP-137b: Post-reset CPU latency count"
);
assert_eq!(
metrics.gpu_latency_count(),
0,
"IMP-137b: Post-reset GPU latency count"
);
}
#[test]
fn test_imp_137c_reset_clears_latency_tracking() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(500));
metrics.record_cpu_latency(Duration::from_micros(1000));
assert!(
metrics.cpu_latency_mean_us() > 0.0,
"IMP-137c: Pre-reset mean should be > 0"
);
metrics.reset();
assert_eq!(
metrics.cpu_latency_mean_us(),
0.0,
"IMP-137c: Post-reset CPU mean"
);
assert_eq!(
metrics.cpu_latency_min_us(),
0,
"IMP-137c: Post-reset CPU min"
);
assert_eq!(
metrics.cpu_latency_max_us(),
0,
"IMP-137c: Post-reset CPU max"
);
assert_eq!(
metrics.cpu_latency_variance_us(),
0.0,
"IMP-137c: Post-reset CPU variance"
);
assert_eq!(
metrics.cpu_latency_stddev_us(),
0.0,
"IMP-137c: Post-reset CPU stddev"
);
}
#[test]
fn test_imp_137d_reset_clears_bucket_counts() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(50));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(750));
metrics.record_cpu_latency(Duration::from_micros(2000));
metrics.record_cpu_latency(Duration::from_micros(10000));
let buckets_before = metrics.cpu_latency_buckets();
assert_eq!(
buckets_before.iter().sum::<usize>(),
5,
"IMP-137d: Pre-reset bucket total"
);
metrics.reset();
let buckets_after = metrics.cpu_latency_buckets();
assert_eq!(
buckets_after,
[0, 0, 0, 0, 0],
"IMP-137d: Post-reset buckets should all be 0"
);
}
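// IMP-138: dispatch_reset_handler and its DispatchResetResponse { success, message }
// payload; serialization round-trips are checked below.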
#[test]
fn test_imp_138a_dispatch_reset_handler_exists() {
fn _assert_handler_exists<F, Fut>(f: F)
where
F: Fn(axum::extract::State<AppState>) -> Fut,
Fut: std::future::Future<Output = axum::response::Response>,
{
let _ = f;
}
_assert_handler_exists(dispatch_reset_handler);
}
#[tokio::test]
async fn test_imp_138b_reset_returns_success_response() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_dispatch();
metrics.record_gpu_dispatch();
let response = DispatchResetResponse {
success: true,
message: "Metrics reset successfully".to_string(),
};
let json = serde_json::to_string(&response).expect("IMP-138b: Should serialize");
assert!(
json.contains("\"success\":true"),
"IMP-138b: Should have success: true"
);
assert!(
json.contains("reset successfully"),
"IMP-138b: Should have success message"
);
}
#[tokio::test]
async fn test_imp_138c_reset_endpoint_clears_metrics() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
use std::time::Duration;
let metrics = Arc::new(DispatchMetrics::new());
for _ in 0..10 {
metrics.record_cpu_dispatch();
metrics.record_cpu_latency(Duration::from_micros(100));
}
assert_eq!(metrics.cpu_dispatches(), 10, "IMP-138c: Pre-reset count");
metrics.reset();
assert_eq!(
metrics.cpu_dispatches(),
0,
"IMP-138c: Post-reset CPU dispatches"
);
assert_eq!(
metrics.gpu_dispatches(),
0,
"IMP-138c: Post-reset GPU dispatches"
);
assert_eq!(
metrics.cpu_latency_count(),
0,
"IMP-138c: Post-reset latency count"
);
}
#[test]
fn test_imp_138d_reset_response_deserialization() {
let json = r#"{"success":true,"message":"Metrics reset successfully"}"#;
let response: DispatchResetResponse =
serde_json::from_str(json).expect("IMP-138d: Should deserialize");
assert!(response.success, "IMP-138d: success should be true");
assert_eq!(
response.message, "Metrics reset successfully",
"IMP-138d: message should match"
);
}
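// IMP-139: the reset route is POST /metrics/dispatch/reset; GET on the same path
// should be rejected with 405 Method Not Allowed.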
#[test]
fn test_imp_139a_router_includes_reset_route() {
let state = AppState::with_cache(10);
let router = create_router(state);
let _ = router;
}
#[test]
fn test_imp_139b_reset_route_path() {
const EXPECTED_PATH: &str = "/metrics/dispatch/reset";
assert!(
EXPECTED_PATH.starts_with("/metrics/dispatch"),
"IMP-139b: Reset route should be under /metrics/dispatch"
);
assert!(
EXPECTED_PATH.ends_with("/reset"),
"IMP-139b: Reset route should end with /reset"
);
}
#[tokio::test]
async fn test_imp_139c_router_has_reset_handler() {
use axum::body::Body;
use hyper::Request;
use tower::ServiceExt;
let state = AppState::with_cache(10);
let router = create_router(state);
let req = Request::builder()
.method("POST")
.uri("/metrics/dispatch/reset")
.body(Body::empty())
.expect("IMP-139c: Should build request");
let response = router
.oneshot(req)
.await
.expect("IMP-139c: Should get response");
assert_ne!(
response.status().as_u16(),
404,
"IMP-139c: Reset route should exist (not 404)"
);
}
#[tokio::test]
async fn test_imp_139d_reset_route_rejects_get() {
use axum::body::Body;
use hyper::Request;
use tower::ServiceExt;
let state = AppState::with_cache(10);
let router = create_router(state);
let req = Request::builder()
.method("GET")
.uri("/metrics/dispatch/reset")
.body(Body::empty())
.expect("IMP-139d: Should build request");
let response = router
.oneshot(req)
.await
.expect("IMP-139d: Should get response");
assert_eq!(
response.status().as_u16(),
405,
"IMP-139d: GET on reset route should return 405"
);
}
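// IMP-140: throughput is derived from the creation timestamp: the tests assume
// throughput_rps() is total dispatches divided by elapsed_seconds() since
// DispatchMetrics::new().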
#[test]
fn test_imp_140a_dispatch_metrics_tracks_start_time() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
let start_time = metrics.start_time_ms();
assert!(start_time > 0, "IMP-140a: Start time should be > 0");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("IMP-140a: Should get time")
.as_millis() as u64;
assert!(
now - start_time < 60_000,
"IMP-140a: Start time should be within last minute"
);
}
#[test]
fn test_imp_140b_elapsed_seconds() {
use crate::gguf::DispatchMetrics;
let metrics = DispatchMetrics::new();
let elapsed = metrics.elapsed_seconds();
assert!(elapsed >= 0.0, "IMP-140b: Elapsed should be >= 0");
assert!(elapsed < 10.0, "IMP-140b: Elapsed should be small (< 10s)");
}
#[test]
fn test_imp_140c_throughput_rps() {
use crate::gguf::DispatchMetrics;
use std::thread;
use std::time::Duration;
let metrics = DispatchMetrics::new();
thread::sleep(Duration::from_millis(2));
for _ in 0..100 {
metrics.record_cpu_dispatch();
}
let rps = metrics.throughput_rps();
assert!(rps > 0.0, "IMP-140c: RPS should be > 0, got {}", rps);
assert!(
rps > 100.0,
"IMP-140c: RPS should be > 100 (100 dispatches in ~2ms), got {}",
rps
);
}
#[test]
fn test_imp_140d_json_response_includes_throughput() {
use crate::gguf::DispatchMetrics;
use std::sync::Arc;
let metrics = Arc::new(DispatchMetrics::new());
metrics.record_cpu_dispatch();
metrics.record_cpu_dispatch();
let response = DispatchMetricsResponse {
cpu_dispatches: metrics.cpu_dispatches(),
gpu_dispatches: metrics.gpu_dispatches(),
total_dispatches: metrics.total_dispatches(),
gpu_ratio: metrics.gpu_ratio(),
cpu_latency_p50_us: 0.0,
cpu_latency_p95_us: 0.0,
cpu_latency_p99_us: 0.0,
gpu_latency_p50_us: 0.0,
gpu_latency_p95_us: 0.0,
gpu_latency_p99_us: 0.0,
cpu_latency_mean_us: 0.0,
gpu_latency_mean_us: 0.0,
cpu_latency_min_us: 0,
cpu_latency_max_us: 0,
gpu_latency_min_us: 0,
gpu_latency_max_us: 0,
cpu_latency_variance_us: 0.0,
cpu_latency_stddev_us: 0.0,
gpu_latency_variance_us: 0.0,
gpu_latency_stddev_us: 0.0,
bucket_boundaries_us: vec![],
cpu_latency_bucket_counts: vec![],
gpu_latency_bucket_counts: vec![],
throughput_rps: metrics.throughput_rps(),
elapsed_seconds: metrics.elapsed_seconds(),
};
let json = serde_json::to_string(&response).expect("IMP-140d: Should serialize");
assert!(
json.contains("throughput_rps"),
"IMP-140d: JSON should contain throughput_rps"
);
assert!(
json.contains("elapsed_seconds"),
"IMP-140d: JSON should contain elapsed_seconds"
);
}
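// IMP-142: derived ratios: coefficient of variation (stddev/mean, expressed as a
// percentage) per backend, and cpu_gpu_speedup() as mean CPU latency over mean GPU
// latency, returning 0.0 when there are no GPU samples.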
#[test]
fn test_imp_142a_dispatch_metrics_has_cpu_latency_cv() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(100));
metrics.record_cpu_latency(Duration::from_micros(200));
metrics.record_cpu_latency(Duration::from_micros(300));
let cv = metrics.cpu_latency_cv();
assert!(
cv > 0.0,
"IMP-142a: CV should be > 0 for varied samples, got {}",
cv
);
assert!(cv < 100.0, "IMP-142a: CV should be < 100%, got {}%", cv);
}
#[test]
fn test_imp_142b_dispatch_metrics_has_gpu_latency_cv() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_gpu_latency(Duration::from_micros(50));
metrics.record_gpu_latency(Duration::from_micros(100));
metrics.record_gpu_latency(Duration::from_micros(150));
let cv = metrics.gpu_latency_cv();
assert!(
cv > 0.0,
"IMP-142b: CV should be > 0 for varied samples, got {}",
cv
);
}
#[test]
fn test_imp_142c_dispatch_metrics_has_cpu_gpu_speedup() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(1000));
metrics.record_cpu_latency(Duration::from_micros(1000));
metrics.record_gpu_latency(Duration::from_micros(100));
metrics.record_gpu_latency(Duration::from_micros(100));
let speedup = metrics.cpu_gpu_speedup();
assert!(
speedup > 5.0 && speedup < 15.0,
"IMP-142c: Speedup should be ~10x (CPU 1000µs vs GPU 100µs), got {}x",
speedup
);
}
#[test]
fn test_imp_142d_speedup_returns_zero_without_gpu_samples() {
use crate::gguf::DispatchMetrics;
use std::time::Duration;
let metrics = DispatchMetrics::new();
metrics.record_cpu_latency(Duration::from_micros(1000));
let speedup = metrics.cpu_gpu_speedup();
assert_eq!(
speedup, 0.0,
"IMP-142d: Speedup should be 0.0 when GPU has no samples"
);
}
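// PARITY-022: request/response types and routes for GPU batch completions
// (/v1/gpu/warmup, /v1/gpu/status, /v1/batch/completions).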
#[test]
fn test_parity022a_gpu_batch_request_struct() {
let request = GpuBatchRequest {
prompts: vec!["Hello".to_string(), "World".to_string()],
max_tokens: 50,
temperature: 0.0,
top_k: 1,
stop: vec![],
};
assert_eq!(
request.prompts.len(),
2,
"PARITY-022a: Should have 2 prompts"
);
assert_eq!(
request.max_tokens, 50,
"PARITY-022a: max_tokens should be 50"
);
assert_eq!(
request.temperature, 0.0,
"PARITY-022a: temperature should be 0.0"
);
assert_eq!(request.top_k, 1, "PARITY-022a: top_k should be 1");
}
#[test]
fn test_parity022b_gpu_batch_response_struct() {
let response = GpuBatchResponse {
results: vec![GpuBatchResult {
index: 0,
token_ids: vec![1, 2, 3],
text: "test".to_string(),
num_generated: 3,
}],
stats: GpuBatchStats {
batch_size: 1,
gpu_used: false,
total_tokens: 3,
processing_time_ms: 100.0,
throughput_tps: 30.0,
},
};
assert_eq!(
response.results.len(),
1,
"PARITY-022b: Should have 1 result"
);
assert_eq!(
response.stats.batch_size, 1,
"PARITY-022b: batch_size should be 1"
);
assert!(!response.stats.gpu_used, "PARITY-022b: GPU not used");
}
#[test]
fn test_parity022c_gpu_status_response_structure() {
let status = GpuStatusResponse {
cache_ready: false,
cache_memory_bytes: 0,
batch_threshold: 32,
recommended_min_batch: 32,
};
assert_eq!(
status.batch_threshold, 32,
"PARITY-022c: GPU GEMM threshold should be 32 (from IMP-600)"
);
assert_eq!(
status.recommended_min_batch, 32,
"PARITY-022c: Recommended min batch should be 32"
);
}
#[test]
fn test_parity022d_gpu_warmup_response_structure() {
let warmup = GpuWarmupResponse {
success: true,
memory_bytes: 6_400_000_000,
num_layers: 32,
message: "GPU cache warmed up".to_string(),
};
assert!(warmup.success, "PARITY-022d: Warmup should succeed");
assert_eq!(warmup.num_layers, 32, "PARITY-022d: phi-2 has 32 layers");
assert!(
warmup.memory_bytes > 6_000_000_000,
"PARITY-022d: Memory should be ~6.4 GB for phi-2"
);
}
#[test]
fn test_parity022e_router_has_gpu_batch_routes() {
let expected_routes = ["/v1/gpu/warmup", "/v1/gpu/status", "/v1/batch/completions"];
for route in expected_routes {
assert!(
!route.is_empty(),
"PARITY-022e: Route {} should be defined",
route
);
}
}
}