use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Semaphore;
use super::client::{HelloPayload, RpcClient, RpcClientError};
/// Settings used to construct an [`RpcPool`].
#[derive(Debug, Clone)]
pub struct RpcPoolConfig {
/// Address passed to `RpcClient::connect` whenever the pool opens a new connection.
pub address: String,
/// Upper bound on clients checked out at the same time; `RpcPool::new` raises `0` to `1`.
pub max_connections: usize,
/// Payload cloned and sent via `RpcClient::hello` right after each new connection is opened.
pub hello: HelloPayload,
}
/// Pool of [`RpcClient`] connections that are opened on demand and reused after release.
///
/// The number of clients checked out at once is capped by a semaphore sized from
/// `RpcPoolConfig::max_connections`.
pub struct RpcPool {
/// Connection settings (address and hello payload) used when a new client must be opened.
config: RpcPoolConfig,
/// One permit per client that may be checked out; `acquire` waits here when none are free.
permits: Arc<Semaphore>,
/// Clients that are not checked out. Shared with every `PooledClient` so that it can
/// hand its client back when dropped.
idle: Arc<Mutex<Vec<RpcClient>>>,
}
impl RpcPool {
pub fn new(config: RpcPoolConfig) -> Self {
let max = config.max_connections.max(1);
Self {
permits: Arc::new(Semaphore::new(max)),
idle: Arc::new(Mutex::new(Vec::with_capacity(max))),
config,
}
}
pub async fn acquire(&self) -> Result<PooledClient, RpcClientError> {
let permit = self
.permits
.clone()
.acquire_owned()
.await
.expect("semaphore not closed");
let client = {
let mut idle = self.idle.lock();
idle.pop()
};
let client = match client {
Some(c) => c,
None => {
let c = RpcClient::connect(&self.config.address).await?;
let _ = c.hello(self.config.hello.clone()).await?;
c
}
};
Ok(PooledClient {
inner: Some(client),
idle: Arc::clone(&self.idle),
_permit: permit,
})
}
pub fn idle_count(&self) -> usize {
self.idle.lock().len()
}
}
/// Handle to a client checked out of an [`RpcPool`].
///
/// Dropping the handle pushes the client back onto the pool's idle list; the semaphore
/// permit it holds is dropped together with the handle.
pub struct PooledClient {
/// The checked-out client. `Some` for the whole life of the handle; `Drop` takes it out.
inner: Option<RpcClient>,
/// The owning pool's idle list, where the client is returned on drop.
idle: Arc<Mutex<Vec<RpcClient>>>,
/// Permit obtained in `RpcPool::acquire`; held only so that it is released on drop.
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl PooledClient {
    /// Borrows the checked-out [`RpcClient`].
    ///
    /// # Panics
    ///
    /// Panics if the inner slot is empty, which only happens while the handle is
    /// being dropped.
    pub fn client(&self) -> &RpcClient {
        let slot: Option<&RpcClient> = self.inner.as_ref();
        slot.expect("PooledClient inner is only None during Drop")
    }
}
impl Drop for PooledClient {
    /// Hands the client back to the pool's idle list.
    fn drop(&mut self) {
        // Fields are dropped only after this body returns, so the client is already
        // back in the idle list by the time the permit is released.
        if let Some(returned) = self.inner.take() {
            self.idle.lock().push(returned);
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// A pool that has just been built from a config has no idle clients yet.
    #[test]
    fn config_round_trip() {
        let pool = RpcPool::new(RpcPoolConfig {
            address: String::from("localhost:15503"),
            max_connections: 4,
            hello: HelloPayload::new("test"),
        });

        assert_eq!(pool.idle_count(), 0);
    }
}