Skip to main content

running_process/client/
pty_session.rs

1//! Client-side helpers for daemon-owned detachable PTY sessions
2//! (issue #130 milestone 2).
3//!
4//! Sessions are spawned and listed via the regular [`DaemonClient`] RPC
5//! channel. Attach is special: after the daemon responds with
6//! `AttachPtySessionResponse` the same socket switches into a streaming
7//! mode that carries [`PtyStreamFrame`] (daemon → client) and
8//! [`PtyInputFrame`] (client → daemon) messages. [`PtyAttachment`] owns the
9//! socket for the lifetime of that stream and exposes blocking
10//! send/receive helpers suitable for tests and small clients. Async
11//! clients can build on top of the attachment framing exposed by
12//! [`PtyAttachment`].
13
14use crate::client::paths;
15use crate::client::{ClientError, DaemonClient};
16use crate::proto::daemon::{
17    pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
18    DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
19    ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
20    SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
21};
22use crate::terminal_graphics::{
23    current_terminal_capabilities, terminal_graphics_capabilities_to_proto, TerminalCapabilities,
24    TerminalGraphicsCapabilities,
25};
26use interprocess::local_socket::Stream;
27use interprocess::TryClone;
28use prost::Message;
29use std::io::{BufReader, BufWriter, Read, Write};
30use std::path::PathBuf;
31use std::time::Duration;
32
33// ---------------------------------------------------------------------------
34// Spawn / list / terminate convenience builders
35// ---------------------------------------------------------------------------
36
/// Describes a PTY session that the daemon should spawn and own.
///
/// Start from [`PtySpawnRequest::new`] and chain the `with_*` builders, or
/// set the public fields directly.
#[derive(Debug, Clone)]
pub struct PtySpawnRequest {
    /// Program followed by its arguments, executed inside the PTY.
    pub argv: Vec<String>,
    /// Directory the spawned process starts in, if one was requested.
    pub cwd: Option<PathBuf>,
    /// Extra `(name, value)` environment entries to add or override.
    pub env: Vec<(String, String)>,
    /// When `true`, start from an empty environment rather than inheriting
    /// the daemon's environment.
    pub clear_inherited_env: bool,
    /// Terminal height, in rows, at spawn time.
    pub rows: u16,
    /// Terminal width, in columns, at spawn time.
    pub cols: u16,
    /// Free-form owner tag chosen by the caller; used when listing and
    /// filtering sessions.
    pub originator: Option<String>,
}

