1use serde::{Deserialize, Serialize};
7use std::io::{self, Read, Write};
8use std::os::unix::net::UnixStream;
9
10#[derive(Debug, Serialize, Deserialize)]
15#[serde(tag = "cmd")]
16pub enum Command {
17 SendMessage {
18 from: String,
19 body: String,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 message_id: Option<String>,
22 },
23 CaptureScreen {
24 last_n_lines: Option<usize>,
25 },
26 GetState,
27 Resize {
28 rows: u16,
29 cols: u16,
30 },
31 Shutdown {
32 timeout_secs: u32,
33 #[serde(default)]
34 reason: ShutdownReason,
35 },
36 Kill,
37 Ping,
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
41#[serde(rename_all = "snake_case")]
42pub enum ShutdownReason {
43 #[default]
44 Requested,
45 RestartHandoff,
46 ContextExhausted,
47 TopologyChange,
48 DaemonStop,
49}
50
51impl ShutdownReason {
52 pub fn label(self) -> &'static str {
53 match self {
54 Self::Requested => "requested",
55 Self::RestartHandoff => "restart_handoff",
56 Self::ContextExhausted => "context_exhausted",
57 Self::TopologyChange => "topology_change",
58 Self::DaemonStop => "daemon_stop",
59 }
60 }
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "event")]
69pub enum Event {
70 Ready,
71 StateChanged {
72 from: ShimState,
73 to: ShimState,
74 summary: String,
75 },
76 MessageDelivered {
77 id: String,
78 },
79 Completion {
80 #[serde(skip_serializing_if = "Option::is_none")]
81 message_id: Option<String>,
82 response: String,
83 last_lines: String,
84 },
85 Died {
86 exit_code: Option<i32>,
87 last_lines: String,
88 },
89 ContextExhausted {
90 message: String,
91 last_lines: String,
92 },
93 ContextWarning {
94 model: Option<String>,
95 output_bytes: u64,
96 uptime_secs: u64,
97 input_tokens: u64,
98 cached_input_tokens: u64,
99 cache_creation_input_tokens: u64,
100 cache_read_input_tokens: u64,
101 output_tokens: u64,
102 reasoning_output_tokens: u64,
103 used_tokens: u64,
104 context_limit_tokens: u64,
105 usage_pct: u8,
106 },
107 ContextApproaching {
108 message: String,
109 input_tokens: u64,
110 output_tokens: u64,
111 },
112 QuotaBlocked {
113 message: String,
114 #[serde(default, skip_serializing_if = "Option::is_none")]
115 retry_at_epoch_secs: Option<u64>,
116 #[serde(default, skip_serializing_if = "Option::is_none")]
117 retry_at_label: Option<String>,
118 },
119 ScreenCapture {
120 content: String,
121 cursor_row: u16,
122 cursor_col: u16,
123 },
124 State {
125 state: ShimState,
126 since_secs: u64,
127 },
128 SessionStats {
129 output_bytes: u64,
130 uptime_secs: u64,
131 #[serde(default)]
132 input_tokens: u64,
133 #[serde(default)]
134 output_tokens: u64,
135 #[serde(default, skip_serializing_if = "Option::is_none")]
136 context_usage_pct: Option<u8>,
137 },
138 Pong,
139 Warning {
140 message: String,
141 idle_secs: Option<u64>,
142 },
143 DeliveryFailed {
144 id: String,
145 reason: String,
146 },
147 Error {
148 command: String,
149 reason: String,
150 },
151}
152
153#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum ShimState {
160 Starting,
161 Idle,
162 Working,
163 Dead,
164 ContextExhausted,
165}
166
167impl std::fmt::Display for ShimState {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::Starting => write!(f, "starting"),
171 Self::Idle => write!(f, "idle"),
172 Self::Working => write!(f, "working"),
173 Self::Dead => write!(f, "dead"),
174 Self::ContextExhausted => write!(f, "context_exhausted"),
175 }
176 }
177}
178
/// Message channel layered on a [`UnixStream`].
///
/// Holds the stream together with a byte buffer that is reused between
/// receive calls.
pub struct Channel {
    /// Underlying socket that all reads and writes go through.
    stream: UnixStream,
    /// Scratch buffer that incoming payloads are read into.
    read_buf: Vec<u8>,
}
190
191const MAX_MSG: usize = 1_048_576; impl Channel {
194 pub fn new(stream: UnixStream) -> Self {
195 Self {
196 stream,
197 read_buf: vec![0u8; 4096],
198 }
199 }
200
201 pub fn send<T: Serialize>(&mut self, msg: &T) -> anyhow::Result<()> {
203 let json = serde_json::to_vec(msg)?;
204 if json.len() > MAX_MSG {
205 anyhow::bail!("message too large: {} bytes", json.len());
206 }
207 let len = (json.len() as u32).to_be_bytes();
208 self.stream.write_all(&len)?;
209 self.stream.write_all(&json)?;
210 self.stream.flush()?;
211 Ok(())
212 }
213
214 pub fn recv<T: for<'de> Deserialize<'de>>(&mut self) -> anyhow::Result<Option<T>> {
217 let mut len_buf = [0u8; 4];
218 match self.stream.read_exact(&mut len_buf) {
219 Ok(()) => {}
220 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
221 Err(e) => return Err(e.into()),
222 }
223 let len = u32::from_be_bytes(len_buf) as usize;
224 if len > MAX_MSG {
225 anyhow::bail!("incoming message too large: {} bytes", len);
226 }
227 if self.read_buf.len() < len {
228 self.read_buf.resize(len, 0);
229 }
230 self.stream.read_exact(&mut self.read_buf[..len])?;
231 let msg = serde_json::from_slice(&self.read_buf[..len])?;
232 Ok(Some(msg))
233 }
234
235 pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> anyhow::Result<()> {
239 self.stream.set_read_timeout(timeout)?;
240 Ok(())
241 }
242
243 pub fn set_write_timeout(
255 &mut self,
256 timeout: Option<std::time::Duration>,
257 ) -> anyhow::Result<()> {
258 self.stream.set_write_timeout(timeout)?;
259 Ok(())
260 }
261
262 pub fn try_clone(&self) -> anyhow::Result<Self> {
264 Ok(Self {
265 stream: self.stream.try_clone()?,
266 read_buf: vec![0u8; 4096],
267 })
268 }
269}
270
271pub fn socketpair() -> anyhow::Result<(UnixStream, UnixStream)> {
278 let (a, b) = UnixStream::pair()?;
279 Ok((a, b))
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends `msg` across a fresh socket pair and returns the value decoded
    /// on the receiving end.
    ///
    /// Both endpoints stay alive until the message has been received, so the
    /// receiver never observes end-of-file here.
    fn roundtrip<T>(msg: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let (a, b) = socketpair().unwrap();
        let mut sender = Channel::new(a);
        let mut receiver = Channel::new(b);
        sender.send(msg).unwrap();
        receiver
            .recv::<T>()
            .unwrap()
            .expect("stream closed before a message arrived")
    }

    #[test]
    fn roundtrip_command_send_message() {
        let cmd = Command::SendMessage {
            from: "user".into(),
            body: "say hello".into(),
            message_id: Some("msg-1".into()),
        };
        match roundtrip(&cmd) {
            Command::SendMessage {
                from,
                body,
                message_id,
            } => {
                assert_eq!(from, "user");
                assert_eq!(body, "say hello");
                assert_eq!(message_id.as_deref(), Some("msg-1"));
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_command_capture_screen() {
        let cmd = Command::CaptureScreen {
            last_n_lines: Some(10),
        };
        match roundtrip(&cmd) {
            Command::CaptureScreen { last_n_lines } => assert_eq!(last_n_lines, Some(10)),
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_command_get_state() {
        assert!(matches!(roundtrip(&Command::GetState), Command::GetState));
    }

    #[test]
    fn roundtrip_command_resize() {
        let cmd = Command::Resize {
            rows: 50,
            cols: 220,
        };
        match roundtrip(&cmd) {
            Command::Resize { rows, cols } => {
                assert_eq!(rows, 50);
                assert_eq!(cols, 220);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_command_shutdown() {
        let cmd = Command::Shutdown {
            timeout_secs: 30,
            reason: ShutdownReason::Requested,
        };
        match roundtrip(&cmd) {
            Command::Shutdown {
                timeout_secs,
                reason,
            } => {
                assert_eq!(timeout_secs, 30);
                assert_eq!(reason, ShutdownReason::Requested);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn shutdown_reason_labels_restart_handoff_explicitly() {
        assert_eq!(ShutdownReason::RestartHandoff.label(), "restart_handoff");
        assert_ne!(
            ShutdownReason::RestartHandoff.label(),
            "orchestrator disconnected"
        );
    }

    #[test]
    fn roundtrip_command_kill() {
        assert!(matches!(roundtrip(&Command::Kill), Command::Kill));
    }

    #[test]
    fn roundtrip_command_ping() {
        assert!(matches!(roundtrip(&Command::Ping), Command::Ping));
    }

    #[test]
    fn roundtrip_event_completion() {
        let evt = Event::Completion {
            message_id: None,
            response: "Hello!".into(),
            last_lines: "Hello!\n❯".into(),
        };
        match roundtrip(&evt) {
            Event::Completion {
                message_id,
                response,
                last_lines,
            } => {
                // `message_id` is omitted on the wire when `None` and must
                // come back as `None`.
                assert_eq!(message_id, None);
                assert_eq!(response, "Hello!");
                assert_eq!(last_lines, "Hello!\n❯");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_message_delivered() {
        let evt = Event::MessageDelivered { id: "msg-1".into() };
        match roundtrip(&evt) {
            Event::MessageDelivered { id } => assert_eq!(id, "msg-1"),
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_state_changed() {
        let evt = Event::StateChanged {
            from: ShimState::Idle,
            to: ShimState::Working,
            summary: "working now".into(),
        };
        match roundtrip(&evt) {
            Event::StateChanged { from, to, summary } => {
                assert_eq!(from, ShimState::Idle);
                assert_eq!(to, ShimState::Working);
                assert_eq!(summary, "working now");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_ready() {
        assert!(matches!(roundtrip(&Event::Ready), Event::Ready));
    }

    #[test]
    fn roundtrip_event_pong() {
        assert!(matches!(roundtrip(&Event::Pong), Event::Pong));
    }

    #[test]
    fn roundtrip_event_delivery_failed() {
        let evt = Event::DeliveryFailed {
            id: "msg-1".into(),
            reason: "stdin write failed".into(),
        };
        match roundtrip(&evt) {
            Event::DeliveryFailed { id, reason } => {
                assert_eq!(id, "msg-1");
                assert_eq!(reason, "stdin write failed");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_died() {
        let evt = Event::Died {
            exit_code: Some(1),
            last_lines: "error occurred".into(),
        };
        match roundtrip(&evt) {
            Event::Died {
                exit_code,
                last_lines,
            } => {
                assert_eq!(exit_code, Some(1));
                assert_eq!(last_lines, "error occurred");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_context_exhausted() {
        let evt = Event::ContextExhausted {
            message: "context full".into(),
            last_lines: "last output".into(),
        };
        match roundtrip(&evt) {
            Event::ContextExhausted {
                message,
                last_lines,
            } => {
                assert_eq!(message, "context full");
                assert_eq!(last_lines, "last output");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_screen_capture() {
        let evt = Event::ScreenCapture {
            content: "screen data".into(),
            cursor_row: 5,
            cursor_col: 10,
        };
        match roundtrip(&evt) {
            Event::ScreenCapture {
                content,
                cursor_row,
                cursor_col,
            } => {
                assert_eq!(content, "screen data");
                assert_eq!(cursor_row, 5);
                assert_eq!(cursor_col, 10);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_context_warning() {
        let evt = Event::ContextWarning {
            model: Some("claude-sonnet-4-5".into()),
            output_bytes: 12_345,
            uptime_secs: 61,
            input_tokens: 80_000,
            cached_input_tokens: 5_000,
            cache_creation_input_tokens: 4_000,
            cache_read_input_tokens: 3_000,
            output_tokens: 6_000,
            reasoning_output_tokens: 2_000,
            used_tokens: 100_000,
            context_limit_tokens: 200_000,
            usage_pct: 50,
        };
        match roundtrip(&evt) {
            Event::ContextWarning {
                model,
                output_bytes,
                uptime_secs,
                input_tokens,
                cached_input_tokens,
                cache_creation_input_tokens,
                cache_read_input_tokens,
                output_tokens,
                reasoning_output_tokens,
                used_tokens,
                context_limit_tokens,
                usage_pct,
            } => {
                assert_eq!(model.as_deref(), Some("claude-sonnet-4-5"));
                assert_eq!(output_bytes, 12_345);
                assert_eq!(uptime_secs, 61);
                assert_eq!(input_tokens, 80_000);
                assert_eq!(cached_input_tokens, 5_000);
                assert_eq!(cache_creation_input_tokens, 4_000);
                assert_eq!(cache_read_input_tokens, 3_000);
                assert_eq!(output_tokens, 6_000);
                assert_eq!(reasoning_output_tokens, 2_000);
                assert_eq!(used_tokens, 100_000);
                assert_eq!(context_limit_tokens, 200_000);
                assert_eq!(usage_pct, 50);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_session_stats() {
        let evt = Event::SessionStats {
            output_bytes: 123_456,
            uptime_secs: 61,
            input_tokens: 5000,
            output_tokens: 1200,
            context_usage_pct: Some(84),
        };
        match roundtrip(&evt) {
            Event::SessionStats {
                output_bytes,
                uptime_secs,
                input_tokens,
                output_tokens,
                context_usage_pct,
            } => {
                assert_eq!(output_bytes, 123_456);
                assert_eq!(uptime_secs, 61);
                assert_eq!(input_tokens, 5000);
                assert_eq!(output_tokens, 1200);
                assert_eq!(context_usage_pct, Some(84));
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_context_approaching() {
        let evt = Event::ContextApproaching {
            message: "context pressure detected".into(),
            input_tokens: 80000,
            output_tokens: 20000,
        };
        match roundtrip(&evt) {
            Event::ContextApproaching {
                message,
                input_tokens,
                output_tokens,
            } => {
                assert_eq!(message, "context pressure detected");
                assert_eq!(input_tokens, 80000);
                assert_eq!(output_tokens, 20000);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_quota_blocked() {
        let evt = Event::QuotaBlocked {
            message: "usage limit reached".into(),
            retry_at_epoch_secs: Some(1_776_214_440),
            retry_at_label: Some("2026-04-16 12:54".into()),
        };
        match roundtrip(&evt) {
            Event::QuotaBlocked {
                message,
                retry_at_epoch_secs,
                retry_at_label,
            } => {
                assert_eq!(message, "usage limit reached");
                assert_eq!(retry_at_epoch_secs, Some(1_776_214_440));
                assert_eq!(retry_at_label.as_deref(), Some("2026-04-16 12:54"));
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_error() {
        let evt = Event::Error {
            command: "SendMessage".into(),
            reason: "agent busy".into(),
        };
        match roundtrip(&evt) {
            Event::Error { command, reason } => {
                assert_eq!(command, "SendMessage");
                assert_eq!(reason, "agent busy");
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn roundtrip_event_warning() {
        let evt = Event::Warning {
            message: "no screen change".into(),
            idle_secs: Some(300),
        };
        match roundtrip(&evt) {
            Event::Warning { message, idle_secs } => {
                assert_eq!(message, "no screen change");
                assert_eq!(idle_secs, Some(300));
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn eof_returns_none() {
        let (a, b) = socketpair().unwrap();
        // Close the sending end before anything is written.
        drop(a);
        let mut receiver = Channel::new(b);
        let result: Option<Command> = receiver.recv().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn all_states_serialize() {
        for state in [
            ShimState::Starting,
            ShimState::Idle,
            ShimState::Working,
            ShimState::Dead,
            ShimState::ContextExhausted,
        ] {
            let json = serde_json::to_string(&state).unwrap();
            let back: ShimState = serde_json::from_str(&json).unwrap();
            assert_eq!(state, back);
        }
    }

    #[test]
    fn shim_state_display() {
        assert_eq!(ShimState::Starting.to_string(), "starting");
        assert_eq!(ShimState::Idle.to_string(), "idle");
        assert_eq!(ShimState::Working.to_string(), "working");
        assert_eq!(ShimState::Dead.to_string(), "dead");
        assert_eq!(ShimState::ContextExhausted.to_string(), "context_exhausted");
    }

    #[test]
    fn socketpair_creates_connected_pair() {
        // Deliberately spelled out instead of using `roundtrip`: this test is
        // about `socketpair` itself.
        let (a, b) = socketpair().unwrap();
        let mut ch_a = Channel::new(a);
        let mut ch_b = Channel::new(b);
        ch_a.send(&Command::Ping).unwrap();
        let msg: Command = ch_b.recv().unwrap().unwrap();
        assert!(matches!(msg, Command::Ping));
    }

    #[test]
    fn send_times_out_when_peer_stops_reading() {
        // The peer end is kept open but never read from, so repeated large
        // sends are expected to stall and trip the write timeout.
        let (a, _b) = socketpair().unwrap();
        let mut sender = Channel::new(a);
        sender
            .set_write_timeout(Some(std::time::Duration::from_millis(50)))
            .unwrap();

        let big_body = "x".repeat(256 * 1024);
        let cmd = Command::SendMessage {
            from: "daemon".into(),
            body: big_body,
            message_id: None,
        };

        let start = std::time::Instant::now();
        let mut attempts = 0;
        let mut last_err = None;
        while start.elapsed() < std::time::Duration::from_secs(5) {
            attempts += 1;
            match sender.send(&cmd) {
                Ok(()) => continue,
                Err(error) => {
                    last_err = Some(error);
                    break;
                }
            }
        }
        let error = last_err.expect("send should have timed out within 5s");
        let io_error = error
            .downcast_ref::<std::io::Error>()
            .expect("write timeout should surface as an io::Error");
        assert!(
            matches!(
                io_error.kind(),
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
            ),
            "expected WouldBlock/TimedOut error, got {:?}",
            io_error.kind()
        );
        assert!(
            attempts >= 1,
            "sanity check: send loop should have attempted at least once"
        );
    }
}