use super::*;
/// Handle to a pool of reusable sessions.
///
/// The handle only wraps a reference-counted pointer to the pool state, so
/// cloning it yields another handle to the same pool.
#[derive(Clone)]
pub struct SessionPool {
    /// Shared state: client, configuration, idle sessions and semaphore.
    inner: Arc<Inner>,
}
impl std::fmt::Debug for SessionPool {
    /// Renders the pool as `SessionPool { max_sessions: N }`.
    ///
    /// Only the configured session limit is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let limit = self.max_sessions();
        let mut out = f.debug_struct("SessionPool");
        out.field("max_sessions", &limit);
        out.finish()
    }
}
impl SessionPool {
    /// Creates a pool around `client` that hands out at most `max_sessions`
    /// sessions at a time.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SessionPoolConfig::new(max_sessions)`.
    pub fn new(client: RaSvnClient, max_sessions: usize) -> Result<Self, SvnError> {
        let config = SessionPoolConfig::new(max_sessions)?;
        Self::with_config(client, config)
    }

    /// Creates a pool around `client` from an explicit configuration.
    ///
    /// The semaphore is created with `config.max_sessions` permits and the
    /// idle list starts empty; no session is opened here.
    ///
    /// # Errors
    ///
    /// The body has no failing path and always returns `Ok`; the `Result` is
    /// part of the public signature.
    pub fn with_config(client: RaSvnClient, config: SessionPoolConfig) -> Result<Self, SvnError> {
        // Read the limit before `config` is moved into the shared state.
        let permits = config.max_sessions;
        let inner = Inner {
            client,
            config,
            idle: Mutex::new(Vec::new()),
            semaphore: Arc::new(Semaphore::new(permits)),
        };
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Returns the configured `max_sessions` value.
    pub fn max_sessions(&self) -> usize {
        self.inner.config.max_sessions
    }

    /// Returns a copy of the pool configuration.
    pub fn config(&self) -> SessionPoolConfig {
        self.inner.config.clone()
    }

    /// Pre-opens sessions up to the configured `prewarm_sessions` count.
    ///
    /// Equivalent to `warm_up_to(config.prewarm_sessions)`.
    ///
    /// # Errors
    ///
    /// Same as [`SessionPool::warm_up_to`].
    pub async fn warm_up(&self) -> Result<usize, SvnError> {
        let target = self.inner.config.prewarm_sessions;
        self.warm_up_to(target).await
    }

    /// Opens sessions until the idle list holds `target_idle` entries.
    ///
    /// `target_idle` is capped at `max_sessions`; a target of zero returns
    /// `Ok(0)` immediately. Returns the number of sessions opened by this
    /// call.
    ///
    /// One permit is acquired per opened session and all of them are held
    /// until the call finishes.
    ///
    /// NOTE(review): because permits accumulate for the whole call,
    /// overlapping warm-ups with no `acquire_timeout` configured may end up
    /// waiting on each other's permits — confirm intended usage.
    ///
    /// # Errors
    ///
    /// Returns the first error from acquiring a permit or opening a session.
    /// Sessions opened before the failure are kept in the idle list instead
    /// of being thrown away.
    pub async fn warm_up_to(&self, target_idle: usize) -> Result<usize, SvnError> {
        let target_idle = target_idle.min(self.inner.config.max_sessions);
        if target_idle == 0 {
            return Ok(0);
        }

        // The guard lives only inside this closure, so it is never held
        // across an `.await`. A poisoned lock is recovered: the list is only
        // ever pushed to or popped from, so its contents stay usable.
        let count_idle = || {
            self.inner
                .idle
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .len()
        };

        let mut opened = Vec::new();
        let mut permits = Vec::new();
        // Run the fallible part in its own async block so that `?` exits the
        // block rather than the function; whatever was opened before an
        // error can then still be stored below.
        let outcome = async {
            loop {
                if count_idle() + opened.len() >= target_idle {
                    break Ok::<(), SvnError>(());
                }
                let permit = self.inner.acquire_permit().await?;
                // Sessions may have been returned to the idle list while we
                // waited for the permit; do not open one that is not needed.
                if count_idle() + opened.len() >= target_idle {
                    break Ok(());
                }
                let session = self.inner.client.open_session().await?;
                permits.push(permit);
                opened.push(session);
            }
        }
        .await;

        let created = opened.len();
        if created > 0 {
            // All sessions from this call share one idle timestamp.
            let now = Instant::now();
            let mut idle = self
                .inner
                .idle
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            idle.extend(opened.into_iter().map(|session| IdleSession {
                session,
                idle_since: now,
            }));
        }
        // Release the permits only after the sessions are visible as idle.
        drop(permits);
        outcome.map(|()| created)
    }

    /// Checks out a session, reusing an idle one when possible.
    ///
    /// After acquiring a permit, idle sessions are popped one at a time:
    ///
    /// * entries idle for at least `idle_timeout` (when set) are dropped;
    /// * entries selected by the `health_check` policy are probed with
    ///   `get_latest_rev` and dropped if the probe fails;
    /// * the first entry that survives is returned.
    ///
    /// If no idle entry survives, a new session is opened.
    ///
    /// # Errors
    ///
    /// Returns the error from acquiring a permit or from opening a new
    /// session.
    pub async fn session(&self) -> Result<PooledSession, SvnError> {
        let permit = self.inner.acquire_permit().await?;
        // Every idle age below is measured against this single timestamp.
        let checkout_time = Instant::now();

        let mut reused = None;
        while let Some(IdleSession {
            mut session,
            idle_since,
        }) = self.inner.pop_idle_session()
        {
            let idle_for = checkout_time.saturating_duration_since(idle_since);

            let expired = matches!(
                self.inner.config.idle_timeout,
                Some(timeout) if idle_for >= timeout
            );
            if expired {
                // Dropping the entry discards the stale session.
                continue;
            }

            let probe = match self.inner.config.health_check {
                SessionPoolHealthCheck::None => false,
                SessionPoolHealthCheck::OnCheckout => true,
                SessionPoolHealthCheck::OnCheckoutIfIdleFor(min_idle) => idle_for >= min_idle,
            };
            if probe && session.get_latest_rev().await.is_err() {
                // Failed probe: discard this session and try the next one.
                continue;
            }

            reused = Some(session);
            break;
        }

        let session = match reused {
            Some(session) => session,
            None => self.inner.client.open_session().await?,
        };

        Ok(PooledSession {
            inner: self.inner.clone(),
            session: Some(session),
            permit: Some(permit),
        })
    }

    /// Test-only access to the pool's permit acquisition.
    #[cfg(test)]
    pub(crate) async fn acquire_permit_for_test(&self) -> Result<OwnedSemaphorePermit, SvnError> {
        self.inner.acquire_permit().await
    }
}
impl RaSvnClient {
    /// Creates a [`SessionPool`] backed by a clone of this client, limited to
    /// `max_sessions` sessions.
    ///
    /// # Errors
    ///
    /// Returns whatever `SessionPool::new` returns.
    pub fn session_pool(&self, max_sessions: usize) -> Result<SessionPool, SvnError> {
        let client = self.clone();
        SessionPool::new(client, max_sessions)
    }

