Skip to main content

moq/
audio.rs

1//! Raw-audio import/export via [`moq_audio`].
2//!
3//! Sibling to `moq_publish_media_*` / `moq_consume_audio_ordered`
4//! (those handle already-encoded frames). These functions accept and
5//! return raw PCM, with Opus encode/decode happening inside the FFI
6//! boundary.
7//!
8//! Format / sample rate / channel count are fixed at producer or
9//! consumer construction via [`moq_audio_encoder_input`] /
10//! [`moq_audio_encoder_output`] / [`moq_audio_decoder_output`], so
11//! each [`moq_audio_frame`] carries only payload bytes and a
12//! timestamp.
13
14use std::ffi::{c_char, c_void};
15use std::time::Duration;
16
17use bytes::Bytes;
18use tokio::sync::oneshot;
19
20use crate::ffi::OnStatus;
21use crate::{Error, Id, NonZeroSlab, State, ffi};
22
23// ---- C-visible types ----
24
/// Raw PCM sample layout, mirroring WebCodecs `AudioData.format`.
///
/// The enum is exposed in the C header for readability, but ABI
/// fields/parameters that carry it are typed `u32`. A C caller
/// passing an unknown discriminant gets `Error::InvalidCode` instead
/// of UB.
///
/// The discriminant values are part of the C ABI and must never be
/// renumbered; append new variants with fresh values instead.
///
/// <https://developer.mozilla.org/en-US/docs/Web/API/AudioData/format>
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum moq_audio_format {
	/// WebCodecs `"u8"`: unsigned 8-bit samples, interleaved.
	MOQ_AUDIO_FORMAT_U8 = 0,
	/// WebCodecs `"s16"`: signed 16-bit samples, interleaved.
	MOQ_AUDIO_FORMAT_S16 = 1,
	/// WebCodecs `"s32"`: signed 32-bit samples, interleaved.
	MOQ_AUDIO_FORMAT_S32 = 2,
	/// WebCodecs `"f32"`: 32-bit float samples, interleaved.
	MOQ_AUDIO_FORMAT_F32 = 3,
	/// WebCodecs `"u8-planar"`: unsigned 8-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_U8_PLANAR = 4,
	/// WebCodecs `"s16-planar"`: signed 16-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_S16_PLANAR = 5,
	/// WebCodecs `"s32-planar"`: signed 32-bit samples, one plane per channel.
	MOQ_AUDIO_FORMAT_S32_PLANAR = 6,
	/// WebCodecs `"f32-planar"`: 32-bit float samples, one plane per channel.
	MOQ_AUDIO_FORMAT_F32_PLANAR = 7,
}
46
47fn audio_format_from_u32(value: u32) -> Result<moq_audio::AudioFormat, Error> {
48	use moq_audio::AudioFormat;
49	Ok(match value {
50		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8 as u32 => AudioFormat::U8,
51		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16 as u32 => AudioFormat::S16,
52		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32 as u32 => AudioFormat::S32,
53		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32 as u32 => AudioFormat::F32,
54		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8_PLANAR as u32 => AudioFormat::U8Planar,
55		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16_PLANAR as u32 => AudioFormat::S16Planar,
56		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32_PLANAR as u32 => AudioFormat::S32Planar,
57		v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32_PLANAR as u32 => AudioFormat::F32Planar,
58		_ => return Err(Error::InvalidCode),
59	})
60}
61
/// PCM layout the caller hands to [`moq_publish_audio_raw_frame`].
///
/// Fixed when the producer is created by [`moq_publish_audio_raw`];
/// frames pushed afterwards carry only payload bytes and a timestamp.
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_input {
	/// `moq_audio_format` discriminant; unknown values are rejected with
	/// `Error::InvalidCode`.
	pub format: u32,
	/// Sample rate of the supplied PCM (NOTE(review): presumably Hz —
	/// confirm against `moq_audio::EncoderInput`).
	pub sample_rate: u32,
	/// Channel count of the supplied PCM.
	pub channels: u32,
}
71
/// Codec-side configuration for [`moq_publish_audio_raw`].
///
/// `sample_rate` / `channels` = 0 means "match the input (snapping the
/// rate up to a libopus-supported value if necessary)".
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_output {
	/// Codec id, UTF-8, read as exactly `codec_len` bytes (currently only
	/// "opus"). An id that does not parse is rejected with
	/// `Error::UnknownFormat`.
	pub codec: *const c_char,
	/// Length of `codec` in bytes.
	pub codec_len: usize,
	/// 0 = derive from input.
	pub sample_rate: u32,
	/// 0 = derive from input.
	pub channels: u32,
	/// Target bitrate; 0 = libopus default.
	/// NOTE(review): presumably bits per second — confirm against `moq_audio`.
	pub bitrate: u32,
	/// Encoded frame duration in whole milliseconds. Opus supports
	/// 2.5/5/10/20/40/60 ms frames; pass 20 to match the JS publish path.
	/// Because this field is an integer, 2.5 ms frames cannot be requested
	/// through it. Unlike the fields above, 0 is not a "default" sentinel:
	/// it is forwarded as a zero-length `Duration`.
	pub frame_duration_ms: u32,
}
92
/// PCM layout the caller wants out of [`moq_consume_audio_raw`].
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_decoder_output {
	/// `moq_audio_format` discriminant for the delivered PCM; unknown
	/// values are rejected with `Error::InvalidCode`.
	pub format: u32,
	/// 0 = deliver at the codec's native sample rate.
	pub sample_rate: u32,
	/// 0 = deliver at the codec's native channel count.
	pub channels: u32,
	/// Upper bound on buffering before skipping a stalled group, in
	/// milliseconds. Same congestion-control knob as
	/// `moq_consume_audio_ordered`'s `max_latency_ms`. 0 = skip
	/// aggressively (the moq-mux default); set to your playout
	/// buffer (tens to a few hundred ms) for a softer skip. Named
	/// `_max` to leave room for a future `latency_min_ms`
	/// (jitter-buffer floor).
	pub latency_max_ms: u64,
}
111
/// One audio frame: payload bytes plus a presentation timestamp.
///
/// `data` is owned by the consume slab (see
/// [`moq_consume_audio_raw_frame_free`]) or borrowed by the publish call
/// (the publisher copies before returning).
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_frame {
	/// Presentation timestamp, carried unchanged to/from
	/// `moq_audio::Frame::timestamp_us` (NOTE(review): microseconds per the
	/// `_us` suffix — confirm against `moq_audio`).
	pub timestamp_us: u64,
	/// Start of the PCM payload, in the layout fixed when the producer or
	/// consumer was constructed.
	pub data: *const u8,
	/// Length of `data` in bytes.
	pub data_size: usize,
}
124
125// ---- State extensions (used internally by lib.rs) ----
126
/// Raw-audio bookkeeping stored in the global `State` (reached as
/// `State::lock().audio`).
#[derive(Default)]
pub struct Audio {
	/// Live producers, keyed by the handle returned from [`moq_publish_audio_raw`].
	producers: NonZeroSlab<moq_audio::AudioProducer>,
	/// Spawned consumer tasks, keyed by the handle returned from
	/// [`moq_consume_audio_raw`]; each task removes its own slot right
	/// before its terminal callback.
	consumer_tasks: NonZeroSlab<Option<AudioTaskEntry>>,
	/// Decoded frames handed to the C callback, kept alive until
	/// [`moq_consume_audio_raw_frame_free`] releases them.
	frames: NonZeroSlab<moq_audio::Frame>,
}
133
/// A spawned task entry: `close` signals shutdown, `callback` delivers status.
///
/// `close` is an `Option` so `consume_close` can drop just the sender without
/// removing the entry. The task delivers one final terminal callback and then
/// removes itself, so `user_data` stays valid until that callback fires.
struct AudioTaskEntry {
	/// Dropping (or sending on) this wakes the task's `close` receiver;
	/// `None` once `consume_close` has already fired.
	close: Option<oneshot::Sender<()>>,
	/// C callback used for the terminal (`<= 0`) status.
	callback: OnStatus,
}
143
144impl Audio {
145	pub fn publish(
146		&mut self,
147		broadcast: &mut moq_net::BroadcastProducer,
148		catalog: moq_mux::catalog::Producer,
149		name: &str,
150		input: moq_audio::EncoderInput,
151		output: moq_audio::EncoderOutput,
152	) -> Result<Id, Error> {
153		let producer = moq_audio::AudioProducer::new(broadcast, catalog, name, input, output)?;
154		self.producers.insert(producer)
155	}
156
157	pub fn publish_frame(&mut self, id: Id, frame: moq_audio::Frame) -> Result<(), Error> {
158		let producer = self.producers.get_mut(id).ok_or(Error::MediaNotFound)?;
159		producer.write(&frame)?;
160		Ok(())
161	}
162
163	pub fn publish_close(&mut self, id: Id) -> Result<(), Error> {
164		let producer = self.producers.remove(id).ok_or(Error::MediaNotFound)?;
165		producer.finish()?;
166		Ok(())
167	}
168
169	pub fn consume(
170		&mut self,
171		broadcast: &moq_net::BroadcastConsumer,
172		catalog: &hang::catalog::AudioConfig,
173		name: &str,
174		output: moq_audio::DecoderOutput,
175		on_frame: OnStatus,
176	) -> Result<Id, Error> {
177		let consumer = moq_audio::AudioConsumer::new(broadcast, catalog, name, output)?;
178
179		let channel = oneshot::channel();
180		let entry = AudioTaskEntry {
181			close: Some(channel.0),
182			callback: on_frame,
183		};
184		let id = self.consumer_tasks.insert(Some(entry))?;
185
186		tokio::spawn(async move {
187			let res = Self::run(on_frame, consumer, channel.1).await;
188
189			// Deliver one final terminal callback (code <= 0), then drop the entry.
190			// Pull it out from under the lock so the callback never runs while held.
191			let entry = State::lock().audio.consumer_tasks.remove(id).flatten();
192			if let Some(entry) = entry {
193				entry.callback.call(res);
194			}
195		});
196
197		Ok(id)
198	}
199
200	async fn run(
201		callback: OnStatus,
202		mut consumer: moq_audio::AudioConsumer,
203		mut close: oneshot::Receiver<()>,
204	) -> Result<(), Error> {
205		loop {
206			// `biased` so a pending close always wins over a ready frame.
207			let frame = tokio::select! {
208				biased;
209				_ = &mut close => return Ok(()),
210				frame = consumer.read() => match frame? {
211					Some(frame) => frame,
212					None => return Ok(()),
213				},
214			};
215
216			// Hold the lock only to buffer the frame; release it before the callback.
217			let frame_id = State::lock().audio.frames.insert(frame)?;
218			callback.call(Ok(frame_id));
219		}
220	}
221
222	pub fn consume_close(&mut self, id: Id) -> Result<(), Error> {
223		// Signal shutdown; the task delivers a final callback and removes itself.
224		self.consumer_tasks
225			.get_mut(id)
226			.and_then(|entry| entry.as_mut())
227			.ok_or(Error::TrackNotFound)?
228			.close
229			.take()
230			.ok_or(Error::TrackNotFound)?;
231		Ok(())
232	}
233
234	pub fn frame_info(&self, id: Id, dst: &mut moq_audio_frame) -> Result<(), Error> {
235		let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?;
236		*dst = moq_audio_frame {
237			timestamp_us: frame.timestamp_us,
238			data: frame.data.as_ptr(),
239			data_size: frame.data.len(),
240		};
241		Ok(())
242	}
243
244	pub fn frame_free(&mut self, id: Id) -> Result<(), Error> {
245		self.frames.remove(id).ok_or(Error::FrameNotFound)?;
246		Ok(())
247	}
248}
249
250// ---- C entry points ----
251
252/// Open an audio track on a broadcast.
253///
254/// The encoder configuration is fixed at construction; subsequent
255/// frame writes pass only payload + timestamp via
256/// [`moq_publish_audio_raw_frame`].
257///
258/// Returns a non-zero handle on success or a negative error code.
259///
260/// # Safety
261/// - `name` must point to `name_len` bytes of UTF-8.
262/// - `input` / `output` must point to fully populated structs.
263/// - `output->codec` must point to `output->codec_len` bytes of UTF-8.
264#[unsafe(no_mangle)]
265pub unsafe extern "C" fn moq_publish_audio_raw(
266	broadcast: u32,
267	name: *const c_char,
268	name_len: usize,
269	input: *const moq_audio_encoder_input,
270	output: *const moq_audio_encoder_output,
271) -> i32 {
272	ffi::enter(move || {
273		let broadcast = ffi::parse_id(broadcast)?;
274		let name = unsafe { ffi::parse_str(name, name_len)? }.to_string();
275		let raw_input = unsafe { input.as_ref() }.ok_or(Error::InvalidPointer)?;
276		let raw_output = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
277		let codec_str = unsafe { ffi::parse_str(raw_output.codec, raw_output.codec_len)? };
278
279		let encoder_input = moq_audio::EncoderInput {
280			format: audio_format_from_u32(raw_input.format)?,
281			sample_rate: raw_input.sample_rate,
282			channels: raw_input.channels,
283		};
284		let encoder_output = moq_audio::EncoderOutput {
285			codec: codec_str
286				.parse()
287				.map_err(|_| Error::UnknownFormat(codec_str.to_string()))?,
288			sample_rate: if raw_output.sample_rate == 0 {
289				None
290			} else {
291				Some(raw_output.sample_rate)
292			},
293			channels: if raw_output.channels == 0 {
294				None
295			} else {
296				Some(raw_output.channels)
297			},
298			bitrate: if raw_output.bitrate == 0 {
299				None
300			} else {
301				Some(raw_output.bitrate)
302			},
303			frame_duration: Duration::from_millis(raw_output.frame_duration_ms.into()),
304		};
305
306		let mut state = State::lock();
307		let State { publish, audio, .. } = &mut *state;
308		let (broadcast_producer, catalog) = publish.pair_mut(broadcast)?;
309
310		audio.publish(
311			broadcast_producer,
312			catalog.clone(),
313			&name,
314			encoder_input,
315			encoder_output,
316		)
317	})
318}
319
320/// Push one audio frame.
321///
322/// `frame->data` is borrowed for the duration of the call; the
323/// producer copies before returning.
324///
325/// # Safety
326/// - `frame` must point to a valid [`moq_audio_frame`].
327/// - `frame->data` must point to `frame->data_size` bytes.
328#[unsafe(no_mangle)]
329pub unsafe extern "C" fn moq_publish_audio_raw_frame(producer: u32, frame: *const moq_audio_frame) -> i32 {
330	ffi::enter(move || {
331		let producer = ffi::parse_id(producer)?;
332		let frame = unsafe { frame.as_ref() }.ok_or(Error::InvalidPointer)?;
333		let data = unsafe { ffi::parse_slice(frame.data, frame.data_size)? };
334
335		let owned = moq_audio::Frame {
336			timestamp_us: frame.timestamp_us,
337			data: Bytes::copy_from_slice(data),
338		};
339
340		State::lock().audio.publish_frame(producer, owned)
341	})
342}
343
344/// Flush any pending samples and finalize an audio producer.
345#[unsafe(no_mangle)]
346pub extern "C" fn moq_publish_audio_raw_close(producer: u32) -> i32 {
347	ffi::enter(move || {
348		let producer = ffi::parse_id(producer)?;
349		State::lock().audio.publish_close(producer)
350	})
351}
352
353/// Subscribe to an audio track and decode it into PCM.
354///
355/// The catalog `index` identifies which audio rendition to subscribe
356/// to, matching the existing `moq_consume_audio_ordered` selection
357/// model. TODO: a future API will pick the right rendition
358/// automatically (ABR).
359///
360/// Returns a non-zero handle on success or a negative error code.
361///
362/// `on_frame` is called with a positive frame ID per frame, then exactly once
363/// more with a terminal code: `0` (closed cleanly) or a negative error. After
364/// the terminal (`<= 0`) callback, `on_frame` is never called again and
365/// `user_data` is never touched again, so release `user_data` there. The
366/// terminal callback fires even after [`moq_consume_audio_raw_close`].
367///
368/// # Safety
369/// - `output` must point to a valid [`moq_audio_decoder_output`].
370/// - `user_data` must stay valid until the terminal (`<= 0`) `on_frame` callback.
371#[unsafe(no_mangle)]
372pub unsafe extern "C" fn moq_consume_audio_raw(
373	catalog: u32,
374	index: u32,
375	output: *const moq_audio_decoder_output,
376	on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
377	user_data: *mut c_void,
378) -> i32 {
379	ffi::enter(move || {
380		let catalog = ffi::parse_id(catalog)?;
381		let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
382
383		let decoder_output = moq_audio::DecoderOutput {
384			format: audio_format_from_u32(raw.format)?,
385			sample_rate: if raw.sample_rate == 0 {
386				None
387			} else {
388				Some(raw.sample_rate)
389			},
390			channels: if raw.channels == 0 { None } else { Some(raw.channels) },
391			latency_max: if raw.latency_max_ms == 0 {
392				None
393			} else {
394				Some(Duration::from_millis(raw.latency_max_ms))
395			},
396		};
397		let on_frame = unsafe { OnStatus::new(user_data, on_frame) };
398
399		let mut state = State::lock();
400		let (broadcast, audio_cfg, name) = state.consume.audio_rendition(catalog, index as usize)?;
401
402		let State { audio, .. } = &mut *state;
403		audio.consume(&broadcast, &audio_cfg, &name, decoder_output, on_frame)
404	})
405}
406
407/// Stop an audio (raw PCM) consumer's background task.
408///
409/// Returns immediately: zero on success, or a negative code if already closed.
410/// Does NOT free `user_data`; the on-frame callback still fires once more with a
411/// terminal `0` (or a negative error), which is where `user_data` should be
412/// released. Frame IDs already delivered to the callback are likewise not freed;
413/// release each with [`moq_consume_audio_raw_frame_free`].
414#[unsafe(no_mangle)]
415pub extern "C" fn moq_consume_audio_raw_close(consumer: u32) -> i32 {
416	ffi::enter(move || {
417		let consumer = ffi::parse_id(consumer)?;
418		State::lock().audio.consume_close(consumer)
419	})
420}
421
422/// Copy a delivered frame's metadata into `dst`.
423///
424/// The written `dst->data` pointer remains valid until the same `id`
425/// is released with [`moq_consume_audio_raw_frame_free`].
426///
427/// # Safety
428/// - `dst` must point to a writable [`moq_audio_frame`].
429#[unsafe(no_mangle)]
430pub unsafe extern "C" fn moq_consume_audio_raw_frame(id: u32, dst: *mut moq_audio_frame) -> i32 {
431	ffi::enter(move || {
432		let id = ffi::parse_id(id)?;
433		let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
434		State::lock().audio.frame_info(id, dst)
435	})
436}
437
438/// Free a frame previously delivered through the consume callback.
439/// Required for every delivered frame ID; closing the parent consumer
440/// is not enough.
441#[unsafe(no_mangle)]
442pub extern "C" fn moq_consume_audio_raw_frame_free(id: u32) -> i32 {
443	ffi::enter(move || {
444		let id = ffi::parse_id(id)?;
445		State::lock().audio.frame_free(id)
446	})
447}