1mod output;
10mod runtime;
11
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use serde_json::json;
17use tokio_util::sync::CancellationToken;
18
19use lash_core::plugin::{
20 PluginError, PluginFactory, PluginSessionContext, PluginSpec, PluginSpecFactory, SessionPlugin,
21};
22use lash_core::runtime::ProcessEventSemanticsSpec;
23use lash_core::{
24 PreparedToolCall, ProcessEventType, ProcessHandleDescriptor, ProcessInput, ProcessStartRequest,
25 ProgressSender, PromptContribution, SessionScope, SessionToolAccess, ToolCall, ToolDefinition,
26 ToolProvider, ToolResult, ToolScheduling,
27};
28
29use lash_tool_support::{
30 StaticToolExecute, StaticToolProvider, ToolDefinitionLashlangExt, object_schema,
31 parse_optional_bool, parse_optional_usize_arg, require_str,
32};
33
34use crate::shell::output::{PollOutcome, shell_io_result, timed_out_shell_io_result};
35use crate::shell::runtime::{
36 CommonCommandParams, DEFAULT_EXEC_COMMAND_TIMEOUT_MS, ExecCommandParams,
37 PipeExecProcessRequest, ShellRuntime, StartCommandParams, WaitBehavior,
38};
39
/// Signal name handed to the process service when `write_stdin` forwards input.
const SHELL_STDIN_SIGNAL: &str = "stdin";
/// Process event name the stdin forwarder waits on and that `start_command`
/// registers as an extra event type.
const SHELL_STDIN_SIGNAL_EVENT: &str = "signal.stdin";
42
43pub fn shell_prompt_contributions() -> Vec<PromptContribution> {
44 shell_prompt_contributions_for_access(&SessionToolAccess::default())
45}
46
47pub fn shell_prompt_contributions_for_access(
51 access: &SessionToolAccess,
52) -> Vec<PromptContribution> {
53 let mut command_execution = String::from(
54 "Use `shell.exec` for one-shot commands; it returns only after the process exits and successful results include `status: \"completed\"`, `done: true`, and `exit_code`. Use `shell.start` only for interactive or intentionally long-lived processes; it returns a process handle that is visible to `processes.list` and cancellable with `processes.cancel`.",
55 );
56 if tool_callable_from_authority(access, "write_stdin") {
57 command_execution.push_str(" Send stdin to running shell processes with `shell.write`.");
58 }
59 command_execution.push_str(
60 " For builds, installs, tests, migrations, service setup, and verification commands, use `shell.exec` and wait for completion before concluding.",
61 );
62 vec![
63 PromptContribution::guidance("Command Execution", command_execution),
64 PromptContribution::guidance(
65 "Git Safety",
66 "Avoid destructive git commands unless explicitly requested.",
67 ),
68 ]
69}
70
71fn tool_callable_from_authority(access: &SessionToolAccess, name: &str) -> bool {
72 if access.hides(name) {
73 return false;
74 }
75 access.tools.is_empty() || access.tools.iter().any(|tool| tool.name() == name)
76}
77
78pub struct StandardShell {
79 runtime: ShellRuntime,
80}
81
82impl StandardShell {
83 pub fn new() -> Self {
84 Self {
85 runtime: ShellRuntime::new(),
86 }
87 }
88
89 pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
90 self.runtime = self.runtime.with_cwd(cwd);
91 self
92 }
93
94 fn parse_common_command_params(
95 &self,
96 args: &serde_json::Value,
97 ) -> Result<CommonCommandParams, ToolResult> {
98 let cmd = require_str(args, "cmd")?.to_string();
99 let workdir = self.runtime.resolve_workdir(
100 args.get("workdir")
101 .and_then(|value| value.as_str())
102 .filter(|value| !value.is_empty()),
103 );
104 let shell_path = args
105 .get("shell")
106 .and_then(|value| value.as_str())
107 .filter(|value| !value.is_empty())
108 .unwrap_or(&self.runtime.shell_path)
109 .to_string();
110 let login = parse_optional_bool(args, "login", false)?;
111 let max_output_tokens = parse_optional_usize_arg(args, "max_output_tokens", None, true, 1)?;
112
113 Ok(CommonCommandParams {
114 cmd,
115 workdir,
116 shell_path,
117 login,
118 max_output_tokens,
119 })
120 }
121
122 fn parse_exec_command_params(
123 &self,
124 args: &serde_json::Value,
125 ) -> Result<ExecCommandParams, ToolResult> {
126 let common = self.parse_common_command_params(args)?;
127 let timeout_ms = parse_optional_usize_arg(args, "timeout_ms", None, false, 1)?
128 .map(|value| value as u64)
129 .unwrap_or(DEFAULT_EXEC_COMMAND_TIMEOUT_MS);
130
131 Ok(ExecCommandParams {
132 cmd: common.cmd,
133 workdir: common.workdir,
134 shell_path: common.shell_path,
135 login: common.login,
136 timeout_ms,
137 max_output_tokens: common.max_output_tokens,
138 })
139 }
140
141 fn parse_start_command_params(
142 &self,
143 args: &serde_json::Value,
144 ) -> Result<StartCommandParams, ToolResult> {
145 let common = self.parse_common_command_params(args)?;
146
147 Ok(StartCommandParams {
148 cmd: common.cmd,
149 workdir: common.workdir,
150 shell_path: common.shell_path,
151 login: common.login,
152 max_output_tokens: common.max_output_tokens,
153 })
154 }
155
156 async fn exec_command(
157 &self,
158 params: &ExecCommandParams,
159 progress: Option<&ProgressSender>,
160 cancel: Option<CancellationToken>,
161 ) -> ToolResult {
162 let started = Instant::now();
163 let handle_id = self.runtime.allocate_handle_id();
164
165 match self
166 .runtime
167 .exec_pipe_process(PipeExecProcessRequest {
168 id: &handle_id,
169 command: ¶ms.cmd,
170 workdir: ¶ms.workdir,
171 login: params.login,
172 shell_path: ¶ms.shell_path,
173 timeout: Some(Duration::from_millis(params.timeout_ms)),
174 progress,
175 max_output_tokens: params.max_output_tokens,
176 cancel,
177 })
178 .await
179 {
180 Ok(PollOutcome::Running {
181 output,
182 original_token_count,
183 full_output_path,
184 ..
185 }) => timed_out_shell_io_result(
186 &handle_id,
187 output,
188 original_token_count,
189 full_output_path.as_deref(),
190 started.elapsed().as_secs_f64(),
191 params.timeout_ms,
192 ),
193 Ok(PollOutcome::Exited {
194 output,
195 original_token_count,
196 exit_code,
197 full_output_path,
198 }) => shell_io_result(
199 &handle_id,
200 output,
201 Some(exit_code),
202 original_token_count,
203 full_output_path.as_deref(),
204 started.elapsed().as_secs_f64(),
205 ),
206 Ok(PollOutcome::Cancelled) => ToolResult::cancelled("tool call cancelled"),
207 Err(err) => ToolResult::err(json!(err)),
208 }
209 }
210
211 async fn start_command(
212 &self,
213 params: &StartCommandParams,
214 context: &lash_core::ToolContext<'_>,
215 progress: Option<&ProgressSender>,
216 cancel: Option<CancellationToken>,
217 ) -> ToolResult {
218 if let Some(process_id) = context.async_process_id() {
219 return self
220 .run_start_command_process(process_id, params, context, progress, cancel)
221 .await;
222 }
223 self.register_start_command_process(params, context).await
224 }
225
226 async fn register_start_command_process(
227 &self,
228 params: &StartCommandParams,
229 context: &lash_core::ToolContext<'_>,
230 ) -> ToolResult {
231 let process_id = context
232 .tool_call_id()
233 .filter(|id| !id.is_empty())
234 .map(str::to_string)
235 .unwrap_or_else(|| format!("shell:{}", self.runtime.allocate_handle_id()));
236 let args = start_command_process_args(params);
237 let call = PreparedToolCall::from_parts(
238 process_id.clone(),
239 "tool:start_command",
240 "start_command",
241 args,
242 None,
243 serde_json::Value::Null,
244 );
245 let descriptor = ProcessHandleDescriptor::new(Some("shell"), Some(params.cmd.clone()));
246 let request = ProcessStartRequest::new(
247 process_id.clone(),
248 ProcessInput::ToolCall { call },
249 lash_core::ProcessOriginator::host(),
250 )
251 .with_grant(Some(lash_core::ProcessStartGrant {
252 session_scope: SessionScope::new("request-descriptor"),
253 descriptor,
254 }))
255 .with_extra_event_types([shell_signal_event_type()]);
256 match context.processes().start(request).await {
257 Ok(summary) => {
258 let mut handle = serde_json::to_value(summary).unwrap_or_else(|_| {
259 lash_core::RuntimeExecutionContext::process_handle_json(&process_id)
260 });
261 if let Some(object) = handle.as_object_mut() {
262 object.insert("status".to_string(), json!("running"));
263 object.insert("done".to_string(), json!(false));
264 object.insert("running".to_string(), json!(true));
265 }
266 ToolResult::ok(handle)
267 }
268 Err(err) => ToolResult::err_fmt(err.to_string()),
269 }
270 }
271
272 async fn run_start_command_process(
273 &self,
274 process_id: &str,
275 params: &StartCommandParams,
276 context: &lash_core::ToolContext<'_>,
277 progress: Option<&ProgressSender>,
278 cancel: Option<CancellationToken>,
279 ) -> ToolResult {
280 let started = Instant::now();
281 let handle_id = process_id.to_string();
282
283 if let Err(err) = self.runtime.spawn_process(
284 handle_id.clone(),
285 ¶ms.cmd,
286 ¶ms.workdir,
287 params.login,
288 ¶ms.shell_path,
289 ) {
290 return ToolResult::err(json!(err));
291 }
292
293 let signal_done = CancellationToken::new();
294 let signal_forwarder =
295 self.spawn_stdin_signal_forwarder(handle_id.clone(), context, signal_done.clone());
296 match self
297 .runtime
298 .wait_until_exit_or_timeout(
299 &handle_id,
300 None,
301 progress,
302 params.max_output_tokens,
303 WaitBehavior { baseline_len: 0 },
304 cancel,
305 )
306 .await
307 {
308 Ok(PollOutcome::Running { .. }) => {
309 signal_done.cancel();
310 let _ = signal_forwarder.await;
311 self.runtime.remove_process(&handle_id);
312 ToolResult::err_fmt("background shell process returned running without a timeout")
313 }
314 Ok(PollOutcome::Exited {
315 output,
316 original_token_count,
317 exit_code,
318 full_output_path,
319 }) => {
320 signal_done.cancel();
321 let _ = signal_forwarder.await;
322 self.runtime.remove_process(&handle_id);
323 shell_io_result(
324 &handle_id,
325 output,
326 Some(exit_code),
327 original_token_count,
328 full_output_path.as_deref(),
329 started.elapsed().as_secs_f64(),
330 )
331 }
332 Ok(PollOutcome::Cancelled) => {
333 signal_done.cancel();
334 let _ = signal_forwarder.await;
335 self.runtime.remove_process(&handle_id);
336 ToolResult::cancelled("tool call cancelled")
337 }
338 Err(err) => {
339 signal_done.cancel();
340 let _ = signal_forwarder.await;
341 self.runtime.remove_process(&handle_id);
342 ToolResult::err(json!(err))
343 }
344 }
345 }
346
347 fn spawn_stdin_signal_forwarder(
348 &self,
349 process_id: String,
350 context: &lash_core::ToolContext<'_>,
351 done: CancellationToken,
352 ) -> tokio::task::JoinHandle<()> {
353 let runtime = self.runtime.clone();
354 let events = context.process_events();
355 tokio::spawn(async move {
356 let mut after_sequence = 0;
357 loop {
358 let event = tokio::select! {
359 _ = done.cancelled() => break,
360 event = events.wait_event_after(SHELL_STDIN_SIGNAL_EVENT, after_sequence) => event,
361 };
362 let Ok(event) = event else {
363 break;
364 };
365 after_sequence = event.sequence;
366 if let Some(chars) = event.payload.get("chars").and_then(|value| value.as_str()) {
367 let _ = runtime.write_stdin(&process_id, chars).await;
368 }
369 if event
370 .payload
371 .get("close_stdin")
372 .and_then(|value| value.as_bool())
373 .unwrap_or(false)
374 {
375 let _ = runtime.close_stdin(&process_id).await;
376 }
377 }
378 })
379 }
380
381 async fn write_stdin_call(
382 &self,
383 args: &serde_json::Value,
384 context: &lash_core::ToolContext<'_>,
385 ) -> ToolResult {
386 let process_id = match parse_process_id(args) {
387 Ok(value) => value,
388 Err(err) => return err,
389 };
390 let chars = args
391 .get("chars")
392 .and_then(|value| value.as_str())
393 .unwrap_or("");
394 let close_stdin = match parse_optional_bool(args, "close_stdin", false) {
395 Ok(value) => value,
396 Err(err) => return err,
397 };
398 match context
399 .processes()
400 .signal(
401 &process_id,
402 SHELL_STDIN_SIGNAL,
403 json!({
404 "chars": chars,
405 "close_stdin": close_stdin,
406 }),
407 )
408 .await
409 {
410 Ok(event) => ToolResult::ok(json!({
411 "process_id": process_id,
412 "status": "signalled",
413 "sequence": event.sequence,
414 })),
415 Err(err) => ToolResult::err_fmt(err.to_string()),
416 }
417 }
418}
419
420fn start_command_process_args(params: &StartCommandParams) -> serde_json::Value {
421 let mut args = serde_json::Map::new();
422 args.insert("cmd".to_string(), json!(params.cmd.clone()));
423 args.insert(
424 "workdir".to_string(),
425 json!(params.workdir.to_string_lossy().to_string()),
426 );
427 args.insert("shell".to_string(), json!(params.shell_path.clone()));
428 args.insert("login".to_string(), json!(params.login));
429 if let Some(max_output_tokens) = params.max_output_tokens {
430 args.insert("max_output_tokens".to_string(), json!(max_output_tokens));
431 }
432 serde_json::Value::Object(args)
433}
434
435fn shell_signal_event_type() -> ProcessEventType {
436 ProcessEventType {
437 name: SHELL_STDIN_SIGNAL_EVENT.to_string(),
438 payload_schema: lash_core::LashSchema::any(),
439 semantics: ProcessEventSemanticsSpec::default(),
440 }
441}
442
443impl Default for StandardShell {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
449pub fn shell_provider(shell: StandardShell) -> StaticToolProvider<StandardShell> {
451 let definitions = shell.tool_definitions();
452 StaticToolProvider::new(definitions, shell)
453}
454
455#[async_trait::async_trait]
456impl StaticToolExecute for StandardShell {
457 async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
458 let cancellation_token = call.context.cancellation_token().cloned();
459 self.dispatch(
460 call.name,
461 call.args,
462 call.context,
463 call.progress,
464 cancellation_token,
465 )
466 .await
467 }
468}
469
470impl StandardShell {
471 fn tool_definitions(&self) -> Vec<ToolDefinition> {
472 let exec_command_description = "Run a noninteractive one-shot command with stdin closed and stdout/stderr captured, then wait for it to finish. The command is executed exactly as written by the selected shell; the tool does not add strict-mode prefixes or rewrite pipelines. Completed commands always include `status: \"completed\"`, `done: true`, `running: false`, cleaned `output`, and `exit_code`. Nonzero exit codes are returned as ordinary result data; in Lashlang, `await shell.exec(...)?` does not abort just because the process exited nonzero. Inspect `exit_code` yourself when it matters. Commands time out after 600000 ms by default; set `timeout_ms` to override the hard timeout. Timed-out commands are killed and returned as a tool failure with `status: \"timed_out\"`, `timed_out: true`, and no `exit_code`. Use `shell.start` instead for interactive, TTY-dependent, or intentionally long-lived processes. ANSI/control noise is stripped from returned output. Large or truncated output may also include `full_output_path` pointing at the saved raw stream; prefer that over shell-level `head`/`tail` truncation when you need to inspect more.";
473 let start_command_description = "Start an interactive or intentionally long-lived command in a PTY as a durable background process. The command is executed exactly as written by the selected shell. The result is a process handle with `__handle__: \"process\"`, `id`, `process_id`, `status: \"running\"`, `done: false`, and `running: true`; use `processes.list` to see it and `processes.cancel` to stop it. When the process exits, nonzero exit codes are returned as ordinary result data with `exit_code`; in Lashlang, `?` does not abort just because the process exited nonzero. Inspect `exit_code` yourself. Use `shell.exec` for builds, installs, tests, service setup, verification, and other commands that must complete before the next step.";
474 let command_common = |command_description: &str| {
475 json!({
476 "cmd": {
477 "type": "string",
478 "description": command_description
479 },
480 "workdir": {
481 "type": "string",
482 "description": "Optional working directory to run the command in; defaults to the turn cwd."
483 },
484 "shell": {
485 "type": "string",
486 "description": "Shell binary to launch. Defaults to the user's default shell."
487 },
488 "login": {
489 "type": "boolean",
490 "default": false,
491 "description": "Whether to run the shell with -l semantics. Defaults to false to avoid startup prompts and shell init noise."
492 },
493 "max_output_tokens": {
494 "type": "integer",
495 "minimum": 1,
496 "description": "Maximum number of tokens to return. Excess output will be truncated."
497 }
498 })
499 };
500 vec![
501 ToolDefinition::raw(
502 "tool:exec_command",
503 "exec_command",
504 exec_command_description,
505 {
506 let mut properties = command_common("Shell command to execute.");
507 properties["timeout_ms"] = json!({
508 "type": "integer",
509 "minimum": 1,
510 "default": DEFAULT_EXEC_COMMAND_TIMEOUT_MS,
511 "description": "Hard timeout in milliseconds. If reached before the command exits, the process is killed and returned as a tool failure with `status: \"timed_out\"` and `timed_out: true`. Defaults to 600000 ms."
512 });
513 object_schema(properties, &["cmd"])
514 },
515 shell_exec_output_schema(),
516 )
517 .with_examples(vec![
518 r#"await shell.exec({ cmd: "cargo test -p lash-protocol-rlm", timeout_ms: 600000 })?"#.into(),
519 r#"probe = await shell.exec({ cmd: "test -f Cargo.lock" })?
520submit probe.exit_code == 0"#.into(),
521 ])
522 .with_lashlang_binding(lash_tool_support::lashlang_binding(
523 ["shell"],
524 "exec",
525 &["shell", "bash"],
526 ))
527 .with_scheduling(ToolScheduling::Serial),
528 ToolDefinition::raw(
529 "tool:start_command",
530 "start_command",
531 start_command_description,
532 object_schema(command_common("Shell command to start."), &["cmd"]),
533 shell_start_output_schema(),
534 )
535 .with_examples(vec![
536 r#"await shell.start({ cmd: "python -m http.server 8000" })?"#.into(),
537 ])
538 .with_lashlang_binding(lash_tool_support::lashlang_binding(
539 ["shell"],
540 "start",
541 &["long_running_command", "pty"],
542 ))
543 .with_scheduling(ToolScheduling::Serial),
544 ToolDefinition::raw(
545 "tool:write_stdin",
546 "write_stdin",
547 "Send bytes to stdin for a running shell process started by `shell.start`. Use `close_stdin: true` to send EOF. This only acknowledges delivery of the signal; use process lifecycle tools to inspect or cancel the background process.",
548 object_schema(
549 json!({
550 "process_id": {
551 "type": "string",
552 "description": "Process id returned by `shell.start`."
553 },
554 "chars": {
555 "type": "string",
556 "default": "",
557 "description": "Bytes to write to stdin; may be empty when only closing stdin."
558 },
559 "close_stdin": {
560 "type": "boolean",
561 "default": false,
562 "description": "Close stdin after writing to send EOF to the process."
563 }
564 }),
565 &["process_id"],
566 ),
567 shell_write_output_schema(),
568 )
569 .with_examples(vec![
570 r#"await shell.write({ process_id: "call-shell-1", chars: "status\n" })?"#.into(),
571 r#"await shell.write({ process_id: "call-shell-1", chars: "", close_stdin: true })?"#.into(),
572 ])
573 .with_lashlang_binding(lash_tool_support::lashlang_binding(
574 ["shell"],
575 "write",
576 &["send_stdin", "poll_command"],
577 ))
578 .with_scheduling(ToolScheduling::Serial),
579 ]
580 }
581
582 async fn dispatch(
583 &self,
584 name: &str,
585 args: &serde_json::Value,
586 context: &lash_core::ToolContext<'_>,
587 progress: Option<&ProgressSender>,
588 cancel: Option<CancellationToken>,
589 ) -> ToolResult {
590 match name {
591 "exec_command" => {
592 let params = match self.parse_exec_command_params(args) {
593 Ok(params) => params,
594 Err(err) => return err,
595 };
596 self.exec_command(¶ms, progress, cancel).await
597 }
598 "start_command" => {
599 let params = match self.parse_start_command_params(args) {
600 Ok(params) => params,
601 Err(err) => return err,
602 };
603 self.start_command(¶ms, context, progress, cancel).await
604 }
605 "write_stdin" => self.write_stdin_call(args, context).await,
606 _ => ToolResult::err_fmt(format_args!("Unknown tool: {name}")),
607 }
608 }
609}
610
611fn shell_exec_output_schema() -> serde_json::Value {
612 json!({
613 "type": "object",
614 "properties": {
615 "output": { "type": "string" },
616 "status": { "type": "string", "enum": ["completed", "timed_out"] },
617 "done": { "type": "boolean" },
618 "running": { "type": "boolean" },
619 "wall_time_seconds": { "type": "number", "minimum": 0 },
620 "exit_code": { "type": "integer" },
621 "timed_out": { "type": "boolean" },
622 "error": { "type": "string" },
623 "original_token_count": { "type": "integer", "minimum": 0 },
624 "full_output_path": { "type": "string" }
625 },
626 "required": ["output", "status", "done", "running", "wall_time_seconds"],
627 "additionalProperties": false
628 })
629}
630
631fn shell_start_output_schema() -> serde_json::Value {
632 json!({
633 "type": "object",
634 "properties": {
635 "__handle__": { "type": "string", "enum": ["process"] },
636 "id": { "type": "string" },
637 "process_id": { "type": "string" },
638 "status": { "type": "string", "enum": ["running"] },
639 "done": { "type": "boolean" },
640 "running": { "type": "boolean" }
641 },
642 "required": ["__handle__", "id", "process_id", "status", "done", "running"],
643 "additionalProperties": false
644 })
645}
646
647fn shell_write_output_schema() -> serde_json::Value {
648 json!({
649 "type": "object",
650 "properties": {
651 "process_id": { "type": "string" },
652 "status": { "type": "string", "enum": ["signalled"] },
653 "sequence": { "type": "integer", "minimum": 0 }
654 },
655 "required": ["process_id", "status", "sequence"],
656 "additionalProperties": false
657 })
658}
659
660fn parse_process_id(args: &serde_json::Value) -> Result<String, ToolResult> {
661 require_str(args, "process_id").map(str::to_string)
662}
663
/// Plugin factory that installs the standard shell tools and their prompt guidance.
#[derive(Default)]
pub struct StandardShellPluginFactory;

impl StandardShellPluginFactory {
    /// Creates the factory; the type carries no state.
    pub fn new() -> Self {
        StandardShellPluginFactory
    }
}
677
678impl PluginFactory for StandardShellPluginFactory {
679 fn id(&self) -> &'static str {
680 "shell"
681 }
682
683 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
684 let tool_access = ctx.tool_access.clone();
685 let provider = Arc::new(shell_provider(StandardShell::new())) as Arc<dyn ToolProvider>;
686 PluginSpecFactory::new(
687 "shell",
688 Arc::new(move |_ctx| {
689 let provider = Arc::clone(&provider);
690 let tool_access = tool_access.clone();
691 Ok(PluginSpec::new()
692 .with_tool_provider(provider)
693 .with_prompt_contributor(Arc::new(move |_ctx| {
694 let tool_access = tool_access.clone();
695 Box::pin(
696 async move { Ok(shell_prompt_contributions_for_access(&tool_access)) },
697 )
698 })))
699 }),
700 )
701 .build(ctx)
702 }
703}
704
705include!("tests.rs");