use std::time::Duration;
use axum::{
extract::{Path, State},
http::HeaderMap,
Json,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use uuid::Uuid;
use crate::{
error::{ApiError, ApiResult},
middleware::{resolve_org_context, AuthUser},
models::HostedMock,
AppState,
};
/// Port used when building a deployment's internal admin base URL
/// (`http://<fly app name>.internal:<ADMIN_PORT>`, see `admin_base_url`).
const ADMIN_PORT: u16 = 9080;
/// Timeout (3 seconds) configured on the HTTP client used for proxied admin requests.
const PROXY_TIMEOUT: Duration = Duration::from_secs(3);
/// One circuit-breaker entry as exchanged with a deployment's admin API.
///
/// `list_circuit_breakers` deserializes a JSON array of these from the upstream
/// response and returns them to the caller wrapped in a `ResilienceEnvelope`.
/// Field names double as the JSON keys on both sides.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerStateResponse {
/// Identifier of the endpoint this breaker guards.
/// NOTE(review): presumably the same value `reset_circuit_breaker` accepts as its
/// `endpoint` path segment — confirm against the runtime.
pub endpoint: String,
/// Breaker state as reported upstream, passed through as an opaque string.
/// NOTE(review): the fallback summary counts `open`, `half_open` and `closed`;
/// presumably those are the possible values — confirm.
pub state: String,
/// Request counters and rates for this breaker.
pub stats: CircuitStatsResponse,
}
/// Counters and rates attached to a `CircuitBreakerStateResponse`.
///
/// All values are passed through unchanged from the upstream admin API; this
/// module performs no computation on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitStatsResponse {
/// Total number of requests counted by the breaker.
pub total_requests: u64,
/// Number of requests counted as successful.
pub successful_requests: u64,
/// Number of requests counted as failed.
pub failed_requests: u64,
/// Number of requests counted as rejected.
/// NOTE(review): presumably rejected because the breaker was open — confirm.
pub rejected_requests: u64,
/// Current run of consecutive failures.
pub consecutive_failures: u64,
/// Current run of consecutive successes.
pub consecutive_successes: u64,
/// Success rate as reported upstream.
/// NOTE(review): scale (0–1 vs 0–100) is not established in this file — confirm.
pub success_rate: f64,
/// Failure rate as reported upstream.
/// NOTE(review): scale (0–1 vs 0–100) is not established in this file — confirm.
pub failure_rate: f64,
}
/// One bulkhead entry as exchanged with a deployment's admin API.
///
/// `list_bulkheads` deserializes a JSON array of these from the upstream
/// response and returns them to the caller wrapped in a `ResilienceEnvelope`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStateResponse {
/// Name of the service this bulkhead belongs to.
/// NOTE(review): presumably the same value `reset_bulkhead` accepts as its
/// `service` path segment — confirm against the runtime.
pub service: String,
/// Occupancy counters for this bulkhead.
pub stats: BulkheadStatsResponse,
}
/// Counters attached to a `BulkheadStateResponse`.
///
/// All values are passed through unchanged from the upstream admin API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatsResponse {
/// Requests currently executing inside the bulkhead.
pub active_requests: u32,
/// Requests currently waiting in the bulkhead's queue.
pub queued_requests: u32,
/// Total number of requests counted by the bulkhead.
pub total_requests: u64,
/// Number of requests counted as rejected.
pub rejected_requests: u64,
/// Number of requests counted as timed out.
pub timeout_requests: u64,
/// Utilization as reported upstream.
/// NOTE(review): the name suggests a 0–100 percentage — confirm.
pub utilization_percent: f64,
}
/// Response wrapper for the list endpoints: the upstream items plus a marker
/// telling the client whether they came from a live runtime.
///
/// Serialize-only; it is never parsed back from JSON in this module.
#[derive(Debug, Serialize)]
pub struct ResilienceEnvelope<T: Serialize> {
/// `"live"` when the upstream call succeeded, `"unreachable"` when it failed
/// (the only two values the constructors in this file produce).
pub runtime_state: &'static str,
/// Items returned by the upstream admin API; empty when it could not be queried.
pub data: Vec<T>,
}
impl<T: Serialize> ResilienceEnvelope<T> {
    /// Builds an envelope for a successful upstream call: `runtime_state` is
    /// `"live"` and `data` carries the items unchanged.
    fn live(data: Vec<T>) -> Self {
        let runtime_state = "live";
        Self { runtime_state, data }
    }

