1use crate::{
5 Error, OriginConsumer, OriginProducer, Session, VERSIONS,
6 coding::{Decode, Encode, Stream},
7 ietf, lite, setup,
8};
9
10#[derive(Default, Clone)]
12pub struct Client {
13 publish: Option<OriginConsumer>,
14 consume: Option<OriginProducer>,
15 }
18
19impl Client {
20 pub fn new() -> Self {
21 Default::default()
22 }
23
24 pub fn with_publish(mut self, publish: impl Into<Option<OriginConsumer>>) -> Self {
25 self.publish = publish.into();
26 self
27 }
28
29 pub fn with_consume(mut self, consume: impl Into<Option<OriginProducer>>) -> Self {
30 self.consume = consume.into();
31 self
32 }
33
34 pub async fn connect<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
42 if self.publish.is_none() && self.consume.is_none() {
43 tracing::warn!("not publishing or consuming anything");
44 }
45
46 let mut stream = Stream::open(&session, setup::ServerKind::Ietf14).await?;
47
48 let mut parameters = ietf::Parameters::default();
49 parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64);
50 parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec());
51 let parameters = parameters.encode_bytes(());
52
53 let client = setup::Client {
54 kind: setup::ClientKind::Ietf14,
57 versions: VERSIONS.into(),
58 parameters,
59 };
60
61 tracing::trace!(?client, "sending client setup");
63 stream.writer.encode(&client).await?;
64
65 let mut server: setup::Server = stream.reader.decode().await?;
66 tracing::trace!(?server, "received server setup");
67
68 if let Ok(version) = lite::Version::try_from(server.version) {
69 let stream = stream.with_version(version);
70 lite::start(
71 session.clone(),
72 stream,
73 self.publish.clone(),
74 self.consume.clone(),
75 version,
76 )
77 .await?;
78 } else if let Ok(version) = ietf::Version::try_from(server.version) {
79 let parameters = ietf::Parameters::decode(&mut server.parameters, version)?;
81 let request_id_max =
82 ietf::RequestId(parameters.get_varint(ietf::ParameterVarInt::MaxRequestId).unwrap_or(0));
83
84 let stream = stream.with_version(version);
85 ietf::start(
86 session.clone(),
87 stream,
88 request_id_max,
89 true,
90 self.publish.clone(),
91 self.consume.clone(),
92 version,
93 )
94 .await?;
95 } else {
96 return Err(Error::Version(client.versions, [server.version].into()));
98 }
99
100 tracing::debug!(version = ?server.version, "connected");
101
102 Ok(Session::new(session))
103 }
104}