Skip to main content

batty_cli/shim/
runtime_kiro.rs

//! Kiro ACP-mode shim runtime: communicates with Kiro CLI via JSON-RPC 2.0
//! on stdin/stdout using the Agent Client Protocol (ACP).
//!
//! Like the Claude SDK runtime (`runtime_sdk.rs`), this uses a persistent
//! subprocess with bidirectional NDJSON. The protocol differs: ACP requires
//! an initialization handshake (`initialize` + `session/new`) before prompts
//! can be sent, and uses JSON-RPC 2.0 framing.
//!
//! Emits the same `Command`/`Event` protocol to the orchestrator as all other
//! runtimes, making it transparent to upstream consumers.
11
12use std::collections::VecDeque;
13use std::io::{BufRead, BufReader, Write as IoWrite};
14use std::process::{Child, Command, Stdio};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, Result};
21
22use super::common::{
23    self, MAX_QUEUE_DEPTH, QueuedMessage, SESSION_STATS_INTERVAL_SECS, drain_queue_errors,
24    format_injected_message,
25};
26use super::kiro_types::{self, AcpMessage};
27use super::protocol::{Channel, Command as ShimCommand, Event, ShimState};
28use super::pty_log::PtyLogWriter;
29use super::runtime::ShimArgs;
30
31// ---------------------------------------------------------------------------
32// Configuration
33// ---------------------------------------------------------------------------
34
/// Sleep interval, in milliseconds, between polls of shared state.
///
/// `run_kiro_acp` uses it to pace the loop that waits for the ACP handshake
/// to finish. NOTE(review): the name suggests it also paces child-exit
/// polling elsewhere in this file — confirm.
const PROCESS_EXIT_POLL_MS: u64 = 100;

/// Grace period, in seconds.
///
/// NOTE(review): presumably the time a terminated process group is given
/// before it is force-killed — confirm against `terminate_child`.
const GROUP_TERM_GRACE_SECS: u64 = 2;

/// Upper bound, in seconds, on the whole initialization handshake
/// (`initialize` followed by `session/new`).
const INIT_TIMEOUT_SECS: u64 = 30;

/// Context-usage percentage at or above which the context is treated as
/// exhausted.
const CONTEXT_EXHAUSTION_THRESHOLD: f64 = 98.0;
43
44// ---------------------------------------------------------------------------
45// Shared state
46// ---------------------------------------------------------------------------
47
48struct KiroState {
49    state: ShimState,
50    state_changed_at: Instant,
51    started_at: Instant,
52    /// ACP session ID from `session/new` response.
53    session_id: String,
54    /// Accumulated assistant response text for the current turn.
55    accumulated_response: String,
56    /// Message ID of the currently pending (in-flight) message.
57    pending_message_id: Option<String>,
58    /// Messages queued while the agent is in Working state.
59    message_queue: VecDeque<QueuedMessage>,
60    /// Total bytes of response text received.
61    cumulative_output_bytes: u64,
62    /// Whether the initialization handshake is complete.
63    initialized: bool,
64    /// Whether we've sent the session/new request (awaiting its response).
65    sent_session_new: bool,
66    /// Pending prompt request ID (to match result).
67    pending_prompt_request_id: Option<u64>,
68    /// Whether a ContextApproaching event has already been emitted this session.
69    context_approaching_emitted: bool,
70}
71
/// Process-wide counter that backs JSON-RPC request IDs. Starts at 0 and only
/// ever grows.
static REQUEST_ID: AtomicU64 = AtomicU64::new(0);

