1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46}
47
48#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53 #[serde(alias = "listen")]
56 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57 pub bind: Option<net::SocketAddr>,
58
59 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62 pub backend: Option<QuicBackend>,
63
64 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub quic_lb_id: Option<ServerId>,
69
70 #[arg(
73 id = "server-quic-lb-nonce",
74 long = "server-quic-lb-nonce",
75 requires = "server-quic-lb-id",
76 env = "MOQ_SERVER_QUIC_LB_NONCE"
77 )]
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub quic_lb_nonce: Option<usize>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 #[arg(
84 id = "server-max-streams",
85 long = "server-max-streams",
86 env = "MOQ_SERVER_MAX_STREAMS"
87 )]
88 pub max_streams: Option<u64>,
89
90 #[command(flatten)]
91 #[serde(default)]
92 pub tls: ServerTlsConfig,
93}
94
95impl ServerConfig {
96 pub fn init(self) -> anyhow::Result<Server> {
97 Server::new(self)
98 }
99}
100
101pub struct Server {
105 moq: moq_lite::Server,
106 accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
107 #[cfg(feature = "iroh")]
108 iroh: Option<iroh::Endpoint>,
109 #[cfg(feature = "quinn")]
110 quinn: Option<crate::quinn::QuinnServer>,
111 #[cfg(feature = "quiche")]
112 quiche: Option<crate::quiche::QuicheServer>,
113}
114
115impl Server {
116 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
117 let backend = config.backend.clone().unwrap_or({
118 #[cfg(feature = "quinn")]
119 {
120 QuicBackend::Quinn
121 }
122 #[cfg(all(feature = "quiche", not(feature = "quinn")))]
123 {
124 QuicBackend::Quiche
125 }
126 #[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
127 panic!("no QUIC backend compiled; enable quinn or quiche feature");
128 });
129
130 #[cfg(feature = "quinn")]
131 #[allow(unreachable_patterns)]
132 let quinn = match backend {
133 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
134 _ => None,
135 };
136
137 #[cfg(feature = "quiche")]
138 let quiche = match backend {
139 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
140 _ => None,
141 };
142
143 Ok(Server {
144 accept: Default::default(),
145 moq: moq_lite::Server::new(),
146 #[cfg(feature = "iroh")]
147 iroh: None,
148 #[cfg(feature = "quinn")]
149 quinn,
150 #[cfg(feature = "quiche")]
151 quiche,
152 })
153 }
154
155 #[cfg(feature = "iroh")]
156 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
157 self.iroh = iroh;
158 self
159 }
160
161 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
162 self.moq = self.moq.with_publish(publish);
163 self
164 }
165
166 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
167 self.moq = self.moq.with_consume(consume);
168 self
169 }
170
171 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
173 #[cfg(feature = "quinn")]
174 if let Some(quinn) = self.quinn.as_ref() {
175 return quinn.tls_info();
176 }
177 #[cfg(feature = "quiche")]
178 if let Some(quiche) = self.quiche.as_ref() {
179 return quiche.tls_info();
180 }
181 unreachable!("no QUIC backend compiled");
182 }
183
184 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
185 pub async fn accept(&mut self) -> Option<Request> {
186 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
187 }
188
189 #[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
197 pub async fn accept(&mut self) -> Option<Request> {
198 loop {
199 #[cfg(feature = "iroh")]
201 let iroh_accept = async {
202 #[cfg(feature = "iroh")]
203 if let Some(endpoint) = self.iroh.as_mut() {
204 return endpoint.accept().await;
205 }
206 None
207 };
208 #[cfg(not(feature = "iroh"))]
209 let iroh_accept = async { None::<()> };
210
211 #[cfg(feature = "quinn")]
212 let quinn_accept = async {
213 #[cfg(feature = "quinn")]
214 if let Some(quinn) = self.quinn.as_mut() {
215 return quinn.accept().await;
216 }
217 None
218 };
219 #[cfg(not(feature = "quinn"))]
220 let quinn_accept = async { None::<()> };
221
222 #[cfg(feature = "quiche")]
223 let quiche_accept = async {
224 #[cfg(feature = "quiche")]
225 if let Some(quiche) = self.quiche.as_mut() {
226 return quiche.accept().await;
227 }
228 None
229 };
230 #[cfg(not(feature = "quiche"))]
231 let quiche_accept = async { None::<()> };
232
233 let server = self.moq.clone();
234
235 tokio::select! {
236 Some(conn) = quinn_accept => {
237 #[cfg(feature = "quinn")]
238 self.accept.push(async move {
239 let quinn = super::quinn::QuinnRequest::accept(conn).await?;
240 Ok(Request {
241 server,
242 kind: RequestKind::Quinn(quinn),
243 })
244 }.boxed());
245 }
246 Some(_conn) = quiche_accept => {
247 #[cfg(feature = "quiche")]
248 self.accept.push(async move {
249 let quiche = super::quiche::QuicheRequest::accept(_conn).await?;
250 Ok(Request {
251 server,
252 kind: RequestKind::Quiche(quiche),
253 })
254 }.boxed());
255 }
256 Some(conn) = iroh_accept => {
257 #[cfg(feature = "iroh")]
258 self.accept.push(async move {
259 let iroh = super::iroh::IrohRequest::accept(conn).await?;
260 Ok(Request {
261 server,
262 kind: RequestKind::Iroh(iroh),
263 })
264 }.boxed());
265 }
266 Some(res) = self.accept.next() => {
267 match res {
268 Ok(session) => return Some(session),
269 Err(err) => tracing::debug!(%err, "failed to accept session"),
270 }
271 }
272 _ = tokio::signal::ctrl_c() => {
273 self.close();
274 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
275 return None;
276 }
277 }
278 }
279 }
280
281 #[cfg(feature = "iroh")]
282 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
283 self.iroh.as_ref()
284 }
285
286 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
287 #[cfg(feature = "quinn")]
288 if let Some(quinn) = self.quinn.as_ref() {
289 return quinn.local_addr();
290 }
291 #[cfg(feature = "quiche")]
292 if let Some(quiche) = self.quiche.as_ref() {
293 return quiche.local_addr();
294 }
295 unreachable!("no QUIC backend compiled");
296 }
297
298 pub fn close(&mut self) {
299 #[cfg(feature = "quinn")]
300 if let Some(quinn) = self.quinn.as_mut() {
301 quinn.close();
302 }
303 #[cfg(feature = "quiche")]
304 if let Some(quiche) = self.quiche.as_mut() {
305 quiche.close();
306 }
307 unreachable!("no QUIC backend compiled");
308 }
309}
310
311pub(crate) enum RequestKind {
313 #[cfg(feature = "quinn")]
314 Quinn(crate::quinn::QuinnRequest),
315 #[cfg(feature = "quiche")]
316 Quiche(crate::quiche::QuicheRequest),
317 #[cfg(feature = "iroh")]
318 Iroh(crate::iroh::IrohRequest),
319}
320
321pub struct Request {
326 server: moq_lite::Server,
327 kind: RequestKind,
328}
329
330impl Request {
331 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
333 match self.kind {
334 #[cfg(feature = "quinn")]
335 RequestKind::Quinn(request) => {
336 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
337 request.close(status).await?;
338 Ok(())
339 }
340 #[cfg(feature = "quiche")]
341 RequestKind::Quiche(request) => {
342 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
343 request
344 .reject(status)
345 .await
346 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
347 Ok(())
348 }
349 #[cfg(feature = "iroh")]
350 RequestKind::Iroh(request) => {
351 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
352 request.close(status).await?;
353 Ok(())
354 }
355 }
356 }
357
358 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
360 self.server = self.server.with_publish(publish);
361 self
362 }
363
364 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
366 self.server = self.server.with_consume(consume);
367 self
368 }
369
370 pub async fn ok(self) -> anyhow::Result<Session> {
372 match self.kind {
373 #[cfg(feature = "quinn")]
374 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
375 #[cfg(feature = "quiche")]
376 RequestKind::Quiche(request) => {
377 let conn = request
378 .ok()
379 .await
380 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
381 Ok(self.server.accept(conn).await?)
382 }
383 #[cfg(feature = "iroh")]
384 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
385 }
386 }
387
388 pub fn url(&self) -> Option<&Url> {
390 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
391 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
392
393 match self.kind {
394 #[cfg(feature = "quinn")]
395 RequestKind::Quinn(ref request) => request.url(),
396 #[cfg(feature = "quiche")]
397 RequestKind::Quiche(ref request) => request.url(),
398 #[cfg(feature = "iroh")]
399 RequestKind::Iroh(ref request) => request.url(),
400 }
401 }
402}
403
404#[derive(Debug)]
406pub struct ServerTlsInfo {
407 #[cfg(feature = "quinn")]
408 pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,
409 pub fingerprints: Vec<String>,
410}
411
412#[serde_with::serde_as]
414#[derive(Clone, serde::Serialize, serde::Deserialize)]
415pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
416
417impl ServerId {
418 #[allow(dead_code)]
419 pub(crate) fn len(&self) -> usize {
420 self.0.len()
421 }
422}
423
424impl std::fmt::Debug for ServerId {
425 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
427 }
428}
429
430impl std::str::FromStr for ServerId {
431 type Err = hex::FromHexError;
432
433 fn from_str(s: &str) -> Result<Self, Self::Err> {
434 hex::decode(s).map(Self)
435 }
436}