use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{SampleFormat, StreamConfig as CpalStreamConfig};
use crossbeam_queue::ArrayQueue;
use crate::resample::{catmull_rom, resampled_len};
use super::OscilloscopeConfig;
use crate::backend::{DacBackend, FifoBackend, WriteOutcome};
use crate::buffer_estimate::{BufferEstimator, QueueDepthSource, RuntimeAuthorityEstimator};
use crate::device::{DacCapabilities, DacType};
use crate::error::{Error, Result};
use crate::point::LaserPoint;
/// State shared between the backend (producer side) and the audio output
/// callbacks (consumer side).
struct RuntimeState {
/// Bounded queue of `(f32, f32)` sample pairs. `try_write_points` pushes into
/// it; the `fill_*_output` callbacks pop from it.
queue: ArrayQueue<(f32, f32)>,
/// While `true` the output callbacks write silence (they still consume queued
/// samples). Initialised to `true` by `RuntimeState::new`.
muted: AtomicBool,
/// Set to `true` once the audio stream has started playing and reset to
/// `false` when the audio thread exits or the device becomes unavailable.
/// Initialised to `false` by `RuntimeState::new`.
connected: AtomicBool,
}
impl RuntimeState {
    /// Builds a runtime whose queue holds at most `capacity` sample pairs.
    ///
    /// The new state starts muted and not connected.
    fn new(capacity: usize) -> Self {
        let queue = ArrayQueue::new(capacity);
        let muted = AtomicBool::new(true);
        let connected = AtomicBool::new(false);
        Self {
            queue,
            muted,
            connected,
        }
    }

    /// Number of additional sample pairs the queue can accept right now.
    fn remaining_capacity(&self) -> usize {
        let total = self.queue.capacity();
        let used = self.queue.len();
        total.saturating_sub(used)
    }

    /// Returns `true` when `count` more sample pairs fit into the queue.
    ///
    /// A request for zero pairs always succeeds.
    fn has_capacity_for(&self, count: usize) -> bool {
        if count == 0 {
            return true;
        }
        count <= self.remaining_capacity()
    }

    /// Number of sample pairs currently waiting in the queue.
    fn queued_points(&self) -> u64 {
        let depth = self.queue.len();
        depth as u64
    }

