Skip to main content

ai_session/
native_portable.rs

1//! Native session management using portable-pty
2
3use anyhow::{Context, Result};
4use portable_pty::{CommandBuilder, PtySize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::{Mutex, RwLock, mpsc};
9use uuid::Uuid;
10
11/// Native session that replaces tmux functionality
12pub struct NativeSession {
13    /// Session ID
14    id: String,
15    /// Session name
16    name: String,
17    /// PTY master
18    #[allow(dead_code)]
19    pty_master: Option<Box<dyn portable_pty::MasterPty + Send>>,
20    /// Child process
21    child: Option<Box<dyn portable_pty::Child + Send + Sync>>,
22    /// Output buffer
23    output_buffer: Arc<Mutex<Vec<u8>>>,
24    /// Input channel
25    input_tx: mpsc::Sender<Vec<u8>>,
26    input_rx: Option<mpsc::Receiver<Vec<u8>>>,
27    /// Window size
28    window_size: PtySize,
29    /// Working directory
30    working_dir: PathBuf,
31    /// Environment variables
32    env_vars: HashMap<String, String>,
33    /// Session status
34    status: Arc<RwLock<SessionStatus>>,
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub enum SessionStatus {
39    /// Session is created but not started
40    Created,
41    /// Session is running
42    Running,
43    /// Session is paused
44    Paused,
45    /// Session has stopped
46    Stopped,
47    /// Session encountered an error
48    Error(String),
49}
50
51impl NativeSession {
52    /// Create a new native session
53    pub fn new(name: &str) -> Result<Self> {
54        let id = Uuid::new_v4().to_string();
55        let window_size = PtySize {
56            rows: 24,
57            cols: 80,
58            pixel_width: 0,
59            pixel_height: 0,
60        };
61
62        let (input_tx, input_rx) = mpsc::channel::<Vec<u8>>(100);
63
64        let session = Self {
65            id: id.clone(),
66            name: name.to_string(),
67            pty_master: None,
68            child: None,
69            output_buffer: Arc::new(Mutex::new(Vec::new())),
70            input_tx,
71            input_rx: Some(input_rx),
72            window_size,
73            working_dir: std::env::current_dir()?,
74            env_vars: std::env::vars().collect(),
75            status: Arc::new(RwLock::new(SessionStatus::Created)),
76        };
77
78        Ok(session)
79    }
80
81    /// Start the session with a shell
82    pub async fn start(&mut self) -> Result<()> {
83        self.start_with_command("/bin/bash").await
84    }
85
86    /// Start the session with a specific command
87    pub async fn start_with_command(&mut self, command: &str) -> Result<()> {
88        // Create PTY system
89        let pty_system = portable_pty::native_pty_system();
90
91        // Create PTY pair
92        let pty_pair = pty_system
93            .openpty(self.window_size)
94            .context("Failed to open PTY")?;
95
96        // Create command
97        let mut cmd = CommandBuilder::new(command);
98        cmd.cwd(&self.working_dir);
99
100        // Set environment variables
101        for (key, value) in &self.env_vars {
102            cmd.env(key, value);
103        }
104
105        // Spawn child process
106        let child = pty_pair
107            .slave
108            .spawn_command(cmd)
109            .context("Failed to spawn child process")?;
110
111        // Get reader and writer from the master
112        let reader = pty_pair
113            .master
114            .try_clone_reader()
115            .context("Failed to clone reader")?;
116        let writer = pty_pair
117            .master
118            .take_writer()
119            .context("Failed to take writer")?;
120
121        // Store the child process
122        self.child = Some(child);
123
124        // Note: We can't store the master after taking the writer from it
125        // The master is consumed by take_writer(), so we'll handle resize differently
126
127        // Update status
128        *self.status.write().await = SessionStatus::Running;
129
130        // Start output reader task
131        let output_buffer = self.output_buffer.clone();
132        let status = self.status.clone();
133
134        tokio::spawn(async move {
135            use std::io::Read;
136            let mut reader = reader;
137            let mut buffer = vec![0u8; 4096];
138
139            loop {
140                match reader.read(&mut buffer) {
141                    Ok(0) => {
142                        // EOF
143                        *status.write().await = SessionStatus::Stopped;
144                        break;
145                    }
146                    Ok(n) => {
147                        let mut output = output_buffer.lock().await;
148                        output.extend_from_slice(&buffer[..n]);
149
150                        // Keep buffer size reasonable (1MB max)
151                        if output.len() > 1_048_576 {
152                            let drain_amount = output.len() - 1_048_576;
153                            output.drain(..drain_amount);
154                        }
155                    }
156                    Err(e) => {
157                        tracing::error!("PTY read error: {}", e);
158                        *status.write().await = SessionStatus::Error(e.to_string());
159                        break;
160                    }
161                }
162            }
163        });
164
165        // Start input writer task
166        if let Some(mut input_rx) = self.input_rx.take() {
167            tokio::spawn(async move {
168                use std::io::Write;
169                let mut writer = writer;
170
171                while let Some(data) = input_rx.recv().await {
172                    if let Err(e) = writer.write_all(&data) {
173                        tracing::error!("PTY write error: {}", e);
174                        break;
175                    }
176                    let _ = writer.flush();
177                }
178            });
179        }
180
181        Ok(())
182    }
183
184    /// Send input to the session
185    pub async fn send_input(&self, data: &str) -> Result<()> {
186        self.input_tx
187            .send(data.as_bytes().to_vec())
188            .await
189            .context("Failed to send input")?;
190        Ok(())
191    }
192
193    /// Get recent output
194    pub async fn get_output(&self, last_n_lines: usize) -> Result<Vec<String>> {
195        let output = self.output_buffer.lock().await;
196        let text = String::from_utf8_lossy(&output);
197
198        let all_lines: Vec<&str> = text.lines().collect();
199        let lines: Vec<String> = all_lines
200            .iter()
201            .rev()
202            .take(last_n_lines)
203            .rev()
204            .map(|s| s.to_string())
205            .collect();
206
207        Ok(lines)
208    }
209
210    /// Get all output
211    pub async fn get_all_output(&self) -> Result<Vec<u8>> {
212        let output = self.output_buffer.lock().await;
213        Ok(output.clone())
214    }
215
216    /// Clear output buffer
217    pub async fn clear_output(&self) -> Result<()> {
218        let mut output = self.output_buffer.lock().await;
219        output.clear();
220        Ok(())
221    }
222
223    /// Resize the terminal
224    pub async fn resize(&mut self, rows: u16, cols: u16) -> Result<()> {
225        self.window_size = PtySize {
226            rows,
227            cols,
228            pixel_width: 0,
229            pixel_height: 0,
230        };
231
232        // Note: Since we don't store the pty_master after taking the writer,
233        // we can't resize the PTY. This is a limitation of the current design.
234        // In a production implementation, we would need to handle this differently.
235
236        Ok(())
237    }
238
239    /// Stop the session
240    pub async fn stop(&mut self) -> Result<()> {
241        if let Some(mut child) = self.child.take() {
242            child.kill()?;
243            let _ = child.wait();
244        }
245
246        *self.status.write().await = SessionStatus::Stopped;
247        Ok(())
248    }
249
250    /// Get session status
251    pub async fn get_status(&self) -> SessionStatus {
252        self.status.read().await.clone()
253    }
254
255    /// Get session ID
256    pub fn id(&self) -> &str {
257        &self.id
258    }
259
260    /// Get session name
261    pub fn name(&self) -> &str {
262        &self.name
263    }
264}
265
266/// Native session manager that replaces tmux
267pub struct NativeSessionManager {
268    /// Active sessions
269    sessions: Arc<RwLock<HashMap<String, Arc<Mutex<NativeSession>>>>>,
270}
271
272impl Default for NativeSessionManager {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
278impl NativeSessionManager {
279    /// Create new session manager
280    pub fn new() -> Self {
281        Self {
282            sessions: Arc::new(RwLock::new(HashMap::new())),
283        }
284    }
285
286    /// Create a new session
287    pub async fn create_session(&self, name: &str) -> Result<Arc<Mutex<NativeSession>>> {
288        let mut session = NativeSession::new(name)?;
289        session.start().await?;
290
291        let session = Arc::new(Mutex::new(session));
292        let mut sessions = self.sessions.write().await;
293        sessions.insert(name.to_string(), session.clone());
294
295        Ok(session)
296    }
297
298    /// Get session by name
299    pub async fn get_session(&self, name: &str) -> Option<Arc<Mutex<NativeSession>>> {
300        let sessions = self.sessions.read().await;
301        sessions.get(name).cloned()
302    }
303
304    /// List all sessions
305    pub async fn list_sessions(&self) -> Vec<String> {
306        let sessions = self.sessions.read().await;
307        sessions.keys().cloned().collect()
308    }
309
310    /// Delete session
311    pub async fn delete_session(&self, name: &str) -> Result<()> {
312        let mut sessions = self.sessions.write().await;
313        if let Some(session) = sessions.remove(name) {
314            let mut session = session.lock().await;
315            session.stop().await?;
316        }
317        Ok(())
318    }
319
320    /// Check if session exists
321    pub async fn has_session(&self, name: &str) -> bool {
322        let sessions = self.sessions.read().await;
323        sessions.contains_key(name)
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330
331    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
332    #[ignore] // Ignore this test as it requires a proper terminal environment
333    async fn test_native_session() -> Result<()> {
334        // This test is ignored by default because:
335        // 1. It requires a proper terminal/PTY environment
336        // 2. It can hang in CI environments without proper PTY support
337        // 3. The portable-pty library has known issues with certain environments
338
339        let mut session = NativeSession::new("test")?;
340        session.start().await?;
341
342        // Send a command
343        session.send_input("echo 'Hello Native Session'\n").await?;
344
345        // Wait a bit for output
346        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
347
348        // Get output
349        let output = session.get_output(10).await?;
350        assert!(!output.is_empty());
351
352        // Check that our echo appears in output
353        let output_bytes = session.get_all_output().await?;
354        let full_output = String::from_utf8_lossy(&output_bytes);
355        assert!(full_output.contains("Hello Native Session"));
356
357        session.stop().await?;
358        Ok(())
359    }
360
361    #[cfg_attr(not(feature = "native-pty-tests"), ignore)]
362    #[tokio::test]
363    async fn test_session_manager() -> Result<()> {
364        let manager = NativeSessionManager::new();
365
366        // Create session
367        let _session = manager.create_session("test-session").await?;
368
369        // Check it exists
370        assert!(manager.has_session("test-session").await);
371
372        // List sessions
373        let sessions = manager.list_sessions().await;
374        assert_eq!(sessions.len(), 1);
375        assert!(sessions.contains(&"test-session".to_string()));
376
377        // Delete session
378        manager.delete_session("test-session").await?;
379        assert!(!manager.has_session("test-session").await);
380
381        Ok(())
382    }
383}