tokio_uring/net/unix/
stream.rs

1use crate::{
2    buf::fixed::FixedBuf,
3    buf::{BoundedBuf, BoundedBufMut},
4    io::{SharedFd, Socket},
5    UnsubmittedWrite,
6};
7use socket2::SockAddr;
8use std::{
9    io,
10    os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
11    path::Path,
12};
13
14/// A Unix stream between two local sockets on a Unix OS.
15///
16/// A Unix stream can either be created by connecting to an endpoint, via the
17/// [`connect`] method, or by [`accepting`] a connection from a [`listener`].
18///
19/// # Examples
20///
21/// ```no_run
22/// use tokio_uring::net::UnixStream;
23/// use std::net::ToSocketAddrs;
24///
25/// fn main() -> std::io::Result<()> {
26///     tokio_uring::start(async {
27///         // Connect to a peer
28///         let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?;
29///
30///         // Write some data.
31///         let (result, _) = stream.write(b"hello world!".as_slice()).submit().await;
32///         result.unwrap();
33///
34///         Ok(())
35///     })
36/// }
37/// ```
38///
39/// [`connect`]: UnixStream::connect
40/// [`accepting`]: crate::net::UnixListener::accept
41/// [`listener`]: crate::net::UnixListener
42pub struct UnixStream {
43    pub(super) inner: Socket,
44}
45
46impl UnixStream {
47    /// Opens a Unix connection to the specified file path. There must be a
48    /// `UnixListener` or equivalent listening on the corresponding Unix domain socket
49    /// to successfully connect and return a `UnixStream`.
50    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
51        let socket = Socket::new_unix(libc::SOCK_STREAM)?;
52        socket.connect(SockAddr::unix(path)?).await?;
53        let unix_stream = UnixStream { inner: socket };
54        Ok(unix_stream)
55    }
56
57    /// Creates new `UnixStream` from a previously bound `std::os::unix::net::UnixStream`.
58    ///
59    /// This function is intended to be used to wrap a TCP stream from the
60    /// standard library in the tokio-uring equivalent. The conversion assumes nothing
61    /// about the underlying socket; it is left up to the user to decide what socket
62    /// options are appropriate for their use case.
63    ///
64    /// This can be used in conjunction with socket2's `Socket` interface to
65    /// configure a socket before it's handed off, such as setting options like
66    /// `reuse_address` or binding to multiple addresses.
67    pub fn from_std(socket: std::os::unix::net::UnixStream) -> UnixStream {
68        let inner = Socket::from_std(socket);
69        Self { inner }
70    }
71
72    pub(crate) fn from_socket(inner: Socket) -> Self {
73        Self { inner }
74    }
75
76    /// Read some data from the stream into the buffer, returning the original buffer and
77    /// quantity of data read.
78    pub async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
79        self.inner.read(buf).await
80    }
81
82    /// Like [`read`], but using a pre-mapped buffer
83    /// registered with [`FixedBufRegistry`].
84    ///
85    /// [`read`]: Self::read
86    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
87    ///
88    /// # Errors
89    ///
90    /// In addition to errors that can be reported by `read`,
91    /// this operation fails if the buffer is not registered in the
92    /// current `tokio-uring` runtime.
93    pub async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
94    where
95        T: BoundedBufMut<BufMut = FixedBuf>,
96    {
97        self.inner.read_fixed(buf).await
98    }
99
100    /// Write some data to the stream from the buffer, returning the original buffer and
101    /// quantity of data written.
102    pub fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
103        self.inner.write(buf)
104    }
105
106    /// Attempts to write an entire buffer to the stream.
107    ///
108    /// This method will continuously call [`write`] until there is no more data to be
109    /// written or an error is returned. This method will not return until the entire
110    /// buffer has been successfully written or an error has occurred.
111    ///
112    /// If the buffer contains no data, this will never call [`write`].
113    ///
114    /// # Errors
115    ///
116    /// This function will return the first error that [`write`] returns.
117    ///
118    /// [`write`]: Self::write
119    pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
120        self.inner.write_all(buf).await
121    }
122
123    /// Like [`write`], but using a pre-mapped buffer
124    /// registered with [`FixedBufRegistry`].
125    ///
126    /// [`write`]: Self::write
127    /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
128    ///
129    /// # Errors
130    ///
131    /// In addition to errors that can be reported by `write`,
132    /// this operation fails if the buffer is not registered in the
133    /// current `tokio-uring` runtime.
134    pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
135    where
136        T: BoundedBuf<Buf = FixedBuf>,
137    {
138        self.inner.write_fixed(buf).await
139    }
140
141    /// Attempts to write an entire buffer to the stream.
142    ///
143    /// This method will continuously call [`write_fixed`] until there is no more data to be
144    /// written or an error is returned. This method will not return until the entire
145    /// buffer has been successfully written or an error has occurred.
146    ///
147    /// If the buffer contains no data, this will never call [`write_fixed`].
148    ///
149    /// # Errors
150    ///
151    /// This function will return the first error that [`write_fixed`] returns.
152    ///
153    /// [`write_fixed`]: Self::write
154    pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
155    where
156        T: BoundedBuf<Buf = FixedBuf>,
157    {
158        self.inner.write_fixed_all(buf).await
159    }
160
161    /// Write data from buffers into this socket returning how many bytes were
162    /// written.
163    ///
164    /// This function will attempt to write the entire contents of `bufs`, but
165    /// the entire write may not succeed, or the write may also generate an
166    /// error. The bytes will be written starting at the specified offset.
167    ///
168    /// # Return
169    ///
170    /// The method returns the operation result and the same array of buffers
171    /// passed in as an argument. A return value of `0` typically means that the
172    /// underlying socket is no longer able to accept bytes and will likely not
173    /// be able to in the future as well, or that the buffer provided is empty.
174    ///
175    /// # Errors
176    ///
177    /// Each call to `write` may generate an I/O error indicating that the
178    /// operation could not be completed. If an error is returned then no bytes
179    /// in the buffer were written to this writer.
180    ///
181    /// It is **not** considered an error if the entire buffer could not be
182    /// written to this writer.
183    ///
184    /// [`Ok(n)`]: Ok
185    pub async fn writev<T: BoundedBuf>(&self, buf: Vec<T>) -> crate::BufResult<usize, Vec<T>> {
186        self.inner.writev(buf).await
187    }
188
189    /// Shuts down the read, write, or both halves of this connection.
190    ///
191    /// This function will cause all pending and future I/O on the specified portions to return
192    /// immediately with an appropriate value.
193    pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> {
194        self.inner.shutdown(how)
195    }
196}
197
198impl FromRawFd for UnixStream {
199    unsafe fn from_raw_fd(fd: RawFd) -> Self {
200        UnixStream::from_socket(Socket::from_shared_fd(SharedFd::new(fd)))
201    }
202}
203
204impl AsRawFd for UnixStream {
205    fn as_raw_fd(&self) -> RawFd {
206        self.inner.as_raw_fd()
207    }
208}