use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Lease metadata kept for a single allocated port.
#[derive(Debug, Clone)]
struct PortRecord {
    /// Moment the lease was created or last renewed.
    allocated_at: Instant,
    /// How long the lease stays valid, counted from `allocated_at`.
    ttl: Duration,
}

impl PortRecord {
    /// Returns `true` once at least `ttl` has passed since `allocated_at`.
    fn is_expired(&self) -> bool {
        let age = self.allocated_at.elapsed();
        self.ttl <= age
    }

    /// Returns the time left before the lease expires, or zero if it already has.
    fn remaining(&self) -> Duration {
        let age = self.allocated_at.elapsed();
        // `checked_sub` yields `None` when the lease is overdue; report that as zero.
        self.ttl.checked_sub(age).unwrap_or_default()
    }
}
/// Fixed-size bitmap tracking which ports of an inclusive range are in use.
///
/// Bit `i` of the bitmap corresponds to port `start + i`; a set bit means the
/// port is currently allocated.
struct PortBitmap {
    /// One bit per port, packed into 64-bit words.
    bits: Vec<u64>,
    /// First port of the managed range (inclusive).
    start: u16,
    /// Last port of the managed range (inclusive).
    end: u16,
    /// Number of ports in the range (`end - start + 1`).
    count: u16,
    /// Number of bits currently set.
    used: u16,
}

impl PortBitmap {
    /// Creates an empty bitmap covering `start..=end`.
    ///
    /// # Panics
    ///
    /// Panics if `start > end`, or if the range is the full `0..=65535`, whose
    /// size does not fit the `u16` counters. Without these checks a release
    /// build would silently wrap and produce a bitmap with a bogus size.
    fn new(start: u16, end: u16) -> Self {
        assert!(
            start <= end,
            "invalid port range: start {} is greater than end {}",
            start,
            end
        );
        let count = (end - start)
            .checked_add(1)
            .expect("port range 0..=65535 is too large for a u16 port count");
        let words = (usize::from(count) + 63) / 64;
        Self {
            bits: vec![0u64; words],
            start,
            end,
            count,
            used: 0,
        }
    }

    /// Returns `true` if `port` lies inside the managed range.
    #[inline]
    fn contains(&self, port: u16) -> bool {
        port >= self.start && port <= self.end
    }

    /// Maps `port` to its `(word, bit)` position.
    ///
    /// The caller must ensure `port` is inside the managed range; the
    /// subtraction underflows otherwise.
    #[inline]
    fn bit_index(&self, port: u16) -> (usize, u8) {
        let idx = usize::from(port - self.start);
        (idx / 64, (idx % 64) as u8)
    }

    /// Returns `true` if `port` is marked as allocated.
    ///
    /// Ports outside the managed range are never allocated, so they report `false`.
    #[inline]
    fn is_set(&self, port: u16) -> bool {
        if !self.contains(port) {
            return false;
        }
        let (word, bit) = self.bit_index(port);
        (self.bits[word] >> bit) & 1 == 1
    }

    /// Marks `port` as allocated.
    ///
    /// Out-of-range ports and ports that are already set are ignored, so `used`
    /// always equals the number of set bits.
    #[inline]
    fn set(&mut self, port: u16) {
        if !self.contains(port) {
            return;
        }
        let (word, bit) = self.bit_index(port);
        let mask = 1u64 << bit;
        if self.bits[word] & mask == 0 {
            self.bits[word] |= mask;
            self.used += 1;
        }
    }

    /// Marks `port` as free.
    ///
    /// Out-of-range ports and ports that are already free are ignored, so
    /// `used` cannot underflow.
    #[inline]
    fn clear(&mut self, port: u16) {
        if !self.contains(port) {
            return;
        }
        let (word, bit) = self.bit_index(port);
        let mask = 1u64 << bit;
        if self.bits[word] & mask != 0 {
            self.bits[word] &= !mask;
            self.used -= 1;
        }
    }

