Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_surface_from_tool_surface(
10    surface: &crate::ToolSurface,
11    abilities: lashlang::LashlangAbilities,
12    language_features: lashlang::LashlangLanguageFeatures,
13    host_resources: lashlang::ResourceCatalog,
14) -> lashlang::LashlangSurface {
15    let mut resources = lashlang_resources_from_tool_surface(surface);
16    resources.extend(host_resources);
17    lashlang::LashlangSurface::new(resources, abilities).with_language_features(language_features)
18}
19
20pub(crate) fn lashlang_resources_from_tool_surface(
21    surface: &crate::ToolSurface,
22) -> lashlang::ResourceCatalog {
23    let mut catalog = lashlang::ResourceCatalog::new();
24    for entry in surface.tools.iter() {
25        if entry.availability.is_callable() {
26            let agent_surface = entry
27                .manifest
28                .agent_surface
29                .executable_for(&entry.manifest.name);
30            catalog.add_module_operation(
31                agent_surface.module_path.iter().map(String::as_str),
32                agent_surface.authority_type.clone(),
33                agent_surface.operation.clone(),
34                entry.manifest.name.clone(),
35                lashlang::TypeExpr::Any,
36                lashlang::TypeExpr::Any,
37            );
38        }
39    }
40    catalog
41}
42
/// Per-run execution context used to run tools and lashlang programs on
/// behalf of a session.
///
/// Clones share the same `started_process_ids` set because it is held behind
/// an `Arc`.
#[derive(Clone)]
pub struct RuntimeExecutionContext<'run> {
    /// Session identifier supplied to `new`.
    pub(super) session_id: String,
    /// Shared tool dispatch context; the methods on this type read the tool
    /// surface, plugins, process service, and effect controller from it.
    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
    /// Abilities used to build `lashlang_surface`.
    lashlang_abilities: lashlang::LashlangAbilities,
    /// Language features used to build `lashlang_surface`.
    lashlang_language_features: lashlang::LashlangLanguageFeatures,
    /// Surface built in `new` from the dispatch tool surface and the
    /// plugin-provided lashlang resources.
    lashlang_surface: lashlang::LashlangSurface,
    /// Store for lashlang module artifacts; also passed to
    /// `persist_process_execution_env` when an execution env must be captured.
    lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
    /// Attachment store exposed through `attachment_store()`.
    attachment_store: Arc<dyn crate::AttachmentStore>,
    /// Projection exposed through `chronological_projection()`.
    chronological_projection: Arc<crate::ChronologicalProjection>,
    /// Optional protocol extension, downcast on demand by `protocol_extension`.
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    /// Turn context exposed through `turn_context()`.
    turn_context: crate::TurnContext,
    /// Spec persisted when an execution env ref is needed and none was
    /// inherited. Starts from default plugin options and session policy and
    /// is replaced by `with_execution_env_spec`.
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Originator copied from a registration by
    /// `with_process_registration_context`; `None` otherwise.
    process_originator: Option<crate::ProcessOriginator>,
    /// Execution env ref copied from a registration by
    /// `with_process_registration_context`; reused instead of persisting a
    /// new env when present.
    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
    /// Wake target copied from a registration by
    /// `with_process_registration_context`.
    process_wake_target: Option<crate::SessionScope>,
    /// Invocation set by `with_parent_invocation` and forwarded to the
    /// effects and process operations started from this context.
    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Trace sink set by `with_lashlang_execution_trace`.
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    /// Trace context set by `with_lashlang_execution_trace`; defaults to
    /// `TraceContext::default()`.
    lashlang_execution_context: lash_trace::TraceContext,
    /// Channel for turn activity; when `None`, `emit_turn_activity` does
    /// nothing.
    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
    /// Token handed to sleep and await-event effects; a default token is used
    /// when this is `None`.
    pub(super) cancellation_token: Option<CancellationToken>,
    /// Process ids started by THIS execution context. Possession of a handle
    /// the run itself created is sufficient capability to await/cancel it —
    /// run-local children are not session handle grants (the ephemeral
    /// execution scope must never appear in durable grant state).
    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
}
70
71impl<'run> RuntimeExecutionContext<'run> {
72    pub(crate) fn drain_tool_host_event_outcomes(
73        &self,
74    ) -> Result<Vec<crate::tool_dispatch::ToolHostEventEffectOutcome>, crate::PluginError> {
75        self.dispatch
76            .host_event_outcomes
77            .drain()
78            .map_err(crate::PluginError::Session)
79    }
80
81    pub(super) fn process_scope(
82        &self,
83        parent_invocation: Option<crate::RuntimeInvocation>,
84    ) -> crate::ProcessOpScope<'_> {
85        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
86            .with_parent_invocation(parent_invocation)
87            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
88    }
89
90    #[allow(
91        clippy::too_many_arguments,
92        reason = "code execution bridge carries explicit per-turn runtime dependencies"
93    )]
94    pub(crate) fn new(
95        session_id: String,
96        dispatch: Arc<ToolDispatchContext<'run>>,
97        lashlang_abilities: lashlang::LashlangAbilities,
98        lashlang_language_features: lashlang::LashlangLanguageFeatures,
99        lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
100        attachment_store: Arc<dyn crate::AttachmentStore>,
101        chronological_projection: Arc<crate::ChronologicalProjection>,
102        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
103        turn_context: crate::TurnContext,
104    ) -> Self {
105        let lashlang_surface = lashlang_surface_from_tool_surface(
106            &dispatch.surface,
107            lashlang_abilities,
108            lashlang_language_features,
109            dispatch.plugins.lashlang_resources(),
110        );
111        Self {
112            session_id,
113            dispatch,
114            lashlang_abilities,
115            lashlang_language_features,
116            lashlang_surface,
117            lashlang_artifact_store,
118            attachment_store,
119            chronological_projection,
120            protocol_extension,
121            turn_context,
122            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
123                crate::PluginOptions::default(),
124                crate::SessionPolicy::default(),
125            ),
126            process_originator: None,
127            started_process_ids: Arc::default(),
128            process_env_ref: None,
129            process_wake_target: None,
130            parent_invocation: None,
131            lashlang_execution_sink: None,
132            lashlang_execution_context: lash_trace::TraceContext::default(),
133            turn_event_tx: None,
134            cancellation_token: None,
135        }
136    }
137
    /// Returns the id of the session this context executes for.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }
