Skip to main content

reef/
daemon.rs

//! Persistent bash coprocess daemon for `reef persist full`.
//!
//! Architecture:
//!   - `start()` spawns a detached daemon process (`reef daemon _serve`).
//!   - `exec()` connects to the socket, sends a command, and receives
//!     the output + env diff + exit code.
//!   - `stop()` sends a shutdown command via the socket.
//!   - `status()` checks if the daemon is alive by pinging the socket.
//!
//! The daemon runs single-threaded — one command at a time, matching
//! interactive shell semantics. Zero external dependencies.
12
13use std::io::{self, BufRead, BufReader, Read, Write};
14use std::os::unix::net::{UnixListener, UnixStream};
15use std::process::{Command, Stdio};
16use std::{fs, process};
17
18use crate::env_diff::{self, EnvSnapshot};
19
// Section markers of the daemon response. Every marker is wrapped in NUL
// bytes so that it cannot collide with ordinary command output.

/// Precedes the NUL-separated environment dump.
const ENV_SENTINEL: &str = "\0__REEF_DAEMON_ENV__\0";
/// Precedes the working directory reported by bash.
const CWD_SENTINEL: &str = "\0__REEF_DAEMON_CWD__\0";
/// Precedes the decimal exit code of the executed command.
const EXIT_SENTINEL: &str = "\0__REEF_DAEMON_EXIT__\0";
/// Terminates a response; nothing meaningful follows it.
const DONE_SENTINEL: &str = "\0__REEF_DAEMON_DONE__\0";

/// Reserved command that `stop()` sends to make the daemon exit.
const SHUTDOWN_CMD: &str = "__REEF_SHUTDOWN__";

