1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//! IPC primitives (feature-gated)
//!
//! Minimal scaffold for cross-platform IPC channels used for control and health.
//! Currently implements a Tokio Unix socket server/client on Unix.
#[cfg(unix)]
/// Unix-specific IPC primitives implemented with Tokio Unix sockets.
pub mod unix {
use std::io;
use std::os::unix::fs::FileTypeExt;
use std::path::Path;
use tokio::net::{UnixListener, UnixStream};
/// Bind a Unix domain socket at the given path.
///
/// # Security
///
/// Attempts to atomically remove stale sockets and bind. However, there's still
/// a small TOCTOU window. For maximum security, ensure the socket directory
/// is only writable by the daemon process.
///
/// # Errors
///
/// Returns an error if the socket file cannot be removed or if binding to the
/// provided path fails.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path_ref = path.as_ref();
// First attempt: try to bind directly
match UnixListener::bind(path_ref) {
Ok(listener) => return Ok(listener),
Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
// Address in use - check if it's a stale socket
}
Err(e) => return Err(e),
}
// Validate the existing file is a socket (not symlink or other file type)
match tokio::fs::symlink_metadata(path_ref).await {
Ok(metadata) => {
let file_type = metadata.file_type();
if file_type.is_symlink() {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"IPC path exists and is a symlink (potential security risk)",
));
}
if !file_type.is_socket() {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"IPC path exists and is not a Unix socket",
));
}
// It's a socket - try to remove it
tokio::fs::remove_file(path_ref).await?;
}
Err(_) => {
// File doesn't exist or can't be accessed
return Err(io::Error::new(
io::ErrorKind::AddrInUse,
"Socket address in use but cannot verify file type",
));
}
}
// Try binding again after removal
UnixListener::bind(path_ref)
}
/// Connect to a Unix domain socket at the given path.
///
/// # Errors
///
/// Returns an error if the connection to the provided socket path fails.
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
UnixStream::connect(path).await
}
}
/// Windows-specific IPC primitives implemented with Tokio named pipes.
#[cfg(windows)]
pub mod windows {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::windows::named_pipe::{
ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions,
};
/// Create a new named pipe server at the given pipe name (e.g., \\?\pipe\proc-daemon).
///
/// Returns a server handle that can `connect().await` to wait for a client.
///
/// # Errors
///
/// Returns an error if the named pipe cannot be created (e.g., name in use,
/// invalid pipe name, or insufficient privileges).
pub fn create_server<S: AsRef<str>>(name: S) -> std::io::Result<NamedPipeServer> {
ServerOptions::new()
.first_pipe_instance(true)
.create(name.as_ref())
}
/// Wait asynchronously for a client to connect to the given server instance.
///
/// # Errors
///
/// Returns an error if the underlying pipe handle reports a connection failure.
pub async fn server_connect(server: &NamedPipeServer) -> std::io::Result<()> {
server.connect().await
}
/// Create a new named pipe client and connect to the given pipe name.
///
/// # Errors
///
/// Returns an error if the pipe cannot be opened (e.g., not found or busy).
pub fn connect<S: AsRef<str>>(name: S) -> std::io::Result<NamedPipeClient> {
ClientOptions::new().open(name.as_ref())
}
/// Simple echo handler demonstrating async read/write on a server connection.
///
/// # Errors
///
/// Returns an error if reading from or writing to the pipe fails.
pub async fn echo_once(mut server: NamedPipeServer) -> std::io::Result<()> {
let mut buf = [0u8; 1024];
let n = server.read(&mut buf).await?;
if n > 0 {
server.write_all(&buf[..n]).await?;
}
server.flush().await?;
Ok(())
}
}