use crate::paths;
use interprocess::local_socket::Stream;
use interprocess::TryClone;
use prost::Message;
use running_process_proto::daemon::{
BulkTerminateSessionsRequest, BulkTerminateSessionsResponse, DaemonRequest, DaemonResponse,
GetProcessTreeRequest, GetSessionBacklogRequest, GetSessionBacklogResponse, KeyValue,
KillTreeRequest, KillZombiesRequest, ListActiveRequest, ListByOriginatorRequest, PingRequest,
PipeStreamKind, PurgeExitedSessionsRequest, PurgeExitedSessionsResponse, RequestType,
ResizePtySessionRequest, ServiceConfig, ServiceDeleteRequest, ServiceDescribeRequest,
ServiceFlushRequest, ServiceListRequest, ServiceLogsRequest, ServiceRestartRequest,
ServiceResurrectRequest, ServiceSaveRequest, ServiceStartRequest, ServiceStopRequest,
ShutdownRequest, SpawnDaemonRequest as ProtoSpawnDaemonRequest, StatusCode, StatusRequest,
};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
/// Errors returned by the daemon client API in this module.
#[derive(Debug)]
pub enum ClientError {
/// Building the socket name, connecting to it, or cloning the connected
/// stream failed (all three are mapped here by `DaemonClient::connect_to`).
Connect(std::io::Error),
/// An I/O failure after the connection was established (writing a request,
/// reading a response) or while spawning the daemon executable.
Io(std::io::Error),
/// The response bytes could not be decoded as a `DaemonResponse`.
Decode(prost::DecodeError),
/// The daemon answered with a non-OK status, or an OK response lacked the
/// payload the client expected.
Server { code: StatusCode, message: String },
/// The daemon could not be reached even after `connect_or_start` tried to
/// launch it and retried the connection.
DaemonNotRunning,
}
impl std::fmt::Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClientError::Connect(e) => write!(f, "failed to connect to daemon: {e}"),
ClientError::Io(e) => write!(f, "daemon I/O error: {e}"),
ClientError::Decode(e) => write!(f, "failed to decode daemon response: {e}"),
ClientError::Server { code, message } => {
write!(f, "daemon returned {:?}: {}", code, message)
}
ClientError::DaemonNotRunning => write!(f, "daemon is not running"),
}
}
}
impl std::error::Error for ClientError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ClientError::Connect(e) | ClientError::Io(e) => Some(e),
ClientError::Decode(e) => Some(e),
ClientError::Server { .. } | ClientError::DaemonNotRunning => None,
}
}
}
/// Description of a command to be launched through the daemon.
///
/// Create one with [`SpawnCommandRequest::shell`] and adjust it with the
/// consuming `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct SpawnCommandRequest {
    /// Command text, forwarded to the daemon unchanged.
    pub command: String,
    /// Working directory for the command, if any.
    pub cwd: Option<PathBuf>,
    /// Environment entries sent with the request, as `(key, value)` pairs.
    pub env: Vec<(String, String)>,
    /// Identifier of the caller that requested the spawn, if any.
    pub originator: Option<String>,
    /// Forwarded to the daemon as `clear_inherited_env`; set by
    /// [`SpawnCommandRequest::with_env_replace`].
    // NOTE(review): presumably tells the daemon not to merge its own
    // environment into the child's — confirm against the daemon side.
    pub clear_inherited_env: bool,
}

impl SpawnCommandRequest {
    /// Builds the default originator tag: `<executable stem>:<pid>`.
    ///
    /// Falls back to `running-process-client` when the current executable
    /// cannot be determined or has an empty file stem.
    fn default_originator() -> String {
        let caller = std::env::current_exe()
            .ok()
            .and_then(|exe| exe.file_stem().map(|stem| stem.to_string_lossy().into_owned()))
            .filter(|stem| !stem.is_empty())
            .unwrap_or_else(|| "running-process-client".to_string());
        format!("{caller}:{}", std::process::id())
    }

