Skip to main content

moq_lite/
server.rs

// TODO: Uncomment when observability feature is merged
// use std::sync::Arc;
3
4use crate::{
5	Error, NEGOTIATED, OriginConsumer, OriginProducer, Session, Version, Versions,
6	coding::{Decode, Encode, Stream},
7	ietf, lite, setup,
8};
9
10/// A MoQ server session builder.
11#[derive(Default, Clone)]
12pub struct Server {
13	publish: Option<OriginConsumer>,
14	consume: Option<OriginProducer>,
15	versions: Versions,
16	// TODO: Uncomment when observability feature is merged
17	// stats: Option<Arc<dyn crate::Stats>>,
18}
19
20impl Server {
21	pub fn new() -> Self {
22		Default::default()
23	}
24
25	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
26		self.publish = publish.into();
27		self
28	}
29
30	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
31		self.consume = consume.into();
32		self
33	}
34
35	pub fn with_versions(mut self, versions: Versions) -> Self {
36		self.versions = versions;
37		self
38	}
39
40	// TODO: Uncomment when observability feature is merged
41	// pub fn with_stats(mut self, stats: impl Into<Option<Arc<dyn crate::Stats>>>) -> Self {
42	// 	self.stats = stats.into();
43	// 	self
44	// }
45
46	/// Perform the MoQ handshake as a server for the given session.
47	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
48		if self.publish.is_none() && self.consume.is_none() {
49			tracing::warn!("not publishing or consuming anything");
50		}
51
52		let (encoding, supported) = match session.protocol() {
53			Some(ietf::ALPN_16) => {
54				let v = self
55					.versions
56					.select(ietf::Version::Draft16.into())
57					.ok_or(Error::Version)?;
58				(v, v.into())
59			}
60			Some(ietf::ALPN_15) => {
61				let v = self
62					.versions
63					.select(ietf::Version::Draft15.into())
64					.ok_or(Error::Version)?;
65				(v, v.into())
66			}
67			Some(ietf::ALPN_14) => {
68				let v = self
69					.versions
70					.select(ietf::Version::Draft14.into())
71					.ok_or(Error::Version)?;
72				(v, v.into())
73			}
74			Some(lite::ALPN_03) => {
75				self.versions
76					.select(lite::Version::Draft03.into())
77					.ok_or(Error::Version)?;
78
79				// Starting with draft-03, there's no more SETUP control stream.
80				lite::start(
81					session.clone(),
82					None,
83					self.publish.clone(),
84					self.consume.clone(),
85					lite::Version::Draft03,
86				)?;
87
88				tracing::debug!(version = ?lite::Version::Draft03, "connected");
89
90				return Ok(Session::new(session));
91			}
92			Some(lite::ALPN) | None => {
93				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
94				(ietf::Version::Draft14.into(), supported)
95			}
96			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
97		};
98
99		let mut stream = Stream::accept(&session, encoding).await?;
100
101		let mut client: setup::Client = stream.reader.decode().await?;
102		tracing::trace!(?client, "received client setup");
103
104		// Choose the version to use
105		let version = client
106			.versions
107			.iter()
108			.flat_map(|v| Version::try_from(*v).ok())
109			.find(|v| supported.contains(v))
110			.ok_or(Error::Version)?;
111
112		// Only encode parameters if we're using the IETF draft because it has max_request_id
113		let parameters = match version {
114			Version::Ietf(ietf_version) => {
115				let mut parameters = ietf::Parameters::default();
116				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
117				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
118				parameters.encode_bytes(ietf_version)?
119			}
120			Version::Lite(_) => lite::Parameters::default().encode_bytes(())?,
121		};
122
123		let server = setup::Server {
124			version: version.into(),
125			parameters,
126		};
127		tracing::trace!(?server, "sending server setup");
128		stream.writer.encode(&server).await?;
129
130		match version {
131			Version::Lite(version) => {
132				let stream = stream.with_version(version);
133				lite::start(
134					session.clone(),
135					Some(stream),
136					self.publish.clone(),
137					self.consume.clone(),
138					version,
139				)?;
140			}
141			Version::Ietf(version) => {
142				// Decode the client's parameters to get their max request ID.
143				let parameters = ietf::Parameters::decode(&mut client.parameters, version)?;
144				let request_id_max =
145					ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));
146
147				let stream = stream.with_version(version);
148				ietf::start(
149					session.clone(),
150					stream,
151					request_id_max,
152					false,
153					self.publish.clone(),
154					self.consume.clone(),
155					version,
156				)?;
157			}
158		};
159
160		tracing::debug!(?version, "connected");
161
162		Ok(Session::new(session))
163	}
164}