Skip to main content

lash_core/session/
process_handles.rs

1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7    ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
/// Expected value of the `__handle__` discriminator field in a process handle;
/// `parse_process_handle` rejects handles whose kind differs from this.
const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13    pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14        let _ = tool_name;
15        Self::process_handle_json(id)
16    }
17
18    pub fn process_handle_json(id: &str) -> serde_json::Value {
19        json!({
20            "__handle__": "process",
21            "id": id,
22        })
23    }
24
25    pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26        json!({
27            "process_id": status.id,
28            "status": status.status.label(),
29        })
30    }
31
32    pub(super) fn parse_process_handle(
33        handle: &serde_json::Value,
34    ) -> Result<(String, Option<String>), String> {
35        let kind = handle
36            .get("__handle__")
37            .and_then(|value| value.as_str())
38            .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39        if kind != PROCESS_HANDLE_KIND {
40            return Err(format!("Invalid process handle kind: {kind}"));
41        }
42        let id = handle
43            .get("id")
44            .and_then(|value| value.as_str())
45            .filter(|value| !value.is_empty())
46            .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47        let tool_name = handle
48            .get("tool")
49            .and_then(|value| value.as_str())
50            .map(str::to_string);
51        Ok((id.to_string(), tool_name))
52    }
53
54    pub(super) async fn start_tool_process(
55        &self,
56        call_id: String,
57        tool_name: String,
58        args: serde_json::Value,
59    ) -> ToolInvocationReply {
60        let handle_id = call_id.clone();
61        let pending_call = crate::sansio::PendingToolCall {
62            call_id: call_id.clone(),
63            tool_name: tool_name.clone(),
64            args: args.clone(),
65            replay: None,
66        };
67        let prepared_call = match self.prepare_tool_call(pending_call).await {
68            ToolPreparationOutcome::Prepared(prepared) => prepared,
69            ToolPreparationOutcome::Completed(outcome) => {
70                let mut record = outcome.record;
71                record.call_id = Some(call_id);
72                return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73            }
74        };
75        let registration = ProcessRegistration::session_start_draft(
76            handle_id.clone(),
77            ProcessInput::ToolCall {
78                call: prepared_call.clone(),
79            },
80        );
81        let registration = match self
82            .attach_captured_process_execution_env(registration)
83            .await
84        {
85            Ok(registration) => registration,
86            Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
87        };
88        if let Err(err) =
89            self.dispatch
90                .processes
91                .start(
92                    &self.session_id,
93                    registration,
94                    crate::ProcessStartOptions::new().with_descriptor(
95                        ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
96                    ),
97                    self.process_scope(self.parent_invocation.clone()),
98                )
99                .await
100        {
101            return ToolInvocationReply::error(json!(err.to_string()));
102        }
103
104        let handle_value = Self::process_handle_value(&handle_id, &tool_name);
105        let record = ToolCallRecord {
106            call_id: Some(call_id),
107            tool: prepared_call.tool_name,
108            args: prepared_call.args,
109            output: ToolCallOutput::success(handle_value.clone()),
110            duration_ms: 0,
111        };
112        ToolInvocationReply::success(handle_value).with_record(record)
113    }
114
115    fn elapsed_ms(&self, started: std::time::Instant) -> u64 {
116        self.dispatch
117            .clock
118            .now()
119            .duration_since(started)
120            .as_millis() as u64
121    }
122
123    fn recorded_process_reply(
124        call_id: String,
125        tool: impl Into<String>,
126        args: serde_json::Value,
127        output: ToolCallOutput,
128        duration_ms: u64,
129    ) -> ToolInvocationReply {
130        let record = ToolCallRecord {
131            call_id: Some(call_id),
132            tool: tool.into(),
133            args,
134            output: output.clone(),
135            duration_ms,
136        };
137        ToolInvocationReply::from_output(output).with_record(record)
138    }
139
140    fn recorded_process_error(
141        call_id: String,
142        tool: &'static str,
143        args: serde_json::Value,
144        message: impl Into<String>,
145        duration_ms: u64,
146    ) -> ToolInvocationReply {
147        let output = ToolInvocationReply::error(json!(message.into())).output;
148        Self::recorded_process_reply(call_id, tool, args, output, duration_ms)
149    }
150
151    pub(super) async fn await_process_handle(
152        &self,
153        call_id: String,
154        handle: serde_json::Value,
155    ) -> ToolInvocationReply {
156        let started = self.dispatch.clock.now();
157        let args = json!({ "handle": handle.clone() });
158        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
159            Ok(parsed) => parsed,
160            Err(err) => {
161                return Self::recorded_process_error(
162                    call_id,
163                    "await_process",
164                    args,
165                    err,
166                    self.elapsed_ms(started),
167                );
168            }
169        };
170        // Possession of a handle this run created is sufficient capability;
171        // session grant visibility only gates handles that arrived from
172        // elsewhere (run-local children carry no grants by design).
173        if !self.is_run_local_process(&handle_id)
174            && let Err(err) = self
175                .dispatch
176                .processes
177                .validate_visible(
178                    &self.session_id,
179                    std::slice::from_ref(&handle_id),
180                    self.process_scope(self.parent_invocation.clone()),
181                )
182                .await
183        {
184            return Self::recorded_process_error(
185                call_id,
186                "await_process",
187                args,
188                err.to_string(),
189                self.elapsed_ms(started),
190            );
191        }
192        let output = self
193            .await_process_with_cancellation(
194                &handle_id,
195                self.parent_invocation.clone(),
196                self.cancellation_token.clone(),
197            )
198            .await;
199        let output = match output {
200            Ok(output) => output.into_tool_output(),
201            Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
202        };
203        Self::recorded_process_reply(
204            call_id,
205            "await_process",
206            args,
207            output,
208            self.elapsed_ms(started),
209        )
210    }
211
212    pub(super) async fn signal_process_handle(
213        &self,
214        call_id: String,
215        handle: serde_json::Value,
216        signal_name: String,
217        payload: serde_json::Value,
218    ) -> ToolInvocationReply {
219        let started = self.dispatch.clock.now();
220        let args = json!({
221            "handle": handle.clone(),
222            "signal_name": signal_name.clone(),
223            "payload": payload.clone()
224        });
225        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
226            Ok(parsed) => parsed,
227            Err(err) => {
228                return Self::recorded_process_error(
229                    call_id,
230                    "signal_process",
231                    args,
232                    err,
233                    self.elapsed_ms(started),
234                );
235            }
236        };
237        let signal_id = format!("process-{call_id}");
238        let output = match self
239            .dispatch
240            .processes
241            .signal(
242                &self.session_id,
243                &handle_id,
244                signal_name,
245                signal_id,
246                payload,
247                self.process_scope(self.parent_invocation.clone()),
248            )
249            .await
250        {
251            Ok(event) => ToolCallOutput::success(json!({
252                "process_id": event.process_id,
253                "sequence": event.sequence,
254            })),
255            Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
256        };
257        Self::recorded_process_reply(
258            call_id,
259            "signal_process",
260            args,
261            output,
262            self.elapsed_ms(started),
263        )
264    }
265
266    pub(super) async fn cancel_process_handle(
267        &self,
268        call_id: String,
269        handle: serde_json::Value,
270    ) -> ToolInvocationReply {
271        let started = self.dispatch.clock.now();
272        let args = json!({ "handle": handle.clone() });
273        let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
274            Ok(parsed) => parsed,
275            Err(err) => {
276                return Self::recorded_process_error(
277                    call_id,
278                    "cancel_process",
279                    args,
280                    err,
281                    self.elapsed_ms(started),
282                );
283            }
284        };
285        // Run-local children bypass the grant-validating cancel ability:
286        // possession of the handle this run created is the capability, and
287        // these children carry no session grants by design.
288        let result = if self.is_run_local_process(&handle_id) {
289            self.dispatch
290                .processes
291                .cancel(
292                    &self.session_id,
293                    &handle_id,
294                    self.process_scope(self.parent_invocation.clone()),
295                )
296                .await
297        } else {
298            self.dispatch
299                .process_cancel_ability
300                .cancel(
301                    self.dispatch.processes.as_ref(),
302                    crate::ProcessCancelRequest::new(
303                        &self.session_id,
304                        &handle_id,
305                        self.process_scope(self.parent_invocation.clone()),
306                        crate::ProcessCancelSource::Process,
307                    )
308                    .with_handle(handle)
309                    .with_reason("requested by process handle"),
310                )
311                .await
312        };
313        let output = match result {
314            Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
315            Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
316        };
317        Self::recorded_process_reply(
318            call_id,
319            "cancel_process",
320            args,
321            output,
322            self.elapsed_ms(started),
323        )
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330    use crate::plugin::PluginHost;
331    use crate::runtime::RuntimeEffectControllerHandle;
332    use crate::tool_dispatch::ToolDispatchContext;
333    use crate::{
334        PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
335        ToolResult,
336    };
337    use std::collections::BTreeMap;
338    use std::sync::Arc;
339    use std::sync::Mutex;
340    use std::sync::atomic::{AtomicUsize, Ordering};
341
    /// Test tool provider that counts how often `prepare_tool_call` is invoked.
    struct PrepareRecordingTool {
        /// Shared counter, incremented once per `prepare_tool_call` invocation.
        prepares: Arc<AtomicUsize>,
    }
