1use std::collections::HashMap;
13use std::collections::VecDeque;
14use std::sync::OnceLock;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use async_trait::async_trait;
19use caliban_agent_core::{Tool, ToolContext, ToolError};
20use caliban_provider::{ContentBlock, TextBlock};
21use caliban_sandbox::SandboxedShim;
22use serde::Deserialize;
23use serde_json::{Value, json};
24
/// Default per-stream ring-buffer capacity, in bytes (5 MiB).
///
/// Every background job keeps one ring for stdout and one for stderr, so the
/// worst-case retained output per job is twice this value. The figure is kept
/// well below `u32::MAX` so the constant also evaluates on 32-bit targets
/// (a multi-GiB literal product would overflow `usize` there).
pub const DEFAULT_RING_CAP_BYTES: usize = 5 * 1024 * 1024;
27
/// Default grace period used by [`KillShellTool::new`]: how long the tool
/// polls for the job to leave the running state after the first (non-forced)
/// kill request before it escalates to a forced kill.
pub const KILL_GRACE: Duration = Duration::from_secs(5);
31
/// Lifecycle state of a background shell job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BashStatus {
    /// The job has been spawned and no final state has been recorded yet.
    Running,
    /// The process finished and reported this exit code.
    Exited(i32),
    /// The job ended without an exit code: it was cancelled, the wait
    /// failed, or the process terminated without reporting a code.
    Killed,
}

impl BashStatus {
    /// Returns the lower-case label for this status as used in tool output.
    ///
    /// The exit code carried by [`BashStatus::Exited`] is not part of the
    /// label.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            BashStatus::Killed => "killed",
            BashStatus::Exited(_code) => "exited",
            BashStatus::Running => "running",
        }
    }
}
54
/// Bounded byte buffer that retains only the most recent `cap` bytes pushed
/// into it, while tracking absolute stream offsets.
///
/// Invariant maintained by [`RingBuffer::push`]:
/// `dropped + buf.len() == written`.
#[derive(Debug)]
pub struct RingBuffer {
    /// Maximum number of bytes retained at any time.
    cap: usize,
    /// Total number of bytes ever pushed; the absolute offset one past the
    /// newest retained byte.
    written: u64,
    /// Total number of bytes evicted; equivalently the absolute offset of the
    /// oldest retained byte.
    dropped: u64,
    /// Retained bytes, oldest first.
    buf: VecDeque<u8>,
}

impl RingBuffer {
    /// Creates an empty buffer that retains at most `cap` bytes.
    ///
    /// Only up to 64 KiB is allocated eagerly; the buffer grows on demand up
    /// to `cap`.
    #[must_use]
    pub fn with_cap(cap: usize) -> Self {
        Self {
            cap,
            written: 0,
            dropped: 0,
            buf: VecDeque::with_capacity(cap.min(64 * 1024)),
        }
    }

    /// Appends `bytes`, evicting the oldest retained bytes as needed so that
    /// at most `cap` bytes remain.
    ///
    /// Returns the total number of bytes written so far (the new absolute end
    /// offset).
    pub fn push(&mut self, bytes: &[u8]) -> u64 {
        let tail = if bytes.len() > self.cap {
            // The chunk alone overflows the ring: everything currently held
            // plus the head of the chunk is lost, only its last `cap` bytes
            // survive.
            let start = bytes.len() - self.cap;
            self.buf.clear();
            self.dropped = self.written + start as u64;
            &bytes[start..]
        } else {
            // `bytes.len() <= cap`, so `overflow <= buf.len()` and the drain
            // range is always in bounds.
            let overflow = (self.buf.len() + bytes.len()).saturating_sub(self.cap);
            if overflow > 0 {
                self.buf.drain(..overflow);
                self.dropped += overflow as u64;
            }
            bytes
        };
        self.buf.extend(tail);
        self.written += bytes.len() as u64;
        self.written
    }

    /// Total number of bytes ever pushed.
    #[must_use]
    pub fn written(&self) -> u64 {
        self.written
    }

    /// Total number of bytes evicted so far.
    #[must_use]
    pub fn dropped(&self) -> u64 {
        self.dropped
    }

