use std::{
error::Error,
fmt,
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, Barrier, Mutex,
},
thread,
time::Duration,
};
use midly::live::LiveEvent;
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tracing::{info, span, Level};
use crate::songs::Song;
/// A mock MIDI device.
///
/// Every field except `name` sits behind an `Arc`, so clones of a `Device`
/// observe and mutate the same underlying state.
#[derive(Clone)]
pub struct Device {
    /// Name supplied at construction; used in log fields and in `Display`.
    name: String,

    // --- incoming (mocked) events ---
    /// Bytes of the most recently mocked incoming event.
    event: Arc<Mutex<Vec<u8>>>,
    /// Two-party rendezvous between `mock_event` and the watcher task.
    barrier: Arc<Barrier>,
    /// Set when watching has been stopped; the watcher task exits once it
    /// observes `true`.
    closed: Arc<AtomicBool>,
    /// Handle of the watcher task; `Some` once `watch_events` has spawned it.
    event_thread: Arc<Mutex<Option<JoinHandle<()>>>>,

    // --- outgoing (recorded) events ---
    /// Serialized bytes of the last event passed to `emit`, if any.
    emit_called: Arc<Mutex<Option<Vec<u8>>>>,
    /// Bytes of the last payload passed to `emit_sysex`, if any.
    sysex_called: Arc<Mutex<Option<Vec<u8>>>>,
}
impl Device {
pub fn get(name: &str) -> Device {
Device {
name: name.to_string(),
closed: Arc::new(AtomicBool::new(false)),
barrier: Arc::new(Barrier::new(2)),
event: Arc::new(Mutex::new(Vec::new())),
emit_called: Arc::new(Mutex::new(None)),
sysex_called: Arc::new(Mutex::new(None)),
event_thread: Arc::new(Mutex::new(None)),
}
}
#[cfg(test)]
pub fn mock_event(&self, event: &[u8]) {
{
let mut mutex_event = self.event.lock().expect("unable to get event lock");
*mutex_event = event.to_vec();
}
self.barrier.wait();
self.barrier.wait();
}
#[cfg(test)]
pub fn get_emitted_event(&self) -> Option<Vec<u8>> {
let emit_called = self
.emit_called
.lock()
.expect("unable to get emit called lock");
emit_called.as_ref().map(|event| event.to_vec())
}
#[cfg(test)]
pub fn reset_emitted_event(&self) {
let mut emit_called = self
.emit_called
.lock()
.expect("unable to get emit called lock");
*emit_called = None;
}
#[cfg(test)]
pub fn get_emitted_sysex(&self) -> Option<Vec<u8>> {
let sysex = self.sysex_called.lock().expect("unable to get sysex lock");
sysex.clone()
}
#[cfg(test)]
pub fn reset_emitted_sysex(&self) {
let mut sysex = self.sysex_called.lock().expect("unable to get sysex lock");
*sysex = None;
}
}
impl super::Device for Device {
    /// Spawns the watcher task that forwards mocked events to `sender`.
    ///
    /// The task loops: wait at the barrier, send a copy of the currently
    /// stored event bytes over `sender`, wait at the barrier again. It exits
    /// when, after the first wait of an iteration, it sees `closed` set.
    ///
    /// # Errors
    /// Returns an error if a watcher handle is already stored.
    ///
    /// # Panics
    /// Panics if the handle lock is poisoned. The spawned task panics if the
    /// event lock is poisoned or if sending on `sender` fails.
    fn watch_events(&self, sender: Sender<Vec<u8>>) -> Result<(), Box<dyn Error>> {
        let mut event_thread = self.event_thread.lock().expect("unable to get lock");
        if event_thread.is_some() {
            return Err("Already watching events.".into());
        }

        let barrier = self.barrier.clone();
        let event = self.event.clone();
        let closed = self.closed.clone();
        *event_thread = Some(tokio::task::spawn_blocking(move || loop {
            // Released either by `mock_event` (event ready) or by
            // `stop_watch_events` (shutdown requested).
            barrier.wait();
            {
                if closed.load(Ordering::Relaxed) {
                    return;
                }
                let event = event.lock().expect("unable to get event lock");
                sender
                    .blocking_send(event.to_vec())
                    .expect("error sending event");
            }
            // Second rendezvous: lets `mock_event` return only after the
            // event has been handed to the channel.
            barrier.wait();
        }));
        Ok(())
    }

