use std::sync::Arc;
use tokio::sync::Mutex;
/// A shareable pool of connections keyed by a byte string (named `pubkey` throughout this file).
///
/// Both maps live behind `Arc<Mutex<..>>`, so every clone of a `ConnPool` operates on the same
/// underlying state.
#[derive(Clone)]
pub struct ConnPool<T>
where
    T: Clone + Send + Sync,
{
    /// Reference instant captured when the pool is constructed; idle timestamps are stored as
    /// whole seconds elapsed since this instant.
    timer: tokio::time::Instant,
    /// Idle time, in seconds, that a released key must exceed before `collect` evicts it.
    hold_timeout: u64,
    /// The pooled connections, by key.
    conns: Arc<Mutex<std::collections::HashMap<Vec<u8>, T>>>,
    /// For each released key, the second (relative to `timer`) at which its handle was dropped.
    /// A key with no entry here is never evicted by `collect`.
    last_active: Arc<Mutex<std::collections::HashMap<Vec<u8>, u64>>>,
}
impl<T: Clone + Send + Sync> ConnPool<T> {
    /// Creates an empty pool whose clock starts now.
    ///
    /// `hold_timeout` is the number of seconds a released key may stay idle; [`Self::collect`]
    /// evicts keys that have been idle for strictly more than this (whole-second granularity).
    pub fn new(hold_timeout: u64) -> Self {
        Self {
            timer: tokio::time::Instant::now(),
            hold_timeout,
            conns: Arc::new(Mutex::new(std::collections::HashMap::new())),
            last_active: Arc::new(Mutex::new(std::collections::HashMap::new())),
        }
    }

    /// Builds a handle for `pubkey` that shares this pool's clock and idle-timestamp map.
    fn new_handle(&self, pubkey: Vec<u8>) -> ConnPoolHandle {
        ConnPoolHandle {
            // `Instant` is `Copy`; no explicit clone needed.
            timer: self.timer,
            pubkey,
            last_active: Arc::clone(&self.last_active),
        }
    }

    /// Returns a clone of the connection stored under `pubkey`, or `None` if there is none.
    ///
    /// This does not touch the idle timestamps; use [`Self::hold`] to protect a key from
    /// collection.
    ///
    /// NOTE(review): the body only needs `&self`; the `&mut self` receiver is kept solely so the
    /// public signature stays unchanged.
    pub async fn get(&mut self, pubkey: Vec<u8>) -> Option<T> {
        let conns = self.conns.lock().await;
        conns.get(&pubkey).cloned()
    }

    /// Stores `conn` under `pubkey`, replacing any previous connection for that key.
    ///
    /// Returns the connection that is now stored, so the result is always `Some`. Note that this
    /// differs from `HashMap::insert`, which returns the *previous* value.
    pub async fn insert(&self, pubkey: Vec<u8>, conn: T) -> Option<T> {
        let mut conns = self.conns.lock().await;
        // One clone of the value, no clone of the key and no second lookup.
        conns.insert(pubkey, conn.clone());
        Some(conn)
    }

    /// Removes the connection stored under `pubkey`, if any.
    ///
    /// Only the connection map is modified; an idle timestamp for the key is left in place.
    pub async fn remove(&self, pubkey: Vec<u8>) {
        let mut conns = self.conns.lock().await;
        conns.remove(&pubkey);
    }

    /// Marks `pubkey` as in use and returns a handle representing that use.
    ///
    /// The key's idle timestamp is cleared, so [`Self::collect`] will not evict it. When the
    /// returned handle is dropped, a fresh idle timestamp is recorded for the key.
    ///
    /// Handles are not reference counted: if several handles exist for the same key, the first
    /// one to drop makes the key eligible for collection again. The key is not required to be
    /// present in the connection map.
    pub async fn hold(&self, pubkey: Vec<u8>) -> ConnPoolHandle {
        let mut last_active = self.last_active.lock().await;
        last_active.remove(&pubkey);
        self.new_handle(pubkey)
    }

