use crate::attach::{self, C_INPUT, C_RESIZE, S_DETACHED, S_EXIT, S_OUTPUT};
use crate::pane::{ExitInfo, OutputHub, Pane};
use crate::paths;
use crate::session;
use anyhow::{Context, Result, anyhow};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{Mutex, Notify, mpsc, watch};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum Request {
Status,
Log {
#[serde(default)]
tail: Option<usize>,
#[serde(default)]
raw: bool,
},
Screenshot {
format: crate::cli::ShotFormat,
#[serde(default)]
trim: bool,
},
Send {
text: String,
#[serde(default = "default_true")]
newline: bool,
},
Resize { cols: u16, rows: u16 },
Restart,
Kill,
Attach {
#[serde(default)]
cols: u16,
#[serde(default)]
rows: u16,
},
Detach,
}
fn default_true() -> bool {
true
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Response {
pub ok: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default)]
pub data: serde_json::Value,
}
impl Response {
pub fn ok(data: serde_json::Value) -> Self {
Self {
ok: true,
error: None,
data,
}
}
pub fn err(msg: impl Into<String>) -> Self {
Self {
ok: false,
error: Some(msg.into()),
data: serde_json::Value::Null,
}
}
}
pub enum LoopMessage {
Restart,
}
#[derive(Clone)]
pub struct Handle {
pub session_id: String,
pub cmd_pane: Arc<Mutex<Arc<Pane>>>,
pub action_tx: mpsc::UnboundedSender<LoopMessage>,
pub hub: Arc<OutputHub>,
pub exit_rx: watch::Receiver<Option<ExitInfo>>,
pub detach_tx: Arc<watch::Sender<u64>>,
pub attached: Arc<AtomicUsize>,
}
impl Handle {
pub fn new(
session_id: String,
cmd_pane: Arc<Pane>,
action_tx: mpsc::UnboundedSender<LoopMessage>,
hub: Arc<OutputHub>,
exit_rx: watch::Receiver<Option<ExitInfo>>,
detach_tx: Arc<watch::Sender<u64>>,
attached: Arc<AtomicUsize>,
) -> Self {
Self {
session_id,
cmd_pane: Arc::new(Mutex::new(cmd_pane)),
action_tx,
hub,
exit_rx,
detach_tx,
attached,
}
}
pub async fn replace_cmd_pane(&self, new_pane: Arc<Pane>) {
let mut g = self.cmd_pane.lock().await;
*g = new_pane;
}
}
pub async fn serve(handle: Handle) -> Result<()> {
let path = paths::control_socket_path(&handle.session_id)?;
let _ = tokio::fs::remove_file(&path).await;
let listener = UnixListener::bind(&path)
.with_context(|| format!("binding control socket at {}", path.display()))?;
tokio::spawn(async move {
loop {
let stream = match listener.accept().await {
Ok((s, _)) => s,
Err(_) => break,
};
let h = handle.clone();
tokio::spawn(async move {
let _ = handle_conn(stream, h).await;
});
}
});
Ok(())
}
async fn handle_conn(stream: UnixStream, handle: Handle) -> Result<()> {
let (rd, mut wr) = stream.into_split();
let mut br = BufReader::new(rd);
let mut line = String::new();
let n = br.read_line(&mut line).await?;
if n == 0 {
return Ok(());
}
let req = match serde_json::from_str::<Request>(line.trim()) {
Ok(req) => req,
Err(e) => {
let resp = Response::err(format!("invalid request: {e}"));
let mut bytes = serde_json::to_vec(&resp)?;
bytes.push(b'\n');
wr.write_all(&bytes).await?;
wr.flush().await?;
return Ok(());
}
};
if let Request::Attach { cols, rows } = req {
return handle_attach(br.into_inner(), wr, handle, cols, rows).await;
}
let resp = match dispatch(req, &handle).await {
Ok(data) => Response::ok(data),
Err(e) => Response::err(format!("{e}")),
};
let mut bytes = serde_json::to_vec(&resp)?;
bytes.push(b'\n');
wr.write_all(&bytes).await?;
wr.flush().await?;
wr.shutdown().await?;
Ok(())
}
async fn dispatch(req: Request, handle: &Handle) -> Result<serde_json::Value> {
match req {
Request::Status => {
let status = session::read_status(&handle.session_id).await?;
let mut obj = serde_json::to_value(status)?;
if let serde_json::Value::Object(map) = &mut obj {
let path = paths::output_log_path(&handle.session_id)?;
let output_bytes = tokio::fs::metadata(&path)
.await
.map(|m| m.len())
.unwrap_or(0);
let screen_seq = handle.cmd_pane.lock().await.clone().screen_seq();
map.insert("output_bytes".into(), output_bytes.into());
map.insert("screen_seq".into(), screen_seq.into());
}
Ok(obj)
}
Request::Log { tail, raw } => {
let path = paths::output_log_path(&handle.session_id)?;
read_log(&path, tail, raw).await
}
Request::Screenshot { format, trim } => {
let pane = handle.cmd_pane.lock().await.clone();
Ok(pane.screenshot(format, trim))
}
Request::Send { text, newline } => {
let pane = handle.cmd_pane.lock().await.clone();
pane.write_input(text.as_bytes());
let mut sent = text.len();
if newline {
pane.write_input(b"\n");
sent += 1;
}
Ok(serde_json::json!({ "sent": sent }))
}
Request::Resize { cols, rows } => {
let pane = handle.cmd_pane.lock().await.clone();
pane.resize(rows, cols);
Ok(serde_json::json!({ "cols": cols, "rows": rows }))
}
Request::Kill => {
let pane = handle.cmd_pane.lock().await.clone();
pane.kill();
Ok(serde_json::json!({"killed": true}))
}
Request::Restart => {
handle
.action_tx
.send(LoopMessage::Restart)
.map_err(|_| anyhow!("main loop is gone"))?;
Ok(serde_json::json!({"restart": "queued"}))
}
Request::Detach => {
let v = *handle.detach_tx.borrow();
let _ = handle.detach_tx.send(v.wrapping_add(1));
Ok(serde_json::json!({"detached": true}))
}
Request::Attach { .. } => unreachable!("attach handled before dispatch"),
}
}
async fn handle_attach(
rd: tokio::net::unix::OwnedReadHalf,
mut wr: tokio::net::unix::OwnedWriteHalf,
handle: Handle,
cols: u16,
rows: u16,
) -> Result<()> {
handle.attached.fetch_add(1, Ordering::SeqCst);
let _attached_guard = AttachedGuard(handle.attached.clone());
if cols > 0 && rows > 0 {
handle.cmd_pane.lock().await.clone().resize(rows, cols);
}
let mut output = handle.hub.subscribe();
let mut exit_rx = handle.exit_rx.clone();
let mut detach_rx = handle.detach_tx.subscribe();
let already_exited = exit_rx.borrow().is_some();
let gone = Arc::new(Notify::new());
let reader = {
let gone = gone.clone();
let handle = handle.clone();
tokio::spawn(async move {
let mut rd = rd;
loop {
match attach::read_frame(&mut rd).await {
Ok(Some((C_INPUT, payload))) => {
handle.cmd_pane.lock().await.clone().write_input(&payload);
}
Ok(Some((C_RESIZE, payload))) if payload.len() == 4 => {
let cols = u16::from_be_bytes([payload[0], payload[1]]);
let rows = u16::from_be_bytes([payload[2], payload[3]]);
handle.cmd_pane.lock().await.clone().resize(rows, cols);
}
Ok(Some(_)) => {}
Ok(None) | Err(_) => break,
}
}
gone.notify_one();
})
};
loop {
tokio::select! {
biased;
data = output.recv() => match data {
Some(bytes) => {
if attach::write_frame(&mut wr, S_OUTPUT, &bytes).await.is_err() {
break;
}
}
None => break,
},
_ = exit_rx.changed() => {
let info = *exit_rx.borrow();
if info.is_some() {
let _ = attach::write_frame(&mut wr, S_EXIT, &attach::exit_payload(info)).await;
break;
}
},
_ = detach_rx.changed() => {
let _ = attach::write_frame(&mut wr, S_DETACHED, &[]).await;
break;
},
_ = gone.notified() => break,
}
if already_exited && output.is_empty() {
let info = *exit_rx.borrow();
let _ = attach::write_frame(&mut wr, S_EXIT, &attach::exit_payload(info)).await;
break;
}
}
reader.abort();
Ok(())
}
async fn read_log(path: &Path, tail: Option<usize>, raw: bool) -> Result<serde_json::Value> {
let bytes = match tokio::fs::read(path).await {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
Err(e) => return Err(e.into()),
};
let processed = if raw {
bytes
} else {
strip_ansi_escapes::strip(&bytes)
};
let text = String::from_utf8_lossy(&processed).into_owned();
let out = match tail {
Some(n) => last_n_lines(&text, n),
None => text,
};
Ok(serde_json::json!({"text": out}))
}
pub fn last_n_lines(text: &str, n: usize) -> String {
if n == 0 {
return String::new();
}
let trimmed = text.strip_suffix('\n').unwrap_or(text);
let mut start = 0;
for (seen, (i, _)) in trimmed.rmatch_indices('\n').enumerate() {
if seen + 1 == n {
start = i + 1;
break;
}
}
text[start..].to_string()
}
struct AttachedGuard(Arc<AtomicUsize>);
impl Drop for AttachedGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, Ordering::SeqCst);
}
}
pub fn cleanup(session_id: &str) {
if let Ok(path) = paths::control_socket_path(session_id) {
let _ = std::fs::remove_file(path);
}
}
#[cfg(test)]
mod tests {
use super::last_n_lines;
#[test]
fn tail_respects_trailing_newline() {
assert_eq!(last_n_lines("a\nb\nc\n", 2), "b\nc\n");
assert_eq!(last_n_lines("a\nb\nc\n", 1), "c\n");
}
#[test]
fn tail_without_trailing_newline() {
assert_eq!(last_n_lines("a\nb\nc", 2), "b\nc");
}
#[test]
fn tail_larger_than_available_returns_all() {
assert_eq!(last_n_lines("a\nb\n", 10), "a\nb\n");
}
#[test]
fn tail_zero_is_empty() {
assert_eq!(last_n_lines("a\nb\n", 0), "");
}
#[test]
fn tail_empty_input() {
assert_eq!(last_n_lines("", 5), "");
}
}