1use std::io::{self, BufRead, BufReader, Read, Write};
14use std::os::unix::net::{UnixListener, UnixStream};
15use std::process::{Command, Stdio};
16use std::{fs, process};
17
18use crate::env_diff::{self, EnvSnapshot};
19
20const ENV_SENTINEL: &str = "\0__REEF_DAEMON_ENV__\0";
23const CWD_SENTINEL: &str = "\0__REEF_DAEMON_CWD__\0";
24const EXIT_SENTINEL: &str = "\0__REEF_DAEMON_EXIT__\0";
25const DONE_SENTINEL: &str = "\0__REEF_DAEMON_DONE__\0";
26
27const SHUTDOWN_CMD: &str = "__REEF_SHUTDOWN__";
29
30const PING_CMD: &str = "__REEF_PING__";
32const PONG_RESPONSE: &[u8] = b"__REEF_PONG__\n";
33
34#[must_use]
41pub fn exec(socket_path: &str, command: &str) -> i32 {
42 let mut stream = match UnixStream::connect(socket_path) {
43 Ok(s) => s,
44 Err(e) => {
45 eprintln!("reef daemon: failed to connect: {e}");
46 eprintln!("reef daemon: is the daemon running? try: reef persist full");
47 return 1;
48 }
49 };
50
51 let before = EnvSnapshot::capture_current();
52
53 let cmd_bytes = command.as_bytes();
55 #[allow(clippy::cast_possible_truncation)]
58 let len = cmd_bytes.len() as u32;
59 if stream.write_all(&len.to_le_bytes()).is_err()
60 || stream.write_all(cmd_bytes).is_err()
61 || stream.flush().is_err()
62 {
63 eprintln!("reef daemon: failed to send command");
64 return 1;
65 }
66
67 let mut response = Vec::with_capacity(4096);
69 let mut buf = [0u8; 4096];
70 loop {
71 let n = match stream.read(&mut buf) {
72 Ok(0) => break,
73 Ok(n) => n,
74 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
75 Err(e) => {
76 eprintln!("reef daemon: read error: {e}");
77 return 1;
78 }
79 };
80 response.extend_from_slice(&buf[..n]);
81 if contains_sentinel(&response, DONE_SENTINEL) {
82 break;
83 }
84 }
85
86 parse_and_print_response(&before, &response)
87}
88
89pub fn stop(socket_path: &str) {
91 if let Ok(mut stream) = UnixStream::connect(socket_path) {
92 let cmd_bytes = SHUTDOWN_CMD.as_bytes();
93 #[allow(clippy::cast_possible_truncation)]
95 let len = cmd_bytes.len() as u32;
96 let _ = stream.write_all(&len.to_le_bytes());
97 let _ = stream.write_all(cmd_bytes);
98 let _ = stream.flush();
99 }
100 let _ = fs::remove_file(socket_path);
102}
103
104#[must_use]
106pub fn status(socket_path: &str) -> bool {
107 let Ok(mut stream) = UnixStream::connect(socket_path) else {
108 return false;
109 };
110
111 let cmd_bytes = PING_CMD.as_bytes();
112 #[allow(clippy::cast_possible_truncation)]
114 let len = cmd_bytes.len() as u32;
115 if stream.write_all(&len.to_le_bytes()).is_err()
116 || stream.write_all(cmd_bytes).is_err()
117 || stream.flush().is_err()
118 {
119 return false;
120 }
121
122 let mut buf = [0u8; 64];
123 match stream.read(&mut buf) {
124 Ok(n) => &buf[..n] == PONG_RESPONSE,
125 Err(_) => false,
126 }
127}
128
129fn parse_and_print_response(before: &EnvSnapshot, response: &[u8]) -> i32 {
131 let data = String::from_utf8_lossy(response);
132
133 let Some(env_pos) = data.find(ENV_SENTINEL) else {
137 let _ = io::stderr().write_all(response);
139 return 1;
140 };
141
142 let after_env = &data[env_pos + ENV_SENTINEL.len()..];
143
144 let Some(cwd_pos) = after_env.find(CWD_SENTINEL) else {
145 return 1;
146 };
147 let env_section = &after_env[..cwd_pos];
148
149 let after_cwd = &after_env[cwd_pos + CWD_SENTINEL.len()..];
150 let Some(exit_pos) = after_cwd.find(EXIT_SENTINEL) else {
151 return 1;
152 };
153 let cwd_section = after_cwd[..exit_pos].trim();
154
155 let after_exit = &after_cwd[exit_pos + EXIT_SENTINEL.len()..];
156 let done_pos = after_exit.find(DONE_SENTINEL).unwrap_or(after_exit.len());
157 let exit_code: i32 = after_exit[..done_pos].trim().parse().unwrap_or(1);
158
159 if exit_code == 127 {
162 return 127;
163 }
164
165 let user_output = &response[..env_pos];
167 if !user_output.is_empty() {
168 let _ = io::stderr().write_all(user_output);
169 }
170
171 let after = EnvSnapshot::new(
173 env_diff::parse_null_separated_env(env_section),
174 cwd_section.to_string(),
175 );
176
177 let mut buf = String::new();
178 before.diff_into(&after, &mut buf);
179 if !buf.is_empty() {
180 let _ = io::stdout().lock().write_all(buf.as_bytes());
181 }
182
183 exit_code
184}
185
186pub fn start(socket_path: &str) {
192 let _ = fs::remove_file(socket_path);
194
195 let exe = match std::env::current_exe() {
196 Ok(e) => e,
197 Err(e) => {
198 eprintln!("reef daemon: failed to find executable: {e}");
199 process::exit(1);
200 }
201 };
202
203 match Command::new(exe)
204 .args(["daemon", "_serve", "--socket", socket_path])
205 .stdin(Stdio::null())
206 .stdout(Stdio::null())
207 .stderr(Stdio::inherit())
208 .spawn()
209 {
210 Ok(_) => {}
211 Err(e) => {
212 eprintln!("reef daemon: failed to spawn: {e}");
213 process::exit(1);
214 }
215 }
216
217 for _ in 0..50 {
219 if std::path::Path::new(socket_path).exists() {
220 return;
221 }
222 std::thread::sleep(std::time::Duration::from_millis(10));
223 }
224
225 eprintln!("reef daemon: timed out waiting for socket");
226}
227
228pub fn serve(socket_path: &str) {
237 let listener = match UnixListener::bind(socket_path) {
238 Ok(l) => l,
239 Err(e) => {
240 eprintln!("reef daemon: failed to bind socket: {e}");
241 return;
242 }
243 };
244
245 let mut bash = match Command::new("bash")
247 .stdin(Stdio::piped())
248 .stdout(Stdio::piped())
249 .stderr(Stdio::inherit())
250 .spawn()
251 {
252 Ok(p) => p,
253 Err(e) => {
254 eprintln!("reef daemon: failed to spawn bash: {e}");
255 let _ = fs::remove_file(socket_path);
256 return;
257 }
258 };
259
260 let bash_stdin = bash.stdin.take().expect("stdin was set to piped");
261 let bash_stdout = bash.stdout.take().expect("stdout was set to piped");
262
263 let mut writer = io::BufWriter::new(bash_stdin);
264 let mut reader = BufReader::new(bash_stdout);
265
266 for stream in listener.incoming() {
267 let Ok(mut stream) = stream else {
268 continue;
269 };
270
271 let mut len_buf = [0u8; 4];
273 if stream.read_exact(&mut len_buf).is_err() {
274 continue;
275 }
276 let cmd_len = u32::from_le_bytes(len_buf) as usize;
277
278 const MAX_CMD_LEN: usize = 16 * 1024 * 1024;
281 if cmd_len > MAX_CMD_LEN {
282 continue;
283 }
284
285 let mut cmd_buf = vec![0u8; cmd_len];
286 if stream.read_exact(&mut cmd_buf).is_err() {
287 continue;
288 }
289 let command = String::from_utf8_lossy(&cmd_buf);
290
291 if *command == *SHUTDOWN_CMD {
293 let _ = bash.kill();
294 let _ = bash.wait();
295 let _ = fs::remove_file(socket_path);
296 return;
297 }
298
299 if *command == *PING_CMD {
300 let _ = stream.write_all(PONG_RESPONSE);
301 continue;
302 }
303
304 let script = build_daemon_script(&command);
306
307 if writeln!(writer, "{script}").is_err() || writer.flush().is_err() {
309 let _ = stream.write_all(b"reef daemon: bash process died\n");
311 let _ = bash.kill();
312 let _ = fs::remove_file(socket_path);
313 return;
314 }
315
316 let mut response = Vec::with_capacity(4096);
318 loop {
319 let mut line = Vec::new();
320 match reader.read_until(b'\n', &mut line) {
321 Ok(0) | Err(_) => break, Ok(_) => {
323 response.extend_from_slice(&line);
324 if contains_sentinel(&response, DONE_SENTINEL) {
325 break;
326 }
327 }
328 }
329 }
330
331 let _ = stream.write_all(&response);
333
334 if let Some(_status) = bash.try_wait().ok().flatten() {
336 let _ = fs::remove_file(socket_path);
337 return;
338 }
339 }
340}
341
342fn build_daemon_script(command: &str) -> String {
349 let mut escaped = String::with_capacity(command.len() + 2);
351 escaped.push('\'');
352 for &b in command.as_bytes() {
353 if b == b'\'' {
354 escaped.push_str("'\\''");
355 } else {
356 escaped.push(b as char);
357 }
358 }
359 escaped.push('\'');
360
361 let mut s = String::with_capacity(escaped.len() + 256);
362 s.push_str("eval ");
363 s.push_str(&escaped);
364 s.push_str(" >&2\n");
365 s.push_str("__reef_exit=$?\n");
366 s.push_str("printf '\\0__REEF_DAEMON_ENV__\\0'\n");
367 s.push_str("env -0\n");
368 s.push_str("printf '\\0__REEF_DAEMON_CWD__\\0'\n");
369 s.push_str("pwd\n");
370 s.push_str("printf '\\0__REEF_DAEMON_EXIT__\\0%d\\0__REEF_DAEMON_DONE__\\0\\n' $__reef_exit\n");
371 s
372}
373
374fn contains_sentinel(data: &[u8], sentinel: &str) -> bool {
376 let sentinel_bytes = sentinel.as_bytes();
377 data.windows(sentinel_bytes.len())
378 .any(|w| w == sentinel_bytes)
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384
385 #[test]
386 fn contains_sentinel_finds_match() {
387 let data = b"hello\0__REEF_DAEMON_DONE__\0\n";
388 assert!(contains_sentinel(data, DONE_SENTINEL));
389 }
390
391 #[test]
392 fn contains_sentinel_no_match() {
393 let data = b"hello world\n";
394 assert!(!contains_sentinel(data, DONE_SENTINEL));
395 }
396
397 #[test]
398 fn build_daemon_script_format() {
399 let script = build_daemon_script("echo hello");
400 assert!(script.contains("eval 'echo hello'"));
401 assert!(script.contains("__reef_exit=$?"));
402 assert!(script.contains("env -0"));
403 assert!(script.contains("pwd"));
404 }
405
406 #[test]
407 fn build_daemon_script_escapes_quotes() {
408 let script = build_daemon_script("echo 'it'\"s\"");
409 assert!(script.contains("'\\''"));
410 }
411
412 #[test]
413 fn parse_response_extracts_exit_code() {
414 let before = EnvSnapshot::new(
415 std::collections::HashMap::new(),
416 "/home".to_string(),
417 );
418
419 let mut response = Vec::new();
420 response.extend_from_slice(b"output text");
421 response.extend_from_slice(ENV_SENTINEL.as_bytes());
422 response.extend_from_slice(b"MY_VAR=hello\0");
423 response.extend_from_slice(CWD_SENTINEL.as_bytes());
424 response.extend_from_slice(b"/tmp\n");
425 response.extend_from_slice(EXIT_SENTINEL.as_bytes());
426 response.extend_from_slice(b"42");
427 response.extend_from_slice(DONE_SENTINEL.as_bytes());
428
429 let exit_code = parse_and_print_response(&before, &response);
430 assert_eq!(exit_code, 42);
431 }
432}