use futures::future::join_all;
use serde_json::Value;
use std::time::{Duration, Instant};
use crate::api::health_capabilities::{ClusterCapabilities, ClusterCapabilityStatus};
use crate::api::health_contracts::ClusterMirrorHealth;
use crate::data::cluster::ClusterMirrorRecord;
/// Probes every mirror concurrently and summarises each one's health.
///
/// Each mirror is queried with three sequential requests:
///
/// 1. `GET {url}/ping` (3 s timeout) — the time until a response arrives
///    becomes `latency_ms`.
/// 2. `GET {url}` (4 s timeout) — the body is parsed as JSON and read for
///    `cargo_toml_version` (falling back to `version`), `message`, and the
///    capability metadata consumed by `build_mirror_capabilities`.
/// 3. `GET {url}/openapi.yaml` (5 s timeout) — fully downloading a response
///    whose status is not 4xx/5xx yields `download_bytes_per_sec`.
///
/// A mirror is `"online"` only when the ping received a response and the root
/// document parsed as JSON; otherwise it is `"offline"`.
///
/// `unavailable_capabilities` builds the fallback capability set from a
/// human-readable reason when the mirror does not report usable capabilities.
///
/// Returns one `(mirror url, health)` pair per input mirror, in input order.
pub(super) async fn probe_cluster_mirrors(
    client: reqwest::Client,
    mirrors: Vec<ClusterMirrorRecord>,
    unavailable_capabilities: fn(&str) -> ClusterCapabilities,
) -> Vec<(String, ClusterMirrorHealth)> {
    join_all(mirrors.into_iter().map(|mirror| {
        let client = client.clone();
        async move {
            let root_url: String = mirror.url.clone();
            // Trim the trailing slash once so both derived URLs share the same base.
            let (ping_url, openapi_url) = {
                let base = mirror.url.trim_end_matches('/');
                (format!("{base}/ping"), format!("{base}/openapi.yaml"))
            };

            // Any HTTP response (regardless of status code) counts as a
            // successful ping; only transport failures and timeouts do not.
            let ping_started = Instant::now();
            let ping_ok = client
                .get(&ping_url)
                .timeout(Duration::from_secs(3))
                .send()
                .await
                .is_ok();
            let latency_ms: Option<f64> =
                ping_ok.then(|| ping_started.elapsed().as_secs_f64() * 1000.0);

            let root_json: Option<Value> = match client
                .get(&root_url)
                .timeout(Duration::from_secs(4))
                .send()
                .await
            {
                Ok(response) => response.json::<Value>().await.ok(),
                Err(_) => None,
            };

            // Reject 4xx/5xx responses: an error page is not a successful
            // download and must not produce a throughput figure.
            let openapi_started = Instant::now();
            let download_response = client
                .get(&openapi_url)
                .timeout(Duration::from_secs(5))
                .send()
                .await
                .and_then(reqwest::Response::error_for_status);
            let download_bytes_per_sec: Option<f64> = match download_response {
                Ok(response) => response.bytes().await.ok().map(|bytes| {
                    // Clamp to 1 ms so a near-instant download cannot divide by ~0.
                    let elapsed = openapi_started.elapsed().as_secs_f64().max(0.001);
                    bytes.len() as f64 / elapsed
                }),
                Err(_) => None,
            };
            let openapi_ok = download_bytes_per_sec.is_some();

            let status: &str = if root_json.is_some() && latency_ms.is_some() {
                "online"
            } else {
                "offline"
            };

            let cargo_toml_version: Option<String> = root_json
                .as_ref()
                .and_then(|json| {
                    json.get("cargo_toml_version")
                        .or_else(|| json.get("version"))
                })
                .and_then(Value::as_str)
                .map(str::to_string);
            let message: Option<String> = root_json
                .as_ref()
                .and_then(|json| json.get("message"))
                .and_then(Value::as_str)
                .map(str::to_string);

            let capabilities = build_mirror_capabilities(
                root_json.as_ref(),
                openapi_ok,
                status,
                unavailable_capabilities,
            );

            (
                mirror.url,
                ClusterMirrorHealth {
                    url: root_url,
                    status: status.to_string(),
                    latency_ms,
                    download_bytes_per_sec,
                    cargo_toml_version,
                    message,
                    capabilities,
                },
            )
        }
    }))
    .await
}
/// Assembles the capability set reported for a single mirror.
///
/// The base set is the `capabilities` object from the mirror's root document
/// when it is present and deserializes into `ClusterCapabilities`. Otherwise
/// `unavailable_capabilities` is called with a reason that depends on whether
/// `status` is `"offline"`.
///
/// In both cases `openapi_download`, `management_api` and `gateway_fetch` are
/// then overwritten with values derived from the probe results
/// (`openapi_ok` and the routes advertised in `root_json`).
fn build_mirror_capabilities(
    root_json: Option<&Value>,
    openapi_ok: bool,
    status: &str,
    unavailable_capabilities: fn(&str) -> ClusterCapabilities,
) -> ClusterCapabilities {
    let reported = root_json
        .and_then(|json| json.get("capabilities"))
        .and_then(|value| serde_json::from_value::<ClusterCapabilities>(value.clone()).ok());

    let mut capabilities = match reported {
        Some(caps) => caps,
        None => {
            let reason = match status {
                "offline" => "mirror offline",
                _ => "capabilities not reported by mirror",
            };
            unavailable_capabilities(reason)
        }
    };

    // Probed values always take precedence over whatever the base set held.
    capabilities.openapi_download = openapi_download_status(openapi_ok);
    capabilities.management_api =
        route_advertisement_status(root_json, "/management/capabilities", "GET");
    capabilities.gateway_fetch = route_advertisement_status(root_json, "/gateway/fetch", "POST");
    capabilities
}
/// Describes the outcome of the `/openapi.yaml` download probe.
///
/// `available` mirrors `openapi_ok`; `detail` is a fixed success or failure
/// sentence.
fn openapi_download_status(openapi_ok: bool) -> ClusterCapabilityStatus {
    let detail: &str = if openapi_ok {
        "Downloaded /openapi.yaml successfully"
    } else {
        "Failed to download /openapi.yaml"
    };
    ClusterCapabilityStatus {
        available: openapi_ok,
        detail: String::from(detail),
    }
}
/// Turns the mirror's route advertisement for `path`/`method` into a
/// capability status.
///
/// * `root_json` — the mirror's root document, if one was obtained.
/// * `path`, `method` — the route to look up via `route_supports`.
///
/// `available` is `true` only when the route is advertised. `detail` says
/// whether the route was advertised, not advertised, or whether no routes
/// metadata could be read at all (including when `root_json` is `None`).
fn route_advertisement_status(
    root_json: Option<&Value>,
    path: &str,
    method: &str,
) -> ClusterCapabilityStatus {
    // Look the route up once; both fields are derived from the same answer.
    let advertised: Option<bool> = root_json.and_then(|json| route_supports(json, path, method));
    let detail = match advertised {
        Some(true) => format!("Route {path} ({method}) is advertised"),
        Some(false) => format!("Route {path} ({method}) not advertised"),
        None => "Mirror did not include routes metadata".to_string(),
    };
    ClusterCapabilityStatus {
        available: advertised.unwrap_or(false),
        detail,
    }
}
/// Reports whether the mirror's root document advertises `method` on `path`.
///
/// Returns `None` when `root_json` has no `routes` entry or it is not an
/// array. Otherwise returns `Some(true)` if any route entry has a `path`
/// string equal to `path` and a `methods` array containing `method`
/// (compared ASCII case-insensitively), and `Some(false)` if none does.
/// Route entries with a missing or non-string `path`, a missing or non-array
/// `methods`, and non-string method items never match.
fn route_supports(root_json: &Value, path: &str, method: &str) -> Option<bool> {
    let routes = root_json.get("routes")?.as_array()?;
    let advertised = routes.iter().any(|route| {
        let path_matches = route.get("path").and_then(Value::as_str) == Some(path);
        path_matches
            && route
                .get("methods")
                .and_then(Value::as_array)
                .into_iter()
                .flatten()
                .filter_map(Value::as_str)
                // `eq_ignore_ascii_case` already ignores case, so no
                // upper-cased copy of `method` is needed.
                .any(|candidate| candidate.eq_ignore_ascii_case(method))
    });
    Some(advertised)
}