moq_lite/session/
mod.rs

1use crate::{message, AnnounceConsumer, BroadcastConsumer, Error, Origin};
2
3use web_async::spawn;
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17/// A MoQ session, used to publish and/or subscribe to broadcasts.
18///
19/// A publisher will [Self::publish] tracks, or alternatively [Self::announce] and [Self::route] arbitrary paths.
20/// A subscriber will [Self::subscribe] to tracks, or alternatively use [Self::announced] to discover arbitrary paths.
21#[derive(Clone)]
22pub struct Session {
23	webtransport: web_transport::Session,
24	publisher: Publisher,
25	subscriber: Subscriber,
26}
27
28impl Session {
29	fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30		let publisher = Publisher::new(session.clone());
31		let subscriber = Subscriber::new(session.clone());
32
33		let this = Self {
34			webtransport: session.clone(),
35			publisher: publisher.clone(),
36			subscriber: subscriber.clone(),
37		};
38
39		spawn(async move {
40			let res = tokio::select! {
41				res = Self::run_session(stream) => res,
42				res = Self::run_bi(session.clone(), publisher) => res,
43				res = Self::run_uni(session.clone(), subscriber) => res,
44			};
45
46			if let Err(err) = res {
47				tracing::info!(?err, "session terminated");
48				session.close(err.to_code(), &err.to_string());
49			}
50		});
51
52		this
53	}
54
55	/// Perform the MoQ handshake as a client.
56	pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
57		let mut session = session.into();
58		let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
59		Self::connect_setup(&mut stream).await?;
60		Ok(Self::new(session, stream))
61	}
62
63	async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
64		let client = message::ClientSetup {
65			versions: [message::Version::CURRENT].into(),
66			extensions: Default::default(),
67		};
68
69		setup.writer.encode(&client).await?;
70		let server: message::ServerSetup = setup.reader.decode().await?;
71
72		tracing::debug!(version = ?server.version, "connected");
73
74		Ok(())
75	}
76
77	/// Perform the MoQ handshake as a server
78	pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
79		let mut session = session.into();
80		let mut stream = Stream::accept(&mut session).await?;
81		let kind = stream.reader.decode().await?;
82
83		if kind != message::ControlType::Session {
84			return Err(Error::UnexpectedStream(kind));
85		}
86
87		Self::accept_setup(&mut stream).await?;
88		Ok(Self::new(session, stream))
89	}
90
91	async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
92		let client: message::ClientSetup = control.reader.decode().await?;
93
94		if !client.versions.contains(&message::Version::CURRENT) {
95			return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
96		}
97
98		let server = message::ServerSetup {
99			version: message::Version::CURRENT,
100			extensions: Default::default(),
101		};
102
103		control.writer.encode(&server).await?;
104
105		tracing::debug!(version = ?server.version, "connected");
106
107		Ok(())
108	}
109
110	async fn run_session(mut stream: Stream) -> Result<(), Error> {
111		while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
112		Err(Error::Cancel)
113	}
114
115	async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
116		loop {
117			let stream = Reader::accept(&mut session).await?;
118			let subscriber = subscriber.clone();
119
120			spawn(async move {
121				Self::run_data(stream, subscriber).await.ok();
122			});
123		}
124	}
125
126	async fn run_data(mut stream: Reader, mut subscriber: Subscriber) -> Result<(), Error> {
127		let kind = stream.decode().await?;
128
129		let res = match kind {
130			message::DataType::Group => subscriber.recv_group(&mut stream).await,
131		};
132
133		if let Err(err) = res {
134			stream.abort(&err);
135		}
136
137		Ok(())
138	}
139
140	async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
141		loop {
142			let stream = Stream::accept(&mut session).await?;
143			let publisher = publisher.clone();
144
145			spawn(async move {
146				Self::run_control(stream, publisher).await.ok();
147			});
148		}
149	}
150
151	async fn run_control(mut stream: Stream, mut publisher: Publisher) -> Result<(), Error> {
152		let kind = stream.reader.decode().await?;
153
154		let res = match kind {
155			message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
156			message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
157			message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
158		};
159
160		if let Err(err) = &res {
161			stream.writer.abort(err);
162		}
163
164		res
165	}
166
167	/// Publish a broadcast, automatically announcing and serving it.
168	pub fn publish<T: ToString>(&mut self, path: T, broadcast: BroadcastConsumer) {
169		self.publisher.publish(path, broadcast);
170	}
171
172	/// Scope subscriptions to a broadcast, returning a handle that can request tracks.
173	///
174	/// No data flows over the network until [BroadcastConsumer::subscribe] is called.
175	pub fn consume(&self, path: &str) -> BroadcastConsumer {
176		self.subscriber.consume(path)
177	}
178
179	/// Discover any broadcasts published by the remote matching a prefix.
180	///
181	/// There will be an event each time a broadcast starts and later ends.
182	/// The results contain the suffix only; you may need to re-apply the prefix.
183	pub fn announced<S: ToString>(&self, prefix: S) -> AnnounceConsumer {
184		self.subscriber.announced(prefix.to_string())
185	}
186
187	/// Close the underlying WebTransport session.
188	pub fn close(mut self, err: Error) {
189		self.webtransport.close(err.to_code(), &err.to_string());
190	}
191
192	/// Block until the WebTransport session is closed.
193	pub async fn closed(&self) -> Error {
194		self.webtransport.closed().await.into()
195	}
196
197	/// Publish all of our broadcasts to the given origin.
198	///
199	/// If an optional prefix is provided, the prefix will be applied when inserting into the origin.
200	pub async fn publish_to(&mut self, mut origin: Origin, prefix: &str) {
201		let mut announced = self.announced("");
202
203		while let Some(suffix) = announced.active().await {
204			let broadcast = self.consume(&suffix);
205
206			// We can avoid a string copy if there's no prefix.
207			match prefix {
208				"" => origin.publish(suffix, broadcast),
209				prefix => origin.publish(format!("{}{}", prefix, suffix), broadcast),
210			};
211		}
212	}
213
214	/// Serve all broadcasts from the given origin.
215	///
216	/// If the prefix is provided, then only broadcasts matching the (stripped) prefix are served.
217	pub async fn consume_from(&mut self, origin: Origin, prefix: &str) {
218		let mut remotes = origin.announced(prefix);
219
220		while let Some(suffix) = remotes.active().await {
221			match prefix {
222				// If there's no prefix, we can avoid a string copy.
223				"" => {
224					if let Some(upstream) = origin.consume(&suffix) {
225						// If the broadcast exists (it should...) then we serve it.
226						self.publish(suffix, upstream);
227					}
228				}
229				// We need to re-apply the prefix to get the full path on the origin.
230				prefix => {
231					let path = format!("{}{}", prefix, suffix);
232					if let Some(upstream) = origin.consume(&path) {
233						// If the broadcast exists (it should...) then we serve it.
234						self.publish(suffix, upstream);
235					}
236				}
237			};
238		}
239	}
240}