141
    /// Returns a new `Arc` handle to the attachment store given to `new`.
    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
        Arc::clone(&self.attachment_store)
    }
145
146    pub async fn put_lashlang_module_artifact(
147        &self,
148        artifact: &lashlang::ModuleArtifact,
149    ) -> Result<(), String> {
150        self.lashlang_artifact_store
151            .put_module_artifact(artifact)
152            .await
153            .map_err(|err| err.to_string())
154    }
155
    /// Returns a new `Arc` handle to the chronological projection given to
    /// `new`.
    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
        Arc::clone(&self.chronological_projection)
    }
159
160    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
161        self.protocol_extension
162            .as_ref()
163            .and_then(|extension| extension.as_any().downcast_ref::<T>())
164    }
165
    /// Returns the turn context given to `new`.
    pub fn turn_context(&self) -> &crate::TurnContext {
        &self.turn_context
    }
169
    /// Borrows the session graph service held by the dispatch context.
    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
        self.dispatch.session_graph.as_ref()
    }
173
174    pub(super) async fn emit_turn_activity(
175        &self,
176        correlation_id: TurnActivityId,
177        event: TurnEvent,
178    ) {
179        if let Some(tx) = &self.turn_event_tx {
180            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
181        }
182    }
183
184    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
185        self.turn_event_tx = Some(turn_event_tx);
186        self
187    }
188
189    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
190        self.parent_invocation = Some(metadata);
191        self
192    }
193
194    pub(crate) fn with_execution_env_spec(
195        mut self,
196        execution_env_spec: crate::ProcessExecutionEnvSpec,
197    ) -> Self {
198        self.execution_env_spec = execution_env_spec;
199        self
200    }
201
202    pub(crate) fn with_process_registration_context(
203        mut self,
204        registration: &crate::ProcessRegistration,
205    ) -> Self {
206        self.process_originator = Some(registration.provenance.originator.clone());
207        self.process_env_ref = registration.env_ref.clone();
208        self.process_wake_target = registration.wake_target.clone();
209        self
210    }
211
    /// Records `process_id` as started by this execution context.
    ///
    /// # Panics
    ///
    /// Panics if the `started_process_ids` mutex is poisoned.
    pub(super) fn record_started_process(&self, process_id: &str) {
        self.started_process_ids
            .lock()
            .expect("started process ids lock")
            .insert(process_id.to_string());
    }

    /// Returns `true` when `process_id` was recorded through
    /// `record_started_process` on this context or on a clone of it (clones
    /// share the same set).
    ///
    /// # Panics
    ///
    /// Panics if the `started_process_ids` mutex is poisoned.
    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
        self.started_process_ids
            .lock()
            .expect("started process ids lock")
            .contains(process_id)
    }

    /// Spawn provenance for children started by this context, present only
    /// when this context executes a process: children inherit the chain's
    /// originator and wake target instead of the ephemeral execution scope.
    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
        self.process_originator
            .clone()
            .map(|originator| crate::ProcessSpawnProvenance {
                originator,
                wake_target: self.process_wake_target.clone(),
            })
    }