    /// Number of bytes currently retained.
    #[must_use]
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// Returns `true` when no bytes are currently retained.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Returns all retained bytes, decoded lossily as UTF-8, together with
    /// the absolute end offset (`written`).
    #[must_use]
    pub fn snapshot(&self) -> (String, u64) {
        let bytes: Vec<u8> = self.buf.iter().copied().collect();
        let text = String::from_utf8_lossy(&bytes).into_owned();
        (text, self.written)
    }

    /// Returns the retained bytes at or after absolute offset `since`.
    ///
    /// The result is `(text, start, end)` where `text` is the lossy UTF-8
    /// decoding of the bytes in `start..end`, and `end` is `written`.
    /// `start` is `since` adjusted into the available range: it is raised to
    /// `dropped` when the requested bytes have already been evicted, and
    /// capped at `written` when `since` lies past the end, so `start <= end`
    /// always holds and callers never see an inverted range.
    #[must_use]
    pub fn read_since(&self, since: u64) -> (String, u64, u64) {
        let start_offset = since.max(self.dropped).min(self.written);
        // `start_offset - dropped` is at most `buf.len()`, so the conversion
        // cannot fail in practice; saturate rather than panic regardless.
        let skip = usize::try_from(start_offset - self.dropped).unwrap_or(usize::MAX);
        let bytes: Vec<u8> = self.buf.iter().skip(skip).copied().collect();
        let text = String::from_utf8_lossy(&bytes).into_owned();
        (text, start_offset, self.written)
    }
}
154
155pub struct BashJob {
157 pub id: String,
159 pub command: String,
161 pub started_at: Instant,
163 pub status: Mutex<BashStatus>,
165 pub stdout: Mutex<RingBuffer>,
167 pub stderr: Mutex<RingBuffer>,
169 pub pid: Mutex<Option<u32>>,
172 pub cancel: tokio_util::sync::CancellationToken,
174}
175
176impl std::fmt::Debug for BashJob {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("BashJob")
179 .field("id", &self.id)
180 .field("command", &self.command)
181 .field("started_at", &self.started_at)
182 .field("status", &*self.status.lock().unwrap())
183 .finish_non_exhaustive()
184 }
185}
186
187impl BashJob {
188 #[must_use]
193 pub fn snapshot_status(&self) -> BashStatus {
194 *self.status.lock().unwrap()
195 }
196}
197
/// Registry of background shell jobs, keyed by shell id.
pub struct BashBgRegistry {
    /// Known jobs by id. Jobs stay in the map after they finish until they
    /// are explicitly removed.
    jobs: Mutex<HashMap<String, Arc<BashJob>>>,
    /// Ring-buffer capacity, in bytes per output stream, exposed through
    /// `cap_bytes()` for jobs spawned against this registry.
    cap_bytes: usize,
}
204
205impl std::fmt::Debug for BashBgRegistry {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 f.debug_struct("BashBgRegistry")
208 .field("jobs", &self.jobs.lock().unwrap().len())
209 .field("cap_bytes", &self.cap_bytes)
210 .finish()
211 }
212}
213
214impl BashBgRegistry {
215 #[must_use]
217 pub fn new() -> Self {
218 Self::with_cap(DEFAULT_RING_CAP_BYTES)
219 }
220
221 #[must_use]
223 pub fn with_cap(cap_bytes: usize) -> Self {
224 Self {
225 jobs: Mutex::new(HashMap::new()),
226 cap_bytes,
227 }
228 }
229
230 #[must_use]
232 pub fn new_for_test(cap_bytes: usize) -> Arc<Self> {
233 Arc::new(Self::with_cap(cap_bytes))
234 }
235
236 #[must_use]
238 pub fn cap_bytes(&self) -> usize {
239 self.cap_bytes
240 }
241
242 #[must_use]
247 pub fn job_count(&self) -> usize {
248 self.jobs.lock().unwrap().len()
249 }
250
251 #[must_use]
256 pub fn running_count(&self) -> usize {
257 self.jobs
258 .lock()
259 .unwrap()
260 .values()
261 .filter(|j| j.snapshot_status() == BashStatus::Running)
262 .count()
263 }
264
265 pub fn insert(&self, job: Arc<BashJob>) {
270 self.jobs.lock().unwrap().insert(job.id.clone(), job);
271 }
272
273 #[must_use]
278 pub fn get(&self, id: &str) -> Option<Arc<BashJob>> {
279 self.jobs.lock().unwrap().get(id).cloned()
280 }
281
282 pub fn remove(&self, id: &str) -> Option<Arc<BashJob>> {
287 self.jobs.lock().unwrap().remove(id)
288 }
289
290 #[must_use]
295 pub fn list(&self) -> Vec<(String, BashStatus, String)> {
296 self.jobs
297 .lock()
298 .unwrap()
299 .values()
300 .map(|j| (j.id.clone(), j.snapshot_status(), j.command.clone()))
301 .collect()
302 }
303
304 pub fn kill_all(&self) {
309 let ids: Vec<String> = self.jobs.lock().unwrap().keys().cloned().collect();
310 for id in ids {
311 if let Some(job) = self.get(&id)
312 && job.snapshot_status() == BashStatus::Running
313 {
314 kill_job_now(&job, true);
315 }
316 }
317 }
318}
319
320impl Default for BashBgRegistry {
321 fn default() -> Self {
322 Self::new()
323 }
324}
325
326static GLOBAL_REGISTRY: OnceLock<Arc<BashBgRegistry>> = OnceLock::new();
329
330#[must_use]
332pub fn global_registry() -> Arc<BashBgRegistry> {
333 GLOBAL_REGISTRY
334 .get_or_init(|| Arc::new(BashBgRegistry::new()))
335 .clone()
336}
337
338#[allow(unsafe_code)] fn kill_job_now(job: &BashJob, force_kill: bool) {
341 let pid = *job.pid.lock().unwrap();
342 #[cfg(unix)]
343 if let Some(p) = pid {
344 super::signal_process_group(
346 p,
347 if force_kill {
348 libc::SIGKILL
349 } else {
350 libc::SIGTERM
351 },
352 );
353 }
354 #[cfg(not(unix))]
355 let _ = pid;
356 job.cancel.cancel();
357}
358
359#[must_use]
361pub fn new_shell_id() -> String {
362 let id = uuid::Uuid::new_v4().simple().to_string();
363 id.chars().take(12).collect()
364}
365
366pub(super) fn build_shell(
384 command: &str,
385 cwd: &std::path::Path,
386 sandbox: Option<&Arc<SandboxedShim>>,
387) -> Result<tokio::process::Command, ToolError> {
388 use std::process::Stdio;
389
390 let mut shell = tokio::process::Command::new("/bin/sh");
391 shell
392 .arg("-c")
393 .arg(command)
394 .current_dir(cwd)
395 .stdin(Stdio::null())
396 .stdout(Stdio::piped())
397 .stderr(Stdio::piped())
398 .kill_on_drop(true);
399 #[cfg(unix)]
400 shell.process_group(0);
401
402 if let Some(shim) = sandbox {
403 shim.wrap_command(&mut shell, command)
404 .map_err(|e| ToolError::execution(std::io::Error::other(format!("sandbox: {e}"))))?;
405 }
406
407 Ok(shell)
408}
409
410pub fn spawn_background(
422 registry: &Arc<BashBgRegistry>,
423 command: String,
424 cwd: &std::path::Path,
425 sandbox: Option<&Arc<SandboxedShim>>,
426) -> Result<String, ToolError> {
427 let id = new_shell_id();
428 let cap = registry.cap_bytes();
429
430 let mut shell = build_shell(&command, cwd, sandbox)?;
434
435 let mut child = shell.spawn().map_err(ToolError::execution)?;
436 let pid = child.id();
437 let stdout = child.stdout.take().expect("piped");
438 let stderr = child.stderr.take().expect("piped");
439
440 let job = Arc::new(BashJob {
441 id: id.clone(),
442 command,
443 started_at: Instant::now(),
444 status: Mutex::new(BashStatus::Running),
445 stdout: Mutex::new(RingBuffer::with_cap(cap)),
446 stderr: Mutex::new(RingBuffer::with_cap(cap)),
447 pid: Mutex::new(pid),
448 cancel: tokio_util::sync::CancellationToken::new(),
449 });
450 registry.insert(job.clone());
451
452 let stdout_job = job.clone();
454 tokio::spawn(async move {
455 drain_to_ring(stdout, &stdout_job, true).await;
456 });
457 let stderr_job = job.clone();
458 tokio::spawn(async move {
459 drain_to_ring(stderr, &stderr_job, false).await;
460 });
461
462 let watch_job = job.clone();
464 tokio::spawn(async move {
465 let exit = tokio::select! {
466 r = child.wait() => Some(r),
467 () = watch_job.cancel.cancelled() => None,
468 };
469 {
470 let mut status_lock = watch_job.status.lock().unwrap();
471 if let Some(Ok(s)) = exit {
472 if let Some(code) = s.code() {
473 *status_lock = BashStatus::Exited(code);
474 } else {
475 *status_lock = BashStatus::Killed;
476 }
477 } else {
478 *status_lock = BashStatus::Killed;
479 }
480 }
481 if !matches!(exit, Some(Ok(_))) {
482 let _ = child.start_kill();
484 drop(child);
485 }
486 *watch_job.pid.lock().unwrap() = None;
488 });
489
490 Ok(id)
491}
492
493async fn drain_to_ring<R>(reader: R, job: &BashJob, to_stdout: bool)
494where
495 R: tokio::io::AsyncRead + Unpin,
496{
497 use tokio::io::AsyncReadExt;
498 let mut reader = reader;
499 let mut buf = [0u8; 8192];
500 loop {
501 match reader.read(&mut buf).await {
502 Ok(0) | Err(_) => break,
503 Ok(n) => {
504 if to_stdout {
505 job.stdout.lock().unwrap().push(&buf[..n]);
506 } else {
507 job.stderr.lock().unwrap().push(&buf[..n]);
508 }
509 }
510 }
511 }
512}
513
514#[derive(Debug)]
520pub struct BashOutputTool {
521 registry: Arc<BashBgRegistry>,
522 schema: OnceLock<Value>,
523}
524
525impl BashOutputTool {
526 #[must_use]
528 pub fn new(registry: Arc<BashBgRegistry>) -> Self {
529 Self {
530 registry,
531 schema: OnceLock::new(),
532 }
533 }
534
535 #[must_use]
537 pub fn with_global_registry() -> Self {
538 Self::new(global_registry())
539 }
540}
541
542#[derive(Debug, Deserialize)]
543struct BashOutputInput {
544 shell_id: String,
545 #[serde(default)]
546 since_offset: Option<u64>,
547}
548
549#[async_trait]
550impl Tool for BashOutputTool {
551 fn name(&self) -> &'static str {
552 "BashOutput"
553 }
554
555 fn description(&self) -> &'static str {
556 "Read the latest stdout/stderr from a background shell launched via Bash with background:true. Optional since_offset returns only the slice past that absolute byte offset (for incremental polling)."
557 }
558
559 fn input_schema(&self) -> &Value {
560 self.schema.get_or_init(|| {
561 json!({
562 "type": "object",
563 "properties": {
564 "shell_id": { "type": "string", "description": "Shell id returned by Bash(background=true)." },
565 "since_offset": { "type": "integer", "minimum": 0, "description": "Return only bytes after this absolute byte offset (for incremental polling)." }
566 },
567 "required": ["shell_id"]
568 })
569 })
570 }
571
572 async fn invoke(&self, input: Value, _cx: ToolContext) -> Result<Vec<ContentBlock>, ToolError> {
573 let parsed: BashOutputInput = crate::parse_input(input)?;
574 let job = self.registry.get(&parsed.shell_id).ok_or_else(|| {
575 ToolError::execution(std::io::Error::other(format!(
576 "no background shell with id {}",
577 parsed.shell_id
578 )))
579 })?;
580 let since = parsed.since_offset.unwrap_or(0);
581 let (stdout_text, stdout_start, stdout_end) = job.stdout.lock().unwrap().read_since(since);
582 let (stderr_text, stderr_start, stderr_end) = job.stderr.lock().unwrap().read_since(since);
583 let status = job.snapshot_status();
584
585 let age = job.started_at.elapsed();
586 let header = format!(
587 "shell_id: {} status: {} age: {:.1}s\nstdout (bytes {}..{}):\n",
588 job.id,
589 status.as_str(),
590 age.as_secs_f32(),
591 stdout_start,
592 stdout_end,
593 );
594 let mid = format!("\nstderr (bytes {stderr_start}..{stderr_end}):\n");
595 let text = format!("{header}{stdout_text}{mid}{stderr_text}");
596
597 Ok(vec![ContentBlock::Text(TextBlock {
598 text,
599 cache_control: None,
600 })])
601 }
602}
603
604#[derive(Debug)]
611pub struct KillShellTool {
612 registry: Arc<BashBgRegistry>,
613 grace: Duration,
614 schema: OnceLock<Value>,
615}
616
617impl KillShellTool {
618 #[must_use]
620 pub fn new(registry: Arc<BashBgRegistry>) -> Self {
621 Self::with_grace(registry, KILL_GRACE)
622 }
623
624 #[must_use]
626 pub fn with_grace(registry: Arc<BashBgRegistry>, grace: Duration) -> Self {
627 Self {
628 registry,
629 grace,
630 schema: OnceLock::new(),
631 }
632 }
633
634 #[must_use]
636 pub fn with_global_registry() -> Self {
637 Self::new(global_registry())
638 }
639}
640
641#[derive(Debug, Deserialize)]
642struct KillShellInput {
643 shell_id: String,
644}
645
646#[async_trait]
647impl Tool for KillShellTool {
648 fn name(&self) -> &'static str {
649 "KillShell"
650 }
651
652 fn description(&self) -> &'static str {
653 "Terminate a background shell launched via Bash with background:true. Sends SIGTERM, waits ~5s, then SIGKILL. Reaps the child."
654 }
655
656 fn input_schema(&self) -> &Value {
657 self.schema.get_or_init(|| {
658 json!({
659 "type": "object",
660 "properties": {
661 "shell_id": { "type": "string", "description": "Shell id returned by Bash(background=true)." }
662 },
663 "required": ["shell_id"]
664 })
665 })
666 }
667
668 async fn invoke(&self, input: Value, _cx: ToolContext) -> Result<Vec<ContentBlock>, ToolError> {
669 let parsed: KillShellInput = crate::parse_input(input)?;
670 let job = self.registry.get(&parsed.shell_id).ok_or_else(|| {
671 ToolError::execution(std::io::Error::other(format!(
672 "no background shell with id {}",
673 parsed.shell_id
674 )))
675 })?;
676
677 if job.snapshot_status() != BashStatus::Running {
678 return Ok(vec![ContentBlock::Text(TextBlock {
679 text: format!(
680 "Shell {} is already in status {}; no action taken.",
681 job.id,
682 job.snapshot_status().as_str()
683 ),
684 cache_control: None,
685 })]);
686 }
687
688 kill_job_now(&job, false);
689
690 let deadline = Instant::now() + self.grace;
692 while Instant::now() < deadline {
693 if job.snapshot_status() != BashStatus::Running {
694 break;
695 }
696 tokio::time::sleep(Duration::from_millis(50)).await;
697 }
698
699 if job.snapshot_status() == BashStatus::Running {
700 kill_job_now(&job, true);
701 tokio::time::sleep(Duration::from_millis(200)).await;
703 }
704
705 let status = job.snapshot_status();
706 let consumed_stdout = job.stdout.lock().unwrap().written();
707 let consumed_stderr = job.stderr.lock().unwrap().written();
708 Ok(vec![ContentBlock::Text(TextBlock {
709 text: format!(
710 "Killed shell {}; status={}; consumed_stdout={} bytes, consumed_stderr={} bytes",
711 job.id,
712 status.as_str(),
713 consumed_stdout,
714 consumed_stderr,
715 ),
716 cache_control: None,
717 })])
718 }
719}
720
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio_util::sync::CancellationToken;

    /// Minimal tool context for invoking tools directly in tests.
    fn ctx() -> ToolContext {
        ToolContext {
            tool_use_id: "t1".into(),
            cancel: CancellationToken::new(),
            hooks: None,
            turn_index: 0,
        }
    }

    /// Pushing past the cap evicts the oldest bytes and advances `dropped`.
    #[test]
    fn ring_buffer_drops_oldest_at_cap() {
        let mut rb = RingBuffer::with_cap(16);
        rb.push(b"0123456789ABCDEF");
        assert_eq!(rb.len(), 16);
        assert_eq!(rb.written(), 16);
        rb.push(b"GHIJKLMN");
        assert_eq!(rb.len(), 16);
        assert_eq!(rb.dropped(), 8);
        let (text, end) = rb.snapshot();
        assert_eq!(text, "89ABCDEFGHIJKLMN");
        assert_eq!(end, 24);
    }

    /// A single chunk larger than the cap keeps only its tail.
    #[test]
    fn ring_buffer_handles_chunk_bigger_than_cap() {
        let mut rb = RingBuffer::with_cap(4);
        rb.push(b"0123456789");
        let (text, end) = rb.snapshot();
        assert_eq!(text, "6789");
        assert_eq!(end, 10);
        assert_eq!(rb.dropped(), 6);
    }

    /// `read_since` returns only the bytes at or after the requested offset.
    #[test]
    fn ring_buffer_read_since_returns_tail() {
        let mut rb = RingBuffer::with_cap(32);
        rb.push(b"hello world");
        let (text, start, end) = rb.read_since(6);
        assert_eq!(text, "world");
        assert_eq!(start, 6);
        assert_eq!(end, 11);
    }

    /// Without a sandbox the command is a plain `/bin/sh -c <command>`.
    #[test]
    fn build_shell_without_sandbox_invokes_bin_sh_directly() {
        let cmd = build_shell("echo hi", &std::env::current_dir().unwrap(), None).unwrap();
        let std_cmd = cmd.as_std();
        assert_eq!(std_cmd.get_program(), "/bin/sh");
        let args: Vec<String> = std_cmd
            .get_args()
            .map(|a| a.to_string_lossy().into_owned())
            .collect();
        assert_eq!(args, ["-c", "echo hi"]);
    }

    /// With a sandbox shim the program is rewritten whenever the shim reports
    /// itself active, and left as `/bin/sh` otherwise.
    #[test]
    fn build_shell_routes_through_the_sandbox_wrap() {
        let policy = caliban_sandbox::Policy {
            enabled: true,
            ..Default::default()
        };
        let shim = Arc::new(caliban_sandbox::SandboxedShim::new(policy).unwrap());
        let cmd = build_shell("echo hi", &std::env::current_dir().unwrap(), Some(&shim)).unwrap();
        let program = cmd.as_std().get_program().to_string_lossy().into_owned();
        if shim.is_active() {
            assert_ne!(
                program, "/bin/sh",
                "an active sandbox must wrap the shell program",
            );
        } else {
            assert_eq!(program, "/bin/sh");
        }
    }

    /// Spawning returns promptly with a 12-character id and a registered,
    /// running job.
    #[tokio::test]
    async fn spawn_background_returns_shell_id_immediately() {
        let reg = BashBgRegistry::new_for_test(1024 * 1024);
        let start = Instant::now();
        let id = spawn_background(
            &reg,
            "sleep 5".into(),
            &std::env::current_dir().unwrap(),
            None,
        )
        .unwrap();
        assert!(start.elapsed() < Duration::from_millis(500));
        assert_eq!(id.len(), 12);
        assert!(reg.get(&id).is_some());
        assert_eq!(reg.running_count(), 1);
        // Clean up the sleeping child.
        if let Some(job) = reg.get(&id) {
            kill_job_now(&job, true);
        }
    }

    /// `BashOutput` shows output produced so far while the job still runs.
    #[tokio::test]
    async fn bash_output_returns_streaming_stdout() {
        let reg = BashBgRegistry::new_for_test(1024 * 1024);
        let id = spawn_background(
            &reg,
            "printf 'hello'; sleep 30".into(),
            &std::env::current_dir().unwrap(),
            None,
        )
        .unwrap();
        // Poll until the drain task has captured the output.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let job = reg.get(&id).unwrap();
            let (text, _e) = job.stdout.lock().unwrap().snapshot();
            if text.contains("hello") {
                break;
            }
        }
        let tool = BashOutputTool::new(reg.clone());
        let out = tool.invoke(json!({"shell_id": id}), ctx()).await.unwrap();
        let ContentBlock::Text(t) = &out[0] else {
            panic!("expected Text")
        };
        assert!(t.text.contains("hello"), "out: {}", t.text);
        assert!(t.text.contains("status: running"), "out: {}", t.text);
        // Clean up the sleeping child.
        if let Some(job) = reg.get(&id) {
            kill_job_now(&job, true);
        }
    }

    /// `since_offset` is reflected in the reported byte range.
    #[tokio::test]
    async fn bash_output_supports_since_offset() {
        let reg = BashBgRegistry::new_for_test(1024 * 1024);
        let id = spawn_background(
            &reg,
            "printf 'aaaaa'; sleep 30".into(),
            &std::env::current_dir().unwrap(),
            None,
        )
        .unwrap();
        // Poll until all five bytes have been captured.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let job = reg.get(&id).unwrap();
            if job.stdout.lock().unwrap().written() >= 5 {
                break;
            }
        }
        let tool = BashOutputTool::new(reg.clone());
        let out = tool
            .invoke(json!({"shell_id": id, "since_offset": 3}), ctx())
            .await
            .unwrap();
        let ContentBlock::Text(t) = &out[0] else {
            panic!("expected Text")
        };
        assert!(t.text.contains("bytes 3..5"), "out: {}", t.text);
        if let Some(job) = reg.get(&id) {
            kill_job_now(&job, true);
        }
    }

    /// `KillShell` moves a running job out of the running state.
    #[tokio::test]
    async fn kill_shell_terminates_running_job() {
        let reg = BashBgRegistry::new_for_test(1024 * 1024);
        let id = spawn_background(
            &reg,
            "sleep 60".into(),
            &std::env::current_dir().unwrap(),
            None,
        )
        .unwrap();
        assert_eq!(reg.running_count(), 1);
        let tool = KillShellTool::with_grace(reg.clone(), Duration::from_millis(500));
        let out = tool
            .invoke(json!({"shell_id": id.clone()}), ctx())
            .await
            .unwrap();
        let ContentBlock::Text(t) = &out[0] else {
            panic!("expected Text")
        };
        assert!(t.text.contains("Killed shell"), "out: {}", t.text);
        // Give the watcher task a moment to record the final status.
        for _ in 0..20 {
            if reg.running_count() == 0 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        assert_eq!(reg.running_count(), 0);
    }

    /// `kill_all` stops every running job but leaves them registered.
    #[tokio::test]
    async fn kill_all_terminates_every_running_job() {
        let reg = BashBgRegistry::new_for_test(1024 * 1024);
        let ids: Vec<String> = (0..3)
            .map(|_| {
                spawn_background(
                    &reg,
                    "sleep 60".into(),
                    &std::env::current_dir().unwrap(),
                    None,
                )
                .unwrap()
            })
            .collect();
        assert_eq!(reg.running_count(), 3);
        reg.kill_all();
        for _ in 0..40 {
            if reg.running_count() == 0 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        assert_eq!(reg.running_count(), 0);
        for id in ids {
            let job = reg.get(&id).unwrap();
            assert_ne!(job.snapshot_status(), BashStatus::Running);
        }
    }

    /// An unknown shell id yields an execution error naming the problem.
    #[tokio::test]
    async fn bash_output_unknown_id_returns_error() {
        let reg = BashBgRegistry::new_for_test(1024);
        let tool = BashOutputTool::new(reg);
        let err = tool
            .invoke(json!({"shell_id": "doesnotexist"}), ctx())
            .await
            .unwrap_err();
        assert!(matches!(err, ToolError::Execution(_)));
        let msg = format!("{err}");
        assert!(msg.contains("no background shell"), "msg: {msg}");
    }

    /// Shell ids are 12 ASCII alphanumeric characters.
    #[test]
    fn new_shell_id_is_12_chars() {
        let id = new_shell_id();
        assert_eq!(id.len(), 12);
        assert!(id.chars().all(|c| c.is_ascii_alphanumeric()));
    }
}