use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Identifies one session pool: a site name paired with a host string.
///
/// Two keys compare equal (and hash identically) only when both fields
/// match, which is what lets the type serve as a `HashMap` key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PoolKey {
    /// Site component of the key; printed before the `@` by `Display`.
    pub site: String,
    /// Host component of the key; printed after the `@` by `Display`.
    pub host: String,
}
impl PoolKey {
    /// Builds a key from anything convertible into a `String` for each part.
    ///
    /// Both arguments are converted with `Into::into` and stored unchanged;
    /// no validation or normalisation is applied.
    pub fn new(site: impl Into<String>, host: impl Into<String>) -> Self {
        let site = site.into();
        let host = host.into();
        Self { site, host }
    }
}
impl fmt::Display for PoolKey {
    /// Writes the key as `<site>@<host>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { site, host } = self;
        write!(f, "{site}@{host}")
    }
}
/// Lifecycle state recorded for each pooled session.
///
/// Within this file only `Idle` is treated specially: the acquire methods
/// hand out idle members and skip every other state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PoolState {
    /// Available for reuse; the only state the acquire methods look for.
    Idle,
    /// Not idle. NOTE(review): presumably "claimed but not yet actively
    /// used" — nothing in this file distinguishes it from `InUse`; confirm
    /// against callers.
    Acquired,
    /// Not idle. NOTE(review): presumably "actively being used" — confirm
    /// against callers.
    InUse,
}
/// One session tracked inside a pool.
#[derive(Debug)]
pub struct PoolMember {
    /// Identifier the session was registered under.
    pub session_id: String,
    /// Current lifecycle state of the session.
    pub state: PoolState,
    /// Timestamp refreshed when the member is registered, marked, or handed
    /// out by `acquire_atomic`.
    pub last_used: Instant,
}
/// Cloneable handle to a shared registry of session pools.
///
/// Every clone points at the same mutex-protected state. The derived
/// `Default` produces a manager whose `max_per_pool` is `0`, which the
/// acquire methods treat as "no limit".
#[derive(Default, Clone)]
pub struct PoolManager {
    /// Shared registry state, guarded by a mutex.
    inner: Arc<Mutex<Inner>>,
}
/// Mutable registry state kept behind the `PoolManager` mutex.
#[derive(Default)]
struct Inner {
    /// Members of every pool, grouped by pool key.
    members: HashMap<PoolKey, Vec<PoolMember>>,
    /// Reverse index: session id -> key of the pool it was registered in.
    by_id: HashMap<String, PoolKey>,
    /// Per-pool member cap; `0` is treated as "no limit" by the acquire
    /// methods.
    max_per_pool: usize,
}
/// Answer given by the acquire methods of `PoolManager`.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// can store outcomes and compare them directly (for example with
/// `assert_eq!`) instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireOutcome {
    /// An idle member was found; carries its session id.
    Existing(String),
    /// No idle member was found, and the pool is absent, unlimited, or still
    /// below its cap.
    CreateNew,
    /// No idle member was found and the pool already holds as many members
    /// as the cap allows.
    Busy,
}
impl PoolManager {
    // Locking policy: every method except `new` holds the `inner` mutex for
    // its whole body, so each call observes and mutates the registry as one
    // atomic step. `lock().unwrap()` panics if the mutex has been poisoned.

