use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use web_transport_trait::Stats;
use crate::{BandwidthConsumer, BandwidthProducer, Error, Version};
#[derive(Clone)]
pub struct Session {
session: Arc<dyn SessionInner>,
version: Version,
send_bandwidth: Option<BandwidthConsumer>,
recv_bandwidth: Option<BandwidthConsumer>,
closed: bool,
}
impl Session {
pub(super) fn new<S: web_transport_trait::Session>(
session: S,
version: Version,
recv_bandwidth: Option<BandwidthConsumer>,
) -> Self {
let send_bandwidth = if session.stats().estimated_send_rate().is_some() {
let producer = BandwidthProducer::new();
let consumer = producer.consume();
let session = session.clone();
web_async::spawn(async move {
run_send_bandwidth(&session, producer).await;
});
Some(consumer)
} else {
None
};
Self {
session: Arc::new(session),
version,
send_bandwidth,
recv_bandwidth,
closed: false,
}
}
pub fn version(&self) -> Version {
self.version
}
pub fn send_bandwidth(&self) -> Option<BandwidthConsumer> {
self.send_bandwidth.clone()
}
pub fn recv_bandwidth(&self) -> Option<BandwidthConsumer> {
self.recv_bandwidth.clone()
}
pub fn close(&mut self, err: Error) {
if self.closed {
return;
}
self.closed = true;
self.session.close(err.to_code(), err.to_string().as_ref());
}
pub async fn closed(&self) -> Result<(), Error> {
let err = self.session.closed().await;
Err(Error::Transport(err))
}
}
impl Drop for Session {
fn drop(&mut self) {
if !self.closed {
self.session.close(Error::Cancel.to_code(), "dropped");
}
}
}
async fn run_send_bandwidth<S: web_transport_trait::Session>(session: &S, producer: BandwidthProducer) {
const POLL_INTERVAL: Duration = Duration::from_millis(100);
loop {
tokio::select! {
biased;
_ = producer.closed() => return,
res = producer.used() => {
if res.is_err() {
return;
}
}
}
let mut interval = tokio::time::interval(POLL_INTERVAL);
loop {
tokio::select! {
biased;
_ = producer.closed() => return,
res = producer.unused() => {
if res.is_err() {
return;
}
break;
}
_ = interval.tick() => {
let bitrate = session.stats().estimated_send_rate();
if producer.set(bitrate).is_err() {
return;
}
}
}
}
}
}
trait SessionInner: Send + Sync {
fn close(&self, code: u32, reason: &str);
fn closed(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>>;
}
impl<S: web_transport_trait::Session> SessionInner for S {
fn close(&self, code: u32, reason: &str) {
S::close(self, code, reason);
}
fn closed(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>> {
Box::pin(async move { S::closed(self).await.to_string() })
}
}