1use crate::{message, AnnounceConsumer, BroadcastConsumer, Error, Origin};
2
3use web_async::spawn;
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17#[derive(Clone)]
22pub struct Session {
23 webtransport: web_transport::Session,
24 publisher: Publisher,
25 subscriber: Subscriber,
26}
27
28impl Session {
29 fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30 let publisher = Publisher::new(session.clone());
31 let subscriber = Subscriber::new(session.clone());
32
33 let this = Self {
34 webtransport: session.clone(),
35 publisher: publisher.clone(),
36 subscriber: subscriber.clone(),
37 };
38
39 spawn(async move {
40 let res = tokio::select! {
41 res = Self::run_session(stream) => res,
42 res = Self::run_bi(session.clone(), publisher) => res,
43 res = Self::run_uni(session.clone(), subscriber) => res,
44 };
45
46 if let Err(err) = res {
47 tracing::info!(?err, "session terminated");
48 session.close(err.to_code(), &err.to_string());
49 }
50 });
51
52 this
53 }
54
55 pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
57 let mut session = session.into();
58 let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
59 Self::connect_setup(&mut stream).await?;
60 Ok(Self::new(session, stream))
61 }
62
63 async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
64 let client = message::ClientSetup {
65 versions: [message::Version::CURRENT].into(),
66 extensions: Default::default(),
67 };
68
69 setup.writer.encode(&client).await?;
70 let server: message::ServerSetup = setup.reader.decode().await?;
71
72 tracing::debug!(version = ?server.version, "connected");
73
74 Ok(())
75 }
76
77 pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
79 let mut session = session.into();
80 let mut stream = Stream::accept(&mut session).await?;
81 let kind = stream.reader.decode().await?;
82
83 if kind != message::ControlType::Session {
84 return Err(Error::UnexpectedStream(kind));
85 }
86
87 Self::accept_setup(&mut stream).await?;
88 Ok(Self::new(session, stream))
89 }
90
91 async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
92 let client: message::ClientSetup = control.reader.decode().await?;
93
94 if !client.versions.contains(&message::Version::CURRENT) {
95 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
96 }
97
98 let server = message::ServerSetup {
99 version: message::Version::CURRENT,
100 extensions: Default::default(),
101 };
102
103 control.writer.encode(&server).await?;
104
105 tracing::debug!(version = ?server.version, "connected");
106
107 Ok(())
108 }
109
110 async fn run_session(mut stream: Stream) -> Result<(), Error> {
111 while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
112 Err(Error::Cancel)
113 }
114
115 async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
116 loop {
117 let stream = Reader::accept(&mut session).await?;
118 let subscriber = subscriber.clone();
119
120 spawn(async move {
121 Self::run_data(stream, subscriber).await.ok();
122 });
123 }
124 }
125
126 async fn run_data(mut stream: Reader, mut subscriber: Subscriber) -> Result<(), Error> {
127 let kind = stream.decode().await?;
128
129 let res = match kind {
130 message::DataType::Group => subscriber.recv_group(&mut stream).await,
131 };
132
133 if let Err(err) = res {
134 stream.abort(&err);
135 }
136
137 Ok(())
138 }
139
140 async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
141 loop {
142 let stream = Stream::accept(&mut session).await?;
143 let publisher = publisher.clone();
144
145 spawn(async move {
146 Self::run_control(stream, publisher).await.ok();
147 });
148 }
149 }
150
151 async fn run_control(mut stream: Stream, mut publisher: Publisher) -> Result<(), Error> {
152 let kind = stream.reader.decode().await?;
153
154 let res = match kind {
155 message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
156 message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
157 message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
158 };
159
160 if let Err(err) = &res {
161 stream.writer.abort(err);
162 }
163
164 res
165 }
166
167 pub fn publish<T: ToString>(&mut self, path: T, broadcast: BroadcastConsumer) {
169 self.publisher.publish(path, broadcast);
170 }
171
172 pub fn consume(&self, path: &str) -> BroadcastConsumer {
176 self.subscriber.consume(path)
177 }
178
179 pub fn announced<S: ToString>(&self, prefix: S) -> AnnounceConsumer {
184 self.subscriber.announced(prefix.to_string())
185 }
186
187 pub fn close(mut self, err: Error) {
189 self.webtransport.close(err.to_code(), &err.to_string());
190 }
191
192 pub async fn closed(&self) -> Error {
194 self.webtransport.closed().await.into()
195 }
196
197 pub async fn publish_to(&mut self, mut origin: Origin, prefix: &str) {
201 let mut announced = self.announced("");
202
203 while let Some(suffix) = announced.active().await {
204 let broadcast = self.consume(&suffix);
205
206 match prefix {
208 "" => origin.publish(suffix, broadcast),
209 prefix => origin.publish(format!("{}{}", prefix, suffix), broadcast),
210 };
211 }
212 }
213
214 pub async fn consume_from(&mut self, origin: Origin, prefix: &str) {
218 let mut remotes = origin.announced(prefix);
219
220 while let Some(suffix) = remotes.active().await {
221 match prefix {
222 "" => {
224 if let Some(upstream) = origin.consume(&suffix) {
225 self.publish(suffix, upstream);
227 }
228 }
229 prefix => {
231 let path = format!("{}{}", prefix, suffix);
232 if let Some(upstream) = origin.consume(&path) {
233 self.publish(suffix, upstream);
235 }
236 }
237 };
238 }
239 }
240}