Skip to main content

running_process_client/
pipe_session.rs

1//! Client-side helpers for daemon-owned pipe-backed sessions
2//! (issue #130 milestone 3).
3//!
4//! Mirrors [`crate::pty_session`] for the pipe case. Sessions are spawned,
5//! listed, detached, and terminated via the regular [`DaemonClient`] RPC
6//! channel. Stdin is also an RPC (`write_pipe_stdin`). Stdout/stderr are
7//! attached via [`PipeStreamAttachment`], which owns its own connection
8//! and pumps `PipeStreamFrame` payloads.
9
10use crate::client::{ClientError, DaemonClient};
11use crate::paths;
12use interprocess::local_socket::Stream;
13use interprocess::TryClone;
14use prost::Message;
15use running_process_proto::daemon::{
16    AttachPipeStreamRequest, AttachPipeStreamResponse, DaemonRequest, DaemonResponse,
17    DetachPipeStreamRequest, KeyValue, ListPipeSessionsRequest, ListPipeSessionsResponse,
18    PipeSessionInfo, PipeStreamFrame, PipeStreamKind, RequestType, SpawnPipeSessionRequest,
19    SpawnPipeSessionResponse, StatusCode, TerminatePipeSessionRequest, WritePipeStdinRequest,
20    WritePipeStdinResponse,
21};
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::PathBuf;
24
25// ---------------------------------------------------------------------------
26// Spawn / list / terminate / write helpers
27// ---------------------------------------------------------------------------
28
29#[derive(Debug, Clone)]
30pub struct PipeSpawnRequest {
31    pub argv: Vec<String>,
32    pub cwd: Option<PathBuf>,
33    pub env: Vec<(String, String)>,
34    pub clear_inherited_env: bool,
35    pub originator: Option<String>,
36    pub merge_stderr_into_stdout: bool,
37}
38
39impl PipeSpawnRequest {
40    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
41        Self {
42            argv: argv.into_iter().map(Into::into).collect(),
43            cwd: None,
44            env: Vec::new(),
45            clear_inherited_env: false,
46            originator: None,
47            merge_stderr_into_stdout: false,
48        }
49    }
50
51    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
52        self.cwd = Some(cwd.into());
53        self
54    }
55
56    pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
57        self.originator = Some(originator.into());
58        self
59    }
60
61    pub fn merge_stderr(mut self) -> Self {
62        self.merge_stderr_into_stdout = true;
63        self
64    }
65
66    pub fn with_envs<I, K, V>(mut self, env: I) -> Self
67    where
68        I: IntoIterator<Item = (K, V)>,
69        K: Into<String>,
70        V: Into<String>,
71    {
72        self.env = env
73            .into_iter()
74            .map(|(k, v)| (k.into(), v.into()))
75            .collect();
76        self
77    }
78}
79
80#[derive(Debug, Clone)]
81pub struct SpawnedPipeSession {
82    pub session_id: String,
83    pub pid: u32,
84    pub created_at: f64,
85}
86
87impl DaemonClient {
88    pub fn spawn_pipe_session(
89        &mut self,
90        request: &PipeSpawnRequest,
91    ) -> Result<SpawnedPipeSession, ClientError> {
92        let proto = SpawnPipeSessionRequest {
93            argv: request.argv.clone(),
94            cwd: request
95                .cwd
96                .as_ref()
97                .map(|p| p.to_string_lossy().into_owned())
98                .unwrap_or_default(),
99            env: request
100                .env
101                .iter()
102                .map(|(k, v)| KeyValue {
103                    key: k.clone(),
104                    value: v.clone(),
105                })
106                .collect(),
107            clear_inherited_env: request.clear_inherited_env,
108            originator: request.originator.clone().unwrap_or_default(),
109            merge_stderr_into_stdout: request.merge_stderr_into_stdout,
110        };
111        let daemon_request = DaemonRequest {
112            id: self.next_request_id(),
113            r#type: RequestType::SpawnPipeSession.into(),
114            protocol_version: 1,
115            client_name: "running-process-client".into(),
116            spawn_pipe_session: Some(proto),
117            ..Default::default()
118        };
119        let response = self.send_request(daemon_request)?;
120        ensure_ok(&response)?;
121        let payload: SpawnPipeSessionResponse = response
122            .spawn_pipe_session
123            .ok_or_else(|| ClientError::Server {
124                code: StatusCode::Internal,
125                message: "spawn_pipe_session response missing payload".into(),
126            })?;
127        Ok(SpawnedPipeSession {
128            session_id: payload.session_id,
129            pid: payload.pid,
130            created_at: payload.created_at,
131        })
132    }
133
134    pub fn list_pipe_sessions(
135        &mut self,
136        originator_filter: &str,
137    ) -> Result<Vec<PipeSessionInfo>, ClientError> {
138        let req = DaemonRequest {
139            id: self.next_request_id(),
140            r#type: RequestType::ListPipeSessions.into(),
141            protocol_version: 1,
142            client_name: "running-process-client".into(),
143            list_pipe_sessions: Some(ListPipeSessionsRequest {
144                originator: originator_filter.into(),
145            }),
146            ..Default::default()
147        };
148        let response = self.send_request(req)?;
149        ensure_ok(&response)?;
150        let payload: ListPipeSessionsResponse = response
151            .list_pipe_sessions
152            .ok_or_else(|| ClientError::Server {
153                code: StatusCode::Internal,
154                message: "list_pipe_sessions response missing payload".into(),
155            })?;
156        Ok(payload.sessions)
157    }
158
159    pub fn detach_pipe_stream(
160        &mut self,
161        session_id: &str,
162        stream: PipeStreamKind,
163    ) -> Result<(), ClientError> {
164        let req = DaemonRequest {
165            id: self.next_request_id(),
166            r#type: RequestType::DetachPipeStream.into(),
167            protocol_version: 1,
168            client_name: "running-process-client".into(),
169            detach_pipe_stream: Some(DetachPipeStreamRequest {
170                session_id: session_id.into(),
171                stream: stream as i32,
172            }),
173            ..Default::default()
174        };
175        let response = self.send_request(req)?;
176        ensure_ok(&response)?;
177        Ok(())
178    }
179
180    pub fn terminate_pipe_session(
181        &mut self,
182        session_id: &str,
183        grace_ms: u32,
184    ) -> Result<(), ClientError> {
185        let req = DaemonRequest {
186            id: self.next_request_id(),
187            r#type: RequestType::TerminatePipeSession.into(),
188            protocol_version: 1,
189            client_name: "running-process-client".into(),
190            terminate_pipe_session: Some(TerminatePipeSessionRequest {
191                session_id: session_id.into(),
192                grace_ms,
193            }),
194            ..Default::default()
195        };
196        let response = self.send_request(req)?;
197        ensure_ok(&response)?;
198        Ok(())
199    }
200
201    pub fn write_pipe_stdin(
202        &mut self,
203        session_id: &str,
204        data: &[u8],
205        close_after: bool,
206    ) -> Result<u64, ClientError> {
207        let req = DaemonRequest {
208            id: self.next_request_id(),
209            r#type: RequestType::WritePipeStdin.into(),
210            protocol_version: 1,
211            client_name: "running-process-client".into(),
212            write_pipe_stdin: Some(WritePipeStdinRequest {
213                session_id: session_id.into(),
214                data: data.to_vec(),
215                close: close_after,
216            }),
217            ..Default::default()
218        };
219        let response = self.send_request(req)?;
220        ensure_ok(&response)?;
221        let payload: WritePipeStdinResponse = response
222            .write_pipe_stdin
223            .ok_or_else(|| ClientError::Server {
224                code: StatusCode::Internal,
225                message: "write_pipe_stdin response missing payload".into(),
226            })?;
227        Ok(payload.bytes_written)
228    }
229}
230
231fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
232    if response.code == StatusCode::Ok as i32 {
233        return Ok(());
234    }
235    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
236    Err(ClientError::Server {
237        code,
238        message: response.message.clone(),
239    })
240}
241
242// ---------------------------------------------------------------------------
243// PipeStreamAttachment
244// ---------------------------------------------------------------------------
245
246pub struct PipeStreamAttachment {
247    reader: BufReader<Stream>,
248    pub initial_backlog: Vec<u8>,
249    pub bytes_missed: u64,
250}
251
252#[derive(Debug)]
253pub enum PipeAttachError {
254    Connect(std::io::Error),
255    Io(std::io::Error),
256    Decode(prost::DecodeError),
257    Server { code: StatusCode, message: String },
258    MissingPayload,
259}
260
261impl std::fmt::Display for PipeAttachError {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        match self {
264            Self::Connect(e) => write!(f, "pipe attach connect failed: {e}"),
265            Self::Io(e) => write!(f, "pipe attach io error: {e}"),
266            Self::Decode(e) => write!(f, "pipe attach decode error: {e}"),
267            Self::Server { code, message } => {
268                write!(f, "pipe attach server error {code:?}: {message}")
269            }
270            Self::MissingPayload => write!(f, "pipe attach response missing payload"),
271        }
272    }
273}
274
275impl std::error::Error for PipeAttachError {}
276
277impl PipeStreamAttachment {
278    pub fn attach(
279        scope_hash: Option<&str>,
280        session_id: &str,
281        stream: PipeStreamKind,
282        steal: bool,
283    ) -> Result<Self, PipeAttachError> {
284        let socket_path = paths::socket_path(scope_hash);
285        Self::attach_to(&socket_path, session_id, stream, steal)
286    }
287
288    pub fn attach_to(
289        socket_path: &str,
290        session_id: &str,
291        stream: PipeStreamKind,
292        steal: bool,
293    ) -> Result<Self, PipeAttachError> {
294        let name = paths::make_socket_name(socket_path).map_err(PipeAttachError::Connect)?;
295        use interprocess::local_socket::traits::Stream as _;
296        let s = Stream::connect(name).map_err(PipeAttachError::Connect)?;
297        let s_clone = s.try_clone().map_err(PipeAttachError::Connect)?;
298        let mut reader = BufReader::new(s);
299        let mut writer = BufWriter::new(s_clone);
300
301        let attach_request = DaemonRequest {
302            id: 1,
303            r#type: RequestType::AttachPipeStream.into(),
304            protocol_version: 1,
305            client_name: "running-process-client".into(),
306            attach_pipe_stream: Some(AttachPipeStreamRequest {
307                session_id: session_id.into(),
308                stream: stream as i32,
309                steal,
310            }),
311            ..Default::default()
312        };
313        write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
314            .map_err(PipeAttachError::Io)?;
315        // We do not need writer after this, but keep it alive via reader's
316        // duplex socket. Drop here.
317        drop(writer);
318
319        let response_bytes = read_length_prefixed(&mut reader).map_err(PipeAttachError::Io)?;
320        let response = DaemonResponse::decode(&response_bytes[..]).map_err(PipeAttachError::Decode)?;
321        if response.code != StatusCode::Ok as i32 {
322            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
323            return Err(PipeAttachError::Server {
324                code,
325                message: response.message,
326            });
327        }
328        let payload: AttachPipeStreamResponse = response
329            .attach_pipe_stream
330            .ok_or(PipeAttachError::MissingPayload)?;
331
332        Ok(Self {
333            reader,
334            initial_backlog: payload.backlog,
335            bytes_missed: payload.bytes_missed,
336        })
337    }
338
339    pub fn recv_frame(&mut self) -> Result<PipeStreamFrame, PipeAttachError> {
340        let bytes = read_length_prefixed(&mut self.reader).map_err(PipeAttachError::Io)?;
341        PipeStreamFrame::decode(&bytes[..]).map_err(PipeAttachError::Decode)
342    }
343}
344
345fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
346    let len = payload.len() as u32;
347    w.write_all(&len.to_be_bytes())?;
348    w.write_all(payload)?;
349    w.flush()
350}
351
352fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
353    let mut len_buf = [0u8; 4];
354    r.read_exact(&mut len_buf)?;
355    let len = u32::from_be_bytes(len_buf) as usize;
356    let mut buf = vec![0u8; len];
357    r.read_exact(&mut buf)?;
358    Ok(buf)
359}