Skip to main content

moq_ffi/
server.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use crate::error::MoqError;
5use crate::ffi::Task;
6use crate::origin::MoqOriginProducer;
7use crate::session::MoqSession;
8
9struct ServerState {
10	config: moq_native::ServerConfig,
11	publish: Option<Arc<MoqOriginProducer>>,
12	consume: Option<Arc<MoqOriginProducer>>,
13	server: Option<moq_native::Server>,
14}
15
16impl ServerState {
17	async fn listen(&mut self) -> Result<String, MoqError> {
18		if self.server.is_some() {
19			return Err(MoqError::Bind("already listening".into()));
20		}
21		let server = self
22			.config
23			.clone()
24			.init()
25			.map_err(|err| MoqError::Bind(format!("{err}")))?;
26		let addr = server
27			.local_addr()
28			.map_err(|err| MoqError::Bind(format!("{err}")))?
29			.to_string();
30		self.server = Some(server);
31		Ok(addr)
32	}
33
34	async fn accept(&mut self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
35		let server = self
36			.server
37			.as_mut()
38			.ok_or_else(|| MoqError::Bind("not listening; call listen() first".into()))?;
39		let publish = self.publish.clone();
40		let consume = self.consume.clone();
41		match server.accept().await {
42			Some(request) => Ok(Some(MoqRequest::new(request, publish, consume))),
43			None => Ok(None),
44		}
45	}
46}
47
48/// A MoQ server that accepts incoming QUIC/WebTransport sessions.
49#[derive(uniffi::Object)]
50pub struct MoqServer {
51	task: Task<ServerState>,
52}
53
54#[uniffi::export]
55impl MoqServer {
56	/// Create a new MoQ server with default configuration.
57	#[uniffi::constructor]
58	pub fn new() -> Arc<Self> {
59		let _guard = crate::ffi::RUNTIME.enter();
60		Arc::new(Self {
61			task: Task::new(ServerState {
62				config: moq_native::ServerConfig::default(),
63				publish: None,
64				consume: None,
65				server: None,
66			}),
67		})
68	}
69
70	/// Set the address to bind, e.g. `127.0.0.1:4443`, `[::]:443`, or `localhost:0`.
71	///
72	/// Validated syntactically up-front. DNS hostnames are accepted and resolved
73	/// at `listen()` time.
74	pub fn set_bind(&self, addr: String) -> Result<(), MoqError> {
75		// Mirrors `MoqClient::set_bind` by surfacing parse errors here rather
76		// than at listen() time. The server takes a String (not SocketAddr) so
77		// DNS hostnames are allowed; we only check syntactic structure here.
78		if addr.parse::<std::net::SocketAddr>().is_err() {
79			let port_ok = addr
80				.rsplit_once(':')
81				.is_some_and(|(_, port)| port.parse::<u16>().is_ok());
82			if !port_ok {
83				return Err(MoqError::Bind(format!("invalid bind address: {addr}")));
84			}
85		}
86		if let Some(mut state) = self.task.lock() {
87			state.config.bind = Some(addr);
88		}
89		Ok(())
90	}
91
92	/// Load TLS certificate chains from PEM files on disk.
93	pub fn set_tls_cert(&self, paths: Vec<String>) {
94		if let Some(mut state) = self.task.lock() {
95			state.config.tls.cert = paths.into_iter().map(PathBuf::from).collect();
96		}
97	}
98
99	/// Load TLS private keys from PEM files on disk.
100	pub fn set_tls_key(&self, paths: Vec<String>) {
101		if let Some(mut state) = self.task.lock() {
102			state.config.tls.key = paths.into_iter().map(PathBuf::from).collect();
103		}
104	}
105
106	/// Generate self-signed TLS certificates for the given hostnames.
107	///
108	/// Clients must either pin the certificate fingerprint or disable verification.
109	pub fn set_tls_generate(&self, hostnames: Vec<String>) {
110		if let Some(mut state) = self.task.lock() {
111			state.config.tls.generate = hostnames;
112		}
113	}
114
115	/// Set the origin to publish broadcasts to incoming sessions.
116	pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
117		if let Some(mut state) = self.task.lock() {
118			state.publish = origin;
119		}
120	}
121
122	/// Set the origin to consume broadcasts from incoming sessions.
123	pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
124		if let Some(mut state) = self.task.lock() {
125			state.consume = origin;
126		}
127	}
128
129	/// Bind the listening socket. Returns the bound local address as a string,
130	/// which is useful when binding to an ephemeral port (`:0`).
131	pub async fn listen(&self) -> Result<String, MoqError> {
132		self.task.run(|mut state| async move { state.listen().await }).await
133	}
134
135	/// Accept the next incoming session. Returns `None` when the server has closed.
136	///
137	/// `listen()` must be called first.
138	pub async fn accept(&self) -> Result<Option<Arc<MoqRequest>>, MoqError> {
139		self.task.run(|mut state| async move { state.accept().await }).await
140	}
141
142	/// SHA-256 fingerprints of the configured TLS certificates, hex-encoded.
143	///
144	/// Useful for pinning a generated self-signed certificate in a browser via
145	/// WebTransport's `serverCertificateHashes`. Returns an error if called
146	/// before `listen()`.
147	pub fn cert_fingerprints(&self) -> Result<Vec<String>, MoqError> {
148		let state = self
149			.task
150			.lock()
151			.ok_or_else(|| MoqError::Bind("server is busy".into()))?;
152		let server = state
153			.server
154			.as_ref()
155			.ok_or_else(|| MoqError::Bind("not listening; call listen() first".into()))?;
156		let info_handle = server.tls_info();
157		let info = info_handle
158			.read()
159			.map_err(|err| MoqError::Bind(format!("tls info lock poisoned: {err}")))?;
160		Ok(info.fingerprints.clone())
161	}
162
163	/// Cancel any in-flight `listen()` or `accept()` call.
164	pub fn cancel(&self) {
165		self.task.cancel();
166	}
167}
168
169struct RequestState {
170	request: Option<moq_native::Request>,
171	publish: Option<Arc<MoqOriginProducer>>,
172	consume: Option<Arc<MoqOriginProducer>>,
173}
174
175/// An incoming MoQ session that can be accepted or rejected.
176#[derive(uniffi::Object)]
177pub struct MoqRequest {
178	task: Task<RequestState>,
179	transport: String,
180	url: Option<String>,
181}
182
183impl MoqRequest {
184	fn new(
185		request: moq_native::Request,
186		publish: Option<Arc<MoqOriginProducer>>,
187		consume: Option<Arc<MoqOriginProducer>>,
188	) -> Arc<Self> {
189		let transport = request.transport().to_string();
190		let url = request.url().map(|u| u.to_string());
191		Arc::new(Self {
192			task: Task::new(RequestState {
193				request: Some(request),
194				publish,
195				consume,
196			}),
197			transport,
198			url,
199		})
200	}
201}
202
203#[uniffi::export]
204impl MoqRequest {
205	/// The URL provided by the client, if any.
206	pub fn url(&self) -> Option<String> {
207		self.url.clone()
208	}
209
210	/// The transport type, e.g. `"quic"`, `"iroh"`, or `"websocket"`.
211	pub fn transport(&self) -> String {
212		self.transport.clone()
213	}
214
215	/// Override the publish origin for this session. Falls back to the server's
216	/// configured publish origin if unset.
217	pub fn set_publish(&self, origin: Option<Arc<MoqOriginProducer>>) {
218		if let Some(mut state) = self.task.lock() {
219			state.publish = origin;
220		}
221	}
222
223	/// Override the consume origin for this session. Falls back to the server's
224	/// configured consume origin if unset.
225	pub fn set_consume(&self, origin: Option<Arc<MoqOriginProducer>>) {
226		if let Some(mut state) = self.task.lock() {
227			state.consume = origin;
228		}
229	}
230
231	/// Complete the MoQ handshake and return the established session.
232	///
233	/// Returns `AlreadyResponded` if `ok()` or `close()` has already been called.
234	pub async fn ok(&self) -> Result<Arc<MoqSession>, MoqError> {
235		self.task
236			.run(|mut state| async move {
237				let request = state.request.take().ok_or(MoqError::AlreadyResponded)?;
238				let publish = state.publish.as_ref().map(|o| o.inner().consume());
239				let consume = state.consume.as_ref().map(|o| o.inner().clone());
240				let session = request
241					.with_publish(publish)
242					.with_consume(consume)
243					.ok()
244					.await
245					.map_err(|err| MoqError::Connect(format!("{err}")))?;
246				Ok(Arc::new(MoqSession::new(session)))
247			})
248			.await
249	}
250
251	/// Reject the session with the given HTTP status code.
252	///
253	/// Returns `AlreadyResponded` if `ok()` or `close()` has already been called.
254	pub async fn close(&self, code: u16) -> Result<(), MoqError> {
255		self.task
256			.run(move |mut state| async move {
257				let request = state.request.take().ok_or(MoqError::AlreadyResponded)?;
258				request
259					.close(code)
260					.await
261					.map_err(|err| MoqError::Reject(format!("{err}")))?;
262				Ok(())
263			})
264			.await
265	}
266
267	/// Cancel any in-flight `ok()` or `close()` call.
268	pub fn cancel(&self) {
269		self.task.cancel();
270	}
271}