    /// Asks the watcher task to exit.
    ///
    /// Does nothing when no watcher was ever started: the barrier needs two
    /// parties, so waiting on it without a watcher would block the caller
    /// forever. Otherwise the first call sets `closed` and meets the watcher
    /// at the barrier so it can observe the flag; later calls are no-ops.
    ///
    /// # Panics
    /// Panics if the handle lock is poisoned.
    fn stop_watch_events(&self) {
        // The guard is a temporary of this statement, so the lock is not held
        // while waiting on the barrier below.
        let watching = self
            .event_thread
            .lock()
            .expect("unable to get lock")
            .is_some();
        if !watching {
            return;
        }

        // `swap` returns the previous value, so only the first caller waits.
        if !self.closed.swap(true, Ordering::Relaxed) {
            self.barrier.wait();
        }
    }

    /// Simulates playback of `song` without producing any MIDI output.
    ///
    /// A worker thread signals readiness through `ready_tx`, spins until
    /// `clock` reports a non-zero elapsed time, then sleeps for the song
    /// duration minus `start_time` (saturating at zero) unless it is woken
    /// earlier. The calling thread blocks in `cancel_handle.wait` and then
    /// joins the worker.
    ///
    /// # Errors
    /// Returns an error if the worker thread panicked.
    fn play_from(
        &self,
        song: Arc<Song>,
        sync: crate::playsync::PlaybackSync,
    ) -> Result<(), Box<dyn Error>> {
        let crate::playsync::PlaybackSync {
            cancel_handle,
            ready_tx,
            clock,
            start_time,
            ..
        } = sync;

        let span = span!(Level::INFO, "play song (mock)");
        let _enter = span.enter();

        let remaining_duration = song.duration().saturating_sub(start_time);

        info!(
            device = self.name,
            song = song.name(),
            duration = song.duration_string(),
            start_time = ?start_time,
            "Playing song."
        );

        // Used only to cut the worker's sleep short once waiting is over.
        let (sleep_tx, sleep_rx) = mpsc::channel::<()>();

        let finished = Arc::new(AtomicBool::new(false));
        let join_handle = {
            let cancel_handle = cancel_handle.clone();
            let finished = finished.clone();
            let clock = clock.clone();
            thread::spawn(move || {
                let mut ready_tx = ready_tx;
                ready_tx.send();

                // Spin until the clock starts moving, giving up as soon as a
                // cancellation is observed.
                let mut cancelled = false;
                while clock.elapsed() == Duration::ZERO {
                    if cancel_handle.is_cancelled() {
                        cancelled = true;
                        break;
                    }
                    std::hint::spin_loop();
                }

                // Sleep for the rest of the song unless already cancelled; a
                // message (or disconnect) on the channel ends the sleep early.
                if !cancelled && !cancel_handle.is_cancelled() {
                    let _ = sleep_rx.recv_timeout(remaining_duration);
                }

                // Every exit path marks playback finished and notifies.
                finished.store(true, Ordering::Relaxed);
                cancel_handle.notify();
            })
        };

        cancel_handle.wait(finished);

        // Wake the worker if it is still sleeping. The worker drops its
        // receiver as soon as it returns, which can happen before this line
        // runs (song ended on its own, or cancellation before the clock
        // started). A failed send then only means nobody is left to wake, so
        // it must not be reported as a playback error.
        let _ = sleep_tx.send(());

        if join_handle.join().is_err() {
            return Err("Error while joining thread!".into());
        }

        Ok(())
    }

    /// Records the serialized bytes of `midi_event` as the last emitted event.
    ///
    /// `None` leaves the recorded value untouched.
    ///
    /// # Errors
    /// Returns an error if serializing the event fails; the recorded value is
    /// left untouched in that case.
    fn emit(&self, midi_event: Option<LiveEvent<'static>>) -> Result<(), Box<dyn Error>> {
        if let Some(midi_event) = midi_event {
            let mut emit_called = self
                .emit_called
                .lock()
                .expect("unable to get emit called lock");

            let mut buf: Vec<u8> = Vec::with_capacity(8);
            midi_event.write(&mut buf)?;
            *emit_called = Some(buf);
        }

        Ok(())
    }

    /// Records a copy of `bytes` as the last emitted SysEx payload.
    fn emit_sysex(&self, bytes: &[u8]) -> Result<(), Box<dyn Error>> {
        let mut sysex = self.sysex_called.lock().expect("unable to get sysex lock");
        *sysex = Some(bytes.to_vec());
        Ok(())
    }

    /// Returns a clone of this device wrapped in an `Arc`; the clone shares
    /// state with `self`.
    #[cfg(test)]
    fn to_mock(&self) -> Result<Arc<Device>, Box<dyn Error>> {
        Ok(Arc::new(self.clone()))
    }
}
impl fmt::Display for Device {
    /// Writes the device name followed by the literal suffix ` (Mock)`.
    ///
    /// Width, fill and alignment requested by the caller are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.name)?;
        f.write_str(" (Mock)")
    }
}