1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11 pub(super) session_id: String,
12 pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14 attachment_store: Arc<dyn crate::AttachmentStore>,
15 chronological_projection: Arc<crate::ChronologicalProjection>,
16 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17 turn_context: crate::TurnContext,
18 execution_env_spec: crate::ProcessExecutionEnvSpec,
19 process_originator: Option<crate::ProcessOriginator>,
20 pub(super) runtime_process_id: Option<String>,
21 pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22 process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23 process_wake_target: Option<crate::SessionScope>,
24 pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26 pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27 pub(super) cancellation_token: Option<CancellationToken>,
28 started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
33}
34
35#[derive(Clone)]
36pub(super) struct RuntimeExecutionProcessEventContext {
37 pub process_id: String,
38 pub registry: Arc<dyn crate::ProcessRegistry>,
39 pub store: Option<Arc<dyn crate::RuntimePersistence>>,
40 pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
41 pub queued_work_driver: Option<crate::QueuedWorkDriver>,
42}
43
44impl<'run> RuntimeExecutionContext<'run> {
45 pub(crate) fn drain_tool_trigger_outcomes(
46 &self,
47 ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48 self.dispatch
49 .trigger_outcomes
50 .drain()
51 .map_err(crate::PluginError::Session)
52 }
53
54 pub(super) fn process_scope(
55 &self,
56 parent_invocation: Option<crate::RuntimeInvocation>,
57 ) -> crate::ProcessOpScope<'_> {
58 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59 .with_parent_invocation(parent_invocation)
60 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61 }
62
63 #[allow(
64 clippy::too_many_arguments,
65 reason = "code execution bridge carries explicit per-turn runtime dependencies"
66 )]
67 pub(crate) fn new(
68 session_id: String,
69 dispatch: Arc<ToolDispatchContext<'run>>,
70 process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71 attachment_store: Arc<dyn crate::AttachmentStore>,
72 chronological_projection: Arc<crate::ChronologicalProjection>,
73 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74 turn_context: crate::TurnContext,
75 ) -> Self {
76 Self {
77 session_id,
78 dispatch,
79 process_env_store,
80 attachment_store,
81 chronological_projection,
82 protocol_extension,
83 turn_context,
84 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85 crate::PluginOptions::default(),
86 crate::SessionPolicy::default(),
87 ),
88 process_originator: None,
89 runtime_process_id: None,
90 process_event_context: None,
91 started_process_ids: Arc::default(),
92 process_env_ref: None,
93 process_wake_target: None,
94 parent_invocation: None,
95 turn_phase_probe: None,
96 turn_event_tx: None,
97 cancellation_token: None,
98 }
99 }
100
101 pub fn session_id(&self) -> &str {
102 &self.session_id
103 }
104
105 pub fn execution_scope_id(&self) -> String {
106 self.dispatch
107 .effect_controller
108 .scoped()
109 .scope_id()
110 .to_string()
111 }
112
113 pub fn session_scope(&self) -> crate::SessionScope {
114 if self.dispatch.agent_frame_id.is_empty() {
115 crate::SessionScope::new(self.session_id.clone())
116 } else {
117 crate::SessionScope::for_agent_frame(
118 self.session_id.clone(),
119 self.dispatch.agent_frame_id.clone(),
120 )
121 }
122 }
123
124 pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
125 self.dispatch
126 .trigger_router
127 .as_ref()
128 .map(crate::TriggerRouter::store)
129 }
130
131 pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
132 self.process_originator
133 .clone()
134 .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
135 }
136
137 pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
138 self.process_wake_target
139 .clone()
140 .or_else(|| Some(self.session_scope()))
141 }
142
143 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
144 Arc::clone(&self.attachment_store)
145 }
146
147 pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
148 Arc::clone(&self.process_env_store)
149 }
150
151 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
152 Arc::clone(&self.chronological_projection)
153 }
154
155 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
156 self.protocol_extension
157 .as_ref()
158 .and_then(|extension| extension.as_any().downcast_ref::<T>())
159 }
160
161 pub fn turn_context(&self) -> &crate::TurnContext {
162 &self.turn_context
163 }
164
165 pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
166 Arc::clone(&self.dispatch.tool_catalog)
167 }
168
169 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
170 self.dispatch.session_graph.as_ref()
171 }
172
173 pub(super) async fn emit_turn_activity(
174 &self,
175 correlation_id: TurnActivityId,
176 event: TurnEvent,
177 ) {
178 if let Some(tx) = &self.turn_event_tx {
179 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
180 }
181 }
182
183 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
184 self.turn_event_tx = Some(turn_event_tx);
185 self
186 }
187
188 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
189 self.parent_invocation = Some(metadata);
190 self
191 }
192
193 pub(crate) fn with_execution_env_spec(
194 mut self,
195 execution_env_spec: crate::ProcessExecutionEnvSpec,
196 ) -> Self {
197 self.execution_env_spec = execution_env_spec;
198 self
199 }
200
201 pub(crate) fn with_process_registration_context(
202 mut self,
203 registration: &crate::ProcessRegistration,
204 ) -> Self {
205 self.process_originator = Some(registration.provenance.originator.clone());
206 self.runtime_process_id = Some(registration.id.clone());
207 self.process_env_ref = registration.env_ref.clone();
208 self.process_wake_target = registration.wake_target.clone();
209 self
210 }
211
212 pub(crate) fn with_process_event_context(
213 mut self,
214 process_id: impl Into<String>,
215 registry: Arc<dyn crate::ProcessRegistry>,
216 store: Option<Arc<dyn crate::RuntimePersistence>>,
217 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
218 queued_work_driver: Option<crate::QueuedWorkDriver>,
219 ) -> Self {
220 self.process_event_context = Some(RuntimeExecutionProcessEventContext {
221 process_id: process_id.into(),
222 registry,
223 store,
224 session_store_factory,
225 queued_work_driver,
226 });
227 self
228 }
229
230 pub(super) fn record_started_process(&self, process_id: &str) {
234 self.started_process_ids
235 .lock()
236 .expect("started process ids lock")
237 .insert(process_id.to_string());
238 }
239
240 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
241 self.started_process_ids
242 .lock()
243 .expect("started process ids lock")
244 .contains(process_id)
245 }
246
247 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
248 self.process_originator
249 .clone()
250 .map(|originator| crate::ProcessSpawnProvenance {
251 originator,
252 wake_target: self.process_wake_target.clone(),
253 })
254 }
255
256 pub(super) async fn attach_captured_process_execution_env(
257 &self,
258 registration: crate::ProcessRegistration,
259 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
260 if registration.env_ref.is_some() {
261 return Ok(registration);
262 }
263 match registration.input.as_ref() {
264 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
265 let env_ref = self.captured_process_execution_env_ref().await?;
266 Ok(registration.with_execution_env_ref(Some(env_ref)))
267 }
268 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
269 Ok(registration)
270 }
271 }
272 }
273
274 pub async fn captured_process_execution_env_ref(
275 &self,
276 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
277 if let Some(env_ref) = self.process_env_ref.clone() {
278 return Ok(env_ref);
279 }
280 crate::persist_process_execution_env(
281 self.process_env_store.as_ref(),
282 &self.execution_env_spec,
283 )
284 .await
285 }
286
287 pub(crate) fn with_turn_phase_probe(
288 mut self,
289 probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
290 ) -> Self {
291 self.turn_phase_probe = probe;
292 self
293 }
294
295 #[doc(hidden)]
296 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
297 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
298 }
299
300 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
301 self.parent_invocation.as_ref()
302 }
303
304 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
305 self.cancellation_token = Some(cancellation_token);
306 self
307 }
308
309 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
310 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
311 }
312
313 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
314 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
315 }
316
317 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
318 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
319 }
320
321 pub fn tool_argument_projection_policy(
322 &self,
323 name: &str,
324 ) -> crate::ToolArgumentProjectionPolicy {
325 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
326 }
327
328 pub async fn start_child_process(
329 &self,
330 registration: crate::ProcessRegistration,
331 kind: impl Into<String>,
332 label: Option<String>,
333 ) -> crate::ToolInvocationReply {
334 let _phase = self.named_phase("process.start_child");
335 let registration = match self
336 .attach_captured_process_execution_env(registration)
337 .await
338 {
339 Ok(registration) => registration,
340 Err(err) => {
341 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
342 }
343 };
344 let process_id = registration.id.clone();
345 let mut options = crate::ProcessStartOptions::new()
346 .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
347 if let Some(spawn) = self.process_spawn_provenance() {
348 options = options.with_spawn_provenance(spawn);
349 }
350 match self
351 .dispatch
352 .processes
353 .start(
354 &self.session_id,
355 registration,
356 options,
357 self.process_scope(self.parent_invocation.clone()),
358 )
359 .await
360 {
361 Ok(_) => {
362 self.record_started_process(&process_id);
363 crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
364 }
365 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
366 }
367 }
368
369 pub async fn sleep_process(
370 &self,
371 scope: &str,
372 sequence: u64,
373 duration_ms: u64,
374 ) -> Result<(), crate::RuntimeEffectControllerError> {
375 let cancellation = self.cancellation_token.clone().unwrap_or_default();
376 let invocation = crate::runtime::causal::process_sleep_invocation(
377 &self.session_id,
378 self.parent_invocation.as_ref(),
379 scope,
380 sequence,
381 );
382 let outcome = self
383 .dispatch
384 .effect_controller
385 .controller()
386 .execute_effect(
387 crate::RuntimeEffectEnvelope::new(
388 invocation,
389 crate::RuntimeEffectCommand::Sleep { duration_ms },
390 ),
391 crate::RuntimeEffectLocalExecutor::sleep_with_clock(
392 cancellation,
393 std::sync::Arc::clone(&self.dispatch.clock),
394 ),
395 )
396 .await?;
397 match outcome {
398 crate::RuntimeEffectOutcome::Sleep => Ok(()),
399 other => Err(crate::RuntimeEffectControllerError::new(
400 "runtime_effect_wrong_outcome",
401 format!("expected sleep outcome, got {}", other.kind().as_str()),
402 )),
403 }
404 }
405
406 pub async fn await_process_signal_event(
407 &self,
408 process_id: &str,
409 signal_name: &str,
410 event_ordinal: u64,
411 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
412 let cancellation = self.cancellation_token.clone().unwrap_or_default();
413 let key = self
414 .dispatch
415 .effect_controller
416 .controller()
417 .await_event_key(
418 &crate::ExecutionScope::process(process_id),
419 crate::AwaitEventWaitIdentity::process_signal(
420 process_id,
421 signal_name,
422 event_ordinal,
423 ),
424 )
425 .await?;
426 let invocation = crate::runtime::causal::process_await_event_invocation(
427 &self.session_id,
428 self.parent_invocation.as_ref(),
429 process_id,
430 signal_name,
431 event_ordinal,
432 );
433 let outcome = self
434 .dispatch
435 .effect_controller
436 .controller()
437 .execute_effect(
438 crate::RuntimeEffectEnvelope::new(
439 invocation,
440 crate::RuntimeEffectCommand::AwaitEvent { key },
441 ),
442 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
443 cancellation,
444 None,
445 std::sync::Arc::clone(&self.dispatch.clock),
446 ),
447 )
448 .await?;
449 match outcome.into_await_event()? {
450 crate::Resolution::Ok(value) => Ok(value),
451 crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
452 err.code,
453 err.message,
454 )),
455 crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
456 "process_signal_wait_timeout",
457 "process signal wait timed out",
458 )),
459 crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
460 "process_signal_wait_cancelled",
461 "process signal wait was cancelled",
462 )),
463 }
464 }
465
466 pub async fn signal_process_by_id(
467 &self,
468 registry: Arc<dyn crate::ProcessRegistry>,
469 process_id: &str,
470 signal_name: &str,
471 signal_id: String,
472 payload: serde_json::Value,
473 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
474 let event_type = crate::process_signal_event_type(signal_name)?;
475 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
476 let signal_payload = payload.clone();
477 let command = crate::ProcessCommand::Signal {
478 process_id: process_id.to_string(),
479 signal_name: signal_name.to_string(),
480 signal_id,
481 request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
482 .with_replay_key(replay_key),
483 };
484 let effect_id = command.effect_id();
485 let invocation = crate::runtime::causal::process_effect_invocation(
486 &self.session_id,
487 self.parent_invocation.clone(),
488 &effect_id,
489 );
490 let outcome = self
491 .dispatch
492 .effect_controller
493 .controller()
494 .execute_effect(
495 crate::RuntimeEffectEnvelope::new(
496 invocation,
497 crate::RuntimeEffectCommand::process(command),
498 ),
499 crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
500 )
501 .await?;
502 match outcome.into_process()? {
503 crate::ProcessEffectOutcome::Signal { event } => {
504 let waiting_ordinal =
505 registry
506 .get_process(process_id)
507 .await
508 .and_then(|record| match record.wait {
509 Some(crate::WaitState {
510 kind:
511 crate::WaitKind::Signal {
512 name,
513 event_type: wait_event_type,
514 ordinal,
515 ..
516 },
517 ..
518 }) if name == signal_name && wait_event_type == event_type => {
519 Some(ordinal)
520 }
521 _ => None,
522 });
523 let ordinal = match waiting_ordinal {
524 Some(ordinal) => ordinal,
525 None => {
526 registry
527 .count_events_through(process_id, &event_type, event.sequence)
528 .await?
529 }
530 };
531 if ordinal > 0 {
532 let key = self
533 .dispatch
534 .effect_controller
535 .controller()
536 .await_event_key(
537 &crate::ExecutionScope::process(process_id),
538 crate::AwaitEventWaitIdentity::process_signal(
539 process_id,
540 signal_name,
541 ordinal,
542 ),
543 )
544 .await?;
545 let _ = self
546 .dispatch
547 .effect_controller
548 .controller()
549 .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
550 .await?;
551 }
552 Ok(event)
553 }
554 other => Err(crate::RuntimeEffectControllerError::new(
555 "runtime_effect_wrong_outcome",
556 format!("expected signal outcome, got {other:?}"),
557 )),
558 }
559 }
560
561 pub async fn append_process_event(
562 &self,
563 registry: Arc<dyn crate::ProcessRegistry>,
564 process_id: &str,
565 request: crate::ProcessEventAppendRequest,
566 ) -> Result<crate::ProcessEvent, crate::PluginError> {
567 let result = registry.append_event(process_id, request).await?;
568 if let Some(context) = self.process_event_context.as_ref() {
569 crate::tool_provider::process_events::enqueue_wake_delivery(
570 context.store.clone(),
571 context.session_store_factory.as_ref(),
572 result.wake_delivery,
573 Some(self.session_graph_service()),
574 context.queued_work_driver.as_ref(),
575 )
576 .await?;
577 }
578 Ok(result.event)
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585 use crate::tool_dispatch::ToolDispatchContext;
586 use crate::{ToolCall, ToolProvider, ToolResult};
587
588 struct NoopTools;
589
590 #[async_trait::async_trait]
591 impl ToolProvider for NoopTools {
592 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
593 Vec::new()
594 }
595
596 fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
597 None
598 }
599
600 async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
601 ToolResult::err_fmt("not used")
602 }
603 }
604
605 #[test]
606 fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
607 let tool = crate::ToolDefinition::raw(
608 "tool:seedy",
609 "seedy",
610 "Seed-aware",
611 crate::ToolDefinition::default_input_schema(),
612 serde_json::json!({ "type": "string" }),
613 )
614 .with_argument_projection(
615 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
616 );
617 let plugins = crate::plugin::PluginHost::empty()
618 .build_session("session", None)
619 .expect("plugin session");
620 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
621 let dispatch = Arc::new(ToolDispatchContext {
622 plugins,
623 tools: Arc::new(NoopTools),
624 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
625 vec![tool.manifest()],
626 std::collections::BTreeMap::new(),
627 )),
628 sessions: Arc::new(crate::testing::MockSessionManager::default()),
629 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
630 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
631 processes: Arc::new(crate::UnavailableProcessService),
632 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
633 trigger_router: None,
634 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
635 crate::InlineRuntimeEffectController,
636 )),
637 direct_completions: crate::DirectCompletionClient::unavailable(
638 "direct completions are unavailable in this test context",
639 ),
640 parent_invocation: None,
641 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
642 crate::PluginOptions::default(),
643 crate::SessionPolicy::default(),
644 ),
645 session_id: "session".to_string(),
646 agent_frame_id: String::new(),
647 event_tx,
648 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
649 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
650 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
651 turn_context: crate::TurnContext::default(),
652 clock: std::sync::Arc::new(crate::SystemClock),
653 });
654 let ctx = RuntimeExecutionContext::new(
655 "session".to_string(),
656 dispatch,
657 Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
658 Arc::new(crate::InMemoryAttachmentStore::new()),
659 Arc::new(crate::ChronologicalProjection::default()),
660 None,
661 crate::TurnContext::default(),
662 );
663
664 assert_eq!(
665 ctx.tool_argument_projection_policy("seedy"),
666 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
667 );
668 assert_eq!(
669 ctx.tool_argument_projection_policy("missing"),
670 crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
671 );
672 }
673}