use std::sync::Arc;
use bytes::Buf;
use crate::error::MoqError;
use crate::ffi::Task;
use crate::media::*;
/// UniFFI-exported object wrapping a `moq_lite::BroadcastConsumer`.
///
/// The exported methods on this type create catalog and media consumers by
/// subscribing to tracks on the wrapped broadcast.
#[derive(Clone, uniffi::Object)]
pub struct MoqBroadcastConsumer {
// The underlying broadcast handle that track subscriptions are issued on.
inner: moq_lite::BroadcastConsumer,
}
impl MoqBroadcastConsumer {
pub(crate) fn new(inner: moq_lite::BroadcastConsumer) -> Self {
Self { inner }
}
}
/// UniFFI-exported object that yields catalog updates.
///
/// All work is delegated to a `Task` that holds the [`Catalog`] state.
#[derive(uniffi::Object)]
pub struct MoqCatalogConsumer {
// Task wrapper holding the `Catalog` state used by the exported methods.
task: Task<Catalog>,
}
/// State held by the `Task` inside [`MoqCatalogConsumer`].
struct Catalog {
// Source of catalog values; polled by `Catalog::next`.
inner: hang::CatalogConsumer,
}
impl Catalog {
    /// Awaits the next value from the wrapped catalog consumer.
    ///
    /// # Returns
    /// * `Ok(Some(_))` - the received catalog, passed through `convert_catalog`.
    /// * `Ok(None)` - the wrapped consumer returned `None`.
    ///
    /// # Errors
    /// Any error from the wrapped consumer, converted into [`MoqError`].
    async fn next(&mut self) -> Result<Option<MoqCatalog>, MoqError> {
        self.inner
            .next()
            .await
            .map(|update| update.map(|catalog| convert_catalog(&catalog)))
            .map_err(Into::into)
    }
}
/// UniFFI-exported object that yields media frames.
///
/// All work is delegated to a `Task` that holds the [`Media`] state.
#[derive(uniffi::Object)]
pub struct MoqMediaConsumer {
// Task wrapper holding the `Media` state used by the exported methods.
task: Task<Media>,
}
/// State held by the `Task` inside [`MoqMediaConsumer`].
struct Media {
// Frame source for one track; read by `Media::next`.
inner: moq_mux::ordered::Consumer<hang::catalog::Container>,
}
impl Media {
    /// Reads the next frame from the wrapped consumer and converts it into a
    /// [`MoqFrame`].
    ///
    /// # Returns
    /// * `Ok(Some(frame))` - a frame whose payload has been copied into an
    ///   owned byte vector and whose timestamp is expressed in microseconds.
    /// * `Ok(None)` - the wrapped consumer returned `None`.
    ///
    /// # Errors
    /// * Any error from the underlying `read()` call, converted into
    ///   [`MoqError`].
    /// * `MoqError::Codec("timestamp overflow")` if the microsecond timestamp
    ///   cannot be converted to a `u64`.
    async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
        let frame = match self.inner.read().await? {
            Some(frame) => frame,
            None => return Ok(None),
        };

        // Convert the timestamp before the payload is moved out of `frame`.
        let timestamp_us = u64::try_from(frame.timestamp.as_micros())
            .map_err(|_| MoqError::Codec("timestamp overflow".into()))?;

        // Drain every remaining byte of the payload buffer into a `Vec<u8>`.
        let mut body = frame.payload;
        let len = body.remaining();
        let payload = body.copy_to_bytes(len).to_vec();

        Ok(Some(MoqFrame {
            payload,
            timestamp_us,
            keyframe: frame.keyframe,
        }))
    }
}
#[uniffi::export]
impl MoqBroadcastConsumer {
    /// Subscribes to the broadcast's catalog track.
    ///
    /// The track is the one returned by
    /// `hang::catalog::Catalog::default_track()`.
    ///
    /// # Errors
    /// Returns the error from `subscribe_track`, converted into [`MoqError`].
    pub fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
        // The guard is held until the function returns.
        // NOTE(review): presumably needed so task setup runs inside the
        // runtime context; confirm against `crate::ffi`.
        let _guard = crate::ffi::RUNTIME.enter();

        let catalog_track = hang::catalog::Catalog::default_track();
        let subscription = self.inner.subscribe_track(&catalog_track)?;
        let inner = hang::CatalogConsumer::from(subscription);

        let task = Task::new(Catalog { inner });
        Ok(Arc::new(MoqCatalogConsumer { task }))
    }

    /// Subscribes to a media track and returns a consumer for its frames.
    ///
    /// # Parameters
    /// * `name` - name of the track to subscribe to (priority is fixed at 0).
    /// * `container` - container format, converted into
    ///   `hang::catalog::Container`.
    /// * `max_latency_ms` - milliseconds, converted to a `Duration` and passed
    ///   to the consumer's `with_latency`.
    ///
    /// # Errors
    /// Returns the error from `subscribe_track`, converted into [`MoqError`].
    pub fn subscribe_media(
        &self,
        name: String,
        container: Container,
        max_latency_ms: u64,
    ) -> Result<Arc<MoqMediaConsumer>, MoqError> {
        // The guard is held until the function returns (see `subscribe_catalog`).
        let _guard = crate::ffi::RUNTIME.enter();

        let request = moq_lite::Track { name, priority: 0 };
        let subscription = self.inner.subscribe_track(&request)?;

        let format: hang::catalog::Container = container.into();
        let max_latency = std::time::Duration::from_millis(max_latency_ms);
        let inner = moq_mux::ordered::Consumer::new(subscription, format).with_latency(max_latency);

        let task = Task::new(Media { inner });
        Ok(Arc::new(MoqMediaConsumer { task }))
    }
}
#[uniffi::export]
impl MoqCatalogConsumer {
    /// Waits for the next catalog.
    ///
    /// Runs `Catalog::next` on the task-held state through `Task::run` and
    /// returns its result.
    pub async fn next(&self) -> Result<Option<MoqCatalog>, MoqError> {
        let pending = self
            .task
            .run(|mut catalog| async move { catalog.next().await });
        pending.await
    }

    /// Forwards to `Task::cancel` on the underlying task.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}
#[uniffi::export]
impl MoqMediaConsumer {
    /// Waits for the next media frame.
    ///
    /// Runs `Media::next` on the task-held state through `Task::run` and
    /// returns its result.
    pub async fn next(&self) -> Result<Option<MoqFrame>, MoqError> {
        let pending = self
            .task
            .run(|mut media| async move { media.next().await });
        pending.await
    }

    /// Forwards to `Task::cancel` on the underlying task.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}