1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, ALPN_LITE_04, Error, NEGOTIATED, OriginConsumer,
3 OriginProducer, Session, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 versions: Versions,
14}
15
16impl Server {
17 pub fn new() -> Self {
18 Default::default()
19 }
20
21 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22 self.publish = publish.into();
23 self
24 }
25
26 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27 self.consume = consume.into();
28 self
29 }
30
31 pub fn with_origin(self, origin: OriginProducer) -> Self {
35 let consumer = origin.consume();
36 self.with_publish(consumer).with_consume(origin)
37 }
38
39 pub fn with_versions(mut self, versions: Versions) -> Self {
40 self.versions = versions;
41 self
42 }
43
44 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
46 if self.publish.is_none() && self.consume.is_none() {
47 tracing::warn!("not publishing or consuming anything");
48 }
49
50 let (encoding, supported) = match session.protocol() {
51 Some(ALPN_17) => {
52 let v = self
53 .versions
54 .select(Version::Ietf(ietf::Version::Draft17))
55 .ok_or(Error::Version)?;
56
57 ietf::start(
59 session.clone(),
60 None,
61 None,
62 false,
63 self.publish.clone(),
64 self.consume.clone(),
65 ietf::Version::Draft17,
66 )?;
67
68 tracing::debug!(version = ?v, "connected");
69 return Ok(Session::new(session, v, None));
70 }
71 Some(ALPN_16) => {
72 let v = self
73 .versions
74 .select(Version::Ietf(ietf::Version::Draft16))
75 .ok_or(Error::Version)?;
76 (v, v.into())
77 }
78 Some(ALPN_15) => {
79 let v = self
80 .versions
81 .select(Version::Ietf(ietf::Version::Draft15))
82 .ok_or(Error::Version)?;
83 (v, v.into())
84 }
85 Some(ALPN_14) => {
86 let v = self
87 .versions
88 .select(Version::Ietf(ietf::Version::Draft14))
89 .ok_or(Error::Version)?;
90 (v, v.into())
91 }
92 Some(ALPN_LITE_04) => {
93 self.versions
94 .select(Version::Lite(lite::Version::Lite04))
95 .ok_or(Error::Version)?;
96
97 let recv_bw = lite::start(
98 session.clone(),
99 None,
100 self.publish.clone(),
101 self.consume.clone(),
102 lite::Version::Lite04,
103 )?;
104
105 return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
106 }
107 Some(ALPN_LITE_03) => {
108 self.versions
109 .select(Version::Lite(lite::Version::Lite03))
110 .ok_or(Error::Version)?;
111
112 let recv_bw = lite::start(
114 session.clone(),
115 None,
116 self.publish.clone(),
117 self.consume.clone(),
118 lite::Version::Lite03,
119 )?;
120
121 return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
122 }
123 Some(ALPN_LITE) | None => {
124 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
125 (Version::Ietf(ietf::Version::Draft14), supported)
126 }
127 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
128 };
129
130 let mut stream = Stream::accept(&session, encoding).await?;
131
132 let mut client: setup::Client = stream.reader.decode().await?;
133
134 let version = client
136 .versions
137 .iter()
138 .flat_map(|v| Version::try_from(*v).ok())
139 .find(|v| supported.contains(v))
140 .ok_or(Error::Version)?;
141
142 let parameters = match version {
144 Version::Ietf(v) => {
145 let mut parameters = ietf::Parameters::default();
146 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
147 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
148 parameters.encode_bytes(v)?
149 }
150 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
151 };
152
153 let server = setup::Server {
154 version: version.into(),
155 parameters,
156 };
157 stream.writer.encode(&server).await?;
158
159 let recv_bw = match version {
160 Version::Lite(v) => {
161 let stream = stream.with_version(v);
162 lite::start(
163 session.clone(),
164 Some(stream),
165 self.publish.clone(),
166 self.consume.clone(),
167 v,
168 )?
169 }
170 Version::Ietf(v) => {
171 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
173 let request_id_max = parameters
174 .get_varint(ietf::ParameterVarInt::MaxRequestId)
175 .map(ietf::RequestId);
176
177 let stream = stream.with_version(v);
178 ietf::start(
179 session.clone(),
180 Some(stream),
181 request_id_max,
182 false,
183 self.publish.clone(),
184 self.consume.clone(),
185 v,
186 )?;
187 None
188 }
189 };
190
191 Ok(Session::new(session, version, recv_bw))
192 }
193}