use std::sync::Arc;
use bytes::Buf;
use crate::error::MoqError;
use crate::ffi::Task;
use crate::media::*;
#[derive(Clone, uniffi::Object)]
pub struct MoqBroadcastConsumer {
inner: moq_lite::BroadcastConsumer,
}
impl MoqBroadcastConsumer {
pub(crate) fn new(inner: moq_lite::BroadcastConsumer) -> Self {
Self { inner }
}
}
#[derive(uniffi::Object)]
pub struct MoqCatalogConsumer {
task: Task<Catalog>,
}
struct Catalog {
inner: hang::CatalogConsumer,
}
impl Catalog {
async fn next(&mut self) -> Result<Option<MoqCatalog>, MoqError> {
match self.inner.next().await {
Ok(Some(catalog)) => Ok(Some(convert_catalog(&catalog))),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
}
#[derive(uniffi::Object)]
pub struct MoqMediaConsumer {
task: Task<Media>,
}
struct Media {
inner: hang::container::OrderedConsumer,
}
impl Media {
async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
let Some(frame) = self.inner.read().await? else {
return Ok(None);
};
let keyframe = frame.is_keyframe();
let timestamp_us: u64 = frame
.timestamp
.as_micros()
.try_into()
.map_err(|_| MoqError::Codec("timestamp overflow".into()))?;
let mut buf = frame.payload;
let payload = buf.copy_to_bytes(buf.remaining()).to_vec();
Ok(Some(MoqFrame {
payload,
timestamp_us,
keyframe,
}))
}
}
#[uniffi::export]
impl MoqBroadcastConsumer {
pub fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?;
let consumer = hang::CatalogConsumer::from(track);
Ok(Arc::new(MoqCatalogConsumer {
task: Task::new(Catalog { inner: consumer }),
}))
}
pub fn subscribe_media(&self, name: String, max_latency_ms: u64) -> Result<Arc<MoqMediaConsumer>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let track = self.inner.subscribe_track(&moq_lite::Track { name, priority: 0 })?;
let latency = std::time::Duration::from_millis(max_latency_ms);
let consumer = hang::container::OrderedConsumer::new(track, latency);
Ok(Arc::new(MoqMediaConsumer {
task: Task::new(Media { inner: consumer }),
}))
}
}
#[uniffi::export]
impl MoqCatalogConsumer {
pub async fn next(&self) -> Result<Option<MoqCatalog>, MoqError> {
self.task.run(|mut state| async move { state.next().await }).await
}
pub fn cancel(&self) {
self.task.cancel();
}
}
#[uniffi::export]
impl MoqMediaConsumer {
pub async fn next(&self) -> Result<Option<MoqFrame>, MoqError> {
self.task.run(|mut state| async move { state.next().await }).await
}
pub fn cancel(&self) {
self.task.cancel();
}
}