1use crate::{
5 Error, NEGOTIATED, OriginConsumer, OriginProducer, Session, Version, Versions,
6 coding::{Decode, Encode, Stream},
7 ietf, lite, setup,
8};
9
10#[derive(Default, Clone)]
12pub struct Server {
13 publish: Option<OriginConsumer>,
14 consume: Option<OriginProducer>,
15 versions: Versions,
16 }
19
20impl Server {
21 pub fn new() -> Self {
22 Default::default()
23 }
24
25 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
26 self.publish = publish.into();
27 self
28 }
29
30 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
31 self.consume = consume.into();
32 self
33 }
34
35 pub fn with_versions(mut self, versions: Versions) -> Self {
36 self.versions = versions;
37 self
38 }
39
40 pub async fn accept<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
48 if self.publish.is_none() && self.consume.is_none() {
49 tracing::warn!("not publishing or consuming anything");
50 }
51
52 let (encoding, supported) = match session.protocol() {
53 Some(ietf::ALPN_16) => {
54 let v = self
55 .versions
56 .select(ietf::Version::Draft16.into())
57 .ok_or(Error::Version)?;
58 (v, v.into())
59 }
60 Some(ietf::ALPN_15) => {
61 let v = self
62 .versions
63 .select(ietf::Version::Draft15.into())
64 .ok_or(Error::Version)?;
65 (v, v.into())
66 }
67 Some(ietf::ALPN_14) => {
68 let v = self
69 .versions
70 .select(ietf::Version::Draft14.into())
71 .ok_or(Error::Version)?;
72 (v, v.into())
73 }
74 Some(lite::ALPN_03) => {
75 self.versions
76 .select(lite::Version::Draft03.into())
77 .ok_or(Error::Version)?;
78
79 lite::start(
81 session.clone(),
82 None,
83 self.publish.clone(),
84 self.consume.clone(),
85 lite::Version::Draft03,
86 )?;
87
88 tracing::debug!(version = ?lite::Version::Draft03, "connected");
89
90 return Ok(Session::new(session));
91 }
92 Some(lite::ALPN) | None => {
93 let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?;
94 (ietf::Version::Draft14.into(), supported)
95 }
96 Some(p) => return Err(Error::UnknownAlpn(p.to_string())),
97 };
98
99 let mut stream = Stream::accept(&session, encoding).await?;
100
101 let mut client: setup::Client = stream.reader.decode().await?;
102 tracing::trace!(?client, "received client setup");
103
104 let version = client
106 .versions
107 .iter()
108 .flat_map(|v| Version::try_from(*v).ok())
109 .find(|v| supported.contains(v))
110 .ok_or(Error::Version)?;
111
112 let parameters = match version {
114 Version::Ietf(ietf_version) => {
115 let mut parameters = ietf::Parameters::default();
116 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
117 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
118 parameters.encode_bytes(ietf_version)?
119 }
120 Version::Lite(_) => lite::Parameters::default().encode_bytes(())?,
121 };
122
123 let server = setup::Server {
124 version: version.into(),
125 parameters,
126 };
127 tracing::trace!(?server, "sending server setup");
128 stream.writer.encode(&server).await?;
129
130 match version {
131 Version::Lite(version) => {
132 let stream = stream.with_version(version);
133 lite::start(
134 session.clone(),
135 Some(stream),
136 self.publish.clone(),
137 self.consume.clone(),
138 version,
139 )?;
140 }
141 Version::Ietf(version) => {
142 let parameters = ietf::Parameters::decode(&mut client.parameters, version)?;
144 let request_id_max =
145 ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));
146
147 let stream = stream.with_version(version);
148 ietf::start(
149 session.clone(),
150 stream,
151 request_id_max,
152 false,
153 self.publish.clone(),
154 self.consume.clone(),
155 version,
156 )?;
157 }
158 };
159
160 tracing::debug!(?version, "connected");
161
162 Ok(Session::new(session))
163 }
164}