moq_lite/session/
mod.rs

1use crate::{message, BroadcastConsumer, Error, OriginConsumer};
2
3use web_async::spawn;
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17/// A MoQ session, used to publish and/or subscribe to broadcasts.
18///
19/// A publisher will [Self::publish] tracks, or alternatively [Self::announce] and [Self::route] arbitrary paths.
20/// A subscriber will [Self::subscribe] to tracks, or alternatively use [Self::announced] to discover arbitrary paths.
21#[derive(Clone)]
22pub struct Session {
23	webtransport: web_transport::Session,
24	publisher: Publisher,
25	subscriber: Subscriber,
26}
27
28impl Session {
29	fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30		tracing::info!("session started");
31
32		let publisher = Publisher::new(session.clone());
33		let subscriber = Subscriber::new(session.clone());
34
35		let this = Self {
36			webtransport: session.clone(),
37			publisher: publisher.clone(),
38			subscriber: subscriber.clone(),
39		};
40
41		spawn(async move {
42			let res = tokio::select! {
43				res = Self::run_session(stream) => res,
44				res = Self::run_bi(session.clone(), publisher) => res,
45				res = Self::run_uni(session.clone(), subscriber) => res,
46			};
47
48			match res {
49				Err(Error::WebTransport(web_transport::Error::Session(_))) => {
50					tracing::info!("session terminated");
51					session.close(1, "");
52				}
53				Err(err) => {
54					tracing::warn!(?err, "session error");
55					session.close(err.to_code(), &err.to_string());
56				}
57				_ => {
58					tracing::info!("session closed");
59					session.close(0, "");
60				}
61			}
62		});
63
64		this
65	}
66
67	/// Perform the MoQ handshake as a client.
68	pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
69		let mut session = session.into();
70		let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
71		Self::connect_setup(&mut stream).await?;
72		Ok(Self::new(session, stream))
73	}
74
75	async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
76		let client = message::ClientSetup {
77			versions: [message::Version::CURRENT].into(),
78			extensions: Default::default(),
79		};
80
81		setup.writer.encode(&client).await?;
82		let server: message::ServerSetup = setup.reader.decode().await?;
83
84		tracing::debug!(version = ?server.version, "connected");
85
86		Ok(())
87	}
88
89	/// Perform the MoQ handshake as a server
90	pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
91		let mut session = session.into();
92		let mut stream = Stream::accept(&mut session).await?;
93		let kind = stream.reader.decode().await?;
94
95		if kind != message::ControlType::Session {
96			return Err(Error::UnexpectedStream(kind));
97		}
98
99		Self::accept_setup(&mut stream).await?;
100		Ok(Self::new(session, stream))
101	}
102
103	async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
104		let client: message::ClientSetup = control.reader.decode().await?;
105
106		if !client.versions.contains(&message::Version::CURRENT) {
107			return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
108		}
109
110		let server = message::ServerSetup {
111			version: message::Version::CURRENT,
112			extensions: Default::default(),
113		};
114
115		control.writer.encode(&server).await?;
116
117		tracing::debug!(version = ?server.version, "connected");
118
119		Ok(())
120	}
121
122	async fn run_session(mut stream: Stream) -> Result<(), Error> {
123		while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
124		Err(Error::Cancel)
125	}
126
127	async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
128		loop {
129			let stream = Reader::accept(&mut session).await?;
130			let subscriber = subscriber.clone();
131
132			spawn(async move {
133				Self::run_data(stream, subscriber).await.ok();
134			});
135		}
136	}
137
138	async fn run_data(mut stream: Reader, mut subscriber: Subscriber) -> Result<(), Error> {
139		let kind = stream.decode().await?;
140
141		let res = match kind {
142			message::DataType::Group => subscriber.recv_group(&mut stream).await,
143		};
144
145		if let Err(err) = res {
146			stream.abort(&err);
147		}
148
149		Ok(())
150	}
151
152	async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
153		loop {
154			let stream = Stream::accept(&mut session).await?;
155			let publisher = publisher.clone();
156
157			spawn(async move {
158				Self::run_control(stream, publisher).await.ok();
159			});
160		}
161	}
162
163	async fn run_control(mut stream: Stream, mut publisher: Publisher) -> Result<(), Error> {
164		let kind = stream.reader.decode().await?;
165
166		let res = match kind {
167			message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
168			message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
169			message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
170		};
171
172		if let Err(err) = &res {
173			stream.writer.abort(err);
174		}
175
176		res
177	}
178
179	/// Publish a broadcast, automatically announcing and serving it.
180	pub fn publish<T: ToString>(&mut self, path: T, broadcast: BroadcastConsumer) {
181		self.publisher.publish(path, broadcast);
182	}
183
184	/// Publish all broadcasts from the given origin with a prefix.
185	pub fn publish_prefix(&mut self, prefix: &str, broadcasts: OriginConsumer) {
186		self.publisher.publish_prefix(prefix, broadcasts);
187	}
188
189	/// Publish all broadcasts from the given origin.
190	pub fn publish_all(&mut self, broadcasts: OriginConsumer) {
191		self.publisher.publish_all(broadcasts);
192	}
193
194	/// Consume a broadcast, returning a handle that can request tracks.
195	///
196	/// No tracks flow over the network until [BroadcastConsumer::subscribe] is called.
197	pub fn consume(&self, path: &str) -> BroadcastConsumer {
198		self.subscriber.consume(path)
199	}
200
201	/// Discover and consume all broadcasts.
202	///
203	/// No tracks flow over the network until [BroadcastConsumer::subscribe] is called.
204	pub fn consume_all(&self) -> OriginConsumer {
205		self.subscriber.consume_prefix("")
206	}
207
208	/// Discover and consume any broadcasts published by the remote matching a prefix.
209	///
210	/// No tracks flow over the network until [BroadcastConsumer::subscribe] is called.
211	pub fn consume_prefix<S: ToString>(&self, prefix: S) -> OriginConsumer {
212		self.subscriber.consume_prefix(prefix)
213	}
214
215	/// Discover and consume a specific broadcast.
216	///
217	/// This is different from `consume` because it waits for an announcement.
218	/// The returned OriginConsumer will ONLY announce a suffix="".
219	///
220	/// TODO: Make a special class for this.
221	pub fn consume_exact(&self, path: &str) -> OriginConsumer {
222		self.subscriber.consume_exact(path)
223	}
224
225	/// Close the underlying WebTransport session.
226	pub fn close(mut self, err: Error) {
227		self.webtransport.close(err.to_code(), &err.to_string());
228	}
229
230	/// Block until the WebTransport session is closed.
231	pub async fn closed(&self) -> Error {
232		self.webtransport.closed().await.into()
233	}
234}