    /// Builds the fallback envelope used when the upstream call failed:
    /// `runtime_state` is `"unreachable"` and `data` is empty.
    fn unreachable() -> Self {
        Self {
            data: vec![],
            runtime_state: "unreachable",
        }
    }
}
/// Lists the circuit breakers of a hosted deployment by proxying to its admin API.
///
/// The caller must be authorized for `deployment_id` (see `authorize_deployment`);
/// an authorization failure is the only error this handler returns. A failed
/// upstream call is logged at `warn` level and reported as an empty
/// `"unreachable"` envelope instead of an `ApiError`.
pub async fn list_circuit_breakers(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path(deployment_id): Path<Uuid>,
    headers: HeaderMap,
) -> ApiResult<Json<ResilienceEnvelope<CircuitBreakerStateResponse>>> {
    let deployment = authorize_deployment(&state, user_id, &headers, deployment_id).await?;
    let url = format!(
        "{}/api/resilience/circuit-breakers",
        admin_base_url(&deployment)
    );

    let envelope = proxy_get_vec::<CircuitBreakerStateResponse>(&url)
        .await
        .map(ResilienceEnvelope::live)
        .unwrap_or_else(|err| {
            tracing::warn!(%deployment_id, error = %err, "resilience proxy GET failed");
            ResilienceEnvelope::unreachable()
        });

    Ok(Json(envelope))
}
/// Lists the bulkheads of a hosted deployment by proxying to its admin API.
///
/// The caller must be authorized for `deployment_id` (see `authorize_deployment`);
/// an authorization failure is the only error this handler returns. A failed
/// upstream call is logged at `warn` level and reported as an empty
/// `"unreachable"` envelope instead of an `ApiError`.
pub async fn list_bulkheads(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path(deployment_id): Path<Uuid>,
    headers: HeaderMap,
) -> ApiResult<Json<ResilienceEnvelope<BulkheadStateResponse>>> {
    let deployment = authorize_deployment(&state, user_id, &headers, deployment_id).await?;
    let url = format!("{}/api/resilience/bulkheads", admin_base_url(&deployment));

    let envelope = proxy_get_vec::<BulkheadStateResponse>(&url)
        .await
        .map(ResilienceEnvelope::live)
        .unwrap_or_else(|err| {
            tracing::warn!(%deployment_id, error = %err, "resilience proxy GET failed");
            ResilienceEnvelope::unreachable()
        });

    Ok(Json(envelope))
}
/// Returns the resilience dashboard summary of a hosted deployment.
///
/// On success the upstream JSON is passed through; when it is a JSON object a
/// `"runtime_state": "live"` entry is inserted (any other JSON shape is returned
/// untouched). When the upstream call fails, the error is logged at `warn` level
/// and a zeroed summary tagged `"unreachable"` is returned instead of an
/// `ApiError`. Authorization failures are the only errors returned.
pub async fn get_summary(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path(deployment_id): Path<Uuid>,
    headers: HeaderMap,
) -> ApiResult<Json<Value>> {
    let deployment = authorize_deployment(&state, user_id, &headers, deployment_id).await?;
    let url = format!(
        "{}/api/resilience/dashboard/summary",
        admin_base_url(&deployment)
    );

    let summary = match proxy_get_value(&url).await {
        Ok(mut body) => {
            // Only an object can carry the extra marker key.
            if let Some(fields) = body.as_object_mut() {
                fields.insert(String::from("runtime_state"), Value::from("live"));
            }
            body
        }
        Err(err) => {
            tracing::warn!(%deployment_id, error = %err, "resilience proxy GET summary failed");
            json!({
                "runtime_state": "unreachable",
                "circuit_breakers": { "total": 0, "open": 0, "half_open": 0, "closed": 0 },
                "bulkheads": { "total": 0, "active_requests": 0, "queued_requests": 0 },
            })
        }
    };

    Ok(Json(summary))
}
/// Asks a hosted deployment to reset the circuit breaker for `endpoint`.
///
/// The caller must be authorized for `deployment_id` (see `authorize_deployment`);
/// an authorization failure is the only error this handler returns. `endpoint`
/// is percent-encoded before being placed in the upstream path.
///
/// Returns `{ "accepted": true, "runtime_state": "live" }` when the upstream POST
/// succeeds. On failure the error is logged at `warn` level and the body is
/// `{ "accepted": false, "runtime_state": "unreachable", "reason": ... }`.
///
/// NOTE(review): `proxy_post_empty` also fails on a 4xx/5xx upstream response, so
/// a reachable runtime that rejects the reset is reported as `"unreachable"` too.
pub async fn reset_circuit_breaker(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path((deployment_id, endpoint)): Path<(Uuid, String)>,
    headers: HeaderMap,
) -> ApiResult<Json<Value>> {
    let deployment = authorize_deployment(&state, user_id, &headers, deployment_id).await?;
    let base_url = admin_base_url(&deployment);
    let url = format!(
        "{base_url}/api/resilience/circuit-breakers/{}/reset",
        urlencoding::encode(&endpoint),
    );
    Ok(Json(match proxy_post_empty(&url).await {
        Ok(()) => json!({ "accepted": true, "runtime_state": "live" }),
        Err(err) => {
            // The log keeps the full error, including the internal URL.
            tracing::warn!(%deployment_id, %endpoint, error = %err, "resilience proxy POST reset failed");
            // reqwest's error text embeds the request URL; strip it so the
            // internal admin host and port are not disclosed to API clients.
            let reason = err.without_url().to_string();
            json!({
                "accepted": false,
                "runtime_state": "unreachable",
                "reason": reason,
            })
        }
    }))
}
/// Asks a hosted deployment to reset the bulkhead for `service`.
///
/// The caller must be authorized for `deployment_id` (see `authorize_deployment`);
/// an authorization failure is the only error this handler returns. `service`
/// is percent-encoded before being placed in the upstream path.
///
/// Returns `{ "accepted": true, "runtime_state": "live" }` when the upstream POST
/// succeeds. On failure the error is logged at `warn` level and the body is
/// `{ "accepted": false, "runtime_state": "unreachable", "reason": ... }`.
///
/// NOTE(review): `proxy_post_empty` also fails on a 4xx/5xx upstream response, so
/// a reachable runtime that rejects the reset is reported as `"unreachable"` too.
pub async fn reset_bulkhead(
    State(state): State<AppState>,
    AuthUser(user_id): AuthUser,
    Path((deployment_id, service)): Path<(Uuid, String)>,
    headers: HeaderMap,
) -> ApiResult<Json<Value>> {
    let deployment = authorize_deployment(&state, user_id, &headers, deployment_id).await?;
    let base_url = admin_base_url(&deployment);
    let url = format!(
        "{base_url}/api/resilience/bulkheads/{}/reset",
        urlencoding::encode(&service),
    );
    Ok(Json(match proxy_post_empty(&url).await {
        Ok(()) => json!({ "accepted": true, "runtime_state": "live" }),
        Err(err) => {
            // The log keeps the full error, including the internal URL.
            tracing::warn!(%deployment_id, %service, error = %err, "resilience proxy POST reset failed");
            // reqwest's error text embeds the request URL; strip it so the
            // internal admin host and port are not disclosed to API clients.
            let reason = err.without_url().to_string();
            json!({
                "accepted": false,
                "runtime_state": "unreachable",
                "reason": reason,
            })
        }
    }))
}
/// Builds the base URL of a deployment's internal admin API:
/// `http://<fly app name>.internal:<ADMIN_PORT>` (plain HTTP, no trailing slash).
fn admin_base_url(deployment: &HostedMock) -> String {
    let app = deployment.fly_app_name();
    format!("http://{app}.internal:{ADMIN_PORT}")
}
/// Returns the process-wide HTTP client used for every proxied admin request.
///
/// The client is built once, with `PROXY_TIMEOUT`, and then shared: building a
/// new `reqwest::Client` per request throws away its connection pool. Cloning
/// the client is cheap, since reqwest documents it as reference-counted.
///
/// # Errors
/// Returns the builder error if the client cannot be constructed. Nothing is
/// cached in that case, so the next call retries.
fn proxy_client() -> reqwest::Result<reqwest::Client> {
    static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();

    if let Some(client) = CLIENT.get() {
        return Ok(client.clone());
    }
    let built = reqwest::Client::builder().timeout(PROXY_TIMEOUT).build()?;
    // If another task won the race, its client is kept and `built` is dropped.
    Ok(CLIENT.get_or_init(|| built).clone())
}

