Skip to main content

running_process_client/
client.rs

//! Synchronous IPC client for the running-process daemon.
//!
//! Connects to the daemon over a local socket (Unix domain socket on
//! Linux/macOS, named pipe on Windows) and exchanges length-prefixed protobuf
//! messages.
6
7use crate::paths;
8use interprocess::local_socket::Stream;
9use interprocess::TryClone;
10use prost::Message;
11use running_process_proto::daemon::{
12    BulkTerminateSessionsRequest, BulkTerminateSessionsResponse, DaemonRequest, DaemonResponse,
13    GetProcessTreeRequest, GetSessionBacklogRequest, GetSessionBacklogResponse, KeyValue,
14    KillTreeRequest, KillZombiesRequest, ListActiveRequest, ListByOriginatorRequest, PingRequest,
15    PipeStreamKind, PurgeExitedSessionsRequest, PurgeExitedSessionsResponse, RequestType,
16    ResizePtySessionRequest, ServiceConfig, ServiceDeleteRequest, ServiceDescribeRequest,
17    ServiceFlushRequest, ServiceListRequest, ServiceLogsRequest, ServiceRestartRequest,
18    ServiceResurrectRequest, ServiceSaveRequest, ServiceStartRequest, ServiceStopRequest,
19    ShutdownRequest, SpawnDaemonRequest as ProtoSpawnDaemonRequest, StatusCode, StatusRequest,
20};
21use std::io::{BufReader, BufWriter, Read, Write};
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25// ---------------------------------------------------------------------------
26// Error type
27// ---------------------------------------------------------------------------
28
29/// Errors produced by [`DaemonClient`] operations.
30#[derive(Debug)]
31pub enum ClientError {
32    /// Failed to connect to the daemon socket.
33    Connect(std::io::Error),
34    /// I/O error during send or receive.
35    Io(std::io::Error),
36    /// Failed to decode a protobuf response.
37    Decode(prost::DecodeError),
38    /// The daemon returned an application-level error response.
39    Server { code: StatusCode, message: String },
40    /// The daemon is not running and could not be started.
41    DaemonNotRunning,
42}
43
44impl std::fmt::Display for ClientError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            ClientError::Connect(e) => write!(f, "failed to connect to daemon: {e}"),
48            ClientError::Io(e) => write!(f, "daemon I/O error: {e}"),
49            ClientError::Decode(e) => write!(f, "failed to decode daemon response: {e}"),
50            ClientError::Server { code, message } => {
51                write!(f, "daemon returned {:?}: {}", code, message)
52            }
53            ClientError::DaemonNotRunning => write!(f, "daemon is not running"),
54        }
55    }
56}
57
58impl std::error::Error for ClientError {
59    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
60        match self {
61            ClientError::Connect(e) | ClientError::Io(e) => Some(e),
62            ClientError::Decode(e) => Some(e),
63            ClientError::Server { .. } | ClientError::DaemonNotRunning => None,
64        }
65    }
66}
67
68// ---------------------------------------------------------------------------
69// Spawn API
70// ---------------------------------------------------------------------------
71
/// Request to spawn a detached daemonized shell command under daemon control.
///
/// Build one with [`SpawnCommandRequest::shell`] and refine it with the
/// `with_*` builder methods, each of which consumes and returns the request.
#[derive(Debug, Clone)]
pub struct SpawnCommandRequest {
    /// Shell command line to run.
    pub command: String,
    /// Working directory for the command; `None` sends an empty string to
    /// the daemon.
    pub cwd: Option<PathBuf>,
    /// Environment entries sent to the daemon as `(key, value)` pairs.
    pub env: Vec<(String, String)>,
    /// Originator tag; `None` sends an empty string to the daemon.
    pub originator: Option<String>,
    /// When `true`, the daemon clears the inherited env before applying
    /// [`Self::env`], so the subprocess sees ONLY the supplied map.
    /// Mirrors Python's `subprocess.Popen(env=…)` replace semantic.
    /// Default `false` keeps the historic "layer on top of inherited"
    /// behaviour.
    pub clear_inherited_env: bool,
}

