Skip to main content

batty_cli/shim/
runtime_kiro.rs

1//! Kiro ACP-mode shim runtime: communicates with Kiro CLI via JSON-RPC 2.0
2//! on stdin/stdout using the Agent Client Protocol (ACP).
3//!
4//! Like the Claude SDK runtime (`runtime_sdk.rs`), this uses a persistent
5//! subprocess with bidirectional NDJSON. The protocol differs: ACP requires
6//! an initialization handshake (`initialize` + `session/new`) before prompts
7//! can be sent, and uses JSON-RPC 2.0 framing.
8//!
9//! Emits the same `Command`/`Event` protocol to the orchestrator as all other
10//! runtimes, making it transparent to upstream consumers.
11
12use std::collections::VecDeque;
13use std::io::{BufRead, BufReader, Write as IoWrite};
14use std::process::{Child, Command, Stdio};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, Result};
21
22use super::common::{
23    self, MAX_QUEUE_DEPTH, QueuedMessage, SESSION_STATS_INTERVAL_SECS, drain_queue_errors,
24    format_injected_message,
25};
26use super::kiro_types::{self, AcpMessage};
27use super::protocol::{Channel, Command as ShimCommand, Event, ShimState};
28use super::pty_log::PtyLogWriter;
29use super::runtime::ShimArgs;
30
31// ---------------------------------------------------------------------------
32// Configuration
33// ---------------------------------------------------------------------------
34
/// Polling interval in milliseconds. Within this file it is the sleep between
/// successive checks while waiting for the initialization handshake to finish.
// NOTE(review): the name suggests it also paces child-exit polling (likely in
// `terminate_child`, which is defined elsewhere) — confirm.
const PROCESS_EXIT_POLL_MS: u64 = 100;
/// Grace period in seconds.
// NOTE(review): presumably the time the child's process group is given to exit
// after a termination request before escalating; the use site is not in this
// part of the file — confirm against `terminate_child`.
const GROUP_TERM_GRACE_SECS: u64 = 2;

/// Timeout for the initialization handshake (initialize + session/new).
/// When it elapses before the session is established, the child is terminated
/// and the shim returns.
const INIT_TIMEOUT_SECS: u64 = 30;

/// Context usage percentage threshold to consider context exhausted.
/// A `_kiro.dev/metadata` notification reporting usage at or above this value
/// moves the shim into the context-exhausted state.
const CONTEXT_EXHAUSTION_THRESHOLD: f64 = 98.0;
43
44// ---------------------------------------------------------------------------
45// Shared state
46// ---------------------------------------------------------------------------
47
48struct KiroState {
49    state: ShimState,
50    state_changed_at: Instant,
51    started_at: Instant,
52    /// ACP session ID from `session/new` response.
53    session_id: String,
54    /// Accumulated assistant response text for the current turn.
55    accumulated_response: String,
56    /// Message ID of the currently pending (in-flight) message.
57    pending_message_id: Option<String>,
58    /// Messages queued while the agent is in Working state.
59    message_queue: VecDeque<QueuedMessage>,
60    /// Total bytes of response text received.
61    cumulative_output_bytes: u64,
62    /// Whether the initialization handshake is complete.
63    initialized: bool,
64    /// Whether we've sent the session/new request (awaiting its response).
65    sent_session_new: bool,
66    /// Pending prompt request ID (to match result).
67    pending_prompt_request_id: Option<u64>,
68}
69
/// Process-wide source of JSON-RPC request IDs. The first ID handed out is 0.
static REQUEST_ID: AtomicU64 = AtomicU64::new(0);

