use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use crate::{AudioError, AudioFormat, AudioProducer, EncoderInput, EncoderOutput, Frame};
/// Capture settings used to pick an input device and its stream format.
///
/// Every field is optional; a `None` field leaves the corresponding device
/// default in place when the configuration is resolved.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct Config {
/// Name of the input device to capture from, compared against each device's
/// string form. `None` selects the host's default input device.
pub device: Option<String>,
/// Sample rate override in samples per second. `None` keeps the rate of the
/// device's default input configuration.
pub sample_rate: Option<u32>,
/// Channel count override. `None` keeps the channel count of the device's
/// default input configuration.
pub channels: Option<u32>,
}
/// An opened input stream whose sample buffers are pulled with [`Microphone::read`].
pub struct Microphone {
// Never read; held so the stream lives exactly as long as this value.
// NOTE(review): presumably dropping the cpal stream is what stops capture — confirm.
_stream: cpal::Stream,
// Receiving end of the channel the stream's data callback sends f32 buffers into.
rx: Receiver<Vec<f32>>,
// Sample rate the stream was opened with, in samples per second.
sample_rate: u32,
// Channel count the stream was opened with.
channels: u32,
// Running total of frames (samples divided by channel count) handed out by `read`.
frames_read: u64,
}
impl Microphone {
    /// Reports the `(sample_rate, channels)` pair that [`Microphone::open`]
    /// would use for `config`, without building or starting a stream.
    ///
    /// # Errors
    ///
    /// Returns whatever error resolving the device and its configuration yields.
    pub fn format(config: &Config) -> Result<(u32, u32), AudioError> {
        let (_, _, stream_config) = resolve(config)?;
        let rate = stream_config.sample_rate;
        let channel_count = u32::from(stream_config.channels);
        Ok((rate, channel_count))
    }

    /// Resolves the device for `config`, builds an input stream for it and
    /// starts it.
    ///
    /// Buffers delivered by the stream are converted to `f32` and sent over an
    /// internal channel drained by [`Microphone::read`]. `F32`, `I16` and `U16`
    /// device formats are handled.
    ///
    /// # Errors
    ///
    /// Returns `AudioError::Unsupported` for any other sample format, and a
    /// converted cpal error when building or starting the stream fails.
    pub fn open(config: &Config) -> Result<Self, AudioError> {
        let (device, sample_format, stream_config) = resolve(config)?;

        // Copy the format out first: `stream_config` is moved into the builder below.
        let (sample_rate, channels) = (stream_config.sample_rate, u32::from(stream_config.channels));
        let (tx, rx) = std::sync::mpsc::channel::<Vec<f32>>();

        // Exactly one arm runs, so moving `tx` into each closure is fine.
        let built = match sample_format {
            cpal::SampleFormat::F32 => device.build_input_stream(
                stream_config,
                move |data: &[f32], _: &_| forward(&tx, data.to_vec()),
                stream_err,
                None,
            ),
            cpal::SampleFormat::I16 => device.build_input_stream(
                stream_config,
                move |data: &[i16], _: &_| {
                    // Scale signed 16-bit samples by 1/32768.
                    let scaled: Vec<f32> = data.iter().map(|&s| s as f32 / 32768.0).collect();
                    forward(&tx, scaled)
                },
                stream_err,
                None,
            ),
            cpal::SampleFormat::U16 => device.build_input_stream(
                stream_config,
                move |data: &[u16], _: &_| {
                    // Re-centre unsigned 16-bit samples around zero, then scale by 1/32768.
                    let scaled: Vec<f32> = data.iter().map(|&s| (s as f32 - 32768.0) / 32768.0).collect();
                    forward(&tx, scaled)
                },
                stream_err,
                None,
            ),
            other => {
                return Err(AudioError::Unsupported(format!(
                    "unsupported input sample format {other:?}"
                )));
            }
        };

        let stream = built.map_err(cpal_err)?;
        stream.play().map_err(cpal_err)?;
        tracing::info!(device = %device, sample_rate, channels, "opened microphone");

        Ok(Self {
            _stream: stream,
            rx,
            sample_rate,
            channels,
            frames_read: 0,
        })
    }

    /// Sample rate the stream was opened with.
    pub fn sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Channel count the stream was opened with.
    pub fn channels(&self) -> u32 {
        self.channels
    }

