moq_transfork/session/
mod.rs1use crate::{message, AnnouncedConsumer, Error, Filter, RouterConsumer, Track, TrackConsumer};
2
3use moq_async::{spawn, Close, OrClose};
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17#[derive(Clone)]
22pub struct Session {
23 webtransport: web_transport::Session,
24 publisher: Publisher,
25 subscriber: Subscriber,
26}
27
28impl Session {
29 fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30 let publisher = Publisher::new(session.clone());
31 let subscriber = Subscriber::new(session.clone());
32
33 let this = Self {
34 webtransport: session.clone(),
35 publisher: publisher.clone(),
36 subscriber: subscriber.clone(),
37 };
38
39 spawn(async move {
40 let res = tokio::select! {
41 res = Self::run_session(stream) => res,
42 res = Self::run_bi(session.clone(), publisher) => res,
43 res = Self::run_uni(session.clone(), subscriber) => res,
44 };
45
46 if let Err(err) = res {
47 tracing::warn!(?err, "terminated");
48 session.close(err.to_code(), &err.to_string());
49 }
50 });
51
52 this
53 }
54
55 pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
57 let mut session = session.into();
58 let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
59 Self::connect_setup(&mut stream).await.or_close(&mut stream)?;
60 Ok(Self::new(session, stream))
61 }
62
63 async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
64 let client = message::ClientSetup {
65 versions: [message::Version::CURRENT].into(),
66 extensions: Default::default(),
67 };
68
69 setup.writer.encode(&client).await?;
70 let server: message::ServerSetup = setup.reader.decode().await?;
71
72 tracing::info!(version = ?server.version, "connected");
73
74 Ok(())
75 }
76
77 pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
79 let mut session = session.into();
80 let mut stream = Stream::accept(&mut session).await?;
81 let kind = stream.reader.decode().await?;
82
83 if kind != message::ControlType::Session {
84 return Err(Error::UnexpectedStream(kind));
85 }
86
87 Self::accept_setup(&mut stream).await.or_close(&mut stream)?;
88 Ok(Self::new(session, stream))
89 }
90
91 async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
92 let client: message::ClientSetup = control.reader.decode().await?;
93
94 if !client.versions.contains(&message::Version::CURRENT) {
95 return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
96 }
97
98 let server = message::ServerSetup {
99 version: message::Version::CURRENT,
100 extensions: Default::default(),
101 };
102
103 control.writer.encode(&server).await?;
104
105 tracing::info!(version = ?server.version, "connected");
106
107 Ok(())
108 }
109
110 async fn run_session(mut stream: Stream) -> Result<(), Error> {
111 while let Some(_info) = stream.reader.decode_maybe::<message::Info>().await? {}
112 Err(Error::Cancel)
113 }
114
115 async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
116 loop {
117 let mut stream = Reader::accept(&mut session).await?;
118 let subscriber = subscriber.clone();
119
120 spawn(async move {
121 Self::run_data(&mut stream, subscriber).await.or_close(&mut stream).ok();
122 });
123 }
124 }
125
126 async fn run_data(stream: &mut Reader, mut subscriber: Subscriber) -> Result<(), Error> {
127 let kind = stream.decode().await?;
128 match kind {
129 message::DataType::Group => subscriber.recv_group(stream).await,
130 }
131 }
132
133 async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
134 loop {
135 let mut stream = Stream::accept(&mut session).await?;
136 let publisher = publisher.clone();
137
138 spawn(async move {
139 Self::run_control(&mut stream, publisher)
140 .await
141 .or_close(&mut stream)
142 .ok();
143 });
144 }
145 }
146
147 async fn run_control(stream: &mut Stream, mut publisher: Publisher) -> Result<(), Error> {
148 let kind = stream.reader.decode().await?;
149 match kind {
150 message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
151 message::ControlType::Announce => publisher.recv_announce(stream).await,
152 message::ControlType::Subscribe => publisher.recv_subscribe(stream).await,
153 message::ControlType::Info => publisher.recv_info(stream).await,
154 }
155 }
156
157 pub fn publish(&mut self, track: TrackConsumer) -> Result<(), Error> {
159 self.publisher.publish(track)
160 }
161
162 pub fn announce(&mut self, announced: AnnouncedConsumer) {
167 self.publisher.announce(announced);
168 }
169
170 pub fn route(&mut self, router: RouterConsumer) {
174 self.publisher.route(router);
175 }
176
177 pub fn subscribe(&self, track: Track) -> TrackConsumer {
179 self.subscriber.subscribe(track)
180 }
181
182 pub fn announced(&self, filter: Filter) -> AnnouncedConsumer {
184 self.subscriber.announced(filter)
185 }
186
187 pub fn close(mut self, err: Error) {
189 self.webtransport.close(err.to_code(), &err.to_string());
190 }
191
192 pub async fn closed(&self) -> Error {
194 self.webtransport.closed().await.into()
195 }
196}
197
198impl PartialEq for Session {
199 fn eq(&self, other: &Self) -> bool {
200 self.webtransport == other.webtransport
201 }
202}
203
204impl Eq for Session {}