Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11    pub(super) session_id: String,
12    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13    process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14    attachment_store: Arc<dyn crate::AttachmentStore>,
15    chronological_projection: Arc<crate::ChronologicalProjection>,
16    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17    turn_context: crate::TurnContext,
18    execution_env_spec: crate::ProcessExecutionEnvSpec,
19    process_originator: Option<crate::ProcessOriginator>,
20    pub(super) runtime_process_id: Option<String>,
21    pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23    process_wake_target: Option<crate::SessionScope>,
24    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27    pub(super) cancellation_token: Option<CancellationToken>,
28    /// Process ids started by THIS execution context. Possession of a handle
29    /// the run itself created is sufficient capability to await/cancel it —
30    /// run-local children are not session handle grants (the ephemeral
31    /// execution scope must never appear in durable grant state).
32    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
33}
34
35#[derive(Clone)]
36pub(super) struct RuntimeExecutionProcessEventContext {
37    pub process_id: String,
38    pub registry: Arc<dyn crate::ProcessRegistry>,
39    pub store: Option<Arc<dyn crate::RuntimePersistence>>,
40    pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
41    pub queued_work_poke: Option<crate::QueuedWorkPoke>,
42}
43
44impl<'run> RuntimeExecutionContext<'run> {
45    pub(crate) fn drain_tool_trigger_outcomes(
46        &self,
47    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48        self.dispatch
49            .trigger_outcomes
50            .drain()
51            .map_err(crate::PluginError::Session)
52    }
53
54    pub(super) fn process_scope(
55        &self,
56        parent_invocation: Option<crate::RuntimeInvocation>,
57    ) -> crate::ProcessOpScope<'_> {
58        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59            .with_parent_invocation(parent_invocation)
60            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61    }
62
63    #[allow(
64        clippy::too_many_arguments,
65        reason = "code execution bridge carries explicit per-turn runtime dependencies"
66    )]
67    pub(crate) fn new(
68        session_id: String,
69        dispatch: Arc<ToolDispatchContext<'run>>,
70        process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71        attachment_store: Arc<dyn crate::AttachmentStore>,
72        chronological_projection: Arc<crate::ChronologicalProjection>,
73        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74        turn_context: crate::TurnContext,
75    ) -> Self {
76        Self {
77            session_id,
78            dispatch,
79            process_env_store,
80            attachment_store,
81            chronological_projection,
82            protocol_extension,
83            turn_context,
84            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85                crate::PluginOptions::default(),
86                crate::SessionPolicy::default(),
87            ),
88            process_originator: None,
89            runtime_process_id: None,
90            process_event_context: None,
91            started_process_ids: Arc::default(),
92            process_env_ref: None,
93            process_wake_target: None,
94            parent_invocation: None,
95            turn_phase_probe: None,
96            turn_event_tx: None,
97            cancellation_token: None,
98        }
99    }
100
101    pub fn session_id(&self) -> &str {
102        &self.session_id
103    }
104
105    pub fn session_scope(&self) -> crate::SessionScope {
106        self.dispatch
107            .agent_frame_id
108            .as_str()
109            .is_empty()
110            .then(|| crate::SessionScope::new(self.session_id.clone()))
111            .unwrap_or_else(|| {
112                crate::SessionScope::for_agent_frame(
113                    self.session_id.clone(),
114                    self.dispatch.agent_frame_id.clone(),
115                )
116            })
117    }
118
119    pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
120        self.dispatch
121            .trigger_router
122            .as_ref()
123            .map(crate::TriggerRouter::store)
124    }
125
126    pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
127        self.process_originator
128            .clone()
129            .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
130    }
131
132    pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
133        self.process_wake_target
134            .clone()
135            .or_else(|| Some(self.session_scope()))
136    }
137
138    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
139        Arc::clone(&self.attachment_store)
140    }
141
142    pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
143        Arc::clone(&self.process_env_store)
144    }
145
146    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
147        Arc::clone(&self.chronological_projection)
148    }
149
150    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
151        self.protocol_extension
152            .as_ref()
153            .and_then(|extension| extension.as_any().downcast_ref::<T>())
154    }
155
156    pub fn turn_context(&self) -> &crate::TurnContext {
157        &self.turn_context
158    }
159
160    pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
161        Arc::clone(&self.dispatch.tool_catalog)
162    }
163
164    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
165        self.dispatch.session_graph.as_ref()
166    }
167
168    pub(super) async fn emit_turn_activity(
169        &self,
170        correlation_id: TurnActivityId,
171        event: TurnEvent,
172    ) {
173        if let Some(tx) = &self.turn_event_tx {
174            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
175        }
176    }
177
178    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
179        self.turn_event_tx = Some(turn_event_tx);
180        self
181    }
182
183    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
184        self.parent_invocation = Some(metadata);
185        self
186    }
187
188    pub(crate) fn with_execution_env_spec(
189        mut self,
190        execution_env_spec: crate::ProcessExecutionEnvSpec,
191    ) -> Self {
192        self.execution_env_spec = execution_env_spec;
193        self
194    }
195
196    pub(crate) fn with_process_registration_context(
197        mut self,
198        registration: &crate::ProcessRegistration,
199    ) -> Self {
200        self.process_originator = Some(registration.provenance.originator.clone());
201        self.runtime_process_id = Some(registration.id.clone());
202        self.process_env_ref = registration.env_ref.clone();
203        self.process_wake_target = registration.wake_target.clone();
204        self
205    }
206
207    pub(crate) fn with_process_event_context(
208        mut self,
209        process_id: impl Into<String>,
210        registry: Arc<dyn crate::ProcessRegistry>,
211        store: Option<Arc<dyn crate::RuntimePersistence>>,
212        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
213        queued_work_poke: Option<crate::QueuedWorkPoke>,
214    ) -> Self {
215        self.process_event_context = Some(RuntimeExecutionProcessEventContext {
216            process_id: process_id.into(),
217            registry,
218            store,
219            session_store_factory,
220            queued_work_poke,
221        });
222        self
223    }
224
225    /// Spawn provenance for children started by this context, present only
226    /// when this context executes a process: children inherit the chain's
227    /// originator and wake target instead of the ephemeral execution scope.
228    pub(super) fn record_started_process(&self, process_id: &str) {
229        self.started_process_ids
230            .lock()
231            .expect("started process ids lock")
232            .insert(process_id.to_string());
233    }
234
235    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
236        self.started_process_ids
237            .lock()
238            .expect("started process ids lock")
239            .contains(process_id)
240    }
241
242    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
243        self.process_originator
244            .clone()
245            .map(|originator| crate::ProcessSpawnProvenance {
246                originator,
247                wake_target: self.process_wake_target.clone(),
248            })
249    }
250
251    pub(super) async fn attach_captured_process_execution_env(
252        &self,
253        registration: crate::ProcessRegistration,
254    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
255        if registration.env_ref.is_some() {
256            return Ok(registration);
257        }
258        match registration.input.as_ref() {
259            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
260                let env_ref = self.captured_process_execution_env_ref().await?;
261                Ok(registration.with_execution_env_ref(Some(env_ref)))
262            }
263            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
264                Ok(registration)
265            }
266        }
267    }
268
269    pub async fn captured_process_execution_env_ref(
270        &self,
271    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
272        if let Some(env_ref) = self.process_env_ref.clone() {
273            return Ok(env_ref);
274        }
275        crate::persist_process_execution_env(
276            self.process_env_store.as_ref(),
277            &self.execution_env_spec,
278        )
279        .await
280    }
281
282    pub(crate) fn with_turn_phase_probe(
283        mut self,
284        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
285    ) -> Self {
286        self.turn_phase_probe = probe;
287        self
288    }
289
290    #[doc(hidden)]
291    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
292        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
293    }
294
295    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
296        self.parent_invocation.as_ref()
297    }
298
299    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
300        self.cancellation_token = Some(cancellation_token);
301        self
302    }
303
304    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
305        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
306    }
307
308    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
309        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
310    }
311
312    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
313        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
314    }
315
316    pub fn tool_argument_projection_policy(
317        &self,
318        name: &str,
319    ) -> crate::ToolArgumentProjectionPolicy {
320        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
321    }
322
323    pub async fn start_child_process(
324        &self,
325        registration: crate::ProcessRegistration,
326        kind: impl Into<String>,
327        label: Option<String>,
328    ) -> crate::ToolInvocationReply {
329        let _phase = self.named_phase("process.start_child");
330        let registration = match self
331            .attach_captured_process_execution_env(registration)
332            .await
333        {
334            Ok(registration) => registration,
335            Err(err) => {
336                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
337            }
338        };
339        let process_id = registration.id.clone();
340        let mut options = crate::ProcessStartOptions::new()
341            .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
342        if let Some(spawn) = self.process_spawn_provenance() {
343            options = options.with_spawn_provenance(spawn);
344        }
345        match self
346            .dispatch
347            .processes
348            .start(
349                &self.session_id,
350                registration,
351                options,
352                self.process_scope(self.parent_invocation.clone()),
353            )
354            .await
355        {
356            Ok(_) => {
357                self.record_started_process(&process_id);
358                crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
359            }
360            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
361        }
362    }
363
364    pub async fn sleep_process(
365        &self,
366        scope: &str,
367        sequence: u64,
368        duration_ms: u64,
369    ) -> Result<(), crate::RuntimeEffectControllerError> {
370        let cancellation = self.cancellation_token.clone().unwrap_or_default();
371        let invocation = crate::runtime::causal::process_sleep_invocation(
372            &self.session_id,
373            self.parent_invocation.as_ref(),
374            scope,
375            sequence,
376        );
377        let outcome = self
378            .dispatch
379            .effect_controller
380            .controller()
381            .execute_effect(
382                crate::RuntimeEffectEnvelope::new(
383                    invocation,
384                    crate::RuntimeEffectCommand::Sleep { duration_ms },
385                ),
386                crate::RuntimeEffectLocalExecutor::sleep(cancellation),
387            )
388            .await?;
389        match outcome {
390            crate::RuntimeEffectOutcome::Sleep => Ok(()),
391            other => Err(crate::RuntimeEffectControllerError::new(
392                "runtime_effect_wrong_outcome",
393                format!("expected sleep outcome, got {}", other.kind().as_str()),
394            )),
395        }
396    }
397
398    pub async fn await_process_signal_event(
399        &self,
400        process_id: &str,
401        signal_name: &str,
402        event_ordinal: u64,
403    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
404        let cancellation = self.cancellation_token.clone().unwrap_or_default();
405        let key = self
406            .dispatch
407            .effect_controller
408            .controller()
409            .await_event_key(
410                &crate::ExecutionScope::process(process_id),
411                crate::AwaitEventWaitIdentity::process_signal(
412                    process_id,
413                    signal_name,
414                    event_ordinal,
415                ),
416            )
417            .await?;
418        let invocation = crate::runtime::causal::process_await_event_invocation(
419            &self.session_id,
420            self.parent_invocation.as_ref(),
421            process_id,
422            signal_name,
423            event_ordinal,
424        );
425        let outcome = self
426            .dispatch
427            .effect_controller
428            .controller()
429            .execute_effect(
430                crate::RuntimeEffectEnvelope::new(
431                    invocation,
432                    crate::RuntimeEffectCommand::AwaitEvent { key },
433                ),
434                crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
435            )
436            .await?;
437        match outcome.into_await_event()? {
438            crate::Resolution::Ok(value) => Ok(value),
439            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
440                err.code,
441                err.message,
442            )),
443            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
444                "process_signal_wait_timeout",
445                "process signal wait timed out",
446            )),
447            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
448                "process_signal_wait_cancelled",
449                "process signal wait was cancelled",
450            )),
451        }
452    }
453
454    pub async fn signal_process_by_id(
455        &self,
456        registry: Arc<dyn crate::ProcessRegistry>,
457        process_id: &str,
458        signal_name: &str,
459        signal_id: String,
460        payload: serde_json::Value,
461    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
462        let event_type = crate::process_signal_event_type(signal_name)?;
463        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
464        let signal_payload = payload.clone();
465        let command = crate::ProcessCommand::Signal {
466            process_id: process_id.to_string(),
467            signal_name: signal_name.to_string(),
468            signal_id,
469            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
470                .with_replay_key(replay_key),
471        };
472        let effect_id = command.effect_id();
473        let invocation = crate::runtime::causal::process_effect_invocation(
474            &self.session_id,
475            self.parent_invocation.clone(),
476            &effect_id,
477        );
478        let outcome = self
479            .dispatch
480            .effect_controller
481            .controller()
482            .execute_effect(
483                crate::RuntimeEffectEnvelope::new(
484                    invocation,
485                    crate::RuntimeEffectCommand::process(command),
486                ),
487                crate::RuntimeEffectLocalExecutor::processes(Arc::clone(&registry)),
488            )
489            .await?;
490        match outcome.into_process()? {
491            crate::ProcessEffectOutcome::Signal { event } => {
492                let waiting_ordinal =
493                    registry
494                        .get_process(process_id)
495                        .await
496                        .and_then(|record| match record.wait {
497                            Some(crate::WaitState {
498                                kind:
499                                    crate::WaitKind::Signal {
500                                        name,
501                                        event_type: wait_event_type,
502                                        ordinal,
503                                        ..
504                                    },
505                                ..
506                            }) if name == signal_name && wait_event_type == event_type => {
507                                Some(ordinal)
508                            }
509                            _ => None,
510                        });
511                let ordinal = match waiting_ordinal {
512                    Some(ordinal) => ordinal,
513                    None => {
514                        registry
515                            .count_events_through(process_id, &event_type, event.sequence)
516                            .await?
517                    }
518                };
519                if ordinal > 0 {
520                    let key = self
521                        .dispatch
522                        .effect_controller
523                        .controller()
524                        .await_event_key(
525                            &crate::ExecutionScope::process(process_id),
526                            crate::AwaitEventWaitIdentity::process_signal(
527                                process_id,
528                                signal_name,
529                                ordinal,
530                            ),
531                        )
532                        .await?;
533                    let _ = self
534                        .dispatch
535                        .effect_controller
536                        .controller()
537                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
538                        .await?;
539                }
540                Ok(event)
541            }
542            other => Err(crate::RuntimeEffectControllerError::new(
543                "runtime_effect_wrong_outcome",
544                format!("expected signal outcome, got {other:?}"),
545            )),
546        }
547    }
548
549    pub async fn append_process_event(
550        &self,
551        registry: Arc<dyn crate::ProcessRegistry>,
552        process_id: &str,
553        request: crate::ProcessEventAppendRequest,
554    ) -> Result<crate::ProcessEvent, crate::PluginError> {
555        let result = registry.append_event(process_id, request).await?;
556        if let Some(context) = self.process_event_context.as_ref() {
557            crate::tool_provider::process_events::enqueue_wake_delivery(
558                context.store.clone(),
559                context.session_store_factory.as_ref(),
560                result.wake_delivery,
561                Some(self.session_graph_service()),
562                context.queued_work_poke.as_ref(),
563            )
564            .await?;
565        }
566        Ok(result.event)
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use super::*;
573    use crate::tool_dispatch::ToolDispatchContext;
574    use crate::{ToolCall, ToolProvider, ToolResult};
575
576    struct NoopTools;
577
578    #[async_trait::async_trait]
579    impl ToolProvider for NoopTools {
580        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
581            Vec::new()
582        }
583
584        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
585            None
586        }
587
588        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
589            ToolResult::err_fmt("not used")
590        }
591    }
592
593    #[test]
594    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
595        let tool = crate::ToolDefinition::raw(
596            "tool:seedy",
597            "seedy",
598            "Seed-aware",
599            crate::ToolDefinition::default_input_schema(),
600            serde_json::json!({ "type": "string" }),
601        )
602        .with_argument_projection(
603            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
604        );
605        let plugins = crate::plugin::PluginHost::empty()
606            .build_session("session", None)
607            .expect("plugin session");
608        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
609        let dispatch = Arc::new(ToolDispatchContext {
610            plugins,
611            tools: Arc::new(NoopTools),
612            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
613                vec![tool.manifest()],
614                std::collections::BTreeMap::new(),
615            )),
616            sessions: Arc::new(crate::testing::MockSessionManager::default()),
617            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
618            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
619            processes: Arc::new(crate::UnavailableProcessService),
620            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
621            trigger_router: None,
622            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
623                crate::InlineRuntimeEffectController,
624            )),
625            direct_completions: crate::DirectCompletionClient::unavailable(
626                "direct completions are unavailable in this test context",
627            ),
628            parent_invocation: None,
629            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
630                crate::PluginOptions::default(),
631                crate::SessionPolicy::default(),
632            ),
633            session_id: "session".to_string(),
634            agent_frame_id: String::new(),
635            event_tx,
636            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
637            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
638            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
639            turn_context: crate::TurnContext::default(),
640        });
641        let ctx = RuntimeExecutionContext::new(
642            "session".to_string(),
643            dispatch,
644            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
645            Arc::new(crate::InMemoryAttachmentStore::new()),
646            Arc::new(crate::ChronologicalProjection::default()),
647            None,
648            crate::TurnContext::default(),
649        );
650
651        assert_eq!(
652            ctx.tool_argument_projection_policy("seedy"),
653            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
654        );
655        assert_eq!(
656            ctx.tool_argument_projection_policy("missing"),
657            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
658        );
659    }
660}