veilid_tools/ipc/ipc_tokio/
unix.rs

1use crate::*;
2use futures_util::AsyncRead as FuturesAsyncRead;
3use futures_util::AsyncWrite as FuturesAsyncWrite;
4use futures_util::Stream;
5use std::path::PathBuf;
6use std::{io, path::Path};
7use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8use tokio::net::{UnixListener, UnixStream};
9use tokio_stream::wrappers::UnixListenerStream;
10/////////////////////////////////////////////////////////////
11
12pub struct IpcStream {
13    internal: UnixStream,
14}
15
16impl IpcStream {
17    pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
18        Ok(IpcStream {
19            internal: UnixStream::connect(path).await?,
20        })
21    }
22}
23
24impl FuturesAsyncRead for IpcStream {
25    fn poll_read(
26        mut self: std::pin::Pin<&mut Self>,
27        cx: &mut std::task::Context<'_>,
28        buf: &mut [u8],
29    ) -> std::task::Poll<io::Result<usize>> {
30        let mut rb = ReadBuf::new(buf);
31        match <UnixStream as AsyncRead>::poll_read(
32            std::pin::Pin::new(&mut self.internal),
33            cx,
34            &mut rb,
35        ) {
36            std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map(|_| rb.filled().len())),
37            std::task::Poll::Pending => std::task::Poll::Pending,
38        }
39    }
40}
41
42impl FuturesAsyncWrite for IpcStream {
43    fn poll_write(
44        mut self: std::pin::Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46        buf: &[u8],
47    ) -> std::task::Poll<io::Result<usize>> {
48        <UnixStream as AsyncWrite>::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf)
49    }
50
51    fn poll_flush(
52        mut self: std::pin::Pin<&mut Self>,
53        cx: &mut std::task::Context<'_>,
54    ) -> std::task::Poll<io::Result<()>> {
55        <UnixStream as AsyncWrite>::poll_flush(std::pin::Pin::new(&mut self.internal), cx)
56    }
57
58    fn poll_close(
59        mut self: std::pin::Pin<&mut Self>,
60        cx: &mut std::task::Context<'_>,
61    ) -> std::task::Poll<io::Result<()>> {
62        <UnixStream as AsyncWrite>::poll_shutdown(std::pin::Pin::new(&mut self.internal), cx)
63    }
64}
65
66/////////////////////////////////////////////////////////////
67
68pub struct IpcIncoming<'a> {
69    path: PathBuf,
70    internal: UnixListenerStream,
71    phantom: std::marker::PhantomData<&'a ()>,
72}
73
74impl<'a> Stream for IpcIncoming<'a> {
75    type Item = io::Result<IpcStream>;
76
77    fn poll_next(
78        mut self: std::pin::Pin<&mut Self>,
79        cx: &mut std::task::Context<'_>,
80    ) -> std::task::Poll<Option<Self::Item>> {
81        match <UnixListenerStream as Stream>::poll_next(std::pin::Pin::new(&mut self.internal), cx)
82        {
83            std::task::Poll::Ready(ro) => {
84                std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s })))
85            }
86            std::task::Poll::Pending => std::task::Poll::Pending,
87        }
88    }
89}
90
91impl<'a> Drop for IpcIncoming<'a> {
92    fn drop(&mut self) {
93        // Clean up IPC path
94        if let Err(e) = std::fs::remove_file(&self.path) {
95            warn!("Unable to remove IPC socket: {}", e);
96        }
97    }
98}
99
100/////////////////////////////////////////////////////////////
101
102pub struct IpcListener {
103    path: Option<PathBuf>,
104    internal: Option<Arc<UnixListener>>,
105}
106
107impl IpcListener {
108    /// Creates a new `IpcListener` bound to the specified path.
109    pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
110        Ok(Self {
111            path: Some(path.as_ref().to_path_buf()),
112            internal: Some(Arc::new(UnixListener::bind(path)?)),
113        })
114    }
115
116    /// Accepts a new incoming connection to this listener.
117    pub fn accept(&self) -> SendPinBoxFuture<io::Result<IpcStream>> {
118        if self.path.is_none() {
119            return Box::pin(std::future::ready(Err(io::Error::from(
120                io::ErrorKind::NotConnected,
121            ))));
122        }
123        let this = IpcListener {
124            path: self.path.clone(),
125            internal: self.internal.clone(),
126        };
127        Box::pin(async move {
128            Ok(IpcStream {
129                internal: this.internal.as_ref().unwrap().accept().await?.0,
130            })
131        })
132    }
133
134    /// Returns a stream of incoming connections.
135    pub fn incoming(&mut self) -> io::Result<IpcIncoming<'_>> {
136        if self.path.is_none() {
137            return Err(io::Error::from(io::ErrorKind::NotConnected));
138        }
139        Ok(IpcIncoming {
140            path: self.path.take().unwrap(),
141            internal: UnixListenerStream::new(
142                Arc::into_inner(self.internal.take().unwrap()).unwrap(),
143            ),
144            phantom: std::marker::PhantomData,
145        })
146    }
147}
148
149impl Drop for IpcListener {
150    fn drop(&mut self) {
151        // Clean up IPC path
152        if let Some(path) = &self.path {
153            if let Err(e) = std::fs::remove_file(path) {
154                warn!("Unable to remove IPC socket: {}", e);
155            }
156        }
157    }
158}