Skip to main content

stynx_code_tools/infrastructure/
persistent_shell.rs

1use std::collections::HashMap;
2use std::process::Stdio;
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use stynx_code_errors::{AppError, AppResult};
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::process::{Child, ChildStdin, Command};
9use tokio::sync::Mutex;
10use tokio::sync::mpsc;
11
12const MAX_BG_OUTPUT: usize = 1_000_000;
13const READ_CHUNK: usize = 8192;
14
15pub struct ShellRegistry {
16    persistent: Mutex<Option<PersistentShell>>,
17    background: Mutex<HashMap<String, BgProcess>>,
18    next_bg_id: Mutex<u64>,
19}
20
21struct PersistentShell {
22    stdin: ChildStdin,
23    out_rx: mpsc::UnboundedReceiver<Vec<u8>>,
24    child: Child,
25    leftover: Vec<u8>,
26}
27
28struct BgProcess {
29    cmd: String,
30    started_at: Instant,
31    output: Arc<Mutex<Vec<u8>>>,
32    exit: Arc<Mutex<Option<i32>>>,
33    child: Arc<Mutex<Option<Child>>>,
34    last_read_pos: Arc<Mutex<usize>>,
35}
36
37impl ShellRegistry {
38    pub fn new() -> Self {
39        Self {
40            persistent: Mutex::new(None),
41            background: Mutex::new(HashMap::new()),
42            next_bg_id: Mutex::new(1),
43        }
44    }
45
46    pub async fn run_sync(&self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
47        if let Some(reason) = detect_interactive(command) {
48            return Err(AppError::Tool(format!(
49                "{reason}\n\
50                The persistent shell has no TTY — interactive prompts will deadlock and corrupt the TUI. \
51                Options: (a) use non-interactive flags (ssh: -o BatchMode=yes with key auth; sudo: -n with passwordless sudoers); \
52                (b) pre-share the secret via env / config; (c) suggest the user runs `! <command>` themselves to handle the prompt interactively."
53            )));
54        }
55        let mut guard = self.persistent.lock().await;
56        let need_spawn = match guard.as_mut() {
57            None => true,
58            Some(s) => s.is_dead(),
59        };
60        if need_spawn {
61            *guard = Some(PersistentShell::spawn().await?);
62        }
63        let result = guard.as_mut().unwrap().run(command, timeout).await;
64        if let Err(AppError::Tool(ref msg)) = result {
65            if msg.starts_with("command timed out") {
66                tracing::warn!(command = %command, "bash command timed out, resetting persistent shell");
67                *guard = None;
68            }
69        }
70        result
71    }
72
73    pub async fn run_background(&self, command: &str) -> AppResult<String> {
74        let handle = {
75            let mut id_guard = self.next_bg_id.lock().await;
76            let id = *id_guard;
77            *id_guard += 1;
78            format!("bg{id}")
79        };
80
81        let mut child = Command::new("bash")
82            .arg("-c")
83            .arg(command)
84            .stdin(Stdio::null())
85            .stdout(Stdio::piped())
86            .stderr(Stdio::piped())
87            .spawn()
88            .map_err(|e| AppError::Tool(format!("bg spawn failed: {e}")))?;
89
90        let stdout = child.stdout.take().unwrap();
91        let stderr = child.stderr.take().unwrap();
92        let output = Arc::new(Mutex::new(Vec::<u8>::new()));
93        let exit = Arc::new(Mutex::new(None::<i32>));
94        let child_arc = Arc::new(Mutex::new(Some(child)));
95
96        spawn_bg_reader(stdout, output.clone());
97        spawn_bg_reader(stderr, output.clone());
98
99        let exit_clone = exit.clone();
100        let child_clone = child_arc.clone();
101        tokio::spawn(async move {
102            let code = {
103                let mut g = child_clone.lock().await;
104                match g.as_mut() {
105                    Some(c) => c.wait().await.ok().and_then(|s| s.code()),
106                    None => None,
107                }
108            };
109            *exit_clone.lock().await = code;
110        });
111
112        let bg = BgProcess {
113            cmd: command.to_string(),
114            started_at: Instant::now(),
115            output,
116            exit,
117            child: child_arc,
118            last_read_pos: Arc::new(Mutex::new(0)),
119        };
120        self.background.lock().await.insert(handle.clone(), bg);
121        Ok(handle)
122    }
123
124    pub async fn read_background(&self, handle: &str, full: bool) -> AppResult<String> {
125        let guard = self.background.lock().await;
126        let bg = guard
127            .get(handle)
128            .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
129        let output = bg.output.lock().await;
130        let exit = *bg.exit.lock().await;
131        let mut pos_guard = bg.last_read_pos.lock().await;
132        let start = if full { 0 } else { (*pos_guard).min(output.len()) };
133        let text = String::from_utf8_lossy(&output[start..]).into_owned();
134        *pos_guard = output.len();
135
136        let elapsed = bg.started_at.elapsed().as_secs();
137        let status = match exit {
138            Some(c) if c == 0 => format!("[exit 0, ran {elapsed}s]"),
139            Some(c) => format!("[exit {c}, ran {elapsed}s]"),
140            None => format!("[running, {elapsed}s]"),
141        };
142        if text.is_empty() {
143            Ok(format!("{status}  (no new output)"))
144        } else {
145            Ok(format!("{status}\n{text}"))
146        }
147    }
148
149    pub async fn kill_background(&self, handle: &str) -> AppResult<String> {
150        let guard = self.background.lock().await;
151        let bg = guard
152            .get(handle)
153            .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
154        let mut child_guard = bg.child.lock().await;
155        if let Some(c) = child_guard.as_mut() {
156            let _ = c.start_kill();
157        }
158        Ok(format!("killed {handle}"))
159    }
160
161    pub async fn list_background(&self) -> String {
162        let guard = self.background.lock().await;
163        if guard.is_empty() {
164            return "no background processes".into();
165        }
166        let mut out = String::from("background processes:\n");
167        for (k, v) in guard.iter() {
168            let exit = *v.exit.lock().await;
169            let elapsed = v.started_at.elapsed().as_secs();
170            let state = match exit {
171                Some(c) => format!("exit {c}"),
172                None => "running".into(),
173            };
174            let cmd = first_line(&v.cmd);
175            let cmd = if cmd.len() > 80 { format!("{}…", &cmd[..79]) } else { cmd.to_string() };
176            out.push_str(&format!("  {k:<6} [{state:<10}] {elapsed}s  $ {cmd}\n"));
177        }
178        out
179    }
180}
181
182fn spawn_bg_reader<R>(mut r: R, output: Arc<Mutex<Vec<u8>>>)
183where
184    R: AsyncReadExt + Unpin + Send + 'static,
185{
186    tokio::spawn(async move {
187        let mut buf = vec![0u8; READ_CHUNK];
188        loop {
189            match r.read(&mut buf).await {
190                Ok(0) => break,
191                Ok(n) => {
192                    let mut g = output.lock().await;
193                    g.extend_from_slice(&buf[..n]);
194                    if g.len() > MAX_BG_OUTPUT {
195                        let excess = g.len() - MAX_BG_OUTPUT;
196                        g.drain(..excess);
197                    }
198                }
199                Err(_) => break,
200            }
201        }
202    });
203}
204
205impl PersistentShell {
206    async fn spawn() -> AppResult<Self> {
207        let mut child = Command::new("bash")
208            .args(["--norc", "--noprofile"])
209            .env("PS1", "")
210            .env("PS2", "")
211            .stdin(Stdio::piped())
212            .stdout(Stdio::piped())
213            .stderr(Stdio::piped())
214            .kill_on_drop(true)
215            .spawn()
216            .map_err(|e| AppError::Tool(format!("failed to spawn persistent shell: {e}")))?;
217
218        let stdin = child.stdin.take().unwrap();
219        let stdout = child.stdout.take().unwrap();
220        let stderr = child.stderr.take().unwrap();
221
222        let (tx, out_rx) = mpsc::unbounded_channel::<Vec<u8>>();
223        Self::spawn_reader(stdout, tx.clone());
224        Self::spawn_reader(stderr, tx);
225
226        Ok(Self { stdin, out_rx, child, leftover: Vec::new() })
227    }
228
229    fn spawn_reader<R>(mut r: R, tx: mpsc::UnboundedSender<Vec<u8>>)
230    where
231        R: AsyncReadExt + Unpin + Send + 'static,
232    {
233        tokio::spawn(async move {
234            let mut buf = vec![0u8; READ_CHUNK];
235            loop {
236                match r.read(&mut buf).await {
237                    Ok(0) => break,
238                    Ok(n) => {
239                        if tx.send(buf[..n].to_vec()).is_err() {
240                            return;
241                        }
242                    }
243                    Err(_) => break,
244                }
245            }
246        });
247    }
248
249    fn is_dead(&mut self) -> bool {
250        matches!(self.child.try_wait(), Ok(Some(_)))
251    }
252
253    async fn run(&mut self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
254        let nonce = nonce_hex();
255        let marker = format!("__STYNX_DONE_{nonce}__");
256
257        let payload = format!(
258            "{{\n{command}\n}} 2>&1\nprintf '\\n%s:%d\\n' '{marker}' $?\n"
259        );
260        self.stdin
261            .write_all(payload.as_bytes())
262            .await
263            .map_err(|e| AppError::Tool(format!("shell write failed: {e}")))?;
264        self.stdin.flush().await.ok();
265
266        let needle = format!("\n{marker}:");
267        let needle_bytes = needle.as_bytes();
268
269        let mut buf = std::mem::take(&mut self.leftover);
270        let deadline = timeout.map(|d| Instant::now() + d);
271
272        loop {
273            while let Ok(chunk) = self.out_rx.try_recv() {
274                buf.extend_from_slice(&chunk);
275            }
276
277            if let Some(pos) = find_subslice(&buf, needle_bytes) {
278                let after = pos + needle_bytes.len();
279                if let Some(nl_rel) = buf[after..].iter().position(|&b| b == b'\n') {
280                    let nl_abs = after + nl_rel;
281                    let exit_slice = &buf[after..nl_abs];
282                    let exit_code = std::str::from_utf8(exit_slice)
283                        .ok()
284                        .and_then(|s| s.parse::<i32>().ok())
285                        .unwrap_or(-1);
286                    let output_bytes = &buf[..pos];
287                    let text = String::from_utf8_lossy(output_bytes).into_owned();
288                    self.leftover = buf[nl_abs + 1..].to_vec();
289                    if exit_code == 0 {
290                        return Ok(if text.is_empty() { "(no output)".into() } else { text });
291                    }
292                    return Ok(format!("{text}\n[exit {exit_code}]"));
293                }
294            }
295
296            if let Some(d) = deadline {
297                if Instant::now() >= d {
298                    self.leftover = buf;
299                    let secs = timeout.map(|t| t.as_secs()).unwrap_or(0);
300                    return Err(AppError::Tool(format!(
301                        "command timed out after {secs}s. \
302the persistent shell is now in an unknown state — consider running \
303the command with background:true if it's a long-running process."
304                    )));
305                }
306            }
307
308            match tokio::time::timeout(Duration::from_millis(40), self.out_rx.recv()).await {
309                Ok(Some(chunk)) => buf.extend_from_slice(&chunk),
310                Ok(None) => {
311                    self.leftover = buf;
312                    return Err(AppError::Tool(
313                        "persistent shell stdout closed unexpectedly".into(),
314                    ));
315                }
316                Err(_) => {}
317            }
318        }
319    }
320}
321
322fn detect_interactive(command: &str) -> Option<String> {
323    let trimmed = command.trim();
324    let head = trimmed.split('|').next().unwrap_or("").trim();
325    let mut tokens = head.split_whitespace();
326    let Some(first) = tokens.next() else { return None; };
327    let rest: Vec<&str> = tokens.collect();
328
329    match first {
330        "ssh" => {
331            let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
332            let has_i_arg = rest.iter().any(|t| *t == "-i" || t.starts_with("-i") && t.len() > 2);
333            if !has_batchmode && !has_i_arg {
334                return Some("blocked: `ssh` without key auth or `-o BatchMode=yes` will hit a password prompt.".into());
335            }
336        }
337        "scp" | "sftp" | "rsync" => {
338            let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
339            if !has_batchmode && !rest.iter().any(|t| *t == "-i") {
340                return Some(format!("blocked: `{first}` will prompt for a password without key auth or `-o BatchMode=yes`."));
341            }
342        }
343        "sudo" => {
344            let has_n = rest.iter().any(|t| *t == "-n" || *t == "--non-interactive");
345            if !has_n {
346                return Some("blocked: `sudo` without `-n` will prompt for a password.".into());
347            }
348        }
349        "passwd" | "su" | "login" => {
350            return Some(format!("blocked: `{first}` is interactive and cannot run in the persistent shell."));
351        }
352        "vim" | "vi" | "nvim" | "nano" | "emacs" | "less" | "more" | "top" | "htop" | "btop" | "watch" | "tmux" | "screen" | "man" => {
353            return Some(format!("blocked: `{first}` is a TUI/curses program and will deadlock the persistent shell."));
354        }
355        "mysql" | "psql" | "sqlite3" | "redis-cli" | "mongo" | "mongosh" => {
356            let has_command_flag = rest.iter().any(|t| matches!(*t, "-c" | "--command" | "-e" | "--execute"));
357            if !has_command_flag {
358                return Some(format!("blocked: `{first}` without `-c`/`-e` opens an interactive REPL that will deadlock."));
359            }
360        }
361        "gpg" | "ssh-keygen" | "ssh-add" => {
362            let has_batch = rest.iter().any(|t| *t == "--batch" || *t == "--pinentry-mode=loopback" || *t == "-N");
363            if !has_batch {
364                return Some(format!("blocked: `{first}` may prompt for a passphrase — pass `--batch` or `-N \"\"` if you know what you're doing."));
365            }
366        }
367        _ => {}
368    }
369    None
370}
371
372fn find_subslice(hay: &[u8], needle: &[u8]) -> Option<usize> {
373    if needle.is_empty() || hay.len() < needle.len() {
374        return None;
375    }
376    hay.windows(needle.len()).position(|w| w == needle)
377}
378
379fn nonce_hex() -> String {
380
381    let now = SystemTime::now()
382        .duration_since(UNIX_EPOCH)
383        .map(|d| d.as_nanos())
384        .unwrap_or(0);
385    let pid = std::process::id() as u128;
386    format!("{:032x}", now ^ (pid << 96))
387}
388
389fn first_line(s: &str) -> &str {
390    s.lines().next().unwrap_or(s)
391}