Skip to main content

batty_cli/shim/
protocol.rs

1//! Wire protocol: Commands (orchestrator→shim) and Events (shim→orchestrator).
2//!
3//! Transport: length-prefixed JSON over a Unix SOCK_STREAM socketpair.
4//! 4-byte big-endian length prefix + JSON payload.
5
6use serde::{Deserialize, Serialize};
7use std::io::{self, Read, Write};
8use std::os::unix::net::UnixStream;
9
10// ---------------------------------------------------------------------------
11// Commands (sent TO the shim)
12// ---------------------------------------------------------------------------
13
14#[derive(Debug, Serialize, Deserialize)]
15#[serde(tag = "cmd")]
16pub enum Command {
17    SendMessage {
18        from: String,
19        body: String,
20        #[serde(skip_serializing_if = "Option::is_none")]
21        message_id: Option<String>,
22    },
23    CaptureScreen {
24        last_n_lines: Option<usize>,
25    },
26    GetState,
27    Resize {
28        rows: u16,
29        cols: u16,
30    },
31    Shutdown {
32        timeout_secs: u32,
33        #[serde(default)]
34        reason: ShutdownReason,
35    },
36    Kill,
37    Ping,
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
41#[serde(rename_all = "snake_case")]
42pub enum ShutdownReason {
43    #[default]
44    Requested,
45    RestartHandoff,
46    ContextExhausted,
47    TopologyChange,
48    DaemonStop,
49}
50
51impl ShutdownReason {
52    pub fn label(self) -> &'static str {
53        match self {
54            Self::Requested => "requested",
55            Self::RestartHandoff => "restart_handoff",
56            Self::ContextExhausted => "context_exhausted",
57            Self::TopologyChange => "topology_change",
58            Self::DaemonStop => "daemon_stop",
59        }
60    }
61}
62
63// ---------------------------------------------------------------------------
64// Events (sent FROM the shim)
65// ---------------------------------------------------------------------------
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "event")]
69pub enum Event {
70    Ready,
71    StateChanged {
72        from: ShimState,
73        to: ShimState,
74        summary: String,
75    },
76    MessageDelivered {
77        id: String,
78    },
79    Completion {
80        #[serde(skip_serializing_if = "Option::is_none")]
81        message_id: Option<String>,
82        response: String,
83        last_lines: String,
84    },
85    Died {
86        exit_code: Option<i32>,
87        last_lines: String,
88    },
89    ContextExhausted {
90        message: String,
91        last_lines: String,
92    },
93    ContextWarning {
94        model: Option<String>,
95        output_bytes: u64,
96        uptime_secs: u64,
97        input_tokens: u64,
98        cached_input_tokens: u64,
99        cache_creation_input_tokens: u64,
100        cache_read_input_tokens: u64,
101        output_tokens: u64,
102        reasoning_output_tokens: u64,
103        used_tokens: u64,
104        context_limit_tokens: u64,
105        usage_pct: u8,
106    },
107    ContextApproaching {
108        message: String,
109        input_tokens: u64,
110        output_tokens: u64,
111    },
112    QuotaBlocked {
113        message: String,
114        #[serde(default, skip_serializing_if = "Option::is_none")]
115        retry_at_epoch_secs: Option<u64>,
116        #[serde(default, skip_serializing_if = "Option::is_none")]
117        retry_at_label: Option<String>,
118    },
119    ScreenCapture {
120        content: String,
121        cursor_row: u16,
122        cursor_col: u16,
123    },
124    State {
125        state: ShimState,
126        since_secs: u64,
127    },
128    SessionStats {
129        output_bytes: u64,
130        uptime_secs: u64,
131        #[serde(default)]
132        input_tokens: u64,
133        #[serde(default)]
134        output_tokens: u64,
135        #[serde(default, skip_serializing_if = "Option::is_none")]
136        context_usage_pct: Option<u8>,
137    },
138    Pong,
139    Warning {
140        message: String,
141        idle_secs: Option<u64>,
142    },
143    DeliveryFailed {
144        id: String,
145        reason: String,
146    },
147    Error {
148        command: String,
149        reason: String,
150    },
151}
152
153// ---------------------------------------------------------------------------
154// Shim state
155// ---------------------------------------------------------------------------
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum ShimState {
160    Starting,
161    Idle,
162    Working,
163    Dead,
164    ContextExhausted,
165}
166
167impl std::fmt::Display for ShimState {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::Starting => write!(f, "starting"),
171            Self::Idle => write!(f, "idle"),
172            Self::Working => write!(f, "working"),
173            Self::Dead => write!(f, "dead"),
174            Self::ContextExhausted => write!(f, "context_exhausted"),
175        }
176    }
177}
178
179// ---------------------------------------------------------------------------
180// Framed channel over a Unix socket
181// ---------------------------------------------------------------------------
182
183/// Blocking, length-prefixed JSON channel over a Unix stream socket.
184///
185/// Uses 4-byte big-endian length + JSON payload for robustness.
186pub struct Channel {
187    stream: UnixStream,
188    read_buf: Vec<u8>,
189}
190
191const MAX_MSG: usize = 1_048_576; // 1 MB
192
193impl Channel {
194    pub fn new(stream: UnixStream) -> Self {
195        Self {
196            stream,
197            read_buf: vec![0u8; 4096],
198        }
199    }
200
201    /// Send a serializable message.
202    pub fn send<T: Serialize>(&mut self, msg: &T) -> anyhow::Result<()> {
203        let json = serde_json::to_vec(msg)?;
204        if json.len() > MAX_MSG {
205            anyhow::bail!("message too large: {} bytes", json.len());
206        }
207        let len = (json.len() as u32).to_be_bytes();
208        self.stream.write_all(&len)?;
209        self.stream.write_all(&json)?;
210        self.stream.flush()?;
211        Ok(())
212    }
213
214    /// Receive a deserializable message. Blocks until a message arrives.
215    /// Returns Ok(None) on clean EOF (peer closed).
216    pub fn recv<T: for<'de> Deserialize<'de>>(&mut self) -> anyhow::Result<Option<T>> {
217        let mut len_buf = [0u8; 4];
218        match self.stream.read_exact(&mut len_buf) {
219            Ok(()) => {}
220            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
221            Err(e) => return Err(e.into()),
222        }
223        let len = u32::from_be_bytes(len_buf) as usize;
224        if len > MAX_MSG {
225            anyhow::bail!("incoming message too large: {} bytes", len);
226        }
227        if self.read_buf.len() < len {
228            self.read_buf.resize(len, 0);
229        }
230        self.stream.read_exact(&mut self.read_buf[..len])?;
231        let msg = serde_json::from_slice(&self.read_buf[..len])?;
232        Ok(Some(msg))
233    }
234
235    /// Set a read timeout on the underlying socket.
236    /// After this, `recv()` will return an error if no data arrives
237    /// within the given duration (instead of blocking forever).
238    pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> anyhow::Result<()> {
239        self.stream.set_read_timeout(timeout)?;
240        Ok(())
241    }
242
243    /// Set a write timeout on the underlying socket.
244    ///
245    /// Without this, `send()` calls `write_all()` which blocks indefinitely
246    /// when the peer stops draining its receive buffer — e.g. a wedged shim
247    /// that stopped reading. A blocking `write_all` inside the daemon's
248    /// `poll_shim_handles` / ping_pong tick wedges the ENTIRE daemon event
249    /// loop on a single stuck handle, which matches the documented "daemon
250    /// freezes after 10-15 min productive window" failure mode. Setting a
251    /// bounded write timeout turns that hard-hang into a regular `send()`
252    /// error that the caller can classify as a stale / dead handle and
253    /// escalate via the usual respawn / crash paths.
254    pub fn set_write_timeout(
255        &mut self,
256        timeout: Option<std::time::Duration>,
257    ) -> anyhow::Result<()> {
258        self.stream.set_write_timeout(timeout)?;
259        Ok(())
260    }
261
262    /// Clone the underlying fd for use in a second thread.
263    pub fn try_clone(&self) -> anyhow::Result<Self> {
264        Ok(Self {
265            stream: self.stream.try_clone()?,
266            read_buf: vec![0u8; 4096],
267        })
268    }
269}
270
271// ---------------------------------------------------------------------------
272// Create a connected socketpair
273// ---------------------------------------------------------------------------
274
275/// Create a connected pair of Unix stream sockets.
276/// Returns (parent_socket, child_socket).
277pub fn socketpair() -> anyhow::Result<(UnixStream, UnixStream)> {
278    let (a, b) = UnixStream::pair()?;
279    Ok((a, b))
280}
281
282// ---------------------------------------------------------------------------
283// Tests
284// ---------------------------------------------------------------------------
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289
290    #[test]
291    fn roundtrip_command_send_message() {
292        let (a, b) = socketpair().unwrap();
293        let mut sender = Channel::new(a);
294        let mut receiver = Channel::new(b);
295
296        let cmd = Command::SendMessage {
297            from: "user".into(),
298            body: "say hello".into(),
299            message_id: Some("msg-1".into()),
300        };
301        sender.send(&cmd).unwrap();
302        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
303
304        match received {
305            Command::SendMessage {
306                from,
307                body,
308                message_id,
309            } => {
310                assert_eq!(from, "user");
311                assert_eq!(body, "say hello");
312                assert_eq!(message_id.as_deref(), Some("msg-1"));
313            }
314            _ => panic!("wrong variant"),
315        }
316    }
317
318    #[test]
319    fn roundtrip_command_capture_screen() {
320        let (a, b) = socketpair().unwrap();
321        let mut sender = Channel::new(a);
322        let mut receiver = Channel::new(b);
323
324        let cmd = Command::CaptureScreen {
325            last_n_lines: Some(10),
326        };
327        sender.send(&cmd).unwrap();
328        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
329        match received {
330            Command::CaptureScreen { last_n_lines } => assert_eq!(last_n_lines, Some(10)),
331            _ => panic!("wrong variant"),
332        }
333    }
334
335    #[test]
336    fn roundtrip_command_get_state() {
337        let (a, b) = socketpair().unwrap();
338        let mut sender = Channel::new(a);
339        let mut receiver = Channel::new(b);
340
341        sender.send(&Command::GetState).unwrap();
342        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
343        assert!(matches!(received, Command::GetState));
344    }
345
346    #[test]
347    fn roundtrip_command_resize() {
348        let (a, b) = socketpair().unwrap();
349        let mut sender = Channel::new(a);
350        let mut receiver = Channel::new(b);
351
352        let cmd = Command::Resize {
353            rows: 50,
354            cols: 220,
355        };
356        sender.send(&cmd).unwrap();
357        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
358        match received {
359            Command::Resize { rows, cols } => {
360                assert_eq!(rows, 50);
361                assert_eq!(cols, 220);
362            }
363            _ => panic!("wrong variant"),
364        }
365    }
366
367    #[test]
368    fn roundtrip_command_shutdown() {
369        let (a, b) = socketpair().unwrap();
370        let mut sender = Channel::new(a);
371        let mut receiver = Channel::new(b);
372
373        let cmd = Command::Shutdown {
374            timeout_secs: 30,
375            reason: ShutdownReason::Requested,
376        };
377        sender.send(&cmd).unwrap();
378        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
379        match received {
380            Command::Shutdown {
381                timeout_secs,
382                reason,
383            } => {
384                assert_eq!(timeout_secs, 30);
385                assert_eq!(reason, ShutdownReason::Requested);
386            }
387            _ => panic!("wrong variant"),
388        }
389    }
390
391    #[test]
392    fn shutdown_reason_labels_restart_handoff_explicitly() {
393        assert_eq!(ShutdownReason::RestartHandoff.label(), "restart_handoff");
394        assert_ne!(
395            ShutdownReason::RestartHandoff.label(),
396            "orchestrator disconnected"
397        );
398    }
399
400    #[test]
401    fn roundtrip_command_kill() {
402        let (a, b) = socketpair().unwrap();
403        let mut sender = Channel::new(a);
404        let mut receiver = Channel::new(b);
405
406        sender.send(&Command::Kill).unwrap();
407        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
408        assert!(matches!(received, Command::Kill));
409    }
410
411    #[test]
412    fn roundtrip_command_ping() {
413        let (a, b) = socketpair().unwrap();
414        let mut sender = Channel::new(a);
415        let mut receiver = Channel::new(b);
416
417        sender.send(&Command::Ping).unwrap();
418        let received: Command = receiver.recv::<Command>().unwrap().unwrap();
419        assert!(matches!(received, Command::Ping));
420    }
421
422    #[test]
423    fn roundtrip_event_completion() {
424        let (a, b) = socketpair().unwrap();
425        let mut sender = Channel::new(a);
426        let mut receiver = Channel::new(b);
427
428        let evt = Event::Completion {
429            message_id: None,
430            response: "Hello!".into(),
431            last_lines: "Hello!\n❯".into(),
432        };
433        sender.send(&evt).unwrap();
434        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
435
436        match received {
437            Event::Completion { response, .. } => assert_eq!(response, "Hello!"),
438            _ => panic!("wrong variant"),
439        }
440    }
441
442    #[test]
443    fn roundtrip_event_message_delivered() {
444        let (a, b) = socketpair().unwrap();
445        let mut sender = Channel::new(a);
446        let mut receiver = Channel::new(b);
447
448        let evt = Event::MessageDelivered { id: "msg-1".into() };
449        sender.send(&evt).unwrap();
450        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
451
452        match received {
453            Event::MessageDelivered { id } => assert_eq!(id, "msg-1"),
454            _ => panic!("wrong variant"),
455        }
456    }
457
458    #[test]
459    fn roundtrip_event_state_changed() {
460        let (a, b) = socketpair().unwrap();
461        let mut sender = Channel::new(a);
462        let mut receiver = Channel::new(b);
463
464        let evt = Event::StateChanged {
465            from: ShimState::Idle,
466            to: ShimState::Working,
467            summary: "working now".into(),
468        };
469        sender.send(&evt).unwrap();
470        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
471        match received {
472            Event::StateChanged { from, to, summary } => {
473                assert_eq!(from, ShimState::Idle);
474                assert_eq!(to, ShimState::Working);
475                assert_eq!(summary, "working now");
476            }
477            _ => panic!("wrong variant"),
478        }
479    }
480
481    #[test]
482    fn roundtrip_event_ready() {
483        let (a, b) = socketpair().unwrap();
484        let mut sender = Channel::new(a);
485        let mut receiver = Channel::new(b);
486
487        sender.send(&Event::Ready).unwrap();
488        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
489        assert!(matches!(received, Event::Ready));
490    }
491
492    #[test]
493    fn roundtrip_event_pong() {
494        let (a, b) = socketpair().unwrap();
495        let mut sender = Channel::new(a);
496        let mut receiver = Channel::new(b);
497
498        sender.send(&Event::Pong).unwrap();
499        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
500        assert!(matches!(received, Event::Pong));
501    }
502
503    #[test]
504    fn roundtrip_event_delivery_failed() {
505        let (a, b) = socketpair().unwrap();
506        let mut sender = Channel::new(a);
507        let mut receiver = Channel::new(b);
508
509        let evt = Event::DeliveryFailed {
510            id: "msg-1".into(),
511            reason: "stdin write failed".into(),
512        };
513        sender.send(&evt).unwrap();
514        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
515
516        match received {
517            Event::DeliveryFailed { id, reason } => {
518                assert_eq!(id, "msg-1");
519                assert_eq!(reason, "stdin write failed");
520            }
521            _ => panic!("wrong variant"),
522        }
523    }
524
525    #[test]
526    fn roundtrip_event_died() {
527        let (a, b) = socketpair().unwrap();
528        let mut sender = Channel::new(a);
529        let mut receiver = Channel::new(b);
530
531        let evt = Event::Died {
532            exit_code: Some(1),
533            last_lines: "error occurred".into(),
534        };
535        sender.send(&evt).unwrap();
536        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
537        match received {
538            Event::Died {
539                exit_code,
540                last_lines,
541            } => {
542                assert_eq!(exit_code, Some(1));
543                assert_eq!(last_lines, "error occurred");
544            }
545            _ => panic!("wrong variant"),
546        }
547    }
548
549    #[test]
550    fn roundtrip_event_context_exhausted() {
551        let (a, b) = socketpair().unwrap();
552        let mut sender = Channel::new(a);
553        let mut receiver = Channel::new(b);
554
555        let evt = Event::ContextExhausted {
556            message: "context full".into(),
557            last_lines: "last output".into(),
558        };
559        sender.send(&evt).unwrap();
560        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
561        match received {
562            Event::ContextExhausted {
563                message,
564                last_lines,
565            } => {
566                assert_eq!(message, "context full");
567                assert_eq!(last_lines, "last output");
568            }
569            _ => panic!("wrong variant"),
570        }
571    }
572
573    #[test]
574    fn roundtrip_event_screen_capture() {
575        let (a, b) = socketpair().unwrap();
576        let mut sender = Channel::new(a);
577        let mut receiver = Channel::new(b);
578
579        let evt = Event::ScreenCapture {
580            content: "screen data".into(),
581            cursor_row: 5,
582            cursor_col: 10,
583        };
584        sender.send(&evt).unwrap();
585        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
586        match received {
587            Event::ScreenCapture {
588                content,
589                cursor_row,
590                cursor_col,
591            } => {
592                assert_eq!(content, "screen data");
593                assert_eq!(cursor_row, 5);
594                assert_eq!(cursor_col, 10);
595            }
596            _ => panic!("wrong variant"),
597        }
598    }
599
600    #[test]
601    fn roundtrip_event_context_warning() {
602        let (a, b) = socketpair().unwrap();
603        let mut sender = Channel::new(a);
604        let mut receiver = Channel::new(b);
605
606        let evt = Event::ContextWarning {
607            model: Some("claude-sonnet-4-5".into()),
608            output_bytes: 12_345,
609            uptime_secs: 61,
610            input_tokens: 80_000,
611            cached_input_tokens: 5_000,
612            cache_creation_input_tokens: 4_000,
613            cache_read_input_tokens: 3_000,
614            output_tokens: 6_000,
615            reasoning_output_tokens: 2_000,
616            used_tokens: 100_000,
617            context_limit_tokens: 200_000,
618            usage_pct: 50,
619        };
620        sender.send(&evt).unwrap();
621        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
622        match received {
623            Event::ContextWarning {
624                model,
625                output_bytes,
626                uptime_secs,
627                input_tokens,
628                cached_input_tokens,
629                cache_creation_input_tokens,
630                cache_read_input_tokens,
631                output_tokens,
632                reasoning_output_tokens,
633                used_tokens,
634                context_limit_tokens,
635                usage_pct,
636            } => {
637                assert_eq!(model.as_deref(), Some("claude-sonnet-4-5"));
638                assert_eq!(output_bytes, 12_345);
639                assert_eq!(uptime_secs, 61);
640                assert_eq!(input_tokens, 80_000);
641                assert_eq!(cached_input_tokens, 5_000);
642                assert_eq!(cache_creation_input_tokens, 4_000);
643                assert_eq!(cache_read_input_tokens, 3_000);
644                assert_eq!(output_tokens, 6_000);
645                assert_eq!(reasoning_output_tokens, 2_000);
646                assert_eq!(used_tokens, 100_000);
647                assert_eq!(context_limit_tokens, 200_000);
648                assert_eq!(usage_pct, 50);
649            }
650            _ => panic!("wrong variant"),
651        }
652    }
653
654    #[test]
655    fn roundtrip_event_session_stats() {
656        let (a, b) = socketpair().unwrap();
657        let mut sender = Channel::new(a);
658        let mut receiver = Channel::new(b);
659
660        let evt = Event::SessionStats {
661            output_bytes: 123_456,
662            uptime_secs: 61,
663            input_tokens: 5000,
664            output_tokens: 1200,
665            context_usage_pct: Some(84),
666        };
667        sender.send(&evt).unwrap();
668        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
669        match received {
670            Event::SessionStats {
671                output_bytes,
672                uptime_secs,
673                input_tokens,
674                output_tokens,
675                context_usage_pct,
676            } => {
677                assert_eq!(output_bytes, 123_456);
678                assert_eq!(uptime_secs, 61);
679                assert_eq!(input_tokens, 5000);
680                assert_eq!(output_tokens, 1200);
681                assert_eq!(context_usage_pct, Some(84));
682            }
683            _ => panic!("wrong variant"),
684        }
685    }
686
687    #[test]
688    fn roundtrip_event_context_approaching() {
689        let (a, b) = socketpair().unwrap();
690        let mut sender = Channel::new(a);
691        let mut receiver = Channel::new(b);
692
693        let evt = Event::ContextApproaching {
694            message: "context pressure detected".into(),
695            input_tokens: 80000,
696            output_tokens: 20000,
697        };
698        sender.send(&evt).unwrap();
699        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
700        match received {
701            Event::ContextApproaching {
702                message,
703                input_tokens,
704                output_tokens,
705            } => {
706                assert_eq!(message, "context pressure detected");
707                assert_eq!(input_tokens, 80000);
708                assert_eq!(output_tokens, 20000);
709            }
710            _ => panic!("wrong variant"),
711        }
712    }
713
714    #[test]
715    fn roundtrip_event_quota_blocked() {
716        let (a, b) = socketpair().unwrap();
717        let mut sender = Channel::new(a);
718        let mut receiver = Channel::new(b);
719
720        let evt = Event::QuotaBlocked {
721            message: "usage limit reached".into(),
722            retry_at_epoch_secs: Some(1_776_214_440),
723            retry_at_label: Some("2026-04-16 12:54".into()),
724        };
725        sender.send(&evt).unwrap();
726        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
727        match received {
728            Event::QuotaBlocked {
729                message,
730                retry_at_epoch_secs,
731                retry_at_label,
732            } => {
733                assert_eq!(message, "usage limit reached");
734                assert_eq!(retry_at_epoch_secs, Some(1_776_214_440));
735                assert_eq!(retry_at_label.as_deref(), Some("2026-04-16 12:54"));
736            }
737            _ => panic!("wrong variant"),
738        }
739    }
740
741    #[test]
742    fn roundtrip_event_error() {
743        let (a, b) = socketpair().unwrap();
744        let mut sender = Channel::new(a);
745        let mut receiver = Channel::new(b);
746
747        let evt = Event::Error {
748            command: "SendMessage".into(),
749            reason: "agent busy".into(),
750        };
751        sender.send(&evt).unwrap();
752        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
753        match received {
754            Event::Error { command, reason } => {
755                assert_eq!(command, "SendMessage");
756                assert_eq!(reason, "agent busy");
757            }
758            _ => panic!("wrong variant"),
759        }
760    }
761
762    #[test]
763    fn roundtrip_event_warning() {
764        let (a, b) = socketpair().unwrap();
765        let mut sender = Channel::new(a);
766        let mut receiver = Channel::new(b);
767
768        let evt = Event::Warning {
769            message: "no screen change".into(),
770            idle_secs: Some(300),
771        };
772        sender.send(&evt).unwrap();
773        let received: Event = receiver.recv::<Event>().unwrap().unwrap();
774        match received {
775            Event::Warning { message, idle_secs } => {
776                assert_eq!(message, "no screen change");
777                assert_eq!(idle_secs, Some(300));
778            }
779            _ => panic!("wrong variant"),
780        }
781    }
782
783    #[test]
784    fn eof_returns_none() {
785        let (a, b) = socketpair().unwrap();
786        drop(a); // close sender
787        let mut receiver = Channel::new(b);
788        let result: Option<Command> = receiver.recv().unwrap();
789        assert!(result.is_none());
790    }
791
792    #[test]
793    fn all_states_serialize() {
794        for state in [
795            ShimState::Starting,
796            ShimState::Idle,
797            ShimState::Working,
798            ShimState::Dead,
799            ShimState::ContextExhausted,
800        ] {
801            let json = serde_json::to_string(&state).unwrap();
802            let back: ShimState = serde_json::from_str(&json).unwrap();
803            assert_eq!(state, back);
804        }
805    }
806
807    #[test]
808    fn shim_state_display() {
809        assert_eq!(ShimState::Starting.to_string(), "starting");
810        assert_eq!(ShimState::Idle.to_string(), "idle");
811        assert_eq!(ShimState::Working.to_string(), "working");
812        assert_eq!(ShimState::Dead.to_string(), "dead");
813        assert_eq!(ShimState::ContextExhausted.to_string(), "context_exhausted");
814    }
815
816    #[test]
817    fn socketpair_creates_connected_pair() {
818        let (a, b) = socketpair().unwrap();
819        // Basic connectivity: write on a, read on b
820        let mut ch_a = Channel::new(a);
821        let mut ch_b = Channel::new(b);
822        ch_a.send(&Command::Ping).unwrap();
823        let msg: Command = ch_b.recv().unwrap().unwrap();
824        assert!(matches!(msg, Command::Ping));
825    }
826
827    #[test]
828    fn send_times_out_when_peer_stops_reading() {
829        // Regression test for the documented "daemon freezes after 10-15 min
830        // productive window" pattern. Without a write timeout, a wedged shim
831        // that stops draining its socket buffer will cause the daemon's next
832        // `send()` to block inside `write_all` indefinitely, wedging the
833        // entire event loop on a single stuck handle.
834        //
835        // We simulate the wedge by filling the peer's receive buffer beyond
836        // capacity and never calling `recv()` on the other side. With a
837        // short write timeout set, `send()` must return an error within a
838        // bounded window instead of hanging forever.
839        let (a, _b) = socketpair().unwrap();
840        let mut sender = Channel::new(a);
841        sender
842            .set_write_timeout(Some(std::time::Duration::from_millis(50)))
843            .unwrap();
844
845        // Build a large but legal payload and blast it across the pipe
846        // until the kernel refuses more bytes. On Linux the default per-pipe
847        // buffer is a few hundred KB; on macOS it's ~8 KB. Either way, a
848        // bounded retry loop must eventually hit the write timeout instead
849        // of blocking forever.
850        let big_body = "x".repeat(256 * 1024);
851        let cmd = Command::SendMessage {
852            from: "daemon".into(),
853            body: big_body,
854            message_id: None,
855        };
856
857        let start = std::time::Instant::now();
858        let mut attempts = 0;
859        let mut last_err = None;
860        while start.elapsed() < std::time::Duration::from_secs(5) {
861            attempts += 1;
862            match sender.send(&cmd) {
863                Ok(()) => continue,
864                Err(error) => {
865                    last_err = Some(error);
866                    break;
867                }
868            }
869        }
870        let error = last_err.expect("send should have timed out within 5s");
871        let io_error = error
872            .downcast_ref::<std::io::Error>()
873            .expect("write timeout should surface as an io::Error");
874        assert!(
875            matches!(
876                io_error.kind(),
877                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
878            ),
879            "expected WouldBlock/TimedOut error, got {:?}",
880            io_error.kind()
881        );
882        assert!(
883            attempts >= 1,
884            "sanity check: send loop should have attempted at least once"
885        );
886    }
887}