Skip to main content

git_paw/supervisor/
poll.rs

1//! Single-tick orchestration for the supervisor auto-approve poll loop.
2//!
3//! Exposes [`poll_tick`] — given a broker state, a session name, a
4//! pane-index resolver, and an [`AutoApproveConfig`], it:
5//!
6//! 1. Detects stalled agents via [`super::stall::detect_stalled_agents`].
7//! 2. Captures each stalled agent's pane via
8//!    [`super::permission_prompt::detect_permission_prompt`].
//! 3. For prompts whose captured text matches an auto-approve whitelist
//!    entry, dispatches `BTab Down Enter` via
//!    [`super::approve::auto_approve_pane`].
//! 4. For prompts that match no whitelist entry, forwards a question to the
//!    dashboard inbox so the human can resolve it.
13//!
14//! The loop driver lives in `main.rs` (background thread spawned by
15//! `cmd_supervisor`); this module keeps the per-tick logic pure and
16//! testable.
17
18use std::io::{Read, Write};
19use std::net::TcpStream;
20use std::time::Duration;
21
22use serde::Deserialize;
23
24use crate::broker::BrokerState;
25use crate::config::AutoApproveConfig;
26use crate::error::PawError;
27
28use super::approve::{ApprovalRequest, KeyDispatcher, auto_approve_pane};
29use super::auto_approve::is_safe_command;
30use super::permission_prompt::{PermissionType, detect_permission_prompt};
31use super::stall::detect_stalled_agents;
32
33/// Outcome of processing a single stalled agent during a poll tick.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum TickOutcome {
36    /// No permission prompt was found in the pane.
37    NoPrompt,
38    /// Prompt was detected, classified safe, and approved.
39    Approved {
40        /// Whitelist entry that matched the captured command.
41        matched_entry: String,
42        /// Permission class of the approved prompt.
43        kind: PermissionType,
44    },
45    /// Prompt was detected but did not match the whitelist; the supervisor
46    /// should forward it to the dashboard.
47    Forwarded {
48        /// Permission class of the forwarded prompt.
49        kind: PermissionType,
50    },
51}
52
/// Maps an agent ID to the tmux pane that hosts it.
///
/// `cmd_supervisor` derives the mapping from session state. Thanks to the
/// blanket implementation below, tests can pass a plain closure.
pub trait PaneResolver {
    /// Looks up the tmux pane index hosting `agent_id`.
    ///
    /// Yields `None` for agents without a pane of their own (for example
    /// the supervisor itself).
    fn pane_index_for(&self, agent_id: &str) -> Option<usize>;
}

