ai_session/
native_portable.rs1use anyhow::{Context, Result};
4use portable_pty::{CommandBuilder, PtySize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::{Mutex, RwLock, mpsc};
9use uuid::Uuid;
10
11pub struct NativeSession {
13 id: String,
15 name: String,
17 #[allow(dead_code)]
19 pty_master: Option<Box<dyn portable_pty::MasterPty + Send>>,
20 child: Option<Box<dyn portable_pty::Child + Send + Sync>>,
22 output_buffer: Arc<Mutex<Vec<u8>>>,
24 input_tx: mpsc::Sender<Vec<u8>>,
26 input_rx: Option<mpsc::Receiver<Vec<u8>>>,
27 window_size: PtySize,
29 working_dir: PathBuf,
31 env_vars: HashMap<String, String>,
33 status: Arc<RwLock<SessionStatus>>,
35}
36
/// Lifecycle state of a [`NativeSession`].
///
/// Derives `Eq` in addition to `PartialEq`: every payload (`String`) has total
/// equality, so the status can be used wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// The session object exists but no process has been started yet.
    Created,
    /// A child process has been spawned and is attached to the PTY.
    Running,
    /// The session is paused.
    Paused,
    /// The child process has ended or the session was stopped.
    Stopped,
    /// The session failed; the payload is the error description.
    Error(String),
}
50
51impl NativeSession {
52 pub fn new(name: &str) -> Result<Self> {
54 let id = Uuid::new_v4().to_string();
55 let window_size = PtySize {
56 rows: 24,
57 cols: 80,
58 pixel_width: 0,
59 pixel_height: 0,
60 };
61
62 let (input_tx, input_rx) = mpsc::channel::<Vec<u8>>(100);
63
64 let session = Self {
65 id: id.clone(),
66 name: name.to_string(),
67 pty_master: None,
68 child: None,
69 output_buffer: Arc::new(Mutex::new(Vec::new())),
70 input_tx,
71 input_rx: Some(input_rx),
72 window_size,
73 working_dir: std::env::current_dir()?,
74 env_vars: std::env::vars().collect(),
75 status: Arc::new(RwLock::new(SessionStatus::Created)),
76 };
77
78 Ok(session)
79 }
80
81 pub async fn start(&mut self) -> Result<()> {
83 self.start_with_command("/bin/bash").await
84 }
85
86 pub async fn start_with_command(&mut self, command: &str) -> Result<()> {
88 let pty_system = portable_pty::native_pty_system();
90
91 let pty_pair = pty_system
93 .openpty(self.window_size)
94 .context("Failed to open PTY")?;
95
96 let mut cmd = CommandBuilder::new(command);
98 cmd.cwd(&self.working_dir);
99
100 for (key, value) in &self.env_vars {
102 cmd.env(key, value);
103 }
104
105 let child = pty_pair
107 .slave
108 .spawn_command(cmd)
109 .context("Failed to spawn child process")?;
110
111 let reader = pty_pair
113 .master
114 .try_clone_reader()
115 .context("Failed to clone reader")?;
116 let writer = pty_pair
117 .master
118 .take_writer()
119 .context("Failed to take writer")?;
120
121 self.child = Some(child);
123
124 *self.status.write().await = SessionStatus::Running;
129
130 let output_buffer = self.output_buffer.clone();
132 let status = self.status.clone();
133
134 tokio::spawn(async move {
135 use std::io::Read;
136 let mut reader = reader;
137 let mut buffer = vec![0u8; 4096];
138
139 loop {
140 match reader.read(&mut buffer) {
141 Ok(0) => {
142 *status.write().await = SessionStatus::Stopped;
144 break;
145 }
146 Ok(n) => {
147 let mut output = output_buffer.lock().await;
148 output.extend_from_slice(&buffer[..n]);
149
150 if output.len() > 1_048_576 {
152 let drain_amount = output.len() - 1_048_576;
153 output.drain(..drain_amount);
154 }
155 }
156 Err(e) => {
157 tracing::error!("PTY read error: {}", e);
158 *status.write().await = SessionStatus::Error(e.to_string());
159 break;
160 }
161 }
162 }
163 });
164
165 if let Some(mut input_rx) = self.input_rx.take() {
167 tokio::spawn(async move {
168 use std::io::Write;
169 let mut writer = writer;
170
171 while let Some(data) = input_rx.recv().await {
172 if let Err(e) = writer.write_all(&data) {
173 tracing::error!("PTY write error: {}", e);
174 break;
175 }
176 let _ = writer.flush();
177 }
178 });
179 }
180
181 Ok(())
182 }
183
184 pub async fn send_input(&self, data: &str) -> Result<()> {
186 self.input_tx
187 .send(data.as_bytes().to_vec())
188 .await
189 .context("Failed to send input")?;
190 Ok(())
191 }
192
193 pub async fn get_output(&self, last_n_lines: usize) -> Result<Vec<String>> {
195 let output = self.output_buffer.lock().await;
196 let text = String::from_utf8_lossy(&output);
197
198 let all_lines: Vec<&str> = text.lines().collect();
199 let lines: Vec<String> = all_lines
200 .iter()
201 .rev()
202 .take(last_n_lines)
203 .rev()
204 .map(|s| s.to_string())
205 .collect();
206
207 Ok(lines)
208 }
209
210 pub async fn get_all_output(&self) -> Result<Vec<u8>> {
212 let output = self.output_buffer.lock().await;
213 Ok(output.clone())
214 }
215
216 pub async fn clear_output(&self) -> Result<()> {
218 let mut output = self.output_buffer.lock().await;
219 output.clear();
220 Ok(())
221 }
222
223 pub async fn resize(&mut self, rows: u16, cols: u16) -> Result<()> {
225 self.window_size = PtySize {
226 rows,
227 cols,
228 pixel_width: 0,
229 pixel_height: 0,
230 };
231
232 Ok(())
237 }
238
239 pub async fn stop(&mut self) -> Result<()> {
241 if let Some(mut child) = self.child.take() {
242 child.kill()?;
243 let _ = child.wait();
244 }
245
246 *self.status.write().await = SessionStatus::Stopped;
247 Ok(())
248 }
249
250 pub async fn get_status(&self) -> SessionStatus {
252 self.status.read().await.clone()
253 }
254
255 pub fn id(&self) -> &str {
257 &self.id
258 }
259
260 pub fn name(&self) -> &str {
262 &self.name
263 }
264}
265
266pub struct NativeSessionManager {
268 sessions: Arc<RwLock<HashMap<String, Arc<Mutex<NativeSession>>>>>,
270}
271
272impl Default for NativeSessionManager {
273 fn default() -> Self {
274 Self::new()
275 }
276}
277
278impl NativeSessionManager {
279 pub fn new() -> Self {
281 Self {
282 sessions: Arc::new(RwLock::new(HashMap::new())),
283 }
284 }
285
286 pub async fn create_session(&self, name: &str) -> Result<Arc<Mutex<NativeSession>>> {
288 let mut session = NativeSession::new(name)?;
289 session.start().await?;
290
291 let session = Arc::new(Mutex::new(session));
292 let mut sessions = self.sessions.write().await;
293 sessions.insert(name.to_string(), session.clone());
294
295 Ok(session)
296 }
297
298 pub async fn get_session(&self, name: &str) -> Option<Arc<Mutex<NativeSession>>> {
300 let sessions = self.sessions.read().await;
301 sessions.get(name).cloned()
302 }
303
304 pub async fn list_sessions(&self) -> Vec<String> {
306 let sessions = self.sessions.read().await;
307 sessions.keys().cloned().collect()
308 }
309
310 pub async fn delete_session(&self, name: &str) -> Result<()> {
312 let mut sessions = self.sessions.write().await;
313 if let Some(session) = sessions.remove(name) {
314 let mut session = session.lock().await;
315 session.stop().await?;
316 }
317 Ok(())
318 }
319
320 pub async fn has_session(&self, name: &str) -> bool {
322 let sessions = self.sessions.read().await;
323 sessions.contains_key(name)
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Polls the session's raw output until `needle` shows up.
    ///
    /// Gives up after roughly two seconds and returns `Ok(false)`, which keeps
    /// the test fast on quick machines without being flaky on slow ones.
    async fn wait_for_output(session: &NativeSession, needle: &str) -> Result<bool> {
        for _ in 0..40 {
            let bytes = session.get_all_output().await?;
            if String::from_utf8_lossy(&bytes).contains(needle) {
                return Ok(true);
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
        Ok(false)
    }

    // Ignored by default; presumably because it needs a real PTY and
    // `/bin/bash` on the host — confirm before enabling in CI.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    #[ignore]
    async fn test_native_session() -> Result<()> {
        let mut session = NativeSession::new("test")?;
        session.start().await?;

        session.send_input("echo 'Hello Native Session'\n").await?;

        let saw_greeting = wait_for_output(&session, "Hello Native Session").await?;
        let output = session.get_output(10).await?;

        // Stop before asserting so a failed assertion does not leak the shell.
        session.stop().await?;

        assert!(!output.is_empty());
        assert!(saw_greeting);
        Ok(())
    }

    // Uses the multi-thread runtime like the test above, so that blocking PTY
    // I/O cannot stall the only runtime thread.
    #[cfg_attr(not(feature = "native-pty-tests"), ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_session_manager() -> Result<()> {
        let manager = NativeSessionManager::new();

        let _session = manager.create_session("test-session").await?;

        assert!(manager.has_session("test-session").await);

        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert!(sessions.contains(&"test-session".to_string()));

        manager.delete_session("test-session").await?;
        assert!(!manager.has_session("test-session").await);

        Ok(())
    }
}