Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_host_environment_from_tool_catalog(
10    catalog: &crate::ToolCatalog,
11    abilities: lashlang::LashlangAbilities,
12    language_features: lashlang::LashlangLanguageFeatures,
13    host_resources: lashlang::LashlangHostCatalog,
14) -> lashlang::LashlangHostEnvironment {
15    let mut resources = lashlang_resources_from_tool_catalog(catalog);
16    resources.extend(host_resources);
17    lashlang::LashlangHostEnvironment::new(resources, abilities)
18        .with_language_features(language_features)
19}
20
21pub(crate) fn lashlang_resources_from_tool_catalog(
22    catalog: &crate::ToolCatalog,
23) -> lashlang::LashlangHostCatalog {
24    let mut host_catalog = lashlang::LashlangHostCatalog::new();
25    for entry in catalog.tools.iter() {
26        if entry.availability.is_callable() {
27            let lashlang_binding = entry
28                .manifest
29                .lashlang_binding
30                .executable_for(&entry.manifest.name);
31            host_catalog.add_module_operation(
32                lashlang_binding.module_path.iter().map(String::as_str),
33                lashlang_binding.authority_type.clone(),
34                lashlang_binding.operation.clone(),
35                entry.manifest.name.clone(),
36                lashlang::TypeExpr::Any,
37                lashlang::TypeExpr::Any,
38            );
39        }
40    }
41    host_catalog
42}
43
/// Bundle of per-turn runtime dependencies carried by the code execution
/// bridge.
///
/// Created with [`RuntimeExecutionContext::new`]; the optional fields start
/// out unset and are filled in through the `with_*` builder methods.
#[derive(Clone)]
pub struct RuntimeExecutionContext<'run> {
    /// Id of the session this context acts for.
    pub(super) session_id: String,
    /// Shared tool dispatch context (tool catalog, plugins, effect controller,
    /// process service, trigger router, ...).
    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
    /// Abilities the host environment was built with.
    lashlang_abilities: lashlang::LashlangAbilities,
    /// Language features the host environment was built with.
    lashlang_language_features: lashlang::LashlangLanguageFeatures,
    /// Host environment derived in `new` from the tool catalog plus the
    /// plugin-provided lashlang resources.
    lashlang_host_environment: lashlang::LashlangHostEnvironment,
    /// Store used to read and write lashlang module artifacts and to persist
    /// process execution environments.
    lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
    attachment_store: Arc<dyn crate::AttachmentStore>,
    chronological_projection: Arc<crate::ChronologicalProjection>,
    /// Optional protocol extension, downcast on demand by
    /// `protocol_extension::<T>()`.
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    turn_context: crate::TurnContext,
    /// Spec persisted when an execution environment has to be captured and
    /// `process_env_ref` is unset. Defaults to default plugin options and
    /// session policy.
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Originator copied from the process registration, if this context was
    /// configured with one.
    process_originator: Option<crate::ProcessOriginator>,
    /// Id copied from the process registration, if any.
    pub(super) runtime_process_id: Option<String>,
    pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
    /// Execution environment reference copied from the process registration;
    /// reused instead of persisting a new environment when present.
    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
    /// Wake target copied from the process registration, if any.
    process_wake_target: Option<crate::SessionScope>,
    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    lashlang_execution_context: lash_trace::TraceContext,
    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
    /// Channel for turn activity; `emit_turn_activity` does nothing when unset.
    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
    /// Token handed to sleep/await effects; a default token is substituted
    /// when unset.
    pub(super) cancellation_token: Option<CancellationToken>,
    /// Process ids started by THIS execution context. Possession of a handle
    /// the run itself created is sufficient capability to await/cancel it —
    /// run-local children are not session handle grants (the ephemeral
    /// execution scope must never appear in durable grant state).
    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
}
74
/// Process-event wiring attached to a [`RuntimeExecutionContext`] through
/// `with_process_event_context`.
#[derive(Clone)]
pub(super) struct RuntimeExecutionProcessEventContext {
    /// Id of the process the events belong to.
    pub process_id: String,
    /// Registry handle supplied by the caller.
    pub registry: Arc<dyn crate::ProcessRegistry>,
    /// Optional runtime persistence handle.
    pub store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional factory for session stores.
    pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Optional queued-work poke handle.
    pub queued_work_poke: Option<crate::QueuedWorkPoke>,
}
83
84impl<'run> RuntimeExecutionContext<'run> {
85    pub(crate) fn drain_tool_trigger_outcomes(
86        &self,
87    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
88        self.dispatch
89            .trigger_outcomes
90            .drain()
91            .map_err(crate::PluginError::Session)
92    }
93
94    pub(super) fn process_scope(
95        &self,
96        parent_invocation: Option<crate::RuntimeInvocation>,
97    ) -> crate::ProcessOpScope<'_> {
98        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
99            .with_parent_invocation(parent_invocation)
100            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
101    }
102
103    #[allow(
104        clippy::too_many_arguments,
105        reason = "code execution bridge carries explicit per-turn runtime dependencies"
106    )]
107    pub(crate) fn new(
108        session_id: String,
109        dispatch: Arc<ToolDispatchContext<'run>>,
110        lashlang_abilities: lashlang::LashlangAbilities,
111        lashlang_language_features: lashlang::LashlangLanguageFeatures,
112        lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
113        attachment_store: Arc<dyn crate::AttachmentStore>,
114        chronological_projection: Arc<crate::ChronologicalProjection>,
115        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
116        turn_context: crate::TurnContext,
117    ) -> Self {
118        let lashlang_host_environment = lashlang_host_environment_from_tool_catalog(
119            &dispatch.tool_catalog,
120            lashlang_abilities,
121            lashlang_language_features,
122            dispatch.plugins.lashlang_resources(),
123        );
124        Self {
125            session_id,
126            dispatch,
127            lashlang_abilities,
128            lashlang_language_features,
129            lashlang_host_environment,
130            lashlang_artifact_store,
131            attachment_store,
132            chronological_projection,
133            protocol_extension,
134            turn_context,
135            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
136                crate::PluginOptions::default(),
137                crate::SessionPolicy::default(),
138            ),
139            process_originator: None,
140            runtime_process_id: None,
141            process_event_context: None,
142            started_process_ids: Arc::default(),
143            process_env_ref: None,
144            process_wake_target: None,
145            parent_invocation: None,
146            lashlang_execution_sink: None,
147            lashlang_execution_context: lash_trace::TraceContext::default(),
148            turn_phase_probe: None,
149            turn_event_tx: None,
150            cancellation_token: None,
151        }
152    }
153
154    pub fn session_id(&self) -> &str {
155        &self.session_id
156    }
157
158    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
159        Arc::clone(&self.attachment_store)
160    }
161
162    pub async fn put_lashlang_module_artifact(
163        &self,
164        artifact: &lashlang::ModuleArtifact,
165    ) -> Result<(), String> {
166        self.lashlang_artifact_store
167            .put_module_artifact(artifact)
168            .await
169            .map_err(|err| err.to_string())
170    }
171
172    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
173        Arc::clone(&self.chronological_projection)
174    }
175
176    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
177        self.protocol_extension
178            .as_ref()
179            .and_then(|extension| extension.as_any().downcast_ref::<T>())
180    }
181
    /// Returns the turn context this execution context was created with.
    pub fn turn_context(&self) -> &crate::TurnContext {
        &self.turn_context
    }
