use bytes::Bytes;
use moq_mux::container::{Frame as MuxFrame, Timestamp};
use crate::codec::{Encoder, EncoderInput, EncoderOutput};
use crate::resample::Resampler;
use crate::{AudioError, Frame};
pub struct AudioProducer {
encoder: Encoder,
resampler: Option<Resampler>,
track: moq_mux::container::Producer<moq_mux::container::legacy::Wire>,
track_name: String,
catalog: moq_mux::catalog::hang::Producer,
pending: Vec<f32>,
frames_produced: u64,
}
impl AudioProducer {
pub fn new(
broadcast: &mut moq_net::BroadcastProducer,
catalog: moq_mux::catalog::hang::Producer,
name: impl Into<String>,
input: EncoderInput,
output: EncoderOutput,
) -> Result<Self, AudioError> {
let encoder = Encoder::new(input, output)?;
let resampler = if encoder.input().sample_rate == encoder.codec_rate() {
None
} else {
let chunk_frames = ((encoder.input().sample_rate as u128 * encoder.output().frame_duration.as_micros())
/ 1_000_000) as usize;
Some(Resampler::new(
encoder.input().sample_rate,
encoder.codec_rate(),
encoder.input().channels,
chunk_frames,
)?)
};
let name = name.into();
let track = broadcast.create_track(moq_net::Track {
name: name.clone(),
priority: 0,
})?;
let track = moq_mux::container::Producer::new(track, moq_mux::container::legacy::Wire);
let mut catalog_mut = catalog.clone();
catalog_mut.lock().audio.insert(&name, encoder.catalog())?;
Ok(Self {
encoder,
resampler,
track,
track_name: name,
catalog,
pending: Vec::new(),
frames_produced: 0,
})
}
pub fn track_name(&self) -> &str {
&self.track_name
}
pub fn write(&mut self, frame: &Frame) -> Result<(), AudioError> {
let _ = frame.timestamp_us;
let input = self.encoder.input();
let pcm = input.format.as_interleaved_f32(frame.data.as_ref(), input.channels)?;
let pcm: Vec<f32> = match self.resampler.as_mut() {
Some(r) => r.process(&pcm)?,
None => pcm.into_owned(),
};
self.pending.extend(pcm);
let frame_samples = self.encoder.frame_size() * self.encoder.codec_channels() as usize;
while self.pending.len() >= frame_samples {
let chunk: Vec<f32> = self.pending.drain(..frame_samples).collect();
let packet = self.encoder.encode_f32(&chunk)?;
let timestamp =
Timestamp::from_micros((self.frames_produced * 1_000_000) / self.encoder.codec_rate() as u64)?;
self.frames_produced += self.encoder.frame_size() as u64;
self.publish(packet, timestamp)?;
}
Ok(())
}
fn publish(&mut self, payload: Bytes, timestamp: Timestamp) -> Result<(), AudioError> {
let mux_frame = MuxFrame {
timestamp,
payload,
keyframe: true,
};
self.track.write(mux_frame)?;
self.track.finish_group()?;
Ok(())
}
pub fn finish(mut self) -> Result<(), AudioError> {
let frame_samples = self.encoder.frame_size() * self.encoder.codec_channels() as usize;
if !self.pending.is_empty() {
self.pending.resize(frame_samples, 0.0);
let chunk = std::mem::take(&mut self.pending);
let packet = self.encoder.encode_f32(&chunk)?;
let timestamp =
Timestamp::from_micros((self.frames_produced * 1_000_000) / self.encoder.codec_rate() as u64)?;
self.publish(packet, timestamp)?;
}
self.track.finish()?;
Ok(())
}
}
impl Drop for AudioProducer {
fn drop(&mut self) {
self.catalog.lock().audio.remove(&self.track_name);
}
}