345
346    #[derive(Default)]
347    struct DenyCancelAbility {
348        calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
349    }
350
351    impl DenyCancelAbility {
352        fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
353            self.calls.lock().expect("cancel calls").clone()
354        }
355    }
356
357    #[async_trait::async_trait]
358    impl crate::ProcessCancelAbility for DenyCancelAbility {
359        async fn cancel(
360            &self,
361            _processes: &dyn crate::ProcessService,
362            request: crate::ProcessCancelRequest<'_>,
363        ) -> Result<crate::ProcessRecord, crate::PluginError> {
364            self.calls
365                .lock()
366                .expect("cancel calls")
367                .push((request.source, request.process_id.to_string()));
368            Err(crate::PluginError::Session("denied by host".to_string()))
369        }
370    }
371
372    fn process_tool_definition() -> ToolDefinition {
373        ToolDefinition::raw(
374            "tool:process_prepare",
375            "process_prepare",
376            "Records preparation before background registration.",
377            serde_json::json!({
378                "type": "object",
379                "properties": {
380                    "input": { "type": "string" }
381                },
382                "additionalProperties": false
383            }),
384            serde_json::json!({ "type": "object", "additionalProperties": true }),
385        )
386    }
387
388    #[async_trait::async_trait]
389    impl ToolProvider for PrepareRecordingTool {
390        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
391            vec![process_tool_definition().manifest()]
392        }
393
394        fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
395            (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
396        }
397
398        async fn prepare_tool_call(
399            &self,
400            call: ToolPrepareCall<'_>,
401        ) -> Result<PreparedToolCall, ToolResult> {
402            self.prepares.fetch_add(1, Ordering::SeqCst);
403            Ok(PreparedToolCall::from_parts(
404                call.pending.call_id,
405                call.tool_id,
406                call.pending.tool_name,
407                call.pending.args,
408                call.pending.replay,
409                serde_json::json!({ "prepared": true }),
410            ))
411        }
412
413        async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
414            ToolResult::ok(serde_json::json!({
415                "payload": call.context.prepared_payload().clone(),
416            }))
417        }
418    }
419
420    #[tokio::test]
421    async fn process_handle_start_registers_prepared_tool_call() {
422        let prepares = Arc::new(AtomicUsize::new(0));
423        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
424            prepares: Arc::clone(&prepares),
425        });
426        let plugins = PluginHost::empty()
427            .build_session("root", None)
428            .expect("plugin session");
429        let tools = Arc::clone(&provider);
430        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
431            provider.tool_manifests(),
432            BTreeMap::new(),
433        ));
434        let host = Arc::new(crate::testing::MockSessionManager::default());
435        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
436        let dispatch = Arc::new(ToolDispatchContext {
437            plugins,
438            tools,
439            tool_catalog,
440            sessions: host.clone(),
441            session_lifecycle: host.clone(),
442            session_graph: host.clone(),
443            processes: host.clone(),
444            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
445            trigger_router: None,
446            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
447                crate::InlineRuntimeEffectController,
448            )),
449            direct_completions: crate::DirectCompletionClient::unavailable(
450                "direct completions are unavailable in this test context",
451            ),
452            parent_invocation: None,
453            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
454                crate::PluginOptions::default(),
455                crate::SessionPolicy::default(),
456            ),
457            session_id: "session".to_string(),
458            agent_frame_id: String::new(),
459            event_tx,
460            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
461            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
462            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
463            turn_context: crate::TurnContext::default(),
464            clock: std::sync::Arc::new(crate::SystemClock),
465        });
466        let context = RuntimeExecutionContext::new(
467            "session".to_string(),
468            dispatch,
469            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
470            Arc::new(crate::InMemoryAttachmentStore::new()),
471            Arc::new(crate::ChronologicalProjection::default()),
472            None,
473            crate::TurnContext::default(),
474        )
475        .with_execution_env_spec(crate::ProcessExecutionEnvSpec::new(
476            crate::PluginOptions::default(),
477            crate::runtime::tests::helpers::standard_test_policy(),
478        ));
479
480        let started = context
481            .start_tool_process(
482                "async-call-1".to_string(),
483                "process_prepare".to_string(),
484                serde_json::json!({ "input": "live" }),
485            )
486            .await;
487        let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
488            panic!("expected process handle output");
489        };
490        assert_eq!(
491            handle
492                .to_json_value()
493                .get("id")
494                .and_then(|value| value.as_str()),
495            Some("async-call-1")
496        );
497        assert_eq!(prepares.load(Ordering::SeqCst), 1);
498        let record = host
499            .process_registry
500            .get_process("async-call-1")
501            .await
502            .expect("registered process");
503        let ProcessInput::ToolCall { call } = record.input.as_ref() else {
504            panic!("expected prepared tool call process input");
505        };
506        assert_eq!(call.tool_name, "process_prepare");
507        assert_eq!(call.args, serde_json::json!({ "input": "live" }));
508        assert_eq!(
509            call.prepared_payload,
510            serde_json::json!({ "prepared": true })
511        );
512
513        let awaited = context
514            .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
515            .await;
516
517        assert!(awaited.output.is_success());
518        let record = awaited.record.expect("await record");
519        assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
520        assert_eq!(record.tool, "await_process");
521    }
522
523    #[tokio::test]
524    async fn process_handle_signal_appends_event_from_foreground() {
525        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
526            prepares: Arc::new(AtomicUsize::new(0)),
527        });
528        let plugins = PluginHost::empty()
529            .build_session("root", None)
530            .expect("plugin session");
531        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
532            provider.tool_manifests(),
533            BTreeMap::new(),
534        ));
535        let host = Arc::new(crate::testing::MockSessionManager::default());
536        host.process_registry
537            .register_process(
538                ProcessRegistration::new(
539                    "target-process",
540                    ProcessInput::External {
541                        metadata: serde_json::Value::Null,
542                    },
543                    crate::ProcessProvenance::host(),
544                )
545                .with_extra_event_types([crate::ProcessEventType {
546                    name: "signal.ready".to_string(),
547                    payload_schema: crate::LashSchema::any(),
548                    semantics: crate::ProcessEventSemanticsSpec::default(),
549                }]),
550            )
551            .await
552            .expect("register target process");
553        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
554        let dispatch = Arc::new(ToolDispatchContext {
555            plugins,
556            tools: provider,
557            tool_catalog,
558            sessions: host.clone(),
559            session_lifecycle: host.clone(),
560            session_graph: host.clone(),
561            processes: host.clone(),
562            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
563            trigger_router: None,
564            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
565                crate::InlineRuntimeEffectController,
566            )),
567            direct_completions: crate::DirectCompletionClient::unavailable(
568                "direct completions are unavailable in this test context",
569            ),
570            parent_invocation: None,
571            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
572                crate::PluginOptions::default(),
573                crate::SessionPolicy::default(),
574            ),
575            session_id: "session".to_string(),
576            agent_frame_id: String::new(),
577            event_tx,
578            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
579            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
580            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
581            turn_context: crate::TurnContext::default(),
582            clock: std::sync::Arc::new(crate::SystemClock),
583        });
584        let context = RuntimeExecutionContext::new(
585            "session".to_string(),
586            dispatch,
587            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
588            Arc::new(crate::InMemoryAttachmentStore::new()),
589            Arc::new(crate::ChronologicalProjection::default()),
590            None,
591            crate::TurnContext::default(),
592        );
593
594        let handle = json!({ "__handle__": "process", "id": "target-process" });
595        let signalled = context
596            .signal_process_handle(
597                "signal-1".to_string(),
598                handle,
599                "ready".to_string(),
600                json!({ "kind": "ping" }),
601            )
602            .await;
603
604        assert!(
605            signalled.output.is_success(),
606            "{:?}",
607            signalled.output.value_for_projection()
608        );
609        let record = signalled.record.expect("signal record");
610        assert_eq!(record.call_id.as_deref(), Some("signal-1"));
611        assert_eq!(record.tool, "signal_process");
612        let events = host
613            .process_registry
614            .events_after("target-process", 0)
615            .await
616            .expect("list events");
617        assert!(
618            events.iter().any(|event| event.event_type == "signal.ready"
619                && event.payload.get("kind") == Some(&json!("ping"))),
620            "expected appended signal.ready event, got {events:?}"
621        );
622    }
623
624    #[tokio::test]
625    async fn process_handle_await_and_cancel_require_session_grant() {
626        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
627            prepares: Arc::new(AtomicUsize::new(0)),
628        });
629        let plugins = PluginHost::empty()
630            .build_session("root", None)
631            .expect("plugin session");
632        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
633            provider.tool_manifests(),
634            BTreeMap::new(),
635        ));
636        let host = Arc::new(crate::testing::MockSessionManager::default());
637        host.process_registry
638            .register_process(ProcessRegistration::new(
639                "hidden-process",
640                ProcessInput::External {
641                    metadata: serde_json::Value::Null,
642                },
643                crate::ProcessProvenance::host(),
644            ))
645            .await
646            .expect("register hidden process");
647        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
648        let dispatch = Arc::new(ToolDispatchContext {
649            plugins,
650            tools: provider,
651            tool_catalog,
652            sessions: host.clone(),
653            session_lifecycle: host.clone(),
654            session_graph: host.clone(),
655            processes: host.clone(),
656            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
657            trigger_router: None,
658            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
659                crate::InlineRuntimeEffectController,
660            )),
661            direct_completions: crate::DirectCompletionClient::unavailable(
662                "direct completions are unavailable in this test context",
663            ),
664            parent_invocation: None,
665            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
666                crate::PluginOptions::default(),
667                crate::SessionPolicy::default(),
668            ),
669            session_id: "session".to_string(),
670            agent_frame_id: String::new(),
671            event_tx,
672            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
673            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
674            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
675            turn_context: crate::TurnContext::default(),
676            clock: std::sync::Arc::new(crate::SystemClock),
677        });
678        let context = RuntimeExecutionContext::new(
679            "session".to_string(),
680            dispatch,
681            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
682            Arc::new(crate::InMemoryAttachmentStore::new()),
683            Arc::new(crate::ChronologicalProjection::default()),
684            None,
685            crate::TurnContext::default(),
686        );
687        let handle = json!({
688            "__handle__": "process",
689            "id": "hidden-process"
690        });
691
692        let awaited = context
693            .await_process_handle("await-hidden-process".to_string(), handle.clone())
694            .await;
695        let cancelled = context
696            .cancel_process_handle("cancel-hidden-process".to_string(), handle)
697            .await;
698
699        assert!(!awaited.output.is_success());
700        assert!(!cancelled.output.is_success());
701        assert_eq!(
702            awaited
703                .record
704                .as_ref()
705                .and_then(|record| record.call_id.as_deref()),
706            Some("await-hidden-process")
707        );
708        assert_eq!(
709            cancelled
710                .record
711                .as_ref()
712                .and_then(|record| record.call_id.as_deref()),
713            Some("cancel-hidden-process")
714        );
715    }
716
717    #[tokio::test]
718    async fn process_handle_cancel_uses_host_cancel_ability() {
719        let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
720            prepares: Arc::new(AtomicUsize::new(0)),
721        });
722        let plugins = PluginHost::empty()
723            .build_session("root", None)
724            .expect("plugin session");
725        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
726            provider.tool_manifests(),
727            BTreeMap::new(),
728        ));
729        let host = Arc::new(crate::testing::MockSessionManager::default());
730        let ability = Arc::new(DenyCancelAbility::default());
731        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
732        let dispatch = Arc::new(ToolDispatchContext {
733            plugins,
734            tools: provider,
735            tool_catalog,
736            sessions: host.clone(),
737            session_lifecycle: host.clone(),
738            session_graph: host,
739            processes: Arc::new(crate::UnavailableProcessService),
740            process_cancel_ability: ability.clone(),
741            trigger_router: None,
742            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
743                crate::InlineRuntimeEffectController,
744            )),
745            direct_completions: crate::DirectCompletionClient::unavailable(
746                "direct completions are unavailable in this test context",
747            ),
748            parent_invocation: None,
749            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
750                crate::PluginOptions::default(),
751                crate::SessionPolicy::default(),
752            ),
753            session_id: "session".to_string(),
754            agent_frame_id: String::new(),
755            event_tx,
756            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
757            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
758            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
759            turn_context: crate::TurnContext::default(),
760            clock: std::sync::Arc::new(crate::SystemClock),
761        });
762        let context = RuntimeExecutionContext::new(
763            "session".to_string(),
764            dispatch,
765            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
766            Arc::new(crate::InMemoryAttachmentStore::new()),
767            Arc::new(crate::ChronologicalProjection::default()),
768            None,
769            crate::TurnContext::default(),
770        );
771
772        let cancelled = context
773            .cancel_process_handle(
774                "cancel-process-1".to_string(),
775                json!({
776                    "__handle__": "process",
777                    "id": "process-1"
778                }),
779            )
780            .await;
781
782        assert!(!cancelled.output.is_success());
783        assert_eq!(
784            cancelled.output.value_for_projection()["message"],
785            json!("cancel failed: plugin session error: denied by host")
786        );
787        assert_eq!(
788            ability.calls(),
789            vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
790        );
791    }
792}