#![allow(dead_code)]
use crate::{
neo_clients::{APITrait, HttpProvider, RpcClient},
neo_error::{Neo3Error, Neo3Result},
};
use std::{
collections::VecDeque,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{RwLock, Semaphore};
/// Tunable settings for a [`ConnectionPool`].
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Number of permits in the pool's semaphore, i.e. how many `execute`
    /// calls may run at the same time.
    pub max_connections: usize,
    /// Desired number of idle connections.
    /// NOTE(review): no code visible in this file reads this field.
    pub min_idle: usize,
    /// A connection whose last use is older than this is treated as expired
    /// and is not reused.
    pub max_idle_time: Duration,
    /// Time allowed for establishing a connection.
    /// NOTE(review): no code visible in this file reads this field.
    pub connection_timeout: Duration,
    /// Upper bound applied to every single attempt made by `execute`.
    pub request_timeout: Duration,
    /// How many additional attempts `execute` makes after the first one fails.
    pub max_retries: u32,
    /// Base back-off; `execute` sleeps `retry_delay * attempt_number` between
    /// attempts.
    pub retry_delay: Duration,
}

impl Default for PoolConfig {
    /// Returns the stock configuration: 10 permits, 2 idle connections,
    /// 5 minutes idle time, 30 s connection timeout, 60 s request timeout,
    /// 3 retries and a 1 s retry delay.
    fn default() -> Self {
        let max_idle_time = Duration::from_secs(5 * 60);
        let connection_timeout = Duration::from_secs(30);
        let request_timeout = Duration::from_secs(60);
        let retry_delay = Duration::from_secs(1);

        PoolConfig {
            max_connections: 10,
            min_idle: 2,
            max_idle_time,
            connection_timeout,
            request_timeout,
            max_retries: 3,
            retry_delay,
        }
    }
}
/// One RPC client held by the pool, plus the bookkeeping used to decide
/// whether it may be handed out again.
#[derive(Debug)]
struct PooledConnection {
    /// Client that requests are issued through.
    client: RpcClient<HttpProvider>,
    /// Moment this entry was constructed.
    created_at: Instant,
    /// Moment this entry was last marked as used; drives expiry.
    last_used: Instant,
    /// Result of the most recent health probe (starts out `true`).
    is_healthy: bool,
}

impl PooledConnection {
    /// Builds a client for `endpoint`.
    ///
    /// # Errors
    /// Returns `NetworkError::ConnectionFailed` (wrapped in
    /// `Neo3Error::Network`) carrying the provider's error text when the
    /// HTTP provider cannot be constructed.
    fn new(endpoint: &str) -> Neo3Result<Self> {
        let provider = HttpProvider::new(endpoint).map_err(|err| {
            let reason = err.to_string();
            Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(reason))
        })?;

        Ok(Self {
            client: RpcClient::new(provider),
            created_at: Instant::now(),
            last_used: Instant::now(),
            is_healthy: true,
        })
    }

    /// Reports whether more than `max_idle_time` has passed since the last use.
    fn is_expired(&self, max_idle_time: Duration) -> bool {
        let idle_for = self.last_used.elapsed();
        idle_for > max_idle_time
    }

    /// Records "now" as the time of last use.
    fn mark_used(&mut self) {
        self.last_used = Instant::now();
    }

    /// Probes the node with `get_version`, stores the outcome in
    /// `is_healthy` and returns it.
    async fn health_check(&mut self) -> bool {
        let reachable = self.client.get_version().await.is_ok();
        self.is_healthy = reachable;
        reachable
    }
}
/// Pool of RPC connections to a single endpoint, with bounded concurrency,
/// per-attempt timeouts, retries and usage counters.
pub struct ConnectionPool {
/// Settings supplied at construction time.
config: PoolConfig,
/// Endpoint handed to `PooledConnection::new` whenever a new connection is built.
endpoint: String,
/// Idle connections: taken from the front on checkout, pushed to the back on return.
connections: Arc<RwLock<VecDeque<PooledConnection>>>,
/// Created with `max_connections` permits; `execute` holds one permit for its whole run.
semaphore: Arc<Semaphore>,
/// Counters and gauges exposed through `get_stats`.
stats: Arc<RwLock<PoolStats>>,
}
/// Snapshot of the pool's counters and gauges.
///
/// Derives `Clone`, `PartialEq` and `Eq` so that snapshots can be copied and
/// compared directly (e.g. in tests) instead of field by field.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of connections the pool has constructed so far.
    pub total_connections_created: u64,
    /// Number of attempts started by `execute`; every retry counts again.
    pub total_requests: u64,
    /// Number of `execute` calls that returned `Ok`.
    pub successful_requests: u64,
    /// Number of `execute` calls that gave up and returned an error.
    pub failed_requests: u64,
    /// Number of attempts that failed and were followed by a retry.
    pub retried_requests: u64,
    /// Gauge adjusted as connections are handed out and given back.
    pub current_active_connections: usize,
    /// Gauge holding the length of the idle queue as last observed by the pool.
    pub current_idle_connections: usize,
}
impl ConnectionPool {
    /// Creates an empty pool for `endpoint`.
    ///
    /// No connection is opened here; connections are built lazily by
    /// `execute`. The semaphore is sized from `config.max_connections`.
    pub fn new(endpoint: String, config: PoolConfig) -> Self {
        let semaphore = Arc::new(Semaphore::new(config.max_connections));
        Self {
            config,
            endpoint,
            connections: Arc::new(RwLock::new(VecDeque::new())),
            semaphore,
            stats: Arc::new(RwLock::new(PoolStats::default())),
        }
    }

