1use std::net;
2#[cfg(test)]
3use std::path::PathBuf;
4
5use crate::{Error, QuicBackend};
6use moq_net::Session;
7use std::sync::{Arc, RwLock};
8use url::Url;
9#[cfg(feature = "iroh")]
10use web_transport_iroh::iroh;
11
12use futures::FutureExt;
13use futures::future::BoxFuture;
14use futures::stream::FuturesUnordered;
15use futures::stream::StreamExt;
16
17#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
19#[serde(deny_unknown_fields, default)]
20#[non_exhaustive]
21pub struct ServerConfig {
22 #[serde(alias = "listen")]
30 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
31 pub bind: Option<String>,
32
33 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
36 pub backend: Option<QuicBackend>,
37
38 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub quic_lb_id: Option<ServerId>,
43
44 #[arg(
47 id = "server-quic-lb-nonce",
48 long = "server-quic-lb-nonce",
49 requires = "server-quic-lb-id",
50 env = "MOQ_SERVER_QUIC_LB_NONCE"
51 )]
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub quic_lb_nonce: Option<usize>,
54
55 #[arg(
63 id = "server-preferred-v4",
64 long = "server-preferred-v4",
65 env = "MOQ_SERVER_PREFERRED_V4"
66 )]
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub preferred_v4: Option<net::SocketAddrV4>,
69
70 #[arg(
74 id = "server-preferred-v6",
75 long = "server-preferred-v6",
76 env = "MOQ_SERVER_PREFERRED_V6"
77 )]
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub preferred_v6: Option<net::SocketAddrV6>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 #[arg(
84 id = "server-max-streams",
85 long = "server-max-streams",
86 env = "MOQ_SERVER_MAX_STREAMS"
87 )]
88 pub max_streams: Option<u64>,
89
90 #[serde(default, skip_serializing_if = "Vec::is_empty")]
98 #[arg(id = "server-version", long = "server-version", env = "MOQ_SERVER_VERSION")]
99 pub version: Vec<moq_net::Version>,
100
101 #[command(flatten)]
102 #[serde(default)]
103 pub tls: crate::tls::Server,
104}
105
106impl ServerConfig {
107 pub fn init(self) -> crate::Result<Server> {
108 Server::new(self)
109 }
110
111 pub fn versions(&self) -> moq_net::Versions {
113 if self.version.is_empty() {
114 moq_net::Versions::all()
115 } else {
116 moq_net::Versions::from(self.version.clone())
117 }
118 }
119}
120
121pub(crate) const DEFAULT_BIND: &str = "[::]:443";
123
124pub struct Server {
128 moq: moq_net::Server,
129 versions: moq_net::Versions,
130 accept: FuturesUnordered<BoxFuture<'static, crate::Result<Request>>>,
131 #[cfg(feature = "iroh")]
132 iroh: Option<iroh::Endpoint>,
133 #[cfg(feature = "noq")]
134 noq: Option<crate::noq::NoqServer>,
135 #[cfg(feature = "quinn")]
136 quinn: Option<crate::quinn::QuinnServer>,
137 #[cfg(feature = "quiche")]
138 quiche: Option<crate::quiche::QuicheServer>,
139 #[cfg(feature = "websocket")]
140 websocket: Option<crate::websocket::Listener>,
141}
142
143impl Server {
144 pub fn new(config: ServerConfig) -> crate::Result<Self> {
145 let backend = config.backend.clone().unwrap_or({
146 #[cfg(feature = "quinn")]
147 {
148 QuicBackend::Quinn
149 }
150 #[cfg(all(feature = "noq", not(feature = "quinn")))]
151 {
152 QuicBackend::Noq
153 }
154 #[cfg(all(feature = "quiche", not(feature = "quinn"), not(feature = "noq")))]
155 {
156 QuicBackend::Quiche
157 }
158 #[cfg(all(not(feature = "quiche"), not(feature = "quinn"), not(feature = "noq")))]
159 panic!("no QUIC backend compiled; enable noq, quinn, or quiche feature");
160 });
161
162 let versions = config.versions();
163
164 if !config.tls.root.is_empty() {
165 let mtls_supported = match backend {
166 #[cfg(feature = "quinn")]
167 QuicBackend::Quinn => true,
168 #[cfg(feature = "noq")]
169 QuicBackend::Noq => true,
170 #[allow(unreachable_patterns)]
171 _ => false,
172 };
173 if !mtls_supported {
174 return Err(Error::MtlsUnsupported);
175 }
176 }
177
178 #[cfg(feature = "noq")]
179 #[allow(unreachable_patterns)]
180 let noq = match backend {
181 QuicBackend::Noq => Some(crate::noq::NoqServer::new(config.clone())?),
182 _ => None,
183 };
184
185 #[cfg(feature = "quinn")]
186 #[allow(unreachable_patterns)]
187 let quinn = match backend {
188 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
189 _ => None,
190 };
191
192 #[cfg(feature = "quiche")]
193 let quiche = match backend {
194 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
195 _ => None,
196 };
197
198 Ok(Server {
199 accept: Default::default(),
200 moq: moq_net::Server::new().with_versions(versions.clone()),
201 versions,
202 #[cfg(feature = "iroh")]
203 iroh: None,
204 #[cfg(feature = "noq")]
205 noq,
206 #[cfg(feature = "quinn")]
207 quinn,
208 #[cfg(feature = "quiche")]
209 quiche,
210 #[cfg(feature = "websocket")]
211 websocket: None,
212 })
213 }
214
215 #[cfg(feature = "websocket")]
221 pub fn with_websocket(mut self, websocket: Option<crate::websocket::Listener>) -> Self {
222 self.websocket = websocket;
223 self
224 }
225
226 #[cfg(feature = "iroh")]
227 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
228 self.iroh = iroh;
229 self
230 }
231
232 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
233 self.moq = self.moq.with_publish(publish);
234 self
235 }
236
237 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
238 self.moq = self.moq.with_consume(consume);
239 self
240 }
241
242 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
244 self.moq = self.moq.with_stats(stats);
245 self
246 }
247
248 pub fn tls_info(&self) -> Arc<RwLock<crate::tls::Info>> {
250 #[cfg(feature = "noq")]
251 if let Some(noq) = self.noq.as_ref() {
252 return noq.tls_info();
253 }
254 #[cfg(feature = "quinn")]
255 if let Some(quinn) = self.quinn.as_ref() {
256 return quinn.tls_info();
257 }
258 #[cfg(feature = "quiche")]
259 if let Some(quiche) = self.quiche.as_ref() {
260 return quiche.tls_info();
261 }
262 unreachable!("no QUIC backend compiled");
263 }
264
265 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
266 pub async fn accept(&mut self) -> Option<Request> {
267 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
268 }
269
270 #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
278 pub async fn accept(&mut self) -> Option<Request> {
279 loop {
280 #[cfg(feature = "noq")]
282 let noq_accept = async {
283 #[cfg(feature = "noq")]
284 if let Some(noq) = self.noq.as_mut() {
285 return noq.accept().await;
286 }
287 None
288 };
289 #[cfg(not(feature = "noq"))]
290 let noq_accept = async { None::<()> };
291
292 #[cfg(feature = "iroh")]
293 let iroh_accept = async {
294 #[cfg(feature = "iroh")]
295 if let Some(endpoint) = self.iroh.as_mut() {
296 return endpoint.accept().await;
297 }
298 None
299 };
300 #[cfg(not(feature = "iroh"))]
301 let iroh_accept = async { None::<()> };
302
303 #[cfg(feature = "quinn")]
304 let quinn_accept = async {
305 #[cfg(feature = "quinn")]
306 if let Some(quinn) = self.quinn.as_mut() {
307 return quinn.accept().await;
308 }
309 None
310 };
311 #[cfg(not(feature = "quinn"))]
312 let quinn_accept = async { None::<()> };
313
314 #[cfg(feature = "quiche")]
315 let quiche_accept = async {
316 #[cfg(feature = "quiche")]
317 if let Some(quiche) = self.quiche.as_mut() {
318 return quiche.accept().await;
319 }
320 None
321 };
322 #[cfg(not(feature = "quiche"))]
323 let quiche_accept = async { None::<()> };
324
325 #[cfg(feature = "websocket")]
326 let ws_ref = self.websocket.as_ref();
327 #[cfg(feature = "websocket")]
328 let ws_accept = async {
329 match ws_ref {
330 Some(ws) => ws.accept().await,
331 None => std::future::pending().await,
332 }
333 };
334 #[cfg(not(feature = "websocket"))]
335 let ws_accept = std::future::pending::<Option<crate::Result<()>>>();
336
337 let server = self.moq.clone();
338 let versions = self.versions.clone();
339
340 tokio::select! {
341 Some(_conn) = noq_accept => {
342 #[cfg(feature = "noq")]
343 {
344 let alpns = versions.alpns();
345 self.accept.push(async move {
346 let noq = super::noq::NoqRequest::accept(_conn, alpns).await?;
347 Ok(Request {
348 server,
349 kind: RequestKind::Noq(noq),
350 })
351 }.boxed());
352 }
353 }
354 Some(_conn) = quinn_accept => {
355 #[cfg(feature = "quinn")]
356 {
357 let alpns = versions.alpns();
358 self.accept.push(async move {
359 let quinn = super::quinn::QuinnRequest::accept(_conn, alpns).await?;
360 Ok(Request {
361 server,
362 kind: RequestKind::Quinn(Box::new(quinn)),
363 })
364 }.boxed());
365 }
366 }
367 Some(_conn) = quiche_accept => {
368 #[cfg(feature = "quiche")]
369 {
370 let alpns = versions.alpns();
371 self.accept.push(async move {
372 let quiche = super::quiche::QuicheRequest::accept(_conn, alpns).await?;
373 Ok(Request {
374 server,
375 kind: RequestKind::Quiche(quiche),
376 })
377 }.boxed());
378 }
379 }
380 Some(_conn) = iroh_accept => {
381 #[cfg(feature = "iroh")]
382 self.accept.push(async move {
383 let iroh = super::iroh::Request::accept(_conn).await?;
384 Ok(Request {
385 server,
386 kind: RequestKind::Iroh(iroh),
387 })
388 }.boxed());
389 }
390 Some(_res) = ws_accept => {
391 #[cfg(feature = "websocket")]
392 match _res {
393 Ok(session) => {
394 return Some(Request {
395 server,
396 kind: RequestKind::WebSocket(session),
397 });
398 }
399 Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
400 }
401 }
402 Some(res) = self.accept.next() => {
403 match res {
404 Ok(session) => return Some(session),
405 Err(err) => tracing::debug!(%err, "failed to accept session"),
406 }
407 }
408 _ = tokio::signal::ctrl_c() => {
409 self.close().await;
410 return None;
411 }
412 }
413 }
414 }
415
416 #[cfg(feature = "iroh")]
417 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
418 self.iroh.as_ref()
419 }
420
421 pub fn local_addr(&self) -> crate::Result<net::SocketAddr> {
422 #[cfg(feature = "noq")]
423 if let Some(noq) = self.noq.as_ref() {
424 return Ok(noq.local_addr()?);
425 }
426 #[cfg(feature = "quinn")]
427 if let Some(quinn) = self.quinn.as_ref() {
428 return Ok(quinn.local_addr()?);
429 }
430 #[cfg(feature = "quiche")]
431 if let Some(quiche) = self.quiche.as_ref() {
432 return Ok(quiche.local_addr()?);
433 }
434 unreachable!("no QUIC backend compiled");
435 }
436
437 #[cfg(feature = "websocket")]
438 pub fn websocket_local_addr(&self) -> Option<net::SocketAddr> {
439 self.websocket.as_ref().and_then(|ws| ws.local_addr().ok())
440 }
441
442 pub async fn close(&mut self) {
443 #[cfg(feature = "noq")]
444 if let Some(noq) = self.noq.as_mut() {
445 noq.close();
446 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
447 }
448 #[cfg(feature = "quinn")]
449 if let Some(quinn) = self.quinn.as_mut() {
450 quinn.close();
451 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
452 }
453 #[cfg(feature = "quiche")]
454 if let Some(quiche) = self.quiche.as_mut() {
455 quiche.close();
456 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
457 }
458 #[cfg(feature = "iroh")]
459 if let Some(iroh) = self.iroh.take() {
460 iroh.close().await;
461 }
462 #[cfg(feature = "websocket")]
463 {
464 let _ = self.websocket.take();
465 }
466 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
467 unreachable!("no QUIC backend compiled");
468 }
469}
470
471pub(crate) enum RequestKind {
473 #[cfg(feature = "noq")]
474 Noq(crate::noq::NoqRequest),
475 #[cfg(feature = "quinn")]
476 Quinn(Box<crate::quinn::QuinnRequest>),
477 #[cfg(feature = "quiche")]
478 Quiche(crate::quiche::QuicheRequest),
479 #[cfg(feature = "iroh")]
480 Iroh(crate::iroh::Request),
481 #[cfg(feature = "websocket")]
482 WebSocket(qmux::Session),
483}
484
485pub struct Request {
490 server: moq_net::Server,
491 kind: RequestKind,
492}
493
494impl Request {
495 pub async fn close(self, _code: u16) -> crate::Result<()> {
497 match self.kind {
498 #[cfg(feature = "noq")]
499 RequestKind::Noq(request) => {
500 let status =
501 web_transport_noq::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
502 request.close(status).await.map_err(crate::noq::Error::Server)?;
503 Ok(())
504 }
505 #[cfg(feature = "quinn")]
506 RequestKind::Quinn(request) => {
507 let status =
508 web_transport_quinn::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
509 request.close(status).await.map_err(crate::quinn::Error::Server)?;
510 Ok(())
511 }
512 #[cfg(feature = "quiche")]
513 RequestKind::Quiche(request) => {
514 let status =
515 web_transport_quiche::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
516 request.reject(status).await.map_err(crate::quiche::Error::Reject)?;
517 Ok(())
518 }
519 #[cfg(feature = "iroh")]
520 RequestKind::Iroh(request) => {
521 let status =
522 web_transport_iroh::http::StatusCode::from_u16(_code).map_err(|_| Error::InvalidStatusCode)?;
523 request.close(status).await.map_err(crate::iroh::Error::Server)?;
524 Ok(())
525 }
526 #[cfg(feature = "websocket")]
527 RequestKind::WebSocket(_session) => {
528 Ok(())
530 }
531 }
532 }
533
534 pub fn with_publish(mut self, publish: impl Into<Option<moq_net::OriginConsumer>>) -> Self {
536 self.server = self.server.with_publish(publish);
537 self
538 }
539
540 pub fn with_consume(mut self, consume: impl Into<Option<moq_net::OriginProducer>>) -> Self {
542 self.server = self.server.with_consume(consume);
543 self
544 }
545
546 pub fn with_stats(mut self, stats: moq_net::StatsHandle) -> Self {
548 self.server = self.server.with_stats(stats);
549 self
550 }
551
552 pub async fn ok(self) -> crate::Result<Session> {
554 match self.kind {
555 #[cfg(feature = "noq")]
556 RequestKind::Noq(request) => Ok(self
557 .server
558 .accept(request.ok().await.map_err(crate::noq::Error::Server)?)
559 .await?),
560 #[cfg(feature = "quinn")]
561 RequestKind::Quinn(request) => Ok(self
562 .server
563 .accept(request.ok().await.map_err(crate::quinn::Error::Server)?)
564 .await?),
565 #[cfg(feature = "quiche")]
566 RequestKind::Quiche(request) => {
567 let conn = request.ok().await.map_err(crate::quiche::Error::Accept)?;
568 Ok(self.server.accept(conn).await?)
569 }
570 #[cfg(feature = "iroh")]
571 RequestKind::Iroh(request) => Ok(self
572 .server
573 .accept(request.ok().await.map_err(crate::iroh::Error::Server)?)
574 .await?),
575 #[cfg(feature = "websocket")]
576 RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
577 }
578 }
579
580 pub fn transport(&self) -> &'static str {
582 match self.kind {
583 #[cfg(feature = "noq")]
584 RequestKind::Noq(_) => "quic",
585 #[cfg(feature = "quinn")]
586 RequestKind::Quinn(_) => "quic",
587 #[cfg(feature = "quiche")]
588 RequestKind::Quiche(_) => "quic",
589 #[cfg(feature = "iroh")]
590 RequestKind::Iroh(_) => "iroh",
591 #[cfg(feature = "websocket")]
592 RequestKind::WebSocket(_) => "websocket",
593 }
594 }
595
596 pub fn url(&self) -> Option<&Url> {
598 #[cfg(not(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh")))]
599 unreachable!("no QUIC backend compiled; enable noq, quinn, quiche, or iroh feature");
600
601 match self.kind {
602 #[cfg(feature = "noq")]
603 RequestKind::Noq(ref request) => request.url(),
604 #[cfg(feature = "quinn")]
605 RequestKind::Quinn(ref request) => request.url(),
606 #[cfg(feature = "quiche")]
607 RequestKind::Quiche(ref request) => request.url(),
608 #[cfg(feature = "iroh")]
609 RequestKind::Iroh(ref request) => request.url(),
610 #[cfg(feature = "websocket")]
611 RequestKind::WebSocket(_) => None,
612 }
613 }
614
615 pub fn has_peer_certificate(&self) -> bool {
620 match self.kind {
621 #[cfg(feature = "quinn")]
622 RequestKind::Quinn(ref request) => request.has_peer_certificate(),
623 #[cfg(feature = "noq")]
624 RequestKind::Noq(ref request) => request.has_peer_certificate(),
625 #[cfg(feature = "quiche")]
626 RequestKind::Quiche(_) => false,
627 #[cfg(feature = "iroh")]
628 RequestKind::Iroh(_) => false,
629 #[cfg(feature = "websocket")]
630 RequestKind::WebSocket(_) => false,
631 #[cfg(not(any(
632 feature = "noq",
633 feature = "quinn",
634 feature = "quiche",
635 feature = "iroh",
636 feature = "websocket"
637 )))]
638 _ => false,
639 }
640 }
641}
642
643#[serde_with::serde_as]
645#[derive(Clone, serde::Serialize, serde::Deserialize)]
646pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
647
648impl ServerId {
649 #[allow(dead_code)]
650 pub(crate) fn len(&self) -> usize {
651 self.0.len()
652 }
653}
654
655impl std::fmt::Debug for ServerId {
656 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
658 }
659}
660
661impl std::str::FromStr for ServerId {
662 type Err = hex::FromHexError;
663
664 fn from_str(s: &str) -> Result<Self, Self::Err> {
665 hex::decode(s).map(Self)
666 }
667}
668
669#[cfg(test)]
670mod tests {
671 use super::*;
672
673 #[test]
674 fn test_tls_string_or_array() {
675 let single = r#"
677 cert = "cert.pem"
678 key = "key.pem"
679 "#;
680 let config: crate::tls::Server = toml::from_str(single).unwrap();
681 assert_eq!(config.cert, vec![PathBuf::from("cert.pem")]);
682 assert_eq!(config.key, vec![PathBuf::from("key.pem")]);
683
684 let array = r#"
686 cert = ["a.pem", "b.pem"]
687 key = ["a.key", "b.key"]
688 generate = ["localhost"]
689 root = ["ca.pem"]
690 "#;
691 let config: crate::tls::Server = toml::from_str(array).unwrap();
692 assert_eq!(config.cert, vec![PathBuf::from("a.pem"), PathBuf::from("b.pem")]);
693 assert_eq!(config.key, vec![PathBuf::from("a.key"), PathBuf::from("b.key")]);
694 assert_eq!(config.generate, vec!["localhost".to_string()]);
695 assert_eq!(config.root, vec![PathBuf::from("ca.pem")]);
696 }
697}