pub mod raw_mode;
pub mod server_launcher;
use crate::protocol::{self, ClientMsg, ServerMsg, FrameReader, read_one_message};
use std::io::{self, BufWriter, Read, Write};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use raw_mode::RawMode;
use server_launcher::ensure_server_running;
const DETACH_KEY: u8 = 0x1c;
const FOCUS_IN: u8 = b'I';
const FOCUS_OUT: u8 = b'O';
struct PanicHookGuard;
impl Drop for PanicHookGuard {
fn drop(&mut self) {
let _ = std::panic::take_hook();
}
}
enum DispatchResult {
Continue,
Done,
}
fn dispatch_server_msg(msg: &ServerMsg, stdout: &mut impl Write) -> io::Result<DispatchResult> {
match msg {
ServerMsg::ScrollbackLine(line) => {
stdout.write_all(line)?;
stdout.write_all(b"\r\n")?;
}
ServerMsg::ScreenUpdate(data) => {
stdout.write_all(data)?;
}
ServerMsg::Passthrough(data) => {
stdout.write_all(data)?;
stdout.flush()?;
}
ServerMsg::History(lines) => {
for line in lines {
stdout.write_all(line)?;
stdout.write_all(b"\r\n")?;
}
}
ServerMsg::SessionEnded => {
stdout.flush()?;
eprintln!("[retach: session ended]");
return Ok(DispatchResult::Done);
}
ServerMsg::Error(e) => {
stdout.flush()?;
eprintln!("[retach error: {}]", e);
return Ok(DispatchResult::Done);
}
other => {
tracing::debug!("ignoring unexpected server message: {:?}", std::mem::discriminant(other));
}
}
Ok(DispatchResult::Continue)
}
fn get_terminal_size() -> (u16, u16) {
if let Some(size) = terminal_size::terminal_size() {
(size.0 .0, size.1 .0)
} else {
(crate::session::DEFAULT_COLS, crate::session::DEFAULT_ROWS)
}
}
type SocketWriter = std::sync::Arc<tokio::sync::Mutex<tokio::net::unix::OwnedWriteHalf>>;
async fn run_stdin_to_socket(sw: SocketWriter) -> anyhow::Result<()> {
let mut carry: Vec<u8> = Vec::with_capacity(2);
loop {
let result = tokio::task::spawn_blocking(|| {
let mut buf = [0u8; 1024];
let n = io::stdin().read(&mut buf)?;
Ok::<_, io::Error>((buf, n))
})
.await;
match result {
Ok(Ok((_buf, 0))) => {
if !carry.is_empty() {
let msg = protocol::encode(&ClientMsg::Input(std::mem::take(&mut carry)))?;
let mut w = sw.lock().await;
w.write_all(&msg).await?;
}
break;
}
Ok(Ok((buf, n))) => {
let raw: Vec<u8> = if carry.is_empty() {
buf[..n].to_vec()
} else {
let mut combined = std::mem::take(&mut carry);
combined.extend_from_slice(&buf[..n]);
combined
};
if let Some(pos) = raw.iter().position(|&b| b == DETACH_KEY) {
let mut w = sw.lock().await;
if pos > 0 {
if let Ok(msg) = protocol::encode(&ClientMsg::Input(raw[..pos].to_vec())) {
w.write_all(&msg).await?;
}
}
if let Ok(msg) = protocol::encode(&ClientMsg::Detach) {
w.write_all(&msg).await?;
}
drop(w);
break;
}
let mut filtered = Vec::with_capacity(raw.len());
let mut i = 0;
while i < raw.len() {
if raw[i] == 0x1b {
if i + 1 < raw.len() {
if raw[i + 1] == b'[' {
if i + 2 < raw.len() {
if raw[i + 2] == FOCUS_IN {
if let Ok(msg) = protocol::encode(&ClientMsg::RefreshScreen) {
let mut w = sw.lock().await;
let _ = w.write_all(&msg).await;
}
i += 3;
continue;
} else if raw[i + 2] == FOCUS_OUT {
i += 3;
continue;
}
} else {
carry.extend_from_slice(&raw[i..]);
break;
}
} else {
filtered.push(raw[i]);
i += 1;
continue;
}
} else {
carry.push(0x1b);
break;
}
}
filtered.push(raw[i]);
i += 1;
}
if !filtered.is_empty() {
let msg = protocol::encode(&ClientMsg::Input(filtered))?;
let mut w = sw.lock().await;
w.write_all(&msg).await?;
}
}
Ok(Err(e)) => return Err(anyhow::Error::from(e)),
Err(e) => return Err(anyhow::Error::from(e)),
}
}
Ok(())
}
async fn run_socket_to_stdout(
mut sock_reader: tokio::net::unix::OwnedReadHalf,
leftover: Vec<u8>,
) -> anyhow::Result<()> {
let mut frames = FrameReader::with_leftover(leftover);
let mut stdout = BufWriter::new(io::stdout());
while let Some(msg) = frames.decode_next::<ServerMsg>()? {
if matches!(dispatch_server_msg(&msg, &mut stdout)?, DispatchResult::Done) {
return Ok(());
}
}
stdout.flush()?;
loop {
if !frames.fill_from(&mut sock_reader).await? {
eprintln!("[retach: detached]");
break;
}
while let Some(msg) = frames.decode_next::<ServerMsg>()? {
if matches!(dispatch_server_msg(&msg, &mut stdout)?, DispatchResult::Done) {
return Ok(());
}
}
stdout.flush()?;
}
Ok(())
}
pub async fn connect(name: &str, history: usize, mode: crate::protocol::ConnectMode) -> anyhow::Result<()> {
ensure_server_running().await?;
let mut stream = UnixStream::connect(crate::server::socket_path()?).await?;
let (cols, rows) = get_terminal_size();
let msg = protocol::encode(&ClientMsg::Connect {
name: name.to_string(),
history,
cols,
rows,
mode,
})?;
stream.write_all(&msg).await?;
let mut frames = FrameReader::new();
loop {
if !frames.fill_from(&mut stream).await? {
eprintln!("[retach: server closed connection]");
return Ok(());
}
if let Some(msg) = frames.decode_next::<ServerMsg>()? {
match msg {
ServerMsg::Connected { name: ref session_name, new_session } => {
if new_session {
eprintln!("[retach: new session '{}' (detach: Ctrl+\\)]", session_name);
} else {
eprintln!("[retach: reattached to '{}' (detach: Ctrl+\\)]", session_name);
}
break;
}
ServerMsg::Error(e) => {
eprintln!("[retach error: {}]", e);
return Ok(());
}
_ => {
eprintln!("[retach: unexpected response from server]");
return Ok(());
}
}
}
}
let leftover = frames.into_leftover();
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
raw_mode::emergency_restore();
cleanup_terminal();
prev_hook(info);
}));
let _hook_guard = PanicHookGuard;
let _raw = RawMode::enter()?;
let (sock_reader, sock_writer) = stream.into_split();
let sock_writer = std::sync::Arc::new(tokio::sync::Mutex::new(sock_writer));
let mut sigwinch =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change())?;
let sw = sock_writer.clone();
let sigwinch_handle = tokio::spawn(async move {
while sigwinch.recv().await.is_some() {
let (cols, rows) = get_terminal_size();
let mut w = sw.lock().await;
if let Ok(msg) = protocol::encode(&ClientMsg::Resize { cols, rows }) {
if let Err(e) = w.write_all(&msg).await {
tracing::debug!(error = %e, "failed to send resize");
break;
}
}
if let Ok(msg) = protocol::encode(&ClientMsg::RefreshScreen) {
if let Err(e) = w.write_all(&msg).await {
tracing::debug!(error = %e, "failed to send refresh after resize");
break;
}
}
}
});
let stdin_task = tokio::spawn(run_stdin_to_socket(sock_writer.clone()));
let socket_task = tokio::spawn(run_socket_to_stdout(sock_reader, leftover));
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
tokio::select! {
r = stdin_task => {
if let Ok(Err(e)) = r {
tracing::debug!(error = %e, "stdin task error");
}
}
r = socket_task => {
if let Ok(Err(e)) = r {
tracing::warn!(error = %e, "socket task error");
eprintln!("[retach error: {}]", e);
}
}
_ = sigint.recv() => {
tracing::debug!("received SIGINT, detaching");
}
_ = sigterm.recv() => {
tracing::debug!("received SIGTERM, detaching");
}
}
sigwinch_handle.abort();
drop(_hook_guard);
cleanup_terminal();
Ok(())
}
fn cleanup_terminal() {
let mut stdout = io::stdout();
let _ = stdout.write_all(concat!(
"\x1b[?25h", "\x1b[?7h", "\x1b[?1l", "\x1b[?2004l", "\x1b[?1000l", "\x1b[?1002l", "\x1b[?1003l", "\x1b[?1005l", "\x1b[?1006l", "\x1b[?1004l", "\x1b[?2026l", "\x1b>", "\x1b[0 q", "\x1b[0m", ).as_bytes());
let _ = stdout.flush();
}
pub async fn list_sessions() -> anyhow::Result<()> {
let path = crate::server::socket_path()?;
if !path.exists() {
println!("No active sessions");
return Ok(());
}
let mut stream = UnixStream::connect(&path).await?;
let msg = protocol::encode(&ClientMsg::ListSessions)?;
stream.write_all(&msg).await?;
let resp: ServerMsg = read_one_message(&mut stream).await?;
if let ServerMsg::SessionList(sessions) = resp {
if sessions.is_empty() {
println!("No active sessions");
} else {
for s in sessions {
println!("{} ({}x{})", s.name, s.cols, s.rows);
}
}
}
Ok(())
}
pub async fn kill_session(name: &str) -> anyhow::Result<()> {
let path = crate::server::socket_path()?;
if !path.exists() {
anyhow::bail!("server not running");
}
let mut stream = UnixStream::connect(&path).await?;
let msg = protocol::encode(&ClientMsg::KillSession {
name: name.to_string(),
})?;
stream.write_all(&msg).await?;
let resp: ServerMsg = read_one_message(&mut stream).await?;
match resp {
ServerMsg::SessionKilled { name } => println!("killed session '{}'", name),
ServerMsg::Error(e) => println!("error: {}", e),
_ => {}
}
Ok(())
}