use axum::{extract::State, response::Json};
use fusillade::Storage;
use fusillade::request::ServiceTierFilter;
use sqlx_pool_router::PoolProvider;
use std::collections::HashMap;
use crate::api::handlers::sla_capacity::parse_window_to_seconds;
use crate::{
AppState,
auth::permissions::{RequiresPermission, operation, resource},
errors::Error,
};
/// Response payload of the pending-request-counts endpoint: the outer map is keyed by model
/// name, the inner map by completion window (e.g. `"24h"`), and the value is the number of
/// matching requests.
type PendingCountsByModelAndWindow = HashMap<String, HashMap<String, i64>>;
/// Get pending request counts grouped by model and completion window.
///
/// Counts requests in the `pending`, `claimed` and `processing` states for every completion
/// window allowed by the current batch configuration. Requests on the `priority` service tier
/// are excluded. The caller must hold system-level read permission.
#[utoipa::path(
    get,
    path = "/admin/api/v1/monitoring/pending-request-counts",
    responses(
        (status = 200, description = "Pending request counts by model and completion window", body = HashMap<String, HashMap<String, i64>>),
        (status = 403, description = "Forbidden - caller lacks system read permission"),
        (status = 500, description = "Internal server error"),
    ),
    tag = "monitoring",
)]
#[tracing::instrument(skip_all)]
pub async fn get_pending_request_counts<P: PoolProvider>(
    State(state): State<AppState<P>>,
    _: RequiresPermission<resource::System, operation::ReadAll>,
) -> Result<Json<PendingCountsByModelAndWindow>, Error> {
    let config = state.current_config();

    // One tuple per configured completion window: the window label, an unset optional slot,
    // and whatever `parse_window_to_seconds` yields for that label.
    // NOTE(review): the meaning of the middle `None` is defined by fusillade - confirm there.
    let windows = config
        .batches
        .allowed_completion_windows
        .iter()
        .map(|window| (window.clone(), None, parse_window_to_seconds(window)))
        .collect::<Vec<_>>();

    // Request states that are counted by this endpoint.
    let states = vec![
        "pending".to_string(),
        "claimed".to_string(),
        "processing".to_string(),
    ];

    // An empty model list is passed; presumably this means "do not restrict by model".
    let model_filter: Vec<String> = Vec::new();

    // Priority-tier requests are left out of the counts.
    let service_tier_filter = ServiceTierFilter::Exclude(vec![Some("priority".to_string())]);

    // NOTE(review): the trailing `false` is an opaque fusillade flag - confirm its meaning.
    let counts = state
        .request_manager
        .get_pending_request_counts_by_model_and_window(
            &windows,
            &states,
            &model_filter,
            &service_tier_filter,
            false,
        )
        .await
        .map_err(|e| Error::Internal {
            operation: format!("get pending request counts: {}", e),
        })?;

    Ok(Json(counts))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::models::users::Role;
    use crate::test::utils::*;
    use axum_test::TestServer;
    use sqlx::PgPool;

    // Route exercised by every test in this module.
    const PENDING_COUNTS_PATH: &str = "/admin/api/v1/monitoring/pending-request-counts";

    // A standard user is rejected with 403; a platform manager gets a 200.
    #[sqlx::test]
    async fn test_pending_request_counts_requires_system_permission(pool: PgPool) {
        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;

        let standard_user = create_test_user(&pool, Role::StandardUser).await;
        let standard_headers = add_auth_headers(&standard_user);
        let denied = server
            .get(PENDING_COUNTS_PATH)
            .add_header(&standard_headers[0].0, &standard_headers[0].1)
            .add_header(&standard_headers[1].0, &standard_headers[1].1)
            .await;
        denied.assert_status(axum::http::StatusCode::FORBIDDEN);

        let platform_manager = create_test_user(&pool, Role::PlatformManager).await;
        let manager_headers = add_auth_headers(&platform_manager);
        let allowed = server
            .get(PENDING_COUNTS_PATH)
            .add_header(&manager_headers[0].0, &manager_headers[0].1)
            .add_header(&manager_headers[1].0, &manager_headers[1].1)
            .await;
        allowed.assert_status_ok();
    }

    // With nothing queued, the endpoint answers with an empty map.
    #[sqlx::test]
    async fn test_pending_request_counts_returns_empty_when_no_requests(pool: PgPool) {
        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;
        let admin = create_test_admin_user(&pool, Role::PlatformManager).await;
        let auth = add_auth_headers(&admin);

        let response = server
            .get(PENDING_COUNTS_PATH)
            .add_header(&auth[0].0, &auth[0].1)
            .add_header(&auth[1].0, &auth[1].1)
            .await;
        response.assert_status_ok();

        let counts: HashMap<String, HashMap<String, i64>> = response.json();
        assert_eq!(counts.len(), 0, "Should have no pending requests");
    }

    // Seeds three single-request batches for one model and checks that only two of them are
    // reported in the 24h bucket.
    #[sqlx::test]
    async fn test_pending_request_counts_excludes_priority_service_tier(pool: PgPool) {
        use fusillade::{CreateSingleRequestBatchInput, Storage};
        use sqlx::postgres::PgConnectOptions;
        use sqlx_pool_router::TestDbPools;

        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;
        let admin = create_test_admin_user(&pool, Role::PlatformManager).await;

        // Second pool on the same database, with `search_path` pointed at the `fusillade`
        // schema so the test can write batches directly.
        let base_opts: PgConnectOptions = pool.connect_options().as_ref().clone();
        let fusillade_opts = base_opts.options([("search_path", "fusillade")]);
        let fusillade_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(2)
            .min_connections(0)
            .connect_with(fusillade_opts)
            .await
            .expect("Failed to create fusillade pool");
        let fusillade_pools = TestDbPools::new(fusillade_pool.clone())
            .await
            .expect("TestDbPools");
        let request_manager =
            fusillade::PostgresRequestManager::new(fusillade_pools, Default::default());

        // One pending single-request batch per completion window, all for the same model.
        let model = "test-model";
        let mut batch_ids = Vec::new();
        for completion_window in ["24h", "1h", "0s"] {
            let batch_id = uuid::Uuid::new_v4();
            let input = CreateSingleRequestBatchInput {
                batch_id: Some(batch_id),
                request_id: uuid::Uuid::new_v4(),
                body: r#"{"input":"x"}"#.to_string(),
                model: model.to_string(),
                base_url: "http://localhost".to_string(),
                endpoint: "/v1/chat/completions".to_string(),
                completion_window: completion_window.to_string(),
                initial_state: "pending".to_string(),
                api_key: None,
                created_by: None,
            };
            request_manager
                .create_single_request_batch(input)
                .await
                .expect("create single-request batch");
            batch_ids.push(batch_id);
        }

        // Force every batch to expire 30 minutes from now.
        // NOTE(review): presumably window bucketing keys off `expires_at`, which would put all
        // three batches inside the 24h window - confirm against fusillade.
        for batch_id in &batch_ids {
            sqlx::query("UPDATE batches SET expires_at = NOW() + interval '30 minutes' WHERE id = $1")
                .bind(batch_id)
                .execute(&fusillade_pool)
                .await
                .expect("pin expires_at");
        }

        let auth = add_auth_headers(&admin);
        let response = server
            .get(PENDING_COUNTS_PATH)
            .add_header(&auth[0].0, &auth[0].1)
            .add_header(&auth[1].0, &auth[1].1)
            .await;
        response.assert_status_ok();

        let counts: HashMap<String, HashMap<String, i64>> = response.json();
        let model_counts = counts
            .get(model)
            .unwrap_or_else(|| panic!("expected '{model}' in response, got {counts:?}"));

        // Three batches were seeded but only two are expected here.
        // NOTE(review): presumably the "0s" batch is the one classified as priority tier and
        // dropped by the handler's service-tier filter - confirm.
        let count_24h = model_counts.get("24h").copied().unwrap_or(0);
        assert_eq!(
            count_24h, 2,
            "expected 2 (batch + flex) within 24h window — priority must be excluded; got {count_24h} ({model_counts:?})"
        );
    }
}