Skip to main content

moq_audio/
consumer.rs

1//! Subscribe to an encoded audio track and emit raw PCM.
2
3use bytes::Bytes;
4
5use crate::codec::{Decoder, DecoderOutput};
6use crate::resample::Resampler;
7use crate::{AudioError, Frame};
8
9/// Subscribe to a moq-mux audio track and emit decoded PCM in the
10/// format declared by [`DecoderOutput`].
11///
12/// Output format / sample rate / channel count are fixed at
13/// construction; [`read`](Self::read) returns plain [`Frame`]s.
14pub struct AudioConsumer {
15	decoder: Decoder,
16	track: moq_mux::container::Consumer<moq_mux::container::legacy::Wire>,
17	resampler: Option<Resampler>,
18	output: DecoderOutput,
19	resolved_sample_rate: u32,
20	resolved_channels: u32,
21}
22
23impl AudioConsumer {
24	/// Subscribe to `name` in `broadcast` using the catalog entry to
25	/// pick the codec.
26	pub fn new(
27		broadcast: &moq_net::BroadcastConsumer,
28		catalog: &hang::catalog::AudioConfig,
29		name: impl Into<String>,
30		output: DecoderOutput,
31	) -> Result<Self, AudioError> {
32		let decoder = Decoder::new(catalog)?;
33		let sample_rate = output.sample_rate.unwrap_or_else(|| decoder.sample_rate());
34		let channels = output.channels.unwrap_or_else(|| decoder.channel_count());
35
36		if channels != decoder.channel_count() {
37			return Err(AudioError::Unsupported(format!(
38				"channel remapping not implemented (decoder {}ch, requested {channels}ch)",
39				decoder.channel_count()
40			)));
41		}
42
43		let resampler = if sample_rate == decoder.sample_rate() {
44			None
45		} else {
46			let chunk_frames = (decoder.sample_rate() as usize * 20) / 1000;
47			Some(Resampler::new(
48				decoder.sample_rate(),
49				sample_rate,
50				decoder.channel_count(),
51				chunk_frames,
52			)?)
53		};
54
55		let name = name.into();
56		let track = broadcast.subscribe_track(&moq_net::Track { name, priority: 0 })?;
57		let mut track = moq_mux::container::Consumer::new(track, moq_mux::container::legacy::Wire);
58		if let Some(latency) = output.latency_max {
59			track = track.with_latency(latency);
60		}
61
62		Ok(Self {
63			decoder,
64			track,
65			resampler,
66			output,
67			resolved_sample_rate: sample_rate,
68			resolved_channels: channels,
69		})
70	}
71
72	pub fn output(&self) -> &DecoderOutput {
73		&self.output
74	}
75
76	/// Sample rate samples are actually delivered at (= `output.sample_rate`
77	/// if set, otherwise the decoder's native rate).
78	pub fn sample_rate(&self) -> u32 {
79		self.resolved_sample_rate
80	}
81
82	/// Channel count samples are actually delivered at.
83	pub fn channels(&self) -> u32 {
84		self.resolved_channels
85	}
86
87	/// Read the next decoded PCM frame, or `None` when the track ends.
88	pub async fn read(&mut self) -> Result<Option<Frame>, AudioError> {
89		let Some(mux_frame) = self.track.read().await? else {
90			return Ok(None);
91		};
92
93		let ts_us: u64 = mux_frame
94			.timestamp
95			.as_micros()
96			.try_into()
97			.map_err(|_| AudioError::Unsupported("timestamp overflow".into()))?;
98
99		let decoded = self.decoder.decode_f32(&mux_frame.payload)?;
100		let pcm = match self.resampler.as_mut() {
101			Some(r) => r.process(&decoded)?,
102			None => decoded,
103		};
104
105		let bytes = self.output.format.from_interleaved_f32(&pcm, self.resolved_channels)?;
106		Ok(Some(Frame {
107			timestamp_us: ts_us,
108			data: Bytes::from(bytes),
109		}))
110	}
111}