webtrans_wasm/
recv.rs

1use std::cmp;
2
3use bytes::{BufMut, Bytes};
4use js_sys::Uint8Array;
5use web_sys::WebTransportReceiveStream;
6
7use crate::Error;
8use web_streams::Reader;
9
10/// A byte stream received from the remote peer.
11///
12/// Either side may close with an error code, or the peer may close with a FIN.
13pub struct RecvStream {
14    reader: Reader<Uint8Array>,
15    buffer: Bytes,
16}
17
18impl RecvStream {
19    pub(super) fn new(stream: WebTransportReceiveStream) -> Result<Self, Error> {
20        let reader = Reader::new(&stream)?;
21
22        Ok(Self {
23            reader,
24            buffer: Bytes::new(),
25        })
26    }
27
28    /// Read the next chunk of data with the provided maximum size.
29    ///
30    /// This returns a chunk of data instead of copying, which can be more efficient.
31    pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
32        if !self.buffer.is_empty() {
33            let size = cmp::min(max, self.buffer.len());
34            let data = self.buffer.split_to(size);
35            return Ok(Some(data));
36        }
37
38        let mut data: Bytes = match self.reader.read().await? {
39            Some(data) => Bytes::from(data.to_vec()),
40            None => return Ok(None),
41        };
42
43        if data.len() > max {
44            // The chunk is too large; buffer the remainder for the next read.
45            self.buffer = data.split_off(max);
46        }
47
48        Ok(Some(data))
49    }
50
51    /// Read some data into the provided buffer.
52    ///
53    /// Returns the (non-zero) number of bytes read, or `None` if the stream is closed.
54    /// Advances the buffer by the number of bytes read.
55    pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
56        let chunk = match self.read(buf.remaining_mut()).await? {
57            Some(chunk) => chunk,
58            None => return Ok(None),
59        };
60
61        let size = chunk.len();
62        buf.put(chunk);
63
64        Ok(Some(size))
65    }
66
67    /// Abort reading from the stream with the given reason.
68    pub fn stop(&mut self, reason: &str) {
69        self.reader.abort(reason);
70    }
71
72    /// Block until the stream has closed and return the error code, if any.
73    pub async fn closed(&self) -> Result<Option<u8>, Error> {
74        let err = match self.reader.closed().await {
75            Ok(()) => return Ok(None),
76            Err(err) => Error::from(err),
77        };
78
79        // If this is a WebTransportError, extract the error code when available.
80        if let Error::Stream(err) = &err {
81            if let Some(code) = err.stream_error_code() {
82                return Ok(Some(code));
83            }
84        }
85
86        Err(err)
87    }
88}
89
90impl Drop for RecvStream {
91    fn drop(&mut self) {
92        self.reader.abort("dropped");
93    }
94}