1use crate::{
2 ALPN_14, ALPN_15, ALPN_16, ALPN_17, ALPN_LITE, ALPN_LITE_03, Error, NEGOTIATED, OriginConsumer, OriginProducer,
3 Session, Version, Versions,
4 coding::{Decode, Encode, Stream},
5 ietf, lite, setup,
6};
7
8#[derive(Default, Clone)]
10pub struct Server {
11 publish: Option<OriginConsumer>,
12 consume: Option<OriginProducer>,
13 versions: Versions,
14}
15
16impl Server {
17 pub fn new() -> Self {
18 Default::default()
19 }
20
21 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
22 self.publish = publish.into();
23 self
24 }
25
26 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
27 self.consume = consume.into();
28 self
29 }
30
31 pub fn with_origin(self, origin: OriginProducer) -> Self {
35 let consumer = origin.consume();
36 self.with_publish(consumer).with_consume(origin)
37 }
38
39 pub fn with_versions(mut self, versions: Versions) -> Self {
40 self.versions = versions;
41 self
42 }
43
44 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
46 if self.publish.is_none() && self.consume.is_none() {
47 tracing::warn!("not publishing or consuming anything");
48 }
49
50 let (encoding, supported) = match session.protocol() {
51 Some(ALPN_17) => {
52 let v = self
53 .versions
54 .select(Version::Ietf(ietf::Version::Draft17))
55 .ok_or(Error::Version)?;
56
57 ietf::start(
59 session.clone(),
60 None,
61 None,
62 false,
63 self.publish.clone(),
64 self.consume.clone(),
65 ietf::Version::Draft17,
66 )?;
67
68 tracing::debug!(version = ?v, "connected");
69 return Ok(Session::new(session, v));
70 }
71 Some(ALPN_16) => {
72 let v = self
73 .versions
74 .select(Version::Ietf(ietf::Version::Draft16))
75 .ok_or(Error::Version)?;
76 (v, v.into())
77 }
78 Some(ALPN_15) => {
79 let v = self
80 .versions
81 .select(Version::Ietf(ietf::Version::Draft15))
82 .ok_or(Error::Version)?;
83 (v, v.into())
84 }
85 Some(ALPN_14) => {
86 let v = self
87 .versions
88 .select(Version::Ietf(ietf::Version::Draft14))
89 .ok_or(Error::Version)?;
90 (v, v.into())
91 }
92 Some(ALPN_LITE_03) => {
93 self.versions
94 .select(Version::Lite(lite::Version::Lite03))
95 .ok_or(Error::Version)?;
96
97 lite::start(
99 session.clone(),
100 None,
101 self.publish.clone(),
102 self.consume.clone(),
103 lite::Version::Lite03,
104 )?;
105
106 return Ok(Session::new(session, lite::Version::Lite03.into()));
107 }
108 Some(ALPN_LITE) | None => {
109 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
110 (Version::Ietf(ietf::Version::Draft14), supported)
111 }
112 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
113 };
114
115 let mut stream = Stream::accept(&session, encoding).await?;
116
117 let mut client: setup::Client = stream.reader.decode().await?;
118
119 let version = client
121 .versions
122 .iter()
123 .flat_map(|v| Version::try_from(*v).ok())
124 .find(|v| supported.contains(v))
125 .ok_or(Error::Version)?;
126
127 let parameters = match version {
129 Version::Ietf(v) => {
130 let mut parameters = ietf::Parameters::default();
131 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
132 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
133 parameters.encode_bytes(v)?
134 }
135 Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?,
136 };
137
138 let server = setup::Server {
139 version: version.into(),
140 parameters,
141 };
142 stream.writer.encode(&server).await?;
143
144 match version {
145 Version::Lite(v) => {
146 let stream = stream.with_version(v);
147 lite::start(
148 session.clone(),
149 Some(stream),
150 self.publish.clone(),
151 self.consume.clone(),
152 v,
153 )?;
154 }
155 Version::Ietf(v) => {
156 let parameters = ietf::Parameters::decode(&mut client.parameters, v)?;
158 let request_id_max = parameters
159 .get_varint(ietf::ParameterVarInt::MaxRequestId)
160 .map(ietf::RequestId);
161
162 let stream = stream.with_version(v);
163 ietf::start(
164 session.clone(),
165 Some(stream),
166 request_id_max,
167 false,
168 self.publish.clone(),
169 self.consume.clone(),
170 v,
171 )?;
172 }
173 };
174
175 Ok(Session::new(session, version))
176 }
177}