// lash_core/session/execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
/// Execution context for a single run inside a session.
///
/// Holds the session id, the shared [`ToolDispatchContext`], the stores the
/// accessor methods hand out, and optional process/turn wiring.
/// [`RuntimeExecutionContext::new`] leaves every optional field unset; the
/// `with_*` builder methods fill them in.
///
/// The type is `Clone`, and `started_process_ids` sits behind an `Arc`, so
/// clones of one context share a single set of started process ids.
#[derive(Clone)]
pub struct RuntimeExecutionContext<'run> {
    /// Session id returned by `session_id()` and passed to process/effect calls.
    pub(super) session_id: String,
    /// Shared dispatch context (tool catalog, effect controller, process
    /// service, clock, ...) that most methods delegate to.
    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
    /// Store used by `captured_process_execution_env_ref` to persist
    /// `execution_env_spec` when no env ref is already set.
    process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
    /// Handed out by `attachment_store()`.
    attachment_store: Arc<dyn crate::AttachmentStore>,
    /// Handed out by `chronological_projection()`.
    chronological_projection: Arc<crate::ChronologicalProjection>,
    /// Optional extension handle; `protocol_extension::<T>()` downcasts it.
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    /// Handed out by `turn_context()`.
    turn_context: crate::TurnContext,
    /// Starts as a spec built from default plugin options and session policy;
    /// replaced by `with_execution_env_spec`.
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Set from the registration's provenance by
    /// `with_process_registration_context`; drives spawn provenance and the
    /// trigger registration originator.
    process_originator: Option<crate::ProcessOriginator>,
    /// Set from the registration id by `with_process_registration_context`.
    pub(super) runtime_process_id: Option<String>,
    /// When set, `append_process_event` also enqueues the wake delivery.
    pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
    /// When set, `captured_process_execution_env_ref` returns it as-is
    /// instead of persisting a new environment.
    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
    /// Set from the registration by `with_process_registration_context`;
    /// falls back to the session scope for trigger registration.
    process_wake_target: Option<crate::SessionScope>,
    /// Invocation that effects and child processes started here are parented to.
    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Passed to `RuntimeNamedPhase::begin` by `named_phase`.
    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
    /// Channel `emit_turn_activity` sends to; nothing is emitted when `None`.
    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
    /// Token used by `sleep_process` and `await_process_signal_event`; a
    /// default token is used when `None`.
    pub(super) cancellation_token: Option<CancellationToken>,
    /// Process ids started by THIS execution context. Possession of a handle
    /// the run itself created is sufficient capability to await/cancel it —
    /// run-local children are not session handle grants (the ephemeral
    /// execution scope must never appear in durable grant state).
    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
}
34
/// Dependencies captured by `with_process_event_context`.
///
/// `append_process_event` reads `store`, `session_store_factory` and
/// `queued_work_driver` from here to enqueue the wake delivery produced by an
/// appended event.
#[derive(Clone)]
pub(super) struct RuntimeExecutionProcessEventContext {
    /// Process id supplied to `with_process_event_context`.
    pub process_id: String,
    /// Process registry supplied to `with_process_event_context`.
    pub registry: Arc<dyn crate::ProcessRegistry>,
    /// Optional persistence handle forwarded to wake-delivery enqueueing.
    pub store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory forwarded to wake-delivery enqueueing.
    pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Optional queued-work driver forwarded to wake-delivery enqueueing.
    pub queued_work_driver: Option<crate::QueuedWorkDriver>,
}
43
44impl<'run> RuntimeExecutionContext<'run> {
45    pub(crate) fn drain_tool_trigger_outcomes(
46        &self,
47    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
48        self.dispatch
49            .trigger_outcomes
50            .drain()
51            .map_err(crate::PluginError::Session)
52    }
53
54    pub(super) fn process_scope(
55        &self,
56        parent_invocation: Option<crate::RuntimeInvocation>,
57    ) -> crate::ProcessOpScope<'_> {
58        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
59            .with_parent_invocation(parent_invocation)
60            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
61    }
62
63    #[allow(
64        clippy::too_many_arguments,
65        reason = "code execution bridge carries explicit per-turn runtime dependencies"
66    )]
67    pub(crate) fn new(
68        session_id: String,
69        dispatch: Arc<ToolDispatchContext<'run>>,
70        process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
71        attachment_store: Arc<dyn crate::AttachmentStore>,
72        chronological_projection: Arc<crate::ChronologicalProjection>,
73        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
74        turn_context: crate::TurnContext,
75    ) -> Self {
76        Self {
77            session_id,
78            dispatch,
79            process_env_store,
80            attachment_store,
81            chronological_projection,
82            protocol_extension,
83            turn_context,
84            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
85                crate::PluginOptions::default(),
86                crate::SessionPolicy::default(),
87            ),
88            process_originator: None,
89            runtime_process_id: None,
90            process_event_context: None,
91            started_process_ids: Arc::default(),
92            process_env_ref: None,
93            process_wake_target: None,
94            parent_invocation: None,
95            turn_phase_probe: None,
96            turn_event_tx: None,
97            cancellation_token: None,
98        }
99    }
100
101    pub fn session_id(&self) -> &str {
102        &self.session_id
103    }
104
105    pub fn execution_scope_id(&self) -> String {
106        self.dispatch
107            .effect_controller
108            .scoped()
109            .scope_id()
110            .to_string()
111    }
112
113    pub fn session_scope(&self) -> crate::SessionScope {
114        self.dispatch
115            .agent_frame_id
116            .as_str()
117            .is_empty()
118            .then(|| crate::SessionScope::new(self.session_id.clone()))
119            .unwrap_or_else(|| {
120                crate::SessionScope::for_agent_frame(
121                    self.session_id.clone(),
122                    self.dispatch.agent_frame_id.clone(),
123                )
124            })
125    }
126
127    pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
128        self.dispatch
129            .trigger_router
130            .as_ref()
131            .map(crate::TriggerRouter::store)
132    }
133
134    pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
135        self.process_originator
136            .clone()
137            .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
138    }
139
140    pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
141        self.process_wake_target
142            .clone()
143            .or_else(|| Some(self.session_scope()))
144    }
145
146    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
147        Arc::clone(&self.attachment_store)
148    }
149
150    pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
151        Arc::clone(&self.process_env_store)
152    }
153
154    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
155        Arc::clone(&self.chronological_projection)
156    }
157
158    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
159        self.protocol_extension
160            .as_ref()
161            .and_then(|extension| extension.as_any().downcast_ref::<T>())
162    }
163
164    pub fn turn_context(&self) -> &crate::TurnContext {
165        &self.turn_context
166    }
167
168    pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
169        Arc::clone(&self.dispatch.tool_catalog)
170    }
171
172    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
173        self.dispatch.session_graph.as_ref()
174    }
175
176    pub(super) async fn emit_turn_activity(
177        &self,
178        correlation_id: TurnActivityId,
179        event: TurnEvent,
180    ) {
181        if let Some(tx) = &self.turn_event_tx {
182            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
183        }
184    }
185
186    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
187        self.turn_event_tx = Some(turn_event_tx);
188        self
189    }
190
191    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
192        self.parent_invocation = Some(metadata);
193        self
194    }
195
196    pub(crate) fn with_execution_env_spec(
197        mut self,
198        execution_env_spec: crate::ProcessExecutionEnvSpec,
199    ) -> Self {
200        self.execution_env_spec = execution_env_spec;
201        self
202    }
203
204    pub(crate) fn with_process_registration_context(
205        mut self,
206        registration: &crate::ProcessRegistration,
207    ) -> Self {
208        self.process_originator = Some(registration.provenance.originator.clone());
209        self.runtime_process_id = Some(registration.id.clone());
210        self.process_env_ref = registration.env_ref.clone();
211        self.process_wake_target = registration.wake_target.clone();
212        self
213    }
214
215    pub(crate) fn with_process_event_context(
216        mut self,
217        process_id: impl Into<String>,
218        registry: Arc<dyn crate::ProcessRegistry>,
219        store: Option<Arc<dyn crate::RuntimePersistence>>,
220        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
221        queued_work_driver: Option<crate::QueuedWorkDriver>,
222    ) -> Self {
223        self.process_event_context = Some(RuntimeExecutionProcessEventContext {
224            process_id: process_id.into(),
225            registry,
226            store,
227            session_store_factory,
228            queued_work_driver,
229        });
230        self
231    }
232
233    /// Spawn provenance for children started by this context, present only
234    /// when this context executes a process: children inherit the chain's
235    /// originator and wake target instead of the ephemeral execution scope.
236    pub(super) fn record_started_process(&self, process_id: &str) {
237        self.started_process_ids
238            .lock()
239            .expect("started process ids lock")
240            .insert(process_id.to_string());
241    }
242
243    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
244        self.started_process_ids
245            .lock()
246            .expect("started process ids lock")
247            .contains(process_id)
248    }
249
250    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
251        self.process_originator
252            .clone()
253            .map(|originator| crate::ProcessSpawnProvenance {
254                originator,
255                wake_target: self.process_wake_target.clone(),
256            })
257    }
258
259    pub(super) async fn attach_captured_process_execution_env(
260        &self,
261        registration: crate::ProcessRegistration,
262    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
263        if registration.env_ref.is_some() {
264            return Ok(registration);
265        }
266        match registration.input.as_ref() {
267            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
268                let env_ref = self.captured_process_execution_env_ref().await?;
269                Ok(registration.with_execution_env_ref(Some(env_ref)))
270            }
271            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
272                Ok(registration)
273            }
274        }
275    }
276
277    pub async fn captured_process_execution_env_ref(
278        &self,
279    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
280        if let Some(env_ref) = self.process_env_ref.clone() {
281            return Ok(env_ref);
282        }
283        crate::persist_process_execution_env(
284            self.process_env_store.as_ref(),
285            &self.execution_env_spec,
286        )
287        .await
288    }
289
290    pub(crate) fn with_turn_phase_probe(
291        mut self,
292        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
293    ) -> Self {
294        self.turn_phase_probe = probe;
295        self
296    }
297
298    #[doc(hidden)]
299    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
300        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
301    }
302
303    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
304        self.parent_invocation.as_ref()
305    }
306
307    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
308        self.cancellation_token = Some(cancellation_token);
309        self
310    }
311
312    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
313        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
314    }
315
316    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
317        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
318    }
319
320    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
321        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
322    }
323
324    pub fn tool_argument_projection_policy(
325        &self,
326        name: &str,
327    ) -> crate::ToolArgumentProjectionPolicy {
328        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
329    }
330
331    pub async fn start_child_process(
332        &self,
333        registration: crate::ProcessRegistration,
334        kind: impl Into<String>,
335        label: Option<String>,
336    ) -> crate::ToolInvocationReply {
337        let _phase = self.named_phase("process.start_child");
338        let registration = match self
339            .attach_captured_process_execution_env(registration)
340            .await
341        {
342            Ok(registration) => registration,
343            Err(err) => {
344                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
345            }
346        };
347        let process_id = registration.id.clone();
348        let mut options = crate::ProcessStartOptions::new()
349            .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
350        if let Some(spawn) = self.process_spawn_provenance() {
351            options = options.with_spawn_provenance(spawn);
352        }
353        match self
354            .dispatch
355            .processes
356            .start(
357                &self.session_id,
358                registration,
359                options,
360                self.process_scope(self.parent_invocation.clone()),
361            )
362            .await
363        {
364            Ok(_) => {
365                self.record_started_process(&process_id);
366                crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
367            }
368            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
369        }
370    }
371
372    pub async fn sleep_process(
373        &self,
374        scope: &str,
375        sequence: u64,
376        duration_ms: u64,
377    ) -> Result<(), crate::RuntimeEffectControllerError> {
378        let cancellation = self.cancellation_token.clone().unwrap_or_default();
379        let invocation = crate::runtime::causal::process_sleep_invocation(
380            &self.session_id,
381            self.parent_invocation.as_ref(),
382            scope,
383            sequence,
384        );
385        let outcome = self
386            .dispatch
387            .effect_controller
388            .controller()
389            .execute_effect(
390                crate::RuntimeEffectEnvelope::new(
391                    invocation,
392                    crate::RuntimeEffectCommand::Sleep { duration_ms },
393                ),
394                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
395                    cancellation,
396                    std::sync::Arc::clone(&self.dispatch.clock),
397                ),
398            )
399            .await?;
400        match outcome {
401            crate::RuntimeEffectOutcome::Sleep => Ok(()),
402            other => Err(crate::RuntimeEffectControllerError::new(
403                "runtime_effect_wrong_outcome",
404                format!("expected sleep outcome, got {}", other.kind().as_str()),
405            )),
406        }
407    }
408
409    pub async fn await_process_signal_event(
410        &self,
411        process_id: &str,
412        signal_name: &str,
413        event_ordinal: u64,
414    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
415        let cancellation = self.cancellation_token.clone().unwrap_or_default();
416        let key = self
417            .dispatch
418            .effect_controller
419            .controller()
420            .await_event_key(
421                &crate::ExecutionScope::process(process_id),
422                crate::AwaitEventWaitIdentity::process_signal(
423                    process_id,
424                    signal_name,
425                    event_ordinal,
426                ),
427            )
428            .await?;
429        let invocation = crate::runtime::causal::process_await_event_invocation(
430            &self.session_id,
431            self.parent_invocation.as_ref(),
432            process_id,
433            signal_name,
434            event_ordinal,
435        );
436        let outcome = self
437            .dispatch
438            .effect_controller
439            .controller()
440            .execute_effect(
441                crate::RuntimeEffectEnvelope::new(
442                    invocation,
443                    crate::RuntimeEffectCommand::AwaitEvent { key },
444                ),
445                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
446                    cancellation,
447                    None,
448                    std::sync::Arc::clone(&self.dispatch.clock),
449                ),
450            )
451            .await?;
452        match outcome.into_await_event()? {
453            crate::Resolution::Ok(value) => Ok(value),
454            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
455                err.code,
456                err.message,
457            )),
458            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
459                "process_signal_wait_timeout",
460                "process signal wait timed out",
461            )),
462            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
463                "process_signal_wait_cancelled",
464                "process signal wait was cancelled",
465            )),
466        }
467    }
468
469    pub async fn signal_process_by_id(
470        &self,
471        registry: Arc<dyn crate::ProcessRegistry>,
472        process_id: &str,
473        signal_name: &str,
474        signal_id: String,
475        payload: serde_json::Value,
476    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
477        let event_type = crate::process_signal_event_type(signal_name)?;
478        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
479        let signal_payload = payload.clone();
480        let command = crate::ProcessCommand::Signal {
481            process_id: process_id.to_string(),
482            signal_name: signal_name.to_string(),
483            signal_id,
484            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
485                .with_replay_key(replay_key),
486        };
487        let effect_id = command.effect_id();
488        let invocation = crate::runtime::causal::process_effect_invocation(
489            &self.session_id,
490            self.parent_invocation.clone(),
491            &effect_id,
492        );
493        let outcome = self
494            .dispatch
495            .effect_controller
496            .controller()
497            .execute_effect(
498                crate::RuntimeEffectEnvelope::new(
499                    invocation,
500                    crate::RuntimeEffectCommand::process(command),
501                ),
502                crate::RuntimeEffectLocalExecutor::processes(Arc::clone(&registry)),
503            )
504            .await?;
505        match outcome.into_process()? {
506            crate::ProcessEffectOutcome::Signal { event } => {
507                let waiting_ordinal =
508                    registry
509                        .get_process(process_id)
510                        .await
511                        .and_then(|record| match record.wait {
512                            Some(crate::WaitState {
513                                kind:
514                                    crate::WaitKind::Signal {
515                                        name,
516                                        event_type: wait_event_type,
517                                        ordinal,
518                                        ..
519                                    },
520                                ..
521                            }) if name == signal_name && wait_event_type == event_type => {
522                                Some(ordinal)
523                            }
524                            _ => None,
525                        });
526                let ordinal = match waiting_ordinal {
527                    Some(ordinal) => ordinal,
528                    None => {
529                        registry
530                            .count_events_through(process_id, &event_type, event.sequence)
531                            .await?
532                    }
533                };
534                if ordinal > 0 {
535                    let key = self
536                        .dispatch
537                        .effect_controller
538                        .controller()
539                        .await_event_key(
540                            &crate::ExecutionScope::process(process_id),
541                            crate::AwaitEventWaitIdentity::process_signal(
542                                process_id,
543                                signal_name,
544                                ordinal,
545                            ),
546                        )
547                        .await?;
548                    let _ = self
549                        .dispatch
550                        .effect_controller
551                        .controller()
552                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
553                        .await?;
554                }
555                Ok(event)
556            }
557            other => Err(crate::RuntimeEffectControllerError::new(
558                "runtime_effect_wrong_outcome",
559                format!("expected signal outcome, got {other:?}"),
560            )),
561        }
562    }
563
564    pub async fn append_process_event(
565        &self,
566        registry: Arc<dyn crate::ProcessRegistry>,
567        process_id: &str,
568        request: crate::ProcessEventAppendRequest,
569    ) -> Result<crate::ProcessEvent, crate::PluginError> {
570        let result = registry.append_event(process_id, request).await?;
571        if let Some(context) = self.process_event_context.as_ref() {
572            crate::tool_provider::process_events::enqueue_wake_delivery(
573                context.store.clone(),
574                context.session_store_factory.as_ref(),
575                result.wake_delivery,
576                Some(self.session_graph_service()),
577                context.queued_work_driver.as_ref(),
578            )
579            .await?;
580        }
581        Ok(result.event)
582    }
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tool_dispatch::ToolDispatchContext;
    use crate::{ToolCall, ToolProvider, ToolResult};

    /// Tool provider stub: exposes no manifests or contracts, and its
    /// `execute` always returns an error result.
    struct NoopTools;

    #[async_trait::async_trait]
    impl ToolProvider for NoopTools {
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            vec![]
        }

        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
            None
        }

        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
            ToolResult::err_fmt("not used")
        }
    }

    /// A tool registered with a custom projection policy resolves to that
    /// policy; an unknown tool name falls back to
    /// `MaterializeProjectedValues`.
    #[test]
    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
        // Fresh policy value per use, matching how the expectation is built.
        let seed_policy =
            || crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed");

        let seedy = crate::ToolDefinition::raw(
            "tool:seedy",
            "seedy",
            "Seed-aware",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "string" }),
        )
        .with_argument_projection(seed_policy());
        let catalog = crate::ToolCatalog::from_tools(
            vec![seedy.manifest()],
            std::collections::BTreeMap::new(),
        );

        let plugins = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
        let effect_controller = crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
            crate::InlineRuntimeEffectController,
        ));
        let direct_completions = crate::DirectCompletionClient::unavailable(
            "direct completions are unavailable in this test context",
        );
        let env_spec = crate::ProcessExecutionEnvSpec::new(
            crate::PluginOptions::default(),
            crate::SessionPolicy::default(),
        );

        let dispatch = Arc::new(ToolDispatchContext {
            plugins,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(catalog),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller,
            direct_completions,
            parent_invocation: None,
            execution_env_spec: env_spec,
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
            clock: std::sync::Arc::new(crate::SystemClock),
        });

        let context = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch,
            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        let known = context.tool_argument_projection_policy("seedy");
        assert_eq!(known, seed_policy());

        let unknown = context.tool_argument_projection_policy("missing");
        assert_eq!(
            unknown,
            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
        );
    }
}