Skip to main content

moq/
audio.rs

1//! Raw-audio import/export via [`moq_audio`].
2//!
3//! Sibling to `moq_publish_media_*` / `moq_consume_audio_ordered`
4//! (those handle already-encoded frames). These functions accept and
5//! return raw PCM, with Opus encode/decode happening inside the FFI
6//! boundary.
7//!
8//! Format / sample rate / channel count are fixed at producer or
9//! consumer construction via [`moq_audio_encoder_input`] /
10//! [`moq_audio_encoder_output`] / [`moq_audio_decoder_output`], so
11//! each [`moq_audio_frame`] carries only payload bytes and a
12//! timestamp.
13
14use std::ffi::{c_char, c_void};
15use std::time::Duration;
16
17use bytes::Bytes;
18use tokio::sync::oneshot;
19
20use crate::ffi::OnStatus;
21use crate::{Error, Id, NonZeroSlab, State, ffi};
22
23// ---- C-visible types ----
24
/// Raw PCM sample layout, mirroring WebCodecs `AudioData.format`.
///
/// The enum is exposed in the C header for readability, but ABI
/// fields/parameters that carry it are typed `u32`. A C caller
/// passing an unknown discriminant gets `Error::InvalidCode` instead
/// of UB.
///
/// The explicit discriminants are the values C callers place in those
/// `u32` fields, so existing variants must keep their numbers.
///
/// <https://developer.mozilla.org/en-US/docs/Web/API/AudioData/format>
#[repr(C)]
#[allow(non_camel_case_types)] // C-style name: the type appears in the C header.
#[derive(Clone, Copy, Debug)]
pub enum moq_audio_format {
	/// WebCodecs `"u8"`: unsigned 8-bit samples, channels interleaved.
	MOQ_AUDIO_FORMAT_U8 = 0,
	/// WebCodecs `"s16"`: signed 16-bit samples, channels interleaved.
	MOQ_AUDIO_FORMAT_S16 = 1,
	/// WebCodecs `"s32"`: signed 32-bit samples, channels interleaved.
	MOQ_AUDIO_FORMAT_S32 = 2,
	/// WebCodecs `"f32"`: 32-bit float samples, channels interleaved.
	MOQ_AUDIO_FORMAT_F32 = 3,
	/// WebCodecs `"u8-planar"`: unsigned 8-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_U8_PLANAR = 4,
	/// WebCodecs `"s16-planar"`: signed 16-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_S16_PLANAR = 5,
	/// WebCodecs `"s32-planar"`: signed 32-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_S32_PLANAR = 6,
	/// WebCodecs `"f32-planar"`: 32-bit float samples, one plane per channel.
	MOQ_AUDIO_FORMAT_F32_PLANAR = 7,
}
46
47fn audio_format_from_u32(value: u32) -> Result<moq_audio::AudioFormat, Error> {
48	use moq_audio::AudioFormat;
49	Ok(match value {
50		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8 as u32 => AudioFormat::U8,
51		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16 as u32 => AudioFormat::S16,
52		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32 as u32 => AudioFormat::S32,
53		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32 as u32 => AudioFormat::F32,
54		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8_PLANAR as u32 => AudioFormat::U8Planar,
55		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16_PLANAR as u32 => AudioFormat::S16Planar,
56		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32_PLANAR as u32 => AudioFormat::S32Planar,
57		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32_PLANAR as u32 => AudioFormat::F32Planar,
58		_ => return Err(Error::InvalidCode),
59	})
60}
61
/// PCM layout the caller hands to [`moq_publish_audio_raw_frame`].
///
/// Fixed when the producer is created by [`moq_publish_audio_raw`];
/// every later frame is interpreted with this layout.
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_input {
	/// `moq_audio_format` discriminant. An unknown value makes
	/// [`moq_publish_audio_raw`] fail with `Error::InvalidCode`.
	pub format: u32,
	/// Sample rate of the supplied PCM (presumably Hz), forwarded
	/// unchanged to `moq_audio::EncoderInput::sample_rate`.
	pub sample_rate: u32,
	/// Channel count of the supplied PCM, forwarded unchanged to
	/// `moq_audio::EncoderInput::channels`.
	pub channels: u32,
}
71
/// Codec-side configuration. `sample_rate` / `channels` = 0 means
/// "match the input (snapping the rate up to a libopus-supported
/// value if necessary)".
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_output {
	/// Codec id, UTF-8 (currently only "opus"). An unrecognized id makes
	/// [`moq_publish_audio_raw`] fail with `Error::UnknownFormat`.
	pub codec: *const c_char,
	/// Length of `codec` in bytes.
	pub codec_len: usize,
	/// 0 = derive from input.
	pub sample_rate: u32,
	/// 0 = derive from input.
	pub channels: u32,
	/// 0 = libopus default.
	pub bitrate: u32,
	/// Encoded frame duration in milliseconds. Opus accepts
	/// 2.5/5/10/20/40/60 ms; pass 20 to match the JS publish path.
	/// Only whole milliseconds are representable, so the 2.5 ms Opus
	/// frame size cannot be selected through this field. Unlike the
	/// fields above, 0 is not a "use the default" sentinel: it is
	/// forwarded as a zero `Duration`.
	pub frame_duration_ms: u32,
}
92
/// PCM layout the caller wants out of [`moq_consume_audio_raw`].
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_decoder_output {
	/// `moq_audio_format` discriminant for the delivered PCM. An unknown
	/// value makes [`moq_consume_audio_raw`] fail with `Error::InvalidCode`.
	pub format: u32,
	/// 0 = deliver at the codec's native sample rate.
	pub sample_rate: u32,
	/// 0 = deliver at the codec's native channel count.
	pub channels: u32,
	/// Upper bound on buffering before skipping a stalled group, in
	/// milliseconds. Same congestion-control knob as
	/// `moq_consume_audio_ordered`'s `max_latency_ms`. 0 = skip
	/// aggressively (the moq-mux default); set to your playout
	/// buffer (tens to a few hundred ms) for a softer skip. Named
	/// `_max` to leave room for a future `latency_min_ms`
	/// (jitter-buffer floor).
	pub latency_max_ms: u64,
}
111
/// One audio frame: payload bytes plus a presentation timestamp.
///
/// `data` is owned by the consume slab (see
/// [`moq_consume_audio_raw_frame_free`]) or borrowed by the publish call
/// (the publisher copies before returning).
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_frame {
	/// Presentation timestamp; the `_us` suffix denotes microseconds.
	pub timestamp_us: u64,
	/// PCM payload, laid out per the format fixed when the producer or
	/// consumer was created.
	pub data: *const u8,
	/// Length of `data` in bytes.
	pub data_size: usize,
}
124
125// ---- State extensions (used internally by lib.rs) ----
126
/// Registry of raw-audio handles, held in the shared `State`
/// (accessed as `State::lock().audio`).
#[derive(Default)]
pub struct Audio {
	/// Live publishers, keyed by the handle returned from
	/// `moq_publish_audio_raw`.
	producers: NonZeroSlab<moq_audio::AudioProducer>,
	/// Consumer tasks. A slot holding `None` has been closed but not yet
	/// cleaned up by its background task.
	consumer_tasks: NonZeroSlab<Option<AudioTaskEntry>>,
	/// Decoded frames handed to callbacks and not yet released via
	/// `moq_consume_audio_raw_frame_free`.
	frames: NonZeroSlab<moq_audio::Frame>,
}
133
/// Per-consumer bookkeeping stored in `Audio::consumer_tasks`.
struct AudioTaskEntry {
	/// Never read: dropping it resolves the task's `oneshot::Receiver`,
	/// which makes the background task stop.
	#[allow(dead_code)] // Dropping signals shutdown via channel.
	close: oneshot::Sender<()>,
	/// C callback that receives frame ids, plus the terminal status if the
	/// task ends while this entry is still present.
	callback: OnStatus,
}
139
140impl Audio {
141	pub fn publish(
142		&mut self,
143		broadcast: &mut moq_net::BroadcastProducer,
144		catalog: moq_mux::catalog::hang::Producer,
145		name: &str,
146		input: moq_audio::EncoderInput,
147		output: moq_audio::EncoderOutput,
148	) -> Result<Id, Error> {
149		let producer = moq_audio::AudioProducer::new(broadcast, catalog, name, input, output)?;
150		self.producers.insert(producer)
151	}
152
153	pub fn publish_frame(&mut self, id: Id, frame: moq_audio::Frame) -> Result<(), Error> {
154		let producer = self.producers.get_mut(id).ok_or(Error::MediaNotFound)?;
155		producer.write(&frame)?;
156		Ok(())
157	}
158
159	pub fn publish_close(&mut self, id: Id) -> Result<(), Error> {
160		let producer = self.producers.remove(id).ok_or(Error::MediaNotFound)?;
161		producer.finish()?;
162		Ok(())
163	}
164
165	pub fn consume(
166		&mut self,
167		broadcast: &moq_net::BroadcastConsumer,
168		catalog: &hang::catalog::AudioConfig,
169		name: &str,
170		output: moq_audio::DecoderOutput,
171		on_frame: OnStatus,
172	) -> Result<Id, Error> {
173		let consumer = moq_audio::AudioConsumer::new(broadcast, catalog, name, output)?;
174
175		let channel = oneshot::channel();
176		let entry = AudioTaskEntry {
177			close: channel.0,
178			callback: on_frame,
179		};
180		let id = self.consumer_tasks.insert(Some(entry))?;
181
182		tokio::spawn(async move {
183			let res = tokio::select! {
184				res = Self::run(id, consumer) => res,
185				_ = channel.1 => Ok(()),
186			};
187
188			if let Some(entry) = State::lock().audio.consumer_tasks.remove(id).flatten() {
189				entry.callback.call(res);
190			}
191		});
192
193		Ok(id)
194	}
195
196	async fn run(task_id: Id, mut consumer: moq_audio::AudioConsumer) -> Result<(), Error> {
197		while let Some(frame) = consumer.read().await? {
198			let mut state = State::lock();
199			let Some(Some(entry)) = state.audio.consumer_tasks.get(task_id) else {
200				return Ok(());
201			};
202			let callback = entry.callback;
203			let frame_id = state.audio.frames.insert(frame)?;
204			drop(state);
205
206			callback.call(Ok(frame_id));
207		}
208		Ok(())
209	}
210
211	pub fn consume_close(&mut self, id: Id) -> Result<(), Error> {
212		self.consumer_tasks
213			.get_mut(id)
214			.ok_or(Error::TrackNotFound)?
215			.take()
216			.ok_or(Error::TrackNotFound)?;
217		Ok(())
218	}
219
220	pub fn frame_info(&self, id: Id, dst: &mut moq_audio_frame) -> Result<(), Error> {
221		let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?;
222		*dst = moq_audio_frame {
223			timestamp_us: frame.timestamp_us,
224			data: frame.data.as_ptr(),
225			data_size: frame.data.len(),
226		};
227		Ok(())
228	}
229
230	pub fn frame_free(&mut self, id: Id) -> Result<(), Error> {
231		self.frames.remove(id).ok_or(Error::FrameNotFound)?;
232		Ok(())
233	}
234}
235
236// ---- C entry points ----
237
238/// Open an audio track on a broadcast.
239///
240/// The encoder configuration is fixed at construction; subsequent
241/// frame writes pass only payload + timestamp via
242/// [`moq_publish_audio_raw_frame`].
243///
244/// Returns a non-zero handle on success or a negative error code.
245///
246/// # Safety
247/// - `name` must point to `name_len` bytes of UTF-8.
248/// - `input` / `output` must point to fully populated structs.
249/// - `output->codec` must point to `output->codec_len` bytes of UTF-8.
250#[unsafe(no_mangle)]
251pub unsafe extern "C" fn moq_publish_audio_raw(
252	broadcast: u32,
253	name: *const c_char,
254	name_len: usize,
255	input: *const moq_audio_encoder_input,
256	output: *const moq_audio_encoder_output,
257) -> i32 {
258	ffi::enter(move || {
259		let broadcast = ffi::parse_id(broadcast)?;
260		let name = unsafe { ffi::parse_str(name, name_len)? }.to_string();
261		let raw_input = unsafe { input.as_ref() }.ok_or(Error::InvalidPointer)?;
262		let raw_output = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
263		let codec_str = unsafe { ffi::parse_str(raw_output.codec, raw_output.codec_len)? };
264
265		let encoder_input = moq_audio::EncoderInput {
266			format: audio_format_from_u32(raw_input.format)?,
267			sample_rate: raw_input.sample_rate,
268			channels: raw_input.channels,
269		};
270		let encoder_output = moq_audio::EncoderOutput {
271			codec: codec_str
272				.parse()
273				.map_err(|_| Error::UnknownFormat(codec_str.to_string()))?,
274			sample_rate: if raw_output.sample_rate == 0 {
275				None
276			} else {
277				Some(raw_output.sample_rate)
278			},
279			channels: if raw_output.channels == 0 {
280				None
281			} else {
282				Some(raw_output.channels)
283			},
284			bitrate: if raw_output.bitrate == 0 {
285				None
286			} else {
287				Some(raw_output.bitrate)
288			},
289			frame_duration: Duration::from_millis(raw_output.frame_duration_ms.into()),
290		};
291
292		let mut state = State::lock();
293		let State { publish, audio, .. } = &mut *state;
294		let (broadcast_producer, catalog) = publish.pair_mut(broadcast)?;
295
296		audio.publish(
297			broadcast_producer,
298			catalog.clone(),
299			&name,
300			encoder_input,
301			encoder_output,
302		)
303	})
304}
305
306/// Push one audio frame.
307///
308/// `frame->data` is borrowed for the duration of the call; the
309/// producer copies before returning.
310///
311/// # Safety
312/// - `frame` must point to a valid [`moq_audio_frame`].
313/// - `frame->data` must point to `frame->data_size` bytes.
314#[unsafe(no_mangle)]
315pub unsafe extern "C" fn moq_publish_audio_raw_frame(producer: u32, frame: *const moq_audio_frame) -> i32 {
316	ffi::enter(move || {
317		let producer = ffi::parse_id(producer)?;
318		let frame = unsafe { frame.as_ref() }.ok_or(Error::InvalidPointer)?;
319		let data = unsafe { ffi::parse_slice(frame.data, frame.data_size)? };
320
321		let owned = moq_audio::Frame {
322			timestamp_us: frame.timestamp_us,
323			data: Bytes::copy_from_slice(data),
324		};
325
326		State::lock().audio.publish_frame(producer, owned)
327	})
328}
329
330/// Flush any pending samples and finalize an audio producer.
331#[unsafe(no_mangle)]
332pub extern "C" fn moq_publish_audio_raw_close(producer: u32) -> i32 {
333	ffi::enter(move || {
334		let producer = ffi::parse_id(producer)?;
335		State::lock().audio.publish_close(producer)
336	})
337}
338
339/// Subscribe to an audio track and decode it into PCM.
340///
341/// The catalog `index` identifies which audio rendition to subscribe
342/// to, matching the existing `moq_consume_audio_ordered` selection
343/// model. TODO: a future API will pick the right rendition
344/// automatically (ABR).
345///
346/// Returns a non-zero handle on success or a negative error code.
347///
348/// # Safety
349/// - `output` must point to a valid [`moq_audio_decoder_output`].
350/// - `on_frame` must remain valid until [`moq_consume_audio_raw_close`] is called.
351#[unsafe(no_mangle)]
352pub unsafe extern "C" fn moq_consume_audio_raw(
353	catalog: u32,
354	index: u32,
355	output: *const moq_audio_decoder_output,
356	on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
357	user_data: *mut c_void,
358) -> i32 {
359	ffi::enter(move || {
360		let catalog = ffi::parse_id(catalog)?;
361		let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
362
363		let decoder_output = moq_audio::DecoderOutput {
364			format: audio_format_from_u32(raw.format)?,
365			sample_rate: if raw.sample_rate == 0 {
366				None
367			} else {
368				Some(raw.sample_rate)
369			},
370			channels: if raw.channels == 0 { None } else { Some(raw.channels) },
371			latency_max: if raw.latency_max_ms == 0 {
372				None
373			} else {
374				Some(Duration::from_millis(raw.latency_max_ms))
375			},
376		};
377		let on_frame = unsafe { OnStatus::new(user_data, on_frame) };
378
379		let mut state = State::lock();
380		let (broadcast, audio_cfg, name) = state.consume.audio_rendition(catalog, index as usize)?;
381
382		let State { audio, .. } = &mut *state;
383		audio.consume(&broadcast, &audio_cfg, &name, decoder_output, on_frame)
384	})
385}
386
387/// Stop consuming an audio track and cancel its background task.
388///
389/// Does *not* free any frame IDs already delivered to the on-frame
390/// callback. Each one must be released explicitly with
391/// [`moq_consume_audio_raw_frame_free`].
392#[unsafe(no_mangle)]
393pub extern "C" fn moq_consume_audio_raw_close(consumer: u32) -> i32 {
394	ffi::enter(move || {
395		let consumer = ffi::parse_id(consumer)?;
396		State::lock().audio.consume_close(consumer)
397	})
398}
399
400/// Copy a delivered frame's metadata into `dst`.
401///
402/// The written `dst->data` pointer remains valid until the same `id`
403/// is released with [`moq_consume_audio_raw_frame_free`].
404///
405/// # Safety
406/// - `dst` must point to a writable [`moq_audio_frame`].
407#[unsafe(no_mangle)]
408pub unsafe extern "C" fn moq_consume_audio_raw_frame(id: u32, dst: *mut moq_audio_frame) -> i32 {
409	ffi::enter(move || {
410		let id = ffi::parse_id(id)?;
411		let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
412		State::lock().audio.frame_info(id, dst)
413	})
414}
415
416/// Free a frame previously delivered through the consume callback.
417/// Required for every delivered frame ID; closing the parent consumer
418/// is not enough.
419#[unsafe(no_mangle)]
420pub extern "C" fn moq_consume_audio_raw_frame_free(id: u32) -> i32 {
421	ffi::enter(move || {
422		let id = ffi::parse_id(id)?;
423		State::lock().audio.frame_free(id)
424	})
425}