use std::str::FromStr;
use std::sync::Arc;
use bytes::Buf;
use crate::consumer::{MoqBroadcastConsumer, MoqGroupConsumer, MoqTrackConsumer};
use crate::error::MoqError;
use crate::ffi::Task;
pub(crate) struct BroadcastProducer {
pub(crate) broadcast: moq_net::BroadcastProducer,
pub(crate) catalog: moq_mux::catalog::Producer,
}
struct MediaProducer {
decoder: moq_mux::import::Framed,
track: moq_net::TrackProducer,
}
struct MediaStreamProducer {
decoder: moq_mux::import::Stream,
buffer: bytes::BytesMut,
}
#[derive(uniffi::Object)]
pub struct MoqBroadcastProducer {
state: std::sync::Mutex<Option<BroadcastProducer>>,
}
#[derive(uniffi::Object)]
pub struct MoqBroadcastDynamic {
task: Task<DynamicProducer>,
}
struct DynamicProducer {
inner: moq_net::BroadcastDynamic,
}
impl DynamicProducer {
async fn requested_track(&mut self) -> Result<Arc<MoqTrackProducer>, MoqError> {
let track = self.inner.requested_track().await?;
Ok(Arc::new(MoqTrackProducer {
inner: std::sync::Mutex::new(Some(track)),
}))
}
}
impl MoqBroadcastProducer {
pub(crate) fn consume_inner(&self) -> Result<moq_net::BroadcastConsumer, MoqError> {
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(state.broadcast.consume())
}
pub(crate) fn with_state<R>(
&self,
f: impl FnOnce(&mut BroadcastProducer) -> Result<R, MoqError>,
) -> Result<R, MoqError> {
let mut guard = self.state.lock().unwrap();
let state = guard.as_mut().ok_or(MoqError::Closed)?;
f(state)
}
}
#[derive(uniffi::Object)]
pub struct MoqMediaProducer {
inner: std::sync::Mutex<Option<MediaProducer>>,
}
#[derive(uniffi::Object)]
pub struct MoqMediaStreamProducer {
inner: std::sync::Mutex<Option<MediaStreamProducer>>,
}
#[uniffi::export]
impl MoqBroadcastProducer {
pub fn consume(&self) -> Result<Arc<MoqBroadcastConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
Ok(Arc::new(MoqBroadcastConsumer::new(self.consume_inner()?)))
}
pub fn dynamic(&self) -> Result<Arc<MoqBroadcastDynamic>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(Arc::new(MoqBroadcastDynamic {
task: Task::new(DynamicProducer {
inner: state.broadcast.dynamic(),
}),
}))
}
#[uniffi::constructor]
pub fn new() -> Result<Arc<Self>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog = moq_mux::catalog::Producer::new(&mut broadcast)?;
Ok(Arc::new(Self {
state: std::sync::Mutex::new(Some(BroadcastProducer { broadcast, catalog })),
}))
}
pub fn publish_media(&self, format: String, init: Vec<u8>) -> Result<Arc<MoqMediaProducer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
let format = moq_mux::import::FramedFormat::from_str(&format)
.map_err(|_| MoqError::Codec(format!("unknown format: {format}")))?;
let mut buf = init.as_slice();
let decoder = moq_mux::import::Framed::new(state.broadcast.clone(), state.catalog.clone(), format, &mut buf)
.map_err(|err| MoqError::Codec(format!("init failed: {err}")))?;
if buf.has_remaining() {
return Err(MoqError::Codec("init failed: trailing bytes".into()));
}
let track = decoder
.track()
.map_err(|err| MoqError::Codec(format!("track unavailable: {err}")))?
.clone();
Ok(Arc::new(MoqMediaProducer {
inner: std::sync::Mutex::new(Some(MediaProducer { decoder, track })),
}))
}
pub fn publish_media_on_track(
&self,
track: &MoqTrackProducer,
format: String,
init: Vec<u8>,
) -> Result<Arc<MoqMediaProducer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
let format = moq_mux::import::FramedFormat::from_str(&format)
.map_err(|_| MoqError::Codec(format!("unknown format: {format}")))?;
let track_clone = {
let guard = track.inner.lock().unwrap();
guard.as_ref().ok_or_else(|| MoqError::Closed)?.clone()
};
let mut buf = init.as_slice();
let decoder =
moq_mux::import::Framed::new_with_track(track_clone.clone(), state.catalog.clone(), format, &mut buf)
.map_err(|err| MoqError::Codec(format!("init failed: {err}")))?;
if buf.has_remaining() {
return Err(MoqError::Codec("init failed: trailing bytes".into()));
}
let mut guard = track.inner.lock().unwrap();
guard.take().ok_or_else(|| MoqError::Closed)?;
Ok(Arc::new(MoqMediaProducer {
inner: std::sync::Mutex::new(Some(MediaProducer {
decoder,
track: track_clone,
})),
}))
}
pub fn publish_media_stream(&self, format: String) -> Result<Arc<MoqMediaStreamProducer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
let format = moq_mux::import::StreamFormat::from_str(&format)
.map_err(|_| MoqError::Codec(format!("unknown stream format: {format}")))?;
let decoder = moq_mux::import::Stream::new(state.broadcast.clone(), state.catalog.clone(), format)
.map_err(|err| MoqError::Codec(format!("init failed: {err}")))?;
Ok(Arc::new(MoqMediaStreamProducer {
inner: std::sync::Mutex::new(Some(MediaStreamProducer {
decoder,
buffer: bytes::BytesMut::new(),
})),
}))
}
pub fn publish_track(&self, name: String) -> Result<Arc<MoqTrackProducer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.state.lock().unwrap();
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
let track = moq_net::Track { name, priority: 0 };
let mut broadcast = state.broadcast.clone();
let producer = broadcast.create_track(track)?;
Ok(Arc::new(MoqTrackProducer {
inner: std::sync::Mutex::new(Some(producer)),
}))
}
pub fn finish(&self) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.state.lock().unwrap();
let mut state = guard.take().ok_or_else(|| MoqError::Closed)?;
state.catalog.finish()?;
Ok(())
}
}
#[uniffi::export]
impl MoqBroadcastDynamic {
pub async fn requested_track(&self) -> Result<Arc<MoqTrackProducer>, MoqError> {
self.task
.run(|mut state| async move { state.requested_track().await })
.await
}
pub fn cancel(&self) {
self.task.cancel();
}
}
#[derive(uniffi::Object)]
pub struct MoqTrackProducer {
inner: std::sync::Mutex<Option<moq_net::TrackProducer>>,
}
#[uniffi::export]
impl MoqTrackProducer {
pub fn name(&self) -> Result<String, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let track = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(track.name.clone())
}
pub async fn used(&self) -> Result<(), MoqError> {
let track = self.inner.lock().unwrap().as_ref().ok_or(MoqError::Closed)?.clone();
match crate::ffi::RUNTIME.spawn(async move { track.used().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
}
}
pub async fn unused(&self) -> Result<(), MoqError> {
let track = self.inner.lock().unwrap().as_ref().ok_or(MoqError::Closed)?.clone();
match crate::ffi::RUNTIME.spawn(async move { track.unused().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
}
}
pub fn consume(&self) -> Result<Arc<MoqTrackConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let track = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(Arc::new(MoqTrackConsumer::new(track.consume())))
}
pub fn append_group(&self) -> Result<Arc<MoqGroupProducer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let track = guard.as_mut().ok_or_else(|| MoqError::Closed)?;
let group = track.append_group()?;
Ok(Arc::new(MoqGroupProducer {
sequence: group.sequence,
inner: std::sync::Mutex::new(Some(group)),
}))
}
pub fn write_frame(&self, payload: Vec<u8>) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let track = guard.as_mut().ok_or_else(|| MoqError::Closed)?;
track.write_frame(payload)?;
Ok(())
}
pub fn abort(&self, error_code: i32) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let error_code = u16::try_from(error_code).map_err(|_| MoqError::InvalidErrorCode(error_code))?;
let mut guard = self.inner.lock().unwrap();
let mut track = guard.take().ok_or_else(|| MoqError::Closed)?;
track.abort(moq_net::Error::App(error_code))?;
Ok(())
}
pub fn finish(&self) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let mut track = guard.take().ok_or_else(|| MoqError::Closed)?;
track.finish()?;
Ok(())
}
}
#[derive(uniffi::Object)]
pub struct MoqGroupProducer {
sequence: u64,
inner: std::sync::Mutex<Option<moq_net::GroupProducer>>,
}
#[uniffi::export]
impl MoqGroupProducer {
pub fn sequence(&self) -> u64 {
self.sequence
}
pub fn consume(&self) -> Result<Arc<MoqGroupConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let group = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(Arc::new(MoqGroupConsumer::new(group.consume())))
}
pub fn write_frame(&self, payload: Vec<u8>) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let group = guard.as_mut().ok_or_else(|| MoqError::Closed)?;
group.write_frame(payload)?;
Ok(())
}
pub fn finish(&self) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let mut group = guard.take().ok_or_else(|| MoqError::Closed)?;
group.finish()?;
Ok(())
}
}
#[uniffi::export]
impl MoqMediaProducer {
pub fn name(&self) -> Result<String, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let media = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(media.track.name.clone())
}
pub async fn used(&self) -> Result<(), MoqError> {
let track = self
.inner
.lock()
.unwrap()
.as_ref()
.ok_or(MoqError::Closed)?
.track
.clone();
match crate::ffi::RUNTIME.spawn(async move { track.used().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
}
}
pub async fn unused(&self) -> Result<(), MoqError> {
let track = self
.inner
.lock()
.unwrap()
.as_ref()
.ok_or(MoqError::Closed)?
.track
.clone();
match crate::ffi::RUNTIME.spawn(async move { track.unused().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
}
}
pub fn write_frame(&self, payload: Vec<u8>, timestamp_us: u64) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let media = guard.as_mut().ok_or_else(|| MoqError::Closed)?;
let timestamp = hang::container::Timestamp::from_micros(timestamp_us)?;
let mut data = payload.as_slice();
media
.decoder
.decode_frame(&mut data, Some(timestamp))
.map_err(|err| MoqError::Codec(format!("decode failed: {err}")))?;
if data.has_remaining() {
return Err(MoqError::Codec("buffer was not fully consumed".into()));
}
Ok(())
}
pub fn finish(&self) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let mut media = guard.take().ok_or_else(|| MoqError::Closed)?;
media
.decoder
.finish()
.map_err(|err| MoqError::Codec(format!("finish failed: {err}")))?;
Ok(())
}
}
#[uniffi::export]
impl MoqMediaStreamProducer {
pub fn write(&self, payload: Vec<u8>) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let media = guard.as_mut().ok_or_else(|| MoqError::Closed)?;
media.buffer.extend_from_slice(&payload);
media
.decoder
.decode_stream(&mut media.buffer)
.map_err(|err| MoqError::Codec(format!("decode failed: {err}")))?;
Ok(())
}
pub fn finish(&self) -> Result<(), MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut guard = self.inner.lock().unwrap();
let mut media = guard.take().ok_or_else(|| MoqError::Closed)?;
media
.decoder
.finish()
.map_err(|err| MoqError::Codec(format!("finish failed: {err}")))?;
Ok(())
}
}