ipcez 0.1.0

Rust library for ipcez.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
//! Socket connection that selects transport from target and OS.
//!
//! Single responsibility: connect using WebSocket (remote), Unix domain socket (local + Linux),
//! or named pipe (local + Windows).
//!
//! For local transports (Unix domain socket and Windows named pipe), the socket is wrapped with
//! a liveness check every 10 ms (on the read/write path). If the peer disconnects, subsequent
//! read/write return an error so the user sees the same kind of failure as with WebSocket.
//!
//! Local transports also use **length-prefixed message framing** (4-byte big-endian length + payload)
//! so that one flush sends one message and read yields one message at a time, matching WebSocket
//! semantics. The frame format is the same for Unix and named pipe so peers can interoperate.

use std::error::Error;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::Mutex;
use tokio_tungstenite::WebSocketStream;

use crate::connect_socket;
use crate::os::{detect_os_from_env, OsDetectionError, OsKind};
use crate::socket_async_read;
use crate::socket_async_write;
use crate::target::{detect_target_from_env, TargetDetectionError, TargetKind};

/// Error from connecting or using a socket.
///
/// Implements `Display` and `std::error::Error`. `io::Error` and tungstenite errors convert
/// into this type through `From`, so `?` works on both kinds of result.
#[derive(Debug)]
pub enum SocketError {
    /// I/O error (Unix, named pipe, or underlying transport).
    Io(io::Error),
    /// WebSocket handshake or protocol error.
    Ws(tokio_tungstenite::tungstenite::Error),
    /// Target/OS combination not supported on this platform (e.g. local + Linux on Windows).
    /// Carries the offending target and OS so they can be shown in the error message.
    UnsupportedCombination(TargetKind, OsKind),
    /// Environment variable detection failed (`os` or `target` unset or invalid).
    Detection(DetectionError),
    /// Local transport: the recipient did not signal acknowledgment within the time limit (e.g. 5 seconds).
    RecipientAckTimeout,
}

/// Error when resolving endpoint from environment (e.g. `os` or `target` unset/invalid).
///
/// Surfaced to callers wrapped in `SocketError::Detection`.
#[derive(Debug)]
pub enum DetectionError {
    /// The `os` environment variable was not set or had an invalid value.
    Os(OsDetectionError),
    /// The `target` environment variable was not set or had an invalid value.
    Target(TargetDetectionError),
}

impl fmt::Display for DetectionError {
    /// Forwards formatting to the wrapped detection error, passing the caller's formatter
    /// through unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let wrapped: &dyn fmt::Display = match self {
            DetectionError::Os(os_err) => os_err,
            DetectionError::Target(target_err) => target_err,
        };
        fmt::Display::fmt(wrapped, f)
    }
}

impl Error for DetectionError {
    /// Reports the wrapped OS- or target-detection error as the cause; never returns `None`.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let cause: &(dyn Error + 'static) = match self {
            DetectionError::Os(os_err) => os_err,
            DetectionError::Target(target_err) => target_err,
        };
        Some(cause)
    }
}

impl fmt::Display for SocketError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SocketError::Io(e) => write!(f, "socket I/O error: {}", e),
            SocketError::Ws(e) => write!(f, "WebSocket error: {}", e),
            SocketError::UnsupportedCombination(t, o) => {
                write!(
                    f,
                    "unsupported combination target={:?} os={:?} on this platform",
                    t, o
                )
            }
            SocketError::Detection(e) => write!(f, "endpoint detection failed: {}", e),
            SocketError::RecipientAckTimeout => {
                write!(f, "recipient did not acknowledge message within the time limit")
            }
        }
    }
}

impl Error for SocketError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            SocketError::Io(e) => Some(e),
            SocketError::Ws(e) => Some(e),
            SocketError::UnsupportedCombination(_, _) => None,
            SocketError::Detection(e) => e.source(),
            SocketError::RecipientAckTimeout => None,
        }
    }
}

impl From<io::Error> for SocketError {
    fn from(e: io::Error) -> Self {
        SocketError::Io(e)
    }
}

impl From<tokio_tungstenite::tungstenite::Error> for SocketError {
    fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
        SocketError::Ws(e)
    }
}

