1use std::ffi::{c_char, c_void};
15use std::time::Duration;
16
17use bytes::Bytes;
18use tokio::sync::oneshot;
19
20use crate::ffi::OnStatus;
21use crate::{Error, Id, NonZeroSlab, State, ffi};
22
23#[repr(C)]
34#[allow(non_camel_case_types)]
35#[derive(Clone, Copy, Debug)]
36pub enum moq_audio_format {
37 MOQ_AUDIO_FORMAT_U8 = 0,
38 MOQ_AUDIO_FORMAT_S16 = 1,
39 MOQ_AUDIO_FORMAT_S32 = 2,
40 MOQ_AUDIO_FORMAT_F32 = 3,
41 MOQ_AUDIO_FORMAT_U8_PLANAR = 4,
42 MOQ_AUDIO_FORMAT_S16_PLANAR = 5,
43 MOQ_AUDIO_FORMAT_S32_PLANAR = 6,
44 MOQ_AUDIO_FORMAT_F32_PLANAR = 7,
45}
46
47fn audio_format_from_u32(value: u32) -> Result<moq_audio::AudioFormat, Error> {
48 use moq_audio::AudioFormat;
49 Ok(match value {
50 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8 as u32 => AudioFormat::U8,
51 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16 as u32 => AudioFormat::S16,
52 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32 as u32 => AudioFormat::S32,
53 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32 as u32 => AudioFormat::F32,
54 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8_PLANAR as u32 => AudioFormat::U8Planar,
55 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16_PLANAR as u32 => AudioFormat::S16Planar,
56 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32_PLANAR as u32 => AudioFormat::S32Planar,
57 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32_PLANAR as u32 => AudioFormat::F32Planar,
58 _ => return Err(Error::InvalidCode),
59 })
60}
61
62#[repr(C)]
64#[allow(non_camel_case_types)]
65pub struct moq_audio_encoder_input {
66 pub format: u32,
68 pub sample_rate: u32,
69 pub channels: u32,
70}
71
72#[repr(C)]
76#[allow(non_camel_case_types)]
77pub struct moq_audio_encoder_output {
78 pub codec: *const c_char,
80 pub codec_len: usize,
81 pub sample_rate: u32,
83 pub channels: u32,
85 pub bitrate: u32,
87 pub frame_duration_ms: u32,
91}
92
93#[repr(C)]
95#[allow(non_camel_case_types)]
96pub struct moq_audio_decoder_output {
97 pub format: u32,
98 pub sample_rate: u32,
100 pub channels: u32,
102 pub latency_max_ms: u64,
110}
111
112#[repr(C)]
118#[allow(non_camel_case_types)]
119pub struct moq_audio_frame {
120 pub timestamp_us: u64,
121 pub data: *const u8,
122 pub data_size: usize,
123}
124
125#[derive(Default)]
128pub struct Audio {
129 producers: NonZeroSlab<moq_audio::AudioProducer>,
130 consumer_tasks: NonZeroSlab<Option<AudioTaskEntry>>,
131 frames: NonZeroSlab<moq_audio::Frame>,
132}
133
134struct AudioTaskEntry {
140 close: Option<oneshot::Sender<()>>,
141 callback: OnStatus,
142}
143
144impl Audio {
145 pub fn publish(
146 &mut self,
147 broadcast: &mut moq_net::BroadcastProducer,
148 catalog: moq_mux::catalog::hang::Producer,
149 name: &str,
150 input: moq_audio::EncoderInput,
151 output: moq_audio::EncoderOutput,
152 ) -> Result<Id, Error> {
153 let producer = moq_audio::AudioProducer::new(broadcast, catalog, name, input, output)?;
154 self.producers.insert(producer)
155 }
156
157 pub fn publish_frame(&mut self, id: Id, frame: moq_audio::Frame) -> Result<(), Error> {
158 let producer = self.producers.get_mut(id).ok_or(Error::MediaNotFound)?;
159 producer.write(&frame)?;
160 Ok(())
161 }
162
163 pub fn publish_close(&mut self, id: Id) -> Result<(), Error> {
164 let producer = self.producers.remove(id).ok_or(Error::MediaNotFound)?;
165 producer.finish()?;
166 Ok(())
167 }
168
169 pub fn consume(
170 &mut self,
171 broadcast: &moq_net::BroadcastConsumer,
172 catalog: &hang::catalog::AudioConfig,
173 name: &str,
174 output: moq_audio::DecoderOutput,
175 on_frame: OnStatus,
176 ) -> Result<Id, Error> {
177 let consumer = moq_audio::AudioConsumer::new(broadcast, catalog, name, output)?;
178
179 let channel = oneshot::channel();
180 let entry = AudioTaskEntry {
181 close: Some(channel.0),
182 callback: on_frame,
183 };
184 let id = self.consumer_tasks.insert(Some(entry))?;
185
186 tokio::spawn(async move {
187 let res = Self::run(on_frame, consumer, channel.1).await;
188
189 let entry = State::lock().audio.consumer_tasks.remove(id).flatten();
192 if let Some(entry) = entry {
193 entry.callback.call(res);
194 }
195 });
196
197 Ok(id)
198 }
199
200 async fn run(
201 callback: OnStatus,
202 mut consumer: moq_audio::AudioConsumer,
203 mut close: oneshot::Receiver<()>,
204 ) -> Result<(), Error> {
205 loop {
206 let frame = tokio::select! {
208 biased;
209 _ = &mut close => return Ok(()),
210 frame = consumer.read() => match frame? {
211 Some(frame) => frame,
212 None => return Ok(()),
213 },
214 };
215
216 let frame_id = State::lock().audio.frames.insert(frame)?;
218 callback.call(Ok(frame_id));
219 }
220 }
221
222 pub fn consume_close(&mut self, id: Id) -> Result<(), Error> {
223 self.consumer_tasks
225 .get_mut(id)
226 .and_then(|entry| entry.as_mut())
227 .ok_or(Error::TrackNotFound)?
228 .close
229 .take()
230 .ok_or(Error::TrackNotFound)?;
231 Ok(())
232 }
233
234 pub fn frame_info(&self, id: Id, dst: &mut moq_audio_frame) -> Result<(), Error> {
235 let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?;
236 *dst = moq_audio_frame {
237 timestamp_us: frame.timestamp_us,
238 data: frame.data.as_ptr(),
239 data_size: frame.data.len(),
240 };
241 Ok(())
242 }
243
244 pub fn frame_free(&mut self, id: Id) -> Result<(), Error> {
245 self.frames.remove(id).ok_or(Error::FrameNotFound)?;
246 Ok(())
247 }
248}
249
250#[unsafe(no_mangle)]
265pub unsafe extern "C" fn moq_publish_audio_raw(
266 broadcast: u32,
267 name: *const c_char,
268 name_len: usize,
269 input: *const moq_audio_encoder_input,
270 output: *const moq_audio_encoder_output,
271) -> i32 {
272 ffi::enter(move || {
273 let broadcast = ffi::parse_id(broadcast)?;
274 let name = unsafe { ffi::parse_str(name, name_len)? }.to_string();
275 let raw_input = unsafe { input.as_ref() }.ok_or(Error::InvalidPointer)?;
276 let raw_output = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
277 let codec_str = unsafe { ffi::parse_str(raw_output.codec, raw_output.codec_len)? };
278
279 let encoder_input = moq_audio::EncoderInput {
280 format: audio_format_from_u32(raw_input.format)?,
281 sample_rate: raw_input.sample_rate,
282 channels: raw_input.channels,
283 };
284 let encoder_output = moq_audio::EncoderOutput {
285 codec: codec_str
286 .parse()
287 .map_err(|_| Error::UnknownFormat(codec_str.to_string()))?,
288 sample_rate: if raw_output.sample_rate == 0 {
289 None
290 } else {
291 Some(raw_output.sample_rate)
292 },
293 channels: if raw_output.channels == 0 {
294 None
295 } else {
296 Some(raw_output.channels)
297 },
298 bitrate: if raw_output.bitrate == 0 {
299 None
300 } else {
301 Some(raw_output.bitrate)
302 },
303 frame_duration: Duration::from_millis(raw_output.frame_duration_ms.into()),
304 };
305
306 let mut state = State::lock();
307 let State { publish, audio, .. } = &mut *state;
308 let (broadcast_producer, catalog) = publish.pair_mut(broadcast)?;
309
310 audio.publish(
311 broadcast_producer,
312 catalog.clone(),
313 &name,
314 encoder_input,
315 encoder_output,
316 )
317 })
318}
319
320#[unsafe(no_mangle)]
329pub unsafe extern "C" fn moq_publish_audio_raw_frame(producer: u32, frame: *const moq_audio_frame) -> i32 {
330 ffi::enter(move || {
331 let producer = ffi::parse_id(producer)?;
332 let frame = unsafe { frame.as_ref() }.ok_or(Error::InvalidPointer)?;
333 let data = unsafe { ffi::parse_slice(frame.data, frame.data_size)? };
334
335 let owned = moq_audio::Frame {
336 timestamp_us: frame.timestamp_us,
337 data: Bytes::copy_from_slice(data),
338 };
339
340 State::lock().audio.publish_frame(producer, owned)
341 })
342}
343
344#[unsafe(no_mangle)]
346pub extern "C" fn moq_publish_audio_raw_close(producer: u32) -> i32 {
347 ffi::enter(move || {
348 let producer = ffi::parse_id(producer)?;
349 State::lock().audio.publish_close(producer)
350 })
351}
352
353#[unsafe(no_mangle)]
372pub unsafe extern "C" fn moq_consume_audio_raw(
373 catalog: u32,
374 index: u32,
375 output: *const moq_audio_decoder_output,
376 on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
377 user_data: *mut c_void,
378) -> i32 {
379 ffi::enter(move || {
380 let catalog = ffi::parse_id(catalog)?;
381 let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
382
383 let decoder_output = moq_audio::DecoderOutput {
384 format: audio_format_from_u32(raw.format)?,
385 sample_rate: if raw.sample_rate == 0 {
386 None
387 } else {
388 Some(raw.sample_rate)
389 },
390 channels: if raw.channels == 0 { None } else { Some(raw.channels) },
391 latency_max: if raw.latency_max_ms == 0 {
392 None
393 } else {
394 Some(Duration::from_millis(raw.latency_max_ms))
395 },
396 };
397 let on_frame = unsafe { OnStatus::new(user_data, on_frame) };
398
399 let mut state = State::lock();
400 let (broadcast, audio_cfg, name) = state.consume.audio_rendition(catalog, index as usize)?;
401
402 let State { audio, .. } = &mut *state;
403 audio.consume(&broadcast, &audio_cfg, &name, decoder_output, on_frame)
404 })
405}
406
407#[unsafe(no_mangle)]
415pub extern "C" fn moq_consume_audio_raw_close(consumer: u32) -> i32 {
416 ffi::enter(move || {
417 let consumer = ffi::parse_id(consumer)?;
418 State::lock().audio.consume_close(consumer)
419 })
420}
421
422#[unsafe(no_mangle)]
430pub unsafe extern "C" fn moq_consume_audio_raw_frame(id: u32, dst: *mut moq_audio_frame) -> i32 {
431 ffi::enter(move || {
432 let id = ffi::parse_id(id)?;
433 let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
434 State::lock().audio.frame_info(id, dst)
435 })
436}
437
438#[unsafe(no_mangle)]
442pub extern "C" fn moq_consume_audio_raw_frame_free(id: u32) -> i32 {
443 ffi::enter(move || {
444 let id = ffi::parse_id(id)?;
445 State::lock().audio.frame_free(id)
446 })
447}