Skip to main content

rgx/
client.rs

1//! Client side: connect to the project's daemon, spawning it on first use.
2
3use std::path::Path;
4use std::process::{Command, Stdio};
5use std::time::Duration;
6
7use anyhow::{Result, bail};
8
9use crate::proto::{self, Request};
10use crate::transport::{self, Stream};
11
12/// Send one request to the daemon for `root` (spawning it if needed), streaming the response to
13/// `sink`. Returns the total number of bytes written.
14pub fn request_stream(root: &Path, req: &Request, sink: &mut impl std::io::Write) -> Result<usize> {
15    let mut stream = connect_or_spawn(root)?;
16    proto::write_request(&mut stream, req)?;
17    proto::read_stream(&mut stream, sink)
18}
19
20/// Send one request and collect the whole response into a `Vec` (for small responses).
21pub fn request(root: &Path, req: &Request) -> Result<Vec<u8>> {
22    let mut out = Vec::new();
23    request_stream(root, req, &mut out)?;
24    Ok(out)
25}
26
27/// Like [`request`] but never spawns — returns `None` if no daemon is listening. For `stop`/`status`.
28pub fn request_existing(root: &Path, req: &Request) -> Result<Option<Vec<u8>>> {
29    match transport::connect(root)? {
30        Some(mut stream) => {
31            proto::write_request(&mut stream, req)?;
32            Ok(Some(proto::read_stream_to_vec(&mut stream)?))
33        }
34        None => Ok(None),
35    }
36}
37
38/// Park a pagination cursor blob in `root`'s daemon (spawning it if needed); returns the short token
39/// to print in place of the blob.
40pub fn store_cursor(root: &Path, blob: Vec<u8>) -> Result<String> {
41    let out = request(root, &Request::CursorStore { blob })?;
42    Ok(String::from_utf8(out)?)
43}
44
45/// Redeem a pagination token at `root`'s daemon. `Ok(None)` means it expired or was already used —
46/// the daemon replies with an empty frame, or there's no daemon at all (so no stored cursors). Uses a
47/// connect-only request: a stale token must never spawn a fresh daemon (and a cold index build) just
48/// to discover it's gone.
49pub fn take_cursor(root: &Path, token: &str) -> Result<Option<Vec<u8>>> {
50    let reply = request_existing(
51        root,
52        &Request::CursorTake {
53            token: token.to_string(),
54        },
55    )?;
56    Ok(reply.filter(|blob| !blob.is_empty()))
57}
58
59/// Subscribe to the daemon's live status (spawning it if needed), invoking `render` with each status
60/// frame as it arrives, until the daemon closes the stream (or the process is interrupted).
61pub fn watch(root: &Path, mut render: impl FnMut(&[u8])) -> Result<()> {
62    let mut stream = connect_or_spawn(root)?;
63    proto::write_request(&mut stream, &Request::Watch)?;
64    while let Some(frame) = proto::read_watch_frame(&mut stream)? {
65        render(&frame);
66    }
67    Ok(())
68}
69
70fn connect_or_spawn(root: &Path) -> Result<Stream> {
71    if let Some(s) = transport::connect(root)? {
72        return Ok(s);
73    }
74    spawn_daemon(root)?;
75    for _ in 0..400 {
76        std::thread::sleep(Duration::from_millis(25));
77        if let Some(s) = transport::connect(root)? {
78            return Ok(s);
79        }
80    }
81    bail!("daemon did not come up for {}", root.display());
82}
83
84/// Poll until no daemon is listening for `root`, up to ~5s. Used by `restart` after a Shutdown so the
85/// fresh daemon isn't spawned while the old one still holds the endpoint (it would exit as a no-op).
86/// Returns `true` once the endpoint is free, `false` on timeout.
87pub fn wait_until_stopped(root: &Path) -> bool {
88    for _ in 0..200 {
89        if let Ok(None) = transport::connect(root) {
90            return true;
91        }
92        std::thread::sleep(Duration::from_millis(25));
93    }
94    false
95}
96
97/// Spawn a detached background daemon (`rgx --server`) rooted at `root`.
98pub fn spawn_daemon(root: &Path) -> Result<()> {
99    let exe = std::env::current_exe()?;
100    let mut cmd = Command::new(exe);
101    cmd.arg("--server")
102        .current_dir(root)
103        .stdin(Stdio::null())
104        .stdout(Stdio::null())
105        .stderr(Stdio::null());
106    detach(&mut cmd);
107    cmd.spawn()?;
108    Ok(())
109}
110
111/// Put the daemon in its own process group so it outlives this client.
112#[cfg(unix)]
113fn detach(cmd: &mut Command) {
114    use std::os::unix::process::CommandExt;
115    cmd.process_group(0);
116}
117
118#[cfg(windows)]
119fn detach(cmd: &mut Command) {
120    use std::os::windows::process::CommandExt;
121    // DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
122    cmd.creation_flags(0x0000_0008 | 0x0000_0200 | 0x0800_0000);
123}