use std::ffi::c_char;
use tokio::sync::oneshot;
use crate::ffi::OnStatus;
use crate::{Error, Id, NonZeroSlab, State, moq_announced};
/// Bookkeeping for one announcement-listener task spawned by `Origin::announced`.
struct TaskEntry {
/// Sender half of the task's oneshot shutdown channel. Nothing ever sends on it or
/// reads it (hence the `dead_code` allowance): it exists to be dropped. Dropping the
/// entry drops this sender, which completes the receiver the task is selecting on.
#[allow(dead_code)] close: oneshot::Sender<()>,
/// Callback the task invokes with each new announcement id and, if the entry is still
/// present when the task ends, once more with the task's final result.
callback: OnStatus,
}
/// Registry of origins, recorded announcements, and announcement-listener tasks,
/// each stored in a slab and addressed by an `Id`.
#[derive(Default)]
pub struct Origin {
/// Origin producers created by `create` and removed by `close`.
active: NonZeroSlab<moq_lite::OriginProducer>,
/// One `(path, active)` record per announcement observed by a listener task; read
/// back through `announced_info`.
/// NOTE(review): no code visible here removes these records - confirm they are
/// released elsewhere.
announced: NonZeroSlab<(String, bool)>,
/// Listener tasks started by `announced`. A slot holding `None` has been closed via
/// `announced_close` but not yet removed by its task.
announced_task: NonZeroSlab<Option<TaskEntry>>,
}
impl Origin {
pub fn create(&mut self) -> Result<Id, Error> {
self.active.insert(moq_lite::OriginProducer::default())
}
pub fn get(&self, id: Id) -> Result<&moq_lite::OriginProducer, Error> {
self.active.get(id).ok_or(Error::OriginNotFound)
}
pub fn announced(&mut self, origin: Id, on_announce: OnStatus) -> Result<Id, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
let consumer = origin.consume();
let channel = oneshot::channel();
let entry = TaskEntry {
close: channel.0,
callback: on_announce,
};
let id = self.announced_task.insert(Some(entry))?;
tokio::spawn(async move {
let res = tokio::select! {
res = Self::run_announced(id, consumer) => res,
_ = channel.1 => Ok(()),
};
if let Some(entry) = State::lock().origin.announced_task.remove(id).flatten() {
entry.callback.call(res);
}
});
Ok(id)
}
async fn run_announced(task_id: Id, mut consumer: moq_lite::OriginConsumer) -> Result<(), Error> {
while let Some((path, broadcast)) = consumer.announced().await {
let mut state = State::lock();
let Some(Some(entry)) = state.origin.announced_task.get(task_id) else {
return Ok(());
};
let callback = entry.callback;
let announced_id = state.origin.announced.insert((path.to_string(), broadcast.is_some()))?;
drop(state);
callback.call(announced_id);
}
Ok(())
}
pub fn announced_info(&self, announced: Id, dst: &mut moq_announced) -> Result<(), Error> {
let announced = self.announced.get(announced).ok_or(Error::AnnouncementNotFound)?;
*dst = moq_announced {
path: announced.0.as_str().as_ptr() as *const c_char,
path_len: announced.0.len(),
active: announced.1,
};
Ok(())
}
pub fn announced_close(&mut self, announced: Id) -> Result<(), Error> {
self.announced_task
.get_mut(announced)
.ok_or(Error::AnnouncementNotFound)?
.take()
.ok_or(Error::AnnouncementNotFound)?;
Ok(())
}
pub fn consume<P: moq_lite::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_lite::BroadcastConsumer, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
origin.consume().consume_broadcast(path).ok_or(Error::BroadcastNotFound)
}
pub fn publish<P: moq_lite::AsPath>(
&mut self,
origin: Id,
path: P,
broadcast: moq_lite::BroadcastConsumer,
) -> Result<(), Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
origin.publish_broadcast(path, broadcast);
Ok(())
}
pub fn close(&mut self, origin: Id) -> Result<(), Error> {
self.active.remove(origin).ok_or(Error::OriginNotFound)?;
Ok(())
}
}