Skip to main content

veilid_tools/ipc/ipc_tokio/
unix.rs

1use crate::*;
2use futures_util::AsyncRead as FuturesAsyncRead;
3use futures_util::AsyncWrite as FuturesAsyncWrite;
4use futures_util::Stream;
5use std::path::PathBuf;
6use std::{io, path::Path};
7use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8use tokio::net::{UnixListener, UnixStream};
9use tokio_stream::wrappers::UnixListenerStream;
10/////////////////////////////////////////////////////////////
11
12pub struct IpcStream {
13    internal: UnixStream,
14}
15
16impl IpcStream {
17    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
18        Ok(IpcStream {
19            internal: UnixStream::connect(path).await?,
20        })
21    }
22}
23
24impl FuturesAsyncRead for IpcStream {
25    fn poll_read(
26        mut self: std::pin::Pin<&mut Self>,
27        cx: &mut std::task::Context<'_>,
28        buf: &mut [u8],
29    ) -> std::task::Poll<io::Result<usize>> {
30        let mut rb = ReadBuf::new(buf);
31        match <UnixStream as AsyncRead>::poll_read(
32            std::pin::Pin::new(&mut self.internal),
33            cx,
34            &mut rb,
35        ) {
36            std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map(|_| rb.filled().len())),
37            std::task::Poll::Pending => std::task::Poll::Pending,
38        }
39    }
40}
41
42impl FuturesAsyncWrite for IpcStream {
43    fn poll_write(
44        mut self: std::pin::Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46        buf: &[u8],
47    ) -> std::task::Poll<io::Result<usize>> {
48        <UnixStream as AsyncWrite>::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf)
49    }
50
51    fn poll_flush(
52        mut self: std::pin::Pin<&mut Self>,
53        cx: &mut std::task::Context<'_>,
54    ) -> std::task::Poll<io::Result<()>> {
55        <UnixStream as AsyncWrite>::poll_flush(std::pin::Pin::new(&mut self.internal), cx)
56    }
57
58    fn poll_close(
59        mut self: std::pin::Pin<&mut Self>,
60        cx: &mut std::task::Context<'_>,
61    ) -> std::task::Poll<io::Result<()>> {
62        <UnixStream as AsyncWrite>::poll_shutdown(std::pin::Pin::new(&mut self.internal), cx)
63    }
64}
65
66/////////////////////////////////////////////////////////////
67
68pub struct IpcIncoming<'a> {
69    path: PathBuf,
70    internal: UnixListenerStream,
71    phantom: std::marker::PhantomData<&'a ()>,
72}
73
74impl Stream for IpcIncoming<'_> {
75    type Item = io::Result<IpcStream>;
76
77    fn poll_next(
78        mut self: std::pin::Pin<&mut Self>,
79        cx: &mut std::task::Context<'_>,
80    ) -> std::task::Poll<Option<Self::Item>> {
81        match <UnixListenerStream as Stream>::poll_next(std::pin::Pin::new(&mut self.internal), cx)
82        {
83            std::task::Poll::Ready(ro) => {
84                std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s })))
85            }
86            std::task::Poll::Pending => std::task::Poll::Pending,
87        }
88    }
89}
90
91impl Drop for IpcIncoming<'_> {
92    fn drop(&mut self) {
93        // Clean up IPC path
94        if let Err(e) = std::fs::remove_file(&self.path) {
95            warn!("Unable to remove IPC socket: {}", e);
96        }
97    }
98}
99
100/////////////////////////////////////////////////////////////
101
102pub struct IpcListener {
103    path: Option<PathBuf>,
104    internal: Option<Arc<UnixListener>>,
105}
106
107impl IpcListener {
108    /// Creates a new `IpcListener` bound to the specified path.
109    #[expect(clippy::unused_async)]
110    pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
111        Ok(Self {
112            path: Some(path.as_ref().to_path_buf()),
113            internal: Some(Arc::new(UnixListener::bind(path)?)),
114        })
115    }
116
117    /// Accepts a new incoming connection to this listener.
118    #[must_use]
119    pub fn accept(&self) -> PinBoxFutureStatic<io::Result<IpcStream>> {
120        if self.path.is_none() {
121            return Box::pin(std::future::ready(Err(io::Error::from(
122                io::ErrorKind::NotConnected,
123            ))));
124        }
125        let this = IpcListener {
126            path: self.path.clone(),
127            internal: self.internal.clone(),
128        };
129        Box::pin(async move {
130            Ok(IpcStream {
131                internal: this.internal.as_ref().unwrap_or_log().accept().await?.0,
132            })
133        })
134    }
135
136    /// Returns a stream of incoming connections.
137    pub fn incoming(&mut self) -> io::Result<IpcIncoming<'_>> {
138        if self.path.is_none() {
139            return Err(io::Error::from(io::ErrorKind::NotConnected));
140        }
141        Ok(IpcIncoming {
142            path: self.path.take().unwrap_or_log(),
143            internal: UnixListenerStream::new(
144                Arc::into_inner(self.internal.take().unwrap_or_log()).unwrap_or_log(),
145            ),
146            phantom: std::marker::PhantomData,
147        })
148    }
149}
150
151impl Drop for IpcListener {
152    fn drop(&mut self) {
153        // Clean up IPC path
154        if let Some(path) = &self.path {
155            if let Err(e) = std::fs::remove_file(path) {
156                warn!("Unable to remove IPC socket: {}", e);
157            }
158        }
159    }
160}