use bytes::Buf;
use std::ffi::c_char;
use tokio::sync::oneshot;
use crate::ffi::OnStatus;
use crate::{Error, Id, NonZeroSlab, State, moq_audio_config, moq_frame, moq_video_config};
/// One catalog update received from a broadcast, kept together with the
/// broadcast it was read from so tracks can be subscribed to later.
struct ConsumeCatalog {
    /// Broadcast the catalog was read from; used to subscribe to the
    /// rendition tracks listed in `catalog`.
    broadcast: moq_lite::BroadcastConsumer,

    /// The catalog contents as delivered by the catalog track.
    catalog: hang::catalog::Catalog,

    /// Audio codec names, one per audio rendition in `renditions.values()`
    /// order. Stored as owned strings so the config accessors can hand out
    /// pointers into them.
    audio_codec: Vec<String>,

    /// Video codec names, one per video rendition in `renditions.values()`
    /// order. Stored as owned strings so the config accessors can hand out
    /// pointers into them.
    video_codec: Vec<String>,
}
/// Bookkeeping for one spawned consumer task.
struct TaskEntry {
    /// Sending half of the task's cancellation channel. Nothing in this file
    /// sends on it: dropping the entry drops the sender, which is what wakes
    /// the receiver the task is selecting on. Hence the `dead_code` allowance.
    #[allow(dead_code)]
    close: oneshot::Sender<()>,

    /// Callback invoked with each produced id and with the task's final status.
    callback: OnStatus,
}
/// All consumer-side state, addressed by the ids handed out to callers.
#[derive(Default)]
pub struct Consume {
    /// Broadcasts registered through `start` and released through `close`.
    broadcast: NonZeroSlab<moq_lite::BroadcastConsumer>,

    /// Catalog snapshots inserted by the catalog task and released through
    /// `catalog_free`.
    catalog: NonZeroSlab<ConsumeCatalog>,

    /// One slot per running catalog task. The slot becomes `None` once
    /// `catalog_close` has taken the entry; the task removes the slot itself
    /// when it finishes.
    catalog_task: NonZeroSlab<Option<TaskEntry>>,

    /// One slot per running audio/video track task, with the same
    /// `Some`/`None` protocol as `catalog_task` (see `track_close`).
    track_task: NonZeroSlab<Option<TaskEntry>>,

