use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub(crate) const IPC_POOL_SIZE: usize = 4;
pub(crate) const IPC_QUEUE_CAPACITY: usize = 16;
pub(crate) const IPC_READ_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const IPC_WRITE_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SplitDirection {
Horizontal,
Vertical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
pub enum IpcRequest {
Split {
direction: SplitDirection,
pane: Option<usize>,
},
Close {
pane: usize,
},
Focus {
pane: usize,
},
Equalize,
List,
Layout {
spec: String,
},
Exec {
pane: usize,
command: String,
},
Save {
path: String,
},
Load {
path: String,
},
ClearHistory {
pane: usize,
},
SetHistoryLimit {
pane: usize,
lines: usize,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaneInfo {
pub index: usize,
pub id: usize,
pub cols: u16,
pub rows: u16,
pub alive: bool,
pub active: bool,
pub command: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IpcResponse {
pub ok: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub panes: Option<Vec<PaneInfo>>,
}
impl IpcResponse {
pub fn success(message: impl Into<String>) -> Self {
Self {
ok: true,
message: Some(message.into()),
error: None,
panes: None,
}
}
pub fn with_panes(panes: Vec<PaneInfo>) -> Self {
Self {
ok: true,
message: None,
error: None,
panes: Some(panes),
}
}
pub fn error(message: impl Into<String>) -> Self {
Self {
ok: false,
message: None,
error: Some(message.into()),
panes: None,
}
}
}
pub type ResponseSender = std::sync::mpsc::SyncSender<IpcResponse>;
pub fn socket_path() -> PathBuf {
socket_path_for_pid(std::process::id())
}
pub fn socket_path_for_pid(pid: u32) -> PathBuf {
let dir = std::env::var("XDG_RUNTIME_DIR").unwrap_or_else(|_| "/tmp".to_string());
PathBuf::from(format!("{}/ezpn-{}.sock", dir, pid))
}
pub fn start_listener() -> anyhow::Result<mpsc::Receiver<(IpcRequest, ResponseSender)>> {
let path = socket_path();
let _ = std::fs::remove_file(&path);
let listener = UnixListener::bind(&path)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
}
let (tx, rx) = mpsc::channel();
let pool = spawn_ipc_pool(tx);
std::thread::spawn(move || {
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
if let Err(crossbeam_channel::TrySendError::Full(mut s)) =
pool.work_tx.try_send(stream)
{
let resp = IpcResponse::error("ezpn ipc pool saturated; retry");
let _ = write_response(&mut s, &resp);
drop(s);
}
}
Err(_) => break,
}
}
drop(pool);
}));
});
Ok(rx)
}
struct IpcPool {
work_tx: crossbeam_channel::Sender<UnixStream>,
_workers: Vec<std::thread::JoinHandle<()>>,
}
fn spawn_ipc_pool(cmd_tx: mpsc::Sender<(IpcRequest, ResponseSender)>) -> IpcPool {
let (work_tx, work_rx) = crossbeam_channel::bounded::<UnixStream>(IPC_QUEUE_CAPACITY);
let mut workers = Vec::with_capacity(IPC_POOL_SIZE);
for worker_id in 0..IPC_POOL_SIZE {
let rx = work_rx.clone();
let cmd_tx = cmd_tx.clone();
let handle = std::thread::Builder::new()
.name(format!("ezpn-ipc-{worker_id}"))
.spawn(move || {
while let Ok(stream) = rx.recv() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = stream.set_read_timeout(Some(IPC_READ_TIMEOUT));
let _ = stream.set_write_timeout(Some(IPC_WRITE_TIMEOUT));
handle_client(stream, cmd_tx.clone());
}));
if let Err(payload) = result {
eprintln!(
"ezpn-ipc: worker {worker_id} panicked: {}",
panic_payload_to_string(&payload)
);
}
}
})
.expect("spawn ezpn-ipc worker thread");
workers.push(handle);
}
IpcPool {
work_tx,
_workers: workers,
}
}
fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
if let Some(s) = payload.downcast_ref::<&'static str>() {
return (*s).to_string();
}
if let Some(s) = payload.downcast_ref::<String>() {
return s.clone();
}
"unknown panic payload".to_string()
}
pub fn cleanup() {
let _ = std::fs::remove_file(socket_path());
}
fn handle_client(stream: UnixStream, tx: mpsc::Sender<(IpcRequest, ResponseSender)>) {
let Ok(read_stream) = stream.try_clone() else {
return;
};
let reader = BufReader::new(read_stream);
let mut writer = stream;
for line in reader.lines() {
let line = match line {
Ok(line) => line,
Err(e)
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
{
let _ = write_response(&mut writer, &IpcResponse::error("idle timeout"));
break;
}
Err(_) => break,
};
if line.trim().is_empty() {
continue;
}
let request = match serde_json::from_str::<IpcRequest>(&line) {
Ok(request) => request,
Err(error) => {
let response = IpcResponse::error(format!("invalid request: {}", error));
let _ = write_response(&mut writer, &response);
continue;
}
};
let (resp_tx, resp_rx) = mpsc::sync_channel(1);
if tx.send((request, resp_tx)).is_err() {
let response = IpcResponse::error("listener unavailable");
let _ = write_response(&mut writer, &response);
break;
}
let response = resp_rx
.recv()
.unwrap_or_else(|_| IpcResponse::error("internal error"));
if write_response(&mut writer, &response).is_err() {
break;
}
}
}
fn write_response(writer: &mut UnixStream, response: &IpcResponse) -> anyhow::Result<()> {
writeln!(writer, "{}", serde_json::to_string(response)?)?;
writer.flush()?;
Ok(())
}