1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11 pub(super) session_id: String,
12 pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14 attachment_store: Arc<dyn crate::AttachmentStore>,
15 chronological_projection: Arc<crate::ChronologicalProjection>,
16 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17 turn_context: crate::TurnContext,
18 execution_env_spec: crate::ProcessExecutionEnvSpec,
19 process_originator: Option<crate::ProcessOriginator>,
20 pub(super) runtime_process_id: Option<String>,
21 pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22 process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23 process_wake_target: Option<crate::SessionScope>,
24 pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26 pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27 pub(super) cancellation_token: Option<CancellationToken>,
28 started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
33}
34
35#[derive(Clone)]
36pub(super) struct RuntimeExecutionProcessEventContext {
37 pub process_id: String,
38 pub registry: Arc<dyn crate::ProcessRegistry>,
39 pub store: Option<Arc<dyn crate::RuntimePersistence>>,
40 pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
41 pub queued_work_poke: Option<crate::QueuedWorkPoke>,
42}
43
44impl<'run> RuntimeExecutionContext<'run> {
45 pub(crate) fn drain_tool_trigger_outcomes(
46 &self,
47 ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48 self.dispatch
49 .trigger_outcomes
50 .drain()
51 .map_err(crate::PluginError::Session)
52 }
53
54 pub(super) fn process_scope(
55 &self,
56 parent_invocation: Option<crate::RuntimeInvocation>,
57 ) -> crate::ProcessOpScope<'_> {
58 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59 .with_parent_invocation(parent_invocation)
60 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61 }
62
63 #[allow(
64 clippy::too_many_arguments,
65 reason = "code execution bridge carries explicit per-turn runtime dependencies"
66 )]
67 pub(crate) fn new(
68 session_id: String,
69 dispatch: Arc<ToolDispatchContext<'run>>,
70 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71 attachment_store: Arc<dyn crate::AttachmentStore>,
72 chronological_projection: Arc<crate::ChronologicalProjection>,
73 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74 turn_context: crate::TurnContext,
75 ) -> Self {
76 Self {
77 session_id,
78 dispatch,
79 process_env_store,
80 attachment_store,
81 chronological_projection,
82 protocol_extension,
83 turn_context,
84 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85 crate::PluginOptions::default(),
86 crate::SessionPolicy::default(),
87 ),
88 process_originator: None,
89 runtime_process_id: None,
90 process_event_context: None,
91 started_process_ids: Arc::default(),
92 process_env_ref: None,
93 process_wake_target: None,
94 parent_invocation: None,
95 turn_phase_probe: None,
96 turn_event_tx: None,
97 cancellation_token: None,
98 }
99 }
100
101 pub fn session_id(&self) -> &str {
102 &self.session_id
103 }
104
105 pub fn session_scope(&self) -> crate::SessionScope {
106 self.dispatch
107 .agent_frame_id
108 .as_str()
109 .is_empty()
110 .then(|| crate::SessionScope::new(self.session_id.clone()))
111 .unwrap_or_else(|| {
112 crate::SessionScope::for_agent_frame(
113 self.session_id.clone(),
114 self.dispatch.agent_frame_id.clone(),
115 )
116 })
117 }
118
119 pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
120 self.dispatch
121 .trigger_router
122 .as_ref()
123 .map(crate::TriggerRouter::store)
124 }
125
126 pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
127 self.process_originator
128 .clone()
129 .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
130 }
131
132 pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
133 self.process_wake_target
134 .clone()
135 .or_else(|| Some(self.session_scope()))
136 }
137
138 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
139 Arc::clone(&self.attachment_store)
140 }
141
142 pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
143 Arc::clone(&self.process_env_store)
144 }
145
146 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
147 Arc::clone(&self.chronological_projection)
148 }
149
150 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
151 self.protocol_extension
152 .as_ref()
153 .and_then(|extension| extension.as_any().downcast_ref::<T>())
154 }
155
156 pub fn turn_context(&self) -> &crate::TurnContext {
157 &self.turn_context
158 }
159
160 pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
161 Arc::clone(&self.dispatch.tool_catalog)
162 }
163
164 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
165 self.dispatch.session_graph.as_ref()
166 }
167
168 pub(super) async fn emit_turn_activity(
169 &self,
170 correlation_id: TurnActivityId,
171 event: TurnEvent,
172 ) {
173 if let Some(tx) = &self.turn_event_tx {
174 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
175 }
176 }
177
178 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
179 self.turn_event_tx = Some(turn_event_tx);
180 self
181 }
182
183 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
184 self.parent_invocation = Some(metadata);
185 self
186 }
187
188 pub(crate) fn with_execution_env_spec(
189 mut self,
190 execution_env_spec: crate::ProcessExecutionEnvSpec,
191 ) -> Self {
192 self.execution_env_spec = execution_env_spec;
193 self
194 }
195
196 pub(crate) fn with_process_registration_context(
197 mut self,
198 registration: &crate::ProcessRegistration,
199 ) -> Self {
200 self.process_originator = Some(registration.provenance.originator.clone());
201 self.runtime_process_id = Some(registration.id.clone());
202 self.process_env_ref = registration.env_ref.clone();
203 self.process_wake_target = registration.wake_target.clone();
204 self
205 }
206
207 pub(crate) fn with_process_event_context(
208 mut self,
209 process_id: impl Into<String>,
210 registry: Arc<dyn crate::ProcessRegistry>,
211 store: Option<Arc<dyn crate::RuntimePersistence>>,
212 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
213 queued_work_poke: Option<crate::QueuedWorkPoke>,
214 ) -> Self {
215 self.process_event_context = Some(RuntimeExecutionProcessEventContext {
216 process_id: process_id.into(),
217 registry,
218 store,
219 session_store_factory,
220 queued_work_poke,
221 });
222 self
223 }
224
225 pub(super) fn record_started_process(&self, process_id: &str) {
229 self.started_process_ids
230 .lock()
231 .expect("started process ids lock")
232 .insert(process_id.to_string());
233 }
234
235 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
236 self.started_process_ids
237 .lock()
238 .expect("started process ids lock")
239 .contains(process_id)
240 }
241
242 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
243 self.process_originator
244 .clone()
245 .map(|originator| crate::ProcessSpawnProvenance {
246 originator,
247 wake_target: self.process_wake_target.clone(),
248 })
249 }
250
251 pub(super) async fn attach_captured_process_execution_env(
252 &self,
253 registration: crate::ProcessRegistration,
254 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
255 if registration.env_ref.is_some() {
256 return Ok(registration);
257 }
258 match registration.input.as_ref() {
259 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
260 let env_ref = self.captured_process_execution_env_ref().await?;
261 Ok(registration.with_execution_env_ref(Some(env_ref)))
262 }
263 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
264 Ok(registration)
265 }
266 }
267 }
268
269 pub async fn captured_process_execution_env_ref(
270 &self,
271 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
272 if let Some(env_ref) = self.process_env_ref.clone() {
273 return Ok(env_ref);
274 }
275 crate::persist_process_execution_env(
276 self.process_env_store.as_ref(),
277 &self.execution_env_spec,
278 )
279 .await
280 }
281
282 pub(crate) fn with_turn_phase_probe(
283 mut self,
284 probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
285 ) -> Self {
286 self.turn_phase_probe = probe;
287 self
288 }
289
290 #[doc(hidden)]
291 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
292 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
293 }
294
295 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
296 self.parent_invocation.as_ref()
297 }
298
299 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
300 self.cancellation_token = Some(cancellation_token);
301 self
302 }
303
304 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
305 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
306 }
307
308 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
309 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
310 }
311
312 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
313 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
314 }
315
316 pub fn tool_argument_projection_policy(
317 &self,
318 name: &str,
319 ) -> crate::ToolArgumentProjectionPolicy {
320 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
321 }
322
323 pub async fn start_child_process(
324 &self,
325 registration: crate::ProcessRegistration,
326 kind: impl Into<String>,
327 label: Option<String>,
328 ) -> crate::ToolInvocationReply {
329 let _phase = self.named_phase("process.start_child");
330 let registration = match self
331 .attach_captured_process_execution_env(registration)
332 .await
333 {
334 Ok(registration) => registration,
335 Err(err) => {
336 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
337 }
338 };
339 let process_id = registration.id.clone();
340 let mut options = crate::ProcessStartOptions::new()
341 .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
342 if let Some(spawn) = self.process_spawn_provenance() {
343 options = options.with_spawn_provenance(spawn);
344 }
345 match self
346 .dispatch
347 .processes
348 .start(
349 &self.session_id,
350 registration,
351 options,
352 self.process_scope(self.parent_invocation.clone()),
353 )
354 .await
355 {
356 Ok(_) => {
357 self.record_started_process(&process_id);
358 crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
359 }
360 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
361 }
362 }
363
364 pub async fn sleep_process(
365 &self,
366 scope: &str,
367 sequence: u64,
368 duration_ms: u64,
369 ) -> Result<(), crate::RuntimeEffectControllerError> {
370 let cancellation = self.cancellation_token.clone().unwrap_or_default();
371 let invocation = crate::runtime::causal::process_sleep_invocation(
372 &self.session_id,
373 self.parent_invocation.as_ref(),
374 scope,
375 sequence,
376 );
377 let outcome = self
378 .dispatch
379 .effect_controller
380 .controller()
381 .execute_effect(
382 crate::RuntimeEffectEnvelope::new(
383 invocation,
384 crate::RuntimeEffectCommand::Sleep { duration_ms },
385 ),
386 crate::RuntimeEffectLocalExecutor::sleep(cancellation),
387 )
388 .await?;
389 match outcome {
390 crate::RuntimeEffectOutcome::Sleep => Ok(()),
391 other => Err(crate::RuntimeEffectControllerError::new(
392 "runtime_effect_wrong_outcome",
393 format!("expected sleep outcome, got {}", other.kind().as_str()),
394 )),
395 }
396 }
397
398 pub async fn await_process_signal_event(
399 &self,
400 process_id: &str,
401 signal_name: &str,
402 event_ordinal: u64,
403 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
404 let cancellation = self.cancellation_token.clone().unwrap_or_default();
405 let key = self
406 .dispatch
407 .effect_controller
408 .controller()
409 .await_event_key(
410 &crate::ExecutionScope::process(process_id),
411 crate::AwaitEventWaitIdentity::process_signal(
412 process_id,
413 signal_name,
414 event_ordinal,
415 ),
416 )
417 .await?;
418 let invocation = crate::runtime::causal::process_await_event_invocation(
419 &self.session_id,
420 self.parent_invocation.as_ref(),
421 process_id,
422 signal_name,
423 event_ordinal,
424 );
425 let outcome = self
426 .dispatch
427 .effect_controller
428 .controller()
429 .execute_effect(
430 crate::RuntimeEffectEnvelope::new(
431 invocation,
432 crate::RuntimeEffectCommand::AwaitEvent { key },
433 ),
434 crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
435 )
436 .await?;
437 match outcome.into_await_event()? {
438 crate::Resolution::Ok(value) => Ok(value),
439 crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
440 err.code,
441 err.message,
442 )),
443 crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
444 "process_signal_wait_timeout",
445 "process signal wait timed out",
446 )),
447 crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
448 "process_signal_wait_cancelled",
449 "process signal wait was cancelled",
450 )),
451 }
452 }
453
454 pub async fn signal_process_by_id(
455 &self,
456 registry: Arc<dyn crate::ProcessRegistry>,
457 process_id: &str,
458 signal_name: &str,
459 signal_id: String,
460 payload: serde_json::Value,
461 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
462 let event_type = crate::process_signal_event_type(signal_name)?;
463 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
464 let signal_payload = payload.clone();
465 let command = crate::ProcessCommand::Signal {
466 process_id: process_id.to_string(),
467 signal_name: signal_name.to_string(),
468 signal_id,
469 request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
470 .with_replay_key(replay_key),
471 };
472 let effect_id = command.effect_id();
473 let invocation = crate::runtime::causal::process_effect_invocation(
474 &self.session_id,
475 self.parent_invocation.clone(),
476 &effect_id,
477 );
478 let outcome = self
479 .dispatch
480 .effect_controller
481 .controller()
482 .execute_effect(
483 crate::RuntimeEffectEnvelope::new(
484 invocation,
485 crate::RuntimeEffectCommand::process(command),
486 ),
487 crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
488 )
489 .await?;
490 match outcome.into_process()? {
491 crate::ProcessEffectOutcome::Signal { event } => {
492 let waiting_ordinal =
493 registry
494 .get_process(process_id)
495 .await
496 .and_then(|record| match record.wait {
497 Some(crate::WaitState {
498 kind:
499 crate::WaitKind::Signal {
500 name,
501 event_type: wait_event_type,
502 ordinal,
503 ..
504 },
505 ..
506 }) if name == signal_name && wait_event_type == event_type => {
507 Some(ordinal)
508 }
509 _ => None,
510 });
511 let ordinal = match waiting_ordinal {
512 Some(ordinal) => ordinal,
513 None => {
514 registry
515 .count_events_through(process_id, &event_type, event.sequence)
516 .await?
517 }
518 };
519 if ordinal > 0 {
520 let key = self
521 .dispatch
522 .effect_controller
523 .controller()
524 .await_event_key(
525 &crate::ExecutionScope::process(process_id),
526 crate::AwaitEventWaitIdentity::process_signal(
527 process_id,
528 signal_name,
529 ordinal,
530 ),
531 )
532 .await?;
533 let _ = self
534 .dispatch
535 .effect_controller
536 .controller()
537 .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
538 .await?;
539 }
540 Ok(event)
541 }
542 other => Err(crate::RuntimeEffectControllerError::new(
543 "runtime_effect_wrong_outcome",
544 format!("expected signal outcome, got {other:?}"),
545 )),
546 }
547 }
548
549 pub async fn append_process_event(
550 &self,
551 registry: Arc<dyn crate::ProcessRegistry>,
552 process_id: &str,
553 request: crate::ProcessEventAppendRequest,
554 ) -> Result<crate::ProcessEvent, crate::PluginError> {
555 let result = registry.append_event(process_id, request).await?;
556 if let Some(context) = self.process_event_context.as_ref() {
557 crate::tool_provider::process_events::enqueue_wake_delivery(
558 context.store.clone(),
559 context.session_store_factory.as_ref(),
560 result.wake_delivery,
561 Some(self.session_graph_service()),
562 context.queued_work_poke.as_ref(),
563 )
564 .await?;
565 }
566 Ok(result.event)
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573 use crate::tool_dispatch::ToolDispatchContext;
574 use crate::{ToolCall, ToolProvider, ToolResult};
575
576 struct NoopTools;
577
578 #[async_trait::async_trait]
579 impl ToolProvider for NoopTools {
580 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
581 Vec::new()
582 }
583
584 fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
585 None
586 }
587
588 async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
589 ToolResult::err_fmt("not used")
590 }
591 }
592
593 #[test]
594 fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
595 let tool = crate::ToolDefinition::raw(
596 "tool:seedy",
597 "seedy",
598 "Seed-aware",
599 crate::ToolDefinition::default_input_schema(),
600 serde_json::json!({ "type": "string" }),
601 )
602 .with_argument_projection(
603 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
604 );
605 let plugins = crate::plugin::PluginHost::empty()
606 .build_session("session", None)
607 .expect("plugin session");
608 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
609 let dispatch = Arc::new(ToolDispatchContext {
610 plugins,
611 tools: Arc::new(NoopTools),
612 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
613 vec![tool.manifest()],
614 std::collections::BTreeMap::new(),
615 )),
616 sessions: Arc::new(crate::testing::MockSessionManager::default()),
617 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
618 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
619 processes: Arc::new(crate::UnavailableProcessService),
620 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
621 trigger_router: None,
622 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
623 crate::InlineRuntimeEffectController,
624 )),
625 direct_completions: crate::DirectCompletionClient::unavailable(
626 "direct completions are unavailable in this test context",
627 ),
628 parent_invocation: None,
629 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
630 crate::PluginOptions::default(),
631 crate::SessionPolicy::default(),
632 ),
633 session_id: "session".to_string(),
634 agent_frame_id: String::new(),
635 event_tx,
636 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
637 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
638 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
639 turn_context: crate::TurnContext::default(),
640 });
641 let ctx = RuntimeExecutionContext::new(
642 "session".to_string(),
643 dispatch,
644 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
645 Arc::new(crate::InMemoryAttachmentStore::new()),
646 Arc::new(crate::ChronologicalProjection::default()),
647 None,
648 crate::TurnContext::default(),
649 );
650
651 assert_eq!(
652 ctx.tool_argument_projection_policy("seedy"),
653 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
654 );
655 assert_eq!(
656 ctx.tool_argument_projection_policy("missing"),
657 crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
658 );
659 }
660}