Skip to main content

rgx/
transport.rs

1//! The daemon transport, abstracted so the same protocol runs over the best local IPC per platform.
2//!
3//! - **Unix** (`mac`/`linux`): an AF_UNIX socket — byte-for-byte the original implementation, so
4//!   behavior and performance on those platforms are unchanged.
5//! - **Windows**: a loopback TCP connection whose port the daemon publishes in the state dir (Rust's
6//!   `std` has no Windows `UnixStream`). `set_nodelay` keeps small-request latency low.
7//!
8//! The `tcp-transport` feature forces the TCP path on any platform, so the Windows transport can be
9//! exercised and tested on Unix.
10
11use std::io;
12use std::path::Path;
13
14use anyhow::Result;
15
16#[cfg(all(unix, not(feature = "tcp-transport")))]
17mod imp {
18    use std::io::{self, ErrorKind};
19    use std::os::unix::net::{UnixListener, UnixStream};
20    use std::path::{Path, PathBuf};
21
22    use anyhow::Result;
23
24    use crate::paths;
25
26    pub type Listener = UnixListener;
27    pub type Stream = UnixStream;
28
29    fn endpoint(root: &Path) -> PathBuf {
30        paths::state_dir(root).join("daemon.sock")
31    }
32
33    /// Connect to the daemon, spawning nothing. `Ok(None)` means no live daemon owns this root.
34    pub fn connect(root: &Path) -> io::Result<Option<Stream>> {
35        match UnixStream::connect(endpoint(root)) {
36            Ok(s) => Ok(Some(s)),
37            Err(e)
38                if e.kind() == ErrorKind::NotFound || e.kind() == ErrorKind::ConnectionRefused =>
39            {
40                Ok(None)
41            }
42            Err(e) => Err(e),
43        }
44    }
45
46    /// Bind the per-root endpoint, taking ownership. `Ok(None)` means a live daemon already owns it;
47    /// a stale socket (no listener) is removed and rebound.
48    pub fn bind(root: &Path) -> Result<Option<Listener>> {
49        let sock = endpoint(root);
50        match UnixListener::bind(&sock) {
51            Ok(l) => Ok(Some(l)),
52            Err(e) if e.kind() == ErrorKind::AddrInUse => {
53                if UnixStream::connect(&sock).is_ok() {
54                    Ok(None)
55                } else {
56                    std::fs::remove_file(&sock).ok();
57                    Ok(Some(UnixListener::bind(&sock)?))
58                }
59            }
60            Err(e) => Err(e.into()),
61        }
62    }
63
64    pub fn accept(listener: &Listener) -> io::Result<Stream> {
65        listener.accept().map(|(s, _)| s)
66    }
67
68    pub fn cleanup(root: &Path) {
69        let _ = std::fs::remove_file(endpoint(root));
70    }
71}
72
73#[cfg(any(windows, feature = "tcp-transport"))]
74mod imp {
75    use std::io::{self, ErrorKind};
76    use std::net::{Ipv4Addr, TcpListener, TcpStream};
77    use std::path::{Path, PathBuf};
78
79    use anyhow::Result;
80
81    use crate::paths;
82
83    pub type Listener = TcpListener;
84    pub type Stream = TcpStream;
85
86    fn port_file(root: &Path) -> PathBuf {
87        paths::state_dir(root).join("daemon.port")
88    }
89
90    fn read_port(root: &Path) -> Option<u16> {
91        std::fs::read_to_string(port_file(root))
92            .ok()?
93            .trim()
94            .parse()
95            .ok()
96    }
97
98    /// Connect to the daemon, spawning nothing. `Ok(None)` means no live daemon owns this root (no
99    /// port published, or nothing listening on it).
100    pub fn connect(root: &Path) -> io::Result<Option<Stream>> {
101        let Some(port) = read_port(root) else {
102            return Ok(None);
103        };
104        match TcpStream::connect((Ipv4Addr::LOCALHOST, port)) {
105            Ok(s) => {
106                s.set_nodelay(true).ok();
107                Ok(Some(s))
108            }
109            Err(e) if e.kind() == ErrorKind::ConnectionRefused => Ok(None),
110            Err(e) => Err(e),
111        }
112    }
113
114    /// Bind a loopback port and publish it. `Ok(None)` means a live daemon already answers for this
115    /// root. (Loopback-only, so it does not trip the Windows firewall prompt.)
116    pub fn bind(root: &Path) -> Result<Option<Listener>> {
117        if connect(root)?.is_some() {
118            return Ok(None);
119        }
120        let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
121        let port = listener.local_addr()?.port();
122        let dir = paths::state_dir(root);
123        std::fs::create_dir_all(&dir)?;
124        // Publish the port atomically so a connecting client never reads a half-written file.
125        let tmp = dir.join(format!("daemon.port.{}", std::process::id()));
126        std::fs::write(&tmp, port.to_string())?;
127        std::fs::rename(&tmp, port_file(root))?;
128        Ok(Some(listener))
129    }
130
131    pub fn accept(listener: &Listener) -> io::Result<Stream> {
132        let (s, _) = listener.accept()?;
133        s.set_nodelay(true).ok();
134        Ok(s)
135    }
136
137    pub fn cleanup(root: &Path) {
138        let _ = std::fs::remove_file(port_file(root));
139    }
140}
141
142pub use imp::{Listener, Stream};
143
144/// Connect to the daemon for `root` without spawning one; `Ok(None)` if none is live.
145pub fn connect(root: &Path) -> io::Result<Option<Stream>> {
146    imp::connect(root)
147}
148
149/// Take ownership of `root`'s endpoint; `Ok(None)` if a live daemon already owns it.
150pub fn bind(root: &Path) -> Result<Option<Listener>> {
151    imp::bind(root)
152}
153
154/// Block until one client connects, returning its stream.
155pub fn accept(listener: &Listener) -> io::Result<Stream> {
156    imp::accept(listener)
157}
158
159/// Remove the endpoint's on-disk artifact (socket file / published port) on shutdown.
160pub fn cleanup(root: &Path) {
161    imp::cleanup(root)
162}