use ringline::ConnCtx;
use crate::{Client, Credential, Error};
pub struct PoolConfig {
pub credential: Credential,
pub pool_size: usize,
pub connect_timeout_ms: u64,
}
enum Slot {
Connected(Box<Client>),
Disconnected,
}
pub struct Pool {
credential: Credential,
slots: Vec<Slot>,
next: usize,
connect_timeout_ms: u64,
}
impl Pool {
pub fn new(config: PoolConfig) -> Self {
let mut slots = Vec::with_capacity(config.pool_size);
for _ in 0..config.pool_size {
slots.push(Slot::Disconnected);
}
Pool {
credential: config.credential,
slots,
next: 0,
connect_timeout_ms: config.connect_timeout_ms,
}
}
pub async fn connect_all(&mut self) -> Result<(), Error> {
for i in 0..self.slots.len() {
let client = self.do_connect().await?;
self.slots[i] = Slot::Connected(Box::new(client));
}
Ok(())
}
pub async fn client(&mut self) -> Result<&mut Client, Error> {
let size = self.slots.len();
for _ in 0..size {
let idx = self.next;
self.next = (self.next + 1) % size;
match &self.slots[idx] {
Slot::Connected(_) => {
if let Slot::Connected(client) = &mut self.slots[idx] {
return Ok(client.as_mut());
}
unreachable!();
}
Slot::Disconnected => {
if let Ok(client) = self.do_connect().await {
self.slots[idx] = Slot::Connected(Box::new(client));
if let Slot::Connected(client) = &mut self.slots[idx] {
return Ok(client.as_mut());
}
unreachable!();
}
}
}
}
Err(Error::AllConnectionsFailed)
}
pub fn mark_disconnected(&mut self, conn: ConnCtx) {
let token = conn.token();
for slot in &mut self.slots {
if let Slot::Connected(client) = slot
&& client.conn().token() == token
{
client.conn().close();
*slot = Slot::Disconnected;
return;
}
}
}
pub fn close_all(&mut self) {
for slot in &mut self.slots {
if let Slot::Connected(client) = slot {
client.conn().close();
}
*slot = Slot::Disconnected;
}
}
pub fn connected_count(&self) -> usize {
self.slots
.iter()
.filter(|s| matches!(s, Slot::Connected(_)))
.count()
}
pub fn pool_size(&self) -> usize {
self.slots.len()
}
async fn do_connect(&self) -> Result<Client, Error> {
if self.connect_timeout_ms > 0 {
Client::connect_with_timeout(&self.credential, self.connect_timeout_ms).await
} else {
Client::connect(&self.credential).await
}
}
}