185
    /// Borrows the session graph service held by the dispatch context.
    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
        self.dispatch.session_graph.as_ref()
    }
189
190    pub(super) async fn emit_turn_activity(
191        &self,
192        correlation_id: TurnActivityId,
193        event: TurnEvent,
194    ) {
195        if let Some(tx) = &self.turn_event_tx {
196            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
197        }
198    }
199
200    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
201        self.turn_event_tx = Some(turn_event_tx);
202        self
203    }
204
205    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
206        self.parent_invocation = Some(metadata);
207        self
208    }
209
210    pub(crate) fn with_execution_env_spec(
211        mut self,
212        execution_env_spec: crate::ProcessExecutionEnvSpec,
213    ) -> Self {
214        self.execution_env_spec = execution_env_spec;
215        self
216    }
217
218    pub(crate) fn with_process_registration_context(
219        mut self,
220        registration: &crate::ProcessRegistration,
221    ) -> Self {
222        self.process_originator = Some(registration.provenance.originator.clone());
223        self.runtime_process_id = Some(registration.id.clone());
224        self.process_env_ref = registration.env_ref.clone();
225        self.process_wake_target = registration.wake_target.clone();
226        self
227    }
228
229    pub(crate) fn with_process_event_context(
230        mut self,
231        process_id: impl Into<String>,
232        registry: Arc<dyn crate::ProcessRegistry>,
233        store: Option<Arc<dyn crate::RuntimePersistence>>,
234        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
235        queued_work_poke: Option<crate::QueuedWorkPoke>,
236    ) -> Self {
237        self.process_event_context = Some(RuntimeExecutionProcessEventContext {
238            process_id: process_id.into(),
239            registry,
240            store,
241            session_store_factory,
242            queued_work_poke,
243        });
244        self
245    }
246
247    /// Spawn provenance for children started by this context, present only
248    /// when this context executes a process: children inherit the chain's
249    /// originator and wake target instead of the ephemeral execution scope.
250    pub(super) fn record_started_process(&self, process_id: &str) {
251        self.started_process_ids
252            .lock()
253            .expect("started process ids lock")
254            .insert(process_id.to_string());
255    }
256
257    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
258        self.started_process_ids
259            .lock()
260            .expect("started process ids lock")
261            .contains(process_id)
262    }
263
264    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
265        self.process_originator
266            .clone()
267            .map(|originator| crate::ProcessSpawnProvenance {
268                originator,
269                wake_target: self.process_wake_target.clone(),
270            })
271    }
272
273    pub(super) async fn attach_captured_process_execution_env(
274        &self,
275        registration: crate::ProcessRegistration,
276    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
277        if registration.env_ref.is_some() {
278            return Ok(registration);
279        }
280        match registration.input.as_ref() {
281            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
282                let env_ref = self.captured_process_execution_env_ref().await?;
283                Ok(registration.with_execution_env_ref(Some(env_ref)))
284            }
285            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
286                Ok(registration)
287            }
288        }
289    }
290
291    async fn captured_process_execution_env_ref(
292        &self,
293    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
294        if let Some(env_ref) = self.process_env_ref.clone() {
295            return Ok(env_ref);
296        }
297        crate::persist_process_execution_env(
298            self.lashlang_artifact_store.as_ref(),
299            &self.execution_env_spec,
300        )
301        .await
302    }
303
304    pub(crate) fn with_lashlang_execution_trace(
305        mut self,
306        sink: Option<Arc<dyn lash_trace::TraceSink>>,
307        context: lash_trace::TraceContext,
308    ) -> Self {
309        self.lashlang_execution_sink = sink;
310        self.lashlang_execution_context = context;
311        self
312    }
313
314    pub(crate) fn with_turn_phase_probe(
315        mut self,
316        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
317    ) -> Self {
318        self.turn_phase_probe = probe;
319        self
320    }
321
322    #[doc(hidden)]
323    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
324        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
325    }
326
    /// Returns the parent invocation, if one was set through
    /// `with_parent_invocation`.
    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
        self.parent_invocation.as_ref()
    }