/// Hands out the next JSON-RPC request ID.
///
/// Returns the counter's value *before* the increment, so the first call
/// yields 0 and each later call yields the previous result plus one.
fn next_request_id() -> u64 {
    // Relaxed is sufficient: only the uniqueness of each value matters, not
    // its ordering relative to other memory operations.
    REQUEST_ID.fetch_add(1, Ordering::Relaxed)
}
78
79// ---------------------------------------------------------------------------
80// Main entry point
81// ---------------------------------------------------------------------------
82
83/// Run the Kiro ACP-mode shim. Does not return until the shim exits.
84///
85/// `channel` is the pre-connected socket to the orchestrator (fd 3 or socketpair).
86/// `args.cmd` must launch `kiro-cli acp --trust-all-tools`.
87pub fn run_kiro_acp(args: ShimArgs, channel: Channel) -> Result<()> {
88    // -- Spawn subprocess with piped stdin/stdout/stderr --
89    let mut child = Command::new("bash")
90        .args(["-lc", &args.cmd])
91        .current_dir(&args.cwd)
92        .stdin(Stdio::piped())
93        .stdout(Stdio::piped())
94        .stderr(Stdio::piped())
95        .env_remove("CLAUDECODE")
96        .spawn()
97        .with_context(|| format!("[shim-kiro {}] failed to spawn kiro-cli acp", args.id))?;
98
99    let child_pid = child.id();
100    eprintln!(
101        "[shim-kiro {}] spawned kiro-cli acp (pid {})",
102        args.id, child_pid
103    );
104
105    let child_stdin = child.stdin.take().context("failed to take child stdin")?;
106    let child_stdout = child.stdout.take().context("failed to take child stdout")?;
107    let child_stderr = child.stderr.take().context("failed to take child stderr")?;
108
109    // Shared state
110    let state = Arc::new(Mutex::new(KiroState {
111        state: ShimState::Starting,
112        state_changed_at: Instant::now(),
113        started_at: Instant::now(),
114        session_id: String::new(),
115        accumulated_response: String::new(),
116        pending_message_id: None,
117        message_queue: VecDeque::new(),
118        cumulative_output_bytes: 0,
119        initialized: false,
120        sent_session_new: false,
121        pending_prompt_request_id: None,
122        context_approaching_emitted: false,
123    }));
124
125    // Shared stdin writer — wrapped in Option so Shutdown can take and close it.
126    let stdin_writer = Arc::new(Mutex::new(Some(child_stdin)));
127
128    // PTY log writer (optional)
129    let pty_log: Option<Arc<Mutex<PtyLogWriter>>> = args
130        .pty_log_path
131        .as_deref()
132        .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
133        .transpose()?
134        .map(|w| Arc::new(Mutex::new(w)));
135
136    // Channel clones
137    let mut cmd_channel = channel;
138    let mut evt_channel = cmd_channel
139        .try_clone()
140        .context("failed to clone channel for stdout reader")?;
141
142    // -- Send initialization handshake --
143    {
144        let init_req = kiro_types::initialize_request(next_request_id());
145        let ndjson = init_req.to_ndjson();
146        write_stdin(&stdin_writer, &ndjson);
147        eprintln!("[shim-kiro {}] sent initialize request", args.id);
148    }
149
150    // -- stdout reader thread --
151    let state_stdout = Arc::clone(&state);
152    let stdin_for_approve = Arc::clone(&stdin_writer);
153    let pty_log_stdout = pty_log.clone();
154    let shim_id = args.id.clone();
155    let cwd_for_init = args.cwd.to_string_lossy().to_string();
156    let stdout_handle = thread::spawn(move || {
157        let reader = BufReader::new(child_stdout);
158        for line_result in reader.lines() {
159            let line = match line_result {
160                Ok(l) => l,
161                Err(e) => {
162                    eprintln!("[shim-kiro {shim_id}] stdout read error: {e}");
163                    break;
164                }
165            };
166
167            if line.trim().is_empty() {
168                continue;
169            }
170
171            let msg: AcpMessage = match serde_json::from_str(&line) {
172                Ok(m) => m,
173                Err(e) => {
174                    eprintln!("[shim-kiro {shim_id}] ignoring unparseable NDJSON: {e}");
175                    continue;
176                }
177            };
178
179            // -- Handle responses to our requests --
180            if msg.is_response() {
181                let msg_id = msg.id.unwrap();
182
183                if let Some(ref error) = msg.error {
184                    eprintln!("[shim-kiro {shim_id}] JSON-RPC error (id={msg_id}): {error}");
185                    // Check if this was a prompt request that failed
186                    let mut st = state_stdout.lock().unwrap();
187                    if st.pending_prompt_request_id == Some(msg_id) {
188                        st.pending_prompt_request_id = None;
189                        let error_text = error
190                            .get("message")
191                            .and_then(|m| m.as_str())
192                            .unwrap_or("unknown error");
193
194                        if common::detect_context_exhausted(error_text) {
195                            let last_lines = last_n_lines_of(&st.accumulated_response, 5);
196                            let old = st.state;
197                            st.state = ShimState::ContextExhausted;
198                            st.state_changed_at = Instant::now();
199                            let drain = drain_queue_errors(
200                                &mut st.message_queue,
201                                ShimState::ContextExhausted,
202                            );
203                            drop(st);
204                            let _ = evt_channel.send(&Event::StateChanged {
205                                from: old,
206                                to: ShimState::ContextExhausted,
207                                summary: last_lines.clone(),
208                            });
209                            let _ = evt_channel.send(&Event::ContextExhausted {
210                                message: error_text.to_string(),
211                                last_lines,
212                            });
213                            for event in drain {
214                                let _ = evt_channel.send(&event);
215                            }
216                        } else {
217                            // Non-exhaustion error on prompt — complete with error
218                            let response = std::mem::take(&mut st.accumulated_response);
219                            let last_lines = last_n_lines_of(&response, 5);
220                            let msg_id_out = st.pending_message_id.take();
221                            st.state = ShimState::Idle;
222                            st.state_changed_at = Instant::now();
223                            drop(st);
224                            let _ = evt_channel.send(&Event::StateChanged {
225                                from: ShimState::Working,
226                                to: ShimState::Idle,
227                                summary: last_lines.clone(),
228                            });
229                            let _ = evt_channel.send(&Event::Completion {
230                                message_id: msg_id_out,
231                                response: format!("[error] {error_text}"),
232                                last_lines,
233                            });
234                        }
235                    }
236                    continue;
237                }
238
239                if let Some(ref result) = msg.result {
240                    let mut st = state_stdout.lock().unwrap();
241
242                    // Check if this is the initialize response (before session/new was sent)
243                    if !st.initialized && !st.sent_session_new {
244                        // This is the initialize response — now send session/new
245                        st.sent_session_new = true;
246                        drop(st);
247                        let session_req =
248                            kiro_types::session_new_request(next_request_id(), &cwd_for_init);
249                        let ndjson = session_req.to_ndjson();
250                        write_stdin(&stdin_for_approve, &ndjson);
251                        eprintln!("[shim-kiro {shim_id}] sent session/new request");
252                        continue;
253                    }
254
255                    // Check if this is the session/new response
256                    if !st.initialized {
257                        if let Some(sid) = kiro_types::extract_session_id(result) {
258                            st.session_id = sid.to_string();
259                            st.initialized = true;
260                            st.state = ShimState::Idle;
261                            st.state_changed_at = Instant::now();
262                            eprintln!("[shim-kiro {shim_id}] session created: {}", st.session_id);
263                            drop(st);
264
265                            // Now emit Ready — agent is ready for messages
266                            let _ = evt_channel.send(&Event::Ready);
267                        }
268                        continue;
269                    }
270
271                    // Check if this is a prompt result (turn completed)
272                    if st.pending_prompt_request_id == Some(msg_id) {
273                        st.pending_prompt_request_id = None;
274                        let response = if st.accumulated_response.is_empty() {
275                            result
276                                .get("result")
277                                .and_then(|r| r.as_str())
278                                .unwrap_or("")
279                                .to_string()
280                        } else {
281                            std::mem::take(&mut st.accumulated_response)
282                        };
283                        let last_lines = last_n_lines_of(&response, 5);
284                        let completed_msg_id = st.pending_message_id.take();
285                        let old = st.state;
286                        st.state = ShimState::Idle;
287                        st.state_changed_at = Instant::now();
288
289                        // Drain queue
290                        let queued_msg = if !st.message_queue.is_empty() {
291                            st.message_queue.pop_front()
292                        } else {
293                            None
294                        };
295                        if let Some(ref qm) = queued_msg {
296                            st.pending_message_id = qm.message_id.clone();
297                            st.state = ShimState::Working;
298                            st.state_changed_at = Instant::now();
299                            st.accumulated_response.clear();
300                        }
301                        let session_id = st.session_id.clone();
302                        let queue_depth = st.message_queue.len();
303                        drop(st);
304
305                        let _ = evt_channel.send(&Event::StateChanged {
306                            from: old,
307                            to: ShimState::Idle,
308                            summary: last_lines.clone(),
309                        });
310                        let _ = evt_channel.send(&Event::Completion {
311                            message_id: completed_msg_id,
312                            response,
313                            last_lines,
314                        });
315
316                        // Inject queued message
317                        if let Some(qm) = queued_msg {
318                            let text = format_injected_message(&qm.from, &qm.body);
319                            let req_id = next_request_id();
320                            let prompt_req =
321                                kiro_types::session_prompt_request(req_id, &session_id, &text);
322                            let ndjson = prompt_req.to_ndjson();
323                            write_stdin(&stdin_for_approve, &ndjson);
324                            let mut st = state_stdout.lock().unwrap();
325                            st.pending_prompt_request_id = Some(req_id);
326                            drop(st);
327
328                            let _ = evt_channel.send(&Event::StateChanged {
329                                from: ShimState::Idle,
330                                to: ShimState::Working,
331                                summary: format!(
332                                    "delivering queued message ({queue_depth} remaining)"
333                                ),
334                            });
335                        }
336                    }
337                }
338                continue;
339            }
340
341            // -- Handle notifications --
342            if msg.is_notification() {
343                let method = msg.method.as_deref().unwrap_or("");
344                let params = msg.params.as_ref();
345
346                match method {
347                    "session/update" => {
348                        if let Some(params) = params {
349                            let update_type = kiro_types::extract_update_type(params).unwrap_or("");
350
351                            match update_type {
352                                "agent_message_chunk" | "AgentMessageChunk" => {
353                                    if let Some(text) =
354                                        kiro_types::extract_message_chunk_text(params)
355                                    {
356                                        if !text.is_empty() {
357                                            let mut st = state_stdout.lock().unwrap();
358                                            st.accumulated_response.push_str(text);
359                                            st.cumulative_output_bytes += text.len() as u64;
360
361                                            // Check for context approaching limit.
362                                            if !st.context_approaching_emitted
363                                                && common::detect_context_approaching_limit(text)
364                                            {
365                                                st.context_approaching_emitted = true;
366                                                drop(st);
367                                                let _ = evt_channel.send(&Event::ContextApproaching {
368                                                    message: "Agent output contains context-pressure signals".into(),
369                                                    input_tokens: 0,
370                                                    output_tokens: 0,
371                                                });
372                                            } else {
373                                                drop(st);
374                                            }
375
376                                            if let Some(ref log) = pty_log_stdout {
377                                                let _ = log.lock().unwrap().write(text.as_bytes());
378                                            }
379                                        }
380                                    }
381                                }
382
383                                "agent_thought_chunk" => {
384                                    // Agent thinking — log but don't accumulate
385                                    if let Some(text) =
386                                        kiro_types::extract_message_chunk_text(params)
387                                    {
388                                        if let Some(ref log) = pty_log_stdout {
389                                            let _ = log
390                                                .lock()
391                                                .unwrap()
392                                                .write(format!("[thought] {text}").as_bytes());
393                                        }
394                                    }
395                                }
396
397                                "tool_call" | "ToolCall" => {
398                                    // Log tool calls for visibility
399                                    let title = params
400                                        .get("update")
401                                        .and_then(|u| u.get("title"))
402                                        .and_then(|t| t.as_str())
403                                        .unwrap_or("unknown tool");
404                                    if let Some(ref log) = pty_log_stdout {
405                                        let _ = log
406                                            .lock()
407                                            .unwrap()
408                                            .write(format!("[tool] {title}\n").as_bytes());
409                                    }
410                                }
411
412                                "tool_call_update" | "ToolCallUpdate" => {
413                                    // Tool progress — no action needed
414                                }
415
416                                "TurnEnd" | "turn_end" => {
417                                    // Turn ended via notification.
418                                    // The prompt result response will handle the state transition,
419                                    // but some ACP agents send TurnEnd before the result.
420                                    // We do nothing here — the result handler above transitions state.
421                                }
422
423                                _ => {
424                                    // Unknown update type — silently ignore (future-proof)
425                                }
426                            }
427                        }
428                    }
429
430                    "_kiro.dev/metadata" => {
431                        // Check for context exhaustion via high usage percentage
432                        if let Some(params) = params {
433                            if let Some(usage) = kiro_types::extract_context_usage(params) {
434                                if usage >= CONTEXT_EXHAUSTION_THRESHOLD {
435                                    let mut st = state_stdout.lock().unwrap();
436                                    let last_lines = last_n_lines_of(&st.accumulated_response, 5);
437                                    let old = st.state;
438                                    st.state = ShimState::ContextExhausted;
439                                    st.state_changed_at = Instant::now();
440                                    let drain = drain_queue_errors(
441                                        &mut st.message_queue,
442                                        ShimState::ContextExhausted,
443                                    );
444                                    drop(st);
445
446                                    let _ = evt_channel.send(&Event::StateChanged {
447                                        from: old,
448                                        to: ShimState::ContextExhausted,
449                                        summary: last_lines.clone(),
450                                    });
451                                    let _ = evt_channel.send(&Event::ContextExhausted {
452                                        message: format!("context usage at {usage:.1}%"),
453                                        last_lines,
454                                    });
455                                    for event in drain {
456                                        let _ = evt_channel.send(&event);
457                                    }
458                                }
459                            }
460                        }
461                    }
462
463                    "_kiro.dev/compaction/status" | "_kiro.dev/clear/status" => {
464                        // Informational — log only
465                        eprintln!("[shim-kiro {shim_id}] {method}: {params:?}");
466                    }
467
468                    _ => {
469                        // Unknown notification — silently ignore
470                    }
471                }
472                continue;
473            }
474
475            // -- Handle agent-initiated requests --
476            if msg.is_agent_request() {
477                let method = msg.method.as_deref().unwrap_or("");
478                let request_id = msg.id.unwrap();
479
480                match method {
481                    "session/request_permission" => {
482                        // Auto-approve all permission requests
483                        let resp = kiro_types::permission_approve_response(request_id);
484                        let ndjson = resp.to_ndjson();
485                        write_stdin(&stdin_for_approve, &ndjson);
486                        eprintln!(
487                            "[shim-kiro {shim_id}] auto-approved permission request {request_id}"
488                        );
489                    }
490
491                    "fs/read_text_file" | "fs/write_text_file" | "terminal/create"
492                    | "terminal/kill" => {
493                        // Client-side operations — we don't support these, respond with error
494                        let error_resp = serde_json::json!({
495                            "jsonrpc": "2.0",
496                            "id": request_id,
497                            "error": {
498                                "code": -32601,
499                                "message": format!("method not supported by batty shim: {method}")
500                            }
501                        });
502                        let ndjson = serde_json::to_string(&error_resp).unwrap();
503                        write_stdin(&stdin_for_approve, &ndjson);
504                    }
505
506                    _ => {
507                        // Unknown agent request — respond with method not found
508                        let error_resp = serde_json::json!({
509                            "jsonrpc": "2.0",
510                            "id": request_id,
511                            "error": {
512                                "code": -32601,
513                                "message": format!("unknown method: {method}")
514                            }
515                        });
516                        let ndjson = serde_json::to_string(&error_resp).unwrap();
517                        write_stdin(&stdin_for_approve, &ndjson);
518                    }
519                }
520                continue;
521            }
522        }
523
524        // stdout EOF — agent process closed
525        let mut st = state_stdout.lock().unwrap();
526        let last_lines = last_n_lines_of(&st.accumulated_response, 10);
527        let old = st.state;
528        st.state = ShimState::Dead;
529        st.state_changed_at = Instant::now();
530
531        let drain = drain_queue_errors(&mut st.message_queue, ShimState::Dead);
532        drop(st);
533
534        let _ = evt_channel.send(&Event::StateChanged {
535            from: old,
536            to: ShimState::Dead,
537            summary: last_lines.clone(),
538        });
539        let _ = evt_channel.send(&Event::Died {
540            exit_code: None,
541            last_lines,
542        });
543        for event in drain {
544            let _ = evt_channel.send(&event);
545        }
546    });
547
548    // -- stderr reader thread --
549    let shim_id_err = args.id.clone();
550    let pty_log_stderr = pty_log;
551    thread::spawn(move || {
552        let reader = BufReader::new(child_stderr);
553        for line_result in reader.lines() {
554            match line_result {
555                Ok(line) => {
556                    eprintln!("[shim-kiro {shim_id_err}] stderr: {line}");
557                    if let Some(ref log) = pty_log_stderr {
558                        let _ = log
559                            .lock()
560                            .unwrap()
561                            .write(format!("[stderr] {line}\n").as_bytes());
562                    }
563                }
564                Err(_) => break,
565            }
566        }
567    });
568
569    // -- Session stats thread --
570    let state_stats = Arc::clone(&state);
571    let mut stats_channel = cmd_channel
572        .try_clone()
573        .context("failed to clone channel for stats")?;
574    thread::spawn(move || {
575        loop {
576            thread::sleep(Duration::from_secs(SESSION_STATS_INTERVAL_SECS));
577            let st = state_stats.lock().unwrap();
578            if st.state == ShimState::Dead {
579                return;
580            }
581            let output_bytes = st.cumulative_output_bytes;
582            let uptime_secs = st.started_at.elapsed().as_secs();
583            drop(st);
584
585            if stats_channel
586                .send(&Event::SessionStats {
587                    output_bytes,
588                    uptime_secs,
589                    input_tokens: 0,
590                    output_tokens: 0,
591                    context_usage_pct: None,
592                })
593                .is_err()
594            {
595                return;
596            }
597        }
598    });
599
600    // -- Wait for initialization to complete before accepting commands --
601    {
602        let deadline = Instant::now() + Duration::from_secs(INIT_TIMEOUT_SECS);
603        loop {
604            let st = state.lock().unwrap();
605            if st.initialized {
606                break;
607            }
608            if st.state == ShimState::Dead {
609                eprintln!("[shim-kiro {}] agent died during initialization", args.id);
610                return Ok(());
611            }
612            drop(st);
613
614            if Instant::now() > deadline {
615                eprintln!(
616                    "[shim-kiro {}] initialization timed out after {}s",
617                    args.id, INIT_TIMEOUT_SECS
618                );
619                terminate_child(&mut child);
620                return Ok(());
621            }
622            thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
623        }
624    }
625
626    // -- Command loop (main thread) --
627    let state_cmd = Arc::clone(&state);
628    loop {
629        let cmd = match cmd_channel.recv::<ShimCommand>() {
630            Ok(Some(c)) => c,
631            Ok(None) => {
632                eprintln!(
633                    "[shim-kiro {}] orchestrator disconnected, shutting down",
634                    args.id
635                );
636                terminate_child(&mut child);
637                break;
638            }
639            Err(e) => {
640                eprintln!("[shim-kiro {}] channel error: {e}", args.id);
641                terminate_child(&mut child);
642                break;
643            }
644        };
645
646        match cmd {
647            ShimCommand::SendMessage {
648                from,
649                body,
650                message_id,
651            } => {
652                let delivery_id = message_id.clone();
653                let mut st = state_cmd.lock().unwrap();
654                match st.state {
655                    ShimState::Idle => {
656                        st.pending_message_id = message_id;
657                        st.accumulated_response.clear();
658                        let session_id = st.session_id.clone();
659                        st.state = ShimState::Working;
660                        st.state_changed_at = Instant::now();
661
662                        let req_id = next_request_id();
663                        st.pending_prompt_request_id = Some(req_id);
664                        drop(st);
665
666                        let text = format_injected_message(&from, &body);
667                        let prompt_req =
668                            kiro_types::session_prompt_request(req_id, &session_id, &text);
669                        let ndjson = prompt_req.to_ndjson();
670
671                        if !write_stdin(&stdin_writer, &ndjson) {
672                            if let Some(id) = delivery_id {
673                                cmd_channel.send(&Event::DeliveryFailed {
674                                    id,
675                                    reason: "stdin write failed (closed)".into(),
676                                })?;
677                            }
678                            cmd_channel.send(&Event::Error {
679                                command: "SendMessage".into(),
680                                reason: "stdin write failed (closed)".into(),
681                            })?;
682                            continue;
683                        }
684
685                        if let Some(id) = delivery_id {
686                            cmd_channel.send(&Event::MessageDelivered { id })?;
687                        }
688                        cmd_channel.send(&Event::StateChanged {
689                            from: ShimState::Idle,
690                            to: ShimState::Working,
691                            summary: String::new(),
692                        })?;
693                    }
694                    ShimState::Working => {
695                        // Queue the message
696                        if st.message_queue.len() >= MAX_QUEUE_DEPTH {
697                            let dropped = st.message_queue.pop_front();
698                            let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
699                            st.message_queue.push_back(QueuedMessage {
700                                from,
701                                body,
702                                message_id,
703                            });
704                            let depth = st.message_queue.len();
705                            drop(st);
706
707                            cmd_channel.send(&Event::Error {
708                                command: "SendMessage".into(),
709                                reason: format!(
710                                    "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
711                                    dropped_id
712                                        .map(|id| format!(" (id: {id})"))
713                                        .unwrap_or_default(),
714                                ),
715                            })?;
716                            cmd_channel.send(&Event::Warning {
717                                message: format!(
718                                    "message queued while agent working (depth: {depth})"
719                                ),
720                                idle_secs: None,
721                            })?;
722                        } else {
723                            st.message_queue.push_back(QueuedMessage {
724                                from,
725                                body,
726                                message_id,
727                            });
728                            let depth = st.message_queue.len();
729                            drop(st);
730
731                            cmd_channel.send(&Event::Warning {
732                                message: format!(
733                                    "message queued while agent working (depth: {depth})"
734                                ),
735                                idle_secs: None,
736                            })?;
737                        }
738                    }
739                    other => {
740                        drop(st);
741                        cmd_channel.send(&Event::Error {
742                            command: "SendMessage".into(),
743                            reason: format!("agent in {other} state, cannot accept message"),
744                        })?;
745                    }
746                }
747            }
748
749            ShimCommand::CaptureScreen { last_n_lines } => {
750                let st = state_cmd.lock().unwrap();
751                let content = match last_n_lines {
752                    Some(n) => last_n_lines_of(&st.accumulated_response, n),
753                    None => st.accumulated_response.clone(),
754                };
755                drop(st);
756                cmd_channel.send(&Event::ScreenCapture {
757                    content,
758                    cursor_row: 0,
759                    cursor_col: 0,
760                })?;
761            }
762
763            ShimCommand::GetState => {
764                let st = state_cmd.lock().unwrap();
765                let since = st.state_changed_at.elapsed().as_secs();
766                let state = st.state;
767                drop(st);
768                cmd_channel.send(&Event::State {
769                    state,
770                    since_secs: since,
771                })?;
772            }
773
774            ShimCommand::Resize { .. } => {
775                // No-op in ACP mode — no PTY.
776            }
777
778            ShimCommand::Ping => {
779                cmd_channel.send(&Event::Pong)?;
780            }
781
782            ShimCommand::Shutdown {
783                timeout_secs,
784                reason,
785            } => {
786                eprintln!(
787                    "[shim-kiro {}] shutdown requested ({}, timeout: {}s)",
788                    args.id,
789                    reason.label(),
790                    timeout_secs
791                );
792                if let Err(error) = super::runtime::preserve_work_before_kill(&args.cwd) {
793                    eprintln!("[shim-kiro {}] work preservation failed: {error}", args.id);
794                }
795                // Take stdin out of the shared Option to truly close it.
796                // The stdout reader thread also holds an Arc clone, but taking
797                // from the Option means both sides see None.
798                if let Ok(mut guard) = stdin_writer.lock() {
799                    guard.take(); // closes ChildStdin
800                }
801                terminate_child(&mut child);
802
803                // Give the child a moment to fully exit.
804                let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
805                loop {
806                    if Instant::now() > deadline {
807                        break;
808                    }
809                    match child.try_wait() {
810                        Ok(Some(_)) => break,
811                        _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
812                    }
813                }
814                break;
815            }
816
817            ShimCommand::Kill => {
818                if let Err(error) = super::runtime::preserve_work_before_kill(&args.cwd) {
819                    eprintln!("[shim-kiro {}] work preservation failed: {error}", args.id);
820                }
821                terminate_child(&mut child);
822                break;
823            }
824        }
825    }
826
827    stdout_handle.join().ok();
828    Ok(())
829}
830
831// ---------------------------------------------------------------------------
832// Helpers
833// ---------------------------------------------------------------------------
834
/// Send one NDJSON record to the child's stdin through the shared handle.
///
/// `line` is written followed by a newline, then the writer is flushed (the
/// flush result is deliberately ignored). Returns `true` only when the write
/// itself succeeded. A poisoned mutex, an already-taken (`None`) stdin, or a
/// failed write all yield `false`.
fn write_stdin(stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>, line: &str) -> bool {
    // A poisoned lock is treated the same as a closed pipe.
    let mut slot = match stdin.lock() {
        Ok(slot) => slot,
        Err(_) => return false,
    };

    // `None` means stdin has been taken out of the shared slot (closed).
    let pipe = match slot.as_mut() {
        Some(pipe) => pipe,
        None => return false,
    };

    if writeln!(pipe, "{line}").is_err() {
        return false;
    }
    let _ = pipe.flush();
    true
}
847
848/// Terminate a child process: SIGTERM, grace period, then SIGKILL.
849fn terminate_child(child: &mut Child) {
850    let pid = child.id();
851
852    #[cfg(unix)]
853    {
854        unsafe {
855            libc::kill(pid as i32, libc::SIGTERM);
856        }
857        let deadline = Instant::now() + Duration::from_secs(GROUP_TERM_GRACE_SECS);
858        loop {
859            if Instant::now() > deadline {
860                break;
861            }
862            match child.try_wait() {
863                Ok(Some(_)) => return,
864                _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
865            }
866        }
867        unsafe {
868            libc::kill(pid as i32, libc::SIGKILL);
869        }
870    }
871
872    #[allow(unreachable_code)]
873    {
874        let _ = child.kill();
875    }
876}
877
/// Return the trailing `n` lines of `text`, joined with `\n`.
///
/// Lines are split with [`str::lines`], so line terminators are not part of
/// the result and the output never ends with a newline. If `text` has fewer
/// than `n` lines, all of them are returned; `n == 0` yields an empty string.
fn last_n_lines_of(text: &str, n: usize) -> String {
    // Walk from the end, keep at most `n` lines, then restore original order.
    let mut tail: Vec<&str> = text.lines().rev().take(n).collect();
    tail.reverse();
    tail.join("\n")
}
884
885// ---------------------------------------------------------------------------
886// Tests
887// ---------------------------------------------------------------------------
888
#[cfg(test)]
mod tests {
    use super::*;
    use crate::shim::protocol;

