1use hashbrown::HashMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, mpsc};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result, anyhow};
9use chrono::Utc;
10use parking_lot::{Mutex, RwLock};
11use portable_pty::{CommandBuilder, PtySize, native_pty_system};
12use shell_words::join;
13use tokio::sync::Mutex as TokioMutex;
14use tracing::{debug, info, warn};
15
16use super::command_utils::{
17 is_long_running_command, is_long_running_command_string, is_sandbox_wrapper_program,
18 is_shell_program,
19};
20use super::manager_utils::{clamp_timeout, exit_status_code, set_command_environment};
21
22use super::screen_backend::PtyScreenState;
23use super::scrollback::PtyScrollback;
24use super::session::PtySessionHandle;
25use super::types::{PtyCommandRequest, PtyCommandResult};
26
27use hashbrown::hash_map::Entry;
28use once_cell::sync::Lazy;
29
30static WORKSPACE_COMMAND_LOCKS: Lazy<Mutex<HashMap<PathBuf, Arc<tokio::sync::Mutex<()>>>>> =
34 Lazy::new(|| Mutex::new(HashMap::new()));
35
36fn get_command_lock(workspace_root: &Path) -> Arc<tokio::sync::Mutex<()>> {
38 let mut locks = WORKSPACE_COMMAND_LOCKS.lock();
39 match locks.entry(workspace_root.to_path_buf()) {
40 Entry::Occupied(entry) => entry.get().clone(),
41 Entry::Vacant(entry) => {
42 let lock = Arc::new(tokio::sync::Mutex::new(()));
43 entry.insert(lock.clone());
44 lock
45 }
46 }
47}
48
/// How long, in milliseconds, `run_command` keeps waiting for its wait thread
/// to report back after a timeout-triggered kill or a wait-channel disconnect
/// before it gives up and detaches the helper threads.
const THREAD_JOIN_GRACE_PERIOD_MS: u64 = 500;
51
52use crate::audit::PermissionAuditLog;
53use crate::config::{CommandsConfig, PtyConfig};
54use crate::telemetry::perf;
55use crate::tools::path_env;
56use crate::tools::shell::resolve_fallback_shell;
57use crate::tools::types::VTCodePtySession;
58use crate::utils::gatekeeper;
59use crate::utils::path::ensure_path_within_workspace;
60use crate::utils::unicode_monitor::UNICODE_MONITOR;
61
62mod session_ops;
63
/// Entry point for running one-shot PTY commands and creating PTY sessions
/// rooted at a single workspace directory.
///
/// Cloning is cheap: `inner` and `extra_paths` are `Arc`s, so clones share the
/// same session table and the same extra search paths.
#[derive(Clone)]
pub struct PtyManager {
    /// Workspace root; canonicalized in `new` when canonicalization succeeds.
    workspace_root: PathBuf,
    /// PTY configuration supplied at construction.
    config: PtyConfig,
    /// Shared session table.
    inner: Arc<PtyState>,
    /// Optional permission audit log attached via `with_audit_log`.
    audit_log: Option<Arc<TokioMutex<PermissionAuditLog>>>,
    /// Extra search paths passed to `set_command_environment`; the inner `Arc`
    /// is swapped wholesale by `apply_commands_config`.
    extra_paths: Arc<RwLock<Arc<Vec<PathBuf>>>>,
}
72
/// State shared between all clones of a `PtyManager`.
#[derive(Default)]
struct PtyState {
    /// Live PTY sessions keyed by session id.
    sessions: Mutex<HashMap<String, Arc<PtySessionHandle>>>,
}
77
78impl PtyManager {
79 pub fn new(workspace_root: PathBuf, config: PtyConfig) -> Self {
80 let resolved_root = workspace_root
81 .canonicalize()
82 .unwrap_or_else(|_| workspace_root.clone());
83
84 let default_paths = path_env::compute_extra_search_paths(
85 &CommandsConfig::default().extra_path_entries,
86 &resolved_root,
87 );
88
89 Self {
90 workspace_root: resolved_root,
91 config,
92 inner: Arc::new(PtyState::default()),
93 audit_log: None,
94 extra_paths: Arc::new(RwLock::new(Arc::new(default_paths))),
95 }
96 }
97
98 pub fn with_audit_log(mut self, audit_log: Arc<TokioMutex<PermissionAuditLog>>) -> Self {
99 self.audit_log = Some(audit_log);
100 self
101 }
102
103 pub fn config(&self) -> &PtyConfig {
104 &self.config
105 }
106
107 pub fn apply_commands_config(&self, commands_config: &CommandsConfig) {
108 let new_paths = path_env::compute_extra_search_paths(
109 &commands_config.extra_path_entries,
110 &self.workspace_root,
111 );
112 *self.extra_paths.write() = Arc::new(new_paths);
113 }
114
115 pub fn describe_working_dir(&self, path: &Path) -> String {
116 self.format_working_dir(path)
117 }
118
119 pub async fn run_command(&self, mut request: PtyCommandRequest) -> Result<PtyCommandResult> {
120 if request.command.is_empty() {
121 return Err(anyhow!("PTY command cannot be empty"));
122 }
123
124 let mut command = std::mem::take(&mut request.command);
125 let program = command.remove(0);
126 let args = command;
127 let timeout = clamp_timeout(request.timeout);
128 let work_dir = std::mem::take(&mut request.working_dir);
129 let size = request.size;
130 let start = Instant::now();
131
132 let mut tags = HashMap::new();
133 tags.insert("subsystem".to_string(), "pty".to_string());
134 tags.insert("program".to_string(), program.clone());
135 perf::record_value("vtcode.perf.spawn_count", 1.0, tags);
136
137 gatekeeper::check_quarantine_for_program(&program);
138 self.ensure_within_workspace(&work_dir)?;
139 let workspace_root = self.workspace_root.clone();
140 let extra_paths = self.extra_paths.read().clone();
141 let max_tokens = request.max_tokens;
142
143 let needs_lock = is_long_running_command(&program)
145 || (is_shell_program(&program)
146 && args.iter().any(|arg| is_long_running_command_string(arg)));
147
148 let command_lock = if needs_lock {
153 Some(get_command_lock(&workspace_root))
154 } else {
155 None
156 };
157 let _command_guard = if let Some(ref lock) = command_lock {
158 debug!(
159 target: "vtcode.pty.command_lock",
160 program = %program,
161 workspace = %workspace_root.display(),
162 "Acquiring per-workspace command lock to serialize long-running invocations"
163 );
164 Some(lock.lock().await)
165 } else {
166 None
167 };
168
169 let result =
170 tokio::task::spawn_blocking(move || -> Result<PtyCommandResult> {
171 let timeout_duration = Duration::from_millis(timeout);
172
173 let (exec_program, exec_args, display_program, _use_shell_wrapper) =
177 if (is_shell_program(&program)
178 && args.iter().any(|arg| arg == "-c" || arg == "/C"))
179 || is_sandbox_wrapper_program(&program, &args)
180 {
181 (program.clone(), args.clone(), program.clone(), false)
183 } else {
184 let shell = resolve_fallback_shell();
185 let full_command =
186 join(std::iter::once(program.clone()).chain(args.iter().cloned()));
187 (
188 shell.clone(),
189 vec!["-lc".to_owned(), full_command.clone()],
190 program.clone(),
191 true,
192 )
193 };
194
195 let mut builder = CommandBuilder::new(exec_program.clone());
196 for arg in &exec_args {
197 builder.arg(arg);
198 }
199 builder.cwd(&work_dir);
200 let extra_env = HashMap::new();
201 set_command_environment(
202 &mut builder,
203 &display_program,
204 size,
205 &workspace_root,
206 &extra_paths,
207 &extra_env,
208 );
209
210 let pty_system = native_pty_system();
211 let pair = pty_system
212 .openpty(size)
213 .context("failed to allocate PTY pair")?;
214
215 let mut child = pair
216 .slave
217 .spawn_command(builder)
218 .with_context(|| format!("failed to spawn PTY command '{display_program}'"))?;
219 let child_pid = child.process_id();
220 let mut killer = child.clone_killer();
221 drop(pair.slave);
222
223 let reader = pair
224 .master
225 .try_clone_reader()
226 .context("failed to clone PTY reader")?;
227
228 let (wait_tx, wait_rx) = mpsc::channel();
229 let wait_thread = thread::spawn(move || {
230 let status = child.wait();
231 let _ = wait_tx.send(());
232 status
233 });
234
235 let reader_thread = thread::spawn(move || -> Result<Vec<u8>> {
236 let mut reader = reader;
237 let mut buffer = [0u8; 4096];
238 let mut collected = Vec::new();
239
240 loop {
241match reader.read(&mut buffer) {
242 Ok(0) => break,
243 Ok(bytes_read) => {
244 collected.extend_from_slice(&buffer[..bytes_read]);
245 }
246 Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {
247 continue;
248 }
249 Err(error) => {
250 return Err(error).context("failed to read PTY command output");
251 }
252}
253 }
254
255 Ok(collected)
256 });
257
258 let wait_result = match wait_rx.recv_timeout(timeout_duration) {
259 Ok(()) => wait_thread.join().map_err(|panic| {
260anyhow!("PTY command wait thread panicked: {:?}", panic)
261 })?,
262 Err(mpsc::RecvTimeoutError::Timeout) => {
263 if let Some(pid) = child_pid {
267 let _ = vtcode_bash_runner::graceful_kill_process_group_default(pid);
268 } else {
269 let _ = killer.kill();
270 }
271
272let grace_period = Duration::from_millis(THREAD_JOIN_GRACE_PERIOD_MS);
274match wait_rx.recv_timeout(grace_period) {
275 Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
276 }
279 Err(mpsc::RecvTimeoutError::Timeout) => {
280 warn!(
281 target: "vtcode.pty.timeout",
282 timeout_ms = timeout,
283 grace_ms = THREAD_JOIN_GRACE_PERIOD_MS,
284 "PTY command did not exit within grace period after kill, detaching threads"
285 );
286 drop(wait_thread);
288 drop(reader_thread);
289 return Err(anyhow!(
290 "PTY command timed out after {} milliseconds and did not respond to kill signal",
291 timeout
292 ));
293 }
294}
295
296match wait_thread.join() {
298 Ok(result) => {
299 if let Err(error) = result {
300 warn!(
301 target: "vtcode.pty.timeout",
302 error = %error,
303 "PTY command wait error after timeout"
304 );
305 }
306 }
307 Err(panic) => {
308 warn!(
309 target: "vtcode.pty.timeout",
310 "PTY wait thread panicked: {:?}",
311 panic
312 );
313 }
314}
315
316let reader_handle = thread::spawn(move || reader_thread.join());
320match reader_handle.join() {
321 Ok(Ok(Ok(_))) => {}
322 Ok(Ok(Err(e))) => {
323 warn!(
324 target: "vtcode.pty.timeout",
325 error = %e,
326 "PTY reader error after timeout"
327 );
328 }
329 Ok(Err(panic)) => {
330 warn!(
331 target: "vtcode.pty.timeout",
332 "PTY reader thread panicked: {:?}",
333 panic
334 );
335 }
336 Err(_) => {
337 warn!(
338 target: "vtcode.pty.timeout",
339 "Failed to join PTY reader thread wrapper"
340 );
341 }
342}
343
344return Err(anyhow!(
345 "PTY command timed out after {} milliseconds",
346 timeout
347));
348 }
349 Err(mpsc::RecvTimeoutError::Disconnected) => {
350let grace_period = Duration::from_millis(THREAD_JOIN_GRACE_PERIOD_MS);
353
354let wait_wrapper = thread::spawn(move || wait_thread.join());
356thread::sleep(grace_period);
357if wait_wrapper.is_finished() {
358 match wait_wrapper.join() {
359 Ok(Ok(result)) => {
360 if let Err(error) = result {
361 return Err(error).context(
362 "failed to wait for PTY command after channel disconnect",
363 );
364 }
365 }
366 Ok(Err(panic)) => {
367 return Err(anyhow!(
368 "PTY wait thread panicked: {:?}",
369 panic
370 ));
371 }
372 Err(_) => {
373 return Err(anyhow!(
374 "PTY wait channel disconnected and thread join failed"
375 ));
376 }
377 }
378} else {
379 warn!(
380 target: "vtcode.pty.disconnect",
381 "PTY wait thread did not exit within grace period, detaching"
382 );
383 drop(reader_thread);
384 return Err(anyhow!(
385 "PTY command wait channel disconnected unexpectedly"
386 ));
387}
388
389match reader_thread.join() {
391 Ok(Ok(_)) => {}
392 Ok(Err(e)) => {
393 warn!(
394 target: "vtcode.pty.disconnect",
395 error = %e,
396 "PTY reader error after channel disconnect"
397 );
398 }
399 Err(panic) => {
400 warn!(
401 target: "vtcode.pty.disconnect",
402 "PTY reader panicked: {:?}",
403 panic
404 );
405 }
406}
407
408return Err(anyhow!(
409 "PTY command wait channel disconnected unexpectedly"
410));
411 }
412 };
413
414 let status = wait_result.context("failed to wait for PTY command to exit")?;
415
416 let output_bytes = reader_thread
417 .join()
418 .map_err(|panic| anyhow!("PTY command reader thread panicked: {:?}", panic))?
419 .context("failed to read PTY command output")?;
420 let mut output = String::from_utf8_lossy(&output_bytes).into_owned();
421 let exit_code = exit_status_code(status);
422
423 if let Some(max_tokens) = max_tokens {
425 if max_tokens > 0 {
426if output.len() > max_tokens * 4 {
428 let truncate_point = (max_tokens * 4).min(output.len());
429 output.truncate(truncate_point);
430 output.push_str("\n[... truncated by max_tokens ...]");
431}
432 } else {
433}
435 }
436 Ok(PtyCommandResult {
439 exit_code,
440 output,
441 duration: start.elapsed(),
442 size,
443 applied_max_tokens: max_tokens,
444 })
445 })
446 .await
447 .context("failed to join PTY command task")??;
448
449 Ok(result)
450 }
451
452 pub async fn resolve_working_dir(&self, requested: Option<&str>) -> Result<PathBuf> {
453 let requested = match requested {
454 Some(dir) if !dir.trim().is_empty() => dir.trim(),
455 _ => return Ok(self.workspace_root.clone()),
456 };
457
458 let candidate = self.workspace_root.join(requested);
459 let normalized =
460 ensure_path_within_workspace(&candidate, &self.workspace_root).map_err(|_| {
461 anyhow!(
462 "Working directory '{}' escapes the workspace root",
463 candidate.display()
464 )
465 })?;
466 let metadata = tokio::fs::metadata(&normalized).await.with_context(|| {
467 format!(
468 "Working directory '{}' does not exist",
469 normalized.display()
470 )
471 })?;
472 if !metadata.is_dir() {
473 return Err(anyhow!(
474 "Working directory '{}' is not a directory",
475 normalized.display()
476 ));
477 }
478 Ok(normalized)
479 }
480
481 pub fn create_session(
482 &self,
483 session_id: String,
484 command: Vec<String>,
485 working_dir: PathBuf,
486 size: PtySize,
487 ) -> Result<VTCodePtySession> {
488 self.create_session_with_bridge(
489 session_id,
490 command,
491 working_dir,
492 size,
493 HashMap::new(),
494 None,
495 )
496 }
497
498 pub(crate) fn create_session_with_bridge(
499 &self,
500 session_id: String,
501 command: Vec<String>,
502 working_dir: PathBuf,
503 size: PtySize,
504 extra_env: HashMap<String, String>,
505 zsh_exec_bridge: Option<crate::zsh_exec_bridge::ZshExecBridgeSession>,
506 ) -> Result<VTCodePtySession> {
507 if command.is_empty() {
508 return Err(anyhow!(
509 "PTY session command cannot be empty.\n\
510 This is an internal error - command validation should have caught this earlier.\n\
511 Please report this with the command-session parameters used."
512 ));
513 }
514
515 let mut sessions = self.inner.sessions.lock();
517 use hashbrown::hash_map::Entry;
518 let entry = match sessions.entry(session_id.clone()) {
519 Entry::Occupied(_) => {
520 return Err(anyhow!("PTY session '{}' already exists", session_id));
521 }
522 Entry::Vacant(e) => e,
523 };
524
525 let mut command_parts = command;
526 let program = command_parts.remove(0);
527 let args = command_parts;
528 let extra_paths = self.extra_paths.read().clone();
529
530 let (exec_program, exec_args, display_program) = if (is_shell_program(&program)
534 && args.iter().any(|arg| arg == "-c" || arg == "/C"))
535 || is_sandbox_wrapper_program(&program, &args)
536 {
537 (program.clone(), args.clone(), program.clone())
539 } else {
540 let shell = resolve_fallback_shell();
541 let full_command = join(std::iter::once(program.clone()).chain(args.iter().cloned()));
542
543 if full_command.is_empty() {
545 return Err(anyhow!(
546 "Failed to construct command string from program '{}' and args {:?}",
547 program,
548 args
549 ));
550 }
551
552 (
553 shell.clone(),
554 vec!["-lc".to_owned(), full_command.clone()],
555 program.clone(),
556 )
557 };
558
559 let pty_system = native_pty_system();
560 let pair = pty_system
561 .openpty(size)
562 .context("failed to allocate PTY pair")?;
563
564 let mut builder = CommandBuilder::new(exec_program.clone());
565 for arg in &exec_args {
566 builder.arg(arg);
567 }
568 builder.cwd(&working_dir);
569 self.ensure_within_workspace(&working_dir)?;
570 set_command_environment(
571 &mut builder,
572 &display_program,
573 size,
574 &self.workspace_root,
575 &extra_paths,
576 &extra_env,
577 );
578
579 let child = pair.slave.spawn_command(builder).with_context(|| {
580 format!("failed to spawn PTY session command '{}'", display_program)
581 })?;
582
583 let child_pid = child.process_id();
585
586 drop(pair.slave);
587
588 let master = pair.master;
589 let mut reader = master
590 .try_clone_reader()
591 .context("failed to clone PTY reader")?;
592 let writer = master.take_writer().context("failed to take PTY writer")?;
593
594 let screen_state = Arc::new(Mutex::new(PtyScreenState::new(
595 size,
596 self.config.scrollback_lines,
597 )));
598 let scrollback = Arc::new(Mutex::new(PtyScrollback::new(
599 self.config.scrollback_lines,
600 self.config.max_scrollback_bytes,
601 )));
602 debug!(
603 session_id = %session_id,
604 rows = size.rows,
605 cols = size.cols,
606 "Created PTY session"
607 );
608 let screen_state_clone = Arc::clone(&screen_state);
609 let scrollback_clone = Arc::clone(&scrollback);
610 let session_name = session_id.clone();
611 UNICODE_MONITOR.start_session();
613
614 let reader_thread = thread::Builder::new()
615 .name(format!("vtcode-pty-reader-{session_name}"))
616 .spawn(move || {
617 let mut buffer = [0u8; 8192]; let mut utf8_buffer: Vec<u8> = Vec::with_capacity(8192); let mut total_bytes = 0usize;
620 let mut unicode_detection_hits = 0usize;
621
622 loop {
623 match reader.read(&mut buffer) {
624Ok(0) => {
625 if !utf8_buffer.is_empty() {
626 let mut scrollback = scrollback_clone.lock();
627 scrollback.push_utf8(&mut utf8_buffer, true);
628 }
629 debug!("PTY session '{}' reader reached EOF (processed {} bytes, {} unicode detections)",
630 session_name, total_bytes, unicode_detection_hits);
631 break;
632}
633Ok(bytes_read) => {
634 let chunk = &buffer[..bytes_read];
635 total_bytes += bytes_read;
636
637 let likely_unicode = chunk.iter().any(|&b| b >= 0x80);
639 if likely_unicode {
640 unicode_detection_hits += 1;
641 }
642
643 {
644 let mut screen_state = screen_state_clone.lock();
645 screen_state.process(chunk);
646 }
647
648 utf8_buffer.extend_from_slice(chunk);
649 {
650 let mut scrollback = scrollback_clone.lock();
651 scrollback.push_utf8(&mut utf8_buffer, false);
652 }
653
654 if utf8_buffer.capacity() > 32768 && utf8_buffer.len() < 1024 {
656 utf8_buffer.shrink_to_fit();
657 }
658}
659Err(error) => {
660 warn!("PTY session '{}' reader error: {} (processed {} bytes)",
661 session_name, error, total_bytes);
662 break;
663}
664 }
665 }
666 debug!("PTY session '{}' reader thread finished (total: {} bytes, unicode detections: {})",
667 session_name, total_bytes, unicode_detection_hits);
668
669 UNICODE_MONITOR.end_session();
671
672 if unicode_detection_hits > 0 {
674 let scrollback = scrollback_clone.lock();
675 let metrics = scrollback.metrics();
676 if metrics.unicode_errors > 0 {
677warn!("PTY session '{}' had {} unicode errors during processing",
678 session_name, metrics.unicode_errors);
679 }
680 if metrics.total_unicode_chars > 0 {
681info!("PTY session '{}' processed {} unicode characters across {} sessions with {} buffer remainder",
682 session_name, metrics.total_unicode_chars, metrics.unicode_sessions, metrics.utf8_buffer_size);
683 }
684 }
685 })
686 .context("failed to spawn PTY reader thread")?;
687
688 let metadata = VTCodePtySession {
689 id: session_id.clone(),
690 command: program,
691 args,
692 working_dir: Some(self.format_working_dir(&working_dir)),
693 rows: size.rows,
694 cols: size.cols,
695 child_pid,
696 started_at: Some(Utc::now()),
697 lifecycle_state: Some(crate::tools::types::VTCodeSessionLifecycleState::Running),
698 exit_code: None,
699 screen_contents: None,
700 scrollback: None,
701 };
702
703 entry.insert(Arc::new(PtySessionHandle {
705 master: Mutex::new(master),
706 child: Mutex::new(child),
707 child_pid,
708 writer: Mutex::new(Some(writer)),
709 screen_state,
710 scrollback,
711 reader_thread: Mutex::new(Some(reader_thread)),
712 metadata: metadata.clone(),
713 last_input: Mutex::new(None),
714 _zsh_exec_bridge: zsh_exec_bridge,
715 }));
716
717 Ok(metadata)
718 }
719
720 fn format_working_dir(&self, path: &Path) -> String {
721 match path.strip_prefix(&self.workspace_root) {
722 Ok(relative) if relative.as_os_str().is_empty() => ".".into(),
723 Ok(relative) => relative.to_string_lossy().replace("\\", "/"),
724 Err(_) => path.to_string_lossy().into_owned(),
725 }
726 }
727
728 fn ensure_within_workspace(&self, candidate: &Path) -> Result<()> {
729 ensure_path_within_workspace(candidate, &self.workspace_root).map(|_| ())
730 }
731}