1use crate::{message, Error, OriginConsumer, OriginProducer};
2
3mod publisher;
4mod reader;
5mod stream;
6mod subscriber;
7mod writer;
8
9use publisher::*;
10use reader::*;
11use stream::*;
12use subscriber::*;
13use tokio::sync::oneshot;
14use writer::*;
15
16pub struct Session {
21 pub webtransport: web_transport::Session,
22}
23
24impl Session {
25 async fn new(
26 mut session: web_transport::Session,
27 stream: Stream,
28 publish: Option<OriginConsumer>,
30 subscribe: Option<OriginProducer>,
32 ) -> Result<Self, Error> {
33 let publisher = Publisher::new(session.clone(), publish);
34 let subscriber = Subscriber::new(session.clone(), subscribe);
35
36 let this = Self {
37 webtransport: session.clone(),
38 };
39
40 let init = oneshot::channel();
41
42 web_async::spawn(async move {
43 let res = tokio::select! {
44 res = Self::run_session(stream) => res,
45 res = publisher.run() => res,
46 res = subscriber.run(init.0) => res,
47 };
48
49 match res {
50 Err(Error::WebTransport(web_transport::Error::Session(_))) => {
51 tracing::info!("session terminated");
52 session.close(1, "");
53 }
54 Err(err) => {
55 tracing::warn!(?err, "session error");
56 session.close(err.to_code(), &err.to_string());
57 }
58 _ => {
59 tracing::info!("session closed");
60 session.close(0, "");
61 }
62 }
63 });
64
65 init.1.await.map_err(|_| Error::Cancel)?;
70
71 Ok(this)
72 }
73
74 pub async fn connect(
76 session: impl Into<web_transport::Session>,
77 publish: impl Into<Option<OriginConsumer>>,
78 subscribe: impl Into<Option<OriginProducer>>,
79 ) -> Result<Self, Error> {
80 let mut session = session.into();
81 let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
82 Self::connect_setup(&mut stream).await?;
83 let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
84 Ok(session)
85 }
86
87 async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
88 let client = message::ClientSetup {
89 versions: [message::Version::CURRENT].into(),
90 extensions: Default::default(),
91 };
92
93 setup.writer.encode(&client).await?;
94 let server: message::ServerSetup = setup.reader.decode().await?;
95
96 tracing::debug!(version = ?server.version, "connected");
97
98 Ok(())
99 }
100
101 pub async fn accept<
103 T: Into<web_transport::Session>,
104 P: Into<Option<OriginConsumer>>,
105 C: Into<Option<OriginProducer>>,
106 >(
107 session: T,
108 publish: P,
109 subscribe: C,
110 ) -> Result<Self, Error> {
111 let mut session = session.into();
112 let mut stream = Stream::accept(&mut session).await?;
113 let kind = stream.reader.decode().await?;
114
115 if kind != message::ControlType::Session {
116 return Err(Error::UnexpectedStream(kind));
117 }
118
119 Self::accept_setup(&mut stream).await?;
120 let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
121 Ok(session)
122 }
123
124 async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
125 let client: message::ClientSetup = control.reader.decode().await?;
126
127 if !client.versions.contains(&message::Version::CURRENT) {
128 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
129 }
130
131 let server = message::ServerSetup {
132 version: message::Version::CURRENT,
133 extensions: Default::default(),
134 };
135
136 control.writer.encode(&server).await?;
137
138 tracing::debug!(version = ?server.version, "connected");
139
140 Ok(())
141 }
142
143 async fn run_session(mut stream: Stream) -> Result<(), Error> {
145 while let Some(_info) = stream.reader.decode_maybe::<message::SessionInfo>().await? {}
146 Err(Error::Cancel)
147 }
148
149 pub fn close(mut self, err: Error) {
151 self.webtransport.close(err.to_code(), &err.to_string());
152 }
153
154 pub async fn closed(&self) -> Error {
156 self.webtransport.closed().await.into()
157 }
158}