1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, Error, NEGOTIATED, OriginConsumer, OriginProducer,
3 Session, Version, Versions,
4 coding::{Decode, Encode, Reader, Stream, Writer},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 versions: Versions,
14}
15
16impl Server {
17 pub fn new() -> Self {
18 Default::default()
19 }
20
21 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22 self.publish = publish.into();
23 self
24 }
25
26 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27 self.consume = consume.into();
28 self
29 }
30
31 pub fn with_versions(mut self, versions: Versions) -> Self {
32 self.versions = versions;
33 self
34 }
35
36 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
38 if self.publish.is_none() && self.consume.is_none() {
39 tracing::warn!("not publishing or consuming anything");
40 }
41
42 let (encoding, supported) = match session.protocol() {
43 Some(ALPN_17) => {
44 let v = self
45 .versions
46 .select(Version::Ietf(ietf::Version::Draft17))
47 .ok_or(Error::Version)?;
48
49 let ietf_v = ietf::Version::Draft17;
50
51 let mut parameters = ietf::Parameters::default();
53 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
54 let parameters = parameters.encode_bytes(ietf_v)?;
55
56 let server_setup = setup::Server {
57 version: v.into(),
58 parameters,
59 };
60
61 let recv_fut = async {
63 let recv = session.accept_uni().await.map_err(Error::from_transport)?;
64 let mut reader: Reader<S::RecvStream, Version> = Reader::new(recv, v);
65 let stream_type: u64 = reader.decode().await?;
67 if stream_type != 0x2F00 {
68 return Err(Error::UnexpectedStream);
69 }
70 let _client: setup::Client = reader.decode().await?;
72 Ok::<_, Error>(reader)
73 };
74
75 let send_fut = async {
76 let send = session.open_uni().await.map_err(Error::from_transport)?;
77 let mut writer: Writer<S::SendStream, Version> = Writer::new(send, v);
78 writer.encode(&0x2F00u64).await?;
80 writer.encode(&server_setup).await?;
82 Ok::<_, Error>(writer)
83 };
84
85 let (recv_result, send_result) = tokio::join!(recv_fut, send_fut);
86 let reader = recv_result?;
87 let writer = send_result?;
88
89 let stream = Stream {
91 writer: writer.with_version(ietf_v),
92 reader: reader.with_version(ietf_v),
93 };
94
95 ietf::start(
96 session.clone(),
97 stream,
98 None, false,
100 self.publish.clone(),
101 self.consume.clone(),
102 ietf_v,
103 )?;
104
105 tracing::debug!(version = ?v, "connected");
106 return Ok(Session::new(session, v));
107 }
108 Some(ALPN_16) => {
109 let v = self
110 .versions
111 .select(Version::Ietf(ietf::Version::Draft16))
112 .ok_or(Error::Version)?;
113 (v, v.into())
114 }
115 Some(ALPN_15) => {
116 let v = self
117 .versions
118 .select(Version::Ietf(ietf::Version::Draft15))
119 .ok_or(Error::Version)?;
120 (v, v.into())
121 }
122 Some(ALPN_14) => {
123 let v = self
124 .versions
125 .select(Version::Ietf(ietf::Version::Draft14))
126 .ok_or(Error::Version)?;
127 (v, v.into())
128 }
129 Some(ALPN_LITE_03) => {
130 self.versions
131 .select(Version::Lite(lite::Version::Lite03))
132 .ok_or(Error::Version)?;
133
134 lite::start(
136 session.clone(),
137 None,
138 self.publish.clone(),
139 self.consume.clone(),
140 lite::Version::Lite03,
141 )?;
142
143 return Ok(Session::new(session, lite::Version::Lite03.into()));
144 }
145 Some(ALPN_LITE) | None => {
146 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
147 (Version::Ietf(ietf::Version::Draft14), supported)
148 }
149 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
150 };
151
152 let mut stream = Stream::accept(&session, encoding).await?;
153
154 let mut client: setup::Client = stream.reader.decode().await?;
155 tracing::trace!(?client, "received client setup");
156
157 let version = client
159 .versions
160 .iter()
161 .flat_map(|v| Version::try_from(*v).ok())
162 .find(|v| supported.contains(v))
163 .ok_or(Error::Version)?;
164
165 let parameters = match version {
167 Version::Ietf(v) => {
168 let mut parameters = ietf::Parameters::default();
169 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
170 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
171 parameters.encode_bytes(v)?
172 }
173 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
174 };
175
176 let server = setup::Server {
177 version: version.into(),
178 parameters,
179 };
180 tracing::trace!(?server, "sending server setup");
181 stream.writer.encode(&server).await?;
182
183 match version {
184 Version::Lite(v) => {
185 let stream = stream.with_version(v);
186 lite::start(
187 session.clone(),
188 Some(stream),
189 self.publish.clone(),
190 self.consume.clone(),
191 v,
192 )?;
193 }
194 Version::Ietf(v) => {
195 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
197 let request_id_max = parameters
198 .get_varint(ietf::ParameterVarInt::MaxRequestId)
199 .map(ietf::RequestId);
200
201 let stream = stream.with_version(v);
202 ietf::start(
203 session.clone(),
204 stream,
205 request_id_max,
206 false,
207 self.publish.clone(),
208 self.consume.clone(),
209 v,
210 )?;
211 }
212 };
213
214 Ok(Session::new(session, version))
215 }
216}