1use crate::client::paths;
8use crate::proto::daemon::{
9 BulkTerminateSessionsRequest, BulkTerminateSessionsResponse, DaemonRequest, DaemonResponse,
10 GetProcessTreeRequest, GetSessionBacklogRequest, GetSessionBacklogResponse, KeyValue,
11 KillTreeRequest, KillZombiesRequest, ListActiveRequest, ListByOriginatorRequest, PingRequest,
12 PipeStreamKind, PurgeExitedSessionsRequest, PurgeExitedSessionsResponse, RequestType,
13 ResizePtySessionRequest, ServiceConfig, ServiceDeleteRequest, ServiceDescribeRequest,
14 ServiceFlushRequest, ServiceListRequest, ServiceLogsRequest, ServiceRestartRequest,
15 ServiceResurrectRequest, ServiceSaveRequest, ServiceStartRequest, ServiceStopRequest,
16 ShutdownRequest, SpawnDaemonRequest as ProtoSpawnDaemonRequest, StatusCode, StatusRequest,
17};
18use interprocess::local_socket::Stream;
19use interprocess::TryClone;
20use prost::Message;
21use std::io::{BufReader, BufWriter, Read, Write};
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicU64, Ordering};
24
25#[derive(Debug)]
31pub enum ClientError {
32 Connect(std::io::Error),
34 Io(std::io::Error),
36 Decode(prost::DecodeError),
38 Server {
40 code: StatusCode,
42 message: String,
44 },
45 DaemonNotRunning,
47}
48
49impl std::fmt::Display for ClientError {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 ClientError::Connect(e) => write!(f, "failed to connect to daemon: {e}"),
53 ClientError::Io(e) => write!(f, "daemon I/O error: {e}"),
54 ClientError::Decode(e) => write!(f, "failed to decode daemon response: {e}"),
55 ClientError::Server { code, message } => {
56 write!(f, "daemon returned {:?}: {}", code, message)
57 }
58 ClientError::DaemonNotRunning => write!(f, "daemon is not running"),
59 }
60 }
61}
62
63impl std::error::Error for ClientError {
64 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
65 match self {
66 ClientError::Connect(e) | ClientError::Io(e) => Some(e),
67 ClientError::Decode(e) => Some(e),
68 ClientError::Server { .. } | ClientError::DaemonNotRunning => None,
69 }
70 }
71}
72
73#[derive(Debug, Clone)]
79pub struct SpawnCommandRequest {
80 pub command: String,
82 pub cwd: Option<PathBuf>,
84 pub env: Vec<(String, String)>,
86 pub originator: Option<String>,
88 pub clear_inherited_env: bool,
94}
95
96impl SpawnCommandRequest {
97 fn default_originator() -> String {
98 let caller = std::env::current_exe()
99 .ok()
100 .and_then(|path| {
101 path.file_stem()
102 .map(|stem| stem.to_string_lossy().into_owned())
103 })
104 .filter(|value| !value.is_empty())
105 .unwrap_or_else(|| "running-process-client".to_string());
106 format!("{caller}:{}", std::process::id())
107 }
108
109 pub fn shell(command: impl Into<String>) -> Self {
112 Self {
113 command: command.into(),
114 cwd: std::env::current_dir().ok(),
115 env: std::env::vars().collect(),
116 originator: Some(Self::default_originator()),
117 clear_inherited_env: false,
118 }
119 }
120
121 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
123 self.cwd = Some(cwd.into());
124 self
125 }
126
127 pub fn with_envs<I, K, V>(mut self, env: I) -> Self
131 where
132 I: IntoIterator<Item = (K, V)>,
133 K: Into<String>,
134 V: Into<String>,
135 {
136 self.env = env
137 .into_iter()
138 .map(|(key, value)| (key.into(), value.into()))
139 .collect();
140 self
141 }
142
143 pub fn with_env_replace<I, K, V>(mut self, env: I) -> Self
156 where
157 I: IntoIterator<Item = (K, V)>,
158 K: Into<String>,
159 V: Into<String>,
160 {
161 self.env = env
162 .into_iter()
163 .map(|(key, value)| (key.into(), value.into()))
164 .collect();
165 self.clear_inherited_env = true;
166 self
167 }
168
169 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
172 let key = key.into();
173 let value = value.into();
174 if let Some((_, existing)) = self
175 .env
176 .iter_mut()
177 .find(|(existing_key, _)| *existing_key == key)
178 {
179 *existing = value;
180 } else {
181 self.env.push((key, value));
182 }
183 self
184 }
185
186 pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
189 self.originator = Some(originator.into());
190 self
191 }
192}
193
194#[derive(Debug, Clone, PartialEq)]
196pub struct SpawnedDaemon {
197 pub pid: u32,
199 pub created_at: f64,
201 pub command: String,
203 pub cwd: Option<String>,
205 pub originator: Option<String>,
207 pub containment: String,
209}
210
211pub struct DaemonClient {
220 reader: BufReader<Stream>,
221 writer: BufWriter<Stream>,
222 next_id: AtomicU64,
223}
224
225impl DaemonClient {
226 pub fn connect(scope_hash: Option<&str>) -> Result<Self, ClientError> {
231 let path = paths::socket_path(scope_hash);
232 Self::connect_to(&path)
233 }
234
235 pub fn connect_to(socket_path: &str) -> Result<Self, ClientError> {
240 let name = paths::make_socket_name(socket_path).map_err(ClientError::Connect)?;
241
242 use interprocess::local_socket::traits::Stream as _;
243 let stream = Stream::connect(name).map_err(ClientError::Connect)?;
244 let stream_clone = stream.try_clone().map_err(ClientError::Connect)?;
245
246 Ok(Self {
247 reader: BufReader::new(stream),
248 writer: BufWriter::new(stream_clone),
249 next_id: AtomicU64::new(1),
250 })
251 }
252
253 pub fn send_request(&mut self, request: DaemonRequest) -> Result<DaemonResponse, ClientError> {
258 let payload = request.encode_to_vec();
260 let len = payload.len() as u32;
261
262 self.writer
264 .write_all(&len.to_be_bytes())
265 .map_err(ClientError::Io)?;
266 self.writer.write_all(&payload).map_err(ClientError::Io)?;
267 self.writer.flush().map_err(ClientError::Io)?;
268
269 let mut len_buf = [0u8; 4];
271 self.reader
272 .read_exact(&mut len_buf)
273 .map_err(ClientError::Io)?;
274 let resp_len = u32::from_be_bytes(len_buf) as usize;
275
276 let mut resp_buf = vec![0u8; resp_len];
278 self.reader
279 .read_exact(&mut resp_buf)
280 .map_err(ClientError::Io)?;
281
282 DaemonResponse::decode(&resp_buf[..]).map_err(ClientError::Decode)
283 }
284
285 pub(crate) fn next_request_id(&self) -> u64 {
291 self.next_id.fetch_add(1, Ordering::Relaxed)
292 }
293
294 fn ensure_ok(&self, response: &DaemonResponse) -> Result<(), ClientError> {
295 if response.code == StatusCode::Ok as i32 {
296 return Ok(());
297 }
298
299 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
300 Err(ClientError::Server {
301 code,
302 message: response.message.clone(),
303 })
304 }
305
306 pub fn ping(&mut self) -> Result<DaemonResponse, ClientError> {
308 let request = DaemonRequest {
309 id: self.next_request_id(),
310 r#type: RequestType::Ping.into(),
311 protocol_version: 1,
312 client_name: String::from("running-process-client"),
313 ping: Some(PingRequest {}),
314 ..Default::default()
315 };
316 self.send_request(request)
317 }
318
319 pub fn shutdown(
321 &mut self,
322 graceful: bool,
323 timeout_seconds: f64,
324 ) -> Result<DaemonResponse, ClientError> {
325 let request = DaemonRequest {
326 id: self.next_request_id(),
327 r#type: RequestType::Shutdown.into(),
328 protocol_version: 1,
329 client_name: String::from("running-process-client"),
330 shutdown: Some(ShutdownRequest {
331 graceful,
332 timeout_seconds,
333 }),
334 ..Default::default()
335 };
336 self.send_request(request)
337 }
338
339 pub fn status(&mut self) -> Result<DaemonResponse, ClientError> {
341 let request = DaemonRequest {
342 id: self.next_request_id(),
343 r#type: RequestType::Status.into(),
344 protocol_version: 1,
345 client_name: String::from("running-process-client"),
346 status: Some(StatusRequest {}),
347 ..Default::default()
348 };
349 self.send_request(request)
350 }
351
352 pub fn list_active(&mut self) -> Result<DaemonResponse, ClientError> {
354 let request = DaemonRequest {
355 id: self.next_request_id(),
356 r#type: RequestType::ListActive.into(),
357 protocol_version: 1,
358 client_name: String::from("running-process-client"),
359 list_active: Some(ListActiveRequest {}),
360 ..Default::default()
361 };
362 self.send_request(request)
363 }
364
365 pub fn list_by_originator(&mut self, tool: &str) -> Result<DaemonResponse, ClientError> {
367 let request = DaemonRequest {
368 id: self.next_request_id(),
369 r#type: RequestType::ListByOriginator.into(),
370 protocol_version: 1,
371 client_name: String::from("running-process-client"),
372 list_by_originator: Some(ListByOriginatorRequest {
373 tool: tool.to_string(),
374 }),
375 ..Default::default()
376 };
377 self.send_request(request)
378 }
379
380 pub fn kill_zombies(&mut self, dry_run: bool) -> Result<DaemonResponse, ClientError> {
382 let request = DaemonRequest {
383 id: self.next_request_id(),
384 r#type: RequestType::KillZombies.into(),
385 protocol_version: 1,
386 client_name: String::from("running-process-client"),
387 kill_zombies: Some(KillZombiesRequest { dry_run }),
388 ..Default::default()
389 };
390 self.send_request(request)
391 }
392
393 pub fn kill_tree(
395 &mut self,
396 pid: u32,
397 timeout_seconds: f64,
398 ) -> Result<DaemonResponse, ClientError> {
399 let request = DaemonRequest {
400 id: self.next_request_id(),
401 r#type: RequestType::KillTree.into(),
402 protocol_version: 1,
403 client_name: String::from("running-process-client"),
404 kill_tree: Some(KillTreeRequest {
405 pid,
406 timeout_seconds,
407 }),
408 ..Default::default()
409 };
410 self.send_request(request)
411 }
412
413 pub fn get_process_tree(&mut self, pid: u32) -> Result<DaemonResponse, ClientError> {
415 let request = DaemonRequest {
416 id: self.next_request_id(),
417 r#type: RequestType::GetProcessTree.into(),
418 protocol_version: 1,
419 client_name: String::from("running-process-client"),
420 get_process_tree: Some(GetProcessTreeRequest { pid }),
421 ..Default::default()
422 };
423 self.send_request(request)
424 }
425
426 pub fn spawn_command(
428 &mut self,
429 request: &SpawnCommandRequest,
430 ) -> Result<SpawnedDaemon, ClientError> {
431 let daemon_request = DaemonRequest {
432 id: self.next_request_id(),
433 r#type: RequestType::SpawnDaemon.into(),
434 protocol_version: 1,
435 client_name: String::from("running-process-client"),
436 spawn_daemon: Some(ProtoSpawnDaemonRequest {
437 command: request.command.clone(),
438 cwd: request
439 .cwd
440 .as_ref()
441 .map(|cwd| cwd.to_string_lossy().into_owned())
442 .unwrap_or_default(),
443 env: request
444 .env
445 .iter()
446 .map(|(k, v)| KeyValue {
447 key: k.clone(),
448 value: v.clone(),
449 })
450 .collect(),
451 originator: request.originator.clone().unwrap_or_default(),
452 clear_inherited_env: request.clear_inherited_env,
453 }),
454 ..Default::default()
455 };
456
457 let response = self.send_request(daemon_request)?;
458 self.ensure_ok(&response)?;
459
460 let payload = response.spawn_daemon.ok_or_else(|| ClientError::Server {
461 code: StatusCode::Internal,
462 message: "spawn response missing payload".to_string(),
463 })?;
464
465 Ok(SpawnedDaemon {
466 pid: payload.pid,
467 created_at: payload.created_at,
468 command: payload.command,
469 cwd: if payload.cwd.is_empty() {
470 None
471 } else {
472 Some(payload.cwd)
473 },
474 originator: if payload.originator.is_empty() {
475 None
476 } else {
477 Some(payload.originator)
478 },
479 containment: payload.containment,
480 })
481 }
482
483 pub fn service_start(&mut self, config: ServiceConfig) -> Result<DaemonResponse, ClientError> {
487 let request = DaemonRequest {
488 id: self.next_request_id(),
489 r#type: RequestType::ServiceStart.into(),
490 protocol_version: 1,
491 client_name: String::from("running-process-client"),
492 service_start: Some(ServiceStartRequest {
493 config: Some(config),
494 }),
495 ..Default::default()
496 };
497 self.send_request(request)
498 }
499
500 pub fn service_stop(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
502 let request = DaemonRequest {
503 id: self.next_request_id(),
504 r#type: RequestType::ServiceStop.into(),
505 protocol_version: 1,
506 client_name: String::from("running-process-client"),
507 service_stop: Some(ServiceStopRequest {
508 target: target.to_string(),
509 }),
510 ..Default::default()
511 };
512 self.send_request(request)
513 }
514
515 pub fn service_restart(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
517 let request = DaemonRequest {
518 id: self.next_request_id(),
519 r#type: RequestType::ServiceRestart.into(),
520 protocol_version: 1,
521 client_name: String::from("running-process-client"),
522 service_restart: Some(ServiceRestartRequest {
523 target: target.to_string(),
524 }),
525 ..Default::default()
526 };
527 self.send_request(request)
528 }
529
530 pub fn service_delete(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
532 let request = DaemonRequest {
533 id: self.next_request_id(),
534 r#type: RequestType::ServiceDelete.into(),
535 protocol_version: 1,
536 client_name: String::from("running-process-client"),
537 service_delete: Some(ServiceDeleteRequest {
538 target: target.to_string(),
539 }),
540 ..Default::default()
541 };
542 self.send_request(request)
543 }
544
545 pub fn service_list(&mut self) -> Result<DaemonResponse, ClientError> {
547 let request = DaemonRequest {
548 id: self.next_request_id(),
549 r#type: RequestType::ServiceList.into(),
550 protocol_version: 1,
551 client_name: String::from("running-process-client"),
552 service_list: Some(ServiceListRequest {}),
553 ..Default::default()
554 };
555 self.send_request(request)
556 }
557
558 pub fn service_describe(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
560 let request = DaemonRequest {
561 id: self.next_request_id(),
562 r#type: RequestType::ServiceDescribe.into(),
563 protocol_version: 1,
564 client_name: String::from("running-process-client"),
565 service_describe: Some(ServiceDescribeRequest {
566 target: target.to_string(),
567 }),
568 ..Default::default()
569 };
570 self.send_request(request)
571 }
572
573 pub fn service_logs(
575 &mut self,
576 target: &str,
577 lines: u32,
578 follow: bool,
579 ) -> Result<DaemonResponse, ClientError> {
580 let request = DaemonRequest {
581 id: self.next_request_id(),
582 r#type: RequestType::ServiceLogs.into(),
583 protocol_version: 1,
584 client_name: String::from("running-process-client"),
585 service_logs: Some(ServiceLogsRequest {
586 target: target.to_string(),
587 lines,
588 follow,
589 }),
590 ..Default::default()
591 };
592 self.send_request(request)
593 }
594
595 pub fn service_flush(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
597 let request = DaemonRequest {
598 id: self.next_request_id(),
599 r#type: RequestType::ServiceFlush.into(),
600 protocol_version: 1,
601 client_name: String::from("running-process-client"),
602 service_flush: Some(ServiceFlushRequest {
603 target: target.to_string(),
604 }),
605 ..Default::default()
606 };
607 self.send_request(request)
608 }
609
610 pub fn service_save(&mut self) -> Result<DaemonResponse, ClientError> {
612 let request = DaemonRequest {
613 id: self.next_request_id(),
614 r#type: RequestType::ServiceSave.into(),
615 protocol_version: 1,
616 client_name: String::from("running-process-client"),
617 service_save: Some(ServiceSaveRequest {}),
618 ..Default::default()
619 };
620 self.send_request(request)
621 }
622
623 pub fn service_resurrect(&mut self) -> Result<DaemonResponse, ClientError> {
625 let request = DaemonRequest {
626 id: self.next_request_id(),
627 r#type: RequestType::ServiceResurrect.into(),
628 protocol_version: 1,
629 client_name: String::from("running-process-client"),
630 service_resurrect: Some(ServiceResurrectRequest {}),
631 ..Default::default()
632 };
633 self.send_request(request)
634 }
635
636 pub fn resize_pty_session(
641 &mut self,
642 session_id: &str,
643 rows: u16,
644 cols: u16,
645 ) -> Result<(), ClientError> {
646 let request = DaemonRequest {
647 id: self.next_request_id(),
648 r#type: RequestType::ResizePtySession.into(),
649 protocol_version: 1,
650 client_name: String::from("running-process-client"),
651 resize_pty_session: Some(ResizePtySessionRequest {
652 session_id: session_id.into(),
653 rows: rows as u32,
654 cols: cols as u32,
655 }),
656 ..Default::default()
657 };
658 let response = self.send_request(request)?;
659 if response.code != StatusCode::Ok as i32 {
660 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
661 return Err(ClientError::Server {
662 code,
663 message: response.message,
664 });
665 }
666 Ok(())
667 }
668
669 pub fn purge_exited_sessions(
672 &mut self,
673 originator: &str,
674 ) -> Result<PurgeExitedSessionsResponse, ClientError> {
675 let request = DaemonRequest {
676 id: self.next_request_id(),
677 r#type: RequestType::PurgeExitedSessions.into(),
678 protocol_version: 1,
679 client_name: String::from("running-process-client"),
680 purge_exited_sessions: Some(PurgeExitedSessionsRequest {
681 originator: originator.into(),
682 }),
683 ..Default::default()
684 };
685 let response = self.send_request(request)?;
686 if response.code != StatusCode::Ok as i32 {
687 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
688 return Err(ClientError::Server {
689 code,
690 message: response.message,
691 });
692 }
693 response
694 .purge_exited_sessions
695 .ok_or_else(|| ClientError::Server {
696 code: StatusCode::Internal,
697 message: "purge_exited_sessions response missing payload".into(),
698 })
699 }
700
701 pub fn bulk_terminate_sessions(
704 &mut self,
705 older_than_secs: u64,
706 originator: &str,
707 grace_ms: u32,
708 ) -> Result<BulkTerminateSessionsResponse, ClientError> {
709 let request = DaemonRequest {
710 id: self.next_request_id(),
711 r#type: RequestType::BulkTerminateSessions.into(),
712 protocol_version: 1,
713 client_name: String::from("running-process-client"),
714 bulk_terminate_sessions: Some(BulkTerminateSessionsRequest {
715 older_than_secs,
716 originator: originator.into(),
717 grace_ms,
718 }),
719 ..Default::default()
720 };
721 let response = self.send_request(request)?;
722 if response.code != StatusCode::Ok as i32 {
723 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
724 return Err(ClientError::Server {
725 code,
726 message: response.message,
727 });
728 }
729 response
730 .bulk_terminate_sessions
731 .ok_or_else(|| ClientError::Server {
732 code: StatusCode::Internal,
733 message: "bulk_terminate_sessions response missing payload".into(),
734 })
735 }
736
737 pub fn get_session_backlog(
742 &mut self,
743 session_id: &str,
744 pipe_stream: PipeStreamKind,
745 ) -> Result<Option<GetSessionBacklogResponse>, ClientError> {
746 let request = DaemonRequest {
747 id: self.next_request_id(),
748 r#type: RequestType::GetSessionBacklog.into(),
749 protocol_version: 1,
750 client_name: String::from("running-process-client"),
751 get_session_backlog: Some(GetSessionBacklogRequest {
752 session_id: session_id.into(),
753 pipe_stream: pipe_stream as i32,
754 }),
755 ..Default::default()
756 };
757 let response = self.send_request(request)?;
758 if response.code == StatusCode::NotFound as i32 {
759 return Ok(None);
760 }
761 if response.code != StatusCode::Ok as i32 {
762 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
763 return Err(ClientError::Server {
764 code,
765 message: response.message,
766 });
767 }
768 Ok(response.get_session_backlog)
769 }
770}
771
772pub fn connect_or_start(scope_hash: Option<&str>) -> Result<DaemonClient, ClientError> {
783 if let Ok(client) = DaemonClient::connect(scope_hash) {
785 return Ok(client);
786 }
787
788 spawn_daemon()?;
790
791 let delays_ms: [u64; 4] = [50, 100, 200, 400];
799 for delay in delays_ms {
800 std::thread::sleep(std::time::Duration::from_millis(delay));
801 if let Ok(client) = DaemonClient::connect(scope_hash) {
802 return Ok(client);
803 }
804 }
805
806 Err(ClientError::DaemonNotRunning)
807}
808
809pub fn launch_detached(command: &str) -> Result<SpawnedDaemon, ClientError> {
814 let mut client = connect_or_start(None)?;
815 client.spawn_command(&SpawnCommandRequest::shell(command))
816}
817
818pub fn daemonize_command(command: &str) -> Result<SpawnedDaemon, ClientError> {
824 launch_detached(command)
825}
826
827fn spawn_daemon() -> Result<(), ClientError> {
829 let exe = daemon_exe_path();
830
831 #[cfg(windows)]
832 {
833 use std::os::windows::process::CommandExt;
834 const DETACHED: u32 = 0x0000_0008 | 0x0000_0200;
836 std::process::Command::new(&exe)
837 .arg("start")
838 .creation_flags(DETACHED)
839 .stdin(std::process::Stdio::null())
840 .stdout(std::process::Stdio::null())
841 .stderr(std::process::Stdio::null())
842 .spawn()
843 .map_err(ClientError::Io)?;
844 }
845
846 #[cfg(unix)]
847 {
848 std::process::Command::new(&exe)
849 .arg("start")
850 .stdin(std::process::Stdio::null())
851 .stdout(std::process::Stdio::null())
852 .stderr(std::process::Stdio::null())
853 .spawn()
854 .map_err(ClientError::Io)?;
855 }
856
857 Ok(())
858}
859
860fn daemon_exe_path() -> String {
865 if let Ok(mut path) = std::env::current_exe() {
866 path.pop(); let candidate = path.join(if cfg!(windows) {
868 "running-process-daemon.exe"
869 } else {
870 "running-process-daemon"
871 });
872 if candidate.exists() {
873 return candidate.to_string_lossy().into_owned();
874 }
875 }
876 String::from("running-process-daemon")
878}
879
880#[cfg(test)]
881mod tests {
882 use super::*;
883
884 #[test]
885 fn launch_detached_has_public_sync_signature() {
886 let _api: fn(&str) -> Result<SpawnedDaemon, ClientError> = launch_detached;
887 }
888
889 #[test]
890 fn spawn_command_request_builder_sets_detached_launch_context() {
891 let request = SpawnCommandRequest::shell("echo hello")
892 .with_cwd("work")
893 .with_envs([("A", "1")])
894 .with_env("B", "2")
895 .with_originator("tool:123");
896
897 assert_eq!(request.command, "echo hello");
898 assert_eq!(request.cwd.as_deref(), Some(std::path::Path::new("work")));
899 assert_eq!(
900 request.env,
901 vec![
902 ("A".to_string(), "1".to_string()),
903 ("B".to_string(), "2".to_string())
904 ]
905 );
906 assert_eq!(request.originator.as_deref(), Some("tool:123"));
907 }
908}