    /// Runs `operation` against a pooled client, retrying on failure.
    ///
    /// A semaphore permit is held for the whole call. Every attempt is
    /// bounded by `config.request_timeout`. After a failed attempt the
    /// connection is discarded and, while fewer than `config.max_retries`
    /// retries have been made, the call sleeps `retry_delay * retry_number`
    /// and tries again with another connection.
    ///
    /// Counters: `total_requests` is bumped once per attempt, and each attempt
    /// ends up in exactly one of `successful_requests`, `retried_requests` or
    /// `failed_requests`.
    ///
    /// # Errors
    /// * `NetworkError::ConnectionFailed` if no permit can be acquired or a
    ///   new connection cannot be built.
    /// * `NetworkError::Timeout` if the last attempt timed out.
    /// * The operation's own error if the last attempt returned `Err`.
    pub async fn execute<F, T>(&self, operation: F) -> Neo3Result<T>
    where
        F: Fn(
                &RpcClient<HttpProvider>,
            )
                -> std::pin::Pin<Box<dyn std::future::Future<Output = Neo3Result<T>> + Send + '_>>
            + Send
            + Sync,
        T: Send,
    {
        let _permit = self.semaphore.acquire().await.map_err(|_| {
            Neo3Error::Network(crate::neo_error::NetworkError::ConnectionFailed(
                "Failed to acquire connection permit".to_string(),
            ))
        })?;

        let mut retries = 0;
        loop {
            // Every stats update below takes the write guard as a temporary so
            // it is released at the end of the statement and never held across
            // the request or the back-off sleep.
            self.stats.write().await.total_requests += 1;

            let mut connection = match self.get_connection().await {
                Ok(connection) => connection,
                Err(err) => {
                    // Keep the counters balanced: this attempt was counted in
                    // `total_requests`, so it must also be counted as failed.
                    self.stats.write().await.failed_requests += 1;
                    return Err(err);
                },
            };

            // Fold "timed out" into the ordinary error path so that both kinds
            // of failure share one retry implementation.
            let outcome =
                tokio::time::timeout(self.config.request_timeout, operation(&connection.client))
                    .await
                    .unwrap_or_else(|_elapsed| {
                        Err(Neo3Error::Network(crate::neo_error::NetworkError::Timeout))
                    });

            match outcome {
                Ok(value) => {
                    connection.mark_used();
                    self.return_connection(connection).await;
                    self.stats.write().await.successful_requests += 1;
                    return Ok(value);
                },
                Err(err) => {
                    // A connection that produced an error is never reused; it
                    // still has to be removed from the active gauge.
                    self.discard_connection(connection).await;

                    if retries < self.config.max_retries {
                        retries += 1;
                        self.stats.write().await.retried_requests += 1;
                        tokio::time::sleep(self.config.retry_delay * retries).await;
                        // Fall through to the next loop iteration.
                    } else {
                        self.stats.write().await.failed_requests += 1;
                        return Err(err);
                    }
                },
            }
        }
    }

