tokio_uring/net/unix/stream.rs
1use crate::{
2 buf::fixed::FixedBuf,
3 buf::{BoundedBuf, BoundedBufMut},
4 io::{SharedFd, Socket},
5 UnsubmittedWrite,
6};
7use socket2::SockAddr;
8use std::{
9 io,
10 os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
11 path::Path,
12};
13
14/// A Unix stream between two local sockets on a Unix OS.
15///
16/// A Unix stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [`accepting`] a connection from a [`listener`].
18///
19/// # Examples
20///
21/// ```no_run
22/// use tokio_uring::net::UnixStream;
23/// use std::net::ToSocketAddrs;
24///
25/// fn main() -> std::io::Result<()> {
26/// tokio_uring::start(async {
27/// // Connect to a peer
28/// let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?;
29///
30/// // Write some data.
31/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await;
32/// result.unwrap();
33///
34/// Ok(())
35/// })
36/// }
37/// ```
38///
39/// [`connect`]: UnixStream::connect
40/// [`accepting`]: crate::net::UnixListener::accept
41/// [`listener`]: crate::net::UnixListener
42pub struct UnixStream {
43 pub(super) inner: Socket,
44}
45
46impl UnixStream {
47 /// Opens a Unix connection to the specified file path. There must be a
48 /// `UnixListener` or equivalent listening on the corresponding Unix domain socket
49 /// to successfully connect and return a `UnixStream`.
50 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
51 let socket = Socket::new_unix(libc::SOCK_STREAM)?;
52 socket.connect(SockAddr::unix(path)?).await?;
53 let unix_stream = UnixStream { inner: socket };
54 Ok(unix_stream)
55 }
56
57 /// Creates new `UnixStream` from a previously bound `std::os::unix::net::UnixStream`.
58 ///
59 /// This function is intended to be used to wrap a TCP stream from the
60 /// standard library in the tokio-uring equivalent. The conversion assumes nothing
61 /// about the underlying socket; it is left up to the user to decide what socket
62 /// options are appropriate for their use case.
63 ///
64 /// This can be used in conjunction with socket2's `Socket` interface to
65 /// configure a socket before it's handed off, such as setting options like
66 /// `reuse_address` or binding to multiple addresses.
67 pub fn from_std(socket: std::os::unix::net::UnixStream) -> UnixStream {
68 let inner = Socket::from_std(socket);
69 Self { inner }
70 }
71
72 pub(crate) fn from_socket(inner: Socket) -> Self {
73 Self { inner }
74 }
75
76 /// Read some data from the stream into the buffer, returning the original buffer and
77 /// quantity of data read.
78 pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
79 self.inner.read(buf).await
80 }
81
82 /// Like [`read`], but using a pre-mapped buffer
83 /// registered with [`FixedBufRegistry`].
84 ///
85 /// [`read`]: Self::read
86 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
87 ///
88 /// # Errors
89 ///
90 /// In addition to errors that can be reported by `read`,
91 /// this operation fails if the buffer is not registered in the
92 /// current `tokio-uring` runtime.
93 pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
94 where
95 T: BoundedBufMut<BufMut = FixedBuf>,
96 {
97 self.inner.read_fixed(buf).await
98 }
99
100 /// Write some data to the stream from the buffer, returning the original buffer and
101 /// quantity of data written.
102 pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
103 self.inner.write(buf)
104 }
105
106 /// Attempts to write an entire buffer to the stream.
107 ///
108 /// This method will continuously call [`write`] until there is no more data to be
109 /// written or an error is returned. This method will not return until the entire
110 /// buffer has been successfully written or an error has occurred.
111 ///
112 /// If the buffer contains no data, this will never call [`write`].
113 ///
114 /// # Errors
115 ///
116 /// This function will return the first error that [`write`] returns.
117 ///
118 /// [`write`]: Self::write
119 pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
120 self.inner.write_all(buf).await
121 }
122
123 /// Like [`write`], but using a pre-mapped buffer
124 /// registered with [`FixedBufRegistry`].
125 ///
126 /// [`write`]: Self::write
127 /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
128 ///
129 /// # Errors
130 ///
131 /// In addition to errors that can be reported by `write`,
132 /// this operation fails if the buffer is not registered in the
133 /// current `tokio-uring` runtime.
134 pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
135 where
136 T: BoundedBuf<Buf = FixedBuf>,
137 {
138 self.inner.write_fixed(buf).await
139 }
140
141 /// Attempts to write an entire buffer to the stream.
142 ///
143 /// This method will continuously call [`write_fixed`] until there is no more data to be
144 /// written or an error is returned. This method will not return until the entire
145 /// buffer has been successfully written or an error has occurred.
146 ///
147 /// If the buffer contains no data, this will never call [`write_fixed`].
148 ///
149 /// # Errors
150 ///
151 /// This function will return the first error that [`write_fixed`] returns.
152 ///
153 /// [`write_fixed`]: Self::write
154 pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
155 where
156 T: BoundedBuf<Buf = FixedBuf>,
157 {
158 self.inner.write_fixed_all(buf).await
159 }
160
161 /// Write data from buffers into this socket returning how many bytes were
162 /// written.
163 ///
164 /// This function will attempt to write the entire contents of `bufs`, but
165 /// the entire write may not succeed, or the write may also generate an
166 /// error. The bytes will be written starting at the specified offset.
167 ///
168 /// # Return
169 ///
170 /// The method returns the operation result and the same array of buffers
171 /// passed in as an argument. A return value of `0` typically means that the
172 /// underlying socket is no longer able to accept bytes and will likely not
173 /// be able to in the future as well, or that the buffer provided is empty.
174 ///
175 /// # Errors
176 ///
177 /// Each call to `write` may generate an I/O error indicating that the
178 /// operation could not be completed. If an error is returned then no bytes
179 /// in the buffer were written to this writer.
180 ///
181 /// It is **not** considered an error if the entire buffer could not be
182 /// written to this writer.
183 ///
184 /// [`Ok(n)`]: Ok
185 pub async fn writev<T: BoundedBuf>(&self, buf: Vec<T>) -> crate::BufResult<usize, Vec<T>> {
186 self.inner.writev(buf).await
187 }
188
189 /// Shuts down the read, write, or both halves of this connection.
190 ///
191 /// This function will cause all pending and future I/O on the specified portions to return
192 /// immediately with an appropriate value.
193 pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
194 self.inner.shutdown(how)
195 }
196}
197
198impl FromRawFd for UnixStream {
199 unsafe fn from_raw_fd(fd: RawFd) -> Self {
200 UnixStream::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
201 }
202}
203
204impl AsRawFd for UnixStream {
205 fn as_raw_fd(&self) -> RawFd {
206 self.inner.as_raw_fd()
207 }
208}