/// Reserved command that `status()` sends as a liveness probe.
const PING_CMD: &str = "__REEF_PING__";
/// Exact reply the daemon writes in answer to [`PING_CMD`].
const PONG_RESPONSE: &[u8] = b"__REEF_PONG__\n";
33
34// -----------------------------------------------------------------------
35// Client API (called by `reef daemon exec/stop/status`)
36// -----------------------------------------------------------------------
37
38/// Send a command to the daemon and print results.
39/// Returns the command's exit code.
40#[must_use]
41pub fn exec(socket_path: &str, command: &str) -> i32 {
42    let mut stream = match UnixStream::connect(socket_path) {
43        Ok(s) => s,
44        Err(e) => {
45            eprintln!("reef daemon: failed to connect: {e}");
46            eprintln!("reef daemon: is the daemon running? try: reef persist full");
47            return 1;
48        }
49    };
50
51    let before = EnvSnapshot::capture_current();
52
53    // Send command length (4 bytes LE) + command bytes
54    let cmd_bytes = command.as_bytes();
55    // Shell commands are always far below u32::MAX; truncation cannot occur
56    // in practice, and the server rejects oversized payloads anyway.
57    #[allow(clippy::cast_possible_truncation)]
58    let len = cmd_bytes.len() as u32;
59    if stream.write_all(&len.to_le_bytes()).is_err()
60        || stream.write_all(cmd_bytes).is_err()
61        || stream.flush().is_err()
62    {
63        eprintln!("reef daemon: failed to send command");
64        return 1;
65    }
66
67    // Read response until we see DONE_SENTINEL
68    let mut response = Vec::with_capacity(4096);
69    let mut buf = [0u8; 4096];
70    loop {
71        let n = match stream.read(&mut buf) {
72            Ok(0) => break,
73            Ok(n) => n,
74            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
75            Err(e) => {
76                eprintln!("reef daemon: read error: {e}");
77                return 1;
78            }
79        };
80        response.extend_from_slice(&buf[..n]);
81        if contains_sentinel(&response, DONE_SENTINEL) {
82            break;
83        }
84    }
85
86    parse_and_print_response(&before, &response)
87}
88
89/// Tell the daemon to shut down.
90pub fn stop(socket_path: &str) {
91    if let Ok(mut stream) = UnixStream::connect(socket_path) {
92        let cmd_bytes = SHUTDOWN_CMD.as_bytes();
93        // Constant string — always fits in u32.
94        #[allow(clippy::cast_possible_truncation)]
95        let len = cmd_bytes.len() as u32;
96        let _ = stream.write_all(&len.to_le_bytes());
97        let _ = stream.write_all(cmd_bytes);
98        let _ = stream.flush();
99    }
100    // Clean up socket file
101    let _ = fs::remove_file(socket_path);
102}
103
104/// Check if the daemon is running and responsive.
105#[must_use]
106pub fn status(socket_path: &str) -> bool {
107    let Ok(mut stream) = UnixStream::connect(socket_path) else {
108        return false;
109    };
110
111    let cmd_bytes = PING_CMD.as_bytes();
112    // Constant string — always fits in u32.
113    #[allow(clippy::cast_possible_truncation)]
114    let len = cmd_bytes.len() as u32;
115    if stream.write_all(&len.to_le_bytes()).is_err()
116        || stream.write_all(cmd_bytes).is_err()
117        || stream.flush().is_err()
118    {
119        return false;
120    }
121
122    let mut buf = [0u8; 64];
123    match stream.read(&mut buf) {
124        Ok(n) => &buf[..n] == PONG_RESPONSE,
125        Err(_) => false,
126    }
127}
128
129/// Parse the daemon response: extract user output, env diff, and exit code.
130fn parse_and_print_response(before: &EnvSnapshot, response: &[u8]) -> i32 {
131    let data = String::from_utf8_lossy(response);
132
133    // Response format:
134    //   <user_output>ENV_SENTINEL<env_data>CWD_SENTINEL<cwd>EXIT_SENTINEL<code>DONE_SENTINEL
135
136    let Some(env_pos) = data.find(ENV_SENTINEL) else {
137        // No sentinels — dump everything as output
138        let _ = io::stderr().write_all(response);
139        return 1;
140    };
141
142    let after_env = &data[env_pos + ENV_SENTINEL.len()..];
143
144    let Some(cwd_pos) = after_env.find(CWD_SENTINEL) else {
145        return 1;
146    };
147    let env_section = &after_env[..cwd_pos];
148
149    let after_cwd = &after_env[cwd_pos + CWD_SENTINEL.len()..];
150    let Some(exit_pos) = after_cwd.find(EXIT_SENTINEL) else {
151        return 1;
152    };
153    let cwd_section = after_cwd[..exit_pos].trim();
154
155    let after_exit = &after_cwd[exit_pos + EXIT_SENTINEL.len()..];
156    let done_pos = after_exit.find(DONE_SENTINEL).unwrap_or(after_exit.len());
157    let exit_code: i32 = after_exit[..done_pos].trim().parse().unwrap_or(1);
158
159    // Exit 127 = command not found in bash. Suppress the error message
160    // so the fish wrapper can fall back to trying it as a fish command.
161    if exit_code == 127 {
162        return 127;
163    }
164
165    // Print user output to stderr (so user sees it)
166    let user_output = &response[..env_pos];
167    if !user_output.is_empty() {
168        let _ = io::stderr().write_all(user_output);
169    }
170
171    // Build env snapshot and diff
172    let after = EnvSnapshot::new(
173        env_diff::parse_null_separated_env(env_section),
174        cwd_section.to_string(),
175    );
176
177    let mut buf = String::new();
178    before.diff_into(&after, &mut buf);
179    if !buf.is_empty() {
180        let _ = io::stdout().lock().write_all(buf.as_bytes());
181    }
182
183    exit_code
184}
185
186// -----------------------------------------------------------------------
187// Server (daemon process)
188// -----------------------------------------------------------------------
189
/// Launch the daemon as a separate `reef daemon _serve` process.
///
/// Any existing file at `socket_path` is removed first. The child gets null
/// stdin/stdout and inherits stderr. The function then polls for the socket
/// file (50 checks, 10 ms apart) and prints a diagnostic if it never shows up.
///
/// Exits the current process with status 1 if the executable path cannot be
/// determined or the child cannot be spawned.
pub fn start(socket_path: &str) {
    // Remove stale socket if it exists
    let _ = fs::remove_file(socket_path);

    let exe = std::env::current_exe().unwrap_or_else(|e| {
        eprintln!("reef daemon: failed to find executable: {e}");
        process::exit(1)
    });

    if let Err(e) = Command::new(exe)
        .args(["daemon", "_serve", "--socket", socket_path])
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::inherit())
        .spawn()
    {
        eprintln!("reef daemon: failed to spawn: {e}");
        process::exit(1);
    }

    // Wait for socket to appear (up to 500ms)
    let socket = std::path::Path::new(socket_path);
    let poll_interval = std::time::Duration::from_millis(10);
    let appeared = (0..50).any(|_| {
        if socket.exists() {
            true
        } else {
            std::thread::sleep(poll_interval);
            false
        }
    });

    if !appeared {
        eprintln!("reef daemon: timed out waiting for socket");
    }
}
227
228/// Main daemon loop: spawn bash, accept connections, proxy commands.
229/// Called by `reef daemon _serve` (internal, not user-facing).
230///
231/// # Panics
232///
233/// Panics if `bash.stdin` or `bash.stdout` cannot be taken after spawning
234/// with `Stdio::piped()`. This is infallible in practice — `take()` only
235/// returns `None` if called twice, and we call it exactly once.
236pub fn serve(socket_path: &str) {
237    let listener = match UnixListener::bind(socket_path) {
238        Ok(l) => l,
239        Err(e) => {
240            eprintln!("reef daemon: failed to bind socket: {e}");
241            return;
242        }
243    };
244
245    // Spawn persistent bash process
246    let mut bash = match Command::new("bash")
247        .stdin(Stdio::piped())
248        .stdout(Stdio::piped())
249        .stderr(Stdio::inherit())
250        .spawn()
251    {
252        Ok(p) => p,
253        Err(e) => {
254            eprintln!("reef daemon: failed to spawn bash: {e}");
255            let _ = fs::remove_file(socket_path);
256            return;
257        }
258    };
259
260    let bash_stdin = bash.stdin.take().expect("stdin was set to piped");
261    let bash_stdout = bash.stdout.take().expect("stdout was set to piped");
262
263    let mut writer = io::BufWriter::new(bash_stdin);
264    let mut reader = BufReader::new(bash_stdout);
265
266    for stream in listener.incoming() {
267        let Ok(mut stream) = stream else {
268            continue;
269        };
270
271        // Read command: 4-byte LE length + command bytes
272        let mut len_buf = [0u8; 4];
273        if stream.read_exact(&mut len_buf).is_err() {
274            continue;
275        }
276        let cmd_len = u32::from_le_bytes(len_buf) as usize;
277
278        // Guard against absurd lengths — reject anything over 16 MiB to
279        // prevent a malicious or buggy client from exhausting memory.
280        const MAX_CMD_LEN: usize = 16 * 1024 * 1024;
281        if cmd_len > MAX_CMD_LEN {
282            continue;
283        }
284
285        let mut cmd_buf = vec![0u8; cmd_len];
286        if stream.read_exact(&mut cmd_buf).is_err() {
287            continue;
288        }
289        let command = String::from_utf8_lossy(&cmd_buf);
290
291        // Handle special commands
292        if *command == *SHUTDOWN_CMD {
293            let _ = bash.kill();
294            let _ = bash.wait();
295            let _ = fs::remove_file(socket_path);
296            return;
297        }
298
299        if *command == *PING_CMD {
300            let _ = stream.write_all(PONG_RESPONSE);
301            continue;
302        }
303
304        // Build the bash script to execute
305        let script = build_daemon_script(&command);
306
307        // Send to bash
308        if writeln!(writer, "{script}").is_err() || writer.flush().is_err() {
309            // Bash process died
310            let _ = stream.write_all(b"reef daemon: bash process died\n");
311            let _ = bash.kill();
312            let _ = fs::remove_file(socket_path);
313            return;
314        }
315
316        // Read bash output until DONE_SENTINEL
317        let mut response = Vec::with_capacity(4096);
318        loop {
319            let mut line = Vec::new();
320            match reader.read_until(b'\n', &mut line) {
321                Ok(0) | Err(_) => break, // EOF or error — bash died
322                Ok(_) => {
323                    response.extend_from_slice(&line);
324                    if contains_sentinel(&response, DONE_SENTINEL) {
325                        break;
326                    }
327                }
328            }
329        }
330
331        // Send response back to client
332        let _ = stream.write_all(&response);
333
334        // Check if bash is still alive
335        if let Some(_status) = bash.try_wait().ok().flatten() {
336            let _ = fs::remove_file(socket_path);
337            return;
338        }
339    }
340}
341
/// Build a bash script block for the daemon to eval.
///
/// The script:
/// 1. Evals the user's command with its stdout redirected to stderr (`>&2`),
///    keeping bash's stdout free for the protocol data below
/// 2. Captures the exit code in `__reef_exit`
/// 3. Prints the env dump (`env -0`), the working directory (`pwd`) and the
///    exit code, separated by the null-delimited sentinels
///
/// The command is embedded as a single-quoted string; each embedded `'` is
/// rewritten as `'\''`. Quoting is done on the string rather than byte by
/// byte so that non-ASCII (multi-byte UTF-8) text reaches bash unchanged.
///
/// NOTE(review): the evaluated command shares bash's stdin with the script
/// stream itself, so a command that reads stdin may consume the protocol
/// lines that follow it — confirm whether stdin should be redirected.
fn build_daemon_script(command: &str) -> String {
    // Escape command for eval (single-quote it)
    let quoted = command.replace('\'', r"'\''");

    let mut s = String::with_capacity(quoted.len() + 256);
    s.push_str("eval '");
    s.push_str(&quoted);
    s.push_str("' >&2\n");
    s.push_str("__reef_exit=$?\n");
    s.push_str("printf '\\0__REEF_DAEMON_ENV__\\0'\n");
    s.push_str("env -0\n");
    s.push_str("printf '\\0__REEF_DAEMON_CWD__\\0'\n");
    s.push_str("pwd\n");
    s.push_str("printf '\\0__REEF_DAEMON_EXIT__\\0%d\\0__REEF_DAEMON_DONE__\\0\\n' $__reef_exit\n");
    s
}
373
/// Report whether the bytes of `sentinel` occur anywhere inside `data`.
fn contains_sentinel(data: &[u8], sentinel: &str) -> bool {
    let needle = sentinel.as_bytes();
    for candidate in data.windows(needle.len()) {
        if candidate == needle {
            return true;
        }
    }
    false
}
380
#[cfg(test)]
mod tests {
    use super::*;

