1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_host_environment_from_tool_catalog(
10 catalog: &crate::ToolCatalog,
11 abilities: lashlang::LashlangAbilities,
12 language_features: lashlang::LashlangLanguageFeatures,
13 host_resources: lashlang::LashlangHostCatalog,
14) -> lashlang::LashlangHostEnvironment {
15 let mut resources = lashlang_resources_from_tool_catalog(catalog);
16 resources.extend(host_resources);
17 lashlang::LashlangHostEnvironment::new(resources, abilities)
18 .with_language_features(language_features)
19}
20
21pub(crate) fn lashlang_resources_from_tool_catalog(
22 catalog: &crate::ToolCatalog,
23) -> lashlang::LashlangHostCatalog {
24 let mut host_catalog = lashlang::LashlangHostCatalog::new();
25 for entry in catalog.tools.iter() {
26 if entry.availability.is_callable() {
27 let lashlang_binding = entry
28 .manifest
29 .lashlang_binding
30 .executable_for(&entry.manifest.name);
31 host_catalog.add_module_operation(
32 lashlang_binding.module_path.iter().map(String::as_str),
33 lashlang_binding.authority_type.clone(),
34 lashlang_binding.operation.clone(),
35 entry.manifest.name.clone(),
36 lashlang::TypeExpr::Any,
37 lashlang::TypeExpr::Any,
38 );
39 }
40 }
41 host_catalog
42}
43
44#[derive(Clone)]
45pub struct RuntimeExecutionContext<'run> {
46 pub(super) session_id: String,
47 pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
48 lashlang_abilities: lashlang::LashlangAbilities,
49 lashlang_language_features: lashlang::LashlangLanguageFeatures,
50 lashlang_host_environment: lashlang::LashlangHostEnvironment,
51 lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
52 attachment_store: Arc<dyn crate::AttachmentStore>,
53 chronological_projection: Arc<crate::ChronologicalProjection>,
54 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
55 turn_context: crate::TurnContext,
56 execution_env_spec: crate::ProcessExecutionEnvSpec,
57 process_originator: Option<crate::ProcessOriginator>,
58 pub(super) runtime_process_id: Option<String>,
59 pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
60 process_env_ref: Option<crate::ProcessExecutionEnvRef>,
61 process_wake_target: Option<crate::SessionScope>,
62 pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
63 lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
64 lashlang_execution_context: lash_trace::TraceContext,
65 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
66 pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
67 pub(super) cancellation_token: Option<CancellationToken>,
68 started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
73}
74
75#[derive(Clone)]
76pub(super) struct RuntimeExecutionProcessEventContext {
77 pub process_id: String,
78 pub registry: Arc<dyn crate::ProcessRegistry>,
79 pub store: Option<Arc<dyn crate::RuntimePersistence>>,
80 pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
81 pub queued_work_poke: Option<crate::QueuedWorkPoke>,
82}
83
84impl<'run> RuntimeExecutionContext<'run> {
85 pub(crate) fn drain_tool_trigger_outcomes(
86 &self,
87 ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
88 self.dispatch
89 .trigger_outcomes
90 .drain()
91 .map_err(crate::PluginError::Session)
92 }
93
94 pub(super) fn process_scope(
95 &self,
96 parent_invocation: Option<crate::RuntimeInvocation>,
97 ) -> crate::ProcessOpScope<'_> {
98 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
99 .with_parent_invocation(parent_invocation)
100 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
101 }
102
103 #[allow(
104 clippy::too_many_arguments,
105 reason = "code execution bridge carries explicit per-turn runtime dependencies"
106 )]
107 pub(crate) fn new(
108 session_id: String,
109 dispatch: Arc<ToolDispatchContext<'run>>,
110 lashlang_abilities: lashlang::LashlangAbilities,
111 lashlang_language_features: lashlang::LashlangLanguageFeatures,
112 lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
113 attachment_store: Arc<dyn crate::AttachmentStore>,
114 chronological_projection: Arc<crate::ChronologicalProjection>,
115 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
116 turn_context: crate::TurnContext,
117 ) -> Self {
118 let lashlang_host_environment = lashlang_host_environment_from_tool_catalog(
119 &dispatch.tool_catalog,
120 lashlang_abilities,
121 lashlang_language_features,
122 dispatch.plugins.lashlang_resources(),
123 );
124 Self {
125 session_id,
126 dispatch,
127 lashlang_abilities,
128 lashlang_language_features,
129 lashlang_host_environment,
130 lashlang_artifact_store,
131 attachment_store,
132 chronological_projection,
133 protocol_extension,
134 turn_context,
135 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
136 crate::PluginOptions::default(),
137 crate::SessionPolicy::default(),
138 ),
139 process_originator: None,
140 runtime_process_id: None,
141 process_event_context: None,
142 started_process_ids: Arc::default(),
143 process_env_ref: None,
144 process_wake_target: None,
145 parent_invocation: None,
146 lashlang_execution_sink: None,
147 lashlang_execution_context: lash_trace::TraceContext::default(),
148 turn_phase_probe: None,
149 turn_event_tx: None,
150 cancellation_token: None,
151 }
152 }
153
154 pub fn session_id(&self) -> &str {
155 &self.session_id
156 }
157
158 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
159 Arc::clone(&self.attachment_store)
160 }
161
162 pub async fn put_lashlang_module_artifact(
163 &self,
164 artifact: &lashlang::ModuleArtifact,
165 ) -> Result<(), String> {
166 self.lashlang_artifact_store
167 .put_module_artifact(artifact)
168 .await
169 .map_err(|err| err.to_string())
170 }
171
172 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
173 Arc::clone(&self.chronological_projection)
174 }
175
176 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
177 self.protocol_extension
178 .as_ref()
179 .and_then(|extension| extension.as_any().downcast_ref::<T>())
180 }
181
182 pub fn turn_context(&self) -> &crate::TurnContext {
183 &self.turn_context
184 }
185
186 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
187 self.dispatch.session_graph.as_ref()
188 }
189
190 pub(super) async fn emit_turn_activity(
191 &self,
192 correlation_id: TurnActivityId,
193 event: TurnEvent,
194 ) {
195 if let Some(tx) = &self.turn_event_tx {
196 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
197 }
198 }
199
200 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
201 self.turn_event_tx = Some(turn_event_tx);
202 self
203 }
204
205 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
206 self.parent_invocation = Some(metadata);
207 self
208 }
209
210 pub(crate) fn with_execution_env_spec(
211 mut self,
212 execution_env_spec: crate::ProcessExecutionEnvSpec,
213 ) -> Self {
214 self.execution_env_spec = execution_env_spec;
215 self
216 }
217
218 pub(crate) fn with_process_registration_context(
219 mut self,
220 registration: &crate::ProcessRegistration,
221 ) -> Self {
222 self.process_originator = Some(registration.provenance.originator.clone());
223 self.runtime_process_id = Some(registration.id.clone());
224 self.process_env_ref = registration.env_ref.clone();
225 self.process_wake_target = registration.wake_target.clone();
226 self
227 }
228
229 pub(crate) fn with_process_event_context(
230 mut self,
231 process_id: impl Into<String>,
232 registry: Arc<dyn crate::ProcessRegistry>,
233 store: Option<Arc<dyn crate::RuntimePersistence>>,
234 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
235 queued_work_poke: Option<crate::QueuedWorkPoke>,
236 ) -> Self {
237 self.process_event_context = Some(RuntimeExecutionProcessEventContext {
238 process_id: process_id.into(),
239 registry,
240 store,
241 session_store_factory,
242 queued_work_poke,
243 });
244 self
245 }
246
247 pub(super) fn record_started_process(&self, process_id: &str) {
251 self.started_process_ids
252 .lock()
253 .expect("started process ids lock")
254 .insert(process_id.to_string());
255 }
256
257 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
258 self.started_process_ids
259 .lock()
260 .expect("started process ids lock")
261 .contains(process_id)
262 }
263
264 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
265 self.process_originator
266 .clone()
267 .map(|originator| crate::ProcessSpawnProvenance {
268 originator,
269 wake_target: self.process_wake_target.clone(),
270 })
271 }
272
273 pub(super) async fn attach_captured_process_execution_env(
274 &self,
275 registration: crate::ProcessRegistration,
276 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
277 if registration.env_ref.is_some() {
278 return Ok(registration);
279 }
280 match registration.input.as_ref() {
281 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
282 let env_ref = self.captured_process_execution_env_ref().await?;
283 Ok(registration.with_execution_env_ref(Some(env_ref)))
284 }
285 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
286 Ok(registration)
287 }
288 }
289 }
290
291 async fn captured_process_execution_env_ref(
292 &self,
293 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
294 if let Some(env_ref) = self.process_env_ref.clone() {
295 return Ok(env_ref);
296 }
297 crate::persist_process_execution_env(
298 self.lashlang_artifact_store.as_ref(),
299 &self.execution_env_spec,
300 )
301 .await
302 }
303
304 pub(crate) fn with_lashlang_execution_trace(
305 mut self,
306 sink: Option<Arc<dyn lash_trace::TraceSink>>,
307 context: lash_trace::TraceContext,
308 ) -> Self {
309 self.lashlang_execution_sink = sink;
310 self.lashlang_execution_context = context;
311 self
312 }
313
314 pub(crate) fn with_turn_phase_probe(
315 mut self,
316 probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
317 ) -> Self {
318 self.turn_phase_probe = probe;
319 self
320 }
321
322 #[doc(hidden)]
323 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
324 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
325 }
326
327 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
328 self.parent_invocation.as_ref()
329 }
330
331 pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
332 self.lashlang_execution_sink.clone()
333 }
334
335 pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
336 &self.lashlang_execution_context
337 }
338
339 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
340 self.cancellation_token = Some(cancellation_token);
341 self
342 }
343
344 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
345 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
346 }
347
348 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
349 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
350 }
351
352 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
353 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
354 }
355
356 pub fn resolve_lashlang_host_operation(
357 &self,
358 receiver: &lashlang::ResourceHandle,
359 operation: &str,
360 ) -> Result<String, String> {
361 self.lashlang_host_environment
362 .resources
363 .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
364 .map(|binding| binding.host_operation.clone())
365 .ok_or_else(|| {
366 format!(
367 "module `{}` of type `{}` does not expose operation `{operation}`",
368 receiver.alias, receiver.resource_type
369 )
370 })
371 }
372
373 pub async fn prepare_lashlang_process_start(
374 &self,
375 start: lashlang::ProcessStart,
376 ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
377 let _phase = self.named_phase("rlm_process.prepare_start");
378 let display_name = Some(start.process_name.clone());
379 let artifact = self
380 .lashlang_artifact_store
381 .get_module_artifact(&start.module_ref)
382 .await
383 .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
384 .ok_or_else(|| {
385 format!(
386 "missing lashlang module artifact `{}` for process `{}`",
387 start.module_ref, start.process_name
388 )
389 })?;
390 if artifact.host_requirements_ref != start.host_requirements_ref {
391 return Err(format!(
392 "lashlang module artifact `{}` host requirements mismatch: process requested {}, artifact has {}",
393 start.module_ref, start.host_requirements_ref, artifact.host_requirements_ref
394 ));
395 }
396 if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
397 return Err(format!(
398 "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
399 start.module_ref, start.process_name, start.process_ref
400 ));
401 }
402 let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
403 .map_err(|err| format!("failed to serialize process args: {err}"))?
404 {
405 serde_json::Value::Object(map) => map,
406 _ => return Err("process args must serialize as a record".to_string()),
407 };
408 let signal_event_types = artifact
409 .canonical_ir
410 .process(&start.process_name)
411 .map(crate::lashlang_process_signal_event_types)
412 .unwrap_or_default();
413 let process_id = format!("process:{}", uuid::Uuid::new_v4());
414 let registration = crate::ProcessRegistration::session_start_draft(
415 process_id,
416 crate::ProcessInput::LashlangProcess {
417 module_ref: start.module_ref,
418 process_ref: start.process_ref,
419 host_requirements_ref: start.host_requirements_ref,
420 process_name: start.process_name,
421 args,
422 },
423 )
424 .with_extra_event_types(
425 crate::lashlang_process_event_types()
426 .into_iter()
427 .chain(signal_event_types),
428 );
429 Ok((registration, display_name))
430 }
431
432 pub fn lashlang_host_environment(&self) -> &lashlang::LashlangHostEnvironment {
433 &self.lashlang_host_environment
434 }
435
436 pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
437 self.lashlang_abilities
438 }
439
440 pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
441 self.lashlang_language_features
442 }
443
444 pub fn link_lashlang_module(
445 &self,
446 program: lashlang::Program,
447 ) -> Result<lashlang::LinkedModule, String> {
448 lashlang::LinkedModule::link(program, self.lashlang_host_environment())
449 .map_err(|err| err.to_string())
450 }
451
452 pub async fn perform_lashlang_trigger_operation(
453 &self,
454 operation: &str,
455 payload: serde_json::Value,
456 ) -> Result<serde_json::Value, String> {
457 match lashlang::TriggerHostOperation::from_host_operation(operation) {
458 Some(lashlang::TriggerHostOperation::Register) => self
459 .register_trigger_subscription(payload)
460 .await
461 .map_err(|err| err.to_string()),
462 Some(lashlang::TriggerHostOperation::List) => self
463 .list_trigger_subscriptions(payload)
464 .await
465 .map_err(|err| err.to_string()),
466 Some(lashlang::TriggerHostOperation::Cancel) => self
467 .cancel_trigger_subscription(payload)
468 .await
469 .map_err(|err| err.to_string()),
470 None => Err(format!("unknown trigger operation `{operation}`")),
471 }
472 }
473
474 async fn register_trigger_subscription(
475 &self,
476 payload: serde_json::Value,
477 ) -> Result<serde_json::Value, crate::PluginError> {
478 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
479 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
480 })?;
481 let request = lashlang::TriggerRegistrationRequest::decode(&payload)
482 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
483 let source_type = request.source.source_type.clone();
484 let source_value = request.source.value.clone();
485 let source = request.source.to_json();
486 let event_type = lashlang::event_type_for_source(
487 &self.dispatch.plugins.lashlang_resources(),
488 &source_type,
489 )
490 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
491 let validation = crate::plugin::validate_target_process(
492 &request.target,
493 &event_type,
494 &request.inputs,
495 self.lashlang_artifact_store.as_ref(),
496 )
497 .await?;
498 let store = router.store();
499 let source_key = store
500 .source_key_for_subscription(&source_type, &source_value)
501 .await?;
502 let env_ref = match self.process_env_ref.clone() {
503 Some(env_ref) => env_ref,
504 None => {
505 crate::persist_process_execution_env(
506 self.lashlang_artifact_store.as_ref(),
507 &self.execution_env_spec,
508 )
509 .await?
510 }
511 };
512 let registrant = self.process_originator.clone().unwrap_or_else(|| {
513 crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
514 });
515 let wake_target = self
516 .process_wake_target
517 .clone()
518 .or_else(|| match ®istrant {
519 crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
520 crate::ProcessOriginator::Host => None,
521 });
522 let record = store
523 .register_subscription(crate::TriggerSubscriptionDraft {
524 registrant,
525 env_ref,
526 wake_target,
527 name: request.name,
528 source_type,
529 source_key,
530 source,
531 event_ty: validation.event_ty,
532 module_ref: request.target.module_ref,
533 host_requirements_ref: request.target.host_requirements_ref,
534 process_ref: request.target.process_ref,
535 process_name: request.target.process_name,
536 input_template: validation.inputs,
537 })
538 .await?;
539 Ok(crate::plugin::trigger_handle_json(&record.handle))
540 }
541
542 async fn list_trigger_subscriptions(
543 &self,
544 payload: serde_json::Value,
545 ) -> Result<serde_json::Value, crate::PluginError> {
546 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
547 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
548 })?;
549 let request = lashlang::TriggerListRequest::decode(&payload)
550 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
551 let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
552 filter.target = request.target;
553 filter.name = request.name;
554 filter.source_type = request.source_type;
555 filter.enabled = request.enabled;
556 let registrations = router
557 .store()
558 .list_subscriptions(filter)
559 .await?
560 .iter()
561 .map(crate::TriggerRegistration::from)
562 .collect::<Vec<_>>();
563 serde_json::to_value(registrations).map_err(|err| {
564 crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
565 })
566 }
567
568 async fn cancel_trigger_subscription(
569 &self,
570 payload: serde_json::Value,
571 ) -> Result<serde_json::Value, crate::PluginError> {
572 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
573 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
574 })?;
575 let request = lashlang::TriggerCancelRequest::decode(&payload)
576 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
577 let changed = router
578 .store()
579 .cancel_subscription(&self.session_id, &request.handle)
580 .await?;
581 Ok(serde_json::json!(changed))
582 }
583
584 pub fn tool_argument_projection_policy(
585 &self,
586 name: &str,
587 ) -> crate::ToolArgumentProjectionPolicy {
588 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
589 }
590
591 pub async fn start_lashlang_process(
592 &self,
593 registration: crate::ProcessRegistration,
594 label: Option<String>,
595 ) -> crate::ToolInvocationReply {
596 let _phase = self.named_phase("rlm_process.start");
597 let registration = match self
598 .attach_captured_process_execution_env(registration)
599 .await
600 {
601 Ok(registration) => registration,
602 Err(err) => {
603 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
604 }
605 };
606 let process_id = registration.id.clone();
607 let mut options = crate::ProcessStartOptions::new()
608 .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
609 if let Some(spawn) = self.process_spawn_provenance() {
610 options = options.with_spawn_provenance(spawn);
611 }
612 match self
613 .dispatch
614 .processes
615 .start(
616 &self.session_id,
617 registration,
618 options,
619 self.process_scope(self.parent_invocation.clone()),
620 )
621 .await
622 {
623 Ok(_) => {
624 self.record_started_process(&process_id);
625 crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
626 &process_id,
627 ))
628 }
629 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
630 }
631 }
632
633 pub async fn sleep_lashlang(
634 &self,
635 scope: &str,
636 sequence: u64,
637 duration_ms: u64,
638 ) -> Result<(), crate::RuntimeEffectControllerError> {
639 let cancellation = self.cancellation_token.clone().unwrap_or_default();
640 let invocation = crate::runtime::causal::lashlang_sleep_invocation(
641 &self.session_id,
642 self.parent_invocation.as_ref(),
643 scope,
644 sequence,
645 );
646 let outcome = self
647 .dispatch
648 .effect_controller
649 .controller()
650 .execute_effect(
651 crate::RuntimeEffectEnvelope::new(
652 invocation,
653 crate::RuntimeEffectCommand::Sleep { duration_ms },
654 ),
655 crate::RuntimeEffectLocalExecutor::sleep(cancellation),
656 )
657 .await?;
658 match outcome {
659 crate::RuntimeEffectOutcome::Sleep => Ok(()),
660 other => Err(crate::RuntimeEffectControllerError::new(
661 "runtime_effect_wrong_outcome",
662 format!("expected sleep outcome, got {}", other.kind().as_str()),
663 )),
664 }
665 }
666
667 pub async fn await_process_event_lashlang(
668 &self,
669 _registry: Arc<dyn crate::ProcessRegistry>,
670 process_id: &str,
671 signal_name: &str,
672 _event_type: &str,
673 event_ordinal: u64,
674 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
675 let cancellation = self.cancellation_token.clone().unwrap_or_default();
676 let key = self
677 .dispatch
678 .effect_controller
679 .controller()
680 .await_event_key(
681 &crate::ExecutionScope::process(process_id),
682 crate::AwaitEventWaitIdentity::process_signal(
683 process_id,
684 signal_name,
685 event_ordinal,
686 ),
687 )
688 .await?;
689 let invocation = crate::runtime::causal::lashlang_await_event_invocation(
690 &self.session_id,
691 self.parent_invocation.as_ref(),
692 process_id,
693 signal_name,
694 event_ordinal,
695 );
696 let outcome = self
697 .dispatch
698 .effect_controller
699 .controller()
700 .execute_effect(
701 crate::RuntimeEffectEnvelope::new(
702 invocation,
703 crate::RuntimeEffectCommand::AwaitEvent { key },
704 ),
705 crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
706 )
707 .await?;
708 match outcome.into_await_event()? {
709 crate::Resolution::Ok(value) => Ok(value),
710 crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
711 err.code,
712 err.message,
713 )),
714 crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
715 "process_signal_wait_timeout",
716 "process signal wait timed out",
717 )),
718 crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
719 "process_signal_wait_cancelled",
720 "process signal wait was cancelled",
721 )),
722 }
723 }
724
725 pub async fn signal_lashlang_process(
726 &self,
727 registry: Arc<dyn crate::ProcessRegistry>,
728 process_id: &str,
729 signal_name: &str,
730 signal_id: String,
731 payload: serde_json::Value,
732 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
733 let event_type = crate::process_signal_event_type(signal_name)?;
734 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
735 let signal_payload = payload.clone();
736 let command = crate::ProcessCommand::Signal {
737 process_id: process_id.to_string(),
738 signal_name: signal_name.to_string(),
739 signal_id,
740 request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
741 .with_replay_key(replay_key),
742 };
743 let effect_id = command.effect_id();
744 let invocation = crate::runtime::causal::process_effect_invocation(
745 &self.session_id,
746 self.parent_invocation.clone(),
747 &effect_id,
748 );
749 let outcome = self
750 .dispatch
751 .effect_controller
752 .controller()
753 .execute_effect(
754 crate::RuntimeEffectEnvelope::new(
755 invocation,
756 crate::RuntimeEffectCommand::process(command),
757 ),
758 crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
759 )
760 .await?;
761 match outcome.into_process()? {
762 crate::ProcessEffectOutcome::Signal { event } => {
763 let waiting_ordinal =
764 registry
765 .get_process(process_id)
766 .await
767 .and_then(|record| match record.wait {
768 Some(crate::WaitState {
769 kind:
770 crate::WaitKind::Signal {
771 name,
772 event_type: wait_event_type,
773 ordinal,
774 ..
775 },
776 ..
777 }) if name == signal_name && wait_event_type == event_type => {
778 Some(ordinal)
779 }
780 _ => None,
781 });
782 let ordinal = match waiting_ordinal {
783 Some(ordinal) => ordinal,
784 None => {
785 registry
786 .count_events_through(process_id, &event_type, event.sequence)
787 .await?
788 }
789 };
790 if ordinal > 0 {
791 let key = self
792 .dispatch
793 .effect_controller
794 .controller()
795 .await_event_key(
796 &crate::ExecutionScope::process(process_id),
797 crate::AwaitEventWaitIdentity::process_signal(
798 process_id,
799 signal_name,
800 ordinal,
801 ),
802 )
803 .await?;
804 let _ = self
805 .dispatch
806 .effect_controller
807 .controller()
808 .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
809 .await?;
810 }
811 Ok(event)
812 }
813 other => Err(crate::RuntimeEffectControllerError::new(
814 "runtime_effect_wrong_outcome",
815 format!("expected signal outcome, got {other:?}"),
816 )),
817 }
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use super::*;
824 use crate::tool_dispatch::ToolDispatchContext;
825 use crate::{ToolCall, ToolProvider, ToolResult};
826
827 struct NoopTools;
828
829 #[async_trait::async_trait]
830 impl ToolProvider for NoopTools {
831 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
832 Vec::new()
833 }
834
835 fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
836 None
837 }
838
839 async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
840 ToolResult::err_fmt("not used")
841 }
842 }
843
844 #[test]
845 fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
846 let tool = crate::ToolDefinition::raw(
847 "tool:seedy",
848 "seedy",
849 "Seed-aware",
850 crate::ToolDefinition::default_input_schema(),
851 serde_json::json!({ "type": "string" }),
852 )
853 .with_argument_projection(
854 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
855 );
856 let plugins = crate::plugin::PluginHost::empty()
857 .build_session("session", None)
858 .expect("plugin session");
859 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
860 let dispatch = Arc::new(ToolDispatchContext {
861 plugins,
862 tools: Arc::new(NoopTools),
863 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
864 vec![tool.manifest()],
865 std::collections::BTreeMap::new(),
866 )),
867 sessions: Arc::new(crate::testing::MockSessionManager::default()),
868 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
869 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
870 processes: Arc::new(crate::UnavailableProcessService),
871 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
872 trigger_router: None,
873 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
874 crate::InlineRuntimeEffectController,
875 )),
876 direct_completions: crate::DirectCompletionClient::unavailable(
877 "direct completions are unavailable in this test context",
878 ),
879 parent_invocation: None,
880 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
881 crate::PluginOptions::default(),
882 crate::SessionPolicy::default(),
883 ),
884 session_id: "session".to_string(),
885 agent_frame_id: String::new(),
886 event_tx,
887 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
888 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
889 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
890 turn_context: crate::TurnContext::default(),
891 });
892 let ctx = RuntimeExecutionContext::new(
893 "session".to_string(),
894 dispatch,
895 Default::default(),
896 Default::default(),
897 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
898 Arc::new(crate::InMemoryAttachmentStore::new()),
899 Arc::new(crate::ChronologicalProjection::default()),
900 None,
901 crate::TurnContext::default(),
902 );
903
904 assert_eq!(
905 ctx.tool_argument_projection_policy("seedy"),
906 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
907 );
908 assert_eq!(
909 ctx.tool_argument_projection_policy("missing"),
910 crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
911 );
912 }
913
914 #[tokio::test]
915 async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
916 let tool = crate::ToolDefinition::raw(
917 "tool:alpha",
918 "alpha",
919 "Alpha tool.",
920 crate::ToolDefinition::default_input_schema(),
921 serde_json::json!({ "type": "object", "additionalProperties": true }),
922 );
923 let plugins = crate::plugin::PluginHost::empty()
924 .build_session("session", None)
925 .expect("plugin session");
926 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
927 let dispatch = Arc::new(ToolDispatchContext {
928 plugins,
929 tools: Arc::new(NoopTools),
930 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
931 vec![tool.manifest()],
932 std::collections::BTreeMap::new(),
933 )),
934 sessions: Arc::new(crate::testing::MockSessionManager::default()),
935 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
936 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
937 processes: Arc::new(crate::UnavailableProcessService),
938 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
939 trigger_router: None,
940 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
941 crate::InlineRuntimeEffectController,
942 )),
943 direct_completions: crate::DirectCompletionClient::unavailable(
944 "direct completions are unavailable in this test context",
945 ),
946 parent_invocation: None,
947 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
948 crate::PluginOptions::default(),
949 crate::SessionPolicy::default(),
950 ),
951 session_id: "session".to_string(),
952 agent_frame_id: String::new(),
953 event_tx,
954 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
955 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
956 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
957 turn_context: crate::TurnContext::default(),
958 });
959 let ctx = RuntimeExecutionContext::new(
960 "session".to_string(),
961 dispatch,
962 lashlang::LashlangAbilities::default().with_processes(),
963 Default::default(),
964 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
965 Arc::new(crate::InMemoryAttachmentStore::new()),
966 Arc::new(crate::ChronologicalProjection::default()),
967 None,
968 crate::TurnContext::default(),
969 );
970 let mut input = lashlang::Record::new();
971 input.insert("root".to_string(), lashlang::Value::String(".".into()));
972 let linked = ctx
973 .link_lashlang_module(
974 lashlang::parse("process scan(root: str) { finish root }").expect("process module"),
975 )
976 .expect("link process module");
977 ctx.put_lashlang_module_artifact(&linked.artifact)
978 .await
979 .expect("store module artifact");
980 let process_ref = linked
981 .artifact
982 .process_ref("scan")
983 .expect("scan process ref")
984 .clone();
985 let (registration, label) = ctx
986 .prepare_lashlang_process_start(lashlang::ProcessStart {
987 module_ref: linked.module_ref.clone(),
988 process_ref,
989 host_requirements_ref: linked.host_requirements_ref.clone(),
990 process_name: "scan".to_string(),
991 args: input,
992 })
993 .await
994 .expect("process start should prepare");
995
996 assert_eq!(label.as_deref(), Some("scan"));
997 assert!(
998 registration
999 .event_types
1000 .iter()
1001 .any(|event_type| event_type.name == "process.wake")
1002 );
1003 let crate::ProcessInput::LashlangProcess {
1004 args, process_name, ..
1005 } = registration.input.as_ref()
1006 else {
1007 panic!("expected lashlang process input");
1008 };
1009 assert_eq!(process_name, "scan");
1010 assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
1011 }
1012
1013 #[test]
1014 fn lashlang_host_environment_reflects_host_abilities() {
1015 let tool = crate::ToolDefinition::raw(
1016 "tool:alpha",
1017 "alpha",
1018 "Alpha tool.",
1019 crate::ToolDefinition::default_input_schema(),
1020 serde_json::json!({ "type": "object", "additionalProperties": true }),
1021 );
1022 let plugins = crate::plugin::PluginHost::empty()
1023 .build_session("session", None)
1024 .expect("plugin session");
1025 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
1026 let dispatch = Arc::new(ToolDispatchContext {
1027 plugins,
1028 tools: Arc::new(NoopTools),
1029 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1030 vec![tool.manifest()],
1031 std::collections::BTreeMap::new(),
1032 )),
1033 sessions: Arc::new(crate::testing::MockSessionManager::default()),
1034 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1035 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1036 processes: Arc::new(crate::UnavailableProcessService),
1037 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1038 trigger_router: None,
1039 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1040 crate::InlineRuntimeEffectController,
1041 )),
1042 direct_completions: crate::DirectCompletionClient::unavailable(
1043 "direct completions are unavailable in this test context",
1044 ),
1045 parent_invocation: None,
1046 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1047 crate::PluginOptions::default(),
1048 crate::SessionPolicy::default(),
1049 ),
1050 session_id: "session".to_string(),
1051 agent_frame_id: String::new(),
1052 event_tx,
1053 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1054 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1055 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1056 turn_context: crate::TurnContext::default(),
1057 });
1058 let ctx = RuntimeExecutionContext::new(
1059 "session".to_string(),
1060 dispatch,
1061 lashlang::LashlangAbilities::default()
1062 .with_sleep()
1063 .with_processes()
1064 .with_process_signals(),
1065 Default::default(),
1066 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1067 Arc::new(crate::InMemoryAttachmentStore::new()),
1068 Arc::new(crate::ChronologicalProjection::default()),
1069 None,
1070 crate::TurnContext::default(),
1071 );
1072
1073 let environment = ctx.lashlang_host_environment();
1074
1075 assert!(std::ptr::eq(environment, ctx.lashlang_host_environment()));
1076 assert!(environment.abilities.processes);
1077 assert!(environment.abilities.sleep);
1078 assert!(environment.abilities.process_signals);
1079 assert!(!environment.abilities.triggers);
1080 assert!(
1081 environment
1082 .resources
1083 .resolve_operation("Tools", "alpha")
1084 .is_some()
1085 );
1086 }
1087
1088 #[test]
1089 fn lashlang_host_environment_reflects_host_resource_contributions() {
1090 let mut resources = lashlang::LashlangHostCatalog::new();
1091 resources
1092 .add_trigger_source_constructor(
1093 ["clock", "Alarm"],
1094 lashlang::TypeExpr::Object(vec![lashlang::TypeField {
1095 name: "at".into(),
1096 ty: lashlang::TypeExpr::Str,
1097 optional: false,
1098 }]),
1099 lashlang::NamedDataType::object(
1100 "clock.Tick",
1101 vec![lashlang::TypeField {
1102 name: "fired_at".into(),
1103 ty: lashlang::TypeExpr::Str,
1104 optional: false,
1105 }],
1106 )
1107 .expect("valid clock tick type"),
1108 )
1109 .expect("valid clock trigger source");
1110 let plugin_host = crate::plugin::PluginHost::empty();
1111 let mut merged_resources = plugin_host.lashlang_resources();
1112 merged_resources.extend(resources);
1113 let plugins = plugin_host
1114 .with_lashlang_resources(merged_resources)
1115 .build_session("session", None)
1116 .expect("plugin session");
1117 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
1118 let dispatch = Arc::new(ToolDispatchContext {
1119 plugins,
1120 tools: Arc::new(NoopTools),
1121 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1122 Vec::new(),
1123 std::collections::BTreeMap::new(),
1124 )),
1125 sessions: Arc::new(crate::testing::MockSessionManager::default()),
1126 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1127 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1128 processes: Arc::new(crate::UnavailableProcessService),
1129 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1130 trigger_router: None,
1131 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1132 crate::InlineRuntimeEffectController,
1133 )),
1134 direct_completions: crate::DirectCompletionClient::unavailable(
1135 "direct completions are unavailable in this test context",
1136 ),
1137 parent_invocation: None,
1138 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1139 crate::PluginOptions::default(),
1140 crate::SessionPolicy::default(),
1141 ),
1142 session_id: "session".to_string(),
1143 agent_frame_id: String::new(),
1144 event_tx,
1145 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1146 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1147 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1148 turn_context: crate::TurnContext::default(),
1149 });
1150 let ctx = RuntimeExecutionContext::new(
1151 "session".to_string(),
1152 dispatch,
1153 lashlang::LashlangAbilities::default()
1154 .with_processes()
1155 .with_triggers(),
1156 Default::default(),
1157 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1158 Arc::new(crate::InMemoryAttachmentStore::new()),
1159 Arc::new(crate::ChronologicalProjection::default()),
1160 None,
1161 crate::TurnContext::default(),
1162 );
1163
1164 let host_environment = ctx.lashlang_host_environment();
1165
1166 assert!(
1167 host_environment
1168 .resources
1169 .resolve_value_constructor(&["clock", "Alarm"])
1170 .is_some()
1171 );
1172 assert!(
1173 host_environment
1174 .resources
1175 .resolve_trigger_source("clock.Alarm")
1176 .is_some()
1177 );
1178 lashlang::LinkedModule::link(
1179 lashlang::parse(
1180 r#"
1181 process remember(tick: clock.Tick) {
1182 finish true
1183 }
1184
1185 source = clock.Alarm({ at: "08:00" })
1186 await triggers.register({
1187 source: source,
1188 target: remember,
1189 inputs: { tick: trigger.event }
1190 })?
1191 "#,
1192 )
1193 .expect("parse trigger registry module"),
1194 host_environment,
1195 )
1196 .expect("host resource contribution should be linkable");
1197 }
1198}