1use std::net;
2use std::path::PathBuf;
3
4use crate::QuicBackend;
5use moq_lite::Session;
6use std::sync::{Arc, RwLock};
7use url::Url;
8#[cfg(feature = "iroh")]
9use web_transport_iroh::iroh;
10
11use anyhow::Context;
12
13use futures::FutureExt;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::stream::StreamExt;
17
18#[derive(clap::Args, Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
23#[serde(deny_unknown_fields)]
24#[non_exhaustive]
25pub struct ServerTlsConfig {
26 #[arg(long = "tls-cert", id = "tls-cert", env = "MOQ_SERVER_TLS_CERT")]
28 #[serde(default, skip_serializing_if = "Vec::is_empty")]
29 pub cert: Vec<PathBuf>,
30
31 #[arg(long = "tls-key", id = "tls-key", env = "MOQ_SERVER_TLS_KEY")]
33 #[serde(default, skip_serializing_if = "Vec::is_empty")]
34 pub key: Vec<PathBuf>,
35
36 #[arg(
39 long = "tls-generate",
40 id = "tls-generate",
41 value_delimiter = ',',
42 env = "MOQ_SERVER_TLS_GENERATE"
43 )]
44 #[serde(default, skip_serializing_if = "Vec::is_empty")]
45 pub generate: Vec<String>,
46}
47
48#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields, default)]
51#[non_exhaustive]
52pub struct ServerConfig {
53 #[serde(alias = "listen")]
56 #[arg(id = "server-bind", long = "server-bind", alias = "listen", env = "MOQ_SERVER_BIND")]
57 pub bind: Option<net::SocketAddr>,
58
59 #[arg(id = "server-backend", long = "server-backend", env = "MOQ_SERVER_BACKEND")]
62 pub backend: Option<QuicBackend>,
63
64 #[arg(id = "server-quic-lb-id", long = "server-quic-lb-id", env = "MOQ_SERVER_QUIC_LB_ID")]
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub quic_lb_id: Option<ServerId>,
69
70 #[arg(
73 id = "server-quic-lb-nonce",
74 long = "server-quic-lb-nonce",
75 requires = "server-quic-lb-id",
76 env = "MOQ_SERVER_QUIC_LB_NONCE"
77 )]
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub quic_lb_nonce: Option<usize>,
80
81 #[command(flatten)]
82 #[serde(default)]
83 pub tls: ServerTlsConfig,
84}
85
86impl ServerConfig {
87 pub fn init(self) -> anyhow::Result<Server> {
88 Server::new(self)
89 }
90}
91
92pub struct Server {
96 moq: moq_lite::Server,
97 accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
98 #[cfg(feature = "iroh")]
99 iroh: Option<iroh::Endpoint>,
100 #[cfg(feature = "quinn")]
101 quinn: Option<crate::quinn::QuinnServer>,
102 #[cfg(feature = "quiche")]
103 quiche: Option<crate::quiche::QuicheServer>,
104}
105
106impl Server {
107 pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
108 let backend = config.backend.clone().unwrap_or({
109 #[cfg(feature = "quinn")]
110 {
111 QuicBackend::Quinn
112 }
113 #[cfg(all(feature = "quiche", not(feature = "quinn")))]
114 {
115 QuicBackend::Quiche
116 }
117 #[cfg(all(not(feature = "quiche"), not(feature = "quinn")))]
118 panic!("no QUIC backend compiled; enable quinn or quiche feature");
119 });
120
121 #[cfg(feature = "quinn")]
122 let quinn = match backend {
123 QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?),
124 _ => None,
125 };
126
127 #[cfg(feature = "quiche")]
128 let quiche = match backend {
129 QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?),
130 _ => None,
131 };
132
133 Ok(Server {
134 accept: Default::default(),
135 moq: moq_lite::Server::new(),
136 #[cfg(feature = "iroh")]
137 iroh: None,
138 #[cfg(feature = "quinn")]
139 quinn,
140 #[cfg(feature = "quiche")]
141 quiche,
142 })
143 }
144
145 #[cfg(feature = "iroh")]
146 pub fn with_iroh(mut self, iroh: Option<iroh::Endpoint>) -> Self {
147 self.iroh = iroh;
148 self
149 }
150
151 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
152 self.moq = self.moq.with_publish(publish);
153 self
154 }
155
156 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
157 self.moq = self.moq.with_consume(consume);
158 self
159 }
160
161 pub fn tls_info(&self) -> Arc<RwLock<ServerTlsInfo>> {
163 #[cfg(feature = "quinn")]
164 if let Some(quinn) = self.quinn.as_ref() {
165 return quinn.tls_info();
166 }
167 #[cfg(feature = "quiche")]
168 if let Some(quiche) = self.quiche.as_ref() {
169 return quiche.tls_info();
170 }
171 unreachable!("no QUIC backend compiled");
172 }
173
174 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
175 pub async fn accept(&mut self) -> Option<Request> {
176 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
177 }
178
179 #[cfg(any(feature = "quinn", feature = "quiche", feature = "iroh"))]
187 pub async fn accept(&mut self) -> Option<Request> {
188 loop {
189 #[cfg(feature = "iroh")]
191 let iroh_accept = async {
192 #[cfg(feature = "iroh")]
193 if let Some(endpoint) = self.iroh.as_mut() {
194 return endpoint.accept().await;
195 }
196 None
197 };
198 #[cfg(not(feature = "iroh"))]
199 let iroh_accept = async { None::<()> };
200
201 #[cfg(feature = "quinn")]
202 let quinn_accept = async {
203 #[cfg(feature = "quinn")]
204 if let Some(quinn) = self.quinn.as_mut() {
205 return quinn.accept().await;
206 }
207 None
208 };
209 #[cfg(not(feature = "quinn"))]
210 let quinn_accept = async { None::<()> };
211
212 #[cfg(feature = "quiche")]
213 let quiche_accept = async {
214 #[cfg(feature = "quiche")]
215 if let Some(quiche) = self.quiche.as_mut() {
216 return quiche.accept().await;
217 }
218 None
219 };
220 #[cfg(not(feature = "quiche"))]
221 let quiche_accept = async { None::<()> };
222
223 let server = self.moq.clone();
224
225 tokio::select! {
226 Some(conn) = quinn_accept => {
227 #[cfg(feature = "quinn")]
228 self.accept.push(async move {
229 let quinn = super::quinn::QuinnRequest::accept(conn).await?;
230 Ok(Request {
231 server,
232 kind: RequestKind::Quinn(quinn),
233 })
234 }.boxed());
235 }
236 Some(conn) = quiche_accept => {
237 #[cfg(feature = "quiche")]
238 self.accept.push(async move {
239 let quiche = super::quiche::QuicheRequest::accept(conn).await?;
240 Ok(Request {
241 server,
242 kind: RequestKind::Quiche(quiche),
243 })
244 }.boxed());
245 }
246 Some(conn) = iroh_accept => {
247 #[cfg(feature = "iroh")]
248 self.accept.push(async move {
249 let iroh = super::iroh::IrohRequest::accept(conn).await?;
250 Ok(Request {
251 server,
252 kind: RequestKind::Iroh(iroh),
253 })
254 }.boxed());
255 }
256 Some(res) = self.accept.next() => {
257 match res {
258 Ok(session) => return Some(session),
259 Err(err) => tracing::debug!(%err, "failed to accept session"),
260 }
261 }
262 _ = tokio::signal::ctrl_c() => {
263 self.close();
264 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
265 return None;
266 }
267 }
268 }
269 }
270
271 #[cfg(feature = "iroh")]
272 pub fn iroh_endpoint(&self) -> Option<&iroh::Endpoint> {
273 self.iroh.as_ref()
274 }
275
276 pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
277 #[cfg(feature = "quinn")]
278 if let Some(quinn) = self.quinn.as_ref() {
279 return quinn.local_addr();
280 }
281 #[cfg(feature = "quiche")]
282 if let Some(quiche) = self.quiche.as_ref() {
283 return quiche.local_addr();
284 }
285 unreachable!("no QUIC backend compiled");
286 }
287
288 pub fn close(&mut self) {
289 #[cfg(feature = "quinn")]
290 if let Some(quinn) = self.quinn.as_mut() {
291 quinn.close();
292 }
293 #[cfg(feature = "quiche")]
294 if let Some(quiche) = self.quiche.as_mut() {
295 quiche.close();
296 }
297 unreachable!("no QUIC backend compiled");
298 }
299}
300
/// Backend-specific pending request wrapped by [`Request`].
///
/// Variants exist only for the transport features that are compiled in, so
/// the enum has no variants at all when none are enabled.
pub(crate) enum RequestKind {
    /// Pending request from the Quinn backend.
    #[cfg(feature = "quinn")]
    Quinn(crate::quinn::QuinnRequest),

