use anyhow::{Context, Result, anyhow};
#[cfg(unix)]
use nix::sys::stat::Mode;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, trace, warn};
use crate::cmd::Cmd;
pub trait PaneHandshake: Send {
fn wrapper_command(&self, shell: &str) -> String;
fn script_content(&self, shell: &str) -> String {
self.wrapper_command(shell)
}
fn wait(self: Box<Self>) -> Result<()>;
}
const HANDSHAKE_TIMEOUT_SECS: u64 = 5;
pub struct TmuxHandshake {
channel: String,
}
impl TmuxHandshake {
pub fn new() -> Result<Self> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let pid = std::process::id();
let channel = format!("wm_ready_{}_{}", pid, nanos);
Cmd::new("tmux")
.args(&["wait-for", "-L", &channel])
.run()
.context("Failed to initialize wait channel")?;
Ok(Self { channel })
}
}
impl PaneHandshake for TmuxHandshake {
fn wrapper_command(&self, shell: &str) -> String {
let escaped_shell = super::util::escape_for_sh_c_inner_single_quote(shell);
format!(
"sh -c \"stty -echo 2>/dev/null; tmux wait-for -U {}; stty echo 2>/dev/null; exec '{}' -l\"",
self.channel, escaped_shell
)
}
fn script_content(&self, shell: &str) -> String {
format!(
"stty -echo 2>/dev/null; tmux wait-for -U {}; stty echo 2>/dev/null; exec '{}' -l",
self.channel, shell
)
}
fn wait(self: Box<Self>) -> Result<()> {
debug!(channel = %self.channel, "tmux:handshake start");
let mut child = std::process::Command::new("tmux")
.args(["wait-for", "-L", &self.channel])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.context("Failed to spawn tmux wait-for command")?;
let start = Instant::now();
let timeout = Duration::from_secs(HANDSHAKE_TIMEOUT_SECS);
loop {
match child.try_wait() {
Ok(Some(status)) => {
if status.success() {
Cmd::new("tmux")
.args(&["wait-for", "-U", &self.channel])
.run()
.context("Failed to cleanup wait channel")?;
debug!(channel = %self.channel, "tmux:handshake success");
return Ok(());
} else {
let _ = Cmd::new("tmux")
.args(&["wait-for", "-U", &self.channel])
.run();
warn!(channel = %self.channel, status = ?status.code(), "tmux:handshake failed (wait-for error)");
return Err(anyhow!(
"Pane handshake failed - tmux wait-for returned error"
));
}
}
Ok(None) => {
if start.elapsed() >= timeout {
let _ = child.kill();
let _ = child.wait();
let _ = Cmd::new("tmux")
.args(&["wait-for", "-U", &self.channel])
.run();
warn!(
channel = %self.channel,
timeout_secs = HANDSHAKE_TIMEOUT_SECS,
"tmux:handshake timeout"
);
return Err(anyhow!(
"Pane handshake timed out after {}s - shell may have failed to start",
HANDSHAKE_TIMEOUT_SECS
));
}
trace!(
channel = %self.channel,
elapsed_ms = start.elapsed().as_millis(),
"tmux:handshake waiting"
);
thread::sleep(Duration::from_millis(50));
}
Err(e) => {
let _ = child.kill();
let _ = child.wait();
let _ = Cmd::new("tmux")
.args(&["wait-for", "-U", &self.channel])
.run();
warn!(channel = %self.channel, error = %e, "tmux:handshake error");
return Err(anyhow!("Error waiting for pane handshake: {}", e));
}
}
}
}
}
#[cfg(unix)]
pub struct UnixPipeHandshake {
pipe_path: PathBuf,
}
#[cfg(unix)]
impl UnixPipeHandshake {
pub fn new() -> Result<Self> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let pid = std::process::id();
let pipe_path = std::env::temp_dir().join(format!("workmux_pipe_{}_{}", pid, nanos));
let mode = Mode::S_IRUSR | Mode::S_IWUSR;
nix::unistd::mkfifo(&pipe_path, mode).context("Failed to create named pipe")?;
Ok(Self { pipe_path })
}
}
#[cfg(unix)]
impl PaneHandshake for UnixPipeHandshake {
fn wrapper_command(&self, shell: &str) -> String {
let escaped_shell = super::util::escape_for_sh_c_inner_single_quote(shell);
format!(
"sh -c 'echo ready > {}; exec '\\''{}'\\'' -l'",
self.pipe_path.display(),
escaped_shell
)
}
fn script_content(&self, shell: &str) -> String {
format!(
"echo ready > {}; exec '{}' -l",
self.pipe_path.display(),
shell
)
}
fn wait(self: Box<Self>) -> Result<()> {
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
const POLL_INTERVAL_MS: u64 = 50;
let file = std::fs::OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(&self.pipe_path)
.context("Failed to open pipe for reading")?;
let fd = file.as_raw_fd();
let start = Instant::now();
let timeout = Duration::from_secs(HANDSHAKE_TIMEOUT_SECS);
loop {
let mut pollfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let poll_timeout_ms = POLL_INTERVAL_MS as i32;
let ret = unsafe { libc::poll(&mut pollfd, 1, poll_timeout_ms) };
if ret > 0 && (pollfd.revents & libc::POLLIN) != 0 {
let mut buf = [0u8; 64];
let bytes_read =
unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
let _ = std::fs::remove_file(&self.pipe_path);
if bytes_read > 0 {
return Ok(());
} else {
return Err(anyhow!("Pipe closed without receiving handshake signal"));
}
}
if start.elapsed() >= timeout {
let _ = std::fs::remove_file(&self.pipe_path);
return Err(anyhow!(
"Pane handshake timed out after {}s - shell may have failed to start",
HANDSHAKE_TIMEOUT_SECS
));
}
}
}
}
#[cfg(unix)]
impl Drop for UnixPipeHandshake {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.pipe_path);
}
}