Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11    pub(super) session_id: String,
12    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13    process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14    attachment_store: Arc<dyn crate::AttachmentStore>,
15    chronological_projection: Arc<crate::ChronologicalProjection>,
16    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17    turn_context: crate::TurnContext,
18    execution_env_spec: crate::ProcessExecutionEnvSpec,
19    process_originator: Option<crate::ProcessOriginator>,
20    pub(super) runtime_process_id: Option<String>,
21    pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23    process_wake_target: Option<crate::SessionScope>,
24    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27    pub(super) cancellation_token: Option<CancellationToken>,
28    /// Process ids started by THIS execution context. Possession of a handle
29    /// the run itself created is sufficient capability to await/cancel it —
30    /// run-local children are not session handle grants (the ephemeral
31    /// execution scope must never appear in durable grant state).
32    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
33}
34
35#[derive(Clone)]
36pub(super) struct RuntimeExecutionProcessEventContext {
37    pub process_id: String,
38    pub registry: Arc<dyn crate::ProcessRegistry>,
39    pub store: Option<Arc<dyn crate::RuntimePersistence>>,
40    pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
41    pub queued_work_driver: Option<crate::QueuedWorkDriver>,
42}
43
44impl<'run> RuntimeExecutionContext<'run> {
45    pub(crate) fn drain_tool_trigger_outcomes(
46        &self,
47    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48        self.dispatch
49            .trigger_outcomes
50            .drain()
51            .map_err(crate::PluginError::Session)
52    }
53
54    pub(super) fn process_scope(
55        &self,
56        parent_invocation: Option<crate::RuntimeInvocation>,
57    ) -> crate::ProcessOpScope<'_> {
58        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59            .with_parent_invocation(parent_invocation)
60            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61    }
62
63    #[allow(
64        clippy::too_many_arguments,
65        reason = "code execution bridge carries explicit per-turn runtime dependencies"
66    )]
67    pub(crate) fn new(
68        session_id: String,
69        dispatch: Arc<ToolDispatchContext<'run>>,
70        process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71        attachment_store: Arc<dyn crate::AttachmentStore>,
72        chronological_projection: Arc<crate::ChronologicalProjection>,
73        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74        turn_context: crate::TurnContext,
75    ) -> Self {
76        Self {
77            session_id,
78            dispatch,
79            process_env_store,
80            attachment_store,
81            chronological_projection,
82            protocol_extension,
83            turn_context,
84            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85                crate::PluginOptions::default(),
86                crate::SessionPolicy::default(),
87            ),
88            process_originator: None,
89            runtime_process_id: None,
90            process_event_context: None,
91            started_process_ids: Arc::default(),
92            process_env_ref: None,
93            process_wake_target: None,
94            parent_invocation: None,
95            turn_phase_probe: None,
96            turn_event_tx: None,
97            cancellation_token: None,
98        }
99    }
100
101    pub fn session_id(&self) -> &str {
102        &self.session_id
103    }
104
105    pub fn execution_scope_id(&self) -> String {
106        self.dispatch
107            .effect_controller
108            .scoped()
109            .scope_id()
110            .to_string()
111    }
112
113    pub fn session_scope(&self) -> crate::SessionScope {
114        if self.dispatch.agent_frame_id.is_empty() {
115            crate::SessionScope::new(self.session_id.clone())
116        } else {
117            crate::SessionScope::for_agent_frame(
118                self.session_id.clone(),
119                self.dispatch.agent_frame_id.clone(),
120            )
121        }
122    }
123
124    pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
125        self.dispatch
126            .trigger_router
127            .as_ref()
128            .map(crate::TriggerRouter::store)
129    }
130
131    pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
132        self.process_originator
133            .clone()
134            .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
135    }
136
137    pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
138        self.process_wake_target
139            .clone()
140            .or_else(|| Some(self.session_scope()))
141    }
142
143    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
144        Arc::clone(&self.attachment_store)
145    }
146
147    pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
148        Arc::clone(&self.process_env_store)
149    }
150
151    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
152        Arc::clone(&self.chronological_projection)
153    }
154
155    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
156        self.protocol_extension
157            .as_ref()
158            .and_then(|extension| extension.as_any().downcast_ref::<T>())
159    }
160
161    pub fn turn_context(&self) -> &crate::TurnContext {
162        &self.turn_context
163    }
164
165    pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
166        Arc::clone(&self.dispatch.tool_catalog)
167    }
168
169    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
170        self.dispatch.session_graph.as_ref()
171    }
172
173    pub(super) async fn emit_turn_activity(
174        &self,
175        correlation_id: TurnActivityId,
176        event: TurnEvent,
177    ) {
178        if let Some(tx) = &self.turn_event_tx {
179            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
180        }
181    }
182
183    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
184        self.turn_event_tx = Some(turn_event_tx);
185        self
186    }
187
188    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
189        self.parent_invocation = Some(metadata);
190        self
191    }
192
193    pub(crate) fn with_execution_env_spec(
194        mut self,
195        execution_env_spec: crate::ProcessExecutionEnvSpec,
196    ) -> Self {
197        self.execution_env_spec = execution_env_spec;
198        self
199    }
200
201    pub(crate) fn with_process_registration_context(
202        mut self,
203        registration: &crate::ProcessRegistration,
204    ) -> Self {
205        self.process_originator = Some(registration.provenance.originator.clone());
206        self.runtime_process_id = Some(registration.id.clone());
207        self.process_env_ref = registration.env_ref.clone();
208        self.process_wake_target = registration.wake_target.clone();
209        self
210    }
211
212    pub(crate) fn with_process_event_context(
213        mut self,
214        process_id: impl Into<String>,
215        registry: Arc<dyn crate::ProcessRegistry>,
216        store: Option<Arc<dyn crate::RuntimePersistence>>,
217        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
218        queued_work_driver: Option<crate::QueuedWorkDriver>,
219    ) -> Self {
220        self.process_event_context = Some(RuntimeExecutionProcessEventContext {
221            process_id: process_id.into(),
222            registry,
223            store,
224            session_store_factory,
225            queued_work_driver,
226        });
227        self
228    }
229
230    /// Spawn provenance for children started by this context, present only
231    /// when this context executes a process: children inherit the chain's
232    /// originator and wake target instead of the ephemeral execution scope.
233    pub(super) fn record_started_process(&self, process_id: &str) {
234        self.started_process_ids
235            .lock()
236            .expect("started process ids lock")
237            .insert(process_id.to_string());
238    }
239
240    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
241        self.started_process_ids
242            .lock()
243            .expect("started process ids lock")
244            .contains(process_id)
245    }
246
247    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
248        self.process_originator
249            .clone()
250            .map(|originator| crate::ProcessSpawnProvenance {
251                originator,
252                wake_target: self.process_wake_target.clone(),
253            })
254    }
255
256    pub(super) async fn attach_captured_process_execution_env(
257        &self,
258        registration: crate::ProcessRegistration,
259    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
260        if registration.env_ref.is_some() {
261            return Ok(registration);
262        }
263        match registration.input.as_ref() {
264            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
265                let env_ref = self.captured_process_execution_env_ref().await?;
266                Ok(registration.with_execution_env_ref(Some(env_ref)))
267            }
268            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
269                Ok(registration)
270            }
271        }
272    }
273
274    pub async fn captured_process_execution_env_ref(
275        &self,
276    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
277        if let Some(env_ref) = self.process_env_ref.clone() {
278            return Ok(env_ref);
279        }
280        crate::persist_process_execution_env(
281            self.process_env_store.as_ref(),
282            &self.execution_env_spec,
283        )
284        .await
285    }
286
287    pub(crate) fn with_turn_phase_probe(
288        mut self,
289        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
290    ) -> Self {
291        self.turn_phase_probe = probe;
292        self
293    }
294
295    #[doc(hidden)]
296    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
297        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
298    }
299
300    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
301        self.parent_invocation.as_ref()
302    }
303
304    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
305        self.cancellation_token = Some(cancellation_token);
306        self
307    }
308
309    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
310        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
311    }
312
313    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
314        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
315    }
316
317    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
318        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
319    }
320
321    pub fn tool_argument_projection_policy(
322        &self,
323        name: &str,
324    ) -> crate::ToolArgumentProjectionPolicy {
325        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
326    }
327
328    pub async fn start_child_process(
329        &self,
330        registration: crate::ProcessRegistration,
331        kind: impl Into<String>,
332        label: Option<String>,
333    ) -> crate::ToolInvocationReply {
334        let _phase = self.named_phase("process.start_child");
335        let registration = match self
336            .attach_captured_process_execution_env(registration)
337            .await
338        {
339            Ok(registration) => registration,
340            Err(err) => {
341                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
342            }
343        };
344        let process_id = registration.id.clone();
345        let mut options = crate::ProcessStartOptions::new()
346            .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
347        if let Some(spawn) = self.process_spawn_provenance() {
348            options = options.with_spawn_provenance(spawn);
349        }
350        match self
351            .dispatch
352            .processes
353            .start(
354                &self.session_id,
355                registration,
356                options,
357                self.process_scope(self.parent_invocation.clone()),
358            )
359            .await
360        {
361            Ok(_) => {
362                self.record_started_process(&process_id);
363                crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
364            }
365            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
366        }
367    }
368
369    pub async fn sleep_process(
370        &self,
371        scope: &str,
372        sequence: u64,
373        duration_ms: u64,
374    ) -> Result<(), crate::RuntimeEffectControllerError> {
375        let cancellation = self.cancellation_token.clone().unwrap_or_default();
376        let invocation = crate::runtime::causal::process_sleep_invocation(
377            &self.session_id,
378            self.parent_invocation.as_ref(),
379            scope,
380            sequence,
381        );
382        let outcome = self
383            .dispatch
384            .effect_controller
385            .controller()
386            .execute_effect(
387                crate::RuntimeEffectEnvelope::new(
388                    invocation,
389                    crate::RuntimeEffectCommand::Sleep { duration_ms },
390                ),
391                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
392                    cancellation,
393                    std::sync::Arc::clone(&self.dispatch.clock),
394                ),
395            )
396            .await?;
397        match outcome {
398            crate::RuntimeEffectOutcome::Sleep => Ok(()),
399            other => Err(crate::RuntimeEffectControllerError::new(
400                "runtime_effect_wrong_outcome",
401                format!("expected sleep outcome, got {}", other.kind().as_str()),
402            )),
403        }
404    }
405
406    pub async fn await_process_signal_event(
407        &self,
408        process_id: &str,
409        signal_name: &str,
410        event_ordinal: u64,
411    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
412        let cancellation = self.cancellation_token.clone().unwrap_or_default();
413        let key = self
414            .dispatch
415            .effect_controller
416            .controller()
417            .await_event_key(
418                &crate::ExecutionScope::process(process_id),
419                crate::AwaitEventWaitIdentity::process_signal(
420                    process_id,
421                    signal_name,
422                    event_ordinal,
423                ),
424            )
425            .await?;
426        let invocation = crate::runtime::causal::process_await_event_invocation(
427            &self.session_id,
428            self.parent_invocation.as_ref(),
429            process_id,
430            signal_name,
431            event_ordinal,
432        );
433        let outcome = self
434            .dispatch
435            .effect_controller
436            .controller()
437            .execute_effect(
438                crate::RuntimeEffectEnvelope::new(
439                    invocation,
440                    crate::RuntimeEffectCommand::AwaitEvent { key },
441                ),
442                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
443                    cancellation,
444                    None,
445                    std::sync::Arc::clone(&self.dispatch.clock),
446                ),
447            )
448            .await?;
449        match outcome.into_await_event()? {
450            crate::Resolution::Ok(value) => Ok(value),
451            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
452                err.code,
453                err.message,
454            )),
455            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
456                "process_signal_wait_timeout",
457                "process signal wait timed out",
458            )),
459            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
460                "process_signal_wait_cancelled",
461                "process signal wait was cancelled",
462            )),
463        }
464    }
465
466    pub async fn signal_process_by_id(
467        &self,
468        registry: Arc<dyn crate::ProcessRegistry>,
469        process_id: &str,
470        signal_name: &str,
471        signal_id: String,
472        payload: serde_json::Value,
473    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
474        let event_type = crate::process_signal_event_type(signal_name)?;
475        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
476        let signal_payload = payload.clone();
477        let command = crate::ProcessCommand::Signal {
478            process_id: process_id.to_string(),
479            signal_name: signal_name.to_string(),
480            signal_id,
481            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
482                .with_replay_key(replay_key),
483        };
484        let effect_id = command.effect_id();
485        let invocation = crate::runtime::causal::process_effect_invocation(
486            &self.session_id,
487            self.parent_invocation.clone(),
488            &effect_id,
489        );
490        let outcome = self
491            .dispatch
492            .effect_controller
493            .controller()
494            .execute_effect(
495                crate::RuntimeEffectEnvelope::new(
496                    invocation,
497                    crate::RuntimeEffectCommand::process(command),
498                ),
499                crate::RuntimeEffectLocalExecutor::processes(Arc::clone(&registry)),
500            )
501            .await?;
502        match outcome.into_process()? {
503            crate::ProcessEffectOutcome::Signal { event } => {
504                let waiting_ordinal =
505                    registry
506                        .get_process(process_id)
507                        .await
508                        .and_then(|record| match record.wait {
509                            Some(crate::WaitState {
510                                kind:
511                                    crate::WaitKind::Signal {
512                                        name,
513                                        event_type: wait_event_type,
514                                        ordinal,
515                                        ..
516                                    },
517                                ..
518                            }) if name == signal_name && wait_event_type == event_type => {
519                                Some(ordinal)
520                            }
521                            _ => None,
522                        });
523                let ordinal = match waiting_ordinal {
524                    Some(ordinal) => ordinal,
525                    None => {
526                        registry
527                            .count_events_through(process_id, &event_type, event.sequence)
528                            .await?
529                    }
530                };
531                if ordinal > 0 {
532                    let key = self
533                        .dispatch
534                        .effect_controller
535                        .controller()
536                        .await_event_key(
537                            &crate::ExecutionScope::process(process_id),
538                            crate::AwaitEventWaitIdentity::process_signal(
539                                process_id,
540                                signal_name,
541                                ordinal,
542                            ),
543                        )
544                        .await?;
545                    let _ = self
546                        .dispatch
547                        .effect_controller
548                        .controller()
549                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
550                        .await?;
551                }
552                Ok(event)
553            }
554            other => Err(crate::RuntimeEffectControllerError::new(
555                "runtime_effect_wrong_outcome",
556                format!("expected signal outcome, got {other:?}"),
557            )),
558        }
559    }
560
561    pub async fn append_process_event(
562        &self,
563        registry: Arc<dyn crate::ProcessRegistry>,
564        process_id: &str,
565        request: crate::ProcessEventAppendRequest,
566    ) -> Result<crate::ProcessEvent, crate::PluginError> {
567        let result = registry.append_event(process_id, request).await?;
568        if let Some(context) = self.process_event_context.as_ref() {
569            crate::tool_provider::process_events::enqueue_wake_delivery(
570                context.store.clone(),
571                context.session_store_factory.as_ref(),
572                result.wake_delivery,
573                Some(self.session_graph_service()),
574                context.queued_work_driver.as_ref(),
575            )
576            .await?;
577        }
578        Ok(result.event)
579    }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tool_dispatch::ToolDispatchContext;
    use crate::{ToolCall, ToolProvider, ToolResult};

    /// Tool provider stub: advertises no tools and is never expected to
    /// execute anything.
    struct NoopTools;

    #[async_trait::async_trait]
    impl ToolProvider for NoopTools {
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            Vec::new()
        }

        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
            None
        }

        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
            ToolResult::err_fmt("not used")
        }
    }

    /// A tool registered in the active catalog yields its own projection
    /// policy; an unknown tool name falls back to materializing projected
    /// values.
    #[test]
    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
        // Catalog entry carrying a non-default projection policy.
        let seedy_tool = crate::ToolDefinition::raw(
            "tool:seedy",
            "seedy",
            "Seed-aware",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "string" }),
        )
        .with_argument_projection(
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
        );
        let catalog = crate::ToolCatalog::from_tools(
            vec![seedy_tool.manifest()],
            std::collections::BTreeMap::new(),
        );

        let plugin_session = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);

        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(catalog),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
            clock: Arc::new(crate::SystemClock),
        });

        let execution_ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        // Known tool: the policy declared on its definition.
        assert_eq!(
            execution_ctx.tool_argument_projection_policy("seedy"),
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
        );
        // Unknown tool: the default policy.
        assert_eq!(
            execution_ctx.tool_argument_projection_policy("missing"),
            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
        );
    }
}