1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED,
3 OriginConsumer, OriginProducer, Session, StatsHandle, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 stats: StatsHandle,
14 versions: Versions,
15}
16
17impl Server {
18 pub fn new() -> Self {
19 Default::default()
20 }
21
22 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
23 self.publish = publish.into();
24 self
25 }
26
27 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
28 self.consume = consume.into();
29 self
30 }
31
32 pub fn with_stats(mut self, stats: StatsHandle) -> Self {
36 self.stats = stats;
37 self
38 }
39
40 pub fn with_origin(self, origin: OriginProducer) -> Self {
44 let consumer = origin.consume();
45 self.with_publish(consumer).with_consume(origin)
46 }
47
48 pub fn with_versions(mut self, versions: Versions) -> Self {
49 self.versions = versions;
50 self
51 }
52
53 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
55 if self.publish.is_none() && self.consume.is_none() {
56 tracing::warn!("not publishing or consuming anything");
57 }
58
59 let (encoding, supported) = match session.protocol() {
60 Some(ALPN_18) => {
61 let v = self
62 .versions
63 .select(Version::Ietf(ietf::Version::Draft18))
64 .ok_or(Error::Version)?;
65
66 ietf::start(
68 session.clone(),
69 None,
70 None,
71 false,
72 self.publish.clone(),
73 self.consume.clone(),
74 ietf::Version::Draft18,
75 )?;
76
77 tracing::debug!(version = ?v, "connected");
78 return Ok(Session::new(session, v, None));
79 }
80 Some(ALPN_17) => {
81 let v = self
82 .versions
83 .select(Version::Ietf(ietf::Version::Draft17))
84 .ok_or(Error::Version)?;
85
86 ietf::start(
88 session.clone(),
89 None,
90 None,
91 false,
92 self.publish.clone(),
93 self.consume.clone(),
94 ietf::Version::Draft17,
95 )?;
96
97 tracing::debug!(version = ?v, "connected");
98 return Ok(Session::new(session, v, None));
100 }
101 Some(ALPN_16) => {
102 let v = self
103 .versions
104 .select(Version::Ietf(ietf::Version::Draft16))
105 .ok_or(Error::Version)?;
106 (v, v.into())
107 }
108 Some(ALPN_15) => {
109 let v = self
110 .versions
111 .select(Version::Ietf(ietf::Version::Draft15))
112 .ok_or(Error::Version)?;
113 (v, v.into())
114 }
115 Some(ALPN_14) => {
116 let v = self
117 .versions
118 .select(Version::Ietf(ietf::Version::Draft14))
119 .ok_or(Error::Version)?;
120 (v, v.into())
121 }
122 Some(ALPN_LITE_04) => {
123 self.versions
124 .select(Version::Lite(lite::Version::Lite04))
125 .ok_or(Error::Version)?;
126
127 let recv_bw = lite::start(
128 session.clone(),
129 None,
130 self.publish.clone(),
131 self.consume.clone(),
132 self.stats.clone(),
133 lite::Version::Lite04,
134 )?;
135
136 return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
137 }
138 Some(ALPN_LITE_03) => {
139 self.versions
140 .select(Version::Lite(lite::Version::Lite03))
141 .ok_or(Error::Version)?;
142
143 let recv_bw = lite::start(
145 session.clone(),
146 None,
147 self.publish.clone(),
148 self.consume.clone(),
149 self.stats.clone(),
150 lite::Version::Lite03,
151 )?;
152
153 return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
154 }
155 Some(ALPN_LITE) | None => {
156 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
157 (Version::Ietf(ietf::Version::Draft14), supported)
158 }
159 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
160 };
161
162 let mut stream = Stream::accept(&session, encoding).await?;
163
164 let mut client: setup::Client = stream.reader.decode().await?;
165
166 let version = client
168 .versions
169 .iter()
170 .flat_map(|v| Version::try_from(*v).ok())
171 .find(|v| supported.contains(v))
172 .ok_or(Error::Version)?;
173
174 let parameters = match version {
176 Version::Ietf(v) => {
177 let mut parameters = ietf::Parameters::default();
178 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
179 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
180 parameters.encode_bytes(v)?
181 }
182 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
183 };
184
185 let server = setup::Server {
186 version: version.into(),
187 parameters,
188 };
189 stream.writer.encode(&server).await?;
190
191 let recv_bw = match version {
192 Version::Lite(v) => {
193 let stream = stream.with_version(v);
194 lite::start(
195 session.clone(),
196 Some(stream),
197 self.publish.clone(),
198 self.consume.clone(),
199 self.stats.clone(),
200 v,
201 )?
202 }
203 Version::Ietf(v) => {
204 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
206 let request_id_max = parameters
207 .get_varint(ietf::ParameterVarInt::MaxRequestId)
208 .map(ietf::RequestId);
209
210 let stream = stream.with_version(v);
211 ietf::start(
212 session.clone(),
213 Some(stream),
214 request_id_max,
215 false,
216 self.publish.clone(),
217 self.consume.clone(),
218 v,
219 )?;
220 None
221 }
222 };
223
224 Ok(Session::new(session, version, recv_bw))
225 }
226}