use tracing::debug;
use wtransport_proto::bytes::IoReadError;
use wtransport_proto::capsule::capsules;
use wtransport_proto::capsule::Capsule;
use wtransport_proto::capsule::{self};
use wtransport_proto::error::ErrorCode;
use wtransport_proto::varint::VarInt;
use super::session::StreamSession;
use super::ProtoReadError;
use crate::driver::DriverError;
use crate::error::ApplicationClose;
use std::future::pending;
pub struct ConnectStream {
stream: Option<StreamSession>,
}
impl ConnectStream {
pub fn empty() -> Self {
Self { stream: None }
}
pub fn is_empty(&self) -> bool {
self.stream.is_none()
}
pub fn set_stream(&mut self, stream: StreamSession) {
self.stream = Some(stream);
}
pub async fn run(&mut self) -> DriverError {
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => pending().await,
};
loop {
return match stream.read_frame().await {
Ok(frame) => {
let capsule = match Capsule::with_frame(&frame) {
Some(capsule)
if matches!(
capsule.kind(),
capsule::CapsuleKind::CloseWebTransportSession
) =>
{
capsule
}
_ => {
debug!(
"Unknown capsule of kind {:?} of {}B",
frame.kind(),
frame.payload().len()
);
continue;
}
};
let close_session =
match capsules::CloseWebTransportSession::with_capsule(&capsule) {
Ok(close_session) => close_session,
Err(error_code) => return DriverError::Proto(error_code),
};
self.stream
.take()
.unwrap()
.reset(ErrorCode::NoError.to_code());
DriverError::ApplicationClosed(ApplicationClose::new(
close_session.error_code(),
close_session
.reason()
.as_bytes()
.to_vec()
.into_boxed_slice(),
))
}
Err(ProtoReadError::H3(error_code)) => DriverError::Proto(error_code),
Err(ProtoReadError::IO(io_error)) => match io_error {
IoReadError::ImmediateFin => DriverError::ApplicationClosed(
ApplicationClose::new(VarInt::from_u32(0), Box::new([])),
),
IoReadError::UnexpectedFin | IoReadError::Reset => {
DriverError::Proto(ErrorCode::ClosedCriticalStream)
}
IoReadError::NotConnected => DriverError::NotConnected,
},
};
}
}
}