use crate::cli::ShotFormat;
use crate::control::{Request, Response, last_n_lines};
use crate::paths;
use crate::session::{self, Meta, State, Status};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use regex::Regex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// Prints all known sessions, most recently changed first.
///
/// Sessions whose metadata cannot be read are skipped; a session whose status
/// cannot be read is treated as `Status::starting()`.
///
/// With `json` set, a pretty-printed JSON array is written (one object per
/// session). Otherwise a fixed-width table is printed, or `(no sessions)` when
/// there is nothing to show.
///
/// # Errors
/// Fails if the session ids cannot be listed or the JSON cannot be serialized.
pub async fn list(json: bool) -> Result<()> {
    let ids = session::list_ids().await?;
    let mut rows = Vec::new();
    for id in &ids {
        let Ok(meta) = session::read_meta(id).await else {
            continue;
        };
        let status = session::read_status(id).await.unwrap_or(Status::starting());
        let note = session::read_note(id).await;
        rows.push((meta, status, note));
    }
    // Newest `last_change` first; the sort is stable, so ties keep listing order.
    rows.sort_by(|a, b| b.1.last_change.cmp(&a.1.last_change));

    if json {
        let mut arr: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
        for (meta, status, note) in &rows {
            arr.push(serde_json::json!({
                "id": meta.id,
                "cmd": meta.cmd,
                "state": status.state,
                "alive": is_owner_alive(meta, status),
                "exit_code": status.exit_code,
                "started_at": meta.started_at,
                "last_change": status.last_change,
                "note": note,
            }));
        }
        println!("{}", serde_json::to_string_pretty(&arr)?);
        return Ok(());
    }

    if rows.is_empty() {
        println!("(no sessions)");
        return Ok(());
    }

    println!("{:<10} {:<8} {:<10} CMD", "ID", "STATE", "AGE");
    for (meta, status, note) in &rows {
        let age = format_age(meta.started_at, Utc::now());
        let cmd = meta.cmd.join(" ");
        // A flagged session gets a marker, followed by the note text if any.
        let flag = match note {
            None => String::new(),
            Some(text) if text.is_empty() => " ⚑".to_string(),
            Some(text) => format!(" ⚑ {text}"),
        };
        println!(
            "{:<10} {:<8} {:<10} {}{}",
            meta.id,
            state_label_for(Some(meta), status),
            age,
            cmd,
            flag,
        );
    }
    Ok(())
}
/// Shows the status of one session.
///
/// The status is requested over the session's control socket. If that request
/// fails or is answered with `ok == false`, the status stored on disk is used
/// instead, augmented with `output_bytes` (size of the output log, `0` if it
/// cannot be determined) and a null `screen_seq`.
///
/// With `json` set, an object `{ "session": ..., "status": ... }` is
/// pretty-printed; otherwise a short human-readable summary is written.
///
/// # Errors
/// Fails if the session cannot be resolved, the on-disk status cannot be read
/// in the fallback path, or the status data cannot be (de)serialized.
pub async fn status(session: Option<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;

    let data = match request(&id, &Request::Status).await {
        Ok(resp) if resp.ok => resp.data,
        _ => {
            let mut value = serde_json::to_value(session::read_status(&id).await?)?;
            if let Some(fields) = value.as_object_mut() {
                let output_bytes = match paths::output_log_path(&id) {
                    Ok(log) => match tokio::fs::metadata(&log).await {
                        Ok(md) => md.len(),
                        Err(_) => 0,
                    },
                    Err(_) => 0,
                };
                fields.insert("output_bytes".into(), output_bytes.into());
                fields.insert("screen_seq".into(), serde_json::Value::Null);
            }
            value
        }
    };

    if json {
        let mut out = serde_json::Map::new();
        out.insert("session".into(), serde_json::Value::String(id));
        out.insert("status".into(), data);
        println!("{}", serde_json::to_string_pretty(&out)?);
        return Ok(());
    }

    let parsed: Status = serde_json::from_value(data)?;
    let meta = session::read_meta(&id).await.ok();
    println!("session: {id}");
    if let Some(m) = &meta {
        println!("cmd: {}", m.cmd.join(" "));
    }
    println!("state: {}", state_label_for(meta.as_ref(), &parsed));
    if let Some(code) = parsed.exit_code {
        println!("exit: {code}");
    }
    if let Some(note) = session::read_note(&id).await {
        println!("flag: ⚑ {note}");
    }
    Ok(())
}
/// Prints a session's captured output.
///
/// Modes, in order of precedence:
/// * `follow`: stream new output starting at `since` (or `0`) via `follow_log`.
/// * `since`: print everything from that byte offset, filtered by `grep`.
/// * otherwise: ask the session over its control socket for the log, falling
///   back to reading the log file directly when the request fails or is
///   answered with `ok == false`.
///
/// When `grep` is given, `tail` is applied *after* filtering, so the request
/// (and the file fallback) fetch the untruncated log. `raw` keeps ANSI escape
/// sequences; otherwise the file fallback strips them. `json` wraps the
/// result in an object via `emit_log`.
///
/// # Errors
/// Fails if the session cannot be resolved, the log path cannot be computed,
/// `grep` is not a valid regex, or reading/emitting fails.
pub async fn log(
    session: Option<String>,
    tail: Option<usize>,
    grep: Option<String>,
    raw: bool,
    since: Option<u64>,
    follow: bool,
    json: bool,
) -> Result<()> {
    let id = session::resolve(session).await?;
    let path = paths::output_log_path(&id)?;
    let re = match grep.as_deref() {
        Some(pattern) => Some(Regex::new(pattern).context("invalid --grep regex")?),
        None => None,
    };

    if follow {
        let start = since.unwrap_or(0);
        return follow_log(&id, &path, raw, start, re.as_ref()).await;
    }

    if let Some(off) = since {
        let (text, offset) = read_slice(&path, off, raw).await?;
        return emit_log(&id, grep_filter(text, re.as_ref()), offset, json).await;
    }

    // With a filter active the tail must be taken after filtering, so the
    // unfiltered fetch is not truncated.
    let server_tail = tail.filter(|_| re.is_none());
    let live = request(
        &id,
        &Request::Log {
            tail: server_tail,
            raw,
        },
    )
    .await;

    let text = match live {
        Ok(resp) if resp.ok => match resp.data.get("text").and_then(|v| v.as_str()) {
            Some(s) => s.to_string(),
            None => String::new(),
        },
        _ => {
            // Fall back to the log file itself.
            let mut bytes = tokio::fs::read(&path).await.unwrap_or_default();
            if !raw {
                bytes = strip_ansi_escapes::strip(&bytes);
            }
            let whole = String::from_utf8_lossy(&bytes).into_owned();
            match server_tail {
                Some(n) => last_n_lines(&whole, n),
                None => whole,
            }
        }
    };

    let text = if re.is_some() {
        let filtered = grep_filter(text, re.as_ref());
        match tail {
            Some(n) => last_n_lines(&filtered, n),
            None => filtered,
        }
    } else {
        text
    };

    let offset = match tokio::fs::metadata(&path).await {
        Ok(md) => md.len(),
        Err(_) => 0,
    };
    emit_log(&id, text, offset, json).await
}
/// Keeps only the lines of `text` that match `re`.
///
/// Without a regex the text is returned untouched. With one, every matching
/// line is emitted followed by a single `\n`, whatever terminator it had in
/// the input.
fn grep_filter(text: String, re: Option<&Regex>) -> String {
    match re {
        None => text,
        Some(pattern) => text
            .lines()
            .filter(|line| pattern.is_match(line))
            .fold(String::new(), |mut kept, line| {
                kept.push_str(line);
                kept.push('\n');
                kept
            }),
    }
}
/// Prints a rendering of the session's screen in the requested `format`.
///
/// The screenshot is requested over the control socket; if that fails or is
/// answered with `ok == false`, it is rendered from the output log on disk
/// with `crate::render::render_log` (an unreadable log counts as empty).
///
/// For the plain and ANSI formats only the `text` field of the result is
/// printed (nothing if it is absent); the JSON format pretty-prints the whole
/// result.
///
/// # Errors
/// Fails if the session cannot be resolved, the log path cannot be computed in
/// the fallback path, or the JSON cannot be serialized.
pub async fn screenshot(session: Option<String>, format: ShotFormat, trim: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let live = request(&id, &Request::Screenshot { format, trim }).await;
    let data = match live {
        Ok(resp) if resp.ok => resp.data,
        _ => {
            let log = paths::output_log_path(&id)?;
            let bytes = tokio::fs::read(&log).await.unwrap_or_default();
            crate::render::render_log(&bytes, format, trim)
        }
    };
    match format {
        ShotFormat::Json => println!("{}", serde_json::to_string_pretty(&data)?),
        ShotFormat::Plain | ShotFormat::Ansi => {
            let text = data.get("text").and_then(|v| v.as_str());
            if let Some(text) = text {
                println!("{text}");
            }
        }
    }
    Ok(())
}
/// Writes log `text` to stdout.
///
/// In JSON mode a single-line object with `text`, `offset` and `done` (whether
/// the session has reached a terminal state) is printed. Otherwise the text is
/// printed verbatim, without an added newline.
async fn emit_log(id: &str, text: String, offset: u64, json: bool) -> Result<()> {
    if !json {
        print!("{text}");
        return Ok(());
    }
    let done = is_finished(id).await;
    let payload = serde_json::json!({ "text": text, "offset": offset, "done": done });
    println!("{}", serde_json::to_string(&payload)?);
    Ok(())
}
/// Reads the log at `path` from byte offset `off` to its current end.
///
/// Returns the decoded text together with the offset to resume from on the
/// next call. Bytes that `safe_prefix_len` holds back (an unfinished escape
/// sequence or a truncated UTF-8 character at the end) are not consumed, so
/// the returned offset may be smaller than the file length.
///
/// * A missing file yields empty text and the unchanged offset.
/// * If `off` is at or beyond the end of the file, empty text and the file
///   length are returned.
/// * Unless `raw` is set, ANSI escape sequences are stripped from the text.
///
/// # Errors
/// Propagates any I/O error other than "file not found".
async fn read_slice(path: &std::path::Path, off: u64, raw: bool) -> Result<(String, u64)> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let mut file = match tokio::fs::File::open(path).await {
        Ok(file) => file,
        Err(err) => {
            if err.kind() == std::io::ErrorKind::NotFound {
                return Ok((String::new(), off));
            }
            return Err(err.into());
        }
    };

    let file_len = file.metadata().await?.len();
    if file_len <= off {
        return Ok((String::new(), file_len));
    }

    file.seek(std::io::SeekFrom::Start(off)).await?;
    let mut unread = Vec::new();
    file.read_to_end(&mut unread).await?;

    // Drop the part that must wait for more bytes before it can be decoded.
    let keep = safe_prefix_len(&unread);
    unread.truncate(keep);

    let cleaned = if raw {
        unread
    } else {
        strip_ansi_escapes::strip(&unread)
    };
    let text = String::from_utf8_lossy(&cleaned).into_owned();
    Ok((text, off + keep as u64))
}
fn safe_prefix_len(bytes: &[u8]) -> usize {
let mut end = bytes.len();
if let Some(esc) = bytes.iter().rposition(|&b| b == 0x1b)
&& !escape_complete(&bytes[esc..])
{
end = esc;
}
if let Err(e) = std::str::from_utf8(&bytes[..end])
&& e.error_len().is_none()
{
end = e.valid_up_to();
}
end
}
/// Reports whether the escape sequence at the start of `tail` is complete.
///
/// `tail` is expected to begin with `ESC` (0x1b); the byte after it selects
/// the kind of sequence:
///
/// * nothing after `ESC`: incomplete.
/// * `[` (CSI): complete once a final byte in `0x40..=0x7e` follows.
/// * `]` (OSC): complete once terminated by `BEL` (0x07) or `ESC \`.
/// * `P`, `X`, `^`, `_` (DCS, SOS, PM, APC strings): complete once terminated
///   by `ESC \`.
/// * `O` (SS3): complete once one more byte follows.
/// * an intermediate byte in `0x20..=0x2f` (e.g. `ESC ( B`): complete once a
///   final byte in `0x30..=0x7e` follows.
/// * anything else: a two-byte escape, already complete.
///
/// Bytes after the end of the sequence do not affect the result.
fn escape_complete(tail: &[u8]) -> bool {
    let kind = match tail.get(1) {
        Some(&b) => b,
        None => return false,
    };
    // `tail.get(1)` succeeded, so `tail.len() >= 2` and this slice is in bounds.
    let rest = &tail[2..];
    let has_string_terminator = || rest.windows(2).any(|w| w == [0x1b, b'\\']);

    match kind {
        b'[' => rest.iter().any(|b| (0x40..=0x7e).contains(b)),
        b']' => rest.contains(&0x07) || has_string_terminator(),
        // Without this arm an unterminated control string was reported as
        // complete, and its payload leaked into the next slice as plain text.
        b'P' | b'X' | b'^' | b'_' => has_string_terminator(),
        b'O' => !rest.is_empty(),
        // Without this arm `ESC (` was "complete" and the final byte of
        // `ESC ( B` showed up as a stray literal character.
        0x20..=0x2f => rest.iter().any(|b| (0x30..=0x7e).contains(b)),
        _ => true,
    }
}
/// Returns `true` when the session's stored status is in a terminal state.
///
/// A status that cannot be read counts as "not finished".
async fn is_finished(id: &str) -> bool {
    match session::read_status(id).await {
        Ok(status) => status.state.is_terminal(),
        Err(_) => false,
    }
}
async fn follow_log(
id: &str,
path: &std::path::Path,
raw: bool,
start: u64,
re: Option<&Regex>,
) -> Result<()> {
use std::io::Write as _;
let mut off = start;
let mut idle_after_done = 0u32;
loop {
let (text, new_off) = read_slice(path, off, raw).await?;
let text = grep_filter(text, re);
if !text.is_empty() {
let mut out = std::io::stdout();
let _ = out.write_all(text.as_bytes());
let _ = out.flush();
}
let advanced = new_off > off;
off = new_off;
if is_finished(id).await {
if advanced {
idle_after_done = 0;
} else {
idle_after_done += 1;
if idle_after_done >= 2 {
break;
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok(())
}
/// Sends `req` to the session and returns the response data on success.
///
/// `action` names the operation in error messages.
///
/// # Errors
/// * The request could not be delivered: the error built by
///   `dead_worker_error`.
/// * The session answered with `ok == false`: its error message, or
///   `"{action} failed"` if it supplied none.
async fn operate(id: &str, req: &Request, action: &str) -> Result<serde_json::Value> {
    let resp = match request(id, req).await {
        Ok(resp) => resp,
        Err(err) => return Err(dead_worker_error(id, action, err).await),
    };
    if resp.ok {
        return Ok(resp.data);
    }
    let message = resp.error.unwrap_or_else(|| format!("{action} failed"));
    Err(anyhow!(message))
}
/// Builds the error reported when a control request could not be delivered.
///
/// If the stored status says the session is in a terminal state, the message
/// explains that it has already finished; otherwise it reports that the
/// session is not running and appends the underlying error `err`.
async fn dead_worker_error(id: &str, action: &str, err: anyhow::Error) -> anyhow::Error {
    let finished = matches!(
        session::read_status(id).await,
        Ok(s) if s.state.is_terminal()
    );
    if finished {
        anyhow!("cannot {action}: session {id} has already finished — it no longer accepts input")
    } else {
        anyhow!("cannot {action}: session {id} is not running (its worker is gone): {err}")
    }
}
/// Prints the outcome of a command as one line on stdout.
///
/// In JSON mode `data` is serialized compactly; otherwise `human` is called
/// to produce the message. `human` is not invoked in JSON mode.
fn emit_result(json: bool, data: serde_json::Value, human: impl FnOnce() -> String) -> Result<()> {
    let line = if json {
        serde_json::to_string(&data)?
    } else {
        human()
    };
    println!("{line}");
    Ok(())
}
/// Asks the session to restart and reports the result.
///
/// # Errors
/// Fails if the session cannot be resolved or the request is rejected or
/// cannot be delivered.
pub async fn restart(session: Option<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let reply = operate(&id, &Request::Restart, "restart").await?;
    let describe = || format!("restart queued for session {id}");
    emit_result(json, reply, describe)
}
/// Asks the session to kill its command and reports the result.
///
/// # Errors
/// Fails if the session cannot be resolved or the request is rejected or
/// cannot be delivered.
pub async fn kill(session: Option<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let reply = operate(&id, &Request::Kill, "kill").await?;
    let describe = || format!("killed session {id}");
    emit_result(json, reply, describe)
}
/// Sends `text` to the session as input; `newline` is passed through to the
/// request unchanged.
///
/// Nothing is printed on success unless `json` is set, in which case the
/// response data is written as compact JSON.
///
/// # Errors
/// Fails if the session cannot be resolved or the request is rejected or
/// cannot be delivered.
pub async fn send(session: Option<String>, text: String, newline: bool, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let req = Request::Send { text, newline };
    let reply = operate(&id, &req, "send").await?;
    if !json {
        return Ok(());
    }
    println!("{}", serde_json::to_string(&reply)?);
    Ok(())
}
/// Sends one or more named keys (e.g. `Enter`, `Up`, `C-c`) to the session.
///
/// Every name is translated with `key_to_bytes`; the resulting byte sequences
/// are concatenated and sent as a single input without a trailing newline.
/// Nothing is printed on success unless `json` is set.
///
/// # Errors
/// Fails if the session cannot be resolved, a key name is unknown, or the
/// request is rejected or cannot be delivered.
pub async fn key(session: Option<String>, keys: Vec<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;

    let mut bytes = Vec::new();
    for name in keys.iter() {
        match key_to_bytes(name) {
            Some(seq) => bytes.extend(seq),
            None => {
                return Err(anyhow!(
                    "unknown key `{name}` (try Enter, Tab, Esc, Up, C-c, F1, …)"
                ));
            }
        }
    }
    let text = String::from_utf8(bytes).expect("key sequences are ASCII");

    let req = Request::Send {
        text,
        newline: false,
    };
    let reply = operate(&id, &req, "send keys").await?;
    if json {
        println!("{}", serde_json::to_string(&reply)?);
    }
    Ok(())
}
/// Resizes the session's terminal to `size`, as parsed by
/// `crate::run::parse_size` into columns and rows.
///
/// # Errors
/// Fails if the session cannot be resolved, `size` cannot be parsed, or the
/// request is rejected or cannot be delivered.
pub async fn resize(session: Option<String>, size: String, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let (cols, rows) = crate::run::parse_size(&size)?;
    let req = Request::Resize { cols, rows };
    let reply = operate(&id, &req, "resize").await?;
    let describe = || format!("resized session {id} to {cols}x{rows}");
    emit_result(json, reply, describe)
}
/// Attaches a note to the session, marking it as flagged.
///
/// Without a `message` the note text is `needs attention`.
///
/// # Errors
/// Fails if the session cannot be resolved or the note cannot be written.
pub async fn flag(session: Option<String>, message: Option<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    let msg = match message {
        Some(text) => text,
        None => "needs attention".to_string(),
    };
    session::write_note(&id, &msg).await?;
    let payload = serde_json::json!({ "flagged": true, "note": msg });
    emit_result(json, payload, || format!("flagged session {id}: {msg}"))
}
/// Removes the note from the session, clearing its flag.
///
/// # Errors
/// Fails if the session cannot be resolved or the note cannot be cleared.
pub async fn unflag(session: Option<String>, json: bool) -> Result<()> {
    let id = session::resolve(session).await?;
    session::clear_note(&id).await?;
    let payload = serde_json::json!({ "unflagged": true });
    let describe = || format!("unflagged session {id}");
    emit_result(json, payload, describe)
}
/// Translates a key name into the byte sequence sent to the session.
///
/// Names are matched case-insensitively. Returns `None` for unknown names.
///
/// # Control combinations
/// `C-x`, `Ctrl-x` and `^x` (exactly one character after the prefix) map to
/// the control character in caret notation:
/// * `@`, a letter, `[`, `\`, `]`, `^` or `_` yields `0x00..=0x1f`
///   (e.g. `C-c` is `0x03`, `C-[` is `0x1b`);
/// * `?` yields `0x7f` (DEL).
///
/// # Named keys
/// `Enter`/`Return`/`CR`, `Tab`, `Esc`/`Escape`, `Space`, `Backspace`/`BS`,
/// the arrow keys, `Home`, `End`, `Insert`/`Ins`, `Delete`/`Del`,
/// `PageUp`/`PgUp`, `PageDown`/`PgDn` and `F1` through `F12`.
fn key_to_bytes(name: &str) -> Option<Vec<u8>> {
    let lower = name.to_ascii_lowercase();
    let ctrl = lower
        .strip_prefix("c-")
        .or_else(|| lower.strip_prefix("ctrl-"))
        .or_else(|| name.strip_prefix('^'));
    if let Some([c]) = ctrl.map(str::as_bytes) {
        match c.to_ascii_uppercase() {
            // Caret notation: the control code is the character minus '@'.
            upper @ b'@'..=b'_' => return Some(vec![upper - b'@']),
            b'?' => return Some(vec![0x7f]),
            // Anything else is not a control combination; try the named keys.
            _ => {}
        }
    }
    let seq: &[u8] = match lower.as_str() {
        "enter" | "return" | "cr" => b"\r",
        "tab" => b"\t",
        "esc" | "escape" => b"\x1b",
        "space" => b" ",
        "backspace" | "bs" => b"\x7f",
        "up" => b"\x1b[A",
        "down" => b"\x1b[B",
        "right" => b"\x1b[C",
        "left" => b"\x1b[D",
        "home" => b"\x1b[H",
        "end" => b"\x1b[F",
        "insert" | "ins" => b"\x1b[2~",
        "delete" | "del" => b"\x1b[3~",
        "pageup" | "pgup" => b"\x1b[5~",
        "pagedown" | "pgdn" => b"\x1b[6~",
        "f1" => b"\x1bOP",
        "f2" => b"\x1bOQ",
        "f3" => b"\x1bOR",
        "f4" => b"\x1bOS",
        "f5" => b"\x1b[15~",
        "f6" => b"\x1b[17~",
        "f7" => b"\x1b[18~",
        "f8" => b"\x1b[19~",
        "f9" => b"\x1b[20~",
        "f10" => b"\x1b[21~",
        "f11" => b"\x1b[23~",
        "f12" => b"\x1b[24~",
        _ => return None,
    };
    Some(seq.to_vec())
}
/// Waits until the session's output matches the regex `pattern`.
///
/// Output is read from the log starting at `since` if given, otherwise from
/// the current end of the log when `from_now` is set, otherwise from the
/// beginning. The log is polled every 100 ms and accumulated, so a match may
/// span several reads. `raw` keeps ANSI escape sequences in the searched text.
///
/// On a match the matched text is printed (or, with `json`, an object holding
/// `matched` and the `offset` reached in the log), including when the match is
/// only found in the final output read after the session has finished.
///
/// Returns the process exit code to use:
/// * `0`: the pattern matched;
/// * `1`: the session finished without a match;
/// * `124`: the timeout elapsed first.
///
/// # Errors
/// Fails if the session cannot be resolved, `pattern` or `timeout` is
/// invalid, or the log cannot be read.
#[allow(clippy::too_many_arguments)]
pub async fn expect(
    session: Option<String>,
    pattern: String,
    timeout: String,
    since: Option<u64>,
    from_now: bool,
    raw: bool,
    json: bool,
) -> Result<i32> {
    let id = session::resolve(session).await?;
    let path = paths::output_log_path(&id)?;
    let re = Regex::new(&pattern).context("invalid expect regex")?;
    let timeout = crate::run::parse_timeout(&timeout)?;
    let deadline = timeout.map(|d| std::time::Instant::now() + d);
    let mut off = match since {
        Some(o) => o,
        None if from_now => tokio::fs::metadata(&path)
            .await
            .map(|m| m.len())
            .unwrap_or(0),
        None => 0,
    };

    // Shared by both success paths so they report a match identically.
    let report = |matched: &str, offset: u64| -> Result<()> {
        if json {
            let obj = serde_json::json!({
                "matched": matched,
                "offset": offset,
            });
            println!("{}", serde_json::to_string(&obj)?);
        } else {
            println!("{matched}");
        }
        Ok(())
    };

    let mut buf = String::new();
    loop {
        let (text, new_off) = read_slice(&path, off, raw).await?;
        off = new_off;
        if !text.is_empty() {
            buf.push_str(&text);
            if let Some(m) = re.find(&buf) {
                report(m.as_str(), off)?;
                return Ok(0);
            }
        }
        if is_finished(&id).await {
            // Pick up anything written between the read above and the exit.
            let (tail, tail_off) = read_slice(&path, off, raw).await?;
            buf.push_str(&tail);
            if let Some(m) = re.find(&buf) {
                report(m.as_str(), tail_off)?;
                return Ok(0);
            }
            eprintln!("babysit: session {id} ended before matching /{pattern}/");
            return Ok(1);
        }
        if let Some(dl) = deadline {
            if std::time::Instant::now() >= dl {
                eprintln!("babysit: timed out waiting for /{pattern}/ in session {id}");
                return Ok(124);
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}
/// Waits until the session's output has been quiet for `settle`.
///
/// The size of the output log is polled every 50 ms; any change restarts the
/// quiet period. A log whose size cannot be read counts as size `0`.
///
/// Returns the process exit code to use:
/// * `0`: the log size stayed unchanged for `settle`, or the session finished;
/// * `124`: the timeout elapsed first.
///
/// # Errors
/// Fails if the session cannot be resolved, the log path cannot be computed,
/// or `settle`/`timeout` cannot be parsed.
pub async fn wait_idle(session: Option<String>, settle: String, timeout: String) -> Result<i32> {
    let id = session::resolve(session).await?;
    let path = paths::output_log_path(&id)?;
    let settle = crate::run::parse_duration(&settle)?;
    let timeout = crate::run::parse_timeout(&timeout)?;
    let deadline = timeout.map(|d| std::time::Instant::now() + d);

    let mut last_size = match tokio::fs::metadata(&path).await {
        Ok(md) => md.len(),
        Err(_) => 0,
    };
    let mut quiet_since = std::time::Instant::now();

    loop {
        let size = match tokio::fs::metadata(&path).await {
            Ok(md) => md.len(),
            Err(_) => 0,
        };
        if size == last_size {
            if quiet_since.elapsed() >= settle {
                return Ok(0);
            }
        } else {
            // New output: the quiet period starts over.
            last_size = size;
            quiet_since = std::time::Instant::now();
        }

        if is_finished(&id).await {
            return Ok(0);
        }

        if deadline.is_some_and(|dl| std::time::Instant::now() >= dl) {
            eprintln!("babysit: timed out waiting for session {id} to settle");
            return Ok(124);
        }
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }
}
/// Waits for the session to finish and returns the exit code to use.
///
/// The stored status is polled every 200 ms:
/// * exited: the recorded exit code, or `0` if none was recorded;
/// * killed: the recorded exit code, or `130` if none was recorded;
/// * starting/running while the owning process (`babysit_pid` from the
///   session metadata) is no longer alive: `137`;
/// * the timeout elapsed: `124`.
///
/// A status or metadata file that cannot be read is ignored for that poll.
///
/// # Errors
/// Fails if the session cannot be resolved or `timeout` cannot be parsed.
pub async fn wait(session: Option<String>, timeout: Option<String>) -> Result<i32> {
    let id = session::resolve(session).await?;
    let timeout = timeout
        .map(|s| crate::run::parse_timeout(&s))
        .transpose()?
        .flatten();
    let deadline = timeout.map(|d| std::time::Instant::now() + d);

    loop {
        if let Ok(status) = session::read_status(&id).await {
            match status.state {
                State::Starting | State::Running => {
                    let owner_gone = match session::read_meta(&id).await {
                        Ok(meta) => !session::is_pid_alive(meta.babysit_pid),
                        Err(_) => false,
                    };
                    if owner_gone {
                        eprintln!("babysit: session {id} owner died before exiting");
                        return Ok(137);
                    }
                }
                State::Exited => return Ok(status.exit_code.unwrap_or(0)),
                State::Killed => return Ok(status.exit_code.unwrap_or(130)),
            }
        }

        if deadline.is_some_and(|dl| std::time::Instant::now() >= dl) {
            eprintln!("babysit: timed out waiting for session {id}");
            return Ok(124);
        }
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    }
}
/// Deletes the directories of sessions that are no longer active.
///
/// A session is selected when its status is exited or killed, or when its
/// owning process (`babysit_pid`) is no longer alive (including when the
/// status cannot be read). Sessions without readable metadata are left alone.
///
/// With `dry_run` nothing is removed and the selected sessions are only
/// reported. With `json` a pretty-printed array of `{ id, cmd, deleted }`
/// objects is written (`[]` when nothing was selected); otherwise one line per
/// session is printed. A directory that fails to be removed is reported on
/// stderr and left out of the results.
///
/// # Errors
/// Fails if the session ids cannot be listed, a session directory path cannot
/// be computed, or the JSON cannot be serialized.
pub async fn prune(dry_run: bool, json: bool) -> Result<()> {
    let ids = session::list_ids().await?;
    let mut targets: Vec<(String, Meta)> = Vec::new();
    for id in &ids {
        let Ok(meta) = session::read_meta(id).await else {
            continue;
        };
        let status = session::read_status(id).await.ok();
        let alive = session::is_pid_alive(meta.babysit_pid);
        let finished = matches!(
            status.as_ref().map(|s| s.state),
            Some(State::Exited | State::Killed)
        );
        // Finished sessions always go; anything else only once its owner is gone.
        if finished || !alive {
            targets.push((id.clone(), meta));
        }
    }

    if targets.is_empty() {
        if json {
            println!("[]");
        } else {
            println!("(nothing to prune)");
        }
        return Ok(());
    }

    let deleted = !dry_run;
    let mut results = Vec::new();
    for (id, meta) in &targets {
        let cmd = meta.cmd.join(" ");
        if deleted {
            let dir = paths::session_dir(id)?;
            if let Err(e) = tokio::fs::remove_dir_all(&dir).await {
                eprintln!("babysit: failed to remove {}: {e}", dir.display());
                continue;
            }
        }
        if !json {
            if deleted {
                println!("deleted {id} {cmd}");
            } else {
                println!("would delete {id} {cmd}");
            }
        }
        results.push(serde_json::json!({ "id": id, "cmd": cmd, "deleted": deleted }));
    }

    if json {
        println!("{}", serde_json::to_string_pretty(&results)?);
    }
    Ok(())
}
/// Sends one request to the session's control socket and returns the reply.
///
/// The request is written as a single line of JSON; the reply is read as a
/// single line and parsed as JSON.
///
/// # Errors
/// Fails if the socket path cannot be computed, the connection cannot be
/// established, the exchange fails, or the reply is not a valid `Response`.
async fn request(id: &str, req: &Request) -> Result<Response> {
    let socket = paths::control_socket_path(id)?;
    let mut stream = UnixStream::connect(&socket)
        .await
        .with_context(|| format!("connecting to control socket {}", socket.display()))?;

    let mut payload = serde_json::to_vec(req)?;
    payload.push(b'\n');
    stream.write_all(&payload).await?;
    stream.flush().await?;

    let mut reply = String::new();
    let mut reader = BufReader::new(stream);
    reader.read_line(&mut reply).await?;
    Ok(serde_json::from_str(reply.trim())?)
}
/// Returns the label shown for a session's state.
///
/// A session recorded as starting or running whose owning process is not
/// alive (or whose metadata is unavailable) is labelled `dead`. An exited
/// session is labelled `exit:<code>` when an exit code was recorded.
fn state_label_for(meta: Option<&Meta>, s: &Status) -> String {
    let claims_to_be_live = matches!(s.state, State::Starting | State::Running);
    if claims_to_be_live && !is_owner_alive_meta(meta) {
        return "dead".into();
    }
    match (s.state, s.exit_code) {
        (State::Starting, _) => "starting".into(),
        (State::Running, _) => "running".into(),
        (State::Exited, Some(code)) => format!("exit:{code}"),
        (State::Exited, None) => "exited".into(),
        (State::Killed, _) => "killed".into(),
    }
}
/// Returns whether the session's owning process (`babysit_pid`) is alive.
///
/// Missing metadata counts as "not alive".
fn is_owner_alive_meta(meta: Option<&Meta>) -> bool {
    match meta {
        Some(m) => session::is_pid_alive(m.babysit_pid),
        None => false,
    }
}
/// Returns whether the session is still live: its state is starting or
/// running *and* its owning process (`babysit_pid`) is alive.
///
/// The process check is skipped for sessions in any other state.
fn is_owner_alive(meta: &Meta, s: &Status) -> bool {
    matches!(s.state, State::Starting | State::Running)
        && session::is_pid_alive(meta.babysit_pid)
}
/// Formats the time elapsed between `then` and `now` as a short age.
///
/// The largest fitting unit is used and the value is truncated: `42s`, `5m`,
/// `3h`, `2d`. If `then` is later than `now` the age is `0s`.
fn format_age(then: DateTime<Utc>, now: DateTime<Utc>) -> String {
    // Clamped at zero, so the ranges below cover every possible value.
    let secs = (now - then).num_seconds().max(0);
    match secs {
        0..=59 => format!("{secs}s"),
        60..=3_599 => format!("{}m", secs / 60),
        3_600..=86_399 => format!("{}h", secs / 3600),
        _ => format!("{}d", secs / 86400),
    }
}
#[cfg(test)]
mod tests {
    use super::{escape_complete, grep_filter, key_to_bytes, safe_prefix_len};
    use regex::Regex;

    /// Translates a key name that is expected to be known.
    fn key(name: &str) -> Vec<u8> {
        key_to_bytes(name).unwrap()
    }

    /// Named keys are matched case-insensitively; unknown names give `None`.
    #[test]
    fn key_named_sequences() {
        assert_eq!(key("Enter"), b"\r");
        assert_eq!(key("up"), b"\x1b[A");
        assert_eq!(key("Esc"), b"\x1b");
        assert_eq!(key("F5"), b"\x1b[15~");
        assert!(key_to_bytes("nope").is_none());
    }

    /// All three control prefixes work, regardless of letter case.
    #[test]
    fn key_ctrl_combinations() {
        assert_eq!(key("C-c"), vec![0x03]);
        assert_eq!(key("Ctrl-d"), vec![0x04]);
        assert_eq!(key("^a"), vec![0x01]);
        assert_eq!(key("C-C"), vec![0x03]);
    }

    /// Only matching lines survive, each terminated by a newline.
    #[test]
    fn grep_filter_keeps_matching_lines() {
        let re = Regex::new("err").unwrap();
        let input = String::from("ok\nerror here\nfine\nerr2\n");
        assert_eq!(grep_filter(input, Some(&re)), "error here\nerr2\n");
    }

    /// Without a regex the text is returned unchanged.
    #[test]
    fn grep_filter_none_is_passthrough() {
        let input = String::from("a\nb\n");
        assert_eq!(grep_filter(input, None), "a\nb\n");
    }

    /// A multi-byte character cut in half at the end is held back.
    #[test]
    fn safe_prefix_holds_back_truncated_utf8() {
        // "h" followed by only the first of the two bytes encoding 'é'.
        let bytes = &"héllo".as_bytes()[..2];
        let safe = safe_prefix_len(bytes);
        assert_eq!(safe, 1);
        assert_eq!(&bytes[..safe], b"h");
    }

    /// Fully encoded text is passed through whole.
    #[test]
    fn safe_prefix_keeps_complete_utf8() {
        let bytes = "héllo".as_bytes();
        assert_eq!(safe_prefix_len(bytes), bytes.len());
    }

    /// Invalid (as opposed to truncated) bytes are not held back.
    #[test]
    fn safe_prefix_passes_invalid_bytes_through() {
        let bytes = b"ab\xffcd";
        assert_eq!(safe_prefix_len(bytes), bytes.len());
    }

    /// An unfinished colour sequence is held back until its final byte arrives.
    #[test]
    fn safe_prefix_holds_back_partial_ansi() {
        let partial = b"hi\x1b[31";
        assert_eq!(safe_prefix_len(partial), 2);

        let done = b"hi\x1b[31m";
        assert_eq!(safe_prefix_len(done), done.len());
    }

    /// Bare ESC, CSI, OSC and SS3 sequences, each unfinished and finished.
    #[test]
    fn escape_complete_classifies_common_forms() {
        // Bare ESC.
        assert!(!escape_complete(b"\x1b"));
        // CSI without and with its final byte.
        assert!(!escape_complete(b"\x1b[31"));
        assert!(escape_complete(b"\x1b[31m"));
        // OSC without and with its BEL terminator.
        assert!(!escape_complete(b"\x1b]0;title"));
        assert!(escape_complete(b"\x1b]0;title\x07"));
        // SS3 without and with its final byte.
        assert!(!escape_complete(b"\x1bO"));
        assert!(escape_complete(b"\x1bOP"));
    }
}