memberlist_net/stream_layer/
tcp.rs

1use std::{io, marker::PhantomData, net::SocketAddr};
2
3use agnostic::{
4  Runtime,
5  net::{Net, TcpListener as _, TcpStream as _},
6};
7use futures::{AsyncReadExt, AsyncWriteExt};
8use peekable::future::AsyncPeekable;
9
10use super::{Listener, PromisedStream, StreamLayer};
11
/// Stream layer that runs over plain TCP.
///
/// The value itself carries no data: the type parameter `R` only records
/// which runtime the layer is instantiated for, so the single field is a
/// [`PhantomData`] marker.
#[repr(transparent)]
pub struct Tcp<R>(PhantomData<R>);

// `Clone`, `Copy` and `Default` are implemented by hand rather than derived
// so that they are available for every `R`; the derives would add
// `R: Clone` / `R: Copy` / `R: Default` bounds.

impl<R> Copy for Tcp<R> {}

impl<R> Clone for Tcp<R> {
  /// Returns another marker value; there is no state to duplicate.
  #[inline]
  fn clone(&self) -> Self {
    Self(self.0)
  }
}

impl<R> Default for Tcp<R> {
  /// Equivalent to [`Tcp::new`].
  #[inline]
  fn default() -> Self {
    Self::new()
  }
}

impl<R> Tcp<R> {
  /// Builds the (zero-sized) TCP stream layer marker.
  ///
  /// Usable in `const` contexts.
  #[inline]
  pub const fn new() -> Self {
    Tcp(PhantomData)
  }
}
39
40impl<R: Runtime> StreamLayer for Tcp<R> {
41  type Runtime = R;
42  type Listener = TcpListener<R>;
43  type Stream = TcpStream<R>;
44  type Options = ();
45
46  #[inline]
47  async fn new(_: Self::Options) -> io::Result<Self> {
48    Ok(Self::default())
49  }
50
51  async fn connect(&self, addr: SocketAddr) -> io::Result<Self::Stream> {
52    <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::connect(addr)
53      .await
54      .and_then(|stream| {
55        let local_addr = stream.local_addr()?;
56        let (reader, writer) = stream.into_split();
57
58        Ok(TcpStream {
59          local_addr,
60          peer_addr: addr,
61          reader: AsyncPeekable::new(reader),
62          writer,
63        })
64      })
65  }
66
67  async fn bind(&self, addr: SocketAddr) -> io::Result<Self::Listener> {
68    <<R::Net as Net>::TcpListener as agnostic::net::TcpListener>::bind(addr)
69      .await
70      .and_then(|ln| {
71        ln.local_addr()
72          .map(|local_addr| TcpListener { ln, local_addr })
73      })
74  }
75
76  fn is_secure() -> bool {
77    false
78  }
79}
80
81/// [`Listener`] of the TCP stream layer
82pub struct TcpListener<R: Runtime> {
83  ln: <R::Net as Net>::TcpListener,
84  local_addr: SocketAddr,
85}
86
87impl<R: Runtime> Listener for TcpListener<R> {
88  type Stream = TcpStream<R>;
89
90  async fn accept(&self) -> io::Result<(Self::Stream, SocketAddr)> {
91    self.ln.accept().await.map(|(conn, addr)| {
92      let (reader, writer) = conn.into_split();
93
94      (
95        TcpStream {
96          writer,
97          reader: AsyncPeekable::new(reader),
98          local_addr: self.local_addr,
99          peer_addr: addr,
100        },
101        addr,
102      )
103    })
104  }
105
106  async fn shutdown(&self) -> io::Result<()> {
107    Ok(())
108  }
109
110  fn local_addr(&self) -> SocketAddr {
111    self.local_addr
112  }
113}
114
115/// [`PromisedStream`] of the TCP stream layer
116#[pin_project::pin_project]
117pub struct TcpStream<R: Runtime> {
118  #[pin]
119  writer: <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedWriteHalf,
120  #[pin]
121  reader: AsyncPeekable<<<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedReadHalf>,
122  local_addr: SocketAddr,
123  peer_addr: SocketAddr,
124}
125
126impl<R: Runtime> memberlist_core::transport::Connection for TcpStream<R> {
127  type Reader =
128    AsyncPeekable<<<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedReadHalf>;
129
130  type Writer = <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedWriteHalf;
131
132  #[inline]
133  fn split(self) -> (Self::Reader, Self::Writer) {
134    (self.reader, self.writer)
135  }
136
137  async fn close(&mut self) -> std::io::Result<()> {
138    AsyncWriteExt::close(&mut self.writer).await
139  }
140
141  async fn write_all(&mut self, payload: &[u8]) -> std::io::Result<()> {
142    AsyncWriteExt::write_all(&mut self.writer, payload).await
143  }
144
145  async fn flush(&mut self) -> std::io::Result<()> {
146    AsyncWriteExt::flush(&mut self.writer).await
147  }
148
149  async fn peek(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
150    self.reader.peek(buf).await
151  }
152
153  async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
154    AsyncReadExt::read_exact(&mut self.reader, buf).await
155  }
156
157  async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
158    AsyncReadExt::read(&mut self.reader, buf).await
159  }
160
161  async fn peek_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
162    self.reader.peek_exact(buf).await
163  }
164
165  fn consume_peek(&mut self) {
166    self.reader.consume();
167  }
168}
169
170impl<R: Runtime> PromisedStream for TcpStream<R> {
171  type Instant = R::Instant;
172
173  #[inline]
174  fn local_addr(&self) -> SocketAddr {
175    self.local_addr
176  }
177
178  #[inline]
179  fn peer_addr(&self) -> SocketAddr {
180    self.peer_addr
181  }
182}