// running_process/client/pipe_session.rs

//! Client-side helpers for daemon-owned pipe-backed sessions
//! (issue #130 milestone 3).
//!
//! Mirrors [`crate::client::pty_session`] for the pipe case. Sessions are
//! spawned, listed, detached, and terminated via the regular [`DaemonClient`] RPC
//! channel. Stdin is also an RPC (`write_pipe_stdin`). Stdout/stderr are
//! attached via [`PipeStreamAttachment`], which owns its own connection
//! and pumps `PipeStreamFrame` payloads.

use crate::client::paths;
use crate::client::{ClientError, DaemonClient};
use crate::proto::daemon::{
    AttachPipeStreamRequest, AttachPipeStreamResponse, DaemonRequest, DaemonResponse,
    DetachPipeStreamRequest, KeyValue, ListPipeSessionsRequest, ListPipeSessionsResponse,
    PipeSessionInfo, PipeStreamFrame, PipeStreamKind, RequestType, SpawnPipeSessionRequest,
    SpawnPipeSessionResponse, StatusCode, TerminatePipeSessionRequest, WritePipeStdinRequest,
    WritePipeStdinResponse,
};
use interprocess::local_socket::Stream;
use interprocess::TryClone;
use prost::Message;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;

// ---------------------------------------------------------------------------
// Spawn / list / detach / terminate / write helpers
// ---------------------------------------------------------------------------

/// Request shape for spawning a daemon-owned pipe-backed session.
///
/// Start from [`PipeSpawnRequest::new`] and refine the request with the
/// chained builder methods, or set the public fields directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipeSpawnRequest {
    /// Program and arguments. The first element is the executable.
    pub argv: Vec<String>,
    /// Working directory for the child. `None` leaves the daemon default in effect.
    pub cwd: Option<PathBuf>,
    /// Environment variables to overlay onto the inherited environment.
    pub env: Vec<(String, String)>,
    /// Start from an empty environment before applying [`Self::env`].
    pub clear_inherited_env: bool,
    /// Optional label used by list filters. `None` lets the daemon assign one.
    pub originator: Option<String>,
    /// Merge stderr into stdout instead of keeping a separately attachable stderr stream.
    pub merge_stderr_into_stdout: bool,
}

impl PipeSpawnRequest {
    /// Create a spawn request from argv with inherited environment and separate stderr.
    ///
    /// All other fields start unset: no working directory, an empty
    /// environment overlay, and no originator label.
    #[must_use]
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        Self {
            argv: argv.into_iter().map(Into::into).collect(),
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            originator: None,
            merge_stderr_into_stdout: false,
        }
    }

    /// Set the working directory for the child process.
    #[must_use]
    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
        self.cwd = Some(cwd.into());
        self
    }

    /// Set the originator label stored with the session.
    #[must_use]
    pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
        self.originator = Some(originator.into());
        self
    }

    /// Merge stderr into stdout for this session.
    #[must_use]
    pub fn merge_stderr(mut self) -> Self {
        self.merge_stderr_into_stdout = true;
        self
    }

    /// Replace the environment overlay applied when spawning the child.
    ///
    /// Any overlay set earlier is discarded; entries keep the order in which
    /// `env` yields them.
    #[must_use]
    pub fn with_envs<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
        self
    }
}

/// Reply summary for a successful pipe session spawn.
///
/// The fields are copied as-is from the daemon's spawn response.
#[derive(Debug, Clone, PartialEq)]
pub struct SpawnedPipeSession {
    /// Daemon-assigned session identifier used by later pipe session RPCs.
    pub session_id: String,
    /// Operating-system process ID for the spawned child.
    pub pid: u32,
    /// Daemon-recorded creation time as seconds since the Unix epoch.
    pub created_at: f64,
}