    /// Blocks until the stream callback delivers the next buffer and returns it
    /// as a [`Frame`] whose payload is the samples encoded as little-endian `f32`.
    ///
    /// The timestamp is derived from the number of frames handed out *before*
    /// this buffer, so the first frame is stamped `0`.
    ///
    /// Returns `Ok(None)` once the sending side of the channel is gone.
    pub fn read(&mut self) -> Result<Option<Frame>, AudioError> {
        let samples = match self.rx.recv() {
            Ok(buffer) => buffer,
            Err(_) => return Ok(None),
        };

        let timestamp_us = self.frames_read * 1_000_000 / self.sample_rate as u64;
        // `max(1)` keeps the frame count well-defined for a zero channel count.
        let samples_per_frame = self.channels.max(1) as usize;
        self.frames_read += (samples.len() / samples_per_frame) as u64;

        let mut bytes = Vec::<u8>::with_capacity(samples.len() * 4);
        bytes.extend(samples.iter().flat_map(|sample| sample.to_le_bytes()));

        Ok(Some(Frame {
            timestamp_us,
            data: bytes.into(),
        }))
    }
}
pub async fn publish_microphone(
mut broadcast: moq_net::BroadcastProducer,
catalog: moq_mux::catalog::Producer,
config: Config,
track_name: impl Into<String>,
output: EncoderOutput,
clock: moq_mux::Clock,
) -> Result<(), AudioError> {
let (sample_rate, channels) = Microphone::format(&config)?;
let input = EncoderInput {
format: AudioFormat::F32,
sample_rate,
channels,
};
let producer = AudioProducer::new(&mut broadcast, catalog, track_name, input, output)?;
let track = producer.track().clone();
let gate = Gate::new();
let worker_gate = gate.clone();
let mut worker = tokio::task::spawn_blocking(move || capture_loop(producer, config, worker_gate, clock));
tokio::select! {
res = &mut worker => res.map_err(task_err)?,
() = monitor_demand(&track, &gate) => {
gate.close();
worker.await.map_err(task_err)?
}
}
}
/// Mirrors the track's demand onto `gate` until the track ends.
///
/// Alternates between waiting for the track to become used (gate switched on)
/// and waiting for it to become unused (gate switched off). The first error
/// from either wait is logged and ends the loop.
async fn monitor_demand(track: &moq_net::TrackProducer, gate: &Gate) {
    loop {
        if let Err(err) = track.used().await {
            return log_track_ended(err);
        }
        gate.set_active(true);

        if let Err(err) = track.unused().await {
            return log_track_ended(err);
        }
        gate.set_active(false);
    }
}
fn log_track_ended(err: moq_net::Error) {
if matches!(err, moq_net::Error::Dropped | moq_net::Error::Closed) {
tracing::debug!("audio track no longer announced; stopping capture");
} else {
tracing::warn!(error = %err, "audio track aborted; stopping capture");
}
}
/// Blocking capture worker: pumps microphone frames into `producer` while
/// `gate` is active and releases the microphone while it is not.
///
/// Each frame is re-stamped with `clock.micros()` before being written. The
/// loop ends when the gate is closed or the microphone reports end of stream,
/// after which the producer is finished.
///
/// # Errors
///
/// Returns the first error from opening the microphone, reading a frame,
/// writing it to the producer, or finishing the producer. On the first three
/// the function returns immediately without calling `finish`.
fn capture_loop(
    mut producer: AudioProducer,
    config: Config,
    gate: Arc<Gate>,
    clock: moq_mux::Clock,
) -> Result<(), AudioError> {
    let mut mic: Option<Microphone> = None;

    loop {
        if gate.is_active() {
            // Open lazily so the device is only held while someone is listening.
            if mic.is_none() {
                mic = Some(Microphone::open(&config)?);
            }
            let source = mic.as_mut().expect("mic open above");

            match source.read()? {
                Some(mut frame) => {
                    frame.timestamp_us = clock.micros();
                    producer.write(&frame)?;
                }
                None => break,
            }
            continue;
        }

        // Idle: drop the device, then park until the gate is activated or closed.
        if mic.take().is_some() {
            producer.reset_epoch();
            tracing::info!("no listeners: released microphone");
        }
        if !gate.wait_active() {
            break;
        }
    }

    producer.finish()?;
    Ok(())
}
/// Converts a failure to join the capture task into an `AudioError::Unsupported`.
fn task_err(err: tokio::task::JoinError) -> AudioError {
    let message = format!("capture task: {err}");
    AudioError::Unsupported(message)
}
/// Thread-safe switch with three observable conditions: inactive (initial),
/// active, and closed.
///
/// Closing is terminal: once [`Gate::close`] has run the gate can never become
/// active again, so a late `set_active(true)` cannot revive a consumer that
/// has been told to stop.
struct Gate {
    state: Mutex<GateState>,
    cond: Condvar,
}

