1use crate::{message, Error, OriginConsumer, OriginProducer};
2
3mod publisher;
4mod reader;
5mod stream;
6mod subscriber;
7mod writer;
8
9use publisher::*;
10use reader::*;
11use stream::*;
12use subscriber::*;
13use writer::*;
14
15#[derive(Clone)]
20pub struct Session {
21 pub webtransport: web_transport::Session,
22}
23
24impl Session {
25 fn new(
26 mut session: web_transport::Session,
27 stream: Stream,
28 publish: Option<OriginConsumer>,
30 subscribe: Option<OriginProducer>,
32 ) -> Self {
33 let publisher = SessionPublisher::new(session.clone(), publish);
34 let subscriber = SessionSubscriber::new(session.clone());
35
36 let this = Self {
37 webtransport: session.clone(),
38 };
39
40 web_async::spawn(async move {
41 let res = tokio::select! {
42 res = Self::run_session(stream) => res,
43 res = Self::run_bi(session.clone(), publisher.clone()) => res,
44 res = Self::run_uni(session.clone(), subscriber.clone()) => res,
45 Some(Err(res)) = async move { Some(subscriber.run(subscribe?).await) } => Err(res),
48 };
49
50 match res {
51 Err(Error::WebTransport(web_transport::Error::Session(_))) => {
52 tracing::info!("session terminated");
53 session.close(1, "");
54 }
55 Err(err) => {
56 tracing::warn!(?err, "session error");
57 session.close(err.to_code(), &err.to_string());
58 }
59 _ => {
60 tracing::info!("session closed");
61 session.close(0, "");
62 }
63 }
64 });
65
66 this
67 }
68
69 pub async fn connect<
71 T: Into<web_transport::Session>,
72 P: Into<Option<OriginConsumer>>,
73 C: Into<Option<OriginProducer>>,
74 >(
75 session: T,
76 publish: P,
77 subscribe: C,
78 ) -> Result<Self, Error> {
79 let mut session = session.into();
80 let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
81 Self::connect_setup(&mut stream).await?;
82 Ok(Self::new(session, stream, publish.into(), subscribe.into()))
83 }
84
85 async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
86 let client = message::ClientSetup {
87 versions: [message::Version::CURRENT].into(),
88 extensions: Default::default(),
89 };
90
91 setup.writer.encode(&client).await?;
92 let server: message::ServerSetup = setup.reader.decode().await?;
93
94 tracing::debug!(version = ?server.version, "connected");
95
96 Ok(())
97 }
98
99 pub async fn accept<
101 T: Into<web_transport::Session>,
102 P: Into<Option<OriginConsumer>>,
103 C: Into<Option<OriginProducer>>,
104 >(
105 session: T,
106 publish: P,
107 subscribe: C,
108 ) -> Result<Self, Error> {
109 let mut session = session.into();
110 let mut stream = Stream::accept(&mut session).await?;
111 let kind = stream.reader.decode().await?;
112
113 if kind != message::ControlType::Session {
114 return Err(Error::UnexpectedStream(kind));
115 }
116
117 Self::accept_setup(&mut stream).await?;
118 Ok(Self::new(session, stream, publish.into(), subscribe.into()))
119 }
120
121 async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
122 let client: message::ClientSetup = control.reader.decode().await?;
123
124 if !client.versions.contains(&message::Version::CURRENT) {
125 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
126 }
127
128 let server = message::ServerSetup {
129 version: message::Version::CURRENT,
130 extensions: Default::default(),
131 };
132
133 control.writer.encode(&server).await?;
134
135 tracing::debug!(version = ?server.version, "connected");
136
137 Ok(())
138 }
139
140 async fn run_session(mut stream: Stream) -> Result<(), Error> {
141 while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
142 Err(Error::Cancel)
143 }
144
145 async fn run_uni(mut session: web_transport::Session, subscriber: SessionSubscriber) -> Result<(), Error> {
146 loop {
147 let stream = Reader::accept(&mut session).await?;
148 let subscriber = subscriber.clone();
149
150 web_async::spawn(async move {
151 Self::run_data(stream, subscriber).await.ok();
152 });
153 }
154 }
155
156 async fn run_data(mut stream: Reader, mut subscriber: SessionSubscriber) -> Result<(), Error> {
157 let kind = stream.decode().await?;
158
159 let res = match kind {
160 message::DataType::Group => subscriber.recv_group(&mut stream).await,
161 };
162
163 if let Err(err) = res {
164 stream.abort(&err);
165 }
166
167 Ok(())
168 }
169
170 async fn run_bi(mut session: web_transport::Session, publisher: SessionPublisher) -> Result<(), Error> {
171 loop {
172 let stream = Stream::accept(&mut session).await?;
173 let publisher = publisher.clone();
174
175 web_async::spawn(async move {
176 Self::run_control(stream, publisher).await.ok();
177 });
178 }
179 }
180
181 async fn run_control(mut stream: Stream, mut publisher: SessionPublisher) -> Result<(), Error> {
182 let kind = stream.reader.decode().await?;
183
184 let res = match kind {
185 message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
186 message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
187 message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
188 };
189
190 if let Err(err) = &res {
191 stream.writer.abort(err);
192 }
193
194 res
195 }
196
197 pub fn close(mut self, err: Error) {
199 self.webtransport.close(err.to_code(), &err.to_string());
200 }
201
202 pub async fn closed(&self) -> Error {
204 self.webtransport.closed().await.into()
205 }
206}