1use bytes::Bytes;
4
5use crate::codec::{Decoder, DecoderOutput};
6use crate::resample::Resampler;
7use crate::{AudioError, Frame};
8
/// Reads encoded audio from a subscribed track and returns it as PCM frames.
///
/// Each call to `read` decodes one container frame, optionally resamples it,
/// and converts the samples to the output format requested at construction.
pub struct AudioConsumer {
    /// Codec decoder built from the catalog audio configuration in `new`.
    decoder: Decoder,
    /// Container-level reader wrapping the subscribed track.
    track: moq_mux::container::Consumer<moq_mux::container::legacy::Wire>,
    /// Sample-rate converter; `None` when the resolved output rate equals the
    /// decoder's rate.
    resampler: Option<Resampler>,
    /// Output settings exactly as supplied by the caller (optional fields may
    /// still be unset here).
    output: DecoderOutput,
    /// Output sample rate after falling back to the decoder's rate when the
    /// caller did not specify one.
    resolved_sample_rate: u32,
    /// Output channel count after falling back to the decoder's count when the
    /// caller did not specify one.
    resolved_channels: u32,
}
22
23impl AudioConsumer {
24 pub fn new(
27 broadcast: &moq_net::BroadcastConsumer,
28 catalog: &hang::catalog::AudioConfig,
29 name: impl Into<String>,
30 output: DecoderOutput,
31 ) -> Result<Self, AudioError> {
32 let decoder = Decoder::new(catalog)?;
33 let sample_rate = output.sample_rate.unwrap_or_else(|| decoder.sample_rate());
34 let channels = output.channels.unwrap_or_else(|| decoder.channel_count());
35
36 if channels != decoder.channel_count() {
37 return Err(AudioError::Unsupported(format!(
38 "channel remapping not implemented (decoder {}ch, requested {channels}ch)",
39 decoder.channel_count()
40 )));
41 }
42
43 let resampler = if sample_rate == decoder.sample_rate() {
44 None
45 } else {
46 let chunk_frames = (decoder.sample_rate() as usize * 20) / 1000;
47 Some(Resampler::new(
48 decoder.sample_rate(),
49 sample_rate,
50 decoder.channel_count(),
51 chunk_frames,
52 )?)
53 };
54
55 let name = name.into();
56 let track = broadcast.subscribe_track(&moq_net::Track { name, priority: 0 })?;
57 let mut track = moq_mux::container::Consumer::new(track, moq_mux::container::legacy::Wire);
58 if let Some(latency) = output.latency_max {
59 track = track.with_latency(latency);
60 }
61
62 Ok(Self {
63 decoder,
64 track,
65 resampler,
66 output,
67 resolved_sample_rate: sample_rate,
68 resolved_channels: channels,
69 })
70 }
71
72 pub fn output(&self) -> &DecoderOutput {
73 &self.output
74 }
75
76 pub fn sample_rate(&self) -> u32 {
79 self.resolved_sample_rate
80 }
81
82 pub fn channels(&self) -> u32 {
84 self.resolved_channels
85 }
86
87 pub async fn read(&mut self) -> Result<Option<Frame>, AudioError> {
89 let Some(mux_frame) = self.track.read().await? else {
90 return Ok(None);
91 };
92
93 let ts_us: u64 = mux_frame
94 .timestamp
95 .as_micros()
96 .try_into()
97 .map_err(|_| AudioError::Unsupported("timestamp overflow".into()))?;
98
99 let decoded = self.decoder.decode_f32(&mux_frame.payload)?;
100 let pcm = match self.resampler.as_mut() {
101 Some(r) => r.process(&decoded)?,
102 None => decoded,
103 };
104
105 let bytes = self.output.format.from_interleaved_f32(&pcm, self.resolved_channels)?;
106 Ok(Some(Frame {
107 timestamp_us: ts_us,
108 data: Bytes::from(bytes),
109 }))
110 }
111}