/// Hands out the next JSON-RPC request ID.
///
/// Returns the counter value held *before* the increment, so successive calls
/// yield 0, 1, 2, … Only uniqueness of the returned values matters here, which
/// is why the weakest (`Relaxed`) ordering is used.
fn next_request_id() -> u64 {
    const STEP: u64 = 1;
    REQUEST_ID.fetch_add(STEP, Ordering::Relaxed)
}
76
77// ---------------------------------------------------------------------------
78// Main entry point
79// ---------------------------------------------------------------------------
80
81/// Run the Kiro ACP-mode shim. Does not return until the shim exits.
82///
83/// `channel` is the pre-connected socket to the orchestrator (fd 3 or socketpair).
84/// `args.cmd` must launch `kiro-cli acp --trust-all-tools`.
85pub fn run_kiro_acp(args: ShimArgs, channel: Channel) -> Result<()> {
86    // -- Spawn subprocess with piped stdin/stdout/stderr --
87    let mut child = Command::new("bash")
88        .args(["-lc", &args.cmd])
89        .current_dir(&args.cwd)
90        .stdin(Stdio::piped())
91        .stdout(Stdio::piped())
92        .stderr(Stdio::piped())
93        .env_remove("CLAUDECODE")
94        .spawn()
95        .with_context(|| format!("[shim-kiro {}] failed to spawn kiro-cli acp", args.id))?;
96
97    let child_pid = child.id();
98    eprintln!(
99        "[shim-kiro {}] spawned kiro-cli acp (pid {})",
100        args.id, child_pid
101    );
102
103    let child_stdin = child.stdin.take().context("failed to take child stdin")?;
104    let child_stdout = child.stdout.take().context("failed to take child stdout")?;
105    let child_stderr = child.stderr.take().context("failed to take child stderr")?;
106
107    // Shared state
108    let state = Arc::new(Mutex::new(KiroState {
109        state: ShimState::Starting,
110        state_changed_at: Instant::now(),
111        started_at: Instant::now(),
112        session_id: String::new(),
113        accumulated_response: String::new(),
114        pending_message_id: None,
115        message_queue: VecDeque::new(),
116        cumulative_output_bytes: 0,
117        initialized: false,
118        sent_session_new: false,
119        pending_prompt_request_id: None,
120    }));
121
122    // Shared stdin writer — wrapped in Option so Shutdown can take and close it.
123    let stdin_writer = Arc::new(Mutex::new(Some(child_stdin)));
124
125    // PTY log writer (optional)
126    let pty_log: Option<Arc<Mutex<PtyLogWriter>>> = args
127        .pty_log_path
128        .as_deref()
129        .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
130        .transpose()?
131        .map(|w| Arc::new(Mutex::new(w)));
132
133    // Channel clones
134    let mut cmd_channel = channel;
135    let mut evt_channel = cmd_channel
136        .try_clone()
137        .context("failed to clone channel for stdout reader")?;
138
139    // -- Send initialization handshake --
140    {
141        let init_req = kiro_types::initialize_request(next_request_id());
142        let ndjson = init_req.to_ndjson();
143        write_stdin(&stdin_writer, &ndjson);
144        eprintln!("[shim-kiro {}] sent initialize request", args.id);
145    }
146
147    // -- stdout reader thread --
148    let state_stdout = Arc::clone(&state);
149    let stdin_for_approve = Arc::clone(&stdin_writer);
150    let pty_log_stdout = pty_log.clone();
151    let shim_id = args.id.clone();
152    let cwd_for_init = args.cwd.to_string_lossy().to_string();
153    let stdout_handle = thread::spawn(move || {
154        let reader = BufReader::new(child_stdout);
155        for line_result in reader.lines() {
156            let line = match line_result {
157                Ok(l) => l,
158                Err(e) => {
159                    eprintln!("[shim-kiro {shim_id}] stdout read error: {e}");
160                    break;
161                }
162            };
163
164            if line.trim().is_empty() {
165                continue;
166            }
167
168            let msg: AcpMessage = match serde_json::from_str(&line) {
169                Ok(m) => m,
170                Err(e) => {
171                    eprintln!("[shim-kiro {shim_id}] ignoring unparseable NDJSON: {e}");
172                    continue;
173                }
174            };
175
176            // -- Handle responses to our requests --
177            if msg.is_response() {
178                let msg_id = msg.id.unwrap();
179
180                if let Some(ref error) = msg.error {
181                    eprintln!("[shim-kiro {shim_id}] JSON-RPC error (id={msg_id}): {error}");
182                    // Check if this was a prompt request that failed
183                    let mut st = state_stdout.lock().unwrap();
184                    if st.pending_prompt_request_id == Some(msg_id) {
185                        st.pending_prompt_request_id = None;
186                        let error_text = error
187                            .get("message")
188                            .and_then(|m| m.as_str())
189                            .unwrap_or("unknown error");
190
191                        if common::detect_context_exhausted(error_text) {
192                            let last_lines = last_n_lines_of(&st.accumulated_response, 5);
193                            let old = st.state;
194                            st.state = ShimState::ContextExhausted;
195                            st.state_changed_at = Instant::now();
196                            let drain = drain_queue_errors(
197                                &mut st.message_queue,
198                                ShimState::ContextExhausted,
199                            );
200                            drop(st);
201                            let _ = evt_channel.send(&Event::StateChanged {
202                                from: old,
203                                to: ShimState::ContextExhausted,
204                                summary: last_lines.clone(),
205                            });
206                            let _ = evt_channel.send(&Event::ContextExhausted {
207                                message: error_text.to_string(),
208                                last_lines,
209                            });
210                            for event in drain {
211                                let _ = evt_channel.send(&event);
212                            }
213                        } else {
214                            // Non-exhaustion error on prompt — complete with error
215                            let response = std::mem::take(&mut st.accumulated_response);
216                            let last_lines = last_n_lines_of(&response, 5);
217                            let msg_id_out = st.pending_message_id.take();
218                            st.state = ShimState::Idle;
219                            st.state_changed_at = Instant::now();
220                            drop(st);
221                            let _ = evt_channel.send(&Event::StateChanged {
222                                from: ShimState::Working,
223                                to: ShimState::Idle,
224                                summary: last_lines.clone(),
225                            });
226                            let _ = evt_channel.send(&Event::Completion {
227                                message_id: msg_id_out,
228                                response: format!("[error] {error_text}"),
229                                last_lines,
230                            });
231                        }
232                    }
233                    continue;
234                }
235
236                if let Some(ref result) = msg.result {
237                    let mut st = state_stdout.lock().unwrap();
238
239                    // Check if this is the initialize response (before session/new was sent)
240                    if !st.initialized && !st.sent_session_new {
241                        // This is the initialize response — now send session/new
242                        st.sent_session_new = true;
243                        drop(st);
244                        let session_req =
245                            kiro_types::session_new_request(next_request_id(), &cwd_for_init);
246                        let ndjson = session_req.to_ndjson();
247                        write_stdin(&stdin_for_approve, &ndjson);
248                        eprintln!("[shim-kiro {shim_id}] sent session/new request");
249                        continue;
250                    }
251
252                    // Check if this is the session/new response
253                    if !st.initialized {
254                        if let Some(sid) = kiro_types::extract_session_id(result) {
255                            st.session_id = sid.to_string();
256                            st.initialized = true;
257                            st.state = ShimState::Idle;
258                            st.state_changed_at = Instant::now();
259                            eprintln!("[shim-kiro {shim_id}] session created: {}", st.session_id);
260                            drop(st);
261
262                            // Now emit Ready — agent is ready for messages
263                            let _ = evt_channel.send(&Event::Ready);
264                        }
265                        continue;
266                    }
267
268                    // Check if this is a prompt result (turn completed)
269                    if st.pending_prompt_request_id == Some(msg_id) {
270                        st.pending_prompt_request_id = None;
271                        let response = if st.accumulated_response.is_empty() {
272                            result
273                                .get("result")
274                                .and_then(|r| r.as_str())
275                                .unwrap_or("")
276                                .to_string()
277                        } else {
278                            std::mem::take(&mut st.accumulated_response)
279                        };
280                        let last_lines = last_n_lines_of(&response, 5);
281                        let completed_msg_id = st.pending_message_id.take();
282                        let old = st.state;
283                        st.state = ShimState::Idle;
284                        st.state_changed_at = Instant::now();
285
286                        // Drain queue
287                        let queued_msg = if !st.message_queue.is_empty() {
288                            st.message_queue.pop_front()
289                        } else {
290                            None
291                        };
292                        if let Some(ref qm) = queued_msg {
293                            st.pending_message_id = qm.message_id.clone();
294                            st.state = ShimState::Working;
295                            st.state_changed_at = Instant::now();
296                            st.accumulated_response.clear();
297                        }
298                        let session_id = st.session_id.clone();
299                        let queue_depth = st.message_queue.len();
300                        drop(st);
301
302                        let _ = evt_channel.send(&Event::StateChanged {
303                            from: old,
304                            to: ShimState::Idle,
305                            summary: last_lines.clone(),
306                        });
307                        let _ = evt_channel.send(&Event::Completion {
308                            message_id: completed_msg_id,
309                            response,
310                            last_lines,
311                        });
312
313                        // Inject queued message
314                        if let Some(qm) = queued_msg {
315                            let text = format_injected_message(&qm.from, &qm.body);
316                            let req_id = next_request_id();
317                            let prompt_req =
318                                kiro_types::session_prompt_request(req_id, &session_id, &text);
319                            let ndjson = prompt_req.to_ndjson();
320                            write_stdin(&stdin_for_approve, &ndjson);
321                            let mut st = state_stdout.lock().unwrap();
322                            st.pending_prompt_request_id = Some(req_id);
323                            drop(st);
324
325                            let _ = evt_channel.send(&Event::StateChanged {
326                                from: ShimState::Idle,
327                                to: ShimState::Working,
328                                summary: format!(
329                                    "delivering queued message ({queue_depth} remaining)"
330                                ),
331                            });
332                        }
333                    }
334                }
335                continue;
336            }
337
338            // -- Handle notifications --
339            if msg.is_notification() {
340                let method = msg.method.as_deref().unwrap_or("");
341                let params = msg.params.as_ref();
342
343                match method {
344                    "session/update" => {
345                        if let Some(params) = params {
346                            let update_type = kiro_types::extract_update_type(params).unwrap_or("");
347
348                            match update_type {
349                                "agent_message_chunk" | "AgentMessageChunk" => {
350                                    if let Some(text) =
351                                        kiro_types::extract_message_chunk_text(params)
352                                    {
353                                        if !text.is_empty() {
354                                            let mut st = state_stdout.lock().unwrap();
355                                            st.accumulated_response.push_str(text);
356                                            st.cumulative_output_bytes += text.len() as u64;
357                                            drop(st);
358
359                                            if let Some(ref log) = pty_log_stdout {
360                                                let _ = log.lock().unwrap().write(text.as_bytes());
361                                            }
362                                        }
363                                    }
364                                }
365
366                                "agent_thought_chunk" => {
367                                    // Agent thinking — log but don't accumulate
368                                    if let Some(text) =
369                                        kiro_types::extract_message_chunk_text(params)
370                                    {
371                                        if let Some(ref log) = pty_log_stdout {
372                                            let _ = log
373                                                .lock()
374                                                .unwrap()
375                                                .write(format!("[thought] {text}").as_bytes());
376                                        }
377                                    }
378                                }
379
380                                "tool_call" | "ToolCall" => {
381                                    // Log tool calls for visibility
382                                    let title = params
383                                        .get("update")
384                                        .and_then(|u| u.get("title"))
385                                        .and_then(|t| t.as_str())
386                                        .unwrap_or("unknown tool");
387                                    if let Some(ref log) = pty_log_stdout {
388                                        let _ = log
389                                            .lock()
390                                            .unwrap()
391                                            .write(format!("[tool] {title}\n").as_bytes());
392                                    }
393                                }
394
395                                "tool_call_update" | "ToolCallUpdate" => {
396                                    // Tool progress — no action needed
397                                }
398
399                                "TurnEnd" | "turn_end" => {
400                                    // Turn ended via notification.
401                                    // The prompt result response will handle the state transition,
402                                    // but some ACP agents send TurnEnd before the result.
403                                    // We do nothing here — the result handler above transitions state.
404                                }
405
406                                _ => {
407                                    // Unknown update type — silently ignore (future-proof)
408                                }
409                            }
410                        }
411                    }
412
413                    "_kiro.dev/metadata" => {
414                        // Check for context exhaustion via high usage percentage
415                        if let Some(params) = params {
416                            if let Some(usage) = kiro_types::extract_context_usage(params) {
417                                if usage >= CONTEXT_EXHAUSTION_THRESHOLD {
418                                    let mut st = state_stdout.lock().unwrap();
419                                    let last_lines = last_n_lines_of(&st.accumulated_response, 5);
420                                    let old = st.state;
421                                    st.state = ShimState::ContextExhausted;
422                                    st.state_changed_at = Instant::now();
423                                    let drain = drain_queue_errors(
424                                        &mut st.message_queue,
425                                        ShimState::ContextExhausted,
426                                    );
427                                    drop(st);
428
429                                    let _ = evt_channel.send(&Event::StateChanged {
430                                        from: old,
431                                        to: ShimState::ContextExhausted,
432                                        summary: last_lines.clone(),
433                                    });
434                                    let _ = evt_channel.send(&Event::ContextExhausted {
435                                        message: format!("context usage at {usage:.1}%"),
436                                        last_lines,
437                                    });
438                                    for event in drain {
439                                        let _ = evt_channel.send(&event);
440                                    }
441                                }
442                            }
443                        }
444                    }
445
446                    "_kiro.dev/compaction/status" | "_kiro.dev/clear/status" => {
447                        // Informational — log only
448                        eprintln!("[shim-kiro {shim_id}] {method}: {params:?}");
449                    }
450
451                    _ => {
452                        // Unknown notification — silently ignore
453                    }
454                }
455                continue;
456            }
457
458            // -- Handle agent-initiated requests --
459            if msg.is_agent_request() {
460                let method = msg.method.as_deref().unwrap_or("");
461                let request_id = msg.id.unwrap();
462
463                match method {
464                    "session/request_permission" => {
465                        // Auto-approve all permission requests
466                        let resp = kiro_types::permission_approve_response(request_id);
467                        let ndjson = resp.to_ndjson();
468                        write_stdin(&stdin_for_approve, &ndjson);
469                        eprintln!(
470                            "[shim-kiro {shim_id}] auto-approved permission request {request_id}"
471                        );
472                    }
473
474                    "fs/read_text_file" | "fs/write_text_file" | "terminal/create"
475                    | "terminal/kill" => {
476                        // Client-side operations — we don't support these, respond with error
477                        let error_resp = serde_json::json!({
478                            "jsonrpc": "2.0",
479                            "id": request_id,
480                            "error": {
481                                "code": -32601,
482                                "message": format!("method not supported by batty shim: {method}")
483                            }
484                        });
485                        let ndjson = serde_json::to_string(&error_resp).unwrap();
486                        write_stdin(&stdin_for_approve, &ndjson);
487                    }
488
489                    _ => {
490                        // Unknown agent request — respond with method not found
491                        let error_resp = serde_json::json!({
492                            "jsonrpc": "2.0",
493                            "id": request_id,
494                            "error": {
495                                "code": -32601,
496                                "message": format!("unknown method: {method}")
497                            }
498                        });
499                        let ndjson = serde_json::to_string(&error_resp).unwrap();
500                        write_stdin(&stdin_for_approve, &ndjson);
501                    }
502                }
503                continue;
504            }
505        }
506
507        // stdout EOF — agent process closed
508        let mut st = state_stdout.lock().unwrap();
509        let last_lines = last_n_lines_of(&st.accumulated_response, 10);
510        let old = st.state;
511        st.state = ShimState::Dead;
512        st.state_changed_at = Instant::now();
513
514        let drain = drain_queue_errors(&mut st.message_queue, ShimState::Dead);
515        drop(st);
516
517        let _ = evt_channel.send(&Event::StateChanged {
518            from: old,
519            to: ShimState::Dead,
520            summary: last_lines.clone(),
521        });
522        let _ = evt_channel.send(&Event::Died {
523            exit_code: None,
524            last_lines,
525        });
526        for event in drain {
527            let _ = evt_channel.send(&event);
528        }
529    });
530
531    // -- stderr reader thread --
532    let shim_id_err = args.id.clone();
533    let pty_log_stderr = pty_log;
534    thread::spawn(move || {
535        let reader = BufReader::new(child_stderr);
536        for line_result in reader.lines() {
537            match line_result {
538                Ok(line) => {
539                    eprintln!("[shim-kiro {shim_id_err}] stderr: {line}");
540                    if let Some(ref log) = pty_log_stderr {
541                        let _ = log
542                            .lock()
543                            .unwrap()
544                            .write(format!("[stderr] {line}\n").as_bytes());
545                    }
546                }
547                Err(_) => break,
548            }
549        }
550    });
551
552    // -- Session stats thread --
553    let state_stats = Arc::clone(&state);
554    let mut stats_channel = cmd_channel
555        .try_clone()
556        .context("failed to clone channel for stats")?;
557    thread::spawn(move || {
558        loop {
559            thread::sleep(Duration::from_secs(SESSION_STATS_INTERVAL_SECS));
560            let st = state_stats.lock().unwrap();
561            if st.state == ShimState::Dead {
562                return;
563            }
564            let output_bytes = st.cumulative_output_bytes;
565            let uptime_secs = st.started_at.elapsed().as_secs();
566            drop(st);
567
568            if stats_channel
569                .send(&Event::SessionStats {
570                    output_bytes,
571                    uptime_secs,
572                })
573                .is_err()
574            {
575                return;
576            }
577        }
578    });
579
580    // -- Wait for initialization to complete before accepting commands --
581    {
582        let deadline = Instant::now() + Duration::from_secs(INIT_TIMEOUT_SECS);
583        loop {
584            let st = state.lock().unwrap();
585            if st.initialized {
586                break;
587            }
588            if st.state == ShimState::Dead {
589                eprintln!("[shim-kiro {}] agent died during initialization", args.id);
590                return Ok(());
591            }
592            drop(st);
593
594            if Instant::now() > deadline {
595                eprintln!(
596                    "[shim-kiro {}] initialization timed out after {}s",
597                    args.id, INIT_TIMEOUT_SECS
598                );
599                terminate_child(&mut child);
600                return Ok(());
601            }
602            thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
603        }
604    }
605
606    // -- Command loop (main thread) --
607    let state_cmd = Arc::clone(&state);
608    loop {
609        let cmd = match cmd_channel.recv::<ShimCommand>() {
610            Ok(Some(c)) => c,
611            Ok(None) => {
612                eprintln!(
613                    "[shim-kiro {}] orchestrator disconnected, shutting down",
614                    args.id
615                );
616                terminate_child(&mut child);
617                break;
618            }
619            Err(e) => {
620                eprintln!("[shim-kiro {}] channel error: {e}", args.id);
621                terminate_child(&mut child);
622                break;
623            }
624        };
625
626        match cmd {
627            ShimCommand::SendMessage {
628                from,
629                body,
630                message_id,
631            } => {
632                let mut st = state_cmd.lock().unwrap();
633                match st.state {
634                    ShimState::Idle => {
635                        st.pending_message_id = message_id;
636                        st.accumulated_response.clear();
637                        let session_id = st.session_id.clone();
638                        st.state = ShimState::Working;
639                        st.state_changed_at = Instant::now();
640
641                        let req_id = next_request_id();
642                        st.pending_prompt_request_id = Some(req_id);
643                        drop(st);
644
645                        let text = format_injected_message(&from, &body);
646                        let prompt_req =
647                            kiro_types::session_prompt_request(req_id, &session_id, &text);
648                        let ndjson = prompt_req.to_ndjson();
649
650                        if !write_stdin(&stdin_writer, &ndjson) {
651                            cmd_channel.send(&Event::Error {
652                                command: "SendMessage".into(),
653                                reason: "stdin write failed (closed)".into(),
654                            })?;
655                            continue;
656                        }
657
658                        cmd_channel.send(&Event::StateChanged {
659                            from: ShimState::Idle,
660                            to: ShimState::Working,
661                            summary: String::new(),
662                        })?;
663                    }
664                    ShimState::Working => {
665                        // Queue the message
666                        if st.message_queue.len() >= MAX_QUEUE_DEPTH {
667                            let dropped = st.message_queue.pop_front();
668                            let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
669                            st.message_queue.push_back(QueuedMessage {
670                                from,
671                                body,
672                                message_id,
673                            });
674                            let depth = st.message_queue.len();
675                            drop(st);
676
677                            cmd_channel.send(&Event::Error {
678                                command: "SendMessage".into(),
679                                reason: format!(
680                                    "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
681                                    dropped_id
682                                        .map(|id| format!(" (id: {id})"))
683                                        .unwrap_or_default(),
684                                ),
685                            })?;
686                            cmd_channel.send(&Event::Warning {
687                                message: format!(
688                                    "message queued while agent working (depth: {depth})"
689                                ),
690                                idle_secs: None,
691                            })?;
692                        } else {
693                            st.message_queue.push_back(QueuedMessage {
694                                from,
695                                body,
696                                message_id,
697                            });
698                            let depth = st.message_queue.len();
699                            drop(st);
700
701                            cmd_channel.send(&Event::Warning {
702                                message: format!(
703                                    "message queued while agent working (depth: {depth})"
704                                ),
705                                idle_secs: None,
706                            })?;
707                        }
708                    }
709                    other => {
710                        drop(st);
711                        cmd_channel.send(&Event::Error {
712                            command: "SendMessage".into(),
713                            reason: format!("agent in {other} state, cannot accept message"),
714                        })?;
715                    }
716                }
717            }
718
719            ShimCommand::CaptureScreen { last_n_lines } => {
720                let st = state_cmd.lock().unwrap();
721                let content = match last_n_lines {
722                    Some(n) => last_n_lines_of(&st.accumulated_response, n),
723                    None => st.accumulated_response.clone(),
724                };
725                drop(st);
726                cmd_channel.send(&Event::ScreenCapture {
727                    content,
728                    cursor_row: 0,
729                    cursor_col: 0,
730                })?;
731            }
732
733            ShimCommand::GetState => {
734                let st = state_cmd.lock().unwrap();
735                let since = st.state_changed_at.elapsed().as_secs();
736                let state = st.state;
737                drop(st);
738                cmd_channel.send(&Event::State {
739                    state,
740                    since_secs: since,
741                })?;
742            }
743
744            ShimCommand::Resize { .. } => {
745                // No-op in ACP mode — no PTY.
746            }
747
748            ShimCommand::Ping => {
749                cmd_channel.send(&Event::Pong)?;
750            }
751
752            ShimCommand::Shutdown { timeout_secs } => {
753                eprintln!(
754                    "[shim-kiro {}] shutdown requested (timeout: {}s)",
755                    args.id, timeout_secs
756                );
757                // Take stdin out of the shared Option to truly close it.
758                // The stdout reader thread also holds an Arc clone, but taking
759                // from the Option means both sides see None.
760                if let Ok(mut guard) = stdin_writer.lock() {
761                    guard.take(); // closes ChildStdin
762                }
763                terminate_child(&mut child);
764
765                // Give the child a moment to fully exit.
766                let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
767                loop {
768                    if Instant::now() > deadline {
769                        break;
770                    }
771                    match child.try_wait() {
772                        Ok(Some(_)) => break,
773                        _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
774                    }
775                }
776                break;
777            }
778
779            ShimCommand::Kill => {
780                terminate_child(&mut child);
781                break;
782            }
783        }
784    }
785
786    stdout_handle.join().ok();
787    Ok(())
788}
789
790// ---------------------------------------------------------------------------
791// Helpers
792// ---------------------------------------------------------------------------
793
/// Send one NDJSON line to the child's stdin through the shared handle.
///
/// A trailing newline is appended to `line` and the writer is flushed on a
/// best-effort basis (a flush error is ignored).
///
/// Returns `true` once the line has been written. Returns `false` when the
/// mutex is poisoned, when the slot is `None` (stdin already taken/closed),
/// or when the write itself fails.
fn write_stdin(stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>, line: &str) -> bool {
    let mut slot = match stdin.lock() {
        Ok(slot) => slot,
        Err(_) => return false,
    };
    let pipe = match slot.as_mut() {
        Some(pipe) => pipe,
        None => return false,
    };
    if writeln!(pipe, "{line}").is_err() {
        return false;
    }
    // Flush failures are deliberately not reported to the caller.
    let _ = pipe.flush();
    true
}
806
807/// Terminate a child process: SIGTERM, grace period, then SIGKILL.
808fn terminate_child(child: &mut Child) {
809    let pid = child.id();
810
811    #[cfg(unix)]
812    {
813        unsafe {
814            libc::kill(pid as i32, libc::SIGTERM);
815        }
816        let deadline = Instant::now() + Duration::from_secs(GROUP_TERM_GRACE_SECS);
817        loop {
818            if Instant::now() > deadline {
819                break;
820            }
821            match child.try_wait() {
822                Ok(Some(_)) => return,
823                _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
824            }
825        }
826        unsafe {
827            libc::kill(pid as i32, libc::SIGKILL);
828        }
829    }
830
831    #[allow(unreachable_code)]
832    {
833        let _ = child.kill();
834    }
835}
836
/// Return the final `n` lines of `text`, joined with `\n`.
///
/// Lines are split as by [`str::lines`], so line terminators are not part of
/// the result and a trailing newline does not produce an extra empty line.
/// If `text` has fewer than `n` lines, all of them are returned; `n == 0`
/// yields an empty string.
fn last_n_lines_of(text: &str, n: usize) -> String {
    // Walk from the end, keep at most `n` lines, then restore original order.
    let mut tail: Vec<&str> = text.lines().rev().take(n).collect();
    tail.reverse();
    tail.join("\n")
}
843
844// ---------------------------------------------------------------------------
845// Tests
846// ---------------------------------------------------------------------------
847
#[cfg(test)]
mod tests {
    use super::*;
    use crate::shim::protocol;

