moq_transfork/session/
mod.rs

1use crate::{message, AnnouncedConsumer, Error, Filter, RouterConsumer, Track, TrackConsumer};
2
3use moq_async::{spawn, Close, OrClose};
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use writer::*;
16
17/// A MoqTransfork session, used to publish and/or subscribe to broadcasts.
18///
19/// A publisher will [Self::publish] tracks, or alternatively [Self::announce] and [Self::route] arbitrary paths.
20/// A subscriber will [Self::subscribe] to tracks, or alternatively use [Self::announced] to discover arbitrary paths.
21#[derive(Clone)]
22pub struct Session {
23	webtransport: web_transport::Session,
24	publisher: Publisher,
25	subscriber: Subscriber,
26}
27
28impl Session {
29	fn new(mut session: web_transport::Session, stream: Stream) -> Self {
30		let publisher = Publisher::new(session.clone());
31		let subscriber = Subscriber::new(session.clone());
32
33		let this = Self {
34			webtransport: session.clone(),
35			publisher: publisher.clone(),
36			subscriber: subscriber.clone(),
37		};
38
39		spawn(async move {
40			let res = tokio::select! {
41				res = Self::run_session(stream) => res,
42				res = Self::run_bi(session.clone(), publisher) => res,
43				res = Self::run_uni(session.clone(), subscriber) => res,
44			};
45
46			if let Err(err) = res {
47				tracing::warn!(?err, "terminated");
48				session.close(err.to_code(), &err.to_string());
49			}
50		});
51
52		this
53	}
54
55	/// Perform the MoQ handshake as a client.
56	pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
57		let mut session = session.into();
58		let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
59		Self::connect_setup(&mut stream).await.or_close(&mut stream)?;
60		Ok(Self::new(session, stream))
61	}
62
63	async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
64		let client = message::ClientSetup {
65			versions: [message::Version::CURRENT].into(),
66			extensions: Default::default(),
67		};
68
69		setup.writer.encode(&client).await?;
70		let server: message::ServerSetup = setup.reader.decode().await?;
71
72		tracing::info!(version = ?server.version, "connected");
73
74		Ok(())
75	}
76
77	/// Perform the MoQ handshake as a server
78	pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
79		let mut session = session.into();
80		let mut stream = Stream::accept(&mut session).await?;
81		let kind = stream.reader.decode().await?;
82
83		if kind != message::ControlType::Session {
84			return Err(Error::UnexpectedStream(kind));
85		}
86
87		Self::accept_setup(&mut stream).await.or_close(&mut stream)?;
88		Ok(Self::new(session, stream))
89	}
90
91	async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
92		let client: message::ClientSetup = control.reader.decode().await?;
93
94		if !client.versions.contains(&message::Version::CURRENT) {
95			return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
96		}
97
98		let server = message::ServerSetup {
99			version: message::Version::CURRENT,
100			extensions: Default::default(),
101		};
102
103		control.writer.encode(&server).await?;
104
105		tracing::info!(version = ?server.version, "connected");
106
107		Ok(())
108	}
109
110	async fn run_session(mut stream: Stream) -> Result<(), Error> {
111		while let Some(_info) = stream.reader.decode_maybe::<message::Info>().await? {}
112		Err(Error::Cancel)
113	}
114
115	async fn run_uni(mut session: web_transport::Session, subscriber: Subscriber) -> Result<(), Error> {
116		loop {
117			let mut stream = Reader::accept(&mut session).await?;
118			let subscriber = subscriber.clone();
119
120			spawn(async move {
121				Self::run_data(&mut stream, subscriber).await.or_close(&mut stream).ok();
122			});
123		}
124	}
125
126	async fn run_data(stream: &mut Reader, mut subscriber: Subscriber) -> Result<(), Error> {
127		let kind = stream.decode().await?;
128		match kind {
129			message::DataType::Group => subscriber.recv_group(stream).await,
130		}
131	}
132
133	async fn run_bi(mut session: web_transport::Session, publisher: Publisher) -> Result<(), Error> {
134		loop {
135			let mut stream = Stream::accept(&mut session).await?;
136			let publisher = publisher.clone();
137
138			spawn(async move {
139				Self::run_control(&mut stream, publisher)
140					.await
141					.or_close(&mut stream)
142					.ok();
143			});
144		}
145	}
146
147	async fn run_control(stream: &mut Stream, mut publisher: Publisher) -> Result<(), Error> {
148		let kind = stream.reader.decode().await?;
149		match kind {
150			message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
151			message::ControlType::Announce => publisher.recv_announce(stream).await,
152			message::ControlType::Subscribe => publisher.recv_subscribe(stream).await,
153			message::ControlType::Info => publisher.recv_info(stream).await,
154		}
155	}
156
157	/// Publish a track, automatically announcing and serving it.
158	pub fn publish(&mut self, track: TrackConsumer) -> Result<(), Error> {
159		self.publisher.publish(track)
160	}
161
162	/// Optionally announce the provided tracks.
163	///
164	/// This is advanced functionality if you wish to perform dynamic track generation in conjunction with [Self::route].
165	/// [AnnouncedConsumer] will automatically unannounce if the [crate::AnnouncedProducer] is dropped.
166	pub fn announce(&mut self, announced: AnnouncedConsumer) {
167		self.publisher.announce(announced);
168	}
169
170	/// Optionally route unknown paths.
171	///
172	/// This is advanced functionality if you wish to perform dynamic track generation in conjunction with [Self::announce].
173	pub fn route(&mut self, router: RouterConsumer) {
174		self.publisher.route(router);
175	}
176
177	/// Subscribe to a track and start receiving data over the network.
178	pub fn subscribe(&self, track: Track) -> TrackConsumer {
179		self.subscriber.subscribe(track)
180	}
181
182	/// Discover any tracks published by the remote matching a (wildcard) filter.
183	pub fn announced(&self, filter: Filter) -> AnnouncedConsumer {
184		self.subscriber.announced(filter)
185	}
186
187	/// Close the underlying WebTransport session.
188	pub fn close(mut self, err: Error) {
189		self.webtransport.close(err.to_code(), &err.to_string());
190	}
191
192	/// Block until the WebTransport session is closed.
193	pub async fn closed(&self) -> Error {
194		self.webtransport.closed().await.into()
195	}
196}
197
198impl PartialEq for Session {
199	fn eq(&self, other: &Self) -> bool {
200		self.webtransport == other.webtransport
201	}
202}
203
204impl Eq for Session {}