    /// Discards every queued sample pair by popping until the queue is empty.
    fn clear_queue(&self) {
        loop {
            if self.queue.pop().is_none() {
                break;
            }
        }
    }
}
impl QueueDepthSource for RuntimeState {
    /// Reports how many sample pairs are currently waiting in the queue.
    fn queued_points(&self) -> u64 {
        self.queue.len() as u64
    }
}
/// Handle to the spawned audio thread together with the flag used to ask it
/// to exit.
struct AudioThread {
/// Join handle of the thread that runs `run_audio_thread`.
handle: JoinHandle<()>,
/// Storing `true` makes the audio thread leave its wait loop and return.
stop_flag: Arc<AtomicBool>,
}
/// DAC backend that plays laser points as stereo audio samples through a cpal
/// output device.
pub struct OscilloscopeBackend {
/// Name used to look up the cpal output device.
device_name: String,
/// Output sample rate in Hz requested from the audio device.
sample_rate: u32,
/// Gain, DC offset and clipping settings applied when converting points to
/// samples.
config: OscilloscopeConfig,
/// Capabilities returned by `DacBackend::caps`; computed from the sample rate
/// in `new`.
caps: DacCapabilities,
/// Shared queue and flags. `None` until `connect` succeeds and again after
/// `disconnect`.
runtime: Option<Arc<RuntimeState>>,
/// The running audio thread, if one has been started by `connect`.
audio_thread: Option<AudioThread>,
/// Scratch buffer reused by `try_write_points` to hold the converted samples
/// of the current batch.
sample_buffer: Vec<(f32, f32)>,
/// Buffer estimator returned by `FifoBackend::estimator`; `connect` points it
/// at the runtime queue and `disconnect` clears that source.
estimator: RuntimeAuthorityEstimator,
}
impl OscilloscopeBackend {
pub fn new(device_name: String, sample_rate: u32) -> Self {
Self {
device_name,
sample_rate,
config: OscilloscopeConfig::default(),
caps: super::capabilities(sample_rate),
runtime: None,
audio_thread: None,
sample_buffer: Vec::new(),
estimator: RuntimeAuthorityEstimator::new(),
}
}
pub fn set_config(&mut self, config: OscilloscopeConfig) {
self.config = config;
}
fn point_to_samples(p: &LaserPoint, config: &OscilloscopeConfig) -> (f32, f32) {
let mut l = p.x * config.gain + config.dc_offset;
let mut r = p.y * config.gain + config.dc_offset;
if config.clip {
l = l.clamp(-1.0, 1.0);
r = r.clamp(-1.0, 1.0);
}
(l, r)
}
fn buffer_capacity(&self) -> usize {
super::buffer_capacity(self.sample_rate)
}
fn start_audio_thread(&self, runtime: &Arc<RuntimeState>) -> Result<AudioThread> {
let device_name = self.device_name.clone();
let sample_rate = self.sample_rate;
let runtime = Arc::clone(runtime);
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_flag_clone = Arc::clone(&stop_flag);
let handle = thread::Builder::new()
.name(format!("oscilloscope-{}", device_name))
.spawn(move || {
if let Err(e) =
run_audio_thread(&device_name, sample_rate, &runtime, &stop_flag_clone)
{
log::error!("Oscilloscope audio thread error: {}", e);
}
runtime.connected.store(false, Ordering::Release);
})
.map_err(|e| {
Error::backend(std::io::Error::other(format!(
"Failed to spawn oscilloscope thread: {}",
e
)))
})?;
Ok(AudioThread { handle, stop_flag })
}
}
/// Fills an interleaved stereo `f32` output buffer from the runtime queue.
///
/// One queued `(left, right)` pair is consumed per output frame. Silence is
/// written when the queue is empty or when the runtime is muted; queued pairs
/// are still popped while muted, so the queue keeps draining.
///
/// The buffer is processed in whole two-sample frames. Should the buffer ever
/// have an odd length, the trailing sample is zeroed instead of indexing out
/// of bounds (which would panic inside the audio callback).
fn fill_f32_output(data: &mut [f32], runtime: &RuntimeState) {
    // Read the mute flag once per callback so a whole buffer is consistent.
    let muted = runtime.muted.load(Ordering::Relaxed);

    let mut frames = data.chunks_exact_mut(2);
    for frame in &mut frames {
        // The pop happens unconditionally; the guard only decides whether the
        // popped pair is actually played.
        let (l, r) = match runtime.queue.pop() {
            Some(pair) if !muted => pair,
            _ => (0.0, 0.0),
        };
        frame[0] = l;
        frame[1] = r;
    }

    // Incomplete trailing frame (odd-length buffer): output silence.
    for sample in frames.into_remainder() {
        *sample = 0.0;
    }
}
/// Fills an interleaved stereo `i16` output buffer from the runtime queue.
///
/// One queued `(left, right)` pair is consumed per output frame and scaled by
/// `i16::MAX`; the float-to-integer `as` cast saturates out-of-range values.
/// Silence is written when the queue is empty or when the runtime is muted;
/// queued pairs are still popped while muted, so the queue keeps draining.
///
/// The buffer is processed in whole two-sample frames. Should the buffer ever
/// have an odd length, the trailing sample is zeroed instead of indexing out
/// of bounds (which would panic inside the audio callback).
fn fill_i16_output(data: &mut [i16], runtime: &RuntimeState) {
    // Read the mute flag once per callback so a whole buffer is consistent.
    let muted = runtime.muted.load(Ordering::Relaxed);

    let mut frames = data.chunks_exact_mut(2);
    for frame in &mut frames {
        // The pop happens unconditionally; the guard only decides whether the
        // popped pair is actually played.
        let (l, r) = match runtime.queue.pop() {
            Some((l, r)) if !muted => (
                (l * i16::MAX as f32) as i16,
                (r * i16::MAX as f32) as i16,
            ),
            _ => (0, 0),
        };
        frame[0] = l;
        frame[1] = r;
    }

    // Incomplete trailing frame (odd-length buffer): output silence.
    for sample in frames.into_remainder() {
        *sample = 0;
    }
}
/// Body of the audio thread: opens the named output device, starts a stereo
/// stream fed from `runtime.queue`, and blocks until `stop_flag` is set.
///
/// `runtime.connected` is set to `true` once the stream is playing. The stream
/// error callback resets it to `false` when the device becomes unavailable.
///
/// Among the device's stereo configurations that cover `sample_rate`, a
/// sample format this backend can feed is preferred (`F32` first, then
/// `I16`), so a device that merely lists an unsupported format first is still
/// usable.
///
/// # Errors
/// * backend error — device enumeration, config query, stream build or stream
///   start failed;
/// * disconnected — no output device is named `device_name`;
/// * invalid config — no stereo configuration covers `sample_rate`, or the
///   device only offers sample formats other than `F32` / `I16`.
fn run_audio_thread(
    device_name: &str,
    sample_rate: u32,
    runtime: &Arc<RuntimeState>,
    stop_flag: &AtomicBool,
) -> Result<()> {
    let host = cpal::default_host();
    let device = host
        .output_devices()
        .map_err(|e| {
            Error::backend(std::io::Error::other(format!(
                "Failed to enumerate devices: {}",
                e
            )))
        })?
        .find(|d| d.name().map(|n| n == device_name).unwrap_or(false))
        .ok_or_else(|| Error::disconnected(format!("Audio device '{}' not found", device_name)))?;

    // Every stereo configuration whose rate range contains the requested rate.
    let candidates: Vec<_> = device
        .supported_output_configs()
        .map_err(|e| {
            Error::backend(std::io::Error::other(format!(
                "Failed to get audio configs: {}",
                e
            )))
        })?
        .filter(|c| {
            c.channels() == 2
                && c.min_sample_rate().0 <= sample_rate
                && c.max_sample_rate().0 >= sample_rate
        })
        .collect();

    // Prefer a format we have a fill routine for. Fall back to whatever the
    // first candidate reports so the "unsupported format" error below still
    // names the offending format.
    let sample_format = [SampleFormat::F32, SampleFormat::I16]
        .iter()
        .copied()
        .find(|wanted| candidates.iter().any(|c| c.sample_format() == *wanted))
        .or_else(|| candidates.first().map(|c| c.sample_format()))
        .ok_or_else(|| {
            Error::invalid_config(format!(
                "Audio device doesn't support stereo output at {} Hz",
                sample_rate
            ))
        })?;

    let config = CpalStreamConfig {
        channels: 2,
        sample_rate: cpal::SampleRate(sample_rate),
        buffer_size: cpal::BufferSize::Default,
    };

    let runtime_err = Arc::clone(runtime);
    let err_callback = move |err: cpal::StreamError| {
        log::error!("Oscilloscope stream error: {}", err);
        if matches!(err, cpal::StreamError::DeviceNotAvailable) {
            runtime_err.connected.store(false, Ordering::Release);
        }
    };

    let stream = match sample_format {
        SampleFormat::F32 => {
            let rt = Arc::clone(runtime);
            device
                .build_output_stream(
                    &config,
                    move |data: &mut [f32], _| fill_f32_output(data, &rt),
                    err_callback,
                    None,
                )
                .map_err(|e| {
                    Error::backend(std::io::Error::other(format!(
                        "Failed to build audio stream: {}",
                        e
                    )))
                })?
        }
        SampleFormat::I16 => {
            let rt = Arc::clone(runtime);
            device
                .build_output_stream(
                    &config,
                    move |data: &mut [i16], _| fill_i16_output(data, &rt),
                    err_callback,
                    None,
                )
                .map_err(|e| {
                    Error::backend(std::io::Error::other(format!(
                        "Failed to build audio stream: {}",
                        e
                    )))
                })?
        }
        format => {
            return Err(Error::invalid_config(format!(
                "Unsupported audio sample format: {:?}",
                format
            )));
        }
    };

    stream.play().map_err(|e| {
        Error::backend(std::io::Error::other(format!(
            "Failed to start audio stream: {}",
            e
        )))
    })?;

    runtime.connected.store(true, Ordering::Release);
    log::info!(
        "Oscilloscope started for '{}' at {} Hz",
        device_name,
        sample_rate
    );

    // `stream` is a local, so it lives until this function returns; park here
    // (polling the stop flag) to keep it alive.
    while !stop_flag.load(Ordering::Relaxed) {
        thread::sleep(Duration::from_millis(10));
    }

    log::info!("Oscilloscope stopped for '{}'", device_name);
    Ok(())
}
impl DacBackend for OscilloscopeBackend {
fn dac_type(&self) -> DacType {
DacType::Oscilloscope
}
fn caps(&self) -> &DacCapabilities {
&self.caps
}
fn connect(&mut self) -> Result<()> {
if self.is_connected() {
return Ok(());
}
let runtime = Arc::new(RuntimeState::new(self.buffer_capacity()));
let audio_thread = self.start_audio_thread(&runtime)?;
let start = std::time::Instant::now();
while !runtime.connected.load(Ordering::Acquire) {
if start.elapsed() > Duration::from_secs(5) {
audio_thread.stop_flag.store(true, Ordering::Release);
let _ = audio_thread.handle.join();
return Err(Error::backend(std::io::Error::other(
"Timeout waiting for oscilloscope connection",
)));
}
thread::sleep(Duration::from_millis(10));
}
self.estimator
.set_source(Arc::clone(&runtime) as Arc<dyn QueueDepthSource>);
self.runtime = Some(runtime);
self.audio_thread = Some(audio_thread);
log::info!(
"Connected to oscilloscope '{}' at {} Hz",
self.device_name,
self.sample_rate
);
Ok(())
}
fn disconnect(&mut self) -> Result<()> {
if let Some(audio_thread) = self.audio_thread.take() {
audio_thread.stop_flag.store(true, Ordering::Release);
let _ = audio_thread.handle.join();
}
if let Some(runtime) = self.runtime.take() {
runtime.clear_queue();
}
self.estimator.clear_source();
Ok(())
}
fn is_connected(&self) -> bool {
self.runtime
.as_ref()
.is_some_and(|rt| rt.connected.load(Ordering::Relaxed))
}
fn stop(&mut self) -> Result<()> {
if let Some(runtime) = &self.runtime {
runtime.muted.store(true, Ordering::Release);
}
Ok(())
}
fn set_shutter(&mut self, open: bool) -> Result<()> {
if let Some(runtime) = &self.runtime {
runtime.muted.store(!open, Ordering::Release);
}
Ok(())
}
}
/// Applies `catmull_rom` to both channels of four consecutive sample pairs.
///
/// `s0..s3` are the control points and `t` is the interpolation parameter,
/// passed unchanged to `catmull_rom` for each channel.
fn catmull_rom_samples(
    s0: (f32, f32),
    s1: (f32, f32),
    s2: (f32, f32),
    s3: (f32, f32),
    t: f32,
) -> (f32, f32) {
    let first = catmull_rom(s0.0, s1.0, s2.0, s3.0, t);
    let second = catmull_rom(s0.1, s1.1, s2.1, s3.1, t);
    (first, second)
}
impl FifoBackend for OscilloscopeBackend {
    /// Converts `points` to sample pairs and enqueues them for playback.
    ///
    /// When `pps` differs from the device sample rate the batch is resampled
    /// with Catmull-Rom interpolation to `resampled_len(..)` output samples
    /// spread evenly from the first to the last input sample.
    ///
    /// Returns `WouldBlock` without queuing anything when the queue cannot
    /// take the whole batch, and `Written` otherwise (including for an empty
    /// batch).
    ///
    /// # Errors
    /// Returns a disconnected error when there is no runtime or its
    /// `connected` flag is cleared.
    fn try_write_points(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
        let Some(runtime) = self.runtime.as_ref() else {
            return Err(Error::disconnected("Not connected"));
        };
        if !runtime.connected.load(Ordering::Acquire) {
            return Err(Error::disconnected("Not connected"));
        }
        if points.is_empty() {
            return Ok(WriteOutcome::Written);
        }

        // Number of sample pairs this batch will occupy in the queue.
        let resampling = pps != self.sample_rate;
        let output_len = if resampling {
            resampled_len(points.len(), pps, self.sample_rate)
        } else {
            points.len()
        };
        if !runtime.has_capacity_for(output_len) {
            return Ok(WriteOutcome::WouldBlock);
        }

        // Convert the batch into the reusable scratch buffer.
        let config = &self.config;
        let scratch = &mut self.sample_buffer;
        scratch.clear();
        scratch.extend(points.iter().map(|p| Self::point_to_samples(p, config)));
        let samples = scratch.as_slice();

        if !resampling {
            // Capacity was checked above, so a failed push is not expected;
            // the result is deliberately ignored.
            samples.iter().for_each(|&pair| {
                let _ = runtime.queue.push(pair);
            });
            return Ok(WriteOutcome::Written);
        }

        let last = samples.len() - 1;
        let last_src_idx = last as f32;
        // Source-index distance between successive output samples.
        let step = if output_len > 1 {
            last_src_idx / (output_len - 1) as f32
        } else {
            0.0
        };
        // Neighbour lookup clamped to the end of the batch.
        let at = |k: usize| samples[k.min(last)];

        for i in 0..output_len {
            let src_pos = i as f32 * step;
            let idx = (src_pos as usize).min(last);
            let t = src_pos - idx as f32;
            let s0 = at(idx.saturating_sub(1));
            let s1 = at(idx);
            let s2 = at(idx + 1);
            let s3 = at(idx + 2);
            let _ = runtime.queue.push(catmull_rom_samples(s0, s1, s2, s3, t));
        }
        Ok(WriteOutcome::Written)
    }

    /// Buffer estimator backing this FIFO.
    fn estimator(&self) -> &dyn BufferEstimator {
        &self.estimator
    }
}