Skip to main content

git_paw/supervisor/
poll.rs

1//! Single-tick orchestration for the supervisor auto-approve poll loop.
2//!
3//! Exposes [`poll_tick`] — given a broker state, a session name, a
4//! pane-index resolver, and an [`AutoApproveConfig`], it:
5//!
6//! 1. Detects stalled agents via [`super::stall::detect_stalled_agents`].
7//! 2. Captures each stalled agent's pane via
8//!    [`super::permission_prompt::detect_permission_prompt`].
//! 3. For prompts whose captured pane text matches the effective whitelist,
//!    dispatches `BTab Down Enter` via [`super::approve::auto_approve_pane`].
//! 4. For prompts that match no whitelist entry, forwards a question to the
//!    dashboard inbox so the human can resolve it.
13//!
14//! The loop driver lives in `main.rs` (background thread spawned by
15//! `cmd_supervisor`); this module keeps the per-tick logic pure and
16//! testable.
17
18use std::io::{Read, Write};
19use std::net::TcpStream;
20use std::time::Duration;
21
22use serde::Deserialize;
23
24use crate::broker::BrokerState;
25use crate::config::AutoApproveConfig;
26use crate::error::PawError;
27
28use super::approve::{ApprovalRequest, KeyDispatcher, auto_approve_pane};
29use super::auto_approve::is_safe_command;
30use super::permission_prompt::{PermissionType, detect_permission_prompt};
31use super::stall::detect_stalled_agents;
32
33/// Outcome of processing a single stalled agent during a poll tick.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum TickOutcome {
36    /// No permission prompt was found in the pane.
37    NoPrompt,
38    /// Prompt was detected, classified safe, and approved.
39    Approved {
40        /// Whitelist entry that matched the captured command.
41        matched_entry: String,
42        /// Permission class of the approved prompt.
43        kind: PermissionType,
44    },
45    /// Prompt was detected but did not match the whitelist; the supervisor
46    /// should forward it to the dashboard.
47    Forwarded {
48        /// Permission class of the forwarded prompt.
49        kind: PermissionType,
50    },
51}
52
/// Maps an agent ID to the tmux pane it runs in.
///
/// `cmd_supervisor` derives the mapping from session state; tests simply
/// pass a closure, which works thanks to the blanket impl below.
pub trait PaneResolver {
    /// Looks up the tmux pane index for `agent_id`.
    ///
    /// Returns `None` when the agent owns no pane (e.g. the supervisor
    /// itself), in which case the poll tick skips it.
    fn pane_index_for(&self, agent_id: &str) -> Option<usize>;
}