237
238    pub(super) async fn attach_captured_process_execution_env(
239        &self,
240        registration: crate::ProcessRegistration,
241    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
242        if registration.env_ref.is_some() {
243            return Ok(registration);
244        }
245        match registration.input.as_ref() {
246            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
247                let env_ref = self.captured_process_execution_env_ref().await?;
248                Ok(registration.with_execution_env_ref(Some(env_ref)))
249            }
250            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
251                Ok(registration)
252            }
253        }
254    }
255
256    async fn captured_process_execution_env_ref(
257        &self,
258    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
259        if let Some(env_ref) = self.process_env_ref.clone() {
260            return Ok(env_ref);
261        }
262        crate::persist_process_execution_env(
263            self.lashlang_artifact_store.as_ref(),
264            &self.execution_env_spec,
265        )
266        .await
267    }
268
269    pub(crate) fn with_lashlang_execution_trace(
270        mut self,
271        sink: Option<Arc<dyn lash_trace::TraceSink>>,
272        context: lash_trace::TraceContext,
273    ) -> Self {
274        self.lashlang_execution_sink = sink;
275        self.lashlang_execution_context = context;
276        self
277    }
278
    /// Returns the parent invocation, if one was set with
    /// `with_parent_invocation`.
    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
        self.parent_invocation.as_ref()
    }
282
283    pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
284        self.lashlang_execution_sink.clone()
285    }
286
    /// Returns the trace context used for lashlang execution.
    pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
        &self.lashlang_execution_context
    }
290
291    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
292        self.cancellation_token = Some(cancellation_token);
293        self
294    }
295
    /// Resolves the scheduling for the tool called `name` against this
    /// context's dispatch via `tool_dispatch::resolve_tool_scheduling`.
    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
    }
299
    /// Looks up the manifest of the callable tool called `name` via
    /// `tool_dispatch::resolve_callable_manifest`; `None` when the lookup
    /// yields nothing.
    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
    }
303
    /// Looks up the manifest of the callable tool identified by `id` via
    /// `tool_dispatch::resolve_callable_manifest_by_id`; `None` when the
    /// lookup yields nothing.
    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
    }
307
308    pub fn resolve_lashlang_host_operation(
309        &self,
310        receiver: &lashlang::ResourceHandle,
311        operation: &str,
312    ) -> Result<String, String> {
313        self.lashlang_surface
314            .resources
315            .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
316            .map(|binding| binding.host_operation.clone())
317            .ok_or_else(|| {
318                format!(
319                    "module `{}` of type `{}` does not expose operation `{operation}`",
320                    receiver.alias, receiver.resource_type
321                )
322            })
323    }
324
325    pub async fn prepare_lashlang_process_start(
326        &self,
327        start: lashlang::ProcessStart,
328    ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
329        let display_name = Some(start.process_name.clone());
330        let artifact = self
331            .lashlang_artifact_store
332            .get_module_artifact(&start.module_ref)
333            .await
334            .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
335            .ok_or_else(|| {
336                format!(
337                    "missing lashlang module artifact `{}` for process `{}`",
338                    start.module_ref, start.process_name
339                )
340            })?;
341        if artifact.required_surface_ref != start.required_surface_ref {
342            return Err(format!(
343                "lashlang module artifact `{}` required surface mismatch: process requested {}, artifact has {}",
344                start.module_ref, start.required_surface_ref, artifact.required_surface_ref
345            ));
346        }
347        if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
348            return Err(format!(
349                "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
350                start.module_ref, start.process_name, start.process_ref
351            ));
352        }
353        let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
354            .map_err(|err| format!("failed to serialize process args: {err}"))?
355        {
356            serde_json::Value::Object(map) => map,
357            _ => return Err("process args must serialize as a record".to_string()),
358        };
359        let signal_event_types = artifact
360            .canonical_ir
361            .process(&start.process_name)
362            .map(crate::lashlang_process_signal_event_types)
363            .unwrap_or_default();
364        let process_id = format!("process:{}", uuid::Uuid::new_v4());
365        let registration = crate::ProcessRegistration::session_start_draft(
366            process_id,
367            crate::ProcessInput::LashlangProcess {
368                module_ref: start.module_ref,
369                process_ref: start.process_ref,
370                required_surface_ref: start.required_surface_ref,
371                process_name: start.process_name,
372                args,
373            },
374        )
375        .with_extra_event_types(
376            crate::lashlang_process_event_types()
377                .into_iter()
378                .chain(signal_event_types),
379        );
380        Ok((registration, display_name))
381    }
382
    /// Returns the lashlang surface built when this context was created.
    pub fn lashlang_surface(&self) -> &lashlang::LashlangSurface {
        &self.lashlang_surface
    }
