moq_lite/session/
mod.rs

1use std::sync::Arc;
2
3use crate::{message, Error, OriginConsumer, OriginProducer};
4
5mod publisher;
6mod reader;
7mod stream;
8mod subscriber;
9mod writer;
10
11use publisher::*;
12use reader::*;
13use stream::*;
14use subscriber::*;
15use tokio::sync::oneshot;
16use writer::*;
17
18/// A MoQ session, constructed with [OriginProducer] and [OriginConsumer] halves.
19///
20/// This simplifies the state machine and immediately rejects any subscriptions that don't match the origin prefix.
21/// You probably want to use [Session] unless you're writing a relay.
22pub struct Session<S: web_transport_trait::Session> {
23	transport: S,
24}
25
26impl<S: web_transport_trait::Session + Sync> Session<S> {
27	async fn new(
28		session: S,
29		stream: Stream<S>,
30		// We will publish any local broadcasts from this origin.
31		publish: Option<OriginConsumer>,
32		// We will consume any remote broadcasts, inserting them into this origin.
33		subscribe: Option<OriginProducer>,
34	) -> Result<Self, Error> {
35		let publisher = Publisher::new(session.clone(), publish);
36		let subscriber = Subscriber::new(session.clone(), subscribe);
37
38		let this = Self {
39			transport: session.clone(),
40		};
41
42		let init = oneshot::channel();
43
44		web_async::spawn(async move {
45			let res = tokio::select! {
46				res = Self::run_session(stream) => res,
47				res = publisher.run() => res,
48				res = subscriber.run(init.0) => res,
49			};
50
51			match res {
52				Err(Error::Transport(_)) => {
53					tracing::info!("session terminated");
54					session.close(1, "");
55				}
56				Err(err) => {
57					tracing::warn!(%err, "session error");
58					session.close(err.to_code(), err.to_string().as_ref());
59				}
60				_ => {
61					tracing::info!("session closed");
62					session.close(0, "");
63				}
64			}
65		});
66
67		// Wait until receiving the initial announcements to prevent some race conditions.
68		// Otherwise, `consume()` might return not found if we don't wait long enough, so just wait.
69		// If the announce stream fails or is closed, this will return an error instead of hanging.
70		// TODO return a better error
71		init.1.await.map_err(|_| Error::Cancel)?;
72
73		Ok(this)
74	}
75
76	/// Perform the MoQ handshake as a client.
77	pub async fn connect(
78		session: S,
79		publish: impl Into<Option<OriginConsumer>>,
80		subscribe: impl Into<Option<OriginProducer>>,
81	) -> Result<Self, Error> {
82		let mut stream = Stream::open(&session, message::ControlType::Session).await?;
83		Self::connect_setup(&mut stream).await?;
84		let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
85		Ok(session)
86	}
87
88	async fn connect_setup(setup: &mut Stream<S>) -> Result<(), Error> {
89		let client = message::ClientSetup {
90			versions: [message::Version::CURRENT].into(),
91			extensions: Default::default(),
92		};
93
94		setup.writer.encode(&client).await?;
95		let server: message::ServerSetup = setup.reader.decode().await?;
96
97		tracing::debug!(version = ?server.version, "connected");
98
99		Ok(())
100	}
101
102	/// Perform the MoQ handshake as a server
103	pub async fn accept<P: Into<Option<OriginConsumer>>, C: Into<Option<OriginProducer>>>(
104		session: S,
105		publish: P,
106		subscribe: C,
107	) -> Result<Self, Error> {
108		let mut stream = Stream::accept(&session).await?;
109		let kind = stream.reader.decode().await?;
110
111		Self::accept_setup(kind, &mut stream).await?;
112		let session = Self::new(session, stream, publish.into(), subscribe.into()).await?;
113		Ok(session)
114	}
115
116	async fn accept_setup(kind: message::ControlType, control: &mut Stream<S>) -> Result<(), Error> {
117		if kind != message::ControlType::Session && kind != message::ControlType::ClientCompat {
118			return Err(Error::UnexpectedStream(kind));
119		}
120
121		let client: message::ClientSetup = control.reader.decode().await?;
122		if !client.versions.contains(&message::Version::CURRENT) {
123			return Err(Error::Version(client.versions, [message::Version::CURRENT].into()));
124		}
125
126		let server = message::ServerSetup {
127			version: message::Version::CURRENT,
128			extensions: Default::default(),
129		};
130
131		// Backwards compatibility with moq-transport-10
132		if kind == message::ControlType::ClientCompat {
133			// Write a 0x41 just to be backwards compatible.
134			control.writer.encode(&message::ControlType::ServerCompat).await?;
135		}
136
137		control.writer.encode(&server).await?;
138
139		tracing::debug!(version = ?server.version, "connected");
140
141		Ok(())
142	}
143
144	// TODO do something useful with this
145	async fn run_session(mut stream: Stream<S>) -> Result<(), Error> {
146		while let Some(_info) = stream.reader.decode_maybe::<message::SessionInfo>().await? {}
147		Err(Error::Cancel)
148	}
149
150	/// Close the underlying transport session.
151	pub fn close(self, err: Error) {
152		self.transport.close(err.to_code(), err.to_string().as_ref());
153	}
154
155	/// Block until the transport session is closed.
156	pub async fn closed(&self) -> Error {
157		Error::Transport(Arc::new(self.transport.closed().await))
158	}
159}