100impl DaemonClient {
101    /// Ask the daemon to spawn a new pipe-backed child process.
102    pub fn spawn_pipe_session(
103        &mut self,
104        request: &PipeSpawnRequest,
105    ) -> Result<SpawnedPipeSession, ClientError> {
106        let proto = SpawnPipeSessionRequest {
107            argv: request.argv.clone(),
108            cwd: request
109                .cwd
110                .as_ref()
111                .map(|p| p.to_string_lossy().into_owned())
112                .unwrap_or_default(),
113            env: request
114                .env
115                .iter()
116                .map(|(k, v)| KeyValue {
117                    key: k.clone(),
118                    value: v.clone(),
119                })
120                .collect(),
121            clear_inherited_env: request.clear_inherited_env,
122            originator: request.originator.clone().unwrap_or_default(),
123            merge_stderr_into_stdout: request.merge_stderr_into_stdout,
124        };
125        let daemon_request = DaemonRequest {
126            id: self.next_request_id(),
127            r#type: RequestType::SpawnPipeSession.into(),
128            protocol_version: 1,
129            client_name: "running-process-client".into(),
130            spawn_pipe_session: Some(proto),
131            ..Default::default()
132        };
133        let response = self.send_request(daemon_request)?;
134        ensure_ok(&response)?;
135        let payload: SpawnPipeSessionResponse =
136            response
137                .spawn_pipe_session
138                .ok_or_else(|| ClientError::Server {
139                    code: StatusCode::Internal,
140                    message: "spawn_pipe_session response missing payload".into(),
141                })?;
142        Ok(SpawnedPipeSession {
143            session_id: payload.session_id,
144            pid: payload.pid,
145            created_at: payload.created_at,
146        })
147    }
148
149    /// List pipe sessions known to the daemon.
150    ///
151    /// An empty `originator_filter` returns all sessions in the current daemon scope.
152    pub fn list_pipe_sessions(
153        &mut self,
154        originator_filter: &str,
155    ) -> Result<Vec<PipeSessionInfo>, ClientError> {
156        let req = DaemonRequest {
157            id: self.next_request_id(),
158            r#type: RequestType::ListPipeSessions.into(),
159            protocol_version: 1,
160            client_name: "running-process-client".into(),
161            list_pipe_sessions: Some(ListPipeSessionsRequest {
162                originator: originator_filter.into(),
163            }),
164            ..Default::default()
165        };
166        let response = self.send_request(req)?;
167        ensure_ok(&response)?;
168        let payload: ListPipeSessionsResponse =
169            response
170                .list_pipe_sessions
171                .ok_or_else(|| ClientError::Server {
172                    code: StatusCode::Internal,
173                    message: "list_pipe_sessions response missing payload".into(),
174                })?;
175        Ok(payload.sessions)
176    }
177
178    /// Detach any current attachment from one pipe output stream.
179    ///
180    /// The session remains alive and the stream can be attached again later.
181    pub fn detach_pipe_stream(
182        &mut self,
183        session_id: &str,
184        stream: PipeStreamKind,
185    ) -> Result<(), ClientError> {
186        let req = DaemonRequest {
187            id: self.next_request_id(),
188            r#type: RequestType::DetachPipeStream.into(),
189            protocol_version: 1,
190            client_name: "running-process-client".into(),
191            detach_pipe_stream: Some(DetachPipeStreamRequest {
192                session_id: session_id.into(),
193                stream: stream as i32,
194            }),
195            ..Default::default()
196        };
197        let response = self.send_request(req)?;
198        ensure_ok(&response)?;
199        Ok(())
200    }
201
202    /// Schedule termination of a pipe session.
203    ///
204    /// The daemon accepts `0` as its default grace period before hard kill.
205    pub fn terminate_pipe_session(
206        &mut self,
207        session_id: &str,
208        grace_ms: u32,
209    ) -> Result<(), ClientError> {
210        let req = DaemonRequest {
211            id: self.next_request_id(),
212            r#type: RequestType::TerminatePipeSession.into(),
213            protocol_version: 1,
214            client_name: "running-process-client".into(),
215            terminate_pipe_session: Some(TerminatePipeSessionRequest {
216                session_id: session_id.into(),
217                grace_ms,
218            }),
219            ..Default::default()
220        };
221        let response = self.send_request(req)?;
222        ensure_ok(&response)?;
223        Ok(())
224    }
225
226    /// Write bytes to a session's stdin pipe.
227    ///
228    /// When `close_after` is true, the daemon closes stdin after writing `data`.
229    pub fn write_pipe_stdin(
230        &mut self,
231        session_id: &str,
232        data: &[u8],
233        close_after: bool,
234    ) -> Result<u64, ClientError> {
235        let req = DaemonRequest {
236            id: self.next_request_id(),
237            r#type: RequestType::WritePipeStdin.into(),
238            protocol_version: 1,
239            client_name: "running-process-client".into(),
240            write_pipe_stdin: Some(WritePipeStdinRequest {
241                session_id: session_id.into(),
242                data: data.to_vec(),
243                close: close_after,
244            }),
245            ..Default::default()
246        };
247        let response = self.send_request(req)?;
248        ensure_ok(&response)?;
249        let payload: WritePipeStdinResponse =
250            response
251                .write_pipe_stdin
252                .ok_or_else(|| ClientError::Server {
253                    code: StatusCode::Internal,
254                    message: "write_pipe_stdin response missing payload".into(),
255                })?;
256        Ok(payload.bytes_written)
257    }
258}
259
260fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
261    if response.code == StatusCode::Ok as i32 {
262        return Ok(());
263    }
264    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
265    Err(ClientError::Server {
266        code,
267        message: response.message.clone(),
268    })
269}

