1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11 pub(super) session_id: String,
12 pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14 attachment_store: Arc<dyn crate::AttachmentStore>,
15 chronological_projection: Arc<crate::ChronologicalProjection>,
16 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17 turn_context: crate::TurnContext,
18 execution_env_spec: crate::ProcessExecutionEnvSpec,
19 process_originator: Option<crate::ProcessOriginator>,
20 pub(super) runtime_process_id: Option<String>,
21 pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22 process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23 process_wake_target: Option<crate::SessionScope>,
24 pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26 pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27 pub(super) cancellation_token: Option<CancellationToken>,
28 started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
33}
34
35#[derive(Clone)]
36pub(super) struct RuntimeExecutionProcessEventContext {
37 pub process_id: String,
38 pub registry: Arc<dyn crate::ProcessRegistry>,
39 pub store: Option<Arc<dyn crate::RuntimePersistence>>,
40 pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
41 pub queued_work_driver: Option<crate::QueuedWorkDriver>,
42}
43
44impl<'run> RuntimeExecutionContext<'run> {
45 pub(crate) fn drain_tool_trigger_outcomes(
46 &self,
47 ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48 self.dispatch
49 .trigger_outcomes
50 .drain()
51 .map_err(crate::PluginError::Session)
52 }
53
54 pub(super) fn process_scope(
55 &self,
56 parent_invocation: Option<crate::RuntimeInvocation>,
57 ) -> crate::ProcessOpScope<'_> {
58 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59 .with_parent_invocation(parent_invocation)
60 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61 }
62
63 #[allow(
64 clippy::too_many_arguments,
65 reason = "code execution bridge carries explicit per-turn runtime dependencies"
66 )]
67 pub(crate) fn new(
68 session_id: String,
69 dispatch: Arc<ToolDispatchContext<'run>>,
70 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71 attachment_store: Arc<dyn crate::AttachmentStore>,
72 chronological_projection: Arc<crate::ChronologicalProjection>,
73 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74 turn_context: crate::TurnContext,
75 ) -> Self {
76 Self {
77 session_id,
78 dispatch,
79 process_env_store,
80 attachment_store,
81 chronological_projection,
82 protocol_extension,
83 turn_context,
84 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85 crate::PluginOptions::default(),
86 crate::SessionPolicy::default(),
87 ),
88 process_originator: None,
89 runtime_process_id: None,
90 process_event_context: None,
91 started_process_ids: Arc::default(),
92 process_env_ref: None,
93 process_wake_target: None,
94 parent_invocation: None,
95 turn_phase_probe: None,
96 turn_event_tx: None,
97 cancellation_token: None,
98 }
99 }
100
101 pub fn session_id(&self) -> &str {
102 &self.session_id
103 }
104
105 pub fn execution_scope_id(&self) -> String {
106 self.dispatch
107 .effect_controller
108 .scoped()
109 .scope_id()
110 .to_string()
111 }
112
113 pub fn session_scope(&self) -> crate::SessionScope {
114 self.dispatch
115 .agent_frame_id
116 .as_str()
117 .is_empty()
118 .then(|| crate::SessionScope::new(self.session_id.clone()))
119 .unwrap_or_else(|| {
120 crate::SessionScope::for_agent_frame(
121 self.session_id.clone(),
122 self.dispatch.agent_frame_id.clone(),
123 )
124 })
125 }
126
127 pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
128 self.dispatch
129 .trigger_router
130 .as_ref()
131 .map(crate::TriggerRouter::store)
132 }
133
134 pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
135 self.process_originator
136 .clone()
137 .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
138 }
139
140 pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
141 self.process_wake_target
142 .clone()
143 .or_else(|| Some(self.session_scope()))
144 }
145
146 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
147 Arc::clone(&self.attachment_store)
148 }
149
150 pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
151 Arc::clone(&self.process_env_store)
152 }
153
154 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
155 Arc::clone(&self.chronological_projection)
156 }
157
158 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
159 self.protocol_extension
160 .as_ref()
161 .and_then(|extension| extension.as_any().downcast_ref::<T>())
162 }
163
164 pub fn turn_context(&self) -> &crate::TurnContext {
165 &self.turn_context
166 }
167
168 pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
169 Arc::clone(&self.dispatch.tool_catalog)
170 }
171
172 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
173 self.dispatch.session_graph.as_ref()
174 }
175
176 pub(super) async fn emit_turn_activity(
177 &self,
178 correlation_id: TurnActivityId,
179 event: TurnEvent,
180 ) {
181 if let Some(tx) = &self.turn_event_tx {
182 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
183 }
184 }
185
186 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
187 self.turn_event_tx = Some(turn_event_tx);
188 self
189 }
190
191 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
192 self.parent_invocation = Some(metadata);
193 self
194 }
195
196 pub(crate) fn with_execution_env_spec(
197 mut self,
198 execution_env_spec: crate::ProcessExecutionEnvSpec,
199 ) -> Self {
200 self.execution_env_spec = execution_env_spec;
201 self
202 }
203
204 pub(crate) fn with_process_registration_context(
205 mut self,
206 registration: &crate::ProcessRegistration,
207 ) -> Self {
208 self.process_originator = Some(registration.provenance.originator.clone());
209 self.runtime_process_id = Some(registration.id.clone());
210 self.process_env_ref = registration.env_ref.clone();
211 self.process_wake_target = registration.wake_target.clone();
212 self
213 }
214
215 pub(crate) fn with_process_event_context(
216 mut self,
217 process_id: impl Into<String>,
218 registry: Arc<dyn crate::ProcessRegistry>,
219 store: Option<Arc<dyn crate::RuntimePersistence>>,
220 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
221 queued_work_driver: Option<crate::QueuedWorkDriver>,
222 ) -> Self {
223 self.process_event_context = Some(RuntimeExecutionProcessEventContext {
224 process_id: process_id.into(),
225 registry,
226 store,
227 session_store_factory,
228 queued_work_driver,
229 });
230 self
231 }
232
233 pub(super) fn record_started_process(&self, process_id: &str) {
237 self.started_process_ids
238 .lock()
239 .expect("started process ids lock")
240 .insert(process_id.to_string());
241 }
242
243 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
244 self.started_process_ids
245 .lock()
246 .expect("started process ids lock")
247 .contains(process_id)
248 }
249
250 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
251 self.process_originator
252 .clone()
253 .map(|originator| crate::ProcessSpawnProvenance {
254 originator,
255 wake_target: self.process_wake_target.clone(),
256 })
257 }
258
259 pub(super) async fn attach_captured_process_execution_env(
260 &self,
261 registration: crate::ProcessRegistration,
262 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
263 if registration.env_ref.is_some() {
264 return Ok(registration);
265 }
266 match registration.input.as_ref() {
267 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
268 let env_ref = self.captured_process_execution_env_ref().await?;
269 Ok(registration.with_execution_env_ref(Some(env_ref)))
270 }
271 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
272 Ok(registration)
273 }
274 }
275 }
276
277 pub async fn captured_process_execution_env_ref(
278 &self,
279 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
280 if let Some(env_ref) = self.process_env_ref.clone() {
281 return Ok(env_ref);
282 }
283 crate::persist_process_execution_env(
284 self.process_env_store.as_ref(),
285 &self.execution_env_spec,
286 )
287 .await
288 }
289
290 pub(crate) fn with_turn_phase_probe(
291 mut self,
292 probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
293 ) -> Self {
294 self.turn_phase_probe = probe;
295 self
296 }
297
298 #[doc(hidden)]
299 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
300 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
301 }
302
303 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
304 self.parent_invocation.as_ref()
305 }
306
307 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
308 self.cancellation_token = Some(cancellation_token);
309 self
310 }
311
312 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
313 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
314 }
315
316 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
317 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
318 }
319
320 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
321 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
322 }
323
324 pub fn tool_argument_projection_policy(
325 &self,
326 name: &str,
327 ) -> crate::ToolArgumentProjectionPolicy {
328 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
329 }
330
331 pub async fn start_child_process(
332 &self,
333 registration: crate::ProcessRegistration,
334 kind: impl Into<String>,
335 label: Option<String>,
336 ) -> crate::ToolInvocationReply {
337 let _phase = self.named_phase("process.start_child");
338 let registration = match self
339 .attach_captured_process_execution_env(registration)
340 .await
341 {
342 Ok(registration) => registration,
343 Err(err) => {
344 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
345 }
346 };
347 let process_id = registration.id.clone();
348 let mut options = crate::ProcessStartOptions::new()
349 .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
350 if let Some(spawn) = self.process_spawn_provenance() {
351 options = options.with_spawn_provenance(spawn);
352 }
353 match self
354 .dispatch
355 .processes
356 .start(
357 &self.session_id,
358 registration,
359 options,
360 self.process_scope(self.parent_invocation.clone()),
361 )
362 .await
363 {
364 Ok(_) => {
365 self.record_started_process(&process_id);
366 crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
367 }
368 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
369 }
370 }
371
372 pub async fn sleep_process(
373 &self,
374 scope: &str,
375 sequence: u64,
376 duration_ms: u64,
377 ) -> Result<(), crate::RuntimeEffectControllerError> {
378 let cancellation = self.cancellation_token.clone().unwrap_or_default();
379 let invocation = crate::runtime::causal::process_sleep_invocation(
380 &self.session_id,
381 self.parent_invocation.as_ref(),
382 scope,
383 sequence,
384 );
385 let outcome = self
386 .dispatch
387 .effect_controller
388 .controller()
389 .execute_effect(
390 crate::RuntimeEffectEnvelope::new(
391 invocation,
392 crate::RuntimeEffectCommand::Sleep { duration_ms },
393 ),
394 crate::RuntimeEffectLocalExecutor::sleep_with_clock(
395 cancellation,
396 std::sync::Arc::clone(&self.dispatch.clock),
397 ),
398 )
399 .await?;
400 match outcome {
401 crate::RuntimeEffectOutcome::Sleep => Ok(()),
402 other => Err(crate::RuntimeEffectControllerError::new(
403 "runtime_effect_wrong_outcome",
404 format!("expected sleep outcome, got {}", other.kind().as_str()),
405 )),
406 }
407 }
408
409 pub async fn await_process_signal_event(
410 &self,
411 process_id: &str,
412 signal_name: &str,
413 event_ordinal: u64,
414 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
415 let cancellation = self.cancellation_token.clone().unwrap_or_default();
416 let key = self
417 .dispatch
418 .effect_controller
419 .controller()
420 .await_event_key(
421 &crate::ExecutionScope::process(process_id),
422 crate::AwaitEventWaitIdentity::process_signal(
423 process_id,
424 signal_name,
425 event_ordinal,
426 ),
427 )
428 .await?;
429 let invocation = crate::runtime::causal::process_await_event_invocation(
430 &self.session_id,
431 self.parent_invocation.as_ref(),
432 process_id,
433 signal_name,
434 event_ordinal,
435 );
436 let outcome = self
437 .dispatch
438 .effect_controller
439 .controller()
440 .execute_effect(
441 crate::RuntimeEffectEnvelope::new(
442 invocation,
443 crate::RuntimeEffectCommand::AwaitEvent { key },
444 ),
445 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
446 cancellation,
447 None,
448 std::sync::Arc::clone(&self.dispatch.clock),
449 ),
450 )
451 .await?;
452 match outcome.into_await_event()? {
453 crate::Resolution::Ok(value) => Ok(value),
454 crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
455 err.code,
456 err.message,
457 )),
458 crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
459 "process_signal_wait_timeout",
460 "process signal wait timed out",
461 )),
462 crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
463 "process_signal_wait_cancelled",
464 "process signal wait was cancelled",
465 )),
466 }
467 }
468
469 pub async fn signal_process_by_id(
470 &self,
471 registry: Arc<dyn crate::ProcessRegistry>,
472 process_id: &str,
473 signal_name: &str,
474 signal_id: String,
475 payload: serde_json::Value,
476 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
477 let event_type = crate::process_signal_event_type(signal_name)?;
478 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
479 let signal_payload = payload.clone();
480 let command = crate::ProcessCommand::Signal {
481 process_id: process_id.to_string(),
482 signal_name: signal_name.to_string(),
483 signal_id,
484 request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
485 .with_replay_key(replay_key),
486 };
487 let effect_id = command.effect_id();
488 let invocation = crate::runtime::causal::process_effect_invocation(
489 &self.session_id,
490 self.parent_invocation.clone(),
491 &effect_id,
492 );
493 let outcome = self
494 .dispatch
495 .effect_controller
496 .controller()
497 .execute_effect(
498 crate::RuntimeEffectEnvelope::new(
499 invocation,
500 crate::RuntimeEffectCommand::process(command),
501 ),
502 crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
503 )
504 .await?;
505 match outcome.into_process()? {
506 crate::ProcessEffectOutcome::Signal { event } => {
507 let waiting_ordinal =
508 registry
509 .get_process(process_id)
510 .await
511 .and_then(|record| match record.wait {
512 Some(crate::WaitState {
513 kind:
514 crate::WaitKind::Signal {
515 name,
516 event_type: wait_event_type,
517 ordinal,
518 ..
519 },
520 ..
521 }) if name == signal_name && wait_event_type == event_type => {
522 Some(ordinal)
523 }
524 _ => None,
525 });
526 let ordinal = match waiting_ordinal {
527 Some(ordinal) => ordinal,
528 None => {
529 registry
530 .count_events_through(process_id, &event_type, event.sequence)
531 .await?
532 }
533 };
534 if ordinal > 0 {
535 let key = self
536 .dispatch
537 .effect_controller
538 .controller()
539 .await_event_key(
540 &crate::ExecutionScope::process(process_id),
541 crate::AwaitEventWaitIdentity::process_signal(
542 process_id,
543 signal_name,
544 ordinal,
545 ),
546 )
547 .await?;
548 let _ = self
549 .dispatch
550 .effect_controller
551 .controller()
552 .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
553 .await?;
554 }
555 Ok(event)
556 }
557 other => Err(crate::RuntimeEffectControllerError::new(
558 "runtime_effect_wrong_outcome",
559 format!("expected signal outcome, got {other:?}"),
560 )),
561 }
562 }
563
564 pub async fn append_process_event(
565 &self,
566 registry: Arc<dyn crate::ProcessRegistry>,
567 process_id: &str,
568 request: crate::ProcessEventAppendRequest,
569 ) -> Result<crate::ProcessEvent, crate::PluginError> {
570 let result = registry.append_event(process_id, request).await?;
571 if let Some(context) = self.process_event_context.as_ref() {
572 crate::tool_provider::process_events::enqueue_wake_delivery(
573 context.store.clone(),
574 context.session_store_factory.as_ref(),
575 result.wake_delivery,
576 Some(self.session_graph_service()),
577 context.queued_work_driver.as_ref(),
578 )
579 .await?;
580 }
581 Ok(result.event)
582 }
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tool_dispatch::ToolDispatchContext;
    use crate::{ToolCall, ToolProvider, ToolResult};

    /// Tool provider stub: exposes no manifests or contracts, and every
    /// execution yields an error result.
    struct NoopTools;

    #[async_trait::async_trait]
    impl ToolProvider for NoopTools {
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            vec![]
        }

        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
            None
        }

        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
            ToolResult::err_fmt("not used")
        }
    }

    /// A tool present in the active catalog reports its own projection policy;
    /// an unknown tool name falls back to `MaterializeProjectedValues`.
    #[test]
    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
        let seedy_tool = crate::ToolDefinition::raw(
            "tool:seedy",
            "seedy",
            "Seed-aware",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "string" }),
        )
        .with_argument_projection(
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
        );
        let plugin_session = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        // The receiver stays bound so the channel remains open for the test.
        let (activity_tx, _activity_rx) = tokio::sync::mpsc::channel(1);
        let catalog = Arc::new(crate::ToolCatalog::from_tools(
            vec![seedy_tool.manifest()],
            std::collections::BTreeMap::new(),
        ));
        let default_env_spec = crate::ProcessExecutionEnvSpec::new(
            crate::PluginOptions::default(),
            crate::SessionPolicy::default(),
        );
        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: catalog,
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: default_env_spec,
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx: activity_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
            clock: Arc::new(crate::SystemClock),
        });
        let context = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        // Known tool: the policy declared on the tool definition.
        assert_eq!(
            context.tool_argument_projection_policy("seedy"),
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
        );
        // Unknown tool: the default policy.
        assert_eq!(
            context.tool_argument_projection_policy("missing"),
            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
        );
    }
}