#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
#![allow(clippy::multiple_crate_versions)]
#![forbid(unsafe_code)]
use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, ToSocketAddrs, UdpSocket},
ops::{Range, RangeInclusive},
};
#[cfg(feature = "reservation")]
mod reservation;
/// A network port number, as used for both TCP and UDP in this crate.
pub type Port = u16;
/// Alias for `reservation::PortReservation` parameterised over a half-open
/// `Range<Port>`.
///
/// Only compiled when the `reservation` feature is enabled.
#[cfg(feature = "reservation")]
pub type PortReservation = reservation::PortReservation<Range<Port>>;
/// Probes `addr` by binding a UDP socket to it and reports the port that was bound.
///
/// Returns `None` if the address cannot be resolved or bound, or if the local
/// address of the new socket cannot be read. The probe socket is closed again
/// before this function returns, so the result only reflects the moment of the
/// probe.
fn test_bind_udp<A: ToSocketAddrs>(addr: A) -> Option<Port> {
    UdpSocket::bind(addr)
        .and_then(|socket| socket.local_addr())
        .map(|local| local.port())
        .ok()
}
/// Probes `addr` by binding a TCP listener to it and reports the port that was bound.
///
/// Returns `None` if the address cannot be resolved or bound, or if the local
/// address of the new listener cannot be read. The listener is dropped when this
/// function returns, so the result only reflects the moment of the probe.
fn test_bind_tcp<A: ToSocketAddrs>(addr: A) -> Option<Port> {
    let listener = TcpListener::bind(addr).ok()?;
    let bound = listener.local_addr().ok()?;
    Some(bound.port())
}
/// Reports whether `port` can currently be bound for UDP.
///
/// The port is probed on the IPv6 unspecified address first and, only if that
/// succeeds, on the IPv4 unspecified address; both binds must succeed for the
/// port to count as free. The probe sockets are released again immediately, so
/// another process may still take the port afterwards.
#[must_use]
pub fn is_free_udp(port: Port) -> bool {
    let v6_any = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0);
    if test_bind_udp(v6_any).is_none() {
        return false;
    }
    let v4_any = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
    test_bind_udp(v4_any).is_some()
}
/// Reports whether `port` can currently be bound for TCP.
///
/// The port is probed on the IPv6 unspecified address first and, only if that
/// succeeds, on the IPv4 unspecified address; both binds must succeed for the
/// port to count as free. The probe listeners are released again immediately, so
/// another process may still take the port afterwards.
#[must_use]
pub fn is_free_tcp(port: Port) -> bool {
    let v6_any = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0);
    let v4_any = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
    test_bind_tcp(v6_any)
        .and_then(|_| test_bind_tcp(v4_any))
        .is_some()
}
/// Reports whether `port` is currently free for both TCP and UDP.
///
/// TCP is checked first; UDP is only probed when the TCP check succeeded.
#[must_use]
pub fn is_free(port: Port) -> bool {
    if !is_free_tcp(port) {
        return false;
    }
    is_free_udp(port)
}
/// Asks the operating system for a free TCP port by binding to port `0`.
///
/// The IPv6 unspecified address is tried first; the IPv4 unspecified address is
/// only used as a fallback when the IPv6 bind fails. Returns the port the OS
/// assigned, or `None` if neither bind succeeded.
#[cfg(feature = "rand")]
fn ask_free_tcp_port() -> Option<Port> {
    let via_v6 = test_bind_tcp(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0));
    if via_v6.is_some() {
        return via_v6;
    }
    test_bind_tcp(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
}
/// Picks a port that is currently free for both TCP and UDP.
///
/// Two strategies are tried in order:
///
/// 1. Up to ten random ports drawn from `15000..25000` are checked with
///    [`is_free`]; the first free one is returned.
/// 2. Otherwise, up to ten times, the operating system is asked for a free TCP
///    port, which is returned if it is also free for UDP.
///
/// Returns `None` when neither strategy finds a port.
#[cfg(feature = "rand")]
#[allow(clippy::must_use_candidate)]
pub fn pick_random_unused_port() -> Option<Port> {
    // Both chains are lazy: a candidate is only produced right before it is
    // checked, and the search stops at the first hit.
    let random_hit = (0..10)
        .map(|_| -> Port { switchy_random::rng().gen_range(15000..25000) })
        .find(|&candidate| is_free(candidate));
    if random_hit.is_some() {
        return random_hit;
    }

    (0..10)
        .filter_map(|_| ask_free_tcp_port())
        .find(|&candidate| is_free_udp(candidate))
}
/// A collection of candidate ports that can be walked in order.
///
/// Implemented in this file for `Range<u16>` and `RangeInclusive<u16>`.
pub trait PortRange {
    /// Consumes the range and yields each port it contains.
    fn into_iter(self) -> impl Iterator<Item = u16>;
    /// Yields each port of the range without consuming it.
    fn iter(&self) -> impl Iterator<Item = u16>;
}
/// Half-open port ranges (`start..end`) yield every port from `start` up to, but
/// not including, `end`.
impl PortRange for Range<u16> {
    #[inline]
    fn into_iter(self) -> impl Iterator<Item = u16> {
        // A `Range<u16>` is already its own iterator.
        self
    }