    /// Allocates any free port, probing linearly from a pseudo-random offset.
    ///
    /// `rng` selects the starting offset (`rng % count`); the scan then visits
    /// every offset exactly once, wrapping around the end of the range.
    /// Returns `None` when every port is in use.
    fn allocate(&mut self, rng: u64) -> Option<u16> {
        if self.used >= self.count {
            return None;
        }
        // Work on offsets in u32: mixing `start` into the probe value or doing
        // the addition in u16 wraps for ranges near 65535 and skips ports.
        let count = u32::from(self.count);
        let first = (rng % u64::from(self.count)) as u32;
        for step in 0..count {
            let offset = (first + step) % count;
            // `offset < count`, so `start + offset <= end` and fits in u16.
            let port = self.start + offset as u16;
            if !self.is_set(port) {
                self.set(port);
                return Some(port);
            }
        }
        None
    }

    /// Allocates a free even-numbered port, probing from a pseudo-random even port.
    ///
    /// `rng` selects the starting even port; the scan then steps by two and
    /// wraps back to the first even port of the range. Returns `None` when the
    /// range has no even port or all of them are in use.
    fn allocate_even(&mut self, rng: u64) -> Option<u16> {
        if self.used >= self.count {
            return None;
        }
        // Computed in u32 so that a range touching 65535 cannot overflow.
        let first_even = u32::from(self.start) + u32::from(self.start % 2);
        let last_even = u32::from(self.end) - u32::from(self.end % 2);
        if first_even > last_even {
            return None;
        }
        let slots = (last_even - first_even) / 2 + 1;
        let first = (rng % u64::from(slots)) as u32;
        for step in 0..slots {
            let slot = (first + step) % slots;
            // `first_even + 2 * slot <= last_even <= 65534`, so the cast is lossless.
            let port = (first_even + 2 * slot) as u16;
            if !self.is_set(port) {
                self.set(port);
                return Some(port);
            }
        }
        None
    }
}
/// Handle to a port allocator over an inclusive port range.
///
/// The mutable state lives behind `Arc<Mutex<..>>`, so cloning this handle
/// yields another handle to the same underlying state.
#[derive(Clone)]
pub(crate) struct PortAllocator {
// Shared bitmap and per-port lease records, guarded by a mutex.
inner: Arc<Mutex<PortAllocatorInner>>,
// First port of the managed range (inclusive), as passed to the constructor.
start: u16,
// Last port of the managed range (inclusive), as passed to the constructor.
end: u16,
// TTL applied by the methods that do not take an explicit TTL.
default_ttl: Duration,
}
/// Mutable allocator state that `PortAllocator` keeps behind its mutex.
struct PortAllocatorInner {
// One bit per port in the range; a set bit marks the port as allocated.
bitmap: PortBitmap,
// Lease information (allocation time and TTL) keyed by allocated port.
records: HashMap<u16, PortRecord>,
}
/// Debug output summarising the range, usage and default TTL of the allocator.
impl std::fmt::Debug for PortAllocator {
    /// Takes the state lock for the duration of the formatting so the reported
    /// numbers come from one consistent snapshot.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = self.inner.lock();
        let range = (self.start, self.end);
        let allocated = state.records.len();

