Skip to main content

lash_plugin_process_controls/
lib.rs

1//! Protocol-stack runtime-control tools (`processes.list`,
2//! `processes.cancel`).
3//!
4//! Dedicated plugins register these tools into the normal tool-provider
5//! surface, so protocol crates do not own or duplicate runtime control behavior.
6
7use std::sync::Arc;
8
9use serde_json::Value;
10
11use lash_core::plugin::{
12    PluginError, PluginFactory, PluginSessionContext, PluginSpec, SessionPlugin,
13    StaticPluginFactory,
14};
15use lash_core::{ToolCall, ToolDefinition, ToolProvider, ToolResult, ToolScheduling};
16use lash_tool_support::{
17    LashlangToolBinding, StaticToolExecute, StaticToolProvider, ToolDefinitionLashlangExt,
18};
19
20/// Plugin factory for process-control tools.
21///
22/// Declares its provider through a [`PluginSpec`] driven by
23/// [`StaticPluginFactory`], so it does not hand-roll the `SessionPlugin` +
24/// `register` ceremony.
25pub struct SessionProcessAdminPluginFactory {
26    inner: StaticPluginFactory,
27}
28
29impl SessionProcessAdminPluginFactory {
30    pub fn new() -> Self {
31        Self::with_cancel_process(true)
32    }
33
34    pub fn without_cancel_process() -> Self {
35        Self::with_cancel_process(false)
36    }
37
38    fn with_cancel_process(include_cancel_process: bool) -> Self {
39        let provider = StaticToolProvider::new(
40            processes_tool_definitions(include_cancel_process),
41            SessionProcessAdminTools {
42                include_cancel_process,
43            },
44        );
45        let spec =
46            PluginSpec::new().with_tool_provider(Arc::new(provider) as Arc<dyn ToolProvider>);
47        Self {
48            inner: StaticPluginFactory::new("processess", spec),
49        }
50    }
51}
52
53impl Default for SessionProcessAdminPluginFactory {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl PluginFactory for SessionProcessAdminPluginFactory {
60    fn id(&self) -> &'static str {
61        self.inner.id()
62    }
63
64    fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
65        self.inner.build(ctx)
66    }
67}
68
/// Executor behind the process-control tool provider.
struct SessionProcessAdminTools {
    /// When `false`, `cancel_process` calls are rejected as an unknown tool.
    include_cancel_process: bool,
}
72
73#[async_trait::async_trait]
74impl StaticToolExecute for SessionProcessAdminTools {
75    async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
76        match call.name {
77            "list_process_handles" => execute_process_list_tool_call(call.context, call.args).await,
78            "cancel_process" if self.include_cancel_process => {
79                execute_process_cancel_tool_call(call.context, call.args).await
80            }
81            _ => ToolResult::err_fmt(format_args!("Unknown tool: {}", call.name)),
82        }
83    }
84}
85
86pub fn process_list_tool_definition() -> ToolDefinition {
87    ToolDefinition::raw(
88        "tool:list_process_handles",
89        "list_process_handles",
90        "List process runs visible to this session, including `shell.start` runs, with process id, descriptor, optional definition name, and lifecycle status. Filters are optional; the default returns running runs.",
91        serde_json::json!({
92            "type": "object",
93            "properties": {
94                "status": {
95                    "type": "string",
96                    "enum": ["running", "completed", "failed", "cancelled", "any"],
97                    "description": "Lifecycle status to list. The default is `running`; `any` includes historical runs."
98                },
99                "definition": {
100                    "type": "object",
101                    "description": "A Lashlang process definition value, for example `on_button`."
102                }
103            },
104            "additionalProperties": false
105        }),
106        process_list_output_schema(),
107    )
108    .with_examples(vec![
109        "await processes.list({})?".into(),
110        r#"await processes.list({ status: "any" })?"#.into(),
111        "await processes.list({ definition: on_button })?".into(),
112    ])
113    .with_lashlang_binding(LashlangToolBinding::new(["processes"], "list"))
114    .with_scheduling(ToolScheduling::Parallel)
115}
116
117fn processes_tool_definitions(include_cancel_process: bool) -> Vec<ToolDefinition> {
118    let mut definitions = vec![process_list_tool_definition()];
119    if include_cancel_process {
120        definitions.push(process_cancel_tool_definition());
121    }
122    definitions
123}
124
125pub fn process_cancel_tool_definition() -> ToolDefinition {
126    ToolDefinition::raw(
127        "tool:cancel_process",
128        "cancel_process",
129        "Request cancellation for a durable process, including a running `shell.start` process, by `process_id`.",
130        serde_json::json!({
131            "type": "object",
132            "properties": {
133                "process_id": {
134                    "type": "string",
135                    "description": "Process id returned by a process handle or `processes.list(...)`."
136                }
137            },
138            "required": ["process_id"],
139            "additionalProperties": false
140        }),
141        serde_json::json!({
142            "type": "object",
143            "properties": {
144                "process_id": { "type": "string" },
145                "status": {
146                    "type": "string",
147                    "enum": ["running", "completed", "failed", "cancelled"]
148                }
149            },
150            "required": ["process_id", "status"],
151            "additionalProperties": false
152        }),
153    )
154    .with_examples(vec![
155        r#"await processes.cancel({ process_id: "tool:call-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
156        r#"await processes.cancel({ process_id: "subagent:session-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
157    ])
158    .with_lashlang_binding(LashlangToolBinding::new(["processes"], "cancel"))
159    .with_scheduling(ToolScheduling::Parallel)
160}
161
162pub async fn execute_process_list_tool_call(
163    context: &lash_core::ToolContext<'_>,
164    args: &Value,
165) -> ToolResult {
166    let filter = match lash_core::ProcessListFilter::decode(args) {
167        Ok(filter) => filter,
168        Err(err) => return ToolResult::err_fmt(err),
169    };
170    let processes = context.processes();
171    let result = processes.list_handles_filtered(&filter).await;
172    match result {
173        Ok(entries) => ToolResult::ok(serde_json::json!(entries)),
174        Err(err) => ToolResult::err_fmt(err.to_string()),
175    }
176}
177
178fn process_list_output_schema() -> Value {
179    serde_json::json!({
180        "type": "array",
181        "items": {
182            "type": "object",
183            "properties": {
184                "__handle__": {
185                    "type": "string",
186                    "enum": ["process"],
187                    "description": "Handle marker; pass the whole record where a process handle is needed."
188                },
189                "id": {
190                    "type": "string",
191                    "description": "Process handle id."
192                },
193                "process_id": {
194                    "type": "string",
195                    "description": "Same process id, repeated for tools that ask for process_id."
196                },
197                "descriptor": {
198                    "type": "object",
199                    "properties": {
200                        "kind": { "type": "string" },
201                        "label": { "type": "string" }
202                    },
203                    "additionalProperties": false
204                },
205                "definition": {
206                    "type": "object",
207                    "properties": {
208                        "name": { "type": "string" }
209                    },
210                    "required": ["name"],
211                    "additionalProperties": false
212                },
213                "status": {
214                    "type": "string",
215                    "enum": ["running", "completed", "failed", "cancelled"]
216                }
217            },
218            "required": ["__handle__", "id", "process_id", "descriptor", "status"],
219            "additionalProperties": false
220        }
221    })
222}
223
224pub async fn execute_process_cancel_tool_call(
225    context: &lash_core::ToolContext<'_>,
226    args: &Value,
227) -> ToolResult {
228    let Some(id) = args
229        .get("process_id")
230        .and_then(|value| value.as_str())
231        .map(str::trim)
232        .filter(|value| !value.is_empty())
233    else {
234        return ToolResult::err_fmt("cancel_process requires `process_id`");
235    };
236    let processes = context.processes();
237    match processes.cancel(id).await {
238        Ok(summary) => ToolResult::ok(serde_json::json!(summary)),
239        Err(err) => ToolResult::err_fmt(err.to_string()),
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// One recorded cancellation attempt: where it came from and which
    /// process id it targeted.
    type CancelCall = (lash_core::ProcessCancelSource, String);

    /// Cancel-ability double that records every request and then refuses it.
    #[derive(Default)]
    struct DenyCancelAbility {
        calls: Mutex<Vec<CancelCall>>,
    }

    impl DenyCancelAbility {
        /// Snapshot of the attempts recorded so far.
        fn calls(&self) -> Vec<CancelCall> {
            let recorded = self.calls.lock().expect("cancel calls");
            recorded.clone()
        }
    }

    #[async_trait::async_trait]
    impl lash_core::ProcessCancelAbility for DenyCancelAbility {
        async fn cancel(
            &self,
            _processes: &dyn lash_core::ProcessService,
            request: lash_core::ProcessCancelRequest<'_>,
        ) -> Result<lash_core::ProcessRecord, PluginError> {
            let attempt = (request.source, request.process_id.to_string());
            self.calls.lock().expect("cancel calls").push(attempt);
            Err(PluginError::Session("denied by host".to_string()))
        }
    }

    /// Builds a tool context whose cancellations are routed through
    /// `ability`; the remaining collaborators are test stand-ins.
    fn context_with_cancel_ability(
        ability: Arc<dyn lash_core::ProcessCancelAbility>,
    ) -> lash_core::ToolContext<'static> {
        let sessions = Arc::new(lash_core::testing::MockSessionManager::default());
        let attachments = Arc::new(lash_core::InMemoryAttachmentStore::new());
        let completions = lash_core::DirectCompletionClient::from_fn(|_, _| {
            Err(PluginError::Session(
                "direct completions are unavailable in this test context".to_string(),
            ))
        });
        lash_core::ToolContext::__for_testing_with_process_cancel_ability(
            "session".to_string(),
            sessions.clone(),
            sessions.clone(),
            sessions,
            Arc::new(lash_core::UnavailableProcessService),
            ability,
            attachments,
            completions,
            None,
        )
    }

    /// Both tools are advertised, in order, with bindings matching the
    /// `lashlang` feature flag.
    #[test]
    fn tool_definitions_expose_processes_tools() {
        let definitions = processes_tool_definitions(true);
        let names: Vec<String> = definitions
            .iter()
            .map(|definition| definition.name().to_string())
            .collect();

        assert_eq!(names, vec!["list_process_handles", "cancel_process"]);
        #[cfg(not(feature = "lashlang"))]
        assert!(
            definitions
                .iter()
                .all(|definition| definition.manifest.bindings.is_empty())
        );
        #[cfg(feature = "lashlang")]
        assert!(definitions.iter().all(|definition| {
            definition
                .manifest
                .bindings
                .contains_key(lash_lashlang_runtime::LASHLANG_TOOL_BINDING_KEY)
        }));
    }

    /// The cancel contract renders a status enum and no `terminal` field.
    #[test]
    fn cancel_process_definition_renders_contract() {
        let signature = process_cancel_tool_definition()
            .compact_contract()
            .render_signature();
        assert!(signature.contains("status: enum["), "{signature}");
        assert!(!signature.contains("terminal:"), "{signature}");
    }

    /// The list contract returns an array of handle records and exposes the
    /// optional input filters.
    #[test]
    fn list_process_contract_returns_handle_array() {
        let definition = process_list_tool_definition();

        assert_eq!(
            definition.contract.output_schema["type"],
            serde_json::json!("array")
        );
        let signature = definition.compact_contract().render_signature();
        assert!(signature.contains("-> list[record{"), "{signature}");
        assert!(signature.contains("__handle__"), "{signature}");
        assert!(signature.contains("process_id"), "{signature}");
        assert!(signature.contains("definition"), "{signature}");
        assert!(signature.contains("status: enum["), "{signature}");
        assert!(signature.contains("status?: enum["), "{signature}");
        assert!(signature.contains("definition?: record"), "{signature}");
        assert!(!signature.contains("history"), "{signature}");
        assert!(!signature.contains("terminal:"), "{signature}");
    }

    /// The default factory registers both tools; the cancel-free factory
    /// registers only the list tool.
    #[test]
    fn plugin_registers_cancel_when_configured_and_omits_it_otherwise() {
        let with_cancel: Arc<dyn PluginFactory> =
            Arc::new(SessionProcessAdminPluginFactory::new());
        let standard_session = lash_core::PluginHost::new(
            std::iter::once(with_cancel)
                .chain(lash_core::testing::test_standard_protocol_factories())
                .collect(),
        )
        .build_session("standard", None)
        .expect("standard session");
        let standard_names = standard_session
            .resolved_tool_catalog("standard")
            .expect("standard tool catalog")
            .tool_names()
            .as_ref()
            .clone();

        let without_cancel: Arc<dyn PluginFactory> =
            Arc::new(SessionProcessAdminPluginFactory::without_cancel_process());
        let rlm_session = lash_core::PluginHost::new(
            std::iter::once(without_cancel)
                .chain(lash_core::testing::test_code_protocol_factories())
                .collect(),
        )
        .build_session("rlm", None)
        .expect("rlm session");
        let rlm_names = rlm_session
            .resolved_tool_catalog("rlm")
            .expect("rlm tool catalog")
            .tool_names()
            .as_ref()
            .clone();

        assert!(standard_names.contains(&"list_process_handles".to_string()));
        assert!(standard_names.contains(&"cancel_process".to_string()));
        assert!(rlm_names.contains(&"list_process_handles".to_string()));
        assert!(!rlm_names.contains(&"cancel_process".to_string()));
    }

    /// Cancellation goes through the host-provided ability, and its refusal
    /// surfaces as a failed tool result.
    #[tokio::test]
    async fn cancel_process_tool_uses_host_cancel_ability() {
        let ability = Arc::new(DenyCancelAbility::default());
        let context = context_with_cancel_ability(ability.clone());
        let args = serde_json::json!({ "process_id": "process-1" });

        let result = execute_process_cancel_tool_call(&context, &args).await;

        assert!(!result.is_success());
        assert_eq!(
            result.value_for_projection(),
            serde_json::json!("plugin session error: denied by host")
        );
        assert_eq!(
            ability.calls(),
            vec![(
                lash_core::ProcessCancelSource::Tool,
                "process-1".to_string()
            )]
        );
    }
}
411}