lash_plugin_process_controls/
lib.rs1use std::sync::Arc;
8
9use serde_json::Value;
10
11use lash_core::plugin::{
12 PluginError, PluginFactory, PluginSessionContext, PluginSpec, SessionPlugin,
13 StaticPluginFactory,
14};
15use lash_core::{ToolCall, ToolDefinition, ToolProvider, ToolResult, ToolScheduling};
16use lash_tool_support::{
17 LashlangToolBinding, StaticToolExecute, StaticToolProvider, ToolDefinitionLashlangExt,
18};
19
20pub struct SessionProcessAdminPluginFactory {
26 inner: StaticPluginFactory,
27}
28
29impl SessionProcessAdminPluginFactory {
30 pub fn new() -> Self {
31 Self::with_cancel_process(true)
32 }
33
34 pub fn without_cancel_process() -> Self {
35 Self::with_cancel_process(false)
36 }
37
38 fn with_cancel_process(include_cancel_process: bool) -> Self {
39 let provider = StaticToolProvider::new(
40 processes_tool_definitions(include_cancel_process),
41 SessionProcessAdminTools {
42 include_cancel_process,
43 },
44 );
45 let spec =
46 PluginSpec::new().with_tool_provider(Arc::new(provider) as Arc<dyn ToolProvider>);
47 Self {
48 inner: StaticPluginFactory::new("processess", spec),
49 }
50 }
51}
52
53impl Default for SessionProcessAdminPluginFactory {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl PluginFactory for SessionProcessAdminPluginFactory {
60 fn id(&self) -> &'static str {
61 self.inner.id()
62 }
63
64 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
65 self.inner.build(ctx)
66 }
67}
68
/// Executor behind the process administration tools.
struct SessionProcessAdminTools {
    /// When `false`, a `cancel_process` call is rejected as an unknown tool.
    include_cancel_process: bool,
}
72
73#[async_trait::async_trait]
74impl StaticToolExecute for SessionProcessAdminTools {
75 async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
76 match call.name {
77 "list_process_handles" => execute_process_list_tool_call(call.context, call.args).await,
78 "cancel_process" if self.include_cancel_process => {
79 execute_process_cancel_tool_call(call.context, call.args).await
80 }
81 _ => ToolResult::err_fmt(format_args!("Unknown tool: {}", call.name)),
82 }
83 }
84}
85
86pub fn process_list_tool_definition() -> ToolDefinition {
87 ToolDefinition::raw(
88 "tool:list_process_handles",
89 "list_process_handles",
90 "List process runs visible to this session, including `shell.start` runs, with process id, descriptor, optional definition name, and lifecycle status. Filters are optional; the default returns running runs.",
91 serde_json::json!({
92 "type": "object",
93 "properties": {
94 "status": {
95 "type": "string",
96 "enum": ["running", "completed", "failed", "cancelled", "any"],
97 "description": "Lifecycle status to list. The default is `running`; `any` includes historical runs."
98 },
99 "definition": {
100 "type": "object",
101 "description": "A Lashlang process definition value, for example `on_button`."
102 }
103 },
104 "additionalProperties": false
105 }),
106 process_list_output_schema(),
107 )
108 .with_examples(vec![
109 "await processes.list({})?".into(),
110 r#"await processes.list({ status: "any" })?"#.into(),
111 "await processes.list({ definition: on_button })?".into(),
112 ])
113 .with_lashlang_binding(LashlangToolBinding::new(["processes"], "list"))
114 .with_scheduling(ToolScheduling::Parallel)
115}
116
117fn processes_tool_definitions(include_cancel_process: bool) -> Vec<ToolDefinition> {
118 let mut definitions = vec![process_list_tool_definition()];
119 if include_cancel_process {
120 definitions.push(process_cancel_tool_definition());
121 }
122 definitions
123}
124
125pub fn process_cancel_tool_definition() -> ToolDefinition {
126 ToolDefinition::raw(
127 "tool:cancel_process",
128 "cancel_process",
129 "Request cancellation for a durable process, including a running `shell.start` process, by `process_id`.",
130 serde_json::json!({
131 "type": "object",
132 "properties": {
133 "process_id": {
134 "type": "string",
135 "description": "Process id returned by a process handle or `processes.list(...)`."
136 }
137 },
138 "required": ["process_id"],
139 "additionalProperties": false
140 }),
141 serde_json::json!({
142 "type": "object",
143 "properties": {
144 "process_id": { "type": "string" },
145 "status": {
146 "type": "string",
147 "enum": ["running", "completed", "failed", "cancelled"]
148 }
149 },
150 "required": ["process_id", "status"],
151 "additionalProperties": false
152 }),
153 )
154 .with_examples(vec![
155 r#"await processes.cancel({ process_id: "tool:call-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
156 r#"await processes.cancel({ process_id: "subagent:session-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
157 ])
158 .with_lashlang_binding(LashlangToolBinding::new(["processes"], "cancel"))
159 .with_scheduling(ToolScheduling::Parallel)
160}
161
162pub async fn execute_process_list_tool_call(
163 context: &lash_core::ToolContext<'_>,
164 args: &Value,
165) -> ToolResult {
166 let filter = match lash_core::ProcessListFilter::decode(args) {
167 Ok(filter) => filter,
168 Err(err) => return ToolResult::err_fmt(err),
169 };
170 let processes = context.processes();
171 let result = processes.list_handles_filtered(&filter).await;
172 match result {
173 Ok(entries) => ToolResult::ok(serde_json::json!(entries)),
174 Err(err) => ToolResult::err_fmt(err.to_string()),
175 }
176}
177
178fn process_list_output_schema() -> Value {
179 serde_json::json!({
180 "type": "array",
181 "items": {
182 "type": "object",
183 "properties": {
184 "__handle__": {
185 "type": "string",
186 "enum": ["process"],
187 "description": "Handle marker; pass the whole record where a process handle is needed."
188 },
189 "id": {
190 "type": "string",
191 "description": "Process handle id."
192 },
193 "process_id": {
194 "type": "string",
195 "description": "Same process id, repeated for tools that ask for process_id."
196 },
197 "descriptor": {
198 "type": "object",
199 "properties": {
200 "kind": { "type": "string" },
201 "label": { "type": "string" }
202 },
203 "additionalProperties": false
204 },
205 "definition": {
206 "type": "object",
207 "properties": {
208 "name": { "type": "string" }
209 },
210 "required": ["name"],
211 "additionalProperties": false
212 },
213 "status": {
214 "type": "string",
215 "enum": ["running", "completed", "failed", "cancelled"]
216 }
217 },
218 "required": ["__handle__", "id", "process_id", "descriptor", "status"],
219 "additionalProperties": false
220 }
221 })
222}
223
224pub async fn execute_process_cancel_tool_call(
225 context: &lash_core::ToolContext<'_>,
226 args: &Value,
227) -> ToolResult {
228 let Some(id) = args
229 .get("process_id")
230 .and_then(|value| value.as_str())
231 .map(str::trim)
232 .filter(|value| !value.is_empty())
233 else {
234 return ToolResult::err_fmt("cancel_process requires `process_id`");
235 };
236 let processes = context.processes();
237 match processes.cancel(id).await {
238 Ok(summary) => ToolResult::ok(serde_json::json!(summary)),
239 Err(err) => ToolResult::err_fmt(err.to_string()),
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use std::sync::Mutex;
247
248 #[derive(Default)]
249 struct DenyCancelAbility {
250 calls: Mutex<Vec<(lash_core::ProcessCancelSource, String)>>,
251 }
252
253 impl DenyCancelAbility {
254 fn calls(&self) -> Vec<(lash_core::ProcessCancelSource, String)> {
255 self.calls.lock().expect("cancel calls").clone()
256 }
257 }
258
259 #[async_trait::async_trait]
260 impl lash_core::ProcessCancelAbility for DenyCancelAbility {
261 async fn cancel(
262 &self,
263 _processes: &dyn lash_core::ProcessService,
264 request: lash_core::ProcessCancelRequest<'_>,
265 ) -> Result<lash_core::ProcessRecord, PluginError> {
266 self.calls
267 .lock()
268 .expect("cancel calls")
269 .push((request.source, request.process_id.to_string()));
270 Err(PluginError::Session("denied by host".to_string()))
271 }
272 }
273
274 fn context_with_cancel_ability(
275 ability: Arc<dyn lash_core::ProcessCancelAbility>,
276 ) -> lash_core::ToolContext<'static> {
277 let manager = Arc::new(lash_core::testing::MockSessionManager::default());
278 lash_core::ToolContext::__for_testing_with_process_cancel_ability(
279 "session".to_string(),
280 manager.clone(),
281 manager.clone(),
282 manager,
283 Arc::new(lash_core::UnavailableProcessService),
284 ability,
285 Arc::new(lash_core::InMemoryAttachmentStore::new()),
286 lash_core::DirectCompletionClient::from_fn(|_, _| {
287 Err(PluginError::Session(
288 "direct completions are unavailable in this test context".to_string(),
289 ))
290 }),
291 None,
292 )
293 }
294
295 #[test]
296 fn tool_definitions_expose_processes_tools() {
297 let definitions = processes_tool_definitions(true);
298 let names = definitions
299 .iter()
300 .map(|tool| tool.name().to_string())
301 .collect::<Vec<_>>();
302
303 assert_eq!(names, vec!["list_process_handles", "cancel_process"]);
304 #[cfg(not(feature = "lashlang"))]
305 assert!(
306 definitions
307 .iter()
308 .all(|tool| tool.manifest.bindings.is_empty())
309 );
310 #[cfg(feature = "lashlang")]
311 assert!(definitions.iter().all(|tool| {
312 tool.manifest
313 .bindings
314 .contains_key(lash_lashlang_runtime::LASHLANG_TOOL_BINDING_KEY)
315 }));
316 }
317
318 #[test]
319 fn cancel_process_definition_renders_contract() {
320 let definition = process_cancel_tool_definition();
321 let rendered = definition.compact_contract().render_signature();
322 assert!(rendered.contains("status: enum["), "{rendered}");
323 assert!(!rendered.contains("terminal:"), "{rendered}");
324 }
325
326 #[test]
327 fn list_process_contract_returns_handle_array() {
328 let definition = process_list_tool_definition();
329
330 assert_eq!(
331 definition.contract.output_schema["type"],
332 serde_json::json!("array")
333 );
334 let rendered = definition.compact_contract().render_signature();
335 assert!(rendered.contains("-> list[record{"), "{rendered}");
336 assert!(rendered.contains("__handle__"), "{rendered}");
337 assert!(rendered.contains("process_id"), "{rendered}");
338 assert!(rendered.contains("definition"), "{rendered}");
339 assert!(rendered.contains("status: enum["), "{rendered}");
340 assert!(rendered.contains("status?: enum["), "{rendered}");
341 assert!(rendered.contains("definition?: record"), "{rendered}");
342 assert!(!rendered.contains("history"), "{rendered}");
343 assert!(!rendered.contains("terminal:"), "{rendered}");
344 }
345
346 #[test]
347 fn plugin_registers_cancel_when_configured_and_omits_it_otherwise() {
348 let standard_session = lash_core::PluginHost::new(
349 std::iter::once(
350 Arc::new(SessionProcessAdminPluginFactory::new()) as Arc<dyn PluginFactory>
351 )
352 .chain(lash_core::testing::test_standard_protocol_factories())
353 .collect(),
354 )
355 .build_session("standard", None)
356 .expect("standard session");
357 let standard_names = standard_session
358 .resolved_tool_catalog("standard")
359 .expect("standard tool catalog")
360 .tool_names()
361 .as_ref()
362 .clone();
363
364 let rlm_session = lash_core::PluginHost::new(
365 std::iter::once(
366 Arc::new(SessionProcessAdminPluginFactory::without_cancel_process())
367 as Arc<dyn PluginFactory>,
368 )
369 .chain(lash_core::testing::test_code_protocol_factories())
370 .collect(),
371 )
372 .build_session("rlm", None)
373 .expect("rlm session");
374 let rlm_names = rlm_session
375 .resolved_tool_catalog("rlm")
376 .expect("rlm tool catalog")
377 .tool_names()
378 .as_ref()
379 .clone();
380
381 assert!(standard_names.contains(&"list_process_handles".to_string()));
382 assert!(standard_names.contains(&"cancel_process".to_string()));
383 assert!(rlm_names.contains(&"list_process_handles".to_string()));
384 assert!(!rlm_names.contains(&"cancel_process".to_string()));
385 }
386
387 #[tokio::test]
388 async fn cancel_process_tool_uses_host_cancel_ability() {
389 let ability = Arc::new(DenyCancelAbility::default());
390 let context = context_with_cancel_ability(ability.clone());
391
392 let result = execute_process_cancel_tool_call(
393 &context,
394 &serde_json::json!({ "process_id": "process-1" }),
395 )
396 .await;
397
398 assert!(!result.is_success());
399 assert_eq!(
400 result.value_for_projection(),
401 serde_json::json!("plugin session error: denied by host")
402 );
403 assert_eq!(
404 ability.calls(),
405 vec![(
406 lash_core::ProcessCancelSource::Tool,
407 "process-1".to_string()
408 )]
409 );
410 }
411}