Skip to main content

batty_cli/team/watcher/
mod.rs

1//! Disk-based session monitoring — polls agent output via tmux capture-pane.
2//!
3//! Detects agent completion, crashes, and staleness by periodically capturing
4//! pane output and checking for state changes. For Codex and Claude, this also
5//! tails their on-disk session JSONL data to reduce false classifications from
6//! stale pane text.
7
8mod claude;
9mod codex;
10mod screen;
11
12use anyhow::Result;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Instant;
16
17use crate::tmux;
18
19pub(crate) use claude::discover_claude_session_file;
20pub use screen::is_at_agent_prompt;
21
22use claude::ClaudeSessionTracker;
23use codex::CodexSessionTracker;
24use screen::{classify_capture_state, detect_context_exhausted, next_state_after_capture};
25
/// Coarse classification of what the agent inside a watched tmux pane is doing.
///
/// Produced by `SessionWatcher::poll` from pane captures and, when configured,
/// the agent's on-disk session log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherState {
    /// The agent is working: output is being produced or the tracker reports activity.
    Active,
    /// The agent CLI is up and showing its input prompt, so messages can be sent.
    /// This is the transitional state between pane creation and the first `Idle`
    /// classification.
    Ready,
    /// Nothing is running in the pane (idle / waiting for an assignment).
    Idle,
    /// The tmux pane is gone, its process exited, or it could not be captured.
    PaneDead,
    /// The agent said its conversation/session is too large to continue.
    ContextExhausted,
}
41
42pub struct SessionWatcher {
43    pub pane_id: String,
44    pub member_name: String,
45    pub state: WatcherState,
46    completion_observed: bool,
47    last_output_hash: u64,
48    last_capture: String,
49    /// Timestamp of the last time pane output changed (hash differed from previous poll).
50    last_output_changed_at: Instant,
51    tracker: Option<SessionTracker>,
52    /// Whether the agent prompt has been observed at least once since creation
53    /// or last `activate()`. False means the pane may not be ready for messages.
54    ready_confirmed: bool,
55}
56
/// Heuristic response-quality signals accumulated while tailing a Codex session log.
///
/// The values are filled in by the Codex tracker. They are reset to
/// `Default::default()` whenever the tracker rebinds to a different session file.
/// Field meanings below are inferred from their names; the authoritative
/// computation lives in the `codex` submodule.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CodexQualitySignals {
    /// Length (in chars) of the most recent assistant response, if any.
    pub last_response_chars: Option<usize>,
    /// Number of assistant messages seen so far.
    pub assistant_message_count: u32,
    /// Number of tool calls seen so far.
    pub tool_call_count: u32,
    /// Distinct tool names invoked.
    pub unique_tool_names: Vec<String>,
    /// Consecutive responses that were shorter than the previous one.
    pub shortening_streak: u32,
    /// Consecutive responses identical to the previous one.
    pub repeated_output_streak: u32,
    /// Whether responses are judged to be shrinking.
    pub shrinking_responses: bool,
    /// Whether identical outputs are judged to be repeating.
    pub repeated_identical_outputs: bool,
    /// Most recent tool failure description, if any.
    pub tool_failure_message: Option<String>,
}
69
/// Selects which on-disk session log a `SessionWatcher` should follow.
///
/// `cwd` is the agent's working directory. The tracker uses it when discovering
/// the matching session file.
#[derive(Debug, Clone)]
pub enum SessionTrackerConfig {
    /// Follow a Codex session under the default Codex sessions root.
    Codex { cwd: PathBuf },
    /// Follow a Claude session under the default Claude projects root.
    Claude { cwd: PathBuf },
}
75
76enum SessionTracker {
77    Codex(CodexSessionTracker),
78    Claude(ClaudeSessionTracker),
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub(super) enum TrackerKind {
83    None,
84    Codex,
85    Claude,
86}
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub(super) enum TrackerState {
90    Active,
91    Idle,
92    Completed,
93    Unknown,
94}
95
96impl SessionWatcher {
97    pub fn new(
98        pane_id: &str,
99        member_name: &str,
100        _stale_secs: u64,
101        tracker: Option<SessionTrackerConfig>,
102    ) -> Self {
103        Self {
104            pane_id: pane_id.to_string(),
105            member_name: member_name.to_string(),
106            state: WatcherState::Idle,
107            completion_observed: false,
108            last_output_hash: 0,
109            last_capture: String::new(),
110            last_output_changed_at: Instant::now(),
111            ready_confirmed: false,
112            tracker: tracker.map(|tracker| match tracker {
113                SessionTrackerConfig::Codex { cwd } => SessionTracker::Codex(CodexSessionTracker {
114                    sessions_root: default_codex_sessions_root(),
115                    cwd,
116                    session_id: None,
117                    session_file: None,
118                    offset: 0,
119                    quality: CodexQualitySignals::default(),
120                    last_response_hash: None,
121                }),
122                SessionTrackerConfig::Claude { cwd } => {
123                    SessionTracker::Claude(ClaudeSessionTracker {
124                        projects_root: default_claude_projects_root(),
125                        cwd,
126                        session_id: None,
127                        session_file: None,
128                        offset: 0,
129                        last_state: TrackerState::Unknown,
130                    })
131                }
132            }),
133        }
134    }
135
136    /// Poll the pane and update state.
137    pub fn poll(&mut self) -> Result<WatcherState> {
138        // Check if pane still exists
139        if !tmux::pane_exists(&self.pane_id) {
140            self.state = WatcherState::PaneDead;
141            return Ok(self.state);
142        }
143
144        // Check if pane process died
145        if tmux::pane_dead(&self.pane_id).unwrap_or(false) {
146            self.state = WatcherState::PaneDead;
147            return Ok(self.state);
148        }
149
150        // If idle or ready, peek at the pane to detect if the agent started working.
151        // This lets the watcher self-heal without requiring explicit activation
152        // from the daemon whenever a nudge, standup, or external input arrives.
153        if matches!(self.state, WatcherState::Idle | WatcherState::Ready) {
154            let capture = match tmux::capture_pane(&self.pane_id) {
155                Ok(capture) => capture,
156                Err(_) => {
157                    self.state = WatcherState::PaneDead;
158                    return Ok(self.state);
159                }
160            };
161            if detect_context_exhausted(&capture) {
162                self.last_capture = capture;
163                self.state = WatcherState::ContextExhausted;
164                return Ok(self.state);
165            }
166            let screen_state = classify_capture_state(&capture);
167            // When we first see the agent prompt, confirm readiness.
168            if screen_state == screen::ScreenState::Idle && !self.ready_confirmed {
169                self.ready_confirmed = true;
170                self.last_capture = capture;
171                self.state = WatcherState::Ready;
172                return Ok(self.state);
173            }
174            let tracker_state = self.poll_tracker().unwrap_or(TrackerState::Unknown);
175            self.completion_observed = tracker_state == TrackerState::Completed;
176            let tracker_kind = self.tracker_kind();
177            if !capture.is_empty() {
178                self.last_capture = capture;
179                let next_state =
180                    next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
181                if next_state != WatcherState::Idle || self.ready_confirmed {
182                    self.last_output_hash = simple_hash(&self.last_capture);
183                    self.last_output_changed_at = Instant::now();
184                    self.state = next_state;
185                }
186            }
187            return Ok(self.state);
188        }
189
190        // Capture current pane content
191        let capture = match tmux::capture_pane(&self.pane_id) {
192            Ok(capture) => capture,
193            Err(_) => {
194                self.state = WatcherState::PaneDead;
195                return Ok(self.state);
196            }
197        };
198        if detect_context_exhausted(&capture) {
199            self.last_capture = capture;
200            self.state = WatcherState::ContextExhausted;
201            return Ok(self.state);
202        }
203        let hash = simple_hash(&capture);
204        let screen_state = classify_capture_state(&capture);
205        let tracker_state = self.poll_tracker().unwrap_or(TrackerState::Unknown);
206        self.completion_observed = tracker_state == TrackerState::Completed;
207        let tracker_kind = self.tracker_kind();
208
209        if hash != self.last_output_hash {
210            self.last_output_hash = hash;
211            self.last_output_changed_at = Instant::now();
212            self.last_capture = capture;
213            self.state =
214                next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
215        } else {
216            self.last_capture = capture;
217            self.state =
218                next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
219        }
220
221        Ok(self.state)
222    }
223
224    /// Whether this agent's pane has been confirmed ready for message delivery.
225    ///
226    /// Returns `true` once the agent prompt has been observed at least once since
227    /// the watcher was created or last activated. This prevents injecting messages
228    /// into panes where the agent CLI hasn't finished starting.
229    pub fn is_ready_for_delivery(&self) -> bool {
230        self.ready_confirmed
231    }
232
233    /// Externally confirm that the agent pane is ready (e.g. from a delivery
234    /// readiness check that observed the prompt). Updates state to Ready if it
235    /// was still in the initial Idle state before first readiness confirmation.
236    pub fn confirm_ready(&mut self) {
237        let was_unconfirmed = !self.ready_confirmed;
238        self.ready_confirmed = true;
239        if was_unconfirmed && self.state == WatcherState::Idle {
240            self.state = WatcherState::Ready;
241        }
242    }
243
244    /// Mark this watcher as actively working.
245    pub fn activate(&mut self) {
246        self.state = WatcherState::Active;
247        self.completion_observed = false;
248        self.last_output_hash = 0;
249        self.last_output_changed_at = Instant::now();
250        // A message was just injected so the pane was confirmed ready.
251        self.ready_confirmed = true;
252        if let Some(tracker) = self.tracker.as_mut() {
253            match tracker {
254                SessionTracker::Codex(codex) => {
255                    codex.session_file = None;
256                    codex.offset = 0;
257                    codex.quality = CodexQualitySignals::default();
258                    codex.last_response_hash = None;
259                }
260                SessionTracker::Claude(claude) => {
261                    claude.session_file = None;
262                    claude.offset = 0;
263                    claude.last_state = TrackerState::Unknown;
264                }
265            }
266        }
267    }
268
269    pub fn set_session_id(&mut self, session_id: Option<String>) {
270        if let Some(tracker) = self.tracker.as_mut() {
271            match tracker {
272                SessionTracker::Codex(codex) => {
273                    if codex.session_id == session_id {
274                        return;
275                    }
276                    self.completion_observed = false;
277                    codex.session_id = session_id;
278                    codex.session_file = None;
279                    codex.offset = 0;
280                    codex.quality = CodexQualitySignals::default();
281                    codex.last_response_hash = None;
282                }
283                SessionTracker::Claude(claude) => {
284                    if claude.session_id == session_id {
285                        return;
286                    }
287                    self.completion_observed = false;
288                    claude.session_id = session_id;
289                    claude.session_file = None;
290                    claude.offset = 0;
291                    claude.last_state = TrackerState::Unknown;
292                }
293            }
294        }
295    }
296
297    /// Mark this watcher as idle.
298    pub fn deactivate(&mut self) {
299        self.state = WatcherState::Idle;
300        self.completion_observed = false;
301    }
302
303    /// Seconds since the last time pane output changed.
304    pub fn secs_since_last_output_change(&self) -> u64 {
305        self.last_output_changed_at.elapsed().as_secs()
306    }
307
308    /// Get the last captured pane output.
309    pub fn last_output(&self) -> &str {
310        &self.last_capture
311    }
312
313    /// Get the last N lines of captured output.
314    pub fn last_lines(&self, n: usize) -> String {
315        let lines: Vec<&str> = self.last_capture.lines().collect();
316        let start = lines.len().saturating_sub(n);
317        lines[start..].join("\n")
318    }
319
320    pub fn current_session_id(&self) -> Option<String> {
321        match self.tracker.as_ref() {
322            Some(SessionTracker::Codex(codex)) => codex
323                .session_file
324                .as_ref()
325                .and_then(|path| codex::codex_session_resume_id(path).ok().flatten())
326                .or_else(|| codex.session_id.clone()),
327            Some(SessionTracker::Claude(claude)) => session_file_id(claude.session_file.as_ref()),
328            None => None,
329        }
330    }
331
332    pub fn configured_session_id(&self) -> Option<String> {
333        match self.tracker.as_ref() {
334            Some(SessionTracker::Codex(codex)) => codex.session_id.clone(),
335            Some(SessionTracker::Claude(claude)) => claude.session_id.clone(),
336            None => None,
337        }
338    }
339
340    pub fn current_session_size_bytes(&self) -> Option<u64> {
341        let path = match self.tracker.as_ref() {
342            Some(SessionTracker::Codex(codex)) => codex.session_file.as_ref(),
343            Some(SessionTracker::Claude(claude)) => claude.session_file.as_ref(),
344            None => None,
345        }?;
346        fs::metadata(path).ok().map(|metadata| metadata.len())
347    }
348
349    pub fn codex_quality_signals(&self) -> Option<CodexQualitySignals> {
350        match self.tracker.as_ref() {
351            Some(SessionTracker::Codex(codex)) => Some(codex.quality.clone()),
352            _ => None,
353        }
354    }
355
356    pub fn take_completion_event(&mut self) -> bool {
357        let observed = self.completion_observed;
358        self.completion_observed = false;
359        observed
360    }
361
362    pub fn refresh_session_tracking(&mut self) -> Result<()> {
363        let _ = self.poll_tracker()?;
364        Ok(())
365    }
366
367    fn poll_tracker(&mut self) -> Result<TrackerState> {
368        let current_state = self.state;
369        let Some(tracker) = self.tracker.as_mut() else {
370            return Ok(TrackerState::Unknown);
371        };
372
373        match tracker {
374            SessionTracker::Codex(codex) => {
375                if codex.session_file.is_none() {
376                    codex.session_file = codex::discover_codex_session_file(
377                        &codex.sessions_root,
378                        &codex.cwd,
379                        codex.session_id.as_deref(),
380                    )?;
381                    if let Some(session_file) = codex.session_file.as_ref() {
382                        codex.session_id = codex::codex_session_resume_id(session_file)?;
383                        codex.offset = current_file_len(session_file)?;
384                    }
385                    codex.quality = CodexQualitySignals::default();
386                    codex.last_response_hash = None;
387                    return Ok(TrackerState::Unknown);
388                }
389
390                let Some(session_file) = codex.session_file.clone() else {
391                    return Ok(TrackerState::Unknown);
392                };
393
394                if !session_file.exists() {
395                    codex.session_file = None;
396                    codex.offset = 0;
397                    codex.quality = CodexQualitySignals::default();
398                    codex.last_response_hash = None;
399                    return Ok(TrackerState::Unknown);
400                }
401
402                let state = codex::poll_codex_session_file(
403                    &session_file,
404                    &mut codex.offset,
405                    &mut codex.quality,
406                    &mut codex.last_response_hash,
407                )?;
408
409                // When idle with no new events, check if a newer session file
410                // appeared (agent started a new task). Re-discover so the
411                // tracker picks up the latest session.
412                if state == TrackerState::Unknown
413                    && matches!(current_state, WatcherState::Idle | WatcherState::Ready)
414                {
415                    if let Some(latest) = codex::discover_codex_session_file(
416                        &codex.sessions_root,
417                        &codex.cwd,
418                        codex.session_id.as_deref(),
419                    )? {
420                        if latest != session_file {
421                            codex.session_file = Some(latest.clone());
422                            codex.session_id = codex::codex_session_resume_id(&latest)?;
423                            codex.offset = 0;
424                            codex.quality = CodexQualitySignals::default();
425                            codex.last_response_hash = None;
426                            return codex::poll_codex_session_file(
427                                &latest,
428                                &mut codex.offset,
429                                &mut codex.quality,
430                                &mut codex.last_response_hash,
431                            );
432                        }
433                    }
434                }
435
436                Ok(state)
437            }
438            SessionTracker::Claude(claude) => claude::poll_claude_session(claude),
439        }
440    }
441
442    fn tracker_kind(&self) -> TrackerKind {
443        match self.tracker {
444            Some(SessionTracker::Codex(_)) => TrackerKind::Codex,
445            Some(SessionTracker::Claude(_)) => TrackerKind::Claude,
446            None => TrackerKind::None,
447        }
448    }
449}
450
451// --- Shared utility functions used by submodules ---
452
453pub(super) fn simple_hash(s: &str) -> u64 {
454    // FNV-1a style hash, good enough for change detection
455    let mut hash: u64 = 0xcbf29ce484222325;
456    for byte in s.bytes() {
457        hash ^= byte as u64;
458        hash = hash.wrapping_mul(0x100000001b3);
459    }
460    hash
461}
462
/// Default Codex sessions directory: `$HOME/.codex/sessions`, or
/// `/.codex/sessions` when `HOME` is not set.
fn default_codex_sessions_root() -> PathBuf {
    let home = match std::env::var_os("HOME") {
        Some(dir) => PathBuf::from(dir),
        None => PathBuf::from("/"),
    };
    home.join(".codex").join("sessions")
}
470
/// Default Claude projects directory: `$HOME/.claude/projects`, or
/// `/.claude/projects` when `HOME` is not set.
fn default_claude_projects_root() -> PathBuf {
    let home = match std::env::var_os("HOME") {
        Some(dir) => PathBuf::from(dir),
        None => PathBuf::from("/"),
    };
    home.join(".claude").join("projects")
}
478
479pub(super) fn current_file_len(path: &Path) -> Result<u64> {
480    Ok(fs::metadata(path)?.len())
481}
482
483pub(super) fn session_file_id(path: Option<&PathBuf>) -> Option<String> {
484    path.and_then(|path| {
485        path.file_stem()
486            .and_then(|stem| stem.to_str())
487            .map(|stem| stem.to_string())
488    })
489}
490
491pub(super) fn read_dir_paths(dir: &Path) -> Result<Vec<PathBuf>> {
492    let mut paths = Vec::new();
493    for entry in fs::read_dir(dir)? {
494        let entry = entry?;
495        paths.push(entry.path());
496    }
497    Ok(paths)
498}
499
500#[cfg(test)]
501mod tests {
502    use super::*;
503    use serial_test::serial;
504
505    #[test]
506    fn simple_hash_differs_for_different_input() {
507        assert_ne!(simple_hash("hello"), simple_hash("world"));
508        assert_eq!(simple_hash("same"), simple_hash("same"));
509    }
510
511    #[test]
512    fn new_watcher_starts_idle() {
513        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
514        assert_eq!(w.state, WatcherState::Idle);
515    }
516
517    #[test]
518    fn activate_sets_active() {
519        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
520        w.activate();
521        assert_eq!(w.state, WatcherState::Active);
522    }
523
524    #[test]
525    fn deactivate_sets_idle() {
526        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
527        w.activate();
528        w.deactivate();
529        assert_eq!(w.state, WatcherState::Idle);
530    }
531
532    #[test]
533    fn last_lines_returns_tail() {
534        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
535        w.last_capture = "line1\nline2\nline3\nline4\nline5".to_string();
536        assert_eq!(w.last_lines(3), "line3\nline4\nline5");
537        assert_eq!(w.last_lines(10), "line1\nline2\nline3\nline4\nline5");
538    }
539
540    #[test]
541    #[serial]
542    #[cfg_attr(not(feature = "integration"), ignore)]
543    fn idle_poll_consumes_non_empty_capture() {
544        let session = "batty-test-watcher-idle-poll";
545        let _ = crate::tmux::kill_session(session);
546
547        crate::tmux::create_session(
548            session,
549            "bash",
550            &[
551                "-lc".to_string(),
552                "printf 'watcher-idle-poll\\n'; sleep 3".to_string(),
553            ],
554            "/tmp",
555        )
556        .unwrap();
557        std::thread::sleep(std::time::Duration::from_millis(300));
558
559        let pane_id = crate::tmux::pane_id(session).unwrap();
560        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
561
562        assert_eq!(watcher.poll().unwrap(), WatcherState::Idle);
563        assert!(!watcher.last_output().is_empty());
564
565        crate::tmux::kill_session(session).unwrap();
566    }
567
568    #[test]
569    #[serial]
570    #[cfg_attr(not(feature = "integration"), ignore)]
571    fn active_poll_updates_state_when_capture_changes() {
572        let session = "batty-test-watcher-active-change";
573        let _ = crate::tmux::kill_session(session);
574
575        crate::tmux::create_session(
576            session,
577            "bash",
578            &[
579                "-lc".to_string(),
580                "printf 'watcher-active-change\\n'; sleep 3".to_string(),
581            ],
582            "/tmp",
583        )
584        .unwrap();
585        std::thread::sleep(std::time::Duration::from_millis(300));
586
587        let pane_id = crate::tmux::pane_id(session).unwrap();
588        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
589        watcher.state = WatcherState::Active;
590
591        assert_eq!(watcher.poll().unwrap(), WatcherState::Active);
592        assert_ne!(watcher.last_output_hash, 0);
593        assert!(!watcher.last_output().is_empty());
594
595        crate::tmux::kill_session(session).unwrap();
596    }
597
598    #[test]
599    #[serial]
600    #[cfg_attr(not(feature = "integration"), ignore)]
601    fn idle_poll_detects_context_exhaustion() {
602        let session = format!("batty-test-watcher-context-exhaust-{}", std::process::id());
603        let _ = crate::tmux::kill_session(&session);
604
605        crate::tmux::create_session(&session, "cat", &[], "/tmp").unwrap();
606        let pane_id = crate::tmux::pane_id(&session).unwrap();
607        std::thread::sleep(std::time::Duration::from_millis(100));
608        crate::tmux::send_keys(&pane_id, "Conversation is too long to continue.", true).unwrap();
609        std::thread::sleep(std::time::Duration::from_millis(150));
610
611        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
612
613        assert_eq!(watcher.poll().unwrap(), WatcherState::ContextExhausted);
614        assert!(watcher.last_output().contains("Conversation is too long"));
615
616        crate::tmux::kill_session(&session).unwrap();
617    }
618
619    #[test]
620    #[serial]
621    #[cfg_attr(not(feature = "integration"), ignore)]
622    fn active_poll_keeps_previous_state_when_capture_is_unchanged() {
623        let session = "batty-test-watcher-unchanged";
624        let _ = crate::tmux::kill_session(session);
625
626        crate::tmux::create_session(
627            session,
628            "bash",
629            &[
630                "-lc".to_string(),
631                "printf 'watcher-unchanged\\n'; sleep 3".to_string(),
632            ],
633            "/tmp",
634        )
635        .unwrap();
636        std::thread::sleep(std::time::Duration::from_millis(300));
637
638        let pane_id = crate::tmux::pane_id(session).unwrap();
639        let capture = crate::tmux::capture_pane(&pane_id).unwrap();
640        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 0, None);
641        watcher.state = WatcherState::Active;
642        watcher.last_capture = capture.clone();
643        watcher.last_output_hash = simple_hash(&capture);
644
645        assert_eq!(watcher.poll().unwrap(), WatcherState::Active);
646
647        crate::tmux::kill_session(session).unwrap();
648    }
649
650    #[test]
651    fn missing_pane_poll_reports_pane_dead() {
652        let mut watcher = SessionWatcher::new("%999999", "eng-1-1", 300, None);
653        assert_eq!(watcher.poll().unwrap(), WatcherState::PaneDead);
654    }
655
656    #[test]
657    #[serial]
658    #[cfg_attr(not(feature = "integration"), ignore)]
659    fn pane_dead_poll_reports_pane_dead() {
660        let session = format!("batty-test-watcher-pane-dead-{}", std::process::id());
661        let _ = crate::tmux::kill_session(&session);
662
663        crate::tmux::create_session(&session, "bash", &[], "/tmp").unwrap();
664        crate::tmux::create_window(&session, "keeper", "sleep", &["30".to_string()], "/tmp")
665            .unwrap();
666        let pane_id = crate::tmux::pane_id(&session).unwrap();
667        std::process::Command::new("tmux")
668            .args(["set-option", "-p", "-t", &pane_id, "remain-on-exit", "on"])
669            .output()
670            .unwrap();
671
672        crate::tmux::send_keys(&pane_id, "exit", true).unwrap();
673        for _ in 0..5 {
674            if crate::tmux::pane_dead(&pane_id).unwrap_or(false) {
675                break;
676            }
677            std::thread::sleep(std::time::Duration::from_millis(200));
678        }
679        assert!(crate::tmux::pane_dead(&pane_id).unwrap());
680
681        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
682        assert_eq!(watcher.poll().unwrap(), WatcherState::PaneDead);
683
684        crate::tmux::kill_session(&session).unwrap();
685    }
686
687    #[test]
688    fn watcher_exposes_codex_quality_signals() {
689        let mut watcher = SessionWatcher::new("%0", "eng-1-1", 300, None);
690        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
691            sessions_root: PathBuf::from("/tmp"),
692            cwd: PathBuf::from("/repo"),
693            session_id: None,
694            session_file: None,
695            offset: 0,
696            quality: CodexQualitySignals {
697                last_response_chars: Some(12),
698                assistant_message_count: 4,
699                tool_call_count: 2,
700                unique_tool_names: vec!["exec_command".to_string(), "apply_patch".to_string()],
701                shortening_streak: 2,
702                repeated_output_streak: 3,
703                shrinking_responses: true,
704                repeated_identical_outputs: true,
705                tool_failure_message: Some("exec_command failed".to_string()),
706            },
707            last_response_hash: Some(simple_hash("same response")),
708        }));
709
710        assert_eq!(
711            watcher.codex_quality_signals(),
712            Some(CodexQualitySignals {
713                last_response_chars: Some(12),
714                assistant_message_count: 4,
715                tool_call_count: 2,
716                unique_tool_names: vec!["exec_command".to_string(), "apply_patch".to_string()],
717                shortening_streak: 2,
718                repeated_output_streak: 3,
719                shrinking_responses: true,
720                repeated_identical_outputs: true,
721                tool_failure_message: Some("exec_command failed".to_string()),
722            })
723        );
724    }
725
726    #[test]
727    fn watcher_set_session_id_rebinds_codex_tracker() {
728        let mut watcher = SessionWatcher::new("%0", "eng-1", 300, None);
729        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
730            sessions_root: PathBuf::from("/tmp"),
731            cwd: PathBuf::from("/repo"),
732            session_id: Some("old-session".to_string()),
733            session_file: Some(PathBuf::from("/tmp/old-session.jsonl")),
734            offset: 42,
735            quality: CodexQualitySignals {
736                last_response_chars: Some(12),
737                assistant_message_count: 3,
738                tool_call_count: 1,
739                unique_tool_names: vec!["exec_command".to_string()],
740                shortening_streak: 1,
741                repeated_output_streak: 2,
742                shrinking_responses: true,
743                repeated_identical_outputs: true,
744                tool_failure_message: Some("failure".to_string()),
745            },
746            last_response_hash: Some(simple_hash("old")),
747        }));
748
749        watcher.set_session_id(Some("new-session".to_string()));
750
751        let Some(SessionTracker::Codex(codex)) = watcher.tracker.as_ref() else {
752            panic!("expected codex tracker");
753        };
754        assert_eq!(codex.session_id.as_deref(), Some("new-session"));
755        assert!(codex.session_file.is_none());
756        assert_eq!(codex.offset, 0);
757        assert_eq!(codex.quality, CodexQualitySignals::default());
758        assert!(codex.last_response_hash.is_none());
759    }
760
761    #[test]
762    fn watcher_exposes_tracker_session_id_from_bound_file() {
763        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
764        watcher.tracker = Some(SessionTracker::Claude(ClaudeSessionTracker {
765            projects_root: PathBuf::from("/tmp"),
766            cwd: PathBuf::from("/repo"),
767            session_id: Some("1e94dc68-6004-402a-9a7b-1bfca674806e".to_string()),
768            session_file: Some(PathBuf::from(
769                "/tmp/-Users-zedmor-project/1e94dc68-6004-402a-9a7b-1bfca674806e.jsonl",
770            )),
771            offset: 0,
772            last_state: TrackerState::Unknown,
773        }));
774
775        assert_eq!(
776            watcher.current_session_id().as_deref(),
777            Some("1e94dc68-6004-402a-9a7b-1bfca674806e")
778        );
779    }
780
781    #[test]
782    fn watcher_exposes_codex_payload_session_id_from_bound_file() {
783        let tmp = tempfile::tempdir().unwrap();
784        let session_file = tmp.path().join("rollout-2026-03-26T13-54-07-sample.jsonl");
785        fs::write(
786            &session_file,
787            "{\"type\":\"session_meta\",\"payload\":{\"id\":\"019d2b48-3d33-7613-bb3d-d0b4ecd45e2e\",\"cwd\":\"/repo/.batty/codex-context/architect\"}}\n",
788        )
789        .unwrap();
790
791        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
792        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
793            sessions_root: tmp.path().to_path_buf(),
794            cwd: PathBuf::from("/repo/.batty/codex-context/architect"),
795            session_id: Some("rollout-2026-03-26T13-54-07-sample".to_string()),
796            session_file: Some(session_file),
797            offset: 0,
798            quality: CodexQualitySignals::default(),
799            last_response_hash: None,
800        }));
801
802        assert_eq!(
803            watcher.current_session_id().as_deref(),
804            Some("019d2b48-3d33-7613-bb3d-d0b4ecd45e2e")
805        );
806    }
807
808    #[test]
809    fn refresh_session_tracking_binds_codex_tracker_without_pane_poll() {
810        let tmp = tempfile::tempdir().unwrap();
811        let sessions_root = tmp.path().join("sessions");
812        let session_dir = sessions_root.join("2026").join("03").join("27");
813        fs::create_dir_all(&session_dir).unwrap();
814        let cwd = PathBuf::from("/repo/.batty/codex-context/architect");
815        let session_file = session_dir.join("rollout-2026-03-27T15-04-13-sample.jsonl");
816        fs::write(
817            &session_file,
818            format!(
819                "{{\"type\":\"session_meta\",\"payload\":{{\"id\":\"019d30ae-c469-7e33-8c33-45dcdc85804c\",\"cwd\":\"{}\"}}}}\n",
820                cwd.display()
821            ),
822        )
823        .unwrap();
824
825        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
826        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
827            sessions_root,
828            cwd,
829            session_id: None,
830            session_file: None,
831            offset: 0,
832            quality: CodexQualitySignals::default(),
833            last_response_hash: None,
834        }));
835
836        watcher.refresh_session_tracking().unwrap();
837
838        assert_eq!(
839            watcher.current_session_id().as_deref(),
840            Some("019d30ae-c469-7e33-8c33-45dcdc85804c")
841        );
842    }
843
844    fn production_unwrap_expect_count(source: &str) -> usize {
845        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
846            &source[..pos]
847        } else {
848            source
849        };
850        prod.lines()
851            .filter(|line| {
852                let trimmed = line.trim();
853                !trimmed.starts_with("#[cfg(test)]")
854                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
855            })
856            .count()
857    }
858
859    #[test]
860    fn production_watcher_has_no_unwrap_or_expect_calls() {
861        // Check all submodule files as well as the main module.
862        let mod_src = include_str!("mod.rs");
863        assert_eq!(
864            production_unwrap_expect_count(mod_src),
865            0,
866            "production watcher/mod.rs should avoid unwrap/expect"
867        );
868        let screen_src = include_str!("screen.rs");
869        assert_eq!(
870            production_unwrap_expect_count(screen_src),
871            0,
872            "production watcher/screen.rs should avoid unwrap/expect"
873        );
874        let codex_src = include_str!("codex.rs");
875        assert_eq!(
876            production_unwrap_expect_count(codex_src),
877            0,
878            "production watcher/codex.rs should avoid unwrap/expect"
879        );
880        let claude_src = include_str!("claude.rs");
881        assert_eq!(
882            production_unwrap_expect_count(claude_src),
883            0,
884            "production watcher/claude.rs should avoid unwrap/expect"
885        );
886    }
887
888    #[test]
889    fn secs_since_last_output_change_starts_at_zero() {
890        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
891        // Freshly created watcher — elapsed should be near zero.
892        assert!(w.secs_since_last_output_change() < 2);
893    }
894
895    #[test]
896    fn activate_resets_last_output_changed_at() {
897        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
898        // Simulate time passing by backdating the field.
899        w.last_output_changed_at = Instant::now() - std::time::Duration::from_secs(600);
900        assert!(w.secs_since_last_output_change() >= 600);
901
902        w.activate();
903        assert!(w.secs_since_last_output_change() < 2);
904    }
905
906    // --- Readiness gate tests ---
907
908    #[test]
909    fn new_watcher_is_not_ready_for_delivery() {
910        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
911        assert!(!w.is_ready_for_delivery());
912        assert_eq!(w.state, WatcherState::Idle);
913    }
914
915    #[test]
916    fn confirm_ready_sets_ready_state() {
917        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
918        w.confirm_ready();
919        assert!(w.is_ready_for_delivery());
920        assert_eq!(w.state, WatcherState::Ready);
921    }
922
923    #[test]
924    fn activate_sets_ready_confirmed() {
925        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
926        assert!(!w.is_ready_for_delivery());
927        w.activate();
928        assert!(w.is_ready_for_delivery());
929        assert_eq!(w.state, WatcherState::Active);
930    }
931
932    #[test]
933    fn deactivate_preserves_readiness() {
934        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
935        w.activate();
936        assert!(w.is_ready_for_delivery());
937        w.deactivate();
938        assert!(w.is_ready_for_delivery());
939        assert_eq!(w.state, WatcherState::Idle);
940    }
941
942    #[test]
943    fn confirm_ready_on_already_idle_with_completion_does_not_override() {
944        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
945        w.activate();
946        w.deactivate();
947        w.confirm_ready();
948        assert_eq!(w.state, WatcherState::Idle);
949        assert!(w.is_ready_for_delivery());
950    }
951}