    /// Checks a connection out of the pool, building a new one if no usable
    /// idle connection exists.
    ///
    /// Idle connections are taken from the front of the queue; expired or
    /// unhealthy ones are dropped on the way. Every successful checkout —
    /// reused or new — increments `current_active_connections`, and the idle
    /// gauge is refreshed from the queue length.
    ///
    /// # Errors
    /// Propagates the error from `PooledConnection::new`.
    async fn get_connection(&self) -> Neo3Result<PooledConnection> {
        let reused = {
            let mut connections = self.connections.write().await;
            let mut found = None;
            while let Some(mut conn) = connections.pop_front() {
                if conn.is_healthy && !conn.is_expired(self.config.max_idle_time) {
                    conn.mark_used();
                    found = Some(conn);
                    break;
                }
            }

            // Lock order is always connections -> stats.
            let mut stats = self.stats.write().await;
            stats.current_idle_connections = connections.len();
            if found.is_some() {
                stats.current_active_connections += 1;
            }
            found
        };
        if let Some(conn) = reused {
            return Ok(conn);
        }

        let connection = PooledConnection::new(&self.endpoint)?;
        let mut stats = self.stats.write().await;
        stats.total_connections_created += 1;
        stats.current_active_connections += 1;
        Ok(connection)
    }

    /// Gives a checked-out connection back to the pool.
    ///
    /// Healthy, non-expired connections are appended to the idle queue;
    /// anything else is discarded. Either way the active gauge is decremented.
    async fn return_connection(&self, connection: PooledConnection) {
        let reusable = connection.is_healthy && !connection.is_expired(self.config.max_idle_time);
        if reusable {
            let mut connections = self.connections.write().await;
            connections.push_back(connection);
            let mut stats = self.stats.write().await;
            stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
            stats.current_idle_connections = connections.len();
        } else {
            self.discard_connection(connection).await;
        }
    }

    /// Drops a checked-out connection and removes it from the active gauge.
    async fn discard_connection(&self, connection: PooledConnection) {
        drop(connection);
        let mut stats = self.stats.write().await;
        stats.current_active_connections = stats.current_active_connections.saturating_sub(1);
    }

    /// Probes every idle connection and keeps only those that respond.
    ///
    /// The idle queue is emptied first so that the pool lock is not held
    /// while the probes are awaited. Survivors are then appended to whatever
    /// the queue contains at that point, so connections returned by
    /// concurrent `execute` calls during the probes are preserved.
    pub async fn health_check(&self) {
        let pending: VecDeque<PooledConnection> = {
            let mut connections = self.connections.write().await;
            std::mem::take(&mut *connections)
        };

        let mut healthy = VecDeque::with_capacity(pending.len());
        for mut conn in pending {
            if conn.health_check().await {
                healthy.push_back(conn);
            }
        }

        let mut connections = self.connections.write().await;
        connections.append(&mut healthy);
        let mut stats = self.stats.write().await;
        stats.current_idle_connections = connections.len();
    }

    /// Returns a copy of the current counters and gauges.
    pub async fn get_stats(&self) -> PoolStats {
        let stats = self.stats.read().await;
        PoolStats {
            total_connections_created: stats.total_connections_created,
            total_requests: stats.total_requests,
            successful_requests: stats.successful_requests,
            failed_requests: stats.failed_requests,
            retried_requests: stats.retried_requests,
            current_active_connections: stats.current_active_connections,
            current_idle_connections: stats.current_idle_connections,
        }
    }

    /// Drops all idle connections and resets both gauges to zero.
    ///
    /// The semaphore is left open, so the pool can still be used afterwards.
    /// NOTE(review): requests that are in flight are not cancelled; their
    /// connections go back into the idle queue when they finish.
    pub async fn close(&self) {
        let mut connections = self.connections.write().await;
        connections.clear();
        let mut stats = self.stats.write().await;
        stats.current_active_connections = 0;
        stats.current_idle_connections = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::neo_protocol::NeoVersion;

    /// A freshly constructed pool reports no created and no active connections.
    #[tokio::test]
    async fn test_pool_creation() {
        let pool =
            ConnectionPool::new("https://testnet.neo.org:443".to_string(), PoolConfig::default());

        let snapshot = pool.get_stats().await;

        assert_eq!(snapshot.total_connections_created, 0);
        assert_eq!(snapshot.current_active_connections, 0);
    }

    /// Running one operation through the pool is reflected in `total_requests`.
    /// The operation ignores the client and resolves immediately; its result
    /// is deliberately not inspected.
    #[tokio::test]
    async fn test_pool_stats() {
        let small_pool = PoolConfig { max_connections: 2, ..PoolConfig::default() };
        let pool = ConnectionPool::new("https://testnet.neo.org:443".to_string(), small_pool);

        let _outcome =
            pool.execute(|_client| Box::pin(async move { Ok(NeoVersion::default()) })).await;

        let snapshot = pool.get_stats().await;
        assert!(snapshot.total_requests > 0);
    }
}