use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore};
use crate::client::ElectrumClient;
use crate::error::{ElectrumError, Result};
use crate::types::ClientConfig;
struct PooledConnection {
client: ElectrumClient,
created_at: Instant,
last_used: Instant,
use_count: usize,
}
impl PooledConnection {
fn new(client: ElectrumClient) -> Self {
let now = Instant::now();
Self {
client,
created_at: now,
last_used: now,
use_count: 0,
}
}
fn touch(&mut self) {
self.last_used = Instant::now();
self.use_count += 1;
}
fn age(&self) -> Duration {
self.created_at.elapsed()
}
fn idle_time(&self) -> Duration {
self.last_used.elapsed()
}
}
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub min_connections: usize,
pub max_connections: usize,
pub idle_timeout: Duration,
pub max_age: Duration,
pub acquire_timeout: Duration,
pub validate_on_acquire: bool,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
min_connections: 1,
max_connections: 10,
idle_timeout: Duration::from_secs(300),
max_age: Duration::from_secs(3600),
acquire_timeout: Duration::from_secs(30),
validate_on_acquire: true,
}
}
}
impl PoolConfig {
pub fn new() -> Self {
Self::default()
}
pub fn min_connections(mut self, min: usize) -> Self {
self.min_connections = min;
self
}
pub fn max_connections(mut self, max: usize) -> Self {
self.max_connections = max;
self
}
pub fn idle_timeout(mut self, timeout: Duration) -> Self {
self.idle_timeout = timeout;
self
}
pub fn max_age(mut self, age: Duration) -> Self {
self.max_age = age;
self
}
pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
self.acquire_timeout = timeout;
self
}
pub fn validate_on_acquire(mut self, validate: bool) -> Self {
self.validate_on_acquire = validate;
self
}
}
pub struct ConnectionPool {
config: PoolConfig,
client_config: ClientConfig,
connections: Mutex<VecDeque<PooledConnection>>,
semaphore: Arc<Semaphore>,
active_count: AtomicUsize,
total_created: AtomicUsize,
}
impl ConnectionPool {
pub fn new(client_config: ClientConfig, pool_config: PoolConfig) -> Self {
let semaphore = Arc::new(Semaphore::new(pool_config.max_connections));
Self {
config: pool_config,
client_config,
connections: Mutex::new(VecDeque::new()),
semaphore,
active_count: AtomicUsize::new(0),
total_created: AtomicUsize::new(0),
}
}
pub fn with_defaults(client_config: ClientConfig) -> Self {
Self::new(client_config, PoolConfig::default())
}
pub async fn initialize(&self) -> Result<()> {
let mut conns = self.connections.lock().await;
while conns.len() < self.config.min_connections {
let client = ElectrumClient::with_config(self.client_config.clone()).await?;
conns.push_back(PooledConnection::new(client));
self.total_created.fetch_add(1, Ordering::SeqCst);
}
Ok(())
}
pub async fn acquire(&self) -> Result<PooledClient<'_>> {
let permit = tokio::time::timeout(
self.config.acquire_timeout,
self.semaphore.clone().acquire_owned(),
)
.await
.map_err(|_| ElectrumError::Timeout)?
.map_err(|_| ElectrumError::ConnectionFailed("Pool closed".into()))?;
let mut conns = self.connections.lock().await;
while let Some(mut conn) = conns.pop_front() {
if conn.age() > self.config.max_age {
continue; }
if conn.idle_time() > self.config.idle_timeout {
continue; }
if self.config.validate_on_acquire {
drop(conns);
if conn.client.ping().await.is_ok() {
conn.touch();
self.active_count.fetch_add(1, Ordering::SeqCst);
return Ok(PooledClient {
connection: Some(conn),
pool: self,
_permit: permit,
});
}
conns = self.connections.lock().await;
continue; }
conn.touch();
self.active_count.fetch_add(1, Ordering::SeqCst);
return Ok(PooledClient {
connection: Some(conn),
pool: self,
_permit: permit,
});
}
drop(conns);
let client = ElectrumClient::with_config(self.client_config.clone()).await?;
let mut conn = PooledConnection::new(client);
conn.touch();
self.total_created.fetch_add(1, Ordering::SeqCst);
self.active_count.fetch_add(1, Ordering::SeqCst);
Ok(PooledClient {
connection: Some(conn),
pool: self,
_permit: permit,
})
}
#[allow(dead_code)]
async fn release(&self, conn: PooledConnection) {
self.active_count.fetch_sub(1, Ordering::SeqCst);
if conn.age() > self.config.max_age {
return; }
let mut conns = self.connections.lock().await;
if conns.len() < self.config.max_connections {
conns.push_back(conn);
}
}
pub async fn stats(&self) -> PoolStats {
let conns = self.connections.lock().await;
PoolStats {
idle_connections: conns.len(),
active_connections: self.active_count.load(Ordering::SeqCst),
total_created: self.total_created.load(Ordering::SeqCst),
max_connections: self.config.max_connections,
}
}
pub async fn close(&self) {
let mut conns = self.connections.lock().await;
conns.clear();
}
pub async fn cleanup(&self) {
let mut conns = self.connections.lock().await;
conns.retain(|conn| {
conn.idle_time() <= self.config.idle_timeout &&
conn.age() <= self.config.max_age
});
}
}
#[derive(Debug, Clone)]
pub struct PoolStats {
pub idle_connections: usize,
pub active_connections: usize,
pub total_created: usize,
pub max_connections: usize,
}
impl PoolStats {
pub fn total_connections(&self) -> usize {
self.idle_connections + self.active_connections
}
pub fn utilization(&self) -> f64 {
if self.max_connections == 0 {
0.0
} else {
(self.active_connections as f64 / self.max_connections as f64) * 100.0
}
}
}
pub struct PooledClient<'a> {
connection: Option<PooledConnection>,
pool: &'a ConnectionPool,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl<'a> PooledClient<'a> {
pub fn client(&self) -> &ElectrumClient {
&self.connection.as_ref().unwrap().client
}
pub fn use_count(&self) -> usize {
self.connection.as_ref().unwrap().use_count
}
pub fn age(&self) -> Duration {
self.connection.as_ref().unwrap().age()
}
}
impl<'a> std::ops::Deref for PooledClient<'a> {
type Target = ElectrumClient;
fn deref(&self) -> &Self::Target {
self.client()
}
}
impl<'a> Drop for PooledClient<'a> {
fn drop(&mut self) {
self.connection.take();
self.pool.active_count.fetch_sub(1, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_config_default() {
let config = PoolConfig::default();
assert_eq!(config.min_connections, 1);
assert_eq!(config.max_connections, 10);
}
#[test]
fn test_pool_config_builder() {
let config = PoolConfig::new()
.min_connections(2)
.max_connections(20)
.idle_timeout(Duration::from_secs(60));
assert_eq!(config.min_connections, 2);
assert_eq!(config.max_connections, 20);
assert_eq!(config.idle_timeout, Duration::from_secs(60));
}
#[test]
fn test_pool_stats() {
let stats = PoolStats {
idle_connections: 5,
active_connections: 3,
total_created: 10,
max_connections: 10,
};
assert_eq!(stats.total_connections(), 8);
assert_eq!(stats.utilization(), 30.0);
}
}