moq_lite/
client.rs

1// TODO: Uncomment when observability feature is merged
2// use std::sync::Arc;
3
4use crate::{
5	Error, OriginConsumer, OriginProducer, Session, VERSIONS,
6	coding::{Decode, Encode, Stream},
7	ietf, lite, setup,
8};
9
10/// A MoQ client session builder.
11#[derive(Default, Clone)]
12pub struct Client {
13	publish: Option<OriginConsumer>,
14	consume: Option<OriginProducer>,
15	// TODO: Uncomment when observability feature is merged
16	// stats: Option<Arc<dyn crate::Stats>>,
17}
18
19impl Client {
20	pub fn new() -> Self {
21		Default::default()
22	}
23
24	pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
25		self.publish = publish.into();
26		self
27	}
28
29	pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
30		self.consume = consume.into();
31		self
32	}
33
34	// TODO: Uncomment when observability feature is merged
35	// pub fn with_stats(mut self, stats: impl Into<Option<Arc<dyn crate::Stats>>>) -> Self {
36	// 	self.stats = stats.into();
37	// 	self
38	// }
39
40	/// Perform the MoQ handshake as a client negotiating the version.
41	pub async fn connect<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
42		if self.publish.is_none() && self.consume.is_none() {
43			tracing::warn!("not publishing or consuming anything");
44		}
45
46		let mut stream = Stream::open(&session, setup::ServerKind::Ietf14).await?;
47
48		let mut parameters = ietf::Parameters::default();
49		parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
50		parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
51		let parameters = parameters.encode_bytes(());
52
53		let client = setup::Client {
54			// Unfortunately, we have to pick a single draft range to support.
55			// moq-lite can support this handshake.
56			kind: setup::ClientKind::Ietf14,
57			versions: VERSIONS.into(),
58			parameters,
59		};
60
61		// TODO pretty print the parameters.
62		tracing::trace!(?client, "sending client setup");
63		stream.writer.encode(&client).await?;
64
65		let mut server: setup::Server = stream.reader.decode().await?;
66		tracing::trace!(?server, "received server setup");
67
68		if let Ok(version) = lite::Version::try_from(server.version) {
69			let stream = stream.with_version(version);
70			lite::start(
71				session.clone(),
72				stream,
73				self.publish.clone(),
74				self.consume.clone(),
75				version,
76			)
77			.await?;
78		} else if let Ok(version) = ietf::Version::try_from(server.version) {
79			// Decode the parameters to get the initial request ID.
80			let parameters = ietf::Parameters::decode(&mut server.parameters, version)?;
81			let request_id_max =
82				ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));
83
84			let stream = stream.with_version(version);
85			ietf::start(
86				session.clone(),
87				stream,
88				request_id_max,
89				true,
90				self.publish.clone(),
91				self.consume.clone(),
92				version,
93			)
94			.await?;
95		} else {
96			// unreachable, but just in case
97			return Err(Error::Version(client.versions, [server.version].into()));
98		}
99
100		tracing::debug!(version = ?server.version, "connected");
101
102		Ok(Session::new(session))
103	}
104}