1use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::{anyhow, Result};
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14use tokio::sync::{broadcast, RwLock};
15use tracing::{debug, error, info};
16
17use super::session::{
18 ConfirmInfo, ConfirmResponse, PTYSession, PTYSessionOptions, PermissionDecision, SessionEvent,
19 SessionState,
20};
21
/// Status record the manager keeps for a single agent slot.
///
/// Serialized with camelCase field names; optional fields that are `None`
/// are left out of the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PTYAgentInfo {
    /// Identifier of the slot this record describes.
    pub slot_id: String,
    /// Role copied from the slot when it was initialized.
    pub role: String,
    /// Process id reported by the session after it started; `None` when no
    /// session has been started or after the session was killed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    /// Session state as last recorded by the manager.
    pub state: SessionState,
    /// Start time as a UTC Unix timestamp in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub started_at: Option<i64>,
    /// Task currently being executed through `execute_task`, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_task_id: Option<String>,
    /// Log file handed to the session (`pty-<slot_id>.log` in the logs directory).
    pub log_file: PathBuf,
}
39
/// Options accepted by `PTYManager::spawn` and `PTYManager::restart`.
#[derive(Debug, Clone, Default)]
pub struct PTYSpawnOptions {
    /// When `true`, the slot is registered for auto-restart: the manager
    /// relaunches the session after it detects that it stopped running.
    /// Defaults to `false`.
    pub auto_restart: bool,
}
46
/// Outcome of sending a message to a PTY session.
#[derive(Debug, Clone, Serialize)]
pub struct PTYExecuteResult {
    /// Response text returned by the session for the message.
    pub response: String,
    /// Wall-clock time spent waiting for the response, in milliseconds.
    pub duration_ms: u64,
}
53
/// Description of an agent slot that a PTY session can be attached to.
#[derive(Debug, Clone)]
pub struct Slot {
    /// Unique slot identifier; used as the key for sessions and status records.
    pub id: String,
    /// Role of the agent in this slot; passed to the permission policy.
    pub role: String,
    /// Working directory for the session. When `None`, the process's current
    /// directory is used, falling back to `/` if that cannot be determined.
    pub cwd: Option<PathBuf>,
}
61
/// Decides how a session should answer a tool confirmation request.
///
/// Implementations must be `Send + Sync` because the policy is shared with
/// sessions through an `Arc`.
pub trait PermissionPolicy: Send + Sync {
    /// Returns the decision for `tool_name` requested by the agent in
    /// `slot_id` acting as `role`.
    ///
    /// `tool_name` is an empty string when the confirmation carries no tool
    /// information.
    fn check_permission(&self, slot_id: &str, role: &str, tool_name: &str) -> PermissionDecision;
}
66
/// Owns the PTY sessions of all agent slots and tracks their status.
///
/// All mutable state lives behind `Arc<RwLock<..>>` so it can be shared with
/// the background tasks started by `spawn`.
pub struct PTYManager {
    /// Sessions keyed by slot id; each session sits behind its own lock.
    sessions: Arc<RwLock<HashMap<String, Arc<RwLock<PTYSession>>>>>,
    /// Status record per slot, created by `init_slot`.
    agent_info: Arc<RwLock<HashMap<String, PTYAgentInfo>>>,
    /// Directory that holds the per-slot log files.
    logs_dir: PathBuf,
    /// Ids of slots whose session should be relaunched after it stops.
    auto_restart_slots: Arc<RwLock<std::collections::HashSet<String>>>,
    /// Policy installed on sessions when they are created; `None` until
    /// `set_permission_policy` is called.
    permission_policy: Arc<RwLock<Option<Arc<dyn PermissionPolicy>>>>,
    /// Broadcast channel on which manager-level events are published.
    event_tx: broadcast::Sender<ManagerEvent>,
}
87
/// Events published on the manager's broadcast channel (see `PTYManager::subscribe`).
#[derive(Debug, Clone)]
pub enum ManagerEvent {
    /// A session was started and registered for the slot.
    Spawned { slot_id: String },
    /// Forwarded from the session's `SessionEvent::StateChange`.
    StateChange {
        /// Slot whose session changed state.
        slot_id: String,
        /// State the session moved to.
        new_state: SessionState,
        /// State the session was in before.
        prev_state: SessionState,
    },
    /// Forwarded from the session's `SessionEvent::ConfirmRequired`.
    ConfirmRequired {
        /// Slot whose session is asking for confirmation.
        slot_id: String,
        /// Prompt text reported by the session.
        prompt: String,
        /// Details of the confirmation, when the session provided them.
        tool_info: Option<ConfirmInfo>,
    },
    /// Forwarded from the session's `SessionEvent::Exit`, carrying its exit code.
    Exited { slot_id: String, exit_code: i32 },
}
108
109impl PTYManager {
110 pub fn new(logs_dir: PathBuf) -> Self {
112 if !logs_dir.exists() {
114 std::fs::create_dir_all(&logs_dir).ok();
115 }
116
117 let (event_tx, _) = broadcast::channel(1000);
118
119 Self {
120 sessions: Arc::new(RwLock::new(HashMap::new())),
121 agent_info: Arc::new(RwLock::new(HashMap::new())),
122 logs_dir,
123 auto_restart_slots: Arc::new(RwLock::new(std::collections::HashSet::new())),
124 permission_policy: Arc::new(RwLock::new(None)),
125 event_tx,
126 }
127 }
128
129 pub async fn set_permission_policy(&self, policy: Arc<dyn PermissionPolicy>) {
131 *self.permission_policy.write().await = Some(policy);
132 info!("Permission policy set");
133 }
134
135 pub async fn init_slot(&self, slot: &Slot) {
137 let log_file = self.logs_dir.join(format!("pty-{}.log", slot.id));
138
139 let info = PTYAgentInfo {
140 slot_id: slot.id.clone(),
141 role: slot.role.clone(),
142 pid: None,
143 state: SessionState::Exited,
144 started_at: None,
145 current_task_id: None,
146 log_file,
147 };
148
149 self.agent_info.write().await.insert(slot.id.clone(), info);
150 debug!(slot_id = %slot.id, role = %slot.role, "PTY slot initialized");
151 }
152
153 pub async fn spawn(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
155 let info = {
156 let agent_info = self.agent_info.read().await;
157 agent_info
158 .get(&slot.id)
159 .cloned()
160 .ok_or_else(|| anyhow!("Slot not initialized: {}", slot.id))?
161 };
162
163 {
165 let sessions = self.sessions.read().await;
166 if let Some(session) = sessions.get(&slot.id) {
167 let session = session.read().await;
168 if session.is_running() {
169 return Err(anyhow!("PTY session already running: {}", slot.id));
170 }
171 }
172 }
173
174 if options.auto_restart {
176 self.auto_restart_slots.write().await.insert(slot.id.clone());
177 }
178
179 let cwd = slot.cwd.clone().unwrap_or_else(|| {
181 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
182 });
183
184 let mut session = PTYSession::new(PTYSessionOptions {
185 slot_id: slot.id.clone(),
186 cwd,
187 env: None,
188 log_file: Some(info.log_file.clone()),
189 cols: 120,
190 rows: 30,
191 })?;
192
193 let policy = self.permission_policy.read().await.clone();
195 let slot_id = slot.id.clone();
196 let role = slot.role.clone();
197 if let Some(policy) = policy {
198 session
199 .set_permission_check(move |confirm_info: &ConfirmInfo| {
200 let tool_name = confirm_info
201 .tool
202 .as_ref()
203 .map(|t| t.name.as_str())
204 .unwrap_or("");
205 policy.check_permission(&slot_id, &role, tool_name)
206 })
207 .await;
208 }
209
210 let event_tx = self.event_tx.clone();
212 let slot_id_for_events = slot.id.clone();
213 let mut session_rx = session.subscribe();
214
215 tokio::spawn(async move {
216 while let Ok(event) = session_rx.recv().await {
217 match event {
218 SessionEvent::StateChange {
219 new_state,
220 prev_state,
221 } => {
222 let _ = event_tx.send(ManagerEvent::StateChange {
223 slot_id: slot_id_for_events.clone(),
224 new_state,
225 prev_state,
226 });
227 }
228 SessionEvent::ConfirmRequired { prompt, info } => {
229 let _ = event_tx.send(ManagerEvent::ConfirmRequired {
230 slot_id: slot_id_for_events.clone(),
231 prompt,
232 tool_info: info,
233 });
234 }
235 SessionEvent::Exit(code) => {
236 let _ = event_tx.send(ManagerEvent::Exited {
237 slot_id: slot_id_for_events.clone(),
238 exit_code: code,
239 });
240 break;
241 }
242 _ => {}
243 }
244 }
245 });
246
247 session.start().await?;
249
250 let pid = session.pid().await;
251 let state = session.state().await;
252
253 {
255 let mut agent_info = self.agent_info.write().await;
256 if let Some(info) = agent_info.get_mut(&slot.id) {
257 info.pid = pid;
258 info.state = state;
259 info.started_at = Some(Utc::now().timestamp_millis());
260 }
261 }
262
263 {
265 let mut sessions = self.sessions.write().await;
266 sessions.insert(slot.id.clone(), Arc::new(RwLock::new(session)));
267 }
268
269 info!(slot_id = %slot.id, pid = ?pid, "PTY session started");
270 let _ = self.event_tx.send(ManagerEvent::Spawned {
271 slot_id: slot.id.clone(),
272 });
273
274 let manager_sessions = Arc::clone(&self.sessions);
276 let manager_info = Arc::clone(&self.agent_info);
277 let manager_auto_restart = Arc::clone(&self.auto_restart_slots);
278 let manager_policy = Arc::clone(&self.permission_policy);
279 let manager_event_tx = self.event_tx.clone();
280 let slot_for_restart = slot.clone();
281
282 tokio::spawn(async move {
283 let mut interval = tokio::time::interval(Duration::from_secs(1));
285 loop {
286 interval.tick().await;
287
288 let should_restart = {
289 let sessions = manager_sessions.read().await;
290 if let Some(session) = sessions.get(&slot_for_restart.id) {
291 let session = session.read().await;
292 !session.is_running()
293 } else {
294 false
295 }
296 };
297
298 if should_restart {
299 let auto_restart = manager_auto_restart
301 .read()
302 .await
303 .contains(&slot_for_restart.id);
304
305 if auto_restart {
306 info!(slot_id = %slot_for_restart.id, "Auto-restarting PTY session");
307
308 let cwd = slot_for_restart.cwd.clone().unwrap_or_else(|| {
310 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
311 });
312
313 let log_file = {
314 let info = manager_info.read().await;
315 info.get(&slot_for_restart.id)
316 .map(|i| i.log_file.clone())
317 };
318
319 if let Ok(mut new_session) = PTYSession::new(PTYSessionOptions {
320 slot_id: slot_for_restart.id.clone(),
321 cwd,
322 env: None,
323 log_file,
324 cols: 120,
325 rows: 30,
326 }) {
327 let policy = manager_policy.read().await.clone();
329 let slot_id = slot_for_restart.id.clone();
330 let role = slot_for_restart.role.clone();
331 if let Some(policy) = policy {
332 new_session
333 .set_permission_check(move |confirm_info: &ConfirmInfo| {
334 let tool_name = confirm_info
335 .tool
336 .as_ref()
337 .map(|t| t.name.as_str())
338 .unwrap_or("");
339 policy.check_permission(&slot_id, &role, tool_name)
340 })
341 .await;
342 }
343
344 if new_session.start().await.is_ok() {
345 let pid = new_session.pid().await;
346 let state = new_session.state().await;
347
348 {
350 let mut info = manager_info.write().await;
351 if let Some(agent_info) = info.get_mut(&slot_for_restart.id) {
352 agent_info.pid = pid;
353 agent_info.state = state;
354 agent_info.started_at =
355 Some(Utc::now().timestamp_millis());
356 }
357 }
358
359 {
361 let mut sessions = manager_sessions.write().await;
362 sessions.insert(
363 slot_for_restart.id.clone(),
364 Arc::new(RwLock::new(new_session)),
365 );
366 }
367
368 let _ = manager_event_tx.send(ManagerEvent::Spawned {
369 slot_id: slot_for_restart.id.clone(),
370 });
371
372 info!(slot_id = %slot_for_restart.id, "Auto-restart successful");
373 } else {
374 error!(slot_id = %slot_for_restart.id, "Auto-restart failed");
375 }
376 }
377 }
378
379 break;
380 }
381 }
382 });
383
384 let agent_info = self.agent_info.read().await;
386 Ok(agent_info.get(&slot.id).cloned().unwrap())
387 }
388
389 pub async fn send(
391 &self,
392 slot_id: &str,
393 message: &str,
394 timeout_ms: u64,
395 ) -> Result<PTYExecuteResult> {
396 let session = {
397 let sessions = self.sessions.read().await;
398 sessions
399 .get(slot_id)
400 .cloned()
401 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
402 };
403
404 {
406 let mut agent_info = self.agent_info.write().await;
407 if let Some(info) = agent_info.get_mut(slot_id) {
408 info.state = SessionState::Thinking;
409 }
410 }
411
412 let start = std::time::Instant::now();
413
414 let response = {
415 let session = session.read().await;
416 session.send(message, timeout_ms).await?
417 };
418
419 let duration_ms = start.elapsed().as_millis() as u64;
420
421 info!(
422 slot_id = slot_id,
423 message_len = message.len(),
424 response_len = response.len(),
425 duration_ms = duration_ms,
426 "Message sent and response received"
427 );
428
429 Ok(PTYExecuteResult {
430 response,
431 duration_ms,
432 })
433 }
434
435 pub async fn subscribe_session(
437 &self,
438 slot_id: &str,
439 ) -> Result<broadcast::Receiver<SessionEvent>> {
440 let session = {
441 let sessions = self.sessions.read().await;
442 sessions
443 .get(slot_id)
444 .cloned()
445 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
446 };
447
448 let session = session.read().await;
449 Ok(session.subscribe())
450 }
451
452 pub async fn execute_task(
454 &self,
455 slot: &Slot,
456 task_id: &str,
457 prompt: &str,
458 ) -> Result<PTYExecuteResult> {
459 {
461 let mut agent_info = self.agent_info.write().await;
462 if let Some(info) = agent_info.get_mut(&slot.id) {
463 info.current_task_id = Some(task_id.to_string());
464 }
465 }
466
467 let result = self.send(&slot.id, prompt, 300_000).await;
468
469 {
471 let mut agent_info = self.agent_info.write().await;
472 if let Some(info) = agent_info.get_mut(&slot.id) {
473 info.current_task_id = None;
474 }
475 }
476
477 result
478 }
479
480 pub async fn confirm(&self, slot_id: &str, response: ConfirmResponse) -> Result<()> {
482 let session = {
483 let sessions = self.sessions.read().await;
484 sessions
485 .get(slot_id)
486 .cloned()
487 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
488 };
489
490 let session = session.read().await;
491 session.confirm(response).await
492 }
493
494 pub async fn write(&self, slot_id: &str, data: &str) -> Result<()> {
496 let session = {
497 let sessions = self.sessions.read().await;
498 sessions
499 .get(slot_id)
500 .cloned()
501 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
502 };
503
504 let session = session.read().await;
505 session.write(data).await
506 }
507
508 pub async fn interrupt(&self, slot_id: &str) -> Result<()> {
510 let session = {
511 let sessions = self.sessions.read().await;
512 sessions
513 .get(slot_id)
514 .cloned()
515 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
516 };
517
518 let session = session.read().await;
519 session.interrupt().await
520 }
521
522 pub async fn get_screen(&self, slot_id: &str) -> Result<String> {
524 let session = {
525 let sessions = self.sessions.read().await;
526 sessions
527 .get(slot_id)
528 .cloned()
529 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
530 };
531
532 let session = session.read().await;
533 Ok(session.get_screen_text().await)
534 }
535
536 pub async fn get_last_lines(&self, slot_id: &str, n: usize) -> Result<Vec<String>> {
538 let session = {
539 let sessions = self.sessions.read().await;
540 sessions
541 .get(slot_id)
542 .cloned()
543 .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
544 };
545
546 let session = session.read().await;
547 Ok(session.get_last_lines(n).await)
548 }
549
550 pub async fn get_history(&self, slot_id: &str) -> Vec<super::session::Message> {
552 let sessions = self.sessions.read().await;
553 if let Some(session) = sessions.get(slot_id) {
554 let session = session.read().await;
555 session.history().await
556 } else {
557 Vec::new()
558 }
559 }
560
561 pub async fn kill(&self, slot_id: &str) -> Result<()> {
563 self.auto_restart_slots.write().await.remove(slot_id);
565
566 let session = {
568 let mut sessions = self.sessions.write().await;
569 sessions.remove(slot_id)
570 };
571
572 if let Some(session) = session {
573 let mut session = session.write().await;
574 session.close().await?;
575 }
576
577 {
579 let mut agent_info = self.agent_info.write().await;
580 if let Some(info) = agent_info.get_mut(slot_id) {
581 info.state = SessionState::Exited;
582 info.pid = None;
583 }
584 }
585
586 info!(slot_id = slot_id, "PTY session killed");
587 Ok(())
588 }
589
590 pub async fn restart(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
592 self.kill(&slot.id).await?;
593 self.spawn(slot, options).await
594 }
595
596 pub async fn get_status(&self, slot_id: &str) -> Option<PTYAgentInfo> {
598 self.agent_info.read().await.get(slot_id).cloned()
599 }
600
601 pub async fn get_all_status(&self) -> Vec<PTYAgentInfo> {
603 self.agent_info.read().await.values().cloned().collect()
604 }
605
606 pub async fn is_available(&self, slot_id: &str) -> bool {
608 if let Some(info) = self.agent_info.read().await.get(slot_id) {
609 info.state == SessionState::Idle
610 } else {
611 false
612 }
613 }
614
615 pub async fn is_running(&self, slot_id: &str) -> bool {
617 let sessions = self.sessions.read().await;
618 if let Some(session) = sessions.get(slot_id) {
619 let session = session.read().await;
620 session.is_running()
621 } else {
622 false
623 }
624 }
625
626 pub async fn get_stats(&self) -> ManagerStats {
628 let mut stats = ManagerStats::default();
629
630 let agent_info = self.agent_info.read().await;
631 stats.total = agent_info.len();
632
633 for info in agent_info.values() {
634 match info.state {
635 SessionState::Idle => {
636 stats.idle += 1;
637 stats.running += 1;
638 }
639 SessionState::Thinking
640 | SessionState::Responding
641 | SessionState::ToolRunning
642 | SessionState::Confirming => {
643 stats.busy += 1;
644 stats.running += 1;
645 }
646 SessionState::Starting => {
647 stats.running += 1;
648 }
649 SessionState::Exited | SessionState::Error => {
650 stats.stopped += 1;
651 }
652 }
653 }
654
655 stats
656 }
657
658 pub fn subscribe(&self) -> broadcast::Receiver<ManagerEvent> {
660 self.event_tx.subscribe()
661 }
662
663 pub async fn shutdown(&self) {
665 info!("Shutting down all PTY sessions...");
666
667 self.auto_restart_slots.write().await.clear();
669
670 let slot_ids: Vec<String> = {
672 let sessions = self.sessions.read().await;
673 sessions.keys().cloned().collect()
674 };
675
676 for slot_id in slot_ids {
678 if let Err(e) = self.kill(&slot_id).await {
679 error!(slot_id = %slot_id, error = %e, "Error killing PTY session");
680 }
681 }
682
683 info!("All PTY sessions shut down");
684 }
685}
686
/// Counters derived from the recorded slot states by `PTYManager::get_stats`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ManagerStats {
    /// Number of initialized slots.
    pub total: usize,
    /// Slots whose state is idle, busy or `Starting`.
    pub running: usize,
    /// Slots whose state is `Idle`.
    pub idle: usize,
    /// Slots whose state is `Thinking`, `Responding`, `ToolRunning` or `Confirming`.
    pub busy: usize,
    /// Slots whose state is `Exited` or `Error`.
    pub stopped: usize,
}