1use crate::{message, BroadcastConsumer, Error, OriginConsumer};
2
3use web_async::spawn;
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17#[derive(Clone)]
22pub struct Session {
23 webtransport: web_transport::Session,
24 publisher: Publisher,
25 subscriber: Subscriber,
26}
27
28impl Session {
29 fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30 tracing::info!("session started");
31
32 let publisher = Publisher::new(session.clone());
33 let subscriber = Subscriber::new(session.clone());
34
35 let this = Self {
36 webtransport: session.clone(),
37 publisher: publisher.clone(),
38 subscriber: subscriber.clone(),
39 };
40
41 spawn(async move {
42 let res = tokio::select! {
43 res = Self::run_session(stream) => res,
44 res = Self::run_bi(session.clone(), publisher) => res,
45 res = Self::run_uni(session.clone(), subscriber) => res,
46 };
47
48 match res {
49 Err(Error::WebTransport(web_transport::Error::Session(_))) => {
50 tracing::info!("session terminated");
51 session.close(1, "");
52 }
53 Err(err) => {
54 tracing::warn!(?err, "session error");
55 session.close(err.to_code(), &err.to_string());
56 }
57 _ => {
58 tracing::info!("session closed");
59 session.close(0, "");
60 }
61 }
62 });
63
64 this
65 }
66
67 pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
69 let mut session = session.into();
70 let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
71 Self::connect_setup(&mut stream).await?;
72 Ok(Self::new(session, stream))
73 }
74
75 async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
76 let client = message::ClientSetup {
77 versions: [message::Version::CURRENT].into(),
78 extensions: Default::default(),
79 };
80
81 setup.writer.encode(&client).await?;
82 let server: message::ServerSetup = setup.reader.decode().await?;
83
84 tracing::debug!(version = ?server.version, "connected");
85
86 Ok(())
87 }
88
89 pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
91 let mut session = session.into();
92 let mut stream = Stream::accept(&mut session).await?;
93 let kind = stream.reader.decode().await?;
94
95 if kind != message::ControlType::Session {
96 return Err(Error::UnexpectedStream(kind));
97 }
98
99 Self::accept_setup(&mut stream).await?;
100 Ok(Self::new(session, stream))
101 }
102
103 async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
104 let client: message::ClientSetup = control.reader.decode().await?;
105
106 if !client.versions.contains(&message::Version::CURRENT) {
107 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
108 }
109
110 let server = message::ServerSetup {
111 version: message::Version::CURRENT,
112 extensions: Default::default(),
113 };
114
115 control.writer.encode(&server).await?;
116
117 tracing::debug!(version = ?server.version, "connected");
118
119 Ok(())
120 }
121
122 async fn run_session(mut stream: Stream) -> Result<(), Error> {
123 while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
124 Err(Error::Cancel)
125 }
126
127 async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
128 loop {
129 let stream = Reader::accept(&mut session).await?;
130 let subscriber = subscriber.clone();
131
132 spawn(async move {
133 Self::run_data(stream, subscriber).await.ok();
134 });
135 }
136 }
137
138 async fn run_data(mut stream: Reader, mut subscriber: Subscriber) -> Result<(), Error> {
139 let kind = stream.decode().await?;
140
141 let res = match kind {
142 message::DataType::Group => subscriber.recv_group(&mut stream).await,
143 };
144
145 if let Err(err) = res {
146 stream.abort(&err);
147 }
148
149 Ok(())
150 }
151
152 async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
153 loop {
154 let stream = Stream::accept(&mut session).await?;
155 let publisher = publisher.clone();
156
157 spawn(async move {
158 Self::run_control(stream, publisher).await.ok();
159 });
160 }
161 }
162
163 async fn run_control(mut stream: Stream, mut publisher: Publisher) -> Result<(), Error> {
164 let kind = stream.reader.decode().await?;
165
166 let res = match kind {
167 message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
168 message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
169 message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
170 };
171
172 if let Err(err) = &res {
173 stream.writer.abort(err);
174 }
175
176 res
177 }
178
179 pub fn publish<T: ToString>(&mut self, path: T, broadcast: BroadcastConsumer) {
181 self.publisher.publish(path, broadcast);
182 }
183
184 pub fn publish_prefix(&mut self, prefix: &str, broadcasts: OriginConsumer) {
186 self.publisher.publish_prefix(prefix, broadcasts);
187 }
188
189 pub fn publish_all(&mut self, broadcasts: OriginConsumer) {
191 self.publisher.publish_all(broadcasts);
192 }
193
194 pub fn consume(&self, path: &str) -> BroadcastConsumer {
198 self.subscriber.consume(path)
199 }
200
201 pub fn consume_all(&self) -> OriginConsumer {
205 self.subscriber.consume_prefix("")
206 }
207
208 pub fn consume_prefix<S: ToString>(&self, prefix: S) -> OriginConsumer {
212 self.subscriber.consume_prefix(prefix)
213 }
214
215 pub fn close(mut self, err: Error) {
217 self.webtransport.close(err.to_code(), &err.to_string());
218 }
219
220 pub async fn closed(&self) -> Error {
222 self.webtransport.closed().await.into()
223 }
224}