1use std::fs;
2use std::io::{self, Read, Write};
3use std::os::fd::AsRawFd;
4use std::path::{Path, PathBuf};
5use std::process::Command;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result};
9use chrono::{Local, TimeZone};
10
/// Pause between input polls in `run`, and the minimum spacing between
/// periodic refreshes enforced by `PaneState::tick_due`.
const TICK: Duration = Duration::from_millis(120);
/// Number of trailing event-log lines shown by the log view.
const MAX_LOG_LINES: usize = 200;
13
/// Which view the pane is currently showing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PaneMode {
    /// Tail of the events log, rendered below a banner.
    Log,
    /// The pty log replayed through a vt100 parser sized to the terminal.
    Screen,
    /// Single-line prompt for typing a message to the member.
    Compose,
}
20
21pub fn run(
22 project_root: &Path,
23 member: &str,
24 events_log_path: &Path,
25 pty_log_path: &Path,
26) -> Result<()> {
27 let _raw = RawTerminal::new()?;
28 let mut stdout = io::stdout().lock();
29 let stdout_fd = stdout.as_raw_fd();
30 set_nonblocking(stdout_fd, false)?;
31 let mut stdin = io::stdin().lock();
32 set_nonblocking(stdin.as_raw_fd(), true)?;
33
34 let mut state = PaneState::new(
35 project_root.to_path_buf(),
36 member.to_string(),
37 events_log_path.to_path_buf(),
38 pty_log_path.to_path_buf(),
39 );
40 state.redraw(&mut stdout, stdout_fd)?;
41
42 loop {
43 let mut buf = [0u8; 64];
44 match stdin.read(&mut buf) {
45 Ok(0) => {}
46 Ok(n) => {
47 for byte in &buf[..n] {
48 if state.handle_key(*byte, &mut stdout, stdout_fd)? {
49 return Ok(());
50 }
51 }
52 }
53 Err(error) if is_transient_io_error(&error) => {}
54 Err(error) => return Err(error).context("failed to read pane input"),
55 }
56
57 if state.tick_due() {
58 state.refresh(&mut stdout, stdout_fd)?;
59 }
60
61 std::thread::sleep(TICK);
62 }
63}
64
65fn is_transient_io_error(error: &io::Error) -> bool {
66 matches!(
67 error.kind(),
68 io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted | io::ErrorKind::TimedOut
69 ) || error.raw_os_error() == Some(libc::EAGAIN)
70}
71
/// Mutable state of the pane: what it is attached to, which view is active,
/// and the bookkeeping used to skip redundant redraws.
struct PaneState {
    /// Project directory; used as the working directory for the `send`
    /// subprocess and to locate the team configuration.
    project_root: PathBuf,
    /// Name of the team member this pane is attached to.
    member: String,
    /// File whose tail is shown in the log view.
    events_log_path: PathBuf,
    /// File replayed through the vt100 parser for the screen view.
    pty_log_path: PathBuf,
    /// Currently active view.
    mode: PaneMode,
    /// Text typed so far in compose mode.
    compose: String,
    /// Modification time of the pty log recorded by the last screen render.
    last_screen_mtime: Option<std::time::SystemTime>,
    /// Modification time of the events log recorded by the last log render.
    last_log_mtime: Option<std::time::SystemTime>,
    /// When `tick_due` last reported that a tick was due.
    last_tick: Instant,
}
83
84impl PaneState {
85 fn new(
86 project_root: PathBuf,
87 member: String,
88 events_log_path: PathBuf,
89 pty_log_path: PathBuf,
90 ) -> Self {
91 Self {
92 project_root,
93 member,
94 events_log_path,
95 pty_log_path,
96 mode: PaneMode::Log,
97 compose: String::new(),
98 last_screen_mtime: None,
99 last_log_mtime: None,
100 last_tick: Instant::now(),
101 }
102 }
103
104 fn tick_due(&mut self) -> bool {
105 if self.last_tick.elapsed() >= TICK {
106 self.last_tick = Instant::now();
107 true
108 } else {
109 false
110 }
111 }
112
113 fn handle_key(&mut self, byte: u8, stdout: &mut impl Write, stdout_fd: i32) -> Result<bool> {
114 match self.mode {
115 PaneMode::Compose => self.handle_compose_key(byte, stdout, stdout_fd),
116 PaneMode::Log | PaneMode::Screen => match byte {
117 b'l' | b'L' => {
118 self.mode = PaneMode::Log;
119 self.redraw(stdout, stdout_fd)?;
120 Ok(false)
121 }
122 b's' | b'S' => {
123 self.mode = PaneMode::Screen;
124 self.redraw(stdout, stdout_fd)?;
125 Ok(false)
126 }
127 b'm' | b'M' => {
128 self.mode = PaneMode::Compose;
129 self.compose.clear();
130 self.redraw(stdout, stdout_fd)?;
131 Ok(false)
132 }
133 3 | b'q' | b'Q' => Ok(true),
134 _ => Ok(false),
135 },
136 }
137 }
138
139 fn handle_compose_key(
140 &mut self,
141 byte: u8,
142 stdout: &mut impl Write,
143 stdout_fd: i32,
144 ) -> Result<bool> {
145 match byte {
146 27 => {
147 self.mode = PaneMode::Log;
148 self.compose.clear();
149 self.redraw(stdout, stdout_fd)?;
150 }
151 b'\r' | b'\n' => {
152 let message = self.compose.trim().to_string();
153 self.mode = PaneMode::Log;
154 self.compose.clear();
155 self.redraw(stdout, stdout_fd)?;
156 if !message.is_empty() {
157 self.send_message(&message, stdout)?;
158 }
159 }
160 127 | 8 => {
161 self.compose.pop();
162 self.redraw(stdout, stdout_fd)?;
163 }
164 byte if (32..=126).contains(&byte) => {
165 self.compose.push(byte as char);
166 self.redraw(stdout, stdout_fd)?;
167 }
168 _ => {}
169 }
170 Ok(false)
171 }
172
173 fn refresh(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
174 match self.mode {
175 PaneMode::Log => self.refresh_log(stdout),
176 PaneMode::Screen => self.refresh_screen(stdout, stdout_fd),
177 PaneMode::Compose => Ok(()),
178 }
179 }
180
181 fn redraw(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
182 match self.mode {
183 PaneMode::Log => self.render_log(stdout),
184 PaneMode::Screen => self.render_screen(stdout, stdout_fd),
185 PaneMode::Compose => self.render_compose(stdout),
186 }
187 }
188
189 fn refresh_log(&mut self, stdout: &mut impl Write) -> Result<()> {
190 let modified = file_modified(&self.events_log_path);
191 if modified == self.last_log_mtime {
192 return Ok(());
193 }
194 self.render_log(stdout)
195 }
196
197 fn refresh_screen(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
198 let modified = file_modified(&self.pty_log_path);
199 if modified == self.last_screen_mtime {
200 return Ok(());
201 }
202 self.render_screen(stdout, stdout_fd)
203 }
204
205 fn render_log(&mut self, stdout: &mut impl Write) -> Result<()> {
206 let content =
207 format_event_log_for_display(&read_tail_lines(&self.events_log_path, MAX_LOG_LINES)?);
208 self.last_log_mtime = file_modified(&self.events_log_path);
209 clear_screen(stdout)?;
210 write_banner(stdout, self.mode, &self.member)?;
211 write_crlf(stdout, &content)?;
212 flush_retry(stdout)?;
213 Ok(())
214 }
215
216 fn render_screen(&mut self, stdout: &mut impl Write, stdout_fd: i32) -> Result<()> {
217 let (rows, cols) = terminal_size(stdout_fd).unwrap_or((24, 80));
218 let content = render_pty_log_screen(&self.pty_log_path, rows.max(1), cols.max(1))?;
219 self.last_screen_mtime = file_modified(&self.pty_log_path);
220 clear_screen(stdout)?;
221 write_crlf(stdout, &content)?;
222 flush_retry(stdout)?;
223 Ok(())
224 }
225
226 fn render_compose(&mut self, stdout: &mut impl Write) -> Result<()> {
227 clear_screen(stdout)?;
228 write_retry(
229 stdout,
230 format!(
231 "\rmessage to {} [Enter send, Esc cancel]\r\n\r\n\rmessage> {}",
232 self.member, self.compose
233 )
234 .as_bytes(),
235 )?;
236 flush_retry(stdout)?;
237 Ok(())
238 }
239
240 fn send_message(&self, message: &str, stdout: &mut impl Write) -> Result<()> {
241 let exe = std::env::current_exe().context("failed to resolve batty binary")?;
242 let sender = supervising_sender(&self.project_root, &self.member);
243 let output = Command::new(exe)
244 .current_dir(&self.project_root)
245 .arg("send")
246 .arg("--from")
247 .arg(&sender)
248 .arg(&self.member)
249 .arg(message)
250 .output()
251 .with_context(|| format!("failed to send message to {}", self.member))?;
252 if !output.status.success() {
253 let stderr = String::from_utf8_lossy(&output.stderr);
254 write_retry(
255 stdout,
256 format!("\r\nsend failed: {}\r\n", stderr.trim()).as_bytes(),
257 )?;
258 flush_retry(stdout)?;
259 }
260 Ok(())
261 }
262}
263
264fn supervising_sender(project_root: &Path, member: &str) -> String {
265 let config_path = project_root
266 .join(".batty")
267 .join("team_config")
268 .join("team.yaml");
269 let Ok(team_config) = crate::team::config::TeamConfig::load(&config_path) else {
270 return "human".to_string();
271 };
272 let Ok(members) = crate::team::hierarchy::resolve_hierarchy(&team_config) else {
273 return "human".to_string();
274 };
275 members
276 .iter()
277 .find(|candidate| candidate.name == member)
278 .and_then(|candidate| candidate.reports_to.clone())
279 .unwrap_or_else(|| "human".to_string())
280}
281
282fn render_pty_log_screen(path: &Path, rows: u16, cols: u16) -> Result<String> {
283 let bytes = fs::read(path).unwrap_or_default();
284 let mut parser = vt100::Parser::new(rows, cols, 0);
285 parser.process(&bytes);
286 Ok(trim_screen(parser.screen().contents()))
287}
288
/// Strips trailing whitespace from every line of `content` and rejoins the
/// lines with `\n` (no trailing newline).
fn trim_screen(content: String) -> String {
    let mut trimmed = String::with_capacity(content.len());
    for (index, line) in content.lines().enumerate() {
        if index > 0 {
            trimmed.push('\n');
        }
        trimmed.push_str(line.trim_end());
    }
    trimmed
}
296
297fn read_tail_lines(path: &Path, max_lines: usize) -> Result<String> {
298 let content = fs::read_to_string(path).unwrap_or_default();
299 let lines: Vec<&str> = content.lines().collect();
300 let start = lines.len().saturating_sub(max_lines);
301 Ok(lines[start..].join("\n"))
302}
303
304fn format_event_log_for_display(content: &str) -> String {
305 content
306 .lines()
307 .map(format_event_log_line)
308 .collect::<Vec<_>>()
309 .join("\n")
310}
311
312fn format_event_log_line(line: &str) -> String {
313 let Some(rest) = line.strip_prefix('[') else {
314 return line.to_string();
315 };
316 let Some((ts, remainder)) = rest.split_once(']') else {
317 return line.to_string();
318 };
319 let Ok(epoch) = ts.parse::<i64>() else {
320 return line.to_string();
321 };
322 let Some(dt) = Local.timestamp_opt(epoch, 0).single() else {
323 return line.to_string();
324 };
325 format!("[{}]{}", dt.format("%Y-%m-%d %H:%M:%S"), remainder)
326}
327
/// Returns the modification time of `path`, or `None` if the metadata or the
/// timestamp cannot be obtained.
fn file_modified(path: &Path) -> Option<std::time::SystemTime> {
    let meta = fs::metadata(path).ok()?;
    meta.modified().ok()
}
331
332fn write_banner(stdout: &mut impl Write, mode: PaneMode, member: &str) -> Result<()> {
333 let mode_name = match mode {
334 PaneMode::Log => "logs",
335 PaneMode::Screen => "screen",
336 PaneMode::Compose => "message",
337 };
338 write_retry(
339 stdout,
340 format!("\r[{member}] {mode_name} [l logs] [s screen] [m message]\r\n\r\n").as_bytes(),
341 )?;
342 Ok(())
343}
344
345fn write_crlf(stdout: &mut impl Write, text: &str) -> Result<()> {
346 for line in text.lines() {
347 write_retry(stdout, format!("\r{line}\r\n").as_bytes())?;
348 }
349 Ok(())
350}
351
352fn clear_screen(stdout: &mut impl Write) -> Result<()> {
353 write_retry(stdout, b"\x1b[2J\x1b[H")?;
354 Ok(())
355}
356
357fn write_retry(stdout: &mut impl Write, bytes: &[u8]) -> Result<()> {
358 let mut written = 0;
359 while written < bytes.len() {
360 match stdout.write(&bytes[written..]) {
361 Ok(0) => break,
362 Ok(n) => written += n,
363 Err(error) if is_transient_io_error(&error) => {
364 std::thread::sleep(Duration::from_millis(10));
365 }
366 Err(error) => return Err(error).context("failed to write pane output"),
367 }
368 }
369 Ok(())
370}
371
372fn flush_retry(stdout: &mut impl Write) -> Result<()> {
373 loop {
374 match stdout.flush() {
375 Ok(()) => return Ok(()),
376 Err(error) if is_transient_io_error(&error) => {
377 std::thread::sleep(Duration::from_millis(10));
378 }
379 Err(error) => return Err(error).context("failed to flush pane output"),
380 }
381 }
382}
383
384fn terminal_size(fd: i32) -> Option<(u16, u16)> {
385 let mut winsize = libc::winsize {
386 ws_row: 0,
387 ws_col: 0,
388 ws_xpixel: 0,
389 ws_ypixel: 0,
390 };
391 let rc = unsafe { libc::ioctl(fd, libc::TIOCGWINSZ, &mut winsize) };
393 if rc == 0 && winsize.ws_row > 0 && winsize.ws_col > 0 {
394 Some((winsize.ws_row, winsize.ws_col))
395 } else {
396 None
397 }
398}
399
400fn set_nonblocking(fd: i32, enabled: bool) -> Result<()> {
401 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
403 if flags < 0 {
404 return Err(io::Error::last_os_error()).context("fcntl(F_GETFL) failed");
405 }
406 let new_flags = if enabled {
407 flags | libc::O_NONBLOCK
408 } else {
409 flags & !libc::O_NONBLOCK
410 };
411 if unsafe { libc::fcntl(fd, libc::F_SETFL, new_flags) } < 0 {
413 return Err(io::Error::last_os_error()).context("fcntl(F_SETFL) failed");
414 }
415 Ok(())
416}
417
/// Guard that holds the terminal settings captured before raw mode was
/// enabled; dropping it writes them back.
struct RawTerminal {
    /// File descriptor the settings were read from (stdin).
    fd: i32,
    /// Settings in effect before raw mode was applied.
    original: libc::termios,
}
422
423impl RawTerminal {
424 fn new() -> Result<Self> {
425 let fd = io::stdin().as_raw_fd();
426 let mut original = unsafe { std::mem::zeroed::<libc::termios>() };
427 if unsafe { libc::tcgetattr(fd, &mut original) } != 0 {
429 return Err(io::Error::last_os_error()).context("tcgetattr failed");
430 }
431 let mut raw = original;
432 raw.c_iflag &= !(libc::BRKINT | libc::ICRNL | libc::INPCK | libc::ISTRIP | libc::IXON);
433 raw.c_oflag &= !libc::OPOST;
434 raw.c_cflag |= libc::CS8;
435 raw.c_lflag &= !(libc::ECHO | libc::ICANON | libc::IEXTEN | libc::ISIG);
436 raw.c_cc[libc::VMIN] = 0;
437 raw.c_cc[libc::VTIME] = 1;
438 if unsafe { libc::tcsetattr(fd, libc::TCSAFLUSH, &raw) } != 0 {
439 return Err(io::Error::last_os_error()).context("tcsetattr failed");
440 }
441 Ok(Self { fd, original })
442 }
443}
444
445impl Drop for RawTerminal {
446 fn drop(&mut self) {
447 unsafe {
449 libc::tcsetattr(self.fd, libc::TCSAFLUSH, &self.original);
450 }
451 let _ = set_nonblocking(self.fd, false);
452 }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Trailing whitespace is removed from every line.
    #[test]
    fn trim_screen_removes_trailing_spaces() {
        let text = "abc \nxyz ".to_string();
        assert_eq!(trim_screen(text), "abc\nxyz");
    }

    /// Only the requested number of trailing lines is returned.
    #[test]
    fn read_tail_lines_limits_to_requested_count() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("events.log");
        fs::write(&path, "1\n2\n3\n4\n").unwrap();
        assert_eq!(read_tail_lines(&path, 2).unwrap(), "3\n4");
    }

    /// A numeric `[epoch]` prefix becomes a formatted timestamp. The exact
    /// text depends on the local time zone, so only its shape is checked.
    #[test]
    fn format_event_log_line_rewrites_unix_timestamp() {
        let formatted = format_event_log_line("[0] <- ready");
        assert!(formatted.starts_with("["));
        assert!(formatted.ends_with(" <- ready"));
        assert_ne!(formatted, "[0] <- ready");
        assert_eq!(formatted.len(), "[1970-01-01 00:00:00] <- ready".len());
    }

    /// Lines without a numeric `[epoch]` prefix pass through untouched.
    #[test]
    fn format_event_log_line_leaves_non_timestamp_lines_alone() {
        assert_eq!(format_event_log_line("plain line"), "plain line");
        assert_eq!(format_event_log_line("[abc] nope"), "[abc] nope");
    }

    /// The parser is created with the requested width: the first 8 of 16
    /// characters appear together in the rendered output.
    #[test]
    fn render_pty_log_screen_uses_requested_size() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("pty.log");
        fs::write(&path, "0123456789abcdef").unwrap();
        let rendered = render_pty_log_screen(&path, 2, 8).unwrap();
        assert!(rendered.contains("01234567"));
    }

    /// A raw `EAGAIN` OS error counts as transient.
    #[test]
    fn transient_io_error_recognizes_eagain() {
        let error = io::Error::from_raw_os_error(libc::EAGAIN);
        assert!(is_transient_io_error(&error));
    }

    /// The sender is the member's parent in the resolved hierarchy, or
    /// "human" for the top of the tree.
    #[test]
    fn supervising_sender_prefers_parent_role() {
        let tmp = tempfile::tempdir().unwrap();
        let batty = tmp.path().join(".batty").join("team_config");
        fs::create_dir_all(&batty).unwrap();
        // The fixture must be valid YAML: each role's keys are nested under
        // its `- name:` list item.
        fs::write(
            batty.join("team.yaml"),
            r#"
name: test
roles:
  - name: architect
    role_type: architect
    agent: claude
    instances: 1
    prompt: architect.md
  - name: manager
    role_type: manager
    agent: claude
    instances: 1
    prompt: manager.md
    talks_to: [architect, engineer]
  - name: engineer
    role_type: engineer
    agent: codex
    instances: 1
    prompt: engineer.md
    talks_to: [manager]
"#,
        )
        .unwrap();

        assert_eq!(supervising_sender(tmp.path(), "eng-1-1"), "manager");
        assert_eq!(supervising_sender(tmp.path(), "manager"), "architect");
        assert_eq!(supervising_sender(tmp.path(), "architect"), "human");
    }
}