330
331    pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
332        self.lashlang_execution_sink.clone()
333    }
334
    /// Returns the trace context used for lashlang execution (the default
    /// context unless `with_lashlang_execution_trace` replaced it).
    pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
        &self.lashlang_execution_context
    }
338
339    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
340        self.cancellation_token = Some(cancellation_token);
341        self
342    }
343
    /// Resolves the scheduling for the tool called `name` through
    /// `tool_dispatch::resolve_tool_scheduling`.
    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
    }
347
    /// Looks up the manifest of the callable tool called `name`; `None` when
    /// the dispatch resolver finds no such callable tool.
    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
    }
351
    /// Looks up the manifest of the callable tool identified by `id`; `None`
    /// when the dispatch resolver finds no such callable tool.
    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
    }
355
356    pub fn resolve_lashlang_host_operation(
357        &self,
358        receiver: &lashlang::ResourceHandle,
359        operation: &str,
360    ) -> Result<String, String> {
361        self.lashlang_host_environment
362            .resources
363            .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
364            .map(|binding| binding.host_operation.clone())
365            .ok_or_else(|| {
366                format!(
367                    "module `{}` of type `{}` does not expose operation `{operation}`",
368                    receiver.alias, receiver.resource_type
369                )
370            })
371    }
372
373    pub async fn prepare_lashlang_process_start(
374        &self,
375        start: lashlang::ProcessStart,
376    ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
377        let _phase = self.named_phase("rlm_process.prepare_start");
378        let display_name = Some(start.process_name.clone());
379        let artifact = self
380            .lashlang_artifact_store
381            .get_module_artifact(&start.module_ref)
382            .await
383            .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
384            .ok_or_else(|| {
385                format!(
386                    "missing lashlang module artifact `{}` for process `{}`",
387                    start.module_ref, start.process_name
388                )
389            })?;
390        if artifact.host_requirements_ref != start.host_requirements_ref {
391            return Err(format!(
392                "lashlang module artifact `{}` host requirements mismatch: process requested {}, artifact has {}",
393                start.module_ref, start.host_requirements_ref, artifact.host_requirements_ref
394            ));
395        }
396        if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
397            return Err(format!(
398                "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
399                start.module_ref, start.process_name, start.process_ref
400            ));
401        }
402        let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
403            .map_err(|err| format!("failed to serialize process args: {err}"))?
404        {
405            serde_json::Value::Object(map) => map,
406            _ => return Err("process args must serialize as a record".to_string()),
407        };
408        let signal_event_types = artifact
409            .canonical_ir
410            .process(&start.process_name)
411            .map(crate::lashlang_process_signal_event_types)
412            .unwrap_or_default();
413        let process_id = format!("process:{}", uuid::Uuid::new_v4());
414        let registration = crate::ProcessRegistration::session_start_draft(
415            process_id,
416            crate::ProcessInput::LashlangProcess {
417                module_ref: start.module_ref,
418                process_ref: start.process_ref,
419                host_requirements_ref: start.host_requirements_ref,
420                process_name: start.process_name,
421                args,
422            },
423        )
424        .with_extra_event_types(
425            crate::lashlang_process_event_types()
426                .into_iter()
427                .chain(signal_event_types),
428        );
429        Ok((registration, display_name))
430    }
431
    /// Returns the lashlang host environment built when this context was
    /// created.
    pub fn lashlang_host_environment(&self) -> &lashlang::LashlangHostEnvironment {
        &self.lashlang_host_environment
    }