    /// Requesting fewer, more, and zero lines than the text contains.
    #[test]
    fn last_n_lines_basic() {
        let sample = "a\nb\nc\nd\ne";
        let cases = [(3, "c\nd\ne"), (10, "a\nb\nc\nd\ne"), (0, "")];
        for (count, expected) in cases {
            assert_eq!(last_n_lines_of(sample, count), expected);
        }
    }

    /// Empty input yields empty output regardless of the requested count.
    #[test]
    fn last_n_lines_empty() {
        let tail = last_n_lines_of("", 5);
        assert_eq!(tail, "");
    }

    /// A hand-built `KiroState` exposes the values it was constructed with.
    #[test]
    fn kiro_state_initial_values() {
        let now = Instant::now();
        let fresh = KiroState {
            state: ShimState::Starting,
            state_changed_at: now,
            started_at: now,
            session_id: String::new(),
            accumulated_response: String::new(),
            pending_message_id: None,
            message_queue: VecDeque::new(),
            cumulative_output_bytes: 0,
            initialized: false,
            sent_session_new: false,
            pending_prompt_request_id: None,
        };

        assert_eq!(fresh.state, ShimState::Starting);
        assert!(fresh.session_id.is_empty());
        assert!(!fresh.initialized);
        assert!(!fresh.sent_session_new);
        assert!(fresh.message_queue.is_empty());
    }

