use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Condvar, Mutex,
},
time::{Duration, Instant},
};
use futures::executor::block_on;
use futures::FutureExt as _;
use pulseaudio::{protocol, AsPlaybackSource};
use crate::{
host::{emit_error, latch::Latch, ErrorCallbackArc},
traits::StreamTrait,
Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp,
OutputCallbackInfo, OutputStreamTimestamp, SampleFormat, StreamInstant,
};
/// Longest time a latency worker waits between two timing polls when nobody wakes it
/// earlier. The data callbacks also use it as the cap on the "time since the last poll"
/// correction they apply to the stored latency.
const LATENCY_MAX_INTERVAL: Duration = Duration::from_millis(100);
/// Control state shared between a stream and its latency-polling worker thread.
struct LatencyHandle {
/// Set to `true` by `cancel`; the latency worker checks it at the top of every loop
/// iteration and exits once it is set.
cancel: Arc<AtomicBool>,
/// "Update requested" flag paired with the condition variable that wakes the latency
/// worker before its timeout expires.
update: Arc<(Mutex<bool>, Condvar)>,
}
impl LatencyHandle {
    /// Builds a handle whose cancel flag is clear and which has no update pending.
    fn new() -> Self {
        let cancel = Arc::new(AtomicBool::new(false));
        let update = Arc::new((Mutex::new(false), Condvar::new()));
        Self { cancel, update }
    }

    /// Flags an update as pending and wakes one thread waiting on the condition variable.
    ///
    /// A poisoned mutex is recovered rather than propagated: the protected value is a
    /// plain `bool`, so there is no invariant a panicking holder could have broken.
    fn notify(&self) {
        let (flag, wakeup) = &*self.update;
        {
            let mut pending = match flag.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            *pending = true;
            // Guard is released here, before the waiter is woken.
        }
        wakeup.notify_one();
    }

