Skip to main content

running_process/client/
pty_session.rs

1//! Client-side helpers for daemon-owned detachable PTY sessions
2//! (issue #130 milestone 2).
3//!
4//! Sessions are spawned and listed via the regular [`DaemonClient`] RPC
5//! channel. Attach is special: after the daemon responds with
6//! `AttachPtySessionResponse` the same socket switches into a streaming
7//! mode that carries [`PtyStreamFrame`] (daemon → client) and
8//! [`PtyInputFrame`] (client → daemon) messages. [`PtyAttachment`] owns the
9//! socket for the lifetime of that stream and exposes blocking
10//! send/receive helpers suitable for tests and small clients. Async
11//! clients can build on top of [`DaemonClient::attach_pty_session_raw`].
12
13use crate::client::paths;
14use crate::client::{ClientError, DaemonClient};
15use crate::proto::daemon::{
16    pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
17    DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
18    ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
19    SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
20};
21use crate::terminal_graphics::{
22    current_terminal_capabilities, terminal_graphics_capabilities_to_proto, TerminalCapabilities,
23    TerminalGraphicsCapabilities,
24};
25use interprocess::local_socket::Stream;
26use interprocess::TryClone;
27use prost::Message;
28use std::io::{BufReader, BufWriter, Read, Write};
29use std::path::PathBuf;
30use std::time::Duration;
31
32// ---------------------------------------------------------------------------
33// Spawn / list / terminate convenience builders
34// ---------------------------------------------------------------------------
35
/// Parameters describing a PTY session the daemon should spawn and own.
///
/// Start from [`PtySpawnRequest::new`] and refine with the `with_*`
/// builders; every field is public and may also be set directly.
#[derive(Clone, Debug)]
pub struct PtySpawnRequest {
    /// Command line to run, one element per argument.
    pub argv: Vec<String>,
    /// Working directory for the session. `None` is sent to the daemon as
    /// an empty path.
    pub cwd: Option<PathBuf>,
    /// Environment entries as `(key, value)` pairs, forwarded in order.
    pub env: Vec<(String, String)>,
    /// Forwarded to the daemon unchanged.
    /// NOTE(review): presumably asks the daemon not to pass its own
    /// environment on to the child — confirm against the daemon side.
    pub clear_inherited_env: bool,
    /// Terminal height in rows.
    pub rows: u16,
    /// Terminal width in columns.
    pub cols: u16,
    /// Optional originator tag. `None` is sent as an empty string.
    pub originator: Option<String>,
}

