Skip to main content

lash_plugin_process_controls/
lib.rs

1//! Protocol-stack runtime-control tools (`processes.list`,
2//! `processes.cancel`).
3//!
4//! Dedicated plugins register these tools into the normal tool-provider
5//! surface, so protocol crates do not own or duplicate runtime control behavior.
6
7use std::sync::Arc;
8
9use serde_json::Value;
10
11use lash_core::plugin::{
12    PluginError, PluginFactory, PluginSessionContext, PluginSpec, SessionPlugin,
13    StaticPluginFactory,
14};
15use lash_core::{
16    ToolAgentSurface, ToolAvailabilityConfig, ToolCall, ToolDefinition, ToolProvider, ToolResult,
17    ToolScheduling,
18};
19use lash_tool_support::{StaticToolExecute, StaticToolProvider};
20
21/// Plugin factory for process-control tools.
22///
23/// Declares its provider through a [`PluginSpec`] driven by
24/// [`StaticPluginFactory`], so it does not hand-roll the `SessionPlugin` +
25/// `register` ceremony.
26pub struct ProcessControlsPluginFactory {
27    inner: StaticPluginFactory,
28}
29
30impl ProcessControlsPluginFactory {
31    pub fn new() -> Self {
32        Self::with_cancel_process(true)
33    }
34
35    pub fn without_cancel_process() -> Self {
36        Self::with_cancel_process(false)
37    }
38
39    fn with_cancel_process(include_cancel_process: bool) -> Self {
40        let provider = StaticToolProvider::new(
41            process_control_tool_definitions(include_cancel_process),
42            ProcessControlsTools {
43                include_cancel_process,
44            },
45        );
46        let spec =
47            PluginSpec::new().with_tool_provider(Arc::new(provider) as Arc<dyn ToolProvider>);
48        Self {
49            inner: StaticPluginFactory::new("process_controls", spec),
50        }
51    }
52}
53
54impl Default for ProcessControlsPluginFactory {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl PluginFactory for ProcessControlsPluginFactory {
61    fn id(&self) -> &'static str {
62        self.inner.id()
63    }
64
65    fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
66        self.inner.build(ctx)
67    }
68}
69
70struct ProcessControlsTools {
71    include_cancel_process: bool,
72}
73
74#[async_trait::async_trait]
75impl StaticToolExecute for ProcessControlsTools {
76    async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
77        match call.name {
78            "list_process_handles" => execute_process_list_tool_call(call.context, call.args).await,
79            "cancel_process" if self.include_cancel_process => {
80                execute_process_cancel_tool_call(call.context, call.args).await
81            }
82            _ => ToolResult::err_fmt(format_args!("Unknown tool: {}", call.name)),
83        }
84    }
85}
86
87pub fn process_list_tool_definition() -> ToolDefinition {
88    ToolDefinition::raw(
89        "tool:list_process_handles",
90        "list_process_handles",
91        "List process runs visible to this session, including `shell.start` runs, with process id, descriptor, optional definition name, and lifecycle status. Filters are optional; the default returns running runs.",
92        serde_json::json!({
93            "type": "object",
94            "properties": {
95                "status": {
96                    "type": "string",
97                    "enum": ["running", "completed", "failed", "cancelled", "any"],
98                    "description": "Lifecycle status to list. The default is `running`; `any` includes historical runs."
99                },
100                "definition": {
101                    "type": "object",
102                    "description": "A Lashlang process definition value, for example `on_button`."
103                }
104            },
105            "additionalProperties": false
106        }),
107        process_list_output_schema(),
108    )
109    .with_examples(vec![
110        "await processes.list({})?".into(),
111        r#"await processes.list({ status: "any" })?"#.into(),
112        "await processes.list({ definition: on_button })?".into(),
113    ])
114    .with_agent_surface(ToolAgentSurface::new(["processes"], "list"))
115    .with_availability(ToolAvailabilityConfig::callable())
116    .with_scheduling(ToolScheduling::Parallel)
117}
118
119fn process_control_tool_definitions(include_cancel_process: bool) -> Vec<ToolDefinition> {
120    let mut definitions = vec![process_list_tool_definition()];
121    if include_cancel_process {
122        definitions.push(process_cancel_tool_definition());
123    }
124    definitions
125}
126
127pub fn process_cancel_tool_definition() -> ToolDefinition {
128    ToolDefinition::raw(
129        "tool:cancel_process",
130        "cancel_process",
131        "Request cancellation for a durable process, including a running `shell.start` process, by `process_id`.",
132        serde_json::json!({
133            "type": "object",
134            "properties": {
135                "process_id": {
136                    "type": "string",
137                    "description": "Process id returned by a process handle or `processes.list(...)`."
138                }
139            },
140            "required": ["process_id"],
141            "additionalProperties": false
142        }),
143        serde_json::json!({
144            "type": "object",
145            "properties": {
146                "process_id": { "type": "string" },
147                "status": {
148                    "type": "string",
149                    "enum": ["running", "completed", "failed", "cancelled"]
150                }
151            },
152            "required": ["process_id", "status"],
153            "additionalProperties": false
154        }),
155    )
156    .with_examples(vec![
157        r#"await processes.cancel({ process_id: "tool:call-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
158        r#"await processes.cancel({ process_id: "subagent:session-01JZK7G4QP9Q4J7W3Q2E1H6M9C" })?"#.into(),
159    ])
160    .with_agent_surface(ToolAgentSurface::new(["processes"], "cancel"))
161    .with_availability(ToolAvailabilityConfig::callable())
162    .with_scheduling(ToolScheduling::Parallel)
163}
164
165pub async fn execute_process_list_tool_call(
166    context: &lash_core::ToolContext<'_>,
167    args: &Value,
168) -> ToolResult {
169    let filter = match lash_core::ProcessListFilter::decode(args) {
170        Ok(filter) => filter,
171        Err(err) => return ToolResult::err_fmt(err),
172    };
173    let processes = context.processes();
174    let result = processes.list_handles_filtered(&filter).await;
175    match result {
176        Ok(entries) => ToolResult::ok(serde_json::json!(entries)),
177        Err(err) => ToolResult::err_fmt(err.to_string()),
178    }
179}
180
181fn process_list_output_schema() -> Value {
182    serde_json::json!({
183        "type": "array",
184        "items": {
185            "type": "object",
186            "properties": {
187                "__handle__": {
188                    "type": "string",
189                    "enum": ["process"],
190                    "description": "Handle marker; pass the whole record where a process handle is needed."
191                },
192                "id": {
193                    "type": "string",
194                    "description": "Process handle id."
195                },
196                "process_id": {
197                    "type": "string",
198                    "description": "Same process id, repeated for tools that ask for process_id."
199                },
200                "descriptor": {
201                    "type": "object",
202                    "properties": {
203                        "kind": { "type": "string" },
204                        "label": { "type": "string" }
205                    },
206                    "additionalProperties": false
207                },
208                "definition": {
209                    "type": "object",
210                    "properties": {
211                        "name": { "type": "string" }
212                    },
213                    "required": ["name"],
214                    "additionalProperties": false
215                },
216                "status": {
217                    "type": "string",
218                    "enum": ["running", "completed", "failed", "cancelled"]
219                }
220            },
221            "required": ["__handle__", "id", "process_id", "descriptor", "status"],
222            "additionalProperties": false
223        }
224    })
225}
226
227pub async fn execute_process_cancel_tool_call(
228    context: &lash_core::ToolContext<'_>,
229    args: &Value,
230) -> ToolResult {
231    let Some(id) = args
232        .get("process_id")
233        .and_then(|value| value.as_str())
234        .map(str::trim)
235        .filter(|value| !value.is_empty())
236    else {
237        return ToolResult::err_fmt("cancel_process requires `process_id`");
238    };
239    let processes = context.processes();
240    match processes.cancel(id).await {
241        Ok(summary) => ToolResult::ok(serde_json::json!(summary)),
242        Err(err) => ToolResult::err_fmt(err.to_string()),
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use std::sync::Mutex;
250
251    #[derive(Default)]
252    struct DenyCancelAbility {
253        calls: Mutex<Vec<(lash_core::ProcessCancelSource, String)>>,
254    }
255
256    impl DenyCancelAbility {
257        fn calls(&self) -> Vec<(lash_core::ProcessCancelSource, String)> {
258            self.calls.lock().expect("cancel calls").clone()
259        }
260    }
261
262    #[async_trait::async_trait]
263    impl lash_core::ProcessCancelAbility for DenyCancelAbility {
264        async fn cancel(
265            &self,
266            _processes: &dyn lash_core::ProcessService,
267            request: lash_core::ProcessCancelRequest<'_>,
268        ) -> Result<lash_core::ProcessRecord, PluginError> {
269            self.calls
270                .lock()
271                .expect("cancel calls")
272                .push((request.source, request.process_id.to_string()));
273            Err(PluginError::Session("denied by host".to_string()))
274        }
275    }
276
277    fn context_with_cancel_ability(
278        ability: Arc<dyn lash_core::ProcessCancelAbility>,
279    ) -> lash_core::ToolContext<'static> {
280        let manager = Arc::new(lash_core::testing::MockSessionManager::default());
281        lash_core::ToolContext::__for_testing_with_process_cancel_ability(
282            "session".to_string(),
283            manager.clone(),
284            manager.clone(),
285            manager,
286            Arc::new(lash_core::UnavailableProcessService),
287            ability,
288            Arc::new(lash_core::InMemoryAttachmentStore::new()),
289            lash_core::DirectCompletionClient::from_fn(|_, _| {
290                Err(PluginError::Session(
291                    "direct completions are unavailable in this test context".to_string(),
292                ))
293            }),
294            None,
295        )
296    }
297
298    #[test]
299    fn tool_definitions_expose_process_control_tools() {
300        let names = process_control_tool_definitions(true)
301            .into_iter()
302            .map(|tool| tool.name().to_string())
303            .collect::<Vec<_>>();
304
305        assert_eq!(names, vec!["list_process_handles", "cancel_process"]);
306    }
307
308    #[test]
309    fn cancel_process_definition_is_callable_when_registered() {
310        let definition = process_cancel_tool_definition();
311        assert_eq!(
312            definition.effective_availability(),
313            lash_core::ToolAvailability::Callable
314        );
315        let rendered = definition.compact_contract().render_signature();
316        assert!(rendered.contains("status: enum["), "{rendered}");
317        assert!(!rendered.contains("terminal:"), "{rendered}");
318    }
319
320    #[test]
321    fn list_process_contract_returns_handle_array() {
322        let definition = process_list_tool_definition();
323
324        assert_eq!(
325            definition.contract.output_schema["type"],
326            serde_json::json!("array")
327        );
328        let rendered = definition.compact_contract().render_signature();
329        assert!(rendered.contains("-> list[record{"), "{rendered}");
330        assert!(rendered.contains("__handle__"), "{rendered}");
331        assert!(rendered.contains("process_id"), "{rendered}");
332        assert!(rendered.contains("definition"), "{rendered}");
333        assert!(rendered.contains("status: enum["), "{rendered}");
334        assert!(rendered.contains("status?: enum["), "{rendered}");
335        assert!(rendered.contains("definition?: record"), "{rendered}");
336        assert!(!rendered.contains("history"), "{rendered}");
337        assert!(!rendered.contains("terminal:"), "{rendered}");
338    }
339
340    #[test]
341    fn plugin_registers_cancel_when_configured_and_omits_it_otherwise() {
342        let standard_session =
343            lash_core::PluginHost::new(
344                std::iter::once(
345                    Arc::new(ProcessControlsPluginFactory::new()) as Arc<dyn PluginFactory>
346                )
347                .chain(lash_core::testing::test_standard_protocol_factories())
348                .collect(),
349            )
350            .build_session("standard", None)
351            .expect("standard session");
352        let standard_names = standard_session
353            .tool_surface("standard")
354            .expect("standard surface")
355            .tool_names()
356            .as_ref()
357            .clone();
358
359        let rlm_session = lash_core::PluginHost::new(
360            std::iter::once(
361                Arc::new(ProcessControlsPluginFactory::without_cancel_process())
362                    as Arc<dyn PluginFactory>,
363            )
364            .chain(lash_core::testing::test_rlm_protocol_factories())
365            .collect(),
366        )
367        .build_session("rlm", None)
368        .expect("rlm session");
369        let rlm_names = rlm_session
370            .tool_surface("rlm")
371            .expect("rlm surface")
372            .tool_names()
373            .as_ref()
374            .clone();
375
376        assert!(standard_names.contains(&"list_process_handles".to_string()));
377        assert!(standard_names.contains(&"cancel_process".to_string()));
378        assert!(rlm_names.contains(&"list_process_handles".to_string()));
379        assert!(!rlm_names.contains(&"cancel_process".to_string()));
380    }
381
382    #[tokio::test]
383    async fn cancel_process_tool_uses_host_cancel_ability() {
384        let ability = Arc::new(DenyCancelAbility::default());
385        let context = context_with_cancel_ability(ability.clone());
386
387        let result = execute_process_cancel_tool_call(
388            &context,
389            &serde_json::json!({ "process_id": "process-1" }),
390        )
391        .await;
392
393        assert!(!result.is_success());
394        assert_eq!(
395            result.value_for_projection(),
396            serde_json::json!("plugin session error: denied by host")
397        );
398        assert_eq!(
399            ability.calls(),
400            vec![(
401                lash_core::ProcessCancelSource::Tool,
402                "process-1".to_string()
403            )]
404        );
405    }
406}