435
    /// Returns the lashlang abilities this context was created with.
    pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
        self.lashlang_abilities
    }
439
    /// Returns the lashlang language features this context was created with.
    pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
        self.lashlang_language_features
    }
443
444    pub fn link_lashlang_module(
445        &self,
446        program: lashlang::Program,
447    ) -> Result<lashlang::LinkedModule, String> {
448        lashlang::LinkedModule::link(program, self.lashlang_host_environment())
449            .map_err(|err| err.to_string())
450    }
451
452    pub async fn perform_lashlang_trigger_operation(
453        &self,
454        operation: &str,
455        payload: serde_json::Value,
456    ) -> Result<serde_json::Value, String> {
457        match lashlang::TriggerHostOperation::from_host_operation(operation) {
458            Some(lashlang::TriggerHostOperation::Register) => self
459                .register_trigger_subscription(payload)
460                .await
461                .map_err(|err| err.to_string()),
462            Some(lashlang::TriggerHostOperation::List) => self
463                .list_trigger_subscriptions(payload)
464                .await
465                .map_err(|err| err.to_string()),
466            Some(lashlang::TriggerHostOperation::Cancel) => self
467                .cancel_trigger_subscription(payload)
468                .await
469                .map_err(|err| err.to_string()),
470            None => Err(format!("unknown trigger operation `{operation}`")),
471        }
472    }
473
474    async fn register_trigger_subscription(
475        &self,
476        payload: serde_json::Value,
477    ) -> Result<serde_json::Value, crate::PluginError> {
478        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
479            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
480        })?;
481        let request = lashlang::TriggerRegistrationRequest::decode(&payload)
482            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
483        let source_type = request.source.source_type.clone();
484        let source_value = request.source.value.clone();
485        let source = request.source.to_json();
486        let event_type = lashlang::event_type_for_source(
487            &self.dispatch.plugins.lashlang_resources(),
488            &source_type,
489        )
490        .map_err(|err| crate::PluginError::Session(err.to_string()))?;
491        let validation = crate::plugin::validate_target_process(
492            &request.target,
493            &event_type,
494            &request.inputs,
495            self.lashlang_artifact_store.as_ref(),
496        )
497        .await?;
498        let store = router.store();
499        let source_key = store
500            .source_key_for_subscription(&source_type, &source_value)
501            .await?;
502        let env_ref = match self.process_env_ref.clone() {
503            Some(env_ref) => env_ref,
504            None => {
505                crate::persist_process_execution_env(
506                    self.lashlang_artifact_store.as_ref(),
507                    &self.execution_env_spec,
508                )
509                .await?
510            }
511        };
512        let registrant = self.process_originator.clone().unwrap_or_else(|| {
513            crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
514        });
515        let wake_target = self
516            .process_wake_target
517            .clone()
518            .or_else(|| match &registrant {
519                crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
520                crate::ProcessOriginator::Host => None,
521            });
522        let record = store
523            .register_subscription(crate::TriggerSubscriptionDraft {
524                registrant,
525                env_ref,
526                wake_target,
527                name: request.name,
528                source_type,
529                source_key,
530                source,
531                event_ty: validation.event_ty,
532                module_ref: request.target.module_ref,
533                host_requirements_ref: request.target.host_requirements_ref,
534                process_ref: request.target.process_ref,
535                process_name: request.target.process_name,
536                input_template: validation.inputs,
537            })
538            .await?;
539        Ok(crate::plugin::trigger_handle_json(&record.handle))
540    }
541
542    async fn list_trigger_subscriptions(
543        &self,
544        payload: serde_json::Value,
545    ) -> Result<serde_json::Value, crate::PluginError> {
546        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
547            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
548        })?;
549        let request = lashlang::TriggerListRequest::decode(&payload)
550            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
551        let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
552        filter.target = request.target;
553        filter.name = request.name;
554        filter.source_type = request.source_type;
555        filter.enabled = request.enabled;
556        let registrations = router
557            .store()
558            .list_subscriptions(filter)
559            .await?
560            .iter()
561            .map(crate::TriggerRegistration::from)
562            .collect::<Vec<_>>();
563        serde_json::to_value(registrations).map_err(|err| {
564            crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
565        })
566    }
567
568    async fn cancel_trigger_subscription(
569        &self,
570        payload: serde_json::Value,
571    ) -> Result<serde_json::Value, crate::PluginError> {
572        let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
573            crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
574        })?;
575        let request = lashlang::TriggerCancelRequest::decode(&payload)
576            .map_err(|err| crate::PluginError::Session(err.to_string()))?;
577        let changed = router
578            .store()
579            .cancel_subscription(&self.session_id, &request.handle)
580            .await?;
581        Ok(serde_json::json!(changed))
582    }
583
    /// Resolves the argument projection policy for the tool called `name`
    /// through `tool_dispatch::resolve_tool_argument_projection_policy`.
    pub fn tool_argument_projection_policy(
        &self,
        name: &str,
    ) -> crate::ToolArgumentProjectionPolicy {
        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
    }