    /// Frames delivered by track tasks and released through `frame_close`.
    frame: NonZeroSlab<hang::container::OrderedFrame>,
}
impl Consume {
/// Registers a broadcast consumer and returns the id it is stored under.
///
/// # Errors
/// Propagates whatever error the slab insertion reports.
pub fn start(&mut self, broadcast: moq_lite::BroadcastConsumer) -> Result<Id, Error> {
    let id = self.broadcast.insert(broadcast)?;
    Ok(id)
}
/// Subscribes to the catalog track of `broadcast` and spawns a task that
/// reports every catalog update through `on_catalog`.
///
/// The returned id identifies the spawned *task* (it is the id that
/// `catalog_close` expects), not a catalog snapshot. Snapshot ids are
/// delivered through the callback.
///
/// When the task ends on its own, `on_catalog` receives the final status.
/// If `catalog_close` already took the entry, no final callback is made.
///
/// # Errors
/// * `Error::BroadcastNotFound` if `broadcast` is not a registered broadcast.
/// * Any error from subscribing to the track or from the slab insertion.
pub fn catalog(&mut self, broadcast: Id, on_catalog: OnStatus) -> Result<Id, Error> {
    let broadcast = self.broadcast.get(broadcast).ok_or(Error::BroadcastNotFound)?.clone();
    let catalog = broadcast.subscribe_track(&hang::catalog::Catalog::default_track())?;

    let (close, closed) = oneshot::channel();
    let entry = TaskEntry {
        close,
        callback: on_catalog,
    };
    let id = self.catalog_task.insert(Some(entry))?;

    tokio::spawn(async move {
        let res = tokio::select! {
            res = Self::run_catalog(id, broadcast, catalog.into()) => res,
            // Resolves as soon as the sender stored in the entry is dropped.
            _ = closed => Ok(()),
        };

        // Take the entry in its own statement: a temporary created in an
        // `if let` scrutinee lives until the end of the `if let` body, which
        // would keep the `State::lock()` guard alive while the callback runs.
        // `run_catalog` releases the state before calling back; do the same
        // here so a callback that re-enters the library cannot deadlock.
        let entry = State::lock().consume.catalog_task.remove(id).flatten();
        if let Some(entry) = entry {
            entry.callback.call(res);
        }
    });

    Ok(id)
}
/// Body of the catalog task: turns every catalog update into a stored
/// snapshot and reports the snapshot id through the task's callback.
///
/// Returns `Ok(())` when the catalog stream ends, or early when the task's
/// entry is no longer present (the task was closed). Errors from reading the
/// catalog or inserting the snapshot are returned to the caller.
async fn run_catalog(
    task_id: Id,
    broadcast: moq_lite::BroadcastConsumer,
    mut catalog: hang::CatalogConsumer,
) -> Result<(), Error> {
    loop {
        let Some(update) = catalog.next().await? else {
            break;
        };

        // Codec names are stringified once here so the config accessors can
        // later hand out pointers into stable, owned strings.
        let audio_codec: Vec<String> = update
            .audio
            .renditions
            .values()
            .map(|config| config.codec.to_string())
            .collect();
        let video_codec: Vec<String> = update
            .video
            .renditions
            .values()
            .map(|config| config.codec.to_string())
            .collect();

        let snapshot = ConsumeCatalog {
            broadcast: broadcast.clone(),
            catalog: update,
            audio_codec,
            video_codec,
        };

        // Keep the state guard inside this block so it is released before
        // the callback is invoked.
        let (callback, snapshot_id) = {
            let mut state = State::lock();
            let Some(Some(entry)) = state.consume.catalog_task.get(task_id) else {
                // The entry is gone or was taken: the task has been closed.
                return Ok(());
            };
            let callback = entry.callback;
            let snapshot_id = state.consume.catalog.insert(snapshot)?;
            (callback, snapshot_id)
        };

        callback.call(Ok(snapshot_id));
    }

    Ok(())
}
pub fn video_config(&mut self, catalog: Id, index: usize, dst: &mut moq_video_config) -> Result<(), Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let (rendition, config) = consume
.catalog
.video
.renditions
.iter()
.nth(index)
.ok_or(Error::NoIndex)?;
let codec = consume.video_codec.get(index).ok_or(Error::NoIndex)?;
*dst = moq_video_config {
name: rendition.as_str().as_ptr() as *const c_char,
name_len: rendition.len(),
codec: codec.as_str().as_ptr() as *const c_char,
codec_len: codec.len(),
description: config
.description
.as_ref()
.map(|desc| desc.as_ptr())
.unwrap_or(std::ptr::null()),
description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0),
coded_width: config
.coded_width
.as_ref()
.map(|width| width as *const u32)
.unwrap_or(std::ptr::null()),
coded_height: config
.coded_height
.as_ref()
.map(|height| height as *const u32)
.unwrap_or(std::ptr::null()),
};
Ok(())
}
pub fn audio_config(&mut self, catalog: Id, index: usize, dst: &mut moq_audio_config) -> Result<(), Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let (rendition, config) = consume
.catalog
.audio
.renditions
.iter()
.nth(index)
.ok_or(Error::NoIndex)?;
let codec = consume.audio_codec.get(index).ok_or(Error::NoIndex)?;
*dst = moq_audio_config {
name: rendition.as_str().as_ptr() as *const c_char,
name_len: rendition.len(),
codec: codec.as_str().as_ptr() as *const c_char,
codec_len: codec.len(),
description: config
.description
.as_ref()
.map(|desc| desc.as_ptr())
.unwrap_or(std::ptr::null()),
description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0),
sample_rate: config.sample_rate,
channel_count: config.channel_count,
};
Ok(())
}
pub fn catalog_close(&mut self, catalog: Id) -> Result<(), Error> {
self.catalog_task
.get_mut(catalog)
.ok_or(Error::CatalogNotFound)?
.take()
.ok_or(Error::CatalogNotFound)?;
Ok(())
}
pub fn catalog_free(&mut self, catalog: Id) -> Result<(), Error> {
self.catalog.remove(catalog).ok_or(Error::CatalogNotFound)?;
Ok(())
}
/// Subscribes to the `index`-th video rendition of a catalog snapshot and
/// spawns a task that delivers its frames through `on_frame`.
///
/// The track is subscribed with priority `1` and wrapped in an
/// `OrderedConsumer` configured with `latency`. The returned id identifies
/// the spawned task and is the id that `track_close` expects; frame ids are
/// delivered through the callback.
///
/// When the task ends on its own, `on_frame` receives the final status. If
/// `track_close` already took the entry, no final callback is made.
///
/// # Errors
/// * `Error::CatalogNotFound` if `catalog` is not a stored snapshot.
/// * `Error::NoIndex` if there is no video rendition at `index`.
/// * Any error from subscribing to the track or from the slab insertion.
pub fn video_ordered(
    &mut self,
    catalog: Id,
    index: usize,
    latency: std::time::Duration,
    on_frame: OnStatus,
) -> Result<Id, Error> {
    let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
    let rendition = consume
        .catalog
        .video
        .renditions
        .keys()
        .nth(index)
        .ok_or(Error::NoIndex)?;
    let track = consume.broadcast.subscribe_track(&moq_lite::Track {
        name: rendition.clone(),
        priority: 1,
    })?;
    let track = hang::container::OrderedConsumer::new(track, latency);

    let (close, closed) = oneshot::channel();
    let entry = TaskEntry {
        close,
        callback: on_frame,
    };
    let id = self.track_task.insert(Some(entry))?;

    tokio::spawn(async move {
        let res = tokio::select! {
            res = Self::run_track(id, track) => res,
            // Resolves as soon as the sender stored in the entry is dropped.
            _ = closed => Ok(()),
        };

        // Take the entry in its own statement: a temporary created in an
        // `if let` scrutinee lives until the end of the `if let` body, which
        // would keep the `State::lock()` guard alive while the callback runs.
        // `run_track` releases the state before calling back; do the same
        // here so a callback that re-enters the library cannot deadlock.
        let entry = State::lock().consume.track_task.remove(id).flatten();
        if let Some(entry) = entry {
            entry.callback.call(res);
        }
    });

    Ok(id)
}
/// Subscribes to the `index`-th audio rendition of a catalog snapshot and
/// spawns a task that delivers its frames through `on_frame`.
///
/// The track is subscribed with priority `2` and wrapped in an
/// `OrderedConsumer` configured with `latency`. The returned id identifies
/// the spawned task and is the id that `track_close` expects; frame ids are
/// delivered through the callback.
///
/// When the task ends on its own, `on_frame` receives the final status. If
/// `track_close` already took the entry, no final callback is made.
///
/// # Errors
/// * `Error::CatalogNotFound` if `catalog` is not a stored snapshot.
/// * `Error::NoIndex` if there is no audio rendition at `index`.
/// * Any error from subscribing to the track or from the slab insertion.
pub fn audio_ordered(
    &mut self,
    catalog: Id,
    index: usize,
    latency: std::time::Duration,
    on_frame: OnStatus,
) -> Result<Id, Error> {
    let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
    let rendition = consume
        .catalog
        .audio
        .renditions
        .keys()
        .nth(index)
        .ok_or(Error::NoIndex)?;
    let track = consume.broadcast.subscribe_track(&moq_lite::Track {
        name: rendition.clone(),
        priority: 2,
    })?;
    let track = hang::container::OrderedConsumer::new(track, latency);

    let (close, closed) = oneshot::channel();
    let entry = TaskEntry {
        close,
        callback: on_frame,
    };
    let id = self.track_task.insert(Some(entry))?;

    tokio::spawn(async move {
        let res = tokio::select! {
            res = Self::run_track(id, track) => res,
            // Resolves as soon as the sender stored in the entry is dropped.
            _ = closed => Ok(()),
        };

        // Take the entry in its own statement: a temporary created in an
        // `if let` scrutinee lives until the end of the `if let` body, which
        // would keep the `State::lock()` guard alive while the callback runs.
        // `run_track` releases the state before calling back; do the same
        // here so a callback that re-enters the library cannot deadlock.
        let entry = State::lock().consume.track_task.remove(id).flatten();
        if let Some(entry) = entry {
            entry.callback.call(res);
        }
    });

    Ok(id)
}
/// Body of a track task: stores every frame read from `track` and reports
/// the stored frame's id through the task's callback.
///
/// Each frame's payload is collapsed into a single chunk before it is stored.
/// Returns `Ok(())` when the track ends, or early when the task's entry is no
/// longer present (the task was closed). Errors from reading the track or
/// inserting the frame are returned to the caller.
async fn run_track(task_id: Id, mut track: hang::container::OrderedConsumer) -> Result<(), Error> {
    loop {
        let Some(mut ordered) = track.read().await? else {
            break;
        };

        // A single-chunk payload is reused through a clone of that chunk;
        // anything else is copied into one contiguous buffer.
        let contiguous = match ordered.payload.num_chunks() {
            1 => ordered.payload.get_chunk(0).expect("frame has zero chunks").clone(),
            _ => {
                let total = ordered.payload.num_bytes();
                ordered.payload.copy_to_bytes(total)
            }
        };
        let mut payload = hang::container::BufList::new();
        payload.push_chunk(contiguous);

        let frame = hang::container::OrderedFrame {
            timestamp: ordered.timestamp,
            payload,
            group: ordered.group,
            index: ordered.index,
        };

        // Keep the state guard inside this block so it is released before
        // the callback is invoked.
        let (callback, frame_id) = {
            let mut state = State::lock();
            let Some(Some(entry)) = state.consume.track_task.get(task_id) else {
                // The entry is gone or was taken: the task has been closed.
                return Ok(());
            };
            let callback = entry.callback;
            let frame_id = state.consume.frame.insert(frame)?;
            (callback, frame_id)
        };

        callback.call(Ok(frame_id));
    }

    Ok(())
}
pub fn track_close(&mut self, track: Id) -> Result<(), Error> {
self.track_task
.get_mut(track)
.ok_or(Error::TrackNotFound)?
.take()
.ok_or(Error::TrackNotFound)?;
Ok(())
}
pub fn frame_chunk(&self, frame: Id, index: usize, dst: &mut moq_frame) -> Result<(), Error> {
let ordered = self.frame.get(frame).ok_or(Error::FrameNotFound)?;
let chunk = ordered.payload.get_chunk(index).ok_or(Error::NoIndex)?;
let timestamp_us = ordered
.timestamp
.as_micros()
.try_into()
.map_err(|_| moq_lite::TimeOverflow)?;
*dst = moq_frame {
payload: chunk.as_ptr(),
payload_size: chunk.len(),
timestamp_us,
keyframe: ordered.is_keyframe(),
};
Ok(())
}
pub fn frame_close(&mut self, frame: Id) -> Result<(), Error> {
self.frame.remove(frame).ok_or(Error::FrameNotFound)?;
Ok(())
}
pub fn close(&mut self, consume: Id) -> Result<(), Error> {
self.broadcast.remove(consume).ok_or(Error::BroadcastNotFound)?;
Ok(())
}
}