Skip to main content

moq_net/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED,
3	OriginConsumer, OriginProducer, Session, Version, Versions,
4	coding::{Decode, Encode, Stream},
5	ietf, lite, setup,
6};
7
8/// A MoQ server session builder.
9#[derive(Default, Clone)]
10pub struct Server {
11	publish: Option<OriginConsumer>,
12	consume: Option<OriginProducer>,
13	versions: Versions,
14}
15
16impl Server {
17	pub fn new() -> Self {
18		Default::default()
19	}
20
21	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22		self.publish = publish.into();
23		self
24	}
25
26	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27		self.consume = consume.into();
28		self
29	}
30
31	/// Set both publish and consume from an `OriginProducer`.
32	///
33	/// This is equivalent to calling `with_publish(origin.consume())` and `with_consume(origin)`.
34	pub fn with_origin(self, origin: OriginProducer) -> Self {
35		let consumer = origin.consume();
36		self.with_publish(consumer).with_consume(origin)
37	}
38
39	pub fn with_versions(mut self, versions: Versions) -> Self {
40		self.versions = versions;
41		self
42	}
43
44	/// Perform the MoQ handshake as a server for the given session.
45	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
46		if self.publish.is_none() && self.consume.is_none() {
47			tracing::warn!("not publishing or consuming anything");
48		}
49
50		let (encoding, supported) = match session.protocol() {
51			Some(ALPN_18) => {
52				let v = self
53					.versions
54					.select(Version::Ietf(ietf::Version::Draft18))
55					.ok_or(Error::Version)?;
56
57				// Draft-17+: SETUP is exchanged in the background by the session.
58				ietf::start(
59					session.clone(),
60					None,
61					None,
62					false,
63					self.publish.clone(),
64					self.consume.clone(),
65					ietf::Version::Draft18,
66				)?;
67
68				tracing::debug!(version = ?v, "connected");
69				return Ok(Session::new(session, v, None));
70			}
71			Some(ALPN_17) => {
72				let v = self
73					.versions
74					.select(Version::Ietf(ietf::Version::Draft17))
75					.ok_or(Error::Version)?;
76
77				// Draft-17+: SETUP is exchanged in the background by the session.
78				ietf::start(
79					session.clone(),
80					None,
81					None,
82					false,
83					self.publish.clone(),
84					self.consume.clone(),
85					ietf::Version::Draft17,
86				)?;
87
88				tracing::debug!(version = ?v, "connected");
89				return Ok(Session::new(session, v, None));
90			}
91			Some(ALPN_16) => {
92				let v = self
93					.versions
94					.select(Version::Ietf(ietf::Version::Draft16))
95					.ok_or(Error::Version)?;
96				(v, v.into())
97			}
98			Some(ALPN_15) => {
99				let v = self
100					.versions
101					.select(Version::Ietf(ietf::Version::Draft15))
102					.ok_or(Error::Version)?;
103				(v, v.into())
104			}
105			Some(ALPN_14) => {
106				let v = self
107					.versions
108					.select(Version::Ietf(ietf::Version::Draft14))
109					.ok_or(Error::Version)?;
110				(v, v.into())
111			}
112			Some(ALPN_LITE_04) => {
113				self.versions
114					.select(Version::Lite(lite::Version::Lite04))
115					.ok_or(Error::Version)?;
116
117				let recv_bw = lite::start(
118					session.clone(),
119					None,
120					self.publish.clone(),
121					self.consume.clone(),
122					lite::Version::Lite04,
123				)?;
124
125				return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
126			}
127			Some(ALPN_LITE_03) => {
128				self.versions
129					.select(Version::Lite(lite::Version::Lite03))
130					.ok_or(Error::Version)?;
131
132				// Starting with draft-03, there's no more SETUP control stream.
133				let recv_bw = lite::start(
134					session.clone(),
135					None,
136					self.publish.clone(),
137					self.consume.clone(),
138					lite::Version::Lite03,
139				)?;
140
141				return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
142			}
143			Some(ALPN_LITE) | None => {
144				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
145				(Version::Ietf(ietf::Version::Draft14), supported)
146			}
147			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
148		};
149
150		let mut stream = Stream::accept(&session, encoding).await?;
151
152		let mut client: setup::Client = stream.reader.decode().await?;
153
154		// Choose the version to use
155		let version = client
156			.versions
157			.iter()
158			.flat_map(|v| Version::try_from(*v).ok())
159			.find(|v| supported.contains(v))
160			.ok_or(Error::Version)?;
161
162		// Encode parameters using the version-appropriate type.
163		let parameters = match version {
164			Version::Ietf(v) => {
165				let mut parameters = ietf::Parameters::default();
166				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
167				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
168				parameters.encode_bytes(v)?
169			}
170			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
171		};
172
173		let server = setup::Server {
174			version: version.into(),
175			parameters,
176		};
177		stream.writer.encode(&server).await?;
178
179		let recv_bw = match version {
180			Version::Lite(v) => {
181				let stream = stream.with_version(v);
182				lite::start(
183					session.clone(),
184					Some(stream),
185					self.publish.clone(),
186					self.consume.clone(),
187					v,
188				)?
189			}
190			Version::Ietf(v) => {
191				// Decode the client's parameters to get their max request ID.
192				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
193				let request_id_max = parameters
194					.get_varint(ietf::ParameterVarInt::MaxRequestId)
195					.map(ietf::RequestId);
196
197				let stream = stream.with_version(v);
198				ietf::start(
199					session.clone(),
200					Some(stream),
201					request_id_max,
202					false,
203					self.publish.clone(),
204					self.consume.clone(),
205					v,
206				)?;
207				None
208			}
209		};
210
211		Ok(Session::new(session, version, recv_bw))
212	}
213}