1use std::collections::VecDeque;
13use std::io::{BufRead, BufReader, Write as IoWrite};
14use std::process::{Child, Command, Stdio};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, Result};
21
22use super::common::{
23 self, MAX_QUEUE_DEPTH, QueuedMessage, SESSION_STATS_INTERVAL_SECS, drain_queue_errors,
24 format_injected_message,
25};
26use super::kiro_types::{self, AcpMessage};
27use super::protocol::{Channel, Command as ShimCommand, Event, ShimState};
28use super::pty_log::PtyLogWriter;
29use super::runtime::ShimArgs;
30
31const PROCESS_EXIT_POLL_MS: u64 = 100;
36const GROUP_TERM_GRACE_SECS: u64 = 2;
37
38const INIT_TIMEOUT_SECS: u64 = 30;
40
41const CONTEXT_EXHAUSTION_THRESHOLD: f64 = 98.0;
43
44struct KiroState {
49 state: ShimState,
50 state_changed_at: Instant,
51 started_at: Instant,
52 session_id: String,
54 accumulated_response: String,
56 pending_message_id: Option<String>,
58 message_queue: VecDeque<QueuedMessage>,
60 cumulative_output_bytes: u64,
62 initialized: bool,
64 sent_session_new: bool,
66 pending_prompt_request_id: Option<u64>,
68 context_approaching_emitted: bool,
70}
71
72static REQUEST_ID: AtomicU64 = AtomicU64::new(0);
74
75fn next_request_id() -> u64 {
76 REQUEST_ID.fetch_add(1, Ordering::Relaxed)
77}
78
79pub fn run_kiro_acp(args: ShimArgs, channel: Channel) -> Result<()> {
88 let mut child = Command::new("bash")
90 .args(["-lc", &args.cmd])
91 .current_dir(&args.cwd)
92 .stdin(Stdio::piped())
93 .stdout(Stdio::piped())
94 .stderr(Stdio::piped())
95 .env_remove("CLAUDECODE")
96 .spawn()
97 .with_context(|| format!("[shim-kiro {}] failed to spawn kiro-cli acp", args.id))?;
98
99 let child_pid = child.id();
100 eprintln!(
101 "[shim-kiro {}] spawned kiro-cli acp (pid {})",
102 args.id, child_pid
103 );
104
105 let child_stdin = child.stdin.take().context("failed to take child stdin")?;
106 let child_stdout = child.stdout.take().context("failed to take child stdout")?;
107 let child_stderr = child.stderr.take().context("failed to take child stderr")?;
108
109 let state = Arc::new(Mutex::new(KiroState {
111 state: ShimState::Starting,
112 state_changed_at: Instant::now(),
113 started_at: Instant::now(),
114 session_id: String::new(),
115 accumulated_response: String::new(),
116 pending_message_id: None,
117 message_queue: VecDeque::new(),
118 cumulative_output_bytes: 0,
119 initialized: false,
120 sent_session_new: false,
121 pending_prompt_request_id: None,
122 context_approaching_emitted: false,
123 }));
124
125 let stdin_writer = Arc::new(Mutex::new(Some(child_stdin)));
127
128 let pty_log: Option<Arc<Mutex<PtyLogWriter>>> = args
130 .pty_log_path
131 .as_deref()
132 .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
133 .transpose()?
134 .map(|w| Arc::new(Mutex::new(w)));
135
136 let mut cmd_channel = channel;
138 let mut evt_channel = cmd_channel
139 .try_clone()
140 .context("failed to clone channel for stdout reader")?;
141
142 {
144 let init_req = kiro_types::initialize_request(next_request_id());
145 let ndjson = init_req.to_ndjson();
146 write_stdin(&stdin_writer, &ndjson);
147 eprintln!("[shim-kiro {}] sent initialize request", args.id);
148 }
149
150 let state_stdout = Arc::clone(&state);
152 let stdin_for_approve = Arc::clone(&stdin_writer);
153 let pty_log_stdout = pty_log.clone();
154 let shim_id = args.id.clone();
155 let cwd_for_init = args.cwd.to_string_lossy().to_string();
156 let stdout_handle = thread::spawn(move || {
157 let reader = BufReader::new(child_stdout);
158 for line_result in reader.lines() {
159 let line = match line_result {
160 Ok(l) => l,
161 Err(e) => {
162 eprintln!("[shim-kiro {shim_id}] stdout read error: {e}");
163 break;
164 }
165 };
166
167 if line.trim().is_empty() {
168 continue;
169 }
170
171 let msg: AcpMessage = match serde_json::from_str(&line) {
172 Ok(m) => m,
173 Err(e) => {
174 eprintln!("[shim-kiro {shim_id}] ignoring unparseable NDJSON: {e}");
175 continue;
176 }
177 };
178
179 if msg.is_response() {
181 let msg_id = msg.id.unwrap();
182
183 if let Some(ref error) = msg.error {
184 eprintln!("[shim-kiro {shim_id}] JSON-RPC error (id={msg_id}): {error}");
185 let mut st = state_stdout.lock().unwrap();
187 if st.pending_prompt_request_id == Some(msg_id) {
188 st.pending_prompt_request_id = None;
189 let error_text = error
190 .get("message")
191 .and_then(|m| m.as_str())
192 .unwrap_or("unknown error");
193
194 if common::detect_context_exhausted(error_text) {
195 let last_lines = last_n_lines_of(&st.accumulated_response, 5);
196 let old = st.state;
197 st.state = ShimState::ContextExhausted;
198 st.state_changed_at = Instant::now();
199 let drain = drain_queue_errors(
200 &mut st.message_queue,
201 ShimState::ContextExhausted,
202 );
203 drop(st);
204 let _ = evt_channel.send(&Event::StateChanged {
205 from: old,
206 to: ShimState::ContextExhausted,
207 summary: last_lines.clone(),
208 });
209 let _ = evt_channel.send(&Event::ContextExhausted {
210 message: error_text.to_string(),
211 last_lines,
212 });
213 for event in drain {
214 let _ = evt_channel.send(&event);
215 }
216 } else {
217 let response = std::mem::take(&mut st.accumulated_response);
219 let last_lines = last_n_lines_of(&response, 5);
220 let msg_id_out = st.pending_message_id.take();
221 st.state = ShimState::Idle;
222 st.state_changed_at = Instant::now();
223 drop(st);
224 let _ = evt_channel.send(&Event::StateChanged {
225 from: ShimState::Working,
226 to: ShimState::Idle,
227 summary: last_lines.clone(),
228 });
229 let _ = evt_channel.send(&Event::Completion {
230 message_id: msg_id_out,
231 response: format!("[error] {error_text}"),
232 last_lines,
233 });
234 }
235 }
236 continue;
237 }
238
239 if let Some(ref result) = msg.result {
240 let mut st = state_stdout.lock().unwrap();
241
242 if !st.initialized && !st.sent_session_new {
244 st.sent_session_new = true;
246 drop(st);
247 let session_req =
248 kiro_types::session_new_request(next_request_id(), &cwd_for_init);
249 let ndjson = session_req.to_ndjson();
250 write_stdin(&stdin_for_approve, &ndjson);
251 eprintln!("[shim-kiro {shim_id}] sent session/new request");
252 continue;
253 }
254
255 if !st.initialized {
257 if let Some(sid) = kiro_types::extract_session_id(result) {
258 st.session_id = sid.to_string();
259 st.initialized = true;
260 st.state = ShimState::Idle;
261 st.state_changed_at = Instant::now();
262 eprintln!("[shim-kiro {shim_id}] session created: {}", st.session_id);
263 drop(st);
264
265 let _ = evt_channel.send(&Event::Ready);
267 }
268 continue;
269 }
270
271 if st.pending_prompt_request_id == Some(msg_id) {
273 st.pending_prompt_request_id = None;
274 let response = if st.accumulated_response.is_empty() {
275 result
276 .get("result")
277 .and_then(|r| r.as_str())
278 .unwrap_or("")
279 .to_string()
280 } else {
281 std::mem::take(&mut st.accumulated_response)
282 };
283 let last_lines = last_n_lines_of(&response, 5);
284 let completed_msg_id = st.pending_message_id.take();
285 let old = st.state;
286 st.state = ShimState::Idle;
287 st.state_changed_at = Instant::now();
288
289 let queued_msg = if !st.message_queue.is_empty() {
291 st.message_queue.pop_front()
292 } else {
293 None
294 };
295 if let Some(ref qm) = queued_msg {
296 st.pending_message_id = qm.message_id.clone();
297 st.state = ShimState::Working;
298 st.state_changed_at = Instant::now();
299 st.accumulated_response.clear();
300 }
301 let session_id = st.session_id.clone();
302 let queue_depth = st.message_queue.len();
303 drop(st);
304
305 let _ = evt_channel.send(&Event::StateChanged {
306 from: old,
307 to: ShimState::Idle,
308 summary: last_lines.clone(),
309 });
310 let _ = evt_channel.send(&Event::Completion {
311 message_id: completed_msg_id,
312 response,
313 last_lines,
314 });
315
316 if let Some(qm) = queued_msg {
318 let text = format_injected_message(&qm.from, &qm.body);
319 let req_id = next_request_id();
320 let prompt_req =
321 kiro_types::session_prompt_request(req_id, &session_id, &text);
322 let ndjson = prompt_req.to_ndjson();
323 write_stdin(&stdin_for_approve, &ndjson);
324 let mut st = state_stdout.lock().unwrap();
325 st.pending_prompt_request_id = Some(req_id);
326 drop(st);
327
328 let _ = evt_channel.send(&Event::StateChanged {
329 from: ShimState::Idle,
330 to: ShimState::Working,
331 summary: format!(
332 "delivering queued message ({queue_depth} remaining)"
333 ),
334 });
335 }
336 }
337 }
338 continue;
339 }
340
341 if msg.is_notification() {
343 let method = msg.method.as_deref().unwrap_or("");
344 let params = msg.params.as_ref();
345
346 match method {
347 "session/update" => {
348 if let Some(params) = params {
349 let update_type = kiro_types::extract_update_type(params).unwrap_or("");
350
351 match update_type {
352 "agent_message_chunk" | "AgentMessageChunk" => {
353 if let Some(text) =
354 kiro_types::extract_message_chunk_text(params)
355 {
356 if !text.is_empty() {
357 let mut st = state_stdout.lock().unwrap();
358 st.accumulated_response.push_str(text);
359 st.cumulative_output_bytes += text.len() as u64;
360
361 if !st.context_approaching_emitted
363 && common::detect_context_approaching_limit(text)
364 {
365 st.context_approaching_emitted = true;
366 drop(st);
367 let _ = evt_channel.send(&Event::ContextApproaching {
368 message: "Agent output contains context-pressure signals".into(),
369 input_tokens: 0,
370 output_tokens: 0,
371 });
372 } else {
373 drop(st);
374 }
375
376 if let Some(ref log) = pty_log_stdout {
377 let _ = log.lock().unwrap().write(text.as_bytes());
378 }
379 }
380 }
381 }
382
383 "agent_thought_chunk" => {
384 if let Some(text) =
386 kiro_types::extract_message_chunk_text(params)
387 {
388 if let Some(ref log) = pty_log_stdout {
389 let _ = log
390 .lock()
391 .unwrap()
392 .write(format!("[thought] {text}").as_bytes());
393 }
394 }
395 }
396
397 "tool_call" | "ToolCall" => {
398 let title = params
400 .get("update")
401 .and_then(|u| u.get("title"))
402 .and_then(|t| t.as_str())
403 .unwrap_or("unknown tool");
404 if let Some(ref log) = pty_log_stdout {
405 let _ = log
406 .lock()
407 .unwrap()
408 .write(format!("[tool] {title}\n").as_bytes());
409 }
410 }
411
412 "tool_call_update" | "ToolCallUpdate" => {
413 }
415
416 "TurnEnd" | "turn_end" => {
417 }
422
423 _ => {
424 }
426 }
427 }
428 }
429
430 "_kiro.dev/metadata" => {
431 if let Some(params) = params {
433 if let Some(usage) = kiro_types::extract_context_usage(params) {
434 if usage >= CONTEXT_EXHAUSTION_THRESHOLD {
435 let mut st = state_stdout.lock().unwrap();
436 let last_lines = last_n_lines_of(&st.accumulated_response, 5);
437 let old = st.state;
438 st.state = ShimState::ContextExhausted;
439 st.state_changed_at = Instant::now();
440 let drain = drain_queue_errors(
441 &mut st.message_queue,
442 ShimState::ContextExhausted,
443 );
444 drop(st);
445
446 let _ = evt_channel.send(&Event::StateChanged {
447 from: old,
448 to: ShimState::ContextExhausted,
449 summary: last_lines.clone(),
450 });
451 let _ = evt_channel.send(&Event::ContextExhausted {
452 message: format!("context usage at {usage:.1}%"),
453 last_lines,
454 });
455 for event in drain {
456 let _ = evt_channel.send(&event);
457 }
458 }
459 }
460 }
461 }
462
463 "_kiro.dev/compaction/status" | "_kiro.dev/clear/status" => {
464 eprintln!("[shim-kiro {shim_id}] {method}: {params:?}");
466 }
467
468 _ => {
469 }
471 }
472 continue;
473 }
474
475 if msg.is_agent_request() {
477 let method = msg.method.as_deref().unwrap_or("");
478 let request_id = msg.id.unwrap();
479
480 match method {
481 "session/request_permission" => {
482 let resp = kiro_types::permission_approve_response(request_id);
484 let ndjson = resp.to_ndjson();
485 write_stdin(&stdin_for_approve, &ndjson);
486 eprintln!(
487 "[shim-kiro {shim_id}] auto-approved permission request {request_id}"
488 );
489 }
490
491 "fs/read_text_file" | "fs/write_text_file" | "terminal/create"
492 | "terminal/kill" => {
493 let error_resp = serde_json::json!({
495 "jsonrpc": "2.0",
496 "id": request_id,
497 "error": {
498 "code": -32601,
499 "message": format!("method not supported by batty shim: {method}")
500 }
501 });
502 let ndjson = serde_json::to_string(&error_resp).unwrap();
503 write_stdin(&stdin_for_approve, &ndjson);
504 }
505
506 _ => {
507 let error_resp = serde_json::json!({
509 "jsonrpc": "2.0",
510 "id": request_id,
511 "error": {
512 "code": -32601,
513 "message": format!("unknown method: {method}")
514 }
515 });
516 let ndjson = serde_json::to_string(&error_resp).unwrap();
517 write_stdin(&stdin_for_approve, &ndjson);
518 }
519 }
520 continue;
521 }
522 }
523
524 let mut st = state_stdout.lock().unwrap();
526 let last_lines = last_n_lines_of(&st.accumulated_response, 10);
527 let old = st.state;
528 st.state = ShimState::Dead;
529 st.state_changed_at = Instant::now();
530
531 let drain = drain_queue_errors(&mut st.message_queue, ShimState::Dead);
532 drop(st);
533
534 let _ = evt_channel.send(&Event::StateChanged {
535 from: old,
536 to: ShimState::Dead,
537 summary: last_lines.clone(),
538 });
539 let _ = evt_channel.send(&Event::Died {
540 exit_code: None,
541 last_lines,
542 });
543 for event in drain {
544 let _ = evt_channel.send(&event);
545 }
546 });
547
548 let shim_id_err = args.id.clone();
550 let pty_log_stderr = pty_log;
551 thread::spawn(move || {
552 let reader = BufReader::new(child_stderr);
553 for line_result in reader.lines() {
554 match line_result {
555 Ok(line) => {
556 eprintln!("[shim-kiro {shim_id_err}] stderr: {line}");
557 if let Some(ref log) = pty_log_stderr {
558 let _ = log
559 .lock()
560 .unwrap()
561 .write(format!("[stderr] {line}\n").as_bytes());
562 }
563 }
564 Err(_) => break,
565 }
566 }
567 });
568
569 let state_stats = Arc::clone(&state);
571 let mut stats_channel = cmd_channel
572 .try_clone()
573 .context("failed to clone channel for stats")?;
574 thread::spawn(move || {
575 loop {
576 thread::sleep(Duration::from_secs(SESSION_STATS_INTERVAL_SECS));
577 let st = state_stats.lock().unwrap();
578 if st.state == ShimState::Dead {
579 return;
580 }
581 let output_bytes = st.cumulative_output_bytes;
582 let uptime_secs = st.started_at.elapsed().as_secs();
583 drop(st);
584
585 if stats_channel
586 .send(&Event::SessionStats {
587 output_bytes,
588 uptime_secs,
589 input_tokens: 0,
590 output_tokens: 0,
591 context_usage_pct: None,
592 })
593 .is_err()
594 {
595 return;
596 }
597 }
598 });
599
600 {
602 let deadline = Instant::now() + Duration::from_secs(INIT_TIMEOUT_SECS);
603 loop {
604 let st = state.lock().unwrap();
605 if st.initialized {
606 break;
607 }
608 if st.state == ShimState::Dead {
609 eprintln!("[shim-kiro {}] agent died during initialization", args.id);
610 return Ok(());
611 }
612 drop(st);
613
614 if Instant::now() > deadline {
615 eprintln!(
616 "[shim-kiro {}] initialization timed out after {}s",
617 args.id, INIT_TIMEOUT_SECS
618 );
619 terminate_child(&mut child);
620 return Ok(());
621 }
622 thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
623 }
624 }
625
626 let state_cmd = Arc::clone(&state);
628 loop {
629 let cmd = match cmd_channel.recv::<ShimCommand>() {
630 Ok(Some(c)) => c,
631 Ok(None) => {
632 eprintln!(
633 "[shim-kiro {}] orchestrator disconnected, shutting down",
634 args.id
635 );
636 terminate_child(&mut child);
637 break;
638 }
639 Err(e) => {
640 eprintln!("[shim-kiro {}] channel error: {e}", args.id);
641 terminate_child(&mut child);
642 break;
643 }
644 };
645
646 match cmd {
647 ShimCommand::SendMessage {
648 from,
649 body,
650 message_id,
651 } => {
652 let delivery_id = message_id.clone();
653 let mut st = state_cmd.lock().unwrap();
654 match st.state {
655 ShimState::Idle => {
656 st.pending_message_id = message_id;
657 st.accumulated_response.clear();
658 let session_id = st.session_id.clone();
659 st.state = ShimState::Working;
660 st.state_changed_at = Instant::now();
661
662 let req_id = next_request_id();
663 st.pending_prompt_request_id = Some(req_id);
664 drop(st);
665
666 let text = format_injected_message(&from, &body);
667 let prompt_req =
668 kiro_types::session_prompt_request(req_id, &session_id, &text);
669 let ndjson = prompt_req.to_ndjson();
670
671 if !write_stdin(&stdin_writer, &ndjson) {
672 if let Some(id) = delivery_id {
673 cmd_channel.send(&Event::DeliveryFailed {
674 id,
675 reason: "stdin write failed (closed)".into(),
676 })?;
677 }
678 cmd_channel.send(&Event::Error {
679 command: "SendMessage".into(),
680 reason: "stdin write failed (closed)".into(),
681 })?;
682 continue;
683 }
684
685 if let Some(id) = delivery_id {
686 cmd_channel.send(&Event::MessageDelivered { id })?;
687 }
688 cmd_channel.send(&Event::StateChanged {
689 from: ShimState::Idle,
690 to: ShimState::Working,
691 summary: String::new(),
692 })?;
693 }
694 ShimState::Working => {
695 if st.message_queue.len() >= MAX_QUEUE_DEPTH {
697 let dropped = st.message_queue.pop_front();
698 let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
699 st.message_queue.push_back(QueuedMessage {
700 from,
701 body,
702 message_id,
703 });
704 let depth = st.message_queue.len();
705 drop(st);
706
707 cmd_channel.send(&Event::Error {
708 command: "SendMessage".into(),
709 reason: format!(
710 "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
711 dropped_id
712 .map(|id| format!(" (id: {id})"))
713 .unwrap_or_default(),
714 ),
715 })?;
716 cmd_channel.send(&Event::Warning {
717 message: format!(
718 "message queued while agent working (depth: {depth})"
719 ),
720 idle_secs: None,
721 })?;
722 } else {
723 st.message_queue.push_back(QueuedMessage {
724 from,
725 body,
726 message_id,
727 });
728 let depth = st.message_queue.len();
729 drop(st);
730
731 cmd_channel.send(&Event::Warning {
732 message: format!(
733 "message queued while agent working (depth: {depth})"
734 ),
735 idle_secs: None,
736 })?;
737 }
738 }
739 other => {
740 drop(st);
741 cmd_channel.send(&Event::Error {
742 command: "SendMessage".into(),
743 reason: format!("agent in {other} state, cannot accept message"),
744 })?;
745 }
746 }
747 }
748
749 ShimCommand::CaptureScreen { last_n_lines } => {
750 let st = state_cmd.lock().unwrap();
751 let content = match last_n_lines {
752 Some(n) => last_n_lines_of(&st.accumulated_response, n),
753 None => st.accumulated_response.clone(),
754 };
755 drop(st);
756 cmd_channel.send(&Event::ScreenCapture {
757 content,
758 cursor_row: 0,
759 cursor_col: 0,
760 })?;
761 }
762
763 ShimCommand::GetState => {
764 let st = state_cmd.lock().unwrap();
765 let since = st.state_changed_at.elapsed().as_secs();
766 let state = st.state;
767 drop(st);
768 cmd_channel.send(&Event::State {
769 state,
770 since_secs: since,
771 })?;
772 }
773
774 ShimCommand::Resize { .. } => {
775 }
777
778 ShimCommand::Ping => {
779 cmd_channel.send(&Event::Pong)?;
780 }
781
782 ShimCommand::Shutdown {
783 timeout_secs,
784 reason,
785 } => {
786 eprintln!(
787 "[shim-kiro {}] shutdown requested ({}, timeout: {}s)",
788 args.id,
789 reason.label(),
790 timeout_secs
791 );
792 if let Err(error) = super::runtime::preserve_work_before_kill(&args.cwd) {
793 eprintln!("[shim-kiro {}] work preservation failed: {error}", args.id);
794 }
795 if let Ok(mut guard) = stdin_writer.lock() {
799 guard.take(); }
801 terminate_child(&mut child);
802
803 let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
805 loop {
806 if Instant::now() > deadline {
807 break;
808 }
809 match child.try_wait() {
810 Ok(Some(_)) => break,
811 _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
812 }
813 }
814 break;
815 }
816
817 ShimCommand::Kill => {
818 if let Err(error) = super::runtime::preserve_work_before_kill(&args.cwd) {
819 eprintln!("[shim-kiro {}] work preservation failed: {error}", args.id);
820 }
821 terminate_child(&mut child);
822 break;
823 }
824 }
825 }
826
827 stdout_handle.join().ok();
828 Ok(())
829}
830
831fn write_stdin(stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>, line: &str) -> bool {
837 if let Ok(mut guard) = stdin.lock() {
838 if let Some(ref mut writer) = *guard {
839 if writeln!(writer, "{line}").is_ok() {
840 let _ = writer.flush();
841 return true;
842 }
843 }
844 }
845 false
846}
847
848fn terminate_child(child: &mut Child) {
850 let pid = child.id();
851
852 #[cfg(unix)]
853 {
854 unsafe {
855 libc::kill(pid as i32, libc::SIGTERM);
856 }
857 let deadline = Instant::now() + Duration::from_secs(GROUP_TERM_GRACE_SECS);
858 loop {
859 if Instant::now() > deadline {
860 break;
861 }
862 match child.try_wait() {
863 Ok(Some(_)) => return,
864 _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
865 }
866 }
867 unsafe {
868 libc::kill(pid as i32, libc::SIGKILL);
869 }
870 }
871
872 #[allow(unreachable_code)]
873 {
874 let _ = child.kill();
875 }
876}
877
878fn last_n_lines_of(text: &str, n: usize) -> String {
880 let lines: Vec<&str> = text.lines().collect();
881 let start = lines.len().saturating_sub(n);
882 lines[start..].join("\n")
883}
884
885#[cfg(test)]
890mod tests {
891 use super::*;
892 use crate::shim::protocol;
893
894 #[test]
895 fn last_n_lines_basic() {
896 let text = "a\nb\nc\nd\ne";
897 assert_eq!(last_n_lines_of(text, 3), "c\nd\ne");
898 assert_eq!(last_n_lines_of(text, 10), "a\nb\nc\nd\ne");
899 assert_eq!(last_n_lines_of(text, 0), "");
900 }
901
902 #[test]
903 fn last_n_lines_empty() {
904 assert_eq!(last_n_lines_of("", 5), "");
905 }
906
907 #[test]
908 fn kiro_state_initial_values() {
909 let st = KiroState {
910 state: ShimState::Starting,
911 state_changed_at: Instant::now(),
912 started_at: Instant::now(),
913 session_id: String::new(),
914 accumulated_response: String::new(),
915 pending_message_id: None,
916 message_queue: VecDeque::new(),
917 cumulative_output_bytes: 0,
918 initialized: false,
919 sent_session_new: false,
920 pending_prompt_request_id: None,
921 context_approaching_emitted: false,
922 };
923 assert_eq!(st.state, ShimState::Starting);
924 assert!(st.session_id.is_empty());
925 assert!(!st.initialized);
926 assert!(!st.sent_session_new);
927 assert!(st.message_queue.is_empty());
928 }
929
930 #[test]
931 fn channel_round_trip_events() {
932 let (parent_sock, child_sock) = protocol::socketpair().unwrap();
933 let mut parent = protocol::Channel::new(parent_sock);
934 let mut child = protocol::Channel::new(child_sock);
935
936 child.send(&Event::Ready).unwrap();
937 let event: Event = parent.recv().unwrap().unwrap();
938 assert!(matches!(event, Event::Ready));
939
940 child
941 .send(&Event::Completion {
942 message_id: Some("m1".into()),
943 response: "done".into(),
944 last_lines: "done".into(),
945 })
946 .unwrap();
947 let event: Event = parent.recv().unwrap().unwrap();
948 match event {
949 Event::Completion {
950 message_id,
951 response,
952 ..
953 } => {
954 assert_eq!(message_id.as_deref(), Some("m1"));
955 assert_eq!(response, "done");
956 }
957 _ => panic!("expected Completion"),
958 }
959 }
960
961 #[test]
962 fn context_exhaustion_threshold() {
963 let threshold = CONTEXT_EXHAUSTION_THRESHOLD;
964 assert!(threshold >= 95.0);
965 assert!(threshold <= 100.0);
966 }
967
968 #[test]
969 fn next_request_id_increments() {
970 let a = next_request_id();
971 let b = next_request_id();
972 assert!(b > a);
973 }
974}