use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
/// Returns `true` when `ip` must not be treated as a public destination.
///
/// For IPv4 this covers private, loopback, link-local, broadcast, multicast,
/// unspecified and documentation addresses, plus the special-purpose blocks
/// that `std` has no stable predicate for: `0.0.0.0/8`, `100.64.0.0/10`,
/// `198.18.0.0/15` and `240.0.0.0/4`.
///
/// For IPv6 this covers loopback, multicast, unspecified, unique-local
/// (`fc00::/7`) and link-local (`fe80::/10`) addresses. An IPv6 address that
/// `embedded_ipv4` recognises as carrying an IPv4 address is classified by
/// that IPv4 address instead, so wrapping a private IPv4 address in IPv6
/// does not make it look public.
pub fn is_private_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(ipv4) => {
            let [a, b, _, _] = ipv4.octets();
            ipv4.is_private()
                || ipv4.is_loopback()
                // Covers all of 169.254.0.0/16, so no separate octet test is needed.
                || ipv4.is_link_local()
                || ipv4.is_broadcast()
                || ipv4.is_multicast()
                || ipv4.is_unspecified()
                || ipv4.is_documentation()
                // 0.0.0.0/8: "this network"; `is_unspecified` only matches 0.0.0.0.
                || a == 0
                // 100.64.0.0/10: shared address space (carrier-grade NAT).
                || (a == 100 && (b & 0xc0) == 0x40)
                // 198.18.0.0/15: network benchmarking.
                || (a == 198 && (b & 0xfe) == 18)
                // 240.0.0.0/4: reserved for future use (includes broadcast).
                || a >= 240
        }
        IpAddr::V6(ipv6) => {
            if ipv6.is_loopback() || ipv6.is_multicast() || ipv6.is_unspecified() {
                return true;
            }
            // Judge IPv4-in-IPv6 forms by the IPv4 address they carry.
            if let Some(v4) = embedded_ipv4(ipv6) {
                return is_private_ip(IpAddr::V4(v4));
            }
            let first = ipv6.segments()[0];
            // fc00::/7 is unique-local, fe80::/10 is link-local.
            (first & 0xfe00) == 0xfc00 || (first & 0xffc0) == 0xfe80
        }
    }
}
/// Extracts the IPv4 address carried in the low 32 bits of `ipv6`.
///
/// Two layouts are recognised: IPv4-mapped (`::ffff:a.b.c.d`) and
/// IPv4-compatible (`::a.b.c.d`), i.e. the first five segments are zero and
/// the sixth is either `0xffff` or zero. Any other address yields `None`.
///
/// `::` and `::1` also fit the IPv4-compatible layout and come back as
/// `0.0.0.0` and `0.0.0.1`.
fn embedded_ipv4(ipv6: Ipv6Addr) -> Option<Ipv4Addr> {
    match ipv6.segments() {
        [0, 0, 0, 0, 0, 0 | 0xffff, upper, lower] => {
            // Each 16-bit segment holds two IPv4 octets, most significant first.
            let [a, b] = upper.to_be_bytes();
            let [c, d] = lower.to_be_bytes();
            Some(Ipv4Addr::new(a, b, c, d))
        }
        _ => None,
    }
}
/// Resolves `host:port` and returns the addresses only if all of them pass
/// the `is_private_ip` check.
///
/// # Errors
///
/// Returns a descriptive `String` when the lookup itself fails, when any
/// resolved address is flagged by `is_private_ip` (the first offending
/// address is reported and the rest are not examined), or when the lookup
/// succeeds but yields no addresses.
pub async fn resolve_and_check_public(host: &str, port: u16) -> Result<Vec<SocketAddr>, String> {
    let lookup = match tokio::net::lookup_host((host, port)).await {
        Ok(found) => found,
        Err(e) => return Err(format!("DNS resolution failed for {host}: {e}")),
    };

    // Collecting into `Result` stops at the first rejected address.
    let vetted = lookup
        .map(|candidate| {
            let ip = candidate.ip();
            if is_private_ip(ip) {
                Err(format!("host {host} resolved to private/reserved IP {ip}"))
            } else {
                Ok(candidate)
            }
        })
        .collect::<Result<Vec<SocketAddr>, String>>()?;

    if vetted.is_empty() {
        Err(format!("DNS resolution returned no addresses for {host}"))
    } else {
        Ok(vetted)
    }
}
/// Returns `true` when `host` is an IP literal for the instance metadata
/// endpoint: `169.254.169.254` or `fd00:ec2::254`.
///
/// The host is parsed as an address rather than compared as text, so
/// equivalent spellings are recognised too: upper-case hex digits,
/// uncompressed IPv6 (`fd00:ec2:0:0:0:0:0:254`), the bracketed URL form
/// (`[fd00:ec2::254]`) and the IPv4-mapped form (`::ffff:169.254.169.254`).
///
/// Anything that does not parse as an IP literal, including DNS names,
/// returns `false`; this function performs no name resolution.
pub fn is_aws_metadata(host: &str) -> bool {
    const METADATA_V4: Ipv4Addr = Ipv4Addr::new(169, 254, 169, 254);
    const METADATA_V6: Ipv6Addr = Ipv6Addr::new(0xfd00, 0x0ec2, 0, 0, 0, 0, 0, 0x0254);

    // URL authorities wrap IPv6 literals in brackets; strip one matching pair.
    let literal = host
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .unwrap_or(host);

    match literal.parse::<IpAddr>() {
        Ok(IpAddr::V4(v4)) => v4 == METADATA_V4,
        Ok(IpAddr::V6(v6)) => v6 == METADATA_V6 || v6.to_ipv4_mapped() == Some(METADATA_V4),
        Err(_) => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Builds an `IpAddr` from four IPv4 octets.
    fn v4(octets: [u8; 4]) -> IpAddr {
        IpAddr::V4(Ipv4Addr::from(octets))
    }

    /// Parses a textual address fixture, panicking if it is malformed.
    fn parsed(text: &str) -> IpAddr {
        text.parse().unwrap()
    }

    /// Private, loopback and link-local IPv4 addresses are flagged.
    #[test]
    fn private_ipv4_blocked() {
        let blocked = [
            [10, 0, 0, 1],
            [172, 16, 0, 1],
            [192, 168, 1, 1],
            [127, 0, 0, 1],
            [169, 254, 1, 1],
        ];
        for octets in blocked {
            assert!(is_private_ip(v4(octets)));
        }
    }

    /// Well-known public resolvers are not flagged.
    #[test]
    fn public_ipv4_allowed() {
        for octets in [[8, 8, 8, 8], [1, 1, 1, 1]] {
            assert!(!is_private_ip(v4(octets)));
        }
    }

    /// Both metadata literals match; an unrelated address does not.
    #[test]
    fn aws_metadata_blocked() {
        assert!(is_aws_metadata("169.254.169.254"));
        assert!(is_aws_metadata("fd00:ec2::254"));
        assert!(!is_aws_metadata("8.8.8.8"));
    }

    /// IPv4-mapped IPv6 addresses are judged by the IPv4 address they carry.
    #[test]
    fn ipv4_mapped_ipv6_normalized() {
        for s in ["::ffff:127.0.0.1", "::ffff:169.254.169.254", "::ffff:10.0.0.1"] {
            assert!(is_private_ip(parsed(s)), "{s} should be flagged private");
        }
        assert!(!is_private_ip(parsed("::ffff:8.8.8.8")));
    }

    /// IPv4-compatible IPv6 addresses are judged by the IPv4 address they carry.
    #[test]
    fn ipv4_compatible_ipv6_normalized() {
        for s in ["::127.0.0.1", "::169.254.169.254", "::10.0.0.1"] {
            assert!(is_private_ip(parsed(s)), "{s} should be flagged private");
        }
        assert!(!is_private_ip(parsed("::8.8.8.8")));
    }
}