stynx_code_tools/infrastructure/
persistent_shell.rs1use std::collections::HashMap;
2use std::process::Stdio;
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use stynx_code_errors::{AppError, AppResult};
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::process::{Child, ChildStdin, Command};
9use tokio::sync::Mutex;
10use tokio::sync::mpsc;
11
12const MAX_BG_OUTPUT: usize = 1_000_000;
13const READ_CHUNK: usize = 8192;
14
15pub struct ShellRegistry {
16 persistent: Mutex<Option<PersistentShell>>,
17 background: Mutex<HashMap<String, BgProcess>>,
18 next_bg_id: Mutex<u64>,
19}
20
21struct PersistentShell {
22 stdin: ChildStdin,
23 out_rx: mpsc::UnboundedReceiver<Vec<u8>>,
24 child: Child,
25 leftover: Vec<u8>,
26}
27
28struct BgProcess {
29 cmd: String,
30 started_at: Instant,
31 output: Arc<Mutex<Vec<u8>>>,
32 exit: Arc<Mutex<Option<i32>>>,
33 child: Arc<Mutex<Option<Child>>>,
34 last_read_pos: Arc<Mutex<usize>>,
35}
36
37impl ShellRegistry {
38 pub fn new() -> Self {
39 Self {
40 persistent: Mutex::new(None),
41 background: Mutex::new(HashMap::new()),
42 next_bg_id: Mutex::new(1),
43 }
44 }
45
46 pub async fn run_sync(&self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
47 if let Some(reason) = detect_interactive(command) {
48 return Err(AppError::Tool(format!(
49 "{reason}\n\
50 The persistent shell has no TTY — interactive prompts will deadlock and corrupt the TUI. \
51 Options: (a) use non-interactive flags (ssh: -o BatchMode=yes with key auth; sudo: -n with passwordless sudoers); \
52 (b) pre-share the secret via env / config; (c) suggest the user runs `! <command>` themselves to handle the prompt interactively."
53 )));
54 }
55 let mut guard = self.persistent.lock().await;
56 let need_spawn = match guard.as_mut() {
57 None => true,
58 Some(s) => s.is_dead(),
59 };
60 if need_spawn {
61 *guard = Some(PersistentShell::spawn().await?);
62 }
63 let result = guard.as_mut().unwrap().run(command, timeout).await;
64 if let Err(AppError::Tool(ref msg)) = result {
65 if msg.starts_with("command timed out") {
66 tracing::warn!(command = %command, "bash command timed out, resetting persistent shell");
67 *guard = None;
68 }
69 }
70 result
71 }
72
73 pub async fn run_background(&self, command: &str) -> AppResult<String> {
74 let handle = {
75 let mut id_guard = self.next_bg_id.lock().await;
76 let id = *id_guard;
77 *id_guard += 1;
78 format!("bg{id}")
79 };
80
81 let mut child = Command::new("bash")
82 .arg("-c")
83 .arg(command)
84 .stdin(Stdio::null())
85 .stdout(Stdio::piped())
86 .stderr(Stdio::piped())
87 .spawn()
88 .map_err(|e| AppError::Tool(format!("bg spawn failed: {e}")))?;
89
90 let stdout = child.stdout.take().unwrap();
91 let stderr = child.stderr.take().unwrap();
92 let output = Arc::new(Mutex::new(Vec::<u8>::new()));
93 let exit = Arc::new(Mutex::new(None::<i32>));
94 let child_arc = Arc::new(Mutex::new(Some(child)));
95
96 spawn_bg_reader(stdout, output.clone());
97 spawn_bg_reader(stderr, output.clone());
98
99 let exit_clone = exit.clone();
100 let child_clone = child_arc.clone();
101 tokio::spawn(async move {
102 let code = {
103 let mut g = child_clone.lock().await;
104 match g.as_mut() {
105 Some(c) => c.wait().await.ok().and_then(|s| s.code()),
106 None => None,
107 }
108 };
109 *exit_clone.lock().await = code;
110 });
111
112 let bg = BgProcess {
113 cmd: command.to_string(),
114 started_at: Instant::now(),
115 output,
116 exit,
117 child: child_arc,
118 last_read_pos: Arc::new(Mutex::new(0)),
119 };
120 self.background.lock().await.insert(handle.clone(), bg);
121 Ok(handle)
122 }
123
124 pub async fn read_background(&self, handle: &str, full: bool) -> AppResult<String> {
125 let guard = self.background.lock().await;
126 let bg = guard
127 .get(handle)
128 .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
129 let output = bg.output.lock().await;
130 let exit = *bg.exit.lock().await;
131 let mut pos_guard = bg.last_read_pos.lock().await;
132 let start = if full { 0 } else { (*pos_guard).min(output.len()) };
133 let text = String::from_utf8_lossy(&output[start..]).into_owned();
134 *pos_guard = output.len();
135
136 let elapsed = bg.started_at.elapsed().as_secs();
137 let status = match exit {
138 Some(c) if c == 0 => format!("[exit 0, ran {elapsed}s]"),
139 Some(c) => format!("[exit {c}, ran {elapsed}s]"),
140 None => format!("[running, {elapsed}s]"),
141 };
142 if text.is_empty() {
143 Ok(format!("{status} (no new output)"))
144 } else {
145 Ok(format!("{status}\n{text}"))
146 }
147 }
148
149 pub async fn kill_background(&self, handle: &str) -> AppResult<String> {
150 let guard = self.background.lock().await;
151 let bg = guard
152 .get(handle)
153 .ok_or_else(|| AppError::Tool(format!("no background process '{handle}'")))?;
154 let mut child_guard = bg.child.lock().await;
155 if let Some(c) = child_guard.as_mut() {
156 let _ = c.start_kill();
157 }
158 Ok(format!("killed {handle}"))
159 }
160
161 pub async fn list_background(&self) -> String {
162 let guard = self.background.lock().await;
163 if guard.is_empty() {
164 return "no background processes".into();
165 }
166 let mut out = String::from("background processes:\n");
167 for (k, v) in guard.iter() {
168 let exit = *v.exit.lock().await;
169 let elapsed = v.started_at.elapsed().as_secs();
170 let state = match exit {
171 Some(c) => format!("exit {c}"),
172 None => "running".into(),
173 };
174 let cmd = first_line(&v.cmd);
175 let cmd = if cmd.len() > 80 { format!("{}…", &cmd[..79]) } else { cmd.to_string() };
176 out.push_str(&format!(" {k:<6} [{state:<10}] {elapsed}s $ {cmd}\n"));
177 }
178 out
179 }
180}
181
182fn spawn_bg_reader<R>(mut r: R, output: Arc<Mutex<Vec<u8>>>)
183where
184 R: AsyncReadExt + Unpin + Send + 'static,
185{
186 tokio::spawn(async move {
187 let mut buf = vec![0u8; READ_CHUNK];
188 loop {
189 match r.read(&mut buf).await {
190 Ok(0) => break,
191 Ok(n) => {
192 let mut g = output.lock().await;
193 g.extend_from_slice(&buf[..n]);
194 if g.len() > MAX_BG_OUTPUT {
195 let excess = g.len() - MAX_BG_OUTPUT;
196 g.drain(..excess);
197 }
198 }
199 Err(_) => break,
200 }
201 }
202 });
203}
204
205impl PersistentShell {
206 async fn spawn() -> AppResult<Self> {
207 let mut child = Command::new("bash")
208 .args(["--norc", "--noprofile"])
209 .env("PS1", "")
210 .env("PS2", "")
211 .stdin(Stdio::piped())
212 .stdout(Stdio::piped())
213 .stderr(Stdio::piped())
214 .kill_on_drop(true)
215 .spawn()
216 .map_err(|e| AppError::Tool(format!("failed to spawn persistent shell: {e}")))?;
217
218 let stdin = child.stdin.take().unwrap();
219 let stdout = child.stdout.take().unwrap();
220 let stderr = child.stderr.take().unwrap();
221
222 let (tx, out_rx) = mpsc::unbounded_channel::<Vec<u8>>();
223 Self::spawn_reader(stdout, tx.clone());
224 Self::spawn_reader(stderr, tx);
225
226 Ok(Self { stdin, out_rx, child, leftover: Vec::new() })
227 }
228
229 fn spawn_reader<R>(mut r: R, tx: mpsc::UnboundedSender<Vec<u8>>)
230 where
231 R: AsyncReadExt + Unpin + Send + 'static,
232 {
233 tokio::spawn(async move {
234 let mut buf = vec![0u8; READ_CHUNK];
235 loop {
236 match r.read(&mut buf).await {
237 Ok(0) => break,
238 Ok(n) => {
239 if tx.send(buf[..n].to_vec()).is_err() {
240 return;
241 }
242 }
243 Err(_) => break,
244 }
245 }
246 });
247 }
248
249 fn is_dead(&mut self) -> bool {
250 matches!(self.child.try_wait(), Ok(Some(_)))
251 }
252
253 async fn run(&mut self, command: &str, timeout: Option<Duration>) -> AppResult<String> {
254 let nonce = nonce_hex();
255 let marker = format!("__STYNX_DONE_{nonce}__");
256
257 let payload = format!(
258 "{{\n{command}\n}} 2>&1\nprintf '\\n%s:%d\\n' '{marker}' $?\n"
259 );
260 self.stdin
261 .write_all(payload.as_bytes())
262 .await
263 .map_err(|e| AppError::Tool(format!("shell write failed: {e}")))?;
264 self.stdin.flush().await.ok();
265
266 let needle = format!("\n{marker}:");
267 let needle_bytes = needle.as_bytes();
268
269 let mut buf = std::mem::take(&mut self.leftover);
270 let deadline = timeout.map(|d| Instant::now() + d);
271
272 loop {
273 while let Ok(chunk) = self.out_rx.try_recv() {
274 buf.extend_from_slice(&chunk);
275 }
276
277 if let Some(pos) = find_subslice(&buf, needle_bytes) {
278 let after = pos + needle_bytes.len();
279 if let Some(nl_rel) = buf[after..].iter().position(|&b| b == b'\n') {
280 let nl_abs = after + nl_rel;
281 let exit_slice = &buf[after..nl_abs];
282 let exit_code = std::str::from_utf8(exit_slice)
283 .ok()
284 .and_then(|s| s.parse::<i32>().ok())
285 .unwrap_or(-1);
286 let output_bytes = &buf[..pos];
287 let text = String::from_utf8_lossy(output_bytes).into_owned();
288 self.leftover = buf[nl_abs + 1..].to_vec();
289 if exit_code == 0 {
290 return Ok(if text.is_empty() { "(no output)".into() } else { text });
291 }
292 return Ok(format!("{text}\n[exit {exit_code}]"));
293 }
294 }
295
296 if let Some(d) = deadline {
297 if Instant::now() >= d {
298 self.leftover = buf;
299 let secs = timeout.map(|t| t.as_secs()).unwrap_or(0);
300 return Err(AppError::Tool(format!(
301 "command timed out after {secs}s. \
302the persistent shell is now in an unknown state — consider running \
303the command with background:true if it's a long-running process."
304 )));
305 }
306 }
307
308 match tokio::time::timeout(Duration::from_millis(40), self.out_rx.recv()).await {
309 Ok(Some(chunk)) => buf.extend_from_slice(&chunk),
310 Ok(None) => {
311 self.leftover = buf;
312 return Err(AppError::Tool(
313 "persistent shell stdout closed unexpectedly".into(),
314 ));
315 }
316 Err(_) => {}
317 }
318 }
319 }
320}
321
322fn detect_interactive(command: &str) -> Option<String> {
323 let trimmed = command.trim();
324 let head = trimmed.split('|').next().unwrap_or("").trim();
325 let mut tokens = head.split_whitespace();
326 let Some(first) = tokens.next() else { return None; };
327 let rest: Vec<&str> = tokens.collect();
328
329 match first {
330 "ssh" => {
331 let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
332 let has_i_arg = rest.iter().any(|t| *t == "-i" || t.starts_with("-i") && t.len() > 2);
333 if !has_batchmode && !has_i_arg {
334 return Some("blocked: `ssh` without key auth or `-o BatchMode=yes` will hit a password prompt.".into());
335 }
336 }
337 "scp" | "sftp" | "rsync" => {
338 let has_batchmode = rest.windows(2).any(|w| w[0] == "-o" && w[1].eq_ignore_ascii_case("BatchMode=yes"));
339 if !has_batchmode && !rest.iter().any(|t| *t == "-i") {
340 return Some(format!("blocked: `{first}` will prompt for a password without key auth or `-o BatchMode=yes`."));
341 }
342 }
343 "sudo" => {
344 let has_n = rest.iter().any(|t| *t == "-n" || *t == "--non-interactive");
345 if !has_n {
346 return Some("blocked: `sudo` without `-n` will prompt for a password.".into());
347 }
348 }
349 "passwd" | "su" | "login" => {
350 return Some(format!("blocked: `{first}` is interactive and cannot run in the persistent shell."));
351 }
352 "vim" | "vi" | "nvim" | "nano" | "emacs" | "less" | "more" | "top" | "htop" | "btop" | "watch" | "tmux" | "screen" | "man" => {
353 return Some(format!("blocked: `{first}` is a TUI/curses program and will deadlock the persistent shell."));
354 }
355 "mysql" | "psql" | "sqlite3" | "redis-cli" | "mongo" | "mongosh" => {
356 let has_command_flag = rest.iter().any(|t| matches!(*t, "-c" | "--command" | "-e" | "--execute"));
357 if !has_command_flag {
358 return Some(format!("blocked: `{first}` without `-c`/`-e` opens an interactive REPL that will deadlock."));
359 }
360 }
361 "gpg" | "ssh-keygen" | "ssh-add" => {
362 let has_batch = rest.iter().any(|t| *t == "--batch" || *t == "--pinentry-mode=loopback" || *t == "-N");
363 if !has_batch {
364 return Some(format!("blocked: `{first}` may prompt for a passphrase — pass `--batch` or `-N \"\"` if you know what you're doing."));
365 }
366 }
367 _ => {}
368 }
369 None
370}
371
372fn find_subslice(hay: &[u8], needle: &[u8]) -> Option<usize> {
373 if needle.is_empty() || hay.len() < needle.len() {
374 return None;
375 }
376 hay.windows(needle.len()).position(|w| w == needle)
377}
378
379fn nonce_hex() -> String {
380
381 let now = SystemTime::now()
382 .duration_since(UNIX_EPOCH)
383 .map(|d| d.as_nanos())
384 .unwrap_or(0);
385 let pid = std::process::id() as u128;
386 format!("{:032x}", now ^ (pid << 96))
387}
388
389fn first_line(s: &str) -> &str {
390 s.lines().next().unwrap_or(s)
391}