use parking_lot::{Condvar, Mutex};
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::time::Duration;
pub struct ReadyGuard {
tx: Option<std::sync::mpsc::Sender<()>>,
}
impl ReadyGuard {
pub fn new(tx: std::sync::mpsc::Sender<()>) -> Self {
Self { tx: Some(tx) }
}
pub fn send(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(());
}
}
}
impl Drop for ReadyGuard {
fn drop(&mut self) {
self.send();
}
}
pub struct PlaybackSync {
pub cancel_handle: CancelHandle,
pub ready_tx: ReadyGuard,
pub clock: crate::clock::PlaybackClock,
pub start_time: Duration,
pub loop_control: LoopControl,
}
#[derive(Clone)]
pub struct LoopControl {
pub loop_break: Arc<AtomicBool>,
pub active_section: Arc<parking_lot::RwLock<Option<crate::player::SectionBounds>>>,
pub section_loop_break: Arc<AtomicBool>,
pub loop_time_consumed: Arc<parking_lot::Mutex<Duration>>,
}
impl LoopControl {
pub fn new() -> Self {
Self {
loop_break: Arc::new(AtomicBool::new(false)),
active_section: Arc::new(parking_lot::RwLock::new(None)),
section_loop_break: Arc::new(AtomicBool::new(false)),
loop_time_consumed: Arc::new(parking_lot::Mutex::new(Duration::ZERO)),
}
}
}
impl Default for LoopControl {
fn default() -> Self {
Self::new()
}
}
#[derive(PartialEq)]
enum CancelState {
Untouched,
Cancelled,
}
#[derive(Clone)]
pub struct CancelHandle {
cancelled: Arc<Mutex<CancelState>>,
condvar: Arc<Condvar>,
}
impl CancelHandle {
pub fn new() -> CancelHandle {
CancelHandle {
cancelled: Arc::new(Mutex::new(CancelState::Untouched)),
condvar: Arc::new(Condvar::new()),
}
}
}
impl Default for CancelHandle {
fn default() -> Self {
Self::new()
}
}
impl CancelHandle {
pub fn is_cancelled(&self) -> bool {
*self.cancelled.lock() == CancelState::Cancelled
}
pub fn wait(&self, finished: Arc<AtomicBool>) {
let mut guard = self.cancelled.lock();
self.condvar.wait_while(&mut guard, |cancelled| {
*cancelled == CancelState::Untouched && !finished.load(Ordering::Relaxed)
});
}
pub fn wait_with_timeout(
&self,
finished: Arc<AtomicBool>,
timeout: std::time::Duration,
) -> bool {
let mut guard = self.cancelled.lock();
let result = self.condvar.wait_while_for(
&mut guard,
|cancelled| *cancelled == CancelState::Untouched && !finished.load(Ordering::Relaxed),
timeout,
);
!result.timed_out()
}
pub fn notify(&self) {
let _guard = self.cancelled.lock();
self.condvar.notify_all();
}
pub fn cancel(&self) {
let mut cancel_state = self.cancelled.lock();
if *cancel_state == CancelState::Untouched {
*cancel_state = CancelState::Cancelled;
self.condvar.notify_all();
}
}
}
#[cfg(test)]
mod test {
use std::thread;
use super::*;
#[test]
fn test_cancel_handle_cancelled() {
let cancel_handle = CancelHandle::new();
assert!(!cancel_handle.is_cancelled());
let join = {
let cancel_handle = cancel_handle.clone();
thread::spawn(move || cancel_handle.wait(Arc::new(AtomicBool::new(false))))
};
cancel_handle.cancel();
assert!(join.join().is_ok());
assert!(cancel_handle.is_cancelled());
}
#[test]
fn test_cancel_handle_finished() {
let cancel_handle = CancelHandle::new();
assert!(!cancel_handle.is_cancelled());
let join = {
let cancel_handle = cancel_handle.clone();
thread::spawn(move || cancel_handle.wait(Arc::new(AtomicBool::new(true))))
};
assert!(join.join().is_ok());
assert!(!cancel_handle.is_cancelled());
}
#[test]
fn test_wait_with_timeout_returns_true_when_finished() {
let cancel_handle = CancelHandle::new();
let finished = Arc::new(AtomicBool::new(true));
assert!(cancel_handle.wait_with_timeout(finished, std::time::Duration::from_secs(1)));
}
#[test]
fn test_wait_with_timeout_returns_false_on_timeout() {
let cancel_handle = CancelHandle::new();
let finished = Arc::new(AtomicBool::new(false));
assert!(!cancel_handle.wait_with_timeout(finished, std::time::Duration::from_millis(50)));
}
#[test]
fn test_wait_with_timeout_returns_true_when_cancelled() {
let cancel_handle = CancelHandle::new();
let finished = Arc::new(AtomicBool::new(false));
let join = {
let cancel_handle = cancel_handle.clone();
thread::spawn(move || {
cancel_handle.wait_with_timeout(finished, std::time::Duration::from_secs(10))
})
};
cancel_handle.cancel();
assert!(join.join().unwrap());
}
#[test]
fn test_cancel_idempotent() {
let cancel_handle = CancelHandle::new();
assert!(!cancel_handle.is_cancelled());
cancel_handle.cancel();
assert!(cancel_handle.is_cancelled());
cancel_handle.cancel();
assert!(cancel_handle.is_cancelled());
}
#[test]
fn test_default_impl() {
let cancel_handle = CancelHandle::default();
assert!(!cancel_handle.is_cancelled());
}
#[test]
fn test_clone_shares_state() {
let handle1 = CancelHandle::new();
let handle2 = handle1.clone();
assert!(!handle2.is_cancelled());
handle1.cancel();
assert!(handle2.is_cancelled());
}
#[test]
fn test_notify_wakes_waiter_when_finished() {
let cancel_handle = CancelHandle::new();
let finished = Arc::new(AtomicBool::new(false));
let join = {
let cancel_handle = cancel_handle.clone();
let finished = finished.clone();
thread::spawn(move || cancel_handle.wait(finished))
};
finished.store(true, Ordering::Relaxed);
cancel_handle.notify();
assert!(join.join().is_ok());
assert!(!cancel_handle.is_cancelled());
}
#[test]
fn test_wait_returns_immediately_when_already_cancelled() {
let cancel_handle = CancelHandle::new();
cancel_handle.cancel();
let finished = Arc::new(AtomicBool::new(false));
cancel_handle.wait(finished);
assert!(cancel_handle.is_cancelled());
}
#[test]
fn test_wait_with_timeout_returns_immediately_when_already_finished() {
let cancel_handle = CancelHandle::new();
let finished = Arc::new(AtomicBool::new(true));
let result = cancel_handle.wait_with_timeout(finished, std::time::Duration::from_millis(1));
assert!(result);
}
#[test]
fn test_ready_guard_explicit_send() {
let (tx, rx) = std::sync::mpsc::channel::<()>();
let mut guard = ReadyGuard::new(tx);
guard.send();
assert!(rx.try_recv().is_ok(), "explicit send should deliver");
guard.send();
assert!(
rx.try_recv().is_err(),
"second send should not deliver again"
);
}
#[test]
fn test_ready_guard_sends_on_drop() {
let (tx, rx) = std::sync::mpsc::channel::<()>();
{
let _guard = ReadyGuard::new(tx);
}
assert!(
rx.try_recv().is_ok(),
"drop should send the ready signal automatically"
);
}
#[test]
fn test_ready_guard_no_double_send_on_drop() {
let (tx, rx) = std::sync::mpsc::channel::<()>();
{
let mut guard = ReadyGuard::new(tx);
guard.send();
}
assert!(rx.try_recv().is_ok(), "explicit send should arrive");
assert!(
rx.try_recv().is_err(),
"drop after explicit send should not send again"
);
}
}