impl SpawnCommandRequest {
    /// Default originator tag: `<current exe file stem>:<pid>`, falling back
    /// to `running-process-client` when the executable name is unavailable
    /// or empty.
    fn default_originator() -> String {
        let caller = std::env::current_exe()
            .ok()
            .and_then(|path| {
                path.file_stem()
                    .map(|stem| stem.to_string_lossy().into_owned())
            })
            .filter(|value| !value.is_empty())
            .unwrap_or_else(|| "running-process-client".to_string());
        format!("{caller}:{}", std::process::id())
    }

    /// Convert any iterator of string-like pairs into the owned env block.
    fn collect_env<I, K, V>(env: I) -> Vec<(String, String)>
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        env.into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect()
    }

    /// Build a shell-command request using the caller's current working
    /// directory and environment.
    ///
    /// Environment variables whose name or value is not valid Unicode are
    /// skipped: the request carries `String` pairs, and `std::env::vars()`
    /// would panic on such entries. With the default layering mode the
    /// daemon's own inherited value (if any) still applies for them.
    pub fn shell(command: impl Into<String>) -> Self {
        let env = std::env::vars_os()
            .filter_map(|(key, value)| {
                Some((key.into_string().ok()?, value.into_string().ok()?))
            })
            .collect();
        Self {
            command: command.into(),
            cwd: std::env::current_dir().ok(),
            env,
            originator: Some(Self::default_originator()),
            clear_inherited_env: false,
        }
    }

    /// Override the working directory used for the spawned command.
    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
        self.cwd = Some(cwd.into());
        self
    }

    /// Replace the environment block sent to the daemon (layered on top
    /// of the daemon's inherited env, unless [`Self::with_env_replace`]
    /// is used instead).
    ///
    /// This does not touch [`Self::clear_inherited_env`].
    pub fn with_envs<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.env = Self::collect_env(env);
        self
    }

    /// Set the env block AND tell the daemon to clear the inherited
    /// env first — the subprocess will see ONLY the supplied map.
    ///
    /// Mirrors Python's `subprocess.Popen(env=…)` semantic:
    ///
    /// ```python
    /// subprocess.Popen(["..."], env=None)        # inherits
    /// subprocess.Popen(["..."], env={"K": "V"})  # replaces
    /// ```
    ///
    /// On Windows you typically still want to include `SystemRoot` in
    /// the supplied map so `cmd.exe` can load its DLLs.
    pub fn with_env_replace<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.env = Self::collect_env(env);
        self.clear_inherited_env = true;
        self
    }

    /// Add or replace a single environment variable while keeping the rest
    /// of the existing environment block intact.
    ///
    /// The first entry with a matching key is overwritten in place;
    /// otherwise the pair is appended.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let key = key.into();
        let value = value.into();
        match self.env.iter_mut().find(|(existing_key, _)| *existing_key == key) {
            Some((_, existing)) => *existing = value,
            None => self.env.push((key, value)),
        }
        self
    }

    /// Set the originator value stored in the daemon registry and injected
    /// into the spawned child environment.
    pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
        self.originator = Some(originator.into());
        self
    }
}
184
/// Description of a process the daemon spawned on the client's behalf, as
/// reported back in the spawn response.
#[derive(Debug, Clone, PartialEq)]
pub struct SpawnedDaemon {
    /// Process id reported by the daemon.
    pub pid: u32,
    /// Creation timestamp reported by the daemon.
    // NOTE(review): unit/epoch is not visible in this file — confirm against the daemon.
    pub created_at: f64,
    /// Command string echoed back by the daemon.
    pub command: String,
    /// Working directory, or `None` when the daemon reported an empty string.
    pub cwd: Option<String>,
    /// Originator tag, or `None` when the daemon reported an empty string.
    pub originator: Option<String>,
    /// Containment description reported by the daemon.
    pub containment: String,
}
195
196// ---------------------------------------------------------------------------
197// Client
198// ---------------------------------------------------------------------------
199
200/// Synchronous IPC client that communicates with the daemon over a local socket.
201///
202/// Messages are framed with a 4-byte big-endian length prefix followed by
203/// a protobuf-encoded payload.
204pub struct DaemonClient {
205    reader: BufReader<Stream>,
206    writer: BufWriter<Stream>,
207    next_id: AtomicU64,
208}
209
210impl DaemonClient {
211    /// Connect to a running daemon identified by an optional scope hash.
212    ///
213    /// The socket path is computed by [`paths::socket_path`] and the name type
214    /// dispatch matches the server via [`paths::make_socket_name`].
215    pub fn connect(scope_hash: Option<&str>) -> Result<Self, ClientError> {
216        let path = paths::socket_path(scope_hash);
217        Self::connect_to(&path)
218    }
219
220    /// Connect to a daemon listening at an explicit socket path.
221    ///
222    /// Use this when you already know the socket path (e.g. in integration
223    /// tests that start a server on a unique path).
224    pub fn connect_to(socket_path: &str) -> Result<Self, ClientError> {
225        let name = paths::make_socket_name(socket_path).map_err(ClientError::Connect)?;
226
227        use interprocess::local_socket::traits::Stream as _;
228        let stream = Stream::connect(name).map_err(ClientError::Connect)?;
229        let stream_clone = stream.try_clone().map_err(ClientError::Connect)?;
230
231        Ok(Self {
232            reader: BufReader::new(stream),
233            writer: BufWriter::new(stream_clone),
234            next_id: AtomicU64::new(1),
235        })
236    }
237
238    /// Send a request and wait for the corresponding response.
239    ///
240    /// The request is length-prefixed (4-byte big-endian u32) then protobuf-encoded.
241    /// The response uses the same framing.
242    pub fn send_request(&mut self, request: DaemonRequest) -> Result<DaemonResponse, ClientError> {
243        // Encode
244        let payload = request.encode_to_vec();
245        let len = payload.len() as u32;
246
247        // Write length prefix + payload
248        self.writer
249            .write_all(&len.to_be_bytes())
250            .map_err(ClientError::Io)?;
251        self.writer.write_all(&payload).map_err(ClientError::Io)?;
252        self.writer.flush().map_err(ClientError::Io)?;
253
254        // Read length prefix
255        let mut len_buf = [0u8; 4];
256        self.reader
257            .read_exact(&mut len_buf)
258            .map_err(ClientError::Io)?;
259        let resp_len = u32::from_be_bytes(len_buf) as usize;
260
261        // Read payload
262        let mut resp_buf = vec![0u8; resp_len];
263        self.reader
264            .read_exact(&mut resp_buf)
265            .map_err(ClientError::Io)?;
266
267        DaemonResponse::decode(&resp_buf[..]).map_err(ClientError::Decode)
268    }
269
270    // -----------------------------------------------------------------------
271    // Convenience helpers
272    // -----------------------------------------------------------------------
273
274    /// Allocate the next request ID.
275    pub(crate) fn next_request_id(&self) -> u64 {
276        self.next_id.fetch_add(1, Ordering::Relaxed)
277    }
278
279    fn ensure_ok(&self, response: &DaemonResponse) -> Result<(), ClientError> {
280        if response.code == StatusCode::Ok as i32 {
281            return Ok(());
282        }
283
284        let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
285        Err(ClientError::Server {
286            code,
287            message: response.message.clone(),
288        })
289    }
290
291    /// Ping the daemon to check liveness.
292    pub fn ping(&mut self) -> Result<DaemonResponse, ClientError> {
293        let request = DaemonRequest {
294            id: self.next_request_id(),
295            r#type: RequestType::Ping.into(),
296            protocol_version: 1,
297            client_name: String::from("running-process-client"),
298            ping: Some(PingRequest {}),
299            ..Default::default()
300        };
301        self.send_request(request)
302    }
303
304    /// Ask the daemon to shut down.
305    pub fn shutdown(
306        &mut self,
307        graceful: bool,
308        timeout_seconds: f64,
309    ) -> Result<DaemonResponse, ClientError> {
310        let request = DaemonRequest {
311            id: self.next_request_id(),
312            r#type: RequestType::Shutdown.into(),
313            protocol_version: 1,
314            client_name: String::from("running-process-client"),
315            shutdown: Some(ShutdownRequest {
316                graceful,
317                timeout_seconds,
318            }),
319            ..Default::default()
320        };
321        self.send_request(request)
322    }
323
324    /// Query daemon status.
325    pub fn status(&mut self) -> Result<DaemonResponse, ClientError> {
326        let request = DaemonRequest {
327            id: self.next_request_id(),
328            r#type: RequestType::Status.into(),
329            protocol_version: 1,
330            client_name: String::from("running-process-client"),
331            status: Some(StatusRequest {}),
332            ..Default::default()
333        };
334        self.send_request(request)
335    }
336
337    /// List all active tracked processes.
338    pub fn list_active(&mut self) -> Result<DaemonResponse, ClientError> {
339        let request = DaemonRequest {
340            id: self.next_request_id(),
341            r#type: RequestType::ListActive.into(),
342            protocol_version: 1,
343            client_name: String::from("running-process-client"),
344            list_active: Some(ListActiveRequest {}),
345            ..Default::default()
346        };
347        self.send_request(request)
348    }
349
350    /// List tracked processes filtered by originator tool name.
351    pub fn list_by_originator(&mut self, tool: &str) -> Result<DaemonResponse, ClientError> {
352        let request = DaemonRequest {
353            id: self.next_request_id(),
354            r#type: RequestType::ListByOriginator.into(),
355            protocol_version: 1,
356            client_name: String::from("running-process-client"),
357            list_by_originator: Some(ListByOriginatorRequest {
358                tool: tool.to_string(),
359            }),
360            ..Default::default()
361        };
362        self.send_request(request)
363    }
364
365    /// Kill zombie processes tracked by the daemon.
366    pub fn kill_zombies(&mut self, dry_run: bool) -> Result<DaemonResponse, ClientError> {
367        let request = DaemonRequest {
368            id: self.next_request_id(),
369            r#type: RequestType::KillZombies.into(),
370            protocol_version: 1,
371            client_name: String::from("running-process-client"),
372            kill_zombies: Some(KillZombiesRequest { dry_run }),
373            ..Default::default()
374        };
375        self.send_request(request)
376    }
377
378    /// Kill a process tree rooted at `pid`.
379    pub fn kill_tree(
380        &mut self,
381        pid: u32,
382        timeout_seconds: f64,
383    ) -> Result<DaemonResponse, ClientError> {
384        let request = DaemonRequest {
385            id: self.next_request_id(),
386            r#type: RequestType::KillTree.into(),
387            protocol_version: 1,
388            client_name: String::from("running-process-client"),
389            kill_tree: Some(KillTreeRequest {
390                pid,
391                timeout_seconds,
392            }),
393            ..Default::default()
394        };
395        self.send_request(request)
396    }
397
398    /// Get the process tree display for a given PID.
399    pub fn get_process_tree(&mut self, pid: u32) -> Result<DaemonResponse, ClientError> {
400        let request = DaemonRequest {
401            id: self.next_request_id(),
402            r#type: RequestType::GetProcessTree.into(),
403            protocol_version: 1,
404            client_name: String::from("running-process-client"),
405            get_process_tree: Some(GetProcessTreeRequest { pid }),
406            ..Default::default()
407        };
408        self.send_request(request)
409    }
410
411    /// Ask the daemon to spawn and track a detached shell command.
412    pub fn spawn_command(
413        &mut self,
414        request: &SpawnCommandRequest,
415    ) -> Result<SpawnedDaemon, ClientError> {
416        let daemon_request = DaemonRequest {
417            id: self.next_request_id(),
418            r#type: RequestType::SpawnDaemon.into(),
419            protocol_version: 1,
420            client_name: String::from("running-process-client"),
421            spawn_daemon: Some(ProtoSpawnDaemonRequest {
422                command: request.command.clone(),
423                cwd: request
424                    .cwd
425                    .as_ref()
426                    .map(|cwd| cwd.to_string_lossy().into_owned())
427                    .unwrap_or_default(),
428                env: request
429                    .env
430                    .iter()
431                    .map(|(k, v)| KeyValue {
432                        key: k.clone(),
433                        value: v.clone(),
434                    })
435                    .collect(),
436                originator: request.originator.clone().unwrap_or_default(),
437                clear_inherited_env: request.clear_inherited_env,
438            }),
439            ..Default::default()
440        };
441
442        let response = self.send_request(daemon_request)?;
443        self.ensure_ok(&response)?;
444
445        let payload = response.spawn_daemon.ok_or_else(|| ClientError::Server {
446            code: StatusCode::Internal,
447            message: "spawn response missing payload".to_string(),
448        })?;
449
450        Ok(SpawnedDaemon {
451            pid: payload.pid,
452            created_at: payload.created_at,
453            command: payload.command,
454            cwd: if payload.cwd.is_empty() {
455                None
456            } else {
457                Some(payload.cwd)
458            },
459            originator: if payload.originator.is_empty() {
460                None
461            } else {
462                Some(payload.originator)
463            },
464            containment: payload.containment,
465        })
466    }
467
468    // --- service supervision (runpm) — Phase 1 ---
469
470    /// Start a supervised service from a [`ServiceConfig`].
471    pub fn service_start(&mut self, config: ServiceConfig) -> Result<DaemonResponse, ClientError> {
472        let request = DaemonRequest {
473            id: self.next_request_id(),
474            r#type: RequestType::ServiceStart.into(),
475            protocol_version: 1,
476            client_name: String::from("running-process-client"),
477            service_start: Some(ServiceStartRequest {
478                config: Some(config),
479            }),
480            ..Default::default()
481        };
482        self.send_request(request)
483    }
484
485    /// Stop a supervised service identified by name, id, or `"all"`.
486    pub fn service_stop(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
487        let request = DaemonRequest {
488            id: self.next_request_id(),
489            r#type: RequestType::ServiceStop.into(),
490            protocol_version: 1,
491            client_name: String::from("running-process-client"),
492            service_stop: Some(ServiceStopRequest {
493                target: target.to_string(),
494            }),
495            ..Default::default()
496        };
497        self.send_request(request)
498    }
499
500    /// Restart a supervised service identified by name, id, or `"all"`.
501    pub fn service_restart(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
502        let request = DaemonRequest {
503            id: self.next_request_id(),
504            r#type: RequestType::ServiceRestart.into(),
505            protocol_version: 1,
506            client_name: String::from("running-process-client"),
507            service_restart: Some(ServiceRestartRequest {
508                target: target.to_string(),
509            }),
510            ..Default::default()
511        };
512        self.send_request(request)
513    }
514
515    /// Delete a supervised service from the registry.
516    pub fn service_delete(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
517        let request = DaemonRequest {
518            id: self.next_request_id(),
519            r#type: RequestType::ServiceDelete.into(),
520            protocol_version: 1,
521            client_name: String::from("running-process-client"),
522            service_delete: Some(ServiceDeleteRequest {
523                target: target.to_string(),
524            }),
525            ..Default::default()
526        };
527        self.send_request(request)
528    }
529
530    /// List all supervised services known to the daemon.
531    pub fn service_list(&mut self) -> Result<DaemonResponse, ClientError> {
532        let request = DaemonRequest {
533            id: self.next_request_id(),
534            r#type: RequestType::ServiceList.into(),
535            protocol_version: 1,
536            client_name: String::from("running-process-client"),
537            service_list: Some(ServiceListRequest {}),
538            ..Default::default()
539        };
540        self.send_request(request)
541    }
542
543    /// Describe a single supervised service in detail.
544    pub fn service_describe(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
545        let request = DaemonRequest {
546            id: self.next_request_id(),
547            r#type: RequestType::ServiceDescribe.into(),
548            protocol_version: 1,
549            client_name: String::from("running-process-client"),
550            service_describe: Some(ServiceDescribeRequest {
551                target: target.to_string(),
552            }),
553            ..Default::default()
554        };
555        self.send_request(request)
556    }
557
558    /// Fetch buffered log output for a supervised service.
559    pub fn service_logs(
560        &mut self,
561        target: &str,
562        lines: u32,
563        follow: bool,
564    ) -> Result<DaemonResponse, ClientError> {
565        let request = DaemonRequest {
566            id: self.next_request_id(),
567            r#type: RequestType::ServiceLogs.into(),
568            protocol_version: 1,
569            client_name: String::from("running-process-client"),
570            service_logs: Some(ServiceLogsRequest {
571                target: target.to_string(),
572                lines,
573                follow,
574            }),
575            ..Default::default()
576        };
577        self.send_request(request)
578    }
579
580    /// Flush buffered logs for a supervised service.
581    pub fn service_flush(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
582        let request = DaemonRequest {
583            id: self.next_request_id(),
584            r#type: RequestType::ServiceFlush.into(),
585            protocol_version: 1,
586            client_name: String::from("running-process-client"),
587            service_flush: Some(ServiceFlushRequest {
588                target: target.to_string(),
589            }),
590            ..Default::default()
591        };
592        self.send_request(request)
593    }
594
595    /// Persist the current set of supervised services to a snapshot.
596    pub fn service_save(&mut self) -> Result<DaemonResponse, ClientError> {
597        let request = DaemonRequest {
598            id: self.next_request_id(),
599            r#type: RequestType::ServiceSave.into(),
600            protocol_version: 1,
601            client_name: String::from("running-process-client"),
602            service_save: Some(ServiceSaveRequest {}),
603            ..Default::default()
604        };
605        self.send_request(request)
606    }
607
608    /// Restore supervised services from the most recent snapshot.
609    pub fn service_resurrect(&mut self) -> Result<DaemonResponse, ClientError> {
610        let request = DaemonRequest {
611            id: self.next_request_id(),
612            r#type: RequestType::ServiceResurrect.into(),
613            protocol_version: 1,
614            client_name: String::from("running-process-client"),
615            service_resurrect: Some(ServiceResurrectRequest {}),
616            ..Default::default()
617        };
618        self.send_request(request)
619    }
620
621    /// Resize a PTY session without going through an attach
622    /// (#130 M5 follow-up). The new size persists for the lifetime of
623    /// the session; subsequent attaches can override it via their own
624    /// rows/cols fields.
625    pub fn resize_pty_session(
626        &mut self,
627        session_id: &str,
628        rows: u16,
629        cols: u16,
630    ) -> Result<(), ClientError> {
631        let request = DaemonRequest {
632            id: self.next_request_id(),
633            r#type: RequestType::ResizePtySession.into(),
634            protocol_version: 1,
635            client_name: String::from("running-process-client"),
636            resize_pty_session: Some(ResizePtySessionRequest {
637                session_id: session_id.into(),
638                rows: rows as u32,
639                cols: cols as u32,
640            }),
641            ..Default::default()
642        };
643        let response = self.send_request(request)?;
644        if response.code != StatusCode::Ok as i32 {
645            let code =
646                StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
647            return Err(ClientError::Server {
648                code,
649                message: response.message,
650            });
651        }
652        Ok(())
653    }
654
655    /// Purge exited sessions from both daemon-side registries (#130 M9
656    /// H4). Returns counts of PTY and pipe sessions reaped.
657    pub fn purge_exited_sessions(
658        &mut self,
659        originator: &str,
660    ) -> Result<PurgeExitedSessionsResponse, ClientError> {
661        let request = DaemonRequest {
662            id: self.next_request_id(),
663            r#type: RequestType::PurgeExitedSessions.into(),
664            protocol_version: 1,
665            client_name: String::from("running-process-client"),
666            purge_exited_sessions: Some(PurgeExitedSessionsRequest {
667                originator: originator.into(),
668            }),
669            ..Default::default()
670        };
671        let response = self.send_request(request)?;
672        if response.code != StatusCode::Ok as i32 {
673            let code =
674                StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
675            return Err(ClientError::Server {
676                code,
677                message: response.message,
678            });
679        }
680        response
681            .purge_exited_sessions
682            .ok_or_else(|| ClientError::Server {
683                code: StatusCode::Internal,
684                message: "purge_exited_sessions response missing payload".into(),
685            })
686    }
687
688    /// Schedule termination of every session older than the threshold
689    /// (#130 M9 H4). `older_than_secs=0` terminates everything in scope.
690    pub fn bulk_terminate_sessions(
691        &mut self,
692        older_than_secs: u64,
693        originator: &str,
694        grace_ms: u32,
695    ) -> Result<BulkTerminateSessionsResponse, ClientError> {
696        let request = DaemonRequest {
697            id: self.next_request_id(),
698            r#type: RequestType::BulkTerminateSessions.into(),
699            protocol_version: 1,
700            client_name: String::from("running-process-client"),
701            bulk_terminate_sessions: Some(BulkTerminateSessionsRequest {
702                older_than_secs,
703                originator: originator.into(),
704                grace_ms,
705            }),
706            ..Default::default()
707        };
708        let response = self.send_request(request)?;
709        if response.code != StatusCode::Ok as i32 {
710            let code =
711                StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
712            return Err(ClientError::Server {
713                code,
714                message: response.message,
715            });
716        }
717        response
718            .bulk_terminate_sessions
719            .ok_or_else(|| ClientError::Server {
720                code: StatusCode::Internal,
721                message: "bulk_terminate_sessions response missing payload".into(),
722            })
723    }
724
725    /// Snapshot a PTY or pipe session's output backlog without consuming
726    /// it. For pipe sessions, `pipe_stream` selects between stdout and
727    /// stderr (default stdout). For PTY sessions `pipe_stream` is ignored.
728    /// Returns `None` when the session is not found.
729    pub fn get_session_backlog(
730        &mut self,
731        session_id: &str,
732        pipe_stream: PipeStreamKind,
733    ) -> Result<Option<GetSessionBacklogResponse>, ClientError> {
734        let request = DaemonRequest {
735            id: self.next_request_id(),
736            r#type: RequestType::GetSessionBacklog.into(),
737            protocol_version: 1,
738            client_name: String::from("running-process-client"),
739            get_session_backlog: Some(GetSessionBacklogRequest {
740                session_id: session_id.into(),
741                pipe_stream: pipe_stream as i32,
742            }),
743            ..Default::default()
744        };
745        let response = self.send_request(request)?;
746        if response.code == StatusCode::NotFound as i32 {
747            return Ok(None);
748        }
749        if response.code != StatusCode::Ok as i32 {
750            let code =
751                StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
752            return Err(ClientError::Server {
753                code,
754                message: response.message,
755            });
756        }
757        Ok(response.get_session_backlog)
758    }
759}
760
761// ---------------------------------------------------------------------------
762// Auto-start logic
763// ---------------------------------------------------------------------------
764
765/// Connect to the daemon, starting it first if it is not running.
766///
767/// 1. Attempt to connect.
768/// 2. On failure, spawn `running-process-daemon start` as a detached process.
769/// 3. Retry with exponential back-off: 50 ms, 100 ms, 200 ms, 400 ms.
770/// 4. Return an error if the daemon cannot be reached after all retries.
771pub fn connect_or_start(scope_hash: Option<&str>) -> Result<DaemonClient, ClientError> {
772    // Fast path: daemon already running.
773    if let Ok(client) = DaemonClient::connect(scope_hash) {
774        return Ok(client);
775    }
776
777    // Spawn the daemon as a detached background process.
778    spawn_daemon()?;
779
780    // Retry with exponential back-off.
781    let delays_ms: [u64; 4] = [50, 100, 200, 400];
782    for delay in delays_ms {
783        std::thread::sleep(std::time::Duration::from_millis(delay));
784        if let Ok(client) = DaemonClient::connect(scope_hash) {
785            return Ok(client);
786        }
787    }
788
789    Err(ClientError::DaemonNotRunning)
790}
791
792/// Launch a detached shell command through the running-process daemon.
793///
794/// The daemon owns process tracking after launch, so this helper returns as
795/// soon as the child has been spawned and registered.
796pub fn launch_detached(command: &str) -> Result<SpawnedDaemon, ClientError> {
797    let mut client = connect_or_start(None)?;
798    client.spawn_command(&SpawnCommandRequest::shell(command))
799}
800
801/// Convenience helper that connects to the daemon and asks it to daemonize
802/// the provided shell command under the caller's current cwd/environment.
803///
804/// Prefer [`launch_detached`] in new code; this name is kept for existing
805/// callers.
806pub fn daemonize_command(command: &str) -> Result<SpawnedDaemon, ClientError> {
807    launch_detached(command)
808}
809
810/// Spawn the daemon binary as a detached background process.
811fn spawn_daemon() -> Result<(), ClientError> {
812    let exe = daemon_exe_path();
813
814    #[cfg(windows)]
815    {
816        use std::os::windows::process::CommandExt;
817        // DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
818        const DETACHED: u32 = 0x0000_0008 | 0x0000_0200;
819        std::process::Command::new(&exe)
820            .arg("start")
821            .creation_flags(DETACHED)
822            .stdin(std::process::Stdio::null())
823            .stdout(std::process::Stdio::null())
824            .stderr(std::process::Stdio::null())
825            .spawn()
826            .map_err(ClientError::Io)?;
827    }
828
829    #[cfg(unix)]
830    {
831        std::process::Command::new(&exe)
832            .arg("start")
833            .stdin(std::process::Stdio::null())
834            .stdout(std::process::Stdio::null())
835            .stderr(std::process::Stdio::null())
836            .spawn()
837            .map_err(ClientError::Io)?;
838    }
839
840    Ok(())
841}
842
/// Work out which daemon executable to launch.
///
/// Prefers a `running-process-daemon` binary (`.exe` on Windows) sitting in
/// the same directory as the current executable. If the current executable
/// cannot be determined or no such sibling exists, the bare name is returned
/// so that lookup falls to `$PATH`.
fn daemon_exe_path() -> String {
    let file_name = if cfg!(windows) {
        "running-process-daemon.exe"
    } else {
        "running-process-daemon"
    };

    let sibling = std::env::current_exe().ok().and_then(|exe| {
        // Directory holding the current binary (the path itself if it has no parent).
        let dir = exe.parent().unwrap_or(&exe);
        let candidate = dir.join(file_name);
        candidate
            .exists()
            .then(|| candidate.to_string_lossy().into_owned())
    });

    // Fallback: assume it is on PATH.
    sibling.unwrap_or_else(|| String::from("running-process-daemon"))
}
862
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Compile-time guard: `launch_detached` must stay a plain synchronous
    /// `fn(&str) -> Result<SpawnedDaemon, ClientError>`.
    #[test]
    fn launch_detached_has_public_sync_signature() {
        let _api: fn(&str) -> Result<SpawnedDaemon, ClientError> = launch_detached;
    }

    /// Each builder method should override only the field it is responsible for.
    #[test]
    fn spawn_command_request_builder_sets_detached_launch_context() {
        let built = SpawnCommandRequest::shell("echo hello")
            .with_cwd("work")
            .with_envs([("A", "1")])
            .with_env("B", "2")
            .with_originator("tool:123");

        let expected_env = vec![
            ("A".to_string(), "1".to_string()),
            ("B".to_string(), "2".to_string()),
        ];

        assert_eq!(built.command, "echo hello");
        assert_eq!(built.cwd.as_deref(), Some(Path::new("work")));
        assert_eq!(built.env, expected_env);
        assert_eq!(built.originator.as_deref(), Some("tool:123"));
    }
}
891}