1use serde_json::json;
2
3use super::execution_context::RuntimeExecutionContext;
4use super::tool_execution::ToolInvocationReply;
5use crate::tool_dispatch::ToolPreparationOutcome;
6use crate::{
7 ProcessHandleDescriptor, ProcessInput, ProcessRegistration, ToolCallOutput, ToolCallRecord,
8};
9
/// Expected value of the `__handle__` key in a JSON process handle;
/// `parse_process_handle` rejects handles whose kind differs from it.
const PROCESS_HANDLE_KIND: &str = "process";
11
12impl RuntimeExecutionContext<'_> {
13 pub(super) fn process_handle_value(id: &str, tool_name: &str) -> serde_json::Value {
14 let _ = tool_name;
15 Self::process_handle_json(id)
16 }
17
18 pub fn process_handle_json(id: &str) -> serde_json::Value {
19 json!({
20 "__handle__": "process",
21 "id": id,
22 })
23 }
24
25 pub(super) fn process_status_value(status: &crate::ProcessRecord) -> serde_json::Value {
26 json!({
27 "process_id": status.id,
28 "status": status.status.label(),
29 })
30 }
31
32 pub(super) fn parse_process_handle(
33 handle: &serde_json::Value,
34 ) -> Result<(String, Option<String>), String> {
35 let kind = handle
36 .get("__handle__")
37 .and_then(|value| value.as_str())
38 .ok_or_else(|| "Invalid process handle: missing `__handle__`".to_string())?;
39 if kind != PROCESS_HANDLE_KIND {
40 return Err(format!("Invalid process handle kind: {kind}"));
41 }
42 let id = handle
43 .get("id")
44 .and_then(|value| value.as_str())
45 .filter(|value| !value.is_empty())
46 .ok_or_else(|| "Invalid process handle: missing `id`".to_string())?;
47 let tool_name = handle
48 .get("tool")
49 .and_then(|value| value.as_str())
50 .map(str::to_string);
51 Ok((id.to_string(), tool_name))
52 }
53
54 pub(super) async fn start_tool_process(
55 &self,
56 call_id: String,
57 tool_name: String,
58 args: serde_json::Value,
59 ) -> ToolInvocationReply {
60 let handle_id = call_id.clone();
61 let pending_call = crate::sansio::PendingToolCall {
62 call_id: call_id.clone(),
63 tool_name: tool_name.clone(),
64 args: args.clone(),
65 replay: None,
66 };
67 let prepared_call = match self.prepare_tool_call(pending_call).await {
68 ToolPreparationOutcome::Prepared(prepared) => prepared,
69 ToolPreparationOutcome::Completed(outcome) => {
70 let mut record = outcome.record;
71 record.call_id = Some(call_id);
72 return ToolInvocationReply::from_output(record.output.clone()).with_record(record);
73 }
74 };
75 let registration = ProcessRegistration::session_start_draft(
76 handle_id.clone(),
77 ProcessInput::ToolCall {
78 call: prepared_call.clone(),
79 },
80 );
81 let registration = match self
82 .attach_captured_process_execution_env(registration)
83 .await
84 {
85 Ok(registration) => registration,
86 Err(err) => return ToolInvocationReply::error(json!(err.to_string())),
87 };
88 if let Err(err) =
89 self.dispatch
90 .processes
91 .start(
92 &self.session_id,
93 registration,
94 crate::ProcessStartOptions::new().with_descriptor(
95 ProcessHandleDescriptor::new(Some("tool"), Some(tool_name.clone())),
96 ),
97 self.process_scope(self.parent_invocation.clone()),
98 )
99 .await
100 {
101 return ToolInvocationReply::error(json!(err.to_string()));
102 }
103
104 let handle_value = Self::process_handle_value(&handle_id, &tool_name);
105 let record = ToolCallRecord {
106 call_id: Some(call_id),
107 tool: prepared_call.tool_name,
108 args: prepared_call.args,
109 output: ToolCallOutput::success(handle_value.clone()),
110 duration_ms: 0,
111 };
112 ToolInvocationReply::success(handle_value).with_record(record)
113 }
114
115 fn elapsed_ms(&self, started: std::time::Instant) -> u64 {
116 self.dispatch
117 .clock
118 .now()
119 .duration_since(started)
120 .as_millis() as u64
121 }
122
123 fn recorded_process_reply(
124 call_id: String,
125 tool: impl Into<String>,
126 args: serde_json::Value,
127 output: ToolCallOutput,
128 duration_ms: u64,
129 ) -> ToolInvocationReply {
130 let record = ToolCallRecord {
131 call_id: Some(call_id),
132 tool: tool.into(),
133 args,
134 output: output.clone(),
135 duration_ms,
136 };
137 ToolInvocationReply::from_output(output).with_record(record)
138 }
139
140 fn recorded_process_error(
141 call_id: String,
142 tool: &'static str,
143 args: serde_json::Value,
144 message: impl Into<String>,
145 duration_ms: u64,
146 ) -> ToolInvocationReply {
147 let output = ToolInvocationReply::error(json!(message.into())).output;
148 Self::recorded_process_reply(call_id, tool, args, output, duration_ms)
149 }
150
151 pub(super) async fn await_process_handle(
152 &self,
153 call_id: String,
154 handle: serde_json::Value,
155 ) -> ToolInvocationReply {
156 let started = self.dispatch.clock.now();
157 let args = json!({ "handle": handle.clone() });
158 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
159 Ok(parsed) => parsed,
160 Err(err) => {
161 return Self::recorded_process_error(
162 call_id,
163 "await_process",
164 args,
165 err,
166 self.elapsed_ms(started),
167 );
168 }
169 };
170 if !self.is_run_local_process(&handle_id)
174 && let Err(err) = self
175 .dispatch
176 .processes
177 .validate_visible(
178 &self.session_id,
179 std::slice::from_ref(&handle_id),
180 self.process_scope(self.parent_invocation.clone()),
181 )
182 .await
183 {
184 return Self::recorded_process_error(
185 call_id,
186 "await_process",
187 args,
188 err.to_string(),
189 self.elapsed_ms(started),
190 );
191 }
192 let output = self
193 .await_process_with_cancellation(
194 &handle_id,
195 self.parent_invocation.clone(),
196 self.cancellation_token.clone(),
197 )
198 .await;
199 let output = match output {
200 Ok(output) => output.into_tool_output(),
201 Err(err) => ToolInvocationReply::error(json!(err.to_string())).output,
202 };
203 Self::recorded_process_reply(
204 call_id,
205 "await_process",
206 args,
207 output,
208 self.elapsed_ms(started),
209 )
210 }
211
212 pub(super) async fn signal_process_handle(
213 &self,
214 call_id: String,
215 handle: serde_json::Value,
216 signal_name: String,
217 payload: serde_json::Value,
218 ) -> ToolInvocationReply {
219 let started = self.dispatch.clock.now();
220 let args = json!({
221 "handle": handle.clone(),
222 "signal_name": signal_name.clone(),
223 "payload": payload.clone()
224 });
225 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
226 Ok(parsed) => parsed,
227 Err(err) => {
228 return Self::recorded_process_error(
229 call_id,
230 "signal_process",
231 args,
232 err,
233 self.elapsed_ms(started),
234 );
235 }
236 };
237 let signal_id = format!("process-{call_id}");
238 let output = match self
239 .dispatch
240 .processes
241 .signal(
242 &self.session_id,
243 &handle_id,
244 signal_name,
245 signal_id,
246 payload,
247 self.process_scope(self.parent_invocation.clone()),
248 )
249 .await
250 {
251 Ok(event) => ToolCallOutput::success(json!({
252 "process_id": event.process_id,
253 "sequence": event.sequence,
254 })),
255 Err(err) => ToolInvocationReply::error(json!(format!("signal failed: {err}"))).output,
256 };
257 Self::recorded_process_reply(
258 call_id,
259 "signal_process",
260 args,
261 output,
262 self.elapsed_ms(started),
263 )
264 }
265
266 pub(super) async fn cancel_process_handle(
267 &self,
268 call_id: String,
269 handle: serde_json::Value,
270 ) -> ToolInvocationReply {
271 let started = self.dispatch.clock.now();
272 let args = json!({ "handle": handle.clone() });
273 let (handle_id, _hinted_tool_name) = match Self::parse_process_handle(&handle) {
274 Ok(parsed) => parsed,
275 Err(err) => {
276 return Self::recorded_process_error(
277 call_id,
278 "cancel_process",
279 args,
280 err,
281 self.elapsed_ms(started),
282 );
283 }
284 };
285 let result = if self.is_run_local_process(&handle_id) {
289 self.dispatch
290 .processes
291 .cancel(
292 &self.session_id,
293 &handle_id,
294 self.process_scope(self.parent_invocation.clone()),
295 )
296 .await
297 } else {
298 self.dispatch
299 .process_cancel_ability
300 .cancel(
301 self.dispatch.processes.as_ref(),
302 crate::ProcessCancelRequest::new(
303 &self.session_id,
304 &handle_id,
305 self.process_scope(self.parent_invocation.clone()),
306 crate::ProcessCancelSource::Process,
307 )
308 .with_handle(handle)
309 .with_reason("requested by process handle"),
310 )
311 .await
312 };
313 let output = match result {
314 Ok(status) => ToolCallOutput::success(Self::process_status_value(&status)),
315 Err(err) => ToolInvocationReply::error(json!(format!("cancel failed: {err}"))).output,
316 };
317 Self::recorded_process_reply(
318 call_id,
319 "cancel_process",
320 args,
321 output,
322 self.elapsed_ms(started),
323 )
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plugin::PluginHost;
    use crate::runtime::RuntimeEffectControllerHandle;
    use crate::tool_dispatch::ToolDispatchContext;
    use crate::{
        PreparedToolCall, ProcessRegistry, ToolCall, ToolDefinition, ToolPrepareCall, ToolProvider,
        ToolResult,
    };
    use std::collections::BTreeMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Tool provider that counts how many times `prepare_tool_call` runs.
    struct PrepareRecordingTool {
        prepares: Arc<AtomicUsize>,
    }

    /// Cancel ability that records every request and then refuses it.
    #[derive(Default)]
    struct DenyCancelAbility {
        calls: Mutex<Vec<(crate::ProcessCancelSource, String)>>,
    }

    impl DenyCancelAbility {
        /// Snapshot of the `(source, process id)` pairs seen so far.
        fn calls(&self) -> Vec<(crate::ProcessCancelSource, String)> {
            self.calls.lock().expect("cancel calls").clone()
        }
    }

    #[async_trait::async_trait]
    impl crate::ProcessCancelAbility for DenyCancelAbility {
        async fn cancel(
            &self,
            _processes: &dyn crate::ProcessService,
            request: crate::ProcessCancelRequest<'_>,
        ) -> Result<crate::ProcessRecord, crate::PluginError> {
            self.calls
                .lock()
                .expect("cancel calls")
                .push((request.source, request.process_id.to_string()));
            Err(crate::PluginError::Session("denied by host".to_string()))
        }
    }

    fn process_tool_definition() -> ToolDefinition {
        ToolDefinition::raw(
            "tool:process_prepare",
            "process_prepare",
            "Records preparation before background registration.",
            serde_json::json!({
                "type": "object",
                "properties": {
                    "input": { "type": "string" }
                },
                "additionalProperties": false
            }),
            serde_json::json!({ "type": "object", "additionalProperties": true }),
        )
    }

    #[async_trait::async_trait]
    impl ToolProvider for PrepareRecordingTool {
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            vec![process_tool_definition().manifest()]
        }

        fn resolve_contract(&self, name: &str) -> Option<Arc<crate::ToolContract>> {
            (name == "process_prepare").then(|| Arc::new(process_tool_definition().contract()))
        }

        async fn prepare_tool_call(
            &self,
            call: ToolPrepareCall<'_>,
        ) -> Result<PreparedToolCall, ToolResult> {
            self.prepares.fetch_add(1, Ordering::SeqCst);
            Ok(PreparedToolCall::from_parts(
                call.pending.call_id,
                call.tool_id,
                call.pending.tool_name,
                call.pending.args,
                call.pending.replay,
                serde_json::json!({ "prepared": true }),
            ))
        }

        async fn execute(&self, call: ToolCall<'_>) -> ToolResult {
            ToolResult::ok(serde_json::json!({
                "payload": call.context.prepared_payload().clone(),
            }))
        }
    }

    /// Wraps a `PrepareRecordingTool` that counts into `prepares` as a trait object.
    fn recording_provider(prepares: Arc<AtomicUsize>) -> Arc<dyn ToolProvider> {
        Arc::new(PrepareRecordingTool { prepares })
    }

    /// Builds the dispatch context shared by every test in this module.
    ///
    /// `host` backs the session, lifecycle, graph, and process services, and the
    /// default cancel ability is installed. Tests that need something different
    /// overwrite the relevant field on the returned value.
    ///
    /// The second tuple element is the receiving half of the event channel. It
    /// is returned as an opaque guard so the caller can keep it alive for the
    /// whole test, exactly as a local `_event_rx` binding would.
    fn test_dispatch(
        host: Arc<crate::testing::MockSessionManager>,
        provider: Arc<dyn ToolProvider>,
    ) -> (ToolDispatchContext, impl Sized) {
        let plugins = PluginHost::empty()
            .build_session("root", None)
            .expect("plugin session");
        let tool_catalog = Arc::new(crate::ToolCatalog::from_tools(
            provider.tool_manifests(),
            BTreeMap::new(),
        ));
        let (event_tx, event_rx) = tokio::sync::mpsc::channel(8);
        let dispatch = ToolDispatchContext {
            plugins,
            tools: provider,
            tool_catalog,
            sessions: host.clone(),
            session_lifecycle: host.clone(),
            session_graph: host.clone(),
            processes: host,
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
            clock: std::sync::Arc::new(crate::SystemClock),
        };
        (dispatch, event_rx)
    }

    /// Builds the execution context for session `"session"` around `dispatch`,
    /// using fresh in-memory stores.
    fn test_context(dispatch: ToolDispatchContext) -> RuntimeExecutionContext<'static> {
        RuntimeExecutionContext::new(
            "session".to_string(),
            Arc::new(dispatch),
            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        )
    }

    #[tokio::test]
    async fn process_handle_start_registers_prepared_tool_call() {
        let prepares = Arc::new(AtomicUsize::new(0));
        let host = Arc::new(crate::testing::MockSessionManager::default());
        let (dispatch, _event_rx) =
            test_dispatch(Arc::clone(&host), recording_provider(Arc::clone(&prepares)));
        let context = test_context(dispatch).with_execution_env_spec(
            crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::runtime::tests::helpers::standard_test_policy(),
            ),
        );

        let started = context
            .start_tool_process(
                "async-call-1".to_string(),
                "process_prepare".to_string(),
                serde_json::json!({ "input": "live" }),
            )
            .await;
        let crate::ToolCallOutcome::Success(handle) = started.output.outcome else {
            panic!("expected process handle output");
        };
        assert_eq!(
            handle
                .to_json_value()
                .get("id")
                .and_then(|value| value.as_str()),
            Some("async-call-1")
        );
        assert_eq!(prepares.load(Ordering::SeqCst), 1);
        let record = host
            .process_registry
            .get_process("async-call-1")
            .await
            .expect("registered process");
        let ProcessInput::ToolCall { call } = record.input.as_ref() else {
            panic!("expected prepared tool call process input");
        };
        assert_eq!(call.tool_name, "process_prepare");
        assert_eq!(call.args, serde_json::json!({ "input": "live" }));
        assert_eq!(
            call.prepared_payload,
            serde_json::json!({ "prepared": true })
        );

        let awaited = context
            .await_process_handle("await-async-call-1".to_string(), handle.to_json_value())
            .await;

        assert!(awaited.output.is_success());
        let record = awaited.record.expect("await record");
        assert_eq!(record.call_id.as_deref(), Some("await-async-call-1"));
        assert_eq!(record.tool, "await_process");
    }

    #[tokio::test]
    async fn process_handle_signal_appends_event_from_foreground() {
        let host = Arc::new(crate::testing::MockSessionManager::default());
        host.process_registry
            .register_process(
                ProcessRegistration::new(
                    "target-process",
                    ProcessInput::External {
                        metadata: serde_json::Value::Null,
                    },
                    crate::ProcessProvenance::host(),
                )
                .with_extra_event_types([crate::ProcessEventType {
                    name: "signal.ready".to_string(),
                    payload_schema: crate::LashSchema::any(),
                    semantics: crate::ProcessEventSemanticsSpec::default(),
                }]),
            )
            .await
            .expect("register target process");
        let (dispatch, _event_rx) = test_dispatch(
            Arc::clone(&host),
            recording_provider(Arc::new(AtomicUsize::new(0))),
        );
        let context = test_context(dispatch);

        let handle = json!({ "__handle__": "process", "id": "target-process" });
        let signalled = context
            .signal_process_handle(
                "signal-1".to_string(),
                handle,
                "ready".to_string(),
                json!({ "kind": "ping" }),
            )
            .await;

        assert!(
            signalled.output.is_success(),
            "{:?}",
            signalled.output.value_for_projection()
        );
        let record = signalled.record.expect("signal record");
        assert_eq!(record.call_id.as_deref(), Some("signal-1"));
        assert_eq!(record.tool, "signal_process");
        let events = host
            .process_registry
            .events_after("target-process", 0)
            .await
            .expect("list events");
        assert!(
            events.iter().any(|event| event.event_type == "signal.ready"
                && event.payload.get("kind") == Some(&json!("ping"))),
            "expected appended signal.ready event, got {events:?}"
        );
    }

    #[tokio::test]
    async fn process_handle_await_and_cancel_require_session_grant() {
        let host = Arc::new(crate::testing::MockSessionManager::default());
        host.process_registry
            .register_process(ProcessRegistration::new(
                "hidden-process",
                ProcessInput::External {
                    metadata: serde_json::Value::Null,
                },
                crate::ProcessProvenance::host(),
            ))
            .await
            .expect("register hidden process");
        let (dispatch, _event_rx) = test_dispatch(
            Arc::clone(&host),
            recording_provider(Arc::new(AtomicUsize::new(0))),
        );
        let context = test_context(dispatch);
        let handle = json!({
            "__handle__": "process",
            "id": "hidden-process"
        });

        let awaited = context
            .await_process_handle("await-hidden-process".to_string(), handle.clone())
            .await;
        let cancelled = context
            .cancel_process_handle("cancel-hidden-process".to_string(), handle)
            .await;

        assert!(!awaited.output.is_success());
        assert!(!cancelled.output.is_success());
        assert_eq!(
            awaited
                .record
                .as_ref()
                .and_then(|record| record.call_id.as_deref()),
            Some("await-hidden-process")
        );
        assert_eq!(
            cancelled
                .record
                .as_ref()
                .and_then(|record| record.call_id.as_deref()),
            Some("cancel-hidden-process")
        );
    }

    #[tokio::test]
    async fn process_handle_cancel_uses_host_cancel_ability() {
        let host = Arc::new(crate::testing::MockSessionManager::default());
        let ability = Arc::new(DenyCancelAbility::default());
        let (mut dispatch, _event_rx) =
            test_dispatch(host, recording_provider(Arc::new(AtomicUsize::new(0))));
        // This test runs with no usable process service and a cancel ability
        // that always refuses, so the shared defaults are replaced.
        dispatch.processes = Arc::new(crate::UnavailableProcessService);
        dispatch.process_cancel_ability = ability.clone();
        let context = test_context(dispatch);

        let cancelled = context
            .cancel_process_handle(
                "cancel-process-1".to_string(),
                json!({
                    "__handle__": "process",
                    "id": "process-1"
                }),
            )
            .await;

        assert!(!cancelled.output.is_success());
        assert_eq!(
            cancelled.output.value_for_projection()["message"],
            json!("cancel failed: plugin session error: denied by host")
        );
        assert_eq!(
            ability.calls(),
            vec![(crate::ProcessCancelSource::Process, "process-1".to_string())]
        );
    }
}