Skip to main content

lash_plugin_process_controls/
lib.rs

1//! Protocol-stack runtime-control tools (`processes.list`,
2//! `processes.cancel`).
3//!
4//! Dedicated plugins register these tools into the normal tool-provider
5//! surface, so protocol crates do not own or duplicate runtime control behavior.
6
7use std::sync::Arc;
8
9use serde_json::Value;
10
11use lash_core::plugin::{
12    PluginError, PluginFactory, PluginSessionContext, PluginSpec, SessionPlugin,
13    StaticPluginFactory,
14};
15use lash_core::{
16    ToolAvailabilityConfig, ToolCall, ToolDefinition, ToolProvider, ToolResult, ToolScheduling,
17};
18use lash_tool_support::{
19    LashlangToolBinding, StaticToolExecute, StaticToolProvider, ToolDefinitionLashlangExt,
20};
21
22/// Plugin factory for process-control tools.
23///
24/// Declares its provider through a [`PluginSpec`] driven by
25/// [`StaticPluginFactory`], so it does not hand-roll the `SessionPlugin` +
26/// `register` ceremony.
27pub struct SessionProcessAdminPluginFactory {
28    inner: StaticPluginFactory,
29}
30
31impl SessionProcessAdminPluginFactory {
32    pub fn new() -> Self {
33        Self::with_cancel_process(true)
34    }
35
36    pub fn without_cancel_process() -> Self {
37        Self::with_cancel_process(false)
38    }
39
40    fn with_cancel_process(include_cancel_process: bool) -> Self {
41        let provider = StaticToolProvider::new(
42            processes_tool_definitions(include_cancel_process),
43            SessionProcessAdminTools {
44                include_cancel_process,
45            },
46        );
47        let spec =
48            PluginSpec::new().with_tool_provider(Arc::new(provider) as Arc<dyn ToolProvider>);
49        Self {
50            inner: StaticPluginFactory::new("processess", spec),
51        }
52    }
53}
54
55impl Default for SessionProcessAdminPluginFactory {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl PluginFactory for SessionProcessAdminPluginFactory {
62    fn id(&self) -> &'static str {
63        self.inner.id()
64    }
65
66    fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
67        self.inner.build(ctx)
68    }
69}
70
/// Executor behind the static tool provider for the process-control tools.
struct SessionProcessAdminTools {
    /// When `false`, `cancel_process` calls are rejected as an unknown tool.
    include_cancel_process: bool,
}
74
75#[async_trait::async_trait]
76impl StaticToolExecute for SessionProcessAdminTools {
77    async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
78        match call.name {
79            "list_process_handles" => execute_process_list_tool_call(call.context, call.args).await,
80            "cancel_process" if self.include_cancel_process => {
81                execute_process_cancel_tool_call(call.context, call.args).await
82            }
83            _ => ToolResult::err_fmt(format_args!("Unknown tool: {}", call.name)),
84        }
85    }
86}
87
88pub fn process_list_tool_definition() -> ToolDefinition {
89    ToolDefinition::raw(
90        "tool:list_process_handles",
91        "list_process_handles",
92        "List process runs visible to this session, including `shell.start` runs, with process id, descriptor, optional definition name, and lifecycle status. Filters are optional; the default returns running runs.",
93        serde_json::json!({
94            "type": "object",
95            "properties": {
96                "status": {
97                    "type": "string",
98                    "enum": ["running", "completed", "failed", "cancelled", "any"],
99                    "description": "Lifecycle status to list. The default is `running`; `any` includes historical runs."
100                },
101                "definition": {
102                    "type": "object",
103                    "description": "A Lashlang process definition value, for example `on_button`."
104                }
105            },
106            "additionalProperties": false
107        }),
108        process_list_output_schema(),
109    )
110    .with_examples(vec![
111        "await processes.list({})?".into(),
112        r#"await processes.list({ status: "any" })?"#.into(),
113        "await processes.list({ definition: on_button })?".into(),
114    ])
115    .with_lashlang_binding(LashlangToolBinding::new(["processes"], "list"))
116    .with_availability(ToolAvailabilityConfig::callable())
117    .with_scheduling(ToolScheduling::Parallel)
118}
119
120fn processes_tool_definitions(include_cancel_process: bool) -> Vec<ToolDefinition> {
121    let mut definitions = vec![process_list_tool_definition()];
122    if include_cancel_process {
123        definitions.push(process_cancel_tool_definition());
124    }
125    definitions
126}
127
128pub fn process_cancel_tool_definition() -> ToolDefinition {
129    ToolDefinition::raw(
130        "tool:cancel_process",
131        "cancel_process",
132        "Request cancellation for a durable process, including a running `shell.start` process, by `process_id`.",
133        serde_json::json!({
134            "type": "object",
135            "properties": {
136                "process_id": {
137                    "type": "string",
138                    "description": "Process id returned by a process handle or `processes.list(...)`."
139                }
140            },
141            "required": ["process_id"],
142            "additionalProperties": false
143        }),
144        serde_json::json!({
145            "type": "object",
146            "properties": {
147                "process_id": { "type": "string" },
148                "status": {
149                    "type": "string",
150                    "enum": ["running", "completed", "failed", "cancelled"]
151                }
152            },
153            "required": ["process_id", "status"],
154            "additionalProperties": false
155        }),
156    )
157    .with_examples(vec![
158        r#"await processes.cancel({ process_id: "tool:call-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
159        r#"await processes.cancel({ process_id: "subagent:session-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
160    ])
161    .with_lashlang_binding(LashlangToolBinding::new(["processes"], "cancel"))
162    .with_availability(ToolAvailabilityConfig::callable())
163    .with_scheduling(ToolScheduling::Parallel)
164}
165
166pub async fn execute_process_list_tool_call(
167    context: &lash_core::ToolContext<'_>,
168    args: &Value,
169) -> ToolResult {
170    let filter = match lash_core::ProcessListFilter::decode(args) {
171        Ok(filter) => filter,
172        Err(err) => return ToolResult::err_fmt(err),
173    };
174    let processes = context.processes();
175    let result = processes.list_handles_filtered(&filter).await;
176    match result {
177        Ok(entries) => ToolResult::ok(serde_json::json!(entries)),
178        Err(err) => ToolResult::err_fmt(err.to_string()),
179    }
180}
181
182fn process_list_output_schema() -> Value {
183    serde_json::json!({
184        "type": "array",
185        "items": {
186            "type": "object",
187            "properties": {
188                "__handle__": {
189                    "type": "string",
190                    "enum": ["process"],
191                    "description": "Handle marker; pass the whole record where a process handle is needed."
192                },
193                "id": {
194                    "type": "string",
195                    "description": "Process handle id."
196                },
197                "process_id": {
198                    "type": "string",
199                    "description": "Same process id, repeated for tools that ask for process_id."
200                },
201                "descriptor": {
202                    "type": "object",
203                    "properties": {
204                        "kind": { "type": "string" },
205                        "label": { "type": "string" }
206                    },
207                    "additionalProperties": false
208                },
209                "definition": {
210                    "type": "object",
211                    "properties": {
212                        "name": { "type": "string" }
213                    },
214                    "required": ["name"],
215                    "additionalProperties": false
216                },
217                "status": {
218                    "type": "string",
219                    "enum": ["running", "completed", "failed", "cancelled"]
220                }
221            },
222            "required": ["__handle__", "id", "process_id", "descriptor", "status"],
223            "additionalProperties": false
224        }
225    })
226}
227
228pub async fn execute_process_cancel_tool_call(
229    context: &lash_core::ToolContext<'_>,
230    args: &Value,
231) -> ToolResult {
232    let Some(id) = args
233        .get("process_id")
234        .and_then(|value| value.as_str())
235        .map(str::trim)
236        .filter(|value| !value.is_empty())
237    else {
238        return ToolResult::err_fmt("cancel_process requires `process_id`");
239    };
240    let processes = context.processes();
241    match processes.cancel(id).await {
242        Ok(summary) => ToolResult::ok(serde_json::json!(summary)),
243        Err(err) => ToolResult::err_fmt(err.to_string()),
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Cancel ability that records every request it receives and always
    /// refuses it with a session error.
    #[derive(Default)]
    struct DenyCancelAbility {
        calls: Mutex<Vec<(lash_core::ProcessCancelSource, String)>>,
    }

    impl DenyCancelAbility {
        /// Snapshot of the `(source, process_id)` pairs seen so far.
        fn calls(&self) -> Vec<(lash_core::ProcessCancelSource, String)> {
            let recorded = self.calls.lock().expect("cancel calls");
            recorded.clone()
        }
    }

    #[async_trait::async_trait]
    impl lash_core::ProcessCancelAbility for DenyCancelAbility {
        async fn cancel(
            &self,
            _processes: &dyn lash_core::ProcessService,
            request: lash_core::ProcessCancelRequest<'_>,
        ) -> Result<lash_core::ProcessRecord, PluginError> {
            let entry = (request.source, request.process_id.to_string());
            self.calls.lock().expect("cancel calls").push(entry);
            Err(PluginError::Session("denied by host".to_string()))
        }
    }

    /// Builds a testing `ToolContext` whose cancel requests go to `ability`.
    fn context_with_cancel_ability(
        ability: Arc<dyn lash_core::ProcessCancelAbility>,
    ) -> lash_core::ToolContext<'static> {
        let manager = Arc::new(lash_core::testing::MockSessionManager::default());
        let completions = lash_core::DirectCompletionClient::from_fn(|_, _| {
            Err(PluginError::Session(
                "direct completions are unavailable in this test context".to_string(),
            ))
        });
        lash_core::ToolContext::__for_testing_with_process_cancel_ability(
            "session".to_string(),
            manager.clone(),
            manager.clone(),
            manager,
            Arc::new(lash_core::UnavailableProcessService),
            ability,
            Arc::new(lash_core::InMemoryAttachmentStore::new()),
            completions,
            None,
        )
    }

    #[test]
    fn tool_definitions_expose_processes_tools() {
        let definitions = processes_tool_definitions(true);
        let names = definitions
            .iter()
            .map(|tool| tool.name().to_string())
            .collect::<Vec<_>>();

        assert_eq!(names, vec!["list_process_handles", "cancel_process"]);

        // Bindings are only attached when the `lashlang` feature is compiled in.
        #[cfg(not(feature = "lashlang"))]
        assert!(
            definitions
                .iter()
                .all(|tool| tool.manifest.bindings.is_empty())
        );
        #[cfg(feature = "lashlang")]
        assert!(definitions.iter().all(|tool| {
            tool.manifest
                .bindings
                .contains_key(lash_lashlang_runtime::LASHLANG_TOOL_BINDING_KEY)
        }));
    }

    #[test]
    fn cancel_process_definition_is_callable_when_registered() {
        let definition = process_cancel_tool_definition();

        assert_eq!(
            definition.effective_availability(),
            lash_core::ToolAvailability::Callable
        );

        let rendered = definition.compact_contract().render_signature();
        assert!(rendered.contains("status: enum["), "{rendered}");
        assert!(!rendered.contains("terminal:"), "{rendered}");
    }

    #[test]
    fn list_process_contract_returns_handle_array() {
        let definition = process_list_tool_definition();

        assert_eq!(
            definition.contract.output_schema["type"],
            serde_json::json!("array")
        );

        let rendered = definition.compact_contract().render_signature();
        assert!(rendered.contains("-> list[record{"), "{rendered}");
        assert!(rendered.contains("__handle__"), "{rendered}");
        assert!(rendered.contains("process_id"), "{rendered}");
        assert!(rendered.contains("definition"), "{rendered}");
        assert!(rendered.contains("status: enum["), "{rendered}");
        assert!(rendered.contains("status?: enum["), "{rendered}");
        assert!(rendered.contains("definition?: record"), "{rendered}");
        assert!(!rendered.contains("history"), "{rendered}");
        assert!(!rendered.contains("terminal:"), "{rendered}");
    }

    #[test]
    fn plugin_registers_cancel_when_configured_and_omits_it_otherwise() {
        // Session built from the default factory, which includes cancel.
        let standard_host = lash_core::PluginHost::new(
            std::iter::once(
                Arc::new(SessionProcessAdminPluginFactory::new()) as Arc<dyn PluginFactory>
            )
            .chain(lash_core::testing::test_standard_protocol_factories())
            .collect(),
        );
        let standard_session = standard_host
            .build_session("standard", None)
            .expect("standard session");
        let standard_names = standard_session
            .resolved_tool_catalog("standard")
            .expect("standard tool catalog")
            .tool_names()
            .as_ref()
            .clone();

        // Session built from the factory variant that omits cancel.
        let rlm_host = lash_core::PluginHost::new(
            std::iter::once(
                Arc::new(SessionProcessAdminPluginFactory::without_cancel_process())
                    as Arc<dyn PluginFactory>,
            )
            .chain(lash_core::testing::test_code_protocol_factories())
            .collect(),
        );
        let rlm_session = rlm_host
            .build_session("rlm", None)
            .expect("rlm session");
        let rlm_names = rlm_session
            .resolved_tool_catalog("rlm")
            .expect("rlm tool catalog")
            .tool_names()
            .as_ref()
            .clone();

        assert!(standard_names.contains(&"list_process_handles".to_string()));
        assert!(standard_names.contains(&"cancel_process".to_string()));
        assert!(rlm_names.contains(&"list_process_handles".to_string()));
        assert!(!rlm_names.contains(&"cancel_process".to_string()));
    }

    #[tokio::test]
    async fn cancel_process_tool_uses_host_cancel_ability() {
        let ability = Arc::new(DenyCancelAbility::default());
        let context = context_with_cancel_ability(ability.clone());
        let args = serde_json::json!({ "process_id": "process-1" });

        let result = execute_process_cancel_tool_call(&context, &args).await;

        assert!(!result.is_success());
        assert_eq!(
            result.value_for_projection(),
            serde_json::json!("plugin session error: denied by host")
        );
        assert_eq!(
            ability.calls(),
            vec![(
                lash_core::ProcessCancelSource::Tool,
                "process-1".to_string()
            )]
        );
    }
}
419}