moq_lite/session/
mod.rs

1use crate::{message, Error, OriginConsumer, OriginProducer};
2
3mod publisher;
4mod reader;
5mod stream;
6mod subscriber;
7mod writer;
8
9use publisher::*;
10use reader::*;
11use stream::*;
12use subscriber::*;
13use writer::*;
14
15/// A MoQ session, constructed with [Publisher] and [Subscriber] halves.
16///
17/// This simplifies the state machine and immediately rejects any subscriptions that don't match the origin prefix.
18/// You probably want to use [Session] unless you're writing a relay.
19#[derive(Clone)]
20pub struct Session {
21	pub webtransport: web_transport::Session,
22}
23
24impl Session {
25	fn new(
26		mut session: web_transport::Session,
27		stream: Stream,
28		// We will publish any local broadcasts from this origin.
29		publish: Option<OriginConsumer>,
30		// We will consume any remote broadcasts, inserting them into this origin.
31		subscribe: Option<OriginProducer>,
32	) -> Self {
33		let publisher = SessionPublisher::new(session.clone(), publish);
34		let subscriber = SessionSubscriber::new(session.clone());
35
36		let this = Self {
37			webtransport: session.clone(),
38		};
39
40		web_async::spawn(async move {
41			let res = tokio::select! {
42				res = Self::run_session(stream) => res,
43				res = Self::run_bi(session.clone(), publisher.clone()) => res,
44				res = Self::run_uni(session.clone(), subscriber.clone()) => res,
45				//res = publisher.run() => res,
46				// Ignore Ok (unused) or when subscribe is None.
47				Some(Err(res)) = async move { Some(subscriber.run(subscribe?).await) } => Err(res),
48			};
49
50			match res {
51				Err(Error::WebTransport(web_transport::Error::Session(_))) => {
52					tracing::info!("session terminated");
53					session.close(1, "");
54				}
55				Err(err) => {
56					tracing::warn!(?err, "session error");
57					session.close(err.to_code(), &err.to_string());
58				}
59				_ => {
60					tracing::info!("session closed");
61					session.close(0, "");
62				}
63			}
64		});
65
66		this
67	}
68
69	/// Perform the MoQ handshake as a client.
70	pub async fn connect<
71		T: Into<web_transport::Session>,
72		P: Into<Option<OriginConsumer>>,
73		C: Into<Option<OriginProducer>>,
74	>(
75		session: T,
76		publish: P,
77		subscribe: C,
78	) -> Result<Self, Error> {
79		let mut session = session.into();
80		let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
81		Self::connect_setup(&mut stream).await?;
82		Ok(Self::new(session, stream, publish.into(), subscribe.into()))
83	}
84
85	async fn connect_setup(setup: &mut Stream) -> Result<(), Error> {
86		let client = message::ClientSetup {
87			versions: [message::Version::CURRENT].into(),
88			extensions: Default::default(),
89		};
90
91		setup.writer.encode(&client).await?;
92		let server: message::ServerSetup = setup.reader.decode().await?;
93
94		tracing::debug!(version = ?server.version, "connected");
95
96		Ok(())
97	}
98
99	/// Perform the MoQ handshake as a server
100	pub async fn accept<
101		T: Into<web_transport::Session>,
102		P: Into<Option<OriginConsumer>>,
103		C: Into<Option<OriginProducer>>,
104	>(
105		session: T,
106		publish: P,
107		subscribe: C,
108	) -> Result<Self, Error> {
109		let mut session = session.into();
110		let mut stream = Stream::accept(&mut session).await?;
111		let kind = stream.reader.decode().await?;
112
113		if kind != message::ControlType::Session {
114			return Err(Error::UnexpectedStream(kind));
115		}
116
117		Self::accept_setup(&mut stream).await?;
118		Ok(Self::new(session, stream, publish.into(), subscribe.into()))
119	}
120
121	async fn accept_setup(control: &mut Stream) -> Result<(), Error> {
122		let client: message::ClientSetup = control.reader.decode().await?;
123
124		if !client.versions.contains(&message::Version::CURRENT) {
125			return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
126		}
127
128		let server = message::ServerSetup {
129			version: message::Version::CURRENT,
130			extensions: Default::default(),
131		};
132
133		control.writer.encode(&server).await?;
134
135		tracing::debug!(version = ?server.version, "connected");
136
137		Ok(())
138	}
139
140	async fn run_session(mut stream: Stream) -> Result<(), Error> {
141		while let Some(_info) = stream.reader.decode_maybe::<message::SubscribeOk>().await? {}
142		Err(Error::Cancel)
143	}
144
145	async fn run_uni(mut session: web_transport::Session, subscriber: SessionSubscriber) -> Result<(), Error> {
146		loop {
147			let stream = Reader::accept(&mut session).await?;
148			let subscriber = subscriber.clone();
149
150			web_async::spawn(async move {
151				Self::run_data(stream, subscriber).await.ok();
152			});
153		}
154	}
155
156	async fn run_data(mut stream: Reader, mut subscriber: SessionSubscriber) -> Result<(), Error> {
157		let kind = stream.decode().await?;
158
159		let res = match kind {
160			message::DataType::Group => subscriber.recv_group(&mut stream).await,
161		};
162
163		if let Err(err) = res {
164			stream.abort(&err);
165		}
166
167		Ok(())
168	}
169
170	async fn run_bi(mut session: web_transport::Session, publisher: SessionPublisher) -> Result<(), Error> {
171		loop {
172			let stream = Stream::accept(&mut session).await?;
173			let publisher = publisher.clone();
174
175			web_async::spawn(async move {
176				Self::run_control(stream, publisher).await.ok();
177			});
178		}
179	}
180
181	async fn run_control(mut stream: Stream, mut publisher: SessionPublisher) -> Result<(), Error> {
182		let kind = stream.reader.decode().await?;
183
184		let res = match kind {
185			message::ControlType::Session => Err(Error::UnexpectedStream(kind)),
186			message::ControlType::Announce => publisher.recv_announce(&mut stream).await,
187			message::ControlType::Subscribe => publisher.recv_subscribe(&mut stream).await,
188		};
189
190		if let Err(err) = &res {
191			stream.writer.abort(err);
192		}
193
194		res
195	}
196
197	/// Close the underlying WebTransport session.
198	pub fn close(mut self, err: Error) {
199		self.webtransport.close(err.to_code(), &err.to_string());
200	}
201
202	/// Block until the WebTransport session is closed.
203	pub async fn closed(&self) -> Error {
204		self.webtransport.closed().await.into()
205	}
206}