1use std::cmp;
2
3use bytes::{BufMut, Bytes};
4use js_sys::Uint8Array;
5use web_sys::WebTransportReceiveStream;
6
7use crate::Error;
8use web_streams::Reader;
9
10pub struct RecvStream {
14 reader: Reader<Uint8Array>,
15 buffer: Bytes,
16}
17
18impl RecvStream {
19 pub(super) fn new(stream: WebTransportReceiveStream) -> Result<Self, Error> {
20 let reader = Reader::new(&stream)?;
21
22 Ok(Self {
23 reader,
24 buffer: Bytes::new(),
25 })
26 }
27
28 pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
32 if !self.buffer.is_empty() {
33 let size = cmp::min(max, self.buffer.len());
34 let data = self.buffer.split_to(size);
35 return Ok(Some(data));
36 }
37
38 let mut data: Bytes = match self.reader.read().await? {
39 Some(data) => Bytes::from(data.to_vec()),
40 None => return Ok(None),
41 };
42
43 if data.len() > max {
44 self.buffer = data.split_off(max);
46 }
47
48 Ok(Some(data))
49 }
50
51 pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
56 let chunk = match self.read(buf.remaining_mut()).await? {
57 Some(chunk) => chunk,
58 None => return Ok(None),
59 };
60
61 let size = chunk.len();
62 buf.put(chunk);
63
64 Ok(Some(size))
65 }
66
67 pub fn stop(&mut self, reason: &str) {
69 self.reader.abort(reason);
70 }
71
72 pub async fn closed(&self) -> Result<Option<u8>, Error> {
74 let err = match self.reader.closed().await {
75 Ok(()) => return Ok(None),
76 Err(err) => Error::from(err),
77 };
78
79 if let Error::Stream(err) = &err {
81 if let Some(code) = err.stream_error_code() {
82 return Ok(Some(code));
83 }
84 }
85
86 Err(err)
87 }
88}
89
90impl Drop for RecvStream {
91 fn drop(&mut self) {
92 self.reader.abort("dropped");
93 }
94}