stynx-code-tools 3.12.1

Tool implementations for bash and file operations
Documentation
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use stynx_code_errors::{AppError, AppResult};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::Mutex;
use tokio::sync::mpsc;

const MAX_BG_OUTPUT: usize = 1_000_000;
const READ_CHUNK: usize = 8192;

pub struct ShellRegistry {
    persistent: Mutex<Option<PersistentShell>>,
    background: Mutex<HashMap<String, BgProcess>>,
    next_bg_id: Mutex<u64>,
}

struct PersistentShell {
    stdin: ChildStdin,
    out_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    child: Child,
    leftover: Vec<u8>,
}

struct BgProcess {
    cmd: String,
    started_at: Instant,
    output: Arc<Mutex<Vec<u8>>>,
    exit: Arc<Mutex<Option<i32>>>,
    child: Arc<Mutex<Option<Child>>>,
    last_read_pos: Arc<Mutex<usize>>,
}

impl ShellRegistry {
    pub fn new() -> Self {
        Self {
            persistent: Mutex::new(None),
            background: Mutex::new(HashMap::new()),
            next_bg_id: Mutex::new(1),
        }
    }

    pub async fn run_sync(&self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
        if let Some(reason) = detect_interactive(command) {
            return Err(AppError::Tool(format!(
                "{reason}\n\
                The persistent shell has no TTY — interactive prompts will deadlock and corrupt the TUI. \
                Options: (a) use non-interactive flags (ssh: -o BatchMode=yes with key auth; sudo: -n with passwordless sudoers); \
                (b) pre-share the secret via env / config; (c) suggest the user runs `! <command>` themselves to handle the prompt interactively."
            )));
        }
        let mut guard = self.persistent.lock().await;
        let need_spawn = match guard.as_mut() {
            None => true,
            Some(s) => s.is_dead(),
        };
        if need_spawn {
            *guard = Some(PersistentShell::spawn().await?);
        }
        let result = guard.as_mut().unwrap().run(command, timeout).await;
        if let Err(AppError::Tool(ref msg)) = result {
            if msg.starts_with("command timed out") {
                tracing::warn!(command = %command, "bash command timed out, resetting persistent shell");
                *guard = None;
            }
        }
        result
    }

    pub async fn run_background(&self, command: &str) -> AppResult<String> {
        let handle = {
            let mut id_guard = self.next_bg_id.lock().await;
            let id = *id_guard;
            *id_guard += 1;
            format!("bg{id}")
        };

        let mut child = Command::new("bash")
            .arg("-c")
            .arg(command)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .map_err(|e| AppError::Tool(format!("bg spawn failed: {e}")))?;

        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();
        let output = Arc::new(Mutex::new(Vec::<u8>::new()));
        let exit = Arc::new(Mutex::new(None::<i32>));
        let child_arc = Arc::new(Mutex::new(Some(child)));

        spawn_bg_reader(stdout, output.clone());
        spawn_bg_reader(stderr, output.clone());

        let exit_clone = exit.clone();
        let child_clone = child_arc.clone();
        tokio::spawn(async move {
            let code = {
                let mut g = child_clone.lock().await;
                match g.as_mut() {
                    Some(c) => c.wait().await.ok().and_then(|s| s.code()),
                    None => None,
                }
            };
            *exit_clone.lock().await = code;
        });

        let bg = BgProcess {
            cmd: command.to_string(),
            started_at: Instant::now(),
            output,
            exit,
            child: child_arc,
            last_read_pos: Arc::new(Mutex::new(0)),
        };
        self.background.lock().await.insert(handle.clone(), bg);
        Ok(handle)
    }

    pub async fn read_background(&self, handle: &str, full: bool) -> AppResult<String> {
        let guard = self.background.lock().await;
        let bg = guard
            .get(handle)
            .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
        let output = bg.output.lock().await;
        let exit = *bg.exit.lock().await;
        let mut pos_guard = bg.last_read_pos.lock().await;
        let start = if full { 0 } else { (*pos_guard).min(output.len()) };
        let text = String::from_utf8_lossy(&output[start..]).into_owned();
        *pos_guard = output.len();

        let elapsed = bg.started_at.elapsed().as_secs();
        let status = match exit {
            Some(c) if c == 0 => format!("[exit 0, ran {elapsed}s]"),
            Some(c) => format!("[exit {c}, ran {elapsed}s]"),
            None => format!("[running, {elapsed}s]"),
        };
        if text.is_empty() {
            Ok(format!("{status}  (no new output)"))
        } else {
            Ok(format!("{status}\n{text}"))
        }
    }

