1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_18, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED,
3 OriginConsumer, OriginProducer, Session, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 versions: Versions,
14}
15
16impl Server {
17 pub fn new() -> Self {
18 Default::default()
19 }
20
21 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22 self.publish = publish.into();
23 self
24 }
25
26 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27 self.consume = consume.into();
28 self
29 }
30
31 pub fn with_origin(self, origin: OriginProducer) -> Self {
35 let consumer = origin.consume();
36 self.with_publish(consumer).with_consume(origin)
37 }
38
39 pub fn with_versions(mut self, versions: Versions) -> Self {
40 self.versions = versions;
41 self
42 }
43
44 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
46 if self.publish.is_none() && self.consume.is_none() {
47 tracing::warn!("not publishing or consuming anything");
48 }
49
50 let (encoding, supported) = match session.protocol() {
51 Some(ALPN_18) => {
52 let v = self
53 .versions
54 .select(Version::Ietf(ietf::Version::Draft18))
55 .ok_or(Error::Version)?;
56
57 ietf::start(
59 session.clone(),
60 None,
61 None,
62 false,
63 self.publish.clone(),
64 self.consume.clone(),
65 ietf::Version::Draft18,
66 )?;
67
68 tracing::debug!(version = ?v, "connected");
69 return Ok(Session::new(session, v, None));
70 }
71 Some(ALPN_17) => {
72 let v = self
73 .versions
74 .select(Version::Ietf(ietf::Version::Draft17))
75 .ok_or(Error::Version)?;
76
77 ietf::start(
79 session.clone(),
80 None,
81 None,
82 false,
83 self.publish.clone(),
84 self.consume.clone(),
85 ietf::Version::Draft17,
86 )?;
87
88 tracing::debug!(version = ?v, "connected");
89 return Ok(Session::new(session, v, None));
90 }
91 Some(ALPN_16) => {
92 let v = self
93 .versions
94 .select(Version::Ietf(ietf::Version::Draft16))
95 .ok_or(Error::Version)?;
96 (v, v.into())
97 }
98 Some(ALPN_15) => {
99 let v = self
100 .versions
101 .select(Version::Ietf(ietf::Version::Draft15))
102 .ok_or(Error::Version)?;
103 (v, v.into())
104 }
105 Some(ALPN_14) => {
106 let v = self
107 .versions
108 .select(Version::Ietf(ietf::Version::Draft14))
109 .ok_or(Error::Version)?;
110 (v, v.into())
111 }
112 Some(ALPN_LITE_04) => {
113 self.versions
114 .select(Version::Lite(lite::Version::Lite04))
115 .ok_or(Error::Version)?;
116
117 let recv_bw = lite::start(
118 session.clone(),
119 None,
120 self.publish.clone(),
121 self.consume.clone(),
122 lite::Version::Lite04,
123 )?;
124
125 return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
126 }
127 Some(ALPN_LITE_03) => {
128 self.versions
129 .select(Version::Lite(lite::Version::Lite03))
130 .ok_or(Error::Version)?;
131
132 let recv_bw = lite::start(
134 session.clone(),
135 None,
136 self.publish.clone(),
137 self.consume.clone(),
138 lite::Version::Lite03,
139 )?;
140
141 return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
142 }
143 Some(ALPN_LITE) | None => {
144 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
145 (Version::Ietf(ietf::Version::Draft14), supported)
146 }
147 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
148 };
149
150 let mut stream = Stream::accept(&session, encoding).await?;
151
152 let mut client: setup::Client = stream.reader.decode().await?;
153
154 let version = client
156 .versions
157 .iter()
158 .flat_map(|v| Version::try_from(*v).ok())
159 .find(|v| supported.contains(v))
160 .ok_or(Error::Version)?;
161
162 let parameters = match version {
164 Version::Ietf(v) => {
165 let mut parameters = ietf::Parameters::default();
166 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
167 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
168 parameters.encode_bytes(v)?
169 }
170 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
171 };
172
173 let server = setup::Server {
174 version: version.into(),
175 parameters,
176 };
177 stream.writer.encode(&server).await?;
178
179 let recv_bw = match version {
180 Version::Lite(v) => {
181 let stream = stream.with_version(v);
182 lite::start(
183 session.clone(),
184 Some(stream),
185 self.publish.clone(),
186 self.consume.clone(),
187 v,
188 )?
189 }
190 Version::Ietf(v) => {
191 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
193 let request_id_max = parameters
194 .get_varint(ietf::ParameterVarInt::MaxRequestId)
195 .map(ietf::RequestId);
196
197 let stream = stream.with_version(v);
198 ietf::start(
199 session.clone(),
200 Some(stream),
201 request_id_max,
202 false,
203 self.publish.clone(),
204 self.consume.clone(),
205 v,
206 )?;
207 None
208 }
209 };
210
211 Ok(Session::new(session, version, recv_bw))
212 }
213}