use anyhow::{Context, Result};
use nix::pty::{openpty, OpenptyResult, Winsize};
use nix::unistd::{close, dup2};
use std::collections::HashMap;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
/// A shell-like child process attached to a pseudo-terminal (PTY), together with
/// the buffers and channels used to exchange bytes with it.
pub struct NativeSession {
/// Identifier generated in `new` (a v4 UUID rendered as a string).
id: String,
/// Caller-supplied name given to `new`.
name: String,
/// Raw file descriptor of the PTY master side.
pty_master: RawFd,
/// Handle to the spawned process; `None` until the session is started and again
/// after `stop` has taken it.
child: Option<Child>,
/// Bytes read from the PTY, shared with the background reader task.
output_buffer: Arc<Mutex<Vec<u8>>>,
/// Sending half of the bounded channel that `send_input` pushes byte chunks into.
input_tx: tokio::sync::mpsc::Sender<Vec<u8>>,
/// Terminal dimensions handed to `openpty` when a PTY is allocated.
window_size: Winsize,
/// Directory the child process is started in (captured in `new`).
working_dir: PathBuf,
/// Environment variables explicitly set on the child process (captured in `new`).
env_vars: HashMap<String, String>,
/// Current lifecycle state, shared with the background reader task.
status: Arc<RwLock<SessionStatus>>,
}
/// Lifecycle state of a [`NativeSession`].
///
/// Every payload is `Eq`, so the enum derives `Eq` alongside `PartialEq`; this
/// lets it be used wherever total equality is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// The session object exists but no process has been started yet.
    Created,
    /// A child process was spawned and attached to the PTY.
    Running,
    /// Reserved state; nothing in this file assigns it.
    Paused,
    /// The output reader finished or the session was stopped explicitly.
    Stopped,
    /// Reading from the PTY failed; the payload is the error text.
    Error(String),
}
impl NativeSession {
pub fn new(name: &str) -> Result<Self> {
let id = Uuid::new_v4().to_string();
let window_size = Winsize {
ws_row: 24,
ws_col: 80,
ws_xpixel: 0,
ws_ypixel: 0,
};
let OpenptyResult { master, slave } = openpty(Some(&window_size), None)?;
close(slave)?;
let (input_tx, mut input_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
let session = Self {
id: id.clone(),
name: name.to_string(),
pty_master: master,
child: None,
output_buffer: Arc::new(Mutex::new(Vec::new())),
input_tx,
window_size,
working_dir: std::env::current_dir()?,
env_vars: std::env::vars().collect(),
status: Arc::new(RwLock::new(SessionStatus::Created)),
};
Ok(session)
}
pub async fn start(&mut self) -> Result<()> {
self.start_with_command("/bin/bash").await
}
pub async fn start_with_command(&mut self, command: &str) -> Result<()> {
let OpenptyResult { master, slave } = openpty(Some(&self.window_size), None)?;
self.pty_master = master;
let mut cmd = Command::new(command);
cmd.current_dir(&self.working_dir);
for (key, value) in &self.env_vars {
cmd.env(key, value);
}
unsafe {
cmd.pre_exec(move || {
nix::unistd::setsid()?;
nix::pty::unlockpt(master)?;
let slave_path = nix::pty::ptsname_r(master)?;
let slave_fd = nix::fcntl::open(
&slave_path,
nix::fcntl::OFlag::O_RDWR,
nix::sys::stat::Mode::empty(),
)?;
dup2(slave_fd, 0)?;
dup2(slave_fd, 1)?;
dup2(slave_fd, 2)?;
close(slave_fd)?;
close(slave)?;
Ok(())
});
}
let child = cmd.spawn().context("Failed to spawn child process")?;
self.child = Some(child);
*self.status.write().await = SessionStatus::Running;
let master_fd = self.pty_master;
let output_buffer = self.output_buffer.clone();
let status = self.status.clone();
tokio::spawn(async move {
let mut buffer = vec![0u8; 4096];
let mut file = unsafe {
use std::os::unix::io::FromRawFd;
tokio::fs::File::from_raw_fd(master_fd)
};
loop {
match file.read(&mut buffer).await {
Ok(0) => {
*status.write().await = SessionStatus::Stopped;
break;
}
Ok(n) => {
let mut output = output_buffer.lock().await;
output.extend_from_slice(&buffer[..n]);
if output.len() > 1_048_576 {
output.drain(..output.len() - 1_048_576);
}
}
Err(e) => {
tracing::error!("PTY read error: {}", e);
*status.write().await = SessionStatus::Error(e.to_string());
break;
}
}
}
});
let master_fd = self.pty_master;
let mut input_rx = tokio::sync::mpsc::channel::<Vec<u8>>(100).1;
let input_tx = self.input_tx.clone();
tokio::spawn(async move {
let mut file = unsafe {
use std::os::unix::io::FromRawFd;
tokio::fs::File::from_raw_fd(master_fd)
};
while let Some(data) = input_rx.recv().await {
if let Err(e) = file.write_all(&data).await {
tracing::error!("PTY write error: {}", e);
break;
}
}
});
close(slave)?;
Ok(())
}
pub async fn send_input(&self, data: &str) -> Result<()> {
self.input_tx
.send(data.as_bytes().to_vec())
.await
.context("Failed to send input")?;
Ok(())
}
pub async fn get_output(&self, last_n_lines: usize) -> Result<Vec<String>> {
let output = self.output_buffer.lock().await;
let text = String::from_utf8_lossy(&output);
let lines: Vec<String> = text
.lines()
.rev()
.take(last_n_lines)
.rev()
.map(|s| s.to_string())
.collect();
Ok(lines)
}
pub async fn get_all_output(&self) -> Result<Vec<u8>> {
let output = self.output_buffer.lock().await;
Ok(output.clone())
}
pub async fn clear_output(&self) -> Result<()> {
let mut output = self.output_buffer.lock().await;
output.clear();
Ok(())
}
pub async fn resize(&self, rows: u16, cols: u16) -> Result<()> {
let window_size = Winsize {
ws_row: rows,
ws_col: cols,
ws_xpixel: 0,
ws_ypixel: 0,
};
let _ = window_size;
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if let Some(mut child) = self.child.take() {
child.kill().await?;
}
*self.status.write().await = SessionStatus::Stopped;
Ok(())
}
pub async fn get_status(&self) -> SessionStatus {
self.status.read().await.clone()
}
pub fn id(&self) -> &str {
&self.id
}
pub fn name(&self) -> &str {
&self.name
}
}
/// Registry of sessions, keyed by the name passed to `create_session`.
pub struct NativeSessionManager {
/// Shared map from session name to the lock-protected session.
sessions: Arc<RwLock<HashMap<String, Arc<Mutex<NativeSession>>>>>,
}
impl NativeSessionManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn create_session(&self, name: &str) -> Result<Arc<Mutex<NativeSession>>> {
let mut session = NativeSession::new(name)?;
session.start().await?;
let session = Arc::new(Mutex::new(session));
let mut sessions = self.sessions.write().await;
sessions.insert(name.to_string(), session.clone());
Ok(session)
}
pub async fn get_session(&self, name: &str) -> Option<Arc<Mutex<NativeSession>>> {
let sessions = self.sessions.read().await;
sessions.get(name).cloned()
}
pub async fn list_sessions(&self) -> Vec<String> {
let sessions = self.sessions.read().await;
sessions.keys().cloned().collect()
}
pub async fn delete_session(&self, name: &str) -> Result<()> {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.remove(name) {
let mut session = session.lock().await;
session.stop().await?;
}
Ok(())
}
pub async fn has_session(&self, name: &str) -> bool {
let sessions = self.sessions.read().await;
sessions.contains_key(name)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Polls the session's output until it contains `needle` or roughly five
    /// seconds have passed, and returns the last snapshot either way.
    ///
    /// Polling replaces a single fixed sleep, which is flaky on slow machines.
    async fn wait_for_output(session: &NativeSession, needle: &str) -> Result<String> {
        let mut snapshot = String::new();
        for _ in 0..50 {
            snapshot = String::from_utf8_lossy(&session.get_all_output().await?).into_owned();
            if snapshot.contains(needle) {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        Ok(snapshot)
    }

    // Spawns a real shell on a PTY, so it only runs when explicitly enabled.
    #[cfg_attr(not(feature = "native-pty-tests"), ignore)]
    #[tokio::test]
    async fn test_native_session() -> Result<()> {
        let mut session = NativeSession::new("test")?;
        session.start().await?;
        session.send_input("echo 'Hello Native Session'\n").await?;

        let full_output = wait_for_output(&session, "Hello Native Session").await?;
        assert!(full_output.contains("Hello Native Session"));

        let output = session.get_output(10).await?;
        assert!(!output.is_empty());

        session.stop().await?;
        Ok(())
    }

    // `create_session` also spawns a real shell, so this test carries the same
    // gate as `test_native_session`.
    #[cfg_attr(not(feature = "native-pty-tests"), ignore)]
    #[tokio::test]
    async fn test_session_manager() -> Result<()> {
        let manager = NativeSessionManager::new();
        let _session = manager.create_session("test-session").await?;
        assert!(manager.has_session("test-session").await);

        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert!(sessions.contains(&"test-session".to_string()));

        manager.delete_session("test-session").await?;
        assert!(!manager.has_session("test-session").await);
        Ok(())
    }
}