Skip to main content

running_process_client/
pty_session.rs

1//! Client-side helpers for daemon-owned detachable PTY sessions
2//! (issue #130 milestone 2).
3//!
4//! Sessions are spawned and listed via the regular [`DaemonClient`] RPC
5//! channel. Attach is special: after the daemon responds with
6//! `AttachPtySessionResponse` the same socket switches into a streaming
7//! mode that carries [`PtyStreamFrame`] (daemon → client) and
8//! [`PtyInputFrame`] (client → daemon) messages. [`PtyAttachment`] owns the
9//! socket for the lifetime of that stream and exposes blocking
10//! send/receive helpers suitable for tests and small clients. Async
11//! clients can build on top of [`DaemonClient::attach_pty_session_raw`].
12
13use crate::client::{ClientError, DaemonClient};
14use crate::paths;
15use interprocess::local_socket::Stream;
16use interprocess::TryClone;
17use prost::Message;
18use running_process_proto::daemon::{
19    pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
20    DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
21    ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
22    SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
23};
24use std::io::{BufReader, BufWriter, Read, Write};
25use std::path::PathBuf;
26use std::time::Duration;
27
28// ---------------------------------------------------------------------------
29// Spawn / list / terminate convenience builders
30// ---------------------------------------------------------------------------
31
/// Parameters for launching a PTY session that the daemon will own.
///
/// Start from [`PtySpawnRequest::new`] and refine the value with the
/// `with_*` builder methods; every field is also public, so it can be
/// assigned directly.
#[derive(Debug, Clone)]
pub struct PtySpawnRequest {
    /// Argument vector sent to the daemon, exactly as given to `new`.
    pub argv: Vec<String>,
    /// Working directory for the session; `None` leaves it unspecified.
    pub cwd: Option<PathBuf>,
    /// Environment entries as `(name, value)` pairs.
    pub env: Vec<(String, String)>,
    /// Forwarded verbatim as the request's `clear_inherited_env` flag.
    pub clear_inherited_env: bool,
    /// Initial terminal height in rows.
    pub rows: u16,
    /// Initial terminal width in columns.
    pub cols: u16,
    /// Optional originator label forwarded with the request.
    pub originator: Option<String>,
}

