1use std::io::{Read, Write};
19use std::net::TcpStream;
20use std::path::PathBuf;
21use std::time::Duration;
22
23use serde::Deserialize;
24
25use crate::broker::BrokerState;
26use crate::config::AutoApproveConfig;
27use crate::error::PawError;
28
29use super::approve::{ApprovalRequest, KeyDispatcher, auto_approve_pane};
30use super::auto_approve::{is_safe_command, is_worktree_file_op};
31use super::permission_prompt::{PermissionType, detect_permission_prompt};
32use super::stall::detect_stalled_agents;
33
34#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum TickOutcome {
37 NoPrompt,
39 Approved {
41 matched_entry: String,
43 kind: PermissionType,
45 },
46 Forwarded {
49 kind: PermissionType,
51 },
52}
53
54pub trait PaneResolver {
59 fn pane_index_for(&self, agent_id: &str) -> Option<usize>;
62}
63
64impl<F> PaneResolver for F
65where
66 F: Fn(&str) -> Option<usize>,
67{
68 fn pane_index_for(&self, agent_id: &str) -> Option<usize> {
69 self(agent_id)
70 }
71}
72
73pub trait WorktreeResolver {
82 fn worktree_root_for(&self, agent_id: &str) -> Option<PathBuf>;
84}
85
86impl<F> WorktreeResolver for F
87where
88 F: Fn(&str) -> Option<PathBuf>,
89{
90 fn worktree_root_for(&self, agent_id: &str) -> Option<PathBuf> {
91 self(agent_id)
92 }
93}
94
95pub trait PaneInspector {
100 fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType>;
103 fn captured_text(&self, session: &str, pane_index: usize) -> String;
106}
107
108pub struct TmuxPaneInspector;
110
111impl PaneInspector for TmuxPaneInspector {
112 fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType> {
113 detect_permission_prompt(session, pane_index)
114 }
115 fn captured_text(&self, session: &str, pane_index: usize) -> String {
116 super::permission_prompt::capture_pane(session, pane_index).unwrap_or_default()
117 }
118}
119
120pub trait QuestionForwarder {
122 fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str);
127}
128
129pub struct PollContext<'a, R, I, D, Q, W>
134where
135 R: PaneResolver,
136 I: PaneInspector,
137 D: KeyDispatcher,
138 Q: QuestionForwarder,
139 W: WorktreeResolver,
140{
141 pub state: Option<&'a BrokerState>,
147 pub session: &'a str,
149 pub config: &'a AutoApproveConfig,
151 pub resolver: &'a R,
153 pub inspector: &'a I,
155 pub dispatcher: &'a mut D,
157 pub forwarder: &'a mut Q,
159 pub worktree_resolver: &'a W,
161 pub broker_url: Option<&'a str>,
163}
164
165pub fn poll_tick<R, I, D, Q, W>(
168 ctx: &mut PollContext<'_, R, I, D, Q, W>,
169) -> Vec<(String, TickOutcome)>
170where
171 R: PaneResolver,
172 I: PaneInspector,
173 D: KeyDispatcher,
174 Q: QuestionForwarder,
175 W: WorktreeResolver,
176{
177 let cfg = ctx.config.resolved();
178 if !cfg.enabled {
179 return Vec::new();
180 }
181 let Some(state) = ctx.state else {
182 return Vec::new();
183 };
184 let threshold = Duration::from_secs(cfg.stall_threshold_seconds);
185 let stalled = detect_stalled_agents(state, threshold);
186 let whitelist = cfg.effective_whitelist();
187 drive_outcomes(stalled, ctx, &cfg, &whitelist)
188}
189
190#[derive(Debug, Clone, Deserialize)]
193pub struct AgentStatusRow {
194 pub agent_id: String,
196 pub status: String,
198 pub last_seen_seconds: u64,
200}
201
202pub fn fetch_status_over_http(broker_url: &str) -> Result<Vec<AgentStatusRow>, PawError> {
208 let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
209 let socket_addr = if let Ok(a) = addr.parse() {
210 a
211 } else {
212 use std::net::ToSocketAddrs;
213 addr.to_socket_addrs()
214 .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
215 .next()
216 .ok_or_else(|| {
217 PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
218 })?
219 };
220
221 let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
222 .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
223 stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
224 stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
225
226 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
227 stream
228 .write_all(request.as_bytes())
229 .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;
230
231 let mut response = String::new();
232 let _ = stream.read_to_string(&mut response);
233
234 let body_start = response
236 .find("\r\n\r\n")
237 .map(|i| i + 4)
238 .ok_or_else(|| PawError::SessionError("malformed broker response".to_string()))?;
239 let body = &response[body_start..];
240
241 let parsed: StatusResponse = serde_json::from_str(body)
242 .map_err(|e| PawError::SessionError(format!("broker /status parse error: {e}")))?;
243 Ok(parsed.agents)
244}
245
246#[derive(Deserialize)]
247struct StatusResponse {
248 agents: Vec<AgentStatusRow>,
249}
250
251#[must_use]
258pub fn stalled_from_status(rows: &[AgentStatusRow], threshold_seconds: u64) -> Vec<String> {
259 rows.iter()
260 .filter(|r| !super::stall::TERMINAL_STATUSES.contains(&r.status.as_str()))
261 .filter(|r| r.last_seen_seconds >= threshold_seconds)
262 .map(|r| r.agent_id.clone())
263 .collect()
264}
265
266pub fn tick_from_status<R, I, D, Q, W>(
273 rows: &[AgentStatusRow],
274 ctx: &mut PollContext<'_, R, I, D, Q, W>,
275) -> Vec<(String, TickOutcome)>
276where
277 R: PaneResolver,
278 I: PaneInspector,
279 D: KeyDispatcher,
280 Q: QuestionForwarder,
281 W: WorktreeResolver,
282{
283 let cfg = ctx.config.resolved();
284 if !cfg.enabled {
285 return Vec::new();
286 }
287 let stalled = stalled_from_status(rows, cfg.stall_threshold_seconds);
288 let whitelist = cfg.effective_whitelist();
289 drive_outcomes(stalled, ctx, &cfg, &whitelist)
290}
291
292fn drive_outcomes<R, I, D, Q, W>(
293 stalled: Vec<String>,
294 ctx: &mut PollContext<'_, R, I, D, Q, W>,
295 cfg: &AutoApproveConfig,
296 whitelist: &[String],
297) -> Vec<(String, TickOutcome)>
298where
299 R: PaneResolver,
300 I: PaneInspector,
301 D: KeyDispatcher,
302 Q: QuestionForwarder,
303 W: WorktreeResolver,
304{
305 let mut out = Vec::with_capacity(stalled.len());
306 for agent_id in stalled {
307 let Some(pane_index) = ctx.resolver.pane_index_for(&agent_id) else {
308 continue;
309 };
310 let Some(kind) = ctx.inspector.inspect(ctx.session, pane_index) else {
311 out.push((agent_id, TickOutcome::NoPrompt));
312 continue;
313 };
314 let captured = ctx.inspector.captured_text(ctx.session, pane_index);
315 if let Some(entry) = first_whitelist_match(&captured, whitelist) {
318 let req = ApprovalRequest {
319 enabled: cfg.enabled,
320 session: ctx.session,
321 pane_index,
322 agent_id: &agent_id,
323 kind,
324 matched_entry: Some(entry.as_str()),
325 broker_url: ctx.broker_url,
326 };
327 match auto_approve_pane(ctx.dispatcher, req) {
328 Ok(true) => out.push((
329 agent_id,
330 TickOutcome::Approved {
331 matched_entry: entry,
332 kind,
333 },
334 )),
335 _ => out.push((agent_id, TickOutcome::Forwarded { kind })),
336 }
337 continue;
338 }
339
340 if let Some(root) = ctx.worktree_resolver.worktree_root_for(&agent_id)
344 && is_worktree_file_op(&captured, &root, cfg.approve_worktree_writes())
345 {
346 let req = ApprovalRequest {
347 enabled: cfg.enabled,
348 session: ctx.session,
349 pane_index,
350 agent_id: &agent_id,
351 kind: PermissionType::WorktreeFileOp,
352 matched_entry: Some("worktree-file-op"),
353 broker_url: ctx.broker_url,
354 };
355 match auto_approve_pane(ctx.dispatcher, req) {
356 Ok(true) => out.push((
357 agent_id,
358 TickOutcome::Approved {
359 matched_entry: "worktree-file-op".to_string(),
360 kind: PermissionType::WorktreeFileOp,
361 },
362 )),
363 _ => out.push((
364 agent_id,
365 TickOutcome::Forwarded {
366 kind: PermissionType::WorktreeFileOp,
367 },
368 )),
369 }
370 continue;
371 }
372
373 ctx.forwarder.forward_question(&agent_id, kind, &captured);
374 out.push((agent_id, TickOutcome::Forwarded { kind }));
375 }
376 out
377}
378
379fn first_whitelist_match(captured: &str, whitelist: &[String]) -> Option<String> {
380 for line in captured.lines() {
384 for entry in whitelist {
385 if is_safe_command(line, std::slice::from_ref(entry)) {
386 return Some(entry.clone());
387 }
388 }
389 }
390 None
391}
392
393#[cfg(test)]
394mod tests {
395 use super::*;
396 use crate::broker::messages::{BrokerMessage, StatusPayload};
397 use crate::broker::{AgentRecord, BrokerState};
398 use crate::config::AutoApproveConfig;
399 use std::cell::RefCell;
400 use std::time::Instant;
401
402 struct StubInspector {
403 kind: Option<PermissionType>,
404 captured: String,
405 }
406 impl PaneInspector for StubInspector {
407 fn inspect(&self, _session: &str, _pane_index: usize) -> Option<PermissionType> {
408 self.kind
409 }
410 fn captured_text(&self, _session: &str, _pane_index: usize) -> String {
411 self.captured.clone()
412 }
413 }
414
415 struct RecordingDispatcher {
416 events: Vec<(String, usize, String)>,
417 }
418 impl KeyDispatcher for RecordingDispatcher {
419 fn send_key(&mut self, session: &str, pane_index: usize, key: &str) -> std::io::Result<()> {
420 self.events
421 .push((session.to_string(), pane_index, key.to_string()));
422 Ok(())
423 }
424 }
425
426 #[derive(Default)]
427 struct RecordingForwarder {
428 forwards: RefCell<Vec<(String, PermissionType, String)>>,
429 }
430 impl QuestionForwarder for RecordingForwarder {
431 fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str) {
432 self.forwards
433 .borrow_mut()
434 .push((agent_id.to_string(), kind, captured.to_string()));
435 }
436 }
437
438 fn insert_stalled(state: &BrokerState, id: &str, age_secs: u64) {
439 let mut inner = state.write();
440 inner.agents.insert(
441 id.to_string(),
442 AgentRecord {
443 agent_id: id.to_string(),
444 status: "working".to_string(),
445 last_seen: Instant::now()
446 .checked_sub(Duration::from_secs(age_secs))
447 .unwrap_or_else(Instant::now),
448 last_message: Some(BrokerMessage::Status {
449 agent_id: id.to_string(),
450 payload: StatusPayload {
451 status: "working".to_string(),
452 modified_files: Vec::new(),
453 message: None,
454 ..Default::default()
455 },
456 }),
457 last_committed_at: None,
458 },
459 );
460 }
461
462 fn run_tick<R: PaneResolver, I: PaneInspector>(
463 state: &BrokerState,
464 cfg: &AutoApproveConfig,
465 resolver: &R,
466 inspector: &I,
467 ) -> (
468 Vec<(String, TickOutcome)>,
469 RecordingDispatcher,
470 RecordingForwarder,
471 ) {
472 let no_worktree = |_id: &str| None::<PathBuf>;
474 let mut dispatcher = RecordingDispatcher { events: vec![] };
475 let mut forwarder = RecordingForwarder::default();
476 let out = {
477 let mut ctx = PollContext {
478 state: Some(state),
479 session: "paw-x",
480 config: cfg,
481 resolver,
482 inspector,
483 dispatcher: &mut dispatcher,
484 forwarder: &mut forwarder,
485 worktree_resolver: &no_worktree,
486 broker_url: None,
487 };
488 poll_tick(&mut ctx)
489 };
490 (out, dispatcher, forwarder)
491 }
492
493 #[test]
494 fn disabled_config_returns_empty() {
495 let state = BrokerState::new(None);
496 insert_stalled(&state, "stuck", 600);
497 let cfg = AutoApproveConfig {
498 enabled: false,
499 ..AutoApproveConfig::default()
500 };
501 let resolver = |_id: &str| Some(1);
502 let inspector = StubInspector {
503 kind: Some(PermissionType::Cargo),
504 captured: "cargo test".into(),
505 };
506 let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
507 assert!(out.is_empty());
508 assert!(dispatcher.events.is_empty());
509 }
510
511 #[test]
512 fn stalled_safe_agent_is_approved() {
513 let state = BrokerState::new(None);
514 insert_stalled(&state, "agent-a", 600);
515 let cfg = AutoApproveConfig::default();
516 let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
517 let inspector = StubInspector {
518 kind: Some(PermissionType::Cargo),
519 captured: "cargo test --workspace".into(),
520 };
521 let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
522 assert_eq!(out.len(), 1);
523 let (id, outcome) = &out[0];
524 assert_eq!(id, "agent-a");
525 match outcome {
526 TickOutcome::Approved {
527 matched_entry,
528 kind,
529 } => {
530 assert_eq!(matched_entry, "cargo test");
531 assert_eq!(*kind, PermissionType::Cargo);
532 }
533 _ => panic!("expected Approved, got {outcome:?}"),
534 }
535 let keys: Vec<&str> = dispatcher
537 .events
538 .iter()
539 .map(|(_, _, k)| k.as_str())
540 .collect();
541 assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
542 assert!(forwarder.forwards.borrow().is_empty());
543 }
544
545 #[test]
546 fn stalled_unsafe_agent_is_forwarded_not_approved() {
547 let state = BrokerState::new(None);
548 insert_stalled(&state, "agent-b", 600);
549 let cfg = AutoApproveConfig::default();
550 let resolver = |_id: &str| Some(3);
551 let inspector = StubInspector {
552 kind: Some(PermissionType::Unknown),
553 captured: "rm -rf /tmp/foo\nrequires approval".into(),
554 };
555 let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
556 assert_eq!(out.len(), 1);
557 match &out[0].1 {
558 TickOutcome::Forwarded { kind } => assert_eq!(*kind, PermissionType::Unknown),
559 other => panic!("expected Forwarded, got {other:?}"),
560 }
561 assert!(
562 dispatcher.events.is_empty(),
563 "no keystrokes for unsafe prompt"
564 );
565 let forwards = forwarder.forwards.borrow();
566 assert_eq!(forwards.len(), 1);
567 assert_eq!(forwards[0].0, "agent-b");
568 }
569
570 #[test]
571 fn fresh_agent_is_skipped() {
572 let state = BrokerState::new(None);
573 insert_stalled(&state, "fresh", 0); let cfg = AutoApproveConfig::default();
575 let resolver = |_id: &str| Some(1);
576 let inspector = StubInspector {
577 kind: Some(PermissionType::Cargo),
578 captured: "cargo test".into(),
579 };
580 let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
581 assert!(out.is_empty(), "fresh agent must not be polled");
582 assert!(dispatcher.events.is_empty());
583 }
584
585 #[test]
586 fn no_marker_means_no_prompt_outcome() {
587 let state = BrokerState::new(None);
588 insert_stalled(&state, "agent-c", 600);
589 let cfg = AutoApproveConfig::default();
590 let resolver = |_id: &str| Some(1);
591 let inspector = StubInspector {
592 kind: None,
593 captured: String::new(),
594 };
595 let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
596 assert_eq!(out.len(), 1);
597 assert_eq!(out[0].1, TickOutcome::NoPrompt);
598 assert!(dispatcher.events.is_empty());
599 }
600
601 fn row(agent_id: &str, status: &str, last_seen_seconds: u64) -> AgentStatusRow {
604 AgentStatusRow {
605 agent_id: agent_id.to_string(),
606 status: status.to_string(),
607 last_seen_seconds,
608 }
609 }
610
611 #[test]
612 fn stalled_from_status_filters_by_threshold() {
613 let rows = vec![
614 row("fresh", "working", 5),
615 row("stale", "working", 60),
616 row("ancient", "working", 600),
617 ];
618 let stalled = stalled_from_status(&rows, 30);
619 assert!(stalled.contains(&"stale".to_string()));
620 assert!(stalled.contains(&"ancient".to_string()));
621 assert!(!stalled.contains(&"fresh".to_string()));
622 }
623
624 #[test]
625 fn stalled_from_status_skips_terminal() {
626 let rows = vec![
627 row("a", "done", 600),
628 row("b", "verified", 600),
629 row("c", "blocked", 600),
630 row("d", "committed", 600),
631 row("e", "working", 600),
632 ];
633 let stalled = stalled_from_status(&rows, 30);
634 assert_eq!(stalled, vec!["e".to_string()]);
635 }
636
637 #[test]
638 fn tick_from_status_dispatches_safe_prompt() {
639 let rows = vec![row("agent-a", "working", 300)];
640 let cfg = AutoApproveConfig::default();
641 let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
642 let inspector = StubInspector {
643 kind: Some(PermissionType::Cargo),
644 captured: "cargo test --workspace".into(),
645 };
646 let no_worktree = |_id: &str| None::<PathBuf>;
647 let mut dispatcher = RecordingDispatcher { events: vec![] };
648 let mut forwarder = RecordingForwarder::default();
649 let out = {
650 let mut ctx = PollContext {
651 state: None,
652 session: "paw-x",
653 config: &cfg,
654 resolver: &resolver,
655 inspector: &inspector,
656 dispatcher: &mut dispatcher,
657 forwarder: &mut forwarder,
658 worktree_resolver: &no_worktree,
659 broker_url: None,
660 };
661 tick_from_status(&rows, &mut ctx)
662 };
663 assert_eq!(out.len(), 1);
664 let keys: Vec<&str> = dispatcher
665 .events
666 .iter()
667 .map(|(_, _, k)| k.as_str())
668 .collect();
669 assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
670 }
671
672 fn run_tick_with_worktree<R, I, Wt>(
675 state: &BrokerState,
676 cfg: &AutoApproveConfig,
677 resolver: &R,
678 inspector: &I,
679 worktree_resolver: &Wt,
680 ) -> (
681 Vec<(String, TickOutcome)>,
682 RecordingDispatcher,
683 RecordingForwarder,
684 )
685 where
686 R: PaneResolver,
687 I: PaneInspector,
688 Wt: WorktreeResolver,
689 {
690 let mut dispatcher = RecordingDispatcher { events: vec![] };
691 let mut forwarder = RecordingForwarder::default();
692 let out = {
693 let mut ctx = PollContext {
694 state: Some(state),
695 session: "paw-x",
696 config: cfg,
697 resolver,
698 inspector,
699 dispatcher: &mut dispatcher,
700 forwarder: &mut forwarder,
701 worktree_resolver,
702 broker_url: None,
703 };
704 poll_tick(&mut ctx)
705 };
706 (out, dispatcher, forwarder)
707 }
708
709 #[test]
710 fn in_worktree_file_prompt_is_auto_approved() {
711 let tmp = tempfile::tempdir().unwrap();
712 let root = tmp.path().to_path_buf();
713 let state = BrokerState::new(None);
714 insert_stalled(&state, "agent-a", 600);
715 let cfg = AutoApproveConfig::default();
716 let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
717 let inspector = StubInspector {
720 kind: Some(PermissionType::Unknown),
721 captured: "Do you want to allow this write to Containerfile?".into(),
722 };
723 let worktree = move |id: &str| {
724 if id == "agent-a" {
725 Some(root.clone())
726 } else {
727 None
728 }
729 };
730 let (out, dispatcher, forwarder) =
731 run_tick_with_worktree(&state, &cfg, &resolver, &inspector, &worktree);
732 assert_eq!(out.len(), 1);
733 match &out[0].1 {
734 TickOutcome::Approved {
735 matched_entry,
736 kind,
737 } => {
738 assert_eq!(matched_entry, "worktree-file-op");
739 assert_eq!(*kind, PermissionType::WorktreeFileOp);
740 }
741 other => panic!("expected Approved worktree-file-op, got {other:?}"),
742 }
743 let keys: Vec<&str> = dispatcher
744 .events
745 .iter()
746 .map(|(_, _, k)| k.as_str())
747 .collect();
748 assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
749 assert!(forwarder.forwards.borrow().is_empty());
750 }
751
752 #[test]
753 fn out_of_worktree_file_prompt_is_forwarded() {
754 let tmp = tempfile::tempdir().unwrap();
755 let root = tmp.path().to_path_buf();
756 let state = BrokerState::new(None);
757 insert_stalled(&state, "agent-b", 600);
758 let cfg = AutoApproveConfig::default();
759 let resolver = |_id: &str| Some(3);
760 let inspector = StubInspector {
761 kind: Some(PermissionType::Unknown),
762 captured: "Do you want to allow this write to /etc/hosts?".into(),
763 };
764 let worktree = move |_id: &str| Some(root.clone());
765 let (out, dispatcher, forwarder) =
766 run_tick_with_worktree(&state, &cfg, &resolver, &inspector, &worktree);
767 assert_eq!(out.len(), 1);
768 assert!(matches!(out[0].1, TickOutcome::Forwarded { .. }));
769 assert!(
770 dispatcher.events.is_empty(),
771 "out-of-worktree prompt must not dispatch keystrokes"
772 );
773 assert_eq!(forwarder.forwards.borrow().len(), 1);
774 }
775
776 #[test]
777 fn disabled_worktree_writes_forwards_file_prompt() {
778 let tmp = tempfile::tempdir().unwrap();
779 let root = tmp.path().to_path_buf();
780 let state = BrokerState::new(None);
781 insert_stalled(&state, "agent-c", 600);
782 let cfg = AutoApproveConfig {
783 approve_worktree_writes: Some(false),
784 ..AutoApproveConfig::default()
785 };
786 let resolver = |_id: &str| Some(1);
787 let inspector = StubInspector {
788 kind: Some(PermissionType::Unknown),
789 captured: "Do you want to allow this write to Containerfile?".into(),
790 };
791 let worktree = move |_id: &str| Some(root.clone());
792 let (out, dispatcher, _forwarder) =
793 run_tick_with_worktree(&state, &cfg, &resolver, &inspector, &worktree);
794 assert_eq!(out.len(), 1);
795 assert!(
796 matches!(out[0].1, TickOutcome::Forwarded { .. }),
797 "approve_worktree_writes=false must forward, got {:?}",
798 out[0].1
799 );
800 assert!(dispatcher.events.is_empty());
801 }
802}