use crate::client::{ClientError, DaemonClient};
use crate::paths;
use interprocess::local_socket::Stream;
use interprocess::TryClone;
use prost::Message;
use running_process_proto::daemon::{
pty_input_frame::Frame as InputOneof, AttachPtySessionRequest, AttachPtySessionResponse,
DaemonRequest, DaemonResponse, DetachPtySessionRequest, KeyValue, ListPtySessionsRequest,
ListPtySessionsResponse, PtyInputFrame, PtyResize, PtySessionInfo, PtyStreamFrame, RequestType,
SpawnPtySessionRequest, SpawnPtySessionResponse, StatusCode, TerminatePtySessionRequest,
};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct PtySpawnRequest {
pub argv: Vec<String>,
pub cwd: Option<PathBuf>,
pub env: Vec<(String, String)>,
pub clear_inherited_env: bool,
pub rows: u16,
pub cols: u16,
pub originator: Option<String>,
}
impl PtySpawnRequest {
pub fn new<S: Into<String>>(argv: impl IntoIterator<Item = S>) -> Self {
Self {
argv: argv.into_iter().map(Into::into).collect(),
cwd: None,
env: Vec::new(),
clear_inherited_env: false,
rows: 24,
cols: 80,
originator: None,
}
}
pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
self.cwd = Some(cwd.into());
self
}
pub fn with_size(mut self, rows: u16, cols: u16) -> Self {
self.rows = rows;
self.cols = cols;
self
}
pub fn with_originator(mut self, originator: impl Into<String>) -> Self {
self.originator = Some(originator.into());
self
}
pub fn with_envs<I, K, V>(mut self, env: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
self.env = env
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect();
self
}
}
#[derive(Debug, Clone)]
pub struct SpawnedPtySession {
pub session_id: String,
pub pid: u32,
pub created_at: f64,
}
impl DaemonClient {
pub fn spawn_pty_session(
&mut self,
request: &PtySpawnRequest,
) -> Result<SpawnedPtySession, ClientError> {
let proto = SpawnPtySessionRequest {
argv: request.argv.clone(),
cwd: request
.cwd
.as_ref()
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_default(),
env: request
.env
.iter()
.map(|(k, v)| KeyValue {
key: k.clone(),
value: v.clone(),
})
.collect(),
clear_inherited_env: request.clear_inherited_env,
rows: request.rows as u32,
cols: request.cols as u32,
originator: request.originator.clone().unwrap_or_default(),
};
let daemon_request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::SpawnPtySession.into(),
protocol_version: 1,
client_name: "running-process-client".into(),
spawn_pty_session: Some(proto),
..Default::default()
};
let response = self.send_request(daemon_request)?;
ensure_ok(&response)?;
let payload: SpawnPtySessionResponse =
response.spawn_pty_session.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "spawn_pty_session response missing payload".into(),
})?;
Ok(SpawnedPtySession {
session_id: payload.session_id,
pid: payload.pid,
created_at: payload.created_at,
})
}
pub fn list_pty_sessions(
&mut self,
originator_filter: &str,
) -> Result<Vec<PtySessionInfo>, ClientError> {
let req = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::ListPtySessions.into(),
protocol_version: 1,
client_name: "running-process-client".into(),
list_pty_sessions: Some(ListPtySessionsRequest {
originator: originator_filter.into(),
}),
..Default::default()
};
let response = self.send_request(req)?;
ensure_ok(&response)?;
let payload: ListPtySessionsResponse = response
.list_pty_sessions
.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "list_pty_sessions response missing payload".into(),
})?;
Ok(payload.sessions)
}
pub fn detach_pty_session(&mut self, session_id: &str) -> Result<(), ClientError> {
let req = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::DetachPtySession.into(),
protocol_version: 1,
client_name: "running-process-client".into(),
detach_pty_session: Some(DetachPtySessionRequest {
session_id: session_id.into(),
}),
..Default::default()
};
let response = self.send_request(req)?;
ensure_ok(&response)?;
Ok(())
}
pub fn terminate_pty_session(
&mut self,
session_id: &str,
grace_ms: u32,
) -> Result<(), ClientError> {
let req = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::TerminatePtySession.into(),
protocol_version: 1,
client_name: "running-process-client".into(),
terminate_pty_session: Some(TerminatePtySessionRequest {
session_id: session_id.into(),
grace_ms,
}),
..Default::default()
};
let response = self.send_request(req)?;
ensure_ok(&response)?;
Ok(())
}
}
fn ensure_ok(response: &DaemonResponse) -> Result<(), ClientError> {
if response.code == StatusCode::Ok as i32 {
return Ok(());
}
let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
Err(ClientError::Server {
code,
message: response.message.clone(),
})
}
pub struct PtyAttachment {
reader: BufReader<Stream>,
writer: BufWriter<Stream>,
pub initial_backlog: Vec<u8>,
pub bytes_missed: u64,
}
#[derive(Debug)]
pub enum AttachError {
Connect(std::io::Error),
Io(std::io::Error),
Decode(prost::DecodeError),
Server { code: StatusCode, message: String },
MissingPayload,
}
impl std::fmt::Display for AttachError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AttachError::Connect(e) => write!(f, "attach connect failed: {e}"),
AttachError::Io(e) => write!(f, "attach io error: {e}"),
AttachError::Decode(e) => write!(f, "attach decode error: {e}"),
AttachError::Server { code, message } => {
write!(f, "attach server error {code:?}: {message}")
}
AttachError::MissingPayload => write!(f, "attach response missing payload"),
}
}
}
impl std::error::Error for AttachError {}
impl PtyAttachment {
pub fn attach(
scope_hash: Option<&str>,
session_id: &str,
rows: u16,
cols: u16,
steal: bool,
) -> Result<Self, AttachError> {
let socket_path = paths::socket_path(scope_hash);
Self::attach_to(&socket_path, session_id, rows, cols, steal)
}
pub fn attach_to(
socket_path: &str,
session_id: &str,
rows: u16,
cols: u16,
steal: bool,
) -> Result<Self, AttachError> {
let name = paths::make_socket_name(socket_path).map_err(AttachError::Connect)?;
use interprocess::local_socket::traits::Stream as _;
let stream = Stream::connect(name).map_err(AttachError::Connect)?;
let stream_clone = stream.try_clone().map_err(AttachError::Connect)?;
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream_clone);
let attach_request = DaemonRequest {
id: 1,
r#type: RequestType::AttachPtySession.into(),
protocol_version: 1,
client_name: "running-process-client".into(),
attach_pty_session: Some(AttachPtySessionRequest {
session_id: session_id.into(),
rows: rows as u32,
cols: cols as u32,
steal,
term: std::env::var("TERM").unwrap_or_default(),
is_tty: true,
}),
..Default::default()
};
write_length_prefixed(&mut writer, &attach_request.encode_to_vec())
.map_err(AttachError::Io)?;
let response_bytes = read_length_prefixed(&mut reader).map_err(AttachError::Io)?;
let response =
DaemonResponse::decode(&response_bytes[..]).map_err(AttachError::Decode)?;
if response.code != StatusCode::Ok as i32 {
let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
return Err(AttachError::Server {
code,
message: response.message,
});
}
let payload: AttachPtySessionResponse = response
.attach_pty_session
.ok_or(AttachError::MissingPayload)?;
Ok(Self {
reader,
writer,
initial_backlog: payload.backlog,
bytes_missed: payload.bytes_missed,
})
}
pub fn recv_frame(&mut self) -> Result<PtyStreamFrame, AttachError> {
let bytes = read_length_prefixed(&mut self.reader).map_err(AttachError::Io)?;
PtyStreamFrame::decode(&bytes[..]).map_err(AttachError::Decode)
}
pub fn recv_frame_with_timeout(
&mut self,
timeout: Duration,
) -> Result<Option<PtyStreamFrame>, AttachError> {
let deadline = std::time::Instant::now() + timeout;
loop {
if !self.reader.buffer().is_empty() {
return self.recv_frame().map(Some);
}
if std::time::Instant::now() >= deadline {
return Ok(None);
}
std::thread::sleep(Duration::from_millis(20));
}
}
pub fn send_input(&mut self, bytes: &[u8]) -> Result<(), AttachError> {
let frame = PtyInputFrame {
frame: Some(InputOneof::Input(bytes.to_vec())),
};
write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
}
pub fn resize(&mut self, rows: u16, cols: u16) -> Result<(), AttachError> {
let frame = PtyInputFrame {
frame: Some(InputOneof::Resize(PtyResize {
rows: rows as u32,
cols: cols as u32,
})),
};
write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
}
pub fn send_interrupt(&mut self) -> Result<(), AttachError> {
let frame = PtyInputFrame {
frame: Some(InputOneof::Interrupt(true)),
};
write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
}
pub fn detach(mut self) -> Result<(), AttachError> {
let frame = PtyInputFrame {
frame: Some(InputOneof::Detach(true)),
};
write_length_prefixed(&mut self.writer, &frame.encode_to_vec()).map_err(AttachError::Io)
}
}
fn write_length_prefixed<W: Write>(w: &mut W, payload: &[u8]) -> Result<(), std::io::Error> {
let len = payload.len() as u32;
w.write_all(&len.to_be_bytes())?;
w.write_all(payload)?;
w.flush()
}
fn read_length_prefixed<R: Read>(r: &mut R) -> Result<Vec<u8>, std::io::Error> {
let mut len_buf = [0u8; 4];
r.read_exact(&mut len_buf)?;
let len = u32::from_be_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
r.read_exact(&mut buf)?;
Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pty_spawn_request_builder_defaults() {
let req = PtySpawnRequest::new(["echo", "hi"])
.with_size(40, 100)
.with_originator("test:1");
assert_eq!(req.argv, vec!["echo".to_string(), "hi".to_string()]);
assert_eq!(req.rows, 40);
assert_eq!(req.cols, 100);
assert_eq!(req.originator.as_deref(), Some("test:1"));
}
}