ipcez 0.1.0

Rust library for ipcez.
Documentation
//! Connects a socket using the transport implied by target and OS.
//!
//! Single responsibility: given an address and detected target/os, establish the
//! appropriate transport (WebSocket, Unix domain socket, or Windows named pipe)
//! and return an InnerSocket for use by the socket module.

use std::time::{Duration, Instant};

use tokio_tungstenite::connect_async;

use crate::event_sender;
use crate::os::OsKind;
use crate::socket::{InnerSocket, MessageFramed, SocketError, WebSocketAdapter};
use crate::target::TargetKind;

#[cfg(unix)]
use crate::socket::PolledUnixStream;
#[cfg(windows)]
use crate::socket::PolledNamedPipe;

/// Poll interval (ms) for local transport liveness checks. Used when constructing polled streams.
const DEFAULT_POLL_INTERVAL_MS: u64 = 10;

/// Connects a socket using the transport implied by `target` and `os`. Used internally by `socket_init`.
/// Normalizes addr: ws/wss left as-is; for Local+Windows adds `\\.\pipe\` prefix if missing.
pub(crate) async fn connect_socket(
    addr: &str,
    target: TargetKind,
    os: OsKind,
) -> Result<InnerSocket, SocketError> {
    let connection_addr: String = match target {
        TargetKind::Remote => addr.to_string(),
        TargetKind::Local => match os {
            OsKind::Linux => addr.to_string(),
            OsKind::Windows => {
                if addr.starts_with(r"\\.\pipe\") {
                    addr.to_string()
                } else {
                    format!(r"\\.\pipe\{}", addr)
                }
            }
        },
    };

    let inner = match target {
        TargetKind::Remote => {
            let (stream, _) = connect_async(&connection_addr).await?;
            InnerSocket::WebSocket(WebSocketAdapter::new(stream))
        }
        TargetKind::Local => match os {
            OsKind::Linux => {
                #[cfg(unix)]
                {
                    let stream = tokio::net::UnixStream::connect(&connection_addr).await?;
                    let polled = PolledUnixStream {
                        inner: stream,
                        last_check: Instant::now(),
                        interval: Duration::from_millis(DEFAULT_POLL_INTERVAL_MS),
                        disconnected: false,
                        peek: None,
                    };
                    let event_name = event_sender::data_ready_event_name(&connection_addr);
                    InnerSocket::Unix(MessageFramed::new(polled), event_name)
                }
                #[cfg(not(unix))]
                return Err(SocketError::UnsupportedCombination(target, os));
            }
            OsKind::Windows => {
                #[cfg(windows)]
                {
                    use tokio::net::windows::named_pipe::ClientOptions;
                    use tokio::time;

                    /// Windows error: pipe is busy (server not waiting).
                    const ERROR_PIPE_BUSY: i32 = 231;

                    let client = loop {
                        match ClientOptions::new().open(&connection_addr) {
                            Ok(c) => break c,
                            Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) => {}
                            Err(e) => return Err(e.into()),
                        }
                        time::sleep(Duration::from_millis(50)).await;
                    };
                    let polled = PolledNamedPipe {
                        inner: client,
                        last_check: Instant::now(),
                        interval: Duration::from_millis(DEFAULT_POLL_INTERVAL_MS),
                        disconnected: false,
                        peek: None,
                    };
                    let pipe_name = connection_addr[9..].to_string();
                    let event_name = event_sender::data_ready_event_name(&pipe_name);
                    InnerSocket::NamedPipe(MessageFramed::new(polled), event_name)
                }
                #[cfg(not(windows))]
                return Err(SocketError::UnsupportedCombination(target, os));
            }
        },
    };
    Ok(inner)
}