    /// Creates a request for `command` that captures the caller's context:
    /// current working directory (if readable), current environment, and the
    /// default originator tag.
    ///
    /// Environment entries whose key or value is not valid Unicode are
    /// skipped, because they cannot be represented in the `String` pairs this
    /// request carries.
    pub fn shell(command: impl Into<String>) -> Self {
        // `std::env::vars()` panics on the first non-Unicode entry, so walk
        // the OS-string view and keep only the pairs that convert losslessly.
        let env = std::env::vars_os()
            .filter_map(|(key, value)| {
                let key = key.into_string().ok()?;
                let value = value.into_string().ok()?;
                Some((key, value))
            })
            .collect();
        Self {
            command: command.into(),
            cwd: std::env::current_dir().ok(),
            env,
            originator: Some(Self::default_originator()),
            clear_inherited_env: false,
        }
    }

    /// Sets the working directory.
    pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
        self.cwd = Some(cwd.into());
        self
    }

    /// Replaces the whole environment list with `env`, leaving
    /// `clear_inherited_env` untouched.
    pub fn with_envs<I, K, V>(mut self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.env = env
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect();
        self
    }

    /// Replaces the whole environment list with `env` and additionally sets
    /// `clear_inherited_env` to `true`.
    pub fn with_env_replace<I, K, V>(self, env: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        let mut replaced = self.with_envs(env);
        replaced.clear_inherited_env = true;
        replaced
    }

    /// Sets a single environment entry, overwriting the first existing entry
    /// with the same key or appending a new one.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let key = key.into();
        let value = value.into();
        match self.env.iter().position(|(existing, _)| *existing == key) {
            Some(index) => self.env[index].1 = value,
            None => self.env.push((key, value)),
        }
        self
    }

    /// Overrides the originator tag.
    pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
        self.originator = Some(originator.into());
        self
    }
}
/// Details of a process the daemon reported as spawned, copied from the
/// spawn response payload by `DaemonClient::spawn_command`.
#[derive(Debug, Clone, PartialEq)]
pub struct SpawnedDaemon {
/// Process id reported by the daemon.
pub pid: u32,
/// Creation time reported by the daemon.
// NOTE(review): presumably a Unix timestamp in seconds — confirm against the daemon.
pub created_at: f64,
/// Command text echoed back by the daemon.
pub command: String,
/// Working directory echoed back by the daemon; `None` when it was empty.
pub cwd: Option<String>,
/// Originator echoed back by the daemon; `None` when it was empty.
pub originator: Option<String>,
/// Containment value reported by the daemon (its meaning is defined daemon-side).
pub containment: String,
}
/// Synchronous client for one connection to the daemon's local socket.
pub struct DaemonClient {
// Buffered read side; wraps the stream returned by the connect call.
reader: BufReader<Stream>,
// Buffered write side; wraps a `try_clone` of the same stream.
writer: BufWriter<Stream>,
// Source of per-connection request ids; starts at 1 and only ever increments.
next_id: AtomicU64,
}
impl DaemonClient {
pub fn connect(scope_hash: Option<&str>) -> Result<Self, ClientError> {
let path = paths::socket_path(scope_hash);
Self::connect_to(&path)
}
pub fn connect_to(socket_path: &str) -> Result<Self, ClientError> {
let name = paths::make_socket_name(socket_path).map_err(ClientError::Connect)?;
use interprocess::local_socket::traits::Stream as _;
let stream = Stream::connect(name).map_err(ClientError::Connect)?;
let stream_clone = stream.try_clone().map_err(ClientError::Connect)?;
Ok(Self {
reader: BufReader::new(stream),
writer: BufWriter::new(stream_clone),
next_id: AtomicU64::new(1),
})
}
pub fn send_request(&mut self, request: DaemonRequest) -> Result<DaemonResponse, ClientError> {
let payload = request.encode_to_vec();
let len = payload.len() as u32;
self.writer
.write_all(&len.to_be_bytes())
.map_err(ClientError::Io)?;
self.writer.write_all(&payload).map_err(ClientError::Io)?;
self.writer.flush().map_err(ClientError::Io)?;
let mut len_buf = [0u8; 4];
self.reader
.read_exact(&mut len_buf)
.map_err(ClientError::Io)?;
let resp_len = u32::from_be_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
self.reader
.read_exact(&mut resp_buf)
.map_err(ClientError::Io)?;
DaemonResponse::decode(&resp_buf[..]).map_err(ClientError::Decode)
}
pub(crate) fn next_request_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
fn ensure_ok(&self, response: &DaemonResponse) -> Result<(), ClientError> {
if response.code == StatusCode::Ok as i32 {
return Ok(());
}
let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
Err(ClientError::Server {
code,
message: response.message.clone(),
})
}
pub fn ping(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::Ping.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
ping: Some(PingRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn shutdown(
&mut self,
graceful: bool,
timeout_seconds: f64,
) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::Shutdown.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
shutdown: Some(ShutdownRequest {
graceful,
timeout_seconds,
}),
..Default::default()
};
self.send_request(request)
}
pub fn status(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::Status.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
status: Some(StatusRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn list_active(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ListActive.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
list_active: Some(ListActiveRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn list_by_originator(&mut self, tool: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ListByOriginator.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
list_by_originator: Some(ListByOriginatorRequest {
tool: tool.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn kill_zombies(&mut self, dry_run: bool) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::KillZombies.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
kill_zombies: Some(KillZombiesRequest { dry_run }),
..Default::default()
};
self.send_request(request)
}
pub fn kill_tree(
&mut self,
pid: u32,
timeout_seconds: f64,
) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::KillTree.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
kill_tree: Some(KillTreeRequest {
pid,
timeout_seconds,
}),
..Default::default()
};
self.send_request(request)
}
pub fn get_process_tree(&mut self, pid: u32) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::GetProcessTree.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
get_process_tree: Some(GetProcessTreeRequest { pid }),
..Default::default()
};
self.send_request(request)
}
pub fn spawn_command(
&mut self,
request: &SpawnCommandRequest,
) -> Result<SpawnedDaemon, ClientError> {
let daemon_request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::SpawnDaemon.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
spawn_daemon: Some(ProtoSpawnDaemonRequest {
command: request.command.clone(),
cwd: request
.cwd
.as_ref()
.map(|cwd| cwd.to_string_lossy().into_owned())
.unwrap_or_default(),
env: request
.env
.iter()
.map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect(),
originator: request.originator.clone().unwrap_or_default(),
clear_inherited_env: request.clear_inherited_env,
}),
..Default::default()
};
let response = self.send_request(daemon_request)?;
self.ensure_ok(&response)?;
let payload = response.spawn_daemon.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "spawn response missing payload".to_string(),
})?;
Ok(SpawnedDaemon {
pid: payload.pid,
created_at: payload.created_at,
command: payload.command,
cwd: if payload.cwd.is_empty() {
None
} else {
Some(payload.cwd)
},
originator: if payload.originator.is_empty() {
None
} else {
Some(payload.originator)
},
containment: payload.containment,
})
}
pub fn service_start(&mut self, config: ServiceConfig) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceStart.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_start: Some(ServiceStartRequest {
config: Some(config),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_stop(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceStop.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_stop: Some(ServiceStopRequest {
target: target.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_restart(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceRestart.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_restart: Some(ServiceRestartRequest {
target: target.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_delete(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceDelete.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_delete: Some(ServiceDeleteRequest {
target: target.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_list(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceList.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_list: Some(ServiceListRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn service_describe(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceDescribe.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_describe: Some(ServiceDescribeRequest {
target: target.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_logs(
&mut self,
target: &str,
lines: u32,
follow: bool,
) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceLogs.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_logs: Some(ServiceLogsRequest {
target: target.to_string(),
lines,
follow,
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_flush(&mut self, target: &str) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceFlush.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_flush: Some(ServiceFlushRequest {
target: target.to_string(),
}),
..Default::default()
};
self.send_request(request)
}
pub fn service_save(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceSave.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_save: Some(ServiceSaveRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn service_resurrect(&mut self) -> Result<DaemonResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ServiceResurrect.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
service_resurrect: Some(ServiceResurrectRequest {}),
..Default::default()
};
self.send_request(request)
}
pub fn resize_pty_session(
&mut self,
session_id: &str,
rows: u16,
cols: u16,
) -> Result<(), ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ResizePtySession.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
resize_pty_session: Some(ResizePtySessionRequest {
session_id: session_id.into(),
rows: rows as u32,
cols: cols as u32,
}),
..Default::default()
};
let response = self.send_request(request)?;
if response.code != StatusCode::Ok as i32 {
let code =
StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
return Err(ClientError::Server {
code,
message: response.message,
});
}
Ok(())
}
pub fn purge_exited_sessions(
&mut self,
originator: &str,
) -> Result<PurgeExitedSessionsResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::PurgeExitedSessions.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
purge_exited_sessions: Some(PurgeExitedSessionsRequest {
originator: originator.into(),
}),
..Default::default()
};
let response = self.send_request(request)?;
if response.code != StatusCode::Ok as i32 {
let code =
StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
return Err(ClientError::Server {
code,
message: response.message,
});
}
response
.purge_exited_sessions
.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "purge_exited_sessions response missing payload".into(),
})
}
pub fn bulk_terminate_sessions(
&mut self,
older_than_secs: u64,
originator: &str,
grace_ms: u32,
) -> Result<BulkTerminateSessionsResponse, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::BulkTerminateSessions.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
bulk_terminate_sessions: Some(BulkTerminateSessionsRequest {
older_than_secs,
originator: originator.into(),
grace_ms,
}),
..Default::default()
};
let response = self.send_request(request)?;
if response.code != StatusCode::Ok as i32 {
let code =
StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
return Err(ClientError::Server {
code,
message: response.message,
});
}
response
.bulk_terminate_sessions
.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "bulk_terminate_sessions response missing payload".into(),
})
}
pub fn get_session_backlog(
&mut self,
session_id: &str,
pipe_stream: PipeStreamKind,
) -> Result<Option<GetSessionBacklogResponse>, ClientError> {
let request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::GetSessionBacklog.into(),
protocol_version: 1,
client_name: String::from("running-process-client"),
get_session_backlog: Some(GetSessionBacklogRequest {
session_id: session_id.into(),
pipe_stream: pipe_stream as i32,
}),
..Default::default()
};
let response = self.send_request(request)?;
if response.code == StatusCode::NotFound as i32 {
return Ok(None);
}
if response.code != StatusCode::Ok as i32 {
let code =
StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
return Err(ClientError::Server {
code,
message: response.message,
});
}
Ok(response.get_session_backlog)
}
}
/// Connects to the daemon for `scope_hash`, launching the daemon executable
/// first if the initial connection attempt fails.
///
/// After launching, the connection is retried after waits of 50, 100, 200 and
/// 400 ms. Individual connection errors are discarded.
///
/// # Errors
/// * [`ClientError::Io`] when the daemon executable cannot be spawned.
/// * [`ClientError::DaemonNotRunning`] when every retry fails.
pub fn connect_or_start(scope_hash: Option<&str>) -> Result<DaemonClient, ClientError> {
    const BACKOFF_MS: [u64; 4] = [50, 100, 200, 400];

    let first_attempt = DaemonClient::connect(scope_hash);
    if first_attempt.is_ok() {
        return first_attempt;
    }

    spawn_daemon()?;

    // `find_map` stops at the first successful connection, so no further
    // sleeps happen once the daemon is reachable.
    BACKOFF_MS
        .iter()
        .find_map(|&wait_ms| {
            std::thread::sleep(std::time::Duration::from_millis(wait_ms));
            DaemonClient::connect(scope_hash).ok()
        })
        .ok_or(ClientError::DaemonNotRunning)
}
/// Launches `command` through the daemon for the default scope (`None`),
/// starting the daemon if needed.
///
/// The request is built with [`SpawnCommandRequest::shell`], so it carries the
/// caller's working directory, environment and default originator tag.
///
/// # Errors
/// Propagates any error from [`connect_or_start`] or
/// [`DaemonClient::spawn_command`].
pub fn launch_detached(command: &str) -> Result<SpawnedDaemon, ClientError> {
    let mut daemon = connect_or_start(None)?;
    let spawn_request = SpawnCommandRequest::shell(command);
    daemon.spawn_command(&spawn_request)
}
/// Alias for [`launch_detached`]: forwards `command` unchanged and returns its result.
pub fn daemonize_command(command: &str) -> Result<SpawnedDaemon, ClientError> {
launch_detached(command)
}
/// Starts the daemon executable with the single argument `start` and all
/// three standard streams redirected to null, without waiting for it.
///
/// On targets that are neither Windows nor Unix nothing is spawned and
/// `Ok(())` is returned.
///
/// # Errors
/// Returns [`ClientError::Io`] when the process cannot be spawned.
fn spawn_daemon() -> Result<(), ClientError> {
    let exe = daemon_exe_path();
    #[cfg(any(windows, unix))]
    {
        use std::process::{Command, Stdio};

        let mut launcher = Command::new(&exe);
        launcher
            .arg("start")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        #[cfg(windows)]
        {
            use std::os::windows::process::CommandExt;
            // Win32 process creation flags.
            const DETACHED_PROCESS: u32 = 0x0000_0008;
            const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
            launcher.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP);
        }
        // The child handle is dropped on purpose: the daemon keeps running on its own.
        launcher.spawn().map_err(ClientError::Io)?;
    }
    Ok(())
}
/// Locates the daemon executable.
///
/// Prefers a `running-process-daemon` binary (`.exe` on Windows) sitting next
/// to the current executable; if none exists there, returns the bare name
/// `running-process-daemon`.
fn daemon_exe_path() -> String {
    let file_name = if cfg!(windows) {
        "running-process-daemon.exe"
    } else {
        "running-process-daemon"
    };
    let sibling = std::env::current_exe().ok().map(|mut location| {
        // Swap the executable's own file name for the daemon's.
        location.pop();
        location.push(file_name);
        location
    });
    match sibling {
        Some(candidate) if candidate.exists() => candidate.to_string_lossy().into_owned(),
        _ => String::from("running-process-daemon"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: `launch_detached` must stay a plain synchronous
    /// function with this exact signature.
    #[test]
    fn launch_detached_has_public_sync_signature() {
        type LaunchFn = fn(&str) -> Result<SpawnedDaemon, ClientError>;
        let _pinned: LaunchFn = launch_detached;
    }

    /// The builder methods must override every piece of captured context.
    #[test]
    fn spawn_command_request_builder_sets_detached_launch_context() {
        let built = SpawnCommandRequest::shell("echo hello")
            .with_cwd("work")
            .with_envs([("A", "1")])
            .with_env("B", "2")
            .with_originator("tool:123");

        let expected_env: Vec<(String, String)> = [("A", "1"), ("B", "2")]
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        assert_eq!(built.command, "echo hello");
        assert_eq!(built.cwd.as_deref(), Some(std::path::Path::new("work")));
        assert_eq!(built.env, expected_env);
        assert_eq!(built.originator.as_deref(), Some("tool:123"));
    }
}