impl PtySpawnRequest {
    /// Build a request for `argv` with a 24x80 terminal, no working
    /// directory, no extra environment entries, an inherited environment,
    /// and no originator.
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            rows: 24,
            cols: 80,
            originator: None,
        }
    }

    /// Return the request with its working directory set to `cwd`.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Return the request with the initial PTY size set to `rows` x `cols`.
    pub fn with_size(self, rows: u16, cols: u16) -> Self {
        Self { rows, cols, ..self }
    }

    /// Return the request tagged with the caller-defined `originator`.
    pub fn with_originator(self, originator: impl Into<String>) -> Self {
        Self {
            originator: Some(originator.into()),
            ..self
        }
    }

    /// Return the request with its explicit environment entries replaced by
    /// `env`; any entries set earlier are discarded.
    pub fn with_envs<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let entries = env
            .into_iter()
            .map(|(name, value)| (name.into(), value.into()));
        Self {
            env: entries.collect(),
            ..self
        }
    }
}
100
/// What the daemon reported back after it successfully spawned a session.
#[derive(Clone, Debug)]
pub struct SpawnedPtySession {
    /// Identifier the daemon assigned to the new PTY session.
    pub session_id: String,
    /// Process ID of the session leader that was spawned.
    pub pid: u32,
    /// Daemon-reported creation time, in seconds since the Unix epoch.
    pub created_at: f64,
}
111
112impl DaemonClient {
113    /// Ask the daemon to spawn a new PTY session that it owns.
114    pub fn spawn_pty_session(
115        &mut self,
116        request: &PtySpawnRequest,
117    ) -> Result<SpawnedPtySession, ClientError> {
118        let proto = SpawnPtySessionRequest {
119            argv: request.argv.clone(),
120            cwd: request
121                .cwd
122                .as_ref()
123                .map(|p| p.to_string_lossy().into_owned())
124                .unwrap_or_default(),
125            env: request
126                .env
127                .iter()
128                .map(|(k, v)| KeyValue {
129                    key: k.clone(),
130                    value: v.clone(),
131                })
132                .collect(),
133            clear_inherited_env: request.clear_inherited_env,
134            rows: request.rows as u32,
135            cols: request.cols as u32,
136            originator: request.originator.clone().unwrap_or_default(),
137        };
138
139        let daemon_request = DaemonRequest {
140            id: self.next_request_id(),
141            r#type: RequestType::SpawnPtySession.into(),
142            protocol_version: 1,
143            client_name: "running-process-client".into(),
144            spawn_pty_session: Some(proto),
145            ..Default::default()
146        };
147
148        let response = self.send_request(daemon_request)?;
149        ensure_ok(&response)?;
150        let payload: SpawnPtySessionResponse =
151            response
152                .spawn_pty_session
153                .ok_or_else(|| ClientError::Server {
154                    code: StatusCode::Internal,
155                    message: "spawn_pty_session response missing payload".into(),
156                })?;
157        Ok(SpawnedPtySession {
158            session_id: payload.session_id,
159            pid: payload.pid,
160            created_at: payload.created_at,
161        })
162    }
163
164    /// List PTY sessions known to the daemon. Empty `originator_filter`
165    /// returns all sessions in scope.
166    pub fn list_pty_sessions(
167        &mut self,
168        originator_filter: &str,
169    ) -> Result<Vec<PtySessionInfo>, ClientError> {
170        let req = DaemonRequest {
171            id: self.next_request_id(),
172            r#type: RequestType::ListPtySessions.into(),
173            protocol_version: 1,
174            client_name: "running-process-client".into(),
175            list_pty_sessions: Some(ListPtySessionsRequest {
176                originator: originator_filter.into(),
177            }),
178            ..Default::default()
179        };
180        let response = self.send_request(req)?;
181        ensure_ok(&response)?;
182        let payload: ListPtySessionsResponse =
183            response
184                .list_pty_sessions
185                .ok_or_else(|| ClientError::Server {
186                    code: StatusCode::Internal,
187                    message: "list_pty_sessions response missing payload".into(),
188                })?;
189        Ok(payload.sessions)
190    }
191
192    /// Ask the daemon to detach any current attachment from a session,
193    /// leaving the session alive. Idempotent.
194    pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
195        let req = DaemonRequest {
196            id: self.next_request_id(),
197            r#type: RequestType::DetachPtySession.into(),
198            protocol_version: 1,
199            client_name: "running-process-client".into(),
200            detach_pty_session: Some(DetachPtySessionRequest {
201                session_id: session_id.into(),
202            }),
203            ..Default::default()
204        };
205        let response = self.send_request(req)?;
206        ensure_ok(&response)?;
207        Ok(())
208    }
209
210    /// Schedule termination of a PTY session. Returns as soon as the
211    /// daemon accepts the schedule; the actual termination happens on a
212    /// daemon background task (soft signal, grace, then hard kill).
213    pub fn terminate_pty_session(
214        &mut self,
215        session_id: &str,
216        grace_ms: u32,
217    ) -> Result<(), ClientError> {
218        let req = DaemonRequest {
219            id: self.next_request_id(),
220            r#type: RequestType::TerminatePtySession.into(),
221            protocol_version: 1,
222            client_name: "running-process-client".into(),
223            terminate_pty_session: Some(TerminatePtySessionRequest {
224                session_id: session_id.into(),
225                grace_ms,
226            }),
227            ..Default::default()
228        };
229        let response = self.send_request(req)?;
230        ensure_ok(&response)?;
231        Ok(())
232    }
233}
234
235fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
236    if response.code == StatusCode::Ok as i32 {
237        return Ok(());
238    }
239    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
240    Err(ClientError::Server {
241        code,
242        message: response.message.clone(),
243    })
244}
245
246// ---------------------------------------------------------------------------
247// PtyAttachment
248// ---------------------------------------------------------------------------
249
250/// Active attachment to a daemon-owned PTY session.
251///
252/// Owns the socket; the connection is in streaming mode and cannot be used
253/// for unrelated RPCs.
254pub struct PtyAttachment {
255    reader: BufReader<Stream>,
256    writer: BufWriter<Stream>,
257    /// Bytes received in the initial AttachPtySessionResponse (output the
258    /// client missed before attach succeeded).
259    pub initial_backlog: Vec<u8>,
260    /// Cumulative bytes dropped from the daemon's ring buffer before this
261    /// attach. Zero if the buffer never overflowed.
262    pub bytes_missed: u64,
263}
264
265/// Errors specific to attach.
266#[derive(Debug)]
267pub enum AttachError {
268    /// Failed to open a socket connection to the daemon.
269    Connect(std::io::Error),
270    /// I/O failed while exchanging attach or stream frames.
271    Io(std::io::Error),
272    /// A daemon response or stream frame could not be decoded.
273    Decode(prost::DecodeError),
274    /// The daemon rejected the attach request.
275    Server {
276        /// Status code returned by the daemon.
277        code: StatusCode,
278        /// Human-readable error message returned by the daemon.
279        message: String,
280    },
281    /// The daemon never sent an AttachPtySessionResponse payload.
282    MissingPayload,
283}
284
285impl std::fmt::Display for AttachError {
286    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287        match self {
288            AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
289            AttachError::Io(e) => write!(f, "attach io error: {e}"),
290            AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
291            AttachError::Server { code, message } => {
292                write!(f, "attach server error {code:?}: {message}")
293            }
294            AttachError::MissingPayload => write!(f, "attach response missing payload"),
295        }
296    }
297}
298
299impl std::error::Error for AttachError {}
300
301impl PtyAttachment {
302    /// Open a fresh socket to the daemon and attach to `session_id`.
303    pub fn attach(
304        scope_hash: Option<&str>,
305        session_id: &str,
306        rows: u16,
307        cols: u16,
308        steal: bool,
309    ) -> Result<Self, AttachError> {
310        let socket_path = paths::socket_path(scope_hash);
311        Self::attach_to(&socket_path, session_id, rows, cols, steal)
312    }
313
314    /// Open a fresh socket at `socket_path` and attach to `session_id`.
315    pub fn attach_to(
316        socket_path: &str,
317        session_id: &str,
318        rows: u16,
319        cols: u16,
320        steal: bool,
321    ) -> Result<Self, AttachError> {
322        let mut terminal_capabilities = current_terminal_capabilities();
323        if !terminal_capabilities.is_tty {
324            terminal_capabilities.is_tty = true;
325            terminal_capabilities.graphics = TerminalGraphicsCapabilities::unknown();
326        }
327        Self::attach_to_with_terminal_capabilities(
328            socket_path,
329            session_id,
330            rows,
331            cols,
332            steal,
333            terminal_capabilities,
334        )
335    }
336
337    /// Attach with explicit terminal metadata. This is useful for tests,
338    /// non-interactive attach clients, and callers that already performed
339    /// capability probing before opening the daemon socket.
340    pub fn attach_to_with_terminal_capabilities(
341        socket_path: &str,
342        session_id: &str,
343        rows: u16,
344        cols: u16,
345        steal: bool,
346        terminal_capabilities: TerminalCapabilities,
347    ) -> Result<Self, AttachError> {
348        let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
349        use interprocess::local_socket::traits::Stream as _;
350        let stream = Stream::connect(name).map_err(AttachError::Connect)?;
351        let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
352        let mut reader = BufReader::new(stream);
353        let mut writer = BufWriter::new(stream_clone);
354
355        // Send the AttachPtySession request.
356        let attach_request = DaemonRequest {
357            id: 1,
358            r#type: RequestType::AttachPtySession.into(),
359            protocol_version: 1,
360            client_name: "running-process-client".into(),
361            attach_pty_session: Some(AttachPtySessionRequest {
362                session_id: session_id.into(),
363                rows: rows as u32,
364                cols: cols as u32,
365                steal,
366                term: terminal_capabilities.term.unwrap_or_default(),
367                is_tty: terminal_capabilities.is_tty,
368                graphics_capabilities: Some(terminal_graphics_capabilities_to_proto(
369                    &terminal_capabilities.graphics,
370                )),
371            }),
372            ..Default::default()
373        };
374        write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
375            .map_err(AttachError::Io)?;
376
377        // Read the initial response.
378        let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
379        let response = DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
380        if response.code != StatusCode::Ok as i32 {
381            let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
382            return Err(AttachError::Server {
383                code,
384                message: response.message,
385            });
386        }
387        let payload: AttachPtySessionResponse = response
388            .attach_pty_session
389            .ok_or(AttachError::MissingPayload)?;
390
391        Ok(Self {
392            reader,
393            writer,
394            initial_backlog: payload.backlog,
395            bytes_missed: payload.bytes_missed,
396        })
397    }
398
399    /// Block until the next stream frame arrives.
400    pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
401        let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
402        PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
403    }
404
405    /// Block until the next stream frame arrives, or until `timeout`
406    /// elapses (returns `Ok(None)`). The underlying socket is put into
407    /// nonblocking mode for the duration of the wait; callers should not
408    /// interleave this with `recv_frame`.
409    pub fn recv_frame_with_timeout(
410        &mut self,
411        timeout: Duration,
412    ) -> Result<Option<PtyStreamFrame>, AttachError> {
413        // Pull the underlying stream out of BufReader so we can set
414        // read_timeout. interprocess::local_socket::Stream supports
415        // set_nonblocking via the platform shim; for portability we just
416        // poll in a short loop.
417        let deadline = std::time::Instant::now() + timeout;
418        loop {
419            // Try to fill the BufReader buffer non-blockingly. If we
420            // already have data, decode directly. Otherwise, sleep briefly
421            // and retry until the deadline.
422            if !self.reader.buffer().is_empty() {
423                return self.recv_frame().map(Some);
424            }
425            if std::time::Instant::now() >= deadline {
426                return Ok(None);
427            }
428            // Sleep a small amount; the OS will buffer incoming data.
429            //
430            // #199: intentional — `interprocess::local_socket::Stream`
431            // lacks a portable peek/ready primitive on Windows. The
432            // 20ms poll is the documented fallback. Replacing with
433            // an event-based primitive would require a per-platform
434            // shim that the upstream crate doesn't expose.
435            std::thread::sleep(Duration::from_millis(20));
436            // Probe by peeking a single byte: read from reader will block,
437            // so we use the BufReader.fill_buf trick by reading 0 bytes
438            // first to populate. Simpler: just call recv_frame once the
439            // underlying socket reports it has data — but
440            // interprocess::Stream lacks portable peek. As a portable
441            // fallback, attempt a frame read and return on first success.
442            //
443            // To avoid blocking forever past the deadline, we rely on the
444            // OS to make recv_frame's read_exact return data quickly once
445            // it arrives; in practice for the M2 use case timeouts are
446            // generous (seconds) and the sleep loop above is the dominant
447            // mechanism. We do NOT actually call recv_frame here because
448            // it would block.
449        }
450    }
451
452    /// Send raw input bytes to the PTY.
453    pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
454        let frame = PtyInputFrame {
455            frame: Some(InputOneof::Input(bytes.to_vec())),
456        };
457        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
458    }
459
460    /// Send a resize event.
461    pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
462        let frame = PtyInputFrame {
463            frame: Some(InputOneof::Resize(PtyResize {
464                rows: rows as u32,
465                cols: cols as u32,
466            })),
467        };
468        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
469    }
470
471    /// Send an interrupt (Ctrl+C / SIGINT) to the child process group.
472    pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
473        let frame = PtyInputFrame {
474            frame: Some(InputOneof::Interrupt(true)),
475        };
476        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
477    }
478
479    /// Cleanly detach this attachment; the session keeps running.
480    pub fn detach(mut self) -> Result<(), AttachError> {
481        let frame = PtyInputFrame {
482            frame: Some(InputOneof::Detach(true)),
483        };
484        write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
485    }
486}
487
488// ---------------------------------------------------------------------------
489// Length-prefixed framing (matches the daemon's LengthDelimitedCodec)
490// ---------------------------------------------------------------------------
491
/// Write `payload` as one frame: a 4-byte big-endian length followed by the
/// payload bytes, then flush the writer.
///
/// # Errors
///
/// Returns `InvalidInput` without writing anything when `payload` is longer
/// than `u32::MAX` bytes, because the prefix cannot represent that length
/// and a truncated prefix would desynchronise the stream. Otherwise
/// propagates any error from the underlying writer.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "payload too large for a 4-byte length prefix",
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}
498
/// Read one frame: a 4-byte big-endian length followed by exactly that many
/// payload bytes, which are returned.
///
/// The length comes from the peer, so the buffer is not allocated up front
/// at the advertised size. Only a bounded amount is reserved eagerly and
/// the buffer grows as payload bytes actually arrive, so a corrupt or
/// hostile prefix cannot force a multi-gigabyte allocation.
///
/// # Errors
///
/// Returns `UnexpectedEof` when the stream ends before the prefix or the
/// full payload has been read; otherwise propagates any error from the
/// underlying reader.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    // Upper bound on what is reserved before any payload byte is seen.
    const MAX_EAGER_RESERVE: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf);
    let expected = len as usize;

    let mut buf = Vec::with_capacity(expected.min(MAX_EAGER_RESERVE));
    // `take` stops at the frame boundary, so bytes of the next frame are
    // left in the reader.
    let received = r.by_ref().take(u64::from(len)).read_to_end(&mut buf)?;
    if received != expected {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "stream ended before the full length-prefixed frame was read",
        ));
    }
    Ok(buf)
}
507
508// ---------------------------------------------------------------------------
509// Tests
510// ---------------------------------------------------------------------------
511
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder keeps `argv` as given and applies the size and
    /// originator overrides.
    #[test]
    fn pty_spawn_request_builder_defaults() {
        let built = PtySpawnRequest::new(["echo", "hi"])
            .with_size(40, 100)
            .with_originator("test:1");

        let expected_argv: Vec<String> = ["echo", "hi"].iter().map(|s| s.to_string()).collect();
        assert_eq!(built.argv, expected_argv);
        assert_eq!(built.rows, 40);
        assert_eq!(built.cols, 100);
        assert_eq!(built.originator, Some("test:1".to_string()));
    }
}
526}