use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, mpsc};
use anyhow::{Result, Context};
use uuid::Uuid;
use std::path::PathBuf;
use portable_pty::{PtySize, CommandBuilder};
pub struct NativeSession {
id: String,
name: String,
#[allow(dead_code)]
pty_master: Option<Box<dyn portable_pty::MasterPty + Send>>,
child: Option<Box<dyn portable_pty::Child + Send + Sync>>,
output_buffer: Arc<Mutex<Vec<u8>>>,
input_tx: mpsc::Sender<Vec<u8>>,
input_rx: Option<mpsc::Receiver<Vec<u8>>>,
window_size: PtySize,
working_dir: PathBuf,
env_vars: HashMap<String, String>,
status: Arc<RwLock<SessionStatus>>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum SessionStatus {
Created,
Running,
Paused,
Stopped,
Error(String),
}
impl NativeSession {
pub fn new(name: &str) -> Result<Self> {
let id = Uuid::new_v4().to_string();
let window_size = PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
};
let (input_tx, input_rx) = mpsc::channel::<Vec<u8>>(100);
let session = Self {
id: id.clone(),
name: name.to_string(),
pty_master: None,
child: None,
output_buffer: Arc::new(Mutex::new(Vec::new())),
input_tx,
input_rx: Some(input_rx),
window_size,
working_dir: std::env::current_dir()?,
env_vars: std::env::vars().collect(),
status: Arc::new(RwLock::new(SessionStatus::Created)),
};
Ok(session)
}
pub async fn start(&mut self) -> Result<()> {
self.start_with_command("/bin/bash").await
}
pub async fn start_with_command(&mut self, command: &str) -> Result<()> {
let pty_system = portable_pty::native_pty_system();
let pty_pair = pty_system.openpty(self.window_size)
.context("Failed to open PTY")?;
let mut cmd = CommandBuilder::new(command);
cmd.cwd(&self.working_dir);
for (key, value) in &self.env_vars {
cmd.env(key, value);
}
let child = pty_pair.slave.spawn_command(cmd)
.context("Failed to spawn child process")?;
let reader = pty_pair.master.try_clone_reader()
.context("Failed to clone reader")?;
let writer = pty_pair.master.take_writer()
.context("Failed to take writer")?;
self.child = Some(child);
*self.status.write().await = SessionStatus::Running;
let output_buffer = self.output_buffer.clone();
let status = self.status.clone();
tokio::spawn(async move {
use std::io::Read;
let mut reader = reader;
let mut buffer = vec![0u8; 4096];
loop {
match reader.read(&mut buffer) {
Ok(0) => {
*status.write().await = SessionStatus::Stopped;
break;
}
Ok(n) => {
let mut output = output_buffer.lock().await;
output.extend_from_slice(&buffer[..n]);
if output.len() > 1_048_576 {
let drain_amount = output.len() - 1_048_576;
output.drain(..drain_amount);
}
}
Err(e) => {
log::error!("PTY read error: {}", e);
*status.write().await = SessionStatus::Error(e.to_string());
break;
}
}
}
});
if let Some(mut input_rx) = self.input_rx.take() {
tokio::spawn(async move {
use std::io::Write;
let mut writer = writer;
while let Some(data) = input_rx.recv().await {
if let Err(e) = writer.write_all(&data) {
log::error!("PTY write error: {}", e);
break;
}
let _ = writer.flush();
}
});
}
Ok(())
}
pub async fn send_input(&self, data: &str) -> Result<()> {
self.input_tx.send(data.as_bytes().to_vec()).await
.context("Failed to send input")?;
Ok(())
}
pub async fn get_output(&self, last_n_lines: usize) -> Result<Vec<String>> {
let output = self.output_buffer.lock().await;
let text = String::from_utf8_lossy(&output);
let all_lines: Vec<&str> = text.lines().collect();
let lines: Vec<String> = all_lines.iter()
.rev()
.take(last_n_lines)
.rev()
.map(|s| s.to_string())
.collect();
Ok(lines)
}
pub async fn get_all_output(&self) -> Result<Vec<u8>> {
let output = self.output_buffer.lock().await;
Ok(output.clone())
}
pub async fn clear_output(&self) -> Result<()> {
let mut output = self.output_buffer.lock().await;
output.clear();
Ok(())
}
pub async fn resize(&mut self, rows: u16, cols: u16) -> Result<()> {
self.window_size = PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
};
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if let Some(mut child) = self.child.take() {
child.kill()?;
let _ = child.wait();
}
*self.status.write().await = SessionStatus::Stopped;
Ok(())
}
pub async fn get_status(&self) -> SessionStatus {
self.status.read().await.clone()
}
pub fn id(&self) -> &str {
&self.id
}
pub fn name(&self) -> &str {
&self.name
}
}
pub struct NativeSessionManager {
sessions: Arc<RwLock<HashMap<String, Arc<Mutex<NativeSession>>>>>,
}
impl NativeSessionManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn create_session(&self, name: &str) -> Result<Arc<Mutex<NativeSession>>> {
let mut session = NativeSession::new(name)?;
session.start().await?;
let session = Arc::new(Mutex::new(session));
let mut sessions = self.sessions.write().await;
sessions.insert(name.to_string(), session.clone());
Ok(session)
}
pub async fn get_session(&self, name: &str) -> Option<Arc<Mutex<NativeSession>>> {
let sessions = self.sessions.read().await;
sessions.get(name).cloned()
}
pub async fn list_sessions(&self) -> Vec<String> {
let sessions = self.sessions.read().await;
sessions.keys().cloned().collect()
}
pub async fn delete_session(&self, name: &str) -> Result<()> {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.remove(name) {
let mut session = session.lock().await;
session.stop().await?;
}
Ok(())
}
pub async fn has_session(&self, name: &str) -> bool {
let sessions = self.sessions.read().await;
sessions.contains_key(name)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_native_session() -> Result<()> {
let mut session = NativeSession::new("test")?;
session.start().await?;
session.send_input("echo 'Hello Native Session'\n").await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let output = session.get_output(10).await?;
assert!(!output.is_empty());
let output_bytes = session.get_all_output().await?;
let full_output = String::from_utf8_lossy(&output_bytes);
assert!(full_output.contains("Hello Native Session"));
session.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_session_manager() -> Result<()> {
let manager = NativeSessionManager::new();
let _session = manager.create_session("test-session").await?;
assert!(manager.has_session("test-session").await);
let sessions = manager.list_sessions().await;
assert_eq!(sessions.len(), 1);
assert!(sessions.contains(&"test-session".to_string()));
manager.delete_session("test-session").await?;
assert!(!manager.has_session("test-session").await);
Ok(())
}
}