use crate::error::{Result, SdkError};
use crate::types::WorkerEndpoint;
use serde::Deserialize;
use tracing::{debug, info, warn};
/// Base URL of the hosted discovery service.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// value callers pass to `DiscoveryClient::new` when they have no override.
pub const DEFAULT_DISCOVERY_URL: &str = "https://discovery.allenhark.network";
/// Payload that `DiscoveryClient::discover` deserializes from the discovery
/// endpoint's JSON response.
#[derive(Debug, Clone, Deserialize)]
pub struct DiscoveryResponse {
/// Regions listed by the discovery service.
pub regions: Vec<DiscoveryRegion>,
/// Workers listed by the discovery service, healthy or not; consumers in this
/// file filter on `DiscoveryWorker::healthy` before using them.
pub workers: Vec<DiscoveryWorker>,
/// Region suggested by the service, if any. `DiscoveryClient::best_region`
/// returns it when no usable preferred region was given.
pub recommended_region: Option<String>,
}
/// A region entry in a discovery response.
#[derive(Debug, Clone, Deserialize)]
pub struct DiscoveryRegion {
/// Region identifier.
/// NOTE(review): presumably the value `DiscoveryWorker::region` refers to; confirm.
pub id: String,
/// Region name as sent by the service.
pub name: String,
/// Optional coordinate; presumably latitude in degrees (TODO confirm).
pub lat: Option<f64>,
/// Optional coordinate; presumably longitude in degrees (TODO confirm).
pub lon: Option<f64>,
/// Optional timing figure; judging by the name, a round-trip time to the
/// leader in milliseconds (TODO confirm against the service's documentation).
pub leader_rtt_ms: Option<f64>,
}
/// A worker entry in a discovery response.
#[derive(Debug, Clone, Deserialize)]
pub struct DiscoveryWorker {
/// Worker identifier; passed through to the `WorkerEndpoint` built from this worker.
pub id: String,
/// Region this worker is listed under; compared against the requested region
/// string when filtering workers.
pub region: String,
/// Host part used when formatting this worker's endpoint strings.
pub ip: String,
/// Ports the worker exposes, per protocol.
pub ports: WorkerPorts,
/// Health flag reported by the service; workers with `false` are skipped by
/// the filtering helpers on `DiscoveryClient`.
pub healthy: bool,
/// Version string reported for the worker, if any.
pub version: Option<String>,
}
/// Per-protocol port numbers of a worker.
#[derive(Debug, Clone, Deserialize)]
pub struct WorkerPorts {
/// QUIC port. Required: there is no serde default for this field.
pub quic: u16,
/// gRPC port. When the field is missing from the payload, serde fills it in
/// by calling `default_grpc_port`.
#[serde(default = "default_grpc_port")]
pub grpc: u16,
/// WebSocket port; `None` when the field is missing from the payload.
#[serde(default)]
pub ws: Option<u16>,
/// HTTP port; `None` when the field is missing from the payload.
#[serde(default)]
pub http: Option<u16>,
}
/// Supplies the gRPC port used when a worker's `ports` object omits `grpc`.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `WorkerPorts::grpc`.
fn default_grpc_port() -> u16 {
    const FALLBACK_GRPC_PORT: u16 = 10_000;
    FALLBACK_GRPC_PORT
}
/// Client for the worker discovery service.
///
/// Holds the HTTP client and the base URL that discovery requests are sent to.
pub struct DiscoveryClient {
/// HTTP client used for discovery requests.
http: reqwest::Client,
/// Base URL of the discovery service; the request path is appended to it.
url: String,
}
impl DiscoveryClient {
    /// Creates a client that talks to the discovery service at `discovery_url`.
    ///
    /// Trailing `/` characters are stripped from `discovery_url` so that the
    /// request URL built by [`discover`](Self::discover) never contains a
    /// double slash (`https://host//v1/discovery`).
    ///
    /// Requests made through this client use a 10 second timeout.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `reqwest` client cannot be built.
    pub fn new(discovery_url: &str) -> Self {
        let http = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            http,
            // Normalise the base so `format!("{}/v1/discovery", ..)` stays well-formed.
            url: discovery_url.trim_end_matches('/').to_string(),
        }
    }

    /// Fetches the current discovery document from `{base}/v1/discovery`.
    ///
    /// # Errors
    ///
    /// Returns an `SdkError::connection` error when the request cannot be
    /// sent, when the server answers with a non-success status (the status and
    /// response body are included in the message), or when the body cannot be
    /// decoded into a [`DiscoveryResponse`].
    pub async fn discover(&self) -> Result<DiscoveryResponse> {
        let url = format!("{}/v1/discovery", self.url);
        debug!(url = %url, "Fetching worker discovery");

        let response = self
            .http
            .get(&url)
            .send()
            .await
            .map_err(|e| SdkError::connection(format!("Discovery request failed: {}", e)))?;

        let status = response.status();
        if !status.is_success() {
            // Best effort: an unreadable body is reported as an empty string.
            let body = response.text().await.unwrap_or_default();
            return Err(SdkError::connection(format!(
                "Discovery failed (HTTP {}): {}",
                status, body
            )));
        }

        let discovery: DiscoveryResponse = response
            .json()
            .await
            .map_err(|e| SdkError::connection(format!("Invalid discovery response: {}", e)))?;

        info!(
            regions = discovery.regions.len(),
            workers = discovery.workers.len(),
            recommended = ?discovery.recommended_region,
            "Discovery complete"
        );
        Ok(discovery)
    }

    /// Returns the healthy workers of `response` whose region equals `region`.
    ///
    /// The returned references borrow from `response`; the result is empty
    /// when no healthy worker is listed for the region.
    pub fn workers_for_region<'a>(
        &self,
        response: &'a DiscoveryResponse,
        region: &str,
    ) -> Vec<&'a DiscoveryWorker> {
        response
            .workers
            .iter()
            .filter(|w| w.healthy && w.region == region)
            .collect()
    }

    /// Chooses the region to connect to.
    ///
    /// Returns `preferred` when it is given and at least one healthy worker is
    /// listed for it. Otherwise a warning is logged (only if a preference was
    /// given) and the service's `recommended_region` is returned as-is; it is
    /// not checked for healthy workers and may be `None`.
    pub fn best_region(
        &self,
        response: &DiscoveryResponse,
        preferred: Option<&str>,
    ) -> Option<String> {
        if let Some(pref) = preferred {
            let has_healthy_worker = response
                .workers
                .iter()
                .any(|w| w.region == pref && w.healthy);
            if has_healthy_worker {
                return Some(pref.to_string());
            }
            warn!(
                preferred = pref,
                "Preferred region has no healthy workers, falling back"
            );
        }
        response.recommended_region.clone()
    }

    /// Converts the healthy entries of `workers` into [`WorkerEndpoint`]s,
    /// preserving their order. Unhealthy workers are skipped.
    pub fn to_worker_endpoints(workers: &[DiscoveryWorker]) -> Vec<WorkerEndpoint> {
        workers
            .iter()
            .filter(|w| w.healthy)
            .map(Self::worker_to_endpoint)
            .collect()
    }

    /// Builds the [`WorkerEndpoint`] for a single worker.
    ///
    /// The QUIC address (`host:port`) and the gRPC URL (`http://host:port`)
    /// are always produced. The WebSocket URL (`ws://host:port/ws`) and the
    /// HTTP URL (`http://host:port`) are produced only when the worker
    /// advertises the corresponding port.
    ///
    /// IPv6 literal hosts are wrapped in square brackets so that every
    /// produced `host:port` pair is unambiguous. The `healthy` flag is not
    /// consulted here.
    pub fn worker_to_endpoint(worker: &DiscoveryWorker) -> WorkerEndpoint {
        let host = Self::host_for_authority(&worker.ip);
        let ports = &worker.ports;

        let http_endpoint = ports
            .http
            .map(|port| format!("http://{}:{}", host, port));
        let ws_endpoint = ports
            .ws
            .map(|port| format!("ws://{}:{}/ws", host, port));

        WorkerEndpoint::with_endpoints(
            &worker.id,
            &worker.region,
            Some(format!("{}:{}", host, ports.quic)),
            Some(format!("http://{}:{}", host, ports.grpc)),
            ws_endpoint,
            http_endpoint,
        )
    }

    /// Returns `ip` in the form required in front of a `:port` suffix.
    ///
    /// A bare IPv6 literal such as `2001:db8::1` becomes `[2001:db8::1]`;
    /// anything else (IPv4 addresses, hostnames, already-bracketed literals)
    /// is returned unchanged.
    fn host_for_authority(ip: &str) -> String {
        if ip.parse::<std::net::Ipv6Addr>().is_ok() {
            format!("[{}]", ip)
        } else {
            ip.to_string()
        }
    }
}