    /// Creates a manager whose pools hold at most `max_per_pool` members.
    ///
    /// A cap of `0` means "unlimited": the acquire methods never answer
    /// `AcquireOutcome::Busy` in that case.
    pub fn new(max_per_pool: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Inner {
                members: HashMap::new(),
                by_id: HashMap::new(),
                max_per_pool,
            })),
        }
    }

    /// Decides the answer for an existing pool that has no idle member:
    /// `CreateNew` while the pool is unlimited (`max == 0`) or below the cap,
    /// `Busy` otherwise.
    fn outcome_without_idle(pool_len: usize, max: usize) -> AcquireOutcome {
        if max == 0 || pool_len < max {
            AcquireOutcome::CreateNew
        } else {
            AcquireOutcome::Busy
        }
    }

    /// Finds an idle member of `key`'s pool and, under the same lock,
    /// switches it to `new_state`.
    ///
    /// Returns:
    /// * `Existing(id)` — the first idle member found; its state is now
    ///   `new_state` and its `last_used` timestamp is refreshed.
    /// * `CreateNew` — there is no idle member and the pool is missing,
    ///   unlimited, or below its cap. The registry is left untouched: no slot
    ///   is reserved, so concurrent callers can each receive `CreateNew`
    ///   until one of them calls `register`.
    /// * `Busy` — there is no idle member and the pool is at its cap.
    ///
    /// Passing `PoolState::Idle` as `new_state` leaves the member idle, so a
    /// later call can return the same session again.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn acquire_atomic(&self, key: &PoolKey, new_state: PoolState) -> AcquireOutcome {
        let mut inner = self.inner.lock().unwrap();
        // Read the cap before `members` is mutably borrowed.
        let max = inner.max_per_pool;
        let Some(pool) = inner.members.get_mut(key) else {
            return AcquireOutcome::CreateNew;
        };
        if let Some(member) = pool.iter_mut().find(|m| m.state == PoolState::Idle) {
            member.state = new_state;
            member.last_used = Instant::now();
            return AcquireOutcome::Existing(member.session_id.clone());
        }
        Self::outcome_without_idle(pool.len(), max)
    }

    /// Read-only variant of `acquire_atomic`: reports what an acquire would
    /// answer without changing any member.
    ///
    /// Because nothing is claimed, the idle session named in `Existing` may
    /// be handed to another caller before this caller acts on the answer.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn try_acquire(&self, key: &PoolKey) -> AcquireOutcome {
        let inner = self.inner.lock().unwrap();
        let Some(pool) = inner.members.get(key) else {
            return AcquireOutcome::CreateNew;
        };
        if let Some(idle) = pool.iter().find(|m| m.state == PoolState::Idle) {
            return AcquireOutcome::Existing(idle.session_id.clone());
        }
        Self::outcome_without_idle(pool.len(), inner.max_per_pool)
    }

    /// Adds `session_id` to the pool identified by `key` in the given state.
    ///
    /// If `session_id` is already registered (under this or another key), the
    /// previous record is removed first, so a session id appears at most once
    /// in the registry. Without that step the old record would stay in its
    /// pool with no index entry pointing at it: it could not be removed or
    /// marked any more, yet it would still count against the cap and could be
    /// handed out by the acquire methods.
    ///
    /// The per-pool cap is not enforced here.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn register(&self, key: PoolKey, session_id: String, state: PoolState) {
        let mut guard = self.inner.lock().unwrap();
        // Reborrow as a plain `&mut` so the two maps can be borrowed
        // independently of each other.
        let inner = &mut *guard;
        if let Some(previous_key) = inner.by_id.insert(session_id.clone(), key.clone()) {
            // Re-registration: evict the stale member from its old pool.
            if let Some(pool) = inner.members.get_mut(&previous_key) {
                pool.retain(|m| m.session_id != session_id);
                if pool.is_empty() {
                    inner.members.remove(&previous_key);
                }
            }
        }
        inner.members.entry(key).or_default().push(PoolMember {
            session_id,
            state,
            last_used: Instant::now(),
        });
    }

    /// Sets the state of `session_id` and refreshes its `last_used`
    /// timestamp.
    ///
    /// Returns `true` when the session was found and updated, `false` when
    /// the id is unknown.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn mark(&self, session_id: &str, state: PoolState) -> bool {
        let mut guard = self.inner.lock().unwrap();
        // Split borrow: look the key up in `by_id` while mutating `members`,
        // which avoids cloning the key.
        let inner = &mut *guard;
        let Some(key) = inner.by_id.get(session_id) else {
            return false;
        };
        let Some(pool) = inner.members.get_mut(key) else {
            return false;
        };
        match pool.iter_mut().find(|m| m.session_id == session_id) {
            Some(member) => {
                member.state = state;
                member.last_used = Instant::now();
                true
            }
            None => false,
        }
    }

    /// Forgets `session_id`, returning the key of the pool it belonged to.
    ///
    /// A pool left without members is dropped from the registry. Returns
    /// `None` when the id is unknown.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn remove(&self, session_id: &str) -> Option<PoolKey> {
        let mut guard = self.inner.lock().unwrap();
        let inner = &mut *guard;
        let key = inner.by_id.remove(session_id)?;
        if let Some(pool) = inner.members.get_mut(&key) {
            pool.retain(|m| m.session_id != session_id);
            if pool.is_empty() {
                inner.members.remove(&key);
            }
        }
        Some(key)
    }

    /// Returns one `(session_id, key, state, idle_seconds)` tuple per
    /// registered member, where `idle_seconds` is the whole number of seconds
    /// elapsed since the member's `last_used` timestamp.
    ///
    /// Rows follow `HashMap` iteration order, so their order across pools is
    /// unspecified.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn snapshot(&self) -> Vec<(String, PoolKey, PoolState, u64)> {
        let inner = self.inner.lock().unwrap();
        // `by_id` holds one entry per registered session, so its length is
        // the number of rows about to be produced.
        let mut rows = Vec::with_capacity(inner.by_id.len());
        rows.extend(inner.members.iter().flat_map(|(key, pool)| {
            pool.iter().map(move |m| {
                (
                    m.session_id.clone(),
                    key.clone(),
                    m.state,
                    m.last_used.elapsed().as_secs(),
                )
            })
        }));
        rows
    }

    /// Returns the key of the pool `session_id` is registered in, if any.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn key_of(&self, session_id: &str) -> Option<PoolKey> {
        self.inner.lock().unwrap().by_id.get(session_id).cloned()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Key shared by most tests: site `"sa"` on host `"h"`.
    fn sa_key() -> PoolKey {
        PoolKey::new("sa", "h")
    }

    /// Fails the test unless `other` is `Existing` carrying `expected_sid`.
    fn assert_existing(other: AcquireOutcome, expected_sid: &str) {
        if let AcquireOutcome::Existing(sid) = &other {
            assert_eq!(sid, expected_sid);
        } else {
            panic!("expected Existing, got {other:?}");
        }
    }

    /// `Display` joins site and host with `@`.
    #[test]
    fn pool_key_display() {
        let rendered = PoolKey::new("sa", "127.0.0.1:9222").to_string();
        assert_eq!(rendered, "sa@127.0.0.1:9222");
    }

    /// Keys are equal only when both components match.
    #[test]
    fn pool_key_equality() {
        let host = "127.0.0.1:9222";
        let first = PoolKey::new("sa", host);
        let same = PoolKey::new("sa", host);
        let other_site = PoolKey::new("twitter", host);
        assert_eq!(first, same);
        assert_ne!(first, other_site);
    }

    /// A key with no pool yet asks the caller to create a session.
    #[test]
    fn empty_pool_says_create_new() {
        let pm = PoolManager::new(3);
        let outcome = pm.try_acquire(&sa_key());
        assert!(matches!(outcome, AcquireOutcome::CreateNew));
    }

    /// An idle registered session is offered for reuse.
    #[test]
    fn register_then_acquire_reuses_idle() {
        let pm = PoolManager::new(3);
        pm.register(sa_key(), "sid-1".into(), PoolState::Idle);
        assert_existing(pm.try_acquire(&sa_key()), "sid-1");
    }

    /// Sessions that are not idle are never offered.
    #[test]
    fn in_use_sessions_do_not_match_idle_acquire() {
        let pm = PoolManager::new(3);
        pm.register(sa_key(), String::from("sid-1"), PoolState::InUse);
        let outcome = pm.try_acquire(&sa_key());
        assert!(matches!(outcome, AcquireOutcome::CreateNew));
    }

    /// A pool at its cap with no idle member reports `Busy`.
    #[test]
    fn pool_full_returns_busy() {
        let pm = PoolManager::new(2);
        for sid in ["sid-1", "sid-2"] {
            pm.register(sa_key(), sid.to_string(), PoolState::InUse);
        }
        assert!(matches!(pm.try_acquire(&sa_key()), AcquireOutcome::Busy));
    }

    /// A cap of zero disables the `Busy` answer.
    #[test]
    fn unlimited_pool_never_busy() {
        let pm = PoolManager::new(0);
        (0..10).for_each(|i| pm.register(sa_key(), format!("sid-{i}"), PoolState::InUse));
        assert!(matches!(
            pm.try_acquire(&sa_key()),
            AcquireOutcome::CreateNew
        ));
    }

    /// Marking a session idle makes it reusable again.
    #[test]
    fn mark_updates_state() {
        let pm = PoolManager::new(3);
        pm.register(sa_key(), "sid-1".into(), PoolState::InUse);
        let marked = pm.mark("sid-1", PoolState::Idle);
        assert!(marked);
        let reused = matches!(
            pm.try_acquire(&sa_key()),
            AcquireOutcome::Existing(s) if s == "sid-1"
        );
        assert!(reused);
    }

    /// Removing the last member also forgets the pool and the id.
    #[test]
    fn remove_drops_member_and_empty_pool() {
        let pm = PoolManager::new(3);
        let key = sa_key();
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);
        let removed = pm.remove("sid-1");
        assert_eq!(removed, Some(key.clone()));
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::CreateNew));
        assert_eq!(pm.key_of("sid-1"), None);
    }

    /// The snapshot spans every pool.
    #[test]
    fn snapshot_lists_all_members() {
        let pm = PoolManager::new(3);
        pm.register(sa_key(), "s1".into(), PoolState::Idle);
        pm.register(PoolKey::new("twitter", "h"), "s2".into(), PoolState::InUse);
        assert_eq!(pm.snapshot().len(), 2);
    }

    /// The first atomic acquire claims the idle session; the second finds
    /// none left.
    #[test]
    fn acquire_atomic_flips_state() {
        let pm = PoolManager::new(3);
        let key = sa_key();
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);
        assert_existing(pm.acquire_atomic(&key, PoolState::InUse), "sid-1");
        let second = pm.acquire_atomic(&key, PoolState::InUse);
        assert!(matches!(second, AcquireOutcome::CreateNew));
    }

    /// Each snapshot row carries the member's own state.
    #[test]
    fn snapshot_includes_state_per_member() {
        let pm = PoolManager::new(3);
        let members = [
            ("a", PoolState::Idle),
            ("b", PoolState::Acquired),
            ("c", PoolState::InUse),
        ];
        for (sid, state) in members {
            pm.register(sa_key(), sid.to_string(), state);
        }
        let snap = pm.snapshot();
        let mut acquired = Vec::new();
        for (sid, _, state, _) in &snap {
            if *state == PoolState::Acquired {
                acquired.push(sid.as_str());
            }
        }
        assert_eq!(acquired, vec!["b"]);
    }

    /// Two threads racing for one idle session: exactly one wins it, the
    /// other is told to create a new one.
    #[test]
    fn acquire_atomic_concurrent_callers_see_different_outcomes() {
        use std::sync::Barrier;
        use std::thread;

        let pm = PoolManager::new(3);
        let key = sa_key();
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);

        // Both workers wait on the barrier so their acquires overlap as
        // closely as possible.
        let gate = std::sync::Arc::new(Barrier::new(2));
        let workers: Vec<_> = (0..2)
            .map(|_| {
                let pm = pm.clone();
                let key = key.clone();
                let gate = gate.clone();
                thread::spawn(move || {
                    gate.wait();
                    pm.acquire_atomic(&key, PoolState::InUse)
                })
            })
            .collect();
        // Both threads are already spawned (the collect above is eager)
        // before either is joined.
        let outcomes: Vec<AcquireOutcome> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();

        let mut existing_count = 0;
        let mut create_count = 0;
        for outcome in &outcomes {
            match outcome {
                AcquireOutcome::Existing(_) => existing_count += 1,
                AcquireOutcome::CreateNew => create_count += 1,
                AcquireOutcome::Busy => {}
            }
        }
        assert_eq!(existing_count, 1, "exactly one should get Existing");
        assert_eq!(create_count, 1, "exactly one should get CreateNew");
    }
}