1use std::io::{Read, Write};
19use std::net::TcpStream;
20use std::time::Duration;
21
22use serde::Deserialize;
23
24use crate::broker::BrokerState;
25use crate::config::AutoApproveConfig;
26use crate::error::PawError;
27
28use super::approve::{ApprovalRequest, KeyDispatcher, auto_approve_pane};
29use super::auto_approve::is_safe_command;
30use super::permission_prompt::{PermissionType, detect_permission_prompt};
31use super::stall::detect_stalled_agents;
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum TickOutcome {
36 NoPrompt,
38 Approved {
40 matched_entry: String,
42 kind: PermissionType,
44 },
45 Forwarded {
48 kind: PermissionType,
50 },
51}
52
/// Maps an agent id to the index of the pane that agent runs in.
pub trait PaneResolver {
    /// Returns the pane index for `agent_id`, or `None` when the agent has no
    /// known pane.
    fn pane_index_for(&self, agent_id: &str) -> Option<usize>;
}

/// Any `Fn(&str) -> Option<usize>` (closures included) can act as a resolver.
impl<Lookup> PaneResolver for Lookup
where
    Lookup: Fn(&str) -> Option<usize>,
{
    fn pane_index_for(&self, agent_id: &str) -> Option<usize> {
        // Delegate straight to the wrapped callable.
        (self)(agent_id)
    }
}
71
72pub trait PaneInspector {
77 fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType>;
80 fn captured_text(&self, session: &str, pane_index: usize) -> String;
83}
84
85pub struct TmuxPaneInspector;
87
88impl PaneInspector for TmuxPaneInspector {
89 fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType> {
90 detect_permission_prompt(session, pane_index)
91 }
92 fn captured_text(&self, session: &str, pane_index: usize) -> String {
93 super::permission_prompt::capture_pane(session, pane_index).unwrap_or_default()
94 }
95}
96
97pub trait QuestionForwarder {
99 fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str);
104}
105
106pub struct PollContext<'a, R, I, D, Q>
111where
112 R: PaneResolver,
113 I: PaneInspector,
114 D: KeyDispatcher,
115 Q: QuestionForwarder,
116{
117 pub state: Option<&'a BrokerState>,
123 pub session: &'a str,
125 pub config: &'a AutoApproveConfig,
127 pub resolver: &'a R,
129 pub inspector: &'a I,
131 pub dispatcher: &'a mut D,
133 pub forwarder: &'a mut Q,
135 pub broker_url: Option<&'a str>,
137}
138
139pub fn poll_tick<R, I, D, Q>(ctx: &mut PollContext<'_, R, I, D, Q>) -> Vec<(String, TickOutcome)>
142where
143 R: PaneResolver,
144 I: PaneInspector,
145 D: KeyDispatcher,
146 Q: QuestionForwarder,
147{
148 let cfg = ctx.config.resolved();
149 if !cfg.enabled {
150 return Vec::new();
151 }
152 let Some(state) = ctx.state else {
153 return Vec::new();
154 };
155 let threshold = Duration::from_secs(cfg.stall_threshold_seconds);
156 let stalled = detect_stalled_agents(state, threshold);
157 let whitelist = cfg.effective_whitelist();
158 drive_outcomes(stalled, ctx, &cfg, &whitelist)
159}
160
161#[derive(Debug, Clone, Deserialize)]
164pub struct AgentStatusRow {
165 pub agent_id: String,
167 pub status: String,
169 pub last_seen_seconds: u64,
171}
172
173pub fn fetch_status_over_http(broker_url: &str) -> Result<Vec<AgentStatusRow>, PawError> {
179 let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
180 let socket_addr = if let Ok(a) = addr.parse() {
181 a
182 } else {
183 use std::net::ToSocketAddrs;
184 addr.to_socket_addrs()
185 .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
186 .next()
187 .ok_or_else(|| {
188 PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
189 })?
190 };
191
192 let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
193 .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
194 stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
195 stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
196
197 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
198 stream
199 .write_all(request.as_bytes())
200 .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;
201
202 let mut response = String::new();
203 let _ = stream.read_to_string(&mut response);
204
205 let body_start = response
207 .find("\r\n\r\n")
208 .map(|i| i + 4)
209 .ok_or_else(|| PawError::SessionError("malformed broker response".to_string()))?;
210 let body = &response[body_start..];
211
212 let parsed: StatusResponse = serde_json::from_str(body)
213 .map_err(|e| PawError::SessionError(format!("broker /status parse error: {e}")))?;
214 Ok(parsed.agents)
215}
216
217#[derive(Deserialize)]
218struct StatusResponse {
219 agents: Vec<AgentStatusRow>,
220}
221
222#[must_use]
229pub fn stalled_from_status(rows: &[AgentStatusRow], threshold_seconds: u64) -> Vec<String> {
230 rows.iter()
231 .filter(|r| !super::stall::TERMINAL_STATUSES.contains(&r.status.as_str()))
232 .filter(|r| r.last_seen_seconds >= threshold_seconds)
233 .map(|r| r.agent_id.clone())
234 .collect()
235}
236
237pub fn tick_from_status<R, I, D, Q>(
244 rows: &[AgentStatusRow],
245 ctx: &mut PollContext<'_, R, I, D, Q>,
246) -> Vec<(String, TickOutcome)>
247where
248 R: PaneResolver,
249 I: PaneInspector,
250 D: KeyDispatcher,
251 Q: QuestionForwarder,
252{
253 let cfg = ctx.config.resolved();
254 if !cfg.enabled {
255 return Vec::new();
256 }
257 let stalled = stalled_from_status(rows, cfg.stall_threshold_seconds);
258 let whitelist = cfg.effective_whitelist();
259 drive_outcomes(stalled, ctx, &cfg, &whitelist)
260}
261
262fn drive_outcomes<R, I, D, Q>(
263 stalled: Vec<String>,
264 ctx: &mut PollContext<'_, R, I, D, Q>,
265 cfg: &AutoApproveConfig,
266 whitelist: &[String],
267) -> Vec<(String, TickOutcome)>
268where
269 R: PaneResolver,
270 I: PaneInspector,
271 D: KeyDispatcher,
272 Q: QuestionForwarder,
273{
274 let mut out = Vec::with_capacity(stalled.len());
275 for agent_id in stalled {
276 let Some(pane_index) = ctx.resolver.pane_index_for(&agent_id) else {
277 continue;
278 };
279 let Some(kind) = ctx.inspector.inspect(ctx.session, pane_index) else {
280 out.push((agent_id, TickOutcome::NoPrompt));
281 continue;
282 };
283 let captured = ctx.inspector.captured_text(ctx.session, pane_index);
284 let matched = first_whitelist_match(&captured, whitelist);
285 if let Some(entry) = matched {
286 let req = ApprovalRequest {
287 enabled: cfg.enabled,
288 session: ctx.session,
289 pane_index,
290 agent_id: &agent_id,
291 kind,
292 matched_entry: Some(entry.as_str()),
293 broker_url: ctx.broker_url,
294 };
295 match auto_approve_pane(ctx.dispatcher, req) {
296 Ok(true) => out.push((
297 agent_id,
298 TickOutcome::Approved {
299 matched_entry: entry,
300 kind,
301 },
302 )),
303 _ => out.push((agent_id, TickOutcome::Forwarded { kind })),
304 }
305 } else {
306 ctx.forwarder.forward_question(&agent_id, kind, &captured);
307 out.push((agent_id, TickOutcome::Forwarded { kind }));
308 }
309 }
310 out
311}
312
313fn first_whitelist_match(captured: &str, whitelist: &[String]) -> Option<String> {
314 for line in captured.lines() {
318 for entry in whitelist {
319 if is_safe_command(line, std::slice::from_ref(entry)) {
320 return Some(entry.clone());
321 }
322 }
323 }
324 None
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::messages::{BrokerMessage, StatusPayload};
    use crate::broker::{AgentRecord, BrokerState};
    use crate::config::AutoApproveConfig;
    use std::time::Instant;

    /// Inspector that answers every pane with the same canned prompt and text.
    struct StubInspector {
        kind: Option<PermissionType>,
        captured: String,
    }

    impl StubInspector {
        fn new(kind: Option<PermissionType>, captured: &str) -> Self {
            Self {
                kind,
                captured: captured.to_string(),
            }
        }
    }

    impl PaneInspector for StubInspector {
        fn inspect(&self, _session: &str, _pane_index: usize) -> Option<PermissionType> {
            self.kind
        }
        fn captured_text(&self, _session: &str, _pane_index: usize) -> String {
            self.captured.clone()
        }
    }

    /// Dispatcher that records `(session, pane, key)` instead of sending keys.
    struct RecordingDispatcher {
        events: Vec<(String, usize, String)>,
    }

    impl KeyDispatcher for RecordingDispatcher {
        fn send_key(&mut self, session: &str, pane_index: usize, key: &str) -> std::io::Result<()> {
            self.events
                .push((session.to_string(), pane_index, key.to_string()));
            Ok(())
        }
    }

    /// Forwarder that records every forwarded question.
    ///
    /// `forward_question` takes `&mut self`, so a plain `Vec` is enough; no
    /// interior mutability is needed.
    #[derive(Default)]
    struct RecordingForwarder {
        forwards: Vec<(String, PermissionType, String)>,
    }

    impl QuestionForwarder for RecordingForwarder {
        fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str) {
            self.forwards
                .push((agent_id.to_string(), kind, captured.to_string()));
        }
    }

    /// Registers a `"working"` agent whose last activity was `age_secs` ago.
    fn insert_stalled(state: &BrokerState, id: &str, age_secs: u64) {
        let mut inner = state.write();
        inner.agents.insert(
            id.to_string(),
            AgentRecord {
                agent_id: id.to_string(),
                status: "working".to_string(),
                // Fall back to "now" if the clock cannot go back that far.
                last_seen: Instant::now()
                    .checked_sub(Duration::from_secs(age_secs))
                    .unwrap_or_else(Instant::now),
                last_message: Some(BrokerMessage::Status {
                    agent_id: id.to_string(),
                    payload: StatusPayload {
                        status: "working".to_string(),
                        modified_files: Vec::new(),
                        message: None,
                    },
                }),
            },
        );
    }

    /// The keys the dispatcher was asked to send, in order.
    fn sent_keys(dispatcher: &RecordingDispatcher) -> Vec<&str> {
        dispatcher
            .events
            .iter()
            .map(|(_, _, key)| key.as_str())
            .collect()
    }

    /// Runs `poll_tick` once with fresh recorders and returns the outcomes
    /// together with what the recorders saw.
    fn run_tick<R: PaneResolver, I: PaneInspector>(
        state: Option<&BrokerState>,
        cfg: &AutoApproveConfig,
        resolver: &R,
        inspector: &I,
    ) -> (
        Vec<(String, TickOutcome)>,
        RecordingDispatcher,
        RecordingForwarder,
    ) {
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        let out = {
            let mut ctx = PollContext {
                state,
                session: "paw-x",
                config: cfg,
                resolver,
                inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            poll_tick(&mut ctx)
        };
        (out, dispatcher, forwarder)
    }

    #[test]
    fn disabled_config_returns_empty() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "stuck", 600);
        let cfg = AutoApproveConfig {
            enabled: false,
            ..AutoApproveConfig::default()
        };
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test");
        let (out, dispatcher, _) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert!(out.is_empty());
        assert!(dispatcher.events.is_empty());
    }

    #[test]
    fn missing_state_returns_empty() {
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test");
        let (out, dispatcher, forwarder) = run_tick(None, &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "no broker state means nothing to poll");
        assert!(dispatcher.events.is_empty());
        assert!(forwarder.forwards.is_empty());
    }

    #[test]
    fn stalled_safe_agent_is_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-a", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test --workspace");
        let (out, dispatcher, forwarder) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        let (id, outcome) = &out[0];
        assert_eq!(id, "agent-a");
        match outcome {
            TickOutcome::Approved {
                matched_entry,
                kind,
            } => {
                assert_eq!(matched_entry, "cargo test");
                assert_eq!(*kind, PermissionType::Cargo);
            }
            _ => panic!("expected Approved, got {outcome:?}"),
        }
        assert_eq!(sent_keys(&dispatcher), vec!["BTab", "Down", "Enter"]);
        assert!(forwarder.forwards.is_empty());
    }

    #[test]
    fn stalled_unsafe_agent_is_forwarded_not_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-b", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(3);
        let inspector = StubInspector::new(
            Some(PermissionType::Unknown),
            "rm -rf /tmp/foo\nrequires approval",
        );
        let (out, dispatcher, forwarder) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        match &out[0].1 {
            TickOutcome::Forwarded { kind } => assert_eq!(*kind, PermissionType::Unknown),
            other => panic!("expected Forwarded, got {other:?}"),
        }
        assert!(
            dispatcher.events.is_empty(),
            "no keystrokes for unsafe prompt"
        );
        assert_eq!(forwarder.forwards.len(), 1);
        assert_eq!(forwarder.forwards[0].0, "agent-b");
    }

    #[test]
    fn fresh_agent_is_skipped() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "fresh", 0);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test");
        let (out, dispatcher, _) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "fresh agent must not be polled");
        assert!(dispatcher.events.is_empty());
    }

    #[test]
    fn unresolved_pane_is_skipped() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "ghost", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| -> Option<usize> { None };
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test");
        let (out, dispatcher, forwarder) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "agent without a pane yields no outcome");
        assert!(dispatcher.events.is_empty());
        assert!(forwarder.forwards.is_empty());
    }

    #[test]
    fn no_marker_means_no_prompt_outcome() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-c", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector::new(None, "");
        let (out, dispatcher, _) = run_tick(Some(&state), &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].1, TickOutcome::NoPrompt);
        assert!(dispatcher.events.is_empty());
    }

    /// Shorthand for building a status row.
    fn row(agent_id: &str, status: &str, last_seen_seconds: u64) -> AgentStatusRow {
        AgentStatusRow {
            agent_id: agent_id.to_string(),
            status: status.to_string(),
            last_seen_seconds,
        }
    }

    #[test]
    fn stalled_from_status_filters_by_threshold() {
        let rows = vec![
            row("fresh", "working", 5),
            row("stale", "working", 60),
            row("ancient", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert!(stalled.contains(&"stale".to_string()));
        assert!(stalled.contains(&"ancient".to_string()));
        assert!(!stalled.contains(&"fresh".to_string()));
    }

    #[test]
    fn stalled_from_status_skips_terminal() {
        let rows = vec![
            row("a", "done", 600),
            row("b", "verified", 600),
            row("c", "blocked", 600),
            row("d", "committed", 600),
            row("e", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert_eq!(stalled, vec!["e".to_string()]);
    }

    #[test]
    fn tick_from_status_dispatches_safe_prompt() {
        let rows = vec![row("agent-a", "working", 300)];
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector::new(Some(PermissionType::Cargo), "cargo test --workspace");
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        let out = {
            // `state` stays `None`: this entry point works from the rows alone.
            let mut ctx = PollContext {
                state: None,
                session: "paw-x",
                config: &cfg,
                resolver: &resolver,
                inspector: &inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            tick_from_status(&rows, &mut ctx)
        };
        assert_eq!(out.len(), 1);
        assert_eq!(sent_keys(&dispatcher), vec!["BTab", "Down", "Enter"]);
    }
}