// ---------------------------------------------------------------------------
// PipeStreamAttachment
// ---------------------------------------------------------------------------

275/// Active attachment to one stdout or stderr stream of a pipe-backed session.
276///
277/// Owns the socket after it switches into one-way streaming mode.
278pub struct PipeStreamAttachment {
279    reader: BufReader<Stream>,
280    /// Bytes from the stream backlog that the client missed before attaching.
281    pub initial_backlog: Vec<u8>,
282    /// Cumulative bytes dropped from the daemon's backlog before this attachment.
283    pub bytes_missed: u64,
284}
285
286/// Errors from opening or reading a pipe stream attachment.
287#[derive(Debug)]
288pub enum PipeAttachError {
289    /// Opening the daemon socket failed.
290    Connect(std::io::Error),
291    /// Reading or writing the length-prefixed socket stream failed.
292    Io(std::io::Error),
293    /// Decoding a daemon response or stream frame failed.
294    Decode(prost::DecodeError),
295    /// The daemon rejected the attach request.
296    Server {
297        /// Server status code returned by the daemon.
298        code: StatusCode,
299        /// Human-readable error message returned by the daemon.
300        message: String,
301    },
302    /// The daemon accepted the attach request but omitted its payload.
303    MissingPayload,
304}
305
306impl std::fmt::Display for PipeAttachError {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        match self {
309            Self::Connect(e) => write!(f, "pipe attach connect failed: {e}"),
310            Self::Io(e) => write!(f, "pipe attach io error: {e}"),
311            Self::Decode(e) => write!(f, "pipe attach decode error: {e}"),
312            Self::Server { code, message } => {
313                write!(f, "pipe attach server error {code:?}: {message}")
314            }
315            Self::MissingPayload => write!(f, "pipe attach response missing payload"),
316        }
317    }
318}
319
320impl std::error::Error for PipeAttachError {}
321
322impl PipeStreamAttachment {
323    /// Open the scoped daemon socket and attach to a session output stream.
324    ///
325    /// When `steal` is true, the daemon evicts any existing attachment on the same stream.
326    pub fn attach(
327        scope_hash: Option<&str>,
328        session_id: &str,
329        stream: PipeStreamKind,
330        steal: bool,
331    ) -> Result<Self, PipeAttachError> {
332        let socket_path = paths::socket_path(scope_hash);
333        Self::attach_to(&socket_path, session_id, stream, steal)
334    }
335
336    /// Open an explicit daemon socket path and attach to a session output stream.
337    ///
338    /// When `steal` is true, the daemon evicts any existing attachment on the same stream.
339    pub fn attach_to(
340        socket_path: &str,
341        session_id: &str,
342        stream: PipeStreamKind,
343        steal: bool,
344    ) -> Result<Self, PipeAttachError> {
345        let name = paths::make_socket_name(socket_path).map_err(PipeAttachError::Connect)?;
346        use interprocess::local_socket::traits::Stream as _;
347        let s = Stream::connect(name).map_err(PipeAttachError::Connect)?;
348        let s_clone = s.try_clone().map_err(PipeAttachError::Connect)?;
349        let mut reader = BufReader::new(s);
350        let mut writer = BufWriter::new(s_clone);
351
352        let attach_request = DaemonRequest {
353            id: 1,
354            r#type: RequestType::AttachPipeStream.into(),
355            protocol_version: 1,
356            client_name: "running-process-client".into(),
357            attach_pipe_stream: Some(AttachPipeStreamRequest {
358                session_id: session_id.into(),
359                stream: stream as i32,
360                steal,
361            }),
362            ..Default::default()
363        };
364        write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
365            .map_err(PipeAttachError::Io)?;
366        // We do not need writer after this, but keep it alive via reader's
367        // duplex socket. Drop here.
368        drop(writer);
369
370        let response_bytes = read_length_prefixed(&mut reader).map_err(PipeAttachError::Io)?;
371        let response =
372            DaemonResponse::decode(&response_bytes[..]).map_err(PipeAttachError::Decode)?;
373        if response.code != StatusCode::Ok as i32 {
374            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
375            return Err(PipeAttachError::Server {
376                code,
377                message: response.message,
378            });
379        }
380        let payload: AttachPipeStreamResponse = response
381            .attach_pipe_stream
382            .ok_or(PipeAttachError::MissingPayload)?;
383
384        Ok(Self {
385            reader,
386            initial_backlog: payload.backlog,
387            bytes_missed: payload.bytes_missed,
388        })
389    }
390
391    /// Block until the next stream frame arrives.
392    pub fn recv_frame(&mut self) -> Result<PipeStreamFrame, PipeAttachError> {
393        let bytes = read_length_prefixed(&mut self.reader).map_err(PipeAttachError::Io)?;
394        PipeStreamFrame::decode(&bytes[..]).map_err(PipeAttachError::Decode)
395    }
396}
397
/// Write `payload` to `w` as one frame and flush.
///
/// A frame is a 4-byte big-endian length followed by exactly that many
/// payload bytes.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput`, without writing anything, when
/// `payload` is longer than `u32::MAX` bytes and therefore cannot be described
/// by the length prefix. Otherwise returns any error from writing to or
/// flushing `w`.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    // A plain `as u32` cast would wrap for oversized payloads and emit a
    // prefix that no longer matches the bytes that follow it.
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "payload exceeds the u32 length-prefix limit",
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}

/// Read one frame from `r` and return its payload.
///
/// A frame is a 4-byte big-endian length followed by exactly that many
/// payload bytes. Bytes after the frame are left unread in `r`.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when the stream ends inside the header
/// or before the declared number of payload bytes has arrived, and otherwise
/// any error produced by `r`.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    // The length prefix comes from the peer. Reserving it in full up front
    // would let one corrupt header demand up to 4 GiB before a single payload
    // byte has been seen, so cap the initial reservation and let the buffer
    // grow as data actually arrives.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf);

    let mut buf = Vec::with_capacity((len as usize).min(MAX_PREALLOC));
    // `take` stops at the frame boundary, so nothing past this frame is consumed.
    let received = r.by_ref().take(u64::from(len)).read_to_end(&mut buf)?;
    if received as u64 != u64::from(len) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "stream ended before the full frame arrived",
        ));
    }
    Ok(buf)
}