/// Every `Fn(&str) -> Option<usize>` (closure or fn item) is a resolver.
impl<F: Fn(&str) -> Option<usize>> PaneResolver for F {
    fn pane_index_for(&self, agent_id: &str) -> Option<usize> {
        // Just invoke the wrapped callable.
        self(agent_id)
    }
}
71
72/// Trait providing the captured pane content for an agent.
73///
74/// In production this is a thin shim over [`super::permission_prompt::capture_pane`].
75/// Tests inject a stub so the captured text is deterministic.
76pub trait PaneInspector {
77    /// Captures the pane and returns the classification, or `None` when
78    /// no approval marker is present.
79    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType>;
80    /// Returns the raw captured content for whitelist matching, or empty
81    /// string when capture fails.
82    fn captured_text(&self, session: &str, pane_index: usize) -> String;
83}
84
85/// Production [`PaneInspector`] backed by `tmux capture-pane`.
86pub struct TmuxPaneInspector;
87
88impl PaneInspector for TmuxPaneInspector {
89    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType> {
90        detect_permission_prompt(session, pane_index)
91    }
92    fn captured_text(&self, session: &str, pane_index: usize) -> String {
93        super::permission_prompt::capture_pane(session, pane_index).unwrap_or_default()
94    }
95}
96
97/// Forwarder for unsafe prompts — abstracted so tests can record forwards.
98pub trait QuestionForwarder {
99    /// Forward a question to the supervisor dashboard inbox.
100    ///
101    /// Returns the dispatch result; failures are logged but do not abort
102    /// the poll tick.
103    fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str);
104}
105
106/// Inputs for [`poll_tick`].
107///
108/// Bundled so the per-tick API is one parameter wide and clippy's
109/// `too_many_arguments` lint stays happy.
110pub struct PollContext<'a, R, I, D, Q>
111where
112    R: PaneResolver,
113    I: PaneInspector,
114    D: KeyDispatcher,
115    Q: QuestionForwarder,
116{
117    /// Broker state used for stall detection by [`poll_tick`].
118    ///
119    /// Set to `None` when calling [`tick_from_status`] from a process that
120    /// does not own the broker state (e.g. the supervisor's background
121    /// poll thread, which queries `/status` over HTTP instead).
122    pub state: Option<&'a BrokerState>,
123    /// tmux session name.
124    pub session: &'a str,
125    /// Auto-approve config (presets applied by [`poll_tick`]).
126    pub config: &'a AutoApproveConfig,
127    /// Resolves agent ID to pane index.
128    pub resolver: &'a R,
129    /// Inspects pane content.
130    pub inspector: &'a I,
131    /// Sends approval keystrokes.
132    pub dispatcher: &'a mut D,
133    /// Forwards unsafe prompts to the dashboard.
134    pub forwarder: &'a mut Q,
135    /// Optional broker URL for audit-log publishing.
136    pub broker_url: Option<&'a str>,
137}
138
139/// Runs one tick of the auto-approve poll loop and returns the outcome
140/// for each stalled agent (in iteration order).
141pub fn poll_tick<R, I, D, Q>(ctx: &mut PollContext<'_, R, I, D, Q>) -> Vec<(String, TickOutcome)>
142where
143    R: PaneResolver,
144    I: PaneInspector,
145    D: KeyDispatcher,
146    Q: QuestionForwarder,
147{
148    let cfg = ctx.config.resolved();
149    if !cfg.enabled {
150        return Vec::new();
151    }
152    let Some(state) = ctx.state else {
153        return Vec::new();
154    };
155    let threshold = Duration::from_secs(cfg.stall_threshold_seconds);
156    let stalled = detect_stalled_agents(state, threshold);
157    let whitelist = cfg.effective_whitelist();
158    drive_outcomes(stalled, ctx, &cfg, &whitelist)
159}
160
161/// Subset of an agent record returned by the broker `/status` endpoint that
162/// the supervisor poll loop cares about.
163#[derive(Debug, Clone, Deserialize)]
164pub struct AgentStatusRow {
165    /// Agent identifier (slugified branch name).
166    pub agent_id: String,
167    /// Status label (e.g. `"working"`, `"done"`).
168    pub status: String,
169    /// Seconds since the agent was last seen.
170    pub last_seen_seconds: u64,
171}
172
173/// Fetches the broker `/status` endpoint and returns the agent summary.
174///
175/// Used by `cmd_supervisor`'s background poll thread because the broker
176/// state lives in the dashboard process, not in `cmd_supervisor` itself.
177/// Errors are surfaced so the caller can decide whether to retry.
178pub fn fetch_status_over_http(broker_url: &str) -> Result<Vec<AgentStatusRow>, PawError> {
179    let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
180    let socket_addr = if let Ok(a) = addr.parse() {
181        a
182    } else {
183        use std::net::ToSocketAddrs;
184        addr.to_socket_addrs()
185            .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
186            .next()
187            .ok_or_else(|| {
188                PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
189            })?
190    };
191
192    let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
193        .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
194    stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
195    stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
196
197    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
198    stream
199        .write_all(request.as_bytes())
200        .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;
201
202    let mut response = String::new();
203    let _ = stream.read_to_string(&mut response);
204
205    // Find the JSON body (first `{` after the headers).
206    let body_start = response
207        .find("\r\n\r\n")
208        .map(|i| i + 4)
209        .ok_or_else(|| PawError::SessionError("malformed broker response".to_string()))?;
210    let body = &response[body_start..];
211
212    let parsed: StatusResponse = serde_json::from_str(body)
213        .map_err(|e| PawError::SessionError(format!("broker /status parse error: {e}")))?;
214    Ok(parsed.agents)
215}
216
217#[derive(Deserialize)]
218struct StatusResponse {
219    agents: Vec<AgentStatusRow>,
220}
221
222/// Returns the IDs of agents whose `status` is non-terminal and whose
223/// `last_seen_seconds` is at or above `threshold_seconds`.
224///
225/// HTTP-friendly counterpart to [`super::stall::detect_stalled_agents`]
226/// for callers that only have a `/status` snapshot (the supervisor's
227/// background poll thread).
228#[must_use]
229pub fn stalled_from_status(rows: &[AgentStatusRow], threshold_seconds: u64) -> Vec<String> {
230    rows.iter()
231        .filter(|r| !super::stall::TERMINAL_STATUSES.contains(&r.status.as_str()))
232        .filter(|r| r.last_seen_seconds >= threshold_seconds)
233        .map(|r| r.agent_id.clone())
234        .collect()
235}
236
237/// Runs one tick driven by an HTTP `/status` snapshot rather than an
238/// in-process [`BrokerState`].
239///
240/// Mirrors [`poll_tick`] but takes pre-fetched [`AgentStatusRow`] entries
241/// so the supervisor's background thread does not need access to the
242/// broker's lock.
243pub fn tick_from_status<R, I, D, Q>(
244    rows: &[AgentStatusRow],
245    ctx: &mut PollContext<'_, R, I, D, Q>,
246) -> Vec<(String, TickOutcome)>
247where
248    R: PaneResolver,
249    I: PaneInspector,
250    D: KeyDispatcher,
251    Q: QuestionForwarder,
252{
253    let cfg = ctx.config.resolved();
254    if !cfg.enabled {
255        return Vec::new();
256    }
257    let stalled = stalled_from_status(rows, cfg.stall_threshold_seconds);
258    let whitelist = cfg.effective_whitelist();
259    drive_outcomes(stalled, ctx, &cfg, &whitelist)
260}
261
262fn drive_outcomes<R, I, D, Q>(
263    stalled: Vec<String>,
264    ctx: &mut PollContext<'_, R, I, D, Q>,
265    cfg: &AutoApproveConfig,
266    whitelist: &[String],
267) -> Vec<(String, TickOutcome)>
268where
269    R: PaneResolver,
270    I: PaneInspector,
271    D: KeyDispatcher,
272    Q: QuestionForwarder,
273{
274    let mut out = Vec::with_capacity(stalled.len());
275    for agent_id in stalled {
276        let Some(pane_index) = ctx.resolver.pane_index_for(&agent_id) else {
277            continue;
278        };
279        let Some(kind) = ctx.inspector.inspect(ctx.session, pane_index) else {
280            out.push((agent_id, TickOutcome::NoPrompt));
281            continue;
282        };
283        let captured = ctx.inspector.captured_text(ctx.session, pane_index);
284        let matched = first_whitelist_match(&captured, whitelist);
285        if let Some(entry) = matched {
286            let req = ApprovalRequest {
287                enabled: cfg.enabled,
288                session: ctx.session,
289                pane_index,
290                agent_id: &agent_id,
291                kind,
292                matched_entry: Some(entry.as_str()),
293                broker_url: ctx.broker_url,
294            };
295            match auto_approve_pane(ctx.dispatcher, req) {
296                Ok(true) => out.push((
297                    agent_id,
298                    TickOutcome::Approved {
299                        matched_entry: entry,
300                        kind,
301                    },
302                )),
303                _ => out.push((agent_id, TickOutcome::Forwarded { kind })),
304            }
305        } else {
306            ctx.forwarder.forward_question(&agent_id, kind, &captured);
307            out.push((agent_id, TickOutcome::Forwarded { kind }));
308        }
309    }
310    out
311}
312
313fn first_whitelist_match(captured: &str, whitelist: &[String]) -> Option<String> {
314    // Walk lines so multi-line pane captures only match the actual command
315    // being prompted. Using is_safe_command per-line keeps the prefix-
316    // boundary semantics intact.
317    for line in captured.lines() {
318        for entry in whitelist {
319            if is_safe_command(line, std::slice::from_ref(entry)) {
320                return Some(entry.clone());
321            }
322        }
323    }
324    None
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::messages::{BrokerMessage, StatusPayload};
    use crate::broker::{AgentRecord, BrokerState};
    use crate::config::AutoApproveConfig;
    use std::cell::RefCell;
    use std::time::Instant;

    /// Inspector stub that returns a fixed classification and a fixed
    /// capture text, whatever session or pane is asked for.
    struct StubInspector {
        kind: Option<PermissionType>,
        captured: String,
    }
    impl PaneInspector for StubInspector {
        fn inspect(&self, _session: &str, _pane_index: usize) -> Option<PermissionType> {
            self.kind
        }
        fn captured_text(&self, _session: &str, _pane_index: usize) -> String {
            self.captured.clone()
        }
    }

    /// Dispatcher that records every `(session, pane_index, key)` it is asked
    /// to send and always reports success.
    struct RecordingDispatcher {
        events: Vec<(String, usize, String)>,
    }
    impl KeyDispatcher for RecordingDispatcher {
        fn send_key(&mut self, session: &str, pane_index: usize, key: &str) -> std::io::Result<()> {
            self.events
                .push((session.to_string(), pane_index, key.to_string()));
            Ok(())
        }
    }

    /// Forwarder that records every `(agent_id, kind, captured)` question
    /// it receives.
    #[derive(Default)]
    struct RecordingForwarder {
        forwards: RefCell<Vec<(String, PermissionType, String)>>,
    }
    impl QuestionForwarder for RecordingForwarder {
        fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str) {
            self.forwards
                .borrow_mut()
                .push((agent_id.to_string(), kind, captured.to_string()));
        }
    }

    /// Inserts (via `state.write()`) a `"working"` agent record whose
    /// `last_seen` lies `age_secs` in the past.
    fn insert_stalled(state: &BrokerState, id: &str, age_secs: u64) {
        let mut inner = state.write();
        inner.agents.insert(
            id.to_string(),
            AgentRecord {
                agent_id: id.to_string(),
                status: "working".to_string(),
                // Falls back to "now" if the subtraction is not representable
                // as an `Instant`.
                last_seen: Instant::now()
                    .checked_sub(Duration::from_secs(age_secs))
                    .unwrap_or_else(Instant::now),
                last_message: Some(BrokerMessage::Status {
                    agent_id: id.to_string(),
                    payload: StatusPayload {
                        status: "working".to_string(),
                        modified_files: Vec::new(),
                        message: None,
                        ..Default::default()
                    },
                }),
            },
        );
    }

    /// Runs `poll_tick` once against `state`. Uses session `"paw-x"`, no
    /// broker URL, and fresh recording dispatcher/forwarder instances.
    /// Returns the outcomes together with both recorders for inspection.
    fn run_tick<R: PaneResolver, I: PaneInspector>(
        state: &BrokerState,
        cfg: &AutoApproveConfig,
        resolver: &R,
        inspector: &I,
    ) -> (
        Vec<(String, TickOutcome)>,
        RecordingDispatcher,
        RecordingForwarder,
    ) {
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        // Inner scope so the `&mut` borrows held by `ctx` end before the
        // recorders are moved out.
        let out = {
            let mut ctx = PollContext {
                state: Some(state),
                session: "paw-x",
                config: cfg,
                resolver,
                inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            poll_tick(&mut ctx)
        };
        (out, dispatcher, forwarder)
    }

    /// A disabled config short-circuits: no outcomes, no keystrokes.
    #[test]
    fn disabled_config_returns_empty() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "stuck", 600);
        let cfg = AutoApproveConfig {
            enabled: false,
            ..AutoApproveConfig::default()
        };
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty());
        assert!(dispatcher.events.is_empty());
    }

    /// A stalled agent whose capture matches the default whitelist is
    /// approved with the `BTab`/`Down`/`Enter` sequence and nothing is
    /// forwarded.
    #[test]
    fn stalled_safe_agent_is_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-a", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        let (id, outcome) = &out[0];
        assert_eq!(id, "agent-a");
        match outcome {
            TickOutcome::Approved {
                matched_entry,
                kind,
            } => {
                assert_eq!(matched_entry, "cargo test");
                assert_eq!(*kind, PermissionType::Cargo);
            }
            _ => panic!("expected Approved, got {outcome:?}"),
        }
        // BTab + Down + Enter dispatched in order.
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
        assert!(forwarder.forwards.borrow().is_empty());
    }

    /// A capture with no whitelisted line is forwarded once and produces
    /// no keystrokes.
    #[test]
    fn stalled_unsafe_agent_is_forwarded_not_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-b", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(3);
        let inspector = StubInspector {
            kind: Some(PermissionType::Unknown),
            captured: "rm -rf /tmp/foo\nrequires approval".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        match &out[0].1 {
            TickOutcome::Forwarded { kind } => assert_eq!(*kind, PermissionType::Unknown),
            other => panic!("expected Forwarded, got {other:?}"),
        }
        assert!(
            dispatcher.events.is_empty(),
            "no keystrokes for unsafe prompt"
        );
        let forwards = forwarder.forwards.borrow();
        assert_eq!(forwards.len(), 1);
        assert_eq!(forwards[0].0, "agent-b");
    }

    /// Agents below the stall threshold are never inspected.
    #[test]
    fn fresh_agent_is_skipped() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "fresh", 0); // age 0 < 30s threshold
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "fresh agent must not be polled");
        assert!(dispatcher.events.is_empty());
    }

    /// When the inspector finds no approval marker, the outcome is
    /// `NoPrompt` and no keys are sent.
    #[test]
    fn no_marker_means_no_prompt_outcome() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-c", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: None,
            captured: String::new(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].1, TickOutcome::NoPrompt);
        assert!(dispatcher.events.is_empty());
    }

    // --- stalled_from_status / tick_from_status ---

    /// Builds an `AgentStatusRow` fixture.
    fn row(agent_id: &str, status: &str, last_seen_seconds: u64) -> AgentStatusRow {
        AgentStatusRow {
            agent_id: agent_id.to_string(),
            status: status.to_string(),
            last_seen_seconds,
        }
    }

    /// Only rows at or above the threshold are reported.
    #[test]
    fn stalled_from_status_filters_by_threshold() {
        let rows = vec![
            row("fresh", "working", 5),
            row("stale", "working", 60),
            row("ancient", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert!(stalled.contains(&"stale".to_string()));
        assert!(stalled.contains(&"ancient".to_string()));
        assert!(!stalled.contains(&"fresh".to_string()));
    }

    /// Terminal statuses are excluded even when old; this test expects
    /// "done", "verified", "blocked" and "committed" to be terminal.
    #[test]
    fn stalled_from_status_skips_terminal() {
        let rows = vec![
            row("a", "done", 600),
            row("b", "verified", 600),
            row("c", "blocked", 600),
            row("d", "committed", 600),
            row("e", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert_eq!(stalled, vec!["e".to_string()]);
    }

    /// `tick_from_status` works without broker state (`state: None`) and
    /// still dispatches the approval sequence for a whitelisted prompt.
    #[test]
    fn tick_from_status_dispatches_safe_prompt() {
        let rows = vec![row("agent-a", "working", 300)];
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        let out = {
            let mut ctx = PollContext {
                state: None,
                session: "paw-x",
                config: &cfg,
                resolver: &resolver,
                inspector: &inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            tick_from_status(&rows, &mut ctx)
        };
        assert_eq!(out.len(), 1);
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
    }
}