use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use atomic_wait::*;
struct SemaphoreGuard<'a> {
count: &'a AtomicU32,
}
fn dec(count: &AtomicU32) -> SemaphoreGuard {
let mut prev = 1;
loop {
match count.compare_exchange_weak(prev, prev - 1, Ordering::Acquire, Ordering::Relaxed) {
Ok(_) => break,
Err(actual) => {
if actual == 0 {
wait(count, 0);
prev = 1;
} else {
prev = actual;
}
}
}
}
SemaphoreGuard { count }
}
impl Drop for SemaphoreGuard<'_> {
fn drop(&mut self) {
if self.count.fetch_add(1, Ordering::Release) == 0 {
wake_one(self.count);
}
}
}
pub struct Semaphored<B> {
inner: B,
count: AtomicU32,
}
impl<B: Backend> Semaphored<B> {
pub fn new(inner: B, concurrency: u32) -> Self {
Self {
inner,
count: AtomicU32::new(concurrency),
}
}
}
impl<B: Backend> Backend for Semaphored<B> {
fn read(&self, from: &str) -> Result<Box<dyn Read + Send + 'static>> {
let _sem = dec(&self.count);
self.inner.read(from)
}
fn write(&self, len: u64, from: &mut (dyn Read + Send), to: &str) -> Result<()> {
let _sem = dec(&self.count);
self.inner.write(len, from, to)
}
fn remove(&self, which: &str) -> Result<()> {
let _sem = dec(&self.count);
self.inner.remove(which)
}
fn list(&self, prefix: &str) -> Result<Vec<(String, u64)>> {
let _sem = dec(&self.count);
self.inner.list(prefix)
}
}