        let mut out = f.debug_struct("PortAllocator");
        out.field("range", &range);
        out.field("allocated", &allocated);
        out.field("capacity", &state.bitmap.count);
        out.field("default_ttl", &self.default_ttl);
        out.finish()
    }
}
impl PortAllocator {
pub fn new(start: u16, end: u16) -> Self {
Self::with_ttl(start, end, Duration::from_secs(300))
}
pub fn with_ttl(start: u16, end: u16, ttl: Duration) -> Self {
Self {
inner: Arc::new(Mutex::new(PortAllocatorInner {
bitmap: PortBitmap::new(start, end),
records: HashMap::new(),
})),
start,
end,
default_ttl: ttl,
}
}
pub fn allocate(&self) -> Option<u16> {
self.allocate_with_ttl(self.default_ttl)
}
pub fn allocate_with_ttl(&self, ttl: Duration) -> Option<u16> {
let rng = crate::transports::ice::stun::random_u64();
let mut inner = self.inner.lock();
let port = inner.bitmap.allocate(rng)?;
inner.records.insert(port, PortRecord {
allocated_at: Instant::now(),
ttl,
});
Some(port)
}
pub fn allocate_even(&self) -> Option<u16> {
self.allocate_even_with_ttl(self.default_ttl)
}
pub fn allocate_even_with_ttl(&self, ttl: Duration) -> Option<u16> {
let rng = crate::transports::ice::stun::random_u64();
let mut inner = self.inner.lock();
let port = inner.bitmap.allocate_even(rng)?;
inner.records.insert(port, PortRecord {
allocated_at: Instant::now(),
ttl,
});
Some(port)
}
pub fn release(&self, port: u16) {
let mut inner = self.inner.lock();
if inner.bitmap.is_set(port) {
inner.bitmap.clear(port);
inner.records.remove(&port);
}
}
pub fn renew(&self, port: u16) {
self.renew_with_ttl(port, self.default_ttl);
}
pub fn renew_with_ttl(&self, port: u16, ttl: Duration) {
let mut inner = self.inner.lock();
if let Some(record) = inner.records.get_mut(&port) {
record.allocated_at = Instant::now();
record.ttl = ttl;
}
}
pub fn cleanup_expired(&self) -> usize {
let mut inner = self.inner.lock();
let expired: Vec<u16> = inner.records.iter()
.filter(|(_, record)| record.is_expired())
.map(|(&port, _)| port)
.collect();
let count = expired.len();
for port in expired {
inner.bitmap.clear(port);
inner.records.remove(&port);
}
count
}
pub fn allocated_count(&self) -> u16 {
let inner = self.inner.lock();
inner.records.len() as u16
}
pub fn expired_count(&self) -> usize {
let inner = self.inner.lock();
inner.records.values().filter(|r| r.is_expired()).count()
}
pub fn capacity(&self) -> u16 {
self.end - self.start + 1
}
pub fn port_range(&self) -> (u16, u16) {
(self.start, self.end)
}
pub fn remaining_ttl(&self, port: u16) -> Option<Duration> {
let inner = self.inner.lock();
inner.records.get(&port).map(|r| r.remaining())
}
pub fn start_cleanup_task(self, interval: Duration) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut timer = tokio::time::interval(interval);
loop {
timer.tick().await;
let released = self.cleanup_expired();
if released > 0 {
tracing::debug!("Port allocator: released {} expired ports", released);
}
}
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A single allocate/release round trip updates the allocation count.
    #[test]
    fn test_port_allocator_basic() {
        let allocator = PortAllocator::new(1000, 1009);
        assert_eq!(allocator.capacity(), 10);
        let port = allocator.allocate().unwrap();
        assert!(port >= 1000 && port <= 1009);
        assert_eq!(allocator.allocated_count(), 1);
        allocator.release(port);
        assert_eq!(allocator.allocated_count(), 0);
    }

    /// A lease is reported as expired after its TTL and is freed by cleanup.
    #[test]
    fn test_port_allocator_ttl_expiry() {
        let allocator = PortAllocator::with_ttl(1000, 1004, Duration::from_millis(100));
        let _port = allocator.allocate().unwrap();
        assert_eq!(allocator.allocated_count(), 1);
        assert_eq!(allocator.expired_count(), 0);
        thread::sleep(Duration::from_millis(150));
        assert_eq!(allocator.expired_count(), 1);
        let released = allocator.cleanup_expired();
        assert_eq!(released, 1);
        assert_eq!(allocator.allocated_count(), 0);
    }