/// Maximum allowed message length (bytes) for local transport framing.
///
/// When reading, a 4-byte length prefix that decodes to a value greater than this is rejected
/// with an `io::ErrorKind::InvalidData` error (see `MessageFramed::poll_read_fill_length`).
pub(crate) const MAX_MESSAGE_LEN: u32 = 4 * 1024 * 1024; // 4 MiB

/// Builds the error reported once the peer has disconnected (same notification as a lost
/// WebSocket connection): kind `ConnectionReset`, message `"connection lost"`.
pub(crate) fn connection_lost_error() -> io::Error {
    let kind = io::ErrorKind::ConnectionReset;
    io::Error::new(kind, "connection lost")
}

/// Wraps a Unix stream with a liveness check every 10 ms on read/write; on peer disconnect returns the same error as WebSocket.
///
/// The liveness probe (`run_liveness_check`) polls a real one-byte read on `inner`, so a byte
/// it receives is application data and is parked in `peek`.
#[cfg(unix)]
pub(crate) struct PolledUnixStream {
    /// The wrapped Unix domain stream; the liveness probe polls it directly.
    pub(crate) inner: tokio::net::UnixStream,
    /// Refreshed each time a liveness probe successfully reads a byte.
    pub(crate) last_check: Instant,
    /// NOTE(review): presumably the minimum spacing between probes (10 ms per the type docs);
    /// `run_liveness_check` itself does not consult it — confirm in the read/write impls.
    pub(crate) interval: Duration,
    /// Set to `true` when a liveness probe reads zero bytes (EOF from the peer).
    pub(crate) disconnected: bool,
    /// Byte taken off the stream by a liveness probe.
    /// NOTE(review): presumably handed back to the reader ahead of further stream data — confirm.
    pub(crate) peek: Option<u8>,
}

/// Wraps a named pipe client with a liveness check every 10 ms on read/write; on peer disconnect returns the same error as WebSocket.
///
/// The liveness probe (`run_liveness_check`) polls a real one-byte read on `inner`, so a byte
/// it receives is application data and is parked in `peek`.
#[cfg(windows)]
pub(crate) struct PolledNamedPipe {
    /// The wrapped named pipe client; the liveness probe polls it directly.
    pub(crate) inner: tokio::net::windows::named_pipe::NamedPipeClient,
    /// Refreshed each time a liveness probe successfully reads a byte.
    pub(crate) last_check: Instant,
    /// NOTE(review): presumably the minimum spacing between probes (10 ms per the type docs);
    /// `run_liveness_check` itself does not consult it — confirm in the read/write impls.
    pub(crate) interval: Duration,
    /// Set to `true` when a liveness probe reads zero bytes (EOF from the peer).
    pub(crate) disconnected: bool,
    /// Byte taken off the pipe by a liveness probe.
    /// NOTE(review): presumably handed back to the reader ahead of further pipe data — confirm.
    pub(crate) peek: Option<u8>,
}

