/// Drains `batch` and answers every request: batches at or above the GPU
/// threshold run through the batched GPU kernel when the weight cache is warm;
/// everything else falls back to per-request generation.
#[cfg(feature = "gpu")]
async fn process_batch(
model: &std::sync::Arc<crate::gguf::OwnedQuantizedModelCachedSync>,
config: &BatchConfig,
batch: &mut Vec<ContinuousBatchRequest>,
) {
use std::time::Instant;
if batch.is_empty() {
return;
}
let batch_size = batch.len();
let batch_start = Instant::now();
let gpu_threshold = config.gpu_threshold;
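    // Route to the batched GPU kernel only when the batch is large enough and the
    // GPU weight cache is already warm; otherwise handle requests individually below.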
if batch_size >= gpu_threshold && model.is_gpu_cache_warm() {
let prompts: Vec<Vec<u32>> = batch.iter().map(|r| r.prompt_tokens.clone()).collect();
let first = &batch[0];
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: first.max_tokens,
temperature: first.temperature,
top_k: first.top_k,
stop_tokens: Vec::new(),
trace: false,
..Default::default()
};
let results = model.batch_generate_gpu(&prompts, &gen_config);
let total_latency_ms = batch_start.elapsed().as_secs_f64() * 1000.0;
let per_request_latency_ms = total_latency_ms / batch_size as f64;
match results {
Ok(all_token_ids) => {
                // Pair each request with its generated sequence; if the model returned
                // fewer sequences than requests, the leftover requests are dropped
                // without a response when the drain iterator is dropped.
                for (request, token_ids) in batch.drain(..).zip(all_token_ids) {
let response = ContinuousBatchResponse {
token_ids,
prompt_len: request.prompt_tokens.len(),
batched: true,
batch_size,
latency_ms: per_request_latency_ms,
};
let _ = request.response_tx.send(response);
}
},
Err(_) => {
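                // The batched GPU call failed: still answer every waiting request by
                // echoing its prompt tokens back, marked as unbatched.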
for request in batch.drain(..) {
let response = ContinuousBatchResponse {
token_ids: request.prompt_tokens.clone(),
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size,
latency_ms: per_request_latency_ms,
};
let _ = request.response_tx.send(response);
}
},
}
} else {
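        // Below the GPU threshold (or the cache is not warm yet): spawn one task per
        // request. generate_with_cache is synchronous, so each task occupies a runtime
        // worker thread for the duration of its generation.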
let mut handles = Vec::with_capacity(batch_size);
for request in batch.drain(..) {
let model = model.clone();
let handle = tokio::spawn(async move {
let start = Instant::now();
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: request.max_tokens,
temperature: request.temperature,
top_k: request.top_k,
stop_tokens: Vec::new(),
trace: false,
..Default::default()
};
let result = model.generate_with_cache(&request.prompt_tokens, &gen_config);
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
let response = match result {
Ok(token_ids) => ContinuousBatchResponse {
token_ids,
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size: 1,
latency_ms,
},
Err(_) => ContinuousBatchResponse {
token_ids: request.prompt_tokens.clone(),
prompt_len: request.prompt_tokens.len(),
batched: false,
batch_size: 1,
latency_ms,
},
};
let _ = request.response_tx.send(response);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
}
}
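/// Warms the GPU weight cache for the loaded model.
///
/// Responds with the number of layers uploaded and the memory used on success,
/// `503 Service Unavailable` when no GPU-capable model is loaded, and
/// `500 Internal Server Error` if warmup fails.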
#[cfg(feature = "gpu")]
pub async fn gpu_warmup_handler(
State(state): State<AppState>,
) -> Result<Json<GpuWarmupResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(cached_model) = state.cached_model() {
match cached_model.warmup_gpu_cache() {
Ok((memory_bytes, num_layers)) => Ok(Json(GpuWarmupResponse {
success: true,
memory_bytes,
num_layers,
message: format!(
"GPU cache warmed up: {} layers, {:.2} GB",
num_layers,
memory_bytes as f64 / 1e9
),
})),
Err(e) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("GPU warmup failed: {e}"),
}),
)),
}
} else {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "No GPU-capable model loaded. Use with_cached_model() to enable."
.to_string(),
}),
))
}
}
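/// Stub for builds without the `gpu` feature: always reports that GPU support
/// is unavailable.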
#[cfg(not(feature = "gpu"))]
pub async fn gpu_warmup_handler(
State(_state): State<AppState>,
) -> Result<Json<GpuWarmupResponse>, (StatusCode, Json<ErrorResponse>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "GPU feature not enabled. Build with --features gpu".to_string(),
}),
))
}
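/// Reports whether the GPU weight cache is warm, how much memory it holds, and
/// the batch-size threshold at which the batched GPU path is used.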
#[cfg(feature = "gpu")]
pub async fn gpu_status_handler(
State(state): State<AppState>,
) -> Result<Json<GpuStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(cached_model) = state.cached_model() {
Ok(Json(GpuStatusResponse {
cache_ready: cached_model.is_gpu_cache_warm(),
cache_memory_bytes: cached_model.gpu_cache_memory(),
            batch_threshold: 32,
            recommended_min_batch: 32,
}))
} else {
Ok(Json(GpuStatusResponse {
cache_ready: false,
cache_memory_bytes: 0,
batch_threshold: 32,
recommended_min_batch: 32,
}))
}
}
#[cfg(not(feature = "gpu"))]
pub async fn gpu_status_handler(
State(_state): State<AppState>,
) -> Result<Json<GpuStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
Ok(Json(GpuStatusResponse {
cache_ready: false,
cache_memory_bytes: 0,
batch_threshold: 32,
recommended_min_batch: 32,
}))
}
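/// Runs a batch of completions. The batch goes through the batched GPU kernel
/// when the cache is warm and the batch meets the size threshold; otherwise each
/// prompt is generated sequentially on the CPU path.
///
/// Illustrative request body (field names inferred from how `GpuBatchRequest`
/// is used below; values are examples only):
/// `{"prompts": ["hello"], "max_tokens": 32, "temperature": 0.7, "top_k": 40}`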
#[cfg(feature = "gpu")]
pub async fn gpu_batch_completions_handler(
State(state): State<AppState>,
Json(request): Json<GpuBatchRequest>,
) -> Result<Json<GpuBatchResponse>, (StatusCode, Json<ErrorResponse>)> {
use std::time::Instant;
if request.prompts.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Prompts array cannot be empty".to_string(),
}),
));
}
let Some(cached_model) = state.cached_model() else {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "No GPU-capable model loaded".to_string(),
}),
));
};
let gpu_ready = cached_model.is_gpu_cache_warm();
let batch_size = request.prompts.len();
    // Byte-level prompt encoding: each UTF-8 byte becomes one token ID.
    let prompts_tokens: Vec<Vec<u32>> = request
        .prompts
        .iter()
        .map(|p| p.bytes().map(|b| b as u32).collect())
        .collect();
let gen_config = crate::gguf::QuantizedGenerateConfig {
max_tokens: request.max_tokens,
temperature: request.temperature,
top_k: request.top_k,
stop_tokens: vec![],
trace: false,
..Default::default()
};
let start = Instant::now();
    // Hard-coded to match the batch_threshold reported by gpu_status_handler.
    let gpu_threshold = 32;
let use_gpu = gpu_ready && batch_size >= gpu_threshold;
let results = if use_gpu {
match cached_model.batch_generate_gpu(&prompts_tokens, &gen_config) {
Ok(generated) => generated,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("GPU batch generation failed: {e}"),
}),
));
},
}
} else {
let mut results = Vec::with_capacity(batch_size);
for prompt in &prompts_tokens {
match cached_model.generate_with_cache(prompt, &gen_config) {
Ok(tokens) => results.push(tokens),
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Generation failed: {e}"),
}),
));
},
}
}
results
};
let elapsed = start.elapsed();
let total_tokens: usize = results.iter().map(Vec::len).sum();
let throughput_tps = total_tokens as f64 / elapsed.as_secs_f64();
let batch_results: Vec<GpuBatchResult> = results
.into_iter()
.enumerate()
.map(|(idx, tokens)| {
let prompt_len = prompts_tokens.get(idx).map_or(0, Vec::len);
let num_generated = tokens.len().saturating_sub(prompt_len);
            GpuBatchResult {
                index: idx,
                // Lossy byte-to-char decode: faithful only for token IDs below 256,
                // mirroring the byte-level prompt encoding above.
                text: tokens.iter().map(|&t| t as u8 as char).collect(),
                num_generated,
                // Move the tokens in last so no clone is needed.
                token_ids: tokens,
            }
})
.collect();
Ok(Json(GpuBatchResponse {
results: batch_results,
stats: GpuBatchStats {
batch_size,
gpu_used: use_gpu,
total_tokens,
processing_time_ms: elapsed.as_secs_f64() * 1000.0,
throughput_tps,
},
}))
}
#[cfg(not(feature = "gpu"))]
pub async fn gpu_batch_completions_handler(
State(_state): State<AppState>,
Json(_request): Json<GpuBatchRequest>,
) -> Result<Json<GpuBatchResponse>, (StatusCode, Json<ErrorResponse>)> {
Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorResponse {
error: "GPU feature not enabled. Build with --features gpu".to_string(),
}),
))
}
pub async fn models_handler(
State(state): State<AppState>,
) -> Result<Json<ModelsResponse>, (StatusCode, Json<ErrorResponse>)> {
if let Some(registry) = &state.registry {
let models = registry.list();
Ok(Json(ModelsResponse { models }))
} else {
Ok(Json(ModelsResponse {
models: vec![ModelInfo {
id: "default".to_string(),
name: "Default Model".to_string(),
description: "Single model deployment".to_string(),
format: "unknown".to_string(),
loaded: true,
}],
}))
}
}
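/// Encodes `request.text` with the tokenizer of the requested (or default)
/// model and returns the token IDs along with their count.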
pub async fn tokenize_handler(
State(state): State<AppState>,
Json(request): Json<TokenizeRequest>,
) -> Result<Json<TokenizeResponse>, (StatusCode, Json<ErrorResponse>)> {
let (_model, tokenizer) = state.get_model(request.model_id.as_deref()).map_err(|e| {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let token_ids = tokenizer.encode(&request.text);
let num_tokens = token_ids.len();
Ok(Json(TokenizeResponse {
token_ids,
num_tokens,
}))
}
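/// Attempts generation on the GPU-resident CUDA model.
///
/// Returns `Ok(None)` when no CUDA model is loaded so the caller can fall back
/// to another backend; lock, generation, and decode failures surface as
/// `500 Internal Server Error`.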
#[cfg(feature = "cuda")]
fn try_cuda_generate(
state: &AppState,
request: &GenerateRequest,
) -> Result<Option<GenerateResponse>, ApiErr> {
use crate::gguf::QuantizedGenerateConfig;
    let Some(cuda_model_lock) = state.cuda_model() else {
        return Ok(None);
    };
let tokenizer = require_tok(state)?;
let prompt_ids = tokenize_prompt(&tokenizer, &request.prompt)?;
let prompt_tokens = prompt_ids.len();
let q_config = QuantizedGenerateConfig {
max_tokens: request.max_tokens,
temperature: request.temperature,
        // Force greedy decoding when temperature is zero; otherwise honor the requested top_k.
        top_k: if request.temperature == 0.0 {
            1
        } else {
            request.top_k
        },
stop_tokens: vec![eos_id(&tokenizer, state.model_eos_token_id())],
trace: false,
..Default::default()
};
let mut cuda_model = cuda_model_lock.write().map_err(|_| {
api_err(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to acquire CUDA model lock",
)
})?;
let generated = cuda_model
.generate_gpu_resident(&prompt_ids, &q_config)
.map_err(|e| {
api_err(
StatusCode::INTERNAL_SERVER_ERROR,
format!("CUDA generation failed: {e}"),
)
})?;
let text = tokenizer
.decode(&generated)
.map_err(|e| api_err(StatusCode::INTERNAL_SERVER_ERROR, e))?;
Ok(Some(GenerateResponse {
num_generated: generated.len().saturating_sub(prompt_tokens),
token_ids: generated,
text,
}))
}