Skip to main content

vtcode_core/tools/
exec_session.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5use anyhow::{Context, Result, anyhow};
6use chrono::Utc;
7use hashbrown::HashMap;
8use tokio::sync::{Mutex, RwLock, watch};
9use tokio::task::JoinHandle;
10use vtcode_bash_runner::{PipeSpawnOptions, ProcessHandle, spawn_pipe_process_with_options};
11
12use crate::tools::ExecSessionId;
13use crate::tools::pty::PtySize;
14use crate::tools::registry::{PtySessionGuard, PtySessionManager};
15use crate::tools::types::VTCodeExecSession;
16use crate::utils::path::{canonicalize_workspace, ensure_path_within_workspace};
17use crate::zsh_exec_bridge::ZshExecBridgeSession;
18
19struct PipeSessionRecord {
20    metadata: VTCodeExecSession,
21    handle: Arc<ProcessHandle>,
22    output: Arc<Mutex<String>>,
23    pending_offset: AtomicUsize,
24    output_task: Mutex<Option<JoinHandle<()>>>,
25    exit_task: Mutex<Option<JoinHandle<()>>>,
26    activity_tx: watch::Sender<u64>,
27}
28
29impl PipeSessionRecord {
30    fn new(
31        metadata: VTCodeExecSession,
32        handle: Arc<ProcessHandle>,
33        output: Arc<Mutex<String>>,
34        output_task: JoinHandle<()>,
35        exit_task: JoinHandle<()>,
36        activity_tx: watch::Sender<u64>,
37    ) -> Self {
38        Self {
39            metadata,
40            handle,
41            output,
42            pending_offset: AtomicUsize::new(0),
43            output_task: Mutex::new(Some(output_task)),
44            exit_task: Mutex::new(Some(exit_task)),
45            activity_tx,
46        }
47    }
48}
49
50#[derive(Clone)]
51struct PipeSessionManager {
52    workspace_root: PathBuf,
53    sessions: Arc<RwLock<HashMap<ExecSessionId, Arc<PipeSessionRecord>>>>,
54}
55
56impl PipeSessionManager {
57    fn new(workspace_root: PathBuf) -> Self {
58        Self {
59            workspace_root: canonicalize_workspace(&workspace_root),
60            sessions: Arc::new(RwLock::new(HashMap::new())),
61        }
62    }
63
64    async fn create_session(
65        &self,
66        session_id: ExecSessionId,
67        command: Vec<String>,
68        working_dir: PathBuf,
69        env: HashMap<String, String>,
70    ) -> Result<VTCodeExecSession> {
71        if command.is_empty() {
72            return Err(anyhow!("exec session command cannot be empty"));
73        }
74        let working_dir = canonicalize_workspace(&working_dir);
75        self.ensure_within_workspace(&working_dir)?;
76
77        {
78            let sessions = self.sessions.read().await;
79            if sessions.contains_key(session_id.as_str()) {
80                return Err(anyhow!(
81                    "exec session '{}' already exists",
82                    session_id.as_str()
83                ));
84            }
85        }
86
87        let mut command_parts = command;
88        let program = command_parts.remove(0);
89        let args = command_parts;
90
91        let opts = PipeSpawnOptions::new(program.clone(), working_dir.clone())
92            .args(args.clone())
93            .env(env);
94        let spawned = spawn_pipe_process_with_options(opts)
95            .await
96            .with_context(|| format!("failed to spawn pipe session '{}'", session_id))?;
97
98        let metadata = VTCodeExecSession {
99            id: session_id.clone(),
100            backend: "pipe".to_string(),
101            command: program,
102            args,
103            working_dir: Some(self.format_working_dir(&working_dir)),
104            rows: None,
105            cols: None,
106            child_pid: None,
107            started_at: Some(Utc::now()),
108            lifecycle_state: Some(crate::tools::types::VTCodeSessionLifecycleState::Running),
109            exit_code: None,
110        };
111
112        let handle = Arc::new(spawned.session);
113        let output = Arc::new(Mutex::new(String::new()));
114        let output_clone = Arc::clone(&output);
115        let mut output_rx = spawned.output_rx;
116        let output_handle = Arc::clone(&handle);
117        let (activity_tx, _) = watch::channel(0u64);
118        let output_activity_tx = activity_tx.clone();
119        let output_task = tokio::spawn(async move {
120            loop {
121                match tokio::time::timeout(tokio::time::Duration::from_millis(15), output_rx.recv())
122                    .await
123                {
124                    Ok(Ok(chunk)) => {
125                        let text = String::from_utf8_lossy(&chunk);
126                        let mut guard = output_clone.lock().await;
127                        guard.push_str(&text);
128                        output_activity_tx.send_modify(|version| *version += 1);
129                    }
130                    Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
131                    Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break,
132                    Err(_) if output_handle.has_exited() && output_handle.is_output_drained() => {
133                        break;
134                    }
135                    Err(_) => continue,
136                }
137            }
138        });
139        let exit_rx = spawned.exit_rx;
140        let exit_activity_tx = activity_tx.clone();
141        let exit_task = tokio::spawn(async move {
142            let _ = exit_rx.await;
143            exit_activity_tx.send_modify(|version| *version += 1);
144        });
145        let record = Arc::new(PipeSessionRecord::new(
146            metadata.clone(),
147            handle,
148            output,
149            output_task,
150            exit_task,
151            activity_tx,
152        ));
153
154        let mut sessions = self.sessions.write().await;
155        sessions.insert(session_id, record);
156
157        Ok(metadata)
158    }
159
160    async fn read_session_output(&self, session_id: &str, drain: bool) -> Result<Option<String>> {
161        let record = self.session_record(session_id).await?;
162        let start = record.pending_offset.load(Ordering::SeqCst);
163        let output = record.output.lock().await;
164        if start >= output.len() {
165            return Ok(None);
166        }
167
168        let pending = output.get(start..).map(ToOwned::to_owned).ok_or_else(|| {
169            anyhow!(
170                "pipe session '{}' produced invalid output boundary",
171                session_id
172            )
173        })?;
174
175        if drain {
176            record.pending_offset.store(output.len(), Ordering::SeqCst);
177        }
178
179        if pending.is_empty() {
180            Ok(None)
181        } else {
182            Ok(Some(pending))
183        }
184    }
185
186    async fn send_input_to_session(
187        &self,
188        session_id: &str,
189        data: &[u8],
190        append_newline: bool,
191    ) -> Result<usize> {
192        let record = self.session_record(session_id).await?;
193        record
194            .handle
195            .write(data.to_vec())
196            .await
197            .map_err(|_| anyhow!("exec session '{}' is no longer writable", session_id))?;
198
199        if append_newline {
200            record
201                .handle
202                .write(b"\n".to_vec())
203                .await
204                .map_err(|_| anyhow!("exec session '{}' is no longer writable", session_id))?;
205        }
206
207        Ok(data.len() + usize::from(append_newline))
208    }
209
210    async fn is_session_completed(&self, session_id: &str) -> Result<Option<i32>> {
211        let record = self.session_record(session_id).await?;
212        if record.handle.has_exited() {
213            Ok(record.handle.exit_code())
214        } else {
215            Ok(None)
216        }
217    }
218
219    async fn terminate_session(&self, session_id: &str) -> Result<()> {
220        let record = self.session_record(session_id).await?;
221        record.handle.terminate();
222        Ok(())
223    }
224
225    async fn close_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
226        let record = {
227            let mut sessions = self.sessions.write().await;
228            sessions
229                .remove(session_id)
230                .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))?
231        };
232
233        record.handle.terminate();
234        if let Some(task) = record.output_task.lock().await.take() {
235            task.abort();
236        }
237        if let Some(task) = record.exit_task.lock().await.take() {
238            task.abort();
239        }
240
241        Ok(record.metadata.clone())
242    }
243
244    async fn activity_receiver(&self, session_id: &str) -> Result<watch::Receiver<u64>> {
245        let record = self.session_record(session_id).await?;
246        Ok(record.activity_tx.subscribe())
247    }
248
249    async fn is_output_drained(&self, session_id: &str) -> Result<bool> {
250        let record = self.session_record(session_id).await?;
251        let output_task = record.output_task.lock().await;
252        let output_task_finished = match output_task.as_ref() {
253            Some(task) => task.is_finished(),
254            None => true,
255        };
256        Ok(record.handle.is_output_drained() && output_task_finished)
257    }
258
259    async fn terminate_all_sessions(&self) -> Result<()> {
260        let ids = {
261            let sessions = self.sessions.read().await;
262            sessions.keys().cloned().collect::<Vec<_>>()
263        };
264
265        for session_id in ids {
266            self.close_session(&session_id).await?;
267        }
268
269        Ok(())
270    }
271
272    async fn session_record(&self, session_id: &str) -> Result<Arc<PipeSessionRecord>> {
273        let sessions = self.sessions.read().await;
274        sessions
275            .get(session_id)
276            .cloned()
277            .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))
278    }
279
280    fn ensure_within_workspace(&self, candidate: &Path) -> Result<()> {
281        ensure_path_within_workspace(candidate, &self.workspace_root).map(|_| ())
282    }
283
284    fn format_working_dir(&self, path: &Path) -> String {
285        match path.strip_prefix(&self.workspace_root) {
286            Ok(relative) if relative.as_os_str().is_empty() => ".".into(),
287            Ok(relative) => relative.to_string_lossy().replace("\\", "/"),
288            Err(_) => path.to_string_lossy().into_owned(),
289        }
290    }
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq)]
294pub enum ExecSessionBackend {
295    Pipe,
296    Pty,
297}
298
299struct ExecSessionRecord {
300    metadata: VTCodeExecSession,
301    backend: ExecSessionBackend,
302    _pty_guard: Option<PtySessionGuard>,
303}
304
305impl ExecSessionRecord {
306    fn new(
307        metadata: VTCodeExecSession,
308        backend: ExecSessionBackend,
309        pty_guard: Option<PtySessionGuard>,
310    ) -> Self {
311        Self {
312            metadata,
313            backend,
314            _pty_guard: pty_guard,
315        }
316    }
317}
318
319#[derive(Clone)]
320pub struct ExecSessionManager {
321    pipe_sessions: PipeSessionManager,
322    pty_sessions: PtySessionManager,
323    sessions: Arc<RwLock<HashMap<ExecSessionId, Arc<ExecSessionRecord>>>>,
324}
325
326impl ExecSessionManager {
327    #[must_use]
328    pub fn new(workspace_root: PathBuf, pty_sessions: PtySessionManager) -> Self {
329        Self {
330            pipe_sessions: PipeSessionManager::new(workspace_root),
331            pty_sessions,
332            sessions: Arc::new(RwLock::new(HashMap::new())),
333        }
334    }
335
336    pub(crate) async fn create_pipe_session(
337        &self,
338        session_id: ExecSessionId,
339        command: Vec<String>,
340        working_dir: PathBuf,
341        env: HashMap<String, String>,
342    ) -> Result<VTCodeExecSession> {
343        self.ensure_session_absent(&session_id).await?;
344        let metadata = self
345            .pipe_sessions
346            .create_session(session_id, command, working_dir, env)
347            .await?;
348        self.insert_session(metadata.clone(), ExecSessionBackend::Pipe, None)
349            .await?;
350        Ok(metadata)
351    }
352
353    pub(crate) async fn create_pty_session(
354        &self,
355        session_id: ExecSessionId,
356        command: Vec<String>,
357        working_dir: PathBuf,
358        size: PtySize,
359        extra_env: HashMap<String, String>,
360        zsh_exec_bridge: Option<ZshExecBridgeSession>,
361    ) -> Result<VTCodeExecSession> {
362        self.ensure_session_absent(&session_id).await?;
363        let pty_guard = self.pty_sessions.start_session()?;
364        let metadata = self.pty_sessions.manager().create_session_with_bridge(
365            session_id.clone().into(),
366            command,
367            working_dir,
368            size,
369            extra_env,
370            zsh_exec_bridge,
371        )?;
372        let exec_metadata = VTCodeExecSession::from(metadata);
373        self.insert_session(
374            exec_metadata.clone(),
375            ExecSessionBackend::Pty,
376            Some(pty_guard),
377        )
378        .await?;
379        Ok(exec_metadata)
380    }
381
382    pub(crate) async fn snapshot_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
383        let record = self.session_record(session_id).await?;
384        match record.backend {
385            ExecSessionBackend::Pipe => {
386                self.pipe_sessions
387                    .session_record(session_id)
388                    .await
389                    .map(|r| {
390                        let mut metadata = r.metadata.clone();
391                        let exit_code = if r.handle.has_exited() {
392                            r.handle.exit_code()
393                        } else {
394                            None
395                        };
396                        metadata.exit_code = exit_code;
397                        metadata.lifecycle_state = Some(if exit_code.is_some() {
398                            crate::tools::types::VTCodeSessionLifecycleState::Exited
399                        } else {
400                            crate::tools::types::VTCodeSessionLifecycleState::Running
401                        });
402                        metadata
403                    })
404            }
405            ExecSessionBackend::Pty => self
406                .pty_sessions
407                .manager()
408                .snapshot_session(session_id)
409                .map(VTCodeExecSession::from),
410        }
411    }
412
413    pub(crate) async fn list_sessions(&self) -> Vec<VTCodeExecSession> {
414        let sessions = self.sessions.read().await;
415        let mut listed = sessions
416            .values()
417            .map(|record| record.metadata.clone())
418            .collect::<Vec<_>>();
419        listed.sort_by(|left, right| left.id.cmp(&right.id));
420        listed
421    }
422
423    pub(crate) async fn read_session_output(
424        &self,
425        session_id: &str,
426        drain: bool,
427    ) -> Result<Option<String>> {
428        let record = self.session_record(session_id).await?;
429        match record.backend {
430            ExecSessionBackend::Pipe => {
431                self.pipe_sessions
432                    .read_session_output(session_id, drain)
433                    .await
434            }
435            ExecSessionBackend::Pty => self
436                .pty_sessions
437                .manager()
438                .read_session_output(session_id, drain),
439        }
440    }
441
442    pub(crate) async fn send_input_to_session(
443        &self,
444        session_id: &str,
445        data: &[u8],
446        append_newline: bool,
447    ) -> Result<usize> {
448        let record = self.session_record(session_id).await?;
449        match record.backend {
450            ExecSessionBackend::Pipe => {
451                self.pipe_sessions
452                    .send_input_to_session(session_id, data, append_newline)
453                    .await
454            }
455            ExecSessionBackend::Pty => {
456                self.pty_sessions
457                    .manager()
458                    .send_input_to_session(session_id, data, append_newline)
459            }
460        }
461    }
462
463    pub(crate) async fn is_session_completed(&self, session_id: &str) -> Result<Option<i32>> {
464        let record = self.session_record(session_id).await?;
465        match record.backend {
466            ExecSessionBackend::Pipe => self.pipe_sessions.is_session_completed(session_id).await,
467            ExecSessionBackend::Pty => self.pty_sessions.manager().is_session_completed(session_id),
468        }
469    }
470
471    pub(crate) async fn activity_receiver(
472        &self,
473        session_id: &str,
474    ) -> Result<Option<watch::Receiver<u64>>> {
475        let record = self.session_record(session_id).await?;
476        match record.backend {
477            ExecSessionBackend::Pipe => self
478                .pipe_sessions
479                .activity_receiver(session_id)
480                .await
481                .map(Some),
482            ExecSessionBackend::Pty => Ok(None),
483        }
484    }
485
486    pub(crate) async fn is_output_drained(&self, session_id: &str) -> Result<bool> {
487        let record = self.session_record(session_id).await?;
488        match record.backend {
489            ExecSessionBackend::Pipe => self.pipe_sessions.is_output_drained(session_id).await,
490            ExecSessionBackend::Pty => Ok(true),
491        }
492    }
493
494    pub(crate) async fn terminate_session(&self, session_id: &str) -> Result<()> {
495        let record = self.session_record(session_id).await?;
496        match record.backend {
497            ExecSessionBackend::Pipe => self.pipe_sessions.terminate_session(session_id).await,
498            ExecSessionBackend::Pty => self.pty_sessions.manager().terminate_session(session_id),
499        }
500    }
501
502    pub(crate) async fn close_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
503        let record = {
504            let mut sessions = self.sessions.write().await;
505            sessions
506                .remove(session_id)
507                .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))?
508        };
509
510        let metadata = match record.backend {
511            ExecSessionBackend::Pipe => self.pipe_sessions.close_session(session_id).await?,
512            ExecSessionBackend::Pty => self
513                .pty_sessions
514                .manager()
515                .close_session(session_id)
516                .map(VTCodeExecSession::from)?,
517        };
518
519        Ok(metadata)
520    }
521
522    pub(crate) async fn prune_exited_session(
523        &self,
524        session_id: &str,
525    ) -> Result<Option<VTCodeExecSession>> {
526        if self.is_session_completed(session_id).await?.is_some() {
527            return self.close_session(session_id).await.map(Some);
528        }
529        Ok(None)
530    }
531
532    pub(crate) async fn terminate_all_sessions_async(&self) -> Result<()> {
533        let ids = {
534            let sessions = self.sessions.read().await;
535            sessions.keys().cloned().collect::<Vec<_>>()
536        };
537
538        let mut failures = Vec::new();
539        for session_id in ids {
540            if let Err(err) = self.close_session(&session_id).await {
541                failures.push(format!("{session_id}: {err}"));
542            }
543        }
544
545        if let Err(err) = self.pipe_sessions.terminate_all_sessions().await {
546            failures.push(err.to_string());
547        }
548
549        if failures.is_empty() {
550            Ok(())
551        } else {
552            Err(anyhow!(
553                "failed to terminate all exec sessions: {}",
554                failures.join("; ")
555            ))
556        }
557    }
558
559    async fn insert_session(
560        &self,
561        metadata: VTCodeExecSession,
562        backend: ExecSessionBackend,
563        pty_guard: Option<PtySessionGuard>,
564    ) -> Result<()> {
565        let mut sessions = self.sessions.write().await;
566        use hashbrown::hash_map::Entry;
567        match sessions.entry(metadata.id.clone()) {
568            Entry::Occupied(_) => Err(anyhow!(
569                "exec session '{}' already exists",
570                metadata.id.as_str()
571            )),
572            Entry::Vacant(entry) => {
573                entry.insert(Arc::new(ExecSessionRecord::new(
574                    metadata, backend, pty_guard,
575                )));
576                Ok(())
577            }
578        }
579    }
580
581    async fn ensure_session_absent(&self, session_id: &str) -> Result<()> {
582        let sessions = self.sessions.read().await;
583        if sessions.contains_key(session_id) {
584            return Err(anyhow!("exec session '{}' already exists", session_id));
585        }
586        Ok(())
587    }
588
589    async fn session_record(&self, session_id: &str) -> Result<Arc<ExecSessionRecord>> {
590        let sessions = self.sessions.read().await;
591        sessions
592            .get(session_id)
593            .cloned()
594            .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use hashbrown::HashMap;
601    use tempfile::tempdir;
602    use tokio::time::{Duration, timeout};
603
604    use super::ExecSessionManager;
605    use crate::config::PtyConfig;
606    use crate::tools::pty::PtySize;
607    use crate::tools::registry::PtySessionManager;
608    use crate::utils::path::canonicalize_workspace;
609
610    #[tokio::test]
611    #[cfg(all(unix, feature = "tui"))]
612    async fn pty_session_limit_holds_until_exec_session_close() -> anyhow::Result<()> {
613        let temp_dir = tempdir()?;
614        let workspace_root = canonicalize_workspace(temp_dir.path());
615        let pty_sessions = PtySessionManager::new(
616            workspace_root.clone(),
617            PtyConfig {
618                max_sessions: 1,
619                ..Default::default()
620            },
621        );
622        let manager = ExecSessionManager::new(workspace_root.clone(), pty_sessions);
623        let size = PtySize {
624            rows: 24,
625            cols: 80,
626            pixel_width: 0,
627            pixel_height: 0,
628        };
629
630        manager
631            .create_pty_session(
632                "run-1".to_string().into(),
633                vec![
634                    "/bin/sh".to_string(),
635                    "-c".to_string(),
636                    "sleep 1".to_string(),
637                ],
638                workspace_root.clone(),
639                size,
640                HashMap::new(),
641                None,
642            )
643            .await?;
644
645        let second = manager
646            .create_pty_session(
647                "run-2".to_string().into(),
648                vec![
649                    "/bin/sh".to_string(),
650                    "-c".to_string(),
651                    "sleep 1".to_string(),
652                ],
653                workspace_root.clone(),
654                size,
655                HashMap::new(),
656                None,
657            )
658            .await;
659        assert!(second.is_err());
660        assert!(
661            second
662                .unwrap_err()
663                .to_string()
664                .contains("Maximum PTY sessions")
665        );
666
667        manager.close_session("run-1").await?;
668        manager
669            .create_pty_session(
670                "run-3".to_string().into(),
671                vec![
672                    "/bin/sh".to_string(),
673                    "-c".to_string(),
674                    "sleep 1".to_string(),
675                ],
676                workspace_root,
677                size,
678                HashMap::new(),
679                None,
680            )
681            .await?;
682        manager.close_session("run-3").await?;
683
684        Ok(())
685    }
686
687    #[tokio::test]
688    #[cfg(unix)]
689    async fn pipe_session_activity_receiver_notifies_on_output() -> anyhow::Result<()> {
690        let temp_dir = tempdir()?;
691        let workspace_root = canonicalize_workspace(temp_dir.path());
692        let pty_sessions = PtySessionManager::new(workspace_root.clone(), PtyConfig::default());
693        let manager = ExecSessionManager::new(workspace_root.clone(), pty_sessions);
694
695        manager
696            .create_pipe_session(
697                "run-1".to_string().into(),
698                vec![
699                    "/bin/sh".to_string(),
700                    "-c".to_string(),
701                    "printf hello".to_string(),
702                ],
703                workspace_root,
704                HashMap::new(),
705            )
706            .await?;
707
708        let mut activity_rx = manager
709            .activity_receiver("run-1")
710            .await?
711            .expect("pipe sessions should expose activity receiver");
712
713        timeout(Duration::from_secs(2), activity_rx.changed()).await??;
714        let output = manager
715            .read_session_output("run-1", true)
716            .await?
717            .expect("session output");
718        assert!(output.contains("hello"));
719
720        manager.close_session("run-1").await?;
721        Ok(())
722    }
723}