use std::sync::Arc;
use tokio::sync::oneshot;
use url::Url;
use crate::{Error, Id, NonZeroSlab, State, ffi};
/// Bookkeeping stored in the task slab for one session started by `Session::connect`.
struct TaskEntry {
/// Sender half of the oneshot channel whose receiver the spawned task selects on.
/// It is never read (hence the `dead_code` allowance); it is only held so that
/// dropping the entry drops the sender, which completes the receiver.
#[allow(dead_code)] close: oneshot::Sender<()>,
/// Status callback: called with `()` once the connection is established and with
/// the final `Result` when the spawned task finishes.
callback: ffi::OnStatus,
}
/// Registry of the background session tasks started through `connect`.
#[derive(Default)]
pub struct Session {
/// One slot per session task, keyed by the `Id` returned from `connect`.
/// `close` replaces the slot's contents with `None`; the slot itself is removed
/// by the spawned task when it finishes.
task: NonZeroSlab<Option<TaskEntry>>,
}
impl Session {
pub fn connect(
&mut self,
url: Url,
publish: Option<moq_lite::OriginConsumer>,
consume: Option<moq_lite::OriginProducer>,
callback: ffi::OnStatus,
) -> Result<Id, Error> {
let closed = oneshot::channel();
let entry = TaskEntry {
close: closed.0,
callback,
};
let id = self.task.insert(Some(entry))?;
tokio::spawn(async move {
let res = tokio::select! {
_ = closed.1 => Err(Error::Closed),
res = Self::connect_run(id, url, publish, consume) => res,
};
if let Some(entry) = State::lock().session.task.remove(id).flatten() {
entry.callback.call(res);
}
});
Ok(id)
}
async fn connect_run(
task_id: Id,
url: Url,
publish: Option<moq_lite::OriginConsumer>,
consume: Option<moq_lite::OriginProducer>,
) -> Result<(), Error> {
let client = moq_native::ClientConfig::default()
.init()
.map_err(|err| Error::Connect(Arc::new(err)))?;
let session = client
.with_publish(publish)
.with_consume(consume)
.connect(url)
.await
.map_err(|err| Error::Connect(Arc::new(err)))?;
if let Some(Some(entry)) = State::lock().session.task.get(task_id) {
entry.callback.call(());
}
session.closed().await?;
Ok(())
}
pub fn close(&mut self, id: Id) -> Result<(), Error> {
self.task
.get_mut(id)
.ok_or(Error::SessionNotFound)?
.take()
.ok_or(Error::SessionNotFound)?;
Ok(())
}
}