veilid_tools/ipc/ipc_tokio/
unix.rs1use crate::*;
2use futures_util::AsyncRead as FuturesAsyncRead;
3use futures_util::AsyncWrite as FuturesAsyncWrite;
4use futures_util::Stream;
5use std::path::PathBuf;
6use std::{io, path::Path};
7use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
8use tokio::net::{UnixListener, UnixStream};
9use tokio_stream::wrappers::UnixListenerStream;
10pub struct IpcStream {
13 internal: UnixStream,
14}
15
16impl IpcStream {
17 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
18 Ok(IpcStream {
19 internal: UnixStream::connect(path).await?,
20 })
21 }
22}
23
24impl FuturesAsyncRead for IpcStream {
25 fn poll_read(
26 mut self: std::pin::Pin<&mut Self>,
27 cx: &mut std::task::Context<'_>,
28 buf: &mut [u8],
29 ) -> std::task::Poll<io::Result<usize>> {
30 let mut rb = ReadBuf::new(buf);
31 match <UnixStream as AsyncRead>::poll_read(
32 std::pin::Pin::new(&mut self.internal),
33 cx,
34 &mut rb,
35 ) {
36 std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map(|_| rb.filled().len())),
37 std::task::Poll::Pending => std::task::Poll::Pending,
38 }
39 }
40}
41
42impl FuturesAsyncWrite for IpcStream {
43 fn poll_write(
44 mut self: std::pin::Pin<&mut Self>,
45 cx: &mut std::task::Context<'_>,
46 buf: &[u8],
47 ) -> std::task::Poll<io::Result<usize>> {
48 <UnixStream as AsyncWrite>::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf)
49 }
50
51 fn poll_flush(
52 mut self: std::pin::Pin<&mut Self>,
53 cx: &mut std::task::Context<'_>,
54 ) -> std::task::Poll<io::Result<()>> {
55 <UnixStream as AsyncWrite>::poll_flush(std::pin::Pin::new(&mut self.internal), cx)
56 }
57
58 fn poll_close(
59 mut self: std::pin::Pin<&mut Self>,
60 cx: &mut std::task::Context<'_>,
61 ) -> std::task::Poll<io::Result<()>> {
62 <UnixStream as AsyncWrite>::poll_shutdown(std::pin::Pin::new(&mut self.internal), cx)
63 }
64}
65
66pub struct IpcIncoming<'a> {
69 path: PathBuf,
70 internal: UnixListenerStream,
71 phantom: std::marker::PhantomData<&'a ()>,
72}
73
74impl<'a> Stream for IpcIncoming<'a> {
75 type Item = io::Result<IpcStream>;
76
77 fn poll_next(
78 mut self: std::pin::Pin<&mut Self>,
79 cx: &mut std::task::Context<'_>,
80 ) -> std::task::Poll<Option<Self::Item>> {
81 match <UnixListenerStream as Stream>::poll_next(std::pin::Pin::new(&mut self.internal), cx)
82 {
83 std::task::Poll::Ready(ro) => {
84 std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s })))
85 }
86 std::task::Poll::Pending => std::task::Poll::Pending,
87 }
88 }
89}
90
91impl<'a> Drop for IpcIncoming<'a> {
92 fn drop(&mut self) {
93 if let Err(e) = std::fs::remove_file(&self.path) {
95 warn!("Unable to remove IPC socket: {}", e);
96 }
97 }
98}
99
100pub struct IpcListener {
103 path: Option<PathBuf>,
104 internal: Option<Arc<UnixListener>>,
105}
106
107impl IpcListener {
108 pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
110 Ok(Self {
111 path: Some(path.as_ref().to_path_buf()),
112 internal: Some(Arc::new(UnixListener::bind(path)?)),
113 })
114 }
115
116 pub fn accept(&self) -> SendPinBoxFuture<io::Result<IpcStream>> {
118 if self.path.is_none() {
119 return Box::pin(std::future::ready(Err(io::Error::from(
120 io::ErrorKind::NotConnected,
121 ))));
122 }
123 let this = IpcListener {
124 path: self.path.clone(),
125 internal: self.internal.clone(),
126 };
127 Box::pin(async move {
128 Ok(IpcStream {
129 internal: this.internal.as_ref().unwrap().accept().await?.0,
130 })
131 })
132 }
133
134 pub fn incoming(&mut self) -> io::Result<IpcIncoming<'_>> {
136 if self.path.is_none() {
137 return Err(io::Error::from(io::ErrorKind::NotConnected));
138 }
139 Ok(IpcIncoming {
140 path: self.path.take().unwrap(),
141 internal: UnixListenerStream::new(
142 Arc::into_inner(self.internal.take().unwrap()).unwrap(),
143 ),
144 phantom: std::marker::PhantomData,
145 })
146 }
147}
148
149impl Drop for IpcListener {
150 fn drop(&mut self) {
151 if let Some(path) = &self.path {
153 if let Err(e) = std::fs::remove_file(path) {
154 warn!("Unable to remove IPC socket: {}", e);
155 }
156 }
157 }
158}