Skip to main content

tandem_runtime/
pty.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use serde::Serialize;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::process::{Child, ChildStdin, Command};
7use tokio::sync::{Mutex, RwLock};
8use uuid::Uuid;
9
10#[derive(Clone)]
11pub struct PtyManager {
12    sessions: Arc<RwLock<HashMap<String, PtySession>>>,
13}
14
15#[derive(Clone)]
16struct PtySession {
17    id: String,
18    output: Arc<RwLock<String>>,
19    stdin: Arc<Mutex<ChildStdin>>,
20    child: Arc<Mutex<Child>>,
21}
22
23#[derive(Debug, Clone, Serialize)]
24pub struct PtyInfo {
25    pub id: String,
26    pub running: bool,
27}
28
29#[derive(Debug, Clone, Serialize)]
30pub struct PtySnapshot {
31    pub id: String,
32    pub output: String,
33    pub running: bool,
34}
35
36impl PtyManager {
37    pub fn new() -> Self {
38        Self {
39            sessions: Arc::new(RwLock::new(HashMap::new())),
40        }
41    }
42
43    pub async fn list(&self) -> Vec<PtyInfo> {
44        let sessions = self.sessions.read().await;
45        let mut out = Vec::new();
46        for session in sessions.values() {
47            let running = session.child.lock().await.id().is_some();
48            out.push(PtyInfo {
49                id: session.id.clone(),
50                running,
51            });
52        }
53        out
54    }
55
56    pub async fn create(&self) -> anyhow::Result<String> {
57        let mut child = Command::new("powershell")
58            .args(["-NoProfile"])
59            .stdin(std::process::Stdio::piped())
60            .stdout(std::process::Stdio::piped())
61            .stderr(std::process::Stdio::piped())
62            .spawn()?;
63        let stdin = child
64            .stdin
65            .take()
66            .ok_or_else(|| anyhow::anyhow!("stdin unavailable"))?;
67        let stdout = child
68            .stdout
69            .take()
70            .ok_or_else(|| anyhow::anyhow!("stdout unavailable"))?;
71        let stderr = child
72            .stderr
73            .take()
74            .ok_or_else(|| anyhow::anyhow!("stderr unavailable"))?;
75
76        let id = Uuid::new_v4().to_string();
77        let output = Arc::new(RwLock::new(String::new()));
78        let output_stdout = output.clone();
79        let output_stderr = output.clone();
80
81        tokio::spawn(async move {
82            read_stream(output_stdout, stdout).await;
83        });
84        tokio::spawn(async move {
85            read_stream(output_stderr, stderr).await;
86        });
87
88        self.sessions.write().await.insert(
89            id.clone(),
90            PtySession {
91                id: id.clone(),
92                output,
93                stdin: Arc::new(Mutex::new(stdin)),
94                child: Arc::new(Mutex::new(child)),
95            },
96        );
97
98        Ok(id)
99    }
100
101    pub async fn write(&self, id: &str, input: &str) -> anyhow::Result<bool> {
102        let session = {
103            let sessions = self.sessions.read().await;
104            sessions.get(id).cloned()
105        };
106        let Some(session) = session else {
107            return Ok(false);
108        };
109        let mut stdin = session.stdin.lock().await;
110        stdin.write_all(input.as_bytes()).await?;
111        stdin.flush().await?;
112        Ok(true)
113    }
114
115    pub async fn snapshot(&self, id: &str) -> Option<PtySnapshot> {
116        let session = {
117            let sessions = self.sessions.read().await;
118            sessions.get(id).cloned()
119        }?;
120        let output = session.output.read().await.clone();
121        let running = session.child.lock().await.id().is_some();
122        Some(PtySnapshot {
123            id: id.to_string(),
124            output,
125            running,
126        })
127    }
128
129    pub async fn read_since(&self, id: &str, offset: usize) -> Option<(String, usize, bool)> {
130        let snapshot = self.snapshot(id).await?;
131        let bytes = snapshot.output.as_bytes();
132        let safe_offset = offset.min(bytes.len());
133        let tail = String::from_utf8_lossy(&bytes[safe_offset..]).to_string();
134        Some((tail, bytes.len(), snapshot.running))
135    }
136
137    pub async fn kill(&self, id: &str) -> anyhow::Result<bool> {
138        let session = self.sessions.write().await.remove(id);
139        let Some(session) = session else {
140            return Ok(false);
141        };
142        let mut child = session.child.lock().await;
143        let _ = child.kill().await;
144        Ok(true)
145    }
146}
147
148impl Default for PtyManager {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154async fn read_stream(
155    output: Arc<RwLock<String>>,
156    mut stream: impl tokio::io::AsyncRead + Unpin + Send + 'static,
157) {
158    let mut buf = vec![0_u8; 4096];
159    loop {
160        let read = match stream.read(&mut buf).await {
161            Ok(0) => break,
162            Ok(n) => n,
163            Err(_) => break,
164        };
165        let chunk = String::from_utf8_lossy(&buf[..read]).to_string();
166        let mut out = output.write().await;
167        out.push_str(&chunk);
168        if out.len() > 200_000 {
169            let cut = out.len().saturating_sub(100_000);
170            let tail = out.split_off(cut);
171            *out = tail;
172        }
173    }
174}