    /// Evicts every connection whose key has been idle for more than `hold_timeout` seconds.
    ///
    /// Expired keys are removed from both the idle-timestamp map and the connection map. Keys
    /// without an idle timestamp (held, or never held and released) are left alone.
    pub async fn collect(&self) {
        log::debug!("collect start");
        let mut last_active = self.last_active.lock().await;
        let now = self.timer.elapsed().as_secs();
        let hold_timeout = self.hold_timeout;

        // Drop expired timestamps in place, remembering only the keys that expired instead of
        // cloning the whole map.
        let mut expired = Vec::new();
        last_active.retain(|key, released_at| {
            // Timestamps come from the same monotonic clock, so `now >= released_at`;
            // `saturating_sub` simply makes an underflow impossible.
            let is_expired = now.saturating_sub(*released_at) > hold_timeout;
            if is_expired {
                log::debug!("collect: {:?}", key);
                expired.push(key.clone());
            }
            !is_expired
        });

        if expired.is_empty() {
            return;
        }

        // Lock order is `last_active` then `conns`; the `last_active` guard stays alive here so
        // a concurrent `hold` cannot interleave with the eviction. `conns` is locked once for
        // the whole batch.
        let mut conns = self.conns.lock().await;
        for key in &expired {
            conns.remove(key);
        }
    }
}
/// Spawns a background task that repeatedly sleeps for `interval` and then runs
/// [`ConnPool::collect`] on `pool`.
///
/// The first sweep happens one `interval` after this call. The loop has no exit condition and
/// the task's join handle is discarded, so the task is detached and keeps its share of the
/// pool's state alive for as long as it runs.
///
/// # Panics
///
/// Panics if called outside the context of a Tokio runtime, because it uses `tokio::spawn`.
pub fn collect_loop<T: Clone + Send + Sync + 'static>(
    pool: ConnPool<T>,
    interval: std::time::Duration,
) {
    // `pool` is received by value, so it can be moved straight into the task.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;
            pool.collect().await;
        }
    });
}
/// Marker returned by `ConnPool::hold` for a key that is currently in use.
///
/// Dropping the handle records an idle timestamp for `pubkey` in the shared `last_active` map,
/// which is what `ConnPool::collect` later compares against the pool's hold timeout.
#[derive(Debug)]
pub struct ConnPoolHandle {
/// Copy of the owning pool's reference instant; timestamps are seconds elapsed since it.
timer: tokio::time::Instant,
/// Key this handle was created for.
pubkey: Vec<u8>,
/// Idle-timestamp map shared with the pool that created this handle.
last_active: Arc<Mutex<std::collections::HashMap<Vec<u8>, u64>>>,
}
impl Drop for ConnPoolHandle {
fn drop(&mut self) {
let timer = self.timer.clone();
let pubkey = self.pubkey.clone();
let last_active = self.last_active.clone();
tokio::spawn(async move {
let mut last_active = last_active.lock().await;
last_active.insert(pubkey, timer.elapsed().as_secs());
});
}
}
#[cfg(test)]
mod test {
    use super::ConnPool;

    /// A key has no idle timestamp while a handle for it is alive, and gains one once the
    /// handle has been dropped.
    #[tokio::test]
    async fn test_handle() {
        let key = vec![1, 2, 3];
        let pool = ConnPool::<u64>::new(10);

        let held = pool.hold(key.clone()).await;
        let stamped_while_held = pool.last_active.lock().await.contains_key(&key);
        assert!(!stamped_while_held);

        drop(held);
        // Let the scheduler run anything the handle's `Drop` may have spawned.
        tokio::time::sleep(tokio::time::Duration::from_millis(0)).await;

        let stamped_after_release = pool.last_active.lock().await.contains_key(&key);
        assert!(stamped_after_release);
    }
}