1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, Error, NEGOTIATED, OriginConsumer, OriginProducer,
3 Session, Version, Versions,
4 coding::{Decode, Encode, Reader, Stream, Writer},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 versions: Versions,
14}
15
16impl Server {
17 pub fn new() -> Self {
18 Default::default()
19 }
20
21 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22 self.publish = publish.into();
23 self
24 }
25
26 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27 self.consume = consume.into();
28 self
29 }
30
31 pub fn with_versions(mut self, versions: Versions) -> Self {
32 self.versions = versions;
33 self
34 }
35
36 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
38 if self.publish.is_none() && self.consume.is_none() {
39 tracing::warn!("not publishing or consuming anything");
40 }
41
42 let (encoding, supported) = match session.protocol() {
43 Some(ALPN_17) => {
44 let v = self
45 .versions
46 .select(Version::Ietf(ietf::Version::Draft17))
47 .ok_or(Error::Version)?;
48
49 let ietf_v = ietf::Version::Draft17;
50
51 let mut parameters = ietf::Parameters::default();
53 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
54 let parameters = parameters.encode_bytes(ietf_v)?;
55
56 let server_setup = setup::Server {
57 version: v.into(),
58 parameters,
59 };
60
61 let recv_fut = async {
63 let recv = session.accept_uni().await.map_err(Error::from_transport)?;
64 let mut reader: Reader<S::RecvStream, Version> = Reader::new(recv, v);
65 let _client: setup::Client = reader.decode().await?;
67 Ok::<_, Error>(reader)
68 };
69
70 let send_fut = async {
71 let send = session.open_uni().await.map_err(Error::from_transport)?;
72 let mut writer: Writer<S::SendStream, Version> = Writer::new(send, v);
73 writer.encode(&server_setup).await?;
75 Ok::<_, Error>(writer)
76 };
77
78 let (recv_result, send_result) = tokio::join!(recv_fut, send_fut);
79 let reader = recv_result?;
80 let writer = send_result?;
81
82 let stream = Stream {
84 writer: writer.with_version(ietf_v),
85 reader: reader.with_version(ietf_v),
86 };
87
88 ietf::start(
89 session.clone(),
90 stream,
91 None, false,
93 self.publish.clone(),
94 self.consume.clone(),
95 ietf_v,
96 )?;
97
98 tracing::debug!(version = ?v, "connected");
99 return Ok(Session::new(session, v));
100 }
101 Some(ALPN_16) => {
102 let v = self
103 .versions
104 .select(Version::Ietf(ietf::Version::Draft16))
105 .ok_or(Error::Version)?;
106 (v, v.into())
107 }
108 Some(ALPN_15) => {
109 let v = self
110 .versions
111 .select(Version::Ietf(ietf::Version::Draft15))
112 .ok_or(Error::Version)?;
113 (v, v.into())
114 }
115 Some(ALPN_14) => {
116 let v = self
117 .versions
118 .select(Version::Ietf(ietf::Version::Draft14))
119 .ok_or(Error::Version)?;
120 (v, v.into())
121 }
122 Some(ALPN_LITE_03) => {
123 self.versions
124 .select(Version::Lite(lite::Version::Lite03))
125 .ok_or(Error::Version)?;
126
127 lite::start(
129 session.clone(),
130 None,
131 self.publish.clone(),
132 self.consume.clone(),
133 lite::Version::Lite03,
134 )?;
135
136 return Ok(Session::new(session, lite::Version::Lite03.into()));
137 }
138 Some(ALPN_LITE) | None => {
139 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
140 (Version::Ietf(ietf::Version::Draft14), supported)
141 }
142 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
143 };
144
145 let mut stream = Stream::accept(&session, encoding).await?;
146
147 let mut client: setup::Client = stream.reader.decode().await?;
148 tracing::trace!(?client, "received client setup");
149
150 let version = client
152 .versions
153 .iter()
154 .flat_map(|v| Version::try_from(*v).ok())
155 .find(|v| supported.contains(v))
156 .ok_or(Error::Version)?;
157
158 let parameters = match version {
160 Version::Ietf(v) => {
161 let mut parameters = ietf::Parameters::default();
162 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
163 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
164 parameters.encode_bytes(v)?
165 }
166 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
167 };
168
169 let server = setup::Server {
170 version: version.into(),
171 parameters,
172 };
173 tracing::trace!(?server, "sending server setup");
174 stream.writer.encode(&server).await?;
175
176 match version {
177 Version::Lite(v) => {
178 let stream = stream.with_version(v);
179 lite::start(
180 session.clone(),
181 Some(stream),
182 self.publish.clone(),
183 self.consume.clone(),
184 v,
185 )?;
186 }
187 Version::Ietf(v) => {
188 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
190 let request_id_max = parameters
191 .get_varint(ietf::ParameterVarInt::MaxRequestId)
192 .map(ietf::RequestId);
193
194 let stream = stream.with_version(v);
195 ietf::start(
196 session.clone(),
197 stream,
198 request_id_max,
199 false,
200 self.publish.clone(),
201 self.consume.clone(),
202 v,
203 )?;
204 }
205 };
206
207 Ok(Session::new(session, version))
208 }
209}