    /// `last_n_lines_of` with `n` below, above, and equal to zero.
    #[test]
    fn last_n_lines_basic() {
        let sample = "a\nb\nc\nd\ne";
        let cases = [(3, "c\nd\ne"), (10, "a\nb\nc\nd\ne"), (0, "")];
        for (n, expected) in cases {
            assert_eq!(last_n_lines_of(sample, n), expected);
        }
    }

    /// Empty input produces empty output regardless of `n`.
    #[test]
    fn last_n_lines_empty() {
        let tail = last_n_lines_of("", 5);
        assert_eq!(tail, "");
    }

    /// A freshly built `KiroState` in `Starting` keeps the values it was given.
    #[test]
    fn kiro_state_initial_values() {
        let now = Instant::now();
        let fresh = KiroState {
            state: ShimState::Starting,
            state_changed_at: now,
            started_at: now,
            session_id: String::new(),
            accumulated_response: String::new(),
            pending_message_id: None,
            message_queue: VecDeque::new(),
            cumulative_output_bytes: 0,
            initialized: false,
            sent_session_new: false,
            pending_prompt_request_id: None,
            context_approaching_emitted: false,
        };

        assert_eq!(fresh.state, ShimState::Starting);
        assert!(fresh.session_id.is_empty());
        assert!(!fresh.initialized);
        assert!(!fresh.sent_session_new);
        assert!(fresh.message_queue.is_empty());
    }

