1use crate::client::paths;
14use crate::client::{ClientError, DaemonClient};
15use crate::proto::daemon::{
16 pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
17 DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
18 ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
19 SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
20};
21use crate::terminal_graphics::{
22 current_terminal_capabilities, terminal_graphics_capabilities_to_proto, TerminalCapabilities,
23 TerminalGraphicsCapabilities,
24};
25use interprocess::local_socket::Stream;
26use interprocess::TryClone;
27use prost::Message;
28use std::io::{BufReader, BufWriter, Read, Write};
29use std::path::PathBuf;
30use std::time::Duration;
31
32#[derive(Debug, Clone)]
38pub struct PtySpawnRequest {
39 pub argv: Vec<String>,
40 pub cwd: Option<PathBuf>,
41 pub env: Vec<(String, String)>,
42 pub clear_inherited_env: bool,
43 pub rows: u16,
44 pub cols: u16,
45 pub originator: Option<String>,
46}
47
48impl PtySpawnRequest {
49 pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
50 Self {
51 argv: argv.into_iter().map(Into::into).collect(),
52 cwd: None,
53 env: Vec::new(),
54 clear_inherited_env: false,
55 rows: 24,
56 cols: 80,
57 originator: None,
58 }
59 }
60
61 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
62 self.cwd = Some(cwd.into());
63 self
64 }
65
66 pub fn with_size(mut self, rows: u16, cols: u16) -> Self {
67 self.rows = rows;
68 self.cols = cols;
69 self
70 }
71
72 pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
73 self.originator = Some(originator.into());
74 self
75 }
76
77 pub fn with_envs<I, K, V>(mut self, env: I) -> Self
78 where
79 I: IntoIterator<Item = (K, V)>,
80 K: Into<String>,
81 V: Into<String>,
82 {
83 self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
84 self
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct SpawnedPtySession {
91 pub session_id: String,
92 pub pid: u32,
93 pub created_at: f64,
94}
95
96impl DaemonClient {
97 pub fn spawn_pty_session(
99 &mut self,
100 request: &PtySpawnRequest,
101 ) -> Result<SpawnedPtySession, ClientError> {
102 let proto = SpawnPtySessionRequest {
103 argv: request.argv.clone(),
104 cwd: request
105 .cwd
106 .as_ref()
107 .map(|p| p.to_string_lossy().into_owned())
108 .unwrap_or_default(),
109 env: request
110 .env
111 .iter()
112 .map(|(k, v)| KeyValue {
113 key: k.clone(),
114 value: v.clone(),
115 })
116 .collect(),
117 clear_inherited_env: request.clear_inherited_env,
118 rows: request.rows as u32,
119 cols: request.cols as u32,
120 originator: request.originator.clone().unwrap_or_default(),
121 };
122
123 let daemon_request = DaemonRequest {
124 id: self.next_request_id(),
125 r#type: RequestType::SpawnPtySession.into(),
126 protocol_version: 1,
127 client_name: "running-process-client".into(),
128 spawn_pty_session: Some(proto),
129 ..Default::default()
130 };
131
132 let response = self.send_request(daemon_request)?;
133 ensure_ok(&response)?;
134 let payload: SpawnPtySessionResponse =
135 response
136 .spawn_pty_session
137 .ok_or_else(|| ClientError::Server {
138 code: StatusCode::Internal,
139 message: "spawn_pty_session response missing payload".into(),
140 })?;
141 Ok(SpawnedPtySession {
142 session_id: payload.session_id,
143 pid: payload.pid,
144 created_at: payload.created_at,
145 })
146 }
147
148 pub fn list_pty_sessions(
151 &mut self,
152 originator_filter: &str,
153 ) -> Result<Vec<PtySessionInfo>, ClientError> {
154 let req = DaemonRequest {
155 id: self.next_request_id(),
156 r#type: RequestType::ListPtySessions.into(),
157 protocol_version: 1,
158 client_name: "running-process-client".into(),
159 list_pty_sessions: Some(ListPtySessionsRequest {
160 originator: originator_filter.into(),
161 }),
162 ..Default::default()
163 };
164 let response = self.send_request(req)?;
165 ensure_ok(&response)?;
166 let payload: ListPtySessionsResponse =
167 response
168 .list_pty_sessions
169 .ok_or_else(|| ClientError::Server {
170 code: StatusCode::Internal,
171 message: "list_pty_sessions response missing payload".into(),
172 })?;
173 Ok(payload.sessions)
174 }
175
176 pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
179 let req = DaemonRequest {
180 id: self.next_request_id(),
181 r#type: RequestType::DetachPtySession.into(),
182 protocol_version: 1,
183 client_name: "running-process-client".into(),
184 detach_pty_session: Some(DetachPtySessionRequest {
185 session_id: session_id.into(),
186 }),
187 ..Default::default()
188 };
189 let response = self.send_request(req)?;
190 ensure_ok(&response)?;
191 Ok(())
192 }
193
194 pub fn terminate_pty_session(
198 &mut self,
199 session_id: &str,
200 grace_ms: u32,
201 ) -> Result<(), ClientError> {
202 let req = DaemonRequest {
203 id: self.next_request_id(),
204 r#type: RequestType::TerminatePtySession.into(),
205 protocol_version: 1,
206 client_name: "running-process-client".into(),
207 terminate_pty_session: Some(TerminatePtySessionRequest {
208 session_id: session_id.into(),
209 grace_ms,
210 }),
211 ..Default::default()
212 };
213 let response = self.send_request(req)?;
214 ensure_ok(&response)?;
215 Ok(())
216 }
217}
218
219fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
220 if response.code == StatusCode::Ok as i32 {
221 return Ok(());
222 }
223 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
224 Err(ClientError::Server {
225 code,
226 message: response.message.clone(),
227 })
228}
229
230pub struct PtyAttachment {
239 reader: BufReader<Stream>,
240 writer: BufWriter<Stream>,
241 pub initial_backlog: Vec<u8>,
244 pub bytes_missed: u64,
247}
248
249#[derive(Debug)]
251pub enum AttachError {
252 Connect(std::io::Error),
253 Io(std::io::Error),
254 Decode(prost::DecodeError),
255 Server {
256 code: StatusCode,
257 message: String,
258 },
259 MissingPayload,
261}
262
263impl std::fmt::Display for AttachError {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 match self {
266 AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
267 AttachError::Io(e) => write!(f, "attach io error: {e}"),
268 AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
269 AttachError::Server { code, message } => {
270 write!(f, "attach server error {code:?}: {message}")
271 }
272 AttachError::MissingPayload => write!(f, "attach response missing payload"),
273 }
274 }
275}
276
277impl std::error::Error for AttachError {}
278
279impl PtyAttachment {
280 pub fn attach(
282 scope_hash: Option<&str>,
283 session_id: &str,
284 rows: u16,
285 cols: u16,
286 steal: bool,
287 ) -> Result<Self, AttachError> {
288 let socket_path = paths::socket_path(scope_hash);
289 Self::attach_to(&socket_path, session_id, rows, cols, steal)
290 }
291
292 pub fn attach_to(
294 socket_path: &str,
295 session_id: &str,
296 rows: u16,
297 cols: u16,
298 steal: bool,
299 ) -> Result<Self, AttachError> {
300 let mut terminal_capabilities = current_terminal_capabilities();
301 if !terminal_capabilities.is_tty {
302 terminal_capabilities.is_tty = true;
303 terminal_capabilities.graphics = TerminalGraphicsCapabilities::unknown();
304 }
305 Self::attach_to_with_terminal_capabilities(
306 socket_path,
307 session_id,
308 rows,
309 cols,
310 steal,
311 terminal_capabilities,
312 )
313 }
314
315 pub fn attach_to_with_terminal_capabilities(
319 socket_path: &str,
320 session_id: &str,
321 rows: u16,
322 cols: u16,
323 steal: bool,
324 terminal_capabilities: TerminalCapabilities,
325 ) -> Result<Self, AttachError> {
326 let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
327 use interprocess::local_socket::traits::Stream as _;
328 let stream = Stream::connect(name).map_err(AttachError::Connect)?;
329 let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
330 let mut reader = BufReader::new(stream);
331 let mut writer = BufWriter::new(stream_clone);
332
333 let attach_request = DaemonRequest {
335 id: 1,
336 r#type: RequestType::AttachPtySession.into(),
337 protocol_version: 1,
338 client_name: "running-process-client".into(),
339 attach_pty_session: Some(AttachPtySessionRequest {
340 session_id: session_id.into(),
341 rows: rows as u32,
342 cols: cols as u32,
343 steal,
344 term: terminal_capabilities.term.unwrap_or_default(),
345 is_tty: terminal_capabilities.is_tty,
346 graphics_capabilities: Some(terminal_graphics_capabilities_to_proto(
347 &terminal_capabilities.graphics,
348 )),
349 }),
350 ..Default::default()
351 };
352 write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
353 .map_err(AttachError::Io)?;
354
355 let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
357 let response = DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
358 if response.code != StatusCode::Ok as i32 {
359 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
360 return Err(AttachError::Server {
361 code,
362 message: response.message,
363 });
364 }
365 let payload: AttachPtySessionResponse = response
366 .attach_pty_session
367 .ok_or(AttachError::MissingPayload)?;
368
369 Ok(Self {
370 reader,
371 writer,
372 initial_backlog: payload.backlog,
373 bytes_missed: payload.bytes_missed,
374 })
375 }
376
377 pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
379 let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
380 PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
381 }
382
383 pub fn recv_frame_with_timeout(
388 &mut self,
389 timeout: Duration,
390 ) -> Result<Option<PtyStreamFrame>, AttachError> {
391 let deadline = std::time::Instant::now() + timeout;
396 loop {
397 if !self.reader.buffer().is_empty() {
401 return self.recv_frame().map(Some);
402 }
403 if std::time::Instant::now() >= deadline {
404 return Ok(None);
405 }
406 std::thread::sleep(Duration::from_millis(20));
414 }
428 }
429
430 pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
432 let frame = PtyInputFrame {
433 frame: Some(InputOneof::Input(bytes.to_vec())),
434 };
435 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
436 }
437
438 pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
440 let frame = PtyInputFrame {
441 frame: Some(InputOneof::Resize(PtyResize {
442 rows: rows as u32,
443 cols: cols as u32,
444 })),
445 };
446 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
447 }
448
449 pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
451 let frame = PtyInputFrame {
452 frame: Some(InputOneof::Interrupt(true)),
453 };
454 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
455 }
456
457 pub fn detach(mut self) -> Result<(), AttachError> {
459 let frame = PtyInputFrame {
460 frame: Some(InputOneof::Detach(true)),
461 };
462 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
463 }
464}
465
466fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
471 let len = payload.len() as u32;
472 w.write_all(&len.to_be_bytes())?;
473 w.write_all(payload)?;
474 w.flush()
475}
476
477fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
478 let mut len_buf = [0u8; 4];
479 r.read_exact(&mut len_buf)?;
480 let len = u32::from_be_bytes(len_buf) as usize;
481 let mut buf = vec![0u8; len];
482 r.read_exact(&mut buf)?;
483 Ok(buf)
484}
485
486#[cfg(test)]
491mod tests {
492 use super::*;
493
494 #[test]
495 fn pty_spawn_request_builder_defaults() {
496 let req = PtySpawnRequest::new(["echo", "hi"])
497 .with_size(40, 100)
498 .with_originator("test:1");
499 assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
500 assert_eq!(req.rows, 40);
501 assert_eq!(req.cols, 100);
502 assert_eq!(req.originator.as_deref(), Some("test:1"));
503 }
504}