1use crate::client::paths;
11use crate::client::{ClientError, DaemonClient};
12use crate::proto::daemon::{
13 AttachPipeStreamRequest, AttachPipeStreamResponse, DaemonRequest, DaemonResponse,
14 DetachPipeStreamRequest, KeyValue, ListPipeSessionsRequest, ListPipeSessionsResponse,
15 PipeSessionInfo, PipeStreamFrame, PipeStreamKind, RequestType, SpawnPipeSessionRequest,
16 SpawnPipeSessionResponse, StatusCode, TerminatePipeSessionRequest, WritePipeStdinRequest,
17 WritePipeStdinResponse,
18};
19use interprocess::local_socket::Stream;
20use interprocess::TryClone;
21use prost::Message;
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::PathBuf;
24
/// Client-side description of a process to launch as a daemon pipe session.
///
/// Build one with [`PipeSpawnRequest::new`] and refine it with the chained
/// `with_*` / `merge_stderr` methods, or set the public fields directly.
#[derive(Debug, Clone)]
pub struct PipeSpawnRequest {
    /// Argument vector forwarded verbatim in the spawn request.
    pub argv: Vec<String>,
    /// Working directory for the spawn request; `None` is sent to the daemon
    /// as an empty string.
    pub cwd: Option<PathBuf>,
    /// Environment entries forwarded as key/value pairs.
    pub env: Vec<(String, String)>,
    /// Forwarded as-is in the spawn request.
    // NOTE(review): presumably stops the child inheriting the daemon's
    // environment — confirm against the daemon implementation.
    pub clear_inherited_env: bool,
    /// Originator label; `None` is sent to the daemon as an empty string.
    pub originator: Option<String>,
    /// Forwarded as-is in the spawn request.
    // NOTE(review): presumably makes the daemon fold stderr into the stdout
    // stream — confirm against the daemon implementation.
    pub merge_stderr_into_stdout: bool,
}