#[cfg(unix)]
impl PolledUnixStream {
    pub(crate) fn run_liveness_check(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let mut one = [0u8; 1];
        let mut read_buf = ReadBuf::new(&mut one);
        match tokio::io::AsyncRead::poll_read(
            Pin::new(&mut self.inner),
            cx,
            &mut read_buf,
        ) {
            Poll::Ready(Ok(())) => {
                if read_buf.filled().is_empty() {
                    self.disconnected = true;
                    Poll::Ready(Err(connection_lost_error()))
                } else {
                    self.peek = Some(one[0]);
                    self.last_check = Instant::now();
                    Poll::Ready(Ok(()))
                }
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(windows)]
impl PolledNamedPipe {
    /// Probes the peer by polling a one-byte read on the inner named pipe.
    ///
    /// The probe consumes real pipe data, so the byte it reads is parked in `self.peek`.
    /// If a previously parked byte has not been consumed yet, no read is attempted: a second
    /// probe would overwrite that byte and silently drop it from the stream.
    ///
    /// # Returns
    /// * `Poll::Ready(Ok(()))` — a byte was read (stored in `peek`, `last_check` refreshed),
    ///   or `peek` was already occupied and the probe was skipped.
    /// * `Poll::Ready(Err(_))` — a zero-byte read (EOF) sets `disconnected` and yields
    ///   `connection_lost_error()`; any other I/O error from the inner pipe is passed through.
    /// * `Poll::Pending` — propagated from the inner pipe's `poll_read`.
    pub(crate) fn run_liveness_check(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        // An unconsumed probe byte is still parked; reading again would clobber it.
        if self.peek.is_some() {
            return Poll::Ready(Ok(()));
        }

        let mut probe = [0u8; 1];
        let mut read_buf = ReadBuf::new(&mut probe);
        match AsyncRead::poll_read(Pin::new(&mut self.inner), cx, &mut read_buf) {
            Poll::Ready(Ok(())) => match read_buf.filled().first().copied() {
                Some(byte) => {
                    self.peek = Some(byte);
                    self.last_check = Instant::now();
                    Poll::Ready(Ok(()))
                }
                // A completed read with nothing filled means the peer closed the pipe.
                None => {
                    self.disconnected = true;
                    Poll::Ready(Err(connection_lost_error()))
                }
            },
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Wraps a byte stream with length-prefixed message framing (4-byte big-endian length + payload).
/// Used for local transports so that one flush = one message and read yields one message at a time, matching WebSocket semantics.
///
/// Read progress is kept in the struct so a frame can be assembled across several polls.
#[cfg(any(unix, windows))]
pub(crate) struct MessageFramed<T> {
    /// The wrapped byte stream.
    pub(crate) inner: T,
    /// Accumulates the 4-byte big-endian length prefix of the frame currently being read.
    pub(crate) length_buf: [u8; 4],
    /// Number of bytes of `length_buf` received so far (the prefix is complete at 4).
    pub(crate) length_filled: usize,
    /// NOTE(review): presumably the payload buffer of the frame being read/served — confirm in the `AsyncRead` impl.
    pub(crate) read_buf: Option<Vec<u8>>,
    /// NOTE(review): presumably how many payload bytes of `read_buf` have arrived so far — confirm.
    pub(crate) payload_filled: usize,
    /// NOTE(review): presumably the offset in `read_buf` already handed to the caller — confirm.
    pub(crate) read_cursor: usize,
    /// NOTE(review): presumably bytes written since the last flush, sent as one frame — confirm in the `AsyncWrite` impl.
    pub(crate) write_buf: Vec<u8>,
}

#[cfg(any(unix, windows))]
impl<T> MessageFramed<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    /// Wraps `inner` with empty framing state: no length bytes read, no payload buffered,
    /// nothing pending to write.
    pub(crate) fn new(inner: T) -> Self {
        Self {
            write_buf: Vec::new(),
            read_cursor: 0,
            payload_filled: 0,
            read_buf: None,
            length_filled: 0,
            length_buf: [0; 4],
            inner,
        }
    }

    /// Polls `inner` until the 4-byte big-endian frame length has been received.
    ///
    /// Progress lives in the caller-owned `length_buf` / `length_filled`, so a call that
    /// returns `Poll::Pending` resumes where it stopped on the next poll. This function never
    /// resets `length_filled`; it is still 4 after a successful return.
    ///
    /// # Errors
    /// * `UnexpectedEof` if the stream ends before all four bytes arrive.
    /// * `InvalidData` if the decoded length exceeds `MAX_MESSAGE_LEN`.
    /// * Any error from the inner `poll_read` is passed through unchanged.
    pub(crate) fn poll_read_fill_length(
        inner: &mut Pin<&mut T>,
        cx: &mut Context<'_>,
        length_buf: &mut [u8; 4],
        length_filled: &mut usize,
    ) -> Poll<io::Result<u32>> {
        loop {
            let have = *length_filled;
            if have >= 4 {
                break;
            }

            // Only the still-missing tail of the prefix is offered to the reader.
            let mut window = ReadBuf::new(&mut length_buf[have..]);
            let received = match inner.as_mut().poll_read(cx, &mut window) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Ready(Ok(())) => window.filled().len(),
            };

            if received == 0 {
                let eof = io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "connection closed while reading frame length",
                );
                return Poll::Ready(Err(eof));
            }
            *length_filled = have + received;
        }

        let len = u32::from_be_bytes(*length_buf);
        if len <= MAX_MESSAGE_LEN {
            Poll::Ready(Ok(len))
        } else {
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("frame length {} exceeds max {}", len, MAX_MESSAGE_LEN),
            )))
        }
    }
}

