use std::path::PathBuf;
use std::sync::Arc;
use crate::error::MoqError;
use crate::ffi::Task;
use crate::origin::MoqOriginProducer;
use crate::session::MoqSession;
/// Mutable state behind a `MoqServer`: its configuration, the origins handed
/// to accepted requests, and the server itself once `listen` has succeeded.
struct ServerState {
/// Configuration that `listen` clones and initializes into a server.
config: moq_native::ServerConfig,
/// Origin cloned into every accepted request as its initial `publish` value.
publish: Option<Arc<MoqOriginProducer>>,
/// Origin cloned into every accepted request as its initial `consume` value.
consume: Option<Arc<MoqOriginProducer>>,
/// The initialized server; `None` until `listen` stores one.
server: Option<moq_native::Server>,
}
impl ServerState {
    /// Initializes a server from a clone of the stored configuration, keeps it
    /// in `self.server`, and returns its local address rendered as a string.
    ///
    /// # Errors
    /// Returns `MoqError::Bind` when a server is already stored, when
    /// initialization fails, or when the local address cannot be obtained.
    async fn listen(&mut self) -> Result<String, MoqError> {
        // Only one server per state: refuse a second `listen`.
        if self.server.is_some() {
            return Err(MoqError::Bind("already listening".into()));
        }

        let server = self
            .config
            .clone()
            .init()
            .map_err(|cause| MoqError::Bind(cause.to_string()))?;

        // Render the address before the server is moved into `self`.
        let local = server
            .local_addr()
            .map_err(|cause| MoqError::Bind(cause.to_string()))?;
        let addr = local.to_string();

        self.server = Some(server);
        Ok(addr)
    }

    /// Waits for the next incoming request and wraps it in a `MoqRequest`
    /// seeded with the currently configured publish/consume origins.
    ///
    /// Returns `Ok(None)` when the underlying `accept` yields `None`.
    ///
    /// # Errors
    /// Returns `MoqError::Bind` when no server has been stored by `listen`.
    async fn accept(&mut self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
        let Some(server) = self.server.as_mut() else {
            return Err(MoqError::Bind("not listening; call listen() first".into()));
        };

        // Snapshot the origins before waiting on the next request.
        let (publish, consume) = (self.publish.clone(), self.consume.clone());

        let incoming = server.accept().await;
        Ok(incoming.map(|request| MoqRequest::new(request, publish, consume)))
    }
}
/// Server handle exported through UniFFI; all of its state lives behind `task`.
#[derive(uniffi::Object)]
pub struct MoqServer {
/// Task wrapper through which every method reaches the `ServerState`.
task: Task<ServerState>,
}
#[uniffi::export]
impl MoqServer {
    /// Creates a server with the default `moq_native::ServerConfig`, no
    /// publish/consume origins, and no listening server.
    #[uniffi::constructor]
    pub fn new() -> Arc<Self> {
        // Enter the shared runtime while the task is constructed
        // (presumably `Task::new` needs a runtime context — confirm in `crate::ffi`).
        let _guard = crate::ffi::RUNTIME.enter();
        Arc::new(Self {
            task: Task::new(ServerState {
                config: moq_native::ServerConfig::default(),
                publish: None,
                consume: None,
                server: None,
            }),
        })
    }

    /// Sets the address that the configuration will bind to.
    ///
    /// `addr` is accepted when it parses as a `std::net::SocketAddr`, or when
    /// the text after its last `:` parses as a `u16` port.
    ///
    /// # Errors
    /// Returns `MoqError::Bind` when `addr` fails that validation, or when the
    /// task state cannot be locked. Previously the second case returned
    /// `Ok(())` without storing the address, hiding the dropped setting.
    pub fn set_bind(&self, addr: String) -> Result<(), MoqError> {
        if addr.parse::<std::net::SocketAddr>().is_err() {
            // Not a literal socket address: fall back to a `<anything>:<port>` check.
            let port_ok = addr
                .rsplit_once(':')
                .is_some_and(|(_, port)| port.parse::<u16>().is_ok());
            if !port_ok {
                return Err(MoqError::Bind(format!("invalid bind address: {addr}")));
            }
        }

        // Same error as `cert_fingerprints` uses when the lock is unavailable.
        let mut state = self
            .task
            .lock()
            .ok_or_else(|| MoqError::Bind("server is busy".into()))?;
        state.config.bind = Some(addr);
        Ok(())
    }

