1use crate::client::{ClientError, DaemonClient};
11use crate::paths;
12use interprocess::local_socket::Stream;
13use interprocess::TryClone;
14use prost::Message;
15use running_process_proto::daemon::{
16 AttachPipeStreamRequest, AttachPipeStreamResponse, DaemonRequest, DaemonResponse,
17 DetachPipeStreamRequest, KeyValue, ListPipeSessionsRequest, ListPipeSessionsResponse,
18 PipeSessionInfo, PipeStreamFrame, PipeStreamKind, RequestType, SpawnPipeSessionRequest,
19 SpawnPipeSessionResponse, StatusCode, TerminatePipeSessionRequest, WritePipeStdinRequest,
20 WritePipeStdinResponse,
21};
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::PathBuf;
24
/// Client-side description of a process to launch as a daemon pipe session.
///
/// Build one with [`PipeSpawnRequest::new`] and refine it with the chained
/// `with_*` / `merge_stderr` methods; every field is also public and may be
/// set directly.
#[derive(Clone, Debug)]
pub struct PipeSpawnRequest {
    /// Argument vector forwarded to the daemon as-is.
    pub argv: Vec<String>,
    /// Working directory for the spawned process; `None` means "not specified".
    pub cwd: Option<PathBuf>,
    /// Environment entries as `(key, value)` pairs, in insertion order.
    pub env: Vec<(String, String)>,
    /// Forwarded verbatim to the daemon.
    /// NOTE(review): presumably stops the child from inheriting the daemon's
    /// environment — confirm against the daemon implementation.
    pub clear_inherited_env: bool,
    /// Optional originator tag attached to the session; `None` means "not specified".
    pub originator: Option<String>,
    /// Forwarded verbatim to the daemon; set by [`PipeSpawnRequest::merge_stderr`].
    pub merge_stderr_into_stdout: bool,
}

