1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46}
47
48#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53 #[serde(alias = "listen")]
56 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57 pub bind: Option<net::SocketAddr>,
58
59 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62 pub backend: Option<QuicBackend>,
63
64 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub quic_lb_id: Option<ServerId>,
69
70 #[arg(
73 id = "server-quic-lb-nonce",
74 long = "server-quic-lb-nonce",
75 requires = "server-quic-lb-id",
76 env = "MOQ_SERVER_QUIC_LB_NONCE"
77 )]
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub quic_lb_nonce: Option<usize>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 #[arg(
84 id = "server-max-streams",
85 long = "server-max-streams",
86 env = "MOQ_SERVER_MAX_STREAMS"
87 )]
88 pub max_streams: Option<u64>,
89
90 #[serde(default, skip_serializing_if = "Vec::is_empty")]
98 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
99 pub version: Vec<moq_lite::Version>,
100
101 #[command(flatten)]
102 #[serde(default)]
103 pub tls: ServerTlsConfig,
104}
105
106impl ServerConfig {
107 pub fn init(self) -> anyhow::Result<Server> {
108 Server::new(self)
109 }
110
111 pub fn versions(&self) -> moq_lite::Versions {
113 if self.version.is_empty() {
114 moq_lite::Versions::all()
115 } else {
116 moq_lite::Versions::from(self.version.clone())
117 }
118 }
119}
120
/// MoQ server front-end that accepts incoming requests over the compiled-in transports.
pub struct Server {
    /// Session-layer server; cloned into every [`Request`] produced by `accept`.
    moq: moq_lite::Server,
    /// Protocol versions in use; their ALPNs are handed to the quinn/quiche accept step.
    versions: moq_lite::Versions,
    /// In-flight accept futures; each resolves to a [`Request`] or the error that aborted it.
    accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
    /// Optional iroh endpoint, installed through `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<iroh::Endpoint>,
    /// quinn backend; `Some` only when `Server::new` selected quinn.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnServer>,
    /// quiche backend; `Some` only when `Server::new` selected quiche.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheServer>,
}
135
136impl Server {
137 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
138 let backend = config.backend.clone().unwrap_or({
139 #[cfg(feature = "quinn")]
140 {
141 QuicBackend::Quinn
142 }
143 #[cfg(all(feature = "quiche", not(feature = "quinn")))]
144 {
145 QuicBackend::Quiche
146 }
147 #[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
148 panic!("no QUIC backend compiled; enable quinn or quiche feature");
149 });
150
151 let versions = config.versions();
152
153 #[cfg(feature = "quinn")]
154 #[allow(unreachable_patterns)]
155 let quinn = match backend {
156 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
157 _ => None,
158 };
159
160 #[cfg(feature = "quiche")]
161 let quiche = match backend {
162 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
163 _ => None,
164 };
165
166 Ok(Server {
167 accept: Default::default(),
168 moq: moq_lite::Server::new().with_versions(versions.clone()),
169 versions,
170 #[cfg(feature = "iroh")]
171 iroh: None,
172 #[cfg(feature = "quinn")]
173 quinn,
174 #[cfg(feature = "quiche")]
175 quiche,
176 })
177 }
178
179 #[cfg(feature = "iroh")]
180 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
181 self.iroh = iroh;
182 self
183 }
184
185 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
186 self.moq = self.moq.with_publish(publish);
187 self
188 }
189
190 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
191 self.moq = self.moq.with_consume(consume);
192 self
193 }
194
195 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
197 #[cfg(feature = "quinn")]
198 if let Some(quinn) = self.quinn.as_ref() {
199 return quinn.tls_info();
200 }
201 #[cfg(feature = "quiche")]
202 if let Some(quiche) = self.quiche.as_ref() {
203 return quiche.tls_info();
204 }
205 unreachable!("no QUIC backend compiled");
206 }
207
208 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
209 pub async fn accept(&mut self) -> Option<Request> {
210 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
211 }
212
213 #[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
221 pub async fn accept(&mut self) -> Option<Request> {
222 loop {
223 #[cfg(feature = "iroh")]
225 let iroh_accept = async {
226 #[cfg(feature = "iroh")]
227 if let Some(endpoint) = self.iroh.as_mut() {
228 return endpoint.accept().await;
229 }
230 None
231 };
232 #[cfg(not(feature = "iroh"))]
233 let iroh_accept = async { None::<()> };
234
235 #[cfg(feature = "quinn")]
236 let quinn_accept = async {
237 #[cfg(feature = "quinn")]
238 if let Some(quinn) = self.quinn.as_mut() {
239 return quinn.accept().await;
240 }
241 None
242 };
243 #[cfg(not(feature = "quinn"))]
244 let quinn_accept = async { None::<()> };
245
246 #[cfg(feature = "quiche")]
247 let quiche_accept = async {
248 #[cfg(feature = "quiche")]
249 if let Some(quiche) = self.quiche.as_mut() {
250 return quiche.accept().await;
251 }
252 None
253 };
254 #[cfg(not(feature = "quiche"))]
255 let quiche_accept = async { None::<()> };
256
257 let server = self.moq.clone();
258 let versions = self.versions.clone();
259
260 tokio::select! {
261 Some(_conn) = quinn_accept => {
262 #[cfg(feature = "quinn")]
263 {
264 let alpns = versions.alpns();
265 self.accept.push(async move {
266 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
267 Ok(Request {
268 server,
269 kind: RequestKind::Quinn(quinn),
270 })
271 }.boxed());
272 }
273 }
274 Some(_conn) = quiche_accept => {
275 #[cfg(feature = "quiche")]
276 {
277 let alpns = versions.alpns();
278 self.accept.push(async move {
279 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
280 Ok(Request {
281 server,
282 kind: RequestKind::Quiche(quiche),
283 })
284 }.boxed());
285 }
286 }
287 Some(_conn) = iroh_accept => {
288 #[cfg(feature = "iroh")]
289 self.accept.push(async move {
290 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
291 Ok(Request {
292 server,
293 kind: RequestKind::Iroh(iroh),
294 })
295 }.boxed());
296 }
297 Some(res) = self.accept.next() => {
298 match res {
299 Ok(session) => return Some(session),
300 Err(err) => tracing::debug!(%err, "failed to accept session"),
301 }
302 }
303 _ = tokio::signal::ctrl_c() => {
304 self.close().await;
305 return None;
306 }
307 }
308 }
309 }
310
311 #[cfg(feature = "iroh")]
312 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
313 self.iroh.as_ref()
314 }
315
316 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
317 #[cfg(feature = "quinn")]
318 if let Some(quinn) = self.quinn.as_ref() {
319 return quinn.local_addr();
320 }
321 #[cfg(feature = "quiche")]
322 if let Some(quiche) = self.quiche.as_ref() {
323 return quiche.local_addr();
324 }
325 unreachable!("no QUIC backend compiled");
326 }
327
328 pub async fn close(&mut self) {
329 #[cfg(feature = "quinn")]
330 if let Some(quinn) = self.quinn.as_mut() {
331 quinn.close();
332 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
333 }
334 #[cfg(feature = "quiche")]
335 if let Some(quiche) = self.quiche.as_mut() {
336 quiche.close();
337 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
338 }
339 #[cfg(feature = "iroh")]
340 if let Some(iroh) = self.iroh.take() {
341 iroh.close().await;
342 }
343 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
344 unreachable!("no QUIC backend compiled");
345 }
346}
347
/// Backend-specific state of a pending request, one variant per compiled-in transport.
///
/// With no transport feature enabled this enum has no variants.
pub(crate) enum RequestKind {
    /// Request that arrived through the quinn backend.
    #[cfg(feature = "quinn")]
    Quinn(crate::quinn::QuinnRequest),
    /// Request that arrived through the quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),
    /// Request that arrived through the iroh endpoint.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::IrohRequest),
}
357
/// An incoming request whose backend-level accept step has finished and which now awaits a
/// decision: accept it with `ok` or reject it with `close`.
pub struct Request {
    /// Session server used by `ok`; can be adjusted per request via `with_publish`/`with_consume`.
    server: moq_lite::Server,
    /// Backend-specific request state.
    kind: RequestKind,
}
366
367impl Request {
368 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
370 match self.kind {
371 #[cfg(feature = "quinn")]
372 RequestKind::Quinn(request) => {
373 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
374 request.close(status).await?;
375 Ok(())
376 }
377 #[cfg(feature = "quiche")]
378 RequestKind::Quiche(request) => {
379 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
380 request
381 .reject(status)
382 .await
383 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
384 Ok(())
385 }
386 #[cfg(feature = "iroh")]
387 RequestKind::Iroh(request) => {
388 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
389 request.close(status).await?;
390 Ok(())
391 }
392 }
393 }
394
395 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
397 self.server = self.server.with_publish(publish);
398 self
399 }
400
401 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
403 self.server = self.server.with_consume(consume);
404 self
405 }
406
407 pub async fn ok(self) -> anyhow::Result<Session> {
409 match self.kind {
410 #[cfg(feature = "quinn")]
411 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
412 #[cfg(feature = "quiche")]
413 RequestKind::Quiche(request) => {
414 let conn = request
415 .ok()
416 .await
417 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
418 Ok(self.server.accept(conn).await?)
419 }
420 #[cfg(feature = "iroh")]
421 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
422 }
423 }
424
425 pub fn url(&self) -> Option<&Url> {
427 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
428 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
429
430 match self.kind {
431 #[cfg(feature = "quinn")]
432 RequestKind::Quinn(ref request) => request.url(),
433 #[cfg(feature = "quiche")]
434 RequestKind::Quiche(ref request) => request.url(),
435 #[cfg(feature = "iroh")]
436 RequestKind::Iroh(ref request) => request.url(),
437 }
438 }
439}
440
/// TLS state exposed by a backend through `Server::tls_info`, where it is shared behind an
/// `Arc<RwLock<..>>`.
#[derive(Debug)]
pub struct ServerTlsInfo {
    /// Certified keys; this field only exists when the `quinn` feature is enabled.
    #[cfg(feature = "quinn")]
    pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
    /// Certificate fingerprints as strings.
    // NOTE(review): presumably hex-encoded digests of the entries in `certs` — confirm where
    // this list is populated.
    pub fingerprints: Vec<String>,
}
448
/// Server identifier for the QUIC-LB options, stored as raw bytes.
///
/// Serialized and deserialized as a hex string through `serde_with::hex::Hex`.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
453
454impl ServerId {
455 #[allow(dead_code)]
456 pub(crate) fn len(&self) -> usize {
457 self.0.len()
458 }
459}
460
461impl std::fmt::Debug for ServerId {
462 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
464 }
465}
466
467impl std::str::FromStr for ServerId {
468 type Err = hex::FromHexError;
469
470 fn from_str(s: &str) -> Result<Self, Self::Err> {
471 hex::decode(s).map(Self)
472 }
473}