use std::collections::HashMap;
use std::sync::LazyLock;
use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing::{debug, info};
/// Number of permits each host's semaphore starts with, i.e. how many
/// permits for a single host can be outstanding at the same time.
const MAX_CONCURRENT_CONNECTS: usize = 5;
/// Spacing that `acquire` uses when pacing connection starts to one host.
const MIN_CONNECT_INTERVAL: Duration = Duration::from_millis(100);
/// Shared gate holding the per-host state; constructed lazily on first access.
static GATE: LazyLock<ConnectGate> = LazyLock::new(ConnectGate::new);
/// Throttling state tracked for a single host.
struct HostState {
    /// Bounds how many permits for this host may be outstanding at once.
    semaphore: Semaphore,
    /// Start time recorded for the most recently admitted connection.
    last_connect: Mutex<Instant>,
}

impl HostState {
    /// Creates the state for a host that has not been connected to yet.
    ///
    /// `last_connect` is seeded one `MIN_CONNECT_INTERVAL` in the past so the
    /// very first connection is not delayed by pacing.
    fn new() -> Self {
        let now = Instant::now();
        // `Instant - Duration` panics when the result is not representable
        // (e.g. a monotonic clock that started less than one interval ago).
        // Fall back to `now` in that case: the first connection is then paced
        // by at most one interval instead of aborting the process.
        let seeded = now.checked_sub(MIN_CONNECT_INTERVAL).unwrap_or(now);
        Self {
            semaphore: Semaphore::new(MAX_CONCURRENT_CONNECTS),
            last_connect: Mutex::new(seeded),
        }
    }
}
/// Registry mapping a host name to its throttling state.
struct ConnectGate {
    /// Keyed by the exact host string passed in by the caller. Values are
    /// leaked allocations, which is what makes them `'static`.
    hosts: Mutex<HashMap<String, &'static HostState>>,
}

impl ConnectGate {
    /// Builds a gate with no hosts registered.
    fn new() -> Self {
        let hosts = Mutex::new(HashMap::new());
        Self { hosts }
    }

    /// Returns the state for `host`, creating and registering it on first use.
    ///
    /// A new state is allocated with `Box::leak` so the returned reference can
    /// outlive the map lock. Nothing in this block removes entries, so every
    /// distinct host string keeps its allocation once created.
    fn host_state(&self, host: &str) -> &'static HostState {
        let mut registry = self.hosts.lock();
        // Copy the stored reference out so the lookup borrow ends here.
        let cached: Option<&'static HostState> = registry.get(host).copied();
        match cached {
            Some(existing) => existing,
            None => {
                let fresh: &'static HostState = Box::leak(Box::new(HostState::new()));
                // The key is only allocated on a miss.
                registry.insert(host.to_owned(), fresh);
                fresh
            }
        }
    }
}
pub async fn acquire(host: &str) -> ConnectPermit {
let state = GATE.host_state(host);
let permit = state
.semaphore
.acquire()
.await
.expect("connect gate semaphore closed");
let sleep_for = {
let mut last = state.last_connect.lock();
let now = Instant::now();
let elapsed = now.duration_since(*last);
if elapsed < MIN_CONNECT_INTERVAL {
let delay = MIN_CONNECT_INTERVAL - elapsed;
*last = now + delay;
delay
} else {
*last = now;
Duration::ZERO
}
};
if !sleep_for.is_zero() {
debug!(
host = %host,
delay_ms = sleep_for.as_millis(),
"Connect gate: pacing connection start"
);
tokio::time::sleep(sleep_for).await;
}
info!(
host = %host,
available = state.semaphore.available_permits(),
max = MAX_CONCURRENT_CONNECTS,
"Connect gate: slot acquired"
);
ConnectPermit { _permit: permit }
}
/// Guard representing one occupied connection slot for a host.
///
/// The slot is handed back to the host's semaphore when this value is
/// dropped, so it must be kept alive for as long as the slot is in use.
#[derive(Debug)]
#[must_use = "dropping the permit immediately releases the connection slot"]
pub struct ConnectPermit {
    /// Held only for its `Drop` behaviour; never read.
    _permit: tokio::sync::SemaphorePermit<'static>,
}