1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46}
47
48#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53 #[serde(alias = "listen")]
56 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57 pub bind: Option<net::SocketAddr>,
58
59 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62 pub backend: Option<QuicBackend>,
63
64 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub quic_lb_id: Option<ServerId>,
69
70 #[arg(
73 id = "server-quic-lb-nonce",
74 long = "server-quic-lb-nonce",
75 requires = "server-quic-lb-id",
76 env = "MOQ_SERVER_QUIC_LB_NONCE"
77 )]
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub quic_lb_nonce: Option<usize>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 #[arg(
84 id = "server-max-streams",
85 long = "server-max-streams",
86 env = "MOQ_SERVER_MAX_STREAMS"
87 )]
88 pub max_streams: Option<u64>,
89
90 #[serde(default, skip_serializing_if = "Vec::is_empty")]
98 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
99 pub version: Vec<moq_lite::Version>,
100
101 #[command(flatten)]
102 #[serde(default)]
103 pub tls: ServerTlsConfig,
104}
105
106impl ServerConfig {
107 pub fn init(self) -> anyhow::Result<Server> {
108 Server::new(self)
109 }
110
111 pub fn versions(&self) -> moq_lite::Versions {
113 if self.version.is_empty() {
114 moq_lite::Versions::all()
115 } else {
116 moq_lite::Versions::from(self.version.clone())
117 }
118 }
119}
120
/// Accepts incoming MoQ sessions over whichever transports are compiled in.
///
/// `Server::new` populates at most one of the QUIC backends (noq, quinn, quiche).
/// The iroh endpoint and the WebSocket listener start out empty and are attached
/// afterwards through the `with_*` builders.
pub struct Server {
	// MoQ-level server state; a clone is handed to every `Request`.
	moq: moq_lite::Server,
	// Versions offered to clients; `accept` derives the ALPN list from this.
	versions: moq_lite::Versions,
	// Connections that were accepted at the transport level and are still being
	// turned into a `Request`. Polled from within `accept`.
	accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
	// Optional iroh endpoint, set via `with_iroh`.
	#[cfg(feature = "iroh")]
	iroh: Option<iroh::Endpoint>,
	// noq backend; `Some` only when it is the selected QUIC backend.
	#[cfg(feature = "noq")]
	noq: Option<crate::noq::NoqServer>,
	// quinn backend; `Some` only when it is the selected QUIC backend.
	#[cfg(feature = "quinn")]
	quinn: Option<crate::quinn::QuinnServer>,
	// quiche backend; `Some` only when it is the selected QUIC backend.
	#[cfg(feature = "quiche")]
	quiche: Option<crate::quiche::QuicheServer>,
	// Optional WebSocket listener, set via `with_websocket`.
	#[cfg(feature = "websocket")]
	websocket: Option<crate::websocket::WebSocketListener>,
}
139
140impl Server {
141 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
142 let backend = config.backend.clone().unwrap_or({
143 #[cfg(feature = "quinn")]
144 {
145 QuicBackend::Quinn
146 }
147 #[cfg(all(feature = "noq", not(feature = "quinn")))]
148 {
149 QuicBackend::Noq
150 }
151 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
152 {
153 QuicBackend::Quiche
154 }
155 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
156 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
157 });
158
159 let versions = config.versions();
160
161 #[cfg(feature = "noq")]
162 #[allow(unreachable_patterns)]
163 let noq = match backend {
164 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
165 _ => None,
166 };
167
168 #[cfg(feature = "quinn")]
169 #[allow(unreachable_patterns)]
170 let quinn = match backend {
171 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
172 _ => None,
173 };
174
175 #[cfg(feature = "quiche")]
176 let quiche = match backend {
177 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
178 _ => None,
179 };
180
181 Ok(Server {
182 accept: Default::default(),
183 moq: moq_lite::Server::new().with_versions(versions.clone()),
184 versions,
185 #[cfg(feature = "iroh")]
186 iroh: None,
187 #[cfg(feature = "noq")]
188 noq,
189 #[cfg(feature = "quinn")]
190 quinn,
191 #[cfg(feature = "quiche")]
192 quiche,
193 #[cfg(feature = "websocket")]
194 websocket: None,
195 })
196 }
197
198 #[cfg(feature = "websocket")]
204 pub fn with_websocket(mut self, websocket: Option<crate::websocket::WebSocketListener>) -> Self {
205 self.websocket = websocket;
206 self
207 }
208
209 #[cfg(feature = "iroh")]
210 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
211 self.iroh = iroh;
212 self
213 }
214
215 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
216 self.moq = self.moq.with_publish(publish);
217 self
218 }
219
220 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
221 self.moq = self.moq.with_consume(consume);
222 self
223 }
224
225 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
227 #[cfg(feature = "noq")]
228 if let Some(noq) = self.noq.as_ref() {
229 return noq.tls_info();
230 }
231 #[cfg(feature = "quinn")]
232 if let Some(quinn) = self.quinn.as_ref() {
233 return quinn.tls_info();
234 }
235 #[cfg(feature = "quiche")]
236 if let Some(quiche) = self.quiche.as_ref() {
237 return quiche.tls_info();
238 }
239 unreachable!("no QUIC backend compiled");
240 }
241
242 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
243 pub async fn accept(&mut self) -> Option<Request> {
244 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
245 }
246
247 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
255 pub async fn accept(&mut self) -> Option<Request> {
256 loop {
257 #[cfg(feature = "noq")]
259 let noq_accept = async {
260 #[cfg(feature = "noq")]
261 if let Some(noq) = self.noq.as_mut() {
262 return noq.accept().await;
263 }
264 None
265 };
266 #[cfg(not(feature = "noq"))]
267 let noq_accept = async { None::<()> };
268
269 #[cfg(feature = "iroh")]
270 let iroh_accept = async {
271 #[cfg(feature = "iroh")]
272 if let Some(endpoint) = self.iroh.as_mut() {
273 return endpoint.accept().await;
274 }
275 None
276 };
277 #[cfg(not(feature = "iroh"))]
278 let iroh_accept = async { None::<()> };
279
280 #[cfg(feature = "quinn")]
281 let quinn_accept = async {
282 #[cfg(feature = "quinn")]
283 if let Some(quinn) = self.quinn.as_mut() {
284 return quinn.accept().await;
285 }
286 None
287 };
288 #[cfg(not(feature = "quinn"))]
289 let quinn_accept = async { None::<()> };
290
291 #[cfg(feature = "quiche")]
292 let quiche_accept = async {
293 #[cfg(feature = "quiche")]
294 if let Some(quiche) = self.quiche.as_mut() {
295 return quiche.accept().await;
296 }
297 None
298 };
299 #[cfg(not(feature = "quiche"))]
300 let quiche_accept = async { None::<()> };
301
302 #[cfg(feature = "websocket")]
303 let ws_ref = self.websocket.as_ref();
304 #[cfg(feature = "websocket")]
305 let ws_accept = async {
306 match ws_ref {
307 Some(ws) => ws.accept().await,
308 None => std::future::pending().await,
309 }
310 };
311 #[cfg(not(feature = "websocket"))]
312 let ws_accept = std::future::pending::<Option<anyhow::Result<()>>>();
313
314 let server = self.moq.clone();
315 let versions = self.versions.clone();
316
317 tokio::select! {
318 Some(_conn) = noq_accept => {
319 #[cfg(feature = "noq")]
320 {
321 let alpns = versions.alpns();
322 self.accept.push(async move {
323 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
324 Ok(Request {
325 server,
326 kind: RequestKind::Noq(noq),
327 })
328 }.boxed());
329 }
330 }
331 Some(_conn) = quinn_accept => {
332 #[cfg(feature = "quinn")]
333 {
334 let alpns = versions.alpns();
335 self.accept.push(async move {
336 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
337 Ok(Request {
338 server,
339 kind: RequestKind::Quinn(quinn),
340 })
341 }.boxed());
342 }
343 }
344 Some(_conn) = quiche_accept => {
345 #[cfg(feature = "quiche")]
346 {
347 let alpns = versions.alpns();
348 self.accept.push(async move {
349 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
350 Ok(Request {
351 server,
352 kind: RequestKind::Quiche(quiche),
353 })
354 }.boxed());
355 }
356 }
357 Some(_conn) = iroh_accept => {
358 #[cfg(feature = "iroh")]
359 self.accept.push(async move {
360 let iroh = super::iroh::IrohRequest::accept(_conn).await?;
361 Ok(Request {
362 server,
363 kind: RequestKind::Iroh(iroh),
364 })
365 }.boxed());
366 }
367 Some(_res) = ws_accept => {
368 #[cfg(feature = "websocket")]
369 match _res {
370 Ok(session) => {
371 return Some(Request {
372 server,
373 kind: RequestKind::WebSocket(session),
374 });
375 }
376 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
377 }
378 }
379 Some(res) = self.accept.next() => {
380 match res {
381 Ok(session) => return Some(session),
382 Err(err) => tracing::debug!(%err, "failed to accept session"),
383 }
384 }
385 _ = tokio::signal::ctrl_c() => {
386 self.close().await;
387 return None;
388 }
389 }
390 }
391 }
392
393 #[cfg(feature = "iroh")]
394 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
395 self.iroh.as_ref()
396 }
397
398 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
399 #[cfg(feature = "noq")]
400 if let Some(noq) = self.noq.as_ref() {
401 return noq.local_addr();
402 }
403 #[cfg(feature = "quinn")]
404 if let Some(quinn) = self.quinn.as_ref() {
405 return quinn.local_addr();
406 }
407 #[cfg(feature = "quiche")]
408 if let Some(quiche) = self.quiche.as_ref() {
409 return quiche.local_addr();
410 }
411 unreachable!("no QUIC backend compiled");
412 }
413
414 #[cfg(feature = "websocket")]
415 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
416 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
417 }
418
419 pub async fn close(&mut self) {
420 #[cfg(feature = "noq")]
421 if let Some(noq) = self.noq.as_mut() {
422 noq.close();
423 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
424 }
425 #[cfg(feature = "quinn")]
426 if let Some(quinn) = self.quinn.as_mut() {
427 quinn.close();
428 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
429 }
430 #[cfg(feature = "quiche")]
431 if let Some(quiche) = self.quiche.as_mut() {
432 quiche.close();
433 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
434 }
435 #[cfg(feature = "iroh")]
436 if let Some(iroh) = self.iroh.take() {
437 iroh.close().await;
438 }
439 #[cfg(feature = "websocket")]
440 {
441 let _ = self.websocket.take();
442 }
443 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
444 unreachable!("no QUIC backend compiled");
445 }
446}
447
/// Transport-specific state of a pending [`Request`]; one variant per compiled transport.
pub(crate) enum RequestKind {
	/// Request received through the noq backend.
	#[cfg(feature = "noq")]
	Noq(crate::noq::NoqRequest),
	/// Request received through the quinn backend.
	#[cfg(feature = "quinn")]
	Quinn(crate::quinn::QuinnRequest),
	/// Request received through the quiche backend.
	#[cfg(feature = "quiche")]
	Quiche(crate::quiche::QuicheRequest),
	/// Request received through the iroh endpoint.
	#[cfg(feature = "iroh")]
	Iroh(crate::iroh::IrohRequest),
	/// Session handed out by the WebSocket listener.
	#[cfg(feature = "websocket")]
	WebSocket(qmux::Session),
}
461
/// An incoming connection that has been neither accepted nor rejected yet.
///
/// Call `ok` to accept it or `close` to reject it.
pub struct Request {
	// MoQ server state used when the request is accepted; starts as a clone of the
	// `Server`'s state and can be adjusted per request via `with_publish` / `with_consume`.
	server: moq_lite::Server,
	// Transport-specific part of the request.
	kind: RequestKind,
}
470
471impl Request {
472 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
474 match self.kind {
475 #[cfg(feature = "noq")]
476 RequestKind::Noq(request) => {
477 let status = web_transport_noq::http::StatusCode::from_u16(_code).context("invalid status code")?;
478 request.close(status).await?;
479 Ok(())
480 }
481 #[cfg(feature = "quinn")]
482 RequestKind::Quinn(request) => {
483 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
484 request.close(status).await?;
485 Ok(())
486 }
487 #[cfg(feature = "quiche")]
488 RequestKind::Quiche(request) => {
489 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
490 request
491 .reject(status)
492 .await
493 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
494 Ok(())
495 }
496 #[cfg(feature = "iroh")]
497 RequestKind::Iroh(request) => {
498 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
499 request.close(status).await?;
500 Ok(())
501 }
502 #[cfg(feature = "websocket")]
503 RequestKind::WebSocket(_session) => {
504 Ok(())
506 }
507 }
508 }
509
510 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
512 self.server = self.server.with_publish(publish);
513 self
514 }
515
516 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
518 self.server = self.server.with_consume(consume);
519 self
520 }
521
522 pub async fn ok(self) -> anyhow::Result<Session> {
524 match self.kind {
525 #[cfg(feature = "noq")]
526 RequestKind::Noq(request) => Ok(self.server.accept(request.ok().await?).await?),
527 #[cfg(feature = "quinn")]
528 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
529 #[cfg(feature = "quiche")]
530 RequestKind::Quiche(request) => {
531 let conn = request
532 .ok()
533 .await
534 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
535 Ok(self.server.accept(conn).await?)
536 }
537 #[cfg(feature = "iroh")]
538 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
539 #[cfg(feature = "websocket")]
540 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
541 }
542 }
543
544 pub fn transport(&self) -> &'static str {
546 match self.kind {
547 #[cfg(feature = "noq")]
548 RequestKind::Noq(_) => "quic",
549 #[cfg(feature = "quinn")]
550 RequestKind::Quinn(_) => "quic",
551 #[cfg(feature = "quiche")]
552 RequestKind::Quiche(_) => "quic",
553 #[cfg(feature = "iroh")]
554 RequestKind::Iroh(_) => "iroh",
555 #[cfg(feature = "websocket")]
556 RequestKind::WebSocket(_) => "websocket",
557 }
558 }
559
560 pub fn url(&self) -> Option<&Url> {
562 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
563 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
564
565 match self.kind {
566 #[cfg(feature = "noq")]
567 RequestKind::Noq(ref request) => request.url(),
568 #[cfg(feature = "quinn")]
569 RequestKind::Quinn(ref request) => request.url(),
570 #[cfg(feature = "quiche")]
571 RequestKind::Quiche(ref request) => request.url(),
572 #[cfg(feature = "iroh")]
573 RequestKind::Iroh(ref request) => request.url(),
574 #[cfg(feature = "websocket")]
575 RequestKind::WebSocket(_) => None,
576 }
577 }
578}
579
/// TLS information of a server, as handed out by `Server::tls_info`.
#[derive(Debug)]
pub struct ServerTlsInfo {
	// Certified keys in rustls form; the field only exists when the noq or quinn
	// feature is compiled in.
	#[cfg(any(feature = "noq", feature = "quinn"))]
	pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
	/// Certificate fingerprints as strings.
	// NOTE(review): encoding and hash are not visible here (presumably hex) — confirm
	// against the code that fills this in.
	pub fingerprints: Vec<String>,
}
587
/// Raw server ID bytes, serialized and deserialized as a hex string.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
592
impl ServerId {
	/// Number of bytes in the ID.
	// NOTE(review): nothing in this file calls this; presumably only some backends do,
	// which is why dead-code warnings are silenced — confirm.
	#[allow(dead_code)]
	pub(crate) fn len(&self) -> usize {
		self.0.len()
	}
}
599
600impl std::fmt::Debug for ServerId {
601 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
602 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
603 }
604}
605
606impl std::str::FromStr for ServerId {
607 type Err = hex::FromHexError;
608
609 fn from_str(s: &str) -> Result<Self, Self::Err> {
610 hex::decode(s).map(Self)
611 }
612}