    pub async fn kill_background(&self, handle: &str) -> AppResult<String> {
        let guard = self.background.lock().await;
        let bg = guard
            .get(handle)
            .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
        let mut child_guard = bg.child.lock().await;
        if let Some(c) = child_guard.as_mut() {
            let _ = c.start_kill();
        }
        Ok(format!("killed {handle}"))
    }

    pub async fn list_background(&self) -> String {
        let guard = self.background.lock().await;
        if guard.is_empty() {
            return "no background processes".into();
        }
        let mut out = String::from("background processes:\n");
        for (k, v) in guard.iter() {
            let exit = *v.exit.lock().await;
            let elapsed = v.started_at.elapsed().as_secs();
            let state = match exit {
                Some(c) => format!("exit {c}"),
                None => "running".into(),
            };
            let cmd = first_line(&v.cmd);
            let cmd = if cmd.len() > 80 { format!("{}", &cmd[..79]) } else { cmd.to_string() };
            out.push_str(&format!("  {k:<6} [{state:<10}] {elapsed}s  $ {cmd}\n"));
        }
        out
    }
}

fn spawn_bg_reader<R>(mut r: R, output: Arc<Mutex<Vec<u8>>>)
where
    R: AsyncReadExt + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let mut buf = vec![0u8; READ_CHUNK];
        loop {
            match r.read(&mut buf).await {
                Ok(0) => break,
                Ok(n) => {
                    let mut g = output.lock().await;
                    g.extend_from_slice(&buf[..n]);
                    if g.len() > MAX_BG_OUTPUT {
                        let excess = g.len() - MAX_BG_OUTPUT;
                        g.drain(..excess);
                    }
                }
                Err(_) => break,
            }
        }
    });
}

impl PersistentShell {
    async fn spawn() -> AppResult<Self> {
        let mut child = Command::new("bash")
            .args(["--norc", "--noprofile"])
            .env("PS1", "")
            .env("PS2", "")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .map_err(|e| AppError::Tool(format!("failed to spawn persistent shell: {e}")))?;

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();

        let (tx, out_rx) = mpsc::unbounded_channel::<Vec<u8>>();
        Self::spawn_reader(stdout, tx.clone());
        Self::spawn_reader(stderr, tx);

        Ok(Self { stdin, out_rx, child, leftover: Vec::new() })
    }

    fn spawn_reader<R>(mut r: R, tx: mpsc::UnboundedSender<Vec<u8>>)
    where
        R: AsyncReadExt + Unpin + Send + 'static,
    {
        tokio::spawn(async move {
            let mut buf = vec![0u8; READ_CHUNK];
            loop {
                match r.read(&mut buf).await {
                    Ok(0) => break,
                    Ok(n) => {
                        if tx.send(buf[..n].to_vec()).is_err() {
                            return;
                        }
                    }
                    Err(_) => break,
                }
            }
        });
    }

    fn is_dead(&mut self) -> bool {
        matches!(self.child.try_wait(), Ok(Some(_)))
    }

