use hashbrown::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
use crate::AgentmailError;
use crate::config::{AccountConfig, Config};
use crate::imap_client::{self, ImapSession};
/// Pool of IMAP sessions, kept per account and bounded by a per-account
/// concurrency limit.
///
/// Idle sessions and semaphores are both keyed by account name and live behind
/// async mutexes inside `Arc`s, so guards handed out by the pool can share them.
pub struct ConnectionPool {
/// Application configuration; `config.accounts` is looked up by account name.
config: Config,
/// Idle sessions available for reuse, keyed by account name.
pools: Arc<Mutex<HashMap<String, Vec<ImapSession>>>>,
/// One semaphore per account name, created on first use, limiting how many
/// sessions may be checked out at once.
semaphores: Arc<Mutex<HashMap<String, Arc<Semaphore>>>>,
}
/// Fallback per-account limit, used when an account has no configuration entry
/// or its configuration leaves `max_connections` unset.
const MAX_CONCURRENT_PER_ACCOUNT: usize = 3;
impl ConnectionPool {
    /// Creates a pool for the accounts described by `config`.
    ///
    /// No sessions are opened and no semaphores are created until the first
    /// call to [`ConnectionPool::acquire`] for a given account.
    pub fn new(config: Config) -> Self {
        Self {
            config,
            pools: Arc::new(Mutex::new(HashMap::new())),
            semaphores: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Returns the semaphore that limits concurrent sessions for
    /// `account_name`, creating it on first use.
    ///
    /// The semaphore is sized once, from [`Self::account_max_connections`],
    /// at the moment it is created.
    async fn account_semaphore(&self, account_name: &str) -> Arc<Semaphore> {
        let mut semaphores = self.semaphores.lock().await;

        // Common path: look up by `&str` so no key `String` is allocated.
        if let Some(existing) = semaphores.get(account_name) {
            return Arc::clone(existing);
        }

        let created = Arc::new(Semaphore::new(self.account_max_connections(account_name)));
        semaphores.insert(account_name.to_string(), Arc::clone(&created));
        created
    }

    /// Returns the connection limit for `account_name`.
    ///
    /// Uses the account's configured `max_connections` when present, otherwise
    /// [`MAX_CONCURRENT_PER_ACCOUNT`]. The result is never below 1: a limit of
    /// zero would create a semaphore with no permits, and every `acquire` for
    /// that account would wait forever.
    fn account_max_connections(&self, account_name: &str) -> usize {
        self.config
            .accounts
            .get(account_name)
            .and_then(|account| account.max_connections)
            .unwrap_or(MAX_CONCURRENT_PER_ACCOUNT)
            .max(1)
    }

    /// Checks out a session for `account_name`.
    ///
    /// Waits for a concurrency permit, then reuses the most recently released
    /// idle session if it still answers a ping; otherwise fetches the password
    /// and opens a new connection. The permit is held by the returned guard.
    ///
    /// # Errors
    ///
    /// * [`AgentmailError::AccountNotFound`] if `account_name` is not configured.
    /// * [`AgentmailError::Other`] if the account's semaphore has been closed.
    /// * Any error from `credentials::get_password` or `imap_client::connect`
    ///   when a new connection has to be opened.
    pub async fn acquire(&self, account_name: &str) -> crate::Result<PooledSession> {
        let account_config = self
            .config
            .accounts
            .get(account_name)
            .ok_or_else(|| AgentmailError::AccountNotFound(account_name.to_string()))?;
        let max_connections = self.account_max_connections(account_name);

        let permit = self
            .account_semaphore(account_name)
            .await
            .acquire_owned()
            .await
            .map_err(|_| AgentmailError::Other("concurrency semaphore closed".to_string()))?;

        // Keep the pool lock only for the pop; ping/connect below do network
        // I/O and must not block other accounts.
        let idle = {
            let mut pools = self.pools.lock().await;
            pools.get_mut(account_name).and_then(|sessions| sessions.pop())
        };

        let session = if let Some(mut candidate) = idle
            && imap_client::ping(&mut candidate).await.is_ok()
        {
            candidate
        } else {
            // Either nothing was idle or the idle session failed its ping (and
            // has been dropped); open a fresh connection.
            let password = crate::credentials::get_password(account_name, account_config).await?;
            imap_client::connect(account_config, &password).await?
        };

        Ok(PooledSession {
            session: Some(session),
            account_name: account_name.to_string(),
            pool: Arc::clone(&self.pools),
            max_connections,
            _permit: permit,
        })
    }

    /// Returns the names of all configured accounts, sorted ascending.
    pub fn account_names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.config.accounts.keys().cloned().collect();
        // Map keys are unique, so an unstable sort yields the same order.
        names.sort_unstable();
        names
    }

    /// Returns the configuration for account `name`, if it is configured.
    pub fn account_config(&self, name: &str) -> Option<&AccountConfig> {
        self.config.accounts.get(name)
    }

    /// Returns the configuration this pool was created with.
    pub fn config(&self) -> &Config {
        &self.config
    }
}
/// A checked-out IMAP session together with the concurrency permit that
/// allowed it to be checked out.
pub struct PooledSession {
/// The checked-out session; `release` takes it out, leaving `None`.
session: Option<ImapSession>,
/// Account name used as the key when the session is put back into `pool`.
account_name: String,
/// Shared map of idle sessions that `release` pushes the session back into.
pool: Arc<Mutex<HashMap<String, Vec<ImapSession>>>>,
/// Upper bound on idle sessions kept for this account; `release` discards
/// the session when the idle list is already this long.
max_connections: usize,
/// Concurrency permit held for as long as this value is alive.
_permit: OwnedSemaphorePermit,
}
impl PooledSession {
    /// Returns mutable access to the underlying IMAP session.
    ///
    /// # Panics
    ///
    /// Panics if the session has already been taken out of this guard. Within
    /// this type only `release` takes it, and `release` consumes the guard, so
    /// hitting this would indicate a bug.
    pub fn session(&mut self) -> &mut ImapSession {
        self.session.as_mut().expect("session already consumed")
    }

    /// Hands the session back to the idle pool for its account.
    ///
    /// The session is kept only while the account's idle list holds fewer than
    /// `max_connections` entries; otherwise it is dropped. The concurrency
    /// permit is released when `self` is dropped at the end of this call, i.e.
    /// after the session has been returned.
    pub async fn release(mut self) {
        let Some(session) = self.session.take() else {
            return;
        };

        // `self` is consumed by this call, so move the name into the map key
        // rather than cloning it.
        let account_name = std::mem::take(&mut self.account_name);

        let mut pools = self.pool.lock().await;
        let idle = pools.entry(account_name).or_default();
        if idle.len() < self.max_connections {
            idle.push(session);
        }
    }
}
impl Drop for PooledSession {
// No explicit work is done on drop. A guard dropped without calling `release`
// does not return its session to the idle pool: the session (if still present)
// and the permit are simply dropped together with the other fields.
// NOTE(review): presumably left empty because returning the session requires
// awaiting the async pool lock, which `drop` cannot do — confirm intent.
fn drop(&mut self) {
}
}