Skip to main content

batty_cli/shim/
runtime.rs

//! The shim process: owns a PTY, runs an agent CLI, classifies its state,
//! and communicates with the orchestrator via a Channel on fd 3.
//!
//! Raw PTY output is also streamed to a log file so tmux display panes can
//! `tail -F` it and render agent output in real time.
6
7use std::collections::VecDeque;
8use std::io::{Read, Write as IoWrite};
9use std::path::PathBuf;
10use std::sync::{Arc, Mutex};
11use std::time::Instant;
12
13use anyhow::{Context, Result};
14use portable_pty::{CommandBuilder, PtySize};
15
16use super::classifier::{self, AgentType, ScreenVerdict};
17use super::protocol::{Channel, Command, Event, ShimState};
18use super::pty_log::PtyLogWriter;
19
20// ---------------------------------------------------------------------------
21// Configuration
22// ---------------------------------------------------------------------------
23
/// PTY height (rows) used by `run` when `ShimArgs::rows` is 0.
const DEFAULT_ROWS: u16 = 50;
/// PTY width (columns) used by `run` when `ShimArgs::cols` is 0.
const DEFAULT_COLS: u16 = 220;
/// Scrollback length handed to `vt100::Parser::new`.
const SCROLLBACK_LINES: usize = 5000;

/// How often to check for state changes when no PTY output arrives (ms).
/// In the code visible here it is the sleep between polls of the shared
/// state while `run` waits for the agent to leave `Starting`.
const POLL_INTERVAL_MS: u64 = 250;

/// Max time to wait for agent to show its first prompt (secs).
const READY_TIMEOUT_SECS: u64 = 120;

