use crate::core::detector;
use crate::core::locks::{NEXT_LOCK_ID, mutex::MutexGuard};
use crate::core::types::{CondvarId, get_current_thread_id};
use parking_lot::Condvar as ParkingLotCondvar;
use std::ops::DerefMut;
use std::sync::atomic::Ordering;
use std::time::Duration;
/// Condition variable that reports its activity to the crate's detector.
///
/// Wraps a `parking_lot` condition variable and pairs it with an identifier that
/// the methods of this type pass to the detector and logger calls they make.
pub struct Condvar {
/// Identifier handed to the detector/logger for every event on this condvar.
id: CondvarId,
/// The underlying `parking_lot` condition variable that does the blocking and waking.
inner: ParkingLotCondvar,
}
impl Condvar {
    /// Creates a new condition variable and registers it with the detector.
    ///
    /// The identifier is taken from the shared `NEXT_LOCK_ID` counter.
    pub fn new() -> Self {
        let id = NEXT_LOCK_ID.fetch_add(1, Ordering::SeqCst);
        detector::condvar::create_condvar(id);
        Condvar {
            id,
            inner: ParkingLotCondvar::new(),
        }
    }

    /// Returns the identifier assigned to this condition variable at construction.
    pub fn id(&self) -> CondvarId {
        self.id
    }

    /// Runs one blocking wait with the detector/logger bookkeeping around it.
    ///
    /// `park` performs the actual blocking call on the underlying condvar and its
    /// return value is passed through to the caller. Keeping the bookkeeping in a
    /// single place guarantees that the plain and the timed wait report the exact
    /// same sequence of events.
    ///
    /// The call order below is deliberate and must not be rearranged: the wait is
    /// announced and the mutex is reported as released *before* blocking, and the
    /// re-acquisition is reported *after* the blocking call returns.
    fn wait_with_tracking<'a, T, R>(
        &self,
        guard: &mut MutexGuard<'a, T>,
        park: impl FnOnce(&mut MutexGuard<'a, T>) -> R,
    ) -> R {
        let thread_id = get_current_thread_id();
        let mutex_id = guard.lock_id();

        // Before blocking: announce the wait and report the mutex as released.
        detector::condvar::begin_wait(thread_id, self.id, mutex_id);
        guard.clear_ownership();
        detector::mutex::release_mutex(thread_id, mutex_id);

        let outcome = park(&mut *guard);

        // After waking: report the mutex as held again, then close the wait.
        guard.restore_ownership();
        detector::mutex::complete_acquire(thread_id, mutex_id);
        detector::condvar::end_wait(thread_id, self.id, mutex_id);
        crate::core::logger::log_interaction_event(
            thread_id,
            self.id,
            crate::core::Events::CondvarWaitEnd,
        );

        outcome
    }

    /// Blocks the current thread on this condition variable.
    ///
    /// The wait itself is delegated to the underlying `parking_lot` condvar using
    /// the guard's inner guard; the detector is told about the wait, the release of
    /// the associated mutex, and its re-acquisition once the wait returns.
    pub fn wait<'a, T>(&self, guard: &mut MutexGuard<'a, T>) {
        self.wait_with_tracking(guard, |g| self.inner.wait(g.inner_guard()));
    }

    /// Like [`Condvar::wait`], but gives up after `timeout`.
    ///
    /// Returns `true` if the underlying wait reported a timeout, `false` otherwise.
    pub fn wait_timeout<'a, T>(&self, guard: &mut MutexGuard<'a, T>, timeout: Duration) -> bool {
        self.wait_with_tracking(guard, |g| {
            self.inner.wait_for(g.inner_guard(), timeout).timed_out()
        })
    }

    /// Waits for as long as `condition` keeps returning `true`.
    ///
    /// `condition` is evaluated on the guarded value before every wait, so the
    /// method returns immediately (without waiting) if it is already `false`.
    pub fn wait_while<'a, T, F>(&self, guard: &mut MutexGuard<'a, T>, mut condition: F)
    where
        F: FnMut(&mut T) -> bool,
    {
        while condition(guard.deref_mut()) {
            self.wait(guard);
        }
    }

    /// Waits while `condition` returns `true`, for at most `timeout` in total.
    ///
    /// Returns `false` as soon as `condition` returns `false`. Returns `true` when
    /// the time budget is used up while `condition` was still `true`, or when an
    /// individual wait reports a timeout; in the latter case `condition` is not
    /// evaluated again before returning.
    pub fn wait_timeout_while<'a, T, F>(
        &self,
        guard: &mut MutexGuard<'a, T>,
        timeout: Duration,
        mut condition: F,
    ) -> bool
    where
        F: FnMut(&mut T) -> bool,
    {
        let start = std::time::Instant::now();
        while condition(guard.deref_mut()) {
            // `None` (budget exceeded) and a zero remainder (budget exactly used up)
            // both mean there is no time left to wait.
            let remaining = match timeout.checked_sub(start.elapsed()) {
                Some(left) if !left.is_zero() => left,
                _ => return true,
            };
            if self.wait_timeout(guard, remaining) {
                return true;
            }
        }
        false
    }

    /// Reports the notification to the detector, then calls `notify_one` on the
    /// underlying condvar.
    pub fn notify_one(&self) {
        let thread_id = get_current_thread_id();
        detector::condvar::notify_one(self.id, thread_id);
        self.inner.notify_one();
    }

    /// Reports the notification to the detector, then calls `notify_all` on the
    /// underlying condvar.
    pub fn notify_all(&self) {
        let thread_id = get_current_thread_id();
        detector::condvar::notify_all(self.id, thread_id);
        self.inner.notify_all();
    }
}
impl Default for Condvar {
fn default() -> Self {
Self::new()
}
}
impl Drop for Condvar {
    /// Reports this condvar's destruction to the detector, identified by its id.
    fn drop(&mut self) {
        let condvar_id = self.id;
        detector::condvar::destroy_condvar(condvar_id);
    }
}