1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, ALPN_LITE_05_WIP, Error,
3 NEGOTIATED, OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 stats: StatsHandle,
14 versions: Versions,
15}
16
17impl Server {
18 pub fn new() -> Self {
19 Default::default()
20 }
21
22 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23 self.publish = publish.into();
24 self
25 }
26
27 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28 self.consume = consume.into();
29 self
30 }
31
32 pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36 self.stats = stats;
37 self
38 }
39
40 pub fn with_origin(self, origin: OriginProducer) -> Self {
44 let consumer = origin.consume();
45 self.with_publish(consumer).with_consume(origin)
46 }
47
48 pub fn with_versions(mut self, versions: Versions) -> Self {
49 self.versions = versions;
50 self
51 }
52
53 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55 if self.publish.is_none() && self.consume.is_none() {
56 tracing::warn!("not publishing or consuming anything");
57 }
58
59 let (encoding, supported) = match session.protocol() {
60 Some(ALPN_18) => {
61 let v = self
62 .versions
63 .select(Version::Ietf(ietf::Version::Draft18))
64 .ok_or(Error::Version)?;
65
66 ietf::start(
68 session.clone(),
69 None,
70 None,
71 false,
72 self.publish.clone(),
73 self.consume.clone(),
74 self.stats.clone(),
75 ietf::Version::Draft18,
76 )?;
77
78 tracing::debug!(version = ?v, "connected");
79 return Ok(Session::new(session, v, None));
80 }
81 Some(ALPN_17) => {
82 let v = self
83 .versions
84 .select(Version::Ietf(ietf::Version::Draft17))
85 .ok_or(Error::Version)?;
86
87 ietf::start(
89 session.clone(),
90 None,
91 None,
92 false,
93 self.publish.clone(),
94 self.consume.clone(),
95 self.stats.clone(),
96 ietf::Version::Draft17,
97 )?;
98
99 tracing::debug!(version = ?v, "connected");
100 return Ok(Session::new(session, v, None));
101 }
102 Some(ALPN_16) => {
103 let v = self
104 .versions
105 .select(Version::Ietf(ietf::Version::Draft16))
106 .ok_or(Error::Version)?;
107 (v, v.into())
108 }
109 Some(ALPN_15) => {
110 let v = self
111 .versions
112 .select(Version::Ietf(ietf::Version::Draft15))
113 .ok_or(Error::Version)?;
114 (v, v.into())
115 }
116 Some(ALPN_14) => {
117 let v = self
118 .versions
119 .select(Version::Ietf(ietf::Version::Draft14))
120 .ok_or(Error::Version)?;
121 (v, v.into())
122 }
123 Some(ALPN_LITE_05_WIP) => {
124 self.versions
125 .select(Version::Lite(lite::Version::Lite05Wip))
126 .ok_or(Error::Version)?;
127
128 let recv_bw = lite::start(
129 session.clone(),
130 None,
131 self.publish.clone(),
132 self.consume.clone(),
133 self.stats.clone(),
134 lite::Version::Lite05Wip,
135 )?;
136
137 return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw));
138 }
139 Some(ALPN_LITE_04) => {
140 self.versions
141 .select(Version::Lite(lite::Version::Lite04))
142 .ok_or(Error::Version)?;
143
144 let recv_bw = lite::start(
145 session.clone(),
146 None,
147 self.publish.clone(),
148 self.consume.clone(),
149 self.stats.clone(),
150 lite::Version::Lite04,
151 )?;
152
153 return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
154 }
155 Some(ALPN_LITE_03) => {
156 self.versions
157 .select(Version::Lite(lite::Version::Lite03))
158 .ok_or(Error::Version)?;
159
160 let recv_bw = lite::start(
162 session.clone(),
163 None,
164 self.publish.clone(),
165 self.consume.clone(),
166 self.stats.clone(),
167 lite::Version::Lite03,
168 )?;
169
170 return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
171 }
172 Some(ALPN_LITE) | None => {
173 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
174 (Version::Ietf(ietf::Version::Draft14), supported)
175 }
176 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
177 };
178
179 let mut stream = Stream::accept(&session, encoding).await?;
180
181 let mut client: setup::Client = stream.reader.decode().await?;
182
183 let version = client
185 .versions
186 .iter()
187 .flat_map(|v| Version::try_from(*v).ok())
188 .find(|v| supported.contains(v))
189 .ok_or(Error::Version)?;
190
191 let parameters = match version {
193 Version::Ietf(v) => {
194 let mut parameters = ietf::Parameters::default();
195 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
196 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
197 parameters.encode_bytes(v)?
198 }
199 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
200 };
201
202 let server = setup::Server {
203 version: version.into(),
204 parameters,
205 };
206 stream.writer.encode(&server).await?;
207
208 let recv_bw = match version {
209 Version::Lite(v) => {
210 let stream = stream.with_version(v);
211 lite::start(
212 session.clone(),
213 Some(stream),
214 self.publish.clone(),
215 self.consume.clone(),
216 self.stats.clone(),
217 v,
218 )?
219 }
220 Version::Ietf(v) => {
221 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
223 let request_id_max = parameters
224 .get_varint(ietf::ParameterVarInt::MaxRequestId)
225 .map(ietf::RequestId);
226
227 let stream = stream.with_version(v);
228 ietf::start(
229 session.clone(),
230 Some(stream),
231 request_id_max,
232 false,
233 self.publish.clone(),
234 self.consume.clone(),
235 self.stats.clone(),
236 v,
237 )?;
238 None
239 }
240 };
241
242 Ok(Session::new(session, version, recv_bw))
243 }
244}