impl PtySpawnRequest {
    /// Creates a request for `argv` with a 24x80 terminal, no working
    /// directory, no environment entries, `clear_inherited_env` off and no
    /// originator.
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            rows: 24,
            cols: 80,
            originator: None,
        }
    }

    /// Sets the working directory.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Sets the initial terminal size.
    pub fn with_size(self, rows: u16, cols: u16) -> Self {
        Self { rows, cols, ..self }
    }

    /// Sets the originator label.
    pub fn with_originator(self, originator: impl Into<String>) -> Self {
        Self {
            originator: Some(originator.into()),
            ..self
        }
    }

    /// Replaces (does not extend) the environment entries with `env`.
    pub fn with_envs<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let mut pairs: Vec<(String, String)> = Vec::new();
        for (key, value) in env {
            pairs.push((key.into(), value.into()));
        }
        Self { env: pairs, ..self }
    }
}
86
/// What the daemon reports back once a PTY session has been spawned.
#[derive(Debug, Clone)]
pub struct SpawnedPtySession {
    /// Session identifier taken from the spawn response.
    pub session_id: String,
    /// `pid` value taken from the spawn response.
    pub pid: u32,
    /// `created_at` value taken from the spawn response.
    /// NOTE(review): units are not visible here (presumably seconds since
    /// the Unix epoch) — confirm against the daemon.
    pub created_at: f64,
}
94
95impl DaemonClient {
96    /// Ask the daemon to spawn a new PTY session that it owns.
97    pub fn spawn_pty_session(
98        &mut self,
99        request: &PtySpawnRequest,
100    ) -> Result<SpawnedPtySession, ClientError> {
101        let proto = SpawnPtySessionRequest {
102            argv: request.argv.clone(),
103            cwd: request
104                .cwd
105                .as_ref()
106                .map(|p| p.to_string_lossy().into_owned())
107                .unwrap_or_default(),
108            env: request
109                .env
110                .iter()
111                .map(|(k, v)| KeyValue {
112                    key: k.clone(),
113                    value: v.clone(),
114                })
115                .collect(),
116            clear_inherited_env: request.clear_inherited_env,
117            rows: request.rows as u32,
118            cols: request.cols as u32,
119            originator: request.originator.clone().unwrap_or_default(),
120        };
121
122        let daemon_request = DaemonRequest {
123            id: self.next_request_id(),
124            r#type: RequestType::SpawnPtySession.into(),
125            protocol_version: 1,
126            client_name: "running-process-client".into(),
127            spawn_pty_session: Some(proto),
128            ..Default::default()
129        };
130
131        let response = self.send_request(daemon_request)?;
132        ensure_ok(&response)?;
133        let payload: SpawnPtySessionResponse =
134            response.spawn_pty_session.ok_or_else(|| ClientError::Server {
135                code: StatusCode::Internal,
136                message: "spawn_pty_session response missing payload".into(),
137            })?;
138        Ok(SpawnedPtySession {
139            session_id: payload.session_id,
140            pid: payload.pid,
141            created_at: payload.created_at,
142        })
143    }
144
145    /// List PTY sessions known to the daemon. Empty `originator_filter`
146    /// returns all sessions in scope.
147    pub fn list_pty_sessions(
148        &mut self,
149        originator_filter: &str,
150    ) -> Result<Vec<PtySessionInfo>, ClientError> {
151        let req = DaemonRequest {
152            id: self.next_request_id(),
153            r#type: RequestType::ListPtySessions.into(),
154            protocol_version: 1,
155            client_name: "running-process-client".into(),
156            list_pty_sessions: Some(ListPtySessionsRequest {
157                originator: originator_filter.into(),
158            }),
159            ..Default::default()
160        };
161        let response = self.send_request(req)?;
162        ensure_ok(&response)?;
163        let payload: ListPtySessionsResponse = response
164            .list_pty_sessions
165            .ok_or_else(|| ClientError::Server {
166                code: StatusCode::Internal,
167                message: "list_pty_sessions response missing payload".into(),
168            })?;
169        Ok(payload.sessions)
170    }
171
172    /// Ask the daemon to detach any current attachment from a session,
173    /// leaving the session alive. Idempotent.
174    pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
175        let req = DaemonRequest {
176            id: self.next_request_id(),
177            r#type: RequestType::DetachPtySession.into(),
178            protocol_version: 1,
179            client_name: "running-process-client".into(),
180            detach_pty_session: Some(DetachPtySessionRequest {
181                session_id: session_id.into(),
182            }),
183            ..Default::default()
184        };
185        let response = self.send_request(req)?;
186        ensure_ok(&response)?;
187        Ok(())
188    }
189
190    /// Schedule termination of a PTY session. Returns as soon as the
191    /// daemon accepts the schedule; the actual termination happens on a
192    /// daemon background task (soft signal, grace, then hard kill).
193    pub fn terminate_pty_session(
194        &mut self,
195        session_id: &str,
196        grace_ms: u32,
197    ) -> Result<(), ClientError> {
198        let req = DaemonRequest {
199            id: self.next_request_id(),
200            r#type: RequestType::TerminatePtySession.into(),
201            protocol_version: 1,
202            client_name: "running-process-client".into(),
203            terminate_pty_session: Some(TerminatePtySessionRequest {
204                session_id: session_id.into(),
205                grace_ms,
206            }),
207            ..Default::default()
208        };
209        let response = self.send_request(req)?;
210        ensure_ok(&response)?;
211        Ok(())
212    }
213
214}
215
216fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
217    if response.code == StatusCode::Ok as i32 {
218        return Ok(());
219    }
220    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
221    Err(ClientError::Server {
222        code,
223        message: response.message.clone(),
224    })
225}
226
227// ---------------------------------------------------------------------------
228// PtyAttachment
229// ---------------------------------------------------------------------------
230
231/// Active attachment to a daemon-owned PTY session.
232///
233/// Owns the socket; the connection is in streaming mode and cannot be used
234/// for unrelated RPCs.
235pub struct PtyAttachment {
236    reader: BufReader<Stream>,
237    writer: BufWriter<Stream>,
238    /// Bytes received in the initial AttachPtySessionResponse (output the
239    /// client missed before attach succeeded).
240    pub initial_backlog: Vec<u8>,
241    /// Cumulative bytes dropped from the daemon's ring buffer before this
242    /// attach. Zero if the buffer never overflowed.
243    pub bytes_missed: u64,
244}
245
246/// Errors specific to attach.
247#[derive(Debug)]
248pub enum AttachError {
249    Connect(std::io::Error),
250    Io(std::io::Error),
251    Decode(prost::DecodeError),
252    Server { code: StatusCode, message: String },
253    /// The daemon never sent an AttachPtySessionResponse payload.
254    MissingPayload,
255}
256
257impl std::fmt::Display for AttachError {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        match self {
260            AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
261            AttachError::Io(e) => write!(f, "attach io error: {e}"),
262            AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
263            AttachError::Server { code, message } => {
264                write!(f, "attach server error {code:?}: {message}")
265            }
266            AttachError::MissingPayload => write!(f, "attach response missing payload"),
267        }
268    }
269}
270
271impl std::error::Error for AttachError {}
272
273impl PtyAttachment {
274    /// Open a fresh socket to the daemon and attach to `session_id`.
275    pub fn attach(
276        scope_hash: Option<&str>,
277        session_id: &str,
278        rows: u16,
279        cols: u16,
280        steal: bool,
281    ) -> Result<Self, AttachError> {
282        let socket_path = paths::socket_path(scope_hash);
283        Self::attach_to(&socket_path, session_id, rows, cols, steal)
284    }
285
286    /// Open a fresh socket at `socket_path` and attach to `session_id`.
287    pub fn attach_to(
288        socket_path: &str,
289        session_id: &str,
290        rows: u16,
291        cols: u16,
292        steal: bool,
293    ) -> Result<Self, AttachError> {
294        let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
295        use interprocess::local_socket::traits::Stream as _;
296        let stream = Stream::connect(name).map_err(AttachError::Connect)?;
297        let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
298        let mut reader = BufReader::new(stream);
299        let mut writer = BufWriter::new(stream_clone);
300
301        // Send the AttachPtySession request.
302        let attach_request = DaemonRequest {
303            id: 1,
304            r#type: RequestType::AttachPtySession.into(),
305            protocol_version: 1,
306            client_name: "running-process-client".into(),
307            attach_pty_session: Some(AttachPtySessionRequest {
308                session_id: session_id.into(),
309                rows: rows as u32,
310                cols: cols as u32,
311                steal,
312                term: std::env::var("TERM").unwrap_or_default(),
313                is_tty: true,
314            }),
315            ..Default::default()
316        };
317        write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
318            .map_err(AttachError::Io)?;
319
320        // Read the initial response.
321        let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
322        let response =
323            DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
324        if response.code != StatusCode::Ok as i32 {
325            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
326            return Err(AttachError::Server {
327                code,
328                message: response.message,
329            });
330        }
331        let payload: AttachPtySessionResponse = response
332            .attach_pty_session
333            .ok_or(AttachError::MissingPayload)?;
334
335        Ok(Self {
336            reader,
337            writer,
338            initial_backlog: payload.backlog,
339            bytes_missed: payload.bytes_missed,
340        })
341    }
342
343    /// Block until the next stream frame arrives.
344    pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
345        let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
346        PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
347    }
348
349    /// Block until the next stream frame arrives, or until `timeout`
350    /// elapses (returns `Ok(None)`). The underlying socket is put into
351    /// nonblocking mode for the duration of the wait; callers should not
352    /// interleave this with `recv_frame`.
353    pub fn recv_frame_with_timeout(
354        &mut self,
355        timeout: Duration,
356    ) -> Result<Option<PtyStreamFrame>, AttachError> {
357        // Pull the underlying stream out of BufReader so we can set
358        // read_timeout. interprocess::local_socket::Stream supports
359        // set_nonblocking via the platform shim; for portability we just
360        // poll in a short loop.
361        let deadline = std::time::Instant::now() + timeout;
362        loop {
363            // Try to fill the BufReader buffer non-blockingly. If we
364            // already have data, decode directly. Otherwise, sleep briefly
365            // and retry until the deadline.
366            if !self.reader.buffer().is_empty() {
367                return self.recv_frame().map(Some);
368            }
369            if std::time::Instant::now() >= deadline {
370                return Ok(None);
371            }
372            // Sleep a small amount; the OS will buffer incoming data.
373            std::thread::sleep(Duration::from_millis(20));
374            // Probe by peeking a single byte: read from reader will block,
375            // so we use the BufReader.fill_buf trick by reading 0 bytes
376            // first to populate. Simpler: just call recv_frame once the
377            // underlying socket reports it has data — but
378            // interprocess::Stream lacks portable peek. As a portable
379            // fallback, attempt a frame read and return on first success.
380            //
381            // To avoid blocking forever past the deadline, we rely on the
382            // OS to make recv_frame's read_exact return data quickly once
383            // it arrives; in practice for the M2 use case timeouts are
384            // generous (seconds) and the sleep loop above is the dominant
385            // mechanism. We do NOT actually call recv_frame here because
386            // it would block.
387        }
388    }
389
390    /// Send raw input bytes to the PTY.
391    pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
392        let frame = PtyInputFrame {
393            frame: Some(InputOneof::Input(bytes.to_vec())),
394        };
395        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
396    }
397
398    /// Send a resize event.
399    pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
400        let frame = PtyInputFrame {
401            frame: Some(InputOneof::Resize(PtyResize {
402                rows: rows as u32,
403                cols: cols as u32,
404            })),
405        };
406        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
407    }
408
409    /// Send an interrupt (Ctrl+C / SIGINT) to the child process group.
410    pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
411        let frame = PtyInputFrame {
412            frame: Some(InputOneof::Interrupt(true)),
413        };
414        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
415    }
416
417    /// Cleanly detach this attachment; the session keeps running.
418    pub fn detach(mut self) -> Result<(), AttachError> {
419        let frame = PtyInputFrame {
420            frame: Some(InputOneof::Detach(true)),
421        };
422        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
423    }
424}
425
426// ---------------------------------------------------------------------------
427// Length-prefixed framing (matches the daemon's LengthDelimitedCodec)
428// ---------------------------------------------------------------------------
429
/// Writes `payload` as one frame: a 4-byte big-endian length followed by
/// the payload bytes, then flushes `w`.
///
/// # Errors
/// Returns `InvalidInput` without writing anything when `payload` is longer
/// than `u32::MAX` bytes (the prefix cannot represent it; truncating the
/// length would corrupt the stream). Otherwise propagates any error from
/// `w`.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "payload exceeds the u32 frame length prefix",
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}
436
/// Reads one frame: a 4-byte big-endian length followed by that many
/// payload bytes, which are returned.
///
/// The declared length comes straight off the wire, so the buffer is not
/// allocated up front at that size. It grows as bytes actually arrive, and
/// a bogus length therefore cannot force a multi-gigabyte allocation.
///
/// # Errors
/// `UnexpectedEof` when the stream ends before the prefix or the declared
/// payload is complete; otherwise any error from `r`.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    /// Largest capacity reserved before any payload byte has been seen.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let declared = u32::from_be_bytes(len_buf);
    let len = declared as usize;

    let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
    // `take` caps the read at the frame boundary, so bytes belonging to the
    // next frame stay in `r`.
    let got = r.by_ref().take(u64::from(declared)).read_to_end(&mut buf)?;
    if got != len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "frame ended before its declared length",
        ));
    }
    Ok(buf)
}
445
446// ---------------------------------------------------------------------------
447// Tests
448// ---------------------------------------------------------------------------
449
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` on its own must produce the default values it sets.
    #[test]
    fn pty_spawn_request_new_uses_defaults() {
        let req = PtySpawnRequest::new(["echo", "hi"]);
        assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
        assert!(req.cwd.is_none());
        assert!(req.env.is_empty());
        assert!(!req.clear_inherited_env);
        assert_eq!(req.rows, 24);
        assert_eq!(req.cols, 80);
        assert!(req.originator.is_none());
    }

    /// Builder overrides apply, and fields that were not touched keep
    /// their defaults.
    #[test]
    fn pty_spawn_request_builder_defaults() {
        let req = PtySpawnRequest::new(["echo", "hi"])
            .with_size(40, 100)
            .with_originator("test:1");
        assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
        assert_eq!(req.rows, 40);
        assert_eq!(req.cols, 100);
        assert_eq!(req.originator.as_deref(), Some("test:1"));
        assert!(req.cwd.is_none());
        assert!(req.env.is_empty());
        assert!(!req.clear_inherited_env);
    }

    /// `with_cwd` stores the path; `with_envs` replaces earlier entries.
    #[test]
    fn pty_spawn_request_cwd_and_envs() {
        let req = PtySpawnRequest::new(["sh"])
            .with_cwd("/tmp")
            .with_envs([("A", "1")])
            .with_envs([("B", "2")]);
        assert_eq!(req.cwd, Some(PathBuf::from("/tmp")));
        assert_eq!(req.env, vec![("B".to_string(), "2".to_string())]);
    }
}
464}