    /// Creates a [`SessionPool`] backed by a clone of this client, using the
    /// given configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever `SessionPool::with_config` returns.
    pub fn session_pool_with_config(
        &self,
        config: SessionPoolConfig,
    ) -> Result<SessionPool, SvnError> {
        let client = self.clone();
        SessionPool::with_config(client, config)
    }
}
/// State shared by every `SessionPool` handle and every checked-out
/// `PooledSession`.
struct Inner {
    /// Client used to open new sessions.
    client: RaSvnClient,
    /// Pool configuration (limits, timeouts, health-check policy).
    config: SessionPoolConfig,
    /// Sessions currently not checked out, each with the time it became idle.
    idle: Mutex<Vec<IdleSession>>,
    /// Permit source; created with `max_sessions` permits.
    semaphore: Arc<Semaphore>,
}
impl Inner {
    /// Acquires one owned permit from the pool semaphore.
    ///
    /// When `config.acquire_timeout` is set, the wait is bounded by it.
    ///
    /// # Errors
    ///
    /// * `SvnError::Protocol("session pool acquire timed out")` if the
    ///   timeout elapses first.
    /// * `SvnError::Protocol("session pool closed")` if the semaphore
    ///   reports an acquire error.
    async fn acquire_permit(&self) -> Result<OwnedSemaphorePermit, SvnError> {
        let acquire = self.semaphore.clone().acquire_owned();
        let acquired = match self.config.acquire_timeout {
            Some(limit) => match tokio::time::timeout(limit, acquire).await {
                Ok(result) => result,
                Err(_) => {
                    return Err(SvnError::Protocol("session pool acquire timed out".into()));
                }
            },
            None => acquire.await,
        };
        acquired.map_err(|_| SvnError::Protocol("session pool closed".into()))
    }

