Skip to main content

moq_net/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, ALPN_LITE_05_WIP, Error,
3	NEGOTIATED, OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4	coding::{Decode, Encode, Stream},
5	ietf, lite, setup,
6};
7
8/// A MoQ server session builder.
9#[derive(Default, Clone)]
10pub struct Server {
11	publish: Option<OriginConsumer>,
12	consume: Option<OriginProducer>,
13	stats: StatsHandle,
14	versions: Versions,
15}
16
17impl Server {
18	pub fn new() -> Self {
19		Default::default()
20	}
21
22	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23		self.publish = publish.into();
24		self
25	}
26
27	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28		self.consume = consume.into();
29		self
30	}
31
32	/// Attach a tier-scoped [`StatsHandle`]. Per-broadcast and per-subscription
33	/// counters will be bumped through this handle for the lifetime of the session.
34	/// Pass [`StatsHandle::default`] (a no-op handle) to opt out.
35	pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36		self.stats = stats;
37		self
38	}
39
40	/// Set both publish and consume from an `OriginProducer`.
41	///
42	/// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`.
43	pub fn with_origin(self, origin: OriginProducer) -> Self {
44		let consumer = origin.consume();
45		self.with_publish(consumer).with_consume(origin)
46	}
47
48	pub fn with_versions(mut self, versions: Versions) -> Self {
49		self.versions = versions;
50		self
51	}
52
53	/// Perform the MoQ handshake as a server for the given session.
54	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55		if self.publish.is_none() && self.consume.is_none() {
56			tracing::warn!("not publishing or consuming anything");
57		}
58
59		let (encoding, supported) = match session.protocol() {
60			Some(ALPN_18) => {
61				let v = self
62					.versions
63					.select(Version::Ietf(ietf::Version::Draft18))
64					.ok_or(Error::Version)?;
65
66				// Draft-17+: SETUP is exchanged in the background by the session.
67				ietf::start(
68					session.clone(),
69					None,
70					None,
71					false,
72					self.publish.clone(),
73					self.consume.clone(),
74					ietf::Version::Draft18,
75				)?;
76
77				tracing::debug!(version = ?v, "connected");
78				return Ok(Session::new(session, v, None));
79			}
80			Some(ALPN_17) => {
81				let v = self
82					.versions
83					.select(Version::Ietf(ietf::Version::Draft17))
84					.ok_or(Error::Version)?;
85
86				// Draft-17+: SETUP is exchanged in the background by the session.
87				ietf::start(
88					session.clone(),
89					None,
90					None,
91					false,
92					self.publish.clone(),
93					self.consume.clone(),
94					ietf::Version::Draft17,
95				)?;
96
97				tracing::debug!(version = ?v, "connected");
98				// TODO: ietf code path does not yet record stats.
99				return Ok(Session::new(session, v, None));
100			}
101			Some(ALPN_16) => {
102				let v = self
103					.versions
104					.select(Version::Ietf(ietf::Version::Draft16))
105					.ok_or(Error::Version)?;
106				(v, v.into())
107			}
108			Some(ALPN_15) => {
109				let v = self
110					.versions
111					.select(Version::Ietf(ietf::Version::Draft15))
112					.ok_or(Error::Version)?;
113				(v, v.into())
114			}
115			Some(ALPN_14) => {
116				let v = self
117					.versions
118					.select(Version::Ietf(ietf::Version::Draft14))
119					.ok_or(Error::Version)?;
120				(v, v.into())
121			}
122			Some(ALPN_LITE_05_WIP) => {
123				self.versions
124					.select(Version::Lite(lite::Version::Lite05Wip))
125					.ok_or(Error::Version)?;
126
127				let recv_bw = lite::start(
128					session.clone(),
129					None,
130					self.publish.clone(),
131					self.consume.clone(),
132					self.stats.clone(),
133					lite::Version::Lite05Wip,
134				)?;
135
136				return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw));
137			}
138			Some(ALPN_LITE_04) => {
139				self.versions
140					.select(Version::Lite(lite::Version::Lite04))
141					.ok_or(Error::Version)?;
142
143				let recv_bw = lite::start(
144					session.clone(),
145					None,
146					self.publish.clone(),
147					self.consume.clone(),
148					self.stats.clone(),
149					lite::Version::Lite04,
150				)?;
151
152				return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
153			}
154			Some(ALPN_LITE_03) => {
155				self.versions
156					.select(Version::Lite(lite::Version::Lite03))
157					.ok_or(Error::Version)?;
158
159				// Starting with draft-03, there's no more SETUP control stream.
160				let recv_bw = lite::start(
161					session.clone(),
162					None,
163					self.publish.clone(),
164					self.consume.clone(),
165					self.stats.clone(),
166					lite::Version::Lite03,
167				)?;
168
169				return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
170			}
171			Some(ALPN_LITE) | None => {
172				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
173				(Version::Ietf(ietf::Version::Draft14), supported)
174			}
175			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
176		};
177
178		let mut stream = Stream::accept(&session, encoding).await?;
179
180		let mut client: setup::Client = stream.reader.decode().await?;
181
182		// Choose the version to use
183		let version = client
184			.versions
185			.iter()
186			.flat_map(|v| Version::try_from(*v).ok())
187			.find(|v| supported.contains(v))
188			.ok_or(Error::Version)?;
189
190		// Encode parameters using the version-appropriate type.
191		let parameters = match version {
192			Version::Ietf(v) => {
193				let mut parameters = ietf::Parameters::default();
194				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
195				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
196				parameters.encode_bytes(v)?
197			}
198			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
199		};
200
201		let server = setup::Server {
202			version: version.into(),
203			parameters,
204		};
205		stream.writer.encode(&server).await?;
206
207		let recv_bw = match version {
208			Version::Lite(v) => {
209				let stream = stream.with_version(v);
210				lite::start(
211					session.clone(),
212					Some(stream),
213					self.publish.clone(),
214					self.consume.clone(),
215					self.stats.clone(),
216					v,
217				)?
218			}
219			Version::Ietf(v) => {
220				// Decode the client's parameters to get their max request ID.
221				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
222				let request_id_max = parameters
223					.get_varint(ietf::ParameterVarInt::MaxRequestId)
224					.map(ietf::RequestId);
225
226				let stream = stream.with_version(v);
227				ietf::start(
228					session.clone(),
229					Some(stream),
230					request_id_max,
231					false,
232					self.publish.clone(),
233					self.consume.clone(),
234					v,
235				)?;
236				None
237			}
238		};
239
240		Ok(Session::new(session, version, recv_bw))
241	}
242}