1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7 ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
10const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13 pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14 let _ = tool_name;
15 Self::process_handle_json(id)
16 }
17
18 pub fn process_handle_json(id: &str) -> serde_json::Value {
19 json!({
20 "__handle__": "process",
21 "id": id,
22 })
23 }
24
25 pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26 json!({
27 "process_id": status.id,
28 "status": status.status.label(),
29 })
30 }
31
32 pub(super) fn parse_process_handle(
33 handle: &serde_json::Value,
34 ) -> Result<(String, Option<String>), String> {
35 let kind = handle
36 .get("__handle__")
37 .and_then(|value| value.as_str())
38 .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39 if kind != PROCESS_HANDLE_KIND {
40 return Err(format!("Invalid process handle kind: {kind}"));
41 }
42 let id = handle
43 .get("id")
44 .and_then(|value| value.as_str())
45 .filter(|value| !value.is_empty())
46 .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47 let tool_name = handle
48 .get("tool")
49 .and_then(|value| value.as_str())
50 .map(str::to_string);
51 Ok((id.to_string(), tool_name))
52 }
53
54 pub(super) async fn start_tool_process(
55 &self,
56 call_id: String,
57 tool_name: String,
58 args: serde_json::Value,
59 ) -> ToolInvocationReply {
60 let handle_id = call_id.clone();
61 let pending_call = crate::sansio::PendingToolCall {
62 call_id: call_id.clone(),
63 tool_name: tool_name.clone(),
64 args: args.clone(),
65 replay: None,
66 };
67 let prepared_call = match self.prepare_tool_call(pending_call).await {
68 ToolPreparationOutcome::Prepared(prepared) => prepared,
69 ToolPreparationOutcome::Completed(outcome) => {
70 let mut record = outcome.record;
71 record.call_id = Some(call_id);
72 return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73 }
74 };
75 let registration = ProcessRegistration::session_start_draft(
76 handle_id.clone(),
77 ProcessInput::ToolCall {
78 call: prepared_call.clone(),
79 },
80 );
81 let registration = match self
82 .attach_captured_process_execution_env(registration)
83 .await
84 {
85 Ok(registration) => registration,
86 Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
87 };
88 if let Err(err) =
89 self.dispatch
90 .processes
91 .start(
92 &self.session_id,
93 registration,
94 crate::ProcessStartOptions::new().with_descriptor(
95 ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
96 ),
97 self.process_scope(self.parent_invocation.clone()),
98 )
99 .await
100 {
101 return ToolInvocationReply::error(json!(err.to_string()));
102 }
103
104 let handle_value = Self::process_handle_value(&handle_id, &tool_name);
105 let record = ToolCallRecord {
106 call_id: Some(call_id),
107 tool: prepared_call.tool_name,
108 args: prepared_call.args,
109 output: ToolCallOutput::success(handle_value.clone()),
110 duration_ms: 0,
111 };
112 ToolInvocationReply::success(handle_value).with_record(record)
113 }
114
115 fn recorded_process_reply(
116 call_id: String,
117 tool: impl Into<String>,
118 args: serde_json::Value,
119 output: ToolCallOutput,
120 started: std::time::Instant,
121 ) -> ToolInvocationReply {
122 let record = ToolCallRecord {
123 call_id: Some(call_id),
124 tool: tool.into(),
125 args,
126 output: output.clone(),
127 duration_ms: started.elapsed().as_millis() as u64,
128 };
129 ToolInvocationReply::from_output(output).with_record(record)
130 }
131
132 fn recorded_process_error(
133 call_id: String,
134 tool: &'static str,
135 args: serde_json::Value,
136 message: impl Into<String>,
137 started: std::time::Instant,
138 ) -> ToolInvocationReply {
139 let output = ToolInvocationReply::error(json!(message.into())).output;
140 Self::recorded_process_reply(call_id, tool, args, output, started)
141 }
142
143 pub(super) async fn await_process_handle(
144 &self,
145 call_id: String,
146 handle: serde_json::Value,
147 ) -> ToolInvocationReply {
148 let started = std::time::Instant::now();
149 let args = json!({ "handle": handle.clone() });
150 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
151 Ok(parsed) => parsed,
152 Err(err) => {
153 return Self::recorded_process_error(call_id, "await_process", args, err, started);
154 }
155 };
156 if !self.is_run_local_process(&handle_id)
160 && let Err(err) = self
161 .dispatch
162 .processes
163 .validate_visible(
164 &self.session_id,
165 std::slice::from_ref(&handle_id),
166 self.process_scope(self.parent_invocation.clone()),
167 )
168 .await
169 {
170 return Self::recorded_process_error(
171 call_id,
172 "await_process",
173 args,
174 err.to_string(),
175 started,
176 );
177 }
178 let output = self
179 .await_process_with_cancellation(
180 &handle_id,
181 self.parent_invocation.clone(),
182 self.cancellation_token.clone(),
183 )
184 .await;
185 let output = match output {
186 Ok(output) => output.into_tool_output(),
187 Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
188 };
189 Self::recorded_process_reply(call_id, "await_process", args, output, started)
190 }
191
192 pub(super) async fn signal_process_handle(
193 &self,
194 call_id: String,
195 handle: serde_json::Value,
196 signal_name: String,
197 payload: serde_json::Value,
198 ) -> ToolInvocationReply {
199 let started = std::time::Instant::now();
200 let args = json!({
201 "handle": handle.clone(),
202 "signal_name": signal_name.clone(),
203 "payload": payload.clone()
204 });
205 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
206 Ok(parsed) => parsed,
207 Err(err) => {
208 return Self::recorded_process_error(call_id, "signal_process", args, err, started);
209 }
210 };
211 let signal_id = format!("process-{call_id}");
212 let output = match self
213 .dispatch
214 .processes
215 .signal(
216 &self.session_id,
217 &handle_id,
218 signal_name,
219 signal_id,
220 payload,
221 self.process_scope(self.parent_invocation.clone()),
222 )
223 .await
224 {
225 Ok(event) => ToolCallOutput::success(json!({
226 "process_id": event.process_id,
227 "sequence": event.sequence,
228 })),
229 Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
230 };
231 Self::recorded_process_reply(call_id, "signal_process", args, output, started)
232 }
233
234 pub(super) async fn cancel_process_handle(
235 &self,
236 call_id: String,
237 handle: serde_json::Value,
238 ) -> ToolInvocationReply {
239 let started = std::time::Instant::now();
240 let args = json!({ "handle": handle.clone() });
241 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
242 Ok(parsed) => parsed,
243 Err(err) => {
244 return Self::recorded_process_error(call_id, "cancel_process", args, err, started);
245 }
246 };
247 let result = if self.is_run_local_process(&handle_id) {
251 self.dispatch
252 .processes
253 .cancel(
254 &self.session_id,
255 &handle_id,
256 self.process_scope(self.parent_invocation.clone()),
257 )
258 .await
259 } else {
260 self.dispatch
261 .process_cancel_ability
262 .cancel(
263 self.dispatch.processes.as_ref(),
264 crate::ProcessCancelRequest::new(
265 &self.session_id,
266 &handle_id,
267 self.process_scope(self.parent_invocation.clone()),
268 crate::ProcessCancelSource::Process,
269 )
270 .with_handle(handle)
271 .with_reason("requested by process handle"),
272 )
273 .await
274 };
275 let output = match result {
276 Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
277 Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
278 };
279 Self::recorded_process_reply(call_id, "cancel_process", args, output, started)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::plugin::PluginHost;
287 use crate::runtime::RuntimeEffectControllerHandle;
288 use crate::tool_dispatch::ToolDispatchContext;
289 use crate::{
290 PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
291 ToolResult,
292 };
293 use std::collections::BTreeMap;
294 use std::sync::Arc;
295 use std::sync::Mutex;
296 use std::sync::atomic::{AtomicUsize, Ordering};
297
298 struct PrepareRecordingTool {
299 prepares: Arc<AtomicUsize>,
300 }
301
302 #[derive(Default)]
303 struct DenyCancelAbility {
304 calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
305 }
306
307 impl DenyCancelAbility {
308 fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
309 self.calls.lock().expect("cancel calls").clone()
310 }
311 }
312
313 #[async_trait::async_trait]
314 impl crate::ProcessCancelAbility for DenyCancelAbility {
315 async fn cancel(
316 &self,
317 _processes: &dyn crate::ProcessService,
318 request: crate::ProcessCancelRequest<'_>,
319 ) -> Result<crate::ProcessRecord, crate::PluginError> {
320 self.calls
321 .lock()
322 .expect("cancel calls")
323 .push((request.source, request.process_id.to_string()));
324 Err(crate::PluginError::Session("denied by host".to_string()))
325 }
326 }
327
328 fn process_tool_definition() -> ToolDefinition {
329 ToolDefinition::raw(
330 "tool:process_prepare",
331 "process_prepare",
332 "Records preparation before background registration.",
333 serde_json::json!({
334 "type": "object",
335 "properties": {
336 "input": { "type": "string" }
337 },
338 "additionalProperties": false
339 }),
340 serde_json::json!({ "type": "object", "additionalProperties": true }),
341 )
342 }
343
344 #[async_trait::async_trait]
345 impl ToolProvider for PrepareRecordingTool {
346 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
347 vec![process_tool_definition().manifest()]
348 }
349
350 fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
351 (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
352 }
353
354 async fn prepare_tool_call(
355 &self,
356 call: ToolPrepareCall<'_>,
357 ) -> Result<PreparedToolCall, ToolResult> {
358 self.prepares.fetch_add(1, Ordering::SeqCst);
359 Ok(PreparedToolCall::from_parts(
360 call.pending.call_id,
361 call.pending.tool_name,
362 call.pending.args,
363 call.pending.replay,
364 serde_json::json!({ "prepared": true }),
365 ))
366 }
367
368 async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
369 ToolResult::ok(serde_json::json!({
370 "payload": call.context.prepared_payload().clone(),
371 }))
372 }
373 }
374
375 #[tokio::test]
376 async fn process_handle_start_registers_prepared_tool_call() {
377 let prepares = Arc::new(AtomicUsize::new(0));
378 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
379 prepares: Arc::clone(&prepares),
380 });
381 let plugins = PluginHost::empty()
382 .build_session("root", None)
383 .expect("plugin session");
384 let tools = Arc::clone(&provider);
385 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
386 provider.tool_manifests(),
387 BTreeMap::new(),
388 ));
389 let host = Arc::new(crate::testing::MockSessionManager::default());
390 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
391 let dispatch = Arc::new(ToolDispatchContext {
392 plugins,
393 tools,
394 tool_catalog,
395 sessions: host.clone(),
396 session_lifecycle: host.clone(),
397 session_graph: host.clone(),
398 processes: host.clone(),
399 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
400 trigger_router: None,
401 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
402 crate::InlineRuntimeEffectController,
403 )),
404 direct_completions: crate::DirectCompletionClient::unavailable(
405 "direct completions are unavailable in this test context",
406 ),
407 parent_invocation: None,
408 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
409 crate::PluginOptions::default(),
410 crate::SessionPolicy::default(),
411 ),
412 session_id: "session".to_string(),
413 agent_frame_id: String::new(),
414 event_tx,
415 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
416 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
417 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
418 turn_context: crate::TurnContext::default(),
419 });
420 let context = RuntimeExecutionContext::new(
421 "session".to_string(),
422 dispatch,
423 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
424 Arc::new(crate::InMemoryAttachmentStore::new()),
425 Arc::new(crate::ChronologicalProjection::default()),
426 None,
427 crate::TurnContext::default(),
428 )
429 .with_execution_env_spec(crate::ProcessExecutionEnvSpec::new(
430 crate::PluginOptions::default(),
431 crate::runtime::tests::helpers::standard_test_policy(),
432 ));
433
434 let started = context
435 .start_tool_process(
436 "async-call-1".to_string(),
437 "process_prepare".to_string(),
438 serde_json::json!({ "input": "live" }),
439 )
440 .await;
441 let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
442 panic!("expected process handle output");
443 };
444 assert_eq!(
445 handle
446 .to_json_value()
447 .get("id")
448 .and_then(|value| value.as_str()),
449 Some("async-call-1")
450 );
451 assert_eq!(prepares.load(Ordering::SeqCst), 1);
452 let record = host
453 .process_registry
454 .get_process("async-call-1")
455 .await
456 .expect("registered process");
457 let ProcessInput::ToolCall { call } = record.input.as_ref() else {
458 panic!("expected prepared tool call process input");
459 };
460 assert_eq!(call.tool_name, "process_prepare");
461 assert_eq!(call.args, serde_json::json!({ "input": "live" }));
462 assert_eq!(
463 call.prepared_payload,
464 serde_json::json!({ "prepared": true })
465 );
466
467 let awaited = context
468 .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
469 .await;
470
471 assert!(awaited.output.is_success());
472 let record = awaited.record.expect("await record");
473 assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
474 assert_eq!(record.tool, "await_process");
475 }
476
477 #[tokio::test]
478 async fn process_handle_signal_appends_event_from_foreground() {
479 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
480 prepares: Arc::new(AtomicUsize::new(0)),
481 });
482 let plugins = PluginHost::empty()
483 .build_session("root", None)
484 .expect("plugin session");
485 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
486 provider.tool_manifests(),
487 BTreeMap::new(),
488 ));
489 let host = Arc::new(crate::testing::MockSessionManager::default());
490 host.process_registry
491 .register_process(
492 ProcessRegistration::new(
493 "target-process",
494 ProcessInput::External {
495 metadata: serde_json::Value::Null,
496 },
497 crate::ProcessProvenance::host(),
498 )
499 .with_extra_event_types([crate::ProcessEventType {
500 name: "signal.ready".to_string(),
501 payload_schema: crate::LashSchema::any(),
502 semantics: crate::ProcessEventSemanticsSpec::default(),
503 }]),
504 )
505 .await
506 .expect("register target process");
507 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
508 let dispatch = Arc::new(ToolDispatchContext {
509 plugins,
510 tools: provider,
511 tool_catalog,
512 sessions: host.clone(),
513 session_lifecycle: host.clone(),
514 session_graph: host.clone(),
515 processes: host.clone(),
516 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
517 trigger_router: None,
518 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
519 crate::InlineRuntimeEffectController,
520 )),
521 direct_completions: crate::DirectCompletionClient::unavailable(
522 "direct completions are unavailable in this test context",
523 ),
524 parent_invocation: None,
525 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
526 crate::PluginOptions::default(),
527 crate::SessionPolicy::default(),
528 ),
529 session_id: "session".to_string(),
530 agent_frame_id: String::new(),
531 event_tx,
532 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
533 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
534 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
535 turn_context: crate::TurnContext::default(),
536 });
537 let context = RuntimeExecutionContext::new(
538 "session".to_string(),
539 dispatch,
540 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
541 Arc::new(crate::InMemoryAttachmentStore::new()),
542 Arc::new(crate::ChronologicalProjection::default()),
543 None,
544 crate::TurnContext::default(),
545 );
546
547 let handle = json!({ "__handle__": "process", "id": "target-process" });
548 let signalled = context
549 .signal_process_handle(
550 "signal-1".to_string(),
551 handle,
552 "ready".to_string(),
553 json!({ "kind": "ping" }),
554 )
555 .await;
556
557 assert!(
558 signalled.output.is_success(),
559 "{:?}",
560 signalled.output.value_for_projection()
561 );
562 let record = signalled.record.expect("signal record");
563 assert_eq!(record.call_id.as_deref(), Some("signal-1"));
564 assert_eq!(record.tool, "signal_process");
565 let events = host
566 .process_registry
567 .events_after("target-process", 0)
568 .await
569 .expect("list events");
570 assert!(
571 events.iter().any(|event| event.event_type == "signal.ready"
572 && event.payload.get("kind") == Some(&json!("ping"))),
573 "expected appended signal.ready event, got {events:?}"
574 );
575 }
576
577 #[tokio::test]
578 async fn process_handle_await_and_cancel_require_session_grant() {
579 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
580 prepares: Arc::new(AtomicUsize::new(0)),
581 });
582 let plugins = PluginHost::empty()
583 .build_session("root", None)
584 .expect("plugin session");
585 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
586 provider.tool_manifests(),
587 BTreeMap::new(),
588 ));
589 let host = Arc::new(crate::testing::MockSessionManager::default());
590 host.process_registry
591 .register_process(ProcessRegistration::new(
592 "hidden-process",
593 ProcessInput::External {
594 metadata: serde_json::Value::Null,
595 },
596 crate::ProcessProvenance::host(),
597 ))
598 .await
599 .expect("register hidden process");
600 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
601 let dispatch = Arc::new(ToolDispatchContext {
602 plugins,
603 tools: provider,
604 tool_catalog,
605 sessions: host.clone(),
606 session_lifecycle: host.clone(),
607 session_graph: host.clone(),
608 processes: host.clone(),
609 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
610 trigger_router: None,
611 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
612 crate::InlineRuntimeEffectController,
613 )),
614 direct_completions: crate::DirectCompletionClient::unavailable(
615 "direct completions are unavailable in this test context",
616 ),
617 parent_invocation: None,
618 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
619 crate::PluginOptions::default(),
620 crate::SessionPolicy::default(),
621 ),
622 session_id: "session".to_string(),
623 agent_frame_id: String::new(),
624 event_tx,
625 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
626 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
627 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
628 turn_context: crate::TurnContext::default(),
629 });
630 let context = RuntimeExecutionContext::new(
631 "session".to_string(),
632 dispatch,
633 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
634 Arc::new(crate::InMemoryAttachmentStore::new()),
635 Arc::new(crate::ChronologicalProjection::default()),
636 None,
637 crate::TurnContext::default(),
638 );
639 let handle = json!({
640 "__handle__": "process",
641 "id": "hidden-process"
642 });
643
644 let awaited = context
645 .await_process_handle("await-hidden-process".to_string(), handle.clone())
646 .await;
647 let cancelled = context
648 .cancel_process_handle("cancel-hidden-process".to_string(), handle)
649 .await;
650
651 assert!(!awaited.output.is_success());
652 assert!(!cancelled.output.is_success());
653 assert_eq!(
654 awaited
655 .record
656 .as_ref()
657 .and_then(|record| record.call_id.as_deref()),
658 Some("await-hidden-process")
659 );
660 assert_eq!(
661 cancelled
662 .record
663 .as_ref()
664 .and_then(|record| record.call_id.as_deref()),
665 Some("cancel-hidden-process")
666 );
667 }
668
669 #[tokio::test]
670 async fn process_handle_cancel_uses_host_cancel_ability() {
671 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
672 prepares: Arc::new(AtomicUsize::new(0)),
673 });
674 let plugins = PluginHost::empty()
675 .build_session("root", None)
676 .expect("plugin session");
677 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
678 provider.tool_manifests(),
679 BTreeMap::new(),
680 ));
681 let host = Arc::new(crate::testing::MockSessionManager::default());
682 let ability = Arc::new(DenyCancelAbility::default());
683 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
684 let dispatch = Arc::new(ToolDispatchContext {
685 plugins,
686 tools: provider,
687 tool_catalog,
688 sessions: host.clone(),
689 session_lifecycle: host.clone(),
690 session_graph: host,
691 processes: Arc::new(crate::UnavailableProcessService),
692 process_cancel_ability: ability.clone(),
693 trigger_router: None,
694 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
695 crate::InlineRuntimeEffectController,
696 )),
697 direct_completions: crate::DirectCompletionClient::unavailable(
698 "direct completions are unavailable in this test context",
699 ),
700 parent_invocation: None,
701 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
702 crate::PluginOptions::default(),
703 crate::SessionPolicy::default(),
704 ),
705 session_id: "session".to_string(),
706 agent_frame_id: String::new(),
707 event_tx,
708 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
709 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
710 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
711 turn_context: crate::TurnContext::default(),
712 });
713 let context = RuntimeExecutionContext::new(
714 "session".to_string(),
715 dispatch,
716 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
717 Arc::new(crate::InMemoryAttachmentStore::new()),
718 Arc::new(crate::ChronologicalProjection::default()),
719 None,
720 crate::TurnContext::default(),
721 );
722
723 let cancelled = context
724 .cancel_process_handle(
725 "cancel-process-1".to_string(),
726 json!({
727 "__handle__": "process",
728 "id": "process-1"
729 }),
730 )
731 .await;
732
733 assert!(!cancelled.output.is_success());
734 assert_eq!(
735 cancelled.output.value_for_projection()["message"],
736 json!("cancel failed: plugin session error: denied by host")
737 );
738 assert_eq!(
739 ability.calls(),
740 vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
741 );
742 }
743}