use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use tokio::sync::Semaphore;
use tracing::{debug, info, trace, warn};
use crate::config::ServerConfig;
use crate::connection::{ConnectionState, NntpConnection};
use crate::error::{NntpError, NntpResult};
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(30);
pub struct PooledConnection {
pub conn: NntpConnection,
pub last_used: Instant,
semaphore: Option<Arc<Semaphore>>,
}
impl PooledConnection {
pub fn unmanaged(conn: NntpConnection) -> Self {
Self {
conn,
last_used: Instant::now(),
semaphore: None,
}
}
}
impl Drop for PooledConnection {
fn drop(&mut self) {
if let Some(sem) = self.semaphore.take() {
warn!(
conn_id = %self.conn.server_id,
"PooledConnection dropped without release/discard — freeing permit"
);
sem.add_permits(1);
}
}
}
pub struct ConnectionPool {
config: Arc<ServerConfig>,
idle: Mutex<Vec<PooledConnection>>,
semaphore: Arc<Semaphore>,
created_count: Mutex<u32>,
last_connect: tokio::sync::Mutex<Instant>,
ramp_up_delay: Duration,
}
impl ConnectionPool {
pub fn new(config: Arc<ServerConfig>) -> Self {
let max_conns = config.connections as usize;
let ramp_up_delay = Duration::from_millis(config.ramp_up_delay_ms as u64);
Self {
idle: Mutex::new(Vec::with_capacity(max_conns)),
semaphore: Arc::new(Semaphore::new(max_conns)),
created_count: Mutex::new(0),
last_connect: tokio::sync::Mutex::new(Instant::now() - Duration::from_secs(60)),
ramp_up_delay,
config,
}
}
pub async fn acquire(&self) -> NntpResult<PooledConnection> {
let available_permits = self.semaphore.available_permits();
let idle_count = self.idle.lock().len();
debug!(
server = %self.config.name,
available_permits,
idle_count,
max_conns = self.config.connections,
"Pool acquire: waiting for slot"
);
let permit = tokio::time::timeout(ACQUIRE_TIMEOUT, self.semaphore.clone().acquire_owned())
.await
.map_err(|_| {
warn!(
server = %self.config.name,
available_permits,
idle_count,
max_conns = self.config.connections,
"Pool acquire TIMED OUT after {}s — all {} slots busy",
ACQUIRE_TIMEOUT.as_secs(),
self.config.connections
);
NntpError::Timeout(format!(
"Timed out waiting for connection to {}",
self.config.name
))
})?
.map_err(|_| {
NntpError::Connection(format!(
"Connection pool for {} is closed",
self.config.name
))
})?;
let maybe_idle = { self.idle.lock().pop() };
if let Some(mut pooled) = maybe_idle {
pooled.semaphore = Some(Arc::clone(&self.semaphore));
if pooled.conn.state == ConnectionState::Ready && pooled.conn.is_connected() {
if pooled.last_used.elapsed() > IDLE_TIMEOUT {
info!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
idle_secs = pooled.last_used.elapsed().as_secs(),
"Pool: idle connection stale — health checking"
);
match pooled.conn.stat_article("<health-check@pool>").await {
Ok(_) | Err(NntpError::ArticleNotFound(_)) => {
debug!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
"Pool: health check passed, reusing"
);
pooled.last_used = Instant::now();
permit.forget(); return Ok(pooled);
}
Err(e) => {
warn!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
error = %e,
"Pool: idle connection FAILED health check — creating new"
);
}
}
} else {
debug!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
idle_secs = pooled.last_used.elapsed().as_secs(),
"Pool: reusing idle connection"
);
permit.forget();
return Ok(pooled);
}
} else {
warn!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
state = ?pooled.conn.state,
connected = pooled.conn.is_connected(),
"Pool: idle connection in bad state — creating new"
);
}
}
info!(
server = %self.config.name,
"Pool: no reusable connection, creating new"
);
let conn = self.create_connection().await?;
permit.forget();
Ok(PooledConnection {
conn,
last_used: Instant::now(),
semaphore: Some(Arc::clone(&self.semaphore)),
})
}
pub fn release(&self, mut pooled: PooledConnection) {
pooled.semaphore = None;
if pooled.conn.state == ConnectionState::Ready && pooled.conn.is_connected() {
pooled.last_used = Instant::now();
let idle_after = {
let mut idle = self.idle.lock();
idle.push(pooled);
idle.len()
};
self.semaphore.add_permits(1);
debug!(
server = %self.config.name,
idle_count = idle_after,
"Pool: connection released back to idle"
);
} else {
warn!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
state = ?pooled.conn.state,
connected = pooled.conn.is_connected(),
"Pool: discarding unhealthy connection on release"
);
drop(pooled);
self.semaphore.add_permits(1);
}
}
pub fn discard(&self, mut pooled: PooledConnection) {
pooled.semaphore = None;
info!(
server = %self.config.name,
conn_id = %pooled.conn.server_id,
state = ?pooled.conn.state,
"Pool: discarding connection (fatal error)"
);
drop(pooled);
self.semaphore.add_permits(1);
}
pub fn idle_count(&self) -> usize {
self.idle.lock().len()
}
pub async fn close_idle(&self) {
let conns: Vec<PooledConnection> = {
let mut idle = self.idle.lock();
idle.drain(..).collect()
};
let count = conns.len();
for mut c in conns {
let _ = c.conn.quit().await;
self.semaphore.add_permits(1);
}
if count > 0 {
debug!(server = %self.config.name, count, "Closed idle connections");
}
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
#[cfg(test)]
pub(crate) fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}
pub async fn wait_for_ramp_up(&self) {
if self.ramp_up_delay.is_zero() {
return;
}
let mut last = self.last_connect.lock().await;
let elapsed = last.elapsed();
if elapsed < self.ramp_up_delay {
let wait = self.ramp_up_delay - elapsed;
trace!(
server = %self.config.name,
wait_ms = wait.as_millis(),
"Ramp-up: waiting before new connection"
);
tokio::time::sleep(wait).await;
}
*last = Instant::now();
}
pub fn ramp_up_delay(&self) -> Duration {
self.ramp_up_delay
}
async fn create_connection(&self) -> NntpResult<NntpConnection> {
self.wait_for_ramp_up().await;
let idx = {
let mut count = self.created_count.lock();
*count += 1;
*count
};
let conn_id = format!("{}#{}", self.config.id, idx);
info!(
server = %self.config.name,
conn_id = %conn_id,
host = %self.config.host,
port = self.config.port,
total_created = idx,
"Pool: creating new NNTP connection"
);
let mut conn = NntpConnection::new(conn_id.clone());
conn.connect(&self.config).await.inspect_err(|e| {
warn!(
server = %self.config.name,
conn_id = %conn_id,
error = %e,
"Pool: new connection FAILED"
);
self.semaphore.add_permits(1);
})?;
info!(
server = %self.config.name,
conn_id = %conn_id,
"Pool: new connection ready"
);
Ok(conn)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testutil::{MockConfig, MockNntpServer, test_config};
#[tokio::test]
async fn test_pool_new() {
let config = Arc::new(test_config(12345));
let pool = ConnectionPool::new(config.clone());
assert_eq!(pool.idle_count(), 0);
assert_eq!(pool.config().id, "test-server");
assert_eq!(pool.available_permits(), 4); }
#[tokio::test]
async fn test_pool_acquire_creates_connection() {
let server = MockNntpServer::start(MockConfig::default()).await;
let config = Arc::new(test_config(server.port()));
let pool = ConnectionPool::new(config);
let pooled = pool.acquire().await.unwrap();
assert_eq!(pooled.conn.state, ConnectionState::Ready);
assert!(pooled.conn.is_connected());
assert_eq!(pool.idle_count(), 0);
pool.release(pooled);
assert_eq!(pool.idle_count(), 1);
}
#[tokio::test]
async fn test_pool_release_and_reuse() {
let server = MockNntpServer::start(MockConfig::default()).await;
let config = Arc::new(test_config(server.port()));
let pool = ConnectionPool::new(config);
let pooled = pool.acquire().await.unwrap();
let first_id = pooled.conn.server_id.clone();
pool.release(pooled);
assert_eq!(pool.idle_count(), 1);
let pooled = pool.acquire().await.unwrap();
assert_eq!(pooled.conn.server_id, first_id);
assert_eq!(pool.idle_count(), 0);
pool.release(pooled);
}
#[tokio::test]
async fn test_pool_discard_frees_slot() {
let server = MockNntpServer::start(MockConfig::default()).await;
let mut cfg = test_config(server.port());
cfg.connections = 2;
let pool = ConnectionPool::new(Arc::new(cfg));
let c1 = pool.acquire().await.unwrap();
let c2 = pool.acquire().await.unwrap();
assert_eq!(pool.available_permits(), 0);
pool.discard(c1);
assert_eq!(pool.available_permits(), 1);
assert_eq!(pool.idle_count(), 0);
pool.release(c2);
}
#[tokio::test]
async fn test_pool_close_idle() {
let server = MockNntpServer::start(MockConfig::default()).await;
let config = Arc::new(test_config(server.port()));
let pool = ConnectionPool::new(config);
let c1 = pool.acquire().await.unwrap();
let c2 = pool.acquire().await.unwrap();
pool.release(c1);
pool.release(c2);
assert_eq!(pool.idle_count(), 2);
pool.close_idle().await;
assert_eq!(pool.idle_count(), 0);
}
#[tokio::test]
async fn test_pool_idle_count_tracking() {
let server = MockNntpServer::start(MockConfig::default()).await;
let config = Arc::new(test_config(server.port()));
let pool = ConnectionPool::new(config);
assert_eq!(pool.idle_count(), 0);
let c1 = pool.acquire().await.unwrap();
assert_eq!(pool.idle_count(), 0);
let c2 = pool.acquire().await.unwrap();
assert_eq!(pool.idle_count(), 0);
pool.release(c1);
assert_eq!(pool.idle_count(), 1);
pool.release(c2);
assert_eq!(pool.idle_count(), 2);
}
}