590
591    pub async fn start_lashlang_process(
592        &self,
593        registration: crate::ProcessRegistration,
594        label: Option<String>,
595    ) -> crate::ToolInvocationReply {
596        let _phase = self.named_phase("rlm_process.start");
597        let registration = match self
598            .attach_captured_process_execution_env(registration)
599            .await
600        {
601            Ok(registration) => registration,
602            Err(err) => {
603                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
604            }
605        };
606        let process_id = registration.id.clone();
607        let mut options = crate::ProcessStartOptions::new()
608            .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
609        if let Some(spawn) = self.process_spawn_provenance() {
610            options = options.with_spawn_provenance(spawn);
611        }
612        match self
613            .dispatch
614            .processes
615            .start(
616                &self.session_id,
617                registration,
618                options,
619                self.process_scope(self.parent_invocation.clone()),
620            )
621            .await
622        {
623            Ok(_) => {
624                self.record_started_process(&process_id);
625                crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
626                    &process_id,
627                ))
628            }
629            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
630        }
631    }
632
633    pub async fn sleep_lashlang(
634        &self,
635        scope: &str,
636        sequence: u64,
637        duration_ms: u64,
638    ) -> Result<(), crate::RuntimeEffectControllerError> {
639        let cancellation = self.cancellation_token.clone().unwrap_or_default();
640        let invocation = crate::runtime::causal::lashlang_sleep_invocation(
641            &self.session_id,
642            self.parent_invocation.as_ref(),
643            scope,
644            sequence,
645        );
646        let outcome = self
647            .dispatch
648            .effect_controller
649            .controller()
650            .execute_effect(
651                crate::RuntimeEffectEnvelope::new(
652                    invocation,
653                    crate::RuntimeEffectCommand::Sleep { duration_ms },
654                ),
655                crate::RuntimeEffectLocalExecutor::sleep(cancellation),
656            )
657            .await?;
658        match outcome {
659            crate::RuntimeEffectOutcome::Sleep => Ok(()),
660            other => Err(crate::RuntimeEffectControllerError::new(
661                "runtime_effect_wrong_outcome",
662                format!("expected sleep outcome, got {}", other.kind().as_str()),
663            )),
664        }
665    }
666
667    pub async fn await_process_event_lashlang(
668        &self,
669        _registry: Arc<dyn crate::ProcessRegistry>,
670        process_id: &str,
671        signal_name: &str,
672        _event_type: &str,
673        event_ordinal: u64,
674    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
675        let cancellation = self.cancellation_token.clone().unwrap_or_default();
676        let key = self
677            .dispatch
678            .effect_controller
679            .controller()
680            .await_event_key(
681                &crate::ExecutionScope::process(process_id),
682                crate::AwaitEventWaitIdentity::process_signal(
683                    process_id,
684                    signal_name,
685                    event_ordinal,
686                ),
687            )
688            .await?;
689        let invocation = crate::runtime::causal::lashlang_await_event_invocation(
690            &self.session_id,
691            self.parent_invocation.as_ref(),
692            process_id,
693            signal_name,
694            event_ordinal,
695        );
696        let outcome = self
697            .dispatch
698            .effect_controller
699            .controller()
700            .execute_effect(
701                crate::RuntimeEffectEnvelope::new(
702                    invocation,
703                    crate::RuntimeEffectCommand::AwaitEvent { key },
704                ),
705                crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
706            )
707            .await?;
708        match outcome.into_await_event()? {
709            crate::Resolution::Ok(value) => Ok(value),
710            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
711                err.code,
712                err.message,
713            )),
714            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
715                "process_signal_wait_timeout",
716                "process signal wait timed out",
717            )),
718            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
719                "process_signal_wait_cancelled",
720                "process signal wait was cancelled",
721            )),
722        }
723    }
724
725    pub async fn signal_lashlang_process(
726        &self,
727        registry: Arc<dyn crate::ProcessRegistry>,
728        process_id: &str,
729        signal_name: &str,
730        signal_id: String,
731        payload: serde_json::Value,
732    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
733        let event_type = crate::process_signal_event_type(signal_name)?;
734        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
735        let signal_payload = payload.clone();
736        let command = crate::ProcessCommand::Signal {
737            process_id: process_id.to_string(),
738            signal_name: signal_name.to_string(),
739            signal_id,
740            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
741                .with_replay_key(replay_key),
742        };
743        let effect_id = command.effect_id();
744        let invocation = crate::runtime::causal::process_effect_invocation(
745            &self.session_id,
746            self.parent_invocation.clone(),
747            &effect_id,
748        );
749        let outcome = self
750            .dispatch
751            .effect_controller
752            .controller()
753            .execute_effect(
754                crate::RuntimeEffectEnvelope::new(
755                    invocation,
756                    crate::RuntimeEffectCommand::process(command),
757                ),
758                crate::RuntimeEffectLocalExecutor::processes(Arc::clone(&registry)),
759            )
760            .await?;
761        match outcome.into_process()? {
762            crate::ProcessEffectOutcome::Signal { event } => {
763                let waiting_ordinal =
764                    registry
765                        .get_process(process_id)
766                        .await
767                        .and_then(|record| match record.wait {
768                            Some(crate::WaitState {
769                                kind:
770                                    crate::WaitKind::Signal {
771                                        name,
772                                        event_type: wait_event_type,
773                                        ordinal,
774                                        ..
775                                    },
776                                ..
777                            }) if name == signal_name && wait_event_type == event_type => {
778                                Some(ordinal)
779                            }
780                            _ => None,
781                        });
782                let ordinal = match waiting_ordinal {
783                    Some(ordinal) => ordinal,
784                    None => {
785                        registry
786                            .count_events_through(process_id, &event_type, event.sequence)
787                            .await?
788                    }
789                };
790                if ordinal > 0 {
791                    let key = self
792                        .dispatch
793                        .effect_controller
794                        .controller()
795                        .await_event_key(
796                            &crate::ExecutionScope::process(process_id),
797                            crate::AwaitEventWaitIdentity::process_signal(
798                                process_id,
799                                signal_name,
800                                ordinal,
801                            ),
802                        )
803                        .await?;
804                    let _ = self
805                        .dispatch
806                        .effect_controller
807                        .controller()
808                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
809                        .await?;
810                }
811                Ok(event)
812            }
813            other => Err(crate::RuntimeEffectControllerError::new(
814                "runtime_effect_wrong_outcome",
815                format!("expected signal outcome, got {other:?}"),
816            )),
817        }
818    }
819}
820
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::tool_dispatch::ToolDispatchContext;
    use crate::{ToolCall, ToolProvider, ToolResult};

    /// Stub tool provider for these tests: it advertises no manifests,
    /// resolves no contracts, and answers every execution with a "not used"
    /// error result.
    struct NoopTools;

    #[async_trait::async_trait]
    impl ToolProvider for NoopTools {
        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
            Vec::new()
        }

        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
            None
        }

        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
            ToolResult::err_fmt("not used")
        }
    }

    /// A tool present in the catalog reports the projection policy it was
    /// declared with; a name absent from the catalog reports
    /// `MaterializeProjectedValues`.
    #[test]
    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
        let seedy_tool = crate::ToolDefinition::raw(
            "tool:seedy",
            "seedy",
            "Seed-aware",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "string" }),
        )
        .with_argument_projection(
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
        );

        let plugin_session = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        // The receiver stays bound for the whole test so the channel is not closed.
        let (events_tx, _events_rx) = tokio::sync::mpsc::channel(1);
        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
                vec![seedy_tool.manifest()],
                BTreeMap::new(),
            )),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx: events_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
        });
        let runtime_ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            Default::default(),
            Default::default(),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        // Known tool: the policy attached to its definition comes back.
        assert_eq!(
            runtime_ctx.tool_argument_projection_policy("seedy"),
            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
        );
        // Unknown tool: falls back to materializing projected values.
        assert_eq!(
            runtime_ctx.tool_argument_projection_policy("missing"),
            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
        );
    }

    /// Preparing a start for a linked `scan` process yields the label `scan`,
    /// a registration that lists the `process.wake` event type, and a lashlang
    /// process input carrying the explicitly supplied `root` argument.
    #[tokio::test]
    async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
        let alpha_tool = crate::ToolDefinition::raw(
            "tool:alpha",
            "alpha",
            "Alpha tool.",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "object", "additionalProperties": true }),
        );

        let plugin_session = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        // The receiver stays bound for the whole test so the channel is not closed.
        let (events_tx, _events_rx) = tokio::sync::mpsc::channel(1);
        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
                vec![alpha_tool.manifest()],
                BTreeMap::new(),
            )),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx: events_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
        });
        let process_abilities = lashlang::LashlangAbilities::default().with_processes();
        let runtime_ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            process_abilities,
            Default::default(),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        // Explicit process arguments: `{ root: "." }`.
        let mut start_args = lashlang::Record::new();
        start_args.insert("root".to_string(), lashlang::Value::String(".".into()));

        // Parse, link, and persist a module that declares the `scan` process.
        let parsed_module =
            lashlang::parse("process scan(root: str) { finish root }").expect("process module");
        let linked_module = runtime_ctx
            .link_lashlang_module(parsed_module)
            .expect("link process module");
        runtime_ctx
            .put_lashlang_module_artifact(&linked_module.artifact)
            .await
            .expect("store module artifact");
        let scan_ref = linked_module
            .artifact
            .process_ref("scan")
            .expect("scan process ref")
            .clone();

        let start_request = lashlang::ProcessStart {
            module_ref: linked_module.module_ref.clone(),
            process_ref: scan_ref,
            host_requirements_ref: linked_module.host_requirements_ref.clone(),
            process_name: "scan".to_string(),
            args: start_args,
        };
        let (registration, label) = runtime_ctx
            .prepare_lashlang_process_start(start_request)
            .await
            .expect("process start should prepare");

        assert_eq!(label.as_deref(), Some("scan"));
        assert!(
            registration
                .event_types
                .iter()
                .any(|declared| declared.name == "process.wake")
        );
        let crate::ProcessInput::LashlangProcess {
            args, process_name, ..
        } = registration.input.as_ref()
        else {
            panic!("expected lashlang process input");
        };
        assert_eq!(process_name, "scan");
        assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
    }

    /// The host environment exposed by the context carries exactly the
    /// abilities it was constructed with, is returned as the same reference on
    /// repeated calls, and resolves the catalog tool as `Tools.alpha`.
    #[test]
    fn lashlang_host_environment_reflects_host_abilities() {
        let alpha_tool = crate::ToolDefinition::raw(
            "tool:alpha",
            "alpha",
            "Alpha tool.",
            crate::ToolDefinition::default_input_schema(),
            serde_json::json!({ "type": "object", "additionalProperties": true }),
        );

        let plugin_session = crate::plugin::PluginHost::empty()
            .build_session("session", None)
            .expect("plugin session");
        // The receiver stays bound for the whole test so the channel is not closed.
        let (events_tx, _events_rx) = tokio::sync::mpsc::channel(1);
        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
                vec![alpha_tool.manifest()],
                BTreeMap::new(),
            )),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx: events_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
        });
        // Sleep, processes and process signals are enabled; triggers are not.
        let granted_abilities = lashlang::LashlangAbilities::default()
            .with_sleep()
            .with_processes()
            .with_process_signals();
        let runtime_ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            granted_abilities,
            Default::default(),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        let env = runtime_ctx.lashlang_host_environment();

        // A second call hands back the very same environment reference.
        assert!(std::ptr::eq(env, runtime_ctx.lashlang_host_environment()));
        assert!(env.abilities.processes);
        assert!(env.abilities.sleep);
        assert!(env.abilities.process_signals);
        assert!(!env.abilities.triggers);
        assert!(env.resources.resolve_operation("Tools", "alpha").is_some());
    }

    /// Resources contributed through the plugin host (here a `clock.Alarm`
    /// trigger source constructor) are visible in the context's host
    /// environment and are sufficient to link a module that uses them.
    #[test]
    fn lashlang_host_environment_reflects_host_resource_contributions() {
        // Describe `clock.Alarm({ at: str })`, whose events are `clock.Tick { fired_at: str }`.
        let alarm_args = lashlang::TypeExpr::Object(vec![lashlang::TypeField {
            name: "at".into(),
            ty: lashlang::TypeExpr::Str,
            optional: false,
        }]);
        let tick_type = lashlang::NamedDataType::object(
            "clock.Tick",
            vec![lashlang::TypeField {
                name: "fired_at".into(),
                ty: lashlang::TypeExpr::Str,
                optional: false,
            }],
        )
        .expect("valid clock tick type");
        let mut clock_resources = lashlang::LashlangHostCatalog::new();
        clock_resources
            .add_trigger_source_constructor(["clock", "Alarm"], alarm_args, tick_type)
            .expect("valid clock trigger source");

        // Layer the clock resources on top of whatever the empty plugin host provides.
        let plugin_host = crate::plugin::PluginHost::empty();
        let mut combined_resources = plugin_host.lashlang_resources();
        combined_resources.extend(clock_resources);
        let plugin_session = plugin_host
            .with_lashlang_resources(combined_resources)
            .build_session("session", None)
            .expect("plugin session");

        // The receiver stays bound for the whole test so the channel is not closed.
        let (events_tx, _events_rx) = tokio::sync::mpsc::channel(1);
        let dispatch_ctx = Arc::new(ToolDispatchContext {
            plugins: plugin_session,
            tools: Arc::new(NoopTools),
            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
                Vec::new(),
                BTreeMap::new(),
            )),
            sessions: Arc::new(crate::testing::MockSessionManager::default()),
            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
            processes: Arc::new(crate::UnavailableProcessService),
            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
            trigger_router: None,
            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            direct_completions: crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
            parent_invocation: None,
            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
                crate::PluginOptions::default(),
                crate::SessionPolicy::default(),
            ),
            session_id: "session".to_string(),
            agent_frame_id: String::new(),
            event_tx: events_tx,
            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
            turn_context: crate::TurnContext::default(),
        });
        let granted_abilities = lashlang::LashlangAbilities::default()
            .with_processes()
            .with_triggers();
        let runtime_ctx = RuntimeExecutionContext::new(
            "session".to_string(),
            dispatch_ctx,
            granted_abilities,
            Default::default(),
            Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            Arc::new(crate::ChronologicalProjection::default()),
            None,
            crate::TurnContext::default(),
        );

        let env = runtime_ctx.lashlang_host_environment();

        // The contribution is reachable both as a value constructor and as a trigger source.
        assert!(
            env.resources
                .resolve_value_constructor(&["clock", "Alarm"])
                .is_some()
        );
        assert!(env.resources.resolve_trigger_source("clock.Alarm").is_some());

        // A module that registers a trigger from `clock.Alarm` must link against it.
        let trigger_module = lashlang::parse(
            r#"
                process remember(tick: clock.Tick) {
                  finish true
                }

                source = clock.Alarm({ at: "08:00" })
                await triggers.register({
                  source: source,
                  target: remember,
                  inputs: { tick: trigger.event }
                })?
                "#,
        )
        .expect("parse trigger registry module");
        lashlang::LinkedModule::link(trigger_module, env)
            .expect("host resource contribution should be linkable");
    }
}