use event_listener::Event;
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
/// Optional instrumentation that measures how long the semaphore spends in the
/// state bracketed by `mark_readers_start` / `mark_readers_end`, and
/// periodically reports it through `log::debug!`.
///
/// Compiled only when the `semaphore-trace` feature is enabled.
#[cfg(feature = "semaphore-trace")]
mod trace {
    use super::*;
    use log::debug;
    use once_cell::sync::Lazy;
    use std::sync::atomic::AtomicU64;
    use std::time::Instant;

    /// Reference point for all trace timestamps, captured on first use.
    ///
    /// A monotonic `Instant` is used rather than the wall clock so that clock
    /// adjustments can neither drop nor inflate measured intervals.
    static SYS_START: Lazy<Instant> = Lazy::new(Instant::now);

    /// Microseconds elapsed since `SYS_START` was first touched.
    #[inline]
    pub(super) fn sys_now() -> u64 {
        // Dereferencing `SYS_START` before sampling the clock (which is what
        // `elapsed` does) guarantees the reference point is never in the future.
        SYS_START.elapsed().as_micros() as u64
    }

    /// Accumulated timing state; every field is a microsecond value relative to
    /// `SYS_START` and is accessed with relaxed atomics (statistics only).
    #[derive(Debug, Default)]
    pub struct TraceInner {
        /// Timestamp written by the most recent `mark_readers_start`.
        readers_start: AtomicU64,
        /// Running total of all measured start..end intervals.
        readers_time: AtomicU64,
        /// Timestamp of the last emitted log line.
        log_time: AtomicU64,
        /// Value of `readers_time` at the last emitted log line.
        log_value: AtomicU64,
    }

    impl TraceInner {
        /// Records the current time as the beginning of a measured interval.
        pub(super) fn mark_readers_start(&self) {
            self.readers_start.store(sys_now(), Ordering::Relaxed);
        }

        /// Closes the interval opened by `mark_readers_start`, adds it to the
        /// running total and, if more than ten seconds passed since the previous
        /// report, logs the time spent and its fraction of the report window.
        pub(super) fn mark_readers_end(&self) {
            let start = self.readers_start.load(Ordering::Relaxed);
            let now = sys_now();
            if start < now {
                let elapsed = now - start;
                let readers_time = self.readers_time.fetch_add(elapsed, Ordering::Relaxed) + elapsed;
                let log_time = self.log_time.load(Ordering::Relaxed);
                if log_time + (Duration::from_secs(10).as_micros() as u64) < now {
                    let log_value = self.log_value.load(Ordering::Relaxed);
                    // Two threads may get here concurrently; the other one can
                    // already have stored a newer (larger) `log_value` than the
                    // total this thread computed. Saturate instead of underflowing.
                    let readers_delta = readers_time.saturating_sub(log_value);
                    // Strictly positive: guaranteed by the interval check above.
                    let window = now - log_time;
                    debug!(
                        "Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.4}",
                        Duration::from_micros(window),
                        Duration::from_micros(readers_delta),
                        readers_delta as f64 / window as f64
                    );
                    self.log_value.store(readers_time, Ordering::Relaxed);
                    self.log_time.store(now, Ordering::Relaxed);
                }
            }
        }
    }
}
#[cfg(feature = "semaphore-trace")]
use trace::*;
#[cfg(feature = "semaphore-trace")]
/// Returns the path of the module this function is defined in, as produced by
/// `module_path!()` at this definition site.
pub(crate) fn get_module_path() -> &'static str {
    const DEFINING_MODULE: &str = module_path!();
    DEFINING_MODULE
}
/// Counting semaphore made of an atomic permit counter and an `Event` used to
/// signal acquirers that are waiting for permits.
#[derive(Debug)]
pub(crate) struct Semaphore {
/// Number of permits currently available; initialised by `new`, decreased by
/// successful acquisitions and increased by `release`.
counter: AtomicUsize,
/// Event that `release` notifies and that the waiting acquire paths listen on.
signal: Event,
/// Timing statistics, present only when the `semaphore-trace` feature is on.
#[cfg(feature = "semaphore-trace")]
trace_inner: TraceInner,
}
impl Semaphore {
pub const MAX_PERMITS: usize = usize::MAX;
pub fn new(available_permits: usize) -> Semaphore {
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
trace_inner: Default::default(),
}
} else {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
}
}
}
}
pub fn try_acquire(&self, permits: usize) -> Option<usize> {
let mut count = self.counter.load(Ordering::Acquire);
loop {
if count < permits {
return None;
}
match self.counter.compare_exchange_weak(count, count - permits, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => {
#[cfg(feature = "semaphore-trace")]
if permits == 1 && count == Self::MAX_PERMITS {
self.trace_inner.mark_readers_start();
}
return Some(count);
}
Err(c) => count = c,
}
}
}
pub async fn acquire(&self, permits: usize) -> usize {
let mut listener = None;
loop {
if let Some(slot) = self.try_acquire(permits) {
return slot;
}
match listener.take() {
None => listener = Some(self.signal.listen()),
Some(l) => l.await,
}
}
}
pub fn blocking_acquire(&self, permits: usize) -> usize {
let mut listener = None;
loop {
if let Some(slot) = self.try_acquire(permits) {
return slot;
}
match listener.take() {
None => listener = Some(self.signal.listen()),
Some(l) => l.wait(),
}
}
}
pub fn release(&self, permits: usize) -> usize {
let slot = self.counter.fetch_add(permits, Ordering::AcqRel) + permits;
#[cfg(feature = "semaphore-trace")]
if permits == 1 && slot == Self::MAX_PERMITS {
self.trace_inner.mark_readers_end();
}
self.signal.notify(permits);
slot
}
pub fn blocking_yield(&self, permits: usize) -> usize {
self.release(permits);
self.signal.listen().wait_timeout(Duration::from_micros(30));
self.blocking_acquire(permits)
}
}