impl PtySpawnRequest {
    /// Creates a request for `argv` with the defaults: no working
    /// directory, no environment entries, `clear_inherited_env` off, a
    /// 24x80 terminal and no originator.
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            rows: 24,
            cols: 80,
            originator: None,
        }
    }

    /// Sets the working directory.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Sets the terminal size.
    pub fn with_size(self, rows: u16, cols: u16) -> Self {
        Self { rows, cols, ..self }
    }

    /// Sets the originator tag.
    pub fn with_originator(self, originator: impl Into<String>) -> Self {
        Self {
            originator: Some(originator.into()),
            ..self
        }
    }

    /// Replaces (does not extend) the environment entries with `env`.
    pub fn with_envs<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let env: Vec<(String, String)> = env
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect();
        Self { env, ..self }
    }
}
87
/// What the daemon reported back after a successful spawn.
#[derive(Clone, Debug)]
pub struct SpawnedPtySession {
    /// Identifier the daemon assigned to the new session.
    pub session_id: String,
    /// Process id reported by the daemon for the session.
    pub pid: u32,
    /// Creation timestamp exactly as reported by the daemon.
    /// NOTE(review): unit and epoch are not visible here — presumably
    /// seconds since the Unix epoch; confirm against the daemon.
    pub created_at: f64,
}
95
96impl DaemonClient {
97    /// Ask the daemon to spawn a new PTY session that it owns.
98    pub fn spawn_pty_session(
99        &mut self,
100        request: &PtySpawnRequest,
101    ) -> Result<SpawnedPtySession, ClientError> {
102        let proto = SpawnPtySessionRequest {
103            argv: request.argv.clone(),
104            cwd: request
105                .cwd
106                .as_ref()
107                .map(|p| p.to_string_lossy().into_owned())
108                .unwrap_or_default(),
109            env: request
110                .env
111                .iter()
112                .map(|(k, v)| KeyValue {
113                    key: k.clone(),
114                    value: v.clone(),
115                })
116                .collect(),
117            clear_inherited_env: request.clear_inherited_env,
118            rows: request.rows as u32,
119            cols: request.cols as u32,
120            originator: request.originator.clone().unwrap_or_default(),
121        };
122
123        let daemon_request = DaemonRequest {
124            id: self.next_request_id(),
125            r#type: RequestType::SpawnPtySession.into(),
126            protocol_version: 1,
127            client_name: "running-process-client".into(),
128            spawn_pty_session: Some(proto),
129            ..Default::default()
130        };
131
132        let response = self.send_request(daemon_request)?;
133        ensure_ok(&response)?;
134        let payload: SpawnPtySessionResponse =
135            response
136                .spawn_pty_session
137                .ok_or_else(|| ClientError::Server {
138                    code: StatusCode::Internal,
139                    message: "spawn_pty_session response missing payload".into(),
140                })?;
141        Ok(SpawnedPtySession {
142            session_id: payload.session_id,
143            pid: payload.pid,
144            created_at: payload.created_at,
145        })
146    }
147
148    /// List PTY sessions known to the daemon. Empty `originator_filter`
149    /// returns all sessions in scope.
150    pub fn list_pty_sessions(
151        &mut self,
152        originator_filter: &str,
153    ) -> Result<Vec<PtySessionInfo>, ClientError> {
154        let req = DaemonRequest {
155            id: self.next_request_id(),
156            r#type: RequestType::ListPtySessions.into(),
157            protocol_version: 1,
158            client_name: "running-process-client".into(),
159            list_pty_sessions: Some(ListPtySessionsRequest {
160                originator: originator_filter.into(),
161            }),
162            ..Default::default()
163        };
164        let response = self.send_request(req)?;
165        ensure_ok(&response)?;
166        let payload: ListPtySessionsResponse =
167            response
168                .list_pty_sessions
169                .ok_or_else(|| ClientError::Server {
170                    code: StatusCode::Internal,
171                    message: "list_pty_sessions response missing payload".into(),
172                })?;
173        Ok(payload.sessions)
174    }
175
176    /// Ask the daemon to detach any current attachment from a session,
177    /// leaving the session alive. Idempotent.
178    pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
179        let req = DaemonRequest {
180            id: self.next_request_id(),
181            r#type: RequestType::DetachPtySession.into(),
182            protocol_version: 1,
183            client_name: "running-process-client".into(),
184            detach_pty_session: Some(DetachPtySessionRequest {
185                session_id: session_id.into(),
186            }),
187            ..Default::default()
188        };
189        let response = self.send_request(req)?;
190        ensure_ok(&response)?;
191        Ok(())
192    }
193
194    /// Schedule termination of a PTY session. Returns as soon as the
195    /// daemon accepts the schedule; the actual termination happens on a
196    /// daemon background task (soft signal, grace, then hard kill).
197    pub fn terminate_pty_session(
198        &mut self,
199        session_id: &str,
200        grace_ms: u32,
201    ) -> Result<(), ClientError> {
202        let req = DaemonRequest {
203            id: self.next_request_id(),
204            r#type: RequestType::TerminatePtySession.into(),
205            protocol_version: 1,
206            client_name: "running-process-client".into(),
207            terminate_pty_session: Some(TerminatePtySessionRequest {
208                session_id: session_id.into(),
209                grace_ms,
210            }),
211            ..Default::default()
212        };
213        let response = self.send_request(req)?;
214        ensure_ok(&response)?;
215        Ok(())
216    }
217}
218
219fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
220    if response.code == StatusCode::Ok as i32 {
221        return Ok(());
222    }
223    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
224    Err(ClientError::Server {
225        code,
226        message: response.message.clone(),
227    })
228}
229
230// ---------------------------------------------------------------------------
231// PtyAttachment
232// ---------------------------------------------------------------------------
233
234/// Active attachment to a daemon-owned PTY session.
235///
236/// Owns the socket; the connection is in streaming mode and cannot be used
237/// for unrelated RPCs.
238pub struct PtyAttachment {
239    reader: BufReader<Stream>,
240    writer: BufWriter<Stream>,
241    /// Bytes received in the initial AttachPtySessionResponse (output the
242    /// client missed before attach succeeded).
243    pub initial_backlog: Vec<u8>,
244    /// Cumulative bytes dropped from the daemon's ring buffer before this
245    /// attach. Zero if the buffer never overflowed.
246    pub bytes_missed: u64,
247}
248
249/// Errors specific to attach.
250#[derive(Debug)]
251pub enum AttachError {
252    Connect(std::io::Error),
253    Io(std::io::Error),
254    Decode(prost::DecodeError),
255    Server {
256        code: StatusCode,
257        message: String,
258    },
259    /// The daemon never sent an AttachPtySessionResponse payload.
260    MissingPayload,
261}
262
263impl std::fmt::Display for AttachError {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        match self {
266            AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
267            AttachError::Io(e) => write!(f, "attach io error: {e}"),
268            AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
269            AttachError::Server { code, message } => {
270                write!(f, "attach server error {code:?}: {message}")
271            }
272            AttachError::MissingPayload => write!(f, "attach response missing payload"),
273        }
274    }
275}
276
277impl std::error::Error for AttachError {}
278
279impl PtyAttachment {
280    /// Open a fresh socket to the daemon and attach to `session_id`.
281    pub fn attach(
282        scope_hash: Option<&str>,
283        session_id: &str,
284        rows: u16,
285        cols: u16,
286        steal: bool,
287    ) -> Result<Self, AttachError> {
288        let socket_path = paths::socket_path(scope_hash);
289        Self::attach_to(&socket_path, session_id, rows, cols, steal)
290    }
291
292    /// Open a fresh socket at `socket_path` and attach to `session_id`.
293    pub fn attach_to(
294        socket_path: &str,
295        session_id: &str,
296        rows: u16,
297        cols: u16,
298        steal: bool,
299    ) -> Result<Self, AttachError> {
300        let mut terminal_capabilities = current_terminal_capabilities();
301        if !terminal_capabilities.is_tty {
302            terminal_capabilities.is_tty = true;
303            terminal_capabilities.graphics = TerminalGraphicsCapabilities::unknown();
304        }
305        Self::attach_to_with_terminal_capabilities(
306            socket_path,
307            session_id,
308            rows,
309            cols,
310            steal,
311            terminal_capabilities,
312        )
313    }
314
315    /// Attach with explicit terminal metadata. This is useful for tests,
316    /// non-interactive attach clients, and callers that already performed
317    /// capability probing before opening the daemon socket.
318    pub fn attach_to_with_terminal_capabilities(
319        socket_path: &str,
320        session_id: &str,
321        rows: u16,
322        cols: u16,
323        steal: bool,
324        terminal_capabilities: TerminalCapabilities,
325    ) -> Result<Self, AttachError> {
326        let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
327        use interprocess::local_socket::traits::Stream as _;
328        let stream = Stream::connect(name).map_err(AttachError::Connect)?;
329        let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
330        let mut reader = BufReader::new(stream);
331        let mut writer = BufWriter::new(stream_clone);
332
333        // Send the AttachPtySession request.
334        let attach_request = DaemonRequest {
335            id: 1,
336            r#type: RequestType::AttachPtySession.into(),
337            protocol_version: 1,
338            client_name: "running-process-client".into(),
339            attach_pty_session: Some(AttachPtySessionRequest {
340                session_id: session_id.into(),
341                rows: rows as u32,
342                cols: cols as u32,
343                steal,
344                term: terminal_capabilities.term.unwrap_or_default(),
345                is_tty: terminal_capabilities.is_tty,
346                graphics_capabilities: Some(terminal_graphics_capabilities_to_proto(
347                    &terminal_capabilities.graphics,
348                )),
349            }),
350            ..Default::default()
351        };
352        write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
353            .map_err(AttachError::Io)?;
354
355        // Read the initial response.
356        let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
357        let response = DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
358        if response.code != StatusCode::Ok as i32 {
359            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
360            return Err(AttachError::Server {
361                code,
362                message: response.message,
363            });
364        }
365        let payload: AttachPtySessionResponse = response
366            .attach_pty_session
367            .ok_or(AttachError::MissingPayload)?;
368
369        Ok(Self {
370            reader,
371            writer,
372            initial_backlog: payload.backlog,
373            bytes_missed: payload.bytes_missed,
374        })
375    }
376
377    /// Block until the next stream frame arrives.
378    pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
379        let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
380        PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
381    }
382
383    /// Block until the next stream frame arrives, or until `timeout`
384    /// elapses (returns `Ok(None)`). The underlying socket is put into
385    /// nonblocking mode for the duration of the wait; callers should not
386    /// interleave this with `recv_frame`.
387    pub fn recv_frame_with_timeout(
388        &mut self,
389        timeout: Duration,
390    ) -> Result<Option<PtyStreamFrame>, AttachError> {
391        // Pull the underlying stream out of BufReader so we can set
392        // read_timeout. interprocess::local_socket::Stream supports
393        // set_nonblocking via the platform shim; for portability we just
394        // poll in a short loop.
395        let deadline = std::time::Instant::now() + timeout;
396        loop {
397            // Try to fill the BufReader buffer non-blockingly. If we
398            // already have data, decode directly. Otherwise, sleep briefly
399            // and retry until the deadline.
400            if !self.reader.buffer().is_empty() {
401                return self.recv_frame().map(Some);
402            }
403            if std::time::Instant::now() >= deadline {
404                return Ok(None);
405            }
406            // Sleep a small amount; the OS will buffer incoming data.
407            //
408            // #199: intentional — `interprocess::local_socket::Stream`
409            // lacks a portable peek/ready primitive on Windows. The
410            // 20ms poll is the documented fallback. Replacing with
411            // an event-based primitive would require a per-platform
412            // shim that the upstream crate doesn't expose.
413            std::thread::sleep(Duration::from_millis(20));
414            // Probe by peeking a single byte: read from reader will block,
415            // so we use the BufReader.fill_buf trick by reading 0 bytes
416            // first to populate. Simpler: just call recv_frame once the
417            // underlying socket reports it has data — but
418            // interprocess::Stream lacks portable peek. As a portable
419            // fallback, attempt a frame read and return on first success.
420            //
421            // To avoid blocking forever past the deadline, we rely on the
422            // OS to make recv_frame's read_exact return data quickly once
423            // it arrives; in practice for the M2 use case timeouts are
424            // generous (seconds) and the sleep loop above is the dominant
425            // mechanism. We do NOT actually call recv_frame here because
426            // it would block.
427        }
428    }
429
430    /// Send raw input bytes to the PTY.
431    pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
432        let frame = PtyInputFrame {
433            frame: Some(InputOneof::Input(bytes.to_vec())),
434        };
435        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
436    }
437
438    /// Send a resize event.
439    pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
440        let frame = PtyInputFrame {
441            frame: Some(InputOneof::Resize(PtyResize {
442                rows: rows as u32,
443                cols: cols as u32,
444            })),
445        };
446        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
447    }
448
449    /// Send an interrupt (Ctrl+C / SIGINT) to the child process group.
450    pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
451        let frame = PtyInputFrame {
452            frame: Some(InputOneof::Interrupt(true)),
453        };
454        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
455    }
456
457    /// Cleanly detach this attachment; the session keeps running.
458    pub fn detach(mut self) -> Result<(), AttachError> {
459        let frame = PtyInputFrame {
460            frame: Some(InputOneof::Detach(true)),
461        };
462        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
463    }
464}
465
466// ---------------------------------------------------------------------------
467// Length-prefixed framing (matches the daemon's LengthDelimitedCodec)
468// ---------------------------------------------------------------------------
469
/// Writes `payload` as one frame: a 4-byte big-endian length followed by
/// the payload bytes, then flushes `w`.
///
/// # Errors
///
/// Returns `InvalidInput` without writing anything when the payload is
/// longer than `u32::MAX` bytes (the header cannot represent it, and a
/// truncated length would corrupt the stream). Otherwise propagates any
/// error from writing or flushing.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "frame payload exceeds u32::MAX bytes",
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}
476
/// Reads one frame: a 4-byte big-endian length followed by that many
/// payload bytes, which are returned.
///
/// The payload buffer grows as bytes actually arrive instead of being
/// allocated up front from the header, so a corrupt or hostile length
/// cannot force a multi-gigabyte allocation before any payload is read.
///
/// # Errors
///
/// Returns `UnexpectedEof` if the stream ends before the header or the
/// announced payload is complete; otherwise propagates the read error.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    // Upper bound on what is reserved before the payload is seen.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf);

    let mut buf = Vec::with_capacity((len as usize).min(MAX_PREALLOC));
    // `take` stops after exactly `len` bytes, leaving the next frame's
    // bytes unread.
    r.by_ref().take(u64::from(len)).read_to_end(&mut buf)?;
    if buf.len() as u64 != u64::from(len) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "stream ended before the frame payload was complete",
        ));
    }
    Ok(buf)
}
485
486// ---------------------------------------------------------------------------
487// Tests
488// ---------------------------------------------------------------------------
489
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must apply the documented defaults, and each `with_*` builder
    /// must change only the fields it names.
    #[test]
    fn pty_spawn_request_builder_defaults() {
        let defaults = PtySpawnRequest::new(["echo", "hi"]);
        assert_eq!(defaults.argv, vec!["echo".to_string(), "hi".to_string()]);
        assert_eq!(defaults.cwd, None);
        assert!(defaults.env.is_empty());
        assert!(!defaults.clear_inherited_env);
        assert_eq!(defaults.rows, 24);
        assert_eq!(defaults.cols, 80);
        assert_eq!(defaults.originator, None);

        let req = defaults.with_size(40, 100).with_originator("test:1");
        assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
        assert_eq!(req.rows, 40);
        assert_eq!(req.cols, 100);
        assert_eq!(req.originator.as_deref(), Some("test:1"));
        // Untouched fields keep their defaults.
        assert_eq!(req.cwd, None);
        assert!(req.env.is_empty());
        assert!(!req.clear_inherited_env);
    }
}
504}