/// Any `Fn(&str) -> Option<usize>` (closure or function) is a resolver.
impl<F: Fn(&str) -> Option<usize>> PaneResolver for F {
    fn pane_index_for(&self, agent_id: &str) -> Option<usize> {
        (self)(agent_id)
    }
}
71
72/// Trait providing the captured pane content for an agent.
73///
74/// In production this is a thin shim over [`super::permission_prompt::capture_pane`].
75/// Tests inject a stub so the captured text is deterministic.
76pub trait PaneInspector {
77    /// Captures the pane and returns the classification, or `None` when
78    /// no approval marker is present.
79    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType>;
80    /// Returns the raw captured content for whitelist matching, or empty
81    /// string when capture fails.
82    fn captured_text(&self, session: &str, pane_index: usize) -> String;
83}
84
85/// Production [`PaneInspector`] backed by `tmux capture-pane`.
86pub struct TmuxPaneInspector;
87
88impl PaneInspector for TmuxPaneInspector {
89    fn inspect(&self, session: &str, pane_index: usize) -> Option<PermissionType> {
90        detect_permission_prompt(session, pane_index)
91    }
92    fn captured_text(&self, session: &str, pane_index: usize) -> String {
93        super::permission_prompt::capture_pane(session, pane_index).unwrap_or_default()
94    }
95}
96
97/// Forwarder for unsafe prompts — abstracted so tests can record forwards.
98pub trait QuestionForwarder {
99    /// Forward a question to the supervisor dashboard inbox.
100    ///
101    /// Returns the dispatch result; failures are logged but do not abort
102    /// the poll tick.
103    fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str);
104}
105
106/// Inputs for [`poll_tick`].
107///
108/// Bundled so the per-tick API is one parameter wide and clippy's
109/// `too_many_arguments` lint stays happy.
110pub struct PollContext<'a, R, I, D, Q>
111where
112    R: PaneResolver,
113    I: PaneInspector,
114    D: KeyDispatcher,
115    Q: QuestionForwarder,
116{
117    /// Broker state used for stall detection by [`poll_tick`].
118    ///
119    /// Set to `None` when calling [`tick_from_status`] from a process that
120    /// does not own the broker state (e.g. the supervisor's background
121    /// poll thread, which queries `/status` over HTTP instead).
122    pub state: Option<&'a BrokerState>,
123    /// tmux session name.
124    pub session: &'a str,
125    /// Auto-approve config (presets applied by [`poll_tick`]).
126    pub config: &'a AutoApproveConfig,
127    /// Resolves agent ID to pane index.
128    pub resolver: &'a R,
129    /// Inspects pane content.
130    pub inspector: &'a I,
131    /// Sends approval keystrokes.
132    pub dispatcher: &'a mut D,
133    /// Forwards unsafe prompts to the dashboard.
134    pub forwarder: &'a mut Q,
135    /// Optional broker URL for audit-log publishing.
136    pub broker_url: Option<&'a str>,
137}
138
139/// Runs one tick of the auto-approve poll loop and returns the outcome
140/// for each stalled agent (in iteration order).
141pub fn poll_tick<R, I, D, Q>(ctx: &mut PollContext<'_, R, I, D, Q>) -> Vec<(String, TickOutcome)>
142where
143    R: PaneResolver,
144    I: PaneInspector,
145    D: KeyDispatcher,
146    Q: QuestionForwarder,
147{
148    let cfg = ctx.config.resolved();
149    if !cfg.enabled {
150        return Vec::new();
151    }
152    let Some(state) = ctx.state else {
153        return Vec::new();
154    };
155    let threshold = Duration::from_secs(cfg.stall_threshold_seconds);
156    let stalled = detect_stalled_agents(state, threshold);
157    let whitelist = cfg.effective_whitelist();
158    drive_outcomes(stalled, ctx, &cfg, &whitelist)
159}
160
161/// Subset of an agent record returned by the broker `/status` endpoint that
162/// the supervisor poll loop cares about.
163#[derive(Debug, Clone, Deserialize)]
164pub struct AgentStatusRow {
165    /// Agent identifier (slugified branch name).
166    pub agent_id: String,
167    /// Status label (e.g. `"working"`, `"done"`).
168    pub status: String,
169    /// Seconds since the agent was last seen.
170    pub last_seen_seconds: u64,
171}
172
173/// Fetches the broker `/status` endpoint and returns the agent summary.
174///
175/// Used by `cmd_supervisor`'s background poll thread because the broker
176/// state lives in the dashboard process, not in `cmd_supervisor` itself.
177/// Errors are surfaced so the caller can decide whether to retry.
178pub fn fetch_status_over_http(broker_url: &str) -> Result<Vec<AgentStatusRow>, PawError> {
179    let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
180    let socket_addr = if let Ok(a) = addr.parse() {
181        a
182    } else {
183        use std::net::ToSocketAddrs;
184        addr.to_socket_addrs()
185            .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
186            .next()
187            .ok_or_else(|| {
188                PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
189            })?
190    };
191
192    let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
193        .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
194    stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
195    stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
196
197    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
198    stream
199        .write_all(request.as_bytes())
200        .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;
201
202    let mut response = String::new();
203    let _ = stream.read_to_string(&mut response);
204
205    // Find the JSON body (first `{` after the headers).
206    let body_start = response
207        .find("\r\n\r\n")
208        .map(|i| i + 4)
209        .ok_or_else(|| PawError::SessionError("malformed broker response".to_string()))?;
210    let body = &response[body_start..];
211
212    let parsed: StatusResponse = serde_json::from_str(body)
213        .map_err(|e| PawError::SessionError(format!("broker /status parse error: {e}")))?;
214    Ok(parsed.agents)
215}
216
217#[derive(Deserialize)]
218struct StatusResponse {
219    agents: Vec<AgentStatusRow>,
220}
221
222/// Returns the IDs of agents whose `status` is non-terminal and whose
223/// `last_seen_seconds` is at or above `threshold_seconds`.
224///
225/// HTTP-friendly counterpart to [`super::stall::detect_stalled_agents`]
226/// for callers that only have a `/status` snapshot (the supervisor's
227/// background poll thread).
228#[must_use]
229pub fn stalled_from_status(rows: &[AgentStatusRow], threshold_seconds: u64) -> Vec<String> {
230    rows.iter()
231        .filter(|r| !super::stall::TERMINAL_STATUSES.contains(&r.status.as_str()))
232        .filter(|r| r.last_seen_seconds >= threshold_seconds)
233        .map(|r| r.agent_id.clone())
234        .collect()
235}
236
237/// Runs one tick driven by an HTTP `/status` snapshot rather than an
238/// in-process [`BrokerState`].
239///
240/// Mirrors [`poll_tick`] but takes pre-fetched [`AgentStatusRow`] entries
241/// so the supervisor's background thread does not need access to the
242/// broker's lock.
243pub fn tick_from_status<R, I, D, Q>(
244    rows: &[AgentStatusRow],
245    ctx: &mut PollContext<'_, R, I, D, Q>,
246) -> Vec<(String, TickOutcome)>
247where
248    R: PaneResolver,
249    I: PaneInspector,
250    D: KeyDispatcher,
251    Q: QuestionForwarder,
252{
253    let cfg = ctx.config.resolved();
254    if !cfg.enabled {
255        return Vec::new();
256    }
257    let stalled = stalled_from_status(rows, cfg.stall_threshold_seconds);
258    let whitelist = cfg.effective_whitelist();
259    drive_outcomes(stalled, ctx, &cfg, &whitelist)
260}
261
262fn drive_outcomes<R, I, D, Q>(
263    stalled: Vec<String>,
264    ctx: &mut PollContext<'_, R, I, D, Q>,
265    cfg: &AutoApproveConfig,
266    whitelist: &[String],
267) -> Vec<(String, TickOutcome)>
268where
269    R: PaneResolver,
270    I: PaneInspector,
271    D: KeyDispatcher,
272    Q: QuestionForwarder,
273{
274    let mut out = Vec::with_capacity(stalled.len());
275    for agent_id in stalled {
276        let Some(pane_index) = ctx.resolver.pane_index_for(&agent_id) else {
277            continue;
278        };
279        let Some(kind) = ctx.inspector.inspect(ctx.session, pane_index) else {
280            out.push((agent_id, TickOutcome::NoPrompt));
281            continue;
282        };
283        let captured = ctx.inspector.captured_text(ctx.session, pane_index);
284        let matched = first_whitelist_match(&captured, whitelist);
285        if let Some(entry) = matched {
286            let req = ApprovalRequest {
287                enabled: cfg.enabled,
288                session: ctx.session,
289                pane_index,
290                agent_id: &agent_id,
291                kind,
292                matched_entry: Some(entry.as_str()),
293                broker_url: ctx.broker_url,
294            };
295            match auto_approve_pane(ctx.dispatcher, req) {
296                Ok(true) => out.push((
297                    agent_id,
298                    TickOutcome::Approved {
299                        matched_entry: entry,
300                        kind,
301                    },
302                )),
303                _ => out.push((agent_id, TickOutcome::Forwarded { kind })),
304            }
305        } else {
306            ctx.forwarder.forward_question(&agent_id, kind, &captured);
307            out.push((agent_id, TickOutcome::Forwarded { kind }));
308        }
309    }
310    out
311}
312
313fn first_whitelist_match(captured: &str, whitelist: &[String]) -> Option<String> {
314    // Walk lines so multi-line pane captures only match the actual command
315    // being prompted. Using is_safe_command per-line keeps the prefix-
316    // boundary semantics intact.
317    for line in captured.lines() {
318        for entry in whitelist {
319            if is_safe_command(line, std::slice::from_ref(entry)) {
320                return Some(entry.clone());
321            }
322        }
323    }
324    None
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::messages::{BrokerMessage, StatusPayload};
    use crate::broker::{AgentRecord, BrokerState};
    use crate::config::AutoApproveConfig;
    use std::cell::RefCell;
    use std::time::Instant;

    /// Fixed-response [`PaneInspector`]: every pane reports the same
    /// classification and captured text, whatever the session or index.
    struct StubInspector {
        kind: Option<PermissionType>,
        captured: String,
    }
    impl PaneInspector for StubInspector {
        fn inspect(&self, _session: &str, _pane_index: usize) -> Option<PermissionType> {
            self.kind
        }
        fn captured_text(&self, _session: &str, _pane_index: usize) -> String {
            self.captured.clone()
        }
    }

    /// [`KeyDispatcher`] that records every `(session, pane_index, key)`
    /// instead of talking to tmux, and always reports success.
    struct RecordingDispatcher {
        events: Vec<(String, usize, String)>,
    }
    impl KeyDispatcher for RecordingDispatcher {
        fn send_key(&mut self, session: &str, pane_index: usize, key: &str) -> std::io::Result<()> {
            self.events
                .push((session.to_string(), pane_index, key.to_string()));
            Ok(())
        }
    }

    /// [`QuestionForwarder`] that records each forwarded
    /// `(agent_id, kind, captured)` triple.
    // NOTE(review): `forward_question` already takes `&mut self`, so the
    // `RefCell` is not strictly required here.
    #[derive(Default)]
    struct RecordingForwarder {
        forwards: RefCell<Vec<(String, PermissionType, String)>>,
    }
    impl QuestionForwarder for RecordingForwarder {
        fn forward_question(&mut self, agent_id: &str, kind: PermissionType, captured: &str) {
            self.forwards
                .borrow_mut()
                .push((agent_id.to_string(), kind, captured.to_string()));
        }
    }

    /// Inserts a `"working"` agent `id` into `state` whose `last_seen` lies
    /// `age_secs` in the past (falling back to "now" if that subtraction
    /// would underflow `Instant`).
    fn insert_stalled(state: &BrokerState, id: &str, age_secs: u64) {
        let mut inner = state.write();
        inner.agents.insert(
            id.to_string(),
            AgentRecord {
                agent_id: id.to_string(),
                status: "working".to_string(),
                last_seen: Instant::now()
                    .checked_sub(Duration::from_secs(age_secs))
                    .unwrap_or_else(Instant::now),
                last_message: Some(BrokerMessage::Status {
                    agent_id: id.to_string(),
                    payload: StatusPayload {
                        status: "working".to_string(),
                        modified_files: Vec::new(),
                        message: None,
                    },
                }),
            },
        );
    }

    /// Runs [`poll_tick`] on `state` with session `"paw-x"`, fresh recording
    /// dispatcher/forwarder and no broker URL, then returns the outcomes
    /// together with both recorders for inspection.
    fn run_tick<R: PaneResolver, I: PaneInspector>(
        state: &BrokerState,
        cfg: &AutoApproveConfig,
        resolver: &R,
        inspector: &I,
    ) -> (
        Vec<(String, TickOutcome)>,
        RecordingDispatcher,
        RecordingForwarder,
    ) {
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        // Scoped so the context's `&mut` borrows of the recorders end before
        // the recorders are moved into the return value.
        let out = {
            let mut ctx = PollContext {
                state: Some(state),
                session: "paw-x",
                config: cfg,
                resolver,
                inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            poll_tick(&mut ctx)
        };
        (out, dispatcher, forwarder)
    }

    // A disabled config short-circuits: no outcomes and no keystrokes.
    #[test]
    fn disabled_config_returns_empty() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "stuck", 600);
        let cfg = AutoApproveConfig {
            enabled: false,
            ..AutoApproveConfig::default()
        };
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty());
        assert!(dispatcher.events.is_empty());
    }

    // A whitelisted command is approved with the default config and the
    // approval key sequence is sent; nothing is forwarded.
    #[test]
    fn stalled_safe_agent_is_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-a", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        let (id, outcome) = &out[0];
        assert_eq!(id, "agent-a");
        match outcome {
            TickOutcome::Approved {
                matched_entry,
                kind,
            } => {
                assert_eq!(matched_entry, "cargo test");
                assert_eq!(*kind, PermissionType::Cargo);
            }
            _ => panic!("expected Approved, got {outcome:?}"),
        }
        // BTab + Down + Enter dispatched in order.
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
        assert!(forwarder.forwards.borrow().is_empty());
    }

    // A capture with no whitelisted line is forwarded once and no keys are sent.
    #[test]
    fn stalled_unsafe_agent_is_forwarded_not_approved() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-b", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(3);
        let inspector = StubInspector {
            kind: Some(PermissionType::Unknown),
            captured: "rm -rf /tmp/foo\nrequires approval".into(),
        };
        let (out, dispatcher, forwarder) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        match &out[0].1 {
            TickOutcome::Forwarded { kind } => assert_eq!(*kind, PermissionType::Unknown),
            other => panic!("expected Forwarded, got {other:?}"),
        }
        assert!(
            dispatcher.events.is_empty(),
            "no keystrokes for unsafe prompt"
        );
        let forwards = forwarder.forwards.borrow();
        assert_eq!(forwards.len(), 1);
        assert_eq!(forwards[0].0, "agent-b");
    }

    // An agent seen just now is not considered stalled, so it is never polled.
    #[test]
    fn fresh_agent_is_skipped() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "fresh", 0); // age 0 < 30s threshold
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test".into(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert!(out.is_empty(), "fresh agent must not be polled");
        assert!(dispatcher.events.is_empty());
    }

    // When the inspector finds no prompt marker the outcome is `NoPrompt`.
    #[test]
    fn no_marker_means_no_prompt_outcome() {
        let state = BrokerState::new(None);
        insert_stalled(&state, "agent-c", 600);
        let cfg = AutoApproveConfig::default();
        let resolver = |_id: &str| Some(1);
        let inspector = StubInspector {
            kind: None,
            captured: String::new(),
        };
        let (out, dispatcher, _) = run_tick(&state, &cfg, &resolver, &inspector);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].1, TickOutcome::NoPrompt);
        assert!(dispatcher.events.is_empty());
    }

    // --- stalled_from_status / tick_from_status ---

    /// Builds an [`AgentStatusRow`] from its three fields.
    fn row(agent_id: &str, status: &str, last_seen_seconds: u64) -> AgentStatusRow {
        AgentStatusRow {
            agent_id: agent_id.to_string(),
            status: status.to_string(),
            last_seen_seconds,
        }
    }

    // Rows below the threshold are excluded; rows at or above it are kept.
    #[test]
    fn stalled_from_status_filters_by_threshold() {
        let rows = vec![
            row("fresh", "working", 5),
            row("stale", "working", 60),
            row("ancient", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert!(stalled.contains(&"stale".to_string()));
        assert!(stalled.contains(&"ancient".to_string()));
        assert!(!stalled.contains(&"fresh".to_string()));
    }

    // Only the non-terminal row survives, even though all rows are old.
    #[test]
    fn stalled_from_status_skips_terminal() {
        let rows = vec![
            row("a", "done", 600),
            row("b", "verified", 600),
            row("c", "blocked", 600),
            row("d", "committed", 600),
            row("e", "working", 600),
        ];
        let stalled = stalled_from_status(&rows, 30);
        assert_eq!(stalled, vec!["e".to_string()]);
    }

    // The snapshot-driven tick approves a whitelisted prompt without any
    // broker state in the context.
    #[test]
    fn tick_from_status_dispatches_safe_prompt() {
        let rows = vec![row("agent-a", "working", 300)];
        let cfg = AutoApproveConfig::default();
        let resolver = |id: &str| if id == "agent-a" { Some(2) } else { None };
        let inspector = StubInspector {
            kind: Some(PermissionType::Cargo),
            captured: "cargo test --workspace".into(),
        };
        let mut dispatcher = RecordingDispatcher { events: vec![] };
        let mut forwarder = RecordingForwarder::default();
        let out = {
            let mut ctx = PollContext {
                state: None,
                session: "paw-x",
                config: &cfg,
                resolver: &resolver,
                inspector: &inspector,
                dispatcher: &mut dispatcher,
                forwarder: &mut forwarder,
                broker_url: None,
            };
            tick_from_status(&rows, &mut ctx)
        };
        assert_eq!(out.len(), 1);
        let keys: Vec<&str> = dispatcher
            .events
            .iter()
            .map(|(_, _, k)| k.as_str())
            .collect();
        assert_eq!(keys, vec!["BTab", "Down", "Enter"]);
    }
}