1use crate::client::paths;
15use crate::client::{ClientError, DaemonClient};
16use crate::proto::daemon::{
17 pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
18 DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
19 ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
20 SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
21};
22use crate::terminal_graphics::{
23 current_terminal_capabilities, terminal_graphics_capabilities_to_proto, TerminalCapabilities,
24 TerminalGraphicsCapabilities,
25};
26use interprocess::local_socket::Stream;
27use interprocess::TryClone;
28use prost::Message;
29use std::io::{BufReader, BufWriter, Read, Write};
30use std::path::PathBuf;
31use std::time::Duration;
32
33#[derive(Debug, Clone)]
39pub struct PtySpawnRequest {
40 pub argv: Vec<String>,
42 pub cwd: Option<PathBuf>,
44 pub env: Vec<(String, String)>,
46 pub clear_inherited_env: bool,
48 pub rows: u16,
50 pub cols: u16,
52 pub originator: Option<String>,
54}
55
56impl PtySpawnRequest {
57 pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
59 Self {
60 argv: argv.into_iter().map(Into::into).collect(),
61 cwd: None,
62 env: Vec::new(),
63 clear_inherited_env: false,
64 rows: 24,
65 cols: 80,
66 originator: None,
67 }
68 }
69
70 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
72 self.cwd = Some(cwd.into());
73 self
74 }
75
76 pub fn with_size(mut self, rows: u16, cols: u16) -> Self {
78 self.rows = rows;
79 self.cols = cols;
80 self
81 }
82
83 pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
85 self.originator = Some(originator.into());
86 self
87 }
88
89 pub fn with_envs<I, K, V>(mut self, env: I) -> Self
91 where
92 I: IntoIterator<Item = (K, V)>,
93 K: Into<String>,
94 V: Into<String>,
95 {
96 self.env = env.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
97 self
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct SpawnedPtySession {
104 pub session_id: String,
106 pub pid: u32,
108 pub created_at: f64,
110}
111
112impl DaemonClient {
113 pub fn spawn_pty_session(
115 &mut self,
116 request: &PtySpawnRequest,
117 ) -> Result<SpawnedPtySession, ClientError> {
118 let proto = SpawnPtySessionRequest {
119 argv: request.argv.clone(),
120 cwd: request
121 .cwd
122 .as_ref()
123 .map(|p| p.to_string_lossy().into_owned())
124 .unwrap_or_default(),
125 env: request
126 .env
127 .iter()
128 .map(|(k, v)| KeyValue {
129 key: k.clone(),
130 value: v.clone(),
131 })
132 .collect(),
133 clear_inherited_env: request.clear_inherited_env,
134 rows: request.rows as u32,
135 cols: request.cols as u32,
136 originator: request.originator.clone().unwrap_or_default(),
137 };
138
139 let daemon_request = DaemonRequest {
140 id: self.next_request_id(),
141 r#type: RequestType::SpawnPtySession.into(),
142 protocol_version: 1,
143 client_name: "running-process-client".into(),
144 spawn_pty_session: Some(proto),
145 ..Default::default()
146 };
147
148 let response = self.send_request(daemon_request)?;
149 ensure_ok(&response)?;
150 let payload: SpawnPtySessionResponse =
151 response
152 .spawn_pty_session
153 .ok_or_else(|| ClientError::Server {
154 code: StatusCode::Internal,
155 message: "spawn_pty_session response missing payload".into(),
156 })?;
157 Ok(SpawnedPtySession {
158 session_id: payload.session_id,
159 pid: payload.pid,
160 created_at: payload.created_at,
161 })
162 }
163
164 pub fn list_pty_sessions(
167 &mut self,
168 originator_filter: &str,
169 ) -> Result<Vec<PtySessionInfo>, ClientError> {
170 let req = DaemonRequest {
171 id: self.next_request_id(),
172 r#type: RequestType::ListPtySessions.into(),
173 protocol_version: 1,
174 client_name: "running-process-client".into(),
175 list_pty_sessions: Some(ListPtySessionsRequest {
176 originator: originator_filter.into(),
177 }),
178 ..Default::default()
179 };
180 let response = self.send_request(req)?;
181 ensure_ok(&response)?;
182 let payload: ListPtySessionsResponse =
183 response
184 .list_pty_sessions
185 .ok_or_else(|| ClientError::Server {
186 code: StatusCode::Internal,
187 message: "list_pty_sessions response missing payload".into(),
188 })?;
189 Ok(payload.sessions)
190 }
191
192 pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
195 let req = DaemonRequest {
196 id: self.next_request_id(),
197 r#type: RequestType::DetachPtySession.into(),
198 protocol_version: 1,
199 client_name: "running-process-client".into(),
200 detach_pty_session: Some(DetachPtySessionRequest {
201 session_id: session_id.into(),
202 }),
203 ..Default::default()
204 };
205 let response = self.send_request(req)?;
206 ensure_ok(&response)?;
207 Ok(())
208 }
209
210 pub fn terminate_pty_session(
214 &mut self,
215 session_id: &str,
216 grace_ms: u32,
217 ) -> Result<(), ClientError> {
218 let req = DaemonRequest {
219 id: self.next_request_id(),
220 r#type: RequestType::TerminatePtySession.into(),
221 protocol_version: 1,
222 client_name: "running-process-client".into(),
223 terminate_pty_session: Some(TerminatePtySessionRequest {
224 session_id: session_id.into(),
225 grace_ms,
226 }),
227 ..Default::default()
228 };
229 let response = self.send_request(req)?;
230 ensure_ok(&response)?;
231 Ok(())
232 }
233}
234
235fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
236 if response.code == StatusCode::Ok as i32 {
237 return Ok(());
238 }
239 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
240 Err(ClientError::Server {
241 code,
242 message: response.message.clone(),
243 })
244}
245
246pub struct PtyAttachment {
255 reader: BufReader<Stream>,
256 writer: BufWriter<Stream>,
257 pub initial_backlog: Vec<u8>,
260 pub bytes_missed: u64,
263}
264
265#[derive(Debug)]
267pub enum AttachError {
268 Connect(std::io::Error),
270 Io(std::io::Error),
272 Decode(prost::DecodeError),
274 Server {
276 code: StatusCode,
278 message: String,
280 },
281 MissingPayload,
283}
284
285impl std::fmt::Display for AttachError {
286 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 match self {
288 AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
289 AttachError::Io(e) => write!(f, "attach io error: {e}"),
290 AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
291 AttachError::Server { code, message } => {
292 write!(f, "attach server error {code:?}: {message}")
293 }
294 AttachError::MissingPayload => write!(f, "attach response missing payload"),
295 }
296 }
297}
298
299impl std::error::Error for AttachError {}
300
301impl PtyAttachment {
302 pub fn attach(
304 scope_hash: Option<&str>,
305 session_id: &str,
306 rows: u16,
307 cols: u16,
308 steal: bool,
309 ) -> Result<Self, AttachError> {
310 let socket_path = paths::socket_path(scope_hash);
311 Self::attach_to(&socket_path, session_id, rows, cols, steal)
312 }
313
314 pub fn attach_to(
316 socket_path: &str,
317 session_id: &str,
318 rows: u16,
319 cols: u16,
320 steal: bool,
321 ) -> Result<Self, AttachError> {
322 let mut terminal_capabilities = current_terminal_capabilities();
323 if !terminal_capabilities.is_tty {
324 terminal_capabilities.is_tty = true;
325 terminal_capabilities.graphics = TerminalGraphicsCapabilities::unknown();
326 }
327 Self::attach_to_with_terminal_capabilities(
328 socket_path,
329 session_id,
330 rows,
331 cols,
332 steal,
333 terminal_capabilities,
334 )
335 }
336
337 pub fn attach_to_with_terminal_capabilities(
341 socket_path: &str,
342 session_id: &str,
343 rows: u16,
344 cols: u16,
345 steal: bool,
346 terminal_capabilities: TerminalCapabilities,
347 ) -> Result<Self, AttachError> {
348 let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
349 use interprocess::local_socket::traits::Stream as _;
350 let stream = Stream::connect(name).map_err(AttachError::Connect)?;
351 let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
352 let mut reader = BufReader::new(stream);
353 let mut writer = BufWriter::new(stream_clone);
354
355 let attach_request = DaemonRequest {
357 id: 1,
358 r#type: RequestType::AttachPtySession.into(),
359 protocol_version: 1,
360 client_name: "running-process-client".into(),
361 attach_pty_session: Some(AttachPtySessionRequest {
362 session_id: session_id.into(),
363 rows: rows as u32,
364 cols: cols as u32,
365 steal,
366 term: terminal_capabilities.term.unwrap_or_default(),
367 is_tty: terminal_capabilities.is_tty,
368 graphics_capabilities: Some(terminal_graphics_capabilities_to_proto(
369 &terminal_capabilities.graphics,
370 )),
371 }),
372 ..Default::default()
373 };
374 write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
375 .map_err(AttachError::Io)?;
376
377 let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
379 let response = DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
380 if response.code != StatusCode::Ok as i32 {
381 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
382 return Err(AttachError::Server {
383 code,
384 message: response.message,
385 });
386 }
387 let payload: AttachPtySessionResponse = response
388 .attach_pty_session
389 .ok_or(AttachError::MissingPayload)?;
390
391 Ok(Self {
392 reader,
393 writer,
394 initial_backlog: payload.backlog,
395 bytes_missed: payload.bytes_missed,
396 })
397 }
398
399 pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
401 let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
402 PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
403 }
404
405 pub fn recv_frame_with_timeout(
410 &mut self,
411 timeout: Duration,
412 ) -> Result<Option<PtyStreamFrame>, AttachError> {
413 let deadline = std::time::Instant::now() + timeout;
418 loop {
419 if !self.reader.buffer().is_empty() {
423 return self.recv_frame().map(Some);
424 }
425 if std::time::Instant::now() >= deadline {
426 return Ok(None);
427 }
428 std::thread::sleep(Duration::from_millis(20));
436 }
450 }
451
452 pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
454 let frame = PtyInputFrame {
455 frame: Some(InputOneof::Input(bytes.to_vec())),
456 };
457 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
458 }
459
460 pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
462 let frame = PtyInputFrame {
463 frame: Some(InputOneof::Resize(PtyResize {
464 rows: rows as u32,
465 cols: cols as u32,
466 })),
467 };
468 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
469 }
470
471 pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
473 let frame = PtyInputFrame {
474 frame: Some(InputOneof::Interrupt(true)),
475 };
476 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
477 }
478
479 pub fn detach(mut self) -> Result<(), AttachError> {
481 let frame = PtyInputFrame {
482 frame: Some(InputOneof::Detach(true)),
483 };
484 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
485 }
486}
487
488fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
493 let len = payload.len() as u32;
494 w.write_all(&len.to_be_bytes())?;
495 w.write_all(payload)?;
496 w.flush()
497}
498
499fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
500 let mut len_buf = [0u8; 4];
501 r.read_exact(&mut len_buf)?;
502 let len = u32::from_be_bytes(len_buf) as usize;
503 let mut buf = vec![0u8; len];
504 r.read_exact(&mut buf)?;
505 Ok(buf)
506}
507
508#[cfg(test)]
513mod tests {
514 use super::*;
515
516 #[test]
517 fn pty_spawn_request_builder_defaults() {
518 let req = PtySpawnRequest::new(["echo", "hi"])
519 .with_size(40, 100)
520 .with_originator("test:1");
521 assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
522 assert_eq!(req.rows, 40);
523 assert_eq!(req.cols, 100);
524 assert_eq!(req.originator.as_deref(), Some("test:1"));
525 }
526}