lash_plugin_process_controls/
lib.rs1use std::sync::Arc;
8
9use serde_json::Value;
10
11use lash_core::plugin::{
12 PluginError, PluginFactory, PluginSessionContext, PluginSpec, SessionPlugin,
13 StaticPluginFactory,
14};
15use lash_core::{
16 ToolAgentSurface, ToolAvailabilityConfig, ToolCall, ToolDefinition, ToolProvider, ToolResult,
17 ToolScheduling,
18};
19use lash_tool_support::{StaticToolExecute, StaticToolProvider};
20
21pub struct ProcessControlsPluginFactory {
27 inner: StaticPluginFactory,
28}
29
30impl ProcessControlsPluginFactory {
31 pub fn new() -> Self {
32 Self::with_cancel_process(true)
33 }
34
35 pub fn without_cancel_process() -> Self {
36 Self::with_cancel_process(false)
37 }
38
39 fn with_cancel_process(include_cancel_process: bool) -> Self {
40 let provider = StaticToolProvider::new(
41 process_control_tool_definitions(include_cancel_process),
42 ProcessControlsTools {
43 include_cancel_process,
44 },
45 );
46 let spec =
47 PluginSpec::new().with_tool_provider(Arc::new(provider) as Arc<dyn ToolProvider>);
48 Self {
49 inner: StaticPluginFactory::new("process_controls", spec),
50 }
51 }
52}
53
54impl Default for ProcessControlsPluginFactory {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl PluginFactory for ProcessControlsPluginFactory {
61 fn id(&self) -> &'static str {
62 self.inner.id()
63 }
64
65 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
66 self.inner.build(ctx)
67 }
68}
69
70struct ProcessControlsTools {
71 include_cancel_process: bool,
72}
73
74#[async_trait::async_trait]
75impl StaticToolExecute for ProcessControlsTools {
76 async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
77 match call.name {
78 "list_process_handles" => execute_process_list_tool_call(call.context, call.args).await,
79 "cancel_process" if self.include_cancel_process => {
80 execute_process_cancel_tool_call(call.context, call.args).await
81 }
82 _ => ToolResult::err_fmt(format_args!("Unknown tool: {}", call.name)),
83 }
84 }
85}
86
87pub fn process_list_tool_definition() -> ToolDefinition {
88 ToolDefinition::raw(
89 "tool:list_process_handles",
90 "list_process_handles",
91 "List process runs visible to this session, including `shell.start` runs, with process id, descriptor, optional definition name, and lifecycle status. Filters are optional; the default returns running runs.",
92 serde_json::json!({
93 "type": "object",
94 "properties": {
95 "status": {
96 "type": "string",
97 "enum": ["running", "completed", "failed", "cancelled", "any"],
98 "description": "Lifecycle status to list. The default is `running`; `any` includes historical runs."
99 },
100 "definition": {
101 "type": "object",
102 "description": "A Lashlang process definition value, for example `on_button`."
103 }
104 },
105 "additionalProperties": false
106 }),
107 process_list_output_schema(),
108 )
109 .with_examples(vec![
110 "await processes.list({})?".into(),
111 r#"await processes.list({ status: "any" })?"#.into(),
112 "await processes.list({ definition: on_button })?".into(),
113 ])
114 .with_agent_surface(ToolAgentSurface::new(["processes"], "list"))
115 .with_availability(ToolAvailabilityConfig::callable())
116 .with_scheduling(ToolScheduling::Parallel)
117}
118
119fn process_control_tool_definitions(include_cancel_process: bool) -> Vec<ToolDefinition> {
120 let mut definitions = vec![process_list_tool_definition()];
121 if include_cancel_process {
122 definitions.push(process_cancel_tool_definition());
123 }
124 definitions
125}
126
127pub fn process_cancel_tool_definition() -> ToolDefinition {
128 ToolDefinition::raw(
129 "tool:cancel_process",
130 "cancel_process",
131 "Request cancellation for a durable process, including a running `shell.start` process, by `process_id`.",
132 serde_json::json!({
133 "type": "object",
134 "properties": {
135 "process_id": {
136 "type": "string",
137 "description": "Process id returned by a process handle or `processes.list(...)`."
138 }
139 },
140 "required": ["process_id"],
141 "additionalProperties": false
142 }),
143 serde_json::json!({
144 "type": "object",
145 "properties": {
146 "process_id": { "type": "string" },
147 "status": {
148 "type": "string",
149 "enum": ["running", "completed", "failed", "cancelled"]
150 }
151 },
152 "required": ["process_id", "status"],
153 "additionalProperties": false
154 }),
155 )
156 .with_examples(vec![
157 r#"await processes.cancel({ process_id: "tool:call-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
158 r#"await processes.cancel({ process_id: "subagent:session-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
159 ])
160 .with_agent_surface(ToolAgentSurface::new(["processes"], "cancel"))
161 .with_availability(ToolAvailabilityConfig::callable())
162 .with_scheduling(ToolScheduling::Parallel)
163}
164
165pub async fn execute_process_list_tool_call(
166 context: &lash_core::ToolContext<'_>,
167 args: &Value,
168) -> ToolResult {
169 let filter = match lash_core::ProcessListFilter::decode(args) {
170 Ok(filter) => filter,
171 Err(err) => return ToolResult::err_fmt(err),
172 };
173 let processes = context.processes();
174 let result = processes.list_handles_filtered(&filter).await;
175 match result {
176 Ok(entries) => ToolResult::ok(serde_json::json!(entries)),
177 Err(err) => ToolResult::err_fmt(err.to_string()),
178 }
179}
180
181fn process_list_output_schema() -> Value {
182 serde_json::json!({
183 "type": "array",
184 "items": {
185 "type": "object",
186 "properties": {
187 "__handle__": {
188 "type": "string",
189 "enum": ["process"],
190 "description": "Handle marker; pass the whole record where a process handle is needed."
191 },
192 "id": {
193 "type": "string",
194 "description": "Process handle id."
195 },
196 "process_id": {
197 "type": "string",
198 "description": "Same process id, repeated for tools that ask for process_id."
199 },
200 "descriptor": {
201 "type": "object",
202 "properties": {
203 "kind": { "type": "string" },
204 "label": { "type": "string" }
205 },
206 "additionalProperties": false
207 },
208 "definition": {
209 "type": "object",
210 "properties": {
211 "name": { "type": "string" }
212 },
213 "required": ["name"],
214 "additionalProperties": false
215 },
216 "status": {
217 "type": "string",
218 "enum": ["running", "completed", "failed", "cancelled"]
219 }
220 },
221 "required": ["__handle__", "id", "process_id", "descriptor", "status"],
222 "additionalProperties": false
223 }
224 })
225}
226
227pub async fn execute_process_cancel_tool_call(
228 context: &lash_core::ToolContext<'_>,
229 args: &Value,
230) -> ToolResult {
231 let Some(id) = args
232 .get("process_id")
233 .and_then(|value| value.as_str())
234 .map(str::trim)
235 .filter(|value| !value.is_empty())
236 else {
237 return ToolResult::err_fmt("cancel_process requires `process_id`");
238 };
239 let processes = context.processes();
240 match processes.cancel(id).await {
241 Ok(summary) => ToolResult::ok(serde_json::json!(summary)),
242 Err(err) => ToolResult::err_fmt(err.to_string()),
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use std::sync::Mutex;
250
251 #[derive(Default)]
252 struct DenyCancelAbility {
253 calls: Mutex<Vec<(lash_core::ProcessCancelSource, String)>>,
254 }
255
256 impl DenyCancelAbility {
257 fn calls(&self) -> Vec<(lash_core::ProcessCancelSource, String)> {
258 self.calls.lock().expect("cancel calls").clone()
259 }
260 }
261
262 #[async_trait::async_trait]
263 impl lash_core::ProcessCancelAbility for DenyCancelAbility {
264 async fn cancel(
265 &self,
266 _processes: &dyn lash_core::ProcessService,
267 request: lash_core::ProcessCancelRequest<'_>,
268 ) -> Result<lash_core::ProcessRecord, PluginError> {
269 self.calls
270 .lock()
271 .expect("cancel calls")
272 .push((request.source, request.process_id.to_string()));
273 Err(PluginError::Session("denied by host".to_string()))
274 }
275 }
276
277 fn context_with_cancel_ability(
278 ability: Arc<dyn lash_core::ProcessCancelAbility>,
279 ) -> lash_core::ToolContext<'static> {
280 let manager = Arc::new(lash_core::testing::MockSessionManager::default());
281 lash_core::ToolContext::__for_testing_with_process_cancel_ability(
282 "session".to_string(),
283 manager.clone(),
284 manager.clone(),
285 manager,
286 Arc::new(lash_core::UnavailableProcessService),
287 ability,
288 Arc::new(lash_core::InMemoryAttachmentStore::new()),
289 lash_core::DirectCompletionClient::from_fn(|_, _| {
290 Err(PluginError::Session(
291 "direct completions are unavailable in this test context".to_string(),
292 ))
293 }),
294 None,
295 )
296 }
297
298 #[test]
299 fn tool_definitions_expose_process_control_tools() {
300 let names = process_control_tool_definitions(true)
301 .into_iter()
302 .map(|tool| tool.name().to_string())
303 .collect::<Vec<_>>();
304
305 assert_eq!(names, vec!["list_process_handles", "cancel_process"]);
306 }
307
308 #[test]
309 fn cancel_process_definition_is_callable_when_registered() {
310 let definition = process_cancel_tool_definition();
311 assert_eq!(
312 definition.effective_availability(),
313 lash_core::ToolAvailability::Callable
314 );
315 let rendered = definition.compact_contract().render_signature();
316 assert!(rendered.contains("status: enum["), "{rendered}");
317 assert!(!rendered.contains("terminal:"), "{rendered}");
318 }
319
320 #[test]
321 fn list_process_contract_returns_handle_array() {
322 let definition = process_list_tool_definition();
323
324 assert_eq!(
325 definition.contract.output_schema["type"],
326 serde_json::json!("array")
327 );
328 let rendered = definition.compact_contract().render_signature();
329 assert!(rendered.contains("-> list[record{"), "{rendered}");
330 assert!(rendered.contains("__handle__"), "{rendered}");
331 assert!(rendered.contains("process_id"), "{rendered}");
332 assert!(rendered.contains("definition"), "{rendered}");
333 assert!(rendered.contains("status: enum["), "{rendered}");
334 assert!(rendered.contains("status?: enum["), "{rendered}");
335 assert!(rendered.contains("definition?: record"), "{rendered}");
336 assert!(!rendered.contains("history"), "{rendered}");
337 assert!(!rendered.contains("terminal:"), "{rendered}");
338 }
339
340 #[test]
341 fn plugin_registers_cancel_when_configured_and_omits_it_otherwise() {
342 let standard_session =
343 lash_core::PluginHost::new(
344 std::iter::once(
345 Arc::new(ProcessControlsPluginFactory::new()) as Arc<dyn PluginFactory>
346 )
347 .chain(lash_core::testing::test_standard_protocol_factories())
348 .collect(),
349 )
350 .build_session("standard", None)
351 .expect("standard session");
352 let standard_names = standard_session
353 .tool_surface("standard")
354 .expect("standard surface")
355 .tool_names()
356 .as_ref()
357 .clone();
358
359 let rlm_session = lash_core::PluginHost::new(
360 std::iter::once(
361 Arc::new(ProcessControlsPluginFactory::without_cancel_process())
362 as Arc<dyn PluginFactory>,
363 )
364 .chain(lash_core::testing::test_rlm_protocol_factories())
365 .collect(),
366 )
367 .build_session("rlm", None)
368 .expect("rlm session");
369 let rlm_names = rlm_session
370 .tool_surface("rlm")
371 .expect("rlm surface")
372 .tool_names()
373 .as_ref()
374 .clone();
375
376 assert!(standard_names.contains(&"list_process_handles".to_string()));
377 assert!(standard_names.contains(&"cancel_process".to_string()));
378 assert!(rlm_names.contains(&"list_process_handles".to_string()));
379 assert!(!rlm_names.contains(&"cancel_process".to_string()));
380 }
381
382 #[tokio::test]
383 async fn cancel_process_tool_uses_host_cancel_ability() {
384 let ability = Arc::new(DenyCancelAbility::default());
385 let context = context_with_cancel_ability(ability.clone());
386
387 let result = execute_process_cancel_tool_call(
388 &context,
389 &serde_json::json!({ "process_id": "process-1" }),
390 )
391 .await;
392
393 assert!(!result.is_success());
394 assert_eq!(
395 result.value_for_projection(),
396 serde_json::json!("plugin session error: denied by host")
397 );
398 assert_eq!(
399 ability.calls(),
400 vec![(
401 lash_core::ProcessCancelSource::Tool,
402 "process-1".to_string()
403 )]
404 );
405 }
406}