Skip to main content

batty_cli/
console_pane.rs

1use std::fs;
2use std::io::{self, Read, Write};
3use std::os::fd::AsRawFd;
4use std::path::{Path, PathBuf};
5use std::process::Command;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result};
9use chrono::{Local, TimeZone};
10
/// Pause between iterations of the pane's main loop, and the minimum interval
/// between two refresh checks of the displayed file.
const TICK: Duration = Duration::from_millis(120);
/// Number of trailing event-log lines shown in the log view.
const MAX_LOG_LINES: usize = 200;
13
/// What the pane is currently displaying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PaneMode {
    /// Tail of the events log, preceded by the banner line.
    Log,
    /// Terminal screen reconstructed from the recorded PTY log.
    Screen,
    /// Single-line editor for composing a message to the member.
    Compose,
}
20
21pub fn run(
22    project_root: &Path,
23    member: &str,
24    events_log_path: &Path,
25    pty_log_path: &Path,
26) -> Result<()> {
27    let _raw = RawTerminal::new()?;
28    let mut stdout = io::stdout().lock();
29    let stdout_fd = stdout.as_raw_fd();
30    set_nonblocking(stdout_fd, false)?;
31    let mut stdin = io::stdin().lock();
32    set_nonblocking(stdin.as_raw_fd(), true)?;
33
34    let mut state = PaneState::new(
35        project_root.to_path_buf(),
36        member.to_string(),
37        events_log_path.to_path_buf(),
38        pty_log_path.to_path_buf(),
39    );
40    state.redraw(&mut stdout, stdout_fd)?;
41
42    loop {
43        let mut buf = [0u8; 64];
44        match stdin.read(&mut buf) {
45            Ok(0) => {}
46            Ok(n) => {
47                for byte in &buf[..n] {
48                    if state.handle_key(*byte, &mut stdout, stdout_fd)? {
49                        return Ok(());
50                    }
51                }
52            }
53            Err(error) if is_transient_io_error(&error) => {}
54            Err(error) => return Err(error).context("failed to read pane input"),
55        }
56
57        if state.tick_due() {
58            state.refresh(&mut stdout, stdout_fd)?;
59        }
60
61        std::thread::sleep(TICK);
62    }
63}
64
65fn is_transient_io_error(error: &io::Error) -> bool {
66    matches!(
67        error.kind(),
68        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted | io::ErrorKind::TimedOut
69    ) || error.raw_os_error() == Some(libc::EAGAIN)
70}
71
/// Mutable state of one interactive pane.
struct PaneState {
    /// Working directory for the `send` subprocess and root for the team config lookup.
    project_root: PathBuf,
    /// Name of the member this pane shows and sends messages to.
    member: String,
    /// File whose tail is shown in log mode.
    events_log_path: PathBuf,
    /// Recorded PTY output replayed in screen mode.
    pty_log_path: PathBuf,
    /// View currently displayed.
    mode: PaneMode,
    /// Text typed so far in compose mode.
    compose: String,
    /// Modification time of the PTY log recorded at the last screen render.
    last_screen_mtime: Option<std::time::SystemTime>,
    /// Modification time of the events log recorded at the last log render.
    last_log_mtime: Option<std::time::SystemTime>,
    /// When `tick_due` last reported a due tick (initially the construction time).
    last_tick: Instant,
}
83
84impl PaneState {
85    fn new(
86        project_root: PathBuf,
87        member: String,
88        events_log_path: PathBuf,
89        pty_log_path: PathBuf,
90    ) -> Self {
91        Self {
92            project_root,
93            member,
94            events_log_path,
95            pty_log_path,
96            mode: PaneMode::Log,
97            compose: String::new(),
98            last_screen_mtime: None,
99            last_log_mtime: None,
100            last_tick: Instant::now(),
101        }
102    }
103
104    fn tick_due(&mut self) -> bool {
105        if self.last_tick.elapsed() >= TICK {
106            self.last_tick = Instant::now();
107            true
108        } else {
109            false
110        }
111    }
112
113    fn handle_key(&mut self, byte: u8, stdout: &mut impl Write, stdout_fd: i32) -> Result<bool> {
114        match self.mode {
115            PaneMode::Compose => self.handle_compose_key(byte, stdout, stdout_fd),
116            PaneMode::Log | PaneMode::Screen => match byte {
117                b'l' | b'L' => {
118                    self.mode = PaneMode::Log;
119                    self.redraw(stdout, stdout_fd)?;
120                    Ok(false)
121                }
122                b's' | b'S' => {
123                    self.mode = PaneMode::Screen;
124                    self.redraw(stdout, stdout_fd)?;
125                    Ok(false)
126                }
127                b'm' | b'M' => {
128                    self.mode = PaneMode::Compose;
129                    self.compose.clear();
130                    self.redraw(stdout, stdout_fd)?;
131                    Ok(false)
132                }
133                3 | b'q' | b'Q' => Ok(true),
134                _ => Ok(false),
135            },
136        }
137    }
138
139    fn handle_compose_key(
140        &mut self,
141        byte: u8,
142        stdout: &mut impl Write,
143        stdout_fd: i32,
144    ) -> Result<bool> {
145        match byte {
146            27 => {
147                self.mode = PaneMode::Log;
148                self.compose.clear();
149                self.redraw(stdout, stdout_fd)?;
150            }
151            b'\r' | b'\n' => {
152                let message = self.compose.trim().to_string();
153                self.mode = PaneMode::Log;
154                self.compose.clear();
155                self.redraw(stdout, stdout_fd)?;
156                if !message.is_empty() {
157                    self.send_message(&message, stdout)?;
158                }
159            }
160            127 | 8 => {
161                self.compose.pop();
162                self.redraw(stdout, stdout_fd)?;
163            }
164            byte if (32..=126).contains(&byte) => {
165                self.compose.push(byte as char);
166                self.redraw(stdout, stdout_fd)?;
167            }
168            _ => {}
169        }
170        Ok(false)
171    }
172
173    fn refresh(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
174        match self.mode {
175            PaneMode::Log => self.refresh_log(stdout),
176            PaneMode::Screen => self.refresh_screen(stdout, stdout_fd),
177            PaneMode::Compose => Ok(()),
178        }
179    }
180
181    fn redraw(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
182        match self.mode {
183            PaneMode::Log => self.render_log(stdout),
184            PaneMode::Screen => self.render_screen(stdout, stdout_fd),
185            PaneMode::Compose => self.render_compose(stdout),
186        }
187    }
188
189    fn refresh_log(&mut self, stdout: &mut impl Write) -> Result<()> {
190        let modified = file_modified(&self.events_log_path);
191        if modified == self.last_log_mtime {
192            return Ok(());
193        }
194        self.render_log(stdout)
195    }
196
197    fn refresh_screen(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
198        let modified = file_modified(&self.pty_log_path);
199        if modified == self.last_screen_mtime {
200            return Ok(());
201        }
202        self.render_screen(stdout, stdout_fd)
203    }
204
205    fn render_log(&mut self, stdout: &mut impl Write) -> Result<()> {
206        let content =
207            format_event_log_for_display(&read_tail_lines(&self.events_log_path, MAX_LOG_LINES)?);
208        self.last_log_mtime = file_modified(&self.events_log_path);
209        clear_screen(stdout)?;
210        write_banner(stdout, self.mode, &self.member)?;
211        write_crlf(stdout, &content)?;
212        flush_retry(stdout)?;
213        Ok(())
214    }
215
216    fn render_screen(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
217        let (rows, cols) = terminal_size(stdout_fd).unwrap_or((24, 80));
218        let content = render_pty_log_screen(&self.pty_log_path, rows.max(1), cols.max(1))?;
219        self.last_screen_mtime = file_modified(&self.pty_log_path);
220        clear_screen(stdout)?;
221        write_crlf(stdout, &content)?;
222        flush_retry(stdout)?;
223        Ok(())
224    }
225
226    fn render_compose(&mut self, stdout: &mut impl Write) -> Result<()> {
227        clear_screen(stdout)?;
228        write_retry(
229            stdout,
230            format!(
231                "\rmessage to {}  [Enter send, Esc cancel]\r\n\r\n\rmessage> {}",
232                self.member, self.compose
233            )
234            .as_bytes(),
235        )?;
236        flush_retry(stdout)?;
237        Ok(())
238    }
239
240    fn send_message(&self, message: &str, stdout: &mut impl Write) -> Result<()> {
241        let exe = std::env::current_exe().context("failed to resolve batty binary")?;
242        let sender = supervising_sender(&self.project_root, &self.member);
243        let output = Command::new(exe)
244            .current_dir(&self.project_root)
245            .arg("send")
246            .arg("--from")
247            .arg(&sender)
248            .arg(&self.member)
249            .arg(message)
250            .output()
251            .with_context(|| format!("failed to send message to {}", self.member))?;
252        if !output.status.success() {
253            let stderr = String::from_utf8_lossy(&output.stderr);
254            write_retry(
255                stdout,
256                format!("\r\nsend failed: {}\r\n", stderr.trim()).as_bytes(),
257            )?;
258            flush_retry(stdout)?;
259        }
260        Ok(())
261    }
262}
263
264fn supervising_sender(project_root: &Path, member: &str) -> String {
265    let config_path = project_root
266        .join(".batty")
267        .join("team_config")
268        .join("team.yaml");
269    let Ok(team_config) = crate::team::config::TeamConfig::load(&config_path) else {
270        return "human".to_string();
271    };
272    let Ok(members) = crate::team::hierarchy::resolve_hierarchy(&team_config) else {
273        return "human".to_string();
274    };
275    members
276        .iter()
277        .find(|candidate| candidate.name == member)
278        .and_then(|candidate| candidate.reports_to.clone())
279        .unwrap_or_else(|| "human".to_string())
280}
281
282fn render_pty_log_screen(path: &Path, rows: u16, cols: u16) -> Result<String> {
283    let bytes = fs::read(path).unwrap_or_default();
284    let mut parser = vt100::Parser::new(rows, cols, 0);
285    parser.process(&bytes);
286    Ok(trim_screen(parser.screen().contents()))
287}
288
/// Strips trailing whitespace from every line of `content` and rejoins the
/// lines with `\n` (no trailing newline).
fn trim_screen(content: String) -> String {
    let mut trimmed = String::with_capacity(content.len());
    for (index, row) in content.lines().enumerate() {
        if index > 0 {
            trimmed.push('\n');
        }
        trimmed.push_str(row.trim_end());
    }
    trimmed
}
296
297fn read_tail_lines(path: &Path, max_lines: usize) -> Result<String> {
298    let content = fs::read_to_string(path).unwrap_or_default();
299    let lines: Vec<&str> = content.lines().collect();
300    let start = lines.len().saturating_sub(max_lines);
301    Ok(lines[start..].join("\n"))
302}
303
304fn format_event_log_for_display(content: &str) -> String {
305    content
306        .lines()
307        .map(format_event_log_line)
308        .collect::<Vec<_>>()
309        .join("\n")
310}
311
312fn format_event_log_line(line: &str) -> String {
313    let Some(rest) = line.strip_prefix('[') else {
314        return line.to_string();
315    };
316    let Some((ts, remainder)) = rest.split_once(']') else {
317        return line.to_string();
318    };
319    let Ok(epoch) = ts.parse::<i64>() else {
320        return line.to_string();
321    };
322    let Some(dt) = Local.timestamp_opt(epoch, 0).single() else {
323        return line.to_string();
324    };
325    format!("[{}]{}", dt.format("%Y-%m-%d %H:%M:%S"), remainder)
326}
327
/// Returns the modification time of `path`, or `None` if the metadata or the
/// timestamp cannot be obtained.
fn file_modified(path: &Path) -> Option<std::time::SystemTime> {
    let metadata = fs::metadata(path).ok()?;
    metadata.modified().ok()
}
331
332fn write_banner(stdout: &mut impl Write, mode: PaneMode, member: &str) -> Result<()> {
333    let mode_name = match mode {
334        PaneMode::Log => "logs",
335        PaneMode::Screen => "screen",
336        PaneMode::Compose => "message",
337    };
338    write_retry(
339        stdout,
340        format!("\r[{member}] {mode_name}  [l logs] [s screen] [m message]\r\n\r\n").as_bytes(),
341    )?;
342    Ok(())
343}
344
345fn write_crlf(stdout: &mut impl Write, text: &str) -> Result<()> {
346    for line in text.lines() {
347        write_retry(stdout, format!("\r{line}\r\n").as_bytes())?;
348    }
349    Ok(())
350}
351
352fn clear_screen(stdout: &mut impl Write) -> Result<()> {
353    write_retry(stdout, b"\x1b[2J\x1b[H")?;
354    Ok(())
355}
356
357fn write_retry(stdout: &mut impl Write, bytes: &[u8]) -> Result<()> {
358    let mut written = 0;
359    while written < bytes.len() {
360        match stdout.write(&bytes[written..]) {
361            Ok(0) => break,
362            Ok(n) => written += n,
363            Err(error) if is_transient_io_error(&error) => {
364                std::thread::sleep(Duration::from_millis(10));
365            }
366            Err(error) => return Err(error).context("failed to write pane output"),
367        }
368    }
369    Ok(())
370}
371
372fn flush_retry(stdout: &mut impl Write) -> Result<()> {
373    loop {
374        match stdout.flush() {
375            Ok(()) => return Ok(()),
376            Err(error) if is_transient_io_error(&error) => {
377                std::thread::sleep(Duration::from_millis(10));
378            }
379            Err(error) => return Err(error).context("failed to flush pane output"),
380        }
381    }
382}
383
384fn terminal_size(fd: i32) -> Option<(u16, u16)> {
385    let mut winsize = libc::winsize {
386        ws_row: 0,
387        ws_col: 0,
388        ws_xpixel: 0,
389        ws_ypixel: 0,
390    };
391    // Safety: ioctl with TIOCGWINSZ reads into a valid winsize pointer.
392    let rc = unsafe { libc::ioctl(fd, libc::TIOCGWINSZ, &mut winsize) };
393    if rc == 0 && winsize.ws_row > 0 && winsize.ws_col > 0 {
394        Some((winsize.ws_row, winsize.ws_col))
395    } else {
396        None
397    }
398}
399
400fn set_nonblocking(fd: i32, enabled: bool) -> Result<()> {
401    // Safety: fcntl on a valid file descriptor.
402    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
403    if flags < 0 {
404        return Err(io::Error::last_os_error()).context("fcntl(F_GETFL) failed");
405    }
406    let new_flags = if enabled {
407        flags | libc::O_NONBLOCK
408    } else {
409        flags & !libc::O_NONBLOCK
410    };
411    // Safety: fcntl on a valid file descriptor.
412    if unsafe { libc::fcntl(fd, libc::F_SETFL, new_flags) } < 0 {
413        return Err(io::Error::last_os_error()).context("fcntl(F_SETFL) failed");
414    }
415    Ok(())
416}
417
/// Guard that keeps stdin's terminal in raw mode and restores the saved
/// settings when dropped.
struct RawTerminal {
    /// File descriptor whose terminal attributes were changed (stdin).
    fd: i32,
    /// Attributes captured before switching to raw mode.
    original: libc::termios,
}
422
423impl RawTerminal {
424    fn new() -> Result<Self> {
425        let fd = io::stdin().as_raw_fd();
426        let mut original = unsafe { std::mem::zeroed::<libc::termios>() };
427        // Safety: tcgetattr/tcsetattr operate on stdin fd and valid termios pointers.
428        if unsafe { libc::tcgetattr(fd, &mut original) } != 0 {
429            return Err(io::Error::last_os_error()).context("tcgetattr failed");
430        }
431        let mut raw = original;
432        raw.c_iflag &= !(libc::BRKINT | libc::ICRNL | libc::INPCK | libc::ISTRIP | libc::IXON);
433        raw.c_oflag &= !libc::OPOST;
434        raw.c_cflag |= libc::CS8;
435        raw.c_lflag &= !(libc::ECHO | libc::ICANON | libc::IEXTEN | libc::ISIG);
436        raw.c_cc[libc::VMIN] = 0;
437        raw.c_cc[libc::VTIME] = 1;
438        if unsafe { libc::tcsetattr(fd, libc::TCSAFLUSH, &raw) } != 0 {
439            return Err(io::Error::last_os_error()).context("tcsetattr failed");
440        }
441        Ok(Self { fd, original })
442    }
443}
444
445impl Drop for RawTerminal {
446    fn drop(&mut self) {
447        // Safety: restoring previously captured termios to same fd.
448        unsafe {
449            libc::tcsetattr(self.fd, libc::TCSAFLUSH, &self.original);
450        }
451        let _ = set_nonblocking(self.fd, false);
452    }
453}
454
/// Unit tests for the pure helpers of the pane (formatting, file tailing,
/// error classification, sender lookup).
#[cfg(test)]
mod tests {
    use super::*;