386
    /// Returns a copy of the lashlang abilities given to `new`.
    pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
        self.lashlang_abilities
    }
390
    /// Returns a copy of the lashlang language features given to `new`.
    pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
        self.lashlang_language_features
    }
394
395    pub fn link_lashlang_module(
396        &self,
397        program: lashlang::Program,
398    ) -> Result<lashlang::LinkedModule, String> {
399        lashlang::LinkedModule::link(program, self.lashlang_surface())
400            .map_err(|err| err.to_string())
401    }
402
403    pub async fn perform_lashlang_trigger_operation(
404        &self,
405        operation: &str,
406        payload: serde_json::Value,
407    ) -> Result<serde_json::Value, String> {
408        match lashlang::TriggerHostOperation::from_host_operation(operation) {
409            Some(lashlang::TriggerHostOperation::Register) => self
410                .register_host_event_subscription(payload)
411                .await
412                .map_err(|err| err.to_string()),
413            Some(lashlang::TriggerHostOperation::List) => self
414                .list_host_event_subscriptions(payload)
415                .await
416                .map_err(|err| err.to_string()),
417            Some(lashlang::TriggerHostOperation::Cancel) => self
418                .cancel_host_event_subscription(payload)
419                .await
420                .map_err(|err| err.to_string()),
421            None => Err(format!("unknown trigger operation `{operation}`")),
422        }
423    }
424
425    async fn register_host_event_subscription(
426        &self,
427        payload: serde_json::Value,
428    ) -> Result<serde_json::Value, crate::PluginError> {
429        let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
430            crate::PluginError::Session(
431                "host event store is unavailable in this runtime".to_string(),
432            )
433        })?;
434        let request = lashlang::TriggerRegistrationRequest::decode(&payload)
435            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
436        let source_type = request.source.source_type.clone();
437        let source_value = request.source.value.clone();
438        let source = request.source.to_json();
439        let event_type = lashlang::event_type_for_source(
440            &self.dispatch.plugins.lashlang_resources(),
441            &source_type,
442        )
443        .map_err(|err| crate::PluginError::Session(err.to_string()))?;
444        let validation = crate::plugin::validate_target_process(
445            &request.target,
446            &event_type,
447            &request.inputs,
448            self.lashlang_artifact_store.as_ref(),
449        )
450        .await?;
451        let store = router.store();
452        let source_key = store
453            .source_key_for_subscription(&source_type, &source_value)
454            .await?;
455        let env_ref = match self.process_env_ref.clone() {
456            Some(env_ref) => env_ref,
457            None => {
458                crate::persist_process_execution_env(
459                    self.lashlang_artifact_store.as_ref(),
460                    &self.execution_env_spec,
461                )
462                .await?
463            }
464        };
465        let registrant = self.process_originator.clone().unwrap_or_else(|| {
466            crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
467        });
468        let wake_target = self
469            .process_wake_target
470            .clone()
471            .or_else(|| match &registrant {
472                crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
473                crate::ProcessOriginator::Host => None,
474            });
475        let record = store
476            .register_subscription(crate::TriggerSubscriptionDraft {
477                registrant,
478                env_ref,
479                wake_target,
480                name: request.name,
481                source_type,
482                source_key,
483                source,
484                event_ty: validation.event_ty,
485                module_ref: request.target.module_ref,
486                required_surface_ref: request.target.required_surface_ref,
487                process_ref: request.target.process_ref,
488                process_name: request.target.process_name,
489                input_template: validation.inputs,
490            })
491            .await?;
492        Ok(crate::plugin::trigger_handle_json(&record.handle))
493    }
494
495    async fn list_host_event_subscriptions(
496        &self,
497        payload: serde_json::Value,
498    ) -> Result<serde_json::Value, crate::PluginError> {
499        let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
500            crate::PluginError::Session(
501                "host event store is unavailable in this runtime".to_string(),
502            )
503        })?;
504        let request = lashlang::TriggerListRequest::decode(&payload)
505            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
506        let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
507        filter.target = request.target;
508        filter.name = request.name;
509        filter.source_type = request.source_type;
510        filter.enabled = request.enabled;
511        let registrations = router
512            .store()
513            .list_subscriptions(filter)
514            .await?
515            .iter()
516            .map(crate::TriggerRegistration::from)
517            .collect::<Vec<_>>();
518        serde_json::to_value(registrations).map_err(|err| {
519            crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
520        })
521    }
522
523    async fn cancel_host_event_subscription(
524        &self,
525        payload: serde_json::Value,
526    ) -> Result<serde_json::Value, crate::PluginError> {
527        let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
528            crate::PluginError::Session(
529                "host event store is unavailable in this runtime".to_string(),
530            )
531        })?;
532        let request = lashlang::TriggerCancelRequest::decode(&payload)
533            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
534        let changed = router
535            .store()
536            .cancel_subscription(&self.session_id, &request.handle)
537            .await?;
538        Ok(serde_json::json!(changed))
539    }
540
    /// Resolves the argument projection policy for the tool called `name`
    /// against this context's dispatch via
    /// `tool_dispatch::resolve_tool_argument_projection_policy`.
    ///
    /// The unit test in this file expects an unknown tool name to resolve to
    /// `ToolArgumentProjectionPolicy::MaterializeProjectedValues`.
    pub fn tool_argument_projection_policy(
        &self,
        name: &str,
    ) -> crate::ToolArgumentProjectionPolicy {
        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
    }
