use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info};
use super::session::{
ConfirmInfo, ConfirmResponse, PTYSession, PTYSessionOptions, PermissionDecision, SessionEvent,
SessionState,
};
/// Status record the manager keeps for one PTY agent slot.
///
/// Serialized with camelCase keys; `Option` fields that are `None` are
/// omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PTYAgentInfo {
/// Identifier of the slot this record describes (copied from `Slot::id`).
pub slot_id: String,
/// Role of the slot (copied from `Slot::role`).
pub role: String,
/// Process id reported by the session when it was started; `None` before
/// the first spawn and after a kill.
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
/// Last session state recorded by the manager for this slot.
pub state: SessionState,
/// Time the session was started, as `Utc::now().timestamp_millis()`.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<i64>,
/// Id of the task being run through `execute_task`, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub current_task_id: Option<String>,
/// Log file path assigned to the slot: `<logs_dir>/pty-<slot_id>.log`.
pub log_file: PathBuf,
}
/// Options accepted by `PTYManager::spawn` and `PTYManager::restart`.
///
/// `Default` yields `auto_restart: false`.
#[derive(Debug, Clone, Default)]
pub struct PTYSpawnOptions {
/// When `true`, the slot is registered for automatic restart once its
/// session is observed to have stopped running.
pub auto_restart: bool,
}
/// Outcome of sending one message to a PTY session.
#[derive(Debug, Clone, Serialize)]
pub struct PTYExecuteResult {
/// Response text returned by the session's `send` call.
pub response: String,
/// Wall-clock time spent in the session's `send` call, in milliseconds.
pub duration_ms: u64,
}
/// Describes an agent slot that a PTY session can be attached to.
#[derive(Debug, Clone)]
pub struct Slot {
/// Unique slot identifier; used as the key for sessions and status records.
pub id: String,
/// Role name; passed to the permission policy together with the slot id.
pub role: String,
/// Working directory for the session. When `None`, the process's current
/// directory is used, falling back to `/` if that cannot be determined.
pub cwd: Option<PathBuf>,
}
/// Decides how a session should answer a tool confirmation prompt.
///
/// Implementations must be `Send + Sync` because the policy is shared via
/// `Arc` and invoked from the session's permission-check callback.
pub trait PermissionPolicy: Send + Sync {
/// Returns the decision for `tool_name` requested by the slot `slot_id`
/// with the given `role`. `tool_name` is the empty string when the
/// confirmation carries no tool information.
fn check_permission(&self, slot_id: &str, role: &str, tool_name: &str) -> PermissionDecision;
}
/// Owns the PTY sessions of all slots and publishes their lifecycle events.
///
/// Shared state is wrapped in `Arc<RwLock<..>>` so that background tasks
/// started by `spawn` can access it.
pub struct PTYManager {
// Live session handles keyed by slot id.
sessions: Arc<RwLock<HashMap<String, Arc<RwLock<PTYSession>>>>>,
// Per-slot status records keyed by slot id (created by `init_slot`).
agent_info: Arc<RwLock<HashMap<String, PTYAgentInfo>>>,
// Directory that holds the per-slot `pty-<slot_id>.log` files.
logs_dir: PathBuf,
// Slot ids whose sessions should be restarted after they stop running.
auto_restart_slots: Arc<RwLock<std::collections::HashSet<String>>>,
// Optional policy installed into each new session's permission check.
permission_policy: Arc<RwLock<Option<Arc<dyn PermissionPolicy>>>>,
// Broadcast sender for `ManagerEvent`s; receivers come from `subscribe`.
event_tx: broadcast::Sender<ManagerEvent>,
}
/// Events broadcast by `PTYManager` to receivers obtained from `subscribe`.
#[derive(Debug, Clone)]
pub enum ManagerEvent {
/// A session for `slot_id` was started and registered.
Spawned { slot_id: String },
/// Forwarded from the session's `SessionEvent::StateChange`.
StateChange {
slot_id: String,
new_state: SessionState,
prev_state: SessionState,
},
/// Forwarded from the session's `SessionEvent::ConfirmRequired`;
/// `tool_info` carries the event's optional confirmation details.
ConfirmRequired {
slot_id: String,
prompt: String,
tool_info: Option<ConfirmInfo>,
},
/// Forwarded from the session's `SessionEvent::Exit` with its exit code.
Exited { slot_id: String, exit_code: i32 },
}
impl PTYManager {
pub fn new(logs_dir: PathBuf) -> Self {
if !logs_dir.exists() {
std::fs::create_dir_all(&logs_dir).ok();
}
let (event_tx, _) = broadcast::channel(1000);
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
agent_info: Arc::new(RwLock::new(HashMap::new())),
logs_dir,
auto_restart_slots: Arc::new(RwLock::new(std::collections::HashSet::new())),
permission_policy: Arc::new(RwLock::new(None)),
event_tx,
}
}
/// Installs `policy` as the permission policy, replacing any previous one.
///
/// The policy is read when a session is created, so it applies to sessions
/// started after this call.
pub async fn set_permission_policy(&self, policy: Arc<dyn PermissionPolicy>) {
    {
        let mut current = self.permission_policy.write().await;
        *current = Some(policy);
    }
    info!("Permission policy set");
}
/// Registers `slot` with the manager without starting a session.
///
/// Stores a fresh status record (state `Exited`, no pid, no start time,
/// no task) whose log file is `<logs_dir>/pty-<slot.id>.log`. An existing
/// record for the same slot id is overwritten.
pub async fn init_slot(&self, slot: &Slot) {
    let file_name = format!("pty-{}.log", slot.id);
    let record = PTYAgentInfo {
        slot_id: slot.id.clone(),
        role: slot.role.clone(),
        pid: None,
        state: SessionState::Exited,
        started_at: None,
        current_task_id: None,
        log_file: self.logs_dir.join(file_name),
    };

    let mut registry = self.agent_info.write().await;
    registry.insert(slot.id.clone(), record);
    drop(registry);

    debug!(slot_id = %slot.id, role = %slot.role, "PTY slot initialized");
}
pub async fn spawn(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
let info = {
let agent_info = self.agent_info.read().await;
agent_info
.get(&slot.id)
.cloned()
.ok_or_else(|| anyhow!("Slot not initialized: {}", slot.id))?
};
{
let sessions = self.sessions.read().await;
if let Some(session) = sessions.get(&slot.id) {
let session = session.read().await;
if session.is_running() {
return Err(anyhow!("PTY session already running: {}", slot.id));
}
}
}
if options.auto_restart {
self.auto_restart_slots.write().await.insert(slot.id.clone());
}
let cwd = slot.cwd.clone().unwrap_or_else(|| {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
});
let mut session = PTYSession::new(PTYSessionOptions {
slot_id: slot.id.clone(),
cwd,
env: None,
log_file: Some(info.log_file.clone()),
cols: 120,
rows: 30,
})?;
let policy = self.permission_policy.read().await.clone();
let slot_id = slot.id.clone();
let role = slot.role.clone();
if let Some(policy) = policy {
session
.set_permission_check(move |confirm_info: &ConfirmInfo| {
let tool_name = confirm_info
.tool
.as_ref()
.map(|t| t.name.as_str())
.unwrap_or("");
policy.check_permission(&slot_id, &role, tool_name)
})
.await;
}
let event_tx = self.event_tx.clone();
let slot_id_for_events = slot.id.clone();
let mut session_rx = session.subscribe();
tokio::spawn(async move {
while let Ok(event) = session_rx.recv().await {
match event {
SessionEvent::StateChange {
new_state,
prev_state,
} => {
let _ = event_tx.send(ManagerEvent::StateChange {
slot_id: slot_id_for_events.clone(),
new_state,
prev_state,
});
}
SessionEvent::ConfirmRequired { prompt, info } => {
let _ = event_tx.send(ManagerEvent::ConfirmRequired {
slot_id: slot_id_for_events.clone(),
prompt,
tool_info: info,
});
}
SessionEvent::Exit(code) => {
let _ = event_tx.send(ManagerEvent::Exited {
slot_id: slot_id_for_events.clone(),
exit_code: code,
});
break;
}
_ => {}
}
}
});
session.start().await?;
let pid = session.pid().await;
let state = session.state().await;
{
let mut agent_info = self.agent_info.write().await;
if let Some(info) = agent_info.get_mut(&slot.id) {
info.pid = pid;
info.state = state;
info.started_at = Some(Utc::now().timestamp_millis());
}
}
{
let mut sessions = self.sessions.write().await;
sessions.insert(slot.id.clone(), Arc::new(RwLock::new(session)));
}
info!(slot_id = %slot.id, pid = ?pid, "PTY session started");
let _ = self.event_tx.send(ManagerEvent::Spawned {
slot_id: slot.id.clone(),
});
let manager_sessions = Arc::clone(&self.sessions);
let manager_info = Arc::clone(&self.agent_info);
let manager_auto_restart = Arc::clone(&self.auto_restart_slots);
let manager_policy = Arc::clone(&self.permission_policy);
let manager_event_tx = self.event_tx.clone();
let slot_for_restart = slot.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let should_restart = {
let sessions = manager_sessions.read().await;
if let Some(session) = sessions.get(&slot_for_restart.id) {
let session = session.read().await;
!session.is_running()
} else {
false
}
};
if should_restart {
let auto_restart = manager_auto_restart
.read()
.await
.contains(&slot_for_restart.id);
if auto_restart {
info!(slot_id = %slot_for_restart.id, "Auto-restarting PTY session");
let cwd = slot_for_restart.cwd.clone().unwrap_or_else(|| {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
});
let log_file = {
let info = manager_info.read().await;
info.get(&slot_for_restart.id)
.map(|i| i.log_file.clone())
};
if let Ok(mut new_session) = PTYSession::new(PTYSessionOptions {
slot_id: slot_for_restart.id.clone(),
cwd,
env: None,
log_file,
cols: 120,
rows: 30,
}) {
let policy = manager_policy.read().await.clone();
let slot_id = slot_for_restart.id.clone();
let role = slot_for_restart.role.clone();
if let Some(policy) = policy {
new_session
.set_permission_check(move |confirm_info: &ConfirmInfo| {
let tool_name = confirm_info
.tool
.as_ref()
.map(|t| t.name.as_str())
.unwrap_or("");
policy.check_permission(&slot_id, &role, tool_name)
})
.await;
}
if new_session.start().await.is_ok() {
let pid = new_session.pid().await;
let state = new_session.state().await;
{
let mut info = manager_info.write().await;
if let Some(agent_info) = info.get_mut(&slot_for_restart.id) {
agent_info.pid = pid;
agent_info.state = state;
agent_info.started_at =
Some(Utc::now().timestamp_millis());
}
}
{
let mut sessions = manager_sessions.write().await;
sessions.insert(
slot_for_restart.id.clone(),
Arc::new(RwLock::new(new_session)),
);
}
let _ = manager_event_tx.send(ManagerEvent::Spawned {
slot_id: slot_for_restart.id.clone(),
});
info!(slot_id = %slot_for_restart.id, "Auto-restart successful");
} else {
error!(slot_id = %slot_for_restart.id, "Auto-restart failed");
}
}
}
break;
}
}
});
let agent_info = self.agent_info.read().await;
Ok(agent_info.get(&slot.id).cloned().unwrap())
}
/// Sends `message` to the slot's session and waits for its response.
///
/// The slot's status record is marked `Thinking` before the message is
/// sent. Once the session call returns, whether it succeeded or failed,
/// the record is refreshed with the state the session reports, so the slot
/// does not remain `Thinking` (and therefore unavailable) indefinitely.
///
/// `timeout_ms` is passed through to the session's `send`.
///
/// # Errors
///
/// Fails when no session is registered for `slot_id` or when the session's
/// `send` fails.
pub async fn send(
    &self,
    slot_id: &str,
    message: &str,
    timeout_ms: u64,
) -> Result<PTYExecuteResult> {
    let session = {
        let sessions = self.sessions.read().await;
        sessions
            .get(slot_id)
            .cloned()
            .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
    };
    {
        let mut agent_info = self.agent_info.write().await;
        if let Some(info) = agent_info.get_mut(slot_id) {
            info.state = SessionState::Thinking;
        }
    }

    let start = std::time::Instant::now();
    let outcome = {
        let session = session.read().await;
        let outcome = session.send(message, timeout_ms).await;

        // Sync the record while still holding the session read lock: a
        // concurrent `kill` needs the write lock to close the session, so
        // its `Exited` update is ordered after this one.
        let observed = session.state().await;
        if let Some(info) = self.agent_info.write().await.get_mut(slot_id) {
            info.state = observed;
        }
        outcome
    };
    let response = outcome?;
    let duration_ms = start.elapsed().as_millis() as u64;

    info!(
        slot_id = slot_id,
        message_len = message.len(),
        response_len = response.len(),
        duration_ms = duration_ms,
        "Message sent and response received"
    );
    Ok(PTYExecuteResult {
        response,
        duration_ms,
    })
}
pub async fn subscribe_session(
&self,
slot_id: &str,
) -> Result<broadcast::Receiver<SessionEvent>> {
let session = {
let sessions = self.sessions.read().await;
sessions
.get(slot_id)
.cloned()
.ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
};
let session = session.read().await;
Ok(session.subscribe())
}
/// Runs `prompt` on the slot's session as the task `task_id`.
///
/// The slot's status record carries `task_id` as its current task for the
/// duration of the call and is cleared afterwards, regardless of whether
/// the send succeeded. The message is sent with a 300 000 ms timeout.
///
/// # Errors
///
/// Propagates any error returned by `send`.
pub async fn execute_task(
    &self,
    slot: &Slot,
    task_id: &str,
    prompt: &str,
) -> Result<PTYExecuteResult> {
    if let Some(record) = self.agent_info.write().await.get_mut(&slot.id) {
        record.current_task_id = Some(task_id.to_string());
    }

    let outcome = self.send(&slot.id, prompt, 300_000).await;

    // Clear the marker before handing back the outcome, success or not.
    if let Some(record) = self.agent_info.write().await.get_mut(&slot.id) {
        record.current_task_id = None;
    }
    outcome
}
/// Answers the slot session's pending confirmation with `response`.
///
/// # Errors
///
/// Fails when no session is registered for `slot_id`, or when the
/// session's `confirm` fails.
pub async fn confirm(&self, slot_id: &str, response: ConfirmResponse) -> Result<()> {
    let target = self.sessions.read().await.get(slot_id).cloned();
    match target {
        Some(handle) => {
            let guard = handle.read().await;
            guard.confirm(response).await
        }
        None => Err(anyhow!("No PTY session for slot: {}", slot_id)),
    }
}
pub async fn write(&self, slot_id: &str, data: &str) -> Result<()> {
let session = {
let sessions = self.sessions.read().await;
sessions
.get(slot_id)
.cloned()
.ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
};
let session = session.read().await;
session.write(data).await
}
/// Calls `interrupt` on the slot's session.
///
/// # Errors
///
/// Fails when no session is registered for `slot_id`, or when the
/// session's `interrupt` fails.
pub async fn interrupt(&self, slot_id: &str) -> Result<()> {
    let lookup = self.sessions.read().await.get(slot_id).cloned();
    let handle = lookup.ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?;
    let guard = handle.read().await;
    guard.interrupt().await
}
/// Returns the slot session's current screen text.
///
/// # Errors
///
/// Fails when no session is registered for `slot_id`.
pub async fn get_screen(&self, slot_id: &str) -> Result<String> {
    let target = self.sessions.read().await.get(slot_id).cloned();
    match target {
        None => Err(anyhow!("No PTY session for slot: {}", slot_id)),
        Some(handle) => {
            let guard = handle.read().await;
            let screen = guard.get_screen_text().await;
            Ok(screen)
        }
    }
}
pub async fn get_last_lines(&self, slot_id: &str, n: usize) -> Result<Vec<String>> {
let session = {
let sessions = self.sessions.read().await;
sessions
.get(slot_id)
.cloned()
.ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
};
let session = session.read().await;
Ok(session.get_last_lines(n).await)
}
/// Returns the message history of the slot's session, or an empty list
/// when no session is registered for `slot_id`.
pub async fn get_history(&self, slot_id: &str) -> Vec<super::session::Message> {
    // Clone the handle and release the map lock before touching the
    // session, matching the other accessors. Holding the map lock while
    // waiting on a session lock could stall every writer of the session
    // map for as long as that session stays busy.
    let session = self.sessions.read().await.get(slot_id).cloned();
    match session {
        Some(session) => {
            let session = session.read().await;
            session.history().await
        }
        None => Vec::new(),
    }
}
/// Stops the slot's session and disables auto-restart for the slot.
///
/// The session is removed from the manager before it is closed, so the
/// slot's status record is reset (state `Exited`, no pid) even when
/// closing fails. Otherwise the record would keep describing a session
/// the manager no longer tracks. Killing a slot without a session is a
/// no-op apart from resetting the record.
///
/// # Errors
///
/// Returns the error reported by the session's `close`, after the status
/// record has been reset.
pub async fn kill(&self, slot_id: &str) -> Result<()> {
    // Clear the flag first so the exit is not treated as a crash to
    // recover from.
    self.auto_restart_slots.write().await.remove(slot_id);

    let session = {
        let mut sessions = self.sessions.write().await;
        sessions.remove(slot_id)
    };

    let close_error = match session {
        Some(session) => {
            let mut session = session.write().await;
            let outcome = session.close().await;
            outcome.err()
        }
        None => None,
    };

    {
        let mut agent_info = self.agent_info.write().await;
        if let Some(info) = agent_info.get_mut(slot_id) {
            info.state = SessionState::Exited;
            info.pid = None;
        }
    }

    if let Some(e) = close_error {
        return Err(e.into());
    }

    info!(slot_id = slot_id, "PTY session killed");
    Ok(())
}
/// Kills the slot's session, then spawns a new one with `options`.
///
/// # Errors
///
/// Returns the error from `kill` without attempting the spawn, or the
/// error from `spawn`.
pub async fn restart(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
    match self.kill(&slot.id).await {
        Ok(()) => self.spawn(slot, options).await,
        Err(err) => Err(err),
    }
}
/// Returns a copy of the slot's status record, or `None` when the slot
/// has not been initialized.
pub async fn get_status(&self, slot_id: &str) -> Option<PTYAgentInfo> {
    let registry = self.agent_info.read().await;
    registry.get(slot_id).cloned()
}
/// Returns copies of the status records of all initialized slots, in the
/// map's iteration order.
pub async fn get_all_status(&self) -> Vec<PTYAgentInfo> {
    let registry = self.agent_info.read().await;
    let mut records = Vec::with_capacity(registry.len());
    records.extend(registry.values().cloned());
    records
}
/// Returns `true` when the slot's recorded state is `Idle`; `false` for
/// any other state or when the slot has not been initialized.
pub async fn is_available(&self, slot_id: &str) -> bool {
    let registry = self.agent_info.read().await;
    match registry.get(slot_id) {
        Some(record) => record.state == SessionState::Idle,
        None => false,
    }
}
/// Returns `true` when a session is registered for `slot_id` and reports
/// that it is running.
pub async fn is_running(&self, slot_id: &str) -> bool {
    // Clone the handle and release the map lock before waiting on the
    // session lock, matching the other accessors. This keeps a busy
    // session from blocking writers of the session map.
    let session = self.sessions.read().await.get(slot_id).cloned();
    match session {
        Some(session) => session.read().await.is_running(),
        None => false,
    }
}
pub async fn get_stats(&self) -> ManagerStats {
let mut stats = ManagerStats::default();
let agent_info = self.agent_info.read().await;
stats.total = agent_info.len();
for info in agent_info.values() {
match info.state {
SessionState::Idle => {
stats.idle += 1;
stats.running += 1;
}
SessionState::Thinking
| SessionState::Responding
| SessionState::ToolRunning
| SessionState::Confirming => {
stats.busy += 1;
stats.running += 1;
}
SessionState::Starting => {
stats.running += 1;
}
SessionState::Exited | SessionState::Error => {
stats.stopped += 1;
}
}
}
stats
}
/// Returns a new receiver for the manager's event broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<ManagerEvent> {
    let sender = &self.event_tx;
    sender.subscribe()
}
/// Kills every registered session.
///
/// All auto-restart flags are cleared first. A failure to kill one
/// session is logged and does not stop the remaining sessions from being
/// killed.
pub async fn shutdown(&self) {
    info!("Shutting down all PTY sessions...");
    self.auto_restart_slots.write().await.clear();

    // Snapshot the ids so the map lock is not held while killing.
    let slot_ids: Vec<String> = self.sessions.read().await.keys().cloned().collect();

    for slot_id in &slot_ids {
        match self.kill(slot_id).await {
            Ok(()) => {}
            Err(e) => {
                error!(slot_id = %slot_id, error = %e, "Error killing PTY session");
            }
        }
    }

    info!("All PTY sessions shut down");
}
}
/// Counters produced by `PTYManager::get_stats`; all default to zero.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ManagerStats {
/// Number of initialized slots.
pub total: usize,
/// Slots whose recorded state is `Idle`, `Starting`, or one of the busy states.
pub running: usize,
/// Slots whose recorded state is `Idle`.
pub idle: usize,
/// Slots whose recorded state is `Thinking`, `Responding`, `ToolRunning`,
/// or `Confirming`.
pub busy: usize,
/// Slots whose recorded state is `Exited` or `Error`.
pub stopped: usize,
}