1use std::collections::VecDeque;
13use std::io::{BufRead, BufReader, Write as IoWrite};
14use std::process::{Child, Command, Stdio};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, Result};
21
22use super::common::{
23 self, MAX_QUEUE_DEPTH, QueuedMessage, SESSION_STATS_INTERVAL_SECS, drain_queue_errors,
24 format_injected_message,
25};
26use super::kiro_types::{self, AcpMessage};
27use super::protocol::{Channel, Command as ShimCommand, Event, ShimState};
28use super::pty_log::PtyLogWriter;
29use super::runtime::ShimArgs;
30
31const PROCESS_EXIT_POLL_MS: u64 = 100;
36const GROUP_TERM_GRACE_SECS: u64 = 2;
37
38const INIT_TIMEOUT_SECS: u64 = 30;
40
41const CONTEXT_EXHAUSTION_THRESHOLD: f64 = 98.0;
43
44struct KiroState {
49 state: ShimState,
50 state_changed_at: Instant,
51 started_at: Instant,
52 session_id: String,
54 accumulated_response: String,
56 pending_message_id: Option<String>,
58 message_queue: VecDeque<QueuedMessage>,
60 cumulative_output_bytes: u64,
62 initialized: bool,
64 sent_session_new: bool,
66 pending_prompt_request_id: Option<u64>,
68}
69
70static REQUEST_ID: AtomicU64 = AtomicU64::new(0);
72
73fn next_request_id() -> u64 {
74 REQUEST_ID.fetch_add(1, Ordering::Relaxed)
75}
76
77pub fn run_kiro_acp(args: ShimArgs, channel: Channel) -> Result<()> {
86 let mut child = Command::new("bash")
88 .args(["-lc", &args.cmd])
89 .current_dir(&args.cwd)
90 .stdin(Stdio::piped())
91 .stdout(Stdio::piped())
92 .stderr(Stdio::piped())
93 .env_remove("CLAUDECODE")
94 .spawn()
95 .with_context(|| format!("[shim-kiro {}] failed to spawn kiro-cli acp", args.id))?;
96
97 let child_pid = child.id();
98 eprintln!(
99 "[shim-kiro {}] spawned kiro-cli acp (pid {})",
100 args.id, child_pid
101 );
102
103 let child_stdin = child.stdin.take().context("failed to take child stdin")?;
104 let child_stdout = child.stdout.take().context("failed to take child stdout")?;
105 let child_stderr = child.stderr.take().context("failed to take child stderr")?;
106
107 let state = Arc::new(Mutex::new(KiroState {
109 state: ShimState::Starting,
110 state_changed_at: Instant::now(),
111 started_at: Instant::now(),
112 session_id: String::new(),
113 accumulated_response: String::new(),
114 pending_message_id: None,
115 message_queue: VecDeque::new(),
116 cumulative_output_bytes: 0,
117 initialized: false,
118 sent_session_new: false,
119 pending_prompt_request_id: None,
120 }));
121
122 let stdin_writer = Arc::new(Mutex::new(Some(child_stdin)));
124
125 let pty_log: Option<Arc<Mutex<PtyLogWriter>>> = args
127 .pty_log_path
128 .as_deref()
129 .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
130 .transpose()?
131 .map(|w| Arc::new(Mutex::new(w)));
132
133 let mut cmd_channel = channel;
135 let mut evt_channel = cmd_channel
136 .try_clone()
137 .context("failed to clone channel for stdout reader")?;
138
139 {
141 let init_req = kiro_types::initialize_request(next_request_id());
142 let ndjson = init_req.to_ndjson();
143 write_stdin(&stdin_writer, &ndjson);
144 eprintln!("[shim-kiro {}] sent initialize request", args.id);
145 }
146
147 let state_stdout = Arc::clone(&state);
149 let stdin_for_approve = Arc::clone(&stdin_writer);
150 let pty_log_stdout = pty_log.clone();
151 let shim_id = args.id.clone();
152 let cwd_for_init = args.cwd.to_string_lossy().to_string();
153 let stdout_handle = thread::spawn(move || {
154 let reader = BufReader::new(child_stdout);
155 for line_result in reader.lines() {
156 let line = match line_result {
157 Ok(l) => l,
158 Err(e) => {
159 eprintln!("[shim-kiro {shim_id}] stdout read error: {e}");
160 break;
161 }
162 };
163
164 if line.trim().is_empty() {
165 continue;
166 }
167
168 let msg: AcpMessage = match serde_json::from_str(&line) {
169 Ok(m) => m,
170 Err(e) => {
171 eprintln!("[shim-kiro {shim_id}] ignoring unparseable NDJSON: {e}");
172 continue;
173 }
174 };
175
176 if msg.is_response() {
178 let msg_id = msg.id.unwrap();
179
180 if let Some(ref error) = msg.error {
181 eprintln!("[shim-kiro {shim_id}] JSON-RPC error (id={msg_id}): {error}");
182 let mut st = state_stdout.lock().unwrap();
184 if st.pending_prompt_request_id == Some(msg_id) {
185 st.pending_prompt_request_id = None;
186 let error_text = error
187 .get("message")
188 .and_then(|m| m.as_str())
189 .unwrap_or("unknown error");
190
191 if common::detect_context_exhausted(error_text) {
192 let last_lines = last_n_lines_of(&st.accumulated_response, 5);
193 let old = st.state;
194 st.state = ShimState::ContextExhausted;
195 st.state_changed_at = Instant::now();
196 let drain = drain_queue_errors(
197 &mut st.message_queue,
198 ShimState::ContextExhausted,
199 );
200 drop(st);
201 let _ = evt_channel.send(&Event::StateChanged {
202 from: old,
203 to: ShimState::ContextExhausted,
204 summary: last_lines.clone(),
205 });
206 let _ = evt_channel.send(&Event::ContextExhausted {
207 message: error_text.to_string(),
208 last_lines,
209 });
210 for event in drain {
211 let _ = evt_channel.send(&event);
212 }
213 } else {
214 let response = std::mem::take(&mut st.accumulated_response);
216 let last_lines = last_n_lines_of(&response, 5);
217 let msg_id_out = st.pending_message_id.take();
218 st.state = ShimState::Idle;
219 st.state_changed_at = Instant::now();
220 drop(st);
221 let _ = evt_channel.send(&Event::StateChanged {
222 from: ShimState::Working,
223 to: ShimState::Idle,
224 summary: last_lines.clone(),
225 });
226 let _ = evt_channel.send(&Event::Completion {
227 message_id: msg_id_out,
228 response: format!("[error] {error_text}"),
229 last_lines,
230 });
231 }
232 }
233 continue;
234 }
235
236 if let Some(ref result) = msg.result {
237 let mut st = state_stdout.lock().unwrap();
238
239 if !st.initialized && !st.sent_session_new {
241 st.sent_session_new = true;
243 drop(st);
244 let session_req =
245 kiro_types::session_new_request(next_request_id(), &cwd_for_init);
246 let ndjson = session_req.to_ndjson();
247 write_stdin(&stdin_for_approve, &ndjson);
248 eprintln!("[shim-kiro {shim_id}] sent session/new request");
249 continue;
250 }
251
252 if !st.initialized {
254 if let Some(sid) = kiro_types::extract_session_id(result) {
255 st.session_id = sid.to_string();
256 st.initialized = true;
257 st.state = ShimState::Idle;
258 st.state_changed_at = Instant::now();
259 eprintln!("[shim-kiro {shim_id}] session created: {}", st.session_id);
260 drop(st);
261
262 let _ = evt_channel.send(&Event::Ready);
264 }
265 continue;
266 }
267
268 if st.pending_prompt_request_id == Some(msg_id) {
270 st.pending_prompt_request_id = None;
271 let response = if st.accumulated_response.is_empty() {
272 result
273 .get("result")
274 .and_then(|r| r.as_str())
275 .unwrap_or("")
276 .to_string()
277 } else {
278 std::mem::take(&mut st.accumulated_response)
279 };
280 let last_lines = last_n_lines_of(&response, 5);
281 let completed_msg_id = st.pending_message_id.take();
282 let old = st.state;
283 st.state = ShimState::Idle;
284 st.state_changed_at = Instant::now();
285
286 let queued_msg = if !st.message_queue.is_empty() {
288 st.message_queue.pop_front()
289 } else {
290 None
291 };
292 if let Some(ref qm) = queued_msg {
293 st.pending_message_id = qm.message_id.clone();
294 st.state = ShimState::Working;
295 st.state_changed_at = Instant::now();
296 st.accumulated_response.clear();
297 }
298 let session_id = st.session_id.clone();
299 let queue_depth = st.message_queue.len();
300 drop(st);
301
302 let _ = evt_channel.send(&Event::StateChanged {
303 from: old,
304 to: ShimState::Idle,
305 summary: last_lines.clone(),
306 });
307 let _ = evt_channel.send(&Event::Completion {
308 message_id: completed_msg_id,
309 response,
310 last_lines,
311 });
312
313 if let Some(qm) = queued_msg {
315 let text = format_injected_message(&qm.from, &qm.body);
316 let req_id = next_request_id();
317 let prompt_req =
318 kiro_types::session_prompt_request(req_id, &session_id, &text);
319 let ndjson = prompt_req.to_ndjson();
320 write_stdin(&stdin_for_approve, &ndjson);
321 let mut st = state_stdout.lock().unwrap();
322 st.pending_prompt_request_id = Some(req_id);
323 drop(st);
324
325 let _ = evt_channel.send(&Event::StateChanged {
326 from: ShimState::Idle,
327 to: ShimState::Working,
328 summary: format!(
329 "delivering queued message ({queue_depth} remaining)"
330 ),
331 });
332 }
333 }
334 }
335 continue;
336 }
337
338 if msg.is_notification() {
340 let method = msg.method.as_deref().unwrap_or("");
341 let params = msg.params.as_ref();
342
343 match method {
344 "session/update" => {
345 if let Some(params) = params {
346 let update_type = kiro_types::extract_update_type(params).unwrap_or("");
347
348 match update_type {
349 "agent_message_chunk" | "AgentMessageChunk" => {
350 if let Some(text) =
351 kiro_types::extract_message_chunk_text(params)
352 {
353 if !text.is_empty() {
354 let mut st = state_stdout.lock().unwrap();
355 st.accumulated_response.push_str(text);
356 st.cumulative_output_bytes += text.len() as u64;
357 drop(st);
358
359 if let Some(ref log) = pty_log_stdout {
360 let _ = log.lock().unwrap().write(text.as_bytes());
361 }
362 }
363 }
364 }
365
366 "agent_thought_chunk" => {
367 if let Some(text) =
369 kiro_types::extract_message_chunk_text(params)
370 {
371 if let Some(ref log) = pty_log_stdout {
372 let _ = log
373 .lock()
374 .unwrap()
375 .write(format!("[thought] {text}").as_bytes());
376 }
377 }
378 }
379
380 "tool_call" | "ToolCall" => {
381 let title = params
383 .get("update")
384 .and_then(|u| u.get("title"))
385 .and_then(|t| t.as_str())
386 .unwrap_or("unknown tool");
387 if let Some(ref log) = pty_log_stdout {
388 let _ = log
389 .lock()
390 .unwrap()
391 .write(format!("[tool] {title}\n").as_bytes());
392 }
393 }
394
395 "tool_call_update" | "ToolCallUpdate" => {
396 }
398
399 "TurnEnd" | "turn_end" => {
400 }
405
406 _ => {
407 }
409 }
410 }
411 }
412
413 "_kiro.dev/metadata" => {
414 if let Some(params) = params {
416 if let Some(usage) = kiro_types::extract_context_usage(params) {
417 if usage >= CONTEXT_EXHAUSTION_THRESHOLD {
418 let mut st = state_stdout.lock().unwrap();
419 let last_lines = last_n_lines_of(&st.accumulated_response, 5);
420 let old = st.state;
421 st.state = ShimState::ContextExhausted;
422 st.state_changed_at = Instant::now();
423 let drain = drain_queue_errors(
424 &mut st.message_queue,
425 ShimState::ContextExhausted,
426 );
427 drop(st);
428
429 let _ = evt_channel.send(&Event::StateChanged {
430 from: old,
431 to: ShimState::ContextExhausted,
432 summary: last_lines.clone(),
433 });
434 let _ = evt_channel.send(&Event::ContextExhausted {
435 message: format!("context usage at {usage:.1}%"),
436 last_lines,
437 });
438 for event in drain {
439 let _ = evt_channel.send(&event);
440 }
441 }
442 }
443 }
444 }
445
446 "_kiro.dev/compaction/status" | "_kiro.dev/clear/status" => {
447 eprintln!("[shim-kiro {shim_id}] {method}: {params:?}");
449 }
450
451 _ => {
452 }
454 }
455 continue;
456 }
457
458 if msg.is_agent_request() {
460 let method = msg.method.as_deref().unwrap_or("");
461 let request_id = msg.id.unwrap();
462
463 match method {
464 "session/request_permission" => {
465 let resp = kiro_types::permission_approve_response(request_id);
467 let ndjson = resp.to_ndjson();
468 write_stdin(&stdin_for_approve, &ndjson);
469 eprintln!(
470 "[shim-kiro {shim_id}] auto-approved permission request {request_id}"
471 );
472 }
473
474 "fs/read_text_file" | "fs/write_text_file" | "terminal/create"
475 | "terminal/kill" => {
476 let error_resp = serde_json::json!({
478 "jsonrpc": "2.0",
479 "id": request_id,
480 "error": {
481 "code": -32601,
482 "message": format!("method not supported by batty shim: {method}")
483 }
484 });
485 let ndjson = serde_json::to_string(&error_resp).unwrap();
486 write_stdin(&stdin_for_approve, &ndjson);
487 }
488
489 _ => {
490 let error_resp = serde_json::json!({
492 "jsonrpc": "2.0",
493 "id": request_id,
494 "error": {
495 "code": -32601,
496 "message": format!("unknown method: {method}")
497 }
498 });
499 let ndjson = serde_json::to_string(&error_resp).unwrap();
500 write_stdin(&stdin_for_approve, &ndjson);
501 }
502 }
503 continue;
504 }
505 }
506
507 let mut st = state_stdout.lock().unwrap();
509 let last_lines = last_n_lines_of(&st.accumulated_response, 10);
510 let old = st.state;
511 st.state = ShimState::Dead;
512 st.state_changed_at = Instant::now();
513
514 let drain = drain_queue_errors(&mut st.message_queue, ShimState::Dead);
515 drop(st);
516
517 let _ = evt_channel.send(&Event::StateChanged {
518 from: old,
519 to: ShimState::Dead,
520 summary: last_lines.clone(),
521 });
522 let _ = evt_channel.send(&Event::Died {
523 exit_code: None,
524 last_lines,
525 });
526 for event in drain {
527 let _ = evt_channel.send(&event);
528 }
529 });
530
531 let shim_id_err = args.id.clone();
533 let pty_log_stderr = pty_log;
534 thread::spawn(move || {
535 let reader = BufReader::new(child_stderr);
536 for line_result in reader.lines() {
537 match line_result {
538 Ok(line) => {
539 eprintln!("[shim-kiro {shim_id_err}] stderr: {line}");
540 if let Some(ref log) = pty_log_stderr {
541 let _ = log
542 .lock()
543 .unwrap()
544 .write(format!("[stderr] {line}\n").as_bytes());
545 }
546 }
547 Err(_) => break,
548 }
549 }
550 });
551
552 let state_stats = Arc::clone(&state);
554 let mut stats_channel = cmd_channel
555 .try_clone()
556 .context("failed to clone channel for stats")?;
557 thread::spawn(move || {
558 loop {
559 thread::sleep(Duration::from_secs(SESSION_STATS_INTERVAL_SECS));
560 let st = state_stats.lock().unwrap();
561 if st.state == ShimState::Dead {
562 return;
563 }
564 let output_bytes = st.cumulative_output_bytes;
565 let uptime_secs = st.started_at.elapsed().as_secs();
566 drop(st);
567
568 if stats_channel
569 .send(&Event::SessionStats {
570 output_bytes,
571 uptime_secs,
572 })
573 .is_err()
574 {
575 return;
576 }
577 }
578 });
579
580 {
582 let deadline = Instant::now() + Duration::from_secs(INIT_TIMEOUT_SECS);
583 loop {
584 let st = state.lock().unwrap();
585 if st.initialized {
586 break;
587 }
588 if st.state == ShimState::Dead {
589 eprintln!("[shim-kiro {}] agent died during initialization", args.id);
590 return Ok(());
591 }
592 drop(st);
593
594 if Instant::now() > deadline {
595 eprintln!(
596 "[shim-kiro {}] initialization timed out after {}s",
597 args.id, INIT_TIMEOUT_SECS
598 );
599 terminate_child(&mut child);
600 return Ok(());
601 }
602 thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS));
603 }
604 }
605
606 let state_cmd = Arc::clone(&state);
608 loop {
609 let cmd = match cmd_channel.recv::<ShimCommand>() {
610 Ok(Some(c)) => c,
611 Ok(None) => {
612 eprintln!(
613 "[shim-kiro {}] orchestrator disconnected, shutting down",
614 args.id
615 );
616 terminate_child(&mut child);
617 break;
618 }
619 Err(e) => {
620 eprintln!("[shim-kiro {}] channel error: {e}", args.id);
621 terminate_child(&mut child);
622 break;
623 }
624 };
625
626 match cmd {
627 ShimCommand::SendMessage {
628 from,
629 body,
630 message_id,
631 } => {
632 let mut st = state_cmd.lock().unwrap();
633 match st.state {
634 ShimState::Idle => {
635 st.pending_message_id = message_id;
636 st.accumulated_response.clear();
637 let session_id = st.session_id.clone();
638 st.state = ShimState::Working;
639 st.state_changed_at = Instant::now();
640
641 let req_id = next_request_id();
642 st.pending_prompt_request_id = Some(req_id);
643 drop(st);
644
645 let text = format_injected_message(&from, &body);
646 let prompt_req =
647 kiro_types::session_prompt_request(req_id, &session_id, &text);
648 let ndjson = prompt_req.to_ndjson();
649
650 if !write_stdin(&stdin_writer, &ndjson) {
651 cmd_channel.send(&Event::Error {
652 command: "SendMessage".into(),
653 reason: "stdin write failed (closed)".into(),
654 })?;
655 continue;
656 }
657
658 cmd_channel.send(&Event::StateChanged {
659 from: ShimState::Idle,
660 to: ShimState::Working,
661 summary: String::new(),
662 })?;
663 }
664 ShimState::Working => {
665 if st.message_queue.len() >= MAX_QUEUE_DEPTH {
667 let dropped = st.message_queue.pop_front();
668 let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
669 st.message_queue.push_back(QueuedMessage {
670 from,
671 body,
672 message_id,
673 });
674 let depth = st.message_queue.len();
675 drop(st);
676
677 cmd_channel.send(&Event::Error {
678 command: "SendMessage".into(),
679 reason: format!(
680 "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
681 dropped_id
682 .map(|id| format!(" (id: {id})"))
683 .unwrap_or_default(),
684 ),
685 })?;
686 cmd_channel.send(&Event::Warning {
687 message: format!(
688 "message queued while agent working (depth: {depth})"
689 ),
690 idle_secs: None,
691 })?;
692 } else {
693 st.message_queue.push_back(QueuedMessage {
694 from,
695 body,
696 message_id,
697 });
698 let depth = st.message_queue.len();
699 drop(st);
700
701 cmd_channel.send(&Event::Warning {
702 message: format!(
703 "message queued while agent working (depth: {depth})"
704 ),
705 idle_secs: None,
706 })?;
707 }
708 }
709 other => {
710 drop(st);
711 cmd_channel.send(&Event::Error {
712 command: "SendMessage".into(),
713 reason: format!("agent in {other} state, cannot accept message"),
714 })?;
715 }
716 }
717 }
718
719 ShimCommand::CaptureScreen { last_n_lines } => {
720 let st = state_cmd.lock().unwrap();
721 let content = match last_n_lines {
722 Some(n) => last_n_lines_of(&st.accumulated_response, n),
723 None => st.accumulated_response.clone(),
724 };
725 drop(st);
726 cmd_channel.send(&Event::ScreenCapture {
727 content,
728 cursor_row: 0,
729 cursor_col: 0,
730 })?;
731 }
732
733 ShimCommand::GetState => {
734 let st = state_cmd.lock().unwrap();
735 let since = st.state_changed_at.elapsed().as_secs();
736 let state = st.state;
737 drop(st);
738 cmd_channel.send(&Event::State {
739 state,
740 since_secs: since,
741 })?;
742 }
743
744 ShimCommand::Resize { .. } => {
745 }
747
748 ShimCommand::Ping => {
749 cmd_channel.send(&Event::Pong)?;
750 }
751
752 ShimCommand::Shutdown { timeout_secs } => {
753 eprintln!(
754 "[shim-kiro {}] shutdown requested (timeout: {}s)",
755 args.id, timeout_secs
756 );
757 if let Ok(mut guard) = stdin_writer.lock() {
761 guard.take(); }
763 terminate_child(&mut child);
764
765 let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
767 loop {
768 if Instant::now() > deadline {
769 break;
770 }
771 match child.try_wait() {
772 Ok(Some(_)) => break,
773 _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
774 }
775 }
776 break;
777 }
778
779 ShimCommand::Kill => {
780 terminate_child(&mut child);
781 break;
782 }
783 }
784 }
785
786 stdout_handle.join().ok();
787 Ok(())
788}
789
790fn write_stdin(stdin: &Arc<Mutex<Option<std::process::ChildStdin>>>, line: &str) -> bool {
796 if let Ok(mut guard) = stdin.lock() {
797 if let Some(ref mut writer) = *guard {
798 if writeln!(writer, "{line}").is_ok() {
799 let _ = writer.flush();
800 return true;
801 }
802 }
803 }
804 false
805}
806
807fn terminate_child(child: &mut Child) {
809 let pid = child.id();
810
811 #[cfg(unix)]
812 {
813 unsafe {
814 libc::kill(pid as i32, libc::SIGTERM);
815 }
816 let deadline = Instant::now() + Duration::from_secs(GROUP_TERM_GRACE_SECS);
817 loop {
818 if Instant::now() > deadline {
819 break;
820 }
821 match child.try_wait() {
822 Ok(Some(_)) => return,
823 _ => thread::sleep(Duration::from_millis(PROCESS_EXIT_POLL_MS)),
824 }
825 }
826 unsafe {
827 libc::kill(pid as i32, libc::SIGKILL);
828 }
829 }
830
831 #[allow(unreachable_code)]
832 {
833 let _ = child.kill();
834 }
835}
836
837fn last_n_lines_of(text: &str, n: usize) -> String {
839 let lines: Vec<&str> = text.lines().collect();
840 let start = lines.len().saturating_sub(n);
841 lines[start..].join("\n")
842}
843
844#[cfg(test)]
849mod tests {
850 use super::*;
851 use crate::shim::protocol;
852
853 #[test]
854 fn last_n_lines_basic() {
855 let text = "a\nb\nc\nd\ne";
856 assert_eq!(last_n_lines_of(text, 3), "c\nd\ne");
857 assert_eq!(last_n_lines_of(text, 10), "a\nb\nc\nd\ne");
858 assert_eq!(last_n_lines_of(text, 0), "");
859 }
860
861 #[test]
862 fn last_n_lines_empty() {
863 assert_eq!(last_n_lines_of("", 5), "");
864 }
865
866 #[test]
867 fn kiro_state_initial_values() {
868 let st = KiroState {
869 state: ShimState::Starting,
870 state_changed_at: Instant::now(),
871 started_at: Instant::now(),
872 session_id: String::new(),
873 accumulated_response: String::new(),
874 pending_message_id: None,
875 message_queue: VecDeque::new(),
876 cumulative_output_bytes: 0,
877 initialized: false,
878 sent_session_new: false,
879 pending_prompt_request_id: None,
880 };
881 assert_eq!(st.state, ShimState::Starting);
882 assert!(st.session_id.is_empty());
883 assert!(!st.initialized);
884 assert!(!st.sent_session_new);
885 assert!(st.message_queue.is_empty());
886 }
887
888 #[test]
889 fn channel_round_trip_events() {
890 let (parent_sock, child_sock) = protocol::socketpair().unwrap();
891 let mut parent = protocol::Channel::new(parent_sock);
892 let mut child = protocol::Channel::new(child_sock);
893
894 child.send(&Event::Ready).unwrap();
895 let event: Event = parent.recv().unwrap().unwrap();
896 assert!(matches!(event, Event::Ready));
897
898 child
899 .send(&Event::Completion {
900 message_id: Some("m1".into()),
901 response: "done".into(),
902 last_lines: "done".into(),
903 })
904 .unwrap();
905 let event: Event = parent.recv().unwrap().unwrap();
906 match event {
907 Event::Completion {
908 message_id,
909 response,
910 ..
911 } => {
912 assert_eq!(message_id.as_deref(), Some("m1"));
913 assert_eq!(response, "done");
914 }
915 _ => panic!("expected Completion"),
916 }
917 }
918
919 #[test]
920 fn context_exhaustion_threshold() {
921 let threshold = CONTEXT_EXHAUSTION_THRESHOLD;
922 assert!(threshold >= 95.0);
923 assert!(threshold <= 100.0);
924 }
925
926 #[test]
927 fn next_request_id_increments() {
928 let a = next_request_id();
929 let b = next_request_id();
930 assert!(b > a);
931 }
932}