1use std::ffi::{c_char, c_void};
15use std::time::Duration;
16
17use bytes::Bytes;
18use tokio::sync::oneshot;
19
20use crate::ffi::OnStatus;
21use crate::{Error, Id, NonZeroSlab, State, ffi};
22
/// Raw sample-format codes exchanged across the C ABI.
///
/// Each discriminant is the numeric value C callers place in the `format`
/// field of the encoder-input / decoder-output structs; the codes are mapped
/// one-to-one onto `moq_audio::AudioFormat` by `audio_format_from_u32`.
/// The exact sample layout of every variant is defined by the matching
/// `moq_audio::AudioFormat` variant, not by this enum.
// The C-style names are kept on purpose so the generated header matches.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub enum moq_audio_format {
    MOQ_AUDIO_FORMAT_U8 = 0,
    MOQ_AUDIO_FORMAT_S16 = 1,
    MOQ_AUDIO_FORMAT_S32 = 2,
    MOQ_AUDIO_FORMAT_F32 = 3,
    MOQ_AUDIO_FORMAT_U8_PLANAR = 4,
    MOQ_AUDIO_FORMAT_S16_PLANAR = 5,
    MOQ_AUDIO_FORMAT_S32_PLANAR = 6,
    MOQ_AUDIO_FORMAT_F32_PLANAR = 7,
}
46
47fn audio_format_from_u32(value: u32) -> Result<moq_audio::AudioFormat, Error> {
48 use moq_audio::AudioFormat;
49 Ok(match value {
50 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8 as u32 => AudioFormat::U8,
51 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16 as u32 => AudioFormat::S16,
52 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32 as u32 => AudioFormat::S32,
53 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32 as u32 => AudioFormat::F32,
54 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8_PLANAR as u32 => AudioFormat::U8Planar,
55 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16_PLANAR as u32 => AudioFormat::S16Planar,
56 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32_PLANAR as u32 => AudioFormat::S32Planar,
57 v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32_PLANAR as u32 => AudioFormat::F32Planar,
58 _ => return Err(Error::InvalidCode),
59 })
60}
61
/// Description of the raw audio a C caller will feed to the encoder.
///
/// Read by `moq_publish_audio_raw` and converted into a
/// `moq_audio::EncoderInput`.
// The C-style name is kept on purpose so the generated header matches.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct moq_audio_encoder_input {
    /// Sample format as a `moq_audio_format` discriminant; unknown values are
    /// rejected with `Error::InvalidCode`.
    pub format: u32,
    /// Sample rate, forwarded unchanged to the encoder input.
    pub sample_rate: u32,
    /// Channel count, forwarded unchanged to the encoder input.
    pub channels: u32,
}
71
/// Requested encoder output settings supplied by a C caller.
///
/// Read by `moq_publish_audio_raw` and converted into a
/// `moq_audio::EncoderOutput`. For the three numeric tuning fields
/// (`sample_rate`, `channels`, `bitrate`) a value of `0` is translated to
/// `None`, i.e. "not specified".
// The C-style name is kept on purpose so the generated header matches.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct moq_audio_encoder_output {
    /// Pointer to the codec name; together with `codec_len` it is handed to
    /// `ffi::parse_str` and the resulting string is parsed into a codec.
    pub codec: *const c_char,
    /// Length of the codec name passed alongside `codec`.
    pub codec_len: usize,
    /// Output sample rate; `0` means unspecified.
    pub sample_rate: u32,
    /// Output channel count; `0` means unspecified.
    pub channels: u32,
    /// Target bitrate; `0` means unspecified.
    pub bitrate: u32,
    /// Frame duration in milliseconds (converted with `Duration::from_millis`).
    pub frame_duration_ms: u32,
}
92
/// Requested decoder output settings supplied by a C caller.
///
/// Read by `moq_consume_audio_raw` and converted into a
/// `moq_audio::DecoderOutput`. For `sample_rate`, `channels` and
/// `latency_max_ms` a value of `0` is translated to `None`, i.e.
/// "not specified".
// The C-style name is kept on purpose so the generated header matches.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct moq_audio_decoder_output {
    /// Desired sample format as a `moq_audio_format` discriminant; unknown
    /// values are rejected with `Error::InvalidCode`.
    pub format: u32,
    /// Desired sample rate; `0` means unspecified.
    pub sample_rate: u32,
    /// Desired channel count; `0` means unspecified.
    pub channels: u32,
    /// Maximum latency in milliseconds (converted with
    /// `Duration::from_millis`); `0` means unspecified.
    pub latency_max_ms: u64,
}
111
/// A single audio frame as seen across the C ABI.
///
/// Used in both directions: `moq_publish_audio_raw_frame` reads one (copying
/// the payload before returning), and `moq_consume_audio_raw_frame` fills one
/// in with a pointer into a frame that the library keeps stored until
/// `moq_consume_audio_raw_frame_free` removes it.
// The C-style name is kept on purpose so the generated header matches.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct moq_audio_frame {
    /// Frame timestamp, copied verbatim to/from `moq_audio::Frame::timestamp_us`
    /// (microseconds, judging by the field name).
    pub timestamp_us: u64,
    /// Start of the frame payload.
    pub data: *const u8,
    /// Payload length in bytes.
    pub data_size: usize,
}
124
125#[derive(Default)]
128pub struct Audio {
129 producers: NonZeroSlab<moq_audio::AudioProducer>,
130 consumer_tasks: NonZeroSlab<Option<AudioTaskEntry>>,
131 frames: NonZeroSlab<moq_audio::Frame>,
132}
133
134struct AudioTaskEntry {
135 #[allow(dead_code)] close: oneshot::Sender<()>,
137 callback: OnStatus,
138}
139
140impl Audio {
141 pub fn publish(
142 &mut self,
143 broadcast: &mut moq_net::BroadcastProducer,
144 catalog: moq_mux::catalog::hang::Producer,
145 name: &str,
146 input: moq_audio::EncoderInput,
147 output: moq_audio::EncoderOutput,
148 ) -> Result<Id, Error> {
149 let producer = moq_audio::AudioProducer::new(broadcast, catalog, name, input, output)?;
150 self.producers.insert(producer)
151 }
152
153 pub fn publish_frame(&mut self, id: Id, frame: moq_audio::Frame) -> Result<(), Error> {
154 let producer = self.producers.get_mut(id).ok_or(Error::MediaNotFound)?;
155 producer.write(&frame)?;
156 Ok(())
157 }
158
159 pub fn publish_close(&mut self, id: Id) -> Result<(), Error> {
160 let producer = self.producers.remove(id).ok_or(Error::MediaNotFound)?;
161 producer.finish()?;
162 Ok(())
163 }
164
165 pub fn consume(
166 &mut self,
167 broadcast: &moq_net::BroadcastConsumer,
168 catalog: &hang::catalog::AudioConfig,
169 name: &str,
170 output: moq_audio::DecoderOutput,
171 on_frame: OnStatus,
172 ) -> Result<Id, Error> {
173 let consumer = moq_audio::AudioConsumer::new(broadcast, catalog, name, output)?;
174
175 let channel = oneshot::channel();
176 let entry = AudioTaskEntry {
177 close: channel.0,
178 callback: on_frame,
179 };
180 let id = self.consumer_tasks.insert(Some(entry))?;
181
182 tokio::spawn(async move {
183 let res = tokio::select! {
184 res = Self::run(id, consumer) => res,
185 _ = channel.1 => Ok(()),
186 };
187
188 if let Some(entry) = State::lock().audio.consumer_tasks.remove(id).flatten() {
189 entry.callback.call(res);
190 }
191 });
192
193 Ok(id)
194 }
195
196 async fn run(task_id: Id, mut consumer: moq_audio::AudioConsumer) -> Result<(), Error> {
197 while let Some(frame) = consumer.read().await? {
198 let mut state = State::lock();
199 let Some(Some(entry)) = state.audio.consumer_tasks.get(task_id) else {
200 return Ok(());
201 };
202 let callback = entry.callback;
203 let frame_id = state.audio.frames.insert(frame)?;
204 drop(state);
205
206 callback.call(Ok(frame_id));
207 }
208 Ok(())
209 }
210
211 pub fn consume_close(&mut self, id: Id) -> Result<(), Error> {
212 self.consumer_tasks
213 .get_mut(id)
214 .ok_or(Error::TrackNotFound)?
215 .take()
216 .ok_or(Error::TrackNotFound)?;
217 Ok(())
218 }
219
220 pub fn frame_info(&self, id: Id, dst: &mut moq_audio_frame) -> Result<(), Error> {
221 let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?;
222 *dst = moq_audio_frame {
223 timestamp_us: frame.timestamp_us,
224 data: frame.data.as_ptr(),
225 data_size: frame.data.len(),
226 };
227 Ok(())
228 }
229
230 pub fn frame_free(&mut self, id: Id) -> Result<(), Error> {
231 self.frames.remove(id).ok_or(Error::FrameNotFound)?;
232 Ok(())
233 }
234}
235
236#[unsafe(no_mangle)]
251pub unsafe extern "C" fn moq_publish_audio_raw(
252 broadcast: u32,
253 name: *const c_char,
254 name_len: usize,
255 input: *const moq_audio_encoder_input,
256 output: *const moq_audio_encoder_output,
257) -> i32 {
258 ffi::enter(move || {
259 let broadcast = ffi::parse_id(broadcast)?;
260 let name = unsafe { ffi::parse_str(name, name_len)? }.to_string();
261 let raw_input = unsafe { input.as_ref() }.ok_or(Error::InvalidPointer)?;
262 let raw_output = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
263 let codec_str = unsafe { ffi::parse_str(raw_output.codec, raw_output.codec_len)? };
264
265 let encoder_input = moq_audio::EncoderInput {
266 format: audio_format_from_u32(raw_input.format)?,
267 sample_rate: raw_input.sample_rate,
268 channels: raw_input.channels,
269 };
270 let encoder_output = moq_audio::EncoderOutput {
271 codec: codec_str
272 .parse()
273 .map_err(|_| Error::UnknownFormat(codec_str.to_string()))?,
274 sample_rate: if raw_output.sample_rate == 0 {
275 None
276 } else {
277 Some(raw_output.sample_rate)
278 },
279 channels: if raw_output.channels == 0 {
280 None
281 } else {
282 Some(raw_output.channels)
283 },
284 bitrate: if raw_output.bitrate == 0 {
285 None
286 } else {
287 Some(raw_output.bitrate)
288 },
289 frame_duration: Duration::from_millis(raw_output.frame_duration_ms.into()),
290 };
291
292 let mut state = State::lock();
293 let State { publish, audio, .. } = &mut *state;
294 let (broadcast_producer, catalog) = publish.pair_mut(broadcast)?;
295
296 audio.publish(
297 broadcast_producer,
298 catalog.clone(),
299 &name,
300 encoder_input,
301 encoder_output,
302 )
303 })
304}
305
306#[unsafe(no_mangle)]
315pub unsafe extern "C" fn moq_publish_audio_raw_frame(producer: u32, frame: *const moq_audio_frame) -> i32 {
316 ffi::enter(move || {
317 let producer = ffi::parse_id(producer)?;
318 let frame = unsafe { frame.as_ref() }.ok_or(Error::InvalidPointer)?;
319 let data = unsafe { ffi::parse_slice(frame.data, frame.data_size)? };
320
321 let owned = moq_audio::Frame {
322 timestamp_us: frame.timestamp_us,
323 data: Bytes::copy_from_slice(data),
324 };
325
326 State::lock().audio.publish_frame(producer, owned)
327 })
328}
329
330#[unsafe(no_mangle)]
332pub extern "C" fn moq_publish_audio_raw_close(producer: u32) -> i32 {
333 ffi::enter(move || {
334 let producer = ffi::parse_id(producer)?;
335 State::lock().audio.publish_close(producer)
336 })
337}
338
339#[unsafe(no_mangle)]
352pub unsafe extern "C" fn moq_consume_audio_raw(
353 catalog: u32,
354 index: u32,
355 output: *const moq_audio_decoder_output,
356 on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
357 user_data: *mut c_void,
358) -> i32 {
359 ffi::enter(move || {
360 let catalog = ffi::parse_id(catalog)?;
361 let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
362
363 let decoder_output = moq_audio::DecoderOutput {
364 format: audio_format_from_u32(raw.format)?,
365 sample_rate: if raw.sample_rate == 0 {
366 None
367 } else {
368 Some(raw.sample_rate)
369 },
370 channels: if raw.channels == 0 { None } else { Some(raw.channels) },
371 latency_max: if raw.latency_max_ms == 0 {
372 None
373 } else {
374 Some(Duration::from_millis(raw.latency_max_ms))
375 },
376 };
377 let on_frame = unsafe { OnStatus::new(user_data, on_frame) };
378
379 let mut state = State::lock();
380 let (broadcast, audio_cfg, name) = state.consume.audio_rendition(catalog, index as usize)?;
381
382 let State { audio, .. } = &mut *state;
383 audio.consume(&broadcast, &audio_cfg, &name, decoder_output, on_frame)
384 })
385}
386
387#[unsafe(no_mangle)]
393pub extern "C" fn moq_consume_audio_raw_close(consumer: u32) -> i32 {
394 ffi::enter(move || {
395 let consumer = ffi::parse_id(consumer)?;
396 State::lock().audio.consume_close(consumer)
397 })
398}
399
400#[unsafe(no_mangle)]
408pub unsafe extern "C" fn moq_consume_audio_raw_frame(id: u32, dst: *mut moq_audio_frame) -> i32 {
409 ffi::enter(move || {
410 let id = ffi::parse_id(id)?;
411 let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
412 State::lock().audio.frame_info(id, dst)
413 })
414}
415
416#[unsafe(no_mangle)]
420pub extern "C" fn moq_consume_audio_raw_frame_free(id: u32) -> i32 {
421 ffi::enter(move || {
422 let id = ffi::parse_id(id)?;
423 State::lock().audio.frame_free(id)
424 })
425}