/// Flags guarded by the gate's mutex. `closed` implies `!active`.
#[derive(Default)]
struct GateState {
    active: bool,
    closed: bool,
}

impl Gate {
    /// Creates a shared gate that starts out inactive and open.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(GateState::default()),
            cond: Condvar::new(),
        })
    }

    /// Switches the gate on or off and wakes every waiter.
    ///
    /// Ignored once the gate has been closed.
    fn set_active(&self, active: bool) {
        let mut state = self.state.lock().unwrap();
        if state.closed {
            return;
        }
        state.active = active;
        self.cond.notify_all();
    }

    /// Permanently closes the gate, deactivating it and waking every waiter.
    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.active = false;
        state.closed = true;
        self.cond.notify_all();
    }

    /// Returns whether the gate is currently active. Always `false` once closed.
    fn is_active(&self) -> bool {
        self.state.lock().unwrap().active
    }

    /// Blocks the calling thread until the gate is active or closed.
    ///
    /// Returns `true` if the gate is open (and therefore active) on wake-up,
    /// `false` if it has been closed.
    fn wait_active(&self) -> bool {
        let guard = self.state.lock().unwrap();
        let state = self
            .cond
            .wait_while(guard, |s| !s.active && !s.closed)
            .unwrap();
        !state.closed
    }
}
/// Hands one buffer of samples to the receiving side of the channel.
///
/// Best-effort: if the receiver has already been dropped the buffer is
/// discarded and the failure is deliberately ignored.
fn forward(tx: &Sender<Vec<f32>>, samples: Vec<f32>) {
    tx.send(samples).ok();
}
/// Picks the input device described by `config` and derives the stream
/// configuration to open it with.
///
/// The device is looked up by name among the default host's input devices, or
/// the host's default input device is used when no name is given. The stream
/// configuration starts from the device's default input configuration;
/// `config.sample_rate` and `config.channels` override the matching fields
/// when set. The sample format always comes from the device default.
///
/// # Errors
///
/// Returns `AudioError::Unsupported` when the named device does not exist,
/// when there is no default input device, when the requested channel count
/// does not fit the stream configuration's 16-bit channel field, or when cpal
/// fails to enumerate devices or report a default configuration.
fn resolve(config: &Config) -> Result<(cpal::Device, cpal::SampleFormat, cpal::StreamConfig), AudioError> {
    let host = cpal::default_host();
    let device = match &config.device {
        Some(name) => host
            .input_devices()
            .map_err(cpal_err)?
            .find(|d| d.to_string() == *name)
            .ok_or_else(|| AudioError::Unsupported(format!("input device {name:?} not found")))?,
        None => host
            .default_input_device()
            .ok_or_else(|| AudioError::Unsupported("no default input device".into()))?,
    };

    let supported = device.default_input_config().map_err(cpal_err)?;
    let sample_format = supported.sample_format();
    let mut stream_config = supported.config();

    if let Some(rate) = config.sample_rate {
        stream_config.sample_rate = rate;
    }
    if let Some(channels) = config.channels {
        // Checked narrowing: an `as` cast would silently wrap (65536 -> 0, 65537 -> 1)
        // and open the device with a channel count the caller never asked for.
        // Fully qualified so it does not rely on `TryFrom` being in the prelude.
        stream_config.channels = <u16 as std::convert::TryFrom<u32>>::try_from(channels).map_err(|_| {
            AudioError::Unsupported(format!(
                "requested channel count {channels} exceeds the maximum of {}",
                u16::MAX
            ))
        })?;
    }

    Ok((device, sample_format, stream_config))
}
fn stream_err(err: cpal::Error) {
tracing::error!(error = %err, "microphone stream error");
}
/// Wraps a cpal error as an `AudioError::Unsupported`, prefixing its message
/// with the capture context.
fn cpal_err(err: cpal::Error) -> AudioError {
    let message = format!("audio capture: {err}");
    AudioError::Unsupported(message)
}