Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_host_environment_from_tool_catalog(
10    catalog: &crate::ToolCatalog,
11    abilities: lashlang::LashlangAbilities,
12    language_features: lashlang::LashlangLanguageFeatures,
13    host_resources: lashlang::LashlangHostCatalog,
14) -> lashlang::LashlangHostEnvironment {
15    let mut resources = lashlang_resources_from_tool_catalog(catalog);
16    resources.extend(host_resources);
17    lashlang::LashlangHostEnvironment::new(resources, abilities)
18        .with_language_features(language_features)
19}
20
21pub(crate) fn lashlang_resources_from_tool_catalog(
22    catalog: &crate::ToolCatalog,
23) -> lashlang::LashlangHostCatalog {
24    let mut host_catalog = lashlang::LashlangHostCatalog::new();
25    for entry in catalog.tools.iter() {
26        if entry.availability.is_callable() {
27            let lashlang_binding = entry
28                .manifest
29                .lashlang_binding
30                .executable_for(&entry.manifest.name);
31            host_catalog.add_module_operation(
32                lashlang_binding.module_path.iter().map(String::as_str),
33                lashlang_binding.authority_type.clone(),
34                lashlang_binding.operation.clone(),
35                entry.manifest.name.clone(),
36                lashlang::TypeExpr::Any,
37                lashlang::TypeExpr::Any,
38            );
39        }
40    }
41    host_catalog
42}
43
44#[derive(Clone)]
45pub struct RuntimeExecutionContext<'run> {
46    pub(super) session_id: String,
47    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
48    lashlang_abilities: lashlang::LashlangAbilities,
49    lashlang_language_features: lashlang::LashlangLanguageFeatures,
50    lashlang_host_environment: lashlang::LashlangHostEnvironment,
51    lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
52    attachment_store: Arc<dyn crate::AttachmentStore>,
53    chronological_projection: Arc<crate::ChronologicalProjection>,
54    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
55    turn_context: crate::TurnContext,
56    execution_env_spec: crate::ProcessExecutionEnvSpec,
57    process_originator: Option<crate::ProcessOriginator>,
58    pub(super) runtime_process_id: Option<String>,
59    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
60    process_wake_target: Option<crate::SessionScope>,
61    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
62    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
63    lashlang_execution_context: lash_trace::TraceContext,
64    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
65    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
66    pub(super) cancellation_token: Option<CancellationToken>,
67    /// Process ids started by THIS execution context. Possession of a handle
68    /// the run itself created is sufficient capability to await/cancel it —
69    /// run-local children are not session handle grants (the ephemeral
70    /// execution scope must never appear in durable grant state).
71    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
72}
73
74impl<'run> RuntimeExecutionContext<'run> {
75    pub(crate) fn drain_tool_trigger_outcomes(
76        &self,
77    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
78        self.dispatch
79            .trigger_outcomes
80            .drain()
81            .map_err(crate::PluginError::Session)
82    }
83
84    pub(super) fn process_scope(
85        &self,
86        parent_invocation: Option<crate::RuntimeInvocation>,
87    ) -> crate::ProcessOpScope<'_> {
88        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
89            .with_parent_invocation(parent_invocation)
90            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
91    }
92
93    #[allow(
94        clippy::too_many_arguments,
95        reason = "code execution bridge carries explicit per-turn runtime dependencies"
96    )]
97    pub(crate) fn new(
98        session_id: String,
99        dispatch: Arc<ToolDispatchContext<'run>>,
100        lashlang_abilities: lashlang::LashlangAbilities,
101        lashlang_language_features: lashlang::LashlangLanguageFeatures,
102        lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
103        attachment_store: Arc<dyn crate::AttachmentStore>,
104        chronological_projection: Arc<crate::ChronologicalProjection>,
105        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
106        turn_context: crate::TurnContext,
107    ) -> Self {
108        let lashlang_host_environment = lashlang_host_environment_from_tool_catalog(
109            &dispatch.tool_catalog,
110            lashlang_abilities,
111            lashlang_language_features,
112            dispatch.plugins.lashlang_resources(),
113        );
114        Self {
115            session_id,
116            dispatch,
117            lashlang_abilities,
118            lashlang_language_features,
119            lashlang_host_environment,
120            lashlang_artifact_store,
121            attachment_store,
122            chronological_projection,
123            protocol_extension,
124            turn_context,
125            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
126                crate::PluginOptions::default(),
127                crate::SessionPolicy::default(),
128            ),
129            process_originator: None,
130            runtime_process_id: None,
131            started_process_ids: Arc::default(),
132            process_env_ref: None,
133            process_wake_target: None,
134            parent_invocation: None,
135            lashlang_execution_sink: None,
136            lashlang_execution_context: lash_trace::TraceContext::default(),
137            turn_phase_probe: None,
138            turn_event_tx: None,
139            cancellation_token: None,
140        }
141    }
142
143    pub fn session_id(&self) -> &str {
144        &self.session_id
145    }
146
147    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
148        Arc::clone(&self.attachment_store)
149    }
150
151    pub async fn put_lashlang_module_artifact(
152        &self,
153        artifact: &lashlang::ModuleArtifact,
154    ) -> Result<(), String> {
155        self.lashlang_artifact_store
156            .put_module_artifact(artifact)
157            .await
158            .map_err(|err| err.to_string())
159    }
160
161    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
162        Arc::clone(&self.chronological_projection)
163    }
164
165    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
166        self.protocol_extension
167            .as_ref()
168            .and_then(|extension| extension.as_any().downcast_ref::<T>())
169    }
170
171    pub fn turn_context(&self) -> &crate::TurnContext {
172        &self.turn_context
173    }
174
175    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
176        self.dispatch.session_graph.as_ref()
177    }
178
179    pub(super) async fn emit_turn_activity(
180        &self,
181        correlation_id: TurnActivityId,
182        event: TurnEvent,
183    ) {
184        if let Some(tx) = &self.turn_event_tx {
185            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
186        }
187    }
188
189    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
190        self.turn_event_tx = Some(turn_event_tx);
191        self
192    }
193
194    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
195        self.parent_invocation = Some(metadata);
196        self
197    }
198
199    pub(crate) fn with_execution_env_spec(
200        mut self,
201        execution_env_spec: crate::ProcessExecutionEnvSpec,
202    ) -> Self {
203        self.execution_env_spec = execution_env_spec;
204        self
205    }
206
207    pub(crate) fn with_process_registration_context(
208        mut self,
209        registration: &crate::ProcessRegistration,
210    ) -> Self {
211        self.process_originator = Some(registration.provenance.originator.clone());
212        self.runtime_process_id = Some(registration.id.clone());
213        self.process_env_ref = registration.env_ref.clone();
214        self.process_wake_target = registration.wake_target.clone();
215        self
216    }
217
218    /// Spawn provenance for children started by this context, present only
219    /// when this context executes a process: children inherit the chain's
220    /// originator and wake target instead of the ephemeral execution scope.
221    pub(super) fn record_started_process(&self, process_id: &str) {
222        self.started_process_ids
223            .lock()
224            .expect("started process ids lock")
225            .insert(process_id.to_string());
226    }
227
228    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
229        self.started_process_ids
230            .lock()
231            .expect("started process ids lock")
232            .contains(process_id)
233    }
234
235    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
236        self.process_originator
237            .clone()
238            .map(|originator| crate::ProcessSpawnProvenance {
239                originator,
240                wake_target: self.process_wake_target.clone(),
241            })
242    }
243
244    pub(super) async fn attach_captured_process_execution_env(
245        &self,
246        registration: crate::ProcessRegistration,
247    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
248        if registration.env_ref.is_some() {
249            return Ok(registration);
250        }
251        match registration.input.as_ref() {
252            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
253                let env_ref = self.captured_process_execution_env_ref().await?;
254                Ok(registration.with_execution_env_ref(Some(env_ref)))
255            }
256            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
257                Ok(registration)
258            }
259        }
260    }
261
262    async fn captured_process_execution_env_ref(
263        &self,
264    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
265        if let Some(env_ref) = self.process_env_ref.clone() {
266            return Ok(env_ref);
267        }
268        crate::persist_process_execution_env(
269            self.lashlang_artifact_store.as_ref(),
270            &self.execution_env_spec,
271        )
272        .await
273    }
274
275    pub(crate) fn with_lashlang_execution_trace(
276        mut self,
277        sink: Option<Arc<dyn lash_trace::TraceSink>>,
278        context: lash_trace::TraceContext,
279    ) -> Self {
280        self.lashlang_execution_sink = sink;
281        self.lashlang_execution_context = context;
282        self
283    }
284
285    pub(crate) fn with_turn_phase_probe(
286        mut self,
287        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
288    ) -> Self {
289        self.turn_phase_probe = probe;
290        self
291    }
292
293    #[doc(hidden)]
294    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
295        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
296    }
297
298    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
299        self.parent_invocation.as_ref()
300    }
301
302    pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
303        self.lashlang_execution_sink.clone()
304    }
305
306    pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
307        &self.lashlang_execution_context
308    }
309
310    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
311        self.cancellation_token = Some(cancellation_token);
312        self
313    }
314
315    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
316        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
317    }
318
319    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
320        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
321    }
322
323    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
324        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
325    }
326
327    pub fn resolve_lashlang_host_operation(
328        &self,
329        receiver: &lashlang::ResourceHandle,
330        operation: &str,
331    ) -> Result<String, String> {
332        self.lashlang_host_environment
333            .resources
334            .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
335            .map(|binding| binding.host_operation.clone())
336            .ok_or_else(|| {
337                format!(
338                    "module `{}` of type `{}` does not expose operation `{operation}`",
339                    receiver.alias, receiver.resource_type
340                )
341            })
342    }
343
344    pub async fn prepare_lashlang_process_start(
345        &self,
346        start: lashlang::ProcessStart,
347    ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
348        let _phase = self.named_phase("rlm_process.prepare_start");
349        let display_name = Some(start.process_name.clone());
350        let artifact = self
351            .lashlang_artifact_store
352            .get_module_artifact(&start.module_ref)
353            .await
354            .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
355            .ok_or_else(|| {
356                format!(
357                    "missing lashlang module artifact `{}` for process `{}`",
358                    start.module_ref, start.process_name
359                )
360            })?;
361        if artifact.host_requirements_ref != start.host_requirements_ref {
362            return Err(format!(
363                "lashlang module artifact `{}` host requirements mismatch: process requested {}, artifact has {}",
364                start.module_ref, start.host_requirements_ref, artifact.host_requirements_ref
365            ));
366        }
367        if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
368            return Err(format!(
369                "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
370                start.module_ref, start.process_name, start.process_ref
371            ));
372        }
373        let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
374            .map_err(|err| format!("failed to serialize process args: {err}"))?
375        {
376            serde_json::Value::Object(map) => map,
377            _ => return Err("process args must serialize as a record".to_string()),
378        };
379        let signal_event_types = artifact
380            .canonical_ir
381            .process(&start.process_name)
382            .map(crate::lashlang_process_signal_event_types)
383            .unwrap_or_default();
384        let process_id = format!("process:{}", uuid::Uuid::new_v4());
385        let registration = crate::ProcessRegistration::session_start_draft(
386            process_id,
387            crate::ProcessInput::LashlangProcess {
388                module_ref: start.module_ref,
389                process_ref: start.process_ref,
390                host_requirements_ref: start.host_requirements_ref,
391                process_name: start.process_name,
392                args,
393            },
394        )
395        .with_extra_event_types(
396            crate::lashlang_process_event_types()
397                .into_iter()
398                .chain(signal_event_types),
399        );
400        Ok((registration, display_name))
401    }
402
403    pub fn lashlang_host_environment(&self) -> &lashlang::LashlangHostEnvironment {
404        &self.lashlang_host_environment
405    }
406
407    pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
408        self.lashlang_abilities
409    }
410
411    pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
412        self.lashlang_language_features
413    }
414
415    pub fn link_lashlang_module(
416        &self,
417        program: lashlang::Program,
418    ) -> Result<lashlang::LinkedModule, String> {
419        lashlang::LinkedModule::link(program, self.lashlang_host_environment())
420            .map_err(|err| err.to_string())
421    }
422
423    pub async fn perform_lashlang_trigger_operation(
424        &self,
425        operation: &str,
426        payload: serde_json::Value,
427    ) -> Result<serde_json::Value, String> {
428        match lashlang::TriggerHostOperation::from_host_operation(operation) {
429            Some(lashlang::TriggerHostOperation::Register) => self
430                .register_trigger_subscription(payload)
431                .await
432                .map_err(|err| err.to_string()),
433            Some(lashlang::TriggerHostOperation::List) => self
434                .list_trigger_subscriptions(payload)
435                .await
436                .map_err(|err| err.to_string()),
437            Some(lashlang::TriggerHostOperation::Cancel) => self
438                .cancel_trigger_subscription(payload)
439                .await
440                .map_err(|err| err.to_string()),
441            None => Err(format!("unknown trigger operation `{operation}`")),
442        }
443    }
444
445    async fn register_trigger_subscription(
446        &self,
447        payload: serde_json::Value,
448    ) -> Result<serde_json::Value, crate::PluginError> {
449        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
450            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
451        })?;
452        let request = lashlang::TriggerRegistrationRequest::decode(&payload)
453            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
454        let source_type = request.source.source_type.clone();
455        let source_value = request.source.value.clone();
456        let source = request.source.to_json();
457        let event_type = lashlang::event_type_for_source(
458            &self.dispatch.plugins.lashlang_resources(),
459            &source_type,
460        )
461        .map_err(|err| crate::PluginError::Session(err.to_string()))?;
462        let validation = crate::plugin::validate_target_process(
463            &request.target,
464            &event_type,
465            &request.inputs,
466            self.lashlang_artifact_store.as_ref(),
467        )
468        .await?;
469        let store = router.store();
470        let source_key = store
471            .source_key_for_subscription(&source_type, &source_value)
472            .await?;
473        let env_ref = match self.process_env_ref.clone() {
474            Some(env_ref) => env_ref,
475            None => {
476                crate::persist_process_execution_env(
477                    self.lashlang_artifact_store.as_ref(),
478                    &self.execution_env_spec,
479                )
480                .await?
481            }
482        };
483        let registrant = self.process_originator.clone().unwrap_or_else(|| {
484            crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
485        });
486        let wake_target = self
487            .process_wake_target
488            .clone()
489            .or_else(|| match &registrant {
490                crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
491                crate::ProcessOriginator::Host => None,
492            });
493        let record = store
494            .register_subscription(crate::TriggerSubscriptionDraft {
495                registrant,
496                env_ref,
497                wake_target,
498                name: request.name,
499                source_type,
500                source_key,
501                source,
502                event_ty: validation.event_ty,
503                module_ref: request.target.module_ref,
504                host_requirements_ref: request.target.host_requirements_ref,
505                process_ref: request.target.process_ref,
506                process_name: request.target.process_name,
507                input_template: validation.inputs,
508            })
509            .await?;
510        Ok(crate::plugin::trigger_handle_json(&record.handle))
511    }
512
513    async fn list_trigger_subscriptions(
514        &self,
515        payload: serde_json::Value,
516    ) -> Result<serde_json::Value, crate::PluginError> {
517        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
518            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
519        })?;
520        let request = lashlang::TriggerListRequest::decode(&payload)
521            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
522        let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
523        filter.target = request.target;
524        filter.name = request.name;
525        filter.source_type = request.source_type;
526        filter.enabled = request.enabled;
527        let registrations = router
528            .store()
529            .list_subscriptions(filter)
530            .await?
531            .iter()
532            .map(crate::TriggerRegistration::from)
533            .collect::<Vec<_>>();
534        serde_json::to_value(registrations).map_err(|err| {
535            crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
536        })
537    }
538
539    async fn cancel_trigger_subscription(
540        &self,
541        payload: serde_json::Value,
542    ) -> Result<serde_json::Value, crate::PluginError> {
543        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
544            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
545        })?;
546        let request = lashlang::TriggerCancelRequest::decode(&payload)
547            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
548        let changed = router
549            .store()
550            .cancel_subscription(&self.session_id, &request.handle)
551            .await?;
552        Ok(serde_json::json!(changed))
553    }
554
555    pub fn tool_argument_projection_policy(
556        &self,
557        name: &str,
558    ) -> crate::ToolArgumentProjectionPolicy {
559        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
560    }
561
562    pub async fn start_lashlang_process(
563        &self,
564        registration: crate::ProcessRegistration,
565        label: Option<String>,
566    ) -> crate::ToolInvocationReply {
567        let _phase = self.named_phase("rlm_process.start");
568        let registration = match self
569            .attach_captured_process_execution_env(registration)
570            .await
571        {
572            Ok(registration) => registration,
573            Err(err) => {
574                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
575            }
576        };
577        let process_id = registration.id.clone();
578        let mut options = crate::ProcessStartOptions::new()
579            .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
580        if let Some(spawn) = self.process_spawn_provenance() {
581            options = options.with_spawn_provenance(spawn);
582        }
583        match self
584            .dispatch
585            .processes
586            .start(
587                &self.session_id,
588                registration,
589                options,
590                self.process_scope(self.parent_invocation.clone()),
591            )
592            .await
593        {
594            Ok(_) => {
595                self.record_started_process(&process_id);
596                crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
597                    &process_id,
598                ))
599            }
600            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
601        }
602    }
603
604    pub async fn sleep_lashlang(
605        &self,
606        scope: &str,
607        sequence: u64,
608        duration_ms: u64,
609    ) -> Result<(), crate::RuntimeEffectControllerError> {
610        let cancellation = self.cancellation_token.clone().unwrap_or_default();
611        let invocation = crate::runtime::causal::lashlang_sleep_invocation(
612            &self.session_id,
613            self.parent_invocation.as_ref(),
614            scope,
615            sequence,
616        );
617        let outcome = self
618            .dispatch
619            .effect_controller
620            .controller()
621            .execute_effect(
622                crate::RuntimeEffectEnvelope::new(
623                    invocation,
624                    crate::RuntimeEffectCommand::Sleep { duration_ms },
625                ),
626                crate::RuntimeEffectLocalExecutor::sleep(cancellation),
627            )
628            .await?;
629        match outcome {
630            crate::RuntimeEffectOutcome::Sleep => Ok(()),
631            other => Err(crate::RuntimeEffectControllerError::new(
632                "runtime_effect_wrong_outcome",
633                format!("expected sleep outcome, got {}", other.kind().as_str()),
634            )),
635        }
636    }
637
638    pub async fn await_process_event_lashlang(
639        &self,
640        _registry: Arc<dyn crate::ProcessRegistry>,
641        process_id: &str,
642        signal_name: &str,
643        _event_type: &str,
644        event_ordinal: u64,
645    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
646        let cancellation = self.cancellation_token.clone().unwrap_or_default();
647        let key = self
648            .dispatch
649            .effect_controller
650            .controller()
651            .await_event_key(
652                &crate::ExecutionScope::process(process_id),
653                crate::AwaitEventWaitIdentity::process_signal(
654                    process_id,
655                    signal_name,
656                    event_ordinal,
657                ),
658            )
659            .await?;
660        let invocation = crate::runtime::causal::lashlang_await_event_invocation(
661            &self.session_id,
662            self.parent_invocation.as_ref(),
663            process_id,
664            signal_name,
665            event_ordinal,
666        );
667        let outcome = self
668            .dispatch
669            .effect_controller
670            .controller()
671            .execute_effect(
672                crate::RuntimeEffectEnvelope::new(
673                    invocation,
674                    crate::RuntimeEffectCommand::AwaitEvent { key },
675                ),
676                crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
677            )
678            .await?;
679        match outcome.into_await_event()? {
680            crate::Resolution::Ok(value) => Ok(value),
681            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
682                err.code,
683                err.message,
684            )),
685            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
686                "process_signal_wait_timeout",
687                "process signal wait timed out",
688            )),
689            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
690                "process_signal_wait_cancelled",
691                "process signal wait was cancelled",
692            )),
693        }
694    }
695
696    pub async fn signal_lashlang_process(
697        &self,
698        registry: Arc<dyn crate::ProcessRegistry>,
699        process_id: &str,
700        signal_name: &str,
701        signal_id: String,
702        payload: serde_json::Value,
703    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
704        let event_type = crate::process_signal_event_type(signal_name)?;
705        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
706        let signal_payload = payload.clone();
707        let command = crate::ProcessCommand::Signal {
708            process_id: process_id.to_string(),
709            signal_name: signal_name.to_string(),
710            signal_id,
711            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
712                .with_replay_key(replay_key),
713        };
714        let effect_id = command.effect_id();
715        let invocation = crate::runtime::causal::process_effect_invocation(
716            &self.session_id,
717            self.parent_invocation.clone(),
718            &effect_id,
719        );
720        let outcome = self
721            .dispatch
722            .effect_controller
723            .controller()
724            .execute_effect(
725                crate::RuntimeEffectEnvelope::new(
726                    invocation,
727                    crate::RuntimeEffectCommand::process(command),
728                ),
729                crate::RuntimeEffectLocalExecutor::processes(Arc::clone(&registry)),
730            )
731            .await?;
732        match outcome.into_process()? {
733            crate::ProcessEffectOutcome::Signal { event } => {
734                let waiting_ordinal =
735                    registry
736                        .get_process(process_id)
737                        .await
738                        .and_then(|record| match record.wait {
739                            Some(crate::WaitState {
740                                kind:
741                                    crate::WaitKind::Signal {
742                                        name,
743                                        event_type: wait_event_type,
744                                        ordinal,
745                                        ..
746                                    },
747                                ..
748                            }) if name == signal_name && wait_event_type == event_type => {
749                                Some(ordinal)
750                            }
751                            _ => None,
752                        });
753                let ordinal = match waiting_ordinal {
754                    Some(ordinal) => ordinal,
755                    None => {
756                        registry
757                            .count_events_through(process_id, &event_type, event.sequence)
758                            .await?
759                    }
760                };
761                if ordinal > 0 {
762                    let key = self
763                        .dispatch
764                        .effect_controller
765                        .controller()
766                        .await_event_key(
767                            &crate::ExecutionScope::process(process_id),
768                            crate::AwaitEventWaitIdentity::process_signal(
769                                process_id,
770                                signal_name,
771                                ordinal,
772                            ),
773                        )
774                        .await?;
775                    let _ = self
776                        .dispatch
777                        .effect_controller
778                        .controller()
779                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
780                        .await?;
781                }
782                Ok(event)
783            }
784            other => Err(crate::RuntimeEffectControllerError::new(
785                "runtime_effect_wrong_outcome",
786                format!("expected signal outcome, got {other:?}"),
787            )),
788        }
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::tool_dispatch::ToolDispatchContext;
796    use crate::{ToolCall, ToolProvider, ToolResult};
797
798    struct NoopTools;
799
800    #[async_trait::async_trait]
801    impl ToolProvider for NoopTools {
802        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
803            Vec::new()
804        }
805
806        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
807            None
808        }
809
810        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
811            ToolResult::err_fmt("not used")
812        }
813    }
814
815    #[test]
816    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
817        let tool = crate::ToolDefinition::raw(
818            "tool:seedy",
819            "seedy",
820            "Seed-aware",
821            crate::ToolDefinition::default_input_schema(),
822            serde_json::json!({ "type": "string" }),
823        )
824        .with_argument_projection(
825            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
826        );
827        let plugins = crate::plugin::PluginHost::empty()
828            .build_session("session", None)
829            .expect("plugin session");
830        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
831        let dispatch = Arc::new(ToolDispatchContext {
832            plugins,
833            tools: Arc::new(NoopTools),
834            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
835                vec![tool.manifest()],
836                std::collections::BTreeMap::new(),
837            )),
838            sessions: Arc::new(crate::testing::MockSessionManager::default()),
839            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
840            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
841            processes: Arc::new(crate::UnavailableProcessService),
842            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
843            trigger_router: None,
844            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
845                crate::InlineRuntimeEffectController,
846            )),
847            direct_completions: crate::DirectCompletionClient::unavailable(
848                "direct completions are unavailable in this test context",
849            ),
850            parent_invocation: None,
851            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
852                crate::PluginOptions::default(),
853                crate::SessionPolicy::default(),
854            ),
855            session_id: "session".to_string(),
856            agent_frame_id: String::new(),
857            event_tx,
858            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
859            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
860            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
861            turn_context: crate::TurnContext::default(),
862        });
863        let ctx = RuntimeExecutionContext::new(
864            "session".to_string(),
865            dispatch,
866            Default::default(),
867            Default::default(),
868            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
869            Arc::new(crate::InMemoryAttachmentStore::new()),
870            Arc::new(crate::ChronologicalProjection::default()),
871            None,
872            crate::TurnContext::default(),
873        );
874
875        assert_eq!(
876            ctx.tool_argument_projection_policy("seedy"),
877            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
878        );
879        assert_eq!(
880            ctx.tool_argument_projection_policy("missing"),
881            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
882        );
883    }
884
885    #[tokio::test]
886    async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
887        let tool = crate::ToolDefinition::raw(
888            "tool:alpha",
889            "alpha",
890            "Alpha tool.",
891            crate::ToolDefinition::default_input_schema(),
892            serde_json::json!({ "type": "object", "additionalProperties": true }),
893        );
894        let plugins = crate::plugin::PluginHost::empty()
895            .build_session("session", None)
896            .expect("plugin session");
897        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
898        let dispatch = Arc::new(ToolDispatchContext {
899            plugins,
900            tools: Arc::new(NoopTools),
901            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
902                vec![tool.manifest()],
903                std::collections::BTreeMap::new(),
904            )),
905            sessions: Arc::new(crate::testing::MockSessionManager::default()),
906            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
907            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
908            processes: Arc::new(crate::UnavailableProcessService),
909            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
910            trigger_router: None,
911            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
912                crate::InlineRuntimeEffectController,
913            )),
914            direct_completions: crate::DirectCompletionClient::unavailable(
915                "direct completions are unavailable in this test context",
916            ),
917            parent_invocation: None,
918            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
919                crate::PluginOptions::default(),
920                crate::SessionPolicy::default(),
921            ),
922            session_id: "session".to_string(),
923            agent_frame_id: String::new(),
924            event_tx,
925            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
926            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
927            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
928            turn_context: crate::TurnContext::default(),
929        });
930        let ctx = RuntimeExecutionContext::new(
931            "session".to_string(),
932            dispatch,
933            lashlang::LashlangAbilities::default().with_processes(),
934            Default::default(),
935            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
936            Arc::new(crate::InMemoryAttachmentStore::new()),
937            Arc::new(crate::ChronologicalProjection::default()),
938            None,
939            crate::TurnContext::default(),
940        );
941        let mut input = lashlang::Record::new();
942        input.insert("root".to_string(), lashlang::Value::String(".".into()));
943        let linked = ctx
944            .link_lashlang_module(
945                lashlang::parse("process scan(root: str) { finish root }").expect("process module"),
946            )
947            .expect("link process module");
948        ctx.put_lashlang_module_artifact(&linked.artifact)
949            .await
950            .expect("store module artifact");
951        let process_ref = linked
952            .artifact
953            .process_ref("scan")
954            .expect("scan process ref")
955            .clone();
956        let (registration, label) = ctx
957            .prepare_lashlang_process_start(lashlang::ProcessStart {
958                module_ref: linked.module_ref.clone(),
959                process_ref,
960                host_requirements_ref: linked.host_requirements_ref.clone(),
961                process_name: "scan".to_string(),
962                args: input,
963            })
964            .await
965            .expect("process start should prepare");
966
967        assert_eq!(label.as_deref(), Some("scan"));
968        assert!(
969            registration
970                .event_types
971                .iter()
972                .any(|event_type| event_type.name == "process.wake")
973        );
974        let crate::ProcessInput::LashlangProcess {
975            args, process_name, ..
976        } = registration.input.as_ref()
977        else {
978            panic!("expected lashlang process input");
979        };
980        assert_eq!(process_name, "scan");
981        assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
982    }
983
984    #[test]
985    fn lashlang_host_environment_reflects_host_abilities() {
986        let tool = crate::ToolDefinition::raw(
987            "tool:alpha",
988            "alpha",
989            "Alpha tool.",
990            crate::ToolDefinition::default_input_schema(),
991            serde_json::json!({ "type": "object", "additionalProperties": true }),
992        );
993        let plugins = crate::plugin::PluginHost::empty()
994            .build_session("session", None)
995            .expect("plugin session");
996        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
997        let dispatch = Arc::new(ToolDispatchContext {
998            plugins,
999            tools: Arc::new(NoopTools),
1000            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1001                vec![tool.manifest()],
1002                std::collections::BTreeMap::new(),
1003            )),
1004            sessions: Arc::new(crate::testing::MockSessionManager::default()),
1005            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1006            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1007            processes: Arc::new(crate::UnavailableProcessService),
1008            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1009            trigger_router: None,
1010            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1011                crate::InlineRuntimeEffectController,
1012            )),
1013            direct_completions: crate::DirectCompletionClient::unavailable(
1014                "direct completions are unavailable in this test context",
1015            ),
1016            parent_invocation: None,
1017            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1018                crate::PluginOptions::default(),
1019                crate::SessionPolicy::default(),
1020            ),
1021            session_id: "session".to_string(),
1022            agent_frame_id: String::new(),
1023            event_tx,
1024            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1025            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1026            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1027            turn_context: crate::TurnContext::default(),
1028        });
1029        let ctx = RuntimeExecutionContext::new(
1030            "session".to_string(),
1031            dispatch,
1032            lashlang::LashlangAbilities::default()
1033                .with_sleep()
1034                .with_processes()
1035                .with_process_signals(),
1036            Default::default(),
1037            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1038            Arc::new(crate::InMemoryAttachmentStore::new()),
1039            Arc::new(crate::ChronologicalProjection::default()),
1040            None,
1041            crate::TurnContext::default(),
1042        );
1043
1044        let environment = ctx.lashlang_host_environment();
1045
1046        assert!(std::ptr::eq(environment, ctx.lashlang_host_environment()));
1047        assert!(environment.abilities.processes);
1048        assert!(environment.abilities.sleep);
1049        assert!(environment.abilities.process_signals);
1050        assert!(!environment.abilities.triggers);
1051        assert!(
1052            environment
1053                .resources
1054                .resolve_operation("Tools", "alpha")
1055                .is_some()
1056        );
1057    }
1058
1059    #[test]
1060    fn lashlang_host_environment_reflects_host_resource_contributions() {
1061        let mut resources = lashlang::LashlangHostCatalog::new();
1062        resources
1063            .add_trigger_source_constructor(
1064                ["clock", "Alarm"],
1065                lashlang::TypeExpr::Object(vec![lashlang::TypeField {
1066                    name: "at".into(),
1067                    ty: lashlang::TypeExpr::Str,
1068                    optional: false,
1069                }]),
1070                lashlang::NamedDataType::object(
1071                    "clock.Tick",
1072                    vec![lashlang::TypeField {
1073                        name: "fired_at".into(),
1074                        ty: lashlang::TypeExpr::Str,
1075                        optional: false,
1076                    }],
1077                )
1078                .expect("valid clock tick type"),
1079            )
1080            .expect("valid clock trigger source");
1081        let plugin_host = crate::plugin::PluginHost::empty();
1082        let mut merged_resources = plugin_host.lashlang_resources();
1083        merged_resources.extend(resources);
1084        let plugins = plugin_host
1085            .with_lashlang_resources(merged_resources)
1086            .build_session("session", None)
1087            .expect("plugin session");
1088        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
1089        let dispatch = Arc::new(ToolDispatchContext {
1090            plugins,
1091            tools: Arc::new(NoopTools),
1092            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1093                Vec::new(),
1094                std::collections::BTreeMap::new(),
1095            )),
1096            sessions: Arc::new(crate::testing::MockSessionManager::default()),
1097            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1098            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1099            processes: Arc::new(crate::UnavailableProcessService),
1100            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1101            trigger_router: None,
1102            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1103                crate::InlineRuntimeEffectController,
1104            )),
1105            direct_completions: crate::DirectCompletionClient::unavailable(
1106                "direct completions are unavailable in this test context",
1107            ),
1108            parent_invocation: None,
1109            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1110                crate::PluginOptions::default(),
1111                crate::SessionPolicy::default(),
1112            ),
1113            session_id: "session".to_string(),
1114            agent_frame_id: String::new(),
1115            event_tx,
1116            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1117            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1118            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1119            turn_context: crate::TurnContext::default(),
1120        });
1121        let ctx = RuntimeExecutionContext::new(
1122            "session".to_string(),
1123            dispatch,
1124            lashlang::LashlangAbilities::default()
1125                .with_processes()
1126                .with_triggers(),
1127            Default::default(),
1128            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1129            Arc::new(crate::InMemoryAttachmentStore::new()),
1130            Arc::new(crate::ChronologicalProjection::default()),
1131            None,
1132            crate::TurnContext::default(),
1133        );
1134
1135        let host_environment = ctx.lashlang_host_environment();
1136
1137        assert!(
1138            host_environment
1139                .resources
1140                .resolve_value_constructor(&["clock", "Alarm"])
1141                .is_some()
1142        );
1143        assert!(
1144            host_environment
1145                .resources
1146                .resolve_trigger_source("clock.Alarm")
1147                .is_some()
1148        );
1149        lashlang::LinkedModule::link(
1150            lashlang::parse(
1151                r#"
1152                process remember(tick: clock.Tick) {
1153                  finish true
1154                }
1155
1156                source = clock.Alarm({ at: "08:00" })
1157                await triggers.register({
1158                  source: source,
1159                  target: remember,
1160                  inputs: { tick: trigger.event }
1161                })?
1162                "#,
1163            )
1164            .expect("parse trigger registry module"),
1165            host_environment,
1166        )
1167        .expect("host resource contribution should be linkable");
1168    }
1169}