/// The concrete transport behind a `Socket`, or `Closed` once it has been disconnected.
// NOTE(review): the reason for allowing `private_interfaces` is not visible here — presumably
// the enum is reachable through a more visible item; confirm and record the reason.
#[allow(private_interfaces)]
pub(crate) enum InnerSocket {
    /// WebSocket connection (used when target is remote).
    WebSocket(WebSocketAdapter),
    /// Unix domain stream (local + Linux) with liveness polling and message framing.
    /// The second field is the named semaphore name (e.g. `/ipcez_<path>_data_ready`) signaled after each send.
    #[cfg(unix)]
    Unix(MessageFramed<PolledUnixStream>, String),
    /// Named pipe client (local + Windows) with liveness polling and message framing.
    /// The second field is the named event name (e.g. `Global\ipcez.data_ready`) signaled after each send.
    #[cfg(windows)]
    NamedPipe(MessageFramed<PolledNamedPipe>, String),
    /// Socket has been disconnected; the transport was dropped. Subsequent send/read return connection lost.
    Closed,
}

/// Shared socket returned by `socket_init`. Use `send_message()` to send messages and `message_handler()` to handle incoming messages.
///
/// The transport sits behind an `Arc<Mutex<..>>`: `message_handler()` passes a clone of that
/// `Arc` to the reader it spawns, and `disconnect()` replaces the transport with
/// `InnerSocket::Closed`.
pub struct Socket(pub(crate) Arc<Mutex<InnerSocket>>);

/// Wraps a WebSocket stream to implement byte-oriented AsyncRead/AsyncWrite.
///
/// Exposed only so that the `InnerSocket::WebSocket` variant can hold it; prefer using the `Socket` API.
#[doc(hidden)]
pub struct WebSocketAdapter {
    /// The WebSocket stream, boxed and pinned.
    pub(crate) stream: Pin<Box<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>,
    /// NOTE(review): presumably the payload of the message currently being served to the reader — confirm in the `AsyncRead` impl.
    pub(crate) read_buf: Option<Vec<u8>>,
    /// NOTE(review): presumably the offset in `read_buf` already handed to the caller — confirm.
    pub(crate) read_cursor: usize,
    /// NOTE(review): presumably bytes written since the last flush, sent as one message — confirm in the `AsyncWrite` impl.
    pub(crate) write_buf: Vec<u8>,
}

impl WebSocketAdapter {
    /// Takes ownership of an established WebSocket stream, pinning it on the heap and
    /// starting with empty read and write buffers.
    pub(crate) fn new(
        stream: WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
    ) -> Self {
        let stream = Box::pin(stream);
        Self {
            write_buf: Vec::new(),
            read_cursor: 0,
            read_buf: None,
            stream,
        }
    }
}

/// Writes a plain-English diagnosis and a suggested fix to stderr for a failed
/// `target`/`os` environment detection.
fn log_detection_error_to_stderr(de: &DetectionError) {
    // First element: what went wrong. Second element: how to fix it.
    let (cause, remedy): (String, &str) = match de {
        DetectionError::Target(target_err) => match target_err {
            TargetDetectionError::NotFound => (
                String::from("the environment variable `target` is not set"),
                "For local connections set `target=local` and `os=windows` or `os=linux`. \
                 For remote connections use a WebSocket URL (e.g. ws://...) as the endpoint; \
                 then these variables are not needed.",
            ),
            TargetDetectionError::InvalidValue(raw) => (
                format!("the environment variable `target` has an invalid value (\"{}\")", raw),
                "Set `target` to either `local` or `remote`. For local IPC also set `os=windows` or \
                 `os=linux`, then try again.",
            ),
        },
        DetectionError::Os(os_err) => match os_err {
            OsDetectionError::NotFound => (
                String::from("the environment variable `os` is not set"),
                "For local connections set `os=windows` or `os=linux` (depending on the peer's \
                 operating system) and `target=local`, then try again.",
            ),
            OsDetectionError::InvalidValue(raw) => (
                format!("the environment variable `os` has an invalid value (\"{}\")", raw),
                "Set `os` to either `linux` or `windows`, and ensure `target=local` for local \
                 connections, then try again.",
            ),
        },
    };
    crate::logger::log_to_stderr(&format!(
        "ipcez: The connection could not be started because {}.\nWhat to do: {}",
        cause, remedy
    ));
}

