1use std::collections::VecDeque;
8use std::io::{Read, Write as IoWrite};
9use std::path::PathBuf;
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13use std::time::Instant;
14
15use anyhow::{Context, Result};
16use portable_pty::{Child, CommandBuilder, PtySize};
17
18use super::classifier::{self, AgentType, ScreenVerdict};
19use super::common::{self, QueuedMessage};
20use super::protocol::{Channel, Command, Event, ShimState};
21use super::pty_log::PtyLogWriter;
22
23const DEFAULT_ROWS: u16 = 50;
28const DEFAULT_COLS: u16 = 220;
29const SCROLLBACK_LINES: usize = 5000;
30
31const POLL_INTERVAL_MS: u64 = 250;
33
34const WORKING_DWELL_MS: u64 = 300;
39
40const KIRO_IDLE_SETTLE_MS: u64 = 1200;
43
44const READY_TIMEOUT_SECS: u64 = 120;
46use common::MAX_QUEUE_DEPTH;
47use common::SESSION_STATS_INTERVAL_SECS;
48
49const PROCESS_EXIT_POLL_MS: u64 = 100;
50const PARENT_DEATH_POLL_SECS: u64 = 1;
51const GROUP_TERM_GRACE_SECS: u64 = 2;
52
53fn format_injected_message(sender: &str, body: &str) -> String {
54 common::format_injected_message(sender, body)
55}
56
57fn shell_single_quote(input: &str) -> String {
58 input.replace('\'', "'\\''")
59}
60
61fn build_supervised_agent_command(command: &str, shim_pid: u32) -> String {
62 let escaped_command = shell_single_quote(command);
63 format!(
64 "shim_pid={shim_pid}; \
65 agent_root_pid=$$; \
66 agent_pgid=$$; \
67 setsid sh -c ' \
68 shim_pid=\"$1\"; \
69 agent_pgid=\"$2\"; \
70 agent_root_pid=\"$3\"; \
71 collect_descendants() {{ \
72 parent_pid=\"$1\"; \
73 for child_pid in $(pgrep -P \"$parent_pid\" 2>/dev/null); do \
74 printf \"%s\\n\" \"$child_pid\"; \
75 collect_descendants \"$child_pid\"; \
76 done; \
77 }}; \
78 while kill -0 \"$shim_pid\" 2>/dev/null; do sleep {PARENT_DEATH_POLL_SECS}; done; \
79 descendant_pids=$(collect_descendants \"$agent_root_pid\"); \
80 kill -TERM -- -\"$agent_pgid\" >/dev/null 2>&1 || true; \
81 for descendant_pid in $descendant_pids; do kill -TERM \"$descendant_pid\" >/dev/null 2>&1 || true; done; \
82 sleep {GROUP_TERM_GRACE_SECS}; \
83 kill -KILL -- -\"$agent_pgid\" >/dev/null 2>&1 || true; \
84 for descendant_pid in $descendant_pids; do kill -KILL \"$descendant_pid\" >/dev/null 2>&1 || true; done \
85 ' _ \"$shim_pid\" \"$agent_pgid\" \"$agent_root_pid\" >/dev/null 2>&1 < /dev/null & \
86 exec bash -lc '{escaped_command}'"
87 )
88}
89
90#[cfg(unix)]
91fn signal_process_group(child: &dyn Child, signal: libc::c_int) -> std::io::Result<()> {
92 let pid = child
93 .process_id()
94 .ok_or_else(|| std::io::Error::other("child process id unavailable"))?;
95 let result = unsafe { libc::killpg(pid as libc::pid_t, signal) };
96 if result == 0 {
97 Ok(())
98 } else {
99 Err(std::io::Error::last_os_error())
100 }
101}
102
103fn terminate_agent_group(
104 child: &mut Box<dyn Child + Send + Sync>,
105 sigterm_grace: Duration,
106) -> std::io::Result<()> {
107 #[cfg(unix)]
108 {
109 signal_process_group(child.as_ref(), libc::SIGTERM)?;
110 let deadline = Instant::now() + sigterm_grace;
111 while Instant::now() <= deadline {
112 if let Ok(Some(_)) = child.try_wait() {
113 return Ok(());
114 }
115 thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
116 }
117
118 signal_process_group(child.as_ref(), libc::SIGKILL)?;
119 return Ok(());
120 }
121
122 #[allow(unreachable_code)]
123 child.kill()
124}
125
126fn pty_write_paced(
130 pty_writer: &Arc<Mutex<Box<dyn std::io::Write + Send>>>,
131 agent_type: AgentType,
132 body: &[u8],
133 enter: &[u8],
134) -> std::io::Result<()> {
135 match agent_type {
143 AgentType::Generic => {
144 let mut writer = pty_writer.lock().unwrap();
146 writer.write_all(body)?;
147 writer.write_all(enter)?;
148 writer.flush()?;
149 }
150 _ => {
151 let mut writer = pty_writer.lock().unwrap();
153 writer.write_all(b"\x1b[200~")?;
154 writer.write_all(body)?;
155 writer.write_all(b"\x1b[201~")?;
156 writer.flush()?;
157 drop(writer);
158
159 std::thread::sleep(std::time::Duration::from_millis(200));
161
162 let mut writer = pty_writer.lock().unwrap();
163 writer.write_all(enter)?;
164 writer.flush()?;
165 }
166 }
167 Ok(())
168}
169
170fn enter_seq(agent_type: AgentType) -> &'static str {
174 match agent_type {
175 AgentType::Generic => "\n",
176 _ => "\r", }
178}
179
180#[derive(Debug, Clone)]
185pub struct ShimArgs {
186 pub id: String,
187 pub agent_type: AgentType,
188 pub cmd: String,
189 pub cwd: PathBuf,
190 pub rows: u16,
191 pub cols: u16,
192 pub pty_log_path: Option<PathBuf>,
195}
196
197struct ShimInner {
204 parser: vt100::Parser,
205 state: ShimState,
206 state_changed_at: Instant,
207 last_screen_hash: u64,
208 last_pty_output_at: Instant,
209 started_at: Instant,
210 cumulative_output_bytes: u64,
211 pre_injection_content: String,
212 pending_message_id: Option<String>,
213 agent_type: AgentType,
214 message_queue: VecDeque<QueuedMessage>,
217 dialogs_dismissed: u8,
219 last_working_screen: String,
223}
224
225impl ShimInner {
226 fn screen_contents(&self) -> String {
227 self.parser.screen().contents()
228 }
229
230 fn last_n_lines(&self, n: usize) -> String {
231 let content = self.parser.screen().contents();
232 let lines: Vec<&str> = content.lines().collect();
233 let start = lines.len().saturating_sub(n);
234 lines[start..].join("\n")
235 }
236
237 fn cursor_position(&self) -> (u16, u16) {
238 self.parser.screen().cursor_position()
239 }
240}
241
242fn content_hash(s: &str) -> u64 {
247 let mut hash: u64 = 0xcbf29ce484222325;
248 for byte in s.bytes() {
249 hash ^= byte as u64;
250 hash = hash.wrapping_mul(0x100000001b3);
251 }
252 hash
253}
254
255pub fn run(args: ShimArgs, channel: Channel) -> Result<()> {
263 let rows = if args.rows > 0 {
264 args.rows
265 } else {
266 DEFAULT_ROWS
267 };
268 let cols = if args.cols > 0 {
269 args.cols
270 } else {
271 DEFAULT_COLS
272 };
273
274 let pty_system = portable_pty::native_pty_system();
276 let pty_pair = pty_system
277 .openpty(PtySize {
278 rows,
279 cols,
280 pixel_width: 0,
281 pixel_height: 0,
282 })
283 .context("failed to create PTY")?;
284
285 let shim_pid = std::process::id();
287 let supervised_cmd = build_supervised_agent_command(&args.cmd, shim_pid);
288
289 let mut cmd = CommandBuilder::new("bash");
290 cmd.args(["-lc", &supervised_cmd]);
291 cmd.cwd(&args.cwd);
292 cmd.env_remove("CLAUDECODE"); cmd.env("TERM", "xterm-256color");
294 cmd.env("COLORTERM", "truecolor");
295
296 let mut child = pty_pair
297 .slave
298 .spawn_command(cmd)
299 .context("failed to spawn agent CLI")?;
300
301 drop(pty_pair.slave);
303
304 let mut pty_reader = pty_pair
305 .master
306 .try_clone_reader()
307 .context("failed to clone PTY reader")?;
308
309 let pty_writer = pty_pair
310 .master
311 .take_writer()
312 .context("failed to take PTY writer")?;
313
314 let inner = Arc::new(Mutex::new(ShimInner {
316 parser: vt100::Parser::new(rows, cols, SCROLLBACK_LINES),
317 state: ShimState::Starting,
318 state_changed_at: Instant::now(),
319 last_screen_hash: 0,
320 last_pty_output_at: Instant::now(),
321 started_at: Instant::now(),
322 cumulative_output_bytes: 0,
323 pre_injection_content: String::new(),
324 pending_message_id: None,
325 agent_type: args.agent_type,
326 message_queue: VecDeque::new(),
327 dialogs_dismissed: 0,
328 last_working_screen: String::new(),
329 }));
330
331 let pty_log: Option<Mutex<PtyLogWriter>> = args
333 .pty_log_path
334 .as_deref()
335 .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
336 .transpose()?
337 .map(Mutex::new);
338 let pty_log = pty_log.map(Arc::new);
339
340 let pty_writer = Arc::new(Mutex::new(pty_writer));
342
343 let mut cmd_channel = channel;
345 let mut evt_channel = cmd_channel.try_clone().context("failed to clone channel")?;
346
347 let inner_pty = Arc::clone(&inner);
349 let log_handle = pty_log.clone();
350 let pty_writer_pty = Arc::clone(&pty_writer);
351 let pty_handle = std::thread::spawn(move || {
352 let mut buf = [0u8; 4096];
353 loop {
354 match pty_reader.read(&mut buf) {
355 Ok(0) => break, Ok(n) => {
357 if let Some(ref log) = log_handle {
359 let _ = log.lock().unwrap().write(&buf[..n]);
360 }
361
362 let mut inner = inner_pty.lock().unwrap();
363 inner.last_pty_output_at = Instant::now();
364 inner.cumulative_output_bytes =
365 inner.cumulative_output_bytes.saturating_add(n as u64);
366 inner.parser.process(&buf[..n]);
367
368 let content = inner.parser.screen().contents();
375 let hash = content_hash(&content);
376 if hash == inner.last_screen_hash {
377 continue; }
379 inner.last_screen_hash = hash;
380
381 let verdict = classifier::classify(inner.agent_type, inner.parser.screen());
382 let old_state = inner.state;
383
384 if old_state == ShimState::Working {
388 inner.last_working_screen = content.clone();
389 }
390
391 let working_too_short = old_state == ShimState::Working
395 && inner.state_changed_at.elapsed().as_millis() < WORKING_DWELL_MS as u128;
396 let new_state = match (old_state, verdict) {
397 (ShimState::Starting, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
398 (ShimState::Idle, ScreenVerdict::AgentIdle) => None,
399 (ShimState::Working, ScreenVerdict::AgentIdle) if working_too_short => None,
400 (ShimState::Working, ScreenVerdict::AgentIdle)
401 if inner.agent_type == AgentType::Kiro =>
402 {
403 None
404 }
405 (ShimState::Working, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
406 (ShimState::Working, ScreenVerdict::AgentWorking) => None,
407 (_, ScreenVerdict::ContextExhausted) => Some(ShimState::ContextExhausted),
408 (_, ScreenVerdict::Unknown) => None,
409 (ShimState::Idle, ScreenVerdict::AgentWorking) => Some(ShimState::Working),
410 (ShimState::Starting, ScreenVerdict::AgentWorking) => {
411 Some(ShimState::Working)
412 }
413 _ => None,
414 };
415
416 if let Some(new) = new_state {
417 let summary = inner.last_n_lines(5);
418 inner.state = new;
419 inner.state_changed_at = Instant::now();
420
421 let pre_content = inner.pre_injection_content.clone();
422 let current_content = inner.screen_contents();
423 let working_screen = inner.last_working_screen.clone();
424 let msg_id = inner.pending_message_id.take();
425
426 let drain_errors =
428 if new == ShimState::Dead || new == ShimState::ContextExhausted {
429 drain_queue_errors(&mut inner.message_queue, new)
430 } else {
431 Vec::new()
432 };
433
434 let queued_msg = if old_state == ShimState::Working
436 && new == ShimState::Idle
437 && !inner.message_queue.is_empty()
438 {
439 inner.message_queue.pop_front()
440 } else {
441 None
442 };
443
444 if let Some(ref msg) = queued_msg {
446 inner.pre_injection_content = inner.screen_contents();
447 inner.pending_message_id = msg.message_id.clone();
448 inner.state = ShimState::Working;
449 inner.state_changed_at = Instant::now();
450 }
451
452 let queue_depth = inner.message_queue.len();
453 let agent_type_for_enter = inner.agent_type;
454 let queued_injected = queued_msg
455 .as_ref()
456 .map(|msg| format_injected_message(&msg.from, &msg.body));
457
458 drop(inner); let events = build_transition_events(
461 old_state,
462 new,
463 &summary,
464 &pre_content,
465 ¤t_content,
466 &working_screen,
467 msg_id,
468 );
469
470 for event in events {
471 if evt_channel.send(&event).is_err() {
472 return; }
474 }
475
476 for event in drain_errors {
478 if evt_channel.send(&event).is_err() {
479 return;
480 }
481 }
482
483 if let Some(msg) = queued_msg {
485 let enter = enter_seq(agent_type_for_enter);
486 let injected = queued_injected.as_deref().unwrap_or(msg.body.as_str());
487 if let Err(e) = pty_write_paced(
488 &pty_writer_pty,
489 agent_type_for_enter,
490 injected.as_bytes(),
491 enter.as_bytes(),
492 ) {
493 let _ = evt_channel.send(&Event::Error {
494 command: "SendMessage".into(),
495 reason: format!("PTY write failed for queued message: {e}"),
496 });
497 }
498
499 let _ = evt_channel.send(&Event::StateChanged {
501 from: ShimState::Idle,
502 to: ShimState::Working,
503 summary: format!(
504 "delivering queued message ({} remaining)",
505 queue_depth
506 ),
507 });
508 }
509 }
510 }
511 Err(_) => break, }
513 }
514
515 let mut inner = inner_pty.lock().unwrap();
517 let last_lines = inner.last_n_lines(10);
518 let old = inner.state;
519 inner.state = ShimState::Dead;
520
521 let drain_errors = drain_queue_errors(&mut inner.message_queue, ShimState::Dead);
523 drop(inner);
524
525 let _ = evt_channel.send(&Event::StateChanged {
526 from: old,
527 to: ShimState::Dead,
528 summary: last_lines.clone(),
529 });
530
531 let _ = evt_channel.send(&Event::Died {
532 exit_code: None,
533 last_lines,
534 });
535
536 for event in drain_errors {
537 let _ = evt_channel.send(&event);
538 }
539 });
540
541 let inner_idle = Arc::clone(&inner);
545 let pty_writer_idle = Arc::clone(&pty_writer);
546 let mut idle_channel = cmd_channel.try_clone().context("failed to clone channel")?;
547 std::thread::spawn(move || {
548 loop {
549 std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS));
550
551 let mut inner = inner_idle.lock().unwrap();
552 if inner.agent_type != AgentType::Kiro || inner.state != ShimState::Working {
553 continue;
554 }
555 if inner.last_pty_output_at.elapsed().as_millis() < KIRO_IDLE_SETTLE_MS as u128 {
556 continue;
557 }
558 if classifier::classify(inner.agent_type, inner.parser.screen())
559 != ScreenVerdict::AgentIdle
560 {
561 continue;
562 }
563
564 let summary = inner.last_n_lines(5);
565 let pre_content = inner.pre_injection_content.clone();
566 let current_content = inner.screen_contents();
567 let working_screen = inner.last_working_screen.clone();
568 let msg_id = inner.pending_message_id.take();
569
570 inner.state = ShimState::Idle;
571 inner.state_changed_at = Instant::now();
572
573 let queued_msg = if !inner.message_queue.is_empty() {
574 inner.message_queue.pop_front()
575 } else {
576 None
577 };
578
579 if let Some(ref msg) = queued_msg {
580 inner.pre_injection_content = inner.screen_contents();
581 inner.pending_message_id = msg.message_id.clone();
582 inner.state = ShimState::Working;
583 inner.state_changed_at = Instant::now();
584 }
585
586 let queue_depth = inner.message_queue.len();
587 let agent_type_for_enter = inner.agent_type;
588 let queued_injected = queued_msg
589 .as_ref()
590 .map(|msg| format_injected_message(&msg.from, &msg.body));
591 drop(inner);
592
593 for event in build_transition_events(
594 ShimState::Working,
595 ShimState::Idle,
596 &summary,
597 &pre_content,
598 ¤t_content,
599 &working_screen,
600 msg_id,
601 ) {
602 if idle_channel.send(&event).is_err() {
603 return;
604 }
605 }
606
607 if let Some(msg) = queued_msg {
608 let enter = enter_seq(agent_type_for_enter);
609 let injected = queued_injected.as_deref().unwrap_or(msg.body.as_str());
610 if let Err(e) = pty_write_paced(
611 &pty_writer_idle,
612 agent_type_for_enter,
613 injected.as_bytes(),
614 enter.as_bytes(),
615 ) {
616 let _ = idle_channel.send(&Event::Error {
617 command: "SendMessage".into(),
618 reason: format!("PTY write failed for queued message: {e}"),
619 });
620 continue;
621 }
622
623 let _ = idle_channel.send(&Event::StateChanged {
624 from: ShimState::Idle,
625 to: ShimState::Working,
626 summary: format!("delivering queued message ({} remaining)", queue_depth),
627 });
628 }
629 }
630 });
631
632 let inner_poll = Arc::clone(&inner);
638 let mut poll_channel = cmd_channel
639 .try_clone()
640 .context("failed to clone channel for poll thread")?;
641 std::thread::spawn(move || {
642 loop {
643 std::thread::sleep(std::time::Duration::from_secs(5));
644 let mut inner = inner_poll.lock().unwrap();
645 if inner.state != ShimState::Working {
646 continue;
647 }
648 if inner.last_pty_output_at.elapsed().as_secs() < 2 {
650 continue;
651 }
652 let verdict = classifier::classify(inner.agent_type, inner.parser.screen());
653 if verdict == classifier::ScreenVerdict::AgentIdle {
654 let summary = inner.last_n_lines(5);
655 inner.state = ShimState::Idle;
656 inner.state_changed_at = Instant::now();
657 drop(inner);
658
659 let _ = poll_channel.send(&Event::StateChanged {
662 from: ShimState::Working,
663 to: ShimState::Idle,
664 summary,
665 });
666 }
667 }
668 });
669
670 let inner_stats = Arc::clone(&inner);
671 let mut stats_channel = cmd_channel
672 .try_clone()
673 .context("failed to clone channel for stats thread")?;
674 std::thread::spawn(move || {
675 loop {
676 std::thread::sleep(Duration::from_secs(SESSION_STATS_INTERVAL_SECS));
677 let inner = inner_stats.lock().unwrap();
678 if inner.state == ShimState::Dead {
679 return;
680 }
681 let output_bytes = inner.cumulative_output_bytes;
682 let uptime_secs = inner.started_at.elapsed().as_secs();
683 drop(inner);
684
685 if stats_channel
686 .send(&Event::SessionStats {
687 output_bytes,
688 uptime_secs,
689 })
690 .is_err()
691 {
692 return;
693 }
694 }
695 });
696
697 let inner_cmd = Arc::clone(&inner);
699
700 let start = Instant::now();
704 loop {
705 let mut inner = inner_cmd.lock().unwrap();
706 let state = inner.state;
707 match state {
708 ShimState::Starting => {
709 if inner.dialogs_dismissed < 10 {
711 let content = inner.screen_contents();
712 if classifier::detect_startup_dialog(&content) {
713 let attempt = inner.dialogs_dismissed + 1;
714 let enter = enter_seq(inner.agent_type);
715 inner.dialogs_dismissed = attempt;
716 drop(inner);
717 eprintln!(
718 "[shim {}] auto-dismissing startup dialog (attempt {attempt})",
719 args.id
720 );
721 let mut writer = pty_writer.lock().unwrap();
722 writer.write_all(enter.as_bytes()).ok();
723 writer.flush().ok();
724 std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS));
725 continue;
726 }
727 }
728 drop(inner);
729
730 if start.elapsed().as_secs() > READY_TIMEOUT_SECS {
731 let last = inner_cmd.lock().unwrap().last_n_lines(10);
732 cmd_channel.send(&Event::Error {
733 command: "startup".into(),
734 reason: format!(
735 "agent did not show prompt within {}s. Last lines:\n{}",
736 READY_TIMEOUT_SECS, last,
737 ),
738 })?;
739 terminate_agent_group(&mut child, Duration::from_secs(GROUP_TERM_GRACE_SECS))
740 .ok();
741 return Ok(());
742 }
743 thread::sleep(Duration::from_millis(POLL_INTERVAL_MS));
744 }
745 ShimState::Dead => {
746 drop(inner);
747 return Ok(());
748 }
749 ShimState::Idle => {
750 drop(inner);
751 cmd_channel.send(&Event::Ready)?;
752 break;
753 }
754 _ => {
755 drop(inner);
758 if start.elapsed().as_secs() > READY_TIMEOUT_SECS {
759 let last = inner_cmd.lock().unwrap().last_n_lines(10);
760 cmd_channel.send(&Event::Error {
761 command: "startup".into(),
762 reason: format!(
763 "agent did not reach idle within {}s (state: {}). Last lines:\n{}",
764 READY_TIMEOUT_SECS, state, last,
765 ),
766 })?;
767 terminate_agent_group(&mut child, Duration::from_secs(GROUP_TERM_GRACE_SECS))
768 .ok();
769 return Ok(());
770 }
771 thread::sleep(Duration::from_millis(POLL_INTERVAL_MS));
772 }
773 }
774 }
775
776 loop {
778 let cmd = match cmd_channel.recv::<Command>() {
779 Ok(Some(c)) => c,
780 Ok(None) => {
781 eprintln!(
782 "[shim {}] orchestrator disconnected, shutting down",
783 args.id
784 );
785 terminate_agent_group(&mut child, Duration::from_secs(GROUP_TERM_GRACE_SECS)).ok();
786 break;
787 }
788 Err(e) => {
789 eprintln!("[shim {}] channel error: {e}", args.id);
790 terminate_agent_group(&mut child, Duration::from_secs(GROUP_TERM_GRACE_SECS)).ok();
791 break;
792 }
793 };
794
795 match cmd {
796 Command::SendMessage {
797 from,
798 body,
799 message_id,
800 } => {
801 let mut inner = inner_cmd.lock().unwrap();
802 match inner.state {
803 ShimState::Idle => {
804 inner.pre_injection_content = inner.screen_contents();
805 inner.pending_message_id = message_id;
806 let agent_type = inner.agent_type;
807 let enter = enter_seq(agent_type);
808 let injected = format_injected_message(&from, &body);
809 drop(inner);
810 if let Err(e) = pty_write_paced(
814 &pty_writer,
815 agent_type,
816 injected.as_bytes(),
817 enter.as_bytes(),
818 ) {
819 cmd_channel.send(&Event::Error {
820 command: "SendMessage".into(),
821 reason: format!("PTY write failed: {e}"),
822 })?;
823 continue;
825 }
826 let mut inner = inner_cmd.lock().unwrap();
827
828 let old = inner.state;
829 inner.state = ShimState::Working;
830 inner.state_changed_at = Instant::now();
831 let summary = inner.last_n_lines(3);
832 drop(inner);
833
834 cmd_channel.send(&Event::StateChanged {
835 from: old,
836 to: ShimState::Working,
837 summary,
838 })?;
839 }
840 ShimState::Working => {
841 if inner.message_queue.len() >= MAX_QUEUE_DEPTH {
843 let dropped = inner.message_queue.pop_front();
844 let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
845 inner.message_queue.push_back(QueuedMessage {
846 from,
847 body,
848 message_id,
849 });
850 let depth = inner.message_queue.len();
851 drop(inner);
852
853 cmd_channel.send(&Event::Error {
854 command: "SendMessage".into(),
855 reason: format!(
856 "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
857 dropped_id
858 .map(|id| format!(" (id: {id})"))
859 .unwrap_or_default(),
860 ),
861 })?;
862 cmd_channel.send(&Event::Warning {
863 message: format!(
864 "message queued while agent working (depth: {depth})"
865 ),
866 idle_secs: None,
867 })?;
868 } else {
869 inner.message_queue.push_back(QueuedMessage {
870 from,
871 body,
872 message_id,
873 });
874 let depth = inner.message_queue.len();
875 drop(inner);
876
877 cmd_channel.send(&Event::Warning {
878 message: format!(
879 "message queued while agent working (depth: {depth})"
880 ),
881 idle_secs: None,
882 })?;
883 }
884 }
885 other => {
886 cmd_channel.send(&Event::Error {
887 command: "SendMessage".into(),
888 reason: format!("agent in {other} state, cannot accept message"),
889 })?;
890 }
891 }
892 }
893
894 Command::CaptureScreen { last_n_lines } => {
895 let inner = inner_cmd.lock().unwrap();
896 let content = match last_n_lines {
897 Some(n) => inner.last_n_lines(n),
898 None => inner.screen_contents(),
899 };
900 let (row, col) = inner.cursor_position();
901 drop(inner);
902 cmd_channel.send(&Event::ScreenCapture {
903 content,
904 cursor_row: row,
905 cursor_col: col,
906 })?;
907 }
908
909 Command::GetState => {
910 let inner = inner_cmd.lock().unwrap();
911 let since = inner.state_changed_at.elapsed().as_secs();
912 let state = inner.state;
913 drop(inner);
914 cmd_channel.send(&Event::State {
915 state,
916 since_secs: since,
917 })?;
918 }
919
920 Command::Resize { rows, cols } => {
921 pty_pair
922 .master
923 .resize(PtySize {
924 rows,
925 cols,
926 pixel_width: 0,
927 pixel_height: 0,
928 })
929 .ok();
930 let mut inner = inner_cmd.lock().unwrap();
931 inner.parser.set_size(rows, cols);
932 }
933
934 Command::Ping => {
935 cmd_channel.send(&Event::Pong)?;
936 }
937
938 Command::Shutdown { timeout_secs } => {
939 eprintln!(
940 "[shim {}] shutdown requested (timeout: {}s)",
941 args.id, timeout_secs
942 );
943 {
944 let mut writer = pty_writer.lock().unwrap();
945 writer.write_all(b"\x03").ok(); writer.flush().ok();
947 }
948 let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
949 loop {
950 if Instant::now() > deadline {
951 terminate_agent_group(
952 &mut child,
953 Duration::from_secs(GROUP_TERM_GRACE_SECS),
954 )
955 .ok();
956 break;
957 }
958 if let Ok(Some(_)) = child.try_wait() {
959 break;
960 }
961 thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
962 }
963 break;
964 }
965
966 Command::Kill => {
967 terminate_agent_group(&mut child, Duration::from_secs(GROUP_TERM_GRACE_SECS)).ok();
968 break;
969 }
970 }
971 }
972
973 pty_handle.join().ok();
974 Ok(())
975}
976
977fn drain_queue_errors(
978 queue: &mut VecDeque<QueuedMessage>,
979 terminal_state: ShimState,
980) -> Vec<Event> {
981 common::drain_queue_errors(queue, terminal_state)
982}
983
984fn build_transition_events(
989 from: ShimState,
990 to: ShimState,
991 summary: &str,
992 pre_injection_content: &str,
993 current_content: &str,
994 last_working_screen: &str,
995 message_id: Option<String>,
996) -> Vec<Event> {
997 let summary = sanitize_summary(summary);
998 let mut events = vec![Event::StateChanged {
999 from,
1000 to,
1001 summary: summary.clone(),
1002 }];
1003
1004 if from == ShimState::Working && to == ShimState::Idle && !pre_injection_content.is_empty() {
1008 let mut response = extract_response(pre_injection_content, current_content);
1012 if response.is_empty() && !last_working_screen.is_empty() {
1013 response = extract_response(pre_injection_content, last_working_screen);
1014 }
1015 events.push(Event::Completion {
1016 message_id,
1017 response,
1018 last_lines: summary.clone(),
1019 });
1020 }
1021
1022 if to == ShimState::ContextExhausted {
1024 events.push(Event::ContextExhausted {
1025 message: "Agent reported context exhaustion".to_string(),
1026 last_lines: summary,
1027 });
1028 }
1029
1030 events
1031}
1032
1033fn sanitize_summary(summary: &str) -> String {
1034 let cleaned: Vec<String> = summary
1035 .lines()
1036 .filter_map(|line| {
1037 let trimmed = line.trim();
1038 if trimmed.is_empty() || is_tui_chrome(line) || is_prompt_line(trimmed) {
1039 return None;
1040 }
1041 Some(strip_claude_bullets(trimmed))
1042 })
1043 .collect();
1044
1045 if cleaned.is_empty() {
1046 String::new()
1047 } else {
1048 cleaned.join("\n")
1049 }
1050}
1051
1052fn extract_response(pre: &str, current: &str) -> String {
1056 let pre_lines: Vec<&str> = pre.lines().collect();
1057 let cur_lines: Vec<&str> = current.lines().collect();
1058
1059 let overlap = pre_lines.len().min(cur_lines.len());
1060 let mut diverge_at = 0;
1061 for i in 0..overlap {
1062 if pre_lines[i] != cur_lines[i] {
1063 break;
1064 }
1065 diverge_at = i + 1;
1066 }
1067
1068 let response_lines = &cur_lines[diverge_at..];
1069 if response_lines.is_empty() {
1070 return String::new();
1071 }
1072
1073 let filtered: Vec<&str> = response_lines
1075 .iter()
1076 .filter(|line| !is_tui_chrome(line))
1077 .copied()
1078 .collect();
1079
1080 if filtered.is_empty() {
1081 return String::new();
1082 }
1083
1084 let mut end = filtered.len();
1086 while end > 0 && filtered[end - 1].trim().is_empty() {
1087 end -= 1;
1088 }
1089 while end > 0 && is_prompt_line(filtered[end - 1].trim()) {
1090 end -= 1;
1091 }
1092 while end > 0 && filtered[end - 1].trim().is_empty() {
1093 end -= 1;
1094 }
1095
1096 let mut start = 0;
1098 while start < end {
1099 let trimmed = filtered[start].trim();
1100 if trimmed.is_empty() {
1101 start += 1;
1102 } else if trimmed.starts_with('\u{276F}')
1103 && !trimmed['\u{276F}'.len_utf8()..].trim().is_empty()
1104 {
1105 start += 1;
1107 } else {
1108 break;
1109 }
1110 }
1111
1112 let cleaned: Vec<String> = filtered[start..end]
1114 .iter()
1115 .map(|line| strip_claude_bullets(line))
1116 .collect();
1117
1118 cleaned.join("\n")
1119}
1120
1121fn strip_claude_bullets(line: &str) -> String {
1123 let trimmed = line.trim_start();
1124 if trimmed.starts_with('\u{23FA}') {
1125 let after = &trimmed['\u{23FA}'.len_utf8()..];
1126 let leading = line.len() - line.trim_start().len();
1128 format!("{}{}", &" ".repeat(leading), after.trim_start())
1129 } else {
1130 line.to_string()
1131 }
1132}
1133
1134fn is_tui_chrome(line: &str) -> bool {
1138 let trimmed = line.trim();
1139 if trimmed.is_empty() {
1140 return false; }
1142
1143 if trimmed.chars().all(|c| {
1145 matches!(
1146 c,
1147 '─' | '━'
1148 | '═'
1149 | '╌'
1150 | '╍'
1151 | '┄'
1152 | '┅'
1153 | '╶'
1154 | '╴'
1155 | '╸'
1156 | '╺'
1157 | '│'
1158 | '┃'
1159 | '╎'
1160 | '╏'
1161 | '┊'
1162 | '┋'
1163 )
1164 }) {
1165 return true;
1166 }
1167
1168 if trimmed.contains("\u{23F5}\u{23F5}") || trimmed.contains("bypass permissions") {
1170 return true;
1171 }
1172 if trimmed.contains("shift+tab") && trimmed.len() < 80 {
1173 return true;
1174 }
1175
1176 if trimmed.starts_with('$') && trimmed.contains("token") {
1178 return true;
1179 }
1180
1181 let braille_count = trimmed
1183 .chars()
1184 .filter(|c| ('\u{2800}'..='\u{28FF}').contains(c))
1185 .count();
1186 if braille_count > 5 {
1187 return true;
1188 }
1189
1190 let lower = trimmed.to_lowercase();
1192 if lower.contains("welcome to the new kiro") || lower.contains("/feedback command") {
1193 return true;
1194 }
1195
1196 if lower.starts_with("kiro") && lower.contains('\u{25D4}') {
1198 return true;
1200 }
1201
1202 if trimmed.starts_with('╭') || trimmed.starts_with('╰') || trimmed.starts_with('│') {
1204 return true;
1205 }
1206
1207 if lower.starts_with("tip:") || (trimmed.starts_with('⚠') && lower.contains("limit")) {
1209 return true;
1210 }
1211
1212 if lower.contains("ask a question") || lower.contains("describe a task") {
1214 return true;
1215 }
1216
1217 false
1218}
1219
1220fn is_prompt_line(line: &str) -> bool {
1221 line == "\u{276F}"
1222 || line.starts_with("\u{276F} ")
1223 || line == "\u{203A}"
1224 || line.starts_with("\u{203A} ")
1225 || line.ends_with("$ ")
1226 || line.ends_with('$')
1227 || line.ends_with("% ")
1228 || line.ends_with('%')
1229 || line == ">"
1230 || line.starts_with("Kiro>")
1231}
1232
1233#[cfg(test)]
1238mod tests {
1239 use super::*;
1240
1241 #[test]
1242 fn extract_response_basic() {
1243 let pre = "line1\nline2\n$ ";
1244 let cur = "line1\nline2\nhello world\n$ ";
1245 assert_eq!(extract_response(pre, cur), "hello world");
1246 }
1247
1248 #[test]
1249 fn extract_response_multiline() {
1250 let pre = "$ ";
1251 let cur = "$ echo hi\nhi\n$ ";
1252 let resp = extract_response(pre, cur);
1253 assert!(resp.contains("echo hi"));
1254 assert!(resp.contains("hi"));
1255 }
1256
1257 #[test]
1258 fn extract_response_empty() {
1259 let pre = "$ ";
1260 let cur = "$ ";
1261 assert_eq!(extract_response(pre, cur), "");
1262 }
1263
1264 #[test]
1265 fn content_hash_deterministic() {
1266 assert_eq!(content_hash("hello"), content_hash("hello"));
1267 assert_ne!(content_hash("hello"), content_hash("world"));
1268 }
1269
1270 #[test]
1271 fn shell_single_quote_escapes_embedded_quote() {
1272 assert_eq!(shell_single_quote("fix user's bug"), "fix user'\\''s bug");
1273 }
1274
1275 #[test]
1276 fn supervised_command_contains_watchdog_and_exec() {
1277 let command = build_supervised_agent_command("kiro-cli chat 'hello'", 4242);
1278 assert!(command.contains("shim_pid=4242"));
1279 assert!(command.contains("agent_root_pid=$$"));
1280 assert!(command.contains("agent_pgid=$$"));
1281 assert!(command.contains("setsid sh -c"));
1282 assert!(command.contains("shim_pid=\"$1\""));
1283 assert!(command.contains("agent_pgid=\"$2\""));
1284 assert!(command.contains("agent_root_pid=\"$3\""));
1285 assert!(command.contains("collect_descendants()"));
1286 assert!(command.contains("pgrep -P \"$parent_pid\""));
1287 assert!(command.contains("descendant_pids=$(collect_descendants \"$agent_root_pid\")"));
1288 assert!(command.contains("kill -TERM -- -\"$agent_pgid\""));
1289 assert!(command.contains("kill -TERM \"$descendant_pid\""));
1290 assert!(command.contains("kill -KILL -- -\"$agent_pgid\""));
1291 assert!(command.contains("kill -KILL \"$descendant_pid\""));
1292 assert!(command.contains("' _ \"$shim_pid\" \"$agent_pgid\" \"$agent_root_pid\""));
1293 assert!(command.contains("exec bash -lc 'kiro-cli chat '\\''hello'\\'''"));
1294 }
1295
1296 #[test]
1297 fn is_prompt_line_shell_dollar() {
1298 assert!(is_prompt_line("user@host:~$ "));
1299 assert!(is_prompt_line("$"));
1300 }
1301
1302 #[test]
1303 fn is_prompt_line_claude() {
1304 assert!(is_prompt_line("\u{276F}"));
1305 assert!(is_prompt_line("\u{276F} "));
1306 }
1307
1308 #[test]
1309 fn is_prompt_line_codex() {
1310 assert!(is_prompt_line("\u{203A}"));
1311 assert!(is_prompt_line("\u{203A} "));
1312 }
1313
1314 #[test]
1315 fn is_prompt_line_kiro() {
1316 assert!(is_prompt_line("Kiro>"));
1317 assert!(is_prompt_line(">"));
1318 }
1319
1320 #[test]
1321 fn is_prompt_line_not_prompt() {
1322 assert!(!is_prompt_line("hello world"));
1323 assert!(!is_prompt_line("some output here"));
1324 }
1325
1326 #[test]
1327 fn build_transition_events_working_to_idle() {
1328 let events = build_transition_events(
1329 ShimState::Working,
1330 ShimState::Idle,
1331 "summary",
1332 "pre\n$ ",
1333 "pre\nhello\n$ ",
1334 "",
1335 Some("msg-1".into()),
1336 );
1337 assert_eq!(events.len(), 2);
1338 assert!(matches!(&events[0], Event::StateChanged { .. }));
1339 assert!(matches!(&events[1], Event::Completion { .. }));
1340 }
1341
1342 #[test]
1343 fn build_transition_events_to_context_exhausted() {
1344 let events = build_transition_events(
1345 ShimState::Working,
1346 ShimState::ContextExhausted,
1347 "summary",
1348 "",
1349 "",
1350 "",
1351 None,
1352 );
1353 assert_eq!(events.len(), 2);
1355 assert!(matches!(&events[1], Event::ContextExhausted { .. }));
1356 }
1357
1358 #[test]
1359 fn build_transition_events_starting_to_idle() {
1360 let events = build_transition_events(
1361 ShimState::Starting,
1362 ShimState::Idle,
1363 "summary",
1364 "",
1365 "",
1366 "",
1367 None,
1368 );
1369 assert_eq!(events.len(), 1);
1370 assert!(matches!(&events[0], Event::StateChanged { .. }));
1371 }
1372
1373 fn make_queued_msg(id: &str, body: &str) -> QueuedMessage {
1378 QueuedMessage {
1379 from: "user".into(),
1380 body: body.into(),
1381 message_id: Some(id.into()),
1382 }
1383 }
1384
1385 #[test]
1386 fn queue_enqueue_basic() {
1387 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1388 queue.push_back(make_queued_msg("m1", "hello"));
1389 queue.push_back(make_queued_msg("m2", "world"));
1390 assert_eq!(queue.len(), 2);
1391 }
1392
1393 #[test]
1394 fn queue_fifo_order() {
1395 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1396 queue.push_back(make_queued_msg("m1", "first"));
1397 queue.push_back(make_queued_msg("m2", "second"));
1398 queue.push_back(make_queued_msg("m3", "third"));
1399
1400 let msg = queue.pop_front().unwrap();
1401 assert_eq!(msg.message_id.as_deref(), Some("m1"));
1402 assert_eq!(msg.body, "first");
1403
1404 let msg = queue.pop_front().unwrap();
1405 assert_eq!(msg.message_id.as_deref(), Some("m2"));
1406 assert_eq!(msg.body, "second");
1407
1408 let msg = queue.pop_front().unwrap();
1409 assert_eq!(msg.message_id.as_deref(), Some("m3"));
1410 assert_eq!(msg.body, "third");
1411
1412 assert!(queue.is_empty());
1413 }
1414
1415 #[test]
1416 fn queue_overflow_drops_oldest() {
1417 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1418
1419 for i in 0..MAX_QUEUE_DEPTH {
1421 queue.push_back(make_queued_msg(&format!("m{i}"), &format!("msg {i}")));
1422 }
1423 assert_eq!(queue.len(), MAX_QUEUE_DEPTH);
1424
1425 assert!(queue.len() >= MAX_QUEUE_DEPTH);
1427 let dropped = queue.pop_front().unwrap();
1428 assert_eq!(dropped.message_id.as_deref(), Some("m0")); queue.push_back(make_queued_msg("m_new", "new message"));
1430 assert_eq!(queue.len(), MAX_QUEUE_DEPTH);
1431
1432 let first = queue.pop_front().unwrap();
1434 assert_eq!(first.message_id.as_deref(), Some("m1"));
1435 }
1436
1437 #[test]
1438 fn drain_queue_errors_empty() {
1439 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1440 let events = drain_queue_errors(&mut queue, ShimState::Dead);
1441 assert!(events.is_empty());
1442 }
1443
1444 #[test]
1445 fn drain_queue_errors_with_messages() {
1446 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1447 queue.push_back(make_queued_msg("m1", "hello"));
1448 queue.push_back(make_queued_msg("m2", "world"));
1449 queue.push_back(QueuedMessage {
1450 from: "user".into(),
1451 body: "no id".into(),
1452 message_id: None,
1453 });
1454
1455 let events = drain_queue_errors(&mut queue, ShimState::Dead);
1456 assert_eq!(events.len(), 3);
1457 assert!(queue.is_empty());
1458
1459 for event in &events {
1461 assert!(matches!(event, Event::Error { .. }));
1462 }
1463
1464 if let Event::Error { reason, .. } = &events[0] {
1466 assert!(reason.contains("dead"));
1467 assert!(reason.contains("m1"));
1468 }
1469
1470 if let Event::Error { reason, .. } = &events[2] {
1472 assert!(!reason.contains("(id:"));
1473 }
1474 }
1475
1476 #[test]
1477 fn drain_queue_errors_context_exhausted() {
1478 let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
1479 queue.push_back(make_queued_msg("m1", "hello"));
1480
1481 let events = drain_queue_errors(&mut queue, ShimState::ContextExhausted);
1482 assert_eq!(events.len(), 1);
1483 if let Event::Error { reason, .. } = &events[0] {
1484 assert!(reason.contains("context_exhausted"));
1485 }
1486 }
1487
1488 #[test]
1489 fn queued_message_preserves_fields() {
1490 let msg = QueuedMessage {
1491 from: "manager".into(),
1492 body: "do this task".into(),
1493 message_id: Some("msg-42".into()),
1494 };
1495 assert_eq!(msg.from, "manager");
1496 assert_eq!(msg.body, "do this task");
1497 assert_eq!(msg.message_id.as_deref(), Some("msg-42"));
1498 }
1499
1500 #[test]
1501 fn queued_message_none_id() {
1502 let msg = QueuedMessage {
1503 from: "user".into(),
1504 body: "anonymous".into(),
1505 message_id: None,
1506 };
1507 assert!(msg.message_id.is_none());
1508 }
1509
1510 #[test]
1511 fn max_queue_depth_is_16() {
1512 assert_eq!(MAX_QUEUE_DEPTH, 16);
1513 }
1514
1515 #[test]
1516 fn format_injected_message_includes_sender_and_reply_target() {
1517 let formatted = format_injected_message("human", "what is 2+2?");
1518 assert!(formatted.contains("--- Message from human ---"));
1519 assert!(formatted.contains("Reply-To: human"));
1520 assert!(formatted.contains("batty send human"));
1521 assert!(formatted.ends_with("what is 2+2?"));
1522 }
1523
1524 #[test]
1525 fn format_injected_message_uses_sender_as_reply_target() {
1526 let formatted = format_injected_message("manager", "status?");
1527 assert!(formatted.contains("Reply-To: manager"));
1528 assert!(formatted.contains("batty send manager"));
1529 }
1530
1531 #[test]
1532 fn sanitize_summary_strips_tui_chrome_and_prompt_lines() {
1533 let summary = "────────────────────\n❯ \n ⏵⏵ bypass permissions on\nThe answer is 4\n";
1534 assert_eq!(sanitize_summary(summary), "The answer is 4");
1535 }
1536
1537 #[test]
1538 fn sanitize_summary_keeps_multiline_meaningful_content() {
1539 let summary = " Root cause: stale resume id\n\n Fix: retry with fresh start\n";
1540 assert_eq!(
1541 sanitize_summary(summary),
1542 "Root cause: stale resume id\nFix: retry with fresh start"
1543 );
1544 }
1545
1546 #[test]
1551 fn is_tui_chrome_horizontal_rule() {
1552 assert!(is_tui_chrome("────────────────────────────────────"));
1553 assert!(is_tui_chrome(" ───────── "));
1554 assert!(is_tui_chrome("━━━━━━━━━━━━━━━━━━━━"));
1555 }
1556
1557 #[test]
1558 fn is_tui_chrome_status_bar() {
1559 assert!(is_tui_chrome(
1560 " \u{23F5}\u{23F5} bypass permissions on (shift+tab to toggle)"
1561 ));
1562 assert!(is_tui_chrome(" bypass permissions on"));
1563 assert!(is_tui_chrome(" shift+tab"));
1564 }
1565
1566 #[test]
1567 fn is_tui_chrome_cost_line() {
1568 assert!(is_tui_chrome("$0.01 · 2.3k tokens"));
1569 }
1570
1571 #[test]
1572 fn is_tui_chrome_not_content() {
1573 assert!(!is_tui_chrome("Hello, world!"));
1574 assert!(!is_tui_chrome("The answer is 4"));
1575 assert!(!is_tui_chrome("")); assert!(!is_tui_chrome(" some output "));
1577 }
1578
1579 #[test]
1580 fn extract_response_strips_chrome() {
1581 let pre = "idle screen\n\u{276F} ";
1582 let cur = "\u{276F} Hello\n\nThe answer is 42\n\n\
1583 ────────────────────\n\
1584 \u{23F5}\u{23F5} bypass permissions on\n\
1585 \u{276F} ";
1586 let resp = extract_response(pre, cur);
1587 assert!(resp.contains("42"), "should contain the answer: {resp}");
1588 assert!(
1589 !resp.contains("────"),
1590 "should strip horizontal rule: {resp}"
1591 );
1592 assert!(!resp.contains("bypass"), "should strip status bar: {resp}");
1593 }
1594
1595 #[test]
1596 fn extract_response_strips_echoed_input() {
1597 let pre = "\u{276F} ";
1598 let cur = "\u{276F} What is 2+2?\n\n4\n\n\u{276F} ";
1599 let resp = extract_response(pre, cur);
1600 assert!(resp.contains('4'), "should contain answer: {resp}");
1601 assert!(
1602 !resp.contains("What is 2+2"),
1603 "should strip echoed input: {resp}"
1604 );
1605 }
1606
1607 #[test]
1608 fn extract_response_tui_full_rewrite() {
1609 let pre = "Welcome to Claude\n\n\u{276F} ";
1611 let cur = "\u{276F} Hello\n\nHello! How can I help?\n\n\
1612 ────────────────────\n\
1613 \u{276F} ";
1614 let resp = extract_response(pre, cur);
1615 assert!(
1616 resp.contains("Hello! How can I help?"),
1617 "should extract response from TUI rewrite: {resp}"
1618 );
1619 }
1620
1621 #[test]
1622 fn strip_claude_bullets_removes_marker() {
1623 assert_eq!(strip_claude_bullets("\u{23FA} 4"), "4");
1624 assert_eq!(
1625 strip_claude_bullets(" \u{23FA} hello world"),
1626 " hello world"
1627 );
1628 assert_eq!(strip_claude_bullets("no bullet here"), "no bullet here");
1629 assert_eq!(strip_claude_bullets(""), "");
1630 }
1631
1632 #[test]
1633 fn extract_response_strips_claude_bullets() {
1634 let pre = "\u{276F} ";
1635 let cur = "\u{276F} question\n\n\u{23FA} 42\n\n\u{276F} ";
1636 let resp = extract_response(pre, cur);
1637 assert!(resp.contains("42"), "should contain answer: {resp}");
1638 assert!(
1639 !resp.contains('\u{23FA}'),
1640 "should strip bullet marker: {resp}"
1641 );
1642 }
1643}