    /// Renewing a lease restarts its TTL from the moment of renewal.
    #[test]
    fn test_port_allocator_renew() {
        let allocator = PortAllocator::with_ttl(1000, 1004, Duration::from_millis(100));
        let port = allocator.allocate().unwrap();
        thread::sleep(Duration::from_millis(80));
        allocator.renew(port);
        thread::sleep(Duration::from_millis(80));
        assert_eq!(allocator.expired_count(), 0, "Port should not be expired after renewal");
        thread::sleep(Duration::from_millis(50));
        assert_eq!(allocator.expired_count(), 1, "Port should be expired after renewal TTL");
    }

    /// Concurrent allocations from many threads never hand out the same port twice.
    #[test]
    fn test_port_allocator_thread_safety() {
        let allocator = Arc::new(PortAllocator::new(10000, 19999));
        let mut handles = vec![];
        for _ in 0..50 {
            let alloc = allocator.clone();
            handles.push(thread::spawn(move || {
                let mut ports = vec![];
                for _ in 0..20 {
                    if let Some(port) = alloc.allocate_even() {
                        ports.push(port);
                    }
                }
                ports
            }));
        }
        let mut all_ports = vec![];
        for h in handles {
            all_ports.extend(h.join().unwrap());
        }
        let unique_count = {
            all_ports.sort();
            all_ports.dedup();
            all_ports.len()
        };
        assert_eq!(unique_count, 1000);
    }

    /// An exhausted pool yields `None` until expired leases are cleaned up.
    #[test]
    fn test_port_allocator_exhaustion_and_recycling() {
        let allocator = PortAllocator::with_ttl(30000, 30001, Duration::from_millis(50));
        let p1 = allocator.allocate_even().unwrap();
        assert_eq!(p1, 30000);
        assert!(allocator.allocate_even().is_none());
        thread::sleep(Duration::from_millis(100));
        allocator.cleanup_expired();
        let p2 = allocator.allocate_even().unwrap();
        assert_eq!(p2, 30000);
    }

    /// A manually released port can be handed out again before its TTL runs out.
    #[test]
    fn test_port_allocator_manual_release_before_ttl() {
        // A single-port pool makes the outcome deterministic: allocation starts
        // at a random offset, so with a larger pool the second allocation
        // would usually return a different port.
        let allocator = PortAllocator::with_ttl(40000, 40000, Duration::from_secs(60));
        let port = allocator.allocate().unwrap();
        assert_eq!(port, 40000);
        assert_eq!(allocator.allocated_count(), 1);
        assert!(allocator.allocate().is_none());
        allocator.release(port);
        assert_eq!(allocator.allocated_count(), 0);
        let port2 = allocator.allocate().unwrap();
        assert_eq!(port, port2);
    }

    /// The background task frees expired leases without manual cleanup calls.
    #[tokio::test]
    async fn test_port_allocator_cleanup_task() {
        let allocator = PortAllocator::with_ttl(50000, 50004, Duration::from_millis(100));
        let _port = allocator.allocate().unwrap();
        assert_eq!(allocator.allocated_count(), 1);
        let handle = allocator.clone().start_cleanup_task(Duration::from_millis(50));
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(allocator.allocated_count(), 0);
        handle.abort();
    }

    /// Bulk allocation stays unique and finishes within the time budget.
    ///
    /// NOTE(review): the 50 ms wall-clock bound is sensitive to machine load
    /// and build profile.
    #[test]
    fn test_port_allocator_high_concurrency_performance() {
        let allocator = PortAllocator::new(10000, 59999);
        let start = Instant::now();
        let mut ports = Vec::with_capacity(10000);
        for _ in 0..10000 {
            if let Some(port) = allocator.allocate_even() {
                ports.push(port);
            }
        }
        let elapsed = start.elapsed();
        assert_eq!(ports.len(), 10000);
        let mut sorted = ports.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(sorted.len(), 10000);
        assert!(
            elapsed.as_millis() < 50,
            "10000 allocations took {:?}, expected < 50ms",
            elapsed
        );
        for port in &ports {
            allocator.release(*port);
        }
        assert_eq!(allocator.allocated_count(), 0);
    }
}