547
548    pub async fn start_lashlang_process(
549        &self,
550        registration: crate::ProcessRegistration,
551        label: Option<String>,
552    ) -> crate::ToolInvocationReply {
553        let registration = match self
554            .attach_captured_process_execution_env(registration)
555            .await
556        {
557            Ok(registration) => registration,
558            Err(err) => {
559                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
560            }
561        };
562        let process_id = registration.id.clone();
563        let mut options = crate::ProcessStartOptions::new()
564            .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
565        if let Some(spawn) = self.process_spawn_provenance() {
566            options = options.with_spawn_provenance(spawn);
567        }
568        match self
569            .dispatch
570            .processes
571            .start(
572                &self.session_id,
573                registration,
574                options,
575                self.process_scope(self.parent_invocation.clone()),
576            )
577            .await
578        {
579            Ok(_) => {
580                self.record_started_process(&process_id);
581                crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
582                    &process_id,
583                ))
584            }
585            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
586        }
587    }
588
589    pub async fn sleep_lashlang(
590        &self,
591        scope: &str,
592        sequence: u64,
593        duration_ms: u64,
594    ) -> Result<(), crate::RuntimeEffectControllerError> {
595        let cancellation = self.cancellation_token.clone().unwrap_or_default();
596        let invocation = crate::runtime::causal::lashlang_sleep_invocation(
597            &self.session_id,
598            self.parent_invocation.as_ref(),
599            scope,
600            sequence,
601        );
602        let outcome = self
603            .dispatch
604            .effect_controller
605            .controller()
606            .execute_effect(
607                crate::RuntimeEffectEnvelope::new(
608                    invocation,
609                    crate::RuntimeEffectCommand::Sleep { duration_ms },
610                ),
611                crate::RuntimeEffectLocalExecutor::sleep(cancellation),
612            )
613            .await?;
614        match outcome {
615            crate::RuntimeEffectOutcome::Sleep => Ok(()),
616            other => Err(crate::RuntimeEffectControllerError::new(
617                "runtime_effect_wrong_outcome",
618                format!("expected sleep outcome, got {}", other.kind().as_str()),
619            )),
620        }
621    }
622
623    pub async fn await_process_event_lashlang(
624        &self,
625        registry: Arc<dyn crate::ProcessRegistry>,
626        process_id: &str,
627        signal_name: &str,
628        event_type: &str,
629        event_ordinal: u64,
630    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
631        let cancellation = self.cancellation_token.clone().unwrap_or_default();
632        let key = crate::process_signal_wait_key(process_id, signal_name, event_ordinal);
633        let invocation = crate::runtime::causal::lashlang_await_event_invocation(
634            &self.session_id,
635            self.parent_invocation.as_ref(),
636            process_id,
637            signal_name,
638            event_ordinal,
639        );
640        let outcome = self
641            .dispatch
642            .effect_controller
643            .controller()
644            .execute_effect(
645                crate::RuntimeEffectEnvelope::new(
646                    invocation,
647                    crate::RuntimeEffectCommand::AwaitEvent { key: key.clone() },
648                ),
649                crate::RuntimeEffectLocalExecutor::await_process_event(
650                    key,
651                    registry,
652                    process_id.to_string(),
653                    event_type.to_string(),
654                    event_ordinal,
655                    cancellation,
656                ),
657            )
658            .await?;
659        outcome.into_await_event()
660    }
661
662    pub async fn signal_lashlang_process(
663        &self,
664        registry: Arc<dyn crate::ProcessRegistry>,
665        process_id: &str,
666        signal_name: &str,
667        signal_id: String,
668        payload: serde_json::Value,
669    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
670        let event_type = crate::process_signal_event_type(signal_name)?;
671        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
672        let command = crate::ProcessCommand::Signal {
673            process_id: process_id.to_string(),
674            signal_name: signal_name.to_string(),
675            signal_id,
676            request: crate::ProcessEventAppendRequest::new(event_type, payload)
677                .with_replay_key(replay_key),
678        };
679        let effect_id = command.effect_id();
680        let invocation = crate::runtime::causal::process_effect_invocation(
681            &self.session_id,
682            self.parent_invocation.clone(),
683            &effect_id,
684        );
685        let outcome = self
686            .dispatch
687            .effect_controller
688            .controller()
689            .execute_effect(
690                crate::RuntimeEffectEnvelope::new(
691                    invocation,
692                    crate::RuntimeEffectCommand::process(command),
693                ),
694                crate::RuntimeEffectLocalExecutor::process_control(registry),
695            )
696            .await?;
697        match outcome.into_process()? {
698            crate::ProcessEffectOutcome::Signal { event } => Ok(event),
699            other => Err(crate::RuntimeEffectControllerError::new(
700                "runtime_effect_wrong_outcome",
701                format!("expected signal outcome, got {other:?}"),
702            )),
703        }
704    }
705}
706
707#[cfg(test)]
708mod tests {
709    use super::*;
710    use crate::tool_dispatch::ToolDispatchContext;
711    use crate::{ToolCall, ToolProvider, ToolResult};
712
    /// Tool provider stub for the tests in this module: it advertises no
    /// manifests, resolves no contracts, and fails any execution.
    struct NoopTools;

    #[async_trait::async_trait]
    impl ToolProvider for NoopTools {
        /// Advertises no tools; the tests supply manifests through the
        /// `ToolSurface` instead.
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            Vec::new()
        }

        /// Never resolves a contract.
        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
            None
        }

        /// Always returns an error result; the tests are not expected to
        /// execute tools.
        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
            ToolResult::err_fmt("not used")
        }
    }