impl PipeSpawnRequest {
    /// Creates a request for `argv` with every optional setting left at its
    /// default: no cwd, no extra env, no originator, both flags `false`.
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        let argv: Vec<String> = argv.into_iter().map(S::into).collect();
        Self {
            argv,
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            originator: None,
            merge_stderr_into_stdout: false,
        }
    }

    /// Returns the request with its working directory set to `cwd`.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Returns the request with its originator label set to `originator`.
    pub fn with_originator(self, originator: impl Into<String>) -> Self {
        Self {
            originator: Some(originator.into()),
            ..self
        }
    }

    /// Returns the request with `merge_stderr_into_stdout` switched on.
    pub fn merge_stderr(self) -> Self {
        Self {
            merge_stderr_into_stdout: true,
            ..self
        }
    }

    /// Returns the request with its environment list **replaced** by `env`.
    ///
    /// Entries set by an earlier `with_envs` call are discarded, not merged.
    pub fn with_envs<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let pairs = env
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()));
        Self {
            env: pairs.collect(),
            ..self
        }
    }
}
88
/// Result of a successful `spawn_pipe_session` call, copied field-for-field
/// from the daemon's spawn response payload.
#[derive(Debug, Clone)]
pub struct SpawnedPipeSession {
    /// Session identifier reported by the daemon; the other pipe-session
    /// calls in this module take it as their `session_id` argument.
    pub session_id: String,
    /// Process id reported by the daemon for the spawned session.
    pub pid: u32,
    /// Creation timestamp reported by the daemon.
    // NOTE(review): presumably seconds since the Unix epoch — confirm the
    // unit against the daemon side.
    pub created_at: f64,
}
99
100impl DaemonClient {
101 pub fn spawn_pipe_session(
103 &mut self,
104 request: &PipeSpawnRequest,
105 ) -> Result<SpawnedPipeSession, ClientError> {
106 let proto = SpawnPipeSessionRequest {
107 argv: request.argv.clone(),
108 cwd: request
109 .cwd
110 .as_ref()
111 .map(|p| p.to_string_lossy().into_owned())
112 .unwrap_or_default(),
113 env: request
114 .env
115 .iter()
116 .map(|(k, v)| KeyValue {
117 key: k.clone(),
118 value: v.clone(),
119 })
120 .collect(),
121 clear_inherited_env: request.clear_inherited_env,
122 originator: request.originator.clone().unwrap_or_default(),
123 merge_stderr_into_stdout: request.merge_stderr_into_stdout,
124 };
125 let daemon_request = DaemonRequest {
126 id: self.next_request_id(),
127 r#type: RequestType::SpawnPipeSession.into(),
128 protocol_version: 1,
129 client_name: "running-process-client".into(),
130 spawn_pipe_session: Some(proto),
131 ..Default::default()
132 };
133 let response = self.send_request(daemon_request)?;
134 ensure_ok(&response)?;
135 let payload: SpawnPipeSessionResponse =
136 response
137 .spawn_pipe_session
138 .ok_or_else(|| ClientError::Server {
139 code: StatusCode::Internal,
140 message: "spawn_pipe_session response missing payload".into(),
141 })?;
142 Ok(SpawnedPipeSession {
143 session_id: payload.session_id,
144 pid: payload.pid,
145 created_at: payload.created_at,
146 })
147 }
148
149 pub fn list_pipe_sessions(
153 &mut self,
154 originator_filter: &str,
155 ) -> Result<Vec<PipeSessionInfo>, ClientError> {
156 let req = DaemonRequest {
157 id: self.next_request_id(),
158 r#type: RequestType::ListPipeSessions.into(),
159 protocol_version: 1,
160 client_name: "running-process-client".into(),
161 list_pipe_sessions: Some(ListPipeSessionsRequest {
162 originator: originator_filter.into(),
163 }),
164 ..Default::default()
165 };
166 let response = self.send_request(req)?;
167 ensure_ok(&response)?;
168 let payload: ListPipeSessionsResponse =
169 response
170 .list_pipe_sessions
171 .ok_or_else(|| ClientError::Server {
172 code: StatusCode::Internal,
173 message: "list_pipe_sessions response missing payload".into(),
174 })?;
175 Ok(payload.sessions)
176 }
177
178 pub fn detach_pipe_stream(
182 &mut self,
183 session_id: &str,
184 stream: PipeStreamKind,
185 ) -> Result<(), ClientError> {
186 let req = DaemonRequest {
187 id: self.next_request_id(),
188 r#type: RequestType::DetachPipeStream.into(),
189 protocol_version: 1,
190 client_name: "running-process-client".into(),
191 detach_pipe_stream: Some(DetachPipeStreamRequest {
192 session_id: session_id.into(),
193 stream: stream as i32,
194 }),
195 ..Default::default()
196 };
197 let response = self.send_request(req)?;
198 ensure_ok(&response)?;
199 Ok(())
200 }
201
202 pub fn terminate_pipe_session(
206 &mut self,
207 session_id: &str,
208 grace_ms: u32,
209 ) -> Result<(), ClientError> {
210 let req = DaemonRequest {
211 id: self.next_request_id(),
212 r#type: RequestType::TerminatePipeSession.into(),
213 protocol_version: 1,
214 client_name: "running-process-client".into(),
215 terminate_pipe_session: Some(TerminatePipeSessionRequest {
216 session_id: session_id.into(),
217 grace_ms,
218 }),
219 ..Default::default()
220 };
221 let response = self.send_request(req)?;
222 ensure_ok(&response)?;
223 Ok(())
224 }
225
226 pub fn write_pipe_stdin(
230 &mut self,
231 session_id: &str,
232 data: &[u8],
233 close_after: bool,
234 ) -> Result<u64, ClientError> {
235 let req = DaemonRequest {
236 id: self.next_request_id(),
237 r#type: RequestType::WritePipeStdin.into(),
238 protocol_version: 1,
239 client_name: "running-process-client".into(),
240 write_pipe_stdin: Some(WritePipeStdinRequest {
241 session_id: session_id.into(),
242 data: data.to_vec(),
243 close: close_after,
244 }),
245 ..Default::default()
246 };
247 let response = self.send_request(req)?;
248 ensure_ok(&response)?;
249 let payload: WritePipeStdinResponse =
250 response
251 .write_pipe_stdin
252 .ok_or_else(|| ClientError::Server {
253 code: StatusCode::Internal,
254 message: "write_pipe_stdin response missing payload".into(),
255 })?;
256 Ok(payload.bytes_written)
257 }
258}
259
260fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
261 if response.code == StatusCode::Ok as i32 {
262 return Ok(());
263 }
264 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
265 Err(ClientError::Server {
266 code,
267 message: response.message.clone(),
268 })
269}
270
/// A live attachment to one stream of a pipe session, backed by its own
/// local-socket connection to the daemon.
///
/// Created by [`PipeStreamAttachment::attach`] / [`PipeStreamAttachment::attach_to`];
/// subsequent frames are pulled with [`PipeStreamAttachment::recv_frame`].
pub struct PipeStreamAttachment {
    // Buffered read side of the connection; `recv_frame` reads
    // length-prefixed frames from it.
    reader: BufReader<Stream>,
    /// The `backlog` bytes returned in the daemon's attach response.
    // NOTE(review): presumably output buffered by the daemon before this
    // attachment was made — confirm against the daemon side.
    pub initial_backlog: Vec<u8>,
    /// The `bytes_missed` count returned in the daemon's attach response.
    // NOTE(review): presumably the number of bytes already dropped from the
    // daemon's buffer and therefore absent from `initial_backlog` — confirm.
    pub bytes_missed: u64,
}
285
286#[derive(Debug)]
288pub enum PipeAttachError {
289 Connect(std::io::Error),
291 Io(std::io::Error),
293 Decode(prost::DecodeError),
295 Server {
297 code: StatusCode,
299 message: String,
301 },
302 MissingPayload,
304}
305
306impl std::fmt::Display for PipeAttachError {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 match self {
309 Self::Connect(e) => write!(f, "pipe attach connect failed: {e}"),
310 Self::Io(e) => write!(f, "pipe attach io error: {e}"),
311 Self::Decode(e) => write!(f, "pipe attach decode error: {e}"),
312 Self::Server { code, message } => {
313 write!(f, "pipe attach server error {code:?}: {message}")
314 }
315 Self::MissingPayload => write!(f, "pipe attach response missing payload"),
316 }
317 }
318}
319
320impl std::error::Error for PipeAttachError {}
321
322impl PipeStreamAttachment {
323 pub fn attach(
327 scope_hash: Option<&str>,
328 session_id: &str,
329 stream: PipeStreamKind,
330 steal: bool,
331 ) -> Result<Self, PipeAttachError> {
332 let socket_path = paths::socket_path(scope_hash);
333 Self::attach_to(&socket_path, session_id, stream, steal)
334 }
335
336 pub fn attach_to(
340 socket_path: &str,
341 session_id: &str,
342 stream: PipeStreamKind,
343 steal: bool,
344 ) -> Result<Self, PipeAttachError> {
345 let name = paths::make_socket_name(socket_path).map_err(PipeAttachError::Connect)?;
346 use interprocess::local_socket::traits::Stream as _;
347 let s = Stream::connect(name).map_err(PipeAttachError::Connect)?;
348 let s_clone = s.try_clone().map_err(PipeAttachError::Connect)?;
349 let mut reader = BufReader::new(s);
350 let mut writer = BufWriter::new(s_clone);
351
352 let attach_request = DaemonRequest {
353 id: 1,
354 r#type: RequestType::AttachPipeStream.into(),
355 protocol_version: 1,
356 client_name: "running-process-client".into(),
357 attach_pipe_stream: Some(AttachPipeStreamRequest {
358 session_id: session_id.into(),
359 stream: stream as i32,
360 steal,
361 }),
362 ..Default::default()
363 };
364 write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
365 .map_err(PipeAttachError::Io)?;
366 drop(writer);
369
370 let response_bytes = read_length_prefixed(&mut reader).map_err(PipeAttachError::Io)?;
371 let response =
372 DaemonResponse::decode(&response_bytes[..]).map_err(PipeAttachError::Decode)?;
373 if response.code != StatusCode::Ok as i32 {
374 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
375 return Err(PipeAttachError::Server {
376 code,
377 message: response.message,
378 });
379 }
380 let payload: AttachPipeStreamResponse = response
381 .attach_pipe_stream
382 .ok_or(PipeAttachError::MissingPayload)?;
383
384 Ok(Self {
385 reader,
386 initial_backlog: payload.backlog,
387 bytes_missed: payload.bytes_missed,
388 })
389 }
390
391 pub fn recv_frame(&mut self) -> Result<PipeStreamFrame, PipeAttachError> {
393 let bytes = read_length_prefixed(&mut self.reader).map_err(PipeAttachError::Io)?;
394 PipeStreamFrame::decode(&bytes[..]).map_err(PipeAttachError::Decode)
395 }
396}
397
/// Writes `payload` to `w` as a single frame — a 4-byte big-endian length
/// followed by the payload bytes — and then flushes `w`.
///
/// # Errors
/// Returns an `InvalidInput` error, without writing anything, when
/// `payload` is longer than `u32::MAX` bytes and so cannot be described by
/// the 4-byte prefix. Otherwise propagates any error from writing or
/// flushing `w`.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    // A plain `as u32` cast would silently truncate an oversized length and
    // emit a prefix that disagrees with the bytes that follow, corrupting
    // the framing for the peer.
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "payload too large for a 4-byte length prefix",
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}
404
/// Reads one frame from `r`: a 4-byte big-endian length followed by exactly
/// that many payload bytes, which are returned.
///
/// The payload buffer grows as bytes actually arrive rather than being
/// allocated up front from the declared length, so a corrupt or hostile
/// prefix (up to 4 GiB) cannot force a huge allocation before any data has
/// been received.
///
/// # Errors
/// Returns `UnexpectedEof` if the stream ends before the prefix or the full
/// payload has been read, and propagates any other I/O error from `r`.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    // Upper bound on what is reserved before any payload byte is seen.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;

    let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
    // `take` caps the read at the declared length so bytes belonging to the
    // next frame stay in the reader.
    let got = r.by_ref().take(len as u64).read_to_end(&mut buf)?;
    if got != len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "stream ended before the full length-prefixed frame was read",
        ));
    }
    Ok(buf)
}