use std::collections::VecDeque;
use std::io::{Read, Write as IoWrite};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::{Context, Result};
use portable_pty::{CommandBuilder, PtySize};

use super::classifier::{self, AgentType, ScreenVerdict};
use super::protocol::{Channel, Command, Event, ShimState};
use super::pty_log::PtyLogWriter;
19
/// PTY height, in rows, used when the caller passes `rows == 0`.
const DEFAULT_ROWS: u16 = 50;
/// PTY width, in columns, used when the caller passes `cols == 0`.
const DEFAULT_COLS: u16 = 220;
/// Scrollback length handed to the `vt100` parser.
const SCROLLBACK_LINES: usize = 5000;

/// Sleep, in milliseconds, between state checks while the agent is still starting.
const POLL_INTERVAL_MS: u64 = 250;

/// How long, in seconds, the agent may stay in the starting state before the shim gives up.
const READY_TIMEOUT_SECS: u64 = 120;

/// Maximum number of messages held while the agent is working; beyond this the oldest is dropped.
const MAX_QUEUE_DEPTH: usize = 16;
36
37#[derive(Debug, Clone)]
42pub struct ShimArgs {
43 pub id: String,
44 pub agent_type: AgentType,
45 pub cmd: String,
46 pub cwd: PathBuf,
47 pub rows: u16,
48 pub cols: u16,
49 pub pty_log_path: Option<PathBuf>,
52}
53
/// A message accepted while the agent was busy and held for later delivery.
#[derive(Debug, Clone)]
struct QueuedMessage {
    /// Sender label taken from the `SendMessage` command.
    // Delivery only reads `body` and `message_id`; the sender is retained so the
    // queued entry still carries the whole command, hence the lint exemption.
    #[allow(dead_code)]
    from: String,
    /// Text written to the PTY (followed by a newline) when the message is delivered.
    body: String,
    /// Optional correlation id echoed back in completion and error events.
    message_id: Option<String>,
}
65
66struct ShimInner {
71 parser: vt100::Parser,
72 state: ShimState,
73 state_changed_at: Instant,
74 last_screen_hash: u64,
75 pre_injection_content: String,
76 pending_message_id: Option<String>,
77 agent_type: AgentType,
78 message_queue: VecDeque<QueuedMessage>,
81}
82
83impl ShimInner {
84 fn screen_contents(&self) -> String {
85 self.parser.screen().contents()
86 }
87
88 fn last_n_lines(&self, n: usize) -> String {
89 let content = self.parser.screen().contents();
90 let lines: Vec<&str> = content.lines().collect();
91 let start = lines.len().saturating_sub(n);
92 lines[start..].join("\n")
93 }
94
95 fn cursor_position(&self) -> (u16, u16) {
96 self.parser.screen().cursor_position()
97 }
98}
99
/// Computes the 64-bit FNV-1a hash of `s`.
///
/// Used as a cheap fingerprint of the screen text so unchanged screens can be
/// skipped; it is not suitable for anything security-sensitive.
fn content_hash(s: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply by the prime (wrapping).
    s.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
112
113pub fn run(args: ShimArgs, channel: Channel) -> Result<()> {
121 let rows = if args.rows > 0 {
122 args.rows
123 } else {
124 DEFAULT_ROWS
125 };
126 let cols = if args.cols > 0 {
127 args.cols
128 } else {
129 DEFAULT_COLS
130 };
131
132 let pty_system = portable_pty::native_pty_system();
134 let pty_pair = pty_system
135 .openpty(PtySize {
136 rows,
137 cols,
138 pixel_width: 0,
139 pixel_height: 0,
140 })
141 .context("failed to create PTY")?;
142
143 let mut cmd = CommandBuilder::new("bash");
145 cmd.args(["-c", &args.cmd]);
146 cmd.cwd(&args.cwd);
147 cmd.env_remove("CLAUDECODE"); let mut child = pty_pair
150 .slave
151 .spawn_command(cmd)
152 .context("failed to spawn agent CLI")?;
153
154 drop(pty_pair.slave);
156
157 let mut pty_reader = pty_pair
158 .master
159 .try_clone_reader()
160 .context("failed to clone PTY reader")?;
161
162 let pty_writer = pty_pair
163 .master
164 .take_writer()
165 .context("failed to take PTY writer")?;
166
167 let inner = Arc::new(Mutex::new(ShimInner {
169 parser: vt100::Parser::new(rows, cols, SCROLLBACK_LINES),
170 state: ShimState::Starting,
171 state_changed_at: Instant::now(),
172 last_screen_hash: 0,
173 pre_injection_content: String::new(),
174 pending_message_id: None,
175 agent_type: args.agent_type,
176 message_queue: VecDeque::new(),
177 }));
178
179 let pty_log: Option<Mutex<PtyLogWriter>> = args
181 .pty_log_path
182 .as_deref()
183 .map(|p| PtyLogWriter::new(p).context("failed to create PTY log"))
184 .transpose()?
185 .map(Mutex::new);
186 let pty_log = pty_log.map(Arc::new);
187
188 let pty_writer = Arc::new(Mutex::new(pty_writer));
190
191 let mut cmd_channel = channel;
193 let mut evt_channel = cmd_channel.try_clone().context("failed to clone channel")?;
194
195 let inner_pty = Arc::clone(&inner);
197 let log_handle = pty_log.clone();
198 let pty_writer_pty = Arc::clone(&pty_writer);
199 let pty_handle = std::thread::spawn(move || {
200 let mut buf = [0u8; 4096];
201 loop {
202 match pty_reader.read(&mut buf) {
203 Ok(0) => break, Ok(n) => {
205 if let Some(ref log) = log_handle {
207 let _ = log.lock().unwrap().write(&buf[..n]);
208 }
209
210 let mut inner = inner_pty.lock().unwrap();
211 inner.parser.process(&buf[..n]);
212
213 let content = inner.parser.screen().contents();
220 let hash = content_hash(&content);
221 if hash == inner.last_screen_hash {
222 continue; }
224 inner.last_screen_hash = hash;
225
226 let verdict = classifier::classify(inner.agent_type, inner.parser.screen());
227 let old_state = inner.state;
228
229 let new_state = match (old_state, verdict) {
230 (ShimState::Starting, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
231 (ShimState::Idle, ScreenVerdict::AgentIdle) => None,
232 (ShimState::Working, ScreenVerdict::AgentIdle) => Some(ShimState::Idle),
233 (ShimState::Working, ScreenVerdict::AgentWorking) => None,
234 (_, ScreenVerdict::ContextExhausted) => Some(ShimState::ContextExhausted),
235 (_, ScreenVerdict::Unknown) => None,
236 (ShimState::Idle, ScreenVerdict::AgentWorking) => Some(ShimState::Working),
237 (ShimState::Starting, ScreenVerdict::AgentWorking) => {
238 Some(ShimState::Working)
239 }
240 _ => None,
241 };
242
243 if let Some(new) = new_state {
244 let summary = inner.last_n_lines(5);
245 inner.state = new;
246 inner.state_changed_at = Instant::now();
247
248 let pre_content = inner.pre_injection_content.clone();
249 let current_content = inner.screen_contents();
250 let msg_id = inner.pending_message_id.take();
251
252 let drain_errors =
254 if new == ShimState::Dead || new == ShimState::ContextExhausted {
255 drain_queue_errors(&mut inner.message_queue, new)
256 } else {
257 Vec::new()
258 };
259
260 let queued_msg = if old_state == ShimState::Working
262 && new == ShimState::Idle
263 && !inner.message_queue.is_empty()
264 {
265 inner.message_queue.pop_front()
266 } else {
267 None
268 };
269
270 if let Some(ref msg) = queued_msg {
272 inner.pre_injection_content = inner.screen_contents();
273 inner.pending_message_id = msg.message_id.clone();
274 inner.state = ShimState::Working;
275 inner.state_changed_at = Instant::now();
276 }
277
278 let queue_depth = inner.message_queue.len();
279
280 drop(inner); let events = build_transition_events(
283 old_state,
284 new,
285 &summary,
286 &pre_content,
287 ¤t_content,
288 msg_id,
289 );
290
291 for event in events {
292 if evt_channel.send(&event).is_err() {
293 return; }
295 }
296
297 for event in drain_errors {
299 if evt_channel.send(&event).is_err() {
300 return;
301 }
302 }
303
304 if let Some(msg) = queued_msg {
306 let formatted = format!("{}\n", msg.body);
307 let mut writer = pty_writer_pty.lock().unwrap();
308 if let Err(e) = writer.write_all(formatted.as_bytes()) {
309 let _ = evt_channel.send(&Event::Error {
310 command: "SendMessage".into(),
311 reason: format!("PTY write failed for queued message: {e}"),
312 });
313 } else {
314 writer.flush().ok();
315 }
316
317 let _ = evt_channel.send(&Event::StateChanged {
319 from: ShimState::Idle,
320 to: ShimState::Working,
321 summary: format!(
322 "delivering queued message ({} remaining)",
323 queue_depth
324 ),
325 });
326 }
327 }
328 }
329 Err(_) => break, }
331 }
332
333 let mut inner = inner_pty.lock().unwrap();
335 let last_lines = inner.last_n_lines(10);
336 let old = inner.state;
337 inner.state = ShimState::Dead;
338
339 let drain_errors = drain_queue_errors(&mut inner.message_queue, ShimState::Dead);
341 drop(inner);
342
343 let _ = evt_channel.send(&Event::StateChanged {
344 from: old,
345 to: ShimState::Dead,
346 summary: last_lines.clone(),
347 });
348
349 let _ = evt_channel.send(&Event::Died {
350 exit_code: None,
351 last_lines,
352 });
353
354 for event in drain_errors {
355 let _ = evt_channel.send(&event);
356 }
357 });
358
359 let inner_cmd = Arc::clone(&inner);
361
362 let start = Instant::now();
364 loop {
365 let state = inner_cmd.lock().unwrap().state;
366 match state {
367 ShimState::Starting => {
368 if start.elapsed().as_secs() > READY_TIMEOUT_SECS {
369 let last = inner_cmd.lock().unwrap().last_n_lines(10);
370 cmd_channel.send(&Event::Error {
371 command: "startup".into(),
372 reason: format!(
373 "agent did not show prompt within {}s. Last lines:\n{}",
374 READY_TIMEOUT_SECS, last,
375 ),
376 })?;
377 child.kill().ok();
378 return Ok(());
379 }
380 std::thread::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS));
381 }
382 ShimState::Dead => {
383 return Ok(());
384 }
385 _ => {
386 cmd_channel.send(&Event::Ready)?;
387 break;
388 }
389 }
390 }
391
392 loop {
394 let cmd = match cmd_channel.recv::<Command>() {
395 Ok(Some(c)) => c,
396 Ok(None) => {
397 eprintln!(
398 "[shim {}] orchestrator disconnected, shutting down",
399 args.id
400 );
401 child.kill().ok();
402 break;
403 }
404 Err(e) => {
405 eprintln!("[shim {}] channel error: {e}", args.id);
406 child.kill().ok();
407 break;
408 }
409 };
410
411 match cmd {
412 Command::SendMessage {
413 from,
414 body,
415 message_id,
416 } => {
417 let mut inner = inner_cmd.lock().unwrap();
418 match inner.state {
419 ShimState::Idle => {
420 inner.pre_injection_content = inner.screen_contents();
421 inner.pending_message_id = message_id;
422
423 let formatted = format!("{}\n", body);
424 let mut writer = pty_writer.lock().unwrap();
425 if let Err(e) = writer.write_all(formatted.as_bytes()) {
426 drop(inner);
427 cmd_channel.send(&Event::Error {
428 command: "SendMessage".into(),
429 reason: format!("PTY write failed: {e}"),
430 })?;
431 continue;
432 }
433 writer.flush().ok();
434
435 let old = inner.state;
436 inner.state = ShimState::Working;
437 inner.state_changed_at = Instant::now();
438 let summary = inner.last_n_lines(3);
439 drop(inner);
440
441 cmd_channel.send(&Event::StateChanged {
442 from: old,
443 to: ShimState::Working,
444 summary,
445 })?;
446 }
447 ShimState::Working => {
448 if inner.message_queue.len() >= MAX_QUEUE_DEPTH {
450 let dropped = inner.message_queue.pop_front();
451 let dropped_id = dropped.as_ref().and_then(|m| m.message_id.clone());
452 inner.message_queue.push_back(QueuedMessage {
453 from,
454 body,
455 message_id,
456 });
457 let depth = inner.message_queue.len();
458 drop(inner);
459
460 cmd_channel.send(&Event::Error {
461 command: "SendMessage".into(),
462 reason: format!(
463 "message queue full ({MAX_QUEUE_DEPTH}), dropped oldest message{}",
464 dropped_id
465 .map(|id| format!(" (id: {id})"))
466 .unwrap_or_default(),
467 ),
468 })?;
469 cmd_channel.send(&Event::Warning {
470 message: format!(
471 "message queued while agent working (depth: {depth})"
472 ),
473 idle_secs: None,
474 })?;
475 } else {
476 inner.message_queue.push_back(QueuedMessage {
477 from,
478 body,
479 message_id,
480 });
481 let depth = inner.message_queue.len();
482 drop(inner);
483
484 cmd_channel.send(&Event::Warning {
485 message: format!(
486 "message queued while agent working (depth: {depth})"
487 ),
488 idle_secs: None,
489 })?;
490 }
491 }
492 other => {
493 cmd_channel.send(&Event::Error {
494 command: "SendMessage".into(),
495 reason: format!("agent in {other} state, cannot accept message"),
496 })?;
497 }
498 }
499 }
500
501 Command::CaptureScreen { last_n_lines } => {
502 let inner = inner_cmd.lock().unwrap();
503 let content = match last_n_lines {
504 Some(n) => inner.last_n_lines(n),
505 None => inner.screen_contents(),
506 };
507 let (row, col) = inner.cursor_position();
508 drop(inner);
509 cmd_channel.send(&Event::ScreenCapture {
510 content,
511 cursor_row: row,
512 cursor_col: col,
513 })?;
514 }
515
516 Command::GetState => {
517 let inner = inner_cmd.lock().unwrap();
518 let since = inner.state_changed_at.elapsed().as_secs();
519 let state = inner.state;
520 drop(inner);
521 cmd_channel.send(&Event::State {
522 state,
523 since_secs: since,
524 })?;
525 }
526
527 Command::Resize { rows, cols } => {
528 pty_pair
529 .master
530 .resize(PtySize {
531 rows,
532 cols,
533 pixel_width: 0,
534 pixel_height: 0,
535 })
536 .ok();
537 let mut inner = inner_cmd.lock().unwrap();
538 inner.parser.set_size(rows, cols);
539 }
540
541 Command::Ping => {
542 cmd_channel.send(&Event::Pong)?;
543 }
544
545 Command::Shutdown { timeout_secs } => {
546 eprintln!(
547 "[shim {}] shutdown requested (timeout: {}s)",
548 args.id, timeout_secs
549 );
550 {
551 let mut writer = pty_writer.lock().unwrap();
552 writer.write_all(b"\x03").ok(); writer.flush().ok();
554 }
555 let deadline = Instant::now() + std::time::Duration::from_secs(timeout_secs as u64);
556 loop {
557 if Instant::now() > deadline {
558 child.kill().ok();
559 break;
560 }
561 if let Ok(Some(_)) = child.try_wait() {
562 break;
563 }
564 std::thread::sleep(std::time::Duration::from_millis(100));
565 }
566 break;
567 }
568
569 Command::Kill => {
570 child.kill().ok();
571 break;
572 }
573 }
574 }
575
576 pty_handle.join().ok();
577 Ok(())
578}
579
580fn drain_queue_errors(
585 queue: &mut VecDeque<QueuedMessage>,
586 terminal_state: ShimState,
587) -> Vec<Event> {
588 let mut events = Vec::new();
589 while let Some(msg) = queue.pop_front() {
590 events.push(Event::Error {
591 command: "SendMessage".into(),
592 reason: format!(
593 "agent entered {} state, queued message dropped{}",
594 terminal_state,
595 msg.message_id
596 .map(|id| format!(" (id: {id})"))
597 .unwrap_or_default(),
598 ),
599 });
600 }
601 events
602}
603
604fn build_transition_events(
609 from: ShimState,
610 to: ShimState,
611 summary: &str,
612 pre_injection_content: &str,
613 current_content: &str,
614 message_id: Option<String>,
615) -> Vec<Event> {
616 let mut events = vec![Event::StateChanged {
617 from,
618 to,
619 summary: summary.to_string(),
620 }];
621
622 if from == ShimState::Working && to == ShimState::Idle {
624 let response = extract_response(pre_injection_content, current_content);
625 events.push(Event::Completion {
626 message_id,
627 response,
628 last_lines: summary.to_string(),
629 });
630 }
631
632 if to == ShimState::ContextExhausted {
634 events.push(Event::ContextExhausted {
635 message: "Agent reported context exhaustion".to_string(),
636 last_lines: summary.to_string(),
637 });
638 }
639
640 events
641}
642
643fn extract_response(pre: &str, current: &str) -> String {
646 let pre_lines: Vec<&str> = pre.lines().collect();
647 let cur_lines: Vec<&str> = current.lines().collect();
648
649 let overlap = pre_lines.len().min(cur_lines.len());
650 let mut diverge_at = 0;
651 for i in 0..overlap {
652 if pre_lines[i] != cur_lines[i] {
653 break;
654 }
655 diverge_at = i + 1;
656 }
657
658 let response_lines = &cur_lines[diverge_at..];
659 if response_lines.is_empty() {
660 return String::new();
661 }
662
663 let mut end = response_lines.len();
665 while end > 0 && response_lines[end - 1].trim().is_empty() {
666 end -= 1;
667 }
668 while end > 0 && is_prompt_line(response_lines[end - 1].trim()) {
669 end -= 1;
670 }
671 while end > 0 && response_lines[end - 1].trim().is_empty() {
672 end -= 1;
673 }
674
675 response_lines[..end].join("\n")
676}
677
/// Heuristically decides whether `line` looks like an interactive prompt.
///
/// A line counts as a prompt when it
/// - is exactly U+276F or U+203A, or starts with one of them followed by a space;
/// - ends with `$`, `$ `, `%` or `% `;
/// - is exactly `>`; or
/// - starts with `Kiro>`.
fn is_prompt_line(line: &str) -> bool {
    // Glyph prompts: the glyph alone, or the glyph followed by a space and input.
    const PROMPT_GLYPHS: [&str; 2] = ["\u{276F}", "\u{203A}"];
    // Shell-style prompts are recognised by how the line ends.
    const SHELL_SUFFIXES: [&str; 4] = ["$ ", "$", "% ", "%"];

    let glyph_prompt = PROMPT_GLYPHS.iter().any(|glyph| {
        matches!(
            line.strip_prefix(*glyph),
            Some(rest) if rest.is_empty() || rest.starts_with(' ')
        )
    });
    let shell_prompt = SHELL_SUFFIXES.iter().any(|suffix| line.ends_with(*suffix));

    glyph_prompt || shell_prompt || line == ">" || line.starts_with("Kiro>")
}
690
/// Unit tests for the pure helpers of the shim: response extraction, prompt
/// detection, hashing, transition events, and message-queue handling.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queued message from sender "user" with the given id and body.
    fn make_queued_msg(id: &str, body: &str) -> QueuedMessage {
        QueuedMessage {
            from: "user".into(),
            body: body.into(),
            message_id: Some(id.into()),
        }
    }

    /// Returns the `reason` of an `Event::Error`, failing the test for any other
    /// variant so a wrong event type cannot pass silently.
    fn error_reason(event: &Event) -> &str {
        match event {
            Event::Error { reason, .. } => reason,
            _ => panic!("expected Event::Error"),
        }
    }

    #[test]
    fn extract_response_basic() {
        let pre = "line1\nline2\n$ ";
        let cur = "line1\nline2\nhello world\n$ ";
        assert_eq!(extract_response(pre, cur), "hello world");
    }

    #[test]
    fn extract_response_multiline() {
        let pre = "$ ";
        let cur = "$ echo hi\nhi\n$ ";
        let resp = extract_response(pre, cur);
        assert!(resp.contains("echo hi"));
        assert!(resp.contains("hi"));
    }

    #[test]
    fn extract_response_empty() {
        let pre = "$ ";
        let cur = "$ ";
        assert_eq!(extract_response(pre, cur), "");
    }

    #[test]
    fn content_hash_deterministic() {
        assert_eq!(content_hash("hello"), content_hash("hello"));
        assert_ne!(content_hash("hello"), content_hash("world"));
    }

    #[test]
    fn is_prompt_line_shell_dollar() {
        assert!(is_prompt_line("user@host:~$ "));
        assert!(is_prompt_line("$"));
    }

    #[test]
    fn is_prompt_line_claude() {
        assert!(is_prompt_line("\u{276F}"));
        assert!(is_prompt_line("\u{276F} "));
    }

    #[test]
    fn is_prompt_line_codex() {
        assert!(is_prompt_line("\u{203A}"));
        assert!(is_prompt_line("\u{203A} "));
    }

    #[test]
    fn is_prompt_line_kiro() {
        assert!(is_prompt_line("Kiro>"));
        assert!(is_prompt_line(">"));
    }

    #[test]
    fn is_prompt_line_not_prompt() {
        assert!(!is_prompt_line("hello world"));
        assert!(!is_prompt_line("some output here"));
    }

    #[test]
    fn build_transition_events_working_to_idle() {
        let events = build_transition_events(
            ShimState::Working,
            ShimState::Idle,
            "summary",
            "pre\n$ ",
            "pre\nhello\n$ ",
            Some("msg-1".into()),
        );
        assert_eq!(events.len(), 2);
        assert!(matches!(&events[0], Event::StateChanged { .. }));
        assert!(matches!(&events[1], Event::Completion { .. }));
    }

    #[test]
    fn build_transition_events_to_context_exhausted() {
        let events = build_transition_events(
            ShimState::Working,
            ShimState::ContextExhausted,
            "summary",
            "",
            "",
            None,
        );
        assert_eq!(events.len(), 2);
        assert!(matches!(&events[0], Event::StateChanged { .. }));
        assert!(matches!(&events[1], Event::ContextExhausted { .. }));
    }

    #[test]
    fn build_transition_events_starting_to_idle() {
        let events = build_transition_events(
            ShimState::Starting,
            ShimState::Idle,
            "summary",
            "",
            "",
            None,
        );
        assert_eq!(events.len(), 1);
        assert!(matches!(&events[0], Event::StateChanged { .. }));
    }

    #[test]
    fn queue_enqueue_basic() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));
        queue.push_back(make_queued_msg("m2", "world"));
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn queue_fifo_order() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "first"));
        queue.push_back(make_queued_msg("m2", "second"));
        queue.push_back(make_queued_msg("m3", "third"));

        for (expected_id, expected_body) in [("m1", "first"), ("m2", "second"), ("m3", "third")] {
            let msg = queue.pop_front().unwrap();
            assert_eq!(msg.message_id.as_deref(), Some(expected_id));
            assert_eq!(msg.body, expected_body);
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn queue_overflow_drops_oldest() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();

        // Fill the queue to capacity.
        for i in 0..MAX_QUEUE_DEPTH {
            queue.push_back(make_queued_msg(&format!("m{i}"), &format!("msg {i}")));
        }
        assert_eq!(queue.len(), MAX_QUEUE_DEPTH);

        // Mirror the overflow policy of the command loop: drop the oldest, then enqueue.
        assert!(queue.len() >= MAX_QUEUE_DEPTH);
        let dropped = queue.pop_front().unwrap();
        assert_eq!(dropped.message_id.as_deref(), Some("m0"));
        queue.push_back(make_queued_msg("m_new", "new message"));
        assert_eq!(queue.len(), MAX_QUEUE_DEPTH);

        // The second-oldest message is now at the front.
        let first = queue.pop_front().unwrap();
        assert_eq!(first.message_id.as_deref(), Some("m1"));
    }

    #[test]
    fn drain_queue_errors_empty() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        let events = drain_queue_errors(&mut queue, ShimState::Dead);
        assert!(events.is_empty());
    }

    #[test]
    fn drain_queue_errors_with_messages() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));
        queue.push_back(make_queued_msg("m2", "world"));
        queue.push_back(QueuedMessage {
            from: "user".into(),
            body: "no id".into(),
            message_id: None,
        });

        let events = drain_queue_errors(&mut queue, ShimState::Dead);
        assert_eq!(events.len(), 3);
        assert!(queue.is_empty());

        // Every drained message becomes an error event.
        for event in &events {
            assert!(matches!(event, Event::Error { .. }));
        }

        // The first error names the terminal state and the message id.
        let first_reason = error_reason(&events[0]);
        assert!(first_reason.contains("dead"));
        assert!(first_reason.contains("m1"));

        // A message without an id produces no id suffix.
        assert!(!error_reason(&events[2]).contains("(id:"));
    }

    #[test]
    fn drain_queue_errors_context_exhausted() {
        let mut queue: VecDeque<QueuedMessage> = VecDeque::new();
        queue.push_back(make_queued_msg("m1", "hello"));

        let events = drain_queue_errors(&mut queue, ShimState::ContextExhausted);
        assert_eq!(events.len(), 1);
        assert!(error_reason(&events[0]).contains("context_exhausted"));
    }

    #[test]
    fn queued_message_preserves_fields() {
        let msg = QueuedMessage {
            from: "manager".into(),
            body: "do this task".into(),
            message_id: Some("msg-42".into()),
        };
        assert_eq!(msg.from, "manager");
        assert_eq!(msg.body, "do this task");
        assert_eq!(msg.message_id.as_deref(), Some("msg-42"));
    }

    #[test]
    fn queued_message_none_id() {
        let msg = QueuedMessage {
            from: "user".into(),
            body: "anonymous".into(),
            message_id: None,
        };
        assert!(msg.message_id.is_none());
    }

    #[test]
    fn max_queue_depth_is_16() {
        assert_eq!(MAX_QUEUE_DEPTH, 16);
    }
}