1use std::net;
2#[cfg(test)]
3use std::path::PathBuf;
4
5use crate::{Error, QuicBackend};
6use moq_net::Session;
7use std::sync::{Arc, RwLock};
8use url::Url;
9#[cfg(feature = "iroh")]
10use web_transport_iroh::iroh;
11
12use futures::FutureExt;
13use futures::future::BoxFuture;
14use futures::stream::FuturesUnordered;
15use futures::stream::StreamExt;
16
/// Configuration for [`Server`], usable both as a `clap` argument group and as a
/// `serde` (de)serializable config section.
///
/// Every flag declared here is prefixed with `server-` and has a matching
/// `MOQ_SERVER_*` environment variable. On deserialization unknown keys are
/// rejected and missing keys fall back to the `Default` value.
#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub struct ServerConfig {
    /// Address to bind to, kept as an unparsed string.
    ///
    /// Also accepted under the name `listen`, both as a config key and as a CLI alias.
    // NOTE(review): presumably `DEFAULT_BIND` applies when this is `None`; the fallback
    // is not applied in this file, so confirm in the backend modules.
    #[serde(alias = "listen")]
    #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
    pub bind: Option<String>,

    /// QUIC backend to use.
    ///
    /// When `None`, `Server::new` picks the first compiled-in backend in the order
    /// quinn, noq, quiche.
    #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
    pub backend: Option<QuicBackend>,

    /// Server ID, given as a hex string (see [`ServerId`]). Omitted from serialized
    /// output when unset.
    // NOTE(review): the name suggests a QUIC-LB server ID; how the backends consume it
    // is not visible here.
    #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_id: Option<ServerId>,

    /// Nonce setting that accompanies `quic_lb_id`; on the command line it can only be
    /// given together with `--server-quic-lb-id`.
    // NOTE(review): presumably the nonce length in bytes — confirm against the backend.
    #[arg(
        id = "server-quic-lb-nonce",
        long = "server-quic-lb-nonce",
        requires = "server-quic-lb-id",
        env = "MOQ_SERVER_QUIC_LB_NONCE"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub quic_lb_nonce: Option<usize>,

    /// Optional IPv4 socket address.
    // NOTE(review): presumably advertised to clients as the QUIC preferred address —
    // confirm against the backend that reads it.
    #[arg(
        id = "server-preferred-v4",
        long = "server-preferred-v4",
        env = "MOQ_SERVER_PREFERRED_V4"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preferred_v4: Option<net::SocketAddrV4>,

    /// Optional IPv6 socket address; the IPv6 counterpart of `preferred_v4`.
    #[arg(
        id = "server-preferred-v6",
        long = "server-preferred-v6",
        env = "MOQ_SERVER_PREFERRED_V6"
    )]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preferred_v6: Option<net::SocketAddrV6>,

    /// Optional stream limit. Omitted from serialized output when unset.
    // NOTE(review): presumably the maximum number of concurrent streams per
    // connection — the consumer of this value is not in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(
        id = "server-max-streams",
        long = "server-max-streams",
        env = "MOQ_SERVER_MAX_STREAMS"
    )]
    pub max_streams: Option<u64>,

    /// Protocol versions to offer.
    ///
    /// An empty list means "all versions"; see [`ServerConfig::versions`].
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
    pub version: Vec<moq_net::Version>,

    /// TLS settings, flattened into this argument group on the command line.
    ///
    /// A non-empty `root` list is only accepted together with the quinn backend
    /// (checked in `Server::new`).
    #[command(flatten)]
    #[serde(default)]
    pub tls: crate::tls::Server,
}
105
106impl ServerConfig {
107 pub fn init(self) -> crate::Result<Server> {
108 Server::new(self)
109 }
110
111 pub fn versions(&self) -> moq_net::Versions {
113 if self.version.is_empty() {
114 moq_net::Versions::all()
115 } else {
116 moq_net::Versions::from(self.version.clone())
117 }
118 }
119}
120
/// Bind address literal: the IPv6 unspecified address on port 443.
// NOTE(review): not referenced in this file; presumably the fallback used when
// `ServerConfig::bind` is unset — confirm at the use sites.
pub(crate) const DEFAULT_BIND: &str = "[::]:443";
123
/// Accepts incoming requests over whichever transports were compiled in and configured.
///
/// Built via [`Server::new`] (or `ServerConfig::init`); requests are obtained with
/// `Server::accept`.
pub struct Server {
    /// MoQ server settings; cloned into every [`Request`] handed out by `accept`.
    moq: moq_net::Server,
    /// Enabled protocol versions; `accept` derives the ALPN list from them.
    versions: moq_net::Versions,
    /// Per-connection accept futures that are still in flight. Each resolves to a
    /// ready [`Request`] or to the error that aborted it.
    accept: FuturesUnordered<BoxFuture<'static, crate::Result<Request>>>,
    /// Optional iroh endpoint, set through `with_iroh`.
    #[cfg(feature = "iroh")]
    iroh: Option<iroh::Endpoint>,
    /// noq backend; `Some` only when it is the backend selected in `new`.
    #[cfg(feature = "noq")]
    noq: Option<crate::noq::NoqServer>,
    /// quinn backend; `Some` only when it is the backend selected in `new`.
    #[cfg(feature = "quinn")]
    quinn: Option<crate::quinn::QuinnServer>,
    /// quiche backend; `Some` only when it is the backend selected in `new`.
    #[cfg(feature = "quiche")]
    quiche: Option<crate::quiche::QuicheServer>,
    /// Optional WebSocket listener, set through `with_websocket`.
    #[cfg(feature = "websocket")]
    websocket: Option<crate::websocket::Listener>,
}
142
143impl Server {
144 pub fn new(config: ServerConfig) -> crate::Result<Self> {
145 let backend = config.backend.clone().unwrap_or({
146 #[cfg(feature = "quinn")]
147 {
148 QuicBackend::Quinn
149 }
150 #[cfg(all(feature = "noq", not(feature = "quinn")))]
151 {
152 QuicBackend::Noq
153 }
154 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
155 {
156 QuicBackend::Quiche
157 }
158 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
159 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
160 });
161
162 let versions = config.versions();
163
164 if !config.tls.root.is_empty() {
165 #[cfg(feature = "quinn")]
166 let quinn_backend = matches!(backend, QuicBackend::Quinn);
167 #[cfg(not(feature = "quinn"))]
168 let quinn_backend = false;
169 if !quinn_backend {
170 return Err(Error::MtlsQuinnOnly);
171 }
172 }
173
174 #[cfg(feature = "noq")]
175 #[allow(unreachable_patterns)]
176 let noq = match backend {
177 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
178 _ => None,
179 };
180
181 #[cfg(feature = "quinn")]
182 #[allow(unreachable_patterns)]
183 let quinn = match backend {
184 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
185 _ => None,
186 };
187
188 #[cfg(feature = "quiche")]
189 let quiche = match backend {
190 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
191 _ => None,
192 };
193
194 Ok(Server {
195 accept: Default::default(),
196 moq: moq_net::Server::new().with_versions(versions.clone()),
197 versions,
198 #[cfg(feature = "iroh")]
199 iroh: None,
200 #[cfg(feature = "noq")]
201 noq,
202 #[cfg(feature = "quinn")]
203 quinn,
204 #[cfg(feature = "quiche")]
205 quiche,
206 #[cfg(feature = "websocket")]
207 websocket: None,
208 })
209 }
210
211 #[cfg(feature = "websocket")]
217 pub fn with_websocket(mut self, websocket: Option<crate::websocket::Listener>) -> Self {
218 self.websocket = websocket;
219 self
220 }
221
222 #[cfg(feature = "iroh")]
223 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
224 self.iroh = iroh;
225 self
226 }
227
228 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
229 self.moq = self.moq.with_publish(publish);
230 self
231 }
232
233 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
234 self.moq = self.moq.with_consume(consume);
235 self
236 }
237
238 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
240 self.moq = self.moq.with_stats(stats);
241 self
242 }
243
244 pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
246 #[cfg(feature = "noq")]
247 if let Some(noq) = self.noq.as_ref() {
248 return noq.tls_info();
249 }
250 #[cfg(feature = "quinn")]
251 if let Some(quinn) = self.quinn.as_ref() {
252 return quinn.tls_info();
253 }
254 #[cfg(feature = "quiche")]
255 if let Some(quiche) = self.quiche.as_ref() {
256 return quiche.tls_info();
257 }
258 unreachable!("no QUIC backend compiled");
259 }
260
261 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
262 pub async fn accept(&mut self) -> Option<Request> {
263 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
264 }
265
266 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
274 pub async fn accept(&mut self) -> Option<Request> {
275 loop {
276 #[cfg(feature = "noq")]
278 let noq_accept = async {
279 #[cfg(feature = "noq")]
280 if let Some(noq) = self.noq.as_mut() {
281 return noq.accept().await;
282 }
283 None
284 };
285 #[cfg(not(feature = "noq"))]
286 let noq_accept = async { None::<()> };
287
288 #[cfg(feature = "iroh")]
289 let iroh_accept = async {
290 #[cfg(feature = "iroh")]
291 if let Some(endpoint) = self.iroh.as_mut() {
292 return endpoint.accept().await;
293 }
294 None
295 };
296 #[cfg(not(feature = "iroh"))]
297 let iroh_accept = async { None::<()> };
298
299 #[cfg(feature = "quinn")]
300 let quinn_accept = async {
301 #[cfg(feature = "quinn")]
302 if let Some(quinn) = self.quinn.as_mut() {
303 return quinn.accept().await;
304 }
305 None
306 };
307 #[cfg(not(feature = "quinn"))]
308 let quinn_accept = async { None::<()> };
309
310 #[cfg(feature = "quiche")]
311 let quiche_accept = async {
312 #[cfg(feature = "quiche")]
313 if let Some(quiche) = self.quiche.as_mut() {
314 return quiche.accept().await;
315 }
316 None
317 };
318 #[cfg(not(feature = "quiche"))]
319 let quiche_accept = async { None::<()> };
320
321 #[cfg(feature = "websocket")]
322 let ws_ref = self.websocket.as_ref();
323 #[cfg(feature = "websocket")]
324 let ws_accept = async {
325 match ws_ref {
326 Some(ws) => ws.accept().await,
327 None => std::future::pending().await,
328 }
329 };
330 #[cfg(not(feature = "websocket"))]
331 let ws_accept = std::future::pending::<Option<crate::Result<()>>>();
332
333 let server = self.moq.clone();
334 let versions = self.versions.clone();
335
336 tokio::select! {
337 Some(_conn) = noq_accept => {
338 #[cfg(feature = "noq")]
339 {
340 let alpns = versions.alpns();
341 self.accept.push(async move {
342 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
343 Ok(Request {
344 server,
345 kind: RequestKind::Noq(noq),
346 })
347 }.boxed());
348 }
349 }
350 Some(_conn) = quinn_accept => {
351 #[cfg(feature = "quinn")]
352 {
353 let alpns = versions.alpns();
354 self.accept.push(async move {
355 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
356 Ok(Request {
357 server,
358 kind: RequestKind::Quinn(Box::new(quinn)),
359 })
360 }.boxed());
361 }
362 }
363 Some(_conn) = quiche_accept => {
364 #[cfg(feature = "quiche")]
365 {
366 let alpns = versions.alpns();
367 self.accept.push(async move {
368 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
369 Ok(Request {
370 server,
371 kind: RequestKind::Quiche(quiche),
372 })
373 }.boxed());
374 }
375 }
376 Some(_conn) = iroh_accept => {
377 #[cfg(feature = "iroh")]
378 self.accept.push(async move {
379 let iroh = super::iroh::Request::accept(_conn).await?;
380 Ok(Request {
381 server,
382 kind: RequestKind::Iroh(iroh),
383 })
384 }.boxed());
385 }
386 Some(_res) = ws_accept => {
387 #[cfg(feature = "websocket")]
388 match _res {
389 Ok(session) => {
390 return Some(Request {
391 server,
392 kind: RequestKind::WebSocket(session),
393 });
394 }
395 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
396 }
397 }
398 Some(res) = self.accept.next() => {
399 match res {
400 Ok(session) => return Some(session),
401 Err(err) => tracing::debug!(%err, "failed to accept session"),
402 }
403 }
404 _ = tokio::signal::ctrl_c() => {
405 self.close().await;
406 return None;
407 }
408 }
409 }
410 }
411
412 #[cfg(feature = "iroh")]
413 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
414 self.iroh.as_ref()
415 }
416
417 pub fn local_addr(&self) -> crate::Result<net::SocketAddr> {
418 #[cfg(feature = "noq")]
419 if let Some(noq) = self.noq.as_ref() {
420 return Ok(noq.local_addr()?);
421 }
422 #[cfg(feature = "quinn")]
423 if let Some(quinn) = self.quinn.as_ref() {
424 return Ok(quinn.local_addr()?);
425 }
426 #[cfg(feature = "quiche")]
427 if let Some(quiche) = self.quiche.as_ref() {
428 return Ok(quiche.local_addr()?);
429 }
430 unreachable!("no QUIC backend compiled");
431 }
432
433 #[cfg(feature = "websocket")]
434 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
435 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
436 }
437
438 pub async fn close(&mut self) {
439 #[cfg(feature = "noq")]
440 if let Some(noq) = self.noq.as_mut() {
441 noq.close();
442 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
443 }
444 #[cfg(feature = "quinn")]
445 if let Some(quinn) = self.quinn.as_mut() {
446 quinn.close();
447 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
448 }
449 #[cfg(feature = "quiche")]
450 if let Some(quiche) = self.quiche.as_mut() {
451 quiche.close();
452 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
453 }
454 #[cfg(feature = "iroh")]
455 if let Some(iroh) = self.iroh.take() {
456 iroh.close().await;
457 }
458 #[cfg(feature = "websocket")]
459 {
460 let _ = self.websocket.take();
461 }
462 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
463 unreachable!("no QUIC backend compiled");
464 }
465}
466
/// Transport-specific state of a pending [`Request`]; one variant per compiled-in
/// transport.
pub(crate) enum RequestKind {
    /// Request accepted by the noq backend.
    #[cfg(feature = "noq")]
    Noq(crate::noq::NoqRequest),
    /// Request accepted by the quinn backend (boxed).
    #[cfg(feature = "quinn")]
    Quinn(Box<crate::quinn::QuinnRequest>),
    /// Request accepted by the quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),
    /// Request accepted through the iroh endpoint.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::Request),
    /// Session yielded by the WebSocket listener.
    #[cfg(feature = "websocket")]
    WebSocket(qmux::Session),
}
480
/// An incoming connection that has not been answered yet.
///
/// Call `ok` to accept it and obtain a session, or `close` to reject it.
pub struct Request {
    /// Per-request copy of the server's MoQ settings; adjustable through the
    /// `with_*` methods before calling `ok`.
    server: moq_net::Server,
    /// Transport-specific request state.
    kind: RequestKind,
}
489
490impl Request {
491 pub async fn close(self, _code: u16) -> crate::Result<()> {
493 match self.kind {
494 #[cfg(feature = "noq")]
495 RequestKind::Noq(request) => {
496 let status =
497 web_transport_noq::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
498 request.close(status).await.map_err(crate::noq::Error::Server)?;
499 Ok(())
500 }
501 #[cfg(feature = "quinn")]
502 RequestKind::Quinn(request) => {
503 let status =
504 web_transport_quinn::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
505 request.close(status).await.map_err(crate::quinn::Error::Server)?;
506 Ok(())
507 }
508 #[cfg(feature = "quiche")]
509 RequestKind::Quiche(request) => {
510 let status =
511 web_transport_quiche::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
512 request.reject(status).await.map_err(crate::quiche::Error::Reject)?;
513 Ok(())
514 }
515 #[cfg(feature = "iroh")]
516 RequestKind::Iroh(request) => {
517 let status =
518 web_transport_iroh::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
519 request.close(status).await.map_err(crate::iroh::Error::Server)?;
520 Ok(())
521 }
522 #[cfg(feature = "websocket")]
523 RequestKind::WebSocket(_session) => {
524 Ok(())
526 }
527 }
528 }
529
530 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
532 self.server = self.server.with_publish(publish);
533 self
534 }
535
536 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
538 self.server = self.server.with_consume(consume);
539 self
540 }
541
542 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
544 self.server = self.server.with_stats(stats);
545 self
546 }
547
548 pub async fn ok(self) -> crate::Result<Session> {
550 match self.kind {
551 #[cfg(feature = "noq")]
552 RequestKind::Noq(request) => Ok(self
553 .server
554 .accept(request.ok().await.map_err(crate::noq::Error::Server)?)
555 .await?),
556 #[cfg(feature = "quinn")]
557 RequestKind::Quinn(request) => Ok(self
558 .server
559 .accept(request.ok().await.map_err(crate::quinn::Error::Server)?)
560 .await?),
561 #[cfg(feature = "quiche")]
562 RequestKind::Quiche(request) => {
563 let conn = request.ok().await.map_err(crate::quiche::Error::Accept)?;
564 Ok(self.server.accept(conn).await?)
565 }
566 #[cfg(feature = "iroh")]
567 RequestKind::Iroh(request) => Ok(self
568 .server
569 .accept(request.ok().await.map_err(crate::iroh::Error::Server)?)
570 .await?),
571 #[cfg(feature = "websocket")]
572 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
573 }
574 }
575
576 pub fn transport(&self) -> &'static str {
578 match self.kind {
579 #[cfg(feature = "noq")]
580 RequestKind::Noq(_) => "quic",
581 #[cfg(feature = "quinn")]
582 RequestKind::Quinn(_) => "quic",
583 #[cfg(feature = "quiche")]
584 RequestKind::Quiche(_) => "quic",
585 #[cfg(feature = "iroh")]
586 RequestKind::Iroh(_) => "iroh",
587 #[cfg(feature = "websocket")]
588 RequestKind::WebSocket(_) => "websocket",
589 }
590 }
591
592 pub fn url(&self) -> Option<&Url> {
594 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
595 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
596
597 match self.kind {
598 #[cfg(feature = "noq")]
599 RequestKind::Noq(ref request) => request.url(),
600 #[cfg(feature = "quinn")]
601 RequestKind::Quinn(ref request) => request.url(),
602 #[cfg(feature = "quiche")]
603 RequestKind::Quiche(ref request) => request.url(),
604 #[cfg(feature = "iroh")]
605 RequestKind::Iroh(ref request) => request.url(),
606 #[cfg(feature = "websocket")]
607 RequestKind::WebSocket(_) => None,
608 }
609 }
610
611 pub fn has_peer_certificate(&self) -> bool {
616 match self.kind {
617 #[cfg(feature = "quinn")]
618 RequestKind::Quinn(ref request) => request.has_peer_certificate(),
619 #[cfg(feature = "noq")]
620 RequestKind::Noq(_) => false,
621 #[cfg(feature = "quiche")]
622 RequestKind::Quiche(_) => false,
623 #[cfg(feature = "iroh")]
624 RequestKind::Iroh(_) => false,
625 #[cfg(feature = "websocket")]
626 RequestKind::WebSocket(_) => false,
627 #[cfg(not(any(
628 feature = "noq",
629 feature = "quinn",
630 feature = "quiche",
631 feature = "iroh",
632 feature = "websocket"
633 )))]
634 _ => false,
635 }
636 }
637}
638
/// Server identifier stored as raw bytes.
///
/// Serialized and deserialized through serde as a hex string
/// (`serde_with::hex::Hex`); also parseable from a hex string via `FromStr`.
#[serde_with::serde_as]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
643
644impl ServerId {
645 #[allow(dead_code)]
646 pub(crate) fn len(&self) -> usize {
647 self.0.len()
648 }
649}
650
651impl std::fmt::Debug for ServerId {
652 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
654 }
655}
656
657impl std::str::FromStr for ServerId {
658 type Err = hex::FromHexError;
659
660 fn from_str(s: &str) -> Result<Self, Self::Err> {
661 hex::decode(s).map(Self)
662 }
663}
664
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the expected path list from string literals.
    fn paths(items: &[&str]) -> Vec<PathBuf> {
        items.iter().map(|item| PathBuf::from(*item)).collect()
    }

    /// The TLS section accepts either a single string or an array for its list fields.
    #[test]
    fn test_tls_string_or_array() {
        // Scalar form: each key holds one string.
        let scalar_toml = r#"
            cert = "cert.pem"
            key = "key.pem"
        "#;
        let parsed_scalar: crate::tls::Server = toml::from_str(scalar_toml).unwrap();
        assert_eq!(parsed_scalar.cert, paths(&["cert.pem"]));
        assert_eq!(parsed_scalar.key, paths(&["key.pem"]));

        // Array form: each key holds a list.
        let list_toml = r#"
            cert = ["a.pem", "b.pem"]
            key = ["a.key", "b.key"]
            generate = ["localhost"]
            root = ["ca.pem"]
        "#;
        let parsed_list: crate::tls::Server = toml::from_str(list_toml).unwrap();
        assert_eq!(parsed_list.cert, paths(&["a.pem", "b.pem"]));
        assert_eq!(parsed_list.key, paths(&["a.key", "b.key"]));
        assert_eq!(parsed_list.generate, vec!["localhost".to_string()]);
        assert_eq!(parsed_list.root, paths(&["ca.pem"]));
    }
}
693}