use axum::{extract::State, response::Json};
use fusillade::Storage;
use fusillade::request::ServiceTierFilter;
use sqlx_pool_router::PoolProvider;
use std::collections::HashMap;
use crate::api::handlers::sla_capacity::parse_window_to_seconds;
use crate::{
AppState,
auth::permissions::{RequiresPermission, operation, resource},
errors::Error,
};
/// Pending request counts keyed first by model name, then by completion
/// window label (e.g. `"24h"`), as returned by [`get_pending_request_counts`].
type PendingCountsByModelAndWindow = HashMap<String, HashMap<String, i64>>;

/// Report how many outstanding, non-priority requests exist per model and
/// completion window.
///
/// One window bucket is requested for every entry in the current config's
/// `batches.allowed_completion_windows`. Requests in the `pending`, `claimed`
/// or `processing` states are counted. No model filter is applied. Requests
/// whose service tier is `priority` are excluded.
///
/// The configured `batches.priority_decay_window_secs` is forwarded to the
/// request manager. The decay-window test in this file shows that flex
/// requests completed inside that window are also counted.
///
/// Requires the `System` / `ReadAll` permission.
///
/// # Errors
///
/// Returns [`Error::Internal`] if the request manager query fails.
#[utoipa::path(
    get,
    path = "/admin/api/v1/monitoring/pending-request-counts",
    responses(
        (status = 200, description = "Pending request counts by model and completion window", body = HashMap<String, HashMap<String, i64>>),
        (status = 500, description = "Internal server error"),
    ),
    tag = "monitoring",
)]
#[tracing::instrument(skip_all)]
pub async fn get_pending_request_counts<P: PoolProvider>(
    State(state): State<AppState<P>>,
    _: RequiresPermission<resource::System, operation::ReadAll>,
) -> Result<Json<PendingCountsByModelAndWindow>, Error> {
    let config = state.current_config();

    // One (label, None, seconds) tuple per configured completion window.
    // NOTE(review): the meaning of the `None` middle element is defined by
    // fusillade's API and is not visible here — confirm before changing.
    let windows = config
        .batches
        .allowed_completion_windows
        .iter()
        .map(|window| (window.clone(), None, parse_window_to_seconds(window)))
        .collect::<Vec<_>>();

    // Request states counted as still outstanding.
    let states = vec![
        "pending".to_string(),
        "claimed".to_string(),
        "processing".to_string(),
    ];

    // Empty: do not restrict the counts to particular models.
    let model_filter: Vec<String> = Vec::new();

    // Priority-tier traffic is deliberately left out of these counts.
    let service_tier_filter = ServiceTierFilter::Exclude(vec![Some("priority".to_string())]);

    let counts = state
        .request_manager
        .get_pending_request_counts_by_model_and_window(
            &windows,
            &states,
            &model_filter,
            &service_tier_filter,
            config.batches.priority_decay_window_secs,
            // NOTE(review): the meaning of this flag is defined by fusillade's
            // API and is not visible here.
            false,
        )
        .await
        .map_err(|e| Error::Internal {
            operation: format!("get pending request counts: {}", e),
        })?;

    Ok(Json(counts))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::models::users::Role;
    use crate::test::utils::*;
    use axum_test::TestServer;
    use sqlx::PgPool;

    /// A standard user is rejected with 403; a platform manager gets 200.
    #[sqlx::test]
    async fn test_pending_request_counts_requires_system_permission(pool: PgPool) {
        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;

        let standard_user = create_test_user(&pool, Role::StandardUser).await;
        // Build the auth headers once per user rather than once per header field.
        let standard_headers = add_auth_headers(&standard_user);
        let response = server
            .get("/admin/api/v1/monitoring/pending-request-counts")
            .add_header(&standard_headers[0].0, &standard_headers[0].1)
            .add_header(&standard_headers[1].0, &standard_headers[1].1)
            .await;
        response.assert_status(axum::http::StatusCode::FORBIDDEN);

        let platform_manager = create_test_user(&pool, Role::PlatformManager).await;
        let manager_headers = add_auth_headers(&platform_manager);
        let response = server
            .get("/admin/api/v1/monitoring/pending-request-counts")
            .add_header(&manager_headers[0].0, &manager_headers[0].1)
            .add_header(&manager_headers[1].0, &manager_headers[1].1)
            .await;
        response.assert_status_ok();
    }

    /// With no requests created, the endpoint returns an empty map.
    #[sqlx::test]
    async fn test_pending_request_counts_returns_empty_when_no_requests(pool: PgPool) {
        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;
        let admin = create_test_admin_user(&pool, Role::PlatformManager).await;
        let admin_headers = add_auth_headers(&admin);

        let response = server
            .get("/admin/api/v1/monitoring/pending-request-counts")
            .add_header(&admin_headers[0].0, &admin_headers[0].1)
            .add_header(&admin_headers[1].0, &admin_headers[1].1)
            .await;
        response.assert_status_ok();

        let counts: HashMap<String, HashMap<String, i64>> = response.json();
        assert_eq!(counts.len(), 0, "Should have no pending requests");
    }

    /// Creates a 24h batch, a 1h batch and one realtime request for the same
    /// model. Both batches are pinned to expire in 30 minutes, and the test
    /// expects the 24h bucket to hold exactly 2. The realtime request is
    /// presumably priority tier (per the test name and assertion message)
    /// and must not be counted.
    #[sqlx::test]
    async fn test_pending_request_counts_excludes_priority_service_tier(pool: PgPool) {
        use fusillade::{BatchInput, RequestTemplateInput, Storage};
        use sqlx::postgres::PgConnectOptions;
        use sqlx_pool_router::TestDbPools;

        let (server, _bg): (TestServer, _) = create_test_app(pool.clone(), false).await;
        let admin = create_test_admin_user(&pool, Role::PlatformManager).await;

        // Same database, with `search_path` set to `fusillade` so unqualified
        // table names (`batches`, `requests`) resolve in that schema.
        let base_opts: PgConnectOptions = pool.connect_options().as_ref().clone();
        let fusillade_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(2)
            .min_connections(0)
            .connect_with(base_opts.options([("search_path", "fusillade")]))
            .await
            .expect("Failed to create fusillade pool");
        let fusillade_pools = TestDbPools::new(fusillade_pool.clone()).await.expect("TestDbPools");
        let request_manager = fusillade::PostgresRequestManager::new(fusillade_pools, Default::default());

        let model = "test-model";
        let mut batch_ids = Vec::new();
        for completion_window in ["24h", "1h"] {
            let template = RequestTemplateInput {
                custom_id: None,
                endpoint: "https://api.example.com".to_string(),
                method: "POST".to_string(),
                path: "/v1/chat/completions".to_string(),
                body: r#"{"input":"x"}"#.to_string(),
                model: model.to_string(),
                api_key: "key".to_string(),
            };
            let file_id = request_manager
                .create_file(format!("queue-test-{completion_window}"), None, vec![template])
                .await
                .expect("create_file");
            let batch = request_manager
                .create_batch(BatchInput {
                    file_id,
                    endpoint: "/v1/chat/completions".to_string(),
                    completion_window: completion_window.to_string(),
                    metadata: None,
                    created_by: None,
                    api_key_id: None,
                    api_key: None,
                    total_requests: None,
                })
                .await
                .expect("create_batch");
            batch_ids.push(batch.id.0);
        }

        request_manager
            .create_realtime(fusillade::CreateRealtimeInput {
                request_id: uuid::Uuid::new_v4(),
                body: r#"{"input":"x"}"#.to_string(),
                model: model.to_string(),
                endpoint: "http://localhost".to_string(),
                method: "POST".to_string(),
                path: "/v1/responses".to_string(),
                api_key: String::new(),
                created_by: "queue-user".to_string(),
            })
            .await
            .expect("create_realtime");

        // Pin both batches to expire in 30 minutes so both fall inside the 24h bucket.
        for batch_id in &batch_ids {
            sqlx::query("UPDATE batches SET expires_at = NOW() + interval '30 minutes' WHERE id = $1")
                .bind(batch_id)
                .execute(&fusillade_pool)
                .await
                .expect("pin expires_at");
        }

        let admin_headers = add_auth_headers(&admin);
        let response = server
            .get("/admin/api/v1/monitoring/pending-request-counts")
            .add_header(&admin_headers[0].0, &admin_headers[0].1)
            .add_header(&admin_headers[1].0, &admin_headers[1].1)
            .await;
        response.assert_status_ok();

        let counts: HashMap<String, HashMap<String, i64>> = response.json();
        let model_counts = counts
            .get(model)
            .unwrap_or_else(|| panic!("expected '{model}' in response, got {counts:?}"));
        let count_24h = *model_counts.get("24h").unwrap_or(&0);
        assert_eq!(
            count_24h, 2,
            "expected 2 (batch + flex) within 24h window — priority must be excluded; got {count_24h} ({model_counts:?})"
        );
    }

    /// The config uses windows `1h`/`24h` and a 600s priority decay window.
    /// Four flex requests are created:
    /// - one completed 5 minutes ago,
    /// - one completed 20 minutes ago,
    /// - one failed 5 minutes ago,
    /// - one canceled 5 minutes ago.
    ///
    /// Only the request completed 5 minutes ago may count toward the `1h` bucket.
    #[sqlx::test]
    async fn test_pending_request_counts_priority_decay_window_includes_recent_flex(pool: PgPool) {
        use fusillade::{CreateFlexInput, RequestId, Storage};
        use sqlx::postgres::PgConnectOptions;
        use sqlx_pool_router::TestDbPools;

        let mut config = create_test_config();
        config.batches.allowed_completion_windows = vec!["1h".to_string(), "24h".to_string()];
        config.batches.priority_decay_window_secs = Some(600);
        let (server, _bg): (TestServer, _) = create_test_app_with_config(pool.clone(), config, false).await;
        let admin = create_test_admin_user(&pool, Role::PlatformManager).await;

        // Same database, with `search_path` set to `fusillade` so unqualified
        // table names resolve in that schema.
        let base_opts: PgConnectOptions = pool.connect_options().as_ref().clone();
        let fusillade_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(2)
            .min_connections(0)
            .connect_with(base_opts.options([("search_path", "fusillade")]))
            .await
            .expect("Failed to create fusillade pool");
        let fusillade_pools = TestDbPools::new(fusillade_pool.clone()).await.expect("TestDbPools");
        let request_manager = fusillade::PostgresRequestManager::new(fusillade_pools, Default::default());
        let model = "flex-decay-model";

        // Completed; later aged to 5 minutes ago (inside the 10 minute window).
        let recent_id = uuid::Uuid::new_v4();
        request_manager
            .create_flex(CreateFlexInput {
                request_id: recent_id,
                body: r#"{"input":"recent"}"#.to_string(),
                model: model.to_string(),
                endpoint: "http://localhost".to_string(),
                method: "POST".to_string(),
                path: "/v1/responses".to_string(),
                api_key: String::new(),
                created_by: "queue-user".to_string(),
            })
            .await
            .expect("create recent flex");
        mark_fusillade_request_processing(&fusillade_pool, recent_id)
            .await
            .expect("start recent flex");
        request_manager
            .complete_request(RequestId(recent_id), r#"{"output":"recent"}"#, 200)
            .await
            .expect("complete recent flex");

        // Completed; later aged to 20 minutes ago (outside the 10 minute window).
        let old_id = uuid::Uuid::new_v4();
        request_manager
            .create_flex(CreateFlexInput {
                request_id: old_id,
                body: r#"{"input":"old"}"#.to_string(),
                model: model.to_string(),
                endpoint: "http://localhost".to_string(),
                method: "POST".to_string(),
                path: "/v1/responses".to_string(),
                api_key: String::new(),
                created_by: "queue-user".to_string(),
            })
            .await
            .expect("create old flex");
        mark_fusillade_request_processing(&fusillade_pool, old_id)
            .await
            .expect("start old flex");
        request_manager
            .complete_request(RequestId(old_id), r#"{"output":"old"}"#, 200)
            .await
            .expect("complete old flex");

        // Failed; later aged to 5 minutes ago (failures must not count).
        let failed_id = uuid::Uuid::new_v4();
        request_manager
            .create_flex(CreateFlexInput {
                request_id: failed_id,
                body: r#"{"input":"failed"}"#.to_string(),
                model: model.to_string(),
                endpoint: "http://localhost".to_string(),
                method: "POST".to_string(),
                path: "/v1/responses".to_string(),
                api_key: String::new(),
                created_by: "queue-user".to_string(),
            })
            .await
            .expect("create failed flex");
        mark_fusillade_request_processing(&fusillade_pool, failed_id)
            .await
            .expect("start failed flex");
        request_manager
            .fail_request(RequestId(failed_id), r#"{"error":"failed"}"#, 500)
            .await
            .expect("fail flex");

        // Canceled via direct SQL below (cancellations must not count).
        let canceled_id = uuid::Uuid::new_v4();
        request_manager
            .create_flex(CreateFlexInput {
                request_id: canceled_id,
                body: r#"{"input":"canceled"}"#.to_string(),
                model: model.to_string(),
                endpoint: "http://localhost".to_string(),
                method: "POST".to_string(),
                path: "/v1/responses".to_string(),
                api_key: String::new(),
                created_by: "queue-user".to_string(),
            })
            .await
            .expect("create canceled flex");

        // Backdate the terminal timestamps relative to NOW().
        sqlx::query("UPDATE requests SET completed_at = NOW() - INTERVAL '5 minutes' WHERE id = $1")
            .bind(recent_id)
            .execute(&fusillade_pool)
            .await
            .expect("age recent completion");
        sqlx::query("UPDATE requests SET completed_at = NOW() - INTERVAL '20 minutes' WHERE id = $1")
            .bind(old_id)
            .execute(&fusillade_pool)
            .await
            .expect("age old completion");
        sqlx::query("UPDATE requests SET failed_at = NOW() - INTERVAL '5 minutes' WHERE id = $1")
            .bind(failed_id)
            .execute(&fusillade_pool)
            .await
            .expect("age failed request");
        sqlx::query("UPDATE requests SET state = 'canceled', canceled_at = NOW() - INTERVAL '5 minutes' WHERE id = $1")
            .bind(canceled_id)
            .execute(&fusillade_pool)
            .await
            .expect("cancel request");

        let admin_headers = add_auth_headers(&admin);
        let response = server
            .get("/admin/api/v1/monitoring/pending-request-counts")
            .add_header(&admin_headers[0].0, &admin_headers[0].1)
            .add_header(&admin_headers[1].0, &admin_headers[1].1)
            .await;
        response.assert_status_ok();

        let counts: HashMap<String, HashMap<String, i64>> = response.json();
        let model_counts = counts
            .get(model)
            .unwrap_or_else(|| panic!("expected '{model}' in response, got {counts:?}"));
        assert_eq!(
            *model_counts.get("1h").unwrap_or(&0),
            1,
            "only completed flex requests within the 10 minute decay window should count"
        );
    }

    /// Forces request `id` into the `processing` state: assigns a random
    /// daemon id and sets `claimed_at` and `started_at` to `NOW()`.
    ///
    /// The tests call this before `complete_request` / `fail_request`.
    /// Presumably those require an in-flight request; confirm against fusillade.
    async fn mark_fusillade_request_processing(pool: &PgPool, id: uuid::Uuid) -> sqlx::Result<()> {
        sqlx::query(
            r#"
UPDATE requests
SET state = 'processing',
daemon_id = gen_random_uuid(),
claimed_at = NOW(),
started_at = NOW()
WHERE id = $1
"#,
        )
        .bind(id)
        .execute(pool)
        .await?;
        Ok(())
    }
}