    /// Pending request from the Quiche backend.
    #[cfg(feature = "quiche")]
    Quiche(crate::quiche::QuicheRequest),

    /// Pending request from the iroh backend.
    #[cfg(feature = "iroh")]
    Iroh(crate::iroh::IrohRequest),
}
310
311pub struct Request {
316 server: moq_lite::Server,
317 kind: RequestKind,
318}
319
320impl Request {
321 pub async fn close(self, _code: u16) -> anyhow::Result<()> {
323 match self.kind {
324 #[cfg(feature = "quinn")]
325 RequestKind::Quinn(request) => {
326 let status = web_transport_quinn::http::StatusCode::from_u16(_code).context("invalid status code")?;
327 request.close(status).await?;
328 Ok(())
329 }
330 #[cfg(feature = "quiche")]
331 RequestKind::Quiche(request) => {
332 let status = web_transport_quiche::http::StatusCode::from_u16(_code).context("invalid status code")?;
333 request
334 .reject(status)
335 .await
336 .map_err(|e| anyhow::anyhow!("failed to close quiche WebTransport request: {e}"))?;
337 Ok(())
338 }
339 #[cfg(feature = "iroh")]
340 RequestKind::Iroh(request) => {
341 let status = web_transport_iroh::http::StatusCode::from_u16(_code).context("invalid status code")?;
342 request.close(status).await?;
343 Ok(())
344 }
345 }
346 }
347
348 pub fn with_publish(mut self, publish: impl Into<Option<moq_lite::OriginConsumer>>) -> Self {
350 self.server = self.server.with_publish(publish);
351 self
352 }
353
354 pub fn with_consume(mut self, consume: impl Into<Option<moq_lite::OriginProducer>>) -> Self {
356 self.server = self.server.with_consume(consume);
357 self
358 }
359
360 pub async fn ok(self) -> anyhow::Result<Session> {
362 match self.kind {
363 #[cfg(feature = "quinn")]
364 RequestKind::Quinn(request) => Ok(self.server.accept(request.ok().await?).await?),
365 #[cfg(feature = "quiche")]
366 RequestKind::Quiche(request) => {
367 let conn = request
368 .ok()
369 .await
370 .map_err(|e| anyhow::anyhow!("failed to accept quiche WebTransport: {e}"))?;
371 Ok(self.server.accept(conn).await?)
372 }
373 #[cfg(feature = "iroh")]
374 RequestKind::Iroh(request) => Ok(self.server.accept(request.ok().await?).await?),
375 }
376 }
377
378 pub fn url(&self) -> Option<&Url> {
380 #[cfg(not(any(feature = "quinn", feature = "quiche", feature = "iroh")))]
381 unreachable!("no QUIC backend compiled; enable quinn, quiche, or iroh feature");
382
383 match self.kind {
384 #[cfg(feature = "quinn")]
385 RequestKind::Quinn(ref request) => request.url(),
386 #[cfg(feature = "quiche")]
387 RequestKind::Quiche(ref request) => request.url(),
388 #[cfg(feature = "iroh")]
389 RequestKind::Iroh(ref request) => request.url(),
390 }
391 }
392}
393
/// TLS details published by a server backend (see `Server::tls_info`).
#[derive(Debug)]
pub struct ServerTlsInfo {
    /// Certified keys; present only when the `quinn` feature is compiled in.
    #[cfg(feature = "quinn")]
    pub(crate) certs: Vec<Arc<rustls::sign::CertifiedKey>>,

    /// Certificate fingerprints as strings.
    // NOTE(review): the encoding (presumably hex-encoded SHA-256) is not
    // visible in this file — confirm where these are produced.
    pub fingerprints: Vec<String>,
}
401
402#[serde_with::serde_as]
404#[derive(Clone, serde::Serialize, serde::Deserialize)]
405pub struct ServerId(#[serde_as(as = "serde_with::hex::Hex")] pub(crate) Vec<u8>);
406
407impl ServerId {
408 #[allow(dead_code)]
409 pub(crate) fn len(&self) -> usize {
410 self.0.len()
411 }
412}
413
414impl std::fmt::Debug for ServerId {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 f.debug_tuple("QuicLbServerId").field(&hex::encode(&self.0)).finish()
417 }
418}
419
420impl std::str::FromStr for ServerId {
421 type Err = hex::FromHexError;
422
423 fn from_str(s: &str) -> Result<Self, Self::Err> {
424 hex::decode(s).map(Self)
425 }
426}