1use crate::paths;
8use interprocess::local_socket::Stream;
9use interprocess::TryClone;
10use prost::Message;
11use running_process_proto::daemon::{
12 BulkTerminateSessionsRequest, BulkTerminateSessionsResponse, DaemonRequest, DaemonResponse,
13 GetProcessTreeRequest, GetSessionBacklogRequest, GetSessionBacklogResponse, KeyValue,
14 KillTreeRequest, KillZombiesRequest, ListActiveRequest, ListByOriginatorRequest, PingRequest,
15 PipeStreamKind, PurgeExitedSessionsRequest, PurgeExitedSessionsResponse, RequestType,
16 ResizePtySessionRequest, ServiceConfig, ServiceDeleteRequest, ServiceDescribeRequest,
17 ServiceFlushRequest, ServiceListRequest, ServiceLogsRequest, ServiceRestartRequest,
18 ServiceResurrectRequest, ServiceSaveRequest, ServiceStartRequest, ServiceStopRequest,
19 ShutdownRequest, SpawnDaemonRequest as ProtoSpawnDaemonRequest, StatusCode, StatusRequest,
20};
21use std::io::{BufReader, BufWriter, Read, Write};
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25#[derive(Debug)]
31pub enum ClientError {
32 Connect(std::io::Error),
34 Io(std::io::Error),
36 Decode(prost::DecodeError),
38 Server { code: StatusCode, message: String },
40 DaemonNotRunning,
42}
43
44impl std::fmt::Display for ClientError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 ClientError::Connect(e) => write!(f, "failed to connect to daemon: {e}"),
48 ClientError::Io(e) => write!(f, "daemon I/O error: {e}"),
49 ClientError::Decode(e) => write!(f, "failed to decode daemon response: {e}"),
50 ClientError::Server { code, message } => {
51 write!(f, "daemon returned {:?}: {}", code, message)
52 }
53 ClientError::DaemonNotRunning => write!(f, "daemon is not running"),
54 }
55 }
56}
57
58impl std::error::Error for ClientError {
59 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
60 match self {
61 ClientError::Connect(e) | ClientError::Io(e) => Some(e),
62 ClientError::Decode(e) => Some(e),
63 ClientError::Server { .. } | ClientError::DaemonNotRunning => None,
64 }
65 }
66}
67
68#[derive(Debug, Clone)]
74pub struct SpawnCommandRequest {
75 pub command: String,
76 pub cwd: Option<PathBuf>,
77 pub env: Vec<(String, String)>,
78 pub originator: Option<String>,
79 pub clear_inherited_env: bool,
85}
86
87impl SpawnCommandRequest {
88 fn default_originator() -> String {
89 let caller = std::env::current_exe()
90 .ok()
91 .and_then(|path| {
92 path.file_stem()
93 .map(|stem| stem.to_string_lossy().into_owned())
94 })
95 .filter(|value| !value.is_empty())
96 .unwrap_or_else(|| "running-process-client".to_string());
97 format!("{caller}:{}", std::process::id())
98 }
99
100 pub fn shell(command: impl Into<String>) -> Self {
103 Self {
104 command: command.into(),
105 cwd: std::env::current_dir().ok(),
106 env: std::env::vars().collect(),
107 originator: Some(Self::default_originator()),
108 clear_inherited_env: false,
109 }
110 }
111
112 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
114 self.cwd = Some(cwd.into());
115 self
116 }
117
118 pub fn with_envs<I, K, V>(mut self, env: I) -> Self
122 where
123 I: IntoIterator<Item = (K, V)>,
124 K: Into<String>,
125 V: Into<String>,
126 {
127 self.env = env
128 .into_iter()
129 .map(|(key, value)| (key.into(), value.into()))
130 .collect();
131 self
132 }
133
134 pub fn with_env_replace<I, K, V>(mut self, env: I) -> Self
147 where
148 I: IntoIterator<Item = (K, V)>,
149 K: Into<String>,
150 V: Into<String>,
151 {
152 self.env = env
153 .into_iter()
154 .map(|(key, value)| (key.into(), value.into()))
155 .collect();
156 self.clear_inherited_env = true;
157 self
158 }
159
160 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
163 let key = key.into();
164 let value = value.into();
165 if let Some((_, existing)) = self
166 .env
167 .iter_mut()
168 .find(|(existing_key, _)| *existing_key == key)
169 {
170 *existing = value;
171 } else {
172 self.env.push((key, value));
173 }
174 self
175 }
176
177 pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
180 self.originator = Some(originator.into());
181 self
182 }
183}
184
185#[derive(Debug, Clone, PartialEq)]
187pub struct SpawnedDaemon {
188 pub pid: u32,
189 pub created_at: f64,
190 pub command: String,
191 pub cwd: Option<String>,
192 pub originator: Option<String>,
193 pub containment: String,
194}
195
196pub struct DaemonClient {
205 reader: BufReader<Stream>,
206 writer: BufWriter<Stream>,
207 next_id: AtomicU64,
208}
209
210impl DaemonClient {
211 pub fn connect(scope_hash: Option<&str>) -> Result<Self, ClientError> {
216 let path = paths::socket_path(scope_hash);
217 Self::connect_to(&path)
218 }
219
220 pub fn connect_to(socket_path: &str) -> Result<Self, ClientError> {
225 let name = paths::make_socket_name(socket_path).map_err(ClientError::Connect)?;
226
227 use interprocess::local_socket::traits::Stream as _;
228 let stream = Stream::connect(name).map_err(ClientError::Connect)?;
229 let stream_clone = stream.try_clone().map_err(ClientError::Connect)?;
230
231 Ok(Self {
232 reader: BufReader::new(stream),
233 writer: BufWriter::new(stream_clone),
234 next_id: AtomicU64::new(1),
235 })
236 }
237
238 pub fn send_request(&mut self, request: DaemonRequest) -> Result<DaemonResponse, ClientError> {
243 let payload = request.encode_to_vec();
245 let len = payload.len() as u32;
246
247 self.writer
249 .write_all(&len.to_be_bytes())
250 .map_err(ClientError::Io)?;
251 self.writer.write_all(&payload).map_err(ClientError::Io)?;
252 self.writer.flush().map_err(ClientError::Io)?;
253
254 let mut len_buf = [0u8; 4];
256 self.reader
257 .read_exact(&mut len_buf)
258 .map_err(ClientError::Io)?;
259 let resp_len = u32::from_be_bytes(len_buf) as usize;
260
261 let mut resp_buf = vec![0u8; resp_len];
263 self.reader
264 .read_exact(&mut resp_buf)
265 .map_err(ClientError::Io)?;
266
267 DaemonResponse::decode(&resp_buf[..]).map_err(ClientError::Decode)
268 }
269
270 pub(crate) fn next_request_id(&self) -> u64 {
276 self.next_id.fetch_add(1, Ordering::Relaxed)
277 }
278
279 fn ensure_ok(&self, response: &DaemonResponse) -> Result<(), ClientError> {
280 if response.code == StatusCode::Ok as i32 {
281 return Ok(());
282 }
283
284 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
285 Err(ClientError::Server {
286 code,
287 message: response.message.clone(),
288 })
289 }
290
291 pub fn ping(&mut self) -> Result<DaemonResponse, ClientError> {
293 let request = DaemonRequest {
294 id: self.next_request_id(),
295 r#type: RequestType::Ping.into(),
296 protocol_version: 1,
297 client_name: String::from("running-process-client"),
298 ping: Some(PingRequest {}),
299 ..Default::default()
300 };
301 self.send_request(request)
302 }
303
304 pub fn shutdown(
306 &mut self,
307 graceful: bool,
308 timeout_seconds: f64,
309 ) -> Result<DaemonResponse, ClientError> {
310 let request = DaemonRequest {
311 id: self.next_request_id(),
312 r#type: RequestType::Shutdown.into(),
313 protocol_version: 1,
314 client_name: String::from("running-process-client"),
315 shutdown: Some(ShutdownRequest {
316 graceful,
317 timeout_seconds,
318 }),
319 ..Default::default()
320 };
321 self.send_request(request)
322 }
323
324 pub fn status(&mut self) -> Result<DaemonResponse, ClientError> {
326 let request = DaemonRequest {
327 id: self.next_request_id(),
328 r#type: RequestType::Status.into(),
329 protocol_version: 1,
330 client_name: String::from("running-process-client"),
331 status: Some(StatusRequest {}),
332 ..Default::default()
333 };
334 self.send_request(request)
335 }
336
337 pub fn list_active(&mut self) -> Result<DaemonResponse, ClientError> {
339 let request = DaemonRequest {
340 id: self.next_request_id(),
341 r#type: RequestType::ListActive.into(),
342 protocol_version: 1,
343 client_name: String::from("running-process-client"),
344 list_active: Some(ListActiveRequest {}),
345 ..Default::default()
346 };
347 self.send_request(request)
348 }
349
350 pub fn list_by_originator(&mut self, tool: &str) -> Result<DaemonResponse, ClientError> {
352 let request = DaemonRequest {
353 id: self.next_request_id(),
354 r#type: RequestType::ListByOriginator.into(),
355 protocol_version: 1,
356 client_name: String::from("running-process-client"),
357 list_by_originator: Some(ListByOriginatorRequest {
358 tool: tool.to_string(),
359 }),
360 ..Default::default()
361 };
362 self.send_request(request)
363 }
364
365 pub fn kill_zombies(&mut self, dry_run: bool) -> Result<DaemonResponse, ClientError> {
367 let request = DaemonRequest {
368 id: self.next_request_id(),
369 r#type: RequestType::KillZombies.into(),
370 protocol_version: 1,
371 client_name: String::from("running-process-client"),
372 kill_zombies: Some(KillZombiesRequest { dry_run }),
373 ..Default::default()
374 };
375 self.send_request(request)
376 }
377
378 pub fn kill_tree(
380 &mut self,
381 pid: u32,
382 timeout_seconds: f64,
383 ) -> Result<DaemonResponse, ClientError> {
384 let request = DaemonRequest {
385 id: self.next_request_id(),
386 r#type: RequestType::KillTree.into(),
387 protocol_version: 1,
388 client_name: String::from("running-process-client"),
389 kill_tree: Some(KillTreeRequest {
390 pid,
391 timeout_seconds,
392 }),
393 ..Default::default()
394 };
395 self.send_request(request)
396 }
397
398 pub fn get_process_tree(&mut self, pid: u32) -> Result<DaemonResponse, ClientError> {
400 let request = DaemonRequest {
401 id: self.next_request_id(),
402 r#type: RequestType::GetProcessTree.into(),
403 protocol_version: 1,
404 client_name: String::from("running-process-client"),
405 get_process_tree: Some(GetProcessTreeRequest { pid }),
406 ..Default::default()
407 };
408 self.send_request(request)
409 }
410
411 pub fn spawn_command(
413 &mut self,
414 request: &SpawnCommandRequest,
415 ) -> Result<SpawnedDaemon, ClientError> {
416 let daemon_request = DaemonRequest {
417 id: self.next_request_id(),
418 r#type: RequestType::SpawnDaemon.into(),
419 protocol_version: 1,
420 client_name: String::from("running-process-client"),
421 spawn_daemon: Some(ProtoSpawnDaemonRequest {
422 command: request.command.clone(),
423 cwd: request
424 .cwd
425 .as_ref()
426 .map(|cwd| cwd.to_string_lossy().into_owned())
427 .unwrap_or_default(),
428 env: request
429 .env
430 .iter()
431 .map(|(k, v)| KeyValue {
432 key: k.clone(),
433 value: v.clone(),
434 })
435 .collect(),
436 originator: request.originator.clone().unwrap_or_default(),
437 clear_inherited_env: request.clear_inherited_env,
438 }),
439 ..Default::default()
440 };
441
442 let response = self.send_request(daemon_request)?;
443 self.ensure_ok(&response)?;
444
445 let payload = response.spawn_daemon.ok_or_else(|| ClientError::Server {
446 code: StatusCode::Internal,
447 message: "spawn response missing payload".to_string(),
448 })?;
449
450 Ok(SpawnedDaemon {
451 pid: payload.pid,
452 created_at: payload.created_at,
453 command: payload.command,
454 cwd: if payload.cwd.is_empty() {
455 None
456 } else {
457 Some(payload.cwd)
458 },
459 originator: if payload.originator.is_empty() {
460 None
461 } else {
462 Some(payload.originator)
463 },
464 containment: payload.containment,
465 })
466 }
467
468 pub fn service_start(&mut self, config: ServiceConfig) -> Result<DaemonResponse, ClientError> {
472 let request = DaemonRequest {
473 id: self.next_request_id(),
474 r#type: RequestType::ServiceStart.into(),
475 protocol_version: 1,
476 client_name: String::from("running-process-client"),
477 service_start: Some(ServiceStartRequest {
478 config: Some(config),
479 }),
480 ..Default::default()
481 };
482 self.send_request(request)
483 }
484
485 pub fn service_stop(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
487 let request = DaemonRequest {
488 id: self.next_request_id(),
489 r#type: RequestType::ServiceStop.into(),
490 protocol_version: 1,
491 client_name: String::from("running-process-client"),
492 service_stop: Some(ServiceStopRequest {
493 target: target.to_string(),
494 }),
495 ..Default::default()
496 };
497 self.send_request(request)
498 }
499
500 pub fn service_restart(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
502 let request = DaemonRequest {
503 id: self.next_request_id(),
504 r#type: RequestType::ServiceRestart.into(),
505 protocol_version: 1,
506 client_name: String::from("running-process-client"),
507 service_restart: Some(ServiceRestartRequest {
508 target: target.to_string(),
509 }),
510 ..Default::default()
511 };
512 self.send_request(request)
513 }
514
515 pub fn service_delete(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
517 let request = DaemonRequest {
518 id: self.next_request_id(),
519 r#type: RequestType::ServiceDelete.into(),
520 protocol_version: 1,
521 client_name: String::from("running-process-client"),
522 service_delete: Some(ServiceDeleteRequest {
523 target: target.to_string(),
524 }),
525 ..Default::default()
526 };
527 self.send_request(request)
528 }
529
530 pub fn service_list(&mut self) -> Result<DaemonResponse, ClientError> {
532 let request = DaemonRequest {
533 id: self.next_request_id(),
534 r#type: RequestType::ServiceList.into(),
535 protocol_version: 1,
536 client_name: String::from("running-process-client"),
537 service_list: Some(ServiceListRequest {}),
538 ..Default::default()
539 };
540 self.send_request(request)
541 }
542
543 pub fn service_describe(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
545 let request = DaemonRequest {
546 id: self.next_request_id(),
547 r#type: RequestType::ServiceDescribe.into(),
548 protocol_version: 1,
549 client_name: String::from("running-process-client"),
550 service_describe: Some(ServiceDescribeRequest {
551 target: target.to_string(),
552 }),
553 ..Default::default()
554 };
555 self.send_request(request)
556 }
557
558 pub fn service_logs(
560 &mut self,
561 target: &str,
562 lines: u32,
563 follow: bool,
564 ) -> Result<DaemonResponse, ClientError> {
565 let request = DaemonRequest {
566 id: self.next_request_id(),
567 r#type: RequestType::ServiceLogs.into(),
568 protocol_version: 1,
569 client_name: String::from("running-process-client"),
570 service_logs: Some(ServiceLogsRequest {
571 target: target.to_string(),
572 lines,
573 follow,
574 }),
575 ..Default::default()
576 };
577 self.send_request(request)
578 }
579
580 pub fn service_flush(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
582 let request = DaemonRequest {
583 id: self.next_request_id(),
584 r#type: RequestType::ServiceFlush.into(),
585 protocol_version: 1,
586 client_name: String::from("running-process-client"),
587 service_flush: Some(ServiceFlushRequest {
588 target: target.to_string(),
589 }),
590 ..Default::default()
591 };
592 self.send_request(request)
593 }
594
595 pub fn service_save(&mut self) -> Result<DaemonResponse, ClientError> {
597 let request = DaemonRequest {
598 id: self.next_request_id(),
599 r#type: RequestType::ServiceSave.into(),
600 protocol_version: 1,
601 client_name: String::from("running-process-client"),
602 service_save: Some(ServiceSaveRequest {}),
603 ..Default::default()
604 };
605 self.send_request(request)
606 }
607
608 pub fn service_resurrect(&mut self) -> Result<DaemonResponse, ClientError> {
610 let request = DaemonRequest {
611 id: self.next_request_id(),
612 r#type: RequestType::ServiceResurrect.into(),
613 protocol_version: 1,
614 client_name: String::from("running-process-client"),
615 service_resurrect: Some(ServiceResurrectRequest {}),
616 ..Default::default()
617 };
618 self.send_request(request)
619 }
620
621 pub fn resize_pty_session(
626 &mut self,
627 session_id: &str,
628 rows: u16,
629 cols: u16,
630 ) -> Result<(), ClientError> {
631 let request = DaemonRequest {
632 id: self.next_request_id(),
633 r#type: RequestType::ResizePtySession.into(),
634 protocol_version: 1,
635 client_name: String::from("running-process-client"),
636 resize_pty_session: Some(ResizePtySessionRequest {
637 session_id: session_id.into(),
638 rows: rows as u32,
639 cols: cols as u32,
640 }),
641 ..Default::default()
642 };
643 let response = self.send_request(request)?;
644 if response.code != StatusCode::Ok as i32 {
645 let code =
646 StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
647 return Err(ClientError::Server {
648 code,
649 message: response.message,
650 });
651 }
652 Ok(())
653 }
654
655 pub fn purge_exited_sessions(
658 &mut self,
659 originator: &str,
660 ) -> Result<PurgeExitedSessionsResponse, ClientError> {
661 let request = DaemonRequest {
662 id: self.next_request_id(),
663 r#type: RequestType::PurgeExitedSessions.into(),
664 protocol_version: 1,
665 client_name: String::from("running-process-client"),
666 purge_exited_sessions: Some(PurgeExitedSessionsRequest {
667 originator: originator.into(),
668 }),
669 ..Default::default()
670 };
671 let response = self.send_request(request)?;
672 if response.code != StatusCode::Ok as i32 {
673 let code =
674 StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
675 return Err(ClientError::Server {
676 code,
677 message: response.message,
678 });
679 }
680 response
681 .purge_exited_sessions
682 .ok_or_else(|| ClientError::Server {
683 code: StatusCode::Internal,
684 message: "purge_exited_sessions response missing payload".into(),
685 })
686 }
687
688 pub fn bulk_terminate_sessions(
691 &mut self,
692 older_than_secs: u64,
693 originator: &str,
694 grace_ms: u32,
695 ) -> Result<BulkTerminateSessionsResponse, ClientError> {
696 let request = DaemonRequest {
697 id: self.next_request_id(),
698 r#type: RequestType::BulkTerminateSessions.into(),
699 protocol_version: 1,
700 client_name: String::from("running-process-client"),
701 bulk_terminate_sessions: Some(BulkTerminateSessionsRequest {
702 older_than_secs,
703 originator: originator.into(),
704 grace_ms,
705 }),
706 ..Default::default()
707 };
708 let response = self.send_request(request)?;
709 if response.code != StatusCode::Ok as i32 {
710 let code =
711 StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
712 return Err(ClientError::Server {
713 code,
714 message: response.message,
715 });
716 }
717 response
718 .bulk_terminate_sessions
719 .ok_or_else(|| ClientError::Server {
720 code: StatusCode::Internal,
721 message: "bulk_terminate_sessions response missing payload".into(),
722 })
723 }
724
725 pub fn get_session_backlog(
730 &mut self,
731 session_id: &str,
732 pipe_stream: PipeStreamKind,
733 ) -> Result<Option<GetSessionBacklogResponse>, ClientError> {
734 let request = DaemonRequest {
735 id: self.next_request_id(),
736 r#type: RequestType::GetSessionBacklog.into(),
737 protocol_version: 1,
738 client_name: String::from("running-process-client"),
739 get_session_backlog: Some(GetSessionBacklogRequest {
740 session_id: session_id.into(),
741 pipe_stream: pipe_stream as i32,
742 }),
743 ..Default::default()
744 };
745 let response = self.send_request(request)?;
746 if response.code == StatusCode::NotFound as i32 {
747 return Ok(None);
748 }
749 if response.code != StatusCode::Ok as i32 {
750 let code =
751 StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
752 return Err(ClientError::Server {
753 code,
754 message: response.message,
755 });
756 }
757 Ok(response.get_session_backlog)
758 }
759}
760
761pub fn connect_or_start(scope_hash: Option<&str>) -> Result<DaemonClient, ClientError> {
772 if let Ok(client) = DaemonClient::connect(scope_hash) {
774 return Ok(client);
775 }
776
777 spawn_daemon()?;
779
780 let delays_ms: [u64; 4] = [50, 100, 200, 400];
782 for delay in delays_ms {
783 std::thread::sleep(std::time::Duration::from_millis(delay));
784 if let Ok(client) = DaemonClient::connect(scope_hash) {
785 return Ok(client);
786 }
787 }
788
789 Err(ClientError::DaemonNotRunning)
790}
791
792pub fn launch_detached(command: &str) -> Result<SpawnedDaemon, ClientError> {
797 let mut client = connect_or_start(None)?;
798 client.spawn_command(&SpawnCommandRequest::shell(command))
799}
800
801pub fn daemonize_command(command: &str) -> Result<SpawnedDaemon, ClientError> {
807 launch_detached(command)
808}
809
810fn spawn_daemon() -> Result<(), ClientError> {
812 let exe = daemon_exe_path();
813
814 #[cfg(windows)]
815 {
816 use std::os::windows::process::CommandExt;
817 const DETACHED: u32 = 0x0000_0008 | 0x0000_0200;
819 std::process::Command::new(&exe)
820 .arg("start")
821 .creation_flags(DETACHED)
822 .stdin(std::process::Stdio::null())
823 .stdout(std::process::Stdio::null())
824 .stderr(std::process::Stdio::null())
825 .spawn()
826 .map_err(ClientError::Io)?;
827 }
828
829 #[cfg(unix)]
830 {
831 std::process::Command::new(&exe)
832 .arg("start")
833 .stdin(std::process::Stdio::null())
834 .stdout(std::process::Stdio::null())
835 .stderr(std::process::Stdio::null())
836 .spawn()
837 .map_err(ClientError::Io)?;
838 }
839
840 Ok(())
841}
842
843fn daemon_exe_path() -> String {
848 if let Ok(mut path) = std::env::current_exe() {
849 path.pop(); let candidate = path.join(if cfg!(windows) {
851 "running-process-daemon.exe"
852 } else {
853 "running-process-daemon"
854 });
855 if candidate.exists() {
856 return candidate.to_string_lossy().into_owned();
857 }
858 }
859 String::from("running-process-daemon")
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866
867 #[test]
868 fn launch_detached_has_public_sync_signature() {
869 let _api: fn(&str) -> Result<SpawnedDaemon, ClientError> = launch_detached;
870 }
871
872 #[test]
873 fn spawn_command_request_builder_sets_detached_launch_context() {
874 let request = SpawnCommandRequest::shell("echo hello")
875 .with_cwd("work")
876 .with_envs([("A", "1")])
877 .with_env("B", "2")
878 .with_originator("tool:123");
879
880 assert_eq!(request.command, "echo hello");
881 assert_eq!(request.cwd.as_deref(), Some(std::path::Path::new("work")));
882 assert_eq!(
883 request.env,
884 vec![
885 ("A".to_string(), "1".to_string()),
886 ("B".to_string(), "2".to_string())
887 ]
888 );
889 assert_eq!(request.originator.as_deref(), Some("tool:123"));
890 }
891}