use std::cmp;
use bytes::{BufMut, Bytes};
use js_sys::Uint8Array;
use web_sys::WebTransportReceiveStream;
use crate::Error;
use web_streams::Reader;
/// Receive side of a stream, built from a `WebTransportReceiveStream`.
///
/// Chunks are pulled through a [`Reader`] and handed out as [`Bytes`]; any
/// part of a chunk that exceeds the caller's requested maximum is retained
/// in `buffer` and returned by later reads.
pub struct RecvStream {
/// Reader created from the underlying receive stream; yields `Uint8Array` chunks.
reader: Reader<Uint8Array>,
/// Leftover bytes from a previous chunk that did not fit in the caller's `max`.
buffer: Bytes,
}
impl RecvStream {
pub(super) fn new(stream: WebTransportReceiveStream) -> Result<Self, Error> {
let reader = Reader::new(&stream)?;
Ok(Self {
reader,
buffer: Bytes::new(),
})
}
pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
if !self.buffer.is_empty() {
let size = cmp::min(max, self.buffer.len());
let data = self.buffer.split_to(size);
return Ok(Some(data));
}
let mut data: Bytes = match self.reader.read().await? {
Some(data) => Bytes::from(data.to_vec()),
None => return Ok(None),
};
if data.len() > max {
self.buffer = data.split_off(max);
}
Ok(Some(data))
}
pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
let chunk = match self.read(buf.remaining_mut()).await? {
Some(chunk) => chunk,
None => return Ok(None),
};
let size = chunk.len();
buf.put(chunk);
Ok(Some(size))
}
pub fn stop(&mut self, reason: &str) {
self.reader.abort(reason);
}
pub async fn closed(&self) -> Result<Option<u8>, Error> {
let err = match self.reader.closed().await {
Ok(()) => return Ok(None),
Err(err) => Error::from(err),
};
if let Error::Stream(err) = &err {
if let Some(code) = err.stream_error_code() {
return Ok(Some(code));
}
}
Err(err)
}
}
impl Drop for RecvStream {
    /// Aborts the underlying reader with the reason `"dropped"` when the
    /// stream goes out of scope.
    fn drop(&mut self) {
        let reason = "dropped";
        self.reader.abort(reason);
    }
}