    /// Trailing spaces are removed from every line, including the last one.
    #[test]
    fn trim_screen_removes_trailing_spaces() {
        let text = "abc   \nxyz  ".to_string();
        assert_eq!(trim_screen(text), "abc\nxyz");
    }

    /// Only the last `max_lines` lines are returned, without a trailing newline.
    #[test]
    fn read_tail_lines_limits_to_requested_count() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("events.log");
        fs::write(&path, "1\n2\n3\n4\n").unwrap();
        assert_eq!(read_tail_lines(&path, 2).unwrap(), "3\n4");
    }

    /// The epoch prefix is replaced by a formatted timestamp. Only the length
    /// is compared because the rendered text depends on the local time zone.
    #[test]
    fn format_event_log_line_rewrites_unix_timestamp() {
        let formatted = format_event_log_line("[0] <- ready");
        assert!(formatted.starts_with("["));
        assert!(formatted.ends_with(" <- ready"));
        assert_ne!(formatted, "[0] <- ready");
        assert_eq!(formatted.len(), "[1970-01-01 00:00:00] <- ready".len());
    }

    /// Lines without a numeric `[...]` prefix pass through unchanged.
    #[test]
    fn format_event_log_line_leaves_non_timestamp_lines_alone() {
        assert_eq!(format_event_log_line("plain line"), "plain line");
        assert_eq!(format_event_log_line("[abc] nope"), "[abc] nope");
    }

    /// With an 8-column screen the first row holds the first eight characters.
    #[test]
    fn render_pty_log_screen_uses_requested_size() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("pty.log");
        fs::write(&path, "0123456789abcdef").unwrap();
        let rendered = render_pty_log_screen(&path, 2, 8).unwrap();
        assert!(rendered.contains("01234567"));
    }

    /// A raw `EAGAIN` OS error counts as transient.
    #[test]
    fn transient_io_error_recognizes_eagain() {
        let error = io::Error::from_raw_os_error(libc::EAGAIN);
        assert!(is_transient_io_error(&error));
    }

    /// The sender is the member's `reports_to`; the top of the hierarchy falls
    /// back to "human".
    #[test]
    fn supervising_sender_prefers_parent_role() {
        let tmp = tempfile::tempdir().unwrap();
        let batty = tmp.path().join(".batty").join("team_config");
        fs::create_dir_all(&batty).unwrap();
        fs::write(
            batty.join("team.yaml"),
            r#"
name: test
roles:
  - name: architect
    role_type: architect
    agent: claude
    instances: 1
    prompt: architect.md
  - name: manager
    role_type: manager
    agent: claude
    instances: 1
    prompt: manager.md
    talks_to: [architect, engineer]
  - name: engineer
    role_type: engineer
    agent: codex
    instances: 1
    prompt: engineer.md
    talks_to: [manager]
"#,
        )
        .unwrap();

        assert_eq!(supervising_sender(tmp.path(), "eng-1-1"), "manager");
        assert_eq!(supervising_sender(tmp.path(), "manager"), "architect");
        assert_eq!(supervising_sender(tmp.path(), "architect"), "human");
    }
}