use crate::{
BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, coding::Stream, lite::SessionInfo,
};
use super::{Publisher, Subscriber, Version};
/// Spawns the background task that drives a session, then returns immediately.
///
/// A [`Publisher`] and a [`Subscriber`] are built on clones of `session` and raced against
/// the optional `setup` stream inside a task started with `web_async::spawn`. The first
/// branch to complete decides how the session is closed:
///
/// - `Error::Transport(_)`: logged at info level, closed with code `1` and an empty reason.
/// - any other error: logged at warn level, closed with `err.to_code()` and the error text.
/// - success: logged at info level, closed with code `0` and an empty reason.
///
/// # Returns
///
/// `Ok(None)` for `Version::Lite01` and `Version::Lite02`. For every other version,
/// `Ok(Some(consumer))`, where `consumer` is obtained from the same [`BandwidthProducer`]
/// that is handed to the subscriber. This function never produces an `Err` itself.
pub fn start<S: web_transport_trait::Session>(
    session: S,
    setup: Option<Stream<S, Version>>,
    publish: Option<OriginConsumer>,
    subscribe: Option<OriginProducer>,
    version: Version,
) -> Result<Option<BandwidthConsumer>, Error> {
    // The producer is always created; only versions after Lite02 actually wire it up.
    let bandwidth = BandwidthProducer::new();
    let (recv_bw_consumer, subscriber_bw) = if matches!(version, Version::Lite01 | Version::Lite02) {
        (None, None)
    } else {
        // Take the consumer first, then move the producer itself to the subscriber.
        (Some(bandwidth.consume()), Some(bandwidth))
    };

    let publisher = Publisher::new(session.clone(), publish, version);
    let subscriber = Subscriber::new(session.clone(), subscribe, subscriber_bw, version);

    web_async::spawn(async move {
        let outcome = tokio::select! {
            // An `Ok` from `run_session` fails this pattern, which disables the branch
            // and leaves the publisher and subscriber racing on their own.
            Err(err) = run_session(setup) => Err(err),
            done = publisher.run() => done,
            done = subscriber.run() => done,
        };

        // Log first, then close exactly once with the code/reason chosen for the outcome.
        let (code, reason) = match outcome {
            Err(Error::Transport(_)) => {
                tracing::info!("session terminated");
                (1, String::new())
            }
            Err(err) => {
                tracing::warn!(%err, "session error");
                (err.to_code(), err.to_string())
            }
            Ok(_) => {
                tracing::info!("session closed");
                (0, String::new())
            }
        };
        session.close(code, reason.as_str());
    });

    Ok(recv_bw_consumer)
}
/// Drains `SessionInfo` messages from the optional setup stream, discarding each one.
///
/// # Errors
///
/// - Propagates any error returned by `decode_maybe`.
/// - Returns `Error::Cancel` once `decode_maybe` yields `None` (presumably the peer
///   finished the stream — confirm against the reader's contract).
///
/// When `stream` is `None` there is nothing to read and `Ok(())` is returned right away.
async fn run_session<S: web_transport_trait::Session>(stream: Option<Stream<S, Version>>) -> Result<(), Error> {
    let Some(mut stream) = stream else {
        return Ok(());
    };

    loop {
        // Each decoded message is dropped; only the end of the stream matters here.
        let message = stream.reader.decode_maybe::<SessionInfo>().await?;
        if message.is_none() {
            return Err(Error::Cancel);
        }
    }
}