compio_net/
tcp.rs
1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_io::{AsyncRead, AsyncWrite};
6use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
7
8use crate::{
9 OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, ToSocketAddrsAsync, WriteHalf,
10};
11
/// A TCP listening socket.
///
/// This is a thin wrapper around the crate's [`Socket`]; every method on
/// this type forwards to that inner socket.
///
/// NOTE(review): `Clone` is derived, so cloning clones the inner [`Socket`];
/// whether the clones share one OS handle depends on `Socket`'s own `Clone`
/// implementation — confirm there.
#[derive(Debug, Clone)]
pub struct TcpListener {
    /// The underlying socket all operations are delegated to.
    inner: Socket,
}
48
49impl TcpListener {
50 pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
58 super::each_addr(addr, |addr| async move {
59 let socket =
60 Socket::bind(&SockAddr::from(addr), Type::STREAM, Some(Protocol::TCP)).await?;
61 socket.listen(128)?;
62 Ok(Self { inner: socket })
63 })
64 .await
65 }
66
67 pub fn close(self) -> impl Future<Output = io::Result<()>> {
70 self.inner.close()
71 }
72
73 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
79 let (socket, addr) = self.inner.accept().await?;
80 let stream = TcpStream { inner: socket };
81 Ok((stream, addr.as_socket().expect("should be SocketAddr")))
82 }
83
84 pub fn local_addr(&self) -> io::Result<SocketAddr> {
108 self.inner
109 .local_addr()
110 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
111 }
112}
113
// NOTE(review): `impl_raw_fd!` comes from `compio_driver`; presumably it
// generates the raw fd/socket conversion traits for `TcpListener` by
// forwarding to the `inner` field — confirm against the macro definition.
impl_raw_fd!(TcpListener, socket2::Socket, inner, socket);
115
/// A TCP stream between a local and a remote socket.
///
/// Instances are produced by [`TcpStream::connect`], by
/// [`TcpListener::accept`], or by converting a standard library stream with
/// [`TcpStream::from_std`]. All I/O is forwarded to the wrapped [`Socket`].
///
/// NOTE(review): `Clone` is derived, so cloning clones the inner [`Socket`];
/// whether the clones share one OS handle depends on `Socket`'s own `Clone`
/// implementation — confirm there.
#[derive(Debug, Clone)]
pub struct TcpStream {
    /// The underlying socket all operations are delegated to.
    inner: Socket,
}
141
142impl TcpStream {
143 pub async fn connect(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
145 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
146
147 super::each_addr(addr, |addr| async move {
148 let addr2 = SockAddr::from(addr);
149 let socket = if cfg!(windows) {
150 let bind_addr = if addr.is_ipv4() {
151 SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
152 } else if addr.is_ipv6() {
153 SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
154 } else {
155 return Err(io::Error::new(
156 io::ErrorKind::AddrNotAvailable,
157 "Unsupported address domain.",
158 ));
159 };
160 Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
161 } else {
162 Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?
163 };
164 socket.connect_async(&addr2).await?;
165 Ok(Self { inner: socket })
166 })
167 .await
168 }
169
170 pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
172 Ok(Self {
173 inner: Socket::from_socket2(Socket2::from(stream))?,
174 })
175 }
176
177 pub fn close(self) -> impl Future<Output = io::Result<()>> {
180 self.inner.close()
181 }
182
183 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
185 self.inner
186 .peer_addr()
187 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
188 }
189
190 pub fn local_addr(&self) -> io::Result<SocketAddr> {
192 self.inner
193 .local_addr()
194 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
195 }
196
197 pub fn split(&self) -> (ReadHalf<Self>, WriteHalf<Self>) {
204 crate::split(self)
205 }
206
207 pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
213 crate::into_split(self)
214 }
215
216 pub fn to_poll_fd(&self) -> io::Result<PollFd<Socket2>> {
218 self.inner.to_poll_fd()
219 }
220
221 pub fn into_poll_fd(self) -> io::Result<PollFd<Socket2>> {
223 self.inner.into_poll_fd()
224 }
225}
226
227impl AsyncRead for TcpStream {
228 #[inline]
229 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
230 (&*self).read(buf).await
231 }
232
233 #[inline]
234 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
235 (&*self).read_vectored(buf).await
236 }
237}
238
239impl AsyncRead for &TcpStream {
240 #[inline]
241 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
242 self.inner.recv(buf).await
243 }
244
245 #[inline]
246 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
247 self.inner.recv_vectored(buf).await
248 }
249}
250
251impl AsyncWrite for TcpStream {
252 #[inline]
253 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
254 (&*self).write(buf).await
255 }
256
257 #[inline]
258 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
259 (&*self).write_vectored(buf).await
260 }
261
262 #[inline]
263 async fn flush(&mut self) -> io::Result<()> {
264 (&*self).flush().await
265 }
266
267 #[inline]
268 async fn shutdown(&mut self) -> io::Result<()> {
269 (&*self).shutdown().await
270 }
271}
272
273impl AsyncWrite for &TcpStream {
274 #[inline]
275 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
276 self.inner.send(buf).await
277 }
278
279 #[inline]
280 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
281 self.inner.send_vectored(buf).await
282 }
283
284 #[inline]
285 async fn flush(&mut self) -> io::Result<()> {
286 Ok(())
287 }
288
289 #[inline]
290 async fn shutdown(&mut self) -> io::Result<()> {
291 self.inner.shutdown().await
292 }
293}
294
// NOTE(review): `impl_raw_fd!` comes from `compio_driver`; presumably it
// generates the raw fd/socket conversion traits for `TcpStream` by
// forwarding to the `inner` field — confirm against the macro definition.
impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);