/// GETs `url` and decodes the JSON response body as `T`.
///
/// # Errors
/// Fails on transport errors, timeouts, a 4xx/5xx status, or a body that does
/// not decode as `T`.
async fn proxy_get_json<T: for<'de> Deserialize<'de>>(url: &str) -> reqwest::Result<T> {
    proxy_client()?
        .get(url)
        .send()
        .await?
        .error_for_status()?
        .json::<T>()
        .await
}

/// GETs `url` and decodes the response body as a JSON array of `T`.
///
/// # Errors
/// Same failure modes as `proxy_get_json`.
async fn proxy_get_vec<T: for<'de> Deserialize<'de>>(url: &str) -> reqwest::Result<Vec<T>> {
    proxy_get_json::<Vec<T>>(url).await
}

/// GETs `url` and returns the response body as an untyped JSON value.
///
/// # Errors
/// Same failure modes as `proxy_get_json`.
async fn proxy_get_value(url: &str) -> reqwest::Result<Value> {
    proxy_get_json::<Value>(url).await
}

/// POSTs to `url` with no body and discards the response body.
///
/// # Errors
/// Fails on transport errors, timeouts, or a 4xx/5xx status.
async fn proxy_post_empty(url: &str) -> reqwest::Result<()> {
    proxy_client()?.post(url).send().await?.error_for_status()?;
    Ok(())
}
/// Loads the deployment and checks that it belongs to the caller's organization.
///
/// Steps, in order:
/// 1. Look the deployment up by id; a lookup error is propagated, a missing row
///    yields `InvalidRequest("Deployment not found")`.
/// 2. Resolve the caller's organization context from `user_id` and `headers`;
///    any failure yields `InvalidRequest("Organization not found")`.
/// 3. Compare organization ids; a mismatch yields the same
///    `"Deployment not found"` error as a missing row.
///
/// Returns the deployment when all three steps pass.
async fn authorize_deployment(
    state: &AppState,
    user_id: Uuid,
    headers: &HeaderMap,
    deployment_id: Uuid,
) -> ApiResult<HostedMock> {
    let deployment = match HostedMock::find_by_id(state.db.pool(), deployment_id).await? {
        Some(found) => found,
        None => return Err(ApiError::InvalidRequest("Deployment not found".into())),
    };

    let ctx = match resolve_org_context(state, user_id, headers, None).await {
        Ok(resolved) => resolved,
        Err(_) => return Err(ApiError::InvalidRequest("Organization not found".into())),
    };

    if ctx.org_id == deployment.org_id {
        Ok(deployment)
    } else {
        Err(ApiError::InvalidRequest("Deployment not found".into()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A live envelope serializes its state marker and carries the items through.
    #[test]
    fn envelope_live_round_trips_data() {
        let stats = BulkheadStatsResponse {
            active_requests: 3,
            queued_requests: 0,
            total_requests: 17,
            rejected_requests: 0,
            timeout_requests: 0,
            utilization_percent: 3.0,
        };
        let entry = BulkheadStateResponse {
            service: "http".into(),
            stats,
        };

        let body = serde_json::to_value(&ResilienceEnvelope::live(vec![entry])).unwrap();

        assert_eq!(body["runtime_state"], "live");
        let items = body["data"].as_array().unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["service"], "http");
        assert_eq!(items[0]["stats"]["active_requests"], 3);
    }

    /// An unreachable envelope serializes its state marker and an empty array.
    #[test]
    fn envelope_unreachable_is_empty() {
        let env = ResilienceEnvelope::<CircuitBreakerStateResponse>::unreachable();

        let body = serde_json::to_value(&env).unwrap();

        assert_eq!(body["runtime_state"], "unreachable");
        assert!(body["data"].as_array().unwrap().is_empty());
    }
}