1use std::sync::Arc;
2
3use crate::{message, Error, OriginConsumer, OriginProducer};
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use tokio::sync::oneshot;
16use writer::*;
17
18pub struct Session<S: web_transport_trait::Session> {
23 transport: S,
24}
25
26impl<S: web_transport_trait::Session + Sync> Session<S> {
27 async fn new(
28 session: S,
29 stream: Stream<S>,
30 publish: Option<OriginConsumer>,
32 subscribe: Option<OriginProducer>,
34 ) -> Result<Self, Error> {
35 let publisher = Publisher::new(session.clone(), publish);
36 let subscriber = Subscriber::new(session.clone(), subscribe);
37
38 let this = Self {
39 transport: session.clone(),
40 };
41
42 let init = oneshot::channel();
43
44 web_async::spawn(async move {
45 let res = tokio::select! {
46 res = Self::run_session(stream) => res,
47 res = publisher.run() => res,
48 res = subscriber.run(init.0) => res,
49 };
50
51 match res {
52 Err(Error::Transport(_)) => {
53 tracing::info!("session terminated");
54 session.close(1, "");
55 }
56 Err(err) => {
57 tracing::warn!(%err, "session error");
58 session.close(err.to_code(), err.to_string().as_ref());
59 }
60 _ => {
61 tracing::info!("session closed");
62 session.close(0, "");
63 }
64 }
65 });
66
67 init.1.await.map_err(|_| Error::Cancel)?;
72
73 Ok(this)
74 }
75
76 pub async fn connect(
78 session: S,
79 publish: impl Into<Option<OriginConsumer>>,
80 subscribe: impl Into<Option<OriginProducer>>,
81 ) -> Result<Self, Error> {
82 let mut stream = Stream::open(&session, message::ControlType::Session).await?;
83 Self::connect_setup(&mut stream).await?;
84 let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
85 Ok(session)
86 }
87
88 async fn connect_setup(setup: &mut Stream<S>) -> Result<(), Error> {
89 let client = message::ClientSetup {
90 versions: [message::Version::CURRENT].into(),
91 extensions: Default::default(),
92 };
93
94 setup.writer.encode(&client).await?;
95 let server: message::ServerSetup = setup.reader.decode().await?;
96
97 tracing::debug!(version = ?server.version, "connected");
98
99 Ok(())
100 }
101
102 pub async fn accept<P: Into<Option<OriginConsumer>>, C: Into<Option<OriginProducer>>>(
104 session: S,
105 publish: P,
106 subscribe: C,
107 ) -> Result<Self, Error> {
108 let mut stream = Stream::accept(&session).await?;
109 let kind = stream.reader.decode().await?;
110
111 Self::accept_setup(kind, &mut stream).await?;
112 let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
113 Ok(session)
114 }
115
116 async fn accept_setup(kind: message::ControlType, control: &mut Stream<S>) -> Result<(), Error> {
117 if kind != message::ControlType::Session && kind != message::ControlType::ClientCompat {
118 return Err(Error::UnexpectedStream(kind));
119 }
120
121 let client: message::ClientSetup = control.reader.decode().await?;
122 if !client.versions.contains(&message::Version::CURRENT) {
123 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
124 }
125
126 let server = message::ServerSetup {
127 version: message::Version::CURRENT,
128 extensions: Default::default(),
129 };
130
131 if kind == message::ControlType::ClientCompat {
133 control.writer.encode(&message::ControlType::ServerCompat).await?;
135 }
136
137 control.writer.encode(&server).await?;
138
139 tracing::debug!(version = ?server.version, "connected");
140
141 Ok(())
142 }
143
144 async fn run_session(mut stream: Stream<S>) -> Result<(), Error> {
146 while let Some(_info) = stream.reader.decode_maybe::<message::SessionInfo>().await? {}
147 Err(Error::Cancel)
148 }
149
150 pub fn close(self, err: Error) {
152 self.transport.close(err.to_code(), err.to_string().as_ref());
153 }
154
155 pub async fn closed(&self) -> Error {
157 Error::Transport(Arc::new(self.transport.closed().await))
158 }
159}