use std::ffi::{c_char, c_void};
use std::time::Duration;
use bytes::Bytes;
use tokio::sync::oneshot;
use crate::ffi::OnStatus;
use crate::{Error, Id, NonZeroSlab, State, ffi};
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug)]
pub enum moq_audio_format {
MOQ_AUDIO_FORMAT_U8 = 0,
MOQ_AUDIO_FORMAT_S16 = 1,
MOQ_AUDIO_FORMAT_S32 = 2,
MOQ_AUDIO_FORMAT_F32 = 3,
MOQ_AUDIO_FORMAT_U8_PLANAR = 4,
MOQ_AUDIO_FORMAT_S16_PLANAR = 5,
MOQ_AUDIO_FORMAT_S32_PLANAR = 6,
MOQ_AUDIO_FORMAT_F32_PLANAR = 7,
}
fn audio_format_from_u32(value: u32) -> Result<moq_audio::AudioFormat, Error> {
use moq_audio::AudioFormat;
Ok(match value {
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8 as u32 => AudioFormat::U8,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16 as u32 => AudioFormat::S16,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32 as u32 => AudioFormat::S32,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32 as u32 => AudioFormat::F32,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_U8_PLANAR as u32 => AudioFormat::U8Planar,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S16_PLANAR as u32 => AudioFormat::S16Planar,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_S32_PLANAR as u32 => AudioFormat::S32Planar,
v if v == moq_audio_format::MOQ_AUDIO_FORMAT_F32_PLANAR as u32 => AudioFormat::F32Planar,
_ => return Err(Error::InvalidCode),
})
}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_input {
pub format: u32,
pub sample_rate: u32,
pub channels: u32,
}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_encoder_output {
pub codec: *const c_char,
pub codec_len: usize,
pub sample_rate: u32,
pub channels: u32,
pub bitrate: u32,
pub frame_duration_ms: u32,
}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_decoder_output {
pub format: u32,
pub sample_rate: u32,
pub channels: u32,
pub latency_max_ms: u64,
}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_audio_frame {
pub timestamp_us: u64,
pub data: *const u8,
pub data_size: usize,
}
#[derive(Default)]
pub struct Audio {
producers: NonZeroSlab<moq_audio::AudioProducer>,
consumer_tasks: NonZeroSlab<Option<AudioTaskEntry>>,
frames: NonZeroSlab<moq_audio::Frame>,
}
struct AudioTaskEntry {
#[allow(dead_code)] close: oneshot::Sender<()>,
callback: OnStatus,
}
impl Audio {
pub fn publish(
&mut self,
broadcast: &mut moq_net::BroadcastProducer,
catalog: moq_mux::catalog::hang::Producer,
name: &str,
input: moq_audio::EncoderInput,
output: moq_audio::EncoderOutput,
) -> Result<Id, Error> {
let producer = moq_audio::AudioProducer::new(broadcast, catalog, name, input, output)?;
self.producers.insert(producer)
}
pub fn publish_frame(&mut self, id: Id, frame: moq_audio::Frame) -> Result<(), Error> {
let producer = self.producers.get_mut(id).ok_or(Error::MediaNotFound)?;
producer.write(&frame)?;
Ok(())
}
pub fn publish_close(&mut self, id: Id) -> Result<(), Error> {
let producer = self.producers.remove(id).ok_or(Error::MediaNotFound)?;
producer.finish()?;
Ok(())
}
pub fn consume(
&mut self,
broadcast: &moq_net::BroadcastConsumer,
catalog: &hang::catalog::AudioConfig,
name: &str,
output: moq_audio::DecoderOutput,
on_frame: OnStatus,
) -> Result<Id, Error> {
let consumer = moq_audio::AudioConsumer::new(broadcast, catalog, name, output)?;
let channel = oneshot::channel();
let entry = AudioTaskEntry {
close: channel.0,
callback: on_frame,
};
let id = self.consumer_tasks.insert(Some(entry))?;
tokio::spawn(async move {
let res = tokio::select! {
res = Self::run(id, consumer) => res,
_ = channel.1 => Ok(()),
};
if let Some(entry) = State::lock().audio.consumer_tasks.remove(id).flatten() {
entry.callback.call(res);
}
});
Ok(id)
}
async fn run(task_id: Id, mut consumer: moq_audio::AudioConsumer) -> Result<(), Error> {
while let Some(frame) = consumer.read().await? {
let mut state = State::lock();
let Some(Some(entry)) = state.audio.consumer_tasks.get(task_id) else {
return Ok(());
};
let callback = entry.callback;
let frame_id = state.audio.frames.insert(frame)?;
drop(state);
callback.call(Ok(frame_id));
}
Ok(())
}
pub fn consume_close(&mut self, id: Id) -> Result<(), Error> {
self.consumer_tasks
.get_mut(id)
.ok_or(Error::TrackNotFound)?
.take()
.ok_or(Error::TrackNotFound)?;
Ok(())
}
pub fn frame_info(&self, id: Id, dst: &mut moq_audio_frame) -> Result<(), Error> {
let frame = self.frames.get(id).ok_or(Error::FrameNotFound)?;
*dst = moq_audio_frame {
timestamp_us: frame.timestamp_us,
data: frame.data.as_ptr(),
data_size: frame.data.len(),
};
Ok(())
}
pub fn frame_free(&mut self, id: Id) -> Result<(), Error> {
self.frames.remove(id).ok_or(Error::FrameNotFound)?;
Ok(())
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_publish_audio_raw(
broadcast: u32,
name: *const c_char,
name_len: usize,
input: *const moq_audio_encoder_input,
output: *const moq_audio_encoder_output,
) -> i32 {
ffi::enter(move || {
let broadcast = ffi::parse_id(broadcast)?;
let name = unsafe { ffi::parse_str(name, name_len)? }.to_string();
let raw_input = unsafe { input.as_ref() }.ok_or(Error::InvalidPointer)?;
let raw_output = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
let codec_str = unsafe { ffi::parse_str(raw_output.codec, raw_output.codec_len)? };
let encoder_input = moq_audio::EncoderInput {
format: audio_format_from_u32(raw_input.format)?,
sample_rate: raw_input.sample_rate,
channels: raw_input.channels,
};
let encoder_output = moq_audio::EncoderOutput {
codec: codec_str
.parse()
.map_err(|_| Error::UnknownFormat(codec_str.to_string()))?,
sample_rate: if raw_output.sample_rate == 0 {
None
} else {
Some(raw_output.sample_rate)
},
channels: if raw_output.channels == 0 {
None
} else {
Some(raw_output.channels)
},
bitrate: if raw_output.bitrate == 0 {
None
} else {
Some(raw_output.bitrate)
},
frame_duration: Duration::from_millis(raw_output.frame_duration_ms.into()),
};
let mut state = State::lock();
let State { publish, audio, .. } = &mut *state;
let (broadcast_producer, catalog) = publish.pair_mut(broadcast)?;
audio.publish(
broadcast_producer,
catalog.clone(),
&name,
encoder_input,
encoder_output,
)
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_publish_audio_raw_frame(producer: u32, frame: *const moq_audio_frame) -> i32 {
ffi::enter(move || {
let producer = ffi::parse_id(producer)?;
let frame = unsafe { frame.as_ref() }.ok_or(Error::InvalidPointer)?;
let data = unsafe { ffi::parse_slice(frame.data, frame.data_size)? };
let owned = moq_audio::Frame {
timestamp_us: frame.timestamp_us,
data: Bytes::copy_from_slice(data),
};
State::lock().audio.publish_frame(producer, owned)
})
}
#[unsafe(no_mangle)]
pub extern "C" fn moq_publish_audio_raw_close(producer: u32) -> i32 {
ffi::enter(move || {
let producer = ffi::parse_id(producer)?;
State::lock().audio.publish_close(producer)
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_consume_audio_raw(
catalog: u32,
index: u32,
output: *const moq_audio_decoder_output,
on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
user_data: *mut c_void,
) -> i32 {
ffi::enter(move || {
let catalog = ffi::parse_id(catalog)?;
let raw = unsafe { output.as_ref() }.ok_or(Error::InvalidPointer)?;
let decoder_output = moq_audio::DecoderOutput {
format: audio_format_from_u32(raw.format)?,
sample_rate: if raw.sample_rate == 0 {
None
} else {
Some(raw.sample_rate)
},
channels: if raw.channels == 0 { None } else { Some(raw.channels) },
latency_max: if raw.latency_max_ms == 0 {
None
} else {
Some(Duration::from_millis(raw.latency_max_ms))
},
};
let on_frame = unsafe { OnStatus::new(user_data, on_frame) };
let mut state = State::lock();
let (broadcast, audio_cfg, name) = state.consume.audio_rendition(catalog, index as usize)?;
let State { audio, .. } = &mut *state;
audio.consume(&broadcast, &audio_cfg, &name, decoder_output, on_frame)
})
}
#[unsafe(no_mangle)]
pub extern "C" fn moq_consume_audio_raw_close(consumer: u32) -> i32 {
ffi::enter(move || {
let consumer = ffi::parse_id(consumer)?;
State::lock().audio.consume_close(consumer)
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_consume_audio_raw_frame(id: u32, dst: *mut moq_audio_frame) -> i32 {
ffi::enter(move || {
let id = ffi::parse_id(id)?;
let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
State::lock().audio.frame_info(id, dst)
})
}
#[unsafe(no_mangle)]
pub extern "C" fn moq_consume_audio_raw_frame_free(id: u32) -> i32 {
ffi::enter(move || {
let id = ffi::parse_id(id)?;
State::lock().audio.frame_free(id)
})
}