tipsy/
lib.rs

1//! Tokio IPC transport. Under the hood uses Unix Domain Sockets for Linux/Mac
2//! and Named Pipes for Windows.
3
4#![warn(missing_docs, missing_debug_implementations)]
5#![forbid(clippy::unwrap_used)]
6#![cfg_attr(docsrs, feature(doc_cfg))]
7#![doc = include_str!("../README.md")]
8
9#[cfg(not(windows))]
10mod unix;
11#[cfg(windows)]
12mod win;
13
14use std::io;
15use std::path::{Path, PathBuf};
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19use futures_util::Stream;
20use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
21
22mod platform {
23    #[cfg(unix)]
24    pub(crate) use crate::unix::{
25        Connection, Endpoint, IpcStream, SecurityAttributes, from_std_stream,
26    };
27    #[cfg(windows)]
28    pub(crate) use crate::win::{Connection, Endpoint, IpcStream, SecurityAttributes};
29}
30
31/// Path used for an IPC client or server.
32pub trait IntoIpcPath: Send {
33    /// Converts the object into an IPC path.
34    fn into_ipc_path(self) -> io::Result<PathBuf>;
35}
36
37impl IntoIpcPath for PathBuf {
38    fn into_ipc_path(self) -> io::Result<PathBuf> {
39        Ok(self)
40    }
41}
42
43/// How to proceed when the socket path already exists
44#[derive(Debug, PartialEq, Eq, Clone, Copy)]
45pub enum OnConflict {
46    /// Ignore the conflicting socket and continue
47    Ignore,
48    /// Throw an error when attempting to bind to the path
49    Error,
50    /// Overwrite the existing socket
51    Overwrite,
52}
53
54/// Cross-platform representation of an IPC connection path.
55///
56/// Calling [`IntoIpcPath::into_ipc_path`] on this struct will generate a platform-specific IPC
57/// path.
58///
59/// Windows: `\\.\pipe\{serverId}`
60///
61/// Mac: `$TMPDIR/{serverId}.sock`
62///
63/// Linux: `$XDG_RUNTIME_DIR/{serverId}.sock` (defaults to `$TMPDIR` if it doesn't exist)
64///
65/// The value for `serverId` can contain forward slashes, which will be interpreted as part of the
66/// path. On Windows, these will be converted to backslashes.
67///
68/// # Example
69///
70/// ```
71/// use std::env;
72///
73/// use tipsy::{IntoIpcPath, ServerId};
74///
75/// // Forcing these environment variables to ensure consistent results.
76/// // You probably don't want to do this in your application.
77/// unsafe {
78///     env::set_var("XDG_RUNTIME_DIR", "/tmp");
79///     env::set_var("TMPDIR", "/tmp");
80/// }
81///
82/// let server_id = ServerId::new("some/id");
83/// let path = server_id.into_ipc_path().unwrap();
84/// let path = path.to_string_lossy();
85///
86/// if cfg!(windows) {
87///     assert_eq!(r"\\.\pipe\some\id", path);
88/// } else {
89///     assert_eq!("/tmp/some/id.sock", path);
90/// }
91/// ```
92#[derive(Clone, Debug, PartialEq, Eq)]
93pub struct ServerId<T>
94where
95    T: Into<String> + Send,
96{
97    id: T,
98    parent_folder: Option<PathBuf>,
99}
100
101impl<T> ServerId<T>
102where
103    T: Into<String> + Send,
104{
105    /// Creates a new [`ServerId`].
106    pub fn new(id: T) -> Self {
107        Self {
108            id,
109            parent_folder: None,
110        }
111    }
112
113    /// Explicitly sets the parent folder for the socket instead of relying on the default
114    /// OS-specific behavior. This only has an effect on Unix systems.
115    ///
116    /// # Example
117    ///
118    /// ```
119    /// use tipsy::{IntoIpcPath, ServerId};
120    ///
121    /// let server_id = ServerId::new("myid").parent_folder("/home");
122    /// let path = server_id.into_ipc_path().unwrap();
123    /// let path = path.to_string_lossy();
124    ///
125    /// if cfg!(windows) {
126    ///     assert_eq!(r"\\.\pipe\myid", path);
127    /// } else {
128    ///     assert_eq!("/home/myid.sock", path);
129    /// }
130    /// ```
131    pub fn parent_folder<P>(mut self, folder: P) -> Self
132    where
133        P: Into<PathBuf>,
134    {
135        self.parent_folder = Some(folder.into());
136        self
137    }
138}
139
140impl<T> IntoIpcPath for ServerId<T>
141where
142    T: Into<String> + Send,
143{
144    fn into_ipc_path(self) -> io::Result<PathBuf> {
145        self.into_ipc_path()
146    }
147}
148
149/// Permissions and ownership for the IPC connection.
150#[derive(Debug, Clone)]
151pub struct SecurityAttributes(platform::SecurityAttributes);
152
153impl SecurityAttributes {
154    /// New default security attributes.
155    pub fn empty() -> Self {
156        Self(platform::SecurityAttributes::empty())
157    }
158
159    /// New default security attributes that allow everyone to connect.
160    ///
161    /// On Windows, this is equivalent to [`SecurityAttributes::allow_everyone_create`].
162    pub fn allow_everyone_connect() -> io::Result<Self> {
163        Ok(Self(platform::SecurityAttributes::allow_everyone_connect()?))
164    }
165
166    /// Set a custom permission on the socket.
167    ///
168    /// Has no effect on Windows.
169    pub fn mode(self, mode: u16) -> io::Result<Self> {
170        Ok(Self(self.0.mode(mode)?))
171    }
172
173    /// New default security attributes that allow everyone to create.
174    ///
175    /// On Windows, this is equivalent to [`SecurityAttributes::allow_everyone_connect`].
176    pub fn allow_everyone_create() -> io::Result<Self> {
177        Ok(Self(platform::SecurityAttributes::allow_everyone_create()?))
178    }
179}
180
181/// IPC endpoint.
182#[derive(Debug, Clone)]
183pub struct Endpoint(platform::Endpoint);
184
185impl Endpoint {
186    /// Stream of incoming connections
187    pub fn incoming(self) -> io::Result<IpcStream> {
188        Ok(IpcStream(self.0.incoming()?))
189    }
190    /// Set security attributes for the connection
191    pub fn security_attributes(mut self, security_attributes: SecurityAttributes) -> Self {
192        self.0 = self.0.security_attributes(security_attributes.0);
193        self
194    }
195    /// Returns the path of the endpoint.
196    pub fn path(&self) -> &Path {
197        self.0.path()
198    }
199    /// Make new connection using the provided path and running event pool.
200    pub async fn connect<P>(path: P) -> io::Result<Connection>
201    where
202        P: IntoIpcPath,
203    {
204        Ok(Connection(platform::Endpoint::connect(path).await?))
205    }
206
207    /// New IPC endpoint at the given path
208    pub fn new<P>(path: P, on_conflict: OnConflict) -> io::Result<Self>
209    where
210        P: IntoIpcPath,
211    {
212        Ok(Self(platform::Endpoint::new(path, on_conflict)?))
213    }
214}
215
216/// IPC connection.
217#[derive(Debug)]
218pub struct Connection(platform::Connection);
219
220impl Connection {
221    /// Create a stream from an existing [`UnixStream`](std::os::unix::net::UnixStream).
222    #[cfg(unix)]
223    pub async fn from_std_stream(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
224        Ok(Self(platform::from_std_stream(stream).await?))
225    }
226}
227
228impl AsyncRead for Connection {
229    fn poll_read(
230        self: Pin<&mut Self>,
231        ctx: &mut Context<'_>,
232        buf: &mut ReadBuf<'_>,
233    ) -> Poll<io::Result<()>> {
234        let this = Pin::into_inner(self);
235        Pin::new(&mut this.0).poll_read(ctx, buf)
236    }
237}
238
239impl AsyncWrite for Connection {
240    fn poll_write(
241        self: Pin<&mut Self>,
242        ctx: &mut Context<'_>,
243        buf: &[u8],
244    ) -> Poll<Result<usize, io::Error>> {
245        let this = Pin::into_inner(self);
246        Pin::new(&mut this.0).poll_write(ctx, buf)
247    }
248
249    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
250        let this = Pin::into_inner(self);
251        Pin::new(&mut this.0).poll_flush(ctx)
252    }
253
254    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
255        let this = Pin::into_inner(self);
256        Pin::new(&mut this.0).poll_shutdown(ctx)
257    }
258}
259
260/// Stream of incoming connections.
261#[derive(Debug)]
262pub struct IpcStream(platform::IpcStream);
263
264impl IpcStream {
265    /// Create a listener from an existing [`UnixListener`](std::os::unix::net::UnixListener).
266    #[cfg(unix)]
267    pub fn from_std_listener(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
268        Ok(Self(platform::IpcStream::from_std_listener(listener)?))
269    }
270}
271
272impl Stream for IpcStream {
273    type Item = io::Result<Connection>;
274
275    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276        let this = Pin::into_inner(self);
277        Pin::new(&mut this.0).poll_next(cx).map_ok(Connection)
278    }
279}