1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7 ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
10const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13 pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14 let _ = tool_name;
15 Self::process_handle_json(id)
16 }
17
18 pub fn process_handle_json(id: &str) -> serde_json::Value {
19 json!({
20 "__handle__": "process",
21 "id": id,
22 })
23 }
24
25 pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26 json!({
27 "process_id": status.id,
28 "status": status.status.label(),
29 })
30 }
31
32 pub(super) fn parse_process_handle(
33 handle: &serde_json::Value,
34 ) -> Result<(String, Option<String>), String> {
35 let kind = handle
36 .get("__handle__")
37 .and_then(|value| value.as_str())
38 .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39 if kind != PROCESS_HANDLE_KIND {
40 return Err(format!("Invalid process handle kind: {kind}"));
41 }
42 let id = handle
43 .get("id")
44 .and_then(|value| value.as_str())
45 .filter(|value| !value.is_empty())
46 .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47 let tool_name = handle
48 .get("tool")
49 .and_then(|value| value.as_str())
50 .map(str::to_string);
51 Ok((id.to_string(), tool_name))
52 }
53
54 pub(super) async fn start_tool_process(
55 &self,
56 call_id: String,
57 tool_name: String,
58 args: serde_json::Value,
59 ) -> ToolInvocationReply {
60 let handle_id = call_id.clone();
61 let pending_call = crate::sansio::PendingToolCall {
62 call_id: call_id.clone(),
63 tool_name: tool_name.clone(),
64 args: args.clone(),
65 replay: None,
66 };
67 let prepared_call = match self.prepare_tool_call(pending_call).await {
68 ToolPreparationOutcome::Prepared(prepared) => prepared,
69 ToolPreparationOutcome::Completed(outcome) => {
70 let mut record = outcome.record;
71 record.call_id = Some(call_id);
72 return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73 }
74 };
75 let registration = ProcessRegistration::session_start_draft(
76 handle_id.clone(),
77 ProcessInput::ToolCall {
78 call: prepared_call.clone(),
79 },
80 crate::RecoveryDisposition::Rerunnable,
83 );
84 let registration = match self
85 .attach_captured_process_execution_env(registration)
86 .await
87 {
88 Ok(registration) => registration,
89 Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
90 };
91 if let Err(err) =
92 self.dispatch
93 .processes
94 .start(
95 &self.session_id,
96 registration,
97 crate::ProcessStartOptions::new().with_descriptor(
98 ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
99 ),
100 self.process_scope(self.parent_invocation.clone()),
101 )
102 .await
103 {
104 return ToolInvocationReply::error(json!(err.to_string()));
105 }
106
107 let handle_value = Self::process_handle_value(&handle_id, &tool_name);
108 let record = ToolCallRecord {
109 call_id: Some(call_id),
110 tool: prepared_call.tool_name,
111 args: prepared_call.args,
112 output: ToolCallOutput::success(handle_value.clone()),
113 duration_ms: 0,
114 };
115 ToolInvocationReply::success(handle_value).with_record(record)
116 }
117
118 fn elapsed_ms(&self, started: std::time::Instant) -> u64 {
119 self.dispatch
120 .clock
121 .now()
122 .duration_since(started)
123 .as_millis() as u64
124 }
125
126 fn recorded_process_reply(
127 call_id: String,
128 tool: impl Into<String>,
129 args: serde_json::Value,
130 output: ToolCallOutput,
131 duration_ms: u64,
132 ) -> ToolInvocationReply {
133 let record = ToolCallRecord {
134 call_id: Some(call_id),
135 tool: tool.into(),
136 args,
137 output: output.clone(),
138 duration_ms,
139 };
140 ToolInvocationReply::from_output(output).with_record(record)
141 }
142
143 fn recorded_process_error(
144 call_id: String,
145 tool: &'static str,
146 args: serde_json::Value,
147 message: impl Into<String>,
148 duration_ms: u64,
149 ) -> ToolInvocationReply {
150 let output = ToolInvocationReply::error(json!(message.into())).output;
151 Self::recorded_process_reply(call_id, tool, args, output, duration_ms)
152 }
153
154 pub(super) async fn await_process_handle(
155 &self,
156 call_id: String,
157 handle: serde_json::Value,
158 ) -> ToolInvocationReply {
159 let started = self.dispatch.clock.now();
160 let args = json!({ "handle": handle.clone() });
161 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
162 Ok(parsed) => parsed,
163 Err(err) => {
164 return Self::recorded_process_error(
165 call_id,
166 "await_process",
167 args,
168 err,
169 self.elapsed_ms(started),
170 );
171 }
172 };
173 if !self.is_run_local_process(&handle_id)
177 && let Err(err) = self
178 .dispatch
179 .processes
180 .validate_visible(
181 &self.session_id,
182 std::slice::from_ref(&handle_id),
183 self.process_scope(self.parent_invocation.clone()),
184 )
185 .await
186 {
187 return Self::recorded_process_error(
188 call_id,
189 "await_process",
190 args,
191 err.to_string(),
192 self.elapsed_ms(started),
193 );
194 }
195 let output = self
196 .await_process_with_cancellation(
197 &handle_id,
198 self.parent_invocation.clone(),
199 self.cancellation_token.clone(),
200 )
201 .await;
202 let output = match output {
203 Ok(output) => output.into_tool_output(),
204 Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
205 };
206 Self::recorded_process_reply(
207 call_id,
208 "await_process",
209 args,
210 output,
211 self.elapsed_ms(started),
212 )
213 }
214
215 pub(super) async fn signal_process_handle(
216 &self,
217 call_id: String,
218 handle: serde_json::Value,
219 signal_name: String,
220 payload: serde_json::Value,
221 ) -> ToolInvocationReply {
222 let started = self.dispatch.clock.now();
223 let args = json!({
224 "handle": handle.clone(),
225 "signal_name": signal_name.clone(),
226 "payload": payload.clone()
227 });
228 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
229 Ok(parsed) => parsed,
230 Err(err) => {
231 return Self::recorded_process_error(
232 call_id,
233 "signal_process",
234 args,
235 err,
236 self.elapsed_ms(started),
237 );
238 }
239 };
240 let signal_id = format!("process-{call_id}");
241 let output = match self
242 .dispatch
243 .processes
244 .signal(
245 &self.session_id,
246 &handle_id,
247 signal_name,
248 signal_id,
249 payload,
250 self.process_scope(self.parent_invocation.clone()),
251 )
252 .await
253 {
254 Ok(event) => ToolCallOutput::success(json!({
255 "process_id": event.process_id,
256 "sequence": event.sequence,
257 })),
258 Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
259 };
260 Self::recorded_process_reply(
261 call_id,
262 "signal_process",
263 args,
264 output,
265 self.elapsed_ms(started),
266 )
267 }
268
269 pub(super) async fn cancel_process_handle(
270 &self,
271 call_id: String,
272 handle: serde_json::Value,
273 ) -> ToolInvocationReply {
274 let started = self.dispatch.clock.now();
275 let args = json!({ "handle": handle.clone() });
276 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
277 Ok(parsed) => parsed,
278 Err(err) => {
279 return Self::recorded_process_error(
280 call_id,
281 "cancel_process",
282 args,
283 err,
284 self.elapsed_ms(started),
285 );
286 }
287 };
288 let result = if self.is_run_local_process(&handle_id) {
292 self.dispatch
293 .processes
294 .cancel(
295 &self.session_id,
296 &handle_id,
297 self.process_scope(self.parent_invocation.clone()),
298 )
299 .await
300 } else {
301 self.dispatch
302 .process_cancel_ability
303 .cancel(
304 self.dispatch.processes.as_ref(),
305 crate::ProcessCancelRequest::new(
306 &self.session_id,
307 &handle_id,
308 self.process_scope(self.parent_invocation.clone()),
309 crate::ProcessCancelSource::Process,
310 )
311 .with_handle(handle)
312 .with_reason("requested by process handle"),
313 )
314 .await
315 };
316 let output = match result {
317 Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
318 Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
319 };
320 Self::recorded_process_reply(
321 call_id,
322 "cancel_process",
323 args,
324 output,
325 self.elapsed_ms(started),
326 )
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use crate::plugin::PluginHost;
334 use crate::runtime::RuntimeEffectControllerHandle;
335 use crate::tool_dispatch::ToolDispatchContext;
336 use crate::{
337 PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
338 ToolResult,
339 };
340 use std::collections::BTreeMap;
341 use std::sync::Arc;
342 use std::sync::Mutex;
343 use std::sync::atomic::{AtomicUsize, Ordering};
344
345 struct PrepareRecordingTool {
346 prepares: Arc<AtomicUsize>,
347 }
348
349 #[derive(Default)]
350 struct DenyCancelAbility {
351 calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
352 }
353
354 impl DenyCancelAbility {
355 fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
356 self.calls.lock().expect("cancel calls").clone()
357 }
358 }
359
360 #[async_trait::async_trait]
361 impl crate::ProcessCancelAbility for DenyCancelAbility {
362 async fn cancel(
363 &self,
364 _processes: &dyn crate::ProcessService,
365 request: crate::ProcessCancelRequest<'_>,
366 ) -> Result<crate::ProcessRecord, crate::PluginError> {
367 self.calls
368 .lock()
369 .expect("cancel calls")
370 .push((request.source, request.process_id.to_string()));
371 Err(crate::PluginError::Session("denied by host".to_string()))
372 }
373 }
374
375 fn process_tool_definition() -> ToolDefinition {
376 ToolDefinition::raw(
377 "tool:process_prepare",
378 "process_prepare",
379 "Records preparation before background registration.",
380 serde_json::json!({
381 "type": "object",
382 "properties": {
383 "input": { "type": "string" }
384 },
385 "additionalProperties": false
386 }),
387 serde_json::json!({ "type": "object", "additionalProperties": true }),
388 )
389 }
390
391 #[async_trait::async_trait]
392 impl ToolProvider for PrepareRecordingTool {
393 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
394 vec![process_tool_definition().manifest()]
395 }
396
397 fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
398 (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
399 }
400
401 async fn prepare_tool_call(
402 &self,
403 call: ToolPrepareCall<'_>,
404 ) -> Result<PreparedToolCall, ToolResult> {
405 self.prepares.fetch_add(1, Ordering::SeqCst);
406 Ok(PreparedToolCall::from_parts(
407 call.pending.call_id,
408 call.tool_id,
409 call.pending.tool_name,
410 call.pending.args,
411 call.pending.replay,
412 serde_json::json!({ "prepared": true }),
413 ))
414 }
415
416 async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
417 ToolResult::ok(serde_json::json!({
418 "payload": call.context.prepared_payload().clone(),
419 }))
420 }
421 }
422
423 #[tokio::test]
424 async fn process_handle_start_registers_prepared_tool_call() {
425 let prepares = Arc::new(AtomicUsize::new(0));
426 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
427 prepares: Arc::clone(&prepares),
428 });
429 let plugins = PluginHost::empty()
430 .build_session("root", None)
431 .expect("plugin session");
432 let tools = Arc::clone(&provider);
433 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
434 provider.tool_manifests(),
435 BTreeMap::new(),
436 ));
437 let host = Arc::new(crate::testing::MockSessionManager::default());
438 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
439 let dispatch = Arc::new(ToolDispatchContext {
440 plugins,
441 tools,
442 tool_catalog,
443 sessions: host.clone(),
444 session_lifecycle: host.clone(),
445 session_graph: host.clone(),
446 processes: host.clone(),
447 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
448 trigger_router: None,
449 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
450 crate::InlineRuntimeEffectController,
451 )),
452 direct_completions: crate::DirectCompletionClient::unavailable(
453 "direct completions are unavailable in this test context",
454 ),
455 parent_invocation: None,
456 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
457 crate::PluginOptions::default(),
458 crate::SessionPolicy::default(),
459 ),
460 session_id: "session".to_string(),
461 agent_frame_id: String::new(),
462 event_tx,
463 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
464 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
465 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
466 turn_context: crate::TurnContext::default(),
467 clock: std::sync::Arc::new(crate::SystemClock),
468 });
469 let context = RuntimeExecutionContext::new(
470 "session".to_string(),
471 dispatch,
472 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
473 Arc::new(crate::InMemoryAttachmentStore::new()),
474 Arc::new(crate::ChronologicalProjection::default()),
475 None,
476 crate::TurnContext::default(),
477 )
478 .with_execution_env_spec(crate::ProcessExecutionEnvSpec::new(
479 crate::PluginOptions::default(),
480 crate::runtime::tests::helpers::standard_test_policy(),
481 ));
482
483 let started = context
484 .start_tool_process(
485 "async-call-1".to_string(),
486 "process_prepare".to_string(),
487 serde_json::json!({ "input": "live" }),
488 )
489 .await;
490 let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
491 panic!("expected process handle output");
492 };
493 assert_eq!(
494 handle
495 .to_json_value()
496 .get("id")
497 .and_then(|value| value.as_str()),
498 Some("async-call-1")
499 );
500 assert_eq!(prepares.load(Ordering::SeqCst), 1);
501 let record = host
502 .process_registry
503 .get_process("async-call-1")
504 .await
505 .expect("registered process");
506 let ProcessInput::ToolCall { call } = record.input.as_ref() else {
507 panic!("expected prepared tool call process input");
508 };
509 assert_eq!(call.tool_name, "process_prepare");
510 assert_eq!(call.args, serde_json::json!({ "input": "live" }));
511 assert_eq!(
512 call.prepared_payload,
513 serde_json::json!({ "prepared": true })
514 );
515
516 let awaited = context
517 .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
518 .await;
519
520 assert!(awaited.output.is_success());
521 let record = awaited.record.expect("await record");
522 assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
523 assert_eq!(record.tool, "await_process");
524 }
525
526 #[tokio::test]
527 async fn process_handle_signal_appends_event_from_foreground() {
528 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
529 prepares: Arc::new(AtomicUsize::new(0)),
530 });
531 let plugins = PluginHost::empty()
532 .build_session("root", None)
533 .expect("plugin session");
534 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
535 provider.tool_manifests(),
536 BTreeMap::new(),
537 ));
538 let host = Arc::new(crate::testing::MockSessionManager::default());
539 host.process_registry
540 .register_process(
541 ProcessRegistration::new(
542 "target-process",
543 ProcessInput::External {
544 metadata: serde_json::Value::Null,
545 },
546 crate::RecoveryDisposition::ExternallyOwned,
547 crate::ProcessProvenance::host(),
548 )
549 .with_extra_event_types([crate::ProcessEventType {
550 name: "signal.ready".to_string(),
551 payload_schema: crate::LashSchema::any(),
552 semantics: crate::ProcessEventSemanticsSpec::default(),
553 }]),
554 )
555 .await
556 .expect("register target process");
557 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
558 let dispatch = Arc::new(ToolDispatchContext {
559 plugins,
560 tools: provider,
561 tool_catalog,
562 sessions: host.clone(),
563 session_lifecycle: host.clone(),
564 session_graph: host.clone(),
565 processes: host.clone(),
566 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
567 trigger_router: None,
568 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
569 crate::InlineRuntimeEffectController,
570 )),
571 direct_completions: crate::DirectCompletionClient::unavailable(
572 "direct completions are unavailable in this test context",
573 ),
574 parent_invocation: None,
575 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
576 crate::PluginOptions::default(),
577 crate::SessionPolicy::default(),
578 ),
579 session_id: "session".to_string(),
580 agent_frame_id: String::new(),
581 event_tx,
582 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
583 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
584 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
585 turn_context: crate::TurnContext::default(),
586 clock: std::sync::Arc::new(crate::SystemClock),
587 });
588 let context = RuntimeExecutionContext::new(
589 "session".to_string(),
590 dispatch,
591 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
592 Arc::new(crate::InMemoryAttachmentStore::new()),
593 Arc::new(crate::ChronologicalProjection::default()),
594 None,
595 crate::TurnContext::default(),
596 );
597
598 let handle = json!({ "__handle__": "process", "id": "target-process" });
599 let signalled = context
600 .signal_process_handle(
601 "signal-1".to_string(),
602 handle,
603 "ready".to_string(),
604 json!({ "kind": "ping" }),
605 )
606 .await;
607
608 assert!(
609 signalled.output.is_success(),
610 "{:?}",
611 signalled.output.value_for_projection()
612 );
613 let record = signalled.record.expect("signal record");
614 assert_eq!(record.call_id.as_deref(), Some("signal-1"));
615 assert_eq!(record.tool, "signal_process");
616 let events = host
617 .process_registry
618 .events_after("target-process", 0)
619 .await
620 .expect("list events");
621 assert!(
622 events.iter().any(|event| event.event_type == "signal.ready"
623 && event.payload.get("kind") == Some(&json!("ping"))),
624 "expected appended signal.ready event, got {events:?}"
625 );
626 }
627
628 #[tokio::test]
629 async fn process_handle_await_and_cancel_require_session_grant() {
630 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
631 prepares: Arc::new(AtomicUsize::new(0)),
632 });
633 let plugins = PluginHost::empty()
634 .build_session("root", None)
635 .expect("plugin session");
636 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
637 provider.tool_manifests(),
638 BTreeMap::new(),
639 ));
640 let host = Arc::new(crate::testing::MockSessionManager::default());
641 host.process_registry
642 .register_process(ProcessRegistration::new(
643 "hidden-process",
644 ProcessInput::External {
645 metadata: serde_json::Value::Null,
646 },
647 crate::RecoveryDisposition::ExternallyOwned,
648 crate::ProcessProvenance::host(),
649 ))
650 .await
651 .expect("register hidden process");
652 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
653 let dispatch = Arc::new(ToolDispatchContext {
654 plugins,
655 tools: provider,
656 tool_catalog,
657 sessions: host.clone(),
658 session_lifecycle: host.clone(),
659 session_graph: host.clone(),
660 processes: host.clone(),
661 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
662 trigger_router: None,
663 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
664 crate::InlineRuntimeEffectController,
665 )),
666 direct_completions: crate::DirectCompletionClient::unavailable(
667 "direct completions are unavailable in this test context",
668 ),
669 parent_invocation: None,
670 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
671 crate::PluginOptions::default(),
672 crate::SessionPolicy::default(),
673 ),
674 session_id: "session".to_string(),
675 agent_frame_id: String::new(),
676 event_tx,
677 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
678 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
679 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
680 turn_context: crate::TurnContext::default(),
681 clock: std::sync::Arc::new(crate::SystemClock),
682 });
683 let context = RuntimeExecutionContext::new(
684 "session".to_string(),
685 dispatch,
686 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
687 Arc::new(crate::InMemoryAttachmentStore::new()),
688 Arc::new(crate::ChronologicalProjection::default()),
689 None,
690 crate::TurnContext::default(),
691 );
692 let handle = json!({
693 "__handle__": "process",
694 "id": "hidden-process"
695 });
696
697 let awaited = context
698 .await_process_handle("await-hidden-process".to_string(), handle.clone())
699 .await;
700 let cancelled = context
701 .cancel_process_handle("cancel-hidden-process".to_string(), handle)
702 .await;
703
704 assert!(!awaited.output.is_success());
705 assert!(!cancelled.output.is_success());
706 assert_eq!(
707 awaited
708 .record
709 .as_ref()
710 .and_then(|record| record.call_id.as_deref()),
711 Some("await-hidden-process")
712 );
713 assert_eq!(
714 cancelled
715 .record
716 .as_ref()
717 .and_then(|record| record.call_id.as_deref()),
718 Some("cancel-hidden-process")
719 );
720 }
721
722 #[tokio::test]
723 async fn process_handle_cancel_uses_host_cancel_ability() {
724 let provider: Arc<dyn ToolProvider> = Arc::new(PrepareRecordingTool {
725 prepares: Arc::new(AtomicUsize::new(0)),
726 });
727 let plugins = PluginHost::empty()
728 .build_session("root", None)
729 .expect("plugin session");
730 let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
731 provider.tool_manifests(),
732 BTreeMap::new(),
733 ));
734 let host = Arc::new(crate::testing::MockSessionManager::default());
735 let ability = Arc::new(DenyCancelAbility::default());
736 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
737 let dispatch = Arc::new(ToolDispatchContext {
738 plugins,
739 tools: provider,
740 tool_catalog,
741 sessions: host.clone(),
742 session_lifecycle: host.clone(),
743 session_graph: host,
744 processes: Arc::new(crate::UnavailableProcessService),
745 process_cancel_ability: ability.clone(),
746 trigger_router: None,
747 effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
748 crate::InlineRuntimeEffectController,
749 )),
750 direct_completions: crate::DirectCompletionClient::unavailable(
751 "direct completions are unavailable in this test context",
752 ),
753 parent_invocation: None,
754 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
755 crate::PluginOptions::default(),
756 crate::SessionPolicy::default(),
757 ),
758 session_id: "session".to_string(),
759 agent_frame_id: String::new(),
760 event_tx,
761 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
762 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
763 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
764 turn_context: crate::TurnContext::default(),
765 clock: std::sync::Arc::new(crate::SystemClock),
766 });
767 let context = RuntimeExecutionContext::new(
768 "session".to_string(),
769 dispatch,
770 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
771 Arc::new(crate::InMemoryAttachmentStore::new()),
772 Arc::new(crate::ChronologicalProjection::default()),
773 None,
774 crate::TurnContext::default(),
775 );
776
777 let cancelled = context
778 .cancel_process_handle(
779 "cancel-process-1".to_string(),
780 json!({
781 "__handle__": "process",
782 "id": "process-1"
783 }),
784 )
785 .await;
786
787 assert!(!cancelled.output.is_success());
788 assert_eq!(
789 cancelled.output.value_for_projection()["message"],
790 json!("cancel failed: plugin session error: denied by host")
791 );
792 assert_eq!(
793 ability.calls(),
794 vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
795 );
796 }
797}