    /// A buffer ending in the DONE marker is recognised.
    #[test]
    fn contains_sentinel_finds_match() {
        let haystack: &[u8] = b"hello\0__REEF_DAEMON_DONE__\0\n";
        assert!(contains_sentinel(haystack, DONE_SENTINEL));
    }

    /// Plain output without any marker is not mistaken for a sentinel.
    #[test]
    fn contains_sentinel_no_match() {
        let haystack: &[u8] = b"hello world\n";
        assert!(!contains_sentinel(haystack, DONE_SENTINEL));
    }

    /// The generated script contains every protocol step.
    #[test]
    fn build_daemon_script_format() {
        let script = build_daemon_script("echo hello");
        for fragment in ["eval 'echo hello'", "__reef_exit=$?", "env -0", "pwd"] {
            assert!(script.contains(fragment));
        }
    }

    /// Embedded single quotes are rewritten as `'\''`.
    #[test]
    fn build_daemon_script_escapes_quotes() {
        let script = build_daemon_script("echo 'it'\"s\"");
        assert!(script.contains("'\\''"));
    }

    /// A well-formed response yields the exit code from the EXIT section.
    #[test]
    fn parse_response_extracts_exit_code() {
        let before = EnvSnapshot::new(std::collections::HashMap::new(), "/home".to_string());

        let response: Vec<u8> = [
            &b"output text"[..],
            ENV_SENTINEL.as_bytes(),
            b"MY_VAR=hello\0",
            CWD_SENTINEL.as_bytes(),
            b"/tmp\n",
            EXIT_SENTINEL.as_bytes(),
            b"42",
            DONE_SENTINEL.as_bytes(),
        ]
        .concat();

        assert_eq!(parse_and_print_response(&before, &response), 42);
    }
}