Skip to main content

running_process/client/
client.rs

1//! Synchronous IPC client for the running-process daemon.
2//!
3//! Connects to the daemon over a local socket (Unix domain socket on
4//! Linux/macOS, named pipe on Windows) and exchanges length-prefixed protobuf
5//! messages.
6
7use crate::client::paths;
8use crate::proto::daemon::{
9    BulkTerminateSessionsRequest, BulkTerminateSessionsResponse, DaemonRequest, DaemonResponse,
10    GetProcessTreeRequest, GetSessionBacklogRequest, GetSessionBacklogResponse, KeyValue,
11    KillTreeRequest, KillZombiesRequest, ListActiveRequest, ListByOriginatorRequest, PingRequest,
12    PipeStreamKind, PurgeExitedSessionsRequest, PurgeExitedSessionsResponse, RequestType,
13    ResizePtySessionRequest, ServiceConfig, ServiceDeleteRequest, ServiceDescribeRequest,
14    ServiceFlushRequest, ServiceListRequest, ServiceLogsRequest, ServiceRestartRequest,
15    ServiceResurrectRequest, ServiceSaveRequest, ServiceStartRequest, ServiceStopRequest,
16    ShutdownRequest, SpawnDaemonRequest as ProtoSpawnDaemonRequest, StatusCode, StatusRequest,
17};
18use interprocess::local_socket::Stream;
19use interprocess::TryClone;
20use prost::Message;
21use std::io::{BufReader, BufWriter, Read, Write};
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25// ---------------------------------------------------------------------------
26// Error type
27// ---------------------------------------------------------------------------
28
29/// Errors produced by [`DaemonClient`] operations.
30#[derive(Debug)]
31pub enum ClientError {
32    /// Failed to connect to the daemon socket.
33    Connect(std::io::Error),
34    /// I/O error during send or receive.
35    Io(std::io::Error),
36    /// Failed to decode a protobuf response.
37    Decode(prost::DecodeError),
38    /// The daemon returned an application-level error response.
39    Server {
40        /// Application-level status code returned by the daemon.
41        code: StatusCode,
42        /// Human-readable daemon error message.
43        message: String,
44    },
45    /// The daemon is not running and could not be started.
46    DaemonNotRunning,
47}
48
49impl std::fmt::Display for ClientError {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            ClientError::Connect(e) => write!(f, "failed to connect to daemon: {e}"),
53            ClientError::Io(e) => write!(f, "daemon I/O error: {e}"),
54            ClientError::Decode(e) => write!(f, "failed to decode daemon response: {e}"),
55            ClientError::Server { code, message } => {
56                write!(f, "daemon returned {:?}: {}", code, message)
57            }
58            ClientError::DaemonNotRunning => write!(f, "daemon is not running"),
59        }
60    }
61}
62
63impl std::error::Error for ClientError {
64    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
65        match self {
66            ClientError::Connect(e) | ClientError::Io(e) => Some(e),
67            ClientError::Decode(e) => Some(e),
68            ClientError::Server { .. } | ClientError::DaemonNotRunning => None,
69        }
70    }
71}
72
73// ---------------------------------------------------------------------------
74// Spawn API
75// ---------------------------------------------------------------------------
76
/// Request to spawn a detached daemonized shell command under daemon control.
///
/// Build one with [`SpawnCommandRequest::shell`] and refine it with the
/// `with_*` builder methods, each of which consumes and returns the request.
#[derive(Debug, Clone)]
pub struct SpawnCommandRequest {
    /// Shell command line to execute.
    pub command: String,
    /// Working directory for the spawned command.
    pub cwd: Option<PathBuf>,
    /// Environment key/value pairs sent with the request.
    pub env: Vec<(String, String)>,
    /// Caller-provided originator used for tracking and filtering.
    pub originator: Option<String>,
    /// When `true`, the daemon clears the inherited env before applying
    /// [`Self::env`], so the subprocess sees ONLY the supplied map.
    /// Mirrors Python's `subprocess.Popen(env=…)` replace semantic.
    /// Default `false` keeps the historic "layer on top of inherited"
    /// behaviour.
    pub clear_inherited_env: bool,
}