/// Turns an endpoint string into the `(addr, target, os)` triple used to connect.
///
/// A `ws://` or `wss://` URL is always remote and needs no environment lookup. Anything else
/// is resolved from the environment: `target` is detected first, then `os`, and the first
/// failure is returned as `SocketError::Detection`. The address is passed through unchanged;
/// the Windows pipe path prefix is added in connect_socket.
fn resolve_endpoint(endpoint: &str) -> Result<(String, TargetKind, OsKind), SocketError> {
    let is_websocket_url = ["ws://", "wss://"]
        .iter()
        .any(|scheme| endpoint.starts_with(*scheme));
    if is_websocket_url {
        // NOTE(review): the OS value here looks like a placeholder for the remote case —
        // confirm that connect_socket ignores it when the target is remote.
        return Ok((endpoint.to_owned(), TargetKind::Remote, OsKind::Linux));
    }

    let target = detect_target_from_env()
        .map_err(|e| SocketError::Detection(DetectionError::Target(e)))?;
    let os = detect_os_from_env()
        .map_err(|e| SocketError::Detection(DetectionError::Os(e)))?;
    Ok((endpoint.to_owned(), target, os))
}

/// Opens a socket from a single endpoint string; the caller needs no transport-specific knowledge.
///
/// **Endpoint rule:** an `endpoint` beginning with `ws://` or `wss://` is connected as
/// **remote** (WebSocket). Any other string is treated as **local**: `target` and `os` are read
/// from the environment first, and if either is unset or invalid the call returns
/// `SocketError::Detection` naming the offending variable.
/// On Windows (local), if the string does not start with `\\.\pipe\`, that prefix is added automatically.
///
/// Set the `target` and `os` environment variables for local connections; use a WebSocket URL for remote.
///
/// # Errors
/// Detection failures are explained on stderr before being returned; connection failures are
/// passed to the crate logger before being returned.
pub async fn socket_init(endpoint: &str) -> Result<Socket, SocketError> {
    let (addr, target, os) = resolve_endpoint(endpoint).map_err(|err| {
        // Only environment-detection failures get the remediation text on stderr.
        if let SocketError::Detection(de) = &err {
            log_detection_error_to_stderr(de);
        }
        err
    })?;

    let transport = connect_socket::connect_socket(&addr, target, os)
        .await
        .map_err(|err| {
            crate::logger::log_error(&err);
            err
        })?;

    Ok(Socket(Arc::new(Mutex::new(transport))))
}

impl Socket {
    /// Sends one message. Fails if the message length exceeds the maximum allowed size.
    ///
    /// For **local** transports (named pipe, Unix domain socket): after writing the message and
    /// signaling the "data ready" event/semaphore, the call waits up to **5 seconds** for the
    /// recipient to signal "data acked" (named event on Windows, named semaphore on Linux).
    /// If no acknowledgment is received in time, returns `Err(SocketError::RecipientAckTimeout)`.
    /// The recipient (e.g. `message_handler` callback or an external process) must signal the
    /// "data acked" event/semaphore after each successful read so the sender can complete.
    ///
    /// On Windows the data-ready event is `Global\<pipe_name>.data_ready`; the ack event is
    /// `Global\<pipe_name>.data_acked`. On Linux the semaphores are `/ipcez_<path>_data_ready`
    /// and `/ipcez_<path>_data_acked`.
    ///
    /// For **WebSocket** (remote), no ack wait is performed; the call returns after write and flush.
    pub async fn send_message(&self, msg: &[u8]) -> Result<(), SocketError> {
        socket_async_write::send_message(&self.0, msg).await
    }

    /// Closes the connection. The underlying transport is dropped. Subsequent `send_message()` calls and the
    /// `message_handler` callback will see a connection-lost error.
    pub async fn disconnect(&self) {
        let mut inner = self.0.lock().await;
        *inner = InnerSocket::Closed;
    }

    /// Spawns a task that reads incoming messages and invokes `callback` for each. The task runs until the connection fails or is closed.
    /// The callback receives `Ok(msg)` for each message and `Err(e)` when the connection fails or closes (then the task stops).
    pub fn message_handler<F, Fut>(&self, callback: F)
    where
        F: FnMut(Result<Vec<u8>, SocketError>) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send,
    {
        socket_async_read::spawn_message_handler(self.0.clone(), callback);
    }
}