use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{collections::VecDeque, sync::Weak};
use parking_lot::Mutex;
use tokio::sync::Semaphore;
use crate::{B2Error, Client, UploadUrl};
/// Shared state behind every [`Pool`] handle and referenced weakly by every
/// [`PooledUploadUrl`].
struct PoolInner {
/// Bucket id forwarded (as `Option<&str>`) to `Client::get_upload_url` when a
/// new upload URL has to be requested.
bucket_id: Option<String>,
/// Client used to request a new upload URL when no idle one is cached.
client: Client,
/// Permits bounding how many upload URLs may be checked out at the same time.
sem: Semaphore,
/// Idle upload URLs waiting to be reused; taken from the front, returned to the back.
urls: Mutex<VecDeque<UploadUrl>>,
}
/// Handle to a shared pool of upload URLs.
///
/// Cloning only clones the inner `Arc`, so all clones share the same permits
/// and the same queue of idle URLs. Dereferences to the wrapped `Client`.
#[derive(Clone)]
pub struct Pool(Arc<PoolInner>);
/// An upload URL checked out of a [`Pool`].
///
/// Dereferences to the wrapped [`UploadUrl`]. On drop, the URL is pushed back
/// into the pool and one permit is released, provided the pool still exists.
pub struct PooledUploadUrl {
/// Weak back-reference, so an outstanding URL does not keep the pool alive.
pool: Weak<PoolInner>,
/// The checked-out URL; set to `Some` on construction and only taken in `Drop`.
url: Option<UploadUrl>,
}
impl Pool {
    /// Creates a pool that hands out at most `max_urls` upload URLs at a time.
    ///
    /// * `client` - client used to request new upload URLs on demand.
    /// * `bucket_id` - optional bucket id forwarded to `Client::get_upload_url`.
    /// * `max_urls` - initial number of permits. With `0`, every call to
    ///   [`Pool::get_pooled_upload_url`] waits until
    ///   [`Pool::increase_pool_size`] adds permits.
    ///
    /// The pool starts without any cached URLs; they are requested lazily.
    pub fn new(client: Client, bucket_id: Option<&str>, max_urls: u8) -> Self {
        Self(Arc::new(PoolInner {
            bucket_id: bucket_id.map(str::to_owned),
            client,
            // Lossless widening; `From` makes that explicit where `as` would not.
            sem: Semaphore::new(usize::from(max_urls)),
            urls: Mutex::new(VecDeque::new()),
        }))
    }

    /// Checks an upload URL out of the pool, waiting for a free permit first.
    ///
    /// An idle cached URL is reused when one is available; otherwise a new one
    /// is requested through the client. The permit backing the returned
    /// [`PooledUploadUrl`] is released again when that value is dropped.
    ///
    /// # Errors
    ///
    /// Returns [`B2Error::Unknown`] if acquiring a permit fails, and propagates
    /// any error from `Client::get_upload_url`.
    pub async fn get_pooled_upload_url(&self) -> Result<PooledUploadUrl, B2Error> {
        let inner = &self.0;

        // Keep the permit as an RAII guard while the URL is being obtained: if
        // the request below fails, or this future is dropped mid-await, the
        // permit goes back to the semaphore instead of being lost for good.
        let permit = inner.sem.acquire().await.map_err(|_| B2Error::Unknown)?;

        // Pop in a statement of its own so the mutex guard is released before
        // the `.await` below.
        let cached = inner.urls.lock().pop_front();
        let url = match cached {
            Some(url) => url,
            None => {
                inner
                    .client
                    .get_upload_url(inner.bucket_id.as_deref())
                    .await?
            }
        };

        // Ownership of the permit moves to the returned value, whose `Drop`
        // impl hands it back via `add_permits(1)`.
        permit.forget();

        Ok(PooledUploadUrl {
            pool: Arc::downgrade(inner),
            url: Some(url),
        })
    }

    /// Raises the number of URLs that may be checked out concurrently by `size`.
    ///
    /// NOTE(review): per the tokio documentation `Semaphore::add_permits`
    /// panics when the total would exceed `Semaphore::MAX_PERMITS`.
    pub fn increase_pool_size(&self, size: usize) {
        self.0.sem.add_permits(size);
    }
}
impl Deref for Pool {
type Target = Client;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0.client
}
}
/// Gives shared access to the checked-out [`UploadUrl`].
impl Deref for PooledUploadUrl {
    type Target = UploadUrl;

    #[inline]
    fn deref(&self) -> &UploadUrl {
        debug_assert!(self.url.is_some());
        let slot = self.url.as_ref();
        // SAFETY: within this file `url` is always constructed as `Some` and is
        // only taken in `Drop::drop`, after which no reference to `self` can
        // be obtained any more.
        unsafe { slot.unwrap_unchecked() }
    }
}
impl DerefMut for PooledUploadUrl {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
debug_assert!(self.url.is_some());
unsafe { self.url.as_mut().unwrap_unchecked() }
}
}
/// Returns the upload URL and its permit to the pool it was checked out from.
///
/// If the pool has already been dropped (the weak reference no longer
/// upgrades) there is nothing to return to, and the URL is simply dropped.
impl Drop for PooledUploadUrl {
    fn drop(&mut self) {
        if let Some(pool) = self.pool.upgrade() {
            // `url` is expected to always be `Some` here; taking it through a
            // safe `if let` avoids an unchecked unwrap inside a destructor.
            if let Some(url) = self.url.take() {
                pool.urls.lock().push_back(url);
            }
            // Give back the permit that was forgotten at checkout. This runs
            // after the URL is queued so a woken waiter can find it.
            pool.sem.add_permits(1);
        }
    }
}