librqbit_utp/
stream.rs

1use std::{net::SocketAddr, pin::Pin, task::Poll};
2
3use tokio::io::{AsyncRead, AsyncWrite};
4
5use crate::{stream_rx::UtpStreamReadHalf, stream_tx::UtpStreamWriteHalf};
6
7pub struct UtpStream {
8    reader: UtpStreamReadHalf,
9    writer: UtpStreamWriteHalf,
10    remote_addr: SocketAddr,
11}
12
13impl UtpStream {
14    pub(crate) fn new(
15        reader: UtpStreamReadHalf,
16        writer: UtpStreamWriteHalf,
17        remote_addr: SocketAddr,
18    ) -> Self {
19        Self {
20            reader,
21            writer,
22            remote_addr,
23        }
24    }
25
26    pub fn remote_addr(&self) -> SocketAddr {
27        self.remote_addr
28    }
29
30    pub fn split(self) -> (UtpStreamReadHalf, UtpStreamWriteHalf) {
31        (self.reader, self.writer)
32    }
33
34    #[cfg(test)]
35    pub async fn read_all_available(&mut self) -> std::io::Result<Vec<u8>> {
36        self.reader.read_all_available().await
37    }
38}
39
40impl AsyncRead for UtpStream {
41    fn poll_read(
42        self: Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44        buf: &mut tokio::io::ReadBuf<'_>,
45    ) -> Poll<std::io::Result<()>> {
46        Pin::new(&mut self.get_mut().reader).poll_read(cx, buf)
47    }
48}
49
50impl AsyncWrite for UtpStream {
51    fn poll_write(
52        self: Pin<&mut Self>,
53        cx: &mut std::task::Context<'_>,
54        buf: &[u8],
55    ) -> Poll<Result<usize, std::io::Error>> {
56        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
57    }
58
59    fn poll_flush(
60        self: Pin<&mut Self>,
61        cx: &mut std::task::Context<'_>,
62    ) -> Poll<Result<(), std::io::Error>> {
63        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
64    }
65
66    fn poll_shutdown(
67        self: Pin<&mut Self>,
68        cx: &mut std::task::Context<'_>,
69    ) -> Poll<Result<(), std::io::Error>> {
70        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
71    }
72}