use crate::config::models::UpstreamServer;
use crate::extensions::dns::DnsResolver;
use crate::server::proxy::CachedPeerConfig;
use async_trait::async_trait;
use pingora::Error;
use pingora::lb::Backend;
use pingora::lb::discovery::ServiceDiscovery;
use std::collections::{BTreeSet, HashMap};
use std::net::SocketAddr;
use std::sync::Arc;
pub struct JokowayUpstreamDiscovery {
pub servers: Vec<(UpstreamServer, CachedPeerConfig)>,
pub resolver: Arc<DnsResolver>,
}
impl JokowayUpstreamDiscovery {
pub fn new(
servers: Vec<(UpstreamServer, CachedPeerConfig)>,
resolver: Arc<DnsResolver>,
) -> Self {
Self { servers, resolver }
}
}
#[async_trait]
impl ServiceDiscovery for JokowayUpstreamDiscovery {
async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>), Box<Error>> {
let mut backends = BTreeSet::new();
for (server, cached_config) in &self.servers {
if server.host.parse::<SocketAddr>().is_ok() {
let weight = server.weight.unwrap_or(1) as usize;
if let Ok(mut backend) = Backend::new_with_weight(&server.host, weight) {
backend.ext.insert(cached_config.clone());
backends.insert(backend);
}
continue;
}
let (host, port) = if let Some((h, p)) = server.host.rsplit_once(':') {
(h, p.parse::<u16>().unwrap_or(80))
} else {
(server.host.as_str(), 80)
};
match self.resolver.lookup_ip(host).await {
Ok(resolved) => {
let count = resolved.len();
if count > 0 {
let configured_weight = server.weight.unwrap_or(1) as usize;
let weight_per_addr = configured_weight;
for addr in resolved {
let socket_addr = SocketAddr::new(addr, port);
let addr_str = socket_addr.to_string();
log::trace!(
"Discovered backend for {}: {} with weight {} (configured weight: {}, resolved IPs: {})",
server.host,
addr_str,
weight_per_addr,
configured_weight,
count
);
if let Ok(mut backend) =
Backend::new_with_weight(&addr_str, weight_per_addr)
{
backend.ext.insert(cached_config.clone());
backends.insert(backend);
}
}
} else {
log::warn!("No addresses resolved for host {}", server.host);
}
}
Err(e) => {
log::error!("Failed to resolve host {}: {}", server.host, e);
}
}
}
Ok((backends, HashMap::new()))
}
}