Skip to main content

lash_core/session/
process_handles.rs

1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7    ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
10const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13    pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14        let _ = tool_name;
15        Self::process_handle_json(id)
16    }
17
18    pub fn process_handle_json(id: &str) -> serde_json::Value {
19        json!({
20            "__handle__": "process",
21            "id": id,
22        })
23    }
24
25    pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26        json!({
27            "process_id": status.id,
28            "status": status.status.label(),
29        })
30    }
31
32    pub(super) fn parse_process_handle(
33        handle: &serde_json::Value,
34    ) -> Result<(String, Option<String>), String> {
35        let kind = handle
36            .get("__handle__")
37            .and_then(|value| value.as_str())
38            .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39        if kind != PROCESS_HANDLE_KIND {
40            return Err(format!("Invalid process handle kind: {kind}"));
41        }
42        let id = handle
43            .get("id")
44            .and_then(|value| value.as_str())
45            .filter(|value| !value.is_empty())
46            .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47        let tool_name = handle
48            .get("tool")
49            .and_then(|value| value.as_str())
50            .map(str::to_string);
51        Ok((id.to_string(), tool_name))
52    }
53
54    pub(super) async fn start_tool_process(
55        &self,
56        call_id: String,
57        tool_name: String,
58        args: serde_json::Value,
59    ) -> ToolInvocationReply {
60        let handle_id = call_id.clone();
61        let pending_call = crate::sansio::PendingToolCall {
62            call_id: call_id.clone(),
63            tool_name: tool_name.clone(),
64            args: args.clone(),
65            replay: None,
66        };
67        let prepared_call = match self.prepare_tool_call(pending_call).await {
68            ToolPreparationOutcome::Prepared(prepared) => prepared,
69            ToolPreparationOutcome::Completed(outcome) => {
70                let mut record = outcome.record;
71                record.call_id = Some(call_id);
72                return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73            }
74        };
75        let registration = ProcessRegistration::session_start_draft(
76            handle_id.clone(),
77            ProcessInput::ToolCall {
78                call: prepared_call.clone(),
79            },
80            // Tool-call rows are journaled and idempotent by process id, so
81            // recovery may re-execute them (ADR 0019).
82            crate::RecoveryDisposition::Rerunnable,
83        );
84        let registration = match self
85            .attach_captured_process_execution_env(registration)
86            .await
87        {
88            Ok(registration) => registration,
89            Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
90        };
91        if let Err(err) =
92            self.dispatch
93                .processes
94                .start(
95                    &self.session_id,
96                    registration,
97                    crate::ProcessStartOptions::new().with_descriptor(
98                        ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
99                    ),
100                    self.process_scope(self.parent_invocation.clone()),
101                )
102                .await
103        {
104            return ToolInvocationReply::error(json!(err.to_string()));
105        }
106
107        let handle_value = Self::process_handle_value(&handle_id, &tool_name);
108        let record = ToolCallRecord {
109            call_id: Some(call_id),
110            tool: prepared_call.tool_name,
111            args: prepared_call.args,
112            output: ToolCallOutput::success(handle_value.clone()),
113            duration_ms: 0,
114        };
115        ToolInvocationReply::success(handle_value).with_record(record)
116    }
117
118    fn elapsed_ms(&self, started: std::time::Instant) -> u64 {
119        self.dispatch
120            .clock
121            .now()
122            .duration_since(started)
123            .as_millis() as u64
124    }
125
126    fn recorded_process_reply(
127        call_id: String,
128        tool: impl Into<String>,
129        args: serde_json::Value,
130        output: ToolCallOutput,
131        duration_ms: u64,
132    ) -> ToolInvocationReply {
133        let record = ToolCallRecord {
134            call_id: Some(call_id),
135            tool: tool.into(),
136            args,
137            output: output.clone(),
138            duration_ms,
139        };
140        ToolInvocationReply::from_output(output).with_record(record)
141    }
142
143    fn recorded_process_error(
144        call_id: String,
145        tool: &'static str,
146        args: serde_json::Value,
147        message: impl Into<String>,
148        duration_ms: u64,
149    ) -> ToolInvocationReply {
150        let output = ToolInvocationReply::error(json!(message.into())).output;
151        Self::recorded_process_reply(call_id, tool, args, output, duration_ms)
152    }
153
154    pub(super) async fn await_process_handle(
155        &self,
156        call_id: String,
157        handle: serde_json::Value,
158    ) -> ToolInvocationReply {
159        let started = self.dispatch.clock.now();
160        let args = json!({ "handle": handle.clone() });
161        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
162            Ok(parsed) => parsed,
163            Err(err) => {
164                return Self::recorded_process_error(
165                    call_id,
166                    "await_process",
167                    args,
168                    err,
169                    self.elapsed_ms(started),
170                );
171            }
172        };
173        // Possession of a handle this run created is sufficient capability;
174        // session grant visibility only gates handles that arrived from
175        // elsewhere (run-local children carry no grants by design).
176        if !self.is_run_local_process(&handle_id)
177            && let Err(err) = self
178                .dispatch
179                .processes
180                .validate_visible(
181                    &self.session_id,
182                    std::slice::from_ref(&handle_id),
183                    self.process_scope(self.parent_invocation.clone()),
184                )
185                .await
186        {
187            return Self::recorded_process_error(
188                call_id,
189                "await_process",
190                args,
191                err.to_string(),
192                self.elapsed_ms(started),
193            );
194        }
195        let output = self
196            .await_process_with_cancellation(
197                &handle_id,
198                self.parent_invocation.clone(),
199                self.cancellation_token.clone(),
200            )
201            .await;
202        let output = match output {
203            Ok(output) => output.into_tool_output(),
204            Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
205        };
206        Self::recorded_process_reply(
207            call_id,
208            "await_process",
209            args,
210            output,
211            self.elapsed_ms(started),
212        )
213    }
214
215    pub(super) async fn signal_process_handle(
216        &self,
217        call_id: String,
218        handle: serde_json::Value,
219        signal_name: String,
220        payload: serde_json::Value,
221    ) -> ToolInvocationReply {
222        let started = self.dispatch.clock.now();
223        let args = json!({
224            "handle": handle.clone(),
225            "signal_name": signal_name.clone(),
226            "payload": payload.clone()
227        });
228        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
229            Ok(parsed) => parsed,
230            Err(err) => {
231                return Self::recorded_process_error(
232                    call_id,
233                    "signal_process",
234                    args,
235                    err,
236                    self.elapsed_ms(started),
237                );
238            }
239        };
240        let signal_id = format!("process-{call_id}");
241        let output = match self
242            .dispatch
243            .processes
244            .signal(
245                &self.session_id,
246                &handle_id,
247                signal_name,
248                signal_id,
249                payload,
250                self.process_scope(self.parent_invocation.clone()),
251            )
252            .await
253        {
254            Ok(event) => ToolCallOutput::success(json!({
255                "process_id": event.process_id,
256                "sequence": event.sequence,
257            })),
258            Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
259        };
260        Self::recorded_process_reply(
261            call_id,
262            "signal_process",
263            args,
264            output,
265            self.elapsed_ms(started),
266        )
267    }
268
269    pub(super) async fn cancel_process_handle(
270        &self,
271        call_id: String,
272        handle: serde_json::Value,
273    ) -> ToolInvocationReply {
274        let started = self.dispatch.clock.now();
275        let args = json!({ "handle": handle.clone() });
276        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
277            Ok(parsed) => parsed,
278            Err(err) => {
279                return Self::recorded_process_error(
280                    call_id,
281                    "cancel_process",
282                    args,
283                    err,
284                    self.elapsed_ms(started),
285                );
286            }
287        };
288        // Run-local children bypass the grant-validating cancel ability:
289        // possession of the handle this run created is the capability, and
290        // these children carry no session grants by design.
291        let result = if self.is_run_local_process(&handle_id) {
292            self.dispatch
293                .processes
294                .cancel(
295                    &self.session_id,
296                    &handle_id,
297                    self.process_scope(self.parent_invocation.clone()),
298                )
299                .await
300        } else {
301            self.dispatch
302                .process_cancel_ability
303                .cancel(
304                    self.dispatch.processes.as_ref(),
305                    crate::ProcessCancelRequest::new(
306                        &self.session_id,
307                        &handle_id,
308                        self.process_scope(self.parent_invocation.clone()),
309                        crate::ProcessCancelSource::Process,
310                    )
311                    .with_handle(handle)
312                    .with_reason("requested by process handle"),
313                )
314                .await
315        };
316        let output = match result {
317            Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
318            Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
319        };
320        Self::recorded_process_reply(
321            call_id,
322            "cancel_process",
323            args,
324            output,
325            self.elapsed_ms(started),
326        )
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333    use crate::plugin::PluginHost;
334    use crate::runtime::RuntimeEffectControllerHandle;
335    use crate::tool_dispatch::ToolDispatchContext;
336    use crate::{
337        PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
338        ToolResult,
339    };
340    use std::collections::BTreeMap;
341    use std::sync::Arc;
342    use std::sync::Mutex;
343    use std::sync::atomic::{AtomicUsize, Ordering};
344
345    struct PrepareRecordingTool {
346        prepares: Arc<AtomicUsize>,
347    }
348
349    #[derive(Default)]
350    struct DenyCancelAbility {
351        calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
352    }
353
354    impl DenyCancelAbility {
355        fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
356            self.calls.lock().expect("cancel calls").clone()
357        }
358    }
359
360    #[async_trait::async_trait]
361    impl crate::ProcessCancelAbility for DenyCancelAbility {
362        async fn cancel(
363            &self,
364            _processes: &dyn crate::ProcessService,
365            request: crate::ProcessCancelRequest<'_>,
366        ) -> Result<crate::ProcessRecord, crate::PluginError> {
367            self.calls
368                .lock()
369                .expect("cancel calls")
370                .push((request.source, request.process_id.to_string()));
371            Err(crate::PluginError::Session("denied by host".to_string()))
372        }
373    }
374
375    fn process_tool_definition() -> ToolDefinition {
376        ToolDefinition::raw(
377            "tool:process_prepare",
378            "process_prepare",
379            "Records preparation before background registration.",
380            serde_json::json!({
381                "type": "object",
382                "properties": {
383                    "input": { "type": "string" }
384                },
385                "additionalProperties": false
386            }),
387            serde_json::json!({ "type": "object", "additionalProperties": true }),
388        )
389    }
390
391    #[async_trait::async_trait]
392    impl ToolProvider for PrepareRecordingTool {
393        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
394            vec![process_tool_definition().manifest()]
395        }
396
397        fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
398            (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
399        }
400
401        async fn prepare_tool_call(
402            &self,
403            call: ToolPrepareCall<'_>,
404        ) -> Result<PreparedToolCall, ToolResult> {
405            self.prepares.fetch_add(1, Ordering::SeqCst);
406            Ok(PreparedToolCall::from_parts(
407                call.pending.call_id,
408                call.tool_id,
409                call.pending.tool_name,
410                call.pending.args,
411                call.pending.replay,
412                serde_json::json!({ "prepared": true }),
413            ))
414        }
415
416        async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
417            ToolResult::ok(serde_json::json!({
418                "payload": call.context.prepared_payload().clone(),
419            }))
420        }
421    }
422
423    #[tokio::test]
424    async fn process_handle_start_registers_prepared_tool_call() {
425        let prepares = Arc::new(AtomicUsize::new(0));
426        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
427            prepares: Arc::clone(&prepares),
428        });
429        let plugins = PluginHost::empty()
430            .build_session("root", None)
431            .expect("plugin session");
432        let tools = Arc::clone(&provider);
433        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
434            provider.tool_manifests(),
435            BTreeMap::new(),
436        ));
437        let host = Arc::new(crate::testing::MockSessionManager::default());
438        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
439        let dispatch = Arc::new(ToolDispatchContext {
440            plugins,
441            tools,
442            tool_catalog,
443            sessions: host.clone(),
444            session_lifecycle: host.clone(),
445            session_graph: host.clone(),
446            processes: host.clone(),
447            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
448            trigger_router: None,
449            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
450                crate::InlineRuntimeEffectController,
451            )),
452            direct_completions: crate::DirectCompletionClient::unavailable(
453                "direct completions are unavailable in this test context",
454            ),
455            parent_invocation: None,
456            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
457                crate::PluginOptions::default(),
458                crate::SessionPolicy::default(),
459            ),
460            session_id: "session".to_string(),
461            agent_frame_id: String::new(),
462            event_tx,
463            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
464            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
465            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
466            turn_context: crate::TurnContext::default(),
467            clock: std::sync::Arc::new(crate::SystemClock),
468        });
469        let context = RuntimeExecutionContext::new(
470            "session".to_string(),
471            dispatch,
472            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
473            Arc::new(crate::InMemoryAttachmentStore::new()),
474            Arc::new(crate::ChronologicalProjection::default()),
475            None,
476            crate::TurnContext::default(),
477        )
478        .with_execution_env_spec(crate::ProcessExecutionEnvSpec::new(
479            crate::PluginOptions::default(),
480            crate::runtime::tests::helpers::standard_test_policy(),
481        ));
482
483        let started = context
484            .start_tool_process(
485                "async-call-1".to_string(),
486                "process_prepare".to_string(),
487                serde_json::json!({ "input": "live" }),
488            )
489            .await;
490        let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
491            panic!("expected process handle output");
492        };
493        assert_eq!(
494            handle
495                .to_json_value()
496                .get("id")
497                .and_then(|value| value.as_str()),
498            Some("async-call-1")
499        );
500        assert_eq!(prepares.load(Ordering::SeqCst), 1);
501        let record = host
502            .process_registry
503            .get_process("async-call-1")
504            .await
505            .expect("registered process");
506        let ProcessInput::ToolCall { call } = record.input.as_ref() else {
507            panic!("expected prepared tool call process input");
508        };
509        assert_eq!(call.tool_name, "process_prepare");
510        assert_eq!(call.args, serde_json::json!({ "input": "live" }));
511        assert_eq!(
512            call.prepared_payload,
513            serde_json::json!({ "prepared": true })
514        );
515
516        let awaited = context
517            .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
518            .await;
519
520        assert!(awaited.output.is_success());
521        let record = awaited.record.expect("await record");
522        assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
523        assert_eq!(record.tool, "await_process");
524    }
525
526    #[tokio::test]
527    async fn process_handle_signal_appends_event_from_foreground() {
528        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
529            prepares: Arc::new(AtomicUsize::new(0)),
530        });
531        let plugins = PluginHost::empty()
532            .build_session("root", None)
533            .expect("plugin session");
534        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
535            provider.tool_manifests(),
536            BTreeMap::new(),
537        ));
538        let host = Arc::new(crate::testing::MockSessionManager::default());
539        host.process_registry
540            .register_process(
541                ProcessRegistration::new(
542                    "target-process",
543                    ProcessInput::External {
544                        metadata: serde_json::Value::Null,
545                    },
546                    crate::RecoveryDisposition::ExternallyOwned,
547                    crate::ProcessProvenance::host(),
548                )
549                .with_extra_event_types([crate::ProcessEventType {
550                    name: "signal.ready".to_string(),
551                    payload_schema: crate::LashSchema::any(),
552                    semantics: crate::ProcessEventSemanticsSpec::default(),
553                }]),
554            )
555            .await
556            .expect("register target process");
557        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
558        let dispatch = Arc::new(ToolDispatchContext {
559            plugins,
560            tools: provider,
561            tool_catalog,
562            sessions: host.clone(),
563            session_lifecycle: host.clone(),
564            session_graph: host.clone(),
565            processes: host.clone(),
566            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
567            trigger_router: None,
568            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
569                crate::InlineRuntimeEffectController,
570            )),
571            direct_completions: crate::DirectCompletionClient::unavailable(
572                "direct completions are unavailable in this test context",
573            ),
574            parent_invocation: None,
575            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
576                crate::PluginOptions::default(),
577                crate::SessionPolicy::default(),
578            ),
579            session_id: "session".to_string(),
580            agent_frame_id: String::new(),
581            event_tx,
582            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
583            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
584            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
585            turn_context: crate::TurnContext::default(),
586            clock: std::sync::Arc::new(crate::SystemClock),
587        });
588        let context = RuntimeExecutionContext::new(
589            "session".to_string(),
590            dispatch,
591            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
592            Arc::new(crate::InMemoryAttachmentStore::new()),
593            Arc::new(crate::ChronologicalProjection::default()),
594            None,
595            crate::TurnContext::default(),
596        );
597
598        let handle = json!({ "__handle__": "process", "id": "target-process" });
599        let signalled = context
600            .signal_process_handle(
601                "signal-1".to_string(),
602                handle,
603                "ready".to_string(),
604                json!({ "kind": "ping" }),
605            )
606            .await;
607
608        assert!(
609            signalled.output.is_success(),
610            "{:?}",
611            signalled.output.value_for_projection()
612        );
613        let record = signalled.record.expect("signal record");
614        assert_eq!(record.call_id.as_deref(), Some("signal-1"));
615        assert_eq!(record.tool, "signal_process");
616        let events = host
617            .process_registry
618            .events_after("target-process", 0)
619            .await
620            .expect("list events");
621        assert!(
622            events.iter().any(|event| event.event_type == "signal.ready"
623                && event.payload.get("kind") == Some(&json!("ping"))),
624            "expected appended signal.ready event, got {events:?}"
625        );
626    }
627
628    #[tokio::test]
629    async fn process_handle_await_and_cancel_require_session_grant() {
630        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
631            prepares: Arc::new(AtomicUsize::new(0)),
632        });
633        let plugins = PluginHost::empty()
634            .build_session("root", None)
635            .expect("plugin session");
636        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
637            provider.tool_manifests(),
638            BTreeMap::new(),
639        ));
640        let host = Arc::new(crate::testing::MockSessionManager::default());
641        host.process_registry
642            .register_process(ProcessRegistration::new(
643                "hidden-process",
644                ProcessInput::External {
645                    metadata: serde_json::Value::Null,
646                },
647                crate::RecoveryDisposition::ExternallyOwned,
648                crate::ProcessProvenance::host(),
649            ))
650            .await
651            .expect("register hidden process");
652        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
653        let dispatch = Arc::new(ToolDispatchContext {
654            plugins,
655            tools: provider,
656            tool_catalog,
657            sessions: host.clone(),
658            session_lifecycle: host.clone(),
659            session_graph: host.clone(),
660            processes: host.clone(),
661            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
662            trigger_router: None,
663            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
664                crate::InlineRuntimeEffectController,
665            )),
666            direct_completions: crate::DirectCompletionClient::unavailable(
667                "direct completions are unavailable in this test context",
668            ),
669            parent_invocation: None,
670            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
671                crate::PluginOptions::default(),
672                crate::SessionPolicy::default(),
673            ),
674            session_id: "session".to_string(),
675            agent_frame_id: String::new(),
676            event_tx,
677            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
678            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
679            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
680            turn_context: crate::TurnContext::default(),
681            clock: std::sync::Arc::new(crate::SystemClock),
682        });
683        let context = RuntimeExecutionContext::new(
684            "session".to_string(),
685            dispatch,
686            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
687            Arc::new(crate::InMemoryAttachmentStore::new()),
688            Arc::new(crate::ChronologicalProjection::default()),
689            None,
690            crate::TurnContext::default(),
691        );
692        let handle = json!({
693            "__handle__": "process",
694            "id": "hidden-process"
695        });
696
697        let awaited = context
698            .await_process_handle("await-hidden-process".to_string(), handle.clone())
699            .await;
700        let cancelled = context
701            .cancel_process_handle("cancel-hidden-process".to_string(), handle)
702            .await;
703
704        assert!(!awaited.output.is_success());
705        assert!(!cancelled.output.is_success());
706        assert_eq!(
707            awaited
708                .record
709                .as_ref()
710                .and_then(|record| record.call_id.as_deref()),
711            Some("await-hidden-process")
712        );
713        assert_eq!(
714            cancelled
715                .record
716                .as_ref()
717                .and_then(|record| record.call_id.as_deref()),
718            Some("cancel-hidden-process")
719        );
720    }
721
722    #[tokio::test]
723    async fn process_handle_cancel_uses_host_cancel_ability() {
724        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
725            prepares: Arc::new(AtomicUsize::new(0)),
726        });
727        let plugins = PluginHost::empty()
728            .build_session("root", None)
729            .expect("plugin session");
730        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
731            provider.tool_manifests(),
732            BTreeMap::new(),
733        ));
734        let host = Arc::new(crate::testing::MockSessionManager::default());
735        let ability = Arc::new(DenyCancelAbility::default());
736        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
737        let dispatch = Arc::new(ToolDispatchContext {
738            plugins,
739            tools: provider,
740            tool_catalog,
741            sessions: host.clone(),
742            session_lifecycle: host.clone(),
743            session_graph: host,
744            processes: Arc::new(crate::UnavailableProcessService),
745            process_cancel_ability: ability.clone(),
746            trigger_router: None,
747            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
748                crate::InlineRuntimeEffectController,
749            )),
750            direct_completions: crate::DirectCompletionClient::unavailable(
751                "direct completions are unavailable in this test context",
752            ),
753            parent_invocation: None,
754            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
755                crate::PluginOptions::default(),
756                crate::SessionPolicy::default(),
757            ),
758            session_id: "session".to_string(),
759            agent_frame_id: String::new(),
760            event_tx,
761            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
762            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
763            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
764            turn_context: crate::TurnContext::default(),
765            clock: std::sync::Arc::new(crate::SystemClock),
766        });
767        let context = RuntimeExecutionContext::new(
768            "session".to_string(),
769            dispatch,
770            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
771            Arc::new(crate::InMemoryAttachmentStore::new()),
772            Arc::new(crate::ChronologicalProjection::default()),
773            None,
774            crate::TurnContext::default(),
775        );
776
777        let cancelled = context
778            .cancel_process_handle(
779                "cancel-process-1".to_string(),
780                json!({
781                    "__handle__": "process",
782                    "id": "process-1"
783                }),
784            )
785            .await;
786
787        assert!(!cancelled.output.is_success());
788        assert_eq!(
789            cancelled.output.value_for_projection()["message"],
790            json!("cancel failed: plugin session error: denied by host")
791        );
792        assert_eq!(
793            ability.calls(),
794            vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
795        );
796    }
797}