1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite, util::Splittable};
6use compio_runtime::{BorrowedBuffer, BufferPool};
7use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
8
9use crate::{
10 OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, TcpOpts, ToSocketAddrsAsync, WriteHalf,
11};
12
13#[derive(Debug, Clone)]
46pub struct TcpListener {
47 inner: Socket,
48}
49
50impl TcpListener {
51 pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
59 Self::bind_with_options(addr, TcpOpts::default().reuse_address(true)).await
60 }
61
62 pub async fn bind_with_options(
70 addr: impl ToSocketAddrsAsync,
71 options: TcpOpts,
72 ) -> io::Result<Self> {
73 super::each_addr(addr, |addr| async move {
74 let sa = SockAddr::from(addr);
75 let socket = Socket::new(sa.domain(), Type::STREAM, Some(Protocol::TCP)).await?;
76 options.setup_socket(&socket)?;
77 socket.socket.bind(&sa)?;
78 socket.listen(128)?;
79 Ok(Self { inner: socket })
80 })
81 .await
82 }
83
84 pub fn from_std(stream: std::net::TcpListener) -> io::Result<Self> {
86 Ok(Self {
87 inner: Socket::from_socket2(Socket2::from(stream))?,
88 })
89 }
90
91 pub fn close(self) -> impl Future<Output = io::Result<()>> {
94 self.inner.close()
95 }
96
97 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
103 let (socket, addr) = self.inner.accept().await?;
104 let stream = TcpStream { inner: socket };
105 Ok((stream, addr.as_socket().expect("should be SocketAddr")))
106 }
107
108 pub fn local_addr(&self) -> io::Result<SocketAddr> {
132 self.inner
133 .local_addr()
134 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
135 }
136}
137
// NOTE(review): `impl_raw_fd!` comes from `compio_driver`; presumably it
// generates the raw fd/handle trait impls for `TcpListener`, reaching the
// `socket2::Socket` through `inner.socket` — confirm against the macro.
impl_raw_fd!(TcpListener, socket2::Socket, inner, socket);
139
140#[derive(Debug, Clone)]
162pub struct TcpStream {
163 inner: Socket,
164}
165
166impl TcpStream {
167 pub async fn connect(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
169 Self::connect_base(addr, None).await
170 }
171
172 pub async fn connect_with_options(
174 addr: impl ToSocketAddrsAsync,
175 options: TcpOpts,
176 ) -> io::Result<Self> {
177 Self::connect_base(addr, Some(options)).await
178 }
179
180 async fn connect_base(
181 addr: impl ToSocketAddrsAsync,
182 options: Option<TcpOpts>,
183 ) -> io::Result<Self> {
184 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
185
186 super::each_addr(addr, |addr| async move {
187 let addr2 = SockAddr::from(addr);
188 let socket = if cfg!(windows) {
189 let bind_addr = if addr.is_ipv4() {
190 SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
191 } else if addr.is_ipv6() {
192 SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
193 } else {
194 return Err(io::Error::new(
195 io::ErrorKind::AddrNotAvailable,
196 "Unsupported address domain.",
197 ));
198 };
199 Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
200 } else {
201 Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?
202 };
203 if let Some(options) = &options {
204 options.setup_socket(&socket)?;
205 }
206 socket.connect_async(&addr2).await?;
207 Ok(Self { inner: socket })
208 })
209 .await
210 }
211
212 pub async fn bind_and_connect(
214 bind_addr: SocketAddr,
215 addr: impl ToSocketAddrsAsync,
216 ) -> io::Result<Self> {
217 Self::bind_and_connect_base(bind_addr, addr, None).await
218 }
219
220 pub async fn bind_and_connect_with_options(
223 bind_addr: SocketAddr,
224 addr: impl ToSocketAddrsAsync,
225 options: TcpOpts,
226 ) -> io::Result<Self> {
227 Self::bind_and_connect_base(bind_addr, addr, Some(options)).await
228 }
229
230 async fn bind_and_connect_base(
231 bind_addr: SocketAddr,
232 addr: impl ToSocketAddrsAsync,
233 options: Option<TcpOpts>,
234 ) -> io::Result<Self> {
235 let options = options.unwrap_or_default();
236 super::each_addr(addr, |addr| async move {
237 let addr = SockAddr::from(addr);
238 let bind_addr = SockAddr::from(bind_addr);
239
240 let socket = Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?;
241 options.setup_socket(&socket)?;
242 socket.connect_async(&addr).await?;
243 Ok(Self { inner: socket })
244 })
245 .await
246 }
247
248 pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
250 Ok(Self {
251 inner: Socket::from_socket2(Socket2::from(stream))?,
252 })
253 }
254
255 pub fn close(self) -> impl Future<Output = io::Result<()>> {
258 self.inner.close()
259 }
260
261 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
263 self.inner
264 .peer_addr()
265 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
266 }
267
268 pub fn local_addr(&self) -> io::Result<SocketAddr> {
270 self.inner
271 .local_addr()
272 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
273 }
274
275 pub fn split(&self) -> (ReadHalf<'_, Self>, WriteHalf<'_, Self>) {
282 crate::split(self)
283 }
284
285 pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
291 crate::into_split(self)
292 }
293
294 pub fn to_poll_fd(&self) -> io::Result<PollFd<Socket2>> {
296 self.inner.to_poll_fd()
297 }
298
299 pub fn into_poll_fd(self) -> io::Result<PollFd<Socket2>> {
301 self.inner.into_poll_fd()
302 }
303
304 pub fn nodelay(&self) -> io::Result<bool> {
309 self.inner.socket.tcp_nodelay()
310 }
311
312 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
320 self.inner.socket.set_tcp_nodelay(nodelay)
321 }
322}
323
324impl AsyncRead for TcpStream {
325 #[inline]
326 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
327 (&*self).read(buf).await
328 }
329
330 #[inline]
331 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
332 (&*self).read_vectored(buf).await
333 }
334}
335
336impl AsyncRead for &TcpStream {
337 #[inline]
338 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
339 self.inner.recv(buf).await
340 }
341
342 #[inline]
343 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
344 self.inner.recv_vectored(buf).await
345 }
346}
347
348impl AsyncReadManaged for TcpStream {
349 type Buffer<'a> = BorrowedBuffer<'a>;
350 type BufferPool = BufferPool;
351
352 async fn read_managed<'a>(
353 &mut self,
354 buffer_pool: &'a Self::BufferPool,
355 len: usize,
356 ) -> io::Result<Self::Buffer<'a>> {
357 (&*self).read_managed(buffer_pool, len).await
358 }
359}
360
361impl AsyncReadManaged for &TcpStream {
362 type Buffer<'a> = BorrowedBuffer<'a>;
363 type BufferPool = BufferPool;
364
365 async fn read_managed<'a>(
366 &mut self,
367 buffer_pool: &'a Self::BufferPool,
368 len: usize,
369 ) -> io::Result<Self::Buffer<'a>> {
370 self.inner.recv_managed(buffer_pool, len as _).await
371 }
372}
373
374impl AsyncWrite for TcpStream {
375 #[inline]
376 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
377 (&*self).write(buf).await
378 }
379
380 #[inline]
381 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
382 (&*self).write_vectored(buf).await
383 }
384
385 #[inline]
386 async fn flush(&mut self) -> io::Result<()> {
387 (&*self).flush().await
388 }
389
390 #[inline]
391 async fn shutdown(&mut self) -> io::Result<()> {
392 (&*self).shutdown().await
393 }
394}
395
396impl AsyncWrite for &TcpStream {
397 #[inline]
398 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
399 self.inner.send(buf).await
400 }
401
402 #[inline]
403 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
404 self.inner.send_vectored(buf).await
405 }
406
407 #[inline]
408 async fn flush(&mut self) -> io::Result<()> {
409 Ok(())
410 }
411
412 #[inline]
413 async fn shutdown(&mut self) -> io::Result<()> {
414 self.inner.shutdown().await
415 }
416}
417
418impl Splittable for TcpStream {
419 type ReadHalf = OwnedReadHalf<Self>;
420 type WriteHalf = OwnedWriteHalf<Self>;
421
422 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
423 crate::into_split(self)
424 }
425}
426
427impl<'a> Splittable for &'a TcpStream {
428 type ReadHalf = ReadHalf<'a, TcpStream>;
429 type WriteHalf = WriteHalf<'a, TcpStream>;
430
431 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
432 crate::split(self)
433 }
434}
435
// NOTE(review): `impl_raw_fd!` comes from `compio_driver`; presumably it
// generates the raw fd/handle trait impls for `TcpStream`, reaching the
// `socket2::Socket` through `inner.socket` — confirm against the macro.
impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);