use std::sync::Arc;
use bytes::Buf;
use crate::error::MoqError;
use crate::ffi::Task;
use crate::media::*;
/// UniFFI object wrapping a `moq_net::BroadcastConsumer`.
///
/// The exported `subscribe_catalog`, `subscribe_track` and `subscribe_media` methods all
/// issue their subscriptions against the wrapped consumer.
#[derive(Clone, uniffi::Object)]
pub struct MoqBroadcastConsumer {
// Wrapped broadcast consumer that every subscription is requested from.
inner: moq_net::BroadcastConsumer,
}
impl MoqBroadcastConsumer {
pub(crate) fn new(inner: moq_net::BroadcastConsumer) -> Self {
Self { inner }
}
}
/// UniFFI object that yields catalogs from a subscribed catalog track.
///
/// Created by `MoqBroadcastConsumer::subscribe_catalog`.
#[derive(uniffi::Object)]
pub struct MoqCatalogConsumer {
// `Task` (from `crate::ffi`) holding the catalog state; exported calls go through it.
task: Task<Catalog>,
}
/// Private state driven by `MoqCatalogConsumer`'s task.
struct Catalog {
// Catalog consumer built from the subscribed catalog track.
inner: moq_mux::catalog::Consumer,
}
impl Catalog {
    /// Waits for the next catalog from the underlying consumer and converts it with
    /// `convert_catalog`.
    ///
    /// Returns `Ok(None)` when the consumer yields `None`.
    ///
    /// # Errors
    /// Any error reported by the consumer is converted into `MoqError`.
    async fn next(&mut self) -> Result<Option<MoqCatalog>, MoqError> {
        // `?` performs the same error conversion the hand-written `Err(e) => Err(e.into())`
        // arm did; `Option::map` covers the `Some`/`None` arms.
        let update = self.inner.next().await?;
        Ok(update.map(|catalog| convert_catalog(&catalog)))
    }
}
/// UniFFI object that yields media frames from a subscribed track.
///
/// Created by `MoqBroadcastConsumer::subscribe_media`.
#[derive(uniffi::Object)]
pub struct MoqMediaConsumer {
// `Task` (from `crate::ffi`) holding the media state; exported calls go through it.
task: Task<Media>,
}
/// Private state driven by `MoqMediaConsumer`'s task.
struct Media {
// Container consumer, parameterised on the `Hang` container type, that frames are read from.
inner: moq_mux::container::Consumer<moq_mux::container::Hang>,
}
impl Media {
    /// Reads the next frame from the container consumer and converts it into a `MoqFrame`.
    ///
    /// Returns `Ok(None)` when the consumer's `read` yields `None`.
    ///
    /// # Errors
    /// Propagates any error from `read`, and returns `MoqError::Codec("timestamp overflow")`
    /// when the frame timestamp in microseconds does not fit in a `u64`.
    async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
        match self.inner.read().await? {
            None => Ok(None),
            Some(frame) => {
                let timestamp_us = u64::try_from(frame.timestamp.as_micros())
                    .map_err(|_| MoqError::Codec("timestamp overflow".into()))?;

                // Drain everything left in the payload buffer into an owned byte vector.
                let mut body = frame.payload;
                let len = body.remaining();
                let payload = body.copy_to_bytes(len).to_vec();

                Ok(Some(MoqFrame {
                    payload,
                    timestamp_us,
                    keyframe: frame.keyframe,
                }))
            }
        }
    }
}
#[uniffi::export]
impl MoqBroadcastConsumer {
pub fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?;
let consumer = moq_mux::catalog::Consumer::from(track);
Ok(Arc::new(MoqCatalogConsumer {
task: Task::new(Catalog { inner: consumer }),
}))
}
pub fn subscribe_track(&self, name: String) -> Result<Arc<MoqTrackConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
Ok(Arc::new(MoqTrackConsumer::new(track)))
}
pub fn subscribe_media(
&self,
name: String,
container: Container,
max_latency_ms: u64,
) -> Result<Arc<MoqMediaConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let container: hang::catalog::Container = container.into();
let media: moq_mux::container::Hang = (&container)
.try_into()
.map_err(|e| MoqError::Codec(format!("invalid container: {e}")))?;
let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
let latency = std::time::Duration::from_millis(max_latency_ms);
let consumer = moq_mux::container::Consumer::new(track, media).with_latency(latency);
Ok(Arc::new(MoqMediaConsumer {
task: Task::new(Media { inner: consumer }),
}))
}
}
/// Private state driven by `MoqTrackConsumer`'s task.
struct TrackInner {
// Subscribed track that groups and frames are read from.
track: moq_net::TrackConsumer,
}
impl TrackInner {
    /// Forwards to `TrackConsumer::recv_group`, converting any error into `MoqError`.
    async fn recv_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
        let group = self.track.recv_group().await?;
        Ok(group)
    }

    /// Forwards to `TrackConsumer::next_group`, converting any error into `MoqError`.
    async fn next_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
        let group = self.track.next_group().await?;
        Ok(group)
    }

    /// Forwards to `TrackConsumer::read_frame` and copies the returned frame, if any, into
    /// an owned `Vec<u8>`.
    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
        let frame = self.track.read_frame().await?;
        Ok(frame.map(|bytes| bytes.to_vec()))
    }
}
/// UniFFI object for reading groups and frames from a subscribed track.
///
/// Created by `MoqBroadcastConsumer::subscribe_track`.
#[derive(uniffi::Object)]
pub struct MoqTrackConsumer {
// `Task` (from `crate::ffi`) holding the track state; exported calls go through it.
task: Task<TrackInner>,
}
impl MoqTrackConsumer {
    /// Builds a consumer whose task state owns the given track.
    pub(crate) fn new(track: moq_net::TrackConsumer) -> Self {
        let task = Task::new(TrackInner { track });
        Self { task }
    }
}
#[uniffi::export]
impl MoqTrackConsumer {
pub async fn recv_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
self.task
.run(|mut state| async move {
Ok(state.recv_group().await?.map(|group| {
Arc::new(MoqGroupConsumer {
sequence: group.sequence,
task: Task::new(GroupInner { group }),
})
}))
})
.await
}
pub async fn next_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
self.task
.run(|mut state| async move {
Ok(state.next_group().await?.map(|group| {
Arc::new(MoqGroupConsumer {
sequence: group.sequence,
task: Task::new(GroupInner { group }),
})
}))
})
.await
}
pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
self.task.run(|mut state| async move { state.read_frame().await }).await
}
pub fn cancel(&self) {
self.task.cancel();
}
}
/// Private state driven by `MoqGroupConsumer`'s task.
struct GroupInner {
// Group that frames are read from.
group: moq_net::GroupConsumer,
}
impl GroupInner {
    /// Forwards to `GroupConsumer::read_frame` and copies the returned frame, if any, into
    /// an owned `Vec<u8>`.
    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
        let frame = self.group.read_frame().await?;
        Ok(frame.map(|bytes| bytes.to_vec()))
    }
}
/// UniFFI object for reading the frames of a single group.
#[derive(uniffi::Object)]
pub struct MoqGroupConsumer {
// Sequence number copied from the group when this object was built.
sequence: u64,
// `Task` (from `crate::ffi`) holding the group state; exported calls go through it.
task: Task<GroupInner>,
}
impl MoqGroupConsumer {
    /// Builds a consumer for `group`, recording its sequence number before the group is
    /// moved into the task state.
    pub(crate) fn new(group: moq_net::GroupConsumer) -> Self {
        let sequence = group.sequence;
        let task = Task::new(GroupInner { group });
        Self { sequence, task }
    }
}
#[uniffi::export]
impl MoqGroupConsumer {
    /// Returns the sequence number recorded when this consumer was created.
    pub fn sequence(&self) -> u64 {
        self.sequence
    }