impl SpawnCommandRequest {
    /// Build the default originator string: `<executable stem>:<pid>`.
    ///
    /// Falls back to `running-process-client` as the name part when the
    /// current executable path is unavailable or has an empty file stem.
    fn default_originator() -> String {
        let caller = std::env::current_exe()
            .ok()
            .and_then(|path| {
                path.file_stem()
                    .map(|stem| stem.to_string_lossy().into_owned())
            })
            .filter(|value| !value.is_empty())
            .unwrap_or_else(|| "running-process-client".to_string());
        format!("{caller}:{}", std::process::id())
    }

    /// Build a shell-command request using the caller's current working
    /// directory and environment.
    ///
    /// The working directory is `None` when it cannot be determined.
    /// Environment entries whose key or value is not valid Unicode are
    /// skipped, because the request carries them as `String`s; this avoids
    /// the panic `std::env::vars()` raises on such entries.
    pub fn shell(command: impl Into<String>) -> Self {
        let env = std::env::vars_os()
            .filter_map(|(key, value)| {
                // Drop the whole pair if either side is not valid Unicode.
                Some((key.into_string().ok()?, value.into_string().ok()?))
            })
            .collect();
        Self {
            command: command.into(),
            cwd: std::env::current_dir().ok(),
            env,
            originator: Some(Self::default_originator()),
            clear_inherited_env: false,
        }
    }

