Skip to main content

keep_running/
daemon.rs

1use crate::protocol::{decode_message, encode_message, ClientMessage, DaemonMessage};
2use crate::session::{self, SessionInfo};
3use anyhow::{Context, Result};
4use nix::pty::{openpty, OpenptyResult, Winsize};
5use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
6use nix::unistd::{setsid, Pid};
7use std::io::{Read, Write};
8use std::os::fd::AsRawFd;
9use std::os::unix::net::{UnixListener, UnixStream};
10use std::os::unix::process::CommandExt;
11use std::process::Command;
12use std::time::Duration;
13
/// Upper bound on the bytes of PTY output retained as session history.
/// Once the history grows past this, the oldest bytes are dropped first.
const MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024; // 5MB scrollback history
/// When send_buf exceeds this, stop queuing new live output (client can't keep up).
/// The data is still saved in the history buffer for replay on reattach.
const MAX_SEND_BUF: usize = 4 * 1024 * 1024; // 4MB
18
19/// Shared state for the daemon
20struct DaemonState {
21    /// Output buffer for replay on reattach
22    buffer: Vec<u8>,
23    /// Current client connection (if any)
24    client: Option<UnixStream>,
25    /// Outgoing data queued for the client
26    send_buf: Vec<u8>,
27    /// Child process ID (for informational purposes)
28    #[allow(dead_code)]
29    child_pid: Pid,
30    /// Whether child has exited
31    child_exited: bool,
32    /// Child exit code
33    exit_code: Option<i32>,
34    /// PTY master fd
35    master_fd: i32,
36    /// Replay cursor: how far into `buffer` we've sent for the current replay.
37    /// None means no replay in progress.
38    replay_offset: Option<usize>,
39}
40
41impl DaemonState {
42    fn new(child_pid: Pid, master_fd: i32) -> Self {
43        Self {
44            buffer: Vec::new(),
45            client: None,
46            send_buf: Vec::new(),
47            child_pid,
48            child_exited: false,
49            exit_code: None,
50            master_fd,
51            replay_offset: None,
52        }
53    }
54
55    /// Queue an encoded message for sending to the client.
56    /// Returns false if client was dropped (send buffer overflow or encode error).
57    fn queue_message(&mut self, msg: &DaemonMessage) -> bool {
58        if self.client.is_none() {
59            return false;
60        }
61        match encode_message(msg) {
62            Ok(encoded) => {
63                self.send_buf.extend_from_slice(&encoded);
64                true
65            }
66            Err(_) => false,
67        }
68    }
69
70    /// Flush as much of send_buf as possible using non-blocking writes.
71    /// Drops the client if the send buffer is too large and no replay is in progress.
72    fn flush_send_buf(&mut self) {
73        if self.send_buf.is_empty() || self.client.is_none() {
74            return;
75        }
76
77        if let Some(ref mut client) = self.client {
78            loop {
79                match client.write(&self.send_buf) {
80                    Ok(0) => break,
81                    Ok(n) => {
82                        self.send_buf.drain(0..n);
83                        if self.send_buf.is_empty() {
84                            break;
85                        }
86                    }
87                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
88                    Err(_) => {
89                        // Real write error — client is gone
90                        self.send_buf.clear();
91                        self.replay_offset = None;
92                        self.client = None;
93                        return;
94                    }
95                }
96            }
97        }
98    }
99
100    /// Buffer output data AND send to client if connected.
101    /// If the client can't keep up (send_buf too large), we skip sending
102    /// live output — the child process is never blocked. The data is still
103    /// in the history buffer and the client will catch up on reattach.
104    fn handle_output(&mut self, data: &[u8]) {
105        // Always buffer output for session history
106        self.buffer_data(data);
107
108        // Only queue for client if send_buf isn't backed up
109        if self.send_buf.len() <= MAX_SEND_BUF {
110            let msg = DaemonMessage::Output(data.to_vec());
111            self.queue_message(&msg);
112        }
113    }
114
115    fn buffer_data(&mut self, data: &[u8]) {
116        self.buffer.extend_from_slice(data);
117        // Trim if too large (drop oldest)
118        if self.buffer.len() > MAX_BUFFER_SIZE {
119            let trim = self.buffer.len() - MAX_BUFFER_SIZE;
120            self.buffer.drain(0..trim);
121        }
122    }
123
124    /// Begin an incremental replay of the session history buffer.
125    /// Sends a ReplayStart message and sets up the replay cursor.
126    /// Actual data is fed via pump_replay() each poll iteration.
127    fn start_replay(&mut self) {
128        if self.client.is_none() || self.buffer.is_empty() {
129            return;
130        }
131        self.queue_message(&DaemonMessage::ReplayStart);
132        self.replay_offset = Some(0);
133    }
134
135    /// Feed the next batch of replay data into send_buf.
136    /// Called each poll iteration while a replay is in progress.
137    /// Only queues more data when send_buf is below a threshold so we
138    /// don't accumulate faster than the client can drain.
139    fn pump_replay(&mut self) {
140        const CHUNK_SIZE: usize = 32 * 1024; // 32KB per message
141        /// How much we allow in send_buf before pausing replay
142        const REPLAY_SEND_WATERMARK: usize = 256 * 1024; // 256KB
143
144        let offset = match self.replay_offset {
145            Some(o) => o,
146            None => return,
147        };
148
149        if self.client.is_none() {
150            self.replay_offset = None;
151            return;
152        }
153
154        // Don't queue more if send_buf is already full enough
155        if self.send_buf.len() > REPLAY_SEND_WATERMARK {
156            return;
157        }
158
159        if offset >= self.buffer.len() {
160            // Replay complete
161            self.queue_message(&DaemonMessage::ReplayEnd);
162            self.replay_offset = None;
163            return;
164        }
165
166        // Queue a batch of chunks up to the watermark
167        let mut pos = offset;
168        while pos < self.buffer.len() && self.send_buf.len() <= REPLAY_SEND_WATERMARK {
169            let end = (pos + CHUNK_SIZE).min(self.buffer.len());
170            let chunk = self.buffer[pos..end].to_vec();
171            pos = end;
172            let msg = DaemonMessage::Output(chunk);
173            if !self.queue_message(&msg) {
174                self.replay_offset = None;
175                return;
176            }
177        }
178        self.replay_offset = Some(pos);
179    }
180
181    /// Resize the PTY
182    fn resize_pty(&self, cols: u16, rows: u16) {
183        let winsize = Winsize {
184            ws_row: rows,
185            ws_col: cols,
186            ws_xpixel: 0,
187            ws_ypixel: 0,
188        };
189        unsafe {
190            libc::ioctl(self.master_fd, libc::TIOCSWINSZ as _, &winsize);
191        }
192    }
193}
194
195/// Daemonize the current process
196pub fn daemonize() -> Result<()> {
197    // First fork
198    let pid = unsafe { libc::fork() };
199    if pid < 0 {
200        return Err(std::io::Error::last_os_error()).context("First fork failed");
201    }
202    if pid > 0 {
203        // Parent exits
204        std::process::exit(0);
205    }
206
207    // Create new session
208    setsid().context("setsid failed")?;
209
210    // Second fork (prevent acquiring controlling terminal)
211    let pid = unsafe { libc::fork() };
212    if pid < 0 {
213        return Err(std::io::Error::last_os_error()).context("Second fork failed");
214    }
215    if pid > 0 {
216        // First child exits
217        std::process::exit(0);
218    }
219
220    // Close stdin/stdout/stderr and redirect to /dev/null
221    let devnull = std::fs::OpenOptions::new()
222        .read(true)
223        .write(true)
224        .open("/dev/null")?;
225    let fd = devnull.as_raw_fd();
226    unsafe {
227        libc::dup2(fd, 0);
228        libc::dup2(fd, 1);
229        libc::dup2(fd, 2);
230    }
231
232    Ok(())
233}
234
235/// Spawn command in a PTY, return (master_fd, child_pid)
236fn spawn_in_pty(command: &[String], cols: u16, rows: u16) -> Result<(i32, Pid)> {
237    let winsize = Winsize {
238        ws_row: rows,
239        ws_col: cols,
240        ws_xpixel: 0,
241        ws_ypixel: 0,
242    };
243
244    let OpenptyResult { master, slave } = openpty(&winsize, None).context("Failed to open PTY")?;
245
246    let master_fd = master.as_raw_fd();
247    let slave_fd = slave.as_raw_fd();
248
249    let (program, args) = command.split_first().context("Empty command")?;
250
251    let pid = unsafe { libc::fork() };
252    if pid < 0 {
253        return Err(std::io::Error::last_os_error()).context("Failed to fork");
254    }
255
256    if pid == 0 {
257        // Child process
258        // Close master
259        drop(master);
260
261        // Create new session
262        let _ = setsid();
263
264        // Set slave as controlling terminal
265        unsafe {
266            libc::ioctl(slave_fd, libc::TIOCSCTTY as _, 0);
267        }
268
269        // Dup slave to stdin/stdout/stderr
270        unsafe {
271            libc::dup2(slave_fd, 0);
272            libc::dup2(slave_fd, 1);
273            libc::dup2(slave_fd, 2);
274        }
275
276        if slave_fd > 2 {
277            drop(slave);
278        }
279
280        // Exec — set KEEP_RUNNING so nested sessions can be detected
281        let err = Command::new(program)
282            .args(args)
283            .env("KEEP_RUNNING", "1")
284            .exec();
285        eprintln!("Failed to exec '{}': {}", program, err);
286        std::process::exit(1);
287    }
288
289    // Parent
290    drop(slave);
291
292    // Forget master so it doesn't get closed when OwnedFd drops
293    std::mem::forget(master);
294
295    Ok((master_fd, Pid::from_raw(pid)))
296}
297
298/// Run the daemon for a session
299pub fn run_daemon(name: String, command: Vec<String>) -> Result<()> {
300    // Default size - will be updated when client attaches
301    let cols = 80;
302    let rows = 24;
303
304    // Create PTY and spawn child
305    let (master_fd, child_pid) = spawn_in_pty(&command, cols, rows)?;
306
307    // Set up socket
308    let socket_path = session::socket_path(&name)?;
309    if socket_path.exists() {
310        std::fs::remove_file(&socket_path)?;
311    }
312
313    let listener = UnixListener::bind(&socket_path).context("Failed to bind socket")?;
314    listener.set_nonblocking(true)?;
315
316    // Save session info
317    let info = SessionInfo {
318        name: name.clone(),
319        command: command.clone(),
320        pid: std::process::id(),
321        created_at: session::timestamp(),
322        socket_path: socket_path.to_string_lossy().to_string(),
323    };
324    session::save_session(&info)?;
325
326    // Initialize state
327    let mut state = DaemonState::new(child_pid, master_fd);
328
329    // Set PTY master to non-blocking
330    unsafe {
331        let flags = libc::fcntl(master_fd, libc::F_GETFL);
332        libc::fcntl(master_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
333    }
334
335    let mut pty_read_buf = [0u8; 4096];
336    let mut client_read_buf = [0u8; 4096];
337    let mut client_msg_buf = Vec::new();
338
339    // Track whether we've notified the client about exit
340    let mut exit_notified = false;
341    // When we queued ChildExited for the current client. A well-behaved client
342    // disconnects shortly after receiving this; if the connection is still alive
343    // `grace_period` later, the peer is a zombie.
344    let mut exit_notified_at: Option<std::time::Instant> = None;
345    // Track if PTY has been drained after child exit
346    let mut pty_drained = false;
347    // When child exited (for grace period)
348    let mut child_exit_time: Option<std::time::Instant> = None;
349    // True when a client received the ChildExited message and then disconnected
350    let mut exit_delivered = false;
351    // Zombie-client detection: when did `send_buf` last fail to make progress?
352    // If a peer connects but never reads (SIGSTOP'd, kernel-buffer-deadlocked,
353    // or just abandoned), `send_buf` accumulates and `flush_send_buf` keeps
354    // returning WouldBlock without an error. Without this, the daemon would
355    // think a client is attached forever.
356    let mut send_stalled_since: Option<std::time::Instant> = None;
357    let mut prev_send_buf_len: usize = 0;
358
359    let listener_fd = listener.as_raw_fd();
360
361    // Grace period: keep daemon alive this long after child exits with no client,
362    // so detached clients have time to reattach and see the output.
363    // Tunable via KEEP_RUNNING_GRACE_SECS so tests don't have to wait 30s.
364    let grace_period: Duration = std::env::var("KEEP_RUNNING_GRACE_SECS")
365        .ok()
366        .and_then(|s| s.parse::<u64>().ok())
367        .map(Duration::from_secs)
368        .unwrap_or(Duration::from_secs(30));
369
370    // Zombie-client timeout: how long send_buf can sit non-empty without making
371    // progress before we treat the peer as unresponsive and force-disconnect.
372    // Tunable via KEEP_RUNNING_ZOMBIE_SECS for tests.
373    let zombie_timeout: Duration = std::env::var("KEEP_RUNNING_ZOMBIE_SECS")
374        .ok()
375        .and_then(|s| s.parse::<u64>().ok())
376        .map(Duration::from_secs)
377        .unwrap_or(Duration::from_secs(30));
378
379    loop {
380        // Check if child has exited
381        if !state.child_exited {
382            match waitpid(child_pid, Some(WaitPidFlag::WNOHANG)) {
383                Ok(WaitStatus::Exited(_, code)) => {
384                    state.child_exited = true;
385                    state.exit_code = Some(code);
386                    child_exit_time = Some(std::time::Instant::now());
387                }
388                Ok(WaitStatus::Signaled(_, _, _)) => {
389                    state.child_exited = true;
390                    state.exit_code = None;
391                    child_exit_time = Some(std::time::Instant::now());
392                }
393                _ => {}
394            }
395        }
396
397        // Daemon exits when child has exited, no client connected, AND either:
398        // - a client received the ChildExited notification and then disconnected, OR
399        // - the grace period has elapsed (no one is coming back).
400        //
401        // We deliberately do not require `pty_drained` here. If the PTY hasn't
402        // drained yet (e.g. POLLHUP semantics differ across platforms) we still
403        // want to exit once the grace window is up — anything we haven't read
404        // by then is lost regardless. Previously this condition could deadlock
405        // the daemon at ~95% CPU forever if `pty_drained` never flipped.
406        if state.child_exited && state.client.is_none() {
407            let should_exit = exit_delivered
408                || child_exit_time
409                    .map(|t| t.elapsed() >= grace_period)
410                    .unwrap_or(false);
411            if should_exit {
412                let _ = session::remove_session(&name);
413                return Ok(());
414            }
415        }
416
417        // Build poll fds: [pty_master, listener, client?]
418        let mut pollfds: Vec<libc::pollfd> = Vec::with_capacity(3);
419
420        // Poll PTY master for readable data — never block the child.
421        // Once the child has exited AND the PTY is drained, set fd to -1
422        // (poll() ignores negative fds). Otherwise POLLHUP keeps firing on
423        // the closed slave end and the loop spins at 100% CPU during the
424        // grace period. We still keep the slot in pollfds so subsequent
425        // index references (pollfds[0]) stay valid.
426        let master_pollfd = if state.child_exited && pty_drained {
427            -1
428        } else {
429            master_fd
430        };
431        pollfds.push(libc::pollfd {
432            fd: master_pollfd,
433            events: libc::POLLIN,
434            revents: 0,
435        });
436
437        // Always poll listener for new connections
438        pollfds.push(libc::pollfd {
439            fd: listener_fd,
440            events: libc::POLLIN,
441            revents: 0,
442        });
443
444        // Poll client if connected
445        let client_poll_idx = if let Some(ref client) = state.client {
446            let mut events = libc::POLLIN;
447            if !state.send_buf.is_empty() {
448                events |= libc::POLLOUT;
449            }
450            pollfds.push(libc::pollfd {
451                fd: client.as_raw_fd(),
452                events,
453                revents: 0,
454            });
455            Some(pollfds.len() - 1)
456        } else {
457            None
458        };
459
460        // Use a longer timeout when idle, short when we have pending sends or replay.
461        // The short timeout is only useful when there's actually a client to drain
462        // `send_buf` into — otherwise we'd burn ~95% CPU spinning on a buffer with
463        // no consumer (this is what caused stuck daemons after a child exited but
464        // a queued ChildExited message had no client to receive it).
465        let has_drainable_work = !state.send_buf.is_empty() || state.replay_offset.is_some();
466        let timeout_ms = if has_drainable_work && state.client.is_some() {
467            1
468        } else {
469            500
470        };
471
472        let poll_ret = unsafe {
473            libc::poll(
474                pollfds.as_mut_ptr(),
475                pollfds.len() as libc::nfds_t,
476                timeout_ms,
477            )
478        };
479
480        // poll error (not EINTR)
481        if poll_ret < 0 {
482            let err = std::io::Error::last_os_error();
483            if err.kind() != std::io::ErrorKind::Interrupted {
484                // Unexpected poll error, brief sleep and retry
485                std::thread::sleep(Duration::from_millis(10));
486            }
487            continue;
488        }
489
490        // Try to accept new connection (if listener is readable or on timeout)
491        if pollfds[1].revents & libc::POLLIN != 0 {
492            if let Ok((stream, _)) = listener.accept() {
493                stream.set_nonblocking(true).ok();
494                state.client = Some(stream);
495                state.send_buf.clear();
496                client_msg_buf.clear();
497                exit_notified = false;
498                exit_notified_at = None;
499            }
500        }
501
502        // Read from PTY
503        if pollfds[0].revents & (libc::POLLIN | libc::POLLHUP) != 0 {
504            loop {
505                let pty_read = unsafe {
506                    libc::read(
507                        master_fd,
508                        pty_read_buf.as_mut_ptr() as *mut libc::c_void,
509                        pty_read_buf.len(),
510                    )
511                };
512
513                if pty_read > 0 {
514                    let data = &pty_read_buf[..pty_read as usize];
515                    state.handle_output(data);
516                    pty_drained = false;
517                } else if pty_read < 0 {
518                    let err = std::io::Error::last_os_error();
519                    if err.kind() == std::io::ErrorKind::WouldBlock {
520                        if state.child_exited {
521                            pty_drained = true;
522                        }
523                    } else {
524                        // Read error other than WouldBlock — almost always EIO
525                        // on Linux, which fires once the slave is fully closed
526                        // (child has exited or is about to). Capture the real
527                        // exit status via waitpid so we don't report code=None
528                        // for a clean exit. Bounded wait covers the race where
529                        // the kernel reports the closed slave before SIGCHLD.
530                        if !state.child_exited {
531                            let deadline = std::time::Instant::now() + Duration::from_millis(500);
532                            loop {
533                                match waitpid(child_pid, Some(WaitPidFlag::WNOHANG)) {
534                                    Ok(WaitStatus::Exited(_, code)) => {
535                                        state.exit_code = Some(code);
536                                        state.child_exited = true;
537                                        child_exit_time = Some(std::time::Instant::now());
538                                        break;
539                                    }
540                                    Ok(WaitStatus::Signaled(_, _, _)) => {
541                                        state.exit_code = None;
542                                        state.child_exited = true;
543                                        child_exit_time = Some(std::time::Instant::now());
544                                        break;
545                                    }
546                                    _ => {
547                                        if std::time::Instant::now() >= deadline {
548                                            // Fall back: mark exited so we
549                                            // don't spin on EIO forever.
550                                            state.child_exited = true;
551                                            child_exit_time = Some(std::time::Instant::now());
552                                            break;
553                                        }
554                                        std::thread::sleep(Duration::from_millis(5));
555                                    }
556                                }
557                            }
558                        }
559                        pty_drained = true;
560                    }
561                    break;
562                } else {
563                    // EOF
564                    pty_drained = true;
565                    break;
566                }
567            }
568        } else if state.child_exited {
569            // Child exited and poll didn't report PTY readable — it's drained
570            pty_drained = true;
571        }
572
573        // Send ChildExited only after PTY is drained
574        if state.child_exited && pty_drained && !exit_notified {
575            let msg = DaemonMessage::ChildExited {
576                code: state.exit_code,
577            };
578            state.queue_message(&msg);
579            exit_notified = true;
580            exit_notified_at = Some(std::time::Instant::now());
581        }
582
583        // Read from client
584        let mut client_disconnected = false;
585        let mut should_replay = false;
586        let mut resize_request: Option<(u16, u16)> = None;
587
588        if let Some(idx) = client_poll_idx {
589            let client_revents = pollfds[idx].revents;
590
591            if client_revents & (libc::POLLIN | libc::POLLHUP | libc::POLLERR) != 0 {
592                if let Some(ref mut client) = state.client {
593                    match client.read(&mut client_read_buf) {
594                        Ok(0) => {
595                            client_disconnected = true;
596                        }
597                        Ok(n) => {
598                            client_msg_buf.extend_from_slice(&client_read_buf[..n]);
599
600                            // Process messages
601                            loop {
602                                match decode_message::<ClientMessage>(&client_msg_buf) {
603                                    Ok(Some((msg, consumed))) => {
604                                        client_msg_buf.drain(0..consumed);
605
606                                        match msg {
607                                            ClientMessage::Attach { cols, rows } => {
608                                                resize_request = Some((cols, rows));
609                                                state.queue_message(&DaemonMessage::Attached);
610                                                should_replay = true;
611
612                                                if state.child_exited
613                                                    && pty_drained
614                                                    && !exit_notified
615                                                {
616                                                    let msg = DaemonMessage::ChildExited {
617                                                        code: state.exit_code,
618                                                    };
619                                                    state.queue_message(&msg);
620                                                    exit_notified = true;
621                                                    exit_notified_at =
622                                                        Some(std::time::Instant::now());
623                                                }
624                                            }
625                                            ClientMessage::Input(data) => unsafe {
626                                                libc::write(
627                                                    master_fd,
628                                                    data.as_ptr() as *const libc::c_void,
629                                                    data.len(),
630                                                );
631                                            },
632                                            ClientMessage::Resize { cols, rows } => {
633                                                resize_request = Some((cols, rows));
634                                            }
635                                            ClientMessage::Detach => {
636                                                client_disconnected = true;
637                                            }
638                                        }
639                                    }
640                                    Ok(None) => break,
641                                    Err(_) => {
642                                        client_disconnected = true;
643                                        break;
644                                    }
645                                }
646                            }
647                        }
648                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
649                        Err(_) => {
650                            client_disconnected = true;
651                        }
652                    }
653                }
654            }
655        }
656
657        // Handle resize outside of borrow
658        if let Some((cols, rows)) = resize_request {
659            state.resize_pty(cols, rows);
660        }
661
662        // Start incremental replay after processing attach message
663        if should_replay {
664            state.start_replay();
665        }
666
667        // Feed more replay data if a replay is in progress
668        state.pump_replay();
669
670        if client_disconnected {
671            // If we already sent ChildExited to this client, the info was delivered
672            if exit_notified {
673                exit_delivered = true;
674            }
675            state.client = None;
676            state.send_buf.clear();
677            state.replay_offset = None;
678            client_msg_buf.clear();
679            send_stalled_since = None;
680            prev_send_buf_len = 0;
681            exit_notified_at = None;
682        }
683
684        // Flush queued data to client (non-blocking)
685        state.flush_send_buf();
686
687        // Zombie-client detection (variant A): the peer connected but our
688        // `send_buf` is stuck — the kernel-side socket buffer filled up and
689        // they aren't reading. Happens when a client is SIGSTOP'd or wedged.
690        if state.client.is_some() && !state.send_buf.is_empty() {
691            if state.send_buf.len() < prev_send_buf_len {
692                send_stalled_since = None;
693            } else {
694                let started = send_stalled_since.get_or_insert_with(std::time::Instant::now);
695                if started.elapsed() >= zombie_timeout {
696                    if exit_notified {
697                        exit_delivered = true;
698                    }
699                    state.client = None;
700                    state.send_buf.clear();
701                    state.replay_offset = None;
702                    client_msg_buf.clear();
703                    send_stalled_since = None;
704                    exit_notified_at = None;
705                }
706            }
707        } else {
708            send_stalled_since = None;
709        }
710        prev_send_buf_len = state.send_buf.len();
711
712        // Zombie-client detection (variant B): we sent ChildExited and the
713        // peer never disconnected. Real clients show a final status and exit
714        // within milliseconds of receiving this; anyone still here a full
715        // grace period later is wedged. This is the failure mode that lets
716        // a backgrounded `keep-running run -- <cmd> &` (the client gets
717        // SIGTTIN-stopped on its first stdin read) hold the daemon open
718        // forever — `send_buf` empties because the kernel buffer absorbed
719        // the small ChildExited frame, so variant A above never triggers.
720        if state.client.is_some() && exit_notified {
721            if let Some(t) = exit_notified_at {
722                if t.elapsed() >= grace_period {
723                    state.client = None;
724                    state.send_buf.clear();
725                    state.replay_offset = None;
726                    client_msg_buf.clear();
727                    exit_notified_at = None;
728                    // Don't set exit_delivered — we don't know the peer
729                    // actually saw the message. Cleanup will fall through
730                    // to the grace-period branch above.
731                }
732            }
733        }
734    }
735}
736
737/// Fork and run daemon in background
738pub fn start_daemon(name: String, command: Vec<String>) -> Result<()> {
739    let pid = unsafe { libc::fork() };
740
741    if pid < 0 {
742        return Err(std::io::Error::last_os_error()).context("Fork failed");
743    }
744
745    if pid == 0 {
746        // Child - become daemon
747        if let Err(e) = daemonize() {
748            eprintln!("Daemonize failed: {}", e);
749            std::process::exit(1);
750        }
751
752        if let Err(_e) = run_daemon(name, command) {
753            std::process::exit(1);
754        }
755
756        std::process::exit(0);
757    }
758
759    // Parent - wait for daemon to set up
760    std::thread::sleep(Duration::from_millis(200));
761
762    Ok(())
763}