Skip to main content

batty_cli/team/watcher/
mod.rs

//! Agent session monitoring: polls agent output via tmux `capture-pane`.
//!
//! Detects agent completion, crashes, and staleness by periodically capturing
//! pane output and checking for state changes. For Codex and Claude, the
//! watcher also tails their on-disk session JSONL files to reduce
//! misclassifications caused by stale pane text.
7
8mod claude;
9mod codex;
10mod screen;
11
12use anyhow::Result;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Instant;
16
17use crate::tmux;
18
19pub(crate) use claude::discover_claude_session_file;
20pub use screen::is_at_agent_prompt;
21
22use claude::ClaudeSessionTracker;
23use codex::CodexSessionTracker;
24use screen::{classify_capture_state, detect_context_exhausted, next_state_after_capture};
25
/// Coarse lifecycle state that the watcher assigns to one agent pane.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WatcherState {
    /// The agent is busy producing output.
    Active,
    /// The agent CLI is up and showing its input prompt, so messages can be
    /// delivered. Sits between pane creation and the first `Idle` verdict.
    Ready,
    /// Nothing is running for the agent (waiting for an assignment).
    Idle,
    /// The tmux pane is gone, or the process inside it has exited.
    PaneDead,
    /// The agent said its conversation/session has grown too large to go on.
    ContextExhausted,
}
41
42pub struct SessionWatcher {
43    pub pane_id: String,
44    pub member_name: String,
45    pub state: WatcherState,
46    completion_observed: bool,
47    last_output_hash: u64,
48    last_capture: String,
49    /// Timestamp of the last time pane output changed (hash differed from previous poll).
50    last_output_changed_at: Instant,
51    tracker: Option<SessionTracker>,
52    /// Whether the agent prompt has been observed at least once since creation
53    /// or last `activate()`. False means the pane may not be ready for messages.
54    ready_confirmed: bool,
55}
56
/// Heuristic response-quality indicators gathered while tailing a Codex
/// session log. The default value means "nothing observed yet".
///
/// NOTE(review): the fields are computed in the `codex` submodule; the
/// descriptions below follow the field names and should be confirmed there.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct CodexQualitySignals {
    /// Size of the most recent response in characters, when one was seen.
    pub last_response_chars: Option<usize>,
    /// Consecutive responses that came out shorter than their predecessor.
    pub shortening_streak: u32,
    /// Consecutive responses identical to their predecessor.
    pub repeated_output_streak: u32,
    /// Whether responses are judged to be shrinking.
    pub shrinking_responses: bool,
    /// Whether identical outputs are judged to be repeating.
    pub repeated_identical_outputs: bool,
    /// Most recent tool-failure message, if any.
    pub tool_failure_message: Option<String>,
}
66
/// Chooses which agent's on-disk session log a watcher should follow.
#[derive(Clone, Debug)]
pub enum SessionTrackerConfig {
    /// Follow Codex sessions; `cwd` is used to discover the matching session file.
    Codex { cwd: PathBuf },
    /// Follow Claude sessions; `cwd` is used to discover the matching session file.
    Claude { cwd: PathBuf },
}
72
73enum SessionTracker {
74    Codex(CodexSessionTracker),
75    Claude(ClaudeSessionTracker),
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub(super) enum TrackerKind {
80    None,
81    Codex,
82    Claude,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub(super) enum TrackerState {
87    Active,
88    Idle,
89    Completed,
90    Unknown,
91}
92
93impl SessionWatcher {
94    pub fn new(
95        pane_id: &str,
96        member_name: &str,
97        _stale_secs: u64,
98        tracker: Option<SessionTrackerConfig>,
99    ) -> Self {
100        Self {
101            pane_id: pane_id.to_string(),
102            member_name: member_name.to_string(),
103            state: WatcherState::Idle,
104            completion_observed: false,
105            last_output_hash: 0,
106            last_capture: String::new(),
107            last_output_changed_at: Instant::now(),
108            ready_confirmed: false,
109            tracker: tracker.map(|tracker| match tracker {
110                SessionTrackerConfig::Codex { cwd } => SessionTracker::Codex(CodexSessionTracker {
111                    sessions_root: default_codex_sessions_root(),
112                    cwd,
113                    session_id: None,
114                    session_file: None,
115                    offset: 0,
116                    quality: CodexQualitySignals::default(),
117                    last_response_hash: None,
118                }),
119                SessionTrackerConfig::Claude { cwd } => {
120                    SessionTracker::Claude(ClaudeSessionTracker {
121                        projects_root: default_claude_projects_root(),
122                        cwd,
123                        session_id: None,
124                        session_file: None,
125                        offset: 0,
126                        last_state: TrackerState::Unknown,
127                    })
128                }
129            }),
130        }
131    }
132
133    /// Poll the pane and update state.
134    pub fn poll(&mut self) -> Result<WatcherState> {
135        // Check if pane still exists
136        if !tmux::pane_exists(&self.pane_id) {
137            self.state = WatcherState::PaneDead;
138            return Ok(self.state);
139        }
140
141        // Check if pane process died
142        if tmux::pane_dead(&self.pane_id).unwrap_or(false) {
143            self.state = WatcherState::PaneDead;
144            return Ok(self.state);
145        }
146
147        // If idle or ready, peek at the pane to detect if the agent started working.
148        // This lets the watcher self-heal without requiring explicit activation
149        // from the daemon whenever a nudge, standup, or external input arrives.
150        if matches!(self.state, WatcherState::Idle | WatcherState::Ready) {
151            let capture = match tmux::capture_pane(&self.pane_id) {
152                Ok(capture) => capture,
153                Err(_) => {
154                    self.state = WatcherState::PaneDead;
155                    return Ok(self.state);
156                }
157            };
158            if detect_context_exhausted(&capture) {
159                self.last_capture = capture;
160                self.state = WatcherState::ContextExhausted;
161                return Ok(self.state);
162            }
163            let screen_state = classify_capture_state(&capture);
164            // When we first see the agent prompt, confirm readiness.
165            if screen_state == screen::ScreenState::Idle && !self.ready_confirmed {
166                self.ready_confirmed = true;
167                self.last_capture = capture;
168                self.state = WatcherState::Ready;
169                return Ok(self.state);
170            }
171            let tracker_state = self.poll_tracker().unwrap_or(TrackerState::Unknown);
172            self.completion_observed = tracker_state == TrackerState::Completed;
173            let tracker_kind = self.tracker_kind();
174            if !capture.is_empty() {
175                self.last_capture = capture;
176                let next_state =
177                    next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
178                if next_state != WatcherState::Idle || self.ready_confirmed {
179                    self.last_output_hash = simple_hash(&self.last_capture);
180                    self.last_output_changed_at = Instant::now();
181                    self.state = next_state;
182                }
183            }
184            return Ok(self.state);
185        }
186
187        // Capture current pane content
188        let capture = match tmux::capture_pane(&self.pane_id) {
189            Ok(capture) => capture,
190            Err(_) => {
191                self.state = WatcherState::PaneDead;
192                return Ok(self.state);
193            }
194        };
195        if detect_context_exhausted(&capture) {
196            self.last_capture = capture;
197            self.state = WatcherState::ContextExhausted;
198            return Ok(self.state);
199        }
200        let hash = simple_hash(&capture);
201        let screen_state = classify_capture_state(&capture);
202        let tracker_state = self.poll_tracker().unwrap_or(TrackerState::Unknown);
203        self.completion_observed = tracker_state == TrackerState::Completed;
204        let tracker_kind = self.tracker_kind();
205
206        if hash != self.last_output_hash {
207            self.last_output_hash = hash;
208            self.last_output_changed_at = Instant::now();
209            self.last_capture = capture;
210            self.state =
211                next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
212        } else {
213            self.last_capture = capture;
214            self.state =
215                next_state_after_capture(tracker_kind, screen_state, tracker_state, self.state);
216        }
217
218        Ok(self.state)
219    }
220
221    /// Whether this agent's pane has been confirmed ready for message delivery.
222    ///
223    /// Returns `true` once the agent prompt has been observed at least once since
224    /// the watcher was created or last activated. This prevents injecting messages
225    /// into panes where the agent CLI hasn't finished starting.
226    pub fn is_ready_for_delivery(&self) -> bool {
227        self.ready_confirmed
228    }
229
230    /// Externally confirm that the agent pane is ready (e.g. from a delivery
231    /// readiness check that observed the prompt). Updates state to Ready if it
232    /// was still in the initial Idle state before first readiness confirmation.
233    pub fn confirm_ready(&mut self) {
234        let was_unconfirmed = !self.ready_confirmed;
235        self.ready_confirmed = true;
236        if was_unconfirmed && self.state == WatcherState::Idle {
237            self.state = WatcherState::Ready;
238        }
239    }
240
241    /// Mark this watcher as actively working.
242    pub fn activate(&mut self) {
243        self.state = WatcherState::Active;
244        self.completion_observed = false;
245        self.last_output_hash = 0;
246        self.last_output_changed_at = Instant::now();
247        // A message was just injected so the pane was confirmed ready.
248        self.ready_confirmed = true;
249        if let Some(tracker) = self.tracker.as_mut() {
250            match tracker {
251                SessionTracker::Codex(codex) => {
252                    codex.session_file = None;
253                    codex.offset = 0;
254                    codex.quality = CodexQualitySignals::default();
255                    codex.last_response_hash = None;
256                }
257                SessionTracker::Claude(claude) => {
258                    claude.session_file = None;
259                    claude.offset = 0;
260                    claude.last_state = TrackerState::Unknown;
261                }
262            }
263        }
264    }
265
266    pub fn set_session_id(&mut self, session_id: Option<String>) {
267        if let Some(tracker) = self.tracker.as_mut() {
268            match tracker {
269                SessionTracker::Codex(codex) => {
270                    if codex.session_id == session_id {
271                        return;
272                    }
273                    self.completion_observed = false;
274                    codex.session_id = session_id;
275                    codex.session_file = None;
276                    codex.offset = 0;
277                    codex.quality = CodexQualitySignals::default();
278                    codex.last_response_hash = None;
279                }
280                SessionTracker::Claude(claude) => {
281                    if claude.session_id == session_id {
282                        return;
283                    }
284                    self.completion_observed = false;
285                    claude.session_id = session_id;
286                    claude.session_file = None;
287                    claude.offset = 0;
288                    claude.last_state = TrackerState::Unknown;
289                }
290            }
291        }
292    }
293
294    /// Mark this watcher as idle.
295    pub fn deactivate(&mut self) {
296        self.state = WatcherState::Idle;
297        self.completion_observed = false;
298    }
299
300    /// Seconds since the last time pane output changed.
301    pub fn secs_since_last_output_change(&self) -> u64 {
302        self.last_output_changed_at.elapsed().as_secs()
303    }
304
305    /// Get the last captured pane output.
306    pub fn last_output(&self) -> &str {
307        &self.last_capture
308    }
309
310    /// Get the last N lines of captured output.
311    pub fn last_lines(&self, n: usize) -> String {
312        let lines: Vec<&str> = self.last_capture.lines().collect();
313        let start = lines.len().saturating_sub(n);
314        lines[start..].join("\n")
315    }
316
317    pub fn current_session_id(&self) -> Option<String> {
318        match self.tracker.as_ref() {
319            Some(SessionTracker::Codex(codex)) => codex
320                .session_file
321                .as_ref()
322                .and_then(|path| codex::codex_session_resume_id(path).ok().flatten())
323                .or_else(|| codex.session_id.clone()),
324            Some(SessionTracker::Claude(claude)) => session_file_id(claude.session_file.as_ref()),
325            None => None,
326        }
327    }
328
329    pub fn configured_session_id(&self) -> Option<String> {
330        match self.tracker.as_ref() {
331            Some(SessionTracker::Codex(codex)) => codex.session_id.clone(),
332            Some(SessionTracker::Claude(claude)) => claude.session_id.clone(),
333            None => None,
334        }
335    }
336
337    pub fn current_session_size_bytes(&self) -> Option<u64> {
338        let path = match self.tracker.as_ref() {
339            Some(SessionTracker::Codex(codex)) => codex.session_file.as_ref(),
340            Some(SessionTracker::Claude(claude)) => claude.session_file.as_ref(),
341            None => None,
342        }?;
343        fs::metadata(path).ok().map(|metadata| metadata.len())
344    }
345
346    pub fn codex_quality_signals(&self) -> Option<CodexQualitySignals> {
347        match self.tracker.as_ref() {
348            Some(SessionTracker::Codex(codex)) => Some(codex.quality.clone()),
349            _ => None,
350        }
351    }
352
353    pub fn take_completion_event(&mut self) -> bool {
354        let observed = self.completion_observed;
355        self.completion_observed = false;
356        observed
357    }
358
359    pub fn refresh_session_tracking(&mut self) -> Result<()> {
360        let _ = self.poll_tracker()?;
361        Ok(())
362    }
363
364    fn poll_tracker(&mut self) -> Result<TrackerState> {
365        let current_state = self.state;
366        let Some(tracker) = self.tracker.as_mut() else {
367            return Ok(TrackerState::Unknown);
368        };
369
370        match tracker {
371            SessionTracker::Codex(codex) => {
372                if codex.session_file.is_none() {
373                    codex.session_file = codex::discover_codex_session_file(
374                        &codex.sessions_root,
375                        &codex.cwd,
376                        codex.session_id.as_deref(),
377                    )?;
378                    if let Some(session_file) = codex.session_file.as_ref() {
379                        codex.session_id = codex::codex_session_resume_id(session_file)?;
380                        codex.offset = current_file_len(session_file)?;
381                    }
382                    codex.quality = CodexQualitySignals::default();
383                    codex.last_response_hash = None;
384                    return Ok(TrackerState::Unknown);
385                }
386
387                let Some(session_file) = codex.session_file.clone() else {
388                    return Ok(TrackerState::Unknown);
389                };
390
391                if !session_file.exists() {
392                    codex.session_file = None;
393                    codex.offset = 0;
394                    codex.quality = CodexQualitySignals::default();
395                    codex.last_response_hash = None;
396                    return Ok(TrackerState::Unknown);
397                }
398
399                let state = codex::poll_codex_session_file(
400                    &session_file,
401                    &mut codex.offset,
402                    &mut codex.quality,
403                    &mut codex.last_response_hash,
404                )?;
405
406                // When idle with no new events, check if a newer session file
407                // appeared (agent started a new task). Re-discover so the
408                // tracker picks up the latest session.
409                if state == TrackerState::Unknown
410                    && matches!(current_state, WatcherState::Idle | WatcherState::Ready)
411                {
412                    if let Some(latest) = codex::discover_codex_session_file(
413                        &codex.sessions_root,
414                        &codex.cwd,
415                        codex.session_id.as_deref(),
416                    )? {
417                        if latest != session_file {
418                            codex.session_file = Some(latest.clone());
419                            codex.session_id = codex::codex_session_resume_id(&latest)?;
420                            codex.offset = 0;
421                            codex.quality = CodexQualitySignals::default();
422                            codex.last_response_hash = None;
423                            return codex::poll_codex_session_file(
424                                &latest,
425                                &mut codex.offset,
426                                &mut codex.quality,
427                                &mut codex.last_response_hash,
428                            );
429                        }
430                    }
431                }
432
433                Ok(state)
434            }
435            SessionTracker::Claude(claude) => claude::poll_claude_session(claude),
436        }
437    }
438
439    fn tracker_kind(&self) -> TrackerKind {
440        match self.tracker {
441            Some(SessionTracker::Codex(_)) => TrackerKind::Codex,
442            Some(SessionTracker::Claude(_)) => TrackerKind::Claude,
443            None => TrackerKind::None,
444        }
445    }
446}
447
448// --- Shared utility functions used by submodules ---
449
450pub(super) fn simple_hash(s: &str) -> u64 {
451    // FNV-1a style hash, good enough for change detection
452    let mut hash: u64 = 0xcbf29ce484222325;
453    for byte in s.bytes() {
454        hash ^= byte as u64;
455        hash = hash.wrapping_mul(0x100000001b3);
456    }
457    hash
458}
459
/// Default root of Codex session logs: `<home>/.codex/sessions`.
fn default_codex_sessions_root() -> PathBuf {
    home_dir_or_root(std::env::var_os("HOME"))
        .join(".codex")
        .join("sessions")
}

/// Default root of Claude project logs: `<home>/.claude/projects`.
fn default_claude_projects_root() -> PathBuf {
    home_dir_or_root(std::env::var_os("HOME"))
        .join(".claude")
        .join("projects")
}

/// Turn a raw `HOME` value into a home directory, falling back to `/`.
///
/// An empty value is treated the same as an unset one. Otherwise
/// `PathBuf::from("")` would make the derived log roots relative to the
/// process's current working directory.
fn home_dir_or_root(home: Option<std::ffi::OsString>) -> PathBuf {
    home.filter(|value| !value.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("/"))
}
475
476pub(super) fn current_file_len(path: &Path) -> Result<u64> {
477    Ok(fs::metadata(path)?.len())
478}
479
480pub(super) fn session_file_id(path: Option<&PathBuf>) -> Option<String> {
481    path.and_then(|path| {
482        path.file_stem()
483            .and_then(|stem| stem.to_str())
484            .map(|stem| stem.to_string())
485    })
486}
487
488pub(super) fn read_dir_paths(dir: &Path) -> Result<Vec<PathBuf>> {
489    let mut paths = Vec::new();
490    for entry in fs::read_dir(dir)? {
491        let entry = entry?;
492        paths.push(entry.path());
493    }
494    Ok(paths)
495}
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500    use serial_test::serial;
501
502    #[test]
503    fn simple_hash_differs_for_different_input() {
504        assert_ne!(simple_hash("hello"), simple_hash("world"));
505        assert_eq!(simple_hash("same"), simple_hash("same"));
506    }
507
508    #[test]
509    fn new_watcher_starts_idle() {
510        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
511        assert_eq!(w.state, WatcherState::Idle);
512    }
513
514    #[test]
515    fn activate_sets_active() {
516        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
517        w.activate();
518        assert_eq!(w.state, WatcherState::Active);
519    }
520
521    #[test]
522    fn deactivate_sets_idle() {
523        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
524        w.activate();
525        w.deactivate();
526        assert_eq!(w.state, WatcherState::Idle);
527    }
528
529    #[test]
530    fn last_lines_returns_tail() {
531        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
532        w.last_capture = "line1\nline2\nline3\nline4\nline5".to_string();
533        assert_eq!(w.last_lines(3), "line3\nline4\nline5");
534        assert_eq!(w.last_lines(10), "line1\nline2\nline3\nline4\nline5");
535    }
536
537    #[test]
538    #[serial]
539    #[cfg_attr(not(feature = "integration"), ignore)]
540    fn idle_poll_consumes_non_empty_capture() {
541        let session = "batty-test-watcher-idle-poll";
542        let _ = crate::tmux::kill_session(session);
543
544        crate::tmux::create_session(
545            session,
546            "bash",
547            &[
548                "-lc".to_string(),
549                "printf 'watcher-idle-poll\\n'; sleep 3".to_string(),
550            ],
551            "/tmp",
552        )
553        .unwrap();
554        std::thread::sleep(std::time::Duration::from_millis(300));
555
556        let pane_id = crate::tmux::pane_id(session).unwrap();
557        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
558
559        assert_eq!(watcher.poll().unwrap(), WatcherState::Idle);
560        assert!(!watcher.last_output().is_empty());
561
562        crate::tmux::kill_session(session).unwrap();
563    }
564
565    #[test]
566    #[serial]
567    #[cfg_attr(not(feature = "integration"), ignore)]
568    fn active_poll_updates_state_when_capture_changes() {
569        let session = "batty-test-watcher-active-change";
570        let _ = crate::tmux::kill_session(session);
571
572        crate::tmux::create_session(
573            session,
574            "bash",
575            &[
576                "-lc".to_string(),
577                "printf 'watcher-active-change\\n'; sleep 3".to_string(),
578            ],
579            "/tmp",
580        )
581        .unwrap();
582        std::thread::sleep(std::time::Duration::from_millis(300));
583
584        let pane_id = crate::tmux::pane_id(session).unwrap();
585        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
586        watcher.state = WatcherState::Active;
587
588        assert_eq!(watcher.poll().unwrap(), WatcherState::Active);
589        assert_ne!(watcher.last_output_hash, 0);
590        assert!(!watcher.last_output().is_empty());
591
592        crate::tmux::kill_session(session).unwrap();
593    }
594
595    #[test]
596    #[serial]
597    #[cfg_attr(not(feature = "integration"), ignore)]
598    fn idle_poll_detects_context_exhaustion() {
599        let session = format!("batty-test-watcher-context-exhaust-{}", std::process::id());
600        let _ = crate::tmux::kill_session(&session);
601
602        crate::tmux::create_session(&session, "cat", &[], "/tmp").unwrap();
603        let pane_id = crate::tmux::pane_id(&session).unwrap();
604        std::thread::sleep(std::time::Duration::from_millis(100));
605        crate::tmux::send_keys(&pane_id, "Conversation is too long to continue.", true).unwrap();
606        std::thread::sleep(std::time::Duration::from_millis(150));
607
608        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
609
610        assert_eq!(watcher.poll().unwrap(), WatcherState::ContextExhausted);
611        assert!(watcher.last_output().contains("Conversation is too long"));
612
613        crate::tmux::kill_session(&session).unwrap();
614    }
615
616    #[test]
617    #[serial]
618    #[cfg_attr(not(feature = "integration"), ignore)]
619    fn active_poll_keeps_previous_state_when_capture_is_unchanged() {
620        let session = "batty-test-watcher-unchanged";
621        let _ = crate::tmux::kill_session(session);
622
623        crate::tmux::create_session(
624            session,
625            "bash",
626            &[
627                "-lc".to_string(),
628                "printf 'watcher-unchanged\\n'; sleep 3".to_string(),
629            ],
630            "/tmp",
631        )
632        .unwrap();
633        std::thread::sleep(std::time::Duration::from_millis(300));
634
635        let pane_id = crate::tmux::pane_id(session).unwrap();
636        let capture = crate::tmux::capture_pane(&pane_id).unwrap();
637        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 0, None);
638        watcher.state = WatcherState::Active;
639        watcher.last_capture = capture.clone();
640        watcher.last_output_hash = simple_hash(&capture);
641
642        assert_eq!(watcher.poll().unwrap(), WatcherState::Active);
643
644        crate::tmux::kill_session(session).unwrap();
645    }
646
647    #[test]
648    fn missing_pane_poll_reports_pane_dead() {
649        let mut watcher = SessionWatcher::new("%999999", "eng-1-1", 300, None);
650        assert_eq!(watcher.poll().unwrap(), WatcherState::PaneDead);
651    }
652
653    #[test]
654    #[serial]
655    #[cfg_attr(not(feature = "integration"), ignore)]
656    fn pane_dead_poll_reports_pane_dead() {
657        let session = format!("batty-test-watcher-pane-dead-{}", std::process::id());
658        let _ = crate::tmux::kill_session(&session);
659
660        crate::tmux::create_session(&session, "bash", &[], "/tmp").unwrap();
661        crate::tmux::create_window(&session, "keeper", "sleep", &["30".to_string()], "/tmp")
662            .unwrap();
663        let pane_id = crate::tmux::pane_id(&session).unwrap();
664        std::process::Command::new("tmux")
665            .args(["set-option", "-p", "-t", &pane_id, "remain-on-exit", "on"])
666            .output()
667            .unwrap();
668
669        crate::tmux::send_keys(&pane_id, "exit", true).unwrap();
670        for _ in 0..5 {
671            if crate::tmux::pane_dead(&pane_id).unwrap_or(false) {
672                break;
673            }
674            std::thread::sleep(std::time::Duration::from_millis(200));
675        }
676        assert!(crate::tmux::pane_dead(&pane_id).unwrap());
677
678        let mut watcher = SessionWatcher::new(&pane_id, "eng-1-1", 300, None);
679        assert_eq!(watcher.poll().unwrap(), WatcherState::PaneDead);
680
681        crate::tmux::kill_session(&session).unwrap();
682    }
683
684    #[test]
685    fn watcher_exposes_codex_quality_signals() {
686        let mut watcher = SessionWatcher::new("%0", "eng-1-1", 300, None);
687        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
688            sessions_root: PathBuf::from("/tmp"),
689            cwd: PathBuf::from("/repo"),
690            session_id: None,
691            session_file: None,
692            offset: 0,
693            quality: CodexQualitySignals {
694                last_response_chars: Some(12),
695                shortening_streak: 2,
696                repeated_output_streak: 3,
697                shrinking_responses: true,
698                repeated_identical_outputs: true,
699                tool_failure_message: Some("exec_command failed".to_string()),
700            },
701            last_response_hash: Some(simple_hash("same response")),
702        }));
703
704        assert_eq!(
705            watcher.codex_quality_signals(),
706            Some(CodexQualitySignals {
707                last_response_chars: Some(12),
708                shortening_streak: 2,
709                repeated_output_streak: 3,
710                shrinking_responses: true,
711                repeated_identical_outputs: true,
712                tool_failure_message: Some("exec_command failed".to_string()),
713            })
714        );
715    }
716
717    #[test]
718    fn watcher_set_session_id_rebinds_codex_tracker() {
719        let mut watcher = SessionWatcher::new("%0", "eng-1", 300, None);
720        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
721            sessions_root: PathBuf::from("/tmp"),
722            cwd: PathBuf::from("/repo"),
723            session_id: Some("old-session".to_string()),
724            session_file: Some(PathBuf::from("/tmp/old-session.jsonl")),
725            offset: 42,
726            quality: CodexQualitySignals {
727                last_response_chars: Some(12),
728                shortening_streak: 1,
729                repeated_output_streak: 2,
730                shrinking_responses: true,
731                repeated_identical_outputs: true,
732                tool_failure_message: Some("failure".to_string()),
733            },
734            last_response_hash: Some(simple_hash("old")),
735        }));
736
737        watcher.set_session_id(Some("new-session".to_string()));
738
739        let Some(SessionTracker::Codex(codex)) = watcher.tracker.as_ref() else {
740            panic!("expected codex tracker");
741        };
742        assert_eq!(codex.session_id.as_deref(), Some("new-session"));
743        assert!(codex.session_file.is_none());
744        assert_eq!(codex.offset, 0);
745        assert_eq!(codex.quality, CodexQualitySignals::default());
746        assert!(codex.last_response_hash.is_none());
747    }
748
749    #[test]
750    fn watcher_exposes_tracker_session_id_from_bound_file() {
751        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
752        watcher.tracker = Some(SessionTracker::Claude(ClaudeSessionTracker {
753            projects_root: PathBuf::from("/tmp"),
754            cwd: PathBuf::from("/repo"),
755            session_id: Some("1e94dc68-6004-402a-9a7b-1bfca674806e".to_string()),
756            session_file: Some(PathBuf::from(
757                "/tmp/-Users-zedmor-project/1e94dc68-6004-402a-9a7b-1bfca674806e.jsonl",
758            )),
759            offset: 0,
760            last_state: TrackerState::Unknown,
761        }));
762
763        assert_eq!(
764            watcher.current_session_id().as_deref(),
765            Some("1e94dc68-6004-402a-9a7b-1bfca674806e")
766        );
767    }
768
769    #[test]
770    fn watcher_exposes_codex_payload_session_id_from_bound_file() {
771        let tmp = tempfile::tempdir().unwrap();
772        let session_file = tmp.path().join("rollout-2026-03-26T13-54-07-sample.jsonl");
773        fs::write(
774            &session_file,
775            "{\"type\":\"session_meta\",\"payload\":{\"id\":\"019d2b48-3d33-7613-bb3d-d0b4ecd45e2e\",\"cwd\":\"/repo/.batty/codex-context/architect\"}}\n",
776        )
777        .unwrap();
778
779        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
780        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
781            sessions_root: tmp.path().to_path_buf(),
782            cwd: PathBuf::from("/repo/.batty/codex-context/architect"),
783            session_id: Some("rollout-2026-03-26T13-54-07-sample".to_string()),
784            session_file: Some(session_file),
785            offset: 0,
786            quality: CodexQualitySignals::default(),
787            last_response_hash: None,
788        }));
789
790        assert_eq!(
791            watcher.current_session_id().as_deref(),
792            Some("019d2b48-3d33-7613-bb3d-d0b4ecd45e2e")
793        );
794    }
795
796    #[test]
797    fn refresh_session_tracking_binds_codex_tracker_without_pane_poll() {
798        let tmp = tempfile::tempdir().unwrap();
799        let sessions_root = tmp.path().join("sessions");
800        let session_dir = sessions_root.join("2026").join("03").join("27");
801        fs::create_dir_all(&session_dir).unwrap();
802        let cwd = PathBuf::from("/repo/.batty/codex-context/architect");
803        let session_file = session_dir.join("rollout-2026-03-27T15-04-13-sample.jsonl");
804        fs::write(
805            &session_file,
806            format!(
807                "{{\"type\":\"session_meta\",\"payload\":{{\"id\":\"019d30ae-c469-7e33-8c33-45dcdc85804c\",\"cwd\":\"{}\"}}}}\n",
808                cwd.display()
809            ),
810        )
811        .unwrap();
812
813        let mut watcher = SessionWatcher::new("%0", "architect", 300, None);
814        watcher.tracker = Some(SessionTracker::Codex(CodexSessionTracker {
815            sessions_root,
816            cwd,
817            session_id: None,
818            session_file: None,
819            offset: 0,
820            quality: CodexQualitySignals::default(),
821            last_response_hash: None,
822        }));
823
824        watcher.refresh_session_tracking().unwrap();
825
826        assert_eq!(
827            watcher.current_session_id().as_deref(),
828            Some("019d30ae-c469-7e33-8c33-45dcdc85804c")
829        );
830    }
831
832    fn production_unwrap_expect_count(source: &str) -> usize {
833        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
834            &source[..pos]
835        } else {
836            source
837        };
838        prod.lines()
839            .filter(|line| {
840                let trimmed = line.trim();
841                !trimmed.starts_with("#[cfg(test)]")
842                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
843            })
844            .count()
845    }
846
847    #[test]
848    fn production_watcher_has_no_unwrap_or_expect_calls() {
849        // Check all submodule files as well as the main module.
850        let mod_src = include_str!("mod.rs");
851        assert_eq!(
852            production_unwrap_expect_count(mod_src),
853            0,
854            "production watcher/mod.rs should avoid unwrap/expect"
855        );
856        let screen_src = include_str!("screen.rs");
857        assert_eq!(
858            production_unwrap_expect_count(screen_src),
859            0,
860            "production watcher/screen.rs should avoid unwrap/expect"
861        );
862        let codex_src = include_str!("codex.rs");
863        assert_eq!(
864            production_unwrap_expect_count(codex_src),
865            0,
866            "production watcher/codex.rs should avoid unwrap/expect"
867        );
868        let claude_src = include_str!("claude.rs");
869        assert_eq!(
870            production_unwrap_expect_count(claude_src),
871            0,
872            "production watcher/claude.rs should avoid unwrap/expect"
873        );
874    }
875
876    #[test]
877    fn secs_since_last_output_change_starts_at_zero() {
878        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
879        // Freshly created watcher — elapsed should be near zero.
880        assert!(w.secs_since_last_output_change() < 2);
881    }
882
883    #[test]
884    fn activate_resets_last_output_changed_at() {
885        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
886        // Simulate time passing by backdating the field.
887        w.last_output_changed_at = Instant::now() - std::time::Duration::from_secs(600);
888        assert!(w.secs_since_last_output_change() >= 600);
889
890        w.activate();
891        assert!(w.secs_since_last_output_change() < 2);
892    }
893
894    // --- Readiness gate tests ---
895
896    #[test]
897    fn new_watcher_is_not_ready_for_delivery() {
898        let w = SessionWatcher::new("%0", "eng-1-1", 300, None);
899        assert!(!w.is_ready_for_delivery());
900        assert_eq!(w.state, WatcherState::Idle);
901    }
902
903    #[test]
904    fn confirm_ready_sets_ready_state() {
905        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
906        w.confirm_ready();
907        assert!(w.is_ready_for_delivery());
908        assert_eq!(w.state, WatcherState::Ready);
909    }
910
911    #[test]
912    fn activate_sets_ready_confirmed() {
913        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
914        assert!(!w.is_ready_for_delivery());
915        w.activate();
916        assert!(w.is_ready_for_delivery());
917        assert_eq!(w.state, WatcherState::Active);
918    }
919
920    #[test]
921    fn deactivate_preserves_readiness() {
922        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
923        w.activate();
924        assert!(w.is_ready_for_delivery());
925        w.deactivate();
926        assert!(w.is_ready_for_delivery());
927        assert_eq!(w.state, WatcherState::Idle);
928    }
929
930    #[test]
931    fn confirm_ready_on_already_idle_with_completion_does_not_override() {
932        let mut w = SessionWatcher::new("%0", "eng-1-1", 300, None);
933        w.activate();
934        w.deactivate();
935        w.confirm_ready();
936        assert_eq!(w.state, WatcherState::Idle);
937        assert!(w.is_ready_for_delivery());
938    }
939}