Skip to main content

moq_lite/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, Error, NEGOTIATED, OriginConsumer, OriginProducer,
3	Session, Version, Versions,
4	coding::{Decode, Encode, Stream},
5	ietf, lite, setup,
6};
7
8/// A MoQ server session builder.
9#[derive(Default, Clone)]
10pub struct Server {
11	publish: Option<OriginConsumer>,
12	consume: Option<OriginProducer>,
13	versions: Versions,
14}
15
16impl Server {
17	pub fn new() -> Self {
18		Default::default()
19	}
20
21	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22		self.publish = publish.into();
23		self
24	}
25
26	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27		self.consume = consume.into();
28		self
29	}
30
31	pub fn with_versions(mut self, versions: Versions) -> Self {
32		self.versions = versions;
33		self
34	}
35
36	/// Perform the MoQ handshake as a server for the given session.
37	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
38		if self.publish.is_none() && self.consume.is_none() {
39			tracing::warn!("not publishing or consuming anything");
40		}
41
42		let (encoding, supported) = match session.protocol() {
43			Some(ALPN_17) => {
44				let v = self
45					.versions
46					.select(Version::Ietf(ietf::Version::Draft17))
47					.ok_or(Error::Version)?;
48
49				// Draft-17: SETUP is exchanged in the background by the session.
50				ietf::start(
51					session.clone(),
52					None,
53					None,
54					false,
55					self.publish.clone(),
56					self.consume.clone(),
57					ietf::Version::Draft17,
58				)?;
59
60				tracing::debug!(version = ?v, "connected");
61				return Ok(Session::new(session, v));
62			}
63			Some(ALPN_16) => {
64				let v = self
65					.versions
66					.select(Version::Ietf(ietf::Version::Draft16))
67					.ok_or(Error::Version)?;
68				(v, v.into())
69			}
70			Some(ALPN_15) => {
71				let v = self
72					.versions
73					.select(Version::Ietf(ietf::Version::Draft15))
74					.ok_or(Error::Version)?;
75				(v, v.into())
76			}
77			Some(ALPN_14) => {
78				let v = self
79					.versions
80					.select(Version::Ietf(ietf::Version::Draft14))
81					.ok_or(Error::Version)?;
82				(v, v.into())
83			}
84			Some(ALPN_LITE_03) => {
85				self.versions
86					.select(Version::Lite(lite::Version::Lite03))
87					.ok_or(Error::Version)?;
88
89				// Starting with draft-03, there's no more SETUP control stream.
90				lite::start(
91					session.clone(),
92					None,
93					self.publish.clone(),
94					self.consume.clone(),
95					lite::Version::Lite03,
96				)?;
97
98				return Ok(Session::new(session, lite::Version::Lite03.into()));
99			}
100			Some(ALPN_LITE) | None => {
101				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
102				(Version::Ietf(ietf::Version::Draft14), supported)
103			}
104			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
105		};
106
107		let mut stream = Stream::accept(&session, encoding).await?;
108
109		let mut client: setup::Client = stream.reader.decode().await?;
110		tracing::trace!(?client, "received client setup");
111
112		// Choose the version to use
113		let version = client
114			.versions
115			.iter()
116			.flat_map(|v| Version::try_from(*v).ok())
117			.find(|v| supported.contains(v))
118			.ok_or(Error::Version)?;
119
120		// Encode parameters using the version-appropriate type.
121		let parameters = match version {
122			Version::Ietf(v) => {
123				let mut parameters = ietf::Parameters::default();
124				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
125				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
126				parameters.encode_bytes(v)?
127			}
128			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
129		};
130
131		let server = setup::Server {
132			version: version.into(),
133			parameters,
134		};
135		tracing::trace!(?server, "sending server setup");
136		stream.writer.encode(&server).await?;
137
138		match version {
139			Version::Lite(v) => {
140				let stream = stream.with_version(v);
141				lite::start(
142					session.clone(),
143					Some(stream),
144					self.publish.clone(),
145					self.consume.clone(),
146					v,
147				)?;
148			}
149			Version::Ietf(v) => {
150				// Decode the client's parameters to get their max request ID.
151				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
152				let request_id_max = parameters
153					.get_varint(ietf::ParameterVarInt::MaxRequestId)
154					.map(ietf::RequestId);
155
156				let stream = stream.with_version(v);
157				ietf::start(
158					session.clone(),
159					Some(stream),
160					request_id_max,
161					false,
162					self.publish.clone(),
163					self.consume.clone(),
164					v,
165				)?;
166			}
167		};
168
169		Ok(Session::new(session, version))
170	}
171}