    async fn run(&mut self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
        let nonce = nonce_hex();
        let marker = format!("__STYNX_DONE_{nonce}__");

        let payload = format!(
            "{{\n{command}\n}} 2>&1\nprintf '\\n%s:%d\\n' '{marker}' $?\n"
        );
        self.stdin
            .write_all(payload.as_bytes())
            .await
            .map_err(|e| AppError::Tool(format!("shell write failed: {e}")))?;
        self.stdin.flush().await.ok();

        let needle = format!("\n{marker}:");
        let needle_bytes = needle.as_bytes();

        let mut buf = std::mem::take(&mut self.leftover);
        let deadline = timeout.map(|d| Instant::now() + d);

        loop {
            while let Ok(chunk) = self.out_rx.try_recv() {
                buf.extend_from_slice(&chunk);
            }

            if let Some(pos) = find_subslice(&buf, needle_bytes) {
                let after = pos + needle_bytes.len();
                if let Some(nl_rel) = buf[after..].iter().position(|&b| b == b'\n') {
                    let nl_abs = after + nl_rel;
                    let exit_slice = &buf[after..nl_abs];
                    let exit_code = std::str::from_utf8(exit_slice)
                        .ok()
                        .and_then(|s| s.parse::<i32>().ok())
                        .unwrap_or(-1);
                    let output_bytes = &buf[..pos];
                    let text = String::from_utf8_lossy(output_bytes).into_owned();
                    self.leftover = buf[nl_abs + 1..].to_vec();
                    if exit_code == 0 {
                        return Ok(if text.is_empty() { "(no output)".into() } else { text });
                    }
                    return Ok(format!("{text}\n[exit {exit_code}]"));
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    self.leftover = buf;
                    let secs = timeout.map(|t| t.as_secs()).unwrap_or(0);
                    return Err(AppError::Tool(format!(
                        "command timed out after {secs}s. \
the persistent shell is now in an unknown state — consider running \
the command with background:true if it's a long-running process."
                    )));
                }
            }

            match tokio::time::timeout(Duration::from_millis(40), self.out_rx.recv()).await {
                Ok(Some(chunk)) => buf.extend_from_slice(&chunk),
                Ok(None) => {
                    self.leftover = buf;
                    return Err(AppError::Tool(
                        "persistent shell stdout closed unexpectedly".into(),
                    ));
                }
                Err(_) => {}
            }
        }
    }
}

fn detect_interactive(command: &str) -> Option<String> {
    let trimmed = command.trim();
    let head = trimmed.split('|').next().unwrap_or("").trim();
    let mut tokens = head.split_whitespace();
    let Some(first) = tokens.next() else { return None; };
    let rest: Vec<&str> = tokens.collect();

    match first {
        "ssh" => {
            let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
            let has_i_arg = rest.iter().any(|t| *t == "-i" || t.starts_with("-i") && t.len() > 2);
            if !has_batchmode && !has_i_arg {
                return Some("blocked: `ssh` without key auth or `-o BatchMode=yes` will hit a password prompt.".into());
            }
        }
        "scp" | "sftp" | "rsync" => {
            let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
            if !has_batchmode && !rest.iter().any(|t| *t == "-i") {
                return Some(format!("blocked: `{first}` will prompt for a password without key auth or `-o BatchMode=yes`."));
            }
        }
        "sudo" => {
            let has_n = rest.iter().any(|t| *t == "-n" || *t == "--non-interactive");
            if !has_n {
                return Some("blocked: `sudo` without `-n` will prompt for a password.".into());
            }
        }
        "passwd" | "su" | "login" => {
            return Some(format!("blocked: `{first}` is interactive and cannot run in the persistent shell."));
        }
        "vim" | "vi" | "nvim" | "nano" | "emacs" | "less" | "more" | "top" | "htop" | "btop" | "watch" | "tmux" | "screen" | "man" => {
            return Some(format!("blocked: `{first}` is a TUI/curses program and will deadlock the persistent shell."));
        }
        "mysql" | "psql" | "sqlite3" | "redis-cli" | "mongo" | "mongosh" => {
            let has_command_flag = rest.iter().any(|t| matches!(*t, "-c" | "--command" | "-e" | "--execute"));
            if !has_command_flag {
                return Some(format!("blocked: `{first}` without `-c`/`-e` opens an interactive REPL that will deadlock."));
            }
        }
        "gpg" | "ssh-keygen" | "ssh-add" => {
            let has_batch = rest.iter().any(|t| *t == "--batch" || *t == "--pinentry-mode=loopback" || *t == "-N");
            if !has_batch {
                return Some(format!("blocked: `{first}` may prompt for a passphrase — pass `--batch` or `-N \"\"` if you know what you're doing."));
            }
        }
        _ => {}
    }
    None
}

fn find_subslice(hay: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() || hay.len() < needle.len() {
        return None;
    }
    hay.windows(needle.len()).position(|w| w == needle)
}

fn nonce_hex() -> String {

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let pid = std::process::id() as u128;
    format!("{:032x}", now ^ (pid << 96))
}

fn first_line(s: &str) -> &str {
    s.lines().next().unwrap_or(s)
}