use std::collections::{HashMap, VecDeque};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
use tracing::{debug, info};
/// Identifier of a terminal session. `generate_session_id` produces values of
/// the form `term-` followed by 16 lowercase hex digits.
pub type SessionId = String;
/// Terminal height, in rows, used when a create request omits `rows`.
const DEFAULT_ROWS: u16 = 24;
/// Terminal width, in columns, used when a create request omits `cols`.
const DEFAULT_COLS: u16 = 80;
/// Capacity handed to the broadcast channel that fans PTY output out to
/// subscribers. It counts messages; each message is one chunk read from the PTY.
const OUTPUT_CHANNEL_CAPACITY: usize = 256;
/// Maximum number of bytes of recent PTY output retained per session; once the
/// buffer is full the oldest bytes are evicted first.
const SCROLLBACK_BUFFER_CAPACITY: usize = 64 * 1024;
/// Serializable metadata describing one terminal session, as returned by
/// `TerminalManager::create_session` and `TerminalManager::list_sessions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TerminalSessionInfo {
/// Session identifier (see `SessionId`).
pub id: SessionId,
/// Working directory the shell was started in, exactly as given in the request.
pub cwd: String,
/// Current height in rows; updated by `TerminalManager::resize_session`.
pub rows: u16,
/// Current width in columns; updated by `TerminalManager::resize_session`.
pub cols: u16,
/// Creation time as an RFC 3339 UTC timestamp.
pub created_at: String,
/// Project identifier copied from the create request; defaults to an empty
/// string when absent from deserialized input.
#[serde(default)]
pub project_id: String,
/// Root label copied from the create request (the tests use values such as
/// `worktree:feature-x`); defaults to an empty string when absent.
// NOTE(review): the exact vocabulary of `root` is defined outside this file.
#[serde(default)]
pub root: String,
}
/// Parameters accepted by `TerminalManager::create_session`.
#[derive(Debug, Deserialize)]
pub struct CreateTerminalRequest {
/// Working directory for the shell; session creation fails if the path does
/// not exist.
pub cwd: String,
/// Requested height in rows; `DEFAULT_ROWS` is used when `None`.
pub rows: Option<u16>,
/// Requested width in columns; `DEFAULT_COLS` is used when `None`.
pub cols: Option<u16>,
/// Opaque project identifier stored on the session; empty when omitted.
#[serde(default)]
pub project_id: String,
/// Opaque root label stored on the session; empty when omitted.
#[serde(default)]
pub root: String,
}
/// Request to create a terminal identified by project context instead of an
/// explicit working directory.
// NOTE(review): nothing in this file consumes this type; presumably a caller
// resolves `project_id`/`root` to a directory and then builds a
// `CreateTerminalRequest` — confirm against the call site.
#[derive(Debug, Deserialize)]
pub struct CreateTerminalFromContextRequest {
/// Project identifier (required here, unlike in `CreateTerminalRequest`).
pub project_id: String,
/// Root label within the project (required here).
pub root: String,
/// Requested height in rows, if any.
pub rows: Option<u16>,
/// Requested width in columns, if any.
pub cols: Option<u16>,
}
/// Deserializable payload carrying a new terminal size.
// NOTE(review): not referenced in this file; presumably its fields are
// forwarded to `TerminalManager::resize_session` by an API handler — confirm.
#[derive(Debug, Deserialize)]
pub struct ResizeTerminalRequest {
/// New height in rows.
pub rows: u16,
/// New width in columns.
pub cols: u16,
}
/// Control messages sent from `TerminalManager` to the thread that owns a
/// session's PTY master.
enum PtyCommand {
/// Resize the PTY to the given dimensions.
Resize { rows: u16, cols: u16 },
/// Stop processing commands and let the PTY management thread exit.
Shutdown,
}
/// Byte ring of recent PTY output, shared between the reader thread (which
/// appends) and `TerminalManager::get_scrollback` (which copies it out). A
/// `std::sync::Mutex` is used because the reader runs on a plain OS thread.
type SharedScrollback = Arc<std::sync::Mutex<VecDeque<u8>>>;
/// Everything the manager keeps for one live terminal session.
struct TerminalSession {
/// Metadata reported to callers; `rows`/`cols` are kept in sync on resize.
info: TerminalSessionInfo,
/// Write half of the PTY, behind an async mutex so that input writes are
/// serialized.
writer: Arc<Mutex<Box<dyn Write + Send>>>,
/// Broadcast sender on which the reader thread publishes output chunks;
/// subscribers are created from it on demand.
output_tx: broadcast::Sender<Vec<u8>>,
/// Channel to the PTY management thread (resize / shutdown commands).
pty_cmd_tx: mpsc::UnboundedSender<PtyCommand>,
/// Recent output retained for replay to late subscribers.
scrollback: SharedScrollback,
}
/// Registry of live terminal sessions, keyed by session id.
pub struct TerminalManager {
/// Session table behind an async read/write lock; lookups take the read
/// lock, while insert/remove/resize take the write lock.
sessions: RwLock<HashMap<SessionId, TerminalSession>>,
}
/// Reference-counted handle to a `TerminalManager`, as returned by
/// `create_terminal_manager`.
pub type SharedTerminalManager = Arc<TerminalManager>;
/// Builds a [`TerminalManager`] with no sessions and returns it behind an
/// `Arc` so it can be shared.
pub fn create_terminal_manager() -> SharedTerminalManager {
    let manager = TerminalManager {
        sessions: RwLock::new(HashMap::new()),
    };
    Arc::new(manager)
}
impl TerminalManager {
pub async fn create_session(
&self,
request: CreateTerminalRequest,
) -> Result<TerminalSessionInfo, String> {
let rows = request.rows.unwrap_or(DEFAULT_ROWS);
let cols = request.cols.unwrap_or(DEFAULT_COLS);
let cwd_path = PathBuf::from(&request.cwd);
if !cwd_path.exists() {
return Err(format!("Working directory does not exist: {}", request.cwd));
}
let session_id = generate_session_id();
let created_at = chrono::Utc::now().to_rfc3339();
info!(
session_id = %session_id,
cwd = %request.cwd,
rows = rows,
cols = cols,
"Creating terminal session"
);
let cwd = request.cwd.clone();
let sid = session_id.clone();
let ts = created_at.clone();
let (pty_cmd_tx, pty_cmd_rx) = mpsc::unbounded_channel::<PtyCommand>();
let (output_tx, _) = broadcast::channel(OUTPUT_CHANNEL_CAPACITY);
let scrollback: SharedScrollback = Arc::new(std::sync::Mutex::new(
VecDeque::with_capacity(SCROLLBACK_BUFFER_CAPACITY),
));
let output_tx_clone = output_tx.clone();
let scrollback_clone = scrollback.clone();
let sid_clone = sid.clone();
let (writer_tx, writer_rx) = tokio::sync::oneshot::channel();
std::thread::spawn(move || {
let pty_system = native_pty_system();
let pair = match pty_system.openpty(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
}) {
Ok(pair) => pair,
Err(e) => {
let _ = writer_tx.send(Err(format!("Failed to open PTY: {}", e)));
return;
}
};
let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
let mut cmd = CommandBuilder::new(&shell);
cmd.arg("-l"); cmd.cwd(&cwd);
let _child = match pair.slave.spawn_command(cmd) {
Ok(child) => child,
Err(e) => {
let _ = writer_tx.send(Err(format!("Failed to spawn shell: {}", e)));
return;
}
};
drop(pair.slave);
let reader = match pair.master.try_clone_reader() {
Ok(r) => r,
Err(e) => {
let _ = writer_tx.send(Err(format!("Failed to clone PTY reader: {}", e)));
return;
}
};
let writer = match pair.master.take_writer() {
Ok(w) => w,
Err(e) => {
let _ = writer_tx.send(Err(format!("Failed to take PTY writer: {}", e)));
return;
}
};
let _ = writer_tx.send(Ok(writer));
let tx = output_tx_clone;
let reader_sid = sid_clone.clone();
std::thread::spawn(move || {
read_pty_output(reader, tx, scrollback_clone, reader_sid);
});
let mut pty_cmd_rx = pty_cmd_rx;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
while let Some(cmd) = pty_cmd_rx.recv().await {
match cmd {
PtyCommand::Resize { rows, cols } => {
if let Err(e) = pair.master.resize(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
}) {
debug!(session_id = %sid_clone, error = %e, "Failed to resize PTY");
}
}
PtyCommand::Shutdown => {
break;
}
}
}
});
info!(session_id = %sid_clone, "PTY management thread exiting");
});
let writer = writer_rx
.await
.map_err(|_| "PTY thread terminated before sending writer".to_string())??;
let info = TerminalSessionInfo {
id: sid.clone(),
cwd: request.cwd,
rows,
cols,
created_at: ts,
project_id: request.project_id,
root: request.root,
};
let session = TerminalSession {
info: info.clone(),
writer: Arc::new(Mutex::new(writer)),
output_tx,
pty_cmd_tx,
scrollback,
};
self.sessions.write().await.insert(session_id, session);
Ok(info)
}
pub async fn list_sessions(&self) -> Vec<TerminalSessionInfo> {
let sessions = self.sessions.read().await;
sessions.values().map(|s| s.info.clone()).collect()
}
pub async fn delete_session(&self, session_id: &str) -> Result<(), String> {
let session = self
.sessions
.write()
.await
.remove(session_id)
.ok_or_else(|| format!("Session not found: {}", session_id))?;
info!(session_id = %session_id, "Deleting terminal session");
let _ = session.pty_cmd_tx.send(PtyCommand::Shutdown);
drop(session);
Ok(())
}
pub async fn write_input(&self, session_id: &str, data: &[u8]) -> Result<(), String> {
let sessions = self.sessions.read().await;
let session = sessions
.get(session_id)
.ok_or_else(|| format!("Session not found: {}", session_id))?;
let mut writer = session.writer.lock().await;
writer
.write_all(data)
.map_err(|e| format!("Failed to write to PTY: {}", e))?;
writer
.flush()
.map_err(|e| format!("Failed to flush PTY: {}", e))?;
Ok(())
}
pub async fn subscribe_output(
&self,
session_id: &str,
) -> Result<broadcast::Receiver<Vec<u8>>, String> {
let sessions = self.sessions.read().await;
let session = sessions
.get(session_id)
.ok_or_else(|| format!("Session not found: {}", session_id))?;
Ok(session.output_tx.subscribe())
}
pub async fn resize_session(
&self,
session_id: &str,
rows: u16,
cols: u16,
) -> Result<(), String> {
let mut sessions = self.sessions.write().await;
let session = sessions
.get_mut(session_id)
.ok_or_else(|| format!("Session not found: {}", session_id))?;
debug!(session_id = %session_id, rows = rows, cols = cols, "Resizing terminal");
session
.pty_cmd_tx
.send(PtyCommand::Resize { rows, cols })
.map_err(|_| "PTY management thread is gone".to_string())?;
session.info.rows = rows;
session.info.cols = cols;
Ok(())
}
pub async fn session_exists(&self, session_id: &str) -> bool {
self.sessions.read().await.contains_key(session_id)
}
pub async fn get_scrollback(&self, session_id: &str) -> Result<Vec<u8>, String> {
let sessions = self.sessions.read().await;
let session = sessions
.get(session_id)
.ok_or_else(|| format!("Session not found: {}", session_id))?;
let sb = session
.scrollback
.lock()
.map_err(|e| format!("Failed to lock scrollback buffer: {}", e))?;
Ok(sb.iter().copied().collect())
}
}
fn read_pty_output(
mut reader: Box<dyn Read + Send>,
tx: broadcast::Sender<Vec<u8>>,
scrollback: SharedScrollback,
session_id: String,
) {
let mut buf = [0u8; 4096];
loop {
match reader.read(&mut buf) {
Ok(0) => {
debug!(session_id = %session_id, "PTY EOF");
break;
}
Ok(n) => {
let data = buf[..n].to_vec();
if let Ok(mut sb) = scrollback.lock() {
for &byte in &data {
if sb.len() >= SCROLLBACK_BUFFER_CAPACITY {
sb.pop_front();
}
sb.push_back(byte);
}
}
let _ = tx.send(data);
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
debug!(session_id = %session_id, error = %e, "PTY read error");
break;
}
}
}
info!(session_id = %session_id, "PTY reader thread exiting");
}
/// Produces a session id: the literal prefix `term-` followed by a random
/// 64-bit value rendered as 16 zero-padded lowercase hex digits.
fn generate_session_id() -> String {
    use rand::Rng;
    let random_bits = rand::thread_rng().gen::<u64>();
    format!("term-{:016x}", random_bits)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a create request for `cwd` with empty project metadata.
    fn request(cwd: &str, rows: Option<u16>, cols: Option<u16>) -> CreateTerminalRequest {
        CreateTerminalRequest {
            cwd: cwd.to_string(),
            rows,
            cols,
            project_id: String::new(),
            root: String::new(),
        }
    }

    #[tokio::test]
    async fn test_create_and_list_sessions() {
        let manager = create_terminal_manager();
        let info = manager
            .create_session(request("/tmp", Some(24), Some(80)))
            .await
            .unwrap();
        assert!(info.id.starts_with("term-"));
        assert_eq!(info.cwd, "/tmp");
        assert_eq!(info.rows, 24);
        assert_eq!(info.cols, 80);

        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, info.id);

        manager.delete_session(&info.id).await.unwrap();
        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 0);
    }

    #[tokio::test]
    async fn test_create_session_invalid_cwd() {
        let manager = create_terminal_manager();
        let result = manager
            .create_session(request("/nonexistent/path/that/does/not/exist", None, None))
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("does not exist"));
    }

    #[tokio::test]
    async fn test_delete_nonexistent_session() {
        let manager = create_terminal_manager();
        let result = manager.delete_session("nonexistent").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not found"));
    }

    #[tokio::test]
    async fn test_session_exists() {
        let manager = create_terminal_manager();
        assert!(!manager.session_exists("nonexistent").await);

        let info = manager
            .create_session(request("/tmp", None, None))
            .await
            .unwrap();
        assert!(manager.session_exists(&info.id).await);

        manager.delete_session(&info.id).await.unwrap();
        assert!(!manager.session_exists(&info.id).await);
    }

    #[tokio::test]
    async fn test_session_preserves_project_id_and_root() {
        let manager = create_terminal_manager();
        let info = manager
            .create_session(CreateTerminalRequest {
                project_id: "proj1".to_string(),
                root: "worktree:feature-x".to_string(),
                ..request("/tmp", Some(24), Some(80))
            })
            .await
            .unwrap();
        assert_eq!(info.project_id, "proj1");
        assert_eq!(info.root, "worktree:feature-x");

        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].project_id, "proj1");
        assert_eq!(sessions[0].root, "worktree:feature-x");

        manager.delete_session(&info.id).await.unwrap();
    }

    #[tokio::test]
    async fn test_scrollback_buffer_available() {
        let manager = create_terminal_manager();
        let info = manager
            .create_session(request("/tmp", Some(24), Some(80)))
            .await
            .unwrap();

        // The login shell may already have printed a banner or prompt by the
        // time we look, so asserting emptiness would race with the reader
        // thread. Only the retention bound is guaranteed.
        let scrollback = manager.get_scrollback(&info.id).await.unwrap();
        assert!(scrollback.len() <= SCROLLBACK_BUFFER_CAPACITY);

        let result = manager.get_scrollback("nonexistent").await;
        assert!(result.is_err());

        manager.delete_session(&info.id).await.unwrap();
    }
}