Skip to main content

moq_net/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, ALPN_LITE_05_WIP, Error,
3	NEGOTIATED, OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4	coding::{Decode, Encode, Stream},
5	ietf, lite, setup,
6};
7
8/// A MoQ server session builder.
9#[derive(Default, Clone)]
10pub struct Server {
11	publish: Option<OriginConsumer>,
12	consume: Option<OriginProducer>,
13	stats: StatsHandle,
14	versions: Versions,
15}
16
17impl Server {
18	pub fn new() -> Self {
19		Default::default()
20	}
21
22	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23		self.publish = publish.into();
24		self
25	}
26
27	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28		self.consume = consume.into();
29		self
30	}
31
32	/// Attach a tier-scoped [`StatsHandle`]. Per-broadcast and per-subscription
33	/// counters will be bumped through this handle for the lifetime of the session.
34	/// Pass [`StatsHandle::default`] (a no-op handle) to opt out.
35	pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36		self.stats = stats;
37		self
38	}
39
40	/// Set both publish and consume from an `OriginProducer`.
41	///
42	/// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`.
43	pub fn with_origin(self, origin: OriginProducer) -> Self {
44		let consumer = origin.consume();
45		self.with_publish(consumer).with_consume(origin)
46	}
47
48	pub fn with_versions(mut self, versions: Versions) -> Self {
49		self.versions = versions;
50		self
51	}
52
53	/// Perform the MoQ handshake as a server for the given session.
54	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55		if self.publish.is_none() && self.consume.is_none() {
56			tracing::warn!("not publishing or consuming anything");
57		}
58
59		let (encoding, supported) = match session.protocol() {
60			Some(ALPN_18) => {
61				let v = self
62					.versions
63					.select(Version::Ietf(ietf::Version::Draft18))
64					.ok_or(Error::Version)?;
65
66				// Draft-17+: SETUP is exchanged in the background by the session.
67				ietf::start(
68					session.clone(),
69					None,
70					None,
71					false,
72					self.publish.clone(),
73					self.consume.clone(),
74					self.stats.clone(),
75					ietf::Version::Draft18,
76				)?;
77
78				tracing::debug!(version = ?v, "connected");
79				return Ok(Session::new(session, v, None));
80			}
81			Some(ALPN_17) => {
82				let v = self
83					.versions
84					.select(Version::Ietf(ietf::Version::Draft17))
85					.ok_or(Error::Version)?;
86
87				// Draft-17+: SETUP is exchanged in the background by the session.
88				ietf::start(
89					session.clone(),
90					None,
91					None,
92					false,
93					self.publish.clone(),
94					self.consume.clone(),
95					self.stats.clone(),
96					ietf::Version::Draft17,
97				)?;
98
99				tracing::debug!(version = ?v, "connected");
100				return Ok(Session::new(session, v, None));
101			}
102			Some(ALPN_16) => {
103				let v = self
104					.versions
105					.select(Version::Ietf(ietf::Version::Draft16))
106					.ok_or(Error::Version)?;
107				(v, v.into())
108			}
109			Some(ALPN_15) => {
110				let v = self
111					.versions
112					.select(Version::Ietf(ietf::Version::Draft15))
113					.ok_or(Error::Version)?;
114				(v, v.into())
115			}
116			Some(ALPN_14) => {
117				let v = self
118					.versions
119					.select(Version::Ietf(ietf::Version::Draft14))
120					.ok_or(Error::Version)?;
121				(v, v.into())
122			}
123			Some(ALPN_LITE_05_WIP) => {
124				self.versions
125					.select(Version::Lite(lite::Version::Lite05Wip))
126					.ok_or(Error::Version)?;
127
128				let recv_bw = lite::start(
129					session.clone(),
130					None,
131					self.publish.clone(),
132					self.consume.clone(),
133					self.stats.clone(),
134					lite::Version::Lite05Wip,
135				)?;
136
137				return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw));
138			}
139			Some(ALPN_LITE_04) => {
140				self.versions
141					.select(Version::Lite(lite::Version::Lite04))
142					.ok_or(Error::Version)?;
143
144				let recv_bw = lite::start(
145					session.clone(),
146					None,
147					self.publish.clone(),
148					self.consume.clone(),
149					self.stats.clone(),
150					lite::Version::Lite04,
151				)?;
152
153				return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
154			}
155			Some(ALPN_LITE_03) => {
156				self.versions
157					.select(Version::Lite(lite::Version::Lite03))
158					.ok_or(Error::Version)?;
159
160				// Starting with draft-03, there's no more SETUP control stream.
161				let recv_bw = lite::start(
162					session.clone(),
163					None,
164					self.publish.clone(),
165					self.consume.clone(),
166					self.stats.clone(),
167					lite::Version::Lite03,
168				)?;
169
170				return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
171			}
172			Some(ALPN_LITE) | None => {
173				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
174				(Version::Ietf(ietf::Version::Draft14), supported)
175			}
176			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
177		};
178
179		let mut stream = Stream::accept(&session, encoding).await?;
180
181		let mut client: setup::Client = stream.reader.decode().await?;
182
183		// Choose the version to use
184		let version = client
185			.versions
186			.iter()
187			.flat_map(|v| Version::try_from(*v).ok())
188			.find(|v| supported.contains(v))
189			.ok_or(Error::Version)?;
190
191		// Encode parameters using the version-appropriate type.
192		let parameters = match version {
193			Version::Ietf(v) => {
194				let mut parameters = ietf::Parameters::default();
195				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
196				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
197				parameters.encode_bytes(v)?
198			}
199			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
200		};
201
202		let server = setup::Server {
203			version: version.into(),
204			parameters,
205		};
206		stream.writer.encode(&server).await?;
207
208		let recv_bw = match version {
209			Version::Lite(v) => {
210				let stream = stream.with_version(v);
211				lite::start(
212					session.clone(),
213					Some(stream),
214					self.publish.clone(),
215					self.consume.clone(),
216					self.stats.clone(),
217					v,
218				)?
219			}
220			Version::Ietf(v) => {
221				// Decode the client's parameters to get their max request ID.
222				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
223				let request_id_max = parameters
224					.get_varint(ietf::ParameterVarInt::MaxRequestId)
225					.map(ietf::RequestId);
226
227				let stream = stream.with_version(v);
228				ietf::start(
229					session.clone(),
230					Some(stream),
231					request_id_max,
232					false,
233					self.publish.clone(),
234					self.consume.clone(),
235					self.stats.clone(),
236					v,
237				)?;
238				None
239			}
240		};
241
242		Ok(Session::new(session, version, recv_bw))
243	}
244}