use std::ffi::c_char;
use tokio::sync::oneshot;
use crate::ffi::OnStatus;
use crate::{Error, Id, NonZeroSlab, State, moq_audio_config, moq_frame, moq_video_config};
struct ConsumeCatalog {
broadcast: moq_net::BroadcastConsumer,
catalog: hang::catalog::Catalog,
audio_codec: Vec<String>,
video_codec: Vec<String>,
}
struct TaskEntry {
close: Option<oneshot::Sender<()>>,
callback: OnStatus,
}
#[derive(Default)]
pub struct Consume {
broadcast: NonZeroSlab<moq_net::BroadcastConsumer>,
catalog: NonZeroSlab<ConsumeCatalog>,
catalog_task: NonZeroSlab<Option<TaskEntry>>,
track_task: NonZeroSlab<Option<TaskEntry>>,
frame: NonZeroSlab<moq_mux::container::Frame>,
raw_task: NonZeroSlab<Option<TaskEntry>>,
raw_frame: NonZeroSlab<bytes::Bytes>,
}
impl Consume {
pub fn start(&mut self, broadcast: moq_net::BroadcastConsumer) -> Result<Id, Error> {
self.broadcast.insert(broadcast)
}
pub fn catalog(&mut self, broadcast: Id, on_catalog: OnStatus) -> Result<Id, Error> {
let broadcast = self.broadcast.get(broadcast).ok_or(Error::BroadcastNotFound)?.clone();
let catalog = broadcast.subscribe_track(&hang::catalog::Catalog::default_track())?;
let channel = oneshot::channel();
let entry = TaskEntry {
close: Some(channel.0),
callback: on_catalog,
};
let id = self.catalog_task.insert(Some(entry))?;
tokio::spawn(async move {
let res = Self::run_catalog(on_catalog, broadcast, catalog.into(), channel.1).await;
let entry = State::lock().consume.catalog_task.remove(id).flatten();
if let Some(entry) = entry {
entry.callback.call(res);
}
});
Ok(id)
}
async fn run_catalog(
callback: OnStatus,
broadcast: moq_net::BroadcastConsumer,
mut catalog: moq_mux::catalog::hang::Consumer,
mut close: oneshot::Receiver<()>,
) -> Result<(), Error> {
loop {
let update = tokio::select! {
biased;
_ = &mut close => return Ok(()),
next = catalog.next() => match next? {
Some(update) => update,
None => return Ok(()),
},
};
let audio_codec = update
.audio
.renditions
.values()
.map(|config| config.codec.to_string())
.collect();
let video_codec = update
.video
.renditions
.values()
.map(|config| config.codec.to_string())
.collect();
let snapshot = ConsumeCatalog {
broadcast: broadcast.clone(),
catalog: update,
audio_codec,
video_codec,
};
let snapshot_id = State::lock().consume.catalog.insert(snapshot)?;
callback.call(Ok(snapshot_id));
}
}
pub fn video_config(&mut self, catalog: Id, index: usize, dst: &mut moq_video_config) -> Result<(), Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let (rendition, config) = consume
.catalog
.video
.renditions
.iter()
.nth(index)
.ok_or(Error::NoIndex)?;
let codec = consume.video_codec.get(index).ok_or(Error::NoIndex)?;
*dst = moq_video_config {
name: rendition.as_str().as_ptr() as *const c_char,
name_len: rendition.len(),
codec: codec.as_str().as_ptr() as *const c_char,
codec_len: codec.len(),
description: config
.description
.as_ref()
.map(|desc| desc.as_ptr())
.unwrap_or(std::ptr::null()),
description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0),
coded_width: config
.coded_width
.as_ref()
.map(|width| width as *const u32)
.unwrap_or(std::ptr::null()),
coded_height: config
.coded_height
.as_ref()
.map(|height| height as *const u32)
.unwrap_or(std::ptr::null()),
};
Ok(())
}
pub fn audio_config(&mut self, catalog: Id, index: usize, dst: &mut moq_audio_config) -> Result<(), Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let (rendition, config) = consume
.catalog
.audio
.renditions
.iter()
.nth(index)
.ok_or(Error::NoIndex)?;
let codec = consume.audio_codec.get(index).ok_or(Error::NoIndex)?;
*dst = moq_audio_config {
name: rendition.as_str().as_ptr() as *const c_char,
name_len: rendition.len(),
codec: codec.as_str().as_ptr() as *const c_char,
codec_len: codec.len(),
description: config
.description
.as_ref()
.map(|desc| desc.as_ptr())
.unwrap_or(std::ptr::null()),
description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0),
sample_rate: config.sample_rate,
channel_count: config.channel_count,
};
Ok(())
}
pub fn catalog_close(&mut self, catalog: Id) -> Result<(), Error> {
self.catalog_task
.get_mut(catalog)
.and_then(|entry| entry.as_mut())
.ok_or(Error::CatalogNotFound)?
.close
.take()
.ok_or(Error::CatalogNotFound)?;
Ok(())
}
pub fn catalog_free(&mut self, catalog: Id) -> Result<(), Error> {
self.catalog.remove(catalog).ok_or(Error::CatalogNotFound)?;
Ok(())
}
pub fn video_ordered(
&mut self,
catalog: Id,
index: usize,
latency: std::time::Duration,
on_frame: OnStatus,
) -> Result<Id, Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let rendition = consume
.catalog
.video
.renditions
.keys()
.nth(index)
.ok_or(Error::NoIndex)?;
let track = consume.broadcast.subscribe_track(&moq_net::Track {
name: rendition.clone(),
priority: 1, })?;
let track =
moq_mux::container::Consumer::new(track, moq_mux::catalog::hang::Container::Legacy).with_latency(latency);
let channel = oneshot::channel();
let entry = TaskEntry {
close: Some(channel.0),
callback: on_frame,
};
let id = self.track_task.insert(Some(entry))?;
tokio::spawn(async move {
let res = Self::run_track(on_frame, track, channel.1).await;
let entry = State::lock().consume.track_task.remove(id).flatten();
if let Some(entry) = entry {
entry.callback.call(res);
}
});
Ok(id)
}
pub fn audio_ordered(
&mut self,
catalog: Id,
index: usize,
latency: std::time::Duration,
on_frame: OnStatus,
) -> Result<Id, Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let rendition = consume
.catalog
.audio
.renditions
.keys()
.nth(index)
.ok_or(Error::NoIndex)?;
let track = consume.broadcast.subscribe_track(&moq_net::Track {
name: rendition.clone(),
priority: 2, })?;
let track =
moq_mux::container::Consumer::new(track, moq_mux::catalog::hang::Container::Legacy).with_latency(latency);
let channel = oneshot::channel();
let entry = TaskEntry {
close: Some(channel.0),
callback: on_frame,
};
let id = self.track_task.insert(Some(entry))?;
tokio::spawn(async move {
let res = Self::run_track(on_frame, track, channel.1).await;
let entry = State::lock().consume.track_task.remove(id).flatten();
if let Some(entry) = entry {
entry.callback.call(res);
}
});
Ok(id)
}
async fn run_track(
callback: OnStatus,
mut track: moq_mux::container::Consumer<moq_mux::catalog::hang::Container>,
mut close: oneshot::Receiver<()>,
) -> Result<(), Error> {
loop {
let frame = tokio::select! {
biased;
_ = &mut close => return Ok(()),
frame = track.read() => match frame? {
Some(frame) => frame,
None => return Ok(()),
},
};
let frame_id = State::lock().consume.frame.insert(frame)?;
callback.call(Ok(frame_id));
}
}
pub fn track_close(&mut self, track: Id) -> Result<(), Error> {
self.track_task
.get_mut(track)
.and_then(|entry| entry.as_mut())
.ok_or(Error::TrackNotFound)?
.close
.take()
.ok_or(Error::TrackNotFound)?;
Ok(())
}
pub fn frame(&self, frame: Id, dst: &mut moq_frame) -> Result<(), Error> {
let f = self.frame.get(frame).ok_or(Error::FrameNotFound)?;
let timestamp_us = f.timestamp.as_micros().try_into().map_err(|_| moq_net::TimeOverflow)?;
*dst = moq_frame {
payload: f.payload.as_ptr(),
payload_size: f.payload.len(),
timestamp_us,
keyframe: f.keyframe,
};
Ok(())
}
pub fn frame_close(&mut self, frame: Id) -> Result<(), Error> {
self.frame.remove(frame).ok_or(Error::FrameNotFound)?;
Ok(())
}
pub fn close(&mut self, consume: Id) -> Result<(), Error> {
self.broadcast.remove(consume).ok_or(Error::BroadcastNotFound)?;
Ok(())
}
pub fn raw_track(&mut self, broadcast: Id, name: &str, on_frame: OnStatus) -> Result<Id, Error> {
let broadcast = self.broadcast.get(broadcast).ok_or(Error::BroadcastNotFound)?;
let track = broadcast.subscribe_track(&moq_net::Track {
name: name.to_string(),
priority: 0,
})?;
let channel = oneshot::channel();
let entry = TaskEntry {
close: Some(channel.0),
callback: on_frame,
};
let id = self.raw_task.insert(Some(entry))?;
tokio::spawn(async move {
let res = Self::run_raw(on_frame, track, channel.1).await;
let entry = State::lock().consume.raw_task.remove(id).flatten();
if let Some(entry) = entry {
entry.callback.call(res);
}
});
Ok(id)
}
async fn run_raw(
callback: OnStatus,
mut track: moq_net::TrackConsumer,
mut close: oneshot::Receiver<()>,
) -> Result<(), Error> {
loop {
let mut group = tokio::select! {
biased;
_ = &mut close => return Ok(()),
group = track.next_group() => match group? {
Some(group) => group,
None => return Ok(()),
},
};
loop {
let payload = tokio::select! {
biased;
_ = &mut close => return Ok(()),
payload = group.read_frame() => match payload? {
Some(payload) => payload,
None => break,
},
};
let frame_id = State::lock().consume.raw_frame.insert(payload)?;
callback.call(Ok(frame_id));
}
}
}
pub fn raw_track_close(&mut self, track: Id) -> Result<(), Error> {
self.raw_task
.get_mut(track)
.and_then(|entry| entry.as_mut())
.ok_or(Error::TrackNotFound)?
.close
.take()
.ok_or(Error::TrackNotFound)?;
Ok(())
}
pub fn raw_frame(&self, frame: Id, dst: &mut moq_frame) -> Result<(), Error> {
let payload = self.raw_frame.get(frame).ok_or(Error::FrameNotFound)?;
*dst = moq_frame {
payload: payload.as_ptr(),
payload_size: payload.len(),
timestamp_us: 0,
keyframe: false,
};
Ok(())
}
pub fn raw_frame_close(&mut self, frame: Id) -> Result<(), Error> {
self.raw_frame.remove(frame).ok_or(Error::FrameNotFound)?;
Ok(())
}
pub fn audio_rendition(
&self,
catalog: Id,
index: usize,
) -> Result<(moq_net::BroadcastConsumer, hang::catalog::AudioConfig, String), Error> {
let consume = self.catalog.get(catalog).ok_or(Error::CatalogNotFound)?;
let (name, config) = consume
.catalog
.audio
.renditions
.iter()
.nth(index)
.ok_or(Error::NoIndex)?;
Ok((consume.broadcast.clone(), config.clone(), name.clone()))
}
}