1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, ALPN_LITE_05_WIP, Error,
3 NEGOTIATED, OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 stats: StatsHandle,
14 versions: Versions,
15}
16
17impl Server {
18 pub fn new() -> Self {
19 Default::default()
20 }
21
22 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23 self.publish = publish.into();
24 self
25 }
26
27 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28 self.consume = consume.into();
29 self
30 }
31
32 pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36 self.stats = stats;
37 self
38 }
39
40 pub fn with_origin(self, origin: OriginProducer) -> Self {
44 let consumer = origin.consume();
45 self.with_publish(consumer).with_consume(origin)
46 }
47
48 pub fn with_versions(mut self, versions: Versions) -> Self {
49 self.versions = versions;
50 self
51 }
52
53 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55 if self.publish.is_none() && self.consume.is_none() {
56 tracing::warn!("not publishing or consuming anything");
57 }
58
59 let (encoding, supported) = match session.protocol() {
60 Some(ALPN_18) => {
61 let v = self
62 .versions
63 .select(Version::Ietf(ietf::Version::Draft18))
64 .ok_or(Error::Version)?;
65
66 ietf::start(
68 session.clone(),
69 None,
70 None,
71 false,
72 self.publish.clone(),
73 self.consume.clone(),
74 ietf::Version::Draft18,
75 )?;
76
77 tracing::debug!(version = ?v, "connected");
78 return Ok(Session::new(session, v, None));
79 }
80 Some(ALPN_17) => {
81 let v = self
82 .versions
83 .select(Version::Ietf(ietf::Version::Draft17))
84 .ok_or(Error::Version)?;
85
86 ietf::start(
88 session.clone(),
89 None,
90 None,
91 false,
92 self.publish.clone(),
93 self.consume.clone(),
94 ietf::Version::Draft17,
95 )?;
96
97 tracing::debug!(version = ?v, "connected");
98 return Ok(Session::new(session, v, None));
100 }
101 Some(ALPN_16) => {
102 let v = self
103 .versions
104 .select(Version::Ietf(ietf::Version::Draft16))
105 .ok_or(Error::Version)?;
106 (v, v.into())
107 }
108 Some(ALPN_15) => {
109 let v = self
110 .versions
111 .select(Version::Ietf(ietf::Version::Draft15))
112 .ok_or(Error::Version)?;
113 (v, v.into())
114 }
115 Some(ALPN_14) => {
116 let v = self
117 .versions
118 .select(Version::Ietf(ietf::Version::Draft14))
119 .ok_or(Error::Version)?;
120 (v, v.into())
121 }
122 Some(ALPN_LITE_05_WIP) => {
123 self.versions
124 .select(Version::Lite(lite::Version::Lite05Wip))
125 .ok_or(Error::Version)?;
126
127 let recv_bw = lite::start(
128 session.clone(),
129 None,
130 self.publish.clone(),
131 self.consume.clone(),
132 self.stats.clone(),
133 lite::Version::Lite05Wip,
134 )?;
135
136 return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw));
137 }
138 Some(ALPN_LITE_04) => {
139 self.versions
140 .select(Version::Lite(lite::Version::Lite04))
141 .ok_or(Error::Version)?;
142
143 let recv_bw = lite::start(
144 session.clone(),
145 None,
146 self.publish.clone(),
147 self.consume.clone(),
148 self.stats.clone(),
149 lite::Version::Lite04,
150 )?;
151
152 return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
153 }
154 Some(ALPN_LITE_03) => {
155 self.versions
156 .select(Version::Lite(lite::Version::Lite03))
157 .ok_or(Error::Version)?;
158
159 let recv_bw = lite::start(
161 session.clone(),
162 None,
163 self.publish.clone(),
164 self.consume.clone(),
165 self.stats.clone(),
166 lite::Version::Lite03,
167 )?;
168
169 return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
170 }
171 Some(ALPN_LITE) | None => {
172 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
173 (Version::Ietf(ietf::Version::Draft14), supported)
174 }
175 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
176 };
177
178 let mut stream = Stream::accept(&session, encoding).await?;
179
180 let mut client: setup::Client = stream.reader.decode().await?;
181
182 let version = client
184 .versions
185 .iter()
186 .flat_map(|v| Version::try_from(*v).ok())
187 .find(|v| supported.contains(v))
188 .ok_or(Error::Version)?;
189
190 let parameters = match version {
192 Version::Ietf(v) => {
193 let mut parameters = ietf::Parameters::default();
194 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
195 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
196 parameters.encode_bytes(v)?
197 }
198 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
199 };
200
201 let server = setup::Server {
202 version: version.into(),
203 parameters,
204 };
205 stream.writer.encode(&server).await?;
206
207 let recv_bw = match version {
208 Version::Lite(v) => {
209 let stream = stream.with_version(v);
210 lite::start(
211 session.clone(),
212 Some(stream),
213 self.publish.clone(),
214 self.consume.clone(),
215 self.stats.clone(),
216 v,
217 )?
218 }
219 Version::Ietf(v) => {
220 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
222 let request_id_max = parameters
223 .get_varint(ietf::ParameterVarInt::MaxRequestId)
224 .map(ietf::RequestId);
225
226 let stream = stream.with_version(v);
227 ietf::start(
228 session.clone(),
229 Some(stream),
230 request_id_max,
231 false,
232 self.publish.clone(),
233 self.consume.clone(),
234 v,
235 )?;
236 None
237 }
238 };
239
240 Ok(Session::new(session, version, recv_bw))
241 }
242}