Skip to main content

moq_net/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED,
3	OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4	coding::{Decode, Encode, Stream},
5	ietf, lite, setup,
6};
7
/// A MoQ server session builder.
///
/// Configure it with the `with_*` methods, then call `accept` once per incoming
/// transport session. `accept` only borrows the builder, and the type is `Clone`,
/// so one configured `Server` can be reused for many sessions.
#[derive(Default, Clone)]
pub struct Server {
	/// Optional origin consumer, cloned into each started session as its
	/// `publish` argument. `None` unless set via `with_publish` / `with_origin`.
	publish: Option<OriginConsumer>,
	/// Optional origin producer, cloned into each started session as its
	/// `consume` argument. `None` unless set via `with_consume` / `with_origin`.
	consume: Option<OriginProducer>,
	/// Stats handle cloned into moq-lite sessions. The IETF code paths in
	/// `accept` do not receive it.
	stats: StatsHandle,
	/// Versions this server accepts. `accept` consults it (via `select` /
	/// `filter`) to validate the version implied by the ALPN or offered by the
	/// client.
	versions: Versions,
}
16
17impl Server {
18	pub fn new() -> Self {
19		Default::default()
20	}
21
22	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23		self.publish = publish.into();
24		self
25	}
26
27	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28		self.consume = consume.into();
29		self
30	}
31
32	/// Attach a tier-scoped [`StatsHandle`]. Per-broadcast and per-subscription
33	/// counters will be bumped through this handle for the lifetime of the session.
34	/// Pass [`StatsHandle::disabled`] (also the default) to opt out.
35	pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36		self.stats = stats;
37		self
38	}
39
40	/// Set both publish and consume from an `OriginProducer`.
41	///
42	/// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`.
43	pub fn with_origin(self, origin: OriginProducer) -> Self {
44		let consumer = origin.consume();
45		self.with_publish(consumer).with_consume(origin)
46	}
47
48	pub fn with_versions(mut self, versions: Versions) -> Self {
49		self.versions = versions;
50		self
51	}
52
53	/// Perform the MoQ handshake as a server for the given session.
54	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55		if self.publish.is_none() && self.consume.is_none() {
56			tracing::warn!("not publishing or consuming anything");
57		}
58
59		let (encoding, supported) = match session.protocol() {
60			Some(ALPN_18) => {
61				let v = self
62					.versions
63					.select(Version::Ietf(ietf::Version::Draft18))
64					.ok_or(Error::Version)?;
65
66				// Draft-17+: SETUP is exchanged in the background by the session.
67				ietf::start(
68					session.clone(),
69					None,
70					None,
71					false,
72					self.publish.clone(),
73					self.consume.clone(),
74					ietf::Version::Draft18,
75				)?;
76
77				tracing::debug!(version = ?v, "connected");
78				return Ok(Session::new(session, v, None));
79			}
80			Some(ALPN_17) => {
81				let v = self
82					.versions
83					.select(Version::Ietf(ietf::Version::Draft17))
84					.ok_or(Error::Version)?;
85
86				// Draft-17+: SETUP is exchanged in the background by the session.
87				ietf::start(
88					session.clone(),
89					None,
90					None,
91					false,
92					self.publish.clone(),
93					self.consume.clone(),
94					ietf::Version::Draft17,
95				)?;
96
97				tracing::debug!(version = ?v, "connected");
98				// TODO: ietf code path does not yet record stats.
99				return Ok(Session::new(session, v, None));
100			}
101			Some(ALPN_16) => {
102				let v = self
103					.versions
104					.select(Version::Ietf(ietf::Version::Draft16))
105					.ok_or(Error::Version)?;
106				(v, v.into())
107			}
108			Some(ALPN_15) => {
109				let v = self
110					.versions
111					.select(Version::Ietf(ietf::Version::Draft15))
112					.ok_or(Error::Version)?;
113				(v, v.into())
114			}
115			Some(ALPN_14) => {
116				let v = self
117					.versions
118					.select(Version::Ietf(ietf::Version::Draft14))
119					.ok_or(Error::Version)?;
120				(v, v.into())
121			}
122			Some(ALPN_LITE_04) => {
123				self.versions
124					.select(Version::Lite(lite::Version::Lite04))
125					.ok_or(Error::Version)?;
126
127				let recv_bw = lite::start(
128					session.clone(),
129					None,
130					self.publish.clone(),
131					self.consume.clone(),
132					self.stats.clone(),
133					lite::Version::Lite04,
134				)?;
135
136				return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
137			}
138			Some(ALPN_LITE_03) => {
139				self.versions
140					.select(Version::Lite(lite::Version::Lite03))
141					.ok_or(Error::Version)?;
142
143				// Starting with draft-03, there's no more SETUP control stream.
144				let recv_bw = lite::start(
145					session.clone(),
146					None,
147					self.publish.clone(),
148					self.consume.clone(),
149					self.stats.clone(),
150					lite::Version::Lite03,
151				)?;
152
153				return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
154			}
155			Some(ALPN_LITE) | None => {
156				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
157				(Version::Ietf(ietf::Version::Draft14), supported)
158			}
159			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
160		};
161
162		let mut stream = Stream::accept(&session, encoding).await?;
163
164		let mut client: setup::Client = stream.reader.decode().await?;
165
166		// Choose the version to use
167		let version = client
168			.versions
169			.iter()
170			.flat_map(|v| Version::try_from(*v).ok())
171			.find(|v| supported.contains(v))
172			.ok_or(Error::Version)?;
173
174		// Encode parameters using the version-appropriate type.
175		let parameters = match version {
176			Version::Ietf(v) => {
177				let mut parameters = ietf::Parameters::default();
178				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
179				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
180				parameters.encode_bytes(v)?
181			}
182			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
183		};
184
185		let server = setup::Server {
186			version: version.into(),
187			parameters,
188		};
189		stream.writer.encode(&server).await?;
190
191		let recv_bw = match version {
192			Version::Lite(v) => {
193				let stream = stream.with_version(v);
194				lite::start(
195					session.clone(),
196					Some(stream),
197					self.publish.clone(),
198					self.consume.clone(),
199					self.stats.clone(),
200					v,
201				)?
202			}
203			Version::Ietf(v) => {
204				// Decode the client's parameters to get their max request ID.
205				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
206				let request_id_max = parameters
207					.get_varint(ietf::ParameterVarInt::MaxRequestId)
208					.map(ietf::RequestId);
209
210				let stream = stream.with_version(v);
211				ietf::start(
212					session.clone(),
213					Some(stream),
214					request_id_max,
215					false,
216					self.publish.clone(),
217					self.consume.clone(),
218					v,
219				)?;
220				None
221			}
222		};
223
224		Ok(Session::new(session, version, recv_bw))
225	}
226}