    /// Override the working directory used for the spawned command.
    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
        self.cwd = Some(cwd.into());
        self
    }

    /// Replace the environment block sent to the daemon (layered on top
    /// of the daemon's inherited env, unless [`Self::with_env_replace`]
    /// is used instead).
    ///
    /// Does not touch [`Self::clear_inherited_env`].
    pub fn with_envs<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.env = env
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect();
        self
    }

    /// Set the env block AND tell the daemon to clear the inherited
    /// env first — the subprocess will see ONLY the supplied map.
    ///
    /// Mirrors Python's `subprocess.Popen(env=…)` semantic:
    ///
    /// ```python
    /// subprocess.Popen(["..."], env=None)        # inherits
    /// subprocess.Popen(["..."], env={"K": "V"})  # replaces
    /// ```
    ///
    /// On Windows you typically still want to include `SystemRoot` in
    /// the supplied map so `cmd.exe` can load its DLLs.
    pub fn with_env_replace<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        // Same env replacement as `with_envs`, plus the clear flag.
        let mut request = self.with_envs(env);
        request.clear_inherited_env = true;
        request
    }

    /// Add or replace a single environment variable while keeping the rest
    /// of the existing environment block intact.
    ///
    /// Keys are compared exactly (case-sensitively). If the key is already
    /// present, the first matching entry is updated in place; otherwise the
    /// pair is appended.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let key = key.into();
        let value = value.into();
        match self.env.iter_mut().find(|(existing, _)| *existing == key) {
            Some((_, slot)) => *slot = value,
            None => self.env.push((key, value)),
        }
        self
    }

    /// Set the originator value stored in the daemon registry and injected
    /// into the spawned child environment.
    pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
        self.originator = Some(originator.into());
        self
    }
}
193
/// Information about a daemonized process spawned by the service.
///
/// Returned by [`DaemonClient::spawn_command`], which fills it from the
/// daemon's spawn response.
#[derive(Debug, Clone, PartialEq)]
pub struct SpawnedDaemon {
    /// Operating-system process identifier of the spawned daemon.
    pub pid: u32,
    /// Daemon-side creation timestamp in Unix seconds.
    pub created_at: f64,
    /// Shell command registered for the spawned daemon.
    pub command: String,
    /// Working directory reported for the spawned daemon; `None` when the
    /// daemon reported an empty string.
    pub cwd: Option<String>,
    /// Originator recorded for the spawned daemon; `None` when the daemon
    /// reported an empty string.
    pub originator: Option<String>,
    /// Containment mechanism used by the daemon for this process.
    pub containment: String,
}
210
211// ---------------------------------------------------------------------------
212// Client
213// ---------------------------------------------------------------------------
214
215/// Synchronous IPC client that communicates with the daemon over a local socket.
216///
217/// Messages are framed with a 4-byte big-endian length prefix followed by
218/// a protobuf-encoded payload.
219pub struct DaemonClient {
220    reader: BufReader<Stream>,
221    writer: BufWriter<Stream>,
222    next_id: AtomicU64,
223}
224
225impl DaemonClient {
226    /// Connect to a running daemon identified by an optional scope hash.
227    ///
228    /// The socket path is computed by [`paths::socket_path`] and the name type
229    /// dispatch matches the server via [`paths::make_socket_name`].
230    pub fn connect(scope_hash: Option<&str>) -> Result<Self, ClientError> {
231        let path = paths::socket_path(scope_hash);
232        Self::connect_to(&path)
233    }
234
235    /// Connect to a daemon listening at an explicit socket path.
236    ///
237    /// Use this when you already know the socket path (e.g. in integration
238    /// tests that start a server on a unique path).
239    pub fn connect_to(socket_path: &str) -> Result<Self, ClientError> {
240        let name = paths::make_socket_name(socket_path).map_err(ClientError::Connect)?;
241
242        use interprocess::local_socket::traits::Stream as _;
243        let stream = Stream::connect(name).map_err(ClientError::Connect)?;
244        let stream_clone = stream.try_clone().map_err(ClientError::Connect)?;
245
246        Ok(Self {
247            reader: BufReader::new(stream),
248            writer: BufWriter::new(stream_clone),
249            next_id: AtomicU64::new(1),
250        })
251    }
252
253    /// Send a request and wait for the corresponding response.
254    ///
255    /// The request is length-prefixed (4-byte big-endian u32) then protobuf-encoded.
256    /// The response uses the same framing.
257    pub fn send_request(&mut self, request: DaemonRequest) -> Result<DaemonResponse, ClientError> {
258        // Encode
259        let payload = request.encode_to_vec();
260        let len = payload.len() as u32;
261
262        // Write length prefix + payload
263        self.writer
264            .write_all(&len.to_be_bytes())
265            .map_err(ClientError::Io)?;
266        self.writer.write_all(&payload).map_err(ClientError::Io)?;
267        self.writer.flush().map_err(ClientError::Io)?;
268
269        // Read length prefix
270        let mut len_buf = [0u8; 4];
271        self.reader
272            .read_exact(&mut len_buf)
273            .map_err(ClientError::Io)?;
274        let resp_len = u32::from_be_bytes(len_buf) as usize;
275
276        // Read payload
277        let mut resp_buf = vec![0u8; resp_len];
278        self.reader
279            .read_exact(&mut resp_buf)
280            .map_err(ClientError::Io)?;
281
282        DaemonResponse::decode(&resp_buf[..]).map_err(ClientError::Decode)
283    }
284
285    // -----------------------------------------------------------------------
286    // Convenience helpers
287    // -----------------------------------------------------------------------
288
289    /// Allocate the next request ID.
290    pub(crate) fn next_request_id(&self) -> u64 {
291        self.next_id.fetch_add(1, Ordering::Relaxed)
292    }
293
294    fn ensure_ok(&self, response: &DaemonResponse) -> Result<(), ClientError> {
295        if response.code == StatusCode::Ok as i32 {
296            return Ok(());
297        }
298
299        let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
300        Err(ClientError::Server {
301            code,
302            message: response.message.clone(),
303        })
304    }
305
306    /// Ping the daemon to check liveness.
307    pub fn ping(&mut self) -> Result<DaemonResponse, ClientError> {
308        let request = DaemonRequest {
309            id: self.next_request_id(),
310            r#type: RequestType::Ping.into(),
311            protocol_version: 1,
312            client_name: String::from("running-process-client"),
313            ping: Some(PingRequest {}),
314            ..Default::default()
315        };
316        self.send_request(request)
317    }
318
319    /// Ask the daemon to shut down.
320    pub fn shutdown(
321        &mut self,
322        graceful: bool,
323        timeout_seconds: f64,
324    ) -> Result<DaemonResponse, ClientError> {
325        let request = DaemonRequest {
326            id: self.next_request_id(),
327            r#type: RequestType::Shutdown.into(),
328            protocol_version: 1,
329            client_name: String::from("running-process-client"),
330            shutdown: Some(ShutdownRequest {
331                graceful,
332                timeout_seconds,
333            }),
334            ..Default::default()
335        };
336        self.send_request(request)
337    }
338
339    /// Query daemon status.
340    pub fn status(&mut self) -> Result<DaemonResponse, ClientError> {
341        let request = DaemonRequest {
342            id: self.next_request_id(),
343            r#type: RequestType::Status.into(),
344            protocol_version: 1,
345            client_name: String::from("running-process-client"),
346            status: Some(StatusRequest {}),
347            ..Default::default()
348        };
349        self.send_request(request)
350    }
351
352    /// List all active tracked processes.
353    pub fn list_active(&mut self) -> Result<DaemonResponse, ClientError> {
354        let request = DaemonRequest {
355            id: self.next_request_id(),
356            r#type: RequestType::ListActive.into(),
357            protocol_version: 1,
358            client_name: String::from("running-process-client"),
359            list_active: Some(ListActiveRequest {}),
360            ..Default::default()
361        };
362        self.send_request(request)
363    }
364
365    /// List tracked processes filtered by originator tool name.
366    pub fn list_by_originator(&mut self, tool: &str) -> Result<DaemonResponse, ClientError> {
367        let request = DaemonRequest {
368            id: self.next_request_id(),
369            r#type: RequestType::ListByOriginator.into(),
370            protocol_version: 1,
371            client_name: String::from("running-process-client"),
372            list_by_originator: Some(ListByOriginatorRequest {
373                tool: tool.to_string(),
374            }),
375            ..Default::default()
376        };
377        self.send_request(request)
378    }
379
380    /// Kill zombie processes tracked by the daemon.
381    pub fn kill_zombies(&mut self, dry_run: bool) -> Result<DaemonResponse, ClientError> {
382        let request = DaemonRequest {
383            id: self.next_request_id(),
384            r#type: RequestType::KillZombies.into(),
385            protocol_version: 1,
386            client_name: String::from("running-process-client"),
387            kill_zombies: Some(KillZombiesRequest { dry_run }),
388            ..Default::default()
389        };
390        self.send_request(request)
391    }
392
393    /// Kill a process tree rooted at `pid`.
394    pub fn kill_tree(
395        &mut self,
396        pid: u32,
397        timeout_seconds: f64,
398    ) -> Result<DaemonResponse, ClientError> {
399        let request = DaemonRequest {
400            id: self.next_request_id(),
401            r#type: RequestType::KillTree.into(),
402            protocol_version: 1,
403            client_name: String::from("running-process-client"),
404            kill_tree: Some(KillTreeRequest {
405                pid,
406                timeout_seconds,
407            }),
408            ..Default::default()
409        };
410        self.send_request(request)
411    }
412
413    /// Get the process tree display for a given PID.
414    pub fn get_process_tree(&mut self, pid: u32) -> Result<DaemonResponse, ClientError> {
415        let request = DaemonRequest {
416            id: self.next_request_id(),
417            r#type: RequestType::GetProcessTree.into(),
418            protocol_version: 1,
419            client_name: String::from("running-process-client"),
420            get_process_tree: Some(GetProcessTreeRequest { pid }),
421            ..Default::default()
422        };
423        self.send_request(request)
424    }
425
426    /// Ask the daemon to spawn and track a detached shell command.
427    pub fn spawn_command(
428        &mut self,
429        request: &SpawnCommandRequest,
430    ) -> Result<SpawnedDaemon, ClientError> {
431        let daemon_request = DaemonRequest {
432            id: self.next_request_id(),
433            r#type: RequestType::SpawnDaemon.into(),
434            protocol_version: 1,
435            client_name: String::from("running-process-client"),
436            spawn_daemon: Some(ProtoSpawnDaemonRequest {
437                command: request.command.clone(),
438                cwd: request
439                    .cwd
440                    .as_ref()
441                    .map(|cwd| cwd.to_string_lossy().into_owned())
442                    .unwrap_or_default(),
443                env: request
444                    .env
445                    .iter()
446                    .map(|(k, v)| KeyValue {
447                        key: k.clone(),
448                        value: v.clone(),
449                    })
450                    .collect(),
451                originator: request.originator.clone().unwrap_or_default(),
452                clear_inherited_env: request.clear_inherited_env,
453            }),
454            ..Default::default()
455        };
456
457        let response = self.send_request(daemon_request)?;
458        self.ensure_ok(&response)?;
459
460        let payload = response.spawn_daemon.ok_or_else(|| ClientError::Server {
461            code: StatusCode::Internal,
462            message: "spawn response missing payload".to_string(),
463        })?;
464
465        Ok(SpawnedDaemon {
466            pid: payload.pid,
467            created_at: payload.created_at,
468            command: payload.command,
469            cwd: if payload.cwd.is_empty() {
470                None
471            } else {
472                Some(payload.cwd)
473            },
474            originator: if payload.originator.is_empty() {
475                None
476            } else {
477                Some(payload.originator)
478            },
479            containment: payload.containment,
480        })
481    }
482
483    // --- service supervision (runpm) — Phase 1 ---
484
485    /// Start a supervised service from a [`ServiceConfig`].
486    pub fn service_start(&mut self, config: ServiceConfig) -> Result<DaemonResponse, ClientError> {
487        let request = DaemonRequest {
488            id: self.next_request_id(),
489            r#type: RequestType::ServiceStart.into(),
490            protocol_version: 1,
491            client_name: String::from("running-process-client"),
492            service_start: Some(ServiceStartRequest {
493                config: Some(config),
494            }),
495            ..Default::default()
496        };
497        self.send_request(request)
498    }
499
500    /// Stop a supervised service identified by name, id, or `"all"`.
501    pub fn service_stop(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
502        let request = DaemonRequest {
503            id: self.next_request_id(),
504            r#type: RequestType::ServiceStop.into(),
505            protocol_version: 1,
506            client_name: String::from("running-process-client"),
507            service_stop: Some(ServiceStopRequest {
508                target: target.to_string(),
509            }),
510            ..Default::default()
511        };
512        self.send_request(request)
513    }
514
515    /// Restart a supervised service identified by name, id, or `"all"`.
516    pub fn service_restart(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
517        let request = DaemonRequest {
518            id: self.next_request_id(),
519            r#type: RequestType::ServiceRestart.into(),
520            protocol_version: 1,
521            client_name: String::from("running-process-client"),
522            service_restart: Some(ServiceRestartRequest {
523                target: target.to_string(),
524            }),
525            ..Default::default()
526        };
527        self.send_request(request)
528    }
529
530    /// Delete a supervised service from the registry.
531    pub fn service_delete(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
532        let request = DaemonRequest {
533            id: self.next_request_id(),
534            r#type: RequestType::ServiceDelete.into(),
535            protocol_version: 1,
536            client_name: String::from("running-process-client"),
537            service_delete: Some(ServiceDeleteRequest {
538                target: target.to_string(),
539            }),
540            ..Default::default()
541        };
542        self.send_request(request)
543    }
544
545    /// List all supervised services known to the daemon.
546    pub fn service_list(&mut self) -> Result<DaemonResponse, ClientError> {
547        let request = DaemonRequest {
548            id: self.next_request_id(),
549            r#type: RequestType::ServiceList.into(),
550            protocol_version: 1,
551            client_name: String::from("running-process-client"),
552            service_list: Some(ServiceListRequest {}),
553            ..Default::default()
554        };
555        self.send_request(request)
556    }
557
558    /// Describe a single supervised service in detail.
559    pub fn service_describe(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
560        let request = DaemonRequest {
561            id: self.next_request_id(),
562            r#type: RequestType::ServiceDescribe.into(),
563            protocol_version: 1,
564            client_name: String::from("running-process-client"),
565            service_describe: Some(ServiceDescribeRequest {
566                target: target.to_string(),
567            }),
568            ..Default::default()
569        };
570        self.send_request(request)
571    }
572
573    /// Fetch buffered log output for a supervised service.
574    pub fn service_logs(
575        &mut self,
576        target: &str,
577        lines: u32,
578        follow: bool,
579    ) -> Result<DaemonResponse, ClientError> {
580        let request = DaemonRequest {
581            id: self.next_request_id(),
582            r#type: RequestType::ServiceLogs.into(),
583            protocol_version: 1,
584            client_name: String::from("running-process-client"),
585            service_logs: Some(ServiceLogsRequest {
586                target: target.to_string(),
587                lines,
588                follow,
589            }),
590            ..Default::default()
591        };
592        self.send_request(request)
593    }
594
595    /// Flush buffered logs for a supervised service.
596    pub fn service_flush(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
597        let request = DaemonRequest {
598            id: self.next_request_id(),
599            r#type: RequestType::ServiceFlush.into(),
600            protocol_version: 1,
601            client_name: String::from("running-process-client"),
602            service_flush: Some(ServiceFlushRequest {
603                target: target.to_string(),
604            }),
605            ..Default::default()
606        };
607        self.send_request(request)
608    }
609
610    /// Persist the current set of supervised services to a snapshot.
611    pub fn service_save(&mut self) -> Result<DaemonResponse, ClientError> {
612        let request = DaemonRequest {
613            id: self.next_request_id(),
614            r#type: RequestType::ServiceSave.into(),
615            protocol_version: 1,
616            client_name: String::from("running-process-client"),
617            service_save: Some(ServiceSaveRequest {}),
618            ..Default::default()
619        };
620        self.send_request(request)
621    }
622
623    /// Restore supervised services from the most recent snapshot.
624    pub fn service_resurrect(&mut self) -> Result<DaemonResponse, ClientError> {
625        let request = DaemonRequest {
626            id: self.next_request_id(),
627            r#type: RequestType::ServiceResurrect.into(),
628            protocol_version: 1,
629            client_name: String::from("running-process-client"),
630            service_resurrect: Some(ServiceResurrectRequest {}),
631            ..Default::default()
632        };
633        self.send_request(request)
634    }
635
636    /// Resize a PTY session without going through an attach
637    /// (#130 M5 follow-up). The new size persists for the lifetime of
638    /// the session; subsequent attaches can override it via their own
639    /// rows/cols fields.
640    pub fn resize_pty_session(
641        &mut self,
642        session_id: &str,
643        rows: u16,
644        cols: u16,
645    ) -> Result<(), ClientError> {
646        let request = DaemonRequest {
647            id: self.next_request_id(),
648            r#type: RequestType::ResizePtySession.into(),
649            protocol_version: 1,
650            client_name: String::from("running-process-client"),
651            resize_pty_session: Some(ResizePtySessionRequest {
652                session_id: session_id.into(),
653                rows: rows as u32,
654                cols: cols as u32,
655            }),
656            ..Default::default()
657        };
658        let response = self.send_request(request)?;
659        if response.code != StatusCode::Ok as i32 {
660            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
661            return Err(ClientError::Server {
662                code,
663                message: response.message,
664            });
665        }
666        Ok(())
667    }
668
669    /// Purge exited sessions from both daemon-side registries (#130 M9
670    /// H4). Returns counts of PTY and pipe sessions reaped.
671    pub fn purge_exited_sessions(
672        &mut self,
673        originator: &str,
674    ) -> Result<PurgeExitedSessionsResponse, ClientError> {
675        let request = DaemonRequest {
676            id: self.next_request_id(),
677            r#type: RequestType::PurgeExitedSessions.into(),
678            protocol_version: 1,
679            client_name: String::from("running-process-client"),
680            purge_exited_sessions: Some(PurgeExitedSessionsRequest {
681                originator: originator.into(),
682            }),
683            ..Default::default()
684        };
685        let response = self.send_request(request)?;
686        if response.code != StatusCode::Ok as i32 {
687            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
688            return Err(ClientError::Server {
689                code,
690                message: response.message,
691            });
692        }
693        response
694            .purge_exited_sessions
695            .ok_or_else(|| ClientError::Server {
696                code: StatusCode::Internal,
697                message: "purge_exited_sessions response missing payload".into(),
698            })
699    }
700
701    /// Schedule termination of every session older than the threshold
702    /// (#130 M9 H4). `older_than_secs=0` terminates everything in scope.
703    pub fn bulk_terminate_sessions(
704        &mut self,
705        older_than_secs: u64,
706        originator: &str,
707        grace_ms: u32,
708    ) -> Result<BulkTerminateSessionsResponse, ClientError> {
709        let request = DaemonRequest {
710            id: self.next_request_id(),
711            r#type: RequestType::BulkTerminateSessions.into(),
712            protocol_version: 1,
713            client_name: String::from("running-process-client"),
714            bulk_terminate_sessions: Some(BulkTerminateSessionsRequest {
715                older_than_secs,
716                originator: originator.into(),
717                grace_ms,
718            }),
719            ..Default::default()
720        };
721        let response = self.send_request(request)?;
722        if response.code != StatusCode::Ok as i32 {
723            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
724            return Err(ClientError::Server {
725                code,
726                message: response.message,
727            });
728        }
729        response
730            .bulk_terminate_sessions
731            .ok_or_else(|| ClientError::Server {
732                code: StatusCode::Internal,
733                message: "bulk_terminate_sessions response missing payload".into(),
734            })
735    }
736
737    /// Snapshot a PTY or pipe session's output backlog without consuming
738    /// it. For pipe sessions, `pipe_stream` selects between stdout and
739    /// stderr (default stdout). For PTY sessions `pipe_stream` is ignored.
740    /// Returns `None` when the session is not found.
741    pub fn get_session_backlog(
742        &mut self,
743        session_id: &str,
744        pipe_stream: PipeStreamKind,
745    ) -> Result<Option<GetSessionBacklogResponse>, ClientError> {
746        let request = DaemonRequest {
747            id: self.next_request_id(),
748            r#type: RequestType::GetSessionBacklog.into(),
749            protocol_version: 1,
750            client_name: String::from("running-process-client"),
751            get_session_backlog: Some(GetSessionBacklogRequest {
752                session_id: session_id.into(),
753                pipe_stream: pipe_stream as i32,
754            }),
755            ..Default::default()
756        };
757        let response = self.send_request(request)?;
758        if response.code == StatusCode::NotFound as i32 {
759            return Ok(None);
760        }
761        if response.code != StatusCode::Ok as i32 {
762            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
763            return Err(ClientError::Server {
764                code,
765                message: response.message,
766            });
767        }
768        Ok(response.get_session_backlog)
769    }
770}
771
772// ---------------------------------------------------------------------------
773// Auto-start logic
774// ---------------------------------------------------------------------------
775
776/// Connect to the daemon, starting it first if it is not running.
777///
778/// 1. Attempt to connect.
779/// 2. On failure, spawn `running-process-daemon start` as a detached process.
780/// 3. Retry with exponential back-off: 50 ms, 100 ms, 200 ms, 400 ms.
781/// 4. Return an error if the daemon cannot be reached after all retries.
782pub fn connect_or_start(scope_hash: Option<&str>) -> Result<DaemonClient, ClientError> {
783    // Fast path: daemon already running.
784    if let Ok(client) = DaemonClient::connect(scope_hash) {
785        return Ok(client);
786    }
787
788    // Spawn the daemon as a detached background process.
789    spawn_daemon()?;
790
791    // Retry with exponential back-off.
792    //
793    // #199: intentional — the daemon binds its socket asynchronously
794    // after `spawn_daemon()` returns. There's no event the OS can
795    // signal us with when the socket is ready, so we poll. Exponential
796    // back-off (50→100→200→400ms) is the standard pattern; total
797    // wait caps at 750ms.
798    let delays_ms: [u64; 4] = [50, 100, 200, 400];
799    for delay in delays_ms {
800        std::thread::sleep(std::time::Duration::from_millis(delay));
801        if let Ok(client) = DaemonClient::connect(scope_hash) {
802            return Ok(client);
803        }
804    }
805
806    Err(ClientError::DaemonNotRunning)
807}
808
809/// Launch a detached shell command through the running-process daemon.
810///
811/// The daemon owns process tracking after launch, so this helper returns as
812/// soon as the child has been spawned and registered.
813pub fn launch_detached(command: &str) -> Result<SpawnedDaemon, ClientError> {
814    let mut client = connect_or_start(None)?;
815    client.spawn_command(&SpawnCommandRequest::shell(command))
816}
817
818/// Convenience helper that connects to the daemon and asks it to daemonize
819/// the provided shell command under the caller's current cwd/environment.
820///
821/// Prefer [`launch_detached`] in new code; this name is kept for existing
822/// callers.
823pub fn daemonize_command(command: &str) -> Result<SpawnedDaemon, ClientError> {
824    launch_detached(command)
825}
826
827/// Spawn the daemon binary as a detached background process.
828fn spawn_daemon() -> Result<(), ClientError> {
829    let exe = daemon_exe_path();
830
831    #[cfg(windows)]
832    {
833        use std::os::windows::process::CommandExt;
834        // DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
835        const DETACHED: u32 = 0x0000_0008 | 0x0000_0200;
836        std::process::Command::new(&exe)
837            .arg("start")
838            .creation_flags(DETACHED)
839            .stdin(std::process::Stdio::null())
840            .stdout(std::process::Stdio::null())
841            .stderr(std::process::Stdio::null())
842            .spawn()
843            .map_err(ClientError::Io)?;
844    }
845
846    #[cfg(unix)]
847    {
848        std::process::Command::new(&exe)
849            .arg("start")
850            .stdin(std::process::Stdio::null())
851            .stdout(std::process::Stdio::null())
852            .stderr(std::process::Stdio::null())
853            .spawn()
854            .map_err(ClientError::Io)?;
855    }
856
857    Ok(())
858}
859
/// Work out which program name or path to use when launching the daemon.
///
/// If a file named `running-process-daemon` (`running-process-daemon.exe` on
/// Windows) exists in the same directory as the current executable, its path
/// is returned, converted lossily to a `String`. Otherwise, or when the
/// current executable's location cannot be determined, the bare name
/// `running-process-daemon` is returned so that it is looked up on `$PATH`.
fn daemon_exe_path() -> String {
    const SIBLING_NAME: &str = if cfg!(windows) {
        "running-process-daemon.exe"
    } else {
        "running-process-daemon"
    };

    let sibling = std::env::current_exe().ok().and_then(|mut dir| {
        // Strip the current binary's file name to get its directory.
        dir.pop();
        let candidate = dir.join(SIBLING_NAME);
        candidate.exists().then_some(candidate)
    });

    match sibling {
        Some(found) => found.to_string_lossy().into_owned(),
        // No sibling binary: rely on PATH lookup.
        None => String::from("running-process-daemon"),
    }
}
879
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: `launch_detached` must remain a plain synchronous
    /// function from `&str` to `Result<SpawnedDaemon, ClientError>`.
    #[test]
    fn launch_detached_has_public_sync_signature() {
        let _: fn(&str) -> Result<SpawnedDaemon, ClientError> = launch_detached;
    }

    /// The builder methods record cwd, environment entries (in insertion
    /// order) and originator on the request.
    #[test]
    fn spawn_command_request_builder_sets_detached_launch_context() {
        let built = SpawnCommandRequest::shell("echo hello")
            .with_cwd("work")
            .with_envs([("A", "1")])
            .with_env("B", "2")
            .with_originator("tool:123");

        let expected_env = vec![
            (String::from("A"), String::from("1")),
            (String::from("B"), String::from("2")),
        ];

        assert_eq!(built.command, "echo hello");
        assert_eq!(built.cwd.as_deref(), Some(std::path::Path::new("work")));
        assert_eq!(built.env, expected_env);
        assert_eq!(built.originator.as_deref(), Some("tool:123"));
    }
}