impl PipeSpawnRequest {
    /// Creates a request for `argv` with every other option at its default:
    /// no cwd, no environment entries, no originator, and both flags `false`.
    pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
        let argv: Vec<String> = argv.into_iter().map(|arg| arg.into()).collect();
        Self {
            argv,
            cwd: None,
            env: Vec::new(),
            clear_inherited_env: false,
            originator: None,
            merge_stderr_into_stdout: false,
        }
    }

    /// Returns the request with its working directory set to `cwd`.
    pub fn with_cwd(self, cwd: impl Into<PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }

    /// Returns the request with its originator tag set to `originator`.
    pub fn with_originator(self, originator: impl Into<String>) -> Self {
        Self {
            originator: Some(originator.into()),
            ..self
        }
    }

    /// Returns the request with `merge_stderr_into_stdout` switched on.
    pub fn merge_stderr(self) -> Self {
        Self {
            merge_stderr_into_stdout: true,
            ..self
        }
    }

    /// Returns the request with its environment entries *replaced* by `env`.
    ///
    /// Entries from an earlier `with_envs` call (or written to the `env`
    /// field directly) are discarded, not merged.
    pub fn with_envs<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let replacement = env
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect::<Vec<(String, String)>>();
        Self {
            env: replacement,
            ..self
        }
    }
}
79
/// Summary of a pipe session the daemon reported as spawned.
///
/// All three fields are copied unchanged from the daemon's spawn response.
#[derive(Clone, Debug)]
pub struct SpawnedPipeSession {
    /// Identifier the daemon assigned to the session; pass it to the other
    /// pipe-session calls to address this session.
    pub session_id: String,
    /// Process id reported by the daemon for the spawned process.
    pub pid: u32,
    /// Creation timestamp as reported by the daemon.
    /// NOTE(review): unit and epoch are not visible here — presumably seconds
    /// since the Unix epoch; confirm against the daemon.
    pub created_at: f64,
}
86
87impl DaemonClient {
88 pub fn spawn_pipe_session(
89 &mut self,
90 request: &PipeSpawnRequest,
91 ) -> Result<SpawnedPipeSession, ClientError> {
92 let proto = SpawnPipeSessionRequest {
93 argv: request.argv.clone(),
94 cwd: request
95 .cwd
96 .as_ref()
97 .map(|p| p.to_string_lossy().into_owned())
98 .unwrap_or_default(),
99 env: request
100 .env
101 .iter()
102 .map(|(k, v)| KeyValue {
103 key: k.clone(),
104 value: v.clone(),
105 })
106 .collect(),
107 clear_inherited_env: request.clear_inherited_env,
108 originator: request.originator.clone().unwrap_or_default(),
109 merge_stderr_into_stdout: request.merge_stderr_into_stdout,
110 };
111 let daemon_request = DaemonRequest {
112 id: self.next_request_id(),
113 r#type: RequestType::SpawnPipeSession.into(),
114 protocol_version: 1,
115 client_name: "running-process-client".into(),
116 spawn_pipe_session: Some(proto),
117 ..Default::default()
118 };
119 let response = self.send_request(daemon_request)?;
120 ensure_ok(&response)?;
121 let payload: SpawnPipeSessionResponse = response
122 .spawn_pipe_session
123 .ok_or_else(|| ClientError::Server {
124 code: StatusCode::Internal,
125 message: "spawn_pipe_session response missing payload".into(),
126 })?;
127 Ok(SpawnedPipeSession {
128 session_id: payload.session_id,
129 pid: payload.pid,
130 created_at: payload.created_at,
131 })
132 }
133
134 pub fn list_pipe_sessions(
135 &mut self,
136 originator_filter: &str,
137 ) -> Result<Vec<PipeSessionInfo>, ClientError> {
138 let req = DaemonRequest {
139 id: self.next_request_id(),
140 r#type: RequestType::ListPipeSessions.into(),
141 protocol_version: 1,
142 client_name: "running-process-client".into(),
143 list_pipe_sessions: Some(ListPipeSessionsRequest {
144 originator: originator_filter.into(),
145 }),
146 ..Default::default()
147 };
148 let response = self.send_request(req)?;
149 ensure_ok(&response)?;
150 let payload: ListPipeSessionsResponse = response
151 .list_pipe_sessions
152 .ok_or_else(|| ClientError::Server {
153 code: StatusCode::Internal,
154 message: "list_pipe_sessions response missing payload".into(),
155 })?;
156 Ok(payload.sessions)
157 }
158
159 pub fn detach_pipe_stream(
160 &mut self,
161 session_id: &str,
162 stream: PipeStreamKind,
163 ) -> Result<(), ClientError> {
164 let req = DaemonRequest {
165 id: self.next_request_id(),
166 r#type: RequestType::DetachPipeStream.into(),
167 protocol_version: 1,
168 client_name: "running-process-client".into(),
169 detach_pipe_stream: Some(DetachPipeStreamRequest {
170 session_id: session_id.into(),
171 stream: stream as i32,
172 }),
173 ..Default::default()
174 };
175 let response = self.send_request(req)?;
176 ensure_ok(&response)?;
177 Ok(())
178 }
179
180 pub fn terminate_pipe_session(
181 &mut self,
182 session_id: &str,
183 grace_ms: u32,
184 ) -> Result<(), ClientError> {
185 let req = DaemonRequest {
186 id: self.next_request_id(),
187 r#type: RequestType::TerminatePipeSession.into(),
188 protocol_version: 1,
189 client_name: "running-process-client".into(),
190 terminate_pipe_session: Some(TerminatePipeSessionRequest {
191 session_id: session_id.into(),
192 grace_ms,
193 }),
194 ..Default::default()
195 };
196 let response = self.send_request(req)?;
197 ensure_ok(&response)?;
198 Ok(())
199 }
200
201 pub fn write_pipe_stdin(
202 &mut self,
203 session_id: &str,
204 data: &[u8],
205 close_after: bool,
206 ) -> Result<u64, ClientError> {
207 let req = DaemonRequest {
208 id: self.next_request_id(),
209 r#type: RequestType::WritePipeStdin.into(),
210 protocol_version: 1,
211 client_name: "running-process-client".into(),
212 write_pipe_stdin: Some(WritePipeStdinRequest {
213 session_id: session_id.into(),
214 data: data.to_vec(),
215 close: close_after,
216 }),
217 ..Default::default()
218 };
219 let response = self.send_request(req)?;
220 ensure_ok(&response)?;
221 let payload: WritePipeStdinResponse = response
222 .write_pipe_stdin
223 .ok_or_else(|| ClientError::Server {
224 code: StatusCode::Internal,
225 message: "write_pipe_stdin response missing payload".into(),
226 })?;
227 Ok(payload.bytes_written)
228 }
229}
230
231fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
232 if response.code == StatusCode::Ok as i32 {
233 return Ok(());
234 }
235 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
236 Err(ClientError::Server {
237 code,
238 message: response.message.clone(),
239 })
240}
241
242pub struct PipeStreamAttachment {
247 reader: BufReader<Stream>,
248 pub initial_backlog: Vec<u8>,
249 pub bytes_missed: u64,
250}
251
252#[derive(Debug)]
253pub enum PipeAttachError {
254 Connect(std::io::Error),
255 Io(std::io::Error),
256 Decode(prost::DecodeError),
257 Server { code: StatusCode, message: String },
258 MissingPayload,
259}
260
261impl std::fmt::Display for PipeAttachError {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 match self {
264 Self::Connect(e) => write!(f, "pipe attach connect failed: {e}"),
265 Self::Io(e) => write!(f, "pipe attach io error: {e}"),
266 Self::Decode(e) => write!(f, "pipe attach decode error: {e}"),
267 Self::Server { code, message } => {
268 write!(f, "pipe attach server error {code:?}: {message}")
269 }
270 Self::MissingPayload => write!(f, "pipe attach response missing payload"),
271 }
272 }
273}
274
275impl std::error::Error for PipeAttachError {}
276
277impl PipeStreamAttachment {
278 pub fn attach(
279 scope_hash: Option<&str>,
280 session_id: &str,
281 stream: PipeStreamKind,
282 steal: bool,
283 ) -> Result<Self, PipeAttachError> {
284 let socket_path = paths::socket_path(scope_hash);
285 Self::attach_to(&socket_path, session_id, stream, steal)
286 }
287
288 pub fn attach_to(
289 socket_path: &str,
290 session_id: &str,
291 stream: PipeStreamKind,
292 steal: bool,
293 ) -> Result<Self, PipeAttachError> {
294 let name = paths::make_socket_name(socket_path).map_err(PipeAttachError::Connect)?;
295 use interprocess::local_socket::traits::Stream as _;
296 let s = Stream::connect(name).map_err(PipeAttachError::Connect)?;
297 let s_clone = s.try_clone().map_err(PipeAttachError::Connect)?;
298 let mut reader = BufReader::new(s);
299 let mut writer = BufWriter::new(s_clone);
300
301 let attach_request = DaemonRequest {
302 id: 1,
303 r#type: RequestType::AttachPipeStream.into(),
304 protocol_version: 1,
305 client_name: "running-process-client".into(),
306 attach_pipe_stream: Some(AttachPipeStreamRequest {
307 session_id: session_id.into(),
308 stream: stream as i32,
309 steal,
310 }),
311 ..Default::default()
312 };
313 write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
314 .map_err(PipeAttachError::Io)?;
315 drop(writer);
318
319 let response_bytes = read_length_prefixed(&mut reader).map_err(PipeAttachError::Io)?;
320 let response = DaemonResponse::decode(&response_bytes[..]).map_err(PipeAttachError::Decode)?;
321 if response.code != StatusCode::Ok as i32 {
322 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
323 return Err(PipeAttachError::Server {
324 code,
325 message: response.message,
326 });
327 }
328 let payload: AttachPipeStreamResponse = response
329 .attach_pipe_stream
330 .ok_or(PipeAttachError::MissingPayload)?;
331
332 Ok(Self {
333 reader,
334 initial_backlog: payload.backlog,
335 bytes_missed: payload.bytes_missed,
336 })
337 }
338
339 pub fn recv_frame(&mut self) -> Result<PipeStreamFrame, PipeAttachError> {
340 let bytes = read_length_prefixed(&mut self.reader).map_err(PipeAttachError::Io)?;
341 PipeStreamFrame::decode(&bytes[..]).map_err(PipeAttachError::Decode)
342 }
343}
344
/// Writes `payload` to `w` as one frame: a 4-byte big-endian length followed
/// by the payload bytes, then flushes `w`.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput` — before writing anything — when the
/// payload is longer than `u32::MAX` bytes and so cannot be described by the
/// 4-byte prefix. Any error from the underlying writer is returned unchanged.
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
    // A plain `as u32` cast would silently wrap for oversized payloads and put
    // a length on the wire that does not match the bytes that follow it.
    let len = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "payload of {} bytes exceeds the u32 length prefix",
                payload.len()
            ),
        )
    })?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}
351
/// Reads one frame from `r`: a 4-byte big-endian length followed by exactly
/// that many payload bytes, and returns the payload.
///
/// Never consumes bytes beyond the end of the frame.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when the stream ends inside the length
/// prefix or before the declared number of payload bytes has arrived. Any
/// other error from the underlying reader is returned unchanged.
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
    // Upper bound on what is reserved up front. The length prefix comes from
    // the peer, so a corrupt or desynchronized stream could otherwise make us
    // allocate up to 4 GiB before a single payload byte has been received.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let declared = u32::from_be_bytes(len_buf);
    let expected = declared as usize;

    let mut buf = Vec::with_capacity(expected.min(MAX_PREALLOC));
    // `take` stops at the frame boundary, so `read_to_end` returns as soon as
    // the declared length is reached without waiting for the stream to close;
    // the buffer only grows as real data arrives.
    let received = r.by_ref().take(u64::from(declared)).read_to_end(&mut buf)?;
    if received != expected {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!("length-prefixed frame truncated: expected {expected} bytes, got {received}"),
        ));
    }
    Ok(buf)
}