tokio_uring/net/tcp/stream.rs
1use std::{
2 io,
3 net::SocketAddr,
4 os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
5};
6
7use crate::{
8 buf::fixed::FixedBuf,
9 buf::{BoundedBuf, BoundedBufMut},
10 io::{SharedFd, Socket},
11 UnsubmittedWrite,
12};
13
14/// A TCP stream between a local and a remote socket.
15///
16/// A TCP stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [`accepting`] a connection from a [`listener`].
18///
19/// # Examples
20///
21/// ```no_run
22/// use tokio_uring::net::TcpStream;
23/// use std::net::ToSocketAddrs;
24///
25/// fn main() -> std::io::Result<()> {
26/// tokio_uring::start(async {
27/// // Connect to a peer
28/// let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?;
29///
30/// // Write some data.
31/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await;
32/// result.unwrap();
33///
34/// Ok(())
35/// })
36/// }
37/// ```
38///
39/// [`connect`]: TcpStream::connect
40/// [`accepting`]: crate::net::TcpListener::accept
41/// [`listener`]: crate::net::TcpListener
42pub struct TcpStream {
43 pub(super) inner: Socket,
44}
45
46impl TcpStream {
47 /// Opens a TCP connection to a remote host at the given `SocketAddr`
48 pub async fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
49 let socket = Socket::new(addr, libc::SOCK_STREAM)?;
50 socket.connect(socket2::SockAddr::from(addr)).await?;
51 let tcp_stream = TcpStream { inner: socket };
52 Ok(tcp_stream)
53 }
54
55 /// Creates new `TcpStream` from a previously bound `std::net::TcpStream`.
56 ///
57 /// This function is intended to be used to wrap a TCP stream from the
58 /// standard library in the tokio-uring equivalent. The conversion assumes nothing
59 /// about the underlying socket; it is left up to the user to decide what socket
60 /// options are appropriate for their use case.
61 ///
62 /// This can be used in conjunction with socket2's `Socket` interface to
63 /// configure a socket before it's handed off, such as setting options like
64 /// `reuse_address` or binding to multiple addresses.
65 pub fn from_std(socket: std::net::TcpStream) -> Self {
66 let inner = Socket::from_std(socket);
67 Self { inner }
68 }
69
70 pub(crate) fn from_socket(inner: Socket) -> Self {
71 Self { inner }
72 }
73
74 /// Read some data from the stream into the buffer.
75 ///
76 /// Returns the original buffer and quantity of data read.
77 pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
78 self.inner.read(buf).await
79 }
80
81 /// Read some data from the stream into a registered buffer.
82 ///
83 /// Like [`read`], but using a pre-mapped buffer
84 /// registered with [`FixedBufRegistry`].
85 ///
86 /// [`read`]: Self::read
87 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
88 ///
89 /// # Errors
90 ///
91 /// In addition to errors that can be reported by `read`,
92 /// this operation fails if the buffer is not registered in the
93 /// current `tokio-uring` runtime.
94 pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
95 where
96 T: BoundedBufMut<BufMut = FixedBuf>,
97 {
98 self.inner.read_fixed(buf).await
99 }
100
101 /// Write some data to the stream from the buffer.
102 ///
103 /// Returns the original buffer and quantity of data written.
104 pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
105 self.inner.write(buf)
106 }
107
108 /// Attempts to write an entire buffer to the stream.
109 ///
110 /// This method will continuously call [`write`] until there is no more data to be
111 /// written or an error is returned. This method will not return until the entire
112 /// buffer has been successfully written or an error has occurred.
113 ///
114 /// If the buffer contains no data, this will never call [`write`].
115 ///
116 /// # Errors
117 ///
118 /// This function will return the first error that [`write`] returns.
119 ///
120 /// # Examples
121 ///
122 /// ```no_run
123 /// use std::net::SocketAddr;
124 /// use tokio_uring::net::TcpListener;
125 /// use tokio_uring::buf::BoundedBuf;
126 ///
127 /// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
128 ///
129 /// tokio_uring::start(async {
130 /// let listener = TcpListener::bind(addr).unwrap();
131 ///
132 /// println!("Listening on {}", listener.local_addr().unwrap());
133 ///
134 /// loop {
135 /// let (stream, _) = listener.accept().await.unwrap();
136 /// tokio_uring::spawn(async move {
137 /// let mut n = 0;
138 /// let mut buf = vec![0u8; 4096];
139 /// loop {
140 /// let (result, nbuf) = stream.read(buf).await;
141 /// buf = nbuf;
142 /// let read = result.unwrap();
143 /// if read == 0 {
144 /// break;
145 /// }
146 ///
147 /// let (res, slice) = stream.write_all(buf.slice(..read)).await;
148 /// let _ = res.unwrap();
149 /// buf = slice.into_inner();
150 /// n += read;
151 /// }
152 /// });
153 /// }
154 /// });
155 /// ```
156 ///
157 /// [`write`]: Self::write
158 pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
159 self.inner.write_all(buf).await
160 }
161
162 /// Writes data into the socket from a registered buffer.
163 ///
164 /// Like [`write`], but using a pre-mapped buffer
165 /// registered with [`FixedBufRegistry`].
166 ///
167 /// [`write`]: Self::write
168 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
169 ///
170 /// # Errors
171 ///
172 /// In addition to errors that can be reported by `write`,
173 /// this operation fails if the buffer is not registered in the
174 /// current `tokio-uring` runtime.
175 pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
176 where
177 T: BoundedBuf<Buf = FixedBuf>,
178 {
179 self.inner.write_fixed(buf).await
180 }
181
182 /// Attempts to write an entire buffer to the stream.
183 ///
184 /// This method will continuously call [`write_fixed`] until there is no more data to be
185 /// written or an error is returned. This method will not return until the entire
186 /// buffer has been successfully written or an error has occurred.
187 ///
188 /// If the buffer contains no data, this will never call [`write_fixed`].
189 ///
190 /// # Errors
191 ///
192 /// This function will return the first error that [`write_fixed`] returns.
193 ///
194 /// [`write_fixed`]: Self::write_fixed
195 pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
196 where
197 T: BoundedBuf<Buf = FixedBuf>,
198 {
199 self.inner.write_fixed_all(buf).await
200 }
201
202 /// Writes data from multiple buffers into this socket using the scatter/gather IO style.
203 ///
204 /// This function will attempt to write the entire contents of `bufs`, but
205 /// the entire write may not succeed, or the write may also generate an
206 /// error. The bytes will be written starting at the specified offset.
207 ///
208 /// # Return
209 ///
210 /// The method returns the operation result and the same array of buffers
211 /// passed in as an argument. A return value of `0` typically means that the
212 /// underlying socket is no longer able to accept bytes and will likely not
213 /// be able to in the future as well, or that the buffer provided is empty.
214 ///
215 /// # Errors
216 ///
217 /// Each call to `write` may generate an I/O error indicating that the
218 /// operation could not be completed. If an error is returned then no bytes
219 /// in the buffer were written to this writer.
220 ///
221 /// It is **not** considered an error if the entire buffer could not be
222 /// written to this writer.
223 ///
224 /// [`Ok(n)`]: Ok
225 pub async fn writev<T: BoundedBuf>(&self, buf: Vec<T>) -> crate::BufResult<usize, Vec<T>> {
226 self.inner.writev(buf).await
227 }
228
229 /// Shuts down the read, write, or both halves of this connection.
230 ///
231 /// This function will cause all pending and future I/O on the specified portions to return
232 /// immediately with an appropriate value.
233 pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
234 self.inner.shutdown(how)
235 }
236
237 /// Sets the value of the TCP_NODELAY option on this socket.
238 ///
239 /// If set, this option disables the Nagle algorithm. This means that segments are always sent
240 /// as soon as possible, even if there is only a small amount of data. When not set, data is
241 /// buffered until there is a sufficient amount to send out, thereby avoiding the frequent
242 /// sending of small packets.
243 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
244 self.inner.set_nodelay(nodelay)
245 }
246}
247
248impl FromRawFd for TcpStream {
249 unsafe fn from_raw_fd(fd: RawFd) -> Self {
250 TcpStream::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
251 }
252}
253
254impl AsRawFd for TcpStream {
255 fn as_raw_fd(&self) -> RawFd {
256 self.inner.as_raw_fd()
257 }
258}