1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5use anyhow::{Context, Result, anyhow};
6use chrono::Utc;
7use hashbrown::HashMap;
8use tokio::sync::{Mutex, RwLock, watch};
9use tokio::task::JoinHandle;
10use vtcode_bash_runner::{PipeSpawnOptions, ProcessHandle, spawn_pipe_process_with_options};
11
12use crate::tools::ExecSessionId;
13use crate::tools::pty::PtySize;
14use crate::tools::registry::{PtySessionGuard, PtySessionManager};
15use crate::tools::types::VTCodeExecSession;
16use crate::utils::path::{canonicalize_workspace, ensure_path_within_workspace};
17use crate::zsh_exec_bridge::ZshExecBridgeSession;
18
19struct PipeSessionRecord {
20 metadata: VTCodeExecSession,
21 handle: Arc<ProcessHandle>,
22 output: Arc<Mutex<String>>,
23 pending_offset: AtomicUsize,
24 output_task: Mutex<Option<JoinHandle<()>>>,
25 exit_task: Mutex<Option<JoinHandle<()>>>,
26 activity_tx: watch::Sender<u64>,
27}
28
29impl PipeSessionRecord {
30 fn new(
31 metadata: VTCodeExecSession,
32 handle: Arc<ProcessHandle>,
33 output: Arc<Mutex<String>>,
34 output_task: JoinHandle<()>,
35 exit_task: JoinHandle<()>,
36 activity_tx: watch::Sender<u64>,
37 ) -> Self {
38 Self {
39 metadata,
40 handle,
41 output,
42 pending_offset: AtomicUsize::new(0),
43 output_task: Mutex::new(Some(output_task)),
44 exit_task: Mutex::new(Some(exit_task)),
45 activity_tx,
46 }
47 }
48}
49
50#[derive(Clone)]
51struct PipeSessionManager {
52 workspace_root: PathBuf,
53 sessions: Arc<RwLock<HashMap<ExecSessionId, Arc<PipeSessionRecord>>>>,
54}
55
56impl PipeSessionManager {
57 fn new(workspace_root: PathBuf) -> Self {
58 Self {
59 workspace_root: canonicalize_workspace(&workspace_root),
60 sessions: Arc::new(RwLock::new(HashMap::new())),
61 }
62 }
63
64 async fn create_session(
65 &self,
66 session_id: ExecSessionId,
67 command: Vec<String>,
68 working_dir: PathBuf,
69 env: HashMap<String, String>,
70 ) -> Result<VTCodeExecSession> {
71 if command.is_empty() {
72 return Err(anyhow!("exec session command cannot be empty"));
73 }
74 let working_dir = canonicalize_workspace(&working_dir);
75 self.ensure_within_workspace(&working_dir)?;
76
77 {
78 let sessions = self.sessions.read().await;
79 if sessions.contains_key(session_id.as_str()) {
80 return Err(anyhow!(
81 "exec session '{}' already exists",
82 session_id.as_str()
83 ));
84 }
85 }
86
87 let mut command_parts = command;
88 let program = command_parts.remove(0);
89 let args = command_parts;
90
91 let opts = PipeSpawnOptions::new(program.clone(), working_dir.clone())
92 .args(args.clone())
93 .env(env);
94 let spawned = spawn_pipe_process_with_options(opts)
95 .await
96 .with_context(|| format!("failed to spawn pipe session '{}'", session_id))?;
97
98 let metadata = VTCodeExecSession {
99 id: session_id.clone(),
100 backend: "pipe".to_string(),
101 command: program,
102 args,
103 working_dir: Some(self.format_working_dir(&working_dir)),
104 rows: None,
105 cols: None,
106 child_pid: None,
107 started_at: Some(Utc::now()),
108 lifecycle_state: Some(crate::tools::types::VTCodeSessionLifecycleState::Running),
109 exit_code: None,
110 };
111
112 let handle = Arc::new(spawned.session);
113 let output = Arc::new(Mutex::new(String::new()));
114 let output_clone = Arc::clone(&output);
115 let mut output_rx = spawned.output_rx;
116 let output_handle = Arc::clone(&handle);
117 let (activity_tx, _) = watch::channel(0u64);
118 let output_activity_tx = activity_tx.clone();
119 let output_task = tokio::spawn(async move {
120 loop {
121 match tokio::time::timeout(tokio::time::Duration::from_millis(15), output_rx.recv())
122 .await
123 {
124 Ok(Ok(chunk)) => {
125 let text = String::from_utf8_lossy(&chunk);
126 let mut guard = output_clone.lock().await;
127 guard.push_str(&text);
128 output_activity_tx.send_modify(|version| *version += 1);
129 }
130 Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
131 Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break,
132 Err(_) if output_handle.has_exited() && output_handle.is_output_drained() => {
133 break;
134 }
135 Err(_) => continue,
136 }
137 }
138 });
139 let exit_rx = spawned.exit_rx;
140 let exit_activity_tx = activity_tx.clone();
141 let exit_task = tokio::spawn(async move {
142 let _ = exit_rx.await;
143 exit_activity_tx.send_modify(|version| *version += 1);
144 });
145 let record = Arc::new(PipeSessionRecord::new(
146 metadata.clone(),
147 handle,
148 output,
149 output_task,
150 exit_task,
151 activity_tx,
152 ));
153
154 let mut sessions = self.sessions.write().await;
155 sessions.insert(session_id, record);
156
157 Ok(metadata)
158 }
159
160 async fn read_session_output(&self, session_id: &str, drain: bool) -> Result<Option<String>> {
161 let record = self.session_record(session_id).await?;
162 let start = record.pending_offset.load(Ordering::SeqCst);
163 let output = record.output.lock().await;
164 if start >= output.len() {
165 return Ok(None);
166 }
167
168 let pending = output.get(start..).map(ToOwned::to_owned).ok_or_else(|| {
169 anyhow!(
170 "pipe session '{}' produced invalid output boundary",
171 session_id
172 )
173 })?;
174
175 if drain {
176 record.pending_offset.store(output.len(), Ordering::SeqCst);
177 }
178
179 if pending.is_empty() {
180 Ok(None)
181 } else {
182 Ok(Some(pending))
183 }
184 }
185
186 async fn send_input_to_session(
187 &self,
188 session_id: &str,
189 data: &[u8],
190 append_newline: bool,
191 ) -> Result<usize> {
192 let record = self.session_record(session_id).await?;
193 record
194 .handle
195 .write(data.to_vec())
196 .await
197 .map_err(|_| anyhow!("exec session '{}' is no longer writable", session_id))?;
198
199 if append_newline {
200 record
201 .handle
202 .write(b"\n".to_vec())
203 .await
204 .map_err(|_| anyhow!("exec session '{}' is no longer writable", session_id))?;
205 }
206
207 Ok(data.len() + usize::from(append_newline))
208 }
209
210 async fn is_session_completed(&self, session_id: &str) -> Result<Option<i32>> {
211 let record = self.session_record(session_id).await?;
212 if record.handle.has_exited() {
213 Ok(record.handle.exit_code())
214 } else {
215 Ok(None)
216 }
217 }
218
219 async fn terminate_session(&self, session_id: &str) -> Result<()> {
220 let record = self.session_record(session_id).await?;
221 record.handle.terminate();
222 Ok(())
223 }
224
225 async fn close_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
226 let record = {
227 let mut sessions = self.sessions.write().await;
228 sessions
229 .remove(session_id)
230 .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))?
231 };
232
233 record.handle.terminate();
234 if let Some(task) = record.output_task.lock().await.take() {
235 task.abort();
236 }
237 if let Some(task) = record.exit_task.lock().await.take() {
238 task.abort();
239 }
240
241 Ok(record.metadata.clone())
242 }
243
244 async fn activity_receiver(&self, session_id: &str) -> Result<watch::Receiver<u64>> {
245 let record = self.session_record(session_id).await?;
246 Ok(record.activity_tx.subscribe())
247 }
248
249 async fn is_output_drained(&self, session_id: &str) -> Result<bool> {
250 let record = self.session_record(session_id).await?;
251 let output_task = record.output_task.lock().await;
252 let output_task_finished = match output_task.as_ref() {
253 Some(task) => task.is_finished(),
254 None => true,
255 };
256 Ok(record.handle.is_output_drained() && output_task_finished)
257 }
258
259 async fn terminate_all_sessions(&self) -> Result<()> {
260 let ids = {
261 let sessions = self.sessions.read().await;
262 sessions.keys().cloned().collect::<Vec<_>>()
263 };
264
265 for session_id in ids {
266 self.close_session(&session_id).await?;
267 }
268
269 Ok(())
270 }
271
272 async fn session_record(&self, session_id: &str) -> Result<Arc<PipeSessionRecord>> {
273 let sessions = self.sessions.read().await;
274 sessions
275 .get(session_id)
276 .cloned()
277 .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))
278 }
279
280 fn ensure_within_workspace(&self, candidate: &Path) -> Result<()> {
281 ensure_path_within_workspace(candidate, &self.workspace_root).map(|_| ())
282 }
283
284 fn format_working_dir(&self, path: &Path) -> String {
285 match path.strip_prefix(&self.workspace_root) {
286 Ok(relative) if relative.as_os_str().is_empty() => ".".into(),
287 Ok(relative) => relative.to_string_lossy().replace("\\", "/"),
288 Err(_) => path.to_string_lossy().into_owned(),
289 }
290 }
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq)]
294pub enum ExecSessionBackend {
295 Pipe,
296 Pty,
297}
298
299struct ExecSessionRecord {
300 metadata: VTCodeExecSession,
301 backend: ExecSessionBackend,
302 _pty_guard: Option<PtySessionGuard>,
303}
304
305impl ExecSessionRecord {
306 fn new(
307 metadata: VTCodeExecSession,
308 backend: ExecSessionBackend,
309 pty_guard: Option<PtySessionGuard>,
310 ) -> Self {
311 Self {
312 metadata,
313 backend,
314 _pty_guard: pty_guard,
315 }
316 }
317}
318
319#[derive(Clone)]
320pub struct ExecSessionManager {
321 pipe_sessions: PipeSessionManager,
322 pty_sessions: PtySessionManager,
323 sessions: Arc<RwLock<HashMap<ExecSessionId, Arc<ExecSessionRecord>>>>,
324}
325
326impl ExecSessionManager {
327 #[must_use]
328 pub fn new(workspace_root: PathBuf, pty_sessions: PtySessionManager) -> Self {
329 Self {
330 pipe_sessions: PipeSessionManager::new(workspace_root),
331 pty_sessions,
332 sessions: Arc::new(RwLock::new(HashMap::new())),
333 }
334 }
335
336 pub(crate) async fn create_pipe_session(
337 &self,
338 session_id: ExecSessionId,
339 command: Vec<String>,
340 working_dir: PathBuf,
341 env: HashMap<String, String>,
342 ) -> Result<VTCodeExecSession> {
343 self.ensure_session_absent(&session_id).await?;
344 let metadata = self
345 .pipe_sessions
346 .create_session(session_id, command, working_dir, env)
347 .await?;
348 self.insert_session(metadata.clone(), ExecSessionBackend::Pipe, None)
349 .await?;
350 Ok(metadata)
351 }
352
353 pub(crate) async fn create_pty_session(
354 &self,
355 session_id: ExecSessionId,
356 command: Vec<String>,
357 working_dir: PathBuf,
358 size: PtySize,
359 extra_env: HashMap<String, String>,
360 zsh_exec_bridge: Option<ZshExecBridgeSession>,
361 ) -> Result<VTCodeExecSession> {
362 self.ensure_session_absent(&session_id).await?;
363 let pty_guard = self.pty_sessions.start_session()?;
364 let metadata = self.pty_sessions.manager().create_session_with_bridge(
365 session_id.clone().into(),
366 command,
367 working_dir,
368 size,
369 extra_env,
370 zsh_exec_bridge,
371 )?;
372 let exec_metadata = VTCodeExecSession::from(metadata);
373 self.insert_session(
374 exec_metadata.clone(),
375 ExecSessionBackend::Pty,
376 Some(pty_guard),
377 )
378 .await?;
379 Ok(exec_metadata)
380 }
381
382 pub(crate) async fn snapshot_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
383 let record = self.session_record(session_id).await?;
384 match record.backend {
385 ExecSessionBackend::Pipe => {
386 self.pipe_sessions
387 .session_record(session_id)
388 .await
389 .map(|r| {
390 let mut metadata = r.metadata.clone();
391 let exit_code = if r.handle.has_exited() {
392 r.handle.exit_code()
393 } else {
394 None
395 };
396 metadata.exit_code = exit_code;
397 metadata.lifecycle_state = Some(if exit_code.is_some() {
398 crate::tools::types::VTCodeSessionLifecycleState::Exited
399 } else {
400 crate::tools::types::VTCodeSessionLifecycleState::Running
401 });
402 metadata
403 })
404 }
405 ExecSessionBackend::Pty => self
406 .pty_sessions
407 .manager()
408 .snapshot_session(session_id)
409 .map(VTCodeExecSession::from),
410 }
411 }
412
413 pub(crate) async fn list_sessions(&self) -> Vec<VTCodeExecSession> {
414 let sessions = self.sessions.read().await;
415 let mut listed = sessions
416 .values()
417 .map(|record| record.metadata.clone())
418 .collect::<Vec<_>>();
419 listed.sort_by(|left, right| left.id.cmp(&right.id));
420 listed
421 }
422
423 pub(crate) async fn read_session_output(
424 &self,
425 session_id: &str,
426 drain: bool,
427 ) -> Result<Option<String>> {
428 let record = self.session_record(session_id).await?;
429 match record.backend {
430 ExecSessionBackend::Pipe => {
431 self.pipe_sessions
432 .read_session_output(session_id, drain)
433 .await
434 }
435 ExecSessionBackend::Pty => self
436 .pty_sessions
437 .manager()
438 .read_session_output(session_id, drain),
439 }
440 }
441
442 pub(crate) async fn send_input_to_session(
443 &self,
444 session_id: &str,
445 data: &[u8],
446 append_newline: bool,
447 ) -> Result<usize> {
448 let record = self.session_record(session_id).await?;
449 match record.backend {
450 ExecSessionBackend::Pipe => {
451 self.pipe_sessions
452 .send_input_to_session(session_id, data, append_newline)
453 .await
454 }
455 ExecSessionBackend::Pty => {
456 self.pty_sessions
457 .manager()
458 .send_input_to_session(session_id, data, append_newline)
459 }
460 }
461 }
462
463 pub(crate) async fn is_session_completed(&self, session_id: &str) -> Result<Option<i32>> {
464 let record = self.session_record(session_id).await?;
465 match record.backend {
466 ExecSessionBackend::Pipe => self.pipe_sessions.is_session_completed(session_id).await,
467 ExecSessionBackend::Pty => self.pty_sessions.manager().is_session_completed(session_id),
468 }
469 }
470
471 pub(crate) async fn activity_receiver(
472 &self,
473 session_id: &str,
474 ) -> Result<Option<watch::Receiver<u64>>> {
475 let record = self.session_record(session_id).await?;
476 match record.backend {
477 ExecSessionBackend::Pipe => self
478 .pipe_sessions
479 .activity_receiver(session_id)
480 .await
481 .map(Some),
482 ExecSessionBackend::Pty => Ok(None),
483 }
484 }
485
486 pub(crate) async fn is_output_drained(&self, session_id: &str) -> Result<bool> {
487 let record = self.session_record(session_id).await?;
488 match record.backend {
489 ExecSessionBackend::Pipe => self.pipe_sessions.is_output_drained(session_id).await,
490 ExecSessionBackend::Pty => Ok(true),
491 }
492 }
493
494 pub(crate) async fn terminate_session(&self, session_id: &str) -> Result<()> {
495 let record = self.session_record(session_id).await?;
496 match record.backend {
497 ExecSessionBackend::Pipe => self.pipe_sessions.terminate_session(session_id).await,
498 ExecSessionBackend::Pty => self.pty_sessions.manager().terminate_session(session_id),
499 }
500 }
501
502 pub(crate) async fn close_session(&self, session_id: &str) -> Result<VTCodeExecSession> {
503 let record = {
504 let mut sessions = self.sessions.write().await;
505 sessions
506 .remove(session_id)
507 .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))?
508 };
509
510 let metadata = match record.backend {
511 ExecSessionBackend::Pipe => self.pipe_sessions.close_session(session_id).await?,
512 ExecSessionBackend::Pty => self
513 .pty_sessions
514 .manager()
515 .close_session(session_id)
516 .map(VTCodeExecSession::from)?,
517 };
518
519 Ok(metadata)
520 }
521
522 pub(crate) async fn prune_exited_session(
523 &self,
524 session_id: &str,
525 ) -> Result<Option<VTCodeExecSession>> {
526 if self.is_session_completed(session_id).await?.is_some() {
527 return self.close_session(session_id).await.map(Some);
528 }
529 Ok(None)
530 }
531
532 pub(crate) async fn terminate_all_sessions_async(&self) -> Result<()> {
533 let ids = {
534 let sessions = self.sessions.read().await;
535 sessions.keys().cloned().collect::<Vec<_>>()
536 };
537
538 let mut failures = Vec::new();
539 for session_id in ids {
540 if let Err(err) = self.close_session(&session_id).await {
541 failures.push(format!("{session_id}: {err}"));
542 }
543 }
544
545 if let Err(err) = self.pipe_sessions.terminate_all_sessions().await {
546 failures.push(err.to_string());
547 }
548
549 if failures.is_empty() {
550 Ok(())
551 } else {
552 Err(anyhow!(
553 "failed to terminate all exec sessions: {}",
554 failures.join("; ")
555 ))
556 }
557 }
558
559 async fn insert_session(
560 &self,
561 metadata: VTCodeExecSession,
562 backend: ExecSessionBackend,
563 pty_guard: Option<PtySessionGuard>,
564 ) -> Result<()> {
565 let mut sessions = self.sessions.write().await;
566 use hashbrown::hash_map::Entry;
567 match sessions.entry(metadata.id.clone()) {
568 Entry::Occupied(_) => Err(anyhow!(
569 "exec session '{}' already exists",
570 metadata.id.as_str()
571 )),
572 Entry::Vacant(entry) => {
573 entry.insert(Arc::new(ExecSessionRecord::new(
574 metadata, backend, pty_guard,
575 )));
576 Ok(())
577 }
578 }
579 }
580
581 async fn ensure_session_absent(&self, session_id: &str) -> Result<()> {
582 let sessions = self.sessions.read().await;
583 if sessions.contains_key(session_id) {
584 return Err(anyhow!("exec session '{}' already exists", session_id));
585 }
586 Ok(())
587 }
588
589 async fn session_record(&self, session_id: &str) -> Result<Arc<ExecSessionRecord>> {
590 let sessions = self.sessions.read().await;
591 sessions
592 .get(session_id)
593 .cloned()
594 .ok_or_else(|| anyhow!("exec session '{}' not found", session_id))
595 }
596}
597
598#[cfg(test)]
599mod tests {
600 use hashbrown::HashMap;
601 use tempfile::tempdir;
602 use tokio::time::{Duration, timeout};
603
604 use super::ExecSessionManager;
605 use crate::config::PtyConfig;
606 use crate::tools::pty::PtySize;
607 use crate::tools::registry::PtySessionManager;
608 use crate::utils::path::canonicalize_workspace;
609
610 #[tokio::test]
611 #[cfg(all(unix, feature = "tui"))]
612 async fn pty_session_limit_holds_until_exec_session_close() -> anyhow::Result<()> {
613 let temp_dir = tempdir()?;
614 let workspace_root = canonicalize_workspace(temp_dir.path());
615 let pty_sessions = PtySessionManager::new(
616 workspace_root.clone(),
617 PtyConfig {
618 max_sessions: 1,
619 ..Default::default()
620 },
621 );
622 let manager = ExecSessionManager::new(workspace_root.clone(), pty_sessions);
623 let size = PtySize {
624 rows: 24,
625 cols: 80,
626 pixel_width: 0,
627 pixel_height: 0,
628 };
629
630 manager
631 .create_pty_session(
632 "run-1".to_string().into(),
633 vec![
634 "/bin/sh".to_string(),
635 "-c".to_string(),
636 "sleep 1".to_string(),
637 ],
638 workspace_root.clone(),
639 size,
640 HashMap::new(),
641 None,
642 )
643 .await?;
644
645 let second = manager
646 .create_pty_session(
647 "run-2".to_string().into(),
648 vec![
649 "/bin/sh".to_string(),
650 "-c".to_string(),
651 "sleep 1".to_string(),
652 ],
653 workspace_root.clone(),
654 size,
655 HashMap::new(),
656 None,
657 )
658 .await;
659 assert!(second.is_err());
660 assert!(
661 second
662 .unwrap_err()
663 .to_string()
664 .contains("Maximum PTY sessions")
665 );
666
667 manager.close_session("run-1").await?;
668 manager
669 .create_pty_session(
670 "run-3".to_string().into(),
671 vec![
672 "/bin/sh".to_string(),
673 "-c".to_string(),
674 "sleep 1".to_string(),
675 ],
676 workspace_root,
677 size,
678 HashMap::new(),
679 None,
680 )
681 .await?;
682 manager.close_session("run-3").await?;
683
684 Ok(())
685 }
686
687 #[tokio::test]
688 #[cfg(unix)]
689 async fn pipe_session_activity_receiver_notifies_on_output() -> anyhow::Result<()> {
690 let temp_dir = tempdir()?;
691 let workspace_root = canonicalize_workspace(temp_dir.path());
692 let pty_sessions = PtySessionManager::new(workspace_root.clone(), PtyConfig::default());
693 let manager = ExecSessionManager::new(workspace_root.clone(), pty_sessions);
694
695 manager
696 .create_pipe_session(
697 "run-1".to_string().into(),
698 vec![
699 "/bin/sh".to_string(),
700 "-c".to_string(),
701 "printf hello".to_string(),
702 ],
703 workspace_root,
704 HashMap::new(),
705 )
706 .await?;
707
708 let mut activity_rx = manager
709 .activity_receiver("run-1")
710 .await?
711 .expect("pipe sessions should expose activity receiver");
712
713 timeout(Duration::from_secs(2), activity_rx.changed()).await??;
714 let output = manager
715 .read_session_output("run-1", true)
716 .await?
717 .expect("session output");
718 assert!(output.contains("hello"));
719
720 manager.close_session("run-1").await?;
721 Ok(())
722 }
723}