Skip to main content

moq_lite/
server.rs

1use crate::{
2	ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, Error, NEGOTIATED, OriginConsumer, OriginProducer,
3	Session, Version, Versions,
4	coding::{Decode, Encode, Reader, Stream, Writer},
5	ietf, lite, setup,
6};
7
8/// A MoQ server session builder.
9#[derive(Default, Clone)]
10pub struct Server {
11	publish: Option<OriginConsumer>,
12	consume: Option<OriginProducer>,
13	versions: Versions,
14}
15
16impl Server {
17	pub fn new() -> Self {
18		Default::default()
19	}
20
21	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22		self.publish = publish.into();
23		self
24	}
25
26	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27		self.consume = consume.into();
28		self
29	}
30
31	pub fn with_versions(mut self, versions: Versions) -> Self {
32		self.versions = versions;
33		self
34	}
35
36	/// Perform the MoQ handshake as a server for the given session.
37	pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
38		if self.publish.is_none() && self.consume.is_none() {
39			tracing::warn!("not publishing or consuming anything");
40		}
41
42		let (encoding, supported) = match session.protocol() {
43			Some(ALPN_17) => {
44				let v = self
45					.versions
46					.select(Version::Ietf(ietf::Version::Draft17))
47					.ok_or(Error::Version)?;
48
49				let ietf_v = ietf::Version::Draft17;
50
51				// Draft-17: SETUP uses uni streams
52				let mut parameters = ietf::Parameters::default();
53				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
54				let parameters = parameters.encode_bytes(ietf_v)?;
55
56				let server_setup = setup::Server {
57					version: v.into(),
58					parameters,
59				};
60
61				// Accept and send SETUP concurrently on uni streams
62				let recv_fut = async {
63					let recv = session.accept_uni().await.map_err(Error::from_transport)?;
64					let mut reader: Reader<S::RecvStream, Version> = Reader::new(recv, v);
65					// Read stream type
66					let stream_type: u64 = reader.decode().await?;
67					if stream_type != 0x2F00 {
68						return Err(Error::UnexpectedStream);
69					}
70					// Read client SETUP message
71					let _client: setup::Client = reader.decode().await?;
72					Ok::<_, Error>(reader)
73				};
74
75				let send_fut = async {
76					let send = session.open_uni().await.map_err(Error::from_transport)?;
77					let mut writer: Writer<S::SendStream, Version> = Writer::new(send, v);
78					// Write stream type 0x2F00
79					writer.encode(&0x2F00u64).await?;
80					// Write SETUP message
81					writer.encode(&server_setup).await?;
82					Ok::<_, Error>(writer)
83				};
84
85				let (recv_result, send_result) = tokio::join!(recv_fut, send_fut);
86				let reader = recv_result?;
87				let writer = send_result?;
88
89				// Construct a Stream from the uni streams for GOAWAY/control
90				let stream = Stream {
91					writer: writer.with_version(ietf_v),
92					reader: reader.with_version(ietf_v),
93				};
94
95				ietf::start(
96					session.clone(),
97					stream,
98					None, // Draft-17 removed MaxRequestId
99					false,
100					self.publish.clone(),
101					self.consume.clone(),
102					ietf_v,
103				)?;
104
105				tracing::debug!(version = ?v, "connected");
106				return Ok(Session::new(session, v));
107			}
108			Some(ALPN_16) => {
109				let v = self
110					.versions
111					.select(Version::Ietf(ietf::Version::Draft16))
112					.ok_or(Error::Version)?;
113				(v, v.into())
114			}
115			Some(ALPN_15) => {
116				let v = self
117					.versions
118					.select(Version::Ietf(ietf::Version::Draft15))
119					.ok_or(Error::Version)?;
120				(v, v.into())
121			}
122			Some(ALPN_14) => {
123				let v = self
124					.versions
125					.select(Version::Ietf(ietf::Version::Draft14))
126					.ok_or(Error::Version)?;
127				(v, v.into())
128			}
129			Some(ALPN_LITE_03) => {
130				self.versions
131					.select(Version::Lite(lite::Version::Lite03))
132					.ok_or(Error::Version)?;
133
134				// Starting with draft-03, there's no more SETUP control stream.
135				lite::start(
136					session.clone(),
137					None,
138					self.publish.clone(),
139					self.consume.clone(),
140					lite::Version::Lite03,
141				)?;
142
143				return Ok(Session::new(session, lite::Version::Lite03.into()));
144			}
145			Some(ALPN_LITE) | None => {
146				let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
147				(Version::Ietf(ietf::Version::Draft14), supported)
148			}
149			Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
150		};
151
152		let mut stream = Stream::accept(&session, encoding).await?;
153
154		let mut client: setup::Client = stream.reader.decode().await?;
155		tracing::trace!(?client, "received client setup");
156
157		// Choose the version to use
158		let version = client
159			.versions
160			.iter()
161			.flat_map(|v| Version::try_from(*v).ok())
162			.find(|v| supported.contains(v))
163			.ok_or(Error::Version)?;
164
165		// Encode parameters using the version-appropriate type.
166		let parameters = match version {
167			Version::Ietf(v) => {
168				let mut parameters = ietf::Parameters::default();
169				parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
170				parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
171				parameters.encode_bytes(v)?
172			}
173			Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
174		};
175
176		let server = setup::Server {
177			version: version.into(),
178			parameters,
179		};
180		tracing::trace!(?server, "sending server setup");
181		stream.writer.encode(&server).await?;
182
183		match version {
184			Version::Lite(v) => {
185				let stream = stream.with_version(v);
186				lite::start(
187					session.clone(),
188					Some(stream),
189					self.publish.clone(),
190					self.consume.clone(),
191					v,
192				)?;
193			}
194			Version::Ietf(v) => {
195				// Decode the client's parameters to get their max request ID.
196				let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
197				let request_id_max = parameters
198					.get_varint(ietf::ParameterVarInt::MaxRequestId)
199					.map(ietf::RequestId);
200
201				let stream = stream.with_version(v);
202				ietf::start(
203					session.clone(),
204					stream,
205					request_id_max,
206					false,
207					self.publish.clone(),
208					self.consume.clone(),
209					v,
210				)?;
211			}
212		};
213
214		Ok(Session::new(session, version))
215	}
216}