use std::cmp::min;
use std::collections::VecDeque;
use async_std::sync::{Arc, Mutex, Condvar};
use async_std::channel::{Receiver, RecvError};
use crate::client::Connection;
use crate::builder::Config;
use crate::pool::command::Command;
#[derive(Debug)]
pub(crate) struct PoolState {
pub config: Config,
pub inner: Mutex<Inner>,
pub connection_released: Condvar,
}
#[derive(Debug)]
pub(crate) struct Inner {
pub in_progress: usize,
pub acquired_conns: usize,
pub conns: VecDeque<Connection>,
}
impl PoolState {
pub(crate) fn new(config: Config) -> PoolState {
PoolState {
inner: Mutex::new(Inner {
in_progress: 0,
acquired_conns: 0,
conns: VecDeque::with_capacity(
min(config.0.max_connections, 16)),
}),
connection_released: Condvar::new(),
config,
}
}
}
pub(crate) async fn main(state: Arc<PoolState>, rcv: Receiver<Command>) {
loop {
match rcv.recv().await {
Ok(Command::Release(conn)) => {
let mut inner = state.inner.lock().await;
if conn.is_consistent() {
inner.conns.push_back(conn);
} else {
inner.acquired_conns -= 1;
}
state.connection_released.notify_one();
drop(inner);
}
Ok(Command::ConnectionCanceled) => {
let mut inner = state.inner.lock().await;
inner.in_progress -= 1;
state.connection_released.notify_one();
drop(inner);
}
Ok(Command::ConnectionEstablished) => {
let mut inner = state.inner.lock().await;
inner.in_progress -= 1;
inner.acquired_conns += 1;
drop(inner);
}
Ok(Command::Close) | Err(RecvError) => {
break;
}
}
}
}