use bytes::Bytes;
use moq_mux::container::{Frame as MuxFrame, Timestamp};
use crate::codec::{Encoder, EncoderInput, EncoderOutput};
use crate::resample::Resampler;
use crate::{AudioError, Frame};
pub struct AudioProducer {
encoder: Encoder,
resampler: Option<Resampler>,
track: moq_mux::container::Producer<moq_mux::container::legacy::Wire>,
track_name: String,
catalog: moq_mux::catalog::Producer,
pending: Vec<f32>,
frames_produced: u64,
epoch_us: Option<u64>,
}
impl AudioProducer {
pub fn new(
broadcast: &mut moq_net::BroadcastProducer,
catalog: moq_mux::catalog::Producer,
name: impl Into<String>,
input: EncoderInput,
output: EncoderOutput,
) -> Result<Self, AudioError> {
let encoder = Encoder::new(input, output)?;
let resampler = if encoder.input().sample_rate == encoder.codec_rate() {
None
} else {
let chunk_frames = ((encoder.input().sample_rate as u128 * encoder.output().frame_duration.as_micros())
/ 1_000_000) as usize;
Some(Resampler::new(
encoder.input().sample_rate,
encoder.codec_rate(),
encoder.input().channels,
chunk_frames,
)?)
};
let name = name.into();
let track = broadcast.create_track(moq_net::Track {
name: name.clone(),
priority: 0,
})?;
let track = moq_mux::container::Producer::new(track, moq_mux::container::legacy::Wire);
let mut catalog_mut = catalog.clone();
catalog_mut.lock().audio.insert(&name, encoder.catalog())?;
Ok(Self {
encoder,
resampler,
track,
track_name: name,
catalog,
pending: Vec::new(),
frames_produced: 0,
epoch_us: None,
})
}
pub fn track_name(&self) -> &str {
&self.track_name
}
pub fn track(&self) -> &moq_net::TrackProducer {
self.track.track()
}
pub fn reset_epoch(&mut self) {
self.epoch_us = None;
self.frames_produced = 0;
self.pending.clear();
}
pub fn write(&mut self, frame: &Frame) -> Result<(), AudioError> {
let epoch_us = *self.epoch_us.get_or_insert(frame.timestamp_us);
let input = self.encoder.input();
let pcm = input.format.as_interleaved_f32(frame.data.as_ref(), input.channels)?;
let pcm: Vec<f32> = match self.resampler.as_mut() {
Some(r) => r.process(&pcm)?,
None => pcm.into_owned(),
};
self.pending.extend(pcm);
let frame_samples = self.encoder.frame_size() * self.encoder.codec_channels() as usize;
while self.pending.len() >= frame_samples {
let chunk: Vec<f32> = self.pending.drain(..frame_samples).collect();
let packet = self.encoder.encode_f32(&chunk)?;
let timestamp = self.timestamp(epoch_us)?;
self.frames_produced += self.encoder.frame_size() as u64;
self.publish(packet, timestamp)?;
}
Ok(())
}
fn timestamp(&self, epoch_us: u64) -> Result<Timestamp, AudioError> {
let offset_us = (self.frames_produced * 1_000_000) / self.encoder.codec_rate() as u64;
Ok(Timestamp::from_micros(epoch_us + offset_us)?)
}
fn publish(&mut self, payload: Bytes, timestamp: Timestamp) -> Result<(), AudioError> {
let mux_frame = MuxFrame {
timestamp,
payload,
keyframe: true,
};
self.track.write(mux_frame)?;
self.track.finish_group()?;
Ok(())
}
pub fn finish(mut self) -> Result<(), AudioError> {
let frame_samples = self.encoder.frame_size() * self.encoder.codec_channels() as usize;
if !self.pending.is_empty() {
self.pending.resize(frame_samples, 0.0);
let chunk = std::mem::take(&mut self.pending);
let packet = self.encoder.encode_f32(&chunk)?;
let timestamp = self.timestamp(self.epoch_us.unwrap_or(0))?;
self.publish(packet, timestamp)?;
}
self.track.finish()?;
Ok(())
}
}
impl Drop for AudioProducer {
fn drop(&mut self) {
self.catalog.lock().audio.remove(&self.track_name);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{AudioFormat, EncoderInput, EncoderOutput};
fn full_frame(timestamp_us: u64) -> Frame {
let mut data = Vec::with_capacity(960 * 4);
for _ in 0..960 {
data.extend_from_slice(&0.1f32.to_le_bytes());
}
Frame {
timestamp_us,
data: data.into(),
}
}
async fn published_pts(frames: &[Frame], reset_before: Option<usize>) -> Vec<u128> {
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog = moq_mux::catalog::Producer::new(&mut broadcast).unwrap();
let consumer = broadcast.consume();
let input = EncoderInput {
format: AudioFormat::F32,
sample_rate: 48_000,
channels: 1,
};
let mut producer =
AudioProducer::new(&mut broadcast, catalog, "audio", input, EncoderOutput::default()).unwrap();
let track = consumer
.subscribe_track(&moq_net::Track {
name: "audio".into(),
priority: 0,
})
.unwrap();
let mut reader = moq_mux::container::Consumer::new(track, moq_mux::container::legacy::Wire);
let mut pts = Vec::new();
for (i, frame) in frames.iter().enumerate() {
if reset_before == Some(i) {
producer.reset_epoch();
}
producer.write(frame).unwrap();
let read = reader.read().await.unwrap().expect("a packet per full frame");
pts.push(read.timestamp.as_micros());
}
pts
}
#[tokio::test]
async fn epoch_anchors_to_first_frame_timestamp() {
let pts = published_pts(&[full_frame(1_000_000)], None).await;
assert_eq!(pts, vec![1_000_000]);
}
#[tokio::test]
async fn pts_advances_by_frame_duration_ignoring_later_timestamps() {
let pts = published_pts(&[full_frame(1_000), full_frame(999_999)], None).await;
assert_eq!(pts, vec![1_000, 1_000 + 20_000]);
}
#[tokio::test]
async fn reset_epoch_reanchors_so_the_gap_lands_in_pts() {
let pts = published_pts(&[full_frame(0), full_frame(5_000_000)], Some(1)).await;
assert_eq!(pts, vec![0, 5_000_000]);
}
}