1use crate::client::{ClientError, DaemonClient};
14use crate::paths;
15use interprocess::local_socket::Stream;
16use interprocess::TryClone;
17use prost::Message;
18use running_process_proto::daemon::{
19 pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
20 DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
21 ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
22 SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
23};
24use std::io::{BufReader, BufWriter, Read, Write};
25use std::path::PathBuf;
26use std::time::Duration;
27
28#[derive(Debug, Clone)]
34pub struct PtySpawnRequest {
35 pub argv: Vec<String>,
36 pub cwd: Option<PathBuf>,
37 pub env: Vec<(String, String)>,
38 pub clear_inherited_env: bool,
39 pub rows: u16,
40 pub cols: u16,
41 pub originator: Option<String>,
42}
43
44impl PtySpawnRequest {
45 pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
46 Self {
47 argv: argv.into_iter().map(Into::into).collect(),
48 cwd: None,
49 env: Vec::new(),
50 clear_inherited_env: false,
51 rows: 24,
52 cols: 80,
53 originator: None,
54 }
55 }
56
57 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
58 self.cwd = Some(cwd.into());
59 self
60 }
61
62 pub fn with_size(mut self, rows: u16, cols: u16) -> Self {
63 self.rows = rows;
64 self.cols = cols;
65 self
66 }
67
68 pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
69 self.originator = Some(originator.into());
70 self
71 }
72
73 pub fn with_envs<I, K, V>(mut self, env: I) -> Self
74 where
75 I: IntoIterator<Item = (K, V)>,
76 K: Into<String>,
77 V: Into<String>,
78 {
79 self.env = env
80 .into_iter()
81 .map(|(k, v)| (k.into(), v.into()))
82 .collect();
83 self
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct SpawnedPtySession {
90 pub session_id: String,
91 pub pid: u32,
92 pub created_at: f64,
93}
94
95impl DaemonClient {
96 pub fn spawn_pty_session(
98 &mut self,
99 request: &PtySpawnRequest,
100 ) -> Result<SpawnedPtySession, ClientError> {
101 let proto = SpawnPtySessionRequest {
102 argv: request.argv.clone(),
103 cwd: request
104 .cwd
105 .as_ref()
106 .map(|p| p.to_string_lossy().into_owned())
107 .unwrap_or_default(),
108 env: request
109 .env
110 .iter()
111 .map(|(k, v)| KeyValue {
112 key: k.clone(),
113 value: v.clone(),
114 })
115 .collect(),
116 clear_inherited_env: request.clear_inherited_env,
117 rows: request.rows as u32,
118 cols: request.cols as u32,
119 originator: request.originator.clone().unwrap_or_default(),
120 };
121
122 let daemon_request = DaemonRequest {
123 id: self.next_request_id(),
124 r#type: RequestType::SpawnPtySession.into(),
125 protocol_version: 1,
126 client_name: "running-process-client".into(),
127 spawn_pty_session: Some(proto),
128 ..Default::default()
129 };
130
131 let response = self.send_request(daemon_request)?;
132 ensure_ok(&response)?;
133 let payload: SpawnPtySessionResponse =
134 response.spawn_pty_session.ok_or_else(|| ClientError::Server {
135 code: StatusCode::Internal,
136 message: "spawn_pty_session response missing payload".into(),
137 })?;
138 Ok(SpawnedPtySession {
139 session_id: payload.session_id,
140 pid: payload.pid,
141 created_at: payload.created_at,
142 })
143 }
144
145 pub fn list_pty_sessions(
148 &mut self,
149 originator_filter: &str,
150 ) -> Result<Vec<PtySessionInfo>, ClientError> {
151 let req = DaemonRequest {
152 id: self.next_request_id(),
153 r#type: RequestType::ListPtySessions.into(),
154 protocol_version: 1,
155 client_name: "running-process-client".into(),
156 list_pty_sessions: Some(ListPtySessionsRequest {
157 originator: originator_filter.into(),
158 }),
159 ..Default::default()
160 };
161 let response = self.send_request(req)?;
162 ensure_ok(&response)?;
163 let payload: ListPtySessionsResponse = response
164 .list_pty_sessions
165 .ok_or_else(|| ClientError::Server {
166 code: StatusCode::Internal,
167 message: "list_pty_sessions response missing payload".into(),
168 })?;
169 Ok(payload.sessions)
170 }
171
172 pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
175 let req = DaemonRequest {
176 id: self.next_request_id(),
177 r#type: RequestType::DetachPtySession.into(),
178 protocol_version: 1,
179 client_name: "running-process-client".into(),
180 detach_pty_session: Some(DetachPtySessionRequest {
181 session_id: session_id.into(),
182 }),
183 ..Default::default()
184 };
185 let response = self.send_request(req)?;
186 ensure_ok(&response)?;
187 Ok(())
188 }
189
190 pub fn terminate_pty_session(
194 &mut self,
195 session_id: &str,
196 grace_ms: u32,
197 ) -> Result<(), ClientError> {
198 let req = DaemonRequest {
199 id: self.next_request_id(),
200 r#type: RequestType::TerminatePtySession.into(),
201 protocol_version: 1,
202 client_name: "running-process-client".into(),
203 terminate_pty_session: Some(TerminatePtySessionRequest {
204 session_id: session_id.into(),
205 grace_ms,
206 }),
207 ..Default::default()
208 };
209 let response = self.send_request(req)?;
210 ensure_ok(&response)?;
211 Ok(())
212 }
213
214}
215
216fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
217 if response.code == StatusCode::Ok as i32 {
218 return Ok(());
219 }
220 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
221 Err(ClientError::Server {
222 code,
223 message: response.message.clone(),
224 })
225}
226
227pub struct PtyAttachment {
236 reader: BufReader<Stream>,
237 writer: BufWriter<Stream>,
238 pub initial_backlog: Vec<u8>,
241 pub bytes_missed: u64,
244}
245
246#[derive(Debug)]
248pub enum AttachError {
249 Connect(std::io::Error),
250 Io(std::io::Error),
251 Decode(prost::DecodeError),
252 Server { code: StatusCode, message: String },
253 MissingPayload,
255}
256
257impl std::fmt::Display for AttachError {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 match self {
260 AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
261 AttachError::Io(e) => write!(f, "attach io error: {e}"),
262 AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
263 AttachError::Server { code, message } => {
264 write!(f, "attach server error {code:?}: {message}")
265 }
266 AttachError::MissingPayload => write!(f, "attach response missing payload"),
267 }
268 }
269}
270
271impl std::error::Error for AttachError {}
272
273impl PtyAttachment {
274 pub fn attach(
276 scope_hash: Option<&str>,
277 session_id: &str,
278 rows: u16,
279 cols: u16,
280 steal: bool,
281 ) -> Result<Self, AttachError> {
282 let socket_path = paths::socket_path(scope_hash);
283 Self::attach_to(&socket_path, session_id, rows, cols, steal)
284 }
285
286 pub fn attach_to(
288 socket_path: &str,
289 session_id: &str,
290 rows: u16,
291 cols: u16,
292 steal: bool,
293 ) -> Result<Self, AttachError> {
294 let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
295 use interprocess::local_socket::traits::Stream as _;
296 let stream = Stream::connect(name).map_err(AttachError::Connect)?;
297 let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
298 let mut reader = BufReader::new(stream);
299 let mut writer = BufWriter::new(stream_clone);
300
301 let attach_request = DaemonRequest {
303 id: 1,
304 r#type: RequestType::AttachPtySession.into(),
305 protocol_version: 1,
306 client_name: "running-process-client".into(),
307 attach_pty_session: Some(AttachPtySessionRequest {
308 session_id: session_id.into(),
309 rows: rows as u32,
310 cols: cols as u32,
311 steal,
312 term: std::env::var("TERM").unwrap_or_default(),
313 is_tty: true,
314 }),
315 ..Default::default()
316 };
317 write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
318 .map_err(AttachError::Io)?;
319
320 let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
322 let response =
323 DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
324 if response.code != StatusCode::Ok as i32 {
325 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
326 return Err(AttachError::Server {
327 code,
328 message: response.message,
329 });
330 }
331 let payload: AttachPtySessionResponse = response
332 .attach_pty_session
333 .ok_or(AttachError::MissingPayload)?;
334
335 Ok(Self {
336 reader,
337 writer,
338 initial_backlog: payload.backlog,
339 bytes_missed: payload.bytes_missed,
340 })
341 }
342
343 pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
345 let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
346 PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
347 }
348
349 pub fn recv_frame_with_timeout(
354 &mut self,
355 timeout: Duration,
356 ) -> Result<Option<PtyStreamFrame>, AttachError> {
357 let deadline = std::time::Instant::now() + timeout;
362 loop {
363 if !self.reader.buffer().is_empty() {
367 return self.recv_frame().map(Some);
368 }
369 if std::time::Instant::now() >= deadline {
370 return Ok(None);
371 }
372 std::thread::sleep(Duration::from_millis(20));
374 }
388 }
389
390 pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
392 let frame = PtyInputFrame {
393 frame: Some(InputOneof::Input(bytes.to_vec())),
394 };
395 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
396 }
397
398 pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
400 let frame = PtyInputFrame {
401 frame: Some(InputOneof::Resize(PtyResize {
402 rows: rows as u32,
403 cols: cols as u32,
404 })),
405 };
406 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
407 }
408
409 pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
411 let frame = PtyInputFrame {
412 frame: Some(InputOneof::Interrupt(true)),
413 };
414 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
415 }
416
417 pub fn detach(mut self) -> Result<(), AttachError> {
419 let frame = PtyInputFrame {
420 frame: Some(InputOneof::Detach(true)),
421 };
422 write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
423 }
424}
425
426fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
431 let len = payload.len() as u32;
432 w.write_all(&len.to_be_bytes())?;
433 w.write_all(payload)?;
434 w.flush()
435}
436
437fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
438 let mut len_buf = [0u8; 4];
439 r.read_exact(&mut len_buf)?;
440 let len = u32::from_be_bytes(len_buf) as usize;
441 let mut buf = vec![0u8; len];
442 r.read_exact(&mut buf)?;
443 Ok(buf)
444}
445
446#[cfg(test)]
451mod tests {
452 use super::*;
453
454 #[test]
455 fn pty_spawn_request_builder_defaults() {
456 let req = PtySpawnRequest::new(["echo", "hi"])
457 .with_size(40, 100)
458 .with_originator("test:1");
459 assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
460 assert_eq!(req.rows, 40);
461 assert_eq!(req.cols, 100);
462 assert_eq!(req.originator.as_deref(), Some("test:1"));
463 }
464}