Skip to main content

procwire_client/transport/
pipe.rs

//! Platform-specific pipe/socket implementation.
//!
//! - Unix: Unix Domain Socket
//! - Windows: Named Pipe
//!
//! # Example
//!
//! ```ignore
//! use procwire_client::transport::{generate_pipe_path, PipeListener};
//!
//! let path = generate_pipe_path();
//! let listener = PipeListener::bind(&path).await?;
//! let stream = listener.accept().await?;
//! ```
15
16use crate::error::Result;
17use tokio::io::{AsyncRead, AsyncWrite};
18
19/// Generate a unique pipe path for this process.
20///
21/// Format:
22/// - Unix: `/tmp/procwire-{pid}-{random}.sock`
23/// - Windows: `\\.\pipe\procwire-{pid}-{random}`
24pub fn generate_pipe_path() -> String {
25    let pid = std::process::id();
26    let rand: u64 = rand_u64();
27
28    #[cfg(unix)]
29    {
30        format!("/tmp/procwire-{}-{:x}.sock", pid, rand)
31    }
32
33    #[cfg(windows)]
34    {
35        format!(r"\\.\pipe\procwire-{}-{:x}", pid, rand)
36    }
37}
38
/// Produce a `u64` suitable for making pipe names unique.
///
/// The value is derived from the wall-clock time, the process ID and a
/// process-wide call counter. The counter guarantees that two calls made
/// within the same clock tick (or while the clock is unavailable) still
/// return different values.
///
/// This is *not* a cryptographically secure generator; it only aims at
/// avoiding accidental name collisions.
fn rand_u64() -> u64 {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Incremented on every call so that the output never depends on the
    // clock alone. `AtomicUsize` is used because it exists on every target.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    // Truncating the u128 nanosecond count is intentional: only the fast
    // changing low bits are interesting. A clock set before the epoch
    // degrades to 0 and the counter below keeps results distinct.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);

    let pid = u64::from(std::process::id());
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed) as u64;

    // Multiplying by an odd constant is a bijection on u64, so distinct
    // sequence numbers always shift the seed by distinct amounts.
    let seed = (nanos.wrapping_mul(0x517cc1b727220a95) ^ pid)
        .wrapping_add(seq.wrapping_mul(0x9e3779b97f4a7c15));

    // SplitMix64 finaliser: a bijective bit mixer, so distinct seeds stay
    // distinct while the bits get spread across the whole word.
    let mut z = seed;
    z = (z ^ (z >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d049bb133111eb);
    z ^ (z >> 31)
}
52
53// ============================================================================
54// Unix Implementation
55// ============================================================================
56
57#[cfg(unix)]
58mod unix_impl {
59    use super::*;
60    use std::path::Path;
61    use tokio::net::{UnixListener, UnixStream};
62
63    /// Unix Domain Socket listener.
64    pub struct PipeListener {
65        listener: UnixListener,
66        path: String,
67    }
68
69    /// Unix Domain Socket stream (connected).
70    pub struct PipeStream {
71        stream: UnixStream,
72    }
73
74    /// Cleanup guard that removes the socket file on drop.
75    pub struct PipeCleanup {
76        path: String,
77    }
78
79    impl Drop for PipeCleanup {
80        fn drop(&mut self) {
81            let _ = std::fs::remove_file(&self.path);
82        }
83    }
84
85    impl PipeListener {
86        /// Bind to a Unix socket path.
87        ///
88        /// Removes any existing socket file at the path before binding.
89        pub async fn bind(path: &str) -> Result<Self> {
90            // Remove old socket if it exists
91            if Path::new(path).exists() {
92                std::fs::remove_file(path)?;
93            }
94
95            let listener = UnixListener::bind(path)?;
96
97            Ok(Self {
98                listener,
99                path: path.to_string(),
100            })
101        }
102
103        /// Accept a single connection.
104        ///
105        /// Returns a connected `PipeStream`.
106        pub async fn accept(&self) -> Result<PipeStream> {
107            let (stream, _addr) = self.listener.accept().await?;
108            Ok(PipeStream { stream })
109        }
110
111        /// Get the socket path.
112        pub fn path(&self) -> &str {
113            &self.path
114        }
115
116        /// Create a cleanup guard that removes the socket on drop.
117        pub fn cleanup_guard(&self) -> PipeCleanup {
118            PipeCleanup {
119                path: self.path.clone(),
120            }
121        }
122    }
123
124    impl Drop for PipeListener {
125        fn drop(&mut self) {
126            // Clean up socket file when listener is dropped
127            let _ = std::fs::remove_file(&self.path);
128        }
129    }
130
131    impl PipeStream {
132        /// Split into read and write halves.
133        pub fn into_split(self) -> (impl AsyncRead, impl AsyncWrite) {
134            self.stream.into_split()
135        }
136
137        /// Get a reference to the underlying stream.
138        pub fn inner(&self) -> &UnixStream {
139            &self.stream
140        }
141
142        /// Get a mutable reference to the underlying stream.
143        pub fn inner_mut(&mut self) -> &mut UnixStream {
144            &mut self.stream
145        }
146    }
147
148    impl AsyncRead for PipeStream {
149        fn poll_read(
150            mut self: std::pin::Pin<&mut Self>,
151            cx: &mut std::task::Context<'_>,
152            buf: &mut tokio::io::ReadBuf<'_>,
153        ) -> std::task::Poll<std::io::Result<()>> {
154            std::pin::Pin::new(&mut self.stream).poll_read(cx, buf)
155        }
156    }
157
158    impl AsyncWrite for PipeStream {
159        fn poll_write(
160            mut self: std::pin::Pin<&mut Self>,
161            cx: &mut std::task::Context<'_>,
162            buf: &[u8],
163        ) -> std::task::Poll<std::io::Result<usize>> {
164            std::pin::Pin::new(&mut self.stream).poll_write(cx, buf)
165        }
166
167        fn poll_flush(
168            mut self: std::pin::Pin<&mut Self>,
169            cx: &mut std::task::Context<'_>,
170        ) -> std::task::Poll<std::io::Result<()>> {
171            std::pin::Pin::new(&mut self.stream).poll_flush(cx)
172        }
173
174        fn poll_shutdown(
175            mut self: std::pin::Pin<&mut Self>,
176            cx: &mut std::task::Context<'_>,
177        ) -> std::task::Poll<std::io::Result<()>> {
178            std::pin::Pin::new(&mut self.stream).poll_shutdown(cx)
179        }
180    }
181}
182
183// ============================================================================
184// Windows Implementation
185// ============================================================================
186
#[cfg(windows)]
mod windows_impl {
    use super::*;
    use std::pin::Pin;
    use std::sync::{Mutex, PoisonError};
    use std::task::{Context, Poll};
    use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};

    /// Windows Named Pipe listener.
    pub struct PipeListener {
        path: String,
        /// Pipe instance created by `bind` that has not been handed to a
        /// client yet. Keeping it open means the pipe exists from `bind`
        /// onwards; the first `accept` takes it out.
        pending: Mutex<Option<NamedPipeServer>>,
    }

    /// Windows Named Pipe stream (connected server end).
    pub struct PipeStream {
        pipe: NamedPipeServer,
    }

    /// Cleanup guard (no-op on Windows, pipes are auto-cleaned).
    pub struct PipeCleanup {
        _path: String,
    }

    impl Drop for PipeCleanup {
        fn drop(&mut self) {
            // Windows Named Pipes are automatically cleaned up
        }
    }

    impl PipeListener {
        /// Create a Named Pipe server.
        ///
        /// The first pipe instance is created here with
        /// `first_pipe_instance(true)` and kept inside the listener until the
        /// first call to [`accept`](Self::accept).
        ///
        /// # Errors
        ///
        /// Returns an error if the pipe instance cannot be created.
        pub async fn bind(path: &str) -> Result<Self> {
            let first = ServerOptions::new()
                .first_pipe_instance(true)
                .create(path)?;

            Ok(Self {
                path: path.to_string(),
                pending: Mutex::new(Some(first)),
            })
        }

        /// Accept a single connection.
        ///
        /// Uses the instance reserved by `bind` if it is still available,
        /// otherwise creates an additional instance of the same pipe, then
        /// waits for a client to connect to it.
        ///
        /// # Errors
        ///
        /// Returns an error if a new instance cannot be created or if
        /// waiting for the client fails.
        pub async fn accept(&self) -> Result<PipeStream> {
            // Take the reserved instance in its own statement so the lock
            // guard is released before any `.await`. A poisoned lock is
            // recovered because the stored `Option` is always in a valid state.
            let reserved = self
                .pending
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .take();

            let server = match reserved {
                Some(server) => server,
                None => ServerOptions::new()
                    .first_pipe_instance(false)
                    .create(&self.path)?,
            };

            server.connect().await?;

            Ok(PipeStream { pipe: server })
        }

        /// Get the pipe path.
        pub fn path(&self) -> &str {
            &self.path
        }

        /// Create a cleanup guard (no-op on Windows).
        pub fn cleanup_guard(&self) -> PipeCleanup {
            PipeCleanup {
                _path: self.path.clone(),
            }
        }
    }

    impl PipeStream {
        /// Split into read and write halves.
        pub fn into_split(self) -> (impl AsyncRead, impl AsyncWrite) {
            tokio::io::split(self)
        }

        /// Get a reference to the underlying pipe.
        pub fn inner(&self) -> &NamedPipeServer {
            &self.pipe
        }

        /// Get a mutable reference to the underlying pipe.
        pub fn inner_mut(&mut self) -> &mut NamedPipeServer {
            &mut self.pipe
        }
    }

    // The trait impls below simply forward every poll to the wrapped pipe.

    impl AsyncRead for PipeStream {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut tokio::io::ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut self.get_mut().pipe).poll_read(cx, buf)
        }
    }

    impl AsyncWrite for PipeStream {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut self.get_mut().pipe).poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut self.get_mut().pipe).poll_flush(cx)
        }

        fn poll_shutdown(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut self.get_mut().pipe).poll_shutdown(cx)
        }
    }
}
303
304// ============================================================================
305// Platform-independent re-exports
306// ============================================================================
307
308#[cfg(unix)]
309pub use unix_impl::{PipeCleanup, PipeListener, PipeStream};
310
311#[cfg(windows)]
312pub use windows_impl::{PipeCleanup, PipeListener, PipeStream};
313
314/// Create a pipe listener bound to the given path.
315pub async fn create_pipe_listener(path: &str) -> Result<PipeListener> {
316    PipeListener::bind(path).await
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// The generated path carries the platform-specific prefix (and suffix).
    #[test]
    fn test_generate_pipe_path_format() {
        let path = generate_pipe_path();

        #[cfg(unix)]
        {
            assert!(path.starts_with("/tmp/procwire-"));
            assert!(path.ends_with(".sock"));
        }

        #[cfg(windows)]
        {
            assert!(path.starts_with(r"\\.\pipe\procwire-"));
        }
    }

    /// Back-to-back calls must not hand out the same path twice.
    #[test]
    fn test_generate_pipe_path_uniqueness() {
        let mut seen = HashSet::new();

        for _ in 0..10 {
            // `insert` returns false when the value was already present.
            assert!(seen.insert(generate_pipe_path()), "Paths should be unique");
        }
    }

    /// The PID appears in its dedicated segment of the path.
    #[test]
    fn test_pipe_path_contains_pid() {
        let path = generate_pipe_path();
        // Match the whole `procwire-{pid}-` segment: searching for the bare
        // digits could also match by accident inside the hexadecimal suffix.
        let segment = format!("procwire-{}-", std::process::id());
        assert!(path.contains(&segment), "Path should contain PID");
    }

    // Integration tests for actual bind/accept would need tokio runtime
    // and are better suited for integration tests folder
}