    /// Events sent on one end of a socket pair arrive intact on the other.
    #[test]
    fn channel_round_trip_events() {
        let (orchestrator_sock, shim_sock) = protocol::socketpair().unwrap();
        let mut orchestrator = protocol::Channel::new(orchestrator_sock);
        let mut shim = protocol::Channel::new(shim_sock);

        // A field-less event first.
        shim.send(&Event::Ready).unwrap();
        let received: Event = orchestrator.recv().unwrap().unwrap();
        assert!(matches!(received, Event::Ready));

        // Then an event carrying payload fields.
        let completion = Event::Completion {
            message_id: Some("m1".into()),
            response: "done".into(),
            last_lines: "done".into(),
        };
        shim.send(&completion).unwrap();

        let received: Event = orchestrator.recv().unwrap().unwrap();
        if let Event::Completion {
            message_id,
            response,
            ..
        } = received
        {
            assert_eq!(message_id.as_deref(), Some("m1"));
            assert_eq!(response, "done");
        } else {
            panic!("expected Completion");
        }
    }

    /// The exhaustion threshold must lie in the 95–100 percent band.
    #[test]
    fn context_exhaustion_threshold() {
        let limit = CONTEXT_EXHAUSTION_THRESHOLD;
        assert!((95.0..=100.0).contains(&limit));
    }

    /// Consecutive request ids are strictly increasing.
    #[test]
    fn next_request_id_increments() {
        let first = next_request_id();
        let second = next_request_id();
        assert!(first < second);
    }
}