    /// Runs `GroupConsumer::read_frame` on the task and returns the frame bytes, if any,
    /// as an owned `Vec<u8>`.
    ///
    /// # Errors
    /// Returns whatever error the task or the underlying group reports.
    pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
        let pending = self
            .task
            .run(|mut group| async move { group.read_frame().await });
        pending.await
    }

    /// Forwards to `Task::cancel`.
    /// NOTE(review): presumably this makes pending calls on this consumer stop early —
    /// confirm against `crate::ffi::Task`.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}
#[uniffi::export]
impl MoqCatalogConsumer {
    /// Runs `Catalog::next` on the task and returns the converted catalog, if any.
    ///
    /// # Errors
    /// Returns whatever error the task or the underlying catalog consumer reports.
    pub async fn next(&self) -> Result<Option<MoqCatalog>, MoqError> {
        let pending = self
            .task
            .run(|mut catalog| async move { catalog.next().await });
        pending.await
    }

    /// Forwards to `Task::cancel`.
    /// NOTE(review): presumably this makes pending calls on this consumer stop early —
    /// confirm against `crate::ffi::Task`.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}
#[uniffi::export]
impl MoqMediaConsumer {
    /// Runs `Media::next` on the task and returns the next frame, if any.
    ///
    /// # Errors
    /// Returns whatever error the task or the underlying media consumer reports.
    pub async fn next(&self) -> Result<Option<MoqFrame>, MoqError> {
        let pending = self
            .task
            .run(|mut media| async move { media.next().await });
        pending.await
    }

    /// Forwards to `Task::cancel`.
    /// NOTE(review): presumably this makes pending calls on this consumer stop early —
    /// confirm against `crate::ffi::Task`.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}