    /// Replaces the configured TLS certificate paths.
    ///
    /// The update is silently skipped when the task state cannot be locked.
    pub fn set_tls_cert(&self, paths: Vec<String>) {
        if let Some(mut state) = self.task.lock() {
            state.config.tls.cert = paths.into_iter().map(PathBuf::from).collect();
        }
    }

    /// Replaces the configured TLS key paths.
    ///
    /// The update is silently skipped when the task state cannot be locked.
    pub fn set_tls_key(&self, paths: Vec<String>) {
        if let Some(mut state) = self.task.lock() {
            state.config.tls.key = paths.into_iter().map(PathBuf::from).collect();
        }
    }

    /// Replaces `config.tls.generate` with `hostnames` (presumably the names
    /// for a generated certificate — confirm against `moq_native`).
    ///
    /// The update is silently skipped when the task state cannot be locked.
    pub fn set_tls_generate(&self, hostnames: Vec<String>) {
        if let Some(mut state) = self.task.lock() {
            state.config.tls.generate = hostnames;
        }
    }

    /// Sets the publish origin that later accepted requests start with.
    ///
    /// The update is silently skipped when the task state cannot be locked.
    pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
        if let Some(mut state) = self.task.lock() {
            state.publish = origin;
        }
    }

    /// Sets the consume origin that later accepted requests start with.
    ///
    /// The update is silently skipped when the task state cannot be locked.
    pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
        if let Some(mut state) = self.task.lock() {
            state.consume = origin;
        }
    }

    /// Runs `ServerState::listen` on the task and returns the local address
    /// as a string.
    ///
    /// # Errors
    /// Propagates whatever `Task::run` and `ServerState::listen` report.
    pub async fn listen(&self) -> Result<String, MoqError> {
        self.task.run(|mut state| async move { state.listen().await }).await
    }

    /// Runs `ServerState::accept` on the task and returns the next request,
    /// or `None` when the server yields no further requests.
    ///
    /// # Errors
    /// Propagates whatever `Task::run` and `ServerState::accept` report.
    pub async fn accept(&self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
        self.task.run(|mut state| async move { state.accept().await }).await
    }

    /// Returns a copy of the fingerprints held in the server's TLS info.
    ///
    /// # Errors
    /// Returns `MoqError::Bind` when the task state cannot be locked, when no
    /// server has been stored by `listen`, or when the TLS info lock is
    /// poisoned.
    pub fn cert_fingerprints(&self) -> Result<Vec<String>, MoqError> {
        let state = self
            .task
            .lock()
            .ok_or_else(|| MoqError::Bind("server is busy".into()))?;
        let server = state
            .server
            .as_ref()
            .ok_or_else(|| MoqError::Bind("not listening; call listen() first".into()))?;
        // Keep the handle in a named binding so the read guard can borrow from it.
        let info_handle = server.tls_info();
        let info = info_handle
            .read()
            .map_err(|err| MoqError::Bind(format!("tls info lock poisoned: {err}")))?;
        Ok(info.fingerprints.clone())
    }

    /// Forwards to `Task::cancel` for this server's task.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}
