1use std::path::PathBuf;
2use std::sync::Arc;
3
4use crate::error::MoqError;
5use crate::ffi::Task;
6use crate::origin::MoqOriginProducer;
7use crate::session::MoqSession;
8
9struct ServerState {
10 config: moq_native::ServerConfig,
11 publish: Option<Arc<MoqOriginProducer>>,
12 consume: Option<Arc<MoqOriginProducer>>,
13 server: Option<moq_native::Server>,
14}
15
16impl ServerState {
17 async fn listen(&mut self) -> Result<String, MoqError> {
18 if self.server.is_some() {
19 return Err(MoqError::Bind("already listening".into()));
20 }
21 let server = self
22 .config
23 .clone()
24 .init()
25 .map_err(|err| MoqError::Bind(format!("{err}")))?;
26 let addr = server
27 .local_addr()
28 .map_err(|err| MoqError::Bind(format!("{err}")))?
29 .to_string();
30 self.server = Some(server);
31 Ok(addr)
32 }
33
34 async fn accept(&mut self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
35 let server = self
36 .server
37 .as_mut()
38 .ok_or_else(|| MoqError::Bind("not listening; call listen() first".into()))?;
39 let publish = self.publish.clone();
40 let consume = self.consume.clone();
41 match server.accept().await {
42 Some(request) => Ok(Some(MoqRequest::new(request, publish, consume))),
43 None => Ok(None),
44 }
45 }
46}
47
48#[derive(uniffi::Object)]
50pub struct MoqServer {
51 task: Task<ServerState>,
52}
53
54#[uniffi::export]
55impl MoqServer {
56 #[uniffi::constructor]
58 pub fn new() -> Arc<Self> {
59 let _guard = crate::ffi::RUNTIME.enter();
60 Arc::new(Self {
61 task: Task::new(ServerState {
62 config: moq_native::ServerConfig::default(),
63 publish: None,
64 consume: None,
65 server: None,
66 }),
67 })
68 }
69
70 pub fn set_bind(&self, addr: String) -> Result<(), MoqError> {
75 if addr.parse::<std::net::SocketAddr>().is_err() {
79 let port_ok = addr
80 .rsplit_once(':')
81 .is_some_and(|(_, port)| port.parse::<u16>().is_ok());
82 if !port_ok {
83 return Err(MoqError::Bind(format!("invalid bind address: {addr}")));
84 }
85 }
86 if let Some(mut state) = self.task.lock() {
87 state.config.bind = Some(addr);
88 }
89 Ok(())
90 }
91
92 pub fn set_tls_cert(&self, paths: Vec<String>) {
94 if let Some(mut state) = self.task.lock() {
95 state.config.tls.cert = paths.into_iter().map(PathBuf::from).collect();
96 }
97 }
98
99 pub fn set_tls_key(&self, paths: Vec<String>) {
101 if let Some(mut state) = self.task.lock() {
102 state.config.tls.key = paths.into_iter().map(PathBuf::from).collect();
103 }
104 }
105
106 pub fn set_tls_generate(&self, hostnames: Vec<String>) {
110 if let Some(mut state) = self.task.lock() {
111 state.config.tls.generate = hostnames;
112 }
113 }
114
115 pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
117 if let Some(mut state) = self.task.lock() {
118 state.publish = origin;
119 }
120 }
121
122 pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
124 if let Some(mut state) = self.task.lock() {
125 state.consume = origin;
126 }
127 }
128
129 pub async fn listen(&self) -> Result<String, MoqError> {
132 self.task.run(|mut state| async move { state.listen().await }).await
133 }
134
135 pub async fn accept(&self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
139 self.task.run(|mut state| async move { state.accept().await }).await
140 }
141
142 pub fn cert_fingerprints(&self) -> Result<Vec<String>, MoqError> {
148 let state = self
149 .task
150 .lock()
151 .ok_or_else(|| MoqError::Bind("server is busy".into()))?;
152 let server = state
153 .server
154 .as_ref()
155 .ok_or_else(|| MoqError::Bind("not listening; call listen() first".into()))?;
156 let info_handle = server.tls_info();
157 let info = info_handle
158 .read()
159 .map_err(|err| MoqError::Bind(format!("tls info lock poisoned: {err}")))?;
160 Ok(info.fingerprints.clone())
161 }
162
163 pub fn cancel(&self) {
165 self.task.cancel();
166 }
167}
168
169struct RequestState {
170 request: Option<moq_native::Request>,
171 publish: Option<Arc<MoqOriginProducer>>,
172 consume: Option<Arc<MoqOriginProducer>>,
173}
174
175#[derive(uniffi::Object)]
177pub struct MoqRequest {
178 task: Task<RequestState>,
179 transport: String,
180 url: Option<String>,
181}
182
183impl MoqRequest {
184 fn new(
185 request: moq_native::Request,
186 publish: Option<Arc<MoqOriginProducer>>,
187 consume: Option<Arc<MoqOriginProducer>>,
188 ) -> Arc<Self> {
189 let transport = request.transport().to_string();
190 let url = request.url().map(|u| u.to_string());
191 Arc::new(Self {
192 task: Task::new(RequestState {
193 request: Some(request),
194 publish,
195 consume,
196 }),
197 transport,
198 url,
199 })
200 }
201}
202
203#[uniffi::export]
204impl MoqRequest {
205 pub fn url(&self) -> Option<String> {
207 self.url.clone()
208 }
209
210 pub fn transport(&self) -> String {
212 self.transport.clone()
213 }
214
215 pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
218 if let Some(mut state) = self.task.lock() {
219 state.publish = origin;
220 }
221 }
222
223 pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
226 if let Some(mut state) = self.task.lock() {
227 state.consume = origin;
228 }
229 }
230
231 pub async fn ok(&self) -> Result<Arc<MoqSession>, MoqError> {
235 self.task
236 .run(|mut state| async move {
237 let request = state.request.take().ok_or(MoqError::AlreadyResponded)?;
238 let publish = state.publish.as_ref().map(|o| o.inner().consume());
239 let consume = state.consume.as_ref().map(|o| o.inner().clone());
240 let session = request
241 .with_publish(publish)
242 .with_consume(consume)
243 .ok()
244 .await
245 .map_err(|err| MoqError::Connect(format!("{err}")))?;
246 Ok(Arc::new(MoqSession::new(session)))
247 })
248 .await
249 }
250
251 pub async fn close(&self, code: u16) -> Result<(), MoqError> {
255 self.task
256 .run(move |mut state| async move {
257 let request = state.request.take().ok_or(MoqError::AlreadyResponded)?;
258 request
259 .close(code)
260 .await
261 .map_err(|err| MoqError::Reject(format!("{err}")))?;
262 Ok(())
263 })
264 .await
265 }
266
267 pub fn cancel(&self) {
269 self.task.cancel();
270 }
271}