/// Maximum number of messages that can be queued while the agent is working.
/// When the queue is already this long, the oldest entry is dropped to make
/// room for the new one.
const MAX_QUEUE_DEPTH: usize = 16;
36
37// ---------------------------------------------------------------------------
38// Args (parsed from CLI in main.rs, passed here)
39// ---------------------------------------------------------------------------
40
/// Arguments for [`run`], describing which agent to launch and how.
#[derive(Debug, Clone)]
pub struct ShimArgs {
    /// Identifier of this shim; used as the `[shim <id>]` prefix in stderr
    /// diagnostics.
    pub id: String,
    /// Kind of agent being run; passed to the screen classifier.
    pub agent_type: AgentType,
    /// Command line of the agent, executed as `bash -c <cmd>`.
    pub cmd: String,
    /// Working directory for the spawned command.
    pub cwd: PathBuf,
    /// PTY height in rows; 0 selects `DEFAULT_ROWS`.
    pub rows: u16,
    /// PTY width in columns; 0 selects `DEFAULT_COLS`.
    pub cols: u16,
    /// Optional path for the PTY log file. When set, raw PTY output is
    /// streamed to this file so tmux display panes can `tail -F` it.
    pub pty_log_path: Option<PathBuf>,
}
53
54// ---------------------------------------------------------------------------
55// Queued message (buffered while agent is Working)
56// ---------------------------------------------------------------------------
57
/// A `SendMessage` command that arrived while the agent was `Working` and is
/// waiting in `ShimInner::message_queue` to be written to the PTY.
#[derive(Debug, Clone)]
struct QueuedMessage {
    // Stored from the command but not read by the non-test code in this file,
    // hence the lint exemption.
    #[allow(dead_code)]
    from: String,
    /// Text written to the PTY (followed by a newline) on delivery.
    body: String,
    /// Optional id of the message; becomes the pending message id when the
    /// message is delivered and is quoted in the error if it is dropped.
    message_id: Option<String>,
}
65
66// ---------------------------------------------------------------------------
67// Shared state between PTY reader thread and command handler thread
68// ---------------------------------------------------------------------------
69
/// State shared (behind a mutex) between the PTY reader thread and the
/// command loop in `run`.
struct ShimInner {
    /// Terminal emulator; the reader thread feeds it every chunk read from
    /// the PTY.
    parser: vt100::Parser,
    /// Current state of the agent as tracked by the shim.
    state: ShimState,
    /// When `state` was last changed; `GetState` reports the seconds elapsed
    /// since this instant.
    state_changed_at: Instant,
    /// `content_hash` of the screen contents at the last classification;
    /// lets the reader thread skip output that did not change the screen.
    last_screen_hash: u64,
    /// Screen contents captured when a message was injected; compared with
    /// the screen at completion time to extract the agent's response.
    pre_injection_content: String,
    /// Id of the injected message awaiting completion; taken on the next
    /// state transition and attached to the transition's events.
    pending_message_id: Option<String>,
    /// Kind of agent, passed to the classifier on every classification.
    agent_type: AgentType,
    /// Messages queued while the agent is in Working state.
    /// Drained FIFO on Working→Idle transitions.
    message_queue: VecDeque<QueuedMessage>,
}
82
83impl ShimInner {
84    fn screen_contents(&self) -> String {
85        self.parser.screen().contents()
86    }
87
88    fn last_n_lines(&self, n: usize) -> String {
89        let content = self.parser.screen().contents();
90        let lines: Vec<&str> = content.lines().collect();
91        let start = lines.len().saturating_sub(n);
92        lines[start..].join("\n")
93    }
94
95    fn cursor_position(&self) -> (u16, u16) {
96        self.parser.screen().cursor_position()
97    }
98}
99
100// ---------------------------------------------------------------------------
101// FNV-1a hash for change detection
102// ---------------------------------------------------------------------------
103
/// 64-bit FNV-1a hash of the bytes of `s`.
///
/// Used for cheap change detection of the screen contents; it is not a
/// cryptographic hash.
fn content_hash(s: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    s.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
112
113// ---------------------------------------------------------------------------
114// Main shim entry point
115// ---------------------------------------------------------------------------
116
117/// Run the shim. This function does not return until the shim exits.
118/// `channel` is the pre-connected socket to the orchestrator (fd 3 or
119/// from a socketpair).
120pub fn run(args: ShimArgs, channel: Channel) -> Result<()> {
121    let rows = if args.rows > 0 {
122        args.rows
123    } else {
124        DEFAULT_ROWS
125    };
126    let cols = if args.cols > 0 {
127        args.cols
128    } else {
129        DEFAULT_COLS
130    };
131
132    // -- Create PTY --
133    let pty_system = portable_pty::native_pty_system();
134    let pty_pair = pty_system
135        .openpty(PtySize {
136            rows,
137            cols,
138            pixel_width: 0,
139            pixel_height: 0,
140        })
141        .context("failed to create PTY")?;
142
143    // -- Spawn agent CLI on slave side --
144    let mut cmd = CommandBuilder::new("bash");
145    cmd.args(["-c", &args.cmd]);
146    cmd.cwd(&args.cwd);
147    cmd.env_remove("CLAUDECODE"); // prevent nested detection
148
149    let mut child = pty_pair
150        .slave
151        .spawn_command(cmd)
152        .context("failed to spawn agent CLI")?;
153
154    // Close slave in parent (agent has its own copy)
155    drop(pty_pair.slave);
156
157    let mut pty_reader = pty_pair
158        .master
159        .try_clone_reader()
160        .context("failed to clone PTY reader")?;
161
162    let pty_writer = pty_pair
163        .master
164        .take_writer()
165        .context("failed to take PTY writer")?;
166
167    // -- Shared state --
168    let inner = Arc::new(Mutex::new(ShimInner {
169        parser: vt100::Parser::new(rows, cols, SCROLLBACK_LINES),
170        state: ShimState::Starting,
171        state_changed_at: Instant::now(),
172        last_screen_hash: 0,
173        pre_injection_content: String::new(),
174        pending_message_id: None,
175        agent_type: args.agent_type,
176        message_queue: VecDeque::new(),
177    }));
178
179    // -- PTY log writer (optional) --
180    let pty_log: Option<Mutex<PtyLogWriter>> = args
181        .pty_log_path
182        .as_deref()
183        .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
184        .transpose()?
185        .map(Mutex::new);
186    let pty_log = pty_log.map(Arc::new);
187
188    // Wrap PTY writer in Arc<Mutex> so both threads can write
189    let pty_writer = Arc::new(Mutex::new(pty_writer));
190
191    // Channel for sending events (cloned for PTY reader thread)
192    let mut cmd_channel = channel;
193    let mut evt_channel = cmd_channel.try_clone().context("failed to clone channel")?;
194
195    // -- PTY reader thread: reads agent output, feeds vt100, detects state --
196    let inner_pty = Arc::clone(&inner);
197    let log_handle = pty_log.clone();
198    let pty_writer_pty = Arc::clone(&pty_writer);
199    let pty_handle = std::thread::spawn(move || {
200        let mut buf = [0u8; 4096];
201        loop {
202            match pty_reader.read(&mut buf) {
203                Ok(0) => break, // EOF — agent closed PTY
204                Ok(n) => {
205                    // Stream raw bytes to PTY log for tmux display panes
206                    if let Some(ref log) = log_handle {
207                        let _ = log.lock().unwrap().write(&buf[..n]);
208                    }
209
210                    let mut inner = inner_pty.lock().unwrap();
211                    inner.parser.process(&buf[..n]);
212
213                    // Classify when the screen content actually changes.
214                    // The content hash avoids redundant classifications —
215                    // no time-based debounce because it causes the PTY
216                    // reader to block on the next read and miss state
217                    // transitions when the prompt arrives shortly after
218                    // preceding output.
219                    let content = inner.parser.screen().contents();
220                    let hash = content_hash(&content);
221                    if hash == inner.last_screen_hash {
222                        continue; // no visual change
223                    }
224                    inner.last_screen_hash = hash;
225
226                    let verdict = classifier::classify(inner.agent_type, inner.parser.screen());
227                    let old_state = inner.state;
228
229                    let new_state = match (old_state, verdict) {
230                        (ShimState::Starting, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
231                        (ShimState::Idle, ScreenVerdict::AgentIdle) => None,
232                        (ShimState::Working, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
233                        (ShimState::Working, ScreenVerdict::AgentWorking) => None,
234                        (_, ScreenVerdict::ContextExhausted) => Some(ShimState::ContextExhausted),
235                        (_, ScreenVerdict::Unknown) => None,
236                        (ShimState::Idle, ScreenVerdict::AgentWorking) => Some(ShimState::Working),
237                        (ShimState::Starting, ScreenVerdict::AgentWorking) => {
238                            Some(ShimState::Working)
239                        }
240                        _ => None,
241                    };
242
243                    if let Some(new) = new_state {
244                        let summary = inner.last_n_lines(5);
245                        inner.state = new;
246                        inner.state_changed_at = Instant::now();
247
248                        let pre_content = inner.pre_injection_content.clone();
249                        let current_content = inner.screen_contents();
250                        let msg_id = inner.pending_message_id.take();
251
252                        // On terminal states, drain the queue
253                        let drain_errors =
254                            if new == ShimState::Dead || new == ShimState::ContextExhausted {
255                                drain_queue_errors(&mut inner.message_queue, new)
256                            } else {
257                                Vec::new()
258                            };
259
260                        // On Working→Idle, check for queued messages to inject
261                        let queued_msg = if old_state == ShimState::Working
262                            && new == ShimState::Idle
263                            && !inner.message_queue.is_empty()
264                        {
265                            inner.message_queue.pop_front()
266                        } else {
267                            None
268                        };
269
270                        // If we're injecting a queued message, stay in Working
271                        if let Some(ref msg) = queued_msg {
272                            inner.pre_injection_content = inner.screen_contents();
273                            inner.pending_message_id = msg.message_id.clone();
274                            inner.state = ShimState::Working;
275                            inner.state_changed_at = Instant::now();
276                        }
277
278                        let queue_depth = inner.message_queue.len();
279
280                        drop(inner); // release lock before I/O
281
282                        let events = build_transition_events(
283                            old_state,
284                            new,
285                            &summary,
286                            &pre_content,
287                            &current_content,
288                            msg_id,
289                        );
290
291                        for event in events {
292                            if evt_channel.send(&event).is_err() {
293                                return; // orchestrator disconnected
294                            }
295                        }
296
297                        // Send drain errors for terminal states
298                        for event in drain_errors {
299                            if evt_channel.send(&event).is_err() {
300                                return;
301                            }
302                        }
303
304                        // Inject queued message into PTY
305                        if let Some(msg) = queued_msg {
306                            let formatted = format!("{}\n", msg.body);
307                            let mut writer = pty_writer_pty.lock().unwrap();
308                            if let Err(e) = writer.write_all(formatted.as_bytes()) {
309                                let _ = evt_channel.send(&Event::Error {
310                                    command: "SendMessage".into(),
311                                    reason: format!("PTY write failed for queued message: {e}"),
312                                });
313                            } else {
314                                writer.flush().ok();
315                            }
316
317                            // Emit StateChanged Idle→Working for the queued message
318                            let _ = evt_channel.send(&Event::StateChanged {
319                                from: ShimState::Idle,
320                                to: ShimState::Working,
321                                summary: format!(
322                                    "delivering queued message ({} remaining)",
323                                    queue_depth
324                                ),
325                            });
326                        }
327                    }
328                }
329                Err(_) => break, // PTY error — agent likely exited
330            }
331        }
332
333        // Agent PTY closed — mark as dead
334        let mut inner = inner_pty.lock().unwrap();
335        let last_lines = inner.last_n_lines(10);
336        let old = inner.state;
337        inner.state = ShimState::Dead;
338
339        // Drain any remaining queued messages
340        let drain_errors = drain_queue_errors(&mut inner.message_queue, ShimState::Dead);
341        drop(inner);
342
343        let _ = evt_channel.send(&Event::StateChanged {
344            from: old,
345            to: ShimState::Dead,
346            summary: last_lines.clone(),
347        });
348
349        let _ = evt_channel.send(&Event::Died {
350            exit_code: None,
351            last_lines,
352        });
353
354        for event in drain_errors {
355            let _ = evt_channel.send(&event);
356        }
357    });
358
359    // -- Main thread: handle commands from orchestrator --
360    let inner_cmd = Arc::clone(&inner);
361
362    // Wait for Ready (Starting → Idle transition) with timeout
363    let start = Instant::now();
364    loop {
365        let state = inner_cmd.lock().unwrap().state;
366        match state {
367            ShimState::Starting => {
368                if start.elapsed().as_secs() > READY_TIMEOUT_SECS {
369                    let last = inner_cmd.lock().unwrap().last_n_lines(10);
370                    cmd_channel.send(&Event::Error {
371                        command: "startup".into(),
372                        reason: format!(
373                            "agent did not show prompt within {}s. Last lines:\n{}",
374                            READY_TIMEOUT_SECS, last,
375                        ),
376                    })?;
377                    child.kill().ok();
378                    return Ok(());
379                }
380                std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS));
381            }
382            ShimState::Dead => {
383                return Ok(());
384            }
385            _ => {
386                cmd_channel.send(&Event::Ready)?;
387                break;
388            }
389        }
390    }
391
392    // -- Command loop --
393    loop {
394        let cmd = match cmd_channel.recv::<Command>() {
395            Ok(Some(c)) => c,
396            Ok(None) => {
397                eprintln!(
398                    "[shim {}] orchestrator disconnected, shutting down",
399                    args.id
400                );
401                child.kill().ok();
402                break;
403            }
404            Err(e) => {
405                eprintln!("[shim {}] channel error: {e}", args.id);
406                child.kill().ok();
407                break;
408            }
409        };
410
411        match cmd {
412            Command::SendMessage {
413                from,
414                body,
415                message_id,
416            } => {
417                let mut inner = inner_cmd.lock().unwrap();
418                match inner.state {
419                    ShimState::Idle => {
420                        inner.pre_injection_content = inner.screen_contents();
421                        inner.pending_message_id = message_id;
422
423                        let formatted = format!("{}\n", body);
424                        let mut writer = pty_writer.lock().unwrap();
425                        if let Err(e) = writer.write_all(formatted.as_bytes()) {
426                            drop(inner);
427                            cmd_channel.send(&Event::Error {
428                                command: "SendMessage".into(),
429                                reason: format!("PTY write failed: {e}"),
430                            })?;
431                            continue;
432                        }
433                        writer.flush().ok();
434
435                        let old = inner.state;
436                        inner.state = ShimState::Working;
437                        inner.state_changed_at = Instant::now();
438                        let summary = inner.last_n_lines(3);
439                        drop(inner);
440
441                        cmd_channel.send(&Event::StateChanged {
442                            from: old,
443                            to: ShimState::Working,
444                            summary,
445                        })?;
446                    }
447                    ShimState::Working => {
448                        // Queue the message for delivery when agent returns to Idle
449                        if inner.message_queue.len() >= MAX_QUEUE_DEPTH {
450                            let dropped = inner.message_queue.pop_front();
451                            let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
452                            inner.message_queue.push_back(QueuedMessage {
453                                from,
454                                body,
455                                message_id,
456                            });
457                            let depth = inner.message_queue.len();
458                            drop(inner);
459
460                            cmd_channel.send(&Event::Error {
461                                command: "SendMessage".into(),
462                                reason: format!(
463                                    "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
464                                    dropped_id
465                                        .map(|id| format!(" (id: {id})"))
466                                        .unwrap_or_default(),
467                                ),
468                            })?;
469                            cmd_channel.send(&Event::Warning {
470                                message: format!(
471                                    "message queued while agent working (depth: {depth})"
472                                ),
473                                idle_secs: None,
474                            })?;
475                        } else {
476                            inner.message_queue.push_back(QueuedMessage {
477                                from,
478                                body,
479                                message_id,
480                            });
481                            let depth = inner.message_queue.len();
482                            drop(inner);
483
484                            cmd_channel.send(&Event::Warning {
485                                message: format!(
486                                    "message queued while agent working (depth: {depth})"
487                                ),
488                                idle_secs: None,
489                            })?;
490                        }
491                    }
492                    other => {
493                        cmd_channel.send(&Event::Error {
494                            command: "SendMessage".into(),
495                            reason: format!("agent in {other} state, cannot accept message"),
496                        })?;
497                    }
498                }
499            }
500
501            Command::CaptureScreen { last_n_lines } => {
502                let inner = inner_cmd.lock().unwrap();
503                let content = match last_n_lines {
504                    Some(n) => inner.last_n_lines(n),
505                    None => inner.screen_contents(),
506                };
507                let (row, col) = inner.cursor_position();
508                drop(inner);
509                cmd_channel.send(&Event::ScreenCapture {
510                    content,
511                    cursor_row: row,
512                    cursor_col: col,
513                })?;
514            }
515
516            Command::GetState => {
517                let inner = inner_cmd.lock().unwrap();
518                let since = inner.state_changed_at.elapsed().as_secs();
519                let state = inner.state;
520                drop(inner);
521                cmd_channel.send(&Event::State {
522                    state,
523                    since_secs: since,
524                })?;
525            }
526
527            Command::Resize { rows, cols } => {
528                pty_pair
529                    .master
530                    .resize(PtySize {
531                        rows,
532                        cols,
533                        pixel_width: 0,
534                        pixel_height: 0,
535                    })
536                    .ok();
537                let mut inner = inner_cmd.lock().unwrap();
538                inner.parser.set_size(rows, cols);
539            }
540
541            Command::Ping => {
542                cmd_channel.send(&Event::Pong)?;
543            }
544
545            Command::Shutdown { timeout_secs } => {
546                eprintln!(
547                    "[shim {}] shutdown requested (timeout: {}s)",
548                    args.id, timeout_secs
549                );
550                {
551                    let mut writer = pty_writer.lock().unwrap();
552                    writer.write_all(b"\x03").ok(); // Ctrl-C
553                    writer.flush().ok();
554                }
555                let deadline = Instant::now() + std::time::Duration::from_secs(timeout_secs as u64);
556                loop {
557                    if Instant::now() > deadline {
558                        child.kill().ok();
559                        break;
560                    }
561                    if let Ok(Some(_)) = child.try_wait() {
562                        break;
563                    }
564                    std::thread::sleep(std::time::Duration::from_millis(100));
565                }
566                break;
567            }
568
569            Command::Kill => {
570                child.kill().ok();
571                break;
572            }
573        }
574    }
575
576    pty_handle.join().ok();
577    Ok(())
578}
579
580// ---------------------------------------------------------------------------
581// Drain the message queue, emitting Error events for each dropped message
582// ---------------------------------------------------------------------------
583
584fn drain_queue_errors(
585    queue: &mut VecDeque<QueuedMessage>,
586    terminal_state: ShimState,
587) -> Vec<Event> {
588    let mut events = Vec::new();
589    while let Some(msg) = queue.pop_front() {
590        events.push(Event::Error {
591            command: "SendMessage".into(),
592            reason: format!(
593                "agent entered {} state, queued message dropped{}",
594                terminal_state,
595                msg.message_id
596                    .map(|id| format!(" (id: {id})"))
597                    .unwrap_or_default(),
598            ),
599        });
600    }
601    events
602}
603
604// ---------------------------------------------------------------------------
605// Build events for a state transition
606// ---------------------------------------------------------------------------
607
608fn build_transition_events(
609    from: ShimState,
610    to: ShimState,
611    summary: &str,
612    pre_injection_content: &str,
613    current_content: &str,
614    message_id: Option<String>,
615) -> Vec<Event> {
616    let mut events = vec![Event::StateChanged {
617        from,
618        to,
619        summary: summary.to_string(),
620    }];
621
622    // Working → Idle = completion
623    if from == ShimState::Working && to == ShimState::Idle {
624        let response = extract_response(pre_injection_content, current_content);
625        events.push(Event::Completion {
626            message_id,
627            response,
628            last_lines: summary.to_string(),
629        });
630    }
631
632    // Any → ContextExhausted
633    if to == ShimState::ContextExhausted {
634        events.push(Event::ContextExhausted {
635            message: "Agent reported context exhaustion".to_string(),
636            last_lines: summary.to_string(),
637        });
638    }
639
640    events
641}
642
643/// Extract the agent's response by diffing pre-injection and post-completion
644/// screen content.
645fn extract_response(pre: &str, current: &str) -> String {
646    let pre_lines: Vec<&str> = pre.lines().collect();
647    let cur_lines: Vec<&str> = current.lines().collect();
648
649    let overlap = pre_lines.len().min(cur_lines.len());
650    let mut diverge_at = 0;
651    for i in 0..overlap {
652        if pre_lines[i] != cur_lines[i] {
653            break;
654        }
655        diverge_at = i + 1;
656    }
657
658    let response_lines = &cur_lines[diverge_at..];
659    if response_lines.is_empty() {
660        return String::new();
661    }
662
663    // Strip trailing empty lines and prompt lines
664    let mut end = response_lines.len();
665    while end > 0 && response_lines[end - 1].trim().is_empty() {
666        end -= 1;
667    }
668    while end > 0 && is_prompt_line(response_lines[end - 1].trim()) {
669        end -= 1;
670    }
671    while end > 0 && response_lines[end - 1].trim().is_empty() {
672        end -= 1;
673    }
674
675    response_lines[..end].join("\n")
676}
677
/// Heuristically decide whether `line` looks like an input prompt.
///
/// A line counts as a prompt when it is one of the agent prompt glyphs
/// (U+276F or U+203A) alone or followed by a space, ends with `$` or `%`
/// (optionally followed by one space), is exactly `>`, or starts with
/// `Kiro>`.
fn is_prompt_line(line: &str) -> bool {
    const AGENT_GLYPHS: [char; 2] = ['\u{276F}', '\u{203A}'];
    const SHELL_SUFFIXES: [&str; 4] = ["$ ", "$", "% ", "%"];

    // Glyph on its own, or glyph followed by a space and anything else.
    let agent_glyph_prompt = AGENT_GLYPHS.iter().any(|&glyph| {
        line.strip_prefix(glyph)
            .map_or(false, |rest| rest.is_empty() || rest.starts_with(' '))
    });
    let shell_prompt = SHELL_SUFFIXES.iter().any(|&suffix| line.ends_with(suffix));
    let angle_prompt = line == ">" || line.starts_with("Kiro>");

    agent_glyph_prompt || shell_prompt || angle_prompt
}
690
691// ---------------------------------------------------------------------------
692// Tests
693// ---------------------------------------------------------------------------
694
#[cfg(test)]
mod tests {
    use super::*;

    /// Return the `reason` of an `Event::Error`, failing the test for any
    /// other variant so assertions on the reason can never be skipped.
    fn error_reason(event: &Event) -> &str {
        match event {
            Event::Error { reason, .. } => reason,
            _ => panic!("expected Event::Error"),
        }
    }

    #[test]
    fn extract_response_basic() {
        let pre = "line1\nline2\n$ ";
        let cur = "line1\nline2\nhello world\n$ ";
        assert_eq!(extract_response(pre, cur), "hello world");
    }

    #[test]
    fn extract_response_multiline() {
        let pre = "$ ";
        let cur = "$ echo hi\nhi\n$ ";
        let resp = extract_response(pre, cur);
        assert!(resp.contains("echo hi"));
        assert!(resp.contains("hi"));
    }

    #[test]
    fn extract_response_empty() {
        let pre = "$ ";
        let cur = "$ ";
        assert_eq!(extract_response(pre, cur), "");
    }

    #[test]
    fn content_hash_deterministic() {
        assert_eq!(content_hash("hello"), content_hash("hello"));
        assert_ne!(content_hash("hello"), content_hash("world"));
    }

    #[test]
    fn is_prompt_line_shell_dollar() {
        assert!(is_prompt_line("user@host:~$ "));
        assert!(is_prompt_line("$"));
    }

    #[test]
    fn is_prompt_line_claude() {
        assert!(is_prompt_line("\u{276F}"));
        assert!(is_prompt_line("\u{276F} "));
    }

    #[test]
    fn is_prompt_line_codex() {
        assert!(is_prompt_line("\u{203A}"));
        assert!(is_prompt_line("\u{203A} "));
    }

    #[test]
    fn is_prompt_line_kiro() {
        assert!(is_prompt_line("Kiro>"));
        assert!(is_prompt_line(">"));
    }

    #[test]
    fn is_prompt_line_not_prompt() {
        assert!(!is_prompt_line("hello world"));
        assert!(!is_prompt_line("some output here"));
    }

    #[test]
    fn build_transition_events_working_to_idle() {
        let events = build_transition_events(
            ShimState::Working,
            ShimState::Idle,
            "summary",
            "pre\n$ ",
            "pre\nhello\n$ ",
            Some("msg-1".into()),
        );
        assert_eq!(events.len(), 2);
        assert!(matches!(&events[0], Event::StateChanged { .. }));

        // The completion must carry the pending id and the extracted response.
        match &events[1] {
            Event::Completion {
                message_id,
                response,
                ..
            } => {
                assert_eq!(message_id.as_deref(), Some("msg-1"));
                assert_eq!(response, "hello");
            }
            _ => panic!("expected Event::Completion"),
        }
    }

    #[test]
    fn build_transition_events_to_context_exhausted() {
        let events = build_transition_events(
            ShimState::Working,
            ShimState::ContextExhausted,
            "summary",
            "",
            "",
            None,
        );
        // StateChanged + ContextExhausted (no Completion since it's not Idle)
        assert_eq!(events.len(), 2);
        assert!(matches!(&events[0], Event::StateChanged { .. }));
        assert!(matches!(&events[1], Event::ContextExhausted { .. }));
    }

    #[test]
    fn build_transition_events_starting_to_idle() {
        let events = build_transition_events(
            ShimState::Starting,
            ShimState::Idle,
            "summary",
            "",
            "",
            None,
        );
        assert_eq!(events.len(), 1);
        assert!(matches!(&events[0], Event::StateChanged { .. }));
    }

    // -----------------------------------------------------------------------
    // Message queue tests
    // -----------------------------------------------------------------------

    fn make_queued_msg(id: &str, body: &str) -> QueuedMessage {
        QueuedMessage {
            from: "user".into(),
            body: body.into(),
            message_id: Some(id.into()),
        }
    }

    #[test]
    fn queue_enqueue_basic() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));
        queue.push_back(make_queued_msg("m2", "world"));
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn queue_fifo_order() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "first"));
        queue.push_back(make_queued_msg("m2", "second"));
        queue.push_back(make_queued_msg("m3", "third"));

        let msg = queue.pop_front().unwrap();
        assert_eq!(msg.message_id.as_deref(), Some("m1"));
        assert_eq!(msg.body, "first");

        let msg = queue.pop_front().unwrap();
        assert_eq!(msg.message_id.as_deref(), Some("m2"));
        assert_eq!(msg.body, "second");

        let msg = queue.pop_front().unwrap();
        assert_eq!(msg.message_id.as_deref(), Some("m3"));
        assert_eq!(msg.body, "third");

        assert!(queue.is_empty());
    }

    #[test]
    fn queue_overflow_drops_oldest() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();

        // Fill to MAX_QUEUE_DEPTH
        for i in 0..MAX_QUEUE_DEPTH {
            queue.push_back(make_queued_msg(&format!("m{i}"), &format!("msg {i}")));
        }
        assert_eq!(queue.len(), MAX_QUEUE_DEPTH);

        // Overflow: drop oldest, add new
        assert!(queue.len() >= MAX_QUEUE_DEPTH);
        let dropped = queue.pop_front().unwrap();
        assert_eq!(dropped.message_id.as_deref(), Some("m0")); // oldest dropped
        queue.push_back(make_queued_msg("m_new", "new message"));
        assert_eq!(queue.len(), MAX_QUEUE_DEPTH);

        // First item should now be m1 (m0 was dropped)
        let first = queue.pop_front().unwrap();
        assert_eq!(first.message_id.as_deref(), Some("m1"));
    }

    #[test]
    fn drain_queue_errors_empty() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        let events = drain_queue_errors(&mut queue, ShimState::Dead);
        assert!(events.is_empty());
    }

    #[test]
    fn drain_queue_errors_with_messages() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));
        queue.push_back(make_queued_msg("m2", "world"));
        queue.push_back(QueuedMessage {
            from: "user".into(),
            body: "no id".into(),
            message_id: None,
        });

        let events = drain_queue_errors(&mut queue, ShimState::Dead);
        assert_eq!(events.len(), 3);
        assert!(queue.is_empty());

        // All should be Error events
        for event in &events {
            assert!(matches!(event, Event::Error { .. }));
        }

        // First error should mention the state and the message_id
        let first = error_reason(&events[0]);
        assert!(first.contains("dead"));
        assert!(first.contains("m1"));

        // Third error (no message_id) should not contain "(id:"
        assert!(!error_reason(&events[2]).contains("(id:"));
    }

    #[test]
    fn drain_queue_errors_context_exhausted() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));

        let events = drain_queue_errors(&mut queue, ShimState::ContextExhausted);
        assert_eq!(events.len(), 1);
        assert!(error_reason(&events[0]).contains("context_exhausted"));
    }

    #[test]
    fn queued_message_preserves_fields() {
        let msg = QueuedMessage {
            from: "manager".into(),
            body: "do this task".into(),
            message_id: Some("msg-42".into()),
        };
        assert_eq!(msg.from, "manager");
        assert_eq!(msg.body, "do this task");
        assert_eq!(msg.message_id.as_deref(), Some("msg-42"));
    }

    #[test]
    fn queued_message_none_id() {
        let msg = QueuedMessage {
            from: "user".into(),
            body: "anonymous".into(),
            message_id: None,
        };
        assert!(msg.message_id.is_none());
    }

    #[test]
    fn max_queue_depth_is_16() {
        assert_eq!(MAX_QUEUE_DEPTH, 16);
    }
}
943}