tokio_uring/net/tcp/
stream.rs

1use std::{
2    io,
3    net::SocketAddr,
4    os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
5};
6
7use crate::{
8    buf::fixed::FixedBuf,
9    buf::{BoundedBuf, BoundedBufMut},
10    io::{SharedFd, Socket},
11    UnsubmittedWrite,
12};
13
14/// A TCP stream between a local and a remote socket.
15///
16/// A TCP stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [`accepting`] a connection from a [`listener`].
18///
19/// # Examples
20///
21/// ```no_run
22/// use tokio_uring::net::TcpStream;
23/// use std::net::ToSocketAddrs;
24///
25/// fn main() -> std::io::Result<()> {
26///     tokio_uring::start(async {
27///         // Connect to a peer
28///         let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?;
29///
30///         // Write some data.
31///         let (result, _) = stream.write(b"hello world!".as_slice()).submit().await;
32///         result.unwrap();
33///
34///         Ok(())
35///     })
36/// }
37/// ```
38///
39/// [`connect`]: TcpStream::connect
40/// [`accepting`]: crate::net::TcpListener::accept
41/// [`listener`]: crate::net::TcpListener
42pub struct TcpStream {
43    pub(super) inner: Socket,
44}
45
46impl TcpStream {
47    /// Opens a TCP connection to a remote host at the given `SocketAddr`
48    pub async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
49        let socket = Socket::new(addr, libc::SOCK_STREAM)?;
50        socket.connect(socket2::SockAddr::from(addr)).await?;
51        let tcp_stream = TcpStream { inner: socket };
52        Ok(tcp_stream)
53    }
54
55    /// Creates new `TcpStream` from a previously bound `std::net::TcpStream`.
56    ///
57    /// This function is intended to be used to wrap a TCP stream from the
58    /// standard library in the tokio-uring equivalent. The conversion assumes nothing
59    /// about the underlying socket; it is left up to the user to decide what socket
60    /// options are appropriate for their use case.
61    ///
62    /// This can be used in conjunction with socket2's `Socket` interface to
63    /// configure a socket before it's handed off, such as setting options like
64    /// `reuse_address` or binding to multiple addresses.
65    pub fn from_std(socket: std::net::TcpStream) -> Self {
66        let inner = Socket::from_std(socket);
67        Self { inner }
68    }
69
70    pub(crate) fn from_socket(inner: Socket) -> Self {
71        Self { inner }
72    }
73
74    /// Read some data from the stream into the buffer.
75    ///
76    /// Returns the original buffer and quantity of data read.
77    pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
78        self.inner.read(buf).await
79    }
80
81    /// Read some data from the stream into a registered buffer.
82    ///
83    /// Like [`read`], but using a pre-mapped buffer
84    /// registered with [`FixedBufRegistry`].
85    ///
86    /// [`read`]: Self::read
87    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
88    ///
89    /// # Errors
90    ///
91    /// In addition to errors that can be reported by `read`,
92    /// this operation fails if the buffer is not registered in the
93    /// current `tokio-uring` runtime.
94    pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
95    where
96        T: BoundedBufMut<BufMut = FixedBuf>,
97    {
98        self.inner.read_fixed(buf).await
99    }
100
101    /// Write some data to the stream from the buffer.
102    ///
103    /// Returns the original buffer and quantity of data written.
104    pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
105        self.inner.write(buf)
106    }
107
108    /// Attempts to write an entire buffer to the stream.
109    ///
110    /// This method will continuously call [`write`] until there is no more data to be
111    /// written or an error is returned. This method will not return until the entire
112    /// buffer has been successfully written or an error has occurred.
113    ///
114    /// If the buffer contains no data, this will never call [`write`].
115    ///
116    /// # Errors
117    ///
118    /// This function will return the first error that [`write`] returns.
119    ///
120    /// # Examples
121    ///
122    /// ```no_run
123    /// use std::net::SocketAddr;
124    /// use tokio_uring::net::TcpListener;
125    /// use tokio_uring::buf::BoundedBuf;
126    ///
127    /// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
128    ///
129    /// tokio_uring::start(async {
130    ///     let listener = TcpListener::bind(addr).unwrap();
131    ///
132    ///     println!("Listening on {}", listener.local_addr().unwrap());
133    ///
134    ///     loop {
135    ///         let (stream, _) = listener.accept().await.unwrap();
136    ///         tokio_uring::spawn(async move {
137    ///             let mut n = 0;
138    ///             let mut buf = vec![0u8; 4096];
139    ///             loop {
140    ///                 let (result, nbuf) = stream.read(buf).await;
141    ///                 buf = nbuf;
142    ///                 let read = result.unwrap();
143    ///                 if read == 0 {
144    ///                     break;
145    ///                 }
146    ///
147    ///                 let (res, slice) = stream.write_all(buf.slice(..read)).await;
148    ///                 let _ = res.unwrap();
149    ///                 buf = slice.into_inner();
150    ///                 n += read;
151    ///             }
152    ///         });
153    ///     }
154    /// });
155    /// ```
156    ///
157    /// [`write`]: Self::write
158    pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
159        self.inner.write_all(buf).await
160    }
161
162    /// Writes data into the socket from a registered buffer.
163    ///
164    /// Like [`write`], but using a pre-mapped buffer
165    /// registered with [`FixedBufRegistry`].
166    ///
167    /// [`write`]: Self::write
168    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
169    ///
170    /// # Errors
171    ///
172    /// In addition to errors that can be reported by `write`,
173    /// this operation fails if the buffer is not registered in the
174    /// current `tokio-uring` runtime.
175    pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
176    where
177        T: BoundedBuf<Buf = FixedBuf>,
178    {
179        self.inner.write_fixed(buf).await
180    }
181
182    /// Attempts to write an entire buffer to the stream.
183    ///
184    /// This method will continuously call [`write_fixed`] until there is no more data to be
185    /// written or an error is returned. This method will not return until the entire
186    /// buffer has been successfully written or an error has occurred.
187    ///
188    /// If the buffer contains no data, this will never call [`write_fixed`].
189    ///
190    /// # Errors
191    ///
192    /// This function will return the first error that [`write_fixed`] returns.
193    ///
194    /// [`write_fixed`]: Self::write_fixed
195    pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
196    where
197        T: BoundedBuf<Buf = FixedBuf>,
198    {
199        self.inner.write_fixed_all(buf).await
200    }
201
202    /// Writes data from multiple buffers into this socket using the scatter/gather IO style.
203    ///
204    /// This function will attempt to write the entire contents of `bufs`, but
205    /// the entire write may not succeed, or the write may also generate an
206    /// error. The bytes will be written starting at the specified offset.
207    ///
208    /// # Return
209    ///
210    /// The method returns the operation result and the same array of buffers
211    /// passed in as an argument. A return value of `0` typically means that the
212    /// underlying socket is no longer able to accept bytes and will likely not
213    /// be able to in the future as well, or that the buffer provided is empty.
214    ///
215    /// # Errors
216    ///
217    /// Each call to `write` may generate an I/O error indicating that the
218    /// operation could not be completed. If an error is returned then no bytes
219    /// in the buffer were written to this writer.
220    ///
221    /// It is **not** considered an error if the entire buffer could not be
222    /// written to this writer.
223    ///
224    /// [`Ok(n)`]: Ok
225    pub async fn writev<T: BoundedBuf>(&self, buf: Vec<T>) -> crate::BufResult<usize, Vec<T>> {
226        self.inner.writev(buf).await
227    }
228
229    /// Shuts down the read, write, or both halves of this connection.
230    ///
231    /// This function will cause all pending and future I/O on the specified portions to return
232    /// immediately with an appropriate value.
233    pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
234        self.inner.shutdown(how)
235    }
236
237    /// Sets the value of the TCP_NODELAY option on this socket.
238    ///
239    /// If set, this option disables the Nagle algorithm. This means that segments are always sent
240    /// as soon as possible, even if there is only a small amount of data. When not set, data is
241    /// buffered until there is a sufficient amount to send out, thereby avoiding the frequent
242    /// sending of small packets.
243    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
244        self.inner.set_nodelay(nodelay)
245    }
246}
247
248impl FromRawFd for TcpStream {
249    unsafe fn from_raw_fd(fd: RawFd) -> Self {
250        TcpStream::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
251    }
252}
253
254impl AsRawFd for TcpStream {
255    fn as_raw_fd(&self) -> RawFd {
256        self.inner.as_raw_fd()
257    }
258}