729
    /// A tool on the active surface resolves to its declared argument
    /// projection policy, while an unknown tool name resolves to
    /// `MaterializeProjectedValues`.
    #[test]
    fn tool_argument_projection_policy_resolves_from_active_surface_and_defaults_unknown() {
        // Tool under test: declares that projected refs in `seed` are preserved.
        let tool = crate::ToolDefinition::raw(
            "tool:seedy",
            "seedy",
            "Seed-aware",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "string" }),
        )
        .with_argument_projection(
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
        );
        let plugins = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
        // Dispatch context built from mocks and unavailable services; only
        // the tool surface matters for the assertions below.
        let dispatch = Arc::new(ToolDispatchContext {
            plugins,
            tools: Arc::new(NoopTools),
            surface: Arc::new(crate::ToolSurface::from_tools(
                vec![tool.manifest()],
                std::collections::BTreeMap::new(),
            )),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            host_event_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
        });
        let ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch,
            Default::default(),
            Default::default(),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        // Known tool: the policy declared on the tool definition.
        assert_eq!(
            ctx.tool_argument_projection_policy("seedy"),
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
        );
        // Unknown tool: falls back to materializing projected values.
        assert_eq!(
            ctx.tool_argument_projection_policy("missing"),
            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
        );
    }
799
800    #[tokio::test]
801    async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
802        let tool = crate::ToolDefinition::raw(
803            "tool:alpha",
804            "alpha",
805            "Alpha tool.",
806            crate::ToolDefinition::default_input_schema(),
807            serde_json::json!({ "type": "object", "additionalProperties": true }),
808        );
809        let plugins = crate::plugin::PluginHost::empty()
810            .build_session("session", None)
811            .expect("plugin session");
812        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
813        let dispatch = Arc::new(ToolDispatchContext {
814            plugins,
815            tools: Arc::new(NoopTools),
816            surface: Arc::new(crate::ToolSurface::from_tools(
817                vec![tool.manifest()],
818                std::collections::BTreeMap::new(),
819            )),
820            sessions: Arc::new(crate::testing::MockSessionManager::default()),
821            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
822            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
823            processes: Arc::new(crate::UnavailableProcessService),
824            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
825            host_event_router: None,
826            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
827                crate::InlineRuntimeEffectController,
828            )),
829            direct_completions: crate::DirectCompletionClient::unavailable(
830                "direct completions are unavailable in this test context",
831            ),
832            parent_invocation: None,
833            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
834                crate::PluginOptions::default(),
835                crate::SessionPolicy::default(),
836            ),
837            session_id: "session".to_string(),
838            agent_frame_id: String::new(),
839            event_tx,
840            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
841            host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
842            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
843            turn_context: crate::TurnContext::default(),
844        });
845        let ctx = RuntimeExecutionContext::new(
846            "session".to_string(),
847            dispatch,
848            lashlang::LashlangAbilities::default().with_processes(),
849            Default::default(),
850            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
851            Arc::new(crate::InMemoryAttachmentStore::new()),
852            Arc::new(crate::ChronologicalProjection::default()),
853            None,
854            crate::TurnContext::default(),
855        );
856        let mut input = lashlang::Record::new();
857        input.insert("root".to_string(), lashlang::Value::String(".".into()));
858        let linked = ctx
859            .link_lashlang_module(
860                lashlang::parse("process scan(root: str) { finish root }").expect("process module"),
861            )
862            .expect("link process module");
863        ctx.put_lashlang_module_artifact(&linked.artifact)
864            .await
865            .expect("store module artifact");
866        let process_ref = linked
867            .artifact
868            .process_ref("scan")
869            .expect("scan process ref")
870            .clone();
871        let (registration, label) = ctx
872            .prepare_lashlang_process_start(lashlang::ProcessStart {
873                module_ref: linked.module_ref.clone(),
874                process_ref,
875                required_surface_ref: linked.required_surface_ref.clone(),
876                process_name: "scan".to_string(),
877                args: input,
878            })
879            .await
880            .expect("process start should prepare");
881
882        assert_eq!(label.as_deref(), Some("scan"));
883        assert!(
884            registration
885                .event_types
886                .iter()
887                .any(|event_type| event_type.name == "process.wake")
888        );
889        let crate::ProcessInput::LashlangProcess {
890            args, process_name, ..
891        } = registration.input.as_ref()
892        else {
893            panic!("expected lashlang process input");
894        };
895        assert_eq!(process_name, "scan");
896        assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
897    }
898
899    #[test]
900    fn lashlang_surface_reflects_host_abilities() {
901        let tool = crate::ToolDefinition::raw(
902            "tool:alpha",
903            "alpha",
904            "Alpha tool.",
905            crate::ToolDefinition::default_input_schema(),
906            serde_json::json!({ "type": "object", "additionalProperties": true }),
907        );
908        let plugins = crate::plugin::PluginHost::empty()
909            .build_session("session", None)
910            .expect("plugin session");
911        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
912        let dispatch = Arc::new(ToolDispatchContext {
913            plugins,
914            tools: Arc::new(NoopTools),
915            surface: Arc::new(crate::ToolSurface::from_tools(
916                vec![tool.manifest()],
917                std::collections::BTreeMap::new(),
918            )),
919            sessions: Arc::new(crate::testing::MockSessionManager::default()),
920            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
921            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
922            processes: Arc::new(crate::UnavailableProcessService),
923            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
924            host_event_router: None,
925            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
926                crate::InlineRuntimeEffectController,
927            )),
928            direct_completions: crate::DirectCompletionClient::unavailable(
929                "direct completions are unavailable in this test context",
930            ),
931            parent_invocation: None,
932            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
933                crate::PluginOptions::default(),
934                crate::SessionPolicy::default(),
935            ),
936            session_id: "session".to_string(),
937            agent_frame_id: String::new(),
938            event_tx,
939            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
940            host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
941            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
942            turn_context: crate::TurnContext::default(),
943        });
944        let ctx = RuntimeExecutionContext::new(
945            "session".to_string(),
946            dispatch,
947            lashlang::LashlangAbilities::default()
948                .with_sleep()
949                .with_processes()
950                .with_process_signals(),
951            Default::default(),
952            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
953            Arc::new(crate::InMemoryAttachmentStore::new()),
954            Arc::new(crate::ChronologicalProjection::default()),
955            None,
956            crate::TurnContext::default(),
957        );
958
959        let surface = ctx.lashlang_surface();
960
961        assert!(std::ptr::eq(surface, ctx.lashlang_surface()));
962        assert!(surface.abilities.processes);
963        assert!(surface.abilities.sleep);
964        assert!(surface.abilities.process_signals);
965        assert!(!surface.abilities.triggers);
966        assert!(
967            surface
968                .resources
969                .resolve_operation("Tools", "alpha")
970                .is_some()
971        );
972    }
973
974    #[test]
975    fn lashlang_surface_reflects_host_resource_contributions() {
976        let mut resources = lashlang::ResourceCatalog::new();
977        resources
978            .add_trigger_source_constructor(
979                ["clock", "Alarm"],
980                lashlang::TypeExpr::Object(vec![lashlang::TypeField {
981                    name: "at".into(),
982                    ty: lashlang::TypeExpr::Str,
983                    optional: false,
984                }]),
985                lashlang::NamedDataType::object(
986                    "clock.Tick",
987                    vec![lashlang::TypeField {
988                        name: "fired_at".into(),
989                        ty: lashlang::TypeExpr::Str,
990                        optional: false,
991                    }],
992                )
993                .expect("valid clock tick type"),
994            )
995            .expect("valid clock trigger source");
996        let plugin_host = crate::plugin::PluginHost::empty();
997        let mut merged_resources = plugin_host.lashlang_resources();
998        merged_resources.extend(resources);
999        let plugins = plugin_host
1000            .with_lashlang_resources(merged_resources)
1001            .build_session("session", None)
1002            .expect("plugin session");
1003        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
1004        let dispatch = Arc::new(ToolDispatchContext {
1005            plugins,
1006            tools: Arc::new(NoopTools),
1007            surface: Arc::new(crate::ToolSurface::from_tools(
1008                Vec::new(),
1009                std::collections::BTreeMap::new(),
1010            )),
1011            sessions: Arc::new(crate::testing::MockSessionManager::default()),
1012            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1013            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1014            processes: Arc::new(crate::UnavailableProcessService),
1015            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1016            host_event_router: None,
1017            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1018                crate::InlineRuntimeEffectController,
1019            )),
1020            direct_completions: crate::DirectCompletionClient::unavailable(
1021                "direct completions are unavailable in this test context",
1022            ),
1023            parent_invocation: None,
1024            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1025                crate::PluginOptions::default(),
1026                crate::SessionPolicy::default(),
1027            ),
1028            session_id: "session".to_string(),
1029            agent_frame_id: String::new(),
1030            event_tx,
1031            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1032            host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
1033            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1034            turn_context: crate::TurnContext::default(),
1035        });
1036        let ctx = RuntimeExecutionContext::new(
1037            "session".to_string(),
1038            dispatch,
1039            lashlang::LashlangAbilities::default()
1040                .with_processes()
1041                .with_triggers(),
1042            Default::default(),
1043            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1044            Arc::new(crate::InMemoryAttachmentStore::new()),
1045            Arc::new(crate::ChronologicalProjection::default()),
1046            None,
1047            crate::TurnContext::default(),
1048        );
1049
1050        let surface = ctx.lashlang_surface();
1051
1052        assert!(
1053            surface
1054                .resources
1055                .resolve_value_constructor(&["clock", "Alarm"])
1056                .is_some()
1057        );
1058        assert!(
1059            surface
1060                .resources
1061                .resolve_trigger_source("clock.Alarm")
1062                .is_some()
1063        );
1064        lashlang::LinkedModule::link(
1065            lashlang::parse(
1066                r#"
1067                process remember(tick: clock.Tick) {
1068                  finish true
1069                }
1070
1071                source = clock.Alarm({ at: "08:00" })
1072                await triggers.register({
1073                  source: source,
1074                  target: remember,
1075                  inputs: { tick: trigger.event }
1076                })?
1077                "#,
1078            )
1079            .expect("parse trigger registry module"),
1080            surface,
1081        )
1082        .expect("host resource contribution should be linkable");
1083    }
1084}