    /// Events sent on one end of a socketpair arrive intact on the other.
    #[test]
    fn channel_round_trip_events() {
        let (parent_sock, child_sock) = protocol::socketpair().unwrap();
        let mut receiver = protocol::Channel::new(parent_sock);
        let mut sender = protocol::Channel::new(child_sock);

        // Unit-like event.
        sender.send(&Event::Ready).unwrap();
        let first: Event = receiver.recv().unwrap().unwrap();
        assert!(matches!(first, Event::Ready));

        // Event carrying a payload.
        let completion = Event::Completion {
            message_id: Some("m1".into()),
            response: "done".into(),
            last_lines: "done".into(),
        };
        sender.send(&completion).unwrap();
        let second: Event = receiver.recv().unwrap().unwrap();
        if let Event::Completion {
            message_id,
            response,
            ..
        } = second
        {
            assert_eq!(message_id.as_deref(), Some("m1"));
            assert_eq!(response, "done");
        } else {
            panic!("expected Completion");
        }
    }

    /// The exhaustion threshold stays within the 95–100 % band.
    #[test]
    fn context_exhaustion_threshold() {
        assert!(CONTEXT_EXHAUSTION_THRESHOLD >= 95.0);
        assert!(CONTEXT_EXHAUSTION_THRESHOLD <= 100.0);
    }

    /// Consecutive request ids are strictly increasing.
    #[test]
    fn next_request_id_increments() {
        let first = next_request_id();
        let second = next_request_id();
        assert!(first < second);
    }
}