procwire_client/transport/
pipe.rs1use crate::error::Result;
17use tokio::io::{AsyncRead, AsyncWrite};
18
19pub fn generate_pipe_path() -> String {
25 let pid = std::process::id();
26 let rand: u64 = rand_u64();
27
28 #[cfg(unix)]
29 {
30 format!("/tmp/procwire-{}-{:x}.sock", pid, rand)
31 }
32
33 #[cfg(windows)]
34 {
35 format!(r"\\.\pipe\procwire-{}-{:x}", pid, rand)
36 }
37}
38
/// Returns a 64-bit value suitable for making pipe names unique.
///
/// The value mixes the wall-clock time (nanoseconds since the Unix epoch),
/// a process-wide call counter and the process id. It is **not**
/// cryptographically secure and must not be used where unpredictability
/// matters.
///
/// The counter guarantees that two calls which observe the same clock
/// reading (coarse timers, or a clock set before the epoch, where the time
/// component falls back to `0`) still return different values.
fn rand_u64() -> u64 {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Incremented on every call; only uniqueness matters, not ordering with
    // respect to other memory operations, hence `Relaxed`.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Only the low 64 bits of the u128 nanosecond count are kept.
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);

    let pid = u64::from(std::process::id());
    // Widening conversion: `usize` is at most 64 bits on supported targets.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed) as u64;

    // Both multipliers are odd, so each multiplication is a bijection on u64:
    // equal `nanos` with different `seq` can never collapse to the same output.
    // With `seq == 0` this is exactly the previous `nanos * K ^ pid` formula.
    (nanos ^ seq.wrapping_mul(0x9e37_79b9_7f4a_7c15)).wrapping_mul(0x517c_c1b7_2722_0a95) ^ pid
}
52
53#[cfg(unix)]
58mod unix_impl {
59 use super::*;
60 use std::path::Path;
61 use tokio::net::{UnixListener, UnixStream};
62
63 pub struct PipeListener {
65 listener: UnixListener,
66 path: String,
67 }
68
69 pub struct PipeStream {
71 stream: UnixStream,
72 }
73
74 pub struct PipeCleanup {
76 path: String,
77 }
78
79 impl Drop for PipeCleanup {
80 fn drop(&mut self) {
81 let _ = std::fs::remove_file(&self.path);
82 }
83 }
84
85 impl PipeListener {
86 pub async fn bind(path: &str) -> Result<Self> {
90 if Path::new(path).exists() {
92 std::fs::remove_file(path)?;
93 }
94
95 let listener = UnixListener::bind(path)?;
96
97 Ok(Self {
98 listener,
99 path: path.to_string(),
100 })
101 }
102
103 pub async fn accept(&self) -> Result<PipeStream> {
107 let (stream, _addr) = self.listener.accept().await?;
108 Ok(PipeStream { stream })
109 }
110
111 pub fn path(&self) -> &str {
113 &self.path
114 }
115
116 pub fn cleanup_guard(&self) -> PipeCleanup {
118 PipeCleanup {
119 path: self.path.clone(),
120 }
121 }
122 }
123
124 impl Drop for PipeListener {
125 fn drop(&mut self) {
126 let _ = std::fs::remove_file(&self.path);
128 }
129 }
130
131 impl PipeStream {
132 pub fn into_split(self) -> (impl AsyncRead, impl AsyncWrite) {
134 self.stream.into_split()
135 }
136
137 pub fn inner(&self) -> &UnixStream {
139 &self.stream
140 }
141
142 pub fn inner_mut(&mut self) -> &mut UnixStream {
144 &mut self.stream
145 }
146 }
147
148 impl AsyncRead for PipeStream {
149 fn poll_read(
150 mut self: std::pin::Pin<&mut Self>,
151 cx: &mut std::task::Context<'_>,
152 buf: &mut tokio::io::ReadBuf<'_>,
153 ) -> std::task::Poll<std::io::Result<()>> {
154 std::pin::Pin::new(&mut self.stream).poll_read(cx, buf)
155 }
156 }
157
158 impl AsyncWrite for PipeStream {
159 fn poll_write(
160 mut self: std::pin::Pin<&mut Self>,
161 cx: &mut std::task::Context<'_>,
162 buf: &[u8],
163 ) -> std::task::Poll<std::io::Result<usize>> {
164 std::pin::Pin::new(&mut self.stream).poll_write(cx, buf)
165 }
166
167 fn poll_flush(
168 mut self: std::pin::Pin<&mut Self>,
169 cx: &mut std::task::Context<'_>,
170 ) -> std::task::Poll<std::io::Result<()>> {
171 std::pin::Pin::new(&mut self.stream).poll_flush(cx)
172 }
173
174 fn poll_shutdown(
175 mut self: std::pin::Pin<&mut Self>,
176 cx: &mut std::task::Context<'_>,
177 ) -> std::task::Poll<std::io::Result<()>> {
178 std::pin::Pin::new(&mut self.stream).poll_shutdown(cx)
179 }
180 }
181}
182
/// Windows backend: the "pipe" is a Windows named pipe.
#[cfg(windows)]
mod windows_impl {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};

    /// Named-pipe listener for a given pipe name.
    ///
    /// A named pipe only exists while at least one server instance is open,
    /// so the listener always tries to keep one idle instance parked in
    /// `pending`. Without it, clients connecting between `bind` and `accept`
    /// (or between two `accept` calls) would find no pipe at all.
    pub struct PipeListener {
        path: String,
        /// Idle server instance that the next `accept` call will wait on.
        pending: Mutex<Option<NamedPipeServer>>,
    }

    /// A single connected pipe instance.
    pub struct PipeStream {
        pipe: NamedPipeServer,
    }

    /// Cleanup guard kept for API parity with the Unix backend.
    pub struct PipeCleanup {
        _path: String,
    }

    impl Drop for PipeCleanup {
        fn drop(&mut self) {
            // Intentionally empty: there is no filesystem entry to remove;
            // a named pipe disappears once its last handle is closed.
        }
    }

    impl PipeListener {
        /// Creates the first instance of the named pipe at `path` and keeps
        /// it open so clients can connect before `accept` is called.
        ///
        /// # Errors
        ///
        /// Returns an error if the pipe cannot be created, including when
        /// another instance with the same name already exists
        /// (`first_pipe_instance(true)`).
        pub async fn bind(path: &str) -> Result<Self> {
            let first = ServerOptions::new()
                .first_pipe_instance(true)
                .create(path)?;

            Ok(Self {
                path: path.to_string(),
                pending: Mutex::new(Some(first)),
            })
        }

        /// Waits for a client to connect and returns the connected stream.
        ///
        /// Before the connected instance is handed to the caller, a fresh
        /// idle instance is created so the pipe name stays available.
        ///
        /// If the returned future is dropped while waiting, the instance it
        /// was waiting on is closed; the next call creates a new one.
        ///
        /// # Errors
        ///
        /// Returns an error if a server instance cannot be created or if
        /// waiting for the client fails.
        pub async fn accept(&self) -> Result<PipeStream> {
            // Taken in its own statement so the mutex guard is released
            // before the `.await` below.
            let parked = self.pending_slot().take();
            let server = match parked {
                Some(server) => server,
                // Another `accept` is using the parked instance, or an
                // earlier call failed to replace it: open an additional one.
                None => ServerOptions::new()
                    .first_pipe_instance(false)
                    .create(&self.path)?,
            };

            server.connect().await?;

            // Park a replacement before handing off the connected instance.
            // A failure here is not fatal for the client we already have:
            // the slot stays empty and the next `accept` retries the creation
            // and reports the error.
            if let Ok(replacement) = ServerOptions::new()
                .first_pipe_instance(false)
                .create(&self.path)
            {
                *self.pending_slot() = Some(replacement);
            }

            Ok(PipeStream { pipe: server })
        }

        /// Returns the pipe name this listener was bound to.
        pub fn path(&self) -> &str {
            &self.path
        }

        /// Creates a cleanup guard; dropping it does nothing on Windows.
        pub fn cleanup_guard(&self) -> PipeCleanup {
            PipeCleanup {
                _path: self.path.clone(),
            }
        }

        /// Locks the slot holding the idle instance.
        ///
        /// A poisoned lock is recovered because the slot holds a plain
        /// `Option` that cannot be left in an inconsistent state.
        fn pending_slot(&self) -> MutexGuard<'_, Option<NamedPipeServer>> {
            self.pending.lock().unwrap_or_else(PoisonError::into_inner)
        }
    }

    impl PipeStream {
        /// Consumes the stream and returns its read and write halves.
        pub fn into_split(self) -> (impl AsyncRead, impl AsyncWrite) {
            tokio::io::split(self)
        }

        /// Borrows the underlying [`NamedPipeServer`].
        pub fn inner(&self) -> &NamedPipeServer {
            &self.pipe
        }

        /// Mutably borrows the underlying [`NamedPipeServer`].
        pub fn inner_mut(&mut self) -> &mut NamedPipeServer {
            &mut self.pipe
        }
    }

    // Both trait impls below forward every call to the wrapped pipe instance.
    impl AsyncRead for PipeStream {
        fn poll_read(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &mut tokio::io::ReadBuf<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            std::pin::Pin::new(&mut self.pipe).poll_read(cx, buf)
        }
    }

    impl AsyncWrite for PipeStream {
        fn poll_write(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> std::task::Poll<std::io::Result<usize>> {
            std::pin::Pin::new(&mut self.pipe).poll_write(cx, buf)
        }

        fn poll_flush(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            std::pin::Pin::new(&mut self.pipe).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            std::pin::Pin::new(&mut self.pipe).poll_shutdown(cx)
        }
    }
}
303
304#[cfg(unix)]
309pub use unix_impl::{PipeCleanup, PipeListener, PipeStream};
310
311#[cfg(windows)]
312pub use windows_impl::{PipeCleanup, PipeListener, PipeStream};
313
314pub async fn create_pipe_listener(path: &str) -> Result<PipeListener> {
316 PipeListener::bind(path).await
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated path carries the platform-specific prefix (and, on
    /// Unix, the `.sock` suffix).
    #[test]
    fn test_generate_pipe_path_format() {
        let path = generate_pipe_path();

        #[cfg(unix)]
        {
            assert!(path.starts_with("/tmp/procwire-"));
            assert!(path.ends_with(".sock"));
        }

        #[cfg(windows)]
        {
            assert!(path.starts_with(r"\\.\pipe\procwire-"));
        }
    }

    /// Ten paths generated back to back must all differ from one another.
    #[test]
    fn test_generate_pipe_path_uniqueness() {
        let generated: Vec<String> = std::iter::repeat_with(generate_pipe_path)
            .take(10)
            .collect();

        // Compare every path with each path that follows it; this visits
        // each unordered pair exactly once.
        for (index, earlier) in generated.iter().enumerate() {
            for later in &generated[index + 1..] {
                assert_ne!(earlier, later, "Paths should be unique");
            }
        }
    }

    /// The current process id appears somewhere in the generated path.
    #[test]
    fn test_pipe_path_contains_pid() {
        let generated = generate_pipe_path();
        let pid_text = std::process::id().to_string();
        assert!(generated.contains(pid_text.as_str()), "Path should contain PID");
    }
}