    /// Requests shutdown: raises the cancel flag, then wakes the waiter so it can see it.
    fn cancel(&self) {
        self.cancel.store(true, Ordering::Relaxed);
        self.notify();
    }
}
/// Direction-specific state of a [`Stream`].
///
/// Both variants carry, in order: the PulseAudio stream object, the `Instant` taken when
/// the stream was constructed (the origin of every timestamp the stream reports), and the
/// handle used to wake or cancel the latency worker.
enum StreamInner {
Playback(pulseaudio::PlaybackStream, Instant, LatencyHandle),
Record(pulseaudio::RecordStream, Instant, LatencyHandle),
}
/// An audio stream backed by PulseAudio, either playback or record.
pub struct Stream {
/// The underlying stream together with its timestamp origin and latency handle.
inner: StreamInner,
/// Worker threads spawned for this stream; they are joined when the stream is dropped.
workers: Vec<std::thread::JoinHandle<()>>,
/// Gate the worker threads wait on before doing any work; released by `signal_ready`
/// and again on drop.
latch: Latch,
}
impl Drop for Stream {
    /// Cancels the workers, asks the server to delete the stream, and joins the workers.
    ///
    /// Both directions are torn down the same way. A record stream must be deleted just
    /// like a playback stream; otherwise it stays registered with the server and its data
    /// callback can keep firing after this `Stream` is gone.
    fn drop(&mut self) {
        match &self.inner {
            StreamInner::Playback(stream, _, handle) => {
                // Raise the cancel flag first so workers treat any failure caused by the
                // deletion below as part of shutdown.
                handle.cancel();
                // `now_or_never` polls the delete future exactly once, so `drop` never
                // blocks waiting for the server's reply; the result is intentionally
                // ignored (best effort).
                let _ = stream.clone().delete().now_or_never();
            }
            StreamInner::Record(stream, _, handle) => {
                handle.cancel();
                let _ = stream.clone().delete().now_or_never();
            }
        }

        // Workers start out blocked on the latch. Release it so that workers which were
        // never started can run, observe the cancel flag and exit; joining them while
        // they are still gated would hang.
        self.signal_ready();

        for worker in self.workers.drain(..) {
            // If the stream is being dropped on one of its own worker threads, skip that
            // thread: a thread cannot join itself.
            if worker.thread().id() != std::thread::current().id() {
                let _ = worker.join();
            }
        }
    }
}
impl StreamTrait for Stream {
fn play(&self) -> Result<(), Error> {
match &self.inner {
StreamInner::Playback(stream, _, handle) => {
block_on(stream.uncork()).map_err(Error::from)?;
handle.notify();
}
StreamInner::Record(stream, _, handle) => {
block_on(stream.uncork()).map_err(Error::from)?;
block_on(stream.started()).map_err(Error::from)?;
handle.notify();
}
}
Ok(())
}
fn pause(&self) -> Result<(), Error> {
let res = match &self.inner {
StreamInner::Playback(stream, _, _) => block_on(stream.cork()),
StreamInner::Record(stream, _, _) => block_on(stream.cork()),
};
res.map_err(Error::from)?;
match &self.inner {
StreamInner::Playback(_, _, handle) | StreamInner::Record(_, _, handle) => {
handle.notify()
}
}
Ok(())
}
fn now(&self) -> StreamInstant {
let start = match &self.inner {
StreamInner::Playback(_, start, _) | StreamInner::Record(_, start, _) => *start,
};
let elapsed = start.elapsed();
StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos())
}
fn buffer_size(&self) -> Result<FrameCount, Error> {
let (spec, bytes) = match &self.inner {
StreamInner::Playback(s, _, _) => (
s.sample_spec(),
s.buffer_attr().minimum_request_length as usize,
),
StreamInner::Record(s, _, _) => {
(s.sample_spec(), s.buffer_attr().fragment_size as usize)
}
};
let frame_size = spec.channels as usize * spec.format.bytes_per_sample();
Ok((bytes / frame_size) as _)
}
}
impl Stream {
pub fn new_playback<D, E>(
client: pulseaudio::Client,
params: protocol::PlaybackStreamParams,
mut data_callback: D,
error_callback: E,
) -> Result<Self, Error>
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
E: FnMut(Error) + Send + 'static,
{
let start = Instant::now();
let current_latency_micros = Arc::new(AtomicU64::new(0));
let last_poll_micros = Arc::new(AtomicU64::new(0));
let latency_clone = current_latency_micros.clone();
let poll_clone = last_poll_micros.clone();
let sample_spec = params.sample_spec;
let pa_format = sample_spec.format;
let format: SampleFormat = pa_format.try_into().map_err(|_| {
Error::with_message(
ErrorKind::UnsupportedConfig,
"Sample format is not supported",
)
})?;
let silence_byte = if format == SampleFormat::U8 {
0x80u8
} else {
0u8
};
let handle = LatencyHandle::new();
let update_callback = handle.update.clone();
let callback = move |buf: &mut [u8]| {
let elapsed = Instant::now().saturating_duration_since(start);
let elapsed_usec = elapsed.as_micros() as u64;
let stored_latency = latency_clone.load(Ordering::Relaxed);
let poll_usec = poll_clone.load(Ordering::Relaxed);
let elapsed_since_poll = elapsed_usec
.saturating_sub(poll_usec)
.min(LATENCY_MAX_INTERVAL.as_micros() as u64);
let latency = stored_latency.saturating_sub(elapsed_since_poll);
let playback_time = elapsed + Duration::from_micros(latency);
let timestamp = OutputStreamTimestamp {
callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()),
playback: StreamInstant::new(playback_time.as_secs(), playback_time.subsec_nanos()),
};
buf.fill(silence_byte);
let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;
let mut data = unsafe { Data::from_parts(buf.as_mut_ptr().cast(), n_samples, format) };
data_callback(&mut data, &OutputCallbackInfo { timestamp });
let (lock, cvar) = &*update_callback;
*lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
cvar.notify_one();
buf.len()
};
let stream = block_on(client.create_playback_stream(params, callback.as_playback_source()))
.map_err(Error::from)?;
let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback));
let stream_clone = stream.clone();
let error_callback_clone = error_callback.clone();
let cancel_driver = handle.cancel.clone();
let mut latch = Latch::new();
let waiter_driver = latch.waiter();
let driver_handle = std::thread::spawn(move || {
waiter_driver.wait();
if let Err(e) = block_on(stream_clone.play_all()) {
if !cancel_driver.load(Ordering::Relaxed) {
emit_error(&error_callback_clone, Error::from(e));
}
}
});
let cancel_thread = handle.cancel.clone();
let update_thread = handle.update.clone();
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
let poll_clone = last_poll_micros.clone();
let waiter_latency = latch.waiter();
let latency_handle = std::thread::spawn(move || {
waiter_latency.wait();
loop {
if cancel_thread.load(Ordering::Relaxed) {
break;
}
let timing_info = match block_on(stream_clone.timing_info()) {
Ok(timing_info) => timing_info,
Err(e) => {
emit_error(&error_callback, Error::from(e));
break;
}
};
let poll_since_epoch =
Instant::now().saturating_duration_since(start).as_micros() as u64;
poll_clone.store(poll_since_epoch, Ordering::Relaxed);
store_latency(
&latency_clone,
sample_spec,
timing_info.sink_usec,
timing_info.write_offset,
timing_info.read_offset,
);
let (lock, cvar) = &*update_thread;
let Ok(guard) = lock.lock() else { break };
let (mut guard, _) = cvar
.wait_timeout_while(guard, LATENCY_MAX_INTERVAL, |notified| !*notified)
.unwrap_or_else(|e| e.into_inner());
*guard = false;
}
});
latch.add_thread(driver_handle.thread().clone());
latch.add_thread(latency_handle.thread().clone());
Ok(Self {
inner: StreamInner::Playback(stream, start, handle),
workers: vec![driver_handle, latency_handle],
latch,
})
}
pub fn new_record<D, E>(
client: pulseaudio::Client,
params: protocol::RecordStreamParams,
mut data_callback: D,
mut error_callback: E,
) -> Result<Self, Error>
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
E: FnMut(Error) + Send + 'static,
{
let start = Instant::now();
let current_latency_micros = Arc::new(AtomicU64::new(0));
let last_poll_micros = Arc::new(AtomicU64::new(0));
let latency_clone = current_latency_micros.clone();
let poll_clone = last_poll_micros.clone();
let sample_spec = params.sample_spec;
let pa_format = sample_spec.format;
let format: SampleFormat = pa_format.try_into().map_err(|_| {
Error::with_message(
ErrorKind::UnsupportedConfig,
"Sample format is not supported",
)
})?;
let handle = LatencyHandle::new();
let update_callback = handle.update.clone();
let callback = move |buf: &[u8]| {
let elapsed = Instant::now().saturating_duration_since(start);
let elapsed_usec = elapsed.as_micros() as u64;
let stored_latency = latency_clone.load(Ordering::Relaxed);
let poll_usec = poll_clone.load(Ordering::Relaxed);
let elapsed_since_poll = elapsed_usec
.saturating_sub(poll_usec)
.min(LATENCY_MAX_INTERVAL.as_micros() as u64);
let latency = stored_latency.saturating_add(elapsed_since_poll);
let capture_time = elapsed
.checked_sub(Duration::from_micros(latency))
.unwrap_or_default();
let timestamp = InputStreamTimestamp {
callback: StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos()),
capture: StreamInstant::new(capture_time.as_secs(), capture_time.subsec_nanos()),
};
let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;
let data = unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, format) };
data_callback(&data, &InputCallbackInfo { timestamp });
let (lock, cvar) = &*update_callback;
*lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
cvar.notify_one();
};
let stream =
block_on(client.create_record_stream(params, callback)).map_err(Error::from)?;
let cancel_thread = handle.cancel.clone();
let update_thread = handle.update.clone();
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
let poll_clone = last_poll_micros.clone();
let mut latch = Latch::new();
let waiter_latency = latch.waiter();
let latency_handle = std::thread::spawn(move || {
waiter_latency.wait();
loop {
if cancel_thread.load(Ordering::Relaxed) {
break;
}
let timing_info = match block_on(stream_clone.timing_info()) {
Ok(timing_info) => timing_info,
Err(e) => {
error_callback(Error::from(e));
break;
}
};
let poll_since_epoch =
Instant::now().saturating_duration_since(start).as_micros() as u64;
poll_clone.store(poll_since_epoch, Ordering::Relaxed);
store_latency(
&latency_clone,
sample_spec,
timing_info.source_usec,
timing_info.write_offset,
timing_info.read_offset,
);
let (lock, cvar) = &*update_thread;
let Ok(guard) = lock.lock() else { break };
let (mut guard, _) = cvar
.wait_timeout_while(guard, LATENCY_MAX_INTERVAL, |notified| !*notified)
.unwrap_or_else(|e| e.into_inner());
*guard = false;
}
});
latch.add_thread(latency_handle.thread().clone());
Ok(Self {
inner: StreamInner::Record(stream, start, handle),
workers: vec![latency_handle],
latch,
})
}
pub(crate) fn signal_ready(&self) {
self.latch.release();
}
}
fn store_latency(
latency_micros: &AtomicU64,
sample_spec: protocol::SampleSpec,
device_latency_usec: u64,
write_offset: i64,
read_offset: i64,
) {
let offset = (write_offset - read_offset).max(0) as u64;
let latency =
Duration::from_micros(device_latency_usec) + sample_spec.bytes_to_duration(offset as usize);
latency_micros.store(
latency.as_micros().try_into().unwrap_or(u64::MAX),
Ordering::Relaxed,
);
}