/// Mutable state behind a `MoqRequest`: the pending request plus the origins
/// that will be attached when it is accepted.
struct RequestState {
/// The unanswered request; `ok` and `close` take it, leaving `None`.
request: Option<moq_native::Request>,
/// Origin that `ok` passes to `with_publish` (via `inner().consume()`).
publish: Option<Arc<MoqOriginProducer>>,
/// Origin that `ok` passes to `with_consume` (via `inner().clone()`).
consume: Option<Arc<MoqOriginProducer>>,
}
/// An incoming request exported through UniFFI; it can be answered once with
/// `ok` or `close`.
#[derive(uniffi::Object)]
pub struct MoqRequest {
/// Task wrapper guarding the pending request and its origins.
task: Task<RequestState>,
/// String form of `request.transport()`, captured at construction.
transport: String,
/// String form of `request.url()`, when present, captured at construction.
url: Option<String>,
}
impl MoqRequest {
    /// Wraps `request` together with the origins it should start with.
    ///
    /// The transport and URL are rendered to strings up front so the
    /// `transport()` and `url()` accessors never need to lock the task state.
    fn new(
        request: moq_native::Request,
        publish: Option<Arc<MoqOriginProducer>>,
        consume: Option<Arc<MoqOriginProducer>>,
    ) -> Arc<Self> {
        // Read the descriptive fields before `request` moves into the state.
        let transport = request.transport().to_string();
        let url = request.url().map(|value| value.to_string());

        let state = RequestState {
            request: Some(request),
            publish,
            consume,
        };
        let task = Task::new(state);

        Arc::new(Self { task, transport, url })
    }
}
#[uniffi::export]
impl MoqRequest {
    /// Returns a copy of the URL string captured at construction, if any.
    pub fn url(&self) -> Option<String> {
        self.url.clone()
    }

    /// Returns a copy of the transport string captured at construction.
    pub fn transport(&self) -> String {
        self.transport.clone()
    }

    /// Overrides the publish origin used when this request is accepted.
    ///
    /// Does nothing when the task state cannot be locked.
    pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
        let Some(mut state) = self.task.lock() else {
            return;
        };
        state.publish = origin;
    }

    /// Overrides the consume origin used when this request is accepted.
    ///
    /// Does nothing when the task state cannot be locked.
    pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
        let Some(mut state) = self.task.lock() else {
            return;
        };
        state.consume = origin;
    }

    /// Accepts the request with the configured origins and returns the
    /// resulting session.
    ///
    /// # Errors
    /// Returns `MoqError::AlreadyResponded` when the request has already been
    /// taken by an earlier `ok` or `close`, and `MoqError::Connect` when
    /// accepting fails.
    pub async fn ok(&self) -> Result<Arc<MoqSession>, MoqError> {
        self.task
            .run(|mut state| async move {
                // The request can be answered only once; taking it enforces that.
                let Some(request) = state.request.take() else {
                    return Err(MoqError::AlreadyResponded);
                };

                let publish = state.publish.as_ref().map(|origin| origin.inner().consume());
                let consume = state.consume.as_ref().map(|origin| origin.inner().clone());

                let accepted = request
                    .with_publish(publish)
                    .with_consume(consume)
                    .ok()
                    .await;
                let session = accepted.map_err(|cause| MoqError::Connect(cause.to_string()))?;

                Ok(Arc::new(MoqSession::new(session)))
            })
            .await
    }

    /// Rejects the request by closing it with `code`.
    ///
    /// # Errors
    /// Returns `MoqError::AlreadyResponded` when the request has already been
    /// taken by an earlier `ok` or `close`, and `MoqError::Reject` when
    /// closing fails.
    pub async fn close(&self, code: u16) -> Result<(), MoqError> {
        self.task
            .run(move |mut state| async move {
                // The request can be answered only once; taking it enforces that.
                let Some(request) = state.request.take() else {
                    return Err(MoqError::AlreadyResponded);
                };

                request
                    .close(code)
                    .await
                    .map_err(|cause| MoqError::Reject(cause.to_string()))?;
                Ok(())
            })
            .await
    }

    /// Forwards to `Task::cancel` for this request's task.
    pub fn cancel(&self) {
        self.task.cancel();
    }
}