    #[inline]
    fn iter(&self) -> impl Iterator<Item = u16> {
        // Rebuild an owned copy so the borrowed range is left untouched.
        self.start..self.end
    }
}
/// Closed port ranges (`start..=end`) yield every port from `start` through `end`.
impl PortRange for RangeInclusive<u16> {
    #[inline]
    fn into_iter(self) -> impl Iterator<Item = u16> {
        // A `RangeInclusive<u16>` is already its own iterator.
        self
    }

    #[inline]
    fn iter(&self) -> impl Iterator<Item = u16> {
        // Iterate over a copy so the borrowed range is left untouched.
        Clone::clone(self)
    }
}
pub fn pick_unused_port(range: impl PortRange) -> Option<Port> {
range.into_iter().find(|x| is_free(*x))
}
/// Helpers that hand out non-overlapping port ranges to tests, so tests running
/// in parallel do not compete for the same ports.
#[cfg(test)]
pub(crate) mod test_utils {
    use std::sync::atomic::{AtomicU16, Ordering};

    /// First port of the next range to hand out.
    static NEXT_RANGE_START: AtomicU16 = AtomicU16::new(40000);

    /// Atomically claims `size` consecutive ports and returns the first one.
    ///
    /// The counter only advances when the whole range fits below `u16::MAX`, so
    /// an oversized request neither wraps around to low port numbers nor
    /// disturbs later allocations.
    fn claim(size: u16) -> u16 {
        NEXT_RANGE_START
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |start| {
                start.checked_add(size)
            })
            .expect("test port space exhausted: cannot allocate another port range")
    }

    /// Returns a fresh half-open range of `size` ports.
    ///
    /// # Panics
    ///
    /// Panics if the remaining port space is too small for `size` ports.
    pub fn next_port_range(size: u16) -> std::ops::Range<u16> {
        let start = claim(size);
        // Cannot overflow: `claim` verified that `start + size` fits in a `u16`.
        start..start + size
    }

    /// Returns a fresh inclusive range of `size` ports.
    ///
    /// # Panics
    ///
    /// Panics if the remaining port space is too small for `size` ports.
    pub fn next_port_range_inclusive(size: u16) -> std::ops::RangeInclusive<u16> {
        let start = claim(size);
        // Cannot overflow: `claim` verified that `start + size` fits in a `u16`.
        start..=start + size - 1
    }
}
#[cfg(test)]
mod tests {
use super::{PortRange, is_free, is_free_tcp, is_free_udp, pick_unused_port, test_utils};
use std::net::{TcpListener, UdpSocket};
#[cfg(feature = "rand")]
use super::pick_random_unused_port;
/// Smoke test: the random picker is expected to come back with some port.
#[cfg(feature = "rand")]
#[test_log::test]
fn it_works() {
    let picked = pick_random_unused_port();
    assert!(picked.is_some());
}
/// Any port picked from a half-open range must lie inside that range.
///
/// Two fresh ranges are checked one after the other; a range in which no port
/// could be picked is skipped without failing.
#[test_log::test]
fn port_range_test() {
    for _ in 0..2 {
        let range = test_utils::next_port_range(1000);
        if let Some(picked) = pick_unused_port(range.clone()) {
            assert!(range.contains(&picked));
        }
    }
}
/// Any port picked from an inclusive range must lie inside that range.
///
/// Two fresh ranges are checked one after the other; a range in which no port
/// could be picked is skipped without failing.
#[test_log::test]
fn port_range_inclusize_test() {
    let check_fresh_range = || {
        let range = test_utils::next_port_range_inclusive(1001);
        if let Some(picked) = pick_unused_port(range.clone()) {
            assert!(range.contains(&picked));
        }
    };
    check_fresh_range();
    check_fresh_range();
}
/// A port with a live TCP listener on `0.0.0.0` must not be reported as free for TCP.
///
/// Up to ten attempts are made to find and bind a port; the test fails if none
/// of them succeeds.
#[test_log::test]
fn test_is_free_tcp() {
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free_tcp(port) {
            continue;
        }
        // The listener must stay alive until the assertion has run.
        let Ok(_listener) = TcpListener::bind(("0.0.0.0", port)) else {
            continue;
        };
        assert!(!is_free_tcp(port));
        return;
    }
    panic!("Could not find a port to test with after 10 attempts");
}
#[test_log::test]
fn test_is_free_udp() {
let range = test_utils::next_port_range(1000);
for _ in 0..10 {
if let Some(port) = pick_unused_port(range.clone()) {
if is_free_udp(port) {
if let Ok(_socket) = UdpSocket::bind(("0.0.0.0", port)) {
assert!(!is_free_udp(port));
return; }
}
}
}
panic!("Could not find a port to test with after 10 attempts");
}
/// A port with a live TCP listener must not be reported as free by `is_free`.
///
/// After binding, the check is polled up to ten times, 10 ms apart, until the
/// port stops looking free; only then is the final assertion made.
#[test_log::test]
fn test_is_free() {
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(port) {
            continue;
        }
        // The listener must stay alive until the assertion has run.
        let Ok(_listener) = TcpListener::bind(("0.0.0.0", port)) else {
            continue;
        };

        let mut polls = 0;
        while polls < 10 && is_free(port) {
            std::thread::sleep(std::time::Duration::from_millis(10));
            polls += 1;
        }

        assert!(!is_free(port));
        return;
    }
    panic!("Could not find a port to test with after 10 attempts");
}
/// A port with a live UDP socket on `0.0.0.0` must not be reported as free by
/// the combined `is_free` check.
#[test_log::test]
fn test_is_free_udp_binding() {
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(port) {
            continue;
        }
        // The socket must stay alive until the assertion has run.
        let Ok(_socket) = UdpSocket::bind(("0.0.0.0", port)) else {
            continue;
        };
        assert!(!is_free(port));
        return;
    }
    panic!("Could not find a port to test with after 10 attempts");
}
/// Both `PortRange` methods on a half-open range yield `start` through `end - 1`.
#[test_log::test]
fn test_port_range_trait_exclusive() {
    let expect_ten_ports = |ports: Vec<u16>| {
        assert_eq!(ports.len(), 10);
        assert_eq!(ports.first(), Some(&15000));
        assert_eq!(ports.last(), Some(&15009));
    };

    // Borrowing iteration.
    let borrowed = 15000..15010;
    expect_ten_ports(borrowed.iter().collect());

    // Consuming iteration through the trait method.
    let owned = 15000..15010;
    expect_ten_ports(PortRange::into_iter(owned).collect());
}
/// Both `PortRange` methods on an inclusive range yield `start` through `end`.
#[test_log::test]
fn test_port_range_trait_inclusive() {
    // Borrowing iteration.
    let borrowed = 15000..=15010;
    let seen: Vec<u16> = borrowed.iter().collect();
    assert_eq!(seen.len(), 11);
    assert_eq!(seen.first(), Some(&15000));
    assert_eq!(seen.last(), Some(&15010));

    // Consuming iteration through the trait method.
    let owned = 15000..=15010;
    let seen_owned: Vec<u16> = PortRange::into_iter(owned).collect();
    assert_eq!(seen_owned.len(), 11);
    assert_eq!(seen_owned.first(), Some(&15000));
    assert_eq!(seen_owned.last(), Some(&15010));
}
/// An empty range yields no ports, so nothing can be picked from it.
#[test_log::test]
fn test_port_range_empty() {
    let empty = 15000..15000;
    let yielded = empty.iter().count();
    assert_eq!(yielded, 0);

    assert!(pick_unused_port(15000..15000).is_none());
}
/// An inclusive range whose bounds coincide yields exactly that one port.
#[test_log::test]
fn test_port_range_single_port() {
    let single = 15000..=15000;
    let seen: Vec<u16> = single.iter().collect();
    assert_eq!(seen.len(), 1);
    assert_eq!(seen.first(), Some(&15000));
}
/// A TCP listener bound to the IPv6 unspecified address makes the port non-free for TCP.
///
/// If no port can be found and bound within ten attempts, the test ends without
/// failing.
#[test_log::test]
fn test_ipv6_tcp_binding_affects_is_free() {
    use std::net::{Ipv6Addr, SocketAddrV6};
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free_tcp(port) {
            continue;
        }
        let v6_any = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0);
        // The listener must stay alive until the assertion has run.
        let Ok(_listener) = TcpListener::bind(v6_any) else {
            continue;
        };
        assert!(!is_free_tcp(port));
        return;
    }
}
#[test_log::test]
fn test_ipv6_udp_binding_affects_is_free() {
use std::net::{Ipv6Addr, SocketAddrV6};
let range = test_utils::next_port_range(1000);
for _ in 0..10 {
if let Some(port) = pick_unused_port(range.clone()) {
if is_free_udp(port) {
let addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0);
if let Ok(_socket) = UdpSocket::bind(addr) {
assert!(!is_free_udp(port));
return; }
}
}
}
}
/// After the first picked port is occupied, a pick from the remainder of the
/// range must return a different, higher port.
///
/// The test passes as soon as a first port could be bound, even when the
/// remainder of the range yields no further port.
#[test_log::test]
fn test_pick_unused_port_finds_first_available() {
    let range = test_utils::next_port_range(10);
    for _ in 0..10 {
        let Some(first_port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(first_port) {
            continue;
        }
        // The listener must stay alive while the second pick is made.
        let Ok(_listener) = TcpListener::bind(("0.0.0.0", first_port)) else {
            continue;
        };

        let remainder = (first_port + 1)..range.end;
        if let Some(next_port) = pick_unused_port(remainder) {
            assert_ne!(next_port, first_port);
            assert!(next_port > first_port);
        }
        return;
    }
    panic!("Could not find ports to test with after 10 attempts");
}
/// Occupying a port with TCP, then (after releasing it) a freshly picked port
/// with UDP, must each make the respective checks report "not free".
///
/// An attempt in which any step fails is abandoned and retried, up to ten times.
#[test_log::test]
fn test_concurrent_tcp_and_udp_binding() {
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        // Step 1: hold a TCP listener and verify the port is seen as taken.
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(port) {
            continue;
        }
        let Ok(tcp_listener) = TcpListener::bind(("0.0.0.0", port)) else {
            continue;
        };
        assert!(!is_free(port));
        assert!(!is_free_tcp(port));
        drop(tcp_listener);

        // Step 2: pick again and repeat the check with a UDP socket.
        let Some(port2) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(port2) {
            continue;
        }
        let Ok(_udp_socket) = UdpSocket::bind(("0.0.0.0", port2)) else {
            continue;
        };
        assert!(!is_free(port2));
        assert!(!is_free_udp(port2));
        return;
    }
    panic!("Could not find ports to test with after 10 attempts");
}
#[test_log::test]
fn test_ipv4_tcp_binding_affects_is_free() {
use std::net::{Ipv4Addr, SocketAddrV4};
let range = test_utils::next_port_range(1000);
for _ in 0..10 {
if let Some(port) = pick_unused_port(range.clone()) {
if is_free_tcp(port) {
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
if let Ok(_listener) = TcpListener::bind(addr) {
assert!(!is_free_tcp(port));
return; }
}
}
}
panic!("Could not find a port to test with after 10 attempts");
}
/// A UDP socket bound to the IPv4 unspecified address makes the port non-free for UDP.
///
/// The test fails if no port can be found and bound within ten attempts.
#[test_log::test]
fn test_ipv4_udp_binding_affects_is_free() {
    use std::net::{Ipv4Addr, SocketAddrV4};
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free_udp(port) {
            continue;
        }
        let v4_any = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
        // The socket must stay alive until the assertion has run.
        let Ok(_socket) = UdpSocket::bind(v4_any) else {
            continue;
        };
        assert!(!is_free_udp(port));
        return;
    }
    panic!("Could not find a port to test with after 10 attempts");
}
/// With only a UDP socket held on a port, the combined and UDP checks must
/// report "not free" while the TCP check still reports "free".
#[test_log::test]
fn test_is_free_when_only_udp_bound() {
    let range = test_utils::next_port_range(1000);
    for _ in 0..10 {
        let Some(port) = pick_unused_port(range.clone()) else {
            continue;
        };
        if !is_free(port) {
            continue;
        }
        // The socket must stay alive until the assertions have run.
        let Ok(_udp_socket) = UdpSocket::bind(("0.0.0.0", port)) else {
            continue;
        };
        assert!(!is_free(port));
        assert!(!is_free_udp(port));
        assert!(is_free_tcp(port));
        return;
    }
    panic!("Could not find a port to test with after 10 attempts");
}
/// When this test holds every port of a small range, `pick_unused_port` must not
/// return any of the held ports.
///
/// Each port is occupied with a TCP listener and a UDP socket on the IPv4
/// unspecified address. A port for which neither bind succeeded is not held by
/// this test, so it is the only kind of port the picker may legitimately return.
#[test_log::test]
fn test_pick_unused_port_with_all_ports_occupied() {
    use std::net::{Ipv4Addr, SocketAddrV4};
    let range = test_utils::next_port_range(3);
    let mut listeners = Vec::new();
    let mut sockets = Vec::new();
    // Ports this test could not occupy for either protocol.
    let mut unheld = Vec::new();
    for port in range.clone() {
        let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port);
        let listener = TcpListener::bind(addr).ok();
        let socket = UdpSocket::bind(addr).ok();
        if listener.is_none() && socket.is_none() {
            unheld.push(port);
        }
        listeners.extend(listener);
        sockets.extend(socket);
    }

    if let Some(port) = pick_unused_port(range) {
        assert!(
            unheld.contains(&port),
            "port {port} is held by this test but was reported as unused"
        );
    }

    // Keep the guards alive until after the pick, then release them explicitly.
    drop(listeners);
    drop(sockets);
}
}