1use std::io::{Read, Write};
19use std::net::TcpStream;
20use std::time::Duration;
21
22use serde::Deserialize;
23
24use crate::broker::BrokerState;
25use crate::config::AutoApproveConfig;
26use crate::error::PawError;
27
28use super::approve::{ApprovalRequest, KeyDispatcher, auto_approve_pane};
29use super::auto_approve::is_safe_command;
30use super::permission_prompt::{PermissionType, detect_permission_prompt};
31use super::stall::detect_stalled_agents;
32
/// What happened to a single stalled agent during one poll tick.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TickOutcome {
    /// The pane inspector found no permission prompt in the agent's pane.
    NoPrompt,
    /// A prompt was detected, the captured pane text matched a whitelist
    /// entry, and the approval attempt reported success.
    Approved {
        /// The whitelist entry that matched the captured pane text.
        matched_entry: String,
        /// The kind of permission prompt the inspector detected.
        kind: PermissionType,
    },
    /// A prompt was detected but it was not auto-approved.
    Forwarded {
        /// The kind of permission prompt the inspector detected.
        kind: PermissionType,
    },
}
52
/// Maps an agent id to a pane index.
pub trait PaneResolver {
    /// Returns the pane index for `agent_id`, or `None` when no pane is known
    /// for that agent (such agents are skipped by the tick functions).
    fn pane_index_for(&self, agent_id: &str) -> Option<usize>;
}
62
/// Blanket impl so any `Fn(&str) -> Option<usize>` (closure or function) can
/// be used directly as a [`PaneResolver`].
impl<F> PaneResolver for F
where
    F: Fn(&str) -> Option<usize>,
{
    fn pane_index_for(&self, agent_id: &str) -> Option<usize> {
        // Delegate straight to the wrapped callable.
        self(agent_id)
    }
}
71
/// Read-only view of a pane, used by the poll loop to look for permission
/// prompts and to obtain the text that is matched against the whitelist.
pub trait PaneInspector {
    /// Returns the kind of permission prompt found in the pane, or `None`
    /// when no prompt is detected.
    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType>;
    /// Returns the text captured from the pane.
    fn captured_text(&self, session: &str, pane_index: usize) -> String;
}
84
/// [`PaneInspector`] backed by the helpers in the sibling `permission_prompt`
/// module (`detect_permission_prompt` and `capture_pane`).
pub struct TmuxPaneInspector;
87
impl PaneInspector for TmuxPaneInspector {
    /// Delegates to `detect_permission_prompt` for the given session and pane.
    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType> {
        detect_permission_prompt(session, pane_index)
    }
    /// Captures the pane text via `capture_pane`; when that yields no value the
    /// result falls back to an empty string instead of surfacing a failure.
    fn captured_text(&self, session: &str, pane_index: usize) -> String {
        super::permission_prompt::capture_pane(session, pane_index).unwrap_or_default()
    }
}
96
/// Sink for permission prompts that the poll loop does not auto-approve.
pub trait QuestionForwarder {
    /// Receives the agent id, the detected prompt kind and the captured pane
    /// text of a prompt that needs to be handled elsewhere.
    fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str);
}
105
/// Everything a poll tick needs, bundled so the tick functions take a single
/// argument. The collaborators are generic so tests can substitute stubs.
pub struct PollContext<'a, R, I, D, Q>
where
    R: PaneResolver,
    I: PaneInspector,
    D: KeyDispatcher,
    Q: QuestionForwarder,
{
    /// In-process broker state. `poll_tick` returns no outcomes when this is
    /// `None`; `tick_from_status` does not read it.
    pub state: Option<&'a BrokerState>,
    /// Session name handed to the inspector and to each approval request.
    pub session: &'a str,
    /// Auto-approve configuration; resolved at the start of every tick.
    pub config: &'a AutoApproveConfig,
    /// Maps agent ids to pane indices.
    pub resolver: &'a R,
    /// Detects prompts and captures pane text.
    pub inspector: &'a I,
    /// Passed to `auto_approve_pane` when a prompt matches the whitelist.
    pub dispatcher: &'a mut D,
    /// Receives prompts that are not auto-approved.
    pub forwarder: &'a mut Q,
    /// Optional broker URL copied into every `ApprovalRequest`.
    pub broker_url: Option<&'a str>,
}
138
139pub fn poll_tick<R, I, D, Q>(ctx: &mut PollContext<'_, R, I, D, Q>) -> Vec<(String, TickOutcome)>
142where
143 R: PaneResolver,
144 I: PaneInspector,
145 D: KeyDispatcher,
146 Q: QuestionForwarder,
147{
148 let cfg = ctx.config.resolved();
149 if !cfg.enabled {
150 return Vec::new();
151 }
152 let Some(state) = ctx.state else {
153 return Vec::new();
154 };
155 let threshold = Duration::from_secs(cfg.stall_threshold_seconds);
156 let stalled = detect_stalled_agents(state, threshold);
157 let whitelist = cfg.effective_whitelist();
158 drive_outcomes(stalled, ctx, &cfg, &whitelist)
159}
160
/// One agent entry as deserialized from the broker's `/status` JSON response.
#[derive(Debug, Clone, Deserialize)]
pub struct AgentStatusRow {
    /// Identifier of the agent.
    pub agent_id: String,
    /// Status label; rows whose status is listed in
    /// `stall::TERMINAL_STATUSES` are never treated as stalled.
    pub status: String,
    /// Compared against the stall threshold in `stalled_from_status`.
    /// Presumably the number of seconds since the broker last heard from the
    /// agent — confirm against the broker's `/status` handler.
    pub last_seen_seconds: u64,
}
172
173pub fn fetch_status_over_http(broker_url: &str) -> Result<Vec<AgentStatusRow>, PawError> {
179 let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
180 let socket_addr = if let Ok(a) = addr.parse() {
181 a
182 } else {
183 use std::net::ToSocketAddrs;
184 addr.to_socket_addrs()
185 .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
186 .next()
187 .ok_or_else(|| {
188 PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
189 })?
190 };
191
192 let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
193 .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
194 stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
195 stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
196
197 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
198 stream
199 .write_all(request.as_bytes())
200 .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;
201
202 let mut response = String::new();
203 let _ = stream.read_to_string(&mut response);
204
205 let body_start = response
207 .find("\r\n\r\n")
208 .map(|i| i + 4)
209 .ok_or_else(|| PawError::SessionError("malformed broker response".to_string()))?;
210 let body = &response[body_start..];
211
212 let parsed: StatusResponse = serde_json::from_str(body)
213 .map_err(|e| PawError::SessionError(format!("broker /status parse error: {e}")))?;
214 Ok(parsed.agents)
215}
216
/// Top-level shape of the broker's `/status` JSON body; only the `agents`
/// array is read.
#[derive(Deserialize)]
struct StatusResponse {
    /// One row per agent known to the broker.
    agents: Vec<AgentStatusRow>,
}
221
222#[must_use]
229pub fn stalled_from_status(rows: &[AgentStatusRow], threshold_seconds: u64) -> Vec<String> {
230 rows.iter()
231 .filter(|r| !super::stall::TERMINAL_STATUSES.contains(&r.status.as_str()))
232 .filter(|r| r.last_seen_seconds >= threshold_seconds)
233 .map(|r| r.agent_id.clone())
234 .collect()
235}
236
237pub fn tick_from_status<R, I, D, Q>(
244 rows: &[AgentStatusRow],
245 ctx: &mut PollContext<'_, R, I, D, Q>,
246) -> Vec<(String, TickOutcome)>
247where
248 R: PaneResolver,
249 I: PaneInspector,
250 D: KeyDispatcher,
251 Q: QuestionForwarder,
252{
253 let cfg = ctx.config.resolved();
254 if !cfg.enabled {
255 return Vec::new();
256 }
257 let stalled = stalled_from_status(rows, cfg.stall_threshold_seconds);
258 let whitelist = cfg.effective_whitelist();
259 drive_outcomes(stalled, ctx, &cfg, &whitelist)
260}
261
262fn drive_outcomes<R, I, D, Q>(
263 stalled: Vec<String>,
264 ctx: &mut PollContext<'_, R, I, D, Q>,
265 cfg: &AutoApproveConfig,
266 whitelist: &[String],
267) -> Vec<(String, TickOutcome)>
268where
269 R: PaneResolver,
270 I: PaneInspector,
271 D: KeyDispatcher,
272 Q: QuestionForwarder,
273{
274 let mut out = Vec::with_capacity(stalled.len());
275 for agent_id in stalled {
276 let Some(pane_index) = ctx.resolver.pane_index_for(&agent_id) else {
277 continue;
278 };
279 let Some(kind) = ctx.inspector.inspect(ctx.session, pane_index) else {
280 out.push((agent_id, TickOutcome::NoPrompt));
281 continue;
282 };
283 let captured = ctx.inspector.captured_text(ctx.session, pane_index);
284 let matched = first_whitelist_match(&captured, whitelist);
285 if let Some(entry) = matched {
286 let req = ApprovalRequest {
287 enabled: cfg.enabled,
288 session: ctx.session,
289 pane_index,
290 agent_id: &agent_id,
291 kind,
292 matched_entry: Some(entry.as_str()),
293 broker_url: ctx.broker_url,
294 };
295 match auto_approve_pane(ctx.dispatcher, req) {
296 Ok(true) => out.push((
297 agent_id,
298 TickOutcome::Approved {
299 matched_entry: entry,
300 kind,
301 },
302 )),
303 _ => out.push((agent_id, TickOutcome::Forwarded { kind })),
304 }
305 } else {
306 ctx.forwarder.forward_question(&agent_id, kind, &captured);
307 out.push((agent_id, TickOutcome::Forwarded { kind }));
308 }
309 }
310 out
311}
312
313fn first_whitelist_match(captured: &str, whitelist: &[String]) -> Option<String> {
314 for line in captured.lines() {
318 for entry in whitelist {
319 if is_safe_command(line, std::slice::from_ref(entry)) {
320 return Some(entry.clone());
321 }
322 }
323 }
324 None
325}
326
// Unit tests for the tick functions. All collaborators are replaced by
// in-memory stubs so no terminal multiplexer or broker process is needed.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::messages::{BrokerMessage, StatusPayload};
    use crate::broker::{AgentRecord, BrokerState};
    use crate::config::AutoApproveConfig;
    use std::cell::RefCell;
    use std::time::Instant;

    /// Inspector that returns a fixed prompt kind and fixed captured text for
    /// every pane.
    struct StubInspector {
        kind: Option<PermissionType>,
        captured: String,
    }
    impl PaneInspector for StubInspector {
        fn inspect(&self, _session: &str, _pane_index: usize) -> Option<PermissionType> {
            self.kind
        }
        fn captured_text(&self, _session: &str, _pane_index: usize) -> String {
            self.captured.clone()
        }
    }

    /// Dispatcher that records every `(session, pane_index, key)` it is asked
    /// to send and always reports success.
    struct RecordingDispatcher {
        events: Vec<(String, usize, String)>,
    }
    impl KeyDispatcher for RecordingDispatcher {
        fn send_key(&mut self, session: &str, pane_index: usize, key: &str) -> std::io::Result<()> {
            self.events
                .push((session.to_string(), pane_index, key.to_string()));
            Ok(())
        }
    }

    /// Forwarder that records every `(agent_id, kind, captured)` it receives.
    #[derive(Default)]
    struct RecordingForwarder {
        forwards: RefCell<Vec<(String, PermissionType, String)>>,
    }
    impl QuestionForwarder for RecordingForwarder {
        fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str) {
            self.forwards
                .borrow_mut()
                .push((agent_id.to_string(), kind, captured.to_string()));
        }
    }

    /// Inserts a "working" agent whose `last_seen` lies `age_secs` in the past.
    fn insert_stalled(state: &BrokerState, id: &str, age_secs: u64) {
        let mut inner = state.write();
        inner.agents.insert(
            id.to_string(),
            AgentRecord {
                agent_id: id.to_string(),
                status: "working".to_string(),
                // `checked_sub` yields `None` if the subtraction cannot be
                // represented; fall back to "now" in that case.
                last_seen: Instant::now()
                    .checked_sub(Duration::from_secs(age_secs))
                    .unwrap_or_else(Instant::now),
                last_message: Some(BrokerMessage::Status {
                    agent_id: id.to_string(),
                    payload: StatusPayload {
                        status: "working".to_string(),
                        modified_files: Vec::new(),
                        message: None,
                        ..Default::default()
                    },
                }),
            },
        );
    }

    /// Runs `poll_tick` with fresh recording doubles and returns the outcomes
    /// together with the doubles so tests can inspect what was sent/forwarded.
    fn run_tick<R: PaneResolver, I: PaneInspector>(
        state: &BrokerState,
        cfg: &AutoApproveConfig,
        resolver: &R,
        inspector: &I,
    ) -> (
        Vec<(String, TickOutcome)>,
        RecordingDispatcher,
        RecordingForwarder,
    ) {
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        // Scope the context so its mutable borrows end before the doubles are
        // moved into the returned tuple.
        let out = {
            let mut ctx = PollContext {
                state: Some(state),
                session: "paw-x",
                config: cfg,
                resolver,
                inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            poll_tick(&mut ctx)
        };
        (out, dispatcher, forwarder)
    }

    // With auto-approve disabled nothing is inspected and no keys are sent,
    // even though a stalled agent with a matching prompt exists.
    #[test]
    fn disabled_config_returns_empty() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "stuck", 600);
        let cfg = AutoApproveConfig {
            enabled: false,
            ..AutoApproveConfig::default()
        };
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty());
        assert!(dispatcher.events.is_empty());
    }

    // A stalled agent showing a whitelisted command is approved: the outcome
    // names the matched entry, keys are dispatched, and nothing is forwarded.
    #[test]
    fn stalled_safe_agent_is_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-a", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        let (id, outcome) = &out[0];
        assert_eq!(id, "agent-a");
        match outcome {
            TickOutcome::Approved {
                matched_entry,
                kind,
            } => {
                assert_eq!(matched_entry, "cargo test");
                assert_eq!(*kind, PermissionType::Cargo);
            }
            _ => panic!("expected Approved, got {outcome:?}"),
        }
        // Key sequence expected from `auto_approve_pane` for an approval.
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
        assert!(forwarder.forwards.borrow().is_empty());
    }

    // A prompt whose text matches no whitelist entry is forwarded exactly once
    // and never receives keystrokes.
    #[test]
    fn stalled_unsafe_agent_is_forwarded_not_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-b", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(3);
        let inspector = StubInspector {
            kind: Some(PermissionType::Unknown),
            captured: "rm -rf /tmp/foo\nrequires approval".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        match &out[0].1 {
            TickOutcome::Forwarded { kind } => assert_eq!(*kind, PermissionType::Unknown),
            other => panic!("expected Forwarded, got {other:?}"),
        }
        assert!(
            dispatcher.events.is_empty(),
            "no keystrokes for unsafe prompt"
        );
        let forwards = forwarder.forwards.borrow();
        assert_eq!(forwards.len(), 1);
        assert_eq!(forwards[0].0, "agent-b");
    }

    // An agent seen just now is below the default stall threshold, so it is
    // not examined at all.
    #[test]
    fn fresh_agent_is_skipped() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "fresh", 0);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "fresh agent must not be polled");
        assert!(dispatcher.events.is_empty());
    }

    // A stalled agent without a detectable prompt still yields an outcome
    // (`NoPrompt`) but triggers no keystrokes.
    #[test]
    fn no_marker_means_no_prompt_outcome() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-c", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: None,
            captured: String::new(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].1, TickOutcome::NoPrompt);
        assert!(dispatcher.events.is_empty());
    }

    /// Shorthand for building an `AgentStatusRow` in the tests below.
    fn row(agent_id: &str, status: &str, last_seen_seconds: u64) -> AgentStatusRow {
        AgentStatusRow {
            agent_id: agent_id.to_string(),
            status: status.to_string(),
            last_seen_seconds,
        }
    }

    // Only rows at or above the threshold are reported as stalled.
    #[test]
    fn stalled_from_status_filters_by_threshold() {
        let rows = vec![
            row("fresh", "working", 5),
            row("stale", "working", 60),
            row("ancient", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert!(stalled.contains(&"stale".to_string()));
        assert!(stalled.contains(&"ancient".to_string()));
        assert!(!stalled.contains(&"fresh".to_string()));
    }

    // Rows in a terminal status are ignored regardless of their age.
    #[test]
    fn stalled_from_status_skips_terminal() {
        let rows = vec![
            row("a", "done", 600),
            row("b", "verified", 600),
            row("c", "blocked", 600),
            row("d", "committed", 600),
            row("e", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert_eq!(stalled, vec!["e".to_string()]);
    }

    // The status-driven entry point works without in-process broker state
    // (`state: None`) and dispatches the same approval keys as `poll_tick`.
    #[test]
    fn tick_from_status_dispatches_safe_prompt() {
        let rows = vec![row("agent-a", "working", 300)];
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        let out = {
            let mut ctx = PollContext {
                state: None,
                session: "paw-x",
                config: &cfg,
                resolver: &resolver,
                inspector: &inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            tick_from_status(&rows, &mut ctx)
        };
        assert_eq!(out.len(), 1);
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
    }
}
599}