use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
use crate::tool_dispatch::ToolDispatchContext;
use crate::{TurnActivity, TurnActivityId, TurnEvent};
#[derive(Clone)]
pub struct RuntimeExecutionContext<'run> {
pub(super) session_id: String,
pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
attachment_store: Arc<dyn crate::AttachmentStore>,
chronological_projection: Arc<crate::ChronologicalProjection>,
protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
turn_context: crate::TurnContext,
execution_env_spec: crate::ProcessExecutionEnvSpec,
process_originator: Option<crate::ProcessOriginator>,
pub(super) runtime_process_id: Option<String>,
pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
process_env_ref: Option<crate::ProcessExecutionEnvRef>,
process_wake_target: Option<crate::SessionScope>,
pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
pub(super) cancellation_token: Option<CancellationToken>,
started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
}
#[derive(Clone)]
pub(super) struct RuntimeExecutionProcessEventContext {
pub process_id: String,
pub registry: Arc<dyn crate::ProcessRegistry>,
pub store: Option<Arc<dyn crate::RuntimePersistence>>,
pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
pub queued_work_poke: Option<crate::QueuedWorkPoke>,
}
impl<'run> RuntimeExecutionContext<'run> {
pub(crate) fn drain_tool_trigger_outcomes(
&self,
) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
self.dispatch
.trigger_outcomes
.drain()
.map_err(crate::PluginError::Session)
}
pub(super) fn process_scope(
&self,
parent_invocation: Option<crate::RuntimeInvocation>,
) -> crate::ProcessOpScope<'_> {
crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
.with_parent_invocation(parent_invocation)
.with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
}
#[allow(
clippy::too_many_arguments,
reason = "code execution bridge carries explicit per-turn runtime dependencies"
)]
pub(crate) fn new(
session_id: String,
dispatch: Arc<ToolDispatchContext<'run>>,
process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
attachment_store: Arc<dyn crate::AttachmentStore>,
chronological_projection: Arc<crate::ChronologicalProjection>,
protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
turn_context: crate::TurnContext,
) -> Self {
Self {
session_id,
dispatch,
process_env_store,
attachment_store,
chronological_projection,
protocol_extension,
turn_context,
execution_env_spec: crate::ProcessExecutionEnvSpec::new(
crate::PluginOptions::default(),
crate::SessionPolicy::default(),
),
process_originator: None,
runtime_process_id: None,
process_event_context: None,
started_process_ids: Arc::default(),
process_env_ref: None,
process_wake_target: None,
parent_invocation: None,
turn_phase_probe: None,
turn_event_tx: None,
cancellation_token: None,
}
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn session_scope(&self) -> crate::SessionScope {
self.dispatch
.agent_frame_id
.as_str()
.is_empty()
.then(|| crate::SessionScope::new(self.session_id.clone()))
.unwrap_or_else(|| {
crate::SessionScope::for_agent_frame(
self.session_id.clone(),
self.dispatch.agent_frame_id.clone(),
)
})
}
pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
self.dispatch
.trigger_router
.as_ref()
.map(crate::TriggerRouter::store)
}
pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
self.process_originator
.clone()
.unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
}
pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
self.process_wake_target
.clone()
.or_else(|| Some(self.session_scope()))
}
pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
Arc::clone(&self.attachment_store)
}
pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
Arc::clone(&self.process_env_store)
}
pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
Arc::clone(&self.chronological_projection)
}
pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
self.protocol_extension
.as_ref()
.and_then(|extension| extension.as_any().downcast_ref::<T>())
}
pub fn turn_context(&self) -> &crate::TurnContext {
&self.turn_context
}
pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
Arc::clone(&self.dispatch.tool_catalog)
}
pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
self.dispatch.session_graph.as_ref()
}
pub(super) async fn emit_turn_activity(
&self,
correlation_id: TurnActivityId,
event: TurnEvent,
) {
if let Some(tx) = &self.turn_event_tx {
let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
}
}
pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
self.turn_event_tx = Some(turn_event_tx);
self
}
pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
self.parent_invocation = Some(metadata);
self
}
pub(crate) fn with_execution_env_spec(
mut self,
execution_env_spec: crate::ProcessExecutionEnvSpec,
) -> Self {
self.execution_env_spec = execution_env_spec;
self
}
pub(crate) fn with_process_registration_context(
mut self,
registration: &crate::ProcessRegistration,
) -> Self {
self.process_originator = Some(registration.provenance.originator.clone());
self.runtime_process_id = Some(registration.id.clone());
self.process_env_ref = registration.env_ref.clone();
self.process_wake_target = registration.wake_target.clone();
self
}
pub(crate) fn with_process_event_context(
mut self,
process_id: impl Into<String>,
registry: Arc<dyn crate::ProcessRegistry>,
store: Option<Arc<dyn crate::RuntimePersistence>>,
session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
queued_work_poke: Option<crate::QueuedWorkPoke>,
) -> Self {
self.process_event_context = Some(RuntimeExecutionProcessEventContext {
process_id: process_id.into(),
registry,
store,
session_store_factory,
queued_work_poke,
});
self
}
pub(super) fn record_started_process(&self, process_id: &str) {
self.started_process_ids
.lock()
.expect("started process ids lock")
.insert(process_id.to_string());
}
pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
self.started_process_ids
.lock()
.expect("started process ids lock")
.contains(process_id)
}
pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
self.process_originator
.clone()
.map(|originator| crate::ProcessSpawnProvenance {
originator,
wake_target: self.process_wake_target.clone(),
})
}
pub(super) async fn attach_captured_process_execution_env(
&self,
registration: crate::ProcessRegistration,
) -> Result<crate::ProcessRegistration, crate::PluginError> {
if registration.env_ref.is_some() {
return Ok(registration);
}
match registration.input.as_ref() {
crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
let env_ref = self.captured_process_execution_env_ref().await?;
Ok(registration.with_execution_env_ref(Some(env_ref)))
}
crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
Ok(registration)
}
}
}
pub async fn captured_process_execution_env_ref(
&self,
) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
if let Some(env_ref) = self.process_env_ref.clone() {
return Ok(env_ref);
}
crate::persist_process_execution_env(
self.process_env_store.as_ref(),
&self.execution_env_spec,
)
.await
}
pub(crate) fn with_turn_phase_probe(
mut self,
probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
) -> Self {
self.turn_phase_probe = probe;
self
}
#[doc(hidden)]
pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
}
pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
self.parent_invocation.as_ref()
}
pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
self.cancellation_token = Some(cancellation_token);
self
}
pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
}
pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
}
pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
}
pub fn tool_argument_projection_policy(
&self,
name: &str,
) -> crate::ToolArgumentProjectionPolicy {
crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
}
pub async fn start_child_process(
&self,
registration: crate::ProcessRegistration,
kind: impl Into<String>,
label: Option<String>,
) -> crate::ToolInvocationReply {
let _phase = self.named_phase("process.start_child");
let registration = match self
.attach_captured_process_execution_env(registration)
.await
{
Ok(registration) => registration,
Err(err) => {
return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
}
};
let process_id = registration.id.clone();
let mut options = crate::ProcessStartOptions::new()
.with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
if let Some(spawn) = self.process_spawn_provenance() {
options = options.with_spawn_provenance(spawn);
}
match self
.dispatch
.processes
.start(
&self.session_id,
registration,
options,
self.process_scope(self.parent_invocation.clone()),
)
.await
{
Ok(_) => {
self.record_started_process(&process_id);
crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
}
Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
}
}
pub async fn sleep_process(
&self,
scope: &str,
sequence: u64,
duration_ms: u64,
) -> Result<(), crate::RuntimeEffectControllerError> {
let cancellation = self.cancellation_token.clone().unwrap_or_default();
let invocation = crate::runtime::causal::process_sleep_invocation(
&self.session_id,
self.parent_invocation.as_ref(),
scope,
sequence,
);
let outcome = self
.dispatch
.effect_controller
.controller()
.execute_effect(
crate::RuntimeEffectEnvelope::new(
invocation,
crate::RuntimeEffectCommand::Sleep { duration_ms },
),
crate::RuntimeEffectLocalExecutor::sleep(cancellation),
)
.await?;
match outcome {
crate::RuntimeEffectOutcome::Sleep => Ok(()),
other => Err(crate::RuntimeEffectControllerError::new(
"runtime_effect_wrong_outcome",
format!("expected sleep outcome, got {}", other.kind().as_str()),
)),
}
}
pub async fn await_process_signal_event(
&self,
process_id: &str,
signal_name: &str,
event_ordinal: u64,
) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
let cancellation = self.cancellation_token.clone().unwrap_or_default();
let key = self
.dispatch
.effect_controller
.controller()
.await_event_key(
&crate::ExecutionScope::process(process_id),
crate::AwaitEventWaitIdentity::process_signal(
process_id,
signal_name,
event_ordinal,
),
)
.await?;
let invocation = crate::runtime::causal::process_await_event_invocation(
&self.session_id,
self.parent_invocation.as_ref(),
process_id,
signal_name,
event_ordinal,
);
let outcome = self
.dispatch
.effect_controller
.controller()
.execute_effect(
crate::RuntimeEffectEnvelope::new(
invocation,
crate::RuntimeEffectCommand::AwaitEvent { key },
),
crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
)
.await?;
match outcome.into_await_event()? {
crate::Resolution::Ok(value) => Ok(value),
crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
err.code,
err.message,
)),
crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
"process_signal_wait_timeout",
"process signal wait timed out",
)),
crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
"process_signal_wait_cancelled",
"process signal wait was cancelled",
)),
}
}
pub async fn signal_process_by_id(
&self,
registry: Arc<dyn crate::ProcessRegistry>,
process_id: &str,
signal_name: &str,
signal_id: String,
payload: serde_json::Value,
) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
let event_type = crate::process_signal_event_type(signal_name)?;
let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
let signal_payload = payload.clone();
let command = crate::ProcessCommand::Signal {
process_id: process_id.to_string(),
signal_name: signal_name.to_string(),
signal_id,
request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
.with_replay_key(replay_key),
};
let effect_id = command.effect_id();
let invocation = crate::runtime::causal::process_effect_invocation(
&self.session_id,
self.parent_invocation.clone(),
&effect_id,
);
let outcome = self
.dispatch
.effect_controller
.controller()
.execute_effect(
crate::RuntimeEffectEnvelope::new(
invocation,
crate::RuntimeEffectCommand::process(command),
),
crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
)
.await?;
match outcome.into_process()? {
crate::ProcessEffectOutcome::Signal { event } => {
let waiting_ordinal =
registry
.get_process(process_id)
.await
.and_then(|record| match record.wait {
Some(crate::WaitState {
kind:
crate::WaitKind::Signal {
name,
event_type: wait_event_type,
ordinal,
..
},
..
}) if name == signal_name && wait_event_type == event_type => {
Some(ordinal)
}
_ => None,
});
let ordinal = match waiting_ordinal {
Some(ordinal) => ordinal,
None => {
registry
.count_events_through(process_id, &event_type, event.sequence)
.await?
}
};
if ordinal > 0 {
let key = self
.dispatch
.effect_controller
.controller()
.await_event_key(
&crate::ExecutionScope::process(process_id),
crate::AwaitEventWaitIdentity::process_signal(
process_id,
signal_name,
ordinal,
),
)
.await?;
let _ = self
.dispatch
.effect_controller
.controller()
.resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
.await?;
}
Ok(event)
}
other => Err(crate::RuntimeEffectControllerError::new(
"runtime_effect_wrong_outcome",
format!("expected signal outcome, got {other:?}"),
)),
}
}
pub async fn append_process_event(
&self,
registry: Arc<dyn crate::ProcessRegistry>,
process_id: &str,
request: crate::ProcessEventAppendRequest,
) -> Result<crate::ProcessEvent, crate::PluginError> {
let result = registry.append_event(process_id, request).await?;
if let Some(context) = self.process_event_context.as_ref() {
crate::tool_provider::process_events::enqueue_wake_delivery(
context.store.clone(),
context.session_store_factory.as_ref(),
result.wake_delivery,
Some(self.session_graph_service()),
context.queued_work_poke.as_ref(),
)
.await?;
}
Ok(result.event)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tool_dispatch::ToolDispatchContext;
use crate::{ToolCall, ToolProvider, ToolResult};
struct NoopTools;
#[async_trait::async_trait]
impl ToolProvider for NoopTools {
fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
Vec::new()
}
fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
None
}
async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
ToolResult::err_fmt("not used")
}
}
#[test]
fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
let tool = crate::ToolDefinition::raw(
"tool:seedy",
"seedy",
"Seed-aware",
crate::ToolDefinition::default_input_schema(),
serde_json::json!({ "type": "string" }),
)
.with_argument_projection(
crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
);
let plugins = crate::plugin::PluginHost::empty()
.build_session("session", None)
.expect("plugin session");
let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
let dispatch = Arc::new(ToolDispatchContext {
plugins,
tools: Arc::new(NoopTools),
tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
vec![tool.manifest()],
std::collections::BTreeMap::new(),
)),
sessions: Arc::new(crate::testing::MockSessionManager::default()),
session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
session_graph: Arc::new(crate::testing::MockSessionManager::default()),
processes: Arc::new(crate::UnavailableProcessService),
process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
trigger_router: None,
effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
crate::InlineRuntimeEffectController,
)),
direct_completions: crate::DirectCompletionClient::unavailable(
"direct completions are unavailable in this test context",
),
parent_invocation: None,
execution_env_spec: crate::ProcessExecutionEnvSpec::new(
crate::PluginOptions::default(),
crate::SessionPolicy::default(),
),
session_id: "session".to_string(),
agent_frame_id: String::new(),
event_tx,
checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
turn_context: crate::TurnContext::default(),
});
let ctx = RuntimeExecutionContext::new(
"session".to_string(),
dispatch,
Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
Arc::new(crate::InMemoryAttachmentStore::new()),
Arc::new(crate::ChronologicalProjection::default()),
None,
crate::TurnContext::default(),
);
assert_eq!(
ctx.tool_argument_projection_policy("seedy"),
crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
);
assert_eq!(
ctx.tool_argument_projection_policy("missing"),
crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
);
}
}