jokoway 0.1.0-rc.1

Jokoway is a high-performance API Gateway built on Pingora (Rust) with dead-simple YAML configs.
Documentation
use crate::config::models::UpstreamServer;
use crate::extensions::dns::DnsResolver;
use crate::server::proxy::CachedPeerConfig;
use async_trait::async_trait;
use pingora::Error;
use pingora::lb::Backend;
use pingora::lb::discovery::ServiceDiscovery;
use std::collections::{BTreeSet, HashMap};
use std::net::SocketAddr;
use std::sync::Arc;

/// Service-discovery source that turns the configured upstream servers into
/// load-balancer backends, resolving host names through DNS where needed.
pub struct JokowayUpstreamDiscovery {
    /// Configured upstream servers, each paired with the peer configuration
    /// that is attached to every backend discovered for that server.
    pub servers: Vec<(UpstreamServer, CachedPeerConfig)>,
    /// Shared DNS resolver used to look up the IP addresses of upstream hosts.
    pub resolver: Arc<DnsResolver>,
}

impl JokowayUpstreamDiscovery {
    pub fn new(
        servers: Vec<(UpstreamServer, CachedPeerConfig)>,
        resolver: Arc<DnsResolver>,
    ) -> Self {
        Self { servers, resolver }
    }
}

#[async_trait]
impl ServiceDiscovery for JokowayUpstreamDiscovery {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>), Box<Error>> {
        let mut backends = BTreeSet::new();

        for (server, cached_config) in &self.servers {
            // Check if it's already an IP address
            if server.host.parse::<SocketAddr>().is_ok() {
                let weight = server.weight.unwrap_or(1) as usize;
                if let Ok(mut backend) = Backend::new_with_weight(&server.host, weight) {
                    backend.ext.insert(cached_config.clone());
                    backends.insert(backend);
                }
                continue;
            }

            // Split host and port
            let (host, port) = if let Some((h, p)) = server.host.rsplit_once(':') {
                (h, p.parse::<u16>().unwrap_or(80))
            } else {
                (server.host.as_str(), 80)
            };

            // Perform DNS resolution
            match self.resolver.lookup_ip(host).await {
                Ok(resolved) => {
                    let count = resolved.len();
                    if count > 0 {
                        let configured_weight = server.weight.unwrap_or(1) as usize;
                        // Give each resolved IP the full configured weight
                        // This preserves the intended weight for each backend
                        let weight_per_addr = configured_weight;

                        for addr in resolved {
                            let socket_addr = SocketAddr::new(addr, port);
                            let addr_str = socket_addr.to_string();
                            log::trace!(
                                "Discovered backend for {}: {} with weight {} (configured weight: {}, resolved IPs: {})",
                                server.host,
                                addr_str,
                                weight_per_addr,
                                configured_weight,
                                count
                            );
                            if let Ok(mut backend) =
                                Backend::new_with_weight(&addr_str, weight_per_addr)
                            {
                                backend.ext.insert(cached_config.clone());
                                backends.insert(backend);
                            }
                        }
                    } else {
                        log::warn!("No addresses resolved for host {}", server.host);
                    }
                }
                Err(e) => {
                    log::error!("Failed to resolve host {}: {}", server.host, e);
                }
            }
        }

        Ok((backends, HashMap::new()))
    }
}