memberlist_net/stream_layer/
tcp.rs1use std::{io, marker::PhantomData, net::SocketAddr};
2
3use agnostic::{
4 Runtime,
5 net::{Net, TcpListener as _, TcpStream as _},
6};
7use futures::{AsyncReadExt, AsyncWriteExt};
8use peekable::future::AsyncPeekable;
9
10use super::{Listener, PromisedStream, StreamLayer};
11
/// Plain TCP stream layer, generic over the async runtime `R`.
///
/// The struct holds nothing but a [`PhantomData`], so it is a zero-sized
/// marker: `R` is carried purely at the type level.
#[repr(transparent)]
pub struct Tcp<R>(PhantomData<R>);

impl<R> Tcp<R> {
    /// Creates a new TCP stream layer marker.
    ///
    /// Usable in `const` contexts.
    #[inline]
    pub const fn new() -> Self {
        Self(PhantomData)
    }
}

impl<R> Default for Tcp<R> {
    /// Same value as [`Tcp::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

// `Copy` and `Clone` are implemented by hand instead of derived: the derives
// would require `R: Copy` / `R: Clone`, while the marker itself never stores
// an `R` and can therefore always be duplicated.
impl<R> Copy for Tcp<R> {}

impl<R> Clone for Tcp<R> {
    /// Returns a bitwise copy of the marker.
    #[inline]
    fn clone(&self) -> Self {
        *self
    }
}
39
40impl<R: Runtime> StreamLayer for Tcp<R> {
41 type Runtime = R;
42 type Listener = TcpListener<R>;
43 type Stream = TcpStream<R>;
44 type Options = ();
45
46 #[inline]
47 async fn new(_: Self::Options) -> io::Result<Self> {
48 Ok(Self::default())
49 }
50
51 async fn connect(&self, addr: SocketAddr) -> io::Result<Self::Stream> {
52 <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::connect(addr)
53 .await
54 .and_then(|stream| {
55 let local_addr = stream.local_addr()?;
56 let (reader, writer) = stream.into_split();
57
58 Ok(TcpStream {
59 local_addr,
60 peer_addr: addr,
61 reader: AsyncPeekable::new(reader),
62 writer,
63 })
64 })
65 }
66
67 async fn bind(&self, addr: SocketAddr) -> io::Result<Self::Listener> {
68 <<R::Net as Net>::TcpListener as agnostic::net::TcpListener>::bind(addr)
69 .await
70 .and_then(|ln| {
71 ln.local_addr()
72 .map(|local_addr| TcpListener { ln, local_addr })
73 })
74 }
75
76 fn is_secure() -> bool {
77 false
78 }
79}
80
81pub struct TcpListener<R: Runtime> {
83 ln: <R::Net as Net>::TcpListener,
84 local_addr: SocketAddr,
85}
86
87impl<R: Runtime> Listener for TcpListener<R> {
88 type Stream = TcpStream<R>;
89
90 async fn accept(&self) -> io::Result<(Self::Stream, SocketAddr)> {
91 self.ln.accept().await.map(|(conn, addr)| {
92 let (reader, writer) = conn.into_split();
93
94 (
95 TcpStream {
96 writer,
97 reader: AsyncPeekable::new(reader),
98 local_addr: self.local_addr,
99 peer_addr: addr,
100 },
101 addr,
102 )
103 })
104 }
105
106 async fn shutdown(&self) -> io::Result<()> {
107 Ok(())
108 }
109
110 fn local_addr(&self) -> SocketAddr {
111 self.local_addr
112 }
113}
114
115#[pin_project::pin_project]
117pub struct TcpStream<R: Runtime> {
118 #[pin]
119 writer: <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedWriteHalf,
120 #[pin]
121 reader: AsyncPeekable<<<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedReadHalf>,
122 local_addr: SocketAddr,
123 peer_addr: SocketAddr,
124}
125
126impl<R: Runtime> memberlist_core::transport::Connection for TcpStream<R> {
127 type Reader =
128 AsyncPeekable<<<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedReadHalf>;
129
130 type Writer = <<R::Net as Net>::TcpStream as agnostic::net::TcpStream>::OwnedWriteHalf;
131
132 #[inline]
133 fn split(self) -> (Self::Reader, Self::Writer) {
134 (self.reader, self.writer)
135 }
136
137 async fn close(&mut self) -> std::io::Result<()> {
138 AsyncWriteExt::close(&mut self.writer).await
139 }
140
141 async fn write_all(&mut self, payload: &[u8]) -> std::io::Result<()> {
142 AsyncWriteExt::write_all(&mut self.writer, payload).await
143 }
144
145 async fn flush(&mut self) -> std::io::Result<()> {
146 AsyncWriteExt::flush(&mut self.writer).await
147 }
148
149 async fn peek(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
150 self.reader.peek(buf).await
151 }
152
153 async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
154 AsyncReadExt::read_exact(&mut self.reader, buf).await
155 }
156
157 async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
158 AsyncReadExt::read(&mut self.reader, buf).await
159 }
160
161 async fn peek_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
162 self.reader.peek_exact(buf).await
163 }
164
165 fn consume_peek(&mut self) {
166 self.reader.consume();
167 }
168}
169
170impl<R: Runtime> PromisedStream for TcpStream<R> {
171 type Instant = R::Instant;
172
173 #[inline]
174 fn local_addr(&self) -> SocketAddr {
175 self.local_addr
176 }
177
178 #[inline]
179 fn peer_addr(&self) -> SocketAddr {
180 self.peer_addr
181 }
182}