Skip to main content

lash_core/session/
process_handles.rs

1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7    ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
10const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13    pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14        let _ = tool_name;
15        Self::process_handle_json(id)
16    }
17
18    pub fn process_handle_json(id: &str) -> serde_json::Value {
19        json!({
20            "__handle__": "process",
21            "id": id,
22        })
23    }
24
25    pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26        json!({
27            "process_id": status.id,
28            "status": status.status.label(),
29        })
30    }
31
32    pub(super) fn parse_process_handle(
33        handle: &serde_json::Value,
34    ) -> Result<(String, Option<String>), String> {
35        let kind = handle
36            .get("__handle__")
37            .and_then(|value| value.as_str())
38            .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39        if kind != PROCESS_HANDLE_KIND {
40            return Err(format!("Invalid process handle kind: {kind}"));
41        }
42        let id = handle
43            .get("id")
44            .and_then(|value| value.as_str())
45            .filter(|value| !value.is_empty())
46            .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47        let tool_name = handle
48            .get("tool")
49            .and_then(|value| value.as_str())
50            .map(str::to_string);
51        Ok((id.to_string(), tool_name))
52    }
53
54    pub(super) async fn start_tool_process(
55        &self,
56        call_id: String,
57        tool_name: String,
58        args: serde_json::Value,
59    ) -> ToolInvocationReply {
60        let handle_id = call_id.clone();
61        let pending_call = crate::sansio::PendingToolCall {
62            call_id: call_id.clone(),
63            tool_name: tool_name.clone(),
64            args: args.clone(),
65            replay: None,
66        };
67        let prepared_call = match self.prepare_tool_call(pending_call).await {
68            ToolPreparationOutcome::Prepared(prepared) => prepared,
69            ToolPreparationOutcome::Completed(outcome) => {
70                let mut record = outcome.record;
71                record.call_id = Some(call_id);
72                return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73            }
74        };
75        let registration = ProcessRegistration::session_start_draft(
76            handle_id.clone(),
77            ProcessInput::ToolCall {
78                call: prepared_call.clone(),
79            },
80        );
81        let registration = match self
82            .attach_captured_process_execution_env(registration)
83            .await
84        {
85            Ok(registration) => registration,
86            Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
87        };
88        if let Err(err) =
89            self.dispatch
90                .processes
91                .start(
92                    &self.session_id,
93                    registration,
94                    crate::ProcessStartOptions::new().with_descriptor(
95                        ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
96                    ),
97                    self.process_scope(self.parent_invocation.clone()),
98                )
99                .await
100        {
101            return ToolInvocationReply::error(json!(err.to_string()));
102        }
103
104        let handle_value = Self::process_handle_value(&handle_id, &tool_name);
105        let record = ToolCallRecord {
106            call_id: Some(call_id),
107            tool: prepared_call.tool_name,
108            args: prepared_call.args,
109            output: ToolCallOutput::success(handle_value.clone()),
110            duration_ms: 0,
111        };
112        ToolInvocationReply::success(handle_value).with_record(record)
113    }
114
115    fn recorded_process_reply(
116        call_id: String,
117        tool: impl Into<String>,
118        args: serde_json::Value,
119        output: ToolCallOutput,
120        started: std::time::Instant,
121    ) -> ToolInvocationReply {
122        let record = ToolCallRecord {
123            call_id: Some(call_id),
124            tool: tool.into(),
125            args,
126            output: output.clone(),
127            duration_ms: started.elapsed().as_millis() as u64,
128        };
129        ToolInvocationReply::from_output(output).with_record(record)
130    }
131
132    fn recorded_process_error(
133        call_id: String,
134        tool: &'static str,
135        args: serde_json::Value,
136        message: impl Into<String>,
137        started: std::time::Instant,
138    ) -> ToolInvocationReply {
139        let output = ToolInvocationReply::error(json!(message.into())).output;
140        Self::recorded_process_reply(call_id, tool, args, output, started)
141    }
142
143    pub(super) async fn await_process_handle(
144        &self,
145        call_id: String,
146        handle: serde_json::Value,
147    ) -> ToolInvocationReply {
148        let started = std::time::Instant::now();
149        let args = json!({ "handle": handle.clone() });
150        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
151            Ok(parsed) => parsed,
152            Err(err) => {
153                return Self::recorded_process_error(call_id, "await_process", args, err, started);
154            }
155        };
156        // Possession of a handle this run created is sufficient capability;
157        // session grant visibility only gates handles that arrived from
158        // elsewhere (run-local children carry no grants by design).
159        if !self.is_run_local_process(&handle_id)
160            && let Err(err) = self
161                .dispatch
162                .processes
163                .validate_visible(
164                    &self.session_id,
165                    std::slice::from_ref(&handle_id),
166                    self.process_scope(self.parent_invocation.clone()),
167                )
168                .await
169        {
170            return Self::recorded_process_error(
171                call_id,
172                "await_process",
173                args,
174                err.to_string(),
175                started,
176            );
177        }
178        let output = self
179            .await_process_with_cancellation(
180                &handle_id,
181                self.parent_invocation.clone(),
182                self.cancellation_token.clone(),
183            )
184            .await;
185        let output = match output {
186            Ok(output) => output.into_tool_output(),
187            Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
188        };
189        Self::recorded_process_reply(call_id, "await_process", args, output, started)
190    }
191
192    pub(super) async fn signal_process_handle(
193        &self,
194        call_id: String,
195        handle: serde_json::Value,
196        signal_name: String,
197        payload: serde_json::Value,
198    ) -> ToolInvocationReply {
199        let started = std::time::Instant::now();
200        let args = json!({
201            "handle": handle.clone(),
202            "signal_name": signal_name.clone(),
203            "payload": payload.clone()
204        });
205        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
206            Ok(parsed) => parsed,
207            Err(err) => {
208                return Self::recorded_process_error(call_id, "signal_process", args, err, started);
209            }
210        };
211        let signal_id = format!("process-{call_id}");
212        let output = match self
213            .dispatch
214            .processes
215            .signal(
216                &self.session_id,
217                &handle_id,
218                signal_name,
219                signal_id,
220                payload,
221                self.process_scope(self.parent_invocation.clone()),
222            )
223            .await
224        {
225            Ok(event) => ToolCallOutput::success(json!({
226                "process_id": event.process_id,
227                "sequence": event.sequence,
228            })),
229            Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
230        };
231        Self::recorded_process_reply(call_id, "signal_process", args, output, started)
232    }
233
234    pub(super) async fn cancel_process_handle(
235        &self,
236        call_id: String,
237        handle: serde_json::Value,
238    ) -> ToolInvocationReply {
239        let started = std::time::Instant::now();
240        let args = json!({ "handle": handle.clone() });
241        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
242            Ok(parsed) => parsed,
243            Err(err) => {
244                return Self::recorded_process_error(call_id, "cancel_process", args, err, started);
245            }
246        };
247        // Run-local children bypass the grant-validating cancel ability:
248        // possession of the handle this run created is the capability, and
249        // these children carry no session grants by design.
250        let result = if self.is_run_local_process(&handle_id) {
251            self.dispatch
252                .processes
253                .cancel(
254                    &self.session_id,
255                    &handle_id,
256                    self.process_scope(self.parent_invocation.clone()),
257                )
258                .await
259        } else {
260            self.dispatch
261                .process_cancel_ability
262                .cancel(
263                    self.dispatch.processes.as_ref(),
264                    crate::ProcessCancelRequest::new(
265                        &self.session_id,
266                        &handle_id,
267                        self.process_scope(self.parent_invocation.clone()),
268                        crate::ProcessCancelSource::Process,
269                    )
270                    .with_handle(handle)
271                    .with_reason("requested by process handle"),
272                )
273                .await
274        };
275        let output = match result {
276            Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
277            Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
278        };
279        Self::recorded_process_reply(call_id, "cancel_process", args, output, started)
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use crate::plugin::PluginHost;
287    use crate::runtime::RuntimeEffectControllerHandle;
288    use crate::tool_dispatch::ToolDispatchContext;
289    use crate::{
290        PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
291        ToolResult,
292    };
293    use std::collections::BTreeMap;
294    use std::sync::Arc;
295    use std::sync::Mutex;
296    use std::sync::atomic::{AtomicUsize, Ordering};
297
298    struct PrepareRecordingTool {
299        prepares: Arc<AtomicUsize>,
300    }
301
302    #[derive(Default)]
303    struct DenyCancelAbility {
304        calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
305    }
306
307    impl DenyCancelAbility {
308        fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
309            self.calls.lock().expect("cancel calls").clone()
310        }
311    }
312
313    #[async_trait::async_trait]
314    impl crate::ProcessCancelAbility for DenyCancelAbility {
315        async fn cancel(
316            &self,
317            _processes: &dyn crate::ProcessService,
318            request: crate::ProcessCancelRequest<'_>,
319        ) -> Result<crate::ProcessRecord, crate::PluginError> {
320            self.calls
321                .lock()
322                .expect("cancel calls")
323                .push((request.source, request.process_id.to_string()));
324            Err(crate::PluginError::Session("denied by host".to_string()))
325        }
326    }
327
328    fn process_tool_definition() -> ToolDefinition {
329        ToolDefinition::raw(
330            "tool:process_prepare",
331            "process_prepare",
332            "Records preparation before background registration.",
333            serde_json::json!({
334                "type": "object",
335                "properties": {
336                    "input": { "type": "string" }
337                },
338                "additionalProperties": false
339            }),
340            serde_json::json!({ "type": "object", "additionalProperties": true }),
341        )
342    }
343
344    #[async_trait::async_trait]
345    impl ToolProvider for PrepareRecordingTool {
346        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
347            vec![process_tool_definition().manifest()]
348        }
349
350        fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
351            (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
352        }
353
354        async fn prepare_tool_call(
355            &self,
356            call: ToolPrepareCall<'_>,
357        ) -> Result<PreparedToolCall, ToolResult> {
358            self.prepares.fetch_add(1, Ordering::SeqCst);
359            Ok(PreparedToolCall::from_parts(
360                call.pending.call_id,
361                call.pending.tool_name,
362                call.pending.args,
363                call.pending.replay,
364                serde_json::json!({ "prepared": true }),
365            ))
366        }
367
368        async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
369            ToolResult::ok(serde_json::json!({
370                "payload": call.context.prepared_payload().clone(),
371            }))
372        }
373    }
374
375    #[tokio::test]
376    async fn process_handle_start_registers_prepared_tool_call() {
377        let prepares = Arc::new(AtomicUsize::new(0));
378        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
379            prepares: Arc::clone(&prepares),
380        });
381        let plugins = PluginHost::empty()
382            .build_session("root", None)
383            .expect("plugin session");
384        let tools = Arc::clone(&provider);
385        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
386            provider.tool_manifests(),
387            BTreeMap::new(),
388        ));
389        let host = Arc::new(crate::testing::MockSessionManager::default());
390        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
391        let dispatch = Arc::new(ToolDispatchContext {
392            plugins,
393            tools,
394            tool_catalog,
395            sessions: host.clone(),
396            session_lifecycle: host.clone(),
397            session_graph: host.clone(),
398            processes: host.clone(),
399            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
400            trigger_router: None,
401            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
402                crate::InlineRuntimeEffectController,
403            )),
404            direct_completions: crate::DirectCompletionClient::unavailable(
405                "direct completions are unavailable in this test context",
406            ),
407            parent_invocation: None,
408            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
409                crate::PluginOptions::default(),
410                crate::SessionPolicy::default(),
411            ),
412            session_id: "session".to_string(),
413            agent_frame_id: String::new(),
414            event_tx,
415            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
416            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
417            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
418            turn_context: crate::TurnContext::default(),
419        });
420        let context = RuntimeExecutionContext::new(
421            "session".to_string(),
422            dispatch,
423            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
424            Arc::new(crate::InMemoryAttachmentStore::new()),
425            Arc::new(crate::ChronologicalProjection::default()),
426            None,
427            crate::TurnContext::default(),
428        )
429        .with_execution_env_spec(crate::ProcessExecutionEnvSpec::new(
430            crate::PluginOptions::default(),
431            crate::runtime::tests::helpers::standard_test_policy(),
432        ));
433
434        let started = context
435            .start_tool_process(
436                "async-call-1".to_string(),
437                "process_prepare".to_string(),
438                serde_json::json!({ "input": "live" }),
439            )
440            .await;
441        let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
442            panic!("expected process handle output");
443        };
444        assert_eq!(
445            handle
446                .to_json_value()
447                .get("id")
448                .and_then(|value| value.as_str()),
449            Some("async-call-1")
450        );
451        assert_eq!(prepares.load(Ordering::SeqCst), 1);
452        let record = host
453            .process_registry
454            .get_process("async-call-1")
455            .await
456            .expect("registered process");
457        let ProcessInput::ToolCall { call } = record.input.as_ref() else {
458            panic!("expected prepared tool call process input");
459        };
460        assert_eq!(call.tool_name, "process_prepare");
461        assert_eq!(call.args, serde_json::json!({ "input": "live" }));
462        assert_eq!(
463            call.prepared_payload,
464            serde_json::json!({ "prepared": true })
465        );
466
467        let awaited = context
468            .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
469            .await;
470
471        assert!(awaited.output.is_success());
472        let record = awaited.record.expect("await record");
473        assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
474        assert_eq!(record.tool, "await_process");
475    }
476
477    #[tokio::test]
478    async fn process_handle_signal_appends_event_from_foreground() {
479        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
480            prepares: Arc::new(AtomicUsize::new(0)),
481        });
482        let plugins = PluginHost::empty()
483            .build_session("root", None)
484            .expect("plugin session");
485        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
486            provider.tool_manifests(),
487            BTreeMap::new(),
488        ));
489        let host = Arc::new(crate::testing::MockSessionManager::default());
490        host.process_registry
491            .register_process(
492                ProcessRegistration::new(
493                    "target-process",
494                    ProcessInput::External {
495                        metadata: serde_json::Value::Null,
496                    },
497                    crate::ProcessProvenance::host(),
498                )
499                .with_extra_event_types([crate::ProcessEventType {
500                    name: "signal.ready".to_string(),
501                    payload_schema: crate::LashSchema::any(),
502                    semantics: crate::ProcessEventSemanticsSpec::default(),
503                }]),
504            )
505            .await
506            .expect("register target process");
507        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
508        let dispatch = Arc::new(ToolDispatchContext {
509            plugins,
510            tools: provider,
511            tool_catalog,
512            sessions: host.clone(),
513            session_lifecycle: host.clone(),
514            session_graph: host.clone(),
515            processes: host.clone(),
516            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
517            trigger_router: None,
518            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
519                crate::InlineRuntimeEffectController,
520            )),
521            direct_completions: crate::DirectCompletionClient::unavailable(
522                "direct completions are unavailable in this test context",
523            ),
524            parent_invocation: None,
525            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
526                crate::PluginOptions::default(),
527                crate::SessionPolicy::default(),
528            ),
529            session_id: "session".to_string(),
530            agent_frame_id: String::new(),
531            event_tx,
532            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
533            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
534            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
535            turn_context: crate::TurnContext::default(),
536        });
537        let context = RuntimeExecutionContext::new(
538            "session".to_string(),
539            dispatch,
540            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
541            Arc::new(crate::InMemoryAttachmentStore::new()),
542            Arc::new(crate::ChronologicalProjection::default()),
543            None,
544            crate::TurnContext::default(),
545        );
546
547        let handle = json!({ "__handle__": "process", "id": "target-process" });
548        let signalled = context
549            .signal_process_handle(
550                "signal-1".to_string(),
551                handle,
552                "ready".to_string(),
553                json!({ "kind": "ping" }),
554            )
555            .await;
556
557        assert!(
558            signalled.output.is_success(),
559            "{:?}",
560            signalled.output.value_for_projection()
561        );
562        let record = signalled.record.expect("signal record");
563        assert_eq!(record.call_id.as_deref(), Some("signal-1"));
564        assert_eq!(record.tool, "signal_process");
565        let events = host
566            .process_registry
567            .events_after("target-process", 0)
568            .await
569            .expect("list events");
570        assert!(
571            events.iter().any(|event| event.event_type == "signal.ready"
572                && event.payload.get("kind") == Some(&json!("ping"))),
573            "expected appended signal.ready event, got {events:?}"
574        );
575    }
576
577    #[tokio::test]
578    async fn process_handle_await_and_cancel_require_session_grant() {
579        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
580            prepares: Arc::new(AtomicUsize::new(0)),
581        });
582        let plugins = PluginHost::empty()
583            .build_session("root", None)
584            .expect("plugin session");
585        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
586            provider.tool_manifests(),
587            BTreeMap::new(),
588        ));
589        let host = Arc::new(crate::testing::MockSessionManager::default());
590        host.process_registry
591            .register_process(ProcessRegistration::new(
592                "hidden-process",
593                ProcessInput::External {
594                    metadata: serde_json::Value::Null,
595                },
596                crate::ProcessProvenance::host(),
597            ))
598            .await
599            .expect("register hidden process");
600        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
601        let dispatch = Arc::new(ToolDispatchContext {
602            plugins,
603            tools: provider,
604            tool_catalog,
605            sessions: host.clone(),
606            session_lifecycle: host.clone(),
607            session_graph: host.clone(),
608            processes: host.clone(),
609            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
610            trigger_router: None,
611            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
612                crate::InlineRuntimeEffectController,
613            )),
614            direct_completions: crate::DirectCompletionClient::unavailable(
615                "direct completions are unavailable in this test context",
616            ),
617            parent_invocation: None,
618            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
619                crate::PluginOptions::default(),
620                crate::SessionPolicy::default(),
621            ),
622            session_id: "session".to_string(),
623            agent_frame_id: String::new(),
624            event_tx,
625            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
626            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
627            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
628            turn_context: crate::TurnContext::default(),
629        });
630        let context = RuntimeExecutionContext::new(
631            "session".to_string(),
632            dispatch,
633            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
634            Arc::new(crate::InMemoryAttachmentStore::new()),
635            Arc::new(crate::ChronologicalProjection::default()),
636            None,
637            crate::TurnContext::default(),
638        );
639        let handle = json!({
640            "__handle__": "process",
641            "id": "hidden-process"
642        });
643
644        let awaited = context
645            .await_process_handle("await-hidden-process".to_string(), handle.clone())
646            .await;
647        let cancelled = context
648            .cancel_process_handle("cancel-hidden-process".to_string(), handle)
649            .await;
650
651        assert!(!awaited.output.is_success());
652        assert!(!cancelled.output.is_success());
653        assert_eq!(
654            awaited
655                .record
656                .as_ref()
657                .and_then(|record| record.call_id.as_deref()),
658            Some("await-hidden-process")
659        );
660        assert_eq!(
661            cancelled
662                .record
663                .as_ref()
664                .and_then(|record| record.call_id.as_deref()),
665            Some("cancel-hidden-process")
666        );
667    }
668
669    #[tokio::test]
670    async fn process_handle_cancel_uses_host_cancel_ability() {
671        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
672            prepares: Arc::new(AtomicUsize::new(0)),
673        });
674        let plugins = PluginHost::empty()
675            .build_session("root", None)
676            .expect("plugin session");
677        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
678            provider.tool_manifests(),
679            BTreeMap::new(),
680        ));
681        let host = Arc::new(crate::testing::MockSessionManager::default());
682        let ability = Arc::new(DenyCancelAbility::default());
683        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
684        let dispatch = Arc::new(ToolDispatchContext {
685            plugins,
686            tools: provider,
687            tool_catalog,
688            sessions: host.clone(),
689            session_lifecycle: host.clone(),
690            session_graph: host,
691            processes: Arc::new(crate::UnavailableProcessService),
692            process_cancel_ability: ability.clone(),
693            trigger_router: None,
694            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
695                crate::InlineRuntimeEffectController,
696            )),
697            direct_completions: crate::DirectCompletionClient::unavailable(
698                "direct completions are unavailable in this test context",
699            ),
700            parent_invocation: None,
701            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
702                crate::PluginOptions::default(),
703                crate::SessionPolicy::default(),
704            ),
705            session_id: "session".to_string(),
706            agent_frame_id: String::new(),
707            event_tx,
708            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
709            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
710            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
711            turn_context: crate::TurnContext::default(),
712        });
713        let context = RuntimeExecutionContext::new(
714            "session".to_string(),
715            dispatch,
716            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
717            Arc::new(crate::InMemoryAttachmentStore::new()),
718            Arc::new(crate::ChronologicalProjection::default()),
719            None,
720            crate::TurnContext::default(),
721        );
722
723        let cancelled = context
724            .cancel_process_handle(
725                "cancel-process-1".to_string(),
726                json!({
727                    "__handle__": "process",
728                    "id": "process-1"
729                }),
730            )
731            .await;
732
733        assert!(!cancelled.output.is_success());
734        assert_eq!(
735            cancelled.output.value_for_projection()["message"],
736            json!("cancel failed: plugin session error: denied by host")
737        );
738        assert_eq!(
739            ability.calls(),
740            vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
741        );
742    }
743}