    /// Removes and returns the most recently added idle session, if any.
    ///
    /// A poisoned lock is recovered rather than treated as "no idle
    /// sessions": the list is only pushed to and popped from, so its
    /// contents remain usable.
    fn pop_idle_session(&self) -> Option<IdleSession> {
        self.idle
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .pop()
    }

    /// Appends `session` to the idle list, stamped with the current time.
    ///
    /// A poisoned lock is recovered so the session is kept instead of being
    /// silently dropped. This never panics, which matters because it is
    /// called from `PooledSession`'s `Drop`.
    fn push_idle_session(&self, session: RaSvnSession) {
        let entry = IdleSession {
            session,
            idle_since: Instant::now(),
        };
        self.idle
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(entry);
    }
}
/// A session waiting in the pool together with the moment it became idle.
#[derive(Debug)]
struct IdleSession {
    /// The parked session.
    session: RaSvnSession,
    /// When the session was placed in the idle list.
    idle_since: Instant,
}
/// A session checked out of a `SessionPool`.
///
/// Dereferences to the underlying `RaSvnSession`. When dropped, the session
/// goes back to the pool's idle list and the permit is released.
pub struct PooledSession {
    /// Pool state the session is returned to on drop.
    inner: Arc<Inner>,
    /// The checked-out session; set to `Some` at checkout and taken on drop.
    session: Option<RaSvnSession>,
    /// Permit held for as long as the session is checked out.
    permit: Option<OwnedSemaphorePermit>,
}
impl std::fmt::Debug for PooledSession {
    /// Prints only the type name; no field is exposed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PooledSession");
        out.finish()
    }
}
impl Deref for PooledSession {
    type Target = RaSvnSession;

    /// Borrows the checked-out session.
    ///
    /// # Panics
    ///
    /// Panics if the session slot is empty.
    // The panic guards an invariant: the slot is filled at checkout and only
    // emptied in `Drop`.
    #[allow(clippy::panic)]
    fn deref(&self) -> &Self::Target {
        let Some(session) = self.session.as_ref() else {
            panic!("pooled session missing inner value");
        };
        session
    }
}
impl DerefMut for PooledSession {
    /// Mutably borrows the checked-out session.
    ///
    /// # Panics
    ///
    /// Panics if the session slot is empty.
    // The panic guards an invariant: the slot is filled at checkout and only
    // emptied in `Drop`.
    #[allow(clippy::panic)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Some(session) = self.session.as_mut() else {
            panic!("pooled session missing inner value");
        };
        session
    }
}
impl Drop for PooledSession {
    /// Returns the session to the idle list, then releases the permit.
    fn drop(&mut self) {
        // The session goes back first, so it is already in the idle list by
        // the time the permit becomes available to another checkout.
        let returned = self.session.take();